refactor(verify-era-proof-attestation): modularize and restructure proof verification logic

- Split `verify-era-proof-attestation` into modular subcomponents for maintainability.
- Moved client, proof handling, and core types into dedicated modules.
This commit is contained in:
Harald Hoyer 2025-04-02 16:03:01 +02:00
parent 1e853f653a
commit 2605e2ae3a
Signed by: harald
GPG key ID: F519A1143B3FBE32
34 changed files with 2918 additions and 2304 deletions

View file

@ -0,0 +1,118 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2023-2025 Matter Labs
//! Core functionality for processing individual batches
use crate::error;
use tokio::sync::watch;
use zksync_basic_types::L1BatchNumber;
use crate::{
client::{HttpClient, MainNodeClient, RetryConfig},
core::{VerificationResult, VerifierConfig},
proof::ProofFetcher,
verification::{BatchVerifier, VerificationReporter},
};
/// Responsible for processing individual batches
pub struct BatchProcessor {
config: VerifierConfig,
proof_fetcher: ProofFetcher,
batch_verifier: BatchVerifier<MainNodeClient>,
}
impl BatchProcessor {
/// Create a new batch processor with the given configuration
pub fn new(config: VerifierConfig) -> error::Result<Self> {
// Initialize clients and fetchers
let node_client = MainNodeClient::new(config.args.rpc_url.clone(), config.args.chain_id)?;
let http_client = HttpClient::new();
let retry_config = RetryConfig::default();
let proof_fetcher =
ProofFetcher::new(http_client, config.args.rpc_url.clone(), retry_config);
let batch_verifier = BatchVerifier::new(node_client, config.policy.clone());
Ok(Self {
config,
proof_fetcher,
batch_verifier,
})
}
/// Process a single batch and return the verification result
pub async fn process_batch(
&self,
stop_receiver: &mut watch::Receiver<bool>,
batch_number: L1BatchNumber,
) -> error::Result<VerificationResult> {
if *stop_receiver.borrow() {
tracing::info!("Stop signal received, shutting down");
return Ok(VerificationResult::Interrupted);
}
tracing::trace!("Verifying TEE proofs for batch #{}", batch_number.0);
// Fetch proofs for the current batch across different TEE types
let mut proofs = Vec::new();
for tee_type in self.config.args.tee_types.iter() {
match self
.proof_fetcher
.get_proofs(stop_receiver, batch_number, tee_type)
.await
{
Ok(batch_proofs) => proofs.extend(batch_proofs),
Err(error::Error::Interrupted) => return Err(error::Error::Interrupted),
Err(e) => {
tracing::error!(
"Failed to fetch proofs for TEE type {:?} at batch {}: {:#}",
tee_type,
batch_number.0,
e
);
continue;
}
}
}
if proofs.is_empty() {
tracing::warn!("No proofs found for batch #{}", batch_number.0);
return Ok(VerificationResult::NoProofsFound);
}
// Verify proofs for the current batch
let verification_result = self
.batch_verifier
.verify_batch_proofs(stop_receiver, batch_number, proofs)
.await?;
let result = if verification_result.total_count == 0 {
VerificationResult::NoProofsFound
} else if verification_result.verified_count == verification_result.total_count {
VerificationResult::Success
} else if verification_result.verified_count > 0 {
VerificationResult::PartialSuccess {
verified_count: verification_result.verified_count,
unverified_count: verification_result.unverified_count,
}
} else {
VerificationResult::Failure
};
tracing::debug!("Batch #{} verification result: {}", batch_number.0, result);
// Apply rate limiting between batches if needed
if !matches!(result, VerificationResult::Interrupted)
&& self.config.args.rate_limit.as_millis() > 0
{
tokio::time::timeout(self.config.args.rate_limit, stop_receiver.changed())
.await
.ok();
}
Ok(result)
}
/// Log the overall verification results
pub fn log_overall_results(success_count: u32, failure_count: u32) {
VerificationReporter::log_overall_verification_results(success_count, failure_count);
}
}

View file

