seatunnel -v2 部署 - spark ETL spark


1. flink && spark

https://abc.htmltoo.com/search-flink-1.htm

Flink版本请选择 1.9.0

https://abc.htmltoo.com/search-spark-1.htm

Spark版本请选择 >= 2.x.x


2. seatunnel-2

export version="2.3.3"

wget https://github.com/InterestingLab/seatunnel/releases/download/v<version>/seatunnel-<version>.zip -O seatunnel-<version>.zip

unzip seatunnel-<version>.zip

ln -s seatunnel-<version> seatunnel


3. 配置文件

3.1 Source 插件配置

1) FakeStream 

主要用于方便得生成用户指定的数据,作为输入来对seatunnel进行功能验证,测试,以及性能测试等。

Fake

主要用于快速上手运行一个 seatunnel 应用

fakeStream {
    content = ['name=ricky&age=23', 'name=gary&age=28']
    rate = 5
}
生成的数据如下,从content列表中随机抽取其中的字符串
+-----------------+
|raw_message      |
+-----------------+
|name=gary&age=28 |
|name=ricky&age=23|
+-----------------+
---
Fake {
    result_table_name = "my_dataset"
}

2) Elasticsearch

elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel-20190424"
    result_table_name = "my_dataset"
  }
elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel-*"
    es.read.field.include = "name, age"
    resulttable_name = "my_dataset"
  }
匹配所有以 seatunnel- 开头的索引, 并且仅仅读取 name和 age 两个字段。

3) hive

env {
  ...
  spark.sql.catalogImplementation = "hive"
  ...
}
source {
  hive {
    pre_sql = "select * from mydb.mytb"
    result_table_name = "myTable"
  }
}
...

4) JDBC

jdbc {
    driver = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://localhost:3306/info"
    table = "access"
    result_table_name = "access_log"
    user = "username"
    password = "password"
}
通过JDBC读取MySQL数据
jdbc {
    driver = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://localhost:3306/info"
    table = "access"
    result_table_name = "access_log"
    user = "username"
    password = "password"
    jdbc.partitionColumn = "item_id"
    jdbc.numPartitions = "10"
    jdbc.lowerBound = 0
    jdbc.upperBound = 100
}
根据指定字段划分分区

5) Kafka

kafkaStream {
    topics = "seatunnel"
    consumer.bootstrap.servers = "localhost:9092"
    consumer.group.id = "seatunnel_group"
}

3.2 Transform 插件配置 - 数据转换

3.2.1 Json

对原始数据集指定字段进行Json解析

---不使用target_field
json {
    source_field = "message"
}
-Source
+----------------------------+
|message                   |
+----------------------------+
|{"name": "ricky", "age": 24}|
|{"name": "gary", "age": 28} |
+----------------------------+
-Sink
+----------------------------+---+-----+
|message                   |age|name |
+----------------------------+---+-----+
|{"name": "gary", "age": 28} |28 |gary |
|{"name": "ricky", "age": 23}|23 |ricky|
+----------------------------+---+-----+
---使用target_field
json {
    source_field = "message"
    target_field = "info"
}
-Souce
+----------------------------+
|message                   |
+----------------------------+
|{"name": "ricky", "age": 24}|
|{"name": "gary", "age": 28} |
+----------------------------+
-Sink
+----------------------------+----------+
|message                   |info      |
+----------------------------+----------+
|{"name": "gary", "age": 28} |[28,gary] |
|{"name": "ricky", "age": 23}|[23,ricky]|
+----------------------------+----------+
json处理的结果支持*select * from where info.age = 23*此类SQL语句
---使用 schema_file
json {
    source_field = "message"
    schema_file = "demo.json"
}
-Schema
在 Driver Node 的 ~/seatunnel/plugins/json/files/schemas/demo.json 中放置内容如下:
{
   "name": "demo",
   "age": 24,
   "city": "LA"
}
-Source
+----------------------------+
|message                   |
+----------------------------+
|{"name": "ricky", "age": 24}|
|{"name": "gary", "age": 28} |
+----------------------------+
-Sink
+----------------------------+---+-----+-----+
|message                     |age|name |city |
+----------------------------+---+-----+-----+
|{"name": "gary", "age": 28} |28 |gary |null |
|{"name": "ricky", "age": 23}|23 |ricky|null |
+----------------------------+---+-----+-----+
若使用 cluster 模式进行部署,需确保 json schemas 目录被打包到 plugins.tar.gz 中

