package repo import ( "encoding/json" "errors" "fmt" "io/ioutil" "net/http" "os" "path" "strconv" "strings" "code.gitea.io/gitea/modules/urfs_client/urchin" "code.gitea.io/gitea/routers/response" "code.gitea.io/gitea/services/cloudbrain/cloudbrainTask" "code.gitea.io/gitea/modules/dataset" "code.gitea.io/gitea/services/cloudbrain/resource" "code.gitea.io/gitea/services/reward/point/account" "code.gitea.io/gitea/modules/auth" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/grampus" "code.gitea.io/gitea/modules/modelarts" "code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/redis/redis_key" "code.gitea.io/gitea/modules/redis/redis_lock" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" "github.com/unknwon/com" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/cloudbrain" "code.gitea.io/gitea/modules/context" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" cloudbrainService "code.gitea.io/gitea/services/cloudbrain" ) const ( tplGrampusTrainJobShow base.TplName = "repo/grampus/trainjob/show" tplGrampusNotebookShow base.TplName = "repo/grampus/notebook/show" //GPU tplGrampusNotebookGPUNew base.TplName = "repo/grampus/notebook/gpu/new" tplGrampusTrainJobGPUNew base.TplName = "repo/grampus/trainjob/gpu/new" //NPU tplGrampusNotebookNPUNew base.TplName = "repo/grampus/notebook/npu/new" tplGrampusTrainJobNPUNew base.TplName = "repo/grampus/trainjob/npu/new" ) func GrampusNotebookNew(ctx *context.Context) { ctx.Data["IsCreate"] = true notebookType := ctx.QueryInt("type") processType := grampus.ProcessorTypeGPU if notebookType == 1 { processType = grampus.ProcessorTypeNPU } err := grampusNotebookNewDataPrepare(ctx, processType) if err != nil { ctx.ServerError("get new notebook-job info failed", err) return } if processType == grampus.ProcessorTypeGPU { ctx.HTML(http.StatusOK, tplGrampusNotebookGPUNew) } else { ctx.HTML(http.StatusOK, 
tplGrampusNotebookNPUNew) } } func GrampusTrainJobGPUNew(ctx *context.Context) { ctx.Data["IsCreate"] = true err := grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) if err != nil { ctx.ServerError("get new train-job info failed", err) return } ctx.HTML(http.StatusOK, tplGrampusTrainJobGPUNew) } func GrampusTrainJobNPUNew(ctx *context.Context) { ctx.Data["IsCreate"] = true err := grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) if err != nil { ctx.ServerError("get new train-job info failed", err) return } ctx.HTML(200, tplGrampusTrainJobNPUNew) } func GrampusNotebookCreate(ctx *context.Context, form auth.CreateGrampusNotebookForm) { ctx.Data["IsCreate"] = true displayJobName := form.DisplayJobName jobName := util.ConvertDisplayJobNameToJobName(displayJobName) uuid := form.Attachment description := form.Description repo := ctx.Repo.Repository branchName := form.BranchName image := strings.TrimSpace(form.Image) codeStoragePath := setting.CBCodePathPrefix + jobName + cloudbrain.CodeMountPath + "/" tpl := tplGrampusNotebookGPUNew processType := grampus.ProcessorTypeGPU computeSource := models.GPUResource computeSourceSimple := models.GPU if form.Type == 1 { tpl = tplGrampusNotebookNPUNew processType = grampus.ProcessorTypeNPU computeSource = models.NPUResource computeSourceSimple = models.NPU codeStoragePath = grampus.JobPath + jobName + modelarts.CodePath } lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeDebug), displayJobName)) defer lock.UnLock() isOk, err := lock.Lock(models.CloudbrainKeyDuration) if !isOk { log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form) return } if !jobNamePattern.MatchString(displayJobName) { grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form) return } //check 
count limit count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeC2Net, string(models.JobTypeDebug), computeSource) if err != nil { log.Error("GetGrampusCountByUserID failed:%v", err, ctx.Data["MsgID"]) grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr("system error", tpl, &form) return } else { if count >= 1 { log.Error("the user already has running or waiting task", ctx.Data["MsgID"]) grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr("you have already a running or waiting task, can not create more", tpl, &form) return } } //check whether the task name in the project is duplicated tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeDebug), displayJobName) if err == nil { if len(tasks) != 0 { log.Error("the job name did already exist", ctx.Data["MsgID"]) grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr("the job name did already exist", tpl, &form) return } } else { if !models.IsErrJobNotExist(err) { log.Error("system error, %v", err, ctx.Data["MsgID"]) grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr("system error", tpl, &form) return } } //check specification spec, err := resource.GetAndCheckSpec(ctx.User.ID, form.SpecId, models.FindSpecsOptions{ JobType: models.JobTypeDebug, ComputeResource: computeSourceSimple, Cluster: models.C2NetCluster, }) if err != nil || spec == nil { grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr("Resource specification not available", tpl, &form) return } if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) { log.Error("point balance is not enough,userId=%d specId=%d", ctx.User.ID, spec.ID) grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr(ctx.Tr("points.insufficient_points_balance"), tpl, &form) return } var datasetInfos map[string]models.DatasetInfo var datasetNames string //var if uuid != "" { datasetInfos, datasetNames, err = models.GetDatasetInfo(uuid, 
computeSourceSimple) if err != nil { log.Error("GetDatasetInfo failed: %v", err, ctx.Data["MsgID"]) grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr(ctx.Tr("cloudbrain.error.dataset_select"), tpl, &form) return } } //prepare code and out path codeLocalPath := setting.JobPath + jobName + cloudbrain.CodeMountPath + "/" _, err = ioutil.ReadDir(codeLocalPath) if err == nil { os.RemoveAll(codeLocalPath) } if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil { log.Error("downloadZipCode failed, server timed out: %s (%v)", repo.FullName(), err) grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form) return } if processType == grampus.ProcessorTypeGPU { if err := uploadCodeToMinio(codeLocalPath+"/", jobName, cloudbrain.CodeMountPath+"/"); err != nil { log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form) return } } else { if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil { log.Error("Failed to uploadCodeToObs: %s (%v)", repo.FullName(), err) grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form) return } } commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName) req := &grampus.GenerateNotebookJobReq{ JobName: jobName, DisplayJobName: displayJobName, ComputeResource: computeSource, ProcessType: processType, ImageUrl: image, ImageId: form.ImageID, Description: description, Uuid: uuid, CommitID: commitID, BranchName: branchName, DatasetNames: datasetNames, DatasetInfos: datasetInfos, Spec: spec, CodeStoragePath: codeStoragePath, CodeName: strings.ToLower(repo.Name), } if form.ModelName != "" { //使用预训练模型训练 _, err := models.QueryModelByPath(form.PreTrainModelUrl) if err != nil { log.Error("Can not find model", err) grampusNotebookNewDataPrepare(ctx, 
processType) ctx.RenderWithErr(ctx.Tr("repo.modelconvert.manage.model_not_exist"), tpl, &form) return } req.ModelName = form.ModelName req.LabelName = form.LabelName req.CkptName = form.CkptName req.ModelVersion = form.ModelVersion req.PreTrainModelUrl = form.PreTrainModelUrl req.PreTrainModelPath = getPreTrainModelPath(form.PreTrainModelUrl, form.CkptName) } _, err = grampus.GenerateNotebookJob(ctx, req) if err != nil { log.Error("GenerateNotebookJob failed:%v", err.Error(), ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, processType) ctx.RenderWithErr(err.Error(), tpl, &form) return } ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/debugjob?debugListType=all") } func grampusNotebookNewDataPrepare(ctx *context.Context, processType string) error { ctx.Data["PageIsCloudBrain"] = true var displayJobName = cloudbrainService.GetDisplayJobName(ctx.User.Name) ctx.Data["display_job_name"] = displayJobName //get valid images if processType == grampus.ProcessorTypeNPU { images, err := grampus.GetImages(processType, string(models.JobTypeDebug)) if err != nil { log.Error("GetImages failed:", err.Error()) } else { ctx.Data["images"] = images.Infos } } //prepare available specs computeResourceSimple := models.GPU datasetType := models.TypeCloudBrainOne computeResource := models.GPUResource if processType == grampus.ProcessorTypeNPU { computeResourceSimple = models.NPU datasetType = models.TypeCloudBrainTwo computeResource = models.NPUResource } prepareGrampusSpecs(ctx, computeResourceSimple, models.JobTypeDebug) //get branches branches, _, err := ctx.Repo.GitRepo.GetBranches(0, 0) if err != nil { log.Error("GetBranches error:", err.Error()) } else { ctx.Data["branches"] = branches } ctx.Data["branchName"] = ctx.Repo.BranchName ctx.Data["datasetType"] = datasetType waitCount := cloudbrain.GetWaitingCloudbrainCount(models.TypeC2Net, computeResource, models.JobTypeDebug) ctx.Data["WaitCount"] = waitCount NotStopTaskCount, _ := 
cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeC2Net, string(models.JobTypeDebug), computeResource) ctx.Data["NotStopTaskCount"] = NotStopTaskCount ctx.Data["code_path"] = cloudbrain.CodeMountPath ctx.Data["dataset_path"] = cloudbrain.DataSetMountPath ctx.Data["model_path"] = cloudbrain.ModelMountPath return nil } func grampusTrainJobNewDataPrepare(ctx *context.Context, processType string) error { ctx.Data["PageIsCloudBrain"] = true var displayJobName = cloudbrainService.GetDisplayJobName(ctx.User.Name) ctx.Data["display_job_name"] = displayJobName //get valid images if processType == grampus.ProcessorTypeNPU { images, err := grampus.GetImages(processType, string(models.JobTypeTrain)) if err != nil { log.Error("GetImages failed:", err.Error()) } else { ctx.Data["images"] = images.Infos } } //prepare available specs if processType == grampus.ProcessorTypeNPU { prepareGrampusSpecs(ctx, models.NPU) } else if processType == grampus.ProcessorTypeGPU { prepareGrampusSpecs(ctx, models.GPU) } //get branches branches, _, err := ctx.Repo.GitRepo.GetBranches(0, 0) if err != nil { log.Error("GetBranches error:", err.Error()) } else { ctx.Data["branches"] = branches } ctx.Data["branchName"] = ctx.Repo.BranchName if processType == grampus.ProcessorTypeGPU { ctx.Data["datasetType"] = models.TypeCloudBrainOne waitCount := cloudbrain.GetWaitingCloudbrainCount(models.TypeC2Net, models.GPUResource, models.JobTypeTrain) ctx.Data["WaitCount"] = waitCount NotStopTaskCount, _ := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeC2Net, string(models.JobTypeTrain), models.GPUResource) ctx.Data["NotStopTaskCount"] = NotStopTaskCount } else if processType == grampus.ProcessorTypeNPU { ctx.Data["datasetType"] = models.TypeCloudBrainTwo waitCount := cloudbrain.GetWaitingCloudbrainCount(models.TypeC2Net, models.NPUResource, models.JobTypeTrain) ctx.Data["WaitCount"] = waitCount NotStopTaskCount, _ := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, 
models.TypeC2Net, string(models.JobTypeTrain), models.NPUResource) ctx.Data["NotStopTaskCount"] = NotStopTaskCount } if ctx.Cloudbrain != nil { uuids, datasetNames := dataset.GetFilterDeletedAttachments(ctx.Cloudbrain.Uuid) ctx.Data["attachment"] = uuids ctx.Data["boot_file"] = ctx.Cloudbrain.BootFile ctx.Data["image_id"] = ctx.Cloudbrain.ImageID ctx.Data["run_para_list"] = ctx.Cloudbrain.Parameters ctx.Data["description"] = ctx.Cloudbrain.Description ctx.Data["branch_name"] = ctx.Cloudbrain.BranchName ctx.Data["engine_name"] = ctx.Cloudbrain.EngineName ctx.Data["work_server_number"] = ctx.Cloudbrain.WorkServerNumber if ctx.Cloudbrain.Image != "" { ctx.Data["image"] = ctx.Cloudbrain.Image } else { ctx.Data["image"] = ctx.Cloudbrain.EngineName } ctx.Data["dataset_name"] = datasetNames ctx.Data["model_name"] = ctx.Cloudbrain.ModelName ctx.Data["model_version"] = ctx.Cloudbrain.ModelVersion ctx.Data["ckpt_name"] = ctx.Cloudbrain.CkptName ctx.Data["label_names"] = ctx.Cloudbrain.LabelName ctx.Data["pre_train_model_url"] = ctx.Cloudbrain.PreTrainModelUrl spec, _ := resource.GetCloudbrainSpec(ctx.Cloudbrain.ID) if spec != nil { ctx.Data["spec_id"] = spec.ID } } return nil } func GrampusTrainJobVersionNew(ctx *context.Context) { task := ctx.Cloudbrain ctx.Data["IsCreate"] = false if task.ComputeResource == models.GPUResource { err := grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) if err != nil { ctx.ServerError("get new train-job version info failed", err) return } ctx.HTML(http.StatusOK, tplGrampusTrainJobGPUNew) } else if task.ComputeResource == models.NPUResource { err := grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) if err != nil { ctx.ServerError("get new train-job version info failed", err) return } ctx.HTML(200, tplGrampusTrainJobNPUNew) } } func prepareGrampusSpecs(ctx *context.Context, computeResource string, jobType ...models.JobType) { tempJobType := models.JobTypeTrain if len(jobType) > 0 { tempJobType = jobType[0] } noteBookSpecs, 
_ := resource.FindAvailableSpecs(ctx.User.ID, models.FindSpecsOptions{ JobType: tempJobType, ComputeResource: computeResource, Cluster: models.C2NetCluster, }) ctx.Data["Specs"] = noteBookSpecs } func grampusParamCheckCreateTrainJob(form auth.CreateGrampusTrainJobForm) error { if !strings.HasSuffix(strings.TrimSpace(form.BootFile), ".py") { log.Error("the boot file(%s) must be a python file", form.BootFile) return errors.New("启动文件必须是python文件") } if form.BranchName == "" { log.Error("the branch must not be null!", form.BranchName) return errors.New("代码分支不能为空!") } return nil } func GrampusTrainJobGpuCreate(ctx *context.Context, form auth.CreateGrampusTrainJobForm) { ctx.Data["IsCreate"] = true grampusTrainJobGpuCreate(ctx, form) } func grampusTrainJobGpuCreate(ctx *context.Context, form auth.CreateGrampusTrainJobForm) { displayJobName := form.DisplayJobName jobName := util.ConvertDisplayJobNameToJobName(displayJobName) uuid := form.Attachment description := form.Description bootFile := strings.TrimSpace(form.BootFile) params := form.Params repo := ctx.Repo.Repository codeLocalPath := setting.JobPath + jobName + cloudbrain.CodeMountPath + "/" codeMinioPath := setting.CBCodePathPrefix + jobName + cloudbrain.CodeMountPath + "/" branchName := form.BranchName image := strings.TrimSpace(form.Image) tpl := tplGrampusTrainJobGPUNew lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)) isOk, err := lock.Lock(models.CloudbrainKeyDuration) if !isOk { log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplGrampusTrainJobGPUNew, &form) return } defer lock.UnLock() if !jobNamePattern.MatchString(displayJobName) { grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form) return } bootFileExist, err 
:= ctx.Repo.FileExists(bootFile, branchName) if err != nil || !bootFileExist { log.Error("Get bootfile error:", err, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_bootfile_err"), tpl, &form) return } //check count limit count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeC2Net, string(models.JobTypeTrain), models.GPUResource) if err != nil { log.Error("GetGrampusCountByUserID failed:%v", err, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr("system error", tpl, &form) return } else { if count >= 1 { log.Error("the user already has running or waiting task", ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr("you have already a running or waiting task, can not create more", tpl, &form) return } } //check param if err := grampusParamCheckCreateTrainJob(form); err != nil { log.Error("paramCheckCreateTrainJob failed:(%v)", err, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr(err.Error(), tpl, &form) return } //check whether the task name in the project is duplicated tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeTrain), displayJobName) if err == nil { if len(tasks) != 0 { log.Error("the job name did already exist", ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr("the job name did already exist", tpl, &form) return } } else { if !models.IsErrJobNotExist(err) { log.Error("system error, %v", err, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr("system error", tpl, &form) return } } //check specification spec, err := resource.GetAndCheckSpec(ctx.User.ID, form.SpecId, models.FindSpecsOptions{ JobType: models.JobTypeTrain, ComputeResource: models.GPU, Cluster: models.C2NetCluster, }) if err != nil 
|| spec == nil { grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr("Resource specification not available", tpl, &form) return } if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) { log.Error("point balance is not enough,userId=%d specId=%d", ctx.User.ID, spec.ID) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr(ctx.Tr("points.insufficient_points_balance"), tplGrampusTrainJobGPUNew, &form) return } //check dataset datasetInfos, datasetNames, err := models.GetDatasetInfo(uuid, models.GPU) if err != nil { log.Error("GetDatasetInfo failed: %v", err, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr(ctx.Tr("cloudbrain.error.dataset_select"), tpl, &form) return } //prepare code and out path _, err = ioutil.ReadDir(codeLocalPath) if err == nil { os.RemoveAll(codeLocalPath) } if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil { log.Error("downloadZipCode failed, server timed out: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form) return } //todo: upload code (send to file_server todo this work?) 
//upload code if err := uploadCodeToMinio(codeLocalPath+"/", jobName, cloudbrain.CodeMountPath+"/"); err != nil { log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form) return } modelPath := setting.JobPath + jobName + cloudbrain.ModelMountPath + "/" if err := mkModelPath(modelPath); err != nil { log.Error("Failed to mkModelPath: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form) return } //init model readme if err := uploadCodeToMinio(modelPath, jobName, cloudbrain.ModelMountPath+"/"); err != nil { log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form) return } var datasetRemotePath, allFileName string for _, datasetInfo := range datasetInfos { if datasetRemotePath == "" { datasetRemotePath = datasetInfo.DataLocalPath allFileName = datasetInfo.FullName } else { datasetRemotePath = datasetRemotePath + ";" + datasetInfo.DataLocalPath allFileName = allFileName + ";" + datasetInfo.FullName } } //prepare command preTrainModelPath := getPreTrainModelPath(form.PreTrainModelUrl, form.CkptName) command, err := generateCommand(repo.Name, grampus.ProcessorTypeGPU, codeMinioPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CBCodePathPrefix+jobName+cloudbrain.ModelMountPath+"/", allFileName, preTrainModelPath, form.CkptName, "") if err != nil { log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr("Create task failed, internal error", tpl, &form) return } commitID, _ := 
ctx.Repo.GitRepo.GetBranchCommitID(branchName) req := &grampus.GenerateTrainJobReq{ JobName: jobName, DisplayJobName: displayJobName, ComputeResource: models.GPUResource, ProcessType: grampus.ProcessorTypeGPU, Command: command, ImageUrl: image, Description: description, BootFile: bootFile, Uuid: uuid, CommitID: commitID, BranchName: branchName, Params: form.Params, EngineName: image, DatasetNames: datasetNames, DatasetInfos: datasetInfos, IsLatestVersion: modelarts.IsLatestVersion, VersionCount: modelarts.VersionCountOne, WorkServerNumber: 1, Spec: spec, } if form.ModelName != "" { //使用预训练模型训练 req.ModelName = form.ModelName req.LabelName = form.LabelName req.CkptName = form.CkptName req.ModelVersion = form.ModelVersion req.PreTrainModelUrl = form.PreTrainModelUrl } _, err = grampus.GenerateTrainJob(ctx, req) if err != nil { log.Error("GenerateTrainJob failed:%v", err.Error(), ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr(err.Error(), tpl, &form) return } ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job") } func getPreTrainModelPath(pretrainModelDir string, fileName string) string { index := strings.Index(pretrainModelDir, "/") if index > 0 { filterBucket := pretrainModelDir[index+1:] return filterBucket + fileName } else { return "" } } func GrampusTrainJobVersionCreate(ctx *context.Context, form auth.CreateGrampusTrainJobForm) { ctx.Data["IsCreate"] = false computeResource := ctx.Query("compute_resource") if computeResource == models.GPUResource { grampusTrainJobGpuCreate(ctx, form) } else if computeResource == models.NPUResource { grampusTrainJobNpuCreate(ctx, form) } else { ctx.ServerError("resource error", errors.New("compute resource is not support")) return } } func GrampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrainJobForm) { ctx.Data["IsCreate"] = true grampusTrainJobNpuCreate(ctx, form) } func grampusTrainJobNpuCreate(ctx *context.Context, form 
auth.CreateGrampusTrainJobForm) { displayJobName := form.DisplayJobName jobName := util.ConvertDisplayJobNameToJobName(displayJobName) uuid := form.Attachment description := form.Description bootFile := strings.TrimSpace(form.BootFile) params := form.Params repo := ctx.Repo.Repository codeLocalPath := setting.JobPath + jobName + modelarts.CodePath codeObsPath := grampus.JobPath + jobName + modelarts.CodePath //dataObsPath := setting.BasePath + path.Join(uuid[0:1], uuid[1:2]) + "/" + uuid + "/" branchName := form.BranchName isLatestVersion := modelarts.IsLatestVersion versionCount := modelarts.VersionCountOne engineName := form.EngineName tpl := tplGrampusTrainJobNPUNew lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)) isOk, err := lock.Lock(models.CloudbrainKeyDuration) if !isOk { log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplGrampusTrainJobNPUNew, &form) return } defer lock.UnLock() if !jobNamePattern.MatchString(displayJobName) { grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form) return } bootFileExist, err := ctx.Repo.FileExists(bootFile, branchName) if err != nil || !bootFileExist { log.Error("Get bootfile error:", err, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_bootfile_err"), tpl, &form) return } //check count limit count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeC2Net, string(models.JobTypeTrain), models.NPUResource) if err != nil { log.Error("GetGrampusCountByUserID failed:%v", err, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) ctx.RenderWithErr("system error", tpl, &form) return } else { if count >= 1 { 
log.Error("the user already has running or waiting task", ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) ctx.RenderWithErr("you have already a running or waiting task, can not create more", tpl, &form) return } } //check param if err := grampusParamCheckCreateTrainJob(form); err != nil { log.Error("paramCheckCreateTrainJob failed:(%v)", err) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) ctx.RenderWithErr(err.Error(), tpl, &form) return } //check whether the task name in the project is duplicated tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeTrain), displayJobName) if err == nil { if len(tasks) != 0 { log.Error("the job name did already exist", ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) ctx.RenderWithErr("the job name did already exist", tpl, &form) return } } else { if !models.IsErrJobNotExist(err) { log.Error("system error, %v", err, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) ctx.RenderWithErr("system error", tpl, &form) return } } //check specification spec, err := resource.GetAndCheckSpec(ctx.User.ID, form.SpecId, models.FindSpecsOptions{ JobType: models.JobTypeTrain, ComputeResource: models.NPU, Cluster: models.C2NetCluster, }) if err != nil || spec == nil { grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) ctx.RenderWithErr("Resource specification not available", tpl, &form) return } if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) { log.Error("point balance is not enough,userId=%d specId=%d", ctx.User.ID, spec.ID) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) ctx.RenderWithErr(ctx.Tr("points.insufficient_points_balance"), tplGrampusTrainJobNPUNew, &form) return } //check dataset datasetInfos, datasetNames, err := models.GetDatasetInfo(uuid, models.NPU) if err != nil { log.Error("GetDatasetInfo failed: %v", err, ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, 
grampus.ProcessorTypeNPU) ctx.RenderWithErr(ctx.Tr("cloudbrain.error.dataset_select"), tpl, &form) return } //prepare code and out path _, err = ioutil.ReadDir(codeLocalPath) if err == nil { os.RemoveAll(codeLocalPath) } if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil { log.Error("downloadZipCode failed, server timed out: %s (%v)", repo.FullName(), err) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form) return } //todo: upload code (send to file_server todo this work?) if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.OutputPath); err != nil { log.Error("Failed to obsMkdir_output: %s (%v)", repo.FullName(), err) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form) return } if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil { log.Error("Failed to uploadCodeToObs: %s (%v)", repo.FullName(), err) grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form) return } var datasetRemotePath, allFileName string for _, datasetInfo := range datasetInfos { if datasetRemotePath == "" { datasetRemotePath = datasetInfo.DataLocalPath + "'" + datasetInfo.FullName + "'" allFileName = datasetInfo.FullName } else { datasetRemotePath = datasetRemotePath + ";" + datasetInfo.DataLocalPath + "'" + datasetInfo.FullName + "'" allFileName = allFileName + ";" + datasetInfo.FullName } } //prepare command preTrainModelPath := getPreTrainModelPath(form.PreTrainModelUrl, form.CkptName) command, err := generateCommand(repo.Name, grampus.ProcessorTypeNPU, codeObsPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CodePathPrefix+jobName+modelarts.OutputPath, allFileName, preTrainModelPath, form.CkptName, grampus.GetNpuModelRemoteObsUrl(jobName)) if err != nil { log.Error("Failed to 
generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"])
		grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
		ctx.RenderWithErr("Create task failed, internal error", tpl, &form)
		return
	}

	commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName)

	req := &grampus.GenerateTrainJobReq{
		JobName:           jobName,
		DisplayJobName:    displayJobName,
		ComputeResource:   models.NPUResource,
		ProcessType:       grampus.ProcessorTypeNPU,
		Command:           command,
		ImageId:           form.ImageID,
		Description:       description,
		CodeObsPath:       codeObsPath,
		BootFileUrl:       codeObsPath + bootFile,
		BootFile:          bootFile,
		WorkServerNumber:  form.WorkServerNumber,
		Uuid:              uuid,
		CommitID:          commitID,
		IsLatestVersion:   isLatestVersion,
		BranchName:        branchName,
		Params:            form.Params,
		EngineName:        engineName,
		VersionCount:      versionCount,
		TotalVersionCount: modelarts.TotalVersionCount,
		DatasetNames:      datasetNames,
		DatasetInfos:      datasetInfos,
		Spec:              spec,
		CodeName:          strings.ToLower(repo.Name),
	}
	if form.ModelName != "" { // train starting from a pre-trained model
		req.ModelName = form.ModelName
		req.LabelName = form.LabelName
		req.CkptName = form.CkptName
		req.ModelVersion = form.ModelVersion
		req.PreTrainModelUrl = form.PreTrainModelUrl
		req.PreTrainModelPath = preTrainModelPath
	}

	_, err = grampus.GenerateTrainJob(ctx, req)
	if err != nil {
		log.Error("GenerateTrainJob failed:%v", err.Error())
		grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
		ctx.RenderWithErr(err.Error(), tpl, &form)
		return
	}
	ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job")
}

