// Package grampus builds and submits train jobs to the Grampus service and
// records them as Cloudbrain tasks.
package grampus

import (
	"encoding/json"
	"strings"

	"code.gitea.io/gitea/models"
	"code.gitea.io/gitea/modules/cloudbrain"
	"code.gitea.io/gitea/modules/context"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/notification"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/timeutil"
)

// Paths, processor type identifiers and shell fragments shared by Grampus
// train jobs.
const (
	JobPath = "job/"

	// ProcessorTypeNPU is the GenerateTrainJobReq.ProcessType value that makes
	// GenerateTrainJob attach dataset, model and code storage descriptors.
	ProcessorTypeNPU = "npu.huawei.com/NPU"
	ProcessorTypeGPU = "nvidia.com/gpu"

	GpuWorkDir     = "/tmp/"
	NpuWorkDir     = "/cache/"
	NpuLocalLogUrl = "/tmp/train.log"

	CommandPrepareScriptNpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;"

	CodeArchiveName = "master.zip"

	// BucketRemote is the bucket name used by GetNpuModelRemoteObsUrl.
	BucketRemote = "grampus"
	// RemoteModelPath is the suffix appended to a job name by
	// GetNpuModelObjectKey.
	RemoteModelPath = "/output/" + models.ModelSuffix
)

var (
	poolInfos  *models.PoolInfos
	FlavorInfos *setting.StFlavorInfos
	ImageInfos  *setting.StImageInfosModelArts

	// SpecialPools is populated lazily by InitSpecialPool from
	// setting.Grampus.SpecialPools and consulted by getCentersParamter.
	SpecialPools *models.SpecialPools

	// CommandPrepareScriptGpu is a format string with two %s verbs.
	CommandPrepareScriptGpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/%s/archive/master.zip;" +
		"echo \"finish loading script\";unzip -q master.zip;cd %s;chmod 777 downloader_for_obs uploader_for_npu downloader_for_minio uploader_for_gpu;"
)

// GenerateTrainJobReq carries everything GenerateTrainJob needs to submit a
// Grampus job and to persist the matching Cloudbrain record.
type GenerateTrainJobReq struct {
	JobName string
	Command string

	// ImageUrl is an alternative to ImageId; when both are set, ImageUrl
	// takes precedence.
	ImageUrl string
	ImageId  string

	DisplayJobName    string
	Uuid              string
	Description       string
	CodeObsPath       string
	BootFile          string
	BootFileUrl       string
	DataUrl           string
	TrainUrl          string
	WorkServerNumber  int
	EngineID          int64
	CommitID          string
	IsLatestVersion   string
	BranchName        string
	PreVersionId      int64
	PreVersionName    string
	VersionCount      int
	EngineName        string
	TotalVersionCount int
	ComputeResource   string
	ProcessType       string
	DatasetNames      string
	DatasetInfos      map[string]models.DatasetInfo
	Params            string
	ModelName         string
	LabelName         string
	CkptName          string
	ModelVersion      string
	PreTrainModelPath string
	PreTrainModelUrl  string
	Spec              *models.Specification
	CodeName          string
}

// getEndPoint returns setting.Endpoint with everything up to and including the
// first "//" removed (i.e. without its scheme prefix). When the configured
// endpoint contains no "//" it is returned unchanged; previously the missing
// separator made the function silently drop the first character.
func getEndPoint() string {
	index := strings.Index(setting.Endpoint, "//")
	if index < 0 {
		return setting.Endpoint
	}
	return setting.Endpoint[index+2:]
}

// getDatasetGrampus converts the given dataset infos into Grampus dataset
// descriptors that point at setting.Bucket on the configured endpoint.
//
// The result is nil (not an empty slice) when datasetInfos is empty. The order
// of the returned entries follows map iteration order and is therefore not
// stable between calls.
func getDatasetGrampus(datasetInfos map[string]models.DatasetInfo) []models.GrampusDataset {
	var datasetGrampus []models.GrampusDataset
	endPoint := getEndPoint()
	for _, datasetInfo := range datasetInfos {
		datasetGrampus = append(datasetGrampus, models.GrampusDataset{
			Name:      datasetInfo.FullName,
			Bucket:    setting.Bucket,
			EndPoint:  endPoint,
			ObjectKey: datasetInfo.DataLocalPath + datasetInfo.FullName,
		})
	}
	return datasetGrampus
}

