Backend (Phase 1):
- Add ScaleInstance endpoint (POST /clusters/{id}/instances/{id}/scale)
- Add GetInstanceValuesDiff endpoint (GET .../values-diff)
- Enable ReuseValues=true in Helm Upgrade for --reuse-values behavior
- Add GetValues/GetChartDefaultValues to HelmClient interface
- Add ScaleInstanceRequest/Response and InstanceValuesDiffResponse DTOs
Frontend (Phase 2):
- InstanceCard: +/- scale buttons with loading spinner
- ModifyModal: values diff view (current vs defaults), Use Defaults button
- ArtifactBrowserPage: collapsible sidebar, compact tag grid, search filter
- TagCard: "LATEST" badge, compact layout, responsive design
- InstanceCard: compact 3-column layout, fewer scrolls needed
- InstancesManagementPage: 3-column grid, compact view
- Global hover-lift and hover-glow CSS utilities
- SidebarNav: subtle hover transition on links
832 lines
27 KiB
Go
832 lines
27 KiB
Go
package service
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"os"
|
||
"path/filepath"
|
||
"time"
|
||
|
||
"github.com/google/uuid"
|
||
"github.com/ocdp/cluster-service/internal/adapter/input/http/dto"
|
||
"github.com/ocdp/cluster-service/internal/domain/entity"
|
||
"github.com/ocdp/cluster-service/internal/domain/repository"
|
||
"github.com/ocdp/cluster-service/internal/pkg/authz"
|
||
corev1 "k8s.io/api/core/v1"
|
||
"k8s.io/apimachinery/pkg/api/resource"
|
||
)
|
||
|
||
// InstanceService Helm 实例管理领域服务
|
||
type InstanceService struct {
|
||
instanceRepo repository.InstanceRepository
|
||
clusterRepo repository.ClusterRepository
|
||
registryRepo repository.RegistryRepository
|
||
bindingRepo repository.WorkspaceClusterBindingRepository
|
||
helmClient repository.HelmClient
|
||
ociClient repository.OCIClient
|
||
entryClient repository.InstanceEntryClient
|
||
diagClient repository.InstanceDiagnosticsClient
|
||
workspaceRepo repository.WorkspaceRepository
|
||
tenantClient repository.TenantKubeClient
|
||
}
|
||
|
||
// NewInstanceService 创建实例服务
|
||
func NewInstanceService(
|
||
instanceRepo repository.InstanceRepository,
|
||
clusterRepo repository.ClusterRepository,
|
||
registryRepo repository.RegistryRepository,
|
||
helmClient repository.HelmClient,
|
||
ociClient repository.OCIClient,
|
||
entryClient repository.InstanceEntryClient,
|
||
bindingRepo ...repository.WorkspaceClusterBindingRepository,
|
||
) *InstanceService {
|
||
var workspaceBindingRepo repository.WorkspaceClusterBindingRepository
|
||
if len(bindingRepo) > 0 {
|
||
workspaceBindingRepo = bindingRepo[0]
|
||
}
|
||
return &InstanceService{
|
||
instanceRepo: instanceRepo,
|
||
clusterRepo: clusterRepo,
|
||
registryRepo: registryRepo,
|
||
bindingRepo: workspaceBindingRepo,
|
||
helmClient: helmClient,
|
||
ociClient: ociClient,
|
||
entryClient: entryClient,
|
||
}
|
||
}
|
||
|
||
func (s *InstanceService) SetDiagnosticsClient(client repository.InstanceDiagnosticsClient) {
|
||
s.diagClient = client
|
||
}
|
||
|
||
func (s *InstanceService) SetTenantProvisioning(workspaceRepo repository.WorkspaceRepository, tenantClient repository.TenantKubeClient) {
|
||
s.workspaceRepo = workspaceRepo
|
||
s.tenantClient = tenantClient
|
||
}
|
||
|
||
const chartCacheDir = "/tmp/charts"
|
||
|
||
func (s *InstanceService) chartArchivePath(instance *entity.Instance) string {
|
||
filename := fmt.Sprintf("%s-%s.tgz", instance.Chart, instance.Version)
|
||
return filepath.Join(chartCacheDir, filename)
|
||
}
|
||
|
||
func (s *InstanceService) downloadChart(ctx context.Context, registry *entity.Registry, instance *entity.Instance) error {
|
||
if err := os.MkdirAll(chartCacheDir, 0755); err != nil {
|
||
return fmt.Errorf("failed to ensure chart cache dir: %w", err)
|
||
}
|
||
chartPath := s.chartArchivePath(instance)
|
||
if err := s.ociClient.PullArtifact(ctx, registry, instance.Repository, instance.Version, chartPath); err != nil {
|
||
return fmt.Errorf("failed to download chart artifact: %w", err)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// CreateInstance 创建(安装)新实例
|
||
func (s *InstanceService) CreateInstance(ctx context.Context, instance *entity.Instance) error {
|
||
principal, err := authz.RequirePrincipal(ctx)
|
||
if err != nil {
|
||
return entity.ErrUnauthorized
|
||
}
|
||
// 生成 ID
|
||
instance.ID = uuid.New().String()
|
||
instance.WorkspaceID = principal.WorkspaceID
|
||
instance.OwnerID = principal.UserID
|
||
|
||
// 验证
|
||
if err := instance.Validate(); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 检查集群是否存在
|
||
cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
|
||
if err != nil {
|
||
return entity.ErrClusterNotFound
|
||
}
|
||
if !authz.CanReadResource(principal, cluster.WorkspaceID, cluster.OwnerID, cluster.Visibility) {
|
||
return entity.ErrClusterNotFound
|
||
}
|
||
|
||
// 检查 Registry 是否存在
|
||
registry, err := s.registryRepo.GetByID(ctx, instance.RegistryID)
|
||
if err != nil {
|
||
return entity.ErrRegistryNotFound
|
||
}
|
||
if !authz.CanReadResource(principal, registry.WorkspaceID, registry.OwnerID, registry.Visibility) {
|
||
return entity.ErrRegistryNotFound
|
||
}
|
||
if err := s.applyNamespacePolicy(ctx, principal, cluster, instance); err != nil {
|
||
return err
|
||
}
|
||
enforceNamespaceValues(instance)
|
||
if err := s.ensureTenantForInstance(ctx, principal, cluster, instance); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 检查实例是否已存在
|
||
existingInstance, _ := s.instanceRepo.GetByClusterAndName(ctx, instance.ClusterID, instance.Name)
|
||
if existingInstance != nil {
|
||
return entity.ErrInstanceExists
|
||
}
|
||
|
||
instance.BeginOperation(entity.OperationInstall, "Preparing installation")
|
||
|
||
// 先写入数据库,记录 pending 状态
|
||
if err := s.instanceRepo.Create(ctx, instance); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 下载 chart artifact 供 Helm 使用
|
||
if err := s.downloadChart(ctx, registry, instance); err != nil {
|
||
instance.MarkFailure("Failed to download chart", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
return err
|
||
}
|
||
|
||
// 异步执行 Helm 安装并监控状态
|
||
go s.executeAndSyncInstall(context.Background(), instance.ID, cluster, registry, instance)
|
||
|
||
// 立即返回,状态同步由后台任务处理
|
||
return nil
|
||
}
|
||
|
||
// GetInstance 获取实例
|
||
func (s *InstanceService) GetInstance(ctx context.Context, id string) (*entity.Instance, error) {
|
||
principal, err := authz.RequirePrincipal(ctx)
|
||
if err != nil {
|
||
return nil, entity.ErrUnauthorized
|
||
}
|
||
instance, err := s.instanceRepo.GetByID(ctx, id)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if !s.canReadInstance(principal, instance) {
|
||
return nil, entity.ErrInstanceNotFound
|
||
}
|
||
return instance, nil
|
||
}
|
||
|
||
// GetInstanceStatus 获取实例实时状态
|
||
func (s *InstanceService) GetInstanceStatus(ctx context.Context, id string) (*entity.Instance, error) {
|
||
// 从数据库获取基本信息
|
||
instance, err := s.GetInstance(ctx, id)
|
||
if err != nil {
|
||
return nil, entity.ErrInstanceNotFound
|
||
}
|
||
|
||
// 获取集群信息
|
||
cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
|
||
if err != nil {
|
||
return nil, entity.ErrClusterNotFound
|
||
}
|
||
|
||
// 从 Helm 获取实时状态
|
||
liveStatus, err := s.helmClient.GetStatus(ctx, cluster, instance.Name, instance.Namespace)
|
||
if err != nil {
|
||
return instance, err // 返回数据库中的信息,但标记错误
|
||
}
|
||
|
||
// 合并实时状态
|
||
instance.Status = liveStatus.Status
|
||
instance.Revision = liveStatus.Revision
|
||
|
||
return instance, nil
|
||
}
|
||
|
||
// UpdateInstance 更新(升级)实例
|
||
func (s *InstanceService) UpdateInstance(ctx context.Context, instance *entity.Instance) error {
|
||
principal, err := authz.RequirePrincipal(ctx)
|
||
if err != nil {
|
||
return entity.ErrUnauthorized
|
||
}
|
||
// 检查实例是否存在
|
||
existingInstance, err := s.instanceRepo.GetByID(ctx, instance.ID)
|
||
if err != nil {
|
||
return entity.ErrInstanceNotFound
|
||
}
|
||
if !s.canWriteInstance(principal, existingInstance) {
|
||
return entity.ErrForbidden
|
||
}
|
||
instance.WorkspaceID = existingInstance.WorkspaceID
|
||
instance.OwnerID = existingInstance.OwnerID
|
||
|
||
// 获取集群信息
|
||
cluster, err := s.clusterRepo.GetByID(ctx, existingInstance.ClusterID)
|
||
if err != nil {
|
||
return entity.ErrClusterNotFound
|
||
}
|
||
|
||
// 获取 Registry 信息
|
||
registry, err := s.registryRepo.GetByID(ctx, instance.RegistryID)
|
||
if err != nil {
|
||
return entity.ErrRegistryNotFound
|
||
}
|
||
|
||
instance.Namespace = existingInstance.Namespace
|
||
enforceNamespaceValues(instance)
|
||
instance.BeginOperation(entity.OperationUpgrade, "Pending upgrade")
|
||
if err := s.instanceRepo.Update(ctx, instance); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 下载所需 Chart
|
||
if err := s.downloadChart(ctx, registry, instance); err != nil {
|
||
instance.MarkFailure("Failed to download chart", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
return err
|
||
}
|
||
|
||
// 异步执行 Helm 升级并监控状态
|
||
go s.executeAndSyncUpgrade(context.Background(), instance.ID, cluster, registry, instance)
|
||
|
||
// 立即返回,状态同步由后台任务处理
|
||
return nil
|
||
}
|
||
|
||
// DeleteInstance 删除(卸载)实例
|
||
func (s *InstanceService) DeleteInstance(ctx context.Context, id string) error {
|
||
principal, err := authz.RequirePrincipal(ctx)
|
||
if err != nil {
|
||
return entity.ErrUnauthorized
|
||
}
|
||
// 检查实例是否存在
|
||
instance, err := s.instanceRepo.GetByID(ctx, id)
|
||
if err != nil {
|
||
return entity.ErrInstanceNotFound
|
||
}
|
||
if !s.canWriteInstance(principal, instance) {
|
||
return entity.ErrForbidden
|
||
}
|
||
|
||
// 获取集群信息
|
||
cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
|
||
if err != nil {
|
||
return entity.ErrClusterNotFound
|
||
}
|
||
|
||
instance.BeginOperation(entity.OperationDelete, "Pending uninstall")
|
||
if err := s.instanceRepo.Update(ctx, instance); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 异步执行 Helm 卸载并监控状态
|
||
go s.executeAndSyncUninstall(context.Background(), instance.ID, cluster, instance.Name, instance.Namespace)
|
||
|
||
// 立即返回,状态同步由后台任务处理
|
||
return nil
|
||
}
|
||
|
||
// RollbackInstance 回滚实例
|
||
func (s *InstanceService) RollbackInstance(ctx context.Context, id string, revision int) error {
|
||
principal, err := authz.RequirePrincipal(ctx)
|
||
if err != nil {
|
||
return entity.ErrUnauthorized
|
||
}
|
||
// 检查实例是否存在
|
||
instance, err := s.instanceRepo.GetByID(ctx, id)
|
||
if err != nil {
|
||
return entity.ErrInstanceNotFound
|
||
}
|
||
if !s.canWriteInstance(principal, instance) {
|
||
return entity.ErrForbidden
|
||
}
|
||
|
||
// 获取集群信息
|
||
cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
|
||
if err != nil {
|
||
return entity.ErrClusterNotFound
|
||
}
|
||
|
||
instance.BeginOperation(entity.OperationRollback, fmt.Sprintf("Rolling back to revision %d", revision))
|
||
if err := s.instanceRepo.Update(ctx, instance); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 异步执行 Helm 回滚并监控状态
|
||
go s.executeAndSyncRollback(context.Background(), instance.ID, cluster, instance.Name, instance.Namespace, revision)
|
||
|
||
// 立即返回,状态同步由后台任务处理
|
||
return nil
|
||
}
|
||
|
||
// GetInstanceHistory 获取实例历史
|
||
func (s *InstanceService) GetInstanceHistory(ctx context.Context, id string) ([]*entity.ReleaseHistory, error) {
|
||
// 检查实例是否存在
|
||
instance, err := s.GetInstance(ctx, id)
|
||
if err != nil {
|
||
return nil, entity.ErrInstanceNotFound
|
||
}
|
||
|
||
// 获取集群信息
|
||
cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
|
||
if err != nil {
|
||
return nil, entity.ErrClusterNotFound
|
||
}
|
||
|
||
// 从 Helm 获取历史
|
||
return s.helmClient.GetHistory(ctx, cluster, instance.Name, instance.Namespace)
|
||
}
|
||
|
||
// ListInstancesByCluster 列出集群的所有实例
|
||
func (s *InstanceService) ListInstancesByCluster(ctx context.Context, clusterID string) ([]*entity.Instance, error) {
|
||
principal, err := authz.RequirePrincipal(ctx)
|
||
if err != nil {
|
||
return nil, entity.ErrUnauthorized
|
||
}
|
||
// 检查集群是否存在
|
||
cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
|
||
if err != nil {
|
||
return nil, entity.ErrClusterNotFound
|
||
}
|
||
if !authz.CanReadResource(principal, cluster.WorkspaceID, cluster.OwnerID, cluster.Visibility) {
|
||
return nil, entity.ErrClusterNotFound
|
||
}
|
||
|
||
instances, err := s.instanceRepo.ListByCluster(ctx, clusterID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
visible := make([]*entity.Instance, 0, len(instances))
|
||
for _, instance := range instances {
|
||
if s.canReadInstance(principal, instance) {
|
||
visible = append(visible, instance)
|
||
}
|
||
}
|
||
return visible, nil
|
||
}
|
||
|
||
// ListInstanceEntries 列出实例关联的入口信息(Service / Ingress)
|
||
func (s *InstanceService) ListInstanceEntries(ctx context.Context, clusterID, instanceID string) ([]*entity.InstanceEntry, error) {
|
||
instance, err := s.GetInstance(ctx, instanceID)
|
||
if err != nil {
|
||
return nil, entity.ErrInstanceNotFound
|
||
}
|
||
if instance.ClusterID != clusterID {
|
||
return nil, entity.ErrInstanceNotFound
|
||
}
|
||
|
||
cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
|
||
if err != nil {
|
||
return nil, entity.ErrClusterNotFound
|
||
}
|
||
|
||
if s.entryClient == nil {
|
||
return nil, fmt.Errorf("instance entry client is not configured")
|
||
}
|
||
|
||
return s.entryClient.ListEntries(ctx, cluster, instance)
|
||
}
|
||
|
||
func (s *InstanceService) GetInstanceDiagnostics(ctx context.Context, clusterID, instanceID string, tailLines int64) (*entity.InstanceDiagnostics, error) {
|
||
instance, err := s.GetInstance(ctx, instanceID)
|
||
if err != nil {
|
||
return nil, entity.ErrInstanceNotFound
|
||
}
|
||
if instance.ClusterID != clusterID {
|
||
return nil, entity.ErrInstanceNotFound
|
||
}
|
||
cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
|
||
if err != nil {
|
||
return nil, entity.ErrClusterNotFound
|
||
}
|
||
if s.diagClient == nil {
|
||
return nil, fmt.Errorf("instance diagnostics client is not configured")
|
||
}
|
||
return s.diagClient.GetDiagnostics(ctx, cluster, instance, tailLines)
|
||
}
|
||
|
||
func (s *InstanceService) StreamInstanceLogs(ctx context.Context, clusterID, instanceID, podName, containerName string, tailLines int64) (<-chan string, <-chan error, error) {
|
||
instance, err := s.GetInstance(ctx, instanceID)
|
||
if err != nil {
|
||
return nil, nil, entity.ErrInstanceNotFound
|
||
}
|
||
if instance.ClusterID != clusterID {
|
||
return nil, nil, entity.ErrInstanceNotFound
|
||
}
|
||
cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
|
||
if err != nil {
|
||
return nil, nil, entity.ErrClusterNotFound
|
||
}
|
||
if s.diagClient == nil {
|
||
return nil, nil, fmt.Errorf("instance diagnostics client is not configured")
|
||
}
|
||
streamer, ok := s.diagClient.(repository.PodLogStreamer)
|
||
if !ok {
|
||
return nil, nil, fmt.Errorf("diagnostics client does not support log streaming")
|
||
}
|
||
return streamer.StreamPodLogs(ctx, cluster, instance.Namespace, podName, containerName, tailLines)
|
||
}
|
||
|
||
// ScaleInstance 扩缩容实例(修改 replicaCount 后执行 Helm upgrade)
|
||
func (s *InstanceService) ScaleInstance(ctx context.Context, clusterID, instanceID string, replicas int, workload string) (*entity.Instance, error) {
|
||
principal, err := authz.RequirePrincipal(ctx)
|
||
if err != nil {
|
||
return nil, entity.ErrUnauthorized
|
||
}
|
||
instance, err := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if err != nil {
|
||
return nil, entity.ErrInstanceNotFound
|
||
}
|
||
if !s.canWriteInstance(principal, instance) {
|
||
return nil, entity.ErrForbidden
|
||
}
|
||
cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
|
||
if err != nil {
|
||
return nil, entity.ErrClusterNotFound
|
||
}
|
||
|
||
// Get existing Helm values and patch replicaCount
|
||
vals, err := s.helmClient.GetValues(ctx, cluster, instance.Name, instance.Namespace)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to get current values: %w", err)
|
||
}
|
||
if vals == nil {
|
||
vals = make(map[string]interface{})
|
||
}
|
||
vals["replicaCount"] = replicas
|
||
|
||
instance.SetValues(vals)
|
||
instance.BeginOperation(entity.OperationUpgrade, fmt.Sprintf("Scaling to %d replicas", replicas))
|
||
if err := s.instanceRepo.Update(ctx, instance); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
go s.executeAndSyncUpgrade(context.Background(), instance.ID, cluster, nil, instance)
|
||
return instance, nil
|
||
}
|
||
|
||
// GetInstanceValuesDiff 获取实例当前 values 与 chart 默认 values 的差异
|
||
func (s *InstanceService) GetInstanceValuesDiff(ctx context.Context, clusterID, instanceID string) (*dto.InstanceValuesDiffResponse, error) {
|
||
principal, err := authz.RequirePrincipal(ctx)
|
||
if err != nil {
|
||
return nil, entity.ErrUnauthorized
|
||
}
|
||
instance, err := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if err != nil {
|
||
return nil, entity.ErrInstanceNotFound
|
||
}
|
||
if !s.canReadInstance(principal, instance) {
|
||
return nil, entity.ErrInstanceNotFound
|
||
}
|
||
cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
|
||
if err != nil {
|
||
return nil, entity.ErrClusterNotFound
|
||
}
|
||
|
||
current, err := s.helmClient.GetValues(ctx, cluster, instance.Name, instance.Namespace)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// Get default values from the chart archive
|
||
chartPath := s.chartArchivePath(instance)
|
||
defaults, err := s.helmClient.GetChartDefaultValues(chartPath)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to read chart defaults: %w", err)
|
||
}
|
||
|
||
return &dto.InstanceValuesDiffResponse{
|
||
Current: current,
|
||
Defaults: defaults,
|
||
}, nil
|
||
}
|
||
|
||
func (s *InstanceService) canReadInstance(principal *authz.Principal, instance *entity.Instance) bool {
|
||
if principal.IsAdmin() {
|
||
return true
|
||
}
|
||
return instance.WorkspaceID == principal.WorkspaceID && instance.OwnerID == principal.UserID
|
||
}
|
||
|
||
func (s *InstanceService) canWriteInstance(principal *authz.Principal, instance *entity.Instance) bool {
|
||
if principal.IsAdmin() {
|
||
return true
|
||
}
|
||
return instance.WorkspaceID == principal.WorkspaceID && instance.OwnerID == principal.UserID
|
||
}
|
||
|
||
func enforceNamespaceValues(instance *entity.Instance) {
|
||
if instance == nil || instance.Namespace == "" {
|
||
return
|
||
}
|
||
if instance.Values == nil {
|
||
instance.Values = map[string]interface{}{}
|
||
}
|
||
instance.Values["namespace"] = instance.Namespace
|
||
setExistingStringValue(instance.Values, "namespaceOverride", instance.Namespace)
|
||
setExistingStringValue(instance.Values, "namespace_override", instance.Namespace)
|
||
setExistingStringValue(instance.Values, "targetNamespace", instance.Namespace)
|
||
setExistingStringValue(instance.Values, "target_namespace", instance.Namespace)
|
||
setExistingNestedStringValue(instance.Values, "global", "namespace", instance.Namespace)
|
||
setExistingNestedStringValue(instance.Values, "global", "namespaceOverride", instance.Namespace)
|
||
setExistingNestedStringValue(instance.Values, "global", "namespace_override", instance.Namespace)
|
||
}
|
||
|
||
// setExistingStringValue overwrites values[key] with namespace, but only when
// key is already present in the map; absent keys are never created.
func setExistingStringValue(values map[string]interface{}, key, namespace string) {
	_, present := values[key]
	if !present {
		return
	}
	values[key] = namespace
}
|
||
|
||
// setExistingNestedStringValue overwrites values[parent][key] with namespace,
// but only when values[parent] is a map[string]interface{} that already
// contains key. Nothing is created and other shapes are left untouched.
func setExistingNestedStringValue(values map[string]interface{}, parent, key, namespace string) {
	nested, isMap := values[parent].(map[string]interface{})
	if !isMap {
		return
	}
	_, present := nested[key]
	if !present {
		return
	}
	nested[key] = namespace
}
|
||
|
||
func (s *InstanceService) applyNamespacePolicy(ctx context.Context, principal *authz.Principal, cluster *entity.Cluster, instance *entity.Instance) error {
|
||
if principal.IsAdmin() {
|
||
if isProtectedSystemNamespace(instance.Namespace) {
|
||
return entity.ErrInvalidNamespace
|
||
}
|
||
return nil
|
||
}
|
||
if isReservedNamespace(instance.Namespace) {
|
||
return entity.ErrInvalidNamespace
|
||
}
|
||
if cluster.Visibility != authz.VisibilityPrivate || cluster.OwnerID != principal.UserID {
|
||
namespace := principal.Namespace
|
||
if namespace == "" {
|
||
namespace = entity.NamespaceForWorkspace(principal.WorkspaceName)
|
||
}
|
||
if s.bindingRepo != nil {
|
||
if binding, err := s.bindingRepo.Get(ctx, principal.WorkspaceID, cluster.ID); err == nil && binding != nil && binding.Namespace != "" {
|
||
namespace = binding.Namespace
|
||
}
|
||
}
|
||
instance.Namespace = namespace
|
||
return nil
|
||
}
|
||
if instance.Namespace == "" {
|
||
if cluster.DefaultNamespace != "" {
|
||
instance.Namespace = cluster.DefaultNamespace
|
||
} else if principal.Namespace != "" {
|
||
instance.Namespace = principal.Namespace
|
||
} else {
|
||
instance.Namespace = entity.NamespaceForWorkspace(principal.Username)
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *InstanceService) ensureTenantForInstance(ctx context.Context, principal *authz.Principal, cluster *entity.Cluster, instance *entity.Instance) error {
|
||
if principal.IsAdmin() || s.workspaceRepo == nil || s.tenantClient == nil {
|
||
return nil
|
||
}
|
||
workspace, err := s.workspaceRepo.GetByID(ctx, principal.WorkspaceID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if workspace.Status == entity.WorkspaceSuspended {
|
||
return entity.ErrWorkspaceSuspended
|
||
}
|
||
binding := entity.NewTenantBinding(instance.Namespace)
|
||
binding.ServiceAccountName = workspace.K8sSAName
|
||
binding.ResourceQuotaHard = instanceResourceQuotaHard(workspace)
|
||
if err := s.tenantClient.EnsureTenant(ctx, cluster, binding); err != nil {
|
||
return err
|
||
}
|
||
if s.bindingRepo != nil {
|
||
_ = s.bindingRepo.Upsert(ctx, &entity.WorkspaceClusterBinding{
|
||
ID: uuid.New().String(),
|
||
WorkspaceID: workspace.ID,
|
||
ClusterID: cluster.ID,
|
||
Namespace: instance.Namespace,
|
||
ServiceAccount: workspace.K8sSAName,
|
||
QuotaCPU: workspace.QuotaCPU,
|
||
QuotaMemory: workspace.QuotaMemory,
|
||
QuotaGPU: workspace.QuotaGPU,
|
||
QuotaGPUMem: workspace.QuotaGPUMem,
|
||
Status: "active",
|
||
CreatedAt: time.Now(),
|
||
UpdatedAt: time.Now(),
|
||
})
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func instanceResourceQuotaHard(workspace *entity.Workspace) corev1.ResourceList {
|
||
hard := corev1.ResourceList{}
|
||
addQuantity := func(name corev1.ResourceName, value string) {
|
||
value = normalizeStandardQuotaQuantity(value)
|
||
if value == "" {
|
||
return
|
||
}
|
||
if quantity, err := resource.ParseQuantity(value); err == nil {
|
||
hard[name] = quantity
|
||
}
|
||
}
|
||
addGPUMemoryQuantity := func(value string) {
|
||
value, err := normalizeGPUMemoryQuota(value)
|
||
if err != nil || value == "" {
|
||
return
|
||
}
|
||
if quantity, err := resource.ParseQuantity(value); err == nil {
|
||
hard[corev1.ResourceName("requests.nvidia.com/gpumem")] = quantity
|
||
}
|
||
}
|
||
if workspace == nil {
|
||
return hard
|
||
}
|
||
addQuantity(corev1.ResourceName("requests.cpu"), workspace.QuotaCPU)
|
||
addQuantity(corev1.ResourceName("requests.memory"), workspace.QuotaMemory)
|
||
addQuantity(corev1.ResourceName("requests.nvidia.com/gpu"), workspace.QuotaGPU)
|
||
addGPUMemoryQuantity(workspace.QuotaGPUMem)
|
||
return hard
|
||
}
|
||
|
||
// isReservedNamespace reports whether namespace is exactly one of "default",
// "kube-system", "kube-public" or "kube-node-lease".
func isReservedNamespace(namespace string) bool {
	reserved := [...]string{"default", "kube-system", "kube-public", "kube-node-lease"}
	for _, name := range reserved {
		if namespace == name {
			return true
		}
	}
	return false
}
|
||
|
||
// isProtectedSystemNamespace reports whether namespace is exactly one of
// "kube-system", "kube-public" or "kube-node-lease".
func isProtectedSystemNamespace(namespace string) bool {
	protected := [...]string{"kube-system", "kube-public", "kube-node-lease"}
	for _, name := range protected {
		if namespace == name {
			return true
		}
	}
	return false
}
|
||
|
||
// executeAndSyncInstall 异步执行安装并监控状态
|
||
func (s *InstanceService) executeAndSyncInstall(ctx context.Context, instanceID string, cluster *entity.Cluster, registry *entity.Registry, instance *entity.Instance) {
|
||
// 执行 Helm 安装
|
||
if err := s.helmClient.Install(ctx, cluster, instance); err != nil {
|
||
// 更新实例状态为失败
|
||
instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if updateErr == nil && instance != nil {
|
||
instance.MarkFailure("Helm install failed", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
}
|
||
return
|
||
}
|
||
|
||
// 安装成功后,同步状态
|
||
s.syncInstanceStatus(ctx, instanceID, cluster, instance.Name, instance.Namespace, entity.OperationInstall)
|
||
}
|
||
|
||
// executeAndSyncUpgrade 异步执行升级并监控状态
|
||
func (s *InstanceService) executeAndSyncUpgrade(ctx context.Context, instanceID string, cluster *entity.Cluster, registry *entity.Registry, instance *entity.Instance) {
|
||
// 执行 Helm 升级
|
||
if err := s.helmClient.Upgrade(ctx, cluster, instance); err != nil {
|
||
// 更新实例状态为失败
|
||
instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if updateErr == nil && instance != nil {
|
||
instance.MarkFailure("Helm upgrade failed", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
}
|
||
return
|
||
}
|
||
|
||
// 升级成功后,同步状态
|
||
s.syncInstanceStatus(ctx, instanceID, cluster, instance.Name, instance.Namespace, entity.OperationUpgrade)
|
||
}
|
||
|
||
// executeAndSyncRollback 异步执行回滚并监控状态
|
||
func (s *InstanceService) executeAndSyncRollback(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string, revision int) {
|
||
// 执行 Helm 回滚
|
||
if err := s.helmClient.Rollback(ctx, cluster, releaseName, namespace, revision); err != nil {
|
||
// 更新实例状态为失败
|
||
instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if updateErr == nil && instance != nil {
|
||
instance.MarkFailure("Helm rollback failed", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
}
|
||
return
|
||
}
|
||
|
||
// 回滚成功后,同步状态
|
||
s.syncInstanceStatus(ctx, instanceID, cluster, releaseName, namespace, entity.OperationRollback)
|
||
}
|
||
|
||
// executeAndSyncUninstall 异步执行卸载并监控状态
|
||
func (s *InstanceService) executeAndSyncUninstall(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string) {
|
||
// 执行 Helm 卸载
|
||
err := s.helmClient.Uninstall(ctx, cluster, releaseName, namespace)
|
||
|
||
// 获取实例
|
||
instance, getErr := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if getErr != nil {
|
||
return
|
||
}
|
||
|
||
if err != nil {
|
||
// 如果错误不是"未找到",则标记为失败
|
||
if !errors.Is(err, entity.ErrInstanceNotFound) {
|
||
instance.MarkFailure("Helm uninstall failed", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
} else {
|
||
// 如果未找到,说明已经卸载,直接删除数据库记录
|
||
_ = s.instanceRepo.Delete(ctx, instanceID)
|
||
}
|
||
return
|
||
}
|
||
|
||
// 卸载成功,标记为已卸载
|
||
instance.MarkSuccess(entity.StatusUninstalled, instance.Revision, "Instance uninstalled successfully")
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
|
||
// 验证卸载是否完成:尝试获取状态,如果获取不到说明已卸载
|
||
time.Sleep(3 * time.Second)
|
||
_, statusErr := s.helmClient.GetStatus(ctx, cluster, releaseName, namespace)
|
||
if statusErr != nil {
|
||
// 无法获取状态,说明已卸载,删除数据库记录
|
||
_ = s.instanceRepo.Delete(ctx, instanceID)
|
||
} else {
|
||
// 仍然可以获取状态,可能还在卸载中,继续等待
|
||
// 设置状态为 uninstalled,但不删除记录,让用户手动删除或等待自动清理
|
||
instance.MarkSuccess(entity.StatusUninstalled, instance.Revision, "Uninstall in progress")
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
}
|
||
}
|
||
|
||
// syncInstanceStatus 同步实例状态(定期检查 Helm 状态并更新数据库)
|
||
func (s *InstanceService) syncInstanceStatus(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string, operation entity.InstanceOperation) {
|
||
maxAttempts := 30 // 最多尝试30次(约5分钟)
|
||
interval := 10 * time.Second // 每10秒检查一次
|
||
|
||
for i := 0; i < maxAttempts; i++ {
|
||
time.Sleep(interval)
|
||
|
||
// 获取数据库中的实例
|
||
instance, err := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if err != nil {
|
||
// 实例不存在,停止同步
|
||
return
|
||
}
|
||
|
||
// 从 Helm 获取实时状态
|
||
liveStatus, err := s.helmClient.GetStatus(ctx, cluster, releaseName, namespace)
|
||
if err != nil {
|
||
// 如果获取状态失败,可能是还在部署中,继续等待
|
||
if i < maxAttempts-1 {
|
||
continue
|
||
}
|
||
// 最后一次尝试失败,标记为失败
|
||
instance.MarkFailure("Failed to get status from Helm", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
return
|
||
}
|
||
|
||
// 根据操作类型和 Helm 状态更新实例状态
|
||
shouldUpdate := false
|
||
switch operation {
|
||
case entity.OperationInstall:
|
||
// 安装操作:如果 Helm 状态是 deployed,则更新为 deployed
|
||
if liveStatus.Status == entity.StatusDeployed {
|
||
instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance deployed successfully")
|
||
shouldUpdate = true
|
||
} else if liveStatus.Status == entity.StatusFailed {
|
||
instance.MarkFailure("Installation failed", fmt.Errorf("Helm status: %s", liveStatus.Status))
|
||
shouldUpdate = true
|
||
}
|
||
case entity.OperationUpgrade:
|
||
// 升级操作:如果 Helm 状态是 deployed,则更新为 deployed
|
||
if liveStatus.Status == entity.StatusDeployed {
|
||
instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance upgraded successfully")
|
||
shouldUpdate = true
|
||
} else if liveStatus.Status == entity.StatusFailed {
|
||
instance.MarkFailure("Upgrade failed", fmt.Errorf("Helm status: %s", liveStatus.Status))
|
||
shouldUpdate = true
|
||
}
|
||
case entity.OperationRollback:
|
||
// 回滚操作:如果 Helm 状态是 deployed,则更新为 deployed
|
||
if liveStatus.Status == entity.StatusDeployed {
|
||
instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance rolled back successfully")
|
||
shouldUpdate = true
|
||
} else if liveStatus.Status == entity.StatusFailed {
|
||
instance.MarkFailure("Rollback failed", fmt.Errorf("Helm status: %s", liveStatus.Status))
|
||
shouldUpdate = true
|
||
}
|
||
}
|
||
|
||
// 如果状态已更新为最终状态,停止同步
|
||
if shouldUpdate {
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
return
|
||
}
|
||
|
||
// 如果状态已经是最终状态(deployed 或 failed),停止同步
|
||
if instance.Status == entity.StatusDeployed || instance.Status == entity.StatusFailed {
|
||
return
|
||
}
|
||
}
|
||
|
||
// 超时,标记为失败
|
||
instance, err := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if err == nil && instance != nil {
|
||
instance.MarkFailure("Operation timeout", fmt.Errorf("Status sync timeout after %d attempts", maxAttempts))
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
}
|
||
}
|