package repo

import (
	"bufio"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
	"os/exec"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"time"

	"code.gitea.io/gitea/models"
	"code.gitea.io/gitea/modules/auth"
	"code.gitea.io/gitea/modules/base"
	"code.gitea.io/gitea/modules/cloudbrain"
	"code.gitea.io/gitea/modules/context"
	"code.gitea.io/gitea/modules/git"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/modelarts"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/storage"
)

const (
	tplCloudBrainIndex      base.TplName = "repo/cloudbrain/index"
	tplCloudBrainNew        base.TplName = "repo/cloudbrain/new"
	tplCloudBrainShow       base.TplName = "repo/cloudbrain/show"
	tplCloudBrainShowModels base.TplName = "repo/cloudbrain/models/index"
)

// Lazily parsed copies of the JSON settings; filled on first use.
var (
	gpuInfos   *models.GpuInfos
	categories *models.Categories
)

var (
	// jobNamePattern is the format a submitted job name must match.
	jobNamePattern = regexp.MustCompile(`^[a-z0-9][a-z0-9-_]{1,34}[a-z0-9-]$`)

	// cloudBrainJobNameInvalidChars matches every run of characters that
	// jobNamePrefixValid strips from a job name prefix.
	cloudBrainJobNameInvalidChars = regexp.MustCompile(`[^a-z0-9_\\-]+`)

	// cloudBrainJobNameLeadingSeps matches separators at the start of a prefix.
	cloudBrainJobNameLeadingSeps = regexp.MustCompile(`^[_\\-]+`)
)

// MustEnableCloudbrain responds with 404 when the current user cannot read
// the repository's cloudbrain unit.
func MustEnableCloudbrain(ctx *context.Context) {
	if !ctx.Repo.CanRead(models.UnitTypeCloudBrain) {
		ctx.NotFound("MustEnableCloudbrain", nil)
		return
	}
}

// CloudBrainIndex renders the paginated list of the repository's cloudbrain
// tasks.
func CloudBrainIndex(ctx *context.Context) {
	MustEnableCloudbrain(ctx)
	if ctx.Written() {
		// The permission check already produced a response.
		return
	}

	repo := ctx.Repo.Repository
	page := ctx.QueryInt("page")
	if page <= 0 {
		page = 1
	}

	ciTasks, count, err := models.Cloudbrains(&models.CloudbrainsOptions{
		ListOptions: models.ListOptions{
			Page:     page,
			PageSize: setting.UI.IssuePagingNum,
		},
		RepoID: repo.ID,
		Type:   models.TypeCloudBrainOne,
	})
	if err != nil {
		ctx.ServerError("Cloudbrain", err)
		return
	}

	timestamp := time.Now().Unix()
	for i, task := range ciTasks {
		// Debugging is only offered for running tasks created more than
		// 10 seconds ago.
		ciTasks[i].CanDebug = task.Status == string(models.JobRunning) &&
			timestamp-int64(task.Cloudbrain.CreatedUnix) > 10
		ciTasks[i].CanDel = models.CanDelJob(ctx.IsSigned, ctx.User, task)
	}

	pager := context.NewPagination(int(count), setting.UI.IssuePagingNum, page, 5)
	pager.SetDefaultParams(ctx)
	ctx.Data["Page"] = pager

	ctx.Data["PageIsCloudBrain"] = true
	ctx.Data["Tasks"] = ciTasks
	ctx.HTML(200, tplCloudBrainIndex)
}

// cutString returns at most the first lens bytes of str.
func cutString(str string, lens int) string {
	if len(str) < lens {
		return str
	}
	return str[:lens]
}

// jobNamePrefixValid lower-cases s, removes every character the job name
// prefix does not allow and strips leading separators.
func jobNamePrefixValid(s string) string {
	cleaned := cloudBrainJobNameInvalidChars.ReplaceAllString(strings.ToLower(s), "")
	return cloudBrainJobNameLeadingSeps.ReplaceAllString(cleaned, "")
}

