
Merge remote-tracking branch 'origin/V20221214' into zouap

pull/3352/head
zouap · 2 years ago · commit bd37ed33a0
13 changed files with 424 additions and 53 deletions
  1. models/cloudbrain.go  +84 -0
  2. models/cloudbrain_static.go  +21 -0
  3. models/resource_specification.go  +93 -0
  4. modules/cron/tasks_basic.go  +17 -2
  5. modules/grampus/grampus.go  +1 -4
  6. modules/setting/setting.go  +24 -0
  7. options/locale/locale_en-US.ini  +1 -0
  8. options/locale/locale_zh-CN.ini  +2 -0
  9. routers/api/v1/api.go  +1 -1
  10. routers/api/v1/repo/cloudbrain_dashboard.go  +15 -0
  11. routers/repo/cloudbrain.go  +12 -0
  12. services/cloudbrain/clear.go  +151 -0
  13. services/cloudbrain/resource/resource_specification.go  +2 -46

models/cloudbrain.go  +84 -0

@@ -204,6 +204,7 @@ type Cloudbrain struct {
BenchmarkTypeRankLink string `xorm:"-"`
StartTime timeutil.TimeStamp
EndTime timeutil.TimeStamp
Cleared bool `xorm:"DEFAULT false"`
Spec *Specification `xorm:"-"`
}


@@ -1905,6 +1906,12 @@ func GetCloudbrainByID(id string) (*Cloudbrain, error) {
return getRepoCloudBrain(cb)
}


func IsCloudbrainExistByJobName(jobName string) (bool, error) {
return x.Unscoped().Exist(&Cloudbrain{
JobName: jobName,
})
}

func GetCloudbrainByIDWithDeleted(id string) (*Cloudbrain, error) {
idInt64, _ := strconv.ParseInt(id, 10, 64)
cb := &Cloudbrain{ID: idInt64}
@@ -2050,6 +2057,83 @@ func GetCloudBrainUnStoppedJob() ([]*Cloudbrain, error) {
Find(&cloudbrains)
}


func GetCloudBrainOneStoppedNotDebugJobDaysAgo(days int, limit int) ([]*Cloudbrain, error) {
cloudbrains := make([]*Cloudbrain, 0, 10)
endTimeBefore := time.Now().Unix() - int64(days)*24*3600
missEndTimeBefore := endTimeBefore - 24*3600
return cloudbrains, x.Unscoped().Cols("id,job_name,job_id").
In("status",
JobStopped, JobSucceeded, JobFailed, ModelArtsCreateFailed, ModelArtsStartFailed, ModelArtsUnavailable, ModelArtsResizFailed, ModelArtsDeleted,
ModelArtsStopped, ModelArtsTrainJobCanceled, ModelArtsTrainJobCheckFailed, ModelArtsTrainJobCompleted, ModelArtsTrainJobDeleteFailed, ModelArtsTrainJobDeployServiceFailed,
ModelArtsTrainJobFailed, ModelArtsTrainJobImageFailed, ModelArtsTrainJobKilled, ModelArtsTrainJobLost, ModelArtsTrainJobSubmitFailed, ModelArtsTrainJobSubmitModelFailed).
Where("(((end_time is null or end_time=0) and updated_unix<? and updated_unix != 0 ) or (end_time<? and end_time != 0)) and cleared=false and type=0 and job_type != 'DEBUG'", missEndTimeBefore, endTimeBefore).
Limit(limit).
Find(&cloudbrains)
}
/**
This method takes repeated debugging into account: if a job was debugged several times, the end time of the latest run is used.
*/
func GetCloudBrainOneStoppedDebugJobDaysAgo(days int, limit int) ([]*Cloudbrain, error) {
cloudbrains := make([]*Cloudbrain, 0, 10)
endTimeBefore := time.Now().Unix() - int64(days)*24*3600
missEndTimeBefore := endTimeBefore - 24*3600
sql := `SELECT id,job_name,job_id from (SELECT DISTINCT ON (job_name)
id, job_name, job_id,status,end_time,updated_unix,cleared
FROM cloudbrain
where type=0 and job_type='DEBUG'
ORDER BY job_name, updated_unix DESC) a
where status in ('STOPPED','SUCCEEDED','FAILED') and (((end_time is null or end_time=0) and updated_unix<? and updated_unix != 0 ) or (end_time<? and end_time != 0)) and cleared=false`

return cloudbrains, x.Unscoped().SQL(sql, missEndTimeBefore, endTimeBefore).Limit(limit).Find(&cloudbrains)

}


func UpdateCloudBrainRecordsCleared(ids []int64) error {
pageSize := 150
n := len(ids) / pageSize

var err error

for i := 1; i <= n+1; i++ {
tempIds := getPageIds(ids, i, pageSize)
if len(tempIds) > 0 {
idsIn := ""
for i, id := range tempIds {
if i == 0 {
idsIn += strconv.FormatInt(id, 10)
} else {
idsIn += "," + strconv.FormatInt(id, 10)
}
}

_, errTemp := x.Unscoped().Exec("update cloudbrain set cleared=true where id in (" + idsIn + ")")
if errTemp != nil {
err = errTemp
}

}

}
return err

}

func getPageIds(ids []int64, page int, pagesize int) []int64 {
begin := (page - 1) * pagesize
end := (page) * pagesize

if begin > len(ids)-1 {
return []int64{}
}
if end > len(ids)-1 {
return ids[begin:]
} else {
return ids[begin:end]
}

}
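For reference, a minimal standalone sketch (not part of the commit) of how the paging helper above splits an id list into fixed-size batches, which is the same loop shape UpdateCloudBrainRecordsCleared uses before issuing one UPDATE per batch; the ids and batch size below are made up for illustration:

package main

import "fmt"

// getPageIds mirrors the helper above: page is 1-based and the last page may be shorter.
func getPageIds(ids []int64, page int, pagesize int) []int64 {
	begin := (page - 1) * pagesize
	end := page * pagesize
	if begin > len(ids)-1 {
		return []int64{}
	}
	if end > len(ids)-1 {
		return ids[begin:]
	}
	return ids[begin:end]
}

func main() {
	ids := []int64{1, 2, 3, 4, 5, 6, 7} // hypothetical cloudbrain record ids
	pageSize := 3
	n := len(ids) / pageSize
	for page := 1; page <= n+1; page++ {
		batch := getPageIds(ids, page, pageSize)
		if len(batch) > 0 {
			fmt.Println("batch", page, batch) // each batch becomes one "update ... where id in (...)" statement
		}
	}
}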

func GetStoppedJobWithNoDurationJob() ([]*Cloudbrain, error) {
cloudbrains := make([]*Cloudbrain, 0)
return cloudbrains, x.


models/cloudbrain_static.go  +21 -0

@@ -183,6 +183,17 @@ func GetWaittingTop() ([]*CloudbrainInfo, error) {
Find(&cloudbrains); err != nil {
log.Info("find error.")
}

var ids []int64
for _, task := range cloudbrains {
ids = append(ids, task.RepoID)
}
repositoryMap, err := GetRepositoriesMapByIDs(ids)
if err == nil {
for _, task := range cloudbrains {
task.Repo = repositoryMap[task.RepoID]
}
}
return cloudbrains, nil
}


@@ -199,6 +210,16 @@ func GetRunningTop() ([]*CloudbrainInfo, error) {
Find(&cloudbrains); err != nil {
log.Info("find error.")
}
var ids []int64
for _, task := range cloudbrains {
ids = append(ids, task.RepoID)
}
repositoryMap, err := GetRepositoriesMapByIDs(ids)
if err == nil {
for _, task := range cloudbrains {
task.Repo = repositoryMap[task.RepoID]
}
}
return cloudbrains, nil
}




models/resource_specification.go  +93 -0

@@ -3,6 +3,7 @@ package models
import (
"code.gitea.io/gitea/modules/timeutil"
"fmt"
"strings"
"xorm.io/builder"
)


@@ -197,12 +198,104 @@ type Specification struct {
AiCenterName string
IsExclusive bool
ExclusiveOrg string
//specs that have the same sourceSpecId, computeResource and cluster as current spec
RelatedSpecs []*Specification
}


func (Specification) TableName() string {
return "resource_specification"
}


func (s *Specification) loadRelatedSpecs() {
if s.RelatedSpecs != nil {
return
}
defaultSpecs := make([]*Specification, 0)
if s.SourceSpecId == "" {
s.RelatedSpecs = defaultSpecs
return
}
r, err := FindSpecs(FindSpecsOptions{
ComputeResource: s.ComputeResource,
Cluster: s.Cluster,
SourceSpecId: s.SourceSpecId,
RequestAll: true,
SpecStatus: SpecOnShelf,
})
if err != nil {
s.RelatedSpecs = defaultSpecs
return
}
s.RelatedSpecs = r
}
func (s *Specification) GetAvailableCenterIds(userIds ...int64) []string {
s.loadRelatedSpecs()

if len(s.RelatedSpecs) == 0 {
return make([]string, 0)
}

var uId int64
if len(userIds) > 0 {
uId = userIds[0]
}
//filter exclusive specs
specs := FilterExclusiveSpecs(s.RelatedSpecs, uId)

centerIds := make([]string, len(specs))
for i, v := range specs {
centerIds[i] = v.AiCenterCode
}
return centerIds
}

func FilterExclusiveSpecs(r []*Specification, userId int64) []*Specification {
if userId == 0 {
return r
}
specs := make([]*Specification, 0, len(r))
specMap := make(map[int64]string, 0)
for i := 0; i < len(r); i++ {
spec := r[i]
if _, has := specMap[spec.ID]; has {
continue
}
if !spec.IsExclusive {
specs = append(specs, spec)
specMap[spec.ID] = ""
continue
}
orgs := strings.Split(spec.ExclusiveOrg, ";")
for _, org := range orgs {
isMember, _ := IsOrganizationMemberByOrgName(org, userId)
if isMember {
specs = append(specs, spec)
specMap[spec.ID] = ""
break
}
}
}
return specs
}

func DistinctSpecs(r []*Specification) []*Specification {
specs := make([]*Specification, 0, len(r))
sourceSpecIdMap := make(map[string]string, 0)
for i := 0; i < len(r); i++ {
spec := r[i]
if spec.SourceSpecId == "" {
specs = append(specs, spec)
continue
}
if _, has := sourceSpecIdMap[spec.SourceSpecId]; has {
continue
}
specs = append(specs, spec)
sourceSpecIdMap[spec.SourceSpecId] = ""
}
return specs
}
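As a rough illustration of the dedup rule in DistinctSpecs (a standalone sketch with a simplified struct, not the models package): specs with an empty SourceSpecId are always kept, and for every non-empty SourceSpecId only the first spec encountered survives:

package main

import "fmt"

// spec is a simplified stand-in for models.Specification, for illustration only.
type spec struct {
	ID           int64
	SourceSpecId string
}

// distinct mirrors DistinctSpecs: keep specs without a SourceSpecId, and keep only the first spec per SourceSpecId.
func distinct(r []*spec) []*spec {
	out := make([]*spec, 0, len(r))
	seen := map[string]struct{}{}
	for _, s := range r {
		if s.SourceSpecId == "" {
			out = append(out, s)
			continue
		}
		if _, ok := seen[s.SourceSpecId]; ok {
			continue
		}
		out = append(out, s)
		seen[s.SourceSpecId] = struct{}{}
	}
	return out
}

func main() {
	in := []*spec{
		{ID: 1, SourceSpecId: "a"},
		{ID: 2, SourceSpecId: "a"},
		{ID: 3, SourceSpecId: ""},
		{ID: 4, SourceSpecId: "b"},
	}
	for _, s := range distinct(in) {
		fmt.Println(s.ID) // prints 1, 3, 4: the second spec with SourceSpecId "a" is dropped
	}
}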

func InsertResourceSpecification(r ResourceSpecification) (int64, error) {
return x.Insert(&r)
}


modules/cron/tasks_basic.go  +17 -2

@@ -5,10 +5,13 @@
package cron


import (
"code.gitea.io/gitea/modules/urfs_client/urchin"
"code.gitea.io/gitea/modules/setting"
"context" "context"
"time" "time"


"code.gitea.io/gitea/modules/urfs_client/urchin"
cloudbrainService "code.gitea.io/gitea/services/cloudbrain"

"code.gitea.io/gitea/modules/modelarts" "code.gitea.io/gitea/modules/modelarts"
"code.gitea.io/gitea/services/cloudbrain/resource" "code.gitea.io/gitea/services/cloudbrain/resource"
"code.gitea.io/gitea/services/reward" "code.gitea.io/gitea/services/reward"
@@ -190,6 +193,17 @@ func registerHandleRepoAndUserStatistic() {
})
}


func registerHandleClearCloudbrainResult() {
RegisterTaskFatal("handle_cloudbrain_one_result_clear", &BaseConfig{
Enabled: true,
RunAtStart: setting.ClearStrategy.RunAtStart,
Schedule: setting.ClearStrategy.Cron,
}, func(ctx context.Context, _ *models.User, _ Config) error {
cloudbrainService.ClearCloudbrainResultSpace()
return nil
})
}

func registerHandleSummaryStatistic() {
RegisterTaskFatal("handle_summary_statistic", &BaseConfig{
Enabled: true,
@@ -306,6 +320,7 @@ func initBasicTasks() {


registerHandleRepoAndUserStatistic()
registerHandleSummaryStatistic()
registerHandleClearCloudbrainResult()


registerSyncCloudbrainStatus()
registerHandleOrgStatistic()
@@ -317,6 +332,6 @@ func initBasicTasks() {


registerHandleModelSafetyTask()


registerHandleScheduleRecord()
registerHandleCloudbrainDurationStatistic()
}

modules/grampus/grampus.go  +1 -4

@@ -105,8 +105,6 @@ func getDatasetGrampus(datasetInfos map[string]models.DatasetInfo) []models.Gram
func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId string, err error) {
createTime := timeutil.TimeStampNow()


centerID, centerName := getCentersParamter(ctx, req)

var datasetGrampus, modelGrampus []models.GrampusDataset
var codeGrampus models.GrampusDataset
if ProcessorTypeNPU == req.ProcessType {
@@ -138,8 +136,7 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId str
ResourceSpecId: req.Spec.SourceSpecId,
ImageId: req.ImageId,
ImageUrl: req.ImageUrl,
CenterID: centerID,
CenterName: centerName,
CenterID: req.Spec.GetAvailableCenterIds(ctx.User.ID),
ReplicaNum: 1,
Datasets: datasetGrampus,
Models: modelGrampus,


modules/setting/setting.go  +24 -0

@@ -519,6 +519,7 @@ var (
CullIdleTimeout string
CullInterval string



//benchmark config
IsBenchmarkEnabled bool
BenchmarkOwner string
@@ -613,6 +614,16 @@ var (
UsageRateBeginTime string
}{}


ClearStrategy = struct {
Enabled bool
ResultSaveDays int
BatchSize int
DebugJobSize int
TrashSaveDays int
Cron string
RunAtStart bool
}{}

C2NetInfos *C2NetSqInfos
CenterInfos *AiCenterInfos
C2NetMapInfo map[string]*C2NetSequenceInfo
@@ -1619,6 +1630,7 @@ func NewContext() {
getModelConvertConfig()
getModelSafetyConfig()
getModelAppConfig()
getClearStrategy()
}


func getModelSafetyConfig() {
@@ -1679,6 +1691,18 @@ func getModelartsCDConfig() {
getNotebookFlavorInfos()
}


func getClearStrategy() {
sec := Cfg.Section("clear_strategy")
ClearStrategy.Enabled = sec.Key("ENABLED").MustBool(false)
ClearStrategy.ResultSaveDays = sec.Key("RESULT_SAVE_DAYS").MustInt(30)
ClearStrategy.BatchSize = sec.Key("BATCH_SIZE").MustInt(500)
ClearStrategy.DebugJobSize = sec.Key("DEBUG_BATCH_SIZE").MustInt(100)
ClearStrategy.TrashSaveDays = sec.Key("TRASH_SAVE_DAYS").MustInt(90)
ClearStrategy.Cron = sec.Key("CRON").MustString("* 0,30 2-8 * * ?")
ClearStrategy.RunAtStart = sec.Key("RUN_AT_START").MustBool(false)
}
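A minimal sketch of the app.ini section this function reads, parsed here with gopkg.in/ini.v1 (assumed to be the ini library behind Gitea's Cfg); the sample values are illustrative, not part of the commit, and every key is optional, falling back to the defaults shown above:

package main

import (
	"fmt"

	"gopkg.in/ini.v1"
)

// Hypothetical [clear_strategy] section for demonstration only.
const sample = `
[clear_strategy]
ENABLED = true
RESULT_SAVE_DAYS = 30
BATCH_SIZE = 500
DEBUG_BATCH_SIZE = 100
TRASH_SAVE_DAYS = 90
CRON = * 0,30 2-8 * * ?
RUN_AT_START = false
`

func main() {
	cfg, err := ini.Load([]byte(sample))
	if err != nil {
		panic(err)
	}
	sec := cfg.Section("clear_strategy")
	fmt.Println(sec.Key("ENABLED").MustBool(false))             // true
	fmt.Println(sec.Key("RESULT_SAVE_DAYS").MustInt(30))        // 30
	fmt.Println(sec.Key("CRON").MustString("* 0,30 2-8 * * ?")) // the clear task's cron schedule
}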

func getGrampusConfig() {
sec := Cfg.Section("grampus")




options/locale/locale_en-US.ini  +1 -0

@@ -3251,6 +3251,7 @@ specification = specification
select_specification = select specification
description = description
wrong_specification=You cannot use this specification, please choose another item.
result_cleared=The files of this task have been cleared, so it can no longer be restarted. Please create a new debug task instead.
resource_use=Resource Occupancy


job_name_rule = Please enter letters, numbers, _ and - up to 64 characters and cannot end with a dash (-).


options/locale/locale_zh-CN.ini  +2 -0

@@ -3271,6 +3271,8 @@ card_duration = 运行卡时
card_type = 卡类型
wrong_specification=您目前不能使用这个资源规格,请选择其他资源规格。


result_cleared=本任务的文件已被清理,无法再次调试,请新建调试任务。

job_name_rule = 请输入字母、数字、_和-,最长64个字符,且不能以中划线(-)结尾。
train_dataset_path_rule = 数据集位置存储在运行参数 <strong style="color:#010101">data_url</strong> 中,预训练模型存放在运行参数 <strong style="color:#010101">ckpt_url</strong> 中,训练输出路径存储在运行参数 <strong style="color:#010101">train_url</strong> 中。
infer_dataset_path_rule = 数据集位置存储在运行参数 <strong style="color:#010101">data_url</strong> 中,推理输出路径存储在运行参数 <strong style="color:#010101">result_url</strong> 中。


routers/api/v1/api.go  +1 -1

@@ -739,7 +739,7 @@ func RegisterRoutes(m *macaron.Macaron) {
}, reqToken(), repoAssignment())


m.Group("/file_notebook", func() { m.Group("/file_notebook", func() {
m.Get("", reqToken(), repo.GetFileNoteBookInfo)
m.Get("", repo.GetFileNoteBookInfo)
m.Post("/create", reqToken(), reqWeChat(), bind(api.CreateFileNotebookJobOption{}), repo.CreateFileNoteBook) m.Post("/create", reqToken(), reqWeChat(), bind(api.CreateFileNotebookJobOption{}), repo.CreateFileNoteBook)


})


routers/api/v1/repo/cloudbrain_dashboard.go  +15 -0

@@ -968,6 +968,8 @@ func GetWaittingTop(ctx *context.Context) {
taskDetail.RepoID = ciTasks[i].RepoID
if ciTasks[i].Repo != nil {
taskDetail.RepoName = ciTasks[i].Repo.OwnerName + "/" + ciTasks[i].Repo.Name
} else {
taskDetail.RepoName = ""
}
WaitTimeInt := time.Now().Unix() - ciTasks[i].Cloudbrain.CreatedUnix.AsTime().Unix()
taskDetail.WaitTime = models.ConvertDurationToStr(WaitTimeInt)
@@ -975,6 +977,13 @@ func GetWaittingTop(ctx *context.Context) {
if WaitTimeInt < 0 {
taskDetail.WaitTime = "00:00:00"
}

taskDetail.ID = ciTasks[i].Cloudbrain.ID
taskDetail.ComputeResource = ciTasks[i].Cloudbrain.ComputeResource
taskDetail.JobType = ciTasks[i].Cloudbrain.JobType
taskDetail.JobID = ciTasks[i].Cloudbrain.JobID
taskDetail.Type = ciTasks[i].Cloudbrain.Type

tasks = append(tasks, taskDetail)
}
ctx.JSON(http.StatusOK, map[string]interface{}{
@@ -1001,6 +1010,12 @@ func GetRunningTop(ctx *context.Context) {
taskDetail.RepoName = ciTasks[i].Repo.OwnerName + "/" + ciTasks[i].Repo.Name
}


taskDetail.ID = ciTasks[i].Cloudbrain.ID
taskDetail.ComputeResource = ciTasks[i].Cloudbrain.ComputeResource
taskDetail.JobType = ciTasks[i].Cloudbrain.JobType
taskDetail.JobID = ciTasks[i].Cloudbrain.JobID
taskDetail.Type = ciTasks[i].Cloudbrain.Type

tasks = append(tasks, taskDetail)
}
ctx.JSON(http.StatusOK, map[string]interface{}{


routers/repo/cloudbrain.go  +12 -0

@@ -670,6 +670,13 @@ func CloudBrainRestart(ctx *context.Context) {
break
}


if _, err := os.Stat(getOldJobPath(task)); err != nil {
log.Error("Can not find job minio path", err)
resultCode = "-1"
errorMsg = ctx.Tr("cloudbrain.result_cleared")
break
}

count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainOne, string(models.JobTypeDebug))
if err != nil {
log.Error("GetCloudbrainCountByUserID failed:%v", err, ctx.Data["MsgID"])
@@ -704,6 +711,11 @@ func CloudBrainRestart(ctx *context.Context) {
})
}



func getOldJobPath(task *models.Cloudbrain) string {
return setting.Attachment.Minio.RealPath + setting.Attachment.Minio.Bucket + "/" + setting.CBCodePathPrefix + task.JobName
}

func CloudBrainBenchMarkShow(ctx *context.Context) {
cloudBrainShow(ctx, tplCloudBrainBenchmarkShow, models.JobTypeBenchmark)
}


services/cloudbrain/clear.go  +151 -0

@@ -0,0 +1,151 @@
package cloudbrain

import (
"io/ioutil"
"os"
"sort"
"time"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
)

func ClearCloudbrainResultSpace() {
log.Info("clear cloudbrain one result space begin.")
if !setting.ClearStrategy.Enabled {
return
}

tasks, err := models.GetCloudBrainOneStoppedNotDebugJobDaysAgo(setting.ClearStrategy.ResultSaveDays, setting.ClearStrategy.BatchSize)
if err != nil {
log.Warn("Failed to get cloudbrain, clear result failed.", err)
return
}
debugTasks, err := models.GetCloudBrainOneStoppedDebugJobDaysAgo(setting.ClearStrategy.ResultSaveDays, setting.ClearStrategy.DebugJobSize)
if err != nil {
log.Warn("Failed to get debug cloudbrain.", err)

}
tasks = append(tasks, debugTasks...)

var ids []int64
for _, task := range tasks {
err := DeleteCloudbrainOneJobStorage(task.JobName)
if err == nil {
log.Info("clear job in cloudbrain table:"+task.JobName)
ids = append(ids, task.ID)
}
}

err = models.UpdateCloudBrainRecordsCleared(ids)
if err != nil {
log.Warn("Failed to set cloudbrain cleared status", err)
}
//If the cloudbrain table has been fully processed in this run, also sweep the minio objects and the local job directory to clean up historical leftover data, if any exists
if len(tasks) < setting.ClearStrategy.BatchSize+setting.ClearStrategy.DebugJobSize {
clearLocalHistoryTrashFile()
clearMinioHistoryTrashFile()

}
log.Info("clear cloudbrain one result space end.")

}

func clearMinioHistoryTrashFile() {
JobRealPrefix := setting.Attachment.Minio.RealPath + setting.Attachment.Minio.Bucket + "/" + setting.CBCodePathPrefix

miniofiles, err := ioutil.ReadDir(JobRealPrefix)

processCount := 0
if err != nil {
log.Warn("Can not browser minio job path.")
} else {
SortModTimeAscend(miniofiles)
for _, file := range miniofiles {

if file.Name()!="" && file.ModTime().Before(time.Now().AddDate(0, 0, -setting.ClearStrategy.TrashSaveDays)) {

has,err:=models.IsCloudbrainExistByJobName(file.Name())
if err==nil && !has {
dirPath := setting.CBCodePathPrefix + file.Name() + "/"
log.Info("clear job in minio trash:" + file.Name())
storage.Attachments.DeleteDir(dirPath)
processCount++
}
if processCount == setting.ClearStrategy.BatchSize {
break
}
} else {
break
}

}

}
}

func clearLocalHistoryTrashFile() {
files, err := ioutil.ReadDir(setting.JobPath)
processCount := 0
if err != nil {
log.Warn("Can not browser local job path.")
} else {
SortModTimeAscend(files)
for _, file := range files {
//Clean up leftover job directories older than TrashSaveDays days
if file.Name() != "" && file.ModTime().Before(time.Now().AddDate(0, 0, -setting.ClearStrategy.TrashSaveDays)) {
has, err := models.IsCloudbrainExistByJobName(file.Name())
if err == nil && !has {
os.RemoveAll(setting.JobPath + file.Name())
log.Info("clear job in local trash:"+file.Name())
processCount++
}
if processCount == setting.ClearStrategy.BatchSize {
break
}
} else {
break
}

}

}

}

func SortModTimeAscend(files []os.FileInfo) {
sort.Slice(files, func(i, j int) bool {
return files[i].ModTime().Before(files[j].ModTime())
})
}

func DeleteCloudbrainOneJobStorage(jobName string) error {

if jobName==""{
return nil
}
//delete local
localJobPath := setting.JobPath + jobName
err := os.RemoveAll(localJobPath)
if err != nil {
log.Error("RemoveAll(%s) failed:%v", localJobPath, err)
}

dirPath := setting.CBCodePathPrefix + jobName + "/"
err1 := storage.Attachments.DeleteDir(dirPath)

if err1 != nil {
log.Error("DeleteDir(%s) failed:%v", localJobPath, err)
}
if err == nil {
err = err1
}

return err
}
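The two trash scanners above depend on the directory listing being sorted by modification time so they can stop at the first entry newer than the cutoff. A standalone sketch of that pattern (the directory path and retention period are hypothetical; the real code also checks IsCloudbrainExistByJobName and deletes with os.RemoveAll or storage.Attachments.DeleteDir):

package main

import (
	"fmt"
	"io/ioutil"
	"sort"
	"time"
)

func main() {
	trashSaveDays := 90                   // stands in for setting.ClearStrategy.TrashSaveDays
	dir := "/tmp/cloudbrain-jobs-example" // hypothetical job directory
	files, err := ioutil.ReadDir(dir)
	if err != nil {
		fmt.Println("cannot read dir:", err)
		return
	}
	// Oldest first, the same ordering SortModTimeAscend produces.
	sort.Slice(files, func(i, j int) bool { return files[i].ModTime().Before(files[j].ModTime()) })

	cutoff := time.Now().AddDate(0, 0, -trashSaveDays)
	for _, f := range files {
		if !f.ModTime().Before(cutoff) {
			// Entries are sorted ascending, so everything after this point is newer than the cutoff too.
			break
		}
		fmt.Println("candidate for deletion:", f.Name())
	}
}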

services/cloudbrain/resource/resource_specification.go  +2 -46

@@ -246,10 +246,10 @@ func FindAvailableSpecs(userId int64, opts models.FindSpecsOptions) ([]*models.S
return nil, err
}
//filter exclusive specs
specs := filterExclusiveSpecs(r, userId)
specs := models.FilterExclusiveSpecs(r, userId)


//distinct by sourceSpecId
specs = distinctSpecs(specs)
specs = models.DistinctSpecs(specs)
return specs, err
}


@@ -265,50 +265,6 @@ func FindAvailableSpecs4Show(userId int64, opts models.FindSpecsOptions) ([]*api
return result, nil
}


func filterExclusiveSpecs(r []*models.Specification, userId int64) []*models.Specification {
specs := make([]*models.Specification, 0, len(r))
specMap := make(map[int64]string, 0)
for i := 0; i < len(r); i++ {
spec := r[i]
if _, has := specMap[spec.ID]; has {
continue
}
if !spec.IsExclusive {
specs = append(specs, spec)
specMap[spec.ID] = ""
continue
}
orgs := strings.Split(spec.ExclusiveOrg, ";")
for _, org := range orgs {
isMember, _ := models.IsOrganizationMemberByOrgName(org, userId)
if isMember {
specs = append(specs, spec)
specMap[spec.ID] = ""
break
}
}
}
return specs
}

func distinctSpecs(r []*models.Specification) []*models.Specification {
specs := make([]*models.Specification, 0, len(r))
sourceSpecIdMap := make(map[string]string, 0)
for i := 0; i < len(r); i++ {
spec := r[i]
if spec.SourceSpecId == "" {
specs = append(specs, spec)
continue
}
if _, has := sourceSpecIdMap[spec.SourceSpecId]; has {
continue
}
specs = append(specs, spec)
sourceSpecIdMap[spec.SourceSpecId] = ""
}
return specs
}

func GetAndCheckSpec(userId int64, specId int64, opts models.FindSpecsOptions) (*models.Specification, error) {
if specId == 0 {
return nil, nil

