flink 部署 - 开源分布式流和批处理框架 大数据 docker flink


https://hub.docker.com/_/flink

https://flink.apache.org/

https://nightlies.apache.org/flink/flink-docs-stable/zh/


# 选择合适的 flink 版本: 1.16.2-scala_2.12-java8

---创建 JobManager 

# Start the JobManager container.
# - Web UI published on host port 8088 (container port 8081).
# - FLINK_PROPERTIES sets the RPC address TaskManagers use to reach it.
# - The conf directory is mounted read-write so edits to the host-side
#   flink-conf.yaml (see further below) actually take effect. In the original
#   notes this -v line dangled below the command and would have been dropped
#   when copy-pasting, leaving the container on its baked-in default config.
docker run -d --name=flinkjob --restart always \
  -p 8088:8081 \
  -e FLINK_PROPERTIES='jobmanager.rpc.address: flinkjob' \
  --privileged=true --user=root \
  -e TZ='Asia/Shanghai' \
  -v /etc/localtime:/etc/localtime:ro \
  -v /data/site/docker/env/bizdata/flink/jobmanager:/opt/flink/conf \
  flink:1.16.2-scala_2.12-java8 jobmanager

---创建 TaskManager 

# Start two TaskManager containers.
# - FLINK_PROPERTIES points them at the JobManager's RPC address.
# - --link resolves the hostname 'flinkjob' inside the container (legacy
#   Docker networking; a user-defined network would be the modern equivalent).
# - The conf mount was a dangling line in the original notes; it is folded
#   into each command here so the host-side flink-conf.yaml is actually used.
docker run -d --name=flinktask1 --restart always \
  -e FLINK_PROPERTIES='jobmanager.rpc.address: flinkjob' \
  --privileged=true --user=root \
  -e TZ='Asia/Shanghai' \
  -v /etc/localtime:/etc/localtime:ro \
  -v /data/site/docker/env/bizdata/flink/taskmanager:/opt/flink/conf \
  --link flinkjob \
  flink:1.16.2-scala_2.12-java8 taskmanager

docker run -d --name=flinktask2 --restart always \
  -e FLINK_PROPERTIES='jobmanager.rpc.address: flinkjob' \
  --privileged=true --user=root \
  -e TZ='Asia/Shanghai' \
  -v /etc/localtime:/etc/localtime:ro \
  -v /data/site/docker/env/bizdata/flink/taskmanager:/opt/flink/conf \
  --link flinkjob \
  flink:1.16.2-scala_2.12-java8 taskmanager


http://g.htmltoo.com:8088


# 执行程序

# Open an interactive shell inside the JobManager container.
docker exec -it  flinkjob /bin/bash

# Run a Python script / the bundled batch WordCount example via the container.
docker exec -it flinkjob python demo1.py
docker exec -it flinkjob flink run ./examples/batch/WordCount.jar  -input ./README.txt  -output README_CountWord_Result.txt
# Submit locally on the Flink server itself:
 ./bin/flink run --python demo.py
# Submit a PyFlink job from a directory, using --pyModule to name the entry module:
./bin/flink run \
      --pyModule table.word_count \
      --pyFiles examples/python/table
# Submit a PyFlink job to a specific JobManager:
./bin/flink run \
      --jobmanager <jobmanagerHost>:8081 \
      --python examples/python/table/word_count.py


# Edit the JobManager config on the host (mounted into the container
# at /opt/flink/conf by the docker run command above).
vim  /data/site/docker/env/bizdata/flink/jobmanager/flink-conf.yaml

# JobManager flink-conf.yaml contents:
jobmanager.rpc.address: flinkjob
jobmanager.memory.process.size: 2000m
blob.server.port: 6124
query.server.port: 6125

# Edit the TaskManager config on the host.
vim /data/site/docker/env/bizdata/flink/taskmanager/flink-conf.yaml

# TaskManager flink-conf.yaml contents (3 slots per TaskManager):
jobmanager.rpc.address: flinkjob 
taskmanager.memory.process.size: 2000m
taskmanager.numberOfTaskSlots: 3
blob.server.port: 6124
query.server.port: 6125


# 源mysql的连接器

https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc

https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc

https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka

https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients

-上传四个jar包到lib中(flink-connector-jdbc、flink-sql-connector-mysql-cdc、flink-connector-kafka、kafka-clients)

