Real-time MySQL sync requires the binlog to be enabled; see the Debezium MySQL setup guide:
https://debezium.io/documentation/reference/1.6/connectors/mysql.html#setting-up-mysql
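Before wiring up SeaTunnel, it helps to confirm the binlog settings the Debezium guide asks for. The sketch below is a minimal, hypothetical helper (binlog_problems is not part of any library). It takes the rows of a `SHOW VARIABLES` query as a plain dict and reports anything that differs from log_bin = ON, binlog_format = ROW and binlog_row_image = FULL. Fetching the variables from a live server is left out on purpose.

```python
# Minimal sketch: flag MySQL binlog settings that a Debezium-based CDC source needs.
# Input: variable name -> value, e.g. collected from
#   SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image');
# binlog_problems is a hypothetical helper; the expected values follow the Debezium guide.
from __future__ import annotations

REQUIRED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def binlog_problems(variables: dict[str, str]) -> list[str]:
    """Return human-readable problems; an empty list means the settings look OK."""
    problems = []
    for name, expected in REQUIRED.items():
        actual = variables.get(name)
        if actual is None:
            problems.append(f"{name} is missing")
        elif actual.upper() != expected:  # MySQL may report values in lower case
            problems.append(f"{name} = {actual}, expected {expected}")
    return problems


if __name__ == "__main__":
    # Values as they might come back from a server with the binlog disabled.
    sample = {"log_bin": "OFF", "binlog_format": "ROW", "binlog_row_image": "FULL"}
    for problem in binlog_problems(sample):
        print(problem)
```

An empty result means the three variables match. Server-side changes still go into my.cnf and need a MySQL restart.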
---Install the connector plugins
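Run bash bin/install-plugin.sh. In mainland China, configure a Maven mirror first; otherwise the download tends to be slow or to fail.
The official documentation uses sh bin/install-plugin.sh instead.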
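A mirror is usually configured in Maven's user settings file, ~/.m2/settings.xml. This assumes the install script downloads connectors through Maven (mvnw), which reads that file. The sketch below only builds the XML for a single `<mirror>` entry. The mirror id "aliyun" and the Aliyun public repository URL are example values, not something the SeaTunnel docs prescribe, and build_settings_xml is a hypothetical helper.

```python
# Minimal sketch: build a Maven settings.xml that routes Maven Central through a mirror.
# Assumptions: the mirror id/URL below are examples (Aliyun's public repository);
# mirrorOf = "central" redirects only requests for Maven Central.
import xml.etree.ElementTree as ET

MIRROR_URL = "https://maven.aliyun.com/repository/public"  # example mirror, replace as needed


def build_settings_xml(mirror_id: str, url: str, mirror_of: str = "central") -> str:
    settings = ET.Element("settings")
    mirrors = ET.SubElement(settings, "mirrors")
    mirror = ET.SubElement(mirrors, "mirror")
    for tag, text in (("id", mirror_id), ("mirrorOf", mirror_of), ("url", url)):
        ET.SubElement(mirror, tag).text = text
    return ET.tostring(settings, encoding="unicode")


if __name__ == "__main__":
    # Print the XML; save it as ~/.m2/settings.xml, or merge the <mirror> entry
    # into an existing settings file by hand.
    print(build_settings_xml("aliyun", MIRROR_URL))
```

The script prints instead of writing ~/.m2/settings.xml itself so that it never overwrites an existing settings file.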
---Write the configuration file
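Under the config directory, create a new config file, e.g. mysql-es-test.conf.
Because this is real-time sync, set job.mode = "STREAMING" (the bundled template uses "BATCH"); execution.parallelism sets the parallelism.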
For reference, the bundled batch template looks like this:
vim config/v2.batch.config.template
env {
execution.parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
}
sink {
Console {}
}
Then create our own file and start with the env block:
vim config/mysql-es-test.conf
env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 2000
}
---Add the source configuration
https://seatunnel.apache.org/docs/2.3.3/connector-v2/source/MySQL-CDC#options
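result_table_name: give the source a temporary table name so later stages can refer to it.
table-names must be in database.table form, and base-url must include the database.
startup.mode defaults to INITIAL: existing (historical) data is synced first, then incremental changes.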
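A wrong table-names or base-url value only shows up once the job starts, so a quick pre-flight check can save a round trip. The following is a hypothetical helper (cdc_source_problems is not part of SeaTunnel) that encodes just the two rules above. Its URL parsing is simplified and assumes the usual jdbc:mysql://host:port/db form.

```python
# Minimal sketch: check the two MySQL-CDC option rules before starting a job.
#   - every table-names entry must look like "database.table"
#   - base-url must name a database, e.g. jdbc:mysql://host:3306/db
# cdc_source_problems is a hypothetical helper, not part of SeaTunnel.
from __future__ import annotations

from urllib.parse import urlparse


def cdc_source_problems(table_names: list[str], base_url: str) -> list[str]:
    problems = []
    for name in table_names:
        db, sep, table = name.partition(".")
        if not (sep and db and table):
            problems.append(f"table-names entry {name!r} is not in database.table form")
    # Drop the "jdbc:" prefix so urlparse can read host, port and path.
    url = base_url[len("jdbc:"):] if base_url.startswith("jdbc:") else base_url
    if not urlparse(url).path.strip("/"):
        problems.append(f"base-url {base_url!r} does not specify a database")
    return problems


if __name__ == "__main__":
    print(cdc_source_problems(["db.t1"], "jdbc:mysql://host:3306/db"))  # no problems
    print(cdc_source_problems(["t1"], "jdbc:mysql://host:3306"))        # two problems
```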
source {
MySQL-CDC {
result_table_name = "t1"
server-id = 5656
username = "root"
password = "pwd"
table-names = ["db.t1"]
base-url = "jdbc:mysql://host:3306/db"
}
}
---Add the transform configuration (the SQL transform is quite flexible)
https://seatunnel.apache.org/docs/2.3.3/transform-v2/sql#options
For the list of supported functions, see:
https://seatunnel.apache.org/docs/2.3.3/transform-v2/sql-functions
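To make the effect of the transform below concrete, here is a plain-Python sketch (not SeaTunnel code; apply_query is a hypothetical name). It shows what SELECT id, alias_name aliasName FROM t1 WHERE c1 = '1' does to each row: it filters on c1, keeps id, and renames alias_name to aliasName.

```python
# Minimal sketch (plain Python, not SeaTunnel code) of the row-level effect of
#   SELECT id, alias_name aliasName FROM t1 WHERE c1 = '1'
from __future__ import annotations

from typing import Any, Iterable, Iterator


def apply_query(rows: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
    for row in rows:
        if row.get("c1") == "1":  # WHERE c1 = '1' (string comparison, as in the SQL)
            # Projection plus alias: only id and aliasName survive; c1 is dropped.
            yield {"id": row["id"], "aliasName": row["alias_name"]}


if __name__ == "__main__":
    rows = [
        {"id": 1, "alias_name": "a", "c1": "1"},
        {"id": 2, "alias_name": "b", "c1": "0"},
    ]
    print(list(apply_query(rows)))
```

Columns that are not selected, such as c1, never reach the sink. So any field used in the sink's primary_keys (id here) has to stay in the SELECT list.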
transform {
Sql {
source_table_name = "t1"
query = "SELECT id, alias_name aliasName FROM t1 WHERE c1 = '1'"
}
}
---Add the sink configuration
https://seatunnel.apache.org/docs/2.3.3/connector-v2/sink/Elasticsearch#options
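For CDC real-time sync to Elasticsearch, primary_keys is required: the key fields are used to build the document _id, so updates and deletes land on the same document.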
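The role of primary_keys can be illustrated with a toy model in plain Python (not the connector's code). Each change event maps to a document id built from the key fields. An insert or update then overwrites that document, and a delete removes it. The operation labels are simplified, doc_id and apply_events are hypothetical helpers, and joining composite keys with "_" is an assumption here; check the sink's options page for the actual delimiter setting.

```python
# Toy model (plain Python, not the Elasticsearch sink's code) of why CDC needs primary keys.
# Assumptions: simplified op labels INSERT/UPDATE/DELETE; composite keys joined with "_".
from __future__ import annotations

from typing import Any, Iterable


def doc_id(row: dict[str, Any], primary_keys: list[str], delimiter: str = "_") -> str:
    # Same key values -> same document id, so later events hit the same document.
    return delimiter.join(str(row[k]) for k in primary_keys)


def apply_events(
    index: dict[str, dict[str, Any]],
    events: Iterable[tuple[str, dict[str, Any]]],
    primary_keys: list[str],
) -> dict[str, dict[str, Any]]:
    for op, row in events:
        key = doc_id(row, primary_keys)
        if op in ("INSERT", "UPDATE"):
            index[key] = row  # upsert: overwrite the document with this id
        elif op == "DELETE":
            index.pop(key, None)  # delete by id; no-op if it is already gone
    return index


if __name__ == "__main__":
    events = [
        ("INSERT", {"id": 1, "aliasName": "a"}),
        ("UPDATE", {"id": 1, "aliasName": "b"}),
        ("INSERT", {"id": 2, "aliasName": "c"}),
        ("DELETE", {"id": 2, "aliasName": "c"}),
    ]
    print(apply_events({}, events, ["id"]))
```

Without a key, each event would have to become a new document with a generated id, and updates and deletes could not find the original document.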
sink {
Elasticsearch {
hosts = ["host:9200"]
username = "elastic"
password = "pwd"
index = "index_t1"
# cdc required options
primary_keys = ["id"]
}
}
---Final configuration
vim config/mysql-es-test.conf
env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 2000
}
source {
MySQL-CDC {
result_table_name = "t1"
server-id = 5656
username = "root"
password = "pwd"
table-names = ["db.t1"]
base-url = "jdbc:mysql://host:3306/db"
}
}
transform {
Sql {
source_table_name = "t1"
query = "SELECT id, alias_name aliasName FROM t1 WHERE c1 = '1'"
}
}
sink {
Elasticsearch {
hosts = ["host:9200"]
username = "elastic"
password = "pwd"
index = "index_t1"
# cdc required options
primary_keys = ["id"]
}
}
---Start the job
Local mode is used here as an example; cluster, Spark, and Flink modes are also available.
./bin/seatunnel.sh -e local --config ./config/mysql-es-test.conf