Canal实战教程

1. 简介

Canal翻译为管道,主要用途是基于mysql数据库的增量日志Binlog解析,提供增量数据订阅和消费。

早起阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务trgger获取增量变更。从2010年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括:

  • 数据库镜像

  • 数据库实时备份

  • 索引构建和实时维护(拆分异构索引、倒排索引等)

  • 业务cache刷新

  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 的版本包括 5.1.x,5.5.x,5.6.x,5.7.x,8.0.x。

2. 工作原理

2.1 Mysql主备复制原理

  • Mysql master将数据变更二进制日志事件(binary log events,可以通过show binlog events)

  • Mysql slave 将master的binary log events拷贝到它的中继日志(relay log)

  • Mysql slave重放 relay log中事件,将数据变更反映为它自己的数据

2.2 canal 工作原理

  1. canal 模拟Mysql slave的交互协议,伪装自己为Mysql slave,向Mysql master发送dump协议

  2. Mysql master 收到dump请求,开始推送binary log给 canal

  3. canal解析binary log 对象(原始对象为byte流)

【github地址】 【wiki地址】

3. Canal架构

一个Server代表一个canal运行实例,对应一个jvm,一个instance对应一个数据队列。

instance 是canal数据同步的核心,在一个canal实例中只有启动instance才能进行数据的同步任务。一个canal server实例中够可以创建多个canal instance实例,每一个canal instance可以看成对应一个Mysql实例

instance模块:

  • eventParser:数据源接入,模拟slave协议和master进行交互,协议解析

  • eventSink:Parser和Store链接器,进行数据过滤、加工、分发的工作

  • eventStore:数据存储

  • MetaManager:增量订阅 & 消费信息管理器

4. Canal-HA 机制

HA(High Available)高可用,通常一个服务要支持高可用都需要借助于第三方的分布式同步协调服务,最常用的是zookeeper。canal实现高可用,也是依赖zookeeper的几个特性: watcher和 Ephemeral节点

canal的高可用分为两部分:canal server 和 canal client

  • canal server:为了减少对mysql dump的请求,同步server上的instance(不同server上的相同instance)要求同一时间只能有一个处于running,其他的处于standby状态,也就是说,只会有一个canal server的instance处于active状态,但是当这个instance down掉后会重新选出一个canal server
  • canal client:为了保证有序性,一份instance同一时间只能有一个canal client进行 get/ack/rollback操作,否则客户端接收无法保证有序

server HA的架构图如下:

server HA架构图

