From 95b6a2d70a7d1f7f0b01d14f413adf4227209c63 Mon Sep 17 00:00:00 2001 From: Harald Hoyer Date: Mon, 7 Apr 2025 08:54:00 +0200 Subject: [PATCH] refactor(verify-era-proof-attestation): replace watch channel with `CancellationToken` Refactored stop signal handling in all components to use `tokio_util::sync::CancellationToken` instead of `tokio::sync::watch`. - Improved cancellation logic by leveraging `CancellationToken` for cleaner and more efficient handling. - Updated corresponding dependency to `tokio-util` version `0.7.14`. --- Cargo.lock | 5 +++-- bin/verify-era-proof-attestation/Cargo.toml | 1 + bin/verify-era-proof-attestation/src/main.rs | 19 ++++++++++-------- .../src/processor/batch_processor.rs | 12 +++++------ .../src/processor/continuous_processor.rs | 14 +++++-------- .../src/processor/mod.rs | 11 ++++------ .../src/processor/one_shot_processor.rs | 9 +++------ .../src/proof/fetcher.rs | 20 +++++++++---------- .../src/verification/batch.rs | 6 +++--- 9 files changed, 45 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 08b0c8c..9e9d369 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4962,9 +4962,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.13" +version = "0.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" +checksum = "6b9590b93e6fcc1739458317cccd391ad3955e2bde8913edf6f95f9e65a8f034" dependencies = [ "bytes", "futures-core", @@ -5386,6 +5386,7 @@ dependencies = [ "teepot", "thiserror 2.0.11", "tokio", + "tokio-util", "tracing", "tracing-subscriber", "url", diff --git a/bin/verify-era-proof-attestation/Cargo.toml b/bin/verify-era-proof-attestation/Cargo.toml index 533a3dc..a566639 100644 --- a/bin/verify-era-proof-attestation/Cargo.toml +++ b/bin/verify-era-proof-attestation/Cargo.toml @@ -22,6 +22,7 @@ serde_yaml = "0.9.33" teepot.workspace = true thiserror.workspace = true tokio.workspace = true +tokio-util = "0.7.14" tracing.workspace = true tracing-subscriber.workspace = true url.workspace = true diff --git a/bin/verify-era-proof-attestation/src/main.rs b/bin/verify-era-proof-attestation/src/main.rs index 9fa03ae..2f66b43 100644 --- a/bin/verify-era-proof-attestation/src/main.rs +++ b/bin/verify-era-proof-attestation/src/main.rs @@ -10,15 +10,15 @@ mod processor; mod proof; mod verification; -use clap::Parser; -use error::Result; -use tokio::{signal, sync::watch}; - use crate::{ core::{VerifierConfig, VerifierConfigArgs}, error::Error, processor::ProcessorFactory, }; +use clap::Parser; +use error::Result; +use tokio::signal; +use tokio_util::sync::CancellationToken; #[tokio::main] async fn main() -> Result<()> { @@ -35,14 +35,17 @@ async fn main() -> Result<()> { // Create processor based on config let (processor, mode) = ProcessorFactory::create(config.clone())?; - // Set up stop channel - let (stop_sender, stop_receiver) = watch::channel(false); + // Set up a cancellation Token + let token = CancellationToken::new(); // Log startup information tracing::info!("Starting verification in {}", mode); // Spawn processing task - let mut process_handle = tokio::spawn(async move { processor.run(stop_receiver).await }); + let mut process_handle = { + let token = token.clone(); + tokio::spawn(async move { processor.run(token).await }) + }; // Wait for processing to complete or for stop signal tokio::select! { @@ -77,7 +80,7 @@ async fn main() -> Result<()> { }, _ = signal::ctrl_c() => { tracing::info!("Stop signal received, shutting down gracefully..."); - stop_sender.send(true).ok(); + token.cancel(); // Wait for processor to complete gracefully match process_handle.await { diff --git a/bin/verify-era-proof-attestation/src/processor/batch_processor.rs b/bin/verify-era-proof-attestation/src/processor/batch_processor.rs index f7520c2..c7e8cbf 100644 --- a/bin/verify-era-proof-attestation/src/processor/batch_processor.rs +++ b/bin/verify-era-proof-attestation/src/processor/batch_processor.rs @@ -4,7 +4,7 @@ //! Core functionality for processing individual batches use crate::error; -use tokio::sync::watch; +use tokio_util::sync::CancellationToken; use zksync_basic_types::L1BatchNumber; use crate::{ @@ -41,10 +41,10 @@ impl BatchProcessor { /// Process a single batch and return the verification result pub async fn process_batch( &self, - stop_receiver: &mut watch::Receiver, + token: &CancellationToken, batch_number: L1BatchNumber, ) -> error::Result { - if *stop_receiver.borrow() { + if token.is_cancelled() { tracing::info!("Stop signal received, shutting down"); return Ok(VerificationResult::Interrupted); } @@ -56,7 +56,7 @@ impl BatchProcessor { for tee_type in self.config.args.tee_types.iter() { match self .proof_fetcher - .get_proofs(stop_receiver, batch_number, tee_type) + .get_proofs(token, batch_number, tee_type) .await { Ok(batch_proofs) => proofs.extend(batch_proofs), @@ -81,7 +81,7 @@ impl BatchProcessor { // Verify proofs for the current batch let verification_result = self .batch_verifier - .verify_batch_proofs(stop_receiver, batch_number, proofs) + .verify_batch_proofs(token, batch_number, proofs) .await?; let result = if verification_result.total_count == 0 { @@ -103,7 +103,7 @@ impl BatchProcessor { if !matches!(result, VerificationResult::Interrupted) && self.config.args.rate_limit.as_millis() > 0 { - tokio::time::timeout(self.config.args.rate_limit, stop_receiver.changed()) + tokio::time::timeout(self.config.args.rate_limit, token.cancelled()) .await .ok(); } diff --git a/bin/verify-era-proof-attestation/src/processor/continuous_processor.rs b/bin/verify-era-proof-attestation/src/processor/continuous_processor.rs index 4e66ce0..9796367 100644 --- a/bin/verify-era-proof-attestation/src/processor/continuous_processor.rs +++ b/bin/verify-era-proof-attestation/src/processor/continuous_processor.rs @@ -3,7 +3,7 @@ //! Continuous batch processor for ongoing verification of new batches -use tokio::sync::watch; +use tokio_util::sync::CancellationToken; use zksync_basic_types::L1BatchNumber; use crate::{ @@ -32,7 +32,7 @@ impl ContinuousProcessor { /// Run the processor until stopped pub async fn run( &self, - mut stop_receiver: watch::Receiver, + token: &CancellationToken, ) -> error::Result> { tracing::info!( "Starting continuous verification from batch {}", @@ -45,13 +45,9 @@ impl ContinuousProcessor { let mut current_batch = self.start_batch.0; // Continue processing batches until stopped or reaching maximum batch number - while !*stop_receiver.borrow() { + while !token.is_cancelled() { let batch = L1BatchNumber(current_batch); - match self - .batch_processor - .process_batch(&mut stop_receiver, batch) - .await - { + match self.batch_processor.process_batch(token, batch).await { Ok(result) => { match result { VerificationResult::Success => success_count += 1, @@ -64,7 +60,7 @@ impl ContinuousProcessor { VerificationResult::NoProofsFound => { // In continuous mode, we might hit batches that don't have proofs yet // Wait a bit longer before retrying - if !*stop_receiver.borrow() { + if !token.is_cancelled() { tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; // Don't increment batch number, try again continue; diff --git a/bin/verify-era-proof-attestation/src/processor/mod.rs b/bin/verify-era-proof-attestation/src/processor/mod.rs index 449f694..75df936 100644 --- a/bin/verify-era-proof-attestation/src/processor/mod.rs +++ b/bin/verify-era-proof-attestation/src/processor/mod.rs @@ -15,7 +15,7 @@ use crate::{ core::{VerificationResult, VerifierConfig, VerifierMode}, error::Result, }; -use tokio::sync::watch; +use tokio_util::sync::CancellationToken; // Using an enum instead of a trait because async functions in traits can't be used in trait objects /// Processor variants for different verification modes @@ -28,13 +28,10 @@ pub enum ProcessorType { impl ProcessorType { /// Run the processor until completion or interruption - pub async fn run( - &self, - stop_receiver: watch::Receiver, - ) -> Result> { + pub async fn run(&self, token: CancellationToken) -> Result> { match self { - ProcessorType::OneShot(processor) => processor.run(stop_receiver).await, - ProcessorType::Continuous(processor) => processor.run(stop_receiver).await, + ProcessorType::OneShot(processor) => processor.run(&token).await, + ProcessorType::Continuous(processor) => processor.run(&token).await, } } } diff --git a/bin/verify-era-proof-attestation/src/processor/one_shot_processor.rs b/bin/verify-era-proof-attestation/src/processor/one_shot_processor.rs index 469e9de..1bd7d77 100644 --- a/bin/verify-era-proof-attestation/src/processor/one_shot_processor.rs +++ b/bin/verify-era-proof-attestation/src/processor/one_shot_processor.rs @@ -4,7 +4,7 @@ //! One-shot batch processor for verifying a single batch or a range of batches use crate::error; -use tokio::sync::watch; +use tokio_util::sync::CancellationToken; use zksync_basic_types::L1BatchNumber; use crate::{ @@ -38,7 +38,7 @@ impl OneShotProcessor { /// Run the processor until completion or interruption pub async fn run( &self, - mut stop_receiver: watch::Receiver, + token: &CancellationToken, ) -> error::Result> { tracing::info!( "Starting one-shot verification of batches {} to {}", @@ -52,10 +52,7 @@ impl OneShotProcessor { for batch_number in self.start_batch.0..=self.end_batch.0 { let batch = L1BatchNumber(batch_number); - let result = self - .batch_processor - .process_batch(&mut stop_receiver, batch) - .await?; + let result = self.batch_processor.process_batch(token, batch).await?; match result { VerificationResult::Success => success_count += 1, diff --git a/bin/verify-era-proof-attestation/src/proof/fetcher.rs b/bin/verify-era-proof-attestation/src/proof/fetcher.rs index 0a22445..d8d0322 100644 --- a/bin/verify-era-proof-attestation/src/proof/fetcher.rs +++ b/bin/verify-era-proof-attestation/src/proof/fetcher.rs @@ -10,7 +10,7 @@ use crate::{ }, }; use std::time::Duration; -use tokio::sync::watch; +use tokio_util::sync::CancellationToken; use url::Url; use zksync_basic_types::{tee_types::TeeType, L1BatchNumber}; @@ -34,7 +34,7 @@ impl ProofFetcher { /// Get proofs for a batch number with retry logic pub async fn get_proofs( &self, - stop_receiver: &mut watch::Receiver, + token: &CancellationToken, batch_number: L1BatchNumber, tee_type: &TeeType, ) -> Result> { @@ -43,8 +43,8 @@ impl ProofFetcher { let max_backoff = Duration::from_secs(128); let retry_backoff_multiplier: f32 = 2.0; - while !*stop_receiver.borrow() { - match self.send_request(&proofs_request, stop_receiver).await { + while !token.is_cancelled() { + match self.send_request(&proofs_request, token).await { Ok(response) => { // Parse the response using the ProofResponseParser match ProofResponseParser::parse_response(response) { @@ -84,9 +84,7 @@ impl ProofFetcher { } } - tokio::time::timeout(backoff, stop_receiver.changed()) - .await - .ok(); + tokio::time::timeout(backoff, token.cancelled()).await.ok(); backoff = std::cmp::min( Duration::from_millis( @@ -95,13 +93,13 @@ impl ProofFetcher { max_backoff, ); - if *stop_receiver.borrow() { + if token.is_cancelled() { break; } } // If we've reached this point, we've either been stopped or exhausted retries - if *stop_receiver.borrow() { + if token.is_cancelled() { // Return empty vector if stopped Ok(vec![]) } else { @@ -114,7 +112,7 @@ impl ProofFetcher { async fn send_request( &self, request: &GetProofsRequest, - stop_receiver: &mut watch::Receiver, + token: &CancellationToken, ) -> Result { let retry_helper = RetryHelper::new(self.retry_config.clone()); let request_clone = request.clone(); @@ -128,7 +126,7 @@ impl ProofFetcher { .await; // Check if we need to abort due to stop signal - if *stop_receiver.borrow() { + if token.is_cancelled() { return Err(Error::Interrupted); } diff --git a/bin/verify-era-proof-attestation/src/verification/batch.rs b/bin/verify-era-proof-attestation/src/verification/batch.rs index 53c3438..5542f05 100644 --- a/bin/verify-era-proof-attestation/src/verification/batch.rs +++ b/bin/verify-era-proof-attestation/src/verification/batch.rs @@ -8,7 +8,7 @@ use crate::{ proof::Proof, verification::{AttestationVerifier, PolicyEnforcer, SignatureVerifier, VerificationReporter}, }; -use tokio::sync::watch; +use tokio_util::sync::CancellationToken; use zksync_basic_types::L1BatchNumber; /// Result of a batch verification @@ -40,7 +40,7 @@ impl BatchVerifier { /// Verify proofs for a batch pub async fn verify_batch_proofs( &self, - stop_receiver: &mut watch::Receiver, + token: &CancellationToken, batch_number: L1BatchNumber, proofs: Vec, ) -> error::Result { @@ -49,7 +49,7 @@ impl BatchVerifier { let mut verified_proofs_count: u32 = 0; for proof in proofs.into_iter() { - if *stop_receiver.borrow() { + if token.is_cancelled() { tracing::warn!("Stop signal received during batch verification"); return Ok(BatchVerificationResult { total_count: total_proofs_count,