""" SSH Key Service. Business logic for SSH key management including: - Creating SSH keys with encryption - Listing and retrieving SSH keys - Deleting SSH keys (with usage check) - Decrypting private keys for Git operations """ import time import hashlib import base64 from typing import List, Optional from sqlalchemy.orm import Session from app.models.ssh_key import SshKey from app.models.server import Server from app.security import encrypt_data, decrypt_data from app.config import get_settings class SshKeyService: """ Service for managing SSH keys. Handles encryption, decryption, and validation of SSH private keys. """ def __init__(self, db: Session): """ Initialize the service with a database session. Args: db: SQLAlchemy database session """ self.db = db self.settings = get_settings() def create_ssh_key(self, name: str, private_key: str, password: Optional[str] = None) -> SshKey: """ Create a new SSH key with encryption. Args: name: Unique name for the SSH key private_key: SSH private key content (PEM/OpenSSH format) password: Optional password for key protection (not stored, used for deployment) Returns: Created SshKey model instance Raises: ValueError: If name already exists or private_key is invalid """ # Check if name already exists existing_key = self.db.query(SshKey).filter_by(name=name).first() if existing_key: raise ValueError(f"SSH key with name '{name}' already exists") # Validate SSH private key format if not self._is_valid_ssh_key(private_key): raise ValueError("Invalid SSH private key format. Must be a valid PEM or OpenSSH private key.") # Generate fingerprint fingerprint = self._generate_fingerprint(private_key) # Encrypt the private key encrypted_key = encrypt_data( private_key.encode('utf-8'), self.settings.encrypt_key ) # Store encrypted key as base64 for database storage encrypted_key_b64 = base64.b64encode(encrypted_key).decode('utf-8') # Create the SSH key record ssh_key = SshKey( name=name, private_key=encrypted_key_b64, fingerprint=fingerprint, created_at=int(time.time()) ) self.db.add(ssh_key) self.db.commit() self.db.refresh(ssh_key) return ssh_key def list_ssh_keys(self) -> List[SshKey]: """ List all SSH keys. Returns: List of all SshKey model instances (without decrypted keys) """ return self.db.query(SshKey).all() def get_ssh_key(self, key_id: int) -> Optional[SshKey]: """ Get an SSH key by ID. Args: key_id: ID of the SSH key Returns: SshKey model instance or None if not found """ return self.db.query(SshKey).filter_by(id=key_id).first() def delete_ssh_key(self, key_id: int) -> bool: """ Delete an SSH key. Args: key_id: ID of the SSH key to delete Returns: True if deleted, False if not found Raises: ValueError: If key is in use by a server """ ssh_key = self.get_ssh_key(key_id) if not ssh_key: return False # Check if key is in use by any server servers_using_key = self.db.query(Server).filter_by(ssh_key_id=key_id).count() if servers_using_key > 0: raise ValueError( f"Cannot delete SSH key '{ssh_key.name}'. " f"It is in use by {servers_using_key} server(s)." ) self.db.delete(ssh_key) self.db.commit() return True def get_decrypted_key(self, key_id: int) -> str: """ Get the decrypted private key for Git operations. Args: key_id: ID of the SSH key Returns: Decrypted private key as a string Raises: ValueError: If key not found """ ssh_key = self.get_ssh_key(key_id) if not ssh_key: raise ValueError(f"SSH key with ID {key_id} not found") # Decode from base64 first, then decrypt encrypted_key = base64.b64decode(ssh_key.private_key.encode('utf-8')) decrypted = decrypt_data( encrypted_key, self.settings.encrypt_key ) return decrypted.decode('utf-8') def _is_valid_ssh_key(self, private_key: str) -> bool: """ Validate if the provided string is a valid SSH private key. Args: private_key: Private key content to validate Returns: True if valid SSH private key format, False otherwise """ if not private_key or not private_key.strip(): return False # Check for common SSH private key markers valid_markers = [ "-----BEGIN RSA PRIVATE KEY-----", "-----BEGIN OPENSSH PRIVATE KEY-----", "-----BEGIN DSA PRIVATE KEY-----", "-----BEGIN EC PRIVATE KEY-----", "-----BEGIN ED25519 PRIVATE KEY-----", "-----BEGIN PGP PRIVATE KEY BLOCK-----", # GPG keys ] private_key_stripped = private_key.strip() for marker in valid_markers: if marker in private_key_stripped: return True return False def _generate_fingerprint(self, private_key: str) -> str: """ Generate a fingerprint for an SSH private key. For simplicity, we use SHA256 hash of the public key portion. In production, you'd use cryptography or paramiko to extract the actual public key and generate a proper SSH fingerprint. Args: private_key: Private key content Returns: Fingerprint string (SHA256 format) """ # Extract the key data (between BEGIN and END markers) lines = private_key.strip().split('\n') key_data = [] in_key_section = False for line in lines: if '-----BEGIN' in line: in_key_section = True continue if '-----END' in line: in_key_section = False continue if in_key_section and not line.startswith('---'): key_data.append(line.strip()) key_content = ''.join(key_data) # Generate SHA256 hash sha256_hash = hashlib.sha256(key_content.encode('utf-8')).digest() b64_hash = base64.b64encode(sha256_hash).decode('utf-8').rstrip('=') return f"SHA256:{b64_hash}"