我使用quic-go和protobuf开发应用。
使用单向流发送心跳数据,在需要时启动新的stream传输文件数据(一方分块发送数据,等待另外一方发送ack后发送下一块数据)。
然而在程序运行过程中,有一端会突然收不到数据(一直阻塞在Read方法中,不会报错,也感知不到另一端已经断开了连接和流);另一端则因长时间没有数据流量(空闲超时)而断开stream和connection。
我写了一个简单的验证程序(监听本地地址模拟发送数据和ack,Windows环境百分百复现):
message.proto
syntax = "proto3";
// protoc-gen-go takes the Go package name ("main") from this option; the
// go:generate line in main.go runs protoc with paths=source_relative.
option go_package = "./main";
// Message is one data chunk written by the sender; it is framed on the
// stream with a varint length prefix (protodelim).
message Message {
// Chunk payload; the Go sender fills it with 8 KiB of random bytes.
bytes data = 1;
}
// Ack is the receiver's reply to each Message. The sender waits for it
// before writing the next chunk.
message Ack {
// Set to true by the receiver for every chunk it decoded.
bool ok = 1;
// True once the receiver wants no more chunks; the sender then stops.
bool complete = 2;
// Length of the data field of the acknowledged Message.
int32 received = 3;
}
main.go
//go:generate protoc --go_out=. --go_opt=paths=source_relative message.proto
package main
import (
"context"
"crypto/rand"
"crypto/tls"
"fmt"
"github.com/quic-go/quic-go"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/encoding/protodelim"
"io"
"log/slog"
"net"
"syscall"
"time"
)
// Shared configuration for both ends of the loopback repro.
var (
	// cert is the server's key pair, loaded once at package initialization.
	// The load error is deliberately discarded to keep the demo short; if the
	// files are missing, cert stays empty and the failure presumably surfaces
	// later when the listener/TLS handshake is set up. TODO(review): check it.
	cert, _ = tls.LoadX509KeyPair("./certificate.pem", "./private.key")

	// quicConfig is shared by the listener and the dialer: a 100 MiB
	// connection receive window, a 20 MiB per-stream receive window and a
	// 30 s idle timeout. KeepAlivePeriod is not set, so a connection that
	// carries no traffic for 30 s is closed by the idle timeout.
	quicConfig = &quic.Config{
		MaxConnectionReceiveWindow: 100 << 20,
		MaxStreamReceiveWindow:     20 << 20,
		MaxIdleTimeout:             30 * time.Second,
	}
)
// transport wraps a bidirectional stream and counts the bytes moved in each
// direction. It is handed to protodelim.MarshalTo as an io.Writer and to
// protodelim.UnmarshalFrom, which additionally needs io.ByteReader to decode
// the varint length prefix one byte at a time.
type transport struct {
	stream io.ReadWriter
	r, w   int // total bytes read from / written to stream
}

// maxConsecutiveEmptyReads bounds how many (0, nil) results from the
// underlying stream ReadByte tolerates before failing with io.ErrNoProgress
// (the same guard bufio.Reader applies).
const maxConsecutiveEmptyReads = 100

// state reports the read and write byte counters for logging.
func (t *transport) state() string {
	return fmt.Sprintf("r: %d w:%d", t.r, t.w)
}

// Write forwards p to the stream and adds the bytes actually written to t.w.
func (t *transport) Write(p []byte) (int, error) {
	n, err := t.stream.Write(p)
	if n > 0 {
		t.w += n
	}
	return n, err
}

// Read forwards to the stream and adds the bytes actually read to t.r.
func (t *transport) Read(p []byte) (int, error) {
	n, err := t.stream.Read(p)
	if n > 0 {
		t.r += n
	}
	return n, err
}

