refactor(verify-era-proof-attestation): replace watch channel with CancellationToken

Refactored stop signal handling in all components to use `tokio_util::sync::CancellationToken` instead of `tokio::sync::watch`.

- Improved cancellation logic by leveraging `CancellationToken` for cleaner and more efficient handling.
- Updated corresponding dependency to `tokio-util` version `0.7.14`.
This commit is contained in:
Harald Hoyer 2025-04-07 08:54:00 +02:00
parent 2605e2ae3a
commit 95b6a2d70a
Signed by: harald
GPG key ID: F519A1143B3FBE32
9 changed files with 45 additions and 52 deletions

5
Cargo.lock generated
View file

@ -4962,9 +4962,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.13" version = "0.7.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" checksum = "6b9590b93e6fcc1739458317cccd391ad3955e2bde8913edf6f95f9e65a8f034"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-core", "futures-core",
@ -5386,6 +5386,7 @@ dependencies = [
"teepot", "teepot",
"thiserror 2.0.11", "thiserror 2.0.11",
"tokio", "tokio",
"tokio-util",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"url", "url",

View file

@ -22,6 +22,7 @@ serde_yaml = "0.9.33"
teepot.workspace = true teepot.workspace = true
thiserror.workspace = true thiserror.workspace = true
tokio.workspace = true tokio.workspace = true
tokio-util = "0.7.14"
tracing.workspace = true tracing.workspace = true
tracing-subscriber.workspace = true tracing-subscriber.workspace = true
url.workspace = true url.workspace = true

View file

@ -10,15 +10,15 @@ mod processor;
mod proof; mod proof;
mod verification; mod verification;
use clap::Parser;
use error::Result;
use tokio::{signal, sync::watch};
use crate::{ use crate::{
core::{VerifierConfig, VerifierConfigArgs}, core::{VerifierConfig, VerifierConfigArgs},
error::Error, error::Error,
processor::ProcessorFactory, processor::ProcessorFactory,
}; };
use clap::Parser;
use error::Result;
use tokio::signal;
use tokio_util::sync::CancellationToken;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
@ -35,14 +35,17 @@ async fn main() -> Result<()> {
// Create processor based on config // Create processor based on config
let (processor, mode) = ProcessorFactory::create(config.clone())?; let (processor, mode) = ProcessorFactory::create(config.clone())?;
// Set up stop channel // Set up a cancellation Token
let (stop_sender, stop_receiver) = watch::channel(false); let token = CancellationToken::new();
// Log startup information // Log startup information
tracing::info!("Starting verification in {}", mode); tracing::info!("Starting verification in {}", mode);
// Spawn processing task // Spawn processing task
let mut process_handle = tokio::spawn(async move { processor.run(stop_receiver).await }); let mut process_handle = {
let token = token.clone();
tokio::spawn(async move { processor.run(token).await })
};
// Wait for processing to complete or for stop signal // Wait for processing to complete or for stop signal
tokio::select! { tokio::select! {
@ -77,7 +80,7 @@ async fn main() -> Result<()> {
}, },
_ = signal::ctrl_c() => { _ = signal::ctrl_c() => {
tracing::info!("Stop signal received, shutting down gracefully..."); tracing::info!("Stop signal received, shutting down gracefully...");
stop_sender.send(true).ok(); token.cancel();
// Wait for processor to complete gracefully // Wait for processor to complete gracefully
match process_handle.await { match process_handle.await {

View file

@ -4,7 +4,7 @@
//! Core functionality for processing individual batches //! Core functionality for processing individual batches
use crate::error; use crate::error;
use tokio::sync::watch; use tokio_util::sync::CancellationToken;
use zksync_basic_types::L1BatchNumber; use zksync_basic_types::L1BatchNumber;
use crate::{ use crate::{
@ -41,10 +41,10 @@ impl BatchProcessor {
/// Process a single batch and return the verification result /// Process a single batch and return the verification result
pub async fn process_batch( pub async fn process_batch(
&self, &self,
stop_receiver: &mut watch::Receiver<bool>, token: &CancellationToken,
batch_number: L1BatchNumber, batch_number: L1BatchNumber,
) -> error::Result<VerificationResult> { ) -> error::Result<VerificationResult> {
if *stop_receiver.borrow() { if token.is_cancelled() {
tracing::info!("Stop signal received, shutting down"); tracing::info!("Stop signal received, shutting down");
return Ok(VerificationResult::Interrupted); return Ok(VerificationResult::Interrupted);
} }
@ -56,7 +56,7 @@ impl BatchProcessor {
for tee_type in self.config.args.tee_types.iter() { for tee_type in self.config.args.tee_types.iter() {
match self match self
.proof_fetcher .proof_fetcher
.get_proofs(stop_receiver, batch_number, tee_type) .get_proofs(token, batch_number, tee_type)
.await .await
{ {
Ok(batch_proofs) => proofs.extend(batch_proofs), Ok(batch_proofs) => proofs.extend(batch_proofs),
@ -81,7 +81,7 @@ impl BatchProcessor {
// Verify proofs for the current batch // Verify proofs for the current batch
let verification_result = self let verification_result = self
.batch_verifier .batch_verifier
.verify_batch_proofs(stop_receiver, batch_number, proofs) .verify_batch_proofs(token, batch_number, proofs)
.await?; .await?;
let result = if verification_result.total_count == 0 { let result = if verification_result.total_count == 0 {
@ -103,7 +103,7 @@ impl BatchProcessor {
if !matches!(result, VerificationResult::Interrupted) if !matches!(result, VerificationResult::Interrupted)
&& self.config.args.rate_limit.as_millis() > 0 && self.config.args.rate_limit.as_millis() > 0
{ {
tokio::time::timeout(self.config.args.rate_limit, stop_receiver.changed()) tokio::time::timeout(self.config.args.rate_limit, token.cancelled())
.await .await
.ok(); .ok();
} }

View file

@ -3,7 +3,7 @@
//! Continuous batch processor for ongoing verification of new batches //! Continuous batch processor for ongoing verification of new batches
use tokio::sync::watch; use tokio_util::sync::CancellationToken;
use zksync_basic_types::L1BatchNumber; use zksync_basic_types::L1BatchNumber;
use crate::{ use crate::{
@ -32,7 +32,7 @@ impl ContinuousProcessor {
/// Run the processor until stopped /// Run the processor until stopped
pub async fn run( pub async fn run(
&self, &self,
mut stop_receiver: watch::Receiver<bool>, token: &CancellationToken,
) -> error::Result<Vec<(u32, VerificationResult)>> { ) -> error::Result<Vec<(u32, VerificationResult)>> {
tracing::info!( tracing::info!(
"Starting continuous verification from batch {}", "Starting continuous verification from batch {}",
@ -45,13 +45,9 @@ impl ContinuousProcessor {
let mut current_batch = self.start_batch.0; let mut current_batch = self.start_batch.0;
// Continue processing batches until stopped or reaching maximum batch number // Continue processing batches until stopped or reaching maximum batch number
while !*stop_receiver.borrow() { while !token.is_cancelled() {
let batch = L1BatchNumber(current_batch); let batch = L1BatchNumber(current_batch);
match self match self.batch_processor.process_batch(token, batch).await {
.batch_processor
.process_batch(&mut stop_receiver, batch)
.await
{
Ok(result) => { Ok(result) => {
match result { match result {
VerificationResult::Success => success_count += 1, VerificationResult::Success => success_count += 1,
@ -64,7 +60,7 @@ impl ContinuousProcessor {
VerificationResult::NoProofsFound => { VerificationResult::NoProofsFound => {
// In continuous mode, we might hit batches that don't have proofs yet // In continuous mode, we might hit batches that don't have proofs yet
// Wait a bit longer before retrying // Wait a bit longer before retrying
if !*stop_receiver.borrow() { if !token.is_cancelled() {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
// Don't increment batch number, try again // Don't increment batch number, try again
continue; continue;

View file

@ -15,7 +15,7 @@ use crate::{
core::{VerificationResult, VerifierConfig, VerifierMode}, core::{VerificationResult, VerifierConfig, VerifierMode},
error::Result, error::Result,
}; };
use tokio::sync::watch; use tokio_util::sync::CancellationToken;
// Using an enum instead of a trait because async functions in traits can't be used in trait objects // Using an enum instead of a trait because async functions in traits can't be used in trait objects
/// Processor variants for different verification modes /// Processor variants for different verification modes
@ -28,13 +28,10 @@ pub enum ProcessorType {
impl ProcessorType { impl ProcessorType {
/// Run the processor until completion or interruption /// Run the processor until completion or interruption
pub async fn run( pub async fn run(&self, token: CancellationToken) -> Result<Vec<(u32, VerificationResult)>> {
&self,
stop_receiver: watch::Receiver<bool>,
) -> Result<Vec<(u32, VerificationResult)>> {
match self { match self {
ProcessorType::OneShot(processor) => processor.run(stop_receiver).await, ProcessorType::OneShot(processor) => processor.run(&token).await,
ProcessorType::Continuous(processor) => processor.run(stop_receiver).await, ProcessorType::Continuous(processor) => processor.run(&token).await,
} }
} }
} }

View file

@ -4,7 +4,7 @@
//! One-shot batch processor for verifying a single batch or a range of batches //! One-shot batch processor for verifying a single batch or a range of batches
use crate::error; use crate::error;
use tokio::sync::watch; use tokio_util::sync::CancellationToken;
use zksync_basic_types::L1BatchNumber; use zksync_basic_types::L1BatchNumber;
use crate::{ use crate::{
@ -38,7 +38,7 @@ impl OneShotProcessor {
/// Run the processor until completion or interruption /// Run the processor until completion or interruption
pub async fn run( pub async fn run(
&self, &self,
mut stop_receiver: watch::Receiver<bool>, token: &CancellationToken,
) -> error::Result<Vec<(u32, VerificationResult)>> { ) -> error::Result<Vec<(u32, VerificationResult)>> {
tracing::info!( tracing::info!(
"Starting one-shot verification of batches {} to {}", "Starting one-shot verification of batches {} to {}",
@ -52,10 +52,7 @@ impl OneShotProcessor {
for batch_number in self.start_batch.0..=self.end_batch.0 { for batch_number in self.start_batch.0..=self.end_batch.0 {
let batch = L1BatchNumber(batch_number); let batch = L1BatchNumber(batch_number);
let result = self let result = self.batch_processor.process_batch(token, batch).await?;
.batch_processor
.process_batch(&mut stop_receiver, batch)
.await?;
match result { match result {
VerificationResult::Success => success_count += 1, VerificationResult::Success => success_count += 1,

View file

@ -10,7 +10,7 @@ use crate::{
}, },
}; };
use std::time::Duration; use std::time::Duration;
use tokio::sync::watch; use tokio_util::sync::CancellationToken;
use url::Url; use url::Url;
use zksync_basic_types::{tee_types::TeeType, L1BatchNumber}; use zksync_basic_types::{tee_types::TeeType, L1BatchNumber};
@ -34,7 +34,7 @@ impl ProofFetcher {
/// Get proofs for a batch number with retry logic /// Get proofs for a batch number with retry logic
pub async fn get_proofs( pub async fn get_proofs(
&self, &self,
stop_receiver: &mut watch::Receiver<bool>, token: &CancellationToken,
batch_number: L1BatchNumber, batch_number: L1BatchNumber,
tee_type: &TeeType, tee_type: &TeeType,
) -> Result<Vec<Proof>> { ) -> Result<Vec<Proof>> {
@ -43,8 +43,8 @@ impl ProofFetcher {
let max_backoff = Duration::from_secs(128); let max_backoff = Duration::from_secs(128);
let retry_backoff_multiplier: f32 = 2.0; let retry_backoff_multiplier: f32 = 2.0;
while !*stop_receiver.borrow() { while !token.is_cancelled() {
match self.send_request(&proofs_request, stop_receiver).await { match self.send_request(&proofs_request, token).await {
Ok(response) => { Ok(response) => {
// Parse the response using the ProofResponseParser // Parse the response using the ProofResponseParser
match ProofResponseParser::parse_response(response) { match ProofResponseParser::parse_response(response) {
@ -84,9 +84,7 @@ impl ProofFetcher {
} }
} }
tokio::time::timeout(backoff, stop_receiver.changed()) tokio::time::timeout(backoff, token.cancelled()).await.ok();
.await
.ok();
backoff = std::cmp::min( backoff = std::cmp::min(
Duration::from_millis( Duration::from_millis(
@ -95,13 +93,13 @@ impl ProofFetcher {
max_backoff, max_backoff,
); );
if *stop_receiver.borrow() { if token.is_cancelled() {
break; break;
} }
} }
// If we've reached this point, we've either been stopped or exhausted retries // If we've reached this point, we've either been stopped or exhausted retries
if *stop_receiver.borrow() { if token.is_cancelled() {
// Return empty vector if stopped // Return empty vector if stopped
Ok(vec![]) Ok(vec![])
} else { } else {
@ -114,7 +112,7 @@ impl ProofFetcher {
async fn send_request( async fn send_request(
&self, &self,
request: &GetProofsRequest, request: &GetProofsRequest,
stop_receiver: &mut watch::Receiver<bool>, token: &CancellationToken,
) -> Result<GetProofsResponse> { ) -> Result<GetProofsResponse> {
let retry_helper = RetryHelper::new(self.retry_config.clone()); let retry_helper = RetryHelper::new(self.retry_config.clone());
let request_clone = request.clone(); let request_clone = request.clone();
@ -128,7 +126,7 @@ impl ProofFetcher {
.await; .await;
// Check if we need to abort due to stop signal // Check if we need to abort due to stop signal
if *stop_receiver.borrow() { if token.is_cancelled() {
return Err(Error::Interrupted); return Err(Error::Interrupted);
} }

View file

@ -8,7 +8,7 @@ use crate::{
proof::Proof, proof::Proof,
verification::{AttestationVerifier, PolicyEnforcer, SignatureVerifier, VerificationReporter}, verification::{AttestationVerifier, PolicyEnforcer, SignatureVerifier, VerificationReporter},
}; };
use tokio::sync::watch; use tokio_util::sync::CancellationToken;
use zksync_basic_types::L1BatchNumber; use zksync_basic_types::L1BatchNumber;
/// Result of a batch verification /// Result of a batch verification
@ -40,7 +40,7 @@ impl<C: JsonRpcClient> BatchVerifier<C> {
/// Verify proofs for a batch /// Verify proofs for a batch
pub async fn verify_batch_proofs( pub async fn verify_batch_proofs(
&self, &self,
stop_receiver: &mut watch::Receiver<bool>, token: &CancellationToken,
batch_number: L1BatchNumber, batch_number: L1BatchNumber,
proofs: Vec<Proof>, proofs: Vec<Proof>,
) -> error::Result<BatchVerificationResult> { ) -> error::Result<BatchVerificationResult> {
@ -49,7 +49,7 @@ impl<C: JsonRpcClient> BatchVerifier<C> {
let mut verified_proofs_count: u32 = 0; let mut verified_proofs_count: u32 = 0;
for proof in proofs.into_iter() { for proof in proofs.into_iter() {
if *stop_receiver.borrow() { if token.is_cancelled() {
tracing::warn!("Stop signal received during batch verification"); tracing::warn!("Stop signal received during batch verification");
return Ok(BatchVerificationResult { return Ok(BatchVerificationResult {
total_count: total_proofs_count, total_count: total_proofs_count,