searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

jchsinker-java去重写clickhouse

2023-07-26 07:49:27
63
0

前言

设计一种写入框架实现类似于logstash般可配置化消费kfk数据到异构数据源(clickhouse),同时能实现数据的去重。相当于是做了一个java去重版本的cksinker。其他整体流程不变

 

架构

 

系统流程设计

注意:

1,只做消费kafka写clickhouse功能。

2,process只做去重功能。

3,数据一致性只能保证,至少一次。

4,当前版本只实现了数据快照到本地文件的功能,当前服务只能支持单进程去重一致性,因此可能存在单节点问题,如果需要多节点多进程只能将Bloomfilter放到远程(redis:rebloom,其他的第三方存储考察来看暂时都不行),且要免快照化才能实现去重bloomfilter数据的一致性。

5,由于bloomfilter存在误判,在判断存在的情况下可能是错误的,但是判断不存在的情况下,肯定是不存在的,那这样可能会导致数据丢失,所以一定要控制好错误比例,如果后期出现确实大量的数据丢失,那只能改下bitmap位图的形式了,或者(Roaringbitmaps更好的压缩性能),不过要花费大量的内存成本,以及极大降低系统的稳定性。不过也可以考虑类似于logstash那样的死队列的思路。

 

配置参数说明

{
    "job": {
        "setting": {
            "channel": 10,
            "data.dir":"/ssd1/services/jchsinker",
            "keep.alive.timeout.mspoll.duration.ms":1000,
            "name":"jchsinker_demo1"
        },
        "content": {
            "inputs": [{
                "name": "kafkareader",
                "parameter": {
                    "bootstrap.servers": "host:6667",
                    "topic": "test-topic-in",
                    "codec": "json",
                    "auto.offset.reset": "earliest",
                    "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                    "poll.interval.ms": "5000",
                    "max.poll.records":1000,
                    "group.id": "chpipeline_groupid_001",
                    "security.protocol": "SASL_PLAINTEXT",
                    "sasl.mechanism": "PLAIN",
                    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";",
                    "table": {
                        "name": "kfk_demo_view",
                        "column": [{
                                "alias": "beginTime",
                                "name": "BeginTime"
                            },
                            {
                                "alias": "endTime",
                                "name": "EndTime"
                            },
                            {
                                "alias": "chargingItem",
                                "name": "ChargingItem"
                            },
                            {
                                "alias": "chargingUnit",
                                "name": "ChargingUnit"
                            },
                            {
                                "alias": "chargingValue",
                                "name": "ChargingValue"
                            },
                            {
                                "alias": "inValue",
                                "name": "InValue"
                            },
                            {
                                "alias": "outValue",
                                "name": "OutValue"
                            },
                            {
                                "alias": "model",
                                "name": "Model"
                            },
                            {
                                "alias": "nodeCode",
                                "name": "NodeCode"
                            },
                            {
                                "alias": "recordType",
                                "name": "RecordType"
                            },
                            {
                                "alias": "resourceId",
                                "name": "ResourceId"
                            },
                            {
                                "alias": "resourceType",
                                "name": "ResourceType"
                            },
                            {
                                "alias": "userId",
                                "name": "UserId"
                            }
                        ]
                    }
                }
            }],
            "processors": [{
                "name": "memduplicate",
                "enable": true,
                "parameter": {
                    "duplicate.key": [
                        "resourceId",
                        "userId",
                        "chargingItem",
                        "beginTime"
                    ],
                    "engine": {
                        "name": "bloomfilter",
                        "time.column": "beginTime",
                        "time.data.type": "string_t",
                        "time.format":"yyyy-MM-dd HH:mm:ss",
                        "cardinality": 1000000,
                        "fpp": 0.01,
                        "instances": 200,
                        "cache.day": 1,
                        "snapshot.interval.ms":60
                    },
                    "data.source": {
                        "type": "file"
                    }
                }
            }],
            "outputs": [{
                "name": "clickhousewriter",
                "parameter": {
                    "batch.size": 10000,
                    "flush.interval": 3000,
                    "connection": {
                        "jdbc.url": "jdbc:clickhouse://ip:port/test_db",
                        "table": "test_table_all"
                    },
                    "retry.times": 3,
                    "username": "xxxx",
                    "password": "xxxx"
                }
            }],
            "metrics": [{
                "Prometheus": {
                    "host": "127.0.0.1",
                    "interval": "5 SECONDS",
                    "class": "cn.chinatelecom.jchsinker.metrics.promethues.PrometheusReporter",
                    "port": "29091"
                }
            }]
        }
    }
}

 

