Files
GitM/internal/sync/engine.go
panw 5eff309a9f feat: 添加同步进度显示和中文界面支持
refactor: 重构同步引擎以支持进度跟踪
style: 更新前端界面为中文
docs: 更新README为中文文档
2026-04-01 10:43:51 +08:00

309 lines
7.9 KiB
Go

package sync
import (
"fmt"
"net/url"
"os"
"os/exec"
"path/filepath"
"sync"
"time"
"gitm/internal/database"
"gitm/internal/gitea"
"gitm/internal/models"
)
type RepoProgress struct {
FullName string `json:"full_name"`
Action string `json:"action"` // "cloning" or "fetching"
}
type ServerProgress struct {
ServerID int64 `json:"server_id"`
Total int `json:"total"`
Completed int `json:"completed"`
SuccessCount int `json:"success_count"`
FailedCount int `json:"failed_count"`
Current *RepoProgress `json:"current"`
Status string `json:"status"` // "syncing" or "idle"
}
type Engine struct {
maxConcurrent int
mu sync.Mutex
activeTasks map[int64]string
progress map[int64]*ServerProgress
progressMu sync.RWMutex
}
func NewEngine(maxConcurrent int) *Engine {
return &Engine{
maxConcurrent: maxConcurrent,
activeTasks: make(map[int64]string),
progress: make(map[int64]*ServerProgress),
}
}
func (e *Engine) GetProgress(serverID int64) *ServerProgress {
e.progressMu.RLock()
defer e.progressMu.RUnlock()
if p, ok := e.progress[serverID]; ok {
return p
}
return &ServerProgress{ServerID: serverID, Status: "idle"}
}
func (e *Engine) setProgress(serverID int64, p *ServerProgress) {
e.progressMu.Lock()
defer e.progressMu.Unlock()
e.progress[serverID] = p
}
func (e *Engine) updateProgressCounts(serverID int64, successDelta, failedDelta int) {
e.progressMu.Lock()
defer e.progressMu.Unlock()
if p, ok := e.progress[serverID]; ok {
p.SuccessCount += successDelta
p.FailedCount += failedDelta
p.Completed = p.SuccessCount + p.FailedCount
}
}
func (e *Engine) setCurrentRepo(serverID int64, fullName, action string) {
e.progressMu.Lock()
defer e.progressMu.Unlock()
if p, ok := e.progress[serverID]; ok {
p.Current = &RepoProgress{FullName: fullName, Action: action}
}
}
func (e *Engine) clearCurrentRepo(serverID int64) {
e.progressMu.Lock()
defer e.progressMu.Unlock()
if p, ok := e.progress[serverID]; ok {
p.Current = nil
}
}
func (e *Engine) SyncServer(serverID int64) error {
e.mu.Lock()
if _, active := e.activeTasks[serverID]; active {
e.mu.Unlock()
return fmt.Errorf("sync already in progress for server %d", serverID)
}
taskID := fmt.Sprintf("%d-%d", serverID, time.Now().Unix())
e.activeTasks[serverID] = taskID
e.mu.Unlock()
defer func() {
e.mu.Lock()
delete(e.activeTasks, serverID)
e.mu.Unlock()
}()
server, err := database.GetServer(serverID)
if err != nil || server == nil {
return fmt.Errorf("failed to get server: %w", err)
}
// Initialize progress
e.setProgress(serverID, &ServerProgress{
ServerID: serverID,
Total: 0,
Status: "syncing",
})
syncLog := &models.SyncLog{
ServerID: serverID,
Status: "in_progress",
Message: "开始同步",
StartedAt: time.Now(),
}
if err := database.CreateSyncLog(syncLog); err != nil {
return fmt.Errorf("failed to create sync log: %w", err)
}
e.setCurrentRepo(serverID, "", "验证认证中")
client, err := gitea.NewClient(server.URL, server.Token)
if err != nil {
e.finishLog(syncLog, "failed", fmt.Sprintf("创建客户端失败: %v", err))
e.setProgress(serverID, &ServerProgress{ServerID: serverID, Status: "idle"})
return err
}
if _, err := client.ValidateToken(); err != nil {
e.finishLog(syncLog, "failed", fmt.Sprintf("认证失败: %v", err))
e.setProgress(serverID, &ServerProgress{ServerID: serverID, Status: "idle"})
return err
}
e.setCurrentRepo(serverID, "", "获取仓库列表中")
giteaRepos, err := client.GetAllRepos()
if err != nil {
e.finishLog(syncLog, "failed", fmt.Sprintf("获取仓库列表失败: %v", err))
e.setProgress(serverID, &ServerProgress{ServerID: serverID, Status: "idle"})
return err
}
// Update total count
e.progressMu.Lock()
if p, ok := e.progress[serverID]; ok {
p.Total = len(giteaRepos)
}
e.progressMu.Unlock()
reposDir, _ := database.GetSetting("repos_dir")
if reposDir == "" {
reposDir = "./data/repos"
}
serverDir := filepath.Join(reposDir, fmt.Sprintf("server_%d_%s", serverID, server.Name))
os.MkdirAll(serverDir, 0755)
sem := make(chan struct{}, e.maxConcurrent)
var wg sync.WaitGroup
for _, gr := range giteaRepos {
wg.Add(1)
go func(giteaRepo gitea.GiteaRepo) {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
localPath := filepath.Join(serverDir, giteaRepo.FullName+".git")
existingRepo, _ := database.GetRepoByFullName(serverID, giteaRepo.FullName)
var err error
if existingRepo == nil {
e.setCurrentRepo(serverID, giteaRepo.FullName, "克隆")
err = e.cloneMirror(giteaRepo.CloneURL, server.Token, localPath)
if err == nil {
repo := &models.Repo{
ServerID: serverID,
Name: giteaRepo.Name,
FullName: giteaRepo.FullName,
CloneURL: giteaRepo.CloneURL,
LocalPath: localPath,
SyncStatus: "success",
}
database.CreateRepo(repo)
}
} else {
if _, statErr := os.Stat(localPath); statErr != nil {
e.setCurrentRepo(serverID, giteaRepo.FullName, "克隆")
err = e.cloneMirror(giteaRepo.CloneURL, server.Token, localPath)
if err == nil {
existingRepo.LocalPath = localPath
database.UpdateRepoSyncStatus(existingRepo.ID, "success")
}
} else {
e.setCurrentRepo(serverID, giteaRepo.FullName, "同步")
err = e.fetchMirror(localPath)
if err == nil {
database.UpdateRepoSyncStatus(existingRepo.ID, "success")
}
}
}
e.clearCurrentRepo(serverID)
if err != nil {
e.updateProgressCounts(serverID, 0, 1)
failLog := &models.SyncLog{
ServerID: serverID,
RepoID: getRepoID(serverID, giteaRepo.FullName),
Status: "failed",
Message: err.Error(),
StartedAt: time.Now(),
}
database.CreateSyncLog(failLog)
} else {
e.updateProgressCounts(serverID, 1, 0)
}
}(gr)
}
wg.Wait()
database.UpdateServerLastSync(serverID)
// Get final counts for message
p := e.GetProgress(serverID)
message := fmt.Sprintf("同步完成: %d 成功, %d 失败", p.SuccessCount, p.FailedCount)
e.finishLog(syncLog, "success", message)
// Mark as idle but keep final progress visible briefly
e.progressMu.Lock()
finalP := e.progress[serverID]
if finalP != nil {
finalP.Current = nil
finalP.Status = "completed"
}
e.progressMu.Unlock()
// Clear after 30 seconds
go func() {
time.Sleep(30 * time.Second)
e.progressMu.Lock()
if p, ok := e.progress[serverID]; ok && p.Status == "completed" {
p.Status = "idle"
}
e.progressMu.Unlock()
}()
return nil
}
func (e *Engine) cloneMirror(cloneURL, token, path string) error {
if _, err := os.Stat(path); err == nil {
return nil
}
os.MkdirAll(filepath.Dir(path), 0755)
authURL := injectToken(cloneURL, token)
cmd := exec.Command("git", "clone", "--mirror", authURL, path)
output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("clone failed: %w - %s", err, string(output))
}
return nil
}
func injectToken(rawURL, token string) string {
u, err := url.Parse(rawURL)
if err != nil {
return rawURL
}
u.User = url.UserPassword(token, "x-oauth-basic")
return u.String()
}
func (e *Engine) fetchMirror(path string) error {
cmd := exec.Command("git", "--git-dir", path, "fetch", "--all", "--prune")
output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("fetch failed: %w - %s", err, string(output))
}
return nil
}
func (e *Engine) finishLog(log *models.SyncLog, status, message string) {
now := time.Now()
log.Status = status
log.Message = message
log.FinishedAt = &now
database.UpdateSyncLog(log.ID, status, message, log.FinishedAt)
}
func (e *Engine) IsSyncing(serverID int64) bool {
e.mu.Lock()
defer e.mu.Unlock()
_, active := e.activeTasks[serverID]
return active
}
func getRepoID(serverID int64, fullName string) *int64 {
repo, _ := database.GetRepoByFullName(serverID, fullName)
if repo != nil {
return &repo.ID
}
return nil
}