Rust —— 高级篇(2)

Zephyr Lv3

并发编程

创建线程

let handle = thread::spawn(|| {
    for i in 1..5 {
        println!("{i}")
    }
});
handle.join().unwrap();

thread::spawn会创建一个子线程,之后通过join方法,让主线程阻塞,等待子线程执行完毕。

如果子线程捕获了环境变量,那么必须要将这个环境变量的所有权交给对应的闭包。因为编译器无法确定子线程和环境变量究竟谁会先被回收,如果环境变量被回收了那么就会导致闭包函数访问无效的内存地址。

for j in 1..3 {
    let _handle = thread::spawn(move || {
        for i in 1..20 {
            println!("{i}, {j}")
        }
    });
}

上面的代码中将j的所有权全部转移到了闭包当中,如果没有进行所有权转移,对于编译器来说就无法确定闭包可以安全的使用环境变量。

线程终止

如果主线程终止,那么所有的线程都会被强制终止掉。但如果父线程不是主线程,那么父线程的结束不会影响到子线程。Rust中,线程的任务执行完毕,就会关闭线程。

但如果碰到某些特殊情况,线程的代码无法执行完毕。有以下几种可能:

  1. 线程的任务是一个循环IO读取,由于IO操作的大部分事件,线程都会处于阻塞状态,因此它并不会占用大量的CPU。
  2. 如果不是一个循环IO,且没有休眠操作,那么这个线程会直接跑满CPU核心,并且不会被终止。

线程屏障

线程屏障可以实现让指定的线程都完成任务之后,再继续向后执行

let barrier = Arc::new(Barrier::new(3));
let mut v = vec![];
(0..3).for_each(|i| {
    let b = barrier.clone();
    let h = thread::spawn(move || {
        println!("{i}");
        b.wait();
        println!("after {i}")
    });
    v.push(h);
});
for ele in v {
    ele.join().unwrap();
}

线程局部变量

thread_local! {static FOO: RefCell<u32> = RefCell::new(1)};
FOO.with(|f| {
    assert_eq!(*f.borrow(), 1);
    *f.borrow_mut() = 3;
});
let t = thread::spawn(|| {
    FOO.with(|f| {
        assert_eq!(*f.borrow_mut(), 1);
    });
});
t.join().unwrap()

thread_local!宏用于创建线程局部变量,这里用static标记FOO为静态生命周期。

这种方式通过借用来获取线程局部变量,但线程本身并不持有它的拷贝,如果希望线程能够持有它的独立拷贝,可以考虑使用第三方库thread-local

let tls = Arc::new(ThreadLocal::new());

// 创建多个线程
for _ in 0..5 {
    let tls2 = tls.clone();
    thread::spawn(move || {
        // 将计数器加1
        let cell = tls2.get_or(|| Cell::new(0));
        cell.set(cell.get() + 1);
    }).join().unwrap();
}

// 一旦所有子线程结束,收集它们的线程局部变量中的计数器值,然后进行求和
let tls = Arc::try_unwrap(tls).unwrap();
let total = tls.into_iter().fold(0, |x, y| x + y.get());

// 和为5
assert_eq!(total, 5);

用条件控制线程的挂起与执行

let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = pair.clone();
thread::spawn(move || {
    let (lock, cond) = &*pair2;
    let mut started = lock.lock().unwrap();
    *started = true;
    cond.notify_one();
});
let (lock, cond) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
    started = cond.wait(started).unwrap();
}
print!("start");

上面的代码中,主线程开启一个子线程,接着尝试获取mutex的互斥锁,started是里面的值。lock方法会阻塞当前线程直到成功获取锁,或出现意外。接着判断锁内的值是否符合判定要求,如果不符合,就进入等待状态。wait方法会自动释放传入参数代表的锁,然后等待信号到达。

子线程中,它会尝试获取锁,并修改锁的状态,然后通过条件变量唤醒主线程,让主线程继续执行。

PS:这里可以看到子线程中并没有释放锁的操作,因为在守护变量(就是started)超出作用域后会被自动释放。

线程同步

消息传递

Rust在标准库中提供了消息通道,通过不同的库来实现不同的收发场景,例如单发送者——多接收者或多发送者——多接收者。

多发送者,单接收者

这种情况需要使用mpsc库。下面看一个简单的例子

