Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
4961ce0
initial impl with incoming/outgoing message impl
noot Jul 7, 2025
f5365d9
request-response protocol working
noot Jul 7, 2025
565ed95
clippy
noot Jul 7, 2025
a532121
clippy
noot Jul 7, 2025
921724d
merge w main
noot Jul 8, 2025
a548ce4
begin implementation of libp2p node in worker; working on msg handling
noot Jul 8, 2025
d780aae
implement more request handlers
noot Jul 8, 2025
bcaa444
impl hardware challenge, add new p2p to worker cli
noot Jul 8, 2025
0f386af
implement invite request handling, finish cli changes
noot Jul 8, 2025
7bd1009
add full hardware challenge message
noot Jul 9, 2025
d6c1a4a
move messages to their own dir
noot Jul 9, 2025
ea46820
add general request-response protocol
noot Jul 9, 2025
977ab4e
merge
noot Jul 9, 2025
7288261
update SystemState to store libp2p keypair
noot Jul 9, 2025
304f8a8
organize and remove unused deps
noot Jul 9, 2025
46ecca7
add libp2p_port to cli
noot Jul 9, 2025
4358e32
serde for PersistedSystemState
noot Jul 9, 2025
577d843
spawn message handler
noot Jul 9, 2025
4285eaa
add dial channel to p2p node; impl validator libp2p node
noot Jul 9, 2025
7503885
fully implement hardware challenge flow
noot Jul 9, 2025
d32f540
upddate validator main to use libp2p node
noot Jul 9, 2025
56d6b1d
clean up deps
noot Jul 9, 2025
c6183d6
add authorized peer to map
noot Jul 9, 2025
94e9e4d
implement dialing peers
noot Jul 10, 2025
d05ad87
merge
noot Jul 10, 2025
91c0e5b
Merge branch 'noot/libp2p' of https://github.com/PrimeIntellect-ai/pr…
noot Jul 10, 2025
a8af706
use tracing
noot Jul 10, 2025
0952b30
Merge branch 'noot/libp2p' of https://github.com/PrimeIntellect-ai/pr…
noot Jul 10, 2025
15dc2c4
move shared authentication service to shared
noot Jul 10, 2025
0046fac
implement orchestrator p2p service
noot Jul 10, 2025
08a10ec
update orchestrator to use libp2p node
noot Jul 10, 2025
ac923ca
deps cleanup
noot Jul 10, 2025
de3afb1
merge
noot Jul 10, 2025
6a3b04b
Merge branch 'noot/validator-libp2p' into noot/libp2p-unified
noot Jul 10, 2025
f35b001
delete unused code
noot Jul 10, 2025
2475059
no port conflict
noot Jul 10, 2025
73300be
rename messages to be more correct
noot Jul 11, 2025
e135ad4
add logging
noot Jul 11, 2025
f9516c4
basic version of python sdk with provider and node registration
JannikSt Jul 11, 2025
9093134
basic message queue with mock data
JannikSt Jul 11, 2025
ecb5b66
fix tests
noot Jul 11, 2025
0934bf2
merge p2p changes
JannikSt Jul 11, 2025
4798692
remove explicit dialing, messaging now working
noot Jul 11, 2025
f87d5d3
remove println
noot Jul 11, 2025
d77ef04
fix unit tests
noot Jul 11, 2025
25a0d6d
restructure python sdk lib to have pyo bindings in sep. modules
JannikSt Jul 11, 2025
0f907f2
fix async gil issues, add bootstrap cmd to Makefile
JannikSt Jul 11, 2025
291ced4
cleanup message queue setup
JannikSt Jul 11, 2025
75d5fa0
merge latest changes from p2p_setup
JannikSt Jul 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
385 changes: 214 additions & 171 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 14 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@ members = [
"crates/validator",
"crates/shared",
"crates/orchestrator",
"crates/p2p",
"crates/dev-utils",
"crates/prime-protocol-py",
"crates/prime-core",
]
resolver = "2"

[workspace.dependencies]
shared = { path = "crates/shared" }
prime-core = { path = "crates/prime-core" }
p2p = { path = "crates/p2p" }

