You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

grampus.go 29 kB

3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
2 years ago
2 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
3 years ago
2 years ago
2 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
3 years ago
2 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
2 years ago
2 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883
  1. package repo
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "net/http"
  8. "os"
  9. "path"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "code.gitea.io/gitea/modules/auth"
  14. "code.gitea.io/gitea/modules/git"
  15. "code.gitea.io/gitea/modules/grampus"
  16. "code.gitea.io/gitea/modules/modelarts"
  17. "code.gitea.io/gitea/modules/notification"
  18. "code.gitea.io/gitea/modules/timeutil"
  19. "code.gitea.io/gitea/modules/util"
  20. "github.com/unknwon/com"
  21. "code.gitea.io/gitea/models"
  22. "code.gitea.io/gitea/modules/base"
  23. "code.gitea.io/gitea/modules/cloudbrain"
  24. "code.gitea.io/gitea/modules/context"
  25. "code.gitea.io/gitea/modules/log"
  26. "code.gitea.io/gitea/modules/setting"
  27. )
  28. const (
  29. tplGrampusTrainJobShow base.TplName = "repo/grampus/trainjob/show"
  30. //GPU
  31. tplGrampusTrainJobGPUNew base.TplName = "repo/grampus/trainjob/gpu/new"
  32. //NPU
  33. tplGrampusTrainJobNPUNew base.TplName = "repo/grampus/trainjob/npu/new"
  34. )
  // GrampusTrainJobGPUNew renders the form for creating a GPU train job.
// If the form data cannot be prepared, the failure is reported through
// ctx.ServerError instead.
func GrampusTrainJobGPUNew(ctx *context.Context) {
	ctx.Data["datasetType"] = models.TypeCloudBrainOne
	if err := grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU); err != nil {
		ctx.ServerError("get new train-job info failed", err)
		return
	}
	ctx.HTML(http.StatusOK, tplGrampusTrainJobGPUNew)
}
  // GrampusTrainJobNPUNew renders the form for creating an NPU train job.