// GetGrampusNotebook is an API handler that syncs the status of the Grampus
// notebook identified by the :id route parameter and returns a JSON summary
// (name, status, AI center, created/updated time, duration).
func GetGrampusNotebook(ctx *context.APIContext) {
	ID := ctx.Params(":id")
	job, err := models.GetCloudbrainByID(ID)
	if err != nil {
		ctx.NotFound("", err)
		log.Error("GetCloudbrainByID failed: %v", err)
		return
	}

	jobAfter, err := cloudbrainTask.SyncGrampusNotebookStatus(job)
	if err != nil {
		ctx.NotFound(err)
		log.Error("Sync cloud brain one status failed: %v", err)
		return
	}
	// Resolve the AI center only after the sync succeeded: jobAfter is not
	// safe to dereference when the sync reported an error.
	aiCenterName := cloudbrainService.GetAiCenterShow(jobAfter.AiCenter, ctx.Context)

	ctx.JSON(http.StatusOK, map[string]interface{}{
		"ID":            ID,
		"JobName":       jobAfter.JobName,
		"JobStatus":     jobAfter.Status,
		"AiCenter":      aiCenterName,
		"CreatedTime":   jobAfter.CreatedUnix.Format("2006-01-02 15:04:05"),
		"CompletedTime": jobAfter.UpdatedUnix.Format("2006-01-02 15:04:05"),
		"JobDuration":   jobAfter.TrainJobDuration,
	})
}