// cloudBrainImagePlaceView returns the form of an image location stored in
// PlaceView: locations starting with "192.168" are cut down to the part
// beginning at the first "/"; every other location is returned unchanged.
func cloudBrainImagePlaceView(place string) string {
	if !strings.HasPrefix(place, "192.168") {
		return place
	}
	if idx := strings.Index(place, "/"); idx >= 0 {
		return place[idx:]
	}
	// No path separator: nothing to strip.
	return place
}

// loadCloudBrainGpuInfos parses setting.GpuTypes into the package-level
// gpuInfos cache on first use.
func loadCloudBrainGpuInfos() error {
	if gpuInfos != nil {
		return nil
	}
	if err := json.Unmarshal([]byte(setting.GpuTypes), &gpuInfos); err != nil {
		// Do not cache a partially decoded value.
		gpuInfos = nil
		return err
	}
	return nil
}

// cloudBrainNewDataPrepare fills ctx.Data with everything the "new task"
// template reads: a suggested job name, image lists, the user's attachments,
// mount paths and the parsed benchmark/GPU/resource settings.
//
// A failure to list images is reported through ctx.Data["error"] and does not
// abort the preparation; a failure to load attachments or to parse one of the
// JSON settings is returned to the caller.
func cloudBrainNewDataPrepare(ctx *context.Context) error {
	ctx.Data["PageIsCloudBrain"] = true

	// Suggested name: sanitized user name prefix + hour stamp + the Unix
	// time in decimal with its first five digits dropped.
	t := time.Now()
	jobName := jobNamePrefixValid(cutString(ctx.User.Name, 5)) + t.Format("2006010215") + strconv.Itoa(int(t.Unix()))[5:]
	ctx.Data["job_name"] = jobName

	result, err := cloudbrain.GetImages()
	if err != nil {
		ctx.Data["error"] = err.Error()
		log.Error("cloudbrain.GetImages failed: %v", err, ctx.Data["MsgID"])
	} else {
		for i := range result.Payload.ImageInfo {
			result.Payload.ImageInfo[i].PlaceView = cloudBrainImagePlaceView(result.Payload.ImageInfo[i].Place)
		}
		ctx.Data["images"] = result.Payload.ImageInfo
	}

	resultPublic, err := cloudbrain.GetPublicImages()
	if err != nil {
		ctx.Data["error"] = err.Error()
		log.Error("cloudbrain.GetPublicImages failed: %v", err, ctx.Data["MsgID"])
	} else {
		for i := range resultPublic.Payload.ImageInfo {
			resultPublic.Payload.ImageInfo[i].PlaceView = cloudBrainImagePlaceView(resultPublic.Payload.ImageInfo[i].Place)
		}
		ctx.Data["public_images"] = resultPublic.Payload.ImageInfo
	}

	attachs, err := models.GetAllUserAttachments(ctx.User.ID)
	if err != nil {
		log.Error("GetAllUserAttachments failed: %v", err, ctx.Data["MsgID"])
		return err
	}
	ctx.Data["attachments"] = attachs

	ctx.Data["command"] = cloudbrain.Command
	ctx.Data["code_path"] = cloudbrain.CodeMountPath
	ctx.Data["dataset_path"] = cloudbrain.DataSetMountPath
	ctx.Data["model_path"] = cloudbrain.ModelMountPath
	ctx.Data["benchmark_path"] = cloudbrain.BenchMarkMountPath
	ctx.Data["is_benchmark_enabled"] = setting.IsBenchmarkEnabled

	if categories == nil {
		if err := json.Unmarshal([]byte(setting.BenchmarkCategory), &categories); err != nil {
			// Do not cache a partially decoded value.
			categories = nil
			log.Error("unmarshal BenchmarkCategory failed: %v", err, ctx.Data["MsgID"])
			return err
		}
	}
	if categories != nil {
		ctx.Data["benchmark_categories"] = categories.Category
	}

	if err := loadCloudBrainGpuInfos(); err != nil {
		log.Error("unmarshal GpuTypes failed: %v", err, ctx.Data["MsgID"])
		return err
	}
	if gpuInfos != nil {
		ctx.Data["gpu_types"] = gpuInfos.GpuInfo
	}

	if cloudbrain.ResourceSpecs == nil {
		if err := json.Unmarshal([]byte(setting.ResourceSpecs), &cloudbrain.ResourceSpecs); err != nil {
			cloudbrain.ResourceSpecs = nil
			log.Error("unmarshal ResourceSpecs failed: %v", err, ctx.Data["MsgID"])
			return err
		}
	}
	if cloudbrain.ResourceSpecs != nil {
		ctx.Data["resource_specs"] = cloudbrain.ResourceSpecs.ResourceSpec
	}

	ctx.Data["snn4imagenet_path"] = cloudbrain.Snn4imagenetMountPath
	ctx.Data["is_snn4imagenet_enabled"] = setting.IsSnn4imagenetEnabled
	ctx.Data["brainscore_path"] = cloudbrain.BrainScoreMountPath
	ctx.Data["is_brainscore_enabled"] = setting.IsBrainScoreEnabled

	return nil
}

