rust使用tokio读写数据卡死(新手)怎么处理?

各位老师,我是一个rust新手尝试使用tokio做一些实验。目前碰到一个问题,我模拟了一个客户端client.rs,想要他和服务端server.rs进行通讯。客户端发送一些信息给到服务端,进行处理后再把结果发回来。
目前的代码会出现客户端卡死,用try_read看了一下原因是WouldBlock错误,单发送和单接收都正常。一旦像代码中出现发送后再接收就会出问题。能否有大佬解释一下原因,谢谢。

client.rs

use clap::{Parser, ValueEnum};
use serde::{Deserialize, Serialize};
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::{TcpSocket, TcpStream},
};

type BoxedError = Box<dyn std::error::Error>;

#[derive(Serialize, Deserialize)]
struct RequestFrame {
    command: String,
    key: String,
    value: Option<String>,
}

impl RequestFrame {
    fn new(command: String, key: String, value: Option<String>) -> RequestFrame {
        RequestFrame {
            command: command,
            key: key,
            value: value,
        }
    }
}

#[derive(Parser, Debug)]
#[command(
    version = "0.0.1",
    author = "haozhang",
    long_about = "this is a self-made redis client"
)]
struct Args {
    #[arg(short, long, value_name = "command")]
    command: Command,

    #[arg(short, long, value_name = "key")]
    key: String,

    #[arg(short, long, value_name = "value")]
    value: Option<String>,
}

#[derive(Debug, Clone, ValueEnum)]
enum Command {
    Get,
    Set,
}

async fn get_stream() -> Result<TcpStream, BoxedError> {
    let addr = "127.0.0.0:6379".parse().unwrap();

    let socket = TcpSocket::new_v4().unwrap();
    let stream = socket.connect(addr).await.unwrap();

    Ok(stream)
}

#[tokio::main]
async fn main() -> Result<(), BoxedError> {
    // let args = std::env::args().collect::<Vec<String>>();

    let mut stream = get_stream().await?;

    let (mut reader, mut writer) = stream.split();

    let args = Args::parse();

    let command = match args.command {
        Command::Get => "get".to_string(),
        Command::Set => "set".to_string(),
    };
    let key = args.key;
    let value = args.value;

    // println!("{:?}", command);
    // println!("{:?}", key);
    // println!("{:?}", value);

    let request = RequestFrame::new(command, key, value);

    let request_json_string = serde_json::to_string(&request)?;

    writer.write_all(request_json_string.as_bytes()).await?;
    writer.flush().await?;

    let mut buf = String::new();
    reader.read_to_string(&mut buf).await?;
    // let mut buf = [0; 100];
    // reader.try_read(&mut buf)?;
    println!("{:?}", buf);

    println!("done");

    Ok(())
}

server.rs

use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::TcpListener,
};

type BoxedError = Box<dyn std::error::Error>;

#[tokio::main]
async fn main() -> Result<(), BoxedError> {
    let listener = TcpListener::bind("0.0.0.0:6379").await?;

    println!("listening on {}", listener.local_addr()?.port());

    loop {
        let (socket, _) = listener.accept().await?;

        tokio::spawn(async {
            match process(socket).await {
                Ok(()) => (),
                Err(e) => eprintln!("process occurs an error: {e}")
            };
        });
    }
}

async fn process(mut socket: tokio::net::TcpStream) -> Result<(), BoxedError> {
    let (mut reader, mut writer) = socket.split();

    let mut buf = String::new();

    reader.read_to_string(&mut buf).await?;

    writer.write_all("we got the command, thanks".as_bytes()).await?;

    writer.flush().await?;

    println!("content: {}", buf);

    Ok(())
}
阅读 891
avatarAI BotBETA

WouldBlock 错误在 Tokio 中通常意味着当前的异步操作还不能立即完成,需要等待一段时间或者其它异步操作触发之后才能完成。在你提供的代码中,这个问题最有可能出现在客户端的读取操作中。

在你的 client.rs 文件中,你在发送请求后立即尝试读取响应:

writer.write_all(request_json_string.as_bytes()).await?;
writer.flush().await?;

let mut buf = String::new();
reader.read_to_string(&mut buf).await?; // 这里可能会返回 WouldBlock 错误

read_to_string 方法会一直尝试读取数据,直到遇到 EOF(文件结束符)或者发生错误。如果服务端还没有准备好发送数据,那么这个方法就会返回 WouldBlock 错误,因为当前没有数据可读。

解决这个问题的一个方法是使用 tokio::io::AsyncReadExt::read_until 或者其它可以处理 WouldBlock 错误的读取方法。但是,对于你的场景来说,你可能需要实现一个协议来确保客户端在读取之前知道服务端已经发送了数据。这通常涉及到发送和接收特定的消息头或协议控制消息,来通知对方数据已经准备好。

另一个方法是使用流(Stream)和异步迭代器(async iterator)来处理消息。Tokio 提供了一些工具,如 tokio::io::split 配合 tokio_util::codec,可以帮助你更方便地处理这种情况。

这里有一个简化的例子,展示了如何使用 tokio_util::codec 中的 FramedLinesCodec 来简化消息的读写过程:

use tokio_util::codec::{Framed, LinesCodec};
use tokio::net::TcpStream;

// 假设消息是以换行符分隔的文本行
async fn send_and_receive(stream: TcpStream) -> Result<String, Box<dyn std::error::Error>> {
    let (mut reader, mut writer) = Framed::new(stream, LinesCodec::new()).split();

    // 发送消息
    writer.send("your_message_here").await?;

    // 读取响应
    let response = reader.next().await.transpose()?;
    response.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "unexpected EOF"))?
}

// 在你的 main 函数中使用 send_and_receive

请注意,这只是一个简化的例子,实际的应用中你可能需要定义自己的编解码器(Codec)来处理复杂的协议。

最后,请确保你的服务端代码在发送响应后,不要立即关闭连接,这样客户端才能读取到响应。在你的 server.rs 文件中,process 函数在发送完响应后就结束了,这可能会导致连接被关闭。你可能需要修改服务端代码以确保连接在发送完响应后仍然保持打开状态,直到客户端完成读取。

希望这些建议能帮助你解决问题!如果还有其他问题,请继续提问。

1 个回答

用错方法了,server中的read_to_string是通过EOF作为结束标志的,这个一般是操作文件的。
这里需要通过read方法手动拼接结果

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进