mq发送完指令后,需等待返回结果,需要怎么做?

apue
  • 43

比如:前端发送一条“插入数据库操作”指令到mq队列中,消费者执行了这条指令之后,需返回给前端指令操作成功或者失败。前端一直要等待这个返回结果,并且消息那么多,我怎么保证消费的消息能够准确的返回给这个请求。
用go写的,能满足,但是貌似耦合有点严重,要维护Conn,并且消费和生产只能在同一进程下,没法拆开。
嗯,还是根据#1楼上天说的,我理解是产生messageid+消息体,消费者消费之后把messageid和结果存储都redis中,前端请求我可以一直阻塞或者设置超时,等到获取到redis的结果之后再返回,不知道对不对这样?

package main

import (
    "fmt"
    "os"
    "strconv"
    "time"

    "github.com/Shopify/sarama"
    "github.com/gin-gonic/gin"
)

var i = 0

type MyContext struct {
    *gin.Context
    res chan bool
}

var Conn = make(map[string]*MyContext)
var token = "xxxx"

type HandlerFunc func(*MyContext)

func handler(handler HandlerFunc) gin.HandlerFunc {
    return func(c *gin.Context) {
        context := new(MyContext)
        context.Context = c
        context.res = make(chan bool)
        handler(context)
        <-context.res
    }
}

func main() {
    go Consumer()
    r := gin.Default()
    r.Handle("GET", "/", handler(SendMSG))
    r.Run()
}

func SendMSG(c *MyContext) {
    Conn[token] = c
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true

    p, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        fmt.Printf("sarama.NewSyncProducer err, message=%s \n", err)
        return
    }
    defer p.Close()

    topic := "chu"

    // srcValue := "sync: this is a message"
    i = i + 1
    srcValue := strconv.Itoa(i)

    msg := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.ByteEncoder(srcValue),
    }
    
    partition, offset, err := p.SendMessage(msg)
    if err != nil {
        fmt.Printf("send message(%s) err=%s \n", srcValue, err)
    } else {
        fmt.Fprintf(os.Stdout, srcValue+"发送成功,partition=%d, offset=%d \n", partition, offset)
    }

}

func Consumer() {

    consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
    if err != nil {
        fmt.Printf("fail to start consumer, err:%v\n", err)
        return
    }
    partitionList, err := consumer.Partitions("chu") 
    if err != nil {
        fmt.Printf("fail to get list of partition:err%v\n", err)
        return
    }
    fmt.Println(partitionList)

    pc, err := consumer.ConsumePartition("chu", 0, sarama.OffsetNewest)
    if err != nil {
        fmt.Printf("failed to start consumer for partition %d,err:%v\n", 0, err)
        return
    }
    defer pc.AsyncClose()
    for msg := range pc.Messages() {
        fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
        Conn[token].Context.String(200, string(msg.Value))
        time.Sleep(5 * time.Second)
        Conn[token].res <- true
        // wg.Done()
    }

}
回复
阅读 657
2 个回答

你都是异步了,怎么能等待返回结果.一般方式是在发送的时生成一个唯一键返回给前端,并且与消息一起发给消费者.消费者消费后并把唯一id键记录下来,前端就采用轮询的方式不断查询是否存在唯一键.这样就能实现类似的通知方式了,如果技术足够轮询方式也能通过websocket来实现

梦孝和
  • 3
新手上路,请多包涵

建议轮询,如果一定要同步的话Java可以使用DeferredResult异步返回

宣传栏