// CloudBrainNew renders the form for creating a cloudbrain task.
func CloudBrainNew(ctx *context.Context) {
	if err := cloudBrainNewDataPrepare(ctx); err != nil {
		ctx.ServerError("get new cloudbrain info failed", err)
		return
	}
	ctx.HTML(200, tplCloudBrainNew)
}

// CloudBrainCreate validates the submitted form, prepares the job's code,
// model and rating directories under setting.JobPath, and submits the task.
// On any failure the creation form is rendered again with the error message;
// on success the client is redirected to the task list.
func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
	ctx.Data["PageIsCloudBrain"] = true

	jobName := form.JobName
	image := form.Image
	command := form.Command
	uuid := form.Attachment
	jobType := form.JobType
	gpuQueue := form.GpuType
	resourceSpecId := form.ResourceSpecId
	codePath := setting.JobPath + jobName + cloudbrain.CodeMountPath

	// renderErr re-renders the creation form. The template needs the same
	// data as a fresh form, so it is prepared again first; the result of the
	// preparation is deliberately ignored because an error page is already
	// being produced.
	renderErr := func(msg string) {
		cloudBrainNewDataPrepare(ctx)
		ctx.RenderWithErr(msg, tplCloudBrainNew, &form)
	}

	if !jobNamePattern.MatchString(jobName) {
		renderErr(ctx.Tr("repo.cloudbrain_jobname_err"))
		return
	}

	switch jobType {
	case string(models.JobTypeBenchmark), string(models.JobTypeDebug),
		string(models.JobTypeSnn4imagenet), string(models.JobTypeBrainScore):
		// supported
	default:
		log.Error("jobtype error: %s", jobType, ctx.Data["MsgID"])
		renderErr("jobtype error")
		return
	}

	if _, err := models.GetCloudbrainByName(jobName); err == nil {
		log.Error("the job name did already exist", ctx.Data["MsgID"])
		renderErr("the job name did already exist")
		return
	} else if !models.IsErrJobNotExist(err) {
		log.Error("system error, %v", err, ctx.Data["MsgID"])
		renderErr("system error")
		return
	}

	repo := ctx.Repo.Repository
	if err := downloadCode(repo, codePath); err != nil {
		renderErr(err.Error())
		return
	}

	modelPath := setting.JobPath + jobName + cloudbrain.ModelMountPath
	if err := os.MkdirAll(modelPath, os.ModePerm); err != nil {
		renderErr(err.Error())
		return
	}

	benchmarkPath := setting.JobPath + jobName + cloudbrain.BenchMarkMountPath
	if setting.IsBenchmarkEnabled && jobType == string(models.JobTypeBenchmark) {
		// The GPU list is normally parsed when the form is shown; make sure
		// it is available even if this process never rendered the form.
		if err := loadCloudBrainGpuInfos(); err != nil {
			log.Error("unmarshal GpuTypes failed: %v", err, ctx.Data["MsgID"])
		}
		var gpuType string
		if gpuInfos != nil {
			for _, gpuInfo := range gpuInfos.GpuInfo {
				if gpuInfo.Queue == gpuQueue {
					gpuType = gpuInfo.Value
				}
			}
		}
		if err := downloadRateCode(repo, jobName, setting.BenchmarkCode, benchmarkPath, form.BenchmarkCategory, gpuType); err != nil {
			renderErr(err.Error())
			return
		}
	}

	snn4imagenetPath := setting.JobPath + jobName + cloudbrain.Snn4imagenetMountPath
	if setting.IsSnn4imagenetEnabled && jobType == string(models.JobTypeSnn4imagenet) {
		if err := downloadRateCode(repo, jobName, setting.Snn4imagenetCode, snn4imagenetPath, "", ""); err != nil {
			renderErr(err.Error())
			return
		}
	}

	brainScorePath := setting.JobPath + jobName + cloudbrain.BrainScoreMountPath
	if setting.IsBrainScoreEnabled && jobType == string(models.JobTypeBrainScore) {
		if err := downloadRateCode(repo, jobName, setting.BrainScoreCode, brainScorePath, "", ""); err != nil {
			renderErr(err.Error())
			return
		}
	}

	err := cloudbrain.GenerateTask(ctx, jobName, image, command, uuid, codePath, modelPath, benchmarkPath, snn4imagenetPath, brainScorePath, jobType, gpuQueue, resourceSpecId)
	if err != nil {
		renderErr(err.Error())
		return
	}

	ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/cloudbrain")
}

