SeaTunnel: MySQL实时同步到es elasticsearch ETL mysql



MySQL 实时同步,需开启 binlog

https://debezium.io/documentation/reference/1.6/connectors/mysql.html#setting-up-mysql

---安装 connectors 插件

执行 bash bin/install-plugin.sh,国内建议先配置 maven 镜像,不然容易失败 或者 慢

官方文档写着执行 sh bin/install-plugin.sh

---编写配置文件

config 目录下,新建配置文件:如 mysql-es-test.conf

因为是 实时同步,这里 job.mode = “STREAMING”,execution.parallelism 是 并发

vim  config/v2.batch.config.template

env {
  execution.parallelism = 1
  job.mode = "BATCH"
}
source {
    FakeSource {
      result_table_name = "fake"
      row.num = 16
      schema = {
        fields {
          name = "string"
          age = "int"
        }
      }
    }
}
sink {
  Console {}
}

vim  config/mysql-es-test.conf

env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

---添加 数据源 配置

https://seatunnel.apache.org/docs/2.3.3/connector-v2/source/MySQL-CDC#options

result_table_name 取个 临时表名,便于后续使用。

table-names 必须是 数据库.表名,base-url 必须指定 数据库。

startup.mode 默认是 INITIAL,先同步历史数据,后增量同步

source {
  MySQL-CDC {
    result_table_name = "t1"
    server-id = 5656
    username = "root"
    password = "pwd"
    table-names = ["db.t1"]
    base-url = "jdbc:mysql://host:3306/db"
  }
}

添加 转换 配置,sql 比较灵活。

https://seatunnel.apache.org/docs/2.3.3/transform-v2/sql#options

函数列表请点击

https://seatunnel.apache.org/docs/2.3.3/transform-v2/sql-functions

transform {
  Sql {
    source_table_name = "t1"
    query = "SELECT id, alias_name aliasName FROM t1 WHERE c1 = '1'"
  }
}

---添加 输出 配置

https://seatunnel.apache.org/docs/2.3.3/connector-v2/sink/Elasticsearch#options

CDC 实时同步 es,必须配置 primary_keys

sink {
    Elasticsearch {
        hosts = ["host:9200"]
        username = "elastic"
        password = "pwd"
        index = "index_t1"
        # cdc required options
        primary_keys = ["id"]
    }
}

---最终配置

vim  config/mysql-es-test.conf

env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}
source {
  MySQL-CDC {
    result_table_name = "t1"
    server-id = 5656
    username = "root"
    password = "pwd"
    table-names = ["db.t1"]
    base-url = "jdbc:mysql://host:3306/db"
  }
}
transform {
  Sql {
    source_table_name = "t1"
    query = "SELECT id, alias_name aliasName FROM t1 WHERE c1 = '1'"
  }
}
sink {
    Elasticsearch {
        hosts = ["host:9200"]
        username = "elastic"
        password = "pwd"
        index = "index_t1"
        # cdc required options
        primary_keys = ["id"]
    }
}

---启动任务

 这里以 本地模式为例,另有  集群、spark、flink 模式。

./bin/seatunnel.sh -e local --config ./config/mysql-es-test.conf


签名:这个人很懒,什么也没有留下!
最新回复 (0)
返回