let (sender, receiver) = mpsc::channel();
use std::thread;
thread::spawn(move || {
    sender.send(1).unwrap();
});
dbg!("{}", receiver.recv().unwrap());

这里只使用了一个发送者,使用send()方法发送消息。接收端则用recv()接收消息,它会阻塞直到管道中出现可读的消息。如果不希望接收端阻塞,可以改用try_recv(),如果程序执行到该方法发现没有可读取的数据会直接返回一个错误。

上面的例子中传送了i32类型的数据,由于它实现了Copy特征,因此它会被拷贝一份后放入管道。如果是一个没有实现Copy的数据,则会直接转移所有权。

管道也可以作为迭代器使用,如果我们希望循环接收管道中的数据,可以使用for循环处理。而接收停止的标志则是发送者被drop掉。

let (sender, receiver) = mpsc::channel();
use std::thread;
thread::spawn(move || {
    let v = vec!["a", "b", "c"];
    for val in v {
        sender.send(val).unwrap();
    }
});
// 子线程执行完毕后,发送者会被回收掉,这就代表这条管道的传输已经完成了
for r in receiver {
    println!("{}", r);
}

如果希望使用多发送者,那么可以将发送者克隆一份交给子线程。

上面使用的都是异步通道,发送方不会阻塞。mpsc还提供了同步通道,只需要在创建时使用sync_channel()方法即可。同步通道的发送者会在等待接收者收到消息后再继续执行。

同步通道在创建时还要提供一个额外的参数,就是缓冲区的大小,发送者会在缓冲区达到上限时发生阻塞。

let (sender, receiver) = mpsc::sync_channel(1);
use std::thread;
let s1 = sender.clone();
thread::spawn(move || {
    println!("sender send");
    sender.send(1).unwrap();
    println!("sender finish");
});
thread::sleep(Duration::from_secs(1));
thread::spawn(move || {
    println!("s1 send");
    s1.send(2).unwrap();
    println!("s1 finish");
});
thread::sleep(Duration::from_secs(1));
receiver.recv().unwrap();
receiver.recv().unwrap();

上面的实例中,同步通道的缓冲区大小为1,因此sender可以顺利完成发送,而在s1发送时,缓冲区已满,因此它会被阻塞(由此也可以看出缓冲区是发送者共享的)。

通道关闭:当所有发送者或所有接收者被drop掉之后,通道就视为关闭。

Rust中的互斥锁为Mutex,下面是一个简单的使用案例

let m = Arc::new(Mutex::new(3));
let m1 = m.clone();
let h = thread::spawn(move || {
    println!("thread try to get lock");
    let mut t = m1.lock().unwrap();
    println!("thread get lock, cur value {}", *t);
    *t = 4;
    println!("thread finish, cur value {}", *t);
});
// 这里必须控制m的作用域。如果main先获取了锁,必须等m被drop才算释放锁,而最后添加了让main等待子线程的操作, 这会导致main无法结束,因为子线程无法获取锁,而要想子线程获取锁,m必须被drop掉,因此需要控制m的作用域
{
    println!("main try to get lock");
    let mut t = m.lock().unwrap();
    println!("main get lock, cur value {}", *t);
    *t = 5;
    println!("main finish, cur value {}", *t);
}

h.join().unwrap();

lock方法会阻塞当前线程,直到获取到锁。此外Mutex也是个智能指针,或者说lock方法会返回一个智能指针,这个智能指针指向被保护的数据。也可以使用try_lock来获取锁,它不会阻塞当前线程,在发现无法获取锁时会直接返回一个错误。

还有一种读写锁RwLock,他可以确保同一时间可以有多个读操作或一个写操作。Rust中读锁和写锁的区别就在于是否实现了DerefMut特征。

let lock = RwLock::new(3);
{
    let a = lock.read().unwrap();
    assert_eq!(*a, 3);
}
{
    let mut b = lock.write().unwrap();
    *b += 1;
    assert_eq!(*b, 4);
}

虽然看上去读写锁好像非常完美,但实际上由于它的实现比较复杂,在性能上是远不及Mutex的,并且在锁管理上也比较复杂。此外Rust中的读写锁是偏向读者的,有可能会导致写线程饥饿。

因此,使用RwLock应满足以下条件:并发读,并且读操作耗时较长。

条件变量

