替代ELK:ClickHouse+Kafka+FlieBeat filebeat ETL clickhouse kakfa



ClickHouse为了方便与Kafka集成,提供了一个名为Kafka引擎的专用表引擎。Kafka引擎允许你在ClickHouse中创建一个表,这个表的数据源来自于一个或多个Kafka队列。结合使用Kafka引擎和Materialized Views,可以实现将数据从Kafka队列消费,然后将数据存储到其他引擎的表中,从而实现实时数据处理和查询。


1.集成示例

要创建一个Kafka引擎的表,你需要提供以下几个关键参数:

kafka_broker_list:Kafka代理地址列表,用逗号分隔的字符串。

kafka_topic:要订阅的Kafka主题。

kafka_group_name:消费者组名称,用于标识ClickHouse实例所属的消费者组。

kafka_format:消息格式,用于指定如何将Kafka中的消息解析成表的行,例如JSONEachRow等。


创建一个Kafka引擎的表的示例

CREATE TABLE kafka_table
(
    column1 String,
    column2 UInt64,
    column3 Float64
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka1:9092,kafka2:9092',
    kafka_topic = 'kafka_topic_name',
    kafka_group_name = 'clickhouse_group',
    kafka_format = 'JSONEachRow';

为了将数据从Kafka表消费并存储到其他表引擎(例如MergeTree)的表中,你可以创建一个Materialized View

CREATE MATERIALIZED VIEW mv_kafka_to_storage
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(column2)
ORDER BY (column1, column2)
AS SELECT
    column1,
    column2,
    column3
FROM kafka_table;

使用Kafka引擎和Materialized View,你可以在ClickHouse中实现实时数据消费、处理和查询,从而大大提高数据处理的效率。


其总体步骤可以归纳为如下:

1.创建CK跟kafka的联合表,也就是创建kafka引擎表,这一步就好比在CK跟kafka之间打开了一个数据连接的通道,但注意,此时kafka的数据并没有引入到CK中;

2. 配置分片表的集群,用来确定我们的分片表数据存储在哪些机器上,以及每个副本的分布情况。我们知道,CK的玩法跟普通的分布式数据库不一样,普通数据库创建分片表时,最多只需要指定分片数量和冗余数量(比如Elasticsearch),有些甚至啥都不用指定(比如基于HDFS的数据库);

3. 创建基于以上分片配置的分片表,确定分片表的字段,基于哪个本地表而创建,以及分片规则(数据拆分规则);

4. 创建用于存储实际分片表数据的本地表(这一步可以跟第3步位置互换),确定本地表字段,以及本地表的引擎;

5.创建基于分片表的物化视图,将第1步创建的kafka引擎表,跟第3步创建的分片表,建立起数据流通的管道,这样,就把kafka的数据给源源不断给接入到分片表中了。


# clickhouse 创建 kafka 引擎表

CREATE TABLE default.kafka_clickhouse_inner_log ON CLUSTER clickhouse_cluster (
    log_uuid   String ,
    date_partition   UInt32 ,
    event_name   String ,
    activity_name   String ,
    activity_type   String ,
    activity_id   UInt16 
) ENGINE = Kafka SETTINGS
    kafka_broker_list = 'kafka1:9092,kafka2:9092,kafka3:9092',
    kafka_topic_list = 'data_clickhouse',
    kafka_group_name = 'clickhouse_xxx',
    kafka_format = 'JSONEachRow',
    kafka_row_delimiter = '\n',


# clikhouse 客户端无法查询 kafka 引擎表

clickhouse-client --stream_like_engine_allow_direct_select 1 --password xxxxx


# clickhouse 创建集群表

创建分布式表(根据 log_uuid 对数据进行分发,相同的 log_uuid 会发送到同一个 shard 分片上,用于后续合并时的数据去重)

CREATE TABLE default.bi_inner_log_all ON CLUSTER clickhouse_cluster AS default.bi_inner_log_local
ENGINE = Distributed(clickhouse_cluster, default, bi_inner_log_local, xxHash32(log_uuid));


# clickhouse 创建物化视图

创建物化视图,把 Kafka 消费表消费的数据同步到 ClickHouse 分布式表

CREATE MATERIALIZED VIEW default.view_bi_inner_log ON CLUSTER clickhouse_cluster TO default.bi_inner_log_all AS 
SELECT 
    log_uuid ,
date_partition ,
event_name ,
activity_name ,
credits_bring ,
activity_type ,
activity_id 
FROM default.kafka_clickhouse_inner_log;


# FileBeat 配置文件说明,坑点 1(需设置 keys_under_root: true)

/etc/filebeat/filebeat.yml

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /root/logs/xxx/inner/*.log
  json:  
如果不设置该索性,所有的数据都存储在message里面,这样设置以后数据会平铺。
       keys_under_root: true 
output.kafka:
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  topic: 'xxx_data_clickhouse'
  partition.round_robin:
            reachable_only: false
            required_acks: 1
            compression: gzip
processors: 
剔除filebeat 无效的字段数据
    - drop_fields:  
        fields: ["input", "agent", "ecs", "log", "metadata", "timestamp"]
        ignore_missing: false

nohup ./filebeat -e -c /etc/filebeat/filebeat.yml > /user/filebeat/filebeat.log & 

输出到filebeat.log文件中,方便排查



签名:这个人很懒,什么也没有留下!
最新回复 (0)
返回