Files
ocdp/ocdp/services/user/helpers/token_handler.py
jackyliu c7f8e69d61 feat: Add orchestration models and services for Kubernetes cluster management
- Implemented Pydantic models for Kubernetes cluster state representation in `cluster.py`.
- Created a `Resource` class for converting JSON/dict to Python objects in `resource.py`.
- Established user models and services for user management, including password hashing and JWT token generation.
- Developed application orchestration services for managing Kubernetes applications, including installation and uninstallation.
- Added cluster service for retrieving cluster status and health reports.
- Introduced node service for fetching node resource details and health status.
- Implemented user service for handling user authentication and management.
2025-09-02 02:54:26 +00:00

73 lines
2.5 KiB
Python

import datetime
import logging
import os

import jwt

from ocdp.config import CONFIG
# JWT signing settings, read once at import time from the project's CONFIG object.
# NOTE(review): the earlier comment described these as coming from environment
# variables (optionally via a .env file and python-dotenv); presumably CONFIG is
# populated from them -- confirm in ocdp.config.
# These values are used as default arguments of the functions below, and Python
# evaluates defaults when a function is defined, so reassigning the constants
# later does not change what those functions use by default.
SECRET_KEY = CONFIG.token.jwt.secret_key
ALGORITHM = CONFIG.token.jwt.signing_algorithm
def generate_token(
    user_id: str,
    expires_delta: datetime.timedelta = datetime.timedelta(minutes=30),
    secret_key: str = SECRET_KEY,
    algorithm: str = ALGORITHM,
) -> str:
    """Generate a signed JWT for ``user_id`` that expires after ``expires_delta``.

    Args:
        user_id: Unique identifier of the user, stored in the ``sub`` claim.
            Non-string values (for example an integer primary key or a UUID)
            are converted with ``str()``.
        expires_delta: Lifetime of the token, counted from the moment of issue.
        secret_key: Key used to sign the token. Defaults to the module-level
            ``SECRET_KEY``.
        algorithm: Signing algorithm name. Defaults to the module-level
            ``ALGORITHM``.

    Returns:
        The encoded JWT.

    Raises:
        ValueError: If ``secret_key`` is empty or ``user_id`` is ``None``.
    """
    # Refuse to sign anything without a key.
    if not secret_key:
        raise ValueError("SECRET_KEY not found in environment variables.")
    # A token without a subject identifies nobody; fail at issue time rather
    # than handing out a token that cannot be tied to a user.
    if user_id is None:
        raise ValueError("user_id must not be None.")

    # RFC 7519 defines 'sub' as a string, and newer PyJWT releases reject a
    # non-string 'sub' when decoding. Coercing here keeps tokens issued for
    # integer/UUID identifiers verifiable (and JSON-serializable).
    subject = user_id if isinstance(user_id, str) else str(user_id)

    # Timezone-aware UTC timestamp; the expiry is derived from the same instant
    # so 'iat' and 'exp' are always exactly ``expires_delta`` apart.
    issue_time = datetime.datetime.now(datetime.timezone.utc)

    payload = {
        "sub": subject,                      # Subject: the user's identifier.
        "iat": issue_time,                   # Issued At: creation time.
        "exp": issue_time + expires_delta,   # Expiration Time.
    }
    return jwt.encode(payload, secret_key, algorithm=algorithm)
def verify_token(
    token: str,
    secret_key: str = SECRET_KEY,
    algorithm: str = ALGORITHM,
) -> dict | None:
    """Verify a JWT and return its decoded payload.

    Args:
        token: The encoded JWT to verify.
        secret_key: Key used to check the signature. Defaults to the
            module-level ``SECRET_KEY``.
        algorithm: The only signing algorithm that is accepted. Defaults to
            the module-level ``ALGORITHM``.

    Returns:
        The decoded payload as a dictionary when the token is valid, or
        ``None`` when it is expired or otherwise invalid (bad signature,
        malformed token, ...).

    Raises:
        ValueError: If ``secret_key`` is empty.
    """
    # Refuse to verify anything without a key.
    if not secret_key:
        raise ValueError("SECRET_KEY not found in environment variables.")

    # Failures are reported through the logging system instead of print() so
    # the application controls where they go; WARNING keeps them visible under
    # the default logging configuration.
    logger = logging.getLogger(__name__)

    try:
        # jwt.decode checks the signature and the expiration. Passing an
        # explicit algorithm list means a token signed with any other
        # algorithm is rejected.
        return jwt.decode(token, secret_key, algorithms=[algorithm])
    except jwt.ExpiredSignatureError:
        # Handled separately from the generic case so the log states the
        # specific reason; must come before InvalidTokenError to be reachable
        # if it is a subclass of it.
        logger.warning("Token verification failed: Token has expired.")
    except jwt.InvalidTokenError as exc:
        # Any other JWT error, such as a bad signature or a malformed token.
        logger.warning("Token verification failed: Invalid token. Error: %s", exc)
    return None