You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

grampus.go 16 kB

3 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
2 years ago
2 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
3 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
2 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago

  1. package grampus
  2. import (
  3. "fmt"
  4. "strings"
  5. "code.gitea.io/gitea/models"
  6. "code.gitea.io/gitea/modules/cloudbrain"
  7. "code.gitea.io/gitea/modules/context"
  8. "code.gitea.io/gitea/modules/log"
  9. "code.gitea.io/gitea/modules/notification"
  10. "code.gitea.io/gitea/modules/setting"
  11. "code.gitea.io/gitea/modules/timeutil"
  12. )
  13. const (
  14. JobPath = "job/"
  15. ProcessorTypeNPU = "npu.huawei.com/NPU"
  16. ProcessorTypeGPU = "nvidia.com/gpu"
  17. ProcessorTypeGCU = "enflame-tech.com/gcu"
  18. GpuWorkDir = "/tmp/"
  19. NpuWorkDir = "/cache/"
  20. NpuLocalLogUrl = "/tmp/train.log"
  21. CommandPrepareScriptNpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;"
  22. CodeArchiveName = "master.zip"
  23. BucketRemote = "grampus"
  24. RemoteModelPath = "/output/" + models.ModelSuffix
  25. autoStopDurationMs = 4 * 60 * 60 * 1000
  26. CommandGpuDebug = "mkdir -p /dataset;%s! [ -x \"$(command -v jupyter)\" ] && pip install jupyterlab==3 -i https://pypi.tuna.tsinghua.edu.cn/simple;jupyter lab --ServerApp.shutdown_no_activity_timeout=%s --TerminalManager.cull_inactive_timeout=%s --TerminalManager.cull_interval=%s --MappingKernelManager.cull_idle_timeout=%s --MappingKernelManager.cull_interval=%s --MappingKernelManager.cull_connected=True --MappingKernelManager.cull_busy=True --no-browser --ip=0.0.0.0 --allow-root --notebook-dir='/code' --port=$OCTOPUS_NOTEBOOK_PORT --LabApp.token='' --LabApp.allow_origin='*' --LabApp.base_url=$OCTOPUS_NOTEBOOK_BASE_URL;"
  27. CommandGrampusDebug = "unzip -d %s %s;rm %s;" + CommandGpuDebug
  28. )
  29. var (
  30. poolInfos *models.PoolInfos
  31. FlavorInfos *setting.StFlavorInfos
  32. ImageInfos *setting.StImageInfosModelArts
  33. SpecialPools *models.SpecialPools
  34. CommandPrepareScriptGpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/%s/archive/master.zip;" +
  35. "echo \"finish loading script\";unzip -q master.zip;cd %s;chmod 777 downloader_for_obs uploader_for_npu downloader_for_minio uploader_for_gpu;"
  36. )
