package cloudbrainTask import ( "fmt" "net/http" "path" "strings" "code.gitea.io/gitea/modules/notebook" "code.gitea.io/gitea/modules/modelarts" "code.gitea.io/gitea/modules/modelarts_cd" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/cloudbrain" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/redis/redis_key" "code.gitea.io/gitea/modules/redis/redis_lock" "code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/services/cloudbrain/resource" "code.gitea.io/gitea/services/reward/point/account" "code.gitea.io/gitea/modules/setting" cloudbrainService "code.gitea.io/gitea/services/cloudbrain" repo_service "code.gitea.io/gitea/services/repository" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/context" api "code.gitea.io/gitea/modules/structs" "code.gitea.io/gitea/modules/util" ) const NoteBookExtension = ".ipynb" const CPUType = 0 const GPUType = 1 const NPUType = 2 const CharacterLength = 2550 func FileNotebookCreate(ctx *context.Context, option api.CreateFileNotebookJobOption) { if ctx.Written() { return } if path.Ext(option.File) != NoteBookExtension { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_select_wrong"))) return } if len(getBootFile(option.File, option.OwnerName, option.ProjectName)) > CharacterLength { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_path_too_long"))) return } if len(option.BranchName) > CharacterLength { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_branch_name_too_long"))) return } isNotebookFileExist, _ := isNoteBookFileExist(ctx, option) if !isNotebookFileExist { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_file_not_exist"))) return } sourceRepo, err := models.GetRepositoryByOwnerAndName(option.OwnerName, option.ProjectName) if err != nil { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_file_not_exist"))) return } permission, err := models.GetUserRepoPermission(sourceRepo, ctx.User) if err != nil { log.Error("Get permission failed", err) ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_file_no_right"))) return } if !permission.CanRead(models.UnitTypeCode) { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_file_no_right"))) return } //create repo if not exist repo, _ := models.GetRepositoryByName(ctx.User.ID, setting.FileNoteBook.ProjectName) if repo == nil { repo, err = repo_service.CreateRepository(ctx.User, ctx.User, models.CreateRepoOptions{ Name: setting.FileNoteBook.ProjectName, Alias: "", Description: "", IssueLabels: "", Gitignores: "", License: "", Readme: "Default", IsPrivate: false, AutoInit: true, DefaultBranch: "master", }) if err != nil { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.failed_to_create_notebook_repo", setting.FileNoteBook.ProjectName))) return } } else { noteBook, _ := models.GetWaitOrRunFileNotebookByRepo(repo.ID, getCloudbrainType(option.Type)) if noteBook != nil { if isRepoConfilcts(option, noteBook) { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_repo_conflict"))) return } if isNotebookSpecMath(option, noteBook) { if !isRepoMatch(option, noteBook) { err = downloadCode(sourceRepo, getCodePath(noteBook.JobName, sourceRepo), option.BranchName) if err != nil { log.Error("download code failed", err) if !strings.Contains(err.Error(), "already exists and is not an empty directory") { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("cloudbrain.load_code_failed"))) return } } } if !isRepoFileMatch(option, noteBook) { if len(noteBook.BootFile)+len(getBootFile(option.File, option.OwnerName, option.ProjectName))+1 <= CharacterLength { noteBook.BootFile += ";" + getBootFile(option.File, option.OwnerName, option.ProjectName) } else { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_path_too_long"))) return } if len(noteBook.BranchName)+len(option.BranchName)+1 <= CharacterLength { noteBook.BranchName += ";" + option.BranchName } else { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_branch_name_too_long"))) return } if len(noteBook.Description)+len(getDescription(option))+1 <= CharacterLength { noteBook.Description += ";" + getDescription(option) } err := models.UpdateJob(noteBook) if err != nil { log.Error("GenerateNotebook2 failed, %v", err, ctx.Data["MsgID"]) ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(err.Error())) return } } ctx.JSON(http.StatusOK, models.BaseMessageApi{ Code: 0, Message: noteBook.JobID, }) return } } } if option.Type <= GPUType { cloudBrainFileNoteBookCreate(ctx, option, repo, sourceRepo) } else { modelartsFileNoteBookCreate(ctx, option, repo, sourceRepo) } } func FileNotebookStatus(ctx *context.Context, option api.CreateFileNotebookJobOption) { if ctx.Written() { return } if path.Ext(option.File) != NoteBookExtension { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_select_wrong"))) return } isNotebookFileExist, _ := isNoteBookFileExist(ctx, option) if !isNotebookFileExist { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_file_not_exist"))) return } task, err := models.GetCloudbrainByJobID(option.JobId) if err != nil { log.Error("job not found:"+option.JobId, err) ctx.JSON(http.StatusOK, models.BaseErrorMessageApi("Job id may not be right. can not find job.")) return } if task.BootFile == "" || task.Status != string(models.ModelArtsRunning) { log.Warn("Boot file is empty or status is running. ") ctx.JSON(http.StatusOK, models.BaseErrorMessageApi("Boot file is empty or status is running.")) return } if !isRepoFileMatch(option, task) { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi("can not math repo file.")) return } debugBaseUrl, token, err := getBaseUrlAndToken(task) if err != nil { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(err.Error())) return } if uploadNotebookFileIfCannotBroswer(debugBaseUrl, getBootFile(option.File, option.OwnerName, option.ProjectName), task, token) { ctx.JSON(http.StatusOK, models.BaseOKMessageApi) } else { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi("upload failed.")) } } func getBaseUrlAndToken(task *models.Cloudbrain) (string, string, error) { var debugBaseUrl string var token string if task.Type == models.TypeCloudBrainOne { debugBaseUrl = setting.DebugServerHost + "jpylab_" + task.JobID + "_" + task.SubTaskName + "/lab" } else { var result *models.GetNotebook2Result var err error if task.Type == models.TypeCloudBrainTwo { result, err = modelarts.GetNotebook2(task.JobID) } else if task.Type == models.TypeCDCenter { result, err = modelarts_cd.GetNotebook(task.JobID) } if err != nil || result == nil || result.Status != string(models.ModelArtsRunning) || result.Url == "" { log.Error("notebook job not found:"+task.JobID, err) return "", "", fmt.Errorf("can not get job or job is invalid.") } debugBaseUrl = result.Url token = result.Token } return debugBaseUrl, token, nil } func uploadNotebookFileIfCannotBroswer(debugBaseUrl string, bootFile string, task *models.Cloudbrain, token string) bool { c := ¬ebook.NotebookContent{ Url: debugBaseUrl, Path: bootFile, PathType: "file", Token: token, } if c.IsNotebookFileCanBrowser() { return true } else { c.SetCookiesAndCsrf() c.UploadNoteBookFile(task) return c.IsNotebookFileCanBrowser() } } func isNotebookSpecMath(option api.CreateFileNotebookJobOption, book *models.Cloudbrain) bool { if option.Type == NPUType || option.Type == CPUType { return true } spec, err := models.GetCloudbrainSpecByID(book.ID) if err != nil { log.Warn("can not get spec ", err) return false } return spec.AccCardsNum > 0 } func isRepoConfilcts(option api.CreateFileNotebookJobOption, book *models.Cloudbrain) bool { bootFiles := strings.Split(book.BootFile, ";") branches := strings.Split(book.BranchName, ";") for i, bootFile := range bootFiles { splits := strings.Split(bootFile, "/") if len(splits) >= 3 { if splits[0] == option.OwnerName && splits[1] == option.ProjectName && branches[i] != option.BranchName { return true } } } return false } func isRepoMatch(option api.CreateFileNotebookJobOption, book *models.Cloudbrain) bool { bootFiles := strings.Split(book.BootFile, ";") for _, bootFile := range bootFiles { splits := strings.Split(bootFile, "/") if len(splits) >= 3 { if splits[0] == option.OwnerName && splits[1] == option.ProjectName { return true } } } return false } func isRepoFileMatch(option api.CreateFileNotebookJobOption, book *models.Cloudbrain) bool { bootFiles := strings.Split(book.BootFile, ";") branches := strings.Split(book.BranchName, ";") for i, bootFile := range bootFiles { if branches[i] == option.BranchName && getBootFile(option.File, option.OwnerName, option.ProjectName) == bootFile { return true } } return false } func UploadNotebookFiles(task *models.Cloudbrain) { if task.Status == string(models.JobRunning) && task.BootFile != "" { debugBaseUrl, token, err := getBaseUrlAndToken(task) if err != nil { log.Error("can not get base url:", err) return } bootFiles := strings.Split(task.BootFile, ";") for _, bootFile := range bootFiles { uploadNotebookFileIfCannotBroswer(debugBaseUrl, bootFile, task, token) } } } func cloudBrainFileNoteBookCreate(ctx *context.Context, option api.CreateFileNotebookJobOption, repo *models.Repository, sourceRepo *models.Repository) { displayJobName := cloudbrainService.GetDisplayJobName(ctx.User.Name) jobName := util.ConvertDisplayJobNameToJobName(displayJobName) jobType := string(models.JobTypeDebug) lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName)) defer lock.UnLock() isOk, err := lock.Lock(models.CloudbrainKeyDuration) if !isOk { log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.cloudbrain_samejob_err"))) return } tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName) if err == nil { if len(tasks) != 0 { log.Error("the job name did already exist", ctx.Data["MsgID"]) ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.cloudbrain_samejob_err"))) return } } else { if !models.IsErrJobNotExist(err) { log.Error("system error, %v", err, ctx.Data["MsgID"]) ctx.JSON(http.StatusOK, models.BaseErrorMessageApi("system error.")) return } } count, err := GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainOne, jobType) if err != nil { log.Error("GetCloudbrainCountByUserID failed:%v", err, ctx.Data["MsgID"]) ctx.JSON(http.StatusOK, models.BaseErrorMessageApi("system error.")) return } else { if count >= 1 { log.Error("the user already has running or waiting task", ctx.Data["MsgID"]) ctx.JSON(http.StatusOK, models.BaseMessageApi{ Code: 2, Message: ctx.Tr("repo.cloudbrain.morethanonejob"), }) return } } err = downloadCode(sourceRepo, getCodePath(jobName, sourceRepo), option.BranchName) if err != nil { log.Error("download code failed", err) ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("cloudbrain.load_code_failed"))) return } command := cloudbrain.GetCloudbrainDebugCommand() specId := setting.FileNoteBook.SpecIdGPU if option.Type == 0 { specId = setting.FileNoteBook.SpecIdCPU } spec, err := resource.GetAndCheckSpec(ctx.User.ID, specId, models.FindSpecsOptions{ JobType: models.JobType(jobType), ComputeResource: models.GPU, Cluster: models.OpenICluster, AiCenterCode: models.AICenterOfCloudBrainOne}) if err != nil || spec == nil { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("cloudbrain.wrong_specification"))) return } if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) { log.Error("point balance is not enough,userId=%d specId=%d", ctx.User.ID, spec.ID) ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("points.insufficient_points_balance"))) return } ctx.Repo = &context.Repository{ Repository: repo, } req := cloudbrain.GenerateCloudBrainTaskReq{ Ctx: ctx, DisplayJobName: displayJobName, JobName: jobName, Image: setting.FileNoteBook.ImageGPU, Command: command, Uuids: "", DatasetNames: "", DatasetInfos: nil, CodePath: storage.GetMinioPath(jobName, cloudbrain.CodeMountPath+"/"), ModelPath: storage.GetMinioPath(jobName, cloudbrain.ModelMountPath+"/"), BenchmarkPath: storage.GetMinioPath(jobName, cloudbrain.BenchMarkMountPath+"/"), Snn4ImageNetPath: storage.GetMinioPath(jobName, cloudbrain.Snn4imagenetMountPath+"/"), BrainScorePath: storage.GetMinioPath(jobName, cloudbrain.BrainScoreMountPath+"/"), JobType: jobType, Description: getDescription(option), BranchName: option.BranchName, BootFile: getBootFile(option.File, option.OwnerName, option.ProjectName), Params: "{\"parameter\":[]}", CommitID: "", BenchmarkTypeID: 0, BenchmarkChildTypeID: 0, ResultPath: storage.GetMinioPath(jobName, cloudbrain.ResultPath+"/"), Spec: spec, } jobId, err := cloudbrain.GenerateTask(req) if err != nil { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(err.Error())) return } ctx.JSON(http.StatusOK, models.BaseMessageApi{ Code: 0, Message: jobId, }) } func getCloudbrainType(optionType int) int { if optionType <= GPUType { return models.TypeCloudBrainOne } if setting.ModelartsCD.Enabled { return models.TypeCDCenter } return models.TypeCloudBrainTwo } func getCodePath(jobName string, repo *models.Repository) string { return setting.JobPath + jobName + cloudbrain.CodeMountPath + "/" + repo.OwnerName + "/" + repo.Name } func getDescription(option api.CreateFileNotebookJobOption) string { des := option.OwnerName + "/" + option.ProjectName + "/" + option.File if len(des) <= CharacterLength { return des } return "" } func modelartsFileNoteBookCreate(ctx *context.Context, option api.CreateFileNotebookJobOption, repo *models.Repository, sourceRepo *models.Repository) { displayJobName := cloudbrainService.GetDisplayJobName(ctx.User.Name) jobName := util.ConvertDisplayJobNameToJobName(displayJobName) lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeDebug), displayJobName)) isOk, err := lock.Lock(models.CloudbrainKeyDuration) if !isOk { log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.cloudbrain_samejob_err"))) return } defer lock.UnLock() count, err := GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainTwo, string(models.JobTypeDebug)) if err != nil { log.Error("GetCloudbrainNotebookCountByUserID failed:%v", err, ctx.Data["MsgID"]) ctx.JSON(http.StatusOK, models.BaseErrorMessageApi("system error.")) return } else { if count >= 1 { log.Error("the user already has running or waiting task", ctx.Data["MsgID"]) ctx.JSON(http.StatusOK, models.BaseMessageApi{ Code: 2, Message: ctx.Tr("repo.cloudbrain.morethanonejob"), }) return } } tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeDebug), displayJobName) if err == nil { if len(tasks) != 0 { log.Error("the job name did already exist", ctx.Data["MsgID"]) ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.cloudbrain_samejob_err"))) return } } else { if !models.IsErrJobNotExist(err) { log.Error("system error, %v", err, ctx.Data["MsgID"]) ctx.JSON(http.StatusOK, models.BaseErrorMessageApi("system error.")) return } } err = downloadCode(sourceRepo, getCodePath(jobName, sourceRepo), option.BranchName) if err != nil { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("cloudbrain.load_code_failed"))) return } var aiCenterCode = models.AICenterOfCloudBrainTwo var specId = setting.FileNoteBook.SpecIdNPU if setting.ModelartsCD.Enabled { aiCenterCode = models.AICenterOfChengdu specId = setting.FileNoteBook.SpecIdNPUCD } spec, err := resource.GetAndCheckSpec(ctx.User.ID, specId, models.FindSpecsOptions{ JobType: models.JobTypeDebug, ComputeResource: models.NPU, Cluster: models.OpenICluster, AiCenterCode: aiCenterCode}) if err != nil || spec == nil { ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("cloudbrain.wrong_specification"))) return } if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) { log.Error("point balance is not enough,userId=%d specId=%d ", ctx.User.ID, spec.ID) ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("points.insufficient_points_balance"))) return } ctx.Repo = &context.Repository{ Repository: repo, } var jobId string req := cloudbrain.GenerateModelArtsNotebookReq{ DisplayJobName: displayJobName, JobName: jobName, Description: getDescription(option), ImageId: setting.FileNoteBook.ImageIdNPU, Spec: spec, BootFile: getBootFile(option.File, option.OwnerName, option.ProjectName), AutoStopDurationMs: modelarts.AutoStopDurationMs / 4, BranchName: option.BranchName, } if setting.ModelartsCD.Enabled { req.ImageId = setting.FileNoteBook.ImageIdNPUCD jobId, err = modelarts_cd.GenerateNotebook(ctx, req) } else { jobId, err = modelarts.GenerateNotebook2(ctx, req) } if err != nil { log.Error("GenerateNotebook2 failed, %v", err, ctx.Data["MsgID"]) ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(err.Error())) return } ctx.JSON(http.StatusOK, models.BaseMessageApi{ Code: 0, Message: jobId, }) } func isNoteBookFileExist(ctx *context.Context, option api.CreateFileNotebookJobOption) (bool, error) { repoPathOfNoteBook := models.RepoPath(option.OwnerName, option.ProjectName) gitRepoOfNoteBook, err := git.OpenRepository(repoPathOfNoteBook) if err != nil { log.Error("RepoRef Invalid repo "+repoPathOfNoteBook, err.Error()) return false, err } // We opened it, we should close it defer func() { // If it's been set to nil then assume someone else has closed it. if gitRepoOfNoteBook != nil { gitRepoOfNoteBook.Close() } }() fileExist, err := fileExists(gitRepoOfNoteBook, option.File, option.BranchName) if err != nil || !fileExist { log.Error("Get file error:", err, ctx.Data["MsgID"]) return false, err } return true, nil } func getBootFile(filePath string, ownerName string, projectName string) string { return ownerName + "/" + projectName + "/" + filePath } func fileExists(gitRepo *git.Repository, path string, branch string) (bool, error) { commit, err := gitRepo.GetBranchCommit(branch) if err != nil { return false, err } if _, err := commit.GetTreeEntryByPath(path); err != nil { return false, err } return true, nil }