diff --git a/.env.template b/.env.template new file mode 100644 index 0000000..5aeccbf --- /dev/null +++ b/.env.template @@ -0,0 +1,13 @@ + +# config +OCDP_CONFIG_FILE="~/.ocdp/config.yaml" + +# password +DATABASE_MYSQL_PASSWORD="****" + +# token +TOKEN_JWT_SECRET_KEY="****" + +# admin +ADMIN_USERNAME="admin" +ADMIN_PASSWORD="****" \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f104273 --- /dev/null +++ b/.gitignore @@ -0,0 +1,17 @@ + +# python +__pycache__/ +*.py[cod] + +# data +*.csv +*.json +*.xlsx +*.yaml +*.yml + +# env variable +.env +*.ini + + diff --git a/README.md b/README.md index e69de29..8b13789 100644 --- a/README.md +++ b/README.md @@ -0,0 +1 @@ + diff --git a/config.yaml.template b/config.yaml.template new file mode 100644 index 0000000..ffce710 --- /dev/null +++ b/config.yaml.template @@ -0,0 +1,28 @@ +orchestration: + kube: + kubectl_file: "~/.ocdp/kube/config" + applications_dir: "~/.ocdp/kube/applications" + logs_dir: "~/.ocdp/kube/logs" + +logger: + loki: + url: "https://loki.bwgdi.com/loki/api/v1/push" + labels: "application=myapp,environment=develop" + label_keys: "" + +database: + mysql: + host: "localhost" + port: 3306 + db_name: "ocdp" + username: "root" + # ❗️ Password should be read from environment variables, not provided here + +password: + hash: + algorithm: "ARGON2" + +token: + jwt: + signing_algorithm: "HS256" + # ❗️ Secret should be read from environment variables, not provided here \ No newline at end of file diff --git a/frontend/frontend.html b/frontend/frontend.html new file mode 100644 index 0000000..1a01dc3 --- /dev/null +++ b/frontend/frontend.html @@ -0,0 +1,515 @@ + + + + + + Cluster Status Dashboard - Dynamic + + + + + + + + + + + + + +
+ +
+
+ + + +
+
+
+ + +
+ +
+

Orchestration Cluster Status

+

Real-time overview of cluster health and resource allocation.

+
+
+ +
+ +
+

Cluster Summary

+
+
+
+

Core Resource Usage

+
+
+
+

CPU Usage

+
+
+
+

Memory Usage

+
+
+
+

Ephemeral Storage

+
+
+
+

Pod Allocation

+
+
+
+
+
+
+

Scheduling Hints

+
+
+
+

GPU Availability

+
+
+
+
+ + +
+

Node Details

+
+
+
+
+ + + + + + + + + OCDP Application Manager + + + + + + + + +
+
+
+ +
+

Application Manager

+

Manage and deploy applications on your cluster.

+
+
+
+ +
+
+ +
+
+ + +
+ +
+
+
+
+ + +
+
+ + + + + + + + \ No newline at end of file diff --git a/migration/alembic.ini.template b/migration/alembic.ini.template new file mode 100644 index 0000000..ef83cb9 --- /dev/null +++ b/migration/alembic.ini.template @@ -0,0 +1,147 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts. +# this is typically a path given in POSIX (e.g. forward slashes) +# format, relative to the token %(here)s which refers to the location of this +# ini file +script_location = %(here)s/alembic + +# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s +# Uncomment the line below if you want the files to be prepended with date and time +# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file +# for all available tokens +# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. for multiple paths, the path separator +# is defined by "path_separator" below. +prepend_sys_path = . + + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library. +# Any required deps can installed by adding `alembic[tz]` to the pip requirements +# string value is passed to ZoneInfo() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; This defaults +# to /versions. 
When using multiple version +# directories, initial revisions must be specified with --version-path. +# The path separator used here should be the separator specified by "path_separator" +# below. +# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions + +# path_separator; This indicates what character is used to split lists of file +# paths, including version_locations and prepend_sys_path within configparser +# files such as alembic.ini. +# The default rendered in new alembic.ini files is "os", which uses os.pathsep +# to provide os-dependent path splitting. +# +# Note that in order to support legacy alembic.ini files, this default does NOT +# take place if path_separator is not present in alembic.ini. If this +# option is omitted entirely, fallback logic is as follows: +# +# 1. Parsing of the version_locations option falls back to using the legacy +# "version_path_separator" key, which if absent then falls back to the legacy +# behavior of splitting on spaces and/or commas. +# 2. Parsing of the prepend_sys_path option falls back to the legacy +# behavior of splitting on spaces, commas, or colons. +# +# Valid values for path_separator are: +# +# path_separator = : +# path_separator = ; +# path_separator = space +# path_separator = newline +# +# Use os.pathsep. Default configuration used for new projects. +path_separator = os + +# set to 'true' to search source files recursively +# in each "version_locations" directory +# new in Alembic version 1.10 +# recursive_version_locations = false + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +# database URL. This is consumed by the user-maintained env.py script only. +# other means of configuring database URLs may be customized within the env.py +# file. 
+sqlalchemy.url = mysql+pymysql://root:****@localhost:3306/ocdp + + [post_write_hooks] # post_write_hooks defines scripts or Python functions that are run # on newly generated revision scripts. See the documentation for further # detail and examples # format using "black" - use the console_scripts runner, against the "black" entrypoint # hooks = black # black.type = console_scripts # black.entrypoint = black # black.options = -l 79 REVISION_SCRIPT_FILENAME # lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module # hooks = ruff # ruff.type = module # ruff.module = ruff # ruff.options = check --fix REVISION_SCRIPT_FILENAME # Alternatively, use the exec runner to execute a binary found on your PATH # hooks = ruff # ruff.type = exec # ruff.executable = ruff # ruff.options = check --fix REVISION_SCRIPT_FILENAME # Logging configuration. This is also consumed by the user-maintained # env.py script only. [loggers] keys = root,sqlalchemy,alembic [handlers] keys = console [formatters] keys = generic [logger_root] level = WARNING handlers = console qualname = [logger_sqlalchemy] level = WARNING handlers = qualname = sqlalchemy.engine [logger_alembic] level = INFO handlers = qualname = alembic [handler_console] class = StreamHandler args = (sys.stderr,) level = NOTSET formatter = generic [formatter_generic] format = %(levelname)-5.5s [%(name)s] %(message)s datefmt = %H:%M:%S diff --git a/migration/alembic/README b/migration/alembic/README new file mode 100644 index 0000000..98e4f9c --- /dev/null +++ b/migration/alembic/README @@ -0,0 +1 @@ +Generic single-database configuration. 
\ No newline at end of file diff --git a/migration/alembic/env.py b/migration/alembic/env.py new file mode 100644 index 0000000..c3c1fcf --- /dev/null +++ b/migration/alembic/env.py @@ -0,0 +1,81 @@ +from logging.config import fileConfig + +from sqlalchemy import engine_from_config +from sqlalchemy import pool + +from alembic import context + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +#target_metadata = None +from ocdp.database import Base +from ocdp.models import * +target_metadata = Base.metadata + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online() -> None: + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. 
+ + """ + connectable = engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure( + connection=connection, target_metadata=target_metadata + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/migration/alembic/script.py.mako b/migration/alembic/script.py.mako new file mode 100644 index 0000000..1101630 --- /dev/null +++ b/migration/alembic/script.py.mako @@ -0,0 +1,28 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision: str = ${repr(up_revision)} +down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)} +branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)} +depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)} + + +def upgrade() -> None: + """Upgrade schema.""" + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + """Downgrade schema.""" + ${downgrades if downgrades else "pass"} diff --git a/migration/alembic/versions/796b67d23c1c_create_initial_tables.py b/migration/alembic/versions/796b67d23c1c_create_initial_tables.py new file mode 100644 index 0000000..f1f7a2b --- /dev/null +++ b/migration/alembic/versions/796b67d23c1c_create_initial_tables.py @@ -0,0 +1,49 @@ +"""create initial tables + +Revision ID: 796b67d23c1c +Revises: +Create Date: 2025-08-23 16:05:51.420713 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
+revision: str = '796b67d23c1c' +down_revision: Union[str, Sequence[str], None] = None +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('users', + sa.Column('user_id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('username', sa.String(length=64), nullable=False), + sa.Column('email', sa.String(length=128), nullable=False), + sa.Column('hashed_password', sa.String(length=128), nullable=False), + sa.Column('is_active', sa.Boolean(), nullable=False), + sa.Column('is_admin', sa.Boolean(), nullable=False), + sa.Column('created_at', sa.TIMESTAMP(timezone=True), server_default=sa.text('now()'), nullable=False), + sa.Column('updated_at', sa.TIMESTAMP(timezone=True), server_default=sa.text('now()'), nullable=False), + sa.Column('last_login_at', sa.TIMESTAMP(timezone=True), nullable=True), + sa.PrimaryKeyConstraint('user_id') + ) + op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True) + op.create_index(op.f('ix_users_user_id'), 'users', ['user_id'], unique=False) + op.create_index(op.f('ix_users_username'), 'users', ['username'], unique=True) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_users_username'), table_name='users') + op.drop_index(op.f('ix_users_user_id'), table_name='users') + op.drop_index(op.f('ix_users_email'), table_name='users') + op.drop_table('users') + # ### end Alembic commands ### diff --git a/ocdp/__init__.py b/ocdp/__init__.py new file mode 100644 index 0000000..9240e29 --- /dev/null +++ b/ocdp/__init__.py @@ -0,0 +1,5 @@ +# env +from dotenv import load_dotenv +load_dotenv() + +from . 
import logger diff --git a/ocdp/__main__.py b/ocdp/__main__.py new file mode 100644 index 0000000..18b493c --- /dev/null +++ b/ocdp/__main__.py @@ -0,0 +1,30 @@ +from fastapi import FastAPI + +from ocdp.controllers.v1 import router as api_v1_router + +app = FastAPI(title="One Click Deployment API", ) + +app.include_router(api_v1_router, tags=["v1"]) +# app.include_router(api_v2_router, tags=["v2"]) + +# 允许的来源(可以改成你前端的地址,比如 http://localhost:3000) +from fastapi.middleware.cors import CORSMiddleware + +origins = [ + "*", # 允许所有来源,生产环境建议改成具体域名 + # "http://localhost:3000", + # "https://yourdomain.com", +] + +# 添加中间件 +app.add_middleware( + CORSMiddleware, + allow_origins=origins, # 允许访问的源 + allow_credentials=True, # 允许携带 Cookie + allow_methods=["*"], # 允许的方法,如 GET、POST 等 + allow_headers=["*"], # 允许的请求头 +) + + +import uvicorn +uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/ocdp/config.py b/ocdp/config.py new file mode 100644 index 0000000..d6a14a4 --- /dev/null +++ b/ocdp/config.py @@ -0,0 +1,137 @@ +import os +import yaml +from dataclasses import dataclass + +# ---------------------------- +# 1️⃣ Define configuration data structures +# ---------------------------- +@dataclass +class HashConfig: + algorithm: str # Password hashing algorithm + +@dataclass +class PasswordConfig: + hash: HashConfig + +@dataclass +class KubeConfig: + kubectl_file: str # Path to kubectl config file + applications_dir: str # Directory to store Kubernetes applications + logs_dir: str # Directory for logs + +@dataclass +class OrchestrationConfig: + kube: KubeConfig + +@dataclass +class LokiConfig: + url: str # Loki push endpoint + labels: str # Default labels for logs + label_keys: str # Optional: comma-separated label keys + +@dataclass +class LoggerConfig: + loki: LokiConfig + +@dataclass +class MySQLConfig: + host: str # MySQL host + port: int # MySQL port + db_name: str # Database db_name + username: str # Database username + password: str # Password read from environment variable + 
+@dataclass +class DatabaseConfig: + mysql: MySQLConfig + +@dataclass +class JWTConfig: + signing_algorithm: str # JWT signing algorithm + secret_key: str # Secret key read from environment variable + +@dataclass +class TokenConfig: + jwt: JWTConfig + +@dataclass +class AdminConfig: + username: str + password: str + +@dataclass +class Config: + orchestration: OrchestrationConfig + logger: LoggerConfig + database: DatabaseConfig + password: PasswordConfig + token: TokenConfig + admin: AdminConfig + +# ---------------------------- +# 2️⃣ Load YAML configuration and environment variables +# ---------------------------- +def load_config(yaml_path: str) -> Config: + with open(yaml_path, "r") as f: + raw = yaml.safe_load(f) + + # orchestration.kube + kube_cfg = KubeConfig(**raw["orchestration"]["kube"]) + orchestration_cfg = OrchestrationConfig(kube=kube_cfg) + + # logger.loki + loki_cfg = LokiConfig(**raw["logger"]["loki"]) + logger_cfg = LoggerConfig(loki=loki_cfg) + + # database.mysql + mysql_raw = raw["database"]["mysql"] + mysql_password = os.environ.get("DATABASE_MYSQL_PASSWORD") + if not mysql_password: + raise ValueError("Environment variable DATABASE_MYSQL_PASSWORD not set") + mysql_cfg = MySQLConfig(**mysql_raw, password=mysql_password) + database_cfg = DatabaseConfig(mysql=mysql_cfg) + + # password.hash + hash_cfg = HashConfig(**raw["password"]["hash"]) + password_cfg = PasswordConfig(hash=hash_cfg) + + # token.jwt + jwt_raw = raw["token"]["jwt"] + jwt_secret_key = os.environ.get("TOKEN_JWT_SECRET_KEY") + if not jwt_secret_key: + raise ValueError("Environment variable TOKEN_JWT_SECRET_KEY not set") + jwt_cfg = JWTConfig(**jwt_raw, secret_key=jwt_secret_key) + token_cfg = TokenConfig(jwt=jwt_cfg) + + # admin + admin_cfg = AdminConfig( + username=os.environ.get("ADMIN_USERNAME"), + password=os.environ.get("ADMIN_PASSWORD") + ) + + # Return final Config object + return Config( + orchestration=orchestration_cfg, + logger=logger_cfg, + database=database_cfg, + 
password=password_cfg, + token=token_cfg, + admin=admin_cfg + ) + +# ---------------------------- +# 3️⃣ Usage example +# ---------------------------- + +from dotenv import load_dotenv +load_dotenv() # Load environment variables from .env file +CONFIG = load_config(os.path.expanduser(os.environ.get("OCDP_CONFIG_FILE"))) +if CONFIG is None: + raise ValueError("Failed to load configuration from YAML file") + +print("Kube config path:", CONFIG.orchestration.kube.kubectl_file) +print("Password hash algorithm:", CONFIG.password.hash.algorithm) +print("MySQL password:", "****" if CONFIG.database.mysql.password else "(unset)") +print("JWT secret key:", "****" if CONFIG.token.jwt.secret_key else "(unset)") +print("Loki URL:", CONFIG.logger.loki.url) +print("Admin username:", CONFIG.admin.username) \ No newline at end of file diff --git a/ocdp/controllers/__init__.py b/ocdp/controllers/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/ocdp/controllers/__init__.py @@ -0,0 +1 @@ + diff --git a/ocdp/controllers/v1/__init__.py b/ocdp/controllers/v1/__init__.py new file mode 100644 index 0000000..f9c0f5d --- /dev/null +++ b/ocdp/controllers/v1/__init__.py @@ -0,0 +1,20 @@ + +from fastapi import APIRouter + +from .user import router as user_router +from .auth import router as auth_router +from .orchestration import router as orchestration_router + +router = APIRouter(prefix="/api/v1") + +router.include_router(user_router) +router.include_router(auth_router) +router.include_router(orchestration_router) + + + + + + + + diff --git a/ocdp/controllers/v1/auth/__init__.py b/ocdp/controllers/v1/auth/__init__.py new file mode 100644 index 0000000..998d041 --- /dev/null +++ b/ocdp/controllers/v1/auth/__init__.py @@ -0,0 +1,10 @@ + +from fastapi import APIRouter + +from . 
import ( + login +) + +router = APIRouter(prefix="/auth") + +router.include_router(login.router) \ No newline at end of file diff --git a/ocdp/controllers/v1/auth/login.py b/ocdp/controllers/v1/auth/login.py new file mode 100644 index 0000000..6a9f57d --- /dev/null +++ b/ocdp/controllers/v1/auth/login.py @@ -0,0 +1,60 @@ + + +from fastapi import APIRouter, Depends, HTTPException, status +from pydantic import BaseModel +from sqlalchemy.orm import Session + +from ocdp.database import get_db +from ocdp.services.user import user_service + + +# 创建一个 API 路由器 +router = APIRouter() + + +# 登录请求的数据模型 +class LoginRequest(BaseModel): + """ + 用户登录的请求体 (Request Body) + """ + username: str + password: str + +# 登录成功后返回的数据模型 +class LoginResponse(BaseModel): + """ + 成功登录后返回的响应体 + """ + access_token: str + token_type: str = "bearer" + +# 假设你已经创建了路由器实例 +# router = APIRouter(prefix="/users", tags=["Users"]) + +# 添加登录路由 +@router.post("/login", response_model=LoginResponse) +def login( + login_in: LoginRequest, + db: Session = Depends(get_db) +): + """ + 通过用户名和密码获取访问令牌 (Access Token) + """ + # 调用服务层函数来处理登录逻辑 + token = user_service.login_for_access_token( + username=login_in.username, + password=login_in.password, + db=db + ) + + # 检查服务层返回的 token 是否为空,如果为空,表示认证失败 + if not token: + # 抛出 401 Unauthorized 异常 + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + + # 认证成功,返回包含 token 的响应 + return {"access_token": token} \ No newline at end of file diff --git a/ocdp/controllers/v1/orchestration/__init__.py b/ocdp/controllers/v1/orchestration/__init__.py new file mode 100644 index 0000000..44fa03f --- /dev/null +++ b/ocdp/controllers/v1/orchestration/__init__.py @@ -0,0 +1,11 @@ + +from fastapi import APIRouter + +from .cluster import router as cluster_router +from .application_controller import router as application_router + +router = APIRouter(prefix="/orchestration") + 
+router.include_router(cluster_router) +router.include_router(application_router) + diff --git a/ocdp/controllers/v1/orchestration/application/__init__.py b/ocdp/controllers/v1/orchestration/application/__init__.py new file mode 100644 index 0000000..d7a769f --- /dev/null +++ b/ocdp/controllers/v1/orchestration/application/__init__.py @@ -0,0 +1,7 @@ + +from fastapi import APIRouter + + + +router = APIRouter(prefix="/application") + diff --git a/ocdp/controllers/v1/orchestration/application/get_application_instance_status.py b/ocdp/controllers/v1/orchestration/application/get_application_instance_status.py new file mode 100644 index 0000000..e69de29 diff --git a/ocdp/controllers/v1/orchestration/application/install_application_instance.py b/ocdp/controllers/v1/orchestration/application/install_application_instance.py new file mode 100644 index 0000000..e69de29 diff --git a/ocdp/controllers/v1/orchestration/application/list_application_instances.py b/ocdp/controllers/v1/orchestration/application/list_application_instances.py new file mode 100644 index 0000000..e69de29 diff --git a/ocdp/controllers/v1/orchestration/application/list_application_templates.py b/ocdp/controllers/v1/orchestration/application/list_application_templates.py new file mode 100644 index 0000000..307665f --- /dev/null +++ b/ocdp/controllers/v1/orchestration/application/list_application_templates.py @@ -0,0 +1,30 @@ +# list_application_template.py + +""" +Controller for Application Templates. 
+""" +from pydantic import BaseModel, Field +from fastapi import APIRouter, Depends + + + +from cluster_tool import Cluster, get_cluster +from services import application_service +from models.application import ApplicationMetadata + + + + + +# --- FastAPI Router --- + +router = APIRouter( + prefix="/application-templates", +) + + + +@router.get("/", response_model=list[ApplicationTemplate], summary="获取所有可安装的应用模板") +def list_application_templates(cluster: Cluster = Depends(get_cluster)): + """列出在 `applications_dir` 中所有可供安装的应用及其元数据。""" + return application_service.list_available_applications(cluster) \ No newline at end of file diff --git a/ocdp/controllers/v1/orchestration/application/uninstall_application_instance.py b/ocdp/controllers/v1/orchestration/application/uninstall_application_instance.py new file mode 100644 index 0000000..e69de29 diff --git a/ocdp/controllers/v1/orchestration/application_controller.py b/ocdp/controllers/v1/orchestration/application_controller.py new file mode 100644 index 0000000..bb14d34 --- /dev/null +++ b/ocdp/controllers/v1/orchestration/application_controller.py @@ -0,0 +1,87 @@ +# ocdp/controllers/application_instances.py +""" +Controller for Application Instances. 
# ocdp/controllers/application_instances.py
"""
Controller for Application Instances.

(Authentication uses the OAuth2 Password Bearer flow.)
"""
from typing import Any, Dict, Optional

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session  # retained from original; not used directly here
from pydantic import BaseModel

