@@ -116,6 +116,8 @@ const ( | |||||
GrampusStatusStopped = "STOPPED" | GrampusStatusStopped = "STOPPED" | ||||
GrampusStatusUnknown = "UNKNOWN" | GrampusStatusUnknown = "UNKNOWN" | ||||
GrampusStatusWaiting = "WAITING" | GrampusStatusWaiting = "WAITING" | ||||
ModelSuffix = "models.zip" | |||||
) | ) | ||||
const ( | const ( | ||||
@@ -27,7 +27,7 @@ const ( | |||||
CodeArchiveName = "master.zip" | CodeArchiveName = "master.zip" | ||||
BucketRemote = "grampus" | BucketRemote = "grampus" | ||||
RemoteModelPath = "/output/models.zip" | |||||
RemoteModelPath = "/output/" + models.ModelSuffix | |||||
) | ) | ||||
var ( | var ( | ||||
@@ -1,14 +1,14 @@ | |||||
package urchin | package urchin | ||||
import ( | import ( | ||||
"code.gitea.io/gitea/modules/labelmsg" | |||||
"code.gitea.io/gitea/modules/setting" | |||||
"encoding/json" | "encoding/json" | ||||
"fmt" | "fmt" | ||||
"strings" | "strings" | ||||
"code.gitea.io/gitea/models" | "code.gitea.io/gitea/models" | ||||
"code.gitea.io/gitea/modules/labelmsg" | |||||
"code.gitea.io/gitea/modules/log" | "code.gitea.io/gitea/modules/log" | ||||
"code.gitea.io/gitea/modules/setting" | |||||
) | ) | ||||
type DecompressReq struct { | type DecompressReq struct { | ||||
@@ -16,10 +16,6 @@ type DecompressReq struct { | |||||
DestPath string `json:"dest_path"` | DestPath string `json:"dest_path"` | ||||
} | } | ||||
const ( | |||||
modelSuffix = "models.zip" | |||||
) | |||||
var urfsClient Urchinfs | var urfsClient Urchinfs | ||||
func getUrfsClient() { | func getUrfsClient() { | ||||
@@ -54,7 +50,7 @@ func GetBackNpuModel(cloudbrainID int64, endpoint, bucket, objectKey, destPeerHo | |||||
switch res.StatusCode { | switch res.StatusCode { | ||||
case models.StorageScheduleSucceed: | case models.StorageScheduleSucceed: | ||||
log.Info("ScheduleDataToPeerByKey succeed") | log.Info("ScheduleDataToPeerByKey succeed") | ||||
decompress(res.DataRoot+"/"+res.DataPath, setting.Bucket+"/"+strings.TrimSuffix(res.DataPath, modelSuffix)) | |||||
decompress(res.DataRoot+"/"+res.DataPath, setting.Bucket+"/"+strings.TrimSuffix(res.DataPath, models.ModelSuffix)) | |||||
case models.StorageScheduleProcessing: | case models.StorageScheduleProcessing: | ||||
log.Info("ScheduleDataToPeerByKey processing") | log.Info("ScheduleDataToPeerByKey processing") | ||||
case models.StorageScheduleFailed: | case models.StorageScheduleFailed: | ||||
@@ -89,7 +85,7 @@ func HandleScheduleRecords() error { | |||||
switch res.StatusCode { | switch res.StatusCode { | ||||
case models.StorageScheduleSucceed: | case models.StorageScheduleSucceed: | ||||
log.Info("ScheduleDataToPeerByKey(%s) succeed", record.ObjectKey) | log.Info("ScheduleDataToPeerByKey(%s) succeed", record.ObjectKey) | ||||
decompress(record.Bucket+"/"+record.ObjectKey, setting.Bucket+"/"+strings.TrimSuffix(record.ObjectKey, modelSuffix)) | |||||
decompress(record.Bucket+"/"+record.ObjectKey, setting.Bucket+"/"+strings.TrimSuffix(record.ObjectKey, models.ModelSuffix)) | |||||
case models.StorageScheduleProcessing: | case models.StorageScheduleProcessing: | ||||
log.Info("ScheduleDataToPeerByKey(%s) processing", record.ObjectKey) | log.Info("ScheduleDataToPeerByKey(%s) processing", record.ObjectKey) | ||||
case models.StorageScheduleFailed: | case models.StorageScheduleFailed: | ||||
@@ -182,7 +182,9 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) { | |||||
if oldStatus != job.Status { | if oldStatus != job.Status { | ||||
notification.NotifyChangeCloudbrainStatus(job, oldStatus) | notification.NotifyChangeCloudbrainStatus(job, oldStatus) | ||||
if models.IsTrainJobTerminal(job.Status) { | if models.IsTrainJobTerminal(job.Status) { | ||||
urchin.GetBackNpuModel(job.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(job.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID)) | |||||
if len(result.JobInfo.Tasks[0].CenterID) == 1 { | |||||
urchin.GetBackNpuModel(job.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(job.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID)) | |||||
} | |||||
} | } | ||||
} | } | ||||
err = models.UpdateTrainJobVersion(job) | err = models.UpdateTrainJobVersion(job) | ||||
@@ -1941,7 +1941,9 @@ func SyncCloudbrainStatus() { | |||||
if oldStatus != task.Status { | if oldStatus != task.Status { | ||||
notification.NotifyChangeCloudbrainStatus(task, oldStatus) | notification.NotifyChangeCloudbrainStatus(task, oldStatus) | ||||
if models.IsTrainJobTerminal(task.Status) { | if models.IsTrainJobTerminal(task.Status) { | ||||
urchin.GetBackNpuModel(task.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(task.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID)) | |||||
if len(result.JobInfo.Tasks[0].CenterID) == 1 { | |||||
urchin.GetBackNpuModel(task.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(task.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID)) | |||||
} | |||||
} | } | ||||
} | } | ||||
err = models.UpdateJob(task) | err = models.UpdateJob(task) | ||||
@@ -872,7 +872,9 @@ func GrampusTrainJobShow(ctx *context.Context) { | |||||
if oldStatus != task.Status { | if oldStatus != task.Status { | ||||
notification.NotifyChangeCloudbrainStatus(task, oldStatus) | notification.NotifyChangeCloudbrainStatus(task, oldStatus) | ||||
if models.IsTrainJobTerminal(task.Status) { | if models.IsTrainJobTerminal(task.Status) { | ||||
urchin.GetBackNpuModel(task.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(task.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID)) | |||||
if len(result.JobInfo.Tasks[0].CenterID) == 1 { | |||||
urchin.GetBackNpuModel(task.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(task.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID)) | |||||
} | |||||
} | } | ||||
} | } | ||||
err = models.UpdateJob(task) | err = models.UpdateJob(task) | ||||
@@ -1079,6 +1081,7 @@ func generateDatasetUnzipCommand(datasetName string) string { | |||||
if strings.HasSuffix(datasetNameArray[0], ".tar.gz") { | if strings.HasSuffix(datasetNameArray[0], ".tar.gz") { | ||||
unZipDatasetCommand = "tar --strip-components=1 -zxvf '" + datasetName + "';" | unZipDatasetCommand = "tar --strip-components=1 -zxvf '" + datasetName + "';" | ||||
} | } | ||||
unZipDatasetCommand += "rm -f " + datasetName + ";" | |||||
} else { //多数据集 | } else { //多数据集 | ||||
for _, datasetNameTemp := range datasetNameArray { | for _, datasetNameTemp := range datasetNameArray { | ||||
@@ -1087,6 +1090,7 @@ func generateDatasetUnzipCommand(datasetName string) string { | |||||
} else { | } else { | ||||
unZipDatasetCommand = unZipDatasetCommand + "unzip -q '" + datasetNameTemp + "' -d './" + strings.TrimSuffix(datasetNameTemp, ".zip") + "';" | unZipDatasetCommand = unZipDatasetCommand + "unzip -q '" + datasetNameTemp + "' -d './" + strings.TrimSuffix(datasetNameTemp, ".zip") + "';" | ||||
} | } | ||||
unZipDatasetCommand += "rm -f " + datasetNameTemp + ";" | |||||
} | } | ||||
} | } | ||||