针对存储在Hive中的结构化数据,我们需要读取Hive表中的数据并筛选出我们关心的字段,或者对字段进行转换,最后将对应的字段写入ClickHouse的表中。
---Hive Schema
我们在Hive中存储的数据表结构如下,存储的是很常见的Nginx日志
CREATE TABLE `nginx_msg_detail`(
`hostname` string,
`domain` string,
`remote_addr` string,
`request_time` float,
`datetime` string,
`url` string,
`status` int,
`data_size` int,
`referer` string,
`cookie_info` string,
`user_agent` string,
`minute` string)
PARTITIONED BY (
`date` string,
`hour` string)
---ClickHouse Schema
我们的ClickHouse建表语句如下,我们的表按日进行分区
CREATE TABLE cms.cms_msg
(
date Date,
datetime DateTime,
url String,
request_time Float32,
status String,
hostname String,
domain String,
remote_addr String,
data_size Int32
) ENGINE = MergeTree PARTITION BY date ORDER BY (date, hostname) SETTINGS index_granularity = 16384
---seatunnel Pipeline
我们仅需要编写一个seatunnel Pipeline的配置文件即可完成数据的导入。
配置文件包括四个部分,分别是Spark、Input、filter和Output。
--Spark
这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。
spark {
// 这个配置必需填写
spark.sql.catalogImplementation = "hive"
spark.app.name = "seatunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}--Input
这一部分定义数据源,如下是从Hive文件中读取text格式数据的配置案例。
input {
hive {
pre_sql = "select * from access.nginx_msg_detail"
table_name = "access_log"
}
}看,很简单的一个配置就可以从Hive中读取数据了。其中pre_sql是从Hive中读取数据SQL,table_name是将读取后的数据,注册成为Spark中临时表的表名,可为任意字段。
需要注意的是,必须保证hive的metastore是在服务状态。
在Cluster、Client、Local模式下运行时,必须把hive-site.xml文件置于提交任务节点的$HADOOP_CONF目录下
--Filter
在Filter部分,这里我们配置一系列的转化,我们这里把不需要的minute和hour字段丢弃。当然我们也可以在读取Hive的时候通过pre_sql不读取这些字段
filter {
remove {
source_field = ["minute", "hour"]
}
}--Output
最后我们将处理好的结构化数据写入ClickHouse
output {
clickhouse {
host = "your.clickhouse.host:8123"
database = "seatunnel"
table = "nginx_log"
fields = ["date", "datetime", "hostname", "url", "http_code", "request_time", "data_size", "domain"]
username = "username"
password = "password"
}
}