条件变量通常与一个Mutex关联,Condvarwaitnotify方法用于处理线程同步,而绑定的Mutex则决定线程同步的时机。

条件变量之所以要与一个Mutex绑定的原因是,使用条件变量进行线程同步必然涉及共享内存的修改,因此wait方法会直接释放传入的锁,同时将这个环境变量与传入的锁绑定,当收到notify消息并且锁处于空闲状态时,就会激活当前线程,bing

下面的代码实现了父子线程交替输出的功能。

use std::thread::{sleep, spawn};
use std::time::Duration;
let flag = Arc::new(Mutex::new(false));
let cond = Arc::new(Condvar::new());
let cflag = flag.clone();
let ccond = cond.clone();

let hdl = spawn(move || {
    // 用括号包裹获取锁的代码块,保证获取完之后直接释放
    let mut m = { *cflag.lock().unwrap() };
    let mut counter = 0;

    while counter < 3 {
        while !m {
            // 循环等待m为true的时刻,使用循环的原因是避免虚假唤醒导致线程在不满足情况的条件下提前苏醒
            m = *ccond.wait(cflag.lock().unwrap()).unwrap();
        }

        {
            // 再次锁定
            m = false;
            *cflag.lock().unwrap() = false;
        }

        counter += 1;
        println!("inner counter: {}", counter);
    }
});

let mut counter = 0;
loop {
    sleep(Duration::from_millis(1000));
    *flag.lock().unwrap() = true;
    counter += 1;
    if counter > 3 {
        break;
    }
    println!("outside counter: {}", counter);
    cond.notify_one();
}
hdl.join().unwrap();
println!("{:?}", flag);

原子操作与内存顺序

Rust也提供了原子类型用于提供对原子操作的支持。相比于锁和消息传递,它的性能更加优秀。因为它的底层使用循环CAS实现。

const N_THREAD: u32 = 10;
const N_TIMES: u32 = 10;

static COUNT: AtomicU32 = AtomicU32::new(0);

#[test]
fn atomic_test() {
    let s = Instant::now();
    let mut threads = vec![];
    for _ in 0..N_THREAD {
        threads.push(spawn(|| {
            for _ in 0..N_TIMES {
                COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            }
        }))
    }
    for t in threads {
        t.join().unwrap();
    }
    assert_eq!(N_THREAD * N_TIMES, COUNT.load(std::sync::atomic::Ordering::Relaxed));
    println!("spend {:?}", Instant::now().sub(s));
}

上面是一个原子类型的简单使用,这里重点关注Ordering这个枚举,他代表的是内存顺序限定规则

内存顺序

编译器的指令重排序或者CPU核心缓存的影响都有可能导致程序实际执行顺序与预期不符,Ordering就是用来限定内存排序规则的,他有5种取值:

  1. Relaxed:不做任何处理
  2. Release:要求线程对共享变量的写操作必须在之前的读操作之后发生
  3. Acquire:要求线程对共享变量的读操作必须在后续的写操作之前发生
  4. AcqRel:上面二者的结合
  5. SeqCst:顺序一致性,SeqCst就像是AcqRel的加强版,他额外保证了全局一致性。
static mut DATA: u64 = 0;
static READY: AtomicBool = AtomicBool::new(false);

fn producer() -> JoinHandle<()> {
    spawn(|| {
        unsafe { DATA = 100 };
        // 保证这里的修改可以被其他线程看到
        READY.store(true, Ordering::Release);
    })
}

fn consumer() -> JoinHandle<()> {
    spawn(|| {
        // 保证读取操作之前已经完成了所有在自己之前的写操作
        while !READY.load(Ordering::Acquire) {
            assert_eq!(100, unsafe { DATA });
        }
    })
}

Send和Sync

之前提到过Rc是无法在多线程环境下使用的,因为他并没有有保证自己的引用计数器是线程安全的。但编译器显然不可能去主动检查他有没有线程安全的实现,它是通过标记特征SyncSend来了解相关的信息的。

// Rc源码片段
impl<T: ?Sized> !marker::Send for Rc<T> {}
impl<T: ?Sized> !marker::Sync for Rc<T> {}

// Arc源码片段
unsafe impl<T: ?Sized + Sync + Send> Send for Arc<T> {}
unsafe impl<T: ?Sized + Sync + Send> Sync for Arc<T> {}

