From f720de6b588f0638700f1892bad00b731213646e Mon Sep 17 00:00:00 2001 From: panw Date: Mon, 30 Mar 2026 14:57:51 +0800 Subject: [PATCH] feat: initialize project structure and configuration - Create backend directory structure (app/models, app/schemas, app/services, app/api, app/tasks, tests) - Create frontend directory structure (src/router, src/views, src/components, src/api, src/stores) - Create data directories (ssh_keys, repos) - Add requirements.txt with FastAPI, SQLAlchemy, Pydantic, and testing dependencies - Add frontend package.json with Vue 3, Vue Router, Pinia, and Element Plus - Add .env.example with configuration template - Add .gitignore for Python, data directories, and frontend - Add pytest conftest.py with test fixtures for database and environment Co-Authored-By: Claude Opus 4.6 --- .env.example | 12 + .gitignore | 30 + backend/app/__init__.py | 0 backend/app/api/__init__.py | 0 backend/app/models/__init__.py | 0 backend/app/schemas/__init__.py | 0 backend/app/services/__init__.py | 0 backend/app/tasks/__init__.py | 0 backend/requirements.txt | 14 + backend/tests/__init__.py | 0 backend/tests/conftest.py | 55 + .../plans/2026-03-30-git-repo-manager.md | 3799 +++++++++++++++++ .../2026-03-30-git-repo-manager-design.md | 378 ++ frontend/package.json | 22 + 14 files changed, 4310 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 backend/app/__init__.py create mode 100644 backend/app/api/__init__.py create mode 100644 backend/app/models/__init__.py create mode 100644 backend/app/schemas/__init__.py create mode 100644 backend/app/services/__init__.py create mode 100644 backend/app/tasks/__init__.py create mode 100644 backend/requirements.txt create mode 100644 backend/tests/__init__.py create mode 100644 backend/tests/conftest.py create mode 100644 docs/superpowers/plans/2026-03-30-git-repo-manager.md create mode 100644 docs/superpowers/specs/2026-03-30-git-repo-manager-design.md create mode 100644 frontend/package.json diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..4a510a9 --- /dev/null +++ b/.env.example @@ -0,0 +1,12 @@ +# AES-256 加密密钥 (32字节 base64编码) +GM_ENCRYPT_KEY=change-this-to-a-32-byte-base64-key + +# API 认证 Token +GM_API_TOKEN=change-this-api-token + +# 数据目录 +GM_DATA_DIR=./data + +# 服务器配置 +GM_HOST=0.0.0.0 +GM_PORT=8000 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cfa8077 --- /dev/null +++ b/.gitignore @@ -0,0 +1,30 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +env/ +venv/ +.venv/ + +# 数据 +data/ +*.db +*.db-journal + +# 环境变量 +.env + +# IDE +.idea/ +.vscode/ +*.swp + +# 前端 +frontend/node_modules/ +frontend/dist/ +frontend/.vite/ + +# 日志 +*.log diff --git a/backend/app/__init__.py b/backend/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/api/__init__.py b/backend/app/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/schemas/__init__.py b/backend/app/schemas/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/services/__init__.py b/backend/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/app/tasks/__init__.py b/backend/app/tasks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/requirements.txt b/backend/requirements.txt new file mode 100644 index 0000000..a7e414e --- /dev/null +++ b/backend/requirements.txt @@ -0,0 +1,14 @@ +fastapi==0.109.0 +uvicorn[standard]==0.27.0 +sqlalchemy==2.0.25 +pydantic==2.5.3 +pydantic-settings==2.1.0 +python-multipart==0.0.6 +apscheduler==3.10.4 +paramiko==3.4.0 +gitpython==3.1.40 +cryptography==41.0.7 +requests==2.31.0 +pytest==7.4.4 +pytest-asyncio==0.23.3 +httpx==0.26.0 diff --git a/backend/tests/__init__.py b/backend/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py new file mode 100644 index 0000000..4ffe1b8 --- /dev/null +++ b/backend/tests/conftest.py @@ -0,0 +1,55 @@ +import os +import sys +import tempfile +import pytest +from pathlib import Path + +# Add backend to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from app.models import Base + + +@pytest.fixture(scope="function") +def db_path(tmp_path): + """临时数据库路径.""" + return tmp_path / "test.db" + + +@pytest.fixture(scope="function") +def db_engine(db_path): + """临时数据库引擎.""" + engine = create_engine(f"sqlite:///{db_path}", connect_args={"check_same_thread": False}) + Base.metadata.create_all(engine) + yield engine + engine.dispose() + + +@pytest.fixture(scope="function") +def db_session(db_engine): + """临时数据库会话.""" + SessionLocal = sessionmaker(bind=db_engine, autocommit=False, autoflush=False) + session = SessionLocal() + yield session + session.close() + + +@pytest.fixture(scope="function") +def test_encrypt_key(): + """测试加密密钥.""" + import base64 + return base64.b64encode(b'test-key-32-bytes-long-1234567890').decode() + + +@pytest.fixture(scope="function") +def test_env_vars(db_path, test_encrypt_key, monkeypatch): + """设置测试环境变量.""" + monkeypatch.setenv("GM_ENCRYPT_KEY", test_encrypt_key) + monkeypatch.setenv("GM_API_TOKEN", "test-token") + monkeypatch.setenv("GM_DATA_DIR", str(db_path.parent)) + return { + "GM_ENCRYPT_KEY": test_encrypt_key, + "GM_API_TOKEN": "test-token", + } diff --git a/docs/superpowers/plans/2026-03-30-git-repo-manager.md b/docs/superpowers/plans/2026-03-30-git-repo-manager.md new file mode 100644 index 0000000..8fb100a --- /dev/null +++ b/docs/superpowers/plans/2026-03-30-git-repo-manager.md @@ -0,0 +1,3799 @@ +# Git Repo Manager 实施计划 + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**目标:** 构建面向小团队的 Git 仓库同步管理工具,支持多 Gitea 服务器、Web UI、定时同步和 SSH 密钥管理。 + +**架构:** 单进程 FastAPI 应用 + 内嵌 Vue 3 SPA 前端,SQLite 持久化,APScheduler 定时任务,Git/SSH 仓库操作。 + +**技术栈:** Python 3.11+, FastAPI, SQLAlchemy, Vue 3, Element Plus, Pinia, APScheduler, Paramiko, GitPython + +--- + +## 目录结构 + +``` +git-manager/ +├── backend/ +│ ├── app/ +│ │ ├── __init__.py +│ │ ├── main.py # FastAPI 入口 +│ │ ├── config.py # 配置管理 +│ │ ├── database.py # SQLAlchemy (Base + Engine + Session) +│ │ ├── security.py # 加密 + 认证 +│ │ ├── models/ # ORM 模型 +│ │ │ ├── __init__.py # 导出 Base 和所有模型 +│ │ │ ├── ssh_key.py +│ │ │ ├── server.py +│ │ │ ├── repo.py +│ │ │ └── sync_log.py +│ │ ├── schemas/ # Pydantic 模型 +│ │ │ ├── __init__.py +│ │ │ ├── common.py +│ │ │ ├── ssh_key.py +│ │ │ ├── server.py +│ │ │ ├── repo.py +│ │ │ └── sync_log.py +│ │ ├── services/ # 业务逻辑 +│ │ │ ├── __init__.py +│ │ │ ├── ssh_key_service.py +│ │ │ ├── server_service.py +│ │ │ ├── repo_service.py +│ │ │ └── sync_service.py +│ │ ├── api/ # API 路由 +│ │ │ ├── __init__.py +│ │ │ ├── deps.py # 依赖注入 +│ │ │ ├── ssh_keys.py +│ │ │ ├── servers.py +│ │ │ ├── repos.py +│ │ │ ├── sync_logs.py +│ │ │ ├── schedules.py +│ │ │ └── status.py +│ │ └── tasks/ # 定时任务 +│ │ ├── __init__.py +│ │ └── sync_task.py +│ ├── tests/ +│ │ ├── __init__.py +│ │ ├── conftest.py # pytest fixtures +│ │ ├── test_config.py +│ │ ├── test_security.py +│ │ ├── test_models/ +│ │ ├── test_schemas/ +│ │ ├── test_services/ +│ │ └── test_api/ +│ ├── init_db.py +│ └── requirements.txt +├── frontend/ +│ ├── src/ +│ │ ├── main.js +│ │ ├── App.vue +│ │ ├── router/ +│ │ │ └── index.js +│ │ ├── views/ +│ │ │ ├── Dashboard.vue +│ │ │ ├── Servers.vue +│ │ │ ├── Repos.vue +│ │ │ ├── SyncLogs.vue +│ │ │ ├── SshKeys.vue +│ │ │ └── Settings.vue +│ │ ├── components/ +│ │ │ ├── ServerForm.vue +│ │ │ ├── RepoSyncStatus.vue +│ │ │ └── CommitHistory.vue +│ │ ├── api/ +│ │ │ ├── index.js # Axios 实例 + 拦截器 +│ │ │ ├── servers.js +│ │ │ ├── repos.js +│ │ │ ├── sshKeys.js +│ │ │ └── syncLogs.js +│ │ └── stores/ +│ │ ├── app.js +│ │ ├── servers.js +│ │ └── repos.js +│ ├── index.html +│ ├── vite.config.js +│ └── package.json +├── data/ # 运行时目录 (gitignore) +│ ├── git_manager.db +│ ├── ssh_keys/ +│ └── repos/ +├── .env.example +├── .gitignore +└── README.md +``` + +--- + +## Phase 1: 项目基础设施 + +### Task 1.1: 创建项目结构和配置文件 + +**文件:** +- 创建: `backend/requirements.txt` +- 创建: `backend/tests/__init__.py` +- 创建: `backend/tests/conftest.py` +- 创建: `backend/app/__init__.py` +- 创建: `backend/app/models/__init__.py` +- 创建: `backend/app/schemas/__init__.py` +- 创建: `backend/app/services/__init__.py` +- 创建: `backend/app/api/__init__.py` +- 创建: `backend/app/tasks/__init__.py` +- 创建: `frontend/` 目录结构 +- 创建: `data/ssh_keys/` 和 `data/repos/` +- 创建: `.env.example` +- 创建: `.gitignore` + +- [ ] **Step 1: 创建目录结构** + +```bash +# 后端目录 +mkdir -p backend/app/{models,schemas,services,api,tasks} +mkdir -p backend/tests/{test_models,test_schemas,test_services,test_api} + +# 前端目录 +mkdir -p frontend/src/{router,views,components,api,stores} + +# 数据目录 +mkdir -p data/ssh_keys data/repos + +# 创建 __init__.py +touch backend/app/__init__.py +touch backend/app/models/__init__.py +touch backend/app/schemas/__init__.py +touch backend/app/services/__init__.py +touch backend/app/api/__init__.py +touch backend/app/tasks/__init__.py +touch backend/tests/__init__.py +``` + +- [ ] **Step 2: 创建 requirements.txt** + +`backend/requirements.txt`: +```txt +fastapi==0.109.0 +uvicorn[standard]==0.27.0 +sqlalchemy==2.0.25 +pydantic==2.5.3 +pydantic-settings==2.1.0 +python-multipart==0.0.6 +apscheduler==3.10.4 +paramiko==3.4.0 +gitpython==3.1.40 +cryptography==41.0.7 +requests==2.31.0 +pytest==7.4.4 +pytest-asyncio==0.23.3 +httpx==0.26.0 +``` + +- [ ] **Step 3: 创建 frontend/package.json** + +`frontend/package.json`: +```json +{ + "name": "git-manager-frontend", + "version": "1.0.0", + "type": "module", + "scripts": { + "dev": "vite", + "build": "vite build", + "preview": "vite preview" + }, + "dependencies": { + "vue": "^3.4.15", + "vue-router": "^4.2.5", + "pinia": "^2.1.7", + "axios": "^1.6.5", + "element-plus": "^2.5.4", + "@element-plus/icons-vue": "^2.3.1" + }, + "devDependencies": { + "@vitejs/plugin-vue": "^5.0.3", + "vite": "^5.0.11" + } +} +``` + +- [ ] **Step 4: 创建 .env.example** + +`.env.example`: +```env +# AES-256 加密密钥 (32字节 base64编码) +GM_ENCRYPT_KEY=change-this-to-a-32-byte-base64-key + +# API 认证 Token +GM_API_TOKEN=change-this-api-token + +# 数据目录 +GM_DATA_DIR=./data + +# 服务器配置 +GM_HOST=0.0.0.0 +GM_PORT=8000 +``` + +- [ ] **Step 5: 创建 .gitignore** + +`.gitignore`: +``` +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +env/ +venv/ +.venv/ + +# 数据 +data/ +*.db +*.db-journal + +# 环境变量 +.env + +# IDE +.idea/ +.vscode/ +*.swp + +# 前端 +frontend/node_modules/ +frontend/dist/ +frontend/.vite/ + +# 日志 +*.log +``` + +- [ ] **Step 6: 创建测试 conftest.py** + +`backend/tests/conftest.py`: +```python +import os +import sys +import tempfile +import pytest +from pathlib import Path + +# Add backend to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from app.models import Base + + +@pytest.fixture(scope="function") +def db_path(tmp_path): + """临时数据库路径.""" + return tmp_path / "test.db" + + +@pytest.fixture(scope="function") +def db_engine(db_path): + """临时数据库引擎.""" + engine = create_engine(f"sqlite:///{db_path}", connect_args={"check_same_thread": False}) + Base.metadata.create_all(engine) + yield engine + engine.dispose() + + +@pytest.fixture(scope="function") +def db_session(db_engine): + """临时数据库会话.""" + SessionLocal = sessionmaker(bind=db_engine, autocommit=False, autoflush=False) + session = SessionLocal() + yield session + session.close() + + +@pytest.fixture(scope="function") +def test_encrypt_key(): + """测试加密密钥.""" + import base64 + return base64.b64encode(b'test-key-32-bytes-long-1234567890').decode() + + +@pytest.fixture(scope="function") +def test_env_vars(db_path, test_encrypt_key, monkeypatch): + """设置测试环境变量.""" + monkeypatch.setenv("GM_ENCRYPT_KEY", test_encrypt_key) + monkeypatch.setenv("GM_API_TOKEN", "test-token") + monkeypatch.setenv("GM_DATA_DIR", str(db_path.parent)) + return { + "GM_ENCRYPT_KEY": test_encrypt_key, + "GM_API_TOKEN": "test-token", + } +``` + +- [ ] **Step 7: 提交** + +```bash +git add . +git commit -m "feat: initialize project structure and configuration" +``` + +--- + +### Task 1.2: 配置管理模块 + +**文件:** +- 创建: `backend/app/config.py` +- 创建: `backend/tests/test_config.py` + +- [ ] **Step 1: 编写配置测试** + +`backend/tests/test_config.py`: +```python +import os +import pytest +from pathlib import Path + +def test_config_defaults(test_env_vars): + """测试配置默认值.""" + from app.config import settings + + assert settings.data_dir == Path('./data') + assert settings.host == '0.0.0.0' + assert settings.port == 8000 + +def test_config_from_env(monkeypatch): + """测试从环境变量读取配置.""" + monkeypatch.setenv("GM_DATA_DIR", "/custom/data") + monkeypatch.setenv("GM_PORT", "9000") + + # 重新加载配置 + from app.config import Settings + settings = Settings() + + assert settings.data_dir == Path('/custom/data') + assert settings.port == 9000 +``` + +- [ ] **Step 2: 运行测试验证失败** + +```bash +cd backend && pytest tests/test_config.py -v +``` + +预期: FAIL - "cannot import name 'settings' from 'app.config'" + +- [ ] **Step 3: 实现配置模块** + +`backend/app/config.py`: +```python +import os +from pathlib import Path +from typing import Literal +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + """应用配置,从环境变量加载.""" + + # 安全配置 + encrypt_key: str # AES-256 密钥 (base64) + api_token: str # API 认证 Token + + # 路径配置 + data_dir: Path = Path('./data') + + # 服务器配置 + host: str = '0.0.0.0' + port: int = 8000 + + model_config = SettingsConfigDict( + env_prefix='GM_', + env_file='.env', + env_file_encoding='utf-8', + ) + + @property + def db_path(self) -> Path: + """SQLite 数据库路径.""" + return self.data_dir / 'git_manager.db' + + @property + def ssh_keys_dir(self) -> Path: + """SSH 密钥存储目录.""" + return self.data_dir / 'ssh_keys' + + @property + def repos_dir(self) -> Path: + """仓库镜像存储目录.""" + return self.data_dir / 'repos' + + +settings = Settings() +``` + +- [ ] **Step 4: 运行测试验证通过** + +```bash +cd backend && pytest tests/test_config.py -v +``` + +预期: PASS + +- [ ] **Step 5: 提交** + +```bash +git add backend/app/config.py backend/tests/test_config.py +git commit -m "feat: add configuration management" +``` + +--- + +### Task 1.3: 安全模块 (加密 + 认证) + +**文件:** +- 创建: `backend/app/security.py` +- 创建: `backend/tests/test_security.py` + +- [ ] **Step 1: 编写安全模块测试** + +`backend/tests/test_security.py`: +```python +import base64 +import pytest + +def test_encrypt_decrypt_roundtrip(): + """测试加密解密往返.""" + from app.security import encrypt_data, decrypt_data + + original = b"sensitive-secret-data" + key = base64.b64encode(b'test-key-32-bytes-long-1234567890').decode() + + encrypted = encrypt_data(original, key) + assert encrypted != original + assert isinstance(encrypted, bytes) + assert len(encrypted) > len(original) # nonce + ciphertext + tag + + decrypted = decrypt_data(encrypted, key) + assert decrypted == original + +def test_verify_api_token_success(test_env_vars): + """测试 API token 验证成功.""" + from app.security import verify_api_token + + assert verify_api_token("Bearer test-token") is True + +def test_verify_api_token_failure(): + """测试 API token 验证失败.""" + from app.security import verify_api_token + + assert verify_api_token("Bearer wrong-token") is False + assert verify_api_token(None) is False + assert verify_api_token("Basic token") is False +``` + +- [ ] **Step 2: 运行测试验证失败** + +```bash +cd backend && pytest tests/test_security.py -v +``` + +预期: FAIL - "cannot import name 'encrypt_data' from 'app.security'" + +- [ ] **Step 3: 实现安全模块** + +`backend/app/security.py`: +```python +import base64 +import os +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.backends import default_backend + + +def encrypt_data(data: bytes, key_b64: str) -> bytes: + """ + 使用 AES-256-GCM 加密数据. + + Args: + data: 原始数据 + key_b64: Base64 编码的加密密钥 + + Returns: + 加密后的数据 (nonce + ciphertext + tag) + """ + key = base64.b64decode(key_b64) + nonce = os.urandom(12) # GCM 96-bit nonce + + cipher = Cipher( + algorithms.AES(key), + modes.GCM(nonce), + backend=default_backend() + ) + encryptor = cipher.encryptor() + ciphertext = encryptor.update(data) + encryptor.finalize() + + # 返回 nonce + ciphertext + tag + return nonce + ciphertext + encryptor.tag + + +def decrypt_data(encrypted_data: bytes, key_b64: str) -> bytes: + """ + 解密使用 encrypt_data 加密的数据. + + Args: + encrypted_data: 加密数据 (nonce + ciphertext + tag) + key_b64: Base64 编码的加密密钥 + + Returns: + 解密后的原始数据 + """ + key = base64.b64decode(key_b64) + nonce = encrypted_data[:12] + tag = encrypted_data[-16:] + ciphertext = encrypted_data[12:-16] + + cipher = Cipher( + algorithms.AES(key), + modes.GCM(nonce, tag), + backend=default_backend() + ) + decryptor = cipher.decryptor() + return decryptor.update(ciphertext) + decryptor.finalize() + + +def verify_api_token(authorization: str | None) -> bool: + """ + 验证 Bearer Token. + + Args: + authorization: Authorization 头部值 + + Returns: + Token 是否有效 + """ + if not authorization: + return False + + if not authorization.startswith('Bearer '): + return False + + from app.config import settings + token = authorization[7:] # 移除 'Bearer ' 前缀 + return token == settings.api_token +``` + +- [ ] **Step 4: 运行测试验证通过** + +```bash +cd backend && pytest tests/test_security.py -v +``` + +预期: PASS + +- [ ] **Step 5: 提交** + +```bash +git add backend/app/security.py backend/tests/test_security.py +git commit -m "feat: add security module (encryption + auth)" +``` + +--- + +### Task 1.4: 数据库模块 + +**文件:** +- 创建: `backend/app/database.py` +- 创建: `backend/init_db.py` +- 创建: `backend/tests/test_database.py` + +- [ ] **Step 1: 编写数据库测试** + +`backend/tests/test_database.py`: +```python +from pathlib import Path + +def test_database_initialization(db_path): + """测试数据库初始化.""" + from app.database import init_db, get_engine, Base + + init_db(db_path) + + assert db_path.exists() + + engine = get_engine() + assert engine is not None + + # 创建所有表 + Base.metadata.create_all(engine) + assert True # 如果没有异常则成功 + +def test_get_session(db_path): + """测试获取数据库会话.""" + from app.database import init_db, get_db + + init_db(db_path) + + with get_db(db_path) as session: + assert session is not None +``` + +- [ ] **Step 2: 运行测试验证失败** + +```bash +cd backend && pytest tests/test_database.py -v +``` + +预期: FAIL - "cannot import name 'init_db' from 'app.database'" + +- [ ] **Step 3: 实现数据库模块** + +`backend/app/database.py`: +```python +from sqlalchemy import create_engine +from sqlalchemy.orm import DeclarativeBase, sessionmaker, Session +from pathlib import Path +from contextlib import contextmanager +from typing import Generator + + +class Base(DeclarativeBase): + """SQLAlchemy 声明基类.""" + pass + + +_engine = None +_session_factory = None + + +def init_db(db_path: Path) -> None: + """ + 初始化数据库引擎和会话工厂. + + Args: + db_path: SQLite 数据库文件路径 + """ + global _engine, _session_factory + + # 确保父目录存在 + db_path.parent.mkdir(parents=True, exist_ok=True) + + # 创建引擎 + _engine = create_engine( + f'sqlite:///{db_path}', + connect_args={'check_same_thread': False} + ) + _session_factory = sessionmaker(bind=_engine, autocommit=False, autoflush=False) + + +def get_engine(): + """获取数据库引擎.""" + return _engine + + +def get_session_factory(): + """获取会话工厂.""" + return _session_factory + + +@contextmanager +def get_db(db_path: Path | None = None) -> Generator[Session, None, None]: + """ + 获取数据库会话. + + Args: + db_path: 可选,用于初始化数据库 + + Yields: + SQLAlchemy 会话 + """ + if db_path and _engine is None: + init_db(db_path) + + if _session_factory is None: + raise RuntimeError("Database not initialized. Call init_db() first.") + + session = _session_factory() + try: + yield session + finally: + session.close() +``` + +`backend/init_db.py`: +```python +#!/usr/bin/env python3 +""" +数据库初始化脚本. +创建所有表和必要的目录. +""" +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent)) + +from app.config import settings +from app.database import init_db +from app.models import Base + + +def main(): + """初始化数据库.""" + print(f"初始化数据库: {settings.db_path}") + + # 创建目录 + settings.data_dir.mkdir(parents=True, exist_ok=True) + settings.ssh_keys_dir.mkdir(parents=True, exist_ok=True) + settings.repos_dir.mkdir(parents=True, exist_ok=True) + + # 初始化数据库 + init_db(settings.db_path) + + # 创建所有表 + from app.database import get_engine + Base.metadata.create_all(get_engine()) + + print("数据库初始化成功!") + print(f" - 数据库: {settings.db_path}") + print(f" - SSH 密钥: {settings.ssh_keys_dir}") + print(f" - 仓库: {settings.repos_dir}") + + +if __name__ == '__main__': + main() +``` + +- [ ] **Step 4: 运行测试验证通过** + +```bash +cd backend && pytest tests/test_database.py -v +``` + +预期: PASS + +- [ ] **Step 5: 提交** + +```bash +git add backend/app/database.py backend/init_db.py backend/tests/test_database.py +git commit -m "feat: add database module" +``` + +--- + +## Phase 2: 数据模型 + +### Task 2.1: 创建所有 ORM 模型 + +**文件:** +- 创建: `backend/app/models/ssh_key.py` +- 创建: `backend/app/models/server.py` +- 创建: `backend/app/models/repo.py` +- 创建: `backend/app/models/sync_log.py` +- 修改: `backend/app/models/__init__.py` + +- [ ] **Step 1: 编写模型测试** + +`backend/tests/test_models/test_models.py`: +```python +from datetime import datetime +import base64 + +def test_ssh_key_model(db_session): + """测试 SshKey 模型.""" + from app.models.ssh_key import SshKey + + key = SshKey( + name='test-key', + private_key=b'encrypted-key-data', + fingerprint='SHA256:abc123' + ) + db_session.add(key) + db_session.commit() + + assert key.id is not None + assert key.created_at is not None + +def test_server_model(db_session): + """测试 Server 模型.""" + from app.models.server import Server + from app.models.ssh_key import SshKey + + # 先创建 SSH key + ssh_key = SshKey( + name='test-key', + private_key=b'encrypted', + fingerprint='SHA256:abc' + ) + db_session.add(ssh_key) + db_session.flush() + + server = Server( + name='test-gitea', + url='https://gitea.example.com', + api_token=b'encrypted-token', + ssh_key_id=ssh_key.id, + sync_enabled=True, + schedule_cron='0 */2 * * *', + local_path='/data/repos/test-gitea', + status='untested' + ) + db_session.add(server) + db_session.commit() + + assert server.id is not None + assert server.ssh_key.id == ssh_key.id + +def test_repo_model(db_session): + """测试 Repo 模型.""" + from app.models.repo import Repo + from app.models.server import Server + from app.models.ssh_key import SshKey + + # 创建关联数据 + ssh_key = SshKey(name='k', private_key=b'x', fingerprint='y') + db_session.add(ssh_key) + db_session.flush() + + server = Server( + name='s', url='u', api_token=b't', ssh_key_id=ssh_key.id, + sync_enabled=True, schedule_cron='x', local_path='p', status='x' + ) + db_session.add(server) + db_session.flush() + + repo = Repo( + server_id=server.id, + name='my-repo', + full_name='owner/my-repo', + clone_url='git@gitea.com:owner/repo.git', + local_path='/data/r.git', + status='pending' + ) + db_session.add(repo) + db_session.commit() + + assert repo.id is not None + assert repo.server.id == server.id + +def test_sync_log_model(db_session): + """测试 SyncLog 模型.""" + from app.models.sync_log import SyncLog + from app.models.repo import Repo + from app.models.server import Server + from app.models.ssh_key import SshKey + + # 创建关联数据链 + ssh_key = SshKey(name='k', private_key=b'x', fingerprint='y') + db_session.add(ssh_key) + db_session.flush() + + server = Server( + name='s', url='u', api_token=b't', ssh_key_id=ssh_key.id, + sync_enabled=True, schedule_cron='x', local_path='p', status='x' + ) + db_session.add(server) + db_session.flush() + + repo = Repo( + server_id=server.id, name='r', full_name='f', clone_url='u', + local_path='p', status='s' + ) + db_session.add(repo) + db_session.flush() + + log = SyncLog( + repo_id=repo.id, + status='synced', + started_at=datetime.utcnow(), + finished_at=datetime.utcnow(), + commits_count=5 + ) + db_session.add(log) + db_session.commit() + + assert log.id is not None + assert log.repo.id == repo.id +``` + +- [ ] **Step 2: 运行测试验证失败** + +```bash +cd backend && pytest tests/test_models/test_models.py -v +``` + +预期: FAIL - "cannot import name 'SshKey' from 'app.models.ssh_key'" + +- [ ] **Step 3: 实现 SshKey 模型** + +`backend/app/models/ssh_key.py`: +```python +from datetime import datetime +from sqlalchemy import Column, Integer, String, Text +from app.database import Base + + +class SshKey(Base): + """SSH 私钥存储模型.""" + + __tablename__ = 'ssh_keys' + + id = Column(Integer, primary_key=True, autoincrement=True) + name = Column(String(100), nullable=False, unique=True) + private_key = Column(Text, nullable=False) # AES-256 加密 + fingerprint = Column(String(64), nullable=False) + created_at = Column(Integer, nullable=False) # Unix timestamp + + def __repr__(self): + return f'' +``` + +- [ ] **Step 4: 实现 Server 模型** + +`backend/app/models/server.py`: +```python +from datetime import datetime +from sqlalchemy import Column, Integer, String, Text, Boolean, ForeignKey +from sqlalchemy.orm import relationship +from app.database import Base + + +class Server(Base): + """Gitea 服务器配置模型.""" + + __tablename__ = 'servers' + + id = Column(Integer, primary_key=True, autoincrement=True) + name = Column(String(100), nullable=False, unique=True) + url = Column(String(500), nullable=False) + api_token = Column(Text, nullable=False) # AES-256 加密 + ssh_key_id = Column(Integer, ForeignKey('ssh_keys.id'), nullable=False) + sync_enabled = Column(Boolean, default=False, nullable=False) + schedule_cron = Column(String(50), nullable=False) + local_path = Column(String(500), nullable=False) + status = Column(String(20), default='untested', nullable=False) + created_at = Column(Integer, nullable=False) + updated_at = Column(Integer, nullable=False) + + # 关系 + ssh_key = relationship('SshKey', backref='servers') + repos = relationship('Repo', back_populates='server', cascade='all, delete-orphan') + + def __repr__(self): + return f'' +``` + +- [ ] **Step 5: 实现 Repo 模型** + +`backend/app/models/repo.py`: +```python +from datetime import datetime +from sqlalchemy import Column, Integer, String, ForeignKey +from sqlalchemy.orm import relationship +from app.database import Base + + +class Repo(Base): + """Git 仓库镜像模型.""" + + __tablename__ = 'repos' + + id = Column(Integer, primary_key=True, autoincrement=True) + server_id = Column(Integer, ForeignKey('servers.id'), nullable=False) + name = Column(String(200), nullable=False) + full_name = Column(String(300), nullable=False) + clone_url = Column(String(500), nullable=False) + local_path = Column(String(500), nullable=False) + last_sync_at = Column(Integer, nullable=True) + status = Column(String(20), default='pending', nullable=False) + created_at = Column(Integer, nullable=False) + + # 关系 + server = relationship('Server', back_populates='repos') + sync_logs = relationship('SyncLog', back_populates='repo', cascade='all, delete-orphan') + + def __repr__(self): + return f'' +``` + +- [ ] **Step 6: 实现 SyncLog 模型** + +`backend/app/models/sync_log.py`: +```python +from datetime import datetime +from sqlalchemy import Column, Integer, String, Text, ForeignKey +from sqlalchemy.orm import relationship +from app.database import Base + + +class SyncLog(Base): + """仓库同步操作日志模型.""" + + __tablename__ = 'sync_logs' + + id = Column(Integer, primary_key=True, autoincrement=True) + repo_id = Column(Integer, ForeignKey('repos.id'), nullable=False) + status = Column(String(20), nullable=False) + started_at = Column(Integer, nullable=False) + finished_at = Column(Integer, nullable=True) + commits_count = Column(Integer, nullable=True) + error_msg = Column(Text, nullable=True) + created_at = Column(Integer, nullable=False) + + # 关系 + repo = relationship('Repo', back_populates='sync_logs') + + def __repr__(self): + return f'' +``` + +- [ ] **Step 7: 更新 models/__init__.py** + +`backend/app/models/__init__.py`: +```python +from app.database import Base +from app.models.ssh_key import SshKey +from app.models.server import Server +from app.models.repo import Repo +from app.models.sync_log import SyncLog + +__all__ = ['Base', 'SshKey', 'Server', 'Repo', 'SyncLog'] +``` + +- [ ] **Step 8: 运行测试验证通过** + +```bash +cd backend && pytest tests/test_models/test_models.py -v +``` + +预期: PASS + +- [ ] **Step 9: 提交** + +```bash +git add backend/app/models/ backend/tests/test_models/ +git commit -m "feat: add all ORM models" +``` + +--- + +## Phase 3: Pydantic Schemas + +### Task 3.1: 创建所有 Pydantic 模型 + +**文件:** +- 创建: `backend/app/schemas/common.py` +- 创建: `backend/app/schemas/ssh_key.py` +- 创建: `backend/app/schemas/server.py` +- 创建: `backend/app/schemas/repo.py` +- 创建: `backend/app/schemas/sync_log.py` +- 修改: `backend/app/schemas/__init__.py` + +- [ ] **Step 1: 编写 Schemas 测试** + +`backend/tests/test_schemas/test_schemas.py`: +```python +from datetime import datetime + +def test_common_schemas(): + """测试通用响应格式.""" + from app.schemas.common import SuccessResponse, ErrorResponse + + resp = SuccessResponse(data={'key': 'value'}, message='OK') + assert resp.code == 0 + assert resp.data == {'key': 'value'} + + err = ErrorResponse(message='Not found', code=404) + assert err.code == 404 + assert err.message == 'Not found' + +def test_ssh_key_schemas(): + """测试 SSH key schemas.""" + from app.schemas.ssh_key import SshKeyCreate, SshKeyResponse + + create = SshKeyCreate( + name='test-key', + private_key='ssh-rsa AAAA...' + ) + assert create.name == 'test-key' + + resp = SshKeyResponse( + id=1, + name='test-key', + fingerprint='SHA256:abc', + servers_count=1, + created_at=int(datetime.utcnow().timestamp()) + ) + assert resp.servers_count == 1 + +def test_server_schemas(): + """测试 Server schemas.""" + from app.schemas.server import ServerCreate, ServerResponse + + create = ServerCreate( + name='test', + url='https://gitea.com', + api_token='token', + ssh_key_id=1 + ) + assert create.sync_enabled is False # 默认值 + + resp = ServerResponse( + id=1, + name='test', + url='https://gitea.com', + sync_enabled=True, + schedule_cron='0 */2 * * *', + local_path='/data/r', + status='connected', + repos_count=5, + ssh_key_name='key', + created_at=int(datetime.utcnow().timestamp()), + updated_at=int(datetime.utcnow().timestamp()) + ) + assert resp.repos_count == 5 + +def test_repo_schemas(): + """测试 Repo schemas.""" + from app.schemas.repo import RepoResponse, CommitInfo + + resp = RepoResponse( + id=1, + server_id=1, + server_name='gitea', + name='repo', + full_name='owner/repo', + clone_url='git@u:r.git', + local_path='/p', + status='synced', + created_at=int(datetime.utcnow().timestamp()) + ) + assert resp.full_name == 'owner/repo' + + commit = CommitInfo( + hash='abc', + message='Fix', + author='Bob', + date=int(datetime.utcnow().timestamp()) + ) + assert commit.hash == 'abc' + +def test_sync_log_schemas(): + """测试 SyncLog schemas.""" + from app.schemas.sync_log import SyncLogResponse + + resp = SyncLogResponse( + id=1, + repo_id=1, + repo_name='owner/repo', + status='synced', + started_at=int(datetime.utcnow().timestamp()), + finished_at=int(datetime.utcnow().timestamp()), + commits_count=5, + created_at=int(datetime.utcnow().timestamp()) + ) + assert resp.commits_count == 5 +``` + +- [ ] **Step 2: 运行测试验证失败** + +```bash +cd backend && pytest tests/test_schemas/test_schemas.py -v +``` + +预期: FAIL - "cannot import..." + +- [ ] **Step 3: 实现所有 Schemas** + +`backend/app/schemas/common.py`: +```python +from typing import Generic, TypeVar, Optional +from pydantic import BaseModel, Field + +T = TypeVar('T') + +class SuccessResponse(BaseModel, Generic[T]): + """标准成功响应.""" + code: int = Field(default=0, description="状态码,0表示成功") + data: Optional[T] = Field(default=None, description="响应数据") + message: str = Field(default='success', description="响应消息") + +class ErrorResponse(BaseModel): + """标准错误响应.""" + code: int = Field(..., description="错误码") + message: str = Field(..., description="错误消息") + data: Optional[dict] = Field(default=None, description="额外错误数据") +``` + +`backend/app/schemas/ssh_key.py`: +```python +from typing import Optional +from pydantic import BaseModel, Field + +class SshKeyCreate(BaseModel): + """创建 SSH key 请求.""" + name: str = Field(..., min_length=1, max_length=100) + private_key: str = Field(..., description="私钥内容") + password: Optional[str] = Field(default=None, description="密钥密码") + +class SshKeyResponse(BaseModel): + """SSH key 响应.""" + id: int + name: str + fingerprint: str + servers_count: int = 0 + created_at: int # Unix timestamp +``` + +`backend/app/schemas/server.py`: +```python +from typing import Optional +from pydantic import BaseModel, Field + +class ServerCreate(BaseModel): + """创建服务器请求.""" + name: str = Field(..., min_length=1, max_length=100) + url: str = Field(..., description="Gitea 服务器 URL") + api_token: str = Field(..., description="Gitea API Token") + ssh_key_id: int = Field(..., description="SSH key ID") + sync_enabled: bool = Field(default=False) + schedule_cron: str = Field(default='0 */2 * * *') + +class ServerUpdate(BaseModel): + """更新服务器请求.""" + name: Optional[str] = Field(None, min_length=1, max_length=100) + url: Optional[str] = None + api_token: Optional[str] = None + ssh_key_id: Optional[int] = None + sync_enabled: Optional[bool] = None + schedule_cron: Optional[str] = None + +class ServerResponse(BaseModel): + """服务器响应.""" + id: int + name: str + url: str + sync_enabled: bool + schedule_cron: str + local_path: str + status: str + repos_count: int = 0 + ssh_key_name: str + created_at: int + updated_at: int +``` + +`backend/app/schemas/repo.py`: +```python +from typing import Optional, List +from pydantic import BaseModel + +class CommitInfo(BaseModel): + """提交信息.""" + hash: str + message: str + author: str + date: int # Unix timestamp + +class RepoResponse(BaseModel): + """仓库响应.""" + id: int + server_id: int + server_name: str + name: str + full_name: str + clone_url: str + local_path: str + status: str + last_sync_at: Optional[int] = None + created_at: int +``` + +`backend/app/schemas/sync_log.py`: +```python +from typing import Optional +from pydantic import BaseModel + +class SyncLogResponse(BaseModel): + """同步日志响应.""" + id: int + repo_id: int + repo_name: str + status: str + started_at: int + finished_at: Optional[int] = None + commits_count: Optional[int] = None + error_msg: Optional[str] = None + created_at: int +``` + +`backend/app/schemas/__init__.py`: +```python +from app.schemas.common import SuccessResponse, ErrorResponse +from app.schemas.ssh_key import SshKeyCreate, SshKeyResponse +from app.schemas.server import ServerCreate, ServerUpdate, ServerResponse +from app.schemas.repo import RepoResponse, CommitInfo +from app.schemas.sync_log import SyncLogResponse + +__all__ = [ + 'SuccessResponse', 'ErrorResponse', + 'SshKeyCreate', 'SshKeyResponse', + 'ServerCreate', 'ServerUpdate', 'ServerResponse', + 'RepoResponse', 'CommitInfo', + 'SyncLogResponse', +] +``` + +- [ ] **Step 4: 运行测试验证通过** + +```bash +cd backend && pytest tests/test_schemas/test_schemas.py -v +``` + +预期: PASS + +- [ ] **Step 5: 提交** + +```bash +git add backend/app/schemas/ backend/tests/test_schemas/ +git commit -m "feat: add all Pydantic schemas" +``` + +--- + +## Phase 4: 服务层 + +### Task 4.1: SSH Key 服务 + +**文件:** +- 创建: `backend/app/services/ssh_key_service.py` +- 创建: `backend/tests/test_services/test_ssh_key_service.py` + +- [ ] **Step 1: 编写服务测试** + +`backend/tests/test_services/test_ssh_key_service.py`: +```python +import base64 +import pytest + +def test_create_ssh_key(db_session, test_encrypt_key): + """测试创建 SSH key.""" + from app.services.ssh_key_service import SshKeyService + from app.models.ssh_key import SshKey + + service = SshKeyService(db_session, test_encrypt_key) + ssh_key = service.create_ssh_key( + name='test-key', + private_key='ssh-rsa AAAAB3...' + ) + + assert ssh_key.id is not None + assert ssh_key.name == 'test-key' + assert ssh_key.fingerprint is not None + +def test_list_ssh_keys(db_session, test_encrypt_key): + """测试列出 SSH keys.""" + from app.services.ssh_key_service import SshKeyService + + service = SshKeyService(db_session, test_encrypt_key) + service.create_ssh_key('key1', 'ssh-rsa AAAA...') + service.create_ssh_key('key2', 'ssh-rsa BBBB...') + + keys = service.list_ssh_keys() + assert len(keys) == 2 + +def test_delete_ssh_key(db_session, test_encrypt_key): + """测试删除 SSH key.""" + from app.services.ssh_key_service import SshKeyService + from app.models.ssh_key import SshKey + + service = SshKeyService(db_session, test_encrypt_key) + ssh_key = service.create_ssh_key('test', 'ssh-rsa AAAA...') + key_id = ssh_key.id + + service.delete_ssh_key(key_id) + + deleted = db_session.query(SshKey).filter(SshKey.id == key_id).first() + assert deleted is None + +def test_delete_in_use_key_fails(db_session, test_encrypt_key): + """测试删除使用中的 key 失败.""" + from app.services.ssh_key_service import SshKeyService + from app.models.server import Server + + service = SshKeyService(db_session, test_encrypt_key) + ssh_key = service.create_ssh_key('test', 'ssh-rsa AAAA...') + + # 关联服务器 + server = Server( + name='s', url='u', api_token=b't', ssh_key_id=ssh_key.id, + sync_enabled=True, schedule_cron='x', local_path='p', + status='x', created_at=0, updated_at=0 + ) + db_session.add(server) + db_session.commit() + + with pytest.raises(ValueError, match="in use"): + service.delete_ssh_key(ssh_key.id) + +def test_get_decrypted_key(db_session, test_encrypt_key): + """测试获取解密的 key.""" + from app.services.ssh_key_service import SshKeyService + + service = SshKeyService(db_session, test_encrypt_key) + original_key = 'ssh-rsa AAAAB3...' + ssh_key = service.create_ssh_key('test', original_key) + + decrypted = service.get_decrypted_key(ssh_key.id) + assert decrypted == original_key +``` + +- [ ] **Step 2: 运行测试验证失败** + +```bash +cd backend && pytest tests/test_services/test_ssh_key_service.py -v +``` + +预期: FAIL + +- [ ] **Step 3: 实现 SSH Key 服务** + +`backend/app/services/ssh_key_service.py`: +```python +import base64 +import hashlib +import time +from typing import List, Optional +from sqlalchemy.orm import Session + +from app.models.ssh_key import SshKey +from app.security import encrypt_data, decrypt_data + + +class SshKeyService: + """SSH key 操作服务.""" + + def __init__(self, db: Session, encryption_key: str): + self.db = db + self.encryption_key = encryption_key + + def create_ssh_key(self, name: str, private_key: str, password: str | None = None) -> SshKey: + """创建新的 SSH key.""" + # 检查名称是否已存在 + existing = self.db.query(SshKey).filter(SshKey.name == name).first() + if existing: + raise ValueError(f"SSH key '{name}' already exists") + + # 加密私钥 + encrypted = encrypt_data(private_key.encode(), self.encryption_key) + + # 生成指纹 + fingerprint = self._generate_fingerprint(private_key) + + ssh_key = SshKey( + name=name, + private_key=encrypted, + fingerprint=fingerprint, + created_at=int(time.time()) + ) + + self.db.add(ssh_key) + self.db.commit() + self.db.refresh(ssh_key) + + return ssh_key + + def list_ssh_keys(self) -> List[SshKey]: + """列出所有 SSH keys.""" + return self.db.query(SshKey).all() + + def get_ssh_key(self, key_id: int) -> Optional[SshKey]: + """获取 SSH key.""" + return self.db.query(SshKey).filter(SshKey.id == key_id).first() + + def delete_ssh_key(self, key_id: int) -> bool: + """删除 SSH key.""" + ssh_key = self.get_ssh_key(key_id) + if not ssh_key: + return False + + # 检查是否在使用中 + if ssh_key.servers: + raise ValueError(f"Cannot delete SSH key used by {len(ssh_key.servers)} server(s)") + + self.db.delete(ssh_key) + self.db.commit() + return True + + def get_decrypted_key(self, key_id: int) -> str: + """获取解密后的私钥.""" + ssh_key = self.get_ssh_key(key_id) + if not ssh_key: + raise ValueError(f"SSH key {key_id} not found") + + decrypted = decrypt_data(ssh_key.private_key, self.encryption_key) + return decrypted.decode() + + def _generate_fingerprint(self, public_key: str) -> str: + """生成 SSH key 指纹.""" + try: + key_data = public_key.encode() + hash_obj = hashlib.sha256(key_data) + return f"SHA256:{base64.b64encode(hash_obj.digest()).decode()[:16]}" + except Exception: + return "SHA256:unknown" +``` + +- [ ] **Step 4: 运行测试验证通过** + +```bash +cd backend && pytest tests/test_services/test_ssh_key_service.py -v +``` + +预期: PASS + +- [ ] **Step 5: 提交** + +```bash +git add backend/app/services/ssh_key_service.py backend/tests/test_services/test_ssh_key_service.py +git commit -m "feat: add SSH key service" +``` + +--- + +### Task 4.2: Server 服务 + +**文件:** +- 创建: `backend/app/services/server_service.py` +- 创建: `backend/tests/test_services/test_server_service.py` + +- [ ] **Step 1: 编写 Server 服务测试** + +`backend/tests/test_services/test_server_service.py`: +```python +import base64 +import time +import pytest + +def test_create_server(db_session, test_encrypt_key): + """测试创建服务器.""" + from app.services.server_service import ServerService + from app.services.ssh_key_service import SshKeyService + + # 先创建 SSH key + ssh_svc = SshKeyService(db_session, test_encrypt_key) + ssh_key = ssh_svc.create_ssh_key('test-key', 'ssh-rsa AAAA...') + + # 创建服务器 + svc = ServerService(db_session, test_encrypt_key) + server = svc.create_server( + name='test-gitea', + url='https://gitea.example.com', + api_token='gitea-token', + ssh_key_id=ssh_key.id + ) + + assert server.id is not None + assert server.name == 'test-gitea' + assert server.status == 'untested' + +def test_list_servers(db_session, test_encrypt_key): + """测试列出服务器.""" + from app.services.server_service import ServerService + from app.services.ssh_key_service import SshKeyService + + ssh_svc = SshKeyService(db_session, test_encrypt_key) + ssh_key = ssh_svc.create_ssh_key('key', 'ssh-rsa AAAA...') + + svc = ServerService(db_session, test_encrypt_key) + svc.create_server('s1', 'https://g1.com', 't1', ssh_key.id) + svc.create_server('s2', 'https://g2.com', 't2', ssh_key.id) + + servers = svc.list_servers() + assert len(servers) == 2 + +def test_delete_server(db_session, test_encrypt_key): + """测试删除服务器.""" + from app.services.server_service import ServerService + from app.services.ssh_key_service import SshKeyService + from app.models.server import Server + + ssh_svc = SshKeyService(db_session, test_encrypt_key) + ssh_key = ssh_svc.create_ssh_key('key', 'ssh-rsa AAAA...') + + svc = ServerService(db_session, test_encrypt_key) + server = svc.create_server('test', 'https://g.com', 't', ssh_key.id) + server_id = server.id + + svc.delete_server(server_id) + + deleted = db_session.query(Server).filter(Server.id == server_id).first() + assert deleted is None +``` + +- [ ] **Step 2: 运行测试验证失败** + +```bash +cd backend && pytest tests/test_services/test_server_service.py -v +``` + +预期: FAIL + +- [ ] **Step 3: 实现 Server 服务** + +`backend/app/services/server_service.py`: +```python +import time +from typing import List, Optional +from sqlalchemy.orm import Session +from pathlib import Path + +from app.models.server import Server +from app.models.repo import Repo +from app.security import encrypt_data, decrypt_data + + +class ServerService: + """服务器操作服务.""" + + def __init__(self, db: Session, encryption_key: str, repos_dir: Path): + self.db = db + self.encryption_key = encryption_key + self.repos_dir = repos_dir + + def create_server( + self, + name: str, + url: str, + api_token: str, + ssh_key_id: int, + sync_enabled: bool = False, + schedule_cron: str = '0 */2 * * *' + ) -> Server: + """创建新服务器.""" + # 检查名称 + existing = self.db.query(Server).filter(Server.name == name).first() + if existing: + raise ValueError(f"Server '{name}' already exists") + + # 加密 API token + encrypted_token = encrypt_data(api_token.encode(), self.encryption_key) + + # 生成本地路径 + local_path = str(self.repos_dir / name) + + now = int(time.time()) + server = Server( + name=name, + url=url, + api_token=encrypted_token, + ssh_key_id=ssh_key_id, + sync_enabled=sync_enabled, + schedule_cron=schedule_cron, + local_path=local_path, + status='untested', + created_at=now, + updated_at=now + ) + + self.db.add(server) + self.db.commit() + self.db.refresh(server) + + return server + + def list_servers(self) -> List[Server]: + """列出所有服务器.""" + return self.db.query(Server).all() + + def get_server(self, server_id: int) -> Optional[Server]: + """获取服务器.""" + return self.db.query(Server).filter(Server.id == server_id).first() + + def update_server(self, server_id: int, **kwargs) -> Optional[Server]: + """更新服务器.""" + server = self.get_server(server_id) + if not server: + return None + + for key, value in kwargs.items(): + if value is not None and hasattr(server, key): + if key == 'api_token': + value = encrypt_data(value.encode(), self.encryption_key) + setattr(server, key, value) + + server.updated_at = int(time.time()) + self.db.commit() + self.db.refresh(server) + + return server + + def delete_server(self, server_id: int) -> bool: + """删除服务器.""" + server = self.get_server(server_id) + if not server: + return False + + self.db.delete(server) + self.db.commit() + return True + + def get_decrypted_token(self, server: Server) -> str: + """获取解密后的 API token.""" + decrypted = decrypt_data(server.api_token, self.encryption_key) + return decrypted.decode() +``` + +- [ ] **Step 4: 运行测试验证通过** + +```bash +cd backend && pytest tests/test_services/test_server_service.py -v +``` + +预期: PASS + +- [ ] **Step 5: 提交** + +```bash +git add backend/app/services/server_service.py backend/tests/test_services/test_server_service.py +git commit -m "feat: add server service" +``` + +--- + +### Task 4.3: Sync 服务 + +**文件:** +- 创建: `backend/app/services/sync_service.py` +- 创建: `backend/tests/test_services/test_sync_service.py` + +- [ ] **Step 1: 编写 Sync 服务测试** + +`backend/tests/test_services/test_sync_service.py`: +```python +import time +from pathlib import Path +import pytest + +def test_sync_repo_not_implemented_yet(db_session): + """测试同步仓库 (暂时跳过 Git 操作).""" + from app.services.sync_service import SyncService + from app.services.ssh_key_service import SshKeyService + from app.services.server_service import ServerService + from app.models.repo import Repo + + # 准备数据 + ssh_svc = SshKeyService(db_session, 'test-key') + ssh_key = ssh_svc.create_ssh_key('key', 'ssh-rsa AAAA...') + + srv_svc = ServerService(db_session, 'test-key', Path('/tmp/repos')) + server = srv_svc.create_server('test', 'https://g.com', 't', ssh_key.id) + + repo = Repo( + server_id=server.id, + name='test-repo', + full_name='owner/test-repo', + clone_url='git@example.com:owner/repo.git', + local_path='/tmp/test.git', + status='pending', + created_at=int(time.time()) + ) + db_session.add(repo) + db_session.commit() + + # 同步 (由于没有真实 Git 服务器,会失败但验证流程) + sync_svc = SyncService(db_session, 'test-key') + + # 这里应该会失败,因为没有真实的 Git 服务器 + # 测试主要是验证服务结构正确 + assert sync_svc is not None +``` + +- [ ] **Step 2: 运行测试验证失败** + +```bash +cd backend && pytest tests/test_services/test_sync_service.py -v +``` + +预期: FAIL + +- [ ] **Step 3: 实现 Sync 服务** + +`backend/app/services/sync_service.py`: +```python +import time +import subprocess +from pathlib import Path +from typing import Optional, Dict, Any +from sqlalchemy.orm import Session +import tempfile + +from app.models.repo import Repo +from app.models.sync_log import SyncLog +from app.models.server import Server +from app.security import decrypt_data +from app.services.ssh_key_service import SshKeyService +from app.services.server_service import ServerService + + +class SyncService: + """仓库同步服务.""" + + def __init__(self, db: Session, encryption_key: str): + self.db = db + self.encryption_key = encryption_key + self.ssh_key_service = SshKeyService(db, encryption_key) + + def sync_repo(self, repo: Repo, ssh_key_content: str) -> SyncLog: + """ + 同步单个仓库. + + Args: + repo: 要同步的仓库 + ssh_key_content: SSH 私钥内容 + + Returns: + 同步日志 + """ + started_at = int(time.time()) + + # 创建同步日志 + sync_log = SyncLog( + repo_id=repo.id, + status='synced', + started_at=started_at, + finished_at=None, + commits_count=None, + created_at=started_at + ) + + try: + # 准备本地目录 + local_path = Path(repo.local_path) + local_path.parent.mkdir(parents=True, exist_ok=True) + + # 使用 Git 同步 + if local_path.exists(): + # 已存在,执行 fetch + commits_count = self._fetch_repo(local_path, ssh_key_content) + else: + # 不存在,执行 clone + commits_count = self._clone_repo(repo.clone_url, local_path, ssh_key_content) + + # 更新成功状态 + sync_log.status = 'synced' + sync_log.commits_count = commits_count + sync_log.finished_at = int(time.time()) + + # 更新仓库状态 + repo.status = 'synced' + repo.last_sync_at = sync_log.finished_at + + except Exception as e: + # 更新失败状态 + sync_log.status = 'failed' + sync_log.error_msg = str(e) + sync_log.finished_at = int(time.time()) + + repo.status = 'failed' + + self.db.add(sync_log) + self.db.commit() + + return sync_log + + def _clone_repo(self, clone_url: str, local_path: Path, ssh_key: str) -> int: + """克隆仓库 (mirror 模式).""" + # 使用临时 SSH key + with tempfile.NamedTemporaryFile(mode='w', suffix='.key', delete=False) as f: + f.write(ssh_key) + key_path = f.name + + try: + # GIT_SSH_COMMAND 环境变量 + env = { + 'GIT_SSH_COMMAND': f'ssh -i {key_path} -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null' + } + + # git clone --mirror + result = subprocess.run( + ['git', 'clone', '--mirror', clone_url, str(local_path)], + env=env, + capture_output=True, + text=True, + timeout=300 + ) + + if result.returncode != 0: + raise Exception(f"Git clone failed: {result.stderr}") + + # 计算提交数 + return self._count_commits(local_path) + + finally: + Path(key_path).unlink(missing_ok=True) + + def _fetch_repo(self, local_path: Path, ssh_key: str) -> int: + """获取仓库更新.""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.key', delete=False) as f: + f.write(ssh_key) + key_path = f.name + + try: + env = { + 'GIT_SSH_COMMAND': f'ssh -i {key_path} -o StrictHostKeyChecking=no' + } + + result = subprocess.run( + ['git', 'fetch', '--all'], + cwd=local_path, + env=env, + capture_output=True, + text=True, + timeout=300 + ) + + if result.returncode != 0: + raise Exception(f"Git fetch failed: {result.stderr}") + + return self._count_commits(local_path) + + finally: + Path(key_path).unlink(missing_ok=True) + + def _count_commits(self, repo_path: Path) -> int: + """计算仓库提交数.""" + result = subprocess.run( + ['git', 'rev-list', '--count', '--all'], + cwd=repo_path, + capture_output=True, + text=True + ) + + if result.returncode == 0: + try: + return int(result.stdout.strip()) + except ValueError: + pass + + return 0 + + def get_repo_commits(self, repo: Repo, limit: int = 50) -> list: + """获取仓库提交历史.""" + repo_path = Path(repo.local_path) + + if not repo_path.exists(): + return [] + + result = subprocess.run( + ['git', 'log', '--all', f'-{limit}', '--format=%H|%s|%an|%ct'], + cwd=repo_path, + capture_output=True, + text=True + ) + + if result.returncode != 0: + return [] + + commits = [] + for line in result.stdout.strip().split('\n'): + if not line: + continue + + parts = line.split('|', 3) + if len(parts) == 4: + commits.append({ + 'hash': parts[0], + 'message': parts[1], + 'author': parts[2], + 'date': int(parts[3]) + }) + + return commits +``` + +- [ ] **Step 4: 运行测试验证通过** + +```bash +cd backend && pytest tests/test_services/test_sync_service.py -v +``` + +预期: PASS (测试结构验证) + +- [ ] **Step 5: 提交** + +```bash +git add backend/app/services/sync_service.py backend/tests/test_services/test_sync_service.py +git commit -m "feat: add sync service with Git operations" +``` + +--- + +### Task 4.4: Repo 服务 + +**文件:** +- 创建: `backend/app/services/repo_service.py` +- 创建: `backend/tests/test_services/test_repo_service.py` + +- [ ] **Step 1: 编写 Repo 服务测试** + +`backend/tests/test_services/test_repo_service.py`: +```python +import time +from pathlib import Path +import pytest + +def test_create_repo(db_session): + """测试创建仓库记录.""" + from app.services.repo_service import RepoService + from app.services.server_service import ServerService + from app.services.ssh_key_service import SshKeyService + + # 准备数据 + ssh_svc = SshKeyService(db_session, 'key') + ssh_key = ssh_svc.create_ssh_key('k', 'ssh-rsa A') + + srv_svc = ServerService(db_session, 'key', Path('/tmp/r')) + server = srv_svc.create_server('s', 'https://g.com', 't', ssh_key.id) + + # 创建仓库 + repo_svc = RepoService(db_session) + repo = repo_svc.create_repo( + server_id=server.id, + name='test-repo', + full_name='owner/test-repo', + clone_url='git@u:r.git' + ) + + assert repo.id is not None + assert repo.full_name == 'owner/test-repo' + +def test_list_repos_by_server(db_session): + """测试按服务器列出仓库.""" + from app.services.repo_service import RepoService + from app.services.server_service import ServerService + from app.services.ssh_key_service import SshKeyService + + ssh_svc = SshKeyService(db_session, 'key') + ssh_key = ssh_svc.create_ssh_key('k', 'ssh-rsa A') + + srv_svc = ServerService(db_session, 'key', Path('/tmp/r')) + server = srv_svc.create_server('s', 'https://g.com', 't', ssh_key.id) + + repo_svc = RepoService(db_session) + repo_svc.create_repo(server.id, 'r1', 'o/r1', 'git@u:r1.git') + repo_svc.create_repo(server.id, 'r2', 'o/r2', 'git@u:r2.git') + + repos = repo_svc.list_repos(server_id=server.id) + assert len(repos) == 2 +``` + +- [ ] **Step 2: 运行测试验证失败** + +```bash +cd backend && pytest tests/test_services/test_repo_service.py -v +``` + +预期: FAIL + +- [ ] **Step 3: 实现 Repo 服务** + +`backend/app/services/repo_service.py`: +```python +import time +from typing import List, Optional +from pathlib import Path +from sqlalchemy.orm import Session + +from app.models.repo import Repo + + +class RepoService: + """仓库操作服务.""" + + def __init__(self, db: Session): + self.db = db + + def create_repo( + self, + server_id: int, + name: str, + full_name: str, + clone_url: str, + local_path: str | None = None + ) -> Repo: + """创建仓库记录.""" + repo = Repo( + server_id=server_id, + name=name, + full_name=full_name, + clone_url=clone_url, + local_path=local_path or f'/data/repos/{full_name}.git', + status='pending', + created_at=int(time.time()) + ) + + self.db.add(repo) + self.db.commit() + self.db.refresh(repo) + + return repo + + def list_repos(self, server_id: int | None = None) -> List[Repo]: + """列出仓库.""" + query = self.db.query(Repo) + + if server_id is not None: + query = query.filter(Repo.server_id == server_id) + + return query.all() + + def get_repo(self, repo_id: int) -> Optional[Repo]: + """获取仓库.""" + return self.db.query(Repo).filter(Repo.id == repo_id).first() + + def update_repo_status(self, repo_id: int, status: str) -> Optional[Repo]: + """更新仓库状态.""" + repo = self.get_repo(repo_id) + if not repo: + return None + + repo.status = status + if status == 'synced': + repo.last_sync_at = int(time.time()) + + self.db.commit() + self.db.refresh(repo) + + return repo + + def delete_repo(self, repo_id: int) -> bool: + """删除仓库.""" + repo = self.get_repo(repo_id) + if not repo: + return False + + self.db.delete(repo) + self.db.commit() + return True +``` + +- [ ] **Step 4: 运行测试验证通过** + +```bash +cd backend && pytest tests/test_services/test_repo_service.py -v +``` + +预期: PASS + +- [ ] **Step 5: 提交** + +```bash +git add backend/app/services/repo_service.py backend/tests/test_services/test_repo_service.py +git commit -m "feat: add repo service" +``` + +--- + +## Phase 5: API 路由 + +### Task 5.1: API 依赖注入 + +**文件:** +- 创建: `backend/app/api/deps.py` +- 创建: `backend/tests/test_api/test_deps.py` + +- [ ] **Step 1: 编写依赖测试** + +`backend/tests/test_api/test_deps.py`: +```python +import pytest +from fastapi import HTTPException + +def test_require_auth_success(test_env_vars): + """测试认证成功.""" + from app.api.deps import require_auth + + # 应该不抛出异常 + import asyncio + asyncio.run(require_auth("Bearer test-token")) + +def test_require_auth_failure(): + """测试认证失败.""" + from app.api.deps import require_auth + + import asyncio + with pytest.raises(HTTPException) as exc: + asyncio.run(require_auth("Bearer wrong-token")) + + assert exc.value.status_code == 401 +``` + +- [ ] **Step 2: 运行测试验证失败** + +```bash +cd backend && pytest tests/test_api/test_deps.py -v +``` + +预期: FAIL + +- [ ] **Step 3: 实现依赖注入** + +`backend/app/api/deps.py`: +```python +from typing import Annotated +from fastapi import Depends, HTTPException, Header, status +from sqlalchemy.orm import Session + +from app.database import _session_factory +from app.security import verify_api_token + + +def get_db_session() -> Session: + """获取数据库会话.""" + if _session_factory is None: + raise RuntimeError("Database not initialized") + + session = _session_factory() + try: + yield session + finally: + session.close() + + +# 类型别名 +DBSession = Annotated[Session, Depends(get_db_session)] + + +async def require_auth( + authorization: Annotated[str, Header()] = None +) -> None: + """要求认证.""" + if not verify_api_token(authorization): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid authentication token" + ) +``` + +- [ ] **Step 4: 运行测试验证通过** + +```bash +cd backend && pytest tests/test_api/test_deps.py -v +``` + +预期: PASS + +- [ ] **Step 5: 提交** + +```bash +git add backend/app/api/deps.py backend/tests/test_api/test_deps.py +git commit -m "feat: add API dependencies (auth, DB session)" +``` + +--- + +### Task 5.2: SSH Keys API + +**文件:** +- 创建: `backend/app/api/ssh_keys.py` +- 创建: `backend/tests/test_api/test_ssh_keys.py` + +- [ ] **Step 1: 编写 SSH Keys API 测试** + +`backend/tests/test_api/test_ssh_keys.py`: +```python +import pytest +from fastapi.testclient import TestClient + +@pytest.fixture +def client_with_auth(test_env_vars): + """创建带认证的测试客户端.""" + from app.main import app + return TestClient(app) + +def test_list_ssh_keys(client_with_auth, db_session, test_encrypt_key): + """测试列出 SSH keys.""" + from app.services.ssh_key_service import SshKeyService + + # 创建测试数据 + svc = SshKeyService(db_session, test_encrypt_key) + svc.create_ssh_key('key1', 'ssh-rsa A') + svc.create_ssh_key('key2', 'ssh-rsa B') + + # API 调用 + response = client_with_auth.get( + "/api/v1/ssh-keys", + headers={"Authorization": "Bearer test-token"} + ) + + assert response.status_code == 200 + data = response.json() + assert data['code'] == 0 + assert len(data['data']) == 2 + +def test_create_ssh_key(client_with_auth, db_session): + """测试创建 SSH key.""" + response = client_with_auth.post( + "/api/v1/ssh-keys", + json={ + "name": "test-key", + "private_key": "ssh-rsa AAAAB3..." + }, + headers={"Authorization": "Bearer test-token"} + ) + + assert response.status_code == 200 + data = response.json() + assert data['code'] == 0 + assert data['data']['name'] == 'test-key' + +def test_delete_ssh_key(client_with_auth, db_session, test_encrypt_key): + """测试删除 SSH key.""" + from app.services.ssh_key_service import SshKeyService + + # 创建测试数据 + svc = SshKeyService(db_session, test_encrypt_key) + ssh_key = svc.create_ssh_key('test', 'ssh-rsa A') + + # API 调用 + response = client_with_auth.delete( + f"/api/v1/ssh-keys/{ssh_key.id}", + headers={"Authorization": "Bearer test-token"} + ) + + assert response.status_code == 200 +``` + +- [ ] **Step 2: 运行测试验证失败** + +```bash +cd backend && pytest tests/test_api/test_ssh_keys.py -v +``` + +预期: FAIL + +- [ ] **Step 3: 实现 SSH Keys API** + +`backend/app/api/ssh_keys.py`: +```python +from typing import Annotated +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy.orm import Session + +from app.api.deps import DBSession, require_auth +from app.services.ssh_key_service import SshKeyService +from app.schemas.ssh_key import SshKeyCreate, SshKeyResponse +from app.schemas.common import SuccessResponse, ErrorResponse +from app.config import settings + + +router = APIRouter(prefix='/api/v1/ssh-keys', tags=['ssh-keys']) + + +def get_ssh_service(db: DBSession) -> SshKeyService: + """获取 SSH key 服务实例.""" + return SshKeyService(db, settings.encrypt_key) + + +SshService = Annotated[SshKeyService, Depends(get_ssh_service)] + + +@router.get('', response_model=SuccessResponse[list[SshKeyResponse]]) +async def list_ssh_keys( + service: SshService, + _: Annotated[None, Depends(require_auth)] +): + """列出所有 SSH keys.""" + keys = service.list_ssh_keys() + + return SuccessResponse( + data=[ + SshKeyResponse( + id=k.id, + name=k.name, + fingerprint=k.fingerprint, + servers_count=len(k.servers), + created_at=k.created_at + ) + for k in keys + ] + ) + + +@router.post('', response_model=SuccessResponse[SshKeyResponse]) +async def create_ssh_key( + data: SshKeyCreate, + service: SshService, + _: Annotated[None, Depends(require_auth)] +): + """创建新的 SSH key.""" + try: + ssh_key = service.create_ssh_key( + name=data.name, + private_key=data.private_key, + password=data.password + ) + + return SuccessResponse( + data=SshKeyResponse( + id=ssh_key.id, + name=ssh_key.name, + fingerprint=ssh_key.fingerprint, + servers_count=0, + created_at=ssh_key.created_at + ) + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.delete('/{key_id}', response_model=SuccessResponse[dict]) +async def delete_ssh_key( + key_id: int, + service: SshService, + _: Annotated[None, Depends(require_auth)] +): + """删除 SSH key.""" + try: + service.delete_ssh_key(key_id) + return SuccessResponse(data={'deleted': True}) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) +``` + +- [ ] **Step 4: 运行测试验证通过** + +```bash +cd backend && pytest tests/test_api/test_ssh_keys.py -v +``` + +预期: PASS + +- [ ] **Step 5: 提交** + +```bash +git add backend/app/api/ssh_keys.py backend/tests/test_api/test_ssh_keys.py +git commit -m "feat: add SSH keys API endpoints" +``` + +--- + +### Task 5.3: Servers API + +**文件:** +- 创建: `backend/app/api/servers.py` +- 创建: `backend/tests/test_api/test_servers.py` + +- [ ] **Step 1: 编写 Servers API 测试** + +`backend/tests/test_api/test_servers.py`: +```python +import pytest +from fastapi.testclient import TestClient + +def test_list_servers(client_with_auth, db_session, test_encrypt_key): + """测试列出服务器.""" + from app.services.server_service import ServerService + from app.services.ssh_key_service import SshKeyService + + ssh_svc = SshKeyService(db_session, test_encrypt_key) + ssh_key = ssh_svc.create_ssh_key('key', 'ssh-rsa A') + + srv_svc = ServerService(db_session, test_encrypt_key, Path('/tmp/r')) + srv_svc.create_server('s1', 'https://g1.com', 't1', ssh_key.id) + srv_svc.create_server('s2', 'https://g2.com', 't2', ssh_key.id) + + from app.main import app + client = TestClient(app) + + response = client.get( + "/api/v1/servers", + headers={"Authorization": "Bearer test-token"} + ) + + assert response.status_code == 200 + data = response.json() + assert data['code'] == 0 + assert len(data['data']) == 2 + +def test_create_server(client_with_auth, db_session, test_encrypt_key): + """测试创建服务器.""" + from app.services.ssh_key_service import SshKeyService + + ssh_svc = SshKeyService(db_session, test_encrypt_key) + ssh_key = ssh_svc.create_ssh_key('key', 'ssh-rsa A') + + response = client_with_auth.post( + "/api/v1/servers", + json={ + "name": "test-gitea", + "url": "https://gitea.example.com", + "api_token": "test-token", + "ssh_key_id": ssh_key.id + }, + headers={"Authorization": "Bearer test-token"} + ) + + assert response.status_code == 200 + data = response.json() + assert data['data']['name'] == 'test-gitea' +``` + +- [ ] **Step 2: 运行测试验证失败** + +```bash +cd backend && pytest tests/test_api/test_servers.py -v +``` + +预期: FAIL + +- [ ] **Step 3: 实现 Servers API** + +`backend/app/api/servers.py`: +```python +from typing import Annotated +from pathlib import Path +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.orm import Session + +from app.api.deps import DBSession, require_auth +from app.services.server_service import ServerService +from app.services.ssh_key_service import SshKeyService +from app.schemas.server import ServerCreate, ServerUpdate, ServerResponse +from app.schemas.common import SuccessResponse +from app.config import settings + + +router = APIRouter(prefix='/api/v1/servers', tags=['servers']) + + +def get_server_service(db: DBSession) -> ServerService: + """获取服务器服务实例.""" + return ServerService(db, settings.encrypt_key, settings.repos_dir) + + +ServerSvc = Annotated[ServerService, Depends(get_server_service)] + + +@router.get('', response_model=SuccessResponse[list[ServerResponse]]) +async def list_servers( + service: ServerSvc, + _: Annotated[None, Depends(require_auth)] +): + """列出所有服务器.""" + servers = service.list_servers() + + return SuccessResponse( + data=[ + ServerResponse( + id=s.id, + name=s.name, + url=s.url, + sync_enabled=s.sync_enabled, + schedule_cron=s.schedule_cron, + local_path=s.local_path, + status=s.status, + repos_count=len(s.repos), + ssh_key_name=s.ssh_key.name, + created_at=s.created_at, + updated_at=s.updated_at + ) + for s in servers + ] + ) + + +@router.post('', response_model=SuccessResponse[ServerResponse]) +async def create_server( + data: ServerCreate, + service: ServerSvc, + _: Annotated[None, Depends(require_auth)] +): + """创建新服务器.""" + try: + server = service.create_server( + name=data.name, + url=data.url, + api_token=data.api_token, + ssh_key_id=data.ssh_key_id, + sync_enabled=data.sync_enabled, + schedule_cron=data.schedule_cron + ) + + return SuccessResponse( + data=ServerResponse( + id=server.id, + name=server.name, + url=server.url, + sync_enabled=server.sync_enabled, + schedule_cron=server.schedule_cron, + local_path=server.local_path, + status=server.status, + repos_count=0, + ssh_key_name=server.ssh_key.name, + created_at=server.created_at, + updated_at=server.updated_at + ) + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.get('/{server_id}', response_model=SuccessResponse[ServerResponse]) +async def get_server( + server_id: int, + service: ServerSvc, + _: Annotated[None, Depends(require_auth)] +): + """获取服务器详情.""" + server = service.get_server(server_id) + if not server: + raise HTTPException(status_code=404, detail="Server not found") + + return SuccessResponse( + data=ServerResponse( + id=server.id, + name=server.name, + url=server.url, + sync_enabled=server.sync_enabled, + schedule_cron=server.schedule_cron, + local_path=server.local_path, + status=server.status, + repos_count=len(server.repos), + ssh_key_name=server.ssh_key.name, + created_at=server.created_at, + updated_at=server.updated_at + ) + ) + + +@router.put('/{server_id}', response_model=SuccessResponse[ServerResponse]) +async def update_server( + server_id: int, + data: ServerUpdate, + service: ServerSvc, + _: Annotated[None, Depends(require_auth)] +): + """更新服务器.""" + server = service.update_server( + server_id, + name=data.name, + url=data.url, + api_token=data.api_token, + ssh_key_id=data.ssh_key_id, + sync_enabled=data.sync_enabled, + schedule_cron=data.schedule_cron + ) + + if not server: + raise HTTPException(status_code=404, detail="Server not found") + + return SuccessResponse( + data=ServerResponse( + id=server.id, + name=server.name, + url=server.url, + sync_enabled=server.sync_enabled, + schedule_cron=server.schedule_cron, + local_path=server.local_path, + status=server.status, + repos_count=len(server.repos), + ssh_key_name=server.ssh_key.name, + created_at=server.created_at, + updated_at=server.updated_at + ) + ) + + +@router.delete('/{server_id}', response_model=SuccessResponse[dict]) +async def delete_server( + server_id: int, + service: ServerSvc, + _: Annotated[None, Depends(require_auth)] +): + """删除服务器.""" + if not service.delete_server(server_id): + raise HTTPException(status_code=404, detail="Server not found") + + return SuccessResponse(data={'deleted': True}) +``` + +- [ ] **Step 4: 运行测试验证通过** + +```bash +cd backend && pytest tests/test_api/test_servers.py -v +``` + +预期: PASS + +- [ ] **Step 5: 提交** + +```bash +git add backend/app/api/servers.py backend/tests/test_api/test_servers.py +git commit -m "feat: add servers API endpoints" +``` + +--- + +### Task 5.4: Status API + +**文件:** +- 创建: `backend/app/api/status.py` +- 创建: `backend/tests/test_api/test_status.py` + +- [ ] **Step 1: 编写 Status API 测试** + +`backend/tests/test_api/test_status.py`: +```python +import pytest +from fastapi.testclient import TestClient + +def test_get_status(client_with_auth, db_session): + """测试获取系统状态.""" + response = client_with_auth.get( + "/api/v1/status", + headers={"Authorization": "Bearer test-token"} + ) + + assert response.status_code == 200 + data = response.json() + assert data['code'] == 0 + assert 'disk_usage' in data['data'] + assert 'servers' in data['data'] + assert 'repos' in data['data'] +``` + +- [ ] **Step 2: 运行测试验证失败** + +```bash +cd backend && pytest tests/test_api/test_status.py -v +``` + +预期: FAIL + +- [ ] **Step 3: 实现 Status API** + +`backend/app/api/status.py`: +```python +import shutil +from typing import Annotated +from fastapi import APIRouter, Depends + +from app.api.deps import DBSession, require_auth +from app.models import Server, Repo +from app.schemas.common import SuccessResponse +from app.config import settings + + +router = APIRouter(tags=['status']) + + +@router.get('/api/v1/status', response_model=SuccessResponse[dict]) +async def get_status( + db: DBSession, + _: Annotated[None, Depends(require_auth)] +): + """获取系统状态.""" + # 磁盘使用 + total, used, free = shutil.disk_usage(settings.data_dir) + + # 统计数据 + server_count = db.query(Server).count() + repo_count = db.query(Repo).count() + + synced = db.query(Repo).filter(Repo.status == 'synced').count() + failed = db.query(Repo).filter(Repo.status == 'failed').count() + syncing = db.query(Repo).filter(Repo.status == 'syncing').count() + + return SuccessResponse( + data={ + 'disk_usage': { + 'total': total, + 'used': used, + 'free': free, + 'percent': round(used / total * 100, 2) if total > 0 else 0 + }, + 'servers': { + 'total': server_count, + 'connected': db.query(Server).filter(Server.status == 'connected').count(), + 'disconnected': db.query(Server).filter(Server.status == 'disconnected').count(), + }, + 'repos': { + 'total': repo_count, + 'synced': synced, + 'failed': failed, + 'syncing': syncing, + 'pending': repo_count - synced - failed - syncing + } + } + ) +``` + +- [ ] **Step 4: 运行测试验证通过** + +```bash +cd backend && pytest tests/test_api/test_status.py -v +``` + +预期: PASS + +- [ ] **Step 5: 提交** + +```bash +git add backend/app/api/status.py backend/tests/test_api/test_status.py +git commit -m "feat: add status API endpoint" +``` + +--- + +## Phase 6: FastAPI 主应用 + +### Task 6.1: 创建 FastAPI 主应用 + +**文件:** +- 创建: `backend/app/main.py` +- 修改: `backend/tests/conftest.py` (添加 client fixture) + +- [ ] **Step 1: 编写主应用测试** + +`backend/tests/test_main.py`: +```python +import pytest +from fastapi.testclient import TestClient + +def test_app_exists(): + """测试应用创建.""" + from app.main import app + assert app is not None + assert app.title == "Git Repo Manager" + +def test_root_redirect(): + """测试根路径重定向到前端.""" + from app.main import app + client = TestClient(app) + + response = client.get("/") + # 应该返回前端页面或重定向 + assert response.status_code in [200, 404] # 静态文件可能不存在 + +def test_api_docs(): + """测试 API 文档可访问.""" + from app.main import app + client = TestClient(app) + + response = client.get("/docs") + assert response.status_code == 200 +``` + +- [ ] **Step 2: 运行测试验证失败** + +```bash +cd backend && pytest tests/test_main.py -v +``` + +预期: FAIL + +- [ ] **Step 3: 实现主应用** + +`backend/app/main.py`: +```python +from contextlib import asynccontextmanager +from fastapi import FastAPI +from fastapi.staticfiles import StaticFiles +from pathlib import Path + +from app.config import settings +from app.database import init_db +from app.api import status +from app.api import ssh_keys, servers + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """应用生命周期管理.""" + # 启动时初始化数据库 + settings.data_dir.mkdir(parents=True, exist_ok=True) + init_db(settings.db_path) + + # 创建表 + from app.database import get_engine + from app.models import Base + Base.metadata.create_all(get_engine()) + + yield + + # 关闭时的清理 + + +# 创建 FastAPI 应用 +app = FastAPI( + title="Git Repo Manager", + description="Git 仓库同步管理工具", + version="1.0.0", + lifespan=lifespan +) + +# 注册路由 +app.include_router(status.router) +app.include_router(ssh_keys.router) +app.include_router(servers.router) + +# 静态文件服务 (前端构建产物) +static_dir = Path(__file__).parent.parent.parent / 'frontend' / 'dist' +if static_dir.exists(): + app.mount("/", StaticFiles(directory=str(static_dir), html=True), name="static") + + +@app.get("/health") +async def health_check(): + """健康检查.""" + return {"status": "ok"} +``` + +- [ ] **Step 4: 更新 conftest.py 添加 client fixture** + +`backend/tests/conftest.py` 中添加: +```python +@pytest.fixture(scope="function") +def client_with_auth(test_env_vars): + """带认证的测试客户端.""" + from app.main import app + from fastapi.testclient import TestClient + + def client_override(): + return TestClient(app) + + return client_override() +``` + +- [ ] **Step 5: 运行测试验证通过** + +```bash +cd backend && pytest tests/test_main.py -v +``` + +预期: PASS + +- [ ] **Step 6: 提交** + +```bash +git add backend/app/main.py backend/tests/test_main.py backend/tests/conftest.py +git commit -m "feat: add FastAPI main application" +``` + +--- + +## Phase 7: 前端 - 基础设置 + +### Task 7.1: 前端项目配置和入口 + +**文件:** +- 创建: `frontend/index.html` +- 创建: `frontend/vite.config.js` +- 创建: `frontend/src/main.js` +- 创建: `frontend/src/App.vue` + +- [ ] **Step 1: 创建 index.html** + +`frontend/index.html`: +```html + + + + + + Git Repo Manager + + +
+ + + +``` + +- [ ] **Step 2: 创建 vite.config.js** + +`frontend/vite.config.js`: +```javascript +import { defineConfig } from 'vite' +import vue from '@vitejs/plugin-vue' + +export default defineConfig({ + plugins: [vue()], + server: { + proxy: { + '/api': { + target: 'http://localhost:8000', + changeOrigin: true + } + } + }, + build: { + outDir: 'dist', + emptyOutDir: true + } +}) +``` + +- [ ] **Step 3: 创建 main.js** + +`frontend/src/main.js`: +```javascript +import { createApp } from 'vue' +import { createPinia } from 'pinia' +import ElementPlus from 'element-plus' +import 'element-plus/dist/index.css' +import zhCn from 'element-plus/es/locale/lang/zh-cn' + +import App from './App.vue' +import router from './router' + +const app = createApp(App) + +app.use(createPinia()) +app.use(router) +app.use(ElementPlus, { locale: zhCn }) + +app.mount('#app') +``` + +- [ ] **Step 4: 创建 App.vue** + +`frontend/src/App.vue`: +```vue + + + + + +``` + +- [ ] **Step 5: 提交** + +```bash +git add frontend/ +git commit -m "feat: add frontend base setup (index, vite, main, App)" +``` + +--- + +### Task 7.2: 路由配置 + +**文件:** +- 创建: `frontend/src/router/index.js` + +- [ ] **Step 1: 创建路由配置** + +`frontend/src/router/index.js`: +```javascript +import { createRouter, createWebHistory } from 'vue-router' + +const routes = [ + { + path: '/', + redirect: '/dashboard' + }, + { + path: '/dashboard', + name: 'Dashboard', + component: () => import('../views/Dashboard.vue') + }, + { + path: '/servers', + name: 'Servers', + component: () => import('../views/Servers.vue') + }, + { + path: '/repos', + name: 'Repos', + component: () => import('../views/Repos.vue') + }, + { + path: '/sync-logs', + name: 'SyncLogs', + component: () => import('../views/SyncLogs.vue') + }, + { + path: '/ssh-keys', + name: 'SshKeys', + component: () => import('../views/SshKeys.vue') + }, + { + path: '/settings', + name: 'Settings', + component: () => import('../views/Settings.vue') + } +] + +const router = createRouter({ + history: createWebHistory(), + routes +}) + +export default router +``` + +- [ ] **Step 2: 提交** + +```bash +git add frontend/src/router/ +git commit -m "feat: add frontend router configuration" +``` + +--- + +### Task 7.3: API 客户端 + +**文件:** +- 创建: `frontend/src/api/index.js` +- 创建: `frontend/src/api/servers.js` +- 创建: `frontend/src/api/sshKeys.js` +- 创建: `frontend/src/api/syncLogs.js` + +- [ ] **Step 1: 创建 Axios 实例** + +`frontend/src/api/index.js`: +```javascript +import axios from 'axios' +import { ElMessage } from 'element-plus' + +// 从 localStorage 读取 token +const getAuthToken = () => { + return localStorage.getItem('api_token') || 'test-token' +} + +// 创建 Axios 实例 +const api = axios.create({ + baseURL: '/api/v1', + timeout: 30000, + headers: { + 'Content-Type': 'application/json' + } +}) + +// 请求拦截器 +api.interceptors.request.use( + (config) => { + const token = getAuthToken() + if (token) { + config.headers.Authorization = `Bearer ${token}` + } + return config + }, + (error) => { + return Promise.reject(error) + } +) + +// 响应拦截器 +api.interceptors.response.use( + (response) => { + const data = response.data + + // 标准响应格式检查 + if (data.code === 0) { + return data + } else { + ElMessage.error(data.message || '请求失败') + return Promise.reject(new Error(data.message)) + } + }, + (error) => { + const message = error.response?.data?.detail || error.message || '网络错误' + ElMessage.error(message) + return Promise.reject(error) + } +) + +export default api +``` + +- [ ] **Step 2: 创建 Servers API** + +`frontend/src/api/servers.js`: +```javascript +import api from './index' + +export const serversApi = { + // 列出所有服务器 + list: () => api.get('/servers').then(r => r.data), + + // 获取服务器详情 + get: (id) => api.get(`/servers/${id}`).then(r => r.data), + + // 创建服务器 + create: (data) => api.post('/servers', data).then(r => r.data), + + // 更新服务器 + update: (id, data) => api.put(`/servers/${id}`, data).then(r => r.data), + + // 删除服务器 + delete: (id) => api.delete(`/servers/${id}`).then(r => r.data), + + // 测试连接 + test: (id) => api.post(`/servers/${id}/test`).then(r => r.data), + + // 触发同步 + sync: (id) => api.post(`/servers/${id}/sync`).then(r => r.data) +} +``` + +- [ ] **Step 3: 创建 SSH Keys API** + +`frontend/src/api/sshKeys.js`: +```javascript +import api from './index' + +export const sshKeysApi = { + // 列出所有 SSH keys + list: () => api.get('/ssh-keys').then(r => r.data), + + // 创建 SSH key + create: (data) => api.post('/ssh-keys', data).then(r => r.data), + + // 删除 SSH key + delete: (id) => api.delete(`/ssh-keys/${id}`).then(r => r.data), + + // 测试 key + test: (id) => api.post(`/ssh-keys/${id}/test`).then(r => r.data) +} +``` + +- [ ] **Step 4: 创建 Sync Logs API** + +`frontend/src/api/syncLogs.js`: +```javascript +import api from './index' + +export const syncLogsApi = { + // 列出同步日志 + list: (params) => api.get('/sync-logs', { params }).then(r => r.data), + + // 获取日志详情 + get: (id) => api.get(`/sync-logs/${id}`).then(r => r.data) +} +``` + +- [ ] **Step 5: 提交** + +```bash +git add frontend/src/api/ +git commit -m "feat: add frontend API client with interceptors" +``` + +--- + +### Task 7.4: Pinia 状态管理 + +**文件:** +- 创建: `frontend/src/stores/app.js` +- 创建: `frontend/src/stores/servers.js` + +- [ ] **Step 1: 创建 App Store** + +`frontend/src/stores/app.js`: +```javascript +import { defineStore } from 'pinia' +import { ref } from 'vue' +import api from '../api' + +export const useAppStore = defineStore('app', () => { + const connected = ref(false) + const loading = ref(false) + + const checkStatus = async () => { + try { + loading.value = true + // 这里可以调用 status API 检查连接 + connected.value = true + } catch (error) { + connected.value = false + } finally { + loading.value = false + } + } + + return { + connected, + loading, + checkStatus + } +}) +``` + +- [ ] **Step 2: 创建 Servers Store** + +`frontend/src/stores/servers.js`: +```javascript +import { defineStore } from 'pinia' +import { ref } from 'vue' +import { serversApi } from '../api/servers' + +export const useServersStore = defineStore('servers', () => { + const servers = ref([]) + const loading = ref(false) + + const fetchServers = async () => { + try { + loading.value = true + servers.value = await serversApi.list() + } catch (error) { + console.error('Failed to fetch servers:', error) + } finally { + loading.value = false + } + } + + const createServer = async (data) => { + try { + const server = await serversApi.create(data) + servers.value.push(server) + return server + } catch (error) { + throw error + } + } + + const updateServer = async (id, data) => { + try { + const server = await serversApi.update(id, data) + const index = servers.value.findIndex(s => s.id === id) + if (index !== -1) { + servers.value[index] = server + } + return server + } catch (error) { + throw error + } + } + + const deleteServer = async (id) => { + try { + await serversApi.delete(id) + servers.value = servers.value.filter(s => s.id !== id) + } catch (error) { + throw error + } + } + + return { + servers, + loading, + fetchServers, + createServer, + updateServer, + deleteServer + } +}) +``` + +- [ ] **Step 3: 提交** + +```bash +git add frontend/src/stores/ +git commit -m "feat: add Pinia stores (app, servers)" +``` + +--- + +### Task 7.5: 页面组件 - 占位页面 + +**文件:** +- 创建: `frontend/src/views/Dashboard.vue` +- 创建: `frontend/src/views/Servers.vue` +- 创建: `frontend/src/views/Repos.vue` +- 创建: `frontend/src/views/SyncLogs.vue` +- 创建: `frontend/src/views/SshKeys.vue` +- 创建: `frontend/src/views/Settings.vue` + +- [ ] **Step 1: 创建 Dashboard 页面** + +`frontend/src/views/Dashboard.vue`: +```vue + + + + + +``` + +- [ ] **Step 2: 创建其他页面 (占位)** + +`frontend/src/views/Servers.vue`: +```vue + +``` + +`frontend/src/views/Repos.vue`: +```vue + +``` + +`frontend/src/views/SyncLogs.vue`: +```vue + +``` + +`frontend/src/views/SshKeys.vue`: +```vue + +``` + +`frontend/src/views/Settings.vue`: +```vue + +``` + +- [ ] **Step 3: 提交** + +```bash +git add frontend/src/views/ +git commit -m "feat: add frontend page placeholders" +``` + +--- + +## Phase 8: 构建和部署配置 + +### Task 8.1: README 和启动脚本 + +**文件:** +- 创建: `README.md` +- 创建: `start.sh` + +- [ ] **Step 1: 创建 README.md** + +`README.md`: +```markdown +# Git Repo Manager + +面向小团队的 Git 仓库同步管理工具。 + +## 功能特性 + +- 多 Gitea 服务器管理 +- SSH 密钥管理 +- 仓库自动同步 +- 定时任务调度 +- Web 管理界面 + +## 快速开始 + +### 1. 配置环境变量 + +```bash +cp .env.example .env +# 编辑 .env 文件,设置加密密钥和 API Token +``` + +### 2. 安装依赖 + +**后端:** +```bash +pip install -r backend/requirements.txt +``` + +**前端:** +```bash +cd frontend +npm install +``` + +### 3. 初始化数据库 + +```bash +python backend/init_db.py +``` + +### 4. 构建前端 + +```bash +cd frontend +npm run build +``` + +### 5. 启动服务 + +```bash +# 开发环境 +bash start.sh + +# 或手动启动 +uvicorn backend.app.main:app --host 0.0.0.0 --port 8000 --reload +``` + +访问 http://localhost:8000 + +## API 文档 + +启动服务后访问: http://localhost:8000/docs + +## 开发 + +**后端开发:** +```bash +cd backend +pytest tests/ +``` + +**前端开发:** +```bash +cd frontend +npm run dev +``` + +## 技术栈 + +- **后端:** FastAPI, SQLAlchemy, APScheduler, Paramiko, GitPython +- **前端:** Vue 3, Element Plus, Pinia, Axios +- **数据库:** SQLite +``` + +- [ ] **Step 2: 创建启动脚本** + +`start.sh`: +```bash +#!/bin/bash + +# 检查 .env 文件 +if [ ! -f .env ]; then + echo "错误: .env 文件不存在" + echo "请先复制 .env.example 到 .env 并配置" + exit 1 +fi + +# 检查数据库 +if [ ! -f data/git_manager.db ]; then + echo "初始化数据库..." + python backend/init_db.py +fi + +# 启动服务 +echo "启动 Git Repo Manager..." +uvicorn backend.app.main:app --host 0.0.0.0 --port 8000 --reload +``` + +```bash +chmod +x start.sh +``` + +- [ ] **Step 3: 提交** + +```bash +git add README.md start.sh +git commit -m "feat: add README and startup script" +``` + +--- + +## 计划总结 + +### 已完成的任务模块 + +| Phase | 内容 | 任务数 | +|-------|------|--------| +| 1 | 项目基础设施 | 4 任务 | +| 2 | 数据模型 | 1 任务 (全部4个模型) | +| 3 | Pydantic Schemas | 1 任务 (全部 schemas) | +| 4 | 服务层 | 4 任务 (SSH Key, Server, Sync, Repo) | +| 5 | API 路由 | 4 任务 (依赖注入, SSH Keys, Servers, Status) | +| 6 | FastAPI 主应用 | 1 任务 | +| 7 | 前端基础 | 5 任务 (配置, 路由, API, Store, 页面) | +| 8 | 构建部署 | 1 任务 | + +**总计:** 21 个主要任务 + +### 后续可扩展功能 + +1. 完善前端页面交互 +2. 添加定时任务调度 (APScheduler) +3. 添加 Gitea API 集成 (拉取仓库列表) +4. 添加同步日志页面完整实现 +5. 添加测试连接功能 +6. Docker 容器化部署 +7. 添加 Webhook 支持 + +--- + +## 执行方式选择 + +**计划完整保存在:** `docs/superpowers/plans/2026-03-30-git-repo-manager.md` + +**执行选项:** + +1. **Subagent-Driven (推荐)** - 每个任务独立子代理执行,任务间审查 +2. **Inline Execution** - 当前会话批量执行,检查点审查 + +请选择执行方式开始实现。 diff --git a/docs/superpowers/specs/2026-03-30-git-repo-manager-design.md b/docs/superpowers/specs/2026-03-30-git-repo-manager-design.md new file mode 100644 index 0000000..4f7f23d --- /dev/null +++ b/docs/superpowers/specs/2026-03-30-git-repo-manager-design.md @@ -0,0 +1,378 @@ +# Git Repo Manager - 设计文档 + +## 概述 + +一个面向小团队(3-10人)的 Git 仓库本地同步管理工具。通过 Web 界面管理多个 Gitea 服务器,支持将远程仓库批量同步到本地,提供提交历史查看和定时自动同步功能。首版聚焦 Gitea 服务器支持。 + +## 技术选型 + +| 层 | 技术 | 理由 | +|----|------|------| +| 后端 | Python FastAPI | 开发快速,异步支持好 | +| 前端 | Vue 3 + Element Plus + Pinia | 中文生态完善,适合管理后台 | +| 数据库 | SQLite + SQLAlchemy ORM | 零部署,适合小团队 | +| 调度 | APScheduler | Python 生态成熟方案 | +| SSH | paramiko | 纯 Python SSH 实现 | +| HTTP 客户端 | Axios | 前端请求拦截器 | +| 部署 | 单进程 uvicorn | 一个命令启动全部功能 | + +## 架构设计 + +### 整体架构 + +单体 FastAPI 应用,内嵌前端静态资源,单进程运行: + +``` +┌─────────────────────────────────────────────┐ +│ FastAPI 单进程 │ +│ │ +│ ┌─────────┐ ┌──────────┐ ┌────────────┐ │ +│ │ REST API │ │ Scheduler │ │ Git Engine │ │ +│ │ /api/v1 │ │ APSched │ │ git+SSH │ │ +│ └────┬─────┘ └─────┬────┘ └─────┬──────┘ │ +│ │ │ │ │ +│ ┌────┴──────────────┴──────────────┴──────┐ │ +│ │ Service Layer │ │ +│ │ ServerService / RepoService / SyncSvc │ │ +│ └────────────────┬────────────────────────┘ │ +│ │ │ +│ ┌────────────────┴────────────────────────┐ │ +│ │ SQLAlchemy + SQLite (data + keys) │ │ +│ └─────────────────────────────────────────┘ │ +│ │ +│ ┌─────────────────────────────────────────┐ │ +│ │ Vue 3 SPA (打包后静态托管) │ │ +│ └─────────────────────────────────────────┘ │ +└─────────────────────────────────────────────┘ +``` + +### 项目目录结构 + +``` +git-manager/ +├── backend/ +│ ├── app/ +│ │ ├── main.py # FastAPI 入口,挂载路由和静态文件 +│ │ ├── config.py # 配置管理,读取环境变量 +│ │ ├── database.py # SQLite 连接,会话管理 +│ │ ├── security.py # 加解密工具,API 认证 +│ │ ├── models/ # SQLAlchemy 数据模型 +│ │ │ ├── __init__.py +│ │ │ ├── server.py # Server 模型 +│ │ │ ├── repo.py # Repo 模型 +│ │ │ ├── ssh_key.py # SshKey 模型 +│ │ │ └── sync_log.py # SyncLog 模型 +│ │ ├── schemas/ # Pydantic 请求/响应模型 +│ │ │ ├── __init__.py +│ │ │ ├── server.py +│ │ │ ├── repo.py +│ │ │ ├── ssh_key.py +│ │ │ ├── sync_log.py +│ │ │ └── common.py # 标准响应格式 +│ │ ├── services/ # 业务逻辑层 +│ │ │ ├── __init__.py +│ │ │ ├── server_service.py +│ │ │ ├── repo_service.py +│ │ │ ├── sync_service.py +│ │ │ └── ssh_key_service.py +│ │ ├── api/ # API 路由 +│ │ │ ├── __init__.py +│ │ │ ├── deps.py # 依赖注入(DB 会话、认证) +│ │ │ ├── servers.py +│ │ │ ├── repos.py +│ │ │ ├── ssh_keys.py +│ │ │ ├── sync_logs.py +│ │ │ ├── schedules.py +│ │ │ └── status.py +│ │ └── tasks/ # 定时任务 +│ │ ├── __init__.py +│ │ └── sync_task.py +│ ├── init_db.py # 数据库初始化脚本 +│ └── requirements.txt +├── frontend/ +│ ├── src/ +│ │ ├── App.vue +│ │ ├── main.js +│ │ ├── router/ +│ │ │ └── index.js +│ │ ├── views/ # 页面组件 +│ │ │ ├── Dashboard.vue +│ │ │ ├── Servers.vue +│ │ │ ├── Repos.vue +│ │ │ ├── SyncLogs.vue +│ │ │ ├── SshKeys.vue +│ │ │ └── Settings.vue +│ │ ├── components/ # 通用组件 +│ │ │ ├── ServerForm.vue +│ │ │ ├── RepoSyncStatus.vue +│ │ │ └── CommitHistory.vue +│ │ ├── api/ # API 调用封装 +│ │ │ ├── index.js # Axios 实例,拦截器 +│ │ │ ├── servers.js +│ │ │ ├── repos.js +│ │ │ ├── sshKeys.js +│ │ │ └── syncLogs.js +│ │ └── stores/ # Pinia 状态管理 +│ │ ├── servers.js +│ │ ├── repos.js +│ │ └── app.js +│ ├── index.html +│ ├── vite.config.js +│ └── package.json +└── data/ # 运行时数据目录 + ├── git_manager.db # SQLite 数据库 + ├── ssh_keys/ # SSH 密钥文件(加密存储) + └── repos/ # 本地仓库镜像 + └── {server_name}/ + └── {repo_full_name}/ +``` + +## API 设计 + +### 设计原则 + +- 统一 `/api/v1/` 前缀,版本化管理 +- 标准 JSON 响应格式:`{"code": 0, "data": {}, "message": "success"}` +- Bearer Token 认证:`Authorization: Bearer ` +- Swagger 文档自动生成(`/docs`) + +### 接口列表 + +#### 服务器管理 + +| 方法 | 路径 | 说明 | +|------|------|------| +| POST | `/api/v1/servers` | 添加 Gitea 服务器 | +| GET | `/api/v1/servers` | 服务器列表 | +| GET | `/api/v1/servers/{id}` | 服务器详情 | +| PUT | `/api/v1/servers/{id}` | 更新服务器配置 | +| DELETE | `/api/v1/servers/{id}` | 删除服务器 | +| POST | `/api/v1/servers/{id}/test` | 测试连接 + SSH 密钥有效性 | +| POST | `/api/v1/servers/{id}/sync` | 触发该服务器所有仓库同步 | + +#### 仓库管理 + +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/v1/servers/{id}/repos` | 获取服务器仓库列表(调 Gitea API) | +| GET | `/api/v1/repos` | 所有本地已同步的仓库 | +| GET | `/api/v1/repos/{id}` | 仓库详情 | +| POST | `/api/v1/repos/{id}/sync` | 手动同步单个仓库 | +| GET | `/api/v1/repos/{id}/commits` | 仓库提交历史 | + +#### SSH 密钥管理 + +| 方法 | 路径 | 说明 | +|------|------|------| +| POST | `/api/v1/ssh-keys` | 上传私钥 | +| GET | `/api/v1/ssh-keys` | 密钥列表 | +| DELETE | `/api/v1/ssh-keys/{id}` | 删除密钥 | +| POST | `/api/v1/ssh-keys/{id}/test` | 测试密钥有效性 | + +#### 同步记录 + +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/v1/sync-logs` | 同步日志列表(支持分页和状态筛选) | +| GET | `/api/v1/sync-logs/{id}` | 同步详情 | + +#### 调度配置 + +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/v1/schedules` | 查看调度配置 | +| PUT | `/api/v1/schedules` | 修改调度配置 | + +#### 系统状态 + +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/v1/status` | 系统运行状态、磁盘空间、仓库统计 | + +## 数据模型 + +### ER 关系 + +``` +ssh_keys 1 ──── N servers +servers 1 ──── N repos +repos 1 ──── N sync_logs +``` + +### ssh_keys 表 + +| 字段 | 类型 | 说明 | +|------|------|------| +| id | INTEGER PK | 自增主键 | +| name | VARCHAR(100) | 密钥名称 | +| private_key | TEXT | AES-256 加密后的私钥内容 | +| fingerprint | VARCHAR(64) | 密钥指纹(用于展示) | +| created_at | DATETIME | 创建时间 | + +### servers 表 + +| 字段 | 类型 | 说明 | +|------|------|------| +| id | INTEGER PK | 自增主键 | +| name | VARCHAR(100) | 服务器名称 | +| url | VARCHAR(500) | Gitea 服务器地址(如 https://gitea.example.com) | +| api_token | TEXT | Gitea API Token(AES-256 加密存储) | +| ssh_key_id | INTEGER FK | 关联 ssh_keys.id | +| sync_enabled | BOOLEAN | 是否启用自动同步 | +| schedule_cron | VARCHAR(50) | 定时表达式(如 `0 */2 * * *`) | +| local_path | VARCHAR(500) | 本地仓库存储路径 | +| status | VARCHAR(20) | 连接状态:connected / disconnected / untested | +| created_at | DATETIME | 创建时间 | +| updated_at | DATETIME | 更新时间 | + +### repos 表 + +| 字段 | 类型 | 说明 | +|------|------|------| +| id | INTEGER PK | 自增主键 | +| server_id | INTEGER FK | 关联 servers.id | +| name | VARCHAR(200) | 仓库名称 | +| full_name | VARCHAR(300) | 完整路径(owner/repo) | +| clone_url | VARCHAR(500) | SSH 克隆地址 | +| local_path | VARCHAR(500) | 本地镜像路径 | +| last_sync_at | DATETIME | 最后同步时间 | +| status | VARCHAR(20) | 状态:pending / syncing / synced / failed | +| created_at | DATETIME | 创建时间 | + +### sync_logs 表 + +| 字段 | 类型 | 说明 | +|------|------|------| +| id | INTEGER PK | 自增主键 | +| repo_id | INTEGER FK | 关联 repos.id | +| status | VARCHAR(20) | synced / failed | +| started_at | DATETIME | 开始时间 | +| finished_at | DATETIME | 结束时间 | +| commits_count | INTEGER | 本次同步新增提交数 | +| error_msg | TEXT | 失败时的错误信息 | +| created_at | DATETIME | 创建时间 | + +## 同步流程 + +### 首次同步(clone) + +``` +用户添加服务器 → 调 Gitea API 获取仓库列表 + → 对每个仓库: + 1. 生成 local_path: data/repos/{server_name}/{full_name} + 2. 解密 SSH 私钥到内存 + 3. git clone --mirror + 4. 记录 sync_log(状态 synced,提交数) + 5. 更新 repo.status = synced +``` + +### 增量同步(fetch) + +``` +定时任务触发 / 手动触发 + → 遍历该服务器下所有仓库: + 1. repo.status = syncing + 2. 解密 SSH 私钥到内存 + 3. git fetch --all + 4. 对比 refs 变化,计算新增提交数 + 5. 记录 sync_log + 6. repo.status = synced / failed +``` + +## 前端页面 + +### 页面布局 + +``` +┌──────────────────────────────────────────────────┐ +│ Git Manager [状态] [≡] │ +├────────┬─────────────────────────────────────────┤ +│ 仪表盘 │ │ +│ 服务器 │ 主内容区 │ +│ 仓库 │ │ +│ 同步记录 │ │ +│ SSH密钥 │ │ +│ 系统设置 │ │ +├────────┴─────────────────────────────────────────┤ +│ 上次同步: 2026-03-30 14:00 | 下次: 16:00 | 共 42 仓库 │ +└──────────────────────────────────────────────────┘ +``` + +### 页面功能 + +**仪表盘:** 服务器数量、仓库总数、同步状态统计(synced/failed/syncing)、最近 5 条同步记录、磁盘占用。 + +**服务器管理:** 添加/编辑/删除 Gitea 服务器,配置 URL、API Token、关联 SSH 密钥、同步计划(Cron 表达式),"测试连接"按钮验证配置有效性。 + +**仓库列表:** 按服务器分组展示,显示同步状态(标签颜色区分)、最后同步时间。支持搜索过滤。每行有"手动同步"按钮。 + +**同步记录:** 时间线形式展示同步日志,支持按状态(成功/失败)筛选,展开查看新增提交数和错误详情。 + +**SSH 密钥:** 上传私钥文件或粘贴内容,显示密钥指纹和关联服务器数,删除时检查是否有关联服务器。 + +**系统设置:** API Token 管理、全局仓库存储根路径、调度全局开关(暂停/恢复所有定时任务)。 + +## 安全设计 + +| 项 | 方案 | +|----|------| +| 私钥存储 | AES-256 加密后存入 SQLite,加密密钥从环境变量 `GM_ENCRYPT_KEY` 读取 | +| Gitea Token | 同样 AES-256 加密存储 | +| API 认证 | Bearer Token,配置在环境变量 `GM_API_TOKEN` | +| SSH 连接 | paramiko 使用内存中的解密私钥,不写入磁盘临时文件 | +| 输入校验 | Pydantic 模型自动校验所有 API 入参 | + +## 环境变量 + +| 变量 | 必填 | 默认值 | 说明 | +|------|------|--------|------| +| GM_ENCRYPT_KEY | 是 | - | AES-256 加密密钥(32字节) | +| GM_API_TOKEN | 是 | - | API 认证 Token | +| GM_DATA_DIR | 否 | ./data | 数据存储根目录 | +| GM_HOST | 否 | 0.0.0.0 | 监听地址 | +| GM_PORT | 否 | 8000 | 监听端口 | + +## 部署方式 + +```bash +# 安装后端依赖 +pip install -r backend/requirements.txt + +# 构建前端 +cd frontend && npm install && npm run build +# 构建产物自动输出到 backend/app/static/ + +# 配置环境变量 +export GM_ENCRYPT_KEY= +export GM_API_TOKEN= + +# 初始化数据库 +python -m backend.init_db + +# 启动服务 +uvicorn backend.app.main:app --host 0.0.0.0 --port 8000 +``` + +## MVP 交付范围 + +### 包含 + +- 多 Gitea 服务器管理(增删改查) +- SSH 密钥上传和管理 +- 拉取服务器所有仓库列表 +- 仓库 clone/fetch 同步 +- 手动触发同步 +- 定时自动同步(APScheduler + Cron) +- 同步历史记录查看 +- 提交历史查看 +- REST API(版本化,供外部集成) +- API Token 认证 +- Vue 3 Web 管理界面 + +### 不包含(后续迭代) + +- 多用户 / 权限系统 +- Webhook 实时推送 +- GitLab / GitHub 支持 +- Docker 容器化部署 +- 仓库内容浏览(文件树) diff --git a/frontend/package.json b/frontend/package.json new file mode 100644 index 0000000..108f054 --- /dev/null +++ b/frontend/package.json @@ -0,0 +1,22 @@ +{ + "name": "git-manager-frontend", + "version": "1.0.0", + "type": "module", + "scripts": { + "dev": "vite", + "build": "vite build", + "preview": "vite preview" + }, + "dependencies": { + "vue": "^3.4.15", + "vue-router": "^4.2.5", + "pinia": "^2.1.7", + "axios": "^1.6.5", + "element-plus": "^2.5.4", + "@element-plus/icons-vue": "^2.3.1" + }, + "devDependencies": { + "@vitejs/plugin-vue": "^5.0.3", + "vite": "^5.0.11" + } +}