feat: implement sync engine and scheduler
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
190
internal/sync/engine.go
Normal file
190
internal/sync/engine.go
Normal file
@@ -0,0 +1,190 @@
|
|||||||
|
package sync
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitm/internal/database"
|
||||||
|
"gitm/internal/gitea"
|
||||||
|
"gitm/internal/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Engine struct {
|
||||||
|
maxConcurrent int
|
||||||
|
mu sync.Mutex
|
||||||
|
activeTasks map[int64]string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewEngine(maxConcurrent int) *Engine {
|
||||||
|
return &Engine{
|
||||||
|
maxConcurrent: maxConcurrent,
|
||||||
|
activeTasks: make(map[int64]string),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Engine) SyncServer(serverID int64) error {
|
||||||
|
e.mu.Lock()
|
||||||
|
if _, active := e.activeTasks[serverID]; active {
|
||||||
|
e.mu.Unlock()
|
||||||
|
return fmt.Errorf("sync already in progress for server %d", serverID)
|
||||||
|
}
|
||||||
|
taskID := fmt.Sprintf("%d-%d", serverID, time.Now().Unix())
|
||||||
|
e.activeTasks[serverID] = taskID
|
||||||
|
e.mu.Unlock()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
e.mu.Lock()
|
||||||
|
delete(e.activeTasks, serverID)
|
||||||
|
e.mu.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
|
server, err := database.GetServer(serverID)
|
||||||
|
if err != nil || server == nil {
|
||||||
|
return fmt.Errorf("failed to get server: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
syncLog := &models.SyncLog{
|
||||||
|
ServerID: serverID,
|
||||||
|
Status: "in_progress",
|
||||||
|
Message: "Starting sync",
|
||||||
|
StartedAt: time.Now(),
|
||||||
|
}
|
||||||
|
if err := database.CreateSyncLog(syncLog); err != nil {
|
||||||
|
return fmt.Errorf("failed to create sync log: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
client, err := gitea.NewClient(server.URL, server.Token)
|
||||||
|
if err != nil {
|
||||||
|
e.finishLog(syncLog, "failed", fmt.Sprintf("Failed to create client: %v", err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := client.ValidateToken(); err != nil {
|
||||||
|
e.finishLog(syncLog, "failed", fmt.Sprintf("Authentication failed: %v", err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
giteaRepos, err := client.GetAllRepos()
|
||||||
|
if err != nil {
|
||||||
|
e.finishLog(syncLog, "failed", fmt.Sprintf("Failed to fetch repos: %v", err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
reposDir, _ := database.GetSetting("repos_dir")
|
||||||
|
if reposDir == "" {
|
||||||
|
reposDir = "./data/repos"
|
||||||
|
}
|
||||||
|
serverDir := filepath.Join(reposDir, fmt.Sprintf("server_%d_%s", serverID, server.Name))
|
||||||
|
os.MkdirAll(serverDir, 0755)
|
||||||
|
|
||||||
|
successCount := 0
|
||||||
|
failedCount := 0
|
||||||
|
sem := make(chan struct{}, e.maxConcurrent)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var resultsMu sync.Mutex
|
||||||
|
|
||||||
|
for _, gr := range giteaRepos {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(giteaRepo gitea.GiteaRepo) {
|
||||||
|
defer wg.Done()
|
||||||
|
sem <- struct{}{}
|
||||||
|
defer func() { <-sem }()
|
||||||
|
|
||||||
|
localPath := filepath.Join(serverDir, giteaRepo.FullName+".git")
|
||||||
|
existingRepo, _ := database.GetRepoByFullName(serverID, giteaRepo.FullName)
|
||||||
|
|
||||||
|
var err error
|
||||||
|
if existingRepo == nil {
|
||||||
|
err = e.cloneMirror(giteaRepo.CloneURL, localPath)
|
||||||
|
if err == nil {
|
||||||
|
repo := &models.Repo{
|
||||||
|
ServerID: serverID,
|
||||||
|
Name: giteaRepo.Name,
|
||||||
|
FullName: giteaRepo.FullName,
|
||||||
|
CloneURL: giteaRepo.CloneURL,
|
||||||
|
LocalPath: localPath,
|
||||||
|
SyncStatus: "success",
|
||||||
|
}
|
||||||
|
database.CreateRepo(repo)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = e.fetchMirror(localPath)
|
||||||
|
if err == nil {
|
||||||
|
database.UpdateRepoSyncStatus(existingRepo.ID, "success")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
resultsMu.Lock()
|
||||||
|
if err != nil {
|
||||||
|
failedCount++
|
||||||
|
failLog := &models.SyncLog{
|
||||||
|
ServerID: serverID,
|
||||||
|
RepoID: getRepoID(serverID, giteaRepo.FullName),
|
||||||
|
Status: "failed",
|
||||||
|
Message: err.Error(),
|
||||||
|
StartedAt: time.Now(),
|
||||||
|
}
|
||||||
|
database.CreateSyncLog(failLog)
|
||||||
|
} else {
|
||||||
|
successCount++
|
||||||
|
}
|
||||||
|
resultsMu.Unlock()
|
||||||
|
}(gr)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
database.UpdateServerLastSync(serverID)
|
||||||
|
|
||||||
|
message := fmt.Sprintf("Sync completed: %d succeeded, %d failed", successCount, failedCount)
|
||||||
|
e.finishLog(syncLog, "success", message)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Engine) cloneMirror(url, path string) error {
|
||||||
|
if _, err := os.Stat(path); err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
os.MkdirAll(filepath.Dir(path), 0755)
|
||||||
|
cmd := exec.Command("git", "clone", "--mirror", url, path)
|
||||||
|
output, err := cmd.CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("clone failed: %w - %s", err, string(output))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Engine) fetchMirror(path string) error {
|
||||||
|
cmd := exec.Command("git", "--git-dir", path, "fetch", "--all", "--prune")
|
||||||
|
output, err := cmd.CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("fetch failed: %w - %s", err, string(output))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Engine) finishLog(log *models.SyncLog, status, message string) {
|
||||||
|
now := time.Now()
|
||||||
|
log.Status = status
|
||||||
|
log.Message = message
|
||||||
|
log.FinishedAt = &now
|
||||||
|
database.UpdateSyncLog(log.ID, status, message, log.FinishedAt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Engine) IsSyncing(serverID int64) bool {
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
_, active := e.activeTasks[serverID]
|
||||||
|
return active
|
||||||
|
}
|
||||||
|
|
||||||
|
func getRepoID(serverID int64, fullName string) *int64 {
|
||||||
|
repo, _ := database.GetRepoByFullName(serverID, fullName)
|
||||||
|
if repo != nil {
|
||||||
|
return &repo.ID
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
74
internal/sync/scheduler.go
Normal file
74
internal/sync/scheduler.go
Normal file
@@ -0,0 +1,74 @@
|
|||||||
|
package sync
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/robfig/cron/v3"
|
||||||
|
"gitm/internal/database"
|
||||||
|
"gitm/internal/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Scheduler struct {
|
||||||
|
cron *cron.Cron
|
||||||
|
engine *Engine
|
||||||
|
serverJobs map[int64]cron.EntryID
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewScheduler(engine *Engine) *Scheduler {
|
||||||
|
return &Scheduler{
|
||||||
|
cron: cron.New(),
|
||||||
|
engine: engine,
|
||||||
|
serverJobs: make(map[int64]cron.EntryID),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) Start() {
|
||||||
|
s.cron.Start()
|
||||||
|
log.Println("Scheduler started")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) Stop() {
|
||||||
|
s.cron.Stop()
|
||||||
|
log.Println("Scheduler stopped")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) UpdateServer(server *models.GiteaServer) {
|
||||||
|
if entryID, exists := s.serverJobs[server.ID]; exists {
|
||||||
|
s.cron.Remove(entryID)
|
||||||
|
delete(s.serverJobs, server.ID)
|
||||||
|
}
|
||||||
|
if server.SyncInterval <= 0 || server.Status != "active" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
schedule := fmt.Sprintf("*/%d * * * *", server.SyncInterval)
|
||||||
|
entryID, err := s.cron.AddFunc(schedule, func() {
|
||||||
|
log.Printf("Scheduled sync for server %d (%s)", server.ID, server.Name)
|
||||||
|
if err := s.engine.SyncServer(server.ID); err != nil {
|
||||||
|
log.Printf("Scheduled sync failed for server %d: %v", server.ID, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to schedule for server %d: %v", server.ID, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.serverJobs[server.ID] = entryID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) RemoveServer(serverID int64) {
|
||||||
|
if entryID, exists := s.serverJobs[serverID]; exists {
|
||||||
|
s.cron.Remove(entryID)
|
||||||
|
delete(s.serverJobs, serverID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) ReloadAll() error {
|
||||||
|
servers, err := database.GetServers()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, server := range servers {
|
||||||
|
s.UpdateServer(&server)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user