package repo

import (
	"bufio"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"time"

	"code.gitea.io/gitea/models"
	"code.gitea.io/gitea/modules/auth"
	"code.gitea.io/gitea/modules/base"
	"code.gitea.io/gitea/modules/cloudbrain"
	"code.gitea.io/gitea/modules/context"
	"code.gitea.io/gitea/modules/git"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/modelarts"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/storage"
	"code.gitea.io/gitea/modules/util"
)

const (
	tplCloudBrainNew            base.TplName = "repo/cloudbrain/new"
	tplCloudBrainShow           base.TplName = "repo/cloudbrain/show"
	tplCloudBrainShowModels     base.TplName = "repo/cloudbrain/models/index"
	tplCloudBrainBenchmarkIndex base.TplName = "repo/cloudbrain/benchmark/index"
	tplCloudBrainBenchmarkNew   base.TplName = "repo/cloudbrain/benchmark/new"
	tplCloudBrainBenchmarkShow  base.TplName = "repo/cloudbrain/benchmark/show"
)

// Lazily decoded copies of the JSON configuration found in the setting
// package. Each one is filled on first use by the function that needs it.
var (
	gpuInfos               *models.GpuInfos
	categories             *models.Categories
	benchmarkTypes         *models.BenchmarkTypes
	benchmarkGpuInfos      *models.GpuInfos
	benchmarkResourceSpecs *models.ResourceSpecs
)

// jobNamePattern is the pattern a user supplied job name has to match.
var jobNamePattern = regexp.MustCompile(`^[a-z0-9][a-z0-9-_]{1,34}[a-z0-9-]$`)

// Patterns used by jobNamePrefixValid, compiled once instead of per call.
var (
	// cloudBrainJobNameInvalidChars matches every run of characters that is
	// not a lower-case letter, a digit, '_' or '-'.
	cloudBrainJobNameInvalidChars = regexp.MustCompile(`[^a-z0-9_-]+`)
	// cloudBrainJobNameLeadingSeps matches leading '_' and '-' characters.
	cloudBrainJobNameLeadingSeps = regexp.MustCompile(`^[_-]+`)
)

// MustEnableCloudbrain responds with "not found" when the current user is
// not allowed to read the cloudbrain unit of the repository.
func MustEnableCloudbrain(ctx *context.Context) {
	if !ctx.Repo.CanRead(models.UnitTypeCloudBrain) {
		ctx.NotFound("MustEnableCloudbrain", nil)
		return
	}
}

// cutString returns str unchanged when it is shorter than lens bytes and its
// first lens bytes otherwise. The cut is byte based, not rune based.
func cutString(str string, lens int) string {
	if len(str) < lens {
		return str
	}
	return str[:lens]
}

// jobNamePrefixValid lower-cases s, drops every character other than
// [a-z0-9_-] and then strips leading '_' and '-' characters.
//
// The previous patterns were written as `\\-` inside a raw string, which
// made a literal backslash part of the allowed set.
func jobNamePrefixValid(s string) string {
	cleaned := cloudBrainJobNameInvalidChars.ReplaceAllString(strings.ToLower(s), "")
	return cloudBrainJobNameLeadingSeps.ReplaceAllString(cleaned, "")
}

// cloudBrainImagePlaceView returns the text shown for an image location.
// Locations starting with "192.168" are shown from their first "/" on; all
// other locations, and such locations without any "/", are shown unchanged.
func cloudBrainImagePlaceView(place string) string {
	if !strings.HasPrefix(place, "192.168") {
		return place
	}
	if idx := strings.Index(place, "/"); idx >= 0 {
		return place[idx:]
	}
	return place
}

// cloudBrainGpuTypeByQueue returns the Value of the entry in gpuInfos whose
// Queue equals queue. When several entries match the last one wins; when
// gpuInfos has not been loaded or nothing matches, "" is returned.
func cloudBrainGpuTypeByQueue(queue string) string {
	var gpuType string
	if gpuInfos == nil {
		return gpuType
	}
	for _, gpuInfo := range gpuInfos.GpuInfo {
		if gpuInfo.Queue == queue {
			gpuType = gpuInfo.Value
		}
	}
	return gpuType
}

// cloudBrainFormatSeconds renders a duration given in seconds as
// hours:minutes:seconds, each part formatted by util.AddZero.
func cloudBrainFormatSeconds(seconds int64) string {
	return util.AddZero(seconds/3600) + ":" + util.AddZero(seconds%3600/60) + ":" + util.AddZero(seconds%60)
}

// cloudBrainNewDataPrepare fills ctx.Data with everything the "new job"
// templates read: a suggested job name, the private and public images, the
// user's attachments, the mount paths and the configured GPU, benchmark and
// resource options.
//
// Only a failure to list the attachments is returned as an error. Image
// lookup failures are stored in ctx.Data["error"], and configuration that
// cannot be decoded is logged and its ctx.Data entry is left unset.
func cloudBrainNewDataPrepare(ctx *context.Context) error {
	ctx.Data["PageIsCloudBrain"] = true

	t := time.Now()
	var jobName = jobNamePrefixValid(cutString(ctx.User.Name, 5)) + t.Format("2006010215") + strconv.Itoa(int(t.Unix()))[5:]
	ctx.Data["job_name"] = jobName

	result, err := cloudbrain.GetImages()
	if err != nil {
		ctx.Data["error"] = err.Error()
		log.Error("cloudbrain.GetImages failed:%v", err, ctx.Data["MsgID"])
	}
	for i := range result.Payload.ImageInfo {
		result.Payload.ImageInfo[i].PlaceView = cloudBrainImagePlaceView(result.Payload.ImageInfo[i].Place)
	}
	ctx.Data["images"] = result.Payload.ImageInfo

	resultPublic, err := cloudbrain.GetPublicImages()
	if err != nil {
		ctx.Data["error"] = err.Error()
		log.Error("cloudbrain.GetPublicImages failed:%v", err, ctx.Data["MsgID"])
	}
	for i := range resultPublic.Payload.ImageInfo {
		resultPublic.Payload.ImageInfo[i].PlaceView = cloudBrainImagePlaceView(resultPublic.Payload.ImageInfo[i].Place)
	}
	ctx.Data["public_images"] = resultPublic.Payload.ImageInfo

	attachs, err := models.GetAllUserAttachments(ctx.User.ID)
	if err != nil {
		log.Error("GetAllUserAttachments failed: %v", err, ctx.Data["MsgID"])
		return err
	}
	ctx.Data["attachments"] = attachs

	ctx.Data["command"] = cloudbrain.Command
	ctx.Data["code_path"] = cloudbrain.CodeMountPath
	ctx.Data["dataset_path"] = cloudbrain.DataSetMountPath
	ctx.Data["model_path"] = cloudbrain.ModelMountPath
	ctx.Data["benchmark_path"] = cloudbrain.BenchMarkMountPath
	ctx.Data["is_benchmark_enabled"] = setting.IsBenchmarkEnabled

	// Every option set below is decoded on first use. A decode failure used
	// to be ignored and was followed by a nil pointer dereference; now it is
	// logged and the corresponding template value simply stays unset.
	if categories == nil {
		if err := json.Unmarshal([]byte(setting.BenchmarkCategory), &categories); err != nil {
			log.Error("json.Unmarshal BenchmarkCategory(%s) failed:%v", setting.BenchmarkCategory, err, ctx.Data["MsgID"])
		}
	}
	if categories != nil {
		ctx.Data["benchmark_categories"] = categories.Category
	}

	if benchmarkTypes == nil {
		if err := json.Unmarshal([]byte(setting.BenchmarkTypes), &benchmarkTypes); err != nil {
			log.Error("json.Unmarshal BenchmarkTypes(%s) failed:%v", setting.BenchmarkTypes, err, ctx.Data["MsgID"])
		}
	}
	if benchmarkTypes != nil {
		ctx.Data["benchmark_types"] = benchmarkTypes.BenchmarkType
	}

	if gpuInfos == nil {
		if err := json.Unmarshal([]byte(setting.GpuTypes), &gpuInfos); err != nil {
			log.Error("json.Unmarshal GpuTypes(%s) failed:%v", setting.GpuTypes, err, ctx.Data["MsgID"])
		}
	}
	if gpuInfos != nil {
		ctx.Data["gpu_types"] = gpuInfos.GpuInfo
	}

	if benchmarkGpuInfos == nil {
		if err := json.Unmarshal([]byte(setting.BenchmarkGpuTypes), &benchmarkGpuInfos); err != nil {
			log.Error("json.Unmarshal BenchmarkGpuTypes(%s) failed:%v", setting.BenchmarkGpuTypes, err, ctx.Data["MsgID"])
		}
	}
	if benchmarkGpuInfos != nil {
		ctx.Data["benchmark_gpu_types"] = benchmarkGpuInfos.GpuInfo
	}

	if benchmarkResourceSpecs == nil {
		if err := json.Unmarshal([]byte(setting.BenchmarkResourceSpecs), &benchmarkResourceSpecs); err != nil {
			log.Error("json.Unmarshal BenchmarkResourceSpecs(%s) failed:%v", setting.BenchmarkResourceSpecs, err, ctx.Data["MsgID"])
		}
	}
	if benchmarkResourceSpecs != nil {
		ctx.Data["benchmark_resource_specs"] = benchmarkResourceSpecs.ResourceSpec
	}

	if cloudbrain.ResourceSpecs == nil {
		if err := json.Unmarshal([]byte(setting.ResourceSpecs), &cloudbrain.ResourceSpecs); err != nil {
			log.Error("json.Unmarshal ResourceSpecs(%s) failed:%v", setting.ResourceSpecs, err, ctx.Data["MsgID"])
		}
	}
	if cloudbrain.ResourceSpecs != nil {
		ctx.Data["resource_specs"] = cloudbrain.ResourceSpecs.ResourceSpec
	}

	ctx.Data["snn4imagenet_path"] = cloudbrain.Snn4imagenetMountPath
	ctx.Data["is_snn4imagenet_enabled"] = setting.IsSnn4imagenetEnabled
	ctx.Data["brainscore_path"] = cloudbrain.BrainScoreMountPath
	ctx.Data["is_brainscore_enabled"] = setting.IsBrainScoreEnabled

	return nil
}