// GrampusStopJob stops the Grampus job bound to the request (ctx.Cloudbrain),
// updates its status, end time and duration, and reports the outcome as JSON.
// result_code is "0" on success and non-zero on failure.
func GrampusStopJob(ctx *context.Context) {
	var ID = ctx.Params(":id")
	var resultCode = "0"
	var errorMsg = ""
	var status = ""

	task := ctx.Cloudbrain
	// Single-pass loop: each failure path sets the response fields and breaks.
	for {
		if task.Status == models.GrampusStatusStopped || task.Status == models.GrampusStatusFailed || task.Status == models.GrampusStatusSucceeded {
			log.Error("the job(%s) has been stopped", task.JobName, ctx.Data["msgID"])
			resultCode = "-1"
			errorMsg = ctx.Tr("cloudbrain.Already_stopped")
			break
		}

		res, err := grampus.StopJob(task.JobID, task.JobType)
		if err != nil {
			log.Error("StopJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"])
			// res may be nil when the request itself failed; only use its
			// error code when a response was actually returned.
			resultCode = "-1"
			if res != nil {
				resultCode = strconv.Itoa(res.ErrorCode)
			}
			errorMsg = ctx.Tr("cloudbrain.Stopped_failed")
			break
		}

		oldStatus := task.Status
		task.Status = getStopJobResponseStatus(res)
		if task.EndTime == 0 {
			task.EndTime = timeutil.TimeStampNow()
		}
		task.ComputeAndSetDuration()
		if oldStatus != task.Status {
			notification.NotifyChangeCloudbrainStatus(task, oldStatus)
		}
		err = models.UpdateJob(task)
		if err != nil {
			log.Error("UpdateJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"])
			resultCode = "-1"
			errorMsg = "system error"
			break
		}

		status = task.Status
		break
	}

	ctx.JSON(200, map[string]interface{}{
		"result_code": resultCode,
		"error_msg":   errorMsg,
		"status":      status,
		"id":          ID,
		"StatusOK":    0,
	})
}

