Browse Source

Merge pull request 'fix-3106' (#3165) from fix-3106 into V20221116

Reviewed-on: https://git.openi.org.cn/OpenI/aiforge/pulls/3165
Reviewed-by: lewis <747342561@qq.com>
fix-3163
lewis 2 years ago
parent
commit
60864c5d26
4 changed files with 117 additions and 106 deletions
  1. +11
    -34
      routers/api/v1/repo/cloudbrain.go
  2. +3
    -30
      routers/api/v1/repo/modelarts.go
  3. +20
    -42
      routers/repo/cloudbrain.go
  4. +83
    -0
      services/cloudbrain/cloudbrainTask/sync_status.go

+ 11
- 34
routers/api/v1/repo/cloudbrain.go View File

@@ -16,6 +16,8 @@ import (
"strings"
"time"

"code.gitea.io/gitea/services/cloudbrain/cloudbrainTask"

"code.gitea.io/gitea/modules/notification"

"code.gitea.io/gitea/modules/setting"
@@ -81,47 +83,22 @@ func GetCloudbrainTask(ctx *context.APIContext) {
"JobDuration": job.TrainJobDuration,
})
} else {
jobResult, err := cloudbrain.GetJob(job.JobID)
if err != nil {
ctx.NotFound(err)
log.Error("GetJob failed:", err)
return
}
result, _ := models.ConvertToJobResultPayload(jobResult.Payload)
jobAfter, err := cloudbrainTask.SyncCloudBrainOneStatus(job)

if err != nil {
ctx.NotFound(err)
log.Error("ConvertToJobResultPayload failed:", err)
log.Error("Sync cloud brain one status failed:", err)
return
}
oldStatus := job.Status
job.Status = result.JobStatus.State
taskRoles := result.TaskRoles
taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{}))
if result.JobStatus.State != string(models.JobWaiting) && result.JobStatus.State != string(models.JobFailed) {
job.ContainerIp = taskRes.TaskStatuses[0].ContainerIP
job.ContainerID = taskRes.TaskStatuses[0].ContainerID
job.Status = taskRes.TaskStatuses[0].State
}

if result.JobStatus.State != string(models.JobWaiting) {
models.ParseAndSetDurationFromCloudBrainOne(result, job)
if oldStatus != job.Status {
notification.NotifyChangeCloudbrainStatus(job, oldStatus)
}
err = models.UpdateJob(job)
if err != nil {
log.Error("UpdateJob failed:", err)
}
}

ctx.JSON(http.StatusOK, map[string]interface{}{
"ID": ID,
"JobName": result.Config.JobName,
"JobStatus": result.JobStatus.State,
"SubState": result.JobStatus.SubState,
"CreatedTime": time.Unix(result.JobStatus.CreatedTime/1000, 0).Format("2006-01-02 15:04:05"),
"CompletedTime": time.Unix(result.JobStatus.CompletedTime/1000, 0).Format("2006-01-02 15:04:05"),
"JobDuration": job.TrainJobDuration,
"JobName": jobAfter.JobName,
"JobStatus": jobAfter.Status,
"SubState": "",
"CreatedTime": jobAfter.CreatedUnix.Format("2006-01-02 15:04:05"),
"CompletedTime": jobAfter.UpdatedUnix.Format("2006-01-02 15:04:05"),
"JobDuration": jobAfter.TrainJobDuration,
})
}
}


+ 3
- 30
routers/api/v1/repo/modelarts.go View File

@@ -12,6 +12,8 @@ import (
"strconv"
"strings"

"code.gitea.io/gitea/services/cloudbrain/cloudbrainTask"

"code.gitea.io/gitea/modules/urfs_client/urchin"

"code.gitea.io/gitea/modules/notification"
@@ -20,7 +22,6 @@ import (
"code.gitea.io/gitea/modules/setting"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/cloudbrain"
"code.gitea.io/gitea/modules/context"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/modelarts"
@@ -109,39 +110,11 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) {
}

if job.Type == models.TypeCloudBrainOne {
jobResult, err := cloudbrain.GetJob(job.JobID)
if err != nil {
ctx.NotFound(err)
log.Error("GetJob failed:", err)
return
}
result, err := models.ConvertToJobResultPayload(jobResult.Payload)
job, err = cloudbrainTask.SyncCloudBrainOneStatus(job)
if err != nil {
ctx.NotFound(err)
log.Error("ConvertToJobResultPayload failed:", err)
return
}
oldStatus := job.Status
job.Status = result.JobStatus.State
if result.JobStatus.State != string(models.JobWaiting) && result.JobStatus.State != string(models.JobFailed) {
taskRoles := result.TaskRoles
taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{}))

job.ContainerIp = taskRes.TaskStatuses[0].ContainerIP
job.ContainerID = taskRes.TaskStatuses[0].ContainerID
job.Status = taskRes.TaskStatuses[0].State
}