// CloudBrainNew renders the form for creating a new cloudbrain job.
func CloudBrainNew(ctx *context.Context) {
	err := cloudBrainNewDataPrepare(ctx)
	if err != nil {
		ctx.ServerError("get new cloudbrain info failed", err)
		return
	}
	ctx.HTML(200, tplCloudBrainNew)
}

// CloudBrainCreate validates the submitted form, stages the repository code
// and the job type specific code in storage, and asks cloudbrain to generate
// the task. On any validation or generation failure the form is rendered
// again with an error message; on success the user is redirected to the
// debug job list.
func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
	ctx.Data["PageIsCloudBrain"] = true
	jobName := form.JobName
	image := form.Image
	uuid := form.Attachment
	jobType := form.JobType
	command := cloudbrain.Command
	gpuQueue := form.GpuType
	codePath := setting.JobPath + jobName + cloudbrain.CodeMountPath
	resourceSpecId := form.ResourceSpecId

	if !jobNamePattern.MatchString(jobName) {
		// The template needs the prepared data on this path as much as on
		// every other error path below.
		cloudBrainNewDataPrepare(ctx)
		ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tplCloudBrainNew, &form)
		return
	}

	if jobType != string(models.JobTypeBenchmark) && jobType != string(models.JobTypeDebug) && jobType != string(models.JobTypeSnn4imagenet) && jobType != string(models.JobTypeBrainScore) {
		log.Error("jobtype error:%s", jobType, ctx.Data["MsgID"])
		cloudBrainNewDataPrepare(ctx)
		ctx.RenderWithErr("jobtype error", tplCloudBrainNew, &form)
		return
	}

	count, err := models.GetCloudbrainCountByUserID(ctx.User.ID, jobType)
	if err != nil {
		log.Error("GetCloudbrainCountByUserID failed:%v", err, ctx.Data["MsgID"])
		cloudBrainNewDataPrepare(ctx)
		ctx.RenderWithErr("system error", tplCloudBrainNew, &form)
		return
	}
	if count >= 1 {
		log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
		cloudBrainNewDataPrepare(ctx)
		ctx.RenderWithErr("you have already a running or waiting task, can not create more", tplCloudBrainNew, &form)
		return
	}

	_, err = models.GetCloudbrainByName(jobName)
	if err == nil {
		log.Error("the job name did already exist", ctx.Data["MsgID"])
		cloudBrainNewDataPrepare(ctx)
		ctx.RenderWithErr("the job name did already exist", tplCloudBrainNew, &form)
		return
	}
	if !models.IsErrJobNotExist(err) {
		log.Error("system error, %v", err, ctx.Data["MsgID"])
		cloudBrainNewDataPrepare(ctx)
		ctx.RenderWithErr("system error", tplCloudBrainNew, &form)
		return
	}

	// NOTE(review): the errors returned by the staging helpers below are
	// not checked here (the helpers log them themselves), so the task is
	// generated even when staging failed. CloudBrainBenchmarkCreate aborts
	// on the equivalent downloadCode/uploadCodeToMinio failures - confirm
	// which behavior is wanted.
	repo := ctx.Repo.Repository
	downloadCode(repo, codePath)
	uploadCodeToMinio(codePath+"/", jobName, cloudbrain.CodeMountPath+"/")

	modelPath := setting.JobPath + jobName + cloudbrain.ModelMountPath + "/"
	mkModelPath(modelPath)
	uploadCodeToMinio(modelPath, jobName, cloudbrain.ModelMountPath+"/")

	benchmarkPath := setting.JobPath + jobName + cloudbrain.BenchMarkMountPath
	if setting.IsBenchmarkEnabled && jobType == string(models.JobTypeBenchmark) {
		// gpuInfos is only loaded by cloudBrainNewDataPrepare, so it may
		// still be nil here; the helper tolerates that.
		gpuType := cloudBrainGpuTypeByQueue(gpuQueue)
		downloadRateCode(repo, jobName, setting.BenchmarkOwner, setting.BenchmarkName, benchmarkPath, form.BenchmarkCategory, gpuType)
		uploadCodeToMinio(benchmarkPath+"/", jobName, cloudbrain.BenchMarkMountPath+"/")
	}

	snn4imagenetPath := setting.JobPath + jobName + cloudbrain.Snn4imagenetMountPath
	if setting.IsSnn4imagenetEnabled && jobType == string(models.JobTypeSnn4imagenet) {
		downloadRateCode(repo, jobName, setting.Snn4imagenetOwner, setting.Snn4imagenetName, snn4imagenetPath, "", "")
		uploadCodeToMinio(snn4imagenetPath+"/", jobName, cloudbrain.Snn4imagenetMountPath+"/")
	}

	brainScorePath := setting.JobPath + jobName + cloudbrain.BrainScoreMountPath
	if setting.IsBrainScoreEnabled && jobType == string(models.JobTypeBrainScore) {
		downloadRateCode(repo, jobName, setting.BrainScoreOwner, setting.BrainScoreName, brainScorePath, "", "")
		uploadCodeToMinio(brainScorePath+"/", jobName, cloudbrain.BrainScoreMountPath+"/")
	}

	err = cloudbrain.GenerateTask(ctx, jobName, image, command, uuid,
		storage.GetMinioPath(jobName, cloudbrain.CodeMountPath+"/"),
		storage.GetMinioPath(jobName, cloudbrain.ModelMountPath+"/"),
		storage.GetMinioPath(jobName, cloudbrain.BenchMarkMountPath+"/"),
		storage.GetMinioPath(jobName, cloudbrain.Snn4imagenetMountPath+"/"),
		storage.GetMinioPath(jobName, cloudbrain.BrainScoreMountPath+"/"),
		jobType, gpuQueue, form.Description, 0, 0, resourceSpecId)
	if err != nil {
		cloudBrainNewDataPrepare(ctx)
		ctx.RenderWithErr(err.Error(), tplCloudBrainNew, &form)
		return
	}
	ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/debugjob?debugListType=all")
}

