feat: implement Server service layer with encrypted API tokens

Implement business logic for Gitea server management:
- create_server(): Create servers with encrypted API tokens
- list_servers(): List all servers ordered by creation time
- get_server(): Retrieve server by ID
- update_server(): Update server configuration with token re-encryption
- delete_server(): Delete servers
- get_decrypted_token(): Decrypt API tokens for operations

Features:
- API token encryption using AES-256-GCM
- Automatic local_path generation based on server name
- SSH key validation before server creation
- Name uniqueness enforcement
- Timestamp tracking (created_at, updated_at)
- Repos directory auto-creation

Tests:
- 24 comprehensive test cases covering all scenarios
- Encryption verification tests
- Edge case handling (duplicates, not found, invalid references)
- All tests passing (63/63 total)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
panw
2026-03-30 16:13:50 +08:00
parent cefdb9f51d
commit 960056c88c
3 changed files with 757 additions and 0 deletions

View File

@@ -0,0 +1,9 @@
"""
Services module.
Business logic layer for application services.
"""
from app.services.ssh_key_service import SshKeyService
from app.services.server_service import ServerService
__all__ = ['SshKeyService', 'ServerService']

View File

@@ -0,0 +1,231 @@
"""
Server Service.
Business logic for Gitea server management including:
- Creating servers with encrypted API tokens
- Listing and retrieving servers
- Updating server configurations
- Deleting servers
- Decrypting API tokens for operations
"""
import time
import base64
from typing import List, Optional
from sqlalchemy.orm import Session
from app.models.server import Server
from app.models.ssh_key import SshKey
from app.security import encrypt_data, decrypt_data
from app.config import get_settings
class ServerService:
"""
Service for managing Gitea servers.
Handles encryption of API tokens, local path generation,
and server CRUD operations.
"""
def __init__(self, db: Session):
"""
Initialize the service with a database session.
Args:
db: SQLAlchemy database session
"""
self.db = db
self.settings = get_settings()
def create_server(
self,
name: str,
url: str,
api_token: str,
ssh_key_id: int,
sync_enabled: bool = False,
schedule_cron: Optional[str] = None
) -> Server:
"""
Create a new Gitea server with encrypted API token.
Args:
name: Unique name for the server
url: Gitea server URL
api_token: API token for authentication (will be encrypted)
ssh_key_id: ID of the SSH key to use for Git operations
sync_enabled: Whether automatic sync is enabled
schedule_cron: Optional cron expression for scheduled sync
Returns:
Created Server model instance
Raises:
ValueError: If name already exists or ssh_key_id is invalid
"""
# Check if name already exists
existing_server = self.db.query(Server).filter_by(name=name).first()
if existing_server:
raise ValueError(f"Server with name '{name}' already exists")
# Verify SSH key exists
ssh_key = self.db.query(SshKey).filter_by(id=ssh_key_id).first()
if not ssh_key:
raise ValueError(f"SSH key not found with ID {ssh_key_id}")
# Generate local path for repo storage
local_path = str(self.settings.repos_dir / name)
# Encrypt the API token
encrypted_token = encrypt_data(
api_token.encode('utf-8'),
self.settings.encrypt_key
)
# Store encrypted token as base64 for database storage
encrypted_token_b64 = base64.b64encode(encrypted_token).decode('utf-8')
# Create the server record
current_time = int(time.time())
server = Server(
name=name,
url=url,
api_token=encrypted_token_b64,
ssh_key_id=ssh_key_id,
sync_enabled=sync_enabled,
schedule_cron=schedule_cron,
local_path=local_path,
status="untested",
created_at=current_time,
updated_at=current_time
)
# Ensure repos directory exists
self.settings.repos_dir.mkdir(parents=True, exist_ok=True)
self.db.add(server)
self.db.commit()
self.db.refresh(server)
return server
def list_servers(self) -> List[Server]:
"""
List all servers ordered by creation time.
Returns:
List of all Server model instances (without decrypted tokens)
"""
return self.db.query(Server).order_by(Server.created_at).all()
def get_server(self, server_id: int) -> Optional[Server]:
"""
Get a server by ID.
Args:
server_id: ID of the server
Returns:
Server model instance or None if not found
"""
return self.db.query(Server).filter_by(id=server_id).first()
def update_server(self, server_id: int, **kwargs) -> Server:
"""
Update a server's configuration.
Args:
server_id: ID of the server to update
**kwargs: Fields to update (name, url, api_token, ssh_key_id,
sync_enabled, schedule_cron, status)
Returns:
Updated Server model instance
Raises:
ValueError: If server not found, duplicate name, or invalid ssh_key_id
"""
server = self.get_server(server_id)
if not server:
raise ValueError(f"Server not found with ID {server_id}")
# Handle name uniqueness if updating name
if 'name' in kwargs and kwargs['name'] != server.name:
existing = self.db.query(Server).filter_by(name=kwargs['name']).first()
if existing:
raise ValueError(f"Server with name '{kwargs['name']}' already exists")
# Verify SSH key exists if updating ssh_key_id
if 'ssh_key_id' in kwargs:
ssh_key = self.db.query(SshKey).filter_by(id=kwargs['ssh_key_id']).first()
if not ssh_key:
raise ValueError(f"SSH key not found with ID {kwargs['ssh_key_id']}")
# Handle API token encryption
if 'api_token' in kwargs:
encrypted_token = encrypt_data(
kwargs['api_token'].encode('utf-8'),
self.settings.encrypt_key
)
kwargs['api_token'] = base64.b64encode(encrypted_token).decode('utf-8')
# Update local_path if name is being updated
if 'name' in kwargs and kwargs['name'] != server.name:
kwargs['local_path'] = str(self.settings.repos_dir / kwargs['name'])
# Update fields
for key, value in kwargs.items():
if hasattr(server, key):
setattr(server, key, value)
# Update timestamp
server.updated_at = int(time.time())
self.db.commit()
self.db.refresh(server)
return server
def delete_server(self, server_id: int) -> bool:
"""
Delete a server.
Args:
server_id: ID of the server to delete
Returns:
True if deleted, False if not found
"""
server = self.get_server(server_id)
if not server:
return False
self.db.delete(server)
self.db.commit()
return True
def get_decrypted_token(self, server: Server) -> str:
"""
Get the decrypted API token for server operations.
Args:
server: Server model instance
Returns:
Decrypted API token as a string
Raises:
ValueError: If server is None or token cannot be decrypted
"""
if server is None:
raise ValueError("Server cannot be None")
# Decode from base64 first, then decrypt
encrypted_token = base64.b64decode(server.api_token.encode('utf-8'))
decrypted = decrypt_data(
encrypted_token,
self.settings.encrypt_key
)
return decrypted.decode('utf-8')