// getStopJobResponseStatus maps a Grampus stop-job response to a local status:
// the translated remote status when one is present, otherwise "stopping".
func getStopJobResponseStatus(res *models.GrampusStopJobResponse) string {
	newStatus := models.GrampusStatusStopping
	if res.Status != "" {
		newStatus = grampus.TransTrainJobStatus(res.Status)
	}
	return newStatus
}

// GrampusNotebookDel deletes a finished Grampus notebook and redirects to the
// admin list, the global cloudbrain list, or the repository debug-job list,
// depending on the isadminpage / ishomepage query parameters.
func GrampusNotebookDel(ctx *context.Context) {
	var listType = ctx.Query("listType")
	if err := deleteGrampusJob(ctx); err != nil {
		log.Error("deleteGrampusJob failed: %v", err, ctx.Data["msgID"])
		ctx.ServerError(err.Error(), err)
		return
	}

	var isAdminPage = ctx.Query("isadminpage")
	var isHomePage = ctx.Query("ishomepage")
	if ctx.IsUserSiteAdmin() && isAdminPage == "true" {
		ctx.Redirect(setting.AppSubURL + "/admin" + "/cloudbrains")
	} else if isHomePage == "true" {
		ctx.Redirect(setting.AppSubURL + "/cloudbrains")
	} else {
		ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/debugjob?debugListType=" + listType)
	}
}