// CloudBrainRestart restarts the stopped, succeeded or failed job held in
// ctx.Cloudbrain and answers with a JSON object carrying result_code,
// error_msg, status and job_id.
func CloudBrainRestart(ctx *context.Context) {
	var jobID = ctx.Params(":jobid")
	var resultCode = "0"
	var errorMsg = ""
	var status = string(models.JobWaiting)

	task := ctx.Cloudbrain
	// Single pass loop: "break" is used to jump to the JSON response.
	for {
		if task.Status != string(models.JobStopped) && task.Status != string(models.JobSucceeded) && task.Status != string(models.JobFailed) {
			log.Error("the job(%s) is not stopped", task.JobName, ctx.Data["MsgID"])
			resultCode = "-1"
			errorMsg = "the job is not stopped"
			break
		}

		if task.Image == "" || task.GpuQueue == "" || task.Type != models.TypeCloudBrainOne {
			log.Error("the job(%s) version is too old", task.JobName, ctx.Data["MsgID"])
			resultCode = "-1"
			errorMsg = "the job's version is too old and can not be restarted"
			break
		}

		if !ctx.IsSigned || (ctx.User.ID != task.UserID && !ctx.IsUserSiteAdmin()) {
			log.Error("the user has no right to restart the job(%s)", task.JobName, ctx.Data["MsgID"])
			resultCode = "-1"
			errorMsg = "you have no right to restart the job"
			break
		}

		count, err := models.GetCloudbrainCountByUserID(ctx.User.ID, string(models.JobTypeDebug))
		if err != nil {
			log.Error("GetCloudbrainCountByUserID failed:%v", err, ctx.Data["MsgID"])
			resultCode = "-1"
			errorMsg = "system error"
			break
		}
		if count >= 1 {
			log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
			resultCode = "-1"
			errorMsg = "you have already a running or waiting task, can not create more"
			break
		}

		err = cloudbrain.RestartTask(ctx, task, &jobID)
		if err != nil {
			log.Error("RestartTask failed:%v", err.Error(), ctx.Data["MsgID"])
			resultCode = "-1"
			errorMsg = "system error"
			break
		}

		break
	}

	ctx.JSON(200, map[string]string{
		"result_code": resultCode,
		"error_msg":   errorMsg,
		"status":      status,
		"job_id":      jobID,
	})
}

// CloudBrainBenchMarkShow renders the benchmark job detail page. It makes
// sure the benchmark types are loaded so the type names can be displayed.
func CloudBrainBenchMarkShow(ctx *context.Context) {
	if benchmarkTypes == nil {
		if err := json.Unmarshal([]byte(setting.BenchmarkTypes), &benchmarkTypes); err != nil {
			log.Error("json.Unmarshal BenchmarkTypes(%s) failed:%v", setting.BenchmarkTypes, err, ctx.Data["MsgID"])
		}
	}
	cloudBrainShow(ctx, tplCloudBrainBenchmarkShow)
}

// CloudBrainShow renders the job detail page.
func CloudBrainShow(ctx *context.Context) {
	cloudBrainShow(ctx, tplCloudBrainShow)
}

// cloudBrainShow loads the job named by the ":jobid" parameter, refreshes
// its status from cloudbrain when the remote job can be queried, and renders
// tpName with the job details.
func cloudBrainShow(ctx *context.Context, tpName base.TplName) {
	ctx.Data["PageIsCloudBrain"] = true

	var jobID = ctx.Params(":jobid")
	task, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		// Everything below dereferences task, so stop here instead of
		// recording the error and carrying on.
		log.Error("GetCloudbrainByJobID(%s) failed:%v", jobID, err, ctx.Data["MsgID"])
		ctx.ServerError("GetCloudbrainByJobID failed", err)
		return
	}

	result, jobErr := cloudbrain.GetJob(jobID)
	if jobErr != nil {
		ctx.Data["error"] = jobErr.Error()
	}

	if result != nil {
		jobRes, convErr := models.ConvertToJobResultPayload(result.Payload)
		if convErr != nil {
			log.Error("ConvertToJobResultPayload(%s) failed:%v", task.JobName, convErr, ctx.Data["MsgID"])
		}
		jobRes.Resource.Memory = strings.ReplaceAll(jobRes.Resource.Memory, "Mi", "MB")
		spec := "GPU数:" + strconv.Itoa(jobRes.Resource.NvidiaComGpu) + ",CPU数:" + strconv.Itoa(jobRes.Resource.CPU) + ",内存(MB):" + jobRes.Resource.Memory
		ctx.Data["resource_spec"] = spec

		taskRoles := jobRes.TaskRoles
		if jobRes.JobStatus.State != string(models.JobFailed) {
			// The sub task entry can be missing or of another shape (for
			// example when the payload is empty); check instead of panicking.
			subTask, ok := taskRoles[cloudbrain.SubTaskName].(map[string]interface{})
			if !ok {
				log.Error("the job(%s) has no sub task info", task.JobName, ctx.Data["MsgID"])
				ctx.Data["error"] = "sub task info not found"
			} else {
				taskRes, podErr := models.ConvertToTaskPod(subTask)
				if podErr != nil {
					log.Error("ConvertToTaskPod(%s) failed:%v", task.JobName, podErr, ctx.Data["MsgID"])
				}
				ctx.Data["taskRes"] = taskRes
				if len(taskRes.TaskStatuses) > 0 {
					task.Status = taskRes.TaskStatuses[0].State
					task.ContainerID = taskRes.TaskStatuses[0].ContainerID
					task.ContainerIp = taskRes.TaskStatuses[0].ContainerIP
					err = models.UpdateJob(task)
					if err != nil {
						ctx.Data["error"] = err.Error()
					}
				}
			}
		} else {
			task.Status = jobRes.JobStatus.State
			taskRes := models.TaskPod{TaskStatuses: []models.TaskStatuses{
				{
					State: jobRes.JobStatus.State,
				},
			}}
			ctx.Data["taskRes"] = taskRes
			jobRes.JobStatus.StartTime = time.Unix(int64(task.CreatedUnix), 0).Format("2006-01-02 15:04:05")
			jobRes.JobStatus.EndTime = time.Unix(int64(task.UpdatedUnix), 0).Format("2006-01-02 15:04:05")
		}

		ctx.Data["result"] = jobRes
	} else if jobErr != nil {
		log.Info("error:" + jobErr.Error())
	}

	user, err := models.GetUserByID(task.UserID)
	if err == nil {
		task.User = user
	}

	// Both operands are Unix timestamps, so duration is in seconds.
	var duration int64
	if task.Status == string(models.JobRunning) {
		duration = time.Now().Unix() - int64(task.CreatedUnix)
	} else {
		duration = int64(task.UpdatedUnix) - int64(task.CreatedUnix)
	}

	if benchmarkTypes != nil {
		for _, benchmarkType := range benchmarkTypes.BenchmarkType {
			if task.BenchmarkTypeID == benchmarkType.Id {
				ctx.Data["BenchmarkTypeName"] = benchmarkType.First
				for _, benchmarkChildType := range benchmarkType.Second {
					if task.BenchmarkChildTypeID == benchmarkChildType.Id {
						ctx.Data["BenchmarkChildTypeName"] = benchmarkChildType.Value
						break
					}
				}
				break
			}
		}
	}

	// The value used to be split with millisecond divisors although it is
	// measured in seconds.
	ctx.Data["duration"] = cloudBrainFormatSeconds(duration)
	ctx.Data["task"] = task
	ctx.Data["jobID"] = jobID
	ctx.Data["jobName"] = task.JobName

	versionListTask := []*models.Cloudbrain{task}
	ctx.Data["version_list_task"] = versionListTask
	ctx.HTML(200, tpName)
}