// CloudBrainShow renders the detail page of one task, refreshing the stored
// task status from the remote job information when it is available.
func CloudBrainShow(ctx *context.Context) {
	ctx.Data["PageIsCloudBrain"] = true

	jobID := ctx.Params(":jobid")
	task, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		// Everything below reads or updates the task record.
		ctx.ServerError("GetCloudbrainByJobID failed", err)
		return
	}

	result, err := cloudbrain.GetJob(jobID)
	if err != nil {
		ctx.Data["error"] = err.Error()
	}

	if result != nil {
		jobRes, err := models.ConvertToJobResultPayload(result.Payload)
		if err != nil {
			// NOTE(review): the payload is still used below, as before;
			// confirm what ConvertToJobResultPayload returns on error.
			log.Error("ConvertToJobResultPayload(%s) failed: %v", jobID, err, ctx.Data["MsgID"])
		}
		jobRes.Resource.Memory = strings.ReplaceAll(jobRes.Resource.Memory, "Mi", "MB")

		if jobRes.JobStatus.State != string(models.JobFailed) {
			subTask, ok := jobRes.TaskRoles[cloudbrain.SubTaskName].(map[string]interface{})
			if !ok {
				ctx.Data["error"] = "sub task not found in job result"
			} else {
				taskRes, err := models.ConvertToTaskPod(subTask)
				if err != nil {
					log.Error("ConvertToTaskPod(%s) failed: %v", jobID, err, ctx.Data["MsgID"])
				}
				ctx.Data["taskRes"] = taskRes

				if len(taskRes.TaskStatuses) > 0 {
					status := taskRes.TaskStatuses[0]
					task.Status = status.State
					task.ContainerID = status.ContainerID
					task.ContainerIp = status.ContainerIP
					if err := models.UpdateJob(task); err != nil {
						ctx.Data["error"] = err.Error()
					}
				}
			}
		} else {
			// A failed job is shown with a single synthetic status entry and
			// the task record's created/updated times.
			task.Status = jobRes.JobStatus.State
			ctx.Data["taskRes"] = models.TaskPod{TaskStatuses: []models.TaskStatuses{
				{
					State: jobRes.JobStatus.State,
				},
			}}
			jobRes.JobStatus.StartTime = time.Unix(int64(task.CreatedUnix), 0).Format("2006-01-02 15:04:05")
			jobRes.JobStatus.EndTime = time.Unix(int64(task.UpdatedUnix), 0).Format("2006-01-02 15:04:05")
		}

		ctx.Data["result"] = jobRes
	}

	ctx.Data["task"] = task
	ctx.Data["jobID"] = jobID
	ctx.HTML(200, tplCloudBrainShow)
}

