Browse Source

Merge branch 'V20230215' into zouap

pull/3589/head
zouap 2 years ago
parent
commit
12a7bebe80
16 changed files with 376 additions and 107 deletions
  1. +2
    -3
      models/cloudbrain.go
  2. +6
    -0
      modules/redis/redis_key/cloudbrain_redis_key.go
  3. +4
    -0
      modules/setting/setting.go
  4. +1
    -0
      routers/api/v1/api.go
  5. +22
    -0
      routers/api/v1/repo/datasets.go
  6. +15
    -0
      routers/repo/aisafety.go
  7. +64
    -36
      routers/repo/cloudbrain.go
  8. +50
    -24
      routers/repo/grampus.go
  9. +58
    -30
      routers/repo/modelarts.go
  10. +12
    -12
      routers/routes/routes.go
  11. +2
    -2
      services/cloudbrain/cloudbrainTask/count.go
  12. +20
    -0
      services/cloudbrain/lock.go
  13. +31
    -0
      services/lock/cloudbrain_name_lock.go
  14. +30
    -0
      services/lock/cloudbrain_uniqueness_lock.go
  15. +17
    -0
      services/lock/lock.go
  16. +42
    -0
      services/lock/lock_operator.go

+ 2
- 3
models/cloudbrain.go View File

@@ -2315,11 +2315,10 @@ func GetWaitingCloudbrainCount(cloudbrainType int, computeResource string, jobTy
} }
return sess.Count(new(Cloudbrain)) return sess.Count(new(Cloudbrain))
} }
func GetNotFinalStatusTaskCount(userID int64, notFinalStatus []string, jobTypes []JobType, cloudbrainTypes []int, computeResource string) (int, error) {
func GetNotFinalStatusTaskCount(userID int64, notFinalStatus []string, jobTypes []JobType) (int, error) {
count, err := x.In("status", notFinalStatus). count, err := x.In("status", notFinalStatus).
In("job_type", jobTypes). In("job_type", jobTypes).
In("type", cloudbrainTypes).
And("user_id = ? and compute_resource = ?", userID, computeResource).Count(new(Cloudbrain))
And("user_id = ? ", userID).Count(new(Cloudbrain))
return int(count), err return int(count), err
} }




+ 6
- 0
modules/redis/redis_key/cloudbrain_redis_key.go View File

@@ -1,7 +1,13 @@
package redis_key package redis_key


import "fmt"

const CLOUDBRAIN_PREFIX = "cloudbrain" const CLOUDBRAIN_PREFIX = "cloudbrain"


func CloudbrainBindingJobNameKey(repoId string, jobType string, jobName string) string { func CloudbrainBindingJobNameKey(repoId string, jobType string, jobName string) string {
return KeyJoin(CLOUDBRAIN_PREFIX, repoId, jobType, jobName, "redis_key") return KeyJoin(CLOUDBRAIN_PREFIX, repoId, jobType, jobName, "redis_key")
} }

// CloudbrainUniquenessKey builds the redis key for a user / job type pair.
// The segments handed to KeyJoin are, in order: the cloudbrain prefix, the
// user id rendered as text, the job type, and the literal "uniqueness".
func CloudbrainUniquenessKey(userId int64, jobType string) string {
	owner := fmt.Sprint(userId)
	return KeyJoin(CLOUDBRAIN_PREFIX, owner, jobType, "uniqueness")
}

+ 4
- 0
modules/setting/setting.go View File