上面是RcArc的源码片段,可以看到RcSendSync被移除了实现,编译器借此了解到Rc线程不安全,不能在多线程使用。

再讲讲上面提到的两个特征引用:

  • 实现Send的类型可以在线程间安全的传递其所有权
  • 实现Sync的类型可以在线程间安全的共享(通过引用)

下面来看点例子

unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}

unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}

上面分别是RwLockMutexSync实现,但我们主要关注的是泛型约束,可以看到RwLock要求泛型实现Sync特征,因为他允许多线程并发读取,这就要求数据可以在线程间安全共享。而Mutex就没这个需求,它是互斥的,因此他也就不需要传入的类型可以共享。

全局变量

面对全局变量,首先可以肯定的一点就是它的生命周期必然是'static,但不代表必须用static来声明,像是常量和字符串字面值这些都会被直接打包到二进制可执行文件中。

静态常量/变量

静态常量会在编译期直接初始化,这也就代表它的值必须可以在编译期就确定。

const MAX_ID: usize = usize::MAX / 2;

编译器在面对常量时会尽可能将其内联到代码中,因此不同地方对同一常量的引用可能会指向不同的地址。

静态变量允许声明一个全局的变量,常用于全局数据统计

static mut REQUEST_RECV: usize = 0;

同时Rust要求必须在unsafe语句块中才能访问和修改静态变量,因为在多线程环境下进行这类操作会不可避免的遇到脏数据。

因此只有在同一线程内或不在乎数据的准确性时才会使用全局变量。

运行期初始化

上面的代码都是基于编译期进行初始化,它的问题就是我们无法通过函数,或者只有在运行期才能加载到的数据进行初始化。

为此Rust提供了以下方式协助我们进行初始化

lazy_static

这是社区提供的用于懒加载静态变量的宏,它可以帮助我们在运行期初始化静态变量

lazy_static = "1.4.0"
use std::sync::Mutex;

use lazy_static::lazy_static;

lazy_static!{
    static ref NAMES: Mutex<String> = Mutex::new(String::from("ij"));
}

上面就是该库的一个简单使用。该宏匹配的时static ref,因此定义的静态变量都是不可变引用。

但我们使用lazy_static访问静态变量时,会有轻微的性能损失,因为它内部使用了一个底层的并发原语std::sync::Once,每次访问该变量时,都会执行一次原子指令用于确认静态变量是否完成初始化。

Box::leak

这是之前提到过的一个函数,它可以进行主动的内存泄露,将某个变量的生命周期提升为整个程序生命周期。它用来初始化静态变量也十分好用。

static mut CONFIG: Option<&mut String> = None;

#[test]
fn fe() {
    let b = Box::new(String::from("a"));
    unsafe { CONFIG = Some(Box::leak(b)) };
}

错误处理

组合器

自定义错误类型

Rust在标准库中提供了一些可复用的特征,其中std::error::Error就是用于错误处理的特征

use std::fmt::{Debug, Display};

pub trait Error: Debug + Display {
    fn source(&self) -> Option<&(Error + 'static)> { ... }
}

从上面的源代码可以看到,Error特征需要实现DebugDisplay特征。

实际上,只需要实现DebugDisplay就可以作为一个自定义错误类型了

use std::fmt;

#[derive(Debug)]
struct AppError {
    code: u32,
    message: String,
}

impl fmt::Display for AppError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.code {
            404 => write!(f, "not found "),
            _ => write!(f, "fail "),
        };
        write!(f, "{}", self.message)
    }
}

#[test]
fn error_test() {
    let err: Result<(), AppError> = Err(AppError {code: 1, message: "oi".to_string()});
    match err {
        Ok(_) => todo!(),
        Err(e) => eprintln!("{}", e),
    }
}

程序中的错误类型繁多,我们肯定希望可以将它们全部转换成自定义的错误类型,这就需要用到std::convert::From特征。

impl From<io::Error> for AppError {
    fn from(value: io::Error) -> Self {
        AppError { code: 200, message: value.to_string() }
    }
}

fn err_func() -> Result<(), AppError> {
    let _file = File::open("notexist.txt")?;
    Ok(())
}

上面是一个简单的用例,实现了从io::Error转换到AppError,而实现之后,我们就可以利用到前面提到的?运算符进行错误传播,它会自动进行错误类型转换。

