diff --git a/internal/sync/engine.go b/internal/sync/engine.go new file mode 100644 index 0000000..2dd1831 --- /dev/null +++ b/internal/sync/engine.go @@ -0,0 +1,190 @@ +package sync + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "sync" + "time" + + "gitm/internal/database" + "gitm/internal/gitea" + "gitm/internal/models" +) + +type Engine struct { + maxConcurrent int + mu sync.Mutex + activeTasks map[int64]string +} + +func NewEngine(maxConcurrent int) *Engine { + return &Engine{ + maxConcurrent: maxConcurrent, + activeTasks: make(map[int64]string), + } +} + +func (e *Engine) SyncServer(serverID int64) error { + e.mu.Lock() + if _, active := e.activeTasks[serverID]; active { + e.mu.Unlock() + return fmt.Errorf("sync already in progress for server %d", serverID) + } + taskID := fmt.Sprintf("%d-%d", serverID, time.Now().Unix()) + e.activeTasks[serverID] = taskID + e.mu.Unlock() + + defer func() { + e.mu.Lock() + delete(e.activeTasks, serverID) + e.mu.Unlock() + }() + + server, err := database.GetServer(serverID) + if err != nil || server == nil { + return fmt.Errorf("failed to get server: %w", err) + } + + syncLog := &models.SyncLog{ + ServerID: serverID, + Status: "in_progress", + Message: "Starting sync", + StartedAt: time.Now(), + } + if err := database.CreateSyncLog(syncLog); err != nil { + return fmt.Errorf("failed to create sync log: %w", err) + } + + client, err := gitea.NewClient(server.URL, server.Token) + if err != nil { + e.finishLog(syncLog, "failed", fmt.Sprintf("Failed to create client: %v", err)) + return err + } + + if _, err := client.ValidateToken(); err != nil { + e.finishLog(syncLog, "failed", fmt.Sprintf("Authentication failed: %v", err)) + return err + } + + giteaRepos, err := client.GetAllRepos() + if err != nil { + e.finishLog(syncLog, "failed", fmt.Sprintf("Failed to fetch repos: %v", err)) + return err + } + + reposDir, _ := database.GetSetting("repos_dir") + if reposDir == "" { + reposDir = "./data/repos" + } + serverDir := filepath.Join(reposDir, fmt.Sprintf("server_%d_%s", serverID, server.Name)) + os.MkdirAll(serverDir, 0755) + + successCount := 0 + failedCount := 0 + sem := make(chan struct{}, e.maxConcurrent) + var wg sync.WaitGroup + var resultsMu sync.Mutex + + for _, gr := range giteaRepos { + wg.Add(1) + go func(giteaRepo gitea.GiteaRepo) { + defer wg.Done() + sem <- struct{}{} + defer func() { <-sem }() + + localPath := filepath.Join(serverDir, giteaRepo.FullName+".git") + existingRepo, _ := database.GetRepoByFullName(serverID, giteaRepo.FullName) + + var err error + if existingRepo == nil { + err = e.cloneMirror(giteaRepo.CloneURL, localPath) + if err == nil { + repo := &models.Repo{ + ServerID: serverID, + Name: giteaRepo.Name, + FullName: giteaRepo.FullName, + CloneURL: giteaRepo.CloneURL, + LocalPath: localPath, + SyncStatus: "success", + } + database.CreateRepo(repo) + } + } else { + err = e.fetchMirror(localPath) + if err == nil { + database.UpdateRepoSyncStatus(existingRepo.ID, "success") + } + } + + resultsMu.Lock() + if err != nil { + failedCount++ + failLog := &models.SyncLog{ + ServerID: serverID, + RepoID: getRepoID(serverID, giteaRepo.FullName), + Status: "failed", + Message: err.Error(), + StartedAt: time.Now(), + } + database.CreateSyncLog(failLog) + } else { + successCount++ + } + resultsMu.Unlock() + }(gr) + } + + wg.Wait() + database.UpdateServerLastSync(serverID) + + message := fmt.Sprintf("Sync completed: %d succeeded, %d failed", successCount, failedCount) + e.finishLog(syncLog, "success", message) + return nil +} + +func (e *Engine) cloneMirror(url, path string) error { + if _, err := os.Stat(path); err == nil { + return nil + } + os.MkdirAll(filepath.Dir(path), 0755) + cmd := exec.Command("git", "clone", "--mirror", url, path) + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("clone failed: %w - %s", err, string(output)) + } + return nil +} + +func (e *Engine) fetchMirror(path string) error { + cmd := exec.Command("git", "--git-dir", path, "fetch", "--all", "--prune") + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("fetch failed: %w - %s", err, string(output)) + } + return nil +} + +func (e *Engine) finishLog(log *models.SyncLog, status, message string) { + now := time.Now() + log.Status = status + log.Message = message + log.FinishedAt = &now + database.UpdateSyncLog(log.ID, status, message, log.FinishedAt) +} + +func (e *Engine) IsSyncing(serverID int64) bool { + e.mu.Lock() + defer e.mu.Unlock() + _, active := e.activeTasks[serverID] + return active +} + +func getRepoID(serverID int64, fullName string) *int64 { + repo, _ := database.GetRepoByFullName(serverID, fullName) + if repo != nil { + return &repo.ID + } + return nil +} diff --git a/internal/sync/scheduler.go b/internal/sync/scheduler.go new file mode 100644 index 0000000..0492b44 --- /dev/null +++ b/internal/sync/scheduler.go @@ -0,0 +1,74 @@ +package sync + +import ( + "fmt" + "log" + + "github.com/robfig/cron/v3" + "gitm/internal/database" + "gitm/internal/models" +) + +type Scheduler struct { + cron *cron.Cron + engine *Engine + serverJobs map[int64]cron.EntryID +} + +func NewScheduler(engine *Engine) *Scheduler { + return &Scheduler{ + cron: cron.New(), + engine: engine, + serverJobs: make(map[int64]cron.EntryID), + } +} + +func (s *Scheduler) Start() { + s.cron.Start() + log.Println("Scheduler started") +} + +func (s *Scheduler) Stop() { + s.cron.Stop() + log.Println("Scheduler stopped") +} + +func (s *Scheduler) UpdateServer(server *models.GiteaServer) { + if entryID, exists := s.serverJobs[server.ID]; exists { + s.cron.Remove(entryID) + delete(s.serverJobs, server.ID) + } + if server.SyncInterval <= 0 || server.Status != "active" { + return + } + schedule := fmt.Sprintf("*/%d * * * *", server.SyncInterval) + entryID, err := s.cron.AddFunc(schedule, func() { + log.Printf("Scheduled sync for server %d (%s)", server.ID, server.Name) + if err := s.engine.SyncServer(server.ID); err != nil { + log.Printf("Scheduled sync failed for server %d: %v", server.ID, err) + } + }) + if err != nil { + log.Printf("Failed to schedule for server %d: %v", server.ID, err) + return + } + s.serverJobs[server.ID] = entryID +} + +func (s *Scheduler) RemoveServer(serverID int64) { + if entryID, exists := s.serverJobs[serverID]; exists { + s.cron.Remove(entryID) + delete(s.serverJobs, serverID) + } +} + +func (s *Scheduler) ReloadAll() error { + servers, err := database.GetServers() + if err != nil { + return err + } + for _, server := range servers { + s.UpdateServer(&server) + } + return nil +}