seatunnel可以基于Spark,在 TiSpark 的基础上快速实现 TiDB 数据读取和 OLAP 分析
---使用seatunnel操作TiDB
在我们线上有这么一个需求,从 TiDB 中读取某一天的网站访问数据,统计每个域名以及服务返回状态码的访问次数,最后将统计结果写入 TiDB 另外一个表中。 我们来看看seatunnel是如何实现这么一个功能的。
-Input(存储访问日志的表)
CREATE TABLE access_log (
domain VARCHAR(255),
datetime VARCHAR(63),
remote_addr VARCHAR(63),
http_ver VARCHAR(15),
body_bytes_send INT,
status INT,
request_time FLOAT,
url TEXT
)
+-----------------+--------------+------+------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-----------------+--------------+------+------+---------+-------+
| domain | varchar(255) | YES | | NULL | |
| datetime | varchar(63) | YES | | NULL | |
| remote_addr | varchar(63) | YES | | NULL | |
| http_ver | varchar(15) | YES | | NULL | |
| body_bytes_send | int(11) | YES | | NULL | |
| status | int(11) | YES | | NULL | |
| request_time | float | YES | | NULL | |
| url | text | YES | | NULL | |
+-----------------+--------------+------+------+---------+-------+
-Output(存储结果数据的表)
CREATE TABLE access_collect (
date VARCHAR(23),
domain VARCHAR(63),
status INT,
hit INT
)
+--------+-------------+------+------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------+-------------+------+------+---------+-------+
| date | varchar(23) | YES | | NULL | |
| domain | varchar(63) | YES | | NULL | |
| status | int(11) | YES | | NULL | |
| hit | int(11) | YES | | NULL | |
+--------+-------------+------+------+---------+-------+
---运行 seatunnel
我们将上述四部分配置组合成我们最终的配置文件conf/tidb.conf
spark {
spark.app.name = "seatunnel-tidb"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
# Set for TiSpark
spark.tispark.pd.addresses = "localhost:2379"
spark.sql.extensions = "org.apache.spark.sql.TiExtensions"
}
input {
tidb {
database = "nginx"
pre_sql = "select * from nginx.access_log"
table_name = "spark_table"
}
}
filter {
sql {
table_name = "spark_nginx_log"
sql = "select count(*) as hit, domain, status, substring(datetime, 1, 10) as date from spark_nginx_log where substring(datetime, 1, 10)='2019-01-20' group by domain, status, substring(datetime, 1, 10)"
}
}
output {
tidb {
url = "jdbc:mysql://127.0.0.1:4000/nginx?useUnicode=true&characterEncoding=utf8"
table = "access_collect"
user = "username"
password = "password"
save_mode = "append"
}
}---执行命令,指定配置文件,运行 seatunnel ,即可实现我们的数据处理逻辑。
./bin/start-seatunnel.sh --config config/tidb.conf --deploy-mode client --master 'local[2]'
./bin/start-seatunnel.sh --config config/tidb.conf --deploy-mode client --master yarn
./bin/start-seatunnel.sh --config config/tidb.conf --deploy-mode cluster -master yarn
如果是本机测试验证逻辑,用本地模式(Local)就可以了,
一般生产环境下,都是使用yarn-client或者yarn-cluster模式。
---检查结果
mysql> select * from access_collect;
+------------+--------+--------+------+
| date | domain | status | hit |
+------------+--------+--------+------+
| 2019-01-20 | b.com | 200 | 63 |
| 2019-01-20 | a.com | 200 | 85 |
+------------+--------+--------+------+
2 rows in set (0.21 sec)