@@ -690,6 +690,8 @@ var (
IncubationSourceOrgName string IncubationSourceOrgName string
PaperRepoTopicName string PaperRepoTopicName string


CloudbrainUniquenessLockTime time.Duration

//nginx proxy //nginx proxy
PROXYURL string PROXYURL string
RadarMap = struct { RadarMap = struct {
@@ -1506,6 +1508,8 @@ func NewContext() {
CullInterval = sec.Key("CULL_INTERVAL").MustString("60") CullInterval = sec.Key("CULL_INTERVAL").MustString("60")
DebugAttachSize = sec.Key("DEBUG_ATTACH_SIZE").MustInt(20) DebugAttachSize = sec.Key("DEBUG_ATTACH_SIZE").MustInt(20)


CloudbrainUniquenessLockTime = sec.Key("UNIQUENESS_LOCK_TIME").MustDuration(5 * time.Minute)

sec = Cfg.Section("benchmark") sec = Cfg.Section("benchmark")
IsBenchmarkEnabled = sec.Key("ENABLED").MustBool(false) IsBenchmarkEnabled = sec.Key("ENABLED").MustBool(false)
BenchmarkOwner = sec.Key("OWNER").MustString("") BenchmarkOwner = sec.Key("OWNER").MustString("")


+ 1
- 0
routers/api/v1/api.go View File

@@ -738,6 +738,7 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Combo("/repositories/:id", reqToken()).Get(repo.GetByID) m.Combo("/repositories/:id", reqToken()).Get(repo.GetByID)


m.Group("/datasets/:username/:reponame", func() { m.Group("/datasets/:username/:reponame", func() {
m.Get("", repo.CurrentRepoDatasetInfoWithoutAttachment)
m.Get("/current_repo", repo.CurrentRepoDatasetMultiple) m.Get("/current_repo", repo.CurrentRepoDatasetMultiple)
m.Get("/my_datasets", repo.MyDatasetsMultiple) m.Get("/my_datasets", repo.MyDatasetsMultiple)
m.Get("/public_datasets", repo.PublicDatasetMultiple) m.Get("/public_datasets", repo.PublicDatasetMultiple)


+ 22
- 0
routers/api/v1/repo/datasets.go View File

@@ -51,6 +51,28 @@ func CurrentRepoDatasetMultiple(ctx *context.APIContext) {


} }


// CurrentRepoDatasetInfoWithoutAttachment writes a JSON response describing
// the dataset that belongs to the repository of the current request.
//
// The response is always sent with HTTP status 200 and "code": 0. On success
// "data" is a one-element list holding the converted dataset; when the dataset
// lookup returns an error, the error is logged as a warning and "data" is an
// empty list.
func CurrentRepoDatasetInfoWithoutAttachment(ctx *context.APIContext) {
	dataset, err := models.GetDatasetByRepo(ctx.Repo.Repository)
	if err != nil {
		// The message is a format string, so err needs an explicit verb to be
		// rendered as part of the log line.
		log.Warn("can not get dataset. %v", err)
		ctx.JSON(200, map[string]interface{}{
			"code":    0,
			"message": "",
			"data":    []*api.Dataset{},
		})
		return
	}

	// Attach the repository already loaded for this request before converting.
	dataset.Repo = ctx.Repo.Repository
	ctx.JSON(200, map[string]interface{}{
		"code":    0,
		"message": "",
		"data":    []*api.Dataset{convert.ToDataset(dataset)},
	})
}

func MyDatasetsMultiple(ctx *context.APIContext) { func MyDatasetsMultiple(ctx *context.APIContext) {


opts := &models.SearchDatasetOptions{ opts := &models.SearchDatasetOptions{


+ 15
- 0
routers/repo/aisafety.go View File

@@ -2,6 +2,7 @@ package repo


import ( import (
"bufio" "bufio"
"code.gitea.io/gitea/services/lock"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@@ -575,6 +576,20 @@ func AiSafetyCreateForPost(ctx *context.Context) {
tpname = tplCloudBrainModelSafetyNewGpu tpname = tplCloudBrainModelSafetyNewGpu
} }


lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeInference)}, User: ctx.User})
defer func() {
if lockOperator != nil {
lockOperator.Unlock()
}
}()

if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
modelSafetyNewDataPrepare(ctx)
ctx.RenderWithErr(ctx.Tr(errMsg), tpname, nil)
return
}

tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeModelSafety), displayJobName) tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeModelSafety), displayJobName)
if err == nil { if err == nil {
if len(tasks) != 0 { if len(tasks) != 0 {


+ 64
- 36
routers/repo/cloudbrain.go View File

@@ -2,6 +2,7 @@ package repo


import ( import (
"bufio" "bufio"
"code.gitea.io/gitea/services/lock"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@@ -40,8 +41,6 @@ import (
"code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/modelarts" "code.gitea.io/gitea/modules/modelarts"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/util"
@@ -228,16 +227,19 @@ func cloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
if jobType == string(models.JobTypeTrain) { if jobType == string(models.JobTypeTrain) {
tpl = tplCloudBrainTrainJobNew tpl = tplCloudBrainTrainJobNew
} }
lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: form.JobType}, User: ctx.User})
defer func() {
if lockOperator != nil {
lockOperator.Unlock()
}
}()


lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
cloudBrainNewDataPrepare(ctx, jobType) cloudBrainNewDataPrepare(ctx, jobType)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form)
ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form)
return return
} }
defer lock.UnLock()


tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName) tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName)
if err == nil { if err == nil {
@@ -462,15 +464,19 @@ func CloudBrainInferenceJobCreate(ctx *context.Context, form auth.CreateCloudBra
repo := ctx.Repo.Repository repo := ctx.Repo.Repository
tpl := tplCloudBrainInferenceJobNew tpl := tplCloudBrainInferenceJobNew


lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
cloudBrainNewDataPrepare(ctx, jobType)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form)
lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: jobType}, User: ctx.User})
defer func() {
if lockOperator != nil {
lockOperator.Unlock()
}
}()

if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form)
return return
} }
defer lock.UnLock()


ckptUrl := setting.Attachment.Minio.RealPath + form.TrainUrl + form.CkptName ckptUrl := setting.Attachment.Minio.RealPath + form.TrainUrl + form.CkptName
log.Info("ckpt url:" + ckptUrl) log.Info("ckpt url:" + ckptUrl)
@@ -647,7 +653,24 @@ func CloudBrainRestart(ctx *context.Context) {
var errorMsg = "" var errorMsg = ""
var status = string(models.JobWaiting) var status = string(models.JobWaiting)
task := ctx.Cloudbrain task := ctx.Cloudbrain

lockOperator, errMsg := cloudbrainService.Lock4CloudbrainRestart(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{JobType: task.JobType}, User: ctx.User})
defer func() {
if lockOperator != nil {
lockOperator.Unlock()
}
}()

