package cloudbrain

import (
	"encoding/json"
	"errors"
	"strconv"

	"code.gitea.io/gitea/models"
	"code.gitea.io/gitea/modules/context"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/notification"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/storage"
	"code.gitea.io/gitea/modules/timeutil"
)

const (
	// Command starts a JupyterLab instance inside the container for debug tasks.
	Command = `pip3 install jupyterlab==2.2.5 -i https://pypi.tuna.tsinghua.edu.cn/simple;service ssh stop;jupyter lab --no-browser --ip=0.0.0.0 --allow-root --notebook-dir="/code" --port=80 --LabApp.token="" --LabApp.allow_origin="self https://cloudbrain.pcl.ac.cn"`
	//CommandBenchmark = `echo "start benchmark";python /code/test.py;echo "end benchmark"`
	CommandBenchmark = `echo "start benchmark";cd /benchmark && bash run_bk.sh;echo "end benchmark"`

	CodeMountPath         = "/code"
	DataSetMountPath      = "/dataset"
	ModelMountPath        = "/model"
	LogFile               = "log.txt"
	BenchMarkMountPath    = "/benchmark"
	BenchMarkResourceID   = 1
	Snn4imagenetMountPath = "/snn4imagenet"
	BrainScoreMountPath   = "/brainscore"
	TaskInfoName          = "/taskInfo"

	Snn4imagenetCommand = `/opt/conda/bin/python /snn4imagenet/testSNN_script.py --modelname '%s' --modelpath '/dataset' --modeldescription '%s'`
	BrainScoreCommand   = `bash /brainscore/brainscore_test_par4shSrcipt.sh -b '%s' -n '%s' -p '/dataset' -d '%s'`

	SubTaskName = "task1"

	Success = "S000"

	DefaultBranchName = "master"
)

var (
	ResourceSpecs      *models.ResourceSpecs
	TrainResourceSpecs *models.ResourceSpecs
	SpecialPools       *models.SpecialPools
)

// GenerateCloudBrainTaskReq carries all parameters needed to create a CloudBrain (type one) task.
type GenerateCloudBrainTaskReq struct {
	Ctx                  *context.Context
	DisplayJobName       string
	JobName              string
	Image                string
	Command              string
	CodePath             string
	ModelPath            string
	BenchmarkPath        string
	Snn4ImageNetPath     string
	BrainScorePath       string
	JobType              string
	GpuQueue             string
	Description          string
	BranchName           string
	BootFile             string
	Params               string
	CommitID             string
	Uuids                string
	DatasetNames         string
	DatasetInfos         map[string]models.DatasetInfo
	BenchmarkTypeID      int
	BenchmarkChildTypeID int
	ResourceSpecId       int
}

// isAdminOrOwnerOrJobCreater reports whether the signed-in user is the repository owner,
// a site admin or, when the job could be loaded, the creator of the job.
func isAdminOrOwnerOrJobCreater(ctx *context.Context, job *models.Cloudbrain, err error) bool {
	if !ctx.IsSigned {
		return false
	}
	if err != nil {
		return ctx.IsUserRepoOwner() || ctx.IsUserSiteAdmin()
	}
	return ctx.IsUserRepoOwner() || ctx.IsUserSiteAdmin() || ctx.User.ID == job.UserID
}

// CanDeleteJob reports whether the current user may delete the given job.
func CanDeleteJob(ctx *context.Context, job *models.Cloudbrain) bool {
	return isAdminOrOwnerOrJobCreater(ctx, job, nil)
}

// CanCreateOrDebugJob reports whether the current user may create or debug cloudbrain jobs in the repository.
func CanCreateOrDebugJob(ctx *context.Context) bool {
	if !ctx.IsSigned {
		return false
	}
	return ctx.Repo.CanWrite(models.UnitTypeCloudBrain)
}

// CanModifyJob reports whether the current user may modify the given job.
func CanModifyJob(ctx *context.Context, job *models.Cloudbrain) bool {
	return isAdminOrJobCreater(ctx, job, nil)
}

// isAdminOrJobCreater reports whether the signed-in user is a site admin or, when the job
// could be loaded, the creator of the job.
func isAdminOrJobCreater(ctx *context.Context, job *models.Cloudbrain, err error) bool {
	if !ctx.IsSigned {
		return false
	}
	if err != nil {
		return ctx.IsUserSiteAdmin()
	}
	return ctx.IsUserSiteAdmin() || ctx.User.ID == job.UserID
}

// isAdminOrImageCreater reports whether the signed-in user is a site admin or, when the image
// could be loaded, the uploader of the image.
func isAdminOrImageCreater(ctx *context.Context, image *models.Image, err error) bool {
	if !ctx.IsSigned {
		return false
	}
	if err != nil {
		return ctx.IsUserSiteAdmin()
	}
	return ctx.IsUserSiteAdmin() || ctx.User.ID == image.UID
}

// AdminOrOwnerOrJobCreaterRight restricts access to the repository owner, site admins
// and the job creator; the job is looked up by the ":id" URL parameter.
func AdminOrOwnerOrJobCreaterRight(ctx *context.Context) {
	var ID = ctx.Params(":id")
	job, err := models.GetCloudbrainByID(ID)
	if err != nil {
		log.Error("GetCloudbrainByID failed:%v", err.Error())
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
		return
	}

	ctx.Cloudbrain = job
	if !isAdminOrOwnerOrJobCreater(ctx, job, err) {
		log.Error("!isAdminOrOwnerOrJobCreater error:%v", err)
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
	}
}

// AdminOrJobCreaterRight restricts access to site admins and the job creator;
// the job is looked up by the ":id" URL parameter.
func AdminOrJobCreaterRight(ctx *context.Context) {
	var ID = ctx.Params(":id")
	job, err := models.GetCloudbrainByID(ID)
	if err != nil {
		log.Error("GetCloudbrainByID failed:%v", err.Error())
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
		return
	}

	ctx.Cloudbrain = job
	if !isAdminOrJobCreater(ctx, job, err) {
		log.Error("!isAdminOrJobCreater error:%v", err)
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
	}
}

// AdminOrOwnerOrJobCreaterRightForTrain is the train-task variant of AdminOrOwnerOrJobCreaterRight;
// the job is looked up by the ":jobid" URL parameter.
func AdminOrOwnerOrJobCreaterRightForTrain(ctx *context.Context) {
	var jobID = ctx.Params(":jobid")
	job, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		log.Error("GetCloudbrainByJobID failed:%v", err.Error())
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
		return
	}

	ctx.Cloudbrain = job
	if !isAdminOrOwnerOrJobCreater(ctx, job, err) {
		log.Error("!isAdminOrOwnerOrJobCreater failed:%v", err)
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
	}
}

// AdminOrJobCreaterRightForTrain is the train-task variant of AdminOrJobCreaterRight;
// the job is looked up by the ":jobid" URL parameter.
func AdminOrJobCreaterRightForTrain(ctx *context.Context) {
	var jobID = ctx.Params(":jobid")
	job, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		log.Error("GetCloudbrainByJobID failed:%v", err.Error())
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
		return
	}

	ctx.Cloudbrain = job
	if !isAdminOrJobCreater(ctx, job, err) {
		log.Error("!isAdminOrJobCreater error:%v", err)
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
	}
}

// AdminOrImageCreaterRight restricts access to site admins and the image uploader;
// the image is looked up by the ":id" URL parameter.
func AdminOrImageCreaterRight(ctx *context.Context) {
	id, err := strconv.ParseInt(ctx.Params(":id"), 10, 64)
	var image *models.Image
	if err != nil {
		log.Error("Get Image by ID failed:%v", err.Error())
	} else {
		image, err = models.GetImageByID(id)
		if err != nil {
			log.Error("Get Image by ID failed:%v", err.Error())
			ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
			return
		}
	}

	if !isAdminOrImageCreater(ctx, image, err) {
		log.Error("!isAdminOrImageCreater error:%v", err)
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
	}
}

// GenerateTask creates a CloudBrain (type one) job on the backend, records it in the
// database and sends the corresponding notification.
func GenerateTask(req GenerateCloudBrainTaskReq) error {
	var resourceSpec *models.ResourceSpec
	var versionCount int
	if req.JobType == string(models.JobTypeTrain) {
		versionCount = 1
		if TrainResourceSpecs == nil {
			json.Unmarshal([]byte(setting.TrainResourceSpecs), &TrainResourceSpecs)
		}
		for _, spec := range TrainResourceSpecs.ResourceSpec {
			if req.ResourceSpecId == spec.Id {
				resourceSpec = spec
				break
			}
		}
	} else {
		if ResourceSpecs == nil {
			json.Unmarshal([]byte(setting.ResourceSpecs), &ResourceSpecs)
		}
		for _, spec := range ResourceSpecs.ResourceSpec {
			if req.ResourceSpecId == spec.Id {
				resourceSpec = spec
				break
			}
		}
	}

	// If no matching spec was found, try the special (dedicated) resource pools.
	if resourceSpec == nil && SpecialPools != nil {
		for _, specialPool := range SpecialPools.Pools {
			if resourceSpec != nil {
				break
			}
			if specialPool.ResourceSpec != nil {
				if IsElementExist(specialPool.JobType, req.JobType) && IsQueueInSpecialtPool(specialPool.Pool, req.GpuQueue) {
					for _, spec := range specialPool.ResourceSpec {
						if req.ResourceSpecId == spec.Id {
							resourceSpec = spec
							break
						}
					}
				}
			}
		}
	}

	if resourceSpec == nil {
		log.Error("no such resourceSpecId(%d)", req.ResourceSpecId, req.Ctx.Data["MsgID"])
		return errors.New("no such resourceSpec")
	}

	volumes := []models.Volume{
		{
			HostPath: models.StHostPath{
				Path:      req.CodePath,
				MountPath: CodeMountPath,
				ReadOnly:  false,
			},
		},
		{
			HostPath: models.StHostPath{
				Path:      req.ModelPath,
				MountPath: ModelMountPath,
				ReadOnly:  false,
			},
		},
		{
			HostPath: models.StHostPath{
				Path:      req.BenchmarkPath,
				MountPath: BenchMarkMountPath,
				ReadOnly:  true,
			},
		},
		{
			HostPath: models.StHostPath{
				Path:      req.Snn4ImageNetPath,
				MountPath: Snn4imagenetMountPath,
				ReadOnly:  true,
			},
		},
		{
			HostPath: models.StHostPath{
				Path:      req.BrainScorePath,
				MountPath: BrainScoreMountPath,
				ReadOnly:  true,
			},
		},
	}

	if len(req.DatasetInfos) == 1 {
		volumes = append(volumes, models.Volume{
			HostPath: models.StHostPath{
				Path:      req.DatasetInfos[req.Uuids].DataLocalPath,
				MountPath: DataSetMountPath,
				ReadOnly:  true,
			},
		})
	} else {
		for _, dataset := range req.DatasetInfos {
			volumes = append(volumes, models.Volume{
				HostPath: models.StHostPath{
					Path:      dataset.DataLocalPath,
					MountPath: DataSetMountPath + "/" + dataset.Name,
					ReadOnly:  true,
				},
			})
		}
	}

	createTime := timeutil.TimeStampNow()
	jobResult, err := CreateJob(req.JobName, models.CreateJobParams{
		JobName:    req.JobName,
		RetryCount: 1,
		GpuType:    req.GpuQueue,
		Image:      req.Image,
		TaskRoles: []models.TaskRole{
			{
				Name:                  SubTaskName,
				TaskNumber:            1,
				MinSucceededTaskCount: 1,
				MinFailedTaskCount:    1,
				CPUNumber:             resourceSpec.CpuNum,
				GPUNumber:             resourceSpec.GpuNum,
				MemoryMB:              resourceSpec.MemMiB,
				ShmMB:                 resourceSpec.ShareMemMiB,
				Command:               req.Command,
				NeedIBDevice:          false,
				IsMainRole:            false,
				UseNNI:                false,
			},
		},
		Volumes: volumes,
	})
	if err != nil {
		log.Error("CreateJob failed:%v", err.Error(), req.Ctx.Data["MsgID"])
		return err
	}
	if jobResult.Code != Success {
		log.Error("CreateJob(%s) failed:%s", req.JobName, jobResult.Msg, req.Ctx.Data["MsgID"])
		return errors.New(jobResult.Msg)
	}

	var jobID = jobResult.Payload["jobId"].(string)
	err = models.CreateCloudbrain(&models.Cloudbrain{
		Status:               string(models.JobWaiting),
		UserID:               req.Ctx.User.ID,
		RepoID:               req.Ctx.Repo.Repository.ID,
		JobID:                jobID,
		JobName:              req.JobName,
		DisplayJobName:       req.DisplayJobName,
		SubTaskName:          SubTaskName,
		JobType:              req.JobType,
		Type:                 models.TypeCloudBrainOne,
		Uuid:                 req.Uuids,
		Image:                req.Image,
		GpuQueue:             req.GpuQueue,
		ResourceSpecId:       req.ResourceSpecId,
		ComputeResource:      models.GPUResource,
		BenchmarkTypeID:      req.BenchmarkTypeID,
		BenchmarkChildTypeID: req.BenchmarkChildTypeID,
		Description:          req.Description,
		IsLatestVersion:      "1",
		VersionCount:         versionCount,
		BranchName:           req.BranchName,
		BootFile:             req.BootFile,
		DatasetName:          req.DatasetNames,
		Parameters:           req.Params,
		CreatedUnix:          createTime,
		UpdatedUnix:          createTime,
		CommitID:             req.CommitID,
	})
	if err != nil {
		return err
	}

	task, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		log.Error("GetCloudbrainByJobID failed: %v", err.Error())
		return err
	}

	stringId := strconv.FormatInt(task.ID, 10)
	if IsBenchmarkJob(req.JobType) {
		notification.NotifyOtherTask(req.Ctx.User, req.Ctx.Repo.Repository, stringId, req.DisplayJobName, models.ActionCreateBenchMarkTask)
	} else if string(models.JobTypeTrain) == req.JobType {
		notification.NotifyOtherTask(req.Ctx.User, req.Ctx.Repo.Repository, jobID, req.DisplayJobName, models.ActionCreateGPUTrainTask)
	} else {
		notification.NotifyOtherTask(req.Ctx.User, req.Ctx.Repo.Repository, stringId, req.DisplayJobName, models.ActionCreateDebugGPUTask)
	}

	return nil
}

// IsBenchmarkJob reports whether the job type is one of the benchmark-style jobs
// (benchmark, brainscore or snn4imagenet).
func IsBenchmarkJob(jobType string) bool {
	return string(models.JobTypeBenchmark) == jobType || string(models.JobTypeBrainScore) == jobType || string(models.JobTypeSnn4imagenet) == jobType
}

// RestartTask creates a new debug job on the backend with the same parameters as the given
// task and records it as a restart of that task; the new task ID is written to newID.
func RestartTask(ctx *context.Context, task *models.Cloudbrain, newID *string) error {
	jobName := task.JobName
	var resourceSpec *models.ResourceSpec
	if ResourceSpecs == nil {
		json.Unmarshal([]byte(setting.ResourceSpecs), &ResourceSpecs)
	}
	for _, spec := range ResourceSpecs.ResourceSpec {
		if task.ResourceSpecId == spec.Id {
			resourceSpec = spec
			break
		}
	}
	if resourceSpec == nil {
		log.Error("no such resourceSpecId(%d)", task.ResourceSpecId, ctx.Data["MsgID"])
		return errors.New("no such resourceSpec")
	}

	datasetInfos, _, err := models.GetDatasetInfo(task.Uuid)
	if err != nil {
		log.Error("GetDatasetInfo failed:%v", err, ctx.Data["MsgID"])
		return err
	}

	volumes := []models.Volume{
		{
			HostPath: models.StHostPath{
				Path:      storage.GetMinioPath(jobName, CodeMountPath+"/"),
				MountPath: CodeMountPath,
				ReadOnly:  false,
			},
		},
		{
			HostPath: models.StHostPath{
				Path:      storage.GetMinioPath(jobName, ModelMountPath+"/"),
				MountPath: ModelMountPath,
				ReadOnly:  false,
			},
		},
		{
			HostPath: models.StHostPath{
				Path:      storage.GetMinioPath(jobName, BenchMarkMountPath+"/"),
				MountPath: BenchMarkMountPath,
				ReadOnly:  true,
			},
		},
		{
			HostPath: models.StHostPath{
				Path:      storage.GetMinioPath(jobName, Snn4imagenetMountPath+"/"),
				MountPath: Snn4imagenetMountPath,
				ReadOnly:  true,
			},
		},
		{
			HostPath: models.StHostPath{
				Path:      storage.GetMinioPath(jobName, BrainScoreMountPath+"/"),
				MountPath: BrainScoreMountPath,
				ReadOnly:  true,
			},
		},
	}

	if len(datasetInfos) == 1 {
		volumes = append(volumes, models.Volume{
			HostPath: models.StHostPath{
				Path:      datasetInfos[task.Uuid].DataLocalPath,
				MountPath: DataSetMountPath,
				ReadOnly:  true,
			},
		})
	} else {
		for _, dataset := range datasetInfos {
			volumes = append(volumes, models.Volume{
				HostPath: models.StHostPath{
					Path:      dataset.DataLocalPath,
					MountPath: DataSetMountPath + "/" + dataset.Name,
					ReadOnly:  true,
				},
			})
		}
	}

	createTime := timeutil.TimeStampNow()
	jobResult, err := CreateJob(jobName, models.CreateJobParams{
		JobName:    jobName,
		RetryCount: 1,
		GpuType:    task.GpuQueue,
		Image:      task.Image,
		TaskRoles: []models.TaskRole{
			{
				Name:                  SubTaskName,
				TaskNumber:            1,
				MinSucceededTaskCount: 1,
				MinFailedTaskCount:    1,
				CPUNumber:             resourceSpec.CpuNum,
				GPUNumber:             resourceSpec.GpuNum,
				MemoryMB:              resourceSpec.MemMiB,
				ShmMB:                 resourceSpec.ShareMemMiB,
				Command:               Command,
				NeedIBDevice:          false,
				IsMainRole:            false,
				UseNNI:                false,
			},
		},
		Volumes: volumes,
	})
	if err != nil {
		log.Error("CreateJob failed:%v", err.Error(), ctx.Data["MsgID"])
		return err
	}
	if jobResult.Code != Success {
		log.Error("CreateJob(%s) failed:%s", jobName, jobResult.Msg, ctx.Data["MsgID"])
		return errors.New(jobResult.Msg)
	}

	var jobID = jobResult.Payload["jobId"].(string)
	newTask := &models.Cloudbrain{
		Status:          string(models.JobWaiting),
		UserID:          task.UserID,
		RepoID:          task.RepoID,
		JobID:           jobID,
		JobName:         task.JobName,
		DisplayJobName:  task.DisplayJobName,
		SubTaskName:     task.SubTaskName,
		JobType:         task.JobType,
		Type:            task.Type,
		Uuid:            task.Uuid,
		DatasetName:     task.DatasetName,
		Image:           task.Image,
		GpuQueue:        task.GpuQueue,
		ResourceSpecId:  task.ResourceSpecId,
		ComputeResource: task.ComputeResource,
		CreatedUnix:     createTime,
		UpdatedUnix:     createTime,
		BranchName:      task.BranchName,
	}

	err = models.RestartCloudbrain(task, newTask)
	if err != nil {
		log.Error("RestartCloudbrain(%s) failed:%v", jobName, err.Error(), ctx.Data["MsgID"])
		return err
	}

	stringId := strconv.FormatInt(newTask.ID, 10)
	*newID = stringId
	notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, stringId, task.DisplayJobName, models.ActionCreateDebugGPUTask)

	return nil
}

// InitSpecialPool parses the special (dedicated) resource pool configuration once.
func InitSpecialPool() {
	if SpecialPools == nil && setting.SpecialPools != "" {
		json.Unmarshal([]byte(setting.SpecialPools), &SpecialPools)
	}
}

// IsResourceSpecInSpecialPool reports whether the given resource spec ID is allowed by the
// special pool; an empty spec list means no restriction.
func IsResourceSpecInSpecialPool(resourceSpecs []*models.ResourceSpec, resourceSpecId int) bool {
	if len(resourceSpecs) == 0 {
		return true
	}
	for _, v := range resourceSpecs {
		if v.Id == resourceSpecId {
			return true
		}
	}
	return false
}

// IsQueueInSpecialtPool reports whether the given GPU queue belongs to the special pool.
func IsQueueInSpecialtPool(pool []*models.GpuInfo, queue string) bool {
	for _, v := range pool {
		if v.Queue == queue {
			return true
		}
	}
	return false
}

// IsElementExist reports whether str is present in s.
func IsElementExist(s []string, str string) bool {
	for _, v := range s {
		if v == str {
			return true
		}
	}
	return false
}
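
// The sketch below is illustrative only and not part of the package API: it shows how a
// web handler (for example in a routers/repo handler that already has *context.Context) might
// populate GenerateCloudBrainTaskReq and call GenerateTask. The local variables used here
// (displayJobName, jobName, image, jobType, gpuQueue, resourceSpecId, uuids, datasetNames,
// datasetInfos) are hypothetical placeholders supplied by that handler.
//
//	req := cloudbrain.GenerateCloudBrainTaskReq{
//		Ctx:            ctx,
//		DisplayJobName: displayJobName,
//		JobName:        jobName,
//		Image:          image,
//		Command:        cloudbrain.Command,
//		JobType:        jobType,
//		GpuQueue:       gpuQueue,
//		ResourceSpecId: resourceSpecId,
//		Uuids:          uuids,
//		DatasetNames:   datasetNames,
//		DatasetInfos:   datasetInfos,
//	}
//	if err := cloudbrain.GenerateTask(req); err != nil {
//		ctx.ServerError("GenerateTask", err)
//		return
//	}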