// CloudBrainDebug redirects to the debug URL built from the configured debug
// server host, the job ID and the sub task name of ctx.Cloudbrain.
func CloudBrainDebug(ctx *context.Context) {
	debugUrl := setting.DebugServerHost + "jpylab_" + ctx.Cloudbrain.JobID + "_" + ctx.Cloudbrain.SubTaskName
	ctx.Redirect(debugUrl)
}

// CloudBrainCommitImage commits the container of ctx.Cloudbrain as an image
// using the description and tag from the form, and answers with a JSON
// object carrying result_code and error_msg.
func CloudBrainCommitImage(ctx *context.Context, form auth.CommitImageCloudBrainForm) {
	err := cloudbrain.CommitImage(ctx.Cloudbrain.JobID, models.CommitImageParams{
		Ip:               ctx.Cloudbrain.ContainerIp,
		TaskContainerId:  ctx.Cloudbrain.ContainerID,
		ImageDescription: form.Description,
		ImageTag:         form.Tag,
	})
	if err != nil {
		log.Error("CommitImage(%s) failed:%v", ctx.Cloudbrain.JobName, err.Error(), ctx.Data["msgID"])
		ctx.JSON(200, map[string]string{
			"result_code": "-1",
			"error_msg":   "CommitImage failed",
		})
		return
	}

	ctx.JSON(200, map[string]string{
		"result_code": "0",
		"error_msg":   "",
	})
}

// CloudBrainStop stops the job held in ctx.Cloudbrain, records the stopped
// status and answers with a JSON object carrying result_code, error_msg,
// status and job_id.
func CloudBrainStop(ctx *context.Context) {
	var jobID = ctx.Params(":jobid")
	var resultCode = "0"
	var errorMsg = ""
	var status = ""

	task := ctx.Cloudbrain
	// Single pass loop: "break" is used to jump to the JSON response.
	for {
		if task.Status == string(models.JobStopped) || task.Status == string(models.JobFailed) || task.Status == string(models.JobSucceeded) {
			log.Error("the job(%s) has been stopped", task.JobName, ctx.Data["msgID"])
			resultCode = "-1"
			errorMsg = "system error"
			break
		}

		err := cloudbrain.StopJob(jobID)
		if err != nil {
			log.Error("StopJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"])
			resultCode = "-1"
			errorMsg = "system error"
			break
		}

		task.Status = string(models.JobStopped)
		err = models.UpdateJob(task)
		if err != nil {
			log.Error("UpdateJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"])
			resultCode = "-1"
			errorMsg = "system error"
			break
		}

		status = task.Status
		break
	}

	ctx.JSON(200, map[string]string{
		"result_code": resultCode,
		"error_msg":   errorMsg,
		"status":      status,
		"job_id":      jobID,
	})
}

// StopJobsByUserID stops every job returned by
// models.GetCloudbrainsNeededStopByUserID for userID.
func StopJobsByUserID(userID int64) {
	cloudBrains, err := models.GetCloudbrainsNeededStopByUserID(userID)
	if err != nil {
		log.Warn("Failed to get cloudBrain info: %v", err)
		return
	}
	StopJobs(cloudBrains)
}

// StopJobsByRepoID stops every job returned by
// models.GetCloudbrainsNeededStopByRepoID for repoID.
func StopJobsByRepoID(repoID int64) {
	cloudBrains, err := models.GetCloudbrainsNeededStopByRepoID(repoID)
	if err != nil {
		log.Warn("Failed to get cloudBrain info: %v", err)
		return
	}
	StopJobs(cloudBrains)
}

// StopJobs stops the given jobs one after another. Cloudbrain-one jobs are
// stopped through cloudbrain, all others through modelarts (train jobs via
// StopTrainJob, the rest via a notebook stop action). Every stop is tried up
// to three times, 30 seconds apart, and the stored status is updated only
// when the stop succeeded.
func StopJobs(cloudBrains []*models.Cloudbrain) {
	for _, taskInfo := range cloudBrains {
		// The closures below run synchronously inside retry, so capturing
		// the loop variable is safe.
		if taskInfo.Type == models.TypeCloudBrainOne {
			err := retry(3, time.Second*30, func() error {
				return cloudbrain.StopJob(taskInfo.JobID)
			})
			logErrorAndUpdateJobStatus(err, taskInfo)
		} else {
			if taskInfo.JobType == string(models.JobTypeTrain) {
				err := retry(3, time.Second*30, func() error {
					_, err := modelarts.StopTrainJob(taskInfo.JobID, strconv.FormatInt(taskInfo.VersionID, 10))
					return err
				})
				logErrorAndUpdateJobStatus(err, taskInfo)
			} else {
				param := models.NotebookAction{
					Action: models.ActionStop,
				}
				err := retry(3, time.Second*30, func() error {
					_, err := modelarts.ManageNotebook(taskInfo.JobID, param)
					return err
				})
				logErrorAndUpdateJobStatus(err, taskInfo)
			}
		}
	}
}

// retry calls f until it returns nil, at most attempts times, sleeping for
// sleep before every call but the first. It returns nil on the first success
// and otherwise an error that mentions the number of attempts and the last
// error.
func retry(attempts int, sleep time.Duration, f func() error) (err error) {
	for i := 0; i < attempts; i++ {
		if i > 0 {
			log.Warn("retrying after error: %v", err)
			time.Sleep(sleep)
		}
		err = f()
		if err == nil {
			return nil
		}
	}
	return fmt.Errorf("after %d attempts, last error: %v", attempts, err)
}

// logErrorAndUpdateJobStatus logs err when the stop attempt failed;
// otherwise it marks taskInfo as stopped and stores it.
func logErrorAndUpdateJobStatus(err error, taskInfo *models.Cloudbrain) {
	if err != nil {
		log.Warn("Failed to stop cloudBrain job(%s): %v", taskInfo.JobID, err)
		return
	}
	taskInfo.Status = string(models.JobStopped)
	if err = models.UpdateJob(taskInfo); err != nil {
		log.Warn("UpdateJob failed: %v", err)
	}
}

// CloudBrainDel deletes the job held in ctx.Cloudbrain and redirects to the
// debug job list.
func CloudBrainDel(ctx *context.Context) {
	if err := deleteCloudbrainJob(ctx); err != nil {
		log.Error("deleteCloudbrainJob failed: %v", err, ctx.Data["msgID"])
		ctx.ServerError(err.Error(), err)
		return
	}
	ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/debugjob?debugListType=all")
}

// deleteCloudbrainJob deletes the job held in ctx.Cloudbrain together with
// its storage. It fails when the job is not stopped, failed or succeeded, or
// when models.DeleteJob fails.
func deleteCloudbrainJob(ctx *context.Context) error {
	task := ctx.Cloudbrain

	if task.Status != string(models.JobStopped) && task.Status != string(models.JobFailed) && task.Status != string(models.JobSucceeded) {
		log.Error("the job(%s) has not been stopped", task.JobName, ctx.Data["msgID"])
		return errors.New("the job has not been stopped")
	}

	err := models.DeleteJob(task)
	if err != nil {
		log.Error("DeleteJob failed: %v", err, ctx.Data["msgID"])
		return err
	}

	// deleteJobStorage logs its own failures and always returns nil.
	deleteJobStorage(task.JobName, models.TypeCloudBrainOne)

	return nil
}

