Compare commits
No commits in common. "master" and "0.1.2" have entirely different histories.
7
.github/dependabot.yml
vendored
7
.github/dependabot.yml
vendored
|
@ -1,7 +0,0 @@
|
||||||
version: 2
|
|
||||||
updates:
|
|
||||||
- package-ecosystem: cargo
|
|
||||||
directory: "/"
|
|
||||||
schedule:
|
|
||||||
interval: daily
|
|
||||||
open-pull-requests-limit: 10
|
|
29
.github/workflows/coverage.yml
vendored
29
.github/workflows/coverage.yml
vendored
|
@ -1,29 +0,0 @@
|
||||||
name: coverage
|
|
||||||
|
|
||||||
on:
|
|
||||||
push:
|
|
||||||
branches:
|
|
||||||
- master
|
|
||||||
pull_request:
|
|
||||||
branches:
|
|
||||||
- master
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
test:
|
|
||||||
name: coverage
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
container:
|
|
||||||
image: xd009642/tarpaulin
|
|
||||||
options: --security-opt seccomp=unconfined
|
|
||||||
steps:
|
|
||||||
- name: Checkout repository
|
|
||||||
uses: actions/checkout@v2
|
|
||||||
|
|
||||||
- name: Generate code coverage
|
|
||||||
run: |
|
|
||||||
cargo tarpaulin --verbose --all-features --workspace --timeout 120 --out Lcov --output-dir coverage
|
|
||||||
|
|
||||||
- name: Upload to coveralls
|
|
||||||
uses: coverallsapp/github-action@master
|
|
||||||
with:
|
|
||||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
|
6
.github/workflows/rust.yml
vendored
6
.github/workflows/rust.yml
vendored
|
@ -13,9 +13,7 @@ jobs:
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
- name: Run fmt
|
|
||||||
run: cargo fmt --all -- --check
|
|
||||||
- name: Build
|
- name: Build
|
||||||
run: cargo build --verbose --all-features
|
run: cargo build --verbose
|
||||||
- name: Run tests
|
- name: Run tests
|
||||||
run: cargo test --verbose --all-features
|
run: cargo test --verbose
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "dynqueue"
|
name = "dynqueue"
|
||||||
version = "0.3.1-alpha.0"
|
version = "0.1.2"
|
||||||
authors = ["Harald Hoyer <harald@redhat.com>"]
|
authors = ["Harald Hoyer <harald@redhat.com>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
|
@ -15,5 +15,4 @@ keywords = [ "parallel", "performance", "thread", "join", "concurrency"]
|
||||||
categories = [ "concurrency" ]
|
categories = [ "concurrency" ]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
rayon = "1.3"
|
rayon = "1.3.0"
|
||||||
crossbeam-queue = { version = "0.3", optional = true }
|
|
||||||
|
|
32
README.md
32
README.md
|
@ -1,42 +1,22 @@
|
||||||
[](https://github.com/haraldh/dynqueue/actions)
|
|
||||||
[](https://coveralls.io/github/haraldh/dynqueue?branch=master)
|
|
||||||
|
|
||||||
# DynQueue - dynamically extendable Rayon parallel iterator
|
# DynQueue - dynamically extendable Rayon parallel iterator
|
||||||
|
|
||||||
A `DynQueue<T>` can be iterated with `into_par_iter` producing `(DynQueueHandle, T)` elements.
|
A `DynQueue<T>` can be iterated with `into_par_iter` producing `(DynQueueHandle, T)` elements.
|
||||||
With the `DynQueueHandle<T>` a new `T` can be inserted in the `DynQueue<T>`,
|
With the `DynQueueHandle<T>` a new `T` can be inserted in the `DynQueue<T>`,
|
||||||
which is currently iterated over.
|
which is currently iterated over.
|
||||||
|
|
||||||
A `Vec<T>`, `VecDeque<T>` and `crossbeam_queue::SegQueue<T>` (with `feature = "crossbeam-queue"`)
|
|
||||||
can be turned into a `DynQueue<T>` with `.into_dyn_queue()`.
|
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
|
use dynqueue::DynQueue;
|
||||||
|
|
||||||
use rayon::iter::IntoParallelIterator as _;
|
use rayon::iter::IntoParallelIterator as _;
|
||||||
use rayon::iter::ParallelIterator as _;
|
use rayon::iter::ParallelIterator as _;
|
||||||
|
|
||||||
use dynqueue::IntoDynQueue as _;
|
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let mut result = vec![1, 2, 3]
|
let mut result = DynQueue::new(vec![1, 2, 3])
|
||||||
.into_dyn_queue()
|
.into_par_iter()
|
||||||
.into_par_iter()
|
.map(|(handle, value)| { if value == 2 { handle.enqueue(4) }; value })
|
||||||
.map(|(handle, value)| { if value == 2 { handle.enqueue(4) }; value })
|
.collect::<Vec<_>>();
|
||||||
.collect::<Vec<_>>();
|
|
||||||
result.sort();
|
result.sort();
|
||||||
|
|
||||||
assert_eq!(result, vec![1, 2, 3, 4]);
|
assert_eq!(result, vec![1, 2, 3, 4]);
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
## Features
|
|
||||||
|
|
||||||
* `crossbeam-queue` : to use `crossbeam::queue::SegQueue` as the inner collection.
|
|
||||||
|
|
||||||
## Changelog
|
|
||||||
|
|
||||||
### 0.2.0
|
|
||||||
- introduce `IntoDynQueue`
|
|
||||||
- handle lockless collections
|
|
||||||
|
|
||||||
### 0.1.0
|
|
||||||
- initial version
|
|
||||||
|
|
192
src/lib.rs
192
src/lib.rs
|
@ -7,21 +7,15 @@
|
||||||
//! # Example
|
//! # Example
|
||||||
//!
|
//!
|
||||||
//! ```
|
//! ```
|
||||||
|
//! use dynqueue::{DynQueue, DynQueueHandle};
|
||||||
|
//!
|
||||||
//! use rayon::iter::IntoParallelIterator as _;
|
//! use rayon::iter::IntoParallelIterator as _;
|
||||||
//! use rayon::iter::ParallelIterator as _;
|
//! use rayon::iter::ParallelIterator as _;
|
||||||
//!
|
//!
|
||||||
//! use dynqueue::IntoDynQueue as _;
|
//! let mut result = DynQueue::new(vec![1, 2, 3])
|
||||||
//!
|
//! .into_par_iter()
|
||||||
//! let mut result = vec![1, 2, 3]
|
//! .map(|(handle, value)| { if value == 2 { handle.enqueue(4) }; value })
|
||||||
//! .into_dyn_queue()
|
//! .collect::<Vec<_>>();
|
||||||
//! .into_par_iter()
|
|
||||||
//! .map(|(handle, value)| {
|
|
||||||
//! if value == 2 {
|
|
||||||
//! handle.enqueue(4)
|
|
||||||
//! };
|
|
||||||
//! value
|
|
||||||
//! })
|
|
||||||
//! .collect::<Vec<_>>();
|
|
||||||
//! result.sort();
|
//! result.sort();
|
||||||
//!
|
//!
|
||||||
//! assert_eq!(result, vec![1, 2, 3, 4]);
|
//! assert_eq!(result, vec![1, 2, 3, 4]);
|
||||||
|
@ -32,45 +26,27 @@
|
||||||
//! The `DynQueueHandle` shall not outlive the `DynQueue` iterator
|
//! The `DynQueueHandle` shall not outlive the `DynQueue` iterator
|
||||||
//!
|
//!
|
||||||
//! ```should_panic
|
//! ```should_panic
|
||||||
//! use dynqueue::{DynQueue, DynQueueHandle, IntoDynQueue};
|
//! use dynqueue::{DynQueue, DynQueueHandle};
|
||||||
//!
|
//!
|
||||||
//! use rayon::iter::IntoParallelIterator as _;
|
//! use rayon::iter::IntoParallelIterator as _;
|
||||||
//! use rayon::iter::ParallelIterator as _;
|
//! use rayon::iter::ParallelIterator as _;
|
||||||
//! use std::sync::RwLock;
|
|
||||||
//!
|
//!
|
||||||
//! static mut STALE_HANDLE: Option<DynQueueHandle<u8, RwLock<Vec<u8>>>> = None;
|
//! static mut STALE_HANDLE : Option<DynQueueHandle<u8, Vec<u8>>> = None;
|
||||||
//!
|
//!
|
||||||
//! pub fn test_func() -> Vec<u8> {
|
//! pub fn test_func() -> Vec<u8> {
|
||||||
//! vec![1u8, 2u8, 3u8]
|
//! DynQueue::new(vec![1u8, 2u8, 3u8])
|
||||||
//! .into_dyn_queue()
|
|
||||||
//! .into_par_iter()
|
//! .into_par_iter()
|
||||||
//! .map(|(handle, value)| unsafe {
|
//! .map(|(handle, value)| unsafe { STALE_HANDLE.replace(handle); value })
|
||||||
//! STALE_HANDLE.replace(handle);
|
|
||||||
//! value
|
|
||||||
//! })
|
|
||||||
//! .collect::<Vec<_>>()
|
//! .collect::<Vec<_>>()
|
||||||
//! }
|
//! }
|
||||||
//! // test_func() panics
|
//! // test_func() panics
|
||||||
//! let result = test_func();
|
//! let result = test_func();
|
||||||
//! unsafe {
|
//! unsafe { STALE_HANDLE.as_ref().unwrap().enqueue(4); }
|
||||||
//! STALE_HANDLE.as_ref().unwrap().enqueue(4);
|
|
||||||
//! }
|
|
||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
#![deny(clippy::all)]
|
#![deny(clippy::all)]
|
||||||
#![deny(missing_docs)]
|
#![deny(missing_docs)]
|
||||||
|
|
||||||
#[allow(unused)]
|
|
||||||
macro_rules! doc_comment {
|
|
||||||
($x:expr) => {
|
|
||||||
#[doc = $x]
|
|
||||||
#[doc(hidden)]
|
|
||||||
mod readme_tests {}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
doc_comment!(include_str!("../README.md"));
|
|
||||||
|
|
||||||
use rayon::iter::plumbing::{
|
use rayon::iter::plumbing::{
|
||||||
bridge_unindexed, Consumer, Folder, UnindexedConsumer, UnindexedProducer,
|
bridge_unindexed, Consumer, Folder, UnindexedConsumer, UnindexedProducer,
|
||||||
};
|
};
|
||||||
|
@ -81,144 +57,71 @@ use std::sync::{Arc, RwLock};
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
|
|
||||||
/// Trait to produce a new DynQueue
|
|
||||||
pub trait IntoDynQueue<T, U: Queue<T>> {
|
|
||||||
/// new
|
|
||||||
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, U>;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Everything implementing `Queue` can be handled by DynQueue
|
/// Everything implementing `Queue` can be handled by DynQueue
|
||||||
#[allow(clippy::len_without_is_empty)]
|
|
||||||
pub trait Queue<T>
|
pub trait Queue<T>
|
||||||
where
|
where
|
||||||
Self: Sized,
|
Self: Sized,
|
||||||
{
|
{
|
||||||
/// push an element in the queue
|
/// push an element in the queue
|
||||||
fn push(&self, v: T);
|
fn push(&mut self, v: T);
|
||||||
|
|
||||||
/// pop an element from the queue
|
/// pop an element from the queue
|
||||||
fn pop(&self) -> Option<T>;
|
fn pop(&mut self) -> Option<T>;
|
||||||
|
|
||||||
/// number of elements in the queue
|
/// number of elements in the queue
|
||||||
fn len(&self) -> usize;
|
fn len(&self) -> usize;
|
||||||
|
|
||||||
/// split off `size` elements
|
/// split off `size` elements
|
||||||
fn split_off(&self, size: usize) -> Self;
|
fn split_off(&mut self, size: usize) -> Self;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> IntoDynQueue<T, RwLock<Vec<T>>> for Vec<T> {
|
impl<T> Queue<T> for Vec<T> {
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<Vec<T>>> {
|
fn push(&mut self, v: T) {
|
||||||
DynQueue(Arc::new(DynQueueInner(RwLock::new(self), PhantomData)))
|
Vec::push(self, v)
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> IntoDynQueue<T, RwLock<Vec<T>>> for RwLock<Vec<T>> {
|
|
||||||
#[inline(always)]
|
|
||||||
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<Vec<T>>> {
|
|
||||||
DynQueue(Arc::new(DynQueueInner(self, PhantomData)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Queue<T> for RwLock<Vec<T>> {
|
|
||||||
#[inline(always)]
|
|
||||||
fn push(&self, v: T) {
|
|
||||||
self.write().unwrap().push(v)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn pop(&self) -> Option<T> {
|
fn pop(&mut self) -> Option<T> {
|
||||||
self.write().unwrap().pop()
|
Vec::pop(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn len(&self) -> usize {
|
fn len(&self) -> usize {
|
||||||
self.read().unwrap().len()
|
Vec::len(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn split_off(&self, size: usize) -> Self {
|
fn split_off(&mut self, size: usize) -> Self {
|
||||||
RwLock::new(self.write().unwrap().split_off(size))
|
Vec::split_off(self, size)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> IntoDynQueue<T, RwLock<VecDeque<T>>> for VecDeque<T> {
|
impl<T> Queue<T> for VecDeque<T> {
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<VecDeque<T>>> {
|
fn push(&mut self, v: T) {
|
||||||
DynQueue(Arc::new(DynQueueInner(RwLock::new(self), PhantomData)))
|
VecDeque::push_back(self, v)
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> IntoDynQueue<T, RwLock<VecDeque<T>>> for RwLock<VecDeque<T>> {
|
|
||||||
#[inline(always)]
|
|
||||||
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<VecDeque<T>>> {
|
|
||||||
DynQueue(Arc::new(DynQueueInner(self, PhantomData)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Queue<T> for RwLock<VecDeque<T>> {
|
|
||||||
#[inline(always)]
|
|
||||||
fn push(&self, v: T) {
|
|
||||||
self.write().unwrap().push_back(v)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn pop(&self) -> Option<T> {
|
fn pop(&mut self) -> Option<T> {
|
||||||
self.write().unwrap().pop_front()
|
VecDeque::pop_front(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn len(&self) -> usize {
|
fn len(&self) -> usize {
|
||||||
self.read().unwrap().len()
|
VecDeque::len(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn split_off(&self, size: usize) -> Self {
|
fn split_off(&mut self, size: usize) -> Self {
|
||||||
RwLock::new(self.write().unwrap().split_off(size))
|
VecDeque::split_off(self, size)
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(feature = "crossbeam-queue")]
|
|
||||||
use crossbeam_queue::SegQueue;
|
|
||||||
|
|
||||||
#[cfg(feature = "crossbeam-queue")]
|
|
||||||
impl<T> IntoDynQueue<T, SegQueue<T>> for SegQueue<T> {
|
|
||||||
#[inline(always)]
|
|
||||||
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, Self> {
|
|
||||||
DynQueue(Arc::new(DynQueueInner(self, PhantomData)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(feature = "crossbeam-queue")]
|
|
||||||
impl<T> Queue<T> for SegQueue<T> {
|
|
||||||
#[inline(always)]
|
|
||||||
fn push(&self, v: T) {
|
|
||||||
SegQueue::push(self, v);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline(always)]
|
|
||||||
fn pop(&self) -> Option<T> {
|
|
||||||
SegQueue::pop(self)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline(always)]
|
|
||||||
fn len(&self) -> usize {
|
|
||||||
SegQueue::len(self)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline(always)]
|
|
||||||
fn split_off(&self, size: usize) -> Self {
|
|
||||||
let q = SegQueue::new();
|
|
||||||
(0..size)
|
|
||||||
.filter_map(|_| Queue::pop(self))
|
|
||||||
.for_each(|ele| q.push(ele));
|
|
||||||
q
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// PhantomData should prevent `DynQueueInner` to outlive the original `DynQueue`
|
// PhantomData should prevent `DynQueueInner` to outlive the original `DynQueue`
|
||||||
// but does not always.
|
// but does not always.
|
||||||
struct DynQueueInner<'a, T, U: Queue<T>>(U, PhantomData<&'a T>);
|
struct DynQueueInner<'a, T, U: Queue<T>>(std::sync::RwLock<U>, PhantomData<&'a T>);
|
||||||
|
|
||||||
/// The `DynQueueHandle` returned by the iterator in addition to `T`
|
/// The `DynQueueHandle` returned by the iterator in addition to `T`
|
||||||
pub struct DynQueueHandle<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>);
|
pub struct DynQueueHandle<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>);
|
||||||
|
@ -227,26 +130,40 @@ impl<'a, T, U: Queue<T>> DynQueueHandle<'a, T, U> {
|
||||||
/// Enqueue `T` in the `DynQueue<T>`, which is currently iterated.
|
/// Enqueue `T` in the `DynQueue<T>`, which is currently iterated.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn enqueue(&self, job: T) {
|
pub fn enqueue(&self, job: T) {
|
||||||
(self.0).0.push(job)
|
(self.0).0.write().unwrap().push(job)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The `DynQueue<T>` which can be parallel iterated over
|
/// The `DynQueue<T>` which can be parallel iterated over
|
||||||
pub struct DynQueue<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>);
|
pub struct DynQueue<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>);
|
||||||
|
|
||||||
|
impl<'a, T, U: Queue<T>> DynQueue<'a, T, U> {
|
||||||
|
/// Create a new `DynQueue<T>` from a `Vec<T>`
|
||||||
|
#[inline]
|
||||||
|
pub fn new(lifo: U) -> Self {
|
||||||
|
Self(Arc::new(DynQueueInner(RwLock::new(lifo), PhantomData)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<'a, T, U> UnindexedProducer for DynQueue<'a, T, U>
|
impl<'a, T, U> UnindexedProducer for DynQueue<'a, T, U>
|
||||||
where
|
where
|
||||||
T: Send + Sync,
|
T: Send + Sync,
|
||||||
U: IntoDynQueue<T, U> + Queue<T> + Send + Sync,
|
U: Queue<T> + Send + Sync,
|
||||||
{
|
{
|
||||||
type Item = (DynQueueHandle<'a, T, U>, T);
|
type Item = (DynQueueHandle<'a, T, U>, T);
|
||||||
|
|
||||||
fn split(self) -> (Self, Option<Self>) {
|
fn split(self) -> (Self, Option<Self>) {
|
||||||
let len = (self.0).0.len();
|
let len = {
|
||||||
|
let q = (self.0).0.read().unwrap();
|
||||||
|
q.len()
|
||||||
|
};
|
||||||
if len >= 2 {
|
if len >= 2 {
|
||||||
let new_q = (self.0).0.split_off(len / 2);
|
let new_q = {
|
||||||
(self, Some(new_q.into_dyn_queue()))
|
let mut q = (self.0).0.write().unwrap();
|
||||||
|
let split_off = q.split_off(len / 2);
|
||||||
|
DynQueue::new(split_off)
|
||||||
|
};
|
||||||
|
(self, Some(new_q))
|
||||||
} else {
|
} else {
|
||||||
(self, None)
|
(self, None)
|
||||||
}
|
}
|
||||||
|
@ -258,7 +175,10 @@ where
|
||||||
{
|
{
|
||||||
let mut folder = folder;
|
let mut folder = folder;
|
||||||
loop {
|
loop {
|
||||||
let ret = (self.0).0.pop();
|
let ret = {
|
||||||
|
let mut q = (self.0).0.write().unwrap();
|
||||||
|
q.pop()
|
||||||
|
};
|
||||||
|
|
||||||
if let Some(v) = ret {
|
if let Some(v) = ret {
|
||||||
folder = folder.consume((DynQueueHandle(self.0.clone()), v));
|
folder = folder.consume((DynQueueHandle(self.0.clone()), v));
|
||||||
|
@ -279,7 +199,7 @@ where
|
||||||
impl<'a, T, U> rayon::iter::ParallelIterator for DynQueue<'a, T, U>
|
impl<'a, T, U> rayon::iter::ParallelIterator for DynQueue<'a, T, U>
|
||||||
where
|
where
|
||||||
T: Send + Sync,
|
T: Send + Sync,
|
||||||
U: IntoDynQueue<T, U> + Queue<T> + Send + Sync,
|
U: Queue<T> + Send + Sync,
|
||||||
{
|
{
|
||||||
type Item = (DynQueueHandle<'a, T, U>, T);
|
type Item = (DynQueueHandle<'a, T, U>, T);
|
||||||
|
|
||||||
|
|
46
src/tests.rs
46
src/tests.rs
|
@ -1,4 +1,4 @@
|
||||||
use crate::{DynQueueHandle, IntoDynQueue, Queue};
|
use crate::{DynQueue, DynQueueHandle, Queue};
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
|
|
||||||
const SLEEP_MS: u64 = 10;
|
const SLEEP_MS: u64 = 10;
|
||||||
|
@ -49,7 +49,7 @@ fn dynqueue_iter_test_const_sleep() {
|
||||||
|
|
||||||
let med = expected.iter().sum::<u64>() / expected.iter().count() as u64;
|
let med = expected.iter().sum::<u64>() / expected.iter().count() as u64;
|
||||||
|
|
||||||
let jq = get_input().into_dyn_queue();
|
let jq = DynQueue::new(get_input());
|
||||||
let now = std::time::Instant::now();
|
let now = std::time::Instant::now();
|
||||||
|
|
||||||
let mut res = jq
|
let mut res = jq
|
||||||
|
@ -69,39 +69,6 @@ fn dynqueue_iter_test_const_sleep() {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "crossbeam-queue")]
|
|
||||||
#[test]
|
|
||||||
fn dynqueue_iter_test_const_sleep_segqueue() {
|
|
||||||
use crossbeam_queue::SegQueue;
|
|
||||||
use rayon::iter::IntoParallelIterator as _;
|
|
||||||
use rayon::iter::ParallelIterator as _;
|
|
||||||
use std::time::Duration;
|
|
||||||
let expected = get_expected();
|
|
||||||
|
|
||||||
let med = expected.iter().sum::<u64>() / expected.iter().count() as u64;
|
|
||||||
let jq = SegQueue::new();
|
|
||||||
get_input().drain(..).for_each(|ele| jq.push(ele));
|
|
||||||
|
|
||||||
let now = std::time::Instant::now();
|
|
||||||
|
|
||||||
let mut res = jq
|
|
||||||
.into_dyn_queue()
|
|
||||||
.into_par_iter()
|
|
||||||
.map(handle_queue)
|
|
||||||
.map(|v| {
|
|
||||||
std::thread::sleep(Duration::from_millis(SLEEP_MS * med));
|
|
||||||
v
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
eprintln!("elapsed = {:#?}", now.elapsed());
|
|
||||||
res.sort();
|
|
||||||
assert_eq!(res, expected);
|
|
||||||
eprintln!(
|
|
||||||
"instead of = {}ms",
|
|
||||||
res.iter().count() * med as usize * SLEEP_MS as usize
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn dynqueue_iter_test_const_sleep_vecdeque() {
|
fn dynqueue_iter_test_const_sleep_vecdeque() {
|
||||||
use rayon::iter::IntoParallelIterator as _;
|
use rayon::iter::IntoParallelIterator as _;
|
||||||
|
@ -111,11 +78,10 @@ fn dynqueue_iter_test_const_sleep_vecdeque() {
|
||||||
|
|
||||||
let med = expected.iter().sum::<u64>() / expected.iter().count() as u64;
|
let med = expected.iter().sum::<u64>() / expected.iter().count() as u64;
|
||||||
|
|
||||||
let jq = VecDeque::from(get_input());
|
let jq = DynQueue::new(VecDeque::from(get_input()));
|
||||||
let now = std::time::Instant::now();
|
let now = std::time::Instant::now();
|
||||||
|
|
||||||
let mut res = jq
|
let mut res = jq
|
||||||
.into_dyn_queue()
|
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(handle_queue)
|
.map(handle_queue)
|
||||||
.map(|v| {
|
.map(|v| {
|
||||||
|
@ -138,12 +104,11 @@ fn dynqueue_iter_test_sleep_v() {
|
||||||
use rayon::iter::ParallelIterator as _;
|
use rayon::iter::ParallelIterator as _;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
let jq = get_input();
|
let jq = DynQueue::new(get_input());
|
||||||
|
|
||||||
let now = std::time::Instant::now();
|
let now = std::time::Instant::now();
|
||||||
|
|
||||||
let mut res = jq
|
let mut res = jq
|
||||||
.into_dyn_queue()
|
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(handle_queue)
|
.map(handle_queue)
|
||||||
.map(|v| {
|
.map(|v| {
|
||||||
|
@ -163,12 +128,11 @@ fn dynqueue_iter_test_sleep_inv_v() {
|
||||||
use rayon::iter::ParallelIterator as _;
|
use rayon::iter::ParallelIterator as _;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
let jq = get_input();
|
let jq = DynQueue::new(get_input());
|
||||||
|
|
||||||
let now = std::time::Instant::now();
|
let now = std::time::Instant::now();
|
||||||
|
|
||||||
let mut res = jq
|
let mut res = jq
|
||||||
.into_dyn_queue()
|
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(handle_queue)
|
.map(handle_queue)
|
||||||
.map(|v| {
|
.map(|v| {
|
||||||
|
|
Loading…
Reference in a new issue