You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

grampus.go 17 kB

3 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
2 years ago
2 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
3 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
2 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago

  1. package grampus
  2. import (
  3. "fmt"
  4. "strings"
  5. "code.gitea.io/gitea/models"
  6. "code.gitea.io/gitea/modules/cloudbrain"
  7. "code.gitea.io/gitea/modules/context"
  8. "code.gitea.io/gitea/modules/log"
  9. "code.gitea.io/gitea/modules/notification"
  10. "code.gitea.io/gitea/modules/setting"
  11. "code.gitea.io/gitea/modules/timeutil"
  12. )
  13. const (
  14. JobPath = "job/"
  15. ProcessorTypeNPU = "npu.huawei.com/NPU"
  16. ProcessorTypeGPU = "nvidia.com/gpu"
  17. ProcessorTypeGCU = "enflame-tech.com/gcu"
  18. GpuWorkDir = "/tmp/"
  19. NpuWorkDir = "/cache/"
  20. NpuLocalLogUrl = "/tmp/train.log"
  21. CommandPrepareScriptNpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;"
  22. CodeArchiveName = "master.zip"
  23. BucketRemote = "grampus"
  24. RemoteModelPath = "/output/" + models.ModelSuffix
  25. autoStopDurationMs = 4 * 60 * 60 * 1000
  26. CommandGpuDebug = "mkdir -p /dataset;%s! [ -x \"$(command -v jupyter)\" ] && pip install jupyterlab==3 -i https://pypi.tuna.tsinghua.edu.cn/simple;jupyter lab --ServerApp.shutdown_no_activity_timeout=%s --TerminalManager.cull_inactive_timeout=%s --TerminalManager.cull_interval=%s --MappingKernelManager.cull_idle_timeout=%s --MappingKernelManager.cull_interval=%s --MappingKernelManager.cull_connected=True --MappingKernelManager.cull_busy=True --no-browser --ip=0.0.0.0 --allow-root --notebook-dir='/code' --port=$OCTOPUS_NOTEBOOK_PORT --LabApp.token='' --LabApp.allow_origin='*' --LabApp.base_url=$OCTOPUS_NOTEBOOK_BASE_URL;"
  27. CommandGrampusDebug = "unzip -d %s %s;rm %s;" + CommandGpuDebug
  28. )
  29. var (
  30. poolInfos *models.PoolInfos
  31. FlavorInfos *setting.StFlavorInfos
  32. ImageInfos *setting.StImageInfosModelArts
  33. SpecialPools *models.SpecialPools
  34. CommandPrepareScriptGpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/%s/archive/master.zip;" +
  35. "echo \"finish loading script\";unzip -q master.zip;cd %s;chmod 777 downloader_for_obs uploader_for_npu downloader_for_minio uploader_for_gpu;"
  36. )
  37. type GenerateTrainJobReq struct {
  38. JobName string
  39. Command string
  40. ImageUrl string //与image_id二选一,都有的情况下优先image_url
  41. ImageId string
  42. DisplayJobName string
  43. Uuid string
  44. Description string
  45. CodeObsPath string
  46. BootFile string
  47. BootFileUrl string
  48. DataUrl string
  49. TrainUrl string
  50. WorkServerNumber int
  51. EngineID int64
  52. CommitID string
  53. IsLatestVersion string
  54. BranchName string
  55. PreVersionId int64
  56. PreVersionName string
  57. VersionCount int
  58. EngineName string
  59. TotalVersionCount int
  60. ComputeResource string
  61. ProcessType string
  62. DatasetNames string
  63. DatasetInfos map[string]models.DatasetInfo
  64. Params string
  65. ModelName string
  66. LabelName string
  67. CkptName string
  68. ModelVersion string
  69. PreTrainModelPath string
  70. PreTrainModelUrl string
  71. Spec *models.Specification
  72. CodeName string
  73. }
  74. type GenerateNotebookJobReq struct {
  75. JobName string
  76. Command string
  77. ImageUrl string
  78. ImageId string
  79. DisplayJobName string
  80. Uuid string
  81. Description string
  82. CodeStoragePath string
  83. CommitID string
  84. BranchName string
  85. ComputeResource string
  86. ProcessType string
  87. DatasetNames string
  88. DatasetInfos map[string]models.DatasetInfo
  89. ModelName string
  90. LabelName string
  91. CkptName string
  92. ModelVersion string
  93. PreTrainModelPath string
  94. PreTrainModelUrl string
  95. Spec *models.Specification
  96. CodeName string
  97. ModelPath string //参考启智GPU调试, 挂载/model目录用户的模型可以输出到这个目录
  98. ModelStorageType int
  99. }
  100. func getEndPoint() string {
  101. index := strings.Index(setting.Endpoint, "//")
  102. endpoint := setting.Endpoint[index+2:]
  103. return endpoint
  104. }
  105. func getDatasetGrampus(datasetInfos map[string]models.DatasetInfo) []models.GrampusDataset {
  106. var datasetGrampus []models.GrampusDataset
  107. endPoint := getEndPoint()
  108. for _, datasetInfo := range datasetInfos {
  109. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  110. Name: datasetInfo.FullName,
  111. Bucket: setting.Bucket,
  112. EndPoint: endPoint,
  113. ObjectKey: datasetInfo.DataLocalPath + datasetInfo.FullName,
  114. })
  115. }
  116. return datasetGrampus
  117. }
  118. func getDatasetGPUGrampus(datasetInfos map[string]models.DatasetInfo) ([]models.GrampusDataset, string) {
  119. var datasetGrampus []models.GrampusDataset
  120. var command = ""
  121. for uuid, datasetInfo := range datasetInfos {
  122. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  123. Name: datasetInfo.FullName,
  124. Bucket: setting.Attachment.Minio.Bucket,
  125. EndPoint: setting.Attachment.Minio.Endpoint,
  126. ObjectKey: datasetInfo.DataLocalPath,
  127. ReadOnly: true,
  128. ContainerPath: "/dataset1/" + datasetInfo.Name,
  129. })
  130. command += "cp /dataset1/'" + datasetInfo.Name + "'/" + uuid + " /dataset/'" + datasetInfo.FullName + "';"
  131. }
  132. return datasetGrampus, command
  133. }
  134. func getDatasetGCUGrampus(datasetInfos map[string]models.DatasetInfo) ([]models.GrampusDataset, string) {
  135. var datasetGrampus []models.GrampusDataset
  136. var command = ""
  137. obsEndPoint := getEndPoint()
  138. for uuid, datasetInfo := range datasetInfos {
  139. if datasetInfo.Type == models.TypeCloudBrainOne {
  140. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  141. Name: datasetInfo.FullName,
  142. Bucket: setting.Attachment.Minio.Bucket,
  143. EndPoint: setting.Attachment.Minio.Endpoint,
  144. ObjectKey: datasetInfo.DataLocalPath,
  145. ReadOnly: true,
  146. ContainerPath: "/dataset1/" + datasetInfo.Name,
  147. })
  148. command += "cp /dataset1/'" + datasetInfo.Name + "'/" + uuid + " /dataset/'" + datasetInfo.FullName + "';"
  149. } else {
  150. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  151. Name: datasetInfo.FullName,
  152. Bucket: setting.Bucket,
  153. EndPoint: obsEndPoint,
  154. ObjectKey: datasetInfo.DataLocalPath + datasetInfo.FullName,
  155. ContainerPath: "/dataset/" + datasetInfo.Name,
  156. })
  157. }
  158. }
  159. return datasetGrampus, command
  160. }
  161. func GenerateNotebookJob(ctx *context.Context, req *GenerateNotebookJobReq) (jobId string, err error) {
  162. createTime := timeutil.TimeStampNow()
  163. var datasetGrampus []models.GrampusDataset
  164. var codeGrampus models.GrampusDataset
  165. var cpCommand string
  166. imageUrl := req.ImageUrl
  167. if ProcessorTypeNPU == req.ProcessType {
  168. datasetGrampus = getDatasetGrampus(req.DatasetInfos)
  169. if len(req.ModelName) != 0 {
  170. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  171. Name: req.ModelName,
  172. Bucket: setting.Bucket,
  173. EndPoint: getEndPoint(),
  174. ReadOnly: true,
  175. ObjectKey: req.PreTrainModelPath,
  176. })
  177. }
  178. codeGrampus = models.GrampusDataset{
  179. Name: req.CodeName,
  180. Bucket: setting.Bucket,
  181. EndPoint: getEndPoint(),
  182. ObjectKey: req.CodeStoragePath + cloudbrain.DefaultBranchName + ".zip",
  183. ReadOnly: false,
  184. }
  185. imageUrl = ""
  186. req.Command = ""
  187. } else {
  188. if ProcessorTypeGCU == req.ProcessType {
  189. datasetGrampus, cpCommand = getDatasetGCUGrampus(req.DatasetInfos)
  190. } else {
  191. datasetGrampus, cpCommand = getDatasetGPUGrampus(req.DatasetInfos)
  192. }
  193. if len(req.ModelName) != 0 {
  194. if req.ModelStorageType == models.TypeCloudBrainOne {
  195. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  196. Name: req.ModelName,
  197. Bucket: setting.Attachment.Minio.Bucket,
  198. EndPoint: setting.Attachment.Minio.Endpoint,
  199. ObjectKey: req.PreTrainModelPath,
  200. ReadOnly: true,
  201. ContainerPath: cloudbrain.PretrainModelMountPath,
  202. })
  203. } else {
  204. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  205. Name: req.ModelName,
  206. Bucket: setting.Bucket,
  207. EndPoint: getEndPoint(),
  208. ReadOnly: true,
  209. ObjectKey: req.PreTrainModelPath,
  210. ContainerPath: cloudbrain.PretrainModelMountPath,
  211. })
  212. }
  213. }
  214. codeArchiveName := cloudbrain.DefaultBranchName + ".zip"
  215. codeGrampus = models.GrampusDataset{
  216. Name: req.CodeName,
  217. Bucket: setting.Attachment.Minio.Bucket,
  218. EndPoint: setting.Attachment.Minio.Endpoint,
  219. ObjectKey: req.CodeStoragePath + codeArchiveName,
  220. ReadOnly: false,
  221. ContainerPath: cloudbrain.CodeMountPath,
  222. }
  223. if ProcessorTypeGCU == req.ProcessType {
  224. imageUrl = ""
  225. }
  226. codeArchiveContainerPath := cloudbrain.CodeMountPath + "/" + codeArchiveName
  227. req.Command = fmt.Sprintf(CommandGrampusDebug, cloudbrain.CodeMountPath, codeArchiveContainerPath, codeArchiveContainerPath, cpCommand, setting.CullIdleTimeout, setting.CullIdleTimeout, setting.CullInterval, setting.CullIdleTimeout, setting.CullInterval)
  228. log.Info("debug command:" + req.Command)
  229. }
  230. jobResult, err := createNotebookJob(models.CreateGrampusNotebookRequest{
  231. Name: req.JobName,
  232. Tasks: []models.GrampusNotebookTask{
  233. {
  234. Name: req.JobName,
  235. ResourceSpecId: req.Spec.SourceSpecId,
  236. ImageId: req.ImageId,
  237. ImageUrl: imageUrl,
  238. Datasets: datasetGrampus,
  239. Code: codeGrampus,
  240. AutoStopDuration: autoStopDurationMs,
  241. Capacity: setting.Capacity,
  242. Command: req.Command,
  243. CenterID: req.Spec.GetAvailableCenterIds(ctx.User.ID),
  244. },
  245. },
  246. })
  247. if err != nil {
  248. log.Error("createNotebookJob failed: %v", err.Error())
  249. return "", err
  250. }
  251. jobID := jobResult.JobInfo.JobID
  252. err = models.CreateCloudbrain(&models.Cloudbrain{
  253. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  254. UserID: ctx.User.ID,
  255. RepoID: ctx.Repo.Repository.ID,
  256. JobID: jobID,
  257. JobName: req.JobName,
  258. DisplayJobName: req.DisplayJobName,
  259. JobType: string(models.JobTypeDebug),
  260. Type: models.TypeC2Net,
  261. Uuid: req.Uuid,
  262. DatasetName: req.DatasetNames,
  263. CommitID: req.CommitID,
  264. IsLatestVersion: "1",
  265. ComputeResource: req.ComputeResource,
  266. ImageID: req.ImageId,
  267. BranchName: req.BranchName,
  268. Description: req.Description,
  269. WorkServerNumber: 1,
  270. EngineName: req.ImageUrl,
  271. CreatedUnix: createTime,
  272. UpdatedUnix: createTime,
  273. Spec: req.Spec,
  274. ModelName: req.ModelName,
  275. ModelVersion: req.ModelVersion,
  276. LabelName: req.LabelName,
  277. PreTrainModelUrl: req.PreTrainModelUrl,
  278. CkptName: req.CkptName,
  279. })
  280. if err != nil {
  281. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  282. return "", err
  283. }
  284. var actionType models.ActionType
  285. if req.ComputeResource == models.NPUResource {
  286. actionType = models.ActionCreateGrampusNPUDebugTask
  287. } else if req.ComputeResource == models.GPUResource {
  288. actionType = models.ActionCreateGrampusGPUDebugTask
  289. } else if req.ComputeResource == models.GCUResource {
  290. actionType = models.ActionCreateGrampusGCUDebugTask
  291. }
  292. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
  293. return jobID, nil
  294. }
  295. func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId string, err error) {
  296. createTime := timeutil.TimeStampNow()
  297. var datasetGrampus, modelGrampus []models.GrampusDataset
  298. var codeGrampus models.GrampusDataset
  299. if ProcessorTypeNPU == req.ProcessType {
  300. datasetGrampus = getDatasetGrampus(req.DatasetInfos)
  301. if len(req.ModelName) != 0 {
  302. modelGrampus = []models.GrampusDataset{
  303. {
  304. Name: req.ModelName,
  305. Bucket: setting.Bucket,
  306. EndPoint: getEndPoint(),
  307. ObjectKey: req.PreTrainModelPath,
  308. },
  309. }
  310. }
  311. codeGrampus = models.GrampusDataset{
  312. Name: req.CodeName,
  313. Bucket: setting.Bucket,
  314. EndPoint: getEndPoint(),
  315. ObjectKey: req.CodeObsPath + cloudbrain.DefaultBranchName + ".zip",
  316. }
  317. }
  318. jobResult, err := createJob(models.CreateGrampusJobRequest{
  319. Name: req.JobName,
  320. Tasks: []models.GrampusTasks{
  321. {
  322. Name: req.JobName,
  323. Command: req.Command,
  324. ResourceSpecId: req.Spec.SourceSpecId,
  325. ImageId: req.ImageId,
  326. ImageUrl: req.ImageUrl,
  327. CenterID: req.Spec.GetAvailableCenterIds(ctx.User.ID),
  328. ReplicaNum: 1,
  329. Datasets: datasetGrampus,
  330. Models: modelGrampus,
  331. Code: codeGrampus,
  332. BootFile: req.BootFile,
  333. },
  334. },
  335. })
  336. if err != nil {
  337. log.Error("createJob failed: %v", err.Error())
  338. return "", err
  339. }
  340. jobID := jobResult.JobInfo.JobID
  341. err = models.CreateCloudbrain(&models.Cloudbrain{
  342. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  343. UserID: ctx.User.ID,
  344. RepoID: ctx.Repo.Repository.ID,
  345. JobID: jobID,
  346. JobName: req.JobName,
  347. DisplayJobName: req.DisplayJobName,
  348. JobType: string(models.JobTypeTrain),
  349. Type: models.TypeC2Net,
  350. Uuid: req.Uuid,
  351. DatasetName: req.DatasetNames,
  352. CommitID: req.CommitID,
  353. IsLatestVersion: req.IsLatestVersion,
  354. ComputeResource: req.ComputeResource,
  355. ImageID: req.ImageId,
  356. TrainUrl: req.TrainUrl,
  357. BranchName: req.BranchName,
  358. Parameters: req.Params,
  359. BootFile: req.BootFile,
  360. DataUrl: req.DataUrl,
  361. Description: req.Description,
  362. WorkServerNumber: req.WorkServerNumber,
  363. EngineName: req.EngineName,
  364. VersionCount: req.VersionCount,
  365. TotalVersionCount: req.TotalVersionCount,
  366. CreatedUnix: createTime,
  367. UpdatedUnix: createTime,
  368. Spec: req.Spec,
  369. ModelName: req.ModelName,
  370. ModelVersion: req.ModelVersion,
  371. LabelName: req.LabelName,
  372. PreTrainModelUrl: req.PreTrainModelUrl,
  373. CkptName: req.CkptName,
  374. })
  375. if err != nil {
  376. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  377. return "", err
  378. }
  379. var actionType models.ActionType
  380. if req.ComputeResource == models.NPUResource {
  381. actionType = models.ActionCreateGrampusNPUTrainTask
  382. } else if req.ComputeResource == models.GPUResource {
  383. actionType = models.ActionCreateGrampusGPUTrainTask
  384. }
  385. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
  386. return jobID, nil
  387. }
  388. func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {
  389. var centerID []string
  390. var centerName []string
  391. includeCenters := make(map[string]string)
  392. excludeCenters := make(map[string]string)
  393. if SpecialPools != nil {
  394. for _, pool := range SpecialPools.Pools {
  395. if !pool.IsExclusive && strings.Contains(req.ComputeResource, pool.Type) {
  396. org, _ := models.GetOrgByName(pool.Org)
  397. if org != nil {
  398. isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID)
  399. if isOrgMember {
  400. for _, info := range pool.Pool {
  401. includeCenters[info.Queue] = info.Value
  402. }
  403. } else {
  404. for _, info := range pool.Pool {
  405. excludeCenters[info.Queue] = info.Value
  406. }
  407. }
  408. }
  409. }
  410. }
  411. }
  412. if len(includeCenters) > 0 {
  413. //如果有专属资源池,根据专属资源池指定智算中心
  414. for k, v := range includeCenters {
  415. centerID = append(centerID, k)
  416. centerName = append(centerName, v)
  417. }
  418. } else if len(excludeCenters) > 0 {
  419. //否则,有要排除的中心,先获取所有中心,删除其中的排除中心,得到指定的智算中心
  420. allCenters := make(map[string]string)
  421. specs, err := GetResourceSpecs(req.ProcessType)
  422. if err == nil {
  423. for _, info := range specs.Infos {
  424. for _, center := range info.Centers {
  425. allCenters[center.ID] = center.Name
  426. }
  427. }
  428. }
  429. for k, _ := range excludeCenters {
  430. delete(allCenters, k)
  431. }
  432. for k, v := range allCenters {
  433. centerID = append(centerID, k)
  434. centerName = append(centerName, v)
  435. }
  436. }
  437. return centerID, centerName
  438. }
  439. func TransTrainJobStatus(status string) string {
  440. if status == models.GrampusStatusPending {
  441. status = models.GrampusStatusWaiting
  442. }
  443. return strings.ToUpper(status)
  444. }
  445. func GetNpuModelRemoteObsUrl(jobName string) string {
  446. return "s3:///" + BucketRemote + "/" + GetNpuModelObjectKey(jobName)
  447. }
  448. func GetNpuModelObjectKey(jobName string) string {
  449. return setting.CodePathPrefix + jobName + RemoteModelPath
  450. }
  451. func GetRemoteEndPoint(aiCenterID string) string {
  452. var endPoint string
  453. for _, info := range setting.CenterInfos.Info {
  454. if info.CenterID == aiCenterID {
  455. endPoint = info.Endpoint
  456. break
  457. }
  458. }
  459. return endPoint
  460. }
  461. func GetCenterProxy(aiCenterID string) string {
  462. var proxy string
  463. for _, info := range setting.CenterInfos.Info {
  464. if info.CenterID == aiCenterID {
  465. proxy = info.StorageProxyServer
  466. break
  467. }
  468. }
  469. return proxy
  470. }