// CloudBrainShowModels renders the model directory listing of a job. The
// directory is selected with the "parentDir" query parameter and the entries
// are sorted by modification time, newest first.
func CloudBrainShowModels(ctx *context.Context) {
	ctx.Data["PageIsCloudBrain"] = true

	jobID := ctx.Params(":jobid")
	parentDir := ctx.Query("parentDir")
	dirArray := strings.Split(parentDir, "/")
	task, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		log.Error("no such job!", ctx.Data["msgID"])
		ctx.ServerError("no such job:", err)
		return
	}

	// get dirs
	dirs, err := GetModelDirs(task.JobName, parentDir)
	if err != nil {
		log.Error("GetModelDirs failed:%v", err.Error(), ctx.Data["msgID"])
		ctx.ServerError("GetModelDirs failed:", err)
		return
	}

	var fileInfos []storage.FileInfo
	err = json.Unmarshal([]byte(dirs), &fileInfos)
	if err != nil {
		log.Error("json.Unmarshal failed:%v", err.Error(), ctx.Data["msgID"])
		ctx.ServerError("json.Unmarshal failed:", err)
		return
	}

	for i, fileInfo := range fileInfos {
		// A value that does not parse yields the zero time; the listing is
		// still shown in that case.
		temp, _ := time.Parse("2006-01-02 15:04:05", fileInfo.ModTime)
		fileInfos[i].ModTime = temp.Local().Format("2006-01-02 15:04:05")
	}

	sort.Slice(fileInfos, func(i, j int) bool {
		return fileInfos[i].ModTime > fileInfos[j].ModTime
	})

	ctx.Data["Path"] = dirArray
	ctx.Data["Dirs"] = fileInfos
	ctx.Data["task"] = task
	ctx.Data["JobID"] = jobID
	ctx.HTML(200, tplCloudBrainShowModels)
}

// GetPublicImages answers with one page of public images as JSON.
func GetPublicImages(ctx *context.Context) {
	getImages(ctx, cloudbrain.Public)
}

// GetCustomImages answers with one page of custom images as JSON.
func GetCustomImages(ctx *context.Context) {
	getImages(ctx, cloudbrain.Custom)
}

// getImages answers with the images of imageType selected by the "page",
// "size" and "name" query parameters. When the lookup fails an empty payload
// is sent, still with status 200.
func getImages(ctx *context.Context, imageType string) {
	log.Info("Get images begin")

	page := ctx.QueryInt("page")
	size := ctx.QueryInt("size")
	name := ctx.Query("name")
	getImagesResult, err := cloudbrain.GetImagesPageable(page, size, imageType, name)
	if err != nil {
		log.Error("Can not get images:%v", err)
		ctx.JSON(http.StatusOK, models.GetImagesPayload{
			Count:      0,
			TotalPages: 0,
			ImageInfo:  []*models.ImageInfo{},
		})
	} else {
		ctx.JSON(http.StatusOK, getImagesResult.Payload)
	}
	log.Info("Get images end")
}

// GetModelDirs returns what getDirs yields for the model directory of
// jobName, optionally narrowed to parentDir.
//
// NOTE(review): parentDir is appended to the request string without any
// escaping - confirm that getDirs copes with special characters.
func GetModelDirs(jobName string, parentDir string) (string, error) {
	var req string
	modelActualPath := storage.GetMinioPath(jobName, cloudbrain.ModelMountPath+"/")
	if parentDir == "" {
		req = "baseDir=" + modelActualPath
	} else {
		req = "baseDir=" + modelActualPath + "&parentDir=" + parentDir
	}

	return getDirs(req)
}

// CloudBrainDownloadModel redirects to a presigned download URL for the
// model file selected by the "jobName", "parentDir" and "fileName" query
// parameters.
//
// NOTE(review): the redirect uses 301 (Moved Permanently), which clients may
// cache - confirm this is intended for a presigned URL.
func CloudBrainDownloadModel(ctx *context.Context) {
	parentDir := ctx.Query("parentDir")
	fileName := ctx.Query("fileName")
	jobName := ctx.Query("jobName")
	filePath := "jobs/" + jobName + "/model/" + parentDir
	url, err := storage.Attachments.PresignedGetURL(filePath, fileName)
	if err != nil {
		log.Error("PresignedGetURL failed: %v", err.Error(), ctx.Data["msgID"])
		ctx.ServerError("PresignedGetURL", err)
		return
	}
	http.Redirect(ctx.Resp, ctx.Req.Request, url, http.StatusMovedPermanently)
}

// GetRate redirects to the rating server that belongs to the type of the job
// named by ":jobid". With the query parameter "isObjectDetcionAll" set it
// redirects to the benchmark server as user "admin" instead.
//
// NOTE(review): for any other job type only an error is logged and no
// response is written.
func GetRate(ctx *context.Context) {
	isObjectDetcionAll := ctx.QueryBool("isObjectDetcionAll")
	if isObjectDetcionAll {
		ctx.Redirect(setting.BenchmarkServerHost + "?username=admin")
		return
	}

	var jobID = ctx.Params(":jobid")
	job, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		ctx.ServerError("GetCloudbrainByJobID failed", err)
		return
	}

	if job.JobType == string(models.JobTypeBenchmark) {
		log.Info("url=" + setting.BenchmarkServerHost + "?username=" + ctx.User.Name)
		ctx.Redirect(setting.BenchmarkServerHost + "?username=" + ctx.User.Name)
	} else if job.JobType == string(models.JobTypeSnn4imagenet) {
		ctx.Redirect(setting.Snn4imagenetServerHost)
	} else if job.JobType == string(models.JobTypeBrainScore) {
		ctx.Redirect(setting.BrainScoreServerHost)
	} else {
		log.Error("JobType error:%s", job.JobType, ctx.Data["msgID"])
	}
}

// downloadCode clones repo into codePath and then overwrites the first line
// of codePath/.git/config that contains both "url" and ".git" with the HTTPS
// clone link of repo. It returns nil when no such line exists.
func downloadCode(repo *models.Repository, codePath string) error {
	if err := git.Clone(repo.RepoPath(), codePath, git.CloneRepoOptions{}); err != nil {
		log.Error("Failed to clone repository: %s (%v)", repo.FullName(), err)
		return err
	}

	configFile, err := os.OpenFile(codePath+"/.git/config", os.O_RDWR, 0666)
	if err != nil {
		log.Error("open file(%s) failed:%v", codePath+"/.git/config", err)
		return err
	}
	defer configFile.Close()

	// pos is the byte offset of the line that is currently being read.
	pos := int64(0)
	reader := bufio.NewReader(configFile)
	for {
		line, err := reader.ReadString('\n')
		if err != nil {
			if err == io.EOF {
				log.Error("not find the remote-url")
				return nil
			}
			log.Error("read error: %v", err)
			return err
		}

		if strings.Contains(line, "url") && strings.Contains(line, ".git") {
			originUrl := "\turl = " + repo.CloneLink().HTTPS + "\n"
			// The line is replaced in place, so a shorter replacement is
			// padded with spaces up to the length of the old line.
			// NOTE(review): a longer replacement overwrites the bytes that
			// follow the old line - confirm this cannot happen in practice.
			if len(line) > len(originUrl) {
				originUrl += strings.Repeat(" ", len(line)-len(originUrl))
			}
			payload := []byte(originUrl)
			_, err := configFile.WriteAt(payload, pos)
			if err != nil {
				log.Error("WriteAt failed:%v", err)
				return err
			}
			break
		}

		pos += int64(len(line))
	}

	return nil
}