if result.JobStatus.State != string(models.JobWaiting) {
models.ParseAndSetDurationFromCloudBrainOne(result, job)
if oldStatus != job.Status {
notification.NotifyChangeCloudbrainStatus(job, oldStatus)
}
err = models.UpdateJob(job)
if err != nil {
log.Error("UpdateJob failed:", err)
}
}
} else if job.Type == models.TypeCloudBrainTwo {
err := modelarts.HandleTrainJobInfo(job)
if err != nil {


+ 20
- 42
routers/repo/cloudbrain.go View File

@@ -1845,59 +1845,37 @@ func SyncCloudbrainStatus() {
continue
}
if task.Type == models.TypeCloudBrainOne {
result, err := cloudbrain.GetJob(task.JobID)

task, err = cloudbrainTask.SyncCloudBrainOneStatus(task)
if err != nil {
log.Error("GetJob(%s) failed:%v", task.JobName, err)
log.Error("Sync cloud brain one (%s) failed:%v", task.JobName, err)
continue
}

if result != nil {
jobRes, _ := models.ConvertToJobResultPayload(result.Payload)
taskRoles := jobRes.TaskRoles
taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{}))
oldStatus := task.Status
task.Status = taskRes.TaskStatuses[0].State
if task.Status != string(models.JobWaiting) {
models.ParseAndSetDurationFromCloudBrainOne(jobRes, task)
if task.Status != string(models.JobWaiting) {
if task.Duration >= setting.MaxDuration && task.JobType == string(models.JobTypeDebug) {
log.Info("begin to stop job(%s), because of the duration", task.DisplayJobName)
err = cloudbrain.StopJob(task.JobID)
if err != nil {
log.Error("StopJob(%s) failed:%v", task.DisplayJobName, err)
continue
}
oldStatus := task.Status
task.Status = string(models.JobStopped)
if task.EndTime == 0 {
task.EndTime = timeutil.TimeStampNow()
}
task.ComputeAndSetDuration()
if oldStatus != task.Status {
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
}
err = models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
}

var maxDuration int64
if task.JobType == string(models.JobTypeBenchmark) {
maxDuration = setting.BenchmarkMaxDuration
} else if task.JobType == string(models.JobTypeSnn4imagenet) || task.JobType == string(models.JobTypeBrainScore) {
maxDuration = setting.ModelBenchmarkMaxDuration
} else {
maxDuration = setting.MaxDuration
}

if task.Duration >= maxDuration && task.JobType != string(models.JobTypeTrain) {
log.Info("begin to stop job(%s), because of the duration", task.DisplayJobName)
err = cloudbrain.StopJob(task.JobID)
if err != nil {
log.Error("StopJob(%s) failed:%v", task.DisplayJobName, err)
continue
}
task.Status = string(models.JobStopped)
if task.EndTime == 0 {
task.EndTime = timeutil.TimeStampNow()
}
task.ComputeAndSetDuration()
if oldStatus != task.Status {
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
}
err = models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.DisplayJobName, err)
continue
}
log.Error("UpdateJob(%s) failed:%v", task.DisplayJobName, err)
continue
}
}

}
} else if task.Type == models.TypeCloudBrainTwo {
if task.JobType == string(models.JobTypeDebug) {


+ 83
- 0
services/cloudbrain/cloudbrainTask/sync_status.go View File

@@ -0,0 +1,83 @@
package cloudbrainTask

import (
"net/http"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/cloudbrain"
"code.gitea.io/gitea/modules/httplib"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/setting"
)

// successfulCount is the number of HTTP 200 responses from a debug task's
// notebook URL that isNoteBookReady requires before it reports the notebook
// as ready to be opened in a browser.
const successfulCount = 2

// noteBookOKMap records, per task ID, how many HTTP 200 responses have been
// observed so far from that task's notebook URL. isNoteBookReady increments
// the entry and deletes it once successfulCount has been reached.
var noteBookOKMap = make(map[int64]int, 20)

// SyncCloudBrainOneStatus refreshes task from the cloud brain one backend.
//
// It fetches the job with cloudbrain.GetJob, decodes the payload and, unless
// the remote state is waiting or failed, copies the container IP and ID of the
// first task status of the cloudbrain.SubTaskName role onto task. The status,
// duration and database row are then updated when any of the following holds:
//   - the remote state is neither waiting nor running;
//   - the stored status is already running;
//   - the remote state is running and isNoteBookReady(task) reports true.
//
// When the status changes, notification.NotifyChangeCloudbrainStatus is called
// with the previous status before the task is saved with models.UpdateJob.
//
// The same task pointer is always returned. A non-nil error is returned when
// fetching the job, decoding the payload, or saving the task fails.
func SyncCloudBrainOneStatus(task *models.Cloudbrain) (*models.Cloudbrain, error) {
	jobResult, err := cloudbrain.GetJob(task.JobID)
	if err != nil {
		log.Error("GetJob failed:%v", err)
		return task, err
	}
	result, err := models.ConvertToJobResultPayload(jobResult.Payload)
	if err != nil {
		log.Error("ConvertToJobResultPayload failed:%v", err)
		return task, err
	}

	oldStatus := task.Status
	state := result.JobStatus.State

	if state != string(models.JobWaiting) && state != string(models.JobFailed) {
		// Container details are best effort: a payload without the expected
		// role or without any task status must not panic the caller, so the
		// fields are simply left unchanged in that case.
		podInfo, ok := result.TaskRoles[cloudbrain.SubTaskName].(map[string]interface{})
		if !ok {
			log.Error("task role %s is missing or malformed for job(%s)", cloudbrain.SubTaskName, task.JobName)
		} else if taskRes, podErr := models.ConvertToTaskPod(podInfo); podErr != nil {
			log.Error("ConvertToTaskPod failed:%v", podErr)
		} else if len(taskRes.TaskStatuses) > 0 {
			task.ContainerIp = taskRes.TaskStatuses[0].ContainerIP
			task.ContainerID = taskRes.TaskStatuses[0].ContainerID
		}
	}

	// The evaluation order matters: isNoteBookReady performs an HTTP probe and
	// updates its counter, so it must only run when the cheaper checks fail.
	if (state != string(models.JobWaiting) && state != string(models.JobRunning)) ||
		task.Status == string(models.JobRunning) ||
		(state == string(models.JobRunning) && isNoteBookReady(task)) {

		models.ParseAndSetDurationFromCloudBrainOne(result, task)
		task.Status = state
		if oldStatus != task.Status {
			notification.NotifyChangeCloudbrainStatus(task, oldStatus)
		}
		if err = models.UpdateJob(task); err != nil {
			log.Error("UpdateJob failed:%v", err)
			return task, err
		}
	}
	return task, nil
}

// isNoteBookReady reports whether the notebook of a debug task can be opened.
//
// Tasks whose JobType is not models.JobTypeDebug are always reported ready.
// For debug tasks the notebook URL (setting.DebugServerHost + "jpylab_" +
// JobID + "_" + SubTaskName) is requested; a request error or any status other
// than 200 yields false. Each 200 response is counted in noteBookOKMap under
// task.ID, and true is returned only on the response that reaches
// successfulCount, at which point the counter entry is removed.
//
// NOTE(review): noteBookOKMap is a plain map accessed without locking; if this
// function can run on several goroutines at once it needs a mutex — confirm
// against the callers.
func isNoteBookReady(task *models.Cloudbrain) bool {
	if task.JobType != string(models.JobTypeDebug) {
		return true
	}

	noteBookUrl := setting.DebugServerHost + "jpylab_" + task.JobID + "_" + task.SubTaskName
	res, err := httplib.Get(noteBookUrl).Response()
	if err != nil {
		return false
	}
	// Only the status code is needed; close the body so the underlying
	// connection is released instead of leaking on every probe.
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return false
	}

	count := noteBookOKMap[task.ID]
	if count < successfulCount-1 {
		noteBookOKMap[task.ID] = count + 1
		return false
	}
	delete(noteBookOKMap, task.ID)
	return true
}

Loading…
Cancel
Save