新手写了一个晚上没写出来, 求帮忙写个功能?

新手上路,请多包涵

新手写了一个晚上没写出来, 各种报错, baidu google 翻遍了, 求帮忙写个参考一下。
有一个向量 let data: Vec<i32> = (0..50).collect();
需要开三个thread来修改data
每个thread获取data的4个元素, 原地修改元素
// 0, 1, 2, 3 -> 1, 2, 3, 4
1号thread +1

// 4, 5, 6, 7 -> 6, 7, 8, 9
2号thread +2

// 8, 9, 10, 11 -> 11, 12, 13, 14
3号thread +3

阅读 3.9k
5 个回答

Implement step by step

先使用Vec<i32>来存放数据,然后创建三个线程分别对数据进行修改,代码大概是下面这样:

use std::thread;
fn main() {
    let mut data = (0..50).collect::<Vec<i32>>();
    let mut handles = vec![];
    
    for _ in [0,4,8] {
        let handle = thread::spawn(|| {
            //没有实现完整逻辑
            data[0] = 1;
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }

    println!("{:?}", data);
}


运行一下有下面这些错误:

error[E0499]: cannot borrow `data` as mutable more than once at a time
  --> src/main.rs:14:36
   |
14 |           let handle = thread::spawn(|| {
   |                        -             ^^ `data` was mutably borrowed here in the previous iteration of the loop
   |  ______________________|
   | |
15 | |             //没有实现完整逻辑
16 | |             data[0] = 1;
   | |             ---- borrows occur due to use of `data` in closure
17 | |         });
   | |__________- argument requires that `data` is borrowed for `'static`

上面因为在 for _ in [0,4,8]..... 这个循环中创建的线程中的闭包对data的可变引用,3次可变引用是不允许的

error[E0373]: closure may outlive the current function, but it borrows `data`, which is owned by the current function
  --> src/main.rs:14:36
   |
14 |         let handle = thread::spawn(|| {
   |                                    ^^ may outlive borrowed value `data`
15 |             //没有实现完整逻辑
16 |             data[0] = 1;
   |             ---- `data` is borrowed here
   |
note: function requires argument type to outlive `'static`
  --> src/main.rs:14:22
   |
14 |           let handle = thread::spawn(|| {
   |  ______________________^
15 | |             //没有实现完整逻辑
16 | |             data[0] = 1;
17 | |         });
   | |__________^
help: to force the closure to take ownership of `data` (and any other referenced variables), use the `move` keyword
   |
14 |         let handle = thread::spawn(move || {
   |                                    ++++

上面编译器检测到线程中的闭包对可能在main函数外存活,而对data的可变引用此时还存在, 如果此时data不存在就会出问题(按理说我们的线程join之后不该有这个错误提示的😂)。然后提出的提示是使用move把data的所有权转给闭包

error[E0502]: cannot borrow `data` as immutable because it is also borrowed as mutable
  --> src/main.rs:25:22
   |
14 |           let handle = thread::spawn(|| {
   |                        -             -- mutable borrow occurs here
   |  ______________________|
   | |
15 | |             //没有实现完整逻辑
16 | |             data[0] = 1;
   | |             ---- first borrow occurs due to use of `data` in closure
17 | |         });
   | |__________- argument requires that `data` is borrowed for `'static`
...
25 |       println!("{:?}", data);
   |                        ^^^^ immutable borrow occurs here
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

上面线程中的闭包是对data的可变引用而println是对data的不变引用


上面的错误提示中使用move把data的所有权转给一个闭包使用是不行的,我们要把data给3个线程的闭包使用而不是一个;所以要想办法把data分给3个线程使用,这里想到的是Rc<T> 引用计数(reference counting),当Rc类型变量的引用计数不为0时这个变量不会被释放,使用Rc::clone使某个变量的引用计数加一;下面代码先使data的引用计数加一,move语意的闭包会使线程执行完毕后data的引用计数减一

 fn main() {
    //下面两种效果一样
    let r_c = Rc::new(0);
     //使用方法
    let _new_rc =  r_c.clone(); 
    //使用完全限定 
    let _new_rc_1 = Rc::clone(&r_c);
 }
use std::thread;
use std::rc::Rc;

fn main() {
    let  r_c = Rc::new((0..50).collect::<Vec<i32>>());
    let mut handles = vec![];
    
    for _ in [0,4,8] {
        let data = Rc::clone(&r_c);
        let handle = thread::spawn(move || {
            //没有实现完整逻辑
            data[0] = 1;
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }

    println!("{:?}", r_c);
}

运行一下得到下面错误:


error[E0277]: `Rc<Vec<i32>>` cannot be sent between threads safely
  --> src/main.rs:15:36
   |
15 |           let handle = thread::spawn(move || {
   |                        ------------- ^------
   |                        |             |
   |  ______________________|_____________within this `[closure@src/main.rs:15:36: 15:43]`
   | |                      |
   | |                      required by a bound introduced by this call
16 | |             //没有实现完整逻辑
17 | |             data[0] = 1;
18 | |         });
   | |_________^ `Rc<Vec<i32>>` cannot be sent between threads safely
   |
   = help: within `[closure@src/main.rs:15:36: 15:43]`, the trait `Send` is not implemented for `Rc<Vec<i32>>`
note: required because it's used within this closure
  --> src/main.rs:15:36
   |
15 |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^
note: required by a bound in `spawn`
  --> /rustc/84c898d65adf2f39a5a98507f1fe0ce10a2b8dbc/library/std/src/thread/mod.rs:711:1

出现上面错误的原因是Rc<Vec<i32>>没有实现 Send trait,Send trait 的作用是 保证实现了 Send trait 的类型值的所有权可以在线程间传送。几乎所有的 Rust类型都是Send的,不过有一些例外,Rc<T> 就是不能Send的,因为如果克隆了 Rc<T>的值并尝试将克隆的所有权转移到另一个线程,那么这两个线程都可能同时更新引用计数。为此,Rc<T> 被实现为用于单线程场景。而Arc(Atomically Reference Counted)实现了Send trait,下面改为Arc


use std::thread;
use std::sync::Arc;
fn main() {
    let  r_c = Arc::new((0..50).collect::<Vec<i32>>());
    let mut handles = vec![];
    
    for _ in [0,4,8] {
        let data = Arc::clone(&r_c);
        let handle = thread::spawn(move || {
            //没有实现完整逻辑
            data[0] = 1;
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }

    println!("{:?}", r_c);
}

执行一下得到的错误信息:

error[E0596]: cannot borrow data in an `Arc` as mutable
  --> src/main.rs:17:13
   |
17 |             data[0] = 1;
   |             ^^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<Vec<i32>>`

因为Arc<T> 没有实现 DerefMut trait 所以才会报错这个错误,怎么办? 最直接的想法就是为Arc实现DerefMut trait。这里换一种思考能不能在Arc与Vec之间加一层? 想要在没有实现DerefMut trait 的情况下 改变Vec中的数据第一想到是RefCell<T>,但是RefCell<T> 不能用于多线程,而Mutex 却是一个线程安全的RefCell<T> ,看到Mutex]中有与Arc配合使用的例子,下面尝试一下

use std::thread;
use std::sync::{Arc,Mutex};

fn main() {
    let  r_c = Arc::new(Mutex::new((0..50).collect::<Vec<i32>>()));
    let mut handles = vec![];
    
    for _ in [0,4,8] {
        let data = Arc::clone(&r_c);
        let handle = thread::spawn(move || {
            //没有实现完整逻辑
            let mut m = data.lock().unwrap();
            //let mut m = (*(data.deref())).lock().unwrap();
            m[0] = 1;
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }

    println!("{:?}", r_c);
}

值得注意的是 data的类型是Arc<Mutex<Vec<i32>>> 为什么可以调用Mutex的lock方法? 因为Arc实现了 Deref trait,

impl<T> Deref for Arc<T>
where
    T: ?Sized,
type Target = T
    The resulting type after dereferencing.
fn deref(&self) -> &T
    Dereferences the value.

所以编译器会自动对data进行解引用,data.lock() 等价于 (*(data.deref())).lock()

当获取锁后lock返回一个LockResult<MutexGuard<'_, T>>

pub type LockResult<Guard> = Result<Guard, PoisonError<Guard>>

所以unwrap解包后 m 的类型是 MutexGuard<'_, T>

同理因为MutexGuard 实现了 Deref trait,

impl<T: ?Sized> Deref for MutexGuard<'_, T>
type Target = T
    The resulting type after dereferencing.
fn deref(&self) -> &T
    Dereferences the value.

编译器会自动解引用,所以可以写成 m[0] = 1;这种形式。

下面是完整实现

use std::thread;
use std::sync::{Arc,Mutex};
use std::collections::HashMap;

fn main() {
    let  r_c = Arc::new(Mutex::new((0..50).collect::<Vec<i32>>()));
    let mut handles = vec![];
    
     let index_value= HashMap::from([
        (0,1),
        (4,2),
        (8,3),
    ]);
   
    for (k,v) in index_value {
        let  c = Arc::clone(&r_c);
        let handle = thread::spawn( move || {
            modify_data(c,k,v);
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }

    println!("{:?}", r_c);
}

//此方法借用了此问题的回答者 **`Fractal`** 的实现
fn modify_data(c: Arc<Mutex<Vec<i32>>>, start: usize, increment: i32) {
    let mut data = c.lock().unwrap();
    for i in start..start+4 {
        data[i] += increment;
    }
}

Arc 配合 RwLock 方式的实现

use std::thread;
use std::sync::{Arc, RwLock};
use std::collections::HashMap;

fn main() {
  
    let c_lock = Arc::new(RwLock::new((0..50).collect()));
    let lock = Arc::clone(&c_lock);

    let mut handles = vec![];
    let index_value= HashMap::from([
        (0,1),
        (4,2),
        (8,3),
    ]);
   
    for (k,v) in index_value {
        let  c = Arc::clone(&lock);
        let handle = thread::spawn( move || {
            modify_data(c,k,v);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("{:?}", c_lock);
}

//此方法借用了此问题的回答者 **`Fractal`** 的实现
fn modify_data(c: Arc<RwLock<Vec<i32>>>, start: usize, increment: i32) {
    let mut data = c.write().unwrap();
    for i in start..start+4 {
        data[i] += increment;
    }
}


参考

modify_data 方法借用了此问题的回答者 Fractal 的实现
https://doc.rust-lang.org/std/sync/struct.Mutex.html#examples
https://kaisery.github.io/trpl-zh-cn/ch16-04-extensible-concu...

使用 Arc + Mutex 确实是明智的做法,它是安全的,并且运行结果完全符合预期。但是如果题主的这个需求是并行计算的话,那么使用了 Mutex 的方案显然是不满足的。然而,以 Rust 的设计哲学,你不可能在保证安全的前提下,从多个线程同时修改同一个对象。所以如果真的想要并行修改的话,就必然只能使用 unsafe 的手段。

并行修改

警告:不要轻易写这样的代码,除非你 100% 清楚你正在干什么。

use std::{
    slice,
    thread::{self, JoinHandle},
};

fn main() {
    let mut data: Vec<i32> = (0..50).collect();
    let threads = vec![
        thread_spawn_modify(&mut data[0..4], 1),
        thread_spawn_modify(&mut data[4..8], 2),
        thread_spawn_modify(&mut data[8..12], 3),
    ];

    for t in threads {
        t.join().unwrap();
    }
    assert_eq!(&data[0..12], &[1, 2, 3, 4, 6, 7, 8, 9, 11, 12, 13, 14])
}

fn thread_spawn_modify(data: &mut [i32], add_n: i32) -> JoinHandle<()> {
    // 非常危险的行为
    let data = unsafe { slice::from_raw_parts_mut(data.as_mut_ptr(), data.len()) };

    thread::spawn(move || {
        for d in data {
            *d += add_n;
        }
    })
}

用 Arc::new(Mutex::new(data)) 把数据包起来, 传到另外的线程中处理试试? 刚学, 也不太懂.
Arc多线程引用计数, Mutex互斥锁.

写了一段, 能运行, 是不是最佳实践就不知道了.

use std::thread;
use std::thread::JoinHandle;

fn m2() {
    let mut data: Vec<i32> = (1..50).collect();
    let data_mutex = Arc::new(Mutex::new(data));
    let threads = vec![
        thread_spawn(Arc::clone(&data_mutex), 0..4, 10),
        thread_spawn(Arc::clone(&data_mutex), 4..8, 20),
        thread_spawn(Arc::clone(&data_mutex), 8..12, 30),
    ];

    for x in threads {
        x.join().unwrap();
    }
    for x in data_mutex.lock().unwrap().iter() {
        println!("rs x = {x}");
    }
}

fn thread_spawn(data: Arc<Mutex<Vec<i32>>>, range: Range<usize>, i: i32) -> JoinHandle<()> {
    thread::spawn(move || {
        let mut guard = data.lock().unwrap();
        for x in range {
            guard[x] += i;
        }
    })
}
新手上路,请多包涵

按照rust的理念:不要共享内存来通讯;而是要通讯来共享内存。如果每个工作线程都可以修改队列里的任何元素,是不安全的。
建议考虑采用以下的方式来实现:

use std::sync::mpsc;
use std::thread;

fn main() {
    let materials: Vec<i32> = (0..50).collect();
    println!("materials: {:?}", materials);

    let count = 3_usize;
    let mut handles = vec![];
    let mut material_senders = vec![];

    let (product_sender, product_receiver) = mpsc::channel();

    for _ in 0..count {
        let (material_sender, material_receiver) = mpsc::channel();
        material_senders.push(material_sender);

        let sender = product_sender.clone();

        let handle = thread::spawn(move || {
            while let Ok((i, material)) = material_receiver.recv() {
                let product = material * 2;
                let _ = sender.send((i, product));
            }
            drop(sender);
        });

        handles.push(handle);
    }

    drop(product_sender);

    for (i,material) in materials.iter().enumerate() {
        let _ = material_senders[i % count].send((i, *material));
    }

    for sender in material_senders {
        drop(sender);
    }

    let mut tmp = vec![];
    while let Ok((i,product)) = product_receiver.recv() {
        tmp.push((i, product));
    }
    tmp.sort_by(|a, b| a.cmp(b));

    let products: Vec<i32> = tmp.iter().map(|(_, product)| *product).collect();
    println!("[main] products: {:?}", products);

    for handle in handles {
        handle.join().unwrap();
    }
}

以下回答来自 ChatGPT

use std::thread;

fn main() {
    let mut data: Vec<i32> = (0..50).collect();

    let threads = vec![
        thread::spawn(move || modify_data(&mut data, 0, 1)),
        thread::spawn(move || modify_data(&mut data, 4, 2)),
        thread::spawn(move || modify_data(&mut data, 8, 3)),
    ];

    for t in threads {
        t.join().unwrap();
    }

    println!("{:?}", data);
}

fn modify_data(data: &mut Vec<i32>, start: usize, increment: i32) {
    for i in start..start+4 {
        data[i] += increment;
    }
}

首先,我们定义了一个名为 data 的可变向量,并使用 collect() 方法将整数迭代器 (0..50) 转换为向量。

接下来,我们创建了一个名为 threads 的向量,其中包含三个线程。每个线程调用 modify_data 函数来修改向量中的元素。该函数接受一个指向向量的可变引用,一个起始索引和一个增量值。每个线程修改从起始索引开始的四个元素,将它们增加给定的增量。

最后,我们等待每个线程完成并打印修改后的 data 向量。

推荐问题
宣传栏