// CloudBrainDebug redirects a signed-in user to the debug URL built from
// setting.DebugServerHost, the job ID and the sub task name.
func CloudBrainDebug(ctx *context.Context) {
	jobID := ctx.Params(":jobid")
	if !ctx.IsSigned {
		log.Error("the user has not signed in")
		ctx.Error(http.StatusForbidden, "", "the user has not signed in")
		return
	}

	task, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		ctx.ServerError("GetCloudbrainByJobID failed", err)
		return
	}

	debugUrl := setting.DebugServerHost + "jpylab_" + task.JobID + "_" + task.SubTaskName
	ctx.Redirect(debugUrl)
}

// CloudBrainCommitImage commits the task's container as an image. The
// outcome is reported as JSON: result_code "0" on success, "-1" on failure.
func CloudBrainCommitImage(ctx *context.Context, form auth.CommitImageCloudBrainForm) {
	jobID := ctx.Params(":jobid")
	if !ctx.IsSigned {
		log.Error("the user has not signed in")
		ctx.Error(http.StatusForbidden, "", "the user has not signed in")
		return
	}

	task, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		ctx.JSON(200, map[string]string{
			"result_code": "-1",
			"error_msg":   "GetCloudbrainByJobID failed",
		})
		return
	}

	err = cloudbrain.CommitImage(jobID, models.CommitImageParams{
		Ip:               task.ContainerIp,
		TaskContainerId:  task.ContainerID,
		ImageDescription: form.Description,
		ImageTag:         form.Tag,
	})
	if err != nil {
		log.Error("CommitImage(%s) failed:%v", task.JobName, err.Error(), ctx.Data["msgID"])
		ctx.JSON(200, map[string]string{
			"result_code": "-1",
			"error_msg":   "CommitImage failed",
		})
		return
	}

	ctx.JSON(200, map[string]string{
		"result_code": "0",
		"error_msg":   "",
	})
}

// CloudBrainStop stops a task that is neither stopped nor failed, records the
// stopped status and redirects to the task list.
func CloudBrainStop(ctx *context.Context) {
	jobID := ctx.Params(":jobid")
	task, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		ctx.ServerError("GetCloudbrainByJobID failed", err)
		return
	}

	if task.Status == string(models.JobStopped) || task.Status == string(models.JobFailed) {
		log.Error("the job(%s) has been stopped", task.JobName, ctx.Data["msgID"])
		ctx.ServerError("the job has been stopped", errors.New("the job has been stopped"))
		return
	}

	if err := cloudbrain.StopJob(jobID); err != nil {
		log.Error("StopJob(%s) failed:%v", task.JobName, err.Error(), ctx.Data["msgID"])
		ctx.ServerError("StopJob failed", err)
		return
	}

	task.Status = string(models.JobStopped)
	if err := models.UpdateJob(task); err != nil {
		ctx.ServerError("UpdateJob failed", err)
		return
	}

	ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/cloudbrain")
}

// StopJobsByUserID stops every job returned by
// models.GetCloudbrainsNeededStopByUserID for the user.
func StopJobsByUserID(userID int64) {
	cloudBrains, err := models.GetCloudbrainsNeededStopByUserID(userID)
	if err != nil {
		log.Warn("Failed to get cloudBrain info: %v", err)
		return
	}
	StopJobs(cloudBrains)
}

// StopJobsByRepoID stops every job returned by
// models.GetCloudbrainsNeededStopByRepoID for the repository.
func StopJobsByRepoID(repoID int64) {
	cloudBrains, err := models.GetCloudbrainsNeededStopByRepoID(repoID)
	if err != nil {
		log.Warn("Failed to get cloudBrain info: %v", err)
		return
	}
	StopJobs(cloudBrains)
}

