ClickHouse为了方便与Kafka集成,提供了一个名为Kafka引擎的专用表引擎。Kafka引擎允许你在ClickHouse中创建一个表,这个表的数据源来自于一个或多个Kafka队列。结合使用Kafka引擎和Materialized Views,可以实现将数据从Kafka队列消费,然后将数据存储到其他引擎的表中,从而实现实时数据处理和查询。
1.集成示例
要创建一个Kafka引擎的表,你需要提供以下几个关键参数:
kafka_broker_list:Kafka代理地址列表,用逗号分隔的字符串。
kafka_topic:要订阅的Kafka主题。
kafka_group_name:消费者组名称,用于标识ClickHouse实例所属的消费者组。
kafka_format:消息格式,用于指定如何将Kafka中的消息解析成表的行,例如JSONEachRow等。
创建一个Kafka引擎的表的示例
CREATE TABLE kafka_table
(
column1 String,
column2 UInt64,
column3 Float64
) ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:9092,kafka2:9092',
kafka_topic = 'kafka_topic_name',
kafka_group_name = 'clickhouse_group',
kafka_format = 'JSONEachRow';
为了将数据从Kafka表消费并存储到其他表引擎(例如MergeTree)的表中,你可以创建一个Materialized View
CREATE MATERIALIZED VIEW mv_kafka_to_storage
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(column2)
ORDER BY (column1, column2)
AS SELECT
column1,
column2,
column3
FROM kafka_table;
使用Kafka引擎和Materialized View,你可以在ClickHouse中实现实时数据消费、处理和查询,从而大大提高数据处理的效率。
其总体步骤可以归纳为如下:
1.创建CK跟kafka的联合表,也就是创建kafka引擎表,这一步就好比在CK跟kafka之间打开了一个数据连接的通道,但注意,此时kafka的数据并没有引入到CK中;
2. 配置分片表的集群,用来确定我们的分片表数据存储在哪些机器上,以及每个副本的分布情况。我们知道,CK的玩法跟普通的分布式数据库不一样,普通数据库创建分片表时,最多只需要指定分片数量和冗余数量(比如Elasticsearch),有些甚至啥都不用指定(比如基于HDFS的数据库);
3. 创建基于以上分片配置的分片表,确定分片表的字段,基于哪个本地表而创建,以及分片规则(数据拆分规则);
4. 创建用于存储实际分片表数据的本地表(这一步可以跟第3步位置互换),确定本地表字段,以及本地表的引擎;
5.创建基于分片表的物化视图,将第1步创建的kafka引擎表,跟第3步创建的分片表,建立起数据流通的管道,这样,就把kafka的数据给源源不断给接入到分片表中了。
# clickhouse 创建 kafka 引擎表
CREATE TABLE default.kafka_clickhouse_inner_log ON CLUSTER clickhouse_cluster (
log_uuid String ,
date_partition UInt32 ,
event_name String ,
activity_name String ,
activity_type String ,
activity_id UInt16
) ENGINE = Kafka SETTINGS
kafka_broker_list = 'kafka1:9092,kafka2:9092,kafka3:9092',
kafka_topic_list = 'data_clickhouse',
kafka_group_name = 'clickhouse_xxx',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\n',
# clikhouse 客户端无法查询 kafka 引擎表
clickhouse-client --stream_like_engine_allow_direct_select 1 --password xxxxx
# clickhouse 创建集群表
创建分布式表(根据 log_uuid 对数据进行分发,相同的 log_uuid 会发送到同一个 shard 分片上,用于后续合并时的数据去重)
CREATE TABLE default.bi_inner_log_all ON CLUSTER clickhouse_cluster AS default.bi_inner_log_local
ENGINE = Distributed(clickhouse_cluster, default, bi_inner_log_local, xxHash32(log_uuid));
# clickhouse 创建物化视图
创建物化视图,把 Kafka 消费表消费的数据同步到 ClickHouse 分布式表
CREATE MATERIALIZED VIEW default.view_bi_inner_log ON CLUSTER clickhouse_cluster TO default.bi_inner_log_all AS
SELECT
log_uuid ,
date_partition ,
event_name ,
activity_name ,
credits_bring ,
activity_type ,
activity_id
FROM default.kafka_clickhouse_inner_log;
# FileBeat 配置文件说明,坑点 1(需设置 keys_under_root: true)
/etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /root/logs/xxx/inner/*.log
json:
如果不设置该索性,所有的数据都存储在message里面,这样设置以后数据会平铺。
keys_under_root: true
output.kafka:
hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
topic: 'xxx_data_clickhouse'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
processors:
剔除filebeat 无效的字段数据
- drop_fields:
fields: ["input", "agent", "ecs", "log", "metadata", "timestamp"]
ignore_missing: false
nohup ./filebeat -e -c /etc/filebeat/filebeat.yml > /user/filebeat/filebeat.log &
输出到filebeat.log文件中,方便排查