参数详解

参数
类型
必须?

默认值

描述
job.setting.channel int 5 并发线程数,注意:必须和kafka的topic分区数一致,否则会出现消费丢失获得线程空闲
job.setting.data.dir string /soft/jchsinker/ 数据缓存目录,比如bloomfilter快照文件,task状态文件register.json,如果是docker或者k8s部署,切记需要配置然后挂载到本机文件目录上,否则会出现进程重启或者奔溃后数据丢失或者去重失效。
job.setting.keep.alive.timeout.ms int 5000 单位ms,系统在关闭后,进程继续存活最大时间
job.setting.name string jchsinker 任务名
job.setting.max.queue.size int 50000 阻塞队列长度
job.content.inputs.name string   消费名,当前为kafka
job.content.inputs.parameter.bootstrap.servers string   kafka 服务器地址
job.content.inputs.parameter.topic string   kafka topic
job.content.inputs.parameter.auto.offset.reset string earliest groupid 第一次默认消费位置
job.content.inputs.parameter.value.deserializer string ....StringDeserializer  
job.content.inputs.parameter.key.deserializer string ....StringDeserializer  

job.content.inputs.parameter.poll.duration.ms

int
1000
kafka consumer两次poll的最大时间间隔
job.content.inputs.parameter.max.poll.records int
1000
kafka consumer每次消费最大条数
job.content.inputs.parameter.group.id string   kafka group id
job.content.inputs.parameter.security.protocol string SASL_PLAINTEXT  
job.content.inputs.parameter.sasl.mechanism string PLAIN  
job.content.inputs.parameter.sasl.jaas.config string  

org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";

kafka sasl鉴权验证

job.content.inputs.parameter.table.name string   kafka中的数据的虚拟表
job.content.inputs.parameter.table.column.alias string   kafka中的数据的虚拟表字段别名,注意:别名为输出到下游的字段名
job.content.inputs.parameter.table.column.name string   kafka中的数据的虚拟表字段名
job.content.processors.name string   数据处理器名
job.content.processors.enable boolean true 是否开启数据处理器
job.content.processors.parameter.duplicate.key list   去重的字段,可以选多个,使用逗号分割
job.content.processors.parameter.engine.name string bloomfilter 去重插件名,当前只支持本地 bloomfilter,后期支持redis
job.content.processors.parameter.engine.time.column string   bloomfilter时间缓存字段,在事件数据中取一个时间字段,去重bloomfilter因为不能一直增加,所以需要按事件中的一个时间字段来过期删除bloomfilter,bloomfilter占用内存大小计算工具:https://krisives.github.io/bloom-calculator/
job.content.processors.parameter.engine.time.data.type string string_t 时间字段类型,可以支持 string_t 字符串,比如:2021-11-02 17:13:32,int_t 10位时间戳,long_t 13位时间戳
job.content.processors.parameter.engine.time.format string   如果time.data.type为string_t,必须指定时间格式
job.content.processors.parameter.engine.cardinality long 1000000 bloomfilter基数 ,这个值越大,内存磁盘资源也就越大,bloomfilter占用内存大小计算工具:https://krisives.github.io/bloom-calculator/
job.content.processors.parameter.engine.fpp double 0.01 bloomfilter容错因子
job.content.processors.parameter.engine.instances int 100 bloomfilter缓存最大个数
job.content.processors.parameter.engine.cache.day int 1 缓存bloomfilter最大时间,单位天,这个数据越大,快照时间和服务重启和停止时间也就越长,注意:需要计算下instances之间的关系,bloomfilter为每小时缓存一个,一天24个
job.content.processors.parameter.engine.snapshot.interval.ms int 300000 bloomfilter定时快照时间,单位ms,单位ms,不能小于60000,否则强制为60000,为了保证服务重启,过滤器不丢失过滤数据
job.content.processors.parameter.data.source.type string file bloomfilter快照输出数据源,当前只支持文件,后期支持多节点的话,可以支持hdfs,redis 
job.content.outputs.name string   输出数据源,当前只支持clickhousewriter
job.content.outputs.parameter.batch.size int 1000 每批次写入clickhouse大小,数值越大,延时越大。
job.content.outputs.parameter.flush.interval.ms int 1000 每批次最大等待时间,单位ms,不能小于1000,否则强制为1000,数据越少在数据量不大的情况下,延时越低。注意需要平衡好batch.size和flush.interval的关系,平衡延时和性能,类似于kafka的批量操作
job.content.outputs.parameter.connection.jdbc.url string  

