package sync import ( "fmt" "net/url" "os" "os/exec" "path/filepath" "sync" "time" "gitm/internal/database" "gitm/internal/gitea" "gitm/internal/models" ) type RepoProgress struct { FullName string `json:"full_name"` Action string `json:"action"` // "cloning" or "fetching" } type ServerProgress struct { ServerID int64 `json:"server_id"` Total int `json:"total"` Completed int `json:"completed"` SuccessCount int `json:"success_count"` FailedCount int `json:"failed_count"` Current *RepoProgress `json:"current"` Status string `json:"status"` // "syncing" or "idle" } type Engine struct { maxConcurrent int mu sync.Mutex activeTasks map[int64]string progress map[int64]*ServerProgress progressMu sync.RWMutex } func NewEngine(maxConcurrent int) *Engine { return &Engine{ maxConcurrent: maxConcurrent, activeTasks: make(map[int64]string), progress: make(map[int64]*ServerProgress), } } func (e *Engine) GetProgress(serverID int64) *ServerProgress { e.progressMu.RLock() defer e.progressMu.RUnlock() if p, ok := e.progress[serverID]; ok { return p } return &ServerProgress{ServerID: serverID, Status: "idle"} } func (e *Engine) setProgress(serverID int64, p *ServerProgress) { e.progressMu.Lock() defer e.progressMu.Unlock() e.progress[serverID] = p } func (e *Engine) updateProgressCounts(serverID int64, successDelta, failedDelta int) { e.progressMu.Lock() defer e.progressMu.Unlock() if p, ok := e.progress[serverID]; ok { p.SuccessCount += successDelta p.FailedCount += failedDelta p.Completed = p.SuccessCount + p.FailedCount } } func (e *Engine) setCurrentRepo(serverID int64, fullName, action string) { e.progressMu.Lock() defer e.progressMu.Unlock() if p, ok := e.progress[serverID]; ok { p.Current = &RepoProgress{FullName: fullName, Action: action} } } func (e *Engine) clearCurrentRepo(serverID int64) { e.progressMu.Lock() defer e.progressMu.Unlock() if p, ok := e.progress[serverID]; ok { p.Current = nil } } func (e *Engine) SyncServer(serverID int64) error { e.mu.Lock() if _, active := e.activeTasks[serverID]; active { e.mu.Unlock() return fmt.Errorf("sync already in progress for server %d", serverID) } taskID := fmt.Sprintf("%d-%d", serverID, time.Now().Unix()) e.activeTasks[serverID] = taskID e.mu.Unlock() defer func() { e.mu.Lock() delete(e.activeTasks, serverID) e.mu.Unlock() }() server, err := database.GetServer(serverID) if err != nil || server == nil { return fmt.Errorf("failed to get server: %w", err) } // Initialize progress e.setProgress(serverID, &ServerProgress{ ServerID: serverID, Total: 0, Status: "syncing", }) syncLog := &models.SyncLog{ ServerID: serverID, Status: "in_progress", Message: "开始同步", StartedAt: time.Now(), } if err := database.CreateSyncLog(syncLog); err != nil { return fmt.Errorf("failed to create sync log: %w", err) } e.setCurrentRepo(serverID, "", "验证认证中") client, err := gitea.NewClient(server.URL, server.Token) if err != nil { e.finishLog(syncLog, "failed", fmt.Sprintf("创建客户端失败: %v", err)) e.setProgress(serverID, &ServerProgress{ServerID: serverID, Status: "idle"}) return err } if _, err := client.ValidateToken(); err != nil { e.finishLog(syncLog, "failed", fmt.Sprintf("认证失败: %v", err)) e.setProgress(serverID, &ServerProgress{ServerID: serverID, Status: "idle"}) return err } e.setCurrentRepo(serverID, "", "获取仓库列表中") giteaRepos, err := client.GetAllRepos() if err != nil { e.finishLog(syncLog, "failed", fmt.Sprintf("获取仓库列表失败: %v", err)) e.setProgress(serverID, &ServerProgress{ServerID: serverID, Status: "idle"}) return err } // Update total count e.progressMu.Lock() if p, ok := e.progress[serverID]; ok { p.Total = len(giteaRepos) } e.progressMu.Unlock() reposDir, _ := database.GetSetting("repos_dir") if reposDir == "" { reposDir = "./data/repos" } serverDir := filepath.Join(reposDir, fmt.Sprintf("server_%d_%s", serverID, server.Name)) os.MkdirAll(serverDir, 0755) sem := make(chan struct{}, e.maxConcurrent) var wg sync.WaitGroup for _, gr := range giteaRepos { wg.Add(1) go func(giteaRepo gitea.GiteaRepo) { defer wg.Done() sem <- struct{}{} defer func() { <-sem }() localPath := filepath.Join(serverDir, giteaRepo.FullName+".git") existingRepo, _ := database.GetRepoByFullName(serverID, giteaRepo.FullName) var err error if existingRepo == nil { e.setCurrentRepo(serverID, giteaRepo.FullName, "克隆") err = e.cloneMirror(giteaRepo.CloneURL, server.Token, localPath) if err == nil { repo := &models.Repo{ ServerID: serverID, Name: giteaRepo.Name, FullName: giteaRepo.FullName, CloneURL: giteaRepo.CloneURL, LocalPath: localPath, SyncStatus: "success", } database.CreateRepo(repo) } } else { if _, statErr := os.Stat(localPath); statErr != nil { e.setCurrentRepo(serverID, giteaRepo.FullName, "克隆") err = e.cloneMirror(giteaRepo.CloneURL, server.Token, localPath) if err == nil { existingRepo.LocalPath = localPath database.UpdateRepoSyncStatus(existingRepo.ID, "success") } } else { e.setCurrentRepo(serverID, giteaRepo.FullName, "同步") err = e.fetchMirror(localPath) if err == nil { database.UpdateRepoSyncStatus(existingRepo.ID, "success") } } } e.clearCurrentRepo(serverID) if err != nil { e.updateProgressCounts(serverID, 0, 1) failLog := &models.SyncLog{ ServerID: serverID, RepoID: getRepoID(serverID, giteaRepo.FullName), Status: "failed", Message: err.Error(), StartedAt: time.Now(), } database.CreateSyncLog(failLog) } else { e.updateProgressCounts(serverID, 1, 0) } }(gr) } wg.Wait() database.UpdateServerLastSync(serverID) // Get final counts for message p := e.GetProgress(serverID) message := fmt.Sprintf("同步完成: %d 成功, %d 失败", p.SuccessCount, p.FailedCount) e.finishLog(syncLog, "success", message) // Mark as idle but keep final progress visible briefly e.progressMu.Lock() finalP := e.progress[serverID] if finalP != nil { finalP.Current = nil finalP.Status = "completed" } e.progressMu.Unlock() // Clear after 30 seconds go func() { time.Sleep(30 * time.Second) e.progressMu.Lock() if p, ok := e.progress[serverID]; ok && p.Status == "completed" { p.Status = "idle" } e.progressMu.Unlock() }() return nil } func (e *Engine) cloneMirror(cloneURL, token, path string) error { if _, err := os.Stat(path); err == nil { return nil } os.MkdirAll(filepath.Dir(path), 0755) authURL := injectToken(cloneURL, token) cmd := exec.Command("git", "clone", "--mirror", authURL, path) output, err := cmd.CombinedOutput() if err != nil { return fmt.Errorf("clone failed: %w - %s", err, string(output)) } return nil } func injectToken(rawURL, token string) string { u, err := url.Parse(rawURL) if err != nil { return rawURL } u.User = url.UserPassword(token, "x-oauth-basic") return u.String() } func (e *Engine) fetchMirror(path string) error { cmd := exec.Command("git", "--git-dir", path, "fetch", "--all", "--prune") output, err := cmd.CombinedOutput() if err != nil { return fmt.Errorf("fetch failed: %w - %s", err, string(output)) } return nil } func (e *Engine) finishLog(log *models.SyncLog, status, message string) { now := time.Now() log.Status = status log.Message = message log.FinishedAt = &now database.UpdateSyncLog(log.ID, status, message, log.FinishedAt) } func (e *Engine) IsSyncing(serverID int64) bool { e.mu.Lock() defer e.mu.Unlock() _, active := e.activeTasks[serverID] return active } func getRepoID(serverID int64, fullName string) *int64 { repo, _ := database.GetRepoByFullName(serverID, fullName) if repo != nil { return &repo.ID } return nil }