diff --git a/backend/cmd/api/main.go b/backend/cmd/api/main.go
index 4f1a3be..641c4b0 100644
--- a/backend/cmd/api/main.go
+++ b/backend/cmd/api/main.go
@@ -284,6 +284,7 @@ func setupRouter(
protected.HandleFunc("/clusters/{cluster_id}/instances/{instance_id}", instanceHandler.DeleteInstance).Methods(http.MethodDelete)
protected.HandleFunc("/clusters/{cluster_id}/instances/{instance_id}/entries", instanceHandler.ListInstanceEntries).Methods(http.MethodGet)
protected.HandleFunc("/clusters/{cluster_id}/instances/{instance_id}/diagnostics", instanceHandler.GetInstanceDiagnostics).Methods(http.MethodGet)
+ protected.HandleFunc("/clusters/{cluster_id}/instances/{instance_id}/logs/stream", instanceHandler.StreamInstanceLogs).Methods(http.MethodGet)
// ===== Monitoring 路由 =====
protected.HandleFunc("/monitoring/clusters", monitoringHandler.ListClusterMonitoring).Methods(http.MethodGet)
diff --git a/backend/internal/adapter/input/http/dto/instance_dto.go b/backend/internal/adapter/input/http/dto/instance_dto.go
index b354055..8c70de5 100644
--- a/backend/internal/adapter/input/http/dto/instance_dto.go
+++ b/backend/internal/adapter/input/http/dto/instance_dto.go
@@ -2,23 +2,25 @@ package dto
// CreateInstanceRequest 创建实例请求
type CreateInstanceRequest struct {
- Name string `json:"name" binding:"required"`
- Namespace string `json:"namespace" binding:"required"`
- RegistryID string `json:"registryId" binding:"required"`
- RegistryIDAlt string `json:"registry_id"`
- Repository string `json:"repository" binding:"required"`
- Tag string `json:"tag" binding:"required"`
- Description string `json:"description"`
- Values map[string]interface{} `json:"values"`
- ValuesYAML string `json:"valuesYaml"`
+ Name string `json:"name" binding:"required"`
+ Namespace string `json:"namespace" binding:"required"`
+ RegistryID string `json:"registryId" binding:"required"`
+ RegistryIDAlt string `json:"registry_id"`
+ Repository string `json:"repository" binding:"required"`
+ Tag string `json:"tag" binding:"required"`
+ Description string `json:"description"`
+ Values map[string]interface{} `json:"values"`
+ ValuesYAML string `json:"valuesYaml"`
+ ValuesYAMLAlt string `json:"values_yaml"`
}
// UpdateInstanceRequest 更新实例请求
type UpdateInstanceRequest struct {
- Version string `json:"version"`
- Description string `json:"description"`
- Values map[string]interface{} `json:"values"`
- ValuesYAML string `json:"valuesYaml"`
+ Version string `json:"version"`
+ Description string `json:"description"`
+ Values map[string]interface{} `json:"values"`
+ ValuesYAML string `json:"valuesYaml"`
+ ValuesYAMLAlt string `json:"values_yaml"`
}
// Normalize 将多种命名风格的字段合并到统一字段
@@ -26,6 +28,16 @@ func (r *CreateInstanceRequest) Normalize() {
if r.RegistryID == "" {
r.RegistryID = r.RegistryIDAlt
}
+ if r.ValuesYAML == "" {
+ r.ValuesYAML = r.ValuesYAMLAlt
+ }
+}
+
+// Normalize 将多种命名风格的字段合并到统一字段
+func (r *UpdateInstanceRequest) Normalize() {
+ if r.ValuesYAML == "" {
+ r.ValuesYAML = r.ValuesYAMLAlt
+ }
}
// RollbackInstanceRequest 回滚实例请求
diff --git a/backend/internal/adapter/input/http/rest/instance_handler.go b/backend/internal/adapter/input/http/rest/instance_handler.go
index 7592a06..d3f4457 100644
--- a/backend/internal/adapter/input/http/rest/instance_handler.go
+++ b/backend/internal/adapter/input/http/rest/instance_handler.go
@@ -173,6 +173,7 @@ func (h *InstanceHandler) UpdateInstance(w http.ResponseWriter, r *http.Request)
respondError(w, http.StatusBadRequest, "Invalid request body", err.Error())
return
}
+ req.Normalize()
// 获取现有实例
instance, err := h.instanceService.GetInstance(r.Context(), instanceID)
@@ -281,6 +282,13 @@ func (h *InstanceHandler) GetInstanceDiagnostics(w http.ResponseWriter, r *http.
return
}
tailLines = parsed
+ } else if raw := strings.TrimSpace(r.URL.Query().Get("tail_lines")); raw != "" {
+ parsed, err := strconv.ParseInt(raw, 10, 64)
+ if err != nil || parsed < 0 {
+ respondError(w, http.StatusBadRequest, "Invalid tail_lines", "tail_lines must be a positive integer")
+ return
+ }
+ tailLines = parsed
}
diagnostics, err := h.instanceService.GetInstanceDiagnostics(r.Context(), clusterID, instanceID, tailLines)
@@ -298,6 +306,71 @@ func (h *InstanceHandler) GetInstanceDiagnostics(w http.ResponseWriter, r *http.
respondJSON(w, http.StatusOK, convertInstanceDiagnostics(diagnostics))
}
+func (h *InstanceHandler) StreamInstanceLogs(w http.ResponseWriter, r *http.Request) {
+ vars := mux.Vars(r)
+ clusterID := vars["cluster_id"]
+ instanceID := vars["instance_id"]
+
+ podName := strings.TrimSpace(r.URL.Query().Get("pod"))
+ containerName := strings.TrimSpace(r.URL.Query().Get("container"))
+ if podName == "" || containerName == "" {
+ respondError(w, http.StatusBadRequest, "Missing required query parameter", "both 'pod' and 'container' are required")
+ return
+ }
+
+ tailLines := int64(200)
+ if raw := strings.TrimSpace(r.URL.Query().Get("tailLines")); raw != "" {
+ parsed, err := strconv.ParseInt(raw, 10, 64)
+ if err != nil || parsed < 0 {
+ respondError(w, http.StatusBadRequest, "Invalid tailLines", "tailLines must be a positive integer")
+ return
+ }
+ tailLines = parsed
+ }
+
+ lines, errs, err := h.instanceService.StreamInstanceLogs(r.Context(), clusterID, instanceID, podName, containerName, tailLines)
+ if err != nil {
+ status := http.StatusInternalServerError
+ switch err {
+ case entity.ErrInstanceNotFound, entity.ErrClusterNotFound:
+ status = http.StatusNotFound
+ }
+ respondError(w, status, "Failed to stream instance logs", err.Error())
+ return
+ }
+
+ w.Header().Set("Content-Type", "text/event-stream")
+ w.Header().Set("Cache-Control", "no-cache")
+ w.Header().Set("Connection", "keep-alive")
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+
+ flusher, ok := w.(http.Flusher)
+ if !ok {
+ respondError(w, http.StatusInternalServerError, "Streaming not supported", "server does not support response flushing")
+ return
+ }
+
+ for {
+ select {
+ case <-r.Context().Done():
+ return
+ case line, open := <-lines:
+ if !open {
+ fmt.Fprintf(w, "data: [DONE]\n\n")
+ flusher.Flush()
+ return
+ }
+ fmt.Fprintf(w, "data: %s\n\n", line)
+ flusher.Flush()
+ case err, open := <-errs:
+ if open && err != nil {
+ fmt.Fprintf(w, "data: [ERROR] %s\n\n", err.Error())
+ flusher.Flush()
+ }
+ }
+ }
+}
+
func convertInstanceEntry(entry *entity.InstanceEntry) *dto.InstanceEntryResponse {
portResponses := make([]dto.InstanceEntryPortResponse, 0, len(entry.Ports))
for _, port := range entry.Ports {
diff --git a/backend/internal/adapter/output/k8s/diagnostics_client.go b/backend/internal/adapter/output/k8s/diagnostics_client.go
index 146d45f..8ff484f 100644
--- a/backend/internal/adapter/output/k8s/diagnostics_client.go
+++ b/backend/internal/adapter/output/k8s/diagnostics_client.go
@@ -1,6 +1,7 @@
package k8s
import (
+ "bufio"
"context"
"fmt"
"io"
@@ -36,6 +37,23 @@ func (*MockDiagnosticsClient) GetDiagnostics(ctx context.Context, cluster *entit
}, nil
}
+func (*MockDiagnosticsClient) StreamPodLogs(ctx context.Context, cluster *entity.Cluster, namespace, podName, containerName string, tailLines int64) (<-chan string, <-chan error, error) {
+ lines := make(chan string, 10)
+ errs := make(chan error, 1)
+ go func() {
+ defer close(lines)
+ defer close(errs)
+ select {
+ case <-ctx.Done():
+ return
+ case lines <- "[mock] Streaming pod logs...":
+ case lines <- "[mock] Container started successfully":
+ case lines <- "[mock] Listening on :8080":
+ }
+ }()
+ return lines, errs, nil
+}
+
func (c *DiagnosticsClient) GetDiagnostics(ctx context.Context, cluster *entity.Cluster, instance *entity.Instance, tailLines int64) (*entity.InstanceDiagnostics, error) {
clientset, err := diagnosticsClientset(cluster)
if err != nil {
@@ -73,6 +91,68 @@ func (c *DiagnosticsClient) GetDiagnostics(ctx context.Context, cluster *entity.
}, nil
}
+func (c *DiagnosticsClient) StreamPodLogs(ctx context.Context, cluster *entity.Cluster, namespace, podName, containerName string, tailLines int64) (<-chan string, <-chan error, error) {
+ clientset, err := diagnosticsClientset(cluster)
+ if err != nil {
+ return nil, nil, err
+ }
+ if tailLines <= 0 {
+ tailLines = 200
+ }
+ if tailLines > 2000 {
+ tailLines = 2000
+ }
+
+ req := clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
+ Container: containerName,
+ Follow: true,
+ TailLines: &tailLines,
+ })
+
+ stream, err := req.Stream(ctx)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to open log stream for %s/%s: %w", podName, containerName, err)
+ }
+
+ lines := make(chan string, 64)
+ errs := make(chan error, 1)
+
+ go func() {
+ defer close(lines)
+ defer close(errs)
+ defer func() { _ = stream.Close() }()
+
+ scanner := bufio.NewScanner(stream)
+ // Allow long lines; Kubernetes log entries can exceed the default 64 KiB
+ scanner.Buffer(make([]byte, 0, 64*1024), 2*1024*1024)
+
+ for scanner.Scan() {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+ line := scanner.Text()
+ if line == "" {
+ continue
+ }
+ select {
+ case lines <- line:
+ case <-ctx.Done():
+ return
+ }
+ }
+ if err := scanner.Err(); err != nil {
+ select {
+ case errs <- err:
+ case <-ctx.Done():
+ }
+ }
+ }()
+
+ return lines, errs, nil
+}
+
func diagnosticsClientset(cluster *entity.Cluster) (kubernetes.Interface, error) {
config, err := restConfigFromCluster(cluster)
if err != nil {
diff --git a/backend/internal/domain/repository/instance_diagnostics_client.go b/backend/internal/domain/repository/instance_diagnostics_client.go
index f04b98a..2f65c2b 100644
--- a/backend/internal/domain/repository/instance_diagnostics_client.go
+++ b/backend/internal/domain/repository/instance_diagnostics_client.go
@@ -9,3 +9,9 @@ import (
type InstanceDiagnosticsClient interface {
GetDiagnostics(ctx context.Context, cluster *entity.Cluster, instance *entity.Instance, tailLines int64) (*entity.InstanceDiagnostics, error)
}
+
+// PodLogStreamer streams pod log lines over channels. The caller reads from the
+// lines channel until it is closed; errors are sent to the errs channel.
+type PodLogStreamer interface {
+ StreamPodLogs(ctx context.Context, cluster *entity.Cluster, namespace, podName, containerName string, tailLines int64) (<-chan string, <-chan error, error)
+}
diff --git a/backend/internal/domain/service/instance_service.go b/backend/internal/domain/service/instance_service.go
index 9532648..d714b4f 100644
--- a/backend/internal/domain/service/instance_service.go
+++ b/backend/internal/domain/service/instance_service.go
@@ -395,6 +395,28 @@ func (s *InstanceService) GetInstanceDiagnostics(ctx context.Context, clusterID,
return s.diagClient.GetDiagnostics(ctx, cluster, instance, tailLines)
}
+func (s *InstanceService) StreamInstanceLogs(ctx context.Context, clusterID, instanceID, podName, containerName string, tailLines int64) (<-chan string, <-chan error, error) {
+ instance, err := s.GetInstance(ctx, instanceID)
+ if err != nil {
+ return nil, nil, entity.ErrInstanceNotFound
+ }
+ if instance.ClusterID != clusterID {
+ return nil, nil, entity.ErrInstanceNotFound
+ }
+ cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
+ if err != nil {
+ return nil, nil, entity.ErrClusterNotFound
+ }
+ if s.diagClient == nil {
+ return nil, nil, fmt.Errorf("instance diagnostics client is not configured")
+ }
+ streamer, ok := s.diagClient.(repository.PodLogStreamer)
+ if !ok {
+ return nil, nil, fmt.Errorf("diagnostics client does not support log streaming")
+ }
+ return streamer.StreamPodLogs(ctx, cluster, instance.Namespace, podName, containerName, tailLines)
+}
+
func (s *InstanceService) canReadInstance(principal *authz.Principal, instance *entity.Instance) bool {
if principal.IsAdmin() {
return true
@@ -418,9 +440,12 @@ func enforceNamespaceValues(instance *entity.Instance) {
}
instance.Values["namespace"] = instance.Namespace
setExistingStringValue(instance.Values, "namespaceOverride", instance.Namespace)
+ setExistingStringValue(instance.Values, "namespace_override", instance.Namespace)
setExistingStringValue(instance.Values, "targetNamespace", instance.Namespace)
+ setExistingStringValue(instance.Values, "target_namespace", instance.Namespace)
setExistingNestedStringValue(instance.Values, "global", "namespace", instance.Namespace)
setExistingNestedStringValue(instance.Values, "global", "namespaceOverride", instance.Namespace)
+ setExistingNestedStringValue(instance.Values, "global", "namespace_override", instance.Namespace)
}
func setExistingStringValue(values map[string]interface{}, key, namespace string) {
diff --git a/frontend/src/api/axios-mutator.ts b/frontend/src/api/axios-mutator.ts
index ce47bca..5d8d62e 100644
--- a/frontend/src/api/axios-mutator.ts
+++ b/frontend/src/api/axios-mutator.ts
@@ -28,9 +28,11 @@ const isTransformablePayload = (payload: unknown) => {
return typeof payload === "object";
};
+const SKIP_RECURSE_KEYS = new Set(["values", "valuesYaml"]);
+
AXIOS_INSTANCE.interceptors.request.use((config) => {
if (isTransformablePayload(config.data)) {
- config.data = keysToSnake(config.data);
+ config.data = keysToSnake(config.data, SKIP_RECURSE_KEYS);
}
if (isTransformablePayload(config.params)) {
config.params = keysToSnake(config.params);
diff --git a/frontend/src/api/index.ts b/frontend/src/api/index.ts
index 0e55bfb..c9da3a1 100644
--- a/frontend/src/api/index.ts
+++ b/frontend/src/api/index.ts
@@ -76,7 +76,7 @@ import type {
PutRegistriesRegistryIdPathParameters,
} from './generated-orval/api.schemas';
-import { customAxiosInstance } from './axios-mutator';
+import { AXIOS_INSTANCE, customAxiosInstance } from './axios-mutator';
import {
GithubComOcdpClusterServiceInternalAdapterInputHttpDtoInstanceResponseLastOperation as GeneratedInstanceLastOperationEnum,
@@ -247,6 +247,88 @@ export const getInstanceDiagnostics = (
params: options?.tailLines ? { tailLines: options.tailLines } : undefined,
});
+/**
+ * Stream pod logs via SSE from the backend.
+ * Returns an AbortController to cancel the stream at any time.
+ */
+export function streamInstanceLogs(
+ clusterId: string,
+ instanceId: string,
+ pod: string,
+ container: string,
+ tailLines: number = 200,
+ onLine: (line: string) => void,
+ onDone: () => void,
+ onError: (err: Error) => void,
+): AbortController {
+ const controller = new AbortController();
+ const baseUrl = AXIOS_INSTANCE.defaults.baseURL ?? "/api/v1";
+ const authHeader = AXIOS_INSTANCE.defaults.headers.common["Authorization"] as string | undefined;
+
+ const params = new URLSearchParams({ pod, container, tailLines: String(tailLines) });
+ const url = `${baseUrl}/clusters/${encodeURIComponent(clusterId)}/instances/${encodeURIComponent(instanceId)}/logs/stream?${params}`;
+
+ const headers: Record
{entry.error || entry.log || ""}
+const LogsTab = ({
+ data,
+ combinedLogs,
+ onCopy,
+ streamingKey,
+ streamingLines,
+ onStartStream,
+ onStopStream,
+}: {
+ data: InstanceDiagnosticsResponse;
+ combinedLogs: string;
+ onCopy: () => void;
+ streamingKey: string | null;
+ streamingLines: string[];
+ onStartStream: (pod: string, container: string) => void;
+ onStopStream: () => void;
+}) => {
+ const preRef = useRef
+ {isStreaming
+ ? streamingLines.join("\n") || "Waiting for log data..."
+ : entry.error || entry.log || ""}
+
+