canal-adapter使用,MySQL到kafka到MySQL ETL


本文以MySQL-->canalServer-->kafka-->canalAdaper-->MySQL为例介绍canal-adapter的使用


vim canal-adapter/conf/application.yml

server:
  port: 8081
logging:
  level:
    com.alibaba.otter.canal.client.adapter.rdb: INFO  #日志级别默认是DEBUG
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
 
canal.conf:
  mode: kafka # kafka rocketMQ tcp
  canalServerHost: canal1:11111
  zookeeperHosts: zk1:2181
  mqServers: kafka1:9092,kafka2:9092,kafka3:9092 #or rocketmq
  flatMessage: false   #开启了数据压缩格式,protobuf
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://mysql1:3306/testOrg?useUnicode=true
      username: rootOrg
      password: rootOrg
  canalAdapters:
  - instance: topic1 # canal 实例的名字后者是kafka的topic名字
    groups:
    - groupId: g1
      outerAdapters:
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://mysql2:3306/test?useUnicode=true
          jdbc.username: root
          jdbc.password: root2020

vim canal-adapter/conf/rdb/mytest_user.yml

dataSourceKey: defaultDS      #和application里面的srcDataSources参数一致
destination: topic1           #canal的实例名字或者是kafka的topic名字
groupId: g1                   #和application里面的一致
outerAdapterKey: mysql1       #和application里面的一致
concurrent: true
dbMapping:
  database: testOrg               #源MySQL的数据库
  table: orders               #源MySQL的org的表orders
  targetTable: test.orders    #目标数据库和表
  targetPk:
    id: id                    #MySQL的主键
  mapAll: true                #全量映射
#  targetColumns:             #部分映射
#    id:
#    name:
#    role_id:
#    c_time:
#    test1:
#  etlCondition: "where c_time>={}" # 简单的etl处理
  commitBatch: 100


签名:这个人很懒,什么也没有留下!
最新回复 (0)
返回