clickhouse集群的jdbc.url,例如jdbc:clickhouse://127.0.0.1:8123/test_db

注意只能用clickhouse的http端口

job.content.outputs.parameter.connection.table string   数据写入目标表,当前只支持写分布式表
job.content.outputs.parameter.retry.times int 3 写clickhouse,失败重试次数
job.content.outputs.parameter.username string   clickhouse访问账号
job.content.outputs.parameter.password string   clickhouse访问密码
job.content.metrics.Prometheus.host string   Prometheus数据上报IP
job.content.metrics.Prometheus.interval int   Prometheus数据收集时间间隔
job.content.metrics.Prometheus.class string   当前只能为cn.chinatelecom.jchsinker.metrics.promethues.PrometheusReporter使用pull方式
job.content.metrics.Prometheus.port int
9249
Prometheus pull方式对外服务端口

 

数据完整性保证

数据的完整性主要分量种exactly-once(精确一次语义)和at-least-once(至少一次语义)。

 

完整性消费架构

 

去重逻辑处理细节

 

消费快照文件

register.json

{ 
    ##记录成功写入到clickhouse数据库的offset
    "consumedOffset": {
        "demo1-in#0": 546983,
        "demo1-in#9": 543723,
        "demo1-in#6": 544547,
        "demo1-in#5": 544166,
        "demo1-in#8": 541308,
        "demo1-in#7": 545393,
        "demo1-in#2": 541287,
        "demo1-in#1": 545401,
        "demo1-in#4": 544586,
        "demo1-in#3": 543736
    },
   ##更新时间
    "updateTime": 1636106394830,
   ##队列中余留的数据长度
   "outputBatchQueueRemain": {
        "3#": 60,
        "2#": 69,
        "1#": 29,
        "0#": 32,
        "9#": 62,
        "8#": 22,
        "7#": 22,
        "6#": 64,
        "5#": 41,
        "4#": 29
    },
    ##去重任务去重后的最大offset
    "dumpHighWatermarkOffset": {
        "0#": 547014,
        "9#": 543784,
        "5#": 544206,
        "6#": 544610,
        "7#": 545414,
        "8#": 541329,
        "1#": 545429,
        "2#": 541355,
        "3#": 543795,
        "4#": 544614
    }
}

 

数据完整性监控

curl ip:port/metrics

输入数据量:TASK_pipeline_numRecordsIn

输出数据量:TASK_pipeline_numRecordsOut 

判断两个数据是否相等。

 

完整性保障流程

为保证整个流程数据的完整性

0,数据消费一个线程处理一个kafka 分区,kafka offset消费单调递增。

1,定时记录快照文件,包括bloomfilter文件信息,不同任务的状态信息register.json。

2,在进程奔溃的时候同样需要记录状态信息到本地。

3,在进程重启或者奔溃的情况下,需要按记录的状态文件(register.json)做replay。

0条评论
0 / 1000
wanghg11
13文章数
2粉丝数
wanghg11
13 文章 | 2 粉丝
原创

