package grampus

import (
	"fmt"
	"strconv"
	"strings"

	"code.gitea.io/gitea/models"
	"code.gitea.io/gitea/modules/cloudbrain"
	"code.gitea.io/gitea/modules/context"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/notification"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/timeutil"
)

const (
	// JobPath is the path segment used for job resources.
	JobPath = "job/"

	// Processor type identifiers compared against the ProcessType of a request.
	ProcessorTypeNPU = "npu.huawei.com/NPU"
	ProcessorTypeGPU = "nvidia.com/gpu"
	ProcessorTypeGCU = "enflame-tech.com/gcu"

	GpuWorkDir     = "/tmp/"
	NpuWorkDir     = "/cache/"
	NpuLocalLogUrl = "/tmp/train.log"

	// CommandPrepareScriptNpu creates the working directories for an NPU job.
	CommandPrepareScriptNpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;"

	CodeArchiveName = "master.zip"

	BucketRemote    = "grampus"
	RemoteModelPath = "/output/" + models.ModelSuffix

	// autoStopDurationMs is the AutoStopDuration sent with notebook tasks
	// (4 hours expressed in milliseconds).
	autoStopDurationMs = 4 * 60 * 60 * 1000

	// CommandGpuDebug is the fmt template of the start command for GPU/GCU
	// debug (notebook) tasks. Its six %s verbs are, in order: the dataset copy
	// command, then the idle/interval culling settings passed to jupyter lab.
	CommandGpuDebug = "mkdir -p /dataset;%s! [ -x \"$(command -v jupyter)\" ] && pip install jupyterlab==3 -i https://pypi.tuna.tsinghua.edu.cn/simple;jupyter lab --ServerApp.shutdown_no_activity_timeout=%s --TerminalManager.cull_inactive_timeout=%s --TerminalManager.cull_interval=%s --MappingKernelManager.cull_idle_timeout=%s --MappingKernelManager.cull_interval=%s --MappingKernelManager.cull_connected=True --MappingKernelManager.cull_busy=True --no-browser --ip=0.0.0.0 --allow-root --notebook-dir='/code' --port=$OCTOPUS_NOTEBOOK_PORT --LabApp.token='' --LabApp.allow_origin='*' --LabApp.base_url=$OCTOPUS_NOTEBOOK_BASE_URL;"
)

var (
	poolInfos    *models.PoolInfos
	FlavorInfos  *setting.StFlavorInfos
	ImageInfos   *setting.StImageInfosModelArts
	SpecialPools *models.SpecialPools

	// CommandPrepareScriptGpu is the fmt template (two %s verbs) that creates
	// the working directories, then downloads and unpacks the helper scripts.
	CommandPrepareScriptGpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/%s/archive/master.zip;" +
		"echo \"finish loading script\";unzip -q master.zip;cd %s;chmod 777 downloader_for_obs uploader_for_npu downloader_for_minio uploader_for_gpu;"
)

// GenerateTrainJobReq carries the parameters consumed by GenerateTrainJob.
type GenerateTrainJobReq struct {
	JobName           string
	Command           string
	ImageUrl          string // Alternative to ImageId; when both are set, ImageUrl takes precedence.
	ImageId           string
	DisplayJobName    string
	Uuid              string
	Description       string
	CodeObsPath       string
	BootFile          string
	BootFileUrl       string
	DataUrl           string
	TrainUrl          string
	WorkServerNumber  int
	EngineID          int64
	CommitID          string
	IsLatestVersion   string
	BranchName        string
	PreVersionId      int64
	PreVersionName    string
	VersionCount      int
	EngineName        string
	TotalVersionCount int
	ComputeResource   string
	ProcessType       string
	DatasetNames      string
	DatasetInfos      map[string]models.DatasetInfo
	Params            string
	ModelName         string
	LabelName         string
	CkptName          string
	ModelVersion      string
	PreTrainModelPath string
	PreTrainModelUrl  string
	Spec              *models.Specification
	CodeName          string
}

// GenerateNotebookJobReq carries the parameters consumed by GenerateNotebookJob.
type GenerateNotebookJobReq struct {
	JobName           string
	Command           string
	ImageUrl          string
	ImageId           string
	DisplayJobName    string
	Uuid              string
	Description       string
	CodeStoragePath   string
	CommitID          string
	BranchName        string
	ComputeResource   string
	ProcessType       string
	DatasetNames      string
	DatasetInfos      map[string]models.DatasetInfo
	ModelName         string
	LabelName         string
	CkptName          string
	ModelVersion      string
	PreTrainModelPath string
	PreTrainModelUrl  string
	Spec              *models.Specification
	CodeName          string
	ModelPath         string // Modelled on OpenI GPU debugging: the /model directory is mounted so the user's model can be written to it.
	ModelStorageType  int
}

