diff --git a/Cargo.lock b/Cargo.lock index c15d204..67d9f61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4943,6 +4943,7 @@ dependencies = [ "chrono", "dotenvy", "etcd-client", + "reqwest", "serde", "serde_json", "shared", diff --git a/control-plane/volume-manager/.gitignore b/control-plane/volume-manager/.gitignore new file mode 100644 index 0000000..6946efa --- /dev/null +++ b/control-plane/volume-manager/.gitignore @@ -0,0 +1,5 @@ +# Ceph configuration (auto-generated by init-ceph-config.sh) +ceph-config/ + +# Test artifacts +*.log diff --git a/control-plane/volume-manager/Cargo.toml b/control-plane/volume-manager/Cargo.toml index 4ced391..0cd9402 100644 --- a/control-plane/volume-manager/Cargo.toml +++ b/control-plane/volume-manager/Cargo.toml @@ -27,6 +27,9 @@ tracing-subscriber = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +# HTTP Client for Patroni +reqwest = { version = "0.11", features = ["json"] } + # Utilities dotenvy = { workspace = true } anyhow = { workspace = true } diff --git a/control-plane/volume-manager/Dockerfile.test b/control-plane/volume-manager/Dockerfile.test index 0d9fdfb..21aff8b 100644 --- a/control-plane/volume-manager/Dockerfile.test +++ b/control-plane/volume-manager/Dockerfile.test @@ -1,6 +1,6 @@ -# Test-Dockerfile für Volume Manager (Multi-Stage Build mit rust:latest) +# Test-Dockerfile für Volume Manager # Stage 1: Build -FROM rust:latest AS builder +FROM --platform=linux/arm64 rust:bookworm AS builder WORKDIR /app @@ -35,12 +35,13 @@ COPY control-plane/shared/ ./control-plane/shared/ RUN cargo build --release -p volume-manager # Stage 2: Runtime -FROM debian:bookworm-slim +FROM --platform=linux/arm64 debian:bookworm-slim # Installiere notwendige Laufzeit-Abhängigkeiten RUN apt-get update && apt-get install -y \ ca-certificates \ libssl3 \ + curl \ && rm -rf /var/lib/apt/lists/* WORKDIR /app @@ -53,3 +54,4 @@ RUN chmod +x /app/volume-manager # Starte das Binary CMD ["/app/volume-manager"] + diff --git a/control-plane/volume-manager/docker-compose.dev.yml b/control-plane/volume-manager/docker-compose.dev.yml new file mode 100644 index 0000000..5b40d3a --- /dev/null +++ b/control-plane/volume-manager/docker-compose.dev.yml @@ -0,0 +1,611 @@ +services: + # ======================================== + # CEPH CLUSTER (3 MONs + 3 OSDs + 3 MGRs) + # ======================================== + + ceph-mon1: + image: ceph/daemon:latest-pacific + container_name: ceph-mon1 + hostname: ceph-mon1 + environment: + - CEPH_DAEMON=mon + - MON_IP=172.20.0.21 + - CEPH_PUBLIC_NETWORK=172.20.0.0/16 + - CLUSTER=ceph + - CEPH_CLUSTER_NETWORK=172.20.0.0/16 + - DEMO_DAEMONS=mon + - NETWORK_AUTO_DETECT=4 + - CEPH_AUTH_REQUIRE_SIGNATURES=false + volumes: + - ceph-mon1-data:/var/lib/ceph + - ./ceph-config/ceph.conf:/etc/ceph/ceph.conf:ro + networks: + csf-test: + ipv4_address: 172.20.0.21 + cap_add: + - ALL + privileged: true + healthcheck: + test: ["CMD", "ceph", "health"] + interval: 15s + timeout: 10s + retries: 10 + start_period: 90s + restart: unless-stopped + + ceph-mon2: + image: ceph/daemon:latest-pacific + container_name: ceph-mon2 + hostname: ceph-mon2 + environment: + - CEPH_DAEMON=mon + - MON_IP=172.20.0.22 + - CEPH_PUBLIC_NETWORK=172.20.0.0/16 + - CLUSTER=ceph + - CEPH_CLUSTER_NETWORK=172.20.0.0/16 + - CLOBBER=true + - NETWORK_AUTO_DETECT=4 + - CEPH_AUTH_REQUIRE_SIGNATURES=false + volumes: + - ceph-mon2-data:/var/lib/ceph + - ./ceph-config/ceph.conf:/etc/ceph/ceph.conf:ro + networks: + csf-test: + ipv4_address: 172.20.0.22 + cap_add: + 
- ALL + privileged: true + healthcheck: + test: ["CMD", "ceph", "health"] + interval: 15s + timeout: 10s + retries: 10 + start_period: 90s + restart: unless-stopped + depends_on: + - ceph-mon1 + + ceph-mon3: + image: ceph/daemon:latest-pacific + container_name: ceph-mon3 + hostname: ceph-mon3 + environment: + - CEPH_DAEMON=mon + - MON_IP=172.20.0.23 + - CEPH_PUBLIC_NETWORK=172.20.0.0/16 + - CLUSTER=ceph + - CEPH_CLUSTER_NETWORK=172.20.0.0/16 + - CLOBBER=true + - NETWORK_AUTO_DETECT=4 + - CEPH_AUTH_REQUIRE_SIGNATURES=false + volumes: + - ceph-mon3-data:/var/lib/ceph + - ./ceph-config/ceph.conf:/etc/ceph/ceph.conf:ro + networks: + csf-test: + ipv4_address: 172.20.0.23 + cap_add: + - ALL + privileged: true + healthcheck: + test: ["CMD", "ceph", "health"] + interval: 15s + timeout: 10s + retries: 10 + start_period: 90s + restart: unless-stopped + depends_on: + - ceph-mon1 + + ceph-osd1: + image: ceph/daemon:latest-pacific + container_name: ceph-osd1 + hostname: ceph-osd1 + environment: + - CEPH_DAEMON=osd + - OSD_TYPE=directory + - CEPH_PUBLIC_NETWORK=172.20.0.0/16 + - CLUSTER=ceph + - NETWORK_AUTO_DETECT=4 + - CEPH_AUTH_REQUIRE_SIGNATURES=false + volumes: + - ceph-osd1-data:/var/lib/ceph/osd + - ./ceph-config/ceph.conf:/etc/ceph/ceph.conf:ro + networks: + csf-test: + ipv4_address: 172.20.0.31 + cap_add: + - ALL + privileged: true + restart: unless-stopped + depends_on: + - ceph-mon1 + - ceph-mon2 + - ceph-mon3 + + ceph-osd2: + image: ceph/daemon:latest-pacific + container_name: ceph-osd2 + hostname: ceph-osd2 + environment: + - CEPH_DAEMON=osd + - OSD_TYPE=directory + - CEPH_PUBLIC_NETWORK=172.20.0.0/16 + - CLUSTER=ceph + - NETWORK_AUTO_DETECT=4 + - CEPH_AUTH_REQUIRE_SIGNATURES=false + volumes: + - ceph-osd2-data:/var/lib/ceph/osd + - ./ceph-config/ceph.conf:/etc/ceph/ceph.conf:ro + networks: + csf-test: + ipv4_address: 172.20.0.32 + cap_add: + - ALL + privileged: true + restart: unless-stopped + depends_on: + - ceph-mon1 + - ceph-mon2 + - ceph-mon3 + + ceph-osd3: + image: ceph/daemon:latest-pacific + container_name: ceph-osd3 + hostname: ceph-osd3 + environment: + - CEPH_DAEMON=osd + - OSD_TYPE=directory + - CEPH_PUBLIC_NETWORK=172.20.0.0/16 + - CLUSTER=ceph + - NETWORK_AUTO_DETECT=4 + - CEPH_AUTH_REQUIRE_SIGNATURES=false + volumes: + - ceph-osd3-data:/var/lib/ceph/osd + - ./ceph-config/ceph.conf:/etc/ceph/ceph.conf:ro + networks: + csf-test: + ipv4_address: 172.20.0.33 + cap_add: + - ALL + privileged: true + restart: unless-stopped + depends_on: + - ceph-mon1 + - ceph-mon2 + - ceph-mon3 + + # ======================================== + # ETCD CLUSTER (für State & Patroni) + # ======================================== + + etcd1: + image: quay.io/coreos/etcd:v3.5.13 + container_name: etcd1 + hostname: etcd1 + environment: + - ETCD_NAME=etcd1 + - ETCD_ENABLE_V2=true + - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd1:2380 + - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 + - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 + - ETCD_ADVERTISE_CLIENT_URLS=http://etcd1:2379 + - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster-csf + - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 + - ETCD_INITIAL_CLUSTER_STATE=new + ports: + - "2379:2379" + - "2380:2380" + networks: + csf-test: + ipv4_address: 172.20.0.11 + volumes: + - etcd1-data:/etcd-data + restart: unless-stopped + + etcd2: + image: quay.io/coreos/etcd:v3.5.13 + container_name: etcd2 + hostname: etcd2 + environment: + - ETCD_NAME=etcd2 + - ETCD_ENABLE_V2=true + - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd2:2380 + - 
ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 + - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 + - ETCD_ADVERTISE_CLIENT_URLS=http://etcd2:2379 + - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster-csf + - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 + - ETCD_INITIAL_CLUSTER_STATE=new + ports: + - "2479:2379" + - "2480:2380" + networks: + csf-test: + ipv4_address: 172.20.0.12 + volumes: + - etcd2-data:/etcd-data + restart: unless-stopped + + etcd3: + image: quay.io/coreos/etcd:v3.5.13 + container_name: etcd3 + hostname: etcd3 + environment: + - ETCD_ENABLE_V2=true + - ETCD_NAME=etcd3 + - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd3:2380 + - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 + - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 + - ETCD_ADVERTISE_CLIENT_URLS=http://etcd3:2379 + - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster-csf + - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 + - ETCD_INITIAL_CLUSTER_STATE=new + ports: + - "2579:2379" + - "2580:2380" + networks: + csf-test: + ipv4_address: 172.20.0.13 + volumes: + - etcd3-data:/etcd-data + restart: unless-stopped + + # ======================================== + # PATRONI POSTGRESQL HA CLUSTER + # ======================================== + + patroni1: + image: ghcr.io/zalando/spilo-15:3.0-p1 + container_name: patroni1 + hostname: patroni1 + environment: + - PATRONI_NAME=patroni1 + - ETCD_HOST=etcd1:2379 + - PATRONI_SCOPE=postgres-csf + - PATRONI_ETCD3_HOSTS=etcd1:2379,etcd2:2379,etcd3:2379 + - PATRONI_ETCD3_PROTOCOL=http + + # PostgreSQL Configuration + - PATRONI_POSTGRESQL_DATA_DIR=/home/postgres/pgdata + - PATRONI_POSTGRESQL_LISTEN=0.0.0.0:5432 + - PATRONI_POSTGRESQL_CONNECT_ADDRESS=patroni1:5432 + + # Replication + - PATRONI_REPLICATION_USERNAME=replicator + - PATRONI_REPLICATION_PASSWORD=replpass + + # Superuser + - PATRONI_SUPERUSER_USERNAME=postgres + - PATRONI_SUPERUSER_PASSWORD=postgrespass + + # Application User + - PATRONI_POSTGRESQL_PGPASS=/tmp/pgpass + - POSTGRES_USER=csf + - POSTGRES_PASSWORD=csfpassword + - POSTGRES_DB=csf_core + + # REST API + - PATRONI_RESTAPI_LISTEN=0.0.0.0:8008 + - PATRONI_RESTAPI_CONNECT_ADDRESS=patroni1:8008 + + # Bootstrap + - PATRONI_BOOTSTRAP_DCS_TTL=30 + - PATRONI_BOOTSTRAP_DCS_LOOP_WAIT=10 + - PATRONI_BOOTSTRAP_DCS_RETRY_TIMEOUT=10 + - PATRONI_BOOTSTRAP_METHOD=initdb + + # PostgreSQL parameters + - PATRONI_POSTGRESQL_PARAMETERS_MAX_CONNECTIONS=100 + - PATRONI_POSTGRESQL_PARAMETERS_MAX_WAL_SENDERS=10 + - PATRONI_POSTGRESQL_PARAMETERS_WAL_LEVEL=replica + - PATRONI_POSTGRESQL_PARAMETERS_HOT_STANDBY=on + - PATRONI_POSTGRESQL_PARAMETERS_WAL_KEEP_SIZE=128MB + - PATRONI_POSTGRESQL_PARAMETERS_ARCHIVE_MODE=on + - PATRONI_POSTGRESQL_PARAMETERS_ARCHIVE_COMMAND=/bin/true + + volumes: + - patroni1-data:/home/postgres/pgdata + networks: + csf-test: + ipv4_address: 172.20.0.41 + ports: + - "5441:5432" + - "8008:8008" + healthcheck: + test: ["CMD-SHELL", "curl -f http://localhost:8008/health || exit 1"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 60s + depends_on: + - etcd1 + - etcd2 + - etcd3 + - ceph-osd1 + - ceph-osd2 + - ceph-osd3 + restart: unless-stopped + + patroni2: + image: ghcr.io/zalando/spilo-15:3.0-p1 + container_name: patroni2 + hostname: patroni2 + environment: + - ETCD_HOST=etcd1:2379 + - PATRONI_NAME=patroni2 + - PATRONI_SCOPE=postgres-csf + - PATRONI_ETCD3_HOSTS=etcd1:2379,etcd2:2379,etcd3:2379 + - PATRONI_ETCD3_PROTOCOL=http + + - PATRONI_POSTGRESQL_DATA_DIR=/home/postgres/pgdata + - 
PATRONI_POSTGRESQL_LISTEN=0.0.0.0:5432 + - PATRONI_POSTGRESQL_CONNECT_ADDRESS=patroni2:5432 + + - PATRONI_REPLICATION_USERNAME=replicator + - PATRONI_REPLICATION_PASSWORD=replpass + + - PATRONI_SUPERUSER_USERNAME=postgres + - PATRONI_SUPERUSER_PASSWORD=postgrespass + + - PATRONI_POSTGRESQL_PGPASS=/tmp/pgpass + - POSTGRES_USER=csf + - POSTGRES_PASSWORD=csfpassword + - POSTGRES_DB=csf_core + + - PATRONI_RESTAPI_LISTEN=0.0.0.0:8008 + - PATRONI_RESTAPI_CONNECT_ADDRESS=patroni2:8008 + + - PATRONI_BOOTSTRAP_DCS_TTL=30 + - PATRONI_BOOTSTRAP_DCS_LOOP_WAIT=10 + - PATRONI_BOOTSTRAP_DCS_RETRY_TIMEOUT=10 + - PATRONI_BOOTSTRAP_METHOD=initdb + + - PATRONI_POSTGRESQL_PARAMETERS_MAX_CONNECTIONS=100 + - PATRONI_POSTGRESQL_PARAMETERS_MAX_WAL_SENDERS=10 + - PATRONI_POSTGRESQL_PARAMETERS_WAL_LEVEL=replica + - PATRONI_POSTGRESQL_PARAMETERS_HOT_STANDBY=on + - PATRONI_POSTGRESQL_PARAMETERS_WAL_KEEP_SIZE=128MB + - PATRONI_POSTGRESQL_PARAMETERS_ARCHIVE_MODE=on + - PATRONI_POSTGRESQL_PARAMETERS_ARCHIVE_COMMAND=/bin/true + + volumes: + - patroni2-data:/home/postgres/pgdata + networks: + csf-test: + ipv4_address: 172.20.0.42 + ports: + - "5442:5432" + - "8009:8008" + healthcheck: + test: ["CMD-SHELL", "curl -f http://localhost:8008/health || exit 1"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 60s + depends_on: + - etcd1 + - etcd2 + - etcd3 + - ceph-osd1 + - ceph-osd2 + - ceph-osd3 + restart: unless-stopped + + patroni3: + image: ghcr.io/zalando/spilo-15:3.0-p1 + container_name: patroni3 + hostname: patroni3 + environment: + - ETCD_HOST=etcd1:2379 + - PATRONI_NAME=patroni3 + - PATRONI_SCOPE=postgres-csf + - PATRONI_ETCD3_HOSTS=etcd1:2379,etcd2:2379,etcd3:2379 + - PATRONI_ETCD3_PROTOCOL=http + + - PATRONI_POSTGRESQL_DATA_DIR=/home/postgres/pgdata + - PATRONI_POSTGRESQL_LISTEN=0.0.0.0:5432 + - PATRONI_POSTGRESQL_CONNECT_ADDRESS=patroni3:5432 + + - PATRONI_REPLICATION_USERNAME=replicator + - PATRONI_REPLICATION_PASSWORD=replpass + + - PATRONI_SUPERUSER_USERNAME=postgres + - PATRONI_SUPERUSER_PASSWORD=postgrespass + + - PATRONI_POSTGRESQL_PGPASS=/tmp/pgpass + - POSTGRES_USER=csf + - POSTGRES_PASSWORD=csfpassword + - POSTGRES_DB=csf_core + + - PATRONI_RESTAPI_LISTEN=0.0.0.0:8008 + - PATRONI_RESTAPI_CONNECT_ADDRESS=patroni3:8008 + + - PATRONI_BOOTSTRAP_DCS_TTL=30 + - PATRONI_BOOTSTRAP_DCS_LOOP_WAIT=10 + - PATRONI_BOOTSTRAP_DCS_RETRY_TIMEOUT=10 + - PATRONI_BOOTSTRAP_METHOD=initdb + + - PATRONI_POSTGRESQL_PARAMETERS_MAX_CONNECTIONS=100 + - PATRONI_POSTGRESQL_PARAMETERS_MAX_WAL_SENDERS=10 + - PATRONI_POSTGRESQL_PARAMETERS_WAL_LEVEL=replica + - PATRONI_POSTGRESQL_PARAMETERS_HOT_STANDBY=on + - PATRONI_POSTGRESQL_PARAMETERS_WAL_KEEP_SIZE=128MB + - PATRONI_POSTGRESQL_PARAMETERS_ARCHIVE_MODE=on + - PATRONI_POSTGRESQL_PARAMETERS_ARCHIVE_COMMAND=/bin/true + + volumes: + - patroni3-data:/home/postgres/pgdata + networks: + csf-test: + ipv4_address: 172.20.0.43 + ports: + - "5443:5432" + - "8010:8008" + healthcheck: + test: ["CMD-SHELL", "curl -f http://localhost:8008/health || exit 1"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 60s + depends_on: + - etcd1 + - etcd2 + - etcd3 + - ceph-osd1 + - ceph-osd2 + - ceph-osd3 + restart: unless-stopped + + # ======================================== + # HAPROXY (Smart Routing: Primary/Replica) + # ======================================== + + postgres-haproxy: + image: haproxy:2.8-alpine + container_name: postgres-haproxy + volumes: + - ./haproxy-patroni.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro + ports: + - "5432:5432" # Write Port (Primary) + - "5433:5433" # Read 
Port (Replicas) + - "8000:8000" # HAProxy Stats + networks: + csf-test: + ipv4_address: 172.20.0.40 + depends_on: + - patroni1 + - patroni2 + - patroni3 + healthcheck: + test: ["CMD-SHELL", "wget -q --spider http://localhost:8000/stats || exit 1"] + interval: 10s + timeout: 5s + retries: 3 + restart: unless-stopped + + # ======================================== + # VOLUME MANAGER (mit Patroni-Integration) + # ======================================== + + volume-manager-1: + image: volume-manager:patroni + build: + context: ../.. + dockerfile: control-plane/volume-manager/Dockerfile.test + container_name: volume-manager-1 + hostname: volume-manager-1 + environment: + - RUST_LOG=info + - ETCD_ENDPOINTS=http://etcd1:2379,http://etcd2:2379,http://etcd3:2379 + - NODE_ID=vm-1 + - CEPH_MON_HOSTS=ceph-mon1:6789,ceph-mon2:6789,ceph-mon3:6789 + - CEPH_DEFAULT_POOL=csf-data + - CEPH_PG_NUM=128 + - CEPH_DEFAULT_REPLICATION=3 + - PATRONI_SCOPE=postgres-csf + - PATRONI_NODES=patroni1:8008,patroni2:8008,patroni3:8008 + networks: + csf-test: + ipv4_address: 172.20.0.51 + depends_on: + - etcd1 + - etcd2 + - etcd3 + - ceph-mon1 + - ceph-mon2 + - ceph-mon3 + - patroni1 + - patroni2 + - patroni3 + restart: unless-stopped + + volume-manager-2: + image: volume-manager:patroni + build: + context: ../.. + dockerfile: control-plane/volume-manager/Dockerfile.test + container_name: volume-manager-2 + hostname: volume-manager-2 + environment: + - RUST_LOG=info + - ETCD_ENDPOINTS=http://etcd1:2379,http://etcd2:2379,http://etcd3:2379 + - NODE_ID=vm-2 + - CEPH_MON_HOSTS=ceph-mon1:6789,ceph-mon2:6789,ceph-mon3:6789 + - CEPH_DEFAULT_POOL=csf-data + - CEPH_PG_NUM=128 + - CEPH_DEFAULT_REPLICATION=3 + - PATRONI_SCOPE=postgres-csf + - PATRONI_NODES=patroni1:8008,patroni2:8008,patroni3:8008 + networks: + csf-test: + ipv4_address: 172.20.0.52 + depends_on: + - etcd1 + - etcd2 + - etcd3 + - ceph-mon1 + - ceph-mon2 + - ceph-mon3 + - patroni1 + - patroni2 + - patroni3 + restart: unless-stopped + + volume-manager-3: + image: volume-manager:patroni + build: + context: ../.. 
+ dockerfile: control-plane/volume-manager/Dockerfile.test + container_name: volume-manager-3 + hostname: volume-manager-3 + environment: + - RUST_LOG=info + - ETCD_ENDPOINTS=http://etcd1:2379,http://etcd2:2379,http://etcd3:2379 + - NODE_ID=vm-3 + - CEPH_MON_HOSTS=ceph-mon1:6789,ceph-mon2:6789,ceph-mon3:6789 + - CEPH_DEFAULT_POOL=csf-data + - CEPH_PG_NUM=128 + - CEPH_DEFAULT_REPLICATION=3 + - PATRONI_SCOPE=postgres-csf + - PATRONI_NODES=patroni1:8008,patroni2:8008,patroni3:8008 + networks: + csf-test: + ipv4_address: 172.20.0.53 + depends_on: + - etcd1 + - etcd2 + - etcd3 + - ceph-mon1 + - ceph-mon2 + - ceph-mon3 + - patroni1 + - patroni2 + - patroni3 + restart: unless-stopped + +networks: + csf-test: + driver: bridge + ipam: + config: + - subnet: 172.20.0.0/16 + +volumes: + # Ceph + ceph-mon1-data: + ceph-mon2-data: + ceph-mon3-data: + ceph-osd1-data: + ceph-osd2-data: + ceph-osd3-data: + + # etcd + etcd1-data: + etcd2-data: + etcd3-data: + + # Patroni PostgreSQL + patroni1-data: + patroni2-data: + patroni3-data: diff --git a/control-plane/volume-manager/docker-compose.test.yml b/control-plane/volume-manager/docker-compose.test.yml deleted file mode 100644 index baf9306..0000000 --- a/control-plane/volume-manager/docker-compose.test.yml +++ /dev/null @@ -1,128 +0,0 @@ -services: - # etcd Cluster (3 Nodes für HA) - etcd1: - image: quay.io/coreos/etcd:v3.5.13 - container_name: etcd1 - environment: - - ETCD_NAME=etcd1 - - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd1:2380 - - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - - ETCD_ADVERTISE_CLIENT_URLS=http://etcd1:2379 - - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster-test - - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - - ETCD_INITIAL_CLUSTER_STATE=new - ports: - - "2379:2379" - - "2380:2380" - networks: - - csf-test - - etcd2: - image: quay.io/coreos/etcd:v3.5.13 - container_name: etcd2 - environment: - - ETCD_NAME=etcd2 - - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd2:2380 - - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - - ETCD_ADVERTISE_CLIENT_URLS=http://etcd2:2379 - - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster-test - - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - - ETCD_INITIAL_CLUSTER_STATE=new - ports: - - "2479:2379" - - "2480:2380" - networks: - - csf-test - - etcd3: - image: quay.io/coreos/etcd:v3.5.13 - container_name: etcd3 - environment: - - ETCD_NAME=etcd3 - - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd3:2380 - - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - - ETCD_ADVERTISE_CLIENT_URLS=http://etcd3:2379 - - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster-test - - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - - ETCD_INITIAL_CLUSTER_STATE=new - ports: - - "2579:2379" - - "2580:2380" - networks: - - csf-test - - # Volume Manager Node 1 - volume-manager-1: - image: volume-manager:test - build: - context: ../.. 
- dockerfile: control-plane/volume-manager/Dockerfile.test - container_name: volume-manager-1 - environment: - - RUST_LOG=debug - - ETCD_ENDPOINTS=http://etcd1:2379,http://etcd2:2379,http://etcd3:2379 - - NODE_ID=volume-manager-1 - - HOSTNAME=volume-manager-1 - - NODE_IP=172.20.0.11 - depends_on: - - etcd1 - - etcd2 - - etcd3 - networks: - csf-test: - ipv4_address: 172.20.0.11 - restart: unless-stopped - - # Volume Manager Node 2 - volume-manager-2: - image: volume-manager:test - build: - context: ../.. - dockerfile: control-plane/volume-manager/Dockerfile.test - container_name: volume-manager-2 - environment: - - RUST_LOG=debug - - ETCD_ENDPOINTS=http://etcd1:2379,http://etcd2:2379,http://etcd3:2379 - - NODE_ID=volume-manager-2 - - HOSTNAME=volume-manager-2 - - NODE_IP=172.20.0.12 - depends_on: - - etcd1 - - etcd2 - - etcd3 - networks: - csf-test: - ipv4_address: 172.20.0.12 - restart: unless-stopped - - # Volume Manager Node 3 - volume-manager-3: - image: volume-manager:test - build: - context: ../.. - dockerfile: control-plane/volume-manager/Dockerfile.test - container_name: volume-manager-3 - environment: - - RUST_LOG=info - - ETCD_ENDPOINTS=http://etcd1:2379,http://etcd2:2379,http://etcd3:2379 - - NODE_ID=volume-manager-3 - - HOSTNAME=volume-manager-3 - - NODE_IP=172.20.0.13 - depends_on: - - etcd1 - - etcd2 - - etcd3 - networks: - csf-test: - ipv4_address: 172.20.0.13 - restart: unless-stopped - -networks: - csf-test: - driver: bridge - ipam: - config: - - subnet: 172.20.0.0/16 diff --git a/control-plane/volume-manager/haproxy-patroni.cfg b/control-plane/volume-manager/haproxy-patroni.cfg new file mode 100644 index 0000000..d56c4d6 --- /dev/null +++ b/control-plane/volume-manager/haproxy-patroni.cfg @@ -0,0 +1,75 @@ +global + log stdout format raw local0 + maxconn 4096 + +defaults + log global + mode tcp + option tcplog + option dontlognull + retries 3 + timeout connect 5000ms + timeout client 50000ms + timeout server 50000ms + +# ======================================== +# Stats Interface +# ======================================== +listen stats + bind *:8000 + mode http + stats enable + stats uri /stats + stats refresh 5s + stats admin if TRUE + stats show-legends + stats show-desc PostgreSQL HA Cluster with Patroni + +# ======================================== +# PostgreSQL Primary (WRITES) +# Port 5432 - Nur der Primary antwortet +# ======================================== +listen postgres_primary + bind *:5432 + mode tcp + option httpchk GET /primary + http-check expect status 200 + default-server inter 3s fall 3 rise 2 on-marked-down shutdown-sessions + + # Patroni REST API Health Checks + server patroni1 patroni1:5432 check port 8008 + server patroni2 patroni2:5432 check port 8008 + server patroni3 patroni3:5432 check port 8008 + +# ======================================== +# PostgreSQL Replicas (READS) +# Port 5433 - Nur Replicas antworten +# ======================================== +listen postgres_replicas + bind *:5433 + mode tcp + balance roundrobin + option httpchk GET /replica + http-check expect status 200 + default-server inter 3s fall 3 rise 2 + + # Patroni REST API Health Checks + server patroni1 patroni1:5432 check port 8008 + server patroni2 patroni2:5432 check port 8008 + server patroni3 patroni3:5432 check port 8008 + +# ======================================== +# PostgreSQL Any (Fallback für Legacy Apps) +# Port 5434 - Jede gesunde Node akzeptiert +# ======================================== +listen postgres_any + bind *:5434 + mode tcp + balance roundrobin + 
    option httpchk GET /health
+    http-check expect status 200
+    default-server inter 3s fall 3 rise 2
+
+    server patroni1 patroni1:5432 check port 8008
+    server patroni2 patroni2:5432 check port 8008
+    server patroni3 patroni3:5432 check port 8008
diff --git a/control-plane/volume-manager/haproxy.cfg b/control-plane/volume-manager/haproxy.cfg
new file mode 100644
index 0000000..db59dba
--- /dev/null
+++ b/control-plane/volume-manager/haproxy.cfg
@@ -0,0 +1,38 @@
+global
+    log stdout format raw local0
+    maxconn 4096
+
+defaults
+    log global
+    mode tcp
+    option tcplog
+    option dontlognull
+    retries 3
+    timeout connect 5000ms
+    timeout client 50000ms
+    timeout server 50000ms
+
+# Stats interface
+listen stats
+    bind *:7000
+    mode http
+    stats enable
+    stats uri /
+    stats refresh 10s
+    stats admin if TRUE
+
+# PostgreSQL Load Balancer
+listen postgres
+    bind *:5432
+    mode tcp
+    option tcp-check
+    tcp-check connect
+    tcp-check send-binary 00000014 # Length
+    tcp-check send-binary 00030000 # SSL request
+    tcp-check expect binary 4e # 'N' - SSL not available
+    default-server inter 3s fall 3 rise 2
+
+    # PostgreSQL Backends
+    server postgres1 postgres1:5432 check
+    server postgres2 postgres2:5432 check
+    server postgres3 postgres3:5432 check backup
diff --git a/control-plane/volume-manager/src/ceph/core/client.rs b/control-plane/volume-manager/src/ceph/core/client.rs
new file mode 100644
index 0000000..f79b0e3
--- /dev/null
+++ b/control-plane/volume-manager/src/ceph/core/client.rs
@@ -0,0 +1,181 @@
+use super::config::CephConfig;
+use crate::ceph::storage::types::*;
+use anyhow::{anyhow, Context, Result};
+use serde_json::Value;
+use tokio::process::Command as AsyncCommand;
+
+#[derive(Clone)]
+pub struct CephClient {
+    config: CephConfig,
+}
+
+impl CephClient {
+    pub fn new(config: CephConfig) -> Self {
+        Self { config }
+    }
+
+    /// Executes a Ceph command
+    pub async fn execute(&self, cmd: CephCommand) -> Result<String> {
+        let cmd_vec = cmd.to_vec();
+        crate::log_debug!(
+            "ceph_client",
+            &format!("Executing ceph command: {}", cmd_vec.join(" "))
+        );
+
+        let mut command = AsyncCommand::new("ceph");
+
+        // Add the monitor hosts
+        command.arg("-m").arg(self.config.mon_host_string());
+
+        // Keyring, if configured
+        if let Some(ref keyring) = self.config.keyring_path {
+            command.arg("--keyring").arg(keyring);
+        }
+
+        // Client name
+        command
+            .arg("--name")
+            .arg(format!("client.{}", self.config.client_name));
+
+        // The actual command
+        for arg in cmd_vec {
+            command.arg(arg);
+        }
+
+        // JSON format for structured output
+        command.arg("--format").arg("json");
+
+        let output = command
+            .output()
+            .await
+            .context("Failed to execute ceph command")?;
+
+        if !output.status.success() {
+            let stderr = String::from_utf8_lossy(&output.stderr);
+            crate::log_error!("ceph_client", &format!("Ceph command failed: {}", stderr));
+            return Err(anyhow!("Ceph command failed: {}", stderr));
+        }
+
+        Ok(String::from_utf8(output.stdout)?)
+    }
+
+    /// Checks cluster health
+    pub async fn health_status(&self) -> Result<CephClusterHealth> {
+        crate::log_debug!("ceph_client", "Checking cluster health status");
+
+        let cmd = CephCommand::new("status");
+        let output = self.execute(cmd).await?;
+
+        // Parse JSON output
+        let status: Value = serde_json::from_str(&output)?;
+
+        // Extract the health status
+        let health_status = status["health"]["status"].as_str().unwrap_or("HEALTH_ERR");
+
+        let health = match health_status {
+            "HEALTH_OK" => HealthStatus::Ok,
+            "HEALTH_WARN" => HealthStatus::Warn,
+            _ => HealthStatus::Error,
+        };
+
+        crate::log_debug!(
+            "ceph_client",
+            &format!("Cluster health status: {:?}", health)
+        );
+
+        // Extract monitor info
+        let mons = if let Some(mon_map) = status["monmap"]["mons"].as_array() {
+            mon_map
+                .iter()
+                .filter_map(|m| {
+                    Some(MonitorInfo {
+                        name: m["name"].as_str()?.to_string(),
+                        addr: m["addr"].as_str()?.to_string(),
+                        rank: m["rank"].as_u64()? as u32,
+                        in_quorum: true, // Simplified
+                    })
+                })
+                .collect()
+        } else {
+            Vec::new()
+        };
+
+        // Extract OSD info
+        let osds = if let Some(osd_map) = status["osdmap"]["osds"].as_array() {
+            osd_map
+                .iter()
+                .filter_map(|o| {
+                    Some(OsdInfo {
+                        id: o["osd"].as_u64()? as u32,
+                        up: o["up"].as_u64()? == 1,
+                        in_cluster: o["in"].as_u64()? == 1,
+                        weight: o["weight"].as_f64()?,
+                    })
+                })
+                .collect()
+        } else {
+            Vec::new()
+        };
+
+        // PG summary
+        let pgs = PgSummary {
+            total: status["pgmap"]["num_pgs"].as_u64().unwrap_or(0) as u32,
+            active_clean: status["pgmap"]["pgs_by_state"]
+                .as_array()
+                .and_then(|arr| {
+                    arr.iter()
+                        .find(|s| s["state_name"].as_str() == Some("active+clean"))
+                        .and_then(|s| s["count"].as_u64())
+                })
+                .unwrap_or(0) as u32,
+            degraded: 0,  // Simplified
+            misplaced: 0, // Simplified
+        };
+
+        Ok(CephClusterHealth {
+            status: health,
+            mons,
+            osds,
+            pgs,
+        })
+    }
+
+    /// Checks whether the Ceph cluster is reachable
+    pub async fn is_available(&self) -> bool {
+        self.health_status().await.is_ok()
+    }
+
+    /// Waits until the cluster is available
+    pub async fn wait_for_cluster(&self, max_attempts: u32) -> Result<()> {
+        crate::log_info!(
+            "ceph_client",
+            &format!("Waiting for Ceph cluster (max {} attempts)", max_attempts)
+        );
+
+        for attempt in 1..=max_attempts {
+            crate::log_debug!(
+                "ceph_client",
+                &format!(
+                    "Cluster availability check: attempt {}/{}",
+                    attempt, max_attempts
+                )
+            );
+
+            if self.is_available().await {
+                crate::log_info!("ceph_client", "Ceph cluster is available and ready");
+                return Ok(());
+            }
+
+            tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
+        }
+
+        crate::log_error!(
+            "ceph_client",
+            &format!("Ceph cluster not available after {} attempts", max_attempts)
+        );
+        Err(anyhow!(
+            "Ceph cluster not available after {} attempts",
+            max_attempts
+        ))
+    }
+}
diff --git a/control-plane/volume-manager/src/ceph/core/config.rs b/control-plane/volume-manager/src/ceph/core/config.rs
new file mode 100644
index 0000000..7bcf4f7
--- /dev/null
+++ b/control-plane/volume-manager/src/ceph/core/config.rs
@@ -0,0 +1,80 @@
+use serde::{Deserialize, Serialize};
+use std::env;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CephConfig {
+    pub mon_hosts: Vec<String>,
+    pub keyring_path: Option<String>,
+    pub client_name: String,
+    pub default_pool: String,
+    pub default_pg_num: u32,
+    pub default_replication: u32,
+}
+
+impl CephConfig {
+    pub fn from_env() -> anyhow::Result<Self> {
+        crate::log_debug!("ceph_config", "Loading Ceph configuration from environment");
+
+        let config = Self {
+            mon_hosts: env::var("CEPH_MON_HOSTS")
+                .unwrap_or_else(|_| "ceph-mon1:6789,ceph-mon2:6789,ceph-mon3:6789".to_string())
+                .split(',')
+                .map(|s| s.to_string())
+                .collect(),
+            keyring_path: env::var("CEPH_KEYRING").ok(),
+            client_name: env::var("CEPH_CLIENT_NAME").unwrap_or_else(|_| "admin".to_string()),
+            default_pool: env::var("CEPH_DEFAULT_POOL")
+                .unwrap_or_else(|_| "csf-volumes".to_string()),
+            default_pg_num: env::var("CEPH_PG_NUM")
+                .ok()
+                .and_then(|s| s.parse().ok())
+                .unwrap_or(128),
+            default_replication: env::var("CEPH_REPLICATION")
+                .ok()
+                .and_then(|s| s.parse().ok())
+                .unwrap_or(3),
+        };
+
+        crate::log_info!(
+            "ceph_config",
+            &format!(
+                "Loaded config: monitors={}, client={}, pool={}",
+                config.mon_hosts.len(),
+                config.client_name,
+                config.default_pool
+            )
+        );
+
+        Ok(config)
+    }
+
+    pub fn mon_initial_members(&self) -> String {
+        self.mon_hosts
+            .iter()
+            .enumerate()
+            .map(|(i, _)| format!("ceph-mon{}", i + 1))
+            .collect::<Vec<_>>()
+            .join(",")
+    }
+
+    pub fn mon_host_string(&self) -> String {
+        self.mon_hosts.join(",")
+    }
+}
+
+impl Default for CephConfig {
+    fn default() -> Self {
+        Self {
+            mon_hosts: vec![
+                "ceph-mon1:6789".to_string(),
+                "ceph-mon2:6789".to_string(),
+                "ceph-mon3:6789".to_string(),
+            ],
+            keyring_path: None,
+            client_name: "admin".to_string(),
+            default_pool: "csf-volumes".to_string(),
+            default_pg_num: 128,
+            default_replication: 3,
+        }
+    }
+}
diff --git a/control-plane/volume-manager/src/ceph/core/error.rs b/control-plane/volume-manager/src/ceph/core/error.rs
new file mode 100644
index 0000000..781112c
--- /dev/null
+++ b/control-plane/volume-manager/src/ceph/core/error.rs
@@ -0,0 +1,39 @@
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum CephError {
+    #[error("Command execution failed: {0}")]
+    CommandFailed(String),
+
+    #[error("Parse error: {0}")]
+    ParseError(String),
+
+    #[error("Ceph cluster not healthy: {0}")]
+    UnhealthyCluster(String),
+
+    #[error("Pool operation failed: {0}")]
+    PoolError(String),
+
+    #[error("RBD operation failed: {0}")]
+    RbdError(String),
+
+    #[error("Configuration error: {0}")]
+    ConfigError(String),
+
+    #[error("Timeout waiting for cluster")]
+    Timeout,
+
+    #[error("IO error: {0}")]
+    IoError(#[from] std::io::Error),
+
+    #[error("JSON error: {0}")]
+    JsonError(#[from] serde_json::Error),
+
+    #[error("UTF-8 error: {0}")]
+    Utf8Error(#[from] std::string::FromUtf8Error),
+
+    #[error("Unknown error: {0}")]
+    Unknown(String),
+}
+
+pub type Result<T> = std::result::Result<T, CephError>;
diff --git a/control-plane/volume-manager/src/ceph/core/mod.rs b/control-plane/volume-manager/src/ceph/core/mod.rs
new file mode 100644
index 0000000..3fcb51a
--- /dev/null
+++ b/control-plane/volume-manager/src/ceph/core/mod.rs
@@ -0,0 +1,7 @@
+pub mod client;
+pub mod config;
+pub mod error;
+
+pub use client::CephClient;
+pub use config::CephConfig;
+pub use error::{CephError, Result};
diff --git a/control-plane/volume-manager/src/ceph/mod.rs b/control-plane/volume-manager/src/ceph/mod.rs
new file mode 100644
index 0000000..a3f802c
--- /dev/null
+++ b/control-plane/volume-manager/src/ceph/mod.rs
@@ -0,0 +1,3 @@
+pub mod core;
+pub mod ops;
+pub mod storage;
diff --git a/control-plane/volume-manager/src/ceph/ops/init.rs b/control-plane/volume-manager/src/ceph/ops/init.rs
new file mode 100644
index 0000000..d0015a8
--- /dev/null
+++ b/control-plane/volume-manager/src/ceph/ops/init.rs
@@ -0,0 +1,140 @@
+use crate::ceph::core::{CephClient, CephConfig};
+use crate::ceph::storage::types::CephPool;
+use crate::ceph::storage::{PoolManager, RbdManager};
+use anyhow::Result;
+
+pub struct CephManager {
+    pub client: CephClient,
+    pub pool_manager: PoolManager,
+    pub rbd_manager: RbdManager,
+}
+
+/// Initializes the Ceph cluster and creates the default pools
+pub async fn init_ceph() -> Result<CephManager> {
+    crate::log_info!("ceph_init", "Initializing Ceph storage system");
+
+    // Load configuration
+    let config = CephConfig::from_env()?;
+    crate::log_debug!(
+        "ceph_init",
+        &format!("Using {} monitor hosts", config.mon_hosts.len())
+    );
+
+    // Create the client
+    let client = CephClient::new(config.clone());
+
+    // Wait for the cluster (max 30 attempts = 2.5 minutes)
+    client.wait_for_cluster(30).await?;
+
+    // Check the health status
+    let health = client.health_status().await?;
+    crate::log_info!(
+        "ceph_init",
+        &format!("Ceph cluster health: {:?}", health.status)
+    );
+    crate::log_info!(
+        "ceph_init",
+        &format!(
+            "Monitors: {}, OSDs: {}",
+            health.mons.len(),
+            health.osds.len()
+        )
+    );
+
+    // Create the pool manager
+    let pool_manager = PoolManager::new(client.clone());
+
+    // Create the default pools
+    let pools = vec![
+        CephPool {
+            name: config.default_pool.clone(),
+            pg_num: config.default_pg_num,
+            pgp_num: config.default_pg_num,
+            size: config.default_replication,
+            min_size: 2,
+        },
+        CephPool {
+            name: "csf-postgres".to_string(),
+            pg_num: 64,
+            pgp_num: 64,
+            size: config.default_replication,
+            min_size: 2,
+        },
+        CephPool {
+            name: "csf-metadata".to_string(),
+            pg_num: 32,
+            pgp_num: 32,
+            size: config.default_replication,
+            min_size: 2,
+        },
+    ];
+
+    for pool in pools {
+        crate::log_debug!(
+            "ceph_init",
+            &format!("Ensuring pool '{}' exists", pool.name)
+        );
+        if let Err(e) = pool_manager.ensure_pool(&pool).await {
+            crate::log_warn!(
+                "ceph_init",
+                &format!("Failed to create pool '{}': {}", pool.name, e)
+            );
+        }
+    }
+
+    // Create the RBD manager
+    let rbd_manager = RbdManager::new(client.clone());
+
+    crate::log_info!("ceph_init", "Ceph storage system initialized successfully");
+
+    Ok(CephManager {
+        client,
+        pool_manager,
+        rbd_manager,
+    })
+}
+
+/// Creates PostgreSQL volumes on the Ceph cluster
+pub async fn create_postgres_volumes(ceph: &CephManager, node_count: u32) -> Result<Vec<String>> {
+    crate::log_info!(
+        "ceph_init",
+        &format!("Creating PostgreSQL volumes for {} nodes", node_count)
+    );
+
+    let mut volumes = Vec::new();
+
+    for i in 1..=node_count {
+        let volume_name = format!("postgres-node-{}", i);
+
+        let volume = crate::ceph::storage::types::CephVolume {
+            name: volume_name.clone(),
+            pool: "csf-postgres".to_string(),
+            size_mb: 10240, // 10 GB
+            features: vec!["layering".to_string(), "exclusive-lock".to_string()],
+            encrypted: false,
+        };
+
+        // Create the volume if it does not exist yet
+        if !ceph
+            .rbd_manager
+            .image_exists(&volume.pool, &volume.name)
+            .await?
+        {
+            ceph.rbd_manager.create_image(&volume).await?;
+            volumes.push(volume_name);
+        } else {
+            crate::log_info!(
+                "ceph_init",
+                &format!("Volume '{}' already exists", volume_name)
+            );
+            volumes.push(volume_name);
+        }
+    }
+
+    crate::log_info!(
+        "ceph_init",
+        &format!("Created {} PostgreSQL volumes", volumes.len())
+    );
+
+    Ok(volumes)
+}
diff --git a/control-plane/volume-manager/src/ceph/ops/mod.rs b/control-plane/volume-manager/src/ceph/ops/mod.rs
new file mode 100644
index 0000000..14f353d
--- /dev/null
+++ b/control-plane/volume-manager/src/ceph/ops/mod.rs
@@ -0,0 +1,3 @@
+pub mod init;
+
+pub use init::{create_postgres_volumes, init_ceph, CephManager};
diff --git a/control-plane/volume-manager/src/ceph/storage/mod.rs b/control-plane/volume-manager/src/ceph/storage/mod.rs
new file mode 100644
index 0000000..b3d07f5
--- /dev/null
+++ b/control-plane/volume-manager/src/ceph/storage/mod.rs
@@ -0,0 +1,6 @@
+pub mod pool;
+pub mod rbd;
+pub mod types;
+
+pub use pool::PoolManager;
+pub use rbd::RbdManager;
diff --git a/control-plane/volume-manager/src/ceph/storage/pool.rs b/control-plane/volume-manager/src/ceph/storage/pool.rs
new file mode 100644
index 0000000..3d55886
--- /dev/null
+++ b/control-plane/volume-manager/src/ceph/storage/pool.rs
@@ -0,0 +1,130 @@
+use crate::ceph::core::CephClient;
+use super::types::*;
+use anyhow::{Context, Result};
+
+pub struct PoolManager {
+    client: CephClient,
+}
+
+impl PoolManager {
+    pub fn new(client: CephClient) -> Self {
+        Self { client }
+    }
+
+    /// Creates a new Ceph pool
+    pub async fn create_pool(&self, pool: &CephPool) -> Result<()> {
+        crate::log_info!(
+            "pool_manager",
+            &format!("Creating Ceph pool: {}", pool.name)
+        );
+
+        // Create the pool
+        let cmd = CephCommand::new("osd")
+            .arg("pool")
+            .arg("create")
+            .arg(&pool.name)
+            .arg(pool.pg_num.to_string())
+            .arg(pool.pgp_num.to_string());
+
+        self.client.execute(cmd).await
+            .context("Failed to create pool")?;
+
+        // Set the replication size
+        let cmd = CephCommand::new("osd")
+            .arg("pool")
+            .arg("set")
+            .arg(&pool.name)
+            .arg("size")
+            .arg(pool.size.to_string());
+
+        self.client.execute(cmd).await
+            .context("Failed to set pool size")?;
+
+        // Set min_size
+        let cmd = CephCommand::new("osd")
+            .arg("pool")
+            .arg("set")
+            .arg(&pool.name)
+            .arg("min_size")
+            .arg(pool.min_size.to_string());
+
+        self.client.execute(cmd).await
+            .context("Failed to set pool min_size")?;
+
+        // Enable the RBD application on the pool
+        let cmd = CephCommand::new("osd")
+            .arg("pool")
+            .arg("application")
+            .arg("enable")
+            .arg(&pool.name)
+            .arg("rbd");
+
+        self.client.execute(cmd).await
+            .context("Failed to enable RBD application")?;
+
+        crate::log_info!(
+            "pool_manager",
+            &format!("Pool '{}' created successfully", pool.name)
+        );
+
+        Ok(())
+    }
+
+    /// Deletes a pool
+    pub async fn delete_pool(&self, pool_name: &str) -> Result<()> {
+        crate::log_info!(
+            "pool_manager",
+            &format!("Deleting Ceph pool: {}", pool_name)
+        );
+
+        let cmd = CephCommand::new("osd")
+            .arg("pool")
+            .arg("delete")
+            .arg(pool_name)
+            .arg(pool_name) // confirmation
+            .arg("--yes-i-really-really-mean-it");
+
+        self.client.execute(cmd).await
+            .context("Failed to delete pool")?;
+
+        Ok(())
+    }
+
+    /// Lists all pools
+    pub async fn list_pools(&self) -> Result<Vec<String>> {
+        crate::log_debug!("pool_manager", "Listing all Ceph pools");
+
+        let cmd = CephCommand::new("osd")
+            .arg("pool")
+            .arg("ls");
+
+        let output = self.client.execute(cmd).await?;
+        let pools: Vec<String> = serde_json::from_str(&output)?;
+
+        crate::log_debug!(
+            "pool_manager",
+            &format!("Found {} pools: {}", pools.len(), pools.join(", "))
+        );
+
+        Ok(pools)
+    }
+
+    /// Checks whether a pool exists
+    pub async fn pool_exists(&self, pool_name: &str) -> Result<bool> {
+        let pools = self.list_pools().await?;
+        Ok(pools.contains(&pool_name.to_string()))
+    }
+
+    /// Creates the pool if it does not exist yet
+    pub async fn ensure_pool(&self, pool: &CephPool) -> Result<()> {
+        if !self.pool_exists(&pool.name).await? {
+            self.create_pool(pool).await?;
+        } else {
+            crate::log_info!(
+                "pool_manager",
+                &format!("Pool '{}' already exists", pool.name)
+            );
+        }
+        Ok(())
+    }
+}
diff --git a/control-plane/volume-manager/src/ceph/storage/rbd.rs b/control-plane/volume-manager/src/ceph/storage/rbd.rs
new file mode 100644
index 0000000..6543da5
--- /dev/null
+++ b/control-plane/volume-manager/src/ceph/storage/rbd.rs
@@ -0,0 +1,258 @@
+use super::types::*;
+use crate::ceph::core::CephClient;
+use anyhow::{Context, Result};
+use serde_json::Value;
+
+pub struct RbdManager {
+    client: CephClient,
+}
+
+impl RbdManager {
+    pub fn new(client: CephClient) -> Self {
+        Self { client }
+    }
+
+    /// Creates an RBD image (volume)
+    pub async fn create_image(&self, volume: &CephVolume) -> Result<()> {
+        crate::log_info!(
+            "rbd_manager",
+            &format!("Creating RBD image: {}/{}", volume.pool, volume.name)
+        );
+
+        let mut cmd = CephCommand::new("rbd")
+            .arg("create")
+            .arg(format!("{}/{}", volume.pool, volume.name))
+            .arg("--size")
+            .arg(volume.size_mb.to_string());
+
+        // Add image features
+        if !volume.features.is_empty() {
+            cmd = cmd.arg("--image-feature").arg(volume.features.join(","));
+        }
+
+        self.client
+            .execute(cmd)
+            .await
+            .context("Failed to create RBD image")?;
+
+        // Enable encryption if requested
+        if volume.encrypted {
+            self.enable_encryption(&volume.pool, &volume.name).await?;
+        }
+
+        crate::log_info!(
+            "rbd_manager",
+            &format!(
+                "RBD image '{}/{}' created successfully",
+                volume.pool, volume.name
+            )
+        );
+
+        Ok(())
+    }
+
+    /// Deletes an RBD image
+    pub async fn delete_image(&self, pool: &str, name: &str) -> Result<()> {
+        crate::log_info!(
+            "rbd_manager",
+            &format!("Deleting RBD image: {}/{}", pool, name)
+        );
+
+        let cmd = CephCommand::new("rbd")
+            .arg("rm")
+            .arg(format!("{}/{}", pool, name));
+
+        self.client
+            .execute(cmd)
+            .await
+            .context("Failed to delete RBD image")?;
+
+        Ok(())
+    }
+
+    /// Lists all RBD images in a pool
+    pub async fn list_images(&self, pool: &str) -> Result<Vec<RbdImage>> {
+        crate::log_debug!(
+            "rbd_manager",
+            &format!("Listing RBD images in pool: {}", pool)
+        );
+
+        let cmd = CephCommand::new("rbd").arg("ls").arg("-l").arg(pool);
+
+        let output = self.client.execute(cmd).await?;
+
+        if output.trim().is_empty() || output.trim() == "[]" {
+            crate::log_debug!("rbd_manager", &format!("No images found in pool: {}", pool));
+            return Ok(Vec::new());
+        }
+
+        let images: Vec<Value> = serde_json::from_str(&output)?;
+
+        let result: Vec<RbdImage> = images
+            .into_iter()
+            .filter_map(|img| {
+                Some(RbdImage {
+                    name: img["name"].as_str()?.to_string(),
+                    size: img["size"].as_u64()?,
+                    pool: pool.to_string(),
+                    format: img["format"].as_u64().unwrap_or(2) as u32,
+                    features: img["features"]
+                        .as_array()?
+                        .iter()
+                        .filter_map(|f| f.as_str().map(|s| s.to_string()))
+                        .collect(),
+                })
+            })
+            .collect();
+
+        crate::log_debug!(
+            "rbd_manager",
+            &format!("Found {} images in pool: {}", result.len(), pool)
+        );
+
+        Ok(result)
+    }
+
+    /// Creates a snapshot
+    pub async fn create_snapshot(&self, pool: &str, image: &str, snapshot: &str) -> Result<()> {
+        crate::log_info!(
+            "rbd_manager",
+            &format!("Creating snapshot: {}/{}@{}", pool, image, snapshot)
+        );
+
+        let cmd = CephCommand::new("rbd")
+            .arg("snap")
+            .arg("create")
+            .arg(format!("{}/{}@{}", pool, image, snapshot));
+
+        self.client
+            .execute(cmd)
+            .await
+            .context("Failed to create snapshot")?;
+
+        Ok(())
+    }
+
+    /// Deletes a snapshot
+    pub async fn delete_snapshot(&self, pool: &str, image: &str, snapshot: &str) -> Result<()> {
+        crate::log_info!(
+            "rbd_manager",
+            &format!("Deleting snapshot: {}/{}@{}", pool, image, snapshot)
+        );
+
+        let cmd = CephCommand::new("rbd")
+            .arg("snap")
+            .arg("rm")
+            .arg(format!("{}/{}@{}", pool, image, snapshot));
+
+        self.client
+            .execute(cmd)
+            .await
+            .context("Failed to delete snapshot")?;
+
+        Ok(())
+    }
+
+    /// Resizes an image
+    pub async fn resize_image(&self, pool: &str, name: &str, new_size_mb: u64) -> Result<()> {
+        crate::log_info!(
+            "rbd_manager",
+            &format!(
+                "Resizing RBD image: {}/{} to {} MB",
+                pool, name, new_size_mb
+            )
+        );
+
+        let cmd = CephCommand::new("rbd")
+            .arg("resize")
+            .arg(format!("{}/{}", pool, name))
+            .arg("--size")
+            .arg(new_size_mb.to_string());
+
+        self.client
+            .execute(cmd)
+            .await
+            .context("Failed to resize RBD image")?;
+
+        Ok(())
+    }
+
+    /// Maps an RBD device
+    pub async fn map_device(&self, pool: &str, image: &str) -> Result<String> {
+        crate::log_info!(
+            "rbd_manager",
+            &format!("Mapping RBD device: {}/{}", pool, image)
+        );
+
+        let cmd = CephCommand::new("rbd")
+            .arg("map")
+            .arg(format!("{}/{}", pool, image));
+
+        let output = self
+            .client
+            .execute(cmd)
+            .await
+            .context("Failed to map RBD device")?;
+
+        let device = output.trim().trim_matches('"').to_string();
+
+        crate::log_info!("rbd_manager", &format!("RBD device mapped to: {}", device));
+
+        Ok(device)
+    }
+
+    /// Unmaps an RBD device
+    pub async fn unmap_device(&self, device: &str) -> Result<()> {
+        crate::log_info!("rbd_manager", &format!("Unmapping RBD device: {}", device));
+
+        let cmd = CephCommand::new("rbd").arg("unmap").arg(device);
+
+        self.client
+            .execute(cmd)
+            .await
+            .context("Failed to unmap RBD device")?;
+
+        Ok(())
+    }
+
+    /// Enables encryption (LUKS)
+    async fn enable_encryption(&self, pool: &str, image: &str) -> Result<()> {
+        crate::log_info!(
+            "rbd_manager",
+            &format!("Enabling encryption for: {}/{}", pool, image)
+        );
+
+        // This is a placeholder - the actual LUKS encryption
+        // would happen on the mapped block device.
+        // We could call `rbd encryption format` here.
+
+        let cmd = CephCommand::new("rbd")
+            .arg("encryption")
+            .arg("format")
+            .arg(format!("{}/{}", pool, image))
+            .arg("luks2")
+            .arg("passphrase-file")
+            .arg("/etc/ceph/luks-passphrase");
+
+        // Ignore errors if encryption is not available
+        match self.client.execute(cmd).await {
+            Ok(_) => {
+                crate::log_info!("rbd_manager", "Encryption enabled successfully");
+            }
+            Err(e) => {
+                crate::log_warn!(
+                    "rbd_manager",
+                    &format!("Encryption not available or failed: {}", e)
+                );
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Checks whether an image exists
+    pub async fn image_exists(&self, pool: &str, name: &str) -> Result<bool> {
+        let images = self.list_images(pool).await?;
+        Ok(images.iter().any(|img| img.name == name))
+    }
+}
diff --git a/control-plane/volume-manager/src/ceph/storage/types.rs b/control-plane/volume-manager/src/ceph/storage/types.rs
new file mode 100644
index 0000000..52c933b
--- /dev/null
+++ b/control-plane/volume-manager/src/ceph/storage/types.rs
@@ -0,0 +1,98 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CephVolume {
+    pub name: String,
+    pub pool: String,
+    pub size_mb: u64,
+    pub features: Vec<String>,
+    pub encrypted: bool,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CephPool {
+    pub name: String,
+    pub pg_num: u32,
+    pub pgp_num: u32,
+    pub size: u32, // replication factor
+    pub min_size: u32,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CephClusterHealth {
+    pub status: HealthStatus,
+    pub mons: Vec<MonitorInfo>,
+    pub osds: Vec<OsdInfo>,
+    pub pgs: PgSummary,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub enum HealthStatus {
+    Ok,
+    Warn,
+    Error,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MonitorInfo {
+    pub name: String,
+    pub addr: String,
+    pub rank: u32,
+    pub in_quorum: bool,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct OsdInfo {
+    pub id: u32,
+    pub up: bool,
+    pub in_cluster: bool,
+    pub weight: f64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PgSummary {
+    pub total: u32,
+    pub active_clean: u32,
+    pub degraded: u32,
+    pub misplaced: u32,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RbdImage {
+    pub name: String,
+    pub size: u64,
+    pub pool: String,
+    pub format: u32,
+    pub features: Vec<String>,
+}
+
+#[derive(Debug, Clone)]
+pub struct CephCommand {
+    pub cmd: String,
+    pub args: Vec<String>,
+}
+
+impl CephCommand {
+    pub fn new(cmd: impl Into<String>) -> Self {
+        Self {
+            cmd: cmd.into(),
+            args: Vec::new(),
+        }
+    }
+
+    pub fn arg(mut self, arg: impl Into<String>) -> Self {
+        self.args.push(arg.into());
+        self
+    }
+
+    pub fn args_vec(mut self, args: Vec<String>) -> Self {
+        self.args.extend(args);
+        self
+    }
+
+    pub fn to_vec(&self) -> Vec<String> {
+        let mut result = vec![self.cmd.clone()];
+        result.extend(self.args.clone());
+        result
+    }
+}
diff --git a/control-plane/volume-manager/src/etcd/ha/health.rs b/control-plane/volume-manager/src/etcd/ha/health.rs
index d08a51e..f456dd6 100644
--- a/control-plane/volume-manager/src/etcd/ha/health.rs
+++ b/control-plane/volume-manager/src/etcd/ha/health.rs
@@ -1,4 +1,4 @@
-use crate::etcd::core::{EtcdClient, EtcdError};
+use crate::etcd::core::EtcdClient;
 use crate::etcd::state::{NodeState, NodeStatus};
 use crate::{log_info, log_warn};
 use std::sync::Arc;
diff --git a/control-plane/volume-manager/src/etcd/ha/leader_election.rs b/control-plane/volume-manager/src/etcd/ha/leader_election.rs
index 7b961cb..f424074 100644
--- a/control-plane/volume-manager/src/etcd/ha/leader_election.rs
+++ b/control-plane/volume-manager/src/etcd/ha/leader_election.rs
@@ -1,6 +1,5 @@
 use crate::etcd::core::{EtcdClient, EtcdError};
 use crate::{log_error, log_info, log_warn};
-use etcd_client::EventType;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use tokio::sync::RwLock;
@@ -185,7 +184,7 @@ impl LeaderElection {
     }
 
     /// Wartet auf Leadership Changes (Watch)
-    pub async fn watch_leadership<F>(&self, mut callback: F) -> Result<(), EtcdError>
+    pub async fn watch_leadership<F>(&self, callback: F) -> Result<(), EtcdError>
     where
         F: FnMut(Option<String>) + Send + 'static,
     {
diff --git a/control-plane/volume-manager/src/etcd/ha/mod.rs b/control-plane/volume-manager/src/etcd/ha/mod.rs
index bbd5d92..59d7759 100644
--- a/control-plane/volume-manager/src/etcd/ha/mod.rs
+++ b/control-plane/volume-manager/src/etcd/ha/mod.rs
@@ -3,5 +3,5 @@
 pub mod health;
 pub mod leader_election;
 
-pub use health::{ClusterHealthSummary, HealthChecker, NodeHealthStatus};
+pub use health::{HealthChecker, NodeHealthStatus};
 pub use leader_election::LeaderElection;
diff --git a/control-plane/volume-manager/src/etcd/sync/mod.rs b/control-plane/volume-manager/src/etcd/sync/mod.rs
index 0884814..5bfde68 100644
--- a/control-plane/volume-manager/src/etcd/sync/mod.rs
+++ b/control-plane/volume-manager/src/etcd/sync/mod.rs
@@ -2,6 +2,3 @@
 pub mod lock;
 pub mod watcher;
-
-pub use lock::DistributedLock;
-pub use watcher::StateWatcher;
diff --git a/control-plane/volume-manager/src/etcd/sync/watcher.rs b/control-plane/volume-manager/src/etcd/sync/watcher.rs
index db0debd..05606bf 100644
--- a/control-plane/volume-manager/src/etcd/sync/watcher.rs
+++ b/control-plane/volume-manager/src/etcd/sync/watcher.rs
@@ -13,7 +13,7 @@ impl StateWatcher {
     }
 
     /// Beobachtet einen Key-Prefix für Änderungen
-    pub async fn watch_prefix<F>(&self, prefix: &str, mut callback: F) -> Result<(), EtcdError>
+    pub async fn watch_prefix<F>(&self, prefix: &str, callback: F) -> Result<(), EtcdError>
     where
         F: FnMut(WatchEvent) + Send + 'static,
     {
diff --git a/control-plane/volume-manager/src/main.rs b/control-plane/volume-manager/src/main.rs
index d1e3ad8..7622171 100644
--- a/control-plane/volume-manager/src/main.rs
+++ b/control-plane/volume-manager/src/main.rs
@@ -3,18 +3,96 @@ use std::sync::Arc;
 
 use etcd::state::NodeRole;
 use etcd::StateManager;
 
+mod ceph;
 mod etcd;
 mod logger;
+mod patroni;
 
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
+    // Initialize the etcd cluster
     let init_data = etcd::init::init_cluster().await?;
-    let etcd_client = init_data.etcd_client;
+    let _etcd_client = init_data.etcd_client;
     let state_manager = init_data.state_manager;
     let health_checker = init_data.health_checker;
     let leader_election = init_data.leader_election;
     let node_id = init_data.node_id;
 
+    // Initialize Ceph storage (leader only)
+    tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
+
+    let ceph_manager = if leader_election.is_leader() {
+        log_info!("main", "Node is leader, initializing Ceph storage");
+
+        match ceph::ops::init_ceph().await {
+            Ok(manager) => {
+                log_info!("main", "Ceph storage initialized successfully");
+
+                // Create PostgreSQL volumes for Patroni
+                match ceph::ops::create_postgres_volumes(&manager, 3).await {
+                    Ok(volumes) => {
+                        log_info!(
+                            "main",
+                            &format!("Created PostgreSQL volumes on Ceph: {:?}", volumes)
+                        );
+                    }
+                    Err(e) => {
+                        log_error!(
+                            "main",
+                            &format!("Failed to create PostgreSQL volumes: {}", e)
+                        );
+                    }
+                }
+
+                Some(Arc::new(manager))
+            }
+            Err(e) => {
+                log_warn!(
+                    "main",
+                    &format!(
+                        "Ceph initialization failed (continuing without Ceph): {}",
+                        e
+                    )
+                );
+                None
+            }
+        }
+    } else {
+        log_info!("main", "Node is follower, skipping Ceph initialization");
+        None
+    };
+
+    // Initialize Patroni monitoring (all nodes)
+    log_info!("main", "Initializing Patroni PostgreSQL HA monitoring");
+
+    let patroni_scope =
+        std::env::var("PATRONI_SCOPE").unwrap_or_else(|_| "postgres-csf".to_string());
+
+    let patroni_nodes = std::env::var("PATRONI_NODES")
+        .unwrap_or_else(|_| "patroni1:8008,patroni2:8008,patroni3:8008".to_string())
+        .split(',')
+        .map(|s| format!("http://{}", s.trim()))
+        .collect::<Vec<String>>();
+
+    let patroni_client =
patroni::PatroniClient::new(patroni_scope.clone(), patroni_nodes); + let patroni_monitor = Arc::new(patroni::PatroniMonitor::new(patroni_client, 10)); + + // Warte bis Patroni Cluster bereit ist + log_info!("main", "Waiting for Patroni cluster to be ready..."); + if let Err(e) = patroni_monitor.wait_for_cluster_ready(120).await { + log_warn!("main", &format!("Patroni cluster not ready: {}", e)); + } else { + log_info!("main", "Patroni cluster is ready and healthy"); + } + + // Starte Patroni Monitoring Loop (in eigenem Task) + let monitor_handle = { + let monitor = patroni_monitor.clone(); + tokio::spawn(async move { + monitor.start_monitoring().await; + }) + }; + // Erstelle Test-Volumes wenn Leader (nach kurzer Wartezeit) tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; @@ -49,12 +127,16 @@ async fn main() -> anyhow::Result<()> { log_info!("main", "Node is follower, waiting for leader"); } - log_info!("main", "Volume Manager initialized successfully"); + log_info!( + "main", + "Volume Manager with Patroni HA initialized successfully" + ); // Hauptschleife let mut heartbeat_interval = tokio::time::interval(tokio::time::Duration::from_secs(10)); let mut health_check_interval = tokio::time::interval(tokio::time::Duration::from_secs(30)); let mut operations_interval = tokio::time::interval(tokio::time::Duration::from_secs(35)); + let mut patroni_check_interval = tokio::time::interval(tokio::time::Duration::from_secs(15)); let mut election_interval = tokio::time::interval(tokio::time::Duration::from_secs(5)); loop { @@ -97,7 +179,7 @@ async fn main() -> anyhow::Result<()> { // Nur Leader führt Failover durch if leader_election.is_leader() { - perform_failover(&state_manager, &summary.nodes).await; + perform_failover(&state_manager, &summary.nodes, &ceph_manager).await; } } } @@ -105,10 +187,61 @@ async fn main() -> anyhow::Result<()> { } } + // Patroni Check: Überwache PostgreSQL HA Status + _ = patroni_check_interval.tick() => { + if leader_election.is_leader() { + match patroni_monitor.get_primary().await { + Ok(Some(primary)) => { + log_info!("main", &format!("PostgreSQL Primary: {}", primary.name)); + + // Prüfe Replicas + match patroni_monitor.get_replicas().await { + Ok(replicas) => { + log_info!("main", &format!("PostgreSQL Replicas: {}", replicas.len())); + for replica in replicas { + let lag_info = if let Some(lag) = replica.lag { + format!(" (Lag: {}KB)", lag / 1024) + } else { + String::new() + }; + log_debug!("main", &format!(" - {}{}", replica.name, lag_info)); + } + } + Err(e) => log_error!("main", &format!("Failed to get replicas: {}", e)), + } + } + Ok(None) => { + log_error!("main", "NO PRIMARY FOUND! 
Patroni failover in progress?"); + } + Err(e) => { + log_error!("main", &format!("Failed to get primary: {}", e)); + } + } + + // Prüfe ob Cluster healthy ist + if !patroni_monitor.is_cluster_healthy().await { + log_warn!("main", "Patroni cluster is not healthy!"); + + // Hier könnte man zusätzliche Recovery-Aktionen triggern + if let Some(ceph) = &ceph_manager { + log_info!("main", "Checking Ceph storage health for recovery..."); + match ceph.client.health_status().await { + Ok(health) => { + log_info!("main", &format!("Ceph Health: {:?}", health.status)); + } + Err(e) => { + log_error!("main", &format!("Ceph health check failed: {}", e)); + } + } + } + } + } + } + // Volume Operations: Nur Leader führt diese aus _ = operations_interval.tick() => { if leader_election.is_leader() { - log_info!("main", "[LEADER] Managing storage volumes..."); + log_info!("main", "Managing storage volumes..."); // Liste alle Volumes match state_manager.list_volumes().await { @@ -125,7 +258,7 @@ async fn main() -> anyhow::Result<()> { log_info!("main", "- Processing snapshot requests"); log_info!("main", "- Verifying encryption status"); } else { - log_info!("main", "[FOLLOWER] Standby mode - waiting for leader instructions"); + log_info!("main", "Standby mode - waiting for leader instructions"); // Follower kann Leader abfragen if let Ok(Some(leader)) = leader_election.get_leader().await { @@ -141,6 +274,7 @@ async fn main() -> anyhow::Result<()> { async fn perform_failover( state_manager: &Arc, health_statuses: &[etcd::ha::NodeHealthStatus], + ceph_manager: &Option>, ) { log_info!("main", "Initiating failover procedure..."); @@ -158,14 +292,78 @@ async fn perform_failover( ); } - // Hier würde man Volumes von diesem Node migrieren + // Volume Migration (für User-Volumes, nicht PostgreSQL) + // PostgreSQL Failover wird von Patroni automatisch gehandelt! log_info!( "main", - &format!("Initiating volume migration from node {}", status.node_id) + &format!( + "Initiating user volume migration from node {}", + status.node_id + ) ); - // TODO: Implementiere Volume Migration + + if let Some(ceph) = ceph_manager { + // Liste alle Volumes die auf dem toten Node waren + match state_manager.list_volumes().await { + Ok(volumes) => { + let node_volumes: Vec<_> = volumes + .iter() + .filter(|v| v.node_id.as_ref() == Some(&status.node_id)) + .collect(); + + if !node_volumes.is_empty() { + log_info!( + "main", + &format!( + "Found {} volumes to migrate from {}", + node_volumes.len(), + status.node_id + ) + ); + + for volume in node_volumes { + log_info!( + "main", + &format!( + "Migrating volume: {} ({}GB)", + volume.name, volume.size_gb + ) + ); + + // Hier würde Volume-Migration implementiert werden: + // 1. Unmap von toter Node (Ceph RBD exclusive-lock release) + // 2. Map zu gesunder Node + // 3. 
Volume-Status in etcd aktualisieren + + // Für jetzt nur loggen + log_info!( + "main", + &format!("Volume {} ready for remount (Ceph ensures data persistence)", volume.name) + ); + } + } else { + log_info!( + "main", + &format!("No volumes found on node {}", status.node_id) + ); + } + } + Err(e) => { + log_error!("main", &format!("Failed to list volumes: {}", e)); + } + } + } else { + log_warn!( + "main", + "Ceph manager not available, skipping volume migration" + ); + } + } + } - log_info!("main", "Failover procedure completed successfully"); + log_info!("main", "Failover procedure completed"); + log_info!( + "main", + "Note: PostgreSQL failover is handled automatically by Patroni" + ); } diff --git a/control-plane/volume-manager/src/patroni/client.rs b/control-plane/volume-manager/src/patroni/client.rs new file mode 100644 index 0000000..e994a4c --- /dev/null +++ b/control-plane/volume-manager/src/patroni/client.rs @@ -0,0 +1,245 @@ +use super::types::*; +use anyhow::{Context, Result}; +use reqwest::Client; +use std::time::Duration; + +pub struct PatroniClient { + client: Client, + scope: String, + nodes: Vec<String>, // API URLs like "http://patroni1:8008" +} + +impl PatroniClient { + pub fn new(scope: String, nodes: Vec<String>) -> Self { + let client = Client::builder() + .timeout(Duration::from_secs(5)) + .build() + .unwrap(); + + Self { + client, + scope, + nodes, + } + } + + /// Holt den Cluster-Status von allen Patroni Nodes + pub async fn get_cluster_status(&self) -> Result<PatroniCluster> { + crate::log_debug!("patroni", "Fetching cluster status"); + + let mut members = Vec::new(); + let mut leader = None; + + for node_url in &self.nodes { + match self.get_node_health(node_url).await { + Ok(health) => { + let role = PostgresNodeRole::from(health.role.as_str()); + + if role == PostgresNodeRole::Primary { + leader = Some(self.extract_node_name(node_url)); + } + + members.push(PatroniNode { + name: self.extract_node_name(node_url), + role, + state: PatroniState::from(health.state.as_str()), + api_url: node_url.clone(), + postgres_url: self.build_postgres_url(node_url), + timeline: health.timeline, + lag: None, // Wird später von Cluster-API gefüllt + }); + } + Err(e) => { + crate::log_warn!( + "patroni", + &format!("Failed to get health from {}: {}", node_url, e) + ); + + members.push(PatroniNode { + name: self.extract_node_name(node_url), + role: PostgresNodeRole::Unknown, + state: PatroniState::Failed, + api_url: node_url.clone(), + postgres_url: self.build_postgres_url(node_url), + timeline: None, + lag: None, + }); + } + } + } + + Ok(PatroniCluster { + scope: self.scope.clone(), + members, + leader, + failover_in_progress: false, + }) + } + + /// Holt Health-Info von einem einzelnen Node + async fn get_node_health(&self, node_url: &str) -> Result<PatroniHealth> { + let url = format!("{}/health", node_url); + + let response = self + .client + .get(&url) + .send() + .await + .context("Failed to send health request")?; + + let health: PatroniHealth = response + .json() + .await + .context("Failed to parse health response")?; + + crate::log_debug!( + "patroni", + &format!( + "Fetched health from {}: role={}, state={}", + node_url, health.role, health.state + ) + ); + + Ok(health) + } + + /// Prüft ob ein Node der Primary ist + pub async fn is_primary(&self, node_url: &str) -> Result<bool> { + let url = format!("{}/primary", node_url); + + let response = self.client.get(&url).send().await?; + + let is_primary = response.status().is_success(); + crate::log_debug!( + "patroni", + &format!("Node {} is primary: {}", node_url, is_primary) + );
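+ // Note: Patroni's /primary and /replica endpoints (used here and in is_replica below) signal the
+ // node's current role purely via the HTTP status code (200 when the node holds that role,
+ // typically 503 when it does not), so only the status is inspected and the body is ignored.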
+ + Ok(is_primary) + } + + /// Prüft ob ein Node ein Replica ist + pub async fn is_replica(&self, node_url: &str) -> Result<bool> { + let url = format!("{}/replica", node_url); + + let response = self.client.get(&url).send().await?; + + let is_replica = response.status().is_success(); + crate::log_debug!( + "patroni", + &format!("Node {} is replica: {}", node_url, is_replica) + ); + + Ok(is_replica) + } + + /// Findet die aktuelle Primary Node + pub async fn find_primary(&self) -> Result<Option<PatroniNode>> { + let cluster = self.get_cluster_status().await?; + + let primary = cluster + .members + .into_iter() + .find(|m| m.role == PostgresNodeRole::Primary); + + if let Some(ref p) = primary { + crate::log_info!("patroni", &format!("Found primary node: {}", p.name)); + } else { + crate::log_warn!("patroni", "No primary node found in cluster"); + } + + Ok(primary) + } + + /// Holt alle Replica Nodes + pub async fn find_replicas(&self) -> Result<Vec<PatroniNode>> { + let cluster = self.get_cluster_status().await?; + + let replicas: Vec<PatroniNode> = cluster + .members + .into_iter() + .filter(|m| m.role == PostgresNodeRole::Replica) + .collect(); + + crate::log_info!( + "patroni", + &format!("Found {} replica nodes", replicas.len()) + ); + + Ok(replicas) + } + + /// Triggert ein manuelles Failover (NUR FÜR TESTING!) + pub async fn trigger_failover(&self, candidate: Option<&str>) -> Result<()> { + crate::log_warn!( + "patroni", + &format!("Triggering manual failover to {:?}", candidate) + ); + + // Finde Primary + let primary = self.find_primary().await?.context("No primary found")?; + + let url = format!("{}/failover", primary.api_url); + + let mut body = serde_json::json!({ + "leader": self.extract_node_name(&primary.api_url), + }); + + if let Some(candidate) = candidate { + body["candidate"] = serde_json::json!(candidate); + } + + self.client + .post(&url) + .json(&body) + .send() + .await + .context("Failed to trigger failover")?; + + crate::log_info!("patroni", "Failover triggered successfully"); + Ok(()) + } + + /// Extrahiert Node-Namen aus URL + fn extract_node_name(&self, url: &str) -> String { + url.split("://") + .nth(1) + .and_then(|s| s.split(':').next()) + .unwrap_or("unknown") + .to_string() + } + + /// Baut PostgreSQL Connection URL + fn build_postgres_url(&self, api_url: &str) -> String { + let host = self.extract_node_name(api_url); + format!("postgresql://{}:5432", host) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_node_name() { + let client = + PatroniClient::new("test".to_string(), vec!["http://patroni1:8008".to_string()]); + + assert_eq!(client.extract_node_name("http://patroni1:8008"), "patroni1"); + assert_eq!( + client.extract_node_name("http://192.168.1.100:8008"), + "192.168.1.100" + ); + } + + #[test] + fn test_build_postgres_url() { + let client = + PatroniClient::new("test".to_string(), vec!["http://patroni1:8008".to_string()]); + + assert_eq!( + client.build_postgres_url("http://patroni1:8008"), + "postgresql://patroni1:5432" + ); + } +} diff --git a/control-plane/volume-manager/src/patroni/mod.rs b/control-plane/volume-manager/src/patroni/mod.rs new file mode 100644 index 0000000..665b95a --- /dev/null +++ b/control-plane/volume-manager/src/patroni/mod.rs @@ -0,0 +1,6 @@ +pub mod client; +pub mod monitor; +pub mod types; + +pub use client::PatroniClient; +pub use monitor::PatroniMonitor; diff --git a/control-plane/volume-manager/src/patroni/monitor.rs b/control-plane/volume-manager/src/patroni/monitor.rs new file mode 100644 index 0000000..fe92e2d --- /dev/null +++
b/control-plane/volume-manager/src/patroni/monitor.rs @@ -0,0 +1,204 @@ +use super::client::PatroniClient; +use super::types::*; +use anyhow::Result; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::interval; + +/// Patroni Cluster Monitor +/// Überwacht kontinuierlich den PostgreSQL-Cluster Status +pub struct PatroniMonitor { + client: Arc<PatroniClient>, + check_interval: Duration, +} + +impl PatroniMonitor { + pub fn new(client: PatroniClient, check_interval_secs: u64) -> Self { + Self { + client: Arc::new(client), + check_interval: Duration::from_secs(check_interval_secs), + } + } + + /// Startet den Monitoring-Loop (läuft in eigenem Task) + pub async fn start_monitoring(self: Arc<Self>) { + crate::log_info!("patroni_monitor", "Starting Patroni cluster monitoring"); + + let mut check_interval = interval(self.check_interval); + + loop { + check_interval.tick().await; + + match self.check_cluster_health().await { + Ok(status) => { + self.log_cluster_status(&status); + + // Prüfe auf Probleme + if status.leader.is_none() { + crate::log_error!( + "patroni_monitor", + "NO PRIMARY LEADER! Cluster in failover mode!" + ); + } + + let unhealthy_count = status + .members + .iter() + .filter(|m| m.state != PatroniState::Running) + .count(); + + if unhealthy_count > 0 { + crate::log_warn!( + "patroni_monitor", + &format!("{} nodes unhealthy", unhealthy_count) + ); + } + } + Err(e) => { + crate::log_error!( + "patroni_monitor", + &format!("Failed to check cluster health: {}", e) + ); + } + } + } + } + + /// Prüft Cluster-Health + async fn check_cluster_health(&self) -> Result<PatroniCluster> { + self.client.get_cluster_status().await + } + + /// Loggt Cluster-Status übersichtlich + fn log_cluster_status(&self, cluster: &PatroniCluster) { + crate::log_info!( + "patroni_monitor", + &format!( + "Cluster '{}': Leader={:?}, Members={}", + cluster.scope, + cluster.leader, + cluster.members.len() + ) + ); + + for member in &cluster.members { + let lag_info = if let Some(lag) = member.lag { + if lag > 1024 * 1024 { + // > 1MB lag + format!(" (LAG: {:.2}MB)", lag as f64 / 1024.0 / 1024.0) + } else if lag > 0 { + format!(" (LAG: {}KB)", lag / 1024) + } else { + String::new() + } + } else { + String::new() + }; + + crate::log_debug!( + "patroni_monitor", + &format!(" {} {:?}{}", member.name, member.state, lag_info) + ); + } + } + + /// Wartet bis Cluster bereit ist (Primary + mindestens 1 Replica) + pub async fn wait_for_cluster_ready(&self, timeout_secs: u64) -> Result<()> { + crate::log_info!( + "patroni_monitor", + &format!( + "Waiting for cluster to be ready (timeout: {}s)", + timeout_secs + ) + ); + + let start = std::time::Instant::now(); + let timeout = Duration::from_secs(timeout_secs); + + loop { + if start.elapsed() > timeout { + anyhow::bail!("Timeout waiting for cluster to be ready"); + } + + match self.client.get_cluster_status().await { + Ok(cluster) => { + let has_primary = cluster.leader.is_some(); + let running_members = cluster + .members + .iter() + .filter(|m| m.state == PatroniState::Running) + .count(); + + if has_primary && running_members >= 2 { + crate::log_info!( + "patroni_monitor", + &format!( + "Cluster ready! 
Primary={:?}, Running members={}", + cluster.leader, running_members + ) + ); + return Ok(()); + } + + crate::log_debug!( + "patroni_monitor", + &format!( + "Cluster not ready: Primary={}, Running={}", + has_primary, running_members + ) + ); + } + Err(e) => { + crate::log_debug!("patroni_monitor", &format!("Cluster check failed: {}", e)); + } + } + + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + + /// Holt aktuelle Primary Node + pub async fn get_primary(&self) -> Result<Option<PatroniNode>> { + self.client.find_primary().await + } + + /// Holt alle Replica Nodes + pub async fn get_replicas(&self) -> Result<Vec<PatroniNode>> { + self.client.find_replicas().await + } + + /// Prüft ob Cluster healthy ist + pub async fn is_cluster_healthy(&self) -> bool { + match self.client.get_cluster_status().await { + Ok(cluster) => { + let has_primary = cluster.leader.is_some(); + let all_running = cluster + .members + .iter() + .all(|m| m.state == PatroniState::Running); + + has_primary && all_running + } + Err(_) => false, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_monitor_creation() { + let client = PatroniClient::new( + "test-scope".to_string(), + vec![ + "http://patroni1:8008".to_string(), + "http://patroni2:8008".to_string(), + ], + ); + + let monitor = PatroniMonitor::new(client, 10); + assert_eq!(monitor.check_interval, Duration::from_secs(10)); + } +} diff --git a/control-plane/volume-manager/src/patroni/types.rs b/control-plane/volume-manager/src/patroni/types.rs new file mode 100644 index 0000000..196470a --- /dev/null +++ b/control-plane/volume-manager/src/patroni/types.rs @@ -0,0 +1,92 @@ +use serde::{Deserialize, Serialize}; + +/// PostgreSQL Node Role in Patroni Cluster +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum PostgresNodeRole { + Primary, + Replica, + Standby, + Unknown, +} + +/// Patroni Node Status +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PatroniNode { + pub name: String, + pub role: PostgresNodeRole, + pub state: PatroniState, + pub api_url: String, + pub postgres_url: String, + pub timeline: Option<u64>, + pub lag: Option<u64>, // Replication lag in bytes +} + +/// Patroni Cluster State +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum PatroniState { + Running, + Starting, + Stopped, + Failed, + Unknown, +} + +/// Patroni Cluster Info +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PatroniCluster { + pub scope: String, + pub members: Vec<PatroniNode>, + pub leader: Option<String>, + pub failover_in_progress: bool, +} + +/// Patroni Health Response (from REST API) +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PatroniHealth { + pub state: String, + pub role: String, + pub server_version: Option<u64>, + pub cluster_unlocked: Option<bool>, + pub timeline: Option<u64>, +} + +/// Patroni Cluster Topology (from /cluster endpoint) +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PatroniClusterInfo { + pub members: Vec<PatroniMemberInfo>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PatroniMemberInfo { + pub name: String, + pub role: String, + pub state: String, + pub api_url: String, + pub host: String, + pub port: u16, + pub timeline: Option<u64>, + pub lag: Option<u64>, +} + +impl From<&str> for PostgresNodeRole { + fn from(s: &str) -> Self { + match s.to_lowercase().as_str() { + "master" | "primary" | "leader" => PostgresNodeRole::Primary, + "replica" | "standby_leader" => PostgresNodeRole::Replica, + "standby" => PostgresNodeRole::Standby, + _ => PostgresNodeRole::Unknown, + } + } +} + +impl From<&str> for 
PatroniState { + fn from(s: &str) -> Self { + match s.to_lowercase().as_str() { + "running" => PatroniState::Running, + "starting" => PatroniState::Starting, + "stopped" => PatroniState::Stopped, + "failed" => PatroniState::Failed, + _ => PatroniState::Unknown, + } + } +} diff --git a/control-plane/volume-manager/test-ha.sh b/control-plane/volume-manager/test-ha.sh deleted file mode 100755 index f0b803b..0000000 --- a/control-plane/volume-manager/test-ha.sh +++ /dev/null @@ -1,378 +0,0 @@ -#!/bin/bash - -# Test Script für HA, Leader Election und Failover -# Verwendung: ./test-ha.sh - -set -e - -# Setze ETCDCTL API Version -export ETCDCTL_API=3 - -COLOR_RESET='\033[0m' -COLOR_GREEN='\033[0;32m' -COLOR_BLUE='\033[0;34m' -COLOR_YELLOW='\033[1;33m' -COLOR_RED='\033[0;31m' -COLOR_CYAN='\033[0;36m' - -log() { - echo -e "${COLOR_BLUE}[$(date +'%H:%M:%S')]${COLOR_RESET} $1" -} - -success() { - echo -e "${COLOR_GREEN}✅ $1${COLOR_RESET}" -} - -info() { - echo -e "${COLOR_CYAN}ℹ️ $1${COLOR_RESET}" -} - -warning() { - echo -e "${COLOR_YELLOW}⚠️ $1${COLOR_RESET}" -} - -error() { - echo -e "${COLOR_RED}❌ $1${COLOR_RESET}" -} - -header() { - echo -e "\n${COLOR_GREEN}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━${COLOR_RESET}" - echo -e "${COLOR_GREEN} $1${COLOR_RESET}" - echo -e "${COLOR_GREEN}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━${COLOR_RESET}\n" -} - -# Prüfe ob etcdctl installiert ist -check_etcdctl() { - if ! command -v etcdctl &> /dev/null; then - warning "etcdctl nicht gefunden. Installiere mit:" - echo " brew install etcd # macOS" - echo " apt install etcd-client # Ubuntu" - return 1 - fi - return 0 -} - -# Zeige etcd Cluster Status -show_etcd_status() { - header "etcd Cluster Status" - if check_etcdctl; then - etcdctl --endpoints=localhost:2379 member list - echo "" - etcdctl --endpoints=localhost:2379 endpoint health - fi -} - -# Zeige alle Nodes in etcd -show_nodes() { - header "Registrierte Nodes" - if check_etcdctl; then - echo "Nodes im Cluster:" - etcdctl --endpoints=localhost:2379 get /csf/volume-manager/nodes/ --prefix --keys-only | grep -v "^$" || echo "Keine Nodes gefunden" - echo "" - echo "Node Details:" - etcdctl --endpoints=localhost:2379 get /csf/volume-manager/nodes/ --prefix | grep -v "^$" | jq '.' 2>/dev/null || etcdctl --endpoints=localhost:2379 get /csf/volume-manager/nodes/ --prefix - fi -} - -# Zeige aktuellen Leader -show_leader() { - header "Leader Election Status" - if check_etcdctl; then - echo "Aktueller Leader:" - # Der Leader wird direkt als String unter /csf/volume-manager/election/leader gespeichert - LEADER=$(etcdctl --endpoints=localhost:2379 get /csf/volume-manager/election/leader --print-value-only 2>/dev/null) - if [ -n "$LEADER" ]; then - echo -e "${COLOR_GREEN}👑 $LEADER${COLOR_RESET}" - - # Zeige zusätzliche Node-Details - echo "" - echo "Node Details:" - NODE_DATA=$(etcdctl --endpoints=localhost:2379 get /csf/volume-manager/nodes/$LEADER --print-value-only 2>/dev/null) - if [ -n "$NODE_DATA" ]; then - echo "$NODE_DATA" | jq '.' - else - echo " Node-Daten nicht verfügbar" - fi - else - echo "Kein Leader gewählt" - fi - fi -} - -# Zeige Volume States -show_volumes() { - header "Volume States" - if check_etcdctl; then - echo "Volumes im Cluster:" - etcdctl --endpoints=localhost:2379 get /csf/volume-manager/volumes/ --prefix --keys-only | grep -v "^$" || echo "Keine Volumes gefunden" - echo "" - echo "Volume Details:" - etcdctl --endpoints=localhost:2379 get /csf/volume-manager/volumes/ --prefix | grep -v "^$" | jq '.' 
2>/dev/null || echo "Keine Volumes" - fi -} - -# Zeige Container Logs -show_logs() { - local NODE=$1 - header "Logs von $NODE" - docker logs --tail 20 $NODE -} - -# Stop einen Node (simuliert Failover) -stop_node() { - local NODE=$1 - header "Stoppe Node: $NODE" - docker stop $NODE - success "Node $NODE gestoppt" - info "Warte 5 Sekunden für Failover..." - sleep 5 -} - -# Start einen Node -start_node() { - local NODE=$1 - header "Starte Node: $NODE" - docker start $NODE - success "Node $NODE gestartet" - info "Warte 5 Sekunden für Initialisierung..." - sleep 5 -} - -# Zeige Container Status -show_container_status() { - header "Docker Container Status" - docker ps -a --filter "name=volume-manager" --format "table {{.Names}}\t{{.Status}}\t{{.Ports}}" -} - -# Überwache Cluster in Echtzeit -monitor() { - header "Cluster Monitoring (Strg+C zum Beenden)" - while true; do - clear - echo -e "${COLOR_CYAN}═══════════════════════════════════════════════════════════════${COLOR_RESET}" - echo -e "${COLOR_CYAN} Volume Manager Cluster - Live Monitor${COLOR_RESET}" - echo -e "${COLOR_CYAN}═══════════════════════════════════════════════════════════════${COLOR_RESET}" - echo "" - - # Container Status - echo -e "${COLOR_YELLOW}📦 Container Status:${COLOR_RESET}" - docker ps --filter "name=volume-manager" --format " {{.Names}}: {{.Status}}" | sed 's/Up /✅ /' | sed 's/Exited /❌ /' - echo "" - - # Leader - if check_etcdctl; then - LEADER=$(etcdctl --endpoints=localhost:2379 get /csf/volume-manager/election/leader --print-value-only 2>/dev/null) - if [ -n "$LEADER" ]; then - echo -e "${COLOR_YELLOW}👑 Aktueller Leader:${COLOR_RESET} ${COLOR_GREEN}$LEADER${COLOR_RESET}" - else - echo -e "${COLOR_YELLOW}👑 Aktueller Leader:${COLOR_RESET} ${COLOR_RED}Kein Leader${COLOR_RESET}" - fi - echo "" - - # Nodes - echo -e "${COLOR_YELLOW}🖥️ Registrierte Nodes:${COLOR_RESET}" - etcdctl --endpoints=localhost:2379 get /csf/volume-manager/nodes/ --prefix 2>/dev/null | \ - jq -r 'select(.node_id != null) | " \(.node_id): \(.status) (\(.role))"' 2>/dev/null || echo " Keine Nodes" - echo "" - fi - - echo -e "${COLOR_CYAN}───────────────────────────────────────────────────────────────${COLOR_RESET}" - echo "Aktualisiert: $(date +'%H:%M:%S') | Drücke Strg+C zum Beenden" - - sleep 3 - done -} - -# Führe Failover-Test durch -test_failover() { - header "Failover Test starten" - - info "1. Zeige initialen Cluster-Status" - show_container_status - sleep 2 - - show_leader - sleep 2 - - info "2. Stoppe aktuellen Leader" - if check_etcdctl; then - LEADER=$(etcdctl --endpoints=localhost:2379 get /csf/volume-manager/election/leader --print-value-only 2>/dev/null) - if [ -n "$LEADER" ]; then - # Leader ID ist der Node-Name, aber Container Name könnte anders sein - CONTAINER_NAME=$(docker ps --filter "name=$LEADER" --format "{{.Names}}" | head -n1) - if [ -n "$CONTAINER_NAME" ]; then - stop_node "$CONTAINER_NAME" - else - stop_node "$LEADER" - fi - else - warning "Kein Leader gefunden, stoppe volume-manager-1" - stop_node "volume-manager-1" - fi - else - stop_node "volume-manager-1" - fi - - info "3. Prüfe neuen Leader" - show_leader - sleep 2 - - show_nodes - sleep 2 - - info "4. Starte gestoppten Node wieder" - if [ -n "$LEADER" ] && [ "$LEADER" != "Kein Leader" ]; then - start_node "$LEADER" - else - start_node "volume-manager-1" - fi - - info "5. Finaler Cluster-Status" - show_container_status - sleep 2 - show_leader - - success "Failover Test abgeschlossen!" 
-} - -# Hauptmenü -show_menu() { - echo -e "\n${COLOR_CYAN}═══════════════════════════════════════════════════════════════${COLOR_RESET}" - echo -e "${COLOR_CYAN} Volume Manager HA Test Suite${COLOR_RESET}" - echo -e "${COLOR_CYAN}═══════════════════════════════════════════════════════════════${COLOR_RESET}\n" - echo " 1) Start Cluster (docker-compose up)" - echo " 2) Stop Cluster (docker-compose down)" - echo " 3) Zeige Container Status" - echo " 4) Zeige etcd Cluster Status" - echo " 5) Zeige registrierte Nodes" - echo " 6) Zeige aktuellen Leader" - echo " 7) Zeige Volumes" - echo " 8) Zeige Logs (Node auswählen)" - echo " 9) Stop Node (Failover simulieren)" - echo " 10) Start Node" - echo " 11) Failover Test automatisch" - echo " 12) Live Monitor starten" - echo " 13) Cleanup etcd Daten" - echo " 14) Restart Container" - echo " 0) Beenden" - echo "" - echo -n "Wähle eine Option: " -} - -# Start Cluster -start_cluster() { - header "Starte Cluster" - docker-compose -f docker-compose.test.yml up -d - success "Cluster gestartet" - info "Warte 10 Sekunden für Initialisierung..." - sleep 10 -} - -# Stop Cluster -stop_cluster() { - header "Stoppe Cluster" - docker-compose -f docker-compose.test.yml down - success "Cluster gestoppt" -} - -# Clean etcd data -clean_etcd() { - header "Cleanup etcd Daten" - if check_etcdctl; then - log "Lösche alle Keys unter /csf/volume-manager/..." - etcdctl --endpoints=localhost:2379 del /csf/volume-manager/ --prefix 2>/dev/null || true - success "etcd Daten gelöscht" - - warning "Bitte starte die Volume Manager Container neu:" - echo " docker-compose -f docker-compose.test.yml restart" - else - error "etcdctl nicht verfügbar" - fi -} - -# Node auswählen -select_node() { - echo "" - echo "Verfügbare Nodes:" - echo " 1) volume-manager-1" - echo " 2) volume-manager-2" - echo " 3) volume-manager-3" - echo -n "Wähle Node: " - read NODE_NUM - case $NODE_NUM in - 1) echo "volume-manager-1" ;; - 2) echo "volume-manager-2" ;; - 3) echo "volume-manager-3" ;; - *) echo "" ;; - esac -} - -# Hauptprogramm -main() { - if [ "$1" == "monitor" ]; then - monitor - exit 0 - fi - - if [ "$1" == "test" ]; then - test_failover - exit 0 - fi - - while true; do - show_menu - read OPTION - - case $OPTION in - 1) start_cluster ;; - 2) stop_cluster ;; - 3) show_container_status ;; - 4) show_etcd_status ;; - 5) show_nodes ;; - 6) show_leader ;; - 7) show_volumes ;; - 8) - NODE=$(select_node) - if [ -n "$NODE" ]; then - show_logs "$NODE" - fi - ;; - 9) - NODE=$(select_node) - if [ -n "$NODE" ]; then - stop_node "$NODE" - fi - ;; - 10) - NODE=$(select_node) - if [ -n "$NODE" ]; then - start_node "$NODE" - fi - ;; - 13) clean_etcd ;; - 14) - header "Restart Container" - docker-compose -f docker-compose.test.yml restart - success "Container neu gestartet" - info "Warte 10 Sekunden..." - sleep 10 - ;; - 11) test_failover ;; - 12) monitor ;; - 0) - log "Auf Wiedersehen!" - exit 0 - ;; - *) - error "Ungültige Option" - ;; - esac - - echo "" - echo -n "Drücke Enter um fortzufahren..." 
- read - done -} - -# Starte -main "$@" diff --git a/control-plane/volume-manager/test-hybrid-system.sh b/control-plane/volume-manager/test-hybrid-system.sh new file mode 100755 index 0000000..2a9e5a6 --- /dev/null +++ b/control-plane/volume-manager/test-hybrid-system.sh @@ -0,0 +1,696 @@ +#!/bin/bash + +# 🧪 Umfassendes Test-Suite für das Hybridsystem +# Testet: etcd + Ceph + PostgreSQL/Patroni + Volume Manager + +set -e + +# Export etcd API version +export ETCDCTL_API=3 + +# 🎨 Colors +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +RED='\033[0;31m' +BLUE='\033[0;34m' +CYAN='\033[0;36m' +MAGENTA='\033[0;35m' +NC='\033[0m' # No Color + +# 📝 Logging functions +log_header() { + echo -e "\n${BLUE}═══════════════════════════════════════════════${NC}" + echo -e "${BLUE} $1${NC}" + echo -e "${BLUE}═══════════════════════════════════════════════${NC}\n" +} + +log_info() { + echo -e "${CYAN}ℹ️ $1${NC}" +} + +log_success() { + echo -e "${GREEN}✅ $1${NC}" +} + +log_warn() { + echo -e "${YELLOW}⚠️ $1${NC}" +} + +log_error() { + echo -e "${RED}❌ $1${NC}" +} + +log_step() { + echo -e "${MAGENTA}▶ $1${NC}" +} + +# 🔍 Check prerequisites +check_prerequisites() { + log_header "Checking Prerequisites" + + local all_ok=true + + # Check etcdctl + if command -v etcdctl &> /dev/null; then + log_success "etcdctl installed" + else + log_error "etcdctl not found - install with: brew install etcd" + all_ok=false + fi + + # Check docker + if command -v docker &> /dev/null; then + log_success "docker installed" + else + log_error "docker not found" + all_ok=false + fi + + # Check docker-compose + if docker compose version &> /dev/null; then + log_success "docker compose available" + else + log_error "docker compose not found" + all_ok=false + fi + + # Check curl + if command -v curl &> /dev/null; then + log_success "curl installed" + else + log_error "curl not found" + all_ok=false + fi + + # Check jq + if command -v jq &> /dev/null; then + log_success "jq installed" + else + log_warn "jq not found (optional) - install with: brew install jq" + fi + + if [ "$all_ok" = false ]; then + log_error "Please install missing prerequisites" + exit 1 + fi + + echo "" +} + +# 🏥 Component health checks +check_etcd_health() { + log_step "Checking etcd cluster..." + + if etcdctl --endpoints=localhost:2379 endpoint health &>/dev/null; then + local member_count=$(etcdctl --endpoints=localhost:2379 member list 2>/dev/null | wc -l) + log_success "etcd cluster healthy ($member_count members)" + return 0 + else + log_error "etcd cluster unhealthy" + return 1 + fi +} + +check_ceph_health() { + log_step "Checking Ceph cluster..." + + if docker exec ceph-mon1 ceph health 2>/dev/null | grep -q "HEALTH_OK\|HEALTH_WARN"; then + local health=$(docker exec ceph-mon1 ceph health 2>/dev/null | awk '{print $1}') + if [ "$health" == "HEALTH_OK" ]; then + log_success "Ceph cluster: $health" + else + log_warn "Ceph cluster: $health (may be degraded)" + fi + return 0 + else + log_error "Ceph cluster unhealthy or not accessible" + return 1 + fi +} + +check_patroni_health() { + log_step "Checking Patroni cluster..." 
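+ # The Patroni /health endpoint of each node replies with JSON roughly of the form
+ #   {"state": "running", "role": "master", "server_version": ..., "timeline": 1}
+ # (the exact field set and role naming vary by Patroni version; newer releases report
+ # "primary"), which is why both "master" and "primary" are accepted below.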
+ + local primary_found=false + local replica_count=0 + + for port in 8008 8009 8010; do + if role=$(curl -s http://localhost:$port/health 2>/dev/null | jq -r '.role' 2>/dev/null); then + if [ "$role" == "master" ] || [ "$role" == "primary" ]; then + primary_found=true + log_success "Patroni primary found on port $port" + elif [ "$role" == "replica" ]; then + ((replica_count++)) + fi + fi + done + + if [ "$primary_found" = true ]; then + log_success "Patroni cluster: 1 primary + $replica_count replicas" + return 0 + else + log_error "No Patroni primary found" + return 1 + fi +} + +check_volume_manager_health() { + log_step "Checking Volume Manager..." + + local leader=$(etcdctl --endpoints=localhost:2379 get /csf/volume-manager/election/leader --print-value-only 2>/dev/null) + + if [ -n "$leader" ]; then + log_success "Volume Manager leader: $leader" + + # Count nodes + local node_count=$(etcdctl --endpoints=localhost:2379 get /csf/volume-manager/nodes/ --prefix --keys-only 2>/dev/null | grep -c "/csf/volume-manager/nodes/" || echo "0") + log_info "Registered nodes: $node_count" + return 0 + else + log_error "No Volume Manager leader elected" + return 1 + fi +} + +# 🧪 Test 1: Complete System Status +test_system_status() { + log_header "Test 1: Complete System Status" + + echo -e "${YELLOW}🗄️ Database Layer:${NC}" + check_patroni_health + echo "" + + echo -e "${YELLOW}💾 Storage Layer:${NC}" + check_ceph_health + echo "" + + echo -e "${YELLOW}🔑 Coordination Layer:${NC}" + check_etcd_health + echo "" + + echo -e "${YELLOW}🎛️ Control Plane:${NC}" + check_volume_manager_health + echo "" + + echo -e "${YELLOW}🐳 Docker Services:${NC}" + docker-compose -f docker-compose.patroni.yml ps + echo "" +} + +# 🧪 Test 2: Data Replication Test +test_data_replication() { + log_header "Test 2: PostgreSQL Data Replication" + + local test_data="hybrid_test_$(date +%s)" + + log_step "Creating test table..." + docker exec patroni1 psql -U csf -d csf_core -c \ + "CREATE TABLE IF NOT EXISTS hybrid_test (id SERIAL PRIMARY KEY, data TEXT, created_at TIMESTAMP DEFAULT NOW());" &>/dev/null + + log_step "Writing test data to primary..." + docker exec patroni1 psql -U csf -d csf_core -c \ + "INSERT INTO hybrid_test (data) VALUES ('$test_data');" &>/dev/null + + # Wait for replication + sleep 2 + + log_step "Verifying data on replica..." + local result=$(docker exec patroni2 psql -U csf -d csf_core -t -c \ + "SELECT data FROM hybrid_test WHERE data='$test_data';" 2>/dev/null | xargs) + + if [ "$result" == "$test_data" ]; then + log_success "Data successfully replicated to all nodes!" + + # Verify via HAProxy + log_step "Verifying access via HAProxy..." 
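+ # Assumption: port 5000 is the HAProxy frontend routing to the current primary
+ # (the stock Patroni/HAProxy template exposes 5000 for read-write and 5001 for replicas);
+ # the authoritative port mapping lives in docker-compose.patroni.yml.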
+ if docker exec postgres-haproxy nc -zv localhost 5000 &>/dev/null; then + log_success "HAProxy routing working" + else + log_warn "HAProxy connectivity issue" + fi + else + log_error "Data replication failed" + return 1 + fi + + echo "" +} + +# 🧪 Test 3: PostgreSQL Failover +test_postgres_failover() { + log_header "Test 3: PostgreSQL Primary Failover" + + # Find current primary + local primary="" + for port in 8008 8009 8010; do + role=$(curl -s http://localhost:$port/health 2>/dev/null | jq -r '.role' 2>/dev/null) + if [ "$role" == "master" ] || [ "$role" == "primary" ]; then + case $port in + 8008) primary="patroni1" ;; + 8009) primary="patroni2" ;; + 8010) primary="patroni3" ;; + esac + break + fi + done + + if [ -z "$primary" ]; then + log_error "No primary found" + return 1 + fi + + log_info "Current primary: $primary" + echo "" + + read -p "$(echo -e ${YELLOW}Stop $primary to trigger failover? [y/N]: ${NC})" -n 1 -r + echo + if [[ ! $REPLY =~ ^[Yy]$ ]]; then + log_info "Skipped" + return 0 + fi + + log_step "Stopping $primary..." + docker-compose -f docker-compose.patroni.yml stop $primary &>/dev/null + + log_step "Waiting for automatic failover..." + local failover_start=$(date +%s) + + for i in {1..30}; do + sleep 1 + + # Check for new primary + for port in 8008 8009 8010; do + role=$(curl -s http://localhost:$port/health 2>/dev/null | jq -r '.role' 2>/dev/null) + if [ "$role" == "master" ] || [ "$role" == "primary" ]; then + case $port in + 8008) new_primary="patroni1" ;; + 8009) new_primary="patroni2" ;; + 8010) new_primary="patroni3" ;; + esac + + if [ "$new_primary" != "$primary" ]; then + local failover_time=$(($(date +%s) - failover_start)) + echo "" + log_success "Failover completed in ${failover_time}s!" + log_info "New primary: $new_primary" + + # Test connectivity + sleep 2 + if docker exec $new_primary psql -U csf -d csf_core -c "SELECT 1;" &>/dev/null; then + log_success "New primary accepting connections" + fi + + echo "" + read -p "$(echo -e ${YELLOW}Restart $primary? [y/N]: ${NC})" -n 1 -r + echo + if [[ $REPLY =~ ^[Yy]$ ]]; then + log_step "Restarting $primary..." + docker-compose -f docker-compose.patroni.yml start $primary &>/dev/null + log_success "$primary will rejoin as replica" + fi + + return 0 + fi + fi + done + + echo -n "." + done + + echo "" + log_error "Failover timeout (30s exceeded)" + return 1 +} + +# 🧪 Test 4: Ceph OSD Failure +test_ceph_failover() { + log_header "Test 4: Ceph OSD Failure" + + log_info "Current Ceph status:" + docker exec ceph-mon1 ceph -s 2>/dev/null | head -15 + echo "" + + read -p "$(echo -e ${YELLOW}Stop ceph-osd1 to simulate failure? [y/N]: ${NC})" -n 1 -r + echo + if [[ ! $REPLY =~ ^[Yy]$ ]]; then + log_info "Skipped" + return 0 + fi + + log_step "Stopping ceph-osd1..." + docker-compose -f docker-compose.patroni.yml stop ceph-osd1 &>/dev/null + + log_step "Waiting for Ceph to detect failure (10s)..." + sleep 10 + + log_info "Ceph status after OSD failure:" + docker exec ceph-mon1 ceph -s 2>/dev/null | head -15 + echo "" + + log_step "Testing PostgreSQL availability..." + if docker exec patroni1 psql -U csf -d csf_core -c "SELECT version();" &>/dev/null; then + log_success "PostgreSQL still fully operational (Ceph has 2 remaining replicas)" + else + log_error "PostgreSQL affected by OSD failure" + fi + + echo "" + read -p "$(echo -e ${YELLOW}Restart ceph-osd1? [y/N]: ${NC})" -n 1 -r + echo + if [[ $REPLY =~ ^[Yy]$ ]]; then + log_step "Restarting ceph-osd1..." 
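+ # Once the OSD process is back, Ceph re-peers and backfills the degraded placement
+ # groups on its own; the wait below is only a rough grace period before re-checking 'ceph -s'.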
+ docker-compose -f docker-compose.patroni.yml start ceph-osd1 &>/dev/null + + log_step "Waiting for OSD recovery (15s)..." + sleep 15 + + log_info "Ceph status after recovery:" + docker exec ceph-mon1 ceph -s 2>/dev/null | head -15 + fi + + echo "" +} + +# 🧪 Test 5: etcd & Volume Manager Failover +test_volume_manager_failover() { + log_header "Test 5: Volume Manager Failover" + + local current_leader=$(etcdctl --endpoints=localhost:2379 get /csf/volume-manager/election/leader --print-value-only 2>/dev/null) + + if [ -z "$current_leader" ]; then + log_error "No leader found" + return 1 + fi + + log_info "Current leader: $current_leader" + echo "" + + read -p "$(echo -e ${YELLOW}Stop $current_leader to trigger re-election? [y/N]: ${NC})" -n 1 -r + echo + if [[ ! $REPLY =~ ^[Yy]$ ]]; then + log_info "Skipped" + return 0 + fi + + log_step "Stopping $current_leader..." + docker-compose -f docker-compose.patroni.yml stop $current_leader &>/dev/null + + log_step "Waiting for leader re-election (10s)..." + sleep 10 + + local new_leader=$(etcdctl --endpoints=localhost:2379 get /csf/volume-manager/election/leader --print-value-only 2>/dev/null) + + if [ -n "$new_leader" ] && [ "$new_leader" != "$current_leader" ]; then + log_success "New leader elected: $new_leader" + else + log_error "Leader election failed" + return 1 + fi + + echo "" + read -p "$(echo -e ${YELLOW}Restart $current_leader? [y/N]: ${NC})" -n 1 -r + echo + if [[ $REPLY =~ ^[Yy]$ ]]; then + log_step "Restarting $current_leader..." + docker-compose -f docker-compose.patroni.yml start $current_leader &>/dev/null + log_success "$current_leader restarted (will run as standby)" + fi + + echo "" +} + +# 🧪 Test 6: End-to-End Integration Test +test_e2e_integration() { + log_header "Test 6: End-to-End Integration" + + log_step "Verifying all layers are working together..." + + # 1. Check etcd + if ! check_etcd_health &>/dev/null; then + log_error "etcd not healthy" + return 1 + fi + log_success "✓ etcd coordination working" + + # 2. Check Ceph + if ! check_ceph_health &>/dev/null; then + log_error "Ceph not healthy" + return 1 + fi + log_success "✓ Ceph storage available" + + # 3. Check Patroni + if ! check_patroni_health &>/dev/null; then + log_error "Patroni not healthy" + return 1 + fi + log_success "✓ Patroni database cluster ready" + + # 4. Check Volume Manager + if ! check_volume_manager_health &>/dev/null; then + log_error "Volume Manager not healthy" + return 1 + fi + log_success "✓ Volume Manager orchestration active" + + # 5. Test data write & read + log_step "Testing complete data flow..." + local test_val="e2e_test_$(date +%s)" + + if docker exec patroni1 psql -U csf -d csf_core -c \ + "CREATE TABLE IF NOT EXISTS e2e_test (val TEXT); INSERT INTO e2e_test VALUES ('$test_val');" &>/dev/null; then + + sleep 2 + local result=$(docker exec patroni2 psql -U csf -d csf_core -t -c \ + "SELECT val FROM e2e_test WHERE val='$test_val';" 2>/dev/null | xargs) + + if [ "$result" == "$test_val" ]; then + log_success "✓ Complete data path verified (Primary → Ceph → Replica)" + else + log_error "Data replication failed" + return 1 + fi + else + log_error "Database write failed" + return 1 + fi + + echo "" + log_success "🎉 All integration tests passed!" + echo "" +} + +# 🧪 Test 7: Performance Metrics +test_performance_metrics() { + log_header "Test 7: Performance Metrics" + + log_step "Measuring system metrics..." 
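+ # Gathers a few quick indicators rather than a real benchmark: active PostgreSQL
+ # sessions from pg_stat_activity on patroni1, pool usage via 'ceph df', and the raw
+ # number of etcd keys as a rough proxy for coordination-state size.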
+ echo "" + + # PostgreSQL connections + local pg_connections=$(docker exec patroni1 psql -U csf -d csf_core -t -c \ + "SELECT count(*) FROM pg_stat_activity;" 2>/dev/null | xargs) + echo -e "${CYAN}PostgreSQL Connections:${NC} $pg_connections" + + # Ceph metrics + log_info "Ceph Cluster Metrics:" + docker exec ceph-mon1 ceph df 2>/dev/null || log_warn "Could not get Ceph metrics" + echo "" + + # etcd metrics + local etcd_keys=$(etcdctl --endpoints=localhost:2379 get "" --prefix --keys-only 2>/dev/null | wc -l) + echo -e "${CYAN}etcd Keys:${NC} $etcd_keys" + echo "" +} + +# 🧪 Test 8: Live Monitoring +test_live_monitoring() { + log_header "Test 8: Live Monitoring" + + echo -e "${YELLOW}Starting live monitoring... (Press Ctrl+C to stop)${NC}" + echo "" + + while true; do + clear + log_header "Hybrid System Live Status - $(date '+%H:%M:%S')" + + # etcd + echo -e "${CYAN}🔑 etcd Leader:${NC}" + etcdctl --endpoints=localhost:2379 get /csf/volume-manager/election/leader --print-value-only 2>/dev/null || echo "none" + echo "" + + # Ceph + echo -e "${CYAN}💾 Ceph Health:${NC}" + docker exec ceph-mon1 ceph health 2>/dev/null | head -1 + echo "" + + # Patroni + echo -e "${CYAN}🗄️ Patroni Cluster:${NC}" + for port in 8008 8009 8010; do + role=$(curl -s http://localhost:$port/health 2>/dev/null | jq -r '.role' 2>/dev/null) + state=$(curl -s http://localhost:$port/health 2>/dev/null | jq -r '.state' 2>/dev/null) + case $port in + 8008) node="patroni1" ;; + 8009) node="patroni2" ;; + 8010) node="patroni3" ;; + esac + + if [ "$role" == "master" ] || [ "$role" == "primary" ]; then + echo -e " ${GREEN}👑 $node: $role ($state)${NC}" + elif [ "$role" == "replica" ]; then + echo -e " ${BLUE}🔄 $node: $role ($state)${NC}" + else + echo -e " ${RED}❌ $node: offline${NC}" + fi + done + echo "" + + # Docker services + echo -e "${CYAN}🐳 Container Status:${NC}" + docker-compose -f docker-compose.patroni.yml ps --format "table {{.Name}}\t{{.Status}}" | head -10 + + sleep 3 + done +} + +# 🔥 Test 9: Full Chaos Test +test_chaos() { + log_header "Test 9: Full Chaos Engineering Test" + + log_warn "⚠️ This will simulate multiple failure scenarios!" + echo "" + read -p "$(echo -e ${RED}Are you SURE you want to continue? [y/N]: ${NC})" -n 1 -r + echo + if [[ ! $REPLY =~ ^[Yy]$ ]]; then + log_info "Cancelled" + return 0 + fi + + echo "" + log_step "Starting chaos test sequence..." + sleep 2 + + # Scenario 1: Kill PostgreSQL primary + log_info "🔥 Scenario 1: Killing PostgreSQL primary..." + local primary=$(curl -s http://localhost:8008/health 2>/dev/null | jq -r '.role' 2>/dev/null) + if [ "$primary" == "master" ] || [ "$primary" == "primary" ]; then + docker-compose -f docker-compose.patroni.yml stop patroni1 &>/dev/null + log_warn "patroni1 stopped" + fi + + sleep 15 + log_step "Checking if system recovered..." + check_patroni_health + echo "" + + # Scenario 2: Kill Ceph OSD + log_info "🔥 Scenario 2: Killing Ceph OSD..." + docker-compose -f docker-compose.patroni.yml stop ceph-osd2 &>/dev/null + log_warn "ceph-osd2 stopped" + + sleep 15 + log_step "Checking Ceph status..." + check_ceph_health + echo "" + + # Scenario 3: Kill Volume Manager leader + log_info "🔥 Scenario 3: Killing Volume Manager leader..." + local leader=$(etcdctl --endpoints=localhost:2379 get /csf/volume-manager/election/leader --print-value-only 2>/dev/null) + if [ -n "$leader" ]; then + docker-compose -f docker-compose.patroni.yml stop $leader &>/dev/null + log_warn "$leader stopped" + fi + + sleep 10 + log_step "Checking leader re-election..." 
+ check_volume_manager_health + echo "" + + # Check if system is still functional + log_step "Testing system functionality under stress..." + + if docker exec patroni2 psql -U csf -d csf_core -c "SELECT 1;" &>/dev/null; then + log_success "✅ Database still accessible!" + else + log_error "Database not accessible" + fi + + echo "" + log_info "🔄 Recovering all services..." + docker-compose -f docker-compose.patroni.yml up -d &>/dev/null + + log_step "Waiting for recovery (30s)..." + sleep 30 + + log_info "Final system status:" + check_etcd_health + check_ceph_health + check_patroni_health + check_volume_manager_health + + echo "" + log_success "🎉 Chaos test completed!" +} + +# 📋 Main Menu +show_menu() { + echo "" + echo -e "${GREEN}╔════════════════════════════════════════════╗${NC}" + echo -e "${GREEN}║ 🧪 Hybrid System Test Suite ║${NC}" + echo -e "${GREEN}║ etcd + Ceph + PostgreSQL/Patroni + VM ║${NC}" + echo -e "${GREEN}╚════════════════════════════════════════════╝${NC}" + echo "" + echo " 1) 📊 Complete System Status" + echo " 2) 🔄 Test Data Replication" + echo " 3) 🗄️ Test PostgreSQL Failover" + echo " 4) 💾 Test Ceph OSD Failure" + echo " 5) 🎛️ Test Volume Manager Failover" + echo " 6) 🔗 End-to-End Integration Test" + echo " 7) 📈 Performance Metrics" + echo " 8) 👀 Monitor Cluster (Live)" + echo " 9) 🔥 Full Chaos Test (Advanced)" + echo " 0) 🚪 Exit" + echo "" +} + +# 🚀 Main +main() { + clear + log_header "Hybrid System Test Suite" + + # Check prerequisites + check_prerequisites + + # Main loop + while true; do + show_menu + read -p "$(echo -e ${CYAN}Select option: ${NC})" choice + + case $choice in + 1) test_system_status ;; + 2) test_data_replication ;; + 3) test_postgres_failover ;; + 4) test_ceph_failover ;; + 5) test_volume_manager_failover ;; + 6) test_e2e_integration ;; + 7) test_performance_metrics ;; + 8) test_live_monitoring ;; + 9) test_chaos ;; + 0) + echo "" + log_info "Exiting... Goodbye! 
👋" + echo "" + exit 0 + ;; + *) + log_error "Invalid option" + ;; + esac + + echo "" + read -p "$(echo -e ${CYAN}Press Enter to continue...${NC})" + done +} + +# Run main +main diff --git a/control-plane/volume-manager/test-patroni-ha.sh b/control-plane/volume-manager/test-patroni-ha.sh new file mode 100755 index 0000000..55931d2 --- /dev/null +++ b/control-plane/volume-manager/test-patroni-ha.sh @@ -0,0 +1,396 @@ +#!/bin/bash + +# Test-Suite für PostgreSQL HA mit Patroni + Ceph +# Testet verschiedene Failover-Szenarien + +set -e + +# Colors +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +RED='\033[0;31m' +BLUE='\033[0;34m' +NC='\033[0m' + +echo -e "${BLUE}═══════════════════════════════════════════════${NC}" +echo -e "${BLUE} PostgreSQL HA Failover Tests (Patroni)${NC}" +echo -e "${BLUE}═══════════════════════════════════════════════${NC}" +echo "" + +# Function: Check if service is running +check_service() { + local service=$1 + if docker ps | grep -q $service; then + echo -e "${GREEN}✅${NC} $service" + return 0 + else + echo -e "${RED}❌${NC} $service" + return 1 + fi +} + +# Function: Get Patroni Primary +get_primary() { + for port in 8008 8009 8010; do + role=$(curl -s http://localhost:$port/health 2>/dev/null | jq -r '.role' 2>/dev/null) + if [ "$role" == "master" ] || [ "$role" == "primary" ]; then + case $port in + 8008) echo "patroni1" ;; + 8009) echo "patroni2" ;; + 8010) echo "patroni3" ;; + esac + return 0 + fi + done + echo "none" +} + +# Function: Count healthy replicas +count_replicas() { + local count=0 + for port in 8008 8009 8010; do + role=$(curl -s http://localhost:$port/health 2>/dev/null | jq -r '.role' 2>/dev/null) + if [ "$role" == "replica" ]; then + ((count++)) + fi + done + echo $count +} + +# Function: Test database write +test_write() { + local port=$1 + local test_data="failover_test_$(date +%s)" + + # Ermittle aktuellen Primary + local primary=$(get_primary) + echo "Writing to primary: $primary" + + if [ "$primary" == "none" ]; then + echo -e "${RED}❌ No primary found!${NC}" + return 1 + fi + + # Bestimme Replica + local replica="" + if [ "$primary" == "patroni1" ]; then replica="patroni2"; + else replica="patroni1"; fi + + docker exec $primary psql -U csf -d csf_core -c \ + "CREATE TABLE IF NOT EXISTS failover_test (id SERIAL PRIMARY KEY, data TEXT, created_at TIMESTAMP DEFAULT NOW());" &>/dev/null + + docker exec $primary psql -U csf -d csf_core -c \ + "INSERT INTO failover_test (data) VALUES ('$test_data');" &>/dev/null + + # Verify on replica + sleep 2 + local result=$(docker exec $replica psql -U csf -d csf_core -t -c \ + "SELECT data FROM failover_test WHERE data='$test_data';" 2>/dev/null | xargs) + + if [ "$result" == "$test_data" ]; then + echo -e "${GREEN}✅ Data replicated successfully${NC}" + return 0 + else + echo -e "${RED}❌ Replication failed${NC}" + return 1 + fi +} + +# Menu +show_menu() { + echo "" + echo "Choose a test:" + echo " 1) Check Cluster Status" + echo " 2) Test Database Replication" + echo " 3) Test PostgreSQL Primary Failover" + echo " 4) Test Ceph OSD Failure" + echo " 5) Test Volume Manager Failover" + echo " 6) Full HA Test (All scenarios)" + echo " 7) Monitor Cluster (Live)" + echo " 0) Exit" + echo "" + read -p "Select option: " choice +} + +# Test 1: Cluster Status +test_cluster_status() { + echo "" + echo -e "${YELLOW}📊 Checking Cluster Status...${NC}" + echo "" + + echo "🗄️ PostgreSQL Nodes:" + check_service "patroni1" + check_service "patroni2" + check_service "patroni3" + echo "" + + primary=$(get_primary) + 
replicas=$(count_replicas) + + echo -e "👑 Primary: ${GREEN}$primary${NC}" + echo -e "🔄 Replicas: ${GREEN}$replicas${NC}" + echo "" + + echo "💾 Ceph Storage:" + check_service "ceph-mon1" + check_service "ceph-osd1" + check_service "ceph-osd2" + echo "" + + echo "🎛️ Control Plane:" + check_service "etcd1" + check_service "volume-manager-1" + check_service "postgres-haproxy" + echo "" + + echo "Ceph Health:" + docker exec ceph-mon1 ceph health 2>/dev/null || echo "Ceph not ready" + echo "" +} + +# Test 2: Database Replication +test_replication() { + echo "" + echo -e "${YELLOW}🧪 Testing Database Replication...${NC}" + echo "" + + primary=$(get_primary) + if [ "$primary" == "none" ]; then + echo -e "${RED}❌ No primary found!${NC}" + return 1 + fi + + echo "Primary is: $primary" + echo "Writing test data..." + + if test_write; then + echo -e "${GREEN}✅ Replication test passed${NC}" + else + echo -e "${RED}❌ Replication test failed${NC}" + fi + echo "" +} + +# Test 3: PostgreSQL Failover +test_postgres_failover() { + echo "" + echo -e "${YELLOW}🧪 Testing PostgreSQL Primary Failover...${NC}" + echo "" + + primary=$(get_primary) + if [ "$primary" == "none" ]; then + echo -e "${RED}❌ No primary found!${NC}" + return 1 + fi + + echo "Current Primary: $primary" + echo "" + + read -p "Stop $primary to trigger failover? (y/N) " -n 1 -r + echo + if [[ ! $REPLY =~ ^[Yy]$ ]]; then + return 0 + fi + + echo "Stopping $primary..." + docker-compose -f docker-compose.patroni.yml stop $primary + + echo "Waiting for failover (max 30 seconds)..." + for i in {1..30}; do + sleep 1 + new_primary=$(get_primary) + if [ "$new_primary" != "none" ] && [ "$new_primary" != "$primary" ]; then + echo "" + echo -e "${GREEN}✅ Failover successful!${NC}" + echo "New Primary: $new_primary (took ${i}s)" + break + fi + echo -n "." + done + echo "" + + # Test connection to new primary + echo "Testing connection to new primary..." + sleep 3 + if docker exec $new_primary psql -U csf -d csf_core -c "SELECT 1;" &>/dev/null; then + echo -e "${GREEN}✅ New primary is accepting connections${NC}" + else + echo -e "${RED}❌ New primary not ready${NC}" + fi + echo "" + + read -p "Restart $primary? (y/N) " -n 1 -r + echo + if [[ $REPLY =~ ^[Yy]$ ]]; then + echo "Restarting $primary..." + docker-compose -f docker-compose.patroni.yml start $primary + echo -e "${GREEN}✅ $primary restarted (will join as replica)${NC}" + fi + echo "" +} + +# Test 4: Ceph OSD Failure +test_ceph_failure() { + echo "" + echo -e "${YELLOW}🧪 Testing Ceph OSD Failure...${NC}" + echo "" + + echo "Current Ceph Status:" + docker exec ceph-mon1 ceph -s + echo "" + + read -p "Stop ceph-osd1? (y/N) " -n 1 -r + echo + if [[ ! $REPLY =~ ^[Yy]$ ]]; then + return 0 + fi + + echo "Stopping ceph-osd1..." + docker-compose -f docker-compose.patroni.yml stop ceph-osd1 + + echo "Waiting for Ceph to detect failure..." + sleep 10 + + echo "" + echo "Ceph Status (should be HEALTH_WARN with degraded PGs):" + docker exec ceph-mon1 ceph -s + echo "" + + echo -e "${YELLOW}Testing if PostgreSQL still works...${NC}" + if docker exec patroni1 psql -U csf -d csf_core -c "SELECT version();" &>/dev/null; then + echo -e "${GREEN}✅ PostgreSQL still working (Ceph has 2 replicas)${NC}" + else + echo -e "${RED}❌ PostgreSQL affected${NC}" + fi + echo "" + + read -p "Restart ceph-osd1? (y/N) " -n 1 -r + echo + if [[ $REPLY =~ ^[Yy]$ ]]; then + echo "Restarting ceph-osd1..." + docker-compose -f docker-compose.patroni.yml start ceph-osd1 + echo "Waiting for recovery..." 
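+ # A short fixed wait is usually not enough for a full backfill, so 'ceph -s' may still
+ # show HEALTH_WARN here; that is expected and clears once recovery finishes.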
+ sleep 10 + echo "" + docker exec ceph-mon1 ceph -s + fi + echo "" +} + +# Test 5: Volume Manager Failover +test_volume_manager_failover() { + echo "" + echo -e "${YELLOW}🧪 Testing Volume Manager Leader Election...${NC}" + echo "" + + echo "Current Volume Manager nodes:" + check_service "volume-manager-1" + check_service "volume-manager-2" + check_service "volume-manager-3" + echo "" + + echo "Checking logs for current leader..." + for i in {1..3}; do + if docker logs volume-manager-$i 2>&1 | tail -20 | grep -q "LEADER"; then + echo -e "volume-manager-$i: ${GREEN}LEADER${NC}" + else + echo -e "volume-manager-$i: FOLLOWER" + fi + done + echo "" + + read -p "Stop volume-manager-1? (y/N) " -n 1 -r + echo + if [[ ! $REPLY =~ ^[Yy]$ ]]; then + return 0 + fi + + docker-compose -f docker-compose.patroni.yml stop volume-manager-1 + echo "Waiting for new leader election..." + sleep 10 + + echo "New leader should be elected:" + for i in {2..3}; do + if docker logs volume-manager-$i 2>&1 | tail -20 | grep -q "LEADER"; then + echo -e "volume-manager-$i: ${GREEN}NEW LEADER${NC}" + else + echo -e "volume-manager-$i: FOLLOWER" + fi + done + echo "" +} + +# Test 6: Full HA Test +test_full_ha() { + echo "" + echo -e "${YELLOW}🧪 Running Full HA Test Suite...${NC}" + echo "" + + test_cluster_status + read -p "Press Enter to continue..." dummy + + test_replication + read -p "Press Enter to continue..." dummy + + test_postgres_failover + read -p "Press Enter to continue..." dummy + + test_ceph_failure + + echo "" + echo -e "${GREEN}✅ Full HA test completed${NC}" + echo "" +} + +# Test 7: Monitor +monitor_cluster() { + echo "" + echo -e "${YELLOW}📊 Monitoring Cluster (Ctrl+C to stop)...${NC}" + echo "" + + while true; do + clear + echo "=== PostgreSQL HA Cluster Monitor ===" + echo "" + echo "Time: $(date)" + echo "" + + primary=$(get_primary) + replicas=$(count_replicas) + + echo -e "Primary: ${GREEN}$primary${NC}" + echo -e "Replicas: ${GREEN}$replicas${NC}" + echo "" + + echo "PostgreSQL Nodes:" + for port in 8008 8009 8010; do + node="patroni$((port-8007))" + health=$(curl -s http://localhost:$port/health 2>/dev/null | jq -r '"\(.role) - \(.state)"' 2>/dev/null || echo "offline") + echo " $node: $health" + done + echo "" + + echo "Ceph Health:" + docker exec ceph-mon1 ceph health 2>/dev/null || echo " offline" + echo "" + + sleep 5 + done +} + +# Main loop +while true; do + show_menu + + case $choice in + 1) test_cluster_status ;; + 2) test_replication ;; + 3) test_postgres_failover ;; + 4) test_ceph_failure ;; + 5) test_volume_manager_failover ;; + 6) test_full_ha ;; + 7) monitor_cluster ;; + 0) echo "Goodbye!"; exit 0 ;; + *) echo "Invalid option" ;; + esac +done