Files
ocdp-go/backend/internal/domain/service/instance_service.go
Ivan087 47849042a7 feat: complete E2E deployment flow with storage layered config and values template versioning
- Instance deployment: charts browser, deploy modal, instances list
- Values Template version management (create/history/rollback)
- Storage layered config (cluster > workspace > shared priority)
- Cluster credential decryptIfNeeded for mixed encrypted/plaintext kubeconfig
- YAML syntax validation (client-side + server-side warning)
- Frontend: charts, instances, storage, templates, admin pages
- Backend: storage service, instance service, cluster service, helm client
- Multi-Tenant Kubeconfig.md: added by user
2026-04-30 16:31:00 +08:00

554 lines
19 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"context"
"errors"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"time"
"github.com/google/uuid"
"github.com/ocdp/cluster-service/internal/domain/entity"
"github.com/ocdp/cluster-service/internal/domain/repository"
)
// InstanceService Helm 实例管理领域服务
type InstanceService struct {
instanceRepo repository.InstanceRepository
clusterRepo repository.ClusterRepository
registryRepo repository.RegistryRepository
helmClient repository.HelmClient
ociClient repository.OCIClient
entryClient repository.InstanceEntryClient
storageService *StorageService // for layered storage config resolution
}
// NewInstanceService 创建实例服务
func NewInstanceService(
instanceRepo repository.InstanceRepository,
clusterRepo repository.ClusterRepository,
registryRepo repository.RegistryRepository,
helmClient repository.HelmClient,
ociClient repository.OCIClient,
entryClient repository.InstanceEntryClient,
) *InstanceService {
return &InstanceService{
instanceRepo: instanceRepo,
clusterRepo: clusterRepo,
registryRepo: registryRepo,
helmClient: helmClient,
ociClient: ociClient,
entryClient: entryClient,
storageService: nil, // set via SetStorageService for layered storage
}
}
// SetStorageService 设置存储服务(用于分层存储配置解析)
func (s *InstanceService) SetStorageService(storageService *StorageService) {
s.storageService = storageService
}
const chartCacheDir = "/tmp/charts"
func (s *InstanceService) chartArchivePath(instance *entity.Instance) string {
filename := fmt.Sprintf("%s-%s.tgz", instance.Chart, instance.Version)
return filepath.Join(chartCacheDir, filename)
}
func (s *InstanceService) downloadChart(ctx context.Context, registry *entity.Registry, instance *entity.Instance) error {
if err := os.MkdirAll(chartCacheDir, 0755); err != nil {
return fmt.Errorf("failed to ensure chart cache dir: %w", err)
}
chartPath := s.chartArchivePath(instance)
if err := s.ociClient.PullArtifact(ctx, registry, instance.Repository, instance.Version, chartPath); err != nil {
return fmt.Errorf("failed to download chart artifact: %w", err)
}
return nil
}
// CreateInstance 创建(安装)新实例
func (s *InstanceService) CreateInstance(ctx context.Context, instance *entity.Instance) error {
// 生成 ID
instance.ID = uuid.New().String()
// 验证
if err := instance.Validate(); err != nil {
return err
}
// 检查集群是否存在
cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
if err != nil {
return entity.ErrClusterNotFound
}
// 检查 Registry 是否存在
registry, err := s.registryRepo.GetByID(ctx, instance.RegistryID)
if err != nil {
return entity.ErrRegistryNotFound
}
// 检查实例是否已存在
existingInstance, _ := s.instanceRepo.GetByClusterAndName(ctx, instance.ClusterID, instance.Name)
if existingInstance != nil {
return entity.ErrInstanceExists
}
// ===== 分层存储配置解析 =====
// Priority: workspace-level default > cluster-level default > shared default
if s.storageService != nil && instance.WorkspaceID != "" {
resolution, err := s.storageService.ResolveStorageConfig(ctx, instance.ClusterID, instance.WorkspaceID)
if err == nil && resolution != nil && resolution.Storage != nil {
// Merge resolved storage values into instance.Values
if instance.Values == nil {
instance.Values = make(map[string]interface{})
}
// User override takes highest priority (already set), so we only set if not already present
mergeStorageToValues(instance.Values, resolution.Storage)
}
}
instance.BeginOperation(entity.OperationInstall, "Preparing installation")
// 先写入数据库,记录 pending 状态
if err := s.instanceRepo.Create(ctx, instance); err != nil {
return err
}
// 下载 chart artifact 供 Helm 使用
if err := s.downloadChart(ctx, registry, instance); err != nil {
instance.MarkFailure("Failed to download chart", err)
_ = s.instanceRepo.Update(ctx, instance)
return err
}
// 异步执行 Helm 安装并监控状态
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("[goroutine-panic] instanceID=%s panic=%v", instance.ID, r)
}
}()
log.Printf("[goroutine-start] instanceID=%s name=%s cluster=%s", instance.ID, instance.Name, cluster.Name)
s.executeAndSyncInstall(context.Background(), instance.ID, cluster, registry, instance)
log.Printf("[goroutine-done] instanceID=%s", instance.ID)
}()
// 立即返回,状态同步由后台任务处理
return nil
}
// GetInstance 获取实例
func (s *InstanceService) GetInstance(ctx context.Context, id string) (*entity.Instance, error) {
return s.instanceRepo.GetByID(ctx, id)
}
// GetInstanceStatus 获取实例实时状态
func (s *InstanceService) GetInstanceStatus(ctx context.Context, id string) (*entity.Instance, error) {
// 从数据库获取基本信息
instance, err := s.instanceRepo.GetByID(ctx, id)
if err != nil {
return nil, entity.ErrInstanceNotFound
}
// 获取集群信息
cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
if err != nil {
return nil, entity.ErrClusterNotFound
}
// 从 Helm 获取实时状态
liveStatus, err := s.helmClient.GetStatus(ctx, cluster, instance.Name, instance.Namespace)
if err != nil {
return instance, err // 返回数据库中的信息,但标记错误
}
// 合并实时状态
instance.Status = liveStatus.Status
instance.Revision = liveStatus.Revision
return instance, nil
}
// UpdateInstance 更新(升级)实例
func (s *InstanceService) UpdateInstance(ctx context.Context, instance *entity.Instance) error {
// 检查实例是否存在
existingInstance, err := s.instanceRepo.GetByID(ctx, instance.ID)
if err != nil {
return entity.ErrInstanceNotFound
}
// 获取集群信息
cluster, err := s.clusterRepo.GetByID(ctx, existingInstance.ClusterID)
if err != nil {
return entity.ErrClusterNotFound
}
// 获取 Registry 信息
registry, err := s.registryRepo.GetByID(ctx, instance.RegistryID)
if err != nil {
return entity.ErrRegistryNotFound
}
instance.BeginOperation(entity.OperationUpgrade, "Pending upgrade")
if err := s.instanceRepo.Update(ctx, instance); err != nil {
return err
}
// 下载所需 Chart
if err := s.downloadChart(ctx, registry, instance); err != nil {
instance.MarkFailure("Failed to download chart", err)
_ = s.instanceRepo.Update(ctx, instance)
return err
}
// 异步执行 Helm 升级并监控状态
go s.executeAndSyncUpgrade(context.Background(), instance.ID, cluster, registry, instance)
// 立即返回,状态同步由后台任务处理
return nil
}
// DeleteInstance 删除(卸载)实例
func (s *InstanceService) DeleteInstance(ctx context.Context, id string) error {
// 检查实例是否存在
instance, err := s.instanceRepo.GetByID(ctx, id)
if err != nil {
return entity.ErrInstanceNotFound
}
// 获取集群信息
cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
if err != nil {
return entity.ErrClusterNotFound
}
instance.BeginOperation(entity.OperationDelete, "Pending uninstall")
if err := s.instanceRepo.Update(ctx, instance); err != nil {
return err
}
// 异步执行 Helm 卸载并监控状态
go s.executeAndSyncUninstall(context.Background(), instance.ID, cluster, instance.Name, instance.Namespace)
// 立即返回,状态同步由后台任务处理
return nil
}
// RollbackInstance 回滚实例
func (s *InstanceService) RollbackInstance(ctx context.Context, id string, revision int) error {
// 检查实例是否存在
instance, err := s.instanceRepo.GetByID(ctx, id)
if err != nil {
return entity.ErrInstanceNotFound
}
// 获取集群信息
cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
if err != nil {
return entity.ErrClusterNotFound
}
instance.BeginOperation(entity.OperationRollback, fmt.Sprintf("Rolling back to revision %d", revision))
if err := s.instanceRepo.Update(ctx, instance); err != nil {
return err
}
// 异步执行 Helm 回滚并监控状态
go s.executeAndSyncRollback(context.Background(), instance.ID, cluster, instance.Name, instance.Namespace, revision)
// 立即返回,状态同步由后台任务处理
return nil
}
// GetInstanceHistory 获取实例历史
func (s *InstanceService) GetInstanceHistory(ctx context.Context, id string) ([]*entity.ReleaseHistory, error) {
// 检查实例是否存在
instance, err := s.instanceRepo.GetByID(ctx, id)
if err != nil {
return nil, entity.ErrInstanceNotFound
}
// 获取集群信息
cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
if err != nil {
return nil, entity.ErrClusterNotFound
}
// 从 Helm 获取历史
return s.helmClient.GetHistory(ctx, cluster, instance.Name, instance.Namespace)
}
// ListInstancesByCluster 列出集群的所有实例
func (s *InstanceService) ListInstancesByCluster(ctx context.Context, clusterID string) ([]*entity.Instance, error) {
// 检查集群是否存在
_, err := s.clusterRepo.GetByID(ctx, clusterID)
if err != nil {
return nil, entity.ErrClusterNotFound
}
return s.instanceRepo.ListByCluster(ctx, clusterID)
}
// ListInstanceEntries 列出实例关联的入口信息Service / Ingress
func (s *InstanceService) ListInstanceEntries(ctx context.Context, clusterID, instanceID string) ([]*entity.InstanceEntry, error) {
instance, err := s.instanceRepo.GetByID(ctx, instanceID)
if err != nil {
return nil, entity.ErrInstanceNotFound
}
if instance.ClusterID != clusterID {
return nil, entity.ErrInstanceNotFound
}
cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
if err != nil {
return nil, entity.ErrClusterNotFound
}
if s.entryClient == nil {
return nil, fmt.Errorf("instance entry client is not configured")
}
return s.entryClient.ListEntries(ctx, cluster, instance)
}
// executeAndSyncInstall 异步执行安装并监控状态
func (s *InstanceService) executeAndSyncInstall(ctx context.Context, instanceID string, cluster *entity.Cluster, registry *entity.Registry, instance *entity.Instance) {
log.Printf("[install-start] instanceID=%s values=%v", instanceID, instance.Values)
// 执行 Helm 安装
if err := s.helmClient.Install(ctx, cluster, instance); err != nil {
log.Printf("[install-fail] instanceID=%s err=%v", instanceID, err)
// 更新实例状态为失败
instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID)
if updateErr == nil && instance != nil {
instance.MarkFailure("Helm install failed", err)
_ = s.instanceRepo.Update(ctx, instance)
}
return
}
log.Printf("[install-ok] instanceID=%s revision=%d", instanceID, instance.Revision)
// 安装成功后,同步状态
s.syncInstanceStatus(ctx, instanceID, cluster, instance.Name, instance.Namespace, entity.OperationInstall)
}
// executeAndSyncUpgrade 异步执行升级并监控状态
func (s *InstanceService) executeAndSyncUpgrade(ctx context.Context, instanceID string, cluster *entity.Cluster, registry *entity.Registry, instance *entity.Instance) {
// 执行 Helm 升级
if err := s.helmClient.Upgrade(ctx, cluster, instance); err != nil {
// 更新实例状态为失败
instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID)
if updateErr == nil && instance != nil {
instance.MarkFailure("Helm upgrade failed", err)
_ = s.instanceRepo.Update(ctx, instance)
}
return
}
// 升级成功后,同步状态
s.syncInstanceStatus(ctx, instanceID, cluster, instance.Name, instance.Namespace, entity.OperationUpgrade)
}
// executeAndSyncRollback 异步执行回滚并监控状态
func (s *InstanceService) executeAndSyncRollback(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string, revision int) {
// 执行 Helm 回滚
if err := s.helmClient.Rollback(ctx, cluster, releaseName, namespace, revision); err != nil {
// 更新实例状态为失败
instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID)
if updateErr == nil && instance != nil {
instance.MarkFailure("Helm rollback failed", err)
_ = s.instanceRepo.Update(ctx, instance)
}
return
}
// 回滚成功后,同步状态
s.syncInstanceStatus(ctx, instanceID, cluster, releaseName, namespace, entity.OperationRollback)
}
// executeAndSyncUninstall 异步执行卸载并监控状态
func (s *InstanceService) executeAndSyncUninstall(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string) {
// 先验证 release 名称是否有效
// 如果名称无效,说明这个 release 根本不可能存在于 Helm 中,直接删除数据库记录
if err := entity.ValidateReleaseName(releaseName); err != nil {
// Release 名称无效,直接删除数据库记录
_ = s.instanceRepo.Delete(ctx, instanceID)
return
}
// 执行 Helm 卸载
err := s.helmClient.Uninstall(ctx, cluster, releaseName, namespace)
// 获取实例
instance, getErr := s.instanceRepo.GetByID(ctx, instanceID)
if getErr != nil {
return
}
if err != nil {
// 检查错误类型
if errors.Is(err, entity.ErrInstanceNotFound) {
// 未找到,说明已经卸载,直接删除数据库记录
_ = s.instanceRepo.Delete(ctx, instanceID)
} else {
// 检查是否是 release 名称无效的错误(可能在某些情况下 Helm 会返回这个错误)
errMsg := strings.ToLower(err.Error())
if strings.Contains(errMsg, "release name is invalid") ||
(strings.Contains(errMsg, "invalid") && strings.Contains(errMsg, "release")) {
// Release 名称无效,直接删除数据库记录
_ = s.instanceRepo.Delete(ctx, instanceID)
} else {
// 其他错误,标记为失败
instance.MarkFailure("Helm uninstall failed", err)
_ = s.instanceRepo.Update(ctx, instance)
}
}
return
}
// 卸载成功,标记为已卸载
instance.MarkSuccess(entity.StatusUninstalled, instance.Revision, "Instance uninstalled successfully")
_ = s.instanceRepo.Update(ctx, instance)
// 验证卸载是否完成:尝试获取状态,如果获取不到说明已卸载
time.Sleep(3 * time.Second)
_, statusErr := s.helmClient.GetStatus(ctx, cluster, releaseName, namespace)
if statusErr != nil {
// 无法获取状态,说明已卸载,删除数据库记录
_ = s.instanceRepo.Delete(ctx, instanceID)
} else {
// 仍然可以获取状态,可能还在卸载中,继续等待
// 设置状态为 uninstalled但不删除记录让用户手动删除或等待自动清理
instance.MarkSuccess(entity.StatusUninstalled, instance.Revision, "Uninstall in progress")
_ = s.instanceRepo.Update(ctx, instance)
}
}
// syncInstanceStatus 同步实例状态(定期检查 Helm 状态并更新数据库)
func (s *InstanceService) syncInstanceStatus(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string, operation entity.InstanceOperation) {
maxAttempts := 30 // 最多尝试30次约5分钟
interval := 10 * time.Second // 每10秒检查一次
for i := 0; i < maxAttempts; i++ {
time.Sleep(interval)
// 获取数据库中的实例
instance, err := s.instanceRepo.GetByID(ctx, instanceID)
if err != nil {
// 实例不存在,停止同步
return
}
// 从 Helm 获取实时状态
liveStatus, err := s.helmClient.GetStatus(ctx, cluster, releaseName, namespace)
if err != nil {
// 如果获取状态失败,可能是还在部署中,继续等待
if i < maxAttempts-1 {
continue
}
// 最后一次尝试失败,标记为失败
instance.MarkFailure("Failed to get status from Helm", err)
_ = s.instanceRepo.Update(ctx, instance)
return
}
// 根据操作类型和 Helm 状态更新实例状态
shouldUpdate := false
switch operation {
case entity.OperationInstall:
// 安装操作:如果 Helm 状态是 deployed则更新为 deployed
if liveStatus.Status == entity.StatusDeployed {
instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance deployed successfully")
shouldUpdate = true
} else if liveStatus.Status == entity.StatusFailed {
instance.MarkFailure("Installation failed", fmt.Errorf("Helm status: %s", liveStatus.Status))
shouldUpdate = true
}
case entity.OperationUpgrade:
// 升级操作:如果 Helm 状态是 deployed则更新为 deployed
if liveStatus.Status == entity.StatusDeployed {
instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance upgraded successfully")
shouldUpdate = true
} else if liveStatus.Status == entity.StatusFailed {
instance.MarkFailure("Upgrade failed", fmt.Errorf("Helm status: %s", liveStatus.Status))
shouldUpdate = true
}
case entity.OperationRollback:
// 回滚操作:如果 Helm 状态是 deployed则更新为 deployed
if liveStatus.Status == entity.StatusDeployed {
instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance rolled back successfully")
shouldUpdate = true
} else if liveStatus.Status == entity.StatusFailed {
instance.MarkFailure("Rollback failed", fmt.Errorf("Helm status: %s", liveStatus.Status))
shouldUpdate = true
}
}
// 如果状态已更新为最终状态,停止同步
if shouldUpdate {
_ = s.instanceRepo.Update(ctx, instance)
return
}
// 如果状态已经是最终状态deployed 或 failed停止同步
if instance.Status == entity.StatusDeployed || instance.Status == entity.StatusFailed {
return
}
}
// 超时,标记为失败
instance, err := s.instanceRepo.GetByID(ctx, instanceID)
if err == nil && instance != nil {
instance.MarkFailure("Operation timeout", fmt.Errorf("Status sync timeout after %d attempts", maxAttempts))
_ = s.instanceRepo.Update(ctx, instance)
}
}
// mergeStorageToValues 将存储配置 merge 到 Helm values
// 只覆盖 nil/空的字段,保留用户已设置的 values
func mergeStorageToValues(values map[string]interface{}, storage *entity.StorageBackend) {
if storage == nil || values == nil {
return
}
persistence := make(map[string]interface{})
switch storage.Type {
case entity.StorageTypeNFS:
if storage.Config.NFS != nil {
persistence["type"] = "nfs"
persistence["nfs"] = map[string]interface{}{
"server": storage.Config.NFS.Server,
"path": storage.Config.NFS.Path,
}
// Helm common chart labels
persistence["mountOptions"] = []string{"rw", "relatime", "vers=3"}
persistence["reclaimPolicy"] = "Retain"
}
case entity.StorageTypePV:
if storage.Config.PV != nil {
persistence["type"] = "persistentVolumeClaim"
persistence["storageClass"] = storage.Config.PV.StorageClassName
persistence["size"] = storage.Config.PV.Capacity
persistence["accessMode"] = storage.Config.PV.AccessModes
}
case entity.StorageTypeHostPath:
if storage.Config.HostPath != nil {
persistence["type"] = "hostPath"
persistence["hostPath"] = map[string]interface{}{
"path": storage.Config.HostPath.Path,
}
}
}
// Only merge if key doesn't already exist and has a value
for key, val := range persistence {
if _, exists := values[key]; !exists && val != nil {
values[key] = val
}
}
}