// getEndPoint returns setting.Endpoint with everything up to and including
// the first "//" removed. An endpoint that contains no "//" is returned
// unchanged instead of being truncated.
func getEndPoint() string {
	endpoint := setting.Endpoint
	if idx := strings.Index(endpoint, "//"); idx >= 0 {
		return endpoint[idx+2:]
	}
	return endpoint
}

// shellSingleQuote wraps s in single quotes for use as one shell word,
// escaping any single quote inside s so the value cannot end the quoting.
func shellSingleQuote(s string) string {
	return "'" + strings.Replace(s, "'", `'\''`, -1) + "'"
}

// minioMountedDataset describes a dataset stored in the attachment Minio
// bucket, mounted read-only under /dataset1/<Name>.
func minioMountedDataset(info models.DatasetInfo) models.GrampusDataset {
	return models.GrampusDataset{
		Name:          info.FullName,
		Bucket:        setting.Attachment.Minio.Bucket,
		EndPoint:      setting.Attachment.Minio.Endpoint,
		ObjectKey:     info.DataLocalPath,
		ReadOnly:      true,
		ContainerPath: "/dataset1/" + info.Name,
	}
}

// datasetCopyCommand returns the shell statement that copies the file named
// uuid from the dataset's mount directory to /dataset/<FullName>.
func datasetCopyCommand(uuid string, info models.DatasetInfo) string {
	return "cp /dataset1/" + shellSingleQuote(info.Name) + "/" + uuid +
		" /dataset/" + shellSingleQuote(info.FullName) + ";"
}

// getDatasetGrampus builds one dataset entry per item of datasetInfos, each
// pointing at setting.Bucket on the endpoint returned by getEndPoint.
// The result is nil when datasetInfos is empty.
func getDatasetGrampus(datasetInfos map[string]models.DatasetInfo) []models.GrampusDataset {
	var datasetGrampus []models.GrampusDataset
	endPoint := getEndPoint()
	for _, datasetInfo := range datasetInfos {
		datasetGrampus = append(datasetGrampus, models.GrampusDataset{
			Name:      datasetInfo.FullName,
			Bucket:    setting.Bucket,
			EndPoint:  endPoint,
			ObjectKey: datasetInfo.DataLocalPath + datasetInfo.FullName,
		})
	}
	return datasetGrampus
}

// getDatasetGPUGrampus builds the Minio-backed dataset entries for
// datasetInfos together with the shell command that copies every dataset
// file into /dataset. The map key is used as the file name to copy.
func getDatasetGPUGrampus(datasetInfos map[string]models.DatasetInfo) ([]models.GrampusDataset, string) {
	var datasetGrampus []models.GrampusDataset
	var command strings.Builder
	for uuid, datasetInfo := range datasetInfos {
		datasetGrampus = append(datasetGrampus, minioMountedDataset(datasetInfo))
		command.WriteString(datasetCopyCommand(uuid, datasetInfo))
	}
	return datasetGrampus, command.String()
}

// getDatasetGCUGrampus builds the dataset entries for datasetInfos. Datasets
// of type models.TypeCloudBrainOne are taken from Minio and contribute a copy
// statement to the returned command; all others are read from setting.Bucket
// and mounted directly under /dataset/<Name>.
func getDatasetGCUGrampus(datasetInfos map[string]models.DatasetInfo) ([]models.GrampusDataset, string) {
	var datasetGrampus []models.GrampusDataset
	var command strings.Builder
	obsEndPoint := getEndPoint()
	for uuid, datasetInfo := range datasetInfos {
		if datasetInfo.Type == models.TypeCloudBrainOne {
			datasetGrampus = append(datasetGrampus, minioMountedDataset(datasetInfo))
			command.WriteString(datasetCopyCommand(uuid, datasetInfo))
			continue
		}
		datasetGrampus = append(datasetGrampus, models.GrampusDataset{
			Name:          datasetInfo.FullName,
			Bucket:        setting.Bucket,
			EndPoint:      obsEndPoint,
			ObjectKey:     datasetInfo.DataLocalPath + datasetInfo.FullName,
			ContainerPath: "/dataset/" + datasetInfo.Name,
		})
	}
	return datasetGrampus, command.String()
}

