Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ pub struct Storage {
pub master_key_import_path: Option<PathBuf>,
/// Keep a map of "address" to "list of UTXOs" in memory, to speed up getBalance and getUtxoInfo methods
pub utxos_in_memory: bool,
/// Index RAD hashes in memory
pub rad_hashes_index: bool,
/// RocksDB option max_open_files. -1 means unlimited.
pub max_open_files: i32,
}
Expand Down Expand Up @@ -779,6 +781,10 @@ impl Storage {
.utxos_in_memory
.to_owned()
.unwrap_or_else(|| defaults.storage_utxos_in_memory()),
rad_hashes_index: config
.rad_hashes_index
.to_owned()
.unwrap_or_else(|| defaults.storage_rad_hashes_index()),
max_open_files: config
.max_open_files
.unwrap_or_else(|| defaults.storage_max_open_files()),
Expand All @@ -791,6 +797,7 @@ impl Storage {
db_path: Some(self.db_path.clone()),
master_key_import_path: self.master_key_import_path.clone(),
utxos_in_memory: Some(self.utxos_in_memory),
rad_hashes_index: Some(self.rad_hashes_index),
max_open_files: Some(self.max_open_files),
}
}
Expand Down Expand Up @@ -1360,6 +1367,7 @@ mod tests {
db_path: Some(PathBuf::from("other")),
master_key_import_path: None,
utxos_in_memory: None,
rad_hashes_index: None,
max_open_files: None,
};
let config = Storage::from_partial(&partial_config, &Testnet);
Expand Down
7 changes: 6 additions & 1 deletion config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,16 @@ pub trait Defaults {
PathBuf::from(".witnet")
}

/// Do not keep utxos in memory by default
/// Keep utxos in memory by default
fn storage_utxos_in_memory(&self) -> bool {
true
}

/// Do not index RAD hashes in memory by default
fn storage_rad_hashes_index(&self) -> bool {
false
}

