// Package cloudbrain builds, restarts and deletes cloudbrain jobs and provides
// the permission checks used by the routes that operate on them.
package cloudbrain

import (
	"encoding/json"
	"errors"
	"os"
	"strconv"

	"code.gitea.io/gitea/models"
	"code.gitea.io/gitea/modules/context"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/notification"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/storage"
	"code.gitea.io/gitea/modules/timeutil"
)

const (
	// Command is the start command used by RestartTask: it installs
	// jupyterlab 2.2.5, stops the ssh service and serves JupyterLab on port 80
	// with /code as the notebook directory.
	Command = `pip3 install jupyterlab==2.2.5 -i https://pypi.tuna.tsinghua.edu.cn/simple;service ssh stop;jupyter lab --no-browser --ip=0.0.0.0 --allow-root --notebook-dir="/code" --port=80 --LabApp.token="" --LabApp.allow_origin="self https://cloudbrain.pcl.ac.cn"`

	// CommandBenchmark runs run_bk.sh from the /benchmark directory.
	//CommandBenchmark = `echo "start benchmark";python /code/test.py;echo "end benchmark"`
	CommandBenchmark = `echo "start benchmark";cd /benchmark && bash run_bk.sh;echo "end benchmark"`

	// Mount points and well-known names used when building job volumes.
	CodeMountPath         = "/code"
	DataSetMountPath      = "/dataset"
	ModelMountPath        = "/model"
	LogFile               = "log.txt"
	BenchMarkMountPath    = "/benchmark"
	BenchMarkResourceID   = 1
	Snn4imagenetMountPath = "/snn4imagenet"
	BrainScoreMountPath   = "/brainscore"
	TaskInfoName          = "/taskInfo"

	// Command templates; the %s placeholders are filled in by callers.
	Snn4imagenetCommand = `/opt/conda/bin/python /snn4imagenet/testSNN_script.py --modelname '%s' --modelpath '/dataset' --modeldescription '%s'`
	BrainScoreCommand   = `bash /brainscore/brainscore_test_par4shSrcipt.sh -b '%s' -n '%s' -p '/dataset' -d '%s'`

	// SubTaskName is the name of the single task role of every job created
	// here; Success is the result code CreateJob reports for an accepted job.
	SubTaskName       = "task1"
	Success           = "S000"
	DefaultBranchName = "master"
	ResultPath        = "/result"
)

// ResourceSpecs and TrainResourceSpecs cache the resource specifications
// decoded from setting.ResourceSpecs and setting.TrainResourceSpecs. They are
// filled in on first use by GenerateTask and RestartTask.
var (
	ResourceSpecs      *models.ResourceSpecs
	TrainResourceSpecs *models.ResourceSpecs
)

// GenerateCloudBrainTaskReq carries everything GenerateTask needs to create a
// job and to record it as a models.Cloudbrain row.
type GenerateCloudBrainTaskReq struct {
	Ctx                  *context.Context
	DisplayJobName       string
	JobName              string
	Image                string
	Command              string
	CodePath             string
	ModelPath            string
	BenchmarkPath        string
	Snn4ImageNetPath     string
	BrainScorePath       string
	JobType              string
	GpuQueue             string
	Description          string
	BranchName           string
	BootFile             string
	Params               string
	CommitID             string
	Uuids                string
	DatasetNames         string
	DatasetInfos         map[string]models.DatasetInfo
	BenchmarkTypeID      int
	BenchmarkChildTypeID int
	ResourceSpecId       int
	ResultPath           string
	TrainUrl             string
	ModelName            string
	ModelVersion         string
	CkptName             string
}

// isAdminOrOwnerOrJobCreater reports whether the signed-in user is the
// repository owner, a site admin or the creator of job. When err is non-nil
// (the job could not be loaded) or job is nil, only the owner and admin checks
// apply.
func isAdminOrOwnerOrJobCreater(ctx *context.Context, job *models.Cloudbrain, err error) bool {
	if !ctx.IsSigned {
		return false
	}
	if ctx.IsUserRepoOwner() || ctx.IsUserSiteAdmin() {
		return true
	}
	return err == nil && job != nil && ctx.User.ID == job.UserID
}

// CanDeleteJob reports whether the current user may delete job: repository
// owner, site admin or the job's creator.
func CanDeleteJob(ctx *context.Context, job *models.Cloudbrain) bool {
	return isAdminOrOwnerOrJobCreater(ctx, job, nil)
}

