https://hub.docker.com/_/flink
https://flink.apache.org/
https://nightlies.apache.org/flink/flink-docs-stable/zh/
# 选择合适的 flink 版本: 1.16.2-scala_2.12-java8
# 创建 JobManager
# Create the JobManager container. Host port 8088 maps to the Flink Web UI (8081).
# NOTE(review): the conf volume mount was on a separate line after the command and
# was therefore never applied; a -v flag must appear before the image name, so it
# is merged into the same command here.
docker run -d --name=flinkjob --restart always \
  -p 8088:8081 \
  -e FLINK_PROPERTIES='jobmanager.rpc.address: flinkjob' \
  --privileged=true --user=root \
  -e TZ='Asia/Shanghai' \
  -v /etc/localtime:/etc/localtime:ro \
  -v /data/site/docker/env/bizdata/flink/jobmanager:/opt/flink/conf \
  flink:1.16.2-scala_2.12-java8 jobmanager
# 创建 TaskManager
# Create two TaskManager containers linked to the JobManager.
# NOTE(review): the conf volume mount was on its own line after both commands and
# was therefore never applied; it belongs inside each docker run, before the image
# name. Both TaskManagers share the same (read-only usage) conf directory.
docker run -d --name=flinktask1 --restart always \
  -e FLINK_PROPERTIES='jobmanager.rpc.address: flinkjob' \
  --privileged=true --user=root \
  -e TZ='Asia/Shanghai' \
  -v /etc/localtime:/etc/localtime:ro \
  -v /data/site/docker/env/bizdata/flink/taskmanager:/opt/flink/conf \
  --link flinkjob flink:1.16.2-scala_2.12-java8 taskmanager
docker run -d --name=flinktask2 --restart always \
  -e FLINK_PROPERTIES='jobmanager.rpc.address: flinkjob' \
  --privileged=true --user=root \
  -e TZ='Asia/Shanghai' \
  -v /etc/localtime:/etc/localtime:ro \
  -v /data/site/docker/env/bizdata/flink/taskmanager:/opt/flink/conf \
  --link flinkjob flink:1.16.2-scala_2.12-java8 taskmanager