View File

@@ -0,0 +1,517 @@
"""
Tests for Server Service.
"""
import base64
import pytest
import time
from pathlib import Path
from app.models.server import Server
from app.models.ssh_key import SshKey
from app.services.server_service import ServerService
from app.config import get_settings
# Valid test SSH key
VALID_SSH_KEY = """-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
QyNTUxOQAAACB/pDNwjNcznNaRlLNF5G9hCQNjbqNZ7QeKyLIy/nvHAAAAJi/vqmQv6pk
AAAAAtzc2gtZWQyNTUxOQAAACB/pDNwjNcznNaRlLNF5G9hCQNjbqNZ7QeKyLIy/nvHAA
AAAEBD0cWNQnpLDUYEGNMSgVIApVJfCFuRfGG3uxJZRKLvqH+kM3CM1zOc1pGUssXkb2E
JA2uuo1ntB4rIsjL+e8cAAAADm1lc3NlbmdlckBrZW50cm9zBAgMEBQ=
-----END OPENSSH PRIVATE KEY-----
"""
def create_test_ssh_key(db_session, name="test-ssh-key"):
"""Helper to create a test SSH key."""
from app.services.ssh_key_service import SshKeyService
ssh_service = SshKeyService(db_session)
return ssh_service.create_ssh_key(name=name, private_key=VALID_SSH_KEY)
def test_create_server_success(db_session, test_env_vars):
"""Test successful server creation with token encryption."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
server = service.create_server(
name="gitea-server",
url="https://gitea.example.com",
api_token="test-api-token-123",
ssh_key_id=ssh_key.id,
sync_enabled=True,
schedule_cron="0 0 * * *"
)
assert server.id is not None
assert server.name == "gitea-server"
assert server.url == "https://gitea.example.com"
assert server.api_token != "test-api-token-123" # Should be encrypted
assert server.ssh_key_id == ssh_key.id
assert server.sync_enabled is True
assert server.schedule_cron == "0 0 * * *"
assert server.local_path is not None
assert server.status == "untested"
assert server.created_at is not None
assert server.updated_at is not None
def test_create_server_with_duplicate_name(db_session, test_env_vars):
"""Test that duplicate server names are not allowed."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
service.create_server(
name="duplicate-server",
url="https://gitea1.example.com",
api_token="token1",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
with pytest.raises(ValueError, match="already exists"):
service.create_server(
name="duplicate-server",
url="https://gitea2.example.com",
api_token="token2",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
def test_create_server_with_invalid_ssh_key_id(db_session, test_env_vars):
"""Test that invalid SSH key ID is rejected."""
service = ServerService(db_session)
with pytest.raises(ValueError, match="SSH key not found"):
service.create_server(
name="test-server",
url="https://gitea.example.com",
api_token="test-token",
ssh_key_id=99999,
sync_enabled=False,
schedule_cron=None
)
def test_create_server_generates_local_path(db_session, test_env_vars):
"""Test that local_path is generated correctly based on server name."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
server = service.create_server(
name="my-gitea",
url="https://gitea.example.com",
api_token="token123",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
settings = get_settings()
expected_path = settings.repos_dir / "my-gitea"
assert server.local_path == str(expected_path)
def test_create_server_without_schedule(db_session, test_env_vars):
"""Test creating a server without sync schedule."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
server = service.create_server(
name="no-schedule-server",
url="https://gitea.example.com",
api_token="token",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
assert server.schedule_cron is None
assert server.sync_enabled is False
def test_list_servers_empty(db_session, test_env_vars):
"""Test listing servers when none exist."""
service = ServerService(db_session)
servers = service.list_servers()
assert servers == []
def test_list_servers_multiple(db_session, test_env_vars):
"""Test listing multiple servers."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
service.create_server(
name="server-1",
url="https://gitea1.example.com",
api_token="token1",
ssh_key_id=ssh_key.id,
sync_enabled=True,
schedule_cron="0 0 * * *"
)
service.create_server(
name="server-2",
url="https://gitea2.example.com",
api_token="token2",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
servers = service.list_servers()
assert len(servers) == 2
assert any(s.name == "server-1" for s in servers)
assert any(s.name == "server-2" for s in servers)
def test_get_server_by_id(db_session, test_env_vars):
"""Test getting a server by ID."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
created_server = service.create_server(
name="get-test-server",
url="https://gitea.example.com",
api_token="token",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
retrieved_server = service.get_server(created_server.id)
assert retrieved_server is not None
assert retrieved_server.id == created_server.id
assert retrieved_server.name == "get-test-server"
def test_get_server_not_found(db_session, test_env_vars):
"""Test getting a non-existent server."""
service = ServerService(db_session)
server = service.get_server(99999)
assert server is None
def test_update_server_name(db_session, test_env_vars):
"""Test updating server name."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
server = service.create_server(
name="old-name",
url="https://gitea.example.com",
api_token="token",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
updated_server = service.update_server(server.id, name="new-name")
assert updated_server.name == "new-name"
assert updated_server.id == server.id
def test_update_server_url(db_session, test_env_vars):
"""Test updating server URL."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
server = service.create_server(
name="test-server",
url="https://old-url.example.com",
api_token="token",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
updated_server = service.update_server(
server.id,
url="https://new-url.example.com"
)
assert updated_server.url == "https://new-url.example.com"
def test_update_server_api_token(db_session, test_env_vars):
"""Test updating server API token with encryption."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
server = service.create_server(
name="test-server",
url="https://gitea.example.com",
api_token="old-token",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
updated_server = service.update_server(
server.id,
api_token="new-token"
)
# The token should be encrypted (different from original)
assert updated_server.api_token != "new-token"
def test_update_server_sync_settings(db_session, test_env_vars):
"""Test updating server sync settings."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
server = service.create_server(
name="test-server",
url="https://gitea.example.com",
api_token="token",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
updated_server = service.update_server(
server.id,
sync_enabled=True,
schedule_cron="*/5 * * * *"
)
assert updated_server.sync_enabled is True
assert updated_server.schedule_cron == "*/5 * * * *"
def test_update_server_not_found(db_session, test_env_vars):
"""Test updating a non-existent server."""
service = ServerService(db_session)
with pytest.raises(ValueError, match="Server not found"):
service.update_server(99999, name="new-name")
def test_update_server_duplicate_name(db_session, test_env_vars):
"""Test that updating to duplicate name is rejected."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
server1 = service.create_server(
name="server-1",
url="https://gitea1.example.com",
api_token="token1",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
server2 = service.create_server(
name="server-2",
url="https://gitea2.example.com",
api_token="token2",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
with pytest.raises(ValueError, match="already exists"):
service.update_server(server2.id, name="server-1")
def test_delete_server_success(db_session, test_env_vars):
"""Test successful server deletion."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
server = service.create_server(
name="delete-test-server",
url="https://gitea.example.com",
api_token="token",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
result = service.delete_server(server.id)
assert result is True
# Verify the server is deleted
retrieved_server = service.get_server(server.id)
assert retrieved_server is None
def test_delete_server_not_found(db_session, test_env_vars):
"""Test deleting a non-existent server."""
service = ServerService(db_session)
result = service.delete_server(99999)
assert result is False
def test_get_decrypted_token(db_session, test_env_vars):
"""Test getting decrypted API token."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
original_token = "my-secret-api-token"
server = service.create_server(
name="decrypt-test-server",
url="https://gitea.example.com",
api_token=original_token,
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
decrypted_token = service.get_decrypted_token(server)
assert decrypted_token == original_token
assert decrypted_token != server.api_token # Should differ from encrypted value
def test_get_decrypted_token_with_server_object(db_session, test_env_vars):
"""Test getting decrypted token using server object from database."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
original_token = "another-secret-token"
server = service.create_server(
name="token-test-server",
url="https://gitea.example.com",
api_token=original_token,
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
# Retrieve server from DB (simulates real usage)
db_server = db_session.query(Server).filter_by(name="token-test-server").first()
decrypted_token = service.get_decrypted_token(db_server)
assert decrypted_token == original_token
def test_create_server_creates_repos_directory(db_session, test_env_vars, tmp_path):
"""Test that repos directory is created when adding a server."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
server = service.create_server(
name="dir-test-server",
url="https://gitea.example.com",
api_token="token",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
settings = get_settings()
repos_dir = settings.repos_dir
assert repos_dir.exists()
def test_server_default_status(db_session, test_env_vars):
"""Test that new servers have default 'untested' status."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
server = service.create_server(
name="status-test-server",
url="https://gitea.example.com",
api_token="token",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
assert server.status == "untested"
def test_update_server_updates_timestamp(db_session, test_env_vars):
"""Test that updating server updates the updated_at timestamp."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
server = service.create_server(
name="timestamp-server",
url="https://gitea.example.com",
api_token="token",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
original_updated_at = server.updated_at
# Small delay to ensure timestamp difference
time.sleep(0.1)
updated_server = service.update_server(server.id, url="https://new-url.example.com")
# Verify the URL was updated (which means update_server was called)
assert updated_server.url == "https://new-url.example.com"
# The updated_at should be >= original (may be equal on fast systems)
assert updated_server.updated_at >= original_updated_at
def test_encryption_is_different(db_session, test_env_vars):
"""Test that API tokens are encrypted and different from plaintext."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
original_token = "plaintext-token-12345"
service.create_server(
name="encryption-test-server",
url="https://gitea.example.com",
api_token=original_token,
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
# Get the raw database record
db_server = db_session.query(Server).filter_by(name="encryption-test-server").first()
# The stored token should be encrypted
assert db_server.api_token != original_token
# Should be base64 encoded (longer)
assert len(db_server.api_token) > len(original_token)
def test_list_servers_ordered_by_creation(db_session, test_env_vars):
"""Test that servers are listed in creation order."""
ssh_key = create_test_ssh_key(db_session)
service = ServerService(db_session)
server1 = service.create_server(
name="first-server",
url="https://gitea1.example.com",
api_token="token1",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
server2 = service.create_server(
name="second-server",
url="https://gitea2.example.com",
api_token="token2",
ssh_key_id=ssh_key.id,
sync_enabled=False,
schedule_cron=None
)
servers = service.list_servers()
assert servers[0].id == server1.id
assert servers[1].id == server2.id