Browse Source

#2417

add creation lock
fix-2417
chenyifan01 2 years ago
parent
commit
4017242a5f
9 changed files with 190 additions and 20 deletions
  1. +6
    -0
      modules/redis/redis_key/cloudbrain_redis_key.go
  2. +16
    -0
      routers/repo/aisafety.go
  3. +22
    -13
      routers/repo/cloudbrain.go
  4. +13
    -7
      routers/repo/grampus.go
  5. +13
    -0
      services/cloudbrain/lock.go
  6. +31
    -0
      services/lock/cloudbrain_name_lock.go
  7. +31
    -0
      services/lock/cloudbrain_uniqueness_lock.go
  8. +18
    -0
      services/lock/lock.go
  9. +40
    -0
      services/lock/lock_operator.go

+ 6
- 0
modules/redis/redis_key/cloudbrain_redis_key.go View File

@@ -1,7 +1,13 @@
package redis_key

import "fmt"

const CLOUDBRAIN_PREFIX = "cloudbrain"

func CloudbrainBindingJobNameKey(repoId string, jobType string, jobName string) string {
return KeyJoin(CLOUDBRAIN_PREFIX, repoId, jobType, jobName, "redis_key")
}

func CloudbrainUniquenessKey(userId int64, jobType string) string {
return KeyJoin(CLOUDBRAIN_PREFIX, fmt.Sprint(userId), jobType, "uniqueness")
}

+ 16
- 0
routers/repo/aisafety.go View File

@@ -2,6 +2,7 @@ package repo

import (
"bufio"
"code.gitea.io/gitea/services/lock"
"encoding/json"
"errors"
"fmt"
@@ -575,6 +576,21 @@ func AiSafetyCreateForPost(ctx *context.Context) {
tpname = tplCloudBrainModelSafetyNewGpu
}

limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User}
lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx)
defer func() {
if lockOperator != nil {
lockOperator.Unlock(limiterCtx)
}
}()

if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
modelSafetyNewDataPrepare(ctx)
ctx.RenderWithErr(ctx.Tr(errMsg), tpname, nil)
return
}

tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeModelSafety), displayJobName)
if err == nil {
if len(tasks) != 0 {


+ 22
- 13
routers/repo/cloudbrain.go View File

@@ -2,6 +2,7 @@ package repo

import (
"bufio"
"code.gitea.io/gitea/services/lock"
"encoding/json"
"errors"
"fmt"
@@ -228,16 +229,20 @@ func cloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
if jobType == string(models.JobTypeTrain) {
tpl = tplCloudBrainTrainJobNew
}
limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User}
lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx)
defer func() {
if lockOperator != nil {
lockOperator.Unlock(limiterCtx)
}
}()

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
cloudBrainNewDataPrepare(ctx, jobType)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form)
ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form)
return
}
defer lock.UnLock()

tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName)
if err == nil {
@@ -746,7 +751,6 @@ func CloudBrainRestart(ctx *context.Context) {
})
}


func hasDatasetDeleted(task *models.Cloudbrain) bool {
if task.Uuid == "" {
return false
@@ -2389,15 +2393,20 @@ func BenchMarkAlgorithmCreate(ctx *context.Context, form auth.CreateCloudBrainFo
ctx.Data["benchmarkTypeID"] = benchmarkTypeID
ctx.Data["benchmark_child_types_id_hidden"] = benchmarkChildTypeID

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), form.JobType, displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User}
lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx)
defer func() {
if lockOperator != nil {
lockOperator.Unlock(limiterCtx)
}
}()

if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
cloudBrainNewDataPrepare(ctx, jobType)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplCloudBrainBenchmarkNew, &form)
ctx.RenderWithErr(ctx.Tr(errMsg), tplCloudBrainBenchmarkNew, &form)
return
}
defer lock.UnLock()

tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName)
if err == nil {


+ 13
- 7
routers/repo/grampus.go View File

@@ -1,6 +1,7 @@
package repo

import (
"code.gitea.io/gitea/services/lock"
"encoding/json"
"errors"
"fmt"
@@ -119,13 +120,18 @@ func GrampusNotebookCreate(ctx *context.Context, form auth.CreateGrampusNotebook
codeStoragePath = grampus.JobPath + jobName + modelarts.CodePath
}

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeDebug), displayJobName))
defer lock.UnLock()
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User}
lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx)
defer func() {
if lockOperator != nil {
lockOperator.Unlock(limiterCtx)
}
}()

if errMsg != "" {
log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
grampusNotebookNewDataPrepare(ctx, processType)
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form)
ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form)
return
}

