https://hub.docker.com/r/sounos/seatunnel
https://seatunnel.apache.org/download
https://github.com/apache/incubator-seatunnel/releases
https://repo.maven.apache.org/maven2/org/apache/seatunnel/
https://github.com/apache/seatunnel-web
https://hub.docker.com/r/987846/seatunnel-web
1. flink
https://abc.htmltoo.com/thread-45970.htm
https://abc.htmltoo.com/search-flink-1.htm
Flink版本请选择>= 1.9.0
2. seatunnel-2
wget https://dlcdn.apache.org/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz
cd /data/site/htmltoo.opt/seatunnel/
vim config/seatunnel-env.sh
FLINK_HOME=/data/site/htmltoo.opt/flink-1.16.2
./bin/seatunnel.sh --config ./config/v2.streaming.conf -e local
3.Dockerfile
vim /data/site/htmltoo.opt/seatunnel/Dockerfile
FROM openjdk:8

# SeaTunnel release baked into the image; bump this line when upgrading.
ENV SEATUNNEL_VERSION="2.3.3"

# The release tarball must be present in the build context root.
COPY /apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz /opt/apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz

WORKDIR /opt
# Unpack, rename to a version-agnostic path and delete the archive in a
# single RUN: with separate RUN steps the tarball stays embedded in an
# intermediate layer (a later `rm` in its own layer cannot shrink the image).
RUN tar -xzvf apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz \
 && mv apache-seatunnel-${SEATUNNEL_VERSION} seatunnel \
 && rm -f apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz

WORKDIR /opt/seatunnel
ENTRYPOINT ["sh","-c"," bin/seatunnel.sh --config $config -e local"]docker build -t seatunnel:2.3.3 -f Dockerfile .
4.docker run
docker run -d --restart always --name=seatunnel --privileged=true --user=root -e TZ='Asia/Shanghai' -v /etc/localtime:/etc/localtime:ro -e DISABLE_JEMALLOC='true' -e config="/opt/data/seatunnel.streaming.conf" -v /data/site/docker/env/bizdata/seatunnel/seatunnel.conf:/opt/data/seatunnel.streaming.conf:ro --link mariadb --link ch --link flinkjob sounos/seatunnel:2.3.3-flink15 ./bin/start-seatunnel-flink-15-connector-v2.sh --config /opt/data/seatunnel.streaming.conf
docker run -d --restart always -p 9991:8080 --name=seatunnelweb --privileged=true --user=root -e TZ='Asia/Shanghai' -v /etc/localtime:/etc/localtime:ro 987846/seatunnel-web:1.0.0-SNAPSHOT
docker exec -it seatunnel /bin/bash
3. 配置seatunnel
编辑config/waterdrop-env.sh, 指定必须环境配置如FLINK_HOME (Step 1 中Flink下载并解压后的目录)
vim config/waterdrop-env.sh
# Flink engine settings for this job
env {
# You can set flink configuration here
execution.parallelism = 1
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
# Source: read lines from a socket and register them as temp table "fake"
# (single column named "info")
source {
SocketStream{
result_table_name = "fake"
field_name = "info"
}
}
# Transform pipeline: Split only registers a `split` UDF; the sql plugin
# then applies it to column "info" of table "fake"
transform {
Split{
separator = "#"
fields = ["name","age"]
}
sql {
sql = "select * from (select info,split(info) as info_row from fake) t1"
}
}
# Sink: print results to the TaskManager stdout (for testing/debugging)
sink {
ConsoleSink {}
}
4. 启动netcat server用于发送数据
nc -l -p 9999
5. 启动seatunnel
cd seatunnel
./bin/start-waterdrop-flink.sh --config ./config/application.conf
6. 在nc端输入
xg#1995
可在 flink Web-UI(http://localhost:8081/#/task-manager)的 TaskManager Stdout日志打印出:
xg#1995,xg,1995
6.1 Streaming 流式计算
以上配置为默认【流式处理配置模版】,可直接运行,命令如下: cd seatunnel ./bin/start-waterdrop-flink.sh --config ./config/flink.streaming.conf.template
6.2 Batch 离线批处理
以上配置为默认【离线批处理配置模版】,可直接运行,命令如下: cd seatunnel ./bin/start-waterdrop-flink.sh --config ./config/flink.batch.conf.template
7.配置文件
一个完整的seatunnel配置包含env, source, transform, sink, 即:
env {
...
}
source {
...
}
transform {
...
}
sink {
...
}
7.1 env是flink任务相关的配置,例如设置时间为event-time还是process-time
env {
execution.parallelism = 1 #设置任务的整体并行度为1
execution.checkpoint.interval = 10000 #设置任务checkpoint的频率
execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" #设置checkpoint的路径
}
以下是env可配置项,具体含义可以参照flink官网配置
/**
 * Names of the Flink-related options accepted in the {@code env} section of a
 * seatunnel config file. Each constant is the config key as written by the
 * user; the values are forwarded to the underlying Flink environment (see the
 * Flink configuration documentation for exact semantics of each option).
 */
public class ConfigKeyName {
//the time characteristic for all streams created from this environment, e.g., processing-time,event-time,ingestion-time
public final static String TIME_CHARACTERISTIC = "execution.time-characteristic";
//the maximum time frequency (milliseconds) for the flushing of the output buffers
public final static String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout";
//the parallelism for operations executed through this environment
public final static String PARALLELISM = "execution.parallelism";
//the maximum degree of parallelism to be used for the program
public final static String MAX_PARALLELISM = "execution.max-parallelism";
//enables checkpointing for the streaming job; time interval between state checkpoints in milliseconds
public final static String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";
//the checkpointing mode (exactly-once vs. at-least-once)
public final static String CHECKPOINT_MODE = "execution.checkpoint.mode";
//the maximum time that a checkpoint may take before being discarded
public final static String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout";
//a file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 's3://')
public final static String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri";
//the maximum number of checkpoint attempts that may be in progress at the same time
public final static String MAX_CONCURRENT_CHECKPOINTS = "execution.max-concurrent-checkpoints";
//enables checkpoints to be persisted externally; whether to delete externalized checkpoints on job cancellation (e.g., true,false)
public final static String CHECKPOINT_CLEANUP_MODE = "execution.checkpoint.cleanup-mode";
//the minimal pause before the next checkpoint is triggered
public final static String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpoint.min-pause";
//the tolerable checkpoint failure number
public final static String FAIL_ON_CHECKPOINTING_ERRORS = "execution.checkpoint.fail-on-error";
//the restart strategy to be used for recovery (e.g., 'no' , 'fixed-delay', 'failure-rate')
//no -> no restart strategy
//fixed-delay -> fixed delay restart strategy
//failure-rate -> failure rate restart strategy
public final static String RESTART_STRATEGY = "execution.restart.strategy";
//number of restart attempts for the fixed delay restart strategy
public final static String RESTART_ATTEMPTS = "execution.restart.attempts";
//delay in-between restart attempts for the delay restart strategy
public final static String RESTART_DELAY_BETWEEN_ATTEMPTS = "execution.restart.delayBetweenAttempts";
//time interval for failures
public final static String RESTART_FAILURE_INTERVAL = "execution.restart.failureInterval";
//maximum number of restarts in given interval for the failure rate restart strategy
public final static String RESTART_FAILURE_RATE = "execution.restart.failureRate";
//delay in-between restart attempts for the failure rate restart strategy
public final static String RESTART_DELAY_INTERVAL = "execution.restart.delayInterval";
//the maximum time interval for which idle state is retained
public final static String MAX_STATE_RETENTION_TIME = "execution.query.state.max-retention";
//the minimum time interval for which idle state is retained
public final static String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention";
//the state backend ('rocksdb','fs')
public final static String STATE_BACKEND = "execution.state.backend";
}
source可配置任意的source插件及其参数,具体参数随不同的source插件而变化。
transform可配置任意的transform插件及其参数,具体参数随不同的transform插件而变化。transform中的多个插件按配置顺序形成了数据处理的pipeline, 默认上一个transform的输出是下一个transform的输入,但也可以通过source_table_name控制。
transform处理完的数据,会发送给sink中配置的每个插件。
sink可配置任意的sink插件及其参数,具体参数随不同的sink插件而变化。
7.2 Source 插件配置
不指定 result_table_name时 ,此插件处理后的数据,不会被注册为一个可供其他插件直接访问的数据集(dataStream/dataset),或者被称为临时表(table);
指定 result_table_name 时,此插件处理后的数据,会被注册为一个可供其他插件直接访问的数据集(dataStream/dataset),或者被称为临时表(table)。此处注册的数据集(dataStream/dataset),其他插件可通过指定 source_table_name 来直接访问。
source {
FakeSourceStream {
result_table_name = "fake"
field_name = "name,age"
}
}
数据源 FakeSourceStream 的结果将注册为名为 fake 的临时表。
这个临时表,可以被任意 Transform 或者 Sink 插件通过指定 source_table_name 使用。
field_name 将临时表的两列分别命名为name和age。
1) Fake
Fake Source主要用于自动生成数据,数据只有两列,第一列为String类型,内容为["Gary", "Ricky Huo", "Kid Xiong"]中随机一个,第二列为Long类型,为当前的13位时间戳,以此作为输入来对seatunnel进行功能验证,测试等。
source {
FakeSourceStream {
result_table_name = "fake"
field_name = "name,age"
}
}
2) Socket
Socket作为数据源
source {
SocketStream{
result_table_name = "socket"
field_name = "info"
}
}
3) File
从文件系统中读取数据
从文件系统中读取文件的格式,目前支持csv、json、parquet 、orc和 text
需要文件路径,hdfs文件以hdfs://开头,本地文件以file://开头
FileSource{
path = "hdfs://localhost:9000/input/"
source_format = "json"
schema = "{\"data\":[{\"a\":1,\"b\":2},{\"a\":3,\"b\":4}],\"db\":\"string\",\"q\":{\"s\":\"string\"}}"
result_table_name = "test"
}
4) JDBC
JdbcSource {
driver = com.mysql.jdbc.Driver
url = "jdbc:mysql://localhost/test"
username = root
query = "select * from test"
}
5) Kafka
从Kafka消费数据,支持的Kafka版本 >= 0.10.0.
KafkaTableStream {
consumer.bootstrap.servers = "127.0.0.1:9092"
consumer.group.id = "seatunnel5"
topics = test
result_table_name = test
format.type = csv
schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]"
format.field-delimiter = ";"
format.allow-comments = "true"
format.ignore-parse-errors = "true"
}
7.3 Transform 插件配置
1) SQL
使用SQL处理数据,使用的是flink的sql语法,支持其各种udf
sql {
sql = "select name, age from fake"
}
2) Split
定义了一个字符串切割函数,用于在Sql插件对指定字段进行分割。
#这个只是创建了一个叫split的udf
Split{
separator = "#"
fields = ["name","age"]
}
#使用split函数(确认fake表存在)
sql {
sql = "select * from (select info,split(info) as info_row from fake) t1"
}
7.4 Sink 插件配置
1) Console
用于功能测试和debug,结果将输出在taskManager的stdout选项卡
ConsoleSink{}
2) Elasticsearch
elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel"
}
将结果写入Elasticsearch集群的名称为 seatunnel 的索引中
3) File
目前支持csv、json、和 text。streaming模式目前只支持text
需要文件路径,hdfs文件以hdfs://开头,本地文件以file://开头
write_mode: NO_OVERWRITE 不覆盖,路径存在报错; OVERWRITE 覆盖,路径存在则先删除再写入
FileSink {
format = "json"
path = "hdfs://localhost:9000/flink/output/"
write_mode = "OVERWRITE"
}
4) JDBC
batch_size: 每批写入数量
JdbcSink {
source_table_name = fake
driver = com.mysql.jdbc.Driver
url = "jdbc:mysql://localhost/test"
username = root
query = "insert into test(name,age) values(?,?)"
batch_size = 2
}
5) Kafka
KafkaTable {
producer.bootstrap.servers = "127.0.0.1:9092"
topics = test_sink
}
7.5 完整配置文件
######
###### This config file is a demonstration of streaming processing in seatunnel config
######
env {
# You can set flink configuration here
execution.parallelism = 1
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
# This is an example source plugin, **only for testing and demonstrating the source plugin feature**
FakeSourceStream {
result_table_name = "fake"
field_name = "name,age"
}
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://interestinglab.github.io/seatunnel-docs/#/zh-cn/configuration/base
}
transform {
sql {
sql = "select name,age from fake"
}
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://interestinglab.github.io/seatunnel-docs/#/zh-cn/configuration/base
}
sink {
ConsoleSink {}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://interestinglab.github.io/seatunnel-docs/#/zh-cn/configuration/base
}
7.6 运行
在Flink Standalone集群上运行seatunnel
.bin/start-seatunnel-flink.sh --config config-path
# -p 2 指定flink job的并行度为2,还可以指定更多的参数,使用 flink run -h查看
.bin/start-seatunnel-flink.sh -p 2 --config config-path
在Yarn集群上运行seatunnel
.bin/start-seatunnel-flink.sh -m yarn-cluster --config config-path
# -ynm seatunnel 指定在yarn webUI显示的名称为seatunnel,还可以指定更多的参数,使用 flink run -h查看
.bin/start-seatunnel-flink.sh -m yarn-cluster -ynm seatunnel --config config-path