// GenerateTrainJobReq carries the parameters GenerateTrainJob needs to submit
// a Grampus training job and to persist the matching Cloudbrain record.
type GenerateTrainJobReq struct {
	// JobName is used as both the job name and the task name.
	JobName string
	// Command is the command passed to the task.
	Command string
	// ImageUrl is an alternative to ImageId; when both are set, ImageUrl takes
	// precedence.
	ImageUrl       string
	ImageId        string
	DisplayJobName string
	Uuid           string
	Description    string
	// CodeObsPath is the prefix of the code archive's object key for NPU jobs.
	CodeObsPath       string
	BootFile          string
	BootFileUrl       string
	DataUrl           string
	TrainUrl          string
	WorkServerNumber  int
	EngineID          int64
	CommitID          string
	IsLatestVersion   string
	BranchName        string
	PreVersionId      int64
	PreVersionName    string
	VersionCount      int
	EngineName        string
	TotalVersionCount int
	// ComputeResource selects the notification action type.
	ComputeResource string
	// ProcessType is compared against the ProcessorType* constants.
	ProcessType  string
	DatasetNames string
	// DatasetInfos is converted into the task's dataset list for NPU jobs.
	DatasetInfos map[string]models.DatasetInfo
	Params       string
	ModelName    string
	LabelName    string
	CkptName     string
	ModelVersion string
	// PreTrainModelPath is the object key of the pre-trained model.
	PreTrainModelPath string
	PreTrainModelUrl  string
	// Spec supplies the resource specification ID and the available centers.
	Spec     *models.Specification
	CodeName string
}
// GenerateNotebookJobReq carries the parameters GenerateNotebookJob needs to
// submit a Grampus debug (notebook) job and to persist the matching
// Cloudbrain record.
type GenerateNotebookJobReq struct {
	// JobName is used as both the job name and the task name.
	JobName string
	// Command is overwritten by GenerateNotebookJob before submission.
	Command        string
	ImageUrl       string
	ImageId        string
	DisplayJobName string
	Uuid           string
	Description    string
	// CodeStoragePath is the prefix of the code archive's object key.
	CodeStoragePath string
	CommitID        string
	BranchName      string
	// ComputeResource selects the notification action type.
	ComputeResource string
	// ProcessType is compared against the ProcessorType* constants.
	ProcessType  string
	DatasetNames string
	DatasetInfos map[string]models.DatasetInfo
	ModelName    string
	LabelName    string
	CkptName     string
	ModelVersion string
	// PreTrainModelPath is the object key of the pre-trained model.
	PreTrainModelPath string
	PreTrainModelUrl  string
	// Spec supplies the resource specification ID and the available centers.
	Spec     *models.Specification
	CodeName string
	// ModelPath follows the OpenI GPU debug setup: the /model directory is
	// mounted so that the user's model can be written to it.
	ModelPath string
}
  99. func getEndPoint() string {
  100. index := strings.Index(setting.Endpoint, "//")
  101. endpoint := setting.Endpoint[index+2:]
  102. return endpoint
  103. }
  104. func getDatasetGrampus(datasetInfos map[string]models.DatasetInfo) []models.GrampusDataset {
  105. var datasetGrampus []models.GrampusDataset
  106. endPoint := getEndPoint()
  107. for _, datasetInfo := range datasetInfos {
  108. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  109. Name: datasetInfo.FullName,
  110. Bucket: setting.Bucket,
  111. EndPoint: endPoint,
  112. ObjectKey: datasetInfo.DataLocalPath + datasetInfo.FullName,
  113. })
  114. }
  115. return datasetGrampus
  116. }
  117. func getDatasetGPUGrampus(datasetInfos map[string]models.DatasetInfo) ([]models.GrampusDataset, string) {
  118. var datasetGrampus []models.GrampusDataset
  119. var command = ""
  120. for uuid, datasetInfo := range datasetInfos {
  121. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  122. Name: datasetInfo.FullName,
  123. Bucket: setting.Attachment.Minio.Bucket,
  124. EndPoint: setting.Attachment.Minio.Endpoint,
  125. ObjectKey: datasetInfo.DataLocalPath,
  126. ReadOnly: true,
  127. ContainerPath: "/dataset1/" + datasetInfo.Name,
  128. })
  129. command += "cp /dataset1/'" + datasetInfo.Name + "'/" + uuid + " /dataset/'" + datasetInfo.FullName + "';"
  130. }
  131. return datasetGrampus, command
  132. }
  133. func getDatasetGCUGrampus(datasetInfos map[string]models.DatasetInfo) ([]models.GrampusDataset, string) {
  134. var datasetGrampus []models.GrampusDataset
  135. var command = ""
  136. obsEndPoint := getEndPoint()
  137. for uuid, datasetInfo := range datasetInfos {
  138. if datasetInfo.Type == models.TypeCloudBrainOne {
  139. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  140. Name: datasetInfo.FullName,
  141. Bucket: setting.Attachment.Minio.Bucket,
  142. EndPoint: setting.Attachment.Minio.Endpoint,
  143. ObjectKey: datasetInfo.DataLocalPath,
  144. ReadOnly: true,
  145. ContainerPath: "/dataset1/" + datasetInfo.Name,
  146. })
  147. command += "cp /dataset1/'" + datasetInfo.Name + "'/" + uuid + " /dataset/'" + datasetInfo.FullName + "';"
  148. } else {
  149. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  150. Name: datasetInfo.FullName,
  151. Bucket: setting.Bucket,
  152. EndPoint: obsEndPoint,
  153. ObjectKey: datasetInfo.DataLocalPath + datasetInfo.FullName,
  154. ContainerPath: "/dataset/" + datasetInfo.Name,
  155. })
  156. }
  157. }
  158. return datasetGrampus, command
  159. }
  160. func GenerateNotebookJob(ctx *context.Context, req *GenerateNotebookJobReq) (jobId string, err error) {
  161. createTime := timeutil.TimeStampNow()
  162. var datasetGrampus []models.GrampusDataset
  163. var codeGrampus models.GrampusDataset
  164. var cpCommand string
  165. imageUrl := req.ImageUrl
  166. if ProcessorTypeNPU == req.ProcessType {
  167. datasetGrampus = getDatasetGrampus(req.DatasetInfos)
  168. if len(req.ModelName) != 0 {
  169. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  170. Name: req.ModelName,
  171. Bucket: setting.Bucket,
  172. EndPoint: getEndPoint(),
  173. ReadOnly: true,
  174. ObjectKey: req.PreTrainModelPath,
  175. })
  176. }
  177. codeGrampus = models.GrampusDataset{
  178. Name: req.CodeName,
  179. Bucket: setting.Bucket,
  180. EndPoint: getEndPoint(),
  181. ObjectKey: req.CodeStoragePath + cloudbrain.DefaultBranchName + ".zip",
  182. ReadOnly: false,
  183. }
  184. imageUrl = ""
  185. req.Command = ""
  186. } else {
  187. if ProcessorTypeGCU == req.ProcessType {
  188. datasetGrampus, cpCommand = getDatasetGCUGrampus(req.DatasetInfos)
  189. } else {
  190. datasetGrampus, cpCommand = getDatasetGPUGrampus(req.DatasetInfos)
  191. }
  192. if len(req.ModelName) != 0 {
  193. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  194. Name: req.ModelName,
  195. Bucket: setting.Attachment.Minio.Bucket,
  196. EndPoint: setting.Attachment.Minio.Endpoint,
  197. ObjectKey: req.PreTrainModelPath,
  198. ReadOnly: true,
  199. ContainerPath: cloudbrain.PretrainModelMountPath,
  200. })
  201. }
  202. codeArchiveName := cloudbrain.DefaultBranchName + ".zip"
  203. codeGrampus = models.GrampusDataset{
  204. Name: req.CodeName,
  205. Bucket: setting.Attachment.Minio.Bucket,
  206. EndPoint: setting.Attachment.Minio.Endpoint,
  207. ObjectKey: req.CodeStoragePath + codeArchiveName,
  208. ReadOnly: false,
  209. ContainerPath: cloudbrain.CodeMountPath,
  210. }
  211. if ProcessorTypeGCU == req.ProcessType {
  212. imageUrl = ""
  213. }
  214. codeArchiveContainerPath := cloudbrain.CodeMountPath + "/" + codeArchiveName
  215. req.Command = fmt.Sprintf(CommandGrampusDebug, cloudbrain.CodeMountPath, codeArchiveContainerPath, codeArchiveContainerPath, cpCommand, setting.CullIdleTimeout, setting.CullIdleTimeout, setting.CullInterval, setting.CullIdleTimeout, setting.CullInterval)
  216. log.Info("debug command:" + req.Command)
  217. }
  218. jobResult, err := createNotebookJob(models.CreateGrampusNotebookRequest{
  219. Name: req.JobName,
  220. Tasks: []models.GrampusNotebookTask{
  221. {
  222. Name: req.JobName,
  223. ResourceSpecId: req.Spec.SourceSpecId,
  224. ImageId: req.ImageId,
  225. ImageUrl: imageUrl,
  226. Datasets: datasetGrampus,
  227. Code: codeGrampus,
  228. AutoStopDuration: autoStopDurationMs,
  229. Capacity: setting.Capacity,
  230. Command: req.Command,
  231. CenterID: req.Spec.GetAvailableCenterIds(ctx.User.ID),
  232. },
  233. },
  234. })
  235. if err != nil {
  236. log.Error("createNotebookJob failed: %v", err.Error())
  237. return "", err
  238. }
  239. jobID := jobResult.JobInfo.JobID
  240. err = models.CreateCloudbrain(&models.Cloudbrain{
  241. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  242. UserID: ctx.User.ID,
  243. RepoID: ctx.Repo.Repository.ID,
  244. JobID: jobID,
  245. JobName: req.JobName,
  246. DisplayJobName: req.DisplayJobName,
  247. JobType: string(models.JobTypeDebug),
  248. Type: models.TypeC2Net,
  249. Uuid: req.Uuid,
  250. DatasetName: req.DatasetNames,
  251. CommitID: req.CommitID,
  252. IsLatestVersion: "1",
  253. ComputeResource: req.ComputeResource,
  254. ImageID: req.ImageId,
  255. BranchName: req.BranchName,
  256. Description: req.Description,
  257. WorkServerNumber: 1,
  258. EngineName: req.ImageUrl,
  259. CreatedUnix: createTime,
  260. UpdatedUnix: createTime,
  261. Spec: req.Spec,
  262. ModelName: req.ModelName,
  263. ModelVersion: req.ModelVersion,
  264. LabelName: req.LabelName,
  265. PreTrainModelUrl: req.PreTrainModelUrl,
  266. CkptName: req.CkptName,
  267. })
  268. if err != nil {
  269. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  270. return "", err
  271. }
  272. var actionType models.ActionType
  273. if req.ComputeResource == models.NPUResource {
  274. actionType = models.ActionCreateGrampusNPUDebugTask
  275. } else if req.ComputeResource == models.GPUResource {
  276. actionType = models.ActionCreateGrampusGPUDebugTask
  277. } else if req.ComputeResource == models.GCUResource {
  278. actionType = models.ActionCreateGrampusGCUDebugTask
  279. }
  280. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
  281. return jobID, nil
  282. }
  283. func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId string, err error) {
  284. createTime := timeutil.TimeStampNow()
  285. var datasetGrampus, modelGrampus []models.GrampusDataset
  286. var codeGrampus models.GrampusDataset
  287. if ProcessorTypeNPU == req.ProcessType {
  288. datasetGrampus = getDatasetGrampus(req.DatasetInfos)
  289. if len(req.ModelName) != 0 {
  290. modelGrampus = []models.GrampusDataset{
  291. {
  292. Name: req.ModelName,
  293. Bucket: setting.Bucket,
  294. EndPoint: getEndPoint(),
  295. ObjectKey: req.PreTrainModelPath,
  296. },
  297. }
  298. }
  299. codeGrampus = models.GrampusDataset{
  300. Name: req.CodeName,
  301. Bucket: setting.Bucket,
  302. EndPoint: getEndPoint(),
  303. ObjectKey: req.CodeObsPath + cloudbrain.DefaultBranchName + ".zip",
  304. }
  305. }
  306. jobResult, err := createJob(models.CreateGrampusJobRequest{
  307. Name: req.JobName,
  308. Tasks: []models.GrampusTasks{
  309. {
  310. Name: req.JobName,
  311. Command: req.Command,
  312. ResourceSpecId: req.Spec.SourceSpecId,
  313. ImageId: req.ImageId,
  314. ImageUrl: req.ImageUrl,
  315. CenterID: req.Spec.GetAvailableCenterIds(ctx.User.ID),
  316. ReplicaNum: 1,
  317. Datasets: datasetGrampus,
  318. Models: modelGrampus,
  319. Code: codeGrampus,
  320. BootFile: req.BootFile,
  321. },
  322. },
  323. })
  324. if err != nil {
  325. log.Error("createJob failed: %v", err.Error())
  326. return "", err
  327. }
  328. jobID := jobResult.JobInfo.JobID
  329. err = models.CreateCloudbrain(&models.Cloudbrain{
  330. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  331. UserID: ctx.User.ID,
  332. RepoID: ctx.Repo.Repository.ID,
  333. JobID: jobID,
  334. JobName: req.JobName,
  335. DisplayJobName: req.DisplayJobName,
  336. JobType: string(models.JobTypeTrain),
  337. Type: models.TypeC2Net,
  338. Uuid: req.Uuid,
  339. DatasetName: req.DatasetNames,
  340. CommitID: req.CommitID,
  341. IsLatestVersion: req.IsLatestVersion,
  342. ComputeResource: req.ComputeResource,
  343. ImageID: req.ImageId,
  344. TrainUrl: req.TrainUrl,
  345. BranchName: req.BranchName,
  346. Parameters: req.Params,
  347. BootFile: req.BootFile,
  348. DataUrl: req.DataUrl,
  349. Description: req.Description,
  350. WorkServerNumber: req.WorkServerNumber,
  351. EngineName: req.EngineName,
  352. VersionCount: req.VersionCount,
  353. TotalVersionCount: req.TotalVersionCount,
  354. CreatedUnix: createTime,
  355. UpdatedUnix: createTime,
  356. Spec: req.Spec,
  357. ModelName: req.ModelName,
  358. ModelVersion: req.ModelVersion,
  359. LabelName: req.LabelName,
  360. PreTrainModelUrl: req.PreTrainModelUrl,
  361. CkptName: req.CkptName,
  362. })
  363. if err != nil {
  364. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  365. return "", err
  366. }
  367. var actionType models.ActionType
  368. if req.ComputeResource == models.NPUResource {
  369. actionType = models.ActionCreateGrampusNPUTrainTask
  370. } else if req.ComputeResource == models.GPUResource {
  371. actionType = models.ActionCreateGrampusGPUTrainTask
  372. }
  373. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
  374. return jobID, nil
  375. }
  376. func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {
  377. var centerID []string
  378. var centerName []string
  379. includeCenters := make(map[string]string)
  380. excludeCenters := make(map[string]string)
  381. if SpecialPools != nil {
  382. for _, pool := range SpecialPools.Pools {
  383. if !pool.IsExclusive && strings.Contains(req.ComputeResource, pool.Type) {
  384. org, _ := models.GetOrgByName(pool.Org)
  385. if org != nil {
  386. isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID)
  387. if isOrgMember {
  388. for _, info := range pool.Pool {
  389. includeCenters[info.Queue] = info.Value
  390. }
  391. } else {
  392. for _, info := range pool.Pool {
  393. excludeCenters[info.Queue] = info.Value
  394. }
  395. }
  396. }
  397. }
  398. }
  399. }
  400. if len(includeCenters) > 0 {
  401. //如果有专属资源池,根据专属资源池指定智算中心
  402. for k, v := range includeCenters {
  403. centerID = append(centerID, k)
  404. centerName = append(centerName, v)
  405. }
  406. } else if len(excludeCenters) > 0 {
  407. //否则,有要排除的中心,先获取所有中心,删除其中的排除中心,得到指定的智算中心
  408. allCenters := make(map[string]string)
  409. specs, err := GetResourceSpecs(req.ProcessType)
  410. if err == nil {
  411. for _, info := range specs.Infos {
  412. for _, center := range info.Centers {
  413. allCenters[center.ID] = center.Name
  414. }
  415. }
  416. }
  417. for k, _ := range excludeCenters {
  418. delete(allCenters, k)
  419. }
  420. for k, v := range allCenters {
  421. centerID = append(centerID, k)
  422. centerName = append(centerName, v)
  423. }
  424. }
  425. return centerID, centerName
  426. }
  427. func TransTrainJobStatus(status string) string {
  428. if status == models.GrampusStatusPending {
  429. status = models.GrampusStatusWaiting
  430. }
  431. return strings.ToUpper(status)
  432. }
  433. func GetNpuModelRemoteObsUrl(jobName string) string {
  434. return "s3:///" + BucketRemote + "/" + GetNpuModelObjectKey(jobName)
  435. }
  436. func GetNpuModelObjectKey(jobName string) string {
  437. return setting.CodePathPrefix + jobName + RemoteModelPath
  438. }
  439. func GetRemoteEndPoint(aiCenterID string) string {
  440. var endPoint string
  441. for _, info := range setting.CenterInfos.Info {
  442. if info.CenterID == aiCenterID {
  443. endPoint = info.Endpoint
  444. break
  445. }
  446. }
  447. return endPoint
  448. }
  449. func GetCenterProxy(aiCenterID string) string {
  450. var proxy string
  451. for _, info := range setting.CenterInfos.Info {
  452. if info.CenterID == aiCenterID {
  453. proxy = info.StorageProxyServer
  454. break
  455. }
  456. }
  457. return proxy
  458. }