diff --git a/Cargo.lock b/Cargo.lock index 9d167d8b7..789dca9b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7792,6 +7792,7 @@ dependencies = [ "serde_qs", "serial_test", "sha2", + "snapchain-proto", "solar-ast", "solar-config", "solar-data-structures", @@ -7809,7 +7810,6 @@ dependencies = [ "tokio-stream", "toml 0.8.23", "tonic", - "tonic-build", "tracing", "tracing-subscriber", "tracing-test", @@ -7817,6 +7817,19 @@ dependencies = [ "walkdir", ] +[[package]] +name = "snapchain-proto" +version = "0.11.0" +dependencies = [ + "futures-core", + "hex", + "informalsystems-malachitebft-core-types", + "prost 0.13.5", + "serde", + "tonic", + "tonic-build", +] + [[package]] name = "snow" version = "0.9.6" diff --git a/Cargo.toml b/Cargo.toml index 40ed8f22e..0358c5f5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,7 @@ +[workspace] +members = ["proto"] +resolver = "2" + [package] name = "snapchain" version = "0.11.0" @@ -7,7 +11,9 @@ default-run = "snapchain" [lib] name = "snapchain" path = "src/lib.rs" + [dependencies] +snapchain-proto = { path = "proto" } tokio = { version = "1.40.0", features = ["full"] } tokio-stream = "0.1" serde = { version = "1.0", features = ["derive"] } @@ -98,8 +104,6 @@ solar-macros = "=0.1.1" solar-config = "=0.1.1" nix = { version = "0.29", features = ["resource"] } -[build-dependencies] -tonic-build = "0.9.2" [dev-dependencies] serial_test = "3.1.1" @@ -108,3 +112,4 @@ insta = { version = "1.40", features = ["json"] } [package.metadata.precommit] fmt = "cargo fmt --check --quiet" + diff --git a/Dockerfile b/Dockerfile index 96ca70ca0..f63ba64b7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -30,7 +30,8 @@ EOF # since the Cargo configuration references files in src. # This means we'll re-fetch all crates every time the source code changes, # which isn't ideal. -COPY Cargo.lock Cargo.toml build.rs ./ +COPY Cargo.lock Cargo.toml ./ +COPY proto ./proto COPY src ./src ENV RUST_BACKTRACE=full @@ -59,7 +60,7 @@ RUN < Result<(), Box> { // TODO: auto-discover proto files builder.compile( &[ - "src/proto/admin_rpc.proto", - "src/proto/blocks.proto", - "src/proto/rpc.proto", - "src/proto/message.proto", - "src/proto/onchain_event.proto", - "src/proto/hub_event.proto", - "src/proto/username_proof.proto", - "src/proto/sync_trie.proto", - "src/proto/node_state.proto", - "src/proto/gossip.proto", - "src/proto/request_response.proto", - "src/proto/replication.proto", + "definitions/admin_rpc.proto", + "definitions/blocks.proto", + "definitions/rpc.proto", + "definitions/message.proto", + "definitions/onchain_event.proto", + "definitions/hub_event.proto", + "definitions/username_proof.proto", + "definitions/sync_trie.proto", + "definitions/node_state.proto", + "definitions/gossip.proto", + "definitions/request_response.proto", + "definitions/replication.proto", ], - &["src/proto"], + &["definitions"], )?; Ok(()) diff --git a/src/proto/admin_rpc.proto b/proto/definitions/admin_rpc.proto similarity index 100% rename from src/proto/admin_rpc.proto rename to proto/definitions/admin_rpc.proto diff --git a/src/proto/blocks.proto b/proto/definitions/blocks.proto similarity index 100% rename from src/proto/blocks.proto rename to proto/definitions/blocks.proto diff --git a/src/proto/gossip.proto b/proto/definitions/gossip.proto similarity index 100% rename from src/proto/gossip.proto rename to proto/definitions/gossip.proto diff --git a/src/proto/hub_event.proto b/proto/definitions/hub_event.proto similarity index 100% rename from src/proto/hub_event.proto rename to proto/definitions/hub_event.proto diff --git a/src/proto/message.proto b/proto/definitions/message.proto similarity index 100% rename from src/proto/message.proto rename to proto/definitions/message.proto diff --git a/src/proto/node_state.proto b/proto/definitions/node_state.proto similarity index 100% rename from src/proto/node_state.proto rename to proto/definitions/node_state.proto diff --git a/src/proto/onchain_event.proto b/proto/definitions/onchain_event.proto similarity index 100% rename from src/proto/onchain_event.proto rename to proto/definitions/onchain_event.proto diff --git a/src/proto/replication.proto b/proto/definitions/replication.proto similarity index 100% rename from src/proto/replication.proto rename to proto/definitions/replication.proto diff --git a/src/proto/request_response.proto b/proto/definitions/request_response.proto similarity index 100% rename from src/proto/request_response.proto rename to proto/definitions/request_response.proto diff --git a/src/proto/rpc.proto b/proto/definitions/rpc.proto similarity index 100% rename from src/proto/rpc.proto rename to proto/definitions/rpc.proto diff --git a/src/proto/sync_trie.proto b/proto/definitions/sync_trie.proto similarity index 100% rename from src/proto/sync_trie.proto rename to proto/definitions/sync_trie.proto diff --git a/src/proto/username_proof.proto b/proto/definitions/username_proof.proto similarity index 100% rename from src/proto/username_proof.proto rename to proto/definitions/username_proof.proto diff --git a/proto/src/lib.rs b/proto/src/lib.rs new file mode 100644 index 000000000..bc03893b9 --- /dev/null +++ b/proto/src/lib.rs @@ -0,0 +1,217 @@ +use core::fmt; + +pub mod proto { + tonic::include_proto!("_"); +} + +pub use proto::*; + +// Basic impls for proto types that don't depend on main crate + +impl informalsystems_malachitebft_core_types::Height for proto::Height { + const ZERO: Self = Self::new(0, 0); + const INITIAL: Self = Self::new(0, 1); + + fn increment(&self) -> Self { + self.increment() + } + + fn as_u64(&self) -> u64 { + self.block_number + } + + fn increment_by(&self, n: u64) -> Self { + self.increment_by(n) + } + + fn decrement_by(&self, n: u64) -> Option { + self.decrement_by(n) + } +} + +impl informalsystems_malachitebft_core_types::Value for proto::ShardHash { + type Id = proto::ShardHash; + + fn id(&self) -> Self::Id { + self.clone() + } +} + +impl proto::Height { + pub const fn new(shard_index: u32, block_number: u64) -> Self { + Self { + shard_index, + block_number, + } + } + + pub const fn as_u64(&self) -> u64 { + self.block_number + } + + pub const fn increment(&self) -> Self { + self.increment_by(1) + } + + pub const fn increment_by(&self, n: u64) -> Self { + Self { + shard_index: self.shard_index, + block_number: self.block_number + n, + } + } + + pub fn decrement(&self) -> Option { + self.block_number.checked_sub(1).map(|block_number| Self { + shard_index: self.shard_index, + block_number, + }) + } + + pub fn decrement_by(&self, n: u64) -> Option { + self.block_number.checked_sub(n).map(|block_number| Self { + shard_index: self.shard_index, + block_number, + }) + } +} + +impl fmt::Display for proto::Height { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "[{}] {}", self.shard_index, self.block_number) + } +} + +impl fmt::Display for proto::ShardHash { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "[{}] {:?}", self.shard_index, hex::encode(&self.hash)) + } +} + +impl proto::BlockEvent { + pub fn seqnum(&self) -> u64 { + self.data.as_ref().unwrap().seqnum + } + + pub fn block_number(&self) -> u64 { + self.data.as_ref().unwrap().block_number + } + + pub fn block_timestamp(&self) -> u64 { + self.data.as_ref().unwrap().block_timestamp + } + + pub fn event_index(&self) -> u64 { + self.data.as_ref().unwrap().event_index + } +} + +impl proto::Message { + pub fn is_type(&self, message_type: proto::MessageType) -> bool { + self.data.is_some() && self.data.as_ref().unwrap().r#type == message_type as i32 + } + + pub fn fid(&self) -> u64 { + if self.data.is_some() { + self.data.as_ref().unwrap().fid + } else { + 0 + } + } + + pub fn msg_type(&self) -> proto::MessageType { + if self.data.is_some() { + proto::MessageType::try_from(self.data.as_ref().unwrap().r#type) + .unwrap_or(proto::MessageType::None) + } else { + proto::MessageType::None + } + } + + pub fn hex_hash(&self) -> String { + hex::encode(&self.hash) + } +} + +// Make malachite happy. Prost already implements PartialEq, should be safe to mark as Eq. +impl Eq for proto::FullProposal {} + +impl proto::FullProposal { + pub fn shard_id(&self) -> Result { + if let Some(height) = &self.height { + Ok(height.shard_index) + } else { + Err("No height in FullProposal".to_string()) + } + } + + pub fn shard_hash(&self) -> proto::ShardHash { + match &self.proposed_value { + Some(proto::full_proposal::ProposedValue::Block(block)) => proto::ShardHash { + shard_index: self.height().shard_index as u32, + hash: block.hash.clone(), + }, + Some(proto::full_proposal::ProposedValue::Shard(shard_chunk)) => proto::ShardHash { + shard_index: self.height().shard_index as u32, + hash: shard_chunk.hash.clone(), + }, + _ => { + panic!("Invalid proposal type"); + } + } + } + + pub fn block(&self, commits: proto::Commits) -> Option { + match &self.proposed_value { + Some(proto::full_proposal::ProposedValue::Block(block)) => { + let mut block = block.clone(); + block.commits = Some(commits); + Some(block) + } + _ => None, + } + } + + pub fn shard_chunk(&self, commits: proto::Commits) -> Option { + match &self.proposed_value { + Some(proto::full_proposal::ProposedValue::Shard(chunk)) => { + let mut chunk = chunk.clone(); + chunk.commits = Some(commits); + Some(chunk) + } + _ => None, + } + } + + pub fn height(&self) -> proto::Height { + self.height.clone().unwrap() + } + + pub fn round(&self) -> informalsystems_malachitebft_core_types::Round { + informalsystems_malachitebft_core_types::Round::new(self.round.try_into().unwrap()) + } + + pub fn to_sign_bytes(&self) -> Vec { + use prost::Message; + self.encode_to_vec() + } +} + +impl proto::ConsensusMessage { + pub fn shard_id(&self) -> Result { + if let Some(msg) = &self.consensus_message { + match msg { + proto::consensus_message::ConsensusMessage::Vote(vote) => { + if let Some(height) = &vote.height { + return Ok(height.shard_index); + } + } + proto::consensus_message::ConsensusMessage::Proposal(vote) => { + if let Some(height) = &vote.height { + return Ok(height.shard_index); + } + } + } + } + Err("Could not determine shard id for ConsensusMessage".to_string()) + } +} diff --git a/src/consensus/malachite/host.rs b/src/consensus/malachite/host.rs index f72e846cc..70a615d1a 100644 --- a/src/consensus/malachite/host.rs +++ b/src/consensus/malachite/host.rs @@ -1,7 +1,7 @@ //! Implementation of a host actor for bridiging consensus and the application via a set of channels. use crate::consensus::validator::{ProposalSource, ShardValidator}; -use crate::core::types::SnapchainValidatorContext; +use crate::core::types::{CommitsExt, FullProposalExt, SnapchainValidatorContext}; use crate::network::gossip::GossipEvent; use crate::proto::{self, decided_value, full_proposal, Block, Commits, FullProposal, ShardChunk}; use crate::utils::statsd_wrapper::StatsdClientWrapper; diff --git a/src/consensus/malachite/snapchain_codec.rs b/src/consensus/malachite/snapchain_codec.rs index 45637a74f..2b0472d0f 100644 --- a/src/consensus/malachite/snapchain_codec.rs +++ b/src/consensus/malachite/snapchain_codec.rs @@ -1,4 +1,4 @@ -use crate::core::types::{Proposal, Signature, SnapchainValidatorContext, Vote}; +use crate::core::types::{CommitsExt, Proposal, Signature, SnapchainValidatorContext, Vote}; use crate::proto::sync_request::SyncRequest; use crate::proto::{self}; use crate::proto::{consensus_message, ConsensusMessage, FullProposal, StatusMessage}; diff --git a/src/consensus/read_validator.rs b/src/consensus/read_validator.rs index 552f63fb0..d9aa5c7a6 100644 --- a/src/consensus/read_validator.rs +++ b/src/consensus/read_validator.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use super::validator::StoredValidatorSets; use crate::consensus::consensus::SystemMessage; -use crate::core::types::SnapchainValidatorContext; +use crate::core::types::{CommitsExt, SnapchainValidatorContext}; use crate::core::util::{verify_signatures, FarcasterTime}; use crate::proto::{self, DecidedValue, FarcasterNetwork, Height}; use crate::storage::store::block_engine::BlockEngine; diff --git a/src/consensus/validator.rs b/src/consensus/validator.rs index 1f31ddc3b..15c38deb7 100644 --- a/src/consensus/validator.rs +++ b/src/consensus/validator.rs @@ -1,8 +1,8 @@ use super::consensus::ValidatorSetConfig; use crate::consensus::proposer::{BlockProposer, Proposer, ShardProposer}; use crate::core::types::{ - Address, Height, ShardId, SnapchainShard, SnapchainValidator, SnapchainValidatorContext, - SnapchainValidatorSet, + Address, FullProposalExt, Height, ShardId, SnapchainShard, SnapchainValidator, + SnapchainValidatorContext, SnapchainValidatorSet, }; use crate::proto::{full_proposal, Commits, FullProposal, ShardHash}; use crate::storage::store::node_local_state::LocalStateStore; diff --git a/src/core/message.rs b/src/core/message.rs index 1979754e7..f5252994f 100644 --- a/src/core/message.rs +++ b/src/core/message.rs @@ -1,67 +1,20 @@ use crate::core::error::HubError; use crate::core::types::{Shardable, SnapchainValidatorContext}; use crate::proto; -use crate::proto::{consensus_message, ConsensusMessage, HubEvent, MessageType}; +use crate::proto::HubEvent; use crate::storage::store::engine::MessageValidationError; -impl proto::Message { - pub fn is_type(&self, message_type: proto::MessageType) -> bool { - self.data.is_some() && self.data.as_ref().unwrap().r#type == message_type as i32 - } +// impl proto::Message, impl proto::FullProposal::shard_id, impl ConsensusMessage::shard_id +// are now in the snapchain-proto crate - pub fn fid(&self) -> u64 { - if self.data.is_some() { - self.data.as_ref().unwrap().fid - } else { - 0 - } - } - - pub fn msg_type(&self) -> MessageType { - if self.data.is_some() { - MessageType::try_from(self.data.as_ref().unwrap().r#type).unwrap_or(MessageType::None) - } else { - MessageType::None - } - } - - pub fn hex_hash(&self) -> String { - hex::encode(&self.hash) - } -} - -impl proto::FullProposal { - pub fn shard_id(&self) -> Result { - if let Some(height) = &self.height { - Ok(height.shard_index) - } else { - Err("No height in FullProposal".to_string()) - } - } -} - -impl ConsensusMessage { - pub fn shard_id(&self) -> Result { - if let Some(msg) = &self.consensus_message { - match msg { - consensus_message::ConsensusMessage::Vote(vote) => { - if let Some(height) = &vote.height { - return Ok(height.shard_index); - } - } - consensus_message::ConsensusMessage::Proposal(vote) => { - if let Some(height) = &vote.height { - return Ok(height.shard_index); - } - } - } - } - Err("Could not determine shard id for ConsensusMessage".to_string()) - } +// Extension trait for HubEvent that depends on main crate types (HubError, MessageValidationError) +pub trait HubEventExt { + fn new_event(event_type: proto::HubEventType, body: proto::hub_event::Body) -> HubEvent; + fn from_validation_error(err: MessageValidationError, message: &proto::Message) -> HubEvent; } -impl proto::HubEvent { - pub fn from(event_type: proto::HubEventType, body: proto::hub_event::Body) -> Self { +impl HubEventExt for HubEvent { + fn new_event(event_type: proto::HubEventType, body: proto::hub_event::Body) -> HubEvent { proto::HubEvent { r#type: event_type as i32, body: Some(body), @@ -74,12 +27,12 @@ impl proto::HubEvent { } } - pub fn from_validation_error(err: MessageValidationError, message: &proto::Message) -> Self { + fn from_validation_error(err: MessageValidationError, message: &proto::Message) -> HubEvent { let merge_error = match err.clone() { MessageValidationError::StoreError(hub_err) => hub_err, _ => HubError::validation_failure(err.to_string().as_str()), }; - HubEvent::from( + HubEvent::new_event( proto::HubEventType::MergeFailure, proto::hub_event::Body::MergeFailure(proto::MergeFailureBody { message: Some(message.clone()), diff --git a/src/core/mod.rs b/src/core/mod.rs index 2c9f90ec7..e26d4aaab 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -1,5 +1,9 @@ pub mod error; -mod message; +pub mod message; pub mod types; pub mod util; pub mod validations; + +// Re-export extension traits for convenience +pub use message::HubEventExt; +pub use types::{CommitsExt, FullProposalExt}; diff --git a/src/core/types.rs b/src/core/types.rs index 1061eee1d..6b95ff482 100644 --- a/src/core/types.rs +++ b/src/core/types.rs @@ -15,8 +15,7 @@ use tracing::{error, warn}; pub use crate::proto; // TODO: reconsider how this is imported -use crate::proto::full_proposal::ProposedValue; -use crate::proto::{Block, Commits, FullProposal, ShardChunk}; +use crate::proto::{Commits, FullProposal}; pub use proto::Height; pub use proto::ShardHash; @@ -276,151 +275,18 @@ impl Display for Hash { } } -impl Height { - pub const fn new(shard_index: u32, block_number: u64) -> Self { - Self { - shard_index, - block_number, - } - } - - pub const fn as_u64(&self) -> u64 { - self.block_number - } - - pub const fn increment(&self) -> Self { - self.increment_by(1) - } - - pub const fn increment_by(&self, n: u64) -> Self { - Self { - shard_index: self.shard_index, - block_number: self.block_number + n, - } - } - - pub fn decrement(&self) -> Option { - self.block_number.checked_sub(1).map(|block_number| Self { - shard_index: self.shard_index, - block_number, - }) - } - - pub fn decrement_by(&self, n: u64) -> Option { - self.block_number.checked_sub(n).map(|block_number| Self { - shard_index: self.shard_index, - block_number, - }) - } -} - -impl fmt::Display for Height { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "[{}] {}", self.shard_index, self.block_number) - } -} - -impl informalsystems_malachitebft_core_types::Height for Height { - const ZERO: Self = Self::new(0, 0); - const INITIAL: Self = Self::new(0, 1); - - fn increment(&self) -> Self { - self.increment() - } - - fn as_u64(&self) -> u64 { - self.block_number - } - - fn increment_by(&self, n: u64) -> Self { - self.increment_by(n) - } - - fn decrement_by(&self, n: u64) -> Option { - self.decrement_by(n) - } -} - -// #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] -// pub struct ShardHash { -// shard_index: u8, -// hash: Hash, -// } - -impl fmt::Display for ShardHash { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "[{}] {:?}", self.shard_index, hex::encode(&self.hash)) - } -} - -// impl ShardHash { -// pub fn new(shard_id: u8, hash: Hash) -> Self { -// Self { shard_id, hash } -// } -// } +// Malachite Height and Value trait impls are now in snapchain-proto crate +// Most FullProposal methods are now in snapchain-proto crate -impl informalsystems_malachitebft_core_types::Value for ShardHash { - type Id = ShardHash; - - fn id(&self) -> Self::Id { - self.clone() - } +// Extension trait for FullProposal that depends on main crate types (Address) +pub trait FullProposalExt { + fn proposer_address(&self) -> Address; } -impl FullProposal { - pub fn shard_hash(&self) -> ShardHash { - match &self.proposed_value { - Some(ProposedValue::Block(block)) => ShardHash { - shard_index: self.height().shard_index as u32, - hash: block.hash.clone(), - }, - Some(ProposedValue::Shard(shard_chunk)) => ShardHash { - shard_index: self.height().shard_index as u32, - hash: shard_chunk.hash.clone(), - }, - _ => { - panic!("Invalid proposal type"); - } - } - } - - pub fn block(&self, commits: Commits) -> Option { - match &self.proposed_value { - Some(ProposedValue::Block(block)) => { - let mut block = block.clone(); - block.commits = Some(commits); - Some(block) - } - _ => None, - } - } - - pub fn shard_chunk(&self, commits: Commits) -> Option { - match &self.proposed_value { - Some(ProposedValue::Shard(chunk)) => { - let mut chunk = chunk.clone(); - chunk.commits = Some(commits); - Some(chunk) - } - _ => None, - } - } - - pub fn proposer_address(&self) -> Address { +impl FullProposalExt for FullProposal { + fn proposer_address(&self) -> Address { Address::from_vec(self.proposer.clone()) } - - pub fn height(&self) -> Height { - self.height.clone().unwrap() - } - - pub fn round(&self) -> Round { - Round::new(self.round.try_into().unwrap()) - } - - pub fn to_sign_bytes(&self) -> Vec { - self.encode_to_vec() - } } #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] @@ -721,8 +587,7 @@ impl informalsystems_malachitebft_core_types::ProposalPart for Proposal { fn height(&self) -> Height { @@ -828,26 +693,20 @@ impl informalsystems_malachitebft_core_types::Validator u64 { - self.data.as_ref().unwrap().seqnum - } - - pub fn block_number(&self) -> u64 { - self.data.as_ref().unwrap().block_number - } - - pub fn block_timestamp(&self) -> u64 { - self.data.as_ref().unwrap().block_timestamp - } - - pub fn event_index(&self) -> u64 { - self.data.as_ref().unwrap().event_index - } +// Extension trait for Commits that depends on main crate types +pub trait CommitsExt { + fn to_commit_certificate( + &self, + ) -> informalsystems_malachitebft_core_types::CommitCertificate; + fn from_commit_certificate( + certificate: &informalsystems_malachitebft_core_types::CommitCertificate< + SnapchainValidatorContext, + >, + ) -> Commits; } -impl proto::Commits { - pub fn to_commit_certificate( +impl CommitsExt for Commits { + fn to_commit_certificate( &self, ) -> informalsystems_malachitebft_core_types::CommitCertificate { let height = self.height.unwrap(); @@ -871,11 +730,11 @@ impl proto::Commits { } } - pub fn from_commit_certificate( + fn from_commit_certificate( certificate: &informalsystems_malachitebft_core_types::CommitCertificate< SnapchainValidatorContext, >, - ) -> Self { + ) -> Commits { let height = Some(certificate.height.clone()); let round = certificate.round.as_i64(); let value = Some(certificate.value_id.clone()); diff --git a/src/core/util.rs b/src/core/util.rs index f4893eb86..1e1b937ce 100644 --- a/src/core/util.rs +++ b/src/core/util.rs @@ -3,7 +3,7 @@ use libp2p::identity::ed25519::PublicKey; use crate::consensus::validator::StoredValidatorSets; use crate::core::error::HubError; -use crate::core::types::{Vote, FARCASTER_EPOCH}; +use crate::core::types::{CommitsExt, Vote, FARCASTER_EPOCH}; use crate::proto::{self}; use itertools::Itertools; use tracing::error; diff --git a/src/lib.rs b/src/lib.rs index 1b75d686b..0dd11923d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,4 @@ pub mod version; mod tests; -pub mod proto { - tonic::include_proto!("_"); -} +pub use snapchain_proto::proto; diff --git a/src/mempool/mempool.rs b/src/mempool/mempool.rs index 9edc55643..235652a23 100644 --- a/src/mempool/mempool.rs +++ b/src/mempool/mempool.rs @@ -171,8 +171,13 @@ impl MempoolKey { } } -impl proto::Message { - pub fn mempool_key(&self) -> MempoolKey { +// Extension traits for proto types that depend on mempool types +pub trait MessageMempoolExt { + fn mempool_key(&self) -> MempoolKey; +} + +impl MessageMempoolExt for proto::Message { + fn mempool_key(&self) -> MempoolKey { if let Some(data) = &self.data { // TODO: Consider revisiting choice of timestamp here as backdated messages currently are prioritized. return MempoolKey::new( @@ -185,8 +190,12 @@ impl proto::Message { } } -impl proto::ValidatorMessage { - pub fn mempool_key(&self) -> MempoolKey { +pub trait ValidatorMessageMempoolExt { + fn mempool_key(&self) -> MempoolKey; +} + +impl ValidatorMessageMempoolExt for proto::ValidatorMessage { + fn mempool_key(&self) -> MempoolKey { if let Some(onchain_event) = &self.on_chain_event { MempoolKey::new( MempoolMessageKind::ValidatorMessage, diff --git a/src/network/rpc_extensions.rs b/src/network/rpc_extensions.rs index 78f3e65f6..33ba07329 100644 --- a/src/network/rpc_extensions.rs +++ b/src/network/rpc_extensions.rs @@ -77,18 +77,28 @@ fn page_options( } } -impl FidRequest { - pub fn page_options(&self) -> PageOptions { +// Extension traits for RPC request types +pub trait FidRequestExt { + fn page_options(&self) -> PageOptions; +} + +impl FidRequestExt for FidRequest { + fn page_options(&self) -> PageOptions { page_options(self.page_size, self.page_token.clone(), self.reverse) } } -impl FidTimestampRequest { - pub fn page_options(&self) -> PageOptions { +pub trait FidTimestampRequestExt { + fn page_options(&self) -> PageOptions; + fn timestamps(&self) -> (Option, Option); +} + +impl FidTimestampRequestExt for FidTimestampRequest { + fn page_options(&self) -> PageOptions { page_options(self.page_size, self.page_token.clone(), self.reverse) } - pub fn timestamps(&self) -> (Option, Option) { + fn timestamps(&self) -> (Option, Option) { let start_timestamp = match self.start_timestamp { Some(ts) => Some(ts as u32), None => None, @@ -101,20 +111,32 @@ impl FidTimestampRequest { } } -impl CastsByParentRequest { - pub fn page_options(&self) -> PageOptions { +pub trait CastsByParentRequestExt { + fn page_options(&self) -> PageOptions; +} + +impl CastsByParentRequestExt for CastsByParentRequest { + fn page_options(&self) -> PageOptions { page_options(self.page_size, self.page_token.clone(), self.reverse) } } -impl ReactionsByFidRequest { - pub fn page_options(&self) -> PageOptions { +pub trait ReactionsByFidRequestExt { + fn page_options(&self) -> PageOptions; +} + +impl ReactionsByFidRequestExt for ReactionsByFidRequest { + fn page_options(&self) -> PageOptions { page_options(self.page_size, self.page_token.clone(), self.reverse) } } -impl LinksByFidRequest { - pub fn page_options(&self) -> PageOptions { +pub trait LinksByFidRequestExt { + fn page_options(&self) -> PageOptions; +} + +impl LinksByFidRequestExt for LinksByFidRequest { + fn page_options(&self) -> PageOptions { page_options(self.page_size, self.page_token.clone(), self.reverse) } } diff --git a/src/network/server.rs b/src/network/server.rs index bc766bbfd..fadee7393 100644 --- a/src/network/server.rs +++ b/src/network/server.rs @@ -1,4 +1,7 @@ -use super::rpc_extensions::{authenticate_request, AsMessagesResponse, AsSingleMessageResponse}; +use super::rpc_extensions::{ + authenticate_request, AsMessagesResponse, AsSingleMessageResponse, FidRequestExt, + FidTimestampRequestExt, LinksByFidRequestExt, ReactionsByFidRequestExt, +}; use crate::connectors::onchain_events::{Chain, ChainClients}; use crate::core::error::HubError; use crate::core::types::SnapchainValidatorContext; diff --git a/src/network/server_tests.rs b/src/network/server_tests.rs index fe9239ee6..699c61b6f 100644 --- a/src/network/server_tests.rs +++ b/src/network/server_tests.rs @@ -24,7 +24,7 @@ mod tests { }; use crate::proto::{FidRequest, SubscribeRequest}; use crate::storage::db::{RocksDB, RocksDbTransactionBatch}; - use crate::storage::store::account::{HubEventIdGenerator, SEQUENCE_BITS}; + use crate::storage::store::account::{HubEventIdGenerator, HubEventStorageExt, SEQUENCE_BITS}; use crate::storage::store::block_engine::BlockEngine; use crate::storage::store::block_engine_test_helpers::{BlockEngineOptions, Validity}; use crate::storage::store::engine::{Senders, ShardEngine}; @@ -46,15 +46,13 @@ mod tests { const USER_NAME: &str = "user"; const PASSWORD: &str = "password"; - impl FidRequest { - fn for_fid(fid: u64) -> Request { - Request::new(FidRequest { - fid, - page_size: None, - page_token: None, - reverse: None, - }) - } + fn fid_request(fid: u64) -> Request { + Request::new(FidRequest { + fid, + page_size: None, + page_token: None, + reverse: None, + }) } struct MockL1Client {} @@ -1495,7 +1493,7 @@ mod tests { ) = make_server(None).await; let response = service - .get_current_storage_limits_by_fid(FidRequest::for_fid(SHARD1_FID)) + .get_current_storage_limits_by_fid(fid_request(SHARD1_FID)) .await .unwrap(); assert_eq!(response.get_ref().units, 0); @@ -1563,7 +1561,7 @@ mod tests { .await; let response = service - .get_current_storage_limits_by_fid(FidRequest::for_fid(SHARD1_FID)) + .get_current_storage_limits_by_fid(fid_request(SHARD1_FID)) .await .unwrap(); assert_eq!(response.get_ref().units, 3); diff --git a/src/storage/store/account/cast_store_test.rs b/src/storage/store/account/cast_store_test.rs index 6a0f99703..d83a18db8 100644 --- a/src/storage/store/account/cast_store_test.rs +++ b/src/storage/store/account/cast_store_test.rs @@ -6,7 +6,7 @@ mod tests { use crate::proto::{self as message, hub_event, CastType, HubEvent, HubEventType}; use crate::storage::db::{PageOptions, RocksDB, RocksDbTransactionBatch}; use crate::storage::store::account::{ - CastStore, CastStoreDef, Store, StoreEventHandler, StoreOptions, + CastStore, CastStoreDef, HubEventStorageExt, Store, StoreEventHandler, StoreOptions, }; use crate::utils::factory::{messages_factory, time}; use std::sync::Arc; diff --git a/src/storage/store/account/event.rs b/src/storage/store/account/event.rs index 141a6ff1f..57d80d9f9 100644 --- a/src/storage/store/account/event.rs +++ b/src/storage/store/account/event.rs @@ -124,21 +124,42 @@ impl StoreEventHandler { } } -impl HubEvent { - fn make_event_key(event_id: u64) -> Vec { - let mut key = Vec::with_capacity(1 + 8); +// Extension trait for HubEvent storage operations +pub trait HubEventStorageExt { + fn put_event_transaction( + txn: &mut RocksDbTransactionBatch, + event: &HubEvent, + ) -> Result<(), HubError>; + fn get_events( + db: Arc, + start_id: u64, + stop_id: Option, + page_options: Option, + ) -> Result; + fn get_event(db: Arc, event_id: u64) -> Result; + fn prune_events_util( + db: Arc, + stop_height: u64, + page_options: &PageOptions, + throttle: Duration, + ) -> impl std::future::Future> + Send; +} - key.push(RootPrefix::HubEvents as u8); // HubEvents prefix, 1 byte - key.extend_from_slice(&event_id.to_be_bytes()); +fn make_event_key(event_id: u64) -> Vec { + let mut key = Vec::with_capacity(1 + 8); - key - } + key.push(RootPrefix::HubEvents as u8); // HubEvents prefix, 1 byte + key.extend_from_slice(&event_id.to_be_bytes()); + + key +} - pub fn put_event_transaction( +impl HubEventStorageExt for HubEvent { + fn put_event_transaction( txn: &mut RocksDbTransactionBatch, event: &HubEvent, ) -> Result<(), HubError> { - let key = Self::make_event_key(event.id); + let key = make_event_key(event.id); let value = event.encode_to_vec(); txn.put(key, value); @@ -146,16 +167,16 @@ impl HubEvent { Ok(()) } - pub fn get_events( + fn get_events( db: Arc, start_id: u64, stop_id: Option, page_options: Option, ) -> Result { - let start_prefix = Self::make_event_key(start_id); + let start_prefix = make_event_key(start_id); let stop_prefix = match stop_id { - Some(id) => Self::make_event_key(id), - None => increment_vec_u8(&Self::make_event_key(std::u64::MAX)), + Some(id) => make_event_key(id), + None => increment_vec_u8(&make_event_key(std::u64::MAX)), }; let mut events = Vec::new(); @@ -187,8 +208,8 @@ impl HubEvent { }) } - pub fn get_event(db: Arc, event_id: u64) -> Result { - let key = Self::make_event_key(event_id); + fn get_event(db: Arc, event_id: u64) -> Result { + let key = make_event_key(event_id); let buf = db.get(&key)?; if buf.is_none() { return Err(HubError::not_found("Event not found")); @@ -203,15 +224,15 @@ impl HubEvent { } } - pub async fn prune_events_util( + async fn prune_events_util( db: Arc, stop_height: u64, page_options: &PageOptions, throttle: Duration, ) -> Result { let stop_event_id = HubEventIdGenerator::make_event_id_for_block_number(stop_height); - let start_event_key = Self::make_event_key(0); - let stop_event_key = Self::make_event_key(stop_event_id); + let start_event_key = make_event_key(0); + let stop_event_key = make_event_key(stop_event_id); let total_pruned = db .delete_paginated( Some(start_event_key), diff --git a/src/storage/store/account/onchain_event_store.rs b/src/storage/store/account/onchain_event_store.rs index 564960ce9..3a7c981e5 100644 --- a/src/storage/store/account/onchain_event_store.rs +++ b/src/storage/store/account/onchain_event_store.rs @@ -1,5 +1,6 @@ use super::{get_from_db_or_txn, make_fid_key, StoreEventHandler}; use crate::core::error::HubError; +use crate::core::message::HubEventExt; use crate::core::util::FarcasterTime; use crate::proto::{ self, on_chain_event, on_chain_event::Body, FarcasterNetwork, HubEvent, HubEventType, @@ -527,7 +528,7 @@ impl OnchainEventStore { txn: &mut RocksDbTransactionBatch, ) -> Result { merge_onchain_event(&self.db, txn, onchain_event.clone(), &self.store_opts)?; - let hub_event = &mut HubEvent::from( + let hub_event = &mut HubEvent::new_event( HubEventType::MergeOnChainEvent, proto::hub_event::Body::MergeOnChainEventBody(MergeOnChainEventBody { on_chain_event: Some(onchain_event.clone()), diff --git a/src/storage/store/account/store.rs b/src/storage/store/account/store.rs index ed52c6fd5..56def0357 100644 --- a/src/storage/store/account/store.rs +++ b/src/storage/store/account/store.rs @@ -5,6 +5,7 @@ use super::{ message_encode, put_message_transaction, MessagesPage, StoreEventHandler, TS_HASH_LENGTH, }; use crate::core::error::HubError; +use crate::core::message::HubEventExt; use crate::proto::{ hub_event, HubEvent, HubEventType, MergeMessageBody, PruneMessageBody, RevokeMessageBody, }; @@ -211,7 +212,7 @@ pub trait StoreDef: Send + Sync { #[inline] fn revoke_event_args(&self, message: &Message) -> HubEvent { - HubEvent::from( + HubEvent::new_event( HubEventType::RevokeMessage, hub_event::Body::RevokeMessageBody(RevokeMessageBody { message: Some(message.clone()), @@ -221,7 +222,7 @@ pub trait StoreDef: Send + Sync { #[inline] fn merge_event_args(&self, message: &Message, merge_conflicts: Vec) -> HubEvent { - HubEvent::from( + HubEvent::new_event( HubEventType::MergeMessage, hub_event::Body::MergeMessageBody(MergeMessageBody { message: Some(message.clone()), @@ -232,7 +233,7 @@ pub trait StoreDef: Send + Sync { #[inline] fn prune_event_args(&self, message: &Message) -> HubEvent { - HubEvent::from( + HubEvent::new_event( HubEventType::PruneMessage, hub_event::Body::PruneMessageBody(PruneMessageBody { message: Some(message.clone()), diff --git a/src/storage/store/account/user_data_store.rs b/src/storage/store/account/user_data_store.rs index fe14aea17..4ea3e5aad 100644 --- a/src/storage/store/account/user_data_store.rs +++ b/src/storage/store/account/user_data_store.rs @@ -7,6 +7,7 @@ use super::{ store::{Store, StoreDef}, MessagesPage, StoreEventHandler, }; +use crate::core::message::HubEventExt; use crate::{ core::error::HubError, proto::{ @@ -297,7 +298,7 @@ impl UserDataStore { put_username_proof_transaction(txn, username_proof, existing_fid); } - let mut hub_event = HubEvent::from( + let mut hub_event = HubEvent::new_event( HubEventType::MergeUsernameProof, proto::hub_event::Body::MergeUsernameProofBody(MergeUserNameProofBody { username_proof: Some(username_proof.clone()), diff --git a/src/storage/store/account/username_proof_store.rs b/src/storage/store/account/username_proof_store.rs index 6766589a2..ec99f7084 100644 --- a/src/storage/store/account/username_proof_store.rs +++ b/src/storage/store/account/username_proof_store.rs @@ -3,6 +3,7 @@ use super::{ store::{Store, StoreDef}, IntoU8, MessagesPage, StoreEventHandler, TS_HASH_LENGTH, }; +use crate::core::message::HubEventExt; use crate::proto::message_data::Body; use crate::proto::{self, HubEvent, HubEventType, MergeUserNameProofBody, Message, MessageType}; use crate::storage::constants::{RootPrefix, UserPostfix}; @@ -280,7 +281,7 @@ impl StoreDef for UsernameProofStoreDef { _ => None, }; - HubEvent::from( + HubEvent::new_event( HubEventType::MergeUsernameProof, proto::hub_event::Body::MergeUsernameProofBody(MergeUserNameProofBody { username_proof: None, @@ -317,7 +318,7 @@ impl StoreDef for UsernameProofStoreDef { (None, None) }; - HubEvent::from( + HubEvent::new_event( HubEventType::MergeUsernameProof, proto::hub_event::Body::MergeUsernameProofBody(MergeUserNameProofBody { username_proof: username_proof_body, diff --git a/src/storage/store/engine.rs b/src/storage/store/engine.rs index 84534d7ab..2845b9f13 100644 --- a/src/storage/store/engine.rs +++ b/src/storage/store/engine.rs @@ -1,7 +1,8 @@ use super::account::{IntoU8, OnchainEventStorageError, UserDataStore, UsernameProofStore}; use crate::consensus::proposer::ProposalSource; use crate::core::{ - error::HubError, types::Height, util::FarcasterTime, validations, validations::verification, + error::HubError, message::HubEventExt, types::Height, util::FarcasterTime, validations, + validations::verification, }; use crate::mempool::mempool::MempoolMessagesRequest; use crate::proto::message_data::Body; @@ -1592,7 +1593,7 @@ impl ShardEngine { let height = header.height.as_ref().unwrap(); let mut event_counts_by_type = Self::compute_event_counts_by_type(&events); event_counts_by_type.insert(HubEventType::BlockConfirmed as i32, 1); - let mut block_confirmed = HubEvent::from( + let mut block_confirmed = HubEvent::new_event( proto::HubEventType::BlockConfirmed, proto::hub_event::Body::BlockConfirmedBody(proto::BlockConfirmedBody { block_number: height.block_number, diff --git a/src/storage/store/engine_tests.rs b/src/storage/store/engine_tests.rs index 1d820f898..6f703ed11 100644 --- a/src/storage/store/engine_tests.rs +++ b/src/storage/store/engine_tests.rs @@ -9,7 +9,7 @@ mod tests { use crate::proto::{FnameTransfer, ShardChunk, UserNameProof}; use crate::proto::{OnChainEvent, OnChainEventType}; use crate::storage::db::{PageOptions, RocksDbTransactionBatch}; - use crate::storage::store::account::{HubEventIdGenerator, UserDataStore}; + use crate::storage::store::account::{HubEventIdGenerator, HubEventStorageExt, UserDataStore}; use crate::storage::store::engine::{MessageValidationError, ShardEngine}; use crate::storage::store::mempool_poller::MempoolMessage; use crate::storage::store::stores::StoreLimits; diff --git a/src/storage/store/stores.rs b/src/storage/store/stores.rs index 51704e588..5ffdbb56f 100644 --- a/src/storage/store/stores.rs +++ b/src/storage/store/stores.rs @@ -1,6 +1,6 @@ use super::account::{ - EventsPage, ReactionStore, ReactionStoreDef, StorageSlot, UserDataStore, UserDataStoreDef, - VerificationStore, VerificationStoreDef, + EventsPage, HubEventStorageExt, ReactionStore, ReactionStoreDef, StorageSlot, UserDataStore, + UserDataStoreDef, VerificationStore, VerificationStoreDef, }; use crate::core::error::HubError; use crate::core::util::FarcasterTime;