initial commit

This commit is contained in:
Harald Hoyer 2020-03-10 22:27:22 +01:00
commit 85acc0c4af
7 changed files with 464 additions and 0 deletions

212
src/lib.rs Normal file
View file

@ -0,0 +1,212 @@
//! DynQueue - dynamically extendable Rayon parallel iterator
//!
//! A `DynQueue<T>` can be iterated with `into_par_iter` producing `(DynQueueHandle, T)` elements.
//! With the `DynQueueHandle<T>` a new `T` can be inserted in the `DynQueue<T>`,
//! which is currently iterated over.
//!
//! # Example
//!
//! ```
//! use dynqueue::{DynQueue, DynQueueHandle};
//!
//! use rayon::iter::IntoParallelIterator as _;
//! use rayon::iter::ParallelIterator as _;
//!
//! let mut result = DynQueue::new(vec![1, 2, 3])
//! .into_par_iter()
//! .map(|(handle, value)| { if value == 2 { handle.enqueue(4) }; value })
//! .collect::<Vec<_>>();
//! result.sort();
//!
//! assert_eq!(result, vec![1, 2, 3, 4]);
//! ```
//!
//! # Panics
//!
//! The `DynQueueHandle` shall not outlive the `DynQueue` iterator
//!
//! ```should_panic
//! use dynqueue::{DynQueue, DynQueueHandle};
//!
//! use rayon::iter::IntoParallelIterator as _;
//! use rayon::iter::ParallelIterator as _;
//!
//! static mut STALE_HANDLE : Option<DynQueueHandle<u8, Vec<u8>>> = None;
//!
//! pub fn test_func() -> Vec<u8> {
//! DynQueue::new(vec![1u8, 2u8, 3u8])
//! .into_par_iter()
//! .map(|(handle, value)| unsafe { STALE_HANDLE.replace(handle); value })
//! .collect::<Vec<_>>()
//! }
//! // test_func() panics
//! let result = test_func();
//! unsafe { STALE_HANDLE.as_ref().unwrap().enqueue(4); }
//! ```
#![deny(clippy::all)]
#![deny(missing_docs)]
use rayon::iter::plumbing::{
bridge_unindexed, Consumer, Folder, UnindexedConsumer, UnindexedProducer,
};
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::sync::{Arc, RwLock};
#[cfg(test)]
mod tests;
/// Everything implementing `Queue` can be handled by DynQueue
pub trait Queue<T>
where
Self: Sized,
{
/// push an element in the queue
fn push(&mut self, v: T);
/// pop an element from the queue
fn pop(&mut self) -> Option<T>;
/// number of elements in the queue
fn len(&self) -> usize;
/// split off `size` elements
fn split_off(&mut self, size: usize) -> Self;
}
impl<T> Queue<T> for Vec<T> {
#[inline(always)]
fn push(&mut self, v: T) {
Vec::push(self, v)
}
#[inline(always)]
fn pop(&mut self) -> Option<T> {
Vec::pop(self)
}
#[inline(always)]
fn len(&self) -> usize {
Vec::len(self)
}
#[inline(always)]
fn split_off(&mut self, size: usize) -> Self {
Vec::split_off(self, size)
}
}
impl<T> Queue<T> for VecDeque<T> {
#[inline(always)]
fn push(&mut self, v: T) {
VecDeque::push_back(self, v)
}
#[inline(always)]
fn pop(&mut self) -> Option<T> {
VecDeque::pop_front(self)
}
#[inline(always)]
fn len(&self) -> usize {
VecDeque::len(self)
}
#[inline(always)]
fn split_off(&mut self, size: usize) -> Self {
VecDeque::split_off(self, size)
}
}
// PhantomData should prevent `DynQueueInner` to outlive the original `DynQueue`
// but does not always.
struct DynQueueInner<'a, T, U: Queue<T>>(std::sync::RwLock<U>, PhantomData<&'a T>);
/// The `DynQueueHandle` returned by the iterator in addition to `T`
pub struct DynQueueHandle<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>);
impl<'a, T, U: Queue<T>> DynQueueHandle<'a, T, U> {
/// Enqueue `T` in the `DynQueue<T>`, which is currently iterated.
#[inline]
pub fn enqueue(&self, job: T) {
(self.0).0.write().unwrap().push(job)
}
}
/// The `DynQueue<T>` which can be parallel iterated over
pub struct DynQueue<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>);
impl<'a, T, U: Queue<T>> DynQueue<'a, T, U> {
/// Create a new `DynQueue<T>` from a `Vec<T>`
#[inline]
pub fn new(lifo: U) -> Self {
Self(Arc::new(DynQueueInner(RwLock::new(lifo), PhantomData)))
}
}
impl<'a, T, U> UnindexedProducer for DynQueue<'a, T, U>
where
T: Send + Sync,
U: Queue<T> + Send + Sync,
{
type Item = (DynQueueHandle<'a, T, U>, T);
fn split(self) -> (Self, Option<Self>) {
let len = {
let q = (self.0).0.read().unwrap();
q.len()
};
if len >= 2 {
let new_q = {
let mut q = (self.0).0.write().unwrap();
let split_off = q.split_off(len / 2);
DynQueue::new(split_off)
};
(self, Some(new_q))
} else {
(self, None)
}
}
fn fold_with<F>(self, folder: F) -> F
where
F: Folder<Self::Item>,
{
let mut folder = folder;
loop {
let ret = {
let mut q = (self.0).0.write().unwrap();
q.pop()
};
if let Some(v) = ret {
folder = folder.consume((DynQueueHandle(self.0.clone()), v));
if folder.full() {
break;
}
} else {
// Self shall have the only reference
assert_eq!(Arc::strong_count(&self.0), 1, "Stale Handle");
break;
}
}
folder
}
}
impl<'a, T, U> rayon::iter::ParallelIterator for DynQueue<'a, T, U>
where
T: Send + Sync,
U: Queue<T> + Send + Sync,
{
type Item = (DynQueueHandle<'a, T, U>, T);
fn drive_unindexed<C>(self, consumer: C) -> <C as Consumer<Self::Item>>::Result
where
C: UnindexedConsumer<Self::Item>,
{
bridge_unindexed(self, consumer)
}
}

169
src/tests.rs Normal file
View file

@ -0,0 +1,169 @@
use crate::{DynQueue, DynQueueHandle, Queue};
use std::collections::VecDeque;
const SLEEP_MS: u64 = 10;
#[inline]
fn handle_queue<U: Queue<u64>>(t: (DynQueueHandle<u64, U>, u64)) -> u64 {
let (h, v) = t;
if v % 2 == 0 {
h.enqueue(11);
}
if v % 3 == 0 {
h.enqueue(11);
}
if v % 4 == 0 {
h.enqueue(11);
}
if v == 11 {
h.enqueue(5);
h.enqueue(17);
}
v
}
#[inline]
fn get_input() -> Vec<u64> {
vec![
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21,
]
}
#[inline]
fn get_expected() -> Vec<u64> {
vec![
1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 6, 7,
8, 9, 10, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11,
11, 11, 11, 12, 13, 14, 15, 16, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17,
17, 17, 17, 17, 17, 17, 17, 17, 17, 18, 19, 20, 21,
]
}
#[test]
fn dynqueue_iter_test_const_sleep() {
use rayon::iter::IntoParallelIterator as _;
use rayon::iter::ParallelIterator as _;
use std::time::Duration;
let expected = get_expected();
let med = expected.iter().sum::<u64>() / expected.iter().count() as u64;
let jq = DynQueue::new(get_input());
let now = std::time::Instant::now();
let mut res = jq
.into_par_iter()
.map(handle_queue)
.map(|v| {
std::thread::sleep(Duration::from_millis(SLEEP_MS * med));
v
})
.collect::<Vec<_>>();
eprintln!("elapsed = {:#?}", now.elapsed());
res.sort();
assert_eq!(res, expected);
eprintln!(
"instead of = {}ms",
res.iter().count() * med as usize * SLEEP_MS as usize
);
}
#[test]
fn dynqueue_iter_test_const_sleep_vecdeque() {
use rayon::iter::IntoParallelIterator as _;
use rayon::iter::ParallelIterator as _;
use std::time::Duration;
let expected = get_expected();
let med = expected.iter().sum::<u64>() / expected.iter().count() as u64;
let jq = DynQueue::new(VecDeque::from(get_input()));
let now = std::time::Instant::now();
let mut res = jq
.into_par_iter()
.map(handle_queue)
.map(|v| {
std::thread::sleep(Duration::from_millis(SLEEP_MS * med));
v
})
.collect::<Vec<_>>();
eprintln!("elapsed = {:#?}", now.elapsed());
res.sort();
assert_eq!(res, expected);
eprintln!(
"instead of = {}ms",
res.iter().count() * med as usize * SLEEP_MS as usize
);
}
#[test]
fn dynqueue_iter_test_sleep_v() {
use rayon::iter::IntoParallelIterator as _;
use rayon::iter::ParallelIterator as _;
use std::time::Duration;
let jq = DynQueue::new(get_input());
let now = std::time::Instant::now();
let mut res = jq
.into_par_iter()
.map(handle_queue)
.map(|v| {
std::thread::sleep(Duration::from_millis(SLEEP_MS * v));
v
})
.collect::<Vec<_>>();
eprintln!("elapsed = {:#?}", now.elapsed());
res.sort();
assert_eq!(res, get_expected());
eprintln!("instead of = {}ms", res.iter().sum::<u64>() * SLEEP_MS);
}
#[test]
fn dynqueue_iter_test_sleep_inv_v() {
use rayon::iter::IntoParallelIterator as _;
use rayon::iter::ParallelIterator as _;
use std::time::Duration;
let jq = DynQueue::new(get_input());
let now = std::time::Instant::now();
let mut res = jq
.into_par_iter()
.map(handle_queue)
.map(|v| {
std::thread::sleep(Duration::from_millis(SLEEP_MS * (22 - v)));
v
})
.collect::<Vec<_>>();
eprintln!("elapsed = {:#?}", now.elapsed());
res.sort();
assert_eq!(res, get_expected());
eprintln!(
"instead of = {}ms",
(res.iter().count() as u64 * 22 - res.iter().sum::<u64>()) * SLEEP_MS
);
}
#[test]
fn par_iter_test() {
use rayon::iter::IntoParallelIterator as _;
use rayon::iter::ParallelIterator as _;
use std::time::Duration;
let now = std::time::Instant::now();
let res = get_expected()
.into_par_iter()
.map(|v| {
std::thread::sleep(Duration::from_millis(SLEEP_MS * v as u64));
v
})
.collect::<Vec<_>>();
eprintln!("elapsed = {:#?}", now.elapsed());
eprintln!("instead of = {}ms", res.iter().sum::<u64>() * SLEEP_MS);
}