jchsinker-java去重写clickhouse

2023-07-26 07:49:27
63
0

前言

设计一种写入框架实现类似于logstash般可配置化消费kfk数据到异构数据源(clickhouse),同时能实现数据的去重。相当于是做了一个java去重版本的cksinker。其他整体流程不变

 

架构

 

系统流程设计

注意:

1,只做消费kafka写clickhouse功能。

2,process只做去重功能。

3,数据一致性只能保证,至少一次。

4,当前版本只实现了数据快照到本地文件的功能,当前服务只能支持单进程去重一致性,因此可能存在单节点问题,如果需要多节点多进程只能将Bloomfilter放到远程(redis:rebloom,其他的第三方存储考察来看暂时都不行),且要免快照化才能实现去重bloomfilter数据的一致性。

5,由于bloomfilter存在误判,在判断存在的情况下可能是错误的,但是判断不存在的情况下,肯定是不存在的,那这样可能会导致数据丢失,所以一定要控制好错误比例,如果后期出现确实大量的数据丢失,那只能改下bitmap位图的形式了,或者(Roaringbitmaps更好的压缩性能),不过要花费大量的内存成本,以及极大降低系统的稳定性。不过也可以考虑类似于logstash那样的死队列的思路。

 

配置参数说明

{
    "job": {
        "setting": {
            "channel": 10,
            "data.dir":"/ssd1/services/jchsinker",
            "keep.alive.timeout.mspoll.duration.ms":1000,
            "name":"jchsinker_demo1"
        },
        "content": {
            "inputs": [{
                "name": "kafkareader",
                "parameter": {
                    "bootstrap.servers": "host:6667",
                    "topic": "test-topic-in",
                    "codec": "json",
                    "auto.offset.reset": "earliest",
                    "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                    "poll.interval.ms": "5000",
                    "max.poll.records":1000,
                    "group.id": "chpipeline_groupid_001",
                    "security.protocol": "SASL_PLAINTEXT",
                    "sasl.mechanism": "PLAIN",
                    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";",
                    "table": {
                        "name": "kfk_demo_view",
                        "column": [{
                                "alias": "beginTime",
                                "name": "BeginTime"
                            },
                            {
                                "alias": "endTime",
                                "name": "EndTime"
                            },
                            {
                                "alias": "chargingItem",
                                "name": "ChargingItem"
                            },
                            {
                                "alias": "chargingUnit",
                                "name": "ChargingUnit"
                            },
                            {
                                "alias": "chargingValue",
                                "name": "ChargingValue"
                            },
                            {
                                "alias": "inValue",
                                "name": "InValue"
                            },
                            {
                                "alias": "outValue",
                                "name": "OutValue"
                            },
                            {
                                "alias": "model",
                                "name": "Model"
                            },
                            {
                                "alias": "nodeCode",
                                "name": "NodeCode"
                            },
                            {
                                "alias": "recordType",
                                "name": "RecordType"
                            },
                            {
                                "alias": "resourceId",
                                "name": "ResourceId"
                            },
                            {
                                "alias": "resourceType",
                                "name": "ResourceType"
                            },
                            {
                                "alias": "userId",
                                "name": "UserId"
                            }
                        ]
                    }
                }
            }],
            "processors": [{
                "name": "memduplicate",
                "enable": true,
                "parameter": {
                    "duplicate.key": [
                        "resourceId",
                        "userId",
                        "chargingItem",
                        "beginTime"
                    ],
                    "engine": {
                        "name": "bloomfilter",
                        "time.column": "beginTime",
                        "time.data.type": "string_t",
                        "time.format":"yyyy-MM-dd HH:mm:ss",
                        "cardinality": 1000000,
                        "fpp": 0.01,
                        "instances": 200,
                        "cache.day": 1,
                        "snapshot.interval.ms":60
                    },
                    "data.source": {
                        "type": "file"
                    }
                }
            }],
            "outputs": [{
                "name": "clickhousewriter",
                "parameter": {
                    "batch.size": 10000,
                    "flush.interval": 3000,
                    "connection": {
                        "jdbc.url": "jdbc:clickhouse://ip:port/test_db",
                        "table": "test_table_all"
                    },
                    "retry.times": 3,
                    "username": "xxxx",
                    "password": "xxxx"
                }
            }],
            "metrics": [{
                "Prometheus": {
                    "host": "127.0.0.1",
                    "interval": "5 SECONDS",
                    "class": "cn.chinatelecom.jchsinker.metrics.promethues.PrometheusReporter",
                    "port": "29091"
                }
            }]
        }
    }
}

 