// StopJobs stops each given job through the API matching its type, trying up
// to three times with a 30 second pause between attempts, and marks the jobs
// that were stopped successfully as stopped.
func StopJobs(cloudBrains []*models.Cloudbrain) {
	for _, taskInfo := range cloudBrains {
		taskInfo := taskInfo // the closures below must see this iteration's job

		var stop func() error
		switch {
		case taskInfo.Type == models.TypeCloudBrainOne:
			stop = func() error {
				return cloudbrain.StopJob(taskInfo.JobID)
			}
		case taskInfo.JobType == string(models.JobTypeTrain):
			stop = func() error {
				_, err := modelarts.StopTrainJob(taskInfo.JobID, strconv.FormatInt(taskInfo.VersionID, 10))
				return err
			}
		default:
			param := models.NotebookAction{
				Action: models.ActionStop,
			}
			stop = func() error {
				_, err := modelarts.StopJob(taskInfo.JobID, param)
				return err
			}
		}

		err := retry(3, time.Second*30, stop)
		logErrorAndUpdateJobStatus(err, taskInfo)
	}
}

// retry calls f until it succeeds or attempts calls have been made, sleeping
// for sleep between calls. It returns nil on success, otherwise an error
// wrapping the last failure.
func retry(attempts int, sleep time.Duration, f func() error) (err error) {
	for i := 0; i < attempts; i++ {
		if i > 0 {
			log.Warn("retrying after error: %v", err)
			time.Sleep(sleep)
		}
		err = f()
		if err == nil {
			return nil
		}
	}
	return fmt.Errorf("after %d attempts, last error: %w", attempts, err)
}

// logErrorAndUpdateJobStatus logs a failed stop attempt, or marks the job as
// stopped in the database when err is nil.
func logErrorAndUpdateJobStatus(err error, taskInfo *models.Cloudbrain) {
	if err != nil {
		log.Warn("Failed to stop cloudBrain job %s: %v", taskInfo.JobID, err)
		return
	}

	taskInfo.Status = string(models.JobStopped)
	if err := models.UpdateJob(taskInfo); err != nil {
		log.Warn("UpdateJob failed: %v", err)
	}
}

// CloudBrainDel deletes a stopped or failed task and redirects to the task
// list.
func CloudBrainDel(ctx *context.Context) {
	jobID := ctx.Params(":jobid")
	task, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		ctx.ServerError("GetCloudbrainByJobID failed", err)
		return
	}

	if task.Status != string(models.JobStopped) && task.Status != string(models.JobFailed) {
		log.Error("the job(%s) has not been stopped", task.JobName, ctx.Data["msgID"])
		ctx.ServerError("the job has not been stopped", errors.New("the job has not been stopped"))
		return
	}

	if err := models.DeleteJob(task); err != nil {
		ctx.ServerError("DeleteJob failed", err)
		return
	}

	ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/cloudbrain")
}

