diff --git a/modules/redis/redis_key/cloudbrain_redis_key.go b/modules/redis/redis_key/cloudbrain_redis_key.go index 4c5d05dfc..a442d89f4 100644 --- a/modules/redis/redis_key/cloudbrain_redis_key.go +++ b/modules/redis/redis_key/cloudbrain_redis_key.go @@ -1,7 +1,13 @@ package redis_key +import "fmt" + const CLOUDBRAIN_PREFIX = "cloudbrain" func CloudbrainBindingJobNameKey(repoId string, jobType string, jobName string) string { return KeyJoin(CLOUDBRAIN_PREFIX, repoId, jobType, jobName, "redis_key") } + +func CloudbrainUniquenessKey(userId int64, jobType string) string { + return KeyJoin(CLOUDBRAIN_PREFIX, fmt.Sprint(userId), jobType, "uniqueness") +} diff --git a/routers/repo/aisafety.go b/routers/repo/aisafety.go index 55f25dba6..64a87af46 100644 --- a/routers/repo/aisafety.go +++ b/routers/repo/aisafety.go @@ -2,6 +2,7 @@ package repo import ( "bufio" + "code.gitea.io/gitea/services/lock" "encoding/json" "errors" "fmt" @@ -575,6 +576,21 @@ func AiSafetyCreateForPost(ctx *context.Context) { tpname = tplCloudBrainModelSafetyNewGpu } + limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User} + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx) + defer func() { + if lockOperator != nil { + lockOperator.Unlock(limiterCtx) + } + }() + + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) + modelSafetyNewDataPrepare(ctx) + ctx.RenderWithErr(ctx.Tr(errMsg), tpname, nil) + return + } + tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeModelSafety), displayJobName) if err == nil { if len(tasks) != 0 { diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index cf6df6312..34b96313d 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -2,6 +2,7 @@ package repo import ( "bufio" + "code.gitea.io/gitea/services/lock" "encoding/json" "errors" "fmt" @@ -228,16 +229,20 @@ func cloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { if jobType == string(models.JobTypeTrain) { tpl = tplCloudBrainTrainJobNew } + limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User} + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx) + defer func() { + if lockOperator != nil { + lockOperator.Unlock(limiterCtx) + } + }() - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName)) - isOk, err := lock.Lock(models.CloudbrainKeyDuration) - if !isOk { - log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) cloudBrainNewDataPrepare(ctx, jobType) - ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form) + ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form) return } - defer lock.UnLock() tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName) if err == nil { @@ -746,7 +751,6 @@ func CloudBrainRestart(ctx *context.Context) { }) } - func hasDatasetDeleted(task *models.Cloudbrain) bool { if task.Uuid == "" { return false @@ -2389,15 +2393,20 @@ func BenchMarkAlgorithmCreate(ctx *context.Context, form auth.CreateCloudBrainFo ctx.Data["benchmarkTypeID"] = benchmarkTypeID ctx.Data["benchmark_child_types_id_hidden"] = benchmarkChildTypeID - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), form.JobType, displayJobName)) - isOk, err := lock.Lock(models.CloudbrainKeyDuration) - if !isOk { - log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User} + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx) + defer func() { + if lockOperator != nil { + lockOperator.Unlock(limiterCtx) + } + }() + + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) cloudBrainNewDataPrepare(ctx, jobType) - ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplCloudBrainBenchmarkNew, &form) + ctx.RenderWithErr(ctx.Tr(errMsg), tplCloudBrainBenchmarkNew, &form) return } - defer lock.UnLock() tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName) if err == nil { diff --git a/routers/repo/grampus.go b/routers/repo/grampus.go index b3f84c169..c9c2e7403 100755 --- a/routers/repo/grampus.go +++ b/routers/repo/grampus.go @@ -1,6 +1,7 @@ package repo import ( + "code.gitea.io/gitea/services/lock" "encoding/json" "errors" "fmt" @@ -119,13 +120,18 @@ func GrampusNotebookCreate(ctx *context.Context, form auth.CreateGrampusNotebook codeStoragePath = grampus.JobPath + jobName + modelarts.CodePath } - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeDebug), displayJobName)) - defer lock.UnLock() - isOk, err := lock.Lock(models.CloudbrainKeyDuration) - if !isOk { - log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User} + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx) + defer func() { + if lockOperator != nil { + lockOperator.Unlock(limiterCtx) + } + }() + + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) grampusNotebookNewDataPrepare(ctx, processType) - ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form) + ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form) return } @@ -1308,7 +1314,7 @@ func GrampusTrainJobShow(ctx *context.Context) { taskList := make([]*models.Cloudbrain, 0) taskList = append(taskList, task) prepareSpec4Show(ctx, task) - + ctx.Data["version_list_task"] = taskList ctx.Data["datasetDownload"] = GetCloudBrainDataSetInfo(task.Uuid, task.DatasetName, false) ctx.Data["canDownload"] = cloudbrain.CanModifyJob(ctx, task) diff --git a/services/cloudbrain/lock.go b/services/cloudbrain/lock.go new file mode 100644 index 000000000..a8c05152d --- /dev/null +++ b/services/cloudbrain/lock.go @@ -0,0 +1,13 @@ +package cloudbrain + +import "code.gitea.io/gitea/services/lock" + +var defaultChain = lock.NewLockChainOperator().Add(lock.CloudbrainUniquenessLock{}).Add(lock.CloudbrainDisplayJobNameLock{}) + +func Lock4CloudbrainCreation(ctx *lock.LockContext) (*lock.LockChainOperator, string) { + errCode := defaultChain.Lock(ctx) + if errCode != "" { + return nil, errCode + } + return defaultChain, "" +} diff --git a/services/lock/cloudbrain_name_lock.go b/services/lock/cloudbrain_name_lock.go new file mode 100644 index 000000000..8ce22b2da --- /dev/null +++ b/services/lock/cloudbrain_name_lock.go @@ -0,0 +1,31 @@ +package lock + +import ( + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/redis/redis_key" + "code.gitea.io/gitea/modules/redis/redis_lock" + "fmt" +) + +type CloudbrainDisplayJobNameLock struct { +} + +func (c CloudbrainDisplayJobNameLock) IsMatch(ctx *LockContext) bool { + return true +} + +func (c CloudbrainDisplayJobNameLock) Lock(ctx *LockContext) string { + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(ctx.Repo.ID), string(models.JobTypeDebug), ctx.DisplayJobName)) + isOk, err := lock.Lock(models.CloudbrainKeyDuration) + if !isOk { + log.Error("CloudbrainDisplayJobNameLock lock failed:%v", err) + return "repo.cloudbrain_samejob_err" + } + return "" +} + +func (c CloudbrainDisplayJobNameLock) Unlock(ctx *LockContext) error { + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(ctx.Repo.ID), string(models.JobTypeDebug), ctx.DisplayJobName)) + return lock.UnLock() +} diff --git a/services/lock/cloudbrain_uniqueness_lock.go b/services/lock/cloudbrain_uniqueness_lock.go new file mode 100644 index 000000000..d495532bf --- /dev/null +++ b/services/lock/cloudbrain_uniqueness_lock.go @@ -0,0 +1,31 @@ +package lock + +import ( + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/redis/redis_key" + "code.gitea.io/gitea/modules/redis/redis_lock" + "time" +) + +type CloudbrainUniquenessLock struct { +} + +func (c CloudbrainUniquenessLock) IsMatch(ctx *LockContext) bool { + return true +} + +func (c CloudbrainUniquenessLock) Lock(ctx *LockContext) string { + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainUniquenessKey(ctx.User.ID, string(models.JobTypeDebug))) + isOk, err := lock.Lock(5 * time.Minute) + if !isOk { + log.Error("CloudbrainDisplayJobNameLock lock failed:%v", err) + return "you have already a running or waiting task, can not create more" + } + return "" +} + +func (c CloudbrainUniquenessLock) Unlock(ctx *LockContext) error { + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainUniquenessKey(ctx.User.ID, string(models.JobTypeDebug))) + return lock.UnLock() +} diff --git a/services/lock/lock.go b/services/lock/lock.go new file mode 100644 index 000000000..b5eae887b --- /dev/null +++ b/services/lock/lock.go @@ -0,0 +1,18 @@ +package lock + +import ( + "code.gitea.io/gitea/models" +) + +type LockContext struct { + Repo *models.Repository + DisplayJobName string + User *models.User + LockedList []Lock +} + +type Lock interface { + IsMatch(ctx *LockContext) bool + Lock(ctx *LockContext) string + Unlock(ctx *LockContext) error +} diff --git a/services/lock/lock_operator.go b/services/lock/lock_operator.go new file mode 100644 index 000000000..91ddfa908 --- /dev/null +++ b/services/lock/lock_operator.go @@ -0,0 +1,40 @@ +package lock + +type LockChainOperator struct { + ChainList []Lock +} + +func NewLockChainOperator() *LockChainOperator { + return &LockChainOperator{} +} + +func (b *LockChainOperator) Add(l Lock) *LockChainOperator { + b.ChainList = append(b.ChainList, l) + return b +} + +func (b *LockChainOperator) Lock(ctx *LockContext) string { + for i := 0; i < len(b.ChainList); i++ { + l := b.ChainList[i] + if !l.IsMatch(ctx) { + continue + } + + if errCode := l.Lock(ctx); errCode != "" { + b.Unlock(ctx) + return errCode + } + ctx.LockedList = append(ctx.LockedList, l) + } + return "" +} + +func (b *LockChainOperator) Unlock(ctx *LockContext) error { + if b.ChainList == nil || len(b.ChainList) == 0 { + return nil + } + for j := len(ctx.LockedList) - 1; j >= 0; j-- { + ctx.LockedList[j].Unlock(ctx) + } + return nil +}