归一化不同的错误类型

归一化错误类型有两种方案:

  1. 使用特征对象
  2. 自定义错误类型(上面提到的)

二者的区别就在于,第一种方法相对来说更简单,而第二种实现虽然繁琐但自定义程度较高

不过总体来说上面的两种方案还是偏繁琐,我们可以借助一些第三方库 实现自己需要的功能

Unsafe

Unsafe提供的能力:

  1. 解引用裸指针
  2. 调用一个unsafe或外部的函数
  3. 访问或修改一个可变的静态变量
  4. 实现一个unsafe特征
  5. 访问union中的字段

解引用裸指针

裸指针的语法格式:*const T*mut T

let mut num = 5;
let r1 = &num as *const i32;
let r2 = &mut num as *mut i32;

相比于引用和智能指针,裸指针可以:

  • 绕过借用规则
  • 不保证指向合法内存
  • 没有自动回收

PS:上面创建裸指针的操作并没有放在unsafe代码块中,因为从引用创建裸指针是安全的,解引用才是不安全的。不过,如果我们基于内存地址创建裸指针,就不是安全的了。

下面是使用unsafe的一个例子,我们将一个数组切成两半,并获取它们的可变引用。如果不使用unsafe这是无法完成的,因为我们同时获取了一个数组的两个可变借用,但如果使用unsafe我们就可以绕过借用规则。

fn split(arr: &mut [i32], mid: usize) -> (&mut [i32], &mut[i32]) {
    let len = arr.len();
    let ptr = arr.as_mut_ptr();
    assert!(mid <= len);
    unsafe {
        (
            slice::from_raw_parts_mut(ptr, mid),
            from_raw_parts_mut(ptr.add(mid), len - mid)
        )
    }
}

unsafe函数

unsafe函数在形式上与普通函数的区别就是在函数声明前加了个unsafe,这相当于在告诉使用者Rust无法担保使用者在使用该函数时可以满足他所需的所有需求。

unsafe函数必须在unsafe代码块中调用

// TODO

Macro宏编程

宏和函数的区别:

  • 元编程

    宏是通过一种代码来生成另一种代码,它可以有效减少代码量

  • 可变参数

    Rust中的函数签名是固定的,而宏可以拥有可变数量的参数

  • 宏展开

    宏会被展开成其他代码,并且这个过程发生在编译之前,因此宏可以帮助我们为类型实现某个te’zhen

异步编程

Rust中使用的异步模型是async。由于其底层原理较为复杂,此处暂时不作探究。

这里简单讲讲Rust中的async与其他语言的区别:

  • Future在Rust中是惰性的,只有在轮询时才会运行
  • Async在Rust中使用开销为0
  • 没有内置异步调用必需的运行时
  • 运行时同时支持单线程和多线程

简单应用

async fn go() {
    println!("go")
}

#[test]
fn async_test() {
    block_on(go());
}

由于Rust的future是惰性的,因此如果我们直接调用go(),函数其实并不会执行,只有当我们用block_on处理该异步函数时,才会执行它的内容。block_on会阻塞当前线程,直到传入的异步函数执行完毕。

如果我们需要在一个async函数中去调用另一个async fn,并且等待其完成后再执行别的代码,可以用到.await关键字。

async fn go() {
    pre1().await;
    println!("===");
    pre2().await;
    println!("go")
}

block_on不同,.await不会阻塞当前线程,方法会在遇到.await的地方挂起,然后将控制权返回给调用者,下面的例子展示了这一点

struct Song {
    author: String,
    name: String
}

async fn learn_song() -> Song {
    Song {
        author: "a".to_string(),
        name: String::from("b")
    }
}

async fn sing_song(song: Song) {
    println!("{} {}", song.author, song.name)
}

async fn dance() {
    println!("dance")
}

async fn learn_and_sing() {
    // await等待执行完成,之后再去执行唱歌
    let song = learn_song().await;
    // 这里await等待线程休眠结束,此时会将控制权返还给调用者,drive会继续调用dance,而不会在这里阻塞等待休眠结束
    sleep(Duration::from_secs(1)).await;
    sing_song(song).await;
}

async fn drive() {
    let f1 = learn_and_sing();
    let f2 = dance();
    // join!可以并发处理和等待多个future,它会选择当前没有阻塞的部分继续执行,如果全部被阻塞,他就会让出线程所有权,交还给调用者
    join!(f1, f2);
}

