canal实现mysql同步到clickhouse ETL mysql clickhouse


https://hub.docker.com/r/canal/canal-server

https://github.com/alibaba/canal/releases


阿里巴巴 MySQL binlog 增量订阅&消费组件

---canal工作原理

伪装成slaver,从master复制数据

---mysql开启binlog日志

vim   /etc/mysql/my.cnf

server-id=1
# 指定binlog日志存储位置
log-bin=/var/lib/mysql/mysql-bin
# 开启GTID模式
gtid-mode=ON
# 设置主从强一致性
enforce-gtid-consistency=1
# 记录日志, 所以从库做为其他从库的主库时需要在配置文件中添加log-slave-updates参数, 同步异常慢
log-slave-updates=1
binlog_format=ROW
#binlog-do-db 具体要同步的数据库
binlog-do-db = yfc

---查看binlog日志文件列表:

show binary logs;


---安装部署canal

一. canal-devloper

1.canal.properties (系统根配置文件)

-指定目的文件

canal.desinations = example

2.instance.properties(instance.properties的优先级高于canal.properties)

-实际需要修改的配置文件:

cd  example

vim  instance.properties

-需要修改的几处配置

#源数据库地址
canal.instance.master.address=localhotse:3306
#查询元数据库中的配置
canal.instance.master.journal.name=mysql-bin.000003
canal.instance.master.position=194
 
 
#........
#数据的账号和密码
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8

show master status

-命令列出了日志位点信息,包括binlog file,binlog position等

-回到主目录启动服务:

sh bin/startup.sh

至此:canal服务端启动成功


二.canal-adapter

注:我在搭建cklickhouse的时候没有默认没有设置账户和密码,所以账号密码不用填

srcDataSources:
    defaultDS:
      url: jdbc:mysql://localhost:3306/yfc?useUnicode=true
      username: root
      password: Aa123123
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver
          jdbc.url: jdbc:clickhouse://localhost:8123/yfc?useUnicode=true
          jdbc.username: default
          jdbc.password:

修改同步表文件:

dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: yfc
  table: sku_info
  targetTable: yfc.sku_info
  targetPk:
    id: id
#  mapAll: true
  targetColumns:
    id:
    name:
    role_id:
    c_time:
    test1:
  etlCondition: "where c_time>={}"
  commitBatch: 3000 # 批量提交的大小
 
 
## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
#  mirrorDb: true
#  database: mytest


加载顺序:bootstrap.yml (配置如下)> application.yml  


三. canal-admin

执行sql脚本(使用可视化客户端需要在sql中初始化脚本canal_manager.sql)


司. 测试


# adapter定义配置部分

canal.conf:

  canalServerHost: 127.0.0.1:11111          # 对应单机模式下的canal server的ip:port
  zookeeperHosts: slave1:2181               # 对应集群模式下的zk地址, 如果配置了canalServerHost, 则以canalServerHost为准
  mqServers: slave1:6667 #or rocketmq       # kafka或rocketMQ地址, 与canalServerHost不能并存
  flatMessage: true                         # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
  batchSize: 50                             # 每次获取数据的批大小, 单位为K
  syncBatchSize: 1000                       # 每次同步的批数量
  retries: 0                                # 重试次数, -1为无限重试
  timeout:                                  # 同步超时时间, 单位毫秒
  mode: tcp # kafka rocketMQ                # canal client的模式: tcp kafka rocketMQ
  srcDataSources:                           # 源数据库
    defaultDS:                              # 自定义名称
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true   # jdbc url 
      username: root                                            # jdbc 账号
      password: 121212                                          # jdbc 密码
  canalAdapters:                            # 适配器列表
  - instance: example                       # canal 实例名或者 MQ topic 名
    groups:                                 # 分组列表
    - groupId: g1                           # 分组id, 如果是MQ模式将用到该值
      outerAdapters:                        # 分组内适配器列表
      - name: logger                        # 日志打印适配器
......
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
 
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ      #客户端消费模式,对应下面的consumerProperties
  flatMessage: true                           #是否以json字符串传递数据,仅对mq生效
  #canalServerHost:               #canal adapter直连canal server
                                  #canalServerHost在1.1.5后不起作用
                                  #tcp mode相关属性在consumerProperties tcp中设置
                                  #kafka、rocketmq、rabbitmq的属性也在对应的 consumerProperties中设置
 
  zookeeperHosts:                 #canal server集群部署时,创建curator客户端
                                  #tcp mode需要在consumerProperties tcp中设置
  batchSize: 50                               #每次获取的数据大小,单位为 k
  syncBatchSize: 1000                         #每次同步的批数量
  retries: 0                                  #重试次数,-1为无限次
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer                      #canal adapter连接的canal server
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer                          #canal adapter连接的kafka
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer                       #canal adapter连接的rocketmq
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer                       #canal adapter连接的rabbitmq
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
 