if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
resultCode = "-1"
errorMsg = ctx.Tr(errMsg)
}
for { for {
if errorMsg != "" {
break
}

if task.Status != string(models.JobStopped) && task.Status != string(models.JobSucceeded) && task.Status != string(models.JobFailed) { if task.Status != string(models.JobStopped) && task.Status != string(models.JobSucceeded) && task.Status != string(models.JobFailed) {
log.Error("the job(%s) is not stopped", task.JobName, ctx.Data["MsgID"]) log.Error("the job(%s) is not stopped", task.JobName, ctx.Data["MsgID"])
resultCode = "-1" resultCode = "-1"
@@ -1353,14 +1376,11 @@ func StopJobs(cloudBrains []*models.Cloudbrain) {
logErrorAndUpdateJobStatus(err, taskInfo) logErrorAndUpdateJobStatus(err, taskInfo)
} }
} else if taskInfo.Type == models.TypeC2Net { } else if taskInfo.Type == models.TypeC2Net {
if taskInfo.JobType == string(models.JobTypeTrain) {
err := retry(3, time.Second*30, func() error {
_, err := grampus.StopJob(taskInfo.JobID)
return err
})
logErrorAndUpdateJobStatus(err, taskInfo)
}

err := retry(3, time.Second*30, func() error {
_, err := grampus.StopJob(taskInfo.JobID, taskInfo.JobType)
return err
})
logErrorAndUpdateJobStatus(err, taskInfo)
} }
} }
} }
@@ -2390,15 +2410,19 @@ func BenchMarkAlgorithmCreate(ctx *context.Context, form auth.CreateCloudBrainFo
ctx.Data["benchmarkTypeID"] = benchmarkTypeID ctx.Data["benchmarkTypeID"] = benchmarkTypeID
ctx.Data["benchmark_child_types_id_hidden"] = benchmarkChildTypeID ctx.Data["benchmark_child_types_id_hidden"] = benchmarkChildTypeID


lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), form.JobType, displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: form.JobType}, User: ctx.User})
defer func() {
if lockOperator != nil {
lockOperator.Unlock()
}
}()

if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
cloudBrainNewDataPrepare(ctx, jobType) cloudBrainNewDataPrepare(ctx, jobType)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplCloudBrainBenchmarkNew, &form)
ctx.RenderWithErr(ctx.Tr(errMsg), tplCloudBrainBenchmarkNew, &form)
return return
} }
defer lock.UnLock()


tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName) tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName)
if err == nil { if err == nil {
@@ -2579,15 +2603,19 @@ func ModelBenchmarkCreate(ctx *context.Context, form auth.CreateCloudBrainForm)
tpl := tplCloudBrainBenchmarkNew tpl := tplCloudBrainBenchmarkNew
command := cloudbrain.GetCloudbrainDebugCommand() command := cloudbrain.GetCloudbrainDebugCommand()


lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
cloudBrainNewDataPrepare(ctx, jobType)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form)
lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: jobType}, User: ctx.User})
defer func() {
if lockOperator != nil {
lockOperator.Unlock()
}
}()

if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form)
return return
} }
defer lock.UnLock()


tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName) tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName)
if err == nil { if err == nil {


+ 50
- 24
routers/repo/grampus.go View File

@@ -1,6 +1,7 @@
package repo package repo


import ( import (
"code.gitea.io/gitea/services/lock"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@@ -27,8 +28,6 @@ import (
"code.gitea.io/gitea/modules/grampus" "code.gitea.io/gitea/modules/grampus"
"code.gitea.io/gitea/modules/modelarts" "code.gitea.io/gitea/modules/modelarts"
"code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/util"
"github.com/unknwon/com" "github.com/unknwon/com"
@@ -131,13 +130,17 @@ func GrampusNotebookCreate(ctx *context.Context, form auth.CreateGrampusNotebook
codeStoragePath = setting.CBCodePathPrefix + jobName + cloudbrain.CodeMountPath + "/" codeStoragePath = setting.CBCodePathPrefix + jobName + cloudbrain.CodeMountPath + "/"
} }


lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeDebug), displayJobName))
defer lock.UnLock()
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeDebug)}, User: ctx.User})
defer func() {
if lockOperator != nil {
lockOperator.Unlock()
}
}()

if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
grampusNotebookNewDataPrepare(ctx, processType) grampusNotebookNewDataPrepare(ctx, processType)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form)
ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form)
return return
} }


@@ -514,19 +517,23 @@ func grampusTrainJobGpuCreate(ctx *context.Context, form auth.CreateGrampusTrain
image := strings.TrimSpace(form.Image) image := strings.TrimSpace(form.Image)
tpl := tplGrampusTrainJobGPUNew tpl := tplGrampusTrainJobGPUNew


lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
if !jobNamePattern.MatchString(displayJobName) {
grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplGrampusTrainJobGPUNew, &form)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form)
return return
} }
defer lock.UnLock()


if !jobNamePattern.MatchString(displayJobName) {
lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeTrain)}, User: ctx.User})
defer func() {
if lockOperator != nil {
lockOperator.Unlock()
}
}()

if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form)
ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form)
return return
} }


@@ -762,19 +769,23 @@ func grampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrain
engineName := form.EngineName engineName := form.EngineName
tpl := tplGrampusTrainJobNPUNew tpl := tplGrampusTrainJobNPUNew


lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
if !jobNamePattern.MatchString(displayJobName) {
grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplGrampusTrainJobNPUNew, &form)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form)
return return
} }
defer lock.UnLock()