#[test]
fn feature() {
    block_on(drive());
}

Future执行与任务调度

Future的定义:一个可以产出值的异步计算

一个简化版的Future特征如下

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending
}

这里的poll函数是供执行器来调用的(block_on)就是一种执行器,可以推进Future的进一步执行。该函数会返回一个枚举,枚举表示本次poll的结果,如果本次poll直接完成了任务,就会携带结果返回。如果没有完成(猜测:有限时间片内没有完成作业,或陷入阻塞状态),就会返回一个Pending状态。对于处于Pending状态的Future,一开始传入的wake函数就能派上用场了:当Future再次准备好了执行,就会调用wake函数,然后管理该Future的执行器就会再次调用它。

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // socket有数据,写入buffer中并返回
            Poll::Ready(self.socket.read_buf())
        } else {
            // socket中还没数据
            //
            // 注册一个`wake`函数,当数据可用时,该函数会被调用,
            // 然后当前Future的执行器会再次调用`poll`方法,此时就可以读取到数据
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

之前提到的join!可以实现两个future的并发执行,这里实现一个简单的版本,其思路就是将两个Future交给同一个Future进行管理。

struct Join<FutureA, FutureB> {
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }
        if self.a.is_none() && self.b.is_none() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

前面讲到Rust的Future是惰性的,这意味着只有执行器对它调用了poll方法,这个Future对应的操作才会开始执行。不过首次调用完成之后,执行器就不必再主动调用了。之后Future会通过调用wake函数来告诉执行器可以再次对他使用poll

下面是一个定时器Future的简单实现

struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

impl Future for TimerFuture {
    type Output = ();

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // 每次调用poll都会拷贝一次waker,
            // 因为TimerFuture可以在执行器中多个任务间移动
            // 如果不进行拷贝,可能会导致获取到的waker与当前任务不匹配
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

impl TimerFuture {
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));
        let t = shared_state.clone();
        spawn(move || {
            sleep(duration);
            let mut shared_state = t.lock().unwrap();
            shared_state.completed = true;
            // 计时完毕,通知执行器可以poll获取结果
            if let Some(waker) = shared_state.waker.take() {
                waker.wake();
            }
        });
        TimerFuture { shared_state }
    }
}

下面是执行器实现

大致的思路是:Spawner负责不断将新的任务投送到执行队列中去,Exectuor不断从执行队列中取出任务进行处理。Task需要实现Waker特征用于处理单次poll 无法完成任务的情况,这里的操作是将任务重新投放到执行队列中去

impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("queue is full");
    }
}

/// 给Task实现Waker特征,这里会将任务拷贝一份发送回执行队列中去
impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let cloned = arc_self.clone();
        arc_self.task_sender.send(cloned).expect("queue is full");
    }
}

impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // 获取最新的future
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // 获取苏醒函数
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // 再次对该future调用poll,如果没有执行完,复原该task
                if future.as_mut().poll(context).is_pending() {
                    *future_slot = Some(future);
                }
            }
        }
    }
}

#[test]
fn feature() {
    let (executor, spawner) = new_executor_and_spawner();
    spawner.spawn(async {
        println!("howdy");
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done");
    });
    drop(spawner);
    executor.run();
}

async/.await是语法的一部分,他在遇到阻塞操作时,会选择让出当前线程,而非阻塞线程。async声明的函数或代码块会返回一个实现Future特征的值。

async生命周期

async fn的函数如果有引用类型的参数,那么它返回的Future的生命周期也会受到影响

async fn foo(x: &u8) -> u8 { *x }

// 上面的函数跟下面的函数是等价的:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
    async move { *x }
}

这意味着传入参数的生命周期必须比Future更长。一般来说,如果对应的函数调用await,就不会发生问题,因为函数不会再继续执行下去,对应的参数肯定还被保留着。但如果我们要把Future暂存起来,或者发送到其他线程中去,就会出现问题,因为编译器无法确定传入的参数是否比Future活得更久。

fn async_life() -> impl Future<Output = u8> {
    let x = 5;
    foo(&x)
}

async fn foo(x: &u8) -> u8 {*x}

这里因为我们要把Future返回出去以备它用,而编译器发现x在函数结束后就会被drop,因此这里就会报错。

