Threadpool, rayon

29 November 2018

Recap


Threadpool

What is a threadpool


Threadpool

There are ready-made threadpool implementations.

But as an exercise, we will write a minimal implementation ourselves.

Threadpool

We are aiming for the following interface:

pub struct ThreadPool {
    /* ... */
}

impl ThreadPool {
    pub fn new() -> Self {
    }

    pub fn spawn<F: FnOnce()>(&self, task: F) {
    }

    pub fn join(self) {
    }
}

Threadpool

Spawning threads

On creation we start a fixed number of threads. Each thread loops until it receives a task.

use std::thread::{self, JoinHandle};

pub struct ThreadPool {
    threads: Vec<JoinHandle<()>>
}

impl ThreadPool {
    pub fn new() -> Self {
        const NUM_CPUS: usize = 4;

        let threads = (0..NUM_CPUS)
            .map(|_| thread::spawn(move || {
                loop {
                    /* TODO: get tasks */
                }
            }))
            .collect::<Vec<_>>();

        ThreadPool {
            threads,
        }
    }
}

Threadpool

Hard-coding const NUM_CPUS: usize = 4 is not a good solution. It would be better to start as many threads as there are cores on the machine we are running on.

There is a crate specifically for this - num_cpus

extern crate num_cpus;

use std::thread::{self, JoinHandle};
pub struct ThreadPool {
    threads: Vec<JoinHandle<()>>
}

impl ThreadPool {
    pub fn new() -> Self {
        let threads = (0..num_cpus::get())
            .map(|_| thread::spawn(move || {
                loop {
                    /* TODO: run tasks */
                }
            }))
            .collect::<Vec<_>>();

        ThreadPool {
            threads,
        }
    }
}

Threadpool

Tasks

We need a way to deliver the tasks to the threads.

std::sync::mpsc::channel won't do the job here, because it is MPSC (multi-producer single-consumer), while we have many consumer threads.
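
As an aside, std's channel can be pressed into multi-consumer service by sharing the single Receiver behind a Mutex (this is the approach the Rust book's own threadpool chapter takes). A small std-only sketch:

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();
    // Wrap the single Receiver in Arc<Mutex<...>> so several
    // consumer threads can take turns calling `recv`.
    let rx = Arc::new(Mutex::new(rx));

    let consumers: Vec<_> = (0..4)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut sum = 0;
                // Lock the mutex, receive one message, release the lock.
                while let Ok(n) = { rx.lock().unwrap().recv() } {
                    sum += n;
                }
                sum
            })
        })
        .collect();

    for i in 1..=100 {
        tx.send(i).unwrap();
    }
    drop(tx); // closing the channel makes `recv` return Err, ending the loops

    let total: u32 = consumers.into_iter().map(|h| h.join().unwrap()).sum();
    assert_eq!(total, 5050);
}
```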

We can use one of the structures from crossbeam:

extern crate crossbeam;
use crossbeam::queue::MsQueue as Queue;

enum Task {
    Run(Box<dyn FnOnce() + Send>),
    Exit,
}

pub struct ThreadPool {
    threads: Vec<JoinHandle<()>>,
    queue: Arc<Queue<Task>>,
}

impl ThreadPool {
    pub fn new() -> Self {
        // put the queue in an Arc so we can share it between the threads
        let queue = Arc::new(Queue::new());

        let threads = (0..num_cpus::get())
            .map(|_| {
                let queue = queue.clone();

                thread::spawn(move || {
                    loop {
                        // the queue uses atomic operations,
                        // so the `push` and `pop` methods take `&self`
                        match queue.pop() {
                            Task::Run(f) => f(),
                            Task::Exit => break,
                        }
                    }
                })
            })
            .collect::<Vec<_>>();

        ThreadPool {
            threads,
            queue,
        }
    }
}

Threadpool

FnBox

If we compile this code, we will get an error.

The reason is that Box<dyn FnOnce()> is not directly usable, due to the restrictions on trait objects. More precisely, through a trait object we cannot call a function that takes self by value.

enum Task {
    Run(Box<dyn FnOnce() + Send>),
    Exit,
}

pub struct ThreadPool {
    threads: Vec<JoinHandle<()>>,
    queue: Arc<Queue<Task>>,
}

impl ThreadPool {
    pub fn new() -> Self {
        // put the queue in an Arc so we can share it between the threads
        let queue = Arc::new(Queue::new());

        let threads = (0..num_cpus::get())
            .map(|_| {
                let queue = queue.clone();

                thread::spawn(move || {
                    loop {
                        // the queue uses atomic operations,
                        // so the `push` and `pop` methods take `&self`
                        match queue.pop() {
                            Task::Run(f) => f(),
                            Task::Exit => break,
                        }
                    }
                })
            })
            .collect::<Vec<_>>();

        ThreadPool {
            threads,
            queue,
        }
    }
}
error[E0161]: cannot move a value of type (dyn std::ops::FnOnce() + std::marker::Send + 'static):
the size of (dyn std::ops::FnOnce() + std::marker::Send + 'static) cannot be statically determined
  --> src/bin/main_4f0d51395a47ba50f1408c5831ee60fa5f3bac46.rs:32:45
   |
32 |                             Task::Run(f) => f(),
   |                                             ^

Threadpool

FnBox

There is a workaround for this problem: the FnBox trait, which is only available on nightly.
If we don't want to use nightly, we can write an equivalent trait ourselves.
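
A minimal stable-Rust version of such a trait could look like this (a sketch; FnBox and call_box here are our own names, mirroring the nightly trait; on current Rust, Box<dyn FnOnce()> is callable directly, so this workaround is only needed on older compilers):

```rust
// A FnOnce closure can't be called through `Box<dyn FnOnce()>` on compilers
// of that era, but it *can* be called through a method taking `self: Box<Self>`:
trait FnBox {
    fn call_box(self: Box<Self>);
}

// Blanket implementation: every FnOnce() closure gets `call_box`.
impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<Self>) {
        (*self)()
    }
}

fn main() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    static CALLS: AtomicUsize = AtomicUsize::new(0);

    // Calling through the trait object works, because `call_box`
    // consumes the Box rather than the unsized closure itself.
    let task: Box<dyn FnBox + Send> = Box::new(|| {
        CALLS.fetch_add(1, Ordering::SeqCst);
    });
    task.call_box();

    assert_eq!(CALLS.load(Ordering::SeqCst), 1);
}
```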

#![feature(fnbox)]
use std::boxed::FnBox;

enum Task {
    Run(Box<dyn FnBox() + Send>),
    Exit,
}

pub struct ThreadPool {
    threads: Vec<JoinHandle<()>>,
    queue: Arc<Queue<Task>>,
}

impl ThreadPool {
    pub fn new() -> Self {
        // put the queue in an Arc so we can share it between the threads
        let queue = Arc::new(Queue::new());

        let threads = (0..num_cpus::get())
            .map(|_| {
                let queue = queue.clone();

                thread::spawn(move || {
                    loop {
                        // the queue uses atomic operations,
                        // so the `push` and `pop` methods take `&self`
                        match queue.pop() {
                            Task::Run(f) => f(),
                            Task::Exit => break,
                        }
                    }
                })
            })
            .collect::<Vec<_>>();

        ThreadPool {
            threads,
            queue,
        }
    }
}

Threadpool

To spawn a task, we simply push it onto the queue.

Since ThreadPool implements Sync and spawn takes &self, we can share the thread pool (for example by putting it in an Arc) and spawn tasks from several threads.

impl ThreadPool {
    pub fn spawn<F>(&self, task: F)
    where
        F: FnOnce() + Send + 'static
    {
        self.queue.push(Task::Run(Box::new(task)))
    }
}
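
To illustrate why &self plus Sync is enough for sharing, here is a toy std-only sketch (the Submitter type and its submit method are made up for the example, standing in for ThreadPool and spawn):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// A toy "pool-like" object: `submit` takes `&self`, so an Arc<Submitter>
// can be handed to several threads, just like an Arc<ThreadPool> could.
struct Submitter {
    tasks: Mutex<Vec<String>>,
}

impl Submitter {
    fn submit(&self, name: &str) {
        self.tasks.lock().unwrap().push(name.to_string());
    }
}

fn main() {
    let s = Arc::new(Submitter { tasks: Mutex::new(Vec::new()) });

    let handles: Vec<_> = (0..4)
        .map(|i| {
            let s = Arc::clone(&s);
            thread::spawn(move || s.submit(&format!("task-{}", i)))
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    // All four threads managed to submit through shared `&self`.
    assert_eq!(s.tasks.lock().unwrap().len(), 4);
}
```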

Threadpool

We also add a join function, which blocks until all tasks have been executed.

impl ThreadPool {
    pub fn join(self) {
        for _ in &self.threads {
            self.queue.push(Task::Exit);
        }

        for t in self.threads {
            let _ = t.join();
        }
    }
}

Threadpool

The full code:

#![feature(fnbox)]

extern crate crossbeam;
extern crate num_cpus;

use crossbeam::queue::MsQueue as Queue;
use std::boxed::FnBox;
use std::sync::Arc;
use std::thread::{self, JoinHandle};

enum Task {
    Run(Box<dyn FnBox() + Send>),
    Exit,
}

struct ThreadPool {
    threads: Vec<JoinHandle<()>>,
    queue: Arc<Queue<Task>>,
}

impl ThreadPool {
    pub fn new() -> Self {
        let queue = Arc::new(Queue::new());

        let threads = (0..num_cpus::get())
            .map(|_| {
                let queue = queue.clone();

                thread::spawn(move || loop {
                    match queue.pop() {
                        Task::Run(f) => f(),
                        Task::Exit => break,
                    }
                })
            })
            .collect::<Vec<_>>();

        ThreadPool {
            threads,
            queue,
        }
    }

    pub fn spawn<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.queue.push(Task::Run(Box::new(f)));
    }

    pub fn join(self) {
        for _ in &self.threads {
            self.queue.push(Task::Exit);
        }

        for t in self.threads {
            let _ = t.join();
        }
    }
}

Threadpool

And that's it. Of course, the ready-made libraries have quite a lot of additional functionality that we haven't covered.

Rayon

par_graph

As an example of using rayon, we will write a program that generates a graph.

The vertices of the graph are labeled with the numbers 0..n. The graph is stored as adjacency lists.
For example, the following structure describes the graph shown below:

let lists: Vec<Vec<usize>> = vec![
    vec![1, 2], // neighbors of vertex 0, i.e. the graph contains edges (0, 1) and (0, 2)
    vec![2],    // neighbors of vertex 1, i.e. the graph contains edge (1, 2)
    vec![],     // neighbors of vertex 2
];
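
For instance, the edge list encoded by those adjacency lists can be recovered with a simple nested iteration (a small sketch):

```rust
fn main() {
    let lists: Vec<Vec<usize>> = vec![
        vec![1, 2], // neighbors of vertex 0
        vec![2],    // neighbors of vertex 1
        vec![],     // neighbors of vertex 2
    ];

    // Flatten the adjacency lists back into an explicit edge list.
    let edges: Vec<(usize, usize)> = lists
        .iter()
        .enumerate()
        .flat_map(|(from, neighbors)| neighbors.iter().map(move |&to| (from, to)))
        .collect();

    assert_eq!(edges, vec![(0, 1), (0, 2), (1, 2)]);
}
```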

Graph

Basic definitions

#[derive(Debug, Clone)]
pub struct AdjLists {
    n_verts: usize,
    lists: Vec<Vec<usize>>,
}

impl AdjLists {
    pub fn new(n_verts: usize) -> Self {
        AdjLists {
            n_verts,
            lists: vec![vec![]; n_verts],
        }
    }
}

Basic definitions

extern crate rand;
use rand::distributions::Uniform;
use rand::prelude::*;
use std::ops::Range;

/// Helper structure encapsulating the work
/// needed to generate part of the graph
pub struct Job<'a> {
    pub from_verts: Range<usize>,
    pub to_verts: Range<usize>,
    pub n_edges: usize,
    pub lists: &'a mut [Vec<usize>],
}

impl<'a> Job<'a> {
    pub fn gen(&mut self) {
        let mut added = 0;
        let mut rng = rand::thread_rng();

        let from_range = Uniform::new(self.from_verts.start, self.from_verts.end);
        let to_range = Uniform::new(self.to_verts.start, self.to_verts.end);

        while added < self.n_edges {
            let from = from_range.sample(&mut rng);
            let to = to_range.sample(&mut rng);

            if from != to && self.list_at(from).iter().find(|&&e| e == to).is_none() {
                self.list_at(from).push(to);
                added += 1;
            }
        }
    }

    fn list_at(&mut self, v: usize) -> &mut Vec<usize> {
        &mut self.lists[v - self.from_verts.start]
    }
}

Single-threaded version

So that we have something to compare against.

impl AdjLists {
    pub fn gen_seq(n_verts: usize, n_edges: usize) -> Self {
        let mut graph = AdjLists::new(n_verts);

        {
            let mut job = Job {
                from_verts: 0..n_verts,
                to_verts: 0..n_verts,
                n_edges: n_edges,
                lists: &mut graph.lists,
            };

            job.gen();
        }

        graph
    }
}

Multithreaded version with manual parallelization

We manually split the graph into N equal parts, one per thread, and submit them as tasks to rayon's thread pool.

extern crate rayon;

use std::mem;
use std::ops::Range;
impl AdjLists {
    pub fn gen_manual(n_verts: usize, n_edges: usize, n_threads: usize) -> Self {
        let mut graph = AdjLists::new(n_verts);

        {
            let mut state = graph.lists.as_mut_slice();

            // `rayon::scope` runs on the global thread pool;
            // it blocks until all tasks spawned inside it have finished
            rayon::scope(|scope| {
                (0..n_threads).map(move |i| {
                    let from_verts = subrange(0..n_verts, i, n_threads);

                    // `mem::replace` to "trick" the borrow checker
                    let tmp = mem::replace(&mut state, &mut []);
                    let (lists, next) = tmp.split_at_mut(from_verts.len());
                    state = next;

                    let n_edges = subrange(0..n_edges, i, n_threads).len();

                    Job {
                        from_verts,
                        to_verts: 0..n_verts,
                        n_edges,
                        lists,
                    }
                })
                .for_each(|mut job| {
                    // spawn the job as a task on rayon's global thread pool
                    scope.spawn(move |_| job.gen())
                });
            });
        }

        graph
    }
}

/// Splits `range` into `n_threads` equal parts and returns the `t`-th part
fn subrange(range: Range<usize>, t: usize, n_threads: usize) -> Range<usize> {
    let from = t as f32 / n_threads as f32;
    let to = (t + 1) as f32 / n_threads as f32;

    Range {
        start: (range.start as f32 + from * range.len() as f32).floor() as usize,
        end: (range.start as f32 + to * range.len() as f32).floor() as usize,
    }
}
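
To see what subrange computes, here is the same helper exercised sequentially (a quick sanity check, assuming the f32 arithmetic above):

```rust
use std::ops::Range;

// Same helper as above: split `range` into `n_threads` equal parts
// and return the `t`-th part.
fn subrange(range: Range<usize>, t: usize, n_threads: usize) -> Range<usize> {
    let from = t as f32 / n_threads as f32;
    let to = (t + 1) as f32 / n_threads as f32;

    Range {
        start: (range.start as f32 + from * range.len() as f32).floor() as usize,
        end: (range.start as f32 + to * range.len() as f32).floor() as usize,
    }
}

fn main() {
    // The parts are adjacent and together cover the whole range.
    let parts: Vec<_> = (0..3).map(|t| subrange(0..10, t, 3)).collect();
    assert_eq!(parts, vec![0..3, 3..6, 6..10]);

    // Adjacent parts always share their boundary, so nothing is lost.
    for w in parts.windows(2) {
        assert_eq!(w[0].end, w[1].start);
    }
}
```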

Multithreaded version with automatic parallelization

We split the graph into many chunks of CHUNK_SIZE vertices each and let rayon handle the parallelization.

extern crate rayon;
use rayon::prelude::*;
use std::ops::Range;

impl AdjLists {
    pub fn gen_par_iter(n_verts: usize, n_edges: usize) -> Self {
        let mut graph = AdjLists::new(n_verts);

        const CHUNK_SIZE: usize = 128;

        graph.lists
            .par_chunks_mut(CHUNK_SIZE)
            .enumerate()
            .map(move |(i, lists)| {
                let start = i * CHUNK_SIZE;
                let end = start + lists.len();

                let from_verts = start..end;
                let to_verts = 0..n_verts;
                let n_edges = edge_count(from_verts.clone(), n_verts, n_edges);

                Job {
                    from_verts,
                    to_verts,
                    n_edges,
                    lists,
                }
            })
            .for_each(|mut job| job.gen());

        graph
    }
}

/// Number of edges to generate for the given chunk of vertices
fn edge_count(from_verts: Range<usize>, n_verts: usize, n_edges: usize) -> usize {
    let from = from_verts.start as f64 / n_verts as f64;
    let to = from_verts.end as f64 / n_verts as f64;

    (to * n_edges as f64).floor() as usize - (from * n_edges as f64).floor() as usize
}

Benchmarking

├── Cargo.lock
├── Cargo.toml
├── benches
│   └── graph.rs
└── src
    └── lib.rs

Benchmarking

#![feature(test)]

// We need to add our own project as an external crate.
extern crate par_graph;

// `#![feature(test)]` gives us access to the `test` crate.
extern crate test;

use test::Bencher;

#[bench]
fn benchmark(bencher: &mut Bencher) {
    // Initial state for the benchmark can be set up here, before calling
    // `bencher.iter()`; it does not count toward the measured time.

    bencher.iter(|| {
        // `bencher.iter()` takes a closure, which the library
        // executes many times, timing each individual run.

        // At the end, statistics about the measured times are printed.
    });
}

Benchmarking

An example benchmark for the parallel graph

// Benchmark generation with parallel iterators,
// using 4 threads, 4_000 vertices and 400_000 edges
#[bench]
fn par_iter_t4_n4k_m400k(bencher: &mut Bencher) {
    // Manually initialize a rayon thread pool with 4 threads
    let thread_pool = ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .unwrap();

    // All rayon operations invoked from the closure passed to `install`
    // will use the manually initialized thread pool instead of the global one.
    thread_pool.install(|| {
        bencher.iter(|| AdjLists::gen_par_iter(4_000, 400_000))
    });
}

Benchmarking

Run with: cargo +nightly bench

test manual_t1_n4k_m400k ... bench:  49,611,783 ns/iter (+/- 20,393,860)
test manual_t2_n4k_m400k ... bench:  32,891,567 ns/iter (+/- 7,411,710)
test manual_t4_n4k_m400k ... bench:  16,658,675 ns/iter (+/- 1,670,698)
test manual_t8_n4k_m400k ... bench:  14,196,021 ns/iter (+/- 5,979,191)
test par_iter_t1_n4k_m400k  ... bench:  30,916,100 ns/iter (+/- 2,442,593)
test par_iter_t2_n4k_m400k  ... bench:  16,075,870 ns/iter (+/- 4,243,729)
test par_iter_t4_n4k_m400k  ... bench:  13,176,011 ns/iter (+/- 2,134,242)
test par_iter_t8_n4k_m400k  ... bench:  14,355,875 ns/iter (+/- 3,413,343)
test seq_n4k_m400k       ... bench:  49,739,925 ns/iter (+/- 22,063,050)
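
As an aside, on stable Rust a rough equivalent of what bencher.iter measures can be sketched with std::time::Instant (a toy sketch; std::hint::black_box is stable only since Rust 1.66, well after these slides):

```rust
use std::time::Instant;

fn work() -> u64 {
    (0..1_000u64).sum()
}

fn main() {
    let iters = 100u32;
    let start = Instant::now();
    let mut sink = 0;

    for _ in 0..iters {
        // black_box keeps the optimizer from deleting the benchmarked call
        sink = std::hint::black_box(work());
    }

    let elapsed = start.elapsed();
    println!("{} ns/iter", elapsed.as_nanos() / iters as u128);

    assert_eq!(sink, 499_500);
}
```

Unlike test::Bencher, this does no warm-up and reports no variance, so it is only a rough indicator.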

Resources

An old chapter of the Rust book related to benchmarking:

The code for parallel graph generation is taken from the following project:

Questions