@@ -1308,7 +1314,7 @@ func GrampusTrainJobShow(ctx *context.Context) {
taskList := make([]*models.Cloudbrain, 0)
taskList = append(taskList, task)
prepareSpec4Show(ctx, task)
ctx.Data["version_list_task"] = taskList
ctx.Data["datasetDownload"] = GetCloudBrainDataSetInfo(task.Uuid, task.DatasetName, false)
ctx.Data["canDownload"] = cloudbrain.CanModifyJob(ctx, task)


+ 13
- 0
services/cloudbrain/lock.go View File

@@ -0,0 +1,13 @@
package cloudbrain

import "code.gitea.io/gitea/services/lock"

var defaultChain = lock.NewLockChainOperator().Add(lock.CloudbrainUniquenessLock{}).Add(lock.CloudbrainDisplayJobNameLock{})

func Lock4CloudbrainCreation(ctx *lock.LockContext) (*lock.LockChainOperator, string) {
errCode := defaultChain.Lock(ctx)
if errCode != "" {
return nil, errCode
}
return defaultChain, ""
}

+ 31
- 0
services/lock/cloudbrain_name_lock.go View File

@@ -0,0 +1,31 @@
package lock

import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"fmt"
)

type CloudbrainDisplayJobNameLock struct {
}

func (c CloudbrainDisplayJobNameLock) IsMatch(ctx *LockContext) bool {
return true
}

func (c CloudbrainDisplayJobNameLock) Lock(ctx *LockContext) string {
lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(ctx.Repo.ID), string(models.JobTypeDebug), ctx.DisplayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("CloudbrainDisplayJobNameLock lock failed:%v", err)
return "repo.cloudbrain_samejob_err"
}
return ""
}

func (c CloudbrainDisplayJobNameLock) Unlock(ctx *LockContext) error {
lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(ctx.Repo.ID), string(models.JobTypeDebug), ctx.DisplayJobName))
return lock.UnLock()
}

+ 31
- 0
services/lock/cloudbrain_uniqueness_lock.go View File

@@ -0,0 +1,31 @@
package lock

import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"time"
)

type CloudbrainUniquenessLock struct {
}

func (c CloudbrainUniquenessLock) IsMatch(ctx *LockContext) bool {
return true
}

func (c CloudbrainUniquenessLock) Lock(ctx *LockContext) string {
lock := redis_lock.NewDistributeLock(redis_key.CloudbrainUniquenessKey(ctx.User.ID, string(models.JobTypeDebug)))
isOk, err := lock.Lock(5 * time.Minute)
if !isOk {
log.Error("CloudbrainDisplayJobNameLock lock failed:%v", err)
return "you have already a running or waiting task, can not create more"
}
return ""
}

func (c CloudbrainUniquenessLock) Unlock(ctx *LockContext) error {
lock := redis_lock.NewDistributeLock(redis_key.CloudbrainUniquenessKey(ctx.User.ID, string(models.JobTypeDebug)))
return lock.UnLock()
}

+ 18
- 0
services/lock/lock.go View File

@@ -0,0 +1,18 @@
package lock

import (
"code.gitea.io/gitea/models"
)

type LockContext struct {
Repo *models.Repository
DisplayJobName string
User *models.User
LockedList []Lock
}

type Lock interface {
IsMatch(ctx *LockContext) bool
Lock(ctx *LockContext) string
Unlock(ctx *LockContext) error
}

+ 40
- 0
services/lock/lock_operator.go View File

@@ -0,0 +1,40 @@
package lock

type LockChainOperator struct {
ChainList []Lock
}

func NewLockChainOperator() *LockChainOperator {
return &LockChainOperator{}
}

func (b *LockChainOperator) Add(l Lock) *LockChainOperator {
b.ChainList = append(b.ChainList, l)
return b
}

func (b *LockChainOperator) Lock(ctx *LockContext) string {
for i := 0; i < len(b.ChainList); i++ {
l := b.ChainList[i]
if !l.IsMatch(ctx) {
continue
}

if errCode := l.Lock(ctx); errCode != "" {
b.Unlock(ctx)
return errCode
}
ctx.LockedList = append(ctx.LockedList, l)
}
return ""
}

func (b *LockChainOperator) Unlock(ctx *LockContext) error {
if b.ChainList == nil || len(b.ChainList) == 0 {
return nil
}
for j := len(ctx.LockedList) - 1; j >= 0; j-- {
ctx.LockedList[j].Unlock(ctx)
}
return nil
}

Loading…
Cancel
Save