- Add GetMetrics method to MetricsClient interface and implement cluster metrics API - Add QuotaPrecheck service for validating resource quotas before deployment - Add auth DTO with role/permission models and auth handler tests - Add instance diagnostics: mounted NFS volumes, labels, annotations in pod diagnostics - Update workspace handler with GetWorkspace endpoint and shared-user list - Fix monitoring handler to use correct service method name - Add tail_lines fallback in instance handler for snake_case query params - Update nginx config for SSE log streaming support (no buffering) - Add comprehensive test coverage: auth_service_test, auth_handler_test, auth_dto_test, metrics_client_test, quota_precheck_test - Update error messages for quota validation and instance operations - ModifyModal: fix YAML lineWidth:0, modified keys summary, delta-only submit - InstanceCard: correctly disable scale-minus when replicas <= 0 - SidebarLayout: add hover transition for sidebar items - Update todo.md and lessons.md with latest fixes
1086 lines
35 KiB
Go
1086 lines
35 KiB
Go
package service
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"os"
|
||
"path/filepath"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/google/uuid"
|
||
"github.com/ocdp/cluster-service/internal/adapter/input/http/dto"
|
||
"github.com/ocdp/cluster-service/internal/domain/entity"
|
||
"github.com/ocdp/cluster-service/internal/domain/repository"
|
||
"github.com/ocdp/cluster-service/internal/pkg/authz"
|
||
corev1 "k8s.io/api/core/v1"
|
||
"k8s.io/apimachinery/pkg/api/resource"
|
||
)
|
||
|
||
// ScaleClient defines the interface for K8s-native workload scaling
|
||
type ScaleClient interface {
|
||
GetDeploymentReplicas(ctx context.Context, cluster *entity.Cluster, namespace, releaseName string) (int32, error)
|
||
ScaleDeployment(ctx context.Context, cluster *entity.Cluster, namespace, releaseName string, replicas int32) error
|
||
}
|
||
|
||
// InstanceService Helm 实例管理领域服务
|
||
type InstanceService struct {
|
||
instanceRepo repository.InstanceRepository
|
||
clusterRepo repository.ClusterRepository
|
||
registryRepo repository.RegistryRepository
|
||
bindingRepo repository.WorkspaceClusterBindingRepository
|
||
helmClient repository.HelmClient
|
||
ociClient repository.OCIClient
|
||
entryClient repository.InstanceEntryClient
|
||
diagClient repository.InstanceDiagnosticsClient
|
||
workspaceRepo repository.WorkspaceRepository
|
||
userRepo repository.UserRepository
|
||
tenantClient repository.TenantKubeClient
|
||
scaleClient ScaleClient
|
||
}
|
||
|
||
// NewInstanceService 创建实例服务
|
||
func NewInstanceService(
|
||
instanceRepo repository.InstanceRepository,
|
||
clusterRepo repository.ClusterRepository,
|
||
registryRepo repository.RegistryRepository,
|
||
helmClient repository.HelmClient,
|
||
ociClient repository.OCIClient,
|
||
entryClient repository.InstanceEntryClient,
|
||
bindingRepo ...repository.WorkspaceClusterBindingRepository,
|
||
) *InstanceService {
|
||
var workspaceBindingRepo repository.WorkspaceClusterBindingRepository
|
||
if len(bindingRepo) > 0 {
|
||
workspaceBindingRepo = bindingRepo[0]
|
||
}
|
||
return &InstanceService{
|
||
instanceRepo: instanceRepo,
|
||
clusterRepo: clusterRepo,
|
||
registryRepo: registryRepo,
|
||
bindingRepo: workspaceBindingRepo,
|
||
helmClient: helmClient,
|
||
ociClient: ociClient,
|
||
entryClient: entryClient,
|
||
}
|
||
}
|
||
|
||
func (s *InstanceService) SetDiagnosticsClient(client repository.InstanceDiagnosticsClient) {
|
||
s.diagClient = client
|
||
}
|
||
|
||
func (s *InstanceService) SetScaleClient(client ScaleClient) {
|
||
s.scaleClient = client
|
||
}
|
||
|
||
func (s *InstanceService) SetTenantProvisioning(workspaceRepo repository.WorkspaceRepository, tenantClient repository.TenantKubeClient) {
|
||
s.workspaceRepo = workspaceRepo
|
||
s.tenantClient = tenantClient
|
||
}
|
||
|
||
func (s *InstanceService) SetUserRepository(userRepo repository.UserRepository) {
|
||
s.userRepo = userRepo
|
||
}
|
||
|
||
const chartCacheDir = "/tmp/charts"
|
||
|
||
func (s *InstanceService) chartArchivePath(instance *entity.Instance) string {
|
||
filename := fmt.Sprintf("%s-%s.tgz", instance.Chart, instance.Version)
|
||
return filepath.Join(chartCacheDir, filename)
|
||
}
|
||
|
||
// downloadChart pulls the artifact identified by instance.Repository and
// instance.Version from registry into the instance's chart cache path,
// creating the cache directory first when needed.
func (s *InstanceService) downloadChart(ctx context.Context, registry *entity.Registry, instance *entity.Instance) error {
	if mkErr := os.MkdirAll(chartCacheDir, 0755); mkErr != nil {
		return fmt.Errorf("failed to ensure chart cache dir: %w", mkErr)
	}

	dest := s.chartArchivePath(instance)
	pullErr := s.ociClient.PullArtifact(ctx, registry, instance.Repository, instance.Version, dest)
	if pullErr != nil {
		return fmt.Errorf("failed to download chart artifact: %w", pullErr)
	}
	return nil
}
|
||
|
||
// CreateInstance validates and persists a new instance, then installs it with
// Helm in the background.
//
// The instance gets a fresh ID and is owned by the calling principal's user
// and workspace. Before anything is stored, the method:
//   - checks that the cluster and registry exist and are readable;
//   - applies the namespace policy;
//   - rejects a duplicate name on the same cluster;
//   - pulls the chart;
//   - provisions the tenant;
//   - runs the quota precheck.
//
// The record is then saved in a pending install state and the Helm install
// runs asynchronously. A nil error therefore means "accepted", not
// "installed".
func (s *InstanceService) CreateInstance(ctx context.Context, instance *entity.Instance) error {
	principal, err := authz.RequirePrincipal(ctx)
	if err != nil {
		return entity.ErrUnauthorized
	}

	instance.ID = uuid.New().String()
	instance.WorkspaceID = principal.WorkspaceID
	instance.OwnerID = principal.UserID
	if err := instance.Validate(); err != nil {
		return err
	}

	// Unreadable resources are reported as "not found" so their existence is
	// not disclosed to the caller.
	cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
	if err != nil || !authz.CanReadResource(principal, cluster.WorkspaceID, cluster.OwnerID, cluster.Visibility) {
		return entity.ErrClusterNotFound
	}
	registry, err := s.registryRepo.GetByID(ctx, instance.RegistryID)
	if err != nil || !authz.CanReadResource(principal, registry.WorkspaceID, registry.OwnerID, registry.Visibility) {
		return entity.ErrRegistryNotFound
	}

	if err := s.applyNamespacePolicy(ctx, principal, cluster, instance); err != nil {
		return err
	}
	enforceNamespaceValues(instance)

	// A lookup error is treated like "no such instance"; only a returned
	// record counts as a duplicate.
	if dup, _ := s.instanceRepo.GetByClusterAndName(ctx, instance.ClusterID, instance.Name); dup != nil {
		return entity.ErrInstanceExists
	}

	if err := s.downloadChart(ctx, registry, instance); err != nil {
		return err
	}
	binding, err := s.ensureTenantForInstance(ctx, principal, cluster, instance)
	if err != nil {
		return err
	}
	if err := s.precheckInstanceQuota(ctx, principal, cluster, binding, instance, nil); err != nil {
		return err
	}

	// Persist the pending state before the background install starts
	// updating the record.
	instance.BeginOperation(entity.OperationInstall, "Preparing installation")
	if err := s.instanceRepo.Create(ctx, instance); err != nil {
		return err
	}

	// The install outlives the request, hence context.Background().
	go s.executeAndSyncInstall(context.Background(), instance.ID, cluster, registry, instance)
	return nil
}
|
||
|
||
// GetInstance loads an instance by ID, with OwnerUsername filled in when a
// user repository is configured.
//
// It returns entity.ErrUnauthorized when ctx carries no principal, and
// entity.ErrInstanceNotFound when the caller may not read the instance.
// Repository errors are returned unchanged.
func (s *InstanceService) GetInstance(ctx context.Context, id string) (*entity.Instance, error) {
	principal, err := authz.RequirePrincipal(ctx)
	if err != nil {
		return nil, entity.ErrUnauthorized
	}

	found, err := s.instanceRepo.GetByID(ctx, id)
	switch {
	case err != nil:
		return nil, err
	case !s.canReadInstance(principal, found):
		return nil, entity.ErrInstanceNotFound
	}

	s.enrichOwnerUsernames(ctx, []*entity.Instance{found})
	return found, nil
}
|
||
|
||
// GetInstanceStatus returns the stored instance with Status and Revision
// overwritten by what Helm reports for the release.
//
// A missing principal yields entity.ErrUnauthorized. Any other failure to load
// the instance (missing, not readable, repository error) yields
// entity.ErrInstanceNotFound. If the Helm lookup fails, the stored instance is
// returned together with the Helm error so callers can still show the
// persisted view.
func (s *InstanceService) GetInstanceStatus(ctx context.Context, id string) (*entity.Instance, error) {
	instance, err := s.GetInstance(ctx, id)
	if err != nil {
		// Keep authentication failures distinguishable instead of turning
		// them into "not found".
		if errors.Is(err, entity.ErrUnauthorized) {
			return nil, err
		}
		return nil, entity.ErrInstanceNotFound
	}

	cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
	if err != nil {
		return nil, entity.ErrClusterNotFound
	}

	liveStatus, err := s.helmClient.GetStatus(ctx, cluster, instance.Name, instance.Namespace)
	if err != nil {
		// Return the stored instance alongside the error.
		return instance, err
	}

	// Merge the Helm-reported state into the stored record.
	instance.Status = liveStatus.Status
	instance.Revision = liveStatus.Revision
	return instance, nil
}
|
||
|
||
// UpdateInstance upgrades an existing instance with Helm in the background.
//
// The caller must be allowed to write the stored instance. Fields are
// resolved as follows:
//   - cluster, workspace, owner, name and namespace always come from the
//     stored record;
//   - registry, repository, chart and version fall back to the stored values
//     when left empty.
//
// If the request switches to a different registry, the caller must be able
// to read that registry, which is the same rule CreateInstance applies. The
// method then pulls the chart, provisions the tenant and prechecks the quota
// against the stored instance. After that it persists the pending upgrade
// state and starts the Helm upgrade asynchronously.
func (s *InstanceService) UpdateInstance(ctx context.Context, instance *entity.Instance) error {
	principal, err := authz.RequirePrincipal(ctx)
	if err != nil {
		return entity.ErrUnauthorized
	}

	existingInstance, err := s.instanceRepo.GetByID(ctx, instance.ID)
	if err != nil {
		return entity.ErrInstanceNotFound
	}
	if !s.canWriteInstance(principal, existingInstance) {
		return entity.ErrForbidden
	}

	// Identity is immutable: always take it from the stored record.
	instance.ClusterID = existingInstance.ClusterID
	instance.WorkspaceID = existingInstance.WorkspaceID
	instance.OwnerID = existingInstance.OwnerID
	instance.Name = existingInstance.Name

	// Chart coordinates are optional in the request.
	if instance.RegistryID == "" {
		instance.RegistryID = existingInstance.RegistryID
	}
	if instance.Repository == "" {
		instance.Repository = existingInstance.Repository
	}
	if instance.Chart == "" {
		instance.Chart = existingInstance.Chart
	}
	if instance.Version == "" {
		instance.Version = existingInstance.Version
	}

	cluster, err := s.clusterRepo.GetByID(ctx, existingInstance.ClusterID)
	if err != nil {
		return entity.ErrClusterNotFound
	}

	registry, err := s.registryRepo.GetByID(ctx, instance.RegistryID)
	if err != nil {
		return entity.ErrRegistryNotFound
	}
	// Without this check an update could point the instance at a registry
	// (and its credentials) the caller cannot see. Only a switch is checked,
	// so an instance keeps upgrading from the registry it was created with.
	if instance.RegistryID != existingInstance.RegistryID &&
		!authz.CanReadResource(principal, registry.WorkspaceID, registry.OwnerID, registry.Visibility) {
		return entity.ErrRegistryNotFound
	}

	instance.Namespace = existingInstance.Namespace
	enforceNamespaceValues(instance)

	if err := s.downloadChart(ctx, registry, instance); err != nil {
		return err
	}
	binding, err := s.ensureTenantForInstance(ctx, principal, cluster, instance)
	if err != nil {
		return err
	}
	if err := s.precheckInstanceQuota(ctx, principal, cluster, binding, instance, existingInstance); err != nil {
		return err
	}

	instance.BeginOperation(entity.OperationUpgrade, "Pending upgrade")
	if err := s.instanceRepo.Update(ctx, instance); err != nil {
		return err
	}

	// The upgrade outlives the request, hence context.Background().
	go s.executeAndSyncUpgrade(context.Background(), instance.ID, cluster, registry, instance)
	return nil
}
|
||
|
||
// DeleteInstance uninstalls an instance the caller may write.
//
// The pending delete operation is persisted first. The Helm uninstall and the
// follow-up status sync run in the background, so a nil error only means the
// request was accepted.
func (s *InstanceService) DeleteInstance(ctx context.Context, id string) error {
	principal, err := authz.RequirePrincipal(ctx)
	if err != nil {
		return entity.ErrUnauthorized
	}

	target, err := s.instanceRepo.GetByID(ctx, id)
	if err != nil {
		return entity.ErrInstanceNotFound
	}
	if !s.canWriteInstance(principal, target) {
		return entity.ErrForbidden
	}

	cluster, err := s.clusterRepo.GetByID(ctx, target.ClusterID)
	if err != nil {
		return entity.ErrClusterNotFound
	}

	target.BeginOperation(entity.OperationDelete, "Pending uninstall")
	if err := s.instanceRepo.Update(ctx, target); err != nil {
		return err
	}

	releaseName, namespace := target.Name, target.Namespace
	go s.executeAndSyncUninstall(context.Background(), target.ID, cluster, releaseName, namespace)
	return nil
}
|
||
|
||
// RollbackInstance rolls an instance the caller may write back to the given
// Helm revision.
//
// The pending rollback operation is persisted first. The Helm rollback runs
// in the background, so a nil error only means the request was accepted.
func (s *InstanceService) RollbackInstance(ctx context.Context, id string, revision int) error {
	principal, err := authz.RequirePrincipal(ctx)
	if err != nil {
		return entity.ErrUnauthorized
	}

	inst, err := s.instanceRepo.GetByID(ctx, id)
	if err != nil {
		return entity.ErrInstanceNotFound
	}
	if !s.canWriteInstance(principal, inst) {
		return entity.ErrForbidden
	}

	cluster, err := s.clusterRepo.GetByID(ctx, inst.ClusterID)
	if err != nil {
		return entity.ErrClusterNotFound
	}

	progress := fmt.Sprintf("Rolling back to revision %d", revision)
	inst.BeginOperation(entity.OperationRollback, progress)
	if err := s.instanceRepo.Update(ctx, inst); err != nil {
		return err
	}

	go s.executeAndSyncRollback(context.Background(), inst.ID, cluster, inst.Name, inst.Namespace, revision)
	return nil
}
|
||
|
||
// GetInstanceHistory returns the Helm release history of an instance the
// caller may read.
//
// A missing principal yields entity.ErrUnauthorized. Any other failure to load
// the instance yields entity.ErrInstanceNotFound. A failed cluster lookup
// yields entity.ErrClusterNotFound.
func (s *InstanceService) GetInstanceHistory(ctx context.Context, id string) ([]*entity.ReleaseHistory, error) {
	instance, err := s.GetInstance(ctx, id)
	if err != nil {
		// Keep authentication failures distinguishable instead of turning
		// them into "not found".
		if errors.Is(err, entity.ErrUnauthorized) {
			return nil, err
		}
		return nil, entity.ErrInstanceNotFound
	}

	cluster, err := s.clusterRepo.GetByID(ctx, instance.ClusterID)
	if err != nil {
		return nil, entity.ErrClusterNotFound
	}

	return s.helmClient.GetHistory(ctx, cluster, instance.Name, instance.Namespace)
}
|
||
|
||
// ListInstancesByCluster lists the instances on a cluster that the caller may
// read, with owner usernames resolved where possible.
//
// A cluster the caller cannot read is reported as entity.ErrClusterNotFound.
// Repository errors from listing are returned unchanged.
func (s *InstanceService) ListInstancesByCluster(ctx context.Context, clusterID string) ([]*entity.Instance, error) {
	principal, err := authz.RequirePrincipal(ctx)
	if err != nil {
		return nil, entity.ErrUnauthorized
	}

	cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
	if err != nil || !authz.CanReadResource(principal, cluster.WorkspaceID, cluster.OwnerID, cluster.Visibility) {
		return nil, entity.ErrClusterNotFound
	}

	all, err := s.instanceRepo.ListByCluster(ctx, clusterID)
	if err != nil {
		return nil, err
	}

	readable := make([]*entity.Instance, 0, len(all))
	for i := range all {
		if s.canReadInstance(principal, all[i]) {
			readable = append(readable, all[i])
		}
	}

	s.enrichOwnerUsernames(ctx, readable)
	return readable, nil
}
|
||
|
||
func (s *InstanceService) enrichOwnerUsernames(ctx context.Context, instances []*entity.Instance) {
|
||
if s.userRepo == nil || len(instances) == 0 {
|
||
return
|
||
}
|
||
usernames := make(map[string]string)
|
||
for _, instance := range instances {
|
||
if instance == nil || instance.OwnerID == "" {
|
||
continue
|
||
}
|
||
if username, ok := usernames[instance.OwnerID]; ok {
|
||
instance.OwnerUsername = username
|
||
continue
|
||
}
|
||
user, err := s.userRepo.GetByID(ctx, instance.OwnerID)
|
||
if err != nil || user == nil {
|
||
continue
|
||
}
|
||
usernames[instance.OwnerID] = user.Username
|
||
instance.OwnerUsername = user.Username
|
||
}
|
||
}
|
||
|
||
// ListInstanceEntries lists the entry points (Service / Ingress) of an
// instance the caller may read.
//
// Errors are reported as follows:
//   - a missing principal yields entity.ErrUnauthorized;
//   - a missing or unreadable instance, or one that is not on clusterID,
//     yields entity.ErrInstanceNotFound;
//   - a failed cluster lookup yields entity.ErrClusterNotFound;
//   - an unconfigured entry client yields a plain error.
func (s *InstanceService) ListInstanceEntries(ctx context.Context, clusterID, instanceID string) ([]*entity.InstanceEntry, error) {
	instance, err := s.GetInstance(ctx, instanceID)
	if err != nil {
		// Keep authentication failures distinguishable instead of turning
		// them into "not found".
		if errors.Is(err, entity.ErrUnauthorized) {
			return nil, err
		}
		return nil, entity.ErrInstanceNotFound
	}
	if instance.ClusterID != clusterID {
		return nil, entity.ErrInstanceNotFound
	}

	cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
	if err != nil {
		return nil, entity.ErrClusterNotFound
	}

	if s.entryClient == nil {
		return nil, fmt.Errorf("instance entry client is not configured")
	}
	return s.entryClient.ListEntries(ctx, cluster, instance)
}
|
||
|
||
// GetInstanceDiagnostics returns diagnostics for an instance the caller may
// read, passing tailLines through to the diagnostics client.
//
// Errors are reported as follows:
//   - a missing principal yields entity.ErrUnauthorized;
//   - a missing or unreadable instance, or one that is not on clusterID,
//     yields entity.ErrInstanceNotFound;
//   - a failed cluster lookup yields entity.ErrClusterNotFound;
//   - an unconfigured diagnostics client yields a plain error.
func (s *InstanceService) GetInstanceDiagnostics(ctx context.Context, clusterID, instanceID string, tailLines int64) (*entity.InstanceDiagnostics, error) {
	instance, err := s.GetInstance(ctx, instanceID)
	if err != nil {
		// Keep authentication failures distinguishable instead of turning
		// them into "not found".
		if errors.Is(err, entity.ErrUnauthorized) {
			return nil, err
		}
		return nil, entity.ErrInstanceNotFound
	}
	if instance.ClusterID != clusterID {
		return nil, entity.ErrInstanceNotFound
	}

	cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
	if err != nil {
		return nil, entity.ErrClusterNotFound
	}

	if s.diagClient == nil {
		return nil, fmt.Errorf("instance diagnostics client is not configured")
	}
	return s.diagClient.GetDiagnostics(ctx, cluster, instance, tailLines)
}
|
||
|
||
// StreamInstanceLogs streams the logs of podName/containerName in the
// instance's namespace. It returns a line channel and an error channel from
// the diagnostics client's PodLogStreamer implementation.
//
// Error mapping follows GetInstanceDiagnostics. It additionally returns a
// plain error when the diagnostics client cannot stream logs.
//
// NOTE(review): podName and containerName are passed through unchecked.
// Nothing here verifies that the pod belongs to this instance, so any pod in
// instance.Namespace can be targeted. Consider validating against the
// instance's own pods.
func (s *InstanceService) StreamInstanceLogs(ctx context.Context, clusterID, instanceID, podName, containerName string, tailLines int64) (<-chan string, <-chan error, error) {
	instance, err := s.GetInstance(ctx, instanceID)
	if err != nil {
		// Keep authentication failures distinguishable instead of turning
		// them into "not found".
		if errors.Is(err, entity.ErrUnauthorized) {
			return nil, nil, err
		}
		return nil, nil, entity.ErrInstanceNotFound
	}
	if instance.ClusterID != clusterID {
		return nil, nil, entity.ErrInstanceNotFound
	}

	cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
	if err != nil {
		return nil, nil, entity.ErrClusterNotFound
	}

	if s.diagClient == nil {
		return nil, nil, fmt.Errorf("instance diagnostics client is not configured")
	}
	streamer, ok := s.diagClient.(repository.PodLogStreamer)
	if !ok {
		return nil, nil, fmt.Errorf("diagnostics client does not support log streaming")
	}
	return streamer.StreamPodLogs(ctx, cluster, instance.Namespace, podName, containerName, tailLines)
}
|
||
|
||
// ScaleInstance 扩缩容实例(修改 replicaCount 后执行 Helm upgrade)
|
||
func (s *InstanceService) ScaleInstance(ctx context.Context, clusterID, instanceID string, replicas int, workload string) (*entity.Instance, error) {
|
||
principal, err := authz.RequirePrincipal(ctx)
|
||
if err != nil {
|
||
return nil, entity.ErrUnauthorized
|
||
}
|
||
instance, err := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if err != nil {
|
||
return nil, entity.ErrInstanceNotFound
|
||
}
|
||
if !s.canWriteInstance(principal, instance) {
|
||
return nil, entity.ErrForbidden
|
||
}
|
||
if instance.ClusterID != clusterID {
|
||
return nil, entity.ErrInstanceNotFound
|
||
}
|
||
cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
|
||
if err != nil {
|
||
return nil, entity.ErrClusterNotFound
|
||
}
|
||
|
||
current := cloneInstanceForQuota(instance)
|
||
currentValues, err := s.helmClient.GetValues(ctx, cluster, instance.Name, instance.Namespace)
|
||
if err == nil && currentValues != nil {
|
||
current.SetValues(currentValues)
|
||
}
|
||
target := cloneInstanceForQuota(instance)
|
||
targetValues := copyValues(current.Values)
|
||
if targetValues == nil {
|
||
targetValues = copyValues(instance.Values)
|
||
}
|
||
if targetValues == nil {
|
||
targetValues = map[string]interface{}{}
|
||
}
|
||
targetValues["replicaCount"] = replicas
|
||
target.SetValues(targetValues)
|
||
registry, err := s.registryRepo.GetByID(ctx, instance.RegistryID)
|
||
if err != nil {
|
||
return nil, entity.ErrRegistryNotFound
|
||
}
|
||
if err := s.downloadChart(ctx, registry, target); err != nil {
|
||
return nil, err
|
||
}
|
||
binding, err := s.ensureTenantForInstance(ctx, principal, cluster, target)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if err := s.precheckInstanceQuota(ctx, principal, cluster, binding, target, current); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// Scale via K8s API directly (like kubectl scale deploy --replicas=N)
|
||
if s.scaleClient != nil {
|
||
if err := s.scaleClient.ScaleDeployment(ctx, cluster, instance.Namespace, instance.Name, int32(replicas)); err != nil {
|
||
return nil, fmt.Errorf("failed to scale deployment: %w", err)
|
||
}
|
||
instance.SetValues(targetValues)
|
||
instance.Replicas = replicas
|
||
if err := s.instanceRepo.Update(ctx, instance); err != nil {
|
||
return nil, err
|
||
}
|
||
} else {
|
||
// Fallback: Helm upgrade with replicaCount
|
||
instance.SetValues(targetValues)
|
||
instance.BeginOperation(entity.OperationUpgrade, fmt.Sprintf("Scaling to %d replicas", replicas))
|
||
if err := s.instanceRepo.Update(ctx, instance); err != nil {
|
||
return nil, err
|
||
}
|
||
go s.executeAndSyncUpgrade(context.Background(), instance.ID, cluster, nil, instance)
|
||
}
|
||
|
||
return instance, nil
|
||
}
|
||
|
||
// EnrichReplicas sets Replicas on each instance to the count reported by the
// ScaleClient for its deployment, and returns the same slice.
//
// It is best-effort:
//   - instances whose lookup fails keep their current value;
//   - nothing happens without a ScaleClient, with an empty slice, or when the
//     cluster cannot be loaded.
func (s *InstanceService) EnrichReplicas(ctx context.Context, clusterID string, instances []*entity.Instance) []*entity.Instance {
	if s.scaleClient == nil || len(instances) == 0 {
		return instances
	}

	cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
	if err != nil {
		return instances
	}

	for i := range instances {
		inst := instances[i]
		count, lookupErr := s.scaleClient.GetDeploymentReplicas(ctx, cluster, inst.Namespace, inst.Name)
		if lookupErr != nil {
			continue
		}
		inst.Replicas = int(count)
	}
	return instances
}
|
||
|
||
// GetRunningReplicas returns the deployment replica count reported by the
// ScaleClient for instance. It returns 0 when no ScaleClient is configured or
// the lookup fails.
func (s *InstanceService) GetRunningReplicas(ctx context.Context, cluster *entity.Cluster, instance *entity.Instance) int {
	if s.scaleClient != nil {
		if n, err := s.scaleClient.GetDeploymentReplicas(ctx, cluster, instance.Namespace, instance.Name); err == nil {
			return int(n)
		}
	}
	return 0
}
|
||
|
||
// GetInstanceValuesDiff returns an instance's current Helm values next to the
// default values of its chart, so the caller can compute the difference.
//
// If the chart archive is not cached locally, it is first pulled from the
// instance's registry. An instance that is unreadable, missing, or not on
// clusterID is reported as entity.ErrInstanceNotFound.
func (s *InstanceService) GetInstanceValuesDiff(ctx context.Context, clusterID, instanceID string) (*dto.InstanceValuesDiffResponse, error) {
	principal, err := authz.RequirePrincipal(ctx)
	if err != nil {
		return nil, entity.ErrUnauthorized
	}

	instance, err := s.instanceRepo.GetByID(ctx, instanceID)
	if err != nil {
		return nil, entity.ErrInstanceNotFound
	}
	if !s.canReadInstance(principal, instance) || instance.ClusterID != clusterID {
		return nil, entity.ErrInstanceNotFound
	}

	cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
	if err != nil {
		return nil, entity.ErrClusterNotFound
	}

	current, err := s.helmClient.GetValues(ctx, cluster, instance.Name, instance.Namespace)
	if err != nil {
		return nil, err
	}

	// Defaults are read from the cached chart archive; fetch it when absent.
	chartPath := s.chartArchivePath(instance)
	_, statErr := os.Stat(chartPath)
	switch {
	case statErr == nil:
		// Already cached.
	case !errors.Is(statErr, os.ErrNotExist):
		return nil, fmt.Errorf("failed to inspect chart defaults: %w", statErr)
	default:
		registry, regErr := s.registryRepo.GetByID(ctx, instance.RegistryID)
		if regErr != nil {
			return nil, entity.ErrRegistryNotFound
		}
		if dlErr := s.downloadChart(ctx, registry, instance); dlErr != nil {
			return nil, dlErr
		}
	}

	defaults, err := s.helmClient.GetChartDefaultValues(chartPath)
	if err != nil {
		return nil, fmt.Errorf("failed to read chart defaults: %w", err)
	}

	return &dto.InstanceValuesDiffResponse{
		Current:  current,
		Defaults: defaults,
	}, nil
}
|
||
|
||
func (s *InstanceService) canReadInstance(principal *authz.Principal, instance *entity.Instance) bool {
|
||
if principal.IsAdmin() {
|
||
return true
|
||
}
|
||
return instance.WorkspaceID == principal.WorkspaceID && instance.OwnerID == principal.UserID
|
||
}
|
||
|
||
func (s *InstanceService) canWriteInstance(principal *authz.Principal, instance *entity.Instance) bool {
|
||
if principal.IsAdmin() {
|
||
return true
|
||
}
|
||
return instance.WorkspaceID == principal.WorkspaceID && instance.OwnerID == principal.UserID
|
||
}
|
||
|
||
func enforceNamespaceValues(instance *entity.Instance) {
|
||
if instance == nil || instance.Namespace == "" {
|
||
return
|
||
}
|
||
if instance.Values == nil {
|
||
instance.Values = map[string]interface{}{}
|
||
}
|
||
instance.Values["namespace"] = instance.Namespace
|
||
setExistingStringValue(instance.Values, "namespaceOverride", instance.Namespace)
|
||
setExistingStringValue(instance.Values, "namespace_override", instance.Namespace)
|
||
setExistingStringValue(instance.Values, "targetNamespace", instance.Namespace)
|
||
setExistingStringValue(instance.Values, "target_namespace", instance.Namespace)
|
||
setExistingNestedStringValue(instance.Values, "global", "namespace", instance.Namespace)
|
||
setExistingNestedStringValue(instance.Values, "global", "namespaceOverride", instance.Namespace)
|
||
setExistingNestedStringValue(instance.Values, "global", "namespace_override", instance.Namespace)
|
||
}
|
||
|
||
// setExistingStringValue overwrites values[key] with namespace, but only when
// key is already present. Absent keys are never added, and a nil map is left
// alone.
func setExistingStringValue(values map[string]interface{}, key, namespace string) {
	if _, present := values[key]; !present {
		return
	}
	values[key] = namespace
}
|
||
|
||
// setExistingNestedStringValue overwrites values[parent][key] with namespace
// when values[parent] is a map[string]interface{} that already contains key.
// Otherwise it does nothing; in particular it never creates parent or key.
func setExistingNestedStringValue(values map[string]interface{}, parent, key, namespace string) {
	if child, isMap := values[parent].(map[string]interface{}); isMap {
		if _, present := child[key]; present {
			child[key] = namespace
		}
	}
}
|
||
|
||
// applyNamespacePolicy validates instance.Namespace, or fills it in, according
// to who is deploying where.
//
//   - Admins may use any namespace except the Kubernetes system namespaces.
//   - On a cluster that is not the caller's own private cluster, the namespace
//     is forced to the workspace namespace. That is principal.Namespace, else
//     the workspace-derived name, and an existing binding's namespace takes
//     precedence. Requesting a different one is entity.ErrForbidden.
//   - On the caller's own private cluster, reserved namespaces are rejected.
//     An empty namespace defaults to the cluster default, then
//     principal.Namespace, then a name derived from the username.
//
// NOTE(review): the defaulted namespace is not re-checked against
// isReservedNamespace, so a cluster DefaultNamespace of "default" is accepted
// even though requesting "default" explicitly is rejected.
func (s *InstanceService) applyNamespacePolicy(ctx context.Context, principal *authz.Principal, cluster *entity.Cluster, instance *entity.Instance) error {
	if principal.IsAdmin() {
		if isProtectedSystemNamespace(instance.Namespace) {
			return entity.ErrInvalidNamespace
		}
		return nil
	}

	ownsPrivateCluster := cluster.Visibility == authz.VisibilityPrivate && cluster.OwnerID == principal.UserID
	if !ownsPrivateCluster {
		forced := principal.Namespace
		if forced == "" {
			forced = entity.NamespaceForWorkspace(principal.WorkspaceName)
		}
		if s.bindingRepo != nil {
			b, err := s.bindingRepo.Get(ctx, principal.WorkspaceID, cluster.ID)
			if err == nil && b != nil && b.Namespace != "" {
				forced = b.Namespace
			}
		}
		if instance.Namespace != "" && instance.Namespace != forced {
			return entity.ErrForbidden
		}
		instance.Namespace = forced
		return nil
	}

	if isReservedNamespace(instance.Namespace) {
		return entity.ErrInvalidNamespace
	}
	if instance.Namespace != "" {
		return nil
	}

	switch {
	case cluster.DefaultNamespace != "":
		instance.Namespace = cluster.DefaultNamespace
	case principal.Namespace != "":
		instance.Namespace = principal.Namespace
	default:
		instance.Namespace = entity.NamespaceForWorkspace(principal.Username)
	}
	return nil
}
|
||
|
||
// ensureTenantForInstance provisions the caller's workspace tenant on cluster
// and returns the workspace-cluster binding it used.
//
// The tenant covers the namespace, service account and quota carried in the
// TenantBinding. The binding is built from the workspace quotas and merged
// with any stored binding (ID, CreatedAt, namespace, service account, status).
// It is handed to TenantKubeClient.EnsureTenant and then upserted when a
// binding repository is configured.
//
// It returns (nil, nil) without doing anything for admins or when tenant
// provisioning is not configured. A suspended workspace yields
// entity.ErrWorkspaceSuspended.
//
// NOTE(review): an existing binding's namespace replaces instance.Namespace.
// UpdateInstance and ScaleInstance call this for already-deployed releases,
// so a binding namespace that differs from the release's namespace would
// redirect the upgrade. Confirm this cannot happen.
func (s *InstanceService) ensureTenantForInstance(ctx context.Context, principal *authz.Principal, cluster *entity.Cluster, instance *entity.Instance) (*entity.WorkspaceClusterBinding, error) {
	if principal.IsAdmin() || s.workspaceRepo == nil || s.tenantClient == nil {
		return nil, nil
	}

	workspace, err := s.workspaceRepo.GetByID(ctx, principal.WorkspaceID)
	if err != nil {
		return nil, err
	}
	if workspace.Status == entity.WorkspaceSuspended {
		return nil, entity.ErrWorkspaceSuspended
	}

	// A single timestamp so a freshly created binding has CreatedAt == UpdatedAt.
	now := time.Now()
	binding := &entity.WorkspaceClusterBinding{
		ID:             uuid.New().String(),
		WorkspaceID:    workspace.ID,
		ClusterID:      cluster.ID,
		Namespace:      instance.Namespace,
		ServiceAccount: workspace.K8sSAName,
		QuotaCPU:       strings.TrimSpace(workspace.QuotaCPU),
		QuotaMemory:    strings.TrimSpace(workspace.QuotaMemory),
		QuotaGPU:       zeroIfEmptyQuota(workspace.QuotaGPU),
		QuotaGPUMem:    zeroIfEmptyQuota(workspace.QuotaGPUMem),
		Status:         "active",
		CreatedAt:      now,
		UpdatedAt:      now,
	}

	// Keep the identity and placement of an already stored binding; quotas
	// are always refreshed from the workspace.
	if s.bindingRepo != nil {
		if existing, err := s.bindingRepo.Get(ctx, workspace.ID, cluster.ID); err == nil && existing != nil {
			binding.ID = existing.ID
			binding.CreatedAt = existing.CreatedAt
			if existing.Namespace != "" {
				binding.Namespace = existing.Namespace
				instance.Namespace = existing.Namespace
				enforceNamespaceValues(instance)
			}
			if existing.ServiceAccount != "" {
				binding.ServiceAccount = existing.ServiceAccount
			}
			if existing.Status != "" {
				binding.Status = existing.Status
			}
		}
	}

	if err := s.tenantClient.EnsureTenant(ctx, cluster, tenantBindingFromWorkspaceClusterBinding(binding)); err != nil {
		return nil, err
	}
	if s.bindingRepo != nil {
		if err := s.bindingRepo.Upsert(ctx, binding); err != nil {
			return nil, err
		}
	}
	return binding, nil
}
|
||
|
||
// precheckInstanceQuota estimates the resources of target (relative to current,
// which may be nil for a new install) and fails if they would exceed the
// workspace quota on cluster.
//
// The check is skipped for admins and when no workspace repository or Helm
// client is configured. When binding is nil (no tenant client, so
// ensureTenantForInstance did not produce one), the quota is derived from the
// workspace itself. Live quota usage is only consulted when a tenant client is
// configured. Quota violations are returned wrapping ErrQuotaExceeded with a
// readable summary.
func (s *InstanceService) precheckInstanceQuota(ctx context.Context, principal *authz.Principal, cluster *entity.Cluster, binding *entity.WorkspaceClusterBinding, target, current *entity.Instance) error {
	if principal.IsAdmin() || s.workspaceRepo == nil || s.helmClient == nil {
		return nil
	}

	workspace, err := s.workspaceRepo.GetByID(ctx, principal.WorkspaceID)
	if err != nil {
		return err
	}
	if workspace.Status == entity.WorkspaceSuspended {
		return entity.ErrWorkspaceSuspended
	}

	if binding == nil {
		binding = &entity.WorkspaceClusterBinding{
			WorkspaceID: principal.WorkspaceID,
			ClusterID:   cluster.ID,
			Namespace:   target.Namespace,
			QuotaCPU:    strings.TrimSpace(workspace.QuotaCPU),
			QuotaMemory: strings.TrimSpace(workspace.QuotaMemory),
			QuotaGPU:    zeroIfEmptyQuota(workspace.QuotaGPU),
			QuotaGPUMem: zeroIfEmptyQuota(workspace.QuotaGPUMem),
		}
	}

	var usage *repository.ResourceQuotaUsage
	if s.tenantClient != nil {
		quotaUsage, usageErr := s.tenantClient.GetResourceQuotaUsage(ctx, cluster, tenantBindingFromWorkspaceClusterBinding(binding))
		if usageErr != nil {
			return usageErr
		}
		usage = quotaUsage
	}

	checker := NewQuotaPrecheckService(s.helmClient)
	result, err := checker.EstimateAndCompareBinding(ctx, cluster, binding, usage, target, current)
	switch {
	case err == nil:
		return nil
	case errors.Is(err, ErrQuotaExceeded) && result != nil:
		return fmt.Errorf("%w: %s", ErrQuotaExceeded, formatQuotaExceeded(result.Exceeded))
	default:
		return err
	}
}
|
||
|
||
func formatQuotaExceeded(exceeded []QuotaExceededResource) string {
|
||
if len(exceeded) == 0 {
|
||
return "requested resources exceed workspace quota"
|
||
}
|
||
parts := make([]string, 0, len(exceeded))
|
||
for _, item := range exceeded {
|
||
parts = append(parts, fmt.Sprintf("%s required=%s quota=%s", item.Name, item.Required, item.Hard))
|
||
}
|
||
return strings.Join(parts, "; ")
|
||
}
|
||
|
||
func instanceResourceQuotaHard(workspace *entity.Workspace) corev1.ResourceList {
|
||
hard := corev1.ResourceList{}
|
||
addQuantity := func(name corev1.ResourceName, value string) {
|
||
value = normalizeStandardQuotaQuantity(value)
|
||
if value == "" {
|
||
return
|
||
}
|
||
if quantity, err := resource.ParseQuantity(value); err == nil {
|
||
hard[name] = quantity
|
||
}
|
||
}
|
||
addGPUMemoryQuantity := func(value string) {
|
||
value, err := normalizeGPUMemoryQuota(value)
|
||
if err != nil || value == "" {
|
||
return
|
||
}
|
||
if quantity, err := resource.ParseQuantity(value); err == nil {
|
||
hard[corev1.ResourceName("requests.nvidia.com/gpumem")] = quantity
|
||
}
|
||
}
|
||
if workspace == nil {
|
||
return hard
|
||
}
|
||
addQuantity(corev1.ResourceName("requests.cpu"), workspace.QuotaCPU)
|
||
addQuantity(corev1.ResourceName("requests.memory"), workspace.QuotaMemory)
|
||
addQuantity(corev1.ResourceName("requests.nvidia.com/gpu"), workspace.QuotaGPU)
|
||
addGPUMemoryQuantity(workspace.QuotaGPUMem)
|
||
return hard
|
||
}
|
||
|
||
func tenantBindingFromWorkspaceClusterBinding(binding *entity.WorkspaceClusterBinding) entity.TenantBinding {
|
||
namespace := ""
|
||
if binding != nil {
|
||
namespace = binding.Namespace
|
||
}
|
||
tenantBinding := entity.NewTenantBinding(namespace)
|
||
if binding != nil {
|
||
tenantBinding.ServiceAccountName = binding.ServiceAccount
|
||
tenantBinding.ResourceQuotaHard = bindingQuotaHard(binding)
|
||
}
|
||
return tenantBinding
|
||
}
|
||
|
||
// zeroIfEmptyQuota trims surrounding whitespace from a quota value. A blank
// result becomes "0".
func zeroIfEmptyQuota(value string) string {
	if trimmed := strings.TrimSpace(value); trimmed != "" {
		return trimmed
	}
	return "0"
}
|
||
|
||
func cloneInstanceForQuota(instance *entity.Instance) *entity.Instance {
|
||
if instance == nil {
|
||
return nil
|
||
}
|
||
cloned := *instance
|
||
cloned.SetValues(copyValues(instance.Values))
|
||
return &cloned
|
||
}
|
||
|
||
// copyValues returns a deep copy of a values tree.
//
// Nested map[string]interface{} and []interface{} containers are copied
// recursively, so writing into the result at any depth (for example overriding
// resources.requests on a clone used for quota evaluation) never writes
// through to the source. Scalar leaves are shared as-is. Any other container
// types are also shared as-is and are NOT deep-copied.
//
// A nil input yields nil.
func copyValues(values map[string]interface{}) map[string]interface{} {
	if values == nil {
		return nil
	}
	copied := make(map[string]interface{}, len(values))
	for key, value := range values {
		copied[key] = deepCopyValuesNode(value)
	}
	return copied
}

