GitMa/docs/superpowers/plans/2026-03-30-git-repo-manager.md
panw f720de6b58 feat: initialize project structure and configuration
- Create backend directory structure (app/models, app/schemas, app/services, app/api, app/tasks, tests)
- Create frontend directory structure (src/router, src/views, src/components, src/api, src/stores)
- Create data directories (ssh_keys, repos)
- Add requirements.txt with FastAPI, SQLAlchemy, Pydantic, and testing dependencies
- Add frontend package.json with Vue 3, Vue Router, Pinia, and Element Plus
- Add .env.example with configuration template
- Add .gitignore for Python, data directories, and frontend
- Add pytest conftest.py with test fixtures for database and environment

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-30 14:57:51 +08:00

# Git Repo Manager Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Build a Git repository sync manager for small teams, supporting multiple Gitea servers, a web UI, scheduled sync, and SSH key management.
**Architecture:** A single-process FastAPI application with an embedded Vue 3 SPA frontend, SQLite persistence, APScheduler for scheduled jobs, and Git/SSH for repository operations.
**Tech stack:** Python 3.11+, FastAPI, SQLAlchemy, Vue 3, Element Plus, Pinia, APScheduler, Paramiko, GitPython
---
## Directory Structure
```
git-manager/
├── backend/
│   ├── app/
│   │   ├── __init__.py
│   │   ├── main.py              # FastAPI entry point
│   │   ├── config.py            # Configuration management
│   │   ├── database.py          # SQLAlchemy (Base + Engine + Session)
│   │   ├── security.py          # Encryption + authentication
│   │   ├── models/              # ORM models
│   │   │   ├── __init__.py      # Exports Base and all models
│   │   │   ├── ssh_key.py
│   │   │   ├── server.py
│   │   │   ├── repo.py
│   │   │   └── sync_log.py
│   │   ├── schemas/             # Pydantic schemas
│   │   │   ├── __init__.py
│   │   │   ├── common.py
│   │   │   ├── ssh_key.py
│   │   │   ├── server.py
│   │   │   ├── repo.py
│   │   │   └── sync_log.py
│   │   ├── services/            # Business logic
│   │   │   ├── __init__.py
│   │   │   ├── ssh_key_service.py
│   │   │   ├── server_service.py
│   │   │   ├── repo_service.py
│   │   │   └── sync_service.py
│   │   ├── api/                 # API routes
│   │   │   ├── __init__.py
│   │   │   ├── deps.py          # Dependency injection
│   │   │   ├── ssh_keys.py
│   │   │   ├── servers.py
│   │   │   ├── repos.py
│   │   │   ├── sync_logs.py
│   │   │   ├── schedules.py
│   │   │   └── status.py
│   │   └── tasks/               # Scheduled jobs
│   │       ├── __init__.py
│   │       └── sync_task.py
│   ├── tests/
│   │   ├── __init__.py
│   │   ├── conftest.py          # pytest fixtures
│   │   ├── test_config.py
│   │   ├── test_security.py
│   │   ├── test_models/
│   │   ├── test_schemas/
│   │   ├── test_services/
│   │   └── test_api/
│   ├── init_db.py
│   └── requirements.txt
├── frontend/
│   ├── src/
│   │   ├── main.js
│   │   ├── App.vue
│   │   ├── router/
│   │   │   └── index.js
│   │   ├── views/
│   │   │   ├── Dashboard.vue
│   │   │   ├── Servers.vue
│   │   │   ├── Repos.vue
│   │   │   ├── SyncLogs.vue
│   │   │   ├── SshKeys.vue
│   │   │   └── Settings.vue
│   │   ├── components/
│   │   │   ├── ServerForm.vue
│   │   │   ├── RepoSyncStatus.vue
│   │   │   └── CommitHistory.vue
│   │   ├── api/
│   │   │   ├── index.js         # Axios instance + interceptors
│   │   │   ├── servers.js
│   │   │   ├── repos.js
│   │   │   ├── sshKeys.js
│   │   │   └── syncLogs.js
│   │   └── stores/
│   │       ├── app.js
│   │       ├── servers.js
│   │       └── repos.js
│   ├── index.html
│   ├── vite.config.js
│   └── package.json
├── data/                        # Runtime directory (gitignored)
│   ├── git_manager.db
│   ├── ssh_keys/
│   └── repos/
├── .env.example
├── .gitignore
└── README.md
```
---
## Phase 1: Project Infrastructure
### Task 1.1: Create the project structure and configuration files
**Files:**
- Create: `backend/requirements.txt`
- Create: `backend/tests/__init__.py`
- Create: `backend/tests/conftest.py`
- Create: `backend/app/__init__.py`
- Create: `backend/app/models/__init__.py`
- Create: `backend/app/schemas/__init__.py`
- Create: `backend/app/services/__init__.py`
- Create: `backend/app/api/__init__.py`
- Create: `backend/app/tasks/__init__.py`
- Create: `frontend/` directory structure
- Create: `data/ssh_keys/`, `data/repos/`
- Create: `.env.example`
- Create: `.gitignore`
- [ ] **Step 1: Create the directory structure**
```bash
# Backend directories
mkdir -p backend/app/{models,schemas,services,api,tasks}
mkdir -p backend/tests/{test_models,test_schemas,test_services,test_api}
# Frontend directories
mkdir -p frontend/src/{router,views,components,api,stores}
# Data directories
mkdir -p data/ssh_keys data/repos
# Create the __init__.py files
touch backend/app/__init__.py
touch backend/app/models/__init__.py
touch backend/app/schemas/__init__.py
touch backend/app/services/__init__.py
touch backend/app/api/__init__.py
touch backend/app/tasks/__init__.py
touch backend/tests/__init__.py
```
- [ ] **Step 2: Create requirements.txt**
`backend/requirements.txt`:
```txt
fastapi==0.109.0
uvicorn[standard]==0.27.0
sqlalchemy==2.0.25
pydantic==2.5.3
pydantic-settings==2.1.0
python-multipart==0.0.6
apscheduler==3.10.4
paramiko==3.4.0
gitpython==3.1.40
cryptography==41.0.7
requests==2.31.0
pytest==7.4.4
pytest-asyncio==0.23.3
httpx==0.26.0
```
- [ ] **Step 3: Create frontend/package.json**
`frontend/package.json`:
```json
{
"name": "git-manager-frontend",
"version": "1.0.0",
"type": "module",
"scripts": {
"dev": "vite",
"build": "vite build",
"preview": "vite preview"
},
"dependencies": {
"vue": "^3.4.15",
"vue-router": "^4.2.5",
"pinia": "^2.1.7",
"axios": "^1.6.5",
"element-plus": "^2.5.4",
"@element-plus/icons-vue": "^2.3.1"
},
"devDependencies": {
"@vitejs/plugin-vue": "^5.0.3",
"vite": "^5.0.11"
}
}
```
- [ ] **Step 4: Create .env.example**
`.env.example`:
```env
# AES-256 encryption key (32 bytes, base64-encoded)
GM_ENCRYPT_KEY=change-this-to-a-32-byte-base64-key
# API authentication token
GM_API_TOKEN=change-this-api-token
# Data directory
GM_DATA_DIR=./data
# Server settings
GM_HOST=0.0.0.0
GM_PORT=8000
```
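The `GM_ENCRYPT_KEY` placeholder has to be replaced with a real key before first use, and AES-256 needs exactly 32 key bytes once the base64 is decoded. A minimal sketch of one way to produce such a value with the standard library; the helper name `generate_encrypt_key` is an assumption for illustration and is not part of the plan:

```python
import base64
import secrets


def generate_encrypt_key() -> str:
    """Return 32 random bytes, base64-encoded, suitable for GM_ENCRYPT_KEY."""
    return base64.b64encode(secrets.token_bytes(32)).decode("ascii")


if __name__ == "__main__":
    # Paste the printed line into .env (never commit the real key)
    print(f"GM_ENCRYPT_KEY={generate_encrypt_key()}")
```

The same approach (`secrets.token_urlsafe`, for instance) would likely work for `GM_API_TOKEN`, which has no length requirement in the plan.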
- [ ] **Step 5: Create .gitignore**
`.gitignore`:
```
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
venv/
.venv/
# Data
data/
*.db
*.db-journal
# Environment variables
.env
# IDE
.idea/
.vscode/
*.swp
# Frontend
frontend/node_modules/
frontend/dist/
frontend/.vite/
# Logs
*.log
```
- [ ] **Step 6: Create the test conftest.py**
`backend/tests/conftest.py`:
```python
import base64
import sys
from pathlib import Path

import pytest

# Make the backend package importable regardless of where pytest is started
sys.path.insert(0, str(Path(__file__).parent.parent))

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


@pytest.fixture(scope="function")
def db_path(tmp_path):
    """Path to a temporary database file."""
    return tmp_path / "test.db"


@pytest.fixture(scope="function")
def db_engine(db_path):
    """Engine bound to the temporary database."""
    # Imported lazily: the models do not exist until Task 2.1, and a
    # module-level import would break the tests in Tasks 1.2-1.4.
    from app.models import Base

    engine = create_engine(f"sqlite:///{db_path}", connect_args={"check_same_thread": False})
    Base.metadata.create_all(engine)
    yield engine
    engine.dispose()


@pytest.fixture(scope="function")
def db_session(db_engine):
    """Session bound to the temporary database."""
    SessionLocal = sessionmaker(bind=db_engine, autocommit=False, autoflush=False)
    session = SessionLocal()
    yield session
    session.close()


@pytest.fixture(scope="function")
def test_encrypt_key():
    """Base64-encoded test key; AES-256 needs exactly 32 raw bytes."""
    return base64.b64encode(b'test-key-32-bytes-long-123456789').decode()


@pytest.fixture(scope="function")
def test_env_vars(db_path, test_encrypt_key, monkeypatch):
    """Set the environment variables the application expects."""
    monkeypatch.setenv("GM_ENCRYPT_KEY", test_encrypt_key)
    monkeypatch.setenv("GM_API_TOKEN", "test-token")
    monkeypatch.setenv("GM_DATA_DIR", str(db_path.parent))
    return {
        "GM_ENCRYPT_KEY": test_encrypt_key,
        "GM_API_TOKEN": "test-token",
    }
```
- [ ] **Step 7: Commit**
```bash
git add .
git commit -m "feat: initialize project structure and configuration"
```
---
### Task 1.2: Configuration module
**Files:**
- Create: `backend/app/config.py`
- Create: `backend/tests/test_config.py`
- [ ] **Step 1: Write the configuration tests**
`backend/tests/test_config.py`:
```python
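from pathlib import Path


def test_config_defaults(test_env_vars, monkeypatch):
    """Defaults apply when the optional variables are unset."""
    # test_env_vars points GM_DATA_DIR at a temp directory, so clear the
    # optional variables to actually exercise the defaults.
    for name in ("GM_DATA_DIR", "GM_HOST", "GM_PORT"):
        monkeypatch.delenv(name, raising=False)
    from app.config import Settings

    settings = Settings(_env_file=None)  # ignore any local .env file
    assert settings.data_dir == Path('./data')
    assert settings.host == '0.0.0.0'
    assert settings.port == 8000


def test_config_from_env(test_env_vars, monkeypatch):
    """Values are read from GM_-prefixed environment variables."""
    monkeypatch.setenv("GM_DATA_DIR", "/custom/data")
    monkeypatch.setenv("GM_PORT", "9000")
    # Build a fresh instance instead of reusing the module-level singleton
    from app.config import Settings

    settings = Settings(_env_file=None)
    assert settings.data_dir == Path('/custom/data')
    assert settings.port == 9000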
```
- [ ] **Step 2: Run the tests and confirm they fail**
```bash
cd backend && pytest tests/test_config.py -v
```
Expected: FAIL with an import error, because `app/config.py` has not been written yet.
- [ ] **Step 3: Implement the configuration module**
`backend/app/config.py`:
```python
from pathlib import Path

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Application settings, loaded from environment variables."""

    # Security
    encrypt_key: str  # AES-256 key (base64)
    api_token: str  # API authentication token
    # Paths
    data_dir: Path = Path('./data')
    # Server
    host: str = '0.0.0.0'
    port: int = 8000

    model_config = SettingsConfigDict(
        env_prefix='GM_',
        env_file='.env',
        env_file_encoding='utf-8',
    )

    @property
    def db_path(self) -> Path:
        """Path to the SQLite database file."""
        return self.data_dir / 'git_manager.db'

    @property
    def ssh_keys_dir(self) -> Path:
        """Directory where SSH keys are stored."""
        return self.data_dir / 'ssh_keys'

    @property
    def repos_dir(self) -> Path:
        """Directory where repository mirrors are stored."""
        return self.data_dir / 'repos'


# Module-level singleton; requires GM_ENCRYPT_KEY and GM_API_TOKEN at import time
settings = Settings()
```
- [ ] **Step 4: Run the tests and confirm they pass**
```bash
cd backend && pytest tests/test_config.py -v
```
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add backend/app/config.py backend/tests/test_config.py
git commit -m "feat: add configuration management"
```
---
### Task 1.3: Security module (encryption + authentication)
**Files:**
- Create: `backend/app/security.py`
- Create: `backend/tests/test_security.py`
- [ ] **Step 1: Write the security module tests**
`backend/tests/test_security.py`:
```python
import base64


def test_encrypt_decrypt_roundtrip():
    """Encrypting and then decrypting returns the original bytes."""
    from app.security import encrypt_data, decrypt_data

    original = b"sensitive-secret-data"
    # AES-256 needs exactly 32 raw key bytes
    key = base64.b64encode(b'test-key-32-bytes-long-123456789').decode()
    encrypted = encrypt_data(original, key)
    assert encrypted != original
    assert isinstance(encrypted, bytes)
    assert len(encrypted) > len(original)  # nonce + ciphertext + tag
    decrypted = decrypt_data(encrypted, key)
    assert decrypted == original


def test_verify_api_token_success(test_env_vars):
    """A correct Bearer token is accepted."""
    from app.security import verify_api_token

    assert verify_api_token("Bearer test-token") is True


def test_verify_api_token_failure(test_env_vars):
    """Wrong, missing, or non-Bearer credentials are rejected."""
    from app.security import verify_api_token

    assert verify_api_token("Bearer wrong-token") is False
    assert verify_api_token(None) is False
    assert verify_api_token("Basic token") is False
```
- [ ] **Step 2: Run the tests and confirm they fail**
```bash
cd backend && pytest tests/test_security.py -v
```
Expected: FAIL with an import error, because `app/security.py` has not been written yet.
- [ ] **Step 3: Implement the security module**
`backend/app/security.py`:
```python
import base64
import hmac
import os

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend


def encrypt_data(data: bytes, key_b64: str) -> bytes:
    """
    Encrypt data with AES-256-GCM.

    Args:
        data: Plaintext bytes
        key_b64: Base64-encoded encryption key (32 bytes once decoded)
    Returns:
        Encrypted payload laid out as nonce + ciphertext + tag
    """
    key = base64.b64decode(key_b64)
    nonce = os.urandom(12)  # 96-bit GCM nonce, fresh for every call
    cipher = Cipher(
        algorithms.AES(key),
        modes.GCM(nonce),
        backend=default_backend()
    )
    encryptor = cipher.encryptor()
    ciphertext = encryptor.update(data) + encryptor.finalize()
    # Return nonce + ciphertext + tag
    return nonce + ciphertext + encryptor.tag


def decrypt_data(encrypted_data: bytes, key_b64: str) -> bytes:
    """
    Decrypt data produced by encrypt_data.

    Args:
        encrypted_data: Encrypted payload (nonce + ciphertext + tag)
        key_b64: Base64-encoded encryption key
    Returns:
        The original plaintext bytes
    """
    key = base64.b64decode(key_b64)
    nonce = encrypted_data[:12]
    tag = encrypted_data[-16:]
    ciphertext = encrypted_data[12:-16]
    cipher = Cipher(
        algorithms.AES(key),
        modes.GCM(nonce, tag),
        backend=default_backend()
    )
    decryptor = cipher.decryptor()
    # finalize() raises InvalidTag if the payload was tampered with
    return decryptor.update(ciphertext) + decryptor.finalize()


def verify_api_token(authorization: str | None) -> bool:
    """
    Verify a Bearer token.

    Args:
        authorization: Value of the Authorization header
    Returns:
        Whether the token is valid
    """
    if not authorization:
        return False
    if not authorization.startswith('Bearer '):
        return False
    from app.config import settings

    token = authorization[7:]  # strip the 'Bearer ' prefix
    # Constant-time comparison avoids leaking the token through timing
    return hmac.compare_digest(token.encode(), settings.api_token.encode())
```
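Both functions above depend on one byte layout: a 12-byte nonce, then the ciphertext, then a 16-byte GCM tag. The sketch below shows that slicing on its own, with a length check that `decrypt_data` as written does not perform. The helper `split_envelope` and the dummy payload are assumptions for illustration; no real encryption happens here.

```python
NONCE_LEN = 12  # 96-bit GCM nonce, matching os.urandom(12) in encrypt_data
TAG_LEN = 16    # 128-bit GCM authentication tag


def split_envelope(blob: bytes) -> tuple[bytes, bytes, bytes]:
    """Split nonce + ciphertext + tag into its three parts."""
    if len(blob) < NONCE_LEN + TAG_LEN:
        raise ValueError("encrypted payload is too short")
    return blob[:NONCE_LEN], blob[NONCE_LEN:-TAG_LEN], blob[-TAG_LEN:]


# Dummy bytes standing in for real encrypt_data() output
blob = bytes(NONCE_LEN) + b"ciphertext" + b"\xff" * TAG_LEN
nonce, ciphertext, tag = split_envelope(blob)
```

Because the nonce and tag have fixed sizes, the payload is always 28 bytes longer than the plaintext, which is what the `len(encrypted) > len(original)` assertion in the test relies on.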
- [ ] **Step 4: Run the tests and confirm they pass**
```bash
cd backend && pytest tests/test_security.py -v
```
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add backend/app/security.py backend/tests/test_security.py
git commit -m "feat: add security module (encryption + auth)"
```
---
### Task 1.4: Database module
**Files:**
- Create: `backend/app/database.py`
- Create: `backend/init_db.py`
- Create: `backend/tests/test_database.py`
- [ ] **Step 1: Write the database tests**
`backend/tests/test_database.py`:
```python
def test_database_initialization(db_path):
    """init_db sets up an engine and the database file gets created."""
    from app.database import init_db, get_engine, Base

    init_db(db_path)
    engine = get_engine()
    assert engine is not None
    # create_engine() is lazy: SQLite creates the file on first connection
    with engine.connect():
        pass
    assert db_path.exists()
    # Creating all tables must not raise
    Base.metadata.create_all(engine)


def test_get_session(db_path):
    """get_db yields a usable session."""
    from app.database import init_db, get_db

    init_db(db_path)
    with get_db(db_path) as session:
        assert session is not None
```
- [ ] **Step 2: Run the tests and confirm they fail**
```bash
cd backend && pytest tests/test_database.py -v
```
Expected: FAIL with an import error, because `app/database.py` has not been written yet.
- [ ] **Step 3: Implement the database module**
`backend/app/database.py`:
```python
from contextlib import contextmanager
from pathlib import Path
from typing import Generator

from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker, Session


class Base(DeclarativeBase):
    """SQLAlchemy declarative base class."""
    pass


_engine = None
_session_factory = None


def init_db(db_path: Path) -> None:
    """
    Initialize the database engine and session factory.

    Args:
        db_path: Path to the SQLite database file
    """
    global _engine, _session_factory
    # Make sure the parent directory exists
    db_path.parent.mkdir(parents=True, exist_ok=True)
    # Create the engine (no connection is opened yet)
    _engine = create_engine(
        f'sqlite:///{db_path}',
        connect_args={'check_same_thread': False}
    )
    _session_factory = sessionmaker(bind=_engine, autocommit=False, autoflush=False)


def get_engine():
    """Return the database engine."""
    return _engine


def get_session_factory():
    """Return the session factory."""
    return _session_factory


@contextmanager
def get_db(db_path: Path | None = None) -> Generator[Session, None, None]:
    """
    Provide a database session.

    Args:
        db_path: Optional; used to initialize the database if it is not set up yet
    Yields:
        A SQLAlchemy session
    """
    if db_path and _engine is None:
        init_db(db_path)
    if _session_factory is None:
        raise RuntimeError("Database not initialized. Call init_db() first.")
    session = _session_factory()
    try:
        yield session
    finally:
        session.close()
```
`backend/init_db.py`:
```python
#!/usr/bin/env python3
"""
Database initialization script.

Creates all tables and the required directories.
"""
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))

from app.config import settings
from app.database import get_engine, init_db
from app.models import Base  # importing app.models registers every table on Base


def main():
    """Initialize the database."""
    print(f"Initializing database: {settings.db_path}")
    # Create directories
    settings.data_dir.mkdir(parents=True, exist_ok=True)
    settings.ssh_keys_dir.mkdir(parents=True, exist_ok=True)
    settings.repos_dir.mkdir(parents=True, exist_ok=True)
    # Initialize the engine
    init_db(settings.db_path)
    # Create all tables
    Base.metadata.create_all(get_engine())
    print("Database initialized successfully!")
    print(f" - Database: {settings.db_path}")
    print(f" - SSH keys: {settings.ssh_keys_dir}")
    print(f" - Repositories: {settings.repos_dir}")


if __name__ == '__main__':
    main()
```
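One detail of `init_db` is easy to miss: `create_engine` does not open a connection, so the SQLite file does not exist until something connects, for example `Base.metadata.create_all`. The sketch below shows the same first-connection behaviour using the standard library's `sqlite3` module in place of SQLAlchemy, purely to stay dependency-free; the file name `demo.db` is illustrative.

```python
import sqlite3
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    db_file = Path(tmp) / "demo.db"
    existed_before = db_file.exists()  # nothing has connected yet
    conn = sqlite3.connect(db_file)    # the first connection creates the file
    existed_after = db_file.exists()
    conn.close()
```

This is why a test that checks for the database file should do so only after a connection has been made.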
- [ ] **Step 4: Run the tests and confirm they pass**
```bash
cd backend && pytest tests/test_database.py -v
```
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add backend/app/database.py backend/init_db.py backend/tests/test_database.py
git commit -m "feat: add database module"
```
---
## Phase 2: Data Models
### Task 2.1: Create all ORM models
**Files:**
- Create: `backend/app/models/ssh_key.py`
- Create: `backend/app/models/server.py`
- Create: `backend/app/models/repo.py`
- Create: `backend/app/models/sync_log.py`
- Modify: `backend/app/models/__init__.py`
- [ ] **Step 1: Write the model tests**
`backend/tests/test_models/test_models.py`:
```python
import time


def test_ssh_key_model(db_session):
    """SshKey can be persisted and receives an id and created_at."""
    from app.models.ssh_key import SshKey

    key = SshKey(
        name='test-key',
        private_key=b'encrypted-key-data',
        fingerprint='SHA256:abc123'
    )
    db_session.add(key)
    db_session.commit()
    assert key.id is not None
    assert key.created_at is not None


def test_server_model(db_session):
    """Server can be persisted and links to its SSH key."""
    from app.models.server import Server
    from app.models.ssh_key import SshKey

    # Create the SSH key first
    ssh_key = SshKey(
        name='test-key',
        private_key=b'encrypted',
        fingerprint='SHA256:abc'
    )
    db_session.add(ssh_key)
    db_session.flush()
    server = Server(
        name='test-gitea',
        url='https://gitea.example.com',
        api_token=b'encrypted-token',
        ssh_key_id=ssh_key.id,
        sync_enabled=True,
        schedule_cron='0 */2 * * *',
        local_path='/data/repos/test-gitea',
        status='untested'
    )
    db_session.add(server)
    db_session.commit()
    assert server.id is not None
    assert server.ssh_key.id == ssh_key.id


def test_repo_model(db_session):
    """Repo can be persisted and links to its server."""
    from app.models.repo import Repo
    from app.models.server import Server
    from app.models.ssh_key import SshKey

    # Create the related rows
    ssh_key = SshKey(name='k', private_key=b'x', fingerprint='y')
    db_session.add(ssh_key)
    db_session.flush()
    server = Server(
        name='s', url='u', api_token=b't', ssh_key_id=ssh_key.id,
        sync_enabled=True, schedule_cron='x', local_path='p', status='x'
    )
    db_session.add(server)
    db_session.flush()
    repo = Repo(
        server_id=server.id,
        name='my-repo',
        full_name='owner/my-repo',
        clone_url='git@gitea.com:owner/repo.git',
        local_path='/data/r.git',
        status='pending'
    )
    db_session.add(repo)
    db_session.commit()
    assert repo.id is not None
    assert repo.server.id == server.id


def test_sync_log_model(db_session):
    """SyncLog can be persisted and links to its repo."""
    from app.models.sync_log import SyncLog
    from app.models.repo import Repo
    from app.models.server import Server
    from app.models.ssh_key import SshKey

    # Create the chain of related rows
    ssh_key = SshKey(name='k', private_key=b'x', fingerprint='y')
    db_session.add(ssh_key)
    db_session.flush()
    server = Server(
        name='s', url='u', api_token=b't', ssh_key_id=ssh_key.id,
        sync_enabled=True, schedule_cron='x', local_path='p', status='x'
    )
    db_session.add(server)
    db_session.flush()
    repo = Repo(
        server_id=server.id, name='r', full_name='f', clone_url='u',
        local_path='p', status='s'
    )
    db_session.add(repo)
    db_session.flush()
    now = int(time.time())  # the timestamp columns are Integer (Unix seconds)
    log = SyncLog(
        repo_id=repo.id,
        status='synced',
        started_at=now,
        finished_at=now,
        commits_count=5
    )
    db_session.add(log)
    db_session.commit()
    assert log.id is not None
    assert log.repo.id == repo.id
```
- [ ] **Step 2: Run the tests and confirm they fail**
```bash
cd backend && pytest tests/test_models/test_models.py -v
```
Expected: FAIL with an import error, because the model modules have not been written yet.
- [ ] **Step 3: Implement the SshKey model**
`backend/app/models/ssh_key.py`:
```python
import time

from sqlalchemy import Column, Integer, LargeBinary, String

from app.database import Base


class SshKey(Base):
    """Stored SSH private key."""

    __tablename__ = 'ssh_keys'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(100), nullable=False, unique=True)
    private_key = Column(LargeBinary, nullable=False)  # AES-256-GCM ciphertext (bytes)
    fingerprint = Column(String(64), nullable=False)
    # Unix timestamp, filled in automatically on insert
    created_at = Column(Integer, nullable=False, default=lambda: int(time.time()))

    def __repr__(self):
        return f'<SshKey(id={self.id}, name="{self.name}")>'
```
- [ ] **Step 4: Implement the Server model**
`backend/app/models/server.py`:
```python
import time

from sqlalchemy import Boolean, Column, ForeignKey, Integer, LargeBinary, String
from sqlalchemy.orm import relationship

from app.database import Base


def _now() -> int:
    """Current Unix timestamp in seconds."""
    return int(time.time())


class Server(Base):
    """Gitea server configuration."""

    __tablename__ = 'servers'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(100), nullable=False, unique=True)
    url = Column(String(500), nullable=False)
    api_token = Column(LargeBinary, nullable=False)  # AES-256-GCM ciphertext (bytes)
    ssh_key_id = Column(Integer, ForeignKey('ssh_keys.id'), nullable=False)
    sync_enabled = Column(Boolean, default=False, nullable=False)
    schedule_cron = Column(String(50), nullable=False)
    local_path = Column(String(500), nullable=False)
    status = Column(String(20), default='untested', nullable=False)
    created_at = Column(Integer, nullable=False, default=_now)
    updated_at = Column(Integer, nullable=False, default=_now, onupdate=_now)
    # Relationships
    ssh_key = relationship('SshKey', backref='servers')
    repos = relationship('Repo', back_populates='server', cascade='all, delete-orphan')

    def __repr__(self):
        return f'<Server(id={self.id}, name="{self.name}", status="{self.status}")>'
```
- [ ] **Step 5: Implement the Repo model**
`backend/app/models/repo.py`:
```python
import time

from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship

from app.database import Base


class Repo(Base):
    """Mirrored Git repository."""

    __tablename__ = 'repos'

    id = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('servers.id'), nullable=False)
    name = Column(String(200), nullable=False)
    full_name = Column(String(300), nullable=False)
    clone_url = Column(String(500), nullable=False)
    local_path = Column(String(500), nullable=False)
    last_sync_at = Column(Integer, nullable=True)
    status = Column(String(20), default='pending', nullable=False)
    # Unix timestamp, filled in automatically on insert
    created_at = Column(Integer, nullable=False, default=lambda: int(time.time()))
    # Relationships
    server = relationship('Server', back_populates='repos')
    sync_logs = relationship('SyncLog', back_populates='repo', cascade='all, delete-orphan')

    def __repr__(self):
        return f'<Repo(id={self.id}, name="{self.full_name}", status="{self.status}")>'
```
- [ ] **Step 6: Implement the SyncLog model**
`backend/app/models/sync_log.py`:
```python
import time

from sqlalchemy import Column, Integer, String, Text, ForeignKey
from sqlalchemy.orm import relationship

from app.database import Base


class SyncLog(Base):
    """Log entry for one repository sync run."""

    __tablename__ = 'sync_logs'

    id = Column(Integer, primary_key=True, autoincrement=True)
    repo_id = Column(Integer, ForeignKey('repos.id'), nullable=False)
    status = Column(String(20), nullable=False)
    started_at = Column(Integer, nullable=False)
    finished_at = Column(Integer, nullable=True)
    commits_count = Column(Integer, nullable=True)
    error_msg = Column(Text, nullable=True)
    # Unix timestamp, filled in automatically on insert
    created_at = Column(Integer, nullable=False, default=lambda: int(time.time()))
    # Relationships
    repo = relationship('Repo', back_populates='sync_logs')

    def __repr__(self):
        return f'<SyncLog(id={self.id}, repo_id={self.repo_id}, status="{self.status}")>'
```
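All four models store times as `Integer` Unix timestamps rather than as datetime columns. One pitfall with that choice: calling `.timestamp()` on a naive datetime such as `datetime.utcnow()` interprets it as local time, so the stored value can be off by the UTC offset. A minimal sketch of conversion helpers that avoid this; the names `unix_now`, `to_unix` and `from_unix` are assumptions for illustration and not part of the plan:

```python
import time
from datetime import datetime, timezone


def unix_now() -> int:
    """Current time as whole seconds since the Unix epoch."""
    return int(time.time())


def to_unix(dt: datetime) -> int:
    """Convert a datetime to a Unix timestamp, treating naive values as UTC."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp())


def from_unix(ts: int) -> datetime:
    """Convert a stored Unix timestamp back to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ts, tz=timezone.utc)
```

With helpers like these, values such as `started_at` could be written with `unix_now()` and turned back into datetimes only at the display layer.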
- [ ] **Step 7: Update models/__init__.py**
`backend/app/models/__init__.py`:
```python
from app.database import Base
from app.models.ssh_key import SshKey
from app.models.server import Server
from app.models.repo import Repo
from app.models.sync_log import SyncLog
__all__ = ['Base', 'SshKey', 'Server', 'Repo', 'SyncLog']
```
- [ ] **Step 8: Run the tests and confirm they pass**
```bash
cd backend && pytest tests/test_models/test_models.py -v
```
Expected: PASS
- [ ] **Step 9: Commit**
```bash
git add backend/app/models/ backend/tests/test_models/
git commit -m "feat: add all ORM models"
```
---
## Phase 3: Pydantic Schemas
### Task 3.1: Create all Pydantic schemas
**Files:**
- Create: `backend/app/schemas/common.py`
- Create: `backend/app/schemas/ssh_key.py`
- Create: `backend/app/schemas/server.py`
- Create: `backend/app/schemas/repo.py`
- Create: `backend/app/schemas/sync_log.py`
- Modify: `backend/app/schemas/__init__.py`
- [ ] **Step 1: Write the schema tests**
`backend/tests/test_schemas/test_schemas.py`:
```python
import time


def test_common_schemas():
    """The common response envelopes carry code, data and message."""
    from app.schemas.common import SuccessResponse, ErrorResponse

    resp = SuccessResponse(data={'key': 'value'}, message='OK')
    assert resp.code == 0
    assert resp.data == {'key': 'value'}
    err = ErrorResponse(message='Not found', code=404)
    assert err.code == 404
    assert err.message == 'Not found'


def test_ssh_key_schemas():
    """SSH key request and response schemas."""
    from app.schemas.ssh_key import SshKeyCreate, SshKeyResponse

    create = SshKeyCreate(
        name='test-key',
        private_key='ssh-rsa AAAA...'
    )
    assert create.name == 'test-key'
    resp = SshKeyResponse(
        id=1,
        name='test-key',
        fingerprint='SHA256:abc',
        servers_count=1,
        created_at=int(time.time())
    )
    assert resp.servers_count == 1


def test_server_schemas():
    """Server request and response schemas."""
    from app.schemas.server import ServerCreate, ServerResponse

    create = ServerCreate(
        name='test',
        url='https://gitea.com',
        api_token='token',
        ssh_key_id=1
    )
    assert create.sync_enabled is False  # default value
    now = int(time.time())
    resp = ServerResponse(
        id=1,
        name='test',
        url='https://gitea.com',
        sync_enabled=True,
        schedule_cron='0 */2 * * *',
        local_path='/data/r',
        status='connected',
        repos_count=5,
        ssh_key_name='key',
        created_at=now,
        updated_at=now
    )
    assert resp.repos_count == 5


def test_repo_schemas():
    """Repo response and commit info schemas."""
    from app.schemas.repo import RepoResponse, CommitInfo

    now = int(time.time())
    resp = RepoResponse(
        id=1,
        server_id=1,
        server_name='gitea',
        name='repo',
        full_name='owner/repo',
        clone_url='git@u:r.git',
        local_path='/p',
        status='synced',
        created_at=now
    )
    assert resp.full_name == 'owner/repo'
    commit = CommitInfo(
        hash='abc',
        message='Fix',
        author='Bob',
        date=now
    )
    assert commit.hash == 'abc'


def test_sync_log_schemas():
    """SyncLog response schema."""
    from app.schemas.sync_log import SyncLogResponse

    now = int(time.time())
    resp = SyncLogResponse(
        id=1,
        repo_id=1,
        repo_name='owner/repo',
        status='synced',
        started_at=now,
        finished_at=now,
        commits_count=5,
        created_at=now
    )
    assert resp.commits_count == 5
```
- [ ] **Step 2: Run the tests and confirm they fail**
```bash
cd backend && pytest tests/test_schemas/test_schemas.py -v
```
Expected: FAIL with an import error, because the schema modules have not been written yet.
- [ ] **Step 3: Implement all schemas**
`backend/app/schemas/common.py`:
```python
from typing import Generic, TypeVar, Optional

from pydantic import BaseModel, Field

T = TypeVar('T')


class SuccessResponse(BaseModel, Generic[T]):
    """Standard success response."""

    code: int = Field(default=0, description="Status code; 0 means success")
    data: Optional[T] = Field(default=None, description="Response payload")
    message: str = Field(default='success', description="Response message")


class ErrorResponse(BaseModel):
    """Standard error response."""

    code: int = Field(..., description="Error code")
    message: str = Field(..., description="Error message")
    data: Optional[dict] = Field(default=None, description="Additional error data")
```
`backend/app/schemas/ssh_key.py`:
```python
from typing import Optional

from pydantic import BaseModel, Field


class SshKeyCreate(BaseModel):
    """Request body for creating an SSH key."""

    name: str = Field(..., min_length=1, max_length=100)
    private_key: str = Field(..., description="Private key contents")
    password: Optional[str] = Field(default=None, description="Key passphrase")


class SshKeyResponse(BaseModel):
    """SSH key response."""

    id: int
    name: str
    fingerprint: str
    servers_count: int = 0
    created_at: int  # Unix timestamp
```
`backend/app/schemas/server.py`:
```python
from typing import Optional

from pydantic import BaseModel, Field


class ServerCreate(BaseModel):
    """Request body for creating a server."""

    name: str = Field(..., min_length=1, max_length=100)
    url: str = Field(..., description="Gitea server URL")
    api_token: str = Field(..., description="Gitea API token")
    ssh_key_id: int = Field(..., description="SSH key ID")
    sync_enabled: bool = Field(default=False)
    schedule_cron: str = Field(default='0 */2 * * *')


class ServerUpdate(BaseModel):
    """Request body for updating a server; every field is optional."""

    name: Optional[str] = Field(None, min_length=1, max_length=100)
    url: Optional[str] = None
    api_token: Optional[str] = None
    ssh_key_id: Optional[int] = None
    sync_enabled: Optional[bool] = None
    schedule_cron: Optional[str] = None


class ServerResponse(BaseModel):
    """Server response (the API token is never returned)."""

    id: int
    name: str
    url: str
    sync_enabled: bool
    schedule_cron: str
    local_path: str
    status: str
    repos_count: int = 0
    ssh_key_name: str
    created_at: int
    updated_at: int
```
`backend/app/schemas/repo.py`:
```python
from typing import Optional

from pydantic import BaseModel


class CommitInfo(BaseModel):
    """Commit information."""

    hash: str
    message: str
    author: str
    date: int  # Unix timestamp


class RepoResponse(BaseModel):
    """Repository response."""

    id: int
    server_id: int
    server_name: str
    name: str
    full_name: str
    clone_url: str
    local_path: str
    status: str
    last_sync_at: Optional[int] = None
    created_at: int
```
`backend/app/schemas/sync_log.py`:
```python
from typing import Optional

from pydantic import BaseModel


class SyncLogResponse(BaseModel):
    """Sync log response."""

    id: int
    repo_id: int
    repo_name: str
    status: str
    started_at: int
    finished_at: Optional[int] = None
    commits_count: Optional[int] = None
    error_msg: Optional[str] = None
    created_at: int
```
`backend/app/schemas/__init__.py`:
```python
from app.schemas.common import SuccessResponse, ErrorResponse
from app.schemas.ssh_key import SshKeyCreate, SshKeyResponse
from app.schemas.server import ServerCreate, ServerUpdate, ServerResponse
from app.schemas.repo import RepoResponse, CommitInfo
from app.schemas.sync_log import SyncLogResponse
__all__ = [
'SuccessResponse', 'ErrorResponse',
'SshKeyCreate', 'SshKeyResponse',
'ServerCreate', 'ServerUpdate', 'ServerResponse',
'RepoResponse', 'CommitInfo',
'SyncLogResponse',
]
```
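`SuccessResponse` and `ErrorResponse` share one envelope of three fields: `code`, `data` and `message`. The sketch below shows the JSON shape a client would probably see. It uses a standard-library dataclass as a stand-in for the Pydantic models so it runs without dependencies; the class name `Envelope` and the sample payload are assumptions for illustration.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class Envelope:
    """Stdlib stand-in mirroring the three fields of the response schemas."""
    code: int = 0
    data: Optional[Any] = None
    message: str = "success"


# A success carries code 0 and a payload; an error carries a non-zero code
ok = asdict(Envelope(data={"id": 1, "name": "test-key"}))
err = asdict(Envelope(code=404, message="Not found"))
print(json.dumps(ok))
print(json.dumps(err))
```

Keeping the same three keys in both cases lets the frontend's Axios interceptor branch on `code` alone.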
- [ ] **Step 4: Run the tests and confirm they pass**
```bash
cd backend && pytest tests/test_schemas/test_schemas.py -v
```
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add backend/app/schemas/ backend/tests/test_schemas/
git commit -m "feat: add all Pydantic schemas"
```
---
## Phase 4: Service Layer
### Task 4.1: SSH key service
**Files:**
- Create: `backend/app/services/ssh_key_service.py`
- Create: `backend/tests/test_services/test_ssh_key_service.py`
- [ ] **Step 1: Write the service tests**
`backend/tests/test_services/test_ssh_key_service.py`:
```python
import pytest


def test_create_ssh_key(db_session, test_encrypt_key):
    """Creating an SSH key stores it and assigns a fingerprint."""
    from app.services.ssh_key_service import SshKeyService

    service = SshKeyService(db_session, test_encrypt_key)
    ssh_key = service.create_ssh_key(
        name='test-key',
        private_key='ssh-rsa AAAAB3...'
    )
    assert ssh_key.id is not None
    assert ssh_key.name == 'test-key'
    assert ssh_key.fingerprint is not None


def test_list_ssh_keys(db_session, test_encrypt_key):
    """Listing returns every stored SSH key."""
    from app.services.ssh_key_service import SshKeyService

    service = SshKeyService(db_session, test_encrypt_key)
    service.create_ssh_key('key1', 'ssh-rsa AAAA...')
    service.create_ssh_key('key2', 'ssh-rsa BBBB...')
    keys = service.list_ssh_keys()
    assert len(keys) == 2


def test_delete_ssh_key(db_session, test_encrypt_key):
    """Deleting an unused SSH key removes it."""
    from app.services.ssh_key_service import SshKeyService
    from app.models.ssh_key import SshKey

    service = SshKeyService(db_session, test_encrypt_key)
    ssh_key = service.create_ssh_key('test', 'ssh-rsa AAAA...')
    key_id = ssh_key.id
    service.delete_ssh_key(key_id)
    deleted = db_session.query(SshKey).filter(SshKey.id == key_id).first()
    assert deleted is None


def test_delete_in_use_key_fails(db_session, test_encrypt_key):
    """Deleting a key that a server still uses raises ValueError."""
    from app.services.ssh_key_service import SshKeyService
    from app.models.server import Server

    service = SshKeyService(db_session, test_encrypt_key)
    ssh_key = service.create_ssh_key('test', 'ssh-rsa AAAA...')
    # Attach a server to the key
    server = Server(
        name='s', url='u', api_token=b't', ssh_key_id=ssh_key.id,
        sync_enabled=True, schedule_cron='x', local_path='p',
        status='x', created_at=0, updated_at=0
    )
    db_session.add(server)
    db_session.commit()
    with pytest.raises(ValueError, match="in use"):
        service.delete_ssh_key(ssh_key.id)


def test_get_decrypted_key(db_session, test_encrypt_key):
    """The decrypted key matches what was stored."""
    from app.services.ssh_key_service import SshKeyService

    service = SshKeyService(db_session, test_encrypt_key)
    original_key = 'ssh-rsa AAAAB3...'
    ssh_key = service.create_ssh_key('test', original_key)
    decrypted = service.get_decrypted_key(ssh_key.id)
    assert decrypted == original_key
```
- [ ] **Step 2: Run the tests and confirm they fail**
```bash
cd backend && pytest tests/test_services/test_ssh_key_service.py -v
```
Expected: FAIL
- [ ] **Step 3: Implement the SSH key service**
`backend/app/services/ssh_key_service.py`:
```python
import base64
import hashlib
import time
from typing import List, Optional

from sqlalchemy.orm import Session

from app.models.ssh_key import SshKey
from app.security import encrypt_data, decrypt_data


class SshKeyService:
    """Service for SSH key operations."""

    def __init__(self, db: Session, encryption_key: str):
        self.db = db
        self.encryption_key = encryption_key

    def create_ssh_key(self, name: str, private_key: str, password: str | None = None) -> SshKey:
        """Create a new SSH key (the password argument is accepted but not used yet)."""
        # Reject duplicate names
        existing = self.db.query(SshKey).filter(SshKey.name == name).first()
        if existing:
            raise ValueError(f"SSH key '{name}' already exists")
        # Encrypt the private key
        encrypted = encrypt_data(private_key.encode(), self.encryption_key)
        # Derive an identifier for display
        fingerprint = self._generate_fingerprint(private_key)
        ssh_key = SshKey(
            name=name,
            private_key=encrypted,
            fingerprint=fingerprint,
            created_at=int(time.time())
        )
        self.db.add(ssh_key)
        self.db.commit()
        self.db.refresh(ssh_key)
        return ssh_key

    def list_ssh_keys(self) -> List[SshKey]:
        """List all SSH keys."""
        return self.db.query(SshKey).all()

    def get_ssh_key(self, key_id: int) -> Optional[SshKey]:
        """Fetch one SSH key, or None if it does not exist."""
        return self.db.query(SshKey).filter(SshKey.id == key_id).first()

    def delete_ssh_key(self, key_id: int) -> bool:
        """Delete an SSH key; returns False if it does not exist."""
        ssh_key = self.get_ssh_key(key_id)
        if not ssh_key:
            return False
        # Refuse to delete a key that servers still reference
        if ssh_key.servers:
            raise ValueError(
                f"SSH key is in use by {len(ssh_key.servers)} server(s) and cannot be deleted"
            )
        self.db.delete(ssh_key)
        self.db.commit()
        return True

    def get_decrypted_key(self, key_id: int) -> str:
        """Return the decrypted private key."""
        ssh_key = self.get_ssh_key(key_id)
        if not ssh_key:
            raise ValueError(f"SSH key {key_id} not found")
        decrypted = decrypt_data(ssh_key.private_key, self.encryption_key)
        return decrypted.decode()

    def _generate_fingerprint(self, key_text: str) -> str:
        """Derive a short identifier from the key text.

        This is not an OpenSSH fingerprint: it hashes the stored key text
        and keeps the first 16 base64 characters of the SHA-256 digest.
        """
        digest = hashlib.sha256(key_text.encode()).digest()
        return f"SHA256:{base64.b64encode(digest).decode()[:16]}"
```
- [ ] **Step 4: Run the tests and confirm they pass**

```bash
cd backend && pytest tests/test_services/test_ssh_key_service.py -v
```

Expected: PASS

- [ ] **Step 5: Commit**
```bash
git add backend/app/services/ssh_key_service.py backend/tests/test_services/test_ssh_key_service.py
git commit -m "feat: add SSH key service"
```
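
The fingerprint stored by the SSH key service is a truncated hash of the key text, so it will not match what OpenSSH tools display. For comparison, here is a minimal sketch of the OpenSSH-style SHA256 fingerprint, assuming the input is a one-line public key in `<type> <base64-blob> [comment]` form. The function name `openssh_fingerprint` and the sample blob are hypothetical and not part of this plan.

```python
import base64
import hashlib


def openssh_fingerprint(public_key_line: str) -> str:
    """Return the SHA256 fingerprint of a one-line OpenSSH public key."""
    parts = public_key_line.split()
    if len(parts) < 2:
        raise ValueError("expected '<type> <base64-blob> [comment]'")
    # The fingerprint covers the decoded key blob, not the text line
    blob = base64.b64decode(parts[1], validate=True)
    digest = hashlib.sha256(blob).digest()
    # OpenSSH prints the digest as base64 without '=' padding
    return "SHA256:" + base64.b64encode(digest).decode().rstrip("=")


# Made-up blob for illustration only; this is not a real key
example = "ssh-ed25519 " + base64.b64encode(b"not-a-real-key-blob").decode() + " demo@host"
print(openssh_fingerprint(example))
```

Adopting this would require storing or deriving the public key, which the plan above does not do.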
---
### Task 4.2: Server service

**Files:**
- Create: `backend/app/services/server_service.py`
- Create: `backend/tests/test_services/test_server_service.py`

- [ ] **Step 1: Write the Server service tests**
`backend/tests/test_services/test_server_service.py`:
```python
from pathlib import Path

import pytest

# ServerService requires a repos_dir argument; the path is never written to here
REPOS_DIR = Path('/tmp/repos')


def test_create_server(db_session, test_encrypt_key):
    """Creating a server stores it with status 'untested'."""
    from app.services.server_service import ServerService
    from app.services.ssh_key_service import SshKeyService

    # Create the SSH key first
    ssh_svc = SshKeyService(db_session, test_encrypt_key)
    ssh_key = ssh_svc.create_ssh_key('test-key', 'ssh-rsa AAAA...')

    # Create the server
    svc = ServerService(db_session, test_encrypt_key, REPOS_DIR)
    server = svc.create_server(
        name='test-gitea',
        url='https://gitea.example.com',
        api_token='gitea-token',
        ssh_key_id=ssh_key.id
    )

    assert server.id is not None
    assert server.name == 'test-gitea'
    assert server.status == 'untested'


def test_list_servers(db_session, test_encrypt_key):
    """Listing returns every server that was created."""
    from app.services.server_service import ServerService
    from app.services.ssh_key_service import SshKeyService

    ssh_svc = SshKeyService(db_session, test_encrypt_key)
    ssh_key = ssh_svc.create_ssh_key('key', 'ssh-rsa AAAA...')

    svc = ServerService(db_session, test_encrypt_key, REPOS_DIR)
    svc.create_server('s1', 'https://g1.com', 't1', ssh_key.id)
    svc.create_server('s2', 'https://g2.com', 't2', ssh_key.id)

    servers = svc.list_servers()
    assert len(servers) == 2


def test_delete_server(db_session, test_encrypt_key):
    """Deleting a server removes its row."""
    from app.services.server_service import ServerService
    from app.services.ssh_key_service import SshKeyService
    from app.models.server import Server

    ssh_svc = SshKeyService(db_session, test_encrypt_key)
    ssh_key = ssh_svc.create_ssh_key('key', 'ssh-rsa AAAA...')

    svc = ServerService(db_session, test_encrypt_key, REPOS_DIR)
    server = svc.create_server('test', 'https://g.com', 't', ssh_key.id)
    server_id = server.id

    svc.delete_server(server_id)

    deleted = db_session.query(Server).filter(Server.id == server_id).first()
    assert deleted is None
```
- [ ] **Step 2: Run the tests and confirm they fail**

```bash
cd backend && pytest tests/test_services/test_server_service.py -v
```

Expected: FAIL

- [ ] **Step 3: Implement the Server service**
`backend/app/services/server_service.py`:
```python
import time
from pathlib import Path
from typing import List, Optional

from sqlalchemy.orm import Session

from app.models.server import Server
from app.security import encrypt_data, decrypt_data


class ServerService:
    """Service for server operations."""

    def __init__(self, db: Session, encryption_key: str, repos_dir: Path):
        self.db = db
        self.encryption_key = encryption_key
        self.repos_dir = repos_dir

    def create_server(
        self,
        name: str,
        url: str,
        api_token: str,
        ssh_key_id: int,
        sync_enabled: bool = False,
        schedule_cron: str = '0 */2 * * *'
    ) -> Server:
        """Create a new server."""
        # Reject duplicate names
        existing = self.db.query(Server).filter(Server.name == name).first()
        if existing:
            raise ValueError(f"Server '{name}' already exists")

        # Encrypt the API token before it is stored
        encrypted_token = encrypt_data(api_token.encode(), self.encryption_key)

        # Each server mirrors into its own directory under repos_dir
        local_path = str(self.repos_dir / name)

        now = int(time.time())
        server = Server(
            name=name,
            url=url,
            api_token=encrypted_token,
            ssh_key_id=ssh_key_id,
            sync_enabled=sync_enabled,
            schedule_cron=schedule_cron,
            local_path=local_path,
            status='untested',
            created_at=now,
            updated_at=now
        )
        self.db.add(server)
        self.db.commit()
        self.db.refresh(server)
        return server

    def list_servers(self) -> List[Server]:
        """List all servers."""
        return self.db.query(Server).all()

    def get_server(self, server_id: int) -> Optional[Server]:
        """Return the server with this ID, or None."""
        return self.db.query(Server).filter(Server.id == server_id).first()

    def update_server(self, server_id: int, **kwargs) -> Optional[Server]:
        """Update a server. Fields passed as None are left unchanged."""
        server = self.get_server(server_id)
        if not server:
            return None

        for key, value in kwargs.items():
            if value is not None and hasattr(server, key):
                if key == 'api_token':
                    # Tokens are always stored encrypted
                    value = encrypt_data(value.encode(), self.encryption_key)
                setattr(server, key, value)

        server.updated_at = int(time.time())
        self.db.commit()
        self.db.refresh(server)
        return server

    def delete_server(self, server_id: int) -> bool:
        """Delete a server. Returns False if the server does not exist."""
        server = self.get_server(server_id)
        if not server:
            return False
        self.db.delete(server)
        self.db.commit()
        return True

    def get_decrypted_token(self, server: Server) -> str:
        """Return the decrypted API token."""
        decrypted = decrypt_data(server.api_token, self.encryption_key)
        return decrypted.decode()
```
- [ ] **Step 4: Run the tests and confirm they pass**

```bash
cd backend && pytest tests/test_services/test_server_service.py -v
```

Expected: PASS

- [ ] **Step 5: Commit**
```bash
git add backend/app/services/server_service.py backend/tests/test_services/test_server_service.py
git commit -m "feat: add server service"
```
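
`create_server` builds `local_path` by joining `repos_dir` with the user-supplied server name, so a name such as `../x` would point outside the repository root. One way to guard against that is sketched below, assuming Python 3.9+ for `Path.is_relative_to`. The helper name `safe_local_path` is hypothetical and not part of this plan.

```python
from pathlib import Path


def safe_local_path(repos_dir: Path, server_name: str) -> Path:
    """Join repos_dir and server_name, rejecting paths that escape repos_dir."""
    base = repos_dir.resolve()
    candidate = (base / server_name).resolve()
    # Reject the root itself and anything that resolved outside of it
    if candidate == base or not candidate.is_relative_to(base):
        raise ValueError(f"Invalid server name: {server_name!r}")
    return candidate


print(safe_local_path(Path("/tmp/repos"), "gitea-main").name)
```

A stricter alternative is to validate the name against an allow-list pattern before it ever reaches the filesystem layer.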
---
### Task 4.3: Sync service

**Files:**
- Create: `backend/app/services/sync_service.py`
- Create: `backend/tests/test_services/test_sync_service.py`

- [ ] **Step 1: Write the Sync service tests**
`backend/tests/test_services/test_sync_service.py`:
```python
import time
from pathlib import Path

import pytest


def test_sync_repo_not_implemented_yet(db_session):
    """Build a SyncService over real rows (Git operations are skipped for now)."""
    from app.services.sync_service import SyncService
    from app.services.ssh_key_service import SshKeyService
    from app.services.server_service import ServerService
    from app.models.repo import Repo

    # Prepare data
    ssh_svc = SshKeyService(db_session, 'test-key')
    ssh_key = ssh_svc.create_ssh_key('key', 'ssh-rsa AAAA...')
    srv_svc = ServerService(db_session, 'test-key', Path('/tmp/repos'))
    server = srv_svc.create_server('test', 'https://g.com', 't', ssh_key.id)

    repo = Repo(
        server_id=server.id,
        name='test-repo',
        full_name='owner/test-repo',
        clone_url='git@example.com:owner/repo.git',
        local_path='/tmp/test.git',
        status='pending',
        created_at=int(time.time())
    )
    db_session.add(repo)
    db_session.commit()

    # A real sync would fail here because there is no Git server to reach,
    # so this test only checks that the service can be constructed.
    sync_svc = SyncService(db_session, 'test-key')
    assert sync_svc is not None
```
- [ ] **Step 2: Run the tests and confirm they fail**

```bash
cd backend && pytest tests/test_services/test_sync_service.py -v
```

Expected: FAIL

- [ ] **Step 3: Implement the Sync service**
`backend/app/services/sync_service.py`:
```python
import os
import shlex
import subprocess
import tempfile
import time
from pathlib import Path

from sqlalchemy.orm import Session

from app.models.repo import Repo
from app.models.sync_log import SyncLog
from app.services.ssh_key_service import SshKeyService

# Host key checking is disabled so first contact with a server never prompts.
# This also removes protection against man-in-the-middle attacks.
SSH_OPTIONS = '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'
GIT_TIMEOUT = 300  # seconds
FIELD_SEP = '\x1f'  # ASCII unit separator; unlike '|', it does not occur in subjects


class SyncService:
    """Repository sync service."""

    def __init__(self, db: Session, encryption_key: str):
        self.db = db
        self.encryption_key = encryption_key
        self.ssh_key_service = SshKeyService(db, encryption_key)

    def sync_repo(self, repo: Repo, ssh_key_content: str) -> SyncLog:
        """Sync a single repository.

        Args:
            repo: Repository to sync.
            ssh_key_content: SSH private key content.

        Returns:
            The sync log for this run.
        """
        started_at = int(time.time())
        sync_log = SyncLog(
            repo_id=repo.id,
            status='synced',  # placeholder; the real outcome is set below
            started_at=started_at,
            finished_at=None,
            commits_count=None,
            created_at=started_at
        )

        try:
            # Prepare the local directory
            local_path = Path(repo.local_path)
            local_path.parent.mkdir(parents=True, exist_ok=True)

            if local_path.exists():
                # Mirror already exists: fetch updates
                commits_count = self._fetch_repo(local_path, ssh_key_content)
            else:
                # First sync: clone
                commits_count = self._clone_repo(repo.clone_url, local_path, ssh_key_content)

            # Record success
            sync_log.status = 'synced'
            sync_log.commits_count = commits_count
            sync_log.finished_at = int(time.time())
            repo.status = 'synced'
            repo.last_sync_at = sync_log.finished_at
        except Exception as e:
            # Record failure
            sync_log.status = 'failed'
            sync_log.error_msg = str(e)
            sync_log.finished_at = int(time.time())
            repo.status = 'failed'

        self.db.add(sync_log)
        self.db.commit()
        return sync_log

    def _clone_repo(self, clone_url: str, local_path: Path, ssh_key: str) -> int:
        """Clone the repository in mirror mode and return its commit count."""
        self._run_git(['clone', '--mirror', clone_url, str(local_path)], ssh_key)
        return self._count_commits(local_path)

    def _fetch_repo(self, local_path: Path, ssh_key: str) -> int:
        """Fetch updates for an existing mirror and return its commit count."""
        self._run_git(['fetch', '--all'], ssh_key, cwd=local_path)
        return self._count_commits(local_path)

    def _run_git(self, args: list[str], ssh_key: str, cwd: Path | None = None) -> None:
        """Run a git command that authenticates with a temporary key file."""
        with tempfile.NamedTemporaryFile(mode='w', suffix='.key', delete=False) as f:
            # OpenSSH can reject a key file that lacks a trailing newline
            f.write(ssh_key if ssh_key.endswith('\n') else ssh_key + '\n')
            key_path = f.name

        try:
            # Extend the current environment. Passing only GIT_SSH_COMMAND
            # would drop PATH, HOME and the other variables git relies on.
            env = {
                **os.environ,
                'GIT_SSH_COMMAND': f'ssh -i {shlex.quote(key_path)} {SSH_OPTIONS}',
            }
            result = subprocess.run(
                ['git', *args],
                cwd=cwd,
                env=env,
                capture_output=True,
                text=True,
                timeout=GIT_TIMEOUT
            )
            if result.returncode != 0:
                raise RuntimeError(f"git {args[0]} failed: {result.stderr}")
        finally:
            # Never leave the decrypted key on disk
            Path(key_path).unlink(missing_ok=True)

    def _count_commits(self, repo_path: Path) -> int:
        """Count commits reachable from any ref; returns 0 on error."""
        result = subprocess.run(
            ['git', 'rev-list', '--count', '--all'],
            cwd=repo_path,
            capture_output=True,
            text=True
        )
        if result.returncode == 0:
            try:
                return int(result.stdout.strip())
            except ValueError:
                pass
        return 0

    def get_repo_commits(self, repo: Repo, limit: int = 50) -> list:
        """Return the most recent commits of a local mirror."""
        repo_path = Path(repo.local_path)
        if not repo_path.exists():
            return []

        # %x1f makes git emit the separator byte between fields
        result = subprocess.run(
            ['git', 'log', '--all', f'--max-count={limit}',
             '--format=%H%x1f%s%x1f%an%x1f%ct'],
            cwd=repo_path,
            capture_output=True,
            text=True
        )
        if result.returncode != 0:
            return []

        commits = []
        for line in result.stdout.strip().split('\n'):
            if not line:
                continue
            parts = line.split(FIELD_SEP)
            if len(parts) == 4:
                commits.append({
                    'hash': parts[0],
                    'message': parts[1],
                    'author': parts[2],
                    'date': int(parts[3])
                })
        return commits
```
- [ ] **Step 4: Run the tests and confirm they pass**

```bash
cd backend && pytest tests/test_services/test_sync_service.py -v
```

Expected: PASS (the test only checks that the service can be constructed)

- [ ] **Step 5: Commit**
```bash
git add backend/app/services/sync_service.py backend/tests/test_services/test_sync_service.py
git commit -m "feat: add sync service with Git operations"
```
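
Parsing `git log` output depends on a field separator that cannot appear inside a commit subject; a visible character such as `|` is legal in a subject and would shift the fields. The sketch below parses output produced with `--format=%H%x1f%s%x1f%an%x1f%ct`, where `%x1f` makes git print the ASCII unit separator. The function name `parse_git_log` and the sample line are hypothetical and only illustrate the idea.

```python
FIELD_SEP = "\x1f"  # ASCII unit separator, emitted by git for %x1f


def parse_git_log(output: str) -> list[dict]:
    """Parse lines of 'hash<US>subject<US>author<US>unix-time' into dicts."""
    commits = []
    for line in output.split("\n"):
        parts = line.split(FIELD_SEP)
        if len(parts) != 4:
            continue  # skip blank or malformed lines
        commit_hash, subject, author, timestamp = parts
        commits.append({
            "hash": commit_hash,
            "message": subject,
            "author": author,
            "date": int(timestamp),
        })
    return commits


# A subject containing '|' survives intact
sample = FIELD_SEP.join(["abc123", "fix: handle a|b input", "Alice", "1700000000"])
print(parse_git_log(sample))
```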
---
### Task 4.4: Repo service

**Files:**
- Create: `backend/app/services/repo_service.py`
- Create: `backend/tests/test_services/test_repo_service.py`

- [ ] **Step 1: Write the Repo service tests**
`backend/tests/test_services/test_repo_service.py`:
```python
import time
from pathlib import Path

import pytest


def test_create_repo(db_session):
    """Creating a repo record assigns an ID and keeps the full name."""
    from app.services.repo_service import RepoService
    from app.services.server_service import ServerService
    from app.services.ssh_key_service import SshKeyService

    # Prepare data
    ssh_svc = SshKeyService(db_session, 'key')
    ssh_key = ssh_svc.create_ssh_key('k', 'ssh-rsa A')
    srv_svc = ServerService(db_session, 'key', Path('/tmp/r'))
    server = srv_svc.create_server('s', 'https://g.com', 't', ssh_key.id)

    # Create the repo
    repo_svc = RepoService(db_session)
    repo = repo_svc.create_repo(
        server_id=server.id,
        name='test-repo',
        full_name='owner/test-repo',
        clone_url='git@u:r.git'
    )

    assert repo.id is not None
    assert repo.full_name == 'owner/test-repo'


def test_list_repos_by_server(db_session):
    """Listing by server returns only that server's repos."""
    from app.services.repo_service import RepoService
    from app.services.server_service import ServerService
    from app.services.ssh_key_service import SshKeyService

    ssh_svc = SshKeyService(db_session, 'key')
    ssh_key = ssh_svc.create_ssh_key('k', 'ssh-rsa A')
    srv_svc = ServerService(db_session, 'key', Path('/tmp/r'))
    server = srv_svc.create_server('s', 'https://g.com', 't', ssh_key.id)

    repo_svc = RepoService(db_session)
    repo_svc.create_repo(server.id, 'r1', 'o/r1', 'git@u:r1.git')
    repo_svc.create_repo(server.id, 'r2', 'o/r2', 'git@u:r2.git')

    repos = repo_svc.list_repos(server_id=server.id)
    assert len(repos) == 2
```
- [ ] **Step 2: Run the tests and confirm they fail**

```bash
cd backend && pytest tests/test_services/test_repo_service.py -v
```

Expected: FAIL

- [ ] **Step 3: Implement the Repo service**
`backend/app/services/repo_service.py`:
```python
import time
from typing import List, Optional

from sqlalchemy.orm import Session

from app.models.repo import Repo


class RepoService:
    """Service for repository operations."""

    def __init__(self, db: Session):
        self.db = db

    def create_repo(
        self,
        server_id: int,
        name: str,
        full_name: str,
        clone_url: str,
        local_path: str | None = None
    ) -> Repo:
        """Create a repository record."""
        repo = Repo(
            server_id=server_id,
            name=name,
            full_name=full_name,
            clone_url=clone_url,
            # Default mirror location when the caller does not supply one
            local_path=local_path or f'/data/repos/{full_name}.git',
            status='pending',
            created_at=int(time.time())
        )
        self.db.add(repo)
        self.db.commit()
        self.db.refresh(repo)
        return repo

    def list_repos(self, server_id: int | None = None) -> List[Repo]:
        """List repositories, optionally filtered by server."""
        query = self.db.query(Repo)
        if server_id is not None:
            query = query.filter(Repo.server_id == server_id)
        return query.all()

    def get_repo(self, repo_id: int) -> Optional[Repo]:
        """Return the repository with this ID, or None."""
        return self.db.query(Repo).filter(Repo.id == repo_id).first()

    def update_repo_status(self, repo_id: int, status: str) -> Optional[Repo]:
        """Update a repository's status; 'synced' also stamps last_sync_at."""
        repo = self.get_repo(repo_id)
        if not repo:
            return None
        repo.status = status
        if status == 'synced':
            repo.last_sync_at = int(time.time())
        self.db.commit()
        self.db.refresh(repo)
        return repo

    def delete_repo(self, repo_id: int) -> bool:
        """Delete a repository record. Returns False if it does not exist."""
        repo = self.get_repo(repo_id)
        if not repo:
            return False
        self.db.delete(repo)
        self.db.commit()
        return True
```
- [ ] **Step 4: Run the tests and confirm they pass**

```bash
cd backend && pytest tests/test_services/test_repo_service.py -v
```

Expected: PASS

- [ ] **Step 5: Commit**
```bash
git add backend/app/services/repo_service.py backend/tests/test_services/test_repo_service.py
git commit -m "feat: add repo service"
```
---
## Phase 5: API routes

### Task 5.1: API dependency injection

**Files:**
- Create: `backend/app/api/deps.py`
- Create: `backend/tests/test_api/test_deps.py`

- [ ] **Step 1: Write the dependency tests**
`backend/tests/test_api/test_deps.py`:
```python
import asyncio

import pytest
from fastapi import HTTPException


def test_require_auth_success(test_env_vars):
    """A valid token passes without raising."""
    from app.api.deps import require_auth

    asyncio.run(require_auth("Bearer test-token"))


def test_require_auth_failure():
    """An invalid token raises 401."""
    from app.api.deps import require_auth

    with pytest.raises(HTTPException) as exc:
        asyncio.run(require_auth("Bearer wrong-token"))
    assert exc.value.status_code == 401
```
- [ ] **Step 2: Run the tests and confirm they fail**

```bash
cd backend && pytest tests/test_api/test_deps.py -v
```

Expected: FAIL

- [ ] **Step 3: Implement the dependencies**
`backend/app/api/deps.py`:
```python
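from typing import Annotated, Iterator

from fastapi import Depends, HTTPException, Header, status
from sqlalchemy.orm import Session

from app import database
from app.security import verify_api_token


def get_db_session() -> Iterator[Session]:
    """Yield a database session and close it when the request ends."""
    # Look the factory up at call time. `from app.database import _session_factory`
    # would capture the value at import time, which is still None if init_db()
    # assigns the factory later.
    if database._session_factory is None:
        raise RuntimeError("Database not initialized")
    session = database._session_factory()
    try:
        yield session
    finally:
        session.close()


# Type alias for route signatures
DBSession = Annotated[Session, Depends(get_db_session)]


async def require_auth(
    authorization: Annotated[str | None, Header()] = None
) -> None:
    """Reject the request with 401 unless the Authorization header is valid."""
    if not verify_api_token(authorization):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication token"
        )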
```
- [ ] **Step 4: Run the tests and confirm they pass**

```bash
cd backend && pytest tests/test_api/test_deps.py -v
```

Expected: PASS

- [ ] **Step 5: Commit**
```bash
git add backend/app/api/deps.py backend/tests/test_api/test_deps.py
git commit -m "feat: add API dependencies (auth, DB session)"
```
---
### Task 5.2: SSH Keys API

**Files:**
- Create: `backend/app/api/ssh_keys.py`
- Create: `backend/tests/test_api/test_ssh_keys.py`

- [ ] **Step 1: Write the SSH Keys API tests**
`backend/tests/test_api/test_ssh_keys.py`:
```python
import pytest
from fastapi.testclient import TestClient


@pytest.fixture
def client_with_auth(test_env_vars):
    """Test client for requests that carry an auth header."""
    from app.main import app
    return TestClient(app)


def test_list_ssh_keys(client_with_auth, db_session, test_encrypt_key):
    """GET /ssh-keys returns every stored key."""
    from app.services.ssh_key_service import SshKeyService

    # Create test data
    svc = SshKeyService(db_session, test_encrypt_key)
    svc.create_ssh_key('key1', 'ssh-rsa A')
    svc.create_ssh_key('key2', 'ssh-rsa B')

    # Call the API
    response = client_with_auth.get(
        "/api/v1/ssh-keys",
        headers={"Authorization": "Bearer test-token"}
    )

    assert response.status_code == 200
    data = response.json()
    assert data['code'] == 0
    assert len(data['data']) == 2


def test_create_ssh_key(client_with_auth, db_session):
    """POST /ssh-keys creates a key and echoes its name."""
    response = client_with_auth.post(
        "/api/v1/ssh-keys",
        json={
            "name": "test-key",
            "private_key": "ssh-rsa AAAAB3..."
        },
        headers={"Authorization": "Bearer test-token"}
    )

    assert response.status_code == 200
    data = response.json()
    assert data['code'] == 0
    assert data['data']['name'] == 'test-key'


def test_delete_ssh_key(client_with_auth, db_session, test_encrypt_key):
    """DELETE /ssh-keys/{id} removes an unused key."""
    from app.services.ssh_key_service import SshKeyService

    # Create test data
    svc = SshKeyService(db_session, test_encrypt_key)
    ssh_key = svc.create_ssh_key('test', 'ssh-rsa A')

    # Call the API
    response = client_with_auth.delete(
        f"/api/v1/ssh-keys/{ssh_key.id}",
        headers={"Authorization": "Bearer test-token"}
    )

    assert response.status_code == 200
```
- [ ] **Step 2: Run the tests and confirm they fail**

```bash
cd backend && pytest tests/test_api/test_ssh_keys.py -v
```

Expected: FAIL

- [ ] **Step 3: Implement the SSH Keys API**
`backend/app/api/ssh_keys.py`:
```python
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException

from app.api.deps import DBSession, require_auth
from app.services.ssh_key_service import SshKeyService
from app.schemas.ssh_key import SshKeyCreate, SshKeyResponse
from app.schemas.common import SuccessResponse
from app.config import settings

router = APIRouter(prefix='/api/v1/ssh-keys', tags=['ssh-keys'])


def get_ssh_service(db: DBSession) -> SshKeyService:
    """Build an SSH key service for this request."""
    return SshKeyService(db, settings.encrypt_key)


SshService = Annotated[SshKeyService, Depends(get_ssh_service)]


@router.get('', response_model=SuccessResponse[list[SshKeyResponse]])
async def list_ssh_keys(
    service: SshService,
    _: Annotated[None, Depends(require_auth)]
):
    """List all SSH keys."""
    keys = service.list_ssh_keys()
    return SuccessResponse(
        data=[
            SshKeyResponse(
                id=k.id,
                name=k.name,
                fingerprint=k.fingerprint,
                servers_count=len(k.servers),
                created_at=k.created_at
            )
            for k in keys
        ]
    )


@router.post('', response_model=SuccessResponse[SshKeyResponse])
async def create_ssh_key(
    data: SshKeyCreate,
    service: SshService,
    _: Annotated[None, Depends(require_auth)]
):
    """Create a new SSH key."""
    try:
        ssh_key = service.create_ssh_key(
            name=data.name,
            private_key=data.private_key,
            password=data.password
        )
        return SuccessResponse(
            data=SshKeyResponse(
                id=ssh_key.id,
                name=ssh_key.name,
                fingerprint=ssh_key.fingerprint,
                servers_count=0,
                created_at=ssh_key.created_at
            )
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))


@router.delete('/{key_id}', response_model=SuccessResponse[dict])
async def delete_ssh_key(
    key_id: int,
    service: SshService,
    _: Annotated[None, Depends(require_auth)]
):
    """Delete an SSH key."""
    try:
        deleted = service.delete_ssh_key(key_id)
    except ValueError as e:
        # The key is still referenced by at least one server
        raise HTTPException(status_code=400, detail=str(e))
    if not deleted:
        # The service returns False when no key has this ID
        raise HTTPException(status_code=404, detail="SSH key not found")
    return SuccessResponse(data={'deleted': True})
```
- [ ] **Step 4: Run the tests and confirm they pass**

```bash
cd backend && pytest tests/test_api/test_ssh_keys.py -v
```

Expected: PASS

- [ ] **Step 5: Commit**
```bash
git add backend/app/api/ssh_keys.py backend/tests/test_api/test_ssh_keys.py
git commit -m "feat: add SSH keys API endpoints"
```
---
### Task 5.3: Servers API

**Files:**
- Create: `backend/app/api/servers.py`
- Create: `backend/tests/test_api/test_servers.py`

- [ ] **Step 1: Write the Servers API tests**
`backend/tests/test_api/test_servers.py`:
```python
from pathlib import Path

import pytest


def test_list_servers(client_with_auth, db_session, test_encrypt_key):
    """GET /servers returns every stored server."""
    from app.services.server_service import ServerService
    from app.services.ssh_key_service import SshKeyService

    ssh_svc = SshKeyService(db_session, test_encrypt_key)
    ssh_key = ssh_svc.create_ssh_key('key', 'ssh-rsa A')
    srv_svc = ServerService(db_session, test_encrypt_key, Path('/tmp/r'))
    srv_svc.create_server('s1', 'https://g1.com', 't1', ssh_key.id)
    srv_svc.create_server('s2', 'https://g2.com', 't2', ssh_key.id)

    response = client_with_auth.get(
        "/api/v1/servers",
        headers={"Authorization": "Bearer test-token"}
    )

    assert response.status_code == 200
    data = response.json()
    assert data['code'] == 0
    assert len(data['data']) == 2


def test_create_server(client_with_auth, db_session, test_encrypt_key):
    """POST /servers creates a server and echoes its name."""
    from app.services.ssh_key_service import SshKeyService

    ssh_svc = SshKeyService(db_session, test_encrypt_key)
    ssh_key = ssh_svc.create_ssh_key('key', 'ssh-rsa A')

    response = client_with_auth.post(
        "/api/v1/servers",
        json={
            "name": "test-gitea",
            "url": "https://gitea.example.com",
            "api_token": "test-token",
            "ssh_key_id": ssh_key.id
        },
        headers={"Authorization": "Bearer test-token"}
    )

    assert response.status_code == 200
    data = response.json()
    assert data['data']['name'] == 'test-gitea'
```
- [ ] **Step 2: Run the tests and confirm they fail**

```bash
cd backend && pytest tests/test_api/test_servers.py -v
```

Expected: FAIL

- [ ] **Step 3: Implement the Servers API**
`backend/app/api/servers.py`:
```python
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException

from app.api.deps import DBSession, require_auth
from app.models.server import Server
from app.services.server_service import ServerService
from app.schemas.server import ServerCreate, ServerUpdate, ServerResponse
from app.schemas.common import SuccessResponse
from app.config import settings

router = APIRouter(prefix='/api/v1/servers', tags=['servers'])


def get_server_service(db: DBSession) -> ServerService:
    """Build a server service for this request."""
    return ServerService(db, settings.encrypt_key, settings.repos_dir)


ServerSvc = Annotated[ServerService, Depends(get_server_service)]


def _to_response(server: Server) -> ServerResponse:
    """Convert an ORM server into its API representation."""
    return ServerResponse(
        id=server.id,
        name=server.name,
        url=server.url,
        sync_enabled=server.sync_enabled,
        schedule_cron=server.schedule_cron,
        local_path=server.local_path,
        status=server.status,
        repos_count=len(server.repos),
        ssh_key_name=server.ssh_key.name,
        created_at=server.created_at,
        updated_at=server.updated_at
    )


@router.get('', response_model=SuccessResponse[list[ServerResponse]])
async def list_servers(
    service: ServerSvc,
    _: Annotated[None, Depends(require_auth)]
):
    """List all servers."""
    return SuccessResponse(data=[_to_response(s) for s in service.list_servers()])


@router.post('', response_model=SuccessResponse[ServerResponse])
async def create_server(
    data: ServerCreate,
    service: ServerSvc,
    _: Annotated[None, Depends(require_auth)]
):
    """Create a new server."""
    try:
        server = service.create_server(
            name=data.name,
            url=data.url,
            api_token=data.api_token,
            ssh_key_id=data.ssh_key_id,
            sync_enabled=data.sync_enabled,
            schedule_cron=data.schedule_cron
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    return SuccessResponse(data=_to_response(server))


@router.get('/{server_id}', response_model=SuccessResponse[ServerResponse])
async def get_server(
    server_id: int,
    service: ServerSvc,
    _: Annotated[None, Depends(require_auth)]
):
    """Get one server's details."""
    server = service.get_server(server_id)
    if not server:
        raise HTTPException(status_code=404, detail="Server not found")
    return SuccessResponse(data=_to_response(server))


@router.put('/{server_id}', response_model=SuccessResponse[ServerResponse])
async def update_server(
    server_id: int,
    data: ServerUpdate,
    service: ServerSvc,
    _: Annotated[None, Depends(require_auth)]
):
    """Update a server. Fields left as None are not changed."""
    server = service.update_server(
        server_id,
        name=data.name,
        url=data.url,
        api_token=data.api_token,
        ssh_key_id=data.ssh_key_id,
        sync_enabled=data.sync_enabled,
        schedule_cron=data.schedule_cron
    )
    if not server:
        raise HTTPException(status_code=404, detail="Server not found")
    return SuccessResponse(data=_to_response(server))


@router.delete('/{server_id}', response_model=SuccessResponse[dict])
async def delete_server(
    server_id: int,
    service: ServerSvc,
    _: Annotated[None, Depends(require_auth)]
):
    """Delete a server."""
    if not service.delete_server(server_id):
        raise HTTPException(status_code=404, detail="Server not found")
    return SuccessResponse(data={'deleted': True})
```
- [ ] **Step 4: Run the tests and confirm they pass**

```bash
cd backend && pytest tests/test_api/test_servers.py -v
```

Expected: PASS

- [ ] **Step 5: Commit**
```bash
git add backend/app/api/servers.py backend/tests/test_api/test_servers.py
git commit -m "feat: add servers API endpoints"
```
---
### Task 5.4: Status API

**Files:**
- Create: `backend/app/api/status.py`
- Create: `backend/tests/test_api/test_status.py`

- [ ] **Step 1: Write the Status API tests**
`backend/tests/test_api/test_status.py`:
```python
import pytest


def test_get_status(client_with_auth, db_session):
    """GET /status reports disk, server, and repo statistics."""
    response = client_with_auth.get(
        "/api/v1/status",
        headers={"Authorization": "Bearer test-token"}
    )

    assert response.status_code == 200
    data = response.json()
    assert data['code'] == 0
    assert 'disk_usage' in data['data']
    assert 'servers' in data['data']
    assert 'repos' in data['data']
```
- [ ] **Step 2: Run the tests and confirm they fail**

```bash
cd backend && pytest tests/test_api/test_status.py -v
```

Expected: FAIL

- [ ] **Step 3: Implement the Status API**
`backend/app/api/status.py`:
```python
import shutil
from typing import Annotated

from fastapi import APIRouter, Depends

from app.api.deps import DBSession, require_auth
from app.models import Server, Repo
from app.schemas.common import SuccessResponse
from app.config import settings

router = APIRouter(tags=['status'])


@router.get('/api/v1/status', response_model=SuccessResponse[dict])
async def get_status(
    db: DBSession,
    _: Annotated[None, Depends(require_auth)]
):
    """Report system status."""
    # Disk usage of the filesystem that holds the data directory
    total, used, free = shutil.disk_usage(settings.data_dir)

    # Counts
    server_count = db.query(Server).count()
    repo_count = db.query(Repo).count()
    synced = db.query(Repo).filter(Repo.status == 'synced').count()
    failed = db.query(Repo).filter(Repo.status == 'failed').count()
    syncing = db.query(Repo).filter(Repo.status == 'syncing').count()

    return SuccessResponse(
        data={
            'disk_usage': {
                'total': total,
                'used': used,
                'free': free,
                'percent': round(used / total * 100, 2) if total > 0 else 0
            },
            'servers': {
                'total': server_count,
                'connected': db.query(Server).filter(Server.status == 'connected').count(),
                'disconnected': db.query(Server).filter(Server.status == 'disconnected').count(),
            },
            'repos': {
                'total': repo_count,
                'synced': synced,
                'failed': failed,
                'syncing': syncing,
                # Everything not in a known state is reported as pending
                'pending': repo_count - synced - failed - syncing
            }
        }
    )
```
- [ ] **Step 4: Run the tests and confirm they pass**

```bash
cd backend && pytest tests/test_api/test_status.py -v
```

Expected: PASS

- [ ] **Step 5: Commit**
```bash
git add backend/app/api/status.py backend/tests/test_api/test_status.py
git commit -m "feat: add status API endpoint"
```
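
The status endpoint derives `pending` as the remainder after subtracting the known states from the total, issuing one count query per state. The same tally can be computed in a single pass over the status values, for example over the rows of one grouped query. The sketch below shows only the arithmetic; `summarize_repo_statuses` is a hypothetical helper, and the input list stands in for whatever the database returns.

```python
from collections import Counter

KNOWN_STATES = ("synced", "failed", "syncing")


def summarize_repo_statuses(statuses: list[str]) -> dict[str, int]:
    """Tally repo statuses; anything outside KNOWN_STATES counts as pending."""
    counts = Counter(statuses)
    total = sum(counts.values())
    summary = {"total": total}
    for state in KNOWN_STATES:
        summary[state] = counts.get(state, 0)
    # Same remainder rule as the endpoint: total minus the known states
    summary["pending"] = total - sum(summary[s] for s in KNOWN_STATES)
    return summary


print(summarize_repo_statuses(["synced", "synced", "failed", "pending"]))
```

Note that the remainder rule folds any unexpected status value into `pending`, which is worth keeping in mind if new states are added later.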
---
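## Phase 6: FastAPI main application

### Task 6.1: Create the FastAPI main application

**Files:**
- Create: `backend/app/main.py`
- Create: `backend/tests/test_main.py`
- Modify: `backend/tests/conftest.py` (add the client fixture)

- [ ] **Step 1: Write the main application tests**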
`backend/tests/test_main.py`:
```python
import pytest
from fastapi.testclient import TestClient


def test_app_exists():
    """The application object is created with the expected title."""
    from app.main import app
    assert app is not None
    assert app.title == "Git Repo Manager"


def test_root_redirect():
    """The root path serves the frontend when it has been built."""
    from app.main import app
    client = TestClient(app)
    response = client.get("/")
    # 404 is acceptable: the static build may not exist yet
    assert response.status_code in [200, 404]


def test_api_docs():
    """The API docs are reachable."""
    from app.main import app
    client = TestClient(app)
    response = client.get("/docs")
    assert response.status_code == 200
```
- [ ] **Step 2: Run the tests and confirm they fail**

```bash
cd backend && pytest tests/test_main.py -v
```

Expected: FAIL

- [ ] **Step 3: Implement the main application**
`backend/app/main.py`:
```python
from contextlib import asynccontextmanager
from pathlib import Path

from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles

from app.config import settings
from app.database import init_db
from app.api import status
from app.api import ssh_keys, servers


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan management."""
    # On startup: initialize the database
    settings.data_dir.mkdir(parents=True, exist_ok=True)
    init_db(settings.db_path)

    # Create tables
    from app.database import get_engine
    from app.models import Base
    Base.metadata.create_all(get_engine())

    yield
    # On shutdown: nothing to clean up yet


# Create the FastAPI application
app = FastAPI(
    title="Git Repo Manager",
    description="Git repository sync management tool",
    version="1.0.0",
    lifespan=lifespan
)

# Register routers
app.include_router(status.router)
app.include_router(ssh_keys.router)
app.include_router(servers.router)


@app.get("/health")
async def health_check():
    """Health check."""
    return {"status": "ok"}


# Serve the built frontend. This mount must come last: a mount at "/" matches
# every path, so any route registered after it would never be reached.
static_dir = Path(__file__).parent.parent.parent / 'frontend' / 'dist'
if static_dir.exists():
    app.mount("/", StaticFiles(directory=str(static_dir), html=True), name="static")
```
- [ ] **Step 4: Update conftest.py with the client fixture**

Add to `backend/tests/conftest.py`:
```python
@pytest.fixture(scope="function")
def client_with_auth(test_env_vars):
    """Test client for requests that carry an auth header."""
    from app.main import app
    from fastapi.testclient import TestClient

    return TestClient(app)
```
- [ ] **Step 5: Run the tests and confirm they pass**

```bash
cd backend && pytest tests/test_main.py -v
```

Expected: PASS

- [ ] **Step 6: Commit**
```bash
git add backend/app/main.py backend/tests/test_main.py backend/tests/conftest.py
git commit -m "feat: add FastAPI main application"
```
---
## Phase 7: Frontend basic setup

### Task 7.1: Frontend project configuration and entry point

**Files:**
- Create: `frontend/index.html`
- Create: `frontend/vite.config.js`
- Create: `frontend/src/main.js`
- Create: `frontend/src/App.vue`

- [ ] **Step 1: Create index.html**
`frontend/index.html`:
```html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Git Repo Manager</title>
</head>
<body>
<div id="app"></div>
<script type="module" src="/src/main.js"></script>
</body>
</html>
```
- [ ] **Step 2: Create vite.config.js**
`frontend/vite.config.js`:
```javascript
import { defineConfig } from 'vite'
import vue from '@vitejs/plugin-vue'

export default defineConfig({
  plugins: [vue()],
  server: {
    // In development, forward API calls to the FastAPI backend
    proxy: {
      '/api': {
        target: 'http://localhost:8000',
        changeOrigin: true
      }
    }
  },
  build: {
    outDir: 'dist',
    emptyOutDir: true
  }
})
```
- [ ] **Step 3: Create main.js**
`frontend/src/main.js`:
```javascript
import { createApp } from 'vue'
import { createPinia } from 'pinia'
import ElementPlus from 'element-plus'
import 'element-plus/dist/index.css'
import zhCn from 'element-plus/es/locale/lang/zh-cn'
import App from './App.vue'
import router from './router'
const app = createApp(App)
app.use(createPinia())
app.use(router)
app.use(ElementPlus, { locale: zhCn })
app.mount('#app')
```
- [ ] **Step 4: Create App.vue**
`frontend/src/App.vue`:
```vue
<template>
  <el-container class="app-container">
    <el-header class="app-header">
      <div class="header-left">
        <h1 class="app-title">Git Manager</h1>
      </div>
      <div class="header-right">
        <el-tag :type="statusTagType">{{ statusText }}</el-tag>
      </div>
    </el-header>
    <el-container class="main-container">
      <el-aside width="200px" class="sidebar">
        <el-menu
          :default-active="activeMenu"
          router
          class="sidebar-menu"
        >
          <el-menu-item index="/dashboard">
            <el-icon><Odometer /></el-icon>
            <span>仪表盘</span>
          </el-menu-item>
          <el-menu-item index="/servers">
            <el-icon><Monitor /></el-icon>
            <span>服务器</span>
          </el-menu-item>
          <el-menu-item index="/repos">
            <el-icon><FolderOpened /></el-icon>
            <span>仓库</span>
          </el-menu-item>
          <el-menu-item index="/sync-logs">
            <el-icon><Document /></el-icon>
            <span>同步记录</span>
          </el-menu-item>
          <el-menu-item index="/ssh-keys">
            <el-icon><Key /></el-icon>
            <span>SSH 密钥</span>
          </el-menu-item>
          <el-menu-item index="/settings">
            <el-icon><Setting /></el-icon>
            <span>系统设置</span>
          </el-menu-item>
        </el-menu>
      </el-aside>
      <el-main class="main-content">
        <router-view />
      </el-main>
    </el-container>
  </el-container>
</template>
<script setup>
import { computed, onMounted } from 'vue'
import { useRoute } from 'vue-router'
// Monitor is used for the servers entry: the icon set has no `Server` export
import {
  Odometer, Monitor, FolderOpened, Document, Key, Setting
} from '@element-plus/icons-vue'
import { useAppStore } from './stores/app'

const route = useRoute()
const appStore = useAppStore()

// Highlight the menu item that matches the current route
const activeMenu = computed(() => route.path)
const statusTagType = computed(() => appStore.connected ? 'success' : 'danger')
const statusText = computed(() => appStore.connected ? '已连接' : '未连接')

onMounted(() => {
  appStore.checkStatus()
})
</script>
<style scoped>
.app-container {
  height: 100vh;
}
.app-header {
  display: flex;
  justify-content: space-between;
  align-items: center;
  border-bottom: 1px solid var(--el-border-color);
  padding: 0 20px;
}
.header-left .app-title {
  margin: 0;
  font-size: 20px;
  font-weight: 600;
}
.main-container {
  height: calc(100vh - 60px);
}
.sidebar {
  border-right: 1px solid var(--el-border-color);
}
.sidebar-menu {
  border-right: none;
  height: 100%;
}
.main-content {
  padding: 20px;
  overflow-y: auto;
}
</style>
```
- [ ] **Step 5: Commit**
```bash
git add frontend/
git commit -m "feat: add frontend base setup (index, vite, main, App)"
```
---
### Task 7.2: Router configuration
**Files:**
- Create: `frontend/src/router/index.js`
- [ ] **Step 1: Create the router configuration**
`frontend/src/router/index.js`:
```javascript
import { createRouter, createWebHistory } from 'vue-router'

// Every view is lazy-loaded so each page becomes its own chunk
const routes = [
  {
    path: '/',
    redirect: '/dashboard'
  },
  {
    path: '/dashboard',
    name: 'Dashboard',
    component: () => import('../views/Dashboard.vue')
  },
  {
    path: '/servers',
    name: 'Servers',
    component: () => import('../views/Servers.vue')
  },
  {
    path: '/repos',
    name: 'Repos',
    component: () => import('../views/Repos.vue')
  },
  {
    path: '/sync-logs',
    name: 'SyncLogs',
    component: () => import('../views/SyncLogs.vue')
  },
  {
    path: '/ssh-keys',
    name: 'SshKeys',
    component: () => import('../views/SshKeys.vue')
  },
  {
    path: '/settings',
    name: 'Settings',
    component: () => import('../views/Settings.vue')
  }
]

const router = createRouter({
  history: createWebHistory(),
  routes
})

export default router
```
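`createWebHistory()` produces clean URLs such as `/dashboard`, so a browser refresh on that page sends `/dashboard` to the backend. `StaticFiles(html=True)` serves `index.html` only for directory paths, which means such deep links would likely return 404 in production unless the backend falls back to `index.html`. The sketch below shows only the file-selection logic, using the standard library; `resolve_spa_file` is a hypothetical helper, not part of this plan's code.

```python
import tempfile
from pathlib import Path


def resolve_spa_file(dist_dir: Path, request_path: str) -> Path:
    """Pick the file to serve for a browser request under history-mode routing."""
    root = dist_dir.resolve()
    index = root / "index.html"
    candidate = (root / request_path.lstrip("/")).resolve()
    # Refuse anything that escapes dist_dir (e.g. "/../secret")
    if candidate != root and root not in candidate.parents:
        return index
    # Real build artefacts (JS, CSS, images) are served as they are
    if candidate.is_file():
        return candidate
    # Everything else is treated as a client-side route such as /dashboard
    return index


with tempfile.TemporaryDirectory() as tmp:
    dist = Path(tmp)
    (dist / "index.html").write_text("<div id='app'></div>")
    (dist / "assets").mkdir()
    (dist / "assets" / "app.js").write_text("console.log('hi')")
    print(resolve_spa_file(dist, "/assets/app.js").name)  # app.js
    print(resolve_spa_file(dist, "/dashboard").name)      # index.html
    print(resolve_spa_file(dist, "/../etc/passwd").name)  # index.html
```

One way to wire this in, under the same assumptions, is a catch-all `GET /{full_path:path}` route registered after the API routers that returns a `FileResponse` for the resolved path.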
- [ ] **Step 2: Commit**
```bash
git add frontend/src/router/
git commit -m "feat: add frontend router configuration"
```
---
### Task 7.3: API client
**Files:**
- Create: `frontend/src/api/index.js`
- Create: `frontend/src/api/servers.js`
- Create: `frontend/src/api/sshKeys.js`
- Create: `frontend/src/api/syncLogs.js`
- [ ] **Step 1: Create the Axios instance**
`frontend/src/api/index.js`:
```javascript
import axios from 'axios'
import { ElMessage } from 'element-plus'

// Read the token from localStorage.
// 'test-token' is a development-only fallback for when no token has been stored.
const getAuthToken = () => {
  return localStorage.getItem('api_token') || 'test-token'
}

// Create the Axios instance
const api = axios.create({
  baseURL: '/api/v1',
  timeout: 30000,
  headers: {
    'Content-Type': 'application/json'
  }
})

// Request interceptor: attach the Bearer token to every request
api.interceptors.request.use(
  (config) => {
    const token = getAuthToken()
    if (token) {
      config.headers.Authorization = `Bearer ${token}`
    }
    return config
  },
  (error) => {
    return Promise.reject(error)
  }
)

// Response interceptor
api.interceptors.response.use(
  (response) => {
    const data = response.data
    // Standard response envelope check: code 0 means success.
    // The envelope itself is returned, so callers read the payload from `.data`.
    if (data.code === 0) {
      return data
    } else {
      ElMessage.error(data.message || '请求失败')
      return Promise.reject(new Error(data.message || '请求失败'))
    }
  },
  (error) => {
    // HTTP-level failures: prefer the backend's `detail` field when present
    const message = error.response?.data?.detail || error.message || '网络错误'
    ElMessage.error(message)
    return Promise.reject(error)
  }
)

export default api
```
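The response interceptor treats `code === 0` as success and reads `message` on failure, and the API modules then take the payload from `.data`. A minimal Python sketch of that contract follows; the helpers `ok`, `fail`, and `unwrap`, the error code `1001`, and the sample values are hypothetical illustrations, since the backend's own response helpers are not shown in this section.

```python
from typing import Any, Dict


def ok(data: Any = None, message: str = "success") -> Dict[str, Any]:
    """Success envelope: code 0 is what the interceptor treats as success."""
    return {"code": 0, "message": message, "data": data}


def fail(code: int, message: str) -> Dict[str, Any]:
    """Business-error envelope: any non-zero code makes the interceptor reject."""
    if code == 0:
        raise ValueError("code 0 is reserved for success")
    return {"code": code, "message": message, "data": None}


def unwrap(envelope: Dict[str, Any]) -> Any:
    """Python mirror of the interceptor followed by the `.then(r => r.data)` step."""
    if envelope.get("code") == 0:
        return envelope.get("data")
    raise RuntimeError(envelope.get("message") or "request failed")


print(unwrap(ok([{"id": 1, "name": "gitea-main"}])))  # [{'id': 1, 'name': 'gitea-main'}]
try:
    unwrap(fail(1001, "server not found"))
except RuntimeError as exc:
    print(exc)  # server not found
```

Errors raised by FastAPI itself (for example a 401 with a `detail` field) do not use this envelope; they reach the interceptor's second handler instead.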
- [ ] **Step 2: Create the Servers API**
`frontend/src/api/servers.js`:
```javascript
import api from './index'

export const serversApi = {
  // List all servers
  list: () => api.get('/servers').then(r => r.data),
  // Get server details
  get: (id) => api.get(`/servers/${id}`).then(r => r.data),
  // Create a server
  create: (data) => api.post('/servers', data).then(r => r.data),
  // Update a server
  update: (id, data) => api.put(`/servers/${id}`, data).then(r => r.data),
  // Delete a server
  delete: (id) => api.delete(`/servers/${id}`).then(r => r.data),
  // Test the connection
  test: (id) => api.post(`/servers/${id}/test`).then(r => r.data),
  // Trigger a sync
  sync: (id) => api.post(`/servers/${id}/sync`).then(r => r.data)
}
```
- [ ] **Step 3: Create the SSH Keys API**
`frontend/src/api/sshKeys.js`:
```javascript
import api from './index'

export const sshKeysApi = {
  // List all SSH keys
  list: () => api.get('/ssh-keys').then(r => r.data),
  // Create an SSH key
  create: (data) => api.post('/ssh-keys', data).then(r => r.data),
  // Delete an SSH key
  delete: (id) => api.delete(`/ssh-keys/${id}`).then(r => r.data),
  // Test a key
  test: (id) => api.post(`/ssh-keys/${id}/test`).then(r => r.data)
}
```
- [ ] **Step 4: Create the Sync Logs API**
`frontend/src/api/syncLogs.js`:
```javascript
import api from './index'

export const syncLogsApi = {
  // List sync logs (params are sent as query-string parameters)
  list: (params) => api.get('/sync-logs', { params }).then(r => r.data),
  // Get log details
  get: (id) => api.get(`/sync-logs/${id}`).then(r => r.data)
}
```
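The same endpoints can be exercised outside the browser, which is handy for smoke-testing the backend before the pages exist. The sketch below only builds the request with the standard library and sends nothing; `build_request` is a hypothetical helper, and the `page` query parameter and the token value are placeholders rather than documented parts of the API.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict, Optional


def build_request(
    base_url: str,
    token: str,
    method: str,
    path: str,
    params: Optional[Dict[str, Any]] = None,
    body: Optional[Dict[str, Any]] = None,
) -> urllib.request.Request:
    """Build a request that mirrors what the Axios instance sends."""
    # Same prefix as the Axios baseURL
    url = base_url.rstrip("/") + "/api/v1" + path
    if params:
        url += "?" + urllib.parse.urlencode(params)
    data = json.dumps(body).encode("utf-8") if body is not None else None
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    return urllib.request.Request(url, data=data, headers=headers, method=method)


req = build_request(
    "http://localhost:8000", "my-token", "GET", "/sync-logs", params={"page": 1}
)
print(req.get_method(), req.full_url)  # GET http://localhost:8000/api/v1/sync-logs?page=1
print(req.get_header("Authorization"))  # Bearer my-token
```

Passing the result to `urllib.request.urlopen` would perform the call against a running backend.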
- [ ] **Step 5: Commit**
```bash
git add frontend/src/api/
git commit -m "feat: add frontend API client with interceptors"
```
---
### Task 7.4: Pinia state management
**Files:**
- Create: `frontend/src/stores/app.js`
- Create: `frontend/src/stores/servers.js`
- [ ] **Step 1: Create the App Store**
`frontend/src/stores/app.js`:
```javascript
import { defineStore } from 'pinia'
import { ref } from 'vue'
import api from '../api'

export const useAppStore = defineStore('app', () => {
  const connected = ref(false)
  const loading = ref(false)

  const checkStatus = async () => {
    loading.value = true
    try {
      // Any successful response from the status API counts as "connected"
      await api.get('/status')
      connected.value = true
    } catch (error) {
      connected.value = false
    } finally {
      loading.value = false
    }
  }

  return {
    connected,
    loading,
    checkStatus
  }
})
```
- [ ] **Step 2: Create the Servers Store**
`frontend/src/stores/servers.js`:
```javascript
import { defineStore } from 'pinia'
import { ref } from 'vue'
import { serversApi } from '../api/servers'

export const useServersStore = defineStore('servers', () => {
  const servers = ref([])
  const loading = ref(false)

  // Load the full list; failures are logged and leave the current list untouched
  const fetchServers = async () => {
    try {
      loading.value = true
      servers.value = await serversApi.list()
    } catch (error) {
      console.error('Failed to fetch servers:', error)
    } finally {
      loading.value = false
    }
  }

  // The mutations below let errors propagate so the calling page can react
  const createServer = async (data) => {
    const server = await serversApi.create(data)
    servers.value.push(server)
    return server
  }

  const updateServer = async (id, data) => {
    const server = await serversApi.update(id, data)
    const index = servers.value.findIndex(s => s.id === id)
    if (index !== -1) {
      servers.value[index] = server
    }
    return server
  }

  const deleteServer = async (id) => {
    await serversApi.delete(id)
    servers.value = servers.value.filter(s => s.id !== id)
  }

  return {
    servers,
    loading,
    fetchServers,
    createServer,
    updateServer,
    deleteServer
  }
})
```
- [ ] **Step 3: Commit**
```bash
git add frontend/src/stores/
git commit -m "feat: add Pinia stores (app, servers)"
```
---
### Task 7.5: Page components - placeholder pages
**Files:**
- Create: `frontend/src/views/Dashboard.vue`
- Create: `frontend/src/views/Servers.vue`
- Create: `frontend/src/views/Repos.vue`
- Create: `frontend/src/views/SyncLogs.vue`
- Create: `frontend/src/views/SshKeys.vue`
- Create: `frontend/src/views/Settings.vue`
- [ ] **Step 1: Create the Dashboard page**
`frontend/src/views/Dashboard.vue`:
```vue
<template>
  <div class="dashboard">
    <el-row :gutter="20">
      <el-col :span="6">
        <el-card>
          <el-statistic title="服务器" :value="stats.servers" />
        </el-card>
      </el-col>
      <el-col :span="6">
        <el-card>
          <el-statistic title="仓库" :value="stats.repos" />
        </el-card>
      </el-col>
      <el-col :span="6">
        <el-card>
          <el-statistic title="已同步" :value="stats.synced" />
        </el-card>
      </el-col>
      <el-col :span="6">
        <el-card>
          <el-statistic title="失败" :value="stats.failed" />
        </el-card>
      </el-col>
    </el-row>
    <el-card class="mt-20" header="最近同步">
      <el-empty description="暂无数据" />
    </el-card>
  </div>
</template>
<script setup>
import { ref, onMounted } from 'vue'
import api from '../api'

const stats = ref({
  servers: 0,
  repos: 0,
  synced: 0,
  failed: 0
})

// Map the nested status payload onto the four dashboard counters
const fetchStats = async () => {
  try {
    const data = await api.get('/status').then(r => r.data)
    stats.value = {
      servers: data.servers.total,
      repos: data.repos.total,
      synced: data.repos.synced,
      failed: data.repos.failed
    }
  } catch (error) {
    console.error('Failed to fetch stats:', error)
  }
}

onMounted(() => {
  fetchStats()
})
</script>
<style scoped>
.mt-20 {
  margin-top: 20px;
}
</style>
```
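`fetchStats` reads four fields from the status payload: `servers.total`, `repos.total`, `repos.synced`, and `repos.failed`. The sketch below restates that mapping in Python with defaults for missing fields, which can help when writing a backend test for the status endpoint. `to_dashboard_stats` is a hypothetical helper and the sample numbers are made up.

```python
from typing import Any, Dict


def to_dashboard_stats(payload: Dict[str, Any]) -> Dict[str, int]:
    """Flatten the nested status payload into the four dashboard counters."""
    servers = payload.get("servers") or {}
    repos = payload.get("repos") or {}
    return {
        "servers": int(servers.get("total", 0)),
        "repos": int(repos.get("total", 0)),
        "synced": int(repos.get("synced", 0)),
        "failed": int(repos.get("failed", 0)),
    }


sample = {"servers": {"total": 2}, "repos": {"total": 14, "synced": 12, "failed": 2}}
print(to_dashboard_stats(sample))  # {'servers': 2, 'repos': 14, 'synced': 12, 'failed': 2}
print(to_dashboard_stats({}))      # {'servers': 0, 'repos': 0, 'synced': 0, 'failed': 0}
```

Unlike this sketch, the Vue code does not default missing fields, so a payload without `servers` or `repos` would throw inside `fetchStats` and leave the counters at zero.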
- [ ] **Step 2: Create the remaining pages (placeholders)**
`frontend/src/views/Servers.vue`:
```vue
<template>
  <div class="servers">
    <el-card header="服务器管理">
      <el-empty description="服务器管理页面 - 开发中" />
    </el-card>
  </div>
</template>
```
`frontend/src/views/Repos.vue`:
```vue
<template>
  <div class="repos">
    <el-card header="仓库列表">
      <el-empty description="仓库列表页面 - 开发中" />
    </el-card>
  </div>
</template>
```
`frontend/src/views/SyncLogs.vue`:
```vue
<template>
  <div class="sync-logs">
    <el-card header="同步记录">
      <el-empty description="同步记录页面 - 开发中" />
    </el-card>
  </div>
</template>
```
`frontend/src/views/SshKeys.vue`:
```vue
<template>
  <div class="ssh-keys">
    <el-card header="SSH 密钥">
      <el-empty description="SSH 密钥页面 - 开发中" />
    </el-card>
  </div>
</template>
```
`frontend/src/views/Settings.vue`:
```vue
<template>
  <div class="settings">
    <el-card header="系统设置">
      <el-empty description="系统设置页面 - 开发中" />
    </el-card>
  </div>
</template>
```
- [ ] **Step 3: Commit**
```bash
git add frontend/src/views/
git commit -m "feat: add frontend page placeholders"
```
---
## Phase 8: Build and Deployment Configuration
### Task 8.1: README and startup script
**Files:**
- Create: `README.md`
- Create: `start.sh`
- [ ] **Step 1: Create README.md**
`README.md`:
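````markdown
# Git Repo Manager
A Git repository sync management tool for small teams.
## Features
- Management of multiple Gitea servers
- SSH key management
- Automatic repository sync
- Scheduled job execution
- Web management UI
## Quick Start
### 1. Configure environment variables
```bash
cp .env.example .env
# Edit .env and set the encryption key and API token
```
### 2. Install dependencies
**Backend:**
```bash
pip install -r backend/requirements.txt
```
**Frontend:**
```bash
cd frontend
npm install
```
### 3. Initialize the database
```bash
python backend/init_db.py
```
### 4. Build the frontend
```bash
cd frontend
npm run build
```
### 5. Start the service
```bash
# Development
bash start.sh
# Or start manually (the backend imports `app.*`, so point uvicorn at backend/)
uvicorn app.main:app --app-dir backend --host 0.0.0.0 --port 8000 --reload
```
Open http://localhost:8000
## API Documentation
Once the service is running, open http://localhost:8000/docs
## Development
**Backend development:**
```bash
cd backend
pytest tests/
```
**Frontend development:**
```bash
cd frontend
npm run dev
```
## Tech Stack
- **Backend:** FastAPI, SQLAlchemy, APScheduler, Paramiko, GitPython
- **Frontend:** Vue 3, Element Plus, Pinia, Axios
- **Database:** SQLite
````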
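Step 1 of the README asks for an encryption key and an API token in `.env`. One way to produce an unguessable API token is Python's `secrets` module, sketched below. `generate_secret` is a hypothetical helper, and the variable names expected by `.env.example` are not shown in this section, so none are assumed here. If the security module expects a specific key format for encryption, use that library's own key generator instead.

```python
import secrets


def generate_secret(num_bytes: int = 32) -> str:
    """Return a URL-safe random string built from num_bytes of OS randomness."""
    return secrets.token_urlsafe(num_bytes)


api_token = generate_secret()
# 32 random bytes encode to 43 URL-safe characters
print(len(api_token))  # 43
```

Paste the generated value into `.env` by hand rather than committing it to the repository.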
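- [ ] **Step 2: Create the startup script**
`start.sh`:
```bash
#!/bin/bash
# Check for the .env file
if [ ! -f .env ]; then
  echo "Error: .env file not found"
  echo "Copy .env.example to .env and configure it first"
  exit 1
fi
# Check for the database
if [ ! -f data/git_manager.db ]; then
  echo "Initializing database..."
  python backend/init_db.py
fi
# Start the service (the backend imports `app.*`, so point uvicorn at backend/)
echo "Starting Git Repo Manager..."
uvicorn app.main:app --app-dir backend --host 0.0.0.0 --port 8000 --reload
```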
```bash
chmod +x start.sh
```
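The two checks that `start.sh` performs can also be expressed in Python, for example to reuse them from a test or a setup command. This is a sketch under the assumption that paths are resolved relative to the project root, as in the script; `preflight` is a hypothetical helper.

```python
import tempfile
from pathlib import Path
from typing import List


def preflight(project_root: Path) -> List[str]:
    """Return the setup steps still needed before the service can start."""
    pending = []
    # start.sh aborts when .env is missing
    if not (project_root / ".env").is_file():
        pending.append("copy .env.example to .env and configure it")
    # start.sh initialises the database when the SQLite file is missing
    if not (project_root / "data" / "git_manager.db").is_file():
        pending.append("run python backend/init_db.py")
    return pending


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    print(len(preflight(root)))  # 2
    (root / ".env").write_text("")
    print(preflight(root))  # ['run python backend/init_db.py']
```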
- [ ] **Step 3: Commit**
```bash
git add README.md start.sh
git commit -m "feat: add README and startup script"
```
---
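## Plan Summary
### Task modules covered
| Phase | Content | Tasks |
|-------|---------|-------|
| 1 | Project infrastructure | 4 tasks |
| 2 | Data models | 1 task (all 4 models) |
| 3 | Pydantic schemas | 1 task (all schemas) |
| 4 | Service layer | 4 tasks (SSH Key, Server, Sync, Repo) |
| 5 | API routes | 4 tasks (dependency injection, SSH Keys, Servers, Status) |
| 6 | FastAPI main application | 1 task |
| 7 | Frontend basics | 5 tasks (configuration, router, API, stores, pages) |
| 8 | Build and deployment | 1 task |
**Total:** 21 main tasks
### Possible follow-up features
1. Flesh out the frontend page interactions
2. Add scheduled job execution (APScheduler)
3. Add Gitea API integration (fetching repository lists)
4. Complete the sync log page
5. Add the connection test feature
6. Docker containerized deployment
7. Add webhook support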
---
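## Choosing an Execution Mode
**The full plan is saved at:** `docs/superpowers/plans/2026-03-30-git-repo-manager.md`
**Execution options:**
1. **Subagent-Driven (recommended)** - each task runs in its own subagent, with a review between tasks
2. **Inline Execution** - tasks run in batches in the current session, with reviews at checkpoints
Choose an execution mode to begin implementation.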