https://hub.docker.com/r/canal/canal-server
https://github.com/alibaba/canal/releases
阿里巴巴 MySQL binlog 增量订阅&消费组件
---canal工作原理
伪装成slaver,从master复制数据
---mysql开启binlog日志
vim /etc/mysql/my.cnf
server-id=1 # 指定binlog日志存储位置 log-bin=/var/lib/mysql/mysql-bin # 开启GTID模式 gtid-mode=ON # 设置主从强一致性 enforce-gtid-consistency=1 # 记录日志, 所以从库做为其他从库的主库时需要在配置文件中添加log-slave-updates参数, 同步异常慢 log-slave-updates=1 binlog_format=ROW #binlog-do-db 具体要同步的数据库 binlog-do-db = yfc
---查看binlog日志文件列表:
show binary logs;
---安装部署canal
一. canal-devloper
1.canal.properties (系统根配置文件)
-指定目的文件
canal.desinations = example
2.instance.properties(instance.properties的优先级高于canal.properties)
-实际需要修改的配置文件:
cd example
vim instance.properties
-需要修改的几处配置
#源数据库地址 canal.instance.master.address=localhotse:3306 #查询元数据库中的配置 canal.instance.master.journal.name=mysql-bin.000003 canal.instance.master.position=194 #........ #数据的账号和密码 canal.instance.dbUsername=root canal.instance.dbPassword=123456 canal.instance.connectionCharset = UTF-8
show master status
-命令列出了日志位点信息,包括binlog file,binlog position等
-回到主目录启动服务:
sh bin/startup.sh
至此:canal服务端启动成功
二.canal-adapter
注:我在搭建cklickhouse的时候没有默认没有设置账户和密码,所以账号密码不用填
srcDataSources: defaultDS: url: jdbc:mysql://localhost:3306/yfc?useUnicode=true username: root password: Aa123123 canalAdapters: - instance: example # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: logger - name: rdb key: mysql1 properties: jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver jdbc.url: jdbc:clickhouse://localhost:8123/yfc?useUnicode=true jdbc.username: default jdbc.password:
修改同步表文件:
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
database: yfc
table: sku_info
targetTable: yfc.sku_info
targetPk:
id: id
# mapAll: true
targetColumns:
id:
name:
role_id:
c_time:
test1:
etlCondition: "where c_time>={}"
commitBatch: 3000 # 批量提交的大小
## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
# mirrorDb: true
# database: mytest加载顺序:bootstrap.yml (配置如下)> application.yml
三. canal-admin
执行sql脚本(使用可视化客户端需要在sql中初始化脚本canal_manager.sql)
司. 测试
# adapter定义配置部分
canal.conf:
canalServerHost: 127.0.0.1:11111 # 对应单机模式下的canal server的ip:port zookeeperHosts: slave1:2181 # 对应集群模式下的zk地址, 如果配置了canalServerHost, 则以canalServerHost为准 mqServers: slave1:6667 #or rocketmq # kafka或rocketMQ地址, 与canalServerHost不能并存 flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效 batchSize: 50 # 每次获取数据的批大小, 单位为K syncBatchSize: 1000 # 每次同步的批数量 retries: 0 # 重试次数, -1为无限重试 timeout: # 同步超时时间, 单位毫秒 mode: tcp # kafka rocketMQ # canal client的模式: tcp kafka rocketMQ srcDataSources: # 源数据库 defaultDS: # 自定义名称 url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true # jdbc url username: root # jdbc 账号 password: 121212 # jdbc 密码 canalAdapters: # 适配器列表 - instance: example # canal 实例名或者 MQ topic 名 groups: # 分组列表 - groupId: g1 # 分组id, 如果是MQ模式将用到该值 outerAdapters: # 分组内适配器列表 - name: logger # 日志打印适配器 ......
server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp #tcp kafka rocketMQ rabbitMQ #客户端消费模式,对应下面的consumerProperties flatMessage: true #是否以json字符串传递数据,仅对mq生效 #canalServerHost: #canal adapter直连canal server #canalServerHost在1.1.5后不起作用 #tcp mode相关属性在consumerProperties tcp中设置 #kafka、rocketmq、rabbitmq的属性也在对应的 consumerProperties中设置 zookeeperHosts: #canal server集群部署时,创建curator客户端 #tcp mode需要在consumerProperties tcp中设置 batchSize: 50 #每次获取的数据大小,单位为 k syncBatchSize: 1000 #每次同步的批数量 retries: 0 #重试次数,-1为无限次 timeout: accessKey: secretKey: consumerProperties: # canal tcp consumer #canal adapter连接的canal server canal.tcp.server.host: 127.0.0.1:11111 canal.tcp.zookeeper.hosts: canal.tcp.batch.size: 500 canal.tcp.username: canal.tcp.password: # kafka consumer #canal adapter连接的kafka kafka.bootstrap.servers: 127.0.0.1:9092 kafka.enable.auto.commit: false kafka.auto.commit.interval.ms: 1000 kafka.auto.offset.reset: latest kafka.request.timeout.ms: 40000 kafka.session.timeout.ms: 30000 kafka.isolation.level: read_committed kafka.max.poll.records: 1000 # rocketMQ consumer #canal adapter连接的rocketmq rocketmq.namespace: rocketmq.namesrv.addr: 127.0.0.1:9876 rocketmq.batch.size: 1000 rocketmq.enable.message.trace: false rocketmq.customized.trace.topic: rocketmq.access.channel: rocketmq.subscribe.filter: # rabbitMQ consumer #canal adapter连接的rabbitmq rabbitmq.host: rabbitmq.virtual.host: rabbitmq.username: rabbitmq.password: rabbitmq.resource.ownerId: # srcDataSources: #数据源 # defaultDS: # url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true # username: root # password: 121212 canalAdapters: - instance: example # canal instance Name or mq topic name # tcp mode:instance name # mq mode:topic name # 注意:instance name、topic name不支持通配符匹配 groups: - groupId: g1 #一份数据可被多个groupId消费 #不同groupId并发执行, #同一groupId内的adapters顺序执行 outerAdapters: - name: logger #输出到日志 # - name: rdb #输出到rdb(关系型数据库) # key: mysql1 #输出到mysql数据库 # properties: # jdbc.driverClassName: com.mysql.jdbc.Driver # jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true # jdbc.username: root # jdbc.password: 121212 # - name: rdb # key: oracle1 #输出到oracle数据库 # properties: # jdbc.driverClassName: oracle.jdbc.OracleDriver # jdbc.url: jdbc:oracle:thin:@localhost:49161:XE # jdbc.username: mytest # jdbc.password: m121212 # - name: rdb # key: postgres1 #输出到postgress数据库 # properties: # jdbc.driverClassName: org.postgresql.Driver # jdbc.url: jdbc:postgresql://localhost:5432/postgres # jdbc.username: postgres # jdbc.password: 121212 # threads: 1 # commitSize: 3000 # - name: hbase #输出到hbase # properties: # hbase.zookeeper.quorum: 127.0.0.1 # hbase.zookeeper.property.clientPort: 2181 # zookeeper.znode.parent: /hbase # - name: es #输出到es # hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode # properties: # mode: transport # or rest # # security.auth: test:123456 # only used for rest mode # cluster.name: elasticsearch # - name: kudu #输出到kudu # key: kudu # properties: # kudu.master.address: 127.0.0.1 # ',' split multi address
1) 当新增相同MySQL下库表的同步时,调整过滤规则即可,比如如下设置,表示同步test库下的test和test2两张表。
canal.instance.filter.regex=test\\.test,test\\.test2
2) 当新增不同Mysql下库表的同步时,需要增加适配器,即按照上述配置CanalAdapter的方式再配置一套适配器。
3) MaterializeMySQL引擎方式创建ods库,实时同步MySQL ods库的表数据。
4) 全量数据导入
上述仅说明了增量同步的方式,而Canal实现全量同步数据,需要通过手动调用接口实现,命令如下:
curl http://127.0.0.1:10005/etl/rdb/mysql1/test.yml -X POST
#10005表示Adapter配置的application.yml中配置的tomcat的port值
#mysql1表示Adapter配置的application.yml中配置的outerAdapters中的参数key值
#test.yml表示要同步的表对应的yml文件,在Adatper home的conf/rdb目录下
上述全量同步过程中,如果数据量比较大,需要耐心等待一段时间。
# 案例
1 修改启动器配置: application.yml, 这里以oracle目标库为例
canal.conf: canalServerHost: 127.0.0.1:11111 batchSize: 500 syncBatchSize: 1000 retries: 0 timeout: mode: tcp # kafka rocketMQ srcDataSources: defaultDS: url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true username: root password: 121212 canalAdapters: - instance: example # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: rdb # 指定为rdb类型同步 key: oracle1 # 指定adapter的唯一key, 与表映射配置中outerAdapterKey对应 properties: jdbc.driverClassName: oracle.jdbc.OracleDriver # jdbc驱动名, 部分jdbc的jar包需要自行放致lib目录下 jdbc.url: jdbc:oracle:thin:@localhost:49161:XE # jdbc url jdbc.username: mytest # jdbc username jdbc.password: m121212 # jdbc password threads: 5 # 并行执行的线程数, 默认为1
注意点:
其中 outAdapter 的配置: name统一为rdb, key为对应的数据源的唯一标识需和下面的表映射文件中的outerAdapterKey对应, properties为目标库jdb的相关参数
adapter将会自动加载 conf/rdb 下的所有.yml结尾的表映射配置文件
2.RDB表映射文件
-修改 conf/rdb/mytest_user.yml文件:
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值 destination: example # cannal的instance或者MQ的topic groupId: # 对应MQ模式下的groupId, 只会同步对应groupId的数据 outerAdapterKey: oracle1 # adapter key, 对应上面配置outAdapters中的key concurrent: true # 是否按主键hash并行同步, 并行同步的表必须保证主键不会更改及主键不能为其他同步表的外键!! dbMapping: database: mytest # 源数据源的database/shcema table: user # 源数据源表名 targetTable: mytest.tb_user # 目标数据源的库名.表名 targetPk: # 主键映射 id: id # 如果是复合主键可以换行映射多个 # mapAll: true # 是否整表映射, 要求源表和目标表字段名一模一样 (如果targetColumns也配置了映射,则以targetColumns配置为准) targetColumns: # 字段映射, 格式: 目标表字段: 源表字段, 如果字段名一样源表字段名可不填 id: name: role_id: c_time: test1:
导入的类型以目标表的元类型为准, 将自动进行类型转换
3 Mysql 库间镜像schema DDL DML同步
-修改 application.yml:
canalAdapters: - instance: example # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: rdb key: mysql1 properties: jdbc.driverClassName: com.mysql.jdbc.Driver jdbc.url: jdbc:mysql://192.168.0.36/mytest?useUnicode=true jdbc.username: root jdbc.password: 121212
-修改 conf/rdb/mytest_user.yml文件:
dataSourceKey: defaultDS destination: example outerAdapterKey: mysql1 concurrent: true dbMapping: mirrorDb: true database: mytest
其中dbMapping.database的值代表源库和目标库的schema名称,即两库的schema要一模一样
4 RDB启动
-将目标库的jdbc jar包放入lib文件夹, 这里放入ojdbc6.jar (如果是其他数据库则放入对应的驱动)
-启动canal-adapter启动器
bin/startup.sh
-验证 修改mysql mytest.user表的数据, 将会自动同步到Oracle的MYTEST.TB_USER表下面, 并会打出DML的log
---同步---
# canal-server安装配置
vi /usr/local/canal/conf/example/instance.properties
canal.instance.master.address=192.168.107.216:3306 # position info canal.instance.dbUsername=canal # username canal.instance.dbPassword=root # password
-安装目录下执行以下命令,server,instance出现下面日记说明启动成功
bash /usr/local/canal/bin/startup.sh # 启动
bash /usr/local/canal/bin/stop.sh # 关闭
bash /usr/local/canal/bin/restart.sh # 重启
-查看server日记
cat /usr/local/canal/logs/canal/canal.log | tail -n 200
-查看instance日记
cat /usr/local/canal/logs/example/example.log | tail -n 200
# canal-client安装配置
vim /usr/local/canal-adapter/conf/application.yml
#canal-server地址 canalServerHost: 127.0.0.1:11111 #同步数据源配置 srcDataSources defaultDS: #mysql连接信息 url: jdbc:mysql://192.168.107.216:3306/canal?useUnicode=true&characterEncoding=utf8&useSSL=false username: canal password: root canalAdapters: - instance: example # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: logger - name: rdb #rdb类型 key: mysql1 properties: #clickhouse数据看配置 jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver jdbc.url: jdbc:clickhouse://192.168.107.216:8123/canal jdbc.username: default jdbc.password:
vim /usr/local/canal-adapter/conf/rdb/mytest_user.yml
#添加以下内容 ## Mirror schema synchronize config dataSourceKey: defaultDS #application.yml中一致 destination: example #跟application.yml中instance值一样 groupId: g1 #跟application.yml中groupId值一样 outerAdapterKey: mysql1 #跟application.yml中outerAdapters中key值一样 concurrent: true dbMapping: mirrorDb: true database: canal #要同步的mysql库
# 修改rdb文件夹中配置
#1.同步增个库配置,前提条件是mysql数据库名,跟clickhouse数据库名一致 vim /usr/local/canal-adapter/conf/rdb/mytest_user.yml #添加以下内容 ## Mirror schema synchronize config dataSourceKey: defaultDS #application.yml中一致 destination: example #跟application.yml中instance值一样 groupId: g1 #跟application.yml中groupId值一样 outerAdapterKey: mysql1 #跟application.yml中outerAdapters中key值一样 concurrent: true dbMapping: mirrorDb: true database: canal #要同步的mysql库 #2.如果mysql,clickhouse库名不一致,则要同步的表分别在rdb中新增一个配置文件,以t_download表为列 vim /usr/local/canal-adapter/conf/rdb/mytest_user.yml #添加以下内容 dataSourceKey: defaultDS # application.yml中一致 destination: example # 跟application.yml中instance值一样 groupId: g1 # 跟application.yml中groupId值一样 outerAdapterKey: mysql1 # 跟application.yml中outerAdapters中key值一样 concurrent: true dbMapping: database: clickhouse # mysql数据库名 table: t_download # mysql要同步的表 targetTable: maxwell.t_download # clickhouse中对应的表 id: id # 如果是复合主键可以换行映射多个 mapAll: true # 是否整表映射, 要求源表和目标表字段名一模一样 (如果targetColumns也配置了映射,则以targetColumns配置为准) #targetColumns: # 字段映射, 格式: 目标表字段: 源表字段, 如果字段名一样源表字段名可不填 # id: # name: # role_id: #c_time: #test1:
-启动
bash /usr/local/canal-adapter/bin/startup.sh
bash /usr/local/canal-adapter/bin/stop.sh
bash /usr/local/canal-adapter/bin/restart.sh
-查看日志
cat /usr/local/canal-adapter/logs/adapter/adapter.log | tail -n 200
-查看clickhouse日志
cat /var/log/clickhouse-server/clickhouse-server.log | tail -n 200
cat /var/log/clickhouse-server/clickhouse-server.err.log | tail -n 200