Implemented SshKeyService class following TDD principles with comprehensive test coverage: Service Methods: - create_ssh_key(name, private_key, password) - Creates SSH key with AES-256-GCM encryption - list_ssh_keys() - Lists all SSH keys (without decrypted keys) - get_ssh_key(key_id) - Retrieves SSH key by ID - delete_ssh_key(key_id) - Deletes key with usage validation - get_decrypted_key(key_id) - Returns decrypted private key for Git operations Features: - Encrypts SSH private keys before storing using app.security.encrypt_data - Generates SHA256 fingerprints for key identification - Validates SSH key format (RSA, OpenSSH, DSA, EC, ED25519, PGP) - Prevents deletion of keys in use by servers - Base64-encoding for encrypted data storage in Text columns - Uses app.config.settings.encrypt_key for encryption Tests: - 16 comprehensive test cases covering all service methods - All tests passing (16/16) - Tests for encryption/decryption, validation, usage checks, edge cases Files: - backend/app/services/ssh_key_service.py - SshKeyService implementation - backend/tests/test_services/test_ssh_key_service.py - Test suite - backend/tests/conftest.py - Fixed test encryption key length (32 bytes) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
232 lines
6.7 KiB
Python
232 lines
6.7 KiB
Python
"""
|
|
SSH Key Service.
|
|
|
|
Business logic for SSH key management including:
|
|
- Creating SSH keys with encryption
|
|
- Listing and retrieving SSH keys
|
|
- Deleting SSH keys (with usage check)
|
|
- Decrypting private keys for Git operations
|
|
"""
|
|
import time
|
|
import hashlib
|
|
import base64
|
|
from typing import List, Optional
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.ssh_key import SshKey
|
|
from app.models.server import Server
|
|
from app.security import encrypt_data, decrypt_data
|
|
from app.config import get_settings
|
|
|
|
|
|
class SshKeyService:
|
|
"""
|
|
Service for managing SSH keys.
|
|
|
|
Handles encryption, decryption, and validation of SSH private keys.
|
|
"""
|
|
|
|
def __init__(self, db: Session):
|
|
"""
|
|
Initialize the service with a database session.
|
|
|
|
Args:
|
|
db: SQLAlchemy database session
|
|
"""
|
|
self.db = db
|
|
self.settings = get_settings()
|
|
|
|
def create_ssh_key(self, name: str, private_key: str, password: Optional[str] = None) -> SshKey:
|
|
"""
|
|
Create a new SSH key with encryption.
|
|
|
|
Args:
|
|
name: Unique name for the SSH key
|
|
private_key: SSH private key content (PEM/OpenSSH format)
|
|
password: Optional password for key protection (not stored, used for deployment)
|
|
|
|
Returns:
|
|
Created SshKey model instance
|
|
|
|
Raises:
|
|
ValueError: If name already exists or private_key is invalid
|
|
"""
|
|
# Check if name already exists
|
|
existing_key = self.db.query(SshKey).filter_by(name=name).first()
|
|
if existing_key:
|
|
raise ValueError(f"SSH key with name '{name}' already exists")
|
|
|
|
# Validate SSH private key format
|
|
if not self._is_valid_ssh_key(private_key):
|
|
raise ValueError("Invalid SSH private key format. Must be a valid PEM or OpenSSH private key.")
|
|
|
|
# Generate fingerprint
|
|
fingerprint = self._generate_fingerprint(private_key)
|
|
|
|
# Encrypt the private key
|
|
encrypted_key = encrypt_data(
|
|
private_key.encode('utf-8'),
|
|
self.settings.encrypt_key
|
|
)
|
|
|
|
# Store encrypted key as base64 for database storage
|
|
encrypted_key_b64 = base64.b64encode(encrypted_key).decode('utf-8')
|
|
|
|
# Create the SSH key record
|
|
ssh_key = SshKey(
|
|
name=name,
|
|
private_key=encrypted_key_b64,
|
|
fingerprint=fingerprint,
|
|
created_at=int(time.time())
|
|
)
|
|
|
|
self.db.add(ssh_key)
|
|
self.db.commit()
|
|
self.db.refresh(ssh_key)
|
|
|
|
return ssh_key
|
|
|
|
def list_ssh_keys(self) -> List[SshKey]:
|
|
"""
|
|
List all SSH keys.
|
|
|
|
Returns:
|
|
List of all SshKey model instances (without decrypted keys)
|
|
"""
|
|
return self.db.query(SshKey).all()
|
|
|
|
def get_ssh_key(self, key_id: int) -> Optional[SshKey]:
|
|
"""
|
|
Get an SSH key by ID.
|
|
|
|
Args:
|
|
key_id: ID of the SSH key
|
|
|
|
Returns:
|
|
SshKey model instance or None if not found
|
|
"""
|
|
return self.db.query(SshKey).filter_by(id=key_id).first()
|
|
|
|
def delete_ssh_key(self, key_id: int) -> bool:
|
|
"""
|
|
Delete an SSH key.
|
|
|
|
Args:
|
|
key_id: ID of the SSH key to delete
|
|
|
|
Returns:
|
|
True if deleted, False if not found
|
|
|
|
Raises:
|
|
ValueError: If key is in use by a server
|
|
"""
|
|
ssh_key = self.get_ssh_key(key_id)
|
|
if not ssh_key:
|
|
return False
|
|
|
|
# Check if key is in use by any server
|
|
servers_using_key = self.db.query(Server).filter_by(ssh_key_id=key_id).count()
|
|
if servers_using_key > 0:
|
|
raise ValueError(
|
|
f"Cannot delete SSH key '{ssh_key.name}'. "
|
|
f"It is in use by {servers_using_key} server(s)."
|
|
)
|
|
|
|
self.db.delete(ssh_key)
|
|
self.db.commit()
|
|
|
|
return True
|
|
|
|
def get_decrypted_key(self, key_id: int) -> str:
|
|
"""
|
|
Get the decrypted private key for Git operations.
|
|
|
|
Args:
|
|
key_id: ID of the SSH key
|
|
|
|
Returns:
|
|
Decrypted private key as a string
|
|
|
|
Raises:
|
|
ValueError: If key not found
|
|
"""
|
|
ssh_key = self.get_ssh_key(key_id)
|
|
if not ssh_key:
|
|
raise ValueError(f"SSH key with ID {key_id} not found")
|
|
|
|
# Decode from base64 first, then decrypt
|
|
encrypted_key = base64.b64decode(ssh_key.private_key.encode('utf-8'))
|
|
decrypted = decrypt_data(
|
|
encrypted_key,
|
|
self.settings.encrypt_key
|
|
)
|
|
|
|
return decrypted.decode('utf-8')
|
|
|
|
def _is_valid_ssh_key(self, private_key: str) -> bool:
|
|
"""
|
|
Validate if the provided string is a valid SSH private key.
|
|
|
|
Args:
|
|
private_key: Private key content to validate
|
|
|
|
Returns:
|
|
True if valid SSH private key format, False otherwise
|
|
"""
|
|
if not private_key or not private_key.strip():
|
|
return False
|
|
|
|
# Check for common SSH private key markers
|
|
valid_markers = [
|
|
"-----BEGIN RSA PRIVATE KEY-----",
|
|
"-----BEGIN OPENSSH PRIVATE KEY-----",
|
|
"-----BEGIN DSA PRIVATE KEY-----",
|
|
"-----BEGIN EC PRIVATE KEY-----",
|
|
"-----BEGIN ED25519 PRIVATE KEY-----",
|
|
"-----BEGIN PGP PRIVATE KEY BLOCK-----", # GPG keys
|
|
]
|
|
|
|
private_key_stripped = private_key.strip()
|
|
for marker in valid_markers:
|
|
if marker in private_key_stripped:
|
|
return True
|
|
|
|
return False
|
|
|
|
def _generate_fingerprint(self, private_key: str) -> str:
|
|
"""
|
|
Generate a fingerprint for an SSH private key.
|
|
|
|
For simplicity, we use SHA256 hash of the public key portion.
|
|
In production, you'd use cryptography or paramiko to extract
|
|
the actual public key and generate a proper SSH fingerprint.
|
|
|
|
Args:
|
|
private_key: Private key content
|
|
|
|
Returns:
|
|
Fingerprint string (SHA256 format)
|
|
"""
|
|
# Extract the key data (between BEGIN and END markers)
|
|
lines = private_key.strip().split('\n')
|
|
key_data = []
|
|
|
|
in_key_section = False
|
|
for line in lines:
|
|
if '-----BEGIN' in line:
|
|
in_key_section = True
|
|
continue
|
|
if '-----END' in line:
|
|
in_key_section = False
|
|
continue
|
|
if in_key_section and not line.startswith('---'):
|
|
key_data.append(line.strip())
|
|
|
|
key_content = ''.join(key_data)
|
|
|
|
# Generate SHA256 hash
|
|
sha256_hash = hashlib.sha256(key_content.encode('utf-8')).digest()
|
|
b64_hash = base64.b64encode(sha256_hash).decode('utf-8').rstrip('=')
|
|
|
|
return f"SHA256:{b64_hash}"
|