You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

grampus.go 6.0 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. package grampus
  2. import (
  3. "encoding/json"
  4. "strings"
  5. "code.gitea.io/gitea/modules/setting"
  6. "code.gitea.io/gitea/models"
  7. "code.gitea.io/gitea/modules/context"
  8. "code.gitea.io/gitea/modules/log"
  9. "code.gitea.io/gitea/modules/notification"
  10. "code.gitea.io/gitea/modules/timeutil"
  11. )
  12. const (
  13. JobPath = "job/"
  14. ProcessorTypeNPU = "npu.huawei.com/NPU"
  15. ProcessorTypeGPU = "nvidia.com/gpu"
  16. GpuWorkDir = "/tmp/"
  17. NpuWorkDir = "/cache/"
  18. CommandPrepareScript = ";mkdir -p output;mkdir -p code;mkdir -p dataset;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/script_for_grampus/archive/master.zip;" +
  19. "echo \"finish loading script\";unzip -q master.zip;cd script_for_grampus;chmod 777 downloader_for_obs uploader_for_npu downloader_for_minio uploader_for_gpu;"
  20. CodeArchiveName = "master.zip"
  21. )
  22. var (
  23. poolInfos *models.PoolInfos
  24. FlavorInfos *models.FlavorInfos
  25. ImageInfos *models.ImageInfosModelArts
  26. SpecialPools *models.SpecialPools
  27. )
  28. type GenerateTrainJobReq struct {
  29. JobName string
  30. Command string
  31. ResourceSpecId string
  32. ImageUrl string //与image_id二选一,都有的情况下优先image_url
  33. ImageId string
  34. DisplayJobName string
  35. Uuid string
  36. Description string
  37. CodeObsPath string
  38. BootFile string
  39. BootFileUrl string
  40. DataUrl string
  41. TrainUrl string
  42. WorkServerNumber int
  43. EngineID int64
  44. CommitID string
  45. IsLatestVersion string
  46. BranchName string
  47. PreVersionId int64
  48. PreVersionName string
  49. FlavorName string
  50. VersionCount int
  51. EngineName string
  52. TotalVersionCount int
  53. ComputeResource string
  54. ProcessType string
  55. DatasetName string
  56. Params string
  57. }
  58. func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error) {
  59. createTime := timeutil.TimeStampNow()
  60. centerID, centerName := getCentersParamter(ctx, req)
  61. jobResult, err := createJob(models.CreateGrampusJobRequest{
  62. Name: req.JobName,
  63. Tasks: []models.GrampusTasks{
  64. {
  65. Name: req.JobName,
  66. Command: req.Command,
  67. ResourceSpecId: req.ResourceSpecId,
  68. ImageId: req.ImageId,
  69. ImageUrl: req.ImageUrl,
  70. CenterID: centerID,
  71. CenterName: centerName,
  72. ReplicaNum: 1,
  73. },
  74. },
  75. })
  76. if err != nil {
  77. log.Error("createJob failed: %v", err.Error())
  78. return err
  79. }
  80. jobID := jobResult.JobInfo.JobID
  81. err = models.CreateCloudbrain(&models.Cloudbrain{
  82. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  83. UserID: ctx.User.ID,
  84. RepoID: ctx.Repo.Repository.ID,
  85. JobID: jobID,
  86. JobName: req.JobName,
  87. DisplayJobName: req.DisplayJobName,
  88. JobType: string(models.JobTypeTrain),
  89. Type: models.TypeC2Net,
  90. Uuid: req.Uuid,
  91. DatasetName: req.DatasetName,
  92. CommitID: req.CommitID,
  93. IsLatestVersion: req.IsLatestVersion,
  94. ComputeResource: req.ComputeResource,
  95. ImageID: req.ImageId,
  96. TrainUrl: req.TrainUrl,
  97. BranchName: req.BranchName,
  98. Parameters: req.Params,
  99. BootFile: req.BootFile,
  100. DataUrl: req.DataUrl,
  101. FlavorCode: req.ResourceSpecId,
  102. Description: req.Description,
  103. WorkServerNumber: req.WorkServerNumber,
  104. FlavorName: req.FlavorName,
  105. EngineName: req.EngineName,
  106. VersionCount: req.VersionCount,
  107. TotalVersionCount: req.TotalVersionCount,
  108. CreatedUnix: createTime,
  109. UpdatedUnix: createTime,
  110. })
  111. if err != nil {
  112. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  113. return err
  114. }
  115. var actionType models.ActionType
  116. if req.ComputeResource == models.NPUResource {
  117. actionType = models.ActionCreateGrampusNPUTrainTask
  118. } else if req.ComputeResource == models.GPUResource {
  119. actionType = models.ActionCreateGrampusGPUTrainTask
  120. }
  121. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
  122. return nil
  123. }
  124. func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {
  125. var centerID []string
  126. var centerName []string
  127. includeCenters := make(map[string]string)
  128. excludeCenters := make(map[string]string)
  129. if SpecialPools != nil {
  130. for _, pool := range SpecialPools.Pools {
  131. if !pool.IsExclusive && strings.Contains(req.ComputeResource, pool.Type) {
  132. org, _ := models.GetOrgByName(pool.Org)
  133. if org != nil {
  134. isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID)
  135. if isOrgMember {
  136. for _, info := range pool.Pool {
  137. includeCenters[info.Queue] = info.Value
  138. }
  139. } else {
  140. for _, info := range pool.Pool {
  141. excludeCenters[info.Queue] = info.Value
  142. }
  143. }
  144. }
  145. }
  146. }
  147. }
  148. if len(includeCenters) > 0 {
  149. //如果有专属资源池,根据专属资源池指定智算中心
  150. for k, v := range includeCenters {
  151. centerID = append(centerID, k)
  152. centerName = append(centerName, v)
  153. }
  154. } else if len(excludeCenters) > 0 {
  155. //否则,有要排除的中心,先获取所有中心,删除其中的排除中心,得到指定的智算中心
  156. allCenters := make(map[string]string)
  157. specs, err := GetResourceSpecs(req.ProcessType)
  158. if err == nil {
  159. for _, info := range specs.Infos {
  160. for _, center := range info.Centers {
  161. allCenters[center.ID] = center.Name
  162. }
  163. }
  164. }
  165. for k, _ := range excludeCenters {
  166. delete(allCenters, k)
  167. }
  168. for k, v := range allCenters {
  169. centerID = append(centerID, k)
  170. centerName = append(centerName, v)
  171. }
  172. }
  173. return centerID, centerName
  174. }
  175. func TransTrainJobStatus(status string) string {
  176. if status == models.GrampusStatusPending {
  177. status = models.GrampusStatusWaiting
  178. }
  179. return strings.ToUpper(status)
  180. }
  181. func InitSpecialPool() {
  182. if SpecialPools == nil && setting.Grampus.SpecialPools != "" {
  183. json.Unmarshal([]byte(setting.Grampus.SpecialPools), &SpecialPools)
  184. }
  185. }