# Copy the connector jars into the JobManager's /opt/flink/lib so that
# Flink SQL can use the 'jdbc', 'mysql-cdc', and 'kafka' connectors.
# NOTE(review): TaskManagers typically need these jars on their classpath as
# well — confirm, and repeat the copies for flinktask1/flinktask2 if so.
docker cp /data/site/htmltoo.f/htmltoo.soft/src/common/flink-connector/flink-connector-jdbc-3.1.1-1.17.jar   flinkjob:/opt/flink/lib/flink-connector-jdbc-3.1.1-1.17.jar

docker cp /data/site/htmltoo.f/htmltoo.soft/src/common/flink-connector/flink-sql-connector-mysql-cdc-2.4.1.jar  flinkjob:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.4.1.jar

docker cp /data/site/htmltoo.f/htmltoo.soft/src/common/flink-connector/flink-connector-kafka-1.16.2.jar   flinkjob:/opt/flink/lib/flink-connector-kafka-1.16.2.jar

docker cp /data/site/htmltoo.f/htmltoo.soft/src/common/flink-connector/kafka-clients-3.5.1.jar   flinkjob:/opt/flink/lib/kafka-clients-3.5.1.jar


# 同步mysql数据到StarRocks

https://github.com/StarRocks/starrocks-connector-for-apache-flink


# 基于Flink SQL CDC Mysql to Mysql数据同步

运行start-cluster.sh

运行sql-client.sh进入到FlinkSQL模式

创建两个表映射、一个启动任务指令

1)源数据库映射

-- Source table: a CDC-backed view of the MySQL table, read via the
-- 'mysql-cdc' connector (binlog-based; first a full snapshot, then
-- incremental changes). The quoted placeholder values (host, user,
-- password, database, table) must be replaced with real connection info.
-- PRIMARY KEY ... NOT ENFORCED declares the upsert key without Flink
-- validating uniqueness itself.
create table ny_energy_data_source
(
    id             bigint ,
    enterprise_id  bigint     ,
    use_time       timestamp   ,
    date_type      int        ,
    attribute_id   bigint     ,
    PRIMARY KEY (`id`) NOT ENFORCED
)WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'ip地址',
    'port' = '3306',
    'username' = '用户名',
    'password' = '密码',
    'database-name' = '源数据库名',
    'table-name' = '表名'
);

2)目标数据库映射

-- Target table: the destination MySQL table, written through the JDBC
-- connector. Column list and primary key mirror the source table so the
-- INSERT ... SELECT * below maps columns positionally.
-- NOTE(review): 'scan.fetch-size' is a JDBC *scan* (source-read) option;
-- when this table is used purely as a sink it is presumably ignored —
-- confirm against the flink-connector-jdbc option reference.
create table ny_energy_data_target
(
    id             bigint ,
    enterprise_id  bigint     ,
    use_time       timestamp   ,
    date_type      int        ,
    attribute_id   bigint     ,
    PRIMARY KEY (`id`) NOT ENFORCED
)WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://IP地址:3306/库名?serverTimezone=UTC',
    'username' = '用户名',
    'password' = '密码',
    'table-name' = '表名',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'scan.fetch-size' = '200'
);

3) 启动任务

从ny_energy_data_source到ny_energy_data_target,先全量后增量

insert into ny_energy_data_target select * from ny_energy_data_source;

4) 进入到http://IP:8081(默认端口),查看任务运行情况


#Apache Flink是一个用于分布式流和批处理数据处理的开源平台。Flink的核心是流数据流引擎,为数据流上的分布式计算提供数据分发,通信和容错。


With Docker Compose you can create a Flink cluster:

# Minimal Flink cluster via Docker Compose: one JobManager + one TaskManager.
# FLINK_DOCKER_IMAGE_NAME overrides the image; defaults to 'flink'.
# NOTE(review): JOB_MANAGER_RPC_ADDRESS and 'links' are the legacy forms used
# by the official image's docs; newer image versions prefer FLINK_PROPERTIES
# (as in the docker run commands earlier in these notes) and rely on the
# compose network for name resolution — confirm against the image version used.
version: "2.1"
services:
  jobmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6123"          # JobManager RPC port (cluster-internal)
    ports:
      - "8081:8081"     # Web UI / REST, published to the host
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6121"          # TaskManager data port
      - "6122"          # TaskManager RPC port
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

and just run docker-compose up.

Scale the cluster up or down to N TaskManagers:

docker-compose scale taskmanager=<N>


Configuration

These are the default ports used by the Flink image:

  • The Web Client is on port 8081
  • JobManager RPC port 6123
  • TaskManagers RPC port 6122
  • TaskManagers Data port 6121



签名:这个人很懒,什么也没有留下!
最新回复 (0)
返回