mysql2clickhouse synch - clickhouse ETL clickhouse


https://github.com/long2ice/synch


从其他数据库同步到 ClickHouse,当前支持 MySQL 与 postgres,支持全量复制与增量复制

注意: 同步的表必须有主键或非 null 唯一键或复合主键。


---安装

pip install synch

pip3 install synch

---使用

--配置文件

synch 默认从 ./synch.yaml读取配置, 或者可以使用synch -c 指定配置文件。

vim   synch.yaml

core:
  debug: true # when set True, will display sql information.
  insert_num: 1 # how many num to submit,recommend set 20000 when production
  insert_interval: 1 # how many seconds to submit,recommend set 60 when production
  # enable this will auto create database `synch` in ClickHouse and insert monitor data
  monitoring: true
redis:
  host: redis
  port: 6379
  db: 0
  password:
  prefix: synch
  sentinel: false # enable redis sentinel
  sentinel_hosts: # redis sentinel hosts
    - 127.0.0.1:5000
    - 127.0.0.1:5001
    - 127.0.0.1:5002
  sentinel_master: master
  queue_max_len: 200000 # stream max len, will delete redundant ones with FIFO
source_dbs:
  - db_type: mysql
    alias: mysql_db # must be unique
    broker_type: redis # current support redis and kafka
    server_id: 1
    host: mysql
    port: 3306
    user: root
    password: "123456"
    # optional, auto get from `show master status` when empty
    init_binlog_file:
    # optional, auto get from `show master status` when empty
    init_binlog_pos:
    skip_dmls: # dmls to skip
    skip_delete_tables: # tables skip delete, format with schema.table
    skip_update_tables: # tables skip update, format with schema.table
    databases:
      - database: synch_mysql_test
        # optional, default true, auto create database when database in clickhouse not exists
        auto_create: true
        tables:
          - table: test
            # optional, default false, if your table has decimal column with nullable, there is a bug with full data etl will, see https://github.com/ClickHouse/ClickHouse/issues/7690.
            skip_decimal: false # set it true will replace decimal with string type.
            # optional, default true
            auto_full_etl: true # auto do full etl at first when table not exists
            # optional, default ReplacingMergeTree
            clickhouse_engine: ReplacingMergeTree # current support MergeTree, CollapsingMergeTree, VersionedCollapsingMergeTree, ReplacingMergeTree
            # optional
            partition_by: # Table create partitioning by, like toYYYYMM(created_at).
            # optional
            settings: # Table create settings, like index_granularity=8192
            # optional
            sign_column: sign # need when clickhouse_engine=CollapsingMergeTree and VersionedCollapsingMergeTree, no need real in source db, will auto generate in clickhouse
            # optional
            version_column: # need when clickhouse_engine=VersionedCollapsingMergeTree and ReplacingMergeTree(optional), need real in source db, usually is `updated_at` with auto update.
  - db_type: postgres
    alias: postgres_db
    broker_type: kafka # current support redis and kafka
    host: postgres
    port: 5432
    user: postgres
    password: "123456"
    databases:
      - database: postgres
        auto_create: true
        tables:
          - table: test
            auto_full_etl: true
            clickhouse_engine: ReplacingMergeTree
            sign_column: sign
            version_column:
            partition_by:
            settings:
clickhouse:
  # shard hosts when cluster, will insert by random
  hosts:
    - clickhouse:9000
#    - clickhouse:9021
  user: default
  password: ''
  cluster_name:  # enable cluster mode when not empty, and hosts must be more than one if enable.
  distributed_suffix: _all # distributed tables suffix, available in cluster
kafka:
  servers:
    - kafka:9092
  topic_prefix: synch
# enable this to send error report, comment or delete these if not.
mail:
  mailhost: smtp.gmail.com
  fromaddr: long2ice@gmail.com
  toaddrs:
    - long2ice@gmail.com
  user: long2ice@gmail.com
  password: "123456"
  subject: "[synch] Error logging report"

--全量复制

-在增量复制之前一般需要进行一次全量复制,或者使用--renew进行全量重建。

synch --alias mysql_db etl -h
Usage: synch etl [OPTIONS]
  Make etl from source table to ClickHouse.
Options:
  --schema TEXT     Schema to full etl.
  --renew           Etl after try to drop the target tables.
  -t, --table TEXT  Tables to full etl.
  -h, --help        Show this message and exit.

-全量复制表 test.test:

synch --alias mysql_db etl --schema test --tables test

--生产

