@@ -1972,6 +1972,12 @@ func GetCloudbrainByID(id string) (*Cloudbrain, error) { | |||||
return getRepoCloudBrain(cb) | return getRepoCloudBrain(cb) | ||||
} | } | ||||
func IsCloudbrainExistByJobName(jobName string)(bool,error){ | |||||
return x.Unscoped().Exist(&Cloudbrain{ | |||||
JobName: jobName, | |||||
}) | |||||
} | |||||
func GetCloudbrainByIDWithDeleted(id string) (*Cloudbrain, error) { | func GetCloudbrainByIDWithDeleted(id string) (*Cloudbrain, error) { | ||||
idInt64, _ := strconv.ParseInt(id, 10, 64) | idInt64, _ := strconv.ParseInt(id, 10, 64) | ||||
cb := &Cloudbrain{ID: idInt64} | cb := &Cloudbrain{ID: idInt64} | ||||
@@ -2117,19 +2123,37 @@ func GetCloudBrainUnStoppedJob() ([]*Cloudbrain, error) { | |||||
Find(&cloudbrains) | Find(&cloudbrains) | ||||
} | } | ||||
func GetCloudBrainOneStoppedJobDaysAgo(days int, limit int) ([]*Cloudbrain, error) { | |||||
func GetCloudBrainOneStoppedNotDebugJobDaysAgo(days int, limit int) ([]*Cloudbrain, error) { | |||||
cloudbrains := make([]*Cloudbrain, 0, 10) | cloudbrains := make([]*Cloudbrain, 0, 10) | ||||
endTimeBefore := time.Now().Unix() - int64(days)*24*3600 | endTimeBefore := time.Now().Unix() - int64(days)*24*3600 | ||||
missEndTimeBefore := endTimeBefore - 24*3600 | missEndTimeBefore := endTimeBefore - 24*3600 | ||||
return cloudbrains, x.Cols("id,job_name,job_id"). | |||||
return cloudbrains, x.Unscoped().Cols("id,job_name,job_id"). | |||||
In("status", | In("status", | ||||
JobStopped, JobSucceeded, JobFailed, ModelArtsCreateFailed, ModelArtsStartFailed, ModelArtsUnavailable, ModelArtsResizFailed, ModelArtsDeleted, | JobStopped, JobSucceeded, JobFailed, ModelArtsCreateFailed, ModelArtsStartFailed, ModelArtsUnavailable, ModelArtsResizFailed, ModelArtsDeleted, | ||||
ModelArtsStopped, ModelArtsTrainJobCanceled, ModelArtsTrainJobCheckFailed, ModelArtsTrainJobCompleted, ModelArtsTrainJobDeleteFailed, ModelArtsTrainJobDeployServiceFailed, | ModelArtsStopped, ModelArtsTrainJobCanceled, ModelArtsTrainJobCheckFailed, ModelArtsTrainJobCompleted, ModelArtsTrainJobDeleteFailed, ModelArtsTrainJobDeployServiceFailed, | ||||
ModelArtsTrainJobFailed, ModelArtsTrainJobImageFailed, ModelArtsTrainJobKilled, ModelArtsTrainJobLost, ModelArtsTrainJobSubmitFailed, ModelArtsTrainJobSubmitModelFailed). | ModelArtsTrainJobFailed, ModelArtsTrainJobImageFailed, ModelArtsTrainJobKilled, ModelArtsTrainJobLost, ModelArtsTrainJobSubmitFailed, ModelArtsTrainJobSubmitModelFailed). | ||||
Where("(((end_time is null or end_time=0) and updated_unix<? and updated_unix != 0 ) or (end_time<? and end_time != 0)) and cleared=false and type=0", missEndTimeBefore, endTimeBefore). | |||||
Where("(((end_time is null or end_time=0) and updated_unix<? and updated_unix != 0 ) or (end_time<? and end_time != 0)) and cleared=false and type=0 and job_type != 'DEBUG'", missEndTimeBefore, endTimeBefore). | |||||
Limit(limit). | Limit(limit). | ||||
Find(&cloudbrains) | Find(&cloudbrains) | ||||
} | } | ||||
/** | |||||
本方法考虑了再次调试的情况,多次调试取最后一次的任务的结束时间 | |||||
*/ | |||||
func GetCloudBrainOneStoppedDebugJobDaysAgo(days int, limit int) ([]*Cloudbrain, error) { | |||||
cloudbrains := make([]*Cloudbrain, 0, 10) | |||||
endTimeBefore := time.Now().Unix() - int64(days)*24*3600 | |||||
missEndTimeBefore := endTimeBefore - 24*3600 | |||||
sql:=`SELECT id,job_name,job_id from (SELECT DISTINCT ON (job_name) | |||||
id, job_name, job_id,status,end_time,updated_unix,cleared | |||||
FROM cloudbrain | |||||
where type=0 and job_type='DEBUG' | |||||
ORDER BY job_name, updated_unix DESC) a | |||||
where status in ('STOPPED','SUCCEEDED','FAILED') and (((end_time is null or end_time=0) and updated_unix<? and updated_unix != 0 ) or (end_time<? and end_time != 0)) and cleared=false` | |||||
return cloudbrains, x.Unscoped().SQL(sql,missEndTimeBefore, endTimeBefore).Limit(limit).Find(&cloudbrains) | |||||
} | |||||
func UpdateCloudBrainRecordsCleared(ids []int64) error { | func UpdateCloudBrainRecordsCleared(ids []int64) error { | ||||
pageSize := 150 | pageSize := 150 | ||||
@@ -618,6 +618,7 @@ var ( | |||||
Enabled bool | Enabled bool | ||||
ResultSaveDays int | ResultSaveDays int | ||||
BatchSize int | BatchSize int | ||||
DebugJobSize int | |||||
TrashSaveDays int | TrashSaveDays int | ||||
Cron string | Cron string | ||||
RunAtStart bool | RunAtStart bool | ||||
@@ -1696,6 +1697,7 @@ func getClearStrategy(){ | |||||
ClearStrategy.Enabled=sec.Key("ENABLED").MustBool(false) | ClearStrategy.Enabled=sec.Key("ENABLED").MustBool(false) | ||||
ClearStrategy.ResultSaveDays=sec.Key("RESULT_SAVE_DAYS").MustInt(30) | ClearStrategy.ResultSaveDays=sec.Key("RESULT_SAVE_DAYS").MustInt(30) | ||||
ClearStrategy.BatchSize=sec.Key("BATCH_SIZE").MustInt(500) | ClearStrategy.BatchSize=sec.Key("BATCH_SIZE").MustInt(500) | ||||
ClearStrategy.DebugJobSize=sec.Key("DEBUG_BATCH_SIZE").MustInt(100) | |||||
ClearStrategy.TrashSaveDays=sec.Key("TRASH_SAVE_DAYS").MustInt(90) | ClearStrategy.TrashSaveDays=sec.Key("TRASH_SAVE_DAYS").MustInt(90) | ||||
ClearStrategy.Cron=sec.Key("CRON").MustString("* 0,30 2-8 * * ?") | ClearStrategy.Cron=sec.Key("CRON").MustString("* 0,30 2-8 * * ?") | ||||
ClearStrategy.RunAtStart=sec.Key("RUN_AT_START").MustBool(false) | ClearStrategy.RunAtStart=sec.Key("RUN_AT_START").MustBool(false) | ||||
@@ -13,11 +13,22 @@ import ( | |||||
) | ) | ||||
func ClearCloudbrainResultSpace() { | func ClearCloudbrainResultSpace() { | ||||
log.Info("clear cloudbrain one result space begin.") | |||||
if !setting.ClearStrategy.Enabled{ | if !setting.ClearStrategy.Enabled{ | ||||
return | return | ||||
} | } | ||||
tasks, err := models.GetCloudBrainOneStoppedJobDaysAgo(setting.ClearStrategy.ResultSaveDays, setting.ClearStrategy.BatchSize) | |||||
tasks, err := models.GetCloudBrainOneStoppedNotDebugJobDaysAgo(setting.ClearStrategy.ResultSaveDays, setting.ClearStrategy.BatchSize) | |||||
if err != nil { | |||||
log.Warn("Failed to get cloudbrain, clear result failed.", err) | |||||
return | |||||
} | |||||
debugTasks, err := models.GetCloudBrainOneStoppedDebugJobDaysAgo(setting.ClearStrategy.ResultSaveDays, setting.ClearStrategy.DebugJobSize) | |||||
if err != nil { | |||||
log.Warn("Failed to get debug cloudbrain.", err) | |||||
} | |||||
tasks=append(tasks,debugTasks...) | |||||
if err != nil { | if err != nil { | ||||
log.Warn("Failed to get cloudbrain, clear result failed.", err) | log.Warn("Failed to get cloudbrain, clear result failed.", err) | ||||
@@ -42,6 +53,7 @@ func ClearCloudbrainResultSpace() { | |||||
clearMinioHistoryTrashFile() | clearMinioHistoryTrashFile() | ||||
} | } | ||||
log.Info("clear cloudbrain one result space end.") | |||||
} | } | ||||
@@ -58,10 +70,13 @@ func clearMinioHistoryTrashFile() { | |||||
for _, file := range miniofiles { | for _, file := range miniofiles { | ||||
if file.ModTime().Before(time.Now().AddDate(0, 0, -setting.ClearStrategy.TrashSaveDays)) { | if file.ModTime().Before(time.Now().AddDate(0, 0, -setting.ClearStrategy.TrashSaveDays)) { | ||||
dirPath := setting.CBCodePathPrefix + file.Name() + "/" | |||||
log.Info("clear job in minio trash:"+file.Name()) | |||||
storage.Attachments.DeleteDir(dirPath) | |||||
processCount++ | |||||
has,err:=models.IsCloudbrainExistByJobName(file.Name()) | |||||
if err==nil && !has { | |||||
dirPath := setting.CBCodePathPrefix + file.Name() + "/" | |||||
log.Info("clear job in minio trash:" + file.Name()) | |||||
storage.Attachments.DeleteDir(dirPath) | |||||
processCount++ | |||||
} | |||||
if processCount == setting.ClearStrategy.BatchSize { | if processCount == setting.ClearStrategy.BatchSize { | ||||
break | break | ||||
} | } | ||||
@@ -84,9 +99,12 @@ func clearLocalHistoryTrashFile() { | |||||
for _, file := range files { | for _, file := range files { | ||||
//清理n天前的历史垃圾数据,清理job目录 | //清理n天前的历史垃圾数据,清理job目录 | ||||
if file.ModTime().Before(time.Now().AddDate(0, 0, -setting.ClearStrategy.TrashSaveDays)) { | if file.ModTime().Before(time.Now().AddDate(0, 0, -setting.ClearStrategy.TrashSaveDays)) { | ||||
os.RemoveAll(setting.JobPath + file.Name()) | |||||
log.Info("clear job in local trash:"+file.Name()) | |||||
processCount++ | |||||
has,err:=models.IsCloudbrainExistByJobName(file.Name()) | |||||
if err==nil && !has{ | |||||
os.RemoveAll(setting.JobPath + file.Name()) | |||||
log.Info("clear job in local trash:"+file.Name()) | |||||
processCount++ | |||||
} | |||||
if processCount == setting.ClearStrategy.BatchSize { | if processCount == setting.ClearStrategy.BatchSize { | ||||
break | break | ||||
} | } | ||||
@@ -105,13 +123,6 @@ func SortModTimeAscend(files []os.FileInfo) { | |||||
return files[i].ModTime().Before(files[j].ModTime()) | return files[i].ModTime().Before(files[j].ModTime()) | ||||
}) | }) | ||||
} | } | ||||
func SortModTimeAscendForMinio(files []storage.FileInfo) { | |||||
sort.Slice(files, func(i, j int) bool { | |||||
timeI, _ := time.Parse("2006-01-02 15:04:05", files[i].ModTime) | |||||
timeJ, _ := time.Parse("2006-01-02 15:04:05", files[i].ModTime) | |||||
return timeI.Before(timeJ) | |||||
}) | |||||
} | |||||
func DeleteCloudbrainOneJobStorage(jobName string) error { | func DeleteCloudbrainOneJobStorage(jobName string) error { | ||||
//delete local | //delete local | ||||