Implement business logic for Gitea server management:
- create_server(): Create servers with encrypted API tokens
- list_servers(): List all servers ordered by creation time
- get_server(): Retrieve server by ID
- update_server(): Update server configuration with token re-encryption
- delete_server(): Delete servers
- get_decrypted_token(): Decrypt API tokens for operations

Features:
- API token encryption using AES-256-GCM
- Automatic local_path generation based on server name
- SSH key validation before server creation
- Name uniqueness enforcement
- Timestamp tracking (created_at, updated_at)
- Repos directory auto-creation

Tests:
- 24 comprehensive test cases covering all scenarios
- Encryption verification tests
- Edge case handling (duplicates, not found, invalid references)
- All tests passing (63/63 total)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
232 lines
6.8 KiB
Python
232 lines
6.8 KiB
Python
"""
|
|
Server Service.
|
|
|
|
Business logic for Gitea server management including:
|
|
- Creating servers with encrypted API tokens
|
|
- Listing and retrieving servers
|
|
- Updating server configurations
|
|
- Deleting servers
|
|
- Decrypting API tokens for operations
|
|
"""
|
|
import time
|
|
import base64
|
|
from typing import List, Optional
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.server import Server
|
|
from app.models.ssh_key import SshKey
|
|
from app.security import encrypt_data, decrypt_data
|
|
from app.config import get_settings
|
|
|
|
|
|
class ServerService:
    """
    Service for managing Gitea servers.

    Handles encryption of API tokens, local path generation,
    and server CRUD operations.
    """

    # Columns that update_server() must never overwrite from caller-supplied
    # keyword arguments: the primary key and the creation timestamp.
    _PROTECTED_FIELDS = frozenset({"id", "created_at"})

    def __init__(self, db: Session):
        """
        Initialize the service with a database session.

        Args:
            db: SQLAlchemy database session
        """
        self.db = db
        self.settings = get_settings()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _validate_name(name: str) -> None:
        """
        Reject server names that are unsafe to use as a directory name.

        The name is joined onto ``settings.repos_dir`` to build the server's
        local path, so an empty name would resolve to the repos root itself
        and names such as ``..`` or ``a/b`` would point outside of (or
        deeper into) the intended per-server directory.

        Args:
            name: Candidate server name

        Raises:
            ValueError: If the name is not a non-blank string, is ``.`` or
                ``..``, or contains a path separator or NUL character
        """
        if not isinstance(name, str) or not name.strip():
            raise ValueError("Server name must be a non-empty string")
        if name in (".", "..") or any(ch in name for ch in ("/", "\\", "\x00")):
            raise ValueError(
                f"Server name {name!r} must not contain path separators"
            )

    def _local_path_for(self, name: str) -> str:
        """Return the repo storage path for *name* under ``settings.repos_dir``."""
        return str(self.settings.repos_dir / name)

    def _encrypt_token(self, api_token: str) -> str:
        """
        Encrypt an API token and return it as base64 text for DB storage.

        Args:
            api_token: Plain-text API token

        Returns:
            Base64-encoded output of ``encrypt_data``

        Raises:
            ValueError: If api_token is not a string
        """
        if not isinstance(api_token, str):
            raise ValueError("API token must be a string")
        encrypted = encrypt_data(
            api_token.encode("utf-8"),
            self.settings.encrypt_key,
        )
        return base64.b64encode(encrypted).decode("utf-8")

    def _commit(self) -> None:
        """
        Commit the session, rolling back if the commit fails.

        Without the rollback a failed commit (for example a uniqueness
        violation caused by a concurrent insert) leaves the session in a
        failed state, and every later operation on it would fail as well.
        The original exception is re-raised unchanged.
        """
        try:
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise

    # ------------------------------------------------------------------
    # CRUD operations
    # ------------------------------------------------------------------

    def create_server(
        self,
        name: str,
        url: str,
        api_token: str,
        ssh_key_id: int,
        sync_enabled: bool = False,
        schedule_cron: Optional[str] = None
    ) -> Server:
        """
        Create a new Gitea server with encrypted API token.

        Args:
            name: Unique name for the server (also used as the directory
                name under the repos directory)
            url: Gitea server URL
            api_token: API token for authentication (will be encrypted)
            ssh_key_id: ID of the SSH key to use for Git operations
            sync_enabled: Whether automatic sync is enabled
            schedule_cron: Optional cron expression for scheduled sync

        Returns:
            Created Server model instance

        Raises:
            ValueError: If the name is invalid or already exists, if
                ssh_key_id is invalid, or if api_token is not a string
        """
        # Validate before touching the database or the filesystem.
        self._validate_name(name)

        # Check if name already exists
        if self.db.query(Server).filter_by(name=name).first():
            raise ValueError(f"Server with name '{name}' already exists")

        # Verify SSH key exists
        if not self.db.query(SshKey).filter_by(id=ssh_key_id).first():
            raise ValueError(f"SSH key not found with ID {ssh_key_id}")

        # Created and updated timestamps start out identical.
        current_time = int(time.time())
        server = Server(
            name=name,
            url=url,
            api_token=self._encrypt_token(api_token),
            ssh_key_id=ssh_key_id,
            sync_enabled=sync_enabled,
            schedule_cron=schedule_cron,
            local_path=self._local_path_for(name),
            status="untested",
            created_at=current_time,
            updated_at=current_time,
        )

        # Ensure repos directory exists
        self.settings.repos_dir.mkdir(parents=True, exist_ok=True)

        self.db.add(server)
        self._commit()
        self.db.refresh(server)

        return server

    def list_servers(self) -> List[Server]:
        """
        List all servers ordered by creation time.

        Returns:
            List of all Server model instances (without decrypted tokens)
        """
        return self.db.query(Server).order_by(Server.created_at).all()

    def get_server(self, server_id: int) -> Optional[Server]:
        """
        Get a server by ID.

        Args:
            server_id: ID of the server

        Returns:
            Server model instance or None if not found
        """
        return self.db.query(Server).filter_by(id=server_id).first()

    def update_server(self, server_id: int, **kwargs) -> Server:
        """
        Update a server's configuration.

        Keys that are not attributes of the Server model are ignored, as are
        protected keys (``id``, ``created_at`` and anything starting with an
        underscore). ``updated_at`` is always set to the current time.

        Args:
            server_id: ID of the server to update
            **kwargs: Fields to update (name, url, api_token, ssh_key_id,
                sync_enabled, schedule_cron, status)

        Returns:
            Updated Server model instance

        Raises:
            ValueError: If server not found, invalid or duplicate name,
                invalid ssh_key_id, or api_token is not a string
        """
        server = self.get_server(server_id)
        if not server:
            raise ValueError(f"Server not found with ID {server_id}")

        updates = dict(kwargs)
        renaming = "name" in updates and updates["name"] != server.name

        # Handle name validity and uniqueness if updating name
        if renaming:
            new_name = updates["name"]
            self._validate_name(new_name)
            if self.db.query(Server).filter_by(name=new_name).first():
                raise ValueError(f"Server with name '{new_name}' already exists")

        # Verify SSH key exists if updating ssh_key_id
        if "ssh_key_id" in updates:
            key_id = updates["ssh_key_id"]
            if not self.db.query(SshKey).filter_by(id=key_id).first():
                raise ValueError(f"SSH key not found with ID {key_id}")

        # Handle API token encryption
        if "api_token" in updates:
            updates["api_token"] = self._encrypt_token(updates["api_token"])

        # Update local_path if name is being updated
        # NOTE(review): only the stored path changes here; nothing in this
        # method moves an existing directory on disk - confirm that is intended.
        if renaming:
            updates["local_path"] = self._local_path_for(updates["name"])

        # Update fields, skipping identity/internal attributes so callers
        # cannot overwrite the primary key or ORM bookkeeping state.
        for key, value in updates.items():
            if key.startswith("_") or key in self._PROTECTED_FIELDS:
                continue
            if hasattr(server, key):
                setattr(server, key, value)

        # Update timestamp
        server.updated_at = int(time.time())

        self._commit()
        self.db.refresh(server)

        return server

    def delete_server(self, server_id: int) -> bool:
        """
        Delete a server.

        Args:
            server_id: ID of the server to delete

        Returns:
            True if deleted, False if not found
        """
        server = self.get_server(server_id)
        if not server:
            return False

        self.db.delete(server)
        self._commit()

        return True

    def get_decrypted_token(self, server: Server) -> str:
        """
        Get the decrypted API token for server operations.

        Args:
            server: Server model instance

        Returns:
            Decrypted API token as a string

        Raises:
            ValueError: If server is None, has no stored token, or the
                token cannot be decoded or decrypted
        """
        if server is None:
            raise ValueError("Server cannot be None")

        stored_token = server.api_token
        if not stored_token:
            raise ValueError("Server has no stored API token")

        try:
            # Decode from base64 first, then decrypt
            encrypted_token = base64.b64decode(stored_token.encode("utf-8"))
            decrypted = decrypt_data(
                encrypted_token,
                self.settings.encrypt_key,
            )
            return decrypted.decode("utf-8")
        except Exception as exc:
            # The exception type decrypt_data raises on failure is not
            # visible from this module, so any failure is normalised to the
            # documented ValueError; the original error is kept as __cause__.
            raise ValueError("Failed to decrypt API token for server") from exc
|