大致步骤如下:

  1. canal server要启动某个canal instance时都先向zookeeper进行依次尝试启动判断(实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
  1. 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态。
  1. 一旦zookeeper发现canal server A创建的instance节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance
  1. canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立连接,一旦连接不可用,会重新尝试connect。

Canal Client的方式和Canal Server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制。

5. 应用场景

5.1 同步缓存Redis/全文搜索ES

当数据库变更后通过binlog进行缓存/ES的增量更新。当缓存/ES更新出问题时,应该回退binlog到过去某个位置进行重新同步,并提供全量刷新缓存/ES的方法。

5.2 下发任务

当数据变更时需要通知其他依赖系统。其原理是任务系统监听数据库变更,然后将变更的数据写入MQ/Kafka进行任务下发,比如商品数据变更后通知商品详情页、列表页、搜索页等相关系统。

这种方式可以保证数据下发的精确性,通过 MQ 发送消息通知变更缓存是无法做到这一点的,而且业务系统中不会散落着各种下发 MQ 的代码,从而实现了下发归集。

5.3 数据异构

在大型网站架构中,DB都会采用分库分表来解决容量和性能问题。但是分库分表之后带来的新的问题,比如不同维度的查询或聚合查询,此时就会非常棘手。一般我们会通过数据异构机制来解决这个问题。

所谓的数据异构:就是将需要join查询的多表按照某一个维度又聚合在一个DB中让你去查询,canal就是实现数据异构的手段之一。

6. Mysql配置

6.1 开启binlog

首先在Mysql的配置文件目录中查找配置文件my.cnf

1
2
3
4
xiaoyuge@xiaoyuge-2 / % cd /usr/local/mysql-8.0.23-macos10.15-x86_64/support-files 
xiaoyuge@xiaoyuge-2 support-files % ls
my.cnf mysql-log-rotate mysql.server mysqld_multi.server
xiaoyuge@xiaoyuge-2 support-files % vim my.cnf

在[mysqld]区块下添加开启binlog配置

1
2
3
4
server-id=1 #master端的ID号【必须是唯一的】;
log_bin=mysql-bin #同步的日志路径,一定注意这个目录要是mysql有权限写入的
binlog-format=row #行级,记录每次操作后每行记录的变化。
binlog-do-db=cheetah #指定库,缩小监控的范围。
1
2
#重启
service myqld restart

会发现在/var/lib/mysql下会生成两个文件mysql-bin.000001mysql-bin.index,当mysql重启或达到单个文件大小的阈值时,会新生成一个文件,编号按照顺序mysql-bin.000002以此类推。

6.1.1 扩展

binlog 日志有三种格式,可以通过binlog_format参数指定

  1. statement

    记录的内容是SQL语句原文,比如执行一条update T SET update_time=now() where id = 1,记录的内容如下:

    同步数据时,会执行记录中的SQL语句,但是有个问题,update_time=now()这里会获取当前系统时间,直接执行会导致与原库的数据不一致。

  2. row

    为了解决上述问题,我们需要指定为row,记录的内容不再是简单的SQL语句来,还包含操作的具体数据,记录内容如下:

    row格式记录的内容看不到详细信息,要通过mysql binlog工具解析出来

    update_time=now()变成了具体的时间,条件后面的@1,@2,@3都是该行数据第1~3个字段的原始值(假设这张表只有3个字段)

    这样就能保证同步数据的一致性,通常情况下都是指定为row,这样可以为数据库的恢复与同步带来更好的可靠性。

    缺点:占空间、恢复与同步消耗更多的IO资源,影响执行速度。

  1. mixed

    Mysql会判断这条SQL语句是否可能引起数据不一致,如果是,就用row格式,否则就用statement格式。

6.2 配置权限

1
2
3
CREATE USER canal IDENTIFIED BY 'canal'; 创建用户名和密码都是canal的用户
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; #授予该用户对所有数据库和表的查询、复制主节点数据的操作权限
FLUSH PRIVILEGES; #重新加载权限

7. Canal 配置

7.1 下载

【Canal官网下载地址】,下载最新版本,上传到服务器

1
2
#解压
tar -zxvf canal.deployer-1.1.7-SNAPSHOT.tar.gz

7.2 配置

通过查看conf/canal.properties配置,需要暴露三个端口

1
2
3
canal.admin.port = 11110
canal.port = 11111
canal.metrics.pull.port = 11112

修改conf/canal.properties

1
2
# 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2
canal.destinations = example

修改conf/example/instance.properties实例配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 配置 slaveId 自定义,不等于 mysql 的 server Id 即可
canal.instance.mysql.slaveId=10

# 数据库地址:自己的数据库ip+端口
canal.instance.master.address=127.0.0.1:3306

# 数据库用户名和密码
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx

#代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
canal.instance.connectionCharset = UTF-8

# 指定库和表,这里的 .* 表示 canal.instance.master.address 下面的所有数据库
canal.instance.filter.regex=.*\\..*

如果系统是1个CPU,需要将canal.instance.parse.paraller= false

7.3 启动

1
2
/Users/xiaoyuge/Software/canal.deployer-1.1.7-SNAPSHOT
sh bin/startup.sh

8. 实战

  1. 创建项目并引入依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    <dependencies>
    <dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.6</version>
    </dependency>
    <dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.6</version>
    </dependency>
    </dependencies>
  2. 样例代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import java.net.InetSocketAddress;
    import java.util.List;

    /**
    * 样例代码来自官网,仅用于测试使用
    */
    public class SimpleCanalClientDemo {
    public static void main(String args[]) {
    // 创建链接:换成自己的数据库ip地址
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",
    11111), "example", "", "");
    int batchSize = 1000;
    int emptyCount = 0;
    try {
    connector.connect();
    connector.subscribe(".*\\..*");
    connector.rollback();
    int totalEmptyCount = 120;
    while (emptyCount < totalEmptyCount) {
    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
    long batchId = message.getId();
    int size = message.getEntries().size();
    if (batchId == -1 || size == 0) {
    emptyCount++;
    System.out.println("empty count : " + emptyCount);
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    }
    } else {
    emptyCount = 0;
    printEntry(message.getEntries());
    }

    connector.ack(batchId); // 提交确认
    }

    System.out.println("empty too many times, exit");
    } finally {
    connector.disconnect();
    }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
    for (CanalEntry.Entry entry : entrys) {
    if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
    continue;
    }

    CanalEntry.RowChange rowChage = null;
    try {
    rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
    } catch (Exception e) {
    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
    e);
    }

    CanalEntry.EventType eventType = rowChage.getEventType();
    System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
    eventType));

    for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
    if (eventType == CanalEntry.EventType.DELETE) {
    printColumn(rowData.getBeforeColumnsList());
    } else if (eventType == CanalEntry.EventType.INSERT) {
    printColumn(rowData.getAfterColumnsList());
    } else {
    System.out.println("-------&gt; before");
    printColumn(rowData.getBeforeColumnsList());
    System.out.println("-------&gt; after");
    printColumn(rowData.getAfterColumnsList());
    }
    }
    }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
    for (CanalEntry.Column column : columns) {
    System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
    }
    }
    }
  3. 启动查看日志

    1
    2
    3
    4
    5
    6
    7
    empty count : 1
    empty count : 2
    empty count : 3
    empty count : 4
    empty count : 5
    empty count : 6
    ......
  4. 手动修改数据库中字段

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    ================&gt; binlog[mysql-bin.000002:8377] , name[cheetah,product_info] , eventType : UPDATE
    -------&gt; before
    id : 3 update=false
    name : java开发1 update=false
    price : 87.0 update=false
    create_date : 2021-03-27 22:43:31 update=false
    update_date : 2021-03-27 22:43:34 update=false
    -------&gt; after
    id : 3 update=false
    name : java开发 update=true
    price : 87.0 update=false
    create_date : 2021-03-27 22:43:31 update=false
    update_date : 2021-03-27 22:43:34 update=false

    可以看出是在 mysql-bin.000002文件中,数据库名称 cheetah ,表名 product_info,事件类型:update