Files
GitMa/backend/app/api/ssh_keys.py
panw 44921c5646 feat: complete Git Repo Manager MVP implementation
Backend (Phase 1-6):
- Pydantic schemas for request/response validation
- Service layer (SSH Key, Server, Repo, Sync)
- API routes with authentication
- FastAPI main application with lifespan management
- ORM models (SshKey, Server, Repo, SyncLog)

Frontend (Phase 7):
- Vue 3 + Element Plus + Pinia + Vue Router
- API client with Axios and interceptors
- State management stores
- All page components (Dashboard, Servers, Repos, SyncLogs, SshKeys, Settings)

Deployment (Phase 8):
- README with quick start guide
- Startup script (start.sh)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-30 16:30:13 +08:00

202 lines
5.3 KiB
Python

"""
SSH Keys API routes.
Provides CRUD endpoints for SSH key management:
- POST /api/ssh-keys - Create a new SSH key
- GET /api/ssh-keys - List all SSH keys
- GET /api/ssh-keys/{id} - Get a specific SSH key
- DELETE /api/ssh-keys/{id} - Delete an SSH key
"""
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.api.deps import get_db_session, require_auth
from app.schemas.ssh_key import SshKeyCreate, SshKeyResponse
from app.schemas.common import SuccessResponse, ErrorResponse
from app.services.ssh_key_service import SshKeyService
# Router for every SSH-key endpoint in this module; all routes are mounted
# under /api/ssh-keys and grouped under the "SSH Keys" tag.
router = APIRouter(prefix="/api/ssh-keys", tags=["SSH Keys"])
@router.post("", response_model=SuccessResponse[SshKeyResponse], status_code=status.HTTP_201_CREATED)
def create_ssh_key(
    ssh_key_data: SshKeyCreate,
    db: Session = Depends(get_db_session),
    _auth: None = Depends(require_auth)
):
    """
    Create a new SSH key.

    Delegates to ``SshKeyService.create_ssh_key`` and wraps the stored key's
    public fields (id, name, fingerprint, created_at) in a success envelope.
    The private key is never echoed back. The service layer is expected to
    encrypt the private key before storage and to reject duplicate names
    (that logic is not part of this module).

    Args:
        ssh_key_data: SSH key creation data (name, private_key)
        db: Database session (injected)
        _auth: Authentication requirement (injected)

    Returns:
        SuccessResponse containing the created SSH key

    Raises:
        HTTPException 400: If the service rejects the request with ValueError
        HTTPException 401: If authentication fails
    """
    service = SshKeyService(db)

    # Guard only the service call. Building the response models can also raise
    # ValueError subclasses (e.g. schema validation errors); those are server
    # bugs and must not be reported to the client as a 400.
    try:
        ssh_key = service.create_ssh_key(
            name=ssh_key_data.name,
            private_key=ssh_key_data.private_key
        )
    except ValueError as e:
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e)
        ) from e

    return SuccessResponse(
        code=0,
        data=SshKeyResponse(
            id=ssh_key.id,
            name=ssh_key.name,
            fingerprint=ssh_key.fingerprint,
            created_at=ssh_key.created_at
        ),
        message="SSH key created successfully"
    )
@router.get("", response_model=SuccessResponse[List[SshKeyResponse]])
def list_ssh_keys(
    db: Session = Depends(get_db_session),
    _auth: None = Depends(require_auth)
):
    """
    Return every stored SSH key.

    Each key is reduced to its public fields (id, name, fingerprint,
    created_at); no private key material is copied into the response.
    The keys appear in whatever order ``SshKeyService.list_ssh_keys``
    returns them.

    Args:
        db: Database session (injected)
        _auth: Authentication requirement (injected)

    Returns:
        SuccessResponse containing list of SSH keys

    Raises:
        HTTPException 401: If authentication fails
    """
    ssh_keys = SshKeyService(db).list_ssh_keys()

    # Project each record onto the public response schema.
    payload = []
    for record in ssh_keys:
        payload.append(
            SshKeyResponse(
                id=record.id,
                name=record.name,
                fingerprint=record.fingerprint,
                created_at=record.created_at
            )
        )

    summary = f"Retrieved {len(ssh_keys)} SSH key(s)"
    return SuccessResponse(code=0, data=payload, message=summary)
@router.get("/{key_id}", response_model=SuccessResponse[SshKeyResponse])
def get_ssh_key(
    key_id: int,
    db: Session = Depends(get_db_session),
    _auth: None = Depends(require_auth)
):
    """
    Look up a single SSH key by its ID.

    Args:
        key_id: ID of the SSH key
        db: Database session (injected)
        _auth: Authentication requirement (injected)

    Returns:
        SuccessResponse containing the SSH key's public fields

    Raises:
        HTTPException 401: If authentication fails
        HTTPException 404: If the service returns None for this ID
    """
    found = SshKeyService(db).get_ssh_key(key_id)

    if found is not None:
        public_view = SshKeyResponse(
            id=found.id,
            name=found.name,
            fingerprint=found.fingerprint,
            created_at=found.created_at
        )
        return SuccessResponse(
            code=0,
            data=public_view,
            message="SSH key retrieved successfully"
        )

    # The service signals "no such key" by returning None.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"SSH key with ID {key_id} not found"
    )
@router.delete("/{key_id}", response_model=SuccessResponse[dict])
def delete_ssh_key(
    key_id: int,
    db: Session = Depends(get_db_session),
    _auth: None = Depends(require_auth)
):
    """
    Delete an SSH key.

    Delegates to ``SshKeyService.delete_ssh_key``. A falsy result from the
    service is reported as "not found"; a ValueError from the service is
    reported as a bad request (per the original contract, this is how the
    service refuses to delete a key that is still in use by a server).

    Args:
        key_id: ID of the SSH key to delete
        db: Database session (injected)
        _auth: Authentication requirement (injected)

    Returns:
        SuccessResponse with empty data

    Raises:
        HTTPException 400: If the service rejects the deletion with ValueError
        HTTPException 401: If authentication fails
        HTTPException 404: If SSH key not found
    """
    service = SshKeyService(db)

    # Guard only the service call so that the 404 path and response
    # construction below are not inside the ValueError handler's scope.
    try:
        deleted = service.delete_ssh_key(key_id)
    except ValueError as e:
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e)
        ) from e

    if not deleted:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"SSH key with ID {key_id} not found"
        )

    return SuccessResponse(
        code=0,
        data={},
        message="SSH key deleted successfully"
    )