if !jobNamePattern.MatchString(displayJobName) {
lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeTrain)}, User: ctx.User})
defer func() {
if lockOperator != nil {
lockOperator.Unlock()
}
}()

if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form)
ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form)
return return
} }


@@ -1663,7 +1674,22 @@ func GrampusNotebookRestart(ctx *context.Context) {
return return
} }


lockOperator, errMsg := cloudbrainService.Lock4CloudbrainRestart(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{JobType: task.JobType}, User: ctx.User})
defer func() {
if lockOperator != nil {
lockOperator.Unlock()
}
}()

if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
errorMsg = ctx.Tr(errMsg)
}

for { for {
if errorMsg != "" {
break
}


if task.Status != models.GrampusStatusStopped && task.Status != models.GrampusStatusSucceeded && task.Status != models.GrampusStatusFailed { if task.Status != models.GrampusStatusStopped && task.Status != models.GrampusStatusSucceeded && task.Status != models.GrampusStatusFailed {
log.Error("the job(%s) is not stopped", task.JobName, ctx.Data["MsgID"]) log.Error("the job(%s) is not stopped", task.JobName, ctx.Data["MsgID"])


+ 58
- 30
routers/repo/modelarts.go View File

@@ -2,6 +2,7 @@ package repo


import ( import (
"archive/zip" "archive/zip"
"code.gitea.io/gitea/services/lock"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@@ -35,8 +36,6 @@ import (
"code.gitea.io/gitea/modules/modelarts" "code.gitea.io/gitea/modules/modelarts"
"code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/obs" "code.gitea.io/gitea/modules/obs"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/timeutil"
@@ -176,15 +175,19 @@ func Notebook2Create(ctx *context.Context, form auth.CreateModelArtsNotebookForm
imageId := form.ImageId imageId := form.ImageId
repo := ctx.Repo.Repository repo := ctx.Repo.Repository


lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeDebug), displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeDebug)}, User: ctx.User})
defer func() {
if lockOperator != nil {
lockOperator.Unlock()
}
}()

if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
notebookNewDataPrepare(ctx) notebookNewDataPrepare(ctx)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsNotebookNew, &form)
ctx.RenderWithErr(ctx.Tr(errMsg), tplModelArtsNotebookNew, &form)
return return
} }
defer lock.UnLock()


count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainTwo, string(models.JobTypeDebug)) count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainTwo, string(models.JobTypeDebug))


@@ -494,8 +497,21 @@ func NotebookRestart(ctx *context.Context) {
var spec *models.Specification var spec *models.Specification


task := ctx.Cloudbrain task := ctx.Cloudbrain
lockOperator, errMsg := cloudbrainService.Lock4CloudbrainRestart(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{JobType: task.JobType}, User: ctx.User})
defer func() {
if lockOperator != nil {
lockOperator.Unlock()
}
}()


if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
errorMsg = ctx.Tr(errMsg)
}
for { for {
if errMsg != "" {
break
}
ctx.CheckWechatBind() ctx.CheckWechatBind()
if ctx.Written() { if ctx.Written() {
return return
@@ -1083,15 +1099,19 @@ func TrainJobCreate(ctx *context.Context, form auth.CreateModelArtsTrainJobForm)
return return
} }


lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeTrain)}, User: ctx.User})
defer func() {
if lockOperator != nil {
lockOperator.Unlock()
}
}()

if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
trainJobNewDataPrepare(ctx) trainJobNewDataPrepare(ctx)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsTrainJobNew, &form)
ctx.RenderWithErr(ctx.Tr(errMsg), tplModelArtsTrainJobNew, &form)
return return
} }
defer lock.UnLock()


count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainTwo, string(models.JobTypeTrain)) count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainTwo, string(models.JobTypeTrain))


@@ -1461,6 +1481,20 @@ func TrainJobCreateVersion(ctx *context.Context, form auth.CreateModelArtsTrainJ
ctx.Data["PageIsTrainJob"] = true ctx.Data["PageIsTrainJob"] = true
var jobID = ctx.Params(":jobid") var jobID = ctx.Params(":jobid")


lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: form.DisplayJobName, JobType: string(models.JobTypeTrain)}, User: ctx.User})
defer func() {
if lockOperator != nil {
lockOperator.Unlock()
}
}()

if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
trainJobNewVersionDataPrepare(ctx)
ctx.RenderWithErr(ctx.Tr(errMsg), tplModelArtsTrainJobVersionNew, &form)
return
}

errStr := checkMultiNode(ctx.User.ID, form.WorkServerNumber) errStr := checkMultiNode(ctx.User.ID, form.WorkServerNumber)
if errStr != "" { if errStr != "" {
trainJobNewVersionDataPrepare(ctx) trainJobNewVersionDataPrepare(ctx)
@@ -1512,16 +1546,6 @@ func TrainJobCreateVersion(ctx *context.Context, form auth.CreateModelArtsTrainJ
EngineName := form.EngineName EngineName := form.EngineName
isLatestVersion := modelarts.IsLatestVersion isLatestVersion := modelarts.IsLatestVersion


lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
trainJobNewVersionDataPrepare(ctx)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsTrainJobVersionNew, &form)
return
}
defer lock.UnLock()

canNewJob, _ := canUserCreateTrainJobVersion(ctx, latestTask.UserID) canNewJob, _ := canUserCreateTrainJobVersion(ctx, latestTask.UserID)
if !canNewJob { if !canNewJob {
trainJobNewVersionDataPrepare(ctx) trainJobNewVersionDataPrepare(ctx)
@@ -2109,15 +2133,19 @@ func InferenceJobCreate(ctx *context.Context, form auth.CreateModelArtsInference
return return
} }


lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeInference), displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeInference)}, User: ctx.User})
defer func() {
if lockOperator != nil {
lockOperator.Unlock()
}
}()