// downloadRateCode clones the repository rateOwnerName/rateRepoName into
// codePath and writes a JSON task description (models.TaskInfo) for repo and
// taskName to codePath + cloudbrain.TaskInfoName.
func downloadRateCode(repo *models.Repository, taskName, rateOwnerName, rateRepoName, codePath, benchmarkCategory, gpuType string) error {
	err := os.MkdirAll(codePath, os.ModePerm)
	if err != nil {
		log.Error("mkdir codePath failed:%v", err)
		return err
	}

	repoExt, err := models.GetRepositoryByOwnerAndName(rateOwnerName, rateRepoName)
	if err != nil {
		log.Error("GetRepositoryByOwnerAndName(%s) failed:%v", rateRepoName, err)
		return err
	}

	if err := git.Clone(repoExt.RepoPath(), codePath, git.CloneRepoOptions{}); err != nil {
		log.Error("Failed to clone repository: %s (%v)", repoExt.FullName(), err)
		return err
	}

	fileName := codePath + cloudbrain.TaskInfoName
	f, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
	if err != nil {
		log.Error("OpenFile failed:%v", err)
		return err
	}
	defer f.Close()

	data, err := json.Marshal(models.TaskInfo{
		Username:          repo.Owner.Name,
		TaskName:          taskName,
		CodeName:          repo.Name,
		BenchmarkCategory: strings.Split(benchmarkCategory, ","),
		CodeLink:          strings.TrimSuffix(repo.CloneLink().HTTPS, ".git"),
		GpuType:           gpuType,
	})
	if err != nil {
		log.Error("json.Marshal failed:%v", err)
		return err
	}

	_, err = f.Write(data)
	if err != nil {
		log.Error("WriteString failed:%v", err)
		return err
	}

	return nil
}

// uploadCodeToMinio uploads every file below codePath, recursing into sub
// directories, to setting.CBCodePathPrefix + jobName + parentDir. Both
// codePath and parentDir are expected to end with "/". It stops at the first
// failure.
func uploadCodeToMinio(codePath, jobName, parentDir string) error {
	files, err := readDir(codePath)
	if err != nil {
		log.Error("readDir(%s) failed: %s", codePath, err.Error())
		return err
	}

	for _, file := range files {
		if file.IsDir() {
			if err = uploadCodeToMinio(codePath+file.Name()+"/", jobName, parentDir+file.Name()+"/"); err != nil {
				log.Error("uploadCodeToMinio(%s) failed: %s", file.Name(), err.Error())
				return err
			}
		} else {
			destObject := setting.CBCodePathPrefix + jobName + parentDir + file.Name()
			sourceFile := codePath + file.Name()
			err = storage.Attachments.UploadObject(destObject, sourceFile)
			if err != nil {
				log.Error("UploadObject(%s) failed: %s", file.Name(), err.Error())
				return err
			}
		}
	}

	return nil
}

// mkModelPath creates the directory modelPath, which is expected to end with
// "/", and writes a README file with a short hint into it.
func mkModelPath(modelPath string) error {
	err := os.MkdirAll(modelPath, os.ModePerm)
	if err != nil {
		log.Error("MkdirAll(%s) failed:%v", modelPath, err)
		return err
	}

	fileName := modelPath + "README"
	f, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
	if err != nil {
		log.Error("OpenFile failed:%v", err)
		return err
	}
	defer f.Close()

	_, err = f.WriteString("You can put the model file into this directory and download it by the web page.")
	if err != nil {
		log.Error("WriteString failed:%v", err)
		return err
	}

	return nil
}

// deleteJobStorage removes the local directory of jobName and, for
// cloudbrain-one jobs, the job's directory in the attachment storage.
// Failures are logged only; the function always returns nil.
func deleteJobStorage(jobName string, cloudbrainType int) error {
	// delete local
	localJobPath := setting.JobPath + jobName
	err := os.RemoveAll(localJobPath)
	if err != nil {
		log.Error("RemoveAll(%s) failed:%v", localJobPath, err)
	}

	// delete oss
	if cloudbrainType == models.TypeCloudBrainOne {
		dirPath := setting.CBCodePathPrefix + jobName + "/"
		err = storage.Attachments.DeleteDir(dirPath)
		if err != nil {
			// Report the path that was passed to DeleteDir, not the local one.
			log.Error("DeleteDir(%s) failed:%v", dirPath, err)
		}
	} else if cloudbrainType == models.TypeCloudBrainTwo {
		//dirPath := setting.CodePathPrefix + jobName + "/"
		//err = storage.ObsRemoveObject(setting.Bucket, dirPath)
		//if err != nil {
		//	log.Error("ObsRemoveObject(%s) failed:%v", localJobPath, err)
		//}
		log.Info("no need to delete")
	} else {
		log.Error("cloudbrainType(%d) error", cloudbrainType)
	}

	return nil
}

// SyncCloudbrainStatus refreshes the stored status of every job returned by
// models.GetCloudBrainUnStoppedJob. Cloudbrain-one jobs whose duration has
// reached the configured maximum are stopped as well. A failure for one job
// is logged and does not prevent the remaining jobs from being processed.
func SyncCloudbrainStatus() {
	cloudBrains, err := models.GetCloudBrainUnStoppedJob()
	if err != nil {
		log.Error("GetCloudBrainUnStoppedJob failed:%v", err)
		return
	}

	for _, task := range cloudBrains {
		switch task.Type {
		case models.TypeCloudBrainOne:
			result, err := cloudbrain.GetJob(task.JobID)
			if err != nil {
				log.Error("GetJob(%s) failed:%v", task.JobName, err)
				continue
			}
			if result == nil {
				continue
			}

			jobRes, err := models.ConvertToJobResultPayload(result.Payload)
			if err != nil {
				log.Error("ConvertToJobResultPayload(%s) failed:%v", task.JobName, err)
			}
			// Guard the type assertion and the index below: an unexpected
			// payload must not panic inside this background job.
			subTask, ok := jobRes.TaskRoles[cloudbrain.SubTaskName].(map[string]interface{})
			if !ok {
				log.Error("the job(%s) has no sub task info", task.JobName)
				continue
			}
			taskRes, err := models.ConvertToTaskPod(subTask)
			if err != nil {
				log.Error("ConvertToTaskPod(%s) failed:%v", task.JobName, err)
			}
			if len(taskRes.TaskStatuses) == 0 {
				log.Error("the job(%s) has no task status", task.JobName)
				continue
			}

			task.Status = taskRes.TaskStatuses[0].State
			if task.Status == string(models.JobWaiting) {
				continue
			}

			task.Duration = time.Now().Unix() - taskRes.TaskStatuses[0].StartAt.Unix()
			err = models.UpdateJob(task)
			if err != nil {
				log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
			}

			var maxDuration int64
			if task.JobType == string(models.JobTypeBenchmark) {
				maxDuration = setting.BenchmarkMaxDuration
			} else {
				maxDuration = setting.MaxDuration
			}

			if task.Duration >= maxDuration {
				log.Info("begin to stop job(%s), because of the duration", task.JobName)
				err = cloudbrain.StopJob(task.JobID)
				if err != nil {
					log.Error("StopJob(%s) failed:%v", task.JobName, err)
					continue
				}
				task.Status = string(models.JobStopped)
				err = models.UpdateJob(task)
				if err != nil {
					log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
				}
			}
		case models.TypeCloudBrainTwo:
			switch task.JobType {
			case string(models.JobTypeDebug):
				result, err := modelarts.GetJob(task.JobID)
				if err != nil {
					log.Error("GetJob(%s) failed:%v", task.JobName, err)
					continue
				}

				if result != nil {
					task.Status = result.Status

					err = models.UpdateJob(task)
					if err != nil {
						log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
						continue
					}
				}
			case string(models.JobTypeTrain):
				result, err := modelarts.GetTrainJob(task.JobID, strconv.FormatInt(task.VersionID, 10))
				if err != nil {
					log.Error("GetTrainJob(%s) failed:%v", task.JobName, err)
					continue
				}

				if result != nil {
					task.Status = modelarts.TransTrainJobStatus(result.IntStatus)
					task.Duration = result.Duration

					// NOTE(review): result.Duration is split with
					// millisecond divisors here - presumably modelarts
					// reports milliseconds; confirm.
					if result.Duration != 0 {
						task.TrainJobDuration = util.AddZero(result.Duration/3600000) + ":" + util.AddZero(result.Duration%3600000/60000) + ":" + util.AddZero(result.Duration%60000/1000)
					} else {
						task.TrainJobDuration = "00:00:00"
					}

					err = models.UpdateJob(task)
					if err != nil {
						log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
						continue
					}
				}
			default:
				log.Error("task.JobType(%s) is error:%s", task.JobName, task.JobType)
			}
		default:
			log.Error("task.Type(%s) is error:%d", task.JobName, task.Type)
		}
	}
}

