feat: Add orchestration models and services for Kubernetes cluster management

- Implemented Pydantic models for Kubernetes cluster state representation in `cluster.py`.
- Created a `Resource` class for converting JSON/dict to Python objects in `resource.py`.
- Established user models and services for user management, including password hashing and JWT token generation.
- Developed application orchestration services for managing Kubernetes applications, including installation and uninstallation.
- Added cluster service for retrieving cluster status and health reports.
- Introduced node service for fetching node resource details and health status.
- Implemented user service for handling user authentication and management.
This commit is contained in:
2025-09-02 02:50:42 +00:00
parent 00039b2fe1
commit c7f8e69d61
65 changed files with 3649 additions and 0 deletions

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,20 @@
from fastapi import APIRouter
from .user import router as user_router
from .auth import router as auth_router
from .orchestration import router as orchestration_router
router = APIRouter(prefix="/api/v1")
router.include_router(user_router)
router.include_router(auth_router)
router.include_router(orchestration_router)

View File

@ -0,0 +1,10 @@
from fastapi import APIRouter
from . import (
login
)
router = APIRouter(prefix="/auth")
router.include_router(login.router)

View File

@ -0,0 +1,60 @@
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy.orm import Session
from ocdp.database import get_db
from ocdp.services.user import user_service
# 创建一个 API 路由器
router = APIRouter()
# 登录请求的数据模型
class LoginRequest(BaseModel):
"""
用户登录的请求体 (Request Body)
"""
username: str
password: str
# 登录成功后返回的数据模型
class LoginResponse(BaseModel):
"""
成功登录后返回的响应体
"""
access_token: str
token_type: str = "bearer"
# 假设你已经创建了路由器实例
# router = APIRouter(prefix="/users", tags=["Users"])
# 添加登录路由
@router.post("/login", response_model=LoginResponse)
def login(
login_in: LoginRequest,
db: Session = Depends(get_db)
):
"""
通过用户名和密码获取访问令牌 (Access Token)
"""
# 调用服务层函数来处理登录逻辑
token = user_service.login_for_access_token(
username=login_in.username,
password=login_in.password,
db=db
)
# 检查服务层返回的 token 是否为空,如果为空,表示认证失败
if not token:
# 抛出 401 Unauthorized 异常
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# 认证成功,返回包含 token 的响应
return {"access_token": token}

View File

@ -0,0 +1,11 @@
from fastapi import APIRouter
from .cluster import router as cluster_router
from .application_controller import router as application_router
router = APIRouter(prefix="/orchestration")
router.include_router(cluster_router)
router.include_router(application_router)

View File

@ -0,0 +1,7 @@
from fastapi import APIRouter
router = APIRouter(prefix="/application")

View File

@ -0,0 +1,30 @@
# list_application_template.py
"""
Controller for Application Templates.
"""
from pydantic import BaseModel, Field
from fastapi import APIRouter, Depends
from cluster_tool import Cluster, get_cluster
from services import application_service
from models.application import ApplicationMetadata
# --- FastAPI Router ---
router = APIRouter(
prefix="/application-templates",
)
@router.get("/", response_model=list[ApplicationTemplate], summary="获取所有可安装的应用模板")
def list_application_templates(cluster: Cluster = Depends(get_cluster)):
"""列出在 `applications_dir` 中所有可供安装的应用及其元数据。"""
return application_service.list_available_applications(cluster)

View File

