From 4017242a5f56f2bc8f24f04812f0c29d0d48ebdf Mon Sep 17 00:00:00 2001 From: chenyifan01 Date: Fri, 6 Jan 2023 15:49:45 +0800 Subject: [PATCH 1/3] #2417 add creation lock --- modules/redis/redis_key/cloudbrain_redis_key.go | 6 ++++ routers/repo/aisafety.go | 16 ++++++++++ routers/repo/cloudbrain.go | 35 ++++++++++++++-------- routers/repo/grampus.go | 20 ++++++++----- services/cloudbrain/lock.go | 13 ++++++++ services/lock/cloudbrain_name_lock.go | 31 +++++++++++++++++++ services/lock/cloudbrain_uniqueness_lock.go | 31 +++++++++++++++++++ services/lock/lock.go | 18 +++++++++++ services/lock/lock_operator.go | 40 +++++++++++++++++++++++++ 9 files changed, 190 insertions(+), 20 deletions(-) create mode 100644 services/cloudbrain/lock.go create mode 100644 services/lock/cloudbrain_name_lock.go create mode 100644 services/lock/cloudbrain_uniqueness_lock.go create mode 100644 services/lock/lock.go create mode 100644 services/lock/lock_operator.go diff --git a/modules/redis/redis_key/cloudbrain_redis_key.go b/modules/redis/redis_key/cloudbrain_redis_key.go index 4c5d05dfc..a442d89f4 100644 --- a/modules/redis/redis_key/cloudbrain_redis_key.go +++ b/modules/redis/redis_key/cloudbrain_redis_key.go @@ -1,7 +1,13 @@ package redis_key +import "fmt" + const CLOUDBRAIN_PREFIX = "cloudbrain" func CloudbrainBindingJobNameKey(repoId string, jobType string, jobName string) string { return KeyJoin(CLOUDBRAIN_PREFIX, repoId, jobType, jobName, "redis_key") } + +func CloudbrainUniquenessKey(userId int64, jobType string) string { + return KeyJoin(CLOUDBRAIN_PREFIX, fmt.Sprint(userId), jobType, "uniqueness") +} diff --git a/routers/repo/aisafety.go b/routers/repo/aisafety.go index 55f25dba6..64a87af46 100644 --- a/routers/repo/aisafety.go +++ b/routers/repo/aisafety.go @@ -2,6 +2,7 @@ package repo import ( "bufio" + "code.gitea.io/gitea/services/lock" "encoding/json" "errors" "fmt" @@ -575,6 +576,21 @@ func AiSafetyCreateForPost(ctx *context.Context) { tpname = tplCloudBrainModelSafetyNewGpu } + limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User} + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx) + defer func() { + if lockOperator != nil { + lockOperator.Unlock(limiterCtx) + } + }() + + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) + modelSafetyNewDataPrepare(ctx) + ctx.RenderWithErr(ctx.Tr(errMsg), tpname, nil) + return + } + tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeModelSafety), displayJobName) if err == nil { if len(tasks) != 0 { diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index cf6df6312..34b96313d 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -2,6 +2,7 @@ package repo import ( "bufio" + "code.gitea.io/gitea/services/lock" "encoding/json" "errors" "fmt" @@ -228,16 +229,20 @@ func cloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { if jobType == string(models.JobTypeTrain) { tpl = tplCloudBrainTrainJobNew } + limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User} + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx) + defer func() { + if lockOperator != nil { + lockOperator.Unlock(limiterCtx) + } + }() - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName)) - isOk, err := lock.Lock(models.CloudbrainKeyDuration) - if !isOk { - log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) cloudBrainNewDataPrepare(ctx, jobType) - ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form) + ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form) return } - defer lock.UnLock() tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName) if err == nil { @@ -746,7 +751,6 @@ func CloudBrainRestart(ctx *context.Context) { }) } - func hasDatasetDeleted(task *models.Cloudbrain) bool { if task.Uuid == "" { return false @@ -2389,15 +2393,20 @@ func BenchMarkAlgorithmCreate(ctx *context.Context, form auth.CreateCloudBrainFo ctx.Data["benchmarkTypeID"] = benchmarkTypeID ctx.Data["benchmark_child_types_id_hidden"] = benchmarkChildTypeID - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), form.JobType, displayJobName)) - isOk, err := lock.Lock(models.CloudbrainKeyDuration) - if !isOk { - log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User} + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx) + defer func() { + if lockOperator != nil { + lockOperator.Unlock(limiterCtx) + } + }() + + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) cloudBrainNewDataPrepare(ctx, jobType) - ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplCloudBrainBenchmarkNew, &form) + ctx.RenderWithErr(ctx.Tr(errMsg), tplCloudBrainBenchmarkNew, &form) return } - defer lock.UnLock() tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName) if err == nil { diff --git a/routers/repo/grampus.go b/routers/repo/grampus.go index b3f84c169..c9c2e7403 100755 --- a/routers/repo/grampus.go +++ b/routers/repo/grampus.go @@ -1,6 +1,7 @@ package repo import ( + "code.gitea.io/gitea/services/lock" "encoding/json" "errors" "fmt" @@ -119,13 +120,18 @@ func GrampusNotebookCreate(ctx *context.Context, form auth.CreateGrampusNotebook codeStoragePath = grampus.JobPath + jobName + modelarts.CodePath } - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeDebug), displayJobName)) - defer lock.UnLock() - isOk, err := lock.Lock(models.CloudbrainKeyDuration) - if !isOk { - log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User} + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx) + defer func() { + if lockOperator != nil { + lockOperator.Unlock(limiterCtx) + } + }() + + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) grampusNotebookNewDataPrepare(ctx, processType) - ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form) + ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form) return } @@ -1308,7 +1314,7 @@ func GrampusTrainJobShow(ctx *context.Context) { taskList := make([]*models.Cloudbrain, 0) taskList = append(taskList, task) prepareSpec4Show(ctx, task) - + ctx.Data["version_list_task"] = taskList ctx.Data["datasetDownload"] = GetCloudBrainDataSetInfo(task.Uuid, task.DatasetName, false) ctx.Data["canDownload"] = cloudbrain.CanModifyJob(ctx, task) diff --git a/services/cloudbrain/lock.go b/services/cloudbrain/lock.go new file mode 100644 index 000000000..a8c05152d --- /dev/null +++ b/services/cloudbrain/lock.go @@ -0,0 +1,13 @@ +package cloudbrain + +import "code.gitea.io/gitea/services/lock" + +var defaultChain = lock.NewLockChainOperator().Add(lock.CloudbrainUniquenessLock{}).Add(lock.CloudbrainDisplayJobNameLock{}) + +func Lock4CloudbrainCreation(ctx *lock.LockContext) (*lock.LockChainOperator, string) { + errCode := defaultChain.Lock(ctx) + if errCode != "" { + return nil, errCode + } + return defaultChain, "" +} diff --git a/services/lock/cloudbrain_name_lock.go b/services/lock/cloudbrain_name_lock.go new file mode 100644 index 000000000..8ce22b2da --- /dev/null +++ b/services/lock/cloudbrain_name_lock.go @@ -0,0 +1,31 @@ +package lock + +import ( + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/redis/redis_key" + "code.gitea.io/gitea/modules/redis/redis_lock" + "fmt" +) + +type CloudbrainDisplayJobNameLock struct { +} + +func (c CloudbrainDisplayJobNameLock) IsMatch(ctx *LockContext) bool { + return true +} + +func (c CloudbrainDisplayJobNameLock) Lock(ctx *LockContext) string { + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(ctx.Repo.ID), string(models.JobTypeDebug), ctx.DisplayJobName)) + isOk, err := lock.Lock(models.CloudbrainKeyDuration) + if !isOk { + log.Error("CloudbrainDisplayJobNameLock lock failed:%v", err) + return "repo.cloudbrain_samejob_err" + } + return "" +} + +func (c CloudbrainDisplayJobNameLock) Unlock(ctx *LockContext) error { + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(ctx.Repo.ID), string(models.JobTypeDebug), ctx.DisplayJobName)) + return lock.UnLock() +} diff --git a/services/lock/cloudbrain_uniqueness_lock.go b/services/lock/cloudbrain_uniqueness_lock.go new file mode 100644 index 000000000..d495532bf --- /dev/null +++ b/services/lock/cloudbrain_uniqueness_lock.go @@ -0,0 +1,31 @@ +package lock + +import ( + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/redis/redis_key" + "code.gitea.io/gitea/modules/redis/redis_lock" + "time" +) + +type CloudbrainUniquenessLock struct { +} + +func (c CloudbrainUniquenessLock) IsMatch(ctx *LockContext) bool { + return true +} + +func (c CloudbrainUniquenessLock) Lock(ctx *LockContext) string { + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainUniquenessKey(ctx.User.ID, string(models.JobTypeDebug))) + isOk, err := lock.Lock(5 * time.Minute) + if !isOk { + log.Error("CloudbrainDisplayJobNameLock lock failed:%v", err) + return "you have already a running or waiting task, can not create more" + } + return "" +} + +func (c CloudbrainUniquenessLock) Unlock(ctx *LockContext) error { + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainUniquenessKey(ctx.User.ID, string(models.JobTypeDebug))) + return lock.UnLock() +} diff --git a/services/lock/lock.go b/services/lock/lock.go new file mode 100644 index 000000000..b5eae887b --- /dev/null +++ b/services/lock/lock.go @@ -0,0 +1,18 @@ +package lock + +import ( + "code.gitea.io/gitea/models" +) + +type LockContext struct { + Repo *models.Repository + DisplayJobName string + User *models.User + LockedList []Lock +} + +type Lock interface { + IsMatch(ctx *LockContext) bool + Lock(ctx *LockContext) string + Unlock(ctx *LockContext) error +} diff --git a/services/lock/lock_operator.go b/services/lock/lock_operator.go new file mode 100644 index 000000000..91ddfa908 --- /dev/null +++ b/services/lock/lock_operator.go @@ -0,0 +1,40 @@ +package lock + +type LockChainOperator struct { + ChainList []Lock +} + +func NewLockChainOperator() *LockChainOperator { + return &LockChainOperator{} +} + +func (b *LockChainOperator) Add(l Lock) *LockChainOperator { + b.ChainList = append(b.ChainList, l) + return b +} + +func (b *LockChainOperator) Lock(ctx *LockContext) string { + for i := 0; i < len(b.ChainList); i++ { + l := b.ChainList[i] + if !l.IsMatch(ctx) { + continue + } + + if errCode := l.Lock(ctx); errCode != "" { + b.Unlock(ctx) + return errCode + } + ctx.LockedList = append(ctx.LockedList, l) + } + return "" +} + +func (b *LockChainOperator) Unlock(ctx *LockContext) error { + if b.ChainList == nil || len(b.ChainList) == 0 { + return nil + } + for j := len(ctx.LockedList) - 1; j >= 0; j-- { + ctx.LockedList[j].Unlock(ctx) + } + return nil +} From f7c85b2b9262b3e0fddb6462cc30374eb522e89c Mon Sep 17 00:00:00 2001 From: chenyifan01 Date: Wed, 11 Jan 2023 17:49:03 +0800 Subject: [PATCH 2/3] #2417 fix concurrency problem of all type cloudbrain tasks --- models/cloudbrain.go | 5 +- modules/setting/setting.go | 4 ++ routers/repo/aisafety.go | 5 +- routers/repo/cloudbrain.go | 65 +++++++++++++-------- routers/repo/grampus.go | 62 +++++++++++++------- routers/repo/modelarts.go | 88 +++++++++++++++++++---------- services/cloudbrain/cloudbrainTask/count.go | 4 +- services/cloudbrain/lock.go | 15 +++-- services/lock/cloudbrain_name_lock.go | 4 +- services/lock/cloudbrain_uniqueness_lock.go | 11 ++-- services/lock/lock.go | 7 +-- services/lock/lock_operator.go | 32 ++++++----- 12 files changed, 190 insertions(+), 112 deletions(-) diff --git a/models/cloudbrain.go b/models/cloudbrain.go index e8fcb75b5..9d1fc3b6c 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -2308,11 +2308,10 @@ func GetWaitingCloudbrainCount(cloudbrainType int, computeResource string, jobTy } return sess.Count(new(Cloudbrain)) } -func GetNotFinalStatusTaskCount(userID int64, notFinalStatus []string, jobTypes []JobType, cloudbrainTypes []int, computeResource string) (int, error) { +func GetNotFinalStatusTaskCount(userID int64, notFinalStatus []string, jobTypes []JobType) (int, error) { count, err := x.In("status", notFinalStatus). In("job_type", jobTypes). - In("type", cloudbrainTypes). - And("user_id = ? and compute_resource = ?", userID, computeResource).Count(new(Cloudbrain)) + And("user_id = ? ", userID).Count(new(Cloudbrain)) return int(count), err } diff --git a/modules/setting/setting.go b/modules/setting/setting.go index 09e7259f2..44dff2779 100755 --- a/modules/setting/setting.go +++ b/modules/setting/setting.go @@ -690,6 +690,8 @@ var ( IncubationSourceOrgName string PaperRepoTopicName string + CloudbrainUniquenessLockTime time.Duration + //nginx proxy PROXYURL string RadarMap = struct { @@ -1506,6 +1508,8 @@ func NewContext() { CullInterval = sec.Key("CULL_INTERVAL").MustString("60") DebugAttachSize = sec.Key("DEBUG_ATTACH_SIZE").MustInt(20) + CloudbrainUniquenessLockTime = sec.Key("UNIQUENESS_LOCK_TIME").MustDuration(5 * time.Minute) + sec = Cfg.Section("benchmark") IsBenchmarkEnabled = sec.Key("ENABLED").MustBool(false) BenchmarkOwner = sec.Key("OWNER").MustString("") diff --git a/routers/repo/aisafety.go b/routers/repo/aisafety.go index 64a87af46..27e123fac 100644 --- a/routers/repo/aisafety.go +++ b/routers/repo/aisafety.go @@ -576,11 +576,10 @@ func AiSafetyCreateForPost(ctx *context.Context) { tpname = tplCloudBrainModelSafetyNewGpu } - limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User} - lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx) + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeInference)}, User: ctx.User}) defer func() { if lockOperator != nil { - lockOperator.Unlock(limiterCtx) + lockOperator.Unlock() } }() diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index 34b96313d..ad03c583e 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -41,8 +41,6 @@ import ( "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/modelarts" - "code.gitea.io/gitea/modules/redis/redis_key" - "code.gitea.io/gitea/modules/redis/redis_lock" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/modules/util" @@ -229,11 +227,10 @@ func cloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { if jobType == string(models.JobTypeTrain) { tpl = tplCloudBrainTrainJobNew } - limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User} - lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx) + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: form.JobType}, User: ctx.User}) defer func() { if lockOperator != nil { - lockOperator.Unlock(limiterCtx) + lockOperator.Unlock() } }() @@ -467,15 +464,19 @@ func CloudBrainInferenceJobCreate(ctx *context.Context, form auth.CreateCloudBra repo := ctx.Repo.Repository tpl := tplCloudBrainInferenceJobNew - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName)) - isOk, err := lock.Lock(models.CloudbrainKeyDuration) - if !isOk { - log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) - cloudBrainNewDataPrepare(ctx, jobType) - ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form) + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: jobType}, User: ctx.User}) + defer func() { + if lockOperator != nil { + lockOperator.Unlock() + } + }() + + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form) return } - defer lock.UnLock() ckptUrl := setting.Attachment.Minio.RealPath + form.TrainUrl + form.CkptName log.Info("ckpt url:" + ckptUrl) @@ -650,7 +651,24 @@ func CloudBrainRestart(ctx *context.Context) { var errorMsg = "" var status = string(models.JobWaiting) task := ctx.Cloudbrain + + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainRestart(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{JobType: task.JobType}, User: ctx.User}) + defer func() { + if lockOperator != nil { + lockOperator.Unlock() + } + }() + + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) + resultCode = "-1" + errorMsg = ctx.Tr(errMsg) + } for { + if errorMsg != "" { + break + } + if task.Status != string(models.JobStopped) && task.Status != string(models.JobSucceeded) && task.Status != string(models.JobFailed) { log.Error("the job(%s) is not stopped", task.JobName, ctx.Data["MsgID"]) resultCode = "-1" @@ -2393,11 +2411,10 @@ func BenchMarkAlgorithmCreate(ctx *context.Context, form auth.CreateCloudBrainFo ctx.Data["benchmarkTypeID"] = benchmarkTypeID ctx.Data["benchmark_child_types_id_hidden"] = benchmarkChildTypeID - limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User} - lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx) + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: form.JobType}, User: ctx.User}) defer func() { if lockOperator != nil { - lockOperator.Unlock(limiterCtx) + lockOperator.Unlock() } }() @@ -2587,15 +2604,19 @@ func ModelBenchmarkCreate(ctx *context.Context, form auth.CreateCloudBrainForm) tpl := tplCloudBrainBenchmarkNew command := cloudbrain.GetCloudbrainDebugCommand() - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName)) - isOk, err := lock.Lock(models.CloudbrainKeyDuration) - if !isOk { - log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) - cloudBrainNewDataPrepare(ctx, jobType) - ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form) + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: jobType}, User: ctx.User}) + defer func() { + if lockOperator != nil { + lockOperator.Unlock() + } + }() + + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form) return } - defer lock.UnLock() tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName) if err == nil { diff --git a/routers/repo/grampus.go b/routers/repo/grampus.go index f82f332e9..6a0348f3c 100755 --- a/routers/repo/grampus.go +++ b/routers/repo/grampus.go @@ -28,8 +28,6 @@ import ( "code.gitea.io/gitea/modules/grampus" "code.gitea.io/gitea/modules/modelarts" "code.gitea.io/gitea/modules/notification" - "code.gitea.io/gitea/modules/redis/redis_key" - "code.gitea.io/gitea/modules/redis/redis_lock" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" "github.com/unknwon/com" @@ -132,11 +130,10 @@ func GrampusNotebookCreate(ctx *context.Context, form auth.CreateGrampusNotebook codeStoragePath = setting.CBCodePathPrefix + jobName + cloudbrain.CodeMountPath + "/" } - limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User} - lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx) + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeDebug)}, User: ctx.User}) defer func() { if lockOperator != nil { - lockOperator.Unlock(limiterCtx) + lockOperator.Unlock() } }() @@ -498,19 +495,23 @@ func grampusTrainJobGpuCreate(ctx *context.Context, form auth.CreateGrampusTrain image := strings.TrimSpace(form.Image) tpl := tplGrampusTrainJobGPUNew - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)) - isOk, err := lock.Lock(models.CloudbrainKeyDuration) - if !isOk { - log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + if !jobNamePattern.MatchString(displayJobName) { grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) - ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplGrampusTrainJobGPUNew, &form) + ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form) return } - defer lock.UnLock() - if !jobNamePattern.MatchString(displayJobName) { + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeTrain)}, User: ctx.User}) + defer func() { + if lockOperator != nil { + lockOperator.Unlock() + } + }() + + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) - ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form) + ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form) return } @@ -746,19 +747,23 @@ func grampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrain engineName := form.EngineName tpl := tplGrampusTrainJobNPUNew - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)) - isOk, err := lock.Lock(models.CloudbrainKeyDuration) - if !isOk { - log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + if !jobNamePattern.MatchString(displayJobName) { grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) - ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplGrampusTrainJobNPUNew, &form) + ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form) return } - defer lock.UnLock() - if !jobNamePattern.MatchString(displayJobName) { + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeTrain)}, User: ctx.User}) + defer func() { + if lockOperator != nil { + lockOperator.Unlock() + } + }() + + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) - ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form) + ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form) return } @@ -1641,7 +1646,22 @@ func GrampusNotebookRestart(ctx *context.Context) { return } + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainRestart(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{JobType: task.JobType}, User: ctx.User}) + defer func() { + if lockOperator != nil { + lockOperator.Unlock() + } + }() + + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) + errorMsg = ctx.Tr(errMsg) + } + for { + if errorMsg != "" { + break + } if task.Status != models.GrampusStatusStopped && task.Status != models.GrampusStatusSucceeded && task.Status != models.GrampusStatusFailed { log.Error("the job(%s) is not stopped", task.JobName, ctx.Data["MsgID"]) diff --git a/routers/repo/modelarts.go b/routers/repo/modelarts.go index 84fcecb52..5a773ef87 100755 --- a/routers/repo/modelarts.go +++ b/routers/repo/modelarts.go @@ -2,6 +2,7 @@ package repo import ( "archive/zip" + "code.gitea.io/gitea/services/lock" "encoding/json" "errors" "fmt" @@ -35,8 +36,6 @@ import ( "code.gitea.io/gitea/modules/modelarts" "code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/obs" - "code.gitea.io/gitea/modules/redis/redis_key" - "code.gitea.io/gitea/modules/redis/redis_lock" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/modules/timeutil" @@ -176,15 +175,19 @@ func Notebook2Create(ctx *context.Context, form auth.CreateModelArtsNotebookForm imageId := form.ImageId repo := ctx.Repo.Repository - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeDebug), displayJobName)) - isOk, err := lock.Lock(models.CloudbrainKeyDuration) - if !isOk { - log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeDebug)}, User: ctx.User}) + defer func() { + if lockOperator != nil { + lockOperator.Unlock() + } + }() + + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) notebookNewDataPrepare(ctx) - ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsNotebookNew, &form) + ctx.RenderWithErr(ctx.Tr(errMsg), tplModelArtsNotebookNew, &form) return } - defer lock.UnLock() count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainTwo, string(models.JobTypeDebug)) @@ -494,8 +497,21 @@ func NotebookRestart(ctx *context.Context) { var spec *models.Specification task := ctx.Cloudbrain + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainRestart(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{JobType: task.JobType}, User: ctx.User}) + defer func() { + if lockOperator != nil { + lockOperator.Unlock() + } + }() + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) + errorMsg = ctx.Tr(errMsg) + } for { + if errMsg != "" { + break + } ctx.CheckWechatBind() if ctx.Written() { return @@ -1083,15 +1099,19 @@ func TrainJobCreate(ctx *context.Context, form auth.CreateModelArtsTrainJobForm) return } - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)) - isOk, err := lock.Lock(models.CloudbrainKeyDuration) - if !isOk { - log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeTrain)}, User: ctx.User}) + defer func() { + if lockOperator != nil { + lockOperator.Unlock() + } + }() + + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) trainJobNewDataPrepare(ctx) - ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsTrainJobNew, &form) + ctx.RenderWithErr(ctx.Tr(errMsg), tplModelArtsTrainJobNew, &form) return } - defer lock.UnLock() count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainTwo, string(models.JobTypeTrain)) @@ -1461,6 +1481,20 @@ func TrainJobCreateVersion(ctx *context.Context, form auth.CreateModelArtsTrainJ ctx.Data["PageIsTrainJob"] = true var jobID = ctx.Params(":jobid") + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: form.DisplayJobName, JobType: string(models.JobTypeTrain)}, User: ctx.User}) + defer func() { + if lockOperator != nil { + lockOperator.Unlock() + } + }() + + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) + trainJobNewVersionDataPrepare(ctx) + ctx.RenderWithErr(ctx.Tr(errMsg), tplModelArtsTrainJobVersionNew, &form) + return + } + errStr := checkMultiNode(ctx.User.ID, form.WorkServerNumber) if errStr != "" { trainJobNewVersionDataPrepare(ctx) @@ -1512,16 +1546,6 @@ func TrainJobCreateVersion(ctx *context.Context, form auth.CreateModelArtsTrainJ EngineName := form.EngineName isLatestVersion := modelarts.IsLatestVersion - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)) - isOk, err := lock.Lock(models.CloudbrainKeyDuration) - if !isOk { - log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) - trainJobNewVersionDataPrepare(ctx) - ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsTrainJobVersionNew, &form) - return - } - defer lock.UnLock() - canNewJob, _ := canUserCreateTrainJobVersion(ctx, latestTask.UserID) if !canNewJob { trainJobNewVersionDataPrepare(ctx) @@ -2109,15 +2133,19 @@ func InferenceJobCreate(ctx *context.Context, form auth.CreateModelArtsInference return } - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeInference), displayJobName)) - isOk, err := lock.Lock(models.CloudbrainKeyDuration) - if !isOk { - log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeInference)}, User: ctx.User}) + defer func() { + if lockOperator != nil { + lockOperator.Unlock() + } + }() + + if errMsg != "" { + log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) inferenceJobErrorNewDataPrepare(ctx, form) - ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsInferenceJobNew, &form) + ctx.RenderWithErr(ctx.Tr(errMsg), tplModelArtsInferenceJobNew, &form) return } - defer lock.UnLock() count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainTwo, string(models.JobTypeInference)) diff --git a/services/cloudbrain/cloudbrainTask/count.go b/services/cloudbrain/cloudbrainTask/count.go index 0010164ae..186d52fb9 100644 --- a/services/cloudbrain/cloudbrainTask/count.go +++ b/services/cloudbrain/cloudbrainTask/count.go @@ -16,7 +16,7 @@ type StatusInfo struct { var CloudbrainOneNotFinalStatuses = []string{string(models.JobWaiting), string(models.JobRunning)} var CloudbrainTwoNotFinalStatuses = []string{string(models.ModelArtsTrainJobInit), string(models.ModelArtsTrainJobImageCreating), string(models.ModelArtsTrainJobSubmitTrying), string(models.ModelArtsTrainJobWaiting), string(models.ModelArtsTrainJobRunning), string(models.ModelArtsTrainJobScaling), string(models.ModelArtsTrainJobCheckInit), string(models.ModelArtsTrainJobCheckRunning), string(models.ModelArtsTrainJobCheckRunningCompleted)} -var GrampusNotFinalStatuses = []string{models.GrampusStatusWaiting, models.GrampusStatusRunning} +var GrampusNotFinalStatuses = []string{models.GrampusStatusWaiting, models.GrampusStatusRunning, models.GrampusStatusPending} var StatusInfoDict = map[string]StatusInfo{string(models.JobTypeDebug) + "-" + strconv.Itoa(models.TypeCloudBrainOne): { CloudBrainTypes: []int{models.TypeCloudBrainOne}, JobType: []models.JobType{models.JobTypeDebug}, @@ -92,7 +92,7 @@ func GetNotFinalStatusTaskCount(uid int64, cloudbrainType int, jobType string, c if statusInfo, ok := StatusInfoDict[key]; ok { - return models.GetNotFinalStatusTaskCount(uid, statusInfo.NotFinalStatuses, statusInfo.JobType, statusInfo.CloudBrainTypes, statusInfo.ComputeResource) + return models.GetNotFinalStatusTaskCount(uid, statusInfo.NotFinalStatuses, statusInfo.JobType) } else { return 0, fmt.Errorf("Can not find the status info.") diff --git a/services/cloudbrain/lock.go b/services/cloudbrain/lock.go index a8c05152d..c9d6080f1 100644 --- a/services/cloudbrain/lock.go +++ b/services/cloudbrain/lock.go @@ -2,12 +2,19 @@ package cloudbrain import "code.gitea.io/gitea/services/lock" -var defaultChain = lock.NewLockChainOperator().Add(lock.CloudbrainUniquenessLock{}).Add(lock.CloudbrainDisplayJobNameLock{}) - func Lock4CloudbrainCreation(ctx *lock.LockContext) (*lock.LockChainOperator, string) { - errCode := defaultChain.Lock(ctx) + op := lock.NewLockChainOperator(ctx).Add(lock.CloudbrainUniquenessLock{}).Add(lock.CloudbrainDisplayJobNameLock{}) + errCode := op.Lock() + if errCode != "" { + return nil, errCode + } + return op, "" +} +func Lock4CloudbrainRestart(ctx *lock.LockContext) (*lock.LockChainOperator, string) { + op := lock.NewLockChainOperator(ctx).Add(lock.CloudbrainUniquenessLock{}) + errCode := op.Lock() if errCode != "" { return nil, errCode } - return defaultChain, "" + return op, "" } diff --git a/services/lock/cloudbrain_name_lock.go b/services/lock/cloudbrain_name_lock.go index 8ce22b2da..bed388e01 100644 --- a/services/lock/cloudbrain_name_lock.go +++ b/services/lock/cloudbrain_name_lock.go @@ -16,7 +16,7 @@ func (c CloudbrainDisplayJobNameLock) IsMatch(ctx *LockContext) bool { } func (c CloudbrainDisplayJobNameLock) Lock(ctx *LockContext) string { - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(ctx.Repo.ID), string(models.JobTypeDebug), ctx.DisplayJobName)) + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(ctx.Repo.ID), ctx.Task.JobType, ctx.Task.DisplayJobName)) isOk, err := lock.Lock(models.CloudbrainKeyDuration) if !isOk { log.Error("CloudbrainDisplayJobNameLock lock failed:%v", err) @@ -26,6 +26,6 @@ func (c CloudbrainDisplayJobNameLock) Lock(ctx *LockContext) string { } func (c CloudbrainDisplayJobNameLock) Unlock(ctx *LockContext) error { - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(ctx.Repo.ID), string(models.JobTypeDebug), ctx.DisplayJobName)) + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(ctx.Repo.ID), ctx.Task.JobType, ctx.Task.DisplayJobName)) return lock.UnLock() } diff --git a/services/lock/cloudbrain_uniqueness_lock.go b/services/lock/cloudbrain_uniqueness_lock.go index d495532bf..8297e8b03 100644 --- a/services/lock/cloudbrain_uniqueness_lock.go +++ b/services/lock/cloudbrain_uniqueness_lock.go @@ -1,11 +1,10 @@ package lock import ( - "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/redis/redis_key" "code.gitea.io/gitea/modules/redis/redis_lock" - "time" + "code.gitea.io/gitea/modules/setting" ) type CloudbrainUniquenessLock struct { @@ -16,16 +15,16 @@ func (c CloudbrainUniquenessLock) IsMatch(ctx *LockContext) bool { } func (c CloudbrainUniquenessLock) Lock(ctx *LockContext) string { - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainUniquenessKey(ctx.User.ID, string(models.JobTypeDebug))) - isOk, err := lock.Lock(5 * time.Minute) + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainUniquenessKey(ctx.User.ID, ctx.Task.JobType)) + isOk, err := lock.Lock(setting.CloudbrainUniquenessLockTime) if !isOk { log.Error("CloudbrainDisplayJobNameLock lock failed:%v", err) - return "you have already a running or waiting task, can not create more" + return "repo.cloudbrain.morethanonejob" } return "" } func (c CloudbrainUniquenessLock) Unlock(ctx *LockContext) error { - lock := redis_lock.NewDistributeLock(redis_key.CloudbrainUniquenessKey(ctx.User.ID, string(models.JobTypeDebug))) + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainUniquenessKey(ctx.User.ID, ctx.Task.JobType)) return lock.UnLock() } diff --git a/services/lock/lock.go b/services/lock/lock.go index b5eae887b..308cebd2a 100644 --- a/services/lock/lock.go +++ b/services/lock/lock.go @@ -5,10 +5,9 @@ import ( ) type LockContext struct { - Repo *models.Repository - DisplayJobName string - User *models.User - LockedList []Lock + Repo *models.Repository + Task *models.Cloudbrain + User *models.User } type Lock interface { diff --git a/services/lock/lock_operator.go b/services/lock/lock_operator.go index 91ddfa908..cd2002f1c 100644 --- a/services/lock/lock_operator.go +++ b/services/lock/lock_operator.go @@ -1,40 +1,42 @@ package lock type LockChainOperator struct { - ChainList []Lock + chainList []Lock + lockedList []Lock + ctx *LockContext } -func NewLockChainOperator() *LockChainOperator { - return &LockChainOperator{} +func NewLockChainOperator(ctx *LockContext) *LockChainOperator { + return &LockChainOperator{ctx: ctx} } func (b *LockChainOperator) Add(l Lock) *LockChainOperator { - b.ChainList = append(b.ChainList, l) + b.chainList = append(b.chainList, l) return b } -func (b *LockChainOperator) Lock(ctx *LockContext) string { - for i := 0; i < len(b.ChainList); i++ { - l := b.ChainList[i] - if !l.IsMatch(ctx) { +func (b *LockChainOperator) Lock() string { + for i := 0; i < len(b.chainList); i++ { + l := b.chainList[i] + if !l.IsMatch(b.ctx) { continue } - if errCode := l.Lock(ctx); errCode != "" { - b.Unlock(ctx) + if errCode := l.Lock(b.ctx); errCode != "" { + b.Unlock() return errCode } - ctx.LockedList = append(ctx.LockedList, l) + b.lockedList = append(b.lockedList, l) } return "" } -func (b *LockChainOperator) Unlock(ctx *LockContext) error { - if b.ChainList == nil || len(b.ChainList) == 0 { +func (b *LockChainOperator) Unlock() error { + if b.chainList == nil || len(b.chainList) == 0 { return nil } - for j := len(ctx.LockedList) - 1; j >= 0; j-- { - ctx.LockedList[j].Unlock(ctx) + for j := len(b.lockedList) - 1; j >= 0; j-- { + b.lockedList[j].Unlock(b.ctx) } return nil } From 2e3824540e7a3e56f97083c4d2137b53f4376ff3 Mon Sep 17 00:00:00 2001 From: chenyifan01 Date: Wed, 11 Jan 2023 17:59:53 +0800 Subject: [PATCH 3/3] #2417 merge --- models/cloudbrain.go | 1 - 1 file changed, 1 deletion(-) diff --git a/models/cloudbrain.go b/models/cloudbrain.go index cebb8df61..f4a2a9e08 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -2664,7 +2664,6 @@ type DatasetInfo struct { FullName string Type int Size int - Type int } func GetDatasetInfo(uuidStr string, grampusType ...string) (map[string]DatasetInfo, string, error) {