// CanCreateOrDebugJob reports whether the current user is signed in and has
// write access to the repository's cloudbrain unit.
func CanCreateOrDebugJob(ctx *context.Context) bool {
	if !ctx.IsSigned {
		return false
	}
	return ctx.Repo.CanWrite(models.UnitTypeCloudBrain)
}

// CanModifyJob reports whether the current user may modify job: site admin or
// the job's creator.
func CanModifyJob(ctx *context.Context, job *models.Cloudbrain) bool {
	return isAdminOrJobCreater(ctx, job, nil)
}

// isAdminOrJobCreater reports whether the signed-in user is a site admin or
// the creator of job. When err is non-nil or job is nil, only the admin check
// applies.
func isAdminOrJobCreater(ctx *context.Context, job *models.Cloudbrain, err error) bool {
	if !ctx.IsSigned {
		return false
	}
	if ctx.IsUserSiteAdmin() {
		return true
	}
	return err == nil && job != nil && ctx.User.ID == job.UserID
}

// isAdminOrImageCreater reports whether the signed-in user is a site admin or
// the creator of image. When err is non-nil or image is nil, only the admin
// check applies.
func isAdminOrImageCreater(ctx *context.Context, image *models.Image, err error) bool {
	if !ctx.IsSigned {
		return false
	}
	if ctx.IsUserSiteAdmin() {
		return true
	}
	return err == nil && image != nil && ctx.User.ID == image.UID
}

// AdminOrOwnerOrJobCreaterRight loads the cloudbrain job named by the ":id"
// route parameter into ctx.Cloudbrain and answers NotFound unless the current
// user is the repository owner, a site admin or the job's creator.
func AdminOrOwnerOrJobCreaterRight(ctx *context.Context) {
	id := ctx.Params(":id")
	job, err := models.GetCloudbrainByID(id)
	if err != nil {
		log.Error("GetCloudbrainByID failed:%v", err.Error())
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
		// Stop here: the response is already written and there is no job
		// to check against.
		return
	}

	ctx.Cloudbrain = job
	if !isAdminOrOwnerOrJobCreater(ctx, job, nil) {
		// err is nil on this path, so it must not be dereferenced.
		log.Error("user is not admin, owner or creator of cloudbrain task(%s)", id)
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
	}
}

// AdminOrJobCreaterRight loads the cloudbrain job named by the ":id" route
// parameter into ctx.Cloudbrain and answers NotFound unless the current user
// is a site admin or the job's creator.
func AdminOrJobCreaterRight(ctx *context.Context) {
	id := ctx.Params(":id")
	job, err := models.GetCloudbrainByID(id)
	if err != nil {
		log.Error("GetCloudbrainByID failed:%v", err.Error())
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
		return
	}

	ctx.Cloudbrain = job
	if !isAdminOrJobCreater(ctx, job, nil) {
		// err is nil on this path, so it must not be dereferenced.
		log.Error("user is not admin or creator of cloudbrain task(%s)", id)
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
	}
}

// AdminOrOwnerOrJobCreaterRightForTrain is the ":jobid" variant of
// AdminOrOwnerOrJobCreaterRight: the job is looked up by its job ID.
func AdminOrOwnerOrJobCreaterRightForTrain(ctx *context.Context) {
	jobID := ctx.Params(":jobid")
	job, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		log.Error("GetCloudbrainByJobID failed:%v", err.Error())
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
		return
	}

	ctx.Cloudbrain = job
	if !isAdminOrOwnerOrJobCreater(ctx, job, nil) {
		// err is nil on this path, so it must not be dereferenced.
		log.Error("user is not admin, owner or creator of cloudbrain job(%s)", jobID)
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
	}
}

// AdminOrJobCreaterRightForTrain is the ":jobid" variant of
// AdminOrJobCreaterRight: the job is looked up by its job ID.
func AdminOrJobCreaterRightForTrain(ctx *context.Context) {
	jobID := ctx.Params(":jobid")
	job, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		log.Error("GetCloudbrainByJobID failed:%v", err.Error())
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
		return
	}

	ctx.Cloudbrain = job
	if !isAdminOrJobCreater(ctx, job, nil) {
		// err is nil on this path, so it must not be dereferenced.
		log.Error("user is not admin or creator of cloudbrain job(%s)", jobID)
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
	}
}

