seatunnel配置 ETL


seatunnel配置

通用配置

一个完整的seatunnel配置包含spark, input, filter, output, 即:

 一个完整的seatunnel配置包含spark, input, filter, output, 即:
 
 spark {
     ...
 }
 
 input {
     ...
 }
 
 filter {
     ...
 }
 
 output {
     ...
 }

Input插件

  • Alluxio

    从Alluxio中读取文件的格式,目前支持csvjsonparquetxmlorctext

     alluxio {
         path = "alluxio:///var/seatunnel-logs"
         result_table_name = "access_log"
         format = "json"
     }
  • ElasticSearch

     elasticsearch {
         hosts = ["localhost:9200"]
         index = "seatunnel-20190424"
         result_table_name = "my_dataset"
       }
     elasticsearch {
         hosts = ["localhost:9200"]
         index = "seatunnel-*"
         es.read.field.include = "name, age"
         result_table_name = "my_dataset"
       }
     匹配所有以 seatunnel- 开头的索引, 并且仅仅读取 name和 age 两个字段。
  • FakeStream

Fake Input主要用于方便得生成用户指定的数据,作为输入来对seatunnel进行功能验证,测试,以及性能测试等。

  • File

从本地文件中读取原始数据

文件的格式,目前支持csvjsonparquetxmlorctext.

  • FileStream

从本地文件目录中读取原始数据,会监听新文件生成

 fileStream {
     path = "file:///var/log/"
 }
 或者指定format
 fileStream {
     path = "file:///var/log/"
     format = "xml"
     rowTag = "book"
 }
  • HdfsStream

监听HDFS目录中的文件变化,实时加载并处理新文件,形成文件处理流

 hdfsStream {
     path = "hdfs:///access/log/"
 }
 或者可以指定 hdfs name service:
 
 hdfsStream {
     path = "hdfs://m2:8022/access/log/"
 }
 或者指定format
 
 hdfsStream {
     path = "hdfs://m2:8022/access/log/"
     format = "xml"
     rowTag = "book"
 }

  • Jdbc

通过JDBC读取外部数据源数据

jdbc {
     driver = "com.mysql.jdbc.Driver"
     url = "jdbc:mysql://localhost:3306/info"
     table = "access"
     result_table_name = "access_log"
     user = "username"
     password = "password"
 }
 jdbc {
     driver = "com.mysql.jdbc.Driver"
     url = "jdbc:mysql://localhost:3306/info"
     table = "(select * from access) AS a"
     result_table_name = "access_log"
     user = "username"
     password = "password"
 }
 通过JDBC读取MySQL数据
 jdbc {
     driver = "com.mysql.jdbc.Driver"
     url = "jdbc:mysql://localhost:3306/info"
     table = "access"
     result_table_name = "access_log"
     user = "username"
     password = "password"
     jdbc.partitionColumn = "item_id"
     jdbc.numPartitions = "10"
     jdbc.lowerBound = 0
     jdbc.upperBound = 100
 }
  • KafkaStream

从Kafka消费数据,支持的Kafka版本 >= 0.10.0