// CloudBrainBenchmarkIndex renders the paginated list of the repository's
// benchmark jobs.
func CloudBrainBenchmarkIndex(ctx *context.Context) {
	MustEnableCloudbrain(ctx)
	if ctx.Written() {
		// MustEnableCloudbrain has already answered with "not found".
		return
	}
	repo := ctx.Repo.Repository
	page := ctx.QueryInt("page")
	if page <= 0 {
		page = 1
	}

	var jobTypes []string
	jobTypes = append(jobTypes, string(models.JobTypeBenchmark))
	ciTasks, count, err := models.Cloudbrains(&models.CloudbrainsOptions{
		ListOptions: models.ListOptions{
			Page:     page,
			PageSize: setting.UI.IssuePagingNum,
		},
		RepoID:   repo.ID,
		Type:     models.TypeCloudBrainOne,
		JobTypes: jobTypes,
	})
	if err != nil {
		ctx.ServerError("Get debugjob faild:", err)
		return
	}

	for i, task := range ciTasks {
		ciTasks[i].CanDel = cloudbrain.CanDeleteJob(ctx, &task.Cloudbrain)
		ciTasks[i].Cloudbrain.ComputeResource = task.ComputeResource

		// Both operands are Unix timestamps, so duration is in seconds.
		var duration int64
		if task.Status == string(models.JobRunning) {
			duration = time.Now().Unix() - int64(task.Cloudbrain.CreatedUnix)
		} else {
			duration = int64(task.Cloudbrain.UpdatedUnix) - int64(task.Cloudbrain.CreatedUnix)
		}
		// The value used to be split with millisecond divisors although it
		// is measured in seconds.
		ciTasks[i].TrainJobDuration = cloudBrainFormatSeconds(duration)
	}

	pager := context.NewPagination(int(count), setting.UI.IssuePagingNum, page, 5)
	ctx.Data["Page"] = pager
	ctx.Data["PageIsCloudBrain"] = true
	ctx.Data["Tasks"] = ciTasks
	ctx.Data["CanCreate"] = cloudbrain.CanCreateOrDebugJob(ctx)
	ctx.Data["RepoIsEmpty"] = repo.IsEmpty
	ctx.HTML(200, tplCloudBrainBenchmarkIndex)
}

// GetChildTypes answers with the child types of the benchmark type selected
// by the "benchmark_type_id" query parameter as JSON: result_code "0" and
// child_types on success, result_code "1" and errMsg otherwise.
func GetChildTypes(ctx *context.Context) {
	benchmarkTypeID := ctx.QueryInt("benchmark_type_id")
	re := make(map[string]interface{})
	// Single pass loop: "break" is used to jump to the JSON response.
	for {
		if benchmarkTypes == nil {
			if err := json.Unmarshal([]byte(setting.BenchmarkTypes), &benchmarkTypes); err != nil {
				log.Error("json.Unmarshal BenchmarkTypes(%s) failed:%v", setting.BenchmarkTypes, err, ctx.Data["MsgID"])
				// Report a result code here as well, like the other
				// failure path below.
				re["result_code"] = "1"
				re["errMsg"] = "system error"
				break
			}
		}

		var isExist bool
		for _, benchmarkType := range benchmarkTypes.BenchmarkType {
			if benchmarkTypeID == benchmarkType.Id {
				isExist = true
				re["child_types"] = benchmarkType.Second
				re["result_code"] = "0"
				break
			}
		}

		if !isExist {
			re["result_code"] = "1"
			log.Error("no such benchmark_type_id", ctx.Data["MsgID"])
			re["errMsg"] = "system error"
			break
		}

		break
	}

	ctx.JSON(200, re)
}

// CloudBrainBenchmarkNew renders the form for creating a new benchmark job.
func CloudBrainBenchmarkNew(ctx *context.Context) {
	err := cloudBrainNewDataPrepare(ctx)
	if err != nil {
		ctx.ServerError("get new cloudbrain info failed", err)
		return
	}
	ctx.HTML(200, tplCloudBrainBenchmarkNew)
}

// getBenchmarkAttachment returns the configured child type entry identified
// by benchmarkTypeID and benchmarkChildTypeID, loading the benchmark types
// on first use. It fails when the configuration cannot be decoded or no such
// entry exists.
func getBenchmarkAttachment(benchmarkTypeID, benchmarkChildTypeID int) (*models.BenchmarkDataset, error) {
	var childInfo *models.BenchmarkDataset
	if benchmarkTypes == nil {
		if err := json.Unmarshal([]byte(setting.BenchmarkTypes), &benchmarkTypes); err != nil {
			log.Error("json.Unmarshal BenchmarkTypes(%s) failed:%v", setting.BenchmarkTypes, err)
			return childInfo, err
		}
	}

	var isExist bool
	for _, benchmarkType := range benchmarkTypes.BenchmarkType {
		if benchmarkType.Id == benchmarkTypeID {
			for _, childType := range benchmarkType.Second {
				if childType.Id == benchmarkChildTypeID {
					childInfo = childType
					isExist = true
					break
				}
			}
			break
		}
	}

	if !isExist {
		log.Error("no such benchmark_type_id&benchmark_child_type_id")
		return childInfo, errors.New("no such benchmark_type_id&benchmark_child_type_id")
	}

	return childInfo, nil
}

// getBenchmarkGpuQueue returns gpuQueue when it is one of the configured
// benchmark GPU queues, loading them on first use, and an error otherwise.
func getBenchmarkGpuQueue(gpuQueue string) (string, error) {
	queue := ""
	if benchmarkGpuInfos == nil {
		if err := json.Unmarshal([]byte(setting.BenchmarkGpuTypes), &benchmarkGpuInfos); err != nil {
			log.Error("json.Unmarshal BenchmarkGpuTypes(%s) failed:%v", setting.BenchmarkGpuTypes, err)
			return queue, err
		}
	}

	var isExist bool
	for _, gpuInfo := range benchmarkGpuInfos.GpuInfo {
		if gpuQueue == gpuInfo.Queue {
			isExist = true
			queue = gpuQueue
			break
		}
	}

	if !isExist {
		log.Error("no such gpuQueue, %s", gpuQueue)
		return queue, errors.New("no such gpuQueue")
	}

	return queue, nil
}

// getBenchmarkResourceSpec returns resourceSpecID when it is one of the
// configured benchmark resource specs, loading them on first use, and an
// error otherwise.
func getBenchmarkResourceSpec(resourceSpecID int) (int, error) {
	var id int
	if benchmarkResourceSpecs == nil {
		if err := json.Unmarshal([]byte(setting.BenchmarkResourceSpecs), &benchmarkResourceSpecs); err != nil {
			log.Error("json.Unmarshal BenchmarkResourceSpecs(%s) failed:%v", setting.BenchmarkResourceSpecs, err)
			return id, err
		}
	}

	var isExist bool
	for _, resourceSpec := range benchmarkResourceSpecs.ResourceSpec {
		if resourceSpecID == resourceSpec.Id {
			isExist = true
			id = resourceSpecID
			break
		}
	}

	if !isExist {
		log.Error("no such resourceSpecID, %d", resourceSpecID)
		return id, errors.New("no such resourceSpec")
	}

	return id, nil
}

