Compare commits

..

No commits in common. "master" and "v0.1.3" have entirely different histories.

7 changed files with 85 additions and 162 deletions

View file

@ -1,7 +0,0 @@
version: 2
updates:
- package-ecosystem: cargo
directory: "/"
schedule:
interval: daily
open-pull-requests-limit: 10

View file

@ -1,13 +1,6 @@
name: coverage name: coverage
on: on: [ "push" , "pull_request" ]
push:
branches:
- master
pull_request:
branches:
- master
jobs: jobs:
test: test:
name: coverage name: coverage

View file

@ -13,9 +13,7 @@ jobs:
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- name: Run fmt
run: cargo fmt --all -- --check
- name: Build - name: Build
run: cargo build --verbose --all-features run: cargo build --verbose
- name: Run tests - name: Run tests
run: cargo test --verbose --all-features run: cargo test --verbose

View file

@ -1,6 +1,6 @@
[package] [package]
name = "dynqueue" name = "dynqueue"
version = "0.3.1-alpha.0" version = "0.1.3"
authors = ["Harald Hoyer <harald@redhat.com>"] authors = ["Harald Hoyer <harald@redhat.com>"]
edition = "2018" edition = "2018"
@ -15,5 +15,5 @@ keywords = [ "parallel", "performance", "thread", "join", "concurrency"]
categories = [ "concurrency" ] categories = [ "concurrency" ]
[dependencies] [dependencies]
rayon = "1.3" rayon = "1.3.0"
crossbeam-queue = { version = "0.3", optional = true } crossbeam-queue = { version = "0.2", optional = true }

View file

