diff --git a/routers/api/v1/repo/cloudbrain.go b/routers/api/v1/repo/cloudbrain.go index 2303ec7ee..3c120cb54 100755 --- a/routers/api/v1/repo/cloudbrain.go +++ b/routers/api/v1/repo/cloudbrain.go @@ -16,6 +16,8 @@ import ( "strings" "time" + "code.gitea.io/gitea/services/cloudbrain/cloudbrainTask" + "code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/setting" @@ -81,47 +83,22 @@ func GetCloudbrainTask(ctx *context.APIContext) { "JobDuration": job.TrainJobDuration, }) } else { - jobResult, err := cloudbrain.GetJob(job.JobID) - if err != nil { - ctx.NotFound(err) - log.Error("GetJob failed:", err) - return - } - result, _ := models.ConvertToJobResultPayload(jobResult.Payload) + jobAfter, err := cloudbrainTask.SyncCloudBrainOneStatus(job) + if err != nil { ctx.NotFound(err) - log.Error("ConvertToJobResultPayload failed:", err) + log.Error("Sync cloud brain one status failed:", err) return } - oldStatus := job.Status - job.Status = result.JobStatus.State - taskRoles := result.TaskRoles - taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{})) - if result.JobStatus.State != string(models.JobWaiting) && result.JobStatus.State != string(models.JobFailed) { - job.ContainerIp = taskRes.TaskStatuses[0].ContainerIP - job.ContainerID = taskRes.TaskStatuses[0].ContainerID - job.Status = taskRes.TaskStatuses[0].State - } - - if result.JobStatus.State != string(models.JobWaiting) { - models.ParseAndSetDurationFromCloudBrainOne(result, job) - if oldStatus != job.Status { - notification.NotifyChangeCloudbrainStatus(job, oldStatus) - } - err = models.UpdateJob(job) - if err != nil { - log.Error("UpdateJob failed:", err) - } - } ctx.JSON(http.StatusOK, map[string]interface{}{ "ID": ID, - "JobName": result.Config.JobName, - "JobStatus": result.JobStatus.State, - "SubState": result.JobStatus.SubState, - "CreatedTime": time.Unix(result.JobStatus.CreatedTime/1000, 0).Format("2006-01-02 15:04:05"), - "CompletedTime": time.Unix(result.JobStatus.CompletedTime/1000, 0).Format("2006-01-02 15:04:05"), - "JobDuration": job.TrainJobDuration, + "JobName": jobAfter.JobName, + "JobStatus": jobAfter.Status, + "SubState": "", + "CreatedTime": jobAfter.CreatedUnix.Format("2006-01-02 15:04:05"), + "CompletedTime": jobAfter.UpdatedUnix.Format("2006-01-02 15:04:05"), + "JobDuration": jobAfter.TrainJobDuration, }) } } diff --git a/routers/api/v1/repo/modelarts.go b/routers/api/v1/repo/modelarts.go index 5a0e21ed8..cd129dd5f 100755 --- a/routers/api/v1/repo/modelarts.go +++ b/routers/api/v1/repo/modelarts.go @@ -12,6 +12,8 @@ import ( "strconv" "strings" + "code.gitea.io/gitea/services/cloudbrain/cloudbrainTask" + "code.gitea.io/gitea/modules/urfs_client/urchin" "code.gitea.io/gitea/modules/notification" @@ -20,7 +22,6 @@ import ( "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/models" - "code.gitea.io/gitea/modules/cloudbrain" "code.gitea.io/gitea/modules/context" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/modelarts" @@ -109,39 +110,11 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) { } if job.Type == models.TypeCloudBrainOne { - jobResult, err := cloudbrain.GetJob(job.JobID) - if err != nil { - ctx.NotFound(err) - log.Error("GetJob failed:", err) - return - } - result, err := models.ConvertToJobResultPayload(jobResult.Payload) + job, err = cloudbrainTask.SyncCloudBrainOneStatus(job) if err != nil { ctx.NotFound(err) - log.Error("ConvertToJobResultPayload failed:", err) return } - oldStatus := job.Status - job.Status = result.JobStatus.State - if result.JobStatus.State != string(models.JobWaiting) && result.JobStatus.State != string(models.JobFailed) { - taskRoles := result.TaskRoles - taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{})) - - job.ContainerIp = taskRes.TaskStatuses[0].ContainerIP - job.ContainerID = taskRes.TaskStatuses[0].ContainerID - job.Status = taskRes.TaskStatuses[0].State - } - - if result.JobStatus.State != string(models.JobWaiting) { - models.ParseAndSetDurationFromCloudBrainOne(result, job) - if oldStatus != job.Status { - notification.NotifyChangeCloudbrainStatus(job, oldStatus) - } - err = models.UpdateJob(job) - if err != nil { - log.Error("UpdateJob failed:", err) - } - } } else if job.Type == models.TypeCloudBrainTwo { err := modelarts.HandleTrainJobInfo(job) if err != nil { diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index de0e0e8bf..aac941101 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -1845,59 +1845,37 @@ func SyncCloudbrainStatus() { continue } if task.Type == models.TypeCloudBrainOne { - result, err := cloudbrain.GetJob(task.JobID) + + task, err = cloudbrainTask.SyncCloudBrainOneStatus(task) if err != nil { - log.Error("GetJob(%s) failed:%v", task.JobName, err) + log.Error("Sync cloud brain one (%s) failed:%v", task.JobName, err) continue } - if result != nil { - jobRes, _ := models.ConvertToJobResultPayload(result.Payload) - taskRoles := jobRes.TaskRoles - taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{})) - oldStatus := task.Status - task.Status = taskRes.TaskStatuses[0].State - if task.Status != string(models.JobWaiting) { - models.ParseAndSetDurationFromCloudBrainOne(jobRes, task) + if task.Status != string(models.JobWaiting) { + if task.Duration >= setting.MaxDuration && task.JobType == string(models.JobTypeDebug) { + log.Info("begin to stop job(%s), because of the duration", task.DisplayJobName) + err = cloudbrain.StopJob(task.JobID) + if err != nil { + log.Error("StopJob(%s) failed:%v", task.DisplayJobName, err) + continue + } + oldStatus := task.Status + task.Status = string(models.JobStopped) + if task.EndTime == 0 { + task.EndTime = timeutil.TimeStampNow() + } + task.ComputeAndSetDuration() if oldStatus != task.Status { notification.NotifyChangeCloudbrainStatus(task, oldStatus) } err = models.UpdateJob(task) if err != nil { - log.Error("UpdateJob(%s) failed:%v", task.JobName, err) - } - - var maxDuration int64 - if task.JobType == string(models.JobTypeBenchmark) { - maxDuration = setting.BenchmarkMaxDuration - } else if task.JobType == string(models.JobTypeSnn4imagenet) || task.JobType == string(models.JobTypeBrainScore) { - maxDuration = setting.ModelBenchmarkMaxDuration - } else { - maxDuration = setting.MaxDuration - } - - if task.Duration >= maxDuration && task.JobType != string(models.JobTypeTrain) { - log.Info("begin to stop job(%s), because of the duration", task.DisplayJobName) - err = cloudbrain.StopJob(task.JobID) - if err != nil { - log.Error("StopJob(%s) failed:%v", task.DisplayJobName, err) - continue - } - task.Status = string(models.JobStopped) - if task.EndTime == 0 { - task.EndTime = timeutil.TimeStampNow() - } - task.ComputeAndSetDuration() - if oldStatus != task.Status { - notification.NotifyChangeCloudbrainStatus(task, oldStatus) - } - err = models.UpdateJob(task) - if err != nil { - log.Error("UpdateJob(%s) failed:%v", task.DisplayJobName, err) - continue - } + log.Error("UpdateJob(%s) failed:%v", task.DisplayJobName, err) + continue } } + } } else if task.Type == models.TypeCloudBrainTwo { if task.JobType == string(models.JobTypeDebug) { diff --git a/services/cloudbrain/cloudbrainTask/sync_status.go b/services/cloudbrain/cloudbrainTask/sync_status.go new file mode 100644 index 000000000..7153a7ec0 --- /dev/null +++ b/services/cloudbrain/cloudbrainTask/sync_status.go @@ -0,0 +1,83 @@ +package cloudbrainTask + +import ( + "net/http" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/cloudbrain" + "code.gitea.io/gitea/modules/httplib" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/notification" + "code.gitea.io/gitea/modules/setting" +) + +var noteBookOKMap = make(map[int64]int, 20) + +//if a task notebook url can get two times, the notebook can browser. +const successfulCount = 2 + +func SyncCloudBrainOneStatus(task *models.Cloudbrain) (*models.Cloudbrain, error) { + jobResult, err := cloudbrain.GetJob(task.JobID) + if err != nil { + + log.Error("GetJob failed:", err) + + return task, err + } + result, err := models.ConvertToJobResultPayload(jobResult.Payload) + if err != nil { + log.Error("ConvertToJobResultPayload failed:", err) + return task, err + } + oldStatus := task.Status + + if result.JobStatus.State != string(models.JobWaiting) && result.JobStatus.State != string(models.JobFailed) { + taskRoles := result.TaskRoles + taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{})) + + task.ContainerIp = taskRes.TaskStatuses[0].ContainerIP + task.ContainerID = taskRes.TaskStatuses[0].ContainerID + } + + if (result.JobStatus.State != string(models.JobWaiting) && result.JobStatus.State != string(models.JobRunning)) || + task.Status == string(models.JobRunning) || (result.JobStatus.State == string(models.JobRunning) && isNoteBookReady(task)) { + + models.ParseAndSetDurationFromCloudBrainOne(result, task) + task.Status = result.JobStatus.State + if oldStatus != task.Status { + notification.NotifyChangeCloudbrainStatus(task, oldStatus) + } + err = models.UpdateJob(task) + if err != nil { + log.Error("UpdateJob failed:", err) + return task, err + } + } + return task, nil + +} + +func isNoteBookReady(task *models.Cloudbrain) bool { + if task.JobType != string(models.JobTypeDebug) { + return true + } + noteBookUrl := setting.DebugServerHost + "jpylab_" + task.JobID + "_" + task.SubTaskName + r := httplib.Get(noteBookUrl) + res, err := r.Response() + if err != nil { + return false + } + if res.StatusCode == http.StatusOK { + count := noteBookOKMap[task.ID] + if count < successfulCount-1 { + noteBookOKMap[task.ID] = count + 1 + return false + } else { + delete(noteBookOKMap, task.ID) + return true + } + + } + return false + +}