@ -0,0 +1,87 @@
# ocdp/controllers/application_instances.py
"""
Controller for Application Instances.
(Authentication updated to use OAuth2 Password Bearer flow)
"""
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from ocdp.orchestration.cluster import Cluster, get_cluster
from ocdp.services.orchestration import application_service
from ocdp.models.orchestration.application import (
InstalledApplicationInstance, InstallReceipt,
UninstallReceipt, NamespaceDeleteReceipt, ApplicationStatus, ApplicationTemplate
)
# 假设的依赖和 Service 函数导入路径
from ocdp.database import get_db
from ocdp.services.user import user_service
# ----------------
from pydantic import BaseModel
from typing import Optional, Dict, Any
class InstallRequest(BaseModel):
app_template_name: str
mode: str
user_overrides: Optional[Dict[str, Any]] = None
# --- FastAPI Router ---
router = APIRouter()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
def get_current_user_id_dependency(token: str = Depends(oauth2_scheme)):
"""
此依赖函数负责调用 user_service 来验证 token 并获取用户 ID。
"""
# 将获取到的 token 传递给 user_service 的方法
return user_service.get_user_id_by_token(token)
@router.get("/application-templates", response_model=list[ApplicationTemplate], summary="列出所有可用的应用模板")
def list_application_templates(
cluster: Cluster = Depends(get_cluster)
):
"""
获取系统中所有可用的应用模板列表。
"""
return application_service.list_available_applications(cluster)
@router.get("/application-instances", response_model=list[InstalledApplicationInstance], summary="列出当前用户已安装的应用实例")
def list_application_instances(
# 使用新的依赖函数
user_id: str = Depends(get_current_user_id_dependency),
cluster: Cluster = Depends(get_cluster)
):
# 这里直接使用 user_id业务逻辑保持不变
return application_service.list_user_applications(cluster, user_id)
@router.post("/application-instances", response_model=InstallReceipt, status_code=status.HTTP_202_ACCEPTED, summary="安装一个新的应用实例")
def install_application_instance(
request: InstallRequest,
user_id: str = Depends(get_current_user_id_dependency),
cluster: Cluster = Depends(get_cluster)
):
try:
return application_service.install_new_application(
cluster=cluster, user_id=user_id, app_template_name=request.app_template_name,
mode=request.mode, user_overrides=request.user_overrides
)
except (ValueError, FileNotFoundError) as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"An unexpected error occurred: {e}")
# ... (get_status 和 uninstall/delete 端点保持不变,因为它们不直接依赖 user_id) ...
@router.get("/application-instances/{namespace}/{app_template_name}/status", response_model=ApplicationStatus, summary="获取指定应用实例的状态")
def get_application_instance_status(namespace: str, app_template_name: str, mode: str, cluster: Cluster = Depends(get_cluster)):
return application_service.get_instance_status(cluster, namespace, app_template_name, mode)
@router.delete("/application-instances/{namespace}/{app_template_name}", response_model=UninstallReceipt, summary="步骤1卸载应用实例的 Release")
def uninstall_instance_release(namespace: str, app_template_name: str, mode: str, cluster: Cluster = Depends(get_cluster)):
return application_service.uninstall_application_release(cluster, namespace, app_template_name, mode)
@router.delete("/application-instances/{namespace}", response_model=NamespaceDeleteReceipt, summary="步骤2删除应用实例的命名空间")
def delete_instance_namespace(namespace: str, cluster: Cluster = Depends(get_cluster)):
return application_service.delete_application_namespace(cluster, namespace)

View File

@ -0,0 +1,8 @@
from fastapi import APIRouter
from .get_cluster_status import router as get_cluster_status_router
router = APIRouter(prefix="/cluster")
router.include_router(get_cluster_status_router)

View File

@ -0,0 +1,24 @@
# get_cluster_status.py
from fastapi import APIRouter, Depends
from ocdp.orchestration.cluster import Cluster, get_cluster
from ocdp.services.orchestration import cluster_service
from ocdp.models.orchestration.cluster import ClusterStatus
router = APIRouter()
# 依赖注入函数名和参数名也同步调整,更清晰
def get_status_from_service(cluster: Cluster = Depends(get_cluster)) -> ClusterStatus:
"""辅助函数在Depends中调用service函数"""
return cluster_service.get_cluster_status(cluster)
@router.get(
"/cluster-status",
response_model=ClusterStatus,
summary="Get Comprehensive Cluster Status"
)
def get_comprehensive_cluster_status(status: ClusterStatus = Depends(get_status_from_service)):
"""
Provides a complete snapshot of the cluster's health and resources.
"""
return status

View File

@ -0,0 +1,34 @@
from pydantic import BaseModel
from fastapi import APIRouter, Depends, HTTPException
from ocdp.orchestration import Cluster
# 假设的依赖和 Service 函数导入路径
from ocdp.orchestration import get_cluster
from ocdp.services.orchestration import node_service
# --- Response Models for this endpoint ---
class NodeHealthStatus(BaseModel):
is_ready: bool
pressures: dict[str, bool]
HealthReportResponse = dict[str, NodeHealthStatus]
# --- Router Definition ---
router = APIRouter()
@router.get(
"/health",
response_model=HealthReportResponse,
# summary="获取集群节点健康状态"
)
def get_health(cluster: Cluster = Depends(get_cluster)):
"""
获取集群所有节点的健康状态报告。
- **is_ready**: 节点是否就绪。
- **pressures**: 节点的各项压力状态,`true` 表示存在压力。
"""
try:
return node_service.get_cluster_health_report(cluster)
except RuntimeError as e:
raise HTTPException(status_code=500, detail=str(e))

View File

@ -0,0 +1,39 @@
from pydantic import BaseModel
from fastapi import APIRouter, Depends, HTTPException
from ocdp.orchestration import Cluster, get_cluster
from ocdp.services.orchestration import node_service
# --- Response Models for this endpoint ---
class ResourceDetail(BaseModel):
total: str
used: str # 修正: 'used_by_system' -> 'used'
free: str # 修正: 'free_for_pods' -> 'free'
class GPUSummary(BaseModel):
total_count: int
allocatable_count: int
models_summary: dict[str, int]
class ClusterSummaryResponse(BaseModel):
cpu: ResourceDetail
memory: ResourceDetail
storage: ResourceDetail
gpu: GPUSummary
# --- Router Definition ---
router = APIRouter()
@router.get(
"/summary/resources",
response_model=ClusterSummaryResponse,
# summary="获取集群资源汇总"
)
def get_summary_resources(cluster: Cluster = Depends(get_cluster)):
"""
获取整个集群的资源聚合汇总报告。
"""
try:
return node_service.get_cluster_summary_report(cluster)
except RuntimeError as e:
raise HTTPException(status_code=500, detail=str(e))