// If the form data cannot be prepared, the failure is reported through
// ctx.ServerError instead.
func GrampusTrainJobNPUNew(ctx *context.Context) {
	ctx.Data["datasetType"] = models.TypeCloudBrainTwo
	err := grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
	if err != nil {
		ctx.ServerError("get new train-job info failed", err)
		return
	}
	// Named status constant, matching GrampusTrainJobGPUNew.
	ctx.HTML(http.StatusOK, tplGrampusTrainJobNPUNew)
}
  53. func grampusTrainJobNewDataPrepare(ctx *context.Context, processType string) error {
  54. ctx.Data["PageIsCloudBrain"] = true
  55. t := time.Now()
  56. var displayJobName = jobNamePrefixValid(cutString(ctx.User.Name, 5)) + t.Format("2006010215") + strconv.Itoa(int(t.Unix()))[5:]
  57. ctx.Data["display_job_name"] = displayJobName
  58. //get valid images
  59. images, err := grampus.GetImages(processType)
  60. if err != nil {
  61. log.Error("GetImages failed:", err.Error())
  62. } else {
  63. ctx.Data["images"] = images.Infos
  64. }
  65. grampus.InitSpecialPool()
  66. ctx.Data["GPUEnabled"] = true
  67. ctx.Data["NPUEnabled"] = true
  68. includeCenters := make(map[string]struct{})
  69. excludeCenters := make(map[string]struct{})
  70. if grampus.SpecialPools != nil {
  71. for _, pool := range grampus.SpecialPools.Pools {
  72. if pool.IsExclusive {
  73. if !IsUserInOrgPool(ctx.User.ID, pool) {
  74. ctx.Data[pool.Type+"Enabled"] = false
  75. }
  76. } else {
  77. if strings.Contains(strings.ToLower(processType), strings.ToLower(pool.Type)) {
  78. if IsUserInOrgPool(ctx.User.ID, pool) {
  79. for _, center := range pool.Pool {
  80. includeCenters[center.Queue] = struct{}{}
  81. }
  82. } else {
  83. for _, center := range pool.Pool {
  84. excludeCenters[center.Queue] = struct{}{}
  85. }
  86. }
  87. }
  88. }
  89. }
  90. }
  91. //get valid resource specs
  92. specs, err := grampus.GetResourceSpecs(processType)
  93. grampusSpecs := getFilterSpecBySpecialPool(specs, includeCenters, excludeCenters)
  94. if err != nil {
  95. log.Error("GetResourceSpecs failed:", err.Error())
  96. } else {
  97. ctx.Data["flavor_infos"] = grampusSpecs
  98. }
  99. //get branches
  100. branches, _, err := ctx.Repo.GitRepo.GetBranches(0, 0)
  101. if err != nil {
  102. log.Error("GetBranches error:", err.Error())
  103. } else {
  104. ctx.Data["branches"] = branches
  105. }
  106. ctx.Data["branchName"] = ctx.Repo.BranchName
  107. if processType == grampus.ProcessorTypeGPU {
  108. ctx.Data["datasetType"] = models.TypeCloudBrainOne
  109. waitCount := cloudbrain.GetWaitingCloudbrainCount(models.TypeC2Net, models.GPUResource, models.JobTypeTrain)
  110. ctx.Data["WaitCount"] = waitCount
  111. } else if processType == grampus.ProcessorTypeNPU {
  112. ctx.Data["datasetType"] = models.TypeCloudBrainTwo
  113. waitCount := cloudbrain.GetWaitingCloudbrainCount(models.TypeC2Net, models.NPUResource, models.JobTypeTrain)
  114. ctx.Data["WaitCount"] = waitCount
  115. }
  116. return nil
  117. }
  118. func getFilterSpecBySpecialPool(specs *models.GetGrampusResourceSpecsResult, includeCenters map[string]struct{}, excludeCenters map[string]struct{}) []models.GrampusSpec {
  119. if len(includeCenters) == 0 && len(excludeCenters) == 0 {
  120. return specs.Infos
  121. }
  122. var grampusSpecs []models.GrampusSpec
  123. for _, info := range specs.Infos {
  124. if isInIncludeCenters(info, includeCenters) || (len(excludeCenters) != 0 && isNotAllInExcludeCenters(info, excludeCenters)) {
  125. grampusSpecs = append(grampusSpecs, info)
  126. }
  127. }
  128. return grampusSpecs
  129. }
  130. func isInIncludeCenters(grampusSpec models.GrampusSpec, centers map[string]struct{}) bool {
  131. for _, center := range grampusSpec.Centers {
  132. if _, ok := centers[center.ID]; ok {
  133. return true
  134. }
  135. }
  136. return false
  137. }
  138. func isNotAllInExcludeCenters(grampusSpec models.GrampusSpec, centers map[string]struct{}) bool {
  139. for _, center := range grampusSpec.Centers {
  140. if _, ok := centers[center.ID]; !ok {
  141. return true
  142. }
  143. }
  144. return false
  145. }
  146. func IsUserInOrgPool(userId int64, pool *models.SpecialPool) bool {
  147. org, _ := models.GetOrgByName(pool.Org)
  148. if org != nil {
  149. isOrgMember, _ := models.IsOrganizationMember(org.ID, userId)
  150. return isOrgMember
  151. }
  152. return false
  153. }
  // grampusParamCheckCreateTrainJob validates two fields of a Grampus train-job
// form:
//   - the boot file, after trimming whitespace, must end in ".py"
//   - a branch must be selected
//
// The returned error text is shown to the user as-is. The messages mean
// "the boot file must be a python file" and "the code branch must not be
// empty".
func grampusParamCheckCreateTrainJob(form auth.CreateGrampusTrainJobForm) error {
	if !strings.HasSuffix(strings.TrimSpace(form.BootFile), ".py") {
		log.Error("the boot file(%s) must be a python file", form.BootFile)
		return errors.New("启动文件必须是python文件")
	}
	if form.BranchName == "" {
		// The message has no format verb; BranchName is empty anyway, so pass nothing extra.
		log.Error("the branch must not be null!")
		return errors.New("代码分支不能为空!")
	}
	return nil
}
  // GrampusTrainJobGpuCreate handles submission of the GPU train-job form.