参数详解

参数
类型
必须?

默认值

描述
job.setting.channel int 5 并发线程数,注意:必须和kafka的topic分区数一致,否则会出现消费丢失获得线程空闲
job.setting.data.dir string /soft/jchsinker/ 数据缓存目录,比如bloomfilter快照文件,task状态文件register.json,如果是docker或者k8s部署,切记需要配置然后挂载到本机文件目录上,否则会出现进程重启或者奔溃后数据丢失或者去重失效。
job.setting.keep.alive.timeout.ms int 5000 单位ms,系统在关闭后,进程继续存活最大时间
job.setting.name string jchsinker 任务名
job.setting.max.queue.size int 50000 阻塞队列长度
job.content.inputs.name string   消费名,当前为kafka
job.content.inputs.parameter.bootstrap.servers string   kafka 服务器地址
job.content.inputs.parameter.topic string   kafka topic
job.content.inputs.parameter.auto.offset.reset string earliest groupid 第一次默认消费位置
job.content.inputs.parameter.value.deserializer string ....StringDeserializer  
job.content.inputs.parameter.key.deserializer string ....StringDeserializer  

job.content.inputs.parameter.poll.duration.ms

int
1000
kafka consumer两次poll的最大时间间隔
job.content.inputs.parameter.max.poll.records int
1000
kafka consumer每次消费最大条数
job.content.inputs.parameter.group.id string   kafka group id
job.content.inputs.parameter.security.protocol string SASL_PLAINTEXT  
job.content.inputs.parameter.sasl.mechanism string PLAIN  
job.content.inputs.parameter.sasl.jaas.config string  

org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxx\" password=\"xxxx\";

kafka sasl鉴权验证

job.content.inputs.parameter.table.name string   kafka中的数据的虚拟表
job.content.inputs.parameter.table.column.alias string   kafka中的数据的虚拟表字段别名,注意:别名为输出到下游的字段名
job.content.inputs.parameter.table.column.name string   kafka中的数据的虚拟表字段名
job.content.processors.name string   数据处理器名
job.content.processors.enable boolean true 是否开启数据处理器
job.content.processors.parameter.duplicate.key list   去重的字段,可以选多个,使用逗号分割
job.content.processors.parameter.engine.name string bloomfilter 去重插件名,当前只支持本地 bloomfilter,后期支持redis
job.content.processors.parameter.engine.time.column string   bloomfilter时间缓存字段,在事件数据中取一个时间字段,去重bloomfilter因为不能一直增加,所以需要按事件中的一个时间字段来过期删除bloomfilter,bloomfilter占用内存大小计算工具:https://krisives.github.io/bloom-calculator/
job.content.processors.parameter.engine.time.data.type string string_t 时间字段类型,可以支持 string_t 字符串,比如:2021-11-02 17:13:32,int_t 10位时间戳,long_t 13位时间戳
job.content.processors.parameter.engine.time.format string   如果time.data.type为string_t,必须指定时间格式
job.content.processors.parameter.engine.cardinality long 1000000 bloomfilter基数 ,这个值越大,内存磁盘资源也就越大,bloomfilter占用内存大小计算工具:https://krisives.github.io/bloom-calculator/
job.content.processors.parameter.engine.fpp double 0.01 bloomfilter容错因子
job.content.processors.parameter.engine.instances int 100 bloomfilter缓存最大个数
job.content.processors.parameter.engine.cache.day int 1 缓存bloomfilter最大时间,单位天,这个数据越大,快照时间和服务重启和停止时间也就越长,注意:需要计算下instances之间的关系,bloomfilter为每小时缓存一个,一天24个
job.content.processors.parameter.engine.snapshot.interval.ms int 300000 bloomfilter定时快照时间,单位ms,单位ms,不能小于60000,否则强制为60000,为了保证服务重启,过滤器不丢失过滤数据
job.content.processors.parameter.data.source.type string file bloomfilter快照输出数据源,当前只支持文件,后期支持多节点的话,可以支持hdfs,redis 
job.content.outputs.name string   输出数据源,当前只支持clickhousewriter
job.content.outputs.parameter.batch.size int 1000 每批次写入clickhouse大小,数值越大,延时越大。
job.content.outputs.parameter.flush.interval.ms int 1000 每批次最大等待时间,单位ms,不能小于1000,否则强制为1000,数据越少在数据量不大的情况下,延时越低。注意需要平衡好batch.size和flush.interval的关系,平衡延时和性能,类似于kafka的批量操作
job.content.outputs.parameter.connection.jdbc.url string  

