Add new frontend pages for the multi-tenant OCDP platform: - Charts page (/charts): Browse Harbor OCI registries to list Helm chart repositories and versions, with deploy modal to launch charts on selected clusters - Monitoring page (/monitoring): Display cluster metrics (CPU/Memory/GPU usage) and per-node details with resource utilization bars - Chart References page (/chart-references): CRUD for chart metadata references - Values Templates page (/templates): CRUD for Helm values templates with version history and rollback support - Sidebar: Add Charts navigation, update Storage and Templates links - api.ts: Add all API client functions (clusterApi, registryApi, instanceApi, monitoringApi, storageApi, chartReferenceApi, valuesTemplateApi, workspaceApi, userApi) with full TypeScript types Note: deploy flow and values template rollback not yet end-to-end tested.
475 lines
16 KiB
Go
475 lines
16 KiB
Go
package service
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"os"
|
||
"path/filepath"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/google/uuid"
|
||
"github.com/ocdp/cluster-service/internal/domain/entity"
|
||
"github.com/ocdp/cluster-service/internal/domain/repository"
|
||
)
|
||
|
||
// InstanceService Helm 实例管理领域服务
|
||
type InstanceService struct {
|
||
instanceRepo repository.InstanceRepository
|
||
clusterRepo repository.ClusterRepository
|
||
registryRepo repository.RegistryRepository
|
||
helmClient repository.HelmClient
|
||
ociClient repository.OCIClient
|
||
entryClient repository.InstanceEntryClient
|
||
}
|
||
|
||
// NewInstanceService 创建实例服务
|
||
func NewInstanceService(
|
||
instanceRepo repository.InstanceRepository,
|
||
clusterRepo repository.ClusterRepository,
|
||
registryRepo repository.RegistryRepository,
|
||
helmClient repository.HelmClient,
|
||
ociClient repository.OCIClient,
|
||
entryClient repository.InstanceEntryClient,
|
||
) *InstanceService {
|
||
return &InstanceService{
|
||
instanceRepo: instanceRepo,
|
||
clusterRepo: clusterRepo,
|
||
registryRepo: registryRepo,
|
||
helmClient: helmClient,
|
||
ociClient: ociClient,
|
||
entryClient: entryClient,
|
||
}
|
||
}
|
||
|
||
// chartCacheDir is the local directory that pulled chart archives are written
// to; downloadChart creates it on demand and chartArchivePath builds paths in it.
const (
	chartCacheDir = "/tmp/charts"
)
|
||
|
||
func (s *InstanceService) chartArchivePath(instance *entity.Instance) string {
|
||
filename := fmt.Sprintf("%s-%s.tgz", instance.Chart, instance.Version)
|
||
return filepath.Join(chartCacheDir, filename)
|
||
}
|
||
|
||
func (s *InstanceService) downloadChart(ctx context.Context, registry *entity.Registry, instance *entity.Instance) error {
|
||
if err := os.MkdirAll(chartCacheDir, 0755); err != nil {
|
||
return fmt.Errorf("failed to ensure chart cache dir: %w", err)
|
||
}
|
||
chartPath := s.chartArchivePath(instance)
|
||
if err := s.ociClient.PullArtifact(ctx, registry, instance.Repository, instance.Version, chartPath); err != nil {
|
||
return fmt.Errorf("failed to download chart artifact: %w", err)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// CreateInstance 创建(安装)新实例
|
||
func (s *InstanceService) CreateInstance(ctx context.Context, instance *entity.Instance) error {
|
||
// 生成 ID
|
||
instance.ID = uuid.New().String()
|
||
|
||
// 验证
|
||
if err := instance.Validate(); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 检查集群是否存在
|
||
cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
|
||
if err != nil {
|
||
return entity.ErrClusterNotFound
|
||
}
|
||
|
||
// 检查 Registry 是否存在
|
||
registry, err := s.registryRepo.GetByID(ctx, instance.RegistryID)
|
||
if err != nil {
|
||
return entity.ErrRegistryNotFound
|
||
}
|
||
|
||
// 检查实例是否已存在
|
||
existingInstance, _ := s.instanceRepo.GetByClusterAndName(ctx, instance.ClusterID, instance.Name)
|
||
if existingInstance != nil {
|
||
return entity.ErrInstanceExists
|
||
}
|
||
|
||
instance.BeginOperation(entity.OperationInstall, "Preparing installation")
|
||
|
||
// 先写入数据库,记录 pending 状态
|
||
if err := s.instanceRepo.Create(ctx, instance); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 下载 chart artifact 供 Helm 使用
|
||
if err := s.downloadChart(ctx, registry, instance); err != nil {
|
||
instance.MarkFailure("Failed to download chart", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
return err
|
||
}
|
||
|
||
// 异步执行 Helm 安装并监控状态
|
||
go s.executeAndSyncInstall(context.Background(), instance.ID, cluster, registry, instance)
|
||
|
||
// 立即返回,状态同步由后台任务处理
|
||
return nil
|
||
}
|
||
|
||
// GetInstance 获取实例
|
||
func (s *InstanceService) GetInstance(ctx context.Context, id string) (*entity.Instance, error) {
|
||
return s.instanceRepo.GetByID(ctx, id)
|
||
}
|
||
|
||
// GetInstanceStatus 获取实例实时状态
|
||
func (s *InstanceService) GetInstanceStatus(ctx context.Context, id string) (*entity.Instance, error) {
|
||
// 从数据库获取基本信息
|
||
instance, err := s.instanceRepo.GetByID(ctx, id)
|
||
if err != nil {
|
||
return nil, entity.ErrInstanceNotFound
|
||
}
|
||
|
||
// 获取集群信息
|
||
cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
|
||
if err != nil {
|
||
return nil, entity.ErrClusterNotFound
|
||
}
|
||
|
||
// 从 Helm 获取实时状态
|
||
liveStatus, err := s.helmClient.GetStatus(ctx, cluster, instance.Name, instance.Namespace)
|
||
if err != nil {
|
||
return instance, err // 返回数据库中的信息,但标记错误
|
||
}
|
||
|
||
// 合并实时状态
|
||
instance.Status = liveStatus.Status
|
||
instance.Revision = liveStatus.Revision
|
||
|
||
return instance, nil
|
||
}
|
||
|
||
// UpdateInstance 更新(升级)实例
|
||
func (s *InstanceService) UpdateInstance(ctx context.Context, instance *entity.Instance) error {
|
||
// 检查实例是否存在
|
||
existingInstance, err := s.instanceRepo.GetByID(ctx, instance.ID)
|
||
if err != nil {
|
||
return entity.ErrInstanceNotFound
|
||
}
|
||
|
||
// 获取集群信息
|
||
cluster, err := s.clusterRepo.GetByID(ctx, existingInstance.ClusterID)
|
||
if err != nil {
|
||
return entity.ErrClusterNotFound
|
||
}
|
||
|
||
// 获取 Registry 信息
|
||
registry, err := s.registryRepo.GetByID(ctx, instance.RegistryID)
|
||
if err != nil {
|
||
return entity.ErrRegistryNotFound
|
||
}
|
||
|
||
instance.BeginOperation(entity.OperationUpgrade, "Pending upgrade")
|
||
if err := s.instanceRepo.Update(ctx, instance); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 下载所需 Chart
|
||
if err := s.downloadChart(ctx, registry, instance); err != nil {
|
||
instance.MarkFailure("Failed to download chart", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
return err
|
||
}
|
||
|
||
// 异步执行 Helm 升级并监控状态
|
||
go s.executeAndSyncUpgrade(context.Background(), instance.ID, cluster, registry, instance)
|
||
|
||
// 立即返回,状态同步由后台任务处理
|
||
return nil
|
||
}
|
||
|
||
// DeleteInstance 删除(卸载)实例
|
||
func (s *InstanceService) DeleteInstance(ctx context.Context, id string) error {
|
||
// 检查实例是否存在
|
||
instance, err := s.instanceRepo.GetByID(ctx, id)
|
||
if err != nil {
|
||
return entity.ErrInstanceNotFound
|
||
}
|
||
|
||
// 获取集群信息
|
||
cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
|
||
if err != nil {
|
||
return entity.ErrClusterNotFound
|
||
}
|
||
|
||
instance.BeginOperation(entity.OperationDelete, "Pending uninstall")
|
||
if err := s.instanceRepo.Update(ctx, instance); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 异步执行 Helm 卸载并监控状态
|
||
go s.executeAndSyncUninstall(context.Background(), instance.ID, cluster, instance.Name, instance.Namespace)
|
||
|
||
// 立即返回,状态同步由后台任务处理
|
||
return nil
|
||
}
|
||
|
||
// RollbackInstance 回滚实例
|
||
func (s *InstanceService) RollbackInstance(ctx context.Context, id string, revision int) error {
|
||
// 检查实例是否存在
|
||
instance, err := s.instanceRepo.GetByID(ctx, id)
|
||
if err != nil {
|
||
return entity.ErrInstanceNotFound
|
||
}
|
||
|
||
// 获取集群信息
|
||
cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
|
||
if err != nil {
|
||
return entity.ErrClusterNotFound
|
||
}
|
||
|
||
instance.BeginOperation(entity.OperationRollback, fmt.Sprintf("Rolling back to revision %d", revision))
|
||
if err := s.instanceRepo.Update(ctx, instance); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 异步执行 Helm 回滚并监控状态
|
||
go s.executeAndSyncRollback(context.Background(), instance.ID, cluster, instance.Name, instance.Namespace, revision)
|
||
|
||
// 立即返回,状态同步由后台任务处理
|
||
return nil
|
||
}
|
||
|
||
// GetInstanceHistory 获取实例历史
|
||
func (s *InstanceService) GetInstanceHistory(ctx context.Context, id string) ([]*entity.ReleaseHistory, error) {
|
||
// 检查实例是否存在
|
||
instance, err := s.instanceRepo.GetByID(ctx, id)
|
||
if err != nil {
|
||
return nil, entity.ErrInstanceNotFound
|
||
}
|
||
|
||
// 获取集群信息
|
||
cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
|
||
if err != nil {
|
||
return nil, entity.ErrClusterNotFound
|
||
}
|
||
|
||
// 从 Helm 获取历史
|
||
return s.helmClient.GetHistory(ctx, cluster, instance.Name, instance.Namespace)
|
||
}
|
||
|
||
// ListInstancesByCluster 列出集群的所有实例
|
||
func (s *InstanceService) ListInstancesByCluster(ctx context.Context, clusterID string) ([]*entity.Instance, error) {
|
||
// 检查集群是否存在
|
||
_, err := s.clusterRepo.GetByID(ctx, clusterID)
|
||
if err != nil {
|
||
return nil, entity.ErrClusterNotFound
|
||
}
|
||
|
||
return s.instanceRepo.ListByCluster(ctx, clusterID)
|
||
}
|
||
|
||
// ListInstanceEntries 列出实例关联的入口信息(Service / Ingress)
|
||
func (s *InstanceService) ListInstanceEntries(ctx context.Context, clusterID, instanceID string) ([]*entity.InstanceEntry, error) {
|
||
instance, err := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if err != nil {
|
||
return nil, entity.ErrInstanceNotFound
|
||
}
|
||
if instance.ClusterID != clusterID {
|
||
return nil, entity.ErrInstanceNotFound
|
||
}
|
||
|
||
cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
|
||
if err != nil {
|
||
return nil, entity.ErrClusterNotFound
|
||
}
|
||
|
||
if s.entryClient == nil {
|
||
return nil, fmt.Errorf("instance entry client is not configured")
|
||
}
|
||
|
||
return s.entryClient.ListEntries(ctx, cluster, instance)
|
||
}
|
||
|
||
// executeAndSyncInstall 异步执行安装并监控状态
|
||
func (s *InstanceService) executeAndSyncInstall(ctx context.Context, instanceID string, cluster *entity.Cluster, registry *entity.Registry, instance *entity.Instance) {
|
||
// 执行 Helm 安装
|
||
if err := s.helmClient.Install(ctx, cluster, instance); err != nil {
|
||
// 更新实例状态为失败
|
||
instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if updateErr == nil && instance != nil {
|
||
instance.MarkFailure("Helm install failed", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
}
|
||
return
|
||
}
|
||
|
||
// 安装成功后,同步状态
|
||
s.syncInstanceStatus(ctx, instanceID, cluster, instance.Name, instance.Namespace, entity.OperationInstall)
|
||
}
|
||
|
||
// executeAndSyncUpgrade 异步执行升级并监控状态
|
||
func (s *InstanceService) executeAndSyncUpgrade(ctx context.Context, instanceID string, cluster *entity.Cluster, registry *entity.Registry, instance *entity.Instance) {
|
||
// 执行 Helm 升级
|
||
if err := s.helmClient.Upgrade(ctx, cluster, instance); err != nil {
|
||
// 更新实例状态为失败
|
||
instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if updateErr == nil && instance != nil {
|
||
instance.MarkFailure("Helm upgrade failed", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
}
|
||
return
|
||
}
|
||
|
||
// 升级成功后,同步状态
|
||
s.syncInstanceStatus(ctx, instanceID, cluster, instance.Name, instance.Namespace, entity.OperationUpgrade)
|
||
}
|
||
|
||
// executeAndSyncRollback 异步执行回滚并监控状态
|
||
func (s *InstanceService) executeAndSyncRollback(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string, revision int) {
|
||
// 执行 Helm 回滚
|
||
if err := s.helmClient.Rollback(ctx, cluster, releaseName, namespace, revision); err != nil {
|
||
// 更新实例状态为失败
|
||
instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if updateErr == nil && instance != nil {
|
||
instance.MarkFailure("Helm rollback failed", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
}
|
||
return
|
||
}
|
||
|
||
// 回滚成功后,同步状态
|
||
s.syncInstanceStatus(ctx, instanceID, cluster, releaseName, namespace, entity.OperationRollback)
|
||
}
|
||
|
||
// executeAndSyncUninstall 异步执行卸载并监控状态
|
||
func (s *InstanceService) executeAndSyncUninstall(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string) {
|
||
// 先验证 release 名称是否有效
|
||
// 如果名称无效,说明这个 release 根本不可能存在于 Helm 中,直接删除数据库记录
|
||
if err := entity.ValidateReleaseName(releaseName); err != nil {
|
||
// Release 名称无效,直接删除数据库记录
|
||
_ = s.instanceRepo.Delete(ctx, instanceID)
|
||
return
|
||
}
|
||
|
||
// 执行 Helm 卸载
|
||
err := s.helmClient.Uninstall(ctx, cluster, releaseName, namespace)
|
||
|
||
// 获取实例
|
||
instance, getErr := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if getErr != nil {
|
||
return
|
||
}
|
||
|
||
if err != nil {
|
||
// 检查错误类型
|
||
if errors.Is(err, entity.ErrInstanceNotFound) {
|
||
// 未找到,说明已经卸载,直接删除数据库记录
|
||
_ = s.instanceRepo.Delete(ctx, instanceID)
|
||
} else {
|
||
// 检查是否是 release 名称无效的错误(可能在某些情况下 Helm 会返回这个错误)
|
||
errMsg := strings.ToLower(err.Error())
|
||
if strings.Contains(errMsg, "release name is invalid") ||
|
||
(strings.Contains(errMsg, "invalid") && strings.Contains(errMsg, "release")) {
|
||
// Release 名称无效,直接删除数据库记录
|
||
_ = s.instanceRepo.Delete(ctx, instanceID)
|
||
} else {
|
||
// 其他错误,标记为失败
|
||
instance.MarkFailure("Helm uninstall failed", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
}
|
||
}
|
||
return
|
||
}
|
||
|
||
// 卸载成功,标记为已卸载
|
||
instance.MarkSuccess(entity.StatusUninstalled, instance.Revision, "Instance uninstalled successfully")
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
|
||
// 验证卸载是否完成:尝试获取状态,如果获取不到说明已卸载
|
||
time.Sleep(3 * time.Second)
|
||
_, statusErr := s.helmClient.GetStatus(ctx, cluster, releaseName, namespace)
|
||
if statusErr != nil {
|
||
// 无法获取状态,说明已卸载,删除数据库记录
|
||
_ = s.instanceRepo.Delete(ctx, instanceID)
|
||
} else {
|
||
// 仍然可以获取状态,可能还在卸载中,继续等待
|
||
// 设置状态为 uninstalled,但不删除记录,让用户手动删除或等待自动清理
|
||
instance.MarkSuccess(entity.StatusUninstalled, instance.Revision, "Uninstall in progress")
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
}
|
||
}
|
||
|
||
// syncInstanceStatus 同步实例状态(定期检查 Helm 状态并更新数据库)
|
||
func (s *InstanceService) syncInstanceStatus(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string, operation entity.InstanceOperation) {
|
||
maxAttempts := 30 // 最多尝试30次(约5分钟)
|
||
interval := 10 * time.Second // 每10秒检查一次
|
||
|
||
for i := 0; i < maxAttempts; i++ {
|
||
time.Sleep(interval)
|
||
|
||
// 获取数据库中的实例
|
||
instance, err := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if err != nil {
|
||
// 实例不存在,停止同步
|
||
return
|
||
}
|
||
|
||
// 从 Helm 获取实时状态
|
||
liveStatus, err := s.helmClient.GetStatus(ctx, cluster, releaseName, namespace)
|
||
if err != nil {
|
||
// 如果获取状态失败,可能是还在部署中,继续等待
|
||
if i < maxAttempts-1 {
|
||
continue
|
||
}
|
||
// 最后一次尝试失败,标记为失败
|
||
instance.MarkFailure("Failed to get status from Helm", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
return
|
||
}
|
||
|
||
// 根据操作类型和 Helm 状态更新实例状态
|
||
shouldUpdate := false
|
||
switch operation {
|
||
case entity.OperationInstall:
|
||
// 安装操作:如果 Helm 状态是 deployed,则更新为 deployed
|
||
if liveStatus.Status == entity.StatusDeployed {
|
||
instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance deployed successfully")
|
||
shouldUpdate = true
|
||
} else if liveStatus.Status == entity.StatusFailed {
|
||
instance.MarkFailure("Installation failed", fmt.Errorf("Helm status: %s", liveStatus.Status))
|
||
shouldUpdate = true
|
||
}
|
||
case entity.OperationUpgrade:
|
||
// 升级操作:如果 Helm 状态是 deployed,则更新为 deployed
|
||
if liveStatus.Status == entity.StatusDeployed {
|
||
instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance upgraded successfully")
|
||
shouldUpdate = true
|
||
} else if liveStatus.Status == entity.StatusFailed {
|
||
instance.MarkFailure("Upgrade failed", fmt.Errorf("Helm status: %s", liveStatus.Status))
|
||
shouldUpdate = true
|
||
}
|
||
case entity.OperationRollback:
|
||
// 回滚操作:如果 Helm 状态是 deployed,则更新为 deployed
|
||
if liveStatus.Status == entity.StatusDeployed {
|
||
instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance rolled back successfully")
|
||
shouldUpdate = true
|
||
} else if liveStatus.Status == entity.StatusFailed {
|
||
instance.MarkFailure("Rollback failed", fmt.Errorf("Helm status: %s", liveStatus.Status))
|
||
shouldUpdate = true
|
||
}
|
||
}
|
||
|
||
// 如果状态已更新为最终状态,停止同步
|
||
if shouldUpdate {
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
return
|
||
}
|
||
|
||
// 如果状态已经是最终状态(deployed 或 failed),停止同步
|
||
if instance.Status == entity.StatusDeployed || instance.Status == entity.StatusFailed {
|
||
return
|
||
}
|
||
}
|
||
|
||
// 超时,标记为失败
|
||
instance, err := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if err == nil && instance != nil {
|
||
instance.MarkFailure("Operation timeout", fmt.Errorf("Status sync timeout after %d attempts", maxAttempts))
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
}
|
||
}
|