309 lines
7.9 KiB
Go
309 lines
7.9 KiB
Go
package sync
|
|
|
|
import (
|
|
"fmt"
|
|
"net/url"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"gitm/internal/database"
|
|
"gitm/internal/gitea"
|
|
"gitm/internal/models"
|
|
)
|
|
|
|
type RepoProgress struct {
|
|
FullName string `json:"full_name"`
|
|
Action string `json:"action"` // "cloning" or "fetching"
|
|
}
|
|
|
|
type ServerProgress struct {
|
|
ServerID int64 `json:"server_id"`
|
|
Total int `json:"total"`
|
|
Completed int `json:"completed"`
|
|
SuccessCount int `json:"success_count"`
|
|
FailedCount int `json:"failed_count"`
|
|
Current *RepoProgress `json:"current"`
|
|
Status string `json:"status"` // "syncing" or "idle"
|
|
}
|
|
|
|
type Engine struct {
|
|
maxConcurrent int
|
|
mu sync.Mutex
|
|
activeTasks map[int64]string
|
|
progress map[int64]*ServerProgress
|
|
progressMu sync.RWMutex
|
|
}
|
|
|
|
func NewEngine(maxConcurrent int) *Engine {
|
|
return &Engine{
|
|
maxConcurrent: maxConcurrent,
|
|
activeTasks: make(map[int64]string),
|
|
progress: make(map[int64]*ServerProgress),
|
|
}
|
|
}
|
|
|
|
func (e *Engine) GetProgress(serverID int64) *ServerProgress {
|
|
e.progressMu.RLock()
|
|
defer e.progressMu.RUnlock()
|
|
if p, ok := e.progress[serverID]; ok {
|
|
return p
|
|
}
|
|
return &ServerProgress{ServerID: serverID, Status: "idle"}
|
|
}
|
|
|
|
func (e *Engine) setProgress(serverID int64, p *ServerProgress) {
|
|
e.progressMu.Lock()
|
|
defer e.progressMu.Unlock()
|
|
e.progress[serverID] = p
|
|
}
|
|
|
|
func (e *Engine) updateProgressCounts(serverID int64, successDelta, failedDelta int) {
|
|
e.progressMu.Lock()
|
|
defer e.progressMu.Unlock()
|
|
if p, ok := e.progress[serverID]; ok {
|
|
p.SuccessCount += successDelta
|
|
p.FailedCount += failedDelta
|
|
p.Completed = p.SuccessCount + p.FailedCount
|
|
}
|
|
}
|
|
|
|
func (e *Engine) setCurrentRepo(serverID int64, fullName, action string) {
|
|
e.progressMu.Lock()
|
|
defer e.progressMu.Unlock()
|
|
if p, ok := e.progress[serverID]; ok {
|
|
p.Current = &RepoProgress{FullName: fullName, Action: action}
|
|
}
|
|
}
|
|
|
|
func (e *Engine) clearCurrentRepo(serverID int64) {
|
|
e.progressMu.Lock()
|
|
defer e.progressMu.Unlock()
|
|
if p, ok := e.progress[serverID]; ok {
|
|
p.Current = nil
|
|
}
|
|
}
|
|
|
|
func (e *Engine) SyncServer(serverID int64) error {
|
|
e.mu.Lock()
|
|
if _, active := e.activeTasks[serverID]; active {
|
|
e.mu.Unlock()
|
|
return fmt.Errorf("sync already in progress for server %d", serverID)
|
|
}
|
|
taskID := fmt.Sprintf("%d-%d", serverID, time.Now().Unix())
|
|
e.activeTasks[serverID] = taskID
|
|
e.mu.Unlock()
|
|
|
|
defer func() {
|
|
e.mu.Lock()
|
|
delete(e.activeTasks, serverID)
|
|
e.mu.Unlock()
|
|
}()
|
|
|
|
server, err := database.GetServer(serverID)
|
|
if err != nil || server == nil {
|
|
return fmt.Errorf("failed to get server: %w", err)
|
|
}
|
|
|
|
// Initialize progress
|
|
e.setProgress(serverID, &ServerProgress{
|
|
ServerID: serverID,
|
|
Total: 0,
|
|
Status: "syncing",
|
|
})
|
|
|
|
syncLog := &models.SyncLog{
|
|
ServerID: serverID,
|
|
Status: "in_progress",
|
|
Message: "开始同步",
|
|
StartedAt: time.Now(),
|
|
}
|
|
if err := database.CreateSyncLog(syncLog); err != nil {
|
|
return fmt.Errorf("failed to create sync log: %w", err)
|
|
}
|
|
|
|
e.setCurrentRepo(serverID, "", "验证认证中")
|
|
client, err := gitea.NewClient(server.URL, server.Token)
|
|
if err != nil {
|
|
e.finishLog(syncLog, "failed", fmt.Sprintf("创建客户端失败: %v", err))
|
|
e.setProgress(serverID, &ServerProgress{ServerID: serverID, Status: "idle"})
|
|
return err
|
|
}
|
|
|
|
if _, err := client.ValidateToken(); err != nil {
|
|
e.finishLog(syncLog, "failed", fmt.Sprintf("认证失败: %v", err))
|
|
e.setProgress(serverID, &ServerProgress{ServerID: serverID, Status: "idle"})
|
|
return err
|
|
}
|
|
|
|
e.setCurrentRepo(serverID, "", "获取仓库列表中")
|
|
giteaRepos, err := client.GetAllRepos()
|
|
if err != nil {
|
|
e.finishLog(syncLog, "failed", fmt.Sprintf("获取仓库列表失败: %v", err))
|
|
e.setProgress(serverID, &ServerProgress{ServerID: serverID, Status: "idle"})
|
|
return err
|
|
}
|
|
|
|
// Update total count
|
|
e.progressMu.Lock()
|
|
if p, ok := e.progress[serverID]; ok {
|
|
p.Total = len(giteaRepos)
|
|
}
|
|
e.progressMu.Unlock()
|
|
|
|
reposDir, _ := database.GetSetting("repos_dir")
|
|
if reposDir == "" {
|
|
reposDir = "./data/repos"
|
|
}
|
|
serverDir := filepath.Join(reposDir, fmt.Sprintf("server_%d_%s", serverID, server.Name))
|
|
os.MkdirAll(serverDir, 0755)
|
|
|
|
sem := make(chan struct{}, e.maxConcurrent)
|
|
var wg sync.WaitGroup
|
|
|
|
for _, gr := range giteaRepos {
|
|
wg.Add(1)
|
|
go func(giteaRepo gitea.GiteaRepo) {
|
|
defer wg.Done()
|
|
sem <- struct{}{}
|
|
defer func() { <-sem }()
|
|
|
|
localPath := filepath.Join(serverDir, giteaRepo.FullName+".git")
|
|
existingRepo, _ := database.GetRepoByFullName(serverID, giteaRepo.FullName)
|
|
|
|
var err error
|
|
if existingRepo == nil {
|
|
e.setCurrentRepo(serverID, giteaRepo.FullName, "克隆")
|
|
err = e.cloneMirror(giteaRepo.CloneURL, server.Token, localPath)
|
|
if err == nil {
|
|
repo := &models.Repo{
|
|
ServerID: serverID,
|
|
Name: giteaRepo.Name,
|
|
FullName: giteaRepo.FullName,
|
|
CloneURL: giteaRepo.CloneURL,
|
|
LocalPath: localPath,
|
|
SyncStatus: "success",
|
|
}
|
|
database.CreateRepo(repo)
|
|
}
|
|
} else {
|
|
if _, statErr := os.Stat(localPath); statErr != nil {
|
|
e.setCurrentRepo(serverID, giteaRepo.FullName, "克隆")
|
|
err = e.cloneMirror(giteaRepo.CloneURL, server.Token, localPath)
|
|
if err == nil {
|
|
existingRepo.LocalPath = localPath
|
|
database.UpdateRepoSyncStatus(existingRepo.ID, "success")
|
|
}
|
|
} else {
|
|
e.setCurrentRepo(serverID, giteaRepo.FullName, "同步")
|
|
err = e.fetchMirror(localPath)
|
|
if err == nil {
|
|
database.UpdateRepoSyncStatus(existingRepo.ID, "success")
|
|
}
|
|
}
|
|
}
|
|
|
|
e.clearCurrentRepo(serverID)
|
|
if err != nil {
|
|
e.updateProgressCounts(serverID, 0, 1)
|
|
failLog := &models.SyncLog{
|
|
ServerID: serverID,
|
|
RepoID: getRepoID(serverID, giteaRepo.FullName),
|
|
Status: "failed",
|
|
Message: err.Error(),
|
|
StartedAt: time.Now(),
|
|
}
|
|
database.CreateSyncLog(failLog)
|
|
} else {
|
|
e.updateProgressCounts(serverID, 1, 0)
|
|
}
|
|
}(gr)
|
|
}
|
|
|
|
wg.Wait()
|
|
database.UpdateServerLastSync(serverID)
|
|
|
|
// Get final counts for message
|
|
p := e.GetProgress(serverID)
|
|
message := fmt.Sprintf("同步完成: %d 成功, %d 失败", p.SuccessCount, p.FailedCount)
|
|
e.finishLog(syncLog, "success", message)
|
|
|
|
// Mark as idle but keep final progress visible briefly
|
|
e.progressMu.Lock()
|
|
finalP := e.progress[serverID]
|
|
if finalP != nil {
|
|
finalP.Current = nil
|
|
finalP.Status = "completed"
|
|
}
|
|
e.progressMu.Unlock()
|
|
|
|
// Clear after 30 seconds
|
|
go func() {
|
|
time.Sleep(30 * time.Second)
|
|
e.progressMu.Lock()
|
|
if p, ok := e.progress[serverID]; ok && p.Status == "completed" {
|
|
p.Status = "idle"
|
|
}
|
|
e.progressMu.Unlock()
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *Engine) cloneMirror(cloneURL, token, path string) error {
|
|
if _, err := os.Stat(path); err == nil {
|
|
return nil
|
|
}
|
|
os.MkdirAll(filepath.Dir(path), 0755)
|
|
authURL := injectToken(cloneURL, token)
|
|
cmd := exec.Command("git", "clone", "--mirror", authURL, path)
|
|
output, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
return fmt.Errorf("clone failed: %w - %s", err, string(output))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func injectToken(rawURL, token string) string {
|
|
u, err := url.Parse(rawURL)
|
|
if err != nil {
|
|
return rawURL
|
|
}
|
|
u.User = url.UserPassword(token, "x-oauth-basic")
|
|
return u.String()
|
|
}
|
|
|
|
func (e *Engine) fetchMirror(path string) error {
|
|
cmd := exec.Command("git", "--git-dir", path, "fetch", "--all", "--prune")
|
|
output, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
return fmt.Errorf("fetch failed: %w - %s", err, string(output))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (e *Engine) finishLog(log *models.SyncLog, status, message string) {
|
|
now := time.Now()
|
|
log.Status = status
|
|
log.Message = message
|
|
log.FinishedAt = &now
|
|
database.UpdateSyncLog(log.ID, status, message, log.FinishedAt)
|
|
}
|
|
|
|
func (e *Engine) IsSyncing(serverID int64) bool {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
_, active := e.activeTasks[serverID]
|
|
return active
|
|
}
|
|
|
|
func getRepoID(serverID int64, fullName string) *int64 {
|
|
repo, _ := database.GetRepoByFullName(serverID, fullName)
|
|
if repo != nil {
|
|
return &repo.ID
|
|
}
|
|
return nil
|
|
}
|