You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-'), and can be up to 35 characters long.

aisafety.go 35 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago

  1. package repo
  2. import (
  3. "bufio"
  4. "code.gitea.io/gitea/services/lock"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "os"
  12. "strconv"
  13. "strings"
  14. cloudbrainService "code.gitea.io/gitea/services/cloudbrain"
  15. "code.gitea.io/gitea/models"
  16. "code.gitea.io/gitea/modules/aisafety"
  17. "code.gitea.io/gitea/modules/cloudbrain"
  18. "code.gitea.io/gitea/modules/context"
  19. "code.gitea.io/gitea/modules/git"
  20. "code.gitea.io/gitea/modules/log"
  21. "code.gitea.io/gitea/modules/modelarts"
  22. "code.gitea.io/gitea/modules/setting"
  23. "code.gitea.io/gitea/modules/storage"
  24. "code.gitea.io/gitea/modules/timeutil"
  25. "code.gitea.io/gitea/modules/util"
  26. "code.gitea.io/gitea/services/cloudbrain/resource"
  27. "code.gitea.io/gitea/services/reward/point/account"
  28. )
  // Template paths used by the model safety test (AI safety evaluation) pages.
  const (
  	// Task detail page, rendered by GetAiSafetyTaskTmpl.
  	tplModelSafetyTestShow = "repo/modelsafety/show"

  	// Creation pages for CloudBrain one (GPU) and CloudBrain two (NPU).
  	tplModelSafetyTestCreateGpu = "repo/modelsafety/newgpu"
  	tplModelSafetyTestCreateNpu = "repo/modelsafety/newnpu"

  	// Creation pages presumably used for the Grampus cluster (not referenced
  	// in this part of the file).
  	tplModelSafetyTestCreateGrampusGpu = "repo/modelsafety/newgrampusgpu"
  	tplModelSafetyTestCreateGrampusNpu = "repo/modelsafety/newgrampusnpu"
  )
  36. func GetAiSafetyTaskByJob(job *models.Cloudbrain) {
  37. if job == nil {
  38. log.Error("GetCloudbrainByJobID failed")
  39. return
  40. }
  41. syncAiSafetyTaskStatus(job)
  42. }
  43. func GetAiSafetyTaskTmpl(ctx *context.Context) {
  44. ctx.Data["id"] = ctx.Params(":id")
  45. ctx.Data["PageIsCloudBrain"] = true
  46. ctx.HTML(200, tplModelSafetyTestShow)
  47. }
  // GetAiSafetyTask replies with the model safety task identified by the ":id"
  // route parameter as JSON, after syncing its status via
  // syncAiSafetyTaskStatus. Display-only fields are filled in before replying:
  // the benchmark type labels, modify/delete permissions, the resource spec and
  // the owner's name.
  //
  // If the task cannot be loaded, it replies 404 with
  // {"result_code": "-1", "msg": "No such task."}.
  func GetAiSafetyTask(ctx *context.Context) {
  	var ID = ctx.Params(":id")
  	job, err := models.GetCloudbrainByIDWithDeleted(ID)
  	if err != nil {
  		log.Error("GetCloudbrainByJobID failed:" + err.Error())
  		ctx.JSON(http.StatusNotFound, map[string]interface{}{
  			"result_code": "-1",
  			"msg":         "No such task.",
  		})
  		return
  	}
  	syncAiSafetyTaskStatus(job)
  	// Reload so the reply matches what the sync persisted. The sync has already
  	// updated job in memory, so keep it if the reload fails rather than
  	// dereferencing a nil result.
  	if reloaded, reloadErr := models.GetCloudbrainByIDWithDeleted(ID); reloadErr == nil {
  		job = reloaded
  	} else {
  		log.Error("reload task %s after sync failed: %v", ID, reloadErr)
  	}
  	job.BenchmarkType = "安全评测"
  	job.BenchmarkTypeName = "Image Classification"
  	job.CanModify = cloudbrain.CanModifyJob(ctx, job)
  	job.CanDel = cloudbrain.CanDeleteJob(ctx, job)
  	// An empty parameter list is reported as no parameters at all.
  	if job.Parameters == "{\"parameter\":[]}" {
  		job.Parameters = ""
  	}
  	if s, specErr := resource.GetCloudbrainSpec(job.ID); specErr == nil {
  		job.Spec = s
  	}
  	// Expose only the owner's name, not the whole user record.
  	if user, userErr := models.GetUserByID(job.UserID); userErr == nil {
  		job.User = &models.User{Name: user.Name}
  	}
  	ctx.JSON(200, job)
  }
  // StopAiSafetyTask stops the model safety task identified by the ":id" route
  // parameter. It replies with JSON {"result_code", "msg"}: "0"/"succeed" on
  // success, or "-1" with a message or i18n key otherwise.
  //
  // Handling depends on the task's current state:
  //   - Inference still pending on CloudBrain two: ModelArts is asked to stop
  //     the job (best effort; a failure is only logged), then the task is
  //     marked stopped.
  //   - Inference still pending on CloudBrain one: the cloudbrain job is
  //     stopped (a failure is reported to the caller), then the task is marked
  //     stopped.
  //   - ModelSafetyTesting: the task is only marked stopped locally. Nothing
  //     is sent to the evaluation server.
  //   - Any other status is treated as already finished and left untouched.
  func StopAiSafetyTask(ctx *context.Context) {
  	log.Info("start to stop the task.")
  	var ID = ctx.Params(":id")
  	task, err := models.GetCloudbrainByIDWithDeleted(ID)
  	result := make(map[string]interface{})
  	result["result_code"] = "-1"
  	if err != nil {
  		log.Info("query task error.err=" + err.Error())
  		log.Error("GetCloudbrainByJobID failed:" + err.Error())
  		result["msg"] = "No such task."
  		ctx.JSON(200, result)
  		return
  	}
  	// reply writes the error response; callers return right after.
  	reply := func(msg string) {
  		result["msg"] = msg
  		ctx.JSON(200, result)
  	}
  	// stopLocally persists the stopped state. It returns false if the update
  	// failed, in which case the error reply has already been written.
  	stopLocally := func() bool {
  		if err := markAiSafetyTaskStopped(task); err != nil {
  			log.Error("UpdateJob(%s) failed:%v", task.JobName, err, ctx.Data["MsgID"])
  			reply("cloudbrain.Stopped_success_update_status_fail")
  			return false
  		}
  		return true
  	}

  	if isTaskNotFinished(task.Status) {
  		switch task.Type {
  		case models.TypeCloudBrainTwo:
  			log.Info("start to stop model arts task.")
  			if _, err := modelarts.StopTrainJob(task.JobID, strconv.FormatInt(task.VersionID, 10)); err != nil {
  				log.Info("stop failed.err=" + err.Error())
  			}
  			if !stopLocally() {
  				return
  			}
  		case models.TypeCloudBrainOne:
  			if task.Status == string(models.JobStopped) || task.Status == string(models.JobFailed) || task.Status == string(models.JobSucceeded) {
  				log.Error("the job(%s) has been stopped", task.JobName, ctx.Data["MsgID"])
  				reply("cloudbrain.Already_stopped")
  				return
  			}
  			if err := cloudbrain.StopJob(task.JobID); err != nil {
  				log.Error("StopJob(%s) failed:%v", task.JobName, err, ctx.Data["MsgID"])
  				reply("cloudbrain.Stopped_failed")
  				return
  			}
  			if !stopLocally() {
  				return
  			}
  		}
  	} else if task.Status == string(models.ModelSafetyTesting) {
  		// The evaluation phase runs on the external server; only the local
  		// record is marked stopped.
  		if !stopLocally() {
  			return
  		}
  	} else {
  		log.Info("The job is finished. status=" + task.Status)
  	}
  	result["result_code"] = "0"
  	result["msg"] = "succeed"
  	ctx.JSON(200, result)
  }

  // markAiSafetyTaskStopped sets task to models.JobStopped. It stamps EndTime
  // with the current time if it is unset, recomputes the duration and persists
  // the task.
  func markAiSafetyTaskStopped(task *models.Cloudbrain) error {
  	task.Status = string(models.JobStopped)
  	if task.EndTime == 0 {
  		task.EndTime = timeutil.TimeStampNow()
  	}
  	task.ComputeAndSetDuration()
  	return models.UpdateJob(task)
  }
  // DelAiSafetyTask deletes the model safety task identified by the ":id"
  // route parameter, then redirects to the repository's benchmark list.
  //
  // Only tasks whose status is models.JobStopped, JobFailed or JobSucceeded
  // may be deleted; anything else renders a server error. For CloudBrain one
  // tasks the job storage is removed as well.
  func DelAiSafetyTask(ctx *context.Context) {
  	var ID = ctx.Params(":id")
  	task, err := models.GetCloudbrainByIDWithDeleted(ID)
  	if err != nil {
  		log.Error("GetCloudbrainByJobID failed:" + err.Error())
  		ctx.ServerError("No such task.", err)
  		return
  	}
  	if task.Status != string(models.JobStopped) && task.Status != string(models.JobFailed) && task.Status != string(models.JobSucceeded) {
  		msg := "the job(" + task.JobName + ") has not been stopped"
  		log.Error("the job(%s) has not been stopped", task.JobName, ctx.Data["MsgID"])
  		// Pass a real error so the error page and log carry the reason.
  		ctx.ServerError(msg, errors.New(msg))
  		return
  	}
  	if task.Type == models.TypeCloudBrainOne {
  		DeleteCloudbrainJobStorage(task.JobName, models.TypeCloudBrainOne)
  	}
  	if err = models.DeleteJob(task); err != nil {
  		ctx.ServerError(err.Error(), err)
  		return
  	}
  	ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/cloudbrain/benchmark")
  }
  183. func syncAiSafetyTaskStatus(job *models.Cloudbrain) {
  184. log.Info("start to query safety task status.")
  185. if isTaskNotFinished(job.Status) {
  186. if job.Type == models.TypeCloudBrainTwo {
  187. queryTaskStatusFromCloudbrainTwo(job)
  188. } else if job.Type == models.TypeCloudBrainOne {
  189. queryTaskStatusFromCloudbrain(job)
  190. }
  191. } else {
  192. if job.Status == string(models.ModelSafetyTesting) {
  193. queryTaskStatusFromModelSafetyTestServer(job)
  194. } else {
  195. log.Info("The job is finished. status=" + job.Status)
  196. }
  197. }
  198. }
  199. func TimerHandleModelSafetyTestTask() {
  200. log.Info("start to TimerHandleModelSafetyTestTask")
  201. tasks, err := models.GetModelSafetyTestTask()
  202. if err == nil {
  203. if tasks != nil && len(tasks) > 0 {
  204. for _, job := range tasks {
  205. syncAiSafetyTaskStatus(job)
  206. }
  207. } else {
  208. log.Info("query running model safety test task 0.")
  209. }
  210. } else {
  211. log.Info("query running model safety test task err." + err.Error())
  212. }
  213. }
  // queryTaskStatusFromCloudbrainTwo refreshes an NPU (CloudBrain two) task
  // from ModelArts and persists it.
  //
  // When ModelArts reports the inference job as completed, the task is moved to
  // ModelSafetyTesting and its inference result is handed to
  // sendNPUInferenceResultToTest for evaluation.
  func queryTaskStatusFromCloudbrainTwo(job *models.Cloudbrain) {
  	log.Info("The task not finished,name=" + job.DisplayJobName)
  	result, err := modelarts.GetTrainJob(job.JobID, strconv.FormatInt(job.VersionID, 10))
  	if err != nil {
  		log.Info("query train job error." + err.Error())
  		return
  	}
  	job.Status = modelarts.TransTrainJobStatus(result.IntStatus)
  	// ModelArts reports times in milliseconds; Duration is kept in seconds.
  	job.Duration = result.Duration / 1000
  	job.TrainJobDuration = models.ConvertDurationToStr(job.Duration)
  	if job.StartTime == 0 && result.StartTime > 0 {
  		job.StartTime = timeutil.TimeStamp(result.StartTime / 1000)
  	}
  	if job.EndTime == 0 && models.IsTrainJobTerminal(job.Status) && job.StartTime > 0 {
  		job.EndTime = job.StartTime.Add(job.Duration)
  	}
  	job.CorrectCreateUnix()

  	if job.Status != string(models.ModelArtsTrainJobCompleted) {
  		log.Info("CloudbrainTwo task status=" + job.Status)
  		if err := models.UpdateJob(job); err != nil {
  			log.Error("UpdateJob failed:%v", err)
  		}
  		return
  	}

  	log.Info("start to deal ModelSafetyTesting, task status=" + job.Status)
  	job.Status = string(models.ModelSafetyTesting)
  	if err := models.UpdateJob(job); err != nil {
  		log.Error("UpdateJob failed:%v", err)
  	}
  	// Submit the inference output to the evaluation server ("beihang").
  	sendNPUInferenceResultToTest(job)
  }
  249. func queryTaskStatusFromCloudbrain(job *models.Cloudbrain) {
  250. log.Info("The task not finished,name=" + job.DisplayJobName)
  251. jobResult, err := cloudbrain.GetJob(job.JobID)
  252. result, err := models.ConvertToJobResultPayload(jobResult.Payload)
  253. if err != nil {
  254. log.Error("ConvertToJobResultPayload failed:", err)
  255. return
  256. }
  257. job.Status = result.JobStatus.State
  258. if result.JobStatus.State != string(models.JobWaiting) && result.JobStatus.State != string(models.JobFailed) {
  259. taskRoles := result.TaskRoles
  260. taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{}))
  261. job.Status = taskRes.TaskStatuses[0].State
  262. }
  263. models.ParseAndSetDurationFromCloudBrainOne(result, job)
  264. //updateCloudBrainOneJobTime(job)
  265. log.Info("cloud brain one job status=" + job.Status)
  266. if result.JobStatus.State != string(models.JobSucceeded) {
  267. err = models.UpdateJob(job)
  268. if err != nil {
  269. log.Error("UpdateJob failed:", err)
  270. }
  271. } else {
  272. //
  273. job.Status = string(models.ModelSafetyTesting)
  274. job.EndTime = 0
  275. err = models.UpdateJob(job)
  276. if err != nil {
  277. log.Error("UpdateJob failed:", err)
  278. }
  279. //send msg to beihang
  280. sendGPUInferenceResultToTest(job)
  281. }
  282. }
  283. func queryTaskStatusFromModelSafetyTestServer(job *models.Cloudbrain) {
  284. result, err := aisafety.GetTaskStatus(job.PreVersionName)
  285. if err == nil {
  286. if result.Code == "0" {
  287. if result.Data.Status == 1 {
  288. log.Info("The task is running....")
  289. } else {
  290. job.EndTime = timeutil.TimeStampNow()
  291. job.Duration = (job.EndTime.AsTime().Unix() - job.StartTime.AsTime().Unix()) / 1000
  292. job.TrainJobDuration = models.ConvertDurationToStr(job.Duration)
  293. if result.Data.Code == 0 {
  294. job.ResultJson = result.Data.StandardJson
  295. job.Status = string(models.JobSucceeded)
  296. err = models.UpdateJob(job)
  297. if err != nil {
  298. log.Error("UpdateJob failed:", err)
  299. }
  300. } else {
  301. job.ResultJson = result.Data.Msg
  302. job.Status = string(models.JobFailed)
  303. err = models.UpdateJob(job)
  304. if err != nil {
  305. log.Error("UpdateJob failed:", err)
  306. }
  307. }
  308. }
  309. } else {
  310. log.Info("The task is failed.")
  311. job.Status = string(models.JobFailed)
  312. err = models.UpdateJob(job)
  313. if err != nil {
  314. log.Error("UpdateJob failed:", err)
  315. }
  316. }
  317. } else {
  318. log.Info("The task not found.....")
  319. }
  320. }
  // getAisafetyTaskReq builds the evaluation-server request for job.
  //
  // job.DatasetName is split on ";". The first entry is used as the base
  // dataset and the second as the combat dataset. NOTE(review): this assumes
  // the names keep the "base;combat" order that createForNPU uses for the
  // dataset uuids; confirm. Each name is cut at its first "." and a missing
  // entry becomes "" instead of panicking. job.LabelName holds the
  // ";"-separated indicators, and job.Description, when non-empty, replaces
  // the placeholder EvalContent "test1".
  func getAisafetyTaskReq(job *models.Cloudbrain) aisafety.TaskReq {
  	datasetNames := strings.Split(job.DatasetName, ";")
  	// stem returns the i-th dataset name up to its first ".", or "" if absent.
  	stem := func(i int) string {
  		if i >= len(datasetNames) {
  			return ""
  		}
  		return strings.Split(datasetNames[i], ".")[0]
  	}
  	if len(datasetNames) < 2 {
  		log.Warn("model safety task %s has no combat dataset, DatasetName=%q", job.JobName, job.DatasetName)
  	}

  	evalContent := "test1"
  	if job.Description != "" {
  		evalContent = job.Description
  	}
  	req := aisafety.TaskReq{
  		UnionId:     job.JobID,
  		EvalName:    job.DisplayJobName,
  		EvalContent: evalContent,
  		TLPath:      "test1",
  		Indicators:  strings.Split(job.LabelName, ";"),
  		CDName:      stem(1),
  		BDName:      stem(0) + "基础数据集",
  	}
  	log.Info("CDName=" + req.CDName)
  	log.Info("BDName=" + req.BDName)
  	return req
  }
  342. func sendGPUInferenceResultToTest(job *models.Cloudbrain) {
  343. log.Info("send sendGPUInferenceResultToTest")
  344. req := getAisafetyTaskReq(job)
  345. resultDir := "/result"
  346. prefix := setting.CBCodePathPrefix + job.JobName + resultDir
  347. files, err := storage.GetOneLevelAllObjectUnderDirMinio(setting.Attachment.Minio.Bucket, prefix, "")
  348. if err != nil {
  349. log.Error("query cloudbrain one model failed: %v", err)
  350. return
  351. }
  352. jsonContent := ""
  353. for _, file := range files {
  354. if strings.HasSuffix(file.FileName, "result.json") {
  355. path := storage.GetMinioPath(job.JobName+resultDir+"/", file.FileName)
  356. log.Info("path=" + path)
  357. reader, err := os.Open(path)
  358. defer reader.Close()
  359. if err == nil {
  360. r := bufio.NewReader(reader)
  361. for {
  362. line, error := r.ReadString('\n')
  363. jsonContent += line
  364. if error == io.EOF {
  365. log.Info("read file completed.")
  366. break
  367. }
  368. if error != nil {
  369. log.Info("read file error." + error.Error())
  370. break
  371. }
  372. }
  373. }
  374. break
  375. }
  376. }
  377. if jsonContent != "" {
  378. sendHttpReqToBeihang(job, jsonContent, req)
  379. } else {
  380. updateJobFailed(job, "推理生成的Json数据为空,无法进行评测。")
  381. }
  382. }
  383. func sendNPUInferenceResultToTest(job *models.Cloudbrain) {
  384. log.Info("start to sendNPUInferenceResultToTest")
  385. req := getAisafetyTaskReq(job)
  386. jsonContent := ""
  387. VersionOutputPath := modelarts.GetOutputPathByCount(modelarts.TotalVersionCount)
  388. resultPath := modelarts.JobPath + job.JobName + modelarts.ResultPath + VersionOutputPath + "/result.json"
  389. resultPath = resultPath[1:]
  390. log.Info("bucket=" + setting.Bucket + " resultPath=" + resultPath)
  391. body, err := storage.ObsDownloadAFile(setting.Bucket, resultPath)
  392. if err != nil {
  393. log.Info("ObsDownloadAFile error." + err.Error() + " resultPath=" + resultPath)
  394. } else {
  395. defer body.Close()
  396. var data []byte
  397. p := make([]byte, 4096)
  398. var readErr error
  399. var readCount int
  400. for {
  401. readCount, readErr = body.Read(p)
  402. if readCount > 0 {
  403. data = append(data, p[:readCount]...)
  404. }
  405. if readErr != nil || readCount == 0 {
  406. break
  407. }
  408. }
  409. jsonContent = string(data)
  410. }
  411. if jsonContent != "" {
  412. sendHttpReqToBeihang(job, jsonContent, req)
  413. } else {
  414. updateJobFailed(job, "推理生成的Json数据为空,无法进行评测。")
  415. }
  416. }
  417. func updateJobFailed(job *models.Cloudbrain, msg string) {
  418. log.Info("The json is null. so set it failed.")
  419. //update task failed.
  420. job.Status = string(models.ModelArtsTrainJobFailed)
  421. job.ResultJson = msg
  422. job.EndTime = timeutil.TimeStampNow()
  423. job.Duration = (job.EndTime.AsTime().Unix() - job.StartTime.AsTime().Unix()) / 1000
  424. job.TrainJobDuration = models.ConvertDurationToStr(job.Duration)
  425. err := models.UpdateJob(job)
  426. if err != nil {
  427. log.Error("UpdateJob failed:", err)
  428. }
  429. }
  430. func sendHttpReqToBeihang(job *models.Cloudbrain, jsonContent string, req aisafety.TaskReq) {
  431. log.Info("start to send beihang ...")
  432. serialNo, err := aisafety.CreateSafetyTask(req, jsonContent)
  433. if err == nil {
  434. //update serial no to db
  435. job.PreVersionName = serialNo
  436. err = models.UpdateJob(job)
  437. if err != nil {
  438. log.Error("UpdateJob failed:", err)
  439. }
  440. }
  441. }
  442. func isTaskNotFinished(status string) bool {
  443. if status == string(models.ModelArtsTrainJobRunning) || status == string(models.ModelArtsTrainJobWaiting) {
  444. return true
  445. }
  446. if status == string(models.JobWaiting) || status == string(models.JobRunning) {
  447. return true
  448. }
  449. if status == string(models.ModelArtsTrainJobUnknown) || status == string(models.ModelArtsTrainJobInit) {
  450. return true
  451. }
  452. if status == string(models.ModelArtsTrainJobImageCreating) || status == string(models.ModelArtsTrainJobSubmitTrying) {
  453. return true
  454. }
  455. return false
  456. }
  // AiSafetyCreateForGetGPU renders the GPU (CloudBrain one) creation form for a
  // model safety task. The form is pre-filled with the configured base and
  // combat datasets, a generated display job name, the available specs, the
  // GPU queue details (when available) and the user's count of unfinished
  // model safety tasks.
  func AiSafetyCreateForGetGPU(ctx *context.Context) {
  	ctx.Data["PageIsCloudBrain"] = true
  	ctx.Data["IsCreate"] = true
  	ctx.Data["type"] = models.TypeCloudBrainOne
  	ctx.Data["compute_resource"] = models.GPUResource
  	ctx.Data["datasetType"] = models.TypeCloudBrainOne
  	ctx.Data["BaseDataSetName"] = setting.ModelSafetyTest.GPUBaseDataSetName
  	ctx.Data["BaseDataSetUUID"] = setting.ModelSafetyTest.GPUBaseDataSetUUID
  	ctx.Data["CombatDataSetName"] = setting.ModelSafetyTest.GPUCombatDataSetName
  	ctx.Data["CombatDataSetUUID"] = setting.ModelSafetyTest.GPUCombatDataSetUUID
  	log.Info("GPUBaseDataSetName=" + setting.ModelSafetyTest.GPUBaseDataSetName)
  	log.Info("GPUBaseDataSetUUID=" + setting.ModelSafetyTest.GPUBaseDataSetUUID)
  	log.Info("GPUCombatDataSetName=" + setting.ModelSafetyTest.GPUCombatDataSetName)
  	log.Info("GPUCombatDataSetUUID=" + setting.ModelSafetyTest.GPUCombatDataSetUUID)
  	var displayJobName = cloudbrainService.GetDisplayJobName(ctx.User.Name)
  	ctx.Data["display_job_name"] = displayJobName
  	prepareCloudbrainOneSpecs(ctx)
  	// Queue details are informational only; the form still renders without them.
  	queuesDetail, _ := cloudbrain.GetQueuesDetail()
  	if queuesDetail != nil {
  		ctx.Data["QueuesDetail"] = queuesDetail
  		reqPara, _ := json.Marshal(queuesDetail)
  		log.Warn("The GPU WaitCount json:%s", string(reqPara))
  	} else {
  		log.Info("The GPU WaitCount not get")
  	}
  	NotStopTaskCount, _ := models.GetModelSafetyCountByUserID(ctx.User.ID)
  	ctx.Data["NotStopTaskCount"] = NotStopTaskCount
  	ctx.HTML(200, tplModelSafetyTestCreateGpu)
  }
  // AiSafetyCreateForGetNPU renders the NPU (CloudBrain two) creation form for a
  // model safety task. The form is pre-filled with the configured base and
  // combat datasets, a generated display job name, the resource pools, engines
  // and engine versions from settings, the inference specs, the NPU waiting
  // count and the user's count of unfinished model safety tasks.
  //
  // If any of the JSON settings cannot be parsed, a server error is rendered
  // and the handler stops.
  func AiSafetyCreateForGetNPU(ctx *context.Context) {
  	ctx.Data["PageIsCloudBrain"] = true
  	ctx.Data["IsCreate"] = true
  	ctx.Data["type"] = models.TypeCloudBrainTwo
  	ctx.Data["compute_resource"] = models.NPUResource
  	var displayJobName = cloudbrainService.GetDisplayJobName(ctx.User.Name)
  	ctx.Data["display_job_name"] = displayJobName
  	ctx.Data["datasetType"] = models.TypeCloudBrainTwo
  	ctx.Data["BaseDataSetName"] = setting.ModelSafetyTest.NPUBaseDataSetName
  	ctx.Data["BaseDataSetUUID"] = setting.ModelSafetyTest.NPUBaseDataSetUUID
  	ctx.Data["CombatDataSetName"] = setting.ModelSafetyTest.NPUCombatDataSetName
  	ctx.Data["CombatDataSetUUID"] = setting.ModelSafetyTest.NPUCombatDataSetUUID
  	log.Info("NPUBaseDataSetName=" + setting.ModelSafetyTest.NPUBaseDataSetName)
  	log.Info("NPUBaseDataSetUUID=" + setting.ModelSafetyTest.NPUBaseDataSetUUID)
  	log.Info("NPUCombatDataSetName=" + setting.ModelSafetyTest.NPUCombatDataSetName)
  	log.Info("NPUCombatDataSetUUID=" + setting.ModelSafetyTest.NPUCombatDataSetUUID)
  	// Each ServerError writes the response, so stop right after it instead of
  	// continuing to render the page a second time.
  	var resourcePools modelarts.ResourcePool
  	if err := json.Unmarshal([]byte(setting.ResourcePools), &resourcePools); err != nil {
  		ctx.ServerError("json.Unmarshal failed:", err)
  		return
  	}
  	ctx.Data["resource_pools"] = resourcePools.Info
  	var engines modelarts.Engine
  	if err := json.Unmarshal([]byte(setting.Engines), &engines); err != nil {
  		ctx.ServerError("json.Unmarshal failed:", err)
  		return
  	}
  	ctx.Data["engines"] = engines.Info
  	var versionInfos modelarts.VersionInfo
  	if err := json.Unmarshal([]byte(setting.EngineVersions), &versionInfos); err != nil {
  		ctx.ServerError("json.Unmarshal failed:", err)
  		return
  	}
  	ctx.Data["engine_versions"] = versionInfos.Version
  	prepareCloudbrainTwoInferenceSpecs(ctx)
  	waitCount := cloudbrain.GetWaitingCloudbrainCount(models.TypeCloudBrainTwo, "")
  	ctx.Data["WaitCount"] = waitCount
  	log.Info("The NPU WaitCount is " + fmt.Sprint(waitCount))
  	NotStopTaskCount, _ := models.GetModelSafetyCountByUserID(ctx.User.ID)
  	ctx.Data["NotStopTaskCount"] = NotStopTaskCount
  	ctx.HTML(200, tplModelSafetyTestCreateNpu)
  }
  // AiSafetyCreateForPost handles the model safety task creation form.
  //
  // Under a creation lock, it checks that:
  //   - the display name is unused in this repository;
  //   - the derived job name matches jobNamePattern;
  //   - the user has no other unfinished model safety task;
  //   - the boot file exists on the default branch.
  //
  // It then creates the inference job on CloudBrain two (NPU) or CloudBrain
  // one (GPU) according to the "type" form field; any other type is rejected.
  // On failure the creation page is re-rendered with the error; on success the
  // user is redirected to the benchmark list.
  func AiSafetyCreateForPost(ctx *context.Context) {
  	ctx.Data["PageIsCloudBrain"] = true
  	displayJobName := ctx.Query("display_job_name")
  	jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
  	taskType := ctx.QueryInt("type")
  	description := ctx.Query("description")
  	ctx.Data["type"] = taskType
  	ctx.Data["displayJobName"] = displayJobName
  	ctx.Data["description"] = description
  	repo := ctx.Repo.Repository
  	tpname := tplCloudBrainModelSafetyNewNpu
  	if taskType == models.TypeCloudBrainOne {
  		tpname = tplCloudBrainModelSafetyNewGpu
  	}
  	// renderErr re-renders the creation form with msg as the form error.
  	renderErr := func(msg string) {
  		modelSafetyNewDataPrepare(ctx)
  		ctx.RenderWithErr(msg, tpname, nil)
  	}

  	lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeInference)}, User: ctx.User})
  	defer func() {
  		if lockOperator != nil {
  			lockOperator.Unlock()
  		}
  	}()
  	if errMsg != "" {
  		log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"])
  		renderErr(ctx.Tr(errMsg))
  		return
  	}

  	tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeModelSafety), displayJobName)
  	if err == nil {
  		if len(tasks) != 0 {
  			log.Error("the job name did already exist", ctx.Data["MsgID"])
  			renderErr("the job name did already exist")
  			return
  		}
  	} else if !models.IsErrJobNotExist(err) {
  		log.Error("system error, %v", err, ctx.Data["MsgID"])
  		renderErr("system error")
  		return
  	}

  	if !jobNamePattern.MatchString(jobName) {
  		renderErr(ctx.Tr("repo.cloudbrain_jobname_err"))
  		return
  	}

  	count, err := models.GetModelSafetyCountByUserID(ctx.User.ID)
  	if err != nil {
  		log.Error("GetCloudbrainCountByUserID failed:%v", err, ctx.Data["MsgID"])
  		renderErr("system error")
  		return
  	}
  	if count >= 1 {
  		log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
  		renderErr(ctx.Tr("repo.cloudbrain.morethanonejob"))
  		return
  	}

  	BootFile := strings.TrimSpace(ctx.Query("boot_file"))
  	bootFileExist, err := ctx.Repo.FileExists(BootFile, cloudbrain.DefaultBranchName)
  	if err != nil || !bootFileExist {
  		log.Error("Get bootfile error:%v", err)
  		renderErr(ctx.Tr("repo.cloudbrain_bootfile_err"))
  		return
  	}

  	switch taskType {
  	case models.TypeCloudBrainTwo:
  		err = createForNPU(ctx, jobName)
  	case models.TypeCloudBrainOne:
  		err = createForGPU(ctx, jobName)
  	default:
  		// Without this, an unknown type would redirect as if a task had been
  		// created.
  		log.Error("unsupported model safety task type: %d", taskType, ctx.Data["MsgID"])
  		err = errors.New("system error")
  	}
  	if err != nil {
  		renderErr(err.Error())
  		return
  	}
  	log.Info("to redirect...")
  	ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/cloudbrain/benchmark")
  }
  // createForNPU creates the NPU (CloudBrain two / ModelArts) inference job
  // behind a model safety task, using the submitted creation form.
  //
  // The steps are:
  //  1. Check that the chosen spec is available and that the user's point
  //     balance covers it.
  //  2. Download the default branch into a fresh local directory.
  //  3. Create the result and log directories on OBS and upload the code.
  //  4. Assemble the run parameters: result URL, checkpoint URL, optional
  //     multi-dataset URLs, user parameters, and a default device target.
  //  5. Submit the job through modelarts.GenerateInferenceJob.
  //
  // The returned error message is shown to the user on the creation page.
  func createForNPU(ctx *context.Context, jobName string) error {
  	VersionOutputPath := modelarts.GetOutputPathByCount(modelarts.TotalVersionCount)
  	BootFile := strings.TrimSpace(ctx.Query("boot_file"))
  	displayJobName := ctx.Query("display_job_name")
  	description := ctx.Query("description")
  	srcDataset := ctx.Query("src_dataset")       //uuid
  	combatDataset := ctx.Query("combat_dataset") //uuid
  	evaluationIndex := ctx.Query("evaluation_index")
  	Params := ctx.Query("run_para_list")
  	specId := ctx.QueryInt64("spec_id")
  	engineID := ctx.QueryInt("engine_id")
  	log.Info("engine_id=" + fmt.Sprint(engineID))
  	poolID := ctx.Query("pool_id")
  	repo := ctx.Repo.Repository
  	trainUrl := ctx.Query("train_url")
  	modelName := ctx.Query("model_name")
  	modelVersion := ctx.Query("model_version")
  	ckptName := ctx.Query("ckpt_name")
  	ckptUrl := "/" + trainUrl + ckptName
  	log.Info("ckpt url:" + ckptUrl)
  	FlavorName := ctx.Query("flaver_names")
  	EngineName := ctx.Query("engine_names")
  	isLatestVersion := modelarts.IsLatestVersion
  	VersionCount := modelarts.VersionCountOne
  	codeLocalPath := setting.JobPath + jobName + modelarts.CodePath
  	codeObsPath := "/" + setting.Bucket + modelarts.JobPath + jobName + modelarts.CodePath
  	resultObsPath := "/" + setting.Bucket + modelarts.JobPath + jobName + modelarts.ResultPath + VersionOutputPath + "/"
  	logObsPath := "/" + setting.Bucket + modelarts.JobPath + jobName + modelarts.LogPath + VersionOutputPath + "/"

  	spec, err := resource.GetAndCheckSpec(ctx.User.ID, specId, models.FindSpecsOptions{
  		JobType:         models.JobTypeInference,
  		ComputeResource: models.NPU,
  		Cluster:         models.OpenICluster,
  		AiCenterCode:    models.AICenterOfCloudBrainTwo})
  	if err != nil || spec == nil {
  		return errors.New("Resource specification not available")
  	}
  	if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) {
  		log.Error("point balance is not enough,userId=%d specId=%d ", ctx.User.ID, spec.ID)
  		return errors.New(ctx.Tr("points.insufficient_points_balance"))
  	}

  	// Start from a clean local checkout.
  	_, err = ioutil.ReadDir(codeLocalPath)
  	if err == nil {
  		os.RemoveAll(codeLocalPath)
  	}
  	gitRepo, err := git.OpenRepository(repo.RepoPath())
  	if err != nil {
  		// Without this check a failed open would dereference a nil repository.
  		log.Error("OpenRepository(%s) failed: %v", repo.FullName(), err)
  		return errors.New(ctx.Tr("cloudbrain.load_code_failed"))
  	}
  	commitID, err := gitRepo.GetBranchCommitID(cloudbrain.DefaultBranchName)
  	if err != nil {
  		log.Warn("GetBranchCommitID(%s) failed: %v", cloudbrain.DefaultBranchName, err)
  	}
  	if err := downloadCode(repo, codeLocalPath, cloudbrain.DefaultBranchName); err != nil {
  		log.Error("Create task failed, server timed out: %s (%v)", repo.FullName(), err)
  		return errors.New(ctx.Tr("cloudbrain.load_code_failed"))
  	}
  	if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.ResultPath + VersionOutputPath + "/"); err != nil {
  		log.Error("Failed to obsMkdir_result: %s (%v)", repo.FullName(), err)
  		return errors.New("Failed to obsMkdir_result")
  	}
  	if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.LogPath + VersionOutputPath + "/"); err != nil {
  		log.Error("Failed to obsMkdir_log: %s (%v)", repo.FullName(), err)
  		return errors.New("Failed to obsMkdir_log")
  	}
  	if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil {
  		log.Error("Failed to uploadCodeToObs: %s (%v)", repo.FullName(), err)
  		return errors.New(ctx.Tr("cloudbrain.load_code_failed"))
  	}

  	var parameters models.Parameters
  	param := make([]models.Parameter, 0)
  	param = append(param, models.Parameter{
  		Label: modelarts.ResultUrl,
  		Value: "s3:/" + resultObsPath,
  	}, models.Parameter{
  		Label: modelarts.CkptUrl,
  		Value: "s3:/" + ckptUrl,
  	})
  	// The base dataset comes first, then the combat dataset.
  	uuid := srcDataset + ";" + combatDataset
  	datasUrlList, dataUrl, datasetNames, isMultiDataset, err := getDatasUrlListByUUIDS(uuid)
  	if err != nil {
  		return err
  	}
  	dataPath := dataUrl
  	jsondatas, err := json.Marshal(datasUrlList)
  	if err != nil {
  		log.Error("Failed to Marshal: %v", err)
  		return err
  	}
  	if isMultiDataset {
  		param = append(param, models.Parameter{
  			Label: modelarts.MultiDataUrl,
  			Value: string(jsondatas),
  		})
  	}
  	existDeviceTarget := false
  	if len(Params) != 0 {
  		if err := json.Unmarshal([]byte(Params), &parameters); err != nil {
  			log.Error("Failed to Unmarshal params: %s (%v)", Params, err)
  			return errors.New("运行参数错误")
  		}
  		for _, parameter := range parameters.Parameter {
  			if parameter.Label == modelarts.DeviceTarget {
  				existDeviceTarget = true
  			}
  			// Train/data URLs are set by the platform; user values are dropped.
  			if parameter.Label != modelarts.TrainUrl && parameter.Label != modelarts.DataUrl {
  				param = append(param, models.Parameter{
  					Label: parameter.Label,
  					Value: parameter.Value,
  				})
  			}
  		}
  	}
  	if !existDeviceTarget {
  		param = append(param, models.Parameter{
  			Label: modelarts.DeviceTarget,
  			Value: modelarts.Ascend,
  		})
  	}

  	req := &modelarts.GenerateInferenceJobReq{
  		JobName:           jobName,
  		DisplayJobName:    displayJobName,
  		DataUrl:           dataPath,
  		Description:       description,
  		CodeObsPath:       codeObsPath,
  		BootFileUrl:       codeObsPath + BootFile,
  		BootFile:          BootFile,
  		TrainUrl:          trainUrl,
  		WorkServerNumber:  1,
  		EngineID:          int64(engineID),
  		LogUrl:            logObsPath,
  		PoolID:            poolID,
  		Uuid:              uuid,
  		Parameters:        param, //modelarts train parameters
  		CommitID:          commitID,
  		BranchName:        cloudbrain.DefaultBranchName,
  		Params:            Params,
  		FlavorName:        FlavorName,
  		EngineName:        EngineName,
  		LabelName:         evaluationIndex,
  		IsLatestVersion:   isLatestVersion,
  		VersionCount:      VersionCount,
  		TotalVersionCount: modelarts.TotalVersionCount,
  		ModelName:         modelName,
  		ModelVersion:      modelVersion,
  		CkptName:          ckptName,
  		ResultUrl:         resultObsPath,
  		Spec:              spec,
  		DatasetName:       datasetNames,
  		JobType:           string(models.JobTypeModelSafety),
  	}
  	if _, err = modelarts.GenerateInferenceJob(ctx, req); err != nil {
  		log.Error("GenerateInferenceJob failed:%v", err)
  		return err
  	}
  	return nil
  }
  765. func createForGPU(ctx *context.Context, jobName string) error {
  766. BootFile := ctx.Query("boot_file")
  767. BootFile = strings.TrimSpace(BootFile)
  768. displayJobName := ctx.Query("display_job_name")
  769. description := ctx.Query("description")
  770. image := strings.TrimSpace(ctx.Query("image"))
  771. srcDataset := ctx.Query("src_dataset") //uuid
  772. combatDataset := ctx.Query("combat_dataset") //uuid
  773. evaluationIndex := ctx.Query("evaluation_index")
  774. Params := ctx.Query("run_para_list")
  775. specId := ctx.QueryInt64("spec_id")
  776. TrainUrl := ctx.Query("train_url")
  777. CkptName := ctx.Query("ckpt_name")
  778. modelName := ctx.Query("model_name")
  779. modelVersion := ctx.Query("model_version")
  780. ckptUrl := setting.Attachment.Minio.RealPath + TrainUrl + CkptName
  781. log.Info("ckpt url:" + ckptUrl)
  782. spec, err := resource.GetAndCheckSpec(ctx.User.ID, specId, models.FindSpecsOptions{
  783. JobType: models.JobTypeBenchmark,
  784. ComputeResource: models.GPU,
  785. Cluster: models.OpenICluster,
  786. AiCenterCode: models.AICenterOfCloudBrainOne})
  787. if err != nil || spec == nil {
  788. return errors.New("Resource specification not available")
  789. }
  790. if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) {
  791. log.Error("point balance is not enough,userId=%d specId=%d ", ctx.User.ID, spec.ID)
  792. return errors.New(ctx.Tr("points.insufficient_points_balance"))
  793. }
  794. repo := ctx.Repo.Repository
  795. codePath := setting.JobPath + jobName + cloudbrain.CodeMountPath
  796. os.RemoveAll(codePath)
  797. gitRepo, _ := git.OpenRepository(repo.RepoPath())
  798. commitID, _ := gitRepo.GetBranchCommitID(cloudbrain.DefaultBranchName)
  799. if err := downloadCode(repo, codePath, cloudbrain.DefaultBranchName); err != nil {
  800. log.Error("downloadCode failed, %v", err, ctx.Data["MsgID"])
  801. return errors.New("system error")
  802. }
  803. err = uploadCodeToMinio(codePath+"/", jobName, cloudbrain.CodeMountPath+"/")
  804. if err != nil {
  805. log.Error("uploadCodeToMinio failed, %v", err, ctx.Data["MsgID"])
  806. return errors.New("system error")
  807. }
  808. uuid := srcDataset + ";" + combatDataset
  809. datasetInfos, datasetNames, err := models.GetDatasetInfo(uuid)
  810. log.Info("uuid=" + uuid)
  811. if err != nil {
  812. log.Error("GetDatasetInfo failed: %v", err, ctx.Data["MsgID"])
  813. return errors.New(ctx.Tr("cloudbrain.error.dataset_select"))
  814. }
  815. command, err := getGpuModelSafetyCommand(BootFile, Params, CkptName, displayJobName)
  816. if err != nil {
  817. log.Error("Get Command failed: %v", err, ctx.Data["MsgID"])
  818. return errors.New(ctx.Tr("cloudbrain.error.dataset_select"))
  819. }
  820. log.Info("Command=" + command)
  821. req := cloudbrain.GenerateCloudBrainTaskReq{
  822. Ctx: ctx,
  823. DisplayJobName: displayJobName,
  824. JobName: jobName,
  825. Image: image,
  826. Command: command,
  827. Uuids: uuid,
  828. DatasetNames: datasetNames,
  829. DatasetInfos: datasetInfos,
  830. CodePath: storage.GetMinioPath(jobName, cloudbrain.CodeMountPath+"/"),
  831. ModelPath: setting.Attachment.Minio.RealPath + TrainUrl,
  832. BenchmarkPath: storage.GetMinioPath(jobName, cloudbrain.BenchMarkMountPath+"/"),
  833. Snn4ImageNetPath: storage.GetMinioPath(jobName, cloudbrain.Snn4imagenetMountPath+"/"),
  834. BrainScorePath: storage.GetMinioPath(jobName, cloudbrain.BrainScoreMountPath+"/"),
  835. JobType: string(models.JobTypeModelSafety),
  836. Description: description,
  837. BranchName: cloudbrain.DefaultBranchName,
  838. BootFile: BootFile,
  839. Params: Params,
  840. CommitID: commitID,
  841. ModelName: modelName,
  842. ModelVersion: modelVersion,
  843. CkptName: CkptName,
  844. ResultPath: storage.GetMinioPath(jobName, cloudbrain.ResultPath+"/"),
  845. Spec: spec,
  846. LabelName: evaluationIndex,
  847. }
  848. _, err = cloudbrain.GenerateTask(req)
  849. if err != nil {
  850. return err
  851. }
  852. return nil
  853. }
  854. func getGpuModelSafetyCommand(BootFile string, params string, CkptName string, DisplayJobName string) (string, error) {
  855. var command string
  856. bootFile := strings.TrimSpace(BootFile)
  857. if !strings.HasSuffix(bootFile, ".py") {
  858. log.Error("bootFile(%s) format error", bootFile)
  859. return command, errors.New("bootFile format error")
  860. }
  861. var parameters models.Parameters
  862. var param string
  863. if len(params) != 0 {
  864. err := json.Unmarshal([]byte(params), &parameters)
  865. if err != nil {
  866. log.Error("Failed to Unmarshal params: %s (%v)", params, err)
  867. return command, err
  868. }
  869. for _, parameter := range parameters.Parameter {
  870. param += " --" + parameter.Label + "=" + parameter.Value
  871. }
  872. }
  873. param += " --ckpt_url=" + cloudbrain.ModelMountPath + "/" + CkptName
  874. command += "python /code/" + bootFile + param + " > " + cloudbrain.ResultPath + "/" + DisplayJobName + "-" + cloudbrain.LogFile
  875. return command, nil
  876. }
  877. func modelSafetyNewDataPrepare(ctx *context.Context) error {
  878. ctx.Data["PageIsCloudBrain"] = true
  879. ctx.Data["type"] = ctx.QueryInt("type")
  880. ctx.Data["boot_file"] = ctx.Query("boot_file")
  881. ctx.Data["display_job_name"] = ctx.Query("display_job_name")
  882. ctx.Data["description"] = ctx.Query("description")
  883. ctx.Data["image"] = strings.TrimSpace(ctx.Query("image"))
  884. ctx.Data["src_dataset"] = ctx.Query("src_dataset") //uuid
  885. ctx.Data["combat_dataset"] = ctx.Query("combat_dataset") //uuid
  886. ctx.Data["evaluationIndex"] = ctx.Query("evaluation_index")
  887. ctx.Data["run_para_list"] = ctx.Query("run_para_list")
  888. ctx.Data["spec_id"] = ctx.QueryInt64("spec_id")
  889. ctx.Data["train_url"] = ctx.Query("train_url")
  890. ctx.Data["ckpt_name"] = ctx.Query("ckpt_name")
  891. ctx.Data["train_url"] = ctx.Query("train_url")
  892. ctx.Data["ckpt_name"] = ctx.Query("ckpt_name")
  893. ctx.Data["model_name"] = ctx.Query("model_name")
  894. ctx.Data["model_version"] = ctx.Query("model_version")
  895. NotStopTaskCount, _ := models.GetModelSafetyCountByUserID(ctx.User.ID)
  896. ctx.Data["NotStopTaskCount"] = NotStopTaskCount
  897. if ctx.QueryInt("type") == models.TypeCloudBrainOne {
  898. ctx.Data["type"] = models.TypeCloudBrainOne
  899. ctx.Data["compute_resource"] = models.GPUResource
  900. ctx.Data["datasetType"] = models.TypeCloudBrainOne
  901. ctx.Data["BaseDataSetName"] = setting.ModelSafetyTest.GPUBaseDataSetName
  902. ctx.Data["BaseDataSetUUID"] = setting.ModelSafetyTest.GPUBaseDataSetUUID
  903. ctx.Data["CombatDataSetName"] = setting.ModelSafetyTest.GPUCombatDataSetName
  904. ctx.Data["CombatDataSetUUID"] = setting.ModelSafetyTest.GPUCombatDataSetUUID
  905. prepareCloudbrainOneSpecs(ctx)
  906. queuesDetail, _ := cloudbrain.GetQueuesDetail()
  907. if queuesDetail != nil {
  908. ctx.Data["QueuesDetail"] = queuesDetail
  909. }
  910. } else {
  911. ctx.Data["engine_id"] = ctx.QueryInt("engine_id")
  912. ctx.Data["pool_id"] = ctx.Query("pool_id")
  913. ctx.Data["type"] = models.TypeCloudBrainTwo
  914. ctx.Data["compute_resource"] = models.NPUResource
  915. ctx.Data["datasetType"] = models.TypeCloudBrainTwo
  916. ctx.Data["BaseDataSetName"] = setting.ModelSafetyTest.NPUBaseDataSetName
  917. ctx.Data["BaseDataSetUUID"] = setting.ModelSafetyTest.NPUBaseDataSetUUID
  918. ctx.Data["CombatDataSetName"] = setting.ModelSafetyTest.NPUCombatDataSetName
  919. ctx.Data["CombatDataSetUUID"] = setting.ModelSafetyTest.NPUCombatDataSetUUID
  920. var engines modelarts.Engine
  921. if err := json.Unmarshal([]byte(setting.Engines), &engines); err != nil {
  922. ctx.ServerError("json.Unmarshal failed:", err)
  923. }
  924. ctx.Data["engines"] = engines.Info
  925. var versionInfos modelarts.VersionInfo
  926. if err := json.Unmarshal([]byte(setting.EngineVersions), &versionInfos); err != nil {
  927. ctx.ServerError("json.Unmarshal failed:", err)
  928. }
  929. ctx.Data["engine_versions"] = versionInfos.Version
  930. prepareCloudbrainTwoInferenceSpecs(ctx)
  931. waitCount := cloudbrain.GetWaitingCloudbrainCount(models.TypeCloudBrainTwo, "")
  932. ctx.Data["WaitCount"] = waitCount
  933. }
  934. return nil
  935. }
  936. func getJsonContent(url string) (string, error) {
  937. resp, err := http.Get(url)
  938. if err != nil || resp.StatusCode != 200 {
  939. log.Info("Get organizations url error=" + err.Error())
  940. return "", err
  941. }
  942. bytes, err := ioutil.ReadAll(resp.Body)
  943. resp.Body.Close()
  944. if err != nil {
  945. log.Info("Get organizations url error=" + err.Error())
  946. return "", err
  947. }
  948. str := string(bytes)
  949. //log.Info("json str =" + str)
  950. return str, nil
  951. }