@ -0,0 +1,95 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2023-2025 Matter Labs
//! Continuous batch processor for ongoing verification of new batches
use tokio::sync::watch;
use zksync_basic_types::L1BatchNumber;
use crate::{
core::{VerificationResult, VerifierConfig},
error,
processor::BatchProcessor,
};
/// Processes batches continuously until stopped
pub struct ContinuousProcessor {
batch_processor: BatchProcessor,
start_batch: L1BatchNumber,
}
impl ContinuousProcessor {
/// Create a new continuous processor that starts from the given batch
pub fn new(config: VerifierConfig, start_batch: L1BatchNumber) -> error::Result<Self> {
let batch_processor = BatchProcessor::new(config)?;
Ok(Self {
batch_processor,
start_batch,
})
}
/// Run the processor until stopped
pub async fn run(
&self,
mut stop_receiver: watch::Receiver<bool>,
) -> error::Result<Vec<(u32, VerificationResult)>> {
tracing::info!(
"Starting continuous verification from batch {}",
self.start_batch.0
);
let mut results = Vec::new();
let mut success_count = 0;
let mut failure_count = 0;
let mut current_batch = self.start_batch.0;
// Continue processing batches until stopped or reaching maximum batch number
while !*stop_receiver.borrow() {
let batch = L1BatchNumber(current_batch);
match self
.batch_processor
.process_batch(&mut stop_receiver, batch)
.await
{
Ok(result) => {
match result {
VerificationResult::Success => success_count += 1,
VerificationResult::PartialSuccess { .. } => success_count += 1,
VerificationResult::Failure => failure_count += 1,
VerificationResult::Interrupted => {
results.push((current_batch, result));
break;
}
VerificationResult::NoProofsFound => {
// In continuous mode, we might hit batches that don't have proofs yet
// Wait a bit longer before retrying
if !*stop_receiver.borrow() {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
// Don't increment batch number, try again
continue;
}
}
}
results.push((current_batch, result));
}
Err(e) => {
tracing::error!("Error processing batch {}: {}", current_batch, e);
results.push((current_batch, VerificationResult::Failure));
failure_count += 1;
}
}
// Move to the next batch
current_batch = current_batch
.checked_add(1)
.ok_or(error::Error::internal("Maximum batch number reached"))?;
}
// Log overall results
BatchProcessor::log_overall_results(success_count, failure_count);
Ok(results)
}
}

View file

@ -0,0 +1,65 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2023-2025 Matter Labs
//! Processing logic for batch verification
mod batch_processor;
mod continuous_processor;
mod one_shot_processor;
pub use batch_processor::BatchProcessor;
pub use continuous_processor::ContinuousProcessor;
pub use one_shot_processor::OneShotProcessor;
use crate::{
core::{VerificationResult, VerifierConfig, VerifierMode},
error::Result,
};
use tokio::sync::watch;
// Using an enum instead of a trait because async functions in traits can't be used in trait objects
/// Processor variants for different verification modes
pub enum ProcessorType {
/// One-shot processor for processing a specific range of batches
OneShot(OneShotProcessor),
/// Continuous processor for monitoring new batches
Continuous(ContinuousProcessor),
}
impl ProcessorType {
/// Run the processor until completion or interruption
pub async fn run(
&self,
stop_receiver: watch::Receiver<bool>,
) -> Result<Vec<(u32, VerificationResult)>> {
match self {
ProcessorType::OneShot(processor) => processor.run(stop_receiver).await,
ProcessorType::Continuous(processor) => processor.run(stop_receiver).await,
}
}
}
/// Factory for creating the appropriate processor based on configuration
pub struct ProcessorFactory;
impl ProcessorFactory {
/// Create a new processor based on the provided configuration
pub fn create(config: VerifierConfig) -> Result<(ProcessorType, VerifierMode)> {
let mode = if let Some((start, end)) = config.args.batch_range {
let processor = OneShotProcessor::new(config.clone(), start, end)?;
let mode = VerifierMode::OneShot {
start_batch: start,
end_batch: end,
};
(ProcessorType::OneShot(processor), mode)
} else if let Some(start) = config.args.continuous {
let processor = ContinuousProcessor::new(config.clone(), start)?;
let mode = VerifierMode::Continuous { start_batch: start };
(ProcessorType::Continuous(processor), mode)
} else {
unreachable!("Clap ArgGroup should ensure either batch_range or continuous is set")
};
Ok(mode)
}
}

View file

@ -0,0 +1,79 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2023-2025 Matter Labs
//! One-shot batch processor for verifying a single batch or a range of batches
use crate::error;
use tokio::sync::watch;
use zksync_basic_types::L1BatchNumber;
use crate::{
core::{VerificationResult, VerifierConfig},
processor::BatchProcessor,
};
/// Processes a specific range of batches and then exits
pub struct OneShotProcessor {
batch_processor: BatchProcessor,
start_batch: L1BatchNumber,
end_batch: L1BatchNumber,
}
impl OneShotProcessor {
/// Create a new one-shot processor for the given batch range
pub fn new(
config: VerifierConfig,
start_batch: L1BatchNumber,
end_batch: L1BatchNumber,
) -> error::Result<Self> {
let batch_processor = BatchProcessor::new(config)?;
Ok(Self {
batch_processor,
start_batch,
end_batch,
})
}
/// Run the processor until completion or interruption
pub async fn run(
&self,
mut stop_receiver: watch::Receiver<bool>,
) -> error::Result<Vec<(u32, VerificationResult)>> {
tracing::info!(
"Starting one-shot verification of batches {} to {}",
self.start_batch.0,
self.end_batch.0
);
let mut results = Vec::new();
let mut success_count = 0;
let mut failure_count = 0;
for batch_number in self.start_batch.0..=self.end_batch.0 {
let batch = L1BatchNumber(batch_number);
let result = self
.batch_processor
.process_batch(&mut stop_receiver, batch)
.await?;
match result {
VerificationResult::Success => success_count += 1,
VerificationResult::PartialSuccess { .. } => success_count += 1,
VerificationResult::Failure => failure_count += 1,
VerificationResult::Interrupted => {
results.push((batch_number, result));
break;
}
VerificationResult::NoProofsFound => {}
}
results.push((batch_number, result));
}
// Log overall results
BatchProcessor::log_overall_results(success_count, failure_count);
Ok(results)
}
}