// GrampusTrainJobDel deletes a finished Grampus train job and redirects to the
// admin list, the global cloudbrain list, or the repository train-job list,
// depending on the isadminpage / ishomepage query parameters.
func GrampusTrainJobDel(ctx *context.Context) {
	var listType = ctx.Query("listType")
	if err := deleteGrampusJob(ctx); err != nil {
		log.Error("deleteGrampusJob failed: %v", err, ctx.Data["msgID"])
		ctx.ServerError(err.Error(), err)
		return
	}

	var isAdminPage = ctx.Query("isadminpage")
	var isHomePage = ctx.Query("ishomepage")
	if ctx.IsUserSiteAdmin() && isAdminPage == "true" {
		ctx.Redirect(setting.AppSubURL + "/admin" + "/cloudbrains")
	} else if isHomePage == "true" {
		ctx.Redirect(setting.AppSubURL + "/cloudbrains")
	} else {
		ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job?listType=" + listType)
	}
}

// deleteGrampusJob deletes ctx.Cloudbrain once it has reached a terminal
// status (stopped, succeeded or failed), then removes its job storage.
// It returns an error if the job is still running or the DB delete fails.
func deleteGrampusJob(ctx *context.Context) error {
	task := ctx.Cloudbrain

	if task.Status != models.GrampusStatusStopped && task.Status != models.GrampusStatusSucceeded && task.Status != models.GrampusStatusFailed {
		log.Error("the job(%s) has not been stopped", task.JobName, ctx.Data["msgID"])
		return errors.New(ctx.Tr("cloudbrain.Not_Stopped"))
	}

	err := models.DeleteJob(task)
	if err != nil {
		log.Error("DeleteJob failed: %v", err, ctx.Data["msgID"])
		return err
	}

	// NPU jobs keep their files in CloudBrain-two storage, others in CloudBrain-one.
	storageType := models.TypeCloudBrainOne
	if task.ComputeResource == models.NPUResource {
		storageType = models.TypeCloudBrainTwo
	}
	DeleteCloudbrainJobStorage(task.JobName, storageType)

	return nil
}

// NotebookDataset is one entry of the JSON list stored in Cloudbrain.DataUrl.
type NotebookDataset struct {
	DatasetUrl string `json:"dataset_url"`
}

// GrampusNotebookShow renders the detail page of a Grampus notebook (debug
// task). For a live record (not deleted and not stopped) it first refreshes
// AI center, status, duration and start/end time from the Grampus API and
// persists them.
func GrampusNotebookShow(ctx *context.Context) {
	ctx.Data["PageIsCloudBrain"] = true

	task, err := models.GetCloudbrainByIDWithDeleted(ctx.Params(":id"))
	if err != nil {
		log.Error("GetCloudbrainByID failed:" + err.Error())
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
		return
	}
	task.ContainerIp = ""

	if task.DeletedAt.IsZero() && cloudbrainTask.IsTaskNotStop(task) { //normal record
		result, err := grampus.GetNotebookJob(task.JobID)
		if err != nil {
			log.Error("GetJob failed:" + err.Error())
			ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
			return
		}

		if result != nil {
			// A response without tasks must not be indexed: Tasks[0] would panic.
			hasTask := len(result.JobInfo.Tasks) > 0
			if hasTask && len(result.JobInfo.Tasks[0].CenterID) == 1 && len(result.JobInfo.Tasks[0].CenterName) == 1 {
				task.AiCenter = result.JobInfo.Tasks[0].CenterID[0] + "+" + result.JobInfo.Tasks[0].CenterName[0]
			}
			oldStatus := task.Status
			task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status)
			if task.Status != oldStatus || task.Status == models.GrampusStatusRunning {
				task.Duration = result.JobInfo.RunSec
				if task.Duration < 0 {
					task.Duration = 0
				}
				task.TrainJobDuration = models.ConvertDurationToStr(task.Duration)

				if task.StartTime == 0 && result.JobInfo.StartedAt > 0 {
					task.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt)
				}
				if task.EndTime == 0 && models.IsTrainJobTerminal(task.Status) && task.StartTime > 0 {
					task.EndTime = task.StartTime.Add(task.Duration)
				}
				task.CorrectCreateUnix()
				if oldStatus != task.Status {
					notification.NotifyChangeCloudbrainStatus(task, oldStatus)
					// Once an NPU job finishes, fetch its model back from the remote center.
					if models.IsTrainJobTerminal(task.Status) && task.ComputeResource == models.NPUResource {
						if hasTask && len(result.JobInfo.Tasks[0].CenterID) == 1 {
							urchin.GetBackNpuModel(task.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(task.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID))
						}
					}
				}
			}
			err = models.UpdateJob(task)
			if err != nil {
				log.Error("UpdateJob failed:" + err.Error())
			}
		}
	}

	// Render the stored JSON parameters as "label = value; label = value".
	if len(task.Parameters) > 0 {
		var parameters models.Parameters
		err := json.Unmarshal([]byte(task.Parameters), &parameters)
		if err != nil {
			log.Error("Failed to Unmarshal Parameters: %s (%v)", task.Parameters, err)
			ctx.ServerError("system error", err)
			return
		}

		if len(parameters.Parameter) > 0 {
			parts := make([]string, 0, len(parameters.Parameter))
			for _, parameter := range parameters.Parameter {
				parts = append(parts, parameter.Label+" = "+parameter.Value)
			}
			task.Parameters = strings.Join(parts, "; ")
		} else {
			task.Parameters = ""
		}
	}

	user, err := models.GetUserByID(task.UserID)
	if err == nil {
		task.User = user
	}

	prepareSpec4Show(ctx, task)

	ctx.Data["task"] = task
	ctx.Data["datasetDownload"] = getDatasetDownloadInfo(ctx, task)
	ctx.Data["modelDownload"] = getModelDownloadInfo(ctx, task)
	ctx.Data["canDownload"] = cloudbrain.CanModifyJob(ctx, task)
	ctx.Data["ai_center"] = cloudbrainService.GetAiCenterShow(task.AiCenter, ctx)
	ctx.Data["code_path"] = cloudbrain.CodeMountPath
	ctx.Data["dataset_path"] = cloudbrain.DataSetMountPath
	ctx.Data["model_path"] = cloudbrain.ModelMountPath
	ctx.HTML(http.StatusOK, tplGrampusNotebookShow)
}

// getDatasetDownloadInfo returns dataset download entries for task, but only
// when the signed-in user owns the task and it has datasets. For non-GPU
// tasks each entry's link is taken from the first DataUrl entry whose URL
// contains the dataset name.
func getDatasetDownloadInfo(ctx *context.Context, task *models.Cloudbrain) []*models.DatasetDownload {
	datasetDownload := make([]*models.DatasetDownload, 0)
	if ctx.IsSigned {
		if task.Uuid != "" && task.UserID == ctx.User.ID {
			if task.IsGPUTask() {
				return GetCloudBrainDataSetInfo(task.Uuid, task.DatasetName, false)
			} else {
				datasetDownload = GetCloudBrainDataSetInfo(task.Uuid, task.DatasetName, false)
				datasetObsUrlList := make([]NotebookDataset, 0)
				// Best-effort: an unparsable DataUrl just yields no download links.
				_ = json.Unmarshal([]byte(task.DataUrl), &datasetObsUrlList)
				for _, datasetInfo := range datasetDownload {
					for _, datasetObs := range datasetObsUrlList {
						log.Info("datasetObsUrl:" + datasetObs.DatasetUrl + "datasetName:" + datasetInfo.DatasetName)
						if strings.Contains(datasetObs.DatasetUrl, datasetInfo.DatasetName) {
							datasetInfo.DatasetDownloadLink = datasetObs.DatasetUrl
							break
						}
					}
				}
			}
		}
	}

	return datasetDownload
}