//
// The submission is validated in this order:
//   - the display job name pattern
//   - the boot file exists on the chosen branch
//   - exclusive special-pool permission
//   - the per-user job count
//   - the form parameters
//   - job-name uniqueness within the repository
//   - the selected dataset
//
// After validation, the branch is downloaded, the code and model directories
// are uploaded to Minio, the run command is generated, and the job is
// submitted through grampus.GenerateTrainJob.
//
// Every failure re-renders the GPU creation form with an error message. On
// success the user is redirected to the repository's train-job list.
func GrampusTrainJobGpuCreate(ctx *context.Context, form auth.CreateGrampusTrainJobForm) {
	displayJobName := form.DisplayJobName
	jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
	uuid := form.Attachment
	description := form.Description
	bootFile := strings.TrimSpace(form.BootFile)
	params := form.Params
	repo := ctx.Repo.Repository
	codeLocalPath := setting.JobPath + jobName + cloudbrain.CodeMountPath + "/"
	codeMinioPath := setting.CBCodePathPrefix + jobName + cloudbrain.CodeMountPath + "/"
	branchName := form.BranchName
	flavorName := form.FlavorName
	image := strings.TrimSpace(form.Image)
	msgID := ctx.Data["MsgID"]

	// renderErr refills the creation page data and shows msg on the GPU form.
	renderErr := func(msg string) {
		grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU)
		ctx.RenderWithErr(msg, tplGrampusTrainJobGPUNew, &form)
	}

	if !jobNamePattern.MatchString(displayJobName) {
		renderErr(ctx.Tr("repo.cloudbrain_jobname_err"))
		return
	}

	bootFileExist, err := ctx.Repo.FileExists(bootFile, branchName)
	if err != nil || !bootFileExist {
		log.Error("Get bootfile error: %v, msgID: %v", err, msgID)
		renderErr(ctx.Tr("repo.cloudbrain_bootfile_err"))
		return
	}

	if errStr := checkSpecialPool(ctx, "GPU"); errStr != "" {
		renderErr(errStr)
		return
	}

	// check count limit
	count, err := models.GetGrampusCountByUserID(ctx.User.ID, string(models.JobTypeTrain), models.GPUResource)
	if err != nil {
		log.Error("GetGrampusCountByUserID failed: %v, msgID: %v", err, msgID)
		renderErr("system error")
		return
	}
	if count >= 1 {
		log.Error("the user already has running or waiting task, msgID: %v", msgID)
		renderErr("you have already a running or waiting task, can not create more")
		return
	}

	// check param
	if err := grampusParamCheckCreateTrainJob(form); err != nil {
		log.Error("paramCheckCreateTrainJob failed: %v, msgID: %v", err, msgID)
		renderErr(err.Error())
		return
	}

	// check whether the task name in the project is duplicated
	tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeTrain), displayJobName)
	if err == nil {
		if len(tasks) != 0 {
			log.Error("the job name did already exist, msgID: %v", msgID)
			renderErr("the job name did already exist")
			return
		}
	} else if !models.IsErrJobNotExist(err) {
		log.Error("system error, %v, msgID: %v", err, msgID)
		renderErr("system error")
		return
	}

	// check dataset. uuid is sliced below to build the storage path, so a value
	// shorter than two characters (e.g. no dataset chosen) is rejected here
	// rather than panicking.
	if len(uuid) < 2 {
		log.Error("invalid dataset uuid %q, msgID: %v", uuid, msgID)
		renderErr("dataset is not exist")
		return
	}
	attachment, err := models.GetAttachmentByUUID(uuid)
	if err != nil {
		log.Error("GetAttachmentByUUID failed: %v, msgID: %v", err, msgID)
		renderErr("dataset is not exist")
		return
	}
	dataMinioPath := setting.Attachment.Minio.BasePath + path.Join(uuid[0:1], uuid[1:2]) + "/" + uuid

	// prepare code and out path; remove any stale local checkout first (best effort)
	if _, err := ioutil.ReadDir(codeLocalPath); err == nil {
		os.RemoveAll(codeLocalPath)
	}
	if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil {
		log.Error("downloadZipCode failed, server timed out: %s (%v), msgID: %v", repo.FullName(), err, msgID)
		renderErr(ctx.Tr("cloudbrain.load_code_failed"))
		return
	}

	// upload code
	if err := uploadCodeToMinio(codeLocalPath+"/", jobName, cloudbrain.CodeMountPath+"/"); err != nil {
		log.Error("Failed to uploadCodeToMinio: %s (%v), msgID: %v", repo.FullName(), err, msgID)
		renderErr(ctx.Tr("cloudbrain.load_code_failed"))
		return
	}

	modelPath := setting.JobPath + jobName + cloudbrain.ModelMountPath + "/"
	if err := mkModelPath(modelPath); err != nil {
		log.Error("Failed to mkModelPath: %s (%v), msgID: %v", repo.FullName(), err, msgID)
		renderErr(ctx.Tr("cloudbrain.load_code_failed"))
		return
	}

	// init model readme
	if err := uploadCodeToMinio(modelPath, jobName, cloudbrain.ModelMountPath+"/"); err != nil {
		log.Error("Failed to uploadCodeToMinio: %s (%v), msgID: %v", repo.FullName(), err, msgID)
		renderErr(ctx.Tr("cloudbrain.load_code_failed"))
		return
	}

	// prepare command
	command, err := generateCommand(repo.Name, grampus.ProcessorTypeGPU, codeMinioPath+cloudbrain.DefaultBranchName+".zip", dataMinioPath, bootFile, params, setting.CBCodePathPrefix+jobName+cloudbrain.ModelMountPath+"/", attachment.Name)
	if err != nil {
		log.Error("Failed to generateCommand: %s (%v), msgID: %v", displayJobName, err, msgID)
		renderErr("Create task failed, internal error")
		return
	}

	commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName)
	req := &grampus.GenerateTrainJobReq{
		JobName:          jobName,
		DisplayJobName:   displayJobName,
		ComputeResource:  models.GPUResource,
		ProcessType:      grampus.ProcessorTypeGPU,
		Command:          command,
		ResourceSpecId:   form.FlavorID,
		ImageUrl:         image,
		Description:      description,
		BootFile:         bootFile,
		Uuid:             uuid,
		CommitID:         commitID,
		BranchName:       branchName,
		Params:           form.Params,
		FlavorName:       flavorName,
		EngineName:       image,
		DatasetName:      attachment.Name,
		IsLatestVersion:  modelarts.IsLatestVersion,
		VersionCount:     modelarts.VersionCount,
		WorkServerNumber: 1,
	}

	if err := grampus.GenerateTrainJob(ctx, req); err != nil {
		log.Error("GenerateTrainJob failed: %v, msgID: %v", err, msgID)
		renderErr(err.Error())
		return
	}
	ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job")
}
  316. func checkSpecialPool(ctx *context.Context, resourceType string) string {
  317. grampus.InitSpecialPool()
  318. if grampus.SpecialPools != nil {
  319. for _, pool := range grampus.SpecialPools.Pools {
  320. if pool.IsExclusive && pool.Type == resourceType {
  321. org, _ := models.GetOrgByName(pool.Org)
  322. if org != nil {
  323. isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID)
  324. if !isOrgMember {
  325. return ctx.Tr("repo.grampus.no_operate_right")
  326. }
  327. }
  328. }
  329. }
  330. }
  331. return ""
  332. }
  // GrampusTrainJobNpuCreate handles submission of the NPU train-job form.