actix-web = "4.9.0"
clap = { version = "4.5.27", features = ["derive"] }
serde = { version = "1.0.219", features = ["derive"] }
Expand Down Expand Up @@ -39,9 +45,9 @@ mockito = "1.7.0"
iroh = "0.34.1"
rand_v8 = { package = "rand", version = "0.8.5", features = ["std"] }
rand_core_v6 = { package = "rand_core", version = "0.6.4", features = ["std"] }
ipld-core = "0.4"
rust-ipfs = "0.14"
cid = "0.11"
tracing = "0.1.41"

[workspace.package]
version = "0.3.11"
Expand All @@ -55,3 +61,10 @@ manual_let_else = "warn"

[workspace.lints.rust]
unreachable_pub = "warn"

[workspace.metadata.rust-analyzer]
# Help rust-analyzer with proc-macros
procMacro.enable = true
procMacro.attributes.enable = true
# Use a separate target directory for rust-analyzer
targetDir = true
21 changes: 21 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,18 @@ up:
@# Attach to session
@tmux attach-session -t prime-dev

# Start Docker services and deploy contracts only
.PHONY: bootstrap
bootstrap:
@echo "Starting Docker services and deploying contracts..."
@# Start Docker services
@docker compose up -d reth redis --wait --wait-timeout 180
@# Deploy contracts
@cd smart-contracts && sh deploy.sh && sh deploy_work_validation.sh && cd ..
@# Run setup
@$(MAKE) setup
@echo "Bootstrap complete - Docker services running and contracts deployed"

# Stop development environment
.PHONY: down
down:
Expand Down Expand Up @@ -268,3 +280,12 @@ deregister-worker:
set -a; source ${ENV_FILE}; set +a; \
cargo run --bin worker -- deregister --compute-pool-id $${WORKER_COMPUTE_POOL_ID} --private-key-provider $${PRIVATE_KEY_PROVIDER} --private-key-node $${PRIVATE_KEY_NODE} --rpc-url $${RPC_URL}

# Python Package
.PHONY: python-install
python-install:
@cd crates/prime-protocol-py && make install

.PHONY: python-test
python-test:
@cd crates/prime-protocol-py && make test

9 changes: 3 additions & 6 deletions crates/dev-utils/examples/compute_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,25 +68,22 @@ async fn main() -> Result<()> {
compute_limit,
)
.await;
println!("Transaction: {:?}", tx);
println!("Transaction: {tx:?}");
let rewards_distributor_address = contracts
.compute_pool
.get_reward_distributor_address(U256::from(0))
.await
.unwrap();

println!(
"Rewards distributor address: {:?}",
rewards_distributor_address
);
println!("Rewards distributor address: {rewards_distributor_address:?}");
let rewards_distributor = RewardsDistributor::new(
rewards_distributor_address,
wallet.provider(),
"rewards_distributor.json",
);
let rate = U256::from(10000000000000000u64);
let tx = rewards_distributor.set_reward_rate(rate).await;
println!("Setting reward rate: {:?}", tx);
println!("Setting reward rate: {tx:?}");

