seatunnel -v2 部署 - flink ETL flink


https://hub.docker.com/r/sounos/seatunnel

https://seatunnel.apache.org/download

https://github.com/apache/incubator-seatunnel/releases

https://repo.maven.apache.org/maven2/org/apache/seatunnel/

https://github.com/apache/seatunnel-web

https://hub.docker.com/r/987846/seatunnel-web


1. flink

https://abc.htmltoo.com/thread-45970.htm

https://abc.htmltoo.com/search-flink-1.htm

Flink版本请选择>= 1.9.0


2. seatunnel-2

wget  https://dlcdn.apache.org/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz

cd  /data/site/htmltoo.opt/seatunnel/

vim  config/seatunnel-env.sh

FLINK_HOME=/data/site/htmltoo.opt/flink-1.16.2

./bin/seatunnel.sh --config ./config/v2.streaming.conf  -e local


3.Dockerfile

vim  /data/site/htmltoo.opt/seatunnel/Dockerfile

FROM openjdk:8
ENV SEATUNNEL_VERSION="2.3.3"
COPY /apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz /opt/apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz
WORKDIR /opt
RUN tar -xzvf apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz
RUN mv apache-seatunnel-${SEATUNNEL_VERSION} seatunnel
RUN rm -f /opt/apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz
WORKDIR /opt/seatunnel
ENTRYPOINT ["sh","-c"," bin/seatunnel.sh --config $config  -e local"]

docker build -t seatunnel:2.3.3 -f Dockerfile .


4.docker run 

docker run  -d  --restart always  --name=seatunnel  --privileged=true --user=root  -e TZ='Asia/Shanghai' -v /etc/localtime:/etc/localtime:ro  -e DISABLE_JEMALLOC='true'  -e config="/opt/data/seatunnel.streaming.conf" -v /data/site/docker/env/bizdata/seatunnel/seatunnel.conf:/opt/data/seatunnel.streaming.conf:ro   --link mariadb --link ch  --link flinkjob  sounos/seatunnel:2.3.3-flink15   ./bin/start-seatunnel-flink-15-connector-v2.sh --config /opt/data/seatunnel.streaming.conf


docker run  -d  --restart always  -p 9991:8080 --name=seatunnelweb  --privileged=true --user=root  -e TZ='Asia/Shanghai' -v /etc/localtime:/etc/localtime:ro  987846/seatunnel-web:1.0.0-SNAPSHOT


docker exec -it  seatunnel  /bin/bash


http://g.htmltoo.com:9991


3. 配置seatunnel

编辑config/waterdrop-env.sh, 指定必须环境配置如FLINK_HOME (Step 1 中Flink下载并解压后的目录)

vim  config/waterdrop-env.sh

env {
  # You can set flink configuration here
  execution.parallelism = 1
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
    SocketStream{
          result_table_name = "fake"
          field_name = "info"
    }
}
transform {
  Split{
    separator = "#"
    fields = ["name","age"]
  }
  sql {
    sql = "select * from (select info,split(info) as info_row from fake) t1"
  }
}
sink {
  ConsoleSink {}
}


4. 启动netcat server用于发送数据

nc -l -p 9999


5. 启动seatunnel

cd seatunnel

./bin/start-waterdrop-flink.sh  --config ./config/application.conf


6. 在nc端输入

xg#1995

