环境准备
- 下载Demo包kafka-confluent-go-demo.zip。
- 使用开发工具导入Demo。
配置修改
- 如果是SSL连接,需要在控制台下载证书,解压压缩包得到ssl.client.truststore.jks,然后依次执行以下两条命令生成caRoot.pem文件。
keytool -importkeystore -srckeystore ssl.client.truststore.jks -destkeystore caRoot.p12 -deststoretype pkcs12
openssl pkcs12 -in caRoot.p12 -out caRoot.pem
- 修改kafka.json文件。(security.protocol仅在ssl连接时需要配置)
{ "topic": "XXX", "topic2": "XXX", "group.id": "XXX", "bootstrap.servers" : "XXX:XX", "security.protocol" : "SSL" }
生产消息
执行以下命令发送消息。
go run -mod=vendor producer/producer.go
生产消息示例代码如下:
package main
import (
"encoding/json"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"log"
"os"
"path/filepath"
"strconv"
"time"
)
// INT32_MAX is passed to librdkafka as the "retries" setting. It is kept
// 1000 below math.MaxInt32 so the value safely fits in the client's
// int32 configuration range — presumably to leave headroom; confirm
// against librdkafka's documented bounds.
const (
INT32_MAX = 2147483647 - 1000
)
// KafkaConfig mirrors conf/kafka.json. The json tags must match the
// dotted keys used in that file (e.g. "group.id", "bootstrap.servers").
type KafkaConfig struct {
	Topic            string `json:"topic"`
	Topic2           string `json:"topic2"`
	GroupId          string `json:"group.id"`
	BootstrapServers string `json:"bootstrap.servers"`
	SecurityProtocol string `json:"security.protocol"`
	SslCaLocation    string `json:"ssl.ca.location"`
	// SASL credentials, needed only for SASL_SSL / SASL_PLAINTEXT.
	// Fix: these fields were missing even though doInitProducer reads
	// cfg.SaslUsername / cfg.SaslPassword / cfg.SaslMechanism, which
	// made the program fail to compile.
	SaslUsername  string `json:"sasl.username"`
	SaslPassword  string `json:"sasl.password"`
	SaslMechanism string `json:"sasl.mechanism"`
}
// config should be a pointer to structure, if not, panic
func loadJsonConfig() *KafkaConfig {
workPath, err := os.Getwd()
if err != nil {
panic(err)
}
configPath := filepath.Join(workPath, "conf")
fullPath := filepath.Join(configPath, "kafka.json")
file, err := os.Open(fullPath);
if (err != nil) {
msg := fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err)
panic(msg)
}
defer file.Close()
decoder := json.NewDecoder(file)
var config = &KafkaConfig{}
err = decoder.Decode(config);
if (err != nil) {
msg := fmt.Sprintf("Decode json fail for config file at %s. Error: %v", fullPath, err)
panic(msg)
}
json.Marshal(config)
return config
}
// doInitProducer builds a kafka.Producer from cfg. It panics on an
// unknown security protocol or if the client cannot be created, since
// the demo cannot proceed without a working producer.
func doInitProducer(cfg *KafkaConfig) *kafka.Producer {
	fmt.Print("init kafka producer, it may take a few seconds to init the connection\n")
	// Settings shared by every security protocol. "retries" is capped
	// below math.MaxInt32 via INT32_MAX.
	kafkaconf := &kafka.ConfigMap{
		"api.version.request":           "true",
		"message.max.bytes":             1000000,
		"linger.ms":                     500,
		"sticky.partitioning.linger.ms": 1000,
		"retries":                       INT32_MAX,
		"retry.backoff.ms":              1000,
		"acks":                          "1",
	}
	kafkaconf.SetKey("bootstrap.servers", cfg.BootstrapServers)
	// Fix: the "ssl.ca.location" value from kafka.json was ignored in
	// favor of a hard-coded placeholder. Prefer the configured path and
	// keep the placeholder only as a fallback for old configs.
	caLocation := cfg.SslCaLocation
	if caLocation == "" {
		caLocation = "/XXX/caRoot.pem"
	}
	switch cfg.SecurityProtocol {
	case "PLAINTEXT":
		kafkaconf.SetKey("security.protocol", "plaintext")
	case "SSL":
		kafkaconf.SetKey("security.protocol", "ssl")
		kafkaconf.SetKey("ssl.ca.location", caLocation)
	case "SASL_SSL":
		kafkaconf.SetKey("security.protocol", "sasl_ssl")
		kafkaconf.SetKey("ssl.ca.location", caLocation)
		kafkaconf.SetKey("sasl.username", cfg.SaslUsername)
		kafkaconf.SetKey("sasl.password", cfg.SaslPassword)
		kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism)
		// NOTE(review): disabling certificate verification weakens TLS;
		// kept for behavior compatibility — confirm it is really needed.
		kafkaconf.SetKey("enable.ssl.certificate.verification", "false")
	case "SASL_PLAINTEXT":
		kafkaconf.SetKey("security.protocol", "sasl_plaintext")
		kafkaconf.SetKey("sasl.username", cfg.SaslUsername)
		kafkaconf.SetKey("sasl.password", cfg.SaslPassword)
		kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism)
	default:
		panic(kafka.NewError(kafka.ErrUnknownProtocol, "unknown protocol", true))
	}
	producer, err := kafka.NewProducer(kafkaconf)
	if err != nil {
		panic(err)
	}
	fmt.Print("init kafka producer success\n")
	return producer
}
func main() {
	cfg := loadJsonConfig()
	producer := doInitProducer(cfg)
	defer producer.Close()

	// Delivery report handler: logs the outcome of every produced message.
	go func() {
		for e := range producer.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					log.Printf("Failed to write access log entry:%v", ev.TopicPartition.Error)
				} else {
					log.Printf("Send OK topic:%v partition:%v offset:%v content:%s\n",
						*ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset, ev.Value)
				}
			}
		}
	}()

	// Produce messages asynchronously, alternating between the two
	// configured topics (even counters -> Topic2, odd -> Topic).
	for i := 1; ; i++ {
		value := "this is a kafka message from confluent go " + strconv.Itoa(i)
		topic := &cfg.Topic
		if i%2 == 0 {
			topic = &cfg.Topic2
		}
		msg := &kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: topic, Partition: kafka.PartitionAny},
			Value:          []byte(value),
		}
		// Fix: the Produce error was silently ignored. A non-nil error
		// means the message never entered the local queue (e.g. queue
		// full), so report it.
		if err := producer.Produce(msg, nil); err != nil {
			log.Printf("Produce failed: %v", err)
		}
		time.Sleep(time.Millisecond)
	}
	// Fix: the original called producer.Flush(15*1000) here, but the
	// loop above never exits, so that call was unreachable dead code.
	// If a bounded run is ever needed, break out of the loop and call
	// producer.Flush before returning.
}
消费消息
执行以下命令消费消息。
go run -mod=vendor consumer/consumer.go
消费消息示例代码如下:
package main
import (
"encoding/json"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"os"
"path/filepath"
)
// KafkaConfig mirrors conf/kafka.json. The json tags must match the
// dotted keys used in that file (e.g. "group.id", "bootstrap.servers").
type KafkaConfig struct {
	Topic            string `json:"topic"`
	Topic2           string `json:"topic2"`
	GroupId          string `json:"group.id"`
	BootstrapServers string `json:"bootstrap.servers"`
	SecurityProtocol string `json:"security.protocol"`
	// CA certificate path, added for parity with the producer's config.
	SslCaLocation string `json:"ssl.ca.location"`
	// SASL credentials, needed only for SASL_SSL / SASL_PLAINTEXT.
	// Fix: these fields were missing even though doInitConsumer reads
	// cfg.SaslUsername / cfg.SaslPassword / cfg.SaslMechanism, which
	// made the program fail to compile.
	SaslUsername  string `json:"sasl.username"`
	SaslPassword  string `json:"sasl.password"`
	SaslMechanism string `json:"sasl.mechanism"`
}
// config should be a pointer to structure, if not, panic
// loadJsonConfig reads conf/kafka.json relative to the current working
// directory and decodes it into a KafkaConfig. It panics on any failure,
// because the demo cannot run without a valid configuration.
func loadJsonConfig() *KafkaConfig {
	workPath, err := os.Getwd()
	if err != nil {
		panic(err)
	}
	fullPath := filepath.Join(workPath, "conf", "kafka.json")
	file, err := os.Open(fullPath)
	if err != nil {
		panic(fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err))
	}
	defer file.Close()
	config := &KafkaConfig{}
	// Decode straight from the file; unknown keys in kafka.json are ignored.
	if err := json.NewDecoder(file).Decode(config); err != nil {
		panic(fmt.Sprintf("Decode json fail for config file at %s. Error: %v", fullPath, err))
	}
	// Fix: removed a dead `json.Marshal(config)` call whose result and
	// error were both discarded.
	return config
}
// doInitConsumer builds a kafka.Consumer from cfg. It panics on an
// unknown security protocol or if the client cannot be created, since
// the demo cannot proceed without a working consumer.
func doInitConsumer(cfg *KafkaConfig) *kafka.Consumer {
	fmt.Print("init kafka consumer, it may take a few seconds to init the connection\n")
	// Settings shared by every security protocol.
	kafkaconf := &kafka.ConfigMap{
		"api.version.request":       "true",
		"auto.offset.reset":         "latest",
		"heartbeat.interval.ms":     3000,
		"session.timeout.ms":        30000,
		"max.poll.interval.ms":      120000,
		"fetch.max.bytes":           1024000,
		"max.partition.fetch.bytes": 256000,
	}
	kafkaconf.SetKey("bootstrap.servers", cfg.BootstrapServers)
	kafkaconf.SetKey("group.id", cfg.GroupId)
	switch cfg.SecurityProtocol {
	case "PLAINTEXT":
		kafkaconf.SetKey("security.protocol", "plaintext")
	case "SSL":
		kafkaconf.SetKey("security.protocol", "ssl")
		// NOTE(review): placeholder path that users must edit by hand;
		// consider reading it from kafka.json as the producer does.
		kafkaconf.SetKey("ssl.ca.location", "/XXX/caRoot.pem")
	case "SASL_SSL":
		kafkaconf.SetKey("security.protocol", "sasl_ssl")
		kafkaconf.SetKey("ssl.ca.location", "/XXX/caRoot.pem")
		kafkaconf.SetKey("sasl.username", cfg.SaslUsername)
		kafkaconf.SetKey("sasl.password", cfg.SaslPassword)
		kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism)
	case "SASL_PLAINTEXT":
		kafkaconf.SetKey("security.protocol", "sasl_plaintext")
		kafkaconf.SetKey("sasl.username", cfg.SaslUsername)
		kafkaconf.SetKey("sasl.password", cfg.SaslPassword)
		kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism)
	default:
		panic(kafka.NewError(kafka.ErrUnknownProtocol, "unknown protocol", true))
	}
	consumer, err := kafka.NewConsumer(kafkaconf)
	if err != nil {
		panic(err)
	}
	fmt.Print("init kafka consumer success\n")
	return consumer
}
func main() {
	cfg := loadJsonConfig()
	consumer := doInitConsumer(cfg)
	// Fix: Close() was placed after the infinite loop and therefore
	// unreachable; defer guarantees it runs if the loop ever exits.
	defer consumer.Close()
	// Fix: the SubscribeTopics error was silently ignored; without a
	// successful subscription the read loop below can never succeed.
	if err := consumer.SubscribeTopics([]string{cfg.Topic, cfg.Topic2}, nil); err != nil {
		panic(err)
	}
	for {
		// -1 blocks indefinitely until a message or error arrives.
		msg, err := consumer.ReadMessage(-1)
		if err != nil {
			// The client will automatically try to recover from all errors.
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
			continue
		}
		fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
	}
}