1. flink && spark
https://abc.htmltoo.com/search-flink-1.htm
Flink版本请选择 1.9.0
https://abc.htmltoo.com/search-spark-1.htm
Spark版本请选择 >= 2.x.x
2. seatunnel-2
export version="2.3.3"
wget https://github.com/InterestingLab/seatunnel/releases/download/v${version}/seatunnel-${version}.zip -O seatunnel-${version}.zip
unzip seatunnel-${version}.zip
ln -s seatunnel-${version} seatunnel
3. 配置文件
3.1 Source 插件配置
1) FakeStream
主要用于方便地生成用户指定的数据,作为输入来对seatunnel进行功能验证,测试,以及性能测试等。
Fake
主要用于快速上手运行一个 seatunnel 应用
fakeStream {
content = ['name=ricky&age=23', 'name=gary&age=28']
rate = 5
}
生成的数据如下,从content列表中随机抽取其中的字符串
+-----------------+
|raw_message |
+-----------------+
|name=gary&age=28 |
|name=ricky&age=23|
+-----------------+
---
Fake {
result_table_name = "my_dataset"
}
2) Elasticsearch
elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-20190424"
result_table_name = "my_dataset"
}
elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-*"
es.read.field.include = "name, age"
result_table_name = "my_dataset"
}
匹配所有以 seatunnel- 开头的索引, 并且仅仅读取 name 和 age 两个字段。
3) Hive
env {
...
spark.sql.catalogImplementation = "hive"
...
}
source {
hive {
pre_sql = "select * from mydb.mytb"
result_table_name = "myTable"
}
}
...
4) JDBC
jdbc {
driver = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/info"
table = "access"
result_table_name = "access_log"
user = "username"
password = "password"
}
通过JDBC读取MySQL数据
jdbc {
driver = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/info"
table = "access"
result_table_name = "access_log"
user = "username"
password = "password"
jdbc.partitionColumn = "item_id"
jdbc.numPartitions = "10"
jdbc.lowerBound = 0
jdbc.upperBound = 100
}
根据指定字段划分分区
5) Kafka
kafkaStream {
topics = "seatunnel"
consumer.bootstrap.servers = "localhost:9092"
consumer.group.id = "seatunnel_group"
}
3.2 Transform 插件配置 - 数据转换
3.2.1 Json
对原始数据集指定字段进行Json解析
---不使用target_field
json {
source_field = "message"
}
-Source
+----------------------------+
|message |
+----------------------------+
|{"name": "ricky", "age": 23}|
|{"name": "gary", "age": 28} |
+----------------------------+
-Sink
+----------------------------+---+-----+
|message |age|name |
+----------------------------+---+-----+
|{"name": "gary", "age": 28} |28 |gary |
|{"name": "ricky", "age": 23}|23 |ricky|
+----------------------------+---+-----+
---使用target_field
json {
source_field = "message"
target_field = "info"
}
-Source
+----------------------------+
|message |
+----------------------------+
|{"name": "ricky", "age": 23}|
|{"name": "gary", "age": 28} |
+----------------------------+
-Sink
+----------------------------+----------+
|message |info |
+----------------------------+----------+
|{"name": "gary", "age": 28} |[28,gary] |
|{"name": "ricky", "age": 23}|[23,ricky]|
+----------------------------+----------+
json处理的结果支持 *select * from my_dataset where info.age = 23* 此类SQL语句
---使用 schema_file
json {
source_field = "message"
schema_file = "demo.json"
}
-Schema
在 Driver Node 的 ~/seatunnel/plugins/json/files/schemas/demo.json 中放置内容如下:
{
"name": "demo",
"age": 24,
"city": "LA"
}
-Source
+----------------------------+
|message |
+----------------------------+
|{"name": "ricky", "age": 23}|
|{"name": "gary", "age": 28} |
+----------------------------+
-Sink
+----------------------------+---+-----+-----+
|message |age|name |city |
+----------------------------+---+-----+-----+
|{"name": "gary", "age": 28} |28 |gary |null |
|{"name": "ricky", "age": 23}|23 |ricky|null |
+----------------------------+---+-----+-----+
若使用 cluster 模式进行部署,需确保 json schemas 目录被打包到 plugins.tar.gz 中
3.2.2 SQL
sql {
sql = "select username, address from user_info",
}
使用SQL插件用于字段删减,仅保留 username 和 address 字段,将丢弃其余字段。user_info 为之前插件配置的 result_table_name
sql {
sql = "select substring(telephone, 0, 10) from user_info",
}
使用SQL插件用于数据处理,使用substring functions对 telephone 字段进行截取操作
sql {
sql = "select avg(age) from user_info",
table_name = "user_info"
}
使用SQL插件用于数据聚合,使用avg functions对原始数据集进行聚合操作,取出 age 字段平均值
3.2.3 Split
separator [string] 指定的分隔符,默认为, ; fields [array] 分割后各个字段的名称
split {
source_field = "message"
delimiter = "&"
fields = ["field1", "field2"]
}
将源数据中的 message 字段根据**&**进行分割,可以以 field1 或 field2 为key获取相应value
split {
source_field = "message"
target_field = "info"
delimiter = ","
fields = ["field1", "field2"]
}
将源数据中的 message 字段根据**,**进行分割,分割后的字段为 info,可以以 info.field1 或 info.field2 为key获取相应value
3.3 Sink 插件配置 - 数据输出
不指定 source_table_name 时,当前插件处理的就是配置文件中上一个插件输出的数据集(dataset);
指定 source_table_name 的时候,当前插件处理的就是此参数对应的数据集。
stdout {
source_table_name = "view_table_2"
}
将名为 view_table_2 的临时表输出。
1) Clickhouse
通过 Clickhouse-jdbc 将数据源按字段名对应,写入ClickHouse,使用前需要提前创建对应的数据表
clickhouse {
host = "localhost:8123"
clickhouse.socket_timeout = 50000
database = "nginx"
table = "access_msg"
fields = ["date", "datetime", "hostname", "http_code", "data_size", "ua", "request_time"]
username = "username"
password = "password"
bulk_size = 20000
}
ClickHouse {
host = "localhost:8123"
database = "nginx"
table = "access_msg"
fields = ["date", "datetime", "hostname", "http_code", "data_size", "ua", "request_time"]
username = "username"
password = "password"
bulk_size = 20000
retry_codes = [209, 210]
retry = 3
}
当出现网络超时或者网络异常的情况下,重试写入3次
2) Console
输出数据到标准输出/终端, 常用于debug, 能够很方便观察数据.
console {
limit = 10
serializer = "json"
}
以Json格式输出10条数据
3) Elasticsearch
输出数据到Elasticsearch,支持的ElasticSearch版本为 >= 2.x 且 < 7.0.0。
elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel"
}
将结果写入Elasticsearch集群的名称为seatunnel的index中
elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-${now}"
es.batch.size.entries = 100000
index_time_format = "yyyy.MM.dd"
}
按天创建索引,例如 seatunnel-2020.01.01
4) File
输出数据到文件
file {
path = "file:///var/logs"
serializer = "text"
}
5) HBase
采用hbase-connectors将数据输出到Hbase, Hbase(>=2.1.0)以及Spark(>=2.0.0)的版本兼容均依赖于hbase-connectors。
支持两种写模式,overwrite和append,overwrite代表当hbase表中若存在数据,那么将会进行truncate清空,然后再加载数据。
hbase {
source_table_name = "hive_dataset"
hbase.zookeeper.quorum = "centos01:2181,centos02:2181,centos03:2181"
catalog = "{\"table\":{\"namespace\":\"default\", \"name\":\"customer\"},\"rowkey\":\"c_custkey\",\"columns\":{\"c_custkey\":{\"cf\":\"rowkey\", \"col\":\"c_custkey\", \"type\":\"bigint\"},\"c_name\":{\"cf\":\"info\", \"col\":\"c_name\", \"type\":\"string\"},\"c_address\":{\"cf\":\"info\", \"col\":\"c_address\", \"type\":\"string\"},\"c_city\":{\"cf\":\"info\", \"col\":\"c_city\", \"type\":\"string\"},\"c_nation\":{\"cf\":\"info\", \"col\":\"c_nation\", \"type\":\"string\"},\"c_region\":{\"cf\":\"info\", \"col\":\"c_region\", \"type\":\"string\"},\"c_phone\":{\"cf\":\"info\", \"col\":\"c_phone\", \"type\":\"string\"},\"c_mktsegment\":{\"cf\":\"info\", \"col\":\"c_mktsegment\", \"type\":\"string\"}}}"
staging_dir = "/tmp/hbase-staging/2020.11.9/"
save_mode = "overwrite"
}
Hbase的这个插件没有给用户提供创建表的功能,因为hbase表的预分区方式会和业务逻辑有关,所以运行该插件的时候需要用户自己提前创建好hbase的表以及它的预分区;对于rowkey的设计,catalog本身就支持了多列联合 rowkey="col1:col2:col3",但若有其他对rowkey的设计需求,例如:加盐,等等操作,完全可以通过解耦的方式基于transform plugin对rowkey进行修改。
6) Hdfs
hdfs {
path = "hdfs:///var/logs-${now}"
serializer = "json"
path_time_format = "yyyy.MM.dd"
}
7) MySQL
支持Update的方式输出数据到Mysql
存储模式,添加模式update,在插入数据键冲突的时候进行可指定方式的数据覆盖
基本模式overwrite,append,ignore以及error
Mysql {
save_mode = "update",
truncate = true,
url = "jdbc:mysql://192.168.1.1:3306/database",
user= "userName",
password = "***********",
dbtable = "tableName",
customUpdateStmt = "INSERT INTO table (column1, column2, created, modified, yn) values(?, ?, now(), now(), 1) ON DUPLICATE KEY UPDATE column1 = IFNULL(VALUES (column1), column1), column2 = IFNULL(VALUES (column2), column2)"
}
8) Phoenix
Phoenix {
zk-connect = "host1:2181,host2:2181,host3:2181"
table = "table22"
}
3.4 完整配置文件案例
Streaming 流式计算 - v2.streaming.conf.template
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in SeaTunnel config
######
env {
# You can set flink configuration here
execution.parallelism = 2
job.mode = "STREAMING"
checkpoint.interval = 2000
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
parallelism = 2
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
# If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/category/source-v2
}
sink {
Console {
}
# If you would like to get more information about how to configure SeaTunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
}
Batch 离线批处理 - v2.batch.config.template
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of batch processing in SeaTunnel config
######
env {
# You can set SeaTunnel environment configuration here
execution.parallelism = 2
job.mode = "BATCH"
checkpoint.interval = 10000
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
parallelism = 2
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
# If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/category/source-v2
}
sink {
Console {
}
# If you would like to get more information about how to configure SeaTunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
}
4. 运行
在本地以 local 方式运行 seatunnel
./bin/start-seatunnel-spark.sh --master local[4] --deploy-mode client --config ./config/application.conf
在 Spark Standalone 集群上运行 seatunnel
# client 模式
./bin/start-seatunnel-spark.sh --master spark://207.184.161.138:7077 --deploy-mode client --config ./config/application.conf
# cluster 模式
./bin/start-seatunnel-spark.sh --master spark://207.184.161.138:7077 --deploy-mode cluster --config ./config/application.conf
在 Yarn 集群上运行 seatunnel
# client 模式
./bin/start-seatunnel-spark.sh --master yarn --deploy-mode client --config ./config/application.conf
# cluster 模式
./bin/start-seatunnel-spark.sh --master yarn --deploy-mode cluster --config ./config/application.conf