seatunnel Configuration
General Configuration
A complete seatunnel configuration consists of spark, input, filter, and output sections:
spark {
...
}
input {
...
}
filter {
...
}
output {
...
}
Input Plugins
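Putting the four sections together, a minimal end-to-end job might look like the sketch below. It reuses the fakestream, split, and stdout plugins shown elsewhere in this document; the spark application settings (app name, executor counts) are illustrative assumptions, not taken from the original text.
spark {
# assumed Spark settings -- adjust for your cluster
spark.app.name = "seatunnel_example"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}
input {
fakestream {
content = ["Hello World,seatunnel"]
rate = 1
}
}
filter {
split {
source_field = "raw_message"
delimiter = ","
fields = ["msg", "project"]
}
}
output {
stdout {
limit = 10
}
}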
Alluxio
Reads files from Alluxio. Supported file formats: csv, json, parquet, xml, orc, and text.
alluxio {
path = "alluxio:///var/seatunnel-logs"
result_table_name = "access_log"
format = "json"
}
ElasticSearch
elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-20190424"
result_table_name = "my_dataset"
}
elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-*"
es.read.field.include = "name, age"
result_table_name = "my_dataset"
}
The second example matches all indices whose names start with seatunnel-, and reads only the name and age fields.
FakeStream
The Fake input conveniently generates user-specified data as input, for functional verification, testing, and performance testing of seatunnel.
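A minimal sketch, borrowing the fakestream parameters shown later in this document's Join example; the result_table_name value is an illustrative assumption:
fakestream {
content = ["Hello World,seatunnel"]
rate = 1
result_table_name = "fake_log"
}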
File
Reads raw data from local files.
Supported file formats: csv, json, parquet, xml, orc, and text.
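A minimal sketch of a file input, following the same path/format conventions as the other file-based inputs in this document; the path and table name are illustrative:
file {
path = "file:///var/seatunnel-logs/access.log"
format = "text"
result_table_name = "access_log"
}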
FileStream
Reads raw data from a local file directory, watching for newly created files.
fileStream {
path = "file:///var/log/"
}
Alternatively, specify a format:
fileStream {
path = "file:///var/log/"
format = "xml"
rowTag = "book"
}
HdfsStream
Watches an HDFS directory for file changes, loading and processing new files in real time to form a file-processing stream.
hdfsStream {
path = "hdfs:///access/log/"
}
Alternatively, specify an HDFS name service:
hdfsStream {
path = "hdfs://m2:8022/access/log/"
}
Alternatively, specify a format:
hdfsStream {
path = "hdfs://m2:8022/access/log/"
format = "xml"
rowTag = "book"
}
Jdbc
Reads data from an external data source via JDBC.
jdbc {
driver = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/info"
table = "access"
result_table_name = "access_log"
user = "username"
password = "password"
}
jdbc {
driver = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/info"
table = "(select * from access) AS a"
result_table_name = "access_log"
user = "username"
password = "password"
}
Reading MySQL data via JDBC with partitioned reads:
jdbc {
driver = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/info"
table = "access"
result_table_name = "access_log"
user = "username"
password = "password"
jdbc.partitionColumn = "item_id"
jdbc.numPartitions = "10"
jdbc.lowerBound = 0
jdbc.upperBound = 100
}
KafkaStream
Consumes data from Kafka. Supported Kafka versions: >= 0.10.0.
- Spark Streaming
kafkaStream {
topics = "seatunnel"
consumer.bootstrap.servers = "localhost:9092"
consumer.group.id = "seatunnel_group"
}
- Spark Structured Streaming
kafkaStream {
topics = "seatunnel"
consumer.bootstrap.servers = "localhost:9092"
consumer.group.id = "seatunnel_group"
consumer.failOnDataLoss = false
}
- In Spark Structured Streaming mode, if the data in Kafka is JSON, you can specify a JSON schema and the input will be parsed accordingly. If you need stream-join functionality, you must also specify the table_name parameter so the stream can be referenced in the sql plugin, as follows:
kafkaStream {
topics = "seatunnel"
consumer.bootstrap.servers = "localhost:9092"
consumer.group.id = "seatunnel_group"
consumer.rebalance.max.retries = 100
consumer.failOnDataLoss = false
schema = "{\"name\":\"string\",\"age\":\"integer\",\"addrs\":{\"country\":\"string\",\"city\":\"string\"}}"
}
- In Spark Streaming mode, all data is treated as strings, producing rows in the following format:
+--------------+-------------------------------+
| topic | raw_message |
+--------------+-------------------------------+
| topic_name | kafka_message_1 |
+--------------+-------------------------------+
| topic_name | kafka_message_2 |
+--------------+-------------------------------+
- The data can then be processed in the filter section using plugins such as json, grok, and split.
Kudu
kudu {
kudu_master = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
kudu_table = "my_kudu_table"
result_table_name = "reg_table"
}
MongoDB
mongodb {
readconfig.uri = "mongodb://myhost:myport"
readconfig.database = "mydatabase"
readconfig.collection = "mycollection"
readconfig.spark.mongodb.input.partitioner = "MongoPaginateBySizePartitioner"
schema = "{\"name\":\"string\",\"age\":\"integer\",\"addrs\":{\"country\":\"string\",\"city\":\"string\"}}"
result_table_name = "test"
}
MySQL
mysql {
url = "jdbc:mysql://localhost:3306/info"
table = "access"
result_table_name = "access_log"
user = "username"
password = "password"
}
mysql {
url = "jdbc:mysql://localhost:3306/info"
table = "(select * from access) AS a"
result_table_name = "access_log"
user = "username"
password = "password"
}
Reading data from MySQL with partitioned reads:
mysql {
url = "jdbc:mysql://localhost:3306/info"
table = "access"
result_table_name = "access_log"
user = "username"
password = "password"
jdbc.partitionColumn = "item_id"
jdbc.numPartitions = "10"
jdbc.lowerBound = 0
jdbc.upperBound = 100
}
Hive
- Note: starting from seatunnel v1.3.4, the following configuration is required when using the hive input:
spark {
...
spark.sql.catalogImplementation = "hive"
...
}
input {
hive {
pre_sql = "select * from mydb.mytb"
result_table_name = "myTable"
}
}
...
S3Stream
Reads from an S3 cloud storage path. Currently supported path schemes: s3://, s3a://, s3n://.
s3Stream {
path = "s3n://bucket/access.log"
}
SocketStream
Uses a socket as the data source.
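The original text gives no example for this plugin. A minimal sketch follows; the host and port parameter names are assumptions not confirmed by this document:
socketStream {
host = "localhost"
port = 9999
}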
RedisStream
Uses a Redis cluster as the data source, with a queue as the data input.
RedisStream {
host = "127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002"
prefKey = ""
queue = "test"
password = "root"
}
Redis
Reads data from Redis.
Redis {
host = "192.168.1.100"
port = 6379
key_pattern = "*keys*"
partition = 20
db_num = 2
result_table_name = "redis_result_table"
}
Tidb
Reads data from TiDB via TiSpark. Currently only Spark 2.1 is supported.
- When using the TiDB input, spark.tispark.pd.addresses and spark.sql.extensions must be configured in spark-defaults.conf or in the seatunnel configuration file:
spark {
...
spark.tispark.pd.addresses = "localhost:2379"
spark.sql.extensions = "org.apache.spark.sql.TiExtensions"
}
input {
tidb {
database = "test"
pre_sql = "select * from test.my_table"
result_table_name = "myTable"
}
}
Filter Plugins
Add
Adds a new field to the source data.
add {
value = "1"
}
Adds a new field whose value is 1.
Checksum
Computes a checksum of the specified field.
checksum {
source_field = "deviceId"
target_field = "device_crc32"
method = "CRC32"
}
Computes the CRC32 checksum of the deviceId field.
Convert
Converts the specified field to another type.
convert {
source_field = "age"
new_type = "integer"
}
Converts the age field in the source data to integer type.
Date
Converts the time format of the specified field.
date {
source_field = "timestamp"
target_field = "date"
source_time_format = "UNIX"
target_time_format = "yyyy/MM/dd"
}
- Converts the timestamp field in the source data from a UNIX timestamp (e.g. 1517128894) to a date field in yyyy/MM/dd format (e.g. 2018/01/28).
date {
source_field = "httpdate"
target_field = "datetime"
source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
target_time_format = "yyyy/MM/dd HH:mm:ss"
}
- Converts the httpdate field in the source data from dd/MMM/yyyy:HH:mm:ss Z format to a datetime field in yyyy/MM/dd HH:mm:ss format.
Drop
Drops rows that match the specified condition.
drop {
condition = "status = '200'"
}
Rows with status code 200 will be dropped.
Grok
Parses a field using a Grok pattern.
grok {
source_field = "raw_message"
pattern = "%{WORD:name} is %{WORD:gender}, %{NUMBER:age} years old and weighs %{NUMBER:weight} kilograms"
target_field = "info_detail"
}
- Input
+----------------------------------------------------+
|raw_message                                         |
+----------------------------------------------------+
|gary is male, 25 years old and weighs 68.5 kilograms|
|gary is male, 25 years old and weighs 68.5 kilograms|
+----------------------------------------------------+
- Output
+----------------------------------------------------+------------------------------------------------------------+
|raw_message                                         |info_detail                                                 |
+----------------------------------------------------+------------------------------------------------------------+
|gary is male, 25 years old and weighs 68.5 kilograms|Map(age -> 25, gender -> male, name -> gary, weight -> 68.5)|
|gary is male, 25 years old and weighs 68.5 kilograms|Map(age -> 25, gender -> male, name -> gary, weight -> 68.5)|
+----------------------------------------------------+------------------------------------------------------------+
Join
Joins the stream with the specified temporary table. Currently only stream-static inner joins are supported.
input {
fakestream {
content = ["Hello World,seatunnel"]
rate = 1
}
mysql {
url = "jdbc:mysql://localhost:3306/info"
table = "project_info"
table_name = "spark_project_info"
user = "username"
password = "password"
}
}
filter {
split {
fields = ["msg", "project"]
delimiter = ","
}
join {
table_name = "spark_project_info"
source_field = "project"
}
}
Json
Parses the specified field of the source dataset as JSON.
1. Without target_field
json {
source_field = "message"
}
- Input
+----------------------------+
|message                     |
+----------------------------+
|{"name": "ricky", "age": 24}|
|{"name": "gary", "age": 28} |
+----------------------------+
- Output
+----------------------------+---+-----+
|message                     |age|name |
+----------------------------+---+-----+
|{"name": "gary", "age": 28} |28 |gary |
|{"name": "ricky", "age": 23}|23 |ricky|
+----------------------------+---+-----+
2. With target_field
Using target_field stores the parsed nested result in the specified field.
json {
source_field = "message"
target_field = "info"
result_table_name = "view_1"
}
- Input
+----------------------------+
|message                     |
+----------------------------+
|{"name": "ricky", "age": 24}|
|{"name": "gary", "age": 28} |
+----------------------------+
- Output
+----------------------------+----------+
|message                     |info      |
+----------------------------+----------+
|{"name": "gary", "age": 28} |[28,gary] |
|{"name": "ricky", "age": 23}|[23,ricky]|
+----------------------------+----------+
The parsed result supports SQL statements such as *select * from view_1 where info.age = 23*.
3. With schema_file
json {
source_field = "message"
schema_file = "demo.json"
}
- Schema
Place the following content in /opt/seatunnel/plugins/json/files/schemas/demo.json on the Driver Node:
{
"name": "demo",
"age": 24,
"city": "LA"
}
- Input
+----------------------------+
|message                     |
+----------------------------+
|{"name": "ricky", "age": 24}|
|{"name": "gary", "age": 28} |
+----------------------------+
- Output
+----------------------------+---+-----+-----+
|message                     |age|name |city |
+----------------------------+---+-----+-----+
|{"name": "gary", "age": 28} |28 |gary |null |
|{"name": "ricky", "age": 23}|23 |ricky|null |
+----------------------------+---+-----+-----+
> When deploying in cluster mode, make sure the json schemas directory is packaged into plugins.tar.gz.
Kv
Extracts all key-value pairs from the specified field; commonly used to parse keys and values from URL parameters.
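The original text gives no example for this plugin. A minimal sketch follows; the field_split and value_split parameter names are assumptions not confirmed by this document:
kv {
source_field = "url_params"
field_split = "&"
value_split = "="
}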
Lowercase
Converts the content of the specified field to lowercase.
lowercase {
source_field = "address"
target_field = "address_lowercased"
}
Remove
Removes fields from the data.
remove {
source_field = ["field1", "field2"]
}
Removes the field1 and field2 fields from the source data.
Rename
Renames a field.
rename {
source_field = "field1"
target_field = "field2"
}
Renames the field1 field in the source data to field2.
Repartition
Adjusts the number of partitions (the parallelism) of data processing. This filter only tunes processing performance; it does not modify the data itself.
repartition {
num_partitions = 8
}
Replace
Replaces the content of the specified field according to a regular expression.
replace {
target_field = "tmp"
source_field = "message"
pattern = "is"
replacement = "are"
}
Replaces "is" in message with "are" and assigns the result to tmp.
Sample
Samples the source dataset.
sample {
fraction = 0.8
}
Samples 80% of the data.
Script
Parses and executes the logic in a custom script: it receives the JSONObject specified by object_name (default: event), applies the custom processing logic, and returns a new event.
Plugin configuration in the conf file:
script {
script_name = "my_script.ql"
}
Custom script (my_script.ql):
newEvent = new java.util.HashMap();
you = event.getString("name");
age = event.getLong("age");
if (age > 10) {
newEvent.put("name", you);
}
return newEvent;
If age is greater than 10, the name is put into the map and returned.
Split
Splits a string by the delimiter.
split {
source_field = "message"
delimiter = "&"
fields = ["field1", "field2"]
}
Splits the message field in the source data by **&**; the resulting values can be fetched with field1 or field2 as the key.
split {
source_field = "message"
target_field = "info"
delimiter = ","
fields = ["field1", "field2"]
}
Splits the message field in the source data by **,**, storing the result in the info field; the values can be fetched with info.field1 or info.field2 as the key.
SQL
Processes data with SQL, with support for Spark's rich set of UDFs.
sql {
sql = "select username, address from user_info"
}
- Keeps only the username and address fields, discarding the rest. user_info is the result_table_name configured by a previous plugin.
sql {
sql = "select substring(telephone, 0, 10) from user_info"
}
- Uses the substring function to truncate the telephone field.
sql {
sql = "select avg(age) from user_info"
table_name = "user_info"
}
- Uses the avg function to aggregate the source dataset, computing the average age.
Table
Table maps a static file to a table that can be joined with a real-time stream; commonly used for dictionary-table joins such as user nicknames or country/province/city lookups.
- Without column types (columns default to string):
table {
table_name = "mydict"
path = "/user/seatunnel/mylog/a.txt"
fields = ['city', 'population']
}
- With column types:
table {
table_name = "mydict"
path = "/user/seatunnel/mylog/a.txt"
fields = ['city', 'population']
field_types = ['string', 'long']
}
Truncate
Truncates the string in the specified field.
truncate {
source_field = "telephone"
max_length = 10
}
Uppercase
Converts the content of the specified field to uppercase.
uppercase {
source_field = "username"
target_field = "username_uppercased"
}
Urlencode
urlencode {
source_field = "url"
}
- The UrlEncode method is registered as a UDF and can be used directly in the SQL plugin:
sql {
sql = "select urlencode(url) as url from view_1"
}
Urldecode
urldecode {
source_field = "url"
}
- The UrlDecode method is registered as a UDF and can be used directly in the SQL plugin:
sql {
sql = "select urldecode(url) as url from view_1"
}
Uuid
Adds a globally unique, monotonically increasing UUID field to the source dataset, using Spark's monotonically_increasing_id() function.
uuid {
target_field = "id"
}
Watermark
Watermarking for Spark Structured Streaming.
Watermark {
delay_threshold = "5 minutes"
time_field = "tf"
time_type = "UNIX"
watermark_field = "wm"
}
Output Plugins
Alluxio
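This section has no example in the original text. A minimal sketch, mirroring the path/format conventions of the Alluxio input and the File output in this document (the path and format values are illustrative):
alluxio {
path = "alluxio:///var/seatunnel-logs"
format = "json"
}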
Clickhouse
clickhouse {
host = "localhost:8123"
clickhouse.socket_timeout = 50000
database = "nginx"
table = "access_msg"
fields = ["date", "datetime", "hostname", "http_code", "data_size", "ua", "request_time"]
username = "username"
password = "password"
bulk_size = 20000
}
ClickHouse {
host = "localhost:8123"
database = "nginx"
table = "access_msg"
fields = ["date", "datetime", "hostname", "http_code", "data_size", "ua", "request_time"]
username = "username"
password = "password"
bulk_size = 20000
retry_codes = [209, 210]
retry = 3
}
Retries the write 3 times on network timeouts or other network errors.
- Distributed table configuration:
ClickHouse {
host = "localhost:8123"
database = "nginx"
table = "access_msg"
cluster = "no_replica_cluster"
fields = ["date", "datetime", "hostname", "http_code", "data_size", "ua", "request_time"]
}
- Based on the given cluster name, the nodes on which the table is actually distributed are looked up from the system.clusters table. The data of each Spark partition is written to one ClickHouse node chosen by a random strategy.
Elasticsearch
elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel"
}
- Writes the results to the index named seatunnel in the Elasticsearch cluster.
elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-${now}"
es.batch.size.entries = 100000
index_time_format = "yyyy.MM.dd"
}
- Creates one index per day, e.g. seatunnel-2017.11.03.
File
file {
path = "file:///var/logs"
format = "text"
}
Hdfs
hdfs {
path = "hdfs:///var/logs-${now}"
format = "json"
path_time_format = "yyyy.MM.dd"
}
- Generates one HDFS file per day, e.g. logs-2018.02.12.
Hive
output {
Hive {
sql = "insert overwrite table seatunnel.test1 partition(province) select name,age,province from myTable2"
}
}
output {
Hive {
source_table_name = "myTable2"
result_table_name = "seatunnel.test1"
save_mode = "overwrite"
sink_columns = "name,age,province"
partition_by = ["province"]
}
}
Jdbc
Writes data to an external data source via JDBC.
- Spark Streaming:
jdbc {
driver = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/info"
table = "access"
user = "username"
password = "password"
save_mode = "append"
}
- Structured Streaming:
jdbc {
driver = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/info"
table = "access"
user = "username"
password = "password"
}
Kafka
- Spark Streaming or batch:
kafka {
topic = "seatunnel"
producer.bootstrap.servers = "localhost:9092"
}
- Structured Streaming:
kafka {
topic = "seatunnel"
producer.bootstrap.servers = "localhost:9092"
streaming_output_mode = "update"
checkpointLocation = "/your/path"
}
Kudu
kudu {
kudu_master = "hadoop01:7051,hadoop02:7051,hadoop03:7051"
kudu_table = "my_kudu_table"
mode = "upsert"
}
MongoDB
- Spark Streaming or batch:
mongodb {
writeconfig.uri = "mongodb://myhost:myport"
writeconfig.database = "mydatabase"
writeconfig.collection = "mycollection"
}
- Structured Streaming:
mongodb {
writeconfig.host = "myhost"
writeconfig.port = 27017
writeconfig.database = "mydatabase"
writeconfig.collection = "mycollection"
mongo_output_mode = "updateOne"
update_fields = "id,name"
streaming_output_mode = "update"
checkpointLocation = "/your/path"
}
MySQL
mysql {
url = "jdbc:mysql://localhost:3306/info"
table = "access"
user = "username"
password = "password"
save_mode = "append"
}
Opentsdb
opentsdb {
postUrl = "http://localhost:4222/api/put?summary"
metric = "test_metric"
tags_fields = ["col1", "col2", "col3"]
measures = ["men1", "men2"]
value_fields = "timestamps"
}
S3
Writes data to S3 files.
s3 {
path = "s3a://var/logs"
format = "parquet"
}
Stdout
Writes data to standard output / the terminal; commonly used for debugging, since it makes inspecting the data easy.
stdout {
limit = 10
format = "json"
}
Tidb
Writes data to TiDB via JDBC.
tidb {
url = "jdbc:mysql://127.0.0.1:4000/test?useUnicode=true&characterEncoding=utf8"
table = "access"
user = "username"
password = "password"
save_mode = "append"
}