You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

grampus.go 31 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943
  1. package repo
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "io/ioutil"
  6. "net/http"
  7. "os"
  8. "path"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "code.gitea.io/gitea/modules/auth"
  13. "code.gitea.io/gitea/modules/git"
  14. "code.gitea.io/gitea/modules/grampus"
  15. "code.gitea.io/gitea/modules/modelarts"
  16. "code.gitea.io/gitea/modules/notification"
  17. "code.gitea.io/gitea/modules/timeutil"
  18. "code.gitea.io/gitea/modules/util"
  19. "github.com/unknwon/com"
  20. "code.gitea.io/gitea/models"
  21. "code.gitea.io/gitea/modules/base"
  22. "code.gitea.io/gitea/modules/cloudbrain"
  23. "code.gitea.io/gitea/modules/context"
  24. "code.gitea.io/gitea/modules/log"
  25. "code.gitea.io/gitea/modules/setting"
  26. )
  27. const (
  28. tplGrampusTrainJobShow base.TplName = "repo/grampus/trainjob/show"
  29. //GPU
  30. tplGrampusTrainJobGPUNew base.TplName = "repo/grampus/trainjob/gpu/new"
  31. //NPU
  32. tplGrampusTrainJobNPUNew base.TplName = "repo/grampus/trainjob/npu/new"
  33. )
  34. func GrampusTrainJobGPUNew(ctx *context.Context) {
  35. ctx.Data["datasetType"] = models.TypeCloudBrainOne
  36. err := grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
  37. if err != nil {
  38. ctx.ServerError("get new train-job info failed", err)
  39. return
  40. }
  41. waitCount := cloudbrain.GetWaitingCloudbrainCount(models.TypeC2Net, models.GPUResource, models.JobTypeTrain)
  42. ctx.Data["WaitCount"] = waitCount
  43. ctx.HTML(http.StatusOK, tplGrampusTrainJobGPUNew)
  44. }
  45. func GrampusTrainJobNPUNew(ctx *context.Context) {
  46. ctx.Data["datasetType"] = models.TypeCloudBrainTwo
  47. err := grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
  48. if err != nil {
  49. ctx.ServerError("get new train-job info failed", err)
  50. return
  51. }
  52. waitCount := cloudbrain.GetWaitingCloudbrainCount(models.TypeC2Net, models.NPUResource, models.JobTypeTrain)
  53. ctx.Data["WaitCount"] = waitCount
  54. ctx.HTML(200, tplGrampusTrainJobNPUNew)
  55. }
  56. func grampusTrainJobNewDataPrepare(ctx *context.Context, processType string) error {
  57. ctx.Data["PageIsCloudBrain"] = true
  58. t := time.Now()
  59. var displayJobName = jobNamePrefixValid(cutString(ctx.User.Name, 5)) + t.Format("2006010215") + strconv.Itoa(int(t.Unix()))[5:]
  60. ctx.Data["display_job_name"] = displayJobName
  61. //get valid images
  62. images, err := grampus.GetImages(processType)
  63. if err != nil {
  64. log.Error("GetImages failed:", err.Error())
  65. } else {
  66. ctx.Data["images"] = images.Infos
  67. }
  68. grampus.InitSpecialPool()
  69. ctx.Data["GPUEnabled"] = true
  70. ctx.Data["NPUEnabled"] = true
  71. includeCenters := make(map[string]struct{})
  72. excludeCenters := make(map[string]struct{})
  73. if grampus.SpecialPools != nil {
  74. for _, pool := range grampus.SpecialPools.Pools {
  75. if pool.IsExclusive {
  76. if !IsUserInOrgPool(ctx.User.ID, pool) {
  77. ctx.Data[pool.Type+"Enabled"] = false
  78. }
  79. } else {
  80. if strings.Contains(strings.ToLower(processType), strings.ToLower(pool.Type)) {
  81. if IsUserInOrgPool(ctx.User.ID, pool) {
  82. for _, center := range pool.Pool {
  83. includeCenters[center.Queue] = struct{}{}
  84. }
  85. } else {
  86. for _, center := range pool.Pool {
  87. excludeCenters[center.Queue] = struct{}{}
  88. }
  89. }
  90. }
  91. }
  92. }
  93. }
  94. //get valid resource specs
  95. specs, err := grampus.GetResourceSpecs(processType)
  96. grampusSpecs := getFilterSpecBySpecialPool(specs, includeCenters, excludeCenters)
  97. if err != nil {
  98. log.Error("GetResourceSpecs failed:", err.Error())
  99. } else {
  100. ctx.Data["flavor_infos"] = grampusSpecs
  101. }
  102. //get branches
  103. branches, _, err := ctx.Repo.GitRepo.GetBranches(0, 0)
  104. if err != nil {
  105. log.Error("GetBranches error:", err.Error())
  106. } else {
  107. ctx.Data["branches"] = branches
  108. }
  109. ctx.Data["branchName"] = ctx.Repo.BranchName
  110. if processType == grampus.ProcessorTypeGPU {
  111. ctx.Data["datasetType"] = models.TypeCloudBrainOne
  112. } else if processType == grampus.ProcessorTypeNPU {
  113. ctx.Data["datasetType"] = models.TypeCloudBrainTwo
  114. }
  115. return nil
  116. }
  117. func grampusTrainJobErrorPrepare(ctx *context.Context, processType string, form auth.CreateGrampusTrainJobForm) error {
  118. ctx.Data["PageIsCloudBrain"] = true
  119. //get valid images
  120. images, err := grampus.GetImages(processType)
  121. if err != nil {
  122. log.Error("GetImages failed:", err.Error())
  123. } else {
  124. ctx.Data["images"] = images.Infos
  125. }
  126. grampus.InitSpecialPool()
  127. ctx.Data["GPUEnabled"] = true
  128. ctx.Data["NPUEnabled"] = true
  129. includeCenters := make(map[string]struct{})
  130. excludeCenters := make(map[string]struct{})
  131. if grampus.SpecialPools != nil {
  132. for _, pool := range grampus.SpecialPools.Pools {
  133. if pool.IsExclusive {
  134. if !IsUserInOrgPool(ctx.User.ID, pool) {
  135. ctx.Data[pool.Type+"Enabled"] = false
  136. }
  137. } else {
  138. if strings.Contains(strings.ToLower(processType), strings.ToLower(pool.Type)) {
  139. if IsUserInOrgPool(ctx.User.ID, pool) {
  140. for _, center := range pool.Pool {
  141. includeCenters[center.Queue] = struct{}{}
  142. }
  143. } else {
  144. for _, center := range pool.Pool {
  145. excludeCenters[center.Queue] = struct{}{}
  146. }
  147. }
  148. }
  149. }
  150. }
  151. }
  152. //get valid resource specs
  153. specs, err := grampus.GetResourceSpecs(processType)
  154. grampusSpecs := getFilterSpecBySpecialPool(specs, includeCenters, excludeCenters)
  155. if err != nil {
  156. log.Error("GetResourceSpecs failed:", err.Error())
  157. } else {
  158. ctx.Data["flavor_infos"] = grampusSpecs
  159. }
  160. if processType == grampus.ProcessorTypeGPU {
  161. ctx.Data["datasetType"] = models.TypeCloudBrainOne
  162. } else if processType == grampus.ProcessorTypeNPU {
  163. ctx.Data["datasetType"] = models.TypeCloudBrainTwo
  164. }
  165. var Parameters modelarts.Parameters
  166. if err := json.Unmarshal([]byte(form.Params), &Parameters); err != nil {
  167. ctx.ServerError("json.Unmarshal failed:", err)
  168. return err
  169. }
  170. ctx.Data["params"] = Parameters.Parameter
  171. ctx.Data["boot_file"] = form.BootFile
  172. ctx.Data["attachment"] = form.Attachment
  173. _, datasetNames, err := models.GetDatasetInfo(form.Attachment)
  174. if err != nil {
  175. log.Error("GetDatasetInfo failed: %v", err, ctx.Data["MsgID"])
  176. return nil
  177. }
  178. ctx.Data["dataset_name"] = datasetNames
  179. ctx.Data["branch_name"] = form.BranchName
  180. ctx.Data["image_id"] = form.ImageID
  181. ctx.Data["display_job_name"] = form.DisplayJobName
  182. ctx.Data["image"] = form.Image
  183. ctx.Data["flavor"] = form.FlavorID
  184. ctx.Data["flavor_name"] = form.FlavorName
  185. ctx.Data["description"] = form.Description
  186. ctx.Data["engine_name"] = form.EngineName
  187. ctx.Data["work_server_number"] = form.WorkServerNumber
  188. return nil
  189. }
  190. func getFilterSpecBySpecialPool(specs *models.GetGrampusResourceSpecsResult, includeCenters map[string]struct{}, excludeCenters map[string]struct{}) []models.GrampusSpec {
  191. if len(includeCenters) == 0 && len(excludeCenters) == 0 {
  192. return specs.Infos
  193. }
  194. var grampusSpecs []models.GrampusSpec
  195. for _, info := range specs.Infos {
  196. if isInIncludeCenters(info, includeCenters) || (len(excludeCenters) != 0 && isNotAllInExcludeCenters(info, excludeCenters)) {
  197. grampusSpecs = append(grampusSpecs, info)
  198. }
  199. }
  200. return grampusSpecs
  201. }
  202. func isInIncludeCenters(grampusSpec models.GrampusSpec, centers map[string]struct{}) bool {
  203. for _, center := range grampusSpec.Centers {
  204. if _, ok := centers[center.ID]; ok {
  205. return true
  206. }
  207. }
  208. return false
  209. }
  210. func isNotAllInExcludeCenters(grampusSpec models.GrampusSpec, centers map[string]struct{}) bool {
  211. for _, center := range grampusSpec.Centers {
  212. if _, ok := centers[center.ID]; !ok {
  213. return true
  214. }
  215. }
  216. return false
  217. }
  218. func IsUserInOrgPool(userId int64, pool *models.SpecialPool) bool {
  219. org, _ := models.GetOrgByName(pool.Org)
  220. if org != nil {
  221. isOrgMember, _ := models.IsOrganizationMember(org.ID, userId)
  222. return isOrgMember
  223. }
  224. return false
  225. }
  226. func grampusParamCheckCreateTrainJob(form auth.CreateGrampusTrainJobForm) error {
  227. if !strings.HasSuffix(strings.TrimSpace(form.BootFile), ".py") {
  228. log.Error("the boot file(%s) must be a python file", form.BootFile)
  229. return errors.New("启动文件必须是python文件")
  230. }
  231. if form.BranchName == "" {
  232. log.Error("the branch must not be null!", form.BranchName)
  233. return errors.New("代码分支不能为空!")
  234. }
  235. return nil
  236. }
  237. func GrampusTrainJobGpuCreate(ctx *context.Context, form auth.CreateGrampusTrainJobForm) {
  238. displayJobName := form.DisplayJobName
  239. jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
  240. uuid := form.Attachment
  241. description := form.Description
  242. bootFile := strings.TrimSpace(form.BootFile)
  243. params := form.Params
  244. repo := ctx.Repo.Repository
  245. codeLocalPath := setting.JobPath + jobName + cloudbrain.CodeMountPath + "/"
  246. codeMinioPath := setting.CBCodePathPrefix + jobName + cloudbrain.CodeMountPath + "/"
  247. dataMinioPath := setting.Attachment.Minio.BasePath + path.Join(uuid[0:1], uuid[1:2]) + "/" + uuid
  248. branchName := form.BranchName
  249. flavorName := form.FlavorName
  250. image := strings.TrimSpace(form.Image)
  251. if !jobNamePattern.MatchString(displayJobName) {
  252. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeGPU, form)
  253. ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tplGrampusTrainJobGPUNew, &form)
  254. return
  255. }
  256. errStr := checkSpecialPool(ctx, "GPU")
  257. if errStr != "" {
  258. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeGPU, form)
  259. ctx.RenderWithErr(errStr, tplGrampusTrainJobGPUNew, &form)
  260. return
  261. }
  262. //check count limit
  263. count, err := models.GetGrampusCountByUserID(ctx.User.ID, string(models.JobTypeTrain), models.GPUResource)
  264. if err != nil {
  265. log.Error("GetGrampusCountByUserID failed:%v", err, ctx.Data["MsgID"])
  266. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeGPU, form)
  267. ctx.RenderWithErr("system error", tplGrampusTrainJobGPUNew, &form)
  268. return
  269. } else {
  270. if count >= 1 {
  271. log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
  272. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeGPU, form)
  273. ctx.RenderWithErr("you have already a running or waiting task, can not create more", tplGrampusTrainJobGPUNew, &form)
  274. return
  275. }
  276. }
  277. //check param
  278. if err := grampusParamCheckCreateTrainJob(form); err != nil {
  279. log.Error("paramCheckCreateTrainJob failed:(%v)", err, ctx.Data["MsgID"])
  280. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeGPU, form)
  281. ctx.RenderWithErr(err.Error(), tplGrampusTrainJobGPUNew, &form)
  282. return
  283. }
  284. //check whether the task name in the project is duplicated
  285. tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeTrain), displayJobName)
  286. if err == nil {
  287. if len(tasks) != 0 {
  288. log.Error("the job name did already exist", ctx.Data["MsgID"])
  289. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeGPU, form)
  290. ctx.RenderWithErr("the job name did already exist", tplGrampusTrainJobGPUNew, &form)
  291. return
  292. }
  293. } else {
  294. if !models.IsErrJobNotExist(err) {
  295. log.Error("system error, %v", err, ctx.Data["MsgID"])
  296. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeGPU, form)
  297. ctx.RenderWithErr("system error", tplGrampusTrainJobGPUNew, &form)
  298. return
  299. }
  300. }
  301. //check dataset
  302. attachment, err := models.GetAttachmentByUUID(uuid)
  303. if err != nil {
  304. log.Error("GetAttachmentByUUID failed:", err.Error(), ctx.Data["MsgID"])
  305. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeGPU, form)
  306. ctx.RenderWithErr("dataset is not exist", tplGrampusTrainJobGPUNew, &form)
  307. return
  308. }
  309. //prepare code and out path
  310. _, err = ioutil.ReadDir(codeLocalPath)
  311. if err == nil {
  312. os.RemoveAll(codeLocalPath)
  313. }
  314. if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil {
  315. log.Error("downloadZipCode failed, server timed out: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"])
  316. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeGPU, form)
  317. ctx.RenderWithErr("Create task failed, internal error", tplGrampusTrainJobGPUNew, &form)
  318. return
  319. }
  320. //todo: upload code (send to file_server todo this work?)
  321. //upload code
  322. if err := uploadCodeToMinio(codeLocalPath+"/", jobName, cloudbrain.CodeMountPath+"/"); err != nil {
  323. log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"])
  324. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeGPU, form)
  325. ctx.RenderWithErr("Create task failed, internal error", tplGrampusTrainJobGPUNew, &form)
  326. return
  327. }
  328. modelPath := setting.JobPath + jobName + cloudbrain.ModelMountPath + "/"
  329. if err := mkModelPath(modelPath); err != nil {
  330. log.Error("Failed to mkModelPath: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"])
  331. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeGPU, form)
  332. ctx.RenderWithErr("Create task failed, internal error", tplGrampusTrainJobGPUNew, &form)
  333. return
  334. }
  335. //init model readme
  336. if err := uploadCodeToMinio(modelPath, jobName, cloudbrain.ModelMountPath+"/"); err != nil {
  337. log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"])
  338. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeGPU, form)
  339. ctx.RenderWithErr("Create task failed, internal error", tplGrampusTrainJobGPUNew, &form)
  340. return
  341. }
  342. //prepare command
  343. command, err := generateCommand(repo.Name, grampus.ProcessorTypeGPU, codeMinioPath+cloudbrain.DefaultBranchName+".zip", dataMinioPath, bootFile, params, setting.CBCodePathPrefix+jobName+cloudbrain.ModelMountPath+"/", attachment.Name)
  344. if err != nil {
  345. log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"])
  346. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeGPU, form)
  347. ctx.RenderWithErr("Create task failed, internal error", tplGrampusTrainJobGPUNew, &form)
  348. return
  349. }
  350. commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName)
  351. req := &grampus.GenerateTrainJobReq{
  352. JobName: jobName,
  353. DisplayJobName: displayJobName,
  354. ComputeResource: models.GPUResource,
  355. ProcessType: grampus.ProcessorTypeGPU,
  356. Command: command,
  357. ResourceSpecId: form.FlavorID,
  358. ImageUrl: image,
  359. Description: description,
  360. BootFile: bootFile,
  361. Uuid: uuid,
  362. CommitID: commitID,
  363. BranchName: branchName,
  364. Params: form.Params,
  365. FlavorName: flavorName,
  366. EngineName: image,
  367. DatasetName: attachment.Name,
  368. IsLatestVersion: modelarts.IsLatestVersion,
  369. VersionCount: modelarts.VersionCount,
  370. WorkServerNumber: 1,
  371. }
  372. err = grampus.GenerateTrainJob(ctx, req)
  373. if err != nil {
  374. log.Error("GenerateTrainJob failed:%v", err.Error(), ctx.Data["MsgID"])
  375. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeGPU, form)
  376. ctx.RenderWithErr(err.Error(), tplGrampusTrainJobGPUNew, &form)
  377. return
  378. }
  379. ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job")
  380. }
  381. func checkSpecialPool(ctx *context.Context, resourceType string) string {
  382. grampus.InitSpecialPool()
  383. if grampus.SpecialPools != nil {
  384. for _, pool := range grampus.SpecialPools.Pools {
  385. if pool.IsExclusive && pool.Type == resourceType {
  386. org, _ := models.GetOrgByName(pool.Org)
  387. if org != nil {
  388. isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID)
  389. if !isOrgMember {
  390. return ctx.Tr("repo.grampus.no_operate_right")
  391. }
  392. }
  393. }
  394. }
  395. }
  396. return ""
  397. }
  398. func GrampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrainJobForm) {
  399. displayJobName := form.DisplayJobName
  400. jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
  401. uuid := form.Attachment
  402. description := form.Description
  403. bootFile := strings.TrimSpace(form.BootFile)
  404. params := form.Params
  405. repo := ctx.Repo.Repository
  406. codeLocalPath := setting.JobPath + jobName + modelarts.CodePath
  407. codeObsPath := grampus.JobPath + jobName + modelarts.CodePath
  408. dataObsPath := setting.BasePath + path.Join(uuid[0:1], uuid[1:2]) + "/" + uuid + "/"
  409. branchName := form.BranchName
  410. isLatestVersion := modelarts.IsLatestVersion
  411. flavorName := form.FlavorName
  412. versionCount := modelarts.VersionCount
  413. engineName := form.EngineName
  414. if !jobNamePattern.MatchString(displayJobName) {
  415. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeNPU, form)
  416. ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tplGrampusTrainJobNPUNew, &form)
  417. return
  418. }
  419. errStr := checkSpecialPool(ctx, "NPU")
  420. if errStr != "" {
  421. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeNPU, form)
  422. ctx.RenderWithErr(errStr, tplGrampusTrainJobGPUNew, &form)
  423. return
  424. }
  425. //check count limit
  426. count, err := models.GetGrampusCountByUserID(ctx.User.ID, string(models.JobTypeTrain), models.NPUResource)
  427. if err != nil {
  428. log.Error("GetGrampusCountByUserID failed:%v", err, ctx.Data["MsgID"])
  429. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeNPU, form)
  430. ctx.RenderWithErr("system error", tplGrampusTrainJobNPUNew, &form)
  431. return
  432. } else {
  433. if count >= 1 {
  434. log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
  435. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeNPU, form)
  436. ctx.RenderWithErr("you have already a running or waiting task, can not create more", tplGrampusTrainJobNPUNew, &form)
  437. return
  438. }
  439. }
  440. //check param
  441. if err := grampusParamCheckCreateTrainJob(form); err != nil {
  442. log.Error("paramCheckCreateTrainJob failed:(%v)", err)
  443. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeNPU, form)
  444. ctx.RenderWithErr(err.Error(), tplGrampusTrainJobNPUNew, &form)
  445. return
  446. }
  447. //check whether the task name in the project is duplicated
  448. tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeTrain), displayJobName)
  449. if err == nil {
  450. if len(tasks) != 0 {
  451. log.Error("the job name did already exist", ctx.Data["MsgID"])
  452. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeNPU, form)
  453. ctx.RenderWithErr("the job name did already exist", tplGrampusTrainJobNPUNew, &form)
  454. return
  455. }
  456. } else {
  457. if !models.IsErrJobNotExist(err) {
  458. log.Error("system error, %v", err, ctx.Data["MsgID"])
  459. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeNPU, form)
  460. ctx.RenderWithErr("system error", tplGrampusTrainJobNPUNew, &form)
  461. return
  462. }
  463. }
  464. //check dataset
  465. attachment, err := models.GetAttachmentByUUID(uuid)
  466. if err != nil {
  467. log.Error("GetAttachmentByUUID failed:", err.Error(), ctx.Data["MsgID"])
  468. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeNPU, form)
  469. ctx.RenderWithErr("dataset is not exist", tplGrampusTrainJobNPUNew, &form)
  470. return
  471. }
  472. //prepare code and out path
  473. _, err = ioutil.ReadDir(codeLocalPath)
  474. if err == nil {
  475. os.RemoveAll(codeLocalPath)
  476. }
  477. if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil {
  478. log.Error("downloadZipCode failed, server timed out: %s (%v)", repo.FullName(), err)
  479. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeNPU, form)
  480. ctx.RenderWithErr("Create task failed, server timed out", tplGrampusTrainJobNPUNew, &form)
  481. return
  482. }
  483. //todo: upload code (send to file_server todo this work?)
  484. if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.OutputPath); err != nil {
  485. log.Error("Failed to obsMkdir_output: %s (%v)", repo.FullName(), err)
  486. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeNPU, form)
  487. ctx.RenderWithErr("Failed to obsMkdir_output", tplGrampusTrainJobNPUNew, &form)
  488. return
  489. }
  490. if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil {
  491. log.Error("Failed to uploadCodeToObs: %s (%v)", repo.FullName(), err)
  492. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeNPU, form)
  493. ctx.RenderWithErr("Failed to uploadCodeToObs", tplGrampusTrainJobNPUNew, &form)
  494. return
  495. }
  496. //prepare command
  497. command, err := generateCommand(repo.Name, grampus.ProcessorTypeNPU, codeObsPath+cloudbrain.DefaultBranchName+".zip", dataObsPath+"'"+attachment.Name+"'", bootFile, params, setting.CodePathPrefix+jobName+modelarts.OutputPath, attachment.Name)
  498. if err != nil {
  499. log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"])
  500. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeNPU, form)
  501. ctx.RenderWithErr("Create task failed, internal error", tplGrampusTrainJobNPUNew, &form)
  502. return
  503. }
  504. commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName)
  505. req := &grampus.GenerateTrainJobReq{
  506. JobName: jobName,
  507. DisplayJobName: displayJobName,
  508. ComputeResource: models.NPUResource,
  509. ProcessType: grampus.ProcessorTypeNPU,
  510. Command: command,
  511. ResourceSpecId: form.FlavorID,
  512. ImageId: form.ImageID,
  513. DataUrl: dataObsPath,
  514. Description: description,
  515. CodeObsPath: codeObsPath,
  516. BootFileUrl: codeObsPath + bootFile,
  517. BootFile: bootFile,
  518. WorkServerNumber: form.WorkServerNumber,
  519. Uuid: uuid,
  520. CommitID: commitID,
  521. IsLatestVersion: isLatestVersion,
  522. BranchName: branchName,
  523. Params: form.Params,
  524. FlavorName: flavorName,
  525. EngineName: engineName,
  526. VersionCount: versionCount,
  527. TotalVersionCount: modelarts.TotalVersionCount,
  528. DatasetName: attachment.Name,
  529. }
  530. err = grampus.GenerateTrainJob(ctx, req)
  531. if err != nil {
  532. log.Error("GenerateTrainJob failed:%v", err.Error())
  533. grampusTrainJobErrorPrepare(ctx, grampus.ProcessorTypeNPU, form)
  534. ctx.RenderWithErr(err.Error(), tplGrampusTrainJobNPUNew, &form)
  535. return
  536. }
  537. ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job")
  538. }
  539. func GrampusStopJob(ctx *context.Context) {
  540. var ID = ctx.Params(":jobid")
  541. var resultCode = "0"
  542. var errorMsg = ""
  543. var status = ""
  544. task := ctx.Cloudbrain
  545. for {
  546. if task.Status == string(models.GrampusStatusStopped) || task.Status == string(models.GrampusStatusFailed) || task.Status == string(models.GrampusStatusSucceeded) {
  547. log.Error("the job(%s) has been stopped", task.JobName, ctx.Data["msgID"])
  548. resultCode = "-1"
  549. errorMsg = "system error"
  550. break
  551. }
  552. res, err := grampus.StopJob(task.JobID)
  553. if err != nil {
  554. log.Error("StopJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"])
  555. resultCode = strconv.Itoa(res.ErrorCode)
  556. errorMsg = res.ErrorMsg
  557. break
  558. }
  559. oldStatus := task.Status
  560. task.Status = string(models.GrampusStatusStopped)
  561. if task.EndTime == 0 {
  562. task.EndTime = timeutil.TimeStampNow()
  563. }
  564. task.ComputeAndSetDuration()
  565. if oldStatus != task.Status {
  566. notification.NotifyChangeCloudbrainStatus(task, oldStatus)
  567. }
  568. err = models.UpdateJob(task)
  569. if err != nil {
  570. log.Error("UpdateJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"])
  571. resultCode = "-1"
  572. errorMsg = "system error"
  573. break
  574. }
  575. status = task.Status
  576. break
  577. }
  578. ctx.JSON(200, map[string]interface{}{
  579. "result_code": resultCode,
  580. "error_msg": errorMsg,
  581. "status": status,
  582. "id": ID,
  583. "StatusOK": 0,
  584. })
  585. }
  586. func GrampusTrainJobDel(ctx *context.Context) {
  587. var listType = ctx.Query("listType")
  588. if err := deleteGrampusJob(ctx); err != nil {
  589. log.Error("deleteGrampusJob failed: %v", err, ctx.Data["msgID"])
  590. ctx.ServerError(err.Error(), err)
  591. return
  592. }
  593. var isAdminPage = ctx.Query("isadminpage")
  594. var isHomePage = ctx.Query("ishomepage")
  595. if ctx.IsUserSiteAdmin() && isAdminPage == "true" {
  596. ctx.Redirect(setting.AppSubURL + "/admin" + "/cloudbrains")
  597. } else if isHomePage == "true" {
  598. ctx.Redirect(setting.AppSubURL + "/cloudbrains")
  599. } else {
  600. ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job?listType=" + listType)
  601. }
  602. }
  603. func deleteGrampusJob(ctx *context.Context) error {
  604. task := ctx.Cloudbrain
  605. if task.Status != string(models.GrampusStatusStopped) && task.Status != string(models.GrampusStatusSucceeded) && task.Status != string(models.GrampusStatusFailed) {
  606. log.Error("the job(%s) has not been stopped", task.JobName, ctx.Data["msgID"])
  607. return errors.New("the job has not been stopped")
  608. }
  609. err := models.DeleteJob(task)
  610. if err != nil {
  611. log.Error("DeleteJob failed: %v", err, ctx.Data["msgID"])
  612. return err
  613. }
  614. storageType := models.TypeCloudBrainOne
  615. if task.ComputeResource == models.NPUResource {
  616. storageType = models.TypeCloudBrainTwo
  617. }
  618. deleteJobStorage(task.JobName, storageType)
  619. return nil
  620. }
  621. func GrampusTrainJobShow(ctx *context.Context) {
  622. ctx.Data["PageIsCloudBrain"] = true
  623. var task *models.Cloudbrain
  624. task, err := models.GetCloudbrainByJobIDWithDeleted(ctx.Params(":jobid"))
  625. if err != nil {
  626. log.Error("GetCloudbrainByJobID failed:" + err.Error())
  627. ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
  628. return
  629. }
  630. if task.DeletedAt.IsZero() { //normal record
  631. result, err := grampus.GetJob(task.JobID)
  632. if err != nil {
  633. log.Error("GetJob failed:" + err.Error())
  634. ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
  635. return
  636. }
  637. if result != nil {
  638. if len(result.JobInfo.Tasks[0].CenterID) == 1 && len(result.JobInfo.Tasks[0].CenterName) == 1 {
  639. task.AiCenter = result.JobInfo.Tasks[0].CenterID[0] + "+" + result.JobInfo.Tasks[0].CenterName[0]
  640. }
  641. oldStatus := task.Status
  642. task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status)
  643. if task.Status != result.JobInfo.Status || result.JobInfo.Status == models.GrampusStatusRunning {
  644. task.Duration = result.JobInfo.RunSec
  645. task.TrainJobDuration = models.ConvertDurationToStr(task.Duration)
  646. if task.StartTime == 0 && result.JobInfo.StartedAt > 0 {
  647. task.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt)
  648. }
  649. if task.EndTime == 0 && models.IsTrainJobTerminal(task.Status) && task.StartTime > 0 {
  650. task.EndTime = task.StartTime.Add(task.Duration)
  651. }
  652. task.CorrectCreateUnix()
  653. if oldStatus != task.Status {
  654. notification.NotifyChangeCloudbrainStatus(task, oldStatus)
  655. }
  656. err = models.UpdateJob(task)
  657. if err != nil {
  658. log.Error("UpdateJob failed:" + err.Error())
  659. }
  660. }
  661. }
  662. }
  663. if len(task.Parameters) > 0 {
  664. var parameters models.Parameters
  665. err := json.Unmarshal([]byte(task.Parameters), &parameters)
  666. if err != nil {
  667. log.Error("Failed to Unmarshal Parameters: %s (%v)", task.Parameters, err)
  668. ctx.ServerError("system error", err)
  669. return
  670. }
  671. if len(parameters.Parameter) > 0 {
  672. paramTemp := ""
  673. for _, Parameter := range parameters.Parameter {
  674. param := Parameter.Label + " = " + Parameter.Value + "; "
  675. paramTemp = paramTemp + param
  676. }
  677. task.Parameters = paramTemp[:len(paramTemp)-2]
  678. } else {
  679. task.Parameters = ""
  680. }
  681. }
  682. taskList := make([]*models.Cloudbrain, 0)
  683. taskList = append(taskList, task)
  684. ctx.Data["version_list_task"] = taskList
  685. ctx.Data["canDownload"] = cloudbrain.CanModifyJob(ctx, task)
  686. ctx.Data["displayJobName"] = task.DisplayJobName
  687. aiCenterInfo := strings.Split(task.AiCenter, "+")
  688. if len(aiCenterInfo) == 2 {
  689. ctx.Data["ai_center"] = aiCenterInfo[1]
  690. }
  691. ctx.HTML(http.StatusOK, tplGrampusTrainJobShow)
  692. }
  693. func GrampusGetLog(ctx *context.Context) {
  694. jobID := ctx.Params(":jobid")
  695. job, err := models.GetCloudbrainByJobID(jobID)
  696. if err != nil {
  697. log.Error("GetCloudbrainByJobID failed: %v", err, ctx.Data["MsgID"])
  698. ctx.ServerError(err.Error(), err)
  699. return
  700. }
  701. content, err := grampus.GetTrainJobLog(job.JobID)
  702. if err != nil {
  703. log.Error("GetTrainJobLog failed: %v", err, ctx.Data["MsgID"])
  704. ctx.ServerError(err.Error(), err)
  705. return
  706. }
  707. ctx.JSON(http.StatusOK, map[string]interface{}{
  708. "JobName": job.JobName,
  709. "Content": content,
  710. })
  711. return
  712. }
  713. func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bootFile, paramSrc, outputRemotePath, datasetName string) (string, error) {
  714. var command string
  715. workDir := grampus.NpuWorkDir
  716. if processorType == grampus.ProcessorTypeGPU {
  717. workDir = grampus.GpuWorkDir
  718. }
  719. command += "pwd;cd " + workDir + grampus.CommandPrepareScript
  720. //download code & dataset
  721. if processorType == grampus.ProcessorTypeNPU {
  722. commandDownload := "./downloader_for_obs " + setting.Bucket + " " + codeRemotePath + " " + grampus.CodeArchiveName + " " + dataRemotePath + " '" + datasetName + "';"
  723. command += commandDownload
  724. } else if processorType == grampus.ProcessorTypeGPU {
  725. commandDownload := "./downloader_for_minio " + setting.Grampus.Env + " " + codeRemotePath + " " + grampus.CodeArchiveName + " " + dataRemotePath + " '" + datasetName + "';"
  726. command += commandDownload
  727. }
  728. //check download result
  729. commandCheckRes := "bash -c \"[[ $? -eq 0 ]] && exit 0 || exit -1;\";"
  730. command += commandCheckRes
  731. //unzip code & dataset
  732. toolUnzip := "unzip -q '"
  733. if strings.HasSuffix(datasetName, ".tar.gz") {
  734. toolUnzip = "tar -zxvf '"
  735. }
  736. commandUnzip := "cd " + workDir + "code;unzip -q master.zip;echo \"start to unzip dataset\";cd " + workDir + "dataset;" + toolUnzip + datasetName + "';"
  737. command += commandUnzip
  738. //check unzip result
  739. commandCheckRes = "bash -c \"[[ $? -eq 0 ]] && exit 0 || exit -1;\";"
  740. command += commandCheckRes
  741. command += "echo \"unzip finished;start to exec code;\";"
  742. //exec code
  743. var parameters models.Parameters
  744. var paramCode string
  745. param := make([]models.Parameter, 0)
  746. if len(paramSrc) != 0 {
  747. err := json.Unmarshal([]byte(paramSrc), &parameters)
  748. if err != nil {
  749. log.Error("Failed to Unmarshal params: %s (%v)", paramSrc, err)
  750. return command, err
  751. }
  752. for _, parameter := range parameters.Parameter {
  753. param = append(param, models.Parameter{
  754. Label: parameter.Label,
  755. Value: parameter.Value,
  756. })
  757. paramCode += " --" + parameter.Label + "=" + parameter.Value
  758. }
  759. }
  760. commandCode := "cd " + workDir + "code/" + strings.ToLower(repoName) + ";python " + bootFile + paramCode + ";"
  761. command += commandCode
  762. //get exec result
  763. commandGetRes := "result=$?;"
  764. command += commandGetRes
  765. //upload models
  766. if processorType == grampus.ProcessorTypeNPU {
  767. commandUpload := "cd " + workDir + "script_for_grampus/;./uploader_for_obs " + setting.Bucket + " " + outputRemotePath + " " + workDir + "output/;"
  768. command += commandUpload
  769. } else if processorType == grampus.ProcessorTypeGPU {
  770. commandUpload := "cd " + workDir + "script_for_grampus/;./uploader_for_minio " + setting.Grampus.Env + " " + outputRemotePath + " " + workDir + "output/;"
  771. command += commandUpload
  772. }
  773. //check exec result
  774. commandCheckRes = "bash -c \"[[ $result -eq 0 ]] && exit 0 || exit -1\""
  775. command += commandCheckRes
  776. return command, nil
  777. }
  778. func downloadZipCode(ctx *context.Context, codePath, branchName string) error {
  779. archiveType := git.ZIP
  780. archivePath := codePath
  781. if !com.IsDir(archivePath) {
  782. if err := os.MkdirAll(archivePath, os.ModePerm); err != nil {
  783. log.Error("MkdirAll failed:" + err.Error())
  784. return err
  785. }
  786. }
  787. // Get corresponding commit.
  788. var (
  789. commit *git.Commit
  790. err error
  791. )
  792. gitRepo := ctx.Repo.GitRepo
  793. if err != nil {
  794. log.Error("OpenRepository failed:" + err.Error())
  795. return err
  796. }
  797. if gitRepo.IsBranchExist(branchName) {
  798. commit, err = gitRepo.GetBranchCommit(branchName)
  799. if err != nil {
  800. log.Error("GetBranchCommit failed:" + err.Error())
  801. return err
  802. }
  803. }
  804. archivePath = path.Join(archivePath, grampus.CodeArchiveName)
  805. if !com.IsFile(archivePath) {
  806. if err := commit.CreateArchive(archivePath, git.CreateArchiveOpts{
  807. Format: archiveType,
  808. Prefix: setting.Repository.PrefixArchiveFiles,
  809. }); err != nil {
  810. log.Error("CreateArchive failed:" + err.Error())
  811. return err
  812. }
  813. }
  814. return nil
  815. }