@@ -201,16 +201,17 @@ type Cloudbrain struct { | |||
} | |||
type CloudbrainShow struct { | |||
ID int64 | |||
JobID string | |||
RepoFullName string | |||
Type int | |||
JobType string | |||
DisplayJobName string | |||
Duration string | |||
ResourceSpec *Specification | |||
ComputeResource string | |||
AiCenter string | |||
ID int64 | |||
JobID string | |||
RepoFullName string | |||
Type int | |||
JobType string | |||
DisplayJobName string | |||
Duration string | |||
ResourceSpec *Specification | |||
ComputeResource string | |||
AiCenter string | |||
WorkServerNumber int | |||
} | |||
type CloudbrainShow4Action struct { | |||
@@ -223,15 +224,20 @@ type CloudbrainShow4Action struct { | |||
} | |||
func (task *Cloudbrain) ToShow() *CloudbrainShow { | |||
n := 1 | |||
if task.WorkServerNumber > 1 { | |||
n = task.WorkServerNumber | |||
} | |||
c := &CloudbrainShow{ | |||
ID: task.ID, | |||
JobID: task.JobID, | |||
JobType: task.JobType, | |||
Type: task.Type, | |||
DisplayJobName: task.DisplayJobName, | |||
Duration: task.TrainJobDuration, | |||
ResourceSpec: task.Spec, | |||
ComputeResource: task.ComputeResource, | |||
ID: task.ID, | |||
JobID: task.JobID, | |||
JobType: task.JobType, | |||
Type: task.Type, | |||
DisplayJobName: task.DisplayJobName, | |||
Duration: task.TrainJobDuration, | |||
ResourceSpec: task.Spec, | |||
ComputeResource: task.ComputeResource, | |||
WorkServerNumber: n, | |||
} | |||
if task.Repo != nil { | |||
c.RepoFullName = task.Repo.FullName() | |||
@@ -25,7 +25,19 @@ func AcceptStatusChangeAction() { | |||
} | |||
func StartAndGetCloudBrainPointDeductTask(task models.Cloudbrain) (*models.RewardPeriodicTask, error) { | |||
sourceId := getCloudBrainPointTaskSourceId(task) | |||
r, err := GetPeriodicTask(models.SourceTypeRunCloudbrainTask, sourceId, sourceId, models.OperateTypeDecrease) | |||
if err != nil { | |||
return nil, err | |||
} | |||
if r != nil { | |||
log.Debug("PeriodicTask is already exist.cloudbrain.ID = %d", task.ID) | |||
return r, nil | |||
} | |||
if !setting.CloudBrainPaySwitch { | |||
log.Debug("CloudBrainPaySwitch is off") | |||
return nil, nil | |||
} | |||
@@ -34,11 +46,11 @@ func StartAndGetCloudBrainPointDeductTask(task models.Cloudbrain) (*models.Rewar | |||
return nil, err | |||
} | |||
if unitPrice == 0 { | |||
log.Debug("finish StartAndGetCloudBrainPointDeductTask, UnitPrice = 0 task.ID=%d", task.ID) | |||
log.Debug("Finish startAndGetCloudBrainPointDeductTask, UnitPrice = 0 task.ID=%d", task.ID) | |||
return nil, nil | |||
} | |||
return StartAndGetPeriodicTask(&models.StartPeriodicTaskOpts{ | |||
return StartPeriodicTask(&models.StartPeriodicTaskOpts{ | |||
SourceType: models.SourceTypeRunCloudbrainTask, | |||
SourceId: getCloudBrainPointTaskSourceId(task), | |||
TargetUserId: task.UserID, | |||
@@ -180,43 +180,41 @@ func UpdateRewardRecordToFinalStatus(sourceType, requestId, newStatus string) er | |||
return nil | |||
} | |||
func StartPeriodicTaskAsyn(opts *models.StartPeriodicTaskOpts) { | |||
go StartAndGetPeriodicTask(opts) | |||
func GetPeriodicTask(sourceType models.SourceType, sourceId, requestId string, operateType models.RewardOperateType) (*models.RewardPeriodicTask, error) { | |||
_, err := models.GetPointOperateRecordBySourceTypeAndRequestId(sourceType.Name(), requestId, operateType.Name()) | |||
if err == nil { | |||
task, err := models.GetPeriodicTaskBySourceIdAndType(sourceType, sourceId, operateType) | |||
if err != nil { | |||
log.Error("GetPeriodicTaskBySourceIdAndType error,%v", err) | |||
return nil, err | |||
} | |||
return task, nil | |||
} | |||
if err != nil && !models.IsErrRecordNotExist(err) { | |||
log.Error("GetPointOperateRecordBySourceTypeAndRequestId error,%v", err) | |||
return nil, err | |||
} | |||
return nil, nil | |||
} | |||
func StartAndGetPeriodicTask(opts *models.StartPeriodicTaskOpts) (*models.RewardPeriodicTask, error) { | |||
defer func() { | |||
if err := recover(); err != nil { | |||
combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2)) | |||
log.Error("PANIC:%v", combinedErr) | |||
} | |||
}() | |||
func StartPeriodicTask(opts *models.StartPeriodicTaskOpts) (*models.RewardPeriodicTask, error) { | |||
//add lock | |||
var rewardLock = redis_lock.NewDistributeLock(redis_key.RewardOperateLock(opts.RequestId, opts.SourceType.Name(), opts.OperateType.Name())) | |||
isOk, err := rewardLock.Lock(3 * time.Second) | |||
if err != nil { | |||
log.Error("StartAndGetPeriodicTask RewardOperateLock error. %v", err) | |||
return nil, err | |||
} | |||
if !isOk { | |||
log.Info("duplicated operate request,targetUserId=%d requestId=%s", opts.TargetUserId, opts.RequestId) | |||
return nil, nil | |||
} | |||
defer rewardLock.UnLock() | |||
_, err = models.GetPointOperateRecordBySourceTypeAndRequestId(opts.SourceType.Name(), opts.RequestId, opts.OperateType.Name()) | |||
if err == nil { | |||
task, err := models.GetPeriodicTaskBySourceIdAndType(opts.SourceType, opts.SourceId, opts.OperateType) | |||
if err != nil { | |||
log.Error("GetPeriodicTaskBySourceIdAndType error,%v", err) | |||
return nil, err | |||
} | |||
return task, nil | |||
r, err := GetPeriodicTask(opts.SourceType, opts.SourceId, opts.RequestId, opts.OperateType) | |||
if err != nil { | |||
return nil, err | |||
} | |||
if err != nil && !models.IsErrRecordNotExist(err) { | |||
log.Error("operate is handled error,%v", err) | |||
return nil, err | |||
if r != nil { | |||
return r, nil | |||
} | |||
//new reward operate record | |||