From 960056c88c35a1ba0c0c0a60eb3131f472bf93f8 Mon Sep 17 00:00:00 2001 From: panw Date: Mon, 30 Mar 2026 16:13:50 +0800 Subject: [PATCH] feat: implement Server service layer with encrypted API tokens Implement business logic for Gitea server management: - create_server(): Create servers with encrypted API tokens - list_servers(): List all servers ordered by creation time - get_server(): Retrieve server by ID - update_server(): Update server configuration with token re-encryption - delete_server(): Delete servers - get_decrypted_token(): Decrypt API tokens for operations Features: - API token encryption using AES-256-GCM - Automatic local_path generation based on server name - SSH key validation before server creation - Name uniqueness enforcement - Timestamp tracking (created_at, updated_at) - Repos directory auto-creation Tests: - 24 comprehensive test cases covering all scenarios - Encryption verification tests - Edge case handling (duplicates, not found, invalid references) - All tests passing (63/63 total) Co-Authored-By: Claude Opus 4.6 --- backend/app/services/__init__.py | 9 + backend/app/services/server_service.py | 231 ++++++++ .../test_services/test_server_service.py | 517 ++++++++++++++++++ 3 files changed, 757 insertions(+) create mode 100644 backend/app/services/server_service.py create mode 100644 backend/tests/test_services/test_server_service.py diff --git a/backend/app/services/__init__.py b/backend/app/services/__init__.py index e69de29..1261f5d 100644 --- a/backend/app/services/__init__.py +++ b/backend/app/services/__init__.py @@ -0,0 +1,9 @@ +""" +Services module. + +Business logic layer for application services. +""" +from app.services.ssh_key_service import SshKeyService +from app.services.server_service import ServerService + +__all__ = ['SshKeyService', 'ServerService'] diff --git a/backend/app/services/server_service.py b/backend/app/services/server_service.py new file mode 100644 index 0000000..aba6f7a --- /dev/null +++ b/backend/app/services/server_service.py @@ -0,0 +1,231 @@ +""" +Server Service. + +Business logic for Gitea server management including: +- Creating servers with encrypted API tokens +- Listing and retrieving servers +- Updating server configurations +- Deleting servers +- Decrypting API tokens for operations +""" +import time +import base64 +from typing import List, Optional +from sqlalchemy.orm import Session + +from app.models.server import Server +from app.models.ssh_key import SshKey +from app.security import encrypt_data, decrypt_data +from app.config import get_settings + + +class ServerService: + """ + Service for managing Gitea servers. + + Handles encryption of API tokens, local path generation, + and server CRUD operations. + """ + + def __init__(self, db: Session): + """ + Initialize the service with a database session. + + Args: + db: SQLAlchemy database session + """ + self.db = db + self.settings = get_settings() + + def create_server( + self, + name: str, + url: str, + api_token: str, + ssh_key_id: int, + sync_enabled: bool = False, + schedule_cron: Optional[str] = None + ) -> Server: + """ + Create a new Gitea server with encrypted API token. + + Args: + name: Unique name for the server + url: Gitea server URL + api_token: API token for authentication (will be encrypted) + ssh_key_id: ID of the SSH key to use for Git operations + sync_enabled: Whether automatic sync is enabled + schedule_cron: Optional cron expression for scheduled sync + + Returns: + Created Server model instance + + Raises: + ValueError: If name already exists or ssh_key_id is invalid + """ + # Check if name already exists + existing_server = self.db.query(Server).filter_by(name=name).first() + if existing_server: + raise ValueError(f"Server with name '{name}' already exists") + + # Verify SSH key exists + ssh_key = self.db.query(SshKey).filter_by(id=ssh_key_id).first() + if not ssh_key: + raise ValueError(f"SSH key not found with ID {ssh_key_id}") + + # Generate local path for repo storage + local_path = str(self.settings.repos_dir / name) + + # Encrypt the API token + encrypted_token = encrypt_data( + api_token.encode('utf-8'), + self.settings.encrypt_key + ) + + # Store encrypted token as base64 for database storage + encrypted_token_b64 = base64.b64encode(encrypted_token).decode('utf-8') + + # Create the server record + current_time = int(time.time()) + server = Server( + name=name, + url=url, + api_token=encrypted_token_b64, + ssh_key_id=ssh_key_id, + sync_enabled=sync_enabled, + schedule_cron=schedule_cron, + local_path=local_path, + status="untested", + created_at=current_time, + updated_at=current_time + ) + + # Ensure repos directory exists + self.settings.repos_dir.mkdir(parents=True, exist_ok=True) + + self.db.add(server) + self.db.commit() + self.db.refresh(server) + + return server + + def list_servers(self) -> List[Server]: + """ + List all servers ordered by creation time. + + Returns: + List of all Server model instances (without decrypted tokens) + """ + return self.db.query(Server).order_by(Server.created_at).all() + + def get_server(self, server_id: int) -> Optional[Server]: + """ + Get a server by ID. + + Args: + server_id: ID of the server + + Returns: + Server model instance or None if not found + """ + return self.db.query(Server).filter_by(id=server_id).first() + + def update_server(self, server_id: int, **kwargs) -> Server: + """ + Update a server's configuration. + + Args: + server_id: ID of the server to update + **kwargs: Fields to update (name, url, api_token, ssh_key_id, + sync_enabled, schedule_cron, status) + + Returns: + Updated Server model instance + + Raises: + ValueError: If server not found, duplicate name, or invalid ssh_key_id + """ + server = self.get_server(server_id) + if not server: + raise ValueError(f"Server not found with ID {server_id}") + + # Handle name uniqueness if updating name + if 'name' in kwargs and kwargs['name'] != server.name: + existing = self.db.query(Server).filter_by(name=kwargs['name']).first() + if existing: + raise ValueError(f"Server with name '{kwargs['name']}' already exists") + + # Verify SSH key exists if updating ssh_key_id + if 'ssh_key_id' in kwargs: + ssh_key = self.db.query(SshKey).filter_by(id=kwargs['ssh_key_id']).first() + if not ssh_key: + raise ValueError(f"SSH key not found with ID {kwargs['ssh_key_id']}") + + # Handle API token encryption + if 'api_token' in kwargs: + encrypted_token = encrypt_data( + kwargs['api_token'].encode('utf-8'), + self.settings.encrypt_key + ) + kwargs['api_token'] = base64.b64encode(encrypted_token).decode('utf-8') + + # Update local_path if name is being updated + if 'name' in kwargs and kwargs['name'] != server.name: + kwargs['local_path'] = str(self.settings.repos_dir / kwargs['name']) + + # Update fields + for key, value in kwargs.items(): + if hasattr(server, key): + setattr(server, key, value) + + # Update timestamp + server.updated_at = int(time.time()) + + self.db.commit() + self.db.refresh(server) + + return server + + def delete_server(self, server_id: int) -> bool: + """ + Delete a server. + + Args: + server_id: ID of the server to delete + + Returns: + True if deleted, False if not found + """ + server = self.get_server(server_id) + if not server: + return False + + self.db.delete(server) + self.db.commit() + + return True + + def get_decrypted_token(self, server: Server) -> str: + """ + Get the decrypted API token for server operations. + + Args: + server: Server model instance + + Returns: + Decrypted API token as a string + + Raises: + ValueError: If server is None or token cannot be decrypted + """ + if server is None: + raise ValueError("Server cannot be None") + + # Decode from base64 first, then decrypt + encrypted_token = base64.b64decode(server.api_token.encode('utf-8')) + decrypted = decrypt_data( + encrypted_token, + self.settings.encrypt_key + ) + + return decrypted.decode('utf-8') diff --git a/backend/tests/test_services/test_server_service.py b/backend/tests/test_services/test_server_service.py new file mode 100644 index 0000000..2a60b88 --- /dev/null +++ b/backend/tests/test_services/test_server_service.py @@ -0,0 +1,517 @@ +""" +Tests for Server Service. +""" +import base64 +import pytest +import time +from pathlib import Path +from app.models.server import Server +from app.models.ssh_key import SshKey +from app.services.server_service import ServerService +from app.config import get_settings + + +# Valid test SSH key +VALID_SSH_KEY = """-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW +QyNTUxOQAAACB/pDNwjNcznNaRlLNF5G9hCQNjbqNZ7QeKyLIy/nvHAAAAJi/vqmQv6pk +AAAAAtzc2gtZWQyNTUxOQAAACB/pDNwjNcznNaRlLNF5G9hCQNjbqNZ7QeKyLIy/nvHAA +AAAEBD0cWNQnpLDUYEGNMSgVIApVJfCFuRfGG3uxJZRKLvqH+kM3CM1zOc1pGUssXkb2E +JA2uuo1ntB4rIsjL+e8cAAAADm1lc3NlbmdlckBrZW50cm9zBAgMEBQ= +-----END OPENSSH PRIVATE KEY----- +""" + + +def create_test_ssh_key(db_session, name="test-ssh-key"): + """Helper to create a test SSH key.""" + from app.services.ssh_key_service import SshKeyService + ssh_service = SshKeyService(db_session) + return ssh_service.create_ssh_key(name=name, private_key=VALID_SSH_KEY) + + +def test_create_server_success(db_session, test_env_vars): + """Test successful server creation with token encryption.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + server = service.create_server( + name="gitea-server", + url="https://gitea.example.com", + api_token="test-api-token-123", + ssh_key_id=ssh_key.id, + sync_enabled=True, + schedule_cron="0 0 * * *" + ) + + assert server.id is not None + assert server.name == "gitea-server" + assert server.url == "https://gitea.example.com" + assert server.api_token != "test-api-token-123" # Should be encrypted + assert server.ssh_key_id == ssh_key.id + assert server.sync_enabled is True + assert server.schedule_cron == "0 0 * * *" + assert server.local_path is not None + assert server.status == "untested" + assert server.created_at is not None + assert server.updated_at is not None + + +def test_create_server_with_duplicate_name(db_session, test_env_vars): + """Test that duplicate server names are not allowed.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + service.create_server( + name="duplicate-server", + url="https://gitea1.example.com", + api_token="token1", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + with pytest.raises(ValueError, match="already exists"): + service.create_server( + name="duplicate-server", + url="https://gitea2.example.com", + api_token="token2", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + +def test_create_server_with_invalid_ssh_key_id(db_session, test_env_vars): + """Test that invalid SSH key ID is rejected.""" + service = ServerService(db_session) + + with pytest.raises(ValueError, match="SSH key not found"): + service.create_server( + name="test-server", + url="https://gitea.example.com", + api_token="test-token", + ssh_key_id=99999, + sync_enabled=False, + schedule_cron=None + ) + + +def test_create_server_generates_local_path(db_session, test_env_vars): + """Test that local_path is generated correctly based on server name.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + server = service.create_server( + name="my-gitea", + url="https://gitea.example.com", + api_token="token123", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + settings = get_settings() + expected_path = settings.repos_dir / "my-gitea" + assert server.local_path == str(expected_path) + + +def test_create_server_without_schedule(db_session, test_env_vars): + """Test creating a server without sync schedule.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + server = service.create_server( + name="no-schedule-server", + url="https://gitea.example.com", + api_token="token", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + assert server.schedule_cron is None + assert server.sync_enabled is False + + +def test_list_servers_empty(db_session, test_env_vars): + """Test listing servers when none exist.""" + service = ServerService(db_session) + + servers = service.list_servers() + + assert servers == [] + + +def test_list_servers_multiple(db_session, test_env_vars): + """Test listing multiple servers.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + service.create_server( + name="server-1", + url="https://gitea1.example.com", + api_token="token1", + ssh_key_id=ssh_key.id, + sync_enabled=True, + schedule_cron="0 0 * * *" + ) + service.create_server( + name="server-2", + url="https://gitea2.example.com", + api_token="token2", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + servers = service.list_servers() + + assert len(servers) == 2 + assert any(s.name == "server-1" for s in servers) + assert any(s.name == "server-2" for s in servers) + + +def test_get_server_by_id(db_session, test_env_vars): + """Test getting a server by ID.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + created_server = service.create_server( + name="get-test-server", + url="https://gitea.example.com", + api_token="token", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + retrieved_server = service.get_server(created_server.id) + + assert retrieved_server is not None + assert retrieved_server.id == created_server.id + assert retrieved_server.name == "get-test-server" + + +def test_get_server_not_found(db_session, test_env_vars): + """Test getting a non-existent server.""" + service = ServerService(db_session) + + server = service.get_server(99999) + + assert server is None + + +def test_update_server_name(db_session, test_env_vars): + """Test updating server name.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + server = service.create_server( + name="old-name", + url="https://gitea.example.com", + api_token="token", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + updated_server = service.update_server(server.id, name="new-name") + + assert updated_server.name == "new-name" + assert updated_server.id == server.id + + +def test_update_server_url(db_session, test_env_vars): + """Test updating server URL.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + server = service.create_server( + name="test-server", + url="https://old-url.example.com", + api_token="token", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + updated_server = service.update_server( + server.id, + url="https://new-url.example.com" + ) + + assert updated_server.url == "https://new-url.example.com" + + +def test_update_server_api_token(db_session, test_env_vars): + """Test updating server API token with encryption.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + server = service.create_server( + name="test-server", + url="https://gitea.example.com", + api_token="old-token", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + updated_server = service.update_server( + server.id, + api_token="new-token" + ) + + # The token should be encrypted (different from original) + assert updated_server.api_token != "new-token" + + +def test_update_server_sync_settings(db_session, test_env_vars): + """Test updating server sync settings.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + server = service.create_server( + name="test-server", + url="https://gitea.example.com", + api_token="token", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + updated_server = service.update_server( + server.id, + sync_enabled=True, + schedule_cron="*/5 * * * *" + ) + + assert updated_server.sync_enabled is True + assert updated_server.schedule_cron == "*/5 * * * *" + + +def test_update_server_not_found(db_session, test_env_vars): + """Test updating a non-existent server.""" + service = ServerService(db_session) + + with pytest.raises(ValueError, match="Server not found"): + service.update_server(99999, name="new-name") + + +def test_update_server_duplicate_name(db_session, test_env_vars): + """Test that updating to duplicate name is rejected.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + server1 = service.create_server( + name="server-1", + url="https://gitea1.example.com", + api_token="token1", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + server2 = service.create_server( + name="server-2", + url="https://gitea2.example.com", + api_token="token2", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + with pytest.raises(ValueError, match="already exists"): + service.update_server(server2.id, name="server-1") + + +def test_delete_server_success(db_session, test_env_vars): + """Test successful server deletion.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + server = service.create_server( + name="delete-test-server", + url="https://gitea.example.com", + api_token="token", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + result = service.delete_server(server.id) + + assert result is True + + # Verify the server is deleted + retrieved_server = service.get_server(server.id) + assert retrieved_server is None + + +def test_delete_server_not_found(db_session, test_env_vars): + """Test deleting a non-existent server.""" + service = ServerService(db_session) + + result = service.delete_server(99999) + + assert result is False + + +def test_get_decrypted_token(db_session, test_env_vars): + """Test getting decrypted API token.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + original_token = "my-secret-api-token" + server = service.create_server( + name="decrypt-test-server", + url="https://gitea.example.com", + api_token=original_token, + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + decrypted_token = service.get_decrypted_token(server) + + assert decrypted_token == original_token + assert decrypted_token != server.api_token # Should differ from encrypted value + + +def test_get_decrypted_token_with_server_object(db_session, test_env_vars): + """Test getting decrypted token using server object from database.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + original_token = "another-secret-token" + server = service.create_server( + name="token-test-server", + url="https://gitea.example.com", + api_token=original_token, + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + # Retrieve server from DB (simulates real usage) + db_server = db_session.query(Server).filter_by(name="token-test-server").first() + + decrypted_token = service.get_decrypted_token(db_server) + + assert decrypted_token == original_token + + +def test_create_server_creates_repos_directory(db_session, test_env_vars, tmp_path): + """Test that repos directory is created when adding a server.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + server = service.create_server( + name="dir-test-server", + url="https://gitea.example.com", + api_token="token", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + settings = get_settings() + repos_dir = settings.repos_dir + assert repos_dir.exists() + + +def test_server_default_status(db_session, test_env_vars): + """Test that new servers have default 'untested' status.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + server = service.create_server( + name="status-test-server", + url="https://gitea.example.com", + api_token="token", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + assert server.status == "untested" + + +def test_update_server_updates_timestamp(db_session, test_env_vars): + """Test that updating server updates the updated_at timestamp.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + server = service.create_server( + name="timestamp-server", + url="https://gitea.example.com", + api_token="token", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + original_updated_at = server.updated_at + + # Small delay to ensure timestamp difference + time.sleep(0.1) + + updated_server = service.update_server(server.id, url="https://new-url.example.com") + + # Verify the URL was updated (which means update_server was called) + assert updated_server.url == "https://new-url.example.com" + # The updated_at should be >= original (may be equal on fast systems) + assert updated_server.updated_at >= original_updated_at + + +def test_encryption_is_different(db_session, test_env_vars): + """Test that API tokens are encrypted and different from plaintext.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + original_token = "plaintext-token-12345" + service.create_server( + name="encryption-test-server", + url="https://gitea.example.com", + api_token=original_token, + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + # Get the raw database record + db_server = db_session.query(Server).filter_by(name="encryption-test-server").first() + + # The stored token should be encrypted + assert db_server.api_token != original_token + # Should be base64 encoded (longer) + assert len(db_server.api_token) > len(original_token) + + +def test_list_servers_ordered_by_creation(db_session, test_env_vars): + """Test that servers are listed in creation order.""" + ssh_key = create_test_ssh_key(db_session) + service = ServerService(db_session) + + server1 = service.create_server( + name="first-server", + url="https://gitea1.example.com", + api_token="token1", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + server2 = service.create_server( + name="second-server", + url="https://gitea2.example.com", + api_token="token2", + ssh_key_id=ssh_key.id, + sync_enabled=False, + schedule_cron=None + ) + + servers = service.list_servers() + + assert servers[0].id == server1.id + assert servers[1].id == server2.id