if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
inferenceJobErrorNewDataPrepare(ctx, form) inferenceJobErrorNewDataPrepare(ctx, form)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsInferenceJobNew, &form)
ctx.RenderWithErr(ctx.Tr(errMsg), tplModelArtsInferenceJobNew, &form)
return return
} }
defer lock.UnLock()


count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainTwo, string(models.JobTypeInference)) count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainTwo, string(models.JobTypeInference))




+ 12
- 12
routers/routes/routes.go View File

@@ -1190,7 +1190,7 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Get("/download_multi_model", cloudbrain.AdminOrJobCreaterRight, repo.CloudBrainDownloadMultiModel) m.Get("/download_multi_model", cloudbrain.AdminOrJobCreaterRight, repo.CloudBrainDownloadMultiModel)
}) })
m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.CloudBrainNew) m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.CloudBrainNew)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateCloudBrainForm{}), repo.CloudBrainCreate)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateCloudBrainForm{}), context.PointAccount(), repo.CloudBrainCreate)


m.Group("/benchmark", func() { m.Group("/benchmark", func() {
m.Get("", reqRepoCloudBrainReader, repo.CloudBrainBenchmarkIndex) m.Get("", reqRepoCloudBrainReader, repo.CloudBrainBenchmarkIndex)
@@ -1201,7 +1201,7 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Get("/rate", reqRepoCloudBrainReader, repo.GetRate) m.Get("/rate", reqRepoCloudBrainReader, repo.GetRate)
}) })
m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.CloudBrainBenchmarkNew) m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.CloudBrainBenchmarkNew)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateCloudBrainForm{}), repo.CloudBrainBenchmarkCreate)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateCloudBrainForm{}), context.PointAccount(), repo.CloudBrainBenchmarkCreate)
m.Get("/get_child_types", repo.GetChildTypes) m.Get("/get_child_types", repo.GetChildTypes)
}) })


@@ -1218,7 +1218,7 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Post("/create_version", reqWechatBind, cloudbrain.AdminOrJobCreaterRightForTrain, bindIgnErr(auth.CreateCloudBrainForm{}), repo.CloudBrainTrainJobVersionCreate) m.Post("/create_version", reqWechatBind, cloudbrain.AdminOrJobCreaterRightForTrain, bindIgnErr(auth.CreateCloudBrainForm{}), repo.CloudBrainTrainJobVersionCreate)
}) })
m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.CloudBrainTrainJobNew) m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.CloudBrainTrainJobNew)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateCloudBrainForm{}), repo.CloudBrainCreate)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateCloudBrainForm{}), context.PointAccount(), repo.CloudBrainCreate)
}) })
m.Group("/inference-job", func() { m.Group("/inference-job", func() {
m.Group("/:jobid", func() { m.Group("/:jobid", func() {
@@ -1228,7 +1228,7 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Get("/downloadall", cloudbrain.AdminOrJobCreaterRightForTrain, repo.DownloadGPUInferenceResultFile) m.Get("/downloadall", cloudbrain.AdminOrJobCreaterRightForTrain, repo.DownloadGPUInferenceResultFile)
}) })
m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.InferenceCloudBrainJobNew) m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.InferenceCloudBrainJobNew)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateCloudBrainInferencForm{}), repo.CloudBrainInferenceJobCreate)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateCloudBrainInferencForm{}), context.PointAccount(), repo.CloudBrainInferenceJobCreate)
}) })
}, context.RepoRef()) }, context.RepoRef())
m.Group("/grampus", func() { m.Group("/grampus", func() {
@@ -1242,7 +1242,7 @@ func RegisterRoutes(m *macaron.Macaron) {
}) })


m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.GrampusNotebookNew) m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.GrampusNotebookNew)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateGrampusNotebookForm{}), repo.GrampusNotebookCreate)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateGrampusNotebookForm{}), context.PointAccount(), repo.GrampusNotebookCreate)
}) })


m.Group("/train-job", func() { m.Group("/train-job", func() {
@@ -1257,11 +1257,11 @@ func RegisterRoutes(m *macaron.Macaron) {
}) })
m.Group("/gpu", func() { m.Group("/gpu", func() {
m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.GrampusTrainJobGPUNew) m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.GrampusTrainJobGPUNew)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateGrampusTrainJobForm{}), repo.GrampusTrainJobGpuCreate)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateGrampusTrainJobForm{}), context.PointAccount(), repo.GrampusTrainJobGpuCreate)
}) })
m.Group("/npu", func() { m.Group("/npu", func() {
m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.GrampusTrainJobNPUNew) m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.GrampusTrainJobNPUNew)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateGrampusTrainJobForm{}), repo.GrampusTrainJobNpuCreate)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateGrampusTrainJobForm{}), context.PointAccount(), repo.GrampusTrainJobNpuCreate)
}) })
}) })
}, context.RepoRef()) }, context.RepoRef())
@@ -1310,7 +1310,7 @@ func RegisterRoutes(m *macaron.Macaron) {
}) })
m.Get("/create_gpu", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.AiSafetyCreateForGetGPU) m.Get("/create_gpu", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.AiSafetyCreateForGetGPU)
m.Get("/create_npu", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.AiSafetyCreateForGetNPU) m.Get("/create_npu", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.AiSafetyCreateForGetNPU)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, repo.AiSafetyCreateForPost)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.AiSafetyCreateForPost)
}, context.RepoRef()) }, context.RepoRef())