from ocdp.orchestration.cluster import Cluster, get_cluster
from ocdp.services.orchestration import application_service
from ocdp.models.orchestration.application import (
    InstalledApplicationInstance, InstallReceipt,
    UninstallReceipt, NamespaceDeleteReceipt, ApplicationStatus, ApplicationTemplate
)
from ocdp.database import get_db  # retained from original; not used directly here
from ocdp.services.user import user_service


class InstallRequest(BaseModel):
    """Request body for installing a new application instance."""
    app_template_name: str
    mode: str
    user_overrides: Optional[Dict[str, Any]] = None


# --- FastAPI Router ---
router = APIRouter()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")


def get_current_user_id_dependency(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token to a user id via the user service.

    Fix: previously an unresolvable token produced a falsy/None user_id that
    was silently passed on to the service layer; now a 401 is raised, matching
    the behavior of the `/users/me` controller.
    """
    user_id = user_service.get_user_id_by_token(token)
    if not user_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user_id


@router.get("/application-templates", response_model=list[ApplicationTemplate], summary="列出所有可用的应用模板")
def list_application_templates(
    cluster: Cluster = Depends(get_cluster)
):
    """Return every application template available in the system."""
    return application_service.list_available_applications(cluster)


@router.get("/application-instances", response_model=list[InstalledApplicationInstance], summary="列出当前用户已安装的应用实例")
def list_application_instances(
    user_id: str = Depends(get_current_user_id_dependency),
    cluster: Cluster = Depends(get_cluster)
):
    """List the application instances installed by the current user."""
    return application_service.list_user_applications(cluster, user_id)


@router.post("/application-instances", response_model=InstallReceipt, status_code=status.HTTP_202_ACCEPTED, summary="安装一个新的应用实例")
def install_application_instance(
    request: InstallRequest,
    user_id: str = Depends(get_current_user_id_dependency),
    cluster: Cluster = Depends(get_cluster)
):
    """Trigger installation of a new application instance for the current user."""
    try:
        return application_service.install_new_application(
            cluster=cluster, user_id=user_id, app_template_name=request.app_template_name,
            mode=request.mode, user_overrides=request.user_overrides
        )
    except (ValueError, FileNotFoundError) as e:
        # Unknown template / mode, or missing chart files.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"An unexpected error occurred: {e}")


# The status / uninstall / delete endpoints do not depend on user_id directly.
@router.get("/application-instances/{namespace}/{app_template_name}/status", response_model=ApplicationStatus, summary="获取指定应用实例的状态")
def get_application_instance_status(namespace: str, app_template_name: str, mode: str, cluster: Cluster = Depends(get_cluster)):
    """Return the readiness/status report for one application instance."""
    return application_service.get_instance_status(cluster, namespace, app_template_name, mode)


@router.delete("/application-instances/{namespace}/{app_template_name}", response_model=UninstallReceipt, summary="步骤1:卸载应用实例的 Release")
def uninstall_instance_release(namespace: str, app_template_name: str, mode: str, cluster: Cluster = Depends(get_cluster)):
    """Step 1: uninstall the Helm release of an application instance."""
    return application_service.uninstall_application_release(cluster, namespace, app_template_name, mode)


@router.delete("/application-instances/{namespace}", response_model=NamespaceDeleteReceipt, summary="步骤2:删除应用实例的命名空间")
def delete_instance_namespace(namespace: str, cluster: Cluster = Depends(get_cluster)):
    """Step 2: delete the namespace of an application instance."""
    return application_service.delete_application_namespace(cluster, namespace)
+1,8 @@ + +from fastapi import APIRouter + +from .get_cluster_status import router as get_cluster_status_router + +router = APIRouter(prefix="/cluster") + +router.include_router(get_cluster_status_router) diff --git a/ocdp/controllers/v1/orchestration/cluster/get_cluster_status.py b/ocdp/controllers/v1/orchestration/cluster/get_cluster_status.py new file mode 100644 index 0000000..b009974 --- /dev/null +++ b/ocdp/controllers/v1/orchestration/cluster/get_cluster_status.py @@ -0,0 +1,24 @@ +# get_cluster_status.py +from fastapi import APIRouter, Depends + +from ocdp.orchestration.cluster import Cluster, get_cluster +from ocdp.services.orchestration import cluster_service +from ocdp.models.orchestration.cluster import ClusterStatus + +router = APIRouter() + +# 依赖注入函数名和参数名也同步调整,更清晰 +def get_status_from_service(cluster: Cluster = Depends(get_cluster)) -> ClusterStatus: + """辅助函数,在Depends中调用service函数""" + return cluster_service.get_cluster_status(cluster) + +@router.get( + "/cluster-status", + response_model=ClusterStatus, + summary="Get Comprehensive Cluster Status" +) +def get_comprehensive_cluster_status(status: ClusterStatus = Depends(get_status_from_service)): + """ + Provides a complete snapshot of the cluster's health and resources. 
+ """ + return status \ No newline at end of file diff --git a/ocdp/controllers/v1/orchestration/cluster/get_health.py b/ocdp/controllers/v1/orchestration/cluster/get_health.py new file mode 100644 index 0000000..cd8a532 --- /dev/null +++ b/ocdp/controllers/v1/orchestration/cluster/get_health.py @@ -0,0 +1,34 @@ + +from pydantic import BaseModel +from fastapi import APIRouter, Depends, HTTPException +from ocdp.orchestration import Cluster + +# 假设的依赖和 Service 函数导入路径 +from ocdp.orchestration import get_cluster +from ocdp.services.orchestration import node_service + +# --- Response Models for this endpoint --- +class NodeHealthStatus(BaseModel): + is_ready: bool + pressures: dict[str, bool] + +HealthReportResponse = dict[str, NodeHealthStatus] + +# --- Router Definition --- +router = APIRouter() + +@router.get( + "/health", + response_model=HealthReportResponse, + # summary="获取集群节点健康状态" +) +def get_health(cluster: Cluster = Depends(get_cluster)): + """ + 获取集群所有节点的健康状态报告。 + - **is_ready**: 节点是否就绪。 + - **pressures**: 节点的各项压力状态,`true` 表示存在压力。 + """ + try: + return node_service.get_cluster_health_report(cluster) + except RuntimeError as e: + raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file diff --git a/ocdp/controllers/v1/orchestration/cluster/get_summary_resources.py b/ocdp/controllers/v1/orchestration/cluster/get_summary_resources.py new file mode 100644 index 0000000..8afb090 --- /dev/null +++ b/ocdp/controllers/v1/orchestration/cluster/get_summary_resources.py @@ -0,0 +1,39 @@ +from pydantic import BaseModel +from fastapi import APIRouter, Depends, HTTPException + +from ocdp.orchestration import Cluster, get_cluster +from ocdp.services.orchestration import node_service + +# --- Response Models for this endpoint --- +class ResourceDetail(BaseModel): + total: str + used: str # 修正: 'used_by_system' -> 'used' + free: str # 修正: 'free_for_pods' -> 'free' + +class GPUSummary(BaseModel): + total_count: int + allocatable_count: int + 
models_summary: dict[str, int] + +class ClusterSummaryResponse(BaseModel): + cpu: ResourceDetail + memory: ResourceDetail + storage: ResourceDetail + gpu: GPUSummary + +# --- Router Definition --- +router = APIRouter() + +@router.get( + "/summary/resources", + response_model=ClusterSummaryResponse, + # summary="获取集群资源汇总" +) +def get_summary_resources(cluster: Cluster = Depends(get_cluster)): + """ + 获取整个集群的资源聚合汇总报告。 + """ + try: + return node_service.get_cluster_summary_report(cluster) + except RuntimeError as e: + raise HTTPException(status_code=500, detail=str(e)) diff --git a/ocdp/controllers/v1/orchestration/cluster/list_nodes_resources.py b/ocdp/controllers/v1/orchestration/cluster/list_nodes_resources.py new file mode 100644 index 0000000..fcc272d --- /dev/null +++ b/ocdp/controllers/v1/orchestration/cluster/list_nodes_resources.py @@ -0,0 +1,44 @@ +from pydantic import BaseModel +from fastapi import APIRouter, Depends, HTTPException + +from ocdp.orchestration import Cluster, get_cluster +from ocdp.services.orchestration import node_service + +# --- Response Models for this endpoint --- +class ResourceDetail(BaseModel): + total: str + used: str # 修正: 'used_by_system' -> 'used' + free: str # 修正: 'free_for_pods' -> 'free' + +class GPUInfo(BaseModel): + count: int + allocatable_count: int # 新增: 匹配 service 返回的可分配 GPU 数量 + model: str + memory_mb: int + +class NodeResourceDetail(BaseModel): + cpu: ResourceDetail + memory: ResourceDetail + storage: ResourceDetail + gpu: GPUInfo + +NodeResourcesResponse = dict[str, NodeResourceDetail] + +# --- Router Definition --- +router = APIRouter() + +@router.get( + "/nodes/resources", + response_model=NodeResourcesResponse, + # summary="获取各节点资源详情" +) +def list_nodes_resources(cluster: Cluster = Depends(get_cluster)): + """ + 获取集群中每个节点的详细资源使用报告。 + - **used_by_system**: 被系统和 Kubelet 预留的资源。 + - **free_for_pods**: 可供 Pod 调度的资源。 + """ + try: + return node_service.get_per_node_resource_report(cluster) + except RuntimeError as e: + 
raise HTTPException(status_code=500, detail=str(e)) diff --git a/ocdp/controllers/v1/user/__init__.py b/ocdp/controllers/v1/user/__init__.py new file mode 100644 index 0000000..48d1711 --- /dev/null +++ b/ocdp/controllers/v1/user/__init__.py @@ -0,0 +1,15 @@ + +from fastapi import APIRouter + +from . import ( + register_user, + get_current_user +) + +router = APIRouter(prefix="/users") + +router.include_router(register_user.router) +router.include_router(get_current_user.router) + + + diff --git a/ocdp/controllers/v1/user/get_current_user.py b/ocdp/controllers/v1/user/get_current_user.py new file mode 100644 index 0000000..d29670f --- /dev/null +++ b/ocdp/controllers/v1/user/get_current_user.py @@ -0,0 +1,39 @@ + +from fastapi import APIRouter, Depends, HTTPException, status +from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm +from pydantic import BaseModel, constr, EmailStr, validator +from sqlalchemy.orm import Session + +from ocdp.database import get_db +from ocdp.services.user import user_service +from ocdp.services.user import user_exceptions + +router = APIRouter() + +class GetCurrentUserResponse(BaseModel): + """ + 获取当前登录用户的信息 (Response Body) + """ + user_id: int + username: str + email: EmailStr + + class Config: + # Pydantic V2 推荐的用法 + from_attributes = True + # Pydantic V1 的旧用法 + # orm_mode = True + +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login") +@router.get("/me", response_model=GetCurrentUserResponse) +def get_current_user( + db: Session = Depends(get_db), + token: str = Depends(oauth2_scheme) +): + """ + 获取当前登录用户的信息. 
+ """ + current_user = user_service.get_current_user(token, db) + if not current_user: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials") + return current_user \ No newline at end of file diff --git a/ocdp/controllers/v1/user/register_user.py b/ocdp/controllers/v1/user/register_user.py new file mode 100644 index 0000000..0c7d6fb --- /dev/null +++ b/ocdp/controllers/v1/user/register_user.py @@ -0,0 +1,79 @@ + +from fastapi import APIRouter, Depends, HTTPException, status +from pydantic import BaseModel, constr, EmailStr, validator +from sqlalchemy.orm import Session + +from ocdp.database import get_db +from ocdp.services.user import user_service +from ocdp.services.user import user_exceptions + +# 创建一个 API 路由器 +router = APIRouter() + +ALLOWED_PASSWORD_CHARS = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789@#$%&*_-") +class RegisterUserRequest(BaseModel): + """ + 用户注册的请求体 (Request Body) + """ + username: str + password: str + email: EmailStr + + @validator('password') + def validate_password(cls, v): + if len(v) < 8 or len(v) > 32: + raise ValueError('密码长度应在8~32位') + if not any(c.isalpha() for c in v): + raise ValueError('密码必须包含字母') + if not any(c.isdigit() for c in v): + raise ValueError('密码必须包含数字') + if any(c.isspace() for c in v): + raise ValueError('密码不能包含空格') + if any(c not in ALLOWED_PASSWORD_CHARS for c in v): + raise ValueError('密码包含非法字符') + return v + +class RegisterUserResponse(BaseModel): + """ + 成功注册后返回的用户信息 (Response Body) + 不包含密码等敏感数据 + """ + id: int + username: str + email: EmailStr + + class Config: + # Pydantic V2 推荐的用法 + from_attributes = True + # Pydantic V1 的旧用法 + # orm_mode = True + +@router.post("/", response_model=RegisterUserResponse, status_code=status.HTTP_201_CREATED) +def register_user( + user_in: RegisterUserRequest, + db: Session = Depends(get_db) +): + """ + 注册一个新用户. + + - **username**: 用户的唯一名称. + - **email**: 用户的唯一邮箱. + - **password**: 用户的密码. 
+ """ + try: + # 调用 service 层的函数来创建用户 + # user_in.dict() 将 Pydantic 模型转换为字典 + created_user = user_service.create_user( + username=user_in.username, + password=user_in.password, + email=user_in.email, + db=db + ) + return created_user + except user_exceptions.UserAlreadyExistsError as e: + # 捕获 service 层抛出的特定异常 + # 返回 400 错误,并附带清晰的错误信息 + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e), + ) diff --git a/ocdp/controllers/v2/__init__.py b/ocdp/controllers/v2/__init__.py new file mode 100644 index 0000000..77892c6 --- /dev/null +++ b/ocdp/controllers/v2/__init__.py @@ -0,0 +1,15 @@ + +from fastapi import APIRouter + + + +router = APIRouter(prefix="/api/v2") + + + + + + + + + diff --git a/ocdp/daos/__init__.py b/ocdp/daos/__init__.py new file mode 100644 index 0000000..fa112b0 --- /dev/null +++ b/ocdp/daos/__init__.py @@ -0,0 +1,2 @@ + +from .user import user_dao diff --git a/ocdp/daos/orchestration/__init__.py b/ocdp/daos/orchestration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ocdp/daos/orchestration/application_dao.py b/ocdp/daos/orchestration/application_dao.py new file mode 100644 index 0000000..57348d7 --- /dev/null +++ b/ocdp/daos/orchestration/application_dao.py @@ -0,0 +1,304 @@ +# dao.py +import os +import yaml +import json +import time +from pydantic import ValidationError + +from ocdp.orchestration.cluster import Cluster +from ocdp.models.orchestration.application import (ApplicationTemplate, ApplicationMetadata, InstallReceipt, + ApplicationStatus, UninstallReceipt, PodStatusDetail, + InstalledApplicationInstance, NamespaceDeleteReceipt) # <-- 导入更新后的模型 + +# ... (list_application_templates, list_application_instances, _deep_merge 保持不变) ... 
+def list_application_templates(cluster: Cluster) -> list[ApplicationTemplate]: + app_dirs = cluster.list_applications(); template_list = [] + for app_dir in app_dirs: + try: + metadata_dict = cluster.get_application_metadata(app_dir) + validated_metadata = ApplicationMetadata(**metadata_dict) + template_list.append(ApplicationTemplate(name=app_dir,metadata=validated_metadata)) + except Exception as e: print(f"⚠️ Warning: Could not load or validate metadata for '{app_dir}': {e}") + return template_list + +def list_application_instances(cluster: Cluster, user_id: str) -> list[InstalledApplicationInstance]: + prefix = f"{user_id}-"; + try: + ns_json_str = cluster.get("namespaces"); all_ns_data = json.loads(ns_json_str).get("items", []) + user_namespaces = [ns['metadata']['name'] for ns in all_ns_data if ns['metadata']['name'].startswith(prefix)] + if not user_namespaces: return [] + releases_json_str = cluster.list_releases(all_namespaces=True, output="json"); all_releases = json.loads(releases_json_str) + instances = [] + for rel in all_releases: + if rel.get("namespace") in user_namespaces: + ns_parts = rel.get("namespace").split('-'); app_name = ns_parts[1] if len(ns_parts) > 2 else "unknown" + instances.append(InstalledApplicationInstance( + application_name=app_name, release_name=rel.get("name"), namespace=rel.get("namespace"), chart=rel.get("chart"), status=rel.get("status") + )) + return instances + except (RuntimeError, json.JSONDecodeError) as e: + print(f"❌ Error listing application instances: {e}"); return [] + +def _deep_merge(source: dict, destination: dict) -> dict: + for key, value in source.items(): + if isinstance(value, dict) and key in destination and isinstance(destination[key], dict): + destination[key] = _deep_merge(value, destination[key]) + else: + destination[key] = value + return destination + +def install_application( + cluster, + namespace, + app_template_name, + mode, + user_overrides=None +) -> InstallReceipt: + metadata = 
cluster.get_application_metadata(app_template_name) + print(f"Metadata for '{app_template_name}': {metadata}") + app_meta = ApplicationMetadata(**metadata) + + deployment_mode = getattr(app_meta, mode, None) + if not deployment_mode: + raise ValueError(f"Mode '{mode}' not found.") + + release_name = deployment_mode.release_name + chart_source = deployment_mode.chart + values_to_set = deployment_mode.sets + + if user_overrides: + values_to_set = _deep_merge(user_overrides, values_to_set) + + temp_values_path = f"/tmp/temp-values-{namespace}.yaml" + with open(temp_values_path, 'w') as f: + yaml.dump(values_to_set, f) + + try: + output = cluster.install_release( + release_name=release_name, + chart_source=chart_source, + namespace=namespace, + config_file=temp_values_path, + create_namespace=True + ) + print(output) + return InstallReceipt( + application_name=app_meta.application_name, + release_name=release_name, + namespace=namespace, + message=f"Installation triggered successfully. Raw output: {output.strip()}" + ) + finally: + if os.path.exists(temp_values_path): + os.remove(temp_values_path) + +def uninstall_application_release(cluster: Cluster, namespace: str, app_name: str, mode: str) -> UninstallReceipt: + try: + # 1. 获取并验证元数据 + metadata = cluster.get_application_metadata(app_name) + app_meta = ApplicationMetadata(**metadata) + deployment_mode = getattr(app_meta, mode, None) + if not deployment_mode: + raise ValueError(f"Mode '{mode}' not found in metadata.") + + release_name = deployment_mode.release_name + + # 2. 卸载 Helm Release + output = cluster.uninstall_release(release_name, namespace=namespace, wait=True) + uninstalled_successfully = True + + # 3. 验证卸载是否成功 + verification_message = "Verification successful: Release is no longer listed by Helm." 
+ is_clean = True + try: + time.sleep(2) + releases_json_str = cluster.list_releases(namespace=namespace, output="json") + releases = json.loads(releases_json_str) + release_found = any(r['name'] == release_name for r in releases) + if release_found: + is_clean = False + verification_message = "Verification failed: Release is still present in Helm's list." + except Exception as e: + verification_message = f"Verification check failed: {e}" + + except (ValidationError, ValueError, RuntimeError) as e: + # 捕获所有已知的预处理和运行时错误 + return UninstallReceipt( + application_name=app_name, + release_name=release_name if 'release_name' in locals() else 'unknown', + namespace=namespace, + uninstalled_successfully=False, + is_clean=False, + message=f"Operation failed due to an error: {e}" + ) + except Exception as e: + # 捕获所有其他意外错误 + return UninstallReceipt( + application_name=app_name, + release_name=release_name if 'release_name' in locals() else 'unknown', + namespace=namespace, + uninstalled_successfully=False, + is_clean=False, + message=f"An unexpected error occurred: {e}" + ) + + return UninstallReceipt( + application_name=app_name, + release_name=release_name, + namespace=namespace, + uninstalled_successfully=uninstalled_successfully, + is_clean=is_clean, + message=f"{output.strip()}. {verification_message}" + ) + +def delete_namespace(cluster: Cluster, namespace: str) -> NamespaceDeleteReceipt: + app_name = "unknown" + try: + # 尝试从命名空间中提取应用名称 + ns_parts = namespace.split('-') + if len(ns_parts) > 2: + app_name = ns_parts[1] + except Exception: + pass # 如果解析失败,app_name 保持为 'unknown' + + try: + # 1. 提交删除命名空间的命令 + output = cluster.delete(resource_type="namespace", name=namespace) + deleted_successfully = True + + # 2. 验证命名空间是否已被删除 + is_clean = False + verification_message = "Delete command submitted. Namespace is terminating." 
+ try: + # 循环检查命名空间直到它不存在 + timeout = 60 + start_time = time.time() + while time.time() - start_time < timeout: + try: + cluster.get("namespace", name=namespace) + time.sleep(5) + except RuntimeError as e: + if "not found" in str(e).lower(): + is_clean = True + verification_message = "Verification successful: Namespace not found." + break + else: + raise e # 重新抛出其他运行时错误 + if not is_clean: + verification_message = "Verification failed: Namespace still exists after timeout." + + except Exception as e: + verification_message = f"Verification check failed: {e}" + + except RuntimeError as e: + # 如果 delete 命令本身失败 + return NamespaceDeleteReceipt( + application_name=app_name, + namespace=namespace, + deleted_successfully=False, + is_clean=False, + message=f"Delete namespace command failed: {e}" + ) + + return NamespaceDeleteReceipt( + application_name=app_name, + namespace=namespace, + deleted_successfully=deleted_successfully, + is_clean=is_clean, + message=f"{output.strip()}. {verification_message}" + ) + +def get_application_status( + cluster, + namespace: str, + app_template_name: str, + mode: str +): + app_name = "Unknown" + base_access_url = None + paths = None + + try: + metadata_dict = cluster.get_application_metadata(app_template_name) + app_meta = ApplicationMetadata(**metadata_dict) + + deployment_mode = getattr(app_meta, mode, None) + if not deployment_mode: + raise ValueError(f"Mode '{mode}' not found.") + + app_name = app_meta.application_name + + if not deployment_mode.pod or not deployment_mode.pod.name: + raise ValueError("Pod name pattern is not defined.") + + pod_name_pattern = deployment_mode.pod.name + + if deployment_mode.svc: + base_access_url = deployment_mode.svc.url + paths = deployment_mode.svc.paths + + pods_json_str = cluster.get("pods", namespace=namespace) + all_pods = json.loads(pods_json_str).get("items", []) + + target_pods = [p for p in all_pods if p.get('metadata', {}).get('name', '').startswith(pod_name_pattern)] + + if not target_pods: 
+ return ApplicationStatus( + application_name=app_name, + namespace=namespace, + is_ready=False, + base_access_url=base_access_url, + paths=paths, + details=[] + ) + + all_ready = True + pod_details = [] + for pod in target_pods: + pod_name = pod['metadata']['name'] + container_statuses = pod.get('status', {}).get('containerStatuses', []) + pod_phase = pod.get('status', {}).get('phase', '') + + ready_count = sum(1 for s in container_statuses if s.get('ready')) + total_count = len(container_statuses) + + pod_is_ready = (pod_phase == 'Running') and (ready_count == total_count) + if not pod_is_ready: + all_ready = False + + pod_details.append( + PodStatusDetail( + pod_name=pod_name, + is_ready=pod_is_ready, + ready_status=f"{ready_count}/{total_count}", + status_phase=pod_phase + ) + ) + + return ApplicationStatus( + application_name=app_name, + namespace=namespace, + is_ready=all_ready, + base_access_url=base_access_url, + paths=paths, + details=pod_details + ) + + except (ValidationError, json.JSONDecodeError, KeyError, ValueError, AttributeError) as e: + return ApplicationStatus( + application_name=app_name, + namespace=namespace, + is_ready=False, + base_access_url=base_access_url, + paths=paths, + details=[PodStatusDetail(pod_name="Error", is_ready=False, ready_status="0/0", status_phase=f"Error: {e}")] + ) + except Exception as e: + return ApplicationStatus( + application_name=app_name, + namespace=namespace, + is_ready=False, + base_access_url=base_access_url, + paths=paths, + details=[PodStatusDetail(pod_name="Unexpected Error", is_ready=False, ready_status="0/0", status_phase=f"Error: {e}")] + ) \ No newline at end of file diff --git a/ocdp/daos/orchestration/cluster_dao.py b/ocdp/daos/orchestration/cluster_dao.py new file mode 100644 index 0000000..2ed1af4 --- /dev/null +++ b/ocdp/daos/orchestration/cluster_dao.py @@ -0,0 +1,241 @@ +# cluster_dao.py +""" +Data Access Object (DAO) 层 - 函数式实现。 + +本模块负责执行 kubectl 命令,并解析其输出,将其转换为结构化的 Pydantic 模型。 
+所有与数据获取、解析、转换和计算相关的逻辑都集中在此。 +""" +import re + +from ocdp.orchestration.cluster import Cluster +from ocdp.models.orchestration.cluster import ( + ClusterStatus, ClusterSummary, ClusterHealthSummary, ClusterResourceSummary, + NodeInfo, NodeHealth, NodeCondition, ResourceUsage, GPUType, GPUUsage, GPUInfo, + PodsUsage, PodDetail, TotalResourceUsage, PodsTotalUsage, MaxFreeNodeInfo, + MaxFreeGPUNodeInfo, DistributedGPUAvailability +) + +# ... (辅助函数 _parse_size_to_kib, _parse_cpu 等保持不变) ... +def _parse_size_to_kib(size_str: str | None) -> int: + if not size_str or not size_str[0].isdigit(): return 0 + size_str = size_str.lower() + val_match = re.search(r'(\d+)', size_str) + if not val_match: return 0 + val = int(val_match.group(1)) + if 'gi' in size_str: return val * 1024 * 1024 + if 'mi' in size_str: return val * 1024 + if 'ki' in size_str: return val + return val // 1024 +def _format_size_from_kib(kib: int) -> str: + if kib < 0: kib = 0 + if kib >= 1024 * 1024: return f"{round(kib / (1024 * 1024), 2)}Gi" + if kib >= 1024: return f"{round(kib / 1024, 2)}Mi" + return f"{kib}Ki" +def _parse_cpu(cpu_str: str | None) -> int: + if not cpu_str or not cpu_str[0].isdigit(): return 0 + if 'm' in cpu_str: return int(cpu_str.replace('m', '')) + if '.' 
in cpu_str: return int(float(cpu_str) * 1000) + if cpu_str.isdigit(): return int(cpu_str) * 1000 + return 0 +def _format_cpu(millicores: int) -> str: + if millicores < 0: millicores = 0 + if millicores < 1000: return f"{millicores}m" + return str(round(millicores / 1000, 3)) +def _find_value(pattern: str, text: str, flags: int = 0) -> str | None: + match = re.search(pattern, text, flags) + return match.group(1).strip() if match else None +def _parse_pods_table(full_node_text: str) -> list[PodDetail]: + pods_block_match = re.search(r"Non-terminated Pods:(.*?)(?=\nAllocated resources:|\nEvents:)", full_node_text, re.DOTALL) + if not pods_block_match: return [] + pods_text = pods_block_match.group(1).strip() + lines = pods_text.split('\n') + header_index = -1 + for i, line in enumerate(lines): + if "Namespace" in line and "Name" in line and "CPU Requests" in line: + header_index = i + break + if header_index == -1 or len(lines) <= header_index + 1: return [] + header = lines[header_index] + col_starts = {"ns": header.find("Namespace"), "name": header.find("Name"), "cpu_req": header.find("CPU Requests"), "cpu_lim": header.find("CPU Limits"), "mem_req": header.find("Memory Requests"), "mem_lim": header.find("Memory Limits"), "age": header.find("Age"),} + pod_list = [] + for line in lines[header_index + 1:]: + if not line.strip() or "---" in line: continue + parts = re.split(r'\s{2,}', line.strip()) + if len(parts) != 7: continue + pod_list.append(PodDetail( + namespace=parts[0], name=parts[1], cpu_requests=parts[2], cpu_limits=parts[3], + memory_requests=parts[4], memory_limits=parts[5], age=parts[6] + )) + return pod_list +def _parse_key_value_block(text: str) -> dict[str, str]: + data = {} + for line in text.strip().split('\n'): + parts = line.split(':', 1) + if len(parts) == 2: + key = parts[0].strip() + value = parts[1].strip() + data[key] = value + return data + +def _parse_single_node(text: str) -> NodeInfo | None: + name = _find_value(r"Name:\s*(\S+)", text) + if 
def _parse_single_node(text: str) -> NodeInfo | None:
    """Build a NodeInfo from one node's `kubectl describe` section.

    Returns None when no "Name:" line is present (i.e. not a node section).
    All `free` fields are initialised to zero placeholders and are filled in
    later by get_cluster_status.
    """
    name = _find_value(r"Name:\s*(\S+)", text)
    if not name: return None

    roles_str = _find_value(r"Roles:\s*([^\n]*)", text) or ""
    # Empty roles fall back to a plain 'worker' role.
    roles = [r.strip() for r in roles_str.split(',')] if roles_str != '' else ['worker']
    labels_text = _find_value(r"Labels:(.*?)(?=\nAnnotations:)", text, re.DOTALL) or ""
    labels = {}
    if labels_text:
        for line in labels_text.strip().split('\n'):
            key_value = line.strip().split('=', 1)
            if len(key_value) == 2:
                labels[key_value[0]] = key_value[1]

    conditions_text = _find_value(r"Conditions:(.*?)(?=\nAddresses:)", text, re.DOTALL) or ""
    conditions = []
    # [2:] skips the column-header row and the "----" separator row.
    for line in conditions_text.strip().split('\n')[2:]:
        parts = re.split(r'\s{2,}', line.strip())
        if len(parts) >= 6:
            conditions.append(NodeCondition(type=parts[0], status=parts[1], last_heartbeat_time=parts[2], last_transition_time=parts[3], reason=parts[4], message=parts[5]))
    health = NodeHealth(conditions=conditions)

    allocatable_text = _find_value(r"Allocatable:(.*?)(?=\nSystem Info:)", text, re.DOTALL) or ""
    allocatable = _parse_key_value_block(allocatable_text)

    allocated_block = _find_value(r"Allocated resources:(.*?)(?=\nEvents:)", text, re.DOTALL) or ""
    allocated_requests = {}
    for line in allocated_block.strip().split('\n'):
        line = line.strip()
        # Skip blanks, the "(Total limits may be over 100 percent...)" note,
        # the header row and the "----" separator.
        if not line or line.startswith('(') or line.startswith('Resource') or line.startswith('---'):
            continue
        match = re.match(r'^(\S+)\s+(\S+)', line)
        if match:
            resource, request_val = match.groups()
            allocated_requests[resource] = request_val

    pods = PodsUsage(
        total=int(allocatable.get("pods", "0")),
        used=int(_find_value(r"Non-terminated Pods:\s*\((\d+) in total\)", text) or "0"),
        free=0
    )
    cpu = ResourceUsage(total=allocatable.get("cpu", "0"), used=allocated_requests.get("cpu", "0m"), free="0")
    memory = ResourceUsage(total=allocatable.get("memory", "0Ki"), used=allocated_requests.get("memory", "0Ki"), free="0")

    # Optional per-node resources: only populated when advertised as allocatable.
    ephemeral_storage, hugepages_1Gi, hugepages_2Mi, rdma_shared_device_a = None, None, None, None
    if "ephemeral-storage" in allocatable:
        ephemeral_storage = ResourceUsage(
            total=allocatable["ephemeral-storage"],
            used=allocated_requests.get("ephemeral-storage", "0Ki"),
            free="0"
        )
    if "hugepages-1Gi" in allocatable:
        hugepages_1Gi = ResourceUsage(
            total=allocatable["hugepages-1Gi"],
            used=allocated_requests.get("hugepages-1Gi", "0"),
            free="0"
        )
    if "hugepages-2Mi" in allocatable:
        hugepages_2Mi = ResourceUsage(
            total=allocatable["hugepages-2Mi"],
            used=allocated_requests.get("hugepages-2Mi", "0"),
            free="0"
        )
    if "rdma/rdma_shared_device_a" in allocatable:
        rdma_shared_device_a = ResourceUsage(
            total=allocatable["rdma/rdma_shared_device_a"],
            used=allocated_requests.get("rdma/rdma_shared_device_a", "0"),
            free="0"
        )
    # (a leftover debug print of the four optional resources was removed here)
    running_pods = _parse_pods_table(text)

    gpu_info = None
    gpu_total = int(allocatable.get("nvidia.com/gpu", "0"))
    if gpu_total > 0:
        gpu_usage = GPUUsage(total=gpu_total, used=int(allocated_requests.get("nvidia.com/gpu", "0")), free=0)
        gpu_type = GPUType(product=labels.get("nvidia.com/gpu.product", "Unknown"), memory_mb=int(labels.get("nvidia.com/gpu.memory", "0")))
        gpu_info = GPUInfo(usage=gpu_usage, types=[gpu_type])

    return NodeInfo(
        name=name, roles=roles, labels=labels, health=health, cpu=cpu, memory=memory, pods=pods,
        ephemeral_storage=ephemeral_storage, hugepages_1Gi=hugepages_1Gi,
        hugepages_2Mi=hugepages_2Mi, rdma_shared_device_a=rdma_shared_device_a,
        gpu_info=gpu_info, running_pods=running_pods
    )

def get_cluster_status(cluster: Cluster) -> ClusterStatus:
    """DAO entry point: describe all nodes, parse each section, aggregate totals."""
    raw_output = cluster.describe("nodes")
    # Each node section of `kubectl describe nodes` starts with a "Name:" line.
    node_texts = re.split(r'\n(?=Name:\s+)', raw_output.strip())

    nodes = [_parse_single_node(text) for text in node_texts if text.strip()]
    nodes = [node for node in nodes if node is not None]

    # Compute the `free` value of every ResourceUsage object.
    for node in nodes:
        node.cpu.free = _format_cpu(_parse_cpu(node.cpu.total) - _parse_cpu(node.cpu.used))
        _format_cpu(_parse_cpu(node.cpu.total) - _parse_cpu(node.cpu.used))
        node.memory.free = _format_size_from_kib(_parse_size_to_kib(node.memory.total) - _parse_size_to_kib(node.memory.used))
        node.pods.free = node.pods.total - node.pods.used
        if node.gpu_info:
            node.gpu_info.usage.free = node.gpu_info.usage.total - node.gpu_info.usage.used
        if node.ephemeral_storage:
            node.ephemeral_storage.free = _format_size_from_kib(_parse_size_to_kib(node.ephemeral_storage.total) - _parse_size_to_kib(node.ephemeral_storage.used))

        # hugepages and rdma counts are plain integers, so subtract directly
        if node.hugepages_1Gi:
            node.hugepages_1Gi.free = str(int(node.hugepages_1Gi.total) - int(node.hugepages_1Gi.used))
        if node.hugepages_2Mi:
            node.hugepages_2Mi.free = str(int(node.hugepages_2Mi.total) - int(node.hugepages_2Mi.used))
        if node.rdma_shared_device_a:
            node.rdma_shared_device_a.free = str(int(node.rdma_shared_device_a.total) - int(node.rdma_shared_device_a.used))

    # (aggregation logic unchanged)
    health_summary = ClusterHealthSummary(total_nodes=len(nodes), ready_nodes=sum(1 for n in nodes if n.health.overall_status == "Ready"), unhealthy_nodes=sum(1 for n in nodes if n.health.overall_status != "Ready"))
    total_cpu_m = sum(_parse_cpu(n.cpu.total) for n in nodes)
    used_cpu_m = sum(_parse_cpu(n.cpu.used) for n in nodes)
    total_mem_kib = sum(_parse_size_to_kib(n.memory.total) for n in nodes)
    used_mem_kib = sum(_parse_size_to_kib(n.memory.used) for n in nodes)
    # NOTE(review): nodes without ephemeral storage pass None to _parse_size_to_kib —
    # presumably it returns 0 for None; confirm in its definition (above this chunk).
    total_storage_kib = sum(_parse_size_to_kib(n.ephemeral_storage.total if n.ephemeral_storage else None) for n in nodes)
    used_storage_kib = sum(_parse_size_to_kib(n.ephemeral_storage.used if n.ephemeral_storage else None) for n in nodes)
    cluster_total_cpu = TotalResourceUsage(total=_format_cpu(total_cpu_m), used=_format_cpu(used_cpu_m), free=_format_cpu(total_cpu_m - used_cpu_m))
    cluster_total_memory = TotalResourceUsage(total=_format_size_from_kib(total_mem_kib), used=_format_size_from_kib(used_mem_kib), free=_format_size_from_kib(total_mem_kib - used_mem_kib))
    # Storage summary is omitted entirely when no node reports ephemeral storage.
    cluster_total_storage = TotalResourceUsage(total=_format_size_from_kib(total_storage_kib), used=_format_size_from_kib(used_storage_kib), free=_format_size_from_kib(total_storage_kib - used_storage_kib)) if total_storage_kib > 0 else None
    total_pods = sum(n.pods.total for n in nodes)
    used_pods = sum(n.pods.used for n in nodes)
    cluster_total_pods = PodsTotalUsage(total=total_pods, used=used_pods, free=total_pods - used_pods)
    best_cpu_node, best_mem_node, best_gpu_node = None, None, None
    if nodes:
        cpu_leader = max(nodes, key=lambda n: _parse_cpu(n.cpu.free))
        best_cpu_node = MaxFreeNodeInfo(node_name=cpu_leader.name, free_amount=cpu_leader.cpu.free)
        mem_leader = max(nodes, key=lambda n: _parse_size_to_kib(n.memory.free))
        best_mem_node = MaxFreeNodeInfo(node_name=mem_leader.name, free_amount=mem_leader.memory.free)
        gpu_nodes = [n for n in nodes if n.gpu_info and n.gpu_info.types]
        if gpu_nodes:
            # "Best" GPU node = largest free-GPU-count * per-GPU memory product.
            gpu_leader = max(gpu_nodes, key=lambda n: n.gpu_info.usage.free * n.gpu_info.types[0].memory_mb)
            best_gpu_node = MaxFreeGPUNodeInfo(
                node_name=gpu_leader.name,
                free_gpu_count=gpu_leader.gpu_info.usage.free,
                memory_per_gpu_mb=gpu_leader.gpu_info.types[0].memory_mb,
                total_potential_memory_gb=round(gpu_leader.gpu_info.usage.free * gpu_leader.gpu_info.types[0].memory_mb / 1024, 2)
            )
    # Aggregate free GPUs per product across the whole cluster.
    dist_gpu_map = {}
    for node in nodes:
        if node.gpu_info:
            for gpu_type in node.gpu_info.types:
                if gpu_type.product not in dist_gpu_map:
                    dist_gpu_map[gpu_type.product] = {"product": gpu_type.product, "memory_per_gpu_mb": gpu_type.memory_mb, "total_free_count": 0}
                dist_gpu_map[gpu_type.product]["total_free_count"] += node.gpu_info.usage.free
    distributed_gpu_availability = [DistributedGPUAvailability(**data) for data in dist_gpu_map.values()]
    resource_summary = ClusterResourceSummary(
        cluster_total_cpu=cluster_total_cpu,
        cluster_total_memory=cluster_total_memory,
        cluster_total_pods=cluster_total_pods,
        cluster_total_ephemeral_storage=cluster_total_storage,
        best_node_for_cpu=best_cpu_node,
        best_node_for_memory=best_mem_node,
        best_node_for_gpu_app=best_gpu_node,
        distributed_gpu_availability=distributed_gpu_availability
    )
    cluster_summary = ClusterSummary(health=health_summary, resources=resource_summary)

    return ClusterStatus(summary=cluster_summary, nodes=nodes)
\ No newline at end of file
diff --git a/ocdp/daos/user/__init__.py b/ocdp/daos/user/__init__.py new file mode 100644 index 0000000..e69de29
diff --git a/ocdp/daos/user/user_dao.py b/ocdp/daos/user/user_dao.py new file mode 100644 index 0000000..a6e1f44 --- /dev/null +++ b/ocdp/daos/user/user_dao.py @@ -0,0 +1,36 @@

from sqlalchemy.orm import Session


from ocdp.models.user import User


# NOTE(review): the `db: Session = None` defaults below are a trap — calling any
# of these without a session raises AttributeError on None. Callers are expected
# to always pass one (e.g. the session yielded by get_db).
def get_user_by_id(user_id: int, db: Session = None):
    """Fetch a user by primary key, or None when absent."""
    return db.query(User).filter(User.user_id == user_id).first()


def get_user_by_username(username: str, db: Session = None):
    """Fetch a user by unique username, or None when absent."""
    return db.query(User).filter(User.username == username).first()


def get_user_by_email(email: str, db: Session = None):
    """Fetch a user by unique e-mail address, or None when absent."""
    return db.query(User).filter(User.email == email).first()


def add_user(user: User, db: Session = None):
    """Persist a new user and return it refreshed (with generated fields)."""
    db.add(user)
    db.commit()
    db.refresh(user)
    return user


def update_user(user: User, db: Session = None):
    """Commit pending changes on an already-attached user and return it refreshed."""
    db.commit()
    db.refresh(user)
    return user


def delete_user(user: User, db: Session = None):
    """Delete the given user; always returns True (errors propagate as exceptions)."""
    db.delete(user)
    db.commit()
    return True

diff --git a/ocdp/database/__init__.py b/ocdp/database/__init__.py new file mode 100644 index 0000000..6c90301 --- /dev/null +++ b/ocdp/database/__init__.py @@ -0,0 +1,3 @@

from .database import get_db, Base

diff --git a/ocdp/database/database.py b/ocdp/database/database.py new file mode 100644 index 0000000..5d84c47 --- /dev/null +++ b/ocdp/database/database.py @@ -0,0 +1,36 @@

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# NOTE(review): deprecated since SQLAlchemy 1.4 — prefer
# `from sqlalchemy.orm import declarative_base`.
from sqlalchemy.ext.declarative import declarative_base
urllib.parse import quote_plus + +from ocdp.config import CONFIG + + +# +mysql_host = CONFIG.database.mysql.host +mysql_port = CONFIG.database.mysql.port +mysql_username = CONFIG.database.mysql.username +mysql_password = quote_plus(CONFIG.database.mysql.password) +mysql_db_name = CONFIG.database.mysql.db_name + +mysql_url = f"mysql+pymysql://{mysql_username}:{mysql_password}@{mysql_host}:{mysql_port}/{mysql_db_name}?charset=utf8mb4" + + +engine = create_engine( + mysql_url +) + +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +Base = declarative_base() + +def get_db(): + db = SessionLocal() + try: + yield db + except Exception as e: + db.rollback() + raise e + finally: + db.close() \ No newline at end of file diff --git a/ocdp/logger.py b/ocdp/logger.py new file mode 100644 index 0000000..998709c --- /dev/null +++ b/ocdp/logger.py @@ -0,0 +1,42 @@ +import os +import sys +from loguru import logger +from loguru_loki_handler import loki_handler + +from ocdp.config import CONFIG + +LABELS = CONFIG.logger.loki.labels +LABELS = dict(item.split("=") for item in LABELS.split(",")) if LABELS else {} +LABEL_KEYS = CONFIG.logger.loki.label_keys +LABEL_KEYS = LABEL_KEYS.split(",") if LABEL_KEYS else [] + +URL = CONFIG.logger.loki.url + +LOGS_DIR = CONFIG.orchestration.kube.logs_dir +LOGS_DIR = os.path.expanduser(LOGS_DIR) + +logger.configure(handlers=[ + { + "sink": sys.stdout, + "level": "INFO", + }, + { + "sink": sys.stderr, + "level": "ERROR", + }, + { + "sink": f"{LOGS_DIR}/app.log", + "serialize": True, + "level": "DEBUG", + "rotation": "1 day", + "compression": "zip" + }, + { + "sink": loki_handler( + url=URL, + labels=LABELS, + labelKeys=LABEL_KEYS + ), + "level": "INFO" + }, +]) diff --git a/ocdp/models/__init__.py b/ocdp/models/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/ocdp/models/__init__.py @@ -0,0 +1 @@ + diff --git a/ocdp/models/orchestration/__init__.py b/ocdp/models/orchestration/__init__.py new 
file mode 100644 index 0000000..e69de29 diff --git a/ocdp/models/orchestration/application.py b/ocdp/models/orchestration/application.py new file mode 100644 index 0000000..6a5a61d --- /dev/null +++ b/ocdp/models/orchestration/application.py @@ -0,0 +1,74 @@
# models.py
"""
All Pydantic data models used for application orchestration.
"""
from pydantic import BaseModel, Field

# ... (InstallationConfig, ApplicationDeploymentMode, ApplicationMetadata unchanged) ...
class SvcInfo(BaseModel):
    # Per the source YAML the URL may be ~ (None).
    svc_type: str
    protocol: str
    hostname: str
    port: int
    url: str | None = None  # str or None allowed
    paths: dict[str, str] | None = None  # dict or None allowed

class PodInfo(BaseModel):
    name: str

class ApplicationDeploymentMode(BaseModel):
    method: str
    release_name: str
    chart: str
    sets: dict = Field(default_factory=dict)
    svc: SvcInfo
    pod: PodInfo

class ApplicationMetadata(BaseModel):
    application_name: str
    distributed: ApplicationDeploymentMode
    monolithic: ApplicationDeploymentMode

class ApplicationTemplate(BaseModel):
    """An application template available for installation."""
    name: str = Field(..., description="应用模板的名称 (文件夹名)")
    metadata: ApplicationMetadata = Field(..., description="从 metadata.yaml 解析出的完整配置")

class InstalledApplicationInstance(BaseModel):
    """An installed application instance."""
    application_name: str = Field(..., description="应用的业务名称")
    release_name: str = Field(..., description="部署的 Helm Release 名称")
    namespace: str = Field(..., description="应用实例所在的唯一命名空间")
    chart: str = Field(..., description="所使用的 Helm Chart")
    status: str = Field(..., description="Helm Release 的状态 (e.g., 'deployed', 'failed')")

class InstallReceipt(BaseModel):
    """Receipt returned to the client after an install is successfully triggered."""
    application_name: str; release_name: str; namespace: str; message: str

class UninstallReceipt(BaseModel):
    """Receipt returned after uninstalling a Helm Release."""
    application_name: str; release_name: str; namespace: str;
    uninstalled_successfully: bool; is_clean: bool; message: str

# --- key change: add the application_name field ---
class NamespaceDeleteReceipt(BaseModel):
    """Receipt returned after deleting a namespace."""
    application_name: str = Field(..., description="被删除实例的应用业务名称")
    namespace: str = Field(..., description="被删除的命名空间")
    deleted_successfully: bool = Field(..., description="delete 命令是否成功提交")
    is_clean: bool = Field(..., description="验证步骤:Namespace 是否已从集群中清除")
    message: str = Field(..., description="操作结果消息")

class PodStatusDetail(BaseModel):
    """Detailed status of a single Pod."""
    pod_name: str; is_ready: bool; ready_status: str; status_phase: str | None

class ApplicationStatus(BaseModel):
    application_name: str
    namespace: str
    is_ready: bool
    base_access_url: str | None  # str or None allowed
    paths: dict | None  # dict or None allowed
    details: list[PodStatusDetail]
\ No newline at end of file
diff --git a/ocdp/models/orchestration/cluster.py b/ocdp/models/orchestration/cluster.py new file mode 100644 index 0000000..fd4cfc5 --- /dev/null +++ b/ocdp/models/orchestration/cluster.py @@ -0,0 +1,156 @@
# models.py
"""
All Pydantic models used to parse and present Kubernetes cluster state.
They serve as API request/response bodies, validation, and inter-layer DTOs.
"""
from pydantic import BaseModel, Field, computed_field

# ---------------------------------------------------------------------------
# I. Detailed Models for Single K8s Objects
# ---------------------------------------------------------------------------

class PodDetail(BaseModel):
    """Detailed resource usage of one Pod running on a node."""
    namespace: str = Field(..., description="Pod 所在的命名空间")
    name: str = Field(..., description="Pod 的名称")
    cpu_requests: str = Field(..., description="CPU 请求量")
    cpu_limits: str = Field(..., description="CPU 限制量")
    memory_requests: str = Field(..., description="内存请求量")
    memory_limits: str = Field(..., description="内存限制量")
    age: str = Field(..., description="Pod 的运行时长")

class NodeCondition(BaseModel):
    """One Condition row parsed from `kubectl describe node` output."""
    type: str
    status: str
    last_heartbeat_time: str
    last_transition_time: str
    reason: str
    message: str

class NodeHealth(BaseModel):
    """Node health, made up primarily of its Condition list."""
    conditions: list[NodeCondition] = Field(..., description="节点的健康状况条件列表")

    @computed_field
    @property
    def overall_status(self) -> str:
        """Derive a concise overall status from the 'Ready'-type Condition."""
        for condition in self.conditions:
            if condition.type == "Ready":
                return "Ready" if condition.status == "True" else "NotReady"
        return "Unknown"

class ResourceUsage(BaseModel):
    """Generic usage model for unit-carrying resources (CPU, memory, storage)."""
    total: str = Field(..., description="资源总量 (来自 Allocatable)")
    used: str = Field(..., description="已用资源量 (来自 Allocated Requests)")
    free: str = Field(..., description="剩余可用资源量 (计算得出)")

class PodsUsage(BaseModel):
    """Schedulable-Pod count usage for a node."""
    total: int = Field(..., description="节点可容纳的 Pod 总数 (Capacity)")
    used: int = Field(..., description="节点上当前运行的 Pod 数量")
    free: int = Field(..., description="剩余可调度的 Pod 数量 (计算得出)")

class GPUUsage(BaseModel):
    """Scheduling usage of a node's GPU devices (integers)."""
    total: int = Field(..., description="GPU 设备总数")
    used: int = Field(..., description="已被 Pod 请求的 GPU 数量")
    free: int = Field(..., description="空闲可用的 GPU 数量")

class GPUType(BaseModel):
    """Physical specification of a node's GPUs."""
    product: str = Field(..., description="GPU 产品型号")
    memory_mb: int = Field(..., description="单块 GPU 的显存大小 (MB)")

class GPUInfo(BaseModel):
    """Unified, nested GPU summary object."""
    usage: GPUUsage = Field(..., description="GPU 数量统计")
    types: list[GPUType] = Field(..., description="节点上的 GPU 型号列表")

class NodeInfo(BaseModel):
    """Core model: everything known about a single node."""
    name: str = Field(..., description="节点名称")
    roles: list[str] = Field(..., description="节点角色")
    labels: dict[str, str] = Field(..., description="节点的标签集合")
    health: NodeHealth = Field(..., description="节点健康状况")

    cpu: ResourceUsage = Field(..., description="CPU 资源使用情况")
    memory: ResourceUsage = Field(..., description="内存资源使用情况")
    pods: PodsUsage = Field(..., description="Pod 使用情况")

    # NOTE: these resources do not exist on every node, hence `| None`.
    # When None, the fields are omitted from API output.
    ephemeral_storage: ResourceUsage | None = Field(None, description="临时存储资源使用情况")
    hugepages_1Gi: ResourceUsage | None = Field(None, description="1Gi 大页内存使用情况")
    hugepages_2Mi: ResourceUsage | None = Field(None, description="2Mi 大页内存使用情况")
    rdma_shared_device_a: ResourceUsage | None = Field(None, description="RDMA 共享设备使用情况")

    gpu_info: GPUInfo | None = Field(None, description="节点上所有 GPU 的汇总信息")
    running_pods: list[PodDetail] = Field(..., description="在该节点上运行的 Pod 列表")


# ---------------------------------------------------------------------------
# II. Cluster-Level Summary Information Models
# ---------------------------------------------------------------------------

# Section 1: cluster-wide resource totals
class TotalResourceUsage(BaseModel):
    """Cluster-overview resource usage (string-typed, unit-carrying)."""
    total: str; used: str; free: str

class PodsTotalUsage(BaseModel):
    """Cluster-overview Pod usage (integer-typed)."""
    total: int; used: int; free: int

# Section 2: best single-node capacity
class MaxFreeNodeInfo(BaseModel):
    """Identifies the node with the most free capacity for a resource."""
    node_name: str = Field(..., description="节点名称")
    free_amount: str = Field(..., description="空闲资源量(带单位)")

class MaxFreeGPUNodeInfo(BaseModel):
    """Identifies the node best suited for large single-node GPU workloads."""
    node_name: str = Field(..., description="节点名称")
    free_gpu_count: int = Field(..., description="该节点上的空闲 GPU 数量")
    memory_per_gpu_mb: int = Field(..., description="该型号 GPU 的单卡显存")
    total_potential_memory_gb: float = Field(..., description="空闲 GPU 总显存潜力 (GB), 计算公式: free_gpu_count * memory_per_gpu_mb")

# Section 3: distributed-application potential
class DistributedGPUAvailability(BaseModel):
    """Free GPUs per product across the whole cluster, for distributed workloads."""
    product: str = Field(..., description="GPU 产品型号")
    memory_per_gpu_mb: int = Field(..., description="该型号 GPU 的单卡显存")
    total_free_count: int = Field(..., description="该型号 GPU 在整个集群中的空闲总数")

# --- main summary model ---
class ClusterResourceSummary(BaseModel):
    """Cluster resource summary: totals, best single node, distributed potential."""
    # Part 1: cluster totals
    cluster_total_cpu: TotalResourceUsage = Field(..., description="集群 CPU 资源总览")
    cluster_total_memory: TotalResourceUsage = Field(..., description="集群内存资源总览")
    cluster_total_pods: PodsTotalUsage = Field(..., description="集群 Pod 容量总览")
    cluster_total_ephemeral_storage: TotalResourceUsage | None = Field(None, description="集群临时存储资源总览")

    # Part 2: best single-node capacity
    best_node_for_cpu: MaxFreeNodeInfo | None = Field(None, description="拥有最多空闲CPU的节点")
    best_node_for_memory: MaxFreeNodeInfo | None = Field(None, description="拥有最多空闲内存的节点")
    best_node_for_gpu_app: MaxFreeGPUNodeInfo | None = Field(None,
description="最适合部署大型单机GPU应用的节点") + + # Part 3: 分布式应用潜力 + distributed_gpu_availability: list[DistributedGPUAvailability] = Field(..., description="按型号汇总的、整个集群的空闲GPU数量") + +class ClusterHealthSummary(BaseModel): + """集群整体健康状况的简要汇总。""" + total_nodes: int; ready_nodes: int; unhealthy_nodes: int + +class ClusterSummary(BaseModel): + """将健康状况和资源汇总组合在一起的中间模型。""" + health: ClusterHealthSummary + resources: ClusterResourceSummary + +class ClusterStatus(BaseModel): + """API 的顶级响应模型,包含了集群的完整状态。""" + summary: ClusterSummary = Field(..., description="集群的整体汇总信息") + nodes: list[NodeInfo] = Field(..., description="集群中所有节点的详细信息列表") \ No newline at end of file diff --git a/ocdp/models/orchestration/resource.py b/ocdp/models/orchestration/resource.py new file mode 100644 index 0000000..130b1a1 --- /dev/null +++ b/ocdp/models/orchestration/resource.py @@ -0,0 +1,55 @@ +import json + +class Resource: + """ + 将 JSON / dict 转成 Python 对象,嵌套 dict 会自动变成 Resource, + list 中的 dict 也会自动转换。 + """ + def __init__(self, data): + if isinstance(data, dict): + for k, v in data.items(): + if isinstance(v, dict): + setattr(self, k, Resource(v)) + elif isinstance(v, list): + setattr(self, k, [Resource(i) if isinstance(i, dict) else i for i in v]) + else: + setattr(self, k, v) + else: + self.value = data # 基础类型直接存储到 value + + def __repr__(self): + return f"{self.__dict__}" + + def to_dict(self): + """可选:将 Resource 再转回 dict""" + result = {} + for k, v in self.__dict__.items(): + if isinstance(v, Resource): + result[k] = v.to_dict() + elif isinstance(v, list): + result[k] = [i.to_dict() if isinstance(i, Resource) else i for i in v] + else: + result[k] = v + return result + +# ---------------- 使用示例 ---------------- +if __name__ == "__main__": + kubectl_json = ''' + { + "metadata": {"name": "nginx", "namespace": "default"}, + "spec": {"containers": [{"name": "nginx", "image": "nginx:latest"}]}, + "status": {"phase": "Running"} + } + ''' + + # 转成 Resource 对象 + data_dict = json.loads(kubectl_json) + pod = 
    Resource(data_dict)

    # access fields via attributes
    print(pod.metadata.name)              # nginx
    print(pod.spec.containers[0].image)   # nginx:latest
    print(pod.status.phase)               # Running

    # optionally convert back to a dict
    print(pod.to_dict())
\ No newline at end of file
diff --git a/ocdp/models/user/__init__.py b/ocdp/models/user/__init__.py new file mode 100644 index 0000000..64ae1f1 --- /dev/null +++ b/ocdp/models/user/__init__.py @@ -0,0 +1,2 @@

from .user import User
\ No newline at end of file
diff --git a/ocdp/models/user/user.py b/ocdp/models/user/user.py new file mode 100644 index 0000000..10570b2 --- /dev/null +++ b/ocdp/models/user/user.py @@ -0,0 +1,21 @@

from sqlalchemy import Column, Integer, String, Boolean, TIMESTAMP, func
import datetime

from ocdp.database import Base

class User(Base):
    """ORM model for the `users` table (accounts, credentials, audit timestamps)."""
    __tablename__ = "users"

    # surrogate primary key
    user_id = Column(Integer, primary_key=True, index=True ,autoincrement=True)
    username = Column(String(64), unique=True, index=True, nullable=False)
    email = Column(String(128), unique=True, index=True, nullable=False)
    # password hash only — plaintext is never stored
    hashed_password = Column(String(128), nullable=False)
    is_active = Column(Boolean, nullable=False, default=True)
    is_admin = Column(Boolean, nullable=False, default=False)
    # server-side timestamps; updated_at is refreshed on every UPDATE
    created_at = Column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
    updated_at = Column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now())
    last_login_at = Column(TIMESTAMP(timezone=True), nullable=True)


