fix: direct K8s scaling, replicas from K8s API, button labels, modify fetch
- Add ScaleClient using K8s API (like kubectl scale deploy --replicas=N) - ScaleDeployment: patch Deployment.Spec.Replicas directly - GetDeploymentReplicas: query actual K8s deployment replicas - Search by labels then fallback to deployment name match - Wire ScaleClient to InstanceService via SetScaleClient in main.go - ModifyModal: fetch full instance detail on open (list excludes values) - InstanceCard: add text labels to action buttons (Entries/Diag/Modify/Delete) - Text visible on sm+ screens, icon-only on xs
This commit is contained in:
@ -566,17 +566,6 @@ func formatTime(value time.Time) string {
|
||||
}
|
||||
|
||||
func convertInstanceResponse(instance *entity.Instance, includeValues bool) *dto.InstanceResponse {
|
||||
replicas := 0
|
||||
if v, ok := instance.Values["replicaCount"]; ok {
|
||||
switch n := v.(type) {
|
||||
case float64:
|
||||
replicas = int(n)
|
||||
case int:
|
||||
replicas = n
|
||||
case int64:
|
||||
replicas = int(n)
|
||||
}
|
||||
}
|
||||
response := &dto.InstanceResponse{
|
||||
ID: instance.ID,
|
||||
ClusterID: instance.ClusterID,
|
||||
@ -594,7 +583,7 @@ func convertInstanceResponse(instance *entity.Instance, includeValues bool) *dto
|
||||
LastOperation: string(instance.LastOperation),
|
||||
LastError: instance.LastError,
|
||||
Revision: instance.Revision,
|
||||
Replicas: replicas,
|
||||
Replicas: 0,
|
||||
AllowedActions: []string{"view", "update", "delete"},
|
||||
CreatedAt: instance.CreatedAt.Format("2006-01-02T15:04:05Z07:00"),
|
||||
UpdatedAt: instance.UpdatedAt.Format("2006-01-02T15:04:05Z07:00"),
|
||||
|
||||
134
backend/internal/adapter/output/k8s/scale_client.go
Normal file
134
backend/internal/adapter/output/k8s/scale_client.go
Normal file
@ -0,0 +1,134 @@
|
||||
package k8s
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/ocdp/cluster-service/internal/domain/entity"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
// ScaleClient provides K8s-native workload scaling (bypasses Helm)
|
||||
type ScaleClient struct{}
|
||||
|
||||
// NewScaleClient creates a ScaleClient
|
||||
func NewScaleClient() *ScaleClient {
|
||||
return &ScaleClient{}
|
||||
}
|
||||
|
||||
// findDeployment searches for a deployment matching the release name using various label strategies.
|
||||
func (c *ScaleClient) findDeployment(ctx context.Context, clientset *kubernetes.Clientset, namespace, releaseName string) (*appsv1.Deployment, error) {
|
||||
labelQueries := []string{
|
||||
fmt.Sprintf("app.kubernetes.io/instance=%s", releaseName),
|
||||
fmt.Sprintf("release=%s", releaseName),
|
||||
fmt.Sprintf("app=%s", releaseName),
|
||||
fmt.Sprintf("app.kubernetes.io/name=%s", releaseName),
|
||||
}
|
||||
|
||||
for _, query := range labelQueries {
|
||||
deployments, err := clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{
|
||||
LabelSelector: query,
|
||||
})
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if len(deployments.Items) > 0 {
|
||||
return &deployments.Items[0], nil
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: get by name directly
|
||||
dep, err := clientset.AppsV1().Deployments(namespace).Get(ctx, releaseName, metav1.GetOptions{})
|
||||
if err == nil && dep != nil {
|
||||
return dep, nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// GetDeploymentReplicas returns the current replicas count for a deployment.
|
||||
func (c *ScaleClient) GetDeploymentReplicas(ctx context.Context, cluster *entity.Cluster, namespace, releaseName string) (int32, error) {
|
||||
clientset, err := c.clientsetForCluster(cluster)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to create k8s client: %w", err)
|
||||
}
|
||||
|
||||
dep, err := c.findDeployment(ctx, clientset, namespace, releaseName)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if dep != nil && dep.Spec.Replicas != nil {
|
||||
return *dep.Spec.Replicas, nil
|
||||
}
|
||||
|
||||
// Fallback to statefulsets
|
||||
return c.getStatefulSetReplicas(ctx, clientset, namespace, releaseName)
|
||||
}
|
||||
|
||||
func (c *ScaleClient) getStatefulSetReplicas(ctx context.Context, clientset *kubernetes.Clientset, namespace, releaseName string) (int32, error) {
|
||||
stsList, err := clientset.AppsV1().StatefulSets(namespace).List(ctx, metav1.ListOptions{
|
||||
LabelSelector: fmt.Sprintf("app.kubernetes.io/instance=%s", releaseName),
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if len(stsList.Items) == 0 {
|
||||
return 0, nil // No replicable workload found
|
||||
}
|
||||
sts := stsList.Items[0]
|
||||
if sts.Spec.Replicas != nil {
|
||||
return *sts.Spec.Replicas, nil
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// ScaleDeployment scales the K8s deployment directly (bypasses Helm).
|
||||
func (c *ScaleClient) ScaleDeployment(ctx context.Context, cluster *entity.Cluster, namespace, releaseName string, replicas int32) error {
|
||||
clientset, err := c.clientsetForCluster(cluster)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create k8s client: %w", err)
|
||||
}
|
||||
|
||||
dep, err := c.findDeployment(ctx, clientset, namespace, releaseName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if dep != nil {
|
||||
dep.Spec.Replicas = &replicas
|
||||
_, err = clientset.AppsV1().Deployments(namespace).Update(ctx, dep, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to scale deployment %s: %w", dep.Name, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Try StatefulSets
|
||||
stsList, err := clientset.AppsV1().StatefulSets(namespace).List(ctx, metav1.ListOptions{
|
||||
LabelSelector: fmt.Sprintf("app.kubernetes.io/instance=%s", releaseName),
|
||||
})
|
||||
if err == nil && len(stsList.Items) > 0 {
|
||||
sts := stsList.Items[0]
|
||||
sts.Spec.Replicas = &replicas
|
||||
_, err = clientset.AppsV1().StatefulSets(namespace).Update(ctx, &sts, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to scale statefulset %s: %w", sts.Name, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("no deployment or statefulset found for release %s in namespace %s", releaseName, namespace)
|
||||
}
|
||||
|
||||
func (c *ScaleClient) clientsetForCluster(cluster *entity.Cluster) (*kubernetes.Clientset, error) {
|
||||
restConfig, err := restConfigFromCluster(cluster)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create rest config: %w", err)
|
||||
}
|
||||
clientset, err := kubernetes.NewForConfig(restConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create clientset: %w", err)
|
||||
}
|
||||
return clientset, nil
|
||||
}
|
||||
@ -17,6 +17,12 @@ import (
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
)
|
||||
|
||||
// ScaleClient defines the interface for K8s-native workload scaling
|
||||
type ScaleClient interface {
|
||||
GetDeploymentReplicas(ctx context.Context, cluster *entity.Cluster, namespace, releaseName string) (int32, error)
|
||||
ScaleDeployment(ctx context.Context, cluster *entity.Cluster, namespace, releaseName string, replicas int32) error
|
||||
}
|
||||
|
||||
// InstanceService Helm 实例管理领域服务
|
||||
type InstanceService struct {
|
||||
instanceRepo repository.InstanceRepository
|
||||
@ -29,6 +35,7 @@ type InstanceService struct {
|
||||
diagClient repository.InstanceDiagnosticsClient
|
||||
workspaceRepo repository.WorkspaceRepository
|
||||
tenantClient repository.TenantKubeClient
|
||||
scaleClient ScaleClient
|
||||
}
|
||||
|
||||
// NewInstanceService 创建实例服务
|
||||
@ -60,6 +67,10 @@ func (s *InstanceService) SetDiagnosticsClient(client repository.InstanceDiagnos
|
||||
s.diagClient = client
|
||||
}
|
||||
|
||||
func (s *InstanceService) SetScaleClient(client ScaleClient) {
|
||||
s.scaleClient = client
|
||||
}
|
||||
|
||||
func (s *InstanceService) SetTenantProvisioning(workspaceRepo repository.WorkspaceRepository, tenantClient repository.TenantKubeClient) {
|
||||
s.workspaceRepo = workspaceRepo
|
||||
s.tenantClient = tenantClient
|
||||
@ -436,26 +447,44 @@ func (s *InstanceService) ScaleInstance(ctx context.Context, clusterID, instance
|
||||
return nil, entity.ErrClusterNotFound
|
||||
}
|
||||
|
||||
// Get existing Helm values and patch replicaCount
|
||||
vals, err := s.helmClient.GetValues(ctx, cluster, instance.Name, instance.Namespace)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get current values: %w", err)
|
||||
}
|
||||
if vals == nil {
|
||||
vals = make(map[string]interface{})
|
||||
}
|
||||
vals["replicaCount"] = replicas
|
||||
|
||||
instance.SetValues(vals)
|
||||
instance.BeginOperation(entity.OperationUpgrade, fmt.Sprintf("Scaling to %d replicas", replicas))
|
||||
if err := s.instanceRepo.Update(ctx, instance); err != nil {
|
||||
return nil, err
|
||||
// Scale via K8s API directly (like kubectl scale deploy --replicas=N)
|
||||
if s.scaleClient != nil {
|
||||
if err := s.scaleClient.ScaleDeployment(ctx, cluster, instance.Namespace, instance.Name, int32(replicas)); err != nil {
|
||||
return nil, fmt.Errorf("failed to scale deployment: %w", err)
|
||||
}
|
||||
} else {
|
||||
// Fallback: Helm upgrade with replicaCount
|
||||
vals, err := s.helmClient.GetValues(ctx, cluster, instance.Name, instance.Namespace)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get current values: %w", err)
|
||||
}
|
||||
if vals == nil {
|
||||
vals = make(map[string]interface{})
|
||||
}
|
||||
vals["replicaCount"] = replicas
|
||||
instance.SetValues(vals)
|
||||
instance.BeginOperation(entity.OperationUpgrade, fmt.Sprintf("Scaling to %d replicas", replicas))
|
||||
if err := s.instanceRepo.Update(ctx, instance); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go s.executeAndSyncUpgrade(context.Background(), instance.ID, cluster, nil, instance)
|
||||
}
|
||||
|
||||
go s.executeAndSyncUpgrade(context.Background(), instance.ID, cluster, nil, instance)
|
||||
return instance, nil
|
||||
}
|
||||
|
||||
// GetRunningReplicas returns the actual K8s deployment replicas count.
|
||||
func (s *InstanceService) GetRunningReplicas(ctx context.Context, cluster *entity.Cluster, instance *entity.Instance) int {
|
||||
if s.scaleClient == nil {
|
||||
return 0
|
||||
}
|
||||
r, err := s.scaleClient.GetDeploymentReplicas(ctx, cluster, instance.Namespace, instance.Name)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
return int(r)
|
||||
}
|
||||
|
||||
// GetInstanceValuesDiff 获取实例当前 values 与 chart 默认 values 的差异
|
||||
func (s *InstanceService) GetInstanceValuesDiff(ctx context.Context, clusterID, instanceID string) (*dto.InstanceValuesDiffResponse, error) {
|
||||
principal, err := authz.RequirePrincipal(ctx)
|
||||
|
||||
Reference in New Issue
Block a user