3.2.1  SQL

sql {
    sql = "select username, address from user_info",
}
使用SQL插件用于字段删减,仅保留 username 和 address 字段,将丢弃其余字段。user_info 为之前插件配置的 result_table_name
sql {
    sql = "select substring(telephone, 0, 10) from user_info",
}
使用SQL插件用于数据处理,使用substring functions对 telephone 字段进行截取操作
sql {
    sql = "select avg(age) from user_info",
    table_name = "user_info"
}
使用SQL插件用于数据聚合,使用avg functions对原始数据集进行聚合操作,取出 age 字段平均值

3.2.2 Split

separator [string] 指定的分隔符,默认为,   ;  fields [array]  分割后各个字段的名称

split {
    source_field = "message"
    delimiter = "&"
    fields = ["field1", "field2"]
}
将源数据中的 message 字段根据**&**进行分割,可以以 field1 或 field2 为key获取相应value
split {
    source_field = "message"
    target_field = "info"
    delimiter = ","
    fields = ["field1", "field2"]
}
将源数据中的 message 字段根据**,**进行分割,分割后的字段为 info,可以以 info.field1 或 info.field2 为key获取相应value

3.3 Sink 插件配置 - 数据输出

不指定 source_table_name 时,当前插件处理的就是配置文件中上一个插件输出的数据集(dataset);

指定 source_table_name 的时候,当前插件处理的就是此参数对应的数据集。

stdout {
    source_table_name = "view_table_2"
}
将名为 view_table_2 的临时表输出。

1) Clickhouse

通过 Clickhouse-jdbc 将数据源按字段名对应,写入ClickHouse,使用前需要提前创建对应的数据表

clickhouse {
    host = "localhost:8123"
    clickhouse.socket_timeout = 50000
    database = "nginx"
    table = "access_msg"
    fields = ["date", "datetime", "hostname", "http_code", "data_size", "ua", "request_time"]
    username = "username"
    password = "password"
    bulk_size = 20000
}
ClickHouse {
    host = "localhost:8123"
    database = "nginx"
    table = "access_msg"
    fields = ["date", "datetime", "hostname", "http_code", "data_size", "ua", "request_time"]
    username = "username"
    password = "password"
    bulk_size = 20000
    retry_codes = [209, 210]
    retry = 3
}
当出现网络超时或者网络异常的情况下,重试写入3次

2) Console

输出数据到标准输出/终端, 常用于debug, 能够很方便观察数据.

console {
    limit = 10
    serializer = "json"
}
以Json格式输出10条数据

3) Elasticsearch

输出数据到Elasticsearch,支持的ElasticSearch版本为 >= 2.x 且 < 7.0.0。

elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel"
}
将结果写入Elasticsearch集群的名称为seatunnel的index中
elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel-${now}"
    es.batch.size.entries = 100000
    index_time_format = "yyyy.MM.dd"
}
按天创建索引,例如 seatunnel-2020.01.01

4) File

输出数据到文件

file {
    path = "file:///var/logs"
    serializer = "text"
}

5) HBase

采用hbase-connectors将数据输出到Hbase, Hbase(>=2.1.0)以及Spark(>=2.0.0)的版本兼容均依赖于hbase-connectors。