diff --git a/ocdp/orchestration/__init__.py b/ocdp/orchestration/__init__.py new file mode 100644 index 0000000..1b94bbc --- /dev/null +++ b/ocdp/orchestration/__init__.py @@ -0,0 +1,2 @@

from .cluster import Cluster, get_cluster
\ No newline at end of file
diff --git a/ocdp/orchestration/cluster.py b/ocdp/orchestration/cluster.py new file mode 100644 index 0000000..9090fab --- /dev/null +++ b/ocdp/orchestration/cluster.py @@ -0,0 +1,265 @@
import os
import time
import yaml
import subprocess
from ocdp.config import CONFIG


class Cluster:
    """Wrapper around kubectl/helm CLIs for one cluster, selected via KUBECONFIG."""

    def __init__(self, kubeconfig: str | None = None):
        self.kubeconfig = kubeconfig or os.path.expanduser(CONFIG.orchestration.kube.kubectl_file)
        self.applications_dir = os.path.expanduser(CONFIG.orchestration.kube.applications_dir)
        # Child processes inherit the environment plus the chosen kubeconfig.
        self.env = os.environ.copy()
        self.env['KUBECONFIG'] = self.kubeconfig

    # ----------------- application metadata interface -----------------
    def list_applications(self) -> list[str]:
        """List all applications (folder names) inside applications_dir."""
        if not os.path.exists(self.applications_dir):
            raise FileNotFoundError(f"The applications directory {self.applications_dir} does not exist.")
        return [f for f in os.listdir(self.applications_dir)
                if os.path.isdir(os.path.join(self.applications_dir, f))]

    def get_application_metadata(self, application_dir: str) -> dict:
        """
        Read and post-process an application's metadata.

        Args:
            application_dir (str): folder name of the application under applications_dir.

        Returns:
            dict: the parsed content of its metadata.yaml.
        """
        meta_path = os.path.join(self.applications_dir, application_dir, "metadata.yaml")
        if not os.path.exists(meta_path):
            raise FileNotFoundError(f"metadata.yaml not found in {application_dir}")

        with open(meta_path, 'r') as f:
            try:
                # full_load keeps YAML anchors (&) and aliases (*) working
                metadata = yaml.full_load(f)
            except yaml.YAMLError as e:
                raise RuntimeError(f"Error parsing metadata.yaml: {e}")

        # --- post-process the metadata ---
        # walk every top-level key (e.g. 'distributed', 'monolithic')
        # --- post-process the metadata ---
        for mode, config in metadata.items():
            if isinstance(config, dict) and config.get('method') == 'helm':
                # 1. resolve the chart name into an absolute path
                if 'chart' in config:
                    chart_name = config['chart']
                    config['chart'] = os.path.join(self.applications_dir, application_dir, chart_name)

                # 2. assemble the service URL
                # read svc_config straight from config, not from sets
                svc_config = config.get('svc')
                if isinstance(svc_config, dict):
                    protocol = svc_config.get('protocol')
                    hostname = svc_config.get('hostname')
                    port = svc_config.get('port')
                    if protocol and hostname and port:
                        svc_config['url'] = f"{protocol}://{hostname}:{port}"


        return metadata

    # ----------------- high-level kubectl interface -----------------
    def apply(self, file_name: str) -> str:
        """Apply a Kubernetes manifest file."""
        return self._run_kubectl_cmd(["apply", "-f", self._resolve_file(file_name)])

    def delete(
        self,
        file_name: str | None = None,
        resource_type: str | None = None,
        name: str | None = None,
        namespace: str | None = None,
        force: bool = False
    ) -> str:
        """Delete Kubernetes resources, either by manifest file or by type+name."""
        if file_name:
            cmd = ["delete", "-f", self._resolve_file(file_name)]
        elif resource_type and name:
            cmd = ["delete", resource_type, name]
            if namespace:
                cmd.extend(["--namespace", namespace])
        else:
            raise ValueError("Invalid arguments: Provide 'file_name' or both 'resource_type' and 'name'.")

        if force:
            cmd.extend(["--force", "--grace-period=0"])

        return self._run_kubectl_cmd(cmd)

    def create(self, namespace_name: str) -> str:
        """Create a new Kubernetes namespace."""
        cmd = ["create", "namespace", namespace_name]
        return self._run_kubectl_cmd(cmd)

    def get(self, resource_type: str, namespace: str | None = None, name: str | None = None,
            output: str = "json",
            all_namespaces: bool = False) -> str:
        """Generic resource getter; returns JSON format by default."""
        cmd = ["get", resource_type.lower()]
        if name: cmd.append(name)
        if all_namespaces: cmd.append("-A")
        elif namespace: cmd.extend(["-n", namespace])
        if output: cmd.extend(["-o", output])
        return self._run_kubectl_cmd(cmd)

    def describe(self, resource_type: str, name: str | None = None, namespace: str | None = None) -> str:
        """Describe the given resource."""
        cmd = ["describe", resource_type.lower()]
        if name: cmd.append(name)
        if namespace: cmd.extend(["-n", namespace])
        return self._run_kubectl_cmd(cmd)

    # ----------------- Helm repository management (new) -----------------
    def add_repo(
        self,
        repo_name: str,
        repo_url: str,
        username: str | None = None,
        password: str | None = None
    ) -> str:
        """
        Add a Helm chart repository (helm repo add).

        Args:
            repo_name (str): local alias for the repository.
            repo_url (str): repository URL.
            username (str, optional): username for a private repository.
            password (str, optional): password for a private repository.

        Returns:
            str: command output.
        """
        cmd = ["repo", "add", repo_name, repo_url]
        if username:
            cmd.extend(["--username", username])
        if password:
            cmd.extend(["--password", password, "--pass-credentials"])

        return self._run_helm_cmd(cmd)

    def update_repos(self, repo_names: list[str] | None = None) -> str:
        """
        Update one or more Helm chart repositories (helm repo update).

        Args:
            repo_names (list[str], optional): repositories to update; None updates all.

        Returns:
            str: command output.
        """
        cmd = ["repo", "update"]
        if repo_names:
            cmd.extend(repo_names)

        return self._run_helm_cmd(cmd)

    # ----------------- Helm release management -----------------
    def install_release(
        self,
        release_name: str,
        chart_source: str,
        namespace: str,
        config_file: str | None = None,
        create_namespace: bool = True
    ) -> str:
        """
        Install a Helm release (application instance).
        """
        cmd = ["install", release_name, chart_source, "--namespace", namespace]
        if create_namespace:
            cmd.append("--create-namespace")

        # --- key fix: use the config_file path directly ---
        if config_file:
            # Do not call self._resolve_file: config_file is a full path supplied
            # by the upper (DAO) layer, e.g. /tmp/temp-values.yaml.
            if not os.path.exists(config_file):
                raise FileNotFoundError(f"Provided config_file does not exist: {config_file}")
            cmd.extend(["-f", config_file])
        return self._run_helm_cmd(cmd)

    def uninstall_release(
        self,
        release_name: str,
        namespace: str | None = None,
        wait: bool = False
    ) -> str:
        """Uninstall a Helm release (application instance)."""
        cmd = ["uninstall", release_name]
        if namespace:
            cmd.extend(["--namespace", namespace])
        if wait:
            cmd.append("--wait")
        return self._run_helm_cmd(cmd)

    def list_releases(
        self,
        namespace: str | None = None,
        all_namespaces: bool = False,
        output: str | None = None
    ) -> str:
        """List installed Helm releases (application instances)."""
        cmd = ["list"]
        if all_namespaces:
            cmd.append("--all-namespaces")
        elif namespace:
            cmd.extend(["--namespace", namespace])
        if output:
            cmd.extend(["--output", output])

        return self._run_helm_cmd(cmd)

    # ----------------- private helpers -----------------
    def _run_kubectl_cmd(self, cmd_args: list[str]) -> str:
        """Run a kubectl command and return its stdout."""
        try:
            command = ["kubectl"] + cmd_args
            print(f"🚀 Executing Kubectl: {' '.join(command)}")
            result = subprocess.run(
                command, check=True, capture_output=True, text=True, env=self.env
            )
            return result.stdout
        except FileNotFoundError:
            raise RuntimeError("`kubectl` command not found. Is it installed and in your PATH?")
        except subprocess.CalledProcessError as e:
            error_message = f"kubectl command failed with exit code {e.returncode}:\n{e.stderr.strip()}"
            raise RuntimeError(error_message) from e

    def _run_helm_cmd(self, cmd_args: list[str]) -> str:
        """[internal] Run a helm command and return its stdout."""
        try:
            command = ["helm", "--kubeconfig", self.kubeconfig] + cmd_args
            print(f"🚀 Executing Helm: {' '.join(command)}")
            result = subprocess.run(
                command, check=True, capture_output=True, text=True, env=self.env
            )
            return result.stdout
        except FileNotFoundError:
            raise RuntimeError("`helm` command not found. Is it installed and in your PATH?")
        except subprocess.CalledProcessError as e:
            error_message = f"Helm command failed with exit code {e.returncode}:\n{e.stderr.strip()}"
            raise RuntimeError(error_message) from e

    def _resolve_file(self, file_name: str) -> str:
        """Resolve a resource file path (relative to applications_dir)."""
        file_path = os.path.join(self.applications_dir, file_name)
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"The file {file_path} does not exist.")
        return file_path


