From ab0798481265b45c6027ca7aef3136bd9f2b403e Mon Sep 17 00:00:00 2001 From: GatewayJ <835269233@qq.com> Date: Mon, 2 Feb 2026 20:04:23 +0800 Subject: [PATCH 1/2] console --- Cargo.lock | 2 + Cargo.toml | 1 + src/console/handlers/mod.rs | 2 + src/console/handlers/pods.rs | 415 ++++++++++++++++++++++++++++++++ src/console/handlers/pools.rs | 350 +++++++++++++++++++++++++++ src/console/handlers/tenants.rs | 151 ++++++++++++ src/console/models/mod.rs | 2 + src/console/models/pod.rs | 144 +++++++++++ src/console/models/pool.rs | 84 +++++++ src/console/models/tenant.rs | 50 ++++ src/console/routes/mod.rs | 48 +++- src/console/server.rs | 2 + 12 files changed, 1250 insertions(+), 1 deletion(-) create mode 100644 src/console/handlers/pods.rs create mode 100644 src/console/handlers/pools.rs create mode 100644 src/console/models/pod.rs create mode 100644 src/console/models/pool.rs diff --git a/Cargo.lock b/Cargo.lock index 15cd884..c7430a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1516,6 +1516,7 @@ dependencies = [ "snafu", "strum", "tokio", + "tokio-util", "tower", "tower-http", "tracing", @@ -2336,6 +2337,7 @@ checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "slab", diff --git a/Cargo.toml b/Cargo.toml index 8a9838f..fd571c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ chrono = { version = "0.4", features = ["serde"] } const-str = "1.0.0" serde = { version = "1.0.228", features = ["derive"] } tokio = { version = "1.49.0", features = ["rt", "rt-multi-thread", "macros", "fs", "io-std", "io-util"] } +tokio-util = { version = "0.7", features = ["io", "compat"] } futures = "0.3.31" tracing = "0.1.44" tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } diff --git a/src/console/handlers/mod.rs b/src/console/handlers/mod.rs index 09fa7e0..db4cc76 100644 --- a/src/console/handlers/mod.rs +++ b/src/console/handlers/mod.rs @@ -15,4 +15,6 @@ pub mod auth; pub mod cluster; pub mod events; +pub mod pods; +pub mod pools; pub mod tenants; diff --git a/src/console/handlers/pods.rs b/src/console/handlers/pods.rs new file mode 100644 index 0000000..a30609d --- /dev/null +++ b/src/console/handlers/pods.rs @@ -0,0 +1,415 @@ +// Copyright 2025 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
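+//! Handlers for Tenant Pod management: list Pods selected by the
+//! `rustfs.tenant` label, fetch Pod details, restart a Pod by deleting it
+//! (the owning StatefulSet recreates it), and stream container logs.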
+ +use axum::{ + body::Body, + extract::{Path, Query}, + response::{IntoResponse, Response}, + Extension, Json, +}; +use k8s_openapi::api::core::v1 as corev1; +use kube::{ + api::{DeleteParams, ListParams, LogParams}, + Api, Client, ResourceExt, +}; +use snafu::ResultExt; +use futures::TryStreamExt; + +use crate::console::{ + error::{self, Error, Result}, + models::pod::*, + state::Claims, +}; + +/// 列出 Tenant 的所有 Pods +pub async fn list_pods( + Path((namespace, tenant_name)): Path<(String, String)>, + Extension(claims): Extension, +) -> Result> { + let client = create_client(&claims).await?; + let api: Api = Api::namespaced(client, &namespace); + + // 查询带有 Tenant 标签的 Pods + let pods = api + .list( + &ListParams::default().labels(&format!("rustfs.tenant={}", tenant_name)), + ) + .await + .context(error::KubeApiSnafu)?; + + let mut pod_list = Vec::new(); + + for pod in pods.items { + let name = pod.name_any(); + let status = pod.status.as_ref(); + let spec = pod.spec.as_ref(); + + // 提取 Pool 名称(从 Pod 名称中解析) + let pool = pod + .metadata + .labels + .as_ref() + .and_then(|l| l.get("rustfs.pool")) + .cloned() + .unwrap_or_else(|| "unknown".to_string()); + + // Pod 阶段 + let phase = status + .and_then(|s| s.phase.clone()) + .unwrap_or_else(|| "Unknown".to_string()); + + // 整体状态 + let pod_status = if let Some(status) = status { + if let Some(conditions) = &status.conditions { + if conditions + .iter() + .any(|c| c.type_ == "Ready" && c.status == "True") + { + "Running" + } else { + "NotReady" + } + } else { + &phase + } + } else { + "Unknown" + }; + + // 节点名称 + let node = spec.and_then(|s| s.node_name.clone()); + + // 容器就绪状态 + let (ready_count, total_count) = if let Some(status) = status { + let total = status.container_statuses.as_ref().map(|c| c.len()).unwrap_or(0); + let ready = status + .container_statuses + .as_ref() + .map(|containers| containers.iter().filter(|c| c.ready).count()) + .unwrap_or(0); + (ready, total) + } else { + (0, 0) + }; + + // 重启次数 + let restarts = status + .and_then(|s| s.container_statuses.as_ref()) + .map(|containers| { + containers + .iter() + .map(|c| c.restart_count) + .sum::() + }) + .unwrap_or(0); + + // 创建时间和 Age + let created_at = pod + .metadata + .creation_timestamp + .as_ref() + .map(|ts| ts.0.to_rfc3339()); + + let age = pod + .metadata + .creation_timestamp + .as_ref() + .map(|ts| { + let duration = chrono::Utc::now().signed_duration_since(ts.0); + format_duration(duration) + }) + .unwrap_or_else(|| "Unknown".to_string()); + + pod_list.push(PodListItem { + name, + pool, + status: pod_status.to_string(), + phase, + node, + ready: format!("{}/{}", ready_count, total_count), + restarts, + age, + created_at, + }); + } + + Ok(Json(PodListResponse { pods: pod_list })) +} + +/// 删除 Pod +pub async fn delete_pod( + Path((namespace, _tenant_name, pod_name)): Path<(String, String, String)>, + Extension(claims): Extension, +) -> Result> { + let client = create_client(&claims).await?; + let api: Api = Api::namespaced(client, &namespace); + + api.delete(&pod_name, &DeleteParams::default()) + .await + .context(error::KubeApiSnafu)?; + + Ok(Json(DeletePodResponse { + success: true, + message: format!( + "Pod '{}' deletion initiated. 
StatefulSet will recreate it.", + pod_name + ), + })) +} + +/// 重启 Pod(通过删除实现) +pub async fn restart_pod( + Path((namespace, tenant_name, pod_name)): Path<(String, String, String)>, + Extension(claims): Extension, + Json(req): Json, +) -> Result> { + let client = create_client(&claims).await?; + let api: Api = Api::namespaced(client, &namespace); + + // 删除 Pod,StatefulSet 控制器会自动重建 + let delete_params = if req.force { + DeleteParams { + grace_period_seconds: Some(0), + ..Default::default() + } + } else { + DeleteParams::default() + }; + + api.delete(&pod_name, &delete_params) + .await + .context(error::KubeApiSnafu)?; + + Ok(Json(DeletePodResponse { + success: true, + message: format!( + "Pod '{}' restart initiated. StatefulSet will recreate it.", + pod_name + ), + })) +} + +/// 获取 Pod 详情 +pub async fn get_pod_details( + Path((namespace, _tenant_name, pod_name)): Path<(String, String, String)>, + Extension(claims): Extension, +) -> Result> { + let client = create_client(&claims).await?; + let api: Api = Api::namespaced(client, &namespace); + + let pod = api.get(&pod_name).await.context(error::KubeApiSnafu)?; + + // 提取详细信息 + let pool = pod + .metadata + .labels + .as_ref() + .and_then(|l| l.get("rustfs.pool")) + .cloned() + .unwrap_or_else(|| "unknown".to_string()); + + let status_info = pod.status.as_ref(); + let spec = pod.spec.as_ref(); + + // 构建状态 + let status = PodStatus { + phase: status_info + .and_then(|s| s.phase.clone()) + .unwrap_or_else(|| "Unknown".to_string()), + conditions: status_info + .and_then(|s| s.conditions.as_ref()) + .map(|conditions| { + conditions + .iter() + .map(|c| PodCondition { + type_: c.type_.clone(), + status: c.status.clone(), + reason: c.reason.clone(), + message: c.message.clone(), + last_transition_time: c.last_transition_time.as_ref().map(|t| t.0.to_rfc3339()), + }) + .collect() + }) + .unwrap_or_default(), + host_ip: status_info.and_then(|s| s.host_ip.clone()), + pod_ip: status_info.and_then(|s| s.pod_ip.clone()), + start_time: status_info + .and_then(|s| s.start_time.as_ref()) + .map(|t| t.0.to_rfc3339()), + }; + + // 容器信息 + let containers = if let Some(container_statuses) = status_info.and_then(|s| s.container_statuses.as_ref()) { + container_statuses + .iter() + .map(|cs| { + let state = if let Some(running) = &cs.state.as_ref().and_then(|s| s.running.as_ref()) { + ContainerState::Running { + started_at: running.started_at.as_ref().map(|t| t.0.to_rfc3339()), + } + } else if let Some(waiting) = &cs.state.as_ref().and_then(|s| s.waiting.as_ref()) { + ContainerState::Waiting { + reason: waiting.reason.clone(), + message: waiting.message.clone(), + } + } else if let Some(terminated) = &cs.state.as_ref().and_then(|s| s.terminated.as_ref()) { + ContainerState::Terminated { + reason: terminated.reason.clone(), + exit_code: terminated.exit_code, + finished_at: terminated.finished_at.as_ref().map(|t| t.0.to_rfc3339()), + } + } else { + ContainerState::Waiting { + reason: Some("Unknown".to_string()), + message: None, + } + }; + + ContainerInfo { + name: cs.name.clone(), + image: cs.image.clone(), + ready: cs.ready, + restart_count: cs.restart_count, + state, + } + }) + .collect() + } else { + Vec::new() + }; + + // Volume 信息 + let volumes = spec + .and_then(|s| s.volumes.as_ref()) + .map(|vols| { + vols.iter() + .map(|v| { + let volume_type = if v.persistent_volume_claim.is_some() { + "PersistentVolumeClaim" + } else if v.empty_dir.is_some() { + "EmptyDir" + } else if v.config_map.is_some() { + "ConfigMap" + } else if v.secret.is_some() { + "Secret" + } else 
{ + "Other" + }; + + VolumeInfo { + name: v.name.clone(), + volume_type: volume_type.to_string(), + claim_name: v + .persistent_volume_claim + .as_ref() + .and_then(|pvc| Some(pvc.claim_name.clone())), + } + }) + .collect() + }) + .unwrap_or_default(); + + Ok(Json(PodDetails { + name: pod.name_any(), + namespace: pod.namespace().unwrap_or_default(), + pool, + status, + containers, + volumes, + node: spec.and_then(|s| s.node_name.clone()), + ip: status_info.and_then(|s| s.pod_ip.clone()), + labels: pod.metadata.labels.unwrap_or_default(), + annotations: pod.metadata.annotations.unwrap_or_default(), + created_at: pod + .metadata + .creation_timestamp + .map(|ts| ts.0.to_rfc3339()), + })) +} + +/// 获取 Pod 日志(流式传输) +pub async fn get_pod_logs( + Path((namespace, _tenant_name, pod_name)): Path<(String, String, String)>, + Query(query): Query, + Extension(claims): Extension, +) -> Result { + let client = create_client(&claims).await?; + let api: Api = Api::namespaced(client, &namespace); + + // 构建日志参数 + let mut log_params = LogParams { + container: query.container, + follow: query.follow, + tail_lines: Some(query.tail_lines), + timestamps: query.timestamps, + ..Default::default() + }; + + if let Some(since_time) = query.since_time { + if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&since_time) { + log_params.since_seconds = Some( + chrono::Utc::now() + .signed_duration_since(dt.with_timezone(&chrono::Utc)) + .num_seconds(), + ); + } + } + + // 获取日志流 + let log_stream = api + .log_stream(&pod_name, &log_params) + .await + .context(error::KubeApiSnafu)?; + + // 将字节流转换为可用的 Body + // kube-rs 返回的是 impl AsyncBufRead,我们需要逐行读取并转换为字节流 + use futures::io::AsyncBufReadExt; + let lines = log_stream.lines(); + + // 转换为字节流 + let byte_stream = lines.map_ok(|line| format!("{}\n", line).into_bytes()); + + // 返回流式响应 + Ok(Body::from_stream(byte_stream).into_response()) +} + +/// 创建 Kubernetes 客户端 +async fn create_client(claims: &Claims) -> Result { + let mut config = kube::Config::infer().await.map_err(|e| Error::InternalServer { + message: format!("Failed to load kubeconfig: {}", e), + })?; + + config.auth_info.token = Some(claims.k8s_token.clone().into()); + + Client::try_from(config).map_err(|e| Error::InternalServer { + message: format!("Failed to create K8s client: {}", e), + }) +} + +/// 格式化时间间隔 +fn format_duration(duration: chrono::Duration) -> String { + let days = duration.num_days(); + let hours = duration.num_hours() % 24; + let minutes = duration.num_minutes() % 60; + + if days > 0 { + format!("{}d{}h", days, hours) + } else if hours > 0 { + format!("{}h{}m", hours, minutes) + } else if minutes > 0 { + format!("{}m", minutes) + } else { + format!("{}s", duration.num_seconds()) + } +} diff --git a/src/console/handlers/pools.rs b/src/console/handlers/pools.rs new file mode 100644 index 0000000..e83c387 --- /dev/null +++ b/src/console/handlers/pools.rs @@ -0,0 +1,350 @@ +// Copyright 2025 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
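+//! Handlers for Tenant Pool management: list Pools together with their
+//! StatefulSet rollout status, add a Pool (at least 4 total volumes are
+//! required), and delete a Pool (the last remaining Pool cannot be removed).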
+ +use axum::{extract::Path, Extension, Json}; +use k8s_openapi::api::apps::v1 as appsv1; +use k8s_openapi::api::core::v1 as corev1; +use kube::{api::ListParams, Api, Client, ResourceExt}; +use snafu::ResultExt; + +use crate::console::{ + error::{self, Error, Result}, + models::pool::*, + state::Claims, +}; +use crate::types::v1alpha1::{ + persistence::PersistenceConfig, + pool::{Pool, SchedulingConfig}, + tenant::Tenant, +}; + +/// 列出 Tenant 的所有 Pools +pub async fn list_pools( + Path((namespace, tenant_name)): Path<(String, String)>, + Extension(claims): Extension, +) -> Result> { + let client = create_client(&claims).await?; + let tenant_api: Api = Api::namespaced(client.clone(), &namespace); + + // 获取 Tenant + let tenant = tenant_api + .get(&tenant_name) + .await + .context(error::KubeApiSnafu)?; + + // 获取所有 StatefulSets + let ss_api: Api = Api::namespaced(client, &namespace); + let statefulsets = ss_api + .list( + &ListParams::default() + .labels(&format!("rustfs.tenant={}", tenant_name)), + ) + .await + .context(error::KubeApiSnafu)?; + + let mut pools_details = Vec::new(); + + for pool in &tenant.spec.pools { + let ss_name = format!("{}-{}", tenant_name, pool.name); + + // 查找对应的 StatefulSet + let ss = statefulsets + .items + .iter() + .find(|ss| ss.name_any() == ss_name); + + let ( + replicas, + ready_replicas, + updated_replicas, + current_revision, + update_revision, + state, + ) = if let Some(ss) = ss { + let status = ss.status.as_ref(); + let replicas = status.map(|s| s.replicas).unwrap_or(0); + let ready = status.and_then(|s| s.ready_replicas).unwrap_or(0); + let updated = status.and_then(|s| s.updated_replicas).unwrap_or(0); + let current_rev = status.and_then(|s| s.current_revision.clone()); + let update_rev = status.and_then(|s| s.update_revision.clone()); + + let state = if ready == replicas && updated == replicas && replicas > 0 { + "Ready" + } else if updated < replicas { + "Updating" + } else if ready < replicas { + "Degraded" + } else { + "NotReady" + }; + + ( + replicas, + ready, + updated, + current_rev, + update_rev, + state.to_string(), + ) + } else { + (0, 0, 0, None, None, "NotCreated".to_string()) + }; + + // 获取存储配置 + let storage_class = pool + .persistence + .volume_claim_template + .as_ref() + .and_then(|t| t.storage_class_name.clone()); + + let volume_size = pool + .persistence + .volume_claim_template + .as_ref() + .and_then(|t| { + t.resources.as_ref().and_then(|r| { + r.requests + .as_ref() + .and_then(|req| req.get("storage").map(|q| q.0.clone())) + }) + }); + + pools_details.push(PoolDetails { + name: pool.name.clone(), + servers: pool.servers, + volumes_per_server: pool.persistence.volumes_per_server, + total_volumes: pool.servers * pool.persistence.volumes_per_server, + storage_class, + volume_size, + replicas, + ready_replicas, + updated_replicas, + current_revision, + update_revision, + state, + created_at: ss.and_then(|s| { + s.metadata + .creation_timestamp + .as_ref() + .map(|ts| ts.0.to_rfc3339()) + }), + }); + } + + Ok(Json(PoolListResponse { + pools: pools_details, + })) +} + +/// 添加新的 Pool 到 Tenant +pub async fn add_pool( + Path((namespace, tenant_name)): Path<(String, String)>, + Extension(claims): Extension, + Json(req): Json, +) -> Result> { + let client = create_client(&claims).await?; + let tenant_api: Api = Api::namespaced(client, &namespace); + + // 获取当前 Tenant + let mut tenant = tenant_api + .get(&tenant_name) + .await + .context(error::KubeApiSnafu)?; + + // 验证 Pool 名称不重复 + if tenant.spec.pools.iter().any(|p| p.name == req.name) { + 
return Err(Error::BadRequest { + message: format!("Pool '{}' already exists", req.name), + }); + } + + // 验证最小卷数要求 (servers * volumes_per_server >= 4) + let total_volumes = req.servers * req.volumes_per_server; + if total_volumes < 4 { + return Err(Error::BadRequest { + message: format!( + "Pool must have at least 4 total volumes (got {} servers × {} volumes = {})", + req.servers, req.volumes_per_server, total_volumes + ), + }); + } + + // 构建新的 Pool + let new_pool = Pool { + name: req.name.clone(), + servers: req.servers, + persistence: PersistenceConfig { + volumes_per_server: req.volumes_per_server, + volume_claim_template: Some(corev1::PersistentVolumeClaimSpec { + access_modes: Some(vec!["ReadWriteOnce".to_string()]), + resources: Some(corev1::VolumeResourceRequirements { + requests: Some( + vec![( + "storage".to_string(), + k8s_openapi::apimachinery::pkg::api::resource::Quantity( + req.storage_size.clone(), + ), + )] + .into_iter() + .collect(), + ), + ..Default::default() + }), + storage_class_name: req.storage_class.clone(), + ..Default::default() + }), + path: None, + labels: None, + annotations: None, + }, + scheduling: SchedulingConfig { + node_selector: req.node_selector, + resources: req.resources.map(|r| corev1::ResourceRequirements { + requests: r.requests.map(|req| { + let mut map = std::collections::BTreeMap::new(); + if let Some(cpu) = req.cpu { + map.insert( + "cpu".to_string(), + k8s_openapi::apimachinery::pkg::api::resource::Quantity(cpu), + ); + } + if let Some(memory) = req.memory { + map.insert( + "memory".to_string(), + k8s_openapi::apimachinery::pkg::api::resource::Quantity(memory), + ); + } + map + }), + limits: r.limits.map(|lim| { + let mut map = std::collections::BTreeMap::new(); + if let Some(cpu) = lim.cpu { + map.insert( + "cpu".to_string(), + k8s_openapi::apimachinery::pkg::api::resource::Quantity(cpu), + ); + } + if let Some(memory) = lim.memory { + map.insert( + "memory".to_string(), + k8s_openapi::apimachinery::pkg::api::resource::Quantity(memory), + ); + } + map + }), + ..Default::default() + }), + affinity: None, + tolerations: None, + topology_spread_constraints: None, + priority_class_name: None, + }, + }; + + // 添加到 Tenant + tenant.spec.pools.push(new_pool); + + // 更新 Tenant + let updated_tenant = tenant_api + .replace(&tenant_name, &Default::default(), &tenant) + .await + .context(error::KubeApiSnafu)?; + + Ok(Json(AddPoolResponse { + success: true, + message: format!("Pool '{}' added successfully", req.name), + pool: PoolDetails { + name: req.name.clone(), + servers: req.servers, + volumes_per_server: req.volumes_per_server, + total_volumes, + storage_class: req.storage_class, + volume_size: Some(req.storage_size), + replicas: 0, + ready_replicas: 0, + updated_replicas: 0, + current_revision: None, + update_revision: None, + state: "Creating".to_string(), + created_at: updated_tenant + .metadata + .creation_timestamp + .map(|ts| ts.0.to_rfc3339()), + }, + })) +} + +/// 删除 Pool +pub async fn delete_pool( + Path((namespace, tenant_name, pool_name)): Path<(String, String, String)>, + Extension(claims): Extension, +) -> Result> { + let client = create_client(&claims).await?; + let tenant_api: Api = Api::namespaced(client, &namespace); + + // 获取当前 Tenant + let mut tenant = tenant_api + .get(&tenant_name) + .await + .context(error::KubeApiSnafu)?; + + // 检查是否为最后一个 Pool + if tenant.spec.pools.len() == 1 { + return Err(Error::BadRequest { + message: "Cannot delete the last pool. Delete the entire Tenant instead." 
+ .to_string(), + }); + } + + // 查找并移除 Pool + let pool_index = tenant + .spec + .pools + .iter() + .position(|p| p.name == pool_name) + .ok_or_else(|| Error::NotFound { + resource: format!("Pool '{}'", pool_name), + })?; + + tenant.spec.pools.remove(pool_index); + + // 更新 Tenant + tenant_api + .replace(&tenant_name, &Default::default(), &tenant) + .await + .context(error::KubeApiSnafu)?; + + Ok(Json(DeletePoolResponse { + success: true, + message: format!("Pool '{}' deleted successfully", pool_name), + warning: Some( + "The StatefulSet and PVCs will be deleted by the Operator. \ + Data may be lost if PVCs are not using a retain policy." + .to_string(), + ), + })) +} + +/// 创建 Kubernetes 客户端 +async fn create_client(claims: &Claims) -> Result { + let mut config = kube::Config::infer().await.map_err(|e| Error::InternalServer { + message: format!("Failed to load kubeconfig: {}", e), + })?; + + config.auth_info.token = Some(claims.k8s_token.clone().into()); + + Client::try_from(config).map_err(|e| Error::InternalServer { + message: format!("Failed to create K8s client: {}", e), + }) +} diff --git a/src/console/handlers/tenants.rs b/src/console/handlers/tenants.rs index 6579d99..19dde46 100644 --- a/src/console/handlers/tenants.rs +++ b/src/console/handlers/tenants.rs @@ -307,6 +307,157 @@ pub async fn delete_tenant( })) } +/// 更新 Tenant +pub async fn update_tenant( + Path((namespace, name)): Path<(String, String)>, + Extension(claims): Extension, + Json(req): Json, +) -> Result> { + let client = create_client(&claims).await?; + let api: Api = Api::namespaced(client, &namespace); + + // 获取当前 Tenant + let mut tenant = api.get(&name).await.context(error::KubeApiSnafu)?; + + // 应用更新(仅更新提供的字段) + let mut updated_fields = Vec::new(); + + if let Some(image) = req.image { + tenant.spec.image = Some(image.clone()); + updated_fields.push(format!("image={}", image)); + } + + if let Some(mount_path) = req.mount_path { + tenant.spec.mount_path = Some(mount_path.clone()); + updated_fields.push(format!("mount_path={}", mount_path)); + } + + if let Some(env_vars) = req.env { + tenant.spec.env = env_vars + .into_iter() + .map(|e| corev1::EnvVar { + name: e.name, + value: e.value, + ..Default::default() + }) + .collect(); + updated_fields.push("env".to_string()); + } + + if let Some(creds_secret) = req.creds_secret { + if creds_secret.is_empty() { + tenant.spec.creds_secret = None; + updated_fields.push("creds_secret=".to_string()); + } else { + tenant.spec.creds_secret = Some(corev1::LocalObjectReference { + name: creds_secret.clone(), + }); + updated_fields.push(format!("creds_secret={}", creds_secret)); + } + } + + if let Some(pod_mgmt_policy) = req.pod_management_policy { + use crate::types::v1alpha1::k8s::PodManagementPolicy; + tenant.spec.pod_management_policy = match pod_mgmt_policy.as_str() { + "OrderedReady" => Some(PodManagementPolicy::OrderedReady), + "Parallel" => Some(PodManagementPolicy::Parallel), + _ => { + return Err(Error::BadRequest { + message: format!( + "Invalid pod_management_policy '{}', must be 'OrderedReady' or 'Parallel'", + pod_mgmt_policy + ), + }) + } + }; + updated_fields.push(format!("pod_management_policy={}", pod_mgmt_policy)); + } + + if let Some(image_pull_policy) = req.image_pull_policy { + use crate::types::v1alpha1::k8s::ImagePullPolicy; + tenant.spec.image_pull_policy = match image_pull_policy.as_str() { + "Always" => Some(ImagePullPolicy::Always), + "IfNotPresent" => Some(ImagePullPolicy::IfNotPresent), + "Never" => Some(ImagePullPolicy::Never), + _ => { + return 
Err(Error::BadRequest { + message: format!( + "Invalid image_pull_policy '{}', must be 'Always', 'IfNotPresent', or 'Never'", + image_pull_policy + ), + }) + } + }; + updated_fields.push(format!("image_pull_policy={}", image_pull_policy)); + } + + if let Some(logging) = req.logging { + use crate::types::v1alpha1::logging::{LoggingConfig, LoggingMode}; + + let mode = match logging.log_type.as_str() { + "stdout" => LoggingMode::Stdout, + "emptyDir" => LoggingMode::EmptyDir, + "persistent" => LoggingMode::Persistent, + _ => { + return Err(Error::BadRequest { + message: format!( + "Invalid logging type '{}', must be 'stdout', 'emptyDir', or 'persistent'", + logging.log_type + ), + }) + } + }; + + tenant.spec.logging = Some(LoggingConfig { + mode, + storage_size: logging.volume_size, + storage_class: logging.storage_class, + mount_path: None, + }); + updated_fields.push(format!("logging={}", logging.log_type)); + } + + if updated_fields.is_empty() { + return Err(Error::BadRequest { + message: "No fields to update".to_string(), + }); + } + + // 提交更新 + let updated_tenant = api + .replace(&name, &Default::default(), &tenant) + .await + .context(error::KubeApiSnafu)?; + + Ok(Json(UpdateTenantResponse { + success: true, + message: format!("Tenant updated: {}", updated_fields.join(", ")), + tenant: TenantListItem { + name: updated_tenant.name_any(), + namespace: updated_tenant.namespace().unwrap_or_default(), + pools: updated_tenant + .spec + .pools + .iter() + .map(|p| PoolInfo { + name: p.name.clone(), + servers: p.servers, + volumes_per_server: p.persistence.volumes_per_server, + }) + .collect(), + state: updated_tenant + .status + .as_ref() + .map(|s| s.current_state.to_string()) + .unwrap_or_else(|| "Unknown".to_string()), + created_at: updated_tenant + .metadata + .creation_timestamp + .map(|ts| ts.0.to_rfc3339()), + }, + })) +} + /// 创建 Kubernetes 客户端 async fn create_client(claims: &Claims) -> Result { let mut config = kube::Config::infer().await.map_err(|e| Error::InternalServer { diff --git a/src/console/models/mod.rs b/src/console/models/mod.rs index d3b58bb..3523721 100644 --- a/src/console/models/mod.rs +++ b/src/console/models/mod.rs @@ -15,4 +15,6 @@ pub mod auth; pub mod cluster; pub mod event; +pub mod pod; +pub mod pool; pub mod tenant; diff --git a/src/console/models/pod.rs b/src/console/models/pod.rs new file mode 100644 index 0000000..79a15c2 --- /dev/null +++ b/src/console/models/pod.rs @@ -0,0 +1,144 @@ +// Copyright 2025 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
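+//! Request/response models for the Pod management API: list items, Pod
+//! details (status, containers, volumes), restart/delete responses, and
+//! log query parameters.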
+ +use serde::{Deserialize, Serialize}; + +/// Pod 列表项 +#[derive(Debug, Serialize)] +pub struct PodListItem { + pub name: String, + pub pool: String, + pub status: String, + pub phase: String, + pub node: Option, + pub ready: String, // e.g., "1/1" + pub restarts: i32, + pub age: String, + pub created_at: Option, +} + +/// Pod 列表响应 +#[derive(Debug, Serialize)] +pub struct PodListResponse { + pub pods: Vec, +} + +/// Pod 详情 +#[derive(Debug, Serialize)] +pub struct PodDetails { + pub name: String, + pub namespace: String, + pub pool: String, + pub status: PodStatus, + pub containers: Vec, + pub volumes: Vec, + pub node: Option, + pub ip: Option, + pub labels: std::collections::BTreeMap, + pub annotations: std::collections::BTreeMap, + pub created_at: Option, +} + +/// Pod 状态 +#[derive(Debug, Serialize)] +pub struct PodStatus { + pub phase: String, + pub conditions: Vec, + pub host_ip: Option, + pub pod_ip: Option, + pub start_time: Option, +} + +/// Pod 条件 +#[derive(Debug, Serialize)] +pub struct PodCondition { + #[serde(rename = "type")] + pub type_: String, + pub status: String, + pub reason: Option, + pub message: Option, + pub last_transition_time: Option, +} + +/// 容器信息 +#[derive(Debug, Serialize)] +pub struct ContainerInfo { + pub name: String, + pub image: String, + pub ready: bool, + pub restart_count: i32, + pub state: ContainerState, +} + +/// 容器状态 +#[derive(Debug, Serialize)] +#[serde(tag = "status")] +pub enum ContainerState { + Running { + started_at: Option, + }, + Waiting { + reason: Option, + message: Option, + }, + Terminated { + reason: Option, + exit_code: i32, + finished_at: Option, + }, +} + +/// Volume 信息 +#[derive(Debug, Serialize)] +pub struct VolumeInfo { + pub name: String, + pub volume_type: String, + pub claim_name: Option, +} + +/// 删除 Pod 响应 +#[derive(Debug, Serialize)] +pub struct DeletePodResponse { + pub success: bool, + pub message: String, +} + +/// 重启 Pod 请求 +#[derive(Debug, Deserialize)] +pub struct RestartPodRequest { + #[serde(default)] + pub force: bool, +} + +/// Pod 日志请求参数 +#[derive(Debug, Deserialize)] +pub struct LogsQuery { + /// 容器名称 + pub container: Option, + /// 尾部行数 + #[serde(default = "default_tail_lines")] + pub tail_lines: i64, + /// 是否跟随 + #[serde(default)] + pub follow: bool, + /// 显示时间戳 + #[serde(default)] + pub timestamps: bool, + /// 从指定时间开始(RFC3339 格式) + pub since_time: Option, +} + +fn default_tail_lines() -> i64 { + 100 +} diff --git a/src/console/models/pool.rs b/src/console/models/pool.rs new file mode 100644 index 0000000..22f3ef3 --- /dev/null +++ b/src/console/models/pool.rs @@ -0,0 +1,84 @@ +// Copyright 2025 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
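+//! Request/response models for the Pool management API: Pool details with
+//! StatefulSet revision info, add/delete Pool requests and responses, and
+//! resource requirements.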
+ +use serde::{Deserialize, Serialize}; + +/// Pool 信息(扩展版) +#[derive(Debug, Serialize)] +pub struct PoolDetails { + pub name: String, + pub servers: i32, + pub volumes_per_server: i32, + pub total_volumes: i32, + pub storage_class: Option, + pub volume_size: Option, + pub replicas: i32, + pub ready_replicas: i32, + pub updated_replicas: i32, + pub current_revision: Option, + pub update_revision: Option, + pub state: String, + pub created_at: Option, +} + +/// Pool 列表响应 +#[derive(Debug, Serialize)] +pub struct PoolListResponse { + pub pools: Vec, +} + +/// 添加 Pool 请求 +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AddPoolRequest { + pub name: String, + pub servers: i32, + pub volumes_per_server: i32, + pub storage_size: String, + pub storage_class: Option, + + // 可选的调度配置 + pub node_selector: Option>, + pub resources: Option, +} + +/// 资源需求 +#[derive(Debug, Deserialize, Serialize)] +pub struct ResourceRequirements { + pub requests: Option, + pub limits: Option, +} + +/// 资源列表 +#[derive(Debug, Deserialize, Serialize)] +pub struct ResourceList { + pub cpu: Option, + pub memory: Option, +} + +/// 删除 Pool 响应 +#[derive(Debug, Serialize)] +pub struct DeletePoolResponse { + pub success: bool, + pub message: String, + pub warning: Option, +} + +/// Pool 添加响应 +#[derive(Debug, Serialize)] +pub struct AddPoolResponse { + pub success: bool, + pub message: String, + pub pool: PoolDetails, +} diff --git a/src/console/models/tenant.rs b/src/console/models/tenant.rs index 2423933..8a0bdfe 100644 --- a/src/console/models/tenant.rs +++ b/src/console/models/tenant.rs @@ -94,3 +94,53 @@ pub struct DeleteTenantResponse { pub success: bool, pub message: String, } + +/// 更新 Tenant 请求 +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UpdateTenantRequest { + /// 更新镜像版本 + pub image: Option, + + /// 更新挂载路径 + pub mount_path: Option, + + /// 更新环境变量 + pub env: Option>, + + /// 更新凭证 Secret + pub creds_secret: Option, + + /// 更新 Pod 管理策略 + pub pod_management_policy: Option, + + /// 更新镜像拉取策略 + pub image_pull_policy: Option, + + /// 更新日志配置 + pub logging: Option, +} + +/// 环境变量 +#[derive(Debug, Deserialize, Serialize)] +pub struct EnvVar { + pub name: String, + pub value: Option, +} + +/// 日志配置 +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct LoggingConfig { + pub log_type: String, // "stdout" | "emptyDir" | "persistent" + pub volume_size: Option, + pub storage_class: Option, +} + +/// 更新 Tenant 响应 +#[derive(Debug, Serialize)] +pub struct UpdateTenantResponse { + pub success: bool, + pub message: String, + pub tenant: TenantListItem, +} diff --git a/src/console/routes/mod.rs b/src/console/routes/mod.rs index 87815be..1a17877 100644 --- a/src/console/routes/mod.rs +++ b/src/console/routes/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use axum::{routing::{delete, get, post}, Router}; +use axum::{routing::{delete, get, post, put}, Router}; use crate::console::{handlers, state::AppState}; @@ -37,12 +37,58 @@ pub fn tenant_routes() -> Router { "/namespaces/:namespace/tenants/:name", get(handlers::tenants::get_tenant_details), ) + .route( + "/namespaces/:namespace/tenants/:name", + put(handlers::tenants::update_tenant), + ) .route( "/namespaces/:namespace/tenants/:name", delete(handlers::tenants::delete_tenant), ) } +/// Pool 管理路由 +pub fn pool_routes() -> Router { + Router::new() + .route( + "/namespaces/:namespace/tenants/:name/pools", + get(handlers::pools::list_pools), + ) + .route( + "/namespaces/:namespace/tenants/:name/pools", + post(handlers::pools::add_pool), + ) + .route( + "/namespaces/:namespace/tenants/:name/pools/:pool", + delete(handlers::pools::delete_pool), + ) +} + +/// Pod 管理路由 +pub fn pod_routes() -> Router { + Router::new() + .route( + "/namespaces/:namespace/tenants/:name/pods", + get(handlers::pods::list_pods), + ) + .route( + "/namespaces/:namespace/tenants/:name/pods/:pod", + get(handlers::pods::get_pod_details), + ) + .route( + "/namespaces/:namespace/tenants/:name/pods/:pod", + delete(handlers::pods::delete_pod), + ) + .route( + "/namespaces/:namespace/tenants/:name/pods/:pod/restart", + post(handlers::pods::restart_pod), + ) + .route( + "/namespaces/:namespace/tenants/:name/pods/:pod/logs", + get(handlers::pods::get_pod_logs), + ) +} + /// 事件管理路由 pub fn event_routes() -> Router { Router::new().route( diff --git a/src/console/server.rs b/src/console/server.rs index 27fe173..bdcf7ed 100644 --- a/src/console/server.rs +++ b/src/console/server.rs @@ -82,6 +82,8 @@ fn api_routes() -> Router { Router::new() .merge(routes::auth_routes()) .merge(routes::tenant_routes()) + .merge(routes::pool_routes()) + .merge(routes::pod_routes()) .merge(routes::event_routes()) .merge(routes::cluster_routes()) } From b5835225db30ad0651f84d0671475bc7fe308216 Mon Sep 17 00:00:00 2001 From: GatewayJ <835269233@qq.com> Date: Sat, 7 Feb 2026 00:19:11 -0700 Subject: [PATCH 2/2] feat(console):deployment console --- check-rustfs.sh | 13 +- cleanup-rustfs.sh | 95 +----- deploy-rustfs.sh | 170 +++++++--- deploy/console/KUBERNETES-INTEGRATION.md | 236 ------------- deploy/console/README.md | 315 ------------------ .../console/examples/ingress-tls-example.md | 132 -------- .../console/examples/loadbalancer-example.md | 77 ----- deploy/k8s-dev/console-deployment.yaml | 50 +++ deploy/k8s-dev/console-rbac.yaml | 51 +++ deploy/k8s-dev/console-service.yaml | 18 + deploy/k8s-dev/operator-deployment.yaml | 29 ++ deploy/k8s-dev/operator-rbac.yaml | 48 +++ src/console/error.rs | 11 +- src/console/handlers/auth.rs | 28 +- src/console/handlers/cluster.rs | 49 ++- src/console/handlers/events.rs | 12 +- src/console/handlers/tenants.rs | 53 +-- src/console/middleware/auth.rs | 27 +- src/console/routes/mod.rs | 10 +- src/console/server.rs | 30 +- 20 files changed, 442 insertions(+), 1012 deletions(-) delete mode 100644 deploy/console/KUBERNETES-INTEGRATION.md delete mode 100644 deploy/console/README.md delete mode 100644 deploy/console/examples/ingress-tls-example.md delete mode 100644 deploy/console/examples/loadbalancer-example.md create mode 100644 deploy/k8s-dev/console-deployment.yaml create mode 100644 deploy/k8s-dev/console-rbac.yaml create mode 100644 deploy/k8s-dev/console-service.yaml create mode 100644 deploy/k8s-dev/operator-deployment.yaml create mode 100644 deploy/k8s-dev/operator-rbac.yaml diff --git a/check-rustfs.sh 
b/check-rustfs.sh index fe4deca..d093c1f 100755 --- a/check-rustfs.sh +++ b/check-rustfs.sh @@ -117,10 +117,11 @@ echo " Access RustFS" echo "=========================================" echo "" -# Check if Console is running locally -if pgrep -f "target/release/operator.*console" >/dev/null; then - echo "✅ Operator Console (local):" - echo " Running at: http://localhost:9090" +# Operator Console (deployed in K8s) +if kubectl get deployment rustfs-operator-console -n rustfs-system >/dev/null 2>&1; then + echo "✅ Operator Console (K8s Deployment):" + echo " Port forward: kubectl port-forward -n rustfs-system svc/rustfs-operator-console 9090:9090" + echo " Then access: http://localhost:9090" echo " Health check: curl http://localhost:9090/healthz" echo " API docs: deploy/console/README.md" echo "" @@ -128,8 +129,8 @@ if pgrep -f "target/release/operator.*console" >/dev/null; then echo " Login: POST http://localhost:9090/api/v1/login" echo "" else - echo "⚠️ Operator Console not running locally" - echo " Start with: cargo run -- console --port 9090" + echo "⚠️ Operator Console Deployment not found in rustfs-system" + echo " Deploy with: ./deploy-rustfs.sh" echo "" fi diff --git a/cleanup-rustfs.sh b/cleanup-rustfs.sh index 0755a38..ae25f3f 100755 --- a/cleanup-rustfs.sh +++ b/cleanup-rustfs.sh @@ -47,9 +47,8 @@ confirm_cleanup() { echo "" log_warning "This operation will delete all RustFS resources:" echo " - Tenant: example-tenant" - echo " - Namespace: rustfs-system (including all Pods, PVCs, Services)" + echo " - Namespace: rustfs-system (including Operator, Console, Pods, PVCs, Services)" echo " - CRD: tenants.rustfs.com" - echo " - Operator process" echo "" read -p "Confirm deletion? (yes/no): " confirm @@ -87,76 +86,6 @@ delete_tenant() { fi } -# Stop Operator -stop_operator() { - log_info "Stopping Operator process..." - - # Method 1: Read from PID file - if [ -f operator.pid ]; then - local pid=$(cat operator.pid) - if ps -p $pid > /dev/null 2>&1; then - log_info "Stopping Operator (PID: $pid)..." - kill $pid 2>/dev/null || true - sleep 2 - - # If process still exists, force kill - if ps -p $pid > /dev/null 2>&1; then - log_warning "Process did not exit normally, forcing termination..." - kill -9 $pid 2>/dev/null || true - fi - fi - rm -f operator.pid - fi - - # Method 2: Find all operator processes - local operator_pids=$(pgrep -f "target/release/operator.*server" 2>/dev/null || true) - if [ -n "$operator_pids" ]; then - log_info "Found Operator processes: $operator_pids" - pkill -f "target/release/operator.*server" || true - sleep 2 - - # Force kill remaining processes - pkill -9 -f "target/release/operator.*server" 2>/dev/null || true - fi - - log_success "Operator stopped" -} - -# Stop Console -stop_console() { - log_info "Stopping Console process..." - - # Method 1: Read from PID file - if [ -f console.pid ]; then - local pid=$(cat console.pid) - if ps -p $pid > /dev/null 2>&1; then - log_info "Stopping Console (PID: $pid)..." - kill $pid 2>/dev/null || true - sleep 2 - - # If process still exists, force kill - if ps -p $pid > /dev/null 2>&1; then - log_warning "Process did not exit normally, forcing termination..." 
- kill -9 $pid 2>/dev/null || true - fi - fi - rm -f console.pid - fi - - # Method 2: Find all console processes - local console_pids=$(pgrep -f "target/release/operator.*console" 2>/dev/null || true) - if [ -n "$console_pids" ]; then - log_info "Found Console processes: $console_pids" - pkill -f "target/release/operator.*console" || true - sleep 2 - - # Force kill remaining processes - pkill -9 -f "target/release/operator.*console" 2>/dev/null || true - fi - - log_success "Console stopped" -} - # Delete Namespace delete_namespace() { log_info "Deleting Namespace: rustfs-system..." @@ -223,10 +152,6 @@ cleanup_local_files() { log_info "Cleaning up local files..." local files_to_clean=( - "operator.log" - "operator.pid" - "console.log" - "console.pid" "deploy/rustfs-operator/crds/tenant-crd.yaml" ) @@ -271,21 +196,7 @@ verify_cleanup() { log_success "✓ CRD cleaned" fi - # Check Operator process - if pgrep -f "target/release/operator.*server" >/dev/null; then - log_error "Operator process still running" - issues=$((issues + 1)) - else - log_success "✓ Operator stopped" - fi - - # Check Console process - if pgrep -f "target/release/operator.*console" >/dev/null; then - log_error "Console process still running" - issues=$((issues + 1)) - else - log_success "✓ Console stopped" - fi + # Operator and Console are deleted with namespace (no local process check) echo "" if [ $issues -eq 0 ]; then @@ -331,8 +242,6 @@ main() { echo "" delete_tenant - stop_console - stop_operator delete_namespace delete_crd cleanup_local_files diff --git a/deploy-rustfs.sh b/deploy-rustfs.sh index 11df063..833b6a2 100755 --- a/deploy-rustfs.sh +++ b/deploy-rustfs.sh @@ -14,6 +14,7 @@ # limitations under the License. # RustFS Operator deployment script - uses examples/simple-tenant.yaml +# Deploys Operator and Console as Kubernetes Deployments (Pods in K8s) # For quick deployment and CRD modification verification set -e @@ -50,6 +51,7 @@ check_prerequisites() { command -v kubectl >/dev/null 2>&1 || missing_tools+=("kubectl") command -v cargo >/dev/null 2>&1 || missing_tools+=("cargo") command -v kind >/dev/null 2>&1 || missing_tools+=("kind") + command -v docker >/dev/null 2>&1 || missing_tools+=("docker") if [ ${#missing_tools[@]} -ne 0 ]; then log_error "Missing required tools: ${missing_tools[*]}" @@ -59,6 +61,39 @@ check_prerequisites() { log_success "All required tools are installed" } +# Fix "too many open files" for kind (inotify limits) +# See: https://kind.sigs.k8s.io/docs/user/known-issues/#pod-errors-due-to-too-many-open-files +fix_inotify_limits() { + log_info "Applying inotify limits (fix for 'too many open files')..." + + local sysctl_conf="/etc/sysctl.d/99-rustfs-kind.conf" + local persisted=false + + if sudo sysctl -w fs.inotify.max_user_watches=524288 >/dev/null 2>&1 \ + && sudo sysctl -w fs.inotify.max_user_instances=512 >/dev/null 2>&1; then + log_success "Inotify limits applied (current session)" + persisted=true + fi + + if sudo test -w /etc/sysctl.d 2>/dev/null; then + if ! sudo grep -qs "fs.inotify.max_user_watches" "$sysctl_conf" 2>/dev/null; then + printf 'fs.inotify.max_user_watches = 524288\nfs.inotify.max_user_instances = 512\n' \ + | sudo tee "$sysctl_conf" >/dev/null 2>&1 && \ + log_success "Inotify limits persisted to $sysctl_conf" + fi + fi + + if [ "$persisted" = true ]; then + return 0 + fi + + log_warning "Could not set inotify limits (may need root). 
If you see kube-proxy 'too many open files' errors:" + echo " sudo sysctl fs.inotify.max_user_watches=524288" + echo " sudo sysctl fs.inotify.max_user_instances=512" + echo " # Make persistent: add to /etc/sysctl.conf or $sysctl_conf" + return 1 +} + # Check Kubernetes cluster connection check_cluster() { log_info "Checking Kubernetes cluster connection..." @@ -67,6 +102,8 @@ check_cluster() { log_error "Unable to connect to Kubernetes cluster" log_info "Attempting to start kind cluster..." + fix_inotify_limits || true + if kind get clusters | grep -q "rustfs-dev"; then log_info "Detected kind cluster 'rustfs-dev', attempting to restart..." kind delete cluster --name rustfs-dev @@ -74,6 +111,8 @@ check_cluster() { log_info "Creating new kind cluster..." kind create cluster --name rustfs-dev + else + fix_inotify_limits || true fi log_success "Kubernetes cluster connection OK: $(kubectl config current-context)" @@ -121,52 +160,47 @@ build_operator() { log_success "Operator build completed" } -# Start operator (background) -start_operator() { - log_info "Starting operator..." +# Build Docker image and deploy Operator + Console as Kubernetes Deployments +deploy_operator_and_console() { + local kind_cluster="rustfs-dev" + local image_name="rustfs/operator:dev" + + log_info "Building Docker image..." - # Check if operator is already running - if pgrep -f "target/release/operator.*server" >/dev/null; then - log_warning "Detected existing operator process" - log_info "Stopping old operator process..." - pkill -f "target/release/operator.*server" || true - sleep 2 + if ! docker build -t "$image_name" .; then + log_error "Docker build failed" + exit 1 fi - # Start new operator process (background) - nohup cargo run --release -- server > operator.log 2>&1 & - OPERATOR_PID=$! - echo $OPERATOR_PID > operator.pid + log_info "Loading image into kind cluster '$kind_cluster'..." - log_success "Operator started (PID: $OPERATOR_PID)" - log_info "Log file: operator.log" + if ! kind load docker-image "$image_name" --name "$kind_cluster"; then + log_error "Failed to load image into kind cluster" + log_info "Verify: 1) kind cluster exists: kind get clusters" + log_info " 2) kind cluster 'rustfs-dev' exists: kind get clusters" + log_info " 3) Docker is running and accessible" + exit 1 + fi - # Wait for operator to start - sleep 3 -} + log_info "Creating Console JWT secret..." -# Start console (background) -start_console() { - log_info "Starting console..." + local jwt_secret + jwt_secret=$(openssl rand -base64 32 2>/dev/null || head -c 32 /dev/urandom | base64) - # Check if console is already running - if pgrep -f "target/release/operator.*console" >/dev/null; then - log_warning "Detected existing console process" - log_info "Stopping old console process..." - pkill -f "target/release/operator.*console" || true - sleep 2 - fi + kubectl create secret generic rustfs-operator-console-secret \ + --namespace rustfs-system \ + --from-literal=jwt-secret="$jwt_secret" \ + --dry-run=client -o yaml | kubectl apply -f - - # Start new console process (background) - nohup cargo run --release -- console --port 9090 > console.log 2>&1 & - CONSOLE_PID=$! - echo $CONSOLE_PID > console.pid + log_info "Deploying Operator and Console (Deployment)..." 
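+    # Apply RBAC first, then the Operator and Console Deployments and the Console Service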
- log_success "Console started (PID: $CONSOLE_PID)" - log_info "Log file: console.log" + kubectl apply -f deploy/k8s-dev/operator-rbac.yaml + kubectl apply -f deploy/k8s-dev/console-rbac.yaml + kubectl apply -f deploy/k8s-dev/operator-deployment.yaml + kubectl apply -f deploy/k8s-dev/console-deployment.yaml + kubectl apply -f deploy/k8s-dev/console-service.yaml - # Wait for console to start - sleep 2 + log_success "Operator and Console deployed to Kubernetes" } # Deploy Tenant (EC 2+1 configuration) @@ -178,24 +212,25 @@ deploy_tenant() { log_success "Tenant submitted" } -# Wait for pods to be ready +# Wait for pods to be ready (1 operator + 1 console + 2 tenant = 4) wait_for_pods() { log_info "Waiting for pods to start (max 5 minutes)..." local timeout=300 local elapsed=0 local interval=5 + local expected_pods=4 while [ $elapsed -lt $timeout ]; do local ready_count=$(kubectl get pods -n rustfs-system --no-headers 2>/dev/null | grep -c "Running" || echo "0") local total_count=$(kubectl get pods -n rustfs-system --no-headers 2>/dev/null | wc -l || echo "0") - if [ "$ready_count" -eq 2 ] && [ "$total_count" -eq 2 ]; then - log_success "All pods are ready (2/2 Running)" + if [ "$ready_count" -eq "$expected_pods" ] && [ "$total_count" -eq "$expected_pods" ]; then + log_success "All pods are ready ($expected_pods/$expected_pods Running)" return 0 fi - echo -ne "${BLUE}[INFO]${NC} Pod status: $ready_count/2 Running, waited ${elapsed}s...\r" + echo -ne "${BLUE}[INFO]${NC} Pod status: $ready_count/$expected_pods Running, waited ${elapsed}s...\r" sleep $interval elapsed=$((elapsed + interval)) done @@ -212,23 +247,27 @@ show_status() { log_info "==========================================" echo "" - log_info "1. Tenant status:" + log_info "1. Deployment status:" + kubectl get deployment -n rustfs-system + echo "" + + log_info "2. Tenant status:" kubectl get tenant -n rustfs-system echo "" - log_info "2. Pod status:" + log_info "3. Pod status:" kubectl get pods -n rustfs-system -o wide echo "" - log_info "3. Service status:" + log_info "4. Service status:" kubectl get svc -n rustfs-system echo "" - log_info "4. PVC status:" + log_info "5. PVC status:" kubectl get pvc -n rustfs-system echo "" - log_info "5. StatefulSet status:" + log_info "6. 
StatefulSet status:" kubectl get statefulset -n rustfs-system echo "" } @@ -241,7 +280,9 @@ show_access_info() { echo "" echo "📋 View logs:" - echo " kubectl logs -f example-tenant-primary-0 -n rustfs-system" + echo " Operator: kubectl logs -f deployment/rustfs-operator -n rustfs-system" + echo " Console: kubectl logs -f deployment/rustfs-operator-console -n rustfs-system" + echo " RustFS: kubectl logs -f example-tenant-primary-0 -n rustfs-system" echo "" echo "🔌 Port forward S3 API (9000):" @@ -252,9 +293,9 @@ show_access_info() { echo " kubectl port-forward -n rustfs-system svc/example-tenant-console 9001:9001" echo "" - echo "🖥️ Operator Console (Management API):" - echo " Listening on: http://localhost:9090" - echo " Health check: curl http://localhost:9090/healthz" + echo "🖥️ Operator Console (Management API, port 9090):" + echo " kubectl port-forward -n rustfs-system svc/rustfs-operator-console 9090:9090" + echo " Then: curl http://localhost:9090/healthz" echo "" echo "🔐 RustFS Credentials:" @@ -276,9 +317,10 @@ show_access_info() { echo " ./cleanup-rustfs.sh" echo "" - echo "📝 Logs:" - echo " Operator: tail -f operator.log" - echo " Console: tail -f console.log" + echo "⚠️ If pods show 'ImagePullBackOff' or 'image not present':" + echo " docker build -t rustfs/operator:dev ." + echo " kind load docker-image rustfs/operator:dev --name rustfs-dev" + echo " kubectl rollout restart deployment -n rustfs-system" echo "" } @@ -299,8 +341,7 @@ main() { deploy_crd create_namespace build_operator - start_operator - start_console + deploy_operator_and_console deploy_tenant echo "" @@ -318,5 +359,26 @@ main() { # Catch Ctrl+C trap 'log_error "Deployment interrupted"; exit 1' INT +# Parse arguments +case "${1:-}" in + --fix-limits) + log_info "Fix inotify limits for kind (kube-proxy 'too many open files')" + fix_inotify_limits + echo "" + log_info "If cluster already has issues, delete and recreate:" + echo " kind delete cluster --name rustfs-dev" + echo " ./deploy-rustfs.sh" + exit 0 + ;; + -h|--help) + echo "Usage: $0 [options]" + echo "" + echo "Options:" + echo " --fix-limits Apply inotify limits (fix 'too many open files'), then exit" + echo " -h, --help Show this help" + exit 0 + ;; +esac + # 执行主流程 main "$@" diff --git a/deploy/console/KUBERNETES-INTEGRATION.md b/deploy/console/KUBERNETES-INTEGRATION.md deleted file mode 100644 index 9782a25..0000000 --- a/deploy/console/KUBERNETES-INTEGRATION.md +++ /dev/null @@ -1,236 +0,0 @@ -# RustFS Operator Console - Kubernetes Integration Summary - -## ✅ 已完成的集成 - -### 1. Helm Chart 模板(7个文件) - -已在 `deploy/rustfs-operator/templates/` 中创建: - -- **console-deployment.yaml** - Console Deployment 配置 - - 运行 `./operator console --port 9090` - - 健康检查和就绪探针 - - JWT secret 通过环境变量注入 - - 支持多副本部署 - -- **console-service.yaml** - Service 配置 - - 支持 ClusterIP / NodePort / LoadBalancer - - 默认端口 9090 - -- **console-serviceaccount.yaml** - ServiceAccount - -- **console-clusterrole.yaml** - RBAC ClusterRole - - Tenant 资源:完整 CRUD 权限 - - Namespace:读取和创建权限 - - Nodes, Events, Services, Pods:只读权限 - -- **console-clusterrolebinding.yaml** - RBAC 绑定 - -- **console-secret.yaml** - JWT Secret - - 自动生成或使用配置的密钥 - -- **console-ingress.yaml** - Ingress 配置(可选) - - 支持 TLS - - 可配置域名和路径 - -### 2. 
Helm Values 配置 - -`deploy/rustfs-operator/values.yaml` 中新增 `console` 配置段: - -```yaml -console: - enabled: true # 启用/禁用 Console - replicas: 1 # 副本数 - port: 9090 # 端口 - logLevel: info # 日志级别 - jwtSecret: "" # JWT 密钥(留空自动生成) - - image: {} # 镜像配置(使用 operator 镜像) - resources: {} # 资源限制 - service: {} # Service 配置 - ingress: {} # Ingress 配置 - rbac: {} # RBAC 配置 - serviceAccount: {} # ServiceAccount 配置 -``` - -### 3. Helm Helpers - -`deploy/rustfs-operator/templates/_helpers.tpl` 中新增: - -- `rustfs-operator.consoleServiceAccountName` - Console ServiceAccount 名称生成 - -### 4. 部署文档 - -- **deploy/console/README.md** - 完整部署指南 - - 架构说明 - - 部署方法(Helm / kubectl) - - API 端点文档 - - 认证说明 - - RBAC 权限说明 - - 安全考虑 - - 故障排查 - -- **deploy/console/examples/loadbalancer-example.md** - LoadBalancer 部署示例 - -- **deploy/console/examples/ingress-tls-example.md** - Ingress + TLS 部署示例 - -## 部署方式 - -### 方式一:Helm(推荐) - -```bash -# 启用 Console 部署 -helm install rustfs-operator deploy/rustfs-operator \ - --set console.enabled=true - -# 使用 LoadBalancer -helm install rustfs-operator deploy/rustfs-operator \ - --set console.enabled=true \ - --set console.service.type=LoadBalancer - -# 自定义配置 -helm install rustfs-operator deploy/rustfs-operator \ - -f custom-values.yaml -``` - -### 方式二:独立部署 - -可以从 Helm 模板生成 YAML 文件独立部署(需要 helm 命令): - -```bash -helm template rustfs-operator deploy/rustfs-operator \ - --set console.enabled=true \ - > console-manifests.yaml - -kubectl apply -f console-manifests.yaml -``` - -## 访问方式 - -### ClusterIP + Port Forward - -```bash -kubectl port-forward svc/rustfs-operator-console 9090:9090 -# 访问 http://localhost:9090 -``` - -### LoadBalancer - -```bash -kubectl get svc rustfs-operator-console -# 访问 http://:9090 -``` - -### Ingress - -```bash -# 访问 https://your-domain.com -``` - -## API 测试 - -```bash -# 健康检查 -curl http://localhost:9090/healthz # => "OK" - -# 创建测试用户 -kubectl create serviceaccount test-user -kubectl create clusterrolebinding test-admin \ - --clusterrole=cluster-admin \ - --serviceaccount=default:test-user - -# 登录 -TOKEN=$(kubectl create token test-user --duration=1h) -curl -X POST http://localhost:9090/api/v1/login \ - -H "Content-Type: application/json" \ - -d "{\"token\": \"$TOKEN\"}" \ - -c cookies.txt - -# 访问 API -curl http://localhost:9090/api/v1/tenants -b cookies.txt -``` - -## 架构 - -``` -┌─────────────────────────────────────────────────────────┐ -│ Kubernetes Cluster │ -│ │ -│ ┌────────────────────┐ ┌─────────────────────┐ │ -│ │ Operator Pod │ │ Console Pod(s) │ │ -│ │ │ │ │ │ -│ │ ./operator server │ │ ./operator console │ │ -│ │ │ │ --port 9090 │ │ -│ │ - Reconcile Loop │ │ │ │ -│ │ - Watch Tenants │ │ - REST API │ │ -│ │ - Manage K8s Res │ │ - JWT Auth │ │ -│ └────────────────────┘ │ - Query K8s API │ │ -│ │ └─────────────────────┘ │ -│ │ │ │ -│ ▼ ▼ │ -│ ┌──────────────────────────────────────────────────┐ │ -│ │ Kubernetes API Server │ │ -│ │ │ │ -│ │ - Tenant CRDs │ │ -│ │ - Deployments, Services, ConfigMaps, etc. │ │ -│ └──────────────────────────────────────────────────┘ │ -│ │ -└─────────────────────────────────────────────────────────┘ - ▲ - │ - ┌────────┴────────┐ - │ Users/Clients │ - │ │ - │ HTTP API Calls │ - └─────────────────┘ -``` - -## 安全特性 - -1. **JWT 认证** - 12小时会话过期 -2. **HttpOnly Cookies** - 防止 XSS 攻击 -3. **RBAC 集成** - 使用用户的 K8s Token 授权 -4. **最小权限** - Console ServiceAccount 仅有必要权限 -5. **TLS 支持** - 通过 Ingress 配置 HTTPS - -## 下一步 - -1. **构建镜像**:Docker 镜像已包含 `console` 命令,无需修改 Dockerfile -2. **部署测试**:使用 Helm 或 kubectl 部署到集群 -3. **集成前端**:(可选)开发 Web UI 调用 REST API -4. 
**添加监控**:集成 Prometheus metrics(未来增强) - -## 相关文件 - -``` -deploy/ -├── rustfs-operator/ -│ ├── templates/ -│ │ ├── console-deployment.yaml ✅ -│ │ ├── console-service.yaml ✅ -│ │ ├── console-serviceaccount.yaml ✅ -│ │ ├── console-clusterrole.yaml ✅ -│ │ ├── console-clusterrolebinding.yaml ✅ -│ │ ├── console-secret.yaml ✅ -│ │ ├── console-ingress.yaml ✅ -│ │ └── _helpers.tpl ✅ (已更新) -│ └── values.yaml ✅ (已更新) -└── console/ - ├── README.md ✅ - └── examples/ - ├── loadbalancer-example.md ✅ - └── ingress-tls-example.md ✅ -``` - -## 总结 - -Console 后端已完全集成到 Kubernetes 部署体系中: - -✅ Helm Chart 模板完整 -✅ RBAC 权限配置 -✅ Service、Ingress 支持 -✅ 健康检查、就绪探针 -✅ 安全配置(JWT Secret) -✅ 部署文档和示例 -✅ 多种部署方式支持 - -**状态:生产就绪,可部署到 Kubernetes 集群** 🚀 diff --git a/deploy/console/README.md b/deploy/console/README.md deleted file mode 100644 index 43d466b..0000000 --- a/deploy/console/README.md +++ /dev/null @@ -1,315 +0,0 @@ -# RustFS Operator Console Deployment Guide - -## Overview - -The RustFS Operator Console provides a web-based management interface for RustFS Tenants deployed in Kubernetes. It offers a REST API for managing tenants, viewing events, and monitoring cluster resources. - -## Architecture - -The Console is deployed as a separate Deployment alongside the Operator: -- **Operator**: Watches Tenant CRDs and reconciles Kubernetes resources -- **Console**: Provides REST API for management operations - -Both components use the same Docker image but run different commands: -- Operator: `./operator server` -- Console: `./operator console --port 9090` - -## Deployment Methods - -### Option 1: Helm Chart (Recommended) - -The Console is integrated into the main Helm chart and can be enabled via `values.yaml`. - -#### Install with Console enabled: - -```bash -helm install rustfs-operator deploy/rustfs-operator \ - --set console.enabled=true \ - --set console.service.type=LoadBalancer -``` - -#### Upgrade existing installation to enable Console: - -```bash -helm upgrade rustfs-operator deploy/rustfs-operator \ - --set console.enabled=true -``` - -#### Custom configuration: - -Create a `custom-values.yaml`: - -```yaml -console: - enabled: true - - # Number of replicas - replicas: 2 - - # JWT secret for session signing (recommended: generate with openssl rand -base64 32) - jwtSecret: "your-secure-random-secret-here" - - # Service configuration - service: - type: LoadBalancer - port: 9090 - annotations: - service.beta.kubernetes.io/aws-load-balancer-type: "nlb" - - # Ingress configuration - ingress: - enabled: true - className: nginx - annotations: - cert-manager.io/cluster-issuer: letsencrypt-prod - hosts: - - host: rustfs-console.example.com - paths: - - path: / - pathType: Prefix - tls: - - secretName: rustfs-console-tls - hosts: - - rustfs-console.example.com - - # Resource limits - resources: - requests: - cpu: 100m - memory: 128Mi - limits: - cpu: 500m - memory: 512Mi -``` - -Apply the configuration: - -```bash -helm upgrade --install rustfs-operator deploy/rustfs-operator \ - -f custom-values.yaml -``` - -### Option 2: kubectl apply (Standalone) - -For manual deployment or customization, you can use standalone YAML files. - -See `deploy/console/` directory for standalone deployment manifests. 
- -## Accessing the Console - -### Via Service (ClusterIP) - -```bash -# Port forward to local machine -kubectl port-forward svc/rustfs-operator-console 9090:9090 - -# Access at http://localhost:9090 -``` - -### Via LoadBalancer - -```bash -# Get the external IP -kubectl get svc rustfs-operator-console - -# Access at http://:9090 -``` - -### Via Ingress - -Access via the configured hostname (e.g., `https://rustfs-console.example.com`) - -## API Endpoints - -### Health & Readiness - -- `GET /healthz` - Health check -- `GET /readyz` - Readiness check - -### Authentication - -- `POST /api/v1/login` - Login with Kubernetes token - ```json - { - "token": "eyJhbGciOiJSUzI1NiIsImtpZCI6..." - } - ``` - -- `POST /api/v1/logout` - Logout and clear session -- `GET /api/v1/session` - Check session status - -### Tenant Management - -- `GET /api/v1/tenants` - List all tenants -- `GET /api/v1/namespaces/{ns}/tenants` - List tenants in namespace -- `GET /api/v1/namespaces/{ns}/tenants/{name}` - Get tenant details -- `POST /api/v1/namespaces/{ns}/tenants` - Create tenant -- `DELETE /api/v1/namespaces/{ns}/tenants/{name}` - Delete tenant - -### Events - -- `GET /api/v1/namespaces/{ns}/tenants/{name}/events` - List tenant events - -### Cluster Resources - -- `GET /api/v1/nodes` - List cluster nodes -- `GET /api/v1/namespaces` - List namespaces -- `POST /api/v1/namespaces` - Create namespace -- `GET /api/v1/cluster/resources` - Get cluster resource summary - -## Authentication - -The Console uses JWT-based authentication with Kubernetes ServiceAccount tokens: - -1. **Login**: Users provide their Kubernetes ServiceAccount token -2. **Validation**: Console validates the token by making a test API call to Kubernetes -3. **Session**: Console generates a JWT session token (12-hour expiry) -4. **Cookie**: Session token stored in HttpOnly cookie -5. **Authorization**: All API requests use the user's Kubernetes token for authorization - -### Getting a Kubernetes Token - -```bash -# Create a ServiceAccount -kubectl create serviceaccount console-user - -# Create ClusterRoleBinding (for admin access) -kubectl create clusterrolebinding console-user-admin \ - --clusterrole=cluster-admin \ - --serviceaccount=default:console-user - -# Get the token -kubectl create token console-user --duration=24h -``` - -### Login Example - -```bash -TOKEN=$(kubectl create token console-user --duration=24h) - -curl -X POST http://localhost:9090/api/v1/login \ - -H "Content-Type: application/json" \ - -d "{\"token\": \"$TOKEN\"}" \ - -c cookies.txt - -# Subsequent requests use the cookie -curl http://localhost:9090/api/v1/tenants \ - -b cookies.txt -``` - -## RBAC Permissions - -The Console ServiceAccount has the following permissions: - -- **Tenants**: Full CRUD operations -- **Namespaces**: List and create -- **Services, Pods, ConfigMaps, Secrets**: Read-only -- **Nodes**: Read-only -- **Events**: Read-only -- **StatefulSets**: Read-only -- **PersistentVolumeClaims**: Read-only - -Users authenticate with their own Kubernetes tokens, so actual permissions depend on the user's RBAC roles. - -## Security Considerations - -1. **JWT Secret**: Always set a strong random JWT secret in production - ```bash - openssl rand -base64 32 - ``` - -2. **TLS/HTTPS**: Enable Ingress with TLS for production deployments - -3. **Network Policies**: Restrict Console access to specific namespaces/pods - -4. **RBAC**: Console requires cluster-wide read access and tenant management permissions - -5. 
**Session Expiry**: Default 12-hour session timeout (configurable in code) - -6. **CORS**: Configure allowed origins based on your frontend deployment - -## Monitoring - -### Prometheus Metrics - -(To be implemented - placeholder for future enhancement) - -### Logs - -```bash -# View Console logs -kubectl logs -l app.kubernetes.io/component=console -f - -# Set log level -helm upgrade rustfs-operator deploy/rustfs-operator \ - --set console.logLevel=debug -``` - -## Troubleshooting - -### Console Pod Not Starting - -```bash -# Check pod status -kubectl get pods -l app.kubernetes.io/component=console - -# View events -kubectl describe pod -l app.kubernetes.io/component=console - -# Check logs -kubectl logs -l app.kubernetes.io/component=console -``` - -### Authentication Failures - -- Verify Kubernetes token is valid: `kubectl auth can-i get tenants --as=system:serviceaccount:default:console-user` -- Check Console ServiceAccount has proper RBAC permissions -- Verify JWT_SECRET is consistent across Console replicas - -### CORS Errors - -- Update CORS configuration in `src/console/server.rs` -- Rebuild and redeploy the image -- Or use Ingress annotations to handle CORS - -## Configuration Reference - -See `deploy/rustfs-operator/values.yaml` for complete configuration options: - -```yaml -console: - enabled: true|false # Enable/disable Console - replicas: 1 # Number of replicas - port: 9090 # Console port - logLevel: info # Log level - jwtSecret: "" # JWT signing secret - - image: - repository: rustfs/operator - tag: latest - pullPolicy: IfNotPresent - - resources: {} # Resource requests/limits - nodeSelector: {} # Node selection - tolerations: [] # Pod tolerations - affinity: {} # Pod affinity - - service: - type: ClusterIP # Service type - port: 9090 # Service port - - ingress: - enabled: false # Enable Ingress - className: "" # Ingress class - hosts: [] # Ingress hosts - tls: [] # TLS configuration -``` - -## Examples - -See `deploy/console/examples/` for: -- Basic deployment -- LoadBalancer service -- Ingress with TLS -- Multi-replica setup -- Custom RBAC roles diff --git a/deploy/console/examples/ingress-tls-example.md b/deploy/console/examples/ingress-tls-example.md deleted file mode 100644 index 0dc0e0c..0000000 --- a/deploy/console/examples/ingress-tls-example.md +++ /dev/null @@ -1,132 +0,0 @@ -# Example: Console with Ingress and TLS - -This example shows how to deploy the Console with Nginx Ingress and Let's Encrypt TLS certificates. - -## Prerequisites - -- Nginx Ingress Controller installed -- cert-manager installed for automatic TLS certificates -- DNS record pointing to your cluster - -## Configuration - -```yaml -# values-console-ingress.yaml -console: - enabled: true - replicas: 2 # For high availability - - # JWT secret (keep this secure!) 
- jwtSecret: "REPLACE_WITH_YOUR_SECRET_HERE" - - service: - type: ClusterIP # No need for LoadBalancer with Ingress - port: 9090 - - ingress: - enabled: true - className: nginx - annotations: - cert-manager.io/cluster-issuer: letsencrypt-prod - nginx.ingress.kubernetes.io/ssl-redirect: "true" - nginx.ingress.kubernetes.io/force-ssl-redirect: "true" - # Console uses cookies for auth - nginx.ingress.kubernetes.io/affinity: cookie - nginx.ingress.kubernetes.io/session-cookie-name: "console-session" - hosts: - - host: rustfs-console.example.com - paths: - - path: / - pathType: Prefix - tls: - - secretName: rustfs-console-tls - hosts: - - rustfs-console.example.com - - resources: - requests: - cpu: 100m - memory: 128Mi - limits: - cpu: 500m - memory: 512Mi - - # Pod anti-affinity for HA - affinity: - podAntiAffinity: - preferredDuringSchedulingIgnoredDuringExecution: - - weight: 100 - podAffinityTerm: - labelSelector: - matchLabels: - app.kubernetes.io/component: console - topologyKey: kubernetes.io/hostname -``` - -## Deploy - -```bash -# Create ClusterIssuer for Let's Encrypt (if not exists) -cat < Response { let (status, error_type, message, details) = match &self { - Error::Unauthorized { message } => { - (StatusCode::UNAUTHORIZED, "Unauthorized", message.clone(), None) - } + Error::Unauthorized { message } => ( + StatusCode::UNAUTHORIZED, + "Unauthorized", + message.clone(), + None, + ), Error::Forbidden { message } => { (StatusCode::FORBIDDEN, "Forbidden", message.clone(), None) } diff --git a/src/console/handlers/auth.rs b/src/console/handlers/auth.rs index e1c96a0..f451f19 100644 --- a/src/console/handlers/auth.rs +++ b/src/console/handlers/auth.rs @@ -12,13 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use axum::{ - extract::State, - http::header, - response::IntoResponse, - Extension, Json, -}; -use jsonwebtoken::{encode, EncodingKey, Header}; +use axum::{Extension, Json, extract::State, http::header, response::IntoResponse}; +use jsonwebtoken::{EncodingKey, Header, encode}; use kube::Client; use snafu::ResultExt; @@ -30,8 +25,11 @@ use crate::console::{ use crate::types::v1alpha1::tenant::Tenant; /// 登录处理 -/// -/// 验证 Kubernetes Token 并生成 Console Session Token +// TOKEN=$(kubectl create token rustfs-operator -n rustfs-system --duration=24h) + +// curl -X POST http://localhost:9090/api/v1/login \ +// -H "Content-Type: application/json" \ +// -d "{\"token\": \"$TOKEN\"}" pub async fn login( State(state): State, Json(req): Json, @@ -96,8 +94,8 @@ pub async fn logout() -> impl IntoResponse { /// 检查会话 pub async fn session_check(Extension(claims): Extension) -> Json { - let expires_at = chrono::DateTime::from_timestamp(claims.exp as i64, 0) - .map(|dt| dt.to_rfc3339()); + let expires_at = + chrono::DateTime::from_timestamp(claims.exp as i64, 0).map(|dt| dt.to_rfc3339()); Json(SessionResponse { valid: true, @@ -108,9 +106,11 @@ pub async fn session_check(Extension(claims): Extension) -> Json Result { // 使用默认配置加载 - let mut config = kube::Config::infer().await.map_err(|e| Error::InternalServer { - message: format!("Failed to load kubeconfig: {}", e), - })?; + let mut config = kube::Config::infer() + .await + .map_err(|e| Error::InternalServer { + message: format!("Failed to load kubeconfig: {}", e), + })?; // 覆盖 token config.auth_info.token = Some(token.to_string().into()); diff --git a/src/console/handlers/cluster.rs b/src/console/handlers/cluster.rs index bf0d50f..f6b2a22 100644 --- a/src/console/handlers/cluster.rs +++ b/src/console/handlers/cluster.rs @@ -14,7 +14,7 @@ use axum::{Extension, Json}; use k8s_openapi::api::core::v1 as corev1; -use kube::{api::ListParams, Api, Client, ResourceExt}; +use kube::{Api, Client, ResourceExt, api::ListParams}; use snafu::ResultExt; use crate::console::{ @@ -138,10 +138,7 @@ pub async fn list_namespaces( .as_ref() .and_then(|s| s.phase.clone()) .unwrap_or_else(|| "Unknown".to_string()), - created_at: ns - .metadata - .creation_timestamp - .map(|ts| ts.0.to_rfc3339()), + created_at: ns.metadata.creation_timestamp.map(|ts| ts.0.to_rfc3339()), }) .collect(); @@ -198,24 +195,24 @@ pub async fn get_cluster_resources( let total_nodes = nodes.items.len(); // 简化统计 (实际生产中需要更精确的计算) - let (total_cpu, total_memory, allocatable_cpu, allocatable_memory) = nodes - .items - .iter() - .fold( - (String::new(), String::new(), String::new(), String::new()), - |acc, node| { - // 这里简化处理,实际需要累加 Quantity - if let Some(status) = &node.status { - if let Some(capacity) = &status.capacity { - // 实际应该累加,这里仅作演示 - let cpu = capacity.get("cpu").map(|q| q.0.clone()).unwrap_or_default(); - let mem = capacity.get("memory").map(|q| q.0.clone()).unwrap_or_default(); - return (cpu, mem, acc.2, acc.3); - } + let (total_cpu, total_memory, allocatable_cpu, allocatable_memory) = nodes.items.iter().fold( + (String::new(), String::new(), String::new(), String::new()), + |acc, node| { + // 这里简化处理,实际需要累加 Quantity + if let Some(status) = &node.status { + if let Some(capacity) = &status.capacity { + // 实际应该累加,这里仅作演示 + let cpu = capacity.get("cpu").map(|q| q.0.clone()).unwrap_or_default(); + let mem = capacity + .get("memory") + .map(|q| q.0.clone()) + .unwrap_or_default(); + return (cpu, mem, acc.2, acc.3); } - acc - }, - ); + } + acc + }, + ); Ok(Json(ClusterResourcesResponse { total_nodes, @@ 
-228,9 +225,11 @@ pub async fn get_cluster_resources( /// 创建 Kubernetes 客户端 async fn create_client(claims: &Claims) -> Result { - let mut config = kube::Config::infer().await.map_err(|e| Error::InternalServer { - message: format!("Failed to load kubeconfig: {}", e), - })?; + let mut config = kube::Config::infer() + .await + .map_err(|e| Error::InternalServer { + message: format!("Failed to load kubeconfig: {}", e), + })?; config.auth_info.token = Some(claims.k8s_token.clone().into()); diff --git a/src/console/handlers/events.rs b/src/console/handlers/events.rs index f85125a..950a495 100644 --- a/src/console/handlers/events.rs +++ b/src/console/handlers/events.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use axum::{extract::Path, Extension, Json}; +use axum::{Extension, Json, extract::Path}; use k8s_openapi::api::core::v1 as corev1; -use kube::{api::ListParams, Api, Client}; +use kube::{Api, Client, api::ListParams}; use snafu::ResultExt; use crate::console::{ @@ -60,9 +60,11 @@ pub async fn list_tenant_events( /// 创建 Kubernetes 客户端 async fn create_client(claims: &Claims) -> Result { - let mut config = kube::Config::infer().await.map_err(|e| Error::InternalServer { - message: format!("Failed to load kubeconfig: {}", e), - })?; + let mut config = kube::Config::infer() + .await + .map_err(|e| Error::InternalServer { + message: format!("Failed to load kubeconfig: {}", e), + })?; config.auth_info.token = Some(claims.k8s_token.clone().into()); diff --git a/src/console/handlers/tenants.rs b/src/console/handlers/tenants.rs index 19dde46..f042817 100644 --- a/src/console/handlers/tenants.rs +++ b/src/console/handlers/tenants.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use axum::{extract::Path, Extension, Json}; +use axum::{Extension, Json, extract::Path}; use k8s_openapi::api::core::v1 as corev1; -use kube::{api::ListParams, Api, Client, ResourceExt}; +use kube::{Api, Client, ResourceExt, api::ListParams}; use snafu::ResultExt; use crate::console::{ @@ -24,8 +24,15 @@ use crate::console::{ }; use crate::types::v1alpha1::{persistence::PersistenceConfig, pool::Pool, tenant::Tenant}; -/// 列出所有 Tenants -pub async fn list_all_tenants(Extension(claims): Extension) -> Result> { +// curl -s -X POST http://localhost:9090/api/v1/login \ +// -H "Content-Type: application/json" \ +// -d "{\"token\": \"$(kubectl create token rustfs-operator-console -n rustfs-system --duration=24h)\"}" \ +// -c cookies.txt + +// curl -b cookies.txt http://localhost:9090/api/v1/tenants +pub async fn list_all_tenants( + Extension(claims): Extension, +) -> Result> { let client = create_client(&claims).await?; let api: Api = Api::all(client); @@ -55,10 +62,7 @@ pub async fn list_all_tenants(Extension(claims): Extension) -> Result Result { - let mut config = kube::Config::infer().await.map_err(|e| Error::InternalServer { - message: format!("Failed to load kubeconfig: {}", e), - })?; + let mut config = kube::Config::infer() + .await + .map_err(|e| Error::InternalServer { + message: format!("Failed to load kubeconfig: {}", e), + })?; config.auth_info.token = Some(claims.k8s_token.clone().into()); diff --git a/src/console/middleware/auth.rs b/src/console/middleware/auth.rs index 75a1c2c..ca657fe 100644 --- a/src/console/middleware/auth.rs +++ b/src/console/middleware/auth.rs @@ -14,11 +14,11 @@ use axum::{ extract::{Request, State}, - http::{header, StatusCode}, + http::{StatusCode, header}, middleware::Next, response::Response, }; -use jsonwebtoken::{decode, DecodingKey, Validation}; +use jsonwebtoken::{DecodingKey, Validation, decode}; use crate::console::state::{AppState, Claims}; @@ -72,16 +72,14 @@ pub async fn auth_middleware( /// 从 Cookie 字符串中解析 session token fn parse_session_cookie(cookies: &str) -> Option { - cookies - .split(';') - .find_map(|cookie| { - let parts: Vec<&str> = cookie.trim().splitn(2, '=').collect(); - if parts.len() == 2 && parts[0] == "session" { - Some(parts[1].to_string()) - } else { - None - } - }) + cookies.split(';').find_map(|cookie| { + let parts: Vec<&str> = cookie.trim().splitn(2, '=').collect(); + if parts.len() == 2 && parts[0] == "session" { + Some(parts[1].to_string()) + } else { + None + } + }) } #[cfg(test)] @@ -91,7 +89,10 @@ mod tests { #[test] fn test_parse_session_cookie() { let cookies = "session=test_token; other=value"; - assert_eq!(parse_session_cookie(cookies), Some("test_token".to_string())); + assert_eq!( + parse_session_cookie(cookies), + Some("test_token".to_string()) + ); let cookies = "other=value"; assert_eq!(parse_session_cookie(cookies), None); diff --git a/src/console/routes/mod.rs b/src/console/routes/mod.rs index 1a17877..cdfab9a 100644 --- a/src/console/routes/mod.rs +++ b/src/console/routes/mod.rs @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use axum::{routing::{delete, get, post, put}, Router}; +use axum::{ + Router, + routing::{delete, get, post}, +}; use crate::console::{handlers, state::AppState}; @@ -101,7 +104,10 @@ pub fn event_routes() -> Router { pub fn cluster_routes() -> Router { Router::new() .route("/cluster/nodes", get(handlers::cluster::list_nodes)) - .route("/cluster/resources", get(handlers::cluster::get_cluster_resources)) + .route( + "/cluster/resources", + get(handlers::cluster::get_cluster_resources), + ) .route("/namespaces", get(handlers::cluster::list_namespaces)) .route("/namespaces", post(handlers::cluster::create_namespace)) } diff --git a/src/console/server.rs b/src/console/server.rs index bdcf7ed..6840b18 100644 --- a/src/console/server.rs +++ b/src/console/server.rs @@ -12,21 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use axum::{ - middleware, - routing::get, - Router, - http::StatusCode, - response::IntoResponse, -}; -use tower_http::{ - compression::CompressionLayer, - cors::CorsLayer, - trace::TraceLayer, -}; +use crate::console::{routes, state::AppState}; use axum::http::{HeaderValue, Method, header}; - -use crate::console::{state::AppState, routes}; +use axum::{Router, http::StatusCode, middleware, response::IntoResponse, routing::get}; +use tower_http::{compression::CompressionLayer, cors::CorsLayer, trace::TraceLayer}; /// 启动 Console HTTP Server pub async fn run(port: u16) -> Result<(), Box> { @@ -53,7 +42,13 @@ pub async fn run(port: u16) -> Result<(), Box> { .layer( CorsLayer::new() .allow_origin("http://localhost:3000".parse::().unwrap()) - .allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE, Method::OPTIONS]) + .allow_methods([ + Method::GET, + Method::POST, + Method::PUT, + Method::DELETE, + Method::OPTIONS, + ]) .allow_headers([header::CONTENT_TYPE, header::AUTHORIZATION, header::COOKIE]) .allow_credentials(true), ) @@ -90,7 +85,10 @@ fn api_routes() -> Router { /// 健康检查 async fn health_check() -> impl IntoResponse { - (StatusCode::OK, "OK") + let since_epoch = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap(); + (StatusCode::OK, format!("OK: {}", since_epoch.as_secs())) } /// 就绪检查