1)底层支持重连服务器
%3|1600150057.893|ERROR|rdkafka#consumer-1| [thrd:192.168.58.95:9092/bootstrap]: 192.168.58.95:9092/0: Connect to ipv4#192.168.58.95:9092 failed: 由于目标计算机积极拒绝,无法连接
当服务器恢复正常,能够正常重连
2)必须指定一个groupid,可以是任意的一串字符串
/* Consumer groups require a group id */
if (!group)
group = "rdkafka_consumer_example";
topic到group质检是发布订阅的通信方式,即一条topic会被所有的group消费,属于一对多模式;group到consumer是点对点通信方式,属于一对一模式。
举例:
不使用group的话,启动10个consumer消费一个topic,这10个consumer都能得到topic的所有数据,相当于这个topic中的任一条消息被消费10次。
使用group的话,连接时带上groupid,topic的消息会分发到10个consumer上,每条消息只被消费1次。
3)rd_kafka_consumer_poll会返回超时信息或者发送数据
/** Reached the end of the topic+partition queue on
* the broker. Not really an error. */
RD_KAFKA_RESP_ERR__PARTITION_EOF = -191,//目前没有数据
RD_KAFKA_RESP_ERR__TIMED_OUT = -185,//操作超时
4)出现问题:Local: Message timed out 怀疑是发送的报文较长(图片的base64编码字符) 最终定位到是服务器连接不上的问题
1)底层支持重连服务器
%3|1600150057.893|ERROR|rdkafka#consumer-1| [thrd:192.168.58.95:9092/bootstrap]: 192.168.58.95:9092/0: Connect to ipv4#192.168.58.95:9092 failed: 由于目标计算机积极拒绝,无法连接
当服务器恢复正常,能够正常重连
2)必须指定一个groupid,可以是任意的一串字符串
/* Consumer groups require a group id */
if (!group)
group = "rdkafka_consumer_example";
topic到group质检是发布订阅的通信方式,即一条topic会被所有的group消费,属于一对多模式;group到consumer是点对点通信方式,属于一对一模式。
举例:
不使用group的话,启动10个consumer消费一个topic,这10个consumer都能得到topic的所有数据,相当于这个topic中的任一条消息被消费10次。
使用group的话,连接时带上groupid,topic的消息会分发到10个consumer上,每条消息只被消费1次。
3)rd_kafka_consumer_poll会返回超时信息或者发送数据
-191是超时信息
4)auto.offset.reset该属性定义了消费者从主题的哪个时刻开始消费
smallest:消费最早的消息
largest:从消费者连上主题这一刻时间开始消费,之前的消息不会消费
注意:Java版本中的这个字段的属性值分别是earliest和latest
5)kafka服务器默认支持报文长度是10k,如果发送大图片会出错
https://www.cnblogs.com/moonandstar08/p/6557248.html
https://blog.csdn.net/guoyuqi0554/article/details/48630907
https://kafka.apache.org/documentation/#producerconfigs
https://www.jianshu.com/p/40d812354d6c
发送图片缓存,返回错误Broker: Message size too large
对应错误的返回码:RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE
详细例子可以参考:
使用librdkafka库实现kafka的生产和消费实例--生产者