//
// The submission is validated in this order:
//   - the display job name pattern
//   - the boot file exists on the chosen branch
//   - exclusive special-pool permission
//   - the per-user job count
//   - the form parameters
//   - job-name uniqueness within the repository
//   - the selected dataset
//
// After validation, the branch is downloaded, the OBS output directory is
// created, the code is uploaded to OBS, the run command is generated, and the
// job is submitted through grampus.GenerateTrainJob.
//
// Every failure re-renders the NPU creation form with an error message. On
// success the user is redirected to the repository's train-job list.
func GrampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrainJobForm) {
	displayJobName := form.DisplayJobName
	jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
	uuid := form.Attachment
	description := form.Description
	bootFile := strings.TrimSpace(form.BootFile)
	params := form.Params
	repo := ctx.Repo.Repository
	codeLocalPath := setting.JobPath + jobName + modelarts.CodePath
	codeObsPath := grampus.JobPath + jobName + modelarts.CodePath
	outputObsPath := setting.CodePathPrefix + jobName + modelarts.OutputPath
	branchName := form.BranchName
	isLatestVersion := modelarts.IsLatestVersion
	flavorName := form.FlavorName
	versionCount := modelarts.VersionCount
	engineName := form.EngineName
	msgID := ctx.Data["MsgID"]

	// renderErr refills the creation page data and shows msg on the NPU form.
	// Every error path, including the special-pool check, must use the NPU
	// template rather than the GPU one.
	renderErr := func(msg string) {
		grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU)
		ctx.RenderWithErr(msg, tplGrampusTrainJobNPUNew, &form)
	}

	if !jobNamePattern.MatchString(displayJobName) {
		renderErr(ctx.Tr("repo.cloudbrain_jobname_err"))
		return
	}

	bootFileExist, err := ctx.Repo.FileExists(bootFile, branchName)
	if err != nil || !bootFileExist {
		log.Error("Get bootfile error: %v, msgID: %v", err, msgID)
		renderErr(ctx.Tr("repo.cloudbrain_bootfile_err"))
		return
	}

	if errStr := checkSpecialPool(ctx, "NPU"); errStr != "" {
		renderErr(errStr)
		return
	}

	// check count limit
	count, err := models.GetGrampusCountByUserID(ctx.User.ID, string(models.JobTypeTrain), models.NPUResource)
	if err != nil {
		log.Error("GetGrampusCountByUserID failed: %v, msgID: %v", err, msgID)
		renderErr("system error")
		return
	}
	if count >= 1 {
		log.Error("the user already has running or waiting task, msgID: %v", msgID)
		renderErr("you have already a running or waiting task, can not create more")
		return
	}

	// check param
	if err := grampusParamCheckCreateTrainJob(form); err != nil {
		log.Error("paramCheckCreateTrainJob failed: %v, msgID: %v", err, msgID)
		renderErr(err.Error())
		return
	}

	// check whether the task name in the project is duplicated
	tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeTrain), displayJobName)
	if err == nil {
		if len(tasks) != 0 {
			log.Error("the job name did already exist, msgID: %v", msgID)
			renderErr("the job name did already exist")
			return
		}
	} else if !models.IsErrJobNotExist(err) {
		log.Error("system error, %v, msgID: %v", err, msgID)
		renderErr("system error")
		return
	}

	// check dataset. uuid is sliced below to build the storage path, so a value
	// shorter than two characters (e.g. no dataset chosen) is rejected here
	// rather than panicking.
	if len(uuid) < 2 {
		log.Error("invalid dataset uuid %q, msgID: %v", uuid, msgID)
		renderErr("dataset is not exist")
		return
	}
	attachment, err := models.GetAttachmentByUUID(uuid)
	if err != nil {
		log.Error("GetAttachmentByUUID failed: %v, msgID: %v", err, msgID)
		renderErr("dataset is not exist")
		return
	}
	dataObsPath := setting.BasePath + path.Join(uuid[0:1], uuid[1:2]) + "/" + uuid + "/"

	// prepare code and out path; remove any stale local checkout first (best effort)
	if _, err := ioutil.ReadDir(codeLocalPath); err == nil {
		os.RemoveAll(codeLocalPath)
	}
	if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil {
		log.Error("downloadZipCode failed, server timed out: %s (%v), msgID: %v", repo.FullName(), err, msgID)
		renderErr(ctx.Tr("cloudbrain.load_code_failed"))
		return
	}

	if err := obsMkdir(outputObsPath); err != nil {
		log.Error("Failed to obsMkdir_output: %s (%v), msgID: %v", repo.FullName(), err, msgID)
		renderErr(ctx.Tr("cloudbrain.load_code_failed"))
		return
	}

	if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil {
		log.Error("Failed to uploadCodeToObs: %s (%v), msgID: %v", repo.FullName(), err, msgID)
		renderErr(ctx.Tr("cloudbrain.load_code_failed"))
		return
	}

	// prepare command
	command, err := generateCommand(repo.Name, grampus.ProcessorTypeNPU, codeObsPath+cloudbrain.DefaultBranchName+".zip", dataObsPath+"'"+attachment.Name+"'", bootFile, params, outputObsPath, attachment.Name)
	if err != nil {
		log.Error("Failed to generateCommand: %s (%v), msgID: %v", displayJobName, err, msgID)
		renderErr("Create task failed, internal error")
		return
	}

	commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName)
	req := &grampus.GenerateTrainJobReq{
		JobName:           jobName,
		DisplayJobName:    displayJobName,
		ComputeResource:   models.NPUResource,
		ProcessType:       grampus.ProcessorTypeNPU,
		Command:           command,
		ResourceSpecId:    form.FlavorID,
		ImageId:           form.ImageID,
		DataUrl:           dataObsPath,
		Description:       description,
		CodeObsPath:       codeObsPath,
		BootFileUrl:       codeObsPath + bootFile,
		BootFile:          bootFile,
		WorkServerNumber:  form.WorkServerNumber,
		Uuid:              uuid,
		CommitID:          commitID,
		IsLatestVersion:   isLatestVersion,
		BranchName:        branchName,
		Params:            form.Params,
		FlavorName:        flavorName,
		EngineName:        engineName,
		VersionCount:      versionCount,
		TotalVersionCount: modelarts.TotalVersionCount,
		DatasetName:       attachment.Name,
	}

	if err := grampus.GenerateTrainJob(ctx, req); err != nil {
		log.Error("GenerateTrainJob failed: %v, msgID: %v", err, msgID)
		renderErr(err.Error())
		return
	}
	ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job")
}
  481. func GrampusStopJob(ctx *context.Context) {
  482. var ID = ctx.Params(":jobid")
  483. var resultCode = "0"
  484. var errorMsg = ""
  485. var status = ""
  486. task := ctx.Cloudbrain
  487. for {
  488. if task.Status == string(models.GrampusStatusStopped) || task.Status == string(models.GrampusStatusFailed) || task.Status == string(models.GrampusStatusSucceeded) {
  489. log.Error("the job(%s) has been stopped", task.JobName, ctx.Data["msgID"])
  490. resultCode = "-1"
  491. errorMsg = "system error"
  492. break
  493. }
  494. res, err := grampus.StopJob(task.JobID)
  495. if err != nil {
  496. log.Error("StopJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"])
  497. resultCode = strconv.Itoa(res.ErrorCode)
  498. errorMsg = res.ErrorMsg
  499. break
  500. }
  501. oldStatus := task.Status
  502. task.Status = string(models.GrampusStatusStopped)
  503. if task.EndTime == 0 {
  504. task.EndTime = timeutil.TimeStampNow()
  505. }
  506. task.ComputeAndSetDuration()
  507. if oldStatus != task.Status {
  508. notification.NotifyChangeCloudbrainStatus(task, oldStatus)
  509. }
  510. err = models.UpdateJob(task)
  511. if err != nil {
  512. log.Error("UpdateJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"])
  513. resultCode = "-1"
  514. errorMsg = "system error"
  515. break
  516. }
  517. status = task.Status
  518. break
  519. }
  520. ctx.JSON(200, map[string]interface{}{
  521. "result_code": resultCode,
  522. "error_msg": errorMsg,
  523. "status": status,
  524. "id": ID,
  525. "StatusOK": 0,
  526. })
  527. }
  528. func GrampusTrainJobDel(ctx *context.Context) {
  529. var listType = ctx.Query("listType")
  530. if err := deleteGrampusJob(ctx); err != nil {
  531. log.Error("deleteGrampusJob failed: %v", err, ctx.Data["msgID"])
  532. ctx.ServerError(err.Error(), err)
  533. return
  534. }
  535. var isAdminPage = ctx.Query("isadminpage")
  536. var isHomePage = ctx.Query("ishomepage")
  537. if ctx.IsUserSiteAdmin() && isAdminPage == "true" {
  538. ctx.Redirect(setting.AppSubURL + "/admin" + "/cloudbrains")
  539. } else if isHomePage == "true" {
  540. ctx.Redirect(setting.AppSubURL + "/cloudbrains")
  541. } else {
  542. ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job?listType=" + listType)
  543. }
  544. }
  // deleteGrampusJob deletes the current request's job (ctx.Cloudbrain) from the
