// Package grampus creates Grampus train jobs and records them as Cloudbrain
// tasks, including the selection of computing centers from the configured
// special resource pools.
package grampus

import (
	"encoding/json"
	"strings"

	"code.gitea.io/gitea/models"
	"code.gitea.io/gitea/modules/context"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/notification"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/timeutil"
)

const (
	// JobPath is the path segment used for job resources.
	JobPath = "job/"

	// ProcessorTypeNPU is the processor type identifier for NPU resources.
	ProcessorTypeNPU = "npu.huawei.com/NPU"
	// ProcessorTypeGPU is the processor type identifier for GPU resources.
	ProcessorTypeGPU = "nvidia.com/gpu"

	// GpuWorkDir is presumably the working directory used by GPU jobs.
	// NOTE(review): usage is not visible in this file; confirm against callers.
	GpuWorkDir = "/tmp/"
	// NpuWorkDir is presumably the working directory used by NPU jobs.
	// NOTE(review): usage is not visible in this file; confirm against callers.
	NpuWorkDir = "/cache/"

	// CommandPrepareScript is a shell fragment that creates the output, code and
	// dataset directories, downloads and unpacks the script_for_grampus archive,
	// and makes the OBS/MinIO transfer helpers executable. It begins with ';',
	// so it is meant to be appended to a preceding command.
	CommandPrepareScript = ";mkdir -p output;mkdir -p code;mkdir -p dataset;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/script_for_grampus/archive/master.zip;" +
		"echo \"finish loading script\";unzip -q master.zip;cd script_for_grampus;chmod 777 downloader_for_obs uploader_for_obs downloader_for_minio uploader_for_minio;"
	//CommandPrepareScript = "pwd;cd /cache;mkdir -p output;mkdir -p code;mkdir -p dataset;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/script_for_grampus/archive/master.zip;" +
	//	"echo \"finish loading script\";unzip -q master.zip;cd script_for_grampus;chmod 777 downloader_for_obs uploader_for_obs downloader_for_minio uploader_for_minio;"

	// CodeArchiveName is the file name of the code archive.
	CodeArchiveName = "master.zip"
)

var (
	poolInfos *models.PoolInfos
	// FlavorInfos holds flavor information; it is not populated in this file.
	FlavorInfos *models.FlavorInfos
	// ImageInfos holds image information; it is not populated in this file.
	ImageInfos *models.ImageInfosModelArts

	// SpecialPools is the special resource pool configuration. It stays nil
	// until InitSpecialPool decodes it from setting.Grampus.SpecialPools.
	SpecialPools *models.SpecialPools
)

// GenerateTrainJobReq carries the parameters needed by GenerateTrainJob to
// submit a Grampus job and to record it as a Cloudbrain task.
type GenerateTrainJobReq struct {
	JobName        string
	Command        string
	ResourceSpecId string
	// Either ImageUrl or ImageId must be given; when both are set, ImageUrl
	// takes precedence.
	ImageUrl          string
	ImageId           string
	DisplayJobName    string
	Uuid              string
	Description       string
	CodeObsPath       string
	BootFile          string
	BootFileUrl       string
	DataUrl           string
	TrainUrl          string
	WorkServerNumber  int
	EngineID          int64
	CommitID          string
	IsLatestVersion   string
	BranchName        string
	PreVersionId      int64
	PreVersionName    string
	FlavorName        string
	VersionCount      int
	EngineName        string
	TotalVersionCount int
	ComputeResource   string
	ProcessType       string
	DatasetName       string
	Params            string
}

// GenerateTrainJob submits a single-task Grampus job built from req, stores the
// resulting job as a Cloudbrain record owned by the current user and
// repository, and finally sends an "other task" notification.
//
// It returns the error from createJob or models.CreateCloudbrain, or nil on
// success.
func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error) {
	createTime := timeutil.TimeStampNow()

	centerID, centerName := getCentersParamter(ctx, req)

	jobResult, err := createJob(models.CreateGrampusJobRequest{
		Name: req.JobName,
		Tasks: []models.GrampusTasks{
			{
				Name:           req.JobName,
				Command:        req.Command,
				ResourceSpecId: req.ResourceSpecId,
				ImageId:        req.ImageId,
				ImageUrl:       req.ImageUrl,
				CenterID:       centerID,
				CenterName:     centerName,
				ReplicaNum:     1,
			},
		},
	})
	if err != nil {
		log.Error("createJob failed: %v", err.Error())
		return err
	}

	jobID := jobResult.JobInfo.JobID
	err = models.CreateCloudbrain(&models.Cloudbrain{
		Status:            TransTrainJobStatus(jobResult.JobInfo.Status),
		UserID:            ctx.User.ID,
		RepoID:            ctx.Repo.Repository.ID,
		JobID:             jobID,
		JobName:           req.JobName,
		DisplayJobName:    req.DisplayJobName,
		JobType:           string(models.JobTypeTrain),
		Type:              models.TypeC2Net,
		Uuid:              req.Uuid,
		DatasetName:       req.DatasetName,
		CommitID:          req.CommitID,
		IsLatestVersion:   req.IsLatestVersion,
		ComputeResource:   req.ComputeResource,
		ImageID:           req.ImageId,
		TrainUrl:          req.TrainUrl,
		BranchName:        req.BranchName,
		Parameters:        req.Params,
		BootFile:          req.BootFile,
		DataUrl:           req.DataUrl,
		FlavorCode:        req.ResourceSpecId,
		Description:       req.Description,
		WorkServerNumber:  req.WorkServerNumber,
		FlavorName:        req.FlavorName,
		EngineName:        req.EngineName,
		VersionCount:      req.VersionCount,
		TotalVersionCount: req.TotalVersionCount,
		CreatedUnix:       createTime,
		UpdatedUnix:       createTime,
	})
	if err != nil {
		log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
		return err
	}

	// actionType keeps its zero value when the compute resource is neither NPU
	// nor GPU; the notification is still sent in that case.
	var actionType models.ActionType
	switch req.ComputeResource {
	case models.NPUResource:
		actionType = models.ActionCreateGrampusNPUTrainTask
	case models.GPUResource:
		actionType = models.ActionCreateGrampusGPUTrainTask
	}
	notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)

	return nil
}

