Files
ocdp-go/backend/internal/domain/service/workspace_service.go
Ivan087 33ddaf97db fix: scale replicas in response, K8s metrics client, quota precheck, auth tests
- Add GetMetrics method to MetricsClient interface and implement cluster metrics API
- Add QuotaPrecheck service for validating resource quotas before deployment
- Add auth DTO with role/permission models and auth handler tests
- Add instance diagnostics: mounted NFS volumes, labels, annotations in pod diagnostics
- Update workspace handler with GetWorkspace endpoint and shared-user list
- Fix monitoring handler to use correct service method name
- Add tail_lines fallback in instance handler for snake_case query params
- Update nginx config for SSE log streaming support (no buffering)
- Add comprehensive test coverage: auth_service_test, auth_handler_test,
  auth_dto_test, metrics_client_test, quota_precheck_test
- Update error messages for quota validation and instance operations
- ModifyModal: fix YAML lineWidth:0, modified keys summary, delta-only submit
- InstanceCard: correctly disable scale-minus when replicas <= 0
- SidebarLayout: add hover transition for sidebar items
- Update todo.md and lessons.md with latest fixes
2026-05-20 16:56:29 +08:00

322 lines
10 KiB
Go

package service
import (
	"context"
	"errors"
	"sort"
	"strings"
	"time"

	"github.com/google/uuid"
	"github.com/ocdp/cluster-service/internal/domain/entity"
	"github.com/ocdp/cluster-service/internal/domain/repository"
	"github.com/ocdp/cluster-service/internal/pkg/authz"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)