可在 flink Web-UI(http://localhost:8081/#/task-manager)的 TaskManager Stdout日志打印出:

xg#1995,xg,1995


6.1 Streaming 流式计算

以上配置为默认【流式处理配置模版】,可直接运行,命令如下:
cd seatunnel
./bin/start-waterdrop-flink.sh --config ./config/flink.streaming.conf.template

6.2 Batch 离线批处理

以上配置为默认【离线批处理配置模版】,可直接运行,命令如下:
cd seatunnel
./bin/start-waterdrop-flink.sh --config ./config/flink.batch.conf.template


7.配置文件

一个完整的seatunnel配置包含env, source, transform, sink, 即:

    env {
        ...
    }
    
    source {
        ...
    }
    
    transform {
        ...
    }
    
    sink {
        ...
    }

7.1 env是flink任务的相关的配置,例如设置时间为event-time还是process-time

 env {
    execution.parallelism = 1 #设置任务的整体并行度为1
    execution.checkpoint.interval = 10000 #设置任务checkpoint的频率
    execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" #设置checkpoint的路径
  }

以下是env可配置项,具体含义可以参照flink官网配置

  public class ConfigKeyName {
      //the time characteristic for all streams create from this environment, e.g., processing-time,event-time,ingestion-time
      public final static String TIME_CHARACTERISTIC = "execution.time-characteristic";
      //the maximum time frequency (milliseconds) for the flushing of the output buffers
      public final static String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout";
      //the parallelism for operations executed through this environment
      public final static String PARALLELISM = "execution.parallelism";
      //the maximum degree of parallelism to be used for the program
      public final static String MAX_PARALLELISM = "execution.max-parallelism";
      //enables checkpointing for the streaming job,time interval between state checkpoints in milliseconds
      public final static String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";
      //the checkpointing mode (exactly-once vs. at-least-once)
      public final static String CHECKPOINT_MODE = "execution.checkpoint.mode";
      //the maximum time that a checkpoint may take before being discarded
      public final static String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout";
      //a file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
      public final static String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri";
      //the maximum number of checkpoint attempts that may be in progress at the same time
      public final static String MAX_CONCURRENT_CHECKPOINTS = "execution.max-concurrent-checkpoints";
      //enables checkpoints to be persisted externally,delete externalized checkpoints on job cancellation (e.g., true,false)
      public final static String CHECKPOINT_CLEANUP_MODE = "execution.checkpoint.cleanup-mode";
      //the minimal pause before the next checkpoint is triggere
      public final static String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpoint.min-pause";
      //the tolerable checkpoint failure number
      public final static String FAIL_ON_CHECKPOINTING_ERRORS = "execution.checkpoint.fail-on-error";
      //the restart strategy to be used for recovery (e.g., 'no' , 'fixed-delay', 'failure-rate')
      //no -> no restart strategy
      //fixed-delay -> fixed delay restart strategy
      //failure-rate -> failure rate restart strategy
      public final static String RESTART_STRATEGY = "execution.restart.strategy";
      //number of restart attempts for the fixed delay restart strategy
      public final static String RESTART_ATTEMPTS = "execution.restart.attempts";
      //delay in-between restart attempts for the delay restart strategy
      public final static String RESTART_DELAY_BETWEEN_ATTEMPTS = "execution.restart.delayBetweenAttempts";
      //time interval for failures
      public final static String RESTART_FAILURE_INTERVAL = "execution.restart.failureInterval";
      //maximum number of restarts in given interval for the failure rate restart strategy
      public final static String RESTART_FAILURE_RATE = "execution.restart.failureRate";
      //delay in-between restart attempts for the failure rate restart strategy
      public final static String RESTART_DELAY_INTERVAL = "execution.restart.delayInterval";
      //the maximum time interval for which idle state is retained
      public final static String MAX_STATE_RETENTION_TIME = "execution.query.state.max-retention";
      //the minimum time interval for which idle state is retained
      public final static String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention";
      //the state backend ('rocksdb','fs')
      public final static String STATE_BACKEND = "execution.state.backend";
  
  }

source可配置任意的source插件及其参数,具体参数随不同的source插件而变化。

transform可配置任意的transform插件及其参数,具体参数随不同的transform插件而变化。transform中的多个插件按配置顺序形成了数据处理的pipeline, 默认上一个transform的输出是下一个transform的输入,但也可以通过source_table_name控制。

transform处理完的数据,会发送给sink中配置的每个插件。

sink可配置任意的sink插件及其参数,具体参数随不同的sink插件而变化。


7.2 Source 插件配置

不指定 result_table_name时 ,此插件处理后的数据,不会被注册为一个可供其他插件直接访问的数据集(dataStream/dataset),或者被称为临时表(table);

指定 result_table_name 时,此插件处理后的数据,会被注册为一个可供其他插件直接访问的数据集(dataStream/dataset),或者被称为临时表(table)。此处注册的数据集(dataStream/dataset),其他插件可通过指定 source_table_name 来直接访问。

source {
    FakeSourceStream {
      result_table_name = "fake"
      field_name = "name,age"
    }
}
数据源 FakeSourceStream 的结果将注册为名为 fake 的临时表。
这个临时表,可以被任意 Transform 或者 Sink 插件通过指定 source_table_name 使用。
field_name 将临时表的两列分别命名为name和age。

1) Fake

Fake Source主要用于自动生成数据,数据只有两列,第一列为String类型,内容为["Gary", "Ricky Huo", "Kid Xiong"]中随机一个,第二列为Long类型,为当前的13位时间戳,以此作为输入来对seatunnel进行功能验证,测试等。