// getModelDownloadInfo returns the pre-trained model download entry for an
// NPU task owned by the signed-in user; otherwise it returns an empty entry.
// IsDelete is set when HasModelFile reports the model file missing.
func getModelDownloadInfo(ctx *context.Context, task *models.Cloudbrain) *models.ModelDownload {
	var modelDownload models.ModelDownload
	if ctx.IsSigned {
		if task.ModelName != "" && task.UserID == ctx.User.ID {
			if task.IsNPUTask() {
				modelDownload = models.ModelDownload{
					Name:         task.CkptName,
					DownloadLink: "",
					IsDelete:     false,
				}
				if !HasModelFile(task) {
					modelDownload.IsDelete = true
				}
				datasetObsUrlList := make([]NotebookDataset, 0)
				// Best-effort: an unparsable DataUrl just leaves DownloadLink empty.
				_ = json.Unmarshal([]byte(task.DataUrl), &datasetObsUrlList)
				for _, datasetObs := range datasetObsUrlList {
					if strings.Contains(datasetObs.DatasetUrl, task.CkptName) {
						modelDownload.DownloadLink = datasetObs.DatasetUrl
						break
					}
				}
			}
		}
	}

	return &modelDownload
}

// GrampusTrainJobShow renders the detail page of a Grampus train job. For a
// non-deleted record it first refreshes AI center, status, duration and
// start/end time from the Grampus API and persists them.
func GrampusTrainJobShow(ctx *context.Context) {
	ctx.Data["PageIsCloudBrain"] = true

	task, err := models.GetCloudbrainByJobIDWithDeleted(ctx.Params(":jobid"))
	if err != nil {
		log.Error("GetCloudbrainByJobID failed:" + err.Error())
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
		return
	}
	task.ContainerIp = ""
	task.User, _ = models.GetUserByID(task.UserID)

	if task.DeletedAt.IsZero() { //normal record
		result, err := grampus.GetJob(task.JobID)
		if err != nil {
			log.Error("GetJob failed:" + err.Error())
			ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
			return
		}

		if result != nil {
			// A response without tasks must not be indexed: Tasks[0] would panic.
			hasTask := len(result.JobInfo.Tasks) > 0
			if hasTask && len(result.JobInfo.Tasks[0].CenterID) == 1 && len(result.JobInfo.Tasks[0].CenterName) == 1 {
				task.AiCenter = result.JobInfo.Tasks[0].CenterID[0] + "+" + result.JobInfo.Tasks[0].CenterName[0]
			}
			oldStatus := task.Status
			task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status)
			if task.Status != oldStatus || task.Status == models.GrampusStatusRunning {
				task.Duration = result.JobInfo.RunSec
				if task.Duration < 0 {
					task.Duration = 0
				}
				task.TrainJobDuration = models.ConvertDurationToStr(task.Duration)

				if task.StartTime == 0 && result.JobInfo.StartedAt > 0 {
					task.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt)
				}
				if task.EndTime == 0 && models.IsTrainJobTerminal(task.Status) && task.StartTime > 0 {
					task.EndTime = task.StartTime.Add(task.Duration)
				}
				task.CorrectCreateUnix()
				if oldStatus != task.Status {
					notification.NotifyChangeCloudbrainStatus(task, oldStatus)
					// Once an NPU job finishes, fetch its model back from the remote center.
					if models.IsTrainJobTerminal(task.Status) && task.ComputeResource == models.NPUResource {
						if hasTask && len(result.JobInfo.Tasks[0].CenterID) == 1 {
							urchin.GetBackNpuModel(task.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(task.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID))
						}
					}
				}
			}
			err = models.UpdateJob(task)
			if err != nil {
				log.Error("UpdateJob failed:" + err.Error())
			}
		}
	}

	// Render the stored JSON parameters as "label = value; label = value".
	if len(task.Parameters) > 0 {
		var parameters models.Parameters
		err := json.Unmarshal([]byte(task.Parameters), &parameters)
		if err != nil {
			log.Error("Failed to Unmarshal Parameters: %s (%v)", task.Parameters, err)
			ctx.ServerError("system error", err)
			return
		}

		if len(parameters.Parameter) > 0 {
			parts := make([]string, 0, len(parameters.Parameter))
			for _, parameter := range parameters.Parameter {
				parts = append(parts, parameter.Label+" = "+parameter.Value)
			}
			task.Parameters = strings.Join(parts, "; ")
		} else {
			task.Parameters = ""
		}
	}

	taskList := []*models.Cloudbrain{task}
	prepareSpec4Show(ctx, task)

	ctx.Data["version_list_task"] = taskList
	ctx.Data["datasetDownload"] = GetCloudBrainDataSetInfo(task.Uuid, task.DatasetName, false)
	ctx.Data["canDownload"] = cloudbrain.CanModifyJob(ctx, task)
	ctx.Data["displayJobName"] = task.DisplayJobName
	ctx.Data["ai_center"] = cloudbrainService.GetAiCenterShow(task.AiCenter, ctx)

	ctx.HTML(http.StatusOK, tplGrampusTrainJobShow)
}

// GrampusDownloadLog serves the train-job log as an attachment named
// "<JobName>-log.txt". If the log cannot be fetched, an empty file is served.
func GrampusDownloadLog(ctx *context.Context) {
	jobID := ctx.Params(":jobid")
	job, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		log.Error("GetCloudbrainByJobID failed: %v", err, ctx.Data["MsgID"])
		ctx.ServerError(err.Error(), err)
		return
	}

	content, err := grampus.GetTrainJobLog(job.JobID)
	if err != nil {
		log.Error("GetTrainJobLog failed: %v", err, ctx.Data["MsgID"])
		content = ""
	}
	fileName := job.JobName + "-log.txt"
	ctx.Resp.Header().Set("Content-Disposition", "attachment; filename="+fileName)
	ctx.Resp.Header().Set("Content-Type", "application/octet-stream")
	ctx.Resp.Write([]byte(content))
}

// GrampusGetLog returns the train-job log as JSON. For failed jobs the remote
// exit diagnostics are appended. CanLogDownload is true only when the log and
// job info were fetched and the current user has rights on the job.
func GrampusGetLog(ctx *context.Context) {
	jobID := ctx.Params(":jobid")
	job, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		log.Error("GetCloudbrainByJobID failed: %v", err, ctx.Data["MsgID"])
		ctx.ServerError(err.Error(), err)
		return
	}

	content, err := grampus.GetTrainJobLog(job.JobID)
	if err != nil {
		log.Error("GetTrainJobLog failed: %v", err, ctx.Data["MsgID"])
		ctx.JSON(http.StatusOK, map[string]interface{}{
			"JobName":        job.JobName,
			"Content":        "",
			"CanLogDownload": false,
		})
		return
	}

	result, err := grampus.GetJob(jobID)
	if err != nil {
		log.Error("GetJob(%s) failed:%v", job.JobName, err)
		ctx.JSON(http.StatusOK, map[string]interface{}{
			"JobName":        job.JobName,
			"Content":        content,
			"CanLogDownload": false,
		})
		return
	}
	if result != nil {
		job.Status = grampus.TransTrainJobStatus(result.JobInfo.Status)
		if job.Status == models.GrampusStatusFailed {
			content = content + "\n" + result.ExitDiagnostics
		}
	}

	// Every error path above has already returned, so only the permission
	// check decides.
	canLogDownload := job.IsUserHasRight(ctx.User)
	ctx.JSON(http.StatusOK, map[string]interface{}{
		"JobName":        job.JobName,
		"Content":        content,
		"CanLogDownload": canLogDownload,
	})
}