// GenerateNotebookJob creates a debug (notebook) job through
// createNotebookJob, stores the matching Cloudbrain record and sends a
// notification for it. It returns the ID of the created job.
//
// req.Command is overwritten: it is cleared for NPU requests and set to the
// rendered CommandGpuDebug template otherwise.
func GenerateNotebookJob(ctx *context.Context, req *GenerateNotebookJobReq) (jobId string, err error) {
	createTime := timeutil.TimeStampNow()

	var datasetGrampus []models.GrampusDataset
	var codeGrampus models.GrampusDataset
	var cpCommand string
	imageUrl := req.ImageUrl

	if ProcessorTypeNPU == req.ProcessType {
		datasetGrampus = getDatasetGrampus(req.DatasetInfos)
		if len(req.ModelName) != 0 {
			datasetGrampus = append(datasetGrampus, models.GrampusDataset{
				Name:      req.ModelName,
				Bucket:    setting.Bucket,
				EndPoint:  getEndPoint(),
				ReadOnly:  true,
				ObjectKey: req.PreTrainModelPath,
			})
		}
		codeGrampus = models.GrampusDataset{
			Name:      req.CodeName,
			Bucket:    setting.Bucket,
			EndPoint:  getEndPoint(),
			ObjectKey: req.CodeStoragePath + cloudbrain.DefaultBranchName + ".zip",
			ReadOnly:  false,
		}
		// NPU tasks are submitted without an image URL or a start command.
		imageUrl = ""
		req.Command = ""
	} else {
		if ProcessorTypeGCU == req.ProcessType {
			datasetGrampus, cpCommand = getDatasetGCUGrampus(req.DatasetInfos)
		} else {
			datasetGrampus, cpCommand = getDatasetGPUGrampus(req.DatasetInfos)
		}

		if len(req.ModelName) != 0 {
			if req.ModelStorageType == models.TypeCloudBrainOne {
				datasetGrampus = append(datasetGrampus, models.GrampusDataset{
					Name:          req.ModelName,
					Bucket:        setting.Attachment.Minio.Bucket,
					EndPoint:      setting.Attachment.Minio.Endpoint,
					ObjectKey:     req.PreTrainModelPath,
					ReadOnly:      true,
					ContainerPath: cloudbrain.PretrainModelMountPath,
				})
			} else {
				datasetGrampus = append(datasetGrampus, models.GrampusDataset{
					Name:          req.ModelName,
					Bucket:        setting.Bucket,
					EndPoint:      getEndPoint(),
					ReadOnly:      true,
					ObjectKey:     req.PreTrainModelPath,
					ContainerPath: cloudbrain.PretrainModelMountPath,
				})
			}
		}

		codeArchiveName := cloudbrain.DefaultBranchName + ".zip"
		codeGrampus = models.GrampusDataset{
			Name:          req.CodeName,
			Bucket:        setting.Attachment.Minio.Bucket,
			EndPoint:      setting.Attachment.Minio.Endpoint,
			ObjectKey:     req.CodeStoragePath + codeArchiveName,
			ReadOnly:      false,
			ContainerPath: cloudbrain.CodeMountPath,
		}
		if ProcessorTypeGCU == req.ProcessType {
			imageUrl = ""
		}
		req.Command = fmt.Sprintf(CommandGpuDebug, cpCommand, setting.CullIdleTimeout, setting.CullIdleTimeout, setting.CullInterval, setting.CullIdleTimeout, setting.CullInterval)
		// The command is passed as an argument, not as the format string, so
		// a '%' inside it (e.g. from a dataset name) cannot garble the log.
		log.Info("debug command:%s", req.Command)
	}

	jobResult, err := createNotebookJob(models.CreateGrampusNotebookRequest{
		Name: req.JobName,
		Tasks: []models.GrampusNotebookTask{
			{
				Name:             req.JobName,
				ResourceSpecId:   req.Spec.SourceSpecId,
				ImageId:          req.ImageId,
				ImageUrl:         imageUrl,
				Datasets:         datasetGrampus,
				Code:             codeGrampus,
				AutoStopDuration: autoStopDurationMs,
				Capacity:         setting.Capacity,
				Command:          req.Command,
				CenterID:         req.Spec.GetAvailableCenterIds(ctx.User.ID),
			},
		},
	})
	if err != nil {
		log.Error("createNotebookJob failed: %v", err.Error())
		return "", err
	}

	jobID := jobResult.JobInfo.JobID
	err = models.CreateCloudbrain(&models.Cloudbrain{
		Status:           TransTrainJobStatus(jobResult.JobInfo.Status),
		UserID:           ctx.User.ID,
		RepoID:           ctx.Repo.Repository.ID,
		JobID:            jobID,
		JobName:          req.JobName,
		DisplayJobName:   req.DisplayJobName,
		JobType:          string(models.JobTypeDebug),
		Type:             models.TypeC2Net,
		Uuid:             req.Uuid,
		DatasetName:      req.DatasetNames,
		CommitID:         req.CommitID,
		IsLatestVersion:  "1",
		ComputeResource:  req.ComputeResource,
		ImageID:          req.ImageId,
		BranchName:       req.BranchName,
		Description:      req.Description,
		WorkServerNumber: 1,
		EngineName:       req.ImageUrl,
		CreatedUnix:      createTime,
		UpdatedUnix:      createTime,
		Spec:             req.Spec,
		ModelName:        req.ModelName,
		ModelVersion:     req.ModelVersion,
		LabelName:        req.LabelName,
		PreTrainModelUrl: req.PreTrainModelUrl,
		CkptName:         req.CkptName,
	})
	if err != nil {
		log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
		return "", err
	}

	// actionType keeps its zero value for any other compute resource.
	var actionType models.ActionType
	switch req.ComputeResource {
	case models.NPUResource:
		actionType = models.ActionCreateGrampusNPUDebugTask
	case models.GPUResource:
		actionType = models.ActionCreateGrampusGPUDebugTask
	case models.GCUResource:
		actionType = models.ActionCreateGrampusGCUDebugTask
	}

	task, err := models.GetCloudbrainByJobID(jobID)
	if err != nil {
		log.Error("GetCloudbrainByJobID failed: %v", err.Error())
		return "", err
	}

	stringId := strconv.FormatInt(task.ID, 10)
	notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, stringId, req.DisplayJobName, actionType)

	return jobID, nil
}

