feat: fix YAML field conversion, admin namespace, streaming logs, and vllm-serve deploy
- Fix Axios keysToSnake converting user values map keys (gpuMem -> gpu_mem) - Add skipRecurseKeys to keysToSnake for values/valuesYaml fields - Add values_yaml alt json tag and Normalize() in DTOs - Check both camelCase/snake_case in enforceNamespaceValues - Read both tailLines/tail_lines query param for diagnostics - Admin users can freely choose namespace in LaunchModal (free-text input) - Block only kube-system/kube-public/kube-node-lease for admin - Regular users keep existing namespace restrictions - Add SSE streaming pod logs endpoint (backend + frontend) - New PodLogStreamer interface and K8s Follow:true implementation - SSE handler with text/event-stream output - Frontend DiagnosticsModal: Stream button, auto-scroll, live indicator - Remove per-card Refresh button from InstanceCard (redundant with page refresh) - Deploy bge-m3 on vllm-serve 0.6.0 (gpuMem=10000, status=deployed)
This commit is contained in:
@ -2,23 +2,25 @@ package dto
|
||||
|
||||
// CreateInstanceRequest 创建实例请求
|
||||
type CreateInstanceRequest struct {
|
||||
Name string `json:"name" binding:"required"`
|
||||
Namespace string `json:"namespace" binding:"required"`
|
||||
RegistryID string `json:"registryId" binding:"required"`
|
||||
RegistryIDAlt string `json:"registry_id"`
|
||||
Repository string `json:"repository" binding:"required"`
|
||||
Tag string `json:"tag" binding:"required"`
|
||||
Description string `json:"description"`
|
||||
Values map[string]interface{} `json:"values"`
|
||||
ValuesYAML string `json:"valuesYaml"`
|
||||
Name string `json:"name" binding:"required"`
|
||||
Namespace string `json:"namespace" binding:"required"`
|
||||
RegistryID string `json:"registryId" binding:"required"`
|
||||
RegistryIDAlt string `json:"registry_id"`
|
||||
Repository string `json:"repository" binding:"required"`
|
||||
Tag string `json:"tag" binding:"required"`
|
||||
Description string `json:"description"`
|
||||
Values map[string]interface{} `json:"values"`
|
||||
ValuesYAML string `json:"valuesYaml"`
|
||||
ValuesYAMLAlt string `json:"values_yaml"`
|
||||
}
|
||||
|
||||
// UpdateInstanceRequest 更新实例请求
|
||||
type UpdateInstanceRequest struct {
|
||||
Version string `json:"version"`
|
||||
Description string `json:"description"`
|
||||
Values map[string]interface{} `json:"values"`
|
||||
ValuesYAML string `json:"valuesYaml"`
|
||||
Version string `json:"version"`
|
||||
Description string `json:"description"`
|
||||
Values map[string]interface{} `json:"values"`
|
||||
ValuesYAML string `json:"valuesYaml"`
|
||||
ValuesYAMLAlt string `json:"values_yaml"`
|
||||
}
|
||||
|
||||
// Normalize 将多种命名风格的字段合并到统一字段
|
||||
@ -26,6 +28,16 @@ func (r *CreateInstanceRequest) Normalize() {
|
||||
if r.RegistryID == "" {
|
||||
r.RegistryID = r.RegistryIDAlt
|
||||
}
|
||||
if r.ValuesYAML == "" {
|
||||
r.ValuesYAML = r.ValuesYAMLAlt
|
||||
}
|
||||
}
|
||||
|
||||
// Normalize 将多种命名风格的字段合并到统一字段
|
||||
func (r *UpdateInstanceRequest) Normalize() {
|
||||
if r.ValuesYAML == "" {
|
||||
r.ValuesYAML = r.ValuesYAMLAlt
|
||||
}
|
||||
}
|
||||
|
||||
// RollbackInstanceRequest 回滚实例请求
|
||||
|
||||
@ -173,6 +173,7 @@ func (h *InstanceHandler) UpdateInstance(w http.ResponseWriter, r *http.Request)
|
||||
respondError(w, http.StatusBadRequest, "Invalid request body", err.Error())
|
||||
return
|
||||
}
|
||||
req.Normalize()
|
||||
|
||||
// 获取现有实例
|
||||
instance, err := h.instanceService.GetInstance(r.Context(), instanceID)
|
||||
@ -281,6 +282,13 @@ func (h *InstanceHandler) GetInstanceDiagnostics(w http.ResponseWriter, r *http.
|
||||
return
|
||||
}
|
||||
tailLines = parsed
|
||||
} else if raw := strings.TrimSpace(r.URL.Query().Get("tail_lines")); raw != "" {
|
||||
parsed, err := strconv.ParseInt(raw, 10, 64)
|
||||
if err != nil || parsed < 0 {
|
||||
respondError(w, http.StatusBadRequest, "Invalid tail_lines", "tail_lines must be a positive integer")
|
||||
return
|
||||
}
|
||||
tailLines = parsed
|
||||
}
|
||||
|
||||
diagnostics, err := h.instanceService.GetInstanceDiagnostics(r.Context(), clusterID, instanceID, tailLines)
|
||||
@ -298,6 +306,71 @@ func (h *InstanceHandler) GetInstanceDiagnostics(w http.ResponseWriter, r *http.
|
||||
respondJSON(w, http.StatusOK, convertInstanceDiagnostics(diagnostics))
|
||||
}
|
||||
|
||||
func (h *InstanceHandler) StreamInstanceLogs(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
clusterID := vars["cluster_id"]
|
||||
instanceID := vars["instance_id"]
|
||||
|
||||
podName := strings.TrimSpace(r.URL.Query().Get("pod"))
|
||||
containerName := strings.TrimSpace(r.URL.Query().Get("container"))
|
||||
if podName == "" || containerName == "" {
|
||||
respondError(w, http.StatusBadRequest, "Missing required query parameter", "both 'pod' and 'container' are required")
|
||||
return
|
||||
}
|
||||
|
||||
tailLines := int64(200)
|
||||
if raw := strings.TrimSpace(r.URL.Query().Get("tailLines")); raw != "" {
|
||||
parsed, err := strconv.ParseInt(raw, 10, 64)
|
||||
if err != nil || parsed < 0 {
|
||||
respondError(w, http.StatusBadRequest, "Invalid tailLines", "tailLines must be a positive integer")
|
||||
return
|
||||
}
|
||||
tailLines = parsed
|
||||
}
|
||||
|
||||
lines, errs, err := h.instanceService.StreamInstanceLogs(r.Context(), clusterID, instanceID, podName, containerName, tailLines)
|
||||
if err != nil {
|
||||
status := http.StatusInternalServerError
|
||||
switch err {
|
||||
case entity.ErrInstanceNotFound, entity.ErrClusterNotFound:
|
||||
status = http.StatusNotFound
|
||||
}
|
||||
respondError(w, status, "Failed to stream instance logs", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
respondError(w, http.StatusInternalServerError, "Streaming not supported", "server does not support response flushing")
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
case line, open := <-lines:
|
||||
if !open {
|
||||
fmt.Fprintf(w, "data: [DONE]\n\n")
|
||||
flusher.Flush()
|
||||
return
|
||||
}
|
||||
fmt.Fprintf(w, "data: %s\n\n", line)
|
||||
flusher.Flush()
|
||||
case err, open := <-errs:
|
||||
if open && err != nil {
|
||||
fmt.Fprintf(w, "data: [ERROR] %s\n\n", err.Error())
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func convertInstanceEntry(entry *entity.InstanceEntry) *dto.InstanceEntryResponse {
|
||||
portResponses := make([]dto.InstanceEntryPortResponse, 0, len(entry.Ports))
|
||||
for _, port := range entry.Ports {
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package k8s
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
@ -36,6 +37,23 @@ func (*MockDiagnosticsClient) GetDiagnostics(ctx context.Context, cluster *entit
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (*MockDiagnosticsClient) StreamPodLogs(ctx context.Context, cluster *entity.Cluster, namespace, podName, containerName string, tailLines int64) (<-chan string, <-chan error, error) {
|
||||
lines := make(chan string, 10)
|
||||
errs := make(chan error, 1)
|
||||
go func() {
|
||||
defer close(lines)
|
||||
defer close(errs)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case lines <- "[mock] Streaming pod logs...":
|
||||
case lines <- "[mock] Container started successfully":
|
||||
case lines <- "[mock] Listening on :8080":
|
||||
}
|
||||
}()
|
||||
return lines, errs, nil
|
||||
}
|
||||
|
||||
func (c *DiagnosticsClient) GetDiagnostics(ctx context.Context, cluster *entity.Cluster, instance *entity.Instance, tailLines int64) (*entity.InstanceDiagnostics, error) {
|
||||
clientset, err := diagnosticsClientset(cluster)
|
||||
if err != nil {
|
||||
@ -73,6 +91,68 @@ func (c *DiagnosticsClient) GetDiagnostics(ctx context.Context, cluster *entity.
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *DiagnosticsClient) StreamPodLogs(ctx context.Context, cluster *entity.Cluster, namespace, podName, containerName string, tailLines int64) (<-chan string, <-chan error, error) {
|
||||
clientset, err := diagnosticsClientset(cluster)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if tailLines <= 0 {
|
||||
tailLines = 200
|
||||
}
|
||||
if tailLines > 2000 {
|
||||
tailLines = 2000
|
||||
}
|
||||
|
||||
req := clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
|
||||
Container: containerName,
|
||||
Follow: true,
|
||||
TailLines: &tailLines,
|
||||
})
|
||||
|
||||
stream, err := req.Stream(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to open log stream for %s/%s: %w", podName, containerName, err)
|
||||
}
|
||||
|
||||
lines := make(chan string, 64)
|
||||
errs := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
defer close(lines)
|
||||
defer close(errs)
|
||||
defer func() { _ = stream.Close() }()
|
||||
|
||||
scanner := bufio.NewScanner(stream)
|
||||
// Allow long lines; Kubernetes log entries can exceed the default 64 KiB
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), 2*1024*1024)
|
||||
|
||||
for scanner.Scan() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
line := scanner.Text()
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case lines <- line:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
select {
|
||||
case errs <- err:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return lines, errs, nil
|
||||
}
|
||||
|
||||
func diagnosticsClientset(cluster *entity.Cluster) (kubernetes.Interface, error) {
|
||||
config, err := restConfigFromCluster(cluster)
|
||||
if err != nil {
|
||||
|
||||
@ -9,3 +9,9 @@ import (
|
||||
type InstanceDiagnosticsClient interface {
|
||||
GetDiagnostics(ctx context.Context, cluster *entity.Cluster, instance *entity.Instance, tailLines int64) (*entity.InstanceDiagnostics, error)
|
||||
}
|
||||
|
||||
// PodLogStreamer streams pod log lines over channels. The caller reads from the
|
||||
// lines channel until it is closed; errors are sent to the errs channel.
|
||||
type PodLogStreamer interface {
|
||||
StreamPodLogs(ctx context.Context, cluster *entity.Cluster, namespace, podName, containerName string, tailLines int64) (<-chan string, <-chan error, error)
|
||||
}
|
||||
|
||||
@ -395,6 +395,28 @@ func (s *InstanceService) GetInstanceDiagnostics(ctx context.Context, clusterID,
|
||||
return s.diagClient.GetDiagnostics(ctx, cluster, instance, tailLines)
|
||||
}
|
||||
|
||||
func (s *InstanceService) StreamInstanceLogs(ctx context.Context, clusterID, instanceID, podName, containerName string, tailLines int64) (<-chan string, <-chan error, error) {
|
||||
instance, err := s.GetInstance(ctx, instanceID)
|
||||
if err != nil {
|
||||
return nil, nil, entity.ErrInstanceNotFound
|
||||
}
|
||||
if instance.ClusterID != clusterID {
|
||||
return nil, nil, entity.ErrInstanceNotFound
|
||||
}
|
||||
cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
|
||||
if err != nil {
|
||||
return nil, nil, entity.ErrClusterNotFound
|
||||
}
|
||||
if s.diagClient == nil {
|
||||
return nil, nil, fmt.Errorf("instance diagnostics client is not configured")
|
||||
}
|
||||
streamer, ok := s.diagClient.(repository.PodLogStreamer)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("diagnostics client does not support log streaming")
|
||||
}
|
||||
return streamer.StreamPodLogs(ctx, cluster, instance.Namespace, podName, containerName, tailLines)
|
||||
}
|
||||
|
||||
func (s *InstanceService) canReadInstance(principal *authz.Principal, instance *entity.Instance) bool {
|
||||
if principal.IsAdmin() {
|
||||
return true
|
||||
@ -418,9 +440,12 @@ func enforceNamespaceValues(instance *entity.Instance) {
|
||||
}
|
||||
instance.Values["namespace"] = instance.Namespace
|
||||
setExistingStringValue(instance.Values, "namespaceOverride", instance.Namespace)
|
||||
setExistingStringValue(instance.Values, "namespace_override", instance.Namespace)
|
||||
setExistingStringValue(instance.Values, "targetNamespace", instance.Namespace)
|
||||
setExistingStringValue(instance.Values, "target_namespace", instance.Namespace)
|
||||
setExistingNestedStringValue(instance.Values, "global", "namespace", instance.Namespace)
|
||||
setExistingNestedStringValue(instance.Values, "global", "namespaceOverride", instance.Namespace)
|
||||
setExistingNestedStringValue(instance.Values, "global", "namespace_override", instance.Namespace)
|
||||
}
|
||||
|
||||
func setExistingStringValue(values map[string]interface{}, key, namespace string) {
|
||||
|
||||
Reference in New Issue
Block a user