def get_cluster() -> Cluster:
    return Cluster()


if __name__ == "__main__":
    cluster = get_cluster()

    print(cluster.get_application_metadata("infer"))
\ No newline at end of file
diff --git a/ocdp/services/__init__.py b/ocdp/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ocdp/services/orchestration/__init__.py b/ocdp/services/orchestration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ocdp/services/orchestration/application_service.py b/ocdp/services/orchestration/application_service.py new file mode 100644 index 0000000..7da351c --- /dev/null +++ b/ocdp/services/orchestration/application_service.py @@ -0,0 +1,71 @@
# service.py
"""
Service layer — application orchestration.
Handles core business logic (permissions, naming) and delegates data
operations to the DAO layer.
"""
import ulid

from ocdp.orchestration.cluster import Cluster
from ocdp.daos.orchestration import application_dao as dao
from ocdp.models.orchestration.application import (ApplicationTemplate, InstallReceipt, ApplicationStatus,
                                                   InstalledApplicationInstance, UninstallReceipt, NamespaceDeleteReceipt, ApplicationMetadata)

# ... (list_available_applications, list_user_applications unchanged) ...
def list_available_applications(cluster: Cluster) -> list[ApplicationTemplate]:
    """(Service) Return every application template available for installation."""
    return dao.list_application_templates(cluster)