View File

@ -0,0 +1,44 @@
from pydantic import BaseModel
from fastapi import APIRouter, Depends, HTTPException
from ocdp.orchestration import Cluster, get_cluster
from ocdp.services.orchestration import node_service
# --- Response Models for this endpoint ---
class ResourceDetail(BaseModel):
total: str
used: str # 修正: 'used_by_system' -> 'used'
free: str # 修正: 'free_for_pods' -> 'free'
class GPUInfo(BaseModel):
count: int
allocatable_count: int # 新增: 匹配 service 返回的可分配 GPU 数量
model: str
memory_mb: int
class NodeResourceDetail(BaseModel):
cpu: ResourceDetail
memory: ResourceDetail
storage: ResourceDetail
gpu: GPUInfo
NodeResourcesResponse = dict[str, NodeResourceDetail]
# --- Router Definition ---
router = APIRouter()
@router.get(
"/nodes/resources",
response_model=NodeResourcesResponse,
# summary="获取各节点资源详情"
)
def list_nodes_resources(cluster: Cluster = Depends(get_cluster)):
"""
获取集群中每个节点的详细资源使用报告。
- **used_by_system**: 被系统和 Kubelet 预留的资源。
- **free_for_pods**: 可供 Pod 调度的资源。
"""
try:
return node_service.get_per_node_resource_report(cluster)
except RuntimeError as e:
raise HTTPException(status_code=500, detail=str(e))

View File

@ -0,0 +1,15 @@
from fastapi import APIRouter
from . import (
register_user,
get_current_user
)
router = APIRouter(prefix="/users")
router.include_router(register_user.router)
router.include_router(get_current_user.router)

View File

@ -0,0 +1,39 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel, constr, EmailStr, validator
from sqlalchemy.orm import Session
from ocdp.database import get_db
from ocdp.services.user import user_service
from ocdp.services.user import user_exceptions
router = APIRouter()
class GetCurrentUserResponse(BaseModel):
"""
获取当前登录用户的信息 (Response Body)
"""
user_id: int
username: str
email: EmailStr
class Config:
# Pydantic V2 推荐的用法
from_attributes = True
# Pydantic V1 的旧用法
# orm_mode = True
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
@router.get("/me", response_model=GetCurrentUserResponse)
def get_current_user(
db: Session = Depends(get_db),
token: str = Depends(oauth2_scheme)
):
"""
获取当前登录用户的信息.
"""
current_user = user_service.get_current_user(token, db)
if not current_user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials")
return current_user

View File

@ -0,0 +1,79 @@
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, constr, EmailStr, validator
from sqlalchemy.orm import Session
from ocdp.database import get_db
from ocdp.services.user import user_service
from ocdp.services.user import user_exceptions
# 创建一个 API 路由器
router = APIRouter()
ALLOWED_PASSWORD_CHARS = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789@#$%&*_-")
class RegisterUserRequest(BaseModel):
"""
用户注册的请求体 (Request Body)
"""
username: str
password: str
email: EmailStr
@validator('password')
def validate_password(cls, v):
if len(v) < 8 or len(v) > 32:
raise ValueError('密码长度应在8~32位')
if not any(c.isalpha() for c in v):
raise ValueError('密码必须包含字母')
if not any(c.isdigit() for c in v):
raise ValueError('密码必须包含数字')
if any(c.isspace() for c in v):
raise ValueError('密码不能包含空格')
if any(c not in ALLOWED_PASSWORD_CHARS for c in v):
raise ValueError('密码包含非法字符')
return v
class RegisterUserResponse(BaseModel):
"""
成功注册后返回的用户信息 (Response Body)
不包含密码等敏感数据
"""
id: int
username: str
email: EmailStr
class Config:
# Pydantic V2 推荐的用法
from_attributes = True
# Pydantic V1 的旧用法
# orm_mode = True
@router.post("/", response_model=RegisterUserResponse, status_code=status.HTTP_201_CREATED)
def register_user(
user_in: RegisterUserRequest,
db: Session = Depends(get_db)
):
"""
注册一个新用户.
- **username**: 用户的唯一名称.
- **email**: 用户的唯一邮箱.
- **password**: 用户的密码.
"""
try:
# 调用 service 层的函数来创建用户
# user_in.dict() 将 Pydantic 模型转换为字典
created_user = user_service.create_user(
username=user_in.username,
password=user_in.password,
email=user_in.email,
db=db
)
return created_user
except user_exceptions.UserAlreadyExistsError as e:
# 捕获 service 层抛出的特定异常
# 返回 400 错误,并附带清晰的错误信息
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e),
)

View File

@ -0,0 +1,15 @@
from fastapi import APIRouter
router = APIRouter(prefix="/api/v2")