#  srcDataSources:                 #数据源
#    defaultDS:
#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
#      username: root
#      password: 121212
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
                      # tcp mode:instance name
                      # mq  mode:topic name
                      # 注意:instance name、topic name不支持通配符匹配
    groups:
    - groupId: g1                  #一份数据可被多个groupId消费
                                   #不同groupId并发执行,
                                   #同一groupId内的adapters顺序执行
      outerAdapters:
      - name: logger               #输出到日志
#      - name: rdb                 #输出到rdb(关系型数据库)
#        key: mysql1                     #输出到mysql数据库
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1                    #输出到oracle数据库
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1                  #输出到postgress数据库
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase               #输出到hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
#      - name: es                  #输出到es
#        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
#        properties:
#          mode: transport # or rest
#          # security.auth: test:123456 #  only used for rest mode
#          cluster.name: elasticsearch
#        - name: kudu              #输出到kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address


1) 当新增相同MySQL下库表的同步时,调整过滤规则即可,比如如下设置,表示同步test库下的test和test2两张表。

canal.instance.filter.regex=test\\.test,test\\.test2

2) 当新增不同Mysql下库表的同步时,需要增加适配器,即按照上述配置CanalAdapter的方式再配置一套适配器。

3) MaterializeMySQL引擎方式创建ods库,实时同步MySQL ods库的表数据。

4) 全量数据导入

上述仅说明了增量同步的方式,而Canal实现全量同步数据,需要通过手动调用接口实现,命令如下:

curl http://127.0.0.1:10005/etl/rdb/mysql1/test.yml -X POST

#10005表示Adapter配置的application.yml中配置的tomcat的port值

#mysql1表示Adapter配置的application.yml中配置的outerAdapters中的参数key值

#test.yml表示要同步的表对应的yml文件,在Adatper home的conf/rdb目录下

上述全量同步过程中,如果数据量比较大,需要耐心等待一段时间。



# 案例

1 修改启动器配置: application.yml, 这里以oracle目标库为例

canal.conf:
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  mode: tcp # kafka rocketMQ
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
      username: root
      password: 121212
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: rdb                                               # 指定为rdb类型同步
        key: oracle1                                            # 指定adapter的唯一key, 与表映射配置中outerAdapterKey对应
        properties:
          jdbc.driverClassName: oracle.jdbc.OracleDriver        # jdbc驱动名, 部分jdbc的jar包需要自行放致lib目录下
          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE        # jdbc url
          jdbc.username: mytest                                 # jdbc username
          jdbc.password: m121212                                # jdbc password
          threads: 5                                            # 并行执行的线程数, 默认为1

注意点:

    其中 outAdapter 的配置: name统一为rdb, key为对应的数据源的唯一标识需和下面的表映射文件中的outerAdapterKey对应, properties为目标库jdb的相关参数

    adapter将会自动加载 conf/rdb 下的所有.yml结尾的表映射配置文件


2.RDB表映射文件

-修改 conf/rdb/mytest_user.yml文件:

dataSourceKey: defaultDS        # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example            # cannal的instance或者MQ的topic
groupId:                        # 对应MQ模式下的groupId, 只会同步对应groupId的数据
outerAdapterKey: oracle1        # adapter key, 对应上面配置outAdapters中的key
concurrent: true                # 是否按主键hash并行同步, 并行同步的表必须保证主键不会更改及主键不能为其他同步表的外键!!
dbMapping:
  database: mytest              # 源数据源的database/shcema
  table: user                   # 源数据源表名
  targetTable: mytest.tb_user   # 目标数据源的库名.表名
  targetPk:                     # 主键映射
    id: id                      # 如果是复合主键可以换行映射多个
#  mapAll: true                 # 是否整表映射, 要求源表和目标表字段名一模一样 (如果targetColumns也配置了映射,则以targetColumns配置为准)
  targetColumns:                # 字段映射, 格式: 目标表字段: 源表字段, 如果字段名一样源表字段名可不填
    id:
    name:
    role_id:
    c_time:
    test1:

导入的类型以目标表的元类型为准, 将自动进行类型转换


3 Mysql 库间镜像schema DDL DML同步

-修改 application.yml:

canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
    outerAdapters:
    - name: rdb
    key: mysql1
    properties:
        jdbc.driverClassName: com.mysql.jdbc.Driver
        jdbc.url: jdbc:mysql://192.168.0.36/mytest?useUnicode=true
        jdbc.username: root
        jdbc.password: 121212