let reward_rate = rewards_distributor.get_reward_rate().await.unwrap();
println!(
Expand Down
2 changes: 1 addition & 1 deletion crates/dev-utils/examples/create_domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ async fn main() -> Result<()> {
.await;
println!("Creating domain: {}", args.domain_name);
println!("Validation logic: {}", args.validation_logic);
println!("Transaction: {:?}", tx);
println!("Transaction: {tx:?}");
Ok(())
}
6 changes: 3 additions & 3 deletions crates/dev-utils/examples/eject_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,20 @@ async fn main() -> Result<()> {
.compute_registry
.get_node(provider_address, node_address)
.await;
println!("Node info: {:?}", node_info);
println!("Node info: {node_info:?}");

let tx = contracts
.compute_pool
.eject_node(args.pool_id, node_address)
.await;
println!("Ejected node {} from pool {}", args.node, args.pool_id);
println!("Transaction: {:?}", tx);
println!("Transaction: {tx:?}");

let node_info = contracts
.compute_registry
.get_node(provider_address, node_address)
.await;
println!("Post ejection node info: {:?}", node_info);
println!("Post ejection node info: {node_info:?}");

Ok(())
}
5 changes: 1 addition & 4 deletions crates/dev-utils/examples/get_node_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ async fn main() -> Result<()> {
.await
.unwrap();

println!(
"Node Active: {}, Validated: {}, In Pool: {}",
active, validated, is_node_in_pool
);
println!("Node Active: {active}, Validated: {validated}, In Pool: {is_node_in_pool}");
Ok(())
}
2 changes: 1 addition & 1 deletion crates/dev-utils/examples/invalidate_work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async fn main() -> Result<()> {
"Invalidated work in pool {} with penalty {}",
args.pool_id, args.penalty
);
println!("Transaction hash: {:?}", tx);
println!("Transaction hash: {tx:?}");

Ok(())
}
4 changes: 2 additions & 2 deletions crates/dev-utils/examples/mint_ai_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ async fn main() -> Result<()> {
let amount = U256::from(args.amount) * Unit::ETHER.wei();
let tx = contracts.ai_token.mint(address, amount).await;
println!("Minting to address: {}", args.address);
println!("Transaction: {:?}", tx);
println!("Transaction: {tx:?}");

let balance = contracts.ai_token.balance_of(address).await;
println!("Balance: {:?}", balance);
println!("Balance: {balance:?}");
Ok(())
}
4 changes: 2 additions & 2 deletions crates/dev-utils/examples/set_min_stake_amount.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ async fn main() -> Result<()> {
.unwrap();

let min_stake_amount = U256::from(args.min_stake_amount) * Unit::ETHER.wei();
println!("Min stake amount: {}", min_stake_amount);
println!("Min stake amount: {min_stake_amount}");

let tx = contracts
.prime_network
.set_stake_minimum(min_stake_amount)
.await;
println!("Transaction: {:?}", tx);
println!("Transaction: {tx:?}");

Ok(())
}
2 changes: 1 addition & 1 deletion crates/dev-utils/examples/start_compute_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ async fn main() -> Result<()> {
.start_compute_pool(U256::from(args.pool_id))
.await;
println!("Started compute pool with id: {}", args.pool_id);
println!("Transaction: {:?}", tx);
println!("Transaction: {tx:?}");
Ok(())
}
2 changes: 1 addition & 1 deletion crates/dev-utils/examples/submit_work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async fn main() -> Result<()> {
"Submitted work for node {} in pool {}",
args.node, args.pool_id
);
println!("Transaction hash: {:?}", tx);
println!("Transaction hash: {tx:?}");

