https://github.com/long2ice/synch
Synchronize data from other databases to ClickHouse. MySQL and PostgreSQL are currently supported, with both full and incremental replication.
Note: tables to be synchronized must have a primary key, a non-null unique key, or a composite primary key.
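This requirement can be checked up front before enabling replication. A minimal sketch in Python; the column/key metadata shape used here is hypothetical, not synch's API:

```python
def is_replicatable(columns, unique_keys):
    """Return True if a table qualifies for replication:
    it has a primary key (possibly composite) or a unique key
    whose columns are all NOT NULL.

    columns: dict of column name -> {"nullable": bool, "primary": bool}
    unique_keys: list of column-name tuples for unique constraints
    """
    # A (composite) primary key always qualifies.
    if any(meta["primary"] for meta in columns.values()):
        return True
    # Otherwise look for a unique key with no nullable columns.
    for key in unique_keys:
        if all(not columns[c]["nullable"] for c in key):
            return True
    return False
```

For example, a table whose only unique key is on a nullable column would be rejected by this check.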
---Installation
pip install synch
---Usage
--Configuration file
By default, synch reads its configuration from ./synch.yaml; an alternative config file can be specified with synch -c.
vim synch.yaml
core:
  debug: true # when set to true, SQL statements will be logged
  insert_num: 1 # rows to buffer before each insert; 20000 is recommended in production
  insert_interval: 1 # seconds between inserts; 60 is recommended in production
  # when enabled, a `synch` database is auto-created in ClickHouse to store monitoring data
  monitoring: true
redis:
  host: redis
  port: 6379
  db: 0
  password:
  prefix: synch
  sentinel: false # enable redis sentinel
  sentinel_hosts: # redis sentinel hosts
    - 127.0.0.1:5000
    - 127.0.0.1:5001
    - 127.0.0.1:5002
  sentinel_master: master
  queue_max_len: 200000 # stream max length; older entries are trimmed FIFO
source_dbs:
  - db_type: mysql
    alias: mysql_db # must be unique
    broker_type: redis # currently supports redis and kafka
    server_id: 1
    host: mysql
    port: 3306
    user: root
    password: "123456"
    # optional, fetched automatically from `show master status` when empty
    init_binlog_file:
    # optional, fetched automatically from `show master status` when empty
    init_binlog_pos:
    skip_dmls: # DMLs to skip
    skip_delete_tables: # tables for which deletes are skipped, format: schema.table
    skip_update_tables: # tables for which updates are skipped, format: schema.table
    databases:
      - database: synch_mysql_test
        # optional, default true; auto-create the database when it does not exist in ClickHouse
        auto_create: true
        tables:
          - table: test
            # optional, default false; if the table has a nullable decimal column, full data etl hits a bug, see https://github.com/ClickHouse/ClickHouse/issues/7690
            skip_decimal: false # set to true to replace decimal with the String type
            # optional, default true
            auto_full_etl: true # automatically run a full etl when the table does not exist yet
            # optional, default ReplacingMergeTree
            clickhouse_engine: ReplacingMergeTree # currently supports MergeTree, CollapsingMergeTree, VersionedCollapsingMergeTree, ReplacingMergeTree
            # optional
            partition_by: # table partitioning expression, e.g. toYYYYMM(created_at)
            # optional
            settings: # table creation settings, e.g. index_granularity=8192
            # optional
            sign_column: sign # required when clickhouse_engine is CollapsingMergeTree or VersionedCollapsingMergeTree; need not exist in the source db, auto-generated in ClickHouse
            # optional
            version_column: # required for VersionedCollapsingMergeTree, optional for ReplacingMergeTree; must exist in the source db, usually an auto-updated `updated_at` column
  - db_type: postgres
    alias: postgres_db
    broker_type: kafka # currently supports redis and kafka
    host: postgres
    port: 5432
    user: postgres
    password: "123456"
    databases:
      - database: postgres
        auto_create: true
        tables:
          - table: test
            auto_full_etl: true
            clickhouse_engine: ReplacingMergeTree
            sign_column: sign
            version_column:
            partition_by:
            settings:
clickhouse:
  # shard hosts when running as a cluster; inserts go to a random host
  hosts:
    - clickhouse:9000
    # - clickhouse:9021
  user: default
  password: ''
  cluster_name: # cluster mode is enabled when not empty; hosts must then contain more than one entry
  distributed_suffix: _all # suffix for distributed tables, used in cluster mode
kafka:
  servers:
    - kafka:9092
  topic_prefix: synch
# enable this to send error reports; comment out or delete if not needed
mail:
  mailhost: smtp.gmail.com
  fromaddr: long2ice@gmail.com
  toaddrs:
    - long2ice@gmail.com
  user: long2ice@gmail.com
  password: "123456"
  subject: "[synch] Error logging report"
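insert_num and insert_interval above bound the consumer's batching: rows are buffered and flushed when either the row-count or the time limit is reached. A minimal sketch of that flush rule in Python; the Batcher class is illustrative, not synch's implementation:

```python
class Batcher:
    """Buffer events; flush when insert_num rows accumulate or
    insert_interval seconds have elapsed since the last flush."""

    def __init__(self, insert_num, insert_interval):
        self.insert_num = insert_num
        self.insert_interval = insert_interval
        self.buffer = []
        self.last_flush = 0.0  # clock is passed in explicitly for testability

    def add(self, event, now):
        self.buffer.append(event)
        if len(self.buffer) >= self.insert_num or now - self.last_flush >= self.insert_interval:
            flushed, self.buffer = self.buffer, []
            self.last_flush = now
            return flushed  # caller would insert these rows into ClickHouse
        return None

b = Batcher(insert_num=3, insert_interval=60)
assert b.add("e1", now=1) is None
assert b.add("e2", now=2) is None
print(b.add("e3", now=3))  # → ['e1', 'e2', 'e3'] (row-count limit hit)
```

With the production values (20000 rows / 60 seconds), a quiet table still gets flushed once a minute, while a busy one flushes every 20000 rows.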
--Full replication
A full replication is usually required before starting incremental replication; alternatively, use --renew to drop and rebuild the target tables.
synch --alias mysql_db etl -h

Usage: synch etl [OPTIONS]

  Make etl from source table to ClickHouse.

Options:
  --schema TEXT     Schema to full etl.
  --renew           Etl after try to drop the target tables.
  -t, --table TEXT  Tables to full etl.
  -h, --help        Show this message and exit.
-Full replication of table test.test:
synch --alias mysql_db etl --schema test --table test
--Produce
Listen to the source database and write change data to the message queue.
synch --alias mysql_db produce
--Consume
Consume data from the message queue and insert it into ClickHouse; use --skip-error to skip erroneous messages.
When auto_full_etl = true, a full replication is attempted first.
synch --alias mysql_db consume -h

Usage: synch consume [OPTIONS]

  Consume from broker and insert into ClickHouse.

Options:
  --schema TEXT       Schema to consume.  [required]
  --skip-error        Skip error rows.
  --last-msg-id TEXT  Redis stream last msg id or kafka msg offset, depend on
                      broker_type in config.
  -h, --help          Show this message and exit.
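--last-msg-id lets a consumer resume strictly after a given broker position. The idea can be pictured without a real broker; the Redis-stream-style message ids below are just examples:

```python
def resume_after(messages, last_msg_id=None):
    """Yield messages strictly after last_msg_id.

    messages: ordered list of (msg_id, payload) pairs, in the order a
    broker would deliver them. With last_msg_id=None everything is replayed.
    """
    skipping = last_msg_id is not None
    for msg_id, payload in messages:
        if skipping:
            if msg_id == last_msg_id:
                skipping = False  # checkpoint found; emit what follows
            continue
        yield msg_id, payload

stream = [("1-0", "a"), ("1-1", "b"), ("2-0", "c")]
print(list(resume_after(stream, "1-1")))  # → [('2-0', 'c')]
```

In practice this is what a Redis stream XREAD after a stored last id (or a Kafka seek to a stored offset) does for you on the broker side.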
-Consume database test and insert into ClickHouse:
synch --alias mysql_db consume --schema test
--Monitoring
When core.monitoring is set to true, a synch database is automatically created in ClickHouse to store monitoring data.
Table schema:
create table if not exists synch.log
(
    alias      String,
    schema     String,
    table      String,
    num        int,
    type       int, -- 1: producer, 2: consumer
    created_at DateTime
)
engine = MergeTree partition by toYYYYMM(created_at)
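Since type 1 rows come from the producer and type 2 rows from the consumer, comparing the two row counts per table gives a rough replication backlog. A sketch over rows shaped like the table above (plain Python for illustration, not a ClickHouse query):

```python
from collections import defaultdict

PRODUCER, CONSUMER = 1, 2  # values of the `type` column

def replication_lag(rows):
    """rows: iterable of dicts with keys schema, table, num, type.
    Returns {(schema, table): produced_rows - consumed_rows}."""
    lag = defaultdict(int)
    for r in rows:
        key = (r["schema"], r["table"])
        if r["type"] == PRODUCER:
            lag[key] += r["num"]
        elif r["type"] == CONSUMER:
            lag[key] -= r["num"]
    return dict(lag)

rows = [
    {"schema": "test", "table": "test", "num": 100, "type": PRODUCER},
    {"schema": "test", "table": "test", "num": 80, "type": CONSUMER},
]
print(replication_lag(rows))  # → {('test', 'test'): 20}
```

The same aggregation can of course be expressed as a GROUP BY over synch.log in ClickHouse itself.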
--ClickHouse table engines
synch currently supports the MergeTree, CollapsingMergeTree, VersionedCollapsingMergeTree, and ReplacingMergeTree engines.
MergeTree: the basic engine, suitable for common cases.
CollapsingMergeTree: see the CollapsingMergeTree documentation for details.
VersionedCollapsingMergeTree: see the VersionedCollapsingMergeTree documentation for details.
ReplacingMergeTree: see the ReplacingMergeTree documentation for details.
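ReplacingMergeTree deduplicates rows that share a sorting key during background merges, keeping the row with the highest version_column value, or the last inserted row when no version column is set. That merge-time behaviour can be simulated like this (illustrative Python, not ClickHouse code):

```python
def replacing_merge(rows, key, version=None):
    """Keep one row per sorting key, as a ReplacingMergeTree merge would.

    rows: list of dicts in insertion order; key: the sorting-key column;
    version: optional version column, highest value wins; without it,
    the last-inserted row wins.
    """
    kept = {}
    for row in rows:
        k = row[key]
        if k not in kept:
            kept[k] = row
        elif version is None or row[version] >= kept[k][version]:
            kept[k] = row
    return list(kept.values())

rows = [
    {"id": 1, "name": "old", "updated_at": 1},
    {"id": 1, "name": "new", "updated_at": 2},
    {"id": 2, "name": "only", "updated_at": 1},
]
print(replacing_merge(rows, "id", "updated_at"))
# → [{'id': 1, 'name': 'new', 'updated_at': 2}, {'id': 2, 'name': 'only', 'updated_at': 1}]
```

This is why the config pairs ReplacingMergeTree with a version_column such as an auto-updated updated_at: without it, deduplication depends only on insertion order.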
# mysql2ch
cd /data/docker/tools/Dockerfile/mysql2ch
wget https://github.com/long2ice/mysql2ch/archive/v0.0.3.zip
unzip v0.0.3.zip && rm -rf v0.0.3.zip && cd mysql2ch-0.0.3
# move all files in the current directory up one level (Linux)
mv * ../
cd ../ && rm -rf mysql2ch-0.0.3
sudo docker build -t mysql2ch .
sudo docker tag mysql2ch:latest hub.htmltoo.com:5000/db:mysql2ch
sudo docker push hub.htmltoo.com:5000/db:mysql2ch
# redis - no password
https://abc.htmltoo.com/thread-45629.htm
# kafka
https://abc.htmltoo.com/thread-44967.htm
# .env
# if True, SQL statements will be logged
DEBUG=True
# required by sentry
ENVIRONMENT=development
MYSQL_HOST=mysql
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=wdqdmm@r
MYSQL_SERVER_ID=1
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_DB=0
CLICKHOUSE_HOST=server1
CLICKHOUSE_PORT=9002
CLICKHOUSE_PASSWORD=
CLICKHOUSE_USER=default
SENTRY_DSN=https://3450e192063d47aea7b9733d3d52585f@sentry.test.com/1
KAFKA_SERVER=kafka:9092
KAFKA_TOPIC=mysql2ch
# replicate only these schemas
SCHEMAS=abc
# replicate only these tables
TABLES=bbs
# kafka partitions mapping; e.g. abc.bbs=0 means the binlog of abc.bbs is produced to partition 0
PARTITIONS=abc.bbs=0;test.test2=1;
# initial binlog file and position; must be set on the first run, afterwards read from redis
INIT_BINLOG_FILE=mysql-bin.000004
INIT_BINLOG_POS=232459
# how many rows to batch per submit
INSERT_NUMS=20000
# how many seconds between submits
INSERT_INTERVAL=60
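PARTITIONS is a semicolon-separated list of schema.table=partition entries. It can be parsed like this (a sketch of the format above, not mysql2ch's own parser):

```python
def parse_partitions(value):
    """Parse 'abc.bbs=0;test.test2=1;' into {'abc.bbs': 0, 'test.test2': 1}."""
    mapping = {}
    for entry in value.split(";"):
        entry = entry.strip()
        if not entry:
            continue  # tolerate the trailing semicolon
        table, _, partition = entry.partition("=")
        mapping[table] = int(partition)
    return mapping

print(parse_partitions("abc.bbs=0;test.test2=1;"))  # → {'abc.bbs': 0, 'test.test2': 1}
```

Pinning a table to a fixed partition keeps its binlog events ordered within that partition, which is what makes per-table consumption safe.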
# producer
Image: hub.htmltoo.com:5000/db:mysql2ch
Command: pypy3 manage.py produce
...
# consumer-abc
Image: hub.htmltoo.com:5000/db:mysql2ch
Command: pypy3 manage.py consume --schema abc --table bbs
---Features
Full replication and real-time incremental replication.
DML and DDL synchronization: adding, dropping, and changing columns are supported, as are all DML statements.
Error reports by email.
redis and kafka supported as message queues.
Multiple source databases can be synchronized to ClickHouse simultaneously.
Supports the ClickHouse MergeTree, CollapsingMergeTree, VersionedCollapsingMergeTree, and ReplacingMergeTree engines.
ClickHouse cluster support.
---Dependencies
Python >= 3.7
redis: caches binlog state and serves as a message queue; redis cluster is supported.
kafka: required when kafka is used as the message queue.
clickhouse-jdbc-bridge: required when running the etl command against postgres.