// database and then removes its stored files with deleteJobStorage.
//
// Only jobs whose status is stopped, succeeded or failed may be deleted. Any
// other status returns an error and nothing is removed. NPU jobs use the
// TypeCloudBrainTwo storage type; all others use TypeCloudBrainOne.
func deleteGrampusJob(ctx *context.Context) error {
	task := ctx.Cloudbrain

	finished := task.Status == string(models.GrampusStatusStopped) ||
		task.Status == string(models.GrampusStatusSucceeded) ||
		task.Status == string(models.GrampusStatusFailed)
	if !finished {
		log.Error("the job(%s) has not been stopped", task.JobName, ctx.Data["msgID"])
		return errors.New("the job has not been stopped")
	}

	if err := models.DeleteJob(task); err != nil {
		log.Error("DeleteJob failed: %v", err, ctx.Data["msgID"])
		return err
	}

	storageType := models.TypeCloudBrainOne
	if task.ComputeResource == models.NPUResource {
		storageType = models.TypeCloudBrainTwo
	}
	deleteJobStorage(task.JobName, storageType)
	return nil
}
  // GrampusTrainJobShow renders the detail page of a Grampus train job.
//
// The job is looked up by the :jobid route parameter, including soft-deleted
// records. For a record that is not deleted, the current state is fetched from
// Grampus, and the AI center, status, duration, start and end time are synced
// back to the database. A stored JSON parameter list is flattened to
// "label = value; label = value" for display.
func GrampusTrainJobShow(ctx *context.Context) {
	ctx.Data["PageIsCloudBrain"] = true
	task, err := models.GetCloudbrainByJobIDWithDeleted(ctx.Params(":jobid"))
	if err != nil {
		log.Error("GetCloudbrainByJobID failed: %v", err)
		ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
		return
	}

	if task.DeletedAt.IsZero() { // normal record
		result, err := grampus.GetJob(task.JobID)
		if err != nil {
			log.Error("GetJob failed: %v", err)
			ctx.NotFound(ctx.Req.URL.RequestURI(), nil)
			return
		}
		if result != nil {
			// Guard the index: the response may carry no tasks.
			if jobTasks := result.JobInfo.Tasks; len(jobTasks) > 0 && len(jobTasks[0].CenterID) == 1 && len(jobTasks[0].CenterName) == 1 {
				task.AiCenter = jobTasks[0].CenterID[0] + "+" + jobTasks[0].CenterName[0]
			}
			oldStatus := task.Status
			task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status)
			if task.Status != result.JobInfo.Status || result.JobInfo.Status == models.GrampusStatusRunning {
				task.Duration = result.JobInfo.RunSec
				task.TrainJobDuration = models.ConvertDurationToStr(task.Duration)
				if task.StartTime == 0 && result.JobInfo.StartedAt > 0 {
					task.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt)
				}
				if task.EndTime == 0 && models.IsTrainJobTerminal(task.Status) && task.StartTime > 0 {
					task.EndTime = task.StartTime.Add(task.Duration)
				}
				task.CorrectCreateUnix()
				if oldStatus != task.Status {
					notification.NotifyChangeCloudbrainStatus(task, oldStatus)
				}
				if err := models.UpdateJob(task); err != nil {
					log.Error("UpdateJob failed: %v", err)
				}
			}
		}
	}

	if len(task.Parameters) > 0 {
		var parameters models.Parameters
		if err := json.Unmarshal([]byte(task.Parameters), &parameters); err != nil {
			log.Error("Failed to Unmarshal Parameters: %s (%v)", task.Parameters, err)
			ctx.ServerError("system error", err)
			return
		}
		// Same text as concatenating "label = value; " and trimming the trailing "; ".
		pairs := make([]string, 0, len(parameters.Parameter))
		for _, p := range parameters.Parameter {
			pairs = append(pairs, p.Label+" = "+p.Value)
		}
		task.Parameters = strings.Join(pairs, "; ")
	}

	taskList := make([]*models.Cloudbrain, 0)
	taskList = append(taskList, task)
	ctx.Data["version_list_task"] = taskList
	ctx.Data["datasetDownload"] = GetCloudBrainDataSetInfo(task.Uuid, false)
	ctx.Data["canDownload"] = cloudbrain.CanModifyJob(ctx, task)
	ctx.Data["displayJobName"] = task.DisplayJobName
	aiCenterInfo := strings.Split(task.AiCenter, "+")
	if len(aiCenterInfo) == 2 {
		ctx.Data["ai_center"] = aiCenterInfo[1]
	}
	ctx.HTML(http.StatusOK, tplGrampusTrainJobShow)
}
  636. func GrampusGetLog(ctx *context.Context) {
  637. jobID := ctx.Params(":jobid")
  638. job, err := models.GetCloudbrainByJobID(jobID)
  639. if err != nil {
  640. log.Error("GetCloudbrainByJobID failed: %v", err, ctx.Data["MsgID"])
  641. ctx.ServerError(err.Error(), err)
  642. return
  643. }
  644. content, err := grampus.GetTrainJobLog(job.JobID)
  645. if err != nil {
  646. log.Error("GetTrainJobLog failed: %v", err, ctx.Data["MsgID"])
  647. ctx.ServerError(err.Error(), err)
  648. return
  649. }
  650. ctx.JSON(http.StatusOK, map[string]interface{}{
  651. "JobName": job.JobName,
  652. "Content": content,
  653. })
  654. return
  655. }
  656. func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bootFile, paramSrc, outputRemotePath, datasetName string) (string, error) {
  657. var command string
  658. workDir := grampus.NpuWorkDir
  659. if processorType == grampus.ProcessorTypeGPU {
  660. workDir = grampus.GpuWorkDir
  661. }
  662. command += "pwd;cd " + workDir + grampus.CommandPrepareScript
  663. //download code & dataset
  664. if processorType == grampus.ProcessorTypeNPU {
  665. commandDownload := "./downloader_for_obs " + setting.Bucket + " " + codeRemotePath + " " + grampus.CodeArchiveName + " " + dataRemotePath + " '" + datasetName + "';"
  666. command += commandDownload
  667. } else if processorType == grampus.ProcessorTypeGPU {
  668. commandDownload := "./downloader_for_minio " + setting.Grampus.Env + " " + codeRemotePath + " " + grampus.CodeArchiveName + " " + dataRemotePath + " '" + datasetName + "';"
  669. command += commandDownload
  670. }
  671. //check download result
  672. commandCheckRes := "bash -c \"[[ $? -eq 0 ]] && exit 0 || exit -1;\";"
  673. command += commandCheckRes
  674. //unzip code & dataset
  675. toolUnzip := "unzip -q '"
  676. if strings.HasSuffix(datasetName, ".tar.gz") {
  677. toolUnzip = "tar -zxvf '"
  678. }
  679. commandUnzip := "cd " + workDir + "code;unzip -q master.zip;echo \"start to unzip dataset\";cd " + workDir + "dataset;" + toolUnzip + datasetName + "';"
  680. command += commandUnzip
  681. //check unzip result
  682. commandCheckRes = "bash -c \"[[ $? -eq 0 ]] && exit 0 || exit -1;\";"
  683. command += commandCheckRes
  684. command += "echo \"unzip finished;start to exec code;\";"
  685. //exec code
  686. var parameters models.Parameters
  687. var paramCode string
  688. param := make([]models.Parameter, 0)
  689. if len(paramSrc) != 0 {
  690. err := json.Unmarshal([]byte(paramSrc), &parameters)
  691. if err != nil {
  692. log.Error("Failed to Unmarshal params: %s (%v)", paramSrc, err)
  693. return command, err
  694. }
  695. for _, parameter := range parameters.Parameter {
  696. param = append(param, models.Parameter{
  697. Label: parameter.Label,
  698. Value: parameter.Value,
  699. })
  700. paramCode += " --" + parameter.Label + "=" + parameter.Value
  701. }
  702. }
  703. var commandCode string
  704. if processorType == grampus.ProcessorTypeNPU {
  705. commandCode = "/bin/bash /home/work/run_train_for_openi.sh " + workDir + "code/" + strings.ToLower(repoName) + "/" + bootFile + " /tmp/log/train.log" + paramCode + ";"
  706. } else if processorType == grampus.ProcessorTypeGPU {
  707. commandCode = "cd " + workDir + "code/" + strings.ToLower(repoName) + ";python " + bootFile + paramCode + ";"
  708. }
  709. command += commandCode
  710. //get exec result
  711. commandGetRes := "result=$?;"
  712. command += commandGetRes
  713. //upload models
  714. if processorType == grampus.ProcessorTypeNPU {
  715. commandUpload := "cd " + workDir + "script_for_grampus/;./uploader_for_obs " + setting.Bucket + " " + outputRemotePath + " " + workDir + "output/;"
  716. command += commandUpload
  717. } else if processorType == grampus.ProcessorTypeGPU {
  718. commandUpload := "cd " + workDir + "script_for_grampus/;./uploader_for_minio " + setting.Grampus.Env + " " + outputRemotePath + " " + workDir + "output/;"
  719. command += commandUpload
  720. }
  721. //check exec result
  722. commandCheckRes = "bash -c \"[[ $result -eq 0 ]] && exit 0 || exit -1\""
  723. command += commandCheckRes
  724. return command, nil
  725. }
  726. func downloadZipCode(ctx *context.Context, codePath, branchName string) error {
  727. archiveType := git.ZIP
  728. archivePath := codePath
  729. if !com.IsDir(archivePath) {
  730. if err := os.MkdirAll(archivePath, os.ModePerm); err != nil {
  731. log.Error("MkdirAll failed:" + err.Error())
  732. return err
  733. }
  734. }
  735. // Get corresponding commit.
  736. var (
  737. commit *git.Commit
  738. err error
  739. )
  740. gitRepo := ctx.Repo.GitRepo
  741. if err != nil {
  742. log.Error("OpenRepository failed:" + err.Error())
  743. return err
  744. }
  745. if gitRepo.IsBranchExist(branchName) {
  746. commit, err = gitRepo.GetBranchCommit(branchName)
  747. if err != nil {
  748. log.Error("GetBranchCommit failed:" + err.Error())
  749. return err
  750. }
  751. } else {
  752. log.Error("the branch is not exist: " + branchName)
  753. return fmt.Errorf("The branch does not exist.")
  754. }
  755. archivePath = path.Join(archivePath, grampus.CodeArchiveName)
  756. if !com.IsFile(archivePath) {
  757. if err := commit.CreateArchive(archivePath, git.CreateArchiveOpts{
  758. Format: archiveType,
  759. Prefix: setting.Repository.PrefixArchiveFiles,
  760. }); err != nil {
  761. log.Error("CreateArchive failed:" + err.Error())
  762. return err
  763. }
  764. }
  765. return nil
  766. }