@@ -201,16 +201,17 @@ type Cloudbrain struct { | |||
} | |||
type CloudbrainShow struct { | |||
ID int64 | |||
JobID string | |||
RepoFullName string | |||
Type int | |||
JobType string | |||
DisplayJobName string | |||
Duration string | |||
ResourceSpec *Specification | |||
ComputeResource string | |||
AiCenter string | |||
ID int64 | |||
JobID string | |||
RepoFullName string | |||
Type int | |||
JobType string | |||
DisplayJobName string | |||
Duration string | |||
ResourceSpec *Specification | |||
ComputeResource string | |||
AiCenter string | |||
WorkServerNumber int | |||
} | |||
type CloudbrainShow4Action struct { | |||
@@ -223,15 +224,20 @@ type CloudbrainShow4Action struct { | |||
} | |||
func (task *Cloudbrain) ToShow() *CloudbrainShow { | |||
n := 1 | |||
if task.WorkServerNumber > 1 { | |||
n = task.WorkServerNumber | |||
} | |||
c := &CloudbrainShow{ | |||
ID: task.ID, | |||
JobID: task.JobID, | |||
JobType: task.JobType, | |||
Type: task.Type, | |||
DisplayJobName: task.DisplayJobName, | |||
Duration: task.TrainJobDuration, | |||
ResourceSpec: task.Spec, | |||
ComputeResource: task.ComputeResource, | |||
ID: task.ID, | |||
JobID: task.JobID, | |||
JobType: task.JobType, | |||
Type: task.Type, | |||
DisplayJobName: task.DisplayJobName, | |||
Duration: task.TrainJobDuration, | |||
ResourceSpec: task.Spec, | |||
ComputeResource: task.ComputeResource, | |||
WorkServerNumber: n, | |||
} | |||
if task.Repo != nil { | |||
c.RepoFullName = task.Repo.FullName() | |||
@@ -123,10 +123,14 @@ func GetResourceSpecMapByCloudbrainIDs(ids []int64) (map[int64]*Specification, e | |||
return r, nil | |||
} | |||
func GetCloudbrainTaskUnitPrice(cloudbrainId int64) (int, error) { | |||
s, err := GetCloudbrainSpecByID(cloudbrainId) | |||
func GetCloudbrainTaskUnitPrice(task Cloudbrain) (int, error) { | |||
s, err := GetCloudbrainSpecByID(task.ID) | |||
if err != nil { | |||
return 0, err | |||
} | |||
return s.UnitPrice, nil | |||
var n = 1 | |||
if task.WorkServerNumber > 1 { | |||
n = task.WorkServerNumber | |||
} | |||
return s.UnitPrice * n, nil | |||
} |
@@ -1216,7 +1216,7 @@ func TrainJobCreate(ctx *context.Context, form auth.CreateModelArtsTrainJobForm) | |||
ctx.RenderWithErr("Resource specification not available", tplModelArtsTrainJobNew, &form) | |||
return | |||
} | |||
if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) { | |||
if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice*form.WorkServerNumber) { | |||
log.Error("point balance is not enough,userId=%d specId=%d", ctx.User.ID, spec.ID) | |||
cloudBrainNewDataPrepare(ctx) | |||
ctx.RenderWithErr(ctx.Tr("points.insufficient_points_balance"), tplModelArtsTrainJobNew, &form) | |||
@@ -25,20 +25,32 @@ func AcceptStatusChangeAction() { | |||
} | |||
func StartAndGetCloudBrainPointDeductTask(task models.Cloudbrain) (*models.RewardPeriodicTask, error) { | |||
sourceId := getCloudBrainPointTaskSourceId(task) | |||
r, err := GetPeriodicTask(models.SourceTypeRunCloudbrainTask, sourceId, sourceId, models.OperateTypeDecrease) | |||
if err != nil { | |||
return nil, err | |||
} | |||
if r != nil { | |||
log.Debug("PeriodicTask is already exist.cloudbrain.ID = %d", task.ID) | |||
return r, nil | |||
} | |||
if !setting.CloudBrainPaySwitch { | |||
log.Debug("CloudBrainPaySwitch is off") | |||
return nil, nil | |||
} | |||
unitPrice, err := models.GetCloudbrainTaskUnitPrice(task.ID) | |||
unitPrice, err := models.GetCloudbrainTaskUnitPrice(task) | |||
if err != nil { | |||
return nil, err | |||
} | |||
if unitPrice == 0 { | |||
log.Debug("finish StartAndGetCloudBrainPointDeductTask, UnitPrice = 0 task.ID=%d", task.ID) | |||
log.Debug("Finish startAndGetCloudBrainPointDeductTask, UnitPrice = 0 task.ID=%d", task.ID) | |||
return nil, nil | |||
} | |||
return StartAndGetPeriodicTask(&models.StartPeriodicTaskOpts{ | |||
return StartPeriodicTask(&models.StartPeriodicTaskOpts{ | |||
SourceType: models.SourceTypeRunCloudbrainTask, | |||
SourceId: getCloudBrainPointTaskSourceId(task), | |||
TargetUserId: task.UserID, | |||
@@ -180,43 +180,41 @@ func UpdateRewardRecordToFinalStatus(sourceType, requestId, newStatus string) er | |||
return nil | |||
} | |||
func StartPeriodicTaskAsyn(opts *models.StartPeriodicTaskOpts) { | |||
go StartAndGetPeriodicTask(opts) | |||
func GetPeriodicTask(sourceType models.SourceType, sourceId, requestId string, operateType models.RewardOperateType) (*models.RewardPeriodicTask, error) { | |||
_, err := models.GetPointOperateRecordBySourceTypeAndRequestId(sourceType.Name(), requestId, operateType.Name()) | |||
if err == nil { | |||
task, err := models.GetPeriodicTaskBySourceIdAndType(sourceType, sourceId, operateType) | |||
if err != nil { | |||
log.Error("GetPeriodicTaskBySourceIdAndType error,%v", err) | |||
return nil, err | |||
} | |||
return task, nil | |||
} | |||
if err != nil && !models.IsErrRecordNotExist(err) { | |||
log.Error("GetPointOperateRecordBySourceTypeAndRequestId error,%v", err) | |||
return nil, err | |||
} | |||
return nil, nil | |||
} | |||
func StartAndGetPeriodicTask(opts *models.StartPeriodicTaskOpts) (*models.RewardPeriodicTask, error) { | |||
defer func() { | |||
if err := recover(); err != nil { | |||
combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2)) | |||
log.Error("PANIC:%v", combinedErr) | |||
} | |||
}() | |||
func StartPeriodicTask(opts *models.StartPeriodicTaskOpts) (*models.RewardPeriodicTask, error) { | |||
//add lock | |||
var rewardLock = redis_lock.NewDistributeLock(redis_key.RewardOperateLock(opts.RequestId, opts.SourceType.Name(), opts.OperateType.Name())) | |||
isOk, err := rewardLock.Lock(3 * time.Second) | |||
if err != nil { | |||
log.Error("StartAndGetPeriodicTask RewardOperateLock error. %v", err) | |||
return nil, err | |||
} | |||
if !isOk { | |||
log.Info("duplicated operate request,targetUserId=%d requestId=%s", opts.TargetUserId, opts.RequestId) | |||
return nil, nil | |||
} | |||
defer rewardLock.UnLock() | |||
_, err = models.GetPointOperateRecordBySourceTypeAndRequestId(opts.SourceType.Name(), opts.RequestId, opts.OperateType.Name()) | |||
if err == nil { | |||
task, err := models.GetPeriodicTaskBySourceIdAndType(opts.SourceType, opts.SourceId, opts.OperateType) | |||
if err != nil { | |||
log.Error("GetPeriodicTaskBySourceIdAndType error,%v", err) | |||
return nil, err | |||
} | |||
return task, nil | |||
r, err := GetPeriodicTask(opts.SourceType, opts.SourceId, opts.RequestId, opts.OperateType) | |||
if err != nil { | |||
return nil, err | |||
} | |||
if err != nil && !models.IsErrRecordNotExist(err) { | |||
log.Error("operate is handled error,%v", err) | |||
return nil, err | |||
if r != nil { | |||
return r, nil | |||
} | |||
//new reward operate record | |||