package real import ( "archive/tar" "compress/gzip" "context" "encoding/json" "fmt" "io" "log" "net/http" "os" "path/filepath" "strings" "github.com/ocdp/cluster-service/internal/domain/entity" "github.com/ocdp/cluster-service/internal/domain/repository" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras-go/v2/registry/remote" "oras.land/oras-go/v2/registry/remote/auth" ) // OCIClient 真实的 OCI 客户端实现(使用 ORAS) type OCIClient struct { httpClient *http.Client } // NewOCIClient 创建真实的 OCI 客户端 func NewOCIClient() repository.OCIClient { return &OCIClient{ httpClient: &http.Client{}, } } // getRegistry 创建 ORAS Registry 客户端 func (c *OCIClient) getRegistry(reg *entity.Registry) (*remote.Registry, error) { // 解析 Registry URL registryURL := strings.TrimPrefix(reg.URL, "https://") registryURL = strings.TrimPrefix(registryURL, "http://") registry, err := remote.NewRegistry(registryURL) if err != nil { return nil, fmt.Errorf("failed to create registry client: %w", err) } // 设置认证 - 优先使用 registry 自己的凭证,否则使用 .env 中的默认凭证 username := reg.Username password := reg.Password // 如果没有提供凭证,尝试从环境变量加载 if (username == "" || password == "") && strings.Contains(reg.URL, "harbor") { if envUser := os.Getenv("HARBOR_USERNAME"); envUser != "" { username = envUser } if envPass := os.Getenv("HARBOR_PASSWORD"); envPass != "" { password = envPass } } if username != "" && password != "" { registry.Client = &auth.Client{ Client: c.httpClient, Credential: auth.StaticCredential(registryURL, auth.Credential{ Username: username, Password: password, }), } } // 设置 PlainHTTP(如果是 insecure) registry.PlainHTTP = reg.Insecure return registry, nil } // ListRepositories 列出 Registry 中的所有 repositories // 优先使用 OCI _catalog API,失败时回退到 Harbor REST API v2 func (c *OCIClient) ListRepositories(ctx context.Context, registry *entity.Registry) ([]string, error) { repositories := make([]string, 0) // 尝试 OCI _catalog API reg, err := c.getRegistry(registry) log.Printf("[DEBUG ListRepositories] registry=%s, getRegistry err=%v", registry.URL, err) if err == nil { err = reg.Repositories(ctx, "", func(repos []string) error { log.Printf("[DEBUG ListRepositories] OCI got repos batch: %d", len(repos)) repositories = append(repositories, repos...) return nil }) log.Printf("[DEBUG ListRepositories] OCI reg.Repositories returned: err=%v, total_repos=%d", err, len(repositories)) } log.Printf("[DEBUG ListRepositories] post-OCI check: err=%v, repos_count=%d", err, len(repositories)) if err == nil && len(repositories) > 0 { log.Printf("[DEBUG ListRepositories] OCI success, returning %d repos", len(repositories)) return repositories, nil } // 回退: 使用 Harbor REST API v2 log.Printf("[Harbor Fallback] OCI failed (err=%v, repos=%d), checking if Harbor...", err, len(repositories)) log.Printf("[Harbor Fallback] registry.URL=%s, contains 'harbor'=%v", registry.URL, strings.Contains(registry.URL, "harbor")) if strings.Contains(registry.URL, "harbor") { log.Printf("[Harbor Fallback] Yes, this is Harbor! Calling Harbor REST API...") repos, fallbackErr := c.listHarborRepositories(registry) log.Printf("[Harbor Fallback] Got %d repos, err=%v", len(repos), fallbackErr) if fallbackErr == nil && len(repos) > 0 { log.Printf("[Harbor Fallback] Returning %d repos from Harbor API", len(repos)) return repos, nil } if err != nil { return nil, fmt.Errorf("failed to list repositories: %w", err) } return nil, fallbackErr } if err != nil { return nil, fmt.Errorf("failed to list repositories: %w", err) } return repositories, nil } // listHarborRepositories 使用 Harbor REST API v2 获取仓库列表 func (c *OCIClient) listHarborRepositories(registry *entity.Registry) ([]string, error) { // 解析 Harbor URL 基础地址 baseURL := registry.URL baseURL = strings.TrimSuffix(baseURL, "/") baseURL = strings.TrimPrefix(baseURL, "https://") baseURL = strings.TrimPrefix(baseURL, "http://") harborHost := "https://" + baseURL // 获取认证信息 username := registry.Username password := registry.Password if username == "" || password == "" { username = os.Getenv("HARBOR_USERNAME") password = os.Getenv("HARBOR_PASSWORD") } // 获取项目列表 projectsURL := harborHost + "/api/v2.0/projects" req, err := http.NewRequest("GET", projectsURL, nil) if err != nil { return nil, err } req.SetBasicAuth(username, password) resp, err := c.httpClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("failed to list projects: status %d", resp.StatusCode) } var projects []struct { Name string `json:"name"` } if err := json.NewDecoder(resp.Body).Decode(&projects); err != nil { return nil, err } repositories := make([]string, 0) pageSize := 100 for _, project := range projects { page := 1 log.Printf("[listHarborRepositories] Processing project: %s", project.Name) for { reposURL := fmt.Sprintf("%s/api/v2.0/projects/%s/repositories?page=%d&page_size=%d", harborHost, project.Name, page, pageSize) req, err := http.NewRequest("GET", reposURL, nil) if err != nil { log.Printf("[listHarborRepositories] page %d: NewRequest error: %v", page, err) break } req.SetBasicAuth(username, password) resp, err := c.httpClient.Do(req) if err != nil { log.Printf("[listHarborRepositories] page %d: Do error: %v", page, err) break } if resp.StatusCode != http.StatusOK { bodyBytes, _ := io.ReadAll(io.LimitReader(resp.Body, 200)) resp.Body.Close() log.Printf("[listHarborRepositories] page %d: HTTP %d, body: %s", page, resp.StatusCode, string(bodyBytes)) break } var repos []struct { Name string `json:"name"` } if err := json.NewDecoder(resp.Body).Decode(&repos); err != nil { resp.Body.Close() log.Printf("[listHarborRepositories] page %d: Decode error: %v", page, err) break } resp.Body.Close() log.Printf("[listHarborRepositories] page %d: got %d repos", page, len(repos)) if len(repos) == 0 { break } for _, repo := range repos { repositories = append(repositories, repo.Name) } page++ } } log.Printf("[listHarborRepositories] Total repos collected: %d", len(repositories)) return repositories, nil } // ListArtifacts 列出指定 repository 的所有 artifacts // mediaTypeFilter: "all", "image", "chart", "other" - 使用模糊匹配过滤 func (c *OCIClient) ListArtifacts(ctx context.Context, registry *entity.Registry, repository, mediaTypeFilter string) ([]*entity.Artifact, error) { reg, err := c.getRegistry(registry) if err != nil { return nil, err } repo, err := reg.Repository(ctx, repository) if err != nil { return nil, fmt.Errorf("failed to get repository: %w", err) } artifacts := make([]*entity.Artifact, 0) err = repo.Tags(ctx, "", func(tags []string) error { for _, tag := range tags { // 获取 manifest 以获取更多信息 desc, err := repo.Resolve(ctx, tag) if err != nil { // 跳过无法解析的 tag continue } artifact := &entity.Artifact{ Repository: repository, Tag: tag, Digest: desc.Digest.String(), MediaType: desc.MediaType, Size: desc.Size, } // 尝试获取 config.mediaType 以更准确判断类型 if manifestBytes, err := repo.Fetch(ctx, desc); err == nil { defer manifestBytes.Close() if manifestData, err := io.ReadAll(manifestBytes); err == nil { var manifest map[string]interface{} if err := json.Unmarshal(manifestData, &manifest); err == nil { // 获取 config.mediaType if config, ok := manifest["config"].(map[string]interface{}); ok { if configMediaType, ok := config["mediaType"].(string); ok { artifact.ConfigType = configMediaType } } } } } // 使用智能类型判断(综合多种信息) artifact.DetermineType() // 应用 mediaType 过滤 if c.shouldIncludeArtifact(artifact, mediaTypeFilter) { artifacts = append(artifacts, artifact) } } return nil }) if err != nil { return nil, fmt.Errorf("failed to list artifacts: %w", err) } return artifacts, nil } // shouldIncludeArtifact 判断是否应该包含该 artifact func (c *OCIClient) shouldIncludeArtifact(artifact *entity.Artifact, filter string) bool { // 默认或 "all" 返回所有 if filter == "" || filter == "all" { return true } filter = strings.ToLower(strings.TrimSpace(filter)) switch filter { case "chart": // 只返回 Helm Charts return artifact.Type == entity.ArtifactTypeChart case "image": // 返回 Docker 或 OCI images return artifact.Type == entity.ArtifactTypeImage case "other": // 返回其他类型 return artifact.Type == entity.ArtifactTypeOther default: // 未知的 filter,返回所有 return true } } // GetArtifact 获取指定 artifact 的详细信息 func (c *OCIClient) GetArtifact(ctx context.Context, registry *entity.Registry, repository, reference string) (*entity.Artifact, error) { reg, err := c.getRegistry(registry) if err != nil { return nil, err } repo, err := reg.Repository(ctx, repository) if err != nil { return nil, fmt.Errorf("failed to get repository: %w", err) } // 解析 reference desc, err := repo.Resolve(ctx, reference) if err != nil { return nil, fmt.Errorf("failed to resolve artifact: %w", err) } // 获取 manifest manifestBytes, err := repo.Fetch(ctx, desc) if err != nil { return nil, fmt.Errorf("failed to fetch manifest: %w", err) } defer manifestBytes.Close() manifestData, err := io.ReadAll(manifestBytes) if err != nil { return nil, fmt.Errorf("failed to read manifest: %w", err) } // 解析 manifest 获取配置信息 var manifest map[string]interface{} if err := json.Unmarshal(manifestData, &manifest); err != nil { return nil, fmt.Errorf("failed to unmarshal manifest: %w", err) } artifact := &entity.Artifact{ Repository: repository, Tag: reference, Digest: desc.Digest.String(), MediaType: desc.MediaType, Size: desc.Size, Annotations: make(map[string]string), } // 获取 config.mediaType 和 annotations if config, ok := manifest["config"].(map[string]interface{}); ok { // 获取 config.mediaType(用于准确的类型判断) if configMediaType, ok := config["mediaType"].(string); ok { artifact.ConfigType = configMediaType } // 获取 annotations if annotations, ok := config["annotations"].(map[string]interface{}); ok { for k, v := range annotations { if str, ok := v.(string); ok { artifact.Annotations[k] = str } } } } // 使用智能类型判断(综合 ConfigType, Annotations, Repository 名称等) artifact.DetermineType() return artifact, nil } // GetValuesSchema 获取 Helm Chart 的 values schema func (c *OCIClient) GetValuesSchema(ctx context.Context, registry *entity.Registry, repository, reference string) (string, error) { reg, err := c.getRegistry(registry) if err != nil { return "", err } repo, err := reg.Repository(ctx, repository) if err != nil { return "", fmt.Errorf("failed to get repository: %w", err) } // 解析 reference (tag 或 digest) desc, err := repo.Resolve(ctx, reference) if err != nil { return "", fmt.Errorf("failed to resolve artifact: %w", err) } manifestReader, err := repo.Fetch(ctx, desc) if err != nil { return "", fmt.Errorf("failed to fetch manifest: %w", err) } defer manifestReader.Close() manifestBytes, err := io.ReadAll(manifestReader) if err != nil { return "", fmt.Errorf("failed to read manifest: %w", err) } var manifest ocispec.Manifest if err := json.Unmarshal(manifestBytes, &manifest); err != nil { return "", fmt.Errorf("failed to unmarshal manifest: %w", err) } // 优先查找是否存在独立的 values schema layer(一些 registry 会将 values.schema.json 作为单独的 layer 存储) var valuesSchemaLayer *ocispec.Descriptor for i := range manifest.Layers { layer := manifest.Layers[i] mediaType := strings.ToLower(layer.MediaType) if strings.Contains(mediaType, "helm.values.schema") || strings.Contains(mediaType, "values.schema") { valuesSchemaLayer = &manifest.Layers[i] break } } // 如果存在独立的 values schema layer,直接返回 if valuesSchemaLayer != nil { reader, err := repo.Fetch(ctx, *valuesSchemaLayer) if err != nil { return "", fmt.Errorf("failed to fetch values schema layer: %w", err) } defer reader.Close() data, err := io.ReadAll(reader) if err != nil { return "", fmt.Errorf("failed to read values schema layer: %w", err) } if len(data) == 0 { return "", entity.ErrValuesSchemaNotFound } return string(data), nil } // 回退:查找 Helm Chart layer(tar+gzip 包含 chart 内容)并从中读取 values.schema.json var chartLayer *ocispec.Descriptor for i := range manifest.Layers { layer := manifest.Layers[i] if strings.Contains(layer.MediaType, "cncf.helm.chart") || strings.Contains(layer.MediaType, "helm.chart.content") { chartLayer = &manifest.Layers[i] break } } if chartLayer == nil { return "", entity.ErrValuesSchemaNotFound } if chartLayer.Digest == "" { return "", fmt.Errorf("chart layer digest is empty") } if _, err := digest.Parse(string(chartLayer.Digest)); err != nil { return "", fmt.Errorf("invalid chart layer digest: %w", err) } layerReader, err := repo.Fetch(ctx, *chartLayer) if err != nil { return "", fmt.Errorf("failed to fetch chart layer: %w", err) } defer layerReader.Close() gzipReader, err := gzip.NewReader(layerReader) if err != nil { return "", fmt.Errorf("failed to create gzip reader: %w", err) } defer gzipReader.Close() tarReader := tar.NewReader(gzipReader) for { header, err := tarReader.Next() if err == io.EOF { break } if err != nil { return "", fmt.Errorf("failed to read chart archive: %w", err) } if header.Typeflag != tar.TypeReg { continue } if strings.HasSuffix(header.Name, "values.schema.json") { data, err := io.ReadAll(tarReader) if err != nil { return "", fmt.Errorf("failed to read values.schema.json: %w", err) } if len(data) == 0 { return "", entity.ErrValuesSchemaNotFound } return string(data), nil } } return "", entity.ErrValuesSchemaNotFound } // GetValues 获取 Helm Chart 的 values.yaml func (c *OCIClient) GetValues(ctx context.Context, registry *entity.Registry, repository, reference string) (string, error) { reg, err := c.getRegistry(registry) if err != nil { return "", err } repo, err := reg.Repository(ctx, repository) if err != nil { return "", fmt.Errorf("failed to get repository: %w", err) } // 解析 reference (tag 或 digest) desc, err := repo.Resolve(ctx, reference) if err != nil { return "", fmt.Errorf("failed to resolve artifact: %w", err) } manifestReader, err := repo.Fetch(ctx, desc) if err != nil { return "", fmt.Errorf("failed to fetch manifest: %w", err) } defer manifestReader.Close() manifestBytes, err := io.ReadAll(manifestReader) if err != nil { return "", fmt.Errorf("failed to read manifest: %w", err) } var manifest ocispec.Manifest if err := json.Unmarshal(manifestBytes, &manifest); err != nil { return "", fmt.Errorf("failed to unmarshal manifest: %w", err) } // 查找 Helm Chart layer(tar+gzip 包含 chart 内容)并从中读取 values.yaml var chartLayer *ocispec.Descriptor for i := range manifest.Layers { layer := manifest.Layers[i] if strings.Contains(layer.MediaType, "cncf.helm.chart") || strings.Contains(layer.MediaType, "helm.chart.content") { chartLayer = &manifest.Layers[i] break } } if chartLayer == nil { return "", entity.ErrValuesNotFound } if chartLayer.Digest == "" { return "", fmt.Errorf("chart layer digest is empty") } if _, err := digest.Parse(string(chartLayer.Digest)); err != nil { return "", fmt.Errorf("invalid chart layer digest: %w", err) } layerReader, err := repo.Fetch(ctx, *chartLayer) if err != nil { return "", fmt.Errorf("failed to fetch chart layer: %w", err) } defer layerReader.Close() gzipReader, err := gzip.NewReader(layerReader) if err != nil { return "", fmt.Errorf("failed to create gzip reader: %w", err) } defer gzipReader.Close() tarReader := tar.NewReader(gzipReader) for { header, err := tarReader.Next() if err == io.EOF { break } if err != nil { return "", fmt.Errorf("failed to read chart archive: %w", err) } if header.Typeflag != tar.TypeReg { continue } // 查找 values.yaml 文件(可能在 chart 根目录或子目录中) // 通常路径格式为: {chart-name}/values.yaml if strings.HasSuffix(header.Name, "values.yaml") { data, err := io.ReadAll(tarReader) if err != nil { return "", fmt.Errorf("failed to read values.yaml: %w", err) } if len(data) == 0 { return "", entity.ErrValuesNotFound } return string(data), nil } } return "", entity.ErrValuesNotFound } // PullArtifact 下载 artifact 到本地 func (c *OCIClient) PullArtifact(ctx context.Context, registry *entity.Registry, repository, reference, destPath string) error { reg, err := c.getRegistry(registry) if err != nil { return err } repo, err := reg.Repository(ctx, repository) if err != nil { return fmt.Errorf("failed to get repository: %w", err) } // 解析 reference desc, err := repo.Resolve(ctx, reference) if err != nil { return fmt.Errorf("failed to resolve artifact: %w", err) } // 获取 manifest 内容 manifestReader, err := repo.Fetch(ctx, desc) if err != nil { return fmt.Errorf("failed to fetch manifest: %w", err) } defer manifestReader.Close() manifestBytes, err := io.ReadAll(manifestReader) if err != nil { return fmt.Errorf("failed to read manifest: %w", err) } var manifest ocispec.Manifest if err := json.Unmarshal(manifestBytes, &manifest); err != nil { return fmt.Errorf("failed to unmarshal manifest: %w", err) } var chartLayer *ocispec.Descriptor for i := range manifest.Layers { layer := manifest.Layers[i] if strings.Contains(layer.MediaType, "cncf.helm.chart") || strings.Contains(layer.MediaType, "helm.chart.content") { chartLayer = &layer break } } if chartLayer == nil { return fmt.Errorf("helm chart layer not found in manifest") } content, err := repo.Fetch(ctx, *chartLayer) if err != nil { return fmt.Errorf("failed to fetch chart layer: %w", err) } defer content.Close() // 确保目标目录存在 if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil { return fmt.Errorf("failed to create destination directory: %w", err) } // 写入文件 file, err := os.Create(destPath) if err != nil { return fmt.Errorf("failed to create file: %w", err) } defer file.Close() if _, err := io.Copy(file, content); err != nil { return fmt.Errorf("failed to write artifact: %w", err) } return nil } // PushArtifact 推送 artifact 到 Registry func (c *OCIClient) PushArtifact(ctx context.Context, registry *entity.Registry, repository, tag, sourcePath string) error { // 这是一个简化实现 // 实际应该实现完整的 OCI artifact push 流程 return fmt.Errorf("push artifact not fully implemented yet") } // CheckHealth 检查 Registry 健康状态 func (c *OCIClient) CheckHealth(ctx context.Context, registry *entity.Registry) error { reg, err := c.getRegistry(registry) if err != nil { return err } // 尝试 ping registry err = reg.Ping(ctx) if err != nil { return fmt.Errorf("registry health check failed: %w", err) } return nil }