// GenerateTrainJob creates a training job through createJob, stores the
// matching Cloudbrain record and sends a notification for it. It returns the
// ID of the created job.
//
// Dataset, model and code entries are only filled in for NPU requests; for
// every other process type they are sent empty.
func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId string, err error) {
	createTime := timeutil.TimeStampNow()

	var datasetGrampus, modelGrampus []models.GrampusDataset
	var codeGrampus models.GrampusDataset
	if ProcessorTypeNPU == req.ProcessType {
		datasetGrampus = getDatasetGrampus(req.DatasetInfos)
		if len(req.ModelName) != 0 {
			modelGrampus = []models.GrampusDataset{
				{
					Name:      req.ModelName,
					Bucket:    setting.Bucket,
					EndPoint:  getEndPoint(),
					ObjectKey: req.PreTrainModelPath,
				},
			}
		}
		codeGrampus = models.GrampusDataset{
			Name:      req.CodeName,
			Bucket:    setting.Bucket,
			EndPoint:  getEndPoint(),
			ObjectKey: req.CodeObsPath + cloudbrain.DefaultBranchName + ".zip",
		}
	}

	jobResult, err := createJob(models.CreateGrampusJobRequest{
		Name: req.JobName,
		Tasks: []models.GrampusTasks{
			{
				Name:           req.JobName,
				Command:        req.Command,
				ResourceSpecId: req.Spec.SourceSpecId,
				ImageId:        req.ImageId,
				ImageUrl:       req.ImageUrl,
				CenterID:       req.Spec.GetAvailableCenterIds(ctx.User.ID),
				ReplicaNum:     1,
				Datasets:       datasetGrampus,
				Models:         modelGrampus,
				Code:           codeGrampus,
				BootFile:       req.BootFile,
			},
		},
	})
	if err != nil {
		log.Error("createJob failed: %v", err.Error())
		return "", err
	}

	jobID := jobResult.JobInfo.JobID
	err = models.CreateCloudbrain(&models.Cloudbrain{
		Status:            TransTrainJobStatus(jobResult.JobInfo.Status),
		UserID:            ctx.User.ID,
		RepoID:            ctx.Repo.Repository.ID,
		JobID:             jobID,
		JobName:           req.JobName,
		DisplayJobName:    req.DisplayJobName,
		JobType:           string(models.JobTypeTrain),
		Type:              models.TypeC2Net,
		Uuid:              req.Uuid,
		DatasetName:       req.DatasetNames,
		CommitID:          req.CommitID,
		IsLatestVersion:   req.IsLatestVersion,
		ComputeResource:   req.ComputeResource,
		ImageID:           req.ImageId,
		TrainUrl:          req.TrainUrl,
		BranchName:        req.BranchName,
		Parameters:        req.Params,
		BootFile:          req.BootFile,
		DataUrl:           req.DataUrl,
		Description:       req.Description,
		WorkServerNumber:  req.WorkServerNumber,
		EngineName:        req.EngineName,
		VersionCount:      req.VersionCount,
		TotalVersionCount: req.TotalVersionCount,
		CreatedUnix:       createTime,
		UpdatedUnix:       createTime,
		Spec:              req.Spec,
		ModelName:         req.ModelName,
		ModelVersion:      req.ModelVersion,
		LabelName:         req.LabelName,
		PreTrainModelUrl:  req.PreTrainModelUrl,
		CkptName:          req.CkptName,
	})
	if err != nil {
		log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
		return "", err
	}

	// actionType keeps its zero value for any other compute resource.
	var actionType models.ActionType
	switch req.ComputeResource {
	case models.NPUResource:
		actionType = models.ActionCreateGrampusNPUTrainTask
	case models.GPUResource:
		actionType = models.ActionCreateGrampusGPUTrainTask
	}
	notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)

	return jobID, nil
}