m.Group("/debugjob", func() { m.Group("/debugjob", func() {
@@ -1327,7 +1327,7 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Post("/del", cloudbrain.AdminOrOwnerOrJobCreaterRight, repo.NotebookDel) m.Post("/del", cloudbrain.AdminOrOwnerOrJobCreaterRight, repo.NotebookDel)
}) })
m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.NotebookNew) m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.NotebookNew)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateModelArtsNotebookForm{}), repo.Notebook2Create)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateModelArtsNotebookForm{}), context.PointAccount(), repo.Notebook2Create)
}) })


m.Group("/train-job", func() { m.Group("/train-job", func() {
@@ -1340,10 +1340,10 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Get("/download_multi_model", cloudbrain.AdminOrJobCreaterRightForTrain, repo.MultiModelDownload) m.Get("/download_multi_model", cloudbrain.AdminOrJobCreaterRightForTrain, repo.MultiModelDownload)
m.Get("/download_log_file", cloudbrain.AdminOrJobCreaterRightForTrain, repo.TrainJobDownloadLogFile) m.Get("/download_log_file", cloudbrain.AdminOrJobCreaterRightForTrain, repo.TrainJobDownloadLogFile)
m.Get("/create_version", reqWechatBind, cloudbrain.AdminOrJobCreaterRightForTrain, context.PointAccount(), repo.TrainJobNewVersion) m.Get("/create_version", reqWechatBind, cloudbrain.AdminOrJobCreaterRightForTrain, context.PointAccount(), repo.TrainJobNewVersion)
m.Post("/create_version", reqWechatBind, cloudbrain.AdminOrJobCreaterRightForTrain, bindIgnErr(auth.CreateModelArtsTrainJobForm{}), repo.TrainJobCreateVersion)
m.Post("/create_version", reqWechatBind, cloudbrain.AdminOrJobCreaterRightForTrain, bindIgnErr(auth.CreateModelArtsTrainJobForm{}), context.PointAccount(), repo.TrainJobCreateVersion)
}) })
m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.TrainJobNew) m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.TrainJobNew)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateModelArtsTrainJobForm{}), repo.TrainJobCreate)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateModelArtsTrainJobForm{}), context.PointAccount(), repo.TrainJobCreate)


m.Get("/para-config-list", reqRepoCloudBrainReader, repo.TrainJobGetConfigList) m.Get("/para-config-list", reqRepoCloudBrainReader, repo.TrainJobGetConfigList)
}) })
@@ -1356,7 +1356,7 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Get("/downloadall", cloudbrain.AdminOrJobCreaterRightForTrain, repo.DownloadMultiResultFile) m.Get("/downloadall", cloudbrain.AdminOrJobCreaterRightForTrain, repo.DownloadMultiResultFile)
}) })
m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.InferenceJobNew) m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.InferenceJobNew)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateModelArtsInferenceJobForm{}), repo.InferenceJobCreate)
m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateModelArtsInferenceJobForm{}), context.PointAccount(), repo.InferenceJobCreate)
}) })
}, context.RepoRef()) }, context.RepoRef())




+ 2
- 2
services/cloudbrain/cloudbrainTask/count.go View File

