"""
Sync Service.

Handles Git operations for repository mirroring including:
- Cloning repositories with SSH authentication
- Fetching updates from mirrored repositories
- Counting commits in repositories
- Retrieving commit history
"""

import os
import shlex
import subprocess
import tempfile
import time
from pathlib import Path
from typing import Dict, List, Optional, Union

from sqlalchemy.orm import Session

from app.models.repo import Repo

# Field separator used when parsing ``git log`` output. The ASCII unit
# separator is used instead of a printable character such as "|" because
# commit subjects and author names can legitimately contain printable
# punctuation, which would corrupt the field split.
_FIELD_SEP = "\x1f"


class SyncService:
    """
    Service for managing Git repository synchronization.

    Handles clone and fetch operations with SSH key authentication.
    """

    def __init__(self, db: Session):
        """
        Initialize the service with a database session.

        Args:
            db: SQLAlchemy database session
        """
        self.db = db

    def sync_repo(self, repo: Repo, ssh_key_content: str) -> None:
        """
        Synchronize a repository by cloning or fetching.

        If the repository doesn't exist locally, clone it.
        If it exists, fetch all updates.

        Args:
            repo: Repo model instance
            ssh_key_content: SSH private key content for authentication

        Raises:
            subprocess.CalledProcessError: If the clone or fetch command
                exits with a non-zero status
            OSError: If the temporary SSH key file cannot be created
        """
        local_path = Path(repo.local_path)

        # Record that a sync has started before running any git command.
        # NOTE(review): this method never moves the status past "syncing",
        # on success or on failure -- presumably the caller records the
        # final status; confirm.
        repo.status = "syncing"
        repo.last_sync_at = int(time.time())
        self.db.commit()

        if local_path.exists():
            # Repository exists, fetch updates
            self._fetch_repo(str(local_path), ssh_key_content)
        else:
            # Repository doesn't exist, clone it
            self._clone_repo(repo.clone_url, str(local_path), ssh_key_content)

    def _run_git_with_ssh_key(
        self, git_cmd: List[str], ssh_key: str
    ) -> subprocess.CompletedProcess:
        """
        Run a git command with ``GIT_SSH_COMMAND`` pointing at a temporary key.

        The key is written to a temporary file that is removed again once the
        command has finished, whether it succeeded or not.

        Args:
            git_cmd: Full git argument vector to execute
            ssh_key: SSH private key content for authentication

        Returns:
            The completed process, with stdout/stderr captured as text

        Raises:
            subprocess.CalledProcessError: If git exits with a non-zero status
            OSError: If the temporary SSH key file cannot be created or written
        """
        # delete=False because ssh has to open the file by name after we
        # close it; removal is handled explicitly in the ``finally`` below.
        key_file = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.key')
        key_file_path = key_file.name

        try:
            # The write happens inside the try so that a failed write still
            # removes the (partially written) private key from disk.
            with key_file:
                key_file.write(ssh_key)

            # Set appropriate permissions for SSH key
            os.chmod(key_file_path, 0o600)

            # GIT_SSH_COMMAND is interpreted by the shell, so the key path is
            # quoted in case the temp directory contains spaces or shell
            # metacharacters.
            # NOTE(review): host key verification is disabled here
            # (StrictHostKeyChecking=no); confirm this is intended.
            ssh_cmd = (
                f'ssh -i {shlex.quote(key_file_path)} '
                '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'
            )

            env = os.environ.copy()
            env['GIT_SSH_COMMAND'] = ssh_cmd

            return subprocess.run(
                git_cmd,
                env=env,
                capture_output=True,
                text=True,
                check=True
            )
        finally:
            # Best-effort cleanup of the temporary SSH key file; a failure to
            # remove it must not mask the outcome of the git command.
            try:
                os.unlink(key_file_path)
            except OSError:
                pass

    def _clone_repo(
        self, clone_url: str, local_path: str, ssh_key: str
    ) -> subprocess.CompletedProcess:
        """
        Clone a repository using git clone --mirror.

        Args:
            clone_url: Git clone URL (SSH format)
            local_path: Local path where repo should be cloned
            ssh_key: SSH private key content for authentication

        Returns:
            The completed ``git clone`` process

        Raises:
            subprocess.CalledProcessError: If git clone fails
            OSError: If unable to create temporary SSH key file
        """
        git_cmd = [
            'git',
            'clone',
            '--mirror',
            # End of options: keeps a URL or path starting with "-" from
            # being parsed as a git option.
            '--',
            clone_url,
            local_path
        ]
        return self._run_git_with_ssh_key(git_cmd, ssh_key)

    def _fetch_repo(self, local_path: str, ssh_key: str) -> subprocess.CompletedProcess:
        """
        Fetch all updates for an existing repository.

        Args:
            local_path: Local path to the repository
            ssh_key: SSH private key content for authentication

        Returns:
            The completed ``git fetch`` process

        Raises:
            subprocess.CalledProcessError: If git fetch fails
            OSError: If unable to create temporary SSH key file
        """
        git_cmd = [
            'git',
            '--git-dir', local_path,
            'fetch',
            '--all'
        ]
        return self._run_git_with_ssh_key(git_cmd, ssh_key)

    def _count_commits(self, repo_path: str) -> int:
        """
        Count the number of commits in a repository.

        Args:
            repo_path: Path to the repository

        Returns:
            Number of commits, or 0 if counting fails
        """
        git_cmd = [
            'git',
            '--git-dir', repo_path,
            'rev-list',
            '--all',
            '--count'
        ]
        try:
            result = subprocess.run(
                git_cmd,
                capture_output=True,
                text=True,
                check=True
            )
            return int(result.stdout.strip())
        except (subprocess.CalledProcessError, ValueError):
            # git failed or printed something that is not a number
            return 0

    def get_repo_commits(
        self, repo: Repo, limit: int = 100
    ) -> List[Dict[str, Union[str, int]]]:
        """
        Get commit history for a repository.

        Args:
            repo: Repo model instance
            limit: Maximum number of commits to return

        Returns:
            List of commit dictionaries containing:
            - hash: Commit SHA
            - message: Commit subject line
            - author: Author name
            - email: Author email
            - date: Commit timestamp (Unix timestamp, int)

            An empty list is returned when the local path does not exist or
            ``git log`` exits with a non-zero status.
        """
        repo_path = Path(repo.local_path)
        if not repo_path.exists():
            return []

        git_cmd = [
            'git',
            '--git-dir', str(repo_path),
            'log',
            '--all',
            f'--max-count={limit}',
            # %x1f makes git emit the unit-separator byte between fields, so
            # a "|" (or any other printable character) inside a subject or
            # author name can no longer break the split below.
            '--format=%H%x1f%s%x1f%an%x1f%ae%x1f%ct'
        ]

        try:
            result = subprocess.run(
                git_cmd,
                capture_output=True,
                text=True,
                # Commit metadata is not guaranteed to be valid in the
                # locale encoding; substitute undecodable bytes instead of
                # raising UnicodeDecodeError for the whole history.
                errors='replace',
                check=True
            )
        except subprocess.CalledProcessError:
            return []

        commits = []
        # Split on "\n" explicitly and do not strip(): Python treats the
        # unit separator as whitespace, so stripping could eat empty
        # leading/trailing fields.
        for line in result.stdout.split('\n'):
            parts = line.split(_FIELD_SEP)
            if len(parts) != 5:
                # Blank line or unexpected output; skip it
                continue
            commit_hash, subject, author, email, timestamp = parts
            commits.append({
                'hash': commit_hash,
                'message': subject,
                'author': author,
                'email': email,
                'date': int(timestamp)
            })
        return commits