// GrampusMetrics returns the Grampus metrics (interval and metrics info) of
// the job identified by :jobid as JSON.
func GrampusMetrics(ctx *context.Context) {
	jobID := ctx.Params(":jobid")
	job, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		log.Error("GetCloudbrainByJobID failed: %v", err, ctx.Data["MsgID"])
		ctx.ServerError(err.Error(), err)
		return
	}

	result, err := grampus.GetGrampusMetrics(job.JobID)
	if err != nil {
		log.Error("GetGrampusMetrics failed: %v", err, ctx.Data["MsgID"])
	}

	// NOTE(review): on error the returned result is still reported. That is
	// harmless if GetGrampusMetrics returns a value type, but would panic if
	// it returns a nil pointer — confirm.
	ctx.JSON(http.StatusOK, map[string]interface{}{
		"JobID":       jobID,
		"Interval":    result.Interval,
		"MetricsInfo": result.MetricsInfo,
	})
}

// generateCommand builds the shell command run inside a Grampus train-job
// container. The steps are: prepare the work dir, download and unzip code and
// dataset (GPU only), export env vars, run the boot file with the user
// parameters, upload outputs (GPU only), and finally exit with the training
// process's status.
func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bootFile, paramSrc, outputRemotePath, datasetName, pretrainModelPath, pretrainModelFileName, modelRemoteObsUrl string) (string, error) {
	var command string

	//prepare
	workDir := grampus.NpuWorkDir
	if processorType == grampus.ProcessorTypeNPU {
		command += "pwd;cd " + workDir + grampus.CommandPrepareScriptNpu
	} else if processorType == grampus.ProcessorTypeGPU {
		workDir = grampus.GpuWorkDir
		command += "pwd;cd " + workDir + fmt.Sprintf(grampus.CommandPrepareScriptGpu, setting.Grampus.SyncScriptProject, setting.Grampus.SyncScriptProject)
	}

	//download code & dataset
	if processorType == grampus.ProcessorTypeNPU {
		//no need to download code & dataset by internet
	} else if processorType == grampus.ProcessorTypeGPU {
		commandDownload := "./downloader_for_minio " + setting.Grampus.Env + " " + codeRemotePath + " " + grampus.CodeArchiveName + " '" + dataRemotePath + "' '" + datasetName + "'"
		commandDownload = processPretrainModelParameter(pretrainModelPath, pretrainModelFileName, commandDownload)
		command += commandDownload
	}

	//unzip code & dataset
	if processorType == grampus.ProcessorTypeNPU {
		//no need to process
	} else if processorType == grampus.ProcessorTypeGPU {
		unZipDatasetCommand := cloudbrainTask.GenerateDatasetUnzipCommand(datasetName)
		commandUnzip := "cd " + workDir + "code;unzip -q master.zip;rm -f master.zip;echo \"start to unzip dataset\";cd " + workDir + "dataset;" + unZipDatasetCommand
		command += commandUnzip
	}

	command += "echo \"unzip finished;start to exec code;\";"

	// set export
	var commandExport string
	if processorType == grampus.ProcessorTypeNPU {
		commandExport = "export bucket=" + setting.Bucket + " && export remote_path=" + outputRemotePath + ";"
	} else if processorType == grampus.ProcessorTypeGPU {
		commandExport = "export env=" + setting.Grampus.Env + " && export remote_path=" + outputRemotePath + ";"
	}

	command += commandExport

	//exec code
	var parameters models.Parameters
	var paramCode string

	if len(paramSrc) != 0 {
		err := json.Unmarshal([]byte(paramSrc), &parameters)
		if err != nil {
			log.Error("Failed to Unmarshal params: %s (%v)", paramSrc, err)
			return command, err
		}

		// SECURITY(review): Label/Value come from user-submitted parameters
		// and are spliced unquoted into a shell command line. Shell
		// metacharacters in them would be interpreted by the container shell;
		// consider validating or quoting them.
		for _, parameter := range parameters.Parameter {
			paramCode += " --" + parameter.Label + "=" + parameter.Value
		}
	}

	var commandCode string
	if processorType == grampus.ProcessorTypeNPU {
		paramCode += " --model_url=" + modelRemoteObsUrl
		commandCode = "/bin/bash /home/work/run_train_for_openi.sh /home/work/openi.py " + grampus.NpuLocalLogUrl + paramCode + ";"
	} else if processorType == grampus.ProcessorTypeGPU {
		if pretrainModelFileName != "" {
			paramCode += " --ckpt_url" + "=" + workDir + "pretrainmodel/" + pretrainModelFileName
		}
		commandCode = "cd " + workDir + "code/" + strings.ToLower(repoName) + ";python " + bootFile + paramCode + ";"
	}

	command += commandCode

	//get exec result
	commandGetRes := "result=$?;"
	command += commandGetRes

	//upload models
	if processorType == grampus.ProcessorTypeNPU {
		// no need to upload
	} else if processorType == grampus.ProcessorTypeGPU {
		commandUpload := "cd " + workDir + setting.Grampus.SyncScriptProject + "/;./uploader_for_gpu " + setting.Grampus.Env + " " + outputRemotePath + " " + workDir + "output/;"
		command += commandUpload
	}

	//check exec result
	commandCheckRes := "bash -c \"[[ $result -eq 0 ]] && exit 0 || exit -1\""
	command += commandCheckRes

	return command, nil
}

// processPretrainModelParameter appends the quoted pre-trained model path and
// file name to the downloader command when a model path is set, and always
// terminates the command with ";".
func processPretrainModelParameter(pretrainModelPath string, pretrainModelFileName string, commandDownload string) string {
	commandDownloadTemp := commandDownload
	if pretrainModelPath != "" {
		commandDownloadTemp += " '" + pretrainModelPath + "' '" + pretrainModelFileName + "'"
	}
	commandDownloadTemp += ";"
	return commandDownloadTemp
}

// downloadZipCode writes a zip archive of branchName into codePath (file name
// grampus.CodeArchiveName), creating the directory if needed. An existing
// archive at that path is kept as is.
func downloadZipCode(ctx *context.Context, codePath, branchName string) error {
	archiveType := git.ZIP
	archivePath := codePath

	if !com.IsDir(archivePath) {
		if err := os.MkdirAll(archivePath, os.ModePerm); err != nil {
			log.Error("MkdirAll failed:" + err.Error())
			return err
		}
	}

	// Get corresponding commit.
	gitRepo := ctx.Repo.GitRepo
	if !gitRepo.IsBranchExist(branchName) {
		log.Error("the branch is not exist: " + branchName)
		return errors.New("The branch does not exist.")
	}
	commit, err := gitRepo.GetBranchCommit(branchName)
	if err != nil {
		log.Error("GetBranchCommit failed:" + err.Error())
		return err
	}

	archivePath = path.Join(archivePath, grampus.CodeArchiveName)
	if !com.IsFile(archivePath) {
		if err := commit.CreateArchive(archivePath, git.CreateArchiveOpts{
			Format: archiveType,
			Prefix: setting.Repository.PrefixArchiveFiles,
		}); err != nil {
			log.Error("CreateArchive failed:" + err.Error())
			return err
		}
	}

	return nil
}

