- Create backend directory structure (app/models, app/schemas, app/services, app/api, app/tasks, tests) - Create frontend directory structure (src/router, src/views, src/components, src/api, src/stores) - Create data directories (ssh_keys, repos) - Add requirements.txt with FastAPI, SQLAlchemy, Pydantic, and testing dependencies - Add frontend package.json with Vue 3, Vue Router, Pinia, and Element Plus - Add .env.example with configuration template - Add .gitignore for Python, data directories, and frontend - Add pytest conftest.py with test fixtures for database and environment Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
93 KiB
Git Repo Manager 实施计划
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
目标: 构建面向小团队的 Git 仓库同步管理工具,支持多 Gitea 服务器、Web UI、定时同步和 SSH 密钥管理。
架构: 单进程 FastAPI 应用 + 内嵌 Vue 3 SPA 前端,SQLite 持久化,APScheduler 定时任务,Git/SSH 仓库操作。
技术栈: Python 3.11+, FastAPI, SQLAlchemy, Vue 3, Element Plus, Pinia, APScheduler, Paramiko, GitPython
目录结构
git-manager/
├── backend/
│ ├── app/
│ │ ├── __init__.py
│ │ ├── main.py # FastAPI 入口
│ │ ├── config.py # 配置管理
│ │ ├── database.py # SQLAlchemy (Base + Engine + Session)
│ │ ├── security.py # 加密 + 认证
│ │ ├── models/ # ORM 模型
│ │ │ ├── __init__.py # 导出 Base 和所有模型
│ │ │ ├── ssh_key.py
│ │ │ ├── server.py
│ │ │ ├── repo.py
│ │ │ └── sync_log.py
│ │ ├── schemas/ # Pydantic 模型
│ │ │ ├── __init__.py
│ │ │ ├── common.py
│ │ │ ├── ssh_key.py
│ │ │ ├── server.py
│ │ │ ├── repo.py
│ │ │ └── sync_log.py
│ │ ├── services/ # 业务逻辑
│ │ │ ├── __init__.py
│ │ │ ├── ssh_key_service.py
│ │ │ ├── server_service.py
│ │ │ ├── repo_service.py
│ │ │ └── sync_service.py
│ │ ├── api/ # API 路由
│ │ │ ├── __init__.py
│ │ │ ├── deps.py # 依赖注入
│ │ │ ├── ssh_keys.py
│ │ │ ├── servers.py
│ │ │ ├── repos.py
│ │ │ ├── sync_logs.py
│ │ │ ├── schedules.py
│ │ │ └── status.py
│ │ └── tasks/ # 定时任务
│ │ ├── __init__.py
│ │ └── sync_task.py
│ ├── tests/
│ │ ├── __init__.py
│ │ ├── conftest.py # pytest fixtures
│ │ ├── test_config.py
│ │ ├── test_security.py
│ │ ├── test_models/
│ │ ├── test_schemas/
│ │ ├── test_services/
│ │ └── test_api/
│ ├── init_db.py
│ └── requirements.txt
├── frontend/
│ ├── src/
│ │ ├── main.js
│ │ ├── App.vue
│ │ ├── router/
│ │ │ └── index.js
│ │ ├── views/
│ │ │ ├── Dashboard.vue
│ │ │ ├── Servers.vue
│ │ │ ├── Repos.vue
│ │ │ ├── SyncLogs.vue
│ │ │ ├── SshKeys.vue
│ │ │ └── Settings.vue
│ │ ├── components/
│ │ │ ├── ServerForm.vue
│ │ │ ├── RepoSyncStatus.vue
│ │ │ └── CommitHistory.vue
│ │ ├── api/
│ │ │ ├── index.js # Axios 实例 + 拦截器
│ │ │ ├── servers.js
│ │ │ ├── repos.js
│ │ │ ├── sshKeys.js
│ │ │ └── syncLogs.js
│ │ └── stores/
│ │ ├── app.js
│ │ ├── servers.js
│ │ └── repos.js
│ ├── index.html
│ ├── vite.config.js
│ └── package.json
├── data/ # 运行时目录 (gitignore)
│ ├── git_manager.db
│ ├── ssh_keys/
│ └── repos/
├── .env.example
├── .gitignore
└── README.md
Phase 1: 项目基础设施
Task 1.1: 创建项目结构和配置文件
文件:
-
创建:
backend/requirements.txt -
创建:
backend/tests/__init__.py -
创建:
backend/tests/conftest.py -
创建:
backend/app/__init__.py -
创建:
backend/app/models/__init__.py -
创建:
backend/app/schemas/__init__.py -
创建:
backend/app/services/__init__.py -
创建:
backend/app/api/__init__.py -
创建:
backend/app/tasks/__init__.py -
创建:
frontend/目录结构 -
创建:
data/ssh_keys/和data/repos/ -
创建:
.env.example -
创建:
.gitignore -
Step 1: 创建目录结构
# 后端目录
mkdir -p backend/app/{models,schemas,services,api,tasks}
mkdir -p backend/tests/{test_models,test_schemas,test_services,test_api}
# 前端目录
mkdir -p frontend/src/{router,views,components,api,stores}
# 数据目录
mkdir -p data/ssh_keys data/repos
# 创建 __init__.py
touch backend/app/__init__.py
touch backend/app/models/__init__.py
touch backend/app/schemas/__init__.py
touch backend/app/services/__init__.py
touch backend/app/api/__init__.py
touch backend/app/tasks/__init__.py
touch backend/tests/__init__.py
- Step 2: 创建 requirements.txt
backend/requirements.txt:
fastapi==0.109.0
uvicorn[standard]==0.27.0
sqlalchemy==2.0.25
pydantic==2.5.3
pydantic-settings==2.1.0
python-multipart==0.0.6
apscheduler==3.10.4
paramiko==3.4.0
gitpython==3.1.40
cryptography==41.0.7
requests==2.31.0
pytest==7.4.4
pytest-asyncio==0.23.3
httpx==0.26.0
- Step 3: 创建 frontend/package.json
frontend/package.json:
{
"name": "git-manager-frontend",
"version": "1.0.0",
"type": "module",
"scripts": {
"dev": "vite",
"build": "vite build",
"preview": "vite preview"
},
"dependencies": {
"vue": "^3.4.15",
"vue-router": "^4.2.5",
"pinia": "^2.1.7",
"axios": "^1.6.5",
"element-plus": "^2.5.4",
"@element-plus/icons-vue": "^2.3.1"
},
"devDependencies": {
"@vitejs/plugin-vue": "^5.0.3",
"vite": "^5.0.11"
}
}
- Step 4: 创建 .env.example
.env.example:
# AES-256 加密密钥 (32字节 base64编码)
GM_ENCRYPT_KEY=change-this-to-a-32-byte-base64-key
# API 认证 Token
GM_API_TOKEN=change-this-api-token
# 数据目录
GM_DATA_DIR=./data
# 服务器配置
GM_HOST=0.0.0.0
GM_PORT=8000
- Step 5: 创建 .gitignore
.gitignore:
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
venv/
.venv/
# 数据
data/
*.db
*.db-journal
# 环境变量
.env
# IDE
.idea/
.vscode/
*.swp
# 前端
frontend/node_modules/
frontend/dist/
frontend/.vite/
# 日志
*.log
- Step 6: 创建测试 conftest.py
backend/tests/conftest.py:
import os
import sys
import tempfile
import pytest
from pathlib import Path
# Add backend to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.models import Base
@pytest.fixture(scope="function")
def db_path(tmp_path):
"""临时数据库路径."""
return tmp_path / "test.db"
@pytest.fixture(scope="function")
def db_engine(db_path):
"""临时数据库引擎."""
engine = create_engine(f"sqlite:///{db_path}", connect_args={"check_same_thread": False})
Base.metadata.create_all(engine)
yield engine
engine.dispose()
@pytest.fixture(scope="function")
def db_session(db_engine):
"""临时数据库会话."""
SessionLocal = sessionmaker(bind=db_engine, autocommit=False, autoflush=False)
session = SessionLocal()
yield session
session.close()
@pytest.fixture(scope="function")
def test_encrypt_key():
"""测试加密密钥."""
import base64
return base64.b64encode(b'test-key-32-bytes-long-1234567890').decode()
@pytest.fixture(scope="function")
def test_env_vars(db_path, test_encrypt_key, monkeypatch):
"""设置测试环境变量."""
monkeypatch.setenv("GM_ENCRYPT_KEY", test_encrypt_key)
monkeypatch.setenv("GM_API_TOKEN", "test-token")
monkeypatch.setenv("GM_DATA_DIR", str(db_path.parent))
return {
"GM_ENCRYPT_KEY": test_encrypt_key,
"GM_API_TOKEN": "test-token",
}
- Step 7: 提交
git add .
git commit -m "feat: initialize project structure and configuration"
Task 1.2: 配置管理模块
文件:
-
创建:
backend/app/config.py -
创建:
backend/tests/test_config.py -
Step 1: 编写配置测试
backend/tests/test_config.py:
import os
import pytest
from pathlib import Path
def test_config_defaults(test_env_vars):
"""测试配置默认值."""
from app.config import settings
assert settings.data_dir == Path('./data')
assert settings.host == '0.0.0.0'
assert settings.port == 8000
def test_config_from_env(monkeypatch):
"""测试从环境变量读取配置."""
monkeypatch.setenv("GM_DATA_DIR", "/custom/data")
monkeypatch.setenv("GM_PORT", "9000")
# 重新加载配置
from app.config import Settings
settings = Settings()
assert settings.data_dir == Path('/custom/data')
assert settings.port == 9000
- Step 2: 运行测试验证失败
cd backend && pytest tests/test_config.py -v
预期: FAIL - "cannot import name 'settings' from 'app.config'"
- Step 3: 实现配置模块
backend/app/config.py:
import os
from pathlib import Path
from typing import Literal
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""应用配置,从环境变量加载."""
# 安全配置
encrypt_key: str # AES-256 密钥 (base64)
api_token: str # API 认证 Token
# 路径配置
data_dir: Path = Path('./data')
# 服务器配置
host: str = '0.0.0.0'
port: int = 8000
model_config = SettingsConfigDict(
env_prefix='GM_',
env_file='.env',
env_file_encoding='utf-8',
)
@property
def db_path(self) -> Path:
"""SQLite 数据库路径."""
return self.data_dir / 'git_manager.db'
@property
def ssh_keys_dir(self) -> Path:
"""SSH 密钥存储目录."""
return self.data_dir / 'ssh_keys'
@property
def repos_dir(self) -> Path:
"""仓库镜像存储目录."""
return self.data_dir / 'repos'
settings = Settings()
- Step 4: 运行测试验证通过
cd backend && pytest tests/test_config.py -v
预期: PASS
- Step 5: 提交
git add backend/app/config.py backend/tests/test_config.py
git commit -m "feat: add configuration management"
Task 1.3: 安全模块 (加密 + 认证)
文件:
-
创建:
backend/app/security.py -
创建:
backend/tests/test_security.py -
Step 1: 编写安全模块测试
backend/tests/test_security.py:
import base64
import pytest
def test_encrypt_decrypt_roundtrip():
"""测试加密解密往返."""
from app.security import encrypt_data, decrypt_data
original = b"sensitive-secret-data"
key = base64.b64encode(b'test-key-32-bytes-long-1234567890').decode()
encrypted = encrypt_data(original, key)
assert encrypted != original
assert isinstance(encrypted, bytes)
assert len(encrypted) > len(original) # nonce + ciphertext + tag
decrypted = decrypt_data(encrypted, key)
assert decrypted == original
def test_verify_api_token_success(test_env_vars):
"""测试 API token 验证成功."""
from app.security import verify_api_token
assert verify_api_token("Bearer test-token") is True
def test_verify_api_token_failure():
"""测试 API token 验证失败."""
from app.security import verify_api_token
assert verify_api_token("Bearer wrong-token") is False
assert verify_api_token(None) is False
assert verify_api_token("Basic token") is False
- Step 2: 运行测试验证失败
cd backend && pytest tests/test_security.py -v
预期: FAIL - "cannot import name 'encrypt_data' from 'app.security'"
- Step 3: 实现安全模块
backend/app/security.py:
import base64
import os
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
def encrypt_data(data: bytes, key_b64: str) -> bytes:
"""
使用 AES-256-GCM 加密数据.
Args:
data: 原始数据
key_b64: Base64 编码的加密密钥
Returns:
加密后的数据 (nonce + ciphertext + tag)
"""
key = base64.b64decode(key_b64)
nonce = os.urandom(12) # GCM 96-bit nonce
cipher = Cipher(
algorithms.AES(key),
modes.GCM(nonce),
backend=default_backend()
)
encryptor = cipher.encryptor()
ciphertext = encryptor.update(data) + encryptor.finalize()
# 返回 nonce + ciphertext + tag
return nonce + ciphertext + encryptor.tag
def decrypt_data(encrypted_data: bytes, key_b64: str) -> bytes:
"""
解密使用 encrypt_data 加密的数据.
Args:
encrypted_data: 加密数据 (nonce + ciphertext + tag)
key_b64: Base64 编码的加密密钥
Returns:
解密后的原始数据
"""
key = base64.b64decode(key_b64)
nonce = encrypted_data[:12]
tag = encrypted_data[-16:]
ciphertext = encrypted_data[12:-16]
cipher = Cipher(
algorithms.AES(key),
modes.GCM(nonce, tag),
backend=default_backend()
)
decryptor = cipher.decryptor()
return decryptor.update(ciphertext) + decryptor.finalize()
def verify_api_token(authorization: str | None) -> bool:
"""
验证 Bearer Token.
Args:
authorization: Authorization 头部值
Returns:
Token 是否有效
"""
if not authorization:
return False
if not authorization.startswith('Bearer '):
return False
from app.config import settings
token = authorization[7:] # 移除 'Bearer ' 前缀
return token == settings.api_token
- Step 4: 运行测试验证通过
cd backend && pytest tests/test_security.py -v
预期: PASS
- Step 5: 提交
git add backend/app/security.py backend/tests/test_security.py
git commit -m "feat: add security module (encryption + auth)"
Task 1.4: 数据库模块
文件:
-
创建:
backend/app/database.py -
创建:
backend/init_db.py -
创建:
backend/tests/test_database.py -
Step 1: 编写数据库测试
backend/tests/test_database.py:
from pathlib import Path
def test_database_initialization(db_path):
"""测试数据库初始化."""
from app.database import init_db, get_engine, Base
init_db(db_path)
assert db_path.exists()
engine = get_engine()
assert engine is not None
# 创建所有表
Base.metadata.create_all(engine)
assert True # 如果没有异常则成功
def test_get_session(db_path):
"""测试获取数据库会话."""
from app.database import init_db, get_db
init_db(db_path)
with get_db(db_path) as session:
assert session is not None
- Step 2: 运行测试验证失败
cd backend && pytest tests/test_database.py -v
预期: FAIL - "cannot import name 'init_db' from 'app.database'"
- Step 3: 实现数据库模块
backend/app/database.py:
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker, Session
from pathlib import Path
from contextlib import contextmanager
from typing import Generator
class Base(DeclarativeBase):
"""SQLAlchemy 声明基类."""
pass
_engine = None
_session_factory = None
def init_db(db_path: Path) -> None:
"""
初始化数据库引擎和会话工厂.
Args:
db_path: SQLite 数据库文件路径
"""
global _engine, _session_factory
# 确保父目录存在
db_path.parent.mkdir(parents=True, exist_ok=True)
# 创建引擎
_engine = create_engine(
f'sqlite:///{db_path}',
connect_args={'check_same_thread': False}
)
_session_factory = sessionmaker(bind=_engine, autocommit=False, autoflush=False)
def get_engine():
"""获取数据库引擎."""
return _engine
def get_session_factory():
"""获取会话工厂."""
return _session_factory
@contextmanager
def get_db(db_path: Path | None = None) -> Generator[Session, None, None]:
"""
获取数据库会话.
Args:
db_path: 可选,用于初始化数据库
Yields:
SQLAlchemy 会话
"""
if db_path and _engine is None:
init_db(db_path)
if _session_factory is None:
raise RuntimeError("Database not initialized. Call init_db() first.")
session = _session_factory()
try:
yield session
finally:
session.close()
backend/init_db.py:
#!/usr/bin/env python3
"""
数据库初始化脚本.
创建所有表和必要的目录.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from app.config import settings
from app.database import init_db
from app.models import Base
def main():
"""初始化数据库."""
print(f"初始化数据库: {settings.db_path}")
# 创建目录
settings.data_dir.mkdir(parents=True, exist_ok=True)
settings.ssh_keys_dir.mkdir(parents=True, exist_ok=True)
settings.repos_dir.mkdir(parents=True, exist_ok=True)
# 初始化数据库
init_db(settings.db_path)
# 创建所有表
from app.database import get_engine
Base.metadata.create_all(get_engine())
print("数据库初始化成功!")
print(f" - 数据库: {settings.db_path}")
print(f" - SSH 密钥: {settings.ssh_keys_dir}")
print(f" - 仓库: {settings.repos_dir}")
if __name__ == '__main__':
main()
- Step 4: 运行测试验证通过
cd backend && pytest tests/test_database.py -v
预期: PASS
- Step 5: 提交
git add backend/app/database.py backend/init_db.py backend/tests/test_database.py
git commit -m "feat: add database module"
Phase 2: 数据模型
Task 2.1: 创建所有 ORM 模型
文件:
-
创建:
backend/app/models/ssh_key.py -
创建:
backend/app/models/server.py -
创建:
backend/app/models/repo.py -
创建:
backend/app/models/sync_log.py -
修改:
backend/app/models/__init__.py -
Step 1: 编写模型测试
backend/tests/test_models/test_models.py:
from datetime import datetime
import base64
def test_ssh_key_model(db_session):
"""测试 SshKey 模型."""
from app.models.ssh_key import SshKey
key = SshKey(
name='test-key',
private_key=b'encrypted-key-data',
fingerprint='SHA256:abc123'
)
db_session.add(key)
db_session.commit()
assert key.id is not None
assert key.created_at is not None
def test_server_model(db_session):
"""测试 Server 模型."""
from app.models.server import Server
from app.models.ssh_key import SshKey
# 先创建 SSH key
ssh_key = SshKey(
name='test-key',
private_key=b'encrypted',
fingerprint='SHA256:abc'
)
db_session.add(ssh_key)
db_session.flush()
server = Server(
name='test-gitea',
url='https://gitea.example.com',
api_token=b'encrypted-token',
ssh_key_id=ssh_key.id,
sync_enabled=True,
schedule_cron='0 */2 * * *',
local_path='/data/repos/test-gitea',
status='untested'
)
db_session.add(server)
db_session.commit()
assert server.id is not None
assert server.ssh_key.id == ssh_key.id
def test_repo_model(db_session):
"""测试 Repo 模型."""
from app.models.repo import Repo
from app.models.server import Server
from app.models.ssh_key import SshKey
# 创建关联数据
ssh_key = SshKey(name='k', private_key=b'x', fingerprint='y')
db_session.add(ssh_key)
db_session.flush()
server = Server(
name='s', url='u', api_token=b't', ssh_key_id=ssh_key.id,
sync_enabled=True, schedule_cron='x', local_path='p', status='x'
)
db_session.add(server)
db_session.flush()
repo = Repo(
server_id=server.id,
name='my-repo',
full_name='owner/my-repo',
clone_url='git@gitea.com:owner/repo.git',
local_path='/data/r.git',
status='pending'
)
db_session.add(repo)
db_session.commit()
assert repo.id is not None
assert repo.server.id == server.id
def test_sync_log_model(db_session):
"""测试 SyncLog 模型."""
from app.models.sync_log import SyncLog
from app.models.repo import Repo
from app.models.server import Server
from app.models.ssh_key import SshKey
# 创建关联数据链
ssh_key = SshKey(name='k', private_key=b'x', fingerprint='y')
db_session.add(ssh_key)
db_session.flush()
server = Server(
name='s', url='u', api_token=b't', ssh_key_id=ssh_key.id,
sync_enabled=True, schedule_cron='x', local_path='p', status='x'
)
db_session.add(server)
db_session.flush()
repo = Repo(
server_id=server.id, name='r', full_name='f', clone_url='u',
local_path='p', status='s'
)
db_session.add(repo)
db_session.flush()
log = SyncLog(
repo_id=repo.id,
status='synced',
started_at=datetime.utcnow(),
finished_at=datetime.utcnow(),
commits_count=5
)
db_session.add(log)
db_session.commit()
assert log.id is not None
assert log.repo.id == repo.id
- Step 2: 运行测试验证失败
cd backend && pytest tests/test_models/test_models.py -v
预期: FAIL - "cannot import name 'SshKey' from 'app.models.ssh_key'"
- Step 3: 实现 SshKey 模型
backend/app/models/ssh_key.py:
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text
from app.database import Base
class SshKey(Base):
"""SSH 私钥存储模型."""
__tablename__ = 'ssh_keys'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(100), nullable=False, unique=True)
private_key = Column(Text, nullable=False) # AES-256 加密
fingerprint = Column(String(64), nullable=False)
created_at = Column(Integer, nullable=False) # Unix timestamp
def __repr__(self):
return f'<SshKey(id={self.id}, name="{self.name}")>'
- Step 4: 实现 Server 模型
backend/app/models/server.py:
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, Boolean, ForeignKey
from sqlalchemy.orm import relationship
from app.database import Base
class Server(Base):
"""Gitea 服务器配置模型."""
__tablename__ = 'servers'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(100), nullable=False, unique=True)
url = Column(String(500), nullable=False)
api_token = Column(Text, nullable=False) # AES-256 加密
ssh_key_id = Column(Integer, ForeignKey('ssh_keys.id'), nullable=False)
sync_enabled = Column(Boolean, default=False, nullable=False)
schedule_cron = Column(String(50), nullable=False)
local_path = Column(String(500), nullable=False)
status = Column(String(20), default='untested', nullable=False)
created_at = Column(Integer, nullable=False)
updated_at = Column(Integer, nullable=False)
# 关系
ssh_key = relationship('SshKey', backref='servers')
repos = relationship('Repo', back_populates='server', cascade='all, delete-orphan')
def __repr__(self):
return f'<Server(id={self.id}, name="{self.name}", status="{self.status}")>'
- Step 5: 实现 Repo 模型
backend/app/models/repo.py:
from datetime import datetime
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from app.database import Base
class Repo(Base):
"""Git 仓库镜像模型."""
__tablename__ = 'repos'
id = Column(Integer, primary_key=True, autoincrement=True)
server_id = Column(Integer, ForeignKey('servers.id'), nullable=False)
name = Column(String(200), nullable=False)
full_name = Column(String(300), nullable=False)
clone_url = Column(String(500), nullable=False)
local_path = Column(String(500), nullable=False)
last_sync_at = Column(Integer, nullable=True)
status = Column(String(20), default='pending', nullable=False)
created_at = Column(Integer, nullable=False)
# 关系
server = relationship('Server', back_populates='repos')
sync_logs = relationship('SyncLog', back_populates='repo', cascade='all, delete-orphan')
def __repr__(self):
return f'<Repo(id={self.id}, name="{self.full_name}", status="{self.status}")>'
- Step 6: 实现 SyncLog 模型
backend/app/models/sync_log.py:
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, ForeignKey
from sqlalchemy.orm import relationship
from app.database import Base
class SyncLog(Base):
"""仓库同步操作日志模型."""
__tablename__ = 'sync_logs'
id = Column(Integer, primary_key=True, autoincrement=True)
repo_id = Column(Integer, ForeignKey('repos.id'), nullable=False)
status = Column(String(20), nullable=False)
started_at = Column(Integer, nullable=False)
finished_at = Column(Integer, nullable=True)
commits_count = Column(Integer, nullable=True)
error_msg = Column(Text, nullable=True)
created_at = Column(Integer, nullable=False)
# 关系
repo = relationship('Repo', back_populates='sync_logs')
def __repr__(self):
return f'<SyncLog(id={self.id}, repo_id={self.repo_id}, status="{self.status}")>'
- Step 7: 更新 models/init.py
backend/app/models/__init__.py:
from app.database import Base
from app.models.ssh_key import SshKey
from app.models.server import Server
from app.models.repo import Repo
from app.models.sync_log import SyncLog
__all__ = ['Base', 'SshKey', 'Server', 'Repo', 'SyncLog']
- Step 8: 运行测试验证通过
cd backend && pytest tests/test_models/test_models.py -v
预期: PASS
- Step 9: 提交
git add backend/app/models/ backend/tests/test_models/
git commit -m "feat: add all ORM models"
Phase 3: Pydantic Schemas
Task 3.1: 创建所有 Pydantic 模型
文件:
-
创建:
backend/app/schemas/common.py -
创建:
backend/app/schemas/ssh_key.py -
创建:
backend/app/schemas/server.py -
创建:
backend/app/schemas/repo.py -
创建:
backend/app/schemas/sync_log.py -
修改:
backend/app/schemas/__init__.py -
Step 1: 编写 Schemas 测试
backend/tests/test_schemas/test_schemas.py:
from datetime import datetime
def test_common_schemas():
"""测试通用响应格式."""
from app.schemas.common import SuccessResponse, ErrorResponse
resp = SuccessResponse(data={'key': 'value'}, message='OK')
assert resp.code == 0
assert resp.data == {'key': 'value'}
err = ErrorResponse(message='Not found', code=404)
assert err.code == 404
assert err.message == 'Not found'
def test_ssh_key_schemas():
"""测试 SSH key schemas."""
from app.schemas.ssh_key import SshKeyCreate, SshKeyResponse
create = SshKeyCreate(
name='test-key',
private_key='ssh-rsa AAAA...'
)
assert create.name == 'test-key'
resp = SshKeyResponse(
id=1,
name='test-key',
fingerprint='SHA256:abc',
servers_count=1,
created_at=int(datetime.utcnow().timestamp())
)
assert resp.servers_count == 1
def test_server_schemas():
"""测试 Server schemas."""
from app.schemas.server import ServerCreate, ServerResponse
create = ServerCreate(
name='test',
url='https://gitea.com',
api_token='token',
ssh_key_id=1
)
assert create.sync_enabled is False # 默认值
resp = ServerResponse(
id=1,
name='test',
url='https://gitea.com',
sync_enabled=True,
schedule_cron='0 */2 * * *',
local_path='/data/r',
status='connected',
repos_count=5,
ssh_key_name='key',
created_at=int(datetime.utcnow().timestamp()),
updated_at=int(datetime.utcnow().timestamp())
)
assert resp.repos_count == 5
def test_repo_schemas():
"""测试 Repo schemas."""
from app.schemas.repo import RepoResponse, CommitInfo
resp = RepoResponse(
id=1,
server_id=1,
server_name='gitea',
name='repo',
full_name='owner/repo',
clone_url='git@u:r.git',
local_path='/p',
status='synced',
created_at=int(datetime.utcnow().timestamp())
)
assert resp.full_name == 'owner/repo'
commit = CommitInfo(
hash='abc',
message='Fix',
author='Bob',
date=int(datetime.utcnow().timestamp())
)
assert commit.hash == 'abc'
def test_sync_log_schemas():
"""测试 SyncLog schemas."""
from app.schemas.sync_log import SyncLogResponse
resp = SyncLogResponse(
id=1,
repo_id=1,
repo_name='owner/repo',
status='synced',
started_at=int(datetime.utcnow().timestamp()),
finished_at=int(datetime.utcnow().timestamp()),
commits_count=5,
created_at=int(datetime.utcnow().timestamp())
)
assert resp.commits_count == 5
- Step 2: 运行测试验证失败
cd backend && pytest tests/test_schemas/test_schemas.py -v
预期: FAIL - "cannot import..."
- Step 3: 实现所有 Schemas
backend/app/schemas/common.py:
from typing import Generic, TypeVar, Optional
from pydantic import BaseModel, Field
T = TypeVar('T')
class SuccessResponse(BaseModel, Generic[T]):
"""标准成功响应."""
code: int = Field(default=0, description="状态码,0表示成功")
data: Optional[T] = Field(default=None, description="响应数据")
message: str = Field(default='success', description="响应消息")
class ErrorResponse(BaseModel):
"""标准错误响应."""
code: int = Field(..., description="错误码")
message: str = Field(..., description="错误消息")
data: Optional[dict] = Field(default=None, description="额外错误数据")
backend/app/schemas/ssh_key.py:
from typing import Optional
from pydantic import BaseModel, Field
class SshKeyCreate(BaseModel):
"""创建 SSH key 请求."""
name: str = Field(..., min_length=1, max_length=100)
private_key: str = Field(..., description="私钥内容")
password: Optional[str] = Field(default=None, description="密钥密码")
class SshKeyResponse(BaseModel):
"""SSH key 响应."""
id: int
name: str
fingerprint: str
servers_count: int = 0
created_at: int # Unix timestamp
backend/app/schemas/server.py:
from typing import Optional
from pydantic import BaseModel, Field
class ServerCreate(BaseModel):
"""创建服务器请求."""
name: str = Field(..., min_length=1, max_length=100)
url: str = Field(..., description="Gitea 服务器 URL")
api_token: str = Field(..., description="Gitea API Token")
ssh_key_id: int = Field(..., description="SSH key ID")
sync_enabled: bool = Field(default=False)
schedule_cron: str = Field(default='0 */2 * * *')
class ServerUpdate(BaseModel):
"""更新服务器请求."""
name: Optional[str] = Field(None, min_length=1, max_length=100)
url: Optional[str] = None
api_token: Optional[str] = None
ssh_key_id: Optional[int] = None
sync_enabled: Optional[bool] = None
schedule_cron: Optional[str] = None
class ServerResponse(BaseModel):
"""服务器响应."""
id: int
name: str
url: str
sync_enabled: bool
schedule_cron: str
local_path: str
status: str
repos_count: int = 0
ssh_key_name: str
created_at: int
updated_at: int
backend/app/schemas/repo.py:
from typing import Optional, List
from pydantic import BaseModel
class CommitInfo(BaseModel):
"""提交信息."""
hash: str
message: str
author: str
date: int # Unix timestamp
class RepoResponse(BaseModel):
"""仓库响应."""
id: int
server_id: int
server_name: str
name: str
full_name: str
clone_url: str
local_path: str
status: str
last_sync_at: Optional[int] = None
created_at: int
backend/app/schemas/sync_log.py:
from typing import Optional
from pydantic import BaseModel
class SyncLogResponse(BaseModel):
"""同步日志响应."""
id: int
repo_id: int
repo_name: str
status: str
started_at: int
finished_at: Optional[int] = None
commits_count: Optional[int] = None
error_msg: Optional[str] = None
created_at: int
backend/app/schemas/__init__.py:
from app.schemas.common import SuccessResponse, ErrorResponse
from app.schemas.ssh_key import SshKeyCreate, SshKeyResponse
from app.schemas.server import ServerCreate, ServerUpdate, ServerResponse
from app.schemas.repo import RepoResponse, CommitInfo
from app.schemas.sync_log import SyncLogResponse
__all__ = [
'SuccessResponse', 'ErrorResponse',
'SshKeyCreate', 'SshKeyResponse',
'ServerCreate', 'ServerUpdate', 'ServerResponse',
'RepoResponse', 'CommitInfo',
'SyncLogResponse',
]
- Step 4: 运行测试验证通过
cd backend && pytest tests/test_schemas/test_schemas.py -v
预期: PASS
- Step 5: 提交
git add backend/app/schemas/ backend/tests/test_schemas/
git commit -m "feat: add all Pydantic schemas"
Phase 4: 服务层
Task 4.1: SSH Key 服务
文件:
-
创建:
backend/app/services/ssh_key_service.py -
创建:
backend/tests/test_services/test_ssh_key_service.py -
Step 1: 编写服务测试
backend/tests/test_services/test_ssh_key_service.py:
import base64
import pytest
def test_create_ssh_key(db_session, test_encrypt_key):
"""测试创建 SSH key."""
from app.services.ssh_key_service import SshKeyService
from app.models.ssh_key import SshKey
service = SshKeyService(db_session, test_encrypt_key)
ssh_key = service.create_ssh_key(
name='test-key',
private_key='ssh-rsa AAAAB3...'
)
assert ssh_key.id is not None
assert ssh_key.name == 'test-key'
assert ssh_key.fingerprint is not None
def test_list_ssh_keys(db_session, test_encrypt_key):
"""测试列出 SSH keys."""
from app.services.ssh_key_service import SshKeyService
service = SshKeyService(db_session, test_encrypt_key)
service.create_ssh_key('key1', 'ssh-rsa AAAA...')
service.create_ssh_key('key2', 'ssh-rsa BBBB...')
keys = service.list_ssh_keys()
assert len(keys) == 2
def test_delete_ssh_key(db_session, test_encrypt_key):
"""测试删除 SSH key."""
from app.services.ssh_key_service import SshKeyService
from app.models.ssh_key import SshKey
service = SshKeyService(db_session, test_encrypt_key)
ssh_key = service.create_ssh_key('test', 'ssh-rsa AAAA...')
key_id = ssh_key.id
service.delete_ssh_key(key_id)
deleted = db_session.query(SshKey).filter(SshKey.id == key_id).first()
assert deleted is None
def test_delete_in_use_key_fails(db_session, test_encrypt_key):
"""测试删除使用中的 key 失败."""
from app.services.ssh_key_service import SshKeyService
from app.models.server import Server
service = SshKeyService(db_session, test_encrypt_key)
ssh_key = service.create_ssh_key('test', 'ssh-rsa AAAA...')
# 关联服务器
server = Server(
name='s', url='u', api_token=b't', ssh_key_id=ssh_key.id,
sync_enabled=True, schedule_cron='x', local_path='p',
status='x', created_at=0, updated_at=0
)
db_session.add(server)
db_session.commit()
with pytest.raises(ValueError, match="in use"):
service.delete_ssh_key(ssh_key.id)
def test_get_decrypted_key(db_session, test_encrypt_key):
"""测试获取解密的 key."""
from app.services.ssh_key_service import SshKeyService
service = SshKeyService(db_session, test_encrypt_key)
original_key = 'ssh-rsa AAAAB3...'
ssh_key = service.create_ssh_key('test', original_key)
decrypted = service.get_decrypted_key(ssh_key.id)
assert decrypted == original_key
- Step 2: 运行测试验证失败
cd backend && pytest tests/test_services/test_ssh_key_service.py -v
预期: FAIL
- Step 3: 实现 SSH Key 服务
backend/app/services/ssh_key_service.py:
import base64
import hashlib
import time
from typing import List, Optional
from sqlalchemy.orm import Session
from app.models.ssh_key import SshKey
from app.security import encrypt_data, decrypt_data
class SshKeyService:
"""SSH key 操作服务."""
def __init__(self, db: Session, encryption_key: str):
self.db = db
self.encryption_key = encryption_key
def create_ssh_key(self, name: str, private_key: str, password: str | None = None) -> SshKey:
"""创建新的 SSH key."""
# 检查名称是否已存在
existing = self.db.query(SshKey).filter(SshKey.name == name).first()
if existing:
raise ValueError(f"SSH key '{name}' already exists")
# 加密私钥
encrypted = encrypt_data(private_key.encode(), self.encryption_key)
# 生成指纹
fingerprint = self._generate_fingerprint(private_key)
ssh_key = SshKey(
name=name,
private_key=encrypted,
fingerprint=fingerprint,
created_at=int(time.time())
)
self.db.add(ssh_key)
self.db.commit()
self.db.refresh(ssh_key)
return ssh_key
def list_ssh_keys(self) -> List[SshKey]:
"""列出所有 SSH keys."""
return self.db.query(SshKey).all()
def get_ssh_key(self, key_id: int) -> Optional[SshKey]:
"""获取 SSH key."""
return self.db.query(SshKey).filter(SshKey.id == key_id).first()
def delete_ssh_key(self, key_id: int) -> bool:
"""删除 SSH key."""
ssh_key = self.get_ssh_key(key_id)
if not ssh_key:
return False
# 检查是否在使用中
if ssh_key.servers:
raise ValueError(f"Cannot delete SSH key used by {len(ssh_key.servers)} server(s)")
self.db.delete(ssh_key)
self.db.commit()
return True
def get_decrypted_key(self, key_id: int) -> str:
"""获取解密后的私钥."""
ssh_key = self.get_ssh_key(key_id)
if not ssh_key:
raise ValueError(f"SSH key {key_id} not found")
decrypted = decrypt_data(ssh_key.private_key, self.encryption_key)
return decrypted.decode()
def _generate_fingerprint(self, public_key: str) -> str:
"""生成 SSH key 指纹."""
try:
key_data = public_key.encode()
hash_obj = hashlib.sha256(key_data)
return f"SHA256:{base64.b64encode(hash_obj.digest()).decode()[:16]}"
except Exception:
return "SHA256:unknown"
- Step 4: 运行测试验证通过
cd backend && pytest tests/test_services/test_ssh_key_service.py -v
预期: PASS
- Step 5: 提交
git add backend/app/services/ssh_key_service.py backend/tests/test_services/test_ssh_key_service.py
git commit -m "feat: add SSH key service"
Task 4.2: Server 服务
文件:
-
创建:
backend/app/services/server_service.py -
创建:
backend/tests/test_services/test_server_service.py -
Step 1: 编写 Server 服务测试
backend/tests/test_services/test_server_service.py:
import base64
import time
import pytest
def test_create_server(db_session, test_encrypt_key):
"""测试创建服务器."""
from app.services.server_service import ServerService
from app.services.ssh_key_service import SshKeyService
# 先创建 SSH key
ssh_svc = SshKeyService(db_session, test_encrypt_key)
ssh_key = ssh_svc.create_ssh_key('test-key', 'ssh-rsa AAAA...')
# 创建服务器
svc = ServerService(db_session, test_encrypt_key)
server = svc.create_server(
name='test-gitea',
url='https://gitea.example.com',
api_token='gitea-token',
ssh_key_id=ssh_key.id
)
assert server.id is not None
assert server.name == 'test-gitea'
assert server.status == 'untested'
def test_list_servers(db_session, test_encrypt_key):
"""测试列出服务器."""
from app.services.server_service import ServerService
from app.services.ssh_key_service import SshKeyService
ssh_svc = SshKeyService(db_session, test_encrypt_key)
ssh_key = ssh_svc.create_ssh_key('key', 'ssh-rsa AAAA...')
svc = ServerService(db_session, test_encrypt_key)
svc.create_server('s1', 'https://g1.com', 't1', ssh_key.id)
svc.create_server('s2', 'https://g2.com', 't2', ssh_key.id)
servers = svc.list_servers()
assert len(servers) == 2
def test_delete_server(db_session, test_encrypt_key):
"""测试删除服务器."""
from app.services.server_service import ServerService
from app.services.ssh_key_service import SshKeyService
from app.models.server import Server
ssh_svc = SshKeyService(db_session, test_encrypt_key)
ssh_key = ssh_svc.create_ssh_key('key', 'ssh-rsa AAAA...')
svc = ServerService(db_session, test_encrypt_key)
server = svc.create_server('test', 'https://g.com', 't', ssh_key.id)
server_id = server.id
svc.delete_server(server_id)
deleted = db_session.query(Server).filter(Server.id == server_id).first()
assert deleted is None
- Step 2: 运行测试验证失败
cd backend && pytest tests/test_services/test_server_service.py -v
预期: FAIL
- Step 3: 实现 Server 服务
backend/app/services/server_service.py:
import time
from typing import List, Optional
from sqlalchemy.orm import Session
from pathlib import Path
from app.models.server import Server
from app.models.repo import Repo
from app.security import encrypt_data, decrypt_data
class ServerService:
"""服务器操作服务."""
def __init__(self, db: Session, encryption_key: str, repos_dir: Path):
self.db = db
self.encryption_key = encryption_key
self.repos_dir = repos_dir
def create_server(
self,
name: str,
url: str,
api_token: str,
ssh_key_id: int,
sync_enabled: bool = False,
schedule_cron: str = '0 */2 * * *'
) -> Server:
"""创建新服务器."""
# 检查名称
existing = self.db.query(Server).filter(Server.name == name).first()
if existing:
raise ValueError(f"Server '{name}' already exists")
# 加密 API token
encrypted_token = encrypt_data(api_token.encode(), self.encryption_key)
# 生成本地路径
local_path = str(self.repos_dir / name)
now = int(time.time())
server = Server(
name=name,
url=url,
api_token=encrypted_token,
ssh_key_id=ssh_key_id,
sync_enabled=sync_enabled,
schedule_cron=schedule_cron,
local_path=local_path,
status='untested',
created_at=now,
updated_at=now
)
self.db.add(server)
self.db.commit()
self.db.refresh(server)
return server
def list_servers(self) -> List[Server]:
"""列出所有服务器."""
return self.db.query(Server).all()
def get_server(self, server_id: int) -> Optional[Server]:
"""获取服务器."""
return self.db.query(Server).filter(Server.id == server_id).first()
def update_server(self, server_id: int, **kwargs) -> Optional[Server]:
"""更新服务器."""
server = self.get_server(server_id)
if not server:
return None
for key, value in kwargs.items():
if value is not None and hasattr(server, key):
if key == 'api_token':
value = encrypt_data(value.encode(), self.encryption_key)
setattr(server, key, value)
server.updated_at = int(time.time())
self.db.commit()
self.db.refresh(server)
return server
def delete_server(self, server_id: int) -> bool:
"""删除服务器."""
server = self.get_server(server_id)
if not server:
return False
self.db.delete(server)
self.db.commit()
return True
def get_decrypted_token(self, server: Server) -> str:
"""获取解密后的 API token."""
decrypted = decrypt_data(server.api_token, self.encryption_key)
return decrypted.decode()
- Step 4: 运行测试验证通过
cd backend && pytest tests/test_services/test_server_service.py -v
预期: PASS
- Step 5: 提交
git add backend/app/services/server_service.py backend/tests/test_services/test_server_service.py
git commit -m "feat: add server service"
Task 4.3: Sync 服务
文件:
-
创建:
backend/app/services/sync_service.py -
创建:
backend/tests/test_services/test_sync_service.py -
Step 1: 编写 Sync 服务测试
backend/tests/test_services/test_sync_service.py:
import time
from pathlib import Path
import pytest
def test_sync_repo_not_implemented_yet(db_session):
"""测试同步仓库 (暂时跳过 Git 操作)."""
from app.services.sync_service import SyncService
from app.services.ssh_key_service import SshKeyService
from app.services.server_service import ServerService
from app.models.repo import Repo
# 准备数据
ssh_svc = SshKeyService(db_session, 'test-key')
ssh_key = ssh_svc.create_ssh_key('key', 'ssh-rsa AAAA...')
srv_svc = ServerService(db_session, 'test-key', Path('/tmp/repos'))
server = srv_svc.create_server('test', 'https://g.com', 't', ssh_key.id)
repo = Repo(
server_id=server.id,
name='test-repo',
full_name='owner/test-repo',
clone_url='git@example.com:owner/repo.git',
local_path='/tmp/test.git',
status='pending',
created_at=int(time.time())
)
db_session.add(repo)
db_session.commit()
# 同步 (由于没有真实 Git 服务器,会失败但验证流程)
sync_svc = SyncService(db_session, 'test-key')
# 这里应该会失败,因为没有真实的 Git 服务器
# 测试主要是验证服务结构正确
assert sync_svc is not None
- Step 2: 运行测试验证失败
cd backend && pytest tests/test_services/test_sync_service.py -v
预期: FAIL
- Step 3: 实现 Sync 服务
backend/app/services/sync_service.py:
import time
import subprocess
from pathlib import Path
from typing import Optional, Dict, Any
from sqlalchemy.orm import Session
import tempfile
from app.models.repo import Repo
from app.models.sync_log import SyncLog
from app.models.server import Server
from app.security import decrypt_data
from app.services.ssh_key_service import SshKeyService
from app.services.server_service import ServerService
class SyncService:
"""仓库同步服务."""
def __init__(self, db: Session, encryption_key: str):
self.db = db
self.encryption_key = encryption_key
self.ssh_key_service = SshKeyService(db, encryption_key)
def sync_repo(self, repo: Repo, ssh_key_content: str) -> SyncLog:
"""
同步单个仓库.
Args:
repo: 要同步的仓库
ssh_key_content: SSH 私钥内容
Returns:
同步日志
"""
started_at = int(time.time())
# 创建同步日志
sync_log = SyncLog(
repo_id=repo.id,
status='synced',
started_at=started_at,
finished_at=None,
commits_count=None,
created_at=started_at
)
try:
# 准备本地目录
local_path = Path(repo.local_path)
local_path.parent.mkdir(parents=True, exist_ok=True)
# 使用 Git 同步
if local_path.exists():
# 已存在,执行 fetch
commits_count = self._fetch_repo(local_path, ssh_key_content)
else:
# 不存在,执行 clone
commits_count = self._clone_repo(repo.clone_url, local_path, ssh_key_content)
# 更新成功状态
sync_log.status = 'synced'
sync_log.commits_count = commits_count
sync_log.finished_at = int(time.time())
# 更新仓库状态
repo.status = 'synced'
repo.last_sync_at = sync_log.finished_at
except Exception as e:
# 更新失败状态
sync_log.status = 'failed'
sync_log.error_msg = str(e)
sync_log.finished_at = int(time.time())
repo.status = 'failed'
self.db.add(sync_log)
self.db.commit()
return sync_log
def _clone_repo(self, clone_url: str, local_path: Path, ssh_key: str) -> int:
"""克隆仓库 (mirror 模式)."""
# 使用临时 SSH key
with tempfile.NamedTemporaryFile(mode='w', suffix='.key', delete=False) as f:
f.write(ssh_key)
key_path = f.name
try:
# GIT_SSH_COMMAND 环境变量
env = {
'GIT_SSH_COMMAND': f'ssh -i {key_path} -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'
}
# git clone --mirror
result = subprocess.run(
['git', 'clone', '--mirror', clone_url, str(local_path)],
env=env,
capture_output=True,
text=True,
timeout=300
)
if result.returncode != 0:
raise Exception(f"Git clone failed: {result.stderr}")
# 计算提交数
return self._count_commits(local_path)
finally:
Path(key_path).unlink(missing_ok=True)
def _fetch_repo(self, local_path: Path, ssh_key: str) -> int:
"""获取仓库更新."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.key', delete=False) as f:
f.write(ssh_key)
key_path = f.name
try:
env = {
'GIT_SSH_COMMAND': f'ssh -i {key_path} -o StrictHostKeyChecking=no'
}
result = subprocess.run(
['git', 'fetch', '--all'],
cwd=local_path,
env=env,
capture_output=True,
text=True,
timeout=300
)
if result.returncode != 0:
raise Exception(f"Git fetch failed: {result.stderr}")
return self._count_commits(local_path)
finally:
Path(key_path).unlink(missing_ok=True)
def _count_commits(self, repo_path: Path) -> int:
"""计算仓库提交数."""
result = subprocess.run(
['git', 'rev-list', '--count', '--all'],
cwd=repo_path,
capture_output=True,
text=True
)
if result.returncode == 0:
try:
return int(result.stdout.strip())
except ValueError:
pass
return 0
def get_repo_commits(self, repo: Repo, limit: int = 50) -> list:
"""获取仓库提交历史."""
repo_path = Path(repo.local_path)
if not repo_path.exists():
return []
result = subprocess.run(
['git', 'log', '--all', f'-{limit}', '--format=%H|%s|%an|%ct'],
cwd=repo_path,
capture_output=True,
text=True
)
if result.returncode != 0:
return []
commits = []
for line in result.stdout.strip().split('\n'):
if not line:
continue
parts = line.split('|', 3)
if len(parts) == 4:
commits.append({
'hash': parts[0],
'message': parts[1],
'author': parts[2],
'date': int(parts[3])
})
return commits
- Step 4: 运行测试验证通过
cd backend && pytest tests/test_services/test_sync_service.py -v
预期: PASS (测试结构验证)
- Step 5: 提交
git add backend/app/services/sync_service.py backend/tests/test_services/test_sync_service.py
git commit -m "feat: add sync service with Git operations"
Task 4.4: Repo 服务
文件:
-
创建:
backend/app/services/repo_service.py -
创建:
backend/tests/test_services/test_repo_service.py -
Step 1: 编写 Repo 服务测试
backend/tests/test_services/test_repo_service.py:
import time
from pathlib import Path
import pytest
def test_create_repo(db_session):
"""测试创建仓库记录."""
from app.services.repo_service import RepoService
from app.services.server_service import ServerService
from app.services.ssh_key_service import SshKeyService
# 准备数据
ssh_svc = SshKeyService(db_session, 'key')
ssh_key = ssh_svc.create_ssh_key('k', 'ssh-rsa A')
srv_svc = ServerService(db_session, 'key', Path('/tmp/r'))
server = srv_svc.create_server('s', 'https://g.com', 't', ssh_key.id)
# 创建仓库
repo_svc = RepoService(db_session)
repo = repo_svc.create_repo(
server_id=server.id,
name='test-repo',
full_name='owner/test-repo',
clone_url='git@u:r.git'
)
assert repo.id is not None
assert repo.full_name == 'owner/test-repo'
def test_list_repos_by_server(db_session):
"""测试按服务器列出仓库."""
from app.services.repo_service import RepoService
from app.services.server_service import ServerService
from app.services.ssh_key_service import SshKeyService
ssh_svc = SshKeyService(db_session, 'key')
ssh_key = ssh_svc.create_ssh_key('k', 'ssh-rsa A')
srv_svc = ServerService(db_session, 'key', Path('/tmp/r'))
server = srv_svc.create_server('s', 'https://g.com', 't', ssh_key.id)
repo_svc = RepoService(db_session)
repo_svc.create_repo(server.id, 'r1', 'o/r1', 'git@u:r1.git')
repo_svc.create_repo(server.id, 'r2', 'o/r2', 'git@u:r2.git')
repos = repo_svc.list_repos(server_id=server.id)
assert len(repos) == 2
- Step 2: 运行测试验证失败
cd backend && pytest tests/test_services/test_repo_service.py -v
预期: FAIL
- Step 3: 实现 Repo 服务
backend/app/services/repo_service.py:
import time
from typing import List, Optional
from pathlib import Path
from sqlalchemy.orm import Session
from app.models.repo import Repo
class RepoService:
"""仓库操作服务."""
def __init__(self, db: Session):
self.db = db
def create_repo(
self,
server_id: int,
name: str,
full_name: str,
clone_url: str,
local_path: str | None = None
) -> Repo:
"""创建仓库记录."""
repo = Repo(
server_id=server_id,
name=name,
full_name=full_name,
clone_url=clone_url,
local_path=local_path or f'/data/repos/{full_name}.git',
status='pending',
created_at=int(time.time())
)
self.db.add(repo)
self.db.commit()
self.db.refresh(repo)
return repo
def list_repos(self, server_id: int | None = None) -> List[Repo]:
"""列出仓库."""
query = self.db.query(Repo)
if server_id is not None:
query = query.filter(Repo.server_id == server_id)
return query.all()
def get_repo(self, repo_id: int) -> Optional[Repo]:
"""获取仓库."""
return self.db.query(Repo).filter(Repo.id == repo_id).first()
def update_repo_status(self, repo_id: int, status: str) -> Optional[Repo]:
"""更新仓库状态."""
repo = self.get_repo(repo_id)
if not repo:
return None
repo.status = status
if status == 'synced':
repo.last_sync_at = int(time.time())
self.db.commit()
self.db.refresh(repo)
return repo
def delete_repo(self, repo_id: int) -> bool:
"""删除仓库."""
repo = self.get_repo(repo_id)
if not repo:
return False
self.db.delete(repo)
self.db.commit()
return True
- Step 4: 运行测试验证通过
cd backend && pytest tests/test_services/test_repo_service.py -v
预期: PASS
- Step 5: 提交
git add backend/app/services/repo_service.py backend/tests/test_services/test_repo_service.py
git commit -m "feat: add repo service"
Phase 5: API 路由
Task 5.1: API 依赖注入
文件:
-
创建:
backend/app/api/deps.py -
创建:
backend/tests/test_api/test_deps.py -
Step 1: 编写依赖测试
backend/tests/test_api/test_deps.py:
import pytest
from fastapi import HTTPException
def test_require_auth_success(test_env_vars):
"""测试认证成功."""
from app.api.deps import require_auth
# 应该不抛出异常
import asyncio
asyncio.run(require_auth("Bearer test-token"))
def test_require_auth_failure():
"""测试认证失败."""
from app.api.deps import require_auth
import asyncio
with pytest.raises(HTTPException) as exc:
asyncio.run(require_auth("Bearer wrong-token"))
assert exc.value.status_code == 401
- Step 2: 运行测试验证失败
cd backend && pytest tests/test_api/test_deps.py -v
预期: FAIL
- Step 3: 实现依赖注入
backend/app/api/deps.py:
from typing import Annotated
from fastapi import Depends, HTTPException, Header, status
from sqlalchemy.orm import Session
from app.database import _session_factory
from app.security import verify_api_token
def get_db_session() -> Session:
"""获取数据库会话."""
if _session_factory is None:
raise RuntimeError("Database not initialized")
session = _session_factory()
try:
yield session
finally:
session.close()
# 类型别名
DBSession = Annotated[Session, Depends(get_db_session)]
async def require_auth(
authorization: Annotated[str, Header()] = None
) -> None:
"""要求认证."""
if not verify_api_token(authorization):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication token"
)
- Step 4: 运行测试验证通过
cd backend && pytest tests/test_api/test_deps.py -v
预期: PASS
- Step 5: 提交
git add backend/app/api/deps.py backend/tests/test_api/test_deps.py
git commit -m "feat: add API dependencies (auth, DB session)"
Task 5.2: SSH Keys API
文件:
-
创建:
backend/app/api/ssh_keys.py -
创建:
backend/tests/test_api/test_ssh_keys.py -
Step 1: 编写 SSH Keys API 测试
backend/tests/test_api/test_ssh_keys.py:
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def client_with_auth(test_env_vars):
"""创建带认证的测试客户端."""
from app.main import app
return TestClient(app)
def test_list_ssh_keys(client_with_auth, db_session, test_encrypt_key):
"""测试列出 SSH keys."""
from app.services.ssh_key_service import SshKeyService
# 创建测试数据
svc = SshKeyService(db_session, test_encrypt_key)
svc.create_ssh_key('key1', 'ssh-rsa A')
svc.create_ssh_key('key2', 'ssh-rsa B')
# API 调用
response = client_with_auth.get(
"/api/v1/ssh-keys",
headers={"Authorization": "Bearer test-token"}
)
assert response.status_code == 200
data = response.json()
assert data['code'] == 0
assert len(data['data']) == 2
def test_create_ssh_key(client_with_auth, db_session):
"""测试创建 SSH key."""
response = client_with_auth.post(
"/api/v1/ssh-keys",
json={
"name": "test-key",
"private_key": "ssh-rsa AAAAB3..."
},
headers={"Authorization": "Bearer test-token"}
)
assert response.status_code == 200
data = response.json()
assert data['code'] == 0
assert data['data']['name'] == 'test-key'
def test_delete_ssh_key(client_with_auth, db_session, test_encrypt_key):
"""测试删除 SSH key."""
from app.services.ssh_key_service import SshKeyService
# 创建测试数据
svc = SshKeyService(db_session, test_encrypt_key)
ssh_key = svc.create_ssh_key('test', 'ssh-rsa A')
# API 调用
response = client_with_auth.delete(
f"/api/v1/ssh-keys/{ssh_key.id}",
headers={"Authorization": "Bearer test-token"}
)
assert response.status_code == 200
- Step 2: 运行测试验证失败
cd backend && pytest tests/test_api/test_ssh_keys.py -v
预期: FAIL
- Step 3: 实现 SSH Keys API
backend/app/api/ssh_keys.py:
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.api.deps import DBSession, require_auth
from app.services.ssh_key_service import SshKeyService
from app.schemas.ssh_key import SshKeyCreate, SshKeyResponse
from app.schemas.common import SuccessResponse, ErrorResponse
from app.config import settings
router = APIRouter(prefix='/api/v1/ssh-keys', tags=['ssh-keys'])
def get_ssh_service(db: DBSession) -> SshKeyService:
"""获取 SSH key 服务实例."""
return SshKeyService(db, settings.encrypt_key)
SshService = Annotated[SshKeyService, Depends(get_ssh_service)]
@router.get('', response_model=SuccessResponse[list[SshKeyResponse]])
async def list_ssh_keys(
service: SshService,
_: Annotated[None, Depends(require_auth)]
):
"""列出所有 SSH keys."""
keys = service.list_ssh_keys()
return SuccessResponse(
data=[
SshKeyResponse(
id=k.id,
name=k.name,
fingerprint=k.fingerprint,
servers_count=len(k.servers),
created_at=k.created_at
)
for k in keys
]
)
@router.post('', response_model=SuccessResponse[SshKeyResponse])
async def create_ssh_key(
data: SshKeyCreate,
service: SshService,
_: Annotated[None, Depends(require_auth)]
):
"""创建新的 SSH key."""
try:
ssh_key = service.create_ssh_key(
name=data.name,
private_key=data.private_key,
password=data.password
)
return SuccessResponse(
data=SshKeyResponse(
id=ssh_key.id,
name=ssh_key.name,
fingerprint=ssh_key.fingerprint,
servers_count=0,
created_at=ssh_key.created_at
)
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.delete('/{key_id}', response_model=SuccessResponse[dict])
async def delete_ssh_key(
key_id: int,
service: SshService,
_: Annotated[None, Depends(require_auth)]
):
"""删除 SSH key."""
try:
service.delete_ssh_key(key_id)
return SuccessResponse(data={'deleted': True})
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
- Step 4: 运行测试验证通过
cd backend && pytest tests/test_api/test_ssh_keys.py -v
预期: PASS
- Step 5: 提交
git add backend/app/api/ssh_keys.py backend/tests/test_api/test_ssh_keys.py
git commit -m "feat: add SSH keys API endpoints"
Task 5.3: Servers API
文件:
-
创建:
backend/app/api/servers.py -
创建:
backend/tests/test_api/test_servers.py -
Step 1: 编写 Servers API 测试
backend/tests/test_api/test_servers.py:
import pytest
from fastapi.testclient import TestClient
def test_list_servers(client_with_auth, db_session, test_encrypt_key):
"""测试列出服务器."""
from app.services.server_service import ServerService
from app.services.ssh_key_service import SshKeyService
ssh_svc = SshKeyService(db_session, test_encrypt_key)
ssh_key = ssh_svc.create_ssh_key('key', 'ssh-rsa A')
srv_svc = ServerService(db_session, test_encrypt_key, Path('/tmp/r'))
srv_svc.create_server('s1', 'https://g1.com', 't1', ssh_key.id)
srv_svc.create_server('s2', 'https://g2.com', 't2', ssh_key.id)
from app.main import app
client = TestClient(app)
response = client.get(
"/api/v1/servers",
headers={"Authorization": "Bearer test-token"}
)
assert response.status_code == 200
data = response.json()
assert data['code'] == 0
assert len(data['data']) == 2
def test_create_server(client_with_auth, db_session, test_encrypt_key):
"""测试创建服务器."""
from app.services.ssh_key_service import SshKeyService
ssh_svc = SshKeyService(db_session, test_encrypt_key)
ssh_key = ssh_svc.create_ssh_key('key', 'ssh-rsa A')
response = client_with_auth.post(
"/api/v1/servers",
json={
"name": "test-gitea",
"url": "https://gitea.example.com",
"api_token": "test-token",
"ssh_key_id": ssh_key.id
},
headers={"Authorization": "Bearer test-token"}
)
assert response.status_code == 200
data = response.json()
assert data['data']['name'] == 'test-gitea'
- Step 2: 运行测试验证失败
cd backend && pytest tests/test_api/test_servers.py -v
预期: FAIL
- Step 3: 实现 Servers API
backend/app/api/servers.py:
from typing import Annotated
from pathlib import Path
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app.api.deps import DBSession, require_auth
from app.services.server_service import ServerService
from app.services.ssh_key_service import SshKeyService
from app.schemas.server import ServerCreate, ServerUpdate, ServerResponse
from app.schemas.common import SuccessResponse
from app.config import settings
router = APIRouter(prefix='/api/v1/servers', tags=['servers'])
def get_server_service(db: DBSession) -> ServerService:
"""获取服务器服务实例."""
return ServerService(db, settings.encrypt_key, settings.repos_dir)
ServerSvc = Annotated[ServerService, Depends(get_server_service)]
@router.get('', response_model=SuccessResponse[list[ServerResponse]])
async def list_servers(
service: ServerSvc,
_: Annotated[None, Depends(require_auth)]
):
"""列出所有服务器."""
servers = service.list_servers()
return SuccessResponse(
data=[
ServerResponse(
id=s.id,
name=s.name,
url=s.url,
sync_enabled=s.sync_enabled,
schedule_cron=s.schedule_cron,
local_path=s.local_path,
status=s.status,
repos_count=len(s.repos),
ssh_key_name=s.ssh_key.name,
created_at=s.created_at,
updated_at=s.updated_at
)
for s in servers
]
)
@router.post('', response_model=SuccessResponse[ServerResponse])
async def create_server(
data: ServerCreate,
service: ServerSvc,
_: Annotated[None, Depends(require_auth)]
):
"""创建新服务器."""
try:
server = service.create_server(
name=data.name,
url=data.url,
api_token=data.api_token,
ssh_key_id=data.ssh_key_id,
sync_enabled=data.sync_enabled,
schedule_cron=data.schedule_cron
)
return SuccessResponse(
data=ServerResponse(
id=server.id,
name=server.name,
url=server.url,
sync_enabled=server.sync_enabled,
schedule_cron=server.schedule_cron,
local_path=server.local_path,
status=server.status,
repos_count=0,
ssh_key_name=server.ssh_key.name,
created_at=server.created_at,
updated_at=server.updated_at
)
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get('/{server_id}', response_model=SuccessResponse[ServerResponse])
async def get_server(
server_id: int,
service: ServerSvc,
_: Annotated[None, Depends(require_auth)]
):
"""获取服务器详情."""
server = service.get_server(server_id)
if not server:
raise HTTPException(status_code=404, detail="Server not found")
return SuccessResponse(
data=ServerResponse(
id=server.id,
name=server.name,
url=server.url,
sync_enabled=server.sync_enabled,
schedule_cron=server.schedule_cron,
local_path=server.local_path,
status=server.status,
repos_count=len(server.repos),
ssh_key_name=server.ssh_key.name,
created_at=server.created_at,
updated_at=server.updated_at
)
)
@router.put('/{server_id}', response_model=SuccessResponse[ServerResponse])
async def update_server(
server_id: int,
data: ServerUpdate,
service: ServerSvc,
_: Annotated[None, Depends(require_auth)]
):
"""更新服务器."""
server = service.update_server(
server_id,
name=data.name,
url=data.url,
api_token=data.api_token,
ssh_key_id=data.ssh_key_id,
sync_enabled=data.sync_enabled,
schedule_cron=data.schedule_cron
)
if not server:
raise HTTPException(status_code=404, detail="Server not found")
return SuccessResponse(
data=ServerResponse(
id=server.id,
name=server.name,
url=server.url,
sync_enabled=server.sync_enabled,
schedule_cron=server.schedule_cron,
local_path=server.local_path,
status=server.status,
repos_count=len(server.repos),
ssh_key_name=server.ssh_key.name,
created_at=server.created_at,
updated_at=server.updated_at
)
)
@router.delete('/{server_id}', response_model=SuccessResponse[dict])
async def delete_server(
server_id: int,
service: ServerSvc,
_: Annotated[None, Depends(require_auth)]
):
"""删除服务器."""
if not service.delete_server(server_id):
raise HTTPException(status_code=404, detail="Server not found")
return SuccessResponse(data={'deleted': True})
- Step 4: 运行测试验证通过
cd backend && pytest tests/test_api/test_servers.py -v
预期: PASS
- Step 5: 提交
git add backend/app/api/servers.py backend/tests/test_api/test_servers.py
git commit -m "feat: add servers API endpoints"
Task 5.4: Status API
文件:
-
创建:
backend/app/api/status.py -
创建:
backend/tests/test_api/test_status.py -
Step 1: 编写 Status API 测试
backend/tests/test_api/test_status.py:
import pytest
from fastapi.testclient import TestClient
def test_get_status(client_with_auth, db_session):
"""测试获取系统状态."""
response = client_with_auth.get(
"/api/v1/status",
headers={"Authorization": "Bearer test-token"}
)
assert response.status_code == 200
data = response.json()
assert data['code'] == 0
assert 'disk_usage' in data['data']
assert 'servers' in data['data']
assert 'repos' in data['data']
- Step 2: 运行测试验证失败
cd backend && pytest tests/test_api/test_status.py -v
预期: FAIL
- Step 3: 实现 Status API
backend/app/api/status.py:
import shutil
from typing import Annotated
from fastapi import APIRouter, Depends
from app.api.deps import DBSession, require_auth
from app.models import Server, Repo
from app.schemas.common import SuccessResponse
from app.config import settings
router = APIRouter(tags=['status'])
@router.get('/api/v1/status', response_model=SuccessResponse[dict])
async def get_status(
db: DBSession,
_: Annotated[None, Depends(require_auth)]
):
"""获取系统状态."""
# 磁盘使用
total, used, free = shutil.disk_usage(settings.data_dir)
# 统计数据
server_count = db.query(Server).count()
repo_count = db.query(Repo).count()
synced = db.query(Repo).filter(Repo.status == 'synced').count()
failed = db.query(Repo).filter(Repo.status == 'failed').count()
syncing = db.query(Repo).filter(Repo.status == 'syncing').count()
return SuccessResponse(
data={
'disk_usage': {
'total': total,
'used': used,
'free': free,
'percent': round(used / total * 100, 2) if total > 0 else 0
},
'servers': {
'total': server_count,
'connected': db.query(Server).filter(Server.status == 'connected').count(),
'disconnected': db.query(Server).filter(Server.status == 'disconnected').count(),
},
'repos': {
'total': repo_count,
'synced': synced,
'failed': failed,
'syncing': syncing,
'pending': repo_count - synced - failed - syncing
}
}
)
- Step 4: 运行测试验证通过
cd backend && pytest tests/test_api/test_status.py -v
预期: PASS
- Step 5: 提交
git add backend/app/api/status.py backend/tests/test_api/test_status.py
git commit -m "feat: add status API endpoint"
Phase 6: FastAPI 主应用
Task 6.1: 创建 FastAPI 主应用
文件:
-
创建:
backend/app/main.py -
修改:
backend/tests/conftest.py(添加 client fixture) -
Step 1: 编写主应用测试
backend/tests/test_main.py:
import pytest
from fastapi.testclient import TestClient
def test_app_exists():
"""测试应用创建."""
from app.main import app
assert app is not None
assert app.title == "Git Repo Manager"
def test_root_redirect():
"""测试根路径重定向到前端."""
from app.main import app
client = TestClient(app)
response = client.get("/")
# 应该返回前端页面或重定向
assert response.status_code in [200, 404] # 静态文件可能不存在
def test_api_docs():
"""测试 API 文档可访问."""
from app.main import app
client = TestClient(app)
response = client.get("/docs")
assert response.status_code == 200
- Step 2: 运行测试验证失败
cd backend && pytest tests/test_main.py -v
预期: FAIL
- Step 3: 实现主应用
backend/app/main.py:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from pathlib import Path
from app.config import settings
from app.database import init_db
from app.api import status
from app.api import ssh_keys, servers
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理."""
# 启动时初始化数据库
settings.data_dir.mkdir(parents=True, exist_ok=True)
init_db(settings.db_path)
# 创建表
from app.database import get_engine
from app.models import Base
Base.metadata.create_all(get_engine())
yield
# 关闭时的清理
# 创建 FastAPI 应用
app = FastAPI(
title="Git Repo Manager",
description="Git 仓库同步管理工具",
version="1.0.0",
lifespan=lifespan
)
# 注册路由
app.include_router(status.router)
app.include_router(ssh_keys.router)
app.include_router(servers.router)
# 静态文件服务 (前端构建产物)
static_dir = Path(__file__).parent.parent.parent / 'frontend' / 'dist'
if static_dir.exists():
app.mount("/", StaticFiles(directory=str(static_dir), html=True), name="static")
@app.get("/health")
async def health_check():
"""健康检查."""
return {"status": "ok"}
- Step 4: 更新 conftest.py 添加 client fixture
backend/tests/conftest.py 中添加:
@pytest.fixture(scope="function")
def client_with_auth(test_env_vars):
"""带认证的测试客户端."""
from app.main import app
from fastapi.testclient import TestClient
def client_override():
return TestClient(app)
return client_override()
- Step 5: 运行测试验证通过
cd backend && pytest tests/test_main.py -v
预期: PASS
- Step 6: 提交
git add backend/app/main.py backend/tests/test_main.py backend/tests/conftest.py
git commit -m "feat: add FastAPI main application"
Phase 7: 前端 - 基础设置
Task 7.1: 前端项目配置和入口
文件:
-
创建:
frontend/index.html -
创建:
frontend/vite.config.js -
创建:
frontend/src/main.js -
创建:
frontend/src/App.vue -
Step 1: 创建 index.html
frontend/index.html:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Git Repo Manager</title>
</head>
<body>
<div id="app"></div>
<script type="module" src="/src/main.js"></script>
</body>
</html>
- Step 2: 创建 vite.config.js
frontend/vite.config.js:
import { defineConfig } from 'vite'
import vue from '@vitejs/plugin-vue'
export default defineConfig({
plugins: [vue()],
server: {
proxy: {
'/api': {
target: 'http://localhost:8000',
changeOrigin: true
}
}
},
build: {
outDir: 'dist',
emptyOutDir: true
}
})
- Step 3: 创建 main.js
frontend/src/main.js:
import { createApp } from 'vue'
import { createPinia } from 'pinia'
import ElementPlus from 'element-plus'
import 'element-plus/dist/index.css'
import zhCn from 'element-plus/es/locale/lang/zh-cn'
import App from './App.vue'
import router from './router'
const app = createApp(App)
app.use(createPinia())
app.use(router)
app.use(ElementPlus, { locale: zhCn })
app.mount('#app')
- Step 4: 创建 App.vue
frontend/src/App.vue:
<template>
<el-container class="app-container">
<el-header class="app-header">
<div class="header-left">
<h1 class="app-title">Git Manager</h1>
</div>
<div class="header-right">
<el-tag :type="statusTagType">{{ statusText }}</el-tag>
</div>
</el-header>
<el-container class="main-container">
<el-aside width="200px" class="sidebar">
<el-menu
:default-active="activeMenu"
router
class="sidebar-menu"
>
<el-menu-item index="/dashboard">
<el-icon><Odometer /></el-icon>
<span>仪表盘</span>
</el-menu-item>
<el-menu-item index="/servers">
<el-icon><Server /></el-icon>
<span>服务器</span>
</el-menu-item>
<el-menu-item index="/repos">
<el-icon><FolderOpened /></el-icon>
<span>仓库</span>
</el-menu-item>
<el-menu-item index="/sync-logs">
<el-icon><Document /></el-icon>
<span>同步记录</span>
</el-menu-item>
<el-menu-item index="/ssh-keys">
<el-icon><Key /></el-icon>
<span>SSH 密钥</span>
</el-menu-item>
<el-menu-item index="/settings">
<el-icon><Setting /></el-icon>
<span>系统设置</span>
</el-menu-item>
</el-menu>
</el-aside>
<el-main class="main-content">
<router-view />
</el-main>
</el-container>
</el-container>
</template>
<script setup>
import { computed, onMounted, ref } from 'vue'
import { useRoute } from 'vue-router'
import {
Odometer, Server, FolderOpened, Document, Key, Setting
} from '@element-plus/icons-vue'
import { useAppStore } from './stores/app'
const route = useRoute()
const appStore = useAppStore()
const activeMenu = computed(() => route.path)
const statusTagType = computed(() => appStore.connected ? 'success' : 'danger')
const statusText = computed(() => appStore.connected ? '已连接' : '未连接')
onMounted(() => {
appStore.checkStatus()
})
</script>
<style scoped>
.app-container {
height: 100vh;
}
.app-header {
display: flex;
justify-content: space-between;
align-items: center;
border-bottom: 1px solid var(--el-border-color);
padding: 0 20px;
}
.header-left .app-title {
margin: 0;
font-size: 20px;
font-weight: 600;
}
.main-container {
height: calc(100vh - 60px);
}
.sidebar {
border-right: 1px solid var(--el-border-color);
}
.sidebar-menu {
border-right: none;
height: 100%;
}
.main-content {
padding: 20px;
overflow-y: auto;
}
</style>
- Step 5: 提交
git add frontend/
git commit -m "feat: add frontend base setup (index, vite, main, App)"
Task 7.2: 路由配置
文件:
-
创建:
frontend/src/router/index.js -
Step 1: 创建路由配置
frontend/src/router/index.js:
import { createRouter, createWebHistory } from 'vue-router'
const routes = [
{
path: '/',
redirect: '/dashboard'
},
{
path: '/dashboard',
name: 'Dashboard',
component: () => import('../views/Dashboard.vue')
},
{
path: '/servers',
name: 'Servers',
component: () => import('../views/Servers.vue')
},
{
path: '/repos',
name: 'Repos',
component: () => import('../views/Repos.vue')
},
{
path: '/sync-logs',
name: 'SyncLogs',
component: () => import('../views/SyncLogs.vue')
},
{
path: '/ssh-keys',
name: 'SshKeys',
component: () => import('../views/SshKeys.vue')
},
{
path: '/settings',
name: 'Settings',
component: () => import('../views/Settings.vue')
}
]
const router = createRouter({
history: createWebHistory(),
routes
})
export default router
- Step 2: 提交
git add frontend/src/router/
git commit -m "feat: add frontend router configuration"
Task 7.3: API 客户端
文件:
-
创建:
frontend/src/api/index.js -
创建:
frontend/src/api/servers.js -
创建:
frontend/src/api/sshKeys.js -
创建:
frontend/src/api/syncLogs.js -
Step 1: 创建 Axios 实例
frontend/src/api/index.js:
import axios from 'axios'
import { ElMessage } from 'element-plus'
// 从 localStorage 读取 token
const getAuthToken = () => {
return localStorage.getItem('api_token') || 'test-token'
}
// 创建 Axios 实例
const api = axios.create({
baseURL: '/api/v1',
timeout: 30000,
headers: {
'Content-Type': 'application/json'
}
})
// 请求拦截器
api.interceptors.request.use(
(config) => {
const token = getAuthToken()
if (token) {
config.headers.Authorization = `Bearer ${token}`
}
return config
},
(error) => {
return Promise.reject(error)
}
)
// 响应拦截器
api.interceptors.response.use(
(response) => {
const data = response.data
// 标准响应格式检查
if (data.code === 0) {
return data
} else {
ElMessage.error(data.message || '请求失败')
return Promise.reject(new Error(data.message))
}
},
(error) => {
const message = error.response?.data?.detail || error.message || '网络错误'
ElMessage.error(message)
return Promise.reject(error)
}
)
export default api
- Step 2: 创建 Servers API
frontend/src/api/servers.js:
import api from './index'
export const serversApi = {
// 列出所有服务器
list: () => api.get('/servers').then(r => r.data),
// 获取服务器详情
get: (id) => api.get(`/servers/${id}`).then(r => r.data),
// 创建服务器
create: (data) => api.post('/servers', data).then(r => r.data),
// 更新服务器
update: (id, data) => api.put(`/servers/${id}`, data).then(r => r.data),
// 删除服务器
delete: (id) => api.delete(`/servers/${id}`).then(r => r.data),
// 测试连接
test: (id) => api.post(`/servers/${id}/test`).then(r => r.data),
// 触发同步
sync: (id) => api.post(`/servers/${id}/sync`).then(r => r.data)
}
- Step 3: 创建 SSH Keys API
frontend/src/api/sshKeys.js:
import api from './index'
export const sshKeysApi = {
// 列出所有 SSH keys
list: () => api.get('/ssh-keys').then(r => r.data),
// 创建 SSH key
create: (data) => api.post('/ssh-keys', data).then(r => r.data),
// 删除 SSH key
delete: (id) => api.delete(`/ssh-keys/${id}`).then(r => r.data),
// 测试 key
test: (id) => api.post(`/ssh-keys/${id}/test`).then(r => r.data)
}
- Step 4: 创建 Sync Logs API
frontend/src/api/syncLogs.js:
import api from './index'
export const syncLogsApi = {
// 列出同步日志
list: (params) => api.get('/sync-logs', { params }).then(r => r.data),
// 获取日志详情
get: (id) => api.get(`/sync-logs/${id}`).then(r => r.data)
}
- Step 5: 提交
git add frontend/src/api/
git commit -m "feat: add frontend API client with interceptors"
Task 7.4: Pinia 状态管理
文件:
-
创建:
frontend/src/stores/app.js -
创建:
frontend/src/stores/servers.js -
Step 1: 创建 App Store
frontend/src/stores/app.js:
import { defineStore } from 'pinia'
import { ref } from 'vue'
import api from '../api'
export const useAppStore = defineStore('app', () => {
const connected = ref(false)
const loading = ref(false)
const checkStatus = async () => {
try {
loading.value = true
// 这里可以调用 status API 检查连接
connected.value = true
} catch (error) {
connected.value = false
} finally {
loading.value = false
}
}
return {
connected,
loading,
checkStatus
}
})
- Step 2: 创建 Servers Store
frontend/src/stores/servers.js:
import { defineStore } from 'pinia'
import { ref } from 'vue'
import { serversApi } from '../api/servers'
export const useServersStore = defineStore('servers', () => {
const servers = ref([])
const loading = ref(false)
const fetchServers = async () => {
try {
loading.value = true
servers.value = await serversApi.list()
} catch (error) {
console.error('Failed to fetch servers:', error)
} finally {
loading.value = false
}
}
const createServer = async (data) => {
try {
const server = await serversApi.create(data)
servers.value.push(server)
return server
} catch (error) {
throw error
}
}
const updateServer = async (id, data) => {
try {
const server = await serversApi.update(id, data)
const index = servers.value.findIndex(s => s.id === id)
if (index !== -1) {
servers.value[index] = server
}
return server
} catch (error) {
throw error
}
}
const deleteServer = async (id) => {
try {
await serversApi.delete(id)
servers.value = servers.value.filter(s => s.id !== id)
} catch (error) {
throw error
}
}
return {
servers,
loading,
fetchServers,
createServer,
updateServer,
deleteServer
}
})
- Step 3: 提交
git add frontend/src/stores/
git commit -m "feat: add Pinia stores (app, servers)"
Task 7.5: 页面组件 - 占位页面
文件:
-
创建:
frontend/src/views/Dashboard.vue -
创建:
frontend/src/views/Servers.vue -
创建:
frontend/src/views/Repos.vue -
创建:
frontend/src/views/SyncLogs.vue -
创建:
frontend/src/views/SshKeys.vue -
创建:
frontend/src/views/Settings.vue -
Step 1: 创建 Dashboard 页面
frontend/src/views/Dashboard.vue:
<template>
<div class="dashboard">
<el-row :gutter="20">
<el-col :span="6">
<el-card>
<el-statistic title="服务器" :value="stats.servers" />
</el-card>
</el-col>
<el-col :span="6">
<el-card>
<el-statistic title="仓库" :value="stats.repos" />
</el-card>
</el-col>
<el-col :span="6">
<el-card>
<el-statistic title="已同步" :value="stats.synced" />
</el-card>
</el-col>
<el-col :span="6">
<el-card>
<el-statistic title="失败" :value="stats.failed" />
</el-card>
</el-col>
</el-row>
<el-card class="mt-20" header="最近同步">
<el-empty description="暂无数据" />
</el-card>
</div>
</template>
<script setup>
import { ref, onMounted } from 'vue'
import api from '../api'
const stats = ref({
servers: 0,
repos: 0,
synced: 0,
failed: 0
})
const fetchStats = async () => {
try {
const data = await api.get('/status').then(r => r.data)
stats.value = {
servers: data.servers.total,
repos: data.repos.total,
synced: data.repos.synced,
failed: data.repos.failed
}
} catch (error) {
console.error('Failed to fetch stats:', error)
}
}
onMounted(() => {
fetchStats()
})
</script>
<style scoped>
.mt-20 {
margin-top: 20px;
}
</style>
- Step 2: 创建其他页面 (占位)
frontend/src/views/Servers.vue:
<template>
<div class="servers">
<el-card header="服务器管理">
<el-empty description="服务器管理页面 - 开发中" />
</el-card>
</div>
</template>
frontend/src/views/Repos.vue:
<template>
<div class="repos">
<el-card header="仓库列表">
<el-empty description="仓库列表页面 - 开发中" />
</el-card>
</div>
</template>
frontend/src/views/SyncLogs.vue:
<template>
<div class="sync-logs">
<el-card header="同步记录">
<el-empty description="同步记录页面 - 开发中" />
</el-card>
</div>
</template>
frontend/src/views/SshKeys.vue:
<template>
<div class="ssh-keys">
<el-card header="SSH 密钥">
<el-empty description="SSH 密钥页面 - 开发中" />
</el-card>
</div>
</template>
frontend/src/views/Settings.vue:
<template>
<div class="settings">
<el-card header="系统设置">
<el-empty description="系统设置页面 - 开发中" />
</el-card>
</div>
</template>
- Step 3: 提交
git add frontend/src/views/
git commit -m "feat: add frontend page placeholders"
Phase 8: 构建和部署配置
Task 8.1: README 和启动脚本
文件:
-
创建:
README.md -
创建:
start.sh -
Step 1: 创建 README.md
README.md:
# Git Repo Manager
面向小团队的 Git 仓库同步管理工具。
## 功能特性
- 多 Gitea 服务器管理
- SSH 密钥管理
- 仓库自动同步
- 定时任务调度
- Web 管理界面
## 快速开始
### 1. 配置环境变量
```bash
cp .env.example .env
# 编辑 .env 文件,设置加密密钥和 API Token
2. 安装依赖
后端:
pip install -r backend/requirements.txt
前端:
cd frontend
npm install
3. 初始化数据库
python backend/init_db.py
4. 构建前端
cd frontend
npm run build
5. 启动服务
# 开发环境
bash start.sh
# 或手动启动
uvicorn backend.app.main:app --host 0.0.0.0 --port 8000 --reload
API 文档
启动服务后访问: http://localhost:8000/docs
开发
后端开发:
cd backend
pytest tests/
前端开发:
cd frontend
npm run dev
技术栈
- 后端: FastAPI, SQLAlchemy, APScheduler, Paramiko, GitPython
- 前端: Vue 3, Element Plus, Pinia, Axios
- 数据库: SQLite
- [ ] **Step 2: 创建启动脚本**
`start.sh`:
```bash
#!/bin/bash
# 检查 .env 文件
if [ ! -f .env ]; then
echo "错误: .env 文件不存在"
echo "请先复制 .env.example 到 .env 并配置"
exit 1
fi
# 检查数据库
if [ ! -f data/git_manager.db ]; then
echo "初始化数据库..."
python backend/init_db.py
fi
# 启动服务
echo "启动 Git Repo Manager..."
uvicorn backend.app.main:app --host 0.0.0.0 --port 8000 --reload
chmod +x start.sh
- Step 3: 提交
git add README.md start.sh
git commit -m "feat: add README and startup script"
计划总结
已完成的任务模块
| Phase | 内容 | 任务数 |
|---|---|---|
| 1 | 项目基础设施 | 4 任务 |
| 2 | 数据模型 | 1 任务 (全部4个模型) |
| 3 | Pydantic Schemas | 1 任务 (全部 schemas) |
| 4 | 服务层 | 4 任务 (SSH Key, Server, Sync, Repo) |
| 5 | API 路由 | 4 任务 (依赖注入, SSH Keys, Servers, Status) |
| 6 | FastAPI 主应用 | 1 任务 |
| 7 | 前端基础 | 5 任务 (配置, 路由, API, Store, 页面) |
| 8 | 构建部署 | 1 任务 |
总计: 21 个主要任务
后续可扩展功能
- 完善前端页面交互
- 添加定时任务调度 (APScheduler)
- 添加 Gitea API 集成 (拉取仓库列表)
- 添加同步日志页面完整实现
- 添加测试连接功能
- Docker 容器化部署
- 添加 Webhook 支持
执行方式选择
计划完整保存在: docs/superpowers/plans/2026-03-30-git-repo-manager.md
执行选项:
- Subagent-Driven (推荐) - 每个任务独立子代理执行,任务间审查
- Inline Execution - 当前会话批量执行,检查点审查
请选择执行方式开始实现。