// getCentersParamter works out which computing centers a job may be sent to,
// based on the non-exclusive special pools whose type occurs in
// req.ComputeResource.
//
// For every such pool whose organization can be loaded, the pool's centers are
// marked as "included" when the current user is a member of that organization
// and as "excluded" otherwise. If any center is included, exactly the included
// centers are returned. Otherwise, if any center is excluded, all centers
// reported by GetResourceSpecs minus the excluded ones are returned. In every
// other case both results are nil.
//
// The two returned slices are parallel (IDs and names at matching indexes);
// their order is unspecified because it follows map iteration.
func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {
	var centerID []string
	var centerName []string

	includeCenters := make(map[string]string)
	excludeCenters := make(map[string]string)

	if SpecialPools != nil {
		for _, pool := range SpecialPools.Pools {
			if pool.IsExclusive || !strings.Contains(req.ComputeResource, pool.Type) {
				continue
			}

			org, err := models.GetOrgByName(pool.Org)
			if err != nil {
				log.Error("GetOrgByName(%v) failed: %v", pool.Org, err)
			}
			if org == nil {
				continue
			}

			// A failed membership lookup is logged and then treated as "not a
			// member", so the pool's centers end up excluded for this user.
			isOrgMember, err := models.IsOrganizationMember(org.ID, ctx.User.ID)
			if err != nil {
				log.Error("IsOrganizationMember(%v, %v) failed: %v", org.ID, ctx.User.ID, err)
			}

			target := excludeCenters
			if isOrgMember {
				target = includeCenters
			}
			for _, info := range pool.Pool {
				target[info.Queue] = info.Value
			}
		}
	}

	if len(includeCenters) > 0 {
		// Dedicated pools apply to this user: use the centers they specify.
		for k, v := range includeCenters {
			centerID = append(centerID, k)
			centerName = append(centerName, v)
		}
	} else if len(excludeCenters) > 0 {
		// Otherwise some centers must be avoided: fetch every center, drop the
		// excluded ones, and use what is left.
		allCenters := make(map[string]string)
		specs, err := GetResourceSpecs(req.ProcessType)
		if err != nil {
			// Without the spec list no centers can be named, so the result
			// stays empty.
			log.Error("GetResourceSpecs(%v) failed: %v", req.ProcessType, err)
		} else {
			for _, info := range specs.Infos {
				for _, center := range info.Centers {
					allCenters[center.ID] = center.Name
				}
			}
		}

		for k := range excludeCenters {
			delete(allCenters, k)
		}

		for k, v := range allCenters {
			centerID = append(centerID, k)
			centerName = append(centerName, v)
		}
	}

	return centerID, centerName
}

// TransTrainJobStatus converts a Grampus job status into the upper-case form
// stored on Cloudbrain records. models.GrampusStatusPending is first replaced
// by models.GrampusStatusWaiting.
func TransTrainJobStatus(status string) string {
	if status == models.GrampusStatusPending {
		status = models.GrampusStatusWaiting
	}

	return strings.ToUpper(status)
}

// InitSpecialPool decodes setting.Grampus.SpecialPools (JSON) into SpecialPools
// when SpecialPools is still nil and the setting is non-empty. A decoding
// failure is logged.
func InitSpecialPool() {
	if SpecialPools != nil || setting.Grampus.SpecialPools == "" {
		return
	}

	if err := json.Unmarshal([]byte(setting.Grampus.SpecialPools), &SpecialPools); err != nil {
		log.Error("json.Unmarshal(setting.Grampus.SpecialPools) failed: %v", err)
	}
}