From e157e17d7a46e5195b2fbc2ea8ed60fa837ff38b Mon Sep 17 00:00:00 2001 From: Harald Hoyer Date: Mon, 20 Jul 2020 13:46:14 +0200 Subject: [PATCH] Change lib to handle lockless collections crossbeam_queue::SegQueue does not need a guarding RwLock. Fixes: https://github.com/haraldh/dynqueue/issues/4 --- Cargo.toml | 2 +- README.md | 16 +++-- src/lib.rs | 174 ++++++++++++++++++++++++++++++++------------------- src/tests.rs | 19 +++--- 4 files changed, 132 insertions(+), 79 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6fde620..5bbc133 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,5 +15,5 @@ keywords = [ "parallel", "performance", "thread", "join", "concurrency"] categories = [ "concurrency" ] [dependencies] -rayon = "1.3.0" +rayon = "1.3" crossbeam-queue = { version = "0.2", optional = true } diff --git a/README.md b/README.md index f12d753..931896a 100644 --- a/README.md +++ b/README.md @@ -7,17 +7,21 @@ A `DynQueue` can be iterated with `into_par_iter` producing `(DynQueueHandle, With the `DynQueueHandle` a new `T` can be inserted in the `DynQueue`, which is currently iterated over. -```rust -use dynqueue::DynQueue; +A `Vec`, `VecDeque` and `crossbeam_queue::SegQueue` (with `feature = "crossbeam-queue"`) +can be turned into a `DynQueue` with `.into_dyn_queue()`. +```rust use rayon::iter::IntoParallelIterator as _; use rayon::iter::ParallelIterator as _; +use dynqueue::IntoDynQueue as _; + fn main() { - let mut result = DynQueue::new(vec![1, 2, 3]) - .into_par_iter() - .map(|(handle, value)| { if value == 2 { handle.enqueue(4) }; value }) - .collect::>(); + let mut result = vec![1, 2, 3] + .into_dyn_queue() + .into_par_iter() + .map(|(handle, value)| { if value == 2 { handle.enqueue(4) }; value }) + .collect::>(); result.sort(); assert_eq!(result, vec![1, 2, 3, 4]); diff --git a/src/lib.rs b/src/lib.rs index ae8a1bc..e822ae0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,15 +7,21 @@ //! # Example //! //! ``` -//! use dynqueue::{DynQueue, DynQueueHandle}; -//! //! use rayon::iter::IntoParallelIterator as _; //! use rayon::iter::ParallelIterator as _; //! -//! let mut result = DynQueue::new(vec![1, 2, 3]) -//! .into_par_iter() -//! .map(|(handle, value)| { if value == 2 { handle.enqueue(4) }; value }) -//! .collect::>(); +//! use dynqueue::IntoDynQueue as _; +//! +//! let mut result = vec![1, 2, 3] +//! .into_dyn_queue() +//! .into_par_iter() +//! .map(|(handle, value)| { +//! if value == 2 { +//! handle.enqueue(4) +//! }; +//! value +//! }) +//! .collect::>(); //! result.sort(); //! //! assert_eq!(result, vec![1, 2, 3, 4]); @@ -26,27 +32,45 @@ //! The `DynQueueHandle` shall not outlive the `DynQueue` iterator //! //! ```should_panic -//! use dynqueue::{DynQueue, DynQueueHandle}; +//! use dynqueue::{DynQueue, DynQueueHandle, IntoDynQueue}; //! //! use rayon::iter::IntoParallelIterator as _; //! use rayon::iter::ParallelIterator as _; +//! use std::sync::RwLock; //! -//! static mut STALE_HANDLE : Option>> = None; +//! static mut STALE_HANDLE: Option>>> = None; //! //! pub fn test_func() -> Vec { -//! DynQueue::new(vec![1u8, 2u8, 3u8]) +//! vec![1u8, 2u8, 3u8] +//! .into_dyn_queue() //! .into_par_iter() -//! .map(|(handle, value)| unsafe { STALE_HANDLE.replace(handle); value }) +//! .map(|(handle, value)| unsafe { +//! STALE_HANDLE.replace(handle); +//! value +//! }) //! .collect::>() //! } //! // test_func() panics //! let result = test_func(); -//! unsafe { STALE_HANDLE.as_ref().unwrap().enqueue(4); } +//! unsafe { +//! STALE_HANDLE.as_ref().unwrap().enqueue(4); +//! } //! ``` #![deny(clippy::all)] #![deny(missing_docs)] +#[allow(unused)] +macro_rules! doc_comment { + ($x:expr) => { + #[doc = $x] + #[doc(hidden)] + mod readme_tests {} + }; +} + +doc_comment!(include_str!("../README.md")); + use rayon::iter::plumbing::{ bridge_unindexed, Consumer, Folder, UnindexedConsumer, UnindexedProducer, }; @@ -57,80 +81,123 @@ use std::sync::{Arc, RwLock}; #[cfg(test)] mod tests; +/// Trait to produce a new DynQueue +pub trait IntoDynQueue> { + /// new + fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, U>; +} + /// Everything implementing `Queue` can be handled by DynQueue +#[allow(clippy::len_without_is_empty)] pub trait Queue where Self: Sized, { /// push an element in the queue - fn push(&mut self, v: T); + fn push(&self, v: T); /// pop an element from the queue - fn pop(&mut self) -> Option; + fn pop(&self) -> Option; /// number of elements in the queue fn len(&self) -> usize; /// split off `size` elements - fn split_off(&mut self, size: usize) -> Self; + fn split_off(&self, size: usize) -> Self; } -impl Queue for Vec { +impl IntoDynQueue>> for Vec { #[inline(always)] - fn push(&mut self, v: T) { - Vec::push(self, v) + fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock>> { + DynQueue(Arc::new(DynQueueInner(RwLock::new(self), PhantomData))) + } +} + +impl IntoDynQueue>> for RwLock> { + #[inline(always)] + fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock>> { + DynQueue(Arc::new(DynQueueInner(self, PhantomData))) + } +} + +impl Queue for RwLock> { + #[inline(always)] + fn push(&self, v: T) { + self.write().unwrap().push(v) } #[inline(always)] - fn pop(&mut self) -> Option { - Vec::pop(self) + fn pop(&self) -> Option { + self.write().unwrap().pop() } #[inline(always)] fn len(&self) -> usize { - Vec::len(self) + self.read().unwrap().len() } #[inline(always)] - fn split_off(&mut self, size: usize) -> Self { - Vec::split_off(self, size) + fn split_off(&self, size: usize) -> Self { + RwLock::new(self.write().unwrap().split_off(size)) } } -impl Queue for VecDeque { +impl IntoDynQueue>> for VecDeque { #[inline(always)] - fn push(&mut self, v: T) { - VecDeque::push_back(self, v) + fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock>> { + DynQueue(Arc::new(DynQueueInner(RwLock::new(self), PhantomData))) + } +} + +impl IntoDynQueue>> for RwLock> { + #[inline(always)] + fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock>> { + DynQueue(Arc::new(DynQueueInner(self, PhantomData))) + } +} + +impl Queue for RwLock> { + #[inline(always)] + fn push(&self, v: T) { + self.write().unwrap().push_back(v) } #[inline(always)] - fn pop(&mut self) -> Option { - VecDeque::pop_front(self) + fn pop(&self) -> Option { + self.write().unwrap().pop_front() } #[inline(always)] fn len(&self) -> usize { - VecDeque::len(self) + self.read().unwrap().len() } #[inline(always)] - fn split_off(&mut self, size: usize) -> Self { - VecDeque::split_off(self, size) + fn split_off(&self, size: usize) -> Self { + RwLock::new(self.write().unwrap().split_off(size)) } } #[cfg(feature = "crossbeam-queue")] use crossbeam_queue::SegQueue; +#[cfg(feature = "crossbeam-queue")] +impl IntoDynQueue> for SegQueue { + #[inline(always)] + fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, Self> { + DynQueue(Arc::new(DynQueueInner(self, PhantomData))) + } +} + #[cfg(feature = "crossbeam-queue")] impl Queue for SegQueue { #[inline(always)] - fn push(&mut self, v: T) { + fn push(&self, v: T) { SegQueue::push(self, v); } #[inline(always)] - fn pop(&mut self) -> Option { + fn pop(&self) -> Option { SegQueue::pop(self).ok() } @@ -140,10 +207,10 @@ impl Queue for SegQueue { } #[inline(always)] - fn split_off(&mut self, size: usize) -> Self { + fn split_off(&self, size: usize) -> Self { let q = SegQueue::new(); (0..size) - .filter_map(|_| self.pop()) + .filter_map(|_| Queue::pop(self)) .for_each(|ele| q.push(ele)); q } @@ -151,7 +218,7 @@ impl Queue for SegQueue { // PhantomData should prevent `DynQueueInner` to outlive the original `DynQueue` // but does not always. -struct DynQueueInner<'a, T, U: Queue>(std::sync::RwLock, PhantomData<&'a T>); +struct DynQueueInner<'a, T, U: Queue>(U, PhantomData<&'a T>); /// The `DynQueueHandle` returned by the iterator in addition to `T` pub struct DynQueueHandle<'a, T, U: Queue>(Arc>); @@ -160,44 +227,26 @@ impl<'a, T, U: Queue> DynQueueHandle<'a, T, U> { /// Enqueue `T` in the `DynQueue`, which is currently iterated. #[inline] pub fn enqueue(&self, job: T) { - (self.0).0.write().unwrap().push(job) + (self.0).0.push(job) } } /// The `DynQueue` which can be parallel iterated over pub struct DynQueue<'a, T, U: Queue>(Arc>); -impl<'a, T, U: Queue> DynQueue<'a, T, U> -where - T: Send + Sync, - U: Queue + Send + Sync, -{ - /// Create a new `DynQueue` from a `Vec` - #[inline] - pub fn new(lifo: U) -> Self { - Self(Arc::new(DynQueueInner(RwLock::new(lifo), PhantomData))) - } -} - impl<'a, T, U> UnindexedProducer for DynQueue<'a, T, U> where T: Send + Sync, - U: Queue + Send + Sync, + U: IntoDynQueue + Queue + Send + Sync, { type Item = (DynQueueHandle<'a, T, U>, T); fn split(self) -> (Self, Option) { - let len = { - let q = (self.0).0.read().unwrap(); - q.len() - }; + let len = (self.0).0.len(); + if len >= 2 { - let new_q = { - let mut q = (self.0).0.write().unwrap(); - let split_off = q.split_off(len / 2); - DynQueue::new(split_off) - }; - (self, Some(new_q)) + let new_q = (self.0).0.split_off(len / 2); + (self, Some(new_q.into_dyn_queue())) } else { (self, None) } @@ -209,10 +258,7 @@ where { let mut folder = folder; loop { - let ret = { - let mut q = (self.0).0.write().unwrap(); - q.pop() - }; + let ret = (self.0).0.pop(); if let Some(v) = ret { folder = folder.consume((DynQueueHandle(self.0.clone()), v)); @@ -233,7 +279,7 @@ where impl<'a, T, U> rayon::iter::ParallelIterator for DynQueue<'a, T, U> where T: Send + Sync, - U: Queue + Send + Sync, + U: IntoDynQueue + Queue + Send + Sync, { type Item = (DynQueueHandle<'a, T, U>, T); diff --git a/src/tests.rs b/src/tests.rs index f9f8a49..5262774 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,4 +1,4 @@ -use crate::{DynQueue, DynQueueHandle, Queue}; +use crate::{DynQueueHandle, IntoDynQueue, Queue}; use std::collections::VecDeque; const SLEEP_MS: u64 = 10; @@ -49,7 +49,7 @@ fn dynqueue_iter_test_const_sleep() { let med = expected.iter().sum::() / expected.iter().count() as u64; - let jq = DynQueue::new(get_input()); + let jq = get_input().into_dyn_queue(); let now = std::time::Instant::now(); let mut res = jq @@ -79,13 +79,13 @@ fn dynqueue_iter_test_const_sleep_segqueue() { let expected = get_expected(); let med = expected.iter().sum::() / expected.iter().count() as u64; - let q = SegQueue::new(); - get_input().drain(..).for_each(|ele| q.push(ele)); + let jq = SegQueue::new(); + get_input().drain(..).for_each(|ele| jq.push(ele)); - let jq = DynQueue::new(q); let now = std::time::Instant::now(); let mut res = jq + .into_dyn_queue() .into_par_iter() .map(handle_queue) .map(|v| { @@ -111,10 +111,11 @@ fn dynqueue_iter_test_const_sleep_vecdeque() { let med = expected.iter().sum::() / expected.iter().count() as u64; - let jq = DynQueue::new(VecDeque::from(get_input())); + let jq = VecDeque::from(get_input()); let now = std::time::Instant::now(); let mut res = jq + .into_dyn_queue() .into_par_iter() .map(handle_queue) .map(|v| { @@ -137,11 +138,12 @@ fn dynqueue_iter_test_sleep_v() { use rayon::iter::ParallelIterator as _; use std::time::Duration; - let jq = DynQueue::new(get_input()); + let jq = get_input(); let now = std::time::Instant::now(); let mut res = jq + .into_dyn_queue() .into_par_iter() .map(handle_queue) .map(|v| { @@ -161,11 +163,12 @@ fn dynqueue_iter_test_sleep_inv_v() { use rayon::iter::ParallelIterator as _; use std::time::Duration; - let jq = DynQueue::new(get_input()); + let jq = get_input(); let now = std::time::Instant::now(); let mut res = jq + .into_dyn_queue() .into_par_iter() .map(handle_queue) .map(|v| {