http://g.htmltoo.com:8088
# Run programs inside the JobManager container.
# Open an interactive shell.
docker exec -it flinkjob /bin/bash
# Run a PyFlink script directly.
# NOTE(review): assumes python is installed in the flink image — verify.
docker exec -it flinkjob python demo1.py
# Submit the bundled batch WordCount example job.
docker exec -it flinkjob flink run ./examples/batch/WordCount.jar -input ./README.txt -output README_CountWord_Result.txt
# Submit locally on the Flink server.
./bin/flink run --python demo.py
# Submit a PyFlink job from a directory, using --pyModule to name the entry module:
./bin/flink run \
--pyModule table.word_count \
--pyFiles examples/python/table
# Submit a PyFlink job to a specific JobManager (replace <jobmanagerHost>):
./bin/flink run \
--jobmanager <jobmanagerHost>:8081 \
--python examples/python/table/word_count.py
# JobManager flink-conf.yaml (host dir mounted into flinkjob at /opt/flink/conf):
vim /data/site/docker/env/bizdata/flink/jobmanager/flink-conf.yaml
jobmanager.rpc.address: flinkjob
jobmanager.memory.process.size: 2000m
blob.server.port: 6124
query.server.port: 6125
# TaskManager flink-conf.yaml (host dir mounted into each taskmanager container):
vim /data/site/docker/env/bizdata/flink/taskmanager/flink-conf.yaml
jobmanager.rpc.address: flinkjob
taskmanager.memory.process.size: 2000m
# Number of task slots offered by each TaskManager.
taskmanager.numberOfTaskSlots: 3
blob.server.port: 6124
query.server.port: 6125
# 源mysql的连接器
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc
https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
# 上传以下 4 个连接器 jar 包到 lib 中
docker cp /data/site/htmltoo.f/htmltoo.soft/src/common/flink-connector/flink-connector-jdbc-3.1.1-1.17.jar flinkjob:/opt/flink/lib/flink-connector-jdbc-3.1.1-1.17.jar
docker cp /data/site/htmltoo.f/htmltoo.soft/src/common/flink-connector/flink-sql-connector-mysql-cdc-2.4.1.jar flinkjob:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.4.1.jar
docker cp /data/site/htmltoo.f/htmltoo.soft/src/common/flink-connector/flink-connector-kafka-1.16.2.jar flinkjob:/opt/flink/lib/flink-connector-kafka-1.16.2.jar
docker cp /data/site/htmltoo.f/htmltoo.soft/src/common/flink-connector/kafka-clients-3.5.1.jar flinkjob:/opt/flink/lib/kafka-clients-3.5.1.jar
# 同步mysql数据到StarRocks
https://github.com/StarRocks/starrocks-connector-for-apache-flink
# 基于Flink SQL CDC Mysql to Mysql数据同步
运行start-cluster.sh
运行sql-client.sh进入到FlinkSQL模式
创建两个表映射、一个启动任务指令
1)源数据库映射
-- Source table: streams change events from MySQL via the mysql-cdc connector.
create table ny_energy_data_source
(
id bigint ,
enterprise_id bigint ,
use_time timestamp ,
date_type int ,
attribute_id bigint ,
-- Flink cannot enforce primary keys on external data; NOT ENFORCED is required.
PRIMARY KEY (`id`) NOT ENFORCED
)WITH (
'connector' = 'mysql-cdc',
-- The values below are placeholders (host / credentials / database / table);
-- fill them in before running.
'hostname' = 'ip地址',
'port' = '3306',
'username' = '用户名',
'password' = '密码',
'database-name' = '源数据库名',
'table-name' = '表名'
);
2)目标数据库映射
-- Target table: writes to MySQL via the JDBC connector. Column list mirrors the
-- source table so `insert into ... select *` lines up positionally.
create table ny_energy_data_target
(
id bigint ,
enterprise_id bigint ,
use_time timestamp ,
date_type int ,
attribute_id bigint ,
PRIMARY KEY (`id`) NOT ENFORCED
)WITH (
'connector' = 'jdbc',
-- The values below are placeholders (host / database / credentials / table);
-- fill them in before running.
'url' = 'jdbc:mysql://IP地址:3306/库名?serverTimezone=UTC',
'username' = '用户名',
'password' = '密码',
'table-name' = '表名',
'driver' = 'com.mysql.cj.jdbc.Driver',
'scan.fetch-size' = '200'
);
3) 启动任务
从ny_energy_data_source到ny_energy_data_target,先全量后增量
insert into ny_energy_data_target select * from ny_energy_data_source;
4) 进入到http://IP:8081(默认端口),查看任务运行情况
#Apache Flink是一个用于分布式流和批处理数据处理的开源平台。Flink的核心是流数据流引擎,为数据流上的分布式计算提供数据分发,通信和容错。
With Docker Compose you can create a Flink cluster:
# docker-compose file for a single-JobManager / single-TaskManager Flink cluster.
# NOTE(review): the original had lost all YAML indentation (invalid as written),
# and the prose "and just run docker-compose up." was fused onto the last
# environment value; both are repaired here.
version: "2.1"
services:
  jobmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6123"          # JobManager RPC port
    ports:
      - "8081:8081"     # Web UI
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6121"          # TaskManager data port
      - "6122"          # TaskManager RPC port
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
# Then just run: docker-compose up
Scale the cluster up or down to N TaskManagers:
docker-compose scale taskmanager=<N>
Configuration
These are the default ports used by the Flink image:
- The Web Client is on port 8081
- JobManager RPC port 6123
- TaskManagers RPC port 6122
- TaskManagers Data port 6121