-Spark Streaming
 kafkaStream {
     topics = "seatunnel"
     consumer.bootstrap.servers = "localhost:9092"
     consumer.group.id = "seatunnel_group"
 }
 -Spark Structured Streaming
 kafkaStream {
     topics = "seatunnel"
     consumer.bootstrap.servers = "localhost:9092"
     consumer.group.id = "seatunnel_group"
     consumer.failOnDataLoss = false
 }
 -在 Spark Structured Streaming 模式下,如果kafka里的数据是json格式,可以指定json的schema,input 将按照指定的schema进行解析,如果你需要流关联功能,还需要指定 table_name 这个参数以便在sql插件中使用 如下
 kafkaStream {
   topics = "seatunnel"
   consumer.bootstrap.servers = "localhost:9092"
   consumer.group.id = "seatunnel_group"
   consumer.rebalance.max.retries = 100
   consumer.failOnDataLoss = false
   schema = "{\"name\":\"string\",\"age\":\"integer\",\"addrs\":{\"country\":\"string\",\"city\":\"string\"}}"
 }
 -在 Spark Streaming 模式下,会将数据统一按照字符串进行处理,生成如下格式
 +--------------+-------------------------------+
 | topic        |       raw_message             |
 +--------------+-------------------------------+
 | topic_name   |     kafka_message_1           |
 +--------------+-------------------------------+
 | topic_name   |     kafka_message_2           |
 +--------------+-------------------------------+
 -后续可以在 filter 部分使用 json、grok、split 等插件进行处理
  • Kudu

     kudu{
        kudu_master="hadoop01:7051,hadoop02:7051,hadoop03:7051"
        kudu_table="my_kudu_table"
        result_table_name="reg_table"
      }

  • MongoDB

     mongodb{
             readconfig.uri="mongodb://myhost:mypost"
             readconfig.database="mydatabase"
             readconfig.collection="mycollection"
             readconfig.spark.mongodb.input.partitioner = "MongoPaginateBySizePartitioner"
             schema="{\"name\":\"string\",\"age\":\"integer\",\"addrs\":{\"country\":\"string\",\"city\":\"string\"}}"
             result_table_name = "test"
           }

  • MySQL

     mysql {
         url = "jdbc:mysql://localhost:3306/info"
         table = "access"
         result_table_name = "access_log"
         user = "username"
         password = "password"
     }
     mysql {
         url = "jdbc:mysql://localhost:3306/info"
         table = "(select * from access) AS a"
         result_table_name = "access_log"
         user = "username"
         password = "password"
     }
     从MySQL中读取数据
     mysql {
         url = "jdbc:mysql://localhost:3306/info"
         table = "access"
         result_table_name = "access_log"
         user = "username"
         password = "password"
         jdbc.partitionColumn = "item_id"
         jdbc.numPartitions = "10"
         jdbc.lowerBound = 0
         jdbc.upperBound = 100
     }

  • Hive

     -注意:从seatunnel v1.3.4 开始,使用hive input必须做如下配置
     spark {
       ...
       spark.sql.catalogImplementation = "hive"
       ...
     }
     input {
       hive {
         pre_sql = "select * from mydb.mytb"
         result_table_name = "myTable"
       }
     }
     ...

  • S3Stream

    S3云存储路径,当前支持的路径格式有s3://, s3a://, s3n:// 

  • s3Stream {
         path = "s3n://bucket/access.log"
     }
  • SocketStream

    Socket作为数据源

  • RedisStream

    Redis集群作为数据源,以队列作为数据输入

     RedisStream {
         host = "127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002"
         prefKey = ""
         queue = "test"
         password = "root"
     }
  • Redis

    从Redis中读取数据

     Redis {
         host = "192.168.1.100"
         port = 6379
         key_pattern = "*keys*"
         partition = 20
         db_num = 2
         result_table_name = "reids_result_table"
     }
  • Tidb

    通过TiSpark从TiDB数据库中读取数据,当前仅仅支持Spark 2.1

     -使用TiDB Input必须在spark-defaults.conf或者seatunnel配置文件中配置spark.tispark.pd.addresses和spark.sql.extensions
     spark {
       ...
       spark.tispark.pd.addresses = "localhost:2379"
       spark.sql.extensions = "org.apache.spark.sql.TiExtensions"
     }
     input {
         tidb {
             database = "test"
             pre_sql = "select * from test.my_table"
             result_table_name = "myTable"
         }
     }