source {
    FakeSourceStream {
      result_table_name = "fake"
      field_name = "name,age"
    }
}

2) Socket

Socket作为数据源

source {
  SocketStream{
        result_table_name = "socket"
        field_name = "info"
  }
}

3) File

从文件系统中读取数据

从文件系统中读取文件的格式,目前支持csv、json、parquet 、orc和 text

需要文件路径,hdfs文件以hdfs://开头,本地文件以file://开头

  FileSource{
    path = "hdfs://localhost:9000/input/"
    source_format = "json"
    schema = "{\"data\":[{\"a\":1,\"b\":2},{\"a\":3,\"b\":4}],\"db\":\"string\",\"q\":{\"s\":\"string\"}}"
    result_table_name = "test"
  }

4) JDBC

JdbcSource {
        driver = com.mysql.jdbc.Driver
        url = "jdbc:mysql://localhost/test"
        username = root
        query = "select * from test"
   }

5) Kafka

从Kafka消费数据,支持的Kafka版本 >= 0.10.0.

  KafkaTableStream {
    consumer.bootstrap.servers = "127.0.0.1:9092"
    consumer.group.id = "seatunnel5"
    topics = test
    result_table_name = test
    format.type = csv
    schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]"
    format.field-delimiter = ";"
    format.allow-comments = "true"
    format.ignore-parse-errors = "true"
  }

7.3 Transform 插件配置

1) SQL

使用SQL处理数据,使用的是flink的sql语法,支持其各种udf

sql {
    sql = "select name, age from fake"
}

2) Split

定义了一个字符串切割函数,用于在Sql插件对指定字段进行分割。

  #这个只是创建了一个叫split的udf
  Split{
    separator = "#"
    fields = ["name","age"]
  }
  #使用split函数(确认fake表存在)
  sql {
    sql = "select * from (select info,split(info) as info_row from fake) t1"
  }

7.4 Sink 插件配置

1) Console

用于功能测试和和debug,结果将输出在taskManager的stdout选项卡

ConsoleSink{}

2) Elasticsearch

elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel"
}
将结果写入Elasticsearch集群的名称为 seatunnel 的索引中

3) File

目前支持csv、json、和 text。streaming模式目前只支持text

需要文件路径,hdfs文件以hdfs://开头,本地文件以file://开头

write_mode: NO_OVERWRITE 不覆盖,路径存在报错;  OVERWRITE 覆盖,路径存在则先删除再写入

  FileSink {
    format = "json"
    path = "hdfs://localhost:9000/flink/output/"
    write_mode = "OVERWRITE"
  }

4) JDBC

batch_size: 每批写入数量

   JdbcSink {
     source_table_name = fake
     driver = com.mysql.jdbc.Driver
     url = "jdbc:mysql://localhost/test"
     username = root
     query = "insert into test(name,age) values(?,?)"
     batch_size = 2
   }

5) Kafka

   KafkaTable {
     producer.bootstrap.servers = "127.0.0.1:9092"
     topics = test_sink
   }

7.5 完整配置文件

######
###### This config file is a demonstration of streaming processing in seatunnel config
######
env {
  # You can set flink configuration here
  execution.parallelism = 1
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
    FakeSourceStream {
      result_table_name = "fake"
      field_name = "name,age"
    }
  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
  # please go to https://interestinglab.github.io/seatunnel-docs/#/zh-cn/configuration/base
}
transform {
    sql {
      sql = "select name,age from fake"
    }
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://interestinglab.github.io/seatunnel-docs/#/zh-cn/configuration/base
}
sink {
  ConsoleSink {}
  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
  # please go to https://interestinglab.github.io/seatunnel-docs/#/zh-cn/configuration/base
}

7.6 运行

在Flink Standalone集群上运行seatunnel

.bin/start-seatunnel-flink.sh --config config-path

# -p 2 指定flink job的并行度为2,还可以指定更多的参数,使用 flink run -h查看

.bin/start-seatunnel-flink.sh -p 2 --config config-path

在Yarn集群上运行seatunnel

.bin/start-seatunnel-flink.sh -m yarn-cluster --config config-path

# -ynm seatunnel 指定在yarn webUI显示的名称为seatunnel,还可以指定更多的参数,使用 flink run -h查看

.bin/start-seatunnel-flink.sh -m yarn-cluster -ynm seatunnel --config config-path


签名:这个人很懒,什么也没有留下!
最新回复 (0)
返回