/// Unlimited number of open files by default
fn storage_max_open_files(&self) -> i32 {
-1
Expand Down
17 changes: 17 additions & 0 deletions data_structures/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4042,6 +4042,9 @@ pub struct ChainState {
/// Unspent Outputs Pool
#[serde(skip)]
pub unspent_outputs_pool: UnspentOutputsPool,
/// Arrays of (Epoch, DrTxHash) grouped by RadHash
#[serde(skip)]
pub rad_hashes_index: HashMap<Hash, Vec<(Epoch, Hash)>>,
}

impl ChainState {
Expand Down Expand Up @@ -4074,6 +4077,20 @@ impl ChainState {
.consensus_constants
.clone()
}

pub fn get_data_requests_by_rad_hash(&self, rad_hash: &Hash) -> HashMap<Epoch, Vec<Hash>> {
let mut found: HashMap<Epoch, Vec<Hash>> = HashMap::new();
if let Some(entry) = self.rad_hashes_index.get(rad_hash) {
entry.iter().for_each(|(epoch, dr_tx_hash)| {
if let Some(vec) = found.get_mut(epoch) {
vec.push(*dr_tx_hash)
} else {
found.insert(*epoch, vec![*dr_tx_hash]);
}
});
}
found
}
}

/// A boxed and pinned future that resolves to a vector of stuff.
Expand Down
5 changes: 4 additions & 1 deletion data_structures/src/proto/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,12 @@ impl Versioned for crate::transaction::StakeTransactionBody {
impl Versioned for crate::transaction::UnstakeTransactionBody {
type LegacyType = <Self as ProtobufConvert>::ProtoStruct;
}

impl Versioned for crate::chain::DataRequestOutput {
type LegacyType = <Self as ProtobufConvert>::ProtoStruct;
}
impl Versioned for crate::chain::RADRequest {
type LegacyType = <Self as ProtobufConvert>::ProtoStruct;
}

pub trait AutoVersioned: ProtobufConvert {}

Expand All @@ -447,6 +449,7 @@ impl AutoVersioned for crate::transaction::MintTransaction {}
impl AutoVersioned for crate::transaction::StakeTransactionBody {}
impl AutoVersioned for crate::transaction::UnstakeTransactionBody {}
impl AutoVersioned for crate::chain::DataRequestOutput {}
impl AutoVersioned for crate::chain::RADRequest {}

pub trait VersionedHashable {
fn versioned_hash(&self, version: ProtocolVersion) -> Hash;
Expand Down
3 changes: 3 additions & 0 deletions node/src/actors/chain_manager/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ impl ChainManager {
log::debug!("Initial WIT supply: {}", act.initial_supply);
}

// Set whether an in-memory index of data request transactions is required
act.rad_hashes_index = config.storage.rad_hashes_index;

storage_mngr::get_chain_state(storage_keys::chain_state_key(magic))
.into_actor(act)
.then(|chain_state_from_storage, _, _| {
Expand Down
46 changes: 44 additions & 2 deletions node/src/actors/chain_manager/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ use crate::{
GetReputation, GetReputationResult, GetSignalingInfo, GetState, GetSuperBlockVotes,
GetSupplyInfo, GetSupplyInfo2, GetUtxoInfo, IsConfirmedBlock, PeersBeacons,
QueryStakes, QueryStakesOrderByOptions, QueryStakingPowers, ReputationStats, Rewind,
SendLastBeacon, SessionUnitResult, SetLastBeacon, SetPeersLimits, SignalingInfo,
SnapshotExport, SnapshotImport, TryMineBlock, try_do_magic_into_pkh,
SearchDataRequests, SendLastBeacon, SessionUnitResult, SetLastBeacon, SetPeersLimits,
SignalingInfo, SnapshotExport, SnapshotImport, TryMineBlock, try_do_magic_into_pkh,
},
sessions_manager::SessionsManager,
},
Expand Down Expand Up @@ -1528,6 +1528,48 @@ impl Handler<BuildUnstake> for ChainManager {
}
}

impl Handler<SearchDataRequests> for ChainManager {
type Result = <SearchDataRequests as Message>::Result;

fn handle(&mut self, msg: SearchDataRequests, _ctx: &mut Self::Context) -> Self::Result {
let result = self.chain_state.rad_hashes_index.get(&msg.rad_hash);
if let Some(result) = result {
let limit = msg.limit.unwrap_or(u16::MAX) as usize;
let offset = msg.offset.unwrap_or_default();
let reverse = msg.reverse.unwrap_or_default();
let mut since_epoch: i64 = msg.since.unwrap_or_default();
if let Some(current) = self.current_epoch {
since_epoch = i64::from(current).saturating_add(since_epoch);
}
let since_epoch: u32 = since_epoch.try_into().inspect_err(|&e| {
log::warn!("Invalid 'since' limit on SearchDataRequests: {e}");
})?;

Ok(result
.iter()
.filter_map(|tuple| {
if tuple.0 >= since_epoch {
Some(*tuple)
} else {
None
}
})
Comment on lines +1550 to +1556
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may use filter instead of map_filter

Suggested change
.filter_map(|tuple| {
if tuple.0 >= since_epoch {
Some(*tuple)
} else {
None
}
})
.filter(|(epoch, _)| *epoch >= since_epoch)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changing to filter makes later collect to fail

.sorted_by(|(epoch_a, _), (epoch_b, _)| {
if reverse {
epoch_b.cmp(epoch_a)
} else {
epoch_a.cmp(epoch_b)
}
})
.skip(offset)
.take(limit)
.collect())
} else {
Ok(vec![])
}
}
}

impl Handler<QueryStakes> for ChainManager {
type Result = <QueryStakes as Message>::Result;

Expand Down
23 changes: 21 additions & 2 deletions node/src/actors/chain_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ pub struct ChainManager {
consensus_constants_wit2: ConsensusConstantsWit2,
/// Initial WIT supply
initial_supply: u64,
/// Populate RAD hashes index
rad_hashes_index: bool,
}

impl ChainManager {
Expand Down Expand Up @@ -938,7 +940,8 @@ impl ChainManager {
..
} => {
let block_epoch = block.block_header.beacon.checkpoint;
let block_hash = block.versioned_hash(get_protocol_version(Some(block_epoch)));
let protocol_version = get_protocol_version(Some(block_epoch));
let block_hash = block.versioned_hash(protocol_version);
let block_signals = block.block_header.signals;
let validator_count = stakes.validator_count();

Expand Down Expand Up @@ -990,7 +993,7 @@ impl ChainManager {

// Check total amount staked to make sure we can activate wit/2
let superblock_period = chain_info.consensus_constants.superblock_period;
if get_protocol_version(Some(block_epoch)) == ProtocolVersion::V1_8
if protocol_version == ProtocolVersion::V1_8
&& get_protocol_version_activation_epoch(ProtocolVersion::V2_0) == Epoch::MAX
&& block_epoch % u32::from(superblock_period) == 0
{
Expand Down Expand Up @@ -1111,6 +1114,22 @@ impl ChainManager {
&HashSet::default(),
);

// Update chain state's RAD hash index, only if set up in config:
if self.rad_hashes_index {
block.txns.data_request_txns
.iter()
.for_each(|dr_tx| {
let dr_tx_hash = dr_tx.versioned_hash(protocol_version);
let rad_hash = dr_tx.body.dr_output.data_request.versioned_hash(protocol_version);
if let Some(vec) = self.chain_state.rad_hashes_index.get_mut(&rad_hash) {
vec.push((block_epoch, dr_tx_hash));
} else {
self.chain_state.rad_hashes_index.insert(rad_hash, vec![(block_epoch, dr_tx_hash)]);
}
log::warn!("Indexing data request {dr_tx_hash:?} with rad_hash {rad_hash:?} on epoch {block_epoch}");
});
}

match self.sm_state {
StateMachine::WaitingConsensus => {
// Persist finished data requests into storage
Expand Down
31 changes: 29 additions & 2 deletions node/src/actors/json_rpc/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ use crate::{
GetItemSuperblock, GetItemTransaction, GetKnownPeers, GetMemoryTransaction, GetMempool,
GetNodeStats, GetProtocolInfo, GetReputation, GetSignalingInfo, GetState,
GetSupplyInfo, GetSupplyInfo2, GetUtxoInfo, InitializePeers, IsConfirmedBlock,
MagicEither, QueryStakes, QueryStakingPowers, Rewind, SnapshotExport, SnapshotImport,
StakeAuthorization,
MagicEither, QueryStakes, QueryStakingPowers, Rewind, SearchDataRequests,
SnapshotExport, SnapshotImport, StakeAuthorization,
},
peers_manager::PeersManager,
sessions_manager::SessionsManager,
Expand Down Expand Up @@ -167,6 +167,9 @@ pub fn attach_regular_methods<H>(
server.add_actix_method(system, "getUtxoInfo", move |params: Params| {
Box::pin(get_utxo_info(params.parse()))
});
server.add_actix_method(system, "searchDataRequests", |params: Params| {
Box::pin(search_data_requests(params.parse()))
});
}

/// Attach the sensitive JSON-RPC methods to a multi-transport server.
Expand Down Expand Up @@ -2350,6 +2353,30 @@ pub async fn query_stakes(params: Result<Option<QueryStakes>, Error>) -> JsonRpc
.await
}

/// Query data requests transaction hashes by providing a RAD hash value
pub async fn search_data_requests(
params: Result<Option<SearchDataRequests>, Error>,
) -> JsonRpcResult {
// short-circuit if parameters are wrong
let params = params?;
// parse params or defaults
let msg = params.ok_or(Error::invalid_params("A 'radHash' must be specified"))?;
ChainManager::from_registry()
.send(msg)
.map(|res| match res {
Ok(Ok(result)) => serde_json::to_value(result).map_err(internal_error),
Ok(Err(e)) => {
let err = internal_error_s(e);
Err(err)
}
Err(e) => {
let err = internal_error_s(e);
Err(err)
}
})
.await
}

/// Format of the output of query_powers
#[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
pub struct QueryStakingPowersRecord {
Expand Down
21 changes: 21 additions & 0 deletions node/src/actors/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,27 @@ where
}
}

/// Message for querying data requests transactions by some specified RAD hash value
#[derive(Clone, Debug, Eq, Hash, PartialEq, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SearchDataRequests {
/// Limits max number of entries to return (default: 0 == u16::MAX)
pub limit: Option<u16>,
/// Skips first found entries (default: 0)
pub offset: Option<usize>,
/// The RAD hash of the data request transactions being searched for:
pub rad_hash: Hash,
/// List fresher data requests first
pub reverse: Option<bool>,
/// Select data requests that have been included on or after the specified
/// absolute epoch, or relative epoch if negative (default: -30240, or one week ago)
pub since: Option<i64>,
}

impl Message for SearchDataRequests {
type Result = Result<Vec<(Epoch, Hash)>, anyhow::Error>;
}

/// Builds a `DataRequestTransaction` from a `DataRequestOutput`
#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct BuildDrt {
Expand Down
2 changes: 2 additions & 0 deletions witnet.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ reject_sybil_inbounds = true
[storage]
# Path of the folder where RocksDB storage files will be written to.
db_path = ".witnet/storage"
# Enable in-memory index of data requests grouped by RAD hash
rad_hashes_index = false

[jsonrpc]
# Enables or disables the JSON-RPC server altogether. This is needed for using the CLI methods of the node.
Expand Down