seatunnel如何使用Spark快速将数据写入Elasticsearch elasticsearch ETL spark



---Kafka to Elasticsearch

这里我们以最常见的Kakfa作为输入源为例

-Log Sample

原始日志格式如下:

127.0.0.1 elasticsearch.cn 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:21:54:32 +0800] "GET /article HTTP/1.1" 200 123 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)"

-Elasticsearch Document

我们想要统计,一分钟每个域名的访问情况,聚合完的数据有以下字段:

domain String
hostname String
status int
datetime String
count int


---Running seatunnel

spark {
  spark.app.name = "seatunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.streaming.batchDuration = 5
}
input {
    kafkaStream {
        topics = "seatunnel-es"
        consumer.bootstrap.servers = "localhost:9092"
        consumer.group.id = "seatunnel_es_group"
        consumer.rebalance.max.retries = 100
    }
}
filter {
    # 使用正则解析原始日志
    # 最开始数据都在raw_message字段中
    grok {
        source_field = "raw_message"
        pattern = '%{IP:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
   }
    # 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
    # Elasticsearch中支持的格式
    date {
        source_field = "timestamp"
        target_field = "datetime"
        source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
        target_time_format = "yyyy-MM-dd'T'HH:mm:00.SSS+08:00"
    }
    ## 利用SQL对数据进行聚合
    sql {
        table_name = "access_log"
        sql = "select domain, hostname, status, datetime, count(*) from access_log group by domain, hostname, status, datetime"
    }
 }
output {
    elasticsearch {
        hosts = ["localhost:9200"]
        index = "seatunnel-${now}"
        es.batch.size.entries = 100000
        index_time_format = "yyyy.MM.dd"
    }
}


签名:这个人很懒,什么也没有留下!
最新回复 (0)
返回