要想解决这个问题也很简单,使用async代码块将相关代码包裹起来,即可将生命周期提升到'static

async fn borrow_x(x: &u8) -> u8 { *x }

fn good() -> impl Future<Output = u8> {
    async {
        let x = 5;
        borrow_x(&x).await
    }
}

这里讲以下我的理解:用async代码块包裹之后,x和borrow_x的作用域就相同了,保证x能活得比对应的Future更久。

async也同样支持move关键字,它会将环境变量的所有权直接转移进来,这样我们就可以避免借用规则的限制,上面的示例也可以用这种方法解决。

如果使用多线程Future执行器,Future可能会在线程间移动,因为函数体内任何一个.await都可能导致阻塞,而它再次被调用时不一定还是由原先的线程来执行。

Pin And Unpin

Pin在Rust的作用是阻止某个值在内存中移动,它的一个应用就是处理自引用。之前我们对自引用的处理就是简单粗暴的用指针来处理。

struct Test {
    a: String,
    b: *const String,
}

这可以帮我们绕过借用规则的限制,不过也引入了另一个问题,b表示的是指向a的指针,但是如果a发生了移动,比如将它和一个新的字符串进行绑定,这个操作对于b来说是完全无感知的,除非程序员手动进行重新赋值,否则b会一直指向原先的那个值。因此Pin的作用就是再加一层限制,告诉程序员这里的值不能移动。

有一个和Pin相关的特征 —— Unpin,他表示的就是值可以在内存中安全的移动。所有可以被Pin住的值的特征必须是!Unpin,否则Pin也发挥不了什么作用。

实践运用

将值固定在栈上

use std::pin::Pin;
use std::marker::PhantomPinned;

#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
    _marker: PhantomPinned,
}


impl Test {
    fn new(txt: &str) -> Self {
        Test {
            a: String::from(txt),
            b: std::ptr::null(),
            _marker: PhantomPinned, // 这个标记可以让我们的类型自动实现特征`!Unpin`
        }
    }

    fn init(self: Pin<&mut Self>) {
        let self_ptr: *const String = &self.a;
        let this = unsafe { self.get_unchecked_mut() };
        this.b = self_ptr;
    }

    fn a(self: Pin<&Self>) -> &str {
        &self.get_ref().a
    }

    fn b(self: Pin<&Self>) -> &String {
        assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
        unsafe { &*(self.b) }
    }
}

此时如果我们再尝试移动,会直接在编译期收到报错,因此它是没有运行时开销的。

fn main() {
   let mut test1 = Test::new("test1");
   let mut test1_pin = unsafe { Pin::new_unchecked(&mut test1) };
   Test::init(test1_pin.as_mut());

   drop(test1_pin);
   println!(r#"test1.b points to "test1": {:?}..."#, test1.b);

   let mut test2 = Test::new("test2");
   mem::swap(&mut test1, &mut test2);
   println!("... and now it points nowhere: {:?}", test1.b);
}

将值固定在堆上

use std::pin::Pin;
use std::marker::PhantomPinned;

#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
    _marker: PhantomPinned,
}

impl Test {
    fn new(txt: &str) -> Pin<Box<Self>> {
        let t = Test {
            a: String::from(txt),
            b: std::ptr::null(),
            _marker: PhantomPinned,
        };
        let mut boxed = Box::pin(t);
        let self_ptr: *const String = &boxed.as_ref().a;
        unsafe { boxed.as_mut().get_unchecked_mut().b = self_ptr };
        boxed
    }

    fn a(self: Pin<&Self>) -> &str {
        &self.get_ref().a
    }

    fn b(self: Pin<&Self>) -> &String {
        unsafe { &*(self.b) }
    }
}

pub fn main() {
    let test1 = Test::new("test1");
    let test2 = Test::new("test2");

    println!("a: {}, b: {}",test1.as_ref().a(), test1.as_ref().b());
    println!("a: {}, b: {}",test2.as_ref().a(), test2.as_ref().b());
}
  • 标题: Rust —— 高级篇(2)
  • 作者: Zephyr
  • 创建于 : 2023-05-16 23:37:46
  • 更新于 : 2023-05-16 23:38:41
  • 链接: https://faustpromaxpx.github.io/2023/05/16/rust-ad2/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论