Skip to main content

kafka 插件

我们使用Sarama作为kafka连接的SDK, 并且将其封装在xkafka包中。我们也为其适配了自动注册的rkafka包的实现。

如何使用

自动注册

在使用的过程中,我们只需要将rkafka导入到主程序中,比如

import (
_ "path/to/r/rkafka"
)

func main() {
.....
}

主程序运行过程中会根据配置中的数据,创建对应的实例,并且放到内存中。我们在使用的过程中,可以调用

consumer := rkafka.GetConsumer("ngo.client.kafka.{name}")
producer := rkafka.GetProducer("ngo.client.kafka.{name}")

直接拿到客户端,就可以使用了。

配置参数

参数类型含义默认值
Namestring{name},用于在ngo.client.kafka.{name}中使用
Addr[]string连接地址和端口号mysql
Versionstring客户端版本
MaxOpenRequestsint一个连接处理的最大请求数
DialTimeouttime.Duration请求建连超时时间0
ReadTimeouttime.Duration请求等待超时
WriteTimeouttime.Duration请求成功超时时间,包含同步所有broker副本时间
Consumerstruct消费者配置
Consumer.Groupstring消费者所在群组
Consumer.EnableAutoCommitbool是否自动提交offsets
Consumer.AutoCommitIntervaltime.Duration自动提交offsetsbroker的周期
Consumer.InitialOffsetnumber默认起始offsets
Consumer.SessionTimeouttime.Durationconsumer 多长时间没有发送心跳broker的剔除时间
Consumer.MinFetchBytesnumber消费的最小字节数
Consumer.DefaultFetchBytesnumber默认消费的字节数
Consumer.MaxFetchBytesnumber一次消费的最大字节数, 单个消息超标之后会导致消费失败
Consumer.MaxFetchWaittime.Duration等待能够消费的最大时间
Consumer.Retriesnumberoffsets提交的重试次数
Producerstruct生产者配置
Producer.MaxMessageBytesnumber暂未使用
Producer.Acksnumber确认级别; 0: 不需要确认 1:提交本地 -1: 等待所有
Producer.TimeoutnumberAcks相关的,broker超时时间
Producer.Retriesnumber消息重试次数
Producer.MaxFlushBytesnumber触发写入字节数量
Producer.MaxFlushMessagesnumber触发写入的消息数量
Producer.FlushFrequencynumber触发写入的频率周期
Producer.Idempotentbool消息是否只保留一个副本
EnableTracerbool是否开始tracingfalse

配置举例

[[ngo.client.kafka]] # 本地自行安装kafka,或者通过测试环境
name="demo01"
addr=["127.0.0.1:9092"]
# version= "0.11.0.2"
dialTimeout=5000000000 #5s
readTimeout=5000000000 #5s
writeTimeout=5000000000 #5s
maxOpenRequests = 10
# sasl:
# enable: true
# user: xxx
# password: xxx
# mechanism: PLAIN
# handshake: true
[ngo.client.kafka.metadata]
retries = 2
timeout = 10000000000 #10s
[ngo.client.kafka.consumer]
group="ngoexamplekafkaqueue"
enableAutoCommit=false
autoCommitInterval=1000000000 #1s
initialOffset= -1 # OffsetNewest=-1 OffsetOldest=-2
sessionTimeout=10000000000 #10s
minFetchBytes=1
maxFetchBytes= 0 # 无限制
defaultFetchBytes = 1000000
maxFetchWait= 250000000 #250ms
retries=1
[ngo.client.kafka.producer]
maxMessageBytes=1000000
acks= 1 # NoResponse = 0 WaitForLocal = 1 WaitForAll = -1
timeout= 10000000000 #10s
retries=3
maxFlushBytes=0
maxFlushMessages=0
flushFrequency= 0

目前我们支持了同一个应用的多客户端, 因此你可以在toml配置文件中配置多个。