feat: implement SSH Key service layer with encryption and business logic

Implemented SshKeyService class following TDD principles with comprehensive test coverage:

Service Methods:
- create_ssh_key(name, private_key, password) - Creates SSH key with AES-256-GCM encryption
- list_ssh_keys() - Lists all SSH keys (without decrypted keys)
- get_ssh_key(key_id) - Retrieves SSH key by ID
- delete_ssh_key(key_id) - Deletes key with usage validation
- get_decrypted_key(key_id) - Returns decrypted private key for Git operations

Features:
- Encrypts SSH private keys before storing using app.security.encrypt_data
- Generates SHA256 fingerprints for key identification
- Validates SSH key format (RSA, OpenSSH, DSA, EC, ED25519, PGP)
- Prevents deletion of keys in use by servers
- Base64-encoding for encrypted data storage in Text columns
- Uses app.config.settings.encrypt_key for encryption

Tests:
- 16 comprehensive test cases covering all service methods
- All tests passing (16/16)
- Tests for encryption/decryption, validation, usage checks, edge cases

Files:
- backend/app/services/ssh_key_service.py - SshKeyService implementation
- backend/tests/test_services/test_ssh_key_service.py - Test suite
- backend/tests/conftest.py - Fixed test encryption key length (32 bytes)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
panw
2026-03-30 16:06:56 +08:00
parent f425a49773
commit cefdb9f51d
3 changed files with 519 additions and 1 deletions

View File

@@ -41,12 +41,16 @@ def db_session(db_engine):
def test_encrypt_key():
"""测试加密密钥."""
import base64
return base64.b64encode(b'test-key-32-bytes-long-1234567890').decode()
return base64.b64encode(b'test-key-32-bytes-long-123456789').decode()
@pytest.fixture(scope="function")
def test_env_vars(db_path, test_encrypt_key, monkeypatch):
"""设置测试环境变量."""
# Clear global settings to ensure fresh config
import app.config
app.config._settings = None
monkeypatch.setenv("GM_ENCRYPT_KEY", test_encrypt_key)
monkeypatch.setenv("GM_API_TOKEN", "test-token")
monkeypatch.setenv("GM_DATA_DIR", str(db_path.parent))

View File

