You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

grampus.go 15 kB

3 years ago
2 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
3 years ago
2 years ago
2 years ago
2 years ago
3 years ago
2 years ago
3 years ago
2 years ago
2 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
3 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
2 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago

  1. package grampus
  2. import (
  3. "fmt"
  4. "strconv"
  5. "strings"
  6. "code.gitea.io/gitea/models"
  7. "code.gitea.io/gitea/modules/cloudbrain"
  8. "code.gitea.io/gitea/modules/context"
  9. "code.gitea.io/gitea/modules/log"
  10. "code.gitea.io/gitea/modules/notification"
  11. "code.gitea.io/gitea/modules/setting"
  12. "code.gitea.io/gitea/modules/timeutil"
  13. )
  14. const (
  15. JobPath = "job/"
  16. ProcessorTypeNPU = "npu.huawei.com/NPU"
  17. ProcessorTypeGPU = "nvidia.com/gpu"
  18. GpuWorkDir = "/tmp/"
  19. NpuWorkDir = "/cache/"
  20. NpuLocalLogUrl = "/tmp/train.log"
  21. CommandPrepareScriptNpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;"
  22. CodeArchiveName = "master.zip"
  23. BucketRemote = "grampus"
  24. RemoteModelPath = "/output/" + models.ModelSuffix
  25. autoStopDurationMs = 4 * 60 * 60 * 1000
  26. CommandGpuDebug = "mkdir -p /dataset;%s! [ -x \"$(command -v jupyter)\" ] && pip install jupyterlab==3 -i https://pypi.tuna.tsinghua.edu.cn/simple;jupyter lab --ServerApp.shutdown_no_activity_timeout=%s --TerminalManager.cull_inactive_timeout=%s --TerminalManager.cull_interval=%s --MappingKernelManager.cull_idle_timeout=%s --MappingKernelManager.cull_interval=%s --MappingKernelManager.cull_connected=True --MappingKernelManager.cull_busy=True --no-browser --ip=0.0.0.0 --allow-root --notebook-dir='/code' --port=$OCTOPUS_NOTEBOOK_PORT --LabApp.token='' --LabApp.allow_origin='*' --LabApp.base_url=$OCTOPUS_NOTEBOOK_BASE_URL;"
  27. )
  28. var (
  29. poolInfos *models.PoolInfos
  30. FlavorInfos *setting.StFlavorInfos
  31. ImageInfos *setting.StImageInfosModelArts
  32. SpecialPools *models.SpecialPools
  33. CommandPrepareScriptGpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/%s/archive/master.zip;" +
  34. "echo \"finish loading script\";unzip -q master.zip;cd %s;chmod 777 downloader_for_obs uploader_for_npu downloader_for_minio uploader_for_gpu;"
  35. )
  36. type GenerateTrainJobReq struct {
  37. JobName string
  38. Command string
  39. ImageUrl string //与image_id二选一,都有的情况下优先image_url
  40. ImageId string
  41. DisplayJobName string
  42. Uuid string
  43. Description string
  44. CodeObsPath string
  45. BootFile string
  46. BootFileUrl string
  47. DataUrl string
  48. TrainUrl string
  49. WorkServerNumber int
  50. EngineID int64
  51. CommitID string
  52. IsLatestVersion string
  53. BranchName string
  54. PreVersionId int64
  55. PreVersionName string
  56. VersionCount int
  57. EngineName string
  58. TotalVersionCount int
  59. ComputeResource string
  60. ProcessType string
  61. DatasetNames string
  62. DatasetInfos map[string]models.DatasetInfo
  63. Params string
  64. ModelName string
  65. LabelName string
  66. CkptName string
  67. ModelVersion string
  68. PreTrainModelPath string
  69. PreTrainModelUrl string
  70. Spec *models.Specification
  71. CodeName string
  72. }
  73. type GenerateNotebookJobReq struct {
  74. JobName string
  75. Command string
  76. ImageUrl string
  77. ImageId string
  78. DisplayJobName string
  79. Uuid string
  80. Description string
  81. CodeStoragePath string
  82. CommitID string
  83. BranchName string
  84. ComputeResource string
  85. ProcessType string
  86. DatasetNames string
  87. DatasetInfos map[string]models.DatasetInfo
  88. ModelName string
  89. LabelName string
  90. CkptName string
  91. ModelVersion string
  92. PreTrainModelPath string
  93. PreTrainModelUrl string
  94. Spec *models.Specification
  95. CodeName string
  96. ModelPath string //参考启智GPU调试, 挂载/model目录用户的模型可以输出到这个目录
  97. }
  98. func getEndPoint() string {
  99. index := strings.Index(setting.Endpoint, "//")
  100. endpoint := setting.Endpoint[index+2:]
  101. return endpoint
  102. }
  103. func getDatasetGrampus(datasetInfos map[string]models.DatasetInfo) []models.GrampusDataset {
  104. var datasetGrampus []models.GrampusDataset
  105. endPoint := getEndPoint()
  106. for _, datasetInfo := range datasetInfos {
  107. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  108. Name: datasetInfo.FullName,
  109. Bucket: setting.Bucket,
  110. EndPoint: endPoint,
  111. ObjectKey: datasetInfo.DataLocalPath + datasetInfo.FullName,
  112. })
  113. }
  114. return datasetGrampus
  115. }
  116. func getDatasetGPUGrampus(datasetInfos map[string]models.DatasetInfo) ([]models.GrampusDataset, string) {
  117. var datasetGrampus []models.GrampusDataset
  118. var command = ""
  119. for uuid, datasetInfo := range datasetInfos {
  120. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  121. Name: datasetInfo.FullName,
  122. Bucket: setting.Attachment.Minio.Bucket,
  123. EndPoint: setting.Attachment.Minio.Endpoint,
  124. ObjectKey: datasetInfo.DataLocalPath,
  125. ReadOnly: true,
  126. ContainerPath: "/dataset1/" + datasetInfo.Name,
  127. })
  128. command += "cp /dataset1/'" + datasetInfo.Name + "'/" + uuid + " /dataset/'" + datasetInfo.FullName + "';"
  129. }
  130. return datasetGrampus, command
  131. }
  132. func GenerateNotebookJob(ctx *context.Context, req *GenerateNotebookJobReq) (jobId string, err error) {
  133. createTime := timeutil.TimeStampNow()
  134. var datasetGrampus []models.GrampusDataset
  135. var codeGrampus models.GrampusDataset
  136. var cpCommand string
  137. imageUrl := req.ImageUrl
  138. if ProcessorTypeNPU == req.ProcessType {
  139. datasetGrampus = getDatasetGrampus(req.DatasetInfos)
  140. if len(req.ModelName) != 0 {
  141. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  142. Name: req.ModelName,
  143. Bucket: setting.Bucket,
  144. EndPoint: getEndPoint(),
  145. ReadOnly: true,
  146. ObjectKey: req.PreTrainModelPath,
  147. })
  148. }
  149. codeGrampus = models.GrampusDataset{
  150. Name: req.CodeName,
  151. Bucket: setting.Bucket,
  152. EndPoint: getEndPoint(),
  153. ObjectKey: req.CodeStoragePath + cloudbrain.DefaultBranchName + ".zip",
  154. ReadOnly: false,
  155. }
  156. imageUrl = ""
  157. req.Command = ""
  158. } else {
  159. datasetGrampus, cpCommand = getDatasetGPUGrampus(req.DatasetInfos)
  160. if len(req.ModelName) != 0 {
  161. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  162. Name: req.ModelName,
  163. Bucket: setting.Attachment.Minio.Bucket,
  164. EndPoint: setting.Attachment.Minio.Endpoint,
  165. ObjectKey: req.PreTrainModelPath,
  166. ReadOnly: true,
  167. ContainerPath: cloudbrain.PretrainModelMountPath,
  168. })
  169. }
  170. codeGrampus = models.GrampusDataset{
  171. Name: req.CodeName,
  172. Bucket: setting.Attachment.Minio.Bucket,
  173. EndPoint: setting.Attachment.Minio.Endpoint,
  174. ObjectKey: req.CodeStoragePath + cloudbrain.DefaultBranchName + ".zip",
  175. ReadOnly: false,
  176. ContainerPath: cloudbrain.CodeMountPath,
  177. }
  178. req.Command = fmt.Sprintf(CommandGpuDebug, cpCommand, setting.CullIdleTimeout, setting.CullIdleTimeout, setting.CullInterval, setting.CullIdleTimeout, setting.CullInterval)
  179. log.Info("debug command:" + req.Command)
  180. }
  181. jobResult, err := createNotebookJob(models.CreateGrampusNotebookRequest{
  182. Name: req.JobName,
  183. Tasks: []models.GrampusNotebookTask{
  184. {
  185. Name: req.JobName,
  186. ResourceSpecId: req.Spec.SourceSpecId,
  187. ImageId: req.ImageId,
  188. ImageUrl: imageUrl,
  189. Datasets: datasetGrampus,
  190. Code: codeGrampus,
  191. AutoStopDuration: autoStopDurationMs,
  192. Capacity: setting.Capacity,
  193. Command: req.Command,
  194. },
  195. },
  196. })
  197. if err != nil {
  198. log.Error("createNotebookJob failed: %v", err.Error())
  199. return "", err
  200. }
  201. jobID := jobResult.JobInfo.JobID
  202. err = models.CreateCloudbrain(&models.Cloudbrain{
  203. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  204. UserID: ctx.User.ID,
  205. RepoID: ctx.Repo.Repository.ID,
  206. JobID: jobID,
  207. JobName: req.JobName,
  208. DisplayJobName: req.DisplayJobName,
  209. JobType: string(models.JobTypeDebug),
  210. Type: models.TypeC2Net,
  211. Uuid: req.Uuid,
  212. DatasetName: req.DatasetNames,
  213. CommitID: req.CommitID,
  214. IsLatestVersion: "1",
  215. ComputeResource: req.ComputeResource,
  216. ImageID: req.ImageId,
  217. BranchName: req.BranchName,
  218. Description: req.Description,
  219. WorkServerNumber: 1,
  220. EngineName: req.ImageUrl,
  221. CreatedUnix: createTime,
  222. UpdatedUnix: createTime,
  223. Spec: req.Spec,
  224. ModelName: req.ModelName,
  225. ModelVersion: req.ModelVersion,
  226. LabelName: req.LabelName,
  227. PreTrainModelUrl: req.PreTrainModelUrl,
  228. CkptName: req.CkptName,
  229. })
  230. if err != nil {
  231. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  232. return "", err
  233. }
  234. var actionType models.ActionType
  235. if req.ComputeResource == models.NPUResource {
  236. actionType = models.ActionCreateGrampusNPUDebugTask
  237. } else if req.ComputeResource == models.GPUResource {
  238. actionType = models.ActionCreateGrampusGPUDebugTask
  239. }
  240. task, err := models.GetCloudbrainByJobID(jobID)
  241. if err != nil {
  242. log.Error("GetCloudbrainByJobID failed: %v", err.Error())
  243. return "", err
  244. }
  245. stringId := strconv.FormatInt(task.ID, 10)
  246. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, stringId, req.DisplayJobName, actionType)
  247. return jobID, nil
  248. }
  249. func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId string, err error) {
  250. createTime := timeutil.TimeStampNow()
  251. var datasetGrampus, modelGrampus []models.GrampusDataset
  252. var codeGrampus models.GrampusDataset
  253. if ProcessorTypeNPU == req.ProcessType {
  254. datasetGrampus = getDatasetGrampus(req.DatasetInfos)
  255. if len(req.ModelName) != 0 {
  256. modelGrampus = []models.GrampusDataset{
  257. {
  258. Name: req.ModelName,
  259. Bucket: setting.Bucket,
  260. EndPoint: getEndPoint(),
  261. ObjectKey: req.PreTrainModelPath,
  262. },
  263. }
  264. }
  265. codeGrampus = models.GrampusDataset{
  266. Name: req.CodeName,
  267. Bucket: setting.Bucket,
  268. EndPoint: getEndPoint(),
  269. ObjectKey: req.CodeObsPath + cloudbrain.DefaultBranchName + ".zip",
  270. }
  271. }
  272. jobResult, err := createJob(models.CreateGrampusJobRequest{
  273. Name: req.JobName,
  274. Tasks: []models.GrampusTasks{
  275. {
  276. Name: req.JobName,
  277. Command: req.Command,
  278. ResourceSpecId: req.Spec.SourceSpecId,
  279. ImageId: req.ImageId,
  280. ImageUrl: req.ImageUrl,
  281. CenterID: req.Spec.GetAvailableCenterIds(ctx.User.ID),
  282. ReplicaNum: 1,
  283. Datasets: datasetGrampus,
  284. Models: modelGrampus,
  285. Code: codeGrampus,
  286. BootFile: req.BootFile,
  287. },
  288. },
  289. })
  290. if err != nil {
  291. log.Error("createJob failed: %v", err.Error())
  292. return "", err
  293. }
  294. jobID := jobResult.JobInfo.JobID
  295. err = models.CreateCloudbrain(&models.Cloudbrain{
  296. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  297. UserID: ctx.User.ID,
  298. RepoID: ctx.Repo.Repository.ID,
  299. JobID: jobID,
  300. JobName: req.JobName,
  301. DisplayJobName: req.DisplayJobName,
  302. JobType: string(models.JobTypeTrain),
  303. Type: models.TypeC2Net,
  304. Uuid: req.Uuid,
  305. DatasetName: req.DatasetNames,
  306. CommitID: req.CommitID,
  307. IsLatestVersion: req.IsLatestVersion,
  308. ComputeResource: req.ComputeResource,
  309. ImageID: req.ImageId,
  310. TrainUrl: req.TrainUrl,
  311. BranchName: req.BranchName,
  312. Parameters: req.Params,
  313. BootFile: req.BootFile,
  314. DataUrl: req.DataUrl,
  315. Description: req.Description,
  316. WorkServerNumber: req.WorkServerNumber,
  317. EngineName: req.EngineName,
  318. VersionCount: req.VersionCount,
  319. TotalVersionCount: req.TotalVersionCount,
  320. CreatedUnix: createTime,
  321. UpdatedUnix: createTime,
  322. Spec: req.Spec,
  323. ModelName: req.ModelName,
  324. ModelVersion: req.ModelVersion,
  325. LabelName: req.LabelName,
  326. PreTrainModelUrl: req.PreTrainModelUrl,
  327. CkptName: req.CkptName,
  328. })
  329. if err != nil {
  330. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  331. return "", err
  332. }
  333. var actionType models.ActionType
  334. if req.ComputeResource == models.NPUResource {
  335. actionType = models.ActionCreateGrampusNPUTrainTask
  336. } else if req.ComputeResource == models.GPUResource {
  337. actionType = models.ActionCreateGrampusGPUTrainTask
  338. }
  339. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
  340. return jobID, nil
  341. }
  342. func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {
  343. var centerID []string
  344. var centerName []string
  345. includeCenters := make(map[string]string)
  346. excludeCenters := make(map[string]string)
  347. if SpecialPools != nil {
  348. for _, pool := range SpecialPools.Pools {
  349. if !pool.IsExclusive && strings.Contains(req.ComputeResource, pool.Type) {
  350. org, _ := models.GetOrgByName(pool.Org)
  351. if org != nil {
  352. isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID)
  353. if isOrgMember {
  354. for _, info := range pool.Pool {
  355. includeCenters[info.Queue] = info.Value
  356. }
  357. } else {
  358. for _, info := range pool.Pool {
  359. excludeCenters[info.Queue] = info.Value
  360. }
  361. }
  362. }
  363. }
  364. }
  365. }
  366. if len(includeCenters) > 0 {
  367. //如果有专属资源池,根据专属资源池指定智算中心
  368. for k, v := range includeCenters {
  369. centerID = append(centerID, k)
  370. centerName = append(centerName, v)
  371. }
  372. } else if len(excludeCenters) > 0 {
  373. //否则,有要排除的中心,先获取所有中心,删除其中的排除中心,得到指定的智算中心
  374. allCenters := make(map[string]string)
  375. specs, err := GetResourceSpecs(req.ProcessType)
  376. if err == nil {
  377. for _, info := range specs.Infos {
  378. for _, center := range info.Centers {
  379. allCenters[center.ID] = center.Name
  380. }
  381. }
  382. }
  383. for k, _ := range excludeCenters {
  384. delete(allCenters, k)
  385. }
  386. for k, v := range allCenters {
  387. centerID = append(centerID, k)
  388. centerName = append(centerName, v)
  389. }
  390. }
  391. return centerID, centerName
  392. }
  393. func TransTrainJobStatus(status string) string {
  394. if status == models.GrampusStatusPending {
  395. status = models.GrampusStatusWaiting
  396. }
  397. return strings.ToUpper(status)
  398. }
  399. func GetNpuModelRemoteObsUrl(jobName string) string {
  400. return "s3:///" + BucketRemote + "/" + GetNpuModelObjectKey(jobName)
  401. }
  402. func GetNpuModelObjectKey(jobName string) string {
  403. return setting.CodePathPrefix + jobName + RemoteModelPath
  404. }
  405. func GetRemoteEndPoint(aiCenterID string) string {
  406. var endPoint string
  407. for _, info := range setting.CenterInfos.Info {
  408. if info.CenterID == aiCenterID {
  409. endPoint = info.Endpoint
  410. break
  411. }
  412. }
  413. return endPoint
  414. }
  415. func GetCenterProxy(aiCenterID string) string {
  416. var proxy string
  417. for _, info := range setting.CenterInfos.Info {
  418. if info.CenterID == aiCenterID {
  419. proxy = info.StorageProxyServer
  420. break
  421. }
  422. }
  423. return proxy
  424. }