diff --git a/models/cloudbrain.go b/models/cloudbrain.go index 1e7c702ab..aeed8629c 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -204,6 +204,7 @@ type Cloudbrain struct { BenchmarkTypeRankLink string `xorm:"-"` StartTime timeutil.TimeStamp EndTime timeutil.TimeStamp + Cleared bool `xorm:"DEFAULT false"` Spec *Specification `xorm:"-"` } @@ -1905,6 +1906,12 @@ func GetCloudbrainByID(id string) (*Cloudbrain, error) { return getRepoCloudBrain(cb) } +func IsCloudbrainExistByJobName(jobName string)(bool,error){ + return x.Unscoped().Exist(&Cloudbrain{ + JobName: jobName, + }) +} + func GetCloudbrainByIDWithDeleted(id string) (*Cloudbrain, error) { idInt64, _ := strconv.ParseInt(id, 10, 64) cb := &Cloudbrain{ID: idInt64} @@ -2050,6 +2057,83 @@ func GetCloudBrainUnStoppedJob() ([]*Cloudbrain, error) { Find(&cloudbrains) } +func GetCloudBrainOneStoppedNotDebugJobDaysAgo(days int, limit int) ([]*Cloudbrain, error) { + cloudbrains := make([]*Cloudbrain, 0, 10) + endTimeBefore := time.Now().Unix() - int64(days)*24*3600 + missEndTimeBefore := endTimeBefore - 24*3600 + return cloudbrains, x.Unscoped().Cols("id,job_name,job_id"). + In("status", + JobStopped, JobSucceeded, JobFailed, ModelArtsCreateFailed, ModelArtsStartFailed, ModelArtsUnavailable, ModelArtsResizFailed, ModelArtsDeleted, + ModelArtsStopped, ModelArtsTrainJobCanceled, ModelArtsTrainJobCheckFailed, ModelArtsTrainJobCompleted, ModelArtsTrainJobDeleteFailed, ModelArtsTrainJobDeployServiceFailed, + ModelArtsTrainJobFailed, ModelArtsTrainJobImageFailed, ModelArtsTrainJobKilled, ModelArtsTrainJobLost, ModelArtsTrainJobSubmitFailed, ModelArtsTrainJobSubmitModelFailed). + Where("(((end_time is null or end_time=0) and updated_unix 0 { + idsIn := "" + for i, id := range tempIds { + if i == 0 { + idsIn += strconv.FormatInt(id, 10) + } else { + idsIn += "," + strconv.FormatInt(id, 10) + } + } + + _, errTemp := x.Unscoped().Exec("update cloudbrain set cleared=true where id in (" + idsIn + ")") + if errTemp != nil { + err = errTemp + } + + } + + } + return err + +} + +func getPageIds(ids []int64, page int, pagesize int) []int64 { + begin := (page - 1) * pagesize + end := (page) * pagesize + + if begin > len(ids)-1 { + return []int64{} + } + if end > len(ids)-1 { + return ids[begin:] + } else { + return ids[begin:end] + } + +} + func GetStoppedJobWithNoDurationJob() ([]*Cloudbrain, error) { cloudbrains := make([]*Cloudbrain, 0) return cloudbrains, x. diff --git a/models/cloudbrain_static.go b/models/cloudbrain_static.go index a213f179c..40d7a2a2e 100644 --- a/models/cloudbrain_static.go +++ b/models/cloudbrain_static.go @@ -183,6 +183,17 @@ func GetWaittingTop() ([]*CloudbrainInfo, error) { Find(&cloudbrains); err != nil { log.Info("find error.") } + + var ids []int64 + for _, task := range cloudbrains { + ids = append(ids, task.RepoID) + } + repositoryMap, err := GetRepositoriesMapByIDs(ids) + if err == nil { + for _, task := range cloudbrains { + task.Repo = repositoryMap[task.RepoID] + } + } return cloudbrains, nil } @@ -199,6 +210,16 @@ func GetRunningTop() ([]*CloudbrainInfo, error) { Find(&cloudbrains); err != nil { log.Info("find error.") } + var ids []int64 + for _, task := range cloudbrains { + ids = append(ids, task.RepoID) + } + repositoryMap, err := GetRepositoriesMapByIDs(ids) + if err == nil { + for _, task := range cloudbrains { + task.Repo = repositoryMap[task.RepoID] + } + } return cloudbrains, nil } diff --git a/models/resource_specification.go b/models/resource_specification.go index 2f815818b..809a3496a 100644 --- a/models/resource_specification.go +++ b/models/resource_specification.go @@ -3,6 +3,7 @@ package models import ( "code.gitea.io/gitea/modules/timeutil" "fmt" + "strings" "xorm.io/builder" ) @@ -197,12 +198,104 @@ type Specification struct { AiCenterName string IsExclusive bool ExclusiveOrg string + //specs that have the same sourceSpecId, computeResource and cluster as current spec + RelatedSpecs []*Specification } func (Specification) TableName() string { return "resource_specification" } +func (s *Specification) loadRelatedSpecs() { + if s.RelatedSpecs != nil { + return + } + defaultSpecs := make([]*Specification, 0) + if s.SourceSpecId == "" { + s.RelatedSpecs = defaultSpecs + return + } + r, err := FindSpecs(FindSpecsOptions{ + ComputeResource: s.ComputeResource, + Cluster: s.Cluster, + SourceSpecId: s.SourceSpecId, + RequestAll: true, + SpecStatus: SpecOnShelf, + }) + if err != nil { + s.RelatedSpecs = defaultSpecs + return + } + s.RelatedSpecs = r +} +func (s *Specification) GetAvailableCenterIds(userIds ...int64) []string { + s.loadRelatedSpecs() + + if len(s.RelatedSpecs) == 0 { + return make([]string, 0) + } + + var uId int64 + if len(userIds) > 0 { + uId = userIds[0] + } + //filter exclusive specs + specs := FilterExclusiveSpecs(s.RelatedSpecs, uId) + + centerIds := make([]string, len(specs)) + for i, v := range specs { + centerIds[i] = v.AiCenterCode + } + return centerIds +} + +func FilterExclusiveSpecs(r []*Specification, userId int64) []*Specification { + if userId == 0 { + return r + } + specs := make([]*Specification, 0, len(r)) + specMap := make(map[int64]string, 0) + for i := 0; i < len(r); i++ { + spec := r[i] + if _, has := specMap[spec.ID]; has { + continue + } + if !spec.IsExclusive { + specs = append(specs, spec) + specMap[spec.ID] = "" + continue + } + orgs := strings.Split(spec.ExclusiveOrg, ";") + for _, org := range orgs { + isMember, _ := IsOrganizationMemberByOrgName(org, userId) + if isMember { + specs = append(specs, spec) + specMap[spec.ID] = "" + break + } + } + } + return specs +} + +func DistinctSpecs(r []*Specification) []*Specification { + specs := make([]*Specification, 0, len(r)) + sourceSpecIdMap := make(map[string]string, 0) + for i := 0; i < len(r); i++ { + spec := r[i] + if spec.SourceSpecId == "" { + specs = append(specs, spec) + continue + } + if _, has := sourceSpecIdMap[spec.SourceSpecId]; has { + continue + } + specs = append(specs, spec) + sourceSpecIdMap[spec.SourceSpecId] = "" + } + return specs +} + func InsertResourceSpecification(r ResourceSpecification) (int64, error) { return x.Insert(&r) } diff --git a/modules/cron/tasks_basic.go b/modules/cron/tasks_basic.go index 985a82cdb..6a1fc6e39 100755 --- a/modules/cron/tasks_basic.go +++ b/modules/cron/tasks_basic.go @@ -5,10 +5,13 @@ package cron import ( - "code.gitea.io/gitea/modules/urfs_client/urchin" + "code.gitea.io/gitea/modules/setting" "context" "time" + "code.gitea.io/gitea/modules/urfs_client/urchin" + cloudbrainService "code.gitea.io/gitea/services/cloudbrain" + "code.gitea.io/gitea/modules/modelarts" "code.gitea.io/gitea/services/cloudbrain/resource" "code.gitea.io/gitea/services/reward" @@ -190,6 +193,17 @@ func registerHandleRepoAndUserStatistic() { }) } +func registerHandleClearCloudbrainResult() { + RegisterTaskFatal("handle_cloudbrain_one_result_clear", &BaseConfig{ + Enabled: true, + RunAtStart: setting.ClearStrategy.RunAtStart, + Schedule: setting.ClearStrategy.Cron, + }, func(ctx context.Context, _ *models.User, _ Config) error { + cloudbrainService.ClearCloudbrainResultSpace() + return nil + }) +} + func registerHandleSummaryStatistic() { RegisterTaskFatal("handle_summary_statistic", &BaseConfig{ Enabled: true, @@ -306,6 +320,7 @@ func initBasicTasks() { registerHandleRepoAndUserStatistic() registerHandleSummaryStatistic() + registerHandleClearCloudbrainResult() registerSyncCloudbrainStatus() registerHandleOrgStatistic() @@ -317,6 +332,6 @@ func initBasicTasks() { registerHandleModelSafetyTask() - registerHandleScheduleRecord() + registerHandleScheduleRecord() registerHandleCloudbrainDurationStatistic() } diff --git a/modules/grampus/grampus.go b/modules/grampus/grampus.go index 280407240..34d7d3fe0 100755 --- a/modules/grampus/grampus.go +++ b/modules/grampus/grampus.go @@ -105,8 +105,6 @@ func getDatasetGrampus(datasetInfos map[string]models.DatasetInfo) []models.Gram func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId string, err error) { createTime := timeutil.TimeStampNow() - centerID, centerName := getCentersParamter(ctx, req) - var datasetGrampus, modelGrampus []models.GrampusDataset var codeGrampus models.GrampusDataset if ProcessorTypeNPU == req.ProcessType { @@ -138,8 +136,7 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId str ResourceSpecId: req.Spec.SourceSpecId, ImageId: req.ImageId, ImageUrl: req.ImageUrl, - CenterID: centerID, - CenterName: centerName, + CenterID: req.Spec.GetAvailableCenterIds(ctx.User.ID), ReplicaNum: 1, Datasets: datasetGrampus, Models: modelGrampus, diff --git a/modules/setting/setting.go b/modules/setting/setting.go index a473dad6a..bf7eb2352 100755 --- a/modules/setting/setting.go +++ b/modules/setting/setting.go @@ -519,6 +519,7 @@ var ( CullIdleTimeout string CullInterval string + //benchmark config IsBenchmarkEnabled bool BenchmarkOwner string @@ -613,6 +614,16 @@ var ( UsageRateBeginTime string }{} + ClearStrategy= struct { + Enabled bool + ResultSaveDays int + BatchSize int + DebugJobSize int + TrashSaveDays int + Cron string + RunAtStart bool + }{} + C2NetInfos *C2NetSqInfos CenterInfos *AiCenterInfos C2NetMapInfo map[string]*C2NetSequenceInfo @@ -1619,6 +1630,7 @@ func NewContext() { getModelConvertConfig() getModelSafetyConfig() getModelAppConfig() + getClearStrategy() } func getModelSafetyConfig() { @@ -1679,6 +1691,18 @@ func getModelartsCDConfig() { getNotebookFlavorInfos() } +func getClearStrategy(){ + + sec := Cfg.Section("clear_strategy") + ClearStrategy.Enabled=sec.Key("ENABLED").MustBool(false) + ClearStrategy.ResultSaveDays=sec.Key("RESULT_SAVE_DAYS").MustInt(30) + ClearStrategy.BatchSize=sec.Key("BATCH_SIZE").MustInt(500) + ClearStrategy.DebugJobSize=sec.Key("DEBUG_BATCH_SIZE").MustInt(100) + ClearStrategy.TrashSaveDays=sec.Key("TRASH_SAVE_DAYS").MustInt(90) + ClearStrategy.Cron=sec.Key("CRON").MustString("* 0,30 2-8 * * ?") + ClearStrategy.RunAtStart=sec.Key("RUN_AT_START").MustBool(false) +} + func getGrampusConfig() { sec := Cfg.Section("grampus") diff --git a/options/locale/locale_en-US.ini b/options/locale/locale_en-US.ini index bcb170f7a..082b35ca8 100755 --- a/options/locale/locale_en-US.ini +++ b/options/locale/locale_en-US.ini @@ -3251,6 +3251,7 @@ specification = specification select_specification = select specification description = description wrong_specification=You cannot use this specification, please choose another item. +result_cleared=The files of the task have been cleared, can not restart any more, please create a new debug task instead. resource_use=Resource Occupancy job_name_rule = Please enter letters, numbers, _ and - up to 64 characters and cannot end with a dash (-). diff --git a/options/locale/locale_zh-CN.ini b/options/locale/locale_zh-CN.ini index d014878c2..f4e8f1aea 100755 --- a/options/locale/locale_zh-CN.ini +++ b/options/locale/locale_zh-CN.ini @@ -3271,6 +3271,8 @@ card_duration = 运行卡时 card_type = 卡类型 wrong_specification=您目前不能使用这个资源规格,请选择其他资源规格。 +result_cleared=本任务的文件已被清理,无法再次调试,请新建调试任务。 + job_name_rule = 请输入字母、数字、_和-,最长64个字符,且不能以中划线(-)结尾。 train_dataset_path_rule = 数据集位置存储在运行参数 data_url 中,预训练模型存放在运行参数 ckpt_url 中,训练输出路径存储在运行参数 train_url 中。 infer_dataset_path_rule = 数据集位置存储在运行参数 data_url 中,推理输出路径存储在运行参数 result_url 中。 diff --git a/routers/api/v1/api.go b/routers/api/v1/api.go index 1dfcc7588..15fc57107 100755 --- a/routers/api/v1/api.go +++ b/routers/api/v1/api.go @@ -739,7 +739,7 @@ func RegisterRoutes(m *macaron.Macaron) { }, reqToken(), repoAssignment()) m.Group("/file_notebook", func() { - m.Get("", reqToken(), repo.GetFileNoteBookInfo) + m.Get("", repo.GetFileNoteBookInfo) m.Post("/create", reqToken(), reqWeChat(), bind(api.CreateFileNotebookJobOption{}), repo.CreateFileNoteBook) }) diff --git a/routers/api/v1/repo/cloudbrain_dashboard.go b/routers/api/v1/repo/cloudbrain_dashboard.go index 7fe5d603c..0d68fff30 100755 --- a/routers/api/v1/repo/cloudbrain_dashboard.go +++ b/routers/api/v1/repo/cloudbrain_dashboard.go @@ -968,6 +968,8 @@ func GetWaittingTop(ctx *context.Context) { taskDetail.RepoID = ciTasks[i].RepoID if ciTasks[i].Repo != nil { taskDetail.RepoName = ciTasks[i].Repo.OwnerName + "/" + ciTasks[i].Repo.Name + } else { + taskDetail.RepoName = "" } WaitTimeInt := time.Now().Unix() - ciTasks[i].Cloudbrain.CreatedUnix.AsTime().Unix() taskDetail.WaitTime = models.ConvertDurationToStr(WaitTimeInt) @@ -975,6 +977,13 @@ func GetWaittingTop(ctx *context.Context) { if WaitTimeInt < 0 { taskDetail.WaitTime = "00:00:00" } + + taskDetail.ID = ciTasks[i].Cloudbrain.ID + taskDetail.ComputeResource = ciTasks[i].Cloudbrain.ComputeResource + taskDetail.JobType = ciTasks[i].Cloudbrain.JobType + taskDetail.JobID = ciTasks[i].Cloudbrain.JobID + taskDetail.Type = ciTasks[i].Cloudbrain.Type + tasks = append(tasks, taskDetail) } ctx.JSON(http.StatusOK, map[string]interface{}{ @@ -1001,6 +1010,12 @@ func GetRunningTop(ctx *context.Context) { taskDetail.RepoName = ciTasks[i].Repo.OwnerName + "/" + ciTasks[i].Repo.Name } + taskDetail.ID = ciTasks[i].Cloudbrain.ID + taskDetail.ComputeResource = ciTasks[i].Cloudbrain.ComputeResource + taskDetail.JobType = ciTasks[i].Cloudbrain.JobType + taskDetail.JobID = ciTasks[i].Cloudbrain.JobID + taskDetail.Type = ciTasks[i].Cloudbrain.Type + tasks = append(tasks, taskDetail) } ctx.JSON(http.StatusOK, map[string]interface{}{ diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index d3d76f440..a23cd5462 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -670,6 +670,13 @@ func CloudBrainRestart(ctx *context.Context) { break } + if _, err := os.Stat(getOldJobPath(task)); err != nil { + log.Error("Can not find job minio path", err) + resultCode = "-1" + errorMsg = ctx.Tr("cloudbrain.result_cleared") + break + } + count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainOne, string(models.JobTypeDebug)) if err != nil { log.Error("GetCloudbrainCountByUserID failed:%v", err, ctx.Data["MsgID"]) @@ -704,6 +711,11 @@ func CloudBrainRestart(ctx *context.Context) { }) } + +func getOldJobPath(task *models.Cloudbrain) string { + return setting.Attachment.Minio.RealPath + setting.Attachment.Minio.Bucket + "/" + setting.CBCodePathPrefix + task.JobName +} + func CloudBrainBenchMarkShow(ctx *context.Context) { cloudBrainShow(ctx, tplCloudBrainBenchmarkShow, models.JobTypeBenchmark) } diff --git a/services/cloudbrain/clear.go b/services/cloudbrain/clear.go new file mode 100644 index 000000000..44613ee3c --- /dev/null +++ b/services/cloudbrain/clear.go @@ -0,0 +1,151 @@ +package cloudbrain + +import ( + "io/ioutil" + "os" + "sort" + "time" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/storage" +) + +func ClearCloudbrainResultSpace() { + log.Info("clear cloudbrain one result space begin.") + if !setting.ClearStrategy.Enabled{ + return + } + + tasks, err := models.GetCloudBrainOneStoppedNotDebugJobDaysAgo(setting.ClearStrategy.ResultSaveDays, setting.ClearStrategy.BatchSize) + if err != nil { + log.Warn("Failed to get cloudbrain, clear result failed.", err) + return + } + debugTasks, err := models.GetCloudBrainOneStoppedDebugJobDaysAgo(setting.ClearStrategy.ResultSaveDays, setting.ClearStrategy.DebugJobSize) + if err != nil { + log.Warn("Failed to get debug cloudbrain.", err) + + } + tasks=append(tasks,debugTasks...) + + if err != nil { + log.Warn("Failed to get cloudbrain, clear result failed.", err) + return + } + var ids []int64 + for _, task := range tasks { + err := DeleteCloudbrainOneJobStorage(task.JobName) + if err == nil { + log.Info("clear job in cloudbrain table:"+task.JobName) + ids = append(ids, task.ID) + } + } + + err = models.UpdateCloudBrainRecordsCleared(ids) + if err != nil { + log.Warn("Failed to set cloudbrain cleared status", err) + } + //如果云脑表处理完了,通过遍历minio对象处理历史垃圾数据,如果存在的话 + if len(tasks) < setting.ClearStrategy.BatchSize+setting.ClearStrategy.DebugJobSize { + clearLocalHistoryTrashFile() + clearMinioHistoryTrashFile() + + } + log.Info("clear cloudbrain one result space end.") + +} + +func clearMinioHistoryTrashFile() { + JobRealPrefix := setting.Attachment.Minio.RealPath + setting.Attachment.Minio.Bucket + "/" + setting.CBCodePathPrefix + + miniofiles, err := ioutil.ReadDir(JobRealPrefix) + + processCount := 0 + if err != nil { + log.Warn("Can not browser minio job path.") + } else { + SortModTimeAscend(miniofiles) + for _, file := range miniofiles { + + if file.Name()!="" && file.ModTime().Before(time.Now().AddDate(0, 0, -setting.ClearStrategy.TrashSaveDays)) { + + has,err:=models.IsCloudbrainExistByJobName(file.Name()) + if err==nil && !has { + dirPath := setting.CBCodePathPrefix + file.Name() + "/" + log.Info("clear job in minio trash:" + file.Name()) + storage.Attachments.DeleteDir(dirPath) + processCount++ + } + if processCount == setting.ClearStrategy.BatchSize { + break + } + } else { + break + } + + } + + } +} + +func clearLocalHistoryTrashFile() { + files, err := ioutil.ReadDir(setting.JobPath) + processCount := 0 + if err != nil { + log.Warn("Can not browser local job path.") + } else { + SortModTimeAscend(files) + for _, file := range files { + //清理n天前的历史垃圾数据,清理job目录 + if file.Name()!="" && file.ModTime().Before(time.Now().AddDate(0, 0, -setting.ClearStrategy.TrashSaveDays)) { + has,err:=models.IsCloudbrainExistByJobName(file.Name()) + if err==nil && !has{ + os.RemoveAll(setting.JobPath + file.Name()) + log.Info("clear job in local trash:"+file.Name()) + processCount++ + } + if processCount == setting.ClearStrategy.BatchSize { + break + } + } else { + break + } + + } + + } + +} + +func SortModTimeAscend(files []os.FileInfo) { + sort.Slice(files, func(i, j int) bool { + return files[i].ModTime().Before(files[j].ModTime()) + }) +} + +func DeleteCloudbrainOneJobStorage(jobName string) error { + + if jobName==""{ + return nil + } + //delete local + localJobPath := setting.JobPath + jobName + err := os.RemoveAll(localJobPath) + if err != nil { + log.Error("RemoveAll(%s) failed:%v", localJobPath, err) + } + + dirPath := setting.CBCodePathPrefix + jobName + "/" + err1 := storage.Attachments.DeleteDir(dirPath) + + if err1 != nil { + log.Error("DeleteDir(%s) failed:%v", localJobPath, err) + } + if err == nil { + err = err1 + } + + return err +} diff --git a/services/cloudbrain/resource/resource_specification.go b/services/cloudbrain/resource/resource_specification.go index 8f4182d87..5070d7c1e 100644 --- a/services/cloudbrain/resource/resource_specification.go +++ b/services/cloudbrain/resource/resource_specification.go @@ -246,10 +246,10 @@ func FindAvailableSpecs(userId int64, opts models.FindSpecsOptions) ([]*models.S return nil, err } //filter exclusive specs - specs := filterExclusiveSpecs(r, userId) + specs := models.FilterExclusiveSpecs(r, userId) //distinct by sourceSpecId - specs = distinctSpecs(specs) + specs = models.DistinctSpecs(specs) return specs, err } @@ -265,50 +265,6 @@ func FindAvailableSpecs4Show(userId int64, opts models.FindSpecsOptions) ([]*api return result, nil } -func filterExclusiveSpecs(r []*models.Specification, userId int64) []*models.Specification { - specs := make([]*models.Specification, 0, len(r)) - specMap := make(map[int64]string, 0) - for i := 0; i < len(r); i++ { - spec := r[i] - if _, has := specMap[spec.ID]; has { - continue - } - if !spec.IsExclusive { - specs = append(specs, spec) - specMap[spec.ID] = "" - continue - } - orgs := strings.Split(spec.ExclusiveOrg, ";") - for _, org := range orgs { - isMember, _ := models.IsOrganizationMemberByOrgName(org, userId) - if isMember { - specs = append(specs, spec) - specMap[spec.ID] = "" - break - } - } - } - return specs -} - -func distinctSpecs(r []*models.Specification) []*models.Specification { - specs := make([]*models.Specification, 0, len(r)) - sourceSpecIdMap := make(map[string]string, 0) - for i := 0; i < len(r); i++ { - spec := r[i] - if spec.SourceSpecId == "" { - specs = append(specs, spec) - continue - } - if _, has := sourceSpecIdMap[spec.SourceSpecId]; has { - continue - } - specs = append(specs, spec) - sourceSpecIdMap[spec.SourceSpecId] = "" - } - return specs -} - func GetAndCheckSpec(userId int64, specId int64, opts models.FindSpecsOptions) (*models.Specification, error) { if specId == 0 { return nil, nil