// getCentersParamter returns the IDs and names of the centers selected for
// the current user by the non-exclusive entries of SpecialPools whose Type is
// contained in req.ComputeResource.
//
// Pools of organizations the user belongs to are collected as "include"
// centers, pools of other organizations as "exclude" centers. If any include
// centers exist they are returned. Otherwise, if exclude centers exist, all
// centers reported by GetResourceSpecs minus the excluded ones are returned.
// Both results are nil when neither applies. The order of the returned
// entries is unspecified because they are collected from maps.
func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {
	var centerID []string
	var centerName []string

	includeCenters := make(map[string]string)
	excludeCenters := make(map[string]string)

	if SpecialPools != nil {
		for _, pool := range SpecialPools.Pools {
			if pool.IsExclusive || !strings.Contains(req.ComputeResource, pool.Type) {
				continue
			}
			// Lookup errors are deliberately ignored (best effort): a pool
			// whose organization cannot be loaded is skipped, and a failed
			// membership check is treated as "not a member".
			org, _ := models.GetOrgByName(pool.Org)
			if org == nil {
				continue
			}
			isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID)
			target := excludeCenters
			if isOrgMember {
				target = includeCenters
			}
			for _, info := range pool.Pool {
				target[info.Queue] = info.Value
			}
		}
	}

	if len(includeCenters) > 0 {
		// Dedicated resource pools exist: pick the centers from them.
		for k, v := range includeCenters {
			centerID = append(centerID, k)
			centerName = append(centerName, v)
		}
	} else if len(excludeCenters) > 0 {
		// Otherwise there are centers to exclude: fetch all centers, remove
		// the excluded ones and use what is left.
		allCenters := make(map[string]string)
		specs, err := GetResourceSpecs(req.ProcessType)
		if err != nil {
			// Without the spec list no centers can be selected; record why.
			log.Error("GetResourceSpecs(%s) failed: %v", req.ProcessType, err)
		} else {
			for _, info := range specs.Infos {
				for _, center := range info.Centers {
					allCenters[center.ID] = center.Name
				}
			}
		}

		for k := range excludeCenters {
			delete(allCenters, k)
		}

		for k, v := range allCenters {
			centerID = append(centerID, k)
			centerName = append(centerName, v)
		}
	}

	return centerID, centerName
}

// TransTrainJobStatus maps models.GrampusStatusPending to
// models.GrampusStatusWaiting and returns the status in upper case.
func TransTrainJobStatus(status string) string {
	if status == models.GrampusStatusPending {
		status = models.GrampusStatusWaiting
	}
	return strings.ToUpper(status)
}

// GetNpuModelRemoteObsUrl returns the "s3:///" URL built from BucketRemote
// and the object key given by GetNpuModelObjectKey for jobName.
func GetNpuModelRemoteObsUrl(jobName string) string {
	return "s3:///" + BucketRemote + "/" + GetNpuModelObjectKey(jobName)
}

// GetNpuModelObjectKey returns setting.CodePathPrefix, jobName and
// RemoteModelPath joined into one object key.
func GetNpuModelObjectKey(jobName string) string {
	return setting.CodePathPrefix + jobName + RemoteModelPath
}

// GetRemoteEndPoint returns the Endpoint of the first entry in
// setting.CenterInfos.Info whose CenterID equals aiCenterID, or "" when no
// entry matches.
func GetRemoteEndPoint(aiCenterID string) string {
	for _, info := range setting.CenterInfos.Info {
		if info.CenterID == aiCenterID {
			return info.Endpoint
		}
	}
	return ""
}

// GetCenterProxy returns the StorageProxyServer of the first entry in
// setting.CenterInfos.Info whose CenterID equals aiCenterID, or "" when no
// entry matches.
func GetCenterProxy(aiCenterID string) string {
	for _, info := range setting.CenterInfos.Info {
		if info.CenterID == aiCenterID {
			return info.StorageProxyServer
		}
	}
	return ""
}