比如:前端发送一条“插入数据库操作”指令到mq队列中,消费者执行了这条指令之后,需返回给前端指令操作成功或者失败。前端一直要等待这个返回结果,并且消息那么多,我怎么保证消费的消息能够准确的返回给这个请求。
用go写的,能满足,但是貌似耦合有点严重,要维护Conn,并且消费和生产只能在同一进程下,没法拆开。
嗯,还是根据#1楼上天说的,我理解是产生messageid+消息体,消费者消费之后把messageid和结果存储都redis中,前端请求我可以一直阻塞或者设置超时,等到获取到redis的结果之后再返回,不知道对不对这样?
package main
import (
"fmt"
"os"
"strconv"
"time"
"github.com/Shopify/sarama"
"github.com/gin-gonic/gin"
)
var i = 0
type MyContext struct {
*gin.Context
res chan bool
}
var Conn = make(map[string]*MyContext)
var token = "xxxx"
type HandlerFunc func(*MyContext)
func handler(handler HandlerFunc) gin.HandlerFunc {
return func(c *gin.Context) {
context := new(MyContext)
context.Context = c
context.res = make(chan bool)
handler(context)
<-context.res
}
}
func main() {
go Consumer()
r := gin.Default()
r.Handle("GET", "/", handler(SendMSG))
r.Run()
}
func SendMSG(c *MyContext) {
Conn[token] = c
config := sarama.NewConfig()
config.Producer.Return.Successes = true
p, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Printf("sarama.NewSyncProducer err, message=%s \n", err)
return
}
defer p.Close()
topic := "chu"
// srcValue := "sync: this is a message"
i = i + 1
srcValue := strconv.Itoa(i)
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(srcValue),
}
partition, offset, err := p.SendMessage(msg)
if err != nil {
fmt.Printf("send message(%s) err=%s \n", srcValue, err)
} else {
fmt.Fprintf(os.Stdout, srcValue+"发送成功,partition=%d, offset=%d \n", partition, offset)
}
}
func Consumer() {
consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
if err != nil {
fmt.Printf("fail to start consumer, err:%v\n", err)
return
}
partitionList, err := consumer.Partitions("chu")
if err != nil {
fmt.Printf("fail to get list of partition:err%v\n", err)
return
}
fmt.Println(partitionList)
pc, err := consumer.ConsumePartition("chu", 0, sarama.OffsetNewest)
if err != nil {
fmt.Printf("failed to start consumer for partition %d,err:%v\n", 0, err)
return
}
defer pc.AsyncClose()
for msg := range pc.Messages() {
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
Conn[token].Context.String(200, string(msg.Value))
time.Sleep(5 * time.Second)
Conn[token].res <- true
// wg.Done()
}
}
你都是异步了,怎么能等待返回结果.一般方式是在发送的时生成一个唯一键返回给前端,并且与消息一起发给消费者.消费者消费后并把唯一id键记录下来,前端就采用轮询的方式不断查询是否存在唯一键.这样就能实现类似的通知方式了,如果技术足够轮询方式也能通过websocket来实现