Многонишково програмиране

20 ноември 2018

Административни неща

Преговор

Преговор

Многонишково програмиране

Must be this tall

Fearless concurrency

Rust

Нишки

1 2 3 4 5 6 7
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from the spawned thread"));

    println!("hi from the main thread");
}

Нишки

1 2 3 4 5 6 7
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from the spawned thread"));

    println!("hi from the main thread");
}

Нишки

1 2 3 4 5 6 7
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from the spawned thread"));

    println!("hi from the main thread");
}

Нишки

1 2 3 4 5 6 7
use std::thread;

fn main() {
    thread::spawn(|| println!("hi from the spawned thread"));

    println!("hi from the main thread");
}

Нишки

1 2 3 4 5 6 7 8
use std::thread;

fn main() {
    let handle = thread::spawn(|| println!("hi from the spawned thread"));

    println!("hi from the main thread");
    let _ = handle.join();
}
hi from the main thread
hi from the spawned thread

Нишки

1 2 3 4 5 6 7 8
use std::thread;

fn main() {
    let handle = thread::spawn(|| println!("hi from the spawned thread"));

    println!("hi from the main thread");
    let _ = handle.join();
}
hi from the main thread
hi from the spawned thread

Нишки

1 2 3 4 5 6 7 8
use std::thread;

fn main() {
    let handle = thread::spawn(|| println!("hi from the spawned thread"));

    println!("hi from the main thread");
    let _ = handle.join();
}
hi from the main thread
hi from the spawned thread

Нишки

Има и Builder с допълнителни опции

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::thread::Builder;

fn main() {
    let handle = Builder::new()
        .name("sirespawn".to_string())
        .spawn(|| {
            println!("hi from the spawned thread");
            panic!();
        })
        .expect("could not create thread");

    handle.join().unwrap();
}
hi from the spawned thread
thread 'sirespawn' panicked at 'explicit panic', /src/main_2.rs:8:13
note: Run with `RUST_BACKTRACE=1` for a backtrace.
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Any', libcore/result.rs:1009:5

Споделяне на стойности

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::thread;