监听源库并将变动数据写入消息队列。

synch --alias mysql_db produce

--消费

从消息队列中消费数据并插入 ClickHouse,使用 --skip-error跳过错误消息

配置 auto_full_etl = True 的时候会首先尝试做一次全量复制

synch --alias mysql_db consume -h
Usage: synch consume [OPTIONS]
  Consume from broker and insert into ClickHouse.
Options:
  --schema TEXT       Schema to consume.  [required]
  --skip-error        Skip error rows.
  --last-msg-id TEXT  Redis stream last msg id or kafka msg offset, depend on
                      broker_type in config.
  -h, --help          Show this message and exit.

-消费数据库 test 并插入到ClickHouse:

synch --alias mysql_db consume --schema test

--监控

设置core.monitoring为true的时候会自动在ClickHouse创建一个synch数据库用以插入监控数据

表结构:
create table if not exists synch.log
(
    alias String,
    schema String,
    table String,
    num        int,
    type       int, -- 1:生产者, 2:消费者
    created_at DateTime
)
    engine = MergeTree partition by toYYYYMM
(
    created_at
)

--ClickHouse 表引擎

现在 synch 支持 MergeTree、CollapsingMergeTree、VersionedCollapsingMergeTree、ReplacingMergeTree等引擎。

MergeTree,默认引擎,通常情况下的选择。

CollapsingMergeTree ,详情参考CollapsingMergeTree。

VersionedCollapsingMergeTree ,详情参考VersionedCollapsingMergeTree。

ReplacingMergeTree ,详情参考ReplacingMergeTree。


# mysql2ch

cd /data/docker/tools/Dockerfile/mysql2ch

wget https://github.com/long2ice/mysql2ch/archive/v0.0.3.zip 

unzip v0.0.3.zip  && rm -rf v0.0.3.zip && cd mysql2ch-0.0.3

# Linux当前目录所有文件移动到上一级目录

mv * ../

cd ../  && rm -rf mysql2ch-0.0.3

sudo docker build -t mysql2ch .

sudo docker tag mysql2ch:latest hub.htmltoo.com:5000/db:mysql2ch

sudo docker push hub.htmltoo.com:5000/db:mysql2ch


# redis  - 无密码

https://abc.htmltoo.com/thread-45629.htm


# kafka

https://abc.htmltoo.com/thread-44967.htm


#  .env

# if True,will display sql information
DEBUG=True
# sentry need
ENVIRONMENT=development
MYSQL_HOST=mysql
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=wdqdmm@r
MYSQL_SERVER_ID=1
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_DB=0
CLICKHOUSE_HOST=server1
CLICKHOUSE_PORT=9002
CLICKHOUSE_PASSWORD=
CLICKHOUSE_USER=default
SENTRY_DSN=https://3450e192063d47aea7b9733d3d52585f@sentry.test.com/1
KAFKA_SERVER=kafka:9092
KAFKA_TOPIC=mysql2ch
# only these schemas to replication
SCHEMAS=abc
# only these tables to replication
TABLES=bbs
# kafka partitions mapping,which means binlog of test.test will produce to 0 partition.
PARTITIONS=abc.bbs=0;test.test2=1;
# init binlog file and position,should set first,after will read from redis.
INIT_BINLOG_FILE=mysql-bin.000004
INIT_BINLOG_POS=232459
# how many num to submit
INSERT_NUMS=20000
# how many seconds to submit
INSERT_INTERVAL=60


# producer

镜像: hub.htmltoo.com:5000/db:mysql2ch

命令: pypy3 manage.py produce

...


# consumer-abc

镜像: hub.htmltoo.com:5000/db:mysql2ch

命令: pypy3 manage.py consume --schema abc --table bbs


---特性

全量复制与实时增量复制。

支持 DML 同步与 DDL 同步, 支持增加字段、删除字段、更改字段,并且支持所有的 DML。

错误邮件通知。

支持 redis 与 kafka 作为消息队列。

支持多源数据库同时同步到 ClickHouse。

支持 ClickHouse MergeTree、CollapsingMergeTree、VersionedCollapsingMergeTree、ReplacingMergeTree引擎。

支持 ClickHouse 集群。

---依赖

Python >= 3.7

redis,缓存 binlog 和作为消息队列,支持 redis 集群。

kafka,使用 kafka 作为消息队列时需要。

clickhouse-jdbc-bridge, 在 postgres 执行etl命令的时候需要。


签名:这个人很懒,什么也没有留下!
最新回复 (0)
返回