Files
ocdp-go/backend/internal/adapter/output/persistence/postgres/cluster_repository.go
Ivan087 47849042a7 feat: complete E2E deployment flow with storage layered config and values template versioning
- Instance deployment: charts browser, deploy modal, instances list
- Values Template version management (create/history/rollback)
- Storage layered config (cluster > workspace > shared priority)
- Cluster credential decryptIfNeeded for mixed encrypted/plaintext kubeconfig
- YAML syntax validation (client-side + server-side warning)
- Frontend: charts, instances, storage, templates, admin pages
- Backend: storage service, instance service, cluster service, helm client
- Multi-Tenant Kubeconfig.md: added by user
2026-04-30 16:31:00 +08:00

400 lines
11 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package postgres
import (
"context"
"database/sql"
"fmt"
"log"
"time"
"github.com/google/uuid"
"github.com/ocdp/cluster-service/internal/domain/entity"
"github.com/ocdp/cluster-service/internal/domain/repository"
"github.com/ocdp/cluster-service/internal/pkg/crypto"
)
// ClusterRepository is the PostgreSQL-backed implementation of the cluster
// repository. Credential fields are passed through encryptor on the way in
// and out of the database.
type ClusterRepository struct {
	// db is the database handle; queries are issued through db.conn.
	db *DB
	// encryptor encrypts credential fields before they are written and
	// decrypts them after they are read.
	encryptor crypto.Encryptor
}
// NewClusterRepository builds a PostgreSQL cluster repository that runs its
// queries through db and protects credential fields with encryptor. The
// result is returned as the repository.ClusterRepository interface.
func NewClusterRepository(db *DB, encryptor crypto.Encryptor) repository.ClusterRepository {
	repo := new(ClusterRepository)
	repo.db = db
	repo.encryptor = encryptor
	return repo
}
// Create inserts a new row into the clusters table.
//
// Defaults are written back to the caller's struct before the insert: an
// empty cluster.ID is replaced with a newly generated UUID, and an empty
// cluster.IsolationMode becomes entity.IsolationModeNamespace. CAData,
// CertData, KeyData and Token are encrypted before being stored; CreatedAt and
// UpdatedAt are persisted exactly as supplied.
//
// An encryption failure or a database failure is wrapped and returned.
func (r *ClusterRepository) Create(ctx context.Context, cluster *entity.Cluster) error {
	// Fill in defaults first so the values below are what gets persisted.
	if cluster.IsolationMode == "" {
		cluster.IsolationMode = entity.IsolationModeNamespace
	}
	if cluster.ID == "" {
		cluster.ID = uuid.New().String()
	}

	// Credentials never reach the database in clear text.
	caCipher, err := r.encryptor.Encrypt(cluster.CAData)
	if err != nil {
		return fmt.Errorf("failed to encrypt CA data: %w", err)
	}
	certCipher, err := r.encryptor.Encrypt(cluster.CertData)
	if err != nil {
		return fmt.Errorf("failed to encrypt cert data: %w", err)
	}
	keyCipher, err := r.encryptor.Encrypt(cluster.KeyData)
	if err != nil {
		return fmt.Errorf("failed to encrypt key data: %w", err)
	}
	tokenCipher, err := r.encryptor.Encrypt(cluster.Token)
	if err != nil {
		return fmt.Errorf("failed to encrypt token: %w", err)
	}

	const insertSQL = `
INSERT INTO clusters (id, workspace_id, owner_id, name, host, ca_data, cert_data, key_data, token, description, isolation_mode, default_namespace, is_shared, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
`
	// Positional arguments, in the same order as the column list above.
	args := []any{
		cluster.ID,
		cluster.WorkspaceID,
		cluster.OwnerID,
		cluster.Name,
		cluster.Host,
		caCipher,
		certCipher,
		keyCipher,
		tokenCipher,
		cluster.Description,
		cluster.IsolationMode,
		cluster.DefaultNamespace,
		cluster.IsShared,
		cluster.CreatedAt,
		cluster.UpdatedAt,
	}
	if _, err := r.db.conn.ExecContext(ctx, insertSQL, args...); err != nil {
		return fmt.Errorf("failed to create cluster: %w", err)
	}
	return nil
}
// GetByID loads the cluster whose id column equals id.
//
// It returns entity.ErrClusterNotFound when no row matches. Any other
// database failure is wrapped and returned.
func (r *ClusterRepository) GetByID(ctx context.Context, id string) (*entity.Cluster, error) {
	query := `
SELECT id, workspace_id, owner_id, name, host, ca_data, cert_data, key_data, token, description, isolation_mode, default_namespace, is_shared, created_at, updated_at
FROM clusters
WHERE id = $1
`
	return r.scanCluster(r.db.conn.QueryRowContext(ctx, query, id))
}