def list_user_applications(cluster: Cluster, user_id: str) -> list[InstalledApplicationInstance]:
    """(Service) Return the application instances already installed by a user."""
    return dao.list_application_instances(cluster, user_id)


def install_new_application(
    cluster: Cluster,
    user_id: str,
    app_template_name: str,
    mode: str,
    user_overrides: dict | None = None
) -> InstallReceipt:
    """
    (Service) Trigger the installation of a new application.
    Core responsibility: derive a unique namespace from business rules.
    """
    # Service duty 1: look up the application's business name for the namespace.
    # get_application_metadata is called here only for application_name; the
    # DAO layer fetches the metadata again itself when it executes the task.
    metadata = cluster.get_application_metadata(app_template_name)
    application_name = metadata.get("application_name", app_template_name)

    # Service duty 2: build a unique namespace from user, app name and a ULID.
    instance_id = str(ulid.new()).lower()
    namespace = f"{user_id}-{application_name}-{instance_id}"

    # Service duty 3: hand everything (including the generated namespace) to the DAO.
    return dao.install_application(
        cluster=cluster,
        namespace=namespace,
        app_template_name=app_template_name,
        mode=mode,
        user_overrides=user_overrides,
    )


def get_instance_status(
    cluster: Cluster,
    namespace: str,
    app_template_name: str,
    mode: str
) -> ApplicationStatus:
    """(Service) Return the detailed status of one application instance."""
    return dao.get_application_status(cluster, namespace, app_template_name, mode)


