Reviewed-on: https://git.openi.org.cn/OpenI/aiforge/pulls/2558 Reviewed-by: lewis <747342561@qq.com>pull/2575/head
@@ -206,7 +206,16 @@ func (task *Cloudbrain) CorrectCreateUnix() { | |||||
func (task *Cloudbrain) IsTerminal() bool { | func (task *Cloudbrain) IsTerminal() bool { | ||||
status := task.Status | status := task.Status | ||||
return status == string(ModelArtsTrainJobCompleted) || status == string(ModelArtsTrainJobFailed) || status == string(ModelArtsTrainJobKilled) || status == string(ModelArtsStopped) || status == string(JobStopped) || status == string(JobFailed) || status == string(JobSucceeded) | |||||
return status == string(ModelArtsTrainJobCompleted) || status == string(ModelArtsTrainJobFailed) || | |||||
status == string(ModelArtsTrainJobKilled) || status == string(ModelArtsStopped) || | |||||
status == string(JobStopped) || status == string(JobFailed) || | |||||
status == string(JobSucceeded) || status == GrampusStatusFailed || | |||||
status == GrampusStatusSucceeded || status == GrampusStatusStopped | |||||
} | |||||
func (task *Cloudbrain) IsRunning() bool { | |||||
status := task.Status | |||||
return status == string(ModelArtsTrainJobRunning) || status == string(ModelArtsRunning) || | |||||
status == string(JobRunning) || status == GrampusStatusRunning | |||||
} | } | ||||
func ConvertDurationToStr(duration int64) string { | func ConvertDurationToStr(duration int64) string { | ||||
@@ -19,6 +19,7 @@ const ( | |||||
ACCESS_TOKEN_PATH = "/cgi-bin/token" | ACCESS_TOKEN_PATH = "/cgi-bin/token" | ||||
QR_CODE_PATH = "/cgi-bin/qrcode/create" | QR_CODE_PATH = "/cgi-bin/qrcode/create" | ||||
GET_MATERIAL_PATH = "/cgi-bin/material/batchget_material" | GET_MATERIAL_PATH = "/cgi-bin/material/batchget_material" | ||||
SEND_TEMPLATE_PATH = "/cgi-bin/message/template/send" | |||||
ACTION_QR_STR_SCENE = "QR_STR_SCENE" | ACTION_QR_STR_SCENE = "QR_STR_SCENE" | ||||
ERR_CODE_ACCESSTOKEN_EXPIRE = 42001 | ERR_CODE_ACCESSTOKEN_EXPIRE = 42001 | ||||
@@ -41,12 +42,33 @@ type QRCodeRequest struct { | |||||
Action_info ActionInfo `json:"action_info"` | Action_info ActionInfo `json:"action_info"` | ||||
Expire_seconds int `json:"expire_seconds"` | Expire_seconds int `json:"expire_seconds"` | ||||
} | } | ||||
type MaterialRequest struct { | type MaterialRequest struct { | ||||
Type string `json:"type"` | Type string `json:"type"` | ||||
Offset int `json:"offset"` | Offset int `json:"offset"` | ||||
Count int `json:"count"` | Count int `json:"count"` | ||||
} | } | ||||
type (
	// TemplateMsgRequest is the JSON body posted to SEND_TEMPLATE_PATH.
	TemplateMsgRequest struct {
		ToUser      string      `json:"touser"`
		TemplateId  string      `json:"template_id"`
		Url         string      `json:"url"`
		ClientMsgId string      `json:"client_msg_id"`
		Data        interface{} `json:"data"` // template-specific payload, e.g. CloudbrainTaskData
	}

	// TemplateValue is a single template field: its text and display color.
	TemplateValue struct {
		Value string `json:"value"`
		Color string `json:"color"`
	}

	// CloudbrainTaskData is the Data payload used for cloudbrain task
	// notifications: a heading, three keyword fields and a closing remark.
	CloudbrainTaskData struct {
		First    TemplateValue `json:"first"`
		Keyword1 TemplateValue `json:"keyword1"`
		Keyword2 TemplateValue `json:"keyword2"`
		Keyword3 TemplateValue `json:"keyword3"`
		Remark   TemplateValue `json:"remark"`
	}
)
type ActionInfo struct { | type ActionInfo struct { | ||||
Scene Scene `json:"scene"` | Scene Scene `json:"scene"` | ||||
} | } | ||||
@@ -161,3 +183,27 @@ func getErrorCodeFromResponse(r *resty.Response) int { | |||||
c, _ := strconv.Atoi(fmt.Sprint(code)) | c, _ := strconv.Atoi(fmt.Sprint(code)) | ||||
return c | return c | ||||
} | } | ||||
func sendTemplateMsg(req TemplateMsgRequest) (error, bool) { | |||||
client := getWechatRestyClient() | |||||
bodyJson, _ := json.Marshal(req) | |||||
r, err := client.R(). | |||||
SetHeader("Content-Type", "application/json"). | |||||
SetQueryParam("access_token", GetWechatAccessToken()). | |||||
SetBody(bodyJson). | |||||
Post(setting.WechatApiHost + SEND_TEMPLATE_PATH) | |||||
if err != nil { | |||||
log.Error("sendTemplateMsg,e=%v", err) | |||||
return nil, false | |||||
} | |||||
a := r.Body() | |||||
resultMap := make(map[string]interface{}, 0) | |||||
json.Unmarshal(a, &resultMap) | |||||
errcode := resultMap["errcode"] | |||||
log.Info("sendTemplateMsg,%v", r) | |||||
if errcode == fmt.Sprint(ERR_CODE_ACCESSTOKEN_EXPIRE) || errcode == fmt.Sprint(ERR_CODE_ACCESSTOKEN_INVALID) { | |||||
return nil, true | |||||
} | |||||
return nil, false | |||||
} |
@@ -0,0 +1,145 @@ | |||||
package wechat | |||||
import ( | |||||
"code.gitea.io/gitea/models" | |||||
"code.gitea.io/gitea/modules/log" | |||||
"code.gitea.io/gitea/modules/setting" | |||||
"errors" | |||||
"fmt" | |||||
"time" | |||||
) | |||||
// JobOperateType names the kind of task event a WeChat notification is about.
type JobOperateType string

const (
	// JobOperateTypeStart marks a task that has started running.
	JobOperateTypeStart = JobOperateType("start")
	// JobOperateTypeStop marks a task that has reached a terminal status.
	JobOperateTypeStop = JobOperateType("stop")
)
func GetJobOperateTypeFromCloudbrainStatus(cloudbrain *models.Cloudbrain) JobOperateType { | |||||
if cloudbrain.IsTerminal() { | |||||
return JobOperateTypeStop | |||||
} | |||||
if cloudbrain.IsRunning() { | |||||
return JobOperateTypeStart | |||||
} | |||||
return "" | |||||
} | |||||
func SendCloudbrainStartedMsg(operateType JobOperateType, cloudbrain models.Cloudbrain) error { | |||||
defer func() { | |||||
if err := recover(); err != nil { | |||||
combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2)) | |||||
log.Error("PANIC:", combinedErr) | |||||
} | |||||
}() | |||||
repo, err := models.GetRepositoryByID(cloudbrain.RepoID) | |||||
if err != nil { | |||||
log.Error("SendCloudbrainStartedMsg GetRepositoryByID error,%v", err) | |||||
} | |||||
if setting.CloudbrainStartedTemplateId == "" { | |||||
return nil | |||||
} | |||||
openId := models.GetUserWechatOpenId(cloudbrain.UserID) | |||||
if openId == "" { | |||||
return errors.New("Wechat openId not exist") | |||||
} | |||||
data := CloudbrainTaskData{ | |||||
First: TemplateValue{Value: getCloudbrainTemplateTitle(operateType)}, | |||||
Keyword1: TemplateValue{Value: cloudbrain.DisplayJobName}, | |||||
Keyword2: TemplateValue{Value: getJobTypeDisplayName(cloudbrain.JobType)}, | |||||
Keyword3: TemplateValue{Value: time.Unix(int64(cloudbrain.CreatedUnix), 0).Format("2006-01-02 15:04:05")}, | |||||
Remark: TemplateValue{Value: getCloudbrainTemplateRemark(operateType)}, | |||||
} | |||||
req := TemplateMsgRequest{ | |||||
ToUser: openId, | |||||
TemplateId: setting.CloudbrainStartedTemplateId, | |||||
Url: getCloudbrainTemplateUrl(cloudbrain, repo), | |||||
ClientMsgId: string(operateType) + "_" + fmt.Sprint(cloudbrain.ID), | |||||
Data: data, | |||||
} | |||||
err, retryFlag := sendTemplateMsg(req) | |||||
if retryFlag { | |||||
log.Info("retrySendCloudbrainTemplateMsg calling") | |||||
refreshAccessToken() | |||||
err, _ = sendTemplateMsg(req) | |||||
if err != nil { | |||||
log.Error("SendCloudbrainStartedMsg err. %v", err) | |||||
return err | |||||
} | |||||
return nil | |||||
} | |||||
if err != nil { | |||||
log.Error("SendCloudbrainStartedMsg err. %v", err) | |||||
return err | |||||
} | |||||
return nil | |||||
} | |||||
func getCloudbrainTemplateUrl(cloudbrain models.Cloudbrain, repo *models.Repository) string { | |||||
url := setting.AppURL + repo.FullName() | |||||
switch cloudbrain.JobType { | |||||
case string(models.JobTypeDebug): | |||||
if cloudbrain.ComputeResource == "CPU/GPU" { | |||||
url += "/cloudbrain/" + fmt.Sprint(cloudbrain.ID) | |||||
} else { | |||||
url += "/modelarts/notebook/" + fmt.Sprint(cloudbrain.ID) | |||||
} | |||||
case string(models.JobTypeBenchmark): | |||||
url += "/cloudbrain/benchmark/" + fmt.Sprint(cloudbrain.ID) | |||||
case string(models.JobTypeTrain): | |||||
if cloudbrain.Type == models.TypeCloudBrainOne { | |||||
url += "/cloudbrain/train-job/" + fmt.Sprint(cloudbrain.JobID) | |||||
} else if cloudbrain.Type == models.TypeCloudBrainTwo { | |||||
url += "/modelarts/train-job/" + fmt.Sprint(cloudbrain.JobID) | |||||
} else if cloudbrain.Type == models.TypeC2Net { | |||||
url += "/grampus/train-job/" + fmt.Sprint(cloudbrain.JobID) | |||||
} | |||||
case string(models.JobTypeInference): | |||||
url += "/modelarts/inference-job/" + fmt.Sprint(cloudbrain.JobID) | |||||
} | |||||
return url | |||||
} | |||||
func getCloudbrainTemplateTitle(operateType JobOperateType) string { | |||||
var title string | |||||
switch operateType { | |||||
case JobOperateTypeStart: | |||||
title = "您好,您提交的算力资源申请已通过,任务已启动,请您关注运行情况。" | |||||
case JobOperateTypeStop: | |||||
title = "您好,您提交的任务已运行结束。" | |||||
} | |||||
return title | |||||
} | |||||
func getCloudbrainTemplateRemark(operateType JobOperateType) string { | |||||
var remark string | |||||
switch operateType { | |||||
case JobOperateTypeStart: | |||||
remark = "感谢您的耐心等待。" | |||||
case JobOperateTypeStop: | |||||
remark = "点击可查看运行结果" | |||||
} | |||||
return remark | |||||
} | |||||
func getJobTypeDisplayName(jobType string) string { | |||||
switch jobType { | |||||
case string(models.JobTypeDebug): | |||||
return "调试任务" | |||||
case string(models.JobTypeBenchmark): | |||||
return "评测任务" | |||||
case string(models.JobTypeTrain): | |||||
return "训练任务" | |||||
case string(models.JobTypeInference): | |||||
return "推理任务" | |||||
} | |||||
return "" | |||||
} |
@@ -56,4 +56,6 @@ type Notifier interface { | |||||
NotifySyncDeleteRef(doer *models.User, repo *models.Repository, refType, refFullName string) | NotifySyncDeleteRef(doer *models.User, repo *models.Repository, refType, refFullName string) | ||||
NotifyOtherTask(doer *models.User, repo *models.Repository, id string, name string, optype models.ActionType) | NotifyOtherTask(doer *models.User, repo *models.Repository, id string, name string, optype models.ActionType) | ||||
NotifyChangeCloudbrainStatus(cloudbrain *models.Cloudbrain, oldStatus string) | |||||
} | } |
@@ -158,3 +158,7 @@ func (*NullNotifier) NotifySyncDeleteRef(doer *models.User, repo *models.Reposit | |||||
func (*NullNotifier) NotifyOtherTask(doer *models.User, repo *models.Repository, id string, name string, optype models.ActionType) { | func (*NullNotifier) NotifyOtherTask(doer *models.User, repo *models.Repository, id string, name string, optype models.ActionType) { | ||||
} | } | ||||
func (*NullNotifier) NotifyChangeCloudbrainStatus(cloudbrain *models.Cloudbrain, oldStatus string) { | |||||
} |
@@ -12,6 +12,7 @@ import ( | |||||
"code.gitea.io/gitea/modules/notification/mail" | "code.gitea.io/gitea/modules/notification/mail" | ||||
"code.gitea.io/gitea/modules/notification/ui" | "code.gitea.io/gitea/modules/notification/ui" | ||||
"code.gitea.io/gitea/modules/notification/webhook" | "code.gitea.io/gitea/modules/notification/webhook" | ||||
wechatNotifier "code.gitea.io/gitea/modules/notification/wechat" | |||||
"code.gitea.io/gitea/modules/repository" | "code.gitea.io/gitea/modules/repository" | ||||
"code.gitea.io/gitea/modules/setting" | "code.gitea.io/gitea/modules/setting" | ||||
) | ) | ||||
@@ -35,6 +36,7 @@ func NewContext() { | |||||
RegisterNotifier(indexer.NewNotifier()) | RegisterNotifier(indexer.NewNotifier()) | ||||
RegisterNotifier(webhook.NewNotifier()) | RegisterNotifier(webhook.NewNotifier()) | ||||
RegisterNotifier(action.NewNotifier()) | RegisterNotifier(action.NewNotifier()) | ||||
RegisterNotifier(wechatNotifier.NewNotifier()) | |||||
} | } | ||||
// NotifyUploadAttachment notifies attachment upload message to notifiers | // NotifyUploadAttachment notifies attachment upload message to notifiers | ||||
@@ -269,3 +271,10 @@ func NotifySyncDeleteRef(pusher *models.User, repo *models.Repository, refType, | |||||
notifier.NotifySyncDeleteRef(pusher, repo, refType, refFullName) | notifier.NotifySyncDeleteRef(pusher, repo, refType, refFullName) | ||||
} | } | ||||
} | } | ||||
// NotifyChangeCloudbrainStatus | |||||
func NotifyChangeCloudbrainStatus(cloudbrain *models.Cloudbrain, oldStatus string) { | |||||
for _, notifier := range notifiers { | |||||
notifier.NotifyChangeCloudbrainStatus(cloudbrain, oldStatus) | |||||
} | |||||
} |
@@ -0,0 +1,44 @@ | |||||
// Copyright 2019 The Gitea Authors. All rights reserved. | |||||
// Use of this source code is governed by a MIT-style | |||||
// license that can be found in the LICENSE file. | |||||
package wechat | |||||
import ( | |||||
"code.gitea.io/gitea/models" | |||||
"code.gitea.io/gitea/modules/auth/wechat" | |||||
"code.gitea.io/gitea/modules/notification/base" | |||||
"code.gitea.io/gitea/modules/setting" | |||||
) | |||||
type wechatNotifier struct { | |||||
base.NullNotifier | |||||
} | |||||
var ( | |||||
_ base.Notifier = &wechatNotifier{} | |||||
) | |||||
// NewNotifier create a new wechatNotifier notifier | |||||
func NewNotifier() base.Notifier { | |||||
return &wechatNotifier{} | |||||
} | |||||
func (*wechatNotifier) NotifyChangeCloudbrainStatus(cloudbrain *models.Cloudbrain, oldStatus string) { | |||||
operateType := wechat.GetJobOperateTypeFromCloudbrainStatus(cloudbrain) | |||||
if operateType == "" { | |||||
return | |||||
} | |||||
switch operateType { | |||||
case wechat.JobOperateTypeStart: | |||||
if len(setting.CloudbrainStartedNotifyList) == 0 { | |||||
return | |||||
} | |||||
for _, v := range setting.CloudbrainStartedNotifyList { | |||||
if v == cloudbrain.JobType { | |||||
go wechat.SendCloudbrainStartedMsg(operateType, *cloudbrain) | |||||
return | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -582,6 +582,10 @@ var ( | |||||
TreePathOfAutoMsgReply string | TreePathOfAutoMsgReply string | ||||
TreePathOfSubscribe string | TreePathOfSubscribe string | ||||
//wechat template msg config | |||||
CloudbrainStartedTemplateId string | |||||
CloudbrainStartedNotifyList []string | |||||
//nginx proxy | //nginx proxy | ||||
PROXYURL string | PROXYURL string | ||||
RadarMap = struct { | RadarMap = struct { | ||||
@@ -1432,7 +1436,7 @@ func NewContext() { | |||||
WechatApiHost = sec.Key("HOST").MustString("https://api.weixin.qq.com") | WechatApiHost = sec.Key("HOST").MustString("https://api.weixin.qq.com") | ||||
WechatApiTimeoutSeconds = sec.Key("TIMEOUT_SECONDS").MustInt(3) | WechatApiTimeoutSeconds = sec.Key("TIMEOUT_SECONDS").MustInt(3) | ||||
WechatAppId = sec.Key("APP_ID").MustString("wxba77b915a305a57d") | WechatAppId = sec.Key("APP_ID").MustString("wxba77b915a305a57d") | ||||
WechatAppSecret = sec.Key("APP_SECRET").MustString("e48e13f315adc32749ddc7057585f198") | |||||
WechatAppSecret = sec.Key("APP_SECRET").MustString("") | |||||
WechatQRCodeExpireSeconds = sec.Key("QR_CODE_EXPIRE_SECONDS").MustInt(120) | WechatQRCodeExpireSeconds = sec.Key("QR_CODE_EXPIRE_SECONDS").MustInt(120) | ||||
WechatAuthSwitch = sec.Key("AUTH_SWITCH").MustBool(true) | WechatAuthSwitch = sec.Key("AUTH_SWITCH").MustBool(true) | ||||
UserNameOfWechatReply = sec.Key("AUTO_REPLY_USER_NAME").MustString("OpenIOSSG") | UserNameOfWechatReply = sec.Key("AUTO_REPLY_USER_NAME").MustString("OpenIOSSG") | ||||
@@ -1440,6 +1444,8 @@ func NewContext() { | |||||
RefNameOfWechatReply = sec.Key("AUTO_REPLY_REF_NAME").MustString("master") | RefNameOfWechatReply = sec.Key("AUTO_REPLY_REF_NAME").MustString("master") | ||||
TreePathOfAutoMsgReply = sec.Key("AUTO_REPLY_TREE_PATH").MustString("wechat/auto_reply.json") | TreePathOfAutoMsgReply = sec.Key("AUTO_REPLY_TREE_PATH").MustString("wechat/auto_reply.json") | ||||
TreePathOfSubscribe = sec.Key("SUBSCRIBE_TREE_PATH").MustString("wechat/subscribe_reply.json") | TreePathOfSubscribe = sec.Key("SUBSCRIBE_TREE_PATH").MustString("wechat/subscribe_reply.json") | ||||
CloudbrainStartedTemplateId = sec.Key("CLOUDBRAIN_STARTED_TEMPLATE_ID").MustString("") | |||||
CloudbrainStartedNotifyList = strings.Split(sec.Key("CLOUDBRAIN_STARTED_NOTIFY_LIST").MustString("DEBUG"), ",") | |||||
SetRadarMapConfig() | SetRadarMapConfig() | ||||
@@ -6,6 +6,7 @@ | |||||
package repo | package repo | ||||
import ( | import ( | ||||
"code.gitea.io/gitea/modules/notification" | |||||
"encoding/json" | "encoding/json" | ||||
"net/http" | "net/http" | ||||
"sort" | "sort" | ||||
@@ -74,7 +75,7 @@ func GetCloudbrainTask(ctx *context.APIContext) { | |||||
log.Error("ConvertToJobResultPayload failed:", err) | log.Error("ConvertToJobResultPayload failed:", err) | ||||
return | return | ||||
} | } | ||||
oldStatus := job.Status | |||||
job.Status = result.JobStatus.State | job.Status = result.JobStatus.State | ||||
taskRoles := result.TaskRoles | taskRoles := result.TaskRoles | ||||
taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{})) | taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{})) | ||||
@@ -86,6 +87,9 @@ func GetCloudbrainTask(ctx *context.APIContext) { | |||||
if result.JobStatus.State != string(models.JobWaiting) { | if result.JobStatus.State != string(models.JobWaiting) { | ||||
models.ParseAndSetDurationFromCloudBrainOne(result, job) | models.ParseAndSetDurationFromCloudBrainOne(result, job) | ||||
if oldStatus != job.Status { | |||||
notification.NotifyChangeCloudbrainStatus(job, oldStatus) | |||||
} | |||||
err = models.UpdateJob(job) | err = models.UpdateJob(job) | ||||
if err != nil { | if err != nil { | ||||
log.Error("UpdateJob failed:", err) | log.Error("UpdateJob failed:", err) | ||||
@@ -123,7 +127,7 @@ func GetCloudBrainInferenceJob(ctx *context.APIContext) { | |||||
log.Error("ConvertToJobResultPayload failed:", err) | log.Error("ConvertToJobResultPayload failed:", err) | ||||
return | return | ||||
} | } | ||||
oldStatus := job.Status | |||||
job.Status = result.JobStatus.State | job.Status = result.JobStatus.State | ||||
if result.JobStatus.State != string(models.JobWaiting) && result.JobStatus.State != string(models.JobFailed) { | if result.JobStatus.State != string(models.JobWaiting) && result.JobStatus.State != string(models.JobFailed) { | ||||
taskRoles := result.TaskRoles | taskRoles := result.TaskRoles | ||||
@@ -136,6 +140,9 @@ func GetCloudBrainInferenceJob(ctx *context.APIContext) { | |||||
if result.JobStatus.State != string(models.JobWaiting) { | if result.JobStatus.State != string(models.JobWaiting) { | ||||
models.ParseAndSetDurationFromCloudBrainOne(result, job) | models.ParseAndSetDurationFromCloudBrainOne(result, job) | ||||
if oldStatus != job.Status { | |||||
notification.NotifyChangeCloudbrainStatus(job, oldStatus) | |||||
} | |||||
err = models.UpdateJob(job) | err = models.UpdateJob(job) | ||||
if err != nil { | if err != nil { | ||||
log.Error("UpdateJob failed:", err) | log.Error("UpdateJob failed:", err) | ||||
@@ -6,6 +6,7 @@ | |||||
package repo | package repo | ||||
import ( | import ( | ||||
"code.gitea.io/gitea/modules/notification" | |||||
"encoding/json" | "encoding/json" | ||||
"net/http" | "net/http" | ||||
"path" | "path" | ||||
@@ -42,8 +43,11 @@ func GetModelArtsNotebook(ctx *context.APIContext) { | |||||
ctx.NotFound(err) | ctx.NotFound(err) | ||||
return | return | ||||
} | } | ||||
oldStatus := job.Status | |||||
job.Status = result.Status | job.Status = result.Status | ||||
if oldStatus != result.Status { | |||||
notification.NotifyChangeCloudbrainStatus(job, oldStatus) | |||||
} | |||||
err = models.UpdateJob(job) | err = models.UpdateJob(job) | ||||
if err != nil { | if err != nil { | ||||
log.Error("UpdateJob failed:", err) | log.Error("UpdateJob failed:", err) | ||||
@@ -75,12 +79,16 @@ func GetModelArtsNotebook2(ctx *context.APIContext) { | |||||
if job.StartTime == 0 && result.Lease.UpdateTime > 0 { | if job.StartTime == 0 && result.Lease.UpdateTime > 0 { | ||||
job.StartTime = timeutil.TimeStamp(result.Lease.UpdateTime / 1000) | job.StartTime = timeutil.TimeStamp(result.Lease.UpdateTime / 1000) | ||||
} | } | ||||
oldStatus := job.Status | |||||
job.Status = result.Status | job.Status = result.Status | ||||
if job.EndTime == 0 && models.IsModelArtsDebugJobTerminal(job.Status) { | if job.EndTime == 0 && models.IsModelArtsDebugJobTerminal(job.Status) { | ||||
job.EndTime = timeutil.TimeStampNow() | job.EndTime = timeutil.TimeStampNow() | ||||
} | } | ||||
job.CorrectCreateUnix() | job.CorrectCreateUnix() | ||||
job.ComputeAndSetDuration() | job.ComputeAndSetDuration() | ||||
if oldStatus != result.Status { | |||||
notification.NotifyChangeCloudbrainStatus(job, oldStatus) | |||||
} | |||||
err = models.UpdateJob(job) | err = models.UpdateJob(job) | ||||
if err != nil { | if err != nil { | ||||
log.Error("UpdateJob failed:", err) | log.Error("UpdateJob failed:", err) | ||||
@@ -111,10 +119,13 @@ func GetModelArtsTrainJob(ctx *context.APIContext) { | |||||
ctx.NotFound(err) | ctx.NotFound(err) | ||||
return | return | ||||
} | } | ||||
oldStatus := job.Status | |||||
job.Status = modelarts.TransTrainJobStatus(result.IntStatus) | job.Status = modelarts.TransTrainJobStatus(result.IntStatus) | ||||
job.Duration = result.Duration | job.Duration = result.Duration | ||||
job.TrainJobDuration = result.TrainJobDuration | job.TrainJobDuration = result.TrainJobDuration | ||||
if oldStatus != job.Status { | |||||
notification.NotifyChangeCloudbrainStatus(job, oldStatus) | |||||
} | |||||
err = models.UpdateJob(job) | err = models.UpdateJob(job) | ||||
if err != nil { | if err != nil { | ||||
log.Error("UpdateJob failed:", err) | log.Error("UpdateJob failed:", err) | ||||
@@ -155,7 +166,7 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) { | |||||
log.Error("ConvertToJobResultPayload failed:", err) | log.Error("ConvertToJobResultPayload failed:", err) | ||||
return | return | ||||
} | } | ||||
oldStatus := job.Status | |||||
job.Status = result.JobStatus.State | job.Status = result.JobStatus.State | ||||
if result.JobStatus.State != string(models.JobWaiting) && result.JobStatus.State != string(models.JobFailed) { | if result.JobStatus.State != string(models.JobWaiting) && result.JobStatus.State != string(models.JobFailed) { | ||||
taskRoles := result.TaskRoles | taskRoles := result.TaskRoles | ||||
@@ -168,6 +179,9 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) { | |||||
if result.JobStatus.State != string(models.JobWaiting) { | if result.JobStatus.State != string(models.JobWaiting) { | ||||
models.ParseAndSetDurationFromCloudBrainOne(result, job) | models.ParseAndSetDurationFromCloudBrainOne(result, job) | ||||
if oldStatus != job.Status { | |||||
notification.NotifyChangeCloudbrainStatus(job, oldStatus) | |||||
} | |||||
err = models.UpdateJob(job) | err = models.UpdateJob(job) | ||||
if err != nil { | if err != nil { | ||||
log.Error("UpdateJob failed:", err) | log.Error("UpdateJob failed:", err) | ||||
@@ -2,6 +2,7 @@ package repo | |||||
import ( | import ( | ||||
"bufio" | "bufio" | ||||
"code.gitea.io/gitea/modules/notification" | |||||
"encoding/json" | "encoding/json" | ||||
"errors" | "errors" | ||||
"fmt" | "fmt" | ||||
@@ -373,7 +374,6 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { | |||||
} | } | ||||
} | } | ||||
func CloudBrainInferenceJobCreate(ctx *context.Context, form auth.CreateCloudBrainInferencForm) { | func CloudBrainInferenceJobCreate(ctx *context.Context, form auth.CreateCloudBrainInferencForm) { | ||||
ctx.Data["PageIsCloudBrain"] = true | ctx.Data["PageIsCloudBrain"] = true | ||||
displayJobName := form.DisplayJobName | displayJobName := form.DisplayJobName | ||||
@@ -494,6 +494,7 @@ func CloudBrainInferenceJobCreate(ctx *context.Context, form auth.CreateCloudBra | |||||
ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/inference-job") | ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/inference-job") | ||||
} | } | ||||
/** | /** | ||||
检查用户传输的参数是否符合专属资源池 | 检查用户传输的参数是否符合专属资源池 | ||||
*/ | */ | ||||
@@ -784,12 +785,16 @@ func cloudBrainShow(ctx *context.Context, tpName base.TplName, jobType models.Jo | |||||
taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{})) | taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{})) | ||||
ctx.Data["taskRes"] = taskRes | ctx.Data["taskRes"] = taskRes | ||||
ctx.Data["ExitDiagnostics"] = taskRes.TaskStatuses[0].ExitDiagnostics | ctx.Data["ExitDiagnostics"] = taskRes.TaskStatuses[0].ExitDiagnostics | ||||
oldStatus := task.Status | |||||
task.Status = taskRes.TaskStatuses[0].State | task.Status = taskRes.TaskStatuses[0].State | ||||
task.ContainerID = taskRes.TaskStatuses[0].ContainerID | task.ContainerID = taskRes.TaskStatuses[0].ContainerID | ||||
task.ContainerIp = taskRes.TaskStatuses[0].ContainerIP | task.ContainerIp = taskRes.TaskStatuses[0].ContainerIP | ||||
models.ParseAndSetDurationFromCloudBrainOne(jobRes, task) | models.ParseAndSetDurationFromCloudBrainOne(jobRes, task) | ||||
if task.DeletedAt.IsZero() { //normal record | if task.DeletedAt.IsZero() { //normal record | ||||
if oldStatus != task.Status { | |||||
notification.NotifyChangeCloudbrainStatus(task, oldStatus) | |||||
} | |||||
err = models.UpdateJob(task) | err = models.UpdateJob(task) | ||||
if err != nil { | if err != nil { | ||||
ctx.Data["error"] = err.Error() | ctx.Data["error"] = err.Error() | ||||
@@ -1143,12 +1148,15 @@ func CloudBrainStop(ctx *context.Context) { | |||||
errorMsg = "cloudbrain.Stopped_failed" | errorMsg = "cloudbrain.Stopped_failed" | ||||
break | break | ||||
} | } | ||||
oldStatus := task.Status | |||||
task.Status = string(models.JobStopped) | task.Status = string(models.JobStopped) | ||||
if task.EndTime == 0 { | if task.EndTime == 0 { | ||||
task.EndTime = timeutil.TimeStampNow() | task.EndTime = timeutil.TimeStampNow() | ||||
} | } | ||||
task.ComputeAndSetDuration() | task.ComputeAndSetDuration() | ||||
if oldStatus != task.Status { | |||||
notification.NotifyChangeCloudbrainStatus(task, oldStatus) | |||||
} | |||||
err = models.UpdateJob(task) | err = models.UpdateJob(task) | ||||
if err != nil { | if err != nil { | ||||
log.Error("UpdateJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"]) | log.Error("UpdateJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"]) | ||||
@@ -1242,11 +1250,15 @@ func logErrorAndUpdateJobStatus(err error, taskInfo *models.Cloudbrain) { | |||||
if err != nil { | if err != nil { | ||||
log.Warn("Failed to stop cloudBrain job:"+taskInfo.JobID, err) | log.Warn("Failed to stop cloudBrain job:"+taskInfo.JobID, err) | ||||
} else { | } else { | ||||
oldStatus := taskInfo.Status | |||||
taskInfo.Status = string(models.JobStopped) | taskInfo.Status = string(models.JobStopped) | ||||
if taskInfo.EndTime == 0 { | if taskInfo.EndTime == 0 { | ||||
taskInfo.EndTime = timeutil.TimeStampNow() | taskInfo.EndTime = timeutil.TimeStampNow() | ||||
} | } | ||||
taskInfo.ComputeAndSetDuration() | taskInfo.ComputeAndSetDuration() | ||||
if oldStatus != taskInfo.Status { | |||||
notification.NotifyChangeCloudbrainStatus(taskInfo, oldStatus) | |||||
} | |||||
err = models.UpdateJob(taskInfo) | err = models.UpdateJob(taskInfo) | ||||
if err != nil { | if err != nil { | ||||
log.Warn("UpdateJob failed", err) | log.Warn("UpdateJob failed", err) | ||||
@@ -1726,9 +1738,13 @@ func SyncCloudbrainStatus() { | |||||
jobRes, _ := models.ConvertToJobResultPayload(result.Payload) | jobRes, _ := models.ConvertToJobResultPayload(result.Payload) | ||||
taskRoles := jobRes.TaskRoles | taskRoles := jobRes.TaskRoles | ||||
taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{})) | taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{})) | ||||
oldStatus := task.Status | |||||
task.Status = taskRes.TaskStatuses[0].State | task.Status = taskRes.TaskStatuses[0].State | ||||
if task.Status != string(models.JobWaiting) { | if task.Status != string(models.JobWaiting) { | ||||
models.ParseAndSetDurationFromCloudBrainOne(jobRes, task) | models.ParseAndSetDurationFromCloudBrainOne(jobRes, task) | ||||
if oldStatus != task.Status { | |||||
notification.NotifyChangeCloudbrainStatus(task, oldStatus) | |||||
} | |||||
err = models.UpdateJob(task) | err = models.UpdateJob(task) | ||||
if err != nil { | if err != nil { | ||||
log.Error("UpdateJob(%s) failed:%v", task.JobName, err) | log.Error("UpdateJob(%s) failed:%v", task.JobName, err) | ||||
@@ -1755,6 +1771,9 @@ func SyncCloudbrainStatus() { | |||||
task.EndTime = timeutil.TimeStampNow() | task.EndTime = timeutil.TimeStampNow() | ||||
} | } | ||||
task.ComputeAndSetDuration() | task.ComputeAndSetDuration() | ||||
if oldStatus != task.Status { | |||||
notification.NotifyChangeCloudbrainStatus(task, oldStatus) | |||||
} | |||||
err = models.UpdateJob(task) | err = models.UpdateJob(task) | ||||
if err != nil { | if err != nil { | ||||
log.Error("UpdateJob(%s) failed:%v", task.DisplayJobName, err) | log.Error("UpdateJob(%s) failed:%v", task.DisplayJobName, err) | ||||
@@ -1773,6 +1792,7 @@ func SyncCloudbrainStatus() { | |||||
} | } | ||||
if result != nil { | if result != nil { | ||||
oldStatus := task.Status | |||||
task.Status = result.Status | task.Status = result.Status | ||||
if task.StartTime == 0 && result.Lease.UpdateTime > 0 { | if task.StartTime == 0 && result.Lease.UpdateTime > 0 { | ||||
task.StartTime = timeutil.TimeStamp(result.Lease.UpdateTime / 1000) | task.StartTime = timeutil.TimeStamp(result.Lease.UpdateTime / 1000) | ||||
@@ -1782,6 +1802,9 @@ func SyncCloudbrainStatus() { | |||||
} | } | ||||
task.CorrectCreateUnix() | task.CorrectCreateUnix() | ||||
task.ComputeAndSetDuration() | task.ComputeAndSetDuration() | ||||
if oldStatus != task.Status { | |||||
notification.NotifyChangeCloudbrainStatus(task, oldStatus) | |||||
} | |||||
err = models.UpdateJob(task) | err = models.UpdateJob(task) | ||||
if err != nil { | if err != nil { | ||||
log.Error("UpdateJob(%s) failed:%v", task.JobName, err) | log.Error("UpdateJob(%s) failed:%v", task.JobName, err) | ||||
@@ -1796,6 +1819,7 @@ func SyncCloudbrainStatus() { | |||||
} | } | ||||
if result != nil { | if result != nil { | ||||
oldStatus := task.Status | |||||
task.Status = modelarts.TransTrainJobStatus(result.IntStatus) | task.Status = modelarts.TransTrainJobStatus(result.IntStatus) | ||||
task.Duration = result.Duration / 1000 | task.Duration = result.Duration / 1000 | ||||
task.TrainJobDuration = result.TrainJobDuration | task.TrainJobDuration = result.TrainJobDuration | ||||
@@ -1808,6 +1832,9 @@ func SyncCloudbrainStatus() { | |||||
task.EndTime = task.StartTime.Add(task.Duration) | task.EndTime = task.StartTime.Add(task.Duration) | ||||
} | } | ||||
task.CorrectCreateUnix() | task.CorrectCreateUnix() | ||||
if oldStatus != task.Status { | |||||
notification.NotifyChangeCloudbrainStatus(task, oldStatus) | |||||
} | |||||
err = models.UpdateJob(task) | err = models.UpdateJob(task) | ||||
if err != nil { | if err != nil { | ||||
log.Error("UpdateJob(%s) failed:%v", task.JobName, err) | log.Error("UpdateJob(%s) failed:%v", task.JobName, err) | ||||
@@ -1828,6 +1855,7 @@ func SyncCloudbrainStatus() { | |||||
if len(result.JobInfo.Tasks[0].CenterID) == 1 && len(result.JobInfo.Tasks[0].CenterName) == 1 { | if len(result.JobInfo.Tasks[0].CenterID) == 1 && len(result.JobInfo.Tasks[0].CenterName) == 1 { | ||||
task.AiCenter = result.JobInfo.Tasks[0].CenterID[0] + "+" + result.JobInfo.Tasks[0].CenterName[0] | task.AiCenter = result.JobInfo.Tasks[0].CenterID[0] + "+" + result.JobInfo.Tasks[0].CenterName[0] | ||||
} | } | ||||
oldStatus := task.Status | |||||
task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status) | task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status) | ||||
task.Duration = result.JobInfo.RunSec | task.Duration = result.JobInfo.RunSec | ||||
task.TrainJobDuration = models.ConvertDurationToStr(task.Duration) | task.TrainJobDuration = models.ConvertDurationToStr(task.Duration) | ||||
@@ -1839,6 +1867,9 @@ func SyncCloudbrainStatus() { | |||||
task.EndTime = task.StartTime.Add(task.Duration) | task.EndTime = task.StartTime.Add(task.Duration) | ||||
} | } | ||||
task.CorrectCreateUnix() | task.CorrectCreateUnix() | ||||
if oldStatus != task.Status { | |||||
notification.NotifyChangeCloudbrainStatus(task, oldStatus) | |||||
} | |||||
err = models.UpdateJob(task) | err = models.UpdateJob(task) | ||||
if err != nil { | if err != nil { | ||||
log.Error("UpdateJob(%s) failed:%v", task.JobName, err) | log.Error("UpdateJob(%s) failed:%v", task.JobName, err) | ||||
@@ -5,6 +5,7 @@ import ( | |||||
"code.gitea.io/gitea/modules/git" | "code.gitea.io/gitea/modules/git" | ||||
"code.gitea.io/gitea/modules/grampus" | "code.gitea.io/gitea/modules/grampus" | ||||
"code.gitea.io/gitea/modules/modelarts" | "code.gitea.io/gitea/modules/modelarts" | ||||
"code.gitea.io/gitea/modules/notification" | |||||
"code.gitea.io/gitea/modules/timeutil" | "code.gitea.io/gitea/modules/timeutil" | ||||
"code.gitea.io/gitea/modules/util" | "code.gitea.io/gitea/modules/util" | ||||
"encoding/json" | "encoding/json" | ||||
@@ -550,12 +551,15 @@ func GrampusStopJob(ctx *context.Context) { | |||||
errorMsg = res.ErrorMsg | errorMsg = res.ErrorMsg | ||||
break | break | ||||
} | } | ||||
oldStatus := task.Status | |||||
task.Status = string(models.GrampusStatusStopped) | task.Status = string(models.GrampusStatusStopped) | ||||
if task.EndTime == 0 { | if task.EndTime == 0 { | ||||
task.EndTime = timeutil.TimeStampNow() | task.EndTime = timeutil.TimeStampNow() | ||||
} | } | ||||
task.ComputeAndSetDuration() | task.ComputeAndSetDuration() | ||||
if oldStatus != task.Status { | |||||
notification.NotifyChangeCloudbrainStatus(task, oldStatus) | |||||
} | |||||
err = models.UpdateJob(task) | err = models.UpdateJob(task) | ||||
if err != nil { | if err != nil { | ||||
log.Error("UpdateJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"]) | log.Error("UpdateJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"]) | ||||
@@ -642,6 +646,7 @@ func GrampusTrainJobShow(ctx *context.Context) { | |||||
if len(result.JobInfo.Tasks[0].CenterID) == 1 && len(result.JobInfo.Tasks[0].CenterName) == 1 { | if len(result.JobInfo.Tasks[0].CenterID) == 1 && len(result.JobInfo.Tasks[0].CenterName) == 1 { | ||||
task.AiCenter = result.JobInfo.Tasks[0].CenterID[0] + "+" + result.JobInfo.Tasks[0].CenterName[0] | task.AiCenter = result.JobInfo.Tasks[0].CenterID[0] + "+" + result.JobInfo.Tasks[0].CenterName[0] | ||||
} | } | ||||
oldStatus := task.Status | |||||
task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status) | task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status) | ||||
if task.Status != result.JobInfo.Status || result.JobInfo.Status == models.GrampusStatusRunning { | if task.Status != result.JobInfo.Status || result.JobInfo.Status == models.GrampusStatusRunning { | ||||
task.Duration = result.JobInfo.RunSec | task.Duration = result.JobInfo.RunSec | ||||
@@ -654,6 +659,9 @@ func GrampusTrainJobShow(ctx *context.Context) { | |||||
task.EndTime = task.StartTime.Add(task.Duration) | task.EndTime = task.StartTime.Add(task.Duration) | ||||
} | } | ||||
task.CorrectCreateUnix() | task.CorrectCreateUnix() | ||||
if oldStatus != task.Status { | |||||
notification.NotifyChangeCloudbrainStatus(task, oldStatus) | |||||
} | |||||
err = models.UpdateJob(task) | err = models.UpdateJob(task) | ||||
if err != nil { | if err != nil { | ||||
log.Error("UpdateJob failed:" + err.Error()) | log.Error("UpdateJob failed:" + err.Error()) | ||||
@@ -272,8 +272,10 @@ func NotebookShow(ctx *context.Context) { | |||||
if result != nil { | if result != nil { | ||||
if task.DeletedAt.IsZero() { //normal record | if task.DeletedAt.IsZero() { //normal record | ||||
if task.Status != result.Status { | if task.Status != result.Status { | ||||
oldStatus := task.Status | |||||
task.Status = result.Status | task.Status = result.Status | ||||
models.ParseAndSetDurationFromModelArtsNotebook(result, task) | models.ParseAndSetDurationFromModelArtsNotebook(result, task) | ||||
notification.NotifyChangeCloudbrainStatus(task, oldStatus) | |||||
err = models.UpdateJob(task) | err = models.UpdateJob(task) | ||||
if err != nil { | if err != nil { | ||||
log.Error("GET job error", err.Error()) | log.Error("GET job error", err.Error()) | ||||
@@ -510,11 +512,15 @@ func NotebookManage(ctx *context.Context) { | |||||
ID = strconv.FormatInt(newTask.ID, 10) | ID = strconv.FormatInt(newTask.ID, 10) | ||||
notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, ID, task.DisplayJobName, models.ActionCreateDebugNPUTask) | notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, ID, task.DisplayJobName, models.ActionCreateDebugNPUTask) | ||||
} else { | } else { | ||||
oldStatus := task.Status | |||||
task.Status = res.Status | task.Status = res.Status | ||||
if task.EndTime == 0 && models.IsModelArtsDebugJobTerminal(task.Status) { | if task.EndTime == 0 && models.IsModelArtsDebugJobTerminal(task.Status) { | ||||
task.EndTime = timeutil.TimeStampNow() | task.EndTime = timeutil.TimeStampNow() | ||||
} | } | ||||
task.ComputeAndSetDuration() | task.ComputeAndSetDuration() | ||||
if oldStatus != task.Status { | |||||
notification.NotifyChangeCloudbrainStatus(task, oldStatus) | |||||
} | |||||
err = models.UpdateJob(task) | err = models.UpdateJob(task) | ||||
if err != nil { | if err != nil { | ||||
log.Error("UpdateJob(%s) failed:%v", task.JobName, err.Error(), ctx.Data["MsgID"]) | log.Error("UpdateJob(%s) failed:%v", task.JobName, err.Error(), ctx.Data["MsgID"]) | ||||