// WorkspaceService groups the workspace-level operations of this package:
// listing and creating workspaces, binding a workspace to a cluster, issuing
// tenant kubeconfigs and suspending a workspace.
type WorkspaceService struct {
	// workspaceRepo reads and writes workspace records.
	workspaceRepo repository.WorkspaceRepository
	// bindingRepo reads and writes workspace-to-cluster bindings.
	bindingRepo repository.WorkspaceClusterBindingRepository
	// clusterRepo looks up and lists clusters.
	clusterRepo repository.ClusterRepository
	// tenantClient performs tenant operations against a cluster. It may be
	// nil; EnsureClusterBinding skips tenant provisioning in that case.
	tenantClient repository.TenantKubeClient
	// auditRepo receives audit log entries. It may be nil, in which case
	// nothing is recorded.
	auditRepo repository.AuditLogRepository
}
// NewWorkspaceService wires the given repositories and tenant client into a
// new WorkspaceService. The arguments are stored as-is; no validation is
// performed here.
func NewWorkspaceService(
	workspaceRepo repository.WorkspaceRepository,
	bindingRepo repository.WorkspaceClusterBindingRepository,
	clusterRepo repository.ClusterRepository,
	tenantClient repository.TenantKubeClient,
	auditRepo repository.AuditLogRepository,
) *WorkspaceService {
	svc := new(WorkspaceService)
	svc.workspaceRepo = workspaceRepo
	svc.bindingRepo = bindingRepo
	svc.clusterRepo = clusterRepo
	svc.tenantClient = tenantClient
	svc.auditRepo = auditRepo
	return svc
}
// ListWorkspaces returns the workspaces visible to the caller in ctx.
//
// Admins get every workspace from the repository. Any other caller gets a
// one-element slice holding only the workspace named by their principal.
// It returns entity.ErrUnauthorized when ctx carries no principal, and
// passes repository errors through unchanged.
func (s *WorkspaceService) ListWorkspaces(ctx context.Context) ([]*entity.Workspace, error) {
	caller, authErr := authz.RequirePrincipal(ctx)
	if authErr != nil {
		return nil, entity.ErrUnauthorized
	}

	if !caller.IsAdmin() {
		own, lookupErr := s.workspaceRepo.GetByID(ctx, caller.WorkspaceID)
		if lookupErr != nil {
			return nil, lookupErr
		}
		return []*entity.Workspace{own}, nil
	}

	return s.workspaceRepo.List(ctx)
}
// CreateWorkspace creates a workspace called name on behalf of the caller in
// ctx and records a "create" audit entry.
//
// Only admins may create workspaces: it returns entity.ErrUnauthorized when
// ctx carries no principal and entity.ErrForbidden for non-admin callers.
// The workspace ID is a freshly generated UUID. A repository failure is
// returned unchanged and no audit entry is written.
func (s *WorkspaceService) CreateWorkspace(ctx context.Context, name string) (*entity.Workspace, error) {
	caller, authErr := authz.RequirePrincipal(ctx)
	switch {
	case authErr != nil:
		return nil, entity.ErrUnauthorized
	case !caller.IsAdmin():
		return nil, entity.ErrForbidden
	}

	created := entity.NewWorkspace(name, caller.UserID)
	created.ID = uuid.New().String()

	createErr := s.workspaceRepo.Create(ctx, created)
	if createErr != nil {
		return nil, createErr
	}

	s.audit(ctx, caller, "create", "workspace", created.ID, created.Name, nil)
	return created, nil
}
// EnsureClusterBinding builds the binding between workspaceID and clusterID,
// provisions the matching tenant on the cluster (when a tenant client is
// configured), persists the binding via Upsert and records an "init" audit
// entry.
//
// Access rules:
//   - entity.ErrUnauthorized when ctx carries no principal;
//   - entity.ErrForbidden when a non-admin targets a workspace other than
//     their own;
//   - entity.ErrClusterNotFound when the cluster lookup fails or a non-admin
//     caller cannot read the cluster (the two cases are deliberately
//     indistinguishable to the caller).
//
// Errors from the workspace lookup, tenant provisioning and Upsert are
// returned unchanged. The binding's namespace, service account and quotas
// are copied from the workspace record.
func (s *WorkspaceService) EnsureClusterBinding(ctx context.Context, workspaceID, clusterID string) (*entity.WorkspaceClusterBinding, error) {
	principal, err := authz.RequirePrincipal(ctx)
	if err != nil {
		return nil, entity.ErrUnauthorized
	}
	if !principal.IsAdmin() && workspaceID != principal.WorkspaceID {
		return nil, entity.ErrForbidden
	}

	workspace, err := s.workspaceRepo.GetByID(ctx, workspaceID)
	if err != nil {
		return nil, err
	}
	cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
	if err != nil {
		return nil, entity.ErrClusterNotFound
	}
	if !principal.IsAdmin() && !authz.CanReadResource(principal, cluster.WorkspaceID, cluster.OwnerID, cluster.Visibility) {
		return nil, entity.ErrClusterNotFound
	}

	// Capture one instant so CreatedAt and UpdatedAt are exactly equal on a
	// freshly built binding instead of differing by the time between two
	// separate clock reads.
	now := time.Now()
	binding := &entity.WorkspaceClusterBinding{
		ID:             uuid.New().String(),
		WorkspaceID:    workspace.ID,
		ClusterID:      cluster.ID,
		Namespace:      workspace.K8sNamespace,
		ServiceAccount: workspace.K8sSAName,
		QuotaCPU:       strings.TrimSpace(workspace.QuotaCPU),
		QuotaMemory:    strings.TrimSpace(workspace.QuotaMemory),
		QuotaGPU:       zeroIfEmptyQuota(workspace.QuotaGPU),
		QuotaGPUMem:    zeroIfEmptyQuota(workspace.QuotaGPUMem),
		Status:         "active",
		CreatedAt:      now,
		UpdatedAt:      now,
	}

	tenantBinding := entity.NewTenantBinding(binding.Namespace)
	tenantBinding.ServiceAccountName = binding.ServiceAccount
	tenantBinding.ResourceQuotaHard = bindingQuotaHard(binding)

	// The tenant client is optional; without it only the database record is
	// written.
	if s.tenantClient != nil {
		if err := s.tenantClient.EnsureTenant(ctx, cluster, tenantBinding); err != nil {
			return nil, err
		}
	}
	if err := s.bindingRepo.Upsert(ctx, binding); err != nil {
		return nil, err
	}

	s.audit(ctx, principal, "init", "workspace_cluster_binding", binding.ID, binding.Namespace, map[string]interface{}{"cluster_id": clusterID})
	return binding, nil
}
// IssueKubeconfig issues a tenant kubeconfig for workspaceID on clusterID
// with the requested ttl and records an "issue_kubeconfig" audit entry.
//
// Flow:
//  1. Authorize the caller and load the workspace and cluster.
//  2. Load the existing binding, or create one through EnsureClusterBinding
//     when the lookup fails. An existing binding has its quotas refreshed
//     from the workspace record.
//  3. Re-apply the tenant on the cluster, persist the binding (best effort)
//     and ask the tenant client for the kubeconfig.
//
// Errors:
//   - entity.ErrUnauthorized when ctx carries no principal;
//   - entity.ErrForbidden when a non-admin targets another workspace;
//   - entity.ErrWorkspaceSuspended when the workspace is suspended;
//   - entity.ErrClusterNotFound when the cluster lookup fails or a non-admin
//     caller cannot read the cluster;
//   - a plain error when no tenant client is configured;
//   - any error from the workspace lookup, EnsureClusterBinding,
//     EnsureTenant or the tenant client's IssueKubeconfig, unchanged.
func (s *WorkspaceService) IssueKubeconfig(ctx context.Context, workspaceID, clusterID string, ttl time.Duration) (*entity.TenantKubeconfig, error) {
	principal, err := authz.RequirePrincipal(ctx)
	if err != nil {
		return nil, entity.ErrUnauthorized
	}
	if !principal.IsAdmin() && workspaceID != principal.WorkspaceID {
		return nil, entity.ErrForbidden
	}

	workspace, err := s.workspaceRepo.GetByID(ctx, workspaceID)
	if err != nil {
		return nil, err
	}
	if workspace.Status == entity.WorkspaceSuspended {
		return nil, entity.ErrWorkspaceSuspended
	}

	cluster, err := s.clusterRepo.GetByID(ctx, clusterID)
	if err != nil {
		return nil, entity.ErrClusterNotFound
	}
	if !principal.IsAdmin() && !authz.CanReadResource(principal, cluster.WorkspaceID, cluster.OwnerID, cluster.Visibility) {
		return nil, entity.ErrClusterNotFound
	}

	// A kubeconfig can only come from the tenant client. Fail here, after
	// authorization but before any binding is created or updated, rather
	// than dereferencing a nil client further down.
	if s.tenantClient == nil {
		return nil, errors.New("tenant kube client is not configured")
	}

	binding, err := s.bindingRepo.Get(ctx, workspaceID, clusterID)
	if err != nil {
		// Any lookup failure is treated as "no binding yet".
		binding, err = s.EnsureClusterBinding(ctx, workspaceID, clusterID)
		if err != nil {
			return nil, err
		}
	} else {
		// Keep the stored binding in step with the workspace's current quotas.
		binding.QuotaCPU = strings.TrimSpace(workspace.QuotaCPU)
		binding.QuotaMemory = strings.TrimSpace(workspace.QuotaMemory)
		binding.QuotaGPU = zeroIfEmptyQuota(workspace.QuotaGPU)
		binding.QuotaGPUMem = zeroIfEmptyQuota(workspace.QuotaGPUMem)
		binding.UpdatedAt = time.Now()
	}

	tenantBinding := entity.NewTenantBinding(binding.Namespace)
	tenantBinding.ServiceAccountName = binding.ServiceAccount
	tenantBinding.ResourceQuotaHard = bindingQuotaHard(binding)

	if err := s.tenantClient.EnsureTenant(ctx, cluster, tenantBinding); err != nil {
		return nil, err
	}

	// Best effort: the tenant has already been applied on the cluster, so a
	// failure to persist the refreshed binding must not block issuance.
	_ = s.bindingRepo.Upsert(ctx, binding)

	kubeconfig, err := s.tenantClient.IssueKubeconfig(ctx, cluster, tenantBinding, ttl)
	if err != nil {
		return nil, err
	}

	s.audit(ctx, principal, "issue_kubeconfig", "workspace_cluster_binding", binding.ID, binding.Namespace, map[string]interface{}{"cluster_id": clusterID, "ttl_seconds": int64(entity.TenantTokenTTL(ttl).Seconds())})
	return kubeconfig, nil
}
// resourceQuotaHard converts the quota strings on workspace into a
// Kubernetes ResourceList keyed by "requests.*" resource names.
//
// A nil workspace yields an empty list. Each quota is normalized first; a
// value that normalizes to the empty string, fails normalization (GPU
// memory only) or cannot be parsed as a quantity is silently left out.
func resourceQuotaHard(workspace *entity.Workspace) corev1.ResourceList {
	limits := corev1.ResourceList{}
	if workspace == nil {
		return limits
	}

	// store parses an already-normalized quantity string and records it under
	// key; empty or unparsable input leaves limits untouched.
	store := func(key corev1.ResourceName, normalized string) {
		if normalized == "" {
			return
		}
		parsed, parseErr := resource.ParseQuantity(normalized)
		if parseErr != nil {
			return
		}
		limits[key] = parsed
	}

	store(corev1.ResourceName("requests.cpu"), normalizeStandardQuotaQuantity(workspace.QuotaCPU))
	store(corev1.ResourceName("requests.memory"), normalizeStandardQuotaQuantity(workspace.QuotaMemory))
	store(corev1.ResourceName("requests.nvidia.com/gpu"), normalizeStandardQuotaQuantity(workspace.QuotaGPU))

	// GPU memory has its own normalizer, which can reject the value outright.
	if gpuMem, normErr := normalizeGPUMemoryQuota(workspace.QuotaGPUMem); normErr == nil {
		store(corev1.ResourceName("requests.nvidia.com/gpumem"), gpuMem)
	}

	return limits
}
// IssueCurrentKubeconfig issues a kubeconfig for the caller's own workspace.
//
// The target cluster is chosen in this order:
//  1. requestedClusterID, when non-empty;
//  2. the workspace's DefaultClusterID, when non-empty;
//  3. whatever IssueDefaultKubeconfig selects.
//
// It returns entity.ErrUnauthorized when ctx carries no principal. The
// workspace is only loaded when no cluster was requested, and a lookup
// failure is returned unchanged.
func (s *WorkspaceService) IssueCurrentKubeconfig(ctx context.Context, requestedClusterID string, ttl time.Duration) (*entity.TenantKubeconfig, error) {
	caller, authErr := authz.RequirePrincipal(ctx)
	if authErr != nil {
		return nil, entity.ErrUnauthorized
	}

	targetClusterID := requestedClusterID
	if targetClusterID == "" {
		ws, lookupErr := s.workspaceRepo.GetByID(ctx, caller.WorkspaceID)
		if lookupErr != nil {
			return nil, lookupErr
		}
		targetClusterID = ws.DefaultClusterID
	}

	if targetClusterID == "" {
		return s.IssueDefaultKubeconfig(ctx, ttl)
	}
	return s.IssueKubeconfig(ctx, caller.WorkspaceID, targetClusterID, ttl)
}
// IssueDefaultKubeconfig picks a cluster for the caller's workspace and
// issues a kubeconfig for it.
//
// Candidates are the clusters the caller can read that are either globally
// shared, or workspace-shared and owned by the caller's workspace. They are
// tried in order of defaultKubeconfigClusterRank, ties broken by name, and
// the first successful issuance wins.
//
// It returns entity.ErrUnauthorized when ctx carries no principal, the
// cluster listing error unchanged, the error from the first failed attempt
// when every candidate fails, and entity.ErrClusterNotFound when there are
// no candidates at all.
func (s *WorkspaceService) IssueDefaultKubeconfig(ctx context.Context, ttl time.Duration) (*entity.TenantKubeconfig, error) {
	caller, authErr := authz.RequirePrincipal(ctx)
	if authErr != nil {
		return nil, entity.ErrUnauthorized
	}

	all, listErr := s.clusterRepo.List(ctx)
	if listErr != nil {
		return nil, listErr
	}

	eligible := make([]*entity.Cluster, 0, len(all))
	for _, c := range all {
		if !authz.CanReadResource(caller, c.WorkspaceID, c.OwnerID, c.Visibility) {
			continue
		}
		globallyShared := c.Visibility == authz.VisibilityGlobalShared
		sharedInOwnWorkspace := c.Visibility == authz.VisibilityWorkspaceShared && c.WorkspaceID == caller.WorkspaceID
		if globallyShared || sharedInOwnWorkspace {
			eligible = append(eligible, c)
		}
	}

	sort.SliceStable(eligible, func(a, b int) bool {
		rankA := defaultKubeconfigClusterRank(eligible[a])
		rankB := defaultKubeconfigClusterRank(eligible[b])
		if rankA == rankB {
			return eligible[a].Name < eligible[b].Name
		}
		return rankA < rankB
	})

	// Remember only the first failure; later ones are dropped.
	var firstFailure error
	for _, c := range eligible {
		kubeconfig, issueErr := s.IssueKubeconfig(ctx, caller.WorkspaceID, c.ID, ttl)
		if issueErr == nil {
			return kubeconfig, nil
		}
		if firstFailure == nil {
			firstFailure = issueErr
		}
	}

	if firstFailure == nil {
		return nil, entity.ErrClusterNotFound
	}
	return nil, firstFailure
}
// defaultKubeconfigClusterRank returns the sort priority of cluster when
// choosing a default: 0 for globally shared, 1 for workspace-shared and 2
// for every other visibility. Lower ranks sort first.
func defaultKubeconfigClusterRank(cluster *entity.Cluster) int {
	if cluster.Visibility == authz.VisibilityGlobalShared {
		return 0
	}
	if cluster.Visibility == authz.VisibilityWorkspaceShared {
		return 1
	}
	return 2
}
// SuspendWorkspace marks workspaceID as suspended, then asks the tenant
// client to suspend the workspace's tenant on every cluster that has a
// binding for it, and records a "suspend" audit entry.
//
// Only admins may suspend: it returns entity.ErrUnauthorized when ctx
// carries no principal and entity.ErrForbidden for non-admin callers.
// Errors from loading or updating the workspace are returned unchanged.
//
// Propagation to clusters is best effort. Once the workspace record is
// updated, failures to list clusters, to find a binding or to suspend a
// tenant are ignored and the method still returns nil. When no tenant
// client is configured the propagation step is skipped entirely.
func (s *WorkspaceService) SuspendWorkspace(ctx context.Context, workspaceID string) error {
	principal, err := authz.RequirePrincipal(ctx)
	if err != nil {
		return entity.ErrUnauthorized
	}
	if !principal.IsAdmin() {
		return entity.ErrForbidden
	}

	workspace, err := s.workspaceRepo.GetByID(ctx, workspaceID)
	if err != nil {
		return err
	}
	workspace.Status = entity.WorkspaceSuspended
	if err := s.workspaceRepo.Update(ctx, workspace); err != nil {
		return err
	}

	// The tenant client is optional elsewhere in this service; guard it here
	// too so a missing client cannot panic after the workspace record has
	// already been updated.
	if s.tenantClient != nil {
		// Best effort: a listing failure leaves clusters nil and the loop
		// below does nothing.
		clusters, _ := s.clusterRepo.List(ctx)
		for _, cluster := range clusters {
			binding, err := s.bindingRepo.Get(ctx, workspaceID, cluster.ID)
			if err != nil {
				// Lookup failed; treated as "not bound to this cluster".
				continue
			}
			tenantBinding := entity.NewTenantBinding(binding.Namespace)
			tenantBinding.ServiceAccountName = binding.ServiceAccount
			// Best effort: one failing cluster must not stop the others.
			_ = s.tenantClient.SuspendTenant(ctx, cluster, tenantBinding)
		}
	}

	s.audit(ctx, principal, "suspend", "workspace", workspace.ID, workspace.Name, nil)
	return nil
}
// audit writes one audit log entry attributed to principal's workspace and
// user, stamped with the current time.
//
// It does nothing when no audit repository is configured or principal is
// nil. Auditing is best effort: an error from the repository is discarded so
// that it never fails the operation being audited.
func (s *WorkspaceService) audit(ctx context.Context, principal *authz.Principal, action, resourceType, resourceID, resourceName string, details map[string]interface{}) {
	if principal == nil || s.auditRepo == nil {
		return
	}

	entry := &entity.AuditLog{
		WorkspaceID:  principal.WorkspaceID,
		UserID:       principal.UserID,
		Action:       action,
		ResourceType: resourceType,
		ResourceID:   resourceID,
		ResourceName: resourceName,
		Details:      details,
		CreatedAt:    time.Now(),
	}
	_ = s.auditRepo.Create(ctx, entry)
}