|
- package cloudbrainTask
-
- import (
- "fmt"
- "net/http"
- "path"
-
- "code.gitea.io/gitea/modules/modelarts"
- "code.gitea.io/gitea/modules/modelarts_cd"
-
- "code.gitea.io/gitea/modules/git"
-
- "code.gitea.io/gitea/modules/cloudbrain"
- "code.gitea.io/gitea/modules/log"
- "code.gitea.io/gitea/modules/redis/redis_key"
- "code.gitea.io/gitea/modules/redis/redis_lock"
- "code.gitea.io/gitea/modules/storage"
- "code.gitea.io/gitea/services/cloudbrain/resource"
- "code.gitea.io/gitea/services/reward/point/account"
-
- "code.gitea.io/gitea/modules/setting"
- cloudbrainService "code.gitea.io/gitea/services/cloudbrain"
- repo_service "code.gitea.io/gitea/services/repository"
-
- "code.gitea.io/gitea/models"
- "code.gitea.io/gitea/modules/context"
- api "code.gitea.io/gitea/modules/structs"
- "code.gitea.io/gitea/modules/util"
- )
-
const (
	// NoteBookExtension is the file extension FileNotebookCreate requires of
	// the file it is asked to open as a notebook; any other extension is
	// rejected with "repo.notebook_select_wrong".
	NoteBookExtension = ".ipynb"
)
-
- func FileNotebookCreate(ctx *context.Context, option api.CreateFileNotebookJobOption) {
-
- if ctx.Written() {
- return
- }
-
- if path.Ext(option.File) != NoteBookExtension {
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_select_wrong")))
- return
- }
-
- isNotebookFileExist, _ := isNoteBookFileExist(ctx, option)
- if !isNotebookFileExist {
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_file_not_exist")))
- return
- }
-
- sourceRepo, err := models.GetRepositoryByOwnerAndName(option.OwnerName, option.ProjectName)
- if err != nil {
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_file_not_exist")))
- return
- }
-
- permission, err := models.GetUserRepoPermission(sourceRepo, ctx.User)
- if err != nil {
- log.Error("Get permission failed", err)
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_file_no_right")))
- return
- }
-
- if !permission.CanRead(models.UnitTypeCode) {
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_file_no_right")))
- return
- }
-
- //create repo if not exist
- repo, err := models.GetRepositoryByName(ctx.User.ID, setting.FileNoteBook.ProjectName)
- if repo == nil {
- repo, err = repo_service.CreateRepository(ctx.User, ctx.User, models.CreateRepoOptions{
- Name: setting.FileNoteBook.ProjectName,
- Alias: "",
- Description: "",
- IssueLabels: "",
- Gitignores: "",
- License: "",
- Readme: "Default",
- IsPrivate: false,
- AutoInit: true,
- DefaultBranch: "master",
- })
- }
- if err != nil {
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.failed_to_create_notebook_repo",setting.FileNoteBook.ProjectName)))
- return
- }
- if option.Type <= 1 {
- cloudBrainFileNoteBookCreate(ctx, option, repo, sourceRepo)
- } else {
- modelartsFileNoteBookCreate(ctx, option, repo, sourceRepo)
- }
-
- }
-
- func cloudBrainFileNoteBookCreate(ctx *context.Context, option api.CreateFileNotebookJobOption, repo *models.Repository, sourceRepo *models.Repository) {
-
- displayJobName := cloudbrainService.GetDisplayJobName(ctx.User.Name)
- jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
- jobType := string(models.JobTypeDebug)
-
- lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName))
- defer lock.UnLock()
- isOk, err := lock.Lock(models.CloudbrainKeyDuration)
- if !isOk {
- log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.cloudbrain_samejob_err")))
- return
- }
-
- tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName)
- if err == nil {
- if len(tasks) != 0 {
- log.Error("the job name did already exist", ctx.Data["MsgID"])
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.cloudbrain_samejob_err")))
- return
- }
- } else {
- if !models.IsErrJobNotExist(err) {
- log.Error("system error, %v", err, ctx.Data["MsgID"])
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi("system error."))
- return
- }
- }
-
- count, err := GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainOne, jobType)
- if err != nil {
- log.Error("GetCloudbrainCountByUserID failed:%v", err, ctx.Data["MsgID"])
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi("system error."))
- return
- } else {
- if count >= 1 {
- log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
- ctx.JSON(http.StatusOK,models.BaseMessageApi{
- Code: 2,
- Message: ctx.Tr("repo.cloudbrain.morethanonejob"),
- })
- return
- }
- }
-
- errStr := uploadCodeFile(sourceRepo, getCodePath(jobName), option.BranchName, option.File, jobName)
- if errStr != "" {
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.notebook_file_not_exist")))
- return
- }
- command := cloudbrain.GetCloudbrainDebugCommand()
- specId := setting.FileNoteBook.SpecIdGPU
- if option.Type == 0 {
- specId = setting.FileNoteBook.SpecIdCPU
- }
- spec, err := resource.GetAndCheckSpec(ctx.User.ID, specId, models.FindSpecsOptions{
- JobType: models.JobType(jobType),
- ComputeResource: models.GPU,
- Cluster: models.OpenICluster,
- AiCenterCode: models.AICenterOfCloudBrainOne})
- if err != nil || spec == nil {
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("cloudbrain.wrong_specification")))
- return
- }
-
- if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) {
- log.Error("point balance is not enough,userId=%d specId=%d", ctx.User.ID, spec.ID)
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("points.insufficient_points_balance")))
- return
- }
- ctx.Repo = &context.Repository{
- Repository: repo,
- }
-
- req := cloudbrain.GenerateCloudBrainTaskReq{
- Ctx: ctx,
- DisplayJobName: displayJobName,
- JobName: jobName,
- Image: setting.FileNoteBook.ImageGPU,
- Command: command,
- Uuids: "",
- DatasetNames: "",
- DatasetInfos: nil,
- CodePath: storage.GetMinioPath(jobName, cloudbrain.CodeMountPath+"/"),
- ModelPath: storage.GetMinioPath(jobName, cloudbrain.ModelMountPath+"/"),
- BenchmarkPath: storage.GetMinioPath(jobName, cloudbrain.BenchMarkMountPath+"/"),
- Snn4ImageNetPath: storage.GetMinioPath(jobName, cloudbrain.Snn4imagenetMountPath+"/"),
- BrainScorePath: storage.GetMinioPath(jobName, cloudbrain.BrainScoreMountPath+"/"),
- JobType: jobType,
- Description: getDescription(option),
- BranchName: option.BranchName,
- BootFile: option.File,
- Params: "{\"parameter\":[]}",
- CommitID: "",
- BenchmarkTypeID: 0,
- BenchmarkChildTypeID: 0,
- ResultPath: storage.GetMinioPath(jobName, cloudbrain.ResultPath+"/"),
- Spec: spec,
- }
-
- jobId, err := cloudbrain.GenerateTask(req)
- if err != nil {
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(err.Error()))
- return
- }
- ctx.JSON(http.StatusOK, models.BaseMessageApi{
- Code: 0,
- Message: jobId,
- })
-
- }
-
- func getCodePath(jobName string) string {
- return setting.JobPath + jobName + cloudbrain.CodeMountPath
- }
-
- func getDescription(option api.CreateFileNotebookJobOption) string {
- return option.OwnerName + "/" + option.ProjectName + "/" + option.File
- }
-
- func modelartsFileNoteBookCreate(ctx *context.Context, option api.CreateFileNotebookJobOption, repo *models.Repository, sourceRepo *models.Repository) {
- displayJobName := cloudbrainService.GetDisplayJobName(ctx.User.Name)
- jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
-
- lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeDebug), displayJobName))
- isOk, err := lock.Lock(models.CloudbrainKeyDuration)
- if !isOk {
- log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.cloudbrain_samejob_err")))
- return
- }
- defer lock.UnLock()
-
- count, err := GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainTwo, string(models.JobTypeDebug))
-
- if err != nil {
- log.Error("GetCloudbrainNotebookCountByUserID failed:%v", err, ctx.Data["MsgID"])
-
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi("system error."))
- return
- } else {
- if count >= 1 {
- log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
- ctx.JSON(http.StatusOK,models.BaseMessageApi{
- Code: 2,
- Message: ctx.Tr("repo.cloudbrain.morethanonejob"),
- })
- return
- }
- }
-
- tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeDebug), displayJobName)
- if err == nil {
- if len(tasks) != 0 {
- log.Error("the job name did already exist", ctx.Data["MsgID"])
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("repo.cloudbrain_samejob_err")))
- return
- }
- } else {
- if !models.IsErrJobNotExist(err) {
- log.Error("system error, %v", err, ctx.Data["MsgID"])
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi("system error."))
- return
- }
- }
-
- err = downloadCode(sourceRepo, getCodePath(jobName), option.BranchName)
- if err != nil {
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("cloudbrain.load_code_failed")))
- return
- }
-
- var aiCenterCode = models.AICenterOfCloudBrainTwo
- var specId = setting.FileNoteBook.SpecIdNPU
- if setting.ModelartsCD.Enabled {
- aiCenterCode = models.AICenterOfChengdu
- specId = setting.FileNoteBook.SpecIdNPUCD
- }
- spec, err := resource.GetAndCheckSpec(ctx.User.ID, specId, models.FindSpecsOptions{
- JobType: models.JobTypeDebug,
- ComputeResource: models.NPU,
- Cluster: models.OpenICluster,
- AiCenterCode: aiCenterCode})
- if err != nil || spec == nil {
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("cloudbrain.wrong_specification")))
- return
- }
- if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) {
- log.Error("point balance is not enough,userId=%d specId=%d ", ctx.User.ID, spec.ID)
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("points.insufficient_points_balance")))
- return
- }
- ctx.Repo = &context.Repository{
- Repository: repo,
- }
-
- var jobId string
- if setting.ModelartsCD.Enabled {
- jobId, err = modelarts_cd.GenerateNotebook(ctx, displayJobName, jobName, "", getDescription(option), setting.FileNoteBook.ImageIdNPUCD, spec, option.File,modelarts.AutoStopDurationMs/4)
- } else {
- jobId, err = modelarts.GenerateNotebook2(ctx, displayJobName, jobName, "", getDescription(option), setting.FileNoteBook.ImageIdNPU, spec, option.File,modelarts.AutoStopDurationMs/4)
- }
-
- if err != nil {
- log.Error("GenerateNotebook2 failed, %v", err, ctx.Data["MsgID"])
-
- ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(err.Error()))
-
- return
- }
-
- ctx.JSON(http.StatusOK, models.BaseMessageApi{
- Code: 0,
- Message: jobId,
- })
-
- }
-
- func isNoteBookFileExist(ctx *context.Context, option api.CreateFileNotebookJobOption) (bool, error) {
- repoPathOfNoteBook := models.RepoPath(option.OwnerName, option.ProjectName)
-
- gitRepoOfNoteBook, err := git.OpenRepository(repoPathOfNoteBook)
- if err != nil {
- log.Error("RepoRef Invalid repo "+repoPathOfNoteBook, err.Error())
- return false, err
- }
- // We opened it, we should close it
- defer func() {
- // If it's been set to nil then assume someone else has closed it.
- if gitRepoOfNoteBook != nil {
- gitRepoOfNoteBook.Close()
- }
- }()
- fileExist, err := fileExists(gitRepoOfNoteBook, option.File, option.BranchName)
- if err != nil || !fileExist {
- log.Error("Get file error:", err, ctx.Data["MsgID"])
-
- return false, err
- }
- return true, nil
- }
-
- func uploadCodeFile(repo *models.Repository, codePath string, branchName string, filePath string, jobName string) string {
- err := downloadCode(repo, codePath, branchName)
- if err != nil {
- return "cloudbrain.load_code_failed"
- }
-
- err = uploadOneFileToMinio(codePath, filePath, jobName, cloudbrain.CodeMountPath+"/")
- if err != nil {
- return "cloudbrain.load_code_failed"
- }
- return ""
- }
-
- func fileExists(gitRepo *git.Repository, path string, branch string) (bool, error) {
-
- commit, err := gitRepo.GetBranchCommit(branch)
- if err != nil {
- return false, err
- }
- if _, err := commit.GetTreeEntryByPath(path); err != nil {
- return false, err
- }
- return true, nil
- }
|