// HandleTaskWithAiCenter repairs C2Net jobs whose AI center is wrong by
// re-reading it from the Grampus API. It responds with the number of
// candidate jobs and the number successfully updated.
func HandleTaskWithAiCenter(ctx *context.Context) {
	log.Info("HandleTaskWithAiCenter start")
	updateCounts := 0
	cloudBrains, err := models.GetC2NetWithAiCenterWrongJob()
	if err != nil {
		log.Error("GetC2NetWithAiCenterWrongJob failed:" + err.Error())
		return
	}
	if len(cloudBrains) == 0 {
		log.Info("HandleC2NetWithAiCenterWrongJob:no task need handle")
		return
	}
	cloudBrainCounts := len(cloudBrains)
	for _, task := range cloudBrains {
		result, err := grampus.GetJob(task.JobID)
		if err != nil {
			log.Error("GetJob failed:" + err.Error())
			continue
		}
		if len(result.JobInfo.Tasks) == 0 {
			continue
		}
		if len(result.JobInfo.Tasks[0].CenterID) == 1 && len(result.JobInfo.Tasks[0].CenterName) == 1 {
			task.AiCenter = result.JobInfo.Tasks[0].CenterID[0] + "+" + result.JobInfo.Tasks[0].CenterName[0]
		}
		if err := models.UpdateJob(task); err != nil {
			log.Error("UpdateJob failed:" + err.Error())
			// Only count records that were actually persisted.
			continue
		}
		updateCounts++
	}
	r := map[string]interface{}{
		"cloudBrainCounts": cloudBrainCounts,
		"updateCounts":     updateCounts,
	}
	ctx.JSON(http.StatusOK, response.SuccessWithData(r))
}

// GrampusNotebookDebug redirects the user to the running notebook's URL
// (with its access token), or responds 404 if the job has no task.
func GrampusNotebookDebug(ctx *context.Context) {
	result, err := grampus.GetNotebookJob(ctx.Cloudbrain.JobID)
	if err != nil {
		ctx.RenderWithErr(err.Error(), tplDebugJobIndex, nil)
		return
	}
	if len(result.JobInfo.Tasks) > 0 {
		ctx.Redirect(result.JobInfo.Tasks[0].Url + "?token=" + result.JobInfo.Tasks[0].Token)
		return
	}
	ctx.NotFound("Can not find the job.", nil)
}

// GrampusNotebookRestart restarts a finished Grampus notebook as a new
// Cloudbrain record. Before restarting it checks the user's concurrent task
// limit, that the spec is still available, the point balance, that the model
// and datasets still exist, and (GPU only) the job's storage path. result_code
// is "0" on success, "2" when the user already has an unfinished task, and
// "-1" otherwise.
func GrampusNotebookRestart(ctx *context.Context) {
	var id = ctx.Params(":id")
	var resultCode = "-1"
	var errorMsg = ""
	var status = ""
	var spec *models.Specification

	task := ctx.Cloudbrain
	if ctx.Written() {
		return
	}

	// Single-pass loop: each failed check sets errorMsg (and maybe resultCode)
	// and breaks.
	for {
		if task.Status != models.GrampusStatusStopped && task.Status != models.GrampusStatusSucceeded && task.Status != models.GrampusStatusFailed {
			log.Error("the job(%s) is not stopped", task.JobName, ctx.Data["MsgID"])
			errorMsg = "the job is not stopped"
			break
		}

		count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeC2Net, string(models.JobTypeDebug), task.ComputeResource)
		if err != nil {
			log.Error("GetCloudbrainNotebookCountByUserID failed:%v", err, ctx.Data["MsgID"])
			errorMsg = "system error"
			break
		}
		if count >= 1 {
			log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
			resultCode = "2"
			errorMsg = ctx.Tr("repo.cloudbrain.morethanonejob")
			break
		}

		oldSpec, err := resource.GetCloudbrainSpec(task.ID)
		if err != nil || oldSpec == nil {
			log.Error("NotebookManage GetCloudbrainSpec error.%v", err)
			errorMsg = "Resource specification not available"
			break
		}

		computeSourceSimple := models.GPU
		action := models.ActionCreateGrampusGPUDebugTask
		if task.ComputeResource == models.NPUResource {
			computeSourceSimple = models.NPU
			action = models.ActionCreateGrampusNPUDebugTask
		}
		spec, err = resource.GetAndCheckSpec(ctx.User.ID, oldSpec.ID, models.FindSpecsOptions{
			JobType:         models.JobType(task.JobType),
			ComputeResource: computeSourceSimple,
			Cluster:         models.C2NetCluster,
		})
		if err != nil || spec == nil {
			log.Error("NotebookManage GetAndCheckSpec error.task.id = %d", task.ID)
			errorMsg = "Resource specification not support any more"
			break
		}
		if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) {
			log.Error("point balance is not enough,userId=%d specId=%d", ctx.User.ID, spec.ID)
			errorMsg = ctx.Tr("points.insufficient_points_balance")
			break
		}
		if task.IsGPUTask() {
			if _, err := os.Stat(getOldJobPath(task)); err != nil {
				log.Error("Can not find job minio path: %v", err)
				resultCode = "-1"
				errorMsg = ctx.Tr("cloudbrain.result_cleared")
				break
			}
		}

		// The model the task was created with must still be available.
		if !HasModelFile(task) {
			errorMsg = ctx.Tr("repo.debug.manage.model_not_exist")
			break
		}

		if hasDatasetDeleted(task) {
			errorMsg = ctx.Tr("repo.debug.manage.dataset_not_exist")
			break
		}

		createTime := timeutil.TimeStampNow()

		res, err := grampus.RestartNotebookJob(task.JobID)
		if err != nil {
			log.Error("ManageNotebook2(%s) failed:%v", task.DisplayJobName, err.Error(), ctx.Data["MsgID"])
			errorMsg = ctx.Tr("repo.debug_again_fail")
			break
		}

		if res.GrampusResult.ErrorCode != 0 || res.NewId == "" {
			log.Error("ManageNotebook2 failed:" + res.GrampusResult.ErrorMsg)
			errorMsg = ctx.Tr("repo.debug_again_fail")
			if res.GrampusResult.ErrorCode == 5005 {
				errorMsg = ctx.Tr("repo.debug_again_fail_forever")
			}
			break
		}

		newTask := &models.Cloudbrain{
			Status:           res.Status,
			UserID:           task.UserID,
			RepoID:           task.RepoID,
			JobID:            res.NewId,
			JobName:          task.JobName,
			DisplayJobName:   task.DisplayJobName,
			JobType:          task.JobType,
			Type:             task.Type,
			Uuid:             task.Uuid,
			Image:            task.Image,
			ImageID:          task.ImageID,
			EngineID:         task.EngineID,
			CommitID:         task.CommitID,
			EngineName:       task.EngineName,
			IsLatestVersion:  "1",
			BranchName:       task.BranchName,
			DatasetName:      task.DatasetName,
			ComputeResource:  task.ComputeResource,
			Description:      task.Description,
			CreatedUnix:      createTime,
			UpdatedUnix:      createTime,
			Spec:             spec,
			ModelName:        task.ModelName,
			ModelVersion:     task.ModelVersion,
			LabelName:        task.LabelName,
			PreTrainModelUrl: task.PreTrainModelUrl,
			CkptName:         task.CkptName,
			WorkServerNumber: 1,
		}

		err = models.RestartCloudbrain(task, newTask)
		if err != nil {
			log.Error("RestartCloudbrain(%s) failed:%v", task.JobName, err.Error(), ctx.Data["MsgID"])
			errorMsg = "system error"
			break
		}

		id = strconv.FormatInt(newTask.ID, 10)

		status = res.Status
		resultCode = "0"

		notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, id, newTask.DisplayJobName, action)

		break
	}

	ctx.JSON(200, map[string]string{
		"result_code": resultCode,
		"error_msg":   errorMsg,
		"status":      status,
		"id":          id,
	})
}