// ReadByte implements io.ByteReader on top of Read.
//
// Read already updates t.r, so each byte is counted exactly once. A (0, nil)
// result from the stream is retried rather than returned as a spurious 0x00
// byte, which would be decoded as a zero length prefix and desynchronize the
// framing. Per the io.ByteReader contract an error is returned only when no
// byte was consumed; an error delivered together with a byte (e.g. io.EOF at
// end of stream) is dropped here, relying on the io.Reader rule that the next
// Read reports it again.
func (t *transport) ReadByte() (byte, error) {
	var b [1]byte
	for i := 0; i < maxConsecutiveEmptyReads; i++ {
		n, err := t.Read(b[:])
		if n > 0 {
			return b[0], nil
		}
		if err != nil {
			return 0, err
		}
	}
	return 0, io.ErrNoProgress
}
// main runs the sender and the receiver of the repro concurrently on the
// loopback interface and logs either success or the first error returned
// by one of them.
func main() {
	slog.SetLogLoggerLevel(slog.LevelDebug)

	group, gctx := errgroup.WithContext(context.Background())
	group.Go(func() error { return listenAndSend(gctx) })
	group.Go(func() error { return connectAndReceive(gctx, 10000) })

	err := group.Wait()
	if err == nil {
		slog.Info("completed")
		return
	}
	slog.Error("error", "err", err)
}
func setUDPBufferSize(conn *net.UDPConn, size int) error {
rawConn, err := conn.SyscallConn()
if err != nil {
return err
}
var sysErr error
err = rawConn.Control(func(fd uintptr) {
sysErr = syscall.SetsockoptInt(syscall.Handle(fd), syscall.SOL_SOCKET, syscall.SO_RCVBUF, size)
})
if err != nil {
return err
}
return sysErr
}
// listenAndSend is the sending side of the repro. It accepts one QUIC
// connection on 127.0.0.1:65533, opens one bidirectional stream and then
// repeatedly writes an 8 KiB random Message and waits for the matching Ack,
// returning nil once an Ack has Complete set.
//
// Every resource it creates (UDP socket, QUIC transport, listener, connection,
// stream) is released on return. Stream reads and writes do not observe ctx,
// so the connection is closed when ctx is cancelled (e.g. because the other
// goroutine in the errgroup failed); a Read blocked inside
// protodelim.UnmarshalFrom then returns an error instead of hanging forever.
func listenAndSend(ctx context.Context) error {
	laddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:65533")
	if err != nil {
		return err
	}
	udpConn, err := net.ListenUDP("udp", laddr)
	if err != nil {
		return err
	}
	// We created the socket, so we close it.
	defer udpConn.Close()

	qt := &quic.Transport{Conn: udpConn}
	defer qt.Close()

	// NOTE: InsecureSkipVerify only affects clients; it is inert on a server.
	l, err := qt.Listen(&tls.Config{Certificates: []tls.Certificate{cert}, InsecureSkipVerify: true, ClientAuth: tls.NoClientCert}, quicConfig)
	if err != nil {
		return err
	}
	defer l.Close()

	c, err := l.Accept(ctx)
	if err != nil {
		return err
	}
	defer c.CloseWithError(quic.ApplicationErrorCode(0), "")
	stop := context.AfterFunc(ctx, func() {
		_ = c.CloseWithError(quic.ApplicationErrorCode(0), "context cancelled")
	})
	defer stop()

	stream, err := c.OpenStreamSync(ctx)
	if err != nil {
		return err
	}
	slog.Info("accept stream", "stream", stream.StreamID())
	defer stream.Close()

	rw := &transport{stream: stream}
	defer func() { slog.Info("transport", "state", rw.state()) }()

	m := &Message{Data: make([]byte, 8*1024)}
	ack := &Ack{}
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		// The payload content is irrelevant to the repro, so a short or
		// failed random fill is acceptable.
		_, _ = rand.Read(m.Data)
		if _, err := protodelim.MarshalTo(rw, m); err != nil {
			slog.Error("write message error", "err", err)
			return err
		}
		slog.Debug("write message")
		if err := protodelim.UnmarshalFrom(rw, ack); err != nil {
			slog.Error("read ack error", "err", err)
			return err
		}
		slog.Debug("receive ack", "complete", ack.Complete, "received", ack.Received, "ok", ack.Ok)
		if ack.Complete {
			return nil
		}
		// Small pause between chunks; the ack round-trip already paces the
		// sender, so this is not needed for flow control.
		time.Sleep(time.Microsecond)
	}
}
// connectAndReceive is the receiving side of the repro. It dials the sender at
// 127.0.0.1:65533, accepts the stream the sender opens and answers every
// Message with an Ack carrying the payload length. When more than m messages
// have been received it sends an Ack with Complete set and returns.
//
// NOTE(review): Complete is set when i > m, i.e. after m+1 messages; confirm
// whether m was meant to be the exact message count.
//
// The stream is aborted (CancelRead/CancelWrite) only on failure. On success
// it is closed gracefully, because CancelWrite may discard the final ack if it
// has not been delivered yet (and quic-go forbids Close after CancelWrite).
// Stream I/O does not observe ctx, so the connection is closed when ctx is
// cancelled, which unblocks any pending Read.
func connectAndReceive(ctx context.Context, m int) (err error) {
	laddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	udpConn, err := net.ListenUDP("udp", laddr)
	if err != nil {
		return err
	}
	// We created the socket, so we close it.
	defer udpConn.Close()

	raddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:65533")
	if err != nil {
		return err
	}
	conn, err := quic.Dial(ctx, udpConn, raddr, &tls.Config{InsecureSkipVerify: true}, quicConfig)
	if err != nil {
		return fmt.Errorf("connect failed %w", err)
	}
	defer conn.CloseWithError(quic.ApplicationErrorCode(0), "")
	stop := context.AfterFunc(ctx, func() {
		_ = conn.CloseWithError(quic.ApplicationErrorCode(0), "context cancelled")
	})
	defer stop()

	stream, err := conn.AcceptStream(ctx)
	if err != nil {
		return err
	}
	defer func() {
		if err != nil {
			stream.CancelRead(quic.StreamErrorCode(0))
			stream.CancelWrite(quic.StreamErrorCode(0))
		}
	}()

	rw := &transport{stream: stream}
	defer func() { slog.Info("transport", "state", rw.state()) }()

	var (
		msg Message
		ack Ack
	)
	i := 0
	for {
		if err = ctx.Err(); err != nil {
			return err
		}
		if err = protodelim.UnmarshalFrom(rw, &msg); err != nil {
			slog.Error("read message error", "err", err)
			return err
		}
		i++
		slog.Info("receive message", "i", i, "size", len(msg.Data))
		ack.Ok, ack.Received, ack.Complete = true, int32(len(msg.Data)), i > m
		if _, err = protodelim.MarshalTo(rw, &ack); err != nil {
			slog.Error("write ack error", "err", err)
			return err
		}
		slog.Debug("write ack", "i", i)
		if ack.Complete {
			// Close sends FIN after the buffered final ack. Closing the
			// connection right away may drop that ack, so wait until the
			// sender (which closes the connection after reading a Complete
			// ack) tears it down, or until ctx is cancelled.
			if err = stream.Close(); err != nil {
				return err
			}
			select {
			case <-conn.Context().Done():
			case <-ctx.Done():
			}
			return nil
		}
	}
}
- 尝试调整了MaxConnectionReceiveWindow和MaxStreamReceiveWindow没有效果
- 调整UDP缓冲区大小也无效
- 同样的程序在Linux环境下编译运行一切正常(无法复现),但在我的两台Windows(Windows 10)机器上百分百复现