// AdminOrImageCreaterRight answers NotFound unless the current user is a site
// admin or the creator of the image named by the ":id" route parameter. When
// the parameter is not a valid integer only site admins pass.
func AdminOrImageCreaterRight(ctx *context.Context) {
	var image *models.Image

	id, err := strconv.ParseInt(ctx.Params(":id"), 10, 64)
	if err != nil {
		log.Error("Get Image by ID failed:%v", err.Error())
	} else {
		image, err = models.GetImageByID(id)
		if err != nil {
			log.Error("Get Image by ID failed:%v", err.Error())
			// NOTE(review): this returns without writing a response, so the
			// request continues to the next handler when the image cannot
			// be loaded. Kept as in the original; confirm this is intended.
			return
		}
	}

	if !isAdminOrImageCreater(ctx, image, err) {
		if err != nil {
			log.Error("!isAdminOrImageCreater error:%v", err.Error())
		} else {
			// err is nil when the image was loaded but the user is neither
			// admin nor its creator.
			log.Error("user is not admin or creator of image(%d)", id)
		}
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
	}
}

// lookupResourceSpec returns the entry of *cache whose Id equals id, or nil
// when there is none. If *cache is nil it is first decoded from raw; a decode
// error is logged and the lookup then proceeds on whatever was decoded. When
// several entries share the id, the last one wins.
func lookupResourceSpec(cache **models.ResourceSpecs, raw []byte, id int) *models.ResourceSpec {
	if *cache == nil {
		if err := json.Unmarshal(raw, cache); err != nil {
			log.Error("unmarshal resource specs failed:%v", err)
		}
	}
	if *cache == nil {
		return nil
	}

	var found *models.ResourceSpec
	for _, spec := range (*cache).ResourceSpec {
		if spec != nil && spec.Id == id {
			found = spec
		}
	}
	return found
}

