package k8s import ( "bufio" "context" "fmt" "io" "sort" "strings" "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "github.com/ocdp/cluster-service/internal/domain/entity" "github.com/ocdp/cluster-service/internal/domain/repository" ) type DiagnosticsClient struct{} func NewDiagnosticsClient() repository.InstanceDiagnosticsClient { return &DiagnosticsClient{} } type MockDiagnosticsClient struct{} func NewMockDiagnosticsClient() repository.InstanceDiagnosticsClient { return &MockDiagnosticsClient{} } func (*MockDiagnosticsClient) GetDiagnostics(ctx context.Context, cluster *entity.Cluster, instance *entity.Instance, tailLines int64) (*entity.InstanceDiagnostics, error) { return &entity.InstanceDiagnostics{ InstanceName: instance.Name, Namespace: instance.Namespace, CollectedAt: time.Now(), }, nil } func (*MockDiagnosticsClient) StreamPodLogs(ctx context.Context, cluster *entity.Cluster, namespace, podName, containerName string, tailLines int64) (<-chan string, <-chan error, error) { lines := make(chan string, 10) errs := make(chan error, 1) go func() { defer close(lines) defer close(errs) select { case <-ctx.Done(): return case lines <- "[mock] Streaming pod logs...": case lines <- "[mock] Container started successfully": case lines <- "[mock] Listening on :8080": } }() return lines, errs, nil } func (c *DiagnosticsClient) GetDiagnostics(ctx context.Context, cluster *entity.Cluster, instance *entity.Instance, tailLines int64) (*entity.InstanceDiagnostics, error) { clientset, err := diagnosticsClientset(cluster) if err != nil { return nil, err } if tailLines <= 0 { tailLines = 200 } if tailLines > 2000 { tailLines = 2000 } pods, err := listInstancePods(ctx, clientset, instance) if err != nil { return nil, err } services, err := listInstanceServices(ctx, clientset, instance) if err != nil { return nil, err } events, err := listInstanceEvents(ctx, clientset, instance, pods, services) if err != nil { return nil, err } logs := collectPodLogs(ctx, clientset, pods, tailLines) return &entity.InstanceDiagnostics{ InstanceName: instance.Name, Namespace: instance.Namespace, Pods: convertPodsToDiagnostics(pods), Services: convertServicesToDiagnostics(services), Events: convertEventsToDiagnostics(events), Logs: logs, CollectedAt: time.Now(), }, nil } func (c *DiagnosticsClient) StreamPodLogs(ctx context.Context, cluster *entity.Cluster, namespace, podName, containerName string, tailLines int64) (<-chan string, <-chan error, error) { clientset, err := diagnosticsClientset(cluster) if err != nil { return nil, nil, err } if tailLines <= 0 { tailLines = 200 } if tailLines > 2000 { tailLines = 2000 } req := clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{ Container: containerName, Follow: true, TailLines: &tailLines, }) stream, err := req.Stream(ctx) if err != nil { return nil, nil, fmt.Errorf("failed to open log stream for %s/%s: %w", podName, containerName, err) } lines := make(chan string, 64) errs := make(chan error, 1) go func() { defer close(lines) defer close(errs) defer func() { _ = stream.Close() }() scanner := bufio.NewScanner(stream) // Allow long lines; Kubernetes log entries can exceed the default 64 KiB scanner.Buffer(make([]byte, 0, 64*1024), 2*1024*1024) for scanner.Scan() { select { case <-ctx.Done(): return default: } line := scanner.Text() if line == "" { continue } select { case lines <- line: case <-ctx.Done(): return } } if err := scanner.Err(); err != nil { select { case errs <- err: case <-ctx.Done(): } } }() return lines, errs, nil } func diagnosticsClientset(cluster *entity.Cluster) (kubernetes.Interface, error) { config, err := restConfigFromCluster(cluster) if err != nil { return nil, err } clientset, err := kubernetes.NewForConfig(config) if err != nil { return nil, fmt.Errorf("failed to create diagnostics kubernetes client: %w", err) } return clientset, nil } func listInstancePods(ctx context.Context, clientset kubernetes.Interface, instance *entity.Instance) ([]corev1.Pod, error) { selector := fmt.Sprintf("app.kubernetes.io/instance=%s", instance.Name) pods, err := clientset.CoreV1().Pods(instance.Namespace).List(ctx, metav1.ListOptions{LabelSelector: selector}) if err != nil { return nil, fmt.Errorf("failed to list instance pods: %w", err) } if len(pods.Items) > 0 { return pods.Items, nil } all, err := clientset.CoreV1().Pods(instance.Namespace).List(ctx, metav1.ListOptions{}) if err != nil { return nil, fmt.Errorf("failed to list namespace pods: %w", err) } filtered := make([]corev1.Pod, 0) for _, pod := range all.Items { if resourceMatchesInstance(pod.ObjectMeta, instance) { filtered = append(filtered, pod) } } return filtered, nil } func listInstanceServices(ctx context.Context, clientset kubernetes.Interface, instance *entity.Instance) ([]corev1.Service, error) { selector := fmt.Sprintf("app.kubernetes.io/instance=%s", instance.Name) services, err := clientset.CoreV1().Services(instance.Namespace).List(ctx, metav1.ListOptions{LabelSelector: selector}) if err != nil { return nil, fmt.Errorf("failed to list instance services: %w", err) } if len(services.Items) > 0 { return services.Items, nil } all, err := clientset.CoreV1().Services(instance.Namespace).List(ctx, metav1.ListOptions{}) if err != nil { return nil, fmt.Errorf("failed to list namespace services: %w", err) } filtered := make([]corev1.Service, 0) for _, svc := range all.Items { if resourceMatchesInstance(svc.ObjectMeta, instance) { filtered = append(filtered, svc) } } return filtered, nil } func listInstanceEvents(ctx context.Context, clientset kubernetes.Interface, instance *entity.Instance, pods []corev1.Pod, services []corev1.Service) ([]corev1.Event, error) { events, err := clientset.CoreV1().Events(instance.Namespace).List(ctx, metav1.ListOptions{}) if err != nil { return nil, fmt.Errorf("failed to list instance events: %w", err) } names := map[string]bool{instance.Name: true} for _, pod := range pods { names[pod.Name] = true } for _, svc := range services { names[svc.Name] = true } filtered := make([]corev1.Event, 0) for _, event := range events.Items { if names[event.InvolvedObject.Name] || strings.Contains(event.Message, instance.Name) { filtered = append(filtered, event) } } sort.SliceStable(filtered, func(i, j int) bool { return filtered[i].LastTimestamp.Time.After(filtered[j].LastTimestamp.Time) }) if len(filtered) > 100 { filtered = filtered[:100] } return filtered, nil } func collectPodLogs(ctx context.Context, clientset kubernetes.Interface, pods []corev1.Pod, tailLines int64) []entity.InstancePodLog { logs := make([]entity.InstancePodLog, 0) for _, pod := range pods { for _, container := range pod.Spec.Containers { item := entity.InstancePodLog{Pod: pod.Name, Container: container.Name, TailLines: tailLines} req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{ Container: container.Name, TailLines: &tailLines, }) stream, err := req.Stream(ctx) if err != nil { item.Error = err.Error() logs = append(logs, item) continue } data, err := io.ReadAll(io.LimitReader(stream, 1<<20)) _ = stream.Close() if err != nil { item.Error = err.Error() } else { item.Log = string(data) } logs = append(logs, item) } } return logs } func convertPodsToDiagnostics(pods []corev1.Pod) []entity.InstancePodDiagnostics { out := make([]entity.InstancePodDiagnostics, 0, len(pods)) for _, pod := range pods { containers := make([]entity.InstanceContainerDiagnostics, 0, len(pod.Status.ContainerStatuses)) var restarts int32 for _, status := range pod.Status.ContainerStatuses { restarts += status.RestartCount containers = append(containers, entity.InstanceContainerDiagnostics{ Name: status.Name, Image: status.Image, Ready: status.Ready, RestartCount: status.RestartCount, State: containerStateName(status.State), Reason: containerStateReason(status.State), Message: containerStateMessage(status.State), }) } conditions := make([]entity.InstanceConditionDiagnostics, 0, len(pod.Status.Conditions)) for _, condition := range pod.Status.Conditions { conditions = append(conditions, entity.InstanceConditionDiagnostics{ Type: string(condition.Type), Status: string(condition.Status), Reason: condition.Reason, Message: condition.Message, }) } out = append(out, entity.InstancePodDiagnostics{ Name: pod.Name, Namespace: pod.Namespace, Phase: string(pod.Status.Phase), NodeName: pod.Spec.NodeName, PodIP: pod.Status.PodIP, HostIP: pod.Status.HostIP, RestartCount: restarts, Containers: containers, Conditions: conditions, CreationTimestamp: pod.CreationTimestamp.Time, }) } return out } func convertServicesToDiagnostics(services []corev1.Service) []entity.InstanceServiceDiagnostics { out := make([]entity.InstanceServiceDiagnostics, 0, len(services)) for _, svc := range services { entry := convertServiceToEntry(&svc) out = append(out, entity.InstanceServiceDiagnostics{ Name: svc.Name, Namespace: svc.Namespace, Type: string(svc.Spec.Type), ClusterIP: svc.Spec.ClusterIP, Ports: entry.Ports, }) } return out } func convertEventsToDiagnostics(events []corev1.Event) []entity.InstanceEventDiagnostics { out := make([]entity.InstanceEventDiagnostics, 0, len(events)) for _, event := range events { out = append(out, entity.InstanceEventDiagnostics{ Type: event.Type, Reason: event.Reason, Message: event.Message, InvolvedKind: event.InvolvedObject.Kind, InvolvedName: event.InvolvedObject.Name, Count: event.Count, FirstTimestamp: event.FirstTimestamp.Time, LastTimestamp: event.LastTimestamp.Time, }) } return out } func containerStateName(state corev1.ContainerState) string { switch { case state.Running != nil: return "running" case state.Waiting != nil: return "waiting" case state.Terminated != nil: return "terminated" default: return "unknown" } } func containerStateReason(state corev1.ContainerState) string { switch { case state.Waiting != nil: return state.Waiting.Reason case state.Terminated != nil: return state.Terminated.Reason default: return "" } } func containerStateMessage(state corev1.ContainerState) string { switch { case state.Waiting != nil: return state.Waiting.Message case state.Terminated != nil: return state.Terminated.Message default: return "" } }