@ -7,21 +7,17 @@ A `DynQueue<T>` can be iterated with `into_par_iter` producing `(DynQueueHandle,
With the `DynQueueHandle<T>` a new `T` can be inserted in the `DynQueue<T>`, With the `DynQueueHandle<T>` a new `T` can be inserted in the `DynQueue<T>`,
which is currently iterated over. which is currently iterated over.
A `Vec<T>`, `VecDeque<T>` and `crossbeam_queue::SegQueue<T>` (with `feature = "crossbeam-queue"`)
can be turned into a `DynQueue<T>` with `.into_dyn_queue()`.
```rust ```rust
use dynqueue::DynQueue;
use rayon::iter::IntoParallelIterator as _; use rayon::iter::IntoParallelIterator as _;
use rayon::iter::ParallelIterator as _; use rayon::iter::ParallelIterator as _;
use dynqueue::IntoDynQueue as _;
fn main() { fn main() {
let mut result = vec![1, 2, 3] let mut result = DynQueue::new(vec![1, 2, 3])
.into_dyn_queue() .into_par_iter()
.into_par_iter() .map(|(handle, value)| { if value == 2 { handle.enqueue(4) }; value })
.map(|(handle, value)| { if value == 2 { handle.enqueue(4) }; value }) .collect::<Vec<_>>();
.collect::<Vec<_>>();
result.sort(); result.sort();
assert_eq!(result, vec![1, 2, 3, 4]); assert_eq!(result, vec![1, 2, 3, 4]);
@ -32,11 +28,3 @@ fn main() {
* `crossbeam-queue` : to use `crossbeam::queue::SegQueue` as the inner collection. * `crossbeam-queue` : to use `crossbeam::queue::SegQueue` as the inner collection.
## Changelog
### 0.2.0
- introduce `IntoDynQueue`
- handle lockless collections
### 0.1.0
- initial version

View file

@ -7,21 +7,15 @@
//! # Example //! # Example
//! //!
//! ``` //! ```
//! use dynqueue::{DynQueue, DynQueueHandle};
//!
//! use rayon::iter::IntoParallelIterator as _; //! use rayon::iter::IntoParallelIterator as _;
//! use rayon::iter::ParallelIterator as _; //! use rayon::iter::ParallelIterator as _;
//! //!
//! use dynqueue::IntoDynQueue as _; //! let mut result = DynQueue::new(vec![1, 2, 3])
//! //! .into_par_iter()
//! let mut result = vec![1, 2, 3] //! .map(|(handle, value)| { if value == 2 { handle.enqueue(4) }; value })
//! .into_dyn_queue() //! .collect::<Vec<_>>();
//! .into_par_iter()
//! .map(|(handle, value)| {
//! if value == 2 {
//! handle.enqueue(4)
//! };
//! value
//! })
//! .collect::<Vec<_>>();
//! result.sort(); //! result.sort();
//! //!
//! assert_eq!(result, vec![1, 2, 3, 4]); //! assert_eq!(result, vec![1, 2, 3, 4]);
@ -32,45 +26,27 @@
//! The `DynQueueHandle` shall not outlive the `DynQueue` iterator //! The `DynQueueHandle` shall not outlive the `DynQueue` iterator
//! //!
//! ```should_panic //! ```should_panic
//! use dynqueue::{DynQueue, DynQueueHandle, IntoDynQueue}; //! use dynqueue::{DynQueue, DynQueueHandle};
//! //!
//! use rayon::iter::IntoParallelIterator as _; //! use rayon::iter::IntoParallelIterator as _;
//! use rayon::iter::ParallelIterator as _; //! use rayon::iter::ParallelIterator as _;
//! use std::sync::RwLock;
//! //!
//! static mut STALE_HANDLE: Option<DynQueueHandle<u8, RwLock<Vec<u8>>>> = None; //! static mut STALE_HANDLE : Option<DynQueueHandle<u8, Vec<u8>>> = None;
//! //!
//! pub fn test_func() -> Vec<u8> { //! pub fn test_func() -> Vec<u8> {
//! vec![1u8, 2u8, 3u8] //! DynQueue::new(vec![1u8, 2u8, 3u8])
//! .into_dyn_queue()
//! .into_par_iter() //! .into_par_iter()
//! .map(|(handle, value)| unsafe { //! .map(|(handle, value)| unsafe { STALE_HANDLE.replace(handle); value })
//! STALE_HANDLE.replace(handle);
//! value
//! })
//! .collect::<Vec<_>>() //! .collect::<Vec<_>>()
//! } //! }
//! // test_func() panics //! // test_func() panics
//! let result = test_func(); //! let result = test_func();
//! unsafe { //! unsafe { STALE_HANDLE.as_ref().unwrap().enqueue(4); }
//! STALE_HANDLE.as_ref().unwrap().enqueue(4);
//! }
//! ``` //! ```
#![deny(clippy::all)] #![deny(clippy::all)]
#![deny(missing_docs)] #![deny(missing_docs)]
#[allow(unused)]
macro_rules! doc_comment {
($x:expr) => {
#[doc = $x]
#[doc(hidden)]
mod readme_tests {}
};
}
doc_comment!(include_str!("../README.md"));
use rayon::iter::plumbing::{ use rayon::iter::plumbing::{
bridge_unindexed, Consumer, Folder, UnindexedConsumer, UnindexedProducer, bridge_unindexed, Consumer, Folder, UnindexedConsumer, UnindexedProducer,
}; };
@ -81,124 +57,81 @@ use std::sync::{Arc, RwLock};
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
/// Trait to produce a new DynQueue
pub trait IntoDynQueue<T, U: Queue<T>> {
/// new
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, U>;
}
/// Everything implementing `Queue` can be handled by DynQueue /// Everything implementing `Queue` can be handled by DynQueue
#[allow(clippy::len_without_is_empty)]
pub trait Queue<T> pub trait Queue<T>
where where
Self: Sized, Self: Sized,
{ {
/// push an element in the queue /// push an element in the queue
fn push(&self, v: T); fn push(&mut self, v: T);
/// pop an element from the queue /// pop an element from the queue
fn pop(&self) -> Option<T>; fn pop(&mut self) -> Option<T>;
/// number of elements in the queue /// number of elements in the queue
fn len(&self) -> usize; fn len(&self) -> usize;
/// split off `size` elements /// split off `size` elements
fn split_off(&self, size: usize) -> Self; fn split_off(&mut self, size: usize) -> Self;
} }
impl<T> IntoDynQueue<T, RwLock<Vec<T>>> for Vec<T> { impl<T> Queue<T> for Vec<T> {
#[inline(always)] #[inline(always)]
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<Vec<T>>> { fn push(&mut self, v: T) {
DynQueue(Arc::new(DynQueueInner(RwLock::new(self), PhantomData))) Vec::push(self, v)
}
}
impl<T> IntoDynQueue<T, RwLock<Vec<T>>> for RwLock<Vec<T>> {
#[inline(always)]
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<Vec<T>>> {
DynQueue(Arc::new(DynQueueInner(self, PhantomData)))
}
}
impl<T> Queue<T> for RwLock<Vec<T>> {
#[inline(always)]
fn push(&self, v: T) {
self.write().unwrap().push(v)
} }
#[inline(always)] #[inline(always)]
fn pop(&self) -> Option<T> { fn pop(&mut self) -> Option<T> {
self.write().unwrap().pop() Vec::pop(self)
} }
#[inline(always)] #[inline(always)]
fn len(&self) -> usize { fn len(&self) -> usize {
self.read().unwrap().len() Vec::len(self)
} }
#[inline(always)] #[inline(always)]
fn split_off(&self, size: usize) -> Self { fn split_off(&mut self, size: usize) -> Self {
RwLock::new(self.write().unwrap().split_off(size)) Vec::split_off(self, size)
} }
} }
impl<T> IntoDynQueue<T, RwLock<VecDeque<T>>> for VecDeque<T> { impl<T> Queue<T> for VecDeque<T> {
#[inline(always)] #[inline(always)]
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<VecDeque<T>>> { fn push(&mut self, v: T) {
DynQueue(Arc::new(DynQueueInner(RwLock::new(self), PhantomData))) VecDeque::push_back(self, v)
}
}
impl<T> IntoDynQueue<T, RwLock<VecDeque<T>>> for RwLock<VecDeque<T>> {
#[inline(always)]
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<VecDeque<T>>> {
DynQueue(Arc::new(DynQueueInner(self, PhantomData)))
}
}
impl<T> Queue<T> for RwLock<VecDeque<T>> {
#[inline(always)]
fn push(&self, v: T) {
self.write().unwrap().push_back(v)
} }
#[inline(always)] #[inline(always)]
fn pop(&self) -> Option<T> { fn pop(&mut self) -> Option<T> {
self.write().unwrap().pop_front() VecDeque::pop_front(self)
} }
#[inline(always)] #[inline(always)]
fn len(&self) -> usize { fn len(&self) -> usize {
self.read().unwrap().len() VecDeque::len(self)
} }
#[inline(always)] #[inline(always)]
fn split_off(&self, size: usize) -> Self { fn split_off(&mut self, size: usize) -> Self {
RwLock::new(self.write().unwrap().split_off(size)) VecDeque::split_off(self, size)
} }
} }
#[cfg(feature = "crossbeam-queue")] #[cfg(feature = "crossbeam-queue")]
use crossbeam_queue::SegQueue; use crossbeam_queue::SegQueue;
#[cfg(feature = "crossbeam-queue")]
impl<T> IntoDynQueue<T, SegQueue<T>> for SegQueue<T> {
#[inline(always)]
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, Self> {
DynQueue(Arc::new(DynQueueInner(self, PhantomData)))
}
}
#[cfg(feature = "crossbeam-queue")] #[cfg(feature = "crossbeam-queue")]
impl<T> Queue<T> for SegQueue<T> { impl<T> Queue<T> for SegQueue<T> {
#[inline(always)] #[inline(always)]
fn push(&self, v: T) { fn push(&mut self, v: T) {
SegQueue::push(self, v); SegQueue::push(self, v);
} }
#[inline(always)] #[inline(always)]
fn pop(&self) -> Option<T> { fn pop(&mut self) -> Option<T> {
SegQueue::pop(self) SegQueue::pop(self).ok()
} }
#[inline(always)] #[inline(always)]
@ -207,10 +140,10 @@ impl<T> Queue<T> for SegQueue<T> {
} }
#[inline(always)] #[inline(always)]
fn split_off(&self, size: usize) -> Self { fn split_off(&mut self, size: usize) -> Self {
let q = SegQueue::new(); let q = SegQueue::new();
(0..size) (0..size)
.filter_map(|_| Queue::pop(self)) .filter_map(|_| self.pop())
.for_each(|ele| q.push(ele)); .for_each(|ele| q.push(ele));
q q
} }
@ -218,7 +151,7 @@ impl<T> Queue<T> for SegQueue<T> {
// PhantomData should prevent `DynQueueInner` to outlive the original `DynQueue` // PhantomData should prevent `DynQueueInner` to outlive the original `DynQueue`
// but does not always. // but does not always.
struct DynQueueInner<'a, T, U: Queue<T>>(U, PhantomData<&'a T>); struct DynQueueInner<'a, T, U: Queue<T>>(std::sync::RwLock<U>, PhantomData<&'a T>);
/// The `DynQueueHandle` returned by the iterator in addition to `T` /// The `DynQueueHandle` returned by the iterator in addition to `T`
pub struct DynQueueHandle<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>); pub struct DynQueueHandle<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>);
@ -227,26 +160,44 @@ impl<'a, T, U: Queue<T>> DynQueueHandle<'a, T, U> {
/// Enqueue `T` in the `DynQueue<T>`, which is currently iterated. /// Enqueue `T` in the `DynQueue<T>`, which is currently iterated.
#[inline] #[inline]
pub fn enqueue(&self, job: T) { pub fn enqueue(&self, job: T) {
(self.0).0.push(job) (self.0).0.write().unwrap().push(job)
} }
} }
/// The `DynQueue<T>` which can be parallel iterated over /// The `DynQueue<T>` which can be parallel iterated over
pub struct DynQueue<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>); pub struct DynQueue<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>);
impl<'a, T, U: Queue<T>> DynQueue<'a, T, U>
where
T: Send + Sync,
U: Queue<T> + Send + Sync,
{
/// Create a new `DynQueue<T>` from a `Vec<T>`
#[inline]
pub fn new(lifo: U) -> Self {
Self(Arc::new(DynQueueInner(RwLock::new(lifo), PhantomData)))
}
}
impl<'a, T, U> UnindexedProducer for DynQueue<'a, T, U> impl<'a, T, U> UnindexedProducer for DynQueue<'a, T, U>
where where
T: Send + Sync, T: Send + Sync,
U: IntoDynQueue<T, U> + Queue<T> + Send + Sync, U: Queue<T> + Send + Sync,
{ {
type Item = (DynQueueHandle<'a, T, U>, T); type Item = (DynQueueHandle<'a, T, U>, T);
fn split(self) -> (Self, Option<Self>) { fn split(self) -> (Self, Option<Self>) {
let len = (self.0).0.len(); let len = {
let q = (self.0).0.read().unwrap();
q.len()
};
if len >= 2 { if len >= 2 {
let new_q = (self.0).0.split_off(len / 2); let new_q = {
(self, Some(new_q.into_dyn_queue())) let mut q = (self.0).0.write().unwrap();
let split_off = q.split_off(len / 2);
DynQueue::new(split_off)
};
(self, Some(new_q))
} else { } else {
(self, None) (self, None)
} }
@ -258,7 +209,10 @@ where
{ {
let mut folder = folder; let mut folder = folder;
loop { loop {
let ret = (self.0).0.pop(); let ret = {
let mut q = (self.0).0.write().unwrap();
q.pop()
};
if let Some(v) = ret { if let Some(v) = ret {
folder = folder.consume((DynQueueHandle(self.0.clone()), v)); folder = folder.consume((DynQueueHandle(self.0.clone()), v));
@ -279,7 +233,7 @@ where
impl<'a, T, U> rayon::iter::ParallelIterator for DynQueue<'a, T, U> impl<'a, T, U> rayon::iter::ParallelIterator for DynQueue<'a, T, U>
where where
T: Send + Sync, T: Send + Sync,
U: IntoDynQueue<T, U> + Queue<T> + Send + Sync, U: Queue<T> + Send + Sync,
{ {
type Item = (DynQueueHandle<'a, T, U>, T); type Item = (DynQueueHandle<'a, T, U>, T);

View file

@ -1,4 +1,4 @@
use crate::{DynQueueHandle, IntoDynQueue, Queue}; use crate::{DynQueue, DynQueueHandle, Queue};
use std::collections::VecDeque; use std::collections::VecDeque;
const SLEEP_MS: u64 = 10; const SLEEP_MS: u64 = 10;
@ -49,7 +49,7 @@ fn dynqueue_iter_test_const_sleep() {
let med = expected.iter().sum::<u64>() / expected.iter().count() as u64; let med = expected.iter().sum::<u64>() / expected.iter().count() as u64;
let jq = get_input().into_dyn_queue(); let jq = DynQueue::new(get_input());
let now = std::time::Instant::now(); let now = std::time::Instant::now();
let mut res = jq let mut res = jq
@ -79,13 +79,13 @@ fn dynqueue_iter_test_const_sleep_segqueue() {
let expected = get_expected(); let expected = get_expected();
let med = expected.iter().sum::<u64>() / expected.iter().count() as u64; let med = expected.iter().sum::<u64>() / expected.iter().count() as u64;
let jq = SegQueue::new(); let q = SegQueue::new();
get_input().drain(..).for_each(|ele| jq.push(ele)); get_input().drain(..).for_each(|ele| q.push(ele));
let jq = DynQueue::new(q);
let now = std::time::Instant::now(); let now = std::time::Instant::now();
let mut res = jq let mut res = jq
.into_dyn_queue()
.into_par_iter() .into_par_iter()
.map(handle_queue) .map(handle_queue)
.map(|v| { .map(|v| {
@ -111,11 +111,10 @@ fn dynqueue_iter_test_const_sleep_vecdeque() {
let med = expected.iter().sum::<u64>() / expected.iter().count() as u64; let med = expected.iter().sum::<u64>() / expected.iter().count() as u64;
let jq = VecDeque::from(get_input()); let jq = DynQueue::new(VecDeque::from(get_input()));
let now = std::time::Instant::now(); let now = std::time::Instant::now();
let mut res = jq let mut res = jq
.into_dyn_queue()
.into_par_iter() .into_par_iter()
.map(handle_queue) .map(handle_queue)
.map(|v| { .map(|v| {
@ -138,12 +137,11 @@ fn dynqueue_iter_test_sleep_v() {
use rayon::iter::ParallelIterator as _; use rayon::iter::ParallelIterator as _;
use std::time::Duration; use std::time::Duration;
let jq = get_input(); let jq = DynQueue::new(get_input());
let now = std::time::Instant::now(); let now = std::time::Instant::now();
let mut res = jq let mut res = jq
.into_dyn_queue()
.into_par_iter() .into_par_iter()
.map(handle_queue) .map(handle_queue)
.map(|v| { .map(|v| {
@ -163,12 +161,11 @@ fn dynqueue_iter_test_sleep_inv_v() {
use rayon::iter::ParallelIterator as _; use rayon::iter::ParallelIterator as _;
use std::time::Duration; use std::time::Duration;
let jq = get_input(); let jq = DynQueue::new(get_input());
let now = std::time::Instant::now(); let now = std::time::Instant::now();
let mut res = jq let mut res = jq
.into_dyn_queue()
.into_par_iter() .into_par_iter()
.map(handle_queue) .map(handle_queue)
.map(|v| { .map(|v| {