// CloudBrainShowModels renders the listing of a task's model directory,
// newest entries first.
func CloudBrainShowModels(ctx *context.Context) {
	ctx.Data["PageIsCloudBrain"] = true

	jobID := ctx.Params(":jobid")
	parentDir := ctx.Query("parentDir")
	dirArray := strings.Split(parentDir, "/")

	task, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		log.Error("no such job!", ctx.Data["msgID"])
		ctx.ServerError("no such job:", err)
		return
	}

	// get dirs
	dirs, err := getModelDirs(task.JobName, parentDir)
	if err != nil {
		log.Error("getModelDirs failed:%v", err.Error(), ctx.Data["msgID"])
		ctx.ServerError("getModelDirs failed:", err)
		return
	}

	var fileInfos []storage.FileInfo
	if err := json.Unmarshal([]byte(dirs), &fileInfos); err != nil {
		log.Error("json.Unmarshal failed:%v", err.Error(), ctx.Data["msgID"])
		ctx.ServerError("json.Unmarshal failed:", err)
		return
	}

	const layout = "2006-01-02 15:04:05"
	for i := range fileInfos {
		// The listing's time is parsed without a zone (i.e. as UTC) and
		// shown in local time.
		parsed, err := time.Parse(layout, fileInfos[i].ModTime)
		if err != nil {
			// Keep the raw value rather than displaying the zero time.
			continue
		}
		fileInfos[i].ModTime = parsed.Local().Format(layout)
	}

	sort.Slice(fileInfos, func(i, j int) bool {
		return fileInfos[i].ModTime > fileInfos[j].ModTime
	})

	ctx.Data["Path"] = dirArray
	ctx.Data["Dirs"] = fileInfos
	ctx.Data["task"] = task
	ctx.Data["JobID"] = jobID
	ctx.HTML(200, tplCloudBrainShowModels)
}

// GetPublicImages responds with one page of public images as JSON.
func GetPublicImages(ctx *context.Context) {
	getImages(ctx, cloudbrain.Public)
}

// GetCustomImages responds with one page of custom images as JSON.
func GetCustomImages(ctx *context.Context) {
	getImages(ctx, cloudbrain.Custom)
}

// getImages responds with the page of images of imageType selected by the
// "page", "size" and "name" query parameters. When the lookup fails an empty
// payload is returned with status 200.
func getImages(ctx *context.Context, imageType string) {
	log.Info("Get images begin")

	page := ctx.QueryInt("page")
	size := ctx.QueryInt("size")
	name := ctx.Query("name")

	getImagesResult, err := cloudbrain.GetImagesPageable(page, size, imageType, name)
	if err != nil {
		log.Error("Can not get images:%v", err)
		ctx.JSON(http.StatusOK, models.GetImagesPayload{
			Count:      0,
			TotalPages: 0,
			ImageInfo:  []*models.ImageInfo{},
		})
	} else {
		ctx.JSON(http.StatusOK, getImagesResult.Payload)
	}

	log.Info("Get images end")
}

// getModelDirs builds the query for listing parentDir inside the job's model
// directory and returns the result of getDirs for it.
func getModelDirs(jobName string, parentDir string) (string, error) {
	modelActualPath := setting.JobPath + jobName + "/model/"

	req := "baseDir=" + modelActualPath
	if parentDir != "" {
		req += "&parentDir=" + parentDir
	}

	return getDirs(req)
}

// CloudBrainDownloadModel redirects the client to a presigned download URL
// for a model file of the job named in the query.
func CloudBrainDownloadModel(ctx *context.Context) {
	parentDir := ctx.Query("parentDir")
	fileName := ctx.Query("fileName")
	jobName := ctx.Query("jobName")
	filePath := "jobs/" + jobName + "/model/" + parentDir

	url, err := storage.Attachments.PresignedGetURL(filePath, fileName)
	if err != nil {
		log.Error("PresignedGetURL failed: %v", err.Error(), ctx.Data["msgID"])
		ctx.ServerError("PresignedGetURL", err)
		return
	}

	// A presigned URL is time-limited, so the redirect must not be a
	// permanent (cacheable) one.
	http.Redirect(ctx.Resp, ctx.Req.Request, url, http.StatusFound)
}

// GetRate redirects to the rating server configured for the job's type and
// responds with 404 for job types that have none.
func GetRate(ctx *context.Context) {
	jobID := ctx.Params(":jobid")
	job, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		ctx.ServerError("GetCloudbrainByJobID failed", err)
		return
	}

	switch job.JobType {
	case string(models.JobTypeBenchmark):
		ctx.Redirect(setting.BenchmarkServerHost + "?username=" + ctx.User.Name)
	case string(models.JobTypeSnn4imagenet):
		ctx.Redirect(setting.Snn4imagenetServerHost)
	case string(models.JobTypeBrainScore):
		ctx.Redirect(setting.BrainScoreServerHost)
	default:
		log.Error("JobType error:%s", job.JobType, ctx.Data["msgID"])
		ctx.NotFound("GetRate", nil)
	}
}

