Files
ocdp-go/backend/internal/adapter/input/http/rest/instance_handler.go
Ivan087 b88fe24aab fix: real K8s replicas in list API, full Helm values in modify YAML editor
- Add Replicas field to entity.Instance
- Add EnrichReplicas to InstanceService (batch K8s deployment query)
- convertInstanceResponse uses instance.Replicas instead of hardcoded 0
- ModifyModal: load full Helm values from values-diff API (Current deployed)
- Remove stale loadValuesDiff, use single useEffect for all data loading
- Fix YAML lineWidth:0 for no line wrapping
2026-05-13 16:15:11 +08:00

665 lines
20 KiB
Go

package rest
import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/gorilla/mux"
	"github.com/ocdp/cluster-service/internal/adapter/input/http/dto"
	"github.com/ocdp/cluster-service/internal/domain/entity"
	"github.com/ocdp/cluster-service/internal/domain/service"
	"gopkg.in/yaml.v3"
)
// InstanceHandler is the REST handler for the instance endpoints. Every
// handler method in this file delegates to the InstanceService it holds.
type InstanceHandler struct {
// instanceService is the domain service all handler methods call into.
instanceService *service.InstanceService
}
// NewInstanceHandler builds an InstanceHandler backed by the given
// InstanceService.
func NewInstanceHandler(instanceService *service.InstanceService) *InstanceHandler {
	handler := new(InstanceHandler)
	handler.instanceService = instanceService
	return handler
}
// CreateInstance decodes a dto.CreateInstanceRequest, builds an
// entity.Instance for the cluster named in the URL and hands it to the
// instance service. Responds 201 with the created instance (values
// included), or 400 when the body, the values YAML or the service call fails.
// @Summary Create an instance
// @Description Deploy an artifact on the specified cluster
// @Tags Instances
// @Accept json
// @Produce json
// @Security BearerAuth
// @Param cluster_id path string true "Cluster ID"
// @Param request body dto.CreateInstanceRequest true "Instance configuration"
// @Success 201 {object} dto.InstanceResponse
// @Failure 400 {object} dto.ErrorResponse
// @Router /clusters/{cluster_id}/instances [post]
func (h *InstanceHandler) CreateInstance(w http.ResponseWriter, r *http.Request) {
	targetCluster := mux.Vars(r)["cluster_id"]

	var payload dto.CreateInstanceRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&payload); decodeErr != nil {
		respondError(w, http.StatusBadRequest, "Invalid request body", decodeErr.Error())
		return
	}
	payload.Normalize()

	// The chart name is whatever follows the last "/" of the repository
	// (e.g. "charts/nginx" -> "nginx"). LastIndex yields -1 when there is no
	// slash, so the slice then starts at 0 and keeps the whole string.
	chartName := payload.Repository[strings.LastIndex(payload.Repository, "/")+1:]

	created := entity.NewInstance(
		targetCluster,
		payload.Name,
		payload.Namespace,
		payload.RegistryID,
		payload.Repository,
		chartName,
		payload.Tag, // the request tag is passed in the version position
	)
	created.Description = payload.Description

	if payload.Values != nil {
		created.SetValues(payload.Values)
	}
	if payload.ValuesYAML != "" {
		created.SetValuesYAML(payload.ValuesYAML)
	}
	// Structured values win; the YAML text is parsed into values only when no
	// structured values were supplied.
	if payload.ValuesYAML != "" && payload.Values == nil {
		parsed, parseErr := parseValuesYAML(payload.ValuesYAML)
		if parseErr != nil {
			respondError(w, http.StatusBadRequest, "Invalid values YAML", parseErr.Error())
			return
		}
		created.SetValues(parsed)
	}

	if createErr := h.instanceService.CreateInstance(r.Context(), created); createErr != nil {
		respondError(w, http.StatusBadRequest, "Failed to create instance", createErr.Error())
		return
	}
	respondJSON(w, http.StatusCreated, convertInstanceResponse(created, true))
}
// GetInstance returns the details of one instance, values included.
// Responds 404 both when the lookup fails and when the instance exists but
// belongs to a different cluster than the one in the URL.
// @Summary Get instance details
// @Tags Instances
// @Produce json
// @Security BearerAuth
// @Param cluster_id path string true "Cluster ID"
// @Param instance_id path string true "Instance ID"
// @Success 200 {object} dto.InstanceResponse
// @Failure 404 {object} dto.ErrorResponse
// @Router /clusters/{cluster_id}/instances/{instance_id} [get]
func (h *InstanceHandler) GetInstance(w http.ResponseWriter, r *http.Request) {
	params := mux.Vars(r)
	wantCluster := params["cluster_id"]

	found, lookupErr := h.instanceService.GetInstance(r.Context(), params["instance_id"])
	switch {
	case lookupErr != nil:
		respondError(w, http.StatusNotFound, "Instance not found", lookupErr.Error())
	case found.ClusterID != wantCluster:
		// Do not reveal instances through another cluster's URL.
		respondError(w, http.StatusNotFound, "Instance not found", "resource does not belong to cluster")
	default:
		respondJSON(w, http.StatusOK, convertInstanceResponse(found, true))
	}
}
// ListInstances lists every instance of the cluster named in the URL.
// Values are omitted from the list items; each item goes through
// EnrichReplicas first (per the original note, to fill in the running
// replica count from K8s).
// @Summary List instances
// @Tags Instances
// @Produce json
// @Security BearerAuth
// @Param cluster_id path string true "Cluster ID"
// @Success 200 {object} dto.InstanceListResponse
// @Failure 500 {object} dto.ErrorResponse
// @Router /clusters/{cluster_id}/instances [get]
func (h *InstanceHandler) ListInstances(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	cluster := mux.Vars(r)["cluster_id"]

	found, listErr := h.instanceService.ListInstancesByCluster(ctx, cluster)
	if listErr != nil {
		respondServiceError(w, listErr, "Failed to list instances")
		return
	}

	enriched := h.instanceService.EnrichReplicas(ctx, cluster, found)

	items := make([]*dto.InstanceResponse, 0, len(enriched))
	for _, item := range enriched {
		items = append(items, convertInstanceResponse(item, false))
	}

	respondJSON(w, http.StatusOK, &dto.InstanceListResponse{
		Instances: items,
		Total:     len(items),
	})
}
// UpdateInstance applies a dto.UpdateInstanceRequest to an existing instance
// and passes the result to the instance service.
//
// A non-empty version upgrades the instance (together with the structured
// values); otherwise structured values, when present, replace the current
// ones. A non-empty description and values YAML are applied on top; the YAML
// is parsed into values only when no structured values were sent.
//
// Responds 400 for an undecodable body, invalid values YAML or a failed
// service call, and 404 when the instance cannot be loaded or does not belong
// to the cluster named in the URL.
// @Summary Update an instance
// @Tags Instances
// @Accept json
// @Produce json
// @Security BearerAuth
// @Param cluster_id path string true "Cluster ID"
// @Param instance_id path string true "Instance ID"
// @Param request body dto.UpdateInstanceRequest true "Update payload"
// @Success 200 {object} dto.InstanceResponse
// @Failure 404 {object} dto.ErrorResponse
// @Router /clusters/{cluster_id}/instances/{instance_id} [put]
func (h *InstanceHandler) UpdateInstance(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	clusterID := vars["cluster_id"]
	instanceID := vars["instance_id"]

	var req dto.UpdateInstanceRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		respondError(w, http.StatusBadRequest, "Invalid request body", err.Error())
		return
	}
	req.Normalize()

	// Load the existing instance.
	instance, err := h.instanceService.GetInstance(r.Context(), instanceID)
	if err != nil {
		respondError(w, http.StatusNotFound, "Instance not found", err.Error())
		return
	}
	// Same ownership rule as GetInstance: an instance must not be reachable
	// (and here, modifiable) through another cluster's URL.
	if instance.ClusterID != clusterID {
		respondError(w, http.StatusNotFound, "Instance not found", "resource does not belong to cluster")
		return
	}

	// Validate the YAML before touching the entity so that a rejected request
	// leaves the loaded instance unmodified.
	applyYAMLValues := req.ValuesYAML != "" && req.Values == nil
	var yamlValues map[string]interface{}
	if applyYAMLValues {
		yamlValues, err = parseValuesYAML(req.ValuesYAML)
		if err != nil {
			respondError(w, http.StatusBadRequest, "Invalid values YAML", err.Error())
			return
		}
	}

	// Apply the requested changes.
	if req.Version != "" {
		instance.Upgrade(req.Version, req.Values)
	} else if req.Values != nil {
		instance.SetValues(req.Values)
	}
	if req.Description != "" {
		instance.Description = req.Description
	}
	if req.ValuesYAML != "" {
		instance.SetValuesYAML(req.ValuesYAML)
		if applyYAMLValues {
			instance.SetValues(yamlValues)
		}
	}

	// Hand over to the domain service.
	if err := h.instanceService.UpdateInstance(r.Context(), instance); err != nil {
		respondError(w, http.StatusBadRequest, "Failed to update instance", err.Error())
		return
	}
	respondJSON(w, http.StatusOK, convertInstanceResponse(instance, true))
}
// DeleteInstance deletes one instance and responds 204 with no body.
//
// The instance is loaded first so that it can only be deleted through the URL
// of the cluster it belongs to (the same ownership rule GetInstance applies).
// Every failure is reported as 404.
// @Summary Delete an instance
// @Tags Instances
// @Produce json
// @Security BearerAuth
// @Param cluster_id path string true "Cluster ID"
// @Param instance_id path string true "Instance ID"
// @Success 204 {string} string "No Content"
// @Failure 404 {object} dto.ErrorResponse
// @Router /clusters/{cluster_id}/instances/{instance_id} [delete]
func (h *InstanceHandler) DeleteInstance(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	clusterID := vars["cluster_id"]
	instanceID := vars["instance_id"]

	instance, err := h.instanceService.GetInstance(r.Context(), instanceID)
	if err != nil {
		// Keep the message the endpoint has always used for a missing instance.
		respondError(w, http.StatusNotFound, "Failed to delete instance", err.Error())
		return
	}
	if instance.ClusterID != clusterID {
		respondError(w, http.StatusNotFound, "Instance not found", "resource does not belong to cluster")
		return
	}

	if err := h.instanceService.DeleteInstance(r.Context(), instanceID); err != nil {
		respondError(w, http.StatusNotFound, "Failed to delete instance", err.Error())
		return
	}
	w.WriteHeader(http.StatusNoContent)
}
// ListInstanceEntries returns the Service/Ingress entries of an instance as a
// JSON array.
//
// Service errors are mapped with errors.Is so that wrapped sentinel errors
// are still recognised: instance/cluster not found -> 404, forbidden -> 403
// (matching GetInstanceDiagnostics), anything else -> 500.
// @Summary Get the Service/Ingress entries of an instance
// @Tags Instances
// @Produce json
// @Security BearerAuth
// @Param cluster_id path string true "Cluster ID"
// @Param instance_id path string true "Instance ID"
// @Success 200 {array} dto.InstanceEntryResponse
// @Failure 404 {object} dto.ErrorResponse
// @Router /clusters/{cluster_id}/instances/{instance_id}/entries [get]
func (h *InstanceHandler) ListInstanceEntries(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	clusterID := vars["cluster_id"]
	instanceID := vars["instance_id"]

	entries, err := h.instanceService.ListInstanceEntries(r.Context(), clusterID, instanceID)
	if err != nil {
		status := http.StatusInternalServerError
		switch {
		case errors.Is(err, entity.ErrInstanceNotFound), errors.Is(err, entity.ErrClusterNotFound):
			status = http.StatusNotFound
		case errors.Is(err, entity.ErrForbidden):
			status = http.StatusForbidden
		}
		respondError(w, status, "Failed to list instance entries", err.Error())
		return
	}

	// Pre-sized and non-nil so an instance without entries encodes as [].
	responses := make([]*dto.InstanceEntryResponse, 0, len(entries))
	for _, entry := range entries {
		responses = append(responses, convertInstanceEntry(entry))
	}
	respondJSON(w, http.StatusOK, responses)
}
// GetInstanceDiagnostics returns the diagnostics the instance service
// collects for one instance.
//
// The tail length passed to the service defaults to 200 and can be overridden
// with the "tailLines" query parameter or, when that one is absent or blank,
// its "tail_lines" alias. A value that is not an integer or is negative
// yields 400. NOTE(review): zero is accepted although the error text says
// "positive"; the wording is kept unchanged for existing clients.
//
// Service errors are mapped with errors.Is so that wrapped sentinel errors
// are still recognised: instance/cluster not found -> 404, forbidden -> 403,
// anything else -> 500.
func (h *InstanceHandler) GetInstanceDiagnostics(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	clusterID := vars["cluster_id"]
	instanceID := vars["instance_id"]

	tailLines := int64(200)
	query := r.URL.Query()
	// The first non-blank spelling decides; later ones are not inspected.
	for _, key := range []string{"tailLines", "tail_lines"} {
		raw := strings.TrimSpace(query.Get(key))
		if raw == "" {
			continue
		}
		parsed, err := strconv.ParseInt(raw, 10, 64)
		if err != nil || parsed < 0 {
			respondError(w, http.StatusBadRequest, "Invalid "+key, key+" must be a positive integer")
			return
		}
		tailLines = parsed
		break
	}

	diagnostics, err := h.instanceService.GetInstanceDiagnostics(r.Context(), clusterID, instanceID, tailLines)
	if err != nil {
		status := http.StatusInternalServerError
		switch {
		case errors.Is(err, entity.ErrInstanceNotFound), errors.Is(err, entity.ErrClusterNotFound):
			status = http.StatusNotFound
		case errors.Is(err, entity.ErrForbidden):
			status = http.StatusForbidden
		}
		respondError(w, status, "Failed to collect instance diagnostics", err.Error())
		return
	}
	respondJSON(w, http.StatusOK, convertInstanceDiagnostics(diagnostics))
}
// StreamInstanceLogs streams the logs of one container as server-sent events.
//
// Query parameters: "pod" and "container" are required; "tailLines" is
// optional (default 200) and must parse as a non-negative integer.
//
// Each value received from the service's line channel is written as a
// "data: <line>" event. A value received from the error channel is written as
// "data: [ERROR] <message>" and the stream keeps going. When the line channel
// is closed a final "data: [DONE]" event is sent. The handler also returns as
// soon as the request context is done.
//
// Failures before streaming starts: 400 for bad parameters, 500 when the
// ResponseWriter cannot flush, 404/403/500 for service errors (mapped with
// errors.Is so wrapped sentinel errors are recognised).
func (h *InstanceHandler) StreamInstanceLogs(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	clusterID := vars["cluster_id"]
	instanceID := vars["instance_id"]

	query := r.URL.Query()
	podName := strings.TrimSpace(query.Get("pod"))
	containerName := strings.TrimSpace(query.Get("container"))
	if podName == "" || containerName == "" {
		respondError(w, http.StatusBadRequest, "Missing required query parameter", "both 'pod' and 'container' are required")
		return
	}

	tailLines := int64(200)
	if raw := strings.TrimSpace(query.Get("tailLines")); raw != "" {
		parsed, err := strconv.ParseInt(raw, 10, 64)
		if err != nil || parsed < 0 {
			respondError(w, http.StatusBadRequest, "Invalid tailLines", "tailLines must be a positive integer")
			return
		}
		tailLines = parsed
	}

	// Check for flushing support before opening the upstream log stream and
	// before any SSE header is set, so the failure path neither starts work
	// it cannot deliver nor answers with an event-stream content type.
	flusher, ok := w.(http.Flusher)
	if !ok {
		respondError(w, http.StatusInternalServerError, "Streaming not supported", "server does not support response flushing")
		return
	}

	lines, errs, err := h.instanceService.StreamInstanceLogs(r.Context(), clusterID, instanceID, podName, containerName, tailLines)
	if err != nil {
		status := http.StatusInternalServerError
		switch {
		case errors.Is(err, entity.ErrInstanceNotFound), errors.Is(err, entity.ErrClusterNotFound):
			status = http.StatusNotFound
		case errors.Is(err, entity.ErrForbidden):
			status = http.StatusForbidden
		}
		respondError(w, status, "Failed to stream instance logs", err.Error())
		return
	}

	header := w.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	// NOTE(review): a wildcard origin on an endpoint documented elsewhere in
	// this file as BearerAuth-protected deserves a second look; kept as-is
	// because CORS policy may be owned by middleware outside this file.
	header.Set("Access-Control-Allow-Origin", "*")

	ctx := r.Context()
	for {
		select {
		case <-ctx.Done():
			return
		case line, open := <-lines:
			if !open {
				fmt.Fprint(w, "data: [DONE]\n\n")
				flusher.Flush()
				return
			}
			fmt.Fprintf(w, "data: %s\n\n", line)
			flusher.Flush()
		case streamErr, open := <-errs:
			if !open {
				// A closed channel is always ready to receive; without this
				// the select would spin at full CPU until lines is closed.
				// Receiving from a nil channel blocks forever, which takes
				// this case out of the select.
				errs = nil
				continue
			}
			if streamErr != nil {
				fmt.Fprintf(w, "data: [ERROR] %s\n\n", streamErr.Error())
				flusher.Flush()
			}
		}
	}
}
// ScaleInstance scales an instance to the replica count given in a
// dto.ScaleInstanceRequest. Negative counts are rejected with 400. On success
// the response carries the instance (values included) with its Replicas field
// set to the requested count, plus a human-readable message.
func (h *InstanceHandler) ScaleInstance(w http.ResponseWriter, r *http.Request) {
	params := mux.Vars(r)
	cluster, target := params["cluster_id"], params["instance_id"]

	var body dto.ScaleInstanceRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		respondError(w, http.StatusBadRequest, "Invalid request body", decodeErr.Error())
		return
	}
	if body.Replicas < 0 {
		respondError(w, http.StatusBadRequest, "Invalid replicas", "replicas must be >= 0")
		return
	}

	scaled, scaleErr := h.instanceService.ScaleInstance(r.Context(), cluster, target, body.Replicas, body.Workload)
	if scaleErr != nil {
		respondServiceError(w, scaleErr, "Failed to scale instance")
		return
	}

	// Report the requested count rather than whatever the entity carries.
	summary := convertInstanceResponse(scaled, true)
	summary.Replicas = body.Replicas

	respondJSON(w, http.StatusOK, dto.ScaleInstanceResponse{
		Instance: summary,
		Replicas: body.Replicas,
		Message:  fmt.Sprintf("Scaled to %d replicas", body.Replicas),
	})
}
// GetInstanceValuesDiff returns the values diff that the instance service
// computes for an instance, encoded as JSON exactly as the service returns it.
func (h *InstanceHandler) GetInstanceValuesDiff(w http.ResponseWriter, r *http.Request) {
	params := mux.Vars(r)

	result, diffErr := h.instanceService.GetInstanceValuesDiff(r.Context(), params["cluster_id"], params["instance_id"])
	if diffErr != nil {
		respondServiceError(w, diffErr, "Failed to get values diff")
		return
	}
	respondJSON(w, http.StatusOK, result)
}
// convertInstanceEntry maps an entity.InstanceEntry onto its DTO. Ports,
// hosts (with their paths) and TLS blocks are copied element by element into
// freshly allocated, non-nil slices; the remaining fields are copied as-is.
// The entry must not be nil.
func convertInstanceEntry(entry *entity.InstanceEntry) *dto.InstanceEntryResponse {
	ports := make([]dto.InstanceEntryPortResponse, len(entry.Ports))
	for i, p := range entry.Ports {
		ports[i] = dto.InstanceEntryPortResponse{
			Name:       p.Name,
			Protocol:   p.Protocol,
			Port:       p.Port,
			TargetPort: p.TargetPort,
			NodePort:   p.NodePort,
		}
	}

	hosts := make([]dto.InstanceEntryHostResponse, len(entry.Hosts))
	for i, rule := range entry.Hosts {
		routes := make([]dto.InstanceEntryPathResponse, len(rule.Paths))
		for j, route := range rule.Paths {
			routes[j] = dto.InstanceEntryPathResponse{
				Path:        route.Path,
				ServiceName: route.ServiceName,
				ServicePort: route.ServicePort,
			}
		}
		hosts[i] = dto.InstanceEntryHostResponse{
			Host:  rule.Host,
			Paths: routes,
		}
	}

	certs := make([]dto.InstanceEntryTLSResponse, len(entry.TLS))
	for i, cert := range entry.TLS {
		certs[i] = dto.InstanceEntryTLSResponse{
			Hosts:      cert.Hosts,
			SecretName: cert.SecretName,
		}
	}

	out := new(dto.InstanceEntryResponse)
	out.Kind = entry.Kind
	out.Name = entry.Name
	out.Namespace = entry.Namespace
	out.Type = entry.Type
	out.ClusterIP = entry.ClusterIP
	out.ExternalIPs = entry.ExternalIPs
	out.LoadBalancerIngress = entry.LoadBalancerIngress
	out.Ports = ports
	out.Hosts = hosts
	out.TLS = certs
	return out
}
// convertInstanceDiagnostics maps entity.InstanceDiagnostics onto its DTO.
// A nil input yields an empty response. Pods (with containers and
// conditions), services (with ports), events and logs are copied element by
// element into non-nil slices; timestamps are rendered with formatTime.
func convertInstanceDiagnostics(diagnostics *entity.InstanceDiagnostics) *dto.InstanceDiagnosticsResponse {
	if diagnostics == nil {
		return &dto.InstanceDiagnosticsResponse{}
	}

	podItems := make([]dto.InstancePodDiagnostics, len(diagnostics.Pods))
	for i, p := range diagnostics.Pods {
		containerItems := make([]dto.InstanceContainerDiagnostics, len(p.Containers))
		for j, c := range p.Containers {
			containerItems[j] = dto.InstanceContainerDiagnostics{
				Name:         c.Name,
				Image:        c.Image,
				Ready:        c.Ready,
				RestartCount: c.RestartCount,
				State:        c.State,
				Reason:       c.Reason,
				Message:      c.Message,
			}
		}

		conditionItems := make([]dto.InstanceConditionDiagnostics, len(p.Conditions))
		for j, cond := range p.Conditions {
			conditionItems[j] = dto.InstanceConditionDiagnostics{
				Type:    cond.Type,
				Status:  cond.Status,
				Reason:  cond.Reason,
				Message: cond.Message,
			}
		}

		podItems[i] = dto.InstancePodDiagnostics{
			Name:              p.Name,
			Namespace:         p.Namespace,
			Phase:             p.Phase,
			NodeName:          p.NodeName,
			PodIP:             p.PodIP,
			HostIP:            p.HostIP,
			RestartCount:      p.RestartCount,
			Containers:        containerItems,
			Conditions:        conditionItems,
			CreationTimestamp: formatTime(p.CreationTimestamp),
		}
	}

	serviceItems := make([]dto.InstanceServiceDiagnostics, len(diagnostics.Services))
	for i, s := range diagnostics.Services {
		portItems := make([]dto.InstanceEntryPortResponse, len(s.Ports))
		for j, sp := range s.Ports {
			portItems[j] = dto.InstanceEntryPortResponse{
				Name:       sp.Name,
				Protocol:   sp.Protocol,
				Port:       sp.Port,
				TargetPort: sp.TargetPort,
				NodePort:   sp.NodePort,
			}
		}
		serviceItems[i] = dto.InstanceServiceDiagnostics{
			Name:      s.Name,
			Namespace: s.Namespace,
			Type:      s.Type,
			ClusterIP: s.ClusterIP,
			Ports:     portItems,
		}
	}

	eventItems := make([]dto.InstanceEventDiagnostics, len(diagnostics.Events))
	for i, ev := range diagnostics.Events {
		eventItems[i] = dto.InstanceEventDiagnostics{
			Type:           ev.Type,
			Reason:         ev.Reason,
			Message:        ev.Message,
			InvolvedKind:   ev.InvolvedKind,
			InvolvedName:   ev.InvolvedName,
			Count:          ev.Count,
			FirstTimestamp: formatTime(ev.FirstTimestamp),
			LastTimestamp:  formatTime(ev.LastTimestamp),
		}
	}

	logItems := make([]dto.InstancePodLogResponse, len(diagnostics.Logs))
	for i, entry := range diagnostics.Logs {
		logItems[i] = dto.InstancePodLogResponse{
			Pod:       entry.Pod,
			Container: entry.Container,
			TailLines: entry.TailLines,
			Log:       entry.Log,
			Error:     entry.Error,
		}
	}

	out := new(dto.InstanceDiagnosticsResponse)
	out.InstanceName = diagnostics.InstanceName
	out.Namespace = diagnostics.Namespace
	out.Pods = podItems
	out.Services = serviceItems
	out.Events = eventItems
	out.Logs = logItems
	out.CollectedAt = formatTime(diagnostics.CollectedAt)
	return out
}
// formatTime renders value in RFC 3339 form, or returns the empty string for
// the zero time so that unset timestamps do not show up as year 1.
func formatTime(value time.Time) string {
	if !value.IsZero() {
		return value.Format(time.RFC3339)
	}
	return ""
}
// convertInstanceResponse maps an entity.Instance onto its DTO. Values are
// copied only when includeValues is true. AllowedActions is always the fixed
// set view/update/delete, and both timestamps are always formatted, zero or
// not. The instance must not be nil.
func convertInstanceResponse(instance *entity.Instance, includeValues bool) *dto.InstanceResponse {
	// Same layout string as time.RFC3339.
	const timestampLayout = "2006-01-02T15:04:05Z07:00"

	out := new(dto.InstanceResponse)

	// Identity and placement.
	out.ID = instance.ID
	out.ClusterID = instance.ClusterID
	out.Name = instance.Name
	out.Namespace = instance.Namespace
	out.WorkspaceID = instance.WorkspaceID
	out.OwnerID = instance.OwnerID

	// What is deployed.
	out.RegistryID = instance.RegistryID
	out.Repository = instance.Repository
	out.Chart = instance.Chart
	out.Version = instance.Version
	out.Description = instance.Description
	out.Revision = instance.Revision
	out.Replicas = instance.Replicas

	// Lifecycle state.
	out.Status = string(instance.Status)
	out.StatusReason = instance.StatusReason
	out.LastOperation = string(instance.LastOperation)
	out.LastError = instance.LastError

	out.AllowedActions = []string{"view", "update", "delete"}
	out.CreatedAt = instance.CreatedAt.Format(timestampLayout)
	out.UpdatedAt = instance.UpdatedAt.Format(timestampLayout)

	if includeValues {
		out.Values = instance.Values
	}
	return out
}
// parseValuesYAML decodes a values document into a map keyed by strings.
//
// Blank input, a document holding only comments, and an explicit null
// document all yield an empty, non-nil map: each is a values file that sets
// nothing. Any other document must be a mapping at the top level, and every
// mapping key at any depth must be a string (see normalizeYAMLValue);
// otherwise an error is returned together with a nil map.
func parseValuesYAML(valuesYAML string) (map[string]interface{}, error) {
	valuesYAML = strings.TrimSpace(valuesYAML)
	if valuesYAML == "" {
		return map[string]interface{}{}, nil
	}

	var decoded interface{}
	if err := yaml.Unmarshal([]byte(valuesYAML), &decoded); err != nil {
		return nil, err
	}
	// Nothing was decoded (comment-only or null document). Without this
	// guard the type assertion below would reject a valid, empty values file.
	if decoded == nil {
		return map[string]interface{}{}, nil
	}

	normalized, err := normalizeYAMLValue(decoded)
	if err != nil {
		return nil, err
	}
	values, ok := normalized.(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("values YAML must be a mapping at the top level")
	}
	return values, nil
}
// normalizeYAMLValue returns a deep copy of a decoded YAML value in which
// every mapping is a map[string]interface{}. Mappings and sequences are
// rebuilt recursively; any other value is returned unchanged. A mapping key
// that is not a string produces an error.
func normalizeYAMLValue(value interface{}) (interface{}, error) {
	if stringKeyed, ok := value.(map[string]interface{}); ok {
		out := make(map[string]interface{}, len(stringKeyed))
		for name, raw := range stringKeyed {
			converted, err := normalizeYAMLValue(raw)
			if err != nil {
				return nil, err
			}
			out[name] = converted
		}
		return out, nil
	}

	if looseKeyed, ok := value.(map[interface{}]interface{}); ok {
		out := make(map[string]interface{}, len(looseKeyed))
		for rawKey, raw := range looseKeyed {
			name, isString := rawKey.(string)
			if !isString {
				return nil, fmt.Errorf("values YAML contains non-string key %v", rawKey)
			}
			converted, err := normalizeYAMLValue(raw)
			if err != nil {
				return nil, err
			}
			out[name] = converted
		}
		return out, nil
	}

	if sequence, ok := value.([]interface{}); ok {
		out := make([]interface{}, len(sequence))
		for i, raw := range sequence {
			converted, err := normalizeYAMLValue(raw)
			if err != nil {
				return nil, err
			}
			out[i] = converted
		}
		return out, nil
	}

	// Scalars (and nil) need no conversion.
	return value, nil
}