impl<T> Queue<T> for crossbeam_queue::SegQueue<T>

Resolves: https://github.com/haraldh/dynqueue/issues/2
This commit is contained in:
Harald Hoyer 2020-06-22 12:02:17 +02:00
parent 00d6c86637
commit 84d72fc0f8
4 changed files with 75 additions and 2 deletions

View file

@ -1,6 +1,6 @@
[package] [package]
name = "dynqueue" name = "dynqueue"
version = "0.1.2" version = "0.1.3"
authors = ["Harald Hoyer <harald@redhat.com>"] authors = ["Harald Hoyer <harald@redhat.com>"]
edition = "2018" edition = "2018"
@ -16,3 +16,4 @@ categories = [ "concurrency" ]
[dependencies] [dependencies]
rayon = "1.3.0" rayon = "1.3.0"
crossbeam-queue = { version = "0.2", optional = true }

View file

@ -23,3 +23,8 @@ fn main() {
assert_eq!(result, vec![1, 2, 3, 4]); assert_eq!(result, vec![1, 2, 3, 4]);
} }
``` ```
## Features
* `crossbeam-queue` : to use `crossbeam::queue::SegQueue` as the inner collection.

View file

@ -119,6 +119,36 @@ impl<T> Queue<T> for VecDeque<T> {
} }
} }
#[cfg(feature = "crossbeam-queue")]
use crossbeam_queue::SegQueue;
#[cfg(feature = "crossbeam-queue")]
impl<T> Queue<T> for SegQueue<T> {
#[inline(always)]
fn push(&mut self, v: T) {
SegQueue::push(self, v);
}
#[inline(always)]
fn pop(&mut self) -> Option<T> {
SegQueue::pop(self).ok()
}
#[inline(always)]
fn len(&self) -> usize {
SegQueue::len(self)
}
#[inline(always)]
fn split_off(&mut self, size: usize) -> Self {
let q = SegQueue::new();
(0..size)
.filter_map(|_| self.pop())
.for_each(|ele| q.push(ele));
q
}
}
// PhantomData should prevent `DynQueueInner` to outlive the original `DynQueue` // PhantomData should prevent `DynQueueInner` to outlive the original `DynQueue`
// but does not always. // but does not always.
struct DynQueueInner<'a, T, U: Queue<T>>(std::sync::RwLock<U>, PhantomData<&'a T>); struct DynQueueInner<'a, T, U: Queue<T>>(std::sync::RwLock<U>, PhantomData<&'a T>);
@ -137,7 +167,11 @@ impl<'a, T, U: Queue<T>> DynQueueHandle<'a, T, U> {
/// The `DynQueue<T>` which can be parallel iterated over /// The `DynQueue<T>` which can be parallel iterated over
pub struct DynQueue<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>); pub struct DynQueue<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>);
impl<'a, T, U: Queue<T>> DynQueue<'a, T, U> { impl<'a, T, U: Queue<T>> DynQueue<'a, T, U>
where
T: Send + Sync,
U: Queue<T> + Send + Sync,
{
/// Create a new `DynQueue<T>` from a `Vec<T>` /// Create a new `DynQueue<T>` from a `Vec<T>`
#[inline] #[inline]
pub fn new(lifo: U) -> Self { pub fn new(lifo: U) -> Self {

View file

@ -69,6 +69,39 @@ fn dynqueue_iter_test_const_sleep() {
); );
} }
#[cfg(feature = "crossbeam-queue")]
#[test]
fn dynqueue_iter_test_const_sleep_segqueue() {
use crossbeam_queue::SegQueue;
use rayon::iter::IntoParallelIterator as _;
use rayon::iter::ParallelIterator as _;
use std::time::Duration;
let expected = get_expected();
let med = expected.iter().sum::<u64>() / expected.iter().count() as u64;
let q = SegQueue::new();
get_input().drain(..).for_each(|ele| q.push(ele));
let jq = DynQueue::new(q);
let now = std::time::Instant::now();
let mut res = jq
.into_par_iter()
.map(handle_queue)
.map(|v| {
std::thread::sleep(Duration::from_millis(SLEEP_MS * med));
v
})
.collect::<Vec<_>>();
eprintln!("elapsed = {:#?}", now.elapsed());
res.sort();
assert_eq!(res, expected);
eprintln!(
"instead of = {}ms",
res.iter().count() * med as usize * SLEEP_MS as usize
);
}
#[test] #[test]
fn dynqueue_iter_test_const_sleep_vecdeque() { fn dynqueue_iter_test_const_sleep_vecdeque() {
use rayon::iter::IntoParallelIterator as _; use rayon::iter::IntoParallelIterator as _;