def uninstall_application_release(
    cluster: Cluster,
    namespace: str,
    app_template_name: str,
    mode: str
) -> UninstallReceipt:
    """(Service) Uninstall an application instance (its Helm release)."""
    return dao.uninstall_application_release(cluster, namespace, app_template_name, mode)


def delete_application_namespace(cluster: Cluster, namespace: str) -> NamespaceDeleteReceipt:
    """(Service) Delete the namespace belonging to an application instance."""
    return dao.delete_namespace(cluster, namespace)
# \ No newline at end of file
# diff --git a/ocdp/services/orchestration/cluster_service.py (header continues on next line)
# b/ocdp/services/orchestration/cluster_service.py new file mode 100644 index 0000000..bec7ed5 --- /dev/null +++ b/ocdp/services/orchestration/cluster_service.py @@ -0,0 +1,12 @@
# services.py

from ocdp.orchestration.cluster import Cluster
from ocdp.daos.orchestration import cluster_dao
from ocdp.models.orchestration.cluster import ClusterStatus


def get_cluster_status(cluster: Cluster) -> ClusterStatus:
    """
    Service-layer entry point for the cluster-status business logic.
    Delegates the request straight to the DAO layer.
    """
    return cluster_dao.get_cluster_status(cluster)
# \ No newline at end of file
# diff --git a/ocdp/services/orchestration/node_service.py b/ocdp/services/orchestration/node_service.py new file mode 100644 index 0000000..46ccdde --- /dev/null +++ b/ocdp/services/orchestration/node_service.py @@ -0,0 +1,135 @@
# node_service.py

