Browse Source

Merge branch 'V20221102' into bug-3137

pull/3154/head
liuzx 2 years ago
parent
commit
99c2f6e8d3
4 changed files with 56 additions and 41 deletions
  1. +43
    -36
      modules/storage/obs.go
  2. +10
    -2
      routers/api/v1/repo/modelarts.go
  3. +2
    -2
      routers/repo/cloudbrain.go
  4. +1
    -1
      routers/repo/grampus.go

+ 43
- 36
modules/storage/obs.go View File

@@ -367,51 +367,58 @@ func GetOneLevelAllObjectUnderDir(bucket string, prefixRootPath string, relative
if !strings.HasSuffix(input.Prefix, "/") {
input.Prefix += "/"
}
output, err := ObsCli.ListObjects(input)
fileInfos := make([]FileInfo, 0)
prefixLen := len(input.Prefix)
fileMap := make(map[string]bool, 0)
if err == nil {
for _, val := range output.Contents {
log.Info("val key=" + val.Key)
var isDir bool
var fileName string
if val.Key == input.Prefix {
continue
}
fileName = val.Key[prefixLen:]
log.Info("fileName =" + fileName)
files := strings.Split(fileName, "/")
if fileMap[files[0]] {
continue
} else {
fileMap[files[0]] = true
index := 1
for {
output, err := ObsCli.ListObjects(input)
if err == nil {
log.Info("Page:%d\n", index)
index++
for _, val := range output.Contents {
var isDir bool
var fileName string
if val.Key == input.Prefix {
continue
}
fileName = val.Key[prefixLen:]
files := strings.Split(fileName, "/")
if fileMap[files[0]] {
continue
} else {
fileMap[files[0]] = true
}
ParenDir := relativePath
fileName = files[0]
if len(files) > 1 {
isDir = true
ParenDir += fileName + "/"
} else {
isDir = false
}
fileInfo := FileInfo{
ModTime: val.LastModified.Local().Format("2006-01-02 15:04:05"),
FileName: fileName,
Size: val.Size,
IsDir: isDir,
ParenDir: ParenDir,
}
fileInfos = append(fileInfos, fileInfo)
}
ParenDir := relativePath
fileName = files[0]
if len(files) > 1 {
isDir = true
ParenDir += fileName + "/"
if output.IsTruncated {
input.Marker = output.NextMarker
} else {
isDir = false
break
}
fileInfo := FileInfo{
ModTime: val.LastModified.Local().Format("2006-01-02 15:04:05"),
FileName: fileName,
Size: val.Size,
IsDir: isDir,
ParenDir: ParenDir,
} else {
if obsError, ok := err.(obs.ObsError); ok {
log.Error("Code:%s, Message:%s", obsError.Code, obsError.Message)
}
fileInfos = append(fileInfos, fileInfo)
}
return fileInfos, err
} else {
if obsError, ok := err.(obs.ObsError); ok {
log.Error("Code:%s, Message:%s", obsError.Code, obsError.Message)
return nil, err
}
return nil, err
}
return fileInfos, nil
}

func GetAllObjectByBucketAndPrefix(bucket string, prefix string) ([]FileInfo, error) {


+ 10
- 2
routers/api/v1/repo/modelarts.go View File

@@ -182,7 +182,7 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) {
}
if oldStatus != job.Status {
notification.NotifyChangeCloudbrainStatus(job, oldStatus)
if models.IsTrainJobTerminal(job.Status) {
if models.IsTrainJobTerminal(job.Status) && job.ComputeResource == models.NPUResource {
if len(result.JobInfo.Tasks[0].CenterID) == 1 {
urchin.GetBackNpuModel(job.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(job.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID))
}
@@ -470,7 +470,11 @@ func ModelList(ctx *context.APIContext) {
status := models.StorageScheduleSucceed
var fileInfos []storage.FileInfo
if task.ComputeResource == models.NPUResource {
fileInfos, err = storage.GetObsListObject(task.JobName, "output/", parentDir, versionName)
prefix := strings.TrimPrefix(path.Join(setting.TrainJobModelPath, task.JobName, setting.OutPutPath, versionName), "/")
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
fileInfos, err = storage.GetOneLevelAllObjectUnderDir(setting.Bucket, prefix, parentDir)
if err != nil {
log.Info("get TrainJobListModel failed:", err)
ctx.ServerError("GetObsListObject:", err)
@@ -484,6 +488,10 @@ func ModelList(ctx *context.APIContext) {
if models.IsTrainJobTerminal(task.Status) {
if task.Status == models.GrampusStatusStopped {
status = models.StorageNoFile
} else if task.Status == models.GrampusStatusFailed {
if task.AiCenter == "" {
status = models.StorageNoFile
}
} else {
record, _ := models.GetScheduleRecordByCloudbrainID(task.ID)
if record != nil {


+ 2
- 2
routers/repo/cloudbrain.go View File

@@ -757,7 +757,7 @@ func cloudBrainShow(ctx *context.Context, tpName base.TplName, jobType models.Jo
if ctx.Written() {
return
}
if task.Status==string(models.JobWaiting) || task.Status==string(models.JobRunning) {
if task.Status == string(models.JobWaiting) || task.Status == string(models.JobRunning) {
result, err := cloudbrain.GetJob(task.JobID)
if err != nil {
log.Info("error:" + err.Error())
@@ -1944,7 +1944,7 @@ func SyncCloudbrainStatus() {
task.CorrectCreateUnix()
if oldStatus != task.Status {
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
if models.IsTrainJobTerminal(task.Status) {
if models.IsTrainJobTerminal(task.Status) && task.ComputeResource == models.NPUResource {
if len(result.JobInfo.Tasks[0].CenterID) == 1 {
urchin.GetBackNpuModel(task.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(task.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID))
}


+ 1
- 1
routers/repo/grampus.go View File

@@ -879,7 +879,7 @@ func GrampusTrainJobShow(ctx *context.Context) {
task.CorrectCreateUnix()
if oldStatus != task.Status {
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
if models.IsTrainJobTerminal(task.Status) {
if models.IsTrainJobTerminal(task.Status) && task.ComputeResource == models.NPUResource {
if len(result.JobInfo.Tasks[0].CenterID) == 1 {
urchin.GetBackNpuModel(task.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(task.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID))
}


Loading…
Cancel
Save