Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions builder/rpc/mirror_svc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

type MirrorSvcClient interface {
CancelMirror(ctx context.Context, mirrorID int64) error
CancelMirror(ctx context.Context, taskID int64) error
}

type MirrorSvcClientImpl struct {
Expand All @@ -23,12 +23,12 @@ func NewMirrorSvcClient(endpoint string, opts ...RequestOption) MirrorSvcClient
}
}

func (c *MirrorSvcClientImpl) CancelMirror(ctx context.Context, mirrorID int64) error {
func (c *MirrorSvcClientImpl) CancelMirror(ctx context.Context, taskID int64) error {
type CancelReq struct {
MirrorID int64 `json:"mirror_id"`
TaskID int64 `json:"task_id"`
}
req := CancelReq{
MirrorID: mirrorID,
TaskID: taskID,
}

path := "/api/v1/lfs_sync_internal/cancel"
Expand Down
83 changes: 79 additions & 4 deletions builder/store/database/mirror_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ type mirrorTaskStoreImpl struct {

type MirrorTaskStore interface {
CancelOtherTasksAndCreate(ctx context.Context, task MirrorTask) (MirrorTask, error)
CancelMirrorTaskByID(ctx context.Context, taskID int64) (bool, error)
Create(ctx context.Context, task MirrorTask) (MirrorTask, error)
Update(ctx context.Context, task MirrorTask) (MirrorTask, error)
UpdateStatusAndRepoSyncStatus(ctx context.Context, task MirrorTask, syncStatus types.RepositorySyncStatus) (MirrorTask, error)
UpdateProgress(ctx context.Context, task MirrorTask) (MirrorTask, error)
UpdateStatusAndRepoSyncStatus(ctx context.Context, task MirrorTask, statusAction string) (MirrorTask, error)
FindByMirrorID(ctx context.Context, mirrorID int64) (*MirrorTask, error)
Delete(ctx context.Context, ID int64) error
GetHighestPriorityByTaskStatus(ctx context.Context, status []types.MirrorTaskStatus) (MirrorTask, error)
Expand All @@ -29,6 +31,21 @@ type MirrorTaskStore interface {
ResetRunningTasks(ctx context.Context, fromStatus types.MirrorTaskStatus, toStatus types.MirrorTaskStatus) (int, error)
}

var mirrorTaskStatusToRepoStatusMap = map[types.MirrorTaskStatus]types.RepositorySyncStatus{
types.MirrorQueued: types.SyncStatusPending,
types.MirrorRepoSyncStart: types.SyncStatusInProgress,
types.MirrorRepoSyncFailed: types.SyncStatusFailed,
types.MirrorRepoSyncFinished: types.SyncStatusInProgress,
types.MirrorRepoSyncFatal: types.SyncStatusFailed,
types.MirrorLfsSyncStart: types.SyncStatusInProgress,
types.MirrorLfsSyncFailed: types.SyncStatusFailed,
types.MirrorLfsSyncFinished: types.SyncStatusCompleted,
types.MirrorLfsSyncFatal: types.SyncStatusFailed,
types.MirrorLfsIncomplete: types.SyncStatusFailed,
types.MirrorCanceled: types.SyncStatusCanceled,
types.MirrorRepoTooLarge: types.SyncStatusFailed,
}

func NewMirrorTaskStore() MirrorTaskStore {
return &mirrorTaskStoreImpl{
db: defaultDB,
Expand Down Expand Up @@ -196,6 +213,15 @@ func (m *mirrorTaskStoreImpl) Update(ctx context.Context, task MirrorTask) (Mirr
return task, errorx.HandleDBError(err, nil)
}

func (m *mirrorTaskStoreImpl) UpdateProgress(ctx context.Context, task MirrorTask) (MirrorTask, error) {
_, err := m.db.Operator.Core.NewUpdate().
Model(&task).
Column("progress", "error_message").
WherePK().
Exec(ctx)
return task, errorx.HandleDBError(err, nil)
}

func (m *mirrorTaskStoreImpl) FindByMirrorID(ctx context.Context, mirrorID int64) (*MirrorTask, error) {
var task MirrorTask
err := m.db.Operator.Core.NewSelect().Model(&task).Where("mirror_id = ?", mirrorID).Scan(ctx)
Expand Down Expand Up @@ -277,7 +303,7 @@ func (m *mirrorTaskStoreImpl) ListByStatusWithPriority(ctx context.Context, stat
Relation("Mirror").
Relation("Mirror.Repository").
Where("mirror_task.status IN (?)", bun.In(status)).
OrderExpr("mirror_task.priority DESC, mirror_task.created_at DESC").
OrderExpr("mirror_task.priority DESC, mirror_task.updated_at DESC").
Limit(per).
Offset((page - 1) * per).
Scan(ctx)
Expand Down Expand Up @@ -306,6 +332,33 @@ func (m *mirrorTaskStoreImpl) CancelOtherTasksAndCreate(ctx context.Context, tas
return task, errorx.HandleDBError(err, nil)
}

func (m *mirrorTaskStoreImpl) CancelMirrorTaskByID(ctx context.Context, ID int64) (bool, error) {
var cancelled bool
err := m.db.Operator.Core.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
var task MirrorTask
err := tx.NewSelect().
Model(&task).
Where("id = ?", ID).
For("UPDATE").
Scan(ctx)
if err != nil {
return err
}

tFSM := NewMirrorTaskWithFSM(&task)
if tFSM.SubmitEvent(ctx, MirrorCancel) {
task.Status = types.MirrorTaskStatus(tFSM.Current())
_, err = tx.NewUpdate().Model(&task).WherePK().Exec(ctx)
if err != nil {
return err
}
cancelled = true
}
return nil
})
return cancelled, errorx.HandleDBError(err, nil)
}

func (m *mirrorTaskStoreImpl) ResetRunningTasks(ctx context.Context, fromStatus types.MirrorTaskStatus, toStatus types.MirrorTaskStatus) (int, error) {
var task MirrorTask
result, err := m.db.Operator.Core.NewUpdate().
Expand All @@ -323,10 +376,32 @@ func (m *mirrorTaskStoreImpl) ResetRunningTasks(ctx context.Context, fromStatus
return int(rowsAffected), nil
}

func (m *mirrorTaskStoreImpl) UpdateStatusAndRepoSyncStatus(ctx context.Context, task MirrorTask, syncStatus types.RepositorySyncStatus) (MirrorTask, error) {
func (m *mirrorTaskStoreImpl) UpdateStatusAndRepoSyncStatus(ctx context.Context, task MirrorTask, statusAction string) (MirrorTask, error) {
err := m.db.Operator.Core.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
_, err := tx.NewUpdate().
// Lock the mirror_task row to prevent concurrent status updates
var current MirrorTask
err := tx.NewSelect().
Model(&current).
Where("id = ?", task.ID).
For("UPDATE").
Scan(ctx)
if err != nil {
return err
}

// Validate FSM transition on the locked row to prevent TOCTOU race
tFSM := NewMirrorTaskWithFSM(&current)
if !tFSM.SubmitEvent(ctx, statusAction) {
return fmt.Errorf("mirror task status %s not allow action %s", current.Status, statusAction)
}
task.Status = types.MirrorTaskStatus(tFSM.Current())
syncStatus := mirrorTaskStatusToRepoStatusMap[task.Status]

// Only update status and related fields, avoid overwriting fields changed
// by other processes (e.g. priority)
_, err = tx.NewUpdate().
Model(&task).
Column("status", "error_message", "progress", "updated_at", "retry_count", "before_last_commit_id", "after_last_commit_id").
WherePK().
Exec(ctx)
if err != nil {
Expand Down
Loading
Loading