# kafka-go **Repository Path**: tym_hmm/kafka-go ## Basic Information - **Project Name**: kafka-go - **Description**: golang kafka 组件 生产者连接池,基于sarama, 目的在于使用简易上手, 快速调优 - **Primary Language**: Go - **License**: AGPL-3.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 3 - **Forks**: 3 - **Created**: 2023-01-06 - **Last Updated**: 2024-09-12 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## kafka golang > 基于 sarama 封装, 目的在于使用简洁及优化 > 运行模式为 消费组的方式 ## 支持功能 1. 客户端连接池,可复用连接 2. 通过build可构建多个消息主题发送及主题消费 3. 默认使用消费者组模式 ## 使用 ### 1. 消费者 > 通过build 可同时构建多个需要的业务消费,指定不同的参数传入 * ### demo ```go var ( addr = "192.168.186.130:9092,192.168.186.201:9092,192.168.186.202:9092" topic = "web_log" groupId = "testGroupId" ) //使用阻塞模式 var consumerFactory = kafka_go.NewFactoryConsumer() //使用非阻塞模式 // var consumerFactory = kafka_go.NewFactoryConsumerBackGround() //消费构建 demoConsumerBuilder := kafka_go.NewBuildConsumer(addr, groupId, topic) //设置kafka版本 demoConsumerBuilder.SetKafkaVersion("3.0.0") //注册消费回调监听 demoConsumerBuilder.SetResponseListener(func(context *kafka_go.ConsumerMessageContext) { fmt.Printf("%+v\n", context.GetMessageString()) }) //注册消费构建(多个以参数区分) consumerFactory.RegisterConsumer(demoConsumerBuilder) //运行消费者 consumerFactory.Run() ``` * ### build 接口说明 `BuildConsumer` > 使用方式 ```go //消费构建 demoConsumerBuilder := kafka_go.NewBuildConsumer(addr, groupId, topic) //设置kafka版本 demoConsumerBuilder.SetKafkaVersion("3.0.0") ``` > 接口使用前注意事项 > * 消费者可创建为一消费者多协程分区消费和一消费者一协程消费 > * 一消费者多协程分区消费不保存消息可靠性,重启后存在丢失消息的情况,但吞吐量较高,且接收消息后无法调用消息确认 > * 一消费一协程保证消息可靠,但吞吐量较底 > * 此组件针对消费组进行处理,需传入消费入`groupdId` > > 案例设置:(主要针对`BuildConsumer`设置) ```go //一消费多协程 //消费构建 demo2ConsumerBuilder := kafka_go.NewBuildConsumer(Conf.Addr, Conf.GroupId2, Conf.Topic2) demo2ConsumerBuilder.SetDebug(true) demo2ConsumerBuilder.SetMultiplePartition(true) //开启多协程消费 默认为false demo2ConsumerBuilder.SetIsAutoCommit(false)//开启多协程消费,此配置失效 demo2ConsumerBuilder.SetKafkaVersion("3.0.0") demo2ConsumerBuilder.SetResponseListener(func(context *kafka_go.ConsumerMessageContext) { nowTime := time.Now().In(cstZone).Format(UTFALL_SECOND) fmt.Printf("%s xx received2 [partition:%d, topic:%s, content:%s]\n", nowTime, context.GetPartition(), context.GetTopic(), context.GetMessageString()) context.GetSession().Ack() }) ``` ```go //一消费一协程 demoConsumerBuilder := kafka_go.NewBuildConsumer(Conf.Addr, Conf.GroupId3, Conf.Topic3) demoConsumerBuilder.SetDebug(true) demoConsumerBuilder.SetIsAutoCommit(false)// 开启手动提交 demoConsumerBuilder.SetKafkaVersion("3.0.0") demoConsumerBuilder.SetResponseListener(func(context *kafka_go.ConsumerMessageContext) { nowTime := time.Now().In(cstZone).Format(UTFALL_SECOND) fmt.Printf("%s xx received [partition:%d, topic:%s, content:%s]\n", nowTime, context.GetPartition(), context.GetTopic(), context.GetMessageString()) //fmt.Printf("%+v\n", context.GetMessageString()) context.GetSession().Ack() }) ``` | 接口名称| 描述 | |---|---| | SetKafkaVersion(kafkaVersion string) | 设置kakfa版本号 底层会自动处理新版本接口兼容 | | SetIsAutoCommit(autoCommit bool) | 是否自动提交 `默认为true`, 当为false是需执行手动提交
通过 回调上下文session处理 `ConsumerMessageContext.getSession().Ack()`进行提交 | | SetMultiplePartition(isMultiplePartition bool) BuildConsumerApi| 是否多分区指定消费(一个消费者跟根据分区数创建子协程),此模式存在重启后消息丢失 | | SetBalanceType(balanceType BalanceType) BuildConsumerApi|设置分区策略类型 CONSUMER
BALANCE_STRATEGY_ROUNDROBIN:轮询(默认)
CONSUMER_BALANCE_STRATEGY_STICKY:亲和性
CONSUMER_BALANCE_STRATEGY_RANGE: 随机| | SetResponseListener(responseResult ConsumerResponseListener) | 拉取消息事件监听| | GetAddr() string | 获取broker地址| | GetGroupId() string | 获取消费组 | | GetTopic() string | 获取主题 | | GetBalanceType() BalanceType | 获取分区策略方式 | | ToString() string | 返回构建数据字符串| * ### build 拉取消息事件监听回调 说明 `ConsumerResponseListener` > type ConsumerResponseListener func(context \*ConsumerMessageContext) ConsumerMessageContext : 事件消息上下文 | 接口名称| 说明 | |---|---| | GetBuilder() BuildConsumerApi | 返回当前执行回调中的build信息 | | GetGroupId() string | 返回当前组 | | GetTopic() string | 返回当前主题 | | GetPartition() int32 | 返回当前执行数据的分区 | | GetOffset() int64 | 返回当前数据的偏移数 | | GetMessage() []byte | 获取消息主体 字节数组 | | GetMessageString() string | 返回消息主体 字符串 | | GetTimeStamp() time.Time | 返回消息时间 | | GetVal() \*sarama.ConsumerMessage | 返回原生消息内容 | | GetSession() *ConsumerSession | 返回当前执行的session | * ### ConsumerSession 接口说明 | 接口名称 | 说明 | | --- |-------------| | Ack() | 手动确认消息 | | IsAutoAck() bool | 是否自动提交 | | GetSession() sarama.ConsumerGroupSession | 原生session获取 | | GetMessage() *sarama.ConsumerMessage | 原生消息获取 | --- ### 1. 生产者 > 通过build 可同时构建多个需要的业务生产者,指定不同的参数传入,默认生使用的是同步提交
> 底层维护一套连接池,根据设置最大连接数进行设置 * ### demo ```go var ( productFactory = kafka_go.NewFactoryProduct() ) demoBuild := kafka_go.NewBuildProduct("demo1", "192.168.186.130:9092,192.168.186.201:9092,192.168.186.202:9092").SetDebug(true).SetMaxConnection(4) err := productFactory.Register(demoBuild).Connect() if err != nil { log.Panicln(err) } num := 10 var wg = &sync.WaitGroup{} for i := 0; i < num; i++ { wg.Add(1) go func() { defer wg.Done() partition, offset, err := productFactory.Push("demo1", Conf.Topic3, "生产者消息") if err != nil { fmt.Println(err) } else { fmt.Printf("发送成功 partition:%d, offset:%d", partition, offset) } }() //time.Sleep(time.Second) } wg.Wait() ``` * ### build 接口说明 `BuildProduct` > 使用方式 | 接口| 说明 | |---|-------------------| | SetMaxConnection(maxConnection int32) BuildProductApi| 设置连接池数量 | | SetAckType(ackType ProductAckType) BuildProductApi | 设置消息确认方式 | | SetTransactional(isTransactional bool) BuildProductApi | 是否开启事务提交 | | GetName() string | 获取当前build name | | GetAddr() string | 获取当前服务地址 | | GetAddrSlice() []string | 获取当前服务地址切片 | | GetMaxConnection() int32 | 获取当前大连接数 | | GetConnStrategy() ProductBalanceType | 获取连接池加载类型, 默认为轮询 | * ### ProductAckType 消息确认类型说明 | 确认类型 | 说明 | | --- |----------------------------------------------------------------------------------------------------------------------------------------------------------| | PRODUCT_ACK_TYPE_NONAL | 不等待 broker 的 ack,这一操作提供了一个最低的延迟 | | PRODUCT_ACK_TYPE_FOLLOWER | 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack | | PRODUCT_ACK_TYPE_ALL | 等待 broker 的 ack,partition 的 leader 和 follower (ISRL里的follower,不是全部的follower)全部落盘成功后才 返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复, 开启事务后可避免数据重复|