Files
beaver_project/app-instance/instance-registry.py
2026-06-03 12:06:34 +08:00

348 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import fcntl
import json
import socket
import sys
from contextlib import contextmanager
from pathlib import Path
from typing import Any
DEFAULT_REGISTRY = Path(__file__).resolve().parent / "runtime" / "registry" / "instances.json"
def _default_data() -> dict[str, Any]:
return {"version": 1, "instances": []}
def _normalize_record(record: dict[str, Any]) -> dict[str, Any]:
normalized = dict(record)
normalized["host_port"] = int(record.get("host_port", 0) or 0)
for key in (
"instance_id",
"instance_slug",
"container_name",
"image_name",
"public_url",
"instance_root",
"beaver_home",
"config_path",
"auth_users_path",
"network_name",
"backend_id",
"backend_name",
"authz_base_url",
"created_at",
"username",
"email",
"instance_host",
"frontend_base_url",
"api_base_url",
):
normalized[key] = str(record.get(key, "") or "")
return normalized
def read_registry(path: Path) -> dict[str, Any]:
if not path.exists():
return _default_data()
try:
data = json.loads(path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return _default_data()
if not isinstance(data, dict):
return _default_data()
if not isinstance(data.get("instances"), list):
data["instances"] = []
data["instances"] = [_normalize_record(item) for item in data["instances"] if isinstance(item, dict)]
return data
@contextmanager
def locked_registry(path: Path):
path.parent.mkdir(parents=True, exist_ok=True)
lock_path = path.with_suffix(".lock")
lock_path.touch(exist_ok=True)
with lock_path.open("r+", encoding="utf-8") as lock_file:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
if path.exists():
try:
data = json.loads(path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
data = _default_data()
else:
data = _default_data()
if not isinstance(data, dict):
data = _default_data()
if not isinstance(data.get("instances"), list):
data["instances"] = []
data["instances"] = [_normalize_record(item) for item in data["instances"] if isinstance(item, dict)]
try:
yield data
finally:
path.write_text(json.dumps(data, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
def _match(
record: dict[str, Any],
*,
instance_id: str | None,
slug: str | None,
container_name: str | None,
username: str | None,
instance_host: str | None,
) -> bool:
if instance_id and record.get("instance_id") == instance_id:
return True
if slug and record.get("instance_slug") == slug:
return True
if container_name and record.get("container_name") == container_name:
return True
if username and record.get("username") == username:
return True
if instance_host and record.get("instance_host") == instance_host:
return True
return False
def _get_record(
data: dict[str, Any],
*,
instance_id: str | None,
slug: str | None,
container_name: str | None,
username: str | None,
instance_host: str | None,
) -> dict[str, Any] | None:
for item in data["instances"]:
if _match(
item,
instance_id=instance_id,
slug=slug,
container_name=container_name,
username=username,
instance_host=instance_host,
):
return item
return None
def cmd_list(args: argparse.Namespace) -> int:
path = Path(args.registry).expanduser()
data = read_registry(path)
instances = list(data["instances"])
if args.json:
json.dump({"instances": instances}, sys.stdout, indent=2, ensure_ascii=False)
sys.stdout.write("\n")
return 0
for item in instances:
print(
"\t".join(
[
str(item.get("instance_id", "")),
str(item.get("instance_slug", "")),
str(item.get("container_name", "")),
str(item.get("host_port", "")),
str(item.get("public_url", "")),
str(item.get("instance_root", "")),
]
)
)
return 0
def cmd_get(args: argparse.Namespace) -> int:
path = Path(args.registry).expanduser()
data = read_registry(path)
record = _get_record(
data,
instance_id=args.instance_id,
slug=args.slug,
container_name=args.container_name,
username=args.username,
instance_host=args.instance_host,
)
if record is None:
return 1
json.dump(record, sys.stdout, indent=2, ensure_ascii=False)
sys.stdout.write("\n")
return 0
def cmd_upsert(args: argparse.Namespace) -> int:
path = Path(args.registry).expanduser()
record = {
"instance_id": args.instance_id,
"instance_slug": args.instance_slug,
"container_name": args.container_name,
"image_name": args.image_name,
"host_port": int(args.host_port),
"public_url": args.public_url,
"instance_root": args.instance_root,
"beaver_home": args.beaver_home,
"config_path": args.config_path,
"auth_users_path": args.auth_users_path,
"network_name": args.network_name or "",
"backend_id": args.backend_id or "",
"backend_name": args.backend_name or "",
"authz_base_url": args.authz_base_url or "",
"username": args.username or "",
"email": args.email or "",
"instance_host": args.instance_host or "",
"frontend_base_url": args.frontend_base_url or "",
"api_base_url": args.api_base_url or "",
"created_at": args.created_at,
}
with locked_registry(path) as data:
kept: list[dict[str, Any]] = []
for item in data["instances"]:
if _match(
item,
instance_id=args.instance_id,
slug=args.instance_slug,
container_name=args.container_name,
username=args.username,
instance_host=args.instance_host,
):
continue
kept.append(item)
kept.append(_normalize_record(record))
kept.sort(key=lambda item: str(item.get("instance_id", "")))
data["instances"] = kept
return 0
def cmd_remove(args: argparse.Namespace) -> int:
path = Path(args.registry).expanduser()
removed = False
removed_record: dict[str, Any] | None = None
with locked_registry(path) as data:
kept: list[dict[str, Any]] = []
for item in data["instances"]:
if removed:
kept.append(item)
continue
if _match(
item,
instance_id=args.instance_id,
slug=args.slug,
container_name=args.container_name,
username=args.username,
instance_host=args.instance_host,
):
removed = True
removed_record = item
continue
kept.append(item)
data["instances"] = kept
if not removed:
return 1
if args.print_removed and removed_record is not None:
json.dump(removed_record, sys.stdout, indent=2, ensure_ascii=False)
sys.stdout.write("\n")
return 0
def cmd_next_port(args: argparse.Namespace) -> int:
path = Path(args.registry).expanduser()
with locked_registry(path) as data:
reserved = {
int(item.get("host_port", 0))
for item in data["instances"]
if int(item.get("host_port", 0) or 0) > 0
and item.get("instance_id") != (args.exclude_instance_id or "")
}
for port in range(args.start, args.end + 1):
if port in reserved:
continue
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.bind((args.host, port))
except OSError:
continue
print(port)
return 0
print(f"no free port in range {args.start}-{args.end}", file=sys.stderr)
return 1
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Manage app-instance registry.")
parser.set_defaults(func=None)
parser.add_argument("--registry", default=str(DEFAULT_REGISTRY), help="Registry JSON path.")
subparsers = parser.add_subparsers(dest="command")
list_parser = subparsers.add_parser("list", help="List registered instances.")
list_parser.add_argument("--json", action="store_true", help="Output JSON.")
list_parser.set_defaults(func=cmd_list)
get_parser = subparsers.add_parser("get", help="Get one registered instance.")
get_parser.add_argument("--instance-id")
get_parser.add_argument("--slug")
get_parser.add_argument("--container-name")
get_parser.add_argument("--username")
get_parser.add_argument("--instance-host")
get_parser.set_defaults(func=cmd_get)
upsert_parser = subparsers.add_parser("upsert", help="Create or update one registry record.")
upsert_parser.add_argument("--instance-id", required=True)
upsert_parser.add_argument("--instance-slug", required=True)
upsert_parser.add_argument("--container-name", required=True)
upsert_parser.add_argument("--image-name", required=True)
upsert_parser.add_argument("--host-port", required=True, type=int)
upsert_parser.add_argument("--public-url", required=True)
upsert_parser.add_argument("--instance-root", required=True)
upsert_parser.add_argument("--beaver-home", required=True)
upsert_parser.add_argument("--config-path", required=True)
upsert_parser.add_argument("--auth-users-path", required=True)
upsert_parser.add_argument("--network-name", default="")
upsert_parser.add_argument("--backend-id", default="")
upsert_parser.add_argument("--backend-name", default="")
upsert_parser.add_argument("--authz-base-url", default="")
upsert_parser.add_argument("--username", default="")
upsert_parser.add_argument("--email", default="")
upsert_parser.add_argument("--instance-host", default="")
upsert_parser.add_argument("--frontend-base-url", default="")
upsert_parser.add_argument("--api-base-url", default="")
upsert_parser.add_argument("--created-at", required=True)
upsert_parser.set_defaults(func=cmd_upsert)
remove_parser = subparsers.add_parser("remove", help="Remove one registry record.")
remove_parser.add_argument("--instance-id")
remove_parser.add_argument("--slug")
remove_parser.add_argument("--container-name")
remove_parser.add_argument("--username")
remove_parser.add_argument("--instance-host")
remove_parser.add_argument("--print-removed", action="store_true")
remove_parser.set_defaults(func=cmd_remove)
port_parser = subparsers.add_parser("next-port", help="Find next free host port.")
port_parser.add_argument("--host", default="127.0.0.1")
port_parser.add_argument("--start", default=20000, type=int)
port_parser.add_argument("--end", default=29999, type=int)
port_parser.add_argument("--exclude-instance-id", default="")
port_parser.set_defaults(func=cmd_next_port)
return parser
def main() -> int:
parser = build_parser()
args = parser.parse_args()
if args.func is None:
parser.print_help()
return 1
return int(args.func(args) or 0)
if __name__ == "__main__":
raise SystemExit(main())