支持两种写模式,overwrite和append,overwrite代表当hbase表中若存在数据,那么将会进行truncate清空,然后再加载数据。

 hbase {
    source_table_name = "hive_dataset"
    hbase.zookeeper.quorum = "centos01:2181,centos02:2181,centos03:2181"
    catalog = "{\"table\":{\"namespace\":\"default\", \"name\":\"customer\"},\"rowkey\":\"c_custkey\",\"columns\":{\"c_custkey\":{\"cf\":\"rowkey\", \"col\":\"c_custkey\", \"type\":\"bigint\"},\"c_name\":{\"cf\":\"info\", \"col\":\"c_name\", \"type\":\"string\"},\"c_address\":{\"cf\":\"info\", \"col\":\"c_address\", \"type\":\"string\"},\"c_city\":{\"cf\":\"info\", \"col\":\"c_city\", \"type\":\"string\"},\"c_nation\":{\"cf\":\"info\", \"col\":\"c_nation\", \"type\":\"string\"},\"c_region\":{\"cf\":\"info\", \"col\":\"c_region\", \"type\":\"string\"},\"c_phone\":{\"cf\":\"info\", \"col\":\"c_phone\", \"type\":\"string\"},\"c_mktsegment\":{\"cf\":\"info\", \"col\":\"c_mktsegment\", \"type\":\"string\"}}}"
    staging_dir = "/tmp/hbase-staging/2020.11.9/"
    save_mode = "overwrite"
}
Hbase的这个插件没有给用户提供创建表的功能,因为hbase表的预分区方式会和业务逻辑有关,所以运行该插件的时候需要用户自己提前创建好hbase的表以及它的预分区;对于rowkey的设计,catalog本身就支持了多列联合 rowkey=”col1:col2:col3“,但若有其他对rowkey的设计需求,例如:加盐,等等操作,完全可以通过解耦的方式基于transform plugin对rowkey进行修改。

6) Hdfs

hdfs {
    path = "hdfs:///var/logs-${now}"
    serializer = "json"
    path_time_format = "yyyy.MM.dd"
}

7) MySQL

支持Update的方式输出数据到Mysql

存储模式,添加模式update,在插入数据键冲突的时候进行可指定方式的数据覆盖

基本模式overwrite,append,ignore以及error

Mysql {
    save_mode = "update",
    truncate = true,
    url = "jdbc:mysql://192.168.1.1:3306/database",
    user= "userName",
    password = "***********",
    dbtable = "tableName",
    customUpdateStmt = "INSERT INTO table (column1, column2, created, modified, yn) values(?, ?, now(), now(), 1) ON DUPLICATE KEY UPDATE column1 = IFNULL(VALUES (column1), column1), column2 = IFNULL(VALUES (column2), column2)"
}

8) Phoenix

  Phoenix {
    zk-connect = "host1:2181,host2:2181,host3:2181"
    table = "table22"
  }

3.4 完整配置文件案例

Streaming 流式计算 - v2.streaming.conf.template

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in SeaTunnel config
######
env {
  # You can set flink configuration here
  execution.parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 2000
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  FakeSource {
    parallelism = 2
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
  # If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/category/source-v2
}
sink {
  Console {
  }
  # If you would like to get more information about how to configure SeaTunnel and see full list of sink plugins,
  # please go to https://seatunnel.apache.org/docs/category/sink-v2
}

Batch 离线批处理  -  v2.batch.config.template

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in SeaTunnel config
######
env {
  # You can set SeaTunnel environment configuration here
  execution.parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 10000
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  FakeSource {
    parallelism = 2
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
  # If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/category/source-v2
}
sink {
  Console {
  }
  # If you would like to get more information about how to configure SeaTunnel and see full list of sink plugins,
  # please go to https://seatunnel.apache.org/docs/category/sink-v2
}


4. 运行

在本地以 local 方式运行 seatunnel

./bin/start-seatunnel-spark.sh --master local[4] --deploy-mode client --config ./config/application.conf

在 Spark Standalone 集群上运行 seatunnel

# client 模式

./bin/start-seatunnel-spark.sh --master spark://207.184.161.138:7077 --deploy-mode client --config ./config/application.conf

# cluster 模式

./bin/start-seatunnel-spark.sh --master spark://207.184.161.138:7077 --deploy-mode cluster --config ./config/application.conf

在 Yarn 集群上运行 seatunnel

# client 模式

./bin/start-seatunnel-spark.sh --master yarn --deploy-mode client --config ./config/application.conf

# cluster 模式

./bin/start-seatunnel-spark.sh --master yarn --deploy-mode cluster --config ./config/application.conf


签名:这个人很懒,什么也没有留下!
最新回复 (0)
返回