Go 怎么基于 chan 实现低延迟队列?

下面是我实现的队列代码。

如果段时间添加大量任务,可能会导致 chan 缓冲用完导致添加任务失败(或者不使用select导致添加任务时阻塞),比如设置缓冲大小为 1 模拟缓冲耗尽。

问题是,如何让队列的大小不受 chan 缓冲区限制,而同时又能享受到 chan 队列的低延迟?

https://goplay.tools/snippet/DNsfIru_-5h

package queue

import (
    "fmt"
    "time"
)

var QueueChan = make(chan string, 1)

func worker(id string) {
    fmt.Printf("%q start\n", id)
    time.Sleep(time.Second * 2)
    fmt.Printf("%q done\n", id)
}

func consume() {
    for id := range QueueChan {
        worker(id)
    }
}

func Run() {
    go consume()

    AddTask("1")
    AddTask("2")
    time.Sleep(time.Second * 3)
    AddTask("3")
    AddTask("4")
    AddTask("5")
    AddTask("6")
}

func AddTask(id string) {
    select {
    case QueueChan <- id:
        fmt.Printf("add %q to queue\n", id)
    default:
        fmt.Printf("queue full\n")
    }
}
阅读 411
avatarAI BotBETA
在 Go 中,如果你希望实现一个不受 channel 缓冲区大小限制的队列,同时保持低延迟,你可以考虑使用无缓冲的 channel 并结合一个额外的数据结构(如切片)来管理等待执行的任务。这种方法允许你动态地处理任务队列,而不会因缓冲区大小而阻塞生产者。

以下是一个可能的解决方案,它使用了一个无缓冲的 channel 和一个切片来存储等待的任务。当切片为空时,消费者会阻塞在 channel 上等待新的任务;当切片中有任务时,消费者会从切片中取出任务执行,并在切片为空时再次等待。生产者则始终尝试将任务添加到切片中,如果切片已满(达到了某个预设的最大值),则可以选择丢弃任务、阻塞生产者或者扩展切片容量。

这里是一个简化的示例代码:

package main

import (

"fmt"
"time"

)

const maxQueueSize = 10 // 预设的最大队列大小

type TaskQueue struct {

tasks []string
ch    chan string

}

func NewTaskQueue(size int) *TaskQueue {

return &TaskQueue{
    tasks: make([]string, 0, size),
    ch:    make(chan string),
}

}

func (q *TaskQueue) StartConsuming(workerID string) {

for task := range q.ch {
    go func(t string) {
        fmt.Printf("%q worker starting task %q\n", workerID, t)
        time.Sleep(2 * time.Second) // 模拟任务执行时间
        fmt.Printf("%q worker done with task %q\n", workerID, t)
    }(task)
    // 从切片中移除已分配的任务(如果需要在任务完成后确认,可以添加额外逻辑)
    if len(q.tasks) > 0 {
        q.tasks = q.tasks[1:] // 假设总是处理切片中的第一个任务
    } else {
        // 如果切片为空,但可能还有其他 goroutine 正在尝试添加任务到切片,
        // 所以这里不立即关闭 channel,而是让 consumer 继续等待新的任务。
        // 注意:在实际情况中,需要更复杂的逻辑来处理队列的关闭和资源的清理。
    }

    // 尝试从切片中取出下一个任务并发送到 channel(如果有的话)
    q.sendNextTask()
}

}

func (q *TaskQueue) AddTask(task string) {

q.tasks = append(q.tasks, task)
q.sendNextTask()

}

func (q *TaskQueue) sendNextTask() {

select {
case <-time.After(0): // 使用非阻塞的 select 来检查是否需要发送任务
    if len(q.tasks) > 0 {
        nextTask := q.tasks[0]
        q.ch <- nextTask
        // 在这里不移除任务,因为上面的消费逻辑中已经移除了。
        // 如果选择在这里移除,则需要确保消费逻辑和这里的逻辑一致。
    }
default:
    // 如果 channel 满了(在这个例子中,无缓冲 channel 总是“满”的,除非有消费者等待),
    // 则不发送任务。这里的 default 分支实际上不会执行任何操作,
    // 因为我们总是尝试发送任务,但如果 channel 没有消费者,则发送会阻塞。
    // 由于我们使用了切片来缓存任务,所以这里不需要额外的处理。
}

}

