本文以MySQL-->canalServer-->kafka-->canalAdaper-->MySQL为例介绍canal-adapter的使用
vim canal-adapter/conf/application.yml
# canal-adapter launcher configuration (conf/application.yml).
# Nesting follows the layout documented for the canal ClientAdapter.
server:
  port: 8081

logging:
  level:
    # Default log level for this package is DEBUG.
    com.alibaba.otter.canal.client.adapter.rdb: INFO

spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: kafka  # one of: kafka, rocketMQ, tcp
  canalServerHost: canal1:11111
  zookeeperHosts: zk1:2181
  mqServers: kafka1:9092,kafka2:9092,kafka3:9092  # Kafka brokers (or RocketMQ servers)
  flatMessage: false  # false: messages are protobuf-encoded rather than flat JSON
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  # The three values below are intentionally left empty, as in the original.
  timeout:
  accessKey:
  secretKey:
  # Source-side data sources; rdb mapping files refer to these by key
  # through their dataSourceKey setting.
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://mysql1:3306/testOrg?useUnicode=true
      username: rootOrg
      password: rootOrg
  canalAdapters:
    - instance: topic1  # canal instance name, or the Kafka topic name
      groups:
        - groupId: g1
          outerAdapters:
            - name: rdb
              key: mysql1
              # JDBC connection settings for the target database.
              properties:
                jdbc.driverClassName: com.mysql.jdbc.Driver
                jdbc.url: jdbc:mysql://mysql2:3306/test?useUnicode=true
                jdbc.username: root
                jdbc.password: root2020
vim canal-adapter/conf/rdb/mytest_user.yml
# RDB table-mapping configuration (conf/rdb/mytest_user.yml).
dataSourceKey: defaultDS  # must match a key under srcDataSources in application.yml
destination: topic1  # canal instance name, or the Kafka topic name
groupId: g1  # must match the groupId in application.yml
outerAdapterKey: mysql1  # must match the outer adapter key in application.yml
concurrent: true
dbMapping:
  database: testOrg  # source MySQL database
  table: orders  # source table (orders in testOrg)
  targetTable: test.orders  # target database and table
  targetPk:
    id: id  # MySQL primary key
  mapAll: true  # map all columns
  # Partial column mapping example:
  # targetColumns:
  #   id:
  #   name:
  #   role_id:
  #   c_time:
  #   test1:
  # Simple ETL condition example:
  # etlCondition: "where c_time>={}"
  commitBatch: 100
签名:这个人很懒,什么也没有留下!