package service import ( "context" "errors" "fmt" "os" "path/filepath" "time" "github.com/google/uuid" "github.com/ocdp/cluster-service/internal/domain/entity" "github.com/ocdp/cluster-service/internal/domain/repository" "github.com/ocdp/cluster-service/internal/pkg/authz" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" ) // InstanceService Helm 实例管理领域服务 type InstanceService struct { instanceRepo repository.InstanceRepository clusterRepo repository.ClusterRepository registryRepo repository.RegistryRepository bindingRepo repository.WorkspaceClusterBindingRepository helmClient repository.HelmClient ociClient repository.OCIClient entryClient repository.InstanceEntryClient diagClient repository.InstanceDiagnosticsClient workspaceRepo repository.WorkspaceRepository tenantClient repository.TenantKubeClient } // NewInstanceService 创建实例服务 func NewInstanceService( instanceRepo repository.InstanceRepository, clusterRepo repository.ClusterRepository, registryRepo repository.RegistryRepository, helmClient repository.HelmClient, ociClient repository.OCIClient, entryClient repository.InstanceEntryClient, bindingRepo ...repository.WorkspaceClusterBindingRepository, ) *InstanceService { var workspaceBindingRepo repository.WorkspaceClusterBindingRepository if len(bindingRepo) > 0 { workspaceBindingRepo = bindingRepo[0] } return &InstanceService{ instanceRepo: instanceRepo, clusterRepo: clusterRepo, registryRepo: registryRepo, bindingRepo: workspaceBindingRepo, helmClient: helmClient, ociClient: ociClient, entryClient: entryClient, } } func (s *InstanceService) SetDiagnosticsClient(client repository.InstanceDiagnosticsClient) { s.diagClient = client } func (s *InstanceService) SetTenantProvisioning(workspaceRepo repository.WorkspaceRepository, tenantClient repository.TenantKubeClient) { s.workspaceRepo = workspaceRepo s.tenantClient = tenantClient } const chartCacheDir = "/tmp/charts" func (s *InstanceService) chartArchivePath(instance *entity.Instance) string { filename := fmt.Sprintf("%s-%s.tgz", instance.Chart, instance.Version) return filepath.Join(chartCacheDir, filename) } func (s *InstanceService) downloadChart(ctx context.Context, registry *entity.Registry, instance *entity.Instance) error { if err := os.MkdirAll(chartCacheDir, 0755); err != nil { return fmt.Errorf("failed to ensure chart cache dir: %w", err) } chartPath := s.chartArchivePath(instance) if err := s.ociClient.PullArtifact(ctx, registry, instance.Repository, instance.Version, chartPath); err != nil { return fmt.Errorf("failed to download chart artifact: %w", err) } return nil } // CreateInstance 创建(安装)新实例 func (s *InstanceService) CreateInstance(ctx context.Context, instance *entity.Instance) error { principal, err := authz.RequirePrincipal(ctx) if err != nil { return entity.ErrUnauthorized } // 生成 ID instance.ID = uuid.New().String() instance.WorkspaceID = principal.WorkspaceID instance.OwnerID = principal.UserID // 验证 if err := instance.Validate(); err != nil { return err } // 检查集群是否存在 cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID) if err != nil { return entity.ErrClusterNotFound } if !authz.CanReadResource(principal, cluster.WorkspaceID, cluster.OwnerID, cluster.Visibility) { return entity.ErrClusterNotFound } // 检查 Registry 是否存在 registry, err := s.registryRepo.GetByID(ctx, instance.RegistryID) if err != nil { return entity.ErrRegistryNotFound } if !authz.CanReadResource(principal, registry.WorkspaceID, registry.OwnerID, registry.Visibility) { return entity.ErrRegistryNotFound } if err := s.applyNamespacePolicy(ctx, principal, cluster, instance); err != nil { return err } enforceNamespaceValues(instance) if err := s.ensureTenantForInstance(ctx, principal, cluster, instance); err != nil { return err } // 检查实例是否已存在 existingInstance, _ := s.instanceRepo.GetByClusterAndName(ctx, instance.ClusterID, instance.Name) if existingInstance != nil { return entity.ErrInstanceExists } instance.BeginOperation(entity.OperationInstall, "Preparing installation") // 先写入数据库,记录 pending 状态 if err := s.instanceRepo.Create(ctx, instance); err != nil { return err } // 下载 chart artifact 供 Helm 使用 if err := s.downloadChart(ctx, registry, instance); err != nil { instance.MarkFailure("Failed to download chart", err) _ = s.instanceRepo.Update(ctx, instance) return err } // 异步执行 Helm 安装并监控状态 go s.executeAndSyncInstall(context.Background(), instance.ID, cluster, registry, instance) // 立即返回,状态同步由后台任务处理 return nil } // GetInstance 获取实例 func (s *InstanceService) GetInstance(ctx context.Context, id string) (*entity.Instance, error) { principal, err := authz.RequirePrincipal(ctx) if err != nil { return nil, entity.ErrUnauthorized } instance, err := s.instanceRepo.GetByID(ctx, id) if err != nil { return nil, err } if !s.canReadInstance(principal, instance) { return nil, entity.ErrInstanceNotFound } return instance, nil } // GetInstanceStatus 获取实例实时状态 func (s *InstanceService) GetInstanceStatus(ctx context.Context, id string) (*entity.Instance, error) { // 从数据库获取基本信息 instance, err := s.GetInstance(ctx, id) if err != nil { return nil, entity.ErrInstanceNotFound } // 获取集群信息 cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID) if err != nil { return nil, entity.ErrClusterNotFound } // 从 Helm 获取实时状态 liveStatus, err := s.helmClient.GetStatus(ctx, cluster, instance.Name, instance.Namespace) if err != nil { return instance, err // 返回数据库中的信息,但标记错误 } // 合并实时状态 instance.Status = liveStatus.Status instance.Revision = liveStatus.Revision return instance, nil } // UpdateInstance 更新(升级)实例 func (s *InstanceService) UpdateInstance(ctx context.Context, instance *entity.Instance) error { principal, err := authz.RequirePrincipal(ctx) if err != nil { return entity.ErrUnauthorized } // 检查实例是否存在 existingInstance, err := s.instanceRepo.GetByID(ctx, instance.ID) if err != nil { return entity.ErrInstanceNotFound } if !s.canWriteInstance(principal, existingInstance) { return entity.ErrForbidden } instance.WorkspaceID = existingInstance.WorkspaceID instance.OwnerID = existingInstance.OwnerID // 获取集群信息 cluster, err := s.clusterRepo.GetByID(ctx, existingInstance.ClusterID) if err != nil { return entity.ErrClusterNotFound } // 获取 Registry 信息 registry, err := s.registryRepo.GetByID(ctx, instance.RegistryID) if err != nil { return entity.ErrRegistryNotFound } instance.Namespace = existingInstance.Namespace enforceNamespaceValues(instance) instance.BeginOperation(entity.OperationUpgrade, "Pending upgrade") if err := s.instanceRepo.Update(ctx, instance); err != nil { return err } // 下载所需 Chart if err := s.downloadChart(ctx, registry, instance); err != nil { instance.MarkFailure("Failed to download chart", err) _ = s.instanceRepo.Update(ctx, instance) return err } // 异步执行 Helm 升级并监控状态 go s.executeAndSyncUpgrade(context.Background(), instance.ID, cluster, registry, instance) // 立即返回,状态同步由后台任务处理 return nil } // DeleteInstance 删除(卸载)实例 func (s *InstanceService) DeleteInstance(ctx context.Context, id string) error { principal, err := authz.RequirePrincipal(ctx) if err != nil { return entity.ErrUnauthorized } // 检查实例是否存在 instance, err := s.instanceRepo.GetByID(ctx, id) if err != nil { return entity.ErrInstanceNotFound } if !s.canWriteInstance(principal, instance) { return entity.ErrForbidden } // 获取集群信息 cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID) if err != nil { return entity.ErrClusterNotFound } instance.BeginOperation(entity.OperationDelete, "Pending uninstall") if err := s.instanceRepo.Update(ctx, instance); err != nil { return err } // 异步执行 Helm 卸载并监控状态 go s.executeAndSyncUninstall(context.Background(), instance.ID, cluster, instance.Name, instance.Namespace) // 立即返回,状态同步由后台任务处理 return nil } // RollbackInstance 回滚实例 func (s *InstanceService) RollbackInstance(ctx context.Context, id string, revision int) error { principal, err := authz.RequirePrincipal(ctx) if err != nil { return entity.ErrUnauthorized } // 检查实例是否存在 instance, err := s.instanceRepo.GetByID(ctx, id) if err != nil { return entity.ErrInstanceNotFound } if !s.canWriteInstance(principal, instance) { return entity.ErrForbidden } // 获取集群信息 cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID) if err != nil { return entity.ErrClusterNotFound } instance.BeginOperation(entity.OperationRollback, fmt.Sprintf("Rolling back to revision %d", revision)) if err := s.instanceRepo.Update(ctx, instance); err != nil { return err } // 异步执行 Helm 回滚并监控状态 go s.executeAndSyncRollback(context.Background(), instance.ID, cluster, instance.Name, instance.Namespace, revision) // 立即返回,状态同步由后台任务处理 return nil } // GetInstanceHistory 获取实例历史 func (s *InstanceService) GetInstanceHistory(ctx context.Context, id string) ([]*entity.ReleaseHistory, error) { // 检查实例是否存在 instance, err := s.GetInstance(ctx, id) if err != nil { return nil, entity.ErrInstanceNotFound } // 获取集群信息 cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID) if err != nil { return nil, entity.ErrClusterNotFound } // 从 Helm 获取历史 return s.helmClient.GetHistory(ctx, cluster, instance.Name, instance.Namespace) } // ListInstancesByCluster 列出集群的所有实例 func (s *InstanceService) ListInstancesByCluster(ctx context.Context, clusterID string) ([]*entity.Instance, error) { principal, err := authz.RequirePrincipal(ctx) if err != nil { return nil, entity.ErrUnauthorized } // 检查集群是否存在 cluster, err := s.clusterRepo.GetByID(ctx, clusterID) if err != nil { return nil, entity.ErrClusterNotFound } if !authz.CanReadResource(principal, cluster.WorkspaceID, cluster.OwnerID, cluster.Visibility) { return nil, entity.ErrClusterNotFound } instances, err := s.instanceRepo.ListByCluster(ctx, clusterID) if err != nil { return nil, err } visible := make([]*entity.Instance, 0, len(instances)) for _, instance := range instances { if s.canReadInstance(principal, instance) { visible = append(visible, instance) } } return visible, nil } // ListInstanceEntries 列出实例关联的入口信息(Service / Ingress) func (s *InstanceService) ListInstanceEntries(ctx context.Context, clusterID, instanceID string) ([]*entity.InstanceEntry, error) { instance, err := s.GetInstance(ctx, instanceID) if err != nil { return nil, entity.ErrInstanceNotFound } if instance.ClusterID != clusterID { return nil, entity.ErrInstanceNotFound } cluster, err := s.clusterRepo.GetByID(ctx, clusterID) if err != nil { return nil, entity.ErrClusterNotFound } if s.entryClient == nil { return nil, fmt.Errorf("instance entry client is not configured") } return s.entryClient.ListEntries(ctx, cluster, instance) } func (s *InstanceService) GetInstanceDiagnostics(ctx context.Context, clusterID, instanceID string, tailLines int64) (*entity.InstanceDiagnostics, error) { instance, err := s.GetInstance(ctx, instanceID) if err != nil { return nil, entity.ErrInstanceNotFound } if instance.ClusterID != clusterID { return nil, entity.ErrInstanceNotFound } cluster, err := s.clusterRepo.GetByID(ctx, clusterID) if err != nil { return nil, entity.ErrClusterNotFound } if s.diagClient == nil { return nil, fmt.Errorf("instance diagnostics client is not configured") } return s.diagClient.GetDiagnostics(ctx, cluster, instance, tailLines) } func (s *InstanceService) canReadInstance(principal *authz.Principal, instance *entity.Instance) bool { if principal.IsAdmin() { return true } return instance.WorkspaceID == principal.WorkspaceID && instance.OwnerID == principal.UserID } func (s *InstanceService) canWriteInstance(principal *authz.Principal, instance *entity.Instance) bool { if principal.IsAdmin() { return true } return instance.WorkspaceID == principal.WorkspaceID && instance.OwnerID == principal.UserID } func enforceNamespaceValues(instance *entity.Instance) { if instance == nil || instance.Namespace == "" { return } if instance.Values == nil { instance.Values = map[string]interface{}{} } instance.Values["namespace"] = instance.Namespace setExistingStringValue(instance.Values, "namespaceOverride", instance.Namespace) setExistingStringValue(instance.Values, "targetNamespace", instance.Namespace) setExistingNestedStringValue(instance.Values, "global", "namespace", instance.Namespace) setExistingNestedStringValue(instance.Values, "global", "namespaceOverride", instance.Namespace) } func setExistingStringValue(values map[string]interface{}, key, namespace string) { if _, ok := values[key]; ok { values[key] = namespace } } func setExistingNestedStringValue(values map[string]interface{}, parent, key, namespace string) { child, ok := values[parent].(map[string]interface{}) if !ok { return } if _, ok := child[key]; ok { child[key] = namespace } } func (s *InstanceService) applyNamespacePolicy(ctx context.Context, principal *authz.Principal, cluster *entity.Cluster, instance *entity.Instance) error { if principal.IsAdmin() { if isProtectedSystemNamespace(instance.Namespace) { return entity.ErrInvalidNamespace } return nil } if isReservedNamespace(instance.Namespace) { return entity.ErrInvalidNamespace } if cluster.Visibility != authz.VisibilityPrivate || cluster.OwnerID != principal.UserID { namespace := principal.Namespace if namespace == "" { namespace = entity.NamespaceForWorkspace(principal.WorkspaceName) } if s.bindingRepo != nil { if binding, err := s.bindingRepo.Get(ctx, principal.WorkspaceID, cluster.ID); err == nil && binding != nil && binding.Namespace != "" { namespace = binding.Namespace } } instance.Namespace = namespace return nil } if instance.Namespace == "" { if cluster.DefaultNamespace != "" { instance.Namespace = cluster.DefaultNamespace } else if principal.Namespace != "" { instance.Namespace = principal.Namespace } else { instance.Namespace = entity.NamespaceForWorkspace(principal.Username) } } return nil } func (s *InstanceService) ensureTenantForInstance(ctx context.Context, principal *authz.Principal, cluster *entity.Cluster, instance *entity.Instance) error { if principal.IsAdmin() || s.workspaceRepo == nil || s.tenantClient == nil { return nil } workspace, err := s.workspaceRepo.GetByID(ctx, principal.WorkspaceID) if err != nil { return err } if workspace.Status == entity.WorkspaceSuspended { return entity.ErrWorkspaceSuspended } binding := entity.NewTenantBinding(instance.Namespace) binding.ServiceAccountName = workspace.K8sSAName binding.ResourceQuotaHard = instanceResourceQuotaHard(workspace) if err := s.tenantClient.EnsureTenant(ctx, cluster, binding); err != nil { return err } if s.bindingRepo != nil { _ = s.bindingRepo.Upsert(ctx, &entity.WorkspaceClusterBinding{ ID: uuid.New().String(), WorkspaceID: workspace.ID, ClusterID: cluster.ID, Namespace: instance.Namespace, ServiceAccount: workspace.K8sSAName, QuotaCPU: workspace.QuotaCPU, QuotaMemory: workspace.QuotaMemory, QuotaGPU: workspace.QuotaGPU, QuotaGPUMem: workspace.QuotaGPUMem, Status: "active", CreatedAt: time.Now(), UpdatedAt: time.Now(), }) } return nil } func instanceResourceQuotaHard(workspace *entity.Workspace) corev1.ResourceList { hard := corev1.ResourceList{} addQuantity := func(name corev1.ResourceName, value string) { value = normalizeStandardQuotaQuantity(value) if value == "" { return } if quantity, err := resource.ParseQuantity(value); err == nil { hard[name] = quantity } } addGPUMemoryQuantity := func(value string) { value, err := normalizeGPUMemoryQuota(value) if err != nil || value == "" { return } if quantity, err := resource.ParseQuantity(value); err == nil { hard[corev1.ResourceName("requests.nvidia.com/gpumem")] = quantity } } if workspace == nil { return hard } addQuantity(corev1.ResourceName("requests.cpu"), workspace.QuotaCPU) addQuantity(corev1.ResourceName("requests.memory"), workspace.QuotaMemory) addQuantity(corev1.ResourceName("requests.nvidia.com/gpu"), workspace.QuotaGPU) addGPUMemoryQuantity(workspace.QuotaGPUMem) return hard } func isReservedNamespace(namespace string) bool { switch namespace { case "default", "kube-system", "kube-public", "kube-node-lease": return true default: return false } } func isProtectedSystemNamespace(namespace string) bool { switch namespace { case "kube-system", "kube-public", "kube-node-lease": return true default: return false } } // executeAndSyncInstall 异步执行安装并监控状态 func (s *InstanceService) executeAndSyncInstall(ctx context.Context, instanceID string, cluster *entity.Cluster, registry *entity.Registry, instance *entity.Instance) { // 执行 Helm 安装 if err := s.helmClient.Install(ctx, cluster, instance); err != nil { // 更新实例状态为失败 instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID) if updateErr == nil && instance != nil { instance.MarkFailure("Helm install failed", err) _ = s.instanceRepo.Update(ctx, instance) } return } // 安装成功后,同步状态 s.syncInstanceStatus(ctx, instanceID, cluster, instance.Name, instance.Namespace, entity.OperationInstall) } // executeAndSyncUpgrade 异步执行升级并监控状态 func (s *InstanceService) executeAndSyncUpgrade(ctx context.Context, instanceID string, cluster *entity.Cluster, registry *entity.Registry, instance *entity.Instance) { // 执行 Helm 升级 if err := s.helmClient.Upgrade(ctx, cluster, instance); err != nil { // 更新实例状态为失败 instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID) if updateErr == nil && instance != nil { instance.MarkFailure("Helm upgrade failed", err) _ = s.instanceRepo.Update(ctx, instance) } return } // 升级成功后,同步状态 s.syncInstanceStatus(ctx, instanceID, cluster, instance.Name, instance.Namespace, entity.OperationUpgrade) } // executeAndSyncRollback 异步执行回滚并监控状态 func (s *InstanceService) executeAndSyncRollback(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string, revision int) { // 执行 Helm 回滚 if err := s.helmClient.Rollback(ctx, cluster, releaseName, namespace, revision); err != nil { // 更新实例状态为失败 instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID) if updateErr == nil && instance != nil { instance.MarkFailure("Helm rollback failed", err) _ = s.instanceRepo.Update(ctx, instance) } return } // 回滚成功后,同步状态 s.syncInstanceStatus(ctx, instanceID, cluster, releaseName, namespace, entity.OperationRollback) } // executeAndSyncUninstall 异步执行卸载并监控状态 func (s *InstanceService) executeAndSyncUninstall(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string) { // 执行 Helm 卸载 err := s.helmClient.Uninstall(ctx, cluster, releaseName, namespace) // 获取实例 instance, getErr := s.instanceRepo.GetByID(ctx, instanceID) if getErr != nil { return } if err != nil { // 如果错误不是"未找到",则标记为失败 if !errors.Is(err, entity.ErrInstanceNotFound) { instance.MarkFailure("Helm uninstall failed", err) _ = s.instanceRepo.Update(ctx, instance) } else { // 如果未找到,说明已经卸载,直接删除数据库记录 _ = s.instanceRepo.Delete(ctx, instanceID) } return } // 卸载成功,标记为已卸载 instance.MarkSuccess(entity.StatusUninstalled, instance.Revision, "Instance uninstalled successfully") _ = s.instanceRepo.Update(ctx, instance) // 验证卸载是否完成:尝试获取状态,如果获取不到说明已卸载 time.Sleep(3 * time.Second) _, statusErr := s.helmClient.GetStatus(ctx, cluster, releaseName, namespace) if statusErr != nil { // 无法获取状态,说明已卸载,删除数据库记录 _ = s.instanceRepo.Delete(ctx, instanceID) } else { // 仍然可以获取状态,可能还在卸载中,继续等待 // 设置状态为 uninstalled,但不删除记录,让用户手动删除或等待自动清理 instance.MarkSuccess(entity.StatusUninstalled, instance.Revision, "Uninstall in progress") _ = s.instanceRepo.Update(ctx, instance) } } // syncInstanceStatus 同步实例状态(定期检查 Helm 状态并更新数据库) func (s *InstanceService) syncInstanceStatus(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string, operation entity.InstanceOperation) { maxAttempts := 30 // 最多尝试30次(约5分钟) interval := 10 * time.Second // 每10秒检查一次 for i := 0; i < maxAttempts; i++ { time.Sleep(interval) // 获取数据库中的实例 instance, err := s.instanceRepo.GetByID(ctx, instanceID) if err != nil { // 实例不存在,停止同步 return } // 从 Helm 获取实时状态 liveStatus, err := s.helmClient.GetStatus(ctx, cluster, releaseName, namespace) if err != nil { // 如果获取状态失败,可能是还在部署中,继续等待 if i < maxAttempts-1 { continue } // 最后一次尝试失败,标记为失败 instance.MarkFailure("Failed to get status from Helm", err) _ = s.instanceRepo.Update(ctx, instance) return } // 根据操作类型和 Helm 状态更新实例状态 shouldUpdate := false switch operation { case entity.OperationInstall: // 安装操作:如果 Helm 状态是 deployed,则更新为 deployed if liveStatus.Status == entity.StatusDeployed { instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance deployed successfully") shouldUpdate = true } else if liveStatus.Status == entity.StatusFailed { instance.MarkFailure("Installation failed", fmt.Errorf("Helm status: %s", liveStatus.Status)) shouldUpdate = true } case entity.OperationUpgrade: // 升级操作:如果 Helm 状态是 deployed,则更新为 deployed if liveStatus.Status == entity.StatusDeployed { instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance upgraded successfully") shouldUpdate = true } else if liveStatus.Status == entity.StatusFailed { instance.MarkFailure("Upgrade failed", fmt.Errorf("Helm status: %s", liveStatus.Status)) shouldUpdate = true } case entity.OperationRollback: // 回滚操作:如果 Helm 状态是 deployed,则更新为 deployed if liveStatus.Status == entity.StatusDeployed { instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance rolled back successfully") shouldUpdate = true } else if liveStatus.Status == entity.StatusFailed { instance.MarkFailure("Rollback failed", fmt.Errorf("Helm status: %s", liveStatus.Status)) shouldUpdate = true } } // 如果状态已更新为最终状态,停止同步 if shouldUpdate { _ = s.instanceRepo.Update(ctx, instance) return } // 如果状态已经是最终状态(deployed 或 failed),停止同步 if instance.Status == entity.StatusDeployed || instance.Status == entity.StatusFailed { return } } // 超时,标记为失败 instance, err := s.instanceRepo.GetByID(ctx, instanceID) if err == nil && instance != nil { instance.MarkFailure("Operation timeout", fmt.Errorf("Status sync timeout after %d attempts", maxAttempts)) _ = s.instanceRepo.Update(ctx, instance) } }