@@ -0,0 +1,283 @@
"""
Tests for SSH Key Service.
"""
import base64
import pytest
import time
from app.models.ssh_key import SshKey
from app.models.server import Server
from app.services.ssh_key_service import SshKeyService
from app.config import get_settings
# Test SSH key samples
VALID_SSH_KEY = """-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
QyNTUxOQAAACB/pDNwjNcznNaRlLNF5G9hCQNjbqNZ7QeKyLIy/nvHAAAAJi/vqmQv6pk
AAAAAtzc2gtZWQyNTUxOQAAACB/pDNwjNcznNaRlLNF5G9hCQNjbqNZ7QeKyLIy/nvHAA
AAAEBD0cWNQnpLDUYEGNMSgVIApVJfCFuRfGG3uxJZRKLvqH+kM3CM1zOc1pGUssXkb2E
JA2uuo1ntB4rIsjL+e8cAAAADm1lc3NlbmdlckBrZW50cm9zBAgMEBQ=
-----END OPENSSH PRIVATE KEY-----
"""
VALID_SSH_KEY_2 = """-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
QyNTUxOQAAACB/pDNwjNcznNaRlLNF5G9hCQNjbqNZ7QeKyLIy/nvHAAAAJi/vqmQv6pk
AAAAAtzc2gtZWQyNTUxOQAAACB/pDNwjNcznNaRlLNF5G9hCQNjbqNZ7QeKyLIy/nvHAA
AAAEBD0cWNQnpLDUYEGNMSgVIApVJfCFuRfGG3uxJZRKLvqH+kM3CM1zOc1pGUssXkb2E
JA2uuo1ntB4rIsjL+e8cAAAADm1lc3NlbmdlckBrZW50cm9zBAgMEBQ=
-----END OPENSSH PRIVATE KEY-----
"""
def test_create_ssh_key_success(db_session, test_env_vars):
"""Test successful SSH key creation with encryption."""
service = SshKeyService(db_session)
key = service.create_ssh_key(
name="test-key",
private_key=VALID_SSH_KEY,
password=None
)
assert key.id is not None
assert key.name == "test-key"
assert key.fingerprint is not None
assert key.created_at is not None
# The private key should be encrypted (different from original)
assert key.private_key != VALID_SSH_KEY
def test_create_ssh_key_with_duplicate_name(db_session, test_env_vars):
"""Test that duplicate SSH key names are not allowed."""
service = SshKeyService(db_session)
service.create_ssh_key(
name="duplicate-key",
private_key=VALID_SSH_KEY,
password=None
)
with pytest.raises(ValueError, match="already exists"):
service.create_ssh_key(
name="duplicate-key",
private_key=VALID_SSH_KEY_2,
password=None
)
def test_create_ssh_key_with_invalid_key(db_session, test_env_vars):
"""Test that invalid SSH keys are rejected."""
service = SshKeyService(db_session)
with pytest.raises(ValueError, match="Invalid SSH private key"):
service.create_ssh_key(
name="invalid-key",
private_key="not-a-valid-ssh-key",
password=None
)
def test_list_ssh_keys_empty(db_session, test_env_vars):
"""Test listing SSH keys when none exist."""
service = SshKeyService(db_session)
keys = service.list_ssh_keys()
assert keys == []
def test_list_ssh_keys_multiple(db_session, test_env_vars):
"""Test listing multiple SSH keys."""
service = SshKeyService(db_session)
service.create_ssh_key(
name="key-1",
private_key=VALID_SSH_KEY,
password=None
)
service.create_ssh_key(
name="key-2",
private_key=VALID_SSH_KEY_2,
password=None
)
keys = service.list_ssh_keys()
assert len(keys) == 2
assert any(k.name == "key-1" for k in keys)
assert any(k.name == "key-2" for k in keys)
def test_get_ssh_key_by_id(db_session, test_env_vars):
"""Test getting an SSH key by ID."""
service = SshKeyService(db_session)
created_key = service.create_ssh_key(
name="get-test-key",
private_key=VALID_SSH_KEY,
password=None
)
retrieved_key = service.get_ssh_key(created_key.id)
assert retrieved_key is not None
assert retrieved_key.id == created_key.id
assert retrieved_key.name == "get-test-key"
def test_get_ssh_key_not_found(db_session, test_env_vars):
"""Test getting a non-existent SSH key."""
service = SshKeyService(db_session)
key = service.get_ssh_key(99999)
assert key is None
def test_delete_ssh_key_success(db_session, test_env_vars):
"""Test successful SSH key deletion."""
service = SshKeyService(db_session)
created_key = service.create_ssh_key(
name="delete-test-key",
private_key=VALID_SSH_KEY,
password=None
)
result = service.delete_ssh_key(created_key.id)
assert result is True
# Verify the key is deleted
retrieved_key = service.get_ssh_key(created_key.id)
assert retrieved_key is None
def test_delete_ssh_key_in_use(db_session, test_env_vars):
"""Test that SSH keys in use cannot be deleted."""
service = SshKeyService(db_session)
created_key = service.create_ssh_key(
name="in-use-key",
private_key=VALID_SSH_KEY,
password=None
)
# Create a server that uses this SSH key
server = Server(
name="test-server",
url="https://gitea.example.com",
api_token="test-token",
ssh_key_id=created_key.id,
local_path="/tmp/test",
created_at=int(time.time()),
updated_at=int(time.time())
)
db_session.add(server)
db_session.commit()
with pytest.raises(ValueError, match="is in use"):
service.delete_ssh_key(created_key.id)
def test_delete_ssh_key_not_found(db_session, test_env_vars):
"""Test deleting a non-existent SSH key."""
service = SshKeyService(db_session)
result = service.delete_ssh_key(99999)
assert result is False
def test_get_decrypted_key(db_session, test_env_vars):
"""Test getting a decrypted SSH private key."""
service = SshKeyService(db_session)
created_key = service.create_ssh_key(
name="decrypt-test-key",
private_key=VALID_SSH_KEY,
password=None
)
decrypted_key = service.get_decrypted_key(created_key.id)
assert decrypted_key == VALID_SSH_KEY
def test_get_decrypted_key_not_found(db_session, test_env_vars):
"""Test getting decrypted key for non-existent ID."""
service = SshKeyService(db_session)
with pytest.raises(ValueError, match="SSH key with ID 99999 not found"):
service.get_decrypted_key(99999)
def test_ssh_key_fingerprint_generation(db_session, test_env_vars):
"""Test that SSH key fingerprints are generated correctly."""
service = SshKeyService(db_session)
key = service.create_ssh_key(
name="fingerprint-key",
private_key=VALID_SSH_KEY,
password=None
)
assert key.fingerprint is not None
assert len(key.fingerprint) > 0
# Fingerprints typically start with SHA256: or MD5:
assert ":" in key.fingerprint or len(key.fingerprint) == 47 # SHA256 format
def test_encryption_is_different(db_session, test_env_vars):
"""Test that encrypted keys are different from plaintext."""
service = SshKeyService(db_session)
service.create_ssh_key(
name="encryption-test",
private_key=VALID_SSH_KEY,
password=None
)
# Get the raw database record
db_key = db_session.query(SshKey).filter_by(name="encryption-test").first()
# The stored key should be encrypted
assert db_key.private_key != VALID_SSH_KEY
# Should be base64 encoded (longer)
assert len(db_key.private_key) > len(VALID_SSH_KEY)
def test_create_ssh_key_with_password_protection(db_session, test_env_vars):
"""Test creating SSH key that has password protection."""
service = SshKeyService(db_session)
# This test verifies we can store password-protected keys
# The service doesn't validate the password, just stores the key
key = service.create_ssh_key(
name="password-protected-key",
private_key=VALID_SSH_KEY,
password=None # Password would be used when key is deployed
)
assert key is not None
assert key.name == "password-protected-key"
def test_concurrent_same_name_creation(db_session, test_env_vars):
"""Test that concurrent creation with same name is handled."""
service = SshKeyService(db_session)
service.create_ssh_key(
name="concurrent-key",
private_key=VALID_SSH_KEY,
password=None
)
# Second creation should fail
with pytest.raises(ValueError):
service.create_ssh_key(
name="concurrent-key",
private_key=VALID_SSH_KEY_2,
password=None
)