First, I used the "github.com/confluentinc/confluent-kafka-go/kafka" package to connect to Kafka, and then implemented a connection pool on top of it. The code is as follows:
package kfk

import (
	"strings"
	"sync"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// Pool is a simple producer pool backed by a buffered channel.
type Pool struct {
	brokers []string
	config  *kafka.ConfigMap
	pool    chan *kafka.Producer
	wg      sync.WaitGroup // reserved; nothing Add()s to it yet
}

var (
	KafkaPool *Pool
)

// KfkInit creates the global pool with default settings.
func KfkInit() {
	poolSize := 10
	brokers := []string{"localhost:9092"}
	KafkaPool = NewPool(brokers, poolSize)
}

// NewPool initializes the pool and pre-creates poolSize producers.
func NewPool(brokers []string, poolSize int) *Pool {
	config := &kafka.ConfigMap{
		"bootstrap.servers":   strings.Join(brokers, ","),
		"acks":                "all",
		"delivery.timeout.ms": 3000,
	}
	p := &Pool{
		brokers: brokers,
		config:  config,
		pool:    make(chan *kafka.Producer, poolSize),
	}
	for i := 0; i < poolSize; i++ {
		producer, err := p.createProducer()
		if err != nil {
			p.Close()
			panic(err)
		}
		p.pool <- producer
	}
	return p
}

// createProducer creates a single Producer instance from the shared config.
func (p *Pool) createProducer() (*kafka.Producer, error) {
	producer, err := kafka.NewProducer(p.config)
	if err != nil {
		return nil, err
	}
	return producer, nil
}

// getProducer takes a Producer from the pool; it blocks when the pool is empty.
func (p *Pool) getProducer() *kafka.Producer {
	return <-p.pool
}

// releaseProducer returns a Producer to the pool.
func (p *Pool) releaseProducer(producer *kafka.Producer) {
	p.pool <- producer
}

// SendMessage sends a message to the given topic, keyed by the table name,
// and waits for the delivery report.
func (p *Pool) SendMessage(topic string, tableName string, message []byte) error {
	producer := p.getProducer()
	defer p.releaseProducer(producer)
	deliveryChan := make(chan kafka.Event)
	err := producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &topic,
			Partition: kafka.PartitionAny,
		},
		Key:   []byte(tableName),
		Value: message,
	}, deliveryChan)
	if err != nil {
		return err
	}
	// Block until the delivery report for this message arrives.
	e := <-deliveryChan
	m := e.(*kafka.Message)
	if m.TopicPartition.Error != nil {
		return m.TopicPartition.Error
	}
	return nil
}

// Close shuts the pool down: the channel is closed, and every Producer still
// in it is drained and closed.
func (p *Pool) Close() {
	close(p.pool)
	for producer := range p.pool {
		producer.Close()
	}
	p.wg.Wait()
}
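Roughly how it gets called from the producer side (a minimal sketch; the module path dot/middleware/kfk is taken from the build error output further down, and the topic/payload here are just placeholders):

package main

import (
	"log"

	"dot/middleware/kfk" // module path assumed from the error output below
)

func main() {
	// Create the global pool (10 producers against localhost:9092).
	kfk.KfkInit()
	defer kfk.KafkaPool.Close()

	// Send one message keyed by table name; topic and payload are placeholders.
	if err := kfk.KafkaPool.SendMessage("demo_topic", "demo_table", []byte(`{"id": 1}`)); err != nil {
		log.Fatalf("send failed: %v", err)
	}
}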
After finishing development, I wanted to cross-compile it into a Linux executable. The packaging script is below (it worked fine back when we were using MQ; after switching to Kafka it started failing):
#!/bin/bash
swag init >/dev/null 2>&1
# cross-compile for the selected target platform
echo "$(date +"%Y-%m-%d %H:%M:%S") Success: Swag generated successfully, packaging..."
# save the original values of GOOS and GOARCH
GOOS_ORIGINAL=$(go env GOOS)
GOARCH_ORIGINAL=$(go env GOARCH)
# specify the target platform and architecture
platform=${1:-"linux"}
arch=${2:-"amd64"}
# set the environment variables for the target platform and architecture
if [ "$platform" == "linux" ] && [ "$arch" == "amd64" ]; then
    export GOOS=linux
    export GOARCH=amd64
elif [ "$platform" == "windows" ] && [ "$arch" == "amd64" ]; then
    export GOOS=windows
    export GOARCH=amd64
else
    echo "Error: Unsupported platform and architecture combination"
    exit 1
fi
# build the producer binary for the target platform and architecture
go build -o dot_production main_production.go
# check if the binary was successfully built
if [ $? -eq 0 ]; then
    echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} producer build finished!"
else
    echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} producer build failed!"
    exit 1
fi
# build the consumer binary
go build -o dot_consumer main_consumer.go
# check if the binary was successfully built
if [ $? -eq 0 ]; then
    echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} consumer build finished!"
else
    echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} consumer build failed!"
    exit 1
fi
# reset the GOOS and GOARCH environment variables to their original values
export GOOS=$GOOS_ORIGINAL
export GOARCH=$GOARCH_ORIGINAL
The error output is as follows:
zxx@Macbook dot % ./pack.sh
2023-06-03 00:31:29 Success: Swag generated successfully, packaging...
# dot/middleware/kfk
middleware/kfk/kfk_consumer.go:18:22: undefined: kafka.ConfigMap
middleware/kfk/kfk_consumer.go:19:22: undefined: kafka.Consumer
middleware/kfk/kfk_consumer.go:26:19: undefined: kafka.ConfigMap
middleware/kfk/kfk_consumer.go:39:25: undefined: kafka.NewConsumer
middleware/kfk/kfk_consumer.go:56:2: maxMessages declared and not used
middleware/kfk/kfk_production.go:11:17: undefined: kafka.ConfigMap
middleware/kfk/kfk_production.go:12:22: undefined: kafka.Producer
middleware/kfk/kfk_production.go:53:41: undefined: kafka.Producer
middleware/kfk/kfk_production.go:63:37: undefined: kafka.Producer
middleware/kfk/kfk_production.go:68:48: undefined: kafka.Producer
middleware/kfk/kfk_consumer.go:56:2: too many errors
2023-06-03 00:31:29 Success: linux producer build failed
I really don't understand what's causing this. Does switching to Kafka mean I can no longer cross-compile? Any pointers would be much appreciated, thanks in advance!
First of all, I'm not entirely sure what the cause is, since I've honestly never used this library; I can only point out a few possible problems:
Change the import to the v2 module, since the older version may have issues.
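For example, with v2 the import would look like this (a sketch; you'd also need a go get github.com/confluentinc/confluent-kafka-go/v2 to update go.mod, and as far as I know the API itself is unchanged):

import (
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)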
When building for Linux, try adding build tags:
If you are building for Alpine Linux (musl), -tags musl must be specified.
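Something like this, if your target is a musl-based system such as Alpine (just adding the tag to your existing build line):

go build -tags musl -o dot_production main_production.go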
Try building the way described in the issue below (I think this is the most likely cause):
https://github.com/confluentinc/confluent-kafka-go/issues/947
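My understanding is that confluent-kafka-go wraps librdkafka via cgo, and once you set GOOS/GOARCH to a different target, Go disables cgo by default (CGO_ENABLED=0); the cgo-backed sources are then excluded by build constraints, which is exactly why kafka.ConfigMap, kafka.Producer and friends come out as undefined. If that's the case, you have to cross-compile with cgo enabled and a C cross-compiler for Linux. A rough sketch of what that could look like on macOS (assuming a musl cross toolchain, e.g. from brew install filosottile/musl-cross/musl-cross; the compiler name is an assumption, adjust it to your setup):

# cgo must be enabled and pointed at a Linux cross-compiler (toolchain name assumed)
CGO_ENABLED=1 GOOS=linux GOARCH=amd64 CC=x86_64-linux-musl-gcc \
    go build -tags musl -o dot_production main_production.go

Alternatively, building inside a Linux Docker container (or on a Linux machine) avoids the cross-compilation problem altogether.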