fn main() {
    let nums = (0..5).collect::<Vec<_>>();

    let handle = thread::spawn(|| {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}

Споделяне на стойности

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::thread;

fn main() {
    let nums = (0..5).collect::<Vec<_>>();

    let handle = thread::spawn(|| {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}
error[E0373]: closure may outlive the current function, but it borrows `nums`, which is owned by the current function
 --> /src/main_3.rs:6:32
  |
6 |     let handle = thread::spawn(|| {
  |                                ^^ may outlive borrowed value `nums`
7 |         for i in &nums {
  |                   ---- `nums` is borrowed here
help: to force the closure to take ownership of `nums` (and any other referenced variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ^^^^^^^

Споделяне на стойности

1 2 3 4 5 6 7 8 9 10 11 12 13
use std::thread;

fn main() {
    let nums = (0..5).collect::<Vec<_>>();

    let handle = thread::spawn(move || {
        for i in &nums {
            println!("number {}", i);
        }
    });

    let _ = handle.join();
}
number 0
number 1
number 2
number 3
number 4

Споделяне между няколко нишки

Това очевидно няма да работи

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
use std::thread;

fn main() {
    let nums = (0..5).collect::<Vec<_>>();

    let handles = (0..2)
        .map(|_| {
            //
            //
            thread::spawn(move || {
                for i in &nums {
                    println!("number {}", i);
                }
            })
        })
        .collect::<Vec<_>>();

    for h in handles {
        let _ = h.join();
    }
}
error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
  --> /src/main_5.rs:10:27
   |
4  |     let nums = (0..5).collect::<Vec<_>>();
   |         ---- captured outer variable
...
10 |             thread::spawn(move || {
   |                           ^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure

Споделяне между няколко нишки

Ами с Rc?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
use std::rc::Rc;
use std::thread;

fn main() {
    let nums = Rc::new((0..5).collect::<Vec<_>>());

    let handles = (0..2)
        .map(|_| {
            let nums = Rc::clone(&nums);

            thread::spawn(move || {
                for i in &*nums {
                    println!("number {}", i);
                }
            })
        })
        .collect::<Vec<_>>();

    for h in handles {
        let _ = h.join();
    }
}

Споделяне между няколко нишки

Ами с Rc?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
use std::rc::Rc;
use std::thread;

fn main() {
    let nums = Rc::new((0..5).collect::<Vec<_>>());

    let handles = (0..2)
        .map(|_| {
            let nums = Rc::clone(&nums);

            thread::spawn(move || {
                for i in &*nums {
                    println!("number {}", i);
                }
            })
        })
        .collect::<Vec<_>>();

    for h in handles {
        let _ = h.join();
    }
}
error[E0277]: `std::rc::Rc<std::vec::Vec<i32>>` cannot be sent between threads safely
  --> /src/main_6.rs:11:13
   |
11 |             thread::spawn(move || {
   |             ^^^^^^^^^^^^^ `std::rc::Rc<std::vec::Vec<i32>>` cannot be sent between threads safely
   |
   = help: within `[closure@/src/main_6.rs:11:27: 15:14 nums:std::rc::Rc<std::vec::Vec<i32>>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::vec::Vec<i32>>`
   = note: required because it appears within the type `[closure@/src/main_6.rs:11:27: 15:14 nums:std::rc::Rc<std::vec::Vec<i32>>]`
   = note: required by `std::thread::spawn`

Send и Sync

1 2 3
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static

Send и Sync

Send и Sync

Send - позволява прехвърляне на собственост между нишки

Send и Sync

Send - позволява прехвърляне на собственост между нишки
Sync - позволява споделяне между нишки през референция (&T)

Send и Sync

Send - позволява прехвърляне на собственост между нишки
Sync - позволява споделяне между нишки през референция (&T)

Send и Sync

Send - позволява прехвърляне на собственост между нишки
Sync - позволява споделяне между нишки през референция (&T)

Send и Sync

Send - позволява прехвърляне на собственост между нишки
Sync - позволява споделяне между нишки през референция (&T)

Send и Sync

Send - позволява прехвърляне на собственост между нишки
Sync - позволява споделяне между нишки през референция (&T)

Send и Sync

1
pub struct Token(u32);

Send и Sync

1
pub struct Token(u32);

Auto trait docs

Send и Sync

Send - позволява прехвърляне на собственост между нишки

Send и Sync

Send - позволява прехвърляне на собственост между нишки


Пример за типове, които не са Send:

Send и Sync

Send - позволява прехвърляне на собственост между нишки


Пример за типове, които не са Send:

Send и Sync

Send - позволява прехвърляне на собственост между нишки


Пример за типове, които не са Send:

Send и Sync

Send - позволява прехвърляне на собственост между нишки


Пример за типове, които не са Send:

Send и Sync

Send - позволява прехвърляне на собственост между нишки


Пример за типове, които не са Send:

Send + Sync

Sync - позволява споделяне между нишки през референция, т.е. &T е Send

Send + Sync

Sync - позволява споделяне между нишки през референция, т.е. &T е Send


Tипове, които не са Sync, обикновено имат internal mutability без синхронизация, например:

Send + Sync

Sync - позволява споделяне между нишки през референция, т.е. &T е Send


Tипове, които не са Sync, обикновено имат internal mutability без синхронизация, например:

Send + Sync

Sync - позволява споделяне между нишки през референция, т.е. &T е Send


Tипове, които не са Sync, обикновено имат internal mutability без синхронизация, например:

Send + Sync

Sync - позволява споделяне между нишки през референция, т.е. &T е Send


Tипове, които не са Sync, обикновено имат internal mutability без синхронизация, например:

Send + Sync

Sync - позволява споделяне между нишки през референция, т.е. &T е Send


Tипове, които не са Sync, обикновено имат internal mutability без синхронизация, например:

Send + Sync

Sync - позволява споделяне между нишки през референция, т.е. &T е Send


Tипове, които не са Sync, обикновено имат internal mutability без синхронизация, например:

Имплементация

1 2 3 4
struct MyBox(*mut u8);

unsafe impl Send for MyBox {}
unsafe impl Sync for MyBox {}

Деимплементация

1 2 3 4 5 6 7
// Само на nightly
#![feature(optin_builtin_traits)]

struct SpecialToken(u8);

impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}

Деимплементация

1 2 3 4 5 6 7
// Само на nightly
#![feature(optin_builtin_traits)]

struct SpecialToken(u8);

impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}

Деимплементация

1 2 3 4 5 6 7
// Само на nightly
#![feature(optin_builtin_traits)]

struct SpecialToken(u8);

impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}

Деимплементация

Хак за stable

1 2 3
use std::marker::PhantomData;

struct SpecialToken(u8, PhantomData<*const ()>);

Arc

Да се върнем на кода, който не се компилираше

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
use std::rc::Rc;
use std::thread;

fn main() {
    let nums = Rc::new((0..5).collect::<Vec<_>>());

    let handles = (0..2)
        .map(|_| {
            let nums = Rc::clone(&nums);

            thread::spawn(move || {
                for i in &*nums {
                    println!("number {}", i);
                }
            })
        })
        .collect::<Vec<_>>();

    for h in handles {
        let _ = h.join();
    }
}
error[E0277]: `std::rc::Rc<std::vec::Vec<i32>>` cannot be sent between threads safely
  --> /src/main_6.rs:11:13
   |
11 |             thread::spawn(move || {
   |             ^^^^^^^^^^^^^ `std::rc::Rc<std::vec::Vec<i32>>` cannot be sent between threads safely
   |
   = help: within `[closure@/src/main_6.rs:11:27: 15:14 nums:std::rc::Rc<std::vec::Vec<i32>>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::vec::Vec<i32>>`
   = note: required because it appears within the type `[closure@/src/main_6.rs:11:27: 15:14 nums:std::rc::Rc<std::vec::Vec<i32>>]`
   = note: required by `std::thread::spawn`

Arc

Решението е да заменим Rc с Arc

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
use std::sync::Arc;
use std::thread;

fn main() {
    let nums = Arc::new((0..5).collect::<Vec<_>>());

    let handles = (0..2)
        .map(|_| {
            let nums = Arc::clone(&nums);

            thread::spawn(move || {
                for i in &*nums {
                    println!("number {}", i);
                }
            })
        })
        .collect::<Vec<_>>();

    for h in handles {
        let _ = h.join();
    }
}
number 0
number 1
number 2
number 3
number 4
number 0
number 1
number 2
number 3
number 4

Arc

Arc

Arc

Arc

Примитиви за синхронизация

Примитиви за синхронизация

Стандартния пример за грешен многонишков алгоритъм:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
let v = Arc::new((0..100).collect::<Vec<_>>());
let mut sum = 0;

let t1 = {
    let v = Arc::clone(&v);
    thread::spawn(move || for i in &v[0..50] { sum += i; })
};

let t2 = {
    let v = Arc::clone(&v);
    thread::spawn(move || for i in &v[51..100] { sum += i; })
};

let _ = t1.join();
let _ = t2.join();
println!("sum: {}", sum);
sum: 0

Примитиви за синхронизация

Можем ли да го накараме да работи?

Примитиви за синхронизация

Можем ли да го накараме да работи?

Примитиви за синхронизация

Можем ли да го накараме да работи?

Примитиви за синхронизация

Можем ли да го накараме да работи?

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Можем да го накараме да работи

Примитиви за синхронизация

Можем да го накараме да работи

Mutex

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
use std::sync::Mutex;

fn main() {
    // мутекса опакова стойността, която предпазва
    let mutex = Mutex::new(10);

    {
        // заключваме мутекса
        // `lock` е умен указател с deref до `&T` и `&mut T`
        let mut lock = mutex.lock().unwrap();
        *lock += 32;

        // мутекса се отключва когато `lock` се деалокира
    }
}

Panic

Panic

Panic

Panic

Panic

Panic

Panic

Panic

Panic

Panic

Panic

Panic

Други примитиви за синхронизация

Други примитиви за синхронизация

Други примитиви за синхронизация

Други примитиви за синхронизация

Други примитиви за синхронизация

Lock-free алгоритми

Integer atomics

Integer atomics

Integer atomics

Integer atomics

Атомарни аритметични операции

Integer atomics

Атомарни аритметични операции

Integer atomics

Атомарни аритметични операции

Integer atomics

Атомарни аритметични операции

Integer atomics

Атомарни аритметични операции

Integer atomics

Операции по паметта

Integer atomics

Операции по паметта

Integer atomics

Операции по паметта

Integer atomics

Операции по паметта

Integer atomics

Операции по паметта

Integer atomics

Операции по паметта

Канали

MPSC

Канали

Go-lang motto

Don't communicate by sharing memory,
share memory by communicating

Канали в стандартната библиотека

1 2 3 4 5 6 7 8 9 10 11 12
use std::sync::mpsc;
use std::thread;

fn main() {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        sender.send(10).unwrap();
    });

    println!("received {}", receiver.recv().unwrap());
}
received 10

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

Типове канали

Неограничен канал

1 2 3 4 5 6 7 8 9 10 11
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
    sender.send(3).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Oграничен канал

Типове канали

Ограничен канал

1 2 3 4 5 6 7 8 9 10 11 12
let (sender, receiver) = mpsc::sync_channel(1);

thread::spawn(move || {
    // записва съобщението и връща веднага
    sender.send(1).unwrap();

    // ще блокира докато главната нишка не извика `receiver.recv()`
    sender.send(2).unwrap();
});

assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);

Множество изпращачи

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();

thread::spawn(move || {
    sender.send(1).unwrap();
    sender.send(2).unwrap();
});

thread::spawn(move || {
    sender2.send(3).unwrap();
    sender2.send(4).unwrap();
});

println!("{} {} {} {}",
    receiver.recv().unwrap(), receiver.recv().unwrap(),
    receiver.recv().unwrap(), receiver.recv().unwrap());
3 4 1 2

Sender

Методи

1 2 3
// изпраща `t`
// връща грешка ако получателят е бил унищожен
fn send(&self, t: T) -> Result<(), SendError<T>>

Sender

Методи

1 2 3 4 5 6 7 8 9 10 11
let (sender, receiver) = mpsc::channel();

assert_eq!(sender.send(12), Ok(()));

// унищожаваме получателя
// съобщението `12` никога няма да бъде получено
mem::drop(receiver);

// грешка - получателя е унищожен
// можем да си върнем съобщението `23` от грешката
assert_eq!(sender.send(23), Err(SendError(23)));

SyncSender

Методи

1 2 3 4 5
// блокира ако буфера е пълен
fn send(&self, t: T) -> Result<(), SendError<T>>

// връща грешка ако буфера е пълен или получателят е бил унищожен
fn try_send(&self, t: T) -> Result<(), TrySendError<T>>

SyncSender

Методи

1 2 3 4 5 6 7 8
let (sender, receiver) = mpsc::sync_channel(1);

assert_eq!(sender.try_send(12), Ok(()));
assert_eq!(sender.try_send(23), Err(TrySendError::Full(23)));

mem::drop(receiver);

assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23)));

Множество получатели

Множество получатели

Множество получатели

Множество получатели

Receiver

Методи

1 2 3 4 5 6 7 8 9 10 11
// блокира докато не получи съобщение
// връща грешка ако всички изпращачи са унищожени
fn recv(&self) -> Result<T, RecvError>

// не блокира
// връща грешка ако всички изпращачи са унищожени или няма съобщение в опашката
fn try_recv(&self) -> Result<T, TryRecvError>

// блокира за определено време
// връща грешка ако всички изпращачи са унищожени или е изтекло времето
fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>

Receiver

Методи

1 2 3 4 5 6 7 8 9 10 11
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

while let Ok(msg) = receiver.recv() {
    println!("received {}", msg);
}

Receiver

Итератори

1 2 3 4 5 6 7 8 9 10 11
let (sender, receiver) = mpsc::channel();

thread::spawn(move || {
    for i in (0..50).rev() {
        sender.send(i).unwrap();
    }
});

for msg in receiver.iter() {
    println!("received {}", msg);
}

Други структури

Други структури

Други структури

Други структури

Други структури

Други структури

Други библиотеки

Други библиотеки

Други библиотеки

Други библиотеки

Други библиотеки

Други библиотеки

Други библиотеки

Въпроси