将 MySQL 的数据实时(秒级)同步至 StarRocks,支撑企业实时分析和处理海量数据
实时同步 MySQL 至 StarRocks 分成同步库表结构、同步数据两个阶段进行。首先 StarRocks Migration Tool (数据迁移工具,以下简称 SMT) 将 MySQL 的库表结构转化成 StarRocks 的建库和建表语句。然后 Flink 集群运行 Flink job,同步 MySQL 全量及增量数据至 StarRocks。
1. 同步库表结构
SMT 根据其配置文件中源 MySQL 和目标 StarRocks 的信息,读取 MySQL 中待同步的库表结构,并生成 SQL 文件,用于在 StarRocks 内创建对应的目标库表。
2. 同步数据
Flink SQL 客户端执行导入数据的 SQL 语句(INSERT INTO SELECT语句),向 Flink 集群提交一个或者多个长时间运行的 Flink job。Flink集群运行 Flink job ,Flink cdc connector 先读取数据库的历史全量数据,然后无缝切换到增量读取,并且发给 flink-starrocks-connector,最后 flink-starrocks-connector 攒微批数据同步至 StarRocks。
注意: 仅支持同步 DML,不支持同步 DDL。
DML(Data Manipulation Language)语句:数据操纵语句,用于添加、删除、更新和查询数据库记录,并检查数据完整性,常用的语句关键字主要包括 insert、delete、update 和select 等。(增添改查)
DDL(Data Definition Languages)语句:数据定义语言,这些语句定义了不同的数据段、数据库、表、列、索引等数据库对象的定义。常用的语句关键字主要包括 create、drop、alter等。
3. 业务场景
以商品累计销量实时榜单为例,存储在 MySQL 中的原始订单表,通过 Flink 处理计算出产品销量的实时排行,并实时同步至 StarRocks 的主键模型表中。最终用户可以通过可视化工具连接StarRocks查看到实时刷新的榜单。
4. 开启 MySQL Binlog 日志
vim /etc/my.cnf
# 开启 Binlog 日志
log_bin = ON
# 设置 Binlog 的存储位置
log_bin =/var/lib/mysql/mysql-bin
# 设置 server_id
# 在 MySQL 5.7.3 及以后版本,如果没有 server_id,那么设置 binlog 后无法开启 MySQL 服务
server_id = 1
# 设置 Binlog 模式为 ROW
binlog_format = ROW
# binlog 日志的基本文件名,后面会追加标识来表示每一个 Binlog 文件
log_bin_basename =/var/lib/mysql/mysql-bin
# binlog 文件的索引文件,管理所有 Binlog 文件的目录
log_bin_index =/var/lib/mysql/mysql-bin.index
确认是否已经开启 Binlog:
SHOW VARIABLES LIKE 'log_bin';
5. SMT
下载并解压 SMT,并将其放在 flink-1.14.5 目录下
wget https://releases.starrocks.io/resources/smt.tar.gz
6. 同步库表结构
1) 配置 SMT 配置文件, 例如源 MySQL 连接信息、待同步库表的匹配规则,flink-starrocks-connector 配置信息等
vim flink/conf/config_prod.conf
[db]
host = xxx.xx.xxx.xx
port = 3306
user = user1
password = xxxxxx
[other]
# number of backends in StarRocks
be_num = 3
# `decimal_v3` is supported since StarRocks-1.18.1
use_decimal_v3 = true
# file to save the converted DDL SQL
output_dir = ./result
[table-rule.1]
# pattern to match databases for setting properties
database = ^demo.*$
# pattern to match tables for setting properties
table = ^.*$
############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://<fe_host>:<fe_query_port>
flink.starrocks.load-url= <fe_host>:<fe_http_port>
flink.starrocks.username=user2
flink.starrocks.password=xxxxxx
flink.starrocks.sink.properties.format=csv
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=15000
[table-rule] :库表匹配规则,以及对应的flink-connector-starrocks 配置。
如果需要为不同表匹配不同的 flink-connector-starrocks 配置,例如部分表更新频繁,需要提高导入速度,请参见补充说明。
如果需要将 MySQL 分库分表后的多张表导入至 StarRocks的一张表中,请参见补充说明。
database、table:MySQL 中同步对象的库表名,支持正则表达式。
flink.starrocks.* :flink-connector-starrocks 的配置信息,更多配置和说明,请参见 Flink-connector-starrocks。
[other] :其他信息
be_num: StarRocks 集群的 BE 节点数(后续生成的 StarRocks 建表 SQL 文件会参考该参数,设置合理的分桶数量)。
use_decimal_v3:是否开启 decimalV3。开启后,MySQL 小数类型的数据同步至 StarRocks 时会转换为 decimalV3。
output_dir :待生成的 SQL 文件的路径。SQL 文件会用于在 StarRocks 集群创建库表, 向 Flink 集群提交 Flink job。默认为 ./result,不建议修改。
2) 执行如下命令,SMT 会读取 MySQL 中同步对象的库表结构,并且结合配置文件信息,在 result 目录生成 SQL 文件,用于 StarRocks 集群创建库表(starrocks-create.all.sql), 用于向 Flink 集群提交同步数据的 flink job(flink-create.all.sql)。 并且源表不同,则 starrocks-create.all.sql 中建表语句默认创建的数据模型不同。
如果源表没有 Primary Key、 Unique Key,则默认创建明细模型。
如果源表有 Primary Key、 Unique Key,则区分以下几种情况:
源表是 Hive 表、ClickHouse MergeTree 表,则默认创建明细模型。
源表是 ClickHouse SummingMergeTree表,则默认创建聚合模型。
源表为其他类型,则默认创建主键模型。
# 运行 SMT
./starrocks-migrate-tool
# 进入并查看 result 目录中的文件
cd result
ls result
flink-create.1.sql smt.tar.gz starrocks-create.all.sql
flink-create.all.sql starrocks-create.1.sql
3) 执行如下命令,连接 StarRocks,并执行 SQL 文件 starrocks-create.all.sql,用于创建目标库和表。推荐使用 SQL 文件中默认的建表语句,本示例中建表语句默认创建的数据模型为主键模型。
注意
您也可以根据业务需要,修改 SQL 文件中的建表语句,基于其他模型创建目标表。
如果您选择基于非主键模型创建目标表,StarRocks 不支持将源表中 DELETE 操作同步至非主键模型的表,请谨慎使用。
mysql -h <fe_host> -P <fe_query_port> -u user2 -pxxxxxx < starrocks-create.all.sql
如果数据需要经过 Flink 处理后写入目标表,目标表与源表的结构不一样,则您需要修改 SQL 文件 starrocks-create.all.sql 中的建表语句。
7. 同步数据
运行 Flink 集群,提交 Flink job,启动流式作业,源源不断将 MySQL 数据库中的全量和增量数据同步到 StarRocks 中。
1) 进入 Flink 目录,执行如下命令,在 Flink SQL 客户端运行 SQL 文件 flink-create.all.sql。
该 SQL 文件定义了动态表 source table、sink table,查询语句 INSERT INTO SELECT,并且指定 connector、源数据库和目标数据库。Flink SQL 客户端执行该 SQL 文件后,向 Flink 集群提交一个 Flink job,开启同步任务。
./bin/sql-client.sh -f flink-create.all.sql