@@ -16,7 +16,7 @@ type StatusInfo struct {


var CloudbrainOneNotFinalStatuses = []string{string(models.JobWaiting), string(models.JobRunning)} var CloudbrainOneNotFinalStatuses = []string{string(models.JobWaiting), string(models.JobRunning)}
var CloudbrainTwoNotFinalStatuses = []string{string(models.ModelArtsTrainJobInit), string(models.ModelArtsTrainJobImageCreating), string(models.ModelArtsTrainJobSubmitTrying), string(models.ModelArtsTrainJobWaiting), string(models.ModelArtsTrainJobRunning), string(models.ModelArtsTrainJobScaling), string(models.ModelArtsTrainJobCheckInit), string(models.ModelArtsTrainJobCheckRunning), string(models.ModelArtsTrainJobCheckRunningCompleted)} var CloudbrainTwoNotFinalStatuses = []string{string(models.ModelArtsTrainJobInit), string(models.ModelArtsTrainJobImageCreating), string(models.ModelArtsTrainJobSubmitTrying), string(models.ModelArtsTrainJobWaiting), string(models.ModelArtsTrainJobRunning), string(models.ModelArtsTrainJobScaling), string(models.ModelArtsTrainJobCheckInit), string(models.ModelArtsTrainJobCheckRunning), string(models.ModelArtsTrainJobCheckRunningCompleted)}
var GrampusNotFinalStatuses = []string{models.GrampusStatusWaiting, models.GrampusStatusRunning}
var GrampusNotFinalStatuses = []string{models.GrampusStatusWaiting, models.GrampusStatusRunning, models.GrampusStatusPending}
var StatusInfoDict = map[string]StatusInfo{string(models.JobTypeDebug) + "-" + strconv.Itoa(models.TypeCloudBrainOne): { var StatusInfoDict = map[string]StatusInfo{string(models.JobTypeDebug) + "-" + strconv.Itoa(models.TypeCloudBrainOne): {
CloudBrainTypes: []int{models.TypeCloudBrainOne}, CloudBrainTypes: []int{models.TypeCloudBrainOne},
JobType: []models.JobType{models.JobTypeDebug}, JobType: []models.JobType{models.JobTypeDebug},
@@ -92,7 +92,7 @@ func GetNotFinalStatusTaskCount(uid int64, cloudbrainType int, jobType string, c


if statusInfo, ok := StatusInfoDict[key]; ok { if statusInfo, ok := StatusInfoDict[key]; ok {


return models.GetNotFinalStatusTaskCount(uid, statusInfo.NotFinalStatuses, statusInfo.JobType, statusInfo.CloudBrainTypes, statusInfo.ComputeResource)
return models.GetNotFinalStatusTaskCount(uid, statusInfo.NotFinalStatuses, statusInfo.JobType)


} else { } else {
return 0, fmt.Errorf("Can not find the status info.") return 0, fmt.Errorf("Can not find the status info.")


+ 20
- 0
services/cloudbrain/lock.go View File

@@ -0,0 +1,20 @@
package cloudbrain

import "code.gitea.io/gitea/services/lock"

// Lock4CloudbrainCreation acquires the locks required before creating a
// cloudbrain task: first the CloudbrainUniquenessLock, then the
// CloudbrainDisplayJobNameLock.
//
// On success it returns the operator holding the locks and an empty error
// code; the operator's Unlock method releases them. If any lock in the chain
// cannot be acquired it returns a nil operator together with the error code
// reported by the chain.
func Lock4CloudbrainCreation(ctx *lock.LockContext) (*lock.LockChainOperator, string) {
	chain := lock.NewLockChainOperator(ctx)
	chain.Add(lock.CloudbrainUniquenessLock{})
	chain.Add(lock.CloudbrainDisplayJobNameLock{})

	if code := chain.Lock(); code != "" {
		return nil, code
	}
	return chain, ""
}
// Lock4CloudbrainRestart acquires the lock required before restarting a
// cloudbrain task. Only the CloudbrainUniquenessLock is taken; unlike
// creation, no display-job-name lock is added to the chain.
//
// On success it returns the operator holding the lock and an empty error
// code; otherwise it returns a nil operator and the error code reported by
// the chain.
func Lock4CloudbrainRestart(ctx *lock.LockContext) (*lock.LockChainOperator, string) {
	chain := lock.NewLockChainOperator(ctx)
	chain.Add(lock.CloudbrainUniquenessLock{})

	if code := chain.Lock(); code != "" {
		return nil, code
	}
	return chain, ""
}

+ 31
- 0
services/lock/cloudbrain_name_lock.go View File

@@ -0,0 +1,31 @@
package lock

import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"fmt"
)

// CloudbrainDisplayJobNameLock is a stateless Lock implementation whose lock
// key is derived from the repository ID, the task's job type and the task's
// display job name found in the LockContext.
type CloudbrainDisplayJobNameLock struct{}

// IsMatch reports whether this lock applies to ctx. The display-job-name lock
// is unconditional, so the result is always true and ctx is not inspected.
func (c CloudbrainDisplayJobNameLock) IsMatch(ctx *LockContext) bool {
	return true
}

// Lock tries to take the distributed lock keyed by the repository ID, job
// type and display job name from ctx, held for models.CloudbrainKeyDuration.
//
// It returns "" when the lock was acquired. Otherwise it logs the failure and
// returns the error code "repo.cloudbrain_samejob_err".
func (c CloudbrainDisplayJobNameLock) Lock(ctx *LockContext) string {
	repoID := fmt.Sprint(ctx.Repo.ID)
	key := redis_key.CloudbrainBindingJobNameKey(repoID, ctx.Task.JobType, ctx.Task.DisplayJobName)
	nameLock := redis_lock.NewDistributeLock(key)

	acquired, err := nameLock.Lock(models.CloudbrainKeyDuration)
	if acquired {
		return ""
	}
	log.Error("CloudbrainDisplayJobNameLock lock failed:%v", err)
	return "repo.cloudbrain_samejob_err"
}

// Unlock releases the distributed lock keyed by the repository ID, job type
// and display job name from ctx, i.e. the same key that Lock uses. It returns
// whatever error the underlying UnLock call reports.
func (c CloudbrainDisplayJobNameLock) Unlock(ctx *LockContext) error {
	repoID := fmt.Sprint(ctx.Repo.ID)
	key := redis_key.CloudbrainBindingJobNameKey(repoID, ctx.Task.JobType, ctx.Task.DisplayJobName)
	nameLock := redis_lock.NewDistributeLock(key)
	return nameLock.UnLock()
}

+ 30
- 0
services/lock/cloudbrain_uniqueness_lock.go View File

@@ -0,0 +1,30 @@
package lock

import (
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/setting"
)

// CloudbrainUniquenessLock is a stateless Lock implementation whose lock key
// is derived from the user ID and the task's job type found in the
// LockContext.
type CloudbrainUniquenessLock struct{}

// IsMatch reports whether this lock applies to ctx. The uniqueness lock is
// unconditional, so the result is always true and ctx is not inspected.
func (c CloudbrainUniquenessLock) IsMatch(ctx *LockContext) bool {
	return true
}

// Lock tries to take the distributed lock keyed by the user ID and job type
// from ctx, held for setting.CloudbrainUniquenessLockTime.
//
// It returns "" when the lock was acquired. Otherwise it logs the failure and
// returns the error code "repo.cloudbrain.morethanonejob".
func (c CloudbrainUniquenessLock) Lock(ctx *LockContext) string {
	lock := redis_lock.NewDistributeLock(redis_key.CloudbrainUniquenessKey(ctx.User.ID, ctx.Task.JobType))
	isOk, err := lock.Lock(setting.CloudbrainUniquenessLockTime)
	if !isOk {
		// The message names this lock type so failures are not attributed to
		// CloudbrainDisplayJobNameLock in the logs.
		log.Error("CloudbrainUniquenessLock lock failed:%v", err)
		return "repo.cloudbrain.morethanonejob"
	}
	return ""
}

// Unlock releases the distributed lock keyed by the user ID and job type from
// ctx, i.e. the same key that Lock uses. It returns whatever error the
// underlying UnLock call reports.
func (c CloudbrainUniquenessLock) Unlock(ctx *LockContext) error {
	key := redis_key.CloudbrainUniquenessKey(ctx.User.ID, ctx.Task.JobType)
	uniquenessLock := redis_lock.NewDistributeLock(key)
	return uniquenessLock.UnLock()
}

+ 17
- 0
services/lock/lock.go View File

@@ -0,0 +1,17 @@
package lock

import (
"code.gitea.io/gitea/models"
)

// LockContext bundles the repository, cloudbrain task and user that Lock
// implementations read when building their lock keys.
type LockContext struct {
// Repo is the repository involved; CloudbrainDisplayJobNameLock reads Repo.ID.
Repo *models.Repository
// Task is the cloudbrain task; the locks in this package read its JobType
// and, for the display-job-name lock, its DisplayJobName.
Task *models.Cloudbrain
// User is the user involved; CloudbrainUniquenessLock reads User.ID.
User *models.User
}

// Lock is a single link in the chain managed by LockChainOperator.
type Lock interface {
// IsMatch reports whether the lock applies to ctx; LockChainOperator.Lock
// skips any lock for which this returns false.
IsMatch(ctx *LockContext) bool
// Lock tries to acquire the lock. It returns "" on success and a non-empty
// error code on failure.
Lock(ctx *LockContext) string
// Unlock releases the lock.
Unlock(ctx *LockContext) error
}

+ 42
- 0
services/lock/lock_operator.go View File

@@ -0,0 +1,42 @@
package lock

// LockChainOperator acquires a sequence of Locks in the order they were added
// and releases the acquired ones in reverse order.
type LockChainOperator struct {
// chainList holds every lock registered through Add, in registration order.
chainList []Lock
// lockedList holds the locks that Lock acquired successfully, in
// acquisition order.
lockedList []Lock
// ctx is handed to every IsMatch, Lock and Unlock call on the chain.
ctx *LockContext
}

// NewLockChainOperator returns an empty lock chain bound to ctx. Locks are
// registered afterwards with Add.
func NewLockChainOperator(ctx *LockContext) *LockChainOperator {
	op := new(LockChainOperator)
	op.ctx = ctx
	return op
}

// Add appends l to the end of the chain and returns the operator itself so
// that calls can be chained.
func (b *LockChainOperator) Add(l Lock) *LockChainOperator {
	extended := append(b.chainList, l)
	b.chainList = extended
	return b
}

// Lock walks the chain in insertion order and acquires every lock whose
// IsMatch accepts the operator's context; non-matching locks are skipped.
//
// It returns "" when all matching locks were acquired. As soon as one lock
// reports a non-empty error code, the locks acquired so far are released via
// Unlock and that error code is returned.
func (b *LockChainOperator) Lock() string {
	for _, candidate := range b.chainList {
		if candidate.IsMatch(b.ctx) {
			code := candidate.Lock(b.ctx)
			if code != "" {
				// Roll back whatever was acquired before the failure.
				b.Unlock()
				return code
			}
			b.lockedList = append(b.lockedList, candidate)
		}
	}
	return ""
}

// Unlock releases every lock previously acquired by Lock, in reverse
// acquisition order.
//
// All acquired locks are released even if some of them fail; the first error
// encountered is returned, or nil when every release succeeded. The list of
// held locks is cleared afterwards, so calling Unlock again (for example a
// deferred call after Lock already rolled back) is a no-op and does not
// release the same keys a second time.
func (b *LockChainOperator) Unlock() error {
	var firstErr error
	for j := len(b.lockedList) - 1; j >= 0; j-- {
		// Keep going on failure: one stuck lock must not prevent the
		// remaining ones from being released.
		if err := b.lockedList[j].Unlock(b.ctx); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	// Forget the released locks so a repeated Unlock cannot release them again.
	b.lockedList = nil
	return firstErr
}

Loading…
Cancel
Save