-修改 conf/rdb/mytest_user.yml文件:

dataSourceKey: defaultDS
destination: example
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  mirrorDb: true
  database: mytest

其中dbMapping.database的值代表源库和目标库的schema名称,即两库的schema要一模一样


4 RDB启动

-将目标库的jdbc jar包放入lib文件夹, 这里放入ojdbc6.jar (如果是其他数据库则放入对应的驱动)

-启动canal-adapter启动器

bin/startup.sh

-验证 修改mysql mytest.user表的数据, 将会自动同步到Oracle的MYTEST.TB_USER表下面, 并会打出DML的log


---同步---

# canal-server安装配置

vi /usr/local/canal/conf/example/instance.properties

canal.instance.master.address=192.168.107.216:3306 # position info
canal.instance.dbUsername=canal # username
canal.instance.dbPassword=root # password

-安装目录下执行以下命令,server,instance出现下面日记说明启动成功

bash /usr/local/canal/bin/startup.sh # 启动

bash /usr/local/canal/bin/stop.sh # 关闭

bash /usr/local/canal/bin/restart.sh # 重启

-查看server日记

cat /usr/local/canal/logs/canal/canal.log | tail -n 200

-查看instance日记

cat /usr/local/canal/logs/example/example.log | tail -n 200


# canal-client安装配置

vim /usr/local/canal-adapter/conf/application.yml

#canal-server地址
canalServerHost: 127.0.0.1:11111
#同步数据源配置
srcDataSources
defaultDS:
#mysql连接信息
      url: jdbc:mysql://192.168.107.216:3306/canal?useUnicode=true&characterEncoding=utf8&useSSL=false
      username: canal
      password: root
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: rdb   #rdb类型
        key: mysql1
        properties:
        #clickhouse数据看配置
          jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver
          jdbc.url: jdbc:clickhouse://192.168.107.216:8123/canal
          jdbc.username: default
          jdbc.password:

vim /usr/local/canal-adapter/conf/rdb/mytest_user.yml

#添加以下内容
## Mirror schema synchronize config
dataSourceKey: defaultDS #application.yml中一致
destination: example  #跟application.yml中instance值一样
groupId: g1 #跟application.yml中groupId值一样
outerAdapterKey: mysql1 #跟application.yml中outerAdapters中key值一样
concurrent: true
dbMapping:
  mirrorDb: true
  database: canal #要同步的mysql库

 

# 修改rdb文件夹中配置

#1.同步增个库配置,前提条件是mysql数据库名,跟clickhouse数据库名一致
vim /usr/local/canal-adapter/conf/rdb/mytest_user.yml
#添加以下内容
## Mirror schema synchronize config
dataSourceKey: defaultDS #application.yml中一致
destination: example  #跟application.yml中instance值一样
groupId: g1 #跟application.yml中groupId值一样
outerAdapterKey: mysql1 #跟application.yml中outerAdapters中key值一样
concurrent: true
dbMapping:
  mirrorDb: true
  database: canal #要同步的mysql库
 
#2.如果mysql,clickhouse库名不一致,则要同步的表分别在rdb中新增一个配置文件,以t_download表为列
vim  /usr/local/canal-adapter/conf/rdb/mytest_user.yml
#添加以下内容
dataSourceKey: defaultDS   # application.yml中一致
destination: example    # 跟application.yml中instance值一样
groupId: g1             # 跟application.yml中groupId值一样
outerAdapterKey: mysql1  # 跟application.yml中outerAdapters中key值一样
concurrent: true
dbMapping:
  database: clickhouse  # mysql数据库名
  table: t_download   # mysql要同步的表
  targetTable: maxwell.t_download    # clickhouse中对应的表
    id: id                      # 如果是复合主键可以换行映射多个
  mapAll: true                 # 是否整表映射, 要求源表和目标表字段名一模一样 (如果targetColumns也配置了映射,则以targetColumns配置为准)
  #targetColumns:                # 字段映射, 格式: 目标表字段: 源表字段, 如果字段名一样源表字段名可不填
  #  id:
   # name:
   # role_id:
   #c_time:
   #test1:

-启动

bash /usr/local/canal-adapter/bin/startup.sh

bash /usr/local/canal-adapter/bin/stop.sh

bash /usr/local/canal-adapter/bin/restart.sh

-查看日志

cat /usr/local/canal-adapter/logs/adapter/adapter.log | tail -n 200

-查看clickhouse日志

cat /var/log/clickhouse-server/clickhouse-server.log | tail -n 200

cat /var/log/clickhouse-server/clickhouse-server.err.log | tail -n 200


签名:这个人很懒,什么也没有留下!
最新回复 (0)
返回