// GetByName loads the cluster whose name column equals name.
//
// It returns entity.ErrClusterNotFound when no row matches. Any other
// database failure is wrapped and returned.
func (r *ClusterRepository) GetByName(ctx context.Context, name string) (*entity.Cluster, error) {
	query := `
SELECT id, workspace_id, owner_id, name, host, ca_data, cert_data, key_data, token, description, isolation_mode, default_namespace, is_shared, created_at, updated_at
FROM clusters
WHERE name = $1
`
	return r.scanCluster(r.db.conn.QueryRowContext(ctx, query, name))
}

// scanCluster reads one cluster from a single-row query result and decrypts
// its credential fields. row must yield the columns in the order selected by
// GetByID and GetByName.
//
// workspace_id, owner_id, default_namespace and the four credential columns
// are scanned through sql.NullString, the same way scanClusters does, so a
// row containing NULL in any of them loads with an empty string instead of
// failing the Scan.
func (r *ClusterRepository) scanCluster(row interface{ Scan(dest ...any) error }) (*entity.Cluster, error) {
	cluster := &entity.Cluster{}
	var (
		workspaceID, ownerID, defaultNamespace                               sql.NullString
		encryptedCAData, encryptedCertData, encryptedKeyData, encryptedToken sql.NullString
	)

	err := row.Scan(
		&cluster.ID,
		&workspaceID,
		&ownerID,
		&cluster.Name,
		&cluster.Host,
		&encryptedCAData,
		&encryptedCertData,
		&encryptedKeyData,
		&encryptedToken,
		&cluster.Description,
		&cluster.IsolationMode,
		&defaultNamespace,
		&cluster.IsShared,
		&cluster.CreatedAt,
		&cluster.UpdatedAt,
	)
	if err == sql.ErrNoRows {
		return nil, entity.ErrClusterNotFound
	}
	if err != nil {
		return nil, fmt.Errorf("failed to get cluster: %w", err)
	}

	// A NULL column leaves NullString.String empty, which is the value the
	// entity should carry.
	cluster.WorkspaceID = workspaceID.String
	cluster.OwnerID = ownerID.String
	cluster.DefaultNamespace = defaultNamespace.String

	// decryptIfNeeded returns "" for empty input and passes plaintext YAML
	// through untouched, so it is safe to call on every credential column.
	cluster.CAData = r.decryptIfNeeded(encryptedCAData.String, "ca_data")
	cluster.CertData = r.decryptIfNeeded(encryptedCertData.String, "cert_data")
	cluster.KeyData = r.decryptIfNeeded(encryptedKeyData.String, "key_data")
	cluster.Token = r.decryptIfNeeded(encryptedToken.String, "token")

	return cluster, nil
}
// decryptIfNeeded turns a stored credential column back into plaintext.
//
// A column may hold either ciphertext produced by the encryptor or plaintext
// YAML. The value is returned unchanged when it is empty or when it starts
// with "apiVersion:" or "kind:" (kubeconfig-style plaintext). Anything else is
// handed to the encryptor. If decryption fails, a warning is logged and ""
// is returned instead of an error.
//
// fieldName is used only to label the warning.
func (r *ClusterRepository) decryptIfNeeded(data string, fieldName string) string {
	if data == "" {
		return ""
	}

	// Plaintext YAML is stored as-is and must not be passed to the encryptor.
	// Both markers use the same "at least as long as the prefix" test so an
	// exact-length match is treated consistently.
	for _, prefix := range [...]string{"apiVersion:", "kind:"} {
		if len(data) >= len(prefix) && data[:len(prefix)] == prefix {
			return data
		}
	}

	decrypted, err := r.encryptor.Decrypt(data)
	if err != nil {
		// Do not echo any part of the stored value: a value that fails to
		// decrypt may be an unencrypted credential, and it must not end up in
		// log output. The length is enough to tell rows apart when debugging.
		log.Printf("[ClusterRepository] WARNING: failed to decrypt field %s (%d bytes): %v (field will be empty)", fieldName, len(data), err)
		return ""
	}
	return decrypted
}
// Update rewrites the mutable columns of the cluster row identified by
// cluster.ID: name, host, the four credential fields, description,
// isolation_mode, default_namespace, is_shared and updated_at.
//
// cluster.UpdatedAt is set to the current time on the caller's struct before
// anything else happens. Credential fields are re-encrypted before being
// written. entity.ErrClusterNotFound is returned when no row has that id;
// encryption and database failures are wrapped and returned.
func (r *ClusterRepository) Update(ctx context.Context, cluster *entity.Cluster) error {
	// Stamp the modification time up front, before any step that can fail.
	cluster.UpdatedAt = time.Now()

	caCipher, err := r.encryptor.Encrypt(cluster.CAData)
	if err != nil {
		return fmt.Errorf("failed to encrypt CA data: %w", err)
	}
	certCipher, err := r.encryptor.Encrypt(cluster.CertData)
	if err != nil {
		return fmt.Errorf("failed to encrypt cert data: %w", err)
	}
	keyCipher, err := r.encryptor.Encrypt(cluster.KeyData)
	if err != nil {
		return fmt.Errorf("failed to encrypt key data: %w", err)
	}
	tokenCipher, err := r.encryptor.Encrypt(cluster.Token)
	if err != nil {
		return fmt.Errorf("failed to encrypt token: %w", err)
	}

	const updateSQL = `
UPDATE clusters
SET name = $1, host = $2, ca_data = $3, cert_data = $4, key_data = $5,
token = $6, description = $7, isolation_mode = $8, default_namespace = $9,
is_shared = $10, updated_at = $11
WHERE id = $12
`
	// Positional arguments, matching $1..$12 above.
	args := []any{
		cluster.Name,
		cluster.Host,
		caCipher,
		certCipher,
		keyCipher,
		tokenCipher,
		cluster.Description,
		cluster.IsolationMode,
		cluster.DefaultNamespace,
		cluster.IsShared,
		cluster.UpdatedAt,
		cluster.ID,
	}
	res, err := r.db.conn.ExecContext(ctx, updateSQL, args...)
	if err != nil {
		return fmt.Errorf("failed to update cluster: %w", err)
	}

	// Zero affected rows means the id did not match anything.
	affected, err := res.RowsAffected()
	switch {
	case err != nil:
		return fmt.Errorf("failed to get affected rows: %w", err)
	case affected == 0:
		return entity.ErrClusterNotFound
	}
	return nil
}
// Delete removes the cluster row whose id column equals id.
//
// It returns entity.ErrClusterNotFound when no row was deleted; database
// failures are wrapped and returned.
func (r *ClusterRepository) Delete(ctx context.Context, id string) error {
	res, err := r.db.conn.ExecContext(ctx, `DELETE FROM clusters WHERE id = $1`, id)
	if err != nil {
		return fmt.Errorf("failed to delete cluster: %w", err)
	}

	// Zero affected rows means the id did not match anything.
	affected, err := res.RowsAffected()
	switch {
	case err != nil:
		return fmt.Errorf("failed to get affected rows: %w", err)
	case affected == 0:
		return entity.ErrClusterNotFound
	}
	return nil
}
// List returns every row of the clusters table, ordered by created_at
// descending, with credential fields decrypted by scanClusters.
func (r *ClusterRepository) List(ctx context.Context) ([]*entity.Cluster, error) {
	const listSQL = `
SELECT id, workspace_id, owner_id, name, host, ca_data, cert_data, key_data, token, description, isolation_mode, default_namespace, is_shared, created_at, updated_at
FROM clusters
ORDER BY created_at DESC
`
	rs, queryErr := r.db.conn.QueryContext(ctx, listSQL)
	if queryErr != nil {
		return nil, fmt.Errorf("failed to list clusters: %w", queryErr)
	}
	defer rs.Close()

	return r.scanClusters(rs)
}
// GetByWorkspace returns the clusters visible to a workspace: rows whose
// workspace_id equals workspaceID plus every row with is_shared = TRUE.
// Results are ordered by is_shared, then by created_at descending.
func (r *ClusterRepository) GetByWorkspace(ctx context.Context, workspaceID string) ([]*entity.Cluster, error) {
	const byWorkspaceSQL = `
SELECT id, workspace_id, owner_id, name, host, ca_data, cert_data, key_data, token, description, isolation_mode, default_namespace, is_shared, created_at, updated_at
FROM clusters
WHERE workspace_id = $1 OR is_shared = TRUE
ORDER BY is_shared, created_at DESC
`
	rs, queryErr := r.db.conn.QueryContext(ctx, byWorkspaceSQL, workspaceID)
	if queryErr != nil {
		return nil, fmt.Errorf("failed to list clusters by workspace: %w", queryErr)
	}
	defer rs.Close()

	return r.scanClusters(rs)
}
// GetShared returns every cluster row with is_shared = TRUE, ordered by
// created_at descending.
func (r *ClusterRepository) GetShared(ctx context.Context) ([]*entity.Cluster, error) {
	const sharedSQL = `
SELECT id, workspace_id, owner_id, name, host, ca_data, cert_data, key_data, token, description, isolation_mode, default_namespace, is_shared, created_at, updated_at
FROM clusters
WHERE is_shared = TRUE
ORDER BY created_at DESC
`
	rs, queryErr := r.db.conn.QueryContext(ctx, sharedSQL)
	if queryErr != nil {
		return nil, fmt.Errorf("failed to list shared clusters: %w", queryErr)
	}
	defer rs.Close()

	return r.scanClusters(rs)
}
// scanClusters drains rows into a slice of clusters. Each row must carry the
// fifteen columns in the order used by the list queries in this file.
//
// workspace_id, owner_id, default_namespace and the four credential columns
// are read through sql.NullString; a NULL leaves the corresponding field at
// its zero value. Non-NULL credential columns go through decryptIfNeeded.
//
// The returned slice is non-nil even when there are no rows. A Scan failure
// or an iteration error is wrapped and returned with a nil slice. The caller
// remains responsible for closing rows.
func (r *ClusterRepository) scanClusters(rows *sql.Rows) ([]*entity.Cluster, error) {
	result := []*entity.Cluster{}

	for rows.Next() {
		var (
			item                             = new(entity.Cluster)
			wsID, owner, defaultNS           sql.NullString
			caEnc, certEnc, keyEnc, tokenEnc sql.NullString
		)

		// Destinations, in SELECT column order.
		dest := []any{
			&item.ID,
			&wsID,
			&owner,
			&item.Name,
			&item.Host,
			&caEnc,
			&certEnc,
			&keyEnc,
			&tokenEnc,
			&item.Description,
			&item.IsolationMode,
			&defaultNS,
			&item.IsShared,
			&item.CreatedAt,
			&item.UpdatedAt,
		}
		if scanErr := rows.Scan(dest...); scanErr != nil {
			return nil, fmt.Errorf("failed to scan cluster: %w", scanErr)
		}

		// NULL collapses to the empty string for these three columns.
		item.WorkspaceID = wsID.String
		item.OwnerID = owner.String
		item.DefaultNamespace = defaultNS.String

		// Only non-NULL credential columns are decrypted.
		if caEnc.Valid {
			item.CAData = r.decryptIfNeeded(caEnc.String, "ca_data")
		}
		if certEnc.Valid {
			item.CertData = r.decryptIfNeeded(certEnc.String, "cert_data")
		}
		if keyEnc.Valid {
			item.KeyData = r.decryptIfNeeded(keyEnc.String, "key_data")
		}
		if tokenEnc.Valid {
			item.Token = r.decryptIfNeeded(tokenEnc.String, "token")
		}

		result = append(result, item)
	}

	// Next returning false can hide an error; surface it here.
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("rows iteration error: %w", iterErr)
	}
	return result, nil
}