diff --git a/Cargo.lock b/Cargo.lock index 176af29..c15d204 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -459,7 +459,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.2", "tokio", - "tower", + "tower 0.5.3", "tower-layer", "tower-service", "tracing", @@ -1165,6 +1165,22 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "etcd-client" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0452bcc559431b16f472b7ab86e2f9ccd5f3c2da3795afbd6b773665e047fe" +dependencies = [ + "http 1.4.0", + "prost", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", + "tower 0.4.13", + "tower-service", +] + [[package]] name = "etcetera" version = "0.8.0" @@ -1269,6 +1285,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "flate2" version = "1.1.8" @@ -1770,6 +1792,19 @@ dependencies = [ "winapi", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.8.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -2309,6 +2344,12 @@ dependencies = [ "pxfm", ] +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "native-tls" version = "0.2.14" @@ -2604,6 +2645,16 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "petgraph" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" +dependencies = [ + "fixedbitset", + "indexmap 2.13.0", +] + [[package]] name = "pgvector" version = "0.4.1" @@ -2613,6 +2664,26 @@ dependencies = [ "serde", ] +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -2714,6 +2785,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn 2.0.114", +] + [[package]] name = "proc-macro-crate" version = "3.4.0" @@ -2810,6 +2891,58 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" +dependencies = [ + "heck 0.5.0", + "itertools", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.114", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.114", +] + +[[package]] +name = "prost-types" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +dependencies = [ + "prost", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -4438,6 +4571,50 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.22.1", + "bytes", + "h2 0.4.13", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.8.1", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "socket2 0.5.10", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "prost-types", + "quote", + "syn 2.0.114", +] + [[package]] name = "totp-rs" version = "5.7.0" @@ -4454,6 +4631,26 @@ dependencies = [ "urlencoding", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.3" @@ -4742,11 +4939,18 @@ name = "volume-manager" version = "0.2.2" dependencies = [ "anyhow", + "async-trait", + "chrono", "dotenvy", + "etcd-client", + "serde", + "serde_json", "shared", + "thiserror 1.0.69", "tokio", "tracing", "tracing-subscriber", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index f31adcd..45dd6fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,9 @@ thiserror = "1.0" utoipa = { version = "4.2", features = ["axum_extras", "uuid", "chrono"] } utoipa-swagger-ui = { version = "6.0", features = ["axum"] } +# etcd +etcd-client = "0.14" + # Other rand = "0.8" base64 = "0.22" diff --git a/control-plane/volume-manager/.env.example b/control-plane/volume-manager/.env.example new file mode 100644 index 0000000..350ee7d --- /dev/null +++ b/control-plane/volume-manager/.env.example @@ -0,0 +1,17 @@ +# etcd Configuration +ETCD_ENDPOINTS=http://localhost:2379 + +# Optional: Multiple endpoints for HA +# 
ETCD_ENDPOINTS=http://localhost:2379,http://localhost:2479,http://localhost:2579 + +# Node Configuration +NODE_ID=volume-manager-local +HOSTNAME=localhost +NODE_IP=127.0.0.1 + +# Optional: etcd Authentication +# ETCD_USERNAME=root +# ETCD_PASSWORD=secret + +# Logging +RUST_LOG=debug diff --git a/control-plane/volume-manager/Cargo.toml b/control-plane/volume-manager/Cargo.toml index a56e773..4ced391 100644 --- a/control-plane/volume-manager/Cargo.toml +++ b/control-plane/volume-manager/Cargo.toml @@ -14,11 +14,22 @@ shared = { path = "../shared/shared" } # Async runtime tokio = { workspace = true } +async-trait = { workspace = true } + +# etcd +etcd-client = { workspace = true } # Logging tracing = { workspace = true } tracing-subscriber = { workspace = true } +# Serialization +serde = { workspace = true } +serde_json = { workspace = true } + # Utilities dotenvy = { workspace = true } anyhow = { workspace = true } +thiserror = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } diff --git a/control-plane/volume-manager/Dockerfile.dev b/control-plane/volume-manager/Dockerfile.dev index 1f2198e..c44fcc9 100644 --- a/control-plane/volume-manager/Dockerfile.dev +++ b/control-plane/volume-manager/Dockerfile.dev @@ -22,17 +22,15 @@ COPY control-plane/failover-controller/Cargo.toml ./control-plane/failover-contr COPY control-plane/sdn-controller/Cargo.toml ./control-plane/sdn-controller/ COPY control-plane/volume-manager/Cargo.toml ./control-plane/volume-manager/ COPY control-plane/registry/Cargo.toml ./control-plane/registry/ -COPY shared/Cargo.toml ./shared/ -COPY entity/Cargo.toml ./entity/ -COPY migration/Cargo.toml ./migration/ +COPY control-plane/shared/entity/Cargo.toml ./control-plane/shared/entity/ +COPY control-plane/shared/migration/Cargo.toml ./control-plane/shared/migration/ +COPY control-plane/shared/shared/Cargo.toml ./control-plane/shared/shared/ # Copy service and dependencies COPY control-plane/volume-manager/ ./control-plane/volume-manager/ -COPY shared/ ./shared/ -COPY entity/ ./entity/ -COPY migration/ ./migration/ +COPY control-plane/shared/ ./control-plane/shared/ # Pre-build dependencies -RUN cargo build --bin volume-manager +RUN cargo build -p volume-manager -CMD ["cargo", "watch", "-x", "run --bin volume-manager"] +CMD ["cargo", "run", "-p", "volume-manager"] diff --git a/control-plane/volume-manager/Dockerfile.test b/control-plane/volume-manager/Dockerfile.test new file mode 100644 index 0000000..0d9fdfb --- /dev/null +++ b/control-plane/volume-manager/Dockerfile.test @@ -0,0 +1,55 @@ +# Test-Dockerfile für Volume Manager (Multi-Stage Build mit rust:latest) +# Stage 1: Build +FROM rust:latest AS builder + +WORKDIR /app + +# Installiere Build-Abhängigkeiten +RUN apt-get update && apt-get install -y \ + pkg-config \ + libssl-dev \ + protobuf-compiler \ + && rm -rf /var/lib/apt/lists/* + +# Kopiere Workspace-Konfiguration +COPY Cargo.toml Cargo.lock ./ + +# Kopiere alle Workspace-Member Manifests +COPY agent/Cargo.toml ./agent/ +COPY cli/Cargo.toml ./cli/ +COPY control-plane/api-gateway/Cargo.toml ./control-plane/api-gateway/ +COPY control-plane/scheduler/Cargo.toml ./control-plane/scheduler/ +COPY control-plane/failover-controller/Cargo.toml ./control-plane/failover-controller/ +COPY control-plane/sdn-controller/Cargo.toml ./control-plane/sdn-controller/ +COPY control-plane/volume-manager/Cargo.toml ./control-plane/volume-manager/ +COPY control-plane/registry/Cargo.toml ./control-plane/registry/ +COPY control-plane/shared/entity/Cargo.toml 
./control-plane/shared/entity/ +COPY control-plane/shared/migration/Cargo.toml ./control-plane/shared/migration/ +COPY control-plane/shared/shared/Cargo.toml ./control-plane/shared/shared/ + +# Kopiere Source-Code +COPY control-plane/volume-manager/ ./control-plane/volume-manager/ +COPY control-plane/shared/ ./control-plane/shared/ + +# Baue das Binary im Release-Modus +RUN cargo build --release -p volume-manager + +# Stage 2: Runtime +FROM debian:bookworm-slim + +# Installiere notwendige Laufzeit-Abhängigkeiten +RUN apt-get update && apt-get install -y \ + ca-certificates \ + libssl3 \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +# Kopiere das gebaute Binary aus der Builder-Stage +COPY --from=builder /app/target/release/volume-manager /app/volume-manager + +# Stelle sicher, dass das Binary ausführbar ist +RUN chmod +x /app/volume-manager + +# Starte das Binary +CMD ["/app/volume-manager"] diff --git a/control-plane/volume-manager/cleanup-etcd.sh b/control-plane/volume-manager/cleanup-etcd.sh new file mode 100755 index 0000000..93eb3aa --- /dev/null +++ b/control-plane/volume-manager/cleanup-etcd.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +# Cleanup Script - Löscht alte etcd Daten + +COLOR_GREEN='\033[0;32m' +COLOR_YELLOW='\033[1;33m' +COLOR_RESET='\033[0m' + +echo -e "${COLOR_YELLOW}🧹 Cleaning etcd data...${COLOR_RESET}" + +# Lösche alle alten Daten +echo "Deleting all keys under /csf/volume-manager/..." + +# Nodes +ETCDCTL_API=3 etcdctl --endpoints=localhost:2379 del /csf/volume-manager/nodes/ --prefix + +# Leader +ETCDCTL_API=3 etcdctl --endpoints=localhost:2379 del /csf/volume-manager/election/ --prefix + +# Volumes +ETCDCTL_API=3 etcdctl --endpoints=localhost:2379 del /csf/volume-manager/volumes/ --prefix + +# Snapshots +ETCDCTL_API=3 etcdctl --endpoints=localhost:2379 del /csf/volume-manager/snapshots/ --prefix + +echo -e "${COLOR_GREEN}✅ etcd cleaned!${COLOR_RESET}" +echo "" +echo "Restart volume-manager containers:" +echo " docker-compose -f docker-compose.test.yml restart" diff --git a/control-plane/volume-manager/docker-compose.test.yml b/control-plane/volume-manager/docker-compose.test.yml new file mode 100644 index 0000000..baf9306 --- /dev/null +++ b/control-plane/volume-manager/docker-compose.test.yml @@ -0,0 +1,128 @@ +services: + # etcd Cluster (3 Nodes für HA) + etcd1: + image: quay.io/coreos/etcd:v3.5.13 + container_name: etcd1 + environment: + - ETCD_NAME=etcd1 + - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd1:2380 + - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 + - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 + - ETCD_ADVERTISE_CLIENT_URLS=http://etcd1:2379 + - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster-test + - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 + - ETCD_INITIAL_CLUSTER_STATE=new + ports: + - "2379:2379" + - "2380:2380" + networks: + - csf-test + + etcd2: + image: quay.io/coreos/etcd:v3.5.13 + container_name: etcd2 + environment: + - ETCD_NAME=etcd2 + - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd2:2380 + - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 + - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 + - ETCD_ADVERTISE_CLIENT_URLS=http://etcd2:2379 + - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster-test + - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 + - ETCD_INITIAL_CLUSTER_STATE=new + ports: + - "2479:2379" + - "2480:2380" + networks: + - csf-test + + etcd3: + image: quay.io/coreos/etcd:v3.5.13 + container_name: etcd3 + environment: + - ETCD_NAME=etcd3 + - 
ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd3:2380 + - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 + - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 + - ETCD_ADVERTISE_CLIENT_URLS=http://etcd3:2379 + - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster-test + - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 + - ETCD_INITIAL_CLUSTER_STATE=new + ports: + - "2579:2379" + - "2580:2380" + networks: + - csf-test + + # Volume Manager Node 1 + volume-manager-1: + image: volume-manager:test + build: + context: ../.. + dockerfile: control-plane/volume-manager/Dockerfile.test + container_name: volume-manager-1 + environment: + - RUST_LOG=debug + - ETCD_ENDPOINTS=http://etcd1:2379,http://etcd2:2379,http://etcd3:2379 + - NODE_ID=volume-manager-1 + - HOSTNAME=volume-manager-1 + - NODE_IP=172.20.0.11 + depends_on: + - etcd1 + - etcd2 + - etcd3 + networks: + csf-test: + ipv4_address: 172.20.0.11 + restart: unless-stopped + + # Volume Manager Node 2 + volume-manager-2: + image: volume-manager:test + build: + context: ../.. + dockerfile: control-plane/volume-manager/Dockerfile.test + container_name: volume-manager-2 + environment: + - RUST_LOG=debug + - ETCD_ENDPOINTS=http://etcd1:2379,http://etcd2:2379,http://etcd3:2379 + - NODE_ID=volume-manager-2 + - HOSTNAME=volume-manager-2 + - NODE_IP=172.20.0.12 + depends_on: + - etcd1 + - etcd2 + - etcd3 + networks: + csf-test: + ipv4_address: 172.20.0.12 + restart: unless-stopped + + # Volume Manager Node 3 + volume-manager-3: + image: volume-manager:test + build: + context: ../.. + dockerfile: control-plane/volume-manager/Dockerfile.test + container_name: volume-manager-3 + environment: + - RUST_LOG=info + - ETCD_ENDPOINTS=http://etcd1:2379,http://etcd2:2379,http://etcd3:2379 + - NODE_ID=volume-manager-3 + - HOSTNAME=volume-manager-3 + - NODE_IP=172.20.0.13 + depends_on: + - etcd1 + - etcd2 + - etcd3 + networks: + csf-test: + ipv4_address: 172.20.0.13 + restart: unless-stopped + +networks: + csf-test: + driver: bridge + ipam: + config: + - subnet: 172.20.0.0/16 diff --git a/control-plane/volume-manager/src/etcd/core/client.rs b/control-plane/volume-manager/src/etcd/core/client.rs new file mode 100644 index 0000000..07e3ff2 --- /dev/null +++ b/control-plane/volume-manager/src/etcd/core/client.rs @@ -0,0 +1,257 @@ +use super::{EtcdConfig, EtcdError}; +use crate::{log_info, log_warn}; +use etcd_client::{Client, ConnectOptions, GetOptions, PutOptions}; +use std::sync::Arc; +use tokio::sync::RwLock; + +/// etcd Client mit Connection Management +#[derive(Clone)] +pub struct EtcdClient { + config: Arc, + client: Arc>>, +} + +impl EtcdClient { + /// Erstellt neuen etcd Client + pub fn new(config: EtcdConfig) -> Self { + Self { + config: Arc::new(config), + client: Arc::new(RwLock::new(None)), + } + } + + /// Verbindet mit etcd + pub async fn connect(&self) -> Result<(), EtcdError> { + log_info!("etcd::client", "Connecting to etcd cluster..."); + + let mut options = ConnectOptions::new() + .with_connect_timeout(self.config.connect_timeout) + .with_timeout(self.config.request_timeout) + .with_keep_alive( + self.config.keepalive_interval, + self.config.keepalive_timeout, + ); + + // Auth falls vorhanden + if let (Some(username), Some(password)) = (&self.config.username, &self.config.password) { + options = options.with_user(username, password); + } + + let client = Client::connect(&self.config.endpoints, Some(options)) + .await + .map_err(|e| EtcdError::Connection(e.to_string()))?; + + *self.client.write().await = Some(client); + 
log_info!("etcd::client", "Successfully connected to etcd cluster"); + Ok(()) + } + + /// Holt etcd client oder reconnect + async fn get_client(&self) -> Result { + let read_guard = self.client.read().await; + if let Some(client) = read_guard.as_ref() { + return Ok(client.clone()); + } + drop(read_guard); + + // Reconnect + log_warn!( + "etcd::client", + "No active etcd connection, attempting to reconnect..." + ); + self.connect().await?; + + self.client + .read() + .await + .as_ref() + .ok_or_else(|| EtcdError::Connection("Failed to establish connection".to_string())) + .cloned() + } + + /// Setzt einen Key-Value + pub async fn put(&self, key: &str, value: Vec) -> Result<(), EtcdError> { + let full_key = self.config.prefixed_key(key); + + let mut client = self.get_client().await?; + client + .put(full_key, value, None) + .await + .map_err(|e| EtcdError::StateOperation(e.to_string()))?; + + Ok(()) + } + + /// Setzt einen Key-Value mit Lease + pub async fn put_with_lease( + &self, + key: &str, + value: Vec, + lease_id: i64, + ) -> Result<(), EtcdError> { + let full_key = self.config.prefixed_key(key); + + let mut client = self.get_client().await?; + let options = etcd_client::PutOptions::new().with_lease(lease_id); + client + .put(full_key, value, Some(options)) + .await + .map_err(|e| EtcdError::StateOperation(e.to_string()))?; + + Ok(()) + } + + /// Holt einen Wert + pub async fn get(&self, key: &str) -> Result>, EtcdError> { + let full_key = self.config.prefixed_key(key); + + let mut client = self.get_client().await?; + let resp = client + .get(full_key, None) + .await + .map_err(|e| EtcdError::StateOperation(e.to_string()))?; + + Ok(resp.kvs().first().map(|kv| kv.value().to_vec())) + } + + /// Holt einen Wert mit Lease-Info + pub async fn get_with_lease(&self, key: &str) -> Result, i64)>, EtcdError> { + let full_key = self.config.prefixed_key(key); + + let mut client = self.get_client().await?; + let resp = client + .get(full_key, None) + .await + .map_err(|e| EtcdError::StateOperation(e.to_string()))?; + + Ok(resp + .kvs() + .first() + .map(|kv| (kv.value().to_vec(), kv.lease()))) + } + + /// Prüft ob ein Lease noch gültig ist + pub async fn lease_time_to_live(&self, lease_id: i64) -> Result, EtcdError> { + let mut client = self.get_client().await?; + match client.lease_time_to_live(lease_id, None).await { + Ok(resp) => { + if resp.ttl() > 0 { + Ok(Some(resp.ttl())) + } else { + Ok(None) // Lease expired + } + } + Err(_) => Ok(None), // Lease doesn't exist anymore + } + } + + /// Löscht einen Key + pub async fn delete(&self, key: &str) -> Result<(), EtcdError> { + let full_key = self.config.prefixed_key(key); + + let mut client = self.get_client().await?; + client + .delete(full_key, None) + .await + .map_err(|e| EtcdError::StateOperation(e.to_string()))?; + + Ok(()) + } + + /// Versucht einen Key mit Lease zu setzen, nur wenn er nicht existiert (atomare CAS-Operation) + /// Gibt true zurück wenn erfolgreich, false wenn Key bereits existiert + pub async fn try_acquire_with_lease( + &self, + key: &str, + value: Vec, + lease_id: i64, + ) -> Result { + let full_key = self.config.prefixed_key(key); + + let mut client = self.get_client().await?; + + // Transaction: Setze Key NUR wenn er nicht existiert (Version = 0) + use etcd_client::{Compare, CompareOp, Txn, TxnOp}; + + let put_options = PutOptions::new().with_lease(lease_id); + let compare = Compare::version(full_key.clone(), CompareOp::Equal, 0); + let put = TxnOp::put(full_key.clone(), value, Some(put_options)); + let get = 
TxnOp::get(full_key.clone(), None); + + let txn = Txn::new().when([compare]).and_then([put]).or_else([get]); + + let txn_resp = client.txn(txn).await.map_err(|e| EtcdError::Client(e))?; + + // Wenn succeeded = true, war die Transaction erfolgreich (Key nicht vorhanden) + Ok(txn_resp.succeeded()) + } + + /// Listet alle Keys mit Prefix + pub async fn list(&self, prefix: &str) -> Result)>, EtcdError> { + let full_prefix = self.config.prefixed_key(prefix); + + let mut client = self.get_client().await?; + let options = GetOptions::new().with_prefix(); + let resp = client + .get(full_prefix.clone(), Some(options)) + .await + .map_err(|e| EtcdError::StateOperation(e.to_string()))?; + + let results = resp + .kvs() + .iter() + .map(|kv| { + let key = String::from_utf8_lossy(kv.key()).to_string(); + let key = key + .trim_start_matches(&self.config.namespace) + .trim_start_matches('/') + .to_string(); + (key, kv.value().to_vec()) + }) + .collect(); + + Ok(results) + } + + /// Erstellt einen Lease (für TTL) + pub async fn grant_lease(&self, ttl: i64) -> Result { + let mut client = self.get_client().await?; + let resp = client + .lease_grant(ttl, None) + .await + .map_err(|e| EtcdError::Client(e))?; + + Ok(resp.id()) + } + + /// Erneuert einen Lease + pub async fn keep_alive(&self, lease_id: i64) -> Result<(), EtcdError> { + let mut client = self.get_client().await?; + let (mut keeper, _stream) = client + .lease_keep_alive(lease_id) + .await + .map_err(|e| EtcdError::Client(e))?; + + keeper + .keep_alive() + .await + .map_err(|e| EtcdError::Client(e))?; + Ok(()) + } + + /// Widerruft einen Lease + pub async fn revoke_lease(&self, lease_id: i64) -> Result<(), EtcdError> { + let mut client = self.get_client().await?; + client + .lease_revoke(lease_id) + .await + .map_err(|e| EtcdError::Client(e))?; + + Ok(()) + } + + /// Holt Config + pub fn config(&self) -> &EtcdConfig { + &self.config + } +} diff --git a/control-plane/volume-manager/src/etcd/core/config.rs b/control-plane/volume-manager/src/etcd/core/config.rs new file mode 100644 index 0000000..9f9e23c --- /dev/null +++ b/control-plane/volume-manager/src/etcd/core/config.rs @@ -0,0 +1,78 @@ +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EtcdConfig { + /// etcd endpoints (e.g., ["http://localhost:2379"]) + pub endpoints: Vec, + + /// Connection timeout + pub connect_timeout: Duration, + + /// Request timeout + pub request_timeout: Duration, + + /// Keepalive interval + pub keepalive_interval: Duration, + + /// Keepalive timeout + pub keepalive_timeout: Duration, + + /// Namespace prefix for all keys + pub namespace: String, + + /// Username (optional) + pub username: Option, + + /// Password (optional) + pub password: Option, +} + +impl Default for EtcdConfig { + fn default() -> Self { + Self { + endpoints: vec!["http://localhost:2379".to_string()], + connect_timeout: Duration::from_secs(5), + request_timeout: Duration::from_secs(10), + keepalive_interval: Duration::from_secs(30), + keepalive_timeout: Duration::from_secs(10), + namespace: "/csf/volume-manager".to_string(), + username: None, + password: None, + } + } +} + +impl EtcdConfig { + /// Erstellt Config von Umgebungsvariablen + pub fn from_env() -> Self { + let endpoints = std::env::var("ETCD_ENDPOINTS") + .unwrap_or_else(|_| "http://localhost:2379".to_string()) + .split(',') + .map(|s| s.to_string()) + .collect(); + + let namespace = + std::env::var("ETCD_NAMESPACE").unwrap_or_else(|_| 
"/csf/volume-manager".to_string()); + + let username = std::env::var("ETCD_USERNAME").ok(); + let password = std::env::var("ETCD_PASSWORD").ok(); + + Self { + endpoints, + namespace, + username, + password, + ..Default::default() + } + } + + /// Erstellt vollen Key-Pfad mit Namespace + pub fn prefixed_key(&self, key: &str) -> String { + format!( + "{}/{}", + self.namespace.trim_end_matches('/'), + key.trim_start_matches('/') + ) + } +} diff --git a/control-plane/volume-manager/src/etcd/core/error.rs b/control-plane/volume-manager/src/etcd/core/error.rs new file mode 100644 index 0000000..4f6e442 --- /dev/null +++ b/control-plane/volume-manager/src/etcd/core/error.rs @@ -0,0 +1,39 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum EtcdError { + #[error("Connection error: {0}")] + Connection(String), + + #[error("etcd client error: {0}")] + Client(#[from] etcd_client::Error), + + #[error("Leader election failed: {0}")] + LeaderElection(String), + + #[error("State operation failed: {0}")] + StateOperation(String), + + #[error("Lock acquisition failed: {0}")] + LockFailed(String), + + #[error("Serialization error: {0}")] + Serialization(#[from] serde_json::Error), + + #[error("No leader available")] + NoLeader, + + #[error("Lease expired")] + LeaseExpired, + + #[error("Watch error: {0}")] + Watch(String), + + #[error("Timeout: {0}")] + Timeout(String), + + #[error("Invalid configuration: {0}")] + InvalidConfig(String), +} + +pub type Result = std::result::Result; diff --git a/control-plane/volume-manager/src/etcd/core/mod.rs b/control-plane/volume-manager/src/etcd/core/mod.rs new file mode 100644 index 0000000..7573270 --- /dev/null +++ b/control-plane/volume-manager/src/etcd/core/mod.rs @@ -0,0 +1,9 @@ +// Core Modul - Basis-Komponenten + +pub mod client; +pub mod config; +pub mod error; + +pub use client::EtcdClient; +pub use config::EtcdConfig; +pub use error::EtcdError; diff --git a/control-plane/volume-manager/src/etcd/ha/health.rs b/control-plane/volume-manager/src/etcd/ha/health.rs new file mode 100644 index 0000000..d08a51e --- /dev/null +++ b/control-plane/volume-manager/src/etcd/ha/health.rs @@ -0,0 +1,98 @@ +use crate::etcd::core::{EtcdClient, EtcdError}; +use crate::etcd::state::{NodeState, NodeStatus}; +use crate::{log_info, log_warn}; +use std::sync::Arc; +use std::time::Duration; + +/// Health Checker für Cluster Nodes +pub struct HealthChecker { + client: Arc, + timeout: Duration, +} + +impl HealthChecker { + pub fn new(client: Arc) -> Self { + Self { + client, + timeout: Duration::from_secs(30), + } + } + + /// Prüft Health aller Nodes + pub async fn check_cluster_health(&self, nodes: Vec) -> Vec { + let mut health_statuses = Vec::new(); + + for node in nodes { + let status = self.check_node_health(&node).await; + health_statuses.push(status); + } + + health_statuses + } + + /// Prüft einzelnen Node + async fn check_node_health(&self, node: &NodeState) -> NodeHealthStatus { + let time_since_heartbeat = chrono::Utc::now() + .signed_duration_since(node.last_heartbeat) + .to_std() + .unwrap_or(Duration::from_secs(999)); + + let is_healthy = time_since_heartbeat < self.timeout; + + if !is_healthy { + log_warn!("etcd::ha::health", &format!("Node {} is unhealthy ({}s since last heartbeat)", node.node_id, time_since_heartbeat.as_secs())); + } + + NodeHealthStatus { + node_id: node.node_id.clone(), + is_healthy, + last_heartbeat: node.last_heartbeat, + time_since_heartbeat, + status: node.status.clone(), + } + } + + /// Findet offline Nodes + pub async fn 
find_offline_nodes(&self, nodes: Vec) -> Vec { + let health_statuses = self.check_cluster_health(nodes).await; + health_statuses + .into_iter() + .filter(|s| !s.is_healthy) + .map(|s| s.node_id) + .collect() + } + + /// Cluster Health Summary + pub async fn get_cluster_summary(&self, nodes: Vec) -> ClusterHealthSummary { + let health_statuses = self.check_cluster_health(nodes).await; + let total = health_statuses.len(); + let healthy = health_statuses.iter().filter(|s| s.is_healthy).count(); + let unhealthy = total - healthy; + + log_info!("etcd::ha::health", &format!("Cluster Health: {}/{} nodes healthy", healthy, total)); + + ClusterHealthSummary { + total_nodes: total, + healthy_nodes: healthy, + unhealthy_nodes: unhealthy, + nodes: health_statuses, + } + } +} + +#[derive(Debug, Clone)] +pub struct NodeHealthStatus { + pub node_id: String, + pub is_healthy: bool, + pub last_heartbeat: chrono::DateTime, + pub time_since_heartbeat: Duration, + pub status: NodeStatus, +} + +#[derive(Debug)] +pub struct ClusterHealthSummary { + pub total_nodes: usize, + pub healthy_nodes: usize, + pub unhealthy_nodes: usize, + pub nodes: Vec, +} diff --git a/control-plane/volume-manager/src/etcd/ha/leader_election.rs b/control-plane/volume-manager/src/etcd/ha/leader_election.rs new file mode 100644 index 0000000..7b961cb --- /dev/null +++ b/control-plane/volume-manager/src/etcd/ha/leader_election.rs @@ -0,0 +1,245 @@ +use crate::etcd::core::{EtcdClient, EtcdError}; +use crate::{log_error, log_info, log_warn}; +use etcd_client::EventType; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::sync::RwLock; + +/// Leader Election mit etcd +pub struct LeaderElection { + client: Arc, + node_id: String, + election_key: String, + lease_id: Arc>>, + is_leader: Arc, + ttl: i64, +} + +impl LeaderElection { + pub fn new(client: Arc, node_id: String) -> Self { + let election_key = format!("election/leader"); + Self { + client, + node_id, + election_key, + lease_id: Arc::new(RwLock::new(None)), + is_leader: Arc::new(AtomicBool::new(false)), + ttl: 10, // 10 Sekunden TTL + } + } + + /// Startet Leader Election Campaign + pub async fn campaign(&self) -> Result<(), EtcdError> { + // Bin ich bereits Leader? + if self.is_leader() { + return Ok(()); // Bereits Leader, nichts zu tun + } + + log_info!( + "etcd::ha::leader_election", + &format!("Attempting to become leader: {}", self.node_id) + ); + + // Prüfe ob bereits Leader aktiv ist + match self.client.get_with_lease(&self.election_key).await? { + Some((current_leader_bytes, lease_id)) => { + let current_leader = String::from_utf8_lossy(¤t_leader_bytes); + + // Prüfe ob der Lease noch gültig ist + if lease_id > 0 { + match self.client.lease_time_to_live(lease_id).await? { + Some(ttl) => { + // Lease ist noch gültig, respektiere den aktuellen Leader + log_info!( + "etcd::ha::leader_election", + &format!( + "Leader already exists: {} (Lease: {}, TTL: {}s)", + current_leader, lease_id, ttl + ) + ); + return Ok(()); + } + None => { + // Lease ist abgelaufen oder existiert nicht mehr! 
+ log_warn!( + "etcd::ha::leader_election", + &format!( + "Old leader {} has expired lease {}, taking over leadership", + current_leader, lease_id + ) + ); + // Lösche den alten Key + if let Err(e) = self.client.delete(&self.election_key).await { + log_warn!( + "etcd::ha::leader_election", + &format!("Failed to delete old leader key: {}", e) + ); + } + } + } + } else { + // Kein Lease, der alte Eintrag ist ungültig + log_warn!( + "etcd::ha::leader_election", + &format!( + "Old leader {} found without valid lease, taking over leadership", + current_leader + ) + ); + // Lösche den alten Key + if let Err(e) = self.client.delete(&self.election_key).await { + log_warn!( + "etcd::ha::leader_election", + &format!("Failed to delete old leader key: {}", e) + ); + } + } + } + None => { + // Kein Leader vorhanden + log_info!( + "etcd::ha::leader_election", + &format!("No leader found, starting campaign: {}", self.node_id) + ); + } + } + + // Lease erstellen + let lease_id = self.client.grant_lease(self.ttl).await?; + *self.lease_id.write().await = Some(lease_id); + + // Versuche Leader zu werden mit atomarer CAS-Operation + let value = self.node_id.as_bytes().to_vec(); + match self + .client + .try_acquire_with_lease(&self.election_key, value, lease_id) + .await? + { + true => { + // Erfolgreich! Wir sind jetzt Leader + self.is_leader.store(true, Ordering::SeqCst); + log_info!( + "etcd::ha::leader_election", + &format!("Successfully elected as leader! (Lease: {})", lease_id) + ); + + // Starte Lease Renewal + self.start_lease_renewal().await; + } + false => { + // Ein anderer Node war schneller + log_warn!( + "etcd::ha::leader_election", + "Another node became leader (race condition)" + ); + + // Revoke unseren Lease da wir ihn nicht brauchen + if let Err(e) = self.client.revoke_lease(lease_id).await { + log_warn!( + "etcd::ha::leader_election", + &format!("Failed to revoke unused lease: {}", e) + ); + } + *self.lease_id.write().await = None; + self.is_leader.store(false, Ordering::SeqCst); + } + } + + Ok(()) + } + + /// Startet Lease Renewal im Hintergrund + async fn start_lease_renewal(&self) { + let client = Arc::clone(&self.client); + let lease_id = Arc::clone(&self.lease_id); + let is_leader = Arc::clone(&self.is_leader); + let node_id = self.node_id.clone(); + + tokio::spawn(async move { + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + if let Some(lid) = *lease_id.read().await { + match client.keep_alive(lid).await { + Ok(_) => { + // Lease erfolgreich erneuert + } + Err(e) => { + log_error!( + "etcd::ha::leader_election", + &format!("Lease renewal failed: {}", e) + ); + is_leader.store(false, Ordering::SeqCst); + log_warn!( + "etcd::ha::leader_election", + &format!("Lost leadership due to lease failure: {}", node_id) + ); + break; + } + } + } else { + break; + } + } + }); + } + + /// Wartet auf Leadership Changes (Watch) + pub async fn watch_leadership(&self, mut callback: F) -> Result<(), EtcdError> + where + F: FnMut(Option) + Send + 'static, + { + let key = self.client.config().prefixed_key(&self.election_key); + log_info!( + "etcd::ha::leader_election", + &format!("Watching leadership changes on: {}", key) + ); + + // Hier würde ein Watch implementiert werden + // Für jetzt vereinfacht + Ok(()) + } + + /// Prüft ob dieser Node Leader ist + pub fn is_leader(&self) -> bool { + self.is_leader.load(Ordering::SeqCst) + } + + /// Gibt den aktuellen Leader zurück + pub async fn get_leader(&self) -> Result, EtcdError> { + match 
self.client.get(&self.election_key).await? { + Some(value) => { + let leader = String::from_utf8_lossy(&value).to_string(); + Ok(Some(leader)) + } + None => Ok(None), + } + } + + /// Gibt auf Leadership auf (nur wenn wir Leader sind) + pub async fn resign(&self) -> Result<(), EtcdError> { + if !self.is_leader() { + return Ok(()); // Nicht Leader, nichts zu tun + } + + log_info!( + "etcd::ha::leader_election", + &format!("Resigning from leadership: {}", self.node_id) + ); + + // Revoke Lease + if let Some(lid) = *self.lease_id.read().await { + self.client.revoke_lease(lid).await?; + } + + // Update State + self.is_leader.store(false, Ordering::SeqCst); + *self.lease_id.write().await = None; + + log_info!( + "etcd::ha::leader_election", + "Successfully resigned from leadership" + ); + Ok(()) + } +} diff --git a/control-plane/volume-manager/src/etcd/ha/mod.rs b/control-plane/volume-manager/src/etcd/ha/mod.rs new file mode 100644 index 0000000..bbd5d92 --- /dev/null +++ b/control-plane/volume-manager/src/etcd/ha/mod.rs @@ -0,0 +1,7 @@ +// High Availability Modul + +pub mod health; +pub mod leader_election; + +pub use health::{ClusterHealthSummary, HealthChecker, NodeHealthStatus}; +pub use leader_election::LeaderElection; diff --git a/control-plane/volume-manager/src/etcd/init.rs b/control-plane/volume-manager/src/etcd/init.rs new file mode 100644 index 0000000..c360f42 --- /dev/null +++ b/control-plane/volume-manager/src/etcd/init.rs @@ -0,0 +1,51 @@ +use super::{EtcdClient, EtcdConfig, HealthChecker, LeaderElection, StateManager}; +use crate::log_info; +use crate::logger; +use std::sync::Arc; +use uuid; + +pub struct InitData { + pub etcd_client: Arc, + pub state_manager: Arc, + pub health_checker: Arc, + pub leader_election: Arc, + pub node_id: String, +} + +pub async fn init_cluster() -> anyhow::Result { + logger::init_logger(); + + let etcd_config = EtcdConfig::from_env(); + let etcd_client = Arc::new(EtcdClient::new(etcd_config)); + + etcd_client.connect().await?; + log_info!("etcd::init", "Connected to etcd cluster"); + + let state_manager = Arc::new(StateManager::new(etcd_client.clone())); + let health_checker = Arc::new(HealthChecker::new(etcd_client.clone())); + + let node_id = std::env::var("NODE_ID") + .unwrap_or_else(|_| format!("volume-manager-{}", uuid::Uuid::new_v4())); + let hostname = std::env::var("HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); + let ip_address = std::env::var("NODE_IP").unwrap_or_else(|_| "127.0.0.1".to_string()); + + log_info!( + "etcd::init", + &format!("Registering node with ID: {}", node_id) + ); + state_manager + .register_node(node_id.clone(), hostname, ip_address) + .await?; + + // Leader Election starten + let leader_election = Arc::new(LeaderElection::new(etcd_client.clone(), node_id.clone())); + leader_election.campaign().await?; + + Ok(InitData { + etcd_client, + state_manager, + health_checker, + leader_election, + node_id, + }) +} diff --git a/control-plane/volume-manager/src/etcd/mod.rs b/control-plane/volume-manager/src/etcd/mod.rs new file mode 100644 index 0000000..7808e8d --- /dev/null +++ b/control-plane/volume-manager/src/etcd/mod.rs @@ -0,0 +1,9 @@ +pub mod core; +pub mod ha; +pub mod init; +pub mod state; +pub mod sync; + +pub use core::{EtcdClient, EtcdConfig}; +pub use ha::{HealthChecker, LeaderElection}; +pub use state::StateManager; diff --git a/control-plane/volume-manager/src/etcd/state/manager.rs b/control-plane/volume-manager/src/etcd/state/manager.rs new file mode 100644 index 0000000..8cd5cdf --- /dev/null +++ 
b/control-plane/volume-manager/src/etcd/state/manager.rs @@ -0,0 +1,180 @@ +use super::{storage::StateStorage, types::*}; +use crate::etcd::core::{EtcdClient, EtcdError}; +use crate::{log_info, log_warn}; +use std::sync::Arc; +use uuid::Uuid; + +/// High-level State Manager +pub struct StateManager { + storage: Arc, +} + +impl StateManager { + pub fn new(client: Arc) -> Self { + let storage = Arc::new(StateStorage::new(client)); + Self { storage } + } + + // === Volume Management === + + /// Erstellt neues Volume + pub async fn create_volume( + &self, + name: String, + size_gb: u64, + pool: String, + encrypted: bool, + ) -> Result { + let volume = VolumeState::new(name, size_gb, pool, encrypted); + log_info!("etcd::state::manager", &format!("Creating volume: {} ({})", volume.name, volume.id)); + self.storage.save_volume(&volume).await?; + Ok(volume) + } + + /// Aktualisiert Volume Status + pub async fn update_volume_status( + &self, + id: Uuid, + status: VolumeStatus, + ) -> Result<(), EtcdError> { + let mut volume = self + .storage + .get_volume(id) + .await? + .ok_or_else(|| EtcdError::StateOperation(format!("Volume {} not found", id)))?; + + volume.update_status(status); + self.storage.save_volume(&volume).await?; + log_info!("etcd::state::manager", &format!("Updated volume {} status to {:?}", id, volume.status)); + Ok(()) + } + + /// Holt Volume + pub async fn get_volume(&self, id: Uuid) -> Result, EtcdError> { + self.storage.get_volume(id).await + } + + /// Listet alle Volumes + pub async fn list_volumes(&self) -> Result, EtcdError> { + self.storage.list_volumes().await + } + + /// Löscht Volume + pub async fn delete_volume(&self, id: Uuid) -> Result<(), EtcdError> { + log_info!("etcd::state::manager", &format!("Deleting volume: {}", id)); + self.storage.delete_volume(id).await + } + + // === Node Management === + + /// Registriert Node + pub async fn register_node( + &self, + node_id: String, + hostname: String, + ip_address: String, + ) -> Result { + let node = NodeState { + node_id: node_id.clone(), + hostname, + ip_address, + status: NodeStatus::Online, + role: NodeRole::Follower, + last_heartbeat: chrono::Utc::now(), + volumes: Vec::new(), + }; + + log_info!("etcd::state::manager", &format!("Registering node: {}", node_id)); + self.storage.save_node(&node).await?; + Ok(node) + } + + /// Aktualisiert Node Heartbeat + pub async fn update_node_heartbeat(&self, node_id: &str) -> Result<(), EtcdError> { + let mut node = self + .storage + .get_node(node_id) + .await? + .ok_or_else(|| EtcdError::StateOperation(format!("Node {} not found", node_id)))?; + + node.last_heartbeat = chrono::Utc::now(); + node.status = NodeStatus::Online; + self.storage.save_node(&node).await + } + + /// Markiert Node als Offline + pub async fn mark_node_offline(&self, node_id: &str) -> Result<(), EtcdError> { + let mut node = self + .storage + .get_node(node_id) + .await? + .ok_or_else(|| EtcdError::StateOperation(format!("Node {} not found", node_id)))?; + + log_warn!("etcd::state::manager", &format!("Marking node {} as offline", node_id)); + node.status = NodeStatus::Offline; + self.storage.save_node(&node).await + } + + /// Setzt Node Rolle + pub async fn set_node_role(&self, node_id: &str, role: NodeRole) -> Result<(), EtcdError> { + let mut node = self + .storage + .get_node(node_id) + .await? 
+ .ok_or_else(|| EtcdError::StateOperation(format!("Node {} not found", node_id)))?; + + node.role = role; + self.storage.save_node(&node).await?; + log_info!("etcd::state::manager", &format!("Set node {} role to {:?}", node_id, node.role)); + Ok(()) + } + + /// Listet alle Nodes + pub async fn list_nodes(&self) -> Result, EtcdError> { + self.storage.list_nodes().await + } + + /// Findet Online Nodes + pub async fn get_online_nodes(&self) -> Result, EtcdError> { + let nodes = self.storage.list_nodes().await?; + Ok(nodes + .into_iter() + .filter(|n| n.status == NodeStatus::Online) + .collect()) + } + + // === Snapshot Management === + + /// Erstellt Snapshot + pub async fn create_snapshot( + &self, + volume_id: Uuid, + name: String, + size_gb: u64, + ) -> Result { + let snapshot = SnapshotState { + id: Uuid::new_v4(), + volume_id, + name: name.clone(), + size_gb, + status: SnapshotStatus::Creating, + created_at: chrono::Utc::now(), + }; + + log_info!("etcd::state::manager", &format!("Creating snapshot: {} for volume {}", name, volume_id)); + self.storage.save_snapshot(&snapshot).await?; + Ok(snapshot) + } + + /// Listet Snapshots für Volume + pub async fn list_volume_snapshots( + &self, + volume_id: Uuid, + ) -> Result, EtcdError> { + let snapshots = self.storage.list_snapshots().await?; + Ok(snapshots + .into_iter() + .filter(|s| s.volume_id == volume_id) + .collect()) + } +} diff --git a/control-plane/volume-manager/src/etcd/state/mod.rs b/control-plane/volume-manager/src/etcd/state/mod.rs new file mode 100644 index 0000000..d6fed9e --- /dev/null +++ b/control-plane/volume-manager/src/etcd/state/mod.rs @@ -0,0 +1,13 @@ +// State Management Modul +// +// Hier kommt rein: +// - StateManager: Hauptschnittstelle für State-Operationen +// - State CRUD operations +// - State versioning + +pub mod manager; +pub mod storage; +pub mod types; + +pub use manager::StateManager; +pub use types::*; diff --git a/control-plane/volume-manager/src/etcd/state/storage.rs b/control-plane/volume-manager/src/etcd/state/storage.rs new file mode 100644 index 0000000..32d65e4 --- /dev/null +++ b/control-plane/volume-manager/src/etcd/state/storage.rs @@ -0,0 +1,121 @@ +use super::types::*; +use crate::etcd::core::{EtcdClient, EtcdError}; +use crate::log_error; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::sync::Arc; +use uuid::Uuid; + +/// State Storage Layer - abstrahiert etcd operations +pub struct StateStorage { + client: Arc, +} + +impl StateStorage { + pub fn new(client: Arc) -> Self { + Self { client } + } + + /// Speichert generischen State + async fn put_state(&self, key: &str, state: &T) -> Result<(), EtcdError> { + let data = serde_json::to_vec(state)?; + self.client.put(key, data).await + } + + /// Liest generischen State + async fn get_state(&self, key: &str) -> Result, EtcdError> { + match self.client.get(key).await? 
{ + Some(data) => { + let state = serde_json::from_slice(&data)?; + Ok(Some(state)) + } + None => Ok(None), + } + } + + /// Löscht State + async fn delete_state(&self, key: &str) -> Result<(), EtcdError> { + self.client.delete(key).await + } + + /// Listet States mit Prefix + async fn list_states(&self, prefix: &str) -> Result, EtcdError> { + let entries = self.client.list(prefix).await?; + let mut states = Vec::new(); + + for (_key, data) in entries { + match serde_json::from_slice(&data) { + Ok(state) => states.push(state), + Err(e) => log_error!( + "etcd::state::storage", + &format!("Failed to deserialize state: {}", e) + ), + } + } + + Ok(states) + } + + // === Volume State Operations === + + pub async fn save_volume(&self, volume: &VolumeState) -> Result<(), EtcdError> { + let key = format!("volumes/{}", volume.id); + self.put_state(&key, volume).await + } + + pub async fn get_volume(&self, id: Uuid) -> Result, EtcdError> { + let key = format!("volumes/{}", id); + self.get_state(&key).await + } + + pub async fn delete_volume(&self, id: Uuid) -> Result<(), EtcdError> { + let key = format!("volumes/{}", id); + self.delete_state(&key).await + } + + pub async fn list_volumes(&self) -> Result, EtcdError> { + self.list_states("volumes/").await + } + + // === Node State Operations === + + pub async fn save_node(&self, node: &NodeState) -> Result<(), EtcdError> { + let key = format!("nodes/{}", node.node_id); + self.put_state(&key, node).await + } + + pub async fn get_node(&self, node_id: &str) -> Result, EtcdError> { + let key = format!("nodes/{}", node_id); + self.get_state(&key).await + } + + pub async fn delete_node(&self, node_id: &str) -> Result<(), EtcdError> { + let key = format!("nodes/{}", node_id); + self.delete_state(&key).await + } + + pub async fn list_nodes(&self) -> Result, EtcdError> { + self.list_states("nodes/").await + } + + // === Snapshot State Operations === + + pub async fn save_snapshot(&self, snapshot: &SnapshotState) -> Result<(), EtcdError> { + let key = format!("snapshots/{}", snapshot.id); + self.put_state(&key, snapshot).await + } + + pub async fn get_snapshot(&self, id: Uuid) -> Result, EtcdError> { + let key = format!("snapshots/{}", id); + self.get_state(&key).await + } + + pub async fn delete_snapshot(&self, id: Uuid) -> Result<(), EtcdError> { + let key = format!("snapshots/{}", id); + self.delete_state(&key).await + } + + pub async fn list_snapshots(&self) -> Result, EtcdError> { + self.list_states("snapshots/").await + } +} diff --git a/control-plane/volume-manager/src/etcd/state/types.rs b/control-plane/volume-manager/src/etcd/state/types.rs new file mode 100644 index 0000000..51f2ba7 --- /dev/null +++ b/control-plane/volume-manager/src/etcd/state/types.rs @@ -0,0 +1,96 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +/// Volume State in etcd +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VolumeState { + pub id: Uuid, + pub name: String, + pub size_gb: u64, + pub pool: String, + pub status: VolumeStatus, + pub encrypted: bool, + pub node_id: Option, + pub created_at: DateTime, + pub updated_at: DateTime, + pub version: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum VolumeStatus { + Creating, + Available, + InUse, + Deleting, + Error, + Migrating, +} + +impl VolumeState { + pub fn new(name: String, size_gb: u64, pool: String, encrypted: bool) -> Self { + let now = Utc::now(); + Self { + id: Uuid::new_v4(), + name, + size_gb, + pool, + status: VolumeStatus::Creating, + 
encrypted, + node_id: None, + created_at: now, + updated_at: now, + version: 1, + } + } + + pub fn update_status(&mut self, status: VolumeStatus) { + self.status = status; + self.updated_at = Utc::now(); + self.version += 1; + } +} + +/// Node State für Failover +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeState { + pub node_id: String, + pub hostname: String, + pub ip_address: String, + pub status: NodeStatus, + pub role: NodeRole, + pub last_heartbeat: DateTime, + pub volumes: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum NodeStatus { + Online, + Offline, + Degraded, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum NodeRole { + Leader, + Follower, +} + +/// Snapshot State +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SnapshotState { + pub id: Uuid, + pub volume_id: Uuid, + pub name: String, + pub size_gb: u64, + pub status: SnapshotStatus, + pub created_at: DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum SnapshotStatus { + Creating, + Available, + Deleting, + Error, +} diff --git a/control-plane/volume-manager/src/etcd/sync/lock.rs b/control-plane/volume-manager/src/etcd/sync/lock.rs new file mode 100644 index 0000000..138154e --- /dev/null +++ b/control-plane/volume-manager/src/etcd/sync/lock.rs @@ -0,0 +1,115 @@ +use crate::etcd::core::{EtcdClient, EtcdError}; +use crate::{log_info, log_warn}; +use std::sync::Arc; + +/// Distributed Lock mit etcd +pub struct DistributedLock { + client: Arc, + lock_key: String, + lease_id: Option, + ttl: i64, +} + +impl DistributedLock { + /// Erstellt einen neuen Lock + pub fn new(client: Arc, lock_name: &str) -> Self { + let lock_key = format!("locks/{}", lock_name); + Self { + client, + lock_key, + lease_id: None, + ttl: 30, // 30 Sekunden TTL + } + } + + /// Versucht Lock zu erwerben + pub async fn acquire(&mut self) -> Result { + // Prüfe ob Lock bereits existiert + if let Some(_) = self.client.get(&self.lock_key).await? { + return Ok(false); + } + + // Erstelle Lease + let lease_id = self.client.grant_lease(self.ttl).await?; + self.lease_id = Some(lease_id); + + // Setze Lock mit Lease + let value = format!("locked-{}", chrono::Utc::now().timestamp()); + self.client.put(&self.lock_key, value.into_bytes()).await?; + + log_info!( + "etcd::sync::lock", + &format!("Lock acquired: {}", self.lock_key) + ); + Ok(true) + } + + /// Versucht Lock mit Retry zu erwerben + pub async fn acquire_with_retry( + &mut self, + max_retries: u32, + retry_interval_ms: u64, + ) -> Result { + for attempt in 1..=max_retries { + if self.acquire().await? 
{ + return Ok(true); + } + + if attempt < max_retries { + tokio::time::sleep(tokio::time::Duration::from_millis(retry_interval_ms)).await; + } + } + + log_warn!( + "etcd::sync::lock", + &format!("Failed to acquire lock after {} attempts", max_retries) + ); + Ok(false) + } + + /// Gibt Lock frei + pub async fn release(&mut self) -> Result<(), EtcdError> { + if self.lease_id.is_none() { + return Ok(()); + } + + // Lösche Lock Key + self.client.delete(&self.lock_key).await?; + + // Revoke Lease + if let Some(lease_id) = self.lease_id { + self.client.revoke_lease(lease_id).await?; + } + + self.lease_id = None; + log_info!( + "etcd::sync::lock", + &format!("Lock released: {}", self.lock_key) + ); + Ok(()) + } + + /// Erneuert Lock Lease + pub async fn refresh(&self) -> Result<(), EtcdError> { + if let Some(lease_id) = self.lease_id { + self.client.keep_alive(lease_id).await?; + Ok(()) + } else { + Err(EtcdError::LockFailed( + "No active lease to refresh".to_string(), + )) + } + } +} + +/// Automatisches Release beim Drop +impl Drop for DistributedLock { + fn drop(&mut self) { + if self.lease_id.is_some() { + log_warn!( + "etcd::sync::lock", + &format!("Lock dropped without explicit release: {}", self.lock_key) + ); + } + } +} diff --git a/control-plane/volume-manager/src/etcd/sync/mod.rs b/control-plane/volume-manager/src/etcd/sync/mod.rs new file mode 100644 index 0000000..0884814 --- /dev/null +++ b/control-plane/volume-manager/src/etcd/sync/mod.rs @@ -0,0 +1,7 @@ +// Synchronisation Modul + +pub mod lock; +pub mod watcher; + +pub use lock::DistributedLock; +pub use watcher::StateWatcher; diff --git a/control-plane/volume-manager/src/etcd/sync/watcher.rs b/control-plane/volume-manager/src/etcd/sync/watcher.rs new file mode 100644 index 0000000..db0debd --- /dev/null +++ b/control-plane/volume-manager/src/etcd/sync/watcher.rs @@ -0,0 +1,83 @@ +use crate::etcd::core::{EtcdClient, EtcdError}; +use crate::log_info; +use std::sync::Arc; + +/// State Watcher für etcd Events +pub struct StateWatcher { + client: Arc, +} + +impl StateWatcher { + pub fn new(client: Arc) -> Self { + Self { client } + } + + /// Beobachtet einen Key-Prefix für Änderungen + pub async fn watch_prefix(&self, prefix: &str, mut callback: F) -> Result<(), EtcdError> + where + F: FnMut(WatchEvent) + Send + 'static, + { + let full_prefix = self.client.config().prefixed_key(prefix); + log_info!( + "etcd::sync::watcher", + &format!("Starting watch on: {}", full_prefix) + ); + + // Watch implementierung würde hier kommen + // In etcd-client würde man WatchOptions mit prefix verwenden + // Für diese Demo-Implementierung vereinfacht + + Ok(()) + } + + /// Beobachtet einen spezifischen Key + pub async fn watch_key(&self, key: &str, callback: F) -> Result<(), EtcdError> + where + F: FnMut(WatchEvent) + Send + 'static, + { + let full_key = self.client.config().prefixed_key(key); + log_info!( + "etcd::sync::watcher", + &format!("Starting watch on key: {}", full_key) + ); + + // Watch implementierung + Ok(()) + } + + /// Stoppt alle Watches + pub async fn stop_all(&self) -> Result<(), EtcdError> { + log_info!("etcd::sync::watcher", "Stopping all watches"); + Ok(()) + } +} + +/// Watch Event Types +#[derive(Debug, Clone)] +pub enum WatchEvent { + Put { + key: String, + value: Vec, + version: i64, + }, + Delete { + key: String, + }, +} + +impl WatchEvent { + pub fn key(&self) -> &str { + match self { + WatchEvent::Put { key, .. 
} => key, + WatchEvent::Delete { key } => key, + } + } + + pub fn is_put(&self) -> bool { + matches!(self, WatchEvent::Put { .. }) + } + + pub fn is_delete(&self) -> bool { + matches!(self, WatchEvent::Delete { .. }) + } +} diff --git a/control-plane/volume-manager/src/logger.rs b/control-plane/volume-manager/src/logger.rs new file mode 100644 index 0000000..601bd3b --- /dev/null +++ b/control-plane/volume-manager/src/logger.rs @@ -0,0 +1,115 @@ +use tracing::{event, Level}; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + +#[derive(Debug, Clone, Copy)] +pub enum LogLevel { + Trace, + Debug, + Info, + Warn, + Error, +} + +impl From for Level { + fn from(level: LogLevel) -> Self { + match level { + LogLevel::Trace => Level::TRACE, + LogLevel::Debug => Level::DEBUG, + LogLevel::Info => Level::INFO, + LogLevel::Warn => Level::WARN, + LogLevel::Error => Level::ERROR, + } + } +} + +pub fn init_logger() { + let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + + tracing_subscriber::registry() + .with(filter) + .with(fmt::layer().with_target(false).with_thread_ids(true)) + .init(); +} + +pub fn log_message(level: LogLevel, module: &str, location: &str, description: &str) { + let lvl: Level = level.into(); + + match lvl { + Level::ERROR => { + event!(Level::ERROR, module = %module, location = %location, "{}", description) + } + Level::WARN => { + event!(Level::WARN, module = %module, location = %location, "{}", description) + } + Level::INFO => { + event!(Level::INFO, module = %module, location = %location, "{}", description) + } + Level::DEBUG => { + event!(Level::DEBUG, module = %module, location = %location, "{}", description) + } + Level::TRACE => { + event!(Level::TRACE, module = %module, location = %location, "{}", description) + } + } +} + +/// Makros für einfaches Logging mit automatischem Ort (Dateiname:Zeile) +#[macro_export] +macro_rules! log_trace { + ($module:expr, $desc:expr) => { + $crate::logger::log_message( + $crate::logger::LogLevel::Trace, + $module, + concat!(file!(), ":", line!()), + $desc, + ) + }; +} + +#[macro_export] +macro_rules! log_debug { + ($module:expr, $desc:expr) => { + $crate::logger::log_message( + $crate::logger::LogLevel::Debug, + $module, + concat!(file!(), ":", line!()), + $desc, + ) + }; +} + +#[macro_export] +macro_rules! log_info { + ($module:expr, $desc:expr) => { + $crate::logger::log_message( + $crate::logger::LogLevel::Info, + $module, + concat!(file!(), ":", line!()), + $desc, + ) + }; +} + +#[macro_export] +macro_rules! log_warn { + ($module:expr, $desc:expr) => { + $crate::logger::log_message( + $crate::logger::LogLevel::Warn, + $module, + concat!(file!(), ":", line!()), + $desc, + ) + }; +} + +#[macro_export] +macro_rules! 
+ ($module:expr, $desc:expr) => { + $crate::logger::log_message( + $crate::logger::LogLevel::Error, + $module, + concat!(file!(), ":", line!()), + $desc, + ) + }; +} diff --git a/control-plane/volume-manager/src/main.rs b/control-plane/volume-manager/src/main.rs index 779a988..d1e3ad8 100644 --- a/control-plane/volume-manager/src/main.rs +++ b/control-plane/volume-manager/src/main.rs @@ -1,27 +1,171 @@ -use tracing::{info, warn}; +use std::sync::Arc; + +use etcd::state::NodeRole; +use etcd::StateManager; + +mod etcd; +mod logger; #[tokio::main] async fn main() -> anyhow::Result<()> { - // Initialize logger - tracing_subscriber::fmt() - .with_target(false) - .with_thread_ids(true) - .with_level(true) - .init(); + let init_data = etcd::init::init_cluster().await?; + let etcd_client = init_data.etcd_client; + let state_manager = init_data.state_manager; + let health_checker = init_data.health_checker; + let leader_election = init_data.leader_election; + let node_id = init_data.node_id; + + // Create test volumes when this node is the leader (after a short startup delay) + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + if leader_election.is_leader() { + log_info!( + "main", + "Node is leader, initializing volume management tasks" + ); + + // Create demo volumes + for i in 1..=3 { + match state_manager + .create_volume( + format!("demo-volume-{}", i), + 100 + (i * 50), + "rbd".to_string(), + i % 2 == 0, // every second volume is encrypted + ) + .await + { + Ok(vol) => log_info!( + "main", + &format!( + "Successfully created volume: {} ({} GB)", + vol.name, vol.size_gb + ) + ), + Err(e) => log_error!("main", &format!("Failed to create volume: {}", e)), + } + } + } else { + log_info!("main", "Node is follower, waiting for leader"); + } - info!("💾 Volume Manager Service starting..."); - info!("✅ Volume Manager initialized"); + log_info!("main", "Volume Manager initialized successfully"); - // Demo: Simulated storage management loop - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(35)); + // Main loop + let mut heartbeat_interval = tokio::time::interval(tokio::time::Duration::from_secs(10)); + let mut health_check_interval = tokio::time::interval(tokio::time::Duration::from_secs(30)); + let mut operations_interval = tokio::time::interval(tokio::time::Duration::from_secs(35)); + let mut election_interval = tokio::time::interval(tokio::time::Duration::from_secs(5)); loop { - interval.tick().await; - info!("📦 Managing storage volumes..."); - info!(" - Monitoring Ceph cluster health"); - info!(" - Creating/managing RBD volumes"); - info!(" - Processing snapshot requests"); - info!(" - Verifying encryption status"); - warn!(" ⚠️ Demo mode: No actual volume operations performed"); + tokio::select! {
+ // Leader election loop: periodically try to become the leader + _ = election_interval.tick() => { + if !leader_election.is_leader() { + if let Err(e) = leader_election.campaign().await { + log_error!("main", &format!("Leader election campaign failed: {}", e)); + } + } + } + + // Heartbeat: update the node status + _ = heartbeat_interval.tick() => { + if let Err(e) = state_manager.update_node_heartbeat(&node_id).await { + log_error!("main", &format!("Failed to update node heartbeat: {}", e)); + } + + // Update the role based on current leadership + let role = if leader_election.is_leader() { + NodeRole::Leader + } else { + NodeRole::Follower + }; + + if let Err(e) = state_manager.set_node_role(&node_id, role).await { + log_error!("main", &format!("Failed to update node role: {}", e)); + } + } + + // Health check: verify cluster health + _ = health_check_interval.tick() => { + match state_manager.list_nodes().await { + Ok(nodes) => { + let summary = health_checker.get_cluster_summary(nodes.clone()).await; + + if summary.unhealthy_nodes > 0 { + log_warn!("main", &format!("Detected {} unhealthy nodes", summary.unhealthy_nodes)); + + // Only the leader performs failover + if leader_election.is_leader() { + perform_failover(&state_manager, &summary.nodes).await; + } + } + } + Err(e) => log_error!("main", &format!("Failed to list nodes: {}", e)), + } + } + + // Volume operations: only the leader performs these + _ = operations_interval.tick() => { + if leader_election.is_leader() { + log_info!("main", "[LEADER] Managing storage volumes..."); + + // List all volumes + match state_manager.list_volumes().await { + Ok(volumes) => { + log_info!("main", &format!("Total volumes: {}", volumes.len())); + for vol in volumes.iter().take(3) { + log_info!("main", &format!("- {} ({:?})", vol.name, vol.status)); + } + } + Err(e) => log_error!("main", &format!("Failed to list volumes: {}", e)), + } + + log_info!("main", "- Monitoring Ceph cluster health"); + log_info!("main", "- Processing snapshot requests"); + log_info!("main", "- Verifying encryption status"); + } else { + log_info!("main", "[FOLLOWER] Standby mode - waiting for leader instructions"); + + // Followers can query the current leader + if let Ok(Some(leader)) = leader_election.get_leader().await { + log_info!("main", &format!("Current leader: {}", leader)); + } + } + } + } } } + +/// Performs failover for offline nodes +async fn perform_failover( + state_manager: &Arc<StateManager>, + health_statuses: &[etcd::ha::NodeHealthStatus], +) { + log_info!("main", "Initiating failover procedure..."); + + for status in health_statuses { + if !status.is_healthy { + log_warn!( + "main", + &format!("Node {} is unhealthy, marking as offline", status.node_id) + ); + + if let Err(e) = state_manager.mark_node_offline(&status.node_id).await { + log_error!( + "main", + &format!("Failed to mark node {} as offline: {}", status.node_id, e) + ); + } + + // Volumes on this node would be migrated away here + log_info!( + "main", + &format!("Initiating volume migration from node {}", status.node_id) + ); + // TODO: implement volume migration + } + } + + log_info!("main", "Failover procedure completed"); +} diff --git a/control-plane/volume-manager/test-ha.sh b/control-plane/volume-manager/test-ha.sh new file mode 100755 index 0000000..f0b803b --- /dev/null +++ b/control-plane/volume-manager/test-ha.sh @@ -0,0 +1,378 @@ +#!/bin/bash + +# Test script for HA, leader election and failover +# Usage: ./test-ha.sh + +set -e + +# Set the ETCDCTL API version +export ETCDCTL_API=3
+ +COLOR_RESET='\033[0m' +COLOR_GREEN='\033[0;32m' +COLOR_BLUE='\033[0;34m' +COLOR_YELLOW='\033[1;33m' +COLOR_RED='\033[0;31m' +COLOR_CYAN='\033[0;36m' + +log() { + echo -e "${COLOR_BLUE}[$(date +'%H:%M:%S')]${COLOR_RESET} $1" +} + +success() { + echo -e "${COLOR_GREEN}✅ $1${COLOR_RESET}" +} + +info() { + echo -e "${COLOR_CYAN}ℹ️ $1${COLOR_RESET}" +} + +warning() { + echo -e "${COLOR_YELLOW}⚠️ $1${COLOR_RESET}" +} + +error() { + echo -e "${COLOR_RED}❌ $1${COLOR_RESET}" +} + +header() { + echo -e "\n${COLOR_GREEN}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━${COLOR_RESET}" + echo -e "${COLOR_GREEN} $1${COLOR_RESET}" + echo -e "${COLOR_GREEN}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━${COLOR_RESET}\n" +} + +# Check whether etcdctl is installed +check_etcdctl() { + if ! command -v etcdctl &> /dev/null; then + warning "etcdctl not found. Install it with:" + echo " brew install etcd # macOS" + echo " apt install etcd-client # Ubuntu" + return 1 + fi + return 0 +} + +# Show the etcd cluster status +show_etcd_status() { + header "etcd Cluster Status" + if check_etcdctl; then + etcdctl --endpoints=localhost:2379 member list + echo "" + etcdctl --endpoints=localhost:2379 endpoint health + fi +} + +# Show all nodes registered in etcd +show_nodes() { + header "Registered Nodes" + if check_etcdctl; then + echo "Nodes in the cluster:" + etcdctl --endpoints=localhost:2379 get /csf/volume-manager/nodes/ --prefix --keys-only | grep -v "^$" || echo "No nodes found" + echo "" + echo "Node details:" + etcdctl --endpoints=localhost:2379 get /csf/volume-manager/nodes/ --prefix | grep -v "^$" | jq '.' 2>/dev/null || etcdctl --endpoints=localhost:2379 get /csf/volume-manager/nodes/ --prefix + fi +} + +# Show the current leader +show_leader() { + header "Leader Election Status" + if check_etcdctl; then + echo "Current leader:" + # The leader is stored directly as a string under /csf/volume-manager/election/leader + LEADER=$(etcdctl --endpoints=localhost:2379 get /csf/volume-manager/election/leader --print-value-only 2>/dev/null) + if [ -n "$LEADER" ]; then + echo -e "${COLOR_GREEN}👑 $LEADER${COLOR_RESET}" + + # Show additional node details + echo "" + echo "Node details:" + NODE_DATA=$(etcdctl --endpoints=localhost:2379 get /csf/volume-manager/nodes/$LEADER --print-value-only 2>/dev/null) + if [ -n "$NODE_DATA" ]; then + echo "$NODE_DATA" | jq '.' + else + echo " Node data not available" + fi + else + echo "No leader elected" + fi + fi +} + +# Show volume states +show_volumes() { + header "Volume States" + if check_etcdctl; then + echo "Volumes in the cluster:" + etcdctl --endpoints=localhost:2379 get /csf/volume-manager/volumes/ --prefix --keys-only | grep -v "^$" || echo "No volumes found" + echo "" + echo "Volume details:" + etcdctl --endpoints=localhost:2379 get /csf/volume-manager/volumes/ --prefix | grep -v "^$" | jq '.' 2>/dev/null || echo "No volumes" + fi +} + +# Show container logs +show_logs() { + local NODE=$1 + header "Logs from $NODE" + docker logs --tail 20 $NODE +} + +# Stop a node (simulates a failover) +stop_node() { + local NODE=$1 + header "Stopping node: $NODE" + docker stop $NODE + success "Node $NODE stopped" + info "Waiting 5 seconds for failover..." + sleep 5 +} + +# Start a node +start_node() { + local NODE=$1 + header "Starting node: $NODE" + docker start $NODE + success "Node $NODE started" + info "Waiting 5 seconds for initialization..."
+ sleep 5 +} + +# Show container status +show_container_status() { + header "Docker Container Status" + docker ps -a --filter "name=volume-manager" --format "table {{.Names}}\t{{.Status}}\t{{.Ports}}" +} + +# Monitor the cluster in real time +monitor() { + header "Cluster Monitoring (Ctrl+C to exit)" + while true; do + clear + echo -e "${COLOR_CYAN}═══════════════════════════════════════════════════════════════${COLOR_RESET}" + echo -e "${COLOR_CYAN} Volume Manager Cluster - Live Monitor${COLOR_RESET}" + echo -e "${COLOR_CYAN}═══════════════════════════════════════════════════════════════${COLOR_RESET}" + echo "" + + # Container status + echo -e "${COLOR_YELLOW}📦 Container Status:${COLOR_RESET}" + docker ps --filter "name=volume-manager" --format " {{.Names}}: {{.Status}}" | sed 's/Up /✅ /' | sed 's/Exited /❌ /' + echo "" + + # Leader + if check_etcdctl; then + LEADER=$(etcdctl --endpoints=localhost:2379 get /csf/volume-manager/election/leader --print-value-only 2>/dev/null) + if [ -n "$LEADER" ]; then + echo -e "${COLOR_YELLOW}👑 Current leader:${COLOR_RESET} ${COLOR_GREEN}$LEADER${COLOR_RESET}" + else + echo -e "${COLOR_YELLOW}👑 Current leader:${COLOR_RESET} ${COLOR_RED}No leader${COLOR_RESET}" + fi + echo "" + + # Nodes + echo -e "${COLOR_YELLOW}🖥️ Registered nodes:${COLOR_RESET}" + etcdctl --endpoints=localhost:2379 get /csf/volume-manager/nodes/ --prefix 2>/dev/null | \ + jq -r 'select(.node_id != null) | " \(.node_id): \(.status) (\(.role))"' 2>/dev/null || echo " No nodes" + echo "" + fi + + echo -e "${COLOR_CYAN}───────────────────────────────────────────────────────────────${COLOR_RESET}" + echo "Updated: $(date +'%H:%M:%S') | Press Ctrl+C to exit" + + sleep 3 + done +} + +# Run an automated failover test +test_failover() { + header "Starting failover test" + + info "1. Show the initial cluster status" + show_container_status + sleep 2 + + show_leader + sleep 2 + + info "2. Stop the current leader" + if check_etcdctl; then + LEADER=$(etcdctl --endpoints=localhost:2379 get /csf/volume-manager/election/leader --print-value-only 2>/dev/null) + if [ -n "$LEADER" ]; then + # The leader ID is the node name, but the container name may differ + CONTAINER_NAME=$(docker ps --filter "name=$LEADER" --format "{{.Names}}" | head -n1) + STOPPED_NODE="${CONTAINER_NAME:-$LEADER}" + else + warning "No leader found, stopping volume-manager-1" + STOPPED_NODE="volume-manager-1" + fi + else + STOPPED_NODE="volume-manager-1" + fi + stop_node "$STOPPED_NODE" + + info "3. Check the new leader" + show_leader + sleep 2 + + show_nodes + sleep 2 + + info "4. Restart the stopped node" + start_node "$STOPPED_NODE" + + info "5. Final cluster status" + show_container_status + sleep 2 + show_leader + + success "Failover test completed!"
+} + +# Main menu +show_menu() { + echo -e "\n${COLOR_CYAN}═══════════════════════════════════════════════════════════════${COLOR_RESET}" + echo -e "${COLOR_CYAN} Volume Manager HA Test Suite${COLOR_RESET}" + echo -e "${COLOR_CYAN}═══════════════════════════════════════════════════════════════${COLOR_RESET}\n" + echo " 1) Start cluster (docker-compose up)" + echo " 2) Stop cluster (docker-compose down)" + echo " 3) Show container status" + echo " 4) Show etcd cluster status" + echo " 5) Show registered nodes" + echo " 6) Show current leader" + echo " 7) Show volumes" + echo " 8) Show logs (select a node)" + echo " 9) Stop a node (simulate failover)" + echo " 10) Start a node" + echo " 11) Run automated failover test" + echo " 12) Start live monitor" + echo " 13) Clean up etcd data" + echo " 14) Restart containers" + echo " 0) Exit" + echo "" + echo -n "Select an option: " +} + +# Start the cluster +start_cluster() { + header "Starting cluster" + docker-compose -f docker-compose.test.yml up -d + success "Cluster started" + info "Waiting 10 seconds for initialization..." + sleep 10 +} + +# Stop the cluster +stop_cluster() { + header "Stopping cluster" + docker-compose -f docker-compose.test.yml down + success "Cluster stopped" +} + +# Clean etcd data +clean_etcd() { + header "Cleaning up etcd data" + if check_etcdctl; then + log "Deleting all keys under /csf/volume-manager/..." + etcdctl --endpoints=localhost:2379 del /csf/volume-manager/ --prefix 2>/dev/null || true + success "etcd data deleted" + + warning "Please restart the Volume Manager containers:" + echo " docker-compose -f docker-compose.test.yml restart" + else + error "etcdctl not available" + fi +} + +# Select a node (prompts go to stderr so only the node name is captured by the caller) +select_node() { + echo "" >&2 + echo "Available nodes:" >&2 + echo " 1) volume-manager-1" >&2 + echo " 2) volume-manager-2" >&2 + echo " 3) volume-manager-3" >&2 + echo -n "Select a node: " >&2 + read NODE_NUM + case $NODE_NUM in + 1) echo "volume-manager-1" ;; + 2) echo "volume-manager-2" ;; + 3) echo "volume-manager-3" ;; + *) echo "" ;; + esac +} + +# Main program +main() { + if [ "$1" == "monitor" ]; then + monitor + exit 0 + fi + + if [ "$1" == "test" ]; then + test_failover + exit 0 + fi + + while true; do + show_menu + read OPTION + + case $OPTION in + 1) start_cluster ;; + 2) stop_cluster ;; + 3) show_container_status ;; + 4) show_etcd_status ;; + 5) show_nodes ;; + 6) show_leader ;; + 7) show_volumes ;; + 8) + NODE=$(select_node) + if [ -n "$NODE" ]; then + show_logs "$NODE" + fi + ;; + 9) + NODE=$(select_node) + if [ -n "$NODE" ]; then + stop_node "$NODE" + fi + ;; + 10) + NODE=$(select_node) + if [ -n "$NODE" ]; then + start_node "$NODE" + fi + ;; + 11) test_failover ;; + 12) monitor ;; + 13) clean_etcd ;; + 14) + header "Restarting containers" + docker-compose -f docker-compose.test.yml restart + success "Containers restarted" + info "Waiting 10 seconds..." + sleep 10 + ;; + 0) + log "Goodbye!" + exit 0 + ;; + *) + error "Invalid option" + ;; + esac + + echo "" + echo -n "Press Enter to continue..." + read + done +} + +# Entry point +main "$@"
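For reference, a minimal usage sketch of the new DistributedLock from a leader-only task. It is illustrative only and not part of the patch: the DistributedLock::new(client, key, ttl_secs) constructor and the acquire(max_retries, retry_interval_ms) signature are assumptions inferred from the fields and retry loop visible above, while release() and refresh() appear verbatim in the diff.

use std::sync::Arc;
use crate::etcd::core::{EtcdClient, EtcdError};
use crate::etcd::sync::DistributedLock;

// Sketch: serialize a leader-only volume operation behind the etcd lock.
async fn run_exclusive_maintenance(client: Arc<EtcdClient>) -> Result<(), EtcdError> {
    // Assumed constructor: lock key and lease TTL in seconds.
    let mut lock = DistributedLock::new(client, "volumes/maintenance", 30);

    // Assumed signature: retry up to 5 times, 500 ms apart, mirroring the acquire loop above.
    if lock.acquire(5, 500).await? {
        // ... perform the exclusive volume work here ...

        // Extend the lease if the work outlives the TTL.
        lock.refresh().await?;

        // Release explicitly; the Drop impl only logs a warning.
        lock.release().await?;
    }
    Ok(())
}

The same flow is what test-ha.sh exercises end to end; besides the interactive menu, the script can be run non-interactively with ./test-ha.sh test (automated failover test) or ./test-ha.sh monitor (live cluster view).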