clickhouse集群的jdbc.url,例如jdbc:clickhouse://127.0.0.1:8123/test_db

注意只能用clickhouse的http端口

job.content.outputs.parameter.connection.table string   数据写入目标表,当前只支持写分布式表
job.content.outputs.parameter.retry.times int 3 写clickhouse,失败重试次数
job.content.outputs.parameter.username string   clickhouse访问账号
job.content.outputs.parameter.password string   clickhouse访问密码
job.content.metrics.Prometheus.host string   Prometheus数据上报IP
job.content.metrics.Prometheus.interval int   Prometheus数据收集时间间隔
job.content.metrics.Prometheus.class string   当前只能为cn.chinatelecom.jchsinker.metrics.promethues.PrometheusReporter使用pull方式
job.content.metrics.Prometheus.port int
9249
Prometheus pull方式对外服务端口

 

数据完整性保证

数据的完整性主要分量种exactly-once(精确一次语义)和at-least-once(至少一次语义)。

 

完整性消费架构

 

去重逻辑处理细节

 

消费快照文件

register.json

{ 
    ##记录成功写入到clickhouse数据库的offset
    "consumedOffset": {
        "demo1-in#0": 546983,
        "demo1-in#9": 543723,
        "demo1-in#6": 544547,
        "demo1-in#5": 544166,
        "demo1-in#8": 541308,
        "demo1-in#7": 545393,
        "demo1-in#2": 541287,
        "demo1-in#1": 545401,
        "demo1-in#4": 544586,
        "demo1-in#3": 543736
    },
   ##更新时间
    "updateTime": 1636106394830,
   ##队列中余留的数据长度
   "outputBatchQueueRemain": {
        "3#": 60,
        "2#": 69,
        "1#": 29,
        "0#": 32,
        "9#": 62,
        "8#": 22,
        "7#": 22,
        "6#": 64,
        "5#": 41,
        "4#": 29
    },
    ##去重任务去重后的最大offset
    "dumpHighWatermarkOffset": {
        "0#": 547014,
        "9#": 543784,
        "5#": 544206,
        "6#": 544610,
        "7#": 545414,
        "8#": 541329,
        "1#": 545429,
        "2#": 541355,
        "3#": 543795,
        "4#": 544614
    }
}

 

数据完整性监控

curl ip:port/metrics

输入数据量:TASK_pipeline_numRecordsIn

输出数据量:TASK_pipeline_numRecordsOut 

判断两个数据是否相等。

 

完整性保障流程

为保证整个流程数据的完整性

0,数据消费一个线程处理一个kafka 分区,kafka offset消费单调递增。

1,定时记录快照文件,包括bloomfilter文件信息,不同任务的状态信息register.json。

2,在进程奔溃的时候同样需要记录状态信息到本地。

3,在进程重启或者奔溃的情况下,需要按记录的状态文件(register.json)做replay。

文章来自个人专栏
日志服务
11 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0