// GenerateTrainJob submits a train job described by req through createJob,
// stores a Cloudbrain record for it and emits a task notification.
//
// For NPU jobs (req.ProcessType == ProcessorTypeNPU) the dataset, optional
// pre-trained model and code archive are passed to the job as storage
// descriptors; for any other process type those fields are left at their zero
// values. It returns the error from createJob or models.CreateCloudbrain, if
// any. req.Spec must be non-nil.
func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error) {
	createTime := timeutil.TimeStampNow()

	centerID, centerName := getCentersParamter(ctx, req)

	var datasetGrampus, modelGrampus []models.GrampusDataset
	var codeGrampus models.GrampusDataset
	if ProcessorTypeNPU == req.ProcessType {
		datasetGrampus = getDatasetGrampus(req.DatasetInfos)
		// A pre-trained model is only attached when one was selected.
		if len(req.ModelName) != 0 {
			modelGrampus = []models.GrampusDataset{
				{
					Name:      req.ModelName,
					Bucket:    setting.Bucket,
					EndPoint:  getEndPoint(),
					ObjectKey: req.PreTrainModelPath,
				},
			}
		}
		codeGrampus = models.GrampusDataset{
			Name:      req.CodeName,
			Bucket:    setting.Bucket,
			EndPoint:  getEndPoint(),
			ObjectKey: req.CodeObsPath + cloudbrain.DefaultBranchName + ".zip",
		}
	}

	jobResult, err := createJob(models.CreateGrampusJobRequest{
		Name: req.JobName,
		Tasks: []models.GrampusTasks{
			{
				Name:           req.JobName,
				Command:        req.Command,
				ResourceSpecId: req.Spec.SourceSpecId,
				ImageId:        req.ImageId,
				ImageUrl:       req.ImageUrl,
				CenterID:       centerID,
				CenterName:     centerName,
				ReplicaNum:     1,
				Datasets:       datasetGrampus,
				Models:         modelGrampus,
				Code:           codeGrampus,
				BootFile:       req.BootFile,
			},
		},
	})
	if err != nil {
		log.Error("createJob failed: %v", err.Error())
		return err
	}

	jobID := jobResult.JobInfo.JobID
	err = models.CreateCloudbrain(&models.Cloudbrain{
		Status:            TransTrainJobStatus(jobResult.JobInfo.Status),
		UserID:            ctx.User.ID,
		RepoID:            ctx.Repo.Repository.ID,
		JobID:             jobID,
		JobName:           req.JobName,
		DisplayJobName:    req.DisplayJobName,
		JobType:           string(models.JobTypeTrain),
		Type:              models.TypeC2Net,
		Uuid:              req.Uuid,
		DatasetName:       req.DatasetNames,
		CommitID:          req.CommitID,
		IsLatestVersion:   req.IsLatestVersion,
		ComputeResource:   req.ComputeResource,
		ImageID:           req.ImageId,
		TrainUrl:          req.TrainUrl,
		BranchName:        req.BranchName,
		Parameters:        req.Params,
		BootFile:          req.BootFile,
		DataUrl:           req.DataUrl,
		Description:       req.Description,
		WorkServerNumber:  req.WorkServerNumber,
		EngineName:        req.EngineName,
		VersionCount:      req.VersionCount,
		TotalVersionCount: req.TotalVersionCount,
		CreatedUnix:       createTime,
		UpdatedUnix:       createTime,
		Spec:              req.Spec,
		ModelName:         req.ModelName,
		ModelVersion:      req.ModelVersion,
		LabelName:         req.LabelName,
		PreTrainModelUrl:  req.PreTrainModelUrl,
		CkptName:          req.CkptName,
	})
	if err != nil {
		log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
		return err
	}

	// actionType keeps its zero value when the compute resource is neither
	// NPU nor GPU; the notification is sent regardless.
	var actionType models.ActionType
	switch req.ComputeResource {
	case models.NPUResource:
		actionType = models.ActionCreateGrampusNPUTrainTask
	case models.GPUResource:
		actionType = models.ActionCreateGrampusGPUTrainTask
	}
	notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)

	return nil
}

