diff --git a/models/cloudbrain.go b/models/cloudbrain.go index 6135dac40..7307b5712 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -116,6 +116,8 @@ const ( GrampusStatusStopped = "STOPPED" GrampusStatusUnknown = "UNKNOWN" GrampusStatusWaiting = "WAITING" + + ModelSuffix = "models.zip" ) const ( diff --git a/modules/grampus/grampus.go b/modules/grampus/grampus.go index 5d92af5aa..815ec986c 100755 --- a/modules/grampus/grampus.go +++ b/modules/grampus/grampus.go @@ -27,7 +27,7 @@ const ( CodeArchiveName = "master.zip" BucketRemote = "grampus" - RemoteModelPath = "/output/models.zip" + RemoteModelPath = "/output/" + models.ModelSuffix ) var ( diff --git a/modules/urfs_client/urchin/schedule.go b/modules/urfs_client/urchin/schedule.go index 72266b589..73ea7b39d 100755 --- a/modules/urfs_client/urchin/schedule.go +++ b/modules/urfs_client/urchin/schedule.go @@ -1,14 +1,14 @@ package urchin import ( - "code.gitea.io/gitea/modules/labelmsg" - "code.gitea.io/gitea/modules/setting" "encoding/json" "fmt" "strings" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/labelmsg" "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" ) type DecompressReq struct { @@ -16,10 +16,6 @@ type DecompressReq struct { DestPath string `json:"dest_path"` } -const ( - modelSuffix = "models.zip" -) - var urfsClient Urchinfs func getUrfsClient() { @@ -54,7 +50,7 @@ func GetBackNpuModel(cloudbrainID int64, endpoint, bucket, objectKey, destPeerHo switch res.StatusCode { case models.StorageScheduleSucceed: log.Info("ScheduleDataToPeerByKey succeed") - decompress(res.DataRoot+"/"+res.DataPath, setting.Bucket+"/"+strings.TrimSuffix(res.DataPath, modelSuffix)) + decompress(res.DataRoot+"/"+res.DataPath, setting.Bucket+"/"+strings.TrimSuffix(res.DataPath, models.ModelSuffix)) case models.StorageScheduleProcessing: log.Info("ScheduleDataToPeerByKey processing") case models.StorageScheduleFailed: @@ -89,7 +85,7 @@ func HandleScheduleRecords() error { switch res.StatusCode { case models.StorageScheduleSucceed: log.Info("ScheduleDataToPeerByKey(%s) succeed", record.ObjectKey) - decompress(record.Bucket+"/"+record.ObjectKey, setting.Bucket+"/"+strings.TrimSuffix(record.ObjectKey, modelSuffix)) + decompress(record.Bucket+"/"+record.ObjectKey, setting.Bucket+"/"+strings.TrimSuffix(record.ObjectKey, models.ModelSuffix)) case models.StorageScheduleProcessing: log.Info("ScheduleDataToPeerByKey(%s) processing", record.ObjectKey) case models.StorageScheduleFailed: diff --git a/routers/api/v1/repo/modelarts.go b/routers/api/v1/repo/modelarts.go index e65fcecd3..627581d4a 100755 --- a/routers/api/v1/repo/modelarts.go +++ b/routers/api/v1/repo/modelarts.go @@ -182,7 +182,9 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) { if oldStatus != job.Status { notification.NotifyChangeCloudbrainStatus(job, oldStatus) if models.IsTrainJobTerminal(job.Status) { - urchin.GetBackNpuModel(job.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(job.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID)) + if len(result.JobInfo.Tasks[0].CenterID) == 1 { + urchin.GetBackNpuModel(job.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(job.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID)) + } } } err = models.UpdateTrainJobVersion(job) diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index ee18c725d..85fbddd55 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -1941,7 +1941,9 @@ func SyncCloudbrainStatus() { if oldStatus != task.Status { notification.NotifyChangeCloudbrainStatus(task, oldStatus) if models.IsTrainJobTerminal(task.Status) { - urchin.GetBackNpuModel(task.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(task.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID)) + if len(result.JobInfo.Tasks[0].CenterID) == 1 { + urchin.GetBackNpuModel(task.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(task.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID)) + } } } err = models.UpdateJob(task) diff --git a/routers/repo/grampus.go b/routers/repo/grampus.go index 6ba8e0f9d..3319ef36c 100755 --- a/routers/repo/grampus.go +++ b/routers/repo/grampus.go @@ -872,7 +872,9 @@ func GrampusTrainJobShow(ctx *context.Context) { if oldStatus != task.Status { notification.NotifyChangeCloudbrainStatus(task, oldStatus) if models.IsTrainJobTerminal(task.Status) { - urchin.GetBackNpuModel(task.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(task.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID)) + if len(result.JobInfo.Tasks[0].CenterID) == 1 { + urchin.GetBackNpuModel(task.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(task.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID)) + } } } err = models.UpdateJob(task) @@ -1079,6 +1081,7 @@ func generateDatasetUnzipCommand(datasetName string) string { if strings.HasSuffix(datasetNameArray[0], ".tar.gz") { unZipDatasetCommand = "tar --strip-components=1 -zxvf '" + datasetName + "';" } + unZipDatasetCommand += "rm -f " + datasetName + ";" } else { //多数据集 for _, datasetNameTemp := range datasetNameArray { @@ -1087,6 +1090,7 @@ func generateDatasetUnzipCommand(datasetName string) string { } else { unZipDatasetCommand = unZipDatasetCommand + "unzip -q '" + datasetNameTemp + "' -d './" + strings.TrimSuffix(datasetNameTemp, ".zip") + "';" } + unZipDatasetCommand += "rm -f " + datasetNameTemp + ";" } }