You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

schedule.go 3.2 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package urchin
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strings"
  6. "code.gitea.io/gitea/models"
  7. "code.gitea.io/gitea/modules/labelmsg"
  8. "code.gitea.io/gitea/modules/log"
  9. "code.gitea.io/gitea/modules/setting"
  10. )
  // DecompressReq is the JSON payload describing a single decompression job.
  // It is serialized with the snake_case keys given in the field tags.
  type DecompressReq struct {
  	// SourceFile names the file to decompress; encoded as "source_file".
  	SourceFile string `json:"source_file"`

  	// DestPath names the destination for the result; encoded as "dest_path".
  	DestPath string `json:"dest_path"`
  }
  15. var urfsClient Urchinfs
  16. func getUrfsClient() {
  17. if urfsClient != nil {
  18. return
  19. }
  20. urfsClient = New()
  21. }
  22. func GetBackNpuModel(cloudbrainID int64, endpoint, bucket, objectKey, destPeerHost string) error {
  23. getUrfsClient()
  24. res, err := urfsClient.ScheduleDataToPeerByKey(endpoint, bucket, objectKey, destPeerHost)
  25. if err != nil {
  26. log.Error("ScheduleDataToPeerByKey failed:%v", err)
  27. return err
  28. }
  29. _, err = models.InsertScheduleRecord(&models.ScheduleRecord{
  30. CloudbrainID: cloudbrainID,
  31. EndPoint: res.DataEndpoint,
  32. Bucket: res.DataRoot,
  33. ObjectKey: res.DataPath,
  34. ProxyServer: destPeerHost,
  35. Status: res.StatusCode,
  36. })
  37. if err != nil {
  38. log.Error("InsertScheduleRecord failed:%v", err)
  39. return err
  40. }
  41. switch res.StatusCode {
  42. case models.StorageScheduleSucceed:
  43. log.Info("ScheduleDataToPeerByKey succeed")
  44. decompress(res.DataRoot+"/"+res.DataPath, setting.Bucket+"/"+strings.TrimSuffix(res.DataPath, models.ModelSuffix))
  45. case models.StorageScheduleProcessing:
  46. log.Info("ScheduleDataToPeerByKey processing")
  47. case models.StorageScheduleFailed:
  48. log.Error("ScheduleDataToPeerByKey failed:%s", res.StatusMsg)
  49. return fmt.Errorf("GetBackNpuModel failed:%s", res.StatusMsg)
  50. default:
  51. log.Info("ScheduleDataToPeerByKey failed, unknown StatusCode:%d", res.StatusCode)
  52. return fmt.Errorf("GetBackNpuModel failed, unknow StatusCode:%d", res.StatusCode)
  53. }
  54. return nil
  55. }
  56. func HandleScheduleRecords() error {
  57. getUrfsClient()
  58. records, err := models.GetSchedulingRecord()
  59. if err != nil {
  60. log.Error("GetSchedulingRecord failed:%v", err)
  61. return err
  62. }
  63. for _, record := range records {
  64. res, err := urfsClient.CheckScheduleTaskStatusByKey(record.EndPoint, record.Bucket, record.ObjectKey, record.ProxyServer)
  65. if err != nil {
  66. log.Error("CheckScheduleTaskStatusByKey(%d) failed:%v", record.ID, err)
  67. continue
  68. }
  69. record.Status = res.StatusCode
  70. models.UpdateScheduleCols(record, "status")
  71. switch res.StatusCode {
  72. case models.StorageScheduleSucceed:
  73. log.Info("ScheduleDataToPeerByKey(%s) succeed", record.ObjectKey)
  74. decompress(record.Bucket+"/"+record.ObjectKey, setting.Bucket+"/"+strings.TrimSuffix(record.ObjectKey, models.ModelSuffix))
  75. case models.StorageScheduleProcessing:
  76. log.Info("ScheduleDataToPeerByKey(%s) processing", record.ObjectKey)
  77. case models.StorageScheduleFailed:
  78. log.Error("ScheduleDataToPeerByKey(%s) failed:%s", record.ObjectKey, res.StatusMsg)
  79. default:
  80. log.Info("ScheduleDataToPeerByKey(%s) failed, unknown StatusCode:%d", record.ObjectKey, res.StatusCode)
  81. }
  82. }
  83. return nil
  84. }
  85. func decompress(sourceFile, destPath string) {
  86. req, _ := json.Marshal(DecompressReq{
  87. SourceFile: sourceFile,
  88. DestPath: destPath,
  89. })
  90. err := labelmsg.SendDecompressAttachToLabelOBS(string(req))
  91. if err != nil {
  92. log.Error("SendDecompressTask to labelsystem (%s) failed:%s", sourceFile, err.Error())
  93. }
  94. }