// getCentersParamter works out which AI centers the job should be pinned to,
// based on the non-exclusive pools in SpecialPools whose Type is contained in
// req.ComputeResource.
//
// Pools of organizations the current user belongs to contribute "include"
// centers; pools of organizations the user does not belong to contribute
// "exclude" centers. It returns two parallel slices (center IDs and center
// names); both are nil when no pool applies. Entry order follows map iteration
// order and is not stable between calls.
func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {
	var centerID []string
	var centerName []string

	includeCenters := make(map[string]string)
	excludeCenters := make(map[string]string)

	if SpecialPools != nil {
		for _, pool := range SpecialPools.Pools {
			if pool.IsExclusive || !strings.Contains(req.ComputeResource, pool.Type) {
				continue
			}
			// A lookup failure leaves org nil and the pool is skipped, so the
			// error is intentionally not treated as fatal here.
			org, _ := models.GetOrgByName(pool.Org)
			if org == nil {
				continue
			}
			isOrgMember, err := models.IsOrganizationMember(org.ID, ctx.User.ID)
			if err != nil {
				// Falls through with isOrgMember == false, same as before;
				// the failure is now visible in the log.
				log.Error("IsOrganizationMember(%d, %d) failed: %v", org.ID, ctx.User.ID, err)
			}
			target := excludeCenters
			if isOrgMember {
				target = includeCenters
			}
			for _, info := range pool.Pool {
				target[info.Queue] = info.Value
			}
		}
	}

	if len(includeCenters) > 0 {
		// Dedicated resource pools exist: pin the job to the centers they name.
		for k, v := range includeCenters {
			centerID = append(centerID, k)
			centerName = append(centerName, v)
		}
	} else if len(excludeCenters) > 0 {
		// Otherwise there are centers to exclude: collect all centers, remove
		// the excluded ones, and pin the job to what remains.
		allCenters := make(map[string]string)
		specs, err := GetResourceSpecs(req.ProcessType)
		if err != nil {
			// Without the full center list nothing can be selected, so both
			// result slices stay empty and the exclusion is not applied.
			log.Error("GetResourceSpecs(%s) failed: %v", req.ProcessType, err)
		} else {
			for _, info := range specs.Infos {
				for _, center := range info.Centers {
					allCenters[center.ID] = center.Name
				}
			}
		}

		for k := range excludeCenters {
			delete(allCenters, k)
		}

		for k, v := range allCenters {
			centerID = append(centerID, k)
			centerName = append(centerName, v)
		}
	}

	return centerID, centerName
}

// TransTrainJobStatus maps a Grampus job status to the form stored on the
// Cloudbrain record: models.GrampusStatusPending is replaced by
// models.GrampusStatusWaiting, and the result is upper-cased.
func TransTrainJobStatus(status string) string {
	if status == models.GrampusStatusPending {
		status = models.GrampusStatusWaiting
	}
	return strings.ToUpper(status)
}

// InitSpecialPool decodes setting.Grampus.SpecialPools (JSON) into
// SpecialPools the first time it is called with a non-empty setting. It does
// nothing once SpecialPools is non-nil. A decoding failure is logged rather
// than silently ignored.
func InitSpecialPool() {
	if SpecialPools == nil && setting.Grampus.SpecialPools != "" {
		if err := json.Unmarshal([]byte(setting.Grampus.SpecialPools), &SpecialPools); err != nil {
			log.Error("json.Unmarshal Grampus.SpecialPools failed: %v", err)
		}
	}
}

// GetNpuModelRemoteObsUrl returns the "s3:///" URL built from BucketRemote and
// the object key of the given job (see GetNpuModelObjectKey).
func GetNpuModelRemoteObsUrl(jobName string) string {
	return "s3:///" + BucketRemote + "/" + GetNpuModelObjectKey(jobName)
}

// GetNpuModelObjectKey returns setting.CodePathPrefix + jobName +
// RemoteModelPath.
func GetNpuModelObjectKey(jobName string) string {
	return setting.CodePathPrefix + jobName + RemoteModelPath
}

// GetRemoteEndPoint returns the Endpoint of the first entry in
// setting.CenterInfos.Info whose CenterID equals aiCenterID, or "" when no
// entry matches.
func GetRemoteEndPoint(aiCenterID string) string {
	for _, info := range setting.CenterInfos.Info {
		if info.CenterID == aiCenterID {
			return info.Endpoint
		}
	}
	return ""
}

// GetCenterProxy returns the StorageProxyServer of the first entry in
// setting.CenterInfos.Info whose CenterID equals aiCenterID, or "" when no
// entry matches.
func GetCenterProxy(aiCenterID string) string {
	for _, info := range setting.CenterInfos.Info {
		if info.CenterID == aiCenterID {
			return info.StorageProxyServer
		}
	}
	return ""
}