import json
from collections import defaultdict
from ocdp.orchestration import Cluster, get_cluster

# Functions imported from our DAO module.
from ocdp.daos.orchestration.node_dao import (
    get_all_nodes_health_status,
    get_all_nodes_resource_details,
    refresh_nodes_cache as refresh_dao_cache
)


# --- Service-layer formatting helpers ---
def _format_bytes(byte_count: int) -> str:
    """Format a byte count as a human-readable string (B up to TiB)."""
    if byte_count < 0:
        return "N/A"
    power = 1024
    n = 0
    power_labels = {0: 'B', 1: 'KiB', 2: 'MiB', 3: 'GiB', 4: 'TiB'}
    # Divide by 1024 until the value fits the current unit, capped at TiB.
    while byte_count >= power and n < len(power_labels) - 1:
        byte_count /= power
        n += 1
    return f"{byte_count:.2f} {power_labels[n]}"


# --- Public service interface ---

def get_cluster_health_report(cluster: Cluster) -> dict:
    """1. (Service) Health-status report for all cluster nodes."""
    # The DAO function's return shape is already suitable; pass it through.
    return get_all_nodes_health_status(cluster)


def get_per_node_resource_report(cluster: Cluster) -> dict:
    """2. (Service) Formatted per-node resource report."""
    # Numeric data from the DAO layer.
    raw_resources = get_all_nodes_resource_details(cluster)
    formatted_report = {}

    for name, data in raw_resources.items():
        # "used" = total minus allocatable (i.e. what the system/kubelet keeps).
        formatted_report[name] = {
            "cpu": {
                "total": f"{data['cpu']['total']:.2f} Cores",
                "used": f"{data['cpu']['total'] - data['cpu']['allocatable']:.2f} Cores",
                "free": f"{data['cpu']['allocatable']:.2f} Cores"
            },
            "memory": {
                "total": _format_bytes(data['memory']['total']),
                "used": _format_bytes(data['memory']['total'] - data['memory']['allocatable']),
                "free": _format_bytes(data['memory']['allocatable'])
            },
            "storage": {
                "total": _format_bytes(data['storage']['total']),
                "used": _format_bytes(data['storage']['total'] - data['storage']['allocatable']),
                "free": _format_bytes(data['storage']['allocatable'])
            },
            # GPU info comes from the DAO already in its final shape.
            "gpu": data['gpu']
        }
    return formatted_report