// GenerateTask creates a job from req through CreateJob, stores it as a
// models.Cloudbrain row and sends the matching "task created" notification.
//
// It returns an error when req.ResourceSpecId is unknown, when CreateJob fails
// or reports a non-success code, when the result carries no string "jobId",
// or when the row cannot be stored or read back.
func GenerateTask(req GenerateCloudBrainTaskReq) error {
	var (
		resourceSpec *models.ResourceSpec
		versionCount int
	)
	// Train and inference jobs use their own spec table and start at version 1.
	if req.JobType == string(models.JobTypeTrain) || req.JobType == string(models.JobTypeInference) {
		versionCount = 1
		resourceSpec = lookupResourceSpec(&TrainResourceSpecs, []byte(setting.TrainResourceSpecs), req.ResourceSpecId)
	} else {
		resourceSpec = lookupResourceSpec(&ResourceSpecs, []byte(setting.ResourceSpecs), req.ResourceSpecId)
	}
	if resourceSpec == nil {
		log.Error("no such resourceSpecId(%d), msgID:%v", req.ResourceSpecId, req.Ctx.Data["MsgID"])
		return errors.New("no such resourceSpec")
	}

	volumes := []models.Volume{
		{
			HostPath: models.StHostPath{
				Path:      req.CodePath,
				MountPath: CodeMountPath,
				ReadOnly:  false,
			},
		},
		{
			HostPath: models.StHostPath{
				Path:      req.BenchmarkPath,
				MountPath: BenchMarkMountPath,
				ReadOnly:  true,
			},
		},
		{
			HostPath: models.StHostPath{
				Path:      req.Snn4ImageNetPath,
				MountPath: Snn4imagenetMountPath,
				ReadOnly:  true,
			},
		},
		{
			HostPath: models.StHostPath{
				Path:      req.BrainScorePath,
				MountPath: BrainScoreMountPath,
				ReadOnly:  true,
			},
		},
		{
			HostPath: models.StHostPath{
				Path:      req.ResultPath,
				MountPath: ResultPath,
				ReadOnly:  false,
			},
		},
	}

	// A single dataset is mounted directly at DataSetMountPath; otherwise each
	// dataset gets its own sub-directory named after it.
	if len(req.DatasetInfos) == 1 {
		volumes = append(volumes, models.Volume{
			HostPath: models.StHostPath{
				Path:      req.DatasetInfos[req.Uuids].DataLocalPath,
				MountPath: DataSetMountPath,
				ReadOnly:  true,
			},
		})
	} else {
		for _, dataset := range req.DatasetInfos {
			volumes = append(volumes, models.Volume{
				HostPath: models.StHostPath{
					Path:      dataset.DataLocalPath,
					MountPath: DataSetMountPath + "/" + dataset.Name,
					ReadOnly:  true,
				},
			})
		}
	}

	createTime := timeutil.TimeStampNow()
	jobResult, err := CreateJob(req.JobName, models.CreateJobParams{
		JobName:    req.JobName,
		RetryCount: 1,
		GpuType:    req.GpuQueue,
		Image:      req.Image,
		TaskRoles: []models.TaskRole{
			{
				Name:                  SubTaskName,
				TaskNumber:            1,
				MinSucceededTaskCount: 1,
				MinFailedTaskCount:    1,
				CPUNumber:             resourceSpec.CpuNum,
				GPUNumber:             resourceSpec.GpuNum,
				MemoryMB:              resourceSpec.MemMiB,
				ShmMB:                 resourceSpec.ShareMemMiB,
				Command:               req.Command,
				NeedIBDevice:          false,
				IsMainRole:            false,
				UseNNI:                false,
			},
		},
		Volumes: volumes,
	})
	if err != nil {
		log.Error("CreateJob failed:%v, msgID:%v", err.Error(), req.Ctx.Data["MsgID"])
		return err
	}
	if jobResult.Code != Success {
		log.Error("CreateJob(%s) failed:%s, msgID:%v", req.JobName, jobResult.Msg, req.Ctx.Data["MsgID"])
		return errors.New(jobResult.Msg)
	}

	// Checked assertion: a missing or non-string jobId must not panic.
	jobID, ok := jobResult.Payload["jobId"].(string)
	if !ok {
		log.Error("CreateJob(%s) returned no jobId, msgID:%v", req.JobName, req.Ctx.Data["MsgID"])
		return errors.New("no jobId in CreateJob result")
	}

	err = models.CreateCloudbrain(&models.Cloudbrain{
		Status:               string(models.JobWaiting),
		UserID:               req.Ctx.User.ID,
		RepoID:               req.Ctx.Repo.Repository.ID,
		JobID:                jobID,
		JobName:              req.JobName,
		DisplayJobName:       req.DisplayJobName,
		SubTaskName:          SubTaskName,
		JobType:              req.JobType,
		Type:                 models.TypeCloudBrainOne,
		Uuid:                 req.Uuids,
		Image:                req.Image,
		GpuQueue:             req.GpuQueue,
		ResourceSpecId:       req.ResourceSpecId,
		ComputeResource:      models.GPUResource,
		BenchmarkTypeID:      req.BenchmarkTypeID,
		BenchmarkChildTypeID: req.BenchmarkChildTypeID,
		Description:          req.Description,
		IsLatestVersion:      "1",
		VersionCount:         versionCount,
		BranchName:           req.BranchName,
		BootFile:             req.BootFile,
		DatasetName:          req.DatasetNames,
		Parameters:           req.Params,
		TrainUrl:             req.TrainUrl,
		ModelName:            req.ModelName,
		ModelVersion:         req.ModelVersion,
		CkptName:             req.CkptName,
		ResultUrl:            req.ResultPath,
		CreatedUnix:          createTime,
		UpdatedUnix:          createTime,
		CommitID:             req.CommitID,
	})
	if err != nil {
		return err
	}

	// Read the row back to obtain its database ID for the notification.
	task, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		log.Error("GetCloudbrainByJobID failed: %v", err.Error())
		return err
	}
	taskID := strconv.FormatInt(task.ID, 10)

	// Benchmark and debug notifications carry the database ID; train and
	// inference notifications carry the job ID.
	switch {
	case IsBenchmarkJob(req.JobType):
		notification.NotifyOtherTask(req.Ctx.User, req.Ctx.Repo.Repository, taskID, req.DisplayJobName, models.ActionCreateBenchMarkTask)
	case req.JobType == string(models.JobTypeTrain):
		notification.NotifyOtherTask(req.Ctx.User, req.Ctx.Repo.Repository, jobID, req.DisplayJobName, models.ActionCreateGPUTrainTask)
	case req.JobType == string(models.JobTypeInference):
		notification.NotifyOtherTask(req.Ctx.User, req.Ctx.Repo.Repository, jobID, req.DisplayJobName, models.ActionCreateInferenceTask)
	default:
		notification.NotifyOtherTask(req.Ctx.User, req.Ctx.Repo.Repository, taskID, req.DisplayJobName, models.ActionCreateDebugGPUTask)
	}

	return nil
}

