|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532 |
- package grampus
-
- import (
- "fmt"
- "strconv"
- "strings"
-
- "code.gitea.io/gitea/models"
- "code.gitea.io/gitea/modules/cloudbrain"
- "code.gitea.io/gitea/modules/context"
- "code.gitea.io/gitea/modules/log"
- "code.gitea.io/gitea/modules/notification"
- "code.gitea.io/gitea/modules/setting"
- "code.gitea.io/gitea/modules/timeutil"
- )
-
// Package-level constants used when building Grampus job requests.
const (
	// JobPath is a path segment for jobs; it is not referenced in this part of
	// the file (presumably used by the REST client — confirm against callers).
	JobPath = "job/"

	// Processor type identifiers. GenerateNotebookJob and GenerateTrainJob
	// compare them against the request's ProcessType to select the
	// NPU, GPU or GCU code path.
	ProcessorTypeNPU = "npu.huawei.com/NPU"
	ProcessorTypeGPU = "nvidia.com/gpu"
	ProcessorTypeGCU = "enflame-tech.com/gcu"

	// Working directories, log location and the shell snippet that creates
	// the output/code/dataset/pretrainmodel directories. None of these are
	// referenced in this part of the file; presumably consumed by the
	// command builders elsewhere — confirm against callers.
	GpuWorkDir              = "/tmp/"
	NpuWorkDir              = "/cache/"
	NpuLocalLogUrl          = "/tmp/train.log"
	CommandPrepareScriptNpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;"

	// CodeArchiveName is the file name of the code archive.
	CodeArchiveName = "master.zip"

	// BucketRemote is the bucket name embedded in the URL built by
	// GetNpuModelRemoteObsUrl.
	BucketRemote = "grampus"
	// RemoteModelPath is the suffix appended to the job's code path prefix by
	// GetNpuModelObjectKey.
	RemoteModelPath = "/output/" + models.ModelSuffix
	// autoStopDurationMs is four hours expressed in milliseconds; it is sent
	// as AutoStopDuration on every notebook task.
	autoStopDurationMs = 4 * 60 * 60 * 1000
	// CommandGpuDebug is the fmt template for the notebook start command on
	// non-NPU tasks. It takes six %s arguments, in order: the dataset copy
	// command, then the five Jupyter timeout/cull values supplied by
	// GenerateNotebookJob from setting.CullIdleTimeout and setting.CullInterval.
	CommandGpuDebug = "mkdir -p /dataset;%s! [ -x \"$(command -v jupyter)\" ] && pip install jupyterlab==3 -i https://pypi.tuna.tsinghua.edu.cn/simple;jupyter lab --ServerApp.shutdown_no_activity_timeout=%s --TerminalManager.cull_inactive_timeout=%s --TerminalManager.cull_interval=%s --MappingKernelManager.cull_idle_timeout=%s --MappingKernelManager.cull_interval=%s --MappingKernelManager.cull_connected=True --MappingKernelManager.cull_busy=True --no-browser --ip=0.0.0.0 --allow-root --notebook-dir='/code' --port=$OCTOPUS_NOTEBOOK_PORT --LabApp.token='' --LabApp.allow_origin='*' --LabApp.base_url=$OCTOPUS_NOTEBOOK_BASE_URL;"
)
-
// Package-level state. None of these variables is assigned in this part of
// the file; they are presumably populated during initialization elsewhere in
// the package — confirm before relying on them being non-nil.
var (
	poolInfos   *models.PoolInfos
	FlavorInfos *setting.StFlavorInfos
	ImageInfos  *setting.StImageInfosModelArts

	// SpecialPools describes organization-specific resource pools; it is read
	// by getCentersParamter, which tolerates a nil value.
	SpecialPools *models.SpecialPools

	// CommandPrepareScriptGpu is a fmt template with two %s verbs: the first
	// is substituted into the archive download URL, the second is the
	// directory to cd into after unzipping.
	CommandPrepareScriptGpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/%s/archive/master.zip;" +
		"echo \"finish loading script\";unzip -q master.zip;cd %s;chmod 777 downloader_for_obs uploader_for_npu downloader_for_minio uploader_for_gpu;"
)
-
// GenerateTrainJobReq carries everything GenerateTrainJob needs to submit a
// training job and to persist the matching Cloudbrain record.
type GenerateTrainJobReq struct {
	JobName  string
	Command  string
	ImageUrl string // Either ImageUrl or ImageId is required; ImageUrl takes precedence when both are set.
	ImageId  string

	DisplayJobName    string
	Uuid              string
	Description       string
	CodeObsPath       string // Prefix of the code archive object key (NPU path).
	BootFile          string
	BootFileUrl       string
	DataUrl           string
	TrainUrl          string
	WorkServerNumber  int
	EngineID          int64
	CommitID          string
	IsLatestVersion   string
	BranchName        string
	PreVersionId      int64
	PreVersionName    string
	VersionCount      int
	EngineName        string
	TotalVersionCount int
	ComputeResource   string // Compared with models.NPUResource / models.GPUResource to pick the notification action.
	ProcessType       string // Compared with the ProcessorType* constants.

	DatasetNames      string
	DatasetInfos      map[string]models.DatasetInfo
	Params            string
	ModelName         string // When non-empty on the NPU path, a pre-trained model entry is attached to the job.
	LabelName         string
	CkptName          string
	ModelVersion      string
	PreTrainModelPath string
	PreTrainModelUrl  string
	Spec              *models.Specification // Dereferenced unconditionally by GenerateTrainJob; must be non-nil.
	CodeName          string
}
-
// GenerateNotebookJobReq carries everything GenerateNotebookJob needs to
// submit a debug (notebook) job and to persist the matching Cloudbrain record.
type GenerateNotebookJobReq struct {
	JobName           string
	Command           string // Overwritten by GenerateNotebookJob (cleared for NPU, rebuilt from CommandGpuDebug otherwise).
	ImageUrl          string
	ImageId           string
	DisplayJobName    string
	Uuid              string
	Description       string
	CodeStoragePath   string // Prefix of the code archive object key.
	CommitID          string
	BranchName        string
	ComputeResource   string // Compared with models.NPUResource / GPUResource / GCUResource to pick the notification action.
	ProcessType       string // Compared with the ProcessorType* constants.
	DatasetNames      string
	DatasetInfos      map[string]models.DatasetInfo
	ModelName         string // When non-empty, the pre-trained model is attached as an extra dataset entry.
	LabelName         string
	CkptName          string
	ModelVersion      string
	PreTrainModelPath string
	PreTrainModelUrl  string
	Spec              *models.Specification // Dereferenced unconditionally by GenerateNotebookJob; must be non-nil.
	CodeName          string
	ModelPath         string // Modeled on OpenI GPU debugging: the /model directory is mounted so the user's model can be written to it.
	ModelStorageType  int    // Compared with models.TypeCloudBrainOne to choose between MinIO and the default bucket for the model.
}
-
- func getEndPoint() string {
- index := strings.Index(setting.Endpoint, "//")
- endpoint := setting.Endpoint[index+2:]
- return endpoint
- }
-
- func getDatasetGrampus(datasetInfos map[string]models.DatasetInfo) []models.GrampusDataset {
- var datasetGrampus []models.GrampusDataset
- endPoint := getEndPoint()
- for _, datasetInfo := range datasetInfos {
- datasetGrampus = append(datasetGrampus, models.GrampusDataset{
- Name: datasetInfo.FullName,
- Bucket: setting.Bucket,
- EndPoint: endPoint,
- ObjectKey: datasetInfo.DataLocalPath + datasetInfo.FullName,
- })
-
- }
- return datasetGrampus
- }
- func getDatasetGPUGrampus(datasetInfos map[string]models.DatasetInfo) ([]models.GrampusDataset, string) {
- var datasetGrampus []models.GrampusDataset
- var command = ""
- for uuid, datasetInfo := range datasetInfos {
- datasetGrampus = append(datasetGrampus, models.GrampusDataset{
- Name: datasetInfo.FullName,
- Bucket: setting.Attachment.Minio.Bucket,
- EndPoint: setting.Attachment.Minio.Endpoint,
- ObjectKey: datasetInfo.DataLocalPath,
- ReadOnly: true,
- ContainerPath: "/dataset1/" + datasetInfo.Name,
- })
-
- command += "cp /dataset1/'" + datasetInfo.Name + "'/" + uuid + " /dataset/'" + datasetInfo.FullName + "';"
-
- }
- return datasetGrampus, command
- }
- func getDatasetGCUGrampus(datasetInfos map[string]models.DatasetInfo) ([]models.GrampusDataset, string) {
- var datasetGrampus []models.GrampusDataset
- var command = ""
- obsEndPoint := getEndPoint()
- for uuid, datasetInfo := range datasetInfos {
- if datasetInfo.Type == models.TypeCloudBrainOne {
- datasetGrampus = append(datasetGrampus, models.GrampusDataset{
- Name: datasetInfo.FullName,
- Bucket: setting.Attachment.Minio.Bucket,
- EndPoint: setting.Attachment.Minio.Endpoint,
- ObjectKey: datasetInfo.DataLocalPath,
- ReadOnly: true,
- ContainerPath: "/dataset1/" + datasetInfo.Name,
- })
-
- command += "cp /dataset1/'" + datasetInfo.Name + "'/" + uuid + " /dataset/'" + datasetInfo.FullName + "';"
-
- } else {
- datasetGrampus = append(datasetGrampus, models.GrampusDataset{
- Name: datasetInfo.FullName,
- Bucket: setting.Bucket,
- EndPoint: obsEndPoint,
- ObjectKey: datasetInfo.DataLocalPath + datasetInfo.FullName,
- ContainerPath: "/dataset/" + datasetInfo.Name,
- })
- }
-
- }
- return datasetGrampus, command
- }
-
- func GenerateNotebookJob(ctx *context.Context, req *GenerateNotebookJobReq) (jobId string, err error) {
- createTime := timeutil.TimeStampNow()
-
- var datasetGrampus []models.GrampusDataset
- var codeGrampus models.GrampusDataset
- var cpCommand string
- imageUrl := req.ImageUrl
- if ProcessorTypeNPU == req.ProcessType {
- datasetGrampus = getDatasetGrampus(req.DatasetInfos)
- if len(req.ModelName) != 0 {
- datasetGrampus = append(datasetGrampus, models.GrampusDataset{
- Name: req.ModelName,
- Bucket: setting.Bucket,
- EndPoint: getEndPoint(),
- ReadOnly: true,
- ObjectKey: req.PreTrainModelPath,
- })
- }
-
- codeGrampus = models.GrampusDataset{
- Name: req.CodeName,
- Bucket: setting.Bucket,
- EndPoint: getEndPoint(),
- ObjectKey: req.CodeStoragePath + cloudbrain.DefaultBranchName + ".zip",
- ReadOnly: false,
- }
- imageUrl = ""
- req.Command = ""
- } else {
- if ProcessorTypeGCU == req.ProcessType {
- datasetGrampus, cpCommand = getDatasetGCUGrampus(req.DatasetInfos)
- } else {
- datasetGrampus, cpCommand = getDatasetGPUGrampus(req.DatasetInfos)
- }
- if len(req.ModelName) != 0 {
- if req.ModelStorageType == models.TypeCloudBrainOne {
- datasetGrampus = append(datasetGrampus, models.GrampusDataset{
- Name: req.ModelName,
- Bucket: setting.Attachment.Minio.Bucket,
- EndPoint: setting.Attachment.Minio.Endpoint,
- ObjectKey: req.PreTrainModelPath,
- ReadOnly: true,
- ContainerPath: cloudbrain.PretrainModelMountPath,
- })
- } else {
- datasetGrampus = append(datasetGrampus, models.GrampusDataset{
- Name: req.ModelName,
- Bucket: setting.Bucket,
- EndPoint: getEndPoint(),
- ReadOnly: true,
- ObjectKey: req.PreTrainModelPath,
- ContainerPath: cloudbrain.PretrainModelMountPath,
- })
- }
-
- }
- codeArchiveName := cloudbrain.DefaultBranchName + ".zip"
- codeGrampus = models.GrampusDataset{
- Name: req.CodeName,
- Bucket: setting.Attachment.Minio.Bucket,
- EndPoint: setting.Attachment.Minio.Endpoint,
- ObjectKey: req.CodeStoragePath + codeArchiveName,
- ReadOnly: false,
- ContainerPath: cloudbrain.CodeMountPath,
- }
- if ProcessorTypeGCU == req.ProcessType {
- imageUrl = ""
- }
- req.Command = fmt.Sprintf(CommandGpuDebug, cpCommand, setting.CullIdleTimeout, setting.CullIdleTimeout, setting.CullInterval, setting.CullIdleTimeout, setting.CullInterval)
- log.Info("debug command:" + req.Command)
-
- }
-
- jobResult, err := createNotebookJob(models.CreateGrampusNotebookRequest{
- Name: req.JobName,
- Tasks: []models.GrampusNotebookTask{
- {
- Name: req.JobName,
- ResourceSpecId: req.Spec.SourceSpecId,
- ImageId: req.ImageId,
- ImageUrl: imageUrl,
- Datasets: datasetGrampus,
- Code: codeGrampus,
- AutoStopDuration: autoStopDurationMs,
- Capacity: setting.Capacity,
- Command: req.Command,
- CenterID: req.Spec.GetAvailableCenterIds(ctx.User.ID),
- },
- },
- })
- if err != nil {
- log.Error("createNotebookJob failed: %v", err.Error())
- return "", err
- }
-
- jobID := jobResult.JobInfo.JobID
- err = models.CreateCloudbrain(&models.Cloudbrain{
- Status: TransTrainJobStatus(jobResult.JobInfo.Status),
- UserID: ctx.User.ID,
- RepoID: ctx.Repo.Repository.ID,
- JobID: jobID,
- JobName: req.JobName,
- DisplayJobName: req.DisplayJobName,
- JobType: string(models.JobTypeDebug),
- Type: models.TypeC2Net,
- Uuid: req.Uuid,
- DatasetName: req.DatasetNames,
- CommitID: req.CommitID,
- IsLatestVersion: "1",
- ComputeResource: req.ComputeResource,
- ImageID: req.ImageId,
- BranchName: req.BranchName,
- Description: req.Description,
- WorkServerNumber: 1,
- EngineName: req.ImageUrl,
- CreatedUnix: createTime,
- UpdatedUnix: createTime,
- Spec: req.Spec,
- ModelName: req.ModelName,
- ModelVersion: req.ModelVersion,
- LabelName: req.LabelName,
- PreTrainModelUrl: req.PreTrainModelUrl,
- CkptName: req.CkptName,
- })
-
- if err != nil {
- log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
- return "", err
- }
-
- var actionType models.ActionType
- if req.ComputeResource == models.NPUResource {
- actionType = models.ActionCreateGrampusNPUDebugTask
- } else if req.ComputeResource == models.GPUResource {
- actionType = models.ActionCreateGrampusGPUDebugTask
- } else if req.ComputeResource == models.GCUResource {
- actionType = models.ActionCreateGrampusGCUDebugTask
- }
- task, err := models.GetCloudbrainByJobID(jobID)
- if err != nil {
- log.Error("GetCloudbrainByJobID failed: %v", err.Error())
- return "", err
- }
-
- stringId := strconv.FormatInt(task.ID, 10)
- notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, stringId, req.DisplayJobName, actionType)
-
- return jobID, nil
- }
-
- func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId string, err error) {
- createTime := timeutil.TimeStampNow()
-
- var datasetGrampus, modelGrampus []models.GrampusDataset
- var codeGrampus models.GrampusDataset
- if ProcessorTypeNPU == req.ProcessType {
- datasetGrampus = getDatasetGrampus(req.DatasetInfos)
- if len(req.ModelName) != 0 {
- modelGrampus = []models.GrampusDataset{
- {
- Name: req.ModelName,
- Bucket: setting.Bucket,
- EndPoint: getEndPoint(),
- ObjectKey: req.PreTrainModelPath,
- },
- }
- }
- codeGrampus = models.GrampusDataset{
- Name: req.CodeName,
- Bucket: setting.Bucket,
- EndPoint: getEndPoint(),
- ObjectKey: req.CodeObsPath + cloudbrain.DefaultBranchName + ".zip",
- }
- }
-
- jobResult, err := createJob(models.CreateGrampusJobRequest{
- Name: req.JobName,
- Tasks: []models.GrampusTasks{
- {
- Name: req.JobName,
- Command: req.Command,
- ResourceSpecId: req.Spec.SourceSpecId,
- ImageId: req.ImageId,
- ImageUrl: req.ImageUrl,
- CenterID: req.Spec.GetAvailableCenterIds(ctx.User.ID),
- ReplicaNum: 1,
- Datasets: datasetGrampus,
- Models: modelGrampus,
- Code: codeGrampus,
- BootFile: req.BootFile,
- },
- },
- })
- if err != nil {
- log.Error("createJob failed: %v", err.Error())
- return "", err
- }
-
- jobID := jobResult.JobInfo.JobID
- err = models.CreateCloudbrain(&models.Cloudbrain{
- Status: TransTrainJobStatus(jobResult.JobInfo.Status),
- UserID: ctx.User.ID,
- RepoID: ctx.Repo.Repository.ID,
- JobID: jobID,
- JobName: req.JobName,
- DisplayJobName: req.DisplayJobName,
- JobType: string(models.JobTypeTrain),
- Type: models.TypeC2Net,
- Uuid: req.Uuid,
- DatasetName: req.DatasetNames,
- CommitID: req.CommitID,
- IsLatestVersion: req.IsLatestVersion,
- ComputeResource: req.ComputeResource,
- ImageID: req.ImageId,
- TrainUrl: req.TrainUrl,
- BranchName: req.BranchName,
- Parameters: req.Params,
- BootFile: req.BootFile,
- DataUrl: req.DataUrl,
- Description: req.Description,
- WorkServerNumber: req.WorkServerNumber,
- EngineName: req.EngineName,
- VersionCount: req.VersionCount,
- TotalVersionCount: req.TotalVersionCount,
- CreatedUnix: createTime,
- UpdatedUnix: createTime,
- Spec: req.Spec,
- ModelName: req.ModelName,
- ModelVersion: req.ModelVersion,
- LabelName: req.LabelName,
- PreTrainModelUrl: req.PreTrainModelUrl,
- CkptName: req.CkptName,
- })
-
- if err != nil {
- log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
- return "", err
- }
-
- var actionType models.ActionType
- if req.ComputeResource == models.NPUResource {
- actionType = models.ActionCreateGrampusNPUTrainTask
- } else if req.ComputeResource == models.GPUResource {
- actionType = models.ActionCreateGrampusGPUTrainTask
- }
- notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
-
- return jobID, nil
- }
-
- func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {
- var centerID []string
- var centerName []string
-
- includeCenters := make(map[string]string)
- excludeCenters := make(map[string]string)
-
- if SpecialPools != nil {
- for _, pool := range SpecialPools.Pools {
- if !pool.IsExclusive && strings.Contains(req.ComputeResource, pool.Type) {
- org, _ := models.GetOrgByName(pool.Org)
- if org != nil {
- isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID)
- if isOrgMember {
- for _, info := range pool.Pool {
- includeCenters[info.Queue] = info.Value
- }
- } else {
- for _, info := range pool.Pool {
- excludeCenters[info.Queue] = info.Value
- }
- }
- }
- }
- }
-
- }
-
- if len(includeCenters) > 0 {
- //如果有专属资源池,根据专属资源池指定智算中心
- for k, v := range includeCenters {
- centerID = append(centerID, k)
- centerName = append(centerName, v)
- }
- } else if len(excludeCenters) > 0 {
- //否则,有要排除的中心,先获取所有中心,删除其中的排除中心,得到指定的智算中心
- allCenters := make(map[string]string)
- specs, err := GetResourceSpecs(req.ProcessType)
- if err == nil {
- for _, info := range specs.Infos {
- for _, center := range info.Centers {
- allCenters[center.ID] = center.Name
- }
-
- }
- }
-
- for k, _ := range excludeCenters {
- delete(allCenters, k)
- }
-
- for k, v := range allCenters {
- centerID = append(centerID, k)
- centerName = append(centerName, v)
- }
-
- }
- return centerID, centerName
- }
-
- func TransTrainJobStatus(status string) string {
- if status == models.GrampusStatusPending {
- status = models.GrampusStatusWaiting
- }
-
- return strings.ToUpper(status)
- }
-
- func GetNpuModelRemoteObsUrl(jobName string) string {
- return "s3:///" + BucketRemote + "/" + GetNpuModelObjectKey(jobName)
- }
-
- func GetNpuModelObjectKey(jobName string) string {
- return setting.CodePathPrefix + jobName + RemoteModelPath
- }
-
- func GetRemoteEndPoint(aiCenterID string) string {
- var endPoint string
- for _, info := range setting.CenterInfos.Info {
- if info.CenterID == aiCenterID {
- endPoint = info.Endpoint
- break
- }
- }
-
- return endPoint
- }
-
- func GetCenterProxy(aiCenterID string) string {
- var proxy string
- for _, info := range setting.CenterInfos.Info {
- if info.CenterID == aiCenterID {
- proxy = info.StorageProxyServer
- break
- }
- }
-
- return proxy
- }
|