func main() {

queue := NewTaskQueue(maxQueueSize)
go queue.StartConsuming("worker1")

for i := 1; i <= 15; i++ {
    queue.AddTask(fmt.Sprintf("%d", i))
    fmt.Printf("Added task %d to queue\n", i)
}

// 为了观察效果,让主 goroutine 等待一段时间
time.Sleep(10 * time.Second)

}


注意:
1. 上述代码中的 `sendNextTask` 方法尝试发送切片中的第一个任务到 channel。由于使用了无缓冲的 channel,如果消费者没有准备好接收任务,发送操作将会阻塞。但是,由于我们总是先检查切片中是否有任务,所以只有在有任务要发送时才会尝试发送,从而避免了不必要的阻塞。
2. 消费者从 channel 中接收任务并执行,然后从切片中移除已处理的任务(或标记为已处理)。这里为了简化,假设总是处理切片中的第一个任务。在实际应用中,可能需要更复杂的逻辑来管理任务的状态和优先级。
3. 上述代码中的 `main` 函数添加了多个任务到队列中,并等待一段时间以观察任务的执行情况。在实际应用中,你可能需要更复杂的逻辑来管理程序的退出和资源的清理。

这种方法结合了 channel 的低延迟特性和切片的动态容量管理,可以灵活地处理大量任务而不会因缓冲区大小而受限。
1 个回答

使用 chan 和 slice 来实现一个无界缓冲队列:

package queue

import (
    "fmt"
    "sync"
    "time"
)

type UnboundedQueue struct {
    inChan  chan string
    outChan chan string
    buffer  []string
    mu      sync.Mutex
}

func NewUnboundedQueue() *UnboundedQueue {
    q := &UnboundedQueue{
        inChan:  make(chan string),
        outChan: make(chan string),
        buffer:  make([]string, 0),
    }
    go q.run()
    return q
}

func (q *UnboundedQueue) run() {
    for {
        var out chan string
        var next string
        if len(q.buffer) > 0 {
            out = q.outChan
            next = q.buffer[0]
        }
        select {
        case val := <-q.inChan:
            q.mu.Lock()
            q.buffer = append(q.buffer, val)
            q.mu.Unlock()
        case out <- next:
            q.mu.Lock()
            q.buffer = q.buffer[1:]
            q.mu.Unlock()
        }
    }
}

func (q *UnboundedQueue) In(val string) {
    q.inChan <- val
}

func (q *UnboundedQueue) Out() string {
    return <-q.outChan
}

func worker(id string) {
    fmt.Printf("%q start\n", id)
    time.Sleep(time.Second * 2)
    fmt.Printf("%q done\n", id)
}

func consume(q *UnboundedQueue) {
    for {
        id := q.Out()
        worker(id)
    }
}

func Run() {
    q := NewUnboundedQueue()
    go consume(q)

    q.In("1")
    q.In("2")
    time.Sleep(time.Second * 3)
    q.In("3")
    q.In("4")
    q.In("5")
    q.In("6")
}

1.UnboundedQueue 结构体包含 inChan 和 outChan 通道,以及一个用于存储数据的 buffer 切片。
2.run 方法在一个单独的 goroutine 中运行,使用 select 语句处理 inChan 和 outChan 的数据传输。
3.In 方法将数据写入 inChan,Out 方法从 outChan 读取数据。
4.consume 函数从队列中读取数据并处理。

推荐第三方库

如果你的主要需求是高效、低延迟的队列处理,ZenQ 是一个很好的选择,因为它在性能和易用性方面表现出色。如果你需要更高的吞吐量和更复杂的生产者-消费者模式,4FQ 可能更适合你。如果你需要分布式消息传递,Emitter 是一个不错的选择。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
宣传栏