|
- package grampus
-
import (
	"encoding/json"
	"errors"
	"strings"

	"code.gitea.io/gitea/models"
	"code.gitea.io/gitea/modules/cloudbrain"
	"code.gitea.io/gitea/modules/context"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/notification"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/timeutil"
)
-
- const (
- JobPath = "job/"
-
- ProcessorTypeNPU = "npu.huawei.com/NPU"
- ProcessorTypeGPU = "nvidia.com/gpu"
-
- GpuWorkDir = "/tmp/"
- NpuWorkDir = "/cache/"
- NpuLocalLogUrl = "/tmp/train.log"
- CommandPrepareScriptNpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;"
-
- CodeArchiveName = "master.zip"
-
- BucketRemote = "grampus"
- RemoteModelPath = "/output/" + models.ModelSuffix
- )
-
- var (
- poolInfos *models.PoolInfos
- FlavorInfos *setting.StFlavorInfos
- ImageInfos *setting.StImageInfosModelArts
-
- SpecialPools *models.SpecialPools
-
- CommandPrepareScriptGpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;echo \"start loading script\";wget -q https://openi.pcl.ac.cn/OpenIOSSG/%s/archive/master.zip;" +
- "echo \"finish loading script\";unzip -q master.zip;cd %s;chmod 777 downloader_for_obs uploader_for_npu downloader_for_minio uploader_for_gpu;"
- )
-
- type GenerateTrainJobReq struct {
- JobName string
- Command string
- ImageUrl string //与image_id二选一,都有的情况下优先image_url
- ImageId string
-
- DisplayJobName string
- Uuid string
- Description string
- CodeObsPath string
- BootFile string
- BootFileUrl string
- DataUrl string
- TrainUrl string
- WorkServerNumber int
- EngineID int64
- CommitID string
- IsLatestVersion string
- BranchName string
- PreVersionId int64
- PreVersionName string
- VersionCount int
- EngineName string
- TotalVersionCount int
- ComputeResource string
- ProcessType string
-
- DatasetNames string
- DatasetInfos map[string]models.DatasetInfo
- Params string
- ModelName string
- LabelName string
- CkptName string
- ModelVersion string
- PreTrainModelPath string
- PreTrainModelUrl string
- Spec *models.Specification
- CodeName string
- }
-
- func getEndPoint() string {
- index := strings.Index(setting.Endpoint, "//")
- endpoint := setting.Endpoint[index+2:]
- return endpoint
- }
-
- func getDatasetGrampus(datasetInfos map[string]models.DatasetInfo) []models.GrampusDataset {
- var datasetGrampus []models.GrampusDataset
- endPoint := getEndPoint()
- for _, datasetInfo := range datasetInfos {
- datasetGrampus = append(datasetGrampus, models.GrampusDataset{
- Name: datasetInfo.FullName,
- Bucket: setting.Bucket,
- EndPoint: endPoint,
- ObjectKey: datasetInfo.DataLocalPath + datasetInfo.FullName,
- })
-
- }
- return datasetGrampus
- }
-
- func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId string, err error) {
- createTime := timeutil.TimeStampNow()
-
- centerID, centerName := getCentersParamter(ctx, req)
-
- var datasetGrampus, modelGrampus []models.GrampusDataset
- var codeGrampus models.GrampusDataset
- if ProcessorTypeNPU == req.ProcessType {
- datasetGrampus = getDatasetGrampus(req.DatasetInfos)
- if len(req.ModelName) != 0 {
- modelGrampus = []models.GrampusDataset{
- {
- Name: req.ModelName,
- Bucket: setting.Bucket,
- EndPoint: getEndPoint(),
- ObjectKey: req.PreTrainModelPath,
- },
- }
- }
- codeGrampus = models.GrampusDataset{
- Name: req.CodeName,
- Bucket: setting.Bucket,
- EndPoint: getEndPoint(),
- ObjectKey: req.CodeObsPath + cloudbrain.DefaultBranchName + ".zip",
- }
- }
-
- jobResult, err := createJob(models.CreateGrampusJobRequest{
- Name: req.JobName,
- Tasks: []models.GrampusTasks{
- {
- Name: req.JobName,
- Command: req.Command,
- ResourceSpecId: req.Spec.SourceSpecId,
- ImageId: req.ImageId,
- ImageUrl: req.ImageUrl,
- CenterID: centerID,
- CenterName: centerName,
- ReplicaNum: 1,
- Datasets: datasetGrampus,
- Models: modelGrampus,
- Code: codeGrampus,
- BootFile: req.BootFile,
- },
- },
- })
- if err != nil {
- log.Error("createJob failed: %v", err.Error())
- return "", err
- }
-
- jobID := jobResult.JobInfo.JobID
- err = models.CreateCloudbrain(&models.Cloudbrain{
- Status: TransTrainJobStatus(jobResult.JobInfo.Status),
- UserID: ctx.User.ID,
- RepoID: ctx.Repo.Repository.ID,
- JobID: jobID,
- JobName: req.JobName,
- DisplayJobName: req.DisplayJobName,
- JobType: string(models.JobTypeTrain),
- Type: models.TypeC2Net,
- Uuid: req.Uuid,
- DatasetName: req.DatasetNames,
- CommitID: req.CommitID,
- IsLatestVersion: req.IsLatestVersion,
- ComputeResource: req.ComputeResource,
- ImageID: req.ImageId,
- TrainUrl: req.TrainUrl,
- BranchName: req.BranchName,
- Parameters: req.Params,
- BootFile: req.BootFile,
- DataUrl: req.DataUrl,
- Description: req.Description,
- WorkServerNumber: req.WorkServerNumber,
- EngineName: req.EngineName,
- VersionCount: req.VersionCount,
- TotalVersionCount: req.TotalVersionCount,
- CreatedUnix: createTime,
- UpdatedUnix: createTime,
- Spec: req.Spec,
- ModelName: req.ModelName,
- ModelVersion: req.ModelVersion,
- LabelName: req.LabelName,
- PreTrainModelUrl: req.PreTrainModelUrl,
- CkptName: req.CkptName,
- })
-
- if err != nil {
- log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
- return "", err
- }
-
- var actionType models.ActionType
- if req.ComputeResource == models.NPUResource {
- actionType = models.ActionCreateGrampusNPUTrainTask
- } else if req.ComputeResource == models.GPUResource {
- actionType = models.ActionCreateGrampusGPUTrainTask
- }
- notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
-
- return jobID, nil
- }
-
- func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {
- var centerID []string
- var centerName []string
-
- includeCenters := make(map[string]string)
- excludeCenters := make(map[string]string)
-
- if SpecialPools != nil {
- for _, pool := range SpecialPools.Pools {
- if !pool.IsExclusive && strings.Contains(req.ComputeResource, pool.Type) {
- org, _ := models.GetOrgByName(pool.Org)
- if org != nil {
- isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID)
- if isOrgMember {
- for _, info := range pool.Pool {
- includeCenters[info.Queue] = info.Value
- }
- } else {
- for _, info := range pool.Pool {
- excludeCenters[info.Queue] = info.Value
- }
- }
- }
- }
- }
-
- }
-
- if len(includeCenters) > 0 {
- //如果有专属资源池,根据专属资源池指定智算中心
- for k, v := range includeCenters {
- centerID = append(centerID, k)
- centerName = append(centerName, v)
- }
- } else if len(excludeCenters) > 0 {
- //否则,有要排除的中心,先获取所有中心,删除其中的排除中心,得到指定的智算中心
- allCenters := make(map[string]string)
- specs, err := GetResourceSpecs(req.ProcessType)
- if err == nil {
- for _, info := range specs.Infos {
- for _, center := range info.Centers {
- allCenters[center.ID] = center.Name
- }
-
- }
- }
-
- for k, _ := range excludeCenters {
- delete(allCenters, k)
- }
-
- for k, v := range allCenters {
- centerID = append(centerID, k)
- centerName = append(centerName, v)
- }
-
- }
- return centerID, centerName
- }
-
- func TransTrainJobStatus(status string) string {
- if status == models.GrampusStatusPending {
- status = models.GrampusStatusWaiting
- }
-
- return strings.ToUpper(status)
- }
- func InitSpecialPool() {
- if SpecialPools == nil && setting.Grampus.SpecialPools != "" {
- json.Unmarshal([]byte(setting.Grampus.SpecialPools), &SpecialPools)
- }
- }
-
- func GetNpuModelRemoteObsUrl(jobName string) string {
- return "s3:///" + BucketRemote + "/" + GetNpuModelObjectKey(jobName)
- }
-
- func GetNpuModelObjectKey(jobName string) string {
- return setting.CodePathPrefix + jobName + RemoteModelPath
- }
-
- func GetRemoteEndPoint(aiCenterID string) string {
- var endPoint string
- for _, info := range setting.CenterInfos.Info {
- if info.CenterID == aiCenterID {
- endPoint = info.Endpoint
- break
- }
- }
-
- return endPoint
- }
-
- func GetCenterProxy(aiCenterID string) string {
- var proxy string
- for _, info := range setting.CenterInfos.Info {
- if info.CenterID == aiCenterID {
- proxy = info.StorageProxyServer
- break
- }
- }
-
- return proxy
- }
|