// downloadCode clones the repository into codePath and rewrites the first
// line of the clone's .git/config that contains both "url" and ".git" so it
// holds the repository's HTTPS clone link. A config without such a line is
// logged and left untouched.
func downloadCode(repo *models.Repository, codePath string) error {
	if err := git.Clone(repo.RepoPath(), codePath, git.CloneRepoOptions{}); err != nil {
		log.Error("Failed to clone repository: %s (%v)", repo.FullName(), err)
		return err
	}

	configPath := codePath + "/.git/config"
	configFile, err := os.OpenFile(configPath, os.O_RDWR, 0666)
	if err != nil {
		log.Error("open file(%s) failed:%v", configPath, err)
		return err
	}
	defer configFile.Close()

	// The file is rebuilt in memory instead of being patched in place, so a
	// replacement of a different length cannot damage neighbouring lines.
	var (
		rewritten strings.Builder
		replaced  bool
	)
	reader := bufio.NewReader(configFile)
	for {
		line, readErr := reader.ReadString('\n')
		if readErr != nil && readErr != io.EOF {
			log.Error("read error: %v", readErr)
			return readErr
		}
		if !replaced && strings.Contains(line, "url") && strings.Contains(line, ".git") {
			line = "\turl = " + repo.CloneLink().HTTPS + "\n"
			replaced = true
		}
		rewritten.WriteString(line)
		if readErr == io.EOF {
			break
		}
	}

	if !replaced {
		log.Error("not find the remote-url")
		return nil
	}

	// Write first and shrink afterwards: a failed write never leaves the
	// config empty.
	content := []byte(rewritten.String())
	if _, err := configFile.WriteAt(content, 0); err != nil {
		log.Error("WriteAt failed:%v", err)
		return err
	}
	if err := configFile.Truncate(int64(len(content))); err != nil {
		log.Error("Truncate failed:%v", err)
		return err
	}

	return nil
}

// downloadRateCode clones gitPath into codePath and writes a JSON description
// of the task (models.TaskInfo) to codePath + cloudbrain.TaskInfoName.
func downloadRateCode(repo *models.Repository, taskName, gitPath, codePath, benchmarkCategory, gpuType string) error {
	if err := os.MkdirAll(codePath, os.ModePerm); err != nil {
		log.Error("mkdir codePath failed: %v", err)
		return err
	}

	// git is executed directly, without a shell, so nothing in the arguments
	// is interpreted as shell syntax. gitPath is split on whitespace to keep
	// accepting a value that carries clone options in front of the URL.
	args := append([]string{"clone"}, strings.Fields(gitPath)...)
	args = append(args, codePath)
	output, err := exec.Command("git", args...).CombinedOutput()
	log.Info("%s", output)
	if err != nil {
		log.Error("exec.Command(git %s) failed:%v", strings.Join(args, " "), err)
		return err
	}

	// Marshal before touching the file so a failure cannot leave it truncated.
	data, err := json.Marshal(models.TaskInfo{
		Username:          repo.Owner.Name,
		TaskName:          taskName,
		CodeName:          repo.Name,
		BenchmarkCategory: strings.Split(benchmarkCategory, ","),
		CodeLink:          strings.TrimSuffix(repo.CloneLink().HTTPS, ".git"),
		GpuType:           gpuType,
	})
	if err != nil {
		log.Error("json.Marshal failed: %v", err)
		return err
	}

	fileName := codePath + cloudbrain.TaskInfoName
	f, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
	if err != nil {
		log.Error("OpenFile failed: %v", err)
		return err
	}

	if _, err := f.Write(data); err != nil {
		f.Close()
		log.Error("Write failed: %v", err)
		return err
	}
	if err := f.Close(); err != nil {
		log.Error("Close failed: %v", err)
		return err
	}

	return nil
}