From e116706f3e709c5cc6bf02bf509c53bc897d27eb Mon Sep 17 00:00:00 2001 From: lewis <747342561@qq.com> Date: Wed, 19 Jan 2022 17:52:05 +0800 Subject: [PATCH 1/2] get all log --- models/cloudbrain.go | 36 ++++++++++++++------ modules/cloudbrain/resty.go | 72 ++++++++++++++++++++++++++++++++++++--- routers/api/v1/repo/cloudbrain.go | 36 +++++++++++++++----- 3 files changed, 122 insertions(+), 22 deletions(-) diff --git a/models/cloudbrain.go b/models/cloudbrain.go index 39d9aa7e2..9293a62a5 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -476,7 +476,7 @@ type MatchInfo struct { type GetJobLogResult struct { ScrollID string `json:"_scroll_id"` - Took int `json:"took"` + Took int `json:"took"` TimedOut bool `json:"timed_out"` Shards struct { Total int `json:"total"` @@ -485,18 +485,34 @@ type GetJobLogResult struct { Failed int `json:"failed"` } `json:"_shards"` Hits struct { - Hits []struct { - Index string `json:"_index"` - Type string `json:"_type"` - ID string `json:"_id"` - Source struct { - Message string `json:"message"` - } `json:"_source"` - Sort []int `json:"sort"` - } `json:"hits"` + Hits []Hits `json:"hits"` } `json:"hits"` } +type Hits struct { + Index string `json:"_index"` + Type string `json:"_type"` + ID string `json:"_id"` + Source struct { + Message string `json:"message"` + } `json:"_source"` + Sort []int `json:"sort"` +} + +type GetAllJobLogParams struct { + Scroll string `json:"scroll"` + ScrollID string `json:"scroll_id"` +} + +type DeleteJobLogTokenParams struct { + ScrollID string `json:"scroll_id"` +} + +type DeleteJobLogTokenResult struct { + Succeeded bool `json:"succeeded"` + NumFreed int `json:"num_freed"` +} + type CloudBrainResult struct { Code string `json:"code"` Msg string `json:"msg"` diff --git a/modules/cloudbrain/resty.go b/modules/cloudbrain/resty.go index e9c829840..46b7c991b 100755 --- a/modules/cloudbrain/resty.go +++ b/modules/cloudbrain/resty.go @@ -26,6 +26,8 @@ const ( JobHasBeenStopped = "S410" Public = "public" Custom = "custom" + LogPageSize = 500 + LogPageTokenExpired = "5m" ) func getRestyClient() *resty.Client { @@ -279,7 +281,7 @@ func GetJobLog(jobID string) (*models.GetJobLogResult, error) { client := getRestyClient() var result models.GetJobLogResult req := models.GetJobLogParams{ - Size: "5000", + Size: strconv.Itoa(LogPageSize), Sort: "log.offset", QueryInfo: models.QueryInfo{ MatchInfo: models.MatchInfo{ @@ -293,17 +295,79 @@ func GetJobLog(jobID string) (*models.GetJobLogResult, error) { SetAuthToken(TOKEN). SetBody(req). SetResult(&result). - Post(HOST + "es/_search?_source=message&scroll=5m") + Post(HOST + "es/_search?_source=message&scroll=" + LogPageTokenExpired) if err != nil { - log.Info("GetJobLog failed: %v", err) + log.Error("GetJobLog failed: %v", err) return &result, fmt.Errorf("resty GetJobLog: %v, %s", err, res.String()) } if !strings.Contains(res.Status(), strconv.Itoa(http.StatusOK)) { - log.Info("res.Status(): %s, response: %s", res.Status(), res.String()) + log.Error("res.Status(): %s, response: %s", res.Status(), res.String()) return &result, errors.New(res.String()) } return &result, nil } + +func GetJobAllLog(scrollID string) (*models.GetJobLogResult, error) { + checkSetting() + client := getRestyClient() + var result models.GetJobLogResult + req := models.GetAllJobLogParams{ + Scroll: LogPageTokenExpired, + ScrollID: scrollID, + } + + res, err := client.R(). + SetHeader("Content-Type", "application/json"). + SetAuthToken(TOKEN). + SetBody(req). + SetResult(&result). + Post(HOST + "es/_search/scroll") + + if err != nil { + log.Error("GetJobAllLog failed: %v", err) + return &result, fmt.Errorf("resty GetJobAllLog: %v, %s", err, res.String()) + } + + if !strings.Contains(res.Status(), strconv.Itoa(http.StatusOK)) { + log.Error("res.Status(): %s, response: %s", res.Status(), res.String()) + return &result, errors.New(res.String()) + } + + return &result, nil +} + +func DeleteJobLogToken(scrollID string) (error) { + checkSetting() + client := getRestyClient() + var result models.DeleteJobLogTokenResult + req := models.DeleteJobLogTokenParams{ + ScrollID: scrollID, + } + + res, err := client.R(). + SetHeader("Content-Type", "application/json"). + SetAuthToken(TOKEN). + SetBody(req). + SetResult(&result). + Delete(HOST + "es/_search/scroll") + + if err != nil { + log.Error("DeleteJobLogToken failed: %v", err) + return fmt.Errorf("resty DeleteJobLogToken: %v, %s", err, res.String()) + } + + if !strings.Contains(res.Status(), strconv.Itoa(http.StatusOK)) { + log.Error("res.Status(): %s, response: %s", res.Status(), res.String()) + return errors.New(res.String()) + } + + if !result.Succeeded { + log.Error("DeleteJobLogToken failed") + return errors.New("DeleteJobLogToken failed") + } + + return nil +} diff --git a/routers/api/v1/repo/cloudbrain.go b/routers/api/v1/repo/cloudbrain.go index 2236c9013..dd468783d 100755 --- a/routers/api/v1/repo/cloudbrain.go +++ b/routers/api/v1/repo/cloudbrain.go @@ -102,25 +102,45 @@ func CloudbrainGetLog(ctx *context.Context) { return } + var hits []models.Hits result, err := cloudbrain.GetJobLog(jobID) if err != nil{ log.Error("GetJobLog failed: %v", err, ctx.Data["MsgID"]) ctx.ServerError(err.Error(), err) return } + hits = result.Hits.Hits + + //if the size equal page_size, then take the scroll_id to get all log and delete the scroll_id(the num of scroll_id is limited) + if len(result.Hits.Hits) >= cloudbrain.LogPageSize { + for { + resultNext, err := cloudbrain.GetJobAllLog(result.ScrollID) + if err != nil{ + log.Error("GetJobAllLog failed: %v", err, ctx.Data["MsgID"]) + } else { + for _, hit := range resultNext.Hits.Hits { + hits = append(hits, hit) + } + } + + if len(resultNext.Hits.Hits) < cloudbrain.LogPageSize { + log.Info("get all log already") + break + } + } + } - sort.Slice(result.Hits.Hits, func(i, j int) bool { - return result.Hits.Hits[i].Sort[0] < result.Hits.Hits[j].Sort[0] + cloudbrain.DeleteJobLogToken(result.ScrollID) + + sort.Slice(hits, func(i, j int) bool { + return hits[i].Sort[0] < hits[j].Sort[0] }) - log.Info("%v", result.Hits.Hits) - var content []string - for _, log := range result.Hits.Hits { - content = append(content, log.Source.Message + "\n") + var content string + for _, log := range hits { + content += log.Source.Message + "\n" } - log.Info("%v", content) - ctx.JSON(http.StatusOK, map[string]interface{}{ "JobID": jobID, "Content": content, From 8bd295c747711b48c587959fd316df7afa276d18 Mon Sep 17 00:00:00 2001 From: lewis <747342561@qq.com> Date: Wed, 19 Jan 2022 18:52:11 +0800 Subject: [PATCH 2/2] adjust --- models/cloudbrain.go | 2 ++ modules/cloudbrain/cloudbrain.go | 3 ++- routers/repo/cloudbrain.go | 18 +++++++++--------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/models/cloudbrain.go b/models/cloudbrain.go index 9293a62a5..82c4c6b83 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -403,6 +403,8 @@ type BenchmarkDataset struct { Id int `json:"id"` Value string `json:"value"` //二级算法类型名称 Attachment string `json:"attachment"` //数据集的uuid + Owner string `json:"owner"` //评估脚本所在仓库的拥有者 + RepoName string `json:"repo_name"` //评估脚本所在仓库的名称 } type GpuInfos struct { diff --git a/modules/cloudbrain/cloudbrain.go b/modules/cloudbrain/cloudbrain.go index 3af7d1019..f15443b30 100755 --- a/modules/cloudbrain/cloudbrain.go +++ b/modules/cloudbrain/cloudbrain.go @@ -17,7 +17,8 @@ const ( Command = `pip3 install jupyterlab==2.2.5 -i https://pypi.tuna.tsinghua.edu.cn/simple; service ssh stop; jupyter lab --no-browser --ip=0.0.0.0 --allow-root --notebook-dir="/code" --port=80 --LabApp.token="" --LabApp.allow_origin="self https://cloudbrain.pcl.ac.cn"` - CommandBenchmark = `echo "start benchmark";python /code/test.py;echo "end benchmark"` + //CommandBenchmark = `echo "start benchmark";python /code/test.py;echo "end benchmark"` + CommandBenchmark = `echo "start benchmark";cd /benchmark && bash run_bk.sh;echo "end benchmark"` CodeMountPath = "/code" DataSetMountPath = "/dataset" ModelMountPath = "/model" diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index 03ae51d86..342ad6161 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -1062,12 +1062,12 @@ func CloudBrainBenchmarkNew(ctx *context.Context) { ctx.HTML(200, tplCloudBrainBenchmarkNew) } -func getBenchmarkAttachment(benchmarkTypeID, benchmarkChildTypeID int) (string, error) { - uuid := "" +func getBenchmarkAttachment(benchmarkTypeID, benchmarkChildTypeID int) (*models.BenchmarkDataset, error) { + var childInfo *models.BenchmarkDataset if benchmarkTypes == nil { if err := json.Unmarshal([]byte(setting.BenchmarkTypes), &benchmarkTypes); err != nil { log.Error("json.Unmarshal BenchmarkTypes(%s) failed:%v", setting.BenchmarkTypes, err) - return uuid, err + return childInfo, err } } @@ -1076,7 +1076,7 @@ func getBenchmarkAttachment(benchmarkTypeID, benchmarkChildTypeID int) (string, if benchmarkType.Id == benchmarkTypeID { for _, childType := range benchmarkType.Second { if childType.Id == benchmarkChildTypeID { - uuid = childType.Attachment + childInfo = childType isExist = true break } @@ -1087,10 +1087,10 @@ func getBenchmarkAttachment(benchmarkTypeID, benchmarkChildTypeID int) (string, if !isExist { log.Error("no such benchmark_type_id&benchmark_child_type_id") - return uuid, errors.New("no such benchmark_type_id&benchmark_child_type_id") + return childInfo, errors.New("no such benchmark_type_id&benchmark_child_type_id") } - return uuid, nil + return childInfo, nil } func getBenchmarkGpuQueue(gpuQueue string) (string, error) { @@ -1161,7 +1161,7 @@ func CloudBrainBenchmarkCreate(ctx *context.Context, form auth.CreateCloudBrainF return } - uuid, err := getBenchmarkAttachment(benchmarkTypeID, benchmarkChildTypeID) + childInfo, err := getBenchmarkAttachment(benchmarkTypeID, benchmarkChildTypeID) if err != nil { log.Error("getBenchmarkAttachment failed:%v", err, ctx.Data["MsgID"]) cloudBrainNewDataPrepare(ctx) @@ -1240,7 +1240,7 @@ func CloudBrainBenchmarkCreate(ctx *context.Context, form auth.CreateCloudBrainF } } - if err := downloadRateCode(repo, jobName, setting.BenchmarkOwner, setting.BenchmarkName, benchmarkPath, form.BenchmarkCategory, gpuType); err != nil { + if err := downloadRateCode(repo, jobName, childInfo.Owner, childInfo.RepoName, benchmarkPath, form.BenchmarkCategory, gpuType); err != nil { log.Error("downloadRateCode failed, %v", err, ctx.Data["MsgID"]) //cloudBrainNewDataPrepare(ctx) //ctx.RenderWithErr("system error", tplCloudBrainBenchmarkNew, &form) @@ -1254,7 +1254,7 @@ func CloudBrainBenchmarkCreate(ctx *context.Context, form auth.CreateCloudBrainF //return } - err = cloudbrain.GenerateTask(ctx, jobName, image, command, uuid, storage.GetMinioPath(jobName, cloudbrain.CodeMountPath+"/"), + err = cloudbrain.GenerateTask(ctx, jobName, image, command, childInfo.Attachment, storage.GetMinioPath(jobName, cloudbrain.CodeMountPath+"/"), storage.GetMinioPath(jobName, cloudbrain.ModelMountPath+"/"), storage.GetMinioPath(jobName, cloudbrain.BenchMarkMountPath+"/"), storage.GetMinioPath(jobName, cloudbrain.Snn4imagenetMountPath+"/"), storage.GetMinioPath(jobName, cloudbrain.BrainScoreMountPath+"/"), string(models.JobTypeBenchmark), gpuQueue, form.Description,