// deepCopyValuesNode deep-copies one node of a values tree; see copyValues.
func deepCopyValuesNode(node interface{}) interface{} {
	switch typed := node.(type) {
	case map[string]interface{}:
		// copyValues preserves a typed nil map as a typed nil map.
		return copyValues(typed)
	case []interface{}:
		if typed == nil {
			return typed
		}
		out := make([]interface{}, len(typed))
		for i, item := range typed {
			out[i] = deepCopyValuesNode(item)
		}
		return out
	default:
		return node
	}
}
|
||
|
||
// isReservedNamespace reports whether namespace is one of the built-in
// Kubernetes namespaces ("default", "kube-system", "kube-public",
// "kube-node-lease"). The comparison is exact and case-sensitive.
func isReservedNamespace(namespace string) bool {
	return namespace == "default" ||
		namespace == "kube-system" ||
		namespace == "kube-public" ||
		namespace == "kube-node-lease"
}
|
||
|
||
// isProtectedSystemNamespace reports whether namespace is one of the
// Kubernetes system namespaces ("kube-system", "kube-public",
// "kube-node-lease"). Unlike isReservedNamespace, "default" is not included.
// The comparison is exact and case-sensitive.
func isProtectedSystemNamespace(namespace string) bool {
	return namespace == "kube-system" ||
		namespace == "kube-public" ||
		namespace == "kube-node-lease"
}
|
||
|
||
// executeAndSyncInstall 异步执行安装并监控状态
|
||
func (s *InstanceService) executeAndSyncInstall(ctx context.Context, instanceID string, cluster *entity.Cluster, registry *entity.Registry, instance *entity.Instance) {
|
||
// 执行 Helm 安装
|
||
if err := s.helmClient.Install(ctx, cluster, instance); err != nil {
|
||
// 更新实例状态为失败
|
||
instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if updateErr == nil && instance != nil {
|
||
instance.MarkFailure("Helm install failed", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
}
|
||
return
|
||
}
|
||
|
||
// 安装成功后,同步状态
|
||
s.syncInstanceStatus(ctx, instanceID, cluster, instance.Name, instance.Namespace, entity.OperationInstall)
|
||
}
|
||
|
||
// executeAndSyncUpgrade 异步执行升级并监控状态
|
||
func (s *InstanceService) executeAndSyncUpgrade(ctx context.Context, instanceID string, cluster *entity.Cluster, registry *entity.Registry, instance *entity.Instance) {
|
||
// 执行 Helm 升级
|
||
if err := s.helmClient.Upgrade(ctx, cluster, instance); err != nil {
|
||
// 更新实例状态为失败
|
||
instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if updateErr == nil && instance != nil {
|
||
instance.MarkFailure("Helm upgrade failed", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
}
|
||
return
|
||
}
|
||
|
||
// 升级成功后,同步状态
|
||
s.syncInstanceStatus(ctx, instanceID, cluster, instance.Name, instance.Namespace, entity.OperationUpgrade)
|
||
}
|
||
|
||
// executeAndSyncRollback 异步执行回滚并监控状态
|
||
func (s *InstanceService) executeAndSyncRollback(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string, revision int) {
|
||
// 执行 Helm 回滚
|
||
if err := s.helmClient.Rollback(ctx, cluster, releaseName, namespace, revision); err != nil {
|
||
// 更新实例状态为失败
|
||
instance, updateErr := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if updateErr == nil && instance != nil {
|
||
instance.MarkFailure("Helm rollback failed", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
}
|
||
return
|
||
}
|
||
|
||
// 回滚成功后,同步状态
|
||
s.syncInstanceStatus(ctx, instanceID, cluster, releaseName, namespace, entity.OperationRollback)
|
||
}
|
||
|
||
// executeAndSyncUninstall 异步执行卸载并监控状态
|
||
func (s *InstanceService) executeAndSyncUninstall(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string) {
|
||
// 执行 Helm 卸载
|
||
err := s.helmClient.Uninstall(ctx, cluster, releaseName, namespace)
|
||
|
||
// 获取实例
|
||
instance, getErr := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if getErr != nil {
|
||
return
|
||
}
|
||
|
||
if err != nil {
|
||
// 如果错误不是"未找到",则标记为失败
|
||
if !errors.Is(err, entity.ErrInstanceNotFound) {
|
||
instance.MarkFailure("Helm uninstall failed", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
} else {
|
||
// 如果未找到,说明已经卸载,直接删除数据库记录
|
||
_ = s.instanceRepo.Delete(ctx, instanceID)
|
||
}
|
||
return
|
||
}
|
||
|
||
// 卸载成功,标记为已卸载
|
||
instance.MarkSuccess(entity.StatusUninstalled, instance.Revision, "Instance uninstalled successfully")
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
|
||
// 验证卸载是否完成:尝试获取状态,如果获取不到说明已卸载
|
||
time.Sleep(3 * time.Second)
|
||
_, statusErr := s.helmClient.GetStatus(ctx, cluster, releaseName, namespace)
|
||
if statusErr != nil {
|
||
// 无法获取状态,说明已卸载,删除数据库记录
|
||
_ = s.instanceRepo.Delete(ctx, instanceID)
|
||
} else {
|
||
// 仍然可以获取状态,可能还在卸载中,继续等待
|
||
// 设置状态为 uninstalled,但不删除记录,让用户手动删除或等待自动清理
|
||
instance.MarkSuccess(entity.StatusUninstalled, instance.Revision, "Uninstall in progress")
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
}
|
||
}
|
||
|
||
// syncInstanceStatus 同步实例状态(定期检查 Helm 状态并更新数据库)
|
||
func (s *InstanceService) syncInstanceStatus(ctx context.Context, instanceID string, cluster *entity.Cluster, releaseName, namespace string, operation entity.InstanceOperation) {
|
||
maxAttempts := 30 // 最多尝试30次(约5分钟)
|
||
interval := 10 * time.Second // 每10秒检查一次
|
||
|
||
for i := 0; i < maxAttempts; i++ {
|
||
time.Sleep(interval)
|
||
|
||
// 获取数据库中的实例
|
||
instance, err := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if err != nil {
|
||
// 实例不存在,停止同步
|
||
return
|
||
}
|
||
|
||
// 从 Helm 获取实时状态
|
||
liveStatus, err := s.helmClient.GetStatus(ctx, cluster, releaseName, namespace)
|
||
if err != nil {
|
||
// 如果获取状态失败,可能是还在部署中,继续等待
|
||
if i < maxAttempts-1 {
|
||
continue
|
||
}
|
||
// 最后一次尝试失败,标记为失败
|
||
instance.MarkFailure("Failed to get status from Helm", err)
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
return
|
||
}
|
||
|
||
// 根据操作类型和 Helm 状态更新实例状态
|
||
shouldUpdate := false
|
||
switch operation {
|
||
case entity.OperationInstall:
|
||
// 安装操作:如果 Helm 状态是 deployed,则更新为 deployed
|
||
if liveStatus.Status == entity.StatusDeployed {
|
||
instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance deployed successfully")
|
||
shouldUpdate = true
|
||
} else if liveStatus.Status == entity.StatusFailed {
|
||
instance.MarkFailure("Installation failed", fmt.Errorf("Helm status: %s", liveStatus.Status))
|
||
shouldUpdate = true
|
||
}
|
||
case entity.OperationUpgrade:
|
||
// 升级操作:如果 Helm 状态是 deployed,则更新为 deployed
|
||
if liveStatus.Status == entity.StatusDeployed {
|
||
instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance upgraded successfully")
|
||
shouldUpdate = true
|
||
} else if liveStatus.Status == entity.StatusFailed {
|
||
instance.MarkFailure("Upgrade failed", fmt.Errorf("Helm status: %s", liveStatus.Status))
|
||
shouldUpdate = true
|
||
}
|
||
case entity.OperationRollback:
|
||
// 回滚操作:如果 Helm 状态是 deployed,则更新为 deployed
|
||
if liveStatus.Status == entity.StatusDeployed {
|
||
instance.MarkSuccess(entity.StatusDeployed, liveStatus.Revision, "Instance rolled back successfully")
|
||
shouldUpdate = true
|
||
} else if liveStatus.Status == entity.StatusFailed {
|
||
instance.MarkFailure("Rollback failed", fmt.Errorf("Helm status: %s", liveStatus.Status))
|
||
shouldUpdate = true
|
||
}
|
||
}
|
||
|
||
// 如果状态已更新为最终状态,停止同步
|
||
if shouldUpdate {
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
return
|
||
}
|
||
|
||
// 如果状态已经是最终状态(deployed 或 failed),停止同步
|
||
if instance.Status == entity.StatusDeployed || instance.Status == entity.StatusFailed {
|
||
return
|
||
}
|
||
}
|
||
|
||
// 超时,标记为失败
|
||
instance, err := s.instanceRepo.GetByID(ctx, instanceID)
|
||
if err == nil && instance != nil {
|
||
instance.MarkFailure("Operation timeout", fmt.Errorf("Status sync timeout after %d attempts", maxAttempts))
|
||
_ = s.instanceRepo.Update(ctx, instance)
|
||
}
|
||
}
|