1. 简介
Canal翻译为管道,主要用途是基于mysql数据库的增量日志Binlog解析,提供增量数据订阅和消费。
早起阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务trgger获取增量变更。从2010年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括:
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务cache刷新
带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 的版本包括 5.1.x,5.5.x,5.6.x,5.7.x,8.0.x。
2. 工作原理
2.1 Mysql主备复制原理
Mysql master将数据变更二进制日志事件(binary log events,可以通过
show binlog events
)Mysql slave 将master的
binary log events
拷贝到它的中继日志(relay log)Mysql slave重放 relay log中事件,将数据变更反映为它自己的数据
2.2 canal 工作原理
canal 模拟Mysql slave的交互协议,伪装自己为Mysql slave,向Mysql master发送dump协议
Mysql master 收到dump请求,开始推送binary log给 canal
canal解析binary log 对象(原始对象为byte流)
3. Canal架构
一个Server代表一个canal运行实例,对应一个jvm,一个instance对应一个数据队列。
instance 是canal数据同步的核心,在一个canal实例中只有启动instance才能进行数据的同步任务。一个canal server实例中够可以创建多个canal instance实例,每一个canal instance可以看成对应一个Mysql实例
instance模块:
eventParser
:数据源接入,模拟slave协议和master进行交互,协议解析eventSink
:Parser和Store链接器,进行数据过滤、加工、分发的工作eventStore
:数据存储MetaManager
:增量订阅 & 消费信息管理器
4. Canal-HA 机制
HA(High Available)高可用,通常一个服务要支持高可用都需要借助于第三方的分布式同步协调服务,最常用的是zookeeper。canal实现高可用,也是依赖zookeeper的几个特性: watcher和 Ephemeral节点
canal的高可用分为两部分:canal server 和 canal client
-
canal server
:为了减少对mysqldump
的请求,同步server上的instance(不同server上的相同instance)要求同一时间只能有一个处于running,其他的处于standby状态,也就是说,只会有一个canal server
的instance处于active
状态,但是当这个instance
down掉后会重新选出一个canal server
canal client
:为了保证有序性,一份instance同一时间只能有一个canal client进行get/ack/rollback
操作,否则客户端接收无法保证有序
server HA的架构图如下:
大致步骤如下:
- canal server要启动某个canal instance时都先向zookeeper进行依次尝试启动判断(实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
- 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态。
- 一旦zookeeper发现canal server A创建的instance节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance
- canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立连接,一旦连接不可用,会重新尝试connect。
Canal Client的方式和Canal Server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制。
5. 应用场景
5.1 同步缓存Redis/全文搜索ES
当数据库变更后通过binlog进行缓存/ES的增量更新。当缓存/ES更新出问题时,应该回退binlog到过去某个位置进行重新同步,并提供全量刷新缓存/ES的方法。
5.2 下发任务
当数据变更时需要通知其他依赖系统。其原理是任务系统监听数据库变更,然后将变更的数据写入MQ/Kafka进行任务下发,比如商品数据变更后通知商品详情页、列表页、搜索页等相关系统。
这种方式可以保证数据下发的精确性,通过 MQ 发送消息通知变更缓存是无法做到这一点的,而且业务系统中不会散落着各种下发 MQ 的代码,从而实现了下发归集。
5.3 数据异构
在大型网站架构中,DB都会采用分库分表来解决容量和性能问题。但是分库分表之后带来的新的问题,比如不同维度的查询或聚合查询,此时就会非常棘手。一般我们会通过数据异构机制来解决这个问题。
所谓的数据异构:就是将需要join查询的多表按照某一个维度又聚合在一个DB中让你去查询,canal就是实现数据异构的手段之一。
6. Mysql配置
6.1 开启binlog
首先在Mysql的配置文件目录中查找配置文件my.cnf
1 | xiaoyuge@xiaoyuge-2 / % cd /usr/local/mysql-8.0.23-macos10.15-x86_64/support-files |
在[mysqld]区块下添加开启binlog配置
1 | server-id=1 #master端的ID号【必须是唯一的】; |
1 | 重启 |
会发现在/var/lib/mysql
下会生成两个文件mysql-bin.000001
和mysql-bin.index
,当mysql重启或达到单个文件大小的阈值时,会新生成一个文件,编号按照顺序mysql-bin.000002
以此类推。
6.1.1 扩展
binlog 日志有三种格式,可以通过binlog_format
参数指定
statement
记录的内容是SQL语句原文,比如执行一条
update T SET update_time=now() where id = 1
,记录的内容如下:
同步数据时,会执行记录中的SQL语句,但是有个问题,
update_time=now()
这里会获取当前系统时间,直接执行会导致与原库的数据不一致。row
为了解决上述问题,我们需要指定为row,记录的内容不再是简单的SQL语句来,还包含操作的具体数据,记录内容如下:
row格式记录的内容看不到详细信息,要通过mysql binlog工具解析出来
update_time=now()
变成了具体的时间,条件后面的@1,@2,@3
都是该行数据第1~3个字段的原始值(假设这张表只有3个字段)这样就能保证同步数据的一致性,通常情况下都是指定为
row
,这样可以为数据库的恢复与同步带来更好的可靠性。缺点:占空间、恢复与同步消耗更多的IO资源,影响执行速度。
mixed
Mysql会判断这条SQL语句是否可能引起数据不一致,如果是,就用row格式,否则就用statement格式。
6.2 配置权限
1 | CREATE USER canal IDENTIFIED BY 'canal'; 创建用户名和密码都是canal的用户 |
7. Canal 配置
7.1 下载
【Canal官网下载地址】,下载最新版本,上传到服务器
1 | 解压 |
7.2 配置
通过查看conf/canal.properties
配置,需要暴露三个端口
1 | 11110 = |
修改conf/canal.properties
1 | # 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2 |
修改conf/example/instance.properties
实例配置
1 | # 配置 slaveId 自定义,不等于 mysql 的 server Id 即可 |
如果系统是1个CPU,需要将
canal.instance.parse.paraller= false
。
7.3 启动
1 | /Users/xiaoyuge/Software/canal.deployer-1.1.7-SNAPSHOT |
8. 实战
创建项目并引入依赖
1
2
3
4
5
6
7
8
9
10
11
12<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.6</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.6</version>
</dependency>
</dependencies>样例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
/**
* 样例代码来自官网,仅用于测试使用
*/
public class SimpleCanalClientDemo {
public static void main(String args[]) {
// 创建链接:换成自己的数据库ip地址
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}启动查看日志
1
2
3
4
5
6
7empty count : 1
empty count : 2
empty count : 3
empty count : 4
empty count : 5
empty count : 6
......手动修改数据库中字段
1
2
3
4
5
6
7
8
9
10
11
12
13================> binlog[mysql-bin.000002:8377] , name[cheetah,product_info] , eventType : UPDATE
-------> before
id : 3 update=false
name : java开发1 update=false
price : 87.0 update=false
create_date : 2021-03-27 22:43:31 update=false
update_date : 2021-03-27 22:43:34 update=false
-------> after
id : 3 update=false
name : java开发 update=true
price : 87.0 update=false
create_date : 2021-03-27 22:43:31 update=false
update_date : 2021-03-27 22:43:34 update=false可以看出是在
mysql-bin.000002
文件中,数据库名称cheetah
,表名product_info
,事件类型:update