package repo import ( "bufio" "encoding/json" "errors" "fmt" "io" "io/ioutil" "net/http" "os" "strconv" "strings" "time" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/aisafety" "code.gitea.io/gitea/modules/cloudbrain" "code.gitea.io/gitea/modules/context" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/modelarts" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/services/cloudbrain/resource" "code.gitea.io/gitea/services/reward/point/account" ) const ( tplModelSafetyTestCreateGrampusGpu = "repo/modelsafety/newgrampusgpu" tplModelSafetyTestCreateGrampusNpu = "repo/modelsafety/newgrampusnpu" tplModelSafetyTestCreateGpu = "repo/modelsafety/newgpu" tplModelSafetyTestCreateNpu = "repo/modelsafety/newnpu" tplModelSafetyTestShow = "repo/modelsafety/show" ) func GetAiSafetyTaskByJob(job *models.Cloudbrain) { if job == nil { log.Error("GetCloudbrainByJobID failed") return } syncAiSafetyTaskStatus(job) } func GetAiSafetyTaskTmpl(ctx *context.Context) { ctx.Data["id"] = ctx.Params(":id") ctx.Data["PageIsCloudBrain"] = true ctx.HTML(200, tplModelSafetyTestShow) } func GetAiSafetyTask(ctx *context.Context) { var ID = ctx.Params(":id") job, err := models.GetCloudbrainByIDWithDeleted(ID) if err != nil { log.Error("GetCloudbrainByJobID failed:" + err.Error()) return } syncAiSafetyTaskStatus(job) job, err = models.GetCloudbrainByIDWithDeleted(ID) job.BenchmarkType = "安全评测" job.BenchmarkTypeName = "Image Classification" job.CanModify = cloudbrain.CanModifyJob(ctx, job) job.CanDel = cloudbrain.CanDeleteJob(ctx, job) if job.Parameters == "{\"parameter\":[]}" { job.Parameters = "" } s, err := resource.GetCloudbrainSpec(job.ID) if err == nil { job.Spec = s } user, err := models.GetUserByID(job.UserID) if err == nil { tmpUser := &models.User{ Name: user.Name, } job.User = tmpUser } ctx.JSON(200, job) } func StopAiSafetyTask(ctx *context.Context) { log.Info("start to stop the task.") var ID = ctx.Params(":id") task, err := models.GetCloudbrainByIDWithDeleted(ID) result := make(map[string]interface{}) result["result_code"] = "-1" if err != nil { log.Info("query task error.err=" + err.Error()) log.Error("GetCloudbrainByJobID failed:" + err.Error()) result["msg"] = "No such task." ctx.JSON(200, result) return } if isTaskNotFinished(task.Status) { if task.Type == models.TypeCloudBrainTwo { log.Info("start to stop model arts task.") _, err := modelarts.StopTrainJob(task.JobID, strconv.FormatInt(task.VersionID, 10)) if err != nil { log.Info("stop failed.err=" + err.Error()) } task.Status = string(models.JobStopped) if task.EndTime == 0 { task.EndTime = timeutil.TimeStampNow() } task.ComputeAndSetDuration() err = models.UpdateJob(task) if err != nil { log.Error("UpdateJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"]) result["msg"] = "cloudbrain.Stopped_success_update_status_fail" ctx.JSON(200, result) return } //queryTaskStatusFromCloudbrainTwo(job) } else if task.Type == models.TypeCloudBrainOne { if task.Status == string(models.JobStopped) || task.Status == string(models.JobFailed) || task.Status == string(models.JobSucceeded) { log.Error("the job(%s) has been stopped", task.JobName, ctx.Data["msgID"]) result["msg"] = "cloudbrain.Already_stopped" ctx.JSON(200, result) return } err := cloudbrain.StopJob(task.JobID) if err != nil { log.Error("StopJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"]) result["msg"] = "cloudbrain.Stopped_failed" ctx.JSON(200, result) return } task.Status = string(models.JobStopped) if task.EndTime == 0 { task.EndTime = timeutil.TimeStampNow() } task.ComputeAndSetDuration() err = models.UpdateJob(task) if err != nil { log.Error("UpdateJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"]) result["msg"] = "cloudbrain.Stopped_success_update_status_fail" ctx.JSON(200, result) return } } } else { if task.Status == string(models.ModelSafetyTesting) { //修改为Failed task.Status = string(models.JobStopped) if task.EndTime == 0 { task.EndTime = timeutil.TimeStampNow() } task.ComputeAndSetDuration() err = models.UpdateJob(task) if err != nil { log.Error("UpdateJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"]) result["msg"] = "cloudbrain.Stopped_success_update_status_fail" ctx.JSON(200, result) return } } else { log.Info("The job is finished. status=" + task.Status) } } result["result_code"] = "0" result["msg"] = "succeed" ctx.JSON(200, result) } func DelAiSafetyTask(ctx *context.Context) { var ID = ctx.Params(":id") task, err := models.GetCloudbrainByIDWithDeleted(ID) if err != nil { log.Error("GetCloudbrainByJobID failed:" + err.Error()) ctx.ServerError("No such task.", err) return } if task.Status != string(models.JobStopped) && task.Status != string(models.JobFailed) && task.Status != string(models.JobSucceeded) { log.Error("the job(%s) has not been stopped", task.JobName, ctx.Data["msgID"]) ctx.ServerError("the job("+task.JobName+") has not been stopped", nil) return } if task.Type == models.TypeCloudBrainOne { DeleteCloudbrainJobStorage(task.JobName, models.TypeCloudBrainOne) } err = models.DeleteJob(task) if err != nil { ctx.ServerError(err.Error(), err) return } ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/cloudbrain/benchmark") } func syncAiSafetyTaskStatus(job *models.Cloudbrain) { log.Info("start to query safety task status.") if isTaskNotFinished(job.Status) { if job.Type == models.TypeCloudBrainTwo { queryTaskStatusFromCloudbrainTwo(job) } else if job.Type == models.TypeCloudBrainOne { queryTaskStatusFromCloudbrain(job) } } else { if job.Status == string(models.ModelSafetyTesting) { queryTaskStatusFromModelSafetyTestServer(job) } else { log.Info("The job is finished. status=" + job.Status) } } } func TimerHandleModelSafetyTestTask() { log.Info("start to TimerHandleModelSafetyTestTask") tasks, err := models.GetModelSafetyTestTask() if err == nil { if tasks != nil && len(tasks) > 0 { for _, job := range tasks { syncAiSafetyTaskStatus(job) } } else { log.Info("query running model safety test task 0.") } } else { log.Info("query running model safety test task err." + err.Error()) } } func queryTaskStatusFromCloudbrainTwo(job *models.Cloudbrain) { log.Info("The task not finished,name=" + job.DisplayJobName) result, err := modelarts.GetTrainJob(job.JobID, strconv.FormatInt(job.VersionID, 10)) if err != nil { log.Info("query train job error." + err.Error()) return } job.Status = modelarts.TransTrainJobStatus(result.IntStatus) job.Duration = result.Duration / 1000 job.TrainJobDuration = result.TrainJobDuration if job.StartTime == 0 && result.StartTime > 0 { job.StartTime = timeutil.TimeStamp(result.StartTime / 1000) } job.TrainJobDuration = models.ConvertDurationToStr(job.Duration) if job.EndTime == 0 && models.IsTrainJobTerminal(job.Status) && job.StartTime > 0 { job.EndTime = job.StartTime.Add(job.Duration) } job.CorrectCreateUnix() if job.Status != string(models.ModelArtsTrainJobCompleted) { log.Info("CloudbrainTwo task status=" + job.Status) err = models.UpdateJob(job) if err != nil { log.Error("UpdateJob failed:", err) } } else { log.Info("start to deal ModelSafetyTesting, task status=" + job.Status) job.Status = string(models.ModelSafetyTesting) err = models.UpdateJob(job) if err != nil { log.Error("UpdateJob failed:", err) } //send msg to beihang sendNPUInferenceResultToTest(job) } } func queryTaskStatusFromCloudbrain(job *models.Cloudbrain) { log.Info("The task not finished,name=" + job.DisplayJobName) jobResult, err := cloudbrain.GetJob(job.JobID) result, err := models.ConvertToJobResultPayload(jobResult.Payload) if err != nil { log.Error("ConvertToJobResultPayload failed:", err) return } job.Status = result.JobStatus.State if result.JobStatus.State != string(models.JobWaiting) && result.JobStatus.State != string(models.JobFailed) { taskRoles := result.TaskRoles taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{})) job.Status = taskRes.TaskStatuses[0].State } models.ParseAndSetDurationFromCloudBrainOne(result, job) //updateCloudBrainOneJobTime(job) log.Info("cloud brain one job status=" + job.Status) if result.JobStatus.State != string(models.JobSucceeded) { err = models.UpdateJob(job) if err != nil { log.Error("UpdateJob failed:", err) } } else { // job.Status = string(models.ModelSafetyTesting) err = models.UpdateJob(job) if err != nil { log.Error("UpdateJob failed:", err) } //send msg to beihang sendGPUInferenceResultToTest(job) } } func queryTaskStatusFromModelSafetyTestServer(job *models.Cloudbrain) { result, err := aisafety.GetTaskStatus(job.PreVersionName) if err == nil { if result.Code == "0" { if result.Data.Status == 1 { log.Info("The task is running....") } else { if result.Data.Code == 0 { job.ResultJson = result.Data.StandardJson job.Status = string(models.JobSucceeded) err = models.UpdateJob(job) if err != nil { log.Error("UpdateJob failed:", err) } } else { job.ResultJson = result.Data.Msg job.Status = string(models.JobFailed) err = models.UpdateJob(job) if err != nil { log.Error("UpdateJob failed:", err) } } } } else { log.Info("The task is failed.") job.Status = string(models.JobFailed) err = models.UpdateJob(job) if err != nil { log.Error("UpdateJob failed:", err) } } } else { log.Info("The task not found.....") } } func getAisafetyTaskReq(job *models.Cloudbrain) aisafety.TaskReq { datasetname := job.DatasetName datasetnames := strings.Split(datasetname, ";") indicator := job.LabelName EvalContent := "test1" if job.Description != "" { EvalContent = job.Description } req := aisafety.TaskReq{ UnionId: job.JobID, EvalName: job.DisplayJobName, EvalContent: EvalContent, TLPath: "test1", Indicators: strings.Split(indicator, ";"), CDName: strings.Split(datasetnames[1], ".")[0], BDName: strings.Split(datasetnames[0], ".")[0] + "基础数据集", } log.Info("CDName=" + req.CDName) log.Info("BDName=" + req.BDName) return req } func sendGPUInferenceResultToTest(job *models.Cloudbrain) { log.Info("send sendGPUInferenceResultToTest") req := getAisafetyTaskReq(job) resultDir := "/result" prefix := setting.CBCodePathPrefix + job.JobName + resultDir files, err := storage.GetOneLevelAllObjectUnderDirMinio(setting.Attachment.Minio.Bucket, prefix, "") if err != nil { log.Error("query cloudbrain one model failed: %v", err) return } jsonContent := "" for _, file := range files { if strings.HasSuffix(file.FileName, "result.json") { path := storage.GetMinioPath(job.JobName+resultDir+"/", file.FileName) log.Info("path=" + path) reader, err := os.Open(path) defer reader.Close() if err == nil { r := bufio.NewReader(reader) for { line, error := r.ReadString('\n') jsonContent += line if error == io.EOF { log.Info("read file completed.") break } if error != nil { log.Info("read file error." + error.Error()) break } } } break } } if jsonContent != "" { sendHttpReqToBeihang(job, jsonContent, req) } else { updateJobFailed(job, "推理生成的Json数据为空,无法进行评测。") } } func sendNPUInferenceResultToTest(job *models.Cloudbrain) { log.Info("start to sendNPUInferenceResultToTest") req := getAisafetyTaskReq(job) jsonContent := "" VersionOutputPath := modelarts.GetOutputPathByCount(modelarts.TotalVersionCount) resultPath := modelarts.JobPath + job.JobName + modelarts.ResultPath + VersionOutputPath + "/result.json" resultPath = resultPath[1:] log.Info("bucket=" + setting.Bucket + " resultPath=" + resultPath) body, err := storage.ObsDownloadAFile(setting.Bucket, resultPath) if err != nil { log.Info("ObsDownloadAFile error." + err.Error() + " resultPath=" + resultPath) } else { defer body.Close() var data []byte p := make([]byte, 4096) var readErr error var readCount int for { readCount, readErr = body.Read(p) if readCount > 0 { data = append(data, p[:readCount]...) } if readErr != nil || readCount == 0 { break } } jsonContent = string(data) } if jsonContent != "" { sendHttpReqToBeihang(job, jsonContent, req) } else { updateJobFailed(job, "推理生成的Json数据为空,无法进行评测。") } } func updateJobFailed(job *models.Cloudbrain, msg string) { log.Info("The json is null. so set it failed.") //update task failed. job.Status = string(models.ModelArtsTrainJobFailed) job.ResultJson = msg err := models.UpdateJob(job) if err != nil { log.Error("UpdateJob failed:", err) } } func sendHttpReqToBeihang(job *models.Cloudbrain, jsonContent string, req aisafety.TaskReq) { log.Info("start to send beihang ...") serialNo, err := aisafety.CreateSafetyTask(req, jsonContent) if err == nil { //update serial no to db job.PreVersionName = serialNo err = models.UpdateJob(job) if err != nil { log.Error("UpdateJob failed:", err) } } } func isTaskNotFinished(status string) bool { if status == string(models.ModelArtsTrainJobRunning) || status == string(models.ModelArtsTrainJobWaiting) { return true } if status == string(models.JobWaiting) || status == string(models.JobRunning) { return true } if status == string(models.ModelArtsTrainJobUnknown) || status == string(models.ModelArtsTrainJobInit) { return true } if status == string(models.ModelArtsTrainJobImageCreating) || status == string(models.ModelArtsTrainJobSubmitTrying) { return true } return false } func AiSafetyCreateForGetGPU(ctx *context.Context) { t := time.Now() ctx.Data["PageIsCloudBrain"] = true ctx.Data["IsCreate"] = true ctx.Data["type"] = models.TypeCloudBrainOne ctx.Data["compute_resource"] = models.GPUResource ctx.Data["datasetType"] = models.TypeCloudBrainOne ctx.Data["BaseDataSetName"] = setting.ModelSafetyTest.GPUBaseDataSetName ctx.Data["BaseDataSetUUID"] = setting.ModelSafetyTest.GPUBaseDataSetUUID ctx.Data["CombatDataSetName"] = setting.ModelSafetyTest.GPUCombatDataSetName ctx.Data["CombatDataSetUUID"] = setting.ModelSafetyTest.GPUCombatDataSetUUID log.Info("GPUBaseDataSetName=" + setting.ModelSafetyTest.GPUBaseDataSetName) log.Info("GPUBaseDataSetUUID=" + setting.ModelSafetyTest.GPUBaseDataSetUUID) log.Info("GPUCombatDataSetName=" + setting.ModelSafetyTest.GPUCombatDataSetName) log.Info("GPUCombatDataSetUUID=" + setting.ModelSafetyTest.GPUCombatDataSetUUID) var displayJobName = jobNamePrefixValid(cutString(ctx.User.Name, 5)) + t.Format("2006010215") + strconv.Itoa(int(t.Unix()))[5:] ctx.Data["display_job_name"] = displayJobName prepareCloudbrainOneSpecs(ctx) queuesDetail, _ := cloudbrain.GetQueuesDetail() if queuesDetail != nil { ctx.Data["QueuesDetail"] = queuesDetail reqPara, _ := json.Marshal(queuesDetail) log.Warn("The GPU WaitCount json:", string(reqPara)) } else { log.Info("The GPU WaitCount not get") } NotStopTaskCount, _ := models.GetModelSafetyCountByUserID(ctx.User.ID) ctx.Data["NotStopTaskCount"] = NotStopTaskCount ctx.HTML(200, tplModelSafetyTestCreateGpu) } func AiSafetyCreateForGetNPU(ctx *context.Context) { t := time.Now() ctx.Data["PageIsCloudBrain"] = true ctx.Data["IsCreate"] = true ctx.Data["type"] = models.TypeCloudBrainTwo ctx.Data["compute_resource"] = models.NPUResource var displayJobName = jobNamePrefixValid(cutString(ctx.User.Name, 5)) + t.Format("2006010215") + strconv.Itoa(int(t.Unix()))[5:] ctx.Data["display_job_name"] = displayJobName ctx.Data["datasetType"] = models.TypeCloudBrainTwo ctx.Data["BaseDataSetName"] = setting.ModelSafetyTest.NPUBaseDataSetName ctx.Data["BaseDataSetUUID"] = setting.ModelSafetyTest.NPUBaseDataSetUUID ctx.Data["CombatDataSetName"] = setting.ModelSafetyTest.NPUCombatDataSetName ctx.Data["CombatDataSetUUID"] = setting.ModelSafetyTest.NPUCombatDataSetUUID log.Info("NPUBaseDataSetName=" + setting.ModelSafetyTest.NPUBaseDataSetName) log.Info("NPUBaseDataSetUUID=" + setting.ModelSafetyTest.NPUBaseDataSetUUID) log.Info("NPUCombatDataSetName=" + setting.ModelSafetyTest.NPUCombatDataSetName) log.Info("NPUCombatDataSetUUID=" + setting.ModelSafetyTest.NPUCombatDataSetUUID) var resourcePools modelarts.ResourcePool if err := json.Unmarshal([]byte(setting.ResourcePools), &resourcePools); err != nil { ctx.ServerError("json.Unmarshal failed:", err) } ctx.Data["resource_pools"] = resourcePools.Info var engines modelarts.Engine if err := json.Unmarshal([]byte(setting.Engines), &engines); err != nil { ctx.ServerError("json.Unmarshal failed:", err) } ctx.Data["engines"] = engines.Info var versionInfos modelarts.VersionInfo if err := json.Unmarshal([]byte(setting.EngineVersions), &versionInfos); err != nil { ctx.ServerError("json.Unmarshal failed:", err) } ctx.Data["engine_versions"] = versionInfos.Version prepareCloudbrainTwoInferenceSpecs(ctx) waitCount := cloudbrain.GetWaitingCloudbrainCount(models.TypeCloudBrainTwo, "") ctx.Data["WaitCount"] = waitCount log.Info("The NPU WaitCount is " + fmt.Sprint(waitCount)) NotStopTaskCount, _ := models.GetModelSafetyCountByUserID(ctx.User.ID) ctx.Data["NotStopTaskCount"] = NotStopTaskCount ctx.HTML(200, tplModelSafetyTestCreateNpu) } func AiSafetyCreateForPost(ctx *context.Context) { ctx.Data["PageIsCloudBrain"] = true displayJobName := ctx.Query("display_job_name") jobName := util.ConvertDisplayJobNameToJobName(displayJobName) taskType := ctx.QueryInt("type") description := ctx.Query("description") ctx.Data["type"] = taskType ctx.Data["displayJobName"] = displayJobName ctx.Data["description"] = description repo := ctx.Repo.Repository tpname := tplCloudBrainModelSafetyNewNpu if taskType == models.TypeCloudBrainOne { tpname = tplCloudBrainModelSafetyNewGpu } tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeModelSafety), displayJobName) if err == nil { if len(tasks) != 0 { log.Error("the job name did already exist", ctx.Data["MsgID"]) modelSafetyNewDataPrepare(ctx) ctx.RenderWithErr("the job name did already exist", tpname, nil) return } } else { if !models.IsErrJobNotExist(err) { log.Error("system error, %v", err, ctx.Data["MsgID"]) modelSafetyNewDataPrepare(ctx) ctx.RenderWithErr("system error", tpname, nil) return } } if !jobNamePattern.MatchString(jobName) { modelSafetyNewDataPrepare(ctx) ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpname, nil) return } count, err := models.GetModelSafetyCountByUserID(ctx.User.ID) if err != nil { log.Error("GetCloudbrainCountByUserID failed:%v", err, ctx.Data["MsgID"]) modelSafetyNewDataPrepare(ctx) ctx.RenderWithErr("system error", tpname, nil) return } else { if count >= 1 { log.Error("the user already has running or waiting task", ctx.Data["MsgID"]) modelSafetyNewDataPrepare(ctx) ctx.RenderWithErr(ctx.Tr("repo.cloudbrain.morethanonejob"), tpname, nil) return } } BootFile := ctx.Query("boot_file") BootFile = strings.TrimSpace(BootFile) bootFileExist, err := ctx.Repo.FileExists(BootFile, cloudbrain.DefaultBranchName) if err != nil || !bootFileExist { log.Error("Get bootfile error:", err) modelSafetyNewDataPrepare(ctx) ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_bootfile_err"), tpname, nil) return } if taskType == models.TypeCloudBrainTwo { err = createForNPU(ctx, jobName) } else if taskType == models.TypeCloudBrainOne { err = createForGPU(ctx, jobName) } if err != nil { modelSafetyNewDataPrepare(ctx) ctx.RenderWithErr(err.Error(), tpname, nil) } else { log.Info("to redirect...") ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/cloudbrain/benchmark") } } func createForNPU(ctx *context.Context, jobName string) error { VersionOutputPath := modelarts.GetOutputPathByCount(modelarts.TotalVersionCount) BootFile := ctx.Query("boot_file") BootFile = strings.TrimSpace(BootFile) displayJobName := ctx.Query("display_job_name") description := ctx.Query("description") srcDataset := ctx.Query("src_dataset") //uuid combatDataset := ctx.Query("combat_dataset") //uuid evaluationIndex := ctx.Query("evaluation_index") Params := ctx.Query("run_para_list") specId := ctx.QueryInt64("spec_id") engineID := ctx.QueryInt("engine_id") log.Info("engine_id=" + fmt.Sprint(engineID)) poolID := ctx.Query("pool_id") repo := ctx.Repo.Repository trainUrl := ctx.Query("train_url") modelName := ctx.Query("model_name") modelVersion := ctx.Query("model_version") ckptName := ctx.Query("ckpt_name") ckptUrl := "/" + trainUrl + ckptName log.Info("ckpt url:" + ckptUrl) FlavorName := ctx.Query("flaver_names") EngineName := ctx.Query("engine_names") isLatestVersion := modelarts.IsLatestVersion VersionCount := modelarts.VersionCountOne codeLocalPath := setting.JobPath + jobName + modelarts.CodePath codeObsPath := "/" + setting.Bucket + modelarts.JobPath + jobName + modelarts.CodePath resultObsPath := "/" + setting.Bucket + modelarts.JobPath + jobName + modelarts.ResultPath + VersionOutputPath + "/" logObsPath := "/" + setting.Bucket + modelarts.JobPath + jobName + modelarts.LogPath + VersionOutputPath + "/" log.Info("ckpt url:" + ckptUrl) spec, err := resource.GetAndCheckSpec(ctx.User.ID, specId, models.FindSpecsOptions{ JobType: models.JobTypeInference, ComputeResource: models.NPU, Cluster: models.OpenICluster, AiCenterCode: models.AICenterOfCloudBrainTwo}) if err != nil || spec == nil { //ctx.RenderWithErr("Resource specification not available", tplCloudBrainModelSafetyNewNpu, nil) return errors.New("Resource specification not available") } if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) { log.Error("point balance is not enough,userId=%d specId=%d ", ctx.User.ID, spec.ID) return errors.New(ctx.Tr("points.insufficient_points_balance")) } //todo: del the codeLocalPath _, err = ioutil.ReadDir(codeLocalPath) if err == nil { os.RemoveAll(codeLocalPath) } gitRepo, _ := git.OpenRepository(repo.RepoPath()) commitID, _ := gitRepo.GetBranchCommitID(cloudbrain.DefaultBranchName) if err := downloadCode(repo, codeLocalPath, cloudbrain.DefaultBranchName); err != nil { log.Error("Create task failed, server timed out: %s (%v)", repo.FullName(), err) return errors.New(ctx.Tr("cloudbrain.load_code_failed")) } //todo: upload code (send to file_server todo this work?) if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.ResultPath + VersionOutputPath + "/"); err != nil { log.Error("Failed to obsMkdir_result: %s (%v)", repo.FullName(), err) return errors.New("Failed to obsMkdir_result") } if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.LogPath + VersionOutputPath + "/"); err != nil { log.Error("Failed to obsMkdir_log: %s (%v)", repo.FullName(), err) return errors.New("Failed to obsMkdir_log") } if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil { log.Error("Failed to uploadCodeToObs: %s (%v)", repo.FullName(), err) return errors.New(ctx.Tr("cloudbrain.load_code_failed")) } var parameters models.Parameters param := make([]models.Parameter, 0) param = append(param, models.Parameter{ Label: modelarts.ResultUrl, Value: "s3:/" + resultObsPath, }, models.Parameter{ Label: modelarts.CkptUrl, Value: "s3:/" + ckptUrl, }) uuid := srcDataset + ";" + combatDataset datasUrlList, dataUrl, datasetNames, isMultiDataset, err := getDatasUrlListByUUIDS(uuid) if err != nil { return err } dataPath := dataUrl jsondatas, err := json.Marshal(datasUrlList) if err != nil { log.Error("Failed to Marshal: %v", err) return err } if isMultiDataset { param = append(param, models.Parameter{ Label: modelarts.MultiDataUrl, Value: string(jsondatas), }) } existDeviceTarget := false if len(Params) != 0 { err := json.Unmarshal([]byte(Params), ¶meters) if err != nil { log.Error("Failed to Unmarshal params: %s (%v)", Params, err) return errors.New("运行参数错误") } for _, parameter := range parameters.Parameter { if parameter.Label == modelarts.DeviceTarget { existDeviceTarget = true } if parameter.Label != modelarts.TrainUrl && parameter.Label != modelarts.DataUrl { param = append(param, models.Parameter{ Label: parameter.Label, Value: parameter.Value, }) } } } if !existDeviceTarget { param = append(param, models.Parameter{ Label: modelarts.DeviceTarget, Value: modelarts.Ascend, }) } req := &modelarts.GenerateInferenceJobReq{ JobName: jobName, DisplayJobName: displayJobName, DataUrl: dataPath, Description: description, CodeObsPath: codeObsPath, BootFileUrl: codeObsPath + BootFile, BootFile: BootFile, TrainUrl: trainUrl, WorkServerNumber: 1, EngineID: int64(engineID), LogUrl: logObsPath, PoolID: poolID, Uuid: uuid, Parameters: param, //modelarts train parameters CommitID: commitID, BranchName: cloudbrain.DefaultBranchName, Params: Params, FlavorName: FlavorName, EngineName: EngineName, LabelName: evaluationIndex, IsLatestVersion: isLatestVersion, VersionCount: VersionCount, TotalVersionCount: modelarts.TotalVersionCount, ModelName: modelName, ModelVersion: modelVersion, CkptName: ckptName, ResultUrl: resultObsPath, Spec: spec, DatasetName: datasetNames, JobType: string(models.JobTypeModelSafety), } _, err = modelarts.GenerateInferenceJob(ctx, req) if err != nil { log.Error("GenerateTrainJob failed:%v", err.Error()) return err } return nil } func createForGPU(ctx *context.Context, jobName string) error { BootFile := ctx.Query("boot_file") BootFile = strings.TrimSpace(BootFile) displayJobName := ctx.Query("display_job_name") description := ctx.Query("description") image := strings.TrimSpace(ctx.Query("image")) srcDataset := ctx.Query("src_dataset") //uuid combatDataset := ctx.Query("combat_dataset") //uuid evaluationIndex := ctx.Query("evaluation_index") Params := ctx.Query("run_para_list") specId := ctx.QueryInt64("spec_id") TrainUrl := ctx.Query("train_url") CkptName := ctx.Query("ckpt_name") modelName := ctx.Query("model_name") modelVersion := ctx.Query("model_version") ckptUrl := setting.Attachment.Minio.RealPath + TrainUrl + CkptName log.Info("ckpt url:" + ckptUrl) spec, err := resource.GetAndCheckSpec(ctx.User.ID, specId, models.FindSpecsOptions{ JobType: models.JobTypeBenchmark, ComputeResource: models.GPU, Cluster: models.OpenICluster, AiCenterCode: models.AICenterOfCloudBrainOne}) if err != nil || spec == nil { return errors.New("Resource specification not available") } if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) { log.Error("point balance is not enough,userId=%d specId=%d ", ctx.User.ID, spec.ID) return errors.New(ctx.Tr("points.insufficient_points_balance")) } repo := ctx.Repo.Repository codePath := setting.JobPath + jobName + cloudbrain.CodeMountPath os.RemoveAll(codePath) if err := downloadCode(repo, codePath, cloudbrain.DefaultBranchName); err != nil { log.Error("downloadCode failed, %v", err, ctx.Data["MsgID"]) return errors.New("system error") } err = uploadCodeToMinio(codePath+"/", jobName, cloudbrain.CodeMountPath+"/") if err != nil { log.Error("uploadCodeToMinio failed, %v", err, ctx.Data["MsgID"]) return errors.New("system error") } uuid := srcDataset + ";" + combatDataset datasetInfos, datasetNames, err := models.GetDatasetInfo(uuid) log.Info("uuid=" + uuid) if err != nil { log.Error("GetDatasetInfo failed: %v", err, ctx.Data["MsgID"]) return errors.New(ctx.Tr("cloudbrain.error.dataset_select")) } command, err := getGpuModelSafetyCommand(BootFile, Params, CkptName, displayJobName) if err != nil { log.Error("Get Command failed: %v", err, ctx.Data["MsgID"]) return errors.New(ctx.Tr("cloudbrain.error.dataset_select")) } log.Info("Command=" + command) req := cloudbrain.GenerateCloudBrainTaskReq{ Ctx: ctx, DisplayJobName: displayJobName, JobName: jobName, Image: image, Command: command, Uuids: uuid, DatasetNames: datasetNames, DatasetInfos: datasetInfos, CodePath: storage.GetMinioPath(jobName, cloudbrain.CodeMountPath+"/"), ModelPath: setting.Attachment.Minio.RealPath + TrainUrl, BenchmarkPath: storage.GetMinioPath(jobName, cloudbrain.BenchMarkMountPath+"/"), Snn4ImageNetPath: storage.GetMinioPath(jobName, cloudbrain.Snn4imagenetMountPath+"/"), BrainScorePath: storage.GetMinioPath(jobName, cloudbrain.BrainScoreMountPath+"/"), JobType: string(models.JobTypeModelSafety), Description: description, BranchName: cloudbrain.DefaultBranchName, BootFile: BootFile, Params: Params, CommitID: "", ModelName: modelName, ModelVersion: modelVersion, CkptName: CkptName, ResultPath: storage.GetMinioPath(jobName, cloudbrain.ResultPath+"/"), Spec: spec, LabelName: evaluationIndex, } _, err = cloudbrain.GenerateTask(req) if err != nil { return err } return nil } func getGpuModelSafetyCommand(BootFile string, params string, CkptName string, DisplayJobName string) (string, error) { var command string bootFile := strings.TrimSpace(BootFile) if !strings.HasSuffix(bootFile, ".py") { log.Error("bootFile(%s) format error", bootFile) return command, errors.New("bootFile format error") } var parameters models.Parameters var param string if len(params) != 0 { err := json.Unmarshal([]byte(params), ¶meters) if err != nil { log.Error("Failed to Unmarshal params: %s (%v)", params, err) return command, err } for _, parameter := range parameters.Parameter { param += " --" + parameter.Label + "=" + parameter.Value } } param += " --ckpt_url=" + cloudbrain.ModelMountPath + "/" + CkptName command += "python /code/" + bootFile + param + " > " + cloudbrain.ResultPath + "/" + DisplayJobName + "-" + cloudbrain.LogFile return command, nil } func modelSafetyNewDataPrepare(ctx *context.Context) error { ctx.Data["PageIsCloudBrain"] = true ctx.Data["type"] = ctx.QueryInt("type") ctx.Data["boot_file"] = ctx.Query("boot_file") ctx.Data["display_job_name"] = ctx.Query("display_job_name") ctx.Data["description"] = ctx.Query("description") ctx.Data["image"] = strings.TrimSpace(ctx.Query("image")) ctx.Data["src_dataset"] = ctx.Query("src_dataset") //uuid ctx.Data["combat_dataset"] = ctx.Query("combat_dataset") //uuid ctx.Data["evaluationIndex"] = ctx.Query("evaluation_index") ctx.Data["run_para_list"] = ctx.Query("run_para_list") ctx.Data["spec_id"] = ctx.QueryInt64("spec_id") ctx.Data["train_url"] = ctx.Query("train_url") ctx.Data["ckpt_name"] = ctx.Query("ckpt_name") ctx.Data["train_url"] = ctx.Query("train_url") ctx.Data["ckpt_name"] = ctx.Query("ckpt_name") ctx.Data["model_name"] = ctx.Query("model_name") ctx.Data["model_version"] = ctx.Query("model_version") NotStopTaskCount, _ := models.GetModelSafetyCountByUserID(ctx.User.ID) ctx.Data["NotStopTaskCount"] = NotStopTaskCount if ctx.QueryInt("type") == models.TypeCloudBrainOne { ctx.Data["type"] = models.TypeCloudBrainOne ctx.Data["compute_resource"] = models.GPUResource ctx.Data["datasetType"] = models.TypeCloudBrainOne ctx.Data["BaseDataSetName"] = setting.ModelSafetyTest.GPUBaseDataSetName ctx.Data["BaseDataSetUUID"] = setting.ModelSafetyTest.GPUBaseDataSetUUID ctx.Data["CombatDataSetName"] = setting.ModelSafetyTest.GPUCombatDataSetName ctx.Data["CombatDataSetUUID"] = setting.ModelSafetyTest.GPUCombatDataSetUUID prepareCloudbrainOneSpecs(ctx) queuesDetail, _ := cloudbrain.GetQueuesDetail() if queuesDetail != nil { ctx.Data["QueuesDetail"] = queuesDetail } } else { ctx.Data["engine_id"] = ctx.QueryInt("engine_id") ctx.Data["pool_id"] = ctx.Query("pool_id") ctx.Data["type"] = models.TypeCloudBrainTwo ctx.Data["compute_resource"] = models.NPUResource ctx.Data["datasetType"] = models.TypeCloudBrainTwo ctx.Data["BaseDataSetName"] = setting.ModelSafetyTest.NPUBaseDataSetName ctx.Data["BaseDataSetUUID"] = setting.ModelSafetyTest.NPUBaseDataSetUUID ctx.Data["CombatDataSetName"] = setting.ModelSafetyTest.NPUCombatDataSetName ctx.Data["CombatDataSetUUID"] = setting.ModelSafetyTest.NPUCombatDataSetUUID var engines modelarts.Engine if err := json.Unmarshal([]byte(setting.Engines), &engines); err != nil { ctx.ServerError("json.Unmarshal failed:", err) } ctx.Data["engines"] = engines.Info var versionInfos modelarts.VersionInfo if err := json.Unmarshal([]byte(setting.EngineVersions), &versionInfos); err != nil { ctx.ServerError("json.Unmarshal failed:", err) } ctx.Data["engine_versions"] = versionInfos.Version prepareCloudbrainTwoInferenceSpecs(ctx) waitCount := cloudbrain.GetWaitingCloudbrainCount(models.TypeCloudBrainTwo, "") ctx.Data["WaitCount"] = waitCount } return nil } func getJsonContent(url string) (string, error) { resp, err := http.Get(url) if err != nil || resp.StatusCode != 200 { log.Info("Get organizations url error=" + err.Error()) return "", err } bytes, err := ioutil.ReadAll(resp.Body) resp.Body.Close() if err != nil { log.Info("Get organizations url error=" + err.Error()) return "", err } str := string(bytes) //log.Info("json str =" + str) return str, nil }