package service import ( "context" "errors" "fmt" "os" "path/filepath" "strings" "time" "github.com/google/uuid" "github.com/ocdp/cluster-service/internal/domain/entity" "github.com/ocdp/cluster-service/internal/domain/repository" ) // InstanceService Helm 实例管理领域服务 type InstanceService struct { instanceRepo repository.InstanceRepository clusterRepo repository.ClusterRepository registryRepo repository.RegistryRepository helmClient repository.HelmClient ociClient repository.OCIClient entryClient repository.InstanceEntryClient } // NewInstanceService 创建实例服务 func NewInstanceService( instanceRepo repository.InstanceRepository, clusterRepo repository.ClusterRepository, registryRepo repository.RegistryRepository, helmClient repository.HelmClient, ociClient repository.OCIClient, entryClient repository.InstanceEntryClient, ) *InstanceService { return &InstanceService{ instanceRepo: instanceRepo, clusterRepo: clusterRepo, registryRepo: registryRepo, helmClient: helmClient, ociClient: ociClient, entryClient: entryClient, } } const chartCacheDir = "/tmp/charts" func (s *InstanceService) chartArchivePath(instance *entity.Instance) string { filename := fmt.Sprintf("%s-%s.tgz", instance.Chart, instance.Version) return filepath.Join(chartCacheDir, filename) } func (s *InstanceService) downloadChart(ctx context.Context, registry *entity.Registry, instance *entity.Instance) error { if err := os.MkdirAll(chartCacheDir, 0755); err != nil { return fmt.Errorf("failed to ensure chart cache dir: %w", err) } chartPath := s.chartArchivePath(instance) if err := s.ociClient.PullArtifact(ctx, registry, instance.Repository, instance.Version, chartPath); err != nil { return fmt.Errorf("failed to download chart artifact: %w", err) } return nil } // CreateInstance 创建(安装)新实例 func (s *InstanceService) CreateInstance(ctx context.Context, instance *entity.Instance) error { // 生成 ID instance.ID = uuid.New().String() // 验证 if err := instance.Validate(); err != nil { return err } // 检查集群是否存在 cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID) if err != nil { return entity.ErrClusterNotFound } // 检查 Registry 是否存在 registry, err := s.registryRepo.GetByID(ctx, instance.RegistryID) if err != nil { return entity.ErrRegistryNotFound } // 检查实例是否已存在 existingInstance, _ := s.instanceRepo.GetByClusterAndName(ctx, instance.ClusterID, instance.Name) if existingInstance != nil { return entity.ErrInstanceExists } instance.BeginOperation(entity.OperationInstall, "Preparing installation") // 先写入数据库,记录 pending 状态 if err := s.instanceRepo.Create(ctx, instance); err != nil { return err } // 下载 chart artifact 供 Helm 使用 if err := s.downloadChart(ctx, registry, instance); err != nil { instance.MarkFailure("Failed to download chart", err) _ = s.instanceRepo.Update(ctx, instance) return err } // 异步执行 Helm 安装并监控状态 go s.executeAndSyncInstall(context.Background(), instance.ID, cluster, registry, instance) // 立即返回,状态同步由后台任务处理 return nil } // GetInstance 获取实例 func (s *InstanceService) GetInstance(ctx context.Context, id string) (*entity.Instance, error) { return s.instanceRepo.GetByID(ctx, id) } // GetInstanceStatus 获取实例实时状态 func (s *InstanceService) GetInstanceStatus(ctx context.Context, id string) (*entity.Instance, error) { // 从数据库获取基本信息 instance, err := s.instanceRepo.GetByID(ctx, id) if err != nil { return nil, entity.ErrInstanceNotFound } // 获取集群信息 cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID) if err != nil { return nil, entity.ErrClusterNotFound } // 从 Helm 获取实时状态 liveStatus, err := s.helmClient.GetStatus(ctx, cluster, instance.Name, instance.Namespace) if err != nil { return instance, err // 返回数据库中的信息,但标记错误 } // 合并实时状态 instance.Status = liveStatus.Status instance.Revision = liveStatus.Revision return instance, nil } // UpdateInstance 更新(升级)实例 func (s *InstanceService) UpdateInstance(ctx context.Context, instance *entity.Instance) error { // 检查实例是否存在 existingInstance, err := s.instanceRepo.GetByID(ctx, instance.ID) if err != nil { return entity.ErrInstanceNotFound } // 获取集群信息 cluster, err := s.clusterRepo.GetByID(ctx, existingInstance.ClusterID) if err != nil { return entity.ErrClusterNotFound } // 获取 Registry 信息 registry, err := s.registryRepo.GetByID(ctx, instance.RegistryID) if err != nil { return entity.ErrRegistryNotFound } instance.BeginOperation(entity.OperationUpgrade, "Pending upgrade") if err := s.instanceRepo.Update(ctx, instance); err != nil { return err } // 下载所需 Chart if err := s.downloadChart(ctx, registry, instance); err != nil { instance.MarkFailure("Failed to download chart", err) _ = s.instanceRepo.Update(ctx, instance) return err } // 异步执行 Helm 升级并监控状态 go s.executeAndSyncUpgrade(context.Background(), instance.ID, cluster, registry, instance) // 立即返回,状态同步由后台任务处理 return nil } // DeleteInstance 删除(卸载)实例 func (s *InstanceService) DeleteInstance(ctx context.Context, id string) error { // 检查实例是否存在 instance, err := s.instanceRepo.GetByID(ctx, id) if err != nil { return entity.ErrInstanceNotFound } // 获取集群信息 cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID) if err != nil { return entity.ErrClusterNotFound } instance.BeginOperation(entity.OperationDelete, "Pending uninstall") if err := s.instanceRepo.Update(ctx, instance); err != nil { return err } // 异步执行 Helm 卸载并监控状态 go s.executeAndSyncUninstall(context.Background(), instance.ID, cluster, instance.Name, instance.Namespace) // 立即返回,状态同步由后台任务处理 return nil } // RollbackInstance 回滚实例 func (s *InstanceService) RollbackInstance(ctx context.Context, id string, revision int) error { // 检查实例是否存在 instance, err := s.instanceRepo.GetByID(ctx, id) if err != nil { return entity.ErrInstanceNotFound } // 获取集群信息 cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID) if err != nil { return entity.ErrClusterNotFound } instance.BeginOperation(entity.OperationRollback, fmt.Sprintf("Rolling back to revision %d", revision)) if err := s.instanceRepo.Update(ctx, instance); err != nil { return err } // 异步执行 Helm 回滚并监控状态 go s.executeAndSyncRollback(context.Background(), instance.ID, cluster, instance.Name, instance.Namespace, revision) // 立即返回,状态同步由后台任务处理 return nil } // GetInstanceHistory 获取实例历史 func (s *InstanceService) GetInstanceHistory(ctx context.Context, id string) ([]*entity.ReleaseHistory, error) { // 检查实例是否存在 instance, err := s.instanceRepo.GetByID(ctx, id) if err != nil { return nil, entity.ErrInstanceNotFound } // 获取集群信息 cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID) if err != nil { return nil, entity.ErrClusterNotFound } // 从 Helm 获取历史 return s.helmClient.GetHistory(ctx, cluster, instance.Name, instance.Namespace) } // ListInstancesByCluster 列出集群的所有实例 func (s *InstanceService) ListInstancesByCluster(ctx context.Context, clusterID string) ([]*entity.Instance, error) { // 检查集群是否存在 _, err := s.clusterRepo.GetByID(ctx, clusterID) if err != nil { return nil, entity.ErrClusterNotFound } return s.instanceRepo.ListByCluster(ctx, clusterID) } // ListInstanceEntries 列出实例关联的入口信息(Service / Ingress) func (s *InstanceService) ListInstanceEntries(ctx context.Context, clusterID, instanceID string) ([]*entity.InstanceEntry, error) { instance, err := s.instanceRepo.GetByID(ctx, instanceID) if err != nil { return nil, entity.ErrInstanceNotFound } if instance.ClusterID != clusterID { return nil, entity.ErrInstanceNotFound } cluster, err := s.clusterRepo.GetByID(ctx, clusterID) if err != nil { return nil, entity.ErrClusterNotFound } if s.entryClient == nil { return nil, fmt.Errorf("instance entry client is not configured") } return s.entryClient.ListEntries(ctx, cluster, instance) } // executeAndSyncInstall 异步执行安装并监控状态 func (s *InstanceService) executeAndSyncInstall(ctx context.Context, instanceID string, cluster *entity.Cluster, registry *entity.Registry, instance *entity.Instance) { // 执行 Helm 安装 if err := s.helmClient.Install(ctx, cluster, instance); err != nil { // 更新实例状态为失败 instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID) if updateErr == nil && instance != nil { instance.MarkFailure("Helm install failed", err) _ = s.instanceRepo.Update(ctx, instance) } return } // 安装成功后,同步状态 s.syncInstanceStatus(ctx, instanceID, cluster, instance.Name, instance.Namespace, entity.OperationInstall) } // executeAndSyncUpgrade 异步执行升级并监控状态 func (s *InstanceService) executeAndSyncUpgrade(ctx context.Context, instanceID string, cluster *entity.Cluster, registry *entity.Registry, instance *entity.Instance) { // 执行 Helm 升级 if err := s.helmClient.Upgrade(ctx, cluster, instance); err != nil { // 更新实例状态为失败 instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID) if updateErr == nil && instance != nil { instance.MarkFailure("Helm upgrade failed", err) _ = s.instanceRepo.Update(ctx, instance) } return } // 升级成功后,同步状态 s.syncInstanceStatus(ctx, instanceID, cluster, instance.Name, instance.Namespace, entity.OperationUpgrade) } // executeAndSyncRollback 异步执行回滚并监控状态 func (s *InstanceService) executeAndSyncRollback(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string, revision int) { // 执行 Helm 回滚 if err := s.helmClient.Rollback(ctx, cluster, releaseName, namespace, revision); err != nil { // 更新实例状态为失败 instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID) if updateErr == nil && instance != nil { instance.MarkFailure("Helm rollback failed", err) _ = s.instanceRepo.Update(ctx, instance) } return } // 回滚成功后,同步状态 s.syncInstanceStatus(ctx, instanceID, cluster, releaseName, namespace, entity.OperationRollback) } // executeAndSyncUninstall 异步执行卸载并监控状态 func (s *InstanceService) executeAndSyncUninstall(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string) { // 先验证 release 名称是否有效 // 如果名称无效,说明这个 release 根本不可能存在于 Helm 中,直接删除数据库记录 if err := entity.ValidateReleaseName(releaseName); err != nil { // Release 名称无效,直接删除数据库记录 _ = s.instanceRepo.Delete(ctx, instanceID) return } // 执行 Helm 卸载 err := s.helmClient.Uninstall(ctx, cluster, releaseName, namespace) // 获取实例 instance, getErr := s.instanceRepo.GetByID(ctx, instanceID) if getErr != nil { return } if err != nil { // 检查错误类型 if errors.Is(err, entity.ErrInstanceNotFound) { // 未找到,说明已经卸载,直接删除数据库记录 _ = s.instanceRepo.Delete(ctx, instanceID) } else { // 检查是否是 release 名称无效的错误(可能在某些情况下 Helm 会返回这个错误) errMsg := strings.ToLower(err.Error()) if strings.Contains(errMsg, "release name is invalid") || (strings.Contains(errMsg, "invalid") && strings.Contains(errMsg, "release")) { // Release 名称无效,直接删除数据库记录 _ = s.instanceRepo.Delete(ctx, instanceID) } else { // 其他错误,标记为失败 instance.MarkFailure("Helm uninstall failed", err) _ = s.instanceRepo.Update(ctx, instance) } } return } // 卸载成功,标记为已卸载 instance.MarkSuccess(entity.StatusUninstalled, instance.Revision, "Instance uninstalled successfully") _ = s.instanceRepo.Update(ctx, instance) // 验证卸载是否完成:尝试获取状态,如果获取不到说明已卸载 time.Sleep(3 * time.Second) _, statusErr := s.helmClient.GetStatus(ctx, cluster, releaseName, namespace) if statusErr != nil { // 无法获取状态,说明已卸载,删除数据库记录 _ = s.instanceRepo.Delete(ctx, instanceID) } else { // 仍然可以获取状态,可能还在卸载中,继续等待 // 设置状态为 uninstalled,但不删除记录,让用户手动删除或等待自动清理 instance.MarkSuccess(entity.StatusUninstalled, instance.Revision, "Uninstall in progress") _ = s.instanceRepo.Update(ctx, instance) } } // syncInstanceStatus 同步实例状态(定期检查 Helm 状态并更新数据库) func (s *InstanceService) syncInstanceStatus(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string, operation entity.InstanceOperation) { maxAttempts := 30 // 最多尝试30次(约5分钟) interval := 10 * time.Second // 每10秒检查一次 for i := 0; i < maxAttempts; i++ { time.Sleep(interval) // 获取数据库中的实例 instance, err := s.instanceRepo.GetByID(ctx, instanceID) if err != nil { // 实例不存在,停止同步 return } // 从 Helm 获取实时状态 liveStatus, err := s.helmClient.GetStatus(ctx, cluster, releaseName, namespace) if err != nil { // 如果获取状态失败,可能是还在部署中,继续等待 if i < maxAttempts-1 { continue } // 最后一次尝试失败,标记为失败 instance.MarkFailure("Failed to get status from Helm", err) _ = s.instanceRepo.Update(ctx, instance) return } // 根据操作类型和 Helm 状态更新实例状态 shouldUpdate := false switch operation { case entity.OperationInstall: // 安装操作:如果 Helm 状态是 deployed,则更新为 deployed if liveStatus.Status == entity.StatusDeployed { instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance deployed successfully") shouldUpdate = true } else if liveStatus.Status == entity.StatusFailed { instance.MarkFailure("Installation failed", fmt.Errorf("Helm status: %s", liveStatus.Status)) shouldUpdate = true } case entity.OperationUpgrade: // 升级操作:如果 Helm 状态是 deployed,则更新为 deployed if liveStatus.Status == entity.StatusDeployed { instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance upgraded successfully") shouldUpdate = true } else if liveStatus.Status == entity.StatusFailed { instance.MarkFailure("Upgrade failed", fmt.Errorf("Helm status: %s", liveStatus.Status)) shouldUpdate = true } case entity.OperationRollback: // 回滚操作:如果 Helm 状态是 deployed,则更新为 deployed if liveStatus.Status == entity.StatusDeployed { instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance rolled back successfully") shouldUpdate = true } else if liveStatus.Status == entity.StatusFailed { instance.MarkFailure("Rollback failed", fmt.Errorf("Helm status: %s", liveStatus.Status)) shouldUpdate = true } } // 如果状态已更新为最终状态,停止同步 if shouldUpdate { _ = s.instanceRepo.Update(ctx, instance) return } // 如果状态已经是最终状态(deployed 或 failed),停止同步 if instance.Status == entity.StatusDeployed || instance.Status == entity.StatusFailed { return } } // 超时,标记为失败 instance, err := s.instanceRepo.GetByID(ctx, instanceID) if err == nil && instance != nil { instance.MarkFailure("Operation timeout", fmt.Errorf("Status sync timeout after %d attempts", maxAttempts)) _ = s.instanceRepo.Update(ctx, instance) } }