// CloudBrainBenchmarkCreate validates the submitted benchmark form, stages
// the repository code and the benchmark code in storage and generates the
// task.
func CloudBrainBenchmarkCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
	ctx.Data["PageIsCloudBrain"] = true
	jobName := form.JobName
	image := form.Image
	gpuQueue := form.GpuType
	command := cloudbrain.CommandBenchmark
	codePath := setting.JobPath + jobName + cloudbrain.CodeMountPath
	resourceSpecId := cloudbrain.BenchMarkResourceID
	benchmarkTypeID := form.BenchmarkTypeID
	benchmarkChildTypeID := form.BenchmarkChildTypeID

	if !jobNamePattern.MatchString(jobName) {
		cloudBrainNewDataPrepare(ctx)
		ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tplCloudBrainBenchmarkNew, &form)
		return
	}

	childInfo, err := getBenchmarkAttachment(benchmarkTypeID, benchmarkChildTypeID)
	if err != nil {
		log.Error("getBenchmarkAttachment failed:%v", err, ctx.Data["MsgID"])
		cloudBrainNewDataPrepare(ctx)
		ctx.RenderWithErr("benchmark type error", tplCloudBrainBenchmarkNew, &form)
		return
	}

	_, err = getBenchmarkGpuQueue(gpuQueue)
	if err != nil {
		log.Error("getBenchmarkGpuQueue failed:%v", err, ctx.Data["MsgID"])
		cloudBrainNewDataPrepare(ctx)
		ctx.RenderWithErr("gpu queue error", tplCloudBrainBenchmarkNew, &form)
		return
	}

	_, err = getBenchmarkResourceSpec(resourceSpecId)
	if err != nil {
		log.Error("getBenchmarkResourceSpec failed:%v", err, ctx.Data["MsgID"])
		cloudBrainNewDataPrepare(ctx)
		ctx.RenderWithErr("resource spec error", tplCloudBrainBenchmarkNew, &form)
		return
	}

	count, err := models.GetCloudbrainCountByUserID(ctx.User.ID, string(models.JobTypeBenchmark))
	if err != nil {
		log.Error("GetCloudbrainCountByUserID failed:%v", err, ctx.Data["MsgID"])
		cloudBrainNewDataPrepare(ctx)
		ctx.RenderWithErr("system error", tplCloudBrainBenchmarkNew, &form)
		return
	} else {
		if count >= 1 {
			log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
			cloudBrainNewDataPrepare(ctx)
			ctx.RenderWithErr("you have already a running or waiting task, can not create more", tplCloudBrainBenchmarkNew, &form)
			return
		}
	}

	_, err = models.GetCloudbrainByName(jobName)
	if err == nil {
		log.Error("the job name did already exist", ctx.Data["MsgID"])
		cloudBrainNewDataPrepare(ctx)
		ctx.RenderWithErr("the job name did already exist", tplCloudBrainBenchmarkNew, &form)
		return
	} else {
		if !models.IsErrJobNotExist(err) {
			log.Error("GetCloudbrainByName failed, %v", err, ctx.Data["MsgID"])
			cloudBrainNewDataPrepare(ctx)
			ctx.RenderWithErr("system error", tplCloudBrainBenchmarkNew, &form)
			return
		}
	}

	repo := ctx.Repo.Repository
	os.RemoveAll(codePath)
	if err := downloadCode(repo, codePath); err != nil {
		log.Error("downloadCode failed, %v", err, ctx.Data["MsgID"])
		cloudBrainNewDataPrepare(ctx)
		ctx.RenderWithErr("system error", tplCloudBrainBenchmarkNew, &form)
		return
	}

	if _, err := os.Stat(codePath + "/train.py"); err != nil {
		if os.IsNotExist(err) {
			// file does not exist
			log.Error("train.py does not exist, %v", err, ctx.Data["MsgID"])
			cloudBrainNewDataPrepare(ctx)
			ctx.RenderWithErr("train.py does not exist", tplCloudBrainBenchmarkNew, &form)
		} else {
			log.Error("Stat failed, %v", err, ctx.Data["MsgID"])
			cloudBrainNewDataPrepare(ctx)
			ctx.RenderWithErr("system error", tplCloudBrainBenchmarkNew, &form)
		}
		return
	} else if _, err := os.Stat(codePath + "/test.py"); err != nil {
		if os.IsNotExist(err) {
			// file does not exist
			log.Error("test.py does not exist, %v", err, ctx.Data["MsgID"])
			cloudBrainNewDataPrepare(ctx)
			ctx.RenderWithErr("test.py does not exist", tplCloudBrainBenchmarkNew, &form)
		} else {
			log.Error("Stat failed, %v", err, ctx.Data["MsgID"])
			cloudBrainNewDataPrepare(ctx)
			ctx.RenderWithErr("system error", tplCloudBrainBenchmarkNew, &form)
		}
		return
	}

	if err := uploadCodeToMinio(codePath+"/", jobName, cloudbrain.CodeMountPath+"/"); err != nil {
		log.Error("uploadCodeToMinio failed, %v", err, ctx.Data["MsgID"])
		cloudBrainNewDataPrepare(ctx)
		ctx.RenderWithErr("system error", tplCloudBrainBenchmarkNew, &form)
		return
	}

	benchmarkPath := setting.JobPath + jobName + cloudbrain.BenchMarkMountPath

	// NOTE(review): gpuInfos is only loaded by cloudBrainNewDataPrepare, which
	// is not called on the path leading here - confirm it cannot be nil.
	var gpuType string
	for _, gpuInfo := range gpuInfos.GpuInfo {
		if gpuInfo.Queue == gpuQueue {
			gpuType = gpuInfo.Value
		}
	}

	if err := downloadRateCode(repo, jobName, childInfo.Owner, childInfo.RepoName, benchmarkPath, form.BenchmarkCategory, gpuType); err != nil {
		log.Error("downloadRateCode failed, %v", err, ctx.Data["MsgID"])
		//cloudBrainNewDataPrepare(ctx)
		//ctx.RenderWithErr("system error", tplCloudBrainBenchmarkNew, &form)
		//return
	}

	if err := uploadCodeToMinio(benchmarkPath+"/", jobName, cloudbrain.BenchMarkMountPath+"/"); err != nil {
		log.Error("uploadCodeToMinio failed, %v", err, ctx.Data["MsgID"])
		//cloudBrainNewDataPrepare(ctx)
		//ctx.RenderWithErr("system error", tplCloudBrainBenchmarkNew, &form)
		//return
	}

	err = cloudbrain.GenerateTask(ctx, jobName, image, command,
childInfo.Attachment, storage.GetMinioPath(jobName, cloudbrain.CodeMountPath+"/"), storage.GetMinioPath(jobName, cloudbrain.ModelMountPath+"/"), storage.GetMinioPath(jobName, cloudbrain.BenchMarkMountPath+"/"), storage.GetMinioPath(jobName, cloudbrain.Snn4imagenetMountPath+"/"), storage.GetMinioPath(jobName, cloudbrain.BrainScoreMountPath+"/"), string(models.JobTypeBenchmark), gpuQueue, form.Description, benchmarkTypeID, benchmarkChildTypeID, resourceSpecId) if err != nil { cloudBrainNewDataPrepare(ctx) ctx.RenderWithErr(err.Error(), tplCloudBrainBenchmarkNew, &form) return } ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/cloudbrain/benchmark") } func BenchmarkDel(ctx *context.Context) { if err := deleteCloudbrainJob(ctx); err != nil { log.Error("deleteCloudbrainJob failed: %v", err, ctx.Data["msgID"]) ctx.ServerError(err.Error(), err) return } ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/cloudbrain/benchmark") }