Ok(())
}
14 changes: 7 additions & 7 deletions crates/dev-utils/examples/test_concurrent_calls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn main() -> Result<()> {
let wallet = Arc::new(Wallet::new(&args.key, Url::parse(&args.rpc_url)?).unwrap());

let price = wallet.provider.get_gas_price().await?;
println!("Gas price: {:?}", price);
println!("Gas price: {price:?}");

let current_nonce = wallet
.provider
Expand All @@ -50,8 +50,8 @@ async fn main() -> Result<()> {
.block_id(BlockId::Number(BlockNumberOrTag::Pending))
.await?;

println!("Pending nonce: {:?}", pending_nonce);
println!("Current nonce: {:?}", current_nonce);
println!("Pending nonce: {pending_nonce:?}");
println!("Current nonce: {current_nonce:?}");

// Unfortunately have to build all contracts atm
let contracts = Arc::new(
Expand All @@ -67,7 +67,7 @@ async fn main() -> Result<()> {
let address = Address::from_str(&args.address).unwrap();
let amount = U256::from(args.amount) * Unit::ETHER.wei();
let random = (rand::random::<u8>() % 10) + 1;
println!("Random: {:?}", random);
println!("Random: {random:?}");

let contracts_one = contracts.clone();
let wallet_one = wallet.clone();
Expand All @@ -80,7 +80,7 @@ async fn main() -> Result<()> {
let tx = retry_call(mint_call, 5, wallet_one.provider(), None)
.await
.unwrap();
println!("Transaction hash I: {:?}", tx);
println!("Transaction hash I: {tx:?}");
});

let contracts_two = contracts.clone();
Expand All @@ -93,11 +93,11 @@ async fn main() -> Result<()> {
let tx = retry_call(mint_call_two, 5, wallet_two.provider(), None)
.await
.unwrap();
println!("Transaction hash II: {:?}", tx);
println!("Transaction hash II: {tx:?}");
});

let balance = contracts.ai_token.balance_of(address).await.unwrap();
println!("Balance: {:?}", balance);
println!("Balance: {balance:?}");
tokio::time::sleep(tokio::time::Duration::from_secs(40)).await;
Ok(())
}
16 changes: 6 additions & 10 deletions crates/discovery/src/api/routes/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,12 +465,10 @@ mod tests {
assert_eq!(body.data, "Node registered successfully");

let nodes = app_state.node_store.get_nodes().await;
let nodes = match nodes {
Ok(nodes) => nodes,
Err(_) => {
panic!("Error getting nodes");
}
let Ok(nodes) = nodes else {
panic!("Error getting nodes");
};

assert_eq!(nodes.len(), 1);
assert_eq!(nodes[0].id, node.id);
assert_eq!(nodes[0].last_updated, None);
Expand Down Expand Up @@ -611,12 +609,10 @@ mod tests {
assert_eq!(body.data, "Node registered successfully");

let nodes = app_state.node_store.get_nodes().await;
let nodes = match nodes {
Ok(nodes) => nodes,
Err(_) => {
panic!("Error getting nodes");
}
let Ok(nodes) = nodes else {
panic!("Error getting nodes");
};

assert_eq!(nodes.len(), 1);
assert_eq!(nodes[0].id, node.id);
}
Expand Down
10 changes: 3 additions & 7 deletions crates/discovery/src/chainsync/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ async fn sync_single_node(
})?;

let balance = provider.get_balance(node_address).await.map_err(|e| {
error!("Error retrieving balance for node {}: {}", node_address, e);
error!("Error retrieving balance for node {node_address}: {e}");
anyhow::anyhow!("Failed to retrieve node balance")
})?;
n.latest_balance = Some(balance);
Expand All @@ -166,8 +166,7 @@ async fn sync_single_node(
.await
.map_err(|e| {
error!(
"Error retrieving node info for provider {} and node {}: {}",
provider_address, node_address, e
"Error retrieving node info for provider {provider_address} and node {node_address}: {e}"
);
anyhow::anyhow!("Failed to retrieve node info")
})?;
Expand All @@ -177,10 +176,7 @@ async fn sync_single_node(
.get_provider(provider_address)
.await
.map_err(|e| {
error!(
"Error retrieving provider info for {}: {}",
provider_address, e
);
error!("Error retrieving provider info for {provider_address}: {e}");
anyhow::anyhow!("Failed to retrieve provider info")
})?;

Expand Down
4 changes: 2 additions & 2 deletions crates/discovery/src/store/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ impl RedisStore {
_ => panic!("Expected TCP connection"),
};

let redis_url = format!("redis://{}:{}", host, port);
debug!("Starting test Redis server at {}", redis_url);
let redis_url = format!("redis://{host}:{port}");
debug!("Starting test Redis server at {redis_url}");

// Add a small delay to ensure server is ready
thread::sleep(Duration::from_millis(100));
Expand Down
22 changes: 11 additions & 11 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,35 @@ edition.workspace = true
workspace = true

[dependencies]
p2p = { workspace = true}
shared = { workspace = true }

actix-web = { workspace = true }
actix-web-prometheus = "0.1.2"
alloy = { workspace = true }
anyhow = { workspace = true }
async-trait = "0.1.88"
base64 = "0.22.1"
chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
google-cloud-auth = "0.18.0"
google-cloud-storage = "0.24.0"
hex = { workspace = true }
log = { workspace = true }
prometheus = "0.14.0"
rand = "0.9.0"
redis = { workspace = true, features = ["tokio-comp"] }
redis-test = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
shared = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }

actix-web-prometheus = "0.1.2"
google-cloud-auth = "0.18.0"
google-cloud-storage = "0.24.0"
prometheus = "0.14.0"
rand = "0.9.0"
utoipa = { version = "5.3.0", features = ["actix_extras", "chrono", "uuid"] }
utoipa-swagger-ui = { version = "9.0.2", features = ["actix-web", "debug-embed", "reqwest", "vendored"] }
uuid = { workspace = true }
iroh = { workspace = true }
rand_v8 = { workspace = true }

[dev-dependencies]
mockito = { workspace = true }
Loading
Loading