Многонишково програмиране
20 ноември 2018
Административни неща
- Домашно 2
Преговор
- Entirely too many linked lists
Преговор
- Entirely too many linked lists
- има качени слайдове
Многонишково програмиране

Fearless concurrency

Нишки
use std::thread;
fn main() {
thread::spawn(|| println!("hi from the spawned thread"));
println!("hi from the main thread");
}
Нишки
use std::thread;
fn main() {
thread::spawn(|| println!("hi from the spawned thread"));
println!("hi from the main thread");
}
1:1multithreading - нишките в rust са нишки на операционната система
Нишки
use std::thread;
fn main() {
thread::spawn(|| println!("hi from the spawned thread"));
println!("hi from the main thread");
}
1:1multithreading - нишките в rust са нишки на операционната системаthread::spawnпуска нова нишка и изпълнява подадената функция в нея
Нишки
use std::thread;
fn main() {
thread::spawn(|| println!("hi from the spawned thread"));
println!("hi from the main thread");
}
1:1multithreading - нишките в rust са нишки на операционната системаthread::spawnпуска нова нишка и изпълнява подадената функция в нея- когато функцията завърши, нишката се спира
Нишки
use std::thread;
fn main() {
let handle = thread::spawn(|| println!("hi from the spawned thread"));
println!("hi from the main thread");
let _ = handle.join();
}
hi from the main thread hi from the spawned thread
Нишки
use std::thread;
fn main() {
let handle = thread::spawn(|| println!("hi from the spawned thread"));
println!("hi from the main thread");
let _ = handle.join();
}
hi from the main thread hi from the spawned thread
- програмата приключва когато главната нишка завърши
Нишки
use std::thread;
fn main() {
let handle = thread::spawn(|| println!("hi from the spawned thread"));
println!("hi from the main thread");
let _ = handle.join();
}
hi from the main thread hi from the spawned thread
- програмата приключва когато главната нишка завърши
- можем да използваме
joinза да изчакаме пуснатите нишки
Нишки
Има и Builder с допълнителни опции
use std::thread::Builder;
fn main() {
let handle = Builder::new()
.name("sirespawn".to_string())
.spawn(|| {
println!("hi from the spawned thread");
panic!();
})
.expect("could not create thread");
handle.join().unwrap();
}
hi from the spawned thread thread 'sirespawn' panicked at 'explicit panic', /src/main_2.rs:8:13 note: Run with `RUST_BACKTRACE=1` for a backtrace. thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Any', libcore/result.rs:1009:5
Споделяне на стойности
use std::thread;
fn main() {
let nums = (0..5).collect::<Vec<_>>();
let handle = thread::spawn(|| {
for i in &nums {
println!("number {}", i);
}
});
let _ = handle.join();
}
Споделяне на стойности
use std::thread;
fn main() {
let nums = (0..5).collect::<Vec<_>>();
let handle = thread::spawn(|| {
for i in &nums {
println!("number {}", i);
}
});
let _ = handle.join();
}
error[E0373]: closure may outlive the current function, but it borrows `nums`, which is owned by the current function --> /src/main_3.rs:6:32 | 6 | let handle = thread::spawn(|| { | ^^ may outlive borrowed value `nums` 7 | for i in &nums { | ---- `nums` is borrowed here help: to force the closure to take ownership of `nums` (and any other referenced variables), use the `move` keyword | 6 | let handle = thread::spawn(move || { | ^^^^^^^
Споделяне на стойности
use std::thread;
fn main() {
let nums = (0..5).collect::<Vec<_>>();
let handle = thread::spawn(move || {
for i in &nums {
println!("number {}", i);
}
});
let _ = handle.join();
}
number 0 number 1 number 2 number 3 number 4
Споделяне между няколко нишки
Това очевидно няма да работи
use std::thread;
fn main() {
let nums = (0..5).collect::<Vec<_>>();
let handles = (0..2)
.map(|_| {
//
//
thread::spawn(move || {
for i in &nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
error[E0507]: cannot move out of captured outer variable in an `FnMut` closure --> /src/main_5.rs:10:27 | 4 | let nums = (0..5).collect::<Vec<_>>(); | ---- captured outer variable ... 10 | thread::spawn(move || { | ^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure
Споделяне между няколко нишки
Ами с Rc?
use std::rc::Rc;
use std::thread;
fn main() {
let nums = Rc::new((0..5).collect::<Vec<_>>());
let handles = (0..2)
.map(|_| {
let nums = Rc::clone(&nums);
thread::spawn(move || {
for i in &*nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
Споделяне между няколко нишки
Ами с Rc?
use std::rc::Rc;
use std::thread;
fn main() {
let nums = Rc::new((0..5).collect::<Vec<_>>());
let handles = (0..2)
.map(|_| {
let nums = Rc::clone(&nums);
thread::spawn(move || {
for i in &*nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
error[E0277]: `std::rc::Rc<std::vec::Vec<i32>>` cannot be sent between threads safely --> /src/main_6.rs:11:13 | 11 | thread::spawn(move || { | ^^^^^^^^^^^^^ `std::rc::Rc<std::vec::Vec<i32>>` cannot be sent between threads safely | = help: within `[closure@/src/main_6.rs:11:27: 15:14 nums:std::rc::Rc<std::vec::Vec<i32>>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::vec::Vec<i32>>` = note: required because it appears within the type `[closure@/src/main_6.rs:11:27: 15:14 nums:std::rc::Rc<std::vec::Vec<i32>>]` = note: required by `std::thread::spawn`
Send и Sync
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static
Send и Sync
Send и Sync
Send - позволява прехвърляне на собственост между нишки
Send и Sync
Send - позволява прехвърляне на собственост между нишки
Sync - позволява споделяне между нишки през референция (&T)
Send и Sync
Send - позволява прехвърляне на собственост между нишки
Sync - позволява споделяне между нишки през референция (&T)
- marker traits
Send и Sync
Send - позволява прехвърляне на собственост между нишки
Sync - позволява споделяне между нишки през референция (&T)
- marker traits
- имплементирани са за повечето типове
Send и Sync
Send - позволява прехвърляне на собственост между нишки
Sync - позволява споделяне между нишки през референция (&T)
- marker traits
- имплементирани са за повечето типове
- аuto traits - имплементират се автоматично ако всичките полета са съответно
SendиSync
Send и Sync
Send - позволява прехвърляне на собственост между нишки
Sync - позволява споделяне между нишки през референция (&T)
- marker traits
- имплементирани са за повечето типове
- аuto traits - имплементират се автоматично ако всичките полета са съответно
SendиSync - unsafe traits - unsafe са за ръчна имплементация
Send и Sync
pub struct Token(u32);
Send и Sync
pub struct Token(u32);

Send и Sync
Send - позволява прехвърляне на собственост между нишки
Send и Sync
Send - позволява прехвърляне на собственост между нишки
Пример за типове, които не са Send:
Send и Sync
Send - позволява прехвърляне на собственост между нишки
Пример за типове, които не са Send:
Rc
Send и Sync
Send - позволява прехвърляне на собственост между нишки
Пример за типове, които не са Send:
Rc*const Tи*mut T
Send и Sync
Send - позволява прехвърляне на собственост между нишки
Пример за типове, които не са Send:
Rc*const Tи*mut T- thread local типове, например
rand::rngs::ThreadRng
Send и Sync
Send - позволява прехвърляне на собственост между нишки
Пример за типове, които не са Send:
Rc*const Tи*mut T- thread local типове, например
rand::rngs::ThreadRng - и други
Send + Sync
Sync - позволява споделяне между нишки през референция, т.е. &T е Send
Send + Sync
Sync - позволява споделяне между нишки през референция, т.е. &T е Send
Tипове, които не са Sync, обикновено имат internal mutability без синхронизация, например:
Send + Sync
Sync - позволява споделяне между нишки през референция, т.е. &T е Send
Tипове, които не са Sync, обикновено имат internal mutability без синхронизация, например:
Cell
Send + Sync
Sync - позволява споделяне между нишки през референция, т.е. &T е Send
Tипове, които не са Sync, обикновено имат internal mutability без синхронизация, например:
CellRefCell
Send + Sync
Sync - позволява споделяне между нишки през референция, т.е. &T е Send
Tипове, които не са Sync, обикновено имат internal mutability без синхронизация, например:
CellRefCellRc
Send + Sync
Sync - позволява споделяне между нишки през референция, т.е. &T е Send
Tипове, които не са Sync, обикновено имат internal mutability без синхронизация, например:
CellRefCellRc*const Tи*mut T
Send + Sync
Sync - позволява споделяне между нишки през референция, т.е. &T е Send
Tипове, които не са Sync, обикновено имат internal mutability без синхронизация, например:
CellRefCellRc*const Tи*mut T- и други
Имплементация
struct MyBox(*mut u8);
unsafe impl Send for MyBox {}
unsafe impl Sync for MyBox {}
Деимплементация
// Само на nightly
#![feature(optin_builtin_traits)]
struct SpecialToken(u8);
impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}
Деимплементация
// Само на nightly
#![feature(optin_builtin_traits)]
struct SpecialToken(u8);
impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}
- автоматичната имплементация никога няма да е грешна от само себе си
Деимплементация
// Само на nightly
#![feature(optin_builtin_traits)]
struct SpecialToken(u8);
impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}
- автоматичната имплементация никога няма да е грешна от само себе си
- но може да пишем код, който разчита, че определен тип не може да се прехвърля / споделя
Деимплементация
Хак за stable
use std::marker::PhantomData;
struct SpecialToken(u8, PhantomData<*const ()>);
Arc
Да се върнем на кода, който не се компилираше
use std::rc::Rc;
use std::thread;
fn main() {
let nums = Rc::new((0..5).collect::<Vec<_>>());
let handles = (0..2)
.map(|_| {
let nums = Rc::clone(&nums);
thread::spawn(move || {
for i in &*nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
error[E0277]: `std::rc::Rc<std::vec::Vec<i32>>` cannot be sent between threads safely --> /src/main_6.rs:11:13 | 11 | thread::spawn(move || { | ^^^^^^^^^^^^^ `std::rc::Rc<std::vec::Vec<i32>>` cannot be sent between threads safely | = help: within `[closure@/src/main_6.rs:11:27: 15:14 nums:std::rc::Rc<std::vec::Vec<i32>>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::vec::Vec<i32>>` = note: required because it appears within the type `[closure@/src/main_6.rs:11:27: 15:14 nums:std::rc::Rc<std::vec::Vec<i32>>]` = note: required by `std::thread::spawn`
Arc
Решението е да заменим Rc с Arc
use std::sync::Arc;
use std::thread;
fn main() {
let nums = Arc::new((0..5).collect::<Vec<_>>());
let handles = (0..2)
.map(|_| {
let nums = Arc::clone(&nums);
thread::spawn(move || {
for i in &*nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
number 0 number 1 number 2 number 3 number 4 number 0 number 1 number 2 number 3 number 4
Arc
Arc
- Atomic Reference Counter
Arc
- Atomic Reference Counter
- аналогично на Rc, но използва атомарни операции за броене на референциите
Arc
- Atomic Reference Counter
- аналогично на Rc, но използва атомарни операции за броене на референциите
- може да се използва за споделяне на стойности между нишки, ако
T: Send + Sync
Примитиви за синхронизация
Примитиви за синхронизация
Стандартния пример за грешен многонишков алгоритъм:
let v = Arc::new((0..100).collect::<Vec<_>>());
let mut sum = 0;
let t1 = {
let v = Arc::clone(&v);
thread::spawn(move || for i in &v[0..50] { sum += i; })
};
let t2 = {
let v = Arc::clone(&v);
thread::spawn(move || for i in &v[51..100] { sum += i; })
};
let _ = t1.join();
let _ = t2.join();
println!("sum: {}", sum);
sum: 0
Примитиви за синхронизация
Можем ли да го накараме да работи?
Примитиви за синхронизация
Можем ли да го накараме да работи?
&mut i32-spawnне позволява заради'statictrait bound-а
Примитиви за синхронизация
Можем ли да го накараме да работи?
&mut i32-spawnне позволява заради'statictrait bound-аArc<i32>- нямаме как да модифицираме съдържанието
Примитиви за синхронизация
Можем ли да го накараме да работи?
&mut i32-spawnне позволява заради'statictrait bound-аArc<i32>- нямаме как да модифицираме съдържаниетоArc<Cell<i32>>,Arc<RefCell<i32>>-CellиRefCellне саSync
Примитиви за синхронизация
Можем да го накараме да работи
Примитиви за синхронизация
Можем да го накараме да работи
Arc<Mutex<i32>>
Примитиви за синхронизация
Можем да го накараме да работи
Arc<Mutex<i32>>- atomic integers
Примитиви за синхронизация
Можем да го накараме да работи
Arc<Mutex<i32>>- atomic integers
- да връщаме резултат от нишката
Примитиви за синхронизация
Можем да го накараме да работи
Arc<Mutex<i32>>- atomic integers
- да връщаме резултат от нишката
- …
Примитиви за синхронизация
Можем да го накараме да работи
Arc<Mutex<i32>>- atomic integers
- да връщаме резултат от нишката
- …
С две думи трябва да го напишем правилно, за да се компилира
Mutex
use std::sync::Mutex;
fn main() {
// мутекса опакова стойността, която предпазва
let mutex = Mutex::new(10);
{
// заключваме мутекса
// `lock` е умен указател с deref до `&T` и `&mut T`
let mut lock = mutex.lock().unwrap();
*lock += 32;
// мутекса се отключва когато `lock` се деалокира
}
}
Panic
Panic
panic!в главната нишка спира програмата
Panic
panic!в главната нишка спира програматаpanic!в друга нишка спира нишката
Panic
panic!в главната нишка спира програматаpanic!в друга нишка спира нишкатаJoinHandle::joinвръща резултат
Panic
panic!в главната нишка спира програматаpanic!в друга нишка спира нишкатаJoinHandle::joinвръща резултатOk(T)ако функцията е завършила успешно
Panic
panic!в главната нишка спира програматаpanic!в друга нишка спира нишкатаJoinHandle::joinвръща резултатOk(T)ако функцията е завършила успешноErr(Box<Any>)ако е имало паника
Panic
Panic
- ако нишка е заключила mutex и се панира по това време, може данните пазени от mutex-а да са невалидни
Panic
- ако нишка е заключила mutex и се панира по това време, може данните пазени от mutex-а да са невалидни
- mutex-а се зачита за отровен (mutex poisoning)
Panic
- ако нишка е заключила mutex и се панира по това време, може данните пазени от mutex-а да са невалидни
- mutex-а се зачита за отровен (mutex poisoning)
Mutex::lock()иMutex::try_lock()връщат резултат
Panic
- ако нишка е заключила mutex и се панира по това време, може данните пазени от mutex-а да са невалидни
- mutex-а се зачита за отровен (mutex poisoning)
Mutex::lock()иMutex::try_lock()връщат резултатOk(MutexGuard)
Panic
- ако нишка е заключила mutex и се панира по това време, може данните пазени от mutex-а да са невалидни
- mutex-а се зачита за отровен (mutex poisoning)
Mutex::lock()иMutex::try_lock()връщат резултатOk(MutexGuard)Err(PoisonError)
Други примитиви за синхронизация
Други примитиви за синхронизация
RwLock
Други примитиви за синхронизация
RwLockCondvar
Други примитиви за синхронизация
RwLockCondvarBarrier
Други примитиви за синхронизация
RwLockCondvarBarrier- вижте
std::sync
Lock-free алгоритми
Integer atomics
- AtomicBool
Integer atomics
- AtomicBool
- AtomicUsize, AtomicIsize, AtomicPtr
Integer atomics
- AtomicBool
- AtomicUsize, AtomicIsize, AtomicPtr
- AtomicU8, AtomicU16, … (nightly)
Integer atomics
Атомарни аритметични операции
- fetch_add
Integer atomics
Атомарни аритметични операции
- fetch_add
- fetch_xor
Integer atomics
Атомарни аритметични операции
- fetch_add
- fetch_xor
- …
Integer atomics
Атомарни аритметични операции
- fetch_add
- fetch_xor
- …
- удобни са за създаване на различни броячи и други неща
Integer atomics
Атомарни аритметични операции
- fetch_add
- fetch_xor
- …
- удобни са за създаване на различни броячи и други неща
- препоръчително да се използват пред
Mutex<i32>
Integer atomics
Операции по паметта
- load
Integer atomics
Операции по паметта
- load
- store
Integer atomics
Операции по паметта
- load
- store
- compare_and_swap
Integer atomics
Операции по паметта
- load
- store
- compare_and_swap
- …
Integer atomics
Операции по паметта
- load
- store
- compare_and_swap
- …
- използват се за имплементация на lock-free алгоритми и структури от данни
Integer atomics
Операции по паметта
- load
- store
- compare_and_swap
- …
- използват се за имплементация на lock-free алгоритми и структури от данни
- но няма да задълбаваме в тях
Канали

Канали
Go-lang motto
Don't communicate by sharing memory,
share memory by communicating
Канали в стандартната библиотека
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
sender.send(10).unwrap();
});
println!("received {}", receiver.recv().unwrap());
}
received 10
Типове канали
Неограничен канал
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
std::sync::mpsc::channel()
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
std::sync::mpsc::channel()(Sender, Receiver)
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
std::sync::mpsc::channel()(Sender, Receiver)- изпращане на съобщение никога не блокира
Типове канали
Неограничен канал
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
sender.send(1).unwrap();
sender.send(2).unwrap();
sender.send(3).unwrap();
});
assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);
Типове канали
Oграничен канал
- bounded / "synchronous"
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)(SyncSender, Receiver)
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)(SyncSender, Receiver)- има буфер за
kсъобщения
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)(SyncSender, Receiver)- има буфер за
kсъобщения - изпращане на съобщения ще блокира ако буфера е пълен
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)(SyncSender, Receiver)- има буфер за
kсъобщения - изпращане на съобщения ще блокира ако буфера е пълен
- при
k = 0става "rendezvous" канал
Типове канали
Ограничен канал
let (sender, receiver) = mpsc::sync_channel(1);
thread::spawn(move || {
// записва съобщението и връща веднага
sender.send(1).unwrap();
// ще блокира докато главната нишка не извика `receiver.recv()`
sender.send(2).unwrap();
});
assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
Множество изпращачи
let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();
thread::spawn(move || {
sender.send(1).unwrap();
sender.send(2).unwrap();
});
thread::spawn(move || {
sender2.send(3).unwrap();
sender2.send(4).unwrap();
});
println!("{} {} {} {}",
receiver.recv().unwrap(), receiver.recv().unwrap(),
receiver.recv().unwrap(), receiver.recv().unwrap());
3 4 1 2
Sender
Методи
// изпраща `t`
// връща грешка ако получателят е бил унищожен
fn send(&self, t: T) -> Result<(), SendError<T>>
Sender
Методи
let (sender, receiver) = mpsc::channel();
assert_eq!(sender.send(12), Ok(()));
// унищожаваме получателя
// съобщението `12` никога няма да бъде получено
mem::drop(receiver);
// грешка - получателя е унищожен
// можем да си върнем съобщението `23` от грешката
assert_eq!(sender.send(23), Err(SendError(23)));
SyncSender
Методи
// блокира ако буфера е пълен
fn send(&self, t: T) -> Result<(), SendError<T>>
// връща грешка ако буфера е пълен или получателят е бил унищожен
fn try_send(&self, t: T) -> Result<(), TrySendError<T>>
SyncSender
Методи
let (sender, receiver) = mpsc::sync_channel(1);
assert_eq!(sender.try_send(12), Ok(()));
assert_eq!(sender.try_send(23), Err(TrySendError::Full(23)));
mem::drop(receiver);
assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23)));
Множество получатели
Множество получатели
- не може - каналите са multi-producer, single-consumer
Множество получатели
- не може - каналите са multi-producer, single-consumer
Receiverне може да се клонира
Множество получатели
- не може - каналите са multi-producer, single-consumer
Receiverне може да се клонираReceivereSend, но не еSync
Receiver
Методи
// блокира докато не получи съобщение
// връща грешка ако всички изпращачи са унищожени
fn recv(&self) -> Result<T, RecvError>
// не блокира
// връща грешка ако всички изпращачи са унищожени или няма съобщение в опашката
fn try_recv(&self) -> Result<T, TryRecvError>
// блокира за определено време
// връща грешка ако всички изпращачи са унищожени или е изтекло времето
fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>
Receiver
Методи
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
for i in (0..50).rev() {
sender.send(i).unwrap();
}
});
while let Ok(msg) = receiver.recv() {
println!("received {}", msg);
}
Receiver
Итератори
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
for i in (0..50).rev() {
sender.send(i).unwrap();
}
});
for msg in receiver.iter() {
println!("received {}", msg);
}
Други структури
Други структури
- crossbeam
Други структури
- crossbeam
- https://crates.io/crates/crossbeam
Други структури
- crossbeam
- https://crates.io/crates/crossbeam
- MPMC channel с опция за select по няколко канала
Други структури
- crossbeam
- https://crates.io/crates/crossbeam
- MPMC channel с опция за select по няколко канала
- lock-free структури от данни - опашка, стек, deque
Други структури
- crossbeam
- https://crates.io/crates/crossbeam
- MPMC channel с опция за select по няколко канала
- lock-free структури от данни - опашка, стек, deque
- и доста utilities
Други библиотеки
Други библиотеки
- rayon
Други библиотеки
Други библиотеки
- rayon
- https://crates.io/crates/rayon
- библиотека за паралелизъм по данни
Други библиотеки
- rayon
- https://crates.io/crates/rayon
- библиотека за паралелизъм по данни
- threadpool
Други библиотеки
- rayon
- https://crates.io/crates/rayon
- библиотека за паралелизъм по данни
- threadpool
- parallel iterators
Други библиотеки
- rayon
- https://crates.io/crates/rayon
- библиотека за паралелизъм по данни
- threadpool
- parallel iterators
- split/join