You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

resty.go 14 kB

3 years ago
3 years ago
3 years ago
4 years ago
3 years ago
3 years ago
2 years ago
4 years ago
4 years ago
2 years ago
2 years ago
2 years ago
2 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
3 years ago
4 years ago
4 years ago
3 years ago
4 years ago
4 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
3 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
3 years ago
4 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago

  1. package cloudbrain
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "net/http"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "code.gitea.io/gitea/modules/log"
  11. "code.gitea.io/gitea/models"
  12. "code.gitea.io/gitea/modules/setting"
  13. "github.com/go-resty/resty/v2"
  14. )
  15. var (
  16. restyClient *resty.Client
  17. HOST string
  18. TOKEN string
  19. ImagesUrlMap = map[string]string{Public: "/rest-server/api/v1/image/public/list/", Custom: "/rest-server/api/v1/image/list/"}
  20. )
  21. const (
  22. JobHasBeenStopped = "S410"
  23. errInvalidToken = "S401"
  24. Public = "public"
  25. Custom = "custom"
  26. LogPageSize = 500
  27. LogPageTokenExpired = "5m"
  28. pageSize = 15
  29. QueuesDetailUrl = "/rest-server/api/v2/queuesdetail"
  30. )
  31. func getRestyClient() *resty.Client {
  32. if restyClient == nil {
  33. restyClient = resty.New()
  34. }
  35. return restyClient
  36. }
  37. func checkSetting() {
  38. if len(HOST) != 0 && len(TOKEN) != 0 && restyClient != nil {
  39. return
  40. }
  41. _ = loginCloudbrain()
  42. }
  43. func loginCloudbrain() error {
  44. conf := setting.GetCloudbrainConfig()
  45. username := conf.Username
  46. password := conf.Password
  47. HOST = conf.Host
  48. var loginResult models.CloudBrainLoginResult
  49. client := getRestyClient()
  50. res, err := client.R().
  51. SetHeader("Content-Type", "application/json").
  52. SetBody(map[string]interface{}{"username": username, "password": password, "expiration": conf.Expiration}).
  53. SetResult(&loginResult).
  54. Post(HOST + "/rest-server/api/v1/token")
  55. if err != nil {
  56. return fmt.Errorf("resty loginCloudbrain: %s", err)
  57. }
  58. if loginResult.Code != Success {
  59. return fmt.Errorf("%s: %s", loginResult.Msg, res.String())
  60. }
  61. TOKEN = loginResult.Payload["token"].(string)
  62. return nil
  63. }
  64. func GetQueuesDetail() (*map[string]int, error) {
  65. checkSetting()
  66. client := getRestyClient()
  67. var jobResult models.QueueDetailResult
  68. var result = make(map[string]int, 0)
  69. retry := 0
  70. sendjob:
  71. res, err := client.R().
  72. SetHeader("Content-Type", "application/json").
  73. SetAuthToken(TOKEN).
  74. SetResult(&jobResult).
  75. Get(HOST + QueuesDetailUrl)
  76. if err != nil {
  77. return nil, fmt.Errorf("resty get queues detail failed: %s", err)
  78. }
  79. if (res.StatusCode() == http.StatusUnauthorized || jobResult.Code == errInvalidToken) && retry < 1 {
  80. retry++
  81. _ = loginCloudbrain()
  82. goto sendjob
  83. }
  84. if jobResult.Code != Success {
  85. return nil, fmt.Errorf("jobResult err: %s", res.String())
  86. }
  87. for k, v := range jobResult.Payload {
  88. result[k] = v.JobScheduleInfo.Pending
  89. }
  90. return &result, nil
  91. }
  92. func CreateJob(jobName string, createJobParams models.CreateJobParams) (*models.CreateJobResult, error) {
  93. checkSetting()
  94. client := getRestyClient()
  95. var jobResult models.CreateJobResult
  96. retry := 0
  97. reqPara, _ := json.Marshal(createJobParams)
  98. log.Warn("job req:", string(reqPara[:]))
  99. sendjob:
  100. res, err := client.R().
  101. SetHeader("Content-Type", "application/json").
  102. SetAuthToken(TOKEN).
  103. SetBody(createJobParams).
  104. SetResult(&jobResult).
  105. Post(HOST + "/rest-server/api/v1/jobs/")
  106. if err != nil {
  107. return nil, fmt.Errorf("resty create job: %s", err)
  108. }
  109. var response models.CloudBrainResult
  110. json.Unmarshal(res.Body(), &response)
  111. if response.Code == errInvalidToken && retry < 1 {
  112. retry++
  113. _ = loginCloudbrain()
  114. goto sendjob
  115. }
  116. if jobResult.Code != Success {
  117. return &jobResult, fmt.Errorf("jobResult err: %s", res.String())
  118. }
  119. return &jobResult, nil
  120. }
  121. func GetJob(jobID string) (*models.GetJobResult, error) {
  122. checkSetting()
  123. // http://192.168.204.24/rest-server/api/v1/jobs/90e26e500c4b3011ea0a251099a987938b96
  124. client := getRestyClient()
  125. var getJobResult models.GetJobResult
  126. retry := 0
  127. sendjob:
  128. res, err := client.R().
  129. SetHeader("Content-Type", "application/json").
  130. SetAuthToken(TOKEN).
  131. SetResult(&getJobResult).
  132. Get(HOST + "/rest-server/api/v1/jobs/" + jobID)
  133. if err != nil {
  134. return nil, fmt.Errorf("resty GetJob: %v", err)
  135. }
  136. var response models.CloudBrainResult
  137. json.Unmarshal(res.Body(), &response)
  138. if response.Code == errInvalidToken && retry < 1 {
  139. retry++
  140. _ = loginCloudbrain()
  141. goto sendjob
  142. }
  143. if getJobResult.Code != Success {
  144. return &getJobResult, fmt.Errorf("jobResult GetJob err: %s", res.String())
  145. }
  146. return &getJobResult, nil
  147. }
  148. func GetImagesPageable(page int, size int, imageType string, name string) (*models.GetImagesResult, error) {
  149. checkSetting()
  150. client := getRestyClient()
  151. var getImagesResult models.GetImagesResult
  152. retry := 0
  153. sendjob:
  154. res, err := client.R().
  155. SetHeader("Content-Type", "application/json").
  156. SetAuthToken(TOKEN).
  157. SetQueryString(getQueryString(page, size, name)).
  158. SetResult(&getImagesResult).
  159. Get(HOST + ImagesUrlMap[imageType])
  160. if err != nil {
  161. return nil, fmt.Errorf("resty GetImages: %v", err)
  162. }
  163. var response models.CloudBrainResult
  164. json.Unmarshal(res.Body(), &response)
  165. if response.Code == errInvalidToken && retry < 1 {
  166. retry++
  167. _ = loginCloudbrain()
  168. goto sendjob
  169. }
  170. if getImagesResult.Code != Success {
  171. return &getImagesResult, fmt.Errorf("getImagesResult err: %s", res.String())
  172. }
  173. getImagesResult.Payload.TotalPages = getTotalPages(getImagesResult, size)
  174. return &getImagesResult, nil
  175. }
  176. func getTotalPages(getImagesResult models.GetImagesResult, size int) int {
  177. totalCount := getImagesResult.Payload.Count
  178. var totalPages int
  179. if totalCount%size != 0 {
  180. totalPages = totalCount/size + 1
  181. } else {
  182. totalPages = totalCount / size
  183. }
  184. return totalPages
  185. }
  186. func getQueryString(page int, size int, name string) string {
  187. if strings.TrimSpace(name) == "" {
  188. return fmt.Sprintf("pageIndex=%d&pageSize=%d", page, size)
  189. }
  190. return fmt.Sprintf("pageIndex=%d&pageSize=%d&name=%s", page, size, name)
  191. }
  192. func CommitImage(jobID string, params models.CommitImageParams) error {
  193. imageTag := strings.TrimSpace(params.ImageTag)
  194. dbImage, err := models.GetImageByTag(imageTag)
  195. if err != nil && !models.IsErrImageNotExist(err) {
  196. return fmt.Errorf("resty CommitImage: %v", err)
  197. }
  198. var createTime time.Time
  199. var isSetCreatedUnix = false
  200. if dbImage != nil {
  201. if dbImage.UID != params.UID {
  202. return models.ErrorImageTagExist{
  203. Tag: imageTag,
  204. }
  205. } else {
  206. if dbImage.Status == models.IMAGE_STATUS_COMMIT {
  207. return models.ErrorImageCommitting{
  208. Tag: imageTag,
  209. }
  210. } else { //覆盖提交
  211. result, err := GetImagesPageable(1, pageSize, Custom, "")
  212. if err == nil && result.Code == "S000" {
  213. for _, v := range result.Payload.ImageInfo {
  214. if v.Place == dbImage.Place {
  215. isSetCreatedUnix = true
  216. createTime, _ = time.Parse(time.RFC3339, v.Createtime)
  217. break
  218. }
  219. }
  220. }
  221. }
  222. }
  223. }
  224. checkSetting()
  225. client := getRestyClient()
  226. var result models.CommitImageResult
  227. retry := 0
  228. sendjob:
  229. res, err := client.R().
  230. SetHeader("Content-Type", "application/json").
  231. SetAuthToken(TOKEN).
  232. SetBody(params.CommitImageCloudBrainParams).
  233. SetResult(&result).
  234. Post(HOST + "/rest-server/api/v1/jobs/" + jobID + "/commitImage")
  235. if err != nil {
  236. return fmt.Errorf("resty CommitImage: %v", err)
  237. }
  238. var response models.CloudBrainResult
  239. json.Unmarshal(res.Body(), &response)
  240. if response.Code == errInvalidToken && retry < 1 {
  241. retry++
  242. _ = loginCloudbrain()
  243. goto sendjob
  244. }
  245. if result.Code != Success {
  246. return fmt.Errorf("CommitImage err: %s", res.String())
  247. }
  248. image := models.Image{
  249. Type: models.NORMAL_TYPE,
  250. CloudbrainType: params.CloudBrainType,
  251. UID: params.UID,
  252. IsPrivate: params.IsPrivate,
  253. Tag: imageTag,
  254. Description: params.ImageDescription,
  255. Place: setting.Cloudbrain.ImageURLPrefix + imageTag,
  256. Status: models.IMAGE_STATUS_COMMIT,
  257. }
  258. err = models.WithTx(func(ctx models.DBContext) error {
  259. models.UpdateAutoIncrementIndex()
  260. if dbImage != nil {
  261. dbImage.IsPrivate = params.IsPrivate
  262. dbImage.Description = params.ImageDescription
  263. dbImage.Status = models.IMAGE_STATUS_COMMIT
  264. image = *dbImage
  265. if err := models.UpdateLocalImage(dbImage); err != nil {
  266. log.Error("Failed to update image record.", err)
  267. return fmt.Errorf("CommitImage err: %s", res.String())
  268. }
  269. } else {
  270. if err := models.CreateLocalImage(&image); err != nil {
  271. log.Error("Failed to insert image record.", err)
  272. return fmt.Errorf("CommitImage err: %s", res.String())
  273. }
  274. }
  275. if err := models.SaveImageTopics(image.ID, params.Topics...); err != nil {
  276. log.Error("Failed to insert image record.", err)
  277. return fmt.Errorf("CommitImage err: %s", res.String())
  278. }
  279. return nil
  280. })
  281. if err == nil {
  282. go updateImageStatus(image, isSetCreatedUnix, createTime)
  283. }
  284. return err
  285. }
  286. func CommitAdminImage(params models.CommitImageParams) error {
  287. imageTag := strings.TrimSpace(params.ImageTag)
  288. exist, err := models.IsImageExist(imageTag)
  289. if err != nil {
  290. return fmt.Errorf("resty CommitImage: %v", err)
  291. }
  292. if exist {
  293. return models.ErrorImageTagExist{
  294. Tag: imageTag,
  295. }
  296. }
  297. image := models.Image{
  298. CloudbrainType: params.CloudBrainType,
  299. UID: params.UID,
  300. IsPrivate: params.IsPrivate,
  301. Tag: imageTag,
  302. Description: params.ImageDescription,
  303. Place: params.Place,
  304. Status: models.IMAGE_STATUS_SUCCESS,
  305. Type: params.Type,
  306. }
  307. err = models.WithTx(func(ctx models.DBContext) error {
  308. if err := models.CreateLocalImage(&image); err != nil {
  309. log.Error("Failed to insert image record.", err)
  310. return fmt.Errorf("resty CommitImage: %v", err)
  311. }
  312. if err := models.SaveImageTopics(image.ID, params.Topics...); err != nil {
  313. log.Error("Failed to insert image record.", err)
  314. return fmt.Errorf("resty CommitImage: %v", err)
  315. }
  316. return nil
  317. })
  318. return err
  319. }
  320. func updateImageStatus(image models.Image, isSetCreatedUnix bool, createTime time.Time) {
  321. attemps := 60
  322. commitSuccess := false
  323. for i := 0; i < attemps; i++ {
  324. time.Sleep(20 * time.Second)
  325. log.Info("the " + strconv.Itoa(i) + " times query cloudbrain images.Imagetag:" + image.Tag + "isSetCreate:" + strconv.FormatBool(isSetCreatedUnix))
  326. result, err := GetImagesPageable(1, pageSize, Custom, "")
  327. if err == nil && result.Code == "S000" {
  328. log.Info("images count:" + strconv.Itoa(result.Payload.Count))
  329. for _, v := range result.Payload.ImageInfo {
  330. if v.Place == image.Place && (!isSetCreatedUnix || (isSetCreatedUnix && createTimeUpdated(v, createTime))) {
  331. image.Status = models.IMAGE_STATUS_SUCCESS
  332. models.UpdateLocalImageStatus(&image)
  333. commitSuccess = true
  334. break
  335. }
  336. }
  337. }
  338. if commitSuccess {
  339. break
  340. }
  341. }
  342. if !commitSuccess {
  343. image.Status = models.IMAGE_STATUS_Failed
  344. models.UpdateLocalImageStatus(&image)
  345. }
  346. }
  347. func createTimeUpdated(v *models.ImageInfo, createTime time.Time) bool {
  348. newTime, err := time.Parse(time.RFC3339, v.Createtime)
  349. if err != nil {
  350. return false
  351. }
  352. return newTime.After(createTime)
  353. }
  354. func StopJob(jobID string) error {
  355. checkSetting()
  356. client := getRestyClient()
  357. var result models.CloudBrainResult
  358. retry := 0
  359. sendjob:
  360. res, err := client.R().
  361. SetHeader("Content-Type", "application/json").
  362. SetAuthToken(TOKEN).
  363. SetResult(&result).
  364. Delete(HOST + "/rest-server/api/v1/jobs/" + jobID)
  365. if err != nil {
  366. return fmt.Errorf("resty StopJob: %v", err)
  367. }
  368. var response models.CloudBrainResult
  369. json.Unmarshal(res.Body(), &response)
  370. if response.Code == errInvalidToken && retry < 1 {
  371. retry++
  372. _ = loginCloudbrain()
  373. goto sendjob
  374. }
  375. if result.Code != Success {
  376. if result.Code == JobHasBeenStopped {
  377. log.Info("StopJob(%s) failed:%s", jobID, result.Msg)
  378. } else {
  379. return fmt.Errorf("StopJob err: %s", res.String())
  380. }
  381. }
  382. return nil
  383. }
  384. func GetJobLog(jobID string) (*models.GetJobLogResult, error) {
  385. checkSetting()
  386. client := getRestyClient()
  387. var result models.GetJobLogResult
  388. req := models.GetJobLogParams{
  389. Size: strconv.Itoa(LogPageSize),
  390. Sort: "log.offset",
  391. QueryInfo: models.QueryInfo{
  392. MatchInfo: models.MatchInfo{
  393. PodName: jobID + "-task1-0",
  394. },
  395. },
  396. }
  397. res, err := client.R().
  398. SetHeader("Content-Type", "application/json").
  399. SetAuthToken(TOKEN).
  400. SetBody(req).
  401. SetResult(&result).
  402. Post(HOST + "es/_search?_source=message&scroll=" + LogPageTokenExpired)
  403. if err != nil {
  404. log.Error("GetJobLog failed: %v", err)
  405. return &result, fmt.Errorf("resty GetJobLog: %v, %s", err, res.String())
  406. }
  407. if !strings.Contains(res.Status(), strconv.Itoa(http.StatusOK)) {
  408. log.Error("res.Status(): %s, response: %s", res.Status(), res.String())
  409. return &result, errors.New(res.String())
  410. }
  411. return &result, nil
  412. }
  413. func GetJobAllLog(scrollID string) (*models.GetJobLogResult, error) {
  414. checkSetting()
  415. client := getRestyClient()
  416. var result models.GetJobLogResult
  417. req := models.GetAllJobLogParams{
  418. Scroll: LogPageTokenExpired,
  419. ScrollID: scrollID,
  420. }
  421. res, err := client.R().
  422. SetHeader("Content-Type", "application/json").
  423. SetAuthToken(TOKEN).
  424. SetBody(req).
  425. SetResult(&result).
  426. Post(HOST + "es/_search/scroll")
  427. if err != nil {
  428. log.Error("GetJobAllLog failed: %v", err)
  429. return &result, fmt.Errorf("resty GetJobAllLog: %v, %s", err, res.String())
  430. }
  431. if !strings.Contains(res.Status(), strconv.Itoa(http.StatusOK)) {
  432. log.Error("res.Status(): %s, response: %s", res.Status(), res.String())
  433. return &result, errors.New(res.String())
  434. }
  435. return &result, nil
  436. }
  437. func DeleteJobLogToken(scrollID string) error {
  438. checkSetting()
  439. client := getRestyClient()
  440. var result models.DeleteJobLogTokenResult
  441. req := models.DeleteJobLogTokenParams{
  442. ScrollID: scrollID,
  443. }
  444. res, err := client.R().
  445. SetHeader("Content-Type", "application/json").
  446. SetAuthToken(TOKEN).
  447. SetBody(req).
  448. SetResult(&result).
  449. Delete(HOST + "es/_search/scroll")
  450. if err != nil {
  451. log.Error("DeleteJobLogToken failed: %v", err)
  452. return fmt.Errorf("resty DeleteJobLogToken: %v, %s", err, res.String())
  453. }
  454. if !strings.Contains(res.Status(), strconv.Itoa(http.StatusOK)) {
  455. log.Error("res.Status(): %s, response: %s", res.Status(), res.String())
  456. return errors.New(res.String())
  457. }
  458. if !result.Succeeded {
  459. log.Error("DeleteJobLogToken failed")
  460. return errors.New("DeleteJobLogToken failed")
  461. }
  462. return nil
  463. }