def get_cluster_summary_report(cluster: Cluster) -> dict:
    """3. (Service) Aggregated summary of total cluster resources."""
    raw_resources = get_all_nodes_resource_details(cluster)

    # Aggregators over all nodes.
    total_cpu, alloc_cpu = 0.0, 0.0
    total_mem, alloc_mem = 0, 0
    total_sto, alloc_sto = 0, 0
    total_gpu_count = 0
    alloc_gpu_count = 0
    gpu_models = defaultdict(int)

    for data in raw_resources.values():
        total_cpu += data['cpu']['total']
        alloc_cpu += data['cpu']['allocatable']
        total_mem += data['memory']['total']
        alloc_mem += data['memory']['allocatable']
        total_sto += data['storage']['total']
        alloc_sto += data['storage']['allocatable']

        gpu_data = data['gpu']
        if gpu_data['count'] > 0:
            total_gpu_count += gpu_data['count']
            alloc_gpu_count += gpu_data['allocatable_count']
            gpu_models[gpu_data['model']] += gpu_data['count']

    # Format the aggregated values for presentation.
    return {
        "note": "'used' 代表被系统或 Kubelet 预留的资源, 'free' 代表可供 Pod 调度的资源。",
        "cpu": {
            "total": f"{total_cpu:.2f} Cores",
            "used": f"{total_cpu - alloc_cpu:.2f} Cores",
            "free": f"{alloc_cpu:.2f} Cores"
        },
        "memory": {
            "total": _format_bytes(total_mem),
            "used": _format_bytes(total_mem - alloc_mem),
            "free": _format_bytes(alloc_mem)
        },
        "storage": {
            "total": _format_bytes(total_sto),
            "used": _format_bytes(total_sto - alloc_sto),
            "free": _format_bytes(alloc_sto)
        },
        "gpu": {
            "total_count": total_gpu_count,
            "allocatable_count": alloc_gpu_count,
            "models_summary": dict(gpu_models)
        }
    }


# --- Usage example ---
if __name__ == "__main__":
    try:
        # A single cluster client is enough for all reports.
        cluster_client = get_cluster()

        print("\n" + "="*20 + " 1. 集群健康状态 " + "="*20)
        health_report = get_cluster_health_report(cluster_client)
        print(json.dumps(health_report, indent=2))

        print("\n" + "="*20 + " 2. 各节点资源详情 " + "="*20)
        per_node_report = get_per_node_resource_report(cluster_client)
        print(json.dumps(per_node_report, indent=2))

        print("\n" + "="*20 + " 3. 集群资源汇总 " + "="*20)
        summary_report = get_cluster_summary_report(cluster_client)
        print(json.dumps(summary_report, indent=2))

    except RuntimeError as e:
        print(f"\n发生错误: {e}")
# diff --git a/ocdp/services/user/__init__.py b/ocdp/services/user/__init__.py new file mode 100644 index 0000000..e69de29
# diff --git a/ocdp/services/user/helpers/__init__.py b/ocdp/services/user/helpers/__init__.py new file mode 100644 index 0000000..0a3b5eb --- /dev/null +++ b/ocdp/services/user/helpers/__init__.py @@ -0,0 +1,3 @@

from .password_handler import hash_password, verify_password
from .token_handler import generate_token, verify_token
# diff --git a/ocdp/services/user/helpers/password_handler.py b/ocdp/services/user/helpers/password_handler.py new file mode 100644 index 0000000..71d5527 --- /dev/null +++ b/ocdp/services/user/helpers/password_handler.py @@ -0,0 +1,11 @@

from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError

# Shared hasher instance reused as the default for both helpers.
PH = PasswordHasher()


def hash_password(password, ph=PH):
    """Return the Argon2 hash of the given plaintext password."""
    hashed = ph.hash(password)
    return hashed


def verify_password(hashed, password, ph=PH):
    """Return True when `password` matches `hashed`, False on mismatch.

    BUG FIX: argon2's PasswordHasher.verify raises VerifyMismatchError on a
    wrong password instead of returning False, but callers (e.g.
    login_for_access_token in user_service) treat the result as a boolean.
    Translate the mismatch into False; other errors (e.g. an invalid hash)
    still propagate.
    """
    try:
        return ph.verify(hashed, password)
    except VerifyMismatchError:
        return False
# \ No newline at end of file
# diff --git a/ocdp/services/user/helpers/token_handler.py b/ocdp/services/user/helpers/token_handler.py new file mode 100644 index 0000000..12c0d30 --- /dev/null +++ b/ocdp/services/user/helpers/token_handler.py @@ -0,0 +1,73 @@
import os
import jwt
import datetime

from ocdp.config import CONFIG

# Best practice: load secrets from environment variables.
# For local development a .env file plus python-dotenv works well.
SECRET_KEY = CONFIG.token.jwt.secret_key
ALGORITHM = CONFIG.token.jwt.signing_algorithm


def generate_token(
    user_id: str,
    expires_delta: datetime.timedelta = datetime.timedelta(minutes=30),
    secret_key: str = SECRET_KEY,
    algorithm: str = ALGORITHM
) -> str:
    """Create a signed JWT for *user_id* that expires after *expires_delta*."""
    # Refuse to sign anything without a secret.
    if not secret_key:
        raise ValueError("SECRET_KEY not found in environment variables.")

    # Anchor both issue and expiry times to the same UTC "now".
    now = datetime.datetime.now(datetime.timezone.utc)
    claims = {
        "sub": user_id,               # subject: the user's unique identifier
        "iat": now,                   # issued-at timestamp
        "exp": now + expires_delta,   # expiry timestamp
    }
    return jwt.encode(claims, secret_key, algorithm=algorithm)


def verify_token(
    token: str,
    secret_key: str = SECRET_KEY,
    algorithm: str = ALGORITHM
) -> dict | None:
    """Verify a JWT.

    Returns the decoded payload dict when the token is valid; returns None
    when it is invalid (expired, bad signature, malformed, ...).
    """
    # A secret is required for decoding as well.
    if not secret_key:
        raise ValueError("SECRET_KEY not found in environment variables.")

    try:
        # jwt.decode performs the signature, expiration and algorithm checks.
        # Pinning the algorithm list prevents algorithm-confusion attacks.
        return jwt.decode(token, secret_key, algorithms=[algorithm])
    except jwt.ExpiredSignatureError:
        # Most common failure: the token is past its expiration date.
        print("Token verification failed: Token has expired.")
        return None
    except jwt.InvalidTokenError as e:
        # Any other JWT error: bad signature, malformed token, etc.
        print(f"Token verification failed: Invalid token. Error: {e}")
        return None
# \ No newline at end of file
# diff --git a/ocdp/services/user/user_exceptions.py b/ocdp/services/user/user_exceptions.py new file mode 100644 index 0000000..2258f69 --- /dev/null +++ b/ocdp/services/user/user_exceptions.py @@ -0,0 +1,4 @@


class UserAlreadyExistsError(Exception):
    """Raised when creating a user whose username or email already exists."""
    pass
# \ No newline at end of file
# diff --git a/ocdp/services/user/user_service.py b/ocdp/services/user/user_service.py new file mode 100644 index 0000000..83cd741 --- /dev/null +++ b/ocdp/services/user/user_service.py @@ -0,0 +1,119 @@
# file: user_service.py

from sqlalchemy.orm import Session
import datetime

from .helpers import hash_password, verify_password, generate_token, verify_token
from .user_exceptions import UserAlreadyExistsError

# Concrete database operations come from the DAO layer.
from ocdp.daos.user import user_dao
# Models / DTOs.
from ocdp.models.user import User
# Password and token helpers imported above.


# Layer 2
# --- Business-logic functions ---

def login_for_access_token(username: str, password: str, db: Session) -> str | None:
    """Core login flow: authenticate the user and mint an access token."""
    # Fetch the user via the DAO layer.
    user = user_dao.get_user_by_username(username, db)

    # The user must exist and the password must match.
    if not user or not verify_password(user.hashed_password, password):
        return None  # authentication failed

    # Authentication succeeded: issue a token (primary key assumed to be user_id).
    token = generate_token(user_id=str(user.user_id))

    # (Optional) record the last-login timestamp.
    set_last_login(user.user_id, db)

    return token

# Layer 1
# (def get_user_by_id continues on the next source line)
def get_user_by_id(user_id: int, db: Session):
    """Pass-through to the DAO layer: fetch a user by primary key."""
    return user_dao.get_user_by_id(user_id, db)


def get_user_by_username(username: str, db: Session):
    """Pass-through to the DAO layer: fetch a user by username."""
    return user_dao.get_user_by_username(username, db)


def get_user_by_email(email: str, db: Session):
    """Pass-through to the DAO layer: fetch a user by email address."""
    return user_dao.get_user_by_email(email, db)


def get_current_user(token: str, db: Session):
    """Resolve the user behind a JWT; returns None when the token is invalid.

    BUG FIX: verify_token returns None for an invalid/expired token; the
    original called .get("sub") on that result unconditionally, which raised
    AttributeError instead of returning None.
    """
    payload = verify_token(token)
    if not payload:
        return None
    user_id = payload.get("sub")
    if not user_id:
        return None
    return user_dao.get_user_by_id(user_id, db)


def get_user_id_by_token(token: str):
    """Extract the user id ('sub' claim) from a JWT; None when invalid.

    BUG FIX: same None-guard as get_current_user — verify_token may return
    None, so it must not be dereferenced blindly.
    """
    payload = verify_token(token)
    if not payload:
        return None
    return payload.get("sub")


# file: user_service.py

def create_user(username: str, password: str, email: str, db: Session):
    """Create a new user after uniqueness checks (validation lives in the service layer).

    Raises:
        UserAlreadyExistsError: If the username or email is already taken.
    """
    existing_user = user_dao.get_user_by_username(username, db)
    if existing_user:
        raise UserAlreadyExistsError(f"User with username '{username}' already exists.")

    # Also guard against duplicate email addresses.
    existing_email = user_dao.get_user_by_email(email, db)
    if existing_email:
        raise UserAlreadyExistsError(f"User with email '{email}' already exists.")

    hashed = hash_password(password)
    user = User(
        username=username,
        email=email,
        hashed_password=hashed
    )
    return user_dao.add_user(user, db)


def update_user_password(user_id: int, new_password: str, db: Session):
    """Hash and store a new password; returns None when the user is missing."""
    user = user_dao.get_user_by_id(user_id, db)
    if not user:
        return None

    # Business logic: hash the new password before persisting.
    user.hashed_password = hash_password(new_password)

    return user_dao.update_user(user, db)


def set_last_login(user_id: int, db: Session):
    """Stamp the user's last-login time (UTC); returns None when missing."""
    user = user_dao.get_user_by_id(user_id, db)
    if not user:
        return None

    user.last_login_at = datetime.datetime.now(datetime.timezone.utc)

    return user_dao.update_user(user, db)


def deactivate_user(user_id: int, db: Session):
    """Soft-disable a user account; returns None when the user is missing."""
    user = user_dao.get_user_by_id(user_id, db)
    if not user:
        return None

    user.is_active = False

    return user_dao.update_user(user, db)


def delete_user(user_id: int, db: Session):
    """Hard-delete a user; returns False when the user does not exist."""
    user = user_dao.get_user_by_id(user_id, db)
    if not user:
        return False  # the user does not exist, so deletion failed

    return user_dao.delete_user(user, db)
# \ No newline at end of file
# diff --git a/ocdp/utils/__init__.py b/ocdp/utils/__init__.py new file mode 100644 index 0000000..e69de29