Filter插件

  • Add

    在源数据中新增一个字段

     add {
         value = "1"
     }
     新增一个字段,其值为1
  • Checksum

    获取指定字段的校验码

    checksum {
         source_field = "deviceId"
         target_field = "device_crc32"
         method = "CRC32"
     }
     获取deviceId字段CRC32校验码
  • Convert

    对指定字段进行类型转换

     convert {
         source_field = "age"
         new_type = "integer"
     }
     将源数据中的age字段转换为integer类型
  • Date

    对指定字段进行时间格式转换

     date {
         source_field = "timestamp"
         target_field = "date"
         source_time_format = "UNIX"
         target_time_format = "yyyy/MM/dd"
     }
     -将源数据中的timestamp字段由UNIX时间戳,例如1517128894转换为yyyy/MM/dd格式的date字段,例如2018/01/28
     date {
         source_field = "httpdate"
         target_field = "datetime"
         source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
         target_time_format = "yyyy/MM/dd HH:mm:ss"
     }
     -将源数据中的httpdate字段由dd/MMM/yyyy:HH:mm:ss Z格式转化为yyyy/MM/dd HH:mm:ss格式的datetime字段
  • Drop

    丢弃掉符合指定条件的Row

     drop {
         condition = "status = '200'"
     }
     状态码为200的Row将被丢弃
  • Grok

    使用Grok Pattern来解析字段

    grok {
         source_field = "raw_message"
         pattern = "%{WORD:name} is %{WORD:gender}, %{NUMBER:age} years old and weighs %{NUMBER:weight} kilograms"
         target_field = "info_detail"
     }
     -Input
     +----------------------------------------------------+
     |raw_message                                         |
     +----------------------------------------------------+
     |gary is male, 25 years old and weighs 68.5 kilograms|
     |gary is male, 25 years old and weighs 68.5 kilograms|
     +----------------------------------------------------+
     -Output
     +----------------------+--------------------------+
     |raw_message           |info_detail               |
     +----------------------+--------------------------+
     |gary is male, 25 years old and weighs 68.5 kilograms|Map(age -> 25, gender -> male, name -> gary, weight -> 68.5)|
     |gary is male, 25 years old and weighs 68.5 kilograms|Map(age -> 25, gender -> male, name -> gary, weight -> 68.5)|
     +----------------------------------------------------+------------------------------------------------------------+
  • Join
  • 和指定的临时表进行Join操作, 目前仅支持Stream-static Inner Joins

     input {
       fakestream {
         content = ["Hello World,seatunnel"]
         rate = 1
       }
       mysql {
         url = "jdbc:mysql://localhost:3306/info"
         table = "project_info"
         table_name = "spark_project_info"
         user = "username"
         password = "password"
       }
     }
     filter {
       split {
         fields = ["msg", "project"]
         delimiter = ","
       }
       join {
         table_name = "spark_project_info"
         source_field = "project"
       }
     }
  • Json

    对原始数据集指定字段进行Json解析

     1.不使用 target_field
     json {
         source_field = "message"
     }
     -Input
     +----------------------------+
     |message                   |
     +----------------------------+
     |{"name": "ricky", "age": 24}|
     |{"name": "gary", "age": 28} |
     +----------------------------+
     -Output
     +----------------------------+---+-----+
     |message                   |age|name |
     +----------------------------+---+-----+
     |{"name": "gary", "age": 28} |28 |gary |
     |{"name": "ricky", "age": 23}|23 |ricky|
     +----------------------------+---+-----+
     
     2.使用 target_field
     使用 target_field 会将解析后的嵌套结果存储在指定字段中。
     json {
         source_field = "message"
         target_field = "info"
         result_table_name = "view_1"
     }
     -Input
     +----------------------------+
     |message                   |
     +----------------------------+
     |{"name": "ricky", "age": 24}|
     |{"name": "gary", "age": 28} |
     +----------------------------+
     -Output
     +----------------------------+----------+
     |message                   |info      |
     +----------------------------+----------+
     |{"name": "gary", "age": 28} |[28,gary] |
     |{"name": "ricky", "age": 23}|[23,ricky]|
     +----------------------------+----------+
     json处理的结果支持*select * from view_1 where info.age = 23*此类SQL语句
     
     3.使用schema_file
     json {
         source_field = "message"
         schema_file = "demo.json"
     }
     -Schema
     在 Driver Node 的 /opt/seatunnel/plugins/json/files/schemas/demo.json 中放置内容如下:
     {
        "name": "demo",
        "age": 24,
        "city": "LA"
     }
     -Input ```
     +----+ |message | +--+ |
     {"name": "ricky", "age": 24}| |{"name": "gary", "age": 28} | +----------------------------+
     
     
     -Output
     +-----------+ |message |age|name |city | +-----+-----+ |
     {"name": "gary", "age": 28} |28 |gary |null | |{"name": "ricky", "age": 23}|23 |ricky|null | +----------------------------+---+-----+-----+
     
     > 若使用 cluster 模式进行部署,需确保 json schemas 目录被打包到 plugins.tar.gz 中
  • Kv

    提取指定字段所有的Key-Value, 常用于解析url参数中的key和value

  • Lowercase

    将指定字段内容全部转换为小写字母

     lowercase {
         source_field = "address"
         target_field = "address_lowercased"
     }
  • Remove

    删除数据中的字段

     remove {
         source_field = ["field1", "field2"]
     }
     删除原始数据中的field1和field2字段
  • Rename

    变更之后的字段名

     rename {
         source_field = "field1"
         target_field = "field2"
     }
     将原始数据中的field1字段重命名为field2字段
  • Repartition

    调整数据处理的分区个数,并行度。这个filter主要是为了调节数据处理性能,不对数据本身做任何处理。

     repartition {
         num_partitions = 8
     }
  • Replace

    将指定字段内容根据正则表达式进行替换

     replace {
         target_field = "tmp"
         source_field = "message"
         pattern = "is"
         replacement = "are"
     }
     将message中的is替换为are,并赋值给tmp
  • Sample

    对原始数据集进行抽样

     sample {
         fraction = 0.8
     }
     抽取80%的数据
  • Script

    解析并执行自定义脚本中逻辑, 即接受object_name(默认是event) 指定的JSONObject, 完成自定义的处理逻辑,再返回一个新的event

     conf文件插件配置
       script {
         script_name = "my_script.ql"
       }
     自定义脚本(my_script.ql)
     newEvent = new java.util.HashMap();
     you = event.getString("name");
     age = event.getLong("age");
     if(age > 10){
     newEvent.put("name",you);
     }
     return newEvent;
     如果age大于10,则获取name放入map中并返回
  • Split

    根据delimiter分割字符串

     split {
         source_field = "message"
         delimiter = "&"
         fields = ["field1", "field2"]
     }
     将源数据中的message字段根据**&**进行分割,可以以field1或field2为key获取相应value
     split {
         source_field = "message"
         target_field = "info"
         delimiter = ","
         fields = ["field1", "field2"]
     }
     将源数据中的message字段根据**,**进行分割,分割后的字段为info,可以以info.field1或info.field2为key获取相应value
  • SQL

    使用SQL处理数据,支持Spark丰富的UDF函数

     sql {
         sql = "select username, address from user_info",
     }
     -仅保留username和address字段,将丢弃其余字段。user_info 为之前插件配置的 result_table_name
     sql {
         sql = "select substring(telephone, 0, 10) from user_info",
     }
     -使用substring functions对telephone字段进行截取操作
     sql {
         sql = "select avg(age) from user_info",
         table_name = "user_info"
     }
     -使用avg functions对原始数据集进行聚合操作,取出age平均值
  • Table

    Table 用于将静态文件映射为一张表,可与实时处理的流进行关联,常用于用户昵称,国家省市等字典表关联

     -不指定列的类型,默认为string
     table {
         table_name = "mydict"
         path = "/user/seatunnel/mylog/a.txt"
         fields = ['city', 'population']
     }
     -指定列的类型
     table {
         table_name = "mydict"
         path = "/user/seatunnel/mylog/a.txt"
         fields = ['city', 'population']
         field_types = ['string', 'long']
     }
  • Truncate

    对指定字段进行字符串截取

     truncate {
         source_field = "telephone"
         max_length = 10
     }
  • Uppercase

    将指定字段内容全部转换为大写字母

     uppercase {
         source_field = "username"
         target_field = "username_uppercased"
     }
  • Urlencode

     urlencode {
         source_field = "url"
     }
     UrlEncode 方法已经注册为 UDF,可以直接在 SQL 插件中使用
     -
     sql {
         sql = "select urlencode(url) as url from view_1"
     }
  • Urldecode

     urldecode {
         source_field = "url"
     }
     -UrlDecode 方法已经注册为 UDF,可以直接在 SQL 插件中使用
     sql {
         sql = "select urldecode(url) as url from view_1"
     }
  • Uuid

    为原始数据集新增一个全局唯一且自增的UUID字段,使用的是spark的monotonically_increasing_id()函数。

     uuid {
         target_field = "id"
     }
  • Watermark

    Spark结构化流式水印

     Watermark {
              delay_threshold = "5 minutes"
              time_field = "tf"
              time_type = "UNIX"
              watermark_field = "wm"
     }


Output插件

  • Alluxio

  • Clickhouse

     clickhouse {
         host = "localhost:8123"
         clickhouse.socket_timeout = 50000
         database = "nginx"
         table = "access_msg"
         fields = ["date", "datetime", "hostname", "http_code", "data_size", "ua", "request_time"]
         username = "username"
         password = "password"
         bulk_size = 20000
     }
     
     ClickHouse {
         host = "localhost:8123"
         database = "nginx"
         table = "access_msg"
         fields = ["date", "datetime", "hostname", "http_code", "data_size", "ua", "request_time"]
         username = "username"
         password = "password"
         bulk_size = 20000
         retry_codes = [209, 210]
         retry = 3
     }
     当出现网络超时或者网络异常的情况下,重试写入3次
     -分布式表配置
     ClickHouse {
         host = "localhost:8123"
         database = "nginx"
         table = "access_msg"
         cluster = "no_replica_cluster"
         fields = ["date", "datetime", "hostname", "http_code", "data_size", "ua", "request_time"]
     }
     -根据提供的cluster名称,会从system.clusters表里面获取当前table实际分布在那些节点上。
    单spark partition的数据会根据随机策略选择某一个ClickHouse节点执行具体的写入操作
  • Elasticsearch

     elasticsearch {
         hosts = ["localhost:9200"]
         index = "seatunnel"
     }
     -将结果写入Elasticsearch集群的名称为seatunnel的index中
     elasticsearch {
         hosts = ["localhost:9200"]
         index = "seatunnel-${now}"
         es.batch.size.entries = 100000
         index_time_format = "yyyy.MM.dd"
     }
     -按天创建索引,例如 seatunnel-2017.11.03
  • File

     file {
         path = "file:///var/logs"
         format = "text"
     }
  • Hdfs

     hdfs {
         path = "hdfs:///var/logs-${now}"
         format = "json"
         path_time_format = "yyyy.MM.dd"
     }
     -按天生成HDFS文件,例如logs-2018.02.12
  • Hive

     -output {
       Hive {
         sql = "insert overwrite table seatunnel.test1 partition(province) select name,age,province from myTable2"
       }
     }
     -output {
       Hive {
         source_table_name = "myTable2"
         result_table_name = "seatunnel.test1"
         save_mode = "overwrite"
         sink_columns = "name,age,province"
         partition_by = ["province"]
       }
     }
  • Jdbc

    通过JDBC输出数据到外部数据源

     -spark streaming
     jdbc {
         driver = "com.mysql.jdbc.Driver"
         url = "jdbc:mysql://localhost:3306/info"
         table = "access"
         user = "username"
         password = "password"
         save_mode = "append"
     }
     -structured streaming
     jdbc {
         driver = "com.mysql.jdbc.Driver"
         url = "jdbc:mysql://localhost:3306/info"
         table = "access"
         user = "username"
         password = "password"
     }
  • Kafka

     -spark streaming or batch
     kafka {
         topic = "seatunnel"
         producer.bootstrap.servers = "localhost:9092"
     }
     -structured streaming
     kafka {
         topic = "seatunnel"
         producer.bootstrap.servers = "localhost:9092"
         streaming_output_mode = "update"
         checkpointLocation = "/your/path"
     }
  • Kudu

     kudu{
        kudu_master="hadoop01:7051,hadoop02:7051,hadoop03:7051"
        kudu_table="my_kudu_table"
        mode="upsert"
      }
  • MongoDB

     -spark streaming or batch
     mongodb{
         writeconfig.uri="mongodb://myhost:mypost"
         writeconfig.database="mydatabase"
         writeconfig.collection="mycollection"
     }
     -structured streaming
     mongodb{
         writeconfig.host="my host"
         writeconfig.port=27017
         writeconfig.database="mydatabase"
         writeconfig.collection="mycollection"
         mongo_output_mode = "updateOne"
         update_fields = "id,name"
         streaming_output_mode = "update"
         checkpointLocation = "/your/path"
     }
  • MySQL

     mysql {
         url = "jdbc:mysql://localhost:3306/info"
         table = "access"
         user = "username"
         password = "password"
         save_mode = "append"
     }
  • Opentsdb

     opentsdb{
         postUrl = "http://localhost:4222/api/put?summary"
         metric = "test_metric"
         tags_fields = ["col1","col2","col3"]
         measures = ["men1","men2"]
         value_fields = "timestamps"
     }
  • S3

    输出数据到S3文件

     s3 {
         path = "s3a://var/logs"
         format = "parquet"
     }
  • Stdout

    输出数据到标准输出/终端, 常用于debug, 能够很方便输出数据.

     stdout {
         limit = 10
         format = "json"
     }
  • Tidb

    通过JDBC将数据写入TiDB

     tidb {
         url = "jdbc:mysql://127.0.0.1:4000/test?useUnicode=true&characterEncoding=utf8"
         table = "access"
         user = "username"
         password = "password"
         save_mode = "append"
     }
签名:这个人很懒,什么也没有留下!
最新回复 (0)
返回