diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..5cde165 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,7 @@ +version: 2 +updates: +- package-ecosystem: cargo + directory: "/" + schedule: + interval: daily + open-pull-requests-limit: 10 diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml new file mode 100644 index 0000000..d8af1b2 --- /dev/null +++ b/.github/workflows/coverage.yml @@ -0,0 +1,29 @@ +name: coverage + +on: + push: + branches: + - master + pull_request: + branches: + - master + +jobs: + test: + name: coverage + runs-on: ubuntu-latest + container: + image: xd009642/tarpaulin + options: --security-opt seccomp=unconfined + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Generate code coverage + run: | + cargo tarpaulin --verbose --all-features --workspace --timeout 120 --out Lcov --output-dir coverage + + - name: Upload to coveralls + uses: coverallsapp/github-action@master + with: + github-token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 6738b0b..6724628 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -13,7 +13,9 @@ jobs: steps: - uses: actions/checkout@v2 + - name: Run fmt + run: cargo fmt --all -- --check - name: Build - run: cargo build --verbose + run: cargo build --verbose --all-features - name: Run tests - run: cargo test --verbose + run: cargo test --verbose --all-features diff --git a/Cargo.toml b/Cargo.toml index c2da882..c3396da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dynqueue" -version = "0.1.2" +version = "0.3.1-alpha.0" authors = ["Harald Hoyer "] edition = "2018" @@ -15,4 +15,5 @@ keywords = [ "parallel", "performance", "thread", "join", "concurrency"] categories = [ "concurrency" ] [dependencies] -rayon = "1.3.0" +rayon = "1.3" +crossbeam-queue = { version = "0.3", optional = true } diff --git a/README.md b/README.md index f044844..3b2e2f7 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,42 @@ +[![Rust](https://github.com/haraldh/dynqueue/workflows/Rust/badge.svg)](https://github.com/haraldh/dynqueue/actions) +[![Coverage Status](https://coveralls.io/repos/github/haraldh/dynqueue/badge.svg?branch=master)](https://coveralls.io/github/haraldh/dynqueue?branch=master) + # DynQueue - dynamically extendable Rayon parallel iterator A `DynQueue` can be iterated with `into_par_iter` producing `(DynQueueHandle, T)` elements. With the `DynQueueHandle` a new `T` can be inserted in the `DynQueue`, which is currently iterated over. -```rust -use dynqueue::DynQueue; +A `Vec`, `VecDeque` and `crossbeam_queue::SegQueue` (with `feature = "crossbeam-queue"`) +can be turned into a `DynQueue` with `.into_dyn_queue()`. +```rust use rayon::iter::IntoParallelIterator as _; use rayon::iter::ParallelIterator as _; +use dynqueue::IntoDynQueue as _; + fn main() { - let mut result = DynQueue::new(vec![1, 2, 3]) - .into_par_iter() - .map(|(handle, value)| { if value == 2 { handle.enqueue(4) }; value }) - .collect::>(); + let mut result = vec![1, 2, 3] + .into_dyn_queue() + .into_par_iter() + .map(|(handle, value)| { if value == 2 { handle.enqueue(4) }; value }) + .collect::>(); result.sort(); assert_eq!(result, vec![1, 2, 3, 4]); } ``` + +## Features + +* `crossbeam-queue` : to use `crossbeam::queue::SegQueue` as the inner collection. + +## Changelog + +### 0.2.0 +- introduce `IntoDynQueue` +- handle lockless collections + +### 0.1.0 +- initial version diff --git a/src/lib.rs b/src/lib.rs index c59b075..733cbf9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,15 +7,21 @@ //! # Example //! //! ``` -//! use dynqueue::{DynQueue, DynQueueHandle}; -//! //! use rayon::iter::IntoParallelIterator as _; //! use rayon::iter::ParallelIterator as _; //! -//! let mut result = DynQueue::new(vec![1, 2, 3]) -//! .into_par_iter() -//! .map(|(handle, value)| { if value == 2 { handle.enqueue(4) }; value }) -//! .collect::>(); +//! use dynqueue::IntoDynQueue as _; +//! +//! let mut result = vec![1, 2, 3] +//! .into_dyn_queue() +//! .into_par_iter() +//! .map(|(handle, value)| { +//! if value == 2 { +//! handle.enqueue(4) +//! }; +//! value +//! }) +//! .collect::>(); //! result.sort(); //! //! assert_eq!(result, vec![1, 2, 3, 4]); @@ -26,27 +32,45 @@ //! The `DynQueueHandle` shall not outlive the `DynQueue` iterator //! //! ```should_panic -//! use dynqueue::{DynQueue, DynQueueHandle}; +//! use dynqueue::{DynQueue, DynQueueHandle, IntoDynQueue}; //! //! use rayon::iter::IntoParallelIterator as _; //! use rayon::iter::ParallelIterator as _; +//! use std::sync::RwLock; //! -//! static mut STALE_HANDLE : Option>> = None; +//! static mut STALE_HANDLE: Option>>> = None; //! //! pub fn test_func() -> Vec { -//! DynQueue::new(vec![1u8, 2u8, 3u8]) +//! vec![1u8, 2u8, 3u8] +//! .into_dyn_queue() //! .into_par_iter() -//! .map(|(handle, value)| unsafe { STALE_HANDLE.replace(handle); value }) +//! .map(|(handle, value)| unsafe { +//! STALE_HANDLE.replace(handle); +//! value +//! }) //! .collect::>() //! } //! // test_func() panics //! let result = test_func(); -//! unsafe { STALE_HANDLE.as_ref().unwrap().enqueue(4); } +//! unsafe { +//! STALE_HANDLE.as_ref().unwrap().enqueue(4); +//! } //! ``` #![deny(clippy::all)] #![deny(missing_docs)] +#[allow(unused)] +macro_rules! doc_comment { + ($x:expr) => { + #[doc = $x] + #[doc(hidden)] + mod readme_tests {} + }; +} + +doc_comment!(include_str!("../README.md")); + use rayon::iter::plumbing::{ bridge_unindexed, Consumer, Folder, UnindexedConsumer, UnindexedProducer, }; @@ -57,71 +81,144 @@ use std::sync::{Arc, RwLock}; #[cfg(test)] mod tests; +/// Trait to produce a new DynQueue +pub trait IntoDynQueue> { + /// new + fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, U>; +} + /// Everything implementing `Queue` can be handled by DynQueue +#[allow(clippy::len_without_is_empty)] pub trait Queue where Self: Sized, { /// push an element in the queue - fn push(&mut self, v: T); + fn push(&self, v: T); /// pop an element from the queue - fn pop(&mut self) -> Option; + fn pop(&self) -> Option; /// number of elements in the queue fn len(&self) -> usize; /// split off `size` elements - fn split_off(&mut self, size: usize) -> Self; + fn split_off(&self, size: usize) -> Self; } -impl Queue for Vec { +impl IntoDynQueue>> for Vec { #[inline(always)] - fn push(&mut self, v: T) { - Vec::push(self, v) + fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock>> { + DynQueue(Arc::new(DynQueueInner(RwLock::new(self), PhantomData))) + } +} + +impl IntoDynQueue>> for RwLock> { + #[inline(always)] + fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock>> { + DynQueue(Arc::new(DynQueueInner(self, PhantomData))) + } +} + +impl Queue for RwLock> { + #[inline(always)] + fn push(&self, v: T) { + self.write().unwrap().push(v) } #[inline(always)] - fn pop(&mut self) -> Option { - Vec::pop(self) + fn pop(&self) -> Option { + self.write().unwrap().pop() } #[inline(always)] fn len(&self) -> usize { - Vec::len(self) + self.read().unwrap().len() } #[inline(always)] - fn split_off(&mut self, size: usize) -> Self { - Vec::split_off(self, size) + fn split_off(&self, size: usize) -> Self { + RwLock::new(self.write().unwrap().split_off(size)) } } -impl Queue for VecDeque { +impl IntoDynQueue>> for VecDeque { #[inline(always)] - fn push(&mut self, v: T) { - VecDeque::push_back(self, v) + fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock>> { + DynQueue(Arc::new(DynQueueInner(RwLock::new(self), PhantomData))) + } +} + +impl IntoDynQueue>> for RwLock> { + #[inline(always)] + fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock>> { + DynQueue(Arc::new(DynQueueInner(self, PhantomData))) + } +} + +impl Queue for RwLock> { + #[inline(always)] + fn push(&self, v: T) { + self.write().unwrap().push_back(v) } #[inline(always)] - fn pop(&mut self) -> Option { - VecDeque::pop_front(self) + fn pop(&self) -> Option { + self.write().unwrap().pop_front() } #[inline(always)] fn len(&self) -> usize { - VecDeque::len(self) + self.read().unwrap().len() } #[inline(always)] - fn split_off(&mut self, size: usize) -> Self { - VecDeque::split_off(self, size) + fn split_off(&self, size: usize) -> Self { + RwLock::new(self.write().unwrap().split_off(size)) + } +} + +#[cfg(feature = "crossbeam-queue")] +use crossbeam_queue::SegQueue; + +#[cfg(feature = "crossbeam-queue")] +impl IntoDynQueue> for SegQueue { + #[inline(always)] + fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, Self> { + DynQueue(Arc::new(DynQueueInner(self, PhantomData))) + } +} + +#[cfg(feature = "crossbeam-queue")] +impl Queue for SegQueue { + #[inline(always)] + fn push(&self, v: T) { + SegQueue::push(self, v); + } + + #[inline(always)] + fn pop(&self) -> Option { + SegQueue::pop(self) + } + + #[inline(always)] + fn len(&self) -> usize { + SegQueue::len(self) + } + + #[inline(always)] + fn split_off(&self, size: usize) -> Self { + let q = SegQueue::new(); + (0..size) + .filter_map(|_| Queue::pop(self)) + .for_each(|ele| q.push(ele)); + q } } // PhantomData should prevent `DynQueueInner` to outlive the original `DynQueue` // but does not always. -struct DynQueueInner<'a, T, U: Queue>(std::sync::RwLock, PhantomData<&'a T>); +struct DynQueueInner<'a, T, U: Queue>(U, PhantomData<&'a T>); /// The `DynQueueHandle` returned by the iterator in addition to `T` pub struct DynQueueHandle<'a, T, U: Queue>(Arc>); @@ -130,40 +227,26 @@ impl<'a, T, U: Queue> DynQueueHandle<'a, T, U> { /// Enqueue `T` in the `DynQueue`, which is currently iterated. #[inline] pub fn enqueue(&self, job: T) { - (self.0).0.write().unwrap().push(job) + (self.0).0.push(job) } } /// The `DynQueue` which can be parallel iterated over pub struct DynQueue<'a, T, U: Queue>(Arc>); -impl<'a, T, U: Queue> DynQueue<'a, T, U> { - /// Create a new `DynQueue` from a `Vec` - #[inline] - pub fn new(lifo: U) -> Self { - Self(Arc::new(DynQueueInner(RwLock::new(lifo), PhantomData))) - } -} - impl<'a, T, U> UnindexedProducer for DynQueue<'a, T, U> where T: Send + Sync, - U: Queue + Send + Sync, + U: IntoDynQueue + Queue + Send + Sync, { type Item = (DynQueueHandle<'a, T, U>, T); fn split(self) -> (Self, Option) { - let len = { - let q = (self.0).0.read().unwrap(); - q.len() - }; + let len = (self.0).0.len(); + if len >= 2 { - let new_q = { - let mut q = (self.0).0.write().unwrap(); - let split_off = q.split_off(len / 2); - DynQueue::new(split_off) - }; - (self, Some(new_q)) + let new_q = (self.0).0.split_off(len / 2); + (self, Some(new_q.into_dyn_queue())) } else { (self, None) } @@ -175,10 +258,7 @@ where { let mut folder = folder; loop { - let ret = { - let mut q = (self.0).0.write().unwrap(); - q.pop() - }; + let ret = (self.0).0.pop(); if let Some(v) = ret { folder = folder.consume((DynQueueHandle(self.0.clone()), v)); @@ -199,7 +279,7 @@ where impl<'a, T, U> rayon::iter::ParallelIterator for DynQueue<'a, T, U> where T: Send + Sync, - U: Queue + Send + Sync, + U: IntoDynQueue + Queue + Send + Sync, { type Item = (DynQueueHandle<'a, T, U>, T); diff --git a/src/tests.rs b/src/tests.rs index 05be6ce..5262774 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,4 +1,4 @@ -use crate::{DynQueue, DynQueueHandle, Queue}; +use crate::{DynQueueHandle, IntoDynQueue, Queue}; use std::collections::VecDeque; const SLEEP_MS: u64 = 10; @@ -49,7 +49,7 @@ fn dynqueue_iter_test_const_sleep() { let med = expected.iter().sum::() / expected.iter().count() as u64; - let jq = DynQueue::new(get_input()); + let jq = get_input().into_dyn_queue(); let now = std::time::Instant::now(); let mut res = jq @@ -69,6 +69,39 @@ fn dynqueue_iter_test_const_sleep() { ); } +#[cfg(feature = "crossbeam-queue")] +#[test] +fn dynqueue_iter_test_const_sleep_segqueue() { + use crossbeam_queue::SegQueue; + use rayon::iter::IntoParallelIterator as _; + use rayon::iter::ParallelIterator as _; + use std::time::Duration; + let expected = get_expected(); + + let med = expected.iter().sum::() / expected.iter().count() as u64; + let jq = SegQueue::new(); + get_input().drain(..).for_each(|ele| jq.push(ele)); + + let now = std::time::Instant::now(); + + let mut res = jq + .into_dyn_queue() + .into_par_iter() + .map(handle_queue) + .map(|v| { + std::thread::sleep(Duration::from_millis(SLEEP_MS * med)); + v + }) + .collect::>(); + eprintln!("elapsed = {:#?}", now.elapsed()); + res.sort(); + assert_eq!(res, expected); + eprintln!( + "instead of = {}ms", + res.iter().count() * med as usize * SLEEP_MS as usize + ); +} + #[test] fn dynqueue_iter_test_const_sleep_vecdeque() { use rayon::iter::IntoParallelIterator as _; @@ -78,10 +111,11 @@ fn dynqueue_iter_test_const_sleep_vecdeque() { let med = expected.iter().sum::() / expected.iter().count() as u64; - let jq = DynQueue::new(VecDeque::from(get_input())); + let jq = VecDeque::from(get_input()); let now = std::time::Instant::now(); let mut res = jq + .into_dyn_queue() .into_par_iter() .map(handle_queue) .map(|v| { @@ -104,11 +138,12 @@ fn dynqueue_iter_test_sleep_v() { use rayon::iter::ParallelIterator as _; use std::time::Duration; - let jq = DynQueue::new(get_input()); + let jq = get_input(); let now = std::time::Instant::now(); let mut res = jq + .into_dyn_queue() .into_par_iter() .map(handle_queue) .map(|v| { @@ -128,11 +163,12 @@ fn dynqueue_iter_test_sleep_inv_v() { use rayon::iter::ParallelIterator as _; use std::time::Duration; - let jq = DynQueue::new(get_input()); + let jq = get_input(); let now = std::time::Instant::now(); let mut res = jq + .into_dyn_queue() .into_par_iter() .map(handle_queue) .map(|v| {