// IsBenchmarkJob reports whether jobType is one of the benchmark job types
// (benchmark, brainscore or snn4imagenet).
func IsBenchmarkJob(jobType string) bool {
	switch jobType {
	case string(models.JobTypeBenchmark),
		string(models.JobTypeBrainScore),
		string(models.JobTypeSnn4imagenet):
		return true
	}
	return false
}

// GetWaitingCloudbrainCount returns the count reported by
// models.GetWaitingCloudbrainCount for the given filters, or 0 (after logging
// a warning) when that query fails.
func GetWaitingCloudbrainCount(cloudbrainType int, computeResource string, jobTypes ...models.JobType) int64 {
	num, err := models.GetWaitingCloudbrainCount(cloudbrainType, computeResource, jobTypes...)
	if err != nil {
		log.Warn("Get waiting count err:%v", err)
		return 0
	}
	return num
}

// RestartTask submits a new job with the same name, image, queue and resource
// spec as task, running Command, and records it through
// models.RestartCloudbrain. On success the new row's ID is written to *newID
// (when newID is non-nil) and a debug-task notification is sent.
func RestartTask(ctx *context.Context, task *models.Cloudbrain, newID *string) error {
	jobName := task.JobName

	resourceSpec := lookupResourceSpec(&ResourceSpecs, []byte(setting.ResourceSpecs), task.ResourceSpecId)
	if resourceSpec == nil {
		log.Error("no such resourceSpecId(%d), msgID:%v", task.ResourceSpecId, ctx.Data["MsgID"])
		return errors.New("no such resourceSpec")
	}

	datasetInfos, _, err := models.GetDatasetInfo(task.Uuid)
	if err != nil {
		log.Error("GetDatasetInfo failed:%v, msgID:%v", err, ctx.Data["MsgID"])
		return err
	}

	volumes := []models.Volume{
		{
			HostPath: models.StHostPath{
				Path:      storage.GetMinioPath(jobName, CodeMountPath+"/"),
				MountPath: CodeMountPath,
				ReadOnly:  false,
			},
		},
		{
			HostPath: models.StHostPath{
				Path:      storage.GetMinioPath(jobName, ModelMountPath+"/"),
				MountPath: ModelMountPath,
				ReadOnly:  false,
			},
		},
		{
			HostPath: models.StHostPath{
				Path:      storage.GetMinioPath(jobName, BenchMarkMountPath+"/"),
				MountPath: BenchMarkMountPath,
				ReadOnly:  true,
			},
		},
		{
			HostPath: models.StHostPath{
				Path:      storage.GetMinioPath(jobName, Snn4imagenetMountPath+"/"),
				MountPath: Snn4imagenetMountPath,
				ReadOnly:  true,
			},
		},
		{
			HostPath: models.StHostPath{
				Path:      storage.GetMinioPath(jobName, BrainScoreMountPath+"/"),
				MountPath: BrainScoreMountPath,
				ReadOnly:  true,
			},
		},
	}

	// A single dataset is mounted directly at DataSetMountPath; otherwise each
	// dataset gets its own sub-directory named after it.
	if len(datasetInfos) == 1 {
		volumes = append(volumes, models.Volume{
			HostPath: models.StHostPath{
				Path:      datasetInfos[task.Uuid].DataLocalPath,
				MountPath: DataSetMountPath,
				ReadOnly:  true,
			},
		})
	} else {
		for _, dataset := range datasetInfos {
			volumes = append(volumes, models.Volume{
				HostPath: models.StHostPath{
					Path:      dataset.DataLocalPath,
					MountPath: DataSetMountPath + "/" + dataset.Name,
					ReadOnly:  true,
				},
			})
		}
	}

	createTime := timeutil.TimeStampNow()
	jobResult, err := CreateJob(jobName, models.CreateJobParams{
		JobName:    jobName,
		RetryCount: 1,
		GpuType:    task.GpuQueue,
		Image:      task.Image,
		TaskRoles: []models.TaskRole{
			{
				Name:                  SubTaskName,
				TaskNumber:            1,
				MinSucceededTaskCount: 1,
				MinFailedTaskCount:    1,
				CPUNumber:             resourceSpec.CpuNum,
				GPUNumber:             resourceSpec.GpuNum,
				MemoryMB:              resourceSpec.MemMiB,
				ShmMB:                 resourceSpec.ShareMemMiB,
				Command:               Command,
				NeedIBDevice:          false,
				IsMainRole:            false,
				UseNNI:                false,
			},
		},
		Volumes: volumes,
	})
	if err != nil {
		log.Error("CreateJob failed:%v, msgID:%v", err.Error(), ctx.Data["MsgID"])
		return err
	}
	if jobResult.Code != Success {
		log.Error("CreateJob(%s) failed:%s, msgID:%v", jobName, jobResult.Msg, ctx.Data["MsgID"])
		return errors.New(jobResult.Msg)
	}

	// Checked assertion: a missing or non-string jobId must not panic.
	jobID, ok := jobResult.Payload["jobId"].(string)
	if !ok {
		log.Error("CreateJob(%s) returned no jobId, msgID:%v", jobName, ctx.Data["MsgID"])
		return errors.New("no jobId in CreateJob result")
	}

	newTask := &models.Cloudbrain{
		Status:          string(models.JobWaiting),
		UserID:          task.UserID,
		RepoID:          task.RepoID,
		JobID:           jobID,
		JobName:         task.JobName,
		DisplayJobName:  task.DisplayJobName,
		SubTaskName:     task.SubTaskName,
		JobType:         task.JobType,
		Type:            task.Type,
		Uuid:            task.Uuid,
		DatasetName:     task.DatasetName,
		Image:           task.Image,
		GpuQueue:        task.GpuQueue,
		ResourceSpecId:  task.ResourceSpecId,
		ComputeResource: task.ComputeResource,
		CreatedUnix:     createTime,
		UpdatedUnix:     createTime,
		BranchName:      task.BranchName,
	}

	err = models.RestartCloudbrain(task, newTask)
	if err != nil {
		log.Error("RestartCloudbrain(%s) failed:%v, msgID:%v", jobName, err.Error(), ctx.Data["MsgID"])
		return err
	}

	taskID := strconv.FormatInt(newTask.ID, 10)
	if newID != nil {
		*newID = taskID
	}

	notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, taskID, task.DisplayJobName, models.ActionCreateDebugGPUTask)

	return nil
}

// DelCloudBrainJob deletes the cloudbrain job with the given job ID together
// with its stored files. It returns "" on success, "cloudbrain.Delete_failed"
// when the job cannot be loaded or deleted, and "cloudbrain.Not_Stopped" when
// the job is not in the stopped, failed or succeeded state.
func DelCloudBrainJob(jobId string) string {
	task, err := models.GetCloudbrainByJobID(jobId)
	if err != nil {
		log.Error("get cloud brain err:%v", err)
		return "cloudbrain.Delete_failed"
	}

	switch task.Status {
	case string(models.JobStopped), string(models.JobFailed), string(models.JobSucceeded):
		// Finished jobs may be deleted.
	default:
		log.Error("the job(%s) has not been stopped", task.JobName)
		return "cloudbrain.Not_Stopped"
	}

	if err = models.DeleteJob(task); err != nil {
		log.Error("DeleteJob failed:%v", err)
		return "cloudbrain.Delete_failed"
	}

	// Storage cleanup is best effort: the job is already deleted, so a
	// failure here is logged but does not change the result.
	if err = deleteJobStorage(task.JobName); err != nil {
		log.Error("deleteJobStorage(%s) failed:%v", task.JobName, err)
	}

	return ""
}

// deleteJobStorage removes the files of job jobName: the local directory under
// setting.JobPath and the directory under setting.CBCodePathPrefix in the
// attachment storage. Removal failures are logged and not returned. An empty
// jobName is rejected with an error, because the computed paths would then be
// the shared root directories themselves.
func deleteJobStorage(jobName string) error {
	if jobName == "" {
		return errors.New("empty job name")
	}

	//delete local
	localJobPath := setting.JobPath + jobName
	if err := os.RemoveAll(localJobPath); err != nil {
		log.Error("RemoveAll(%s) failed:%v", localJobPath, err)
	}

	dirPath := setting.CBCodePathPrefix + jobName + "/"
	if err := storage.Attachments.DeleteDir(dirPath); err != nil {
		log.Error("DeleteDir(%s) failed:%v", dirPath, err)
	}

	return nil
}