diff --git a/custom/conf/app.ini.sample b/custom/conf/app.ini.sample index d294c8823..7a4298f6b 100755 --- a/custom/conf/app.ini.sample +++ b/custom/conf/app.ini.sample @@ -1141,3 +1141,9 @@ growth_issue=0.2 growth_contributors=0.2 growth_commit=0.2 growth_comments=0.2 + + +[grampus] +USERNAME = +PASSWORD = +SERVER_HOST = diff --git a/models/action.go b/models/action.go index 1a25c162a..3e6a58077 100755 --- a/models/action.go +++ b/models/action.go @@ -50,6 +50,16 @@ const ( ActionRejectPullRequest // 22 ActionCommentPull // 23 + ActionUploadAttachment //24 + ActionCreateDebugGPUTask //25 + ActionCreateDebugNPUTask //26 + ActionCreateTrainTask //27 + ActionCreateInferenceTask // 28 + ActionCreateBenchMarkTask //29 + ActionCreateNewModelTask //30 + ActionCreateGPUTrainTask //31 + ActionCreateGrampusNPUTrainTask //32 + ActionCreateGrampusGPUTrainTask //33 ActionUploadAttachment //24 ActionCreateDebugGPUTask //25 ActionCreateDebugNPUTask //26 diff --git a/models/cloudbrain.go b/models/cloudbrain.go index 1d2e56476..d2d4ac656 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -24,7 +24,7 @@ type ModelArtsJobStatus string const ( TypeCloudBrainOne int = iota TypeCloudBrainTwo - TypeIntelligentNet + TypeC2Net //智算网络 TypeCloudBrainAll = -1 ) @@ -99,6 +99,15 @@ const ( ModelArtsTrainJobCheckFailed ModelArtsJobStatus = "CHECK_FAILED" //审核作业失败 DURATION_STR_ZERO = "00:00:00" + + //grampus + GrampusStatusPending = "pending" + GrampusStatusRunning = "RUNNING" + GrampusStatusFailed = "FAILED" + GrampusStatusSucceeded = "SUCCEEDED" + GrampusStatusStopped = "STOPPED" + GrampusStatusUnknown = "UNKNOWN" + GrampusStatusWaiting = "WAITING" ) type Cloudbrain struct { @@ -138,6 +147,8 @@ type Cloudbrain struct { PreVersionName string //父版本名称 ComputeResource string //计算资源,例如npu EngineID int64 //引擎id + ImageID string //grampus image_id + AiCenter string //grampus ai center: center_id+center_name TrainUrl string //输出模型的obs路径 BranchName string //分支名称 @@ -238,7 +249,7 @@ func 
ConvertDurationToStr(duration int64) string { } func IsTrainJobTerminal(status string) bool { - return status == string(ModelArtsTrainJobCompleted) || status == string(ModelArtsTrainJobFailed) || status == string(ModelArtsTrainJobKilled) + return status == string(ModelArtsTrainJobCompleted) || status == string(ModelArtsTrainJobFailed) || status == string(ModelArtsTrainJobKilled) || status == GrampusStatusFailed || status == GrampusStatusStopped || status == GrampusStatusSucceeded } func IsModelArtsDebugJobTerminal(status string) bool { @@ -588,6 +599,17 @@ type FlavorInfo struct { UnitPrice int64 `json:"unitPrice"` } +type SpecialPools struct { + Pools []*SpecialPool `json:"pools"` +} +type SpecialPool struct { + Org string `json:"org"` + Type string `json:"type"` + IsExclusive bool `json:"isExclusive"` + Pool []*GpuInfo `json:"pool"` + JobType []string `json:"jobType"` +} + type ImageInfosModelArts struct { ImageInfo []*ImageInfoModelArts `json:"image_info"` } @@ -1190,6 +1212,84 @@ type LogFile struct { Name string } +//Grampus +type GrampusResult struct { + ErrorCode int `json:"errorCode"` + ErrorMsg string `json:"errorMsg"` +} + +type GrampusJobInfo struct { + StartedAt int64 `json:"startedAt"` + RunSec int64 `json:"runSec"` + CompletedAt int64 `json:"completedAt"` + CreatedAt int64 `json:"createdAt"` + UpdatedAt int64 `json:"updatedAt"` + Desc string `json:"desc"` + JobID string `json:"id"` + Name string `json:"name"` + Status string `json:"status"` + UserID string `json:"userId"` + Tasks []GrampusTasks `json:"tasks"` +} + +type GrampusSpec struct { + CreatedAt int64 `json:"createdAt"` + UpdatedAt int64 `json:"updatedAt"` + ID string `json:"id"` + Name string `json:"name"` + ProcessorType string `json:"processorType"` +} + +type GetGrampusResourceSpecsResult struct { + GrampusResult + Infos []GrampusSpec `json:"resourceSpecs"` +} + +type GrampusImage struct { + CreatedAt int64 `json:"createdAt"` + UpdatedAt int64 `json:"updatedAt"` + ID string `json:"id"` + 
Name string `json:"name"` + ProcessorType string `json:"processorType"` +} + +type GetGrampusImagesResult struct { + GrampusResult + TotalSize int `json:"totalSize"` + Infos []GrampusImage `json:"images"` +} + +type CreateGrampusJobResponse struct { + GrampusResult + JobInfo GrampusJobInfo `json:"otJob"` +} + +type GetGrampusJobResponse struct { + GrampusResult + JobInfo GrampusJobInfo `json:"otJob"` +} + +type GrampusStopJobResponse struct { + GrampusResult + StoppedAt int64 `json:"stoppedAt"` +} + +type GrampusTasks struct { + Command string `json:"command"` + Name string `json:"name"` + ImageId string `json:"imageId"` + ResourceSpecId string `json:"resourceSpecId"` + ImageUrl string `json:"imageUrl"` + CenterID []string `json:"centerID"` + CenterName []string `json:"centerName"` + ReplicaNum int `json:"replicaNum"` +} + +type CreateGrampusJobRequest struct { + Name string `json:"name"` + Tasks []GrampusTasks `json:"tasks"` +} + type GetTrainJobMetricStatisticResult struct { TrainJobResult Interval int `json:"interval"` //查询的时间间隔,单位为分钟 @@ -1235,6 +1335,12 @@ func Cloudbrains(opts *CloudbrainsOptions) ([]*CloudbrainInfo, int64, error) { ) } + if len(opts.ComputeResource) > 0 { + cond = cond.And( + builder.Eq{"cloudbrain.compute_resource": opts.ComputeResource}, + ) + } + if len(opts.JobTypes) > 0 { if opts.JobTypeNot { cond = cond.And( @@ -1490,6 +1596,11 @@ func GetCloudbrainByJobID(jobID string) (*Cloudbrain, error) { return getRepoCloudBrain(cb) } +func GetCloudbrainByJobIDWithDeleted(jobID string) (*Cloudbrain, error) { + cb := &Cloudbrain{JobID: jobID} + return getRepoCloudBrainWithDeleted(cb) +} + func GetCloudbrainByID(id string) (*Cloudbrain, error) { idInt64, _ := strconv.ParseInt(id, 10, 64) cb := &Cloudbrain{ID: idInt64} @@ -1668,6 +1779,11 @@ func GetCloudbrainInferenceJobCountByUserID(userID int64) (int, error) { return int(count), err } +func GetGrampusCountByUserID(userID int64, jobType, computeResource string) (int, error) { + count, err := 
x.In("status", GrampusStatusWaiting, GrampusStatusRunning).And("job_type = ? and user_id = ? and type = ?", jobType, userID, TypeC2Net).And("compute_resource = ?", computeResource).Count(new(Cloudbrain)) + return int(count), err +} + func UpdateInferenceJob(job *Cloudbrain) error { return updateInferenceJob(x, job) } diff --git a/models/cloudbrain_image.go b/models/cloudbrain_image.go old mode 100644 new mode 100755 index f72c6a27c..71f0c2c94 --- a/models/cloudbrain_image.go +++ b/models/cloudbrain_image.go @@ -68,6 +68,7 @@ type SearchImageOptions struct { IncludeCustom bool IncludeOwnerOnly bool Topics string + CloudbrainType int ListOptions SearchOrderBy } @@ -411,6 +412,10 @@ func SearchImageCondition(opts *SearchImageOptions) builder.Cond { } + if opts.CloudbrainType > 0 { + cond = cond.And(builder.Eq{"cloudbrain_type": opts.CloudbrainType}) + } + return cond } diff --git a/models/custom_migrations.go b/models/custom_migrations.go old mode 100644 new mode 100755 index 412bedce1..65b53f0f4 --- a/models/custom_migrations.go +++ b/models/custom_migrations.go @@ -15,13 +15,9 @@ type CustomMigrationStatic struct { Migrate func(*xorm.Engine, *xorm.Engine) error } -var customMigrations = []CustomMigration{ - {"Custom v1 Topic struct change to support chinese", syncTopicStruct}, -} +var customMigrations []CustomMigration -var customMigrationsStatic = []CustomMigrationStatic{ - {"update issue_fixed_rate to 1 if num_issues is 0 ", updateIssueFixedRate}, -} +var customMigrationsStatic []CustomMigrationStatic func MigrateCustom(x *xorm.Engine) { diff --git a/models/repo.go b/models/repo.go index 9845ffa82..0e63f4c37 100755 --- a/models/repo.go +++ b/models/repo.go @@ -2755,15 +2755,10 @@ func ReadLatestFileInRepo(userName, repoName, refName, treePath string) (*RepoFi log.Error("ReadLatestFileInRepo: Close: %v", err) } }() - - buf := make([]byte, 1024) - n, _ := reader.Read(buf) - if n >= 0 { - buf = buf[:n] - } + d, _ := ioutil.ReadAll(reader) commitId := "" if blob != 
nil { commitId = fmt.Sprint(blob.ID) } - return &RepoFile{CommitId: commitId, Content: buf}, nil + return &RepoFile{CommitId: commitId, Content: d}, nil } diff --git a/models/user_business_analysis.go b/models/user_business_analysis.go index 4cd3539d7..e058c0df8 100644 --- a/models/user_business_analysis.go +++ b/models/user_business_analysis.go @@ -955,6 +955,8 @@ func CounDataByDateAndReCount(wikiCountMap map[string]int, startTime time.Time, return err } userNewAddActivity := make(map[int64]map[int64]int64) + userAcitvateJsonMap := make(map[int64]map[int64]int64) + userCurrentDayRegistMap := make(map[int64]map[int64]int64) ParaWeight := getParaWeight() userMetrics := make(map[string]int) var indexTotal int64 @@ -1028,7 +1030,10 @@ func CounDataByDateAndReCount(wikiCountMap map[string]int, startTime time.Time, log.Info("has activity." + userRecord.Name) addUserToMap(userNewAddActivity, userRecord.CreatedUnix, dateRecord.ID) } - + if userRecord.IsActive { + addUserToMap(userAcitvateJsonMap, userRecord.CreatedUnix, dateRecord.ID) + } + addUserToMap(userCurrentDayRegistMap, userRecord.CreatedUnix, dateRecord.ID) } indexTotal += PAGE_SIZE @@ -1064,36 +1069,61 @@ func CounDataByDateAndReCount(wikiCountMap map[string]int, startTime time.Time, } statictisSess.Insert(&useMetrics) //update new user activity - updateNewUserAcitivity(userNewAddActivity, statictisSess) + updateNewUserAcitivity(userNewAddActivity, userAcitvateJsonMap, userCurrentDayRegistMap, statictisSess) return nil } -func updateNewUserAcitivity(currentUserActivity map[int64]map[int64]int64, statictisSess *xorm.Session) { - for key, value := range currentUserActivity { +func updateNewUserAcitivity(currentUserActivity map[int64]map[int64]int64, userAcitvateJsonMap map[int64]map[int64]int64, userCurrentDayRegistMap map[int64]map[int64]int64, statictisSess *xorm.Session) { + for key, value := range userCurrentDayRegistMap { useMetrics := &UserMetrics{CountDate: key} + userAcitvateValue := 
userAcitvateJsonMap[key] + HuodongValue := currentUserActivity[key] has, err := statictisSess.Get(useMetrics) if err == nil && has { - userIdArrays := strings.Split(useMetrics.HasActivityUserJson, ",") - for _, userIdStr := range userIdArrays { - userIdInt, err := strconv.ParseInt(userIdStr, 10, 64) - if err == nil { - value[userIdInt] = userIdInt - } - } - userIdArray := "" - for _, tmpValue := range value { - userIdArray += fmt.Sprint(tmpValue) + "," - } - useMetrics.HasActivityUser = len(value) - if len(userIdArray) > 0 { - useMetrics.HasActivityUserJson = userIdArray[0 : len(userIdArray)-1] - } - updateSql := "update public.user_metrics set has_activity_user_json='" + useMetrics.HasActivityUserJson + "',regist_activity_user=" + fmt.Sprint(useMetrics.HasActivityUser) + " where count_date=" + fmt.Sprint(key) + ActivityUserArray, HuodongTotal := setUniqueUserId(useMetrics.HasActivityUserJson, HuodongValue) + useMetrics.HasActivityUser = HuodongTotal + useMetrics.HasActivityUserJson = ActivityUserArray + + useMetrics.CurrentDayRegistUser = len(value) + + RegistUserArray, lenRegistUser := setUniqueUserId(useMetrics.ActivityUserJson, userAcitvateValue) + useMetrics.ActivityUserJson = RegistUserArray + useMetrics.ActivateRegistUser = lenRegistUser + + updateSql := "update public.user_metrics set has_activity_user_json='" + useMetrics.HasActivityUserJson + + "',regist_activity_user=" + fmt.Sprint(useMetrics.HasActivityUser) + + ",activity_user_json='" + useMetrics.ActivityUserJson + "'" + + ",activate_regist_user=" + fmt.Sprint(useMetrics.ActivateRegistUser) + + ",not_activate_regist_user=" + fmt.Sprint(useMetrics.CurrentDayRegistUser-useMetrics.ActivateRegistUser) + + ",current_day_regist_user=" + fmt.Sprint(useMetrics.CurrentDayRegistUser) + + " where count_date=" + fmt.Sprint(key) + statictisSess.Exec(updateSql) } } } +func setUniqueUserId(jsonString string, value map[int64]int64) (string, int) { + if value == nil { + value = make(map[int64]int64, 0) + } + 
userIdArrays := strings.Split(jsonString, ",") + for _, userIdStr := range userIdArrays { + userIdInt, err := strconv.ParseInt(userIdStr, 10, 64) + if err == nil { + value[userIdInt] = userIdInt + } + } + userIdArray := "" + for _, tmpValue := range value { + userIdArray += fmt.Sprint(tmpValue) + "," + } + if len(userIdArray) > 0 { + return userIdArray[0 : len(userIdArray)-1], len(value) + } + return userIdArray, len(value) +} + func addUserToMap(currentUserActivity map[int64]map[int64]int64, registDate timeutil.TimeStamp, userId int64) { CountDateTime := time.Date(registDate.Year(), registDate.AsTime().Month(), registDate.AsTime().Day(), 0, 1, 0, 0, registDate.AsTime().Location()) CountDate := CountDateTime.Unix() @@ -1104,7 +1134,6 @@ func addUserToMap(currentUserActivity map[int64]map[int64]int64, registDate time } else { currentUserActivity[CountDate][userId] = userId } - } func setUserMetrics(userMetrics map[string]int, user *User, start_time int64, end_time int64, dateRecord UserBusinessAnalysis) { diff --git a/models/user_business_struct.go b/models/user_business_struct.go index fec361bca..870a64bc7 100644 --- a/models/user_business_struct.go +++ b/models/user_business_struct.go @@ -467,11 +467,11 @@ type UserAnalysisPara struct { type UserMetrics struct { CountDate int64 `xorm:"pk"` - ActivateRegistUser int `xorm:"NOT NULL DEFAULT 0"` - NotActivateRegistUser int `xorm:"NOT NULL DEFAULT 0"` - ActivateIndex float64 `xorm:"NOT NULL DEFAULT 0"` - RegistActivityUser int `xorm:"NOT NULL DEFAULT 0"` - HasActivityUser int `xorm:"NOT NULL DEFAULT 0"` + ActivateRegistUser int `xorm:"NOT NULL DEFAULT 0"` //当天激活用户 + NotActivateRegistUser int `xorm:"NOT NULL DEFAULT 0"` //当天未激活用户 + ActivateIndex float64 `xorm:"NOT NULL DEFAULT 0"` //激活比率 + RegistActivityUser int `xorm:"NOT NULL DEFAULT 0"` //当天注册激活的人中,有贡献活动的人 + HasActivityUser int `xorm:"NOT NULL DEFAULT 0"` //当天有贡献活动的人 TotalUser int `xorm:"NOT NULL DEFAULT 0"` TotalRegistUser int `xorm:"-"` TotalActivateRegistUser int 
`xorm:"NOT NULL DEFAULT 0"` @@ -480,5 +480,7 @@ type UserMetrics struct { DisplayDate string `xorm:"-"` DataDate string `xorm:"NULL"` DaysForMonth int `xorm:"NOT NULL DEFAULT 0"` - HasActivityUserJson string `xorm:"text NULL"` + HasActivityUserJson string `xorm:"text NULL"` //贡献活动用户列表 + ActivityUserJson string `xorm:"text NULL"` //激活用户列表 + CurrentDayRegistUser int `xorm:"NOT NULL DEFAULT 0"` //当天注册用户 } diff --git a/modules/auth/grampus.go b/modules/auth/grampus.go new file mode 100755 index 000000000..ebf0defde --- /dev/null +++ b/modules/auth/grampus.go @@ -0,0 +1,26 @@ +package auth + +import ( + "gitea.com/macaron/binding" + "gitea.com/macaron/macaron" +) + +type CreateGrampusTrainJobForm struct { + DisplayJobName string `form:"display_job_name" binding:"Required"` + JobName string `form:"job_name" binding:"Required"` + Attachment string `form:"attachment" binding:"Required"` + BootFile string `form:"boot_file" binding:"Required"` + ImageID string `form:"image_id" binding:"Required"` + FlavorID string `form:"flavor" binding:"Required"` + Params string `form:"run_para_list" binding:"Required"` + Description string `form:"description"` + BranchName string `form:"branch_name" binding:"Required"` + FlavorName string `form:"flavor_name" binding:"Required"` + EngineName string `form:"engine_name" binding:"Required"` + WorkServerNumber int `form:"work_server_number" binding:"Required"` + Image string `form:"image"` +} + +func (f *CreateGrampusTrainJobForm) Validate(ctx *macaron.Context, errs binding.Errors) binding.Errors { + return validate(errs, ctx.Data, f, ctx.Locale) +} diff --git a/modules/auth/wechat/auto_reply.go b/modules/auth/wechat/auto_reply.go new file mode 100644 index 000000000..440f6de6a --- /dev/null +++ b/modules/auth/wechat/auto_reply.go @@ -0,0 +1,139 @@ +package wechat + +import ( + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + "encoding/json" + "github.com/patrickmn/go-cache" + "strings" 
+ "time" +) + +var WechatReplyCache = cache.New(2*time.Minute, 1*time.Minute) + +const ( + WECHAT_REPLY_CACHE_KEY = "wechat_response" +) + +const ( + ReplyTypeText = "text" + ReplyTypeImage = "image" + ReplyTypeVoice = "voice" + ReplyTypeVideo = "video" + ReplyTypeMusic = "music" + ReplyTypeNews = "news" +) + +type ReplyConfigType string + +const ( + SubscribeReply ReplyConfigType = "subscribe" + AutoMsgReply ReplyConfigType = "autoMsg" +) + +func (r ReplyConfigType) Name() string { + switch r { + case SubscribeReply: + return "subscribe" + case AutoMsgReply: + return "autoMsg" + } + return "" +} + +func (r ReplyConfigType) TreePath() string { + switch r { + case SubscribeReply: + return setting.TreePathOfSubscribe + case AutoMsgReply: + return setting.TreePathOfAutoMsgReply + } + return "" +} + +type WechatReplyContent struct { + Reply *ReplyContent + ReplyType string + KeyWords []string + IsFullMatch int +} + +type ReplyContent struct { + Content string + MediaId string + Title string + Description string + MusicUrl string + HQMusicUrl string + ThumbMediaId string + Articles []ArticlesContent +} + +func GetAutomaticReply(msg string) *WechatReplyContent { + r, err := LoadReplyFromCacheAndDisk(AutoMsgReply) + if err != nil { + return nil + } + if r == nil || len(r) == 0 { + return nil + } + for i := 0; i < len(r); i++ { + if r[i].IsFullMatch == 0 { + for _, v := range r[i].KeyWords { + if strings.Contains(msg, v) { + return r[i] + } + } + } else if r[i].IsFullMatch > 0 { + for _, v := range r[i].KeyWords { + if msg == v { + return r[i] + } + } + } + } + return nil + +} + +func loadReplyFromDisk(replyConfig ReplyConfigType) ([]*WechatReplyContent, error) { + log.Info("LoadReply from disk") + repo, err := models.GetRepositoryByOwnerAndAlias(setting.UserNameOfWechatReply, setting.RepoNameOfWechatReply) + if err != nil { + log.Error("get AutomaticReply repo failed, error=%v", err) + return nil, err + } + repoFile, err := 
models.ReadLatestFileInRepo(setting.UserNameOfWechatReply, repo.Name, setting.RefNameOfWechatReply, replyConfig.TreePath()) + if err != nil { + log.Error("get AutomaticReply failed, error=%v", err) + return nil, err + } + res := make([]*WechatReplyContent, 0) + json.Unmarshal(repoFile.Content, &res) + if res == nil || len(res) == 0 { + return nil, err + } + return res, nil +} + +func LoadReplyFromCacheAndDisk(replyConfig ReplyConfigType) ([]*WechatReplyContent, error) { + v, success := WechatReplyCache.Get(replyConfig.Name()) + if success { + log.Info("LoadReply from cache,value = %v", v) + if v == nil { + return nil, nil + } + n := v.([]*WechatReplyContent) + return n, nil + } + + content, err := loadReplyFromDisk(replyConfig) + if err != nil { + log.Error("LoadReply failed, error=%v", err) + WechatReplyCache.Set(replyConfig.Name(), nil, 30*time.Second) + return nil, err + } + WechatReplyCache.Set(replyConfig.Name(), content, 60*time.Second) + return content, nil +} diff --git a/modules/auth/wechat/client.go b/modules/auth/wechat/client.go index 5a81aa808..28d448b53 100644 --- a/modules/auth/wechat/client.go +++ b/modules/auth/wechat/client.go @@ -17,7 +17,8 @@ var ( const ( GRANT_TYPE = "client_credential" ACCESS_TOKEN_PATH = "/cgi-bin/token" - QR_CODE_Path = "/cgi-bin/qrcode/create" + QR_CODE_PATH = "/cgi-bin/qrcode/create" + GET_MATERIAL_PATH = "/cgi-bin/material/batchget_material" ACTION_QR_STR_SCENE = "QR_STR_SCENE" ERR_CODE_ACCESSTOKEN_EXPIRE = 42001 @@ -40,6 +41,11 @@ type QRCodeRequest struct { Action_info ActionInfo `json:"action_info"` Expire_seconds int `json:"expire_seconds"` } +type MaterialRequest struct { + Type string `json:"type"` + Offset int `json:"offset"` + Count int `json:"count"` +} type ActionInfo struct { Scene Scene `json:"scene"` @@ -99,7 +105,7 @@ func callQRCodeCreate(sceneStr string) (*QRCodeResponse, bool) { SetQueryParam("access_token", GetWechatAccessToken()). SetBody(bodyJson). SetResult(&result). 
- Post(setting.WechatApiHost + QR_CODE_Path) + Post(setting.WechatApiHost + QR_CODE_PATH) if err != nil { log.Error("create QR code failed,e=%v", err) return nil, false @@ -115,6 +121,37 @@ return &result, false } +//getMaterial +// api doc: https://developers.weixin.qq.com/doc/offiaccount/Asset_Management/Get_materials_list.html +func getMaterial(mType string, offset, count int) (interface{}, bool) { + client := getWechatRestyClient() + + body := &MaterialRequest{ + Type: mType, + Offset: offset, + Count: count, + } + bodyJson, _ := json.Marshal(body) + r, err := client.R(). + SetHeader("Content-Type", "application/json"). + SetQueryParam("access_token", GetWechatAccessToken()). + SetBody(bodyJson). + Post(setting.WechatApiHost + GET_MATERIAL_PATH) + if err != nil { + log.Error("get material failed,e=%v", err) + return nil, false + } + a := r.Body() + resultMap := make(map[string]interface{}, 0) + json.Unmarshal(a, &resultMap) + errcode := resultMap["errcode"] + if errcode == fmt.Sprint(ERR_CODE_ACCESSTOKEN_EXPIRE) || errcode == fmt.Sprint(ERR_CODE_ACCESSTOKEN_INVALID) { + return nil, true + } + log.Info("%v", r) + return &resultMap, false +} + func getErrorCodeFromResponse(r *resty.Response) int { a := r.Body() resultMap := make(map[string]interface{}, 0) diff --git a/modules/auth/wechat/event_handle.go b/modules/auth/wechat/event_handle.go index 399537f1e..e20f106a6 100644 --- a/modules/auth/wechat/event_handle.go +++ b/modules/auth/wechat/event_handle.go @@ -20,7 +20,7 @@ import ( // // // -type WechatEvent struct { +type WechatMsg struct { ToUserName string FromUserName string CreateTime int64 @@ -28,9 +28,13 @@ Event string EventKey string Ticket string + Content string + MsgId string + MsgDataId string + Idx string } -type EventReply struct { +type MsgReply struct { XMLName xml.Name `xml:"xml"` ToUserName string FromUserName string @@ -39,16 +43,97 @@ type EventReply 
struct { Content string } +type TextMsgReply struct { + XMLName xml.Name `xml:"xml"` + ToUserName string + FromUserName string + CreateTime int64 + MsgType string + Content string +} +type ImageMsgReply struct { + XMLName xml.Name `xml:"xml"` + ToUserName string + FromUserName string + CreateTime int64 + MsgType string + Image ImageContent +} +type VoiceMsgReply struct { + XMLName xml.Name `xml:"xml"` + ToUserName string + FromUserName string + CreateTime int64 + MsgType string + Voice VoiceContent +} +type VideoMsgReply struct { + XMLName xml.Name `xml:"xml"` + ToUserName string + FromUserName string + CreateTime int64 + MsgType string + Video VideoContent +} +type MusicMsgReply struct { + XMLName xml.Name `xml:"xml"` + ToUserName string + FromUserName string + CreateTime int64 + MsgType string + Music MusicContent +} +type NewsMsgReply struct { + XMLName xml.Name `xml:"xml"` + ToUserName string + FromUserName string + CreateTime int64 + MsgType string + ArticleCount int + Articles ArticleItem +} + +type ArticleItem struct { + Item []ArticlesContent +} + +type ImageContent struct { + MediaId string +} +type VoiceContent struct { + MediaId string +} +type VideoContent struct { + MediaId string + Title string + Description string +} +type MusicContent struct { + Title string + Description string + MusicUrl string + HQMusicUrl string + ThumbMediaId string +} +type ArticlesContent struct { + XMLName xml.Name `xml:"item"` + Title string + Description string + PicUrl string + Url string +} + const ( WECHAT_EVENT_SUBSCRIBE = "subscribe" WECHAT_EVENT_SCAN = "SCAN" ) const ( - WECHAT_MSG_TYPE_TEXT = "text" + WECHAT_MSG_TYPE_TEXT = "text" + WECHAT_MSG_TYPE_EVENT = "event" ) -func HandleSubscribeEvent(we WechatEvent) string { +func HandleScanEvent(we WechatMsg) string { eventKey := we.EventKey if eventKey == "" { return "" @@ -80,3 +165,11 @@ func HandleSubscribeEvent(we WechatEvent) string { return BIND_REPLY_SUCCESS } + +func HandleSubscribeEvent(we WechatMsg) 
*WechatReplyContent { + r, err := LoadReplyFromCacheAndDisk(SubscribeReply) + if err != nil || len(r) == 0 { + return nil + } + return r[0] +} diff --git a/modules/auth/wechat/material.go b/modules/auth/wechat/material.go new file mode 100644 index 000000000..526156af5 --- /dev/null +++ b/modules/auth/wechat/material.go @@ -0,0 +1,13 @@ +package wechat + +import "code.gitea.io/gitea/modules/log" + +func GetWechatMaterial(mType string, offset, count int) interface{} { + result, retryFlag := getMaterial(mType, offset, count) + if retryFlag { + log.Info("retryGetWechatMaterial calling") + refreshAccessToken() + result, _ = getMaterial(mType, offset, count) + } + return result +} diff --git a/modules/cloudbrain/cloudbrain.go b/modules/cloudbrain/cloudbrain.go index 8fb13ca4c..dea3f2768 100755 --- a/modules/cloudbrain/cloudbrain.go +++ b/modules/cloudbrain/cloudbrain.go @@ -48,13 +48,10 @@ func isAdminOrOwnerOrJobCreater(ctx *context.Context, job *models.Cloudbrain, er if !ctx.IsSigned { return false } - log.Info("is repo owner:" + strconv.FormatBool(ctx.IsUserRepoOwner())) - log.Info("is user admin:" + strconv.FormatBool(ctx.IsUserSiteAdmin())) if err != nil { return ctx.IsUserRepoOwner() || ctx.IsUserSiteAdmin() } else { - log.Info("is job creator:" + strconv.FormatBool(ctx.User.ID == job.UserID)) return ctx.IsUserRepoOwner() || ctx.IsUserSiteAdmin() || ctx.User.ID == job.UserID } diff --git a/modules/grampus/grampus.go b/modules/grampus/grampus.go new file mode 100755 index 000000000..e83ccccc6 --- /dev/null +++ b/modules/grampus/grampus.go @@ -0,0 +1,174 @@ +package grampus + +import ( + "encoding/json" + "strings" + + "code.gitea.io/gitea/modules/setting" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/context" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/notification" + "code.gitea.io/gitea/modules/timeutil" +) + +const ( + JobPath = "job/" + + ProcessorTypeNPU = "npu.huawei.com/NPU" + ProcessorTypeGPU = "nvidia.com/gpu" + + 
GpuWorkDir = "/tmp/" + NpuWorkDir = "/cache/" + + CommandPrepareScript = ";mkdir -p output;mkdir -p code;mkdir -p dataset;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/script_for_grampus/archive/master.zip;" + + "echo \"finish loading script\";unzip -q master.zip;cd script_for_grampus;chmod 777 downloader_for_obs uploader_for_obs downloader_for_minio uploader_for_minio;" + //CommandPrepareScript = "pwd;cd /cache;mkdir -p output;mkdir -p code;mkdir -p dataset;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/script_for_grampus/archive/master.zip;" + + // "echo \"finish loading script\";unzip -q master.zip;cd script_for_grampus;chmod 777 downloader_for_obs uploader_for_obs downloader_for_minio uploader_for_minio;" + + CodeArchiveName = "master.zip" +) + +var ( + poolInfos *models.PoolInfos + FlavorInfos *models.FlavorInfos + ImageInfos *models.ImageInfosModelArts + + SpecialPools *models.SpecialPools +) + +type GenerateTrainJobReq struct { + JobName string + Command string + ResourceSpecId string + ImageUrl string //与image_id二选一,都有的情况下优先image_url + ImageId string + + DisplayJobName string + Uuid string + Description string + CodeObsPath string + BootFile string + BootFileUrl string + DataUrl string + TrainUrl string + WorkServerNumber int + EngineID int64 + CommitID string + IsLatestVersion string + BranchName string + PreVersionId int64 + PreVersionName string + FlavorName string + VersionCount int + EngineName string + TotalVersionCount int + ComputeResource string + DatasetName string + Params string +} + +func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error) { + createTime := timeutil.TimeStampNow() + + var CenterID []string + var CenterName []string + + if SpecialPools != nil { + for _, pool := range SpecialPools.Pools { + if !pool.IsExclusive && strings.Contains(req.ComputeResource, pool.Type) { + org, _ := models.GetOrgByName(pool.Org) + if org != nil { + isOrgMember, _ := 
models.IsOrganizationMember(org.ID, ctx.User.ID) + if isOrgMember { + for _, info := range pool.Pool { + CenterID = append(CenterID, info.Queue) + CenterName = append(CenterName, info.Value) + } + } + } + } + } + } + + jobResult, err := createJob(models.CreateGrampusJobRequest{ + Name: req.JobName, + Tasks: []models.GrampusTasks{ + { + Name: req.JobName, + Command: req.Command, + ResourceSpecId: req.ResourceSpecId, + ImageId: req.ImageId, + ImageUrl: req.ImageUrl, + CenterID: CenterID, + CenterName: CenterName, + ReplicaNum: 1, + }, + }, + }) + if err != nil { + log.Error("createJob failed: %v", err.Error()) + return err + } + + jobID := jobResult.JobInfo.JobID + err = models.CreateCloudbrain(&models.Cloudbrain{ + Status: TransTrainJobStatus(jobResult.JobInfo.Status), + UserID: ctx.User.ID, + RepoID: ctx.Repo.Repository.ID, + JobID: jobID, + JobName: req.JobName, + DisplayJobName: req.DisplayJobName, + JobType: string(models.JobTypeTrain), + Type: models.TypeC2Net, + Uuid: req.Uuid, + DatasetName: req.DatasetName, + CommitID: req.CommitID, + IsLatestVersion: req.IsLatestVersion, + ComputeResource: req.ComputeResource, + ImageID: req.ImageId, + TrainUrl: req.TrainUrl, + BranchName: req.BranchName, + Parameters: req.Params, + BootFile: req.BootFile, + DataUrl: req.DataUrl, + FlavorCode: req.ResourceSpecId, + Description: req.Description, + WorkServerNumber: req.WorkServerNumber, + FlavorName: req.FlavorName, + EngineName: req.EngineName, + VersionCount: req.VersionCount, + TotalVersionCount: req.TotalVersionCount, + CreatedUnix: createTime, + UpdatedUnix: createTime, + }) + + if err != nil { + log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error()) + return err + } + + var actionType models.ActionType + if req.ComputeResource == models.NPUResource { + actionType = models.ActionCreateGrampusNPUTrainTask + } else if req.ComputeResource == models.GPUResource { + actionType = models.ActionCreateGrampusGPUTrainTask + } + 
notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType) + + return nil +} + +func TransTrainJobStatus(status string) string { + if status == models.GrampusStatusPending { + status = models.GrampusStatusWaiting + } + + return strings.ToUpper(status) +} +func InitSpecialPool() { + if SpecialPools == nil && setting.Grampus.SpecialPools != "" { + json.Unmarshal([]byte(setting.Grampus.SpecialPools), &SpecialPools) + } +} diff --git a/modules/grampus/resty.go b/modules/grampus/resty.go new file mode 100755 index 000000000..5e8722b4b --- /dev/null +++ b/modules/grampus/resty.go @@ -0,0 +1,277 @@ +package grampus + +import ( + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + "crypto/tls" + "encoding/json" + "fmt" + "github.com/go-resty/resty/v2" + "net/http" +) + +var ( + restyClient *resty.Client + HOST string + TOKEN string +) + +const ( + urlOpenApiV1 = "/openapi/v1/" + + urlGetToken = urlOpenApiV1 + "token" + urlTrainJob = urlOpenApiV1 + "trainjob" + urlGetResourceSpecs = urlOpenApiV1 + "resourcespec" + urlGetImages = urlOpenApiV1 + "image" + + errorIllegalToken = 1005 +) + +type GetTokenParams struct { + UserName string `json:"username"` + Password string `json:"password"` +} + +type GetTokenResult struct { + Token string `json:"token"` + Expiration int64 `json:"expiration"` +} + +func getRestyClient() *resty.Client { + if restyClient == nil { + restyClient = resty.New() + restyClient.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) + } + return restyClient +} + +func checkSetting() { + if len(HOST) != 0 && len(TOKEN) != 0 && restyClient != nil { + return + } + + err := getToken() + if err != nil { + log.Error("getToken failed:%v", err) + } +} + +func getToken() error { + HOST = setting.Grampus.Host + + client := getRestyClient() + params := GetTokenParams{ + UserName: setting.Grampus.UserName, + Password: setting.Grampus.Password, + } + + var result 
GetTokenResult + res, err := client.R(). + SetHeader("Content-Type", "application/json"). + SetBody(params). + SetResult(&result). + Post(HOST + urlGetToken) + if err != nil { + return fmt.Errorf("resty getToken: %v", err) + } + + if res.StatusCode() != http.StatusOK { + return fmt.Errorf("getToken failed:%s", res.String()) + } + + TOKEN = result.Token + + return nil +} + +func createJob(req models.CreateGrampusJobRequest) (*models.CreateGrampusJobResponse, error) { + checkSetting() + client := getRestyClient() + var result models.CreateGrampusJobResponse + + retry := 0 + +sendjob: + _, err := client.R(). + SetHeader("Content-Type", "application/json"). + SetAuthToken(TOKEN). + SetBody(req). + SetResult(&result). + Post(HOST + urlTrainJob) + + if err != nil { + return nil, fmt.Errorf("resty CreateJob: %s", err) + } + + if result.ErrorCode == errorIllegalToken && retry < 1 { + retry++ + _ = getToken() + goto sendjob + } + + if result.ErrorCode != 0 { + log.Error("CreateJob failed(%d): %s", result.ErrorCode, result.ErrorMsg) + return &result, fmt.Errorf("CreateJob failed(%d): %s", result.ErrorCode, result.ErrorMsg) + } + + return &result, nil +} + +func GetJob(jobID string) (*models.GetGrampusJobResponse, error) { + checkSetting() + client := getRestyClient() + var result models.GetGrampusJobResponse + + retry := 0 + +sendjob: + _, err := client.R(). + SetAuthToken(TOKEN). + SetResult(&result). 
+ Get(HOST + urlTrainJob + "/" + jobID) + + if err != nil { + return nil, fmt.Errorf("resty GetJob: %v", err) + } + + if result.ErrorCode == errorIllegalToken && retry < 1 { + retry++ + log.Info("retry get token") + _ = getToken() + goto sendjob + } + + if result.ErrorCode != 0 { + log.Error("GetJob failed(%d): %s", result.ErrorCode, result.ErrorMsg) + return nil, fmt.Errorf("GetJob failed(%d): %s", result.ErrorCode, result.ErrorMsg) + } + + return &result, nil +} + +func GetResourceSpecs(processorType string) (*models.GetGrampusResourceSpecsResult, error) { + checkSetting() + client := getRestyClient() + var result models.GetGrampusResourceSpecsResult + + retry := 0 + +sendjob: + _, err := client.R(). + SetAuthToken(TOKEN). + SetResult(&result). + Get(HOST + urlGetResourceSpecs + "?processorType=" + processorType) + + if err != nil { + return nil, fmt.Errorf("resty GetResourceSpecs: %v", err) + } + + if result.ErrorCode == errorIllegalToken && retry < 1 { + retry++ + log.Info("retry get token") + _ = getToken() + goto sendjob + } + + if result.ErrorCode != 0 { + log.Error("GetResourceSpecs failed(%d): %s", result.ErrorCode, result.ErrorMsg) + return &result, fmt.Errorf("GetResourceSpecs failed(%d): %s", result.ErrorCode, result.ErrorMsg) + } + + return &result, nil +} + +func GetImages(processorType string) (*models.GetGrampusImagesResult, error) { + checkSetting() + client := getRestyClient() + var result models.GetGrampusImagesResult + + retry := 0 + +sendjob: + _, err := client.R(). + SetAuthToken(TOKEN). + SetResult(&result). 
+ Get(HOST + urlGetImages + "?processorType=" + processorType) + + if err != nil { + return nil, fmt.Errorf("resty GetImages: %v", err) + } + + if result.ErrorCode == errorIllegalToken && retry < 1 { + retry++ + log.Info("retry get token") + _ = getToken() + goto sendjob + } + + if result.ErrorCode != 0 { + log.Error("GetImages failed(%d): %s", result.ErrorCode, result.ErrorMsg) + return &result, fmt.Errorf("GetImages failed(%d): %s", result.ErrorCode, result.ErrorMsg) + } + + return &result, nil +} + +func GetTrainJobLog(jobID string) (string, error) { + checkSetting() + client := getRestyClient() + var logContent string + + res, err := client.R(). + SetAuthToken(TOKEN). + SetResult(&logContent). + Get(HOST + urlTrainJob + "/" + jobID + "/task/0/replica/0/log") + + if err != nil { + return logContent, fmt.Errorf("resty GetTrainJobLog: %v", err) + } + + if res.StatusCode() != http.StatusOK { + var temp models.GrampusResult + if err = json.Unmarshal([]byte(res.String()), &temp); err != nil { + log.Error("json.Unmarshal failed(%s): %v", res.String(), err.Error()) + return logContent, fmt.Errorf("json.Unmarshal failed(%s): %v", res.String(), err.Error()) + } + log.Error("GetTrainJobLog failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg) + return logContent, fmt.Errorf("GetTrainJobLog failed(%d):%s(%s)", res.StatusCode(), temp.ErrorCode, temp.ErrorMsg) + } + + logContent = res.String() + + return logContent, nil +} + +func StopJob(jobID string) (*models.GrampusStopJobResponse, error) { + checkSetting() + client := getRestyClient() + var result models.GrampusStopJobResponse + + retry := 0 + +sendjob: + _, err := client.R(). + //SetHeader("Content-Type", "application/json"). + SetAuthToken(TOKEN). + SetResult(&result). 
+ Post(HOST + urlTrainJob + "/" + jobID + "/stop") + + if err != nil { + return &result, fmt.Errorf("resty StopTrainJob: %v", err) + } + + if result.ErrorCode == errorIllegalToken && retry < 1 { + retry++ + log.Info("retry get token") + _ = getToken() + goto sendjob + } + + if result.ErrorCode != 0 { + log.Error("GetJob failed(%d): %s", result.ErrorCode, result.ErrorMsg) + return &result, fmt.Errorf("GetJob failed(%d): %s", result.ErrorCode, result.ErrorMsg) + } + + return &result, nil +} diff --git a/modules/setting/setting.go b/modules/setting/setting.go index 217388789..6a2405b06 100755 --- a/modules/setting/setting.go +++ b/modules/setting/setting.go @@ -531,6 +531,15 @@ var ( FlavorInfos string TrainJobFLAVORINFOS string + //grampus config + Grampus = struct { + Env string + Host string + UserName string + Password string + SpecialPools string + }{} + //elk config ElkUrl string ElkUser string @@ -553,6 +562,13 @@ var ( CloudBrainPayDelay time.Duration CloudBrainPayInterval time.Duration + //wechat auto reply config + UserNameOfWechatReply string + RepoNameOfWechatReply string + RefNameOfWechatReply string + TreePathOfAutoMsgReply string + TreePathOfSubscribe string + //nginx proxy PROXYURL string RadarMap = struct { @@ -1380,6 +1396,12 @@ func NewContext() { WechatAppSecret = sec.Key("APP_SECRET").MustString("e48e13f315adc32749ddc7057585f198") WechatQRCodeExpireSeconds = sec.Key("QR_CODE_EXPIRE_SECONDS").MustInt(120) WechatAuthSwitch = sec.Key("AUTH_SWITCH").MustBool(false) + UserNameOfWechatReply = sec.Key("AUTO_REPLY_USER_NAME").MustString("OpenIOSSG") + RepoNameOfWechatReply = sec.Key("AUTO_REPLY_REPO_NAME").MustString("promote") + RefNameOfWechatReply = sec.Key("AUTO_REPLY_REF_NAME").MustString("master") + TreePathOfAutoMsgReply = sec.Key("AUTO_REPLY_TREE_PATH").MustString("wechat/auto_reply.json") + TreePathOfSubscribe = sec.Key("SUBSCRIBE_TREE_PATH").MustString("wechat/subscribe_reply.json") + WechatAuthSwitch = sec.Key("AUTH_SWITCH").MustBool(false) sec 
= Cfg.Section("point") CloudBrainPaySwitch = sec.Key("CLOUDBRAIN_PAY_SWITCH").MustBool(false) @@ -1395,6 +1417,18 @@ func NewContext() { Course.OrgName = sec.Key("org_name").MustString("") Course.TeamName = sec.Key("team_name").MustString("") + GetGrampusConfig() +} + +func GetGrampusConfig() { + sec := Cfg.Section("grampus") + + Grampus.Env = sec.Key("ENV").MustString("TEST") + Grampus.Host = sec.Key("SERVER_HOST").MustString("") + Grampus.UserName = sec.Key("USERNAME").MustString("") + Grampus.Password = sec.Key("PASSWORD").MustString("") + Grampus.SpecialPools = sec.Key("SPECIAL_POOL").MustString("") + } func SetRadarMapConfig() { diff --git a/modules/setting/webhook.go b/modules/setting/webhook.go index 34cf8a62d..a14ad949f 100644 --- a/modules/setting/webhook.go +++ b/modules/setting/webhook.go @@ -6,6 +6,7 @@ package setting import ( "net/url" + "strings" "code.gitea.io/gitea/modules/log" ) @@ -13,14 +14,18 @@ import ( var ( // Webhook settings Webhook = struct { - QueueLength int - DeliverTimeout int - SkipTLSVerify bool - Types []string - PagingNum int - ProxyURL string - ProxyURLFixed *url.URL - ProxyHosts []string + QueueLength int + DeliverTimeout int + SkipTLSVerify bool + Types []string + PagingNum int + ProxyURL string + ProxyURLFixed *url.URL + ProxyHosts []string + Socks5Proxy string + Socks5UserName string + Socks5Password string + Socks5ProxyHosts []string }{ QueueLength: 1000, DeliverTimeout: 5, @@ -39,6 +44,10 @@ func newWebhookService() { Webhook.Types = []string{"gitea", "gogs", "slack", "discord", "dingtalk", "telegram", "msteams", "feishu", "matrix"} Webhook.PagingNum = sec.Key("PAGING_NUM").MustInt(10) Webhook.ProxyURL = sec.Key("PROXY_URL").MustString("") + Webhook.Socks5Proxy = sec.Key("SOCKS5_PROXY_URL").MustString("") + Webhook.Socks5UserName = sec.Key("SOCKS5_USER_NAME").MustString("") + Webhook.Socks5Password = sec.Key("SOCKS5_PASSWORD").MustString("") + Webhook.Socks5ProxyHosts = 
strings.Split(sec.Key("SOCKS5_PROXY_HOST").MustString(""), ";") if Webhook.ProxyURL != "" { var err error Webhook.ProxyURLFixed, err = url.Parse(Webhook.ProxyURL) diff --git a/modules/util/path.go b/modules/util/path.go old mode 100644 new mode 100755 index 2b198eb6d..1db6e4379 --- a/modules/util/path.go +++ b/modules/util/path.go @@ -31,3 +31,13 @@ func GetDirectorySize(path string) (int64, error) { }) return size, err } + +// check whether the path is dir +func IsDir(path string) bool { + s, err := os.Stat(path) + if err != nil { + return false + } + + return s.IsDir() +} diff --git a/modules/util/util.go b/modules/util/util.go index 134fd1296..87dd5b700 100755 --- a/modules/util/util.go +++ b/modules/util/util.go @@ -115,7 +115,7 @@ func AddZero(t int64) (m string) { func ConvertDisplayJobNameToJobName(DisplayName string) (JobName string) { t := time.Now() - JobName = "openi" + strings.ToLower(cutNameString(DisplayName, 15)) + "t" + t.Format("20060102150405")[4:] + strconv.Itoa(int(rand.New(rand.NewSource(time.Now().UnixNano())).Int31n(100000))) + JobName = strings.ToLower(cutNameString(DisplayName, 15)) + "t" + t.Format("20060102150405")[10:] + strconv.Itoa(int(rand.New(rand.NewSource(time.Now().UnixNano())).Int31n(100000))) return JobName } diff --git a/modules/webhook/deliver.go b/modules/webhook/deliver.go index 7b0c65173..8348e8641 100644 --- a/modules/webhook/deliver.go +++ b/modules/webhook/deliver.go @@ -16,6 +16,8 @@ import ( "sync" "time" + "golang.org/x/net/proxy" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" @@ -137,8 +139,10 @@ func Deliver(t *models.HookTask) error { return } }() + match := isSocks5ProxyUrlMatch(req) + + resp, err := makeReq(req, match) - resp, err := webhookHTTPClient.Do(req) if err != nil { t.ResponseInfo.Body = fmt.Sprintf("Delivery: %v", err) return err @@ -161,6 +165,23 @@ func Deliver(t *models.HookTask) error { return nil } +func makeReq(req *http.Request, proxyMatch 
bool) (*http.Response, error) { + if proxyMatch { + return webhookSocks5PoxyHTTPClient.Do(req) + } + return webhookHTTPClient.Do(req) +} + +func isSocks5ProxyUrlMatch(req *http.Request) bool { + + for _, v := range socks5HostMatchers { + if v.Match(req.URL.Host) { + return true + } + } + return false +} + // DeliverHooks checks and delivers undelivered hooks. // FIXME: graceful: This would likely benefit from either a worker pool with dummy queue // or a full queue. Then more hooks could be sent at same time. @@ -225,9 +246,11 @@ func DeliverHooks(ctx context.Context) { } var ( - webhookHTTPClient *http.Client - once sync.Once - hostMatchers []glob.Glob + webhookHTTPClient *http.Client + once sync.Once + hostMatchers []glob.Glob + webhookSocks5PoxyHTTPClient *http.Client + socks5HostMatchers []glob.Glob ) func webhookProxy() func(req *http.Request) (*url.URL, error) { @@ -274,5 +297,31 @@ func InitDeliverHooks() { }, } + if setting.Webhook.Socks5Proxy != "" { + auth := proxy.Auth{ + User: setting.Webhook.Socks5UserName, + Password: setting.Webhook.Socks5Password, + } + + dialSocksProxy, err := proxy.SOCKS5("tcp", setting.Webhook.Socks5Proxy, &auth, proxy.Direct) + if err != nil { + fmt.Println("Error connecting to proxy:", err) + } + tr := &http.Transport{Dial: dialSocksProxy.Dial} + + webhookSocks5PoxyHTTPClient = &http.Client{ + Transport: tr, + } + + for _, h := range setting.Webhook.Socks5ProxyHosts { + if g, err := glob.Compile(h); err == nil { + socks5HostMatchers = append(socks5HostMatchers, g) + } else { + log.Error("glob.Compile %s failed: %v", h, err) + } + } + + } + go graceful.GetManager().RunWithShutdownContext(DeliverHooks) } diff --git a/options/locale/locale_en-US.ini b/options/locale/locale_en-US.ini index b04534636..8b465d3e0 100755 --- a/options/locale/locale_en-US.ini +++ b/options/locale/locale_en-US.ini @@ -1176,6 +1176,11 @@ model.manage.sava_model = Sava Model model.manage.model_manage = ModelManage model.manage.model_accuracy = Model 
Accuracy
+grampus.train_job.ai_center = AI Center
+grampus.dataset_path_rule = The code is stored in /cache/code; the dataset is stored in /cache/dataset; and please put your model into /cache/output, then you can download it online.
+grampus.gpu_dataset_path_rule = The code is stored in /tmp/code; the dataset is stored in /tmp/dataset; and please put your model into /tmp/output, then you can download it online.
+grampus.no_operate_right = You have no right to do this operation.
+
 template.items = Template Items
 template.git_content = Git Content (Default Branch)
 template.git_hooks = Git Hooks
@@ -2935,6 +2940,8 @@ task_inferencejob=`created reasoning task %s`
 task_createmodel=`created new model %s`
 task_gputrainjob=`created CPU/GPU training task%s`
+task_c2netnputrainjob=`created NPU training task%s`
+task_c2netgputrainjob=`created CPU/GPU training task%s`
 
 [tool]
 ago = %s ago
@@ -3023,6 +3030,9 @@ Platform_Tutorial = Tutorial
 foot.advice_feedback = Feedback
 
 [cloudbrain]
+resource_cluster = Resource Cluster
+resource_cluster_openi = OpenI Resource Cluster
+resource_cluster_c2net = China Computing NET
 compute_resource = Computing resources
 task_name = Task name
 task_type = Task type
diff --git a/options/locale/locale_zh-CN.ini b/options/locale/locale_zh-CN.ini
index c6f42ddf5..c317216e9 100755
--- a/options/locale/locale_zh-CN.ini
+++ b/options/locale/locale_zh-CN.ini
@@ -1190,6 +1190,12 @@ model.manage.sava_model = 保存模型
 model.manage.model_manage = 模型管理
 model.manage.model_accuracy = 模型精度
+grampus.train_job.ai_center=智算中心
+grampus.dataset_path_rule = 训练脚本存储在/cache/code中,数据集存储在/cache/dataset中,训练输出请存储在/cache/output中以供后续下载。
+grampus.gpu_dataset_path_rule = 训练脚本存储在/tmp/code中,数据集存储在/tmp/dataset中,训练输出请存储在/tmp/output中以供后续下载。
+
+grampus.no_operate_right = 您没有权限创建这类任务。
+
 template.items=模板选项
 template.git_content=Git数据(默认分支)
 template.git_hooks=Git 钩子
@@ -2950,6 +2956,8 @@ task_inferencejob=`创建了推理任务 %s`
 task_createmodel=`导入了新模型 %s`
 task_gputrainjob=`创建了CPU/GPU类型训练任务 %s`
+task_c2netnputrainjob=`创建了NPU类型训练任务 %s` +task_c2netgputrainjob=`创建了CPU/GPU类型训练任务 %s` [tool] ago=%s前 @@ -3038,6 +3046,9 @@ Platform_Tutorial=新手指引 foot.advice_feedback = 意见反馈 [cloudbrain] +resource_cluster = 算力集群 +resource_cluster_openi = 启智集群 +resource_cluster_c2net = 智算网络集群 compute_resource = 计算资源 task_name = 任务名称 task_type = 任务类型 diff --git a/public/home/home.js b/public/home/home.js index f772403f1..95ea3da4c 100755 --- a/public/home/home.js +++ b/public/home/home.js @@ -156,7 +156,7 @@ document.onreadystatechange = function () { html += recordPrefix + actionName; html += " " + getRepotext(record) + "" } - else if(record.OpType == "24" || record.OpType == "26" || record.OpType == "27" || record.OpType == "28" || record.OpType == "30" || record.OpType == "31"){ + else if(record.OpType == "24" || record.OpType == "26" || record.OpType == "27" || record.OpType == "28" || record.OpType == "30" || record.OpType == "31" || record.OpType == "32" || record.OpType == "33"){ html += recordPrefix + actionName; html += " " + record.RefName + "" } @@ -201,6 +201,8 @@ function getTaskLink(record){ re = re + "/modelmanage/show_model_info?name=" + record.RefName; }else if(record.OpType == 31){ re = re + "/cloudbrain/train-job/" + record.Content; + }else if(record.OpType == 32 || record.OpType == 33){ + re = re + "/grampus/train-job/" + record.Content; } re = encodeURI(re); return re; @@ -374,7 +376,9 @@ var actionNameZH={ "28":"创建了推理任务", "29":"创建了评测任务", "30":"导入了新模型", - "31":"创建了CPU/GPU类型训练任务" + "31":"创建了CPU/GPU类型训练任务", + "32":"创建了NPU类型训练任务", + "33":"创建了CPU/GPU类型训练任务" }; var actionNameEN={ @@ -401,6 +405,8 @@ var actionNameEN={ "29":" created profiling task", "30":" created new model", "31":" created CPU/GPU type training task", + "32":" created NPU type training task", + "33":" created CPU/GPU type training task" }; var repoAndOrgZH={ diff --git a/public/home/search.js b/public/home/search.js index f9e815da3..86b2ad06e 100644 --- a/public/home/search.js +++ 
b/public/home/search.js @@ -918,6 +918,8 @@ function LetterAvatar(name, size, color) { } else { initials = nameSplit[0].charAt(0) + nameSplit[1].charAt(0); } + let initials1 = initials.toUpperCase(); + initials.toUpperCase(); if (w.devicePixelRatio) { size = size * w.devicePixelRatio; } @@ -934,7 +936,7 @@ function LetterAvatar(name, size, color) { context.font = Math.round(canvas.width / 2) + "px 'Microsoft Yahei'"; context.textAlign = "center"; context.fillStyle = "#FFF"; - context.fillText(initials, size / 2, size / 1.5); + context.fillText(initials1, size / 2, size / 1.5); dataURI = canvas.toDataURL(); canvas = null; return dataURI; @@ -1009,9 +1011,9 @@ function displayRepoResult(page, jsonResult, onlyReturnNum, keyword) { var recordMap = data[i]; html += '
'; if (recordMap["avatar"]) { - html += ``; + html += ``; } else { - html += ``; + html += ``; } html += '
'; diff --git a/routers/admin/cloudbrains.go b/routers/admin/cloudbrains.go index 0481e6743..8cfe10795 100755 --- a/routers/admin/cloudbrains.go +++ b/routers/admin/cloudbrains.go @@ -43,12 +43,6 @@ func CloudBrains(ctx *context.Context) { if page <= 0 { page = 1 } - debugType := models.TypeCloudBrainAll - if listType == models.GPUResource { - debugType = models.TypeCloudBrainOne - } else if listType == models.NPUResource { - debugType = models.TypeCloudBrainTwo - } var jobTypes []string jobTypeNot := false @@ -77,13 +71,14 @@ func CloudBrains(ctx *context.Context) { PageSize: setting.UI.IssuePagingNum, }, Keyword: keyword, - Type: debugType, JobTypeNot: jobTypeNot, JobStatusNot: jobStatusNot, JobStatus: jobStatuses, JobTypes: jobTypes, NeedRepoInfo: true, IsLatestVersion: modelarts.IsLatestVersion, + ComputeResource: listType, + Type: models.TypeCloudBrainAll, }) if err != nil { ctx.ServerError("Get job failed:", err) diff --git a/routers/api/v1/api.go b/routers/api/v1/api.go index 0c280b0cb..e6c572e73 100755 --- a/routers/api/v1/api.go +++ b/routers/api/v1/api.go @@ -947,6 +947,15 @@ func RegisterRoutes(m *macaron.Macaron) { }) }) }, reqRepoReader(models.UnitTypeCloudBrain)) + m.Group("/grampus", func() { + m.Group("/train-job", func() { + m.Group("/:jobid", func() { + m.Get("", repo.GetModelArtsTrainJobVersion) + m.Post("/stop_version", cloudbrain.AdminOrOwnerOrJobCreaterRightForTrain, repo_ext.GrampusStopJob) + m.Get("/log", repo_ext.GrampusGetLog) + }) + }) + }, reqRepoReader(models.UnitTypeCloudBrain)) }, repoAssignment()) }) @@ -1046,6 +1055,7 @@ func RegisterRoutes(m *macaron.Macaron) { m.Get("/prd/event", authentication.ValidEventSource) m.Post("/prd/event", authentication.AcceptWechatEvent) }) + m.Get("/wechat/material", authentication.GetMaterial) }, securityHeaders(), context.APIContexter(), sudo()) } diff --git a/routers/api/v1/repo/cloudbrain_dashboard.go b/routers/api/v1/repo/cloudbrain_dashboard.go old mode 100644 new mode 100755 index 
f102a0f05..cc125c97f --- a/routers/api/v1/repo/cloudbrain_dashboard.go +++ b/routers/api/v1/repo/cloudbrain_dashboard.go @@ -103,7 +103,7 @@ func GetAllCloudbrainsOverview(ctx *context.Context) { if cloudbrain.Cloudbrain.Type == models.TypeCloudBrainTwo { cloudBrainTwoDuration = cloudBrainTwoDuration + cloudbrain.Cloudbrain.Duration } - if cloudbrain.Cloudbrain.Type == models.TypeIntelligentNet { + if cloudbrain.Cloudbrain.Type == models.TypeC2Net { intelligentNetDuration = intelligentNetDuration + cloudbrain.Cloudbrain.Duration } @@ -540,7 +540,7 @@ func GetAllCloudbrainsPeriodDistribution(ctx *context.Context) { cloudTwoJobTypeRes[cloudbrain.JobType] += 1 } } - if cloudbrain.Cloudbrain.Type == models.TypeIntelligentNet { + if cloudbrain.Cloudbrain.Type == models.TypeC2Net { if _, ok := intelligentNetJobTypeRes[cloudbrain.JobType]; !ok { intelligentNetJobTypeRes[cloudbrain.JobType] = 1 } else { @@ -1287,7 +1287,7 @@ func getCloudbrainType(rs *models.CloudbrainInfo, ctx *context.Context) string { return ctx.Tr("repo.cloudbrain1") } else if rs.Cloudbrain.Type == models.TypeCloudBrainTwo { return ctx.Tr("repo.cloudbrain2") - } else if rs.Cloudbrain.Type == models.TypeIntelligentNet { + } else if rs.Cloudbrain.Type == models.TypeC2Net { return ctx.Tr("repo.intelligent_net") } else { return ctx.Tr("repo.cloudbrain_untype") diff --git a/routers/api/v1/repo/modelarts.go b/routers/api/v1/repo/modelarts.go index c14976282..2a0ce19db 100755 --- a/routers/api/v1/repo/modelarts.go +++ b/routers/api/v1/repo/modelarts.go @@ -6,6 +6,8 @@ package repo import ( + "code.gitea.io/gitea/modules/grampus" + "encoding/json" "net/http" "strconv" "strings" @@ -125,7 +127,8 @@ func GetModelArtsTrainJob(ctx *context.APIContext) { func GetModelArtsTrainJobVersion(ctx *context.APIContext) { var ( - err error + err error + aiCenterName string ) jobID := ctx.Params(":jobid") @@ -167,7 +170,7 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) { log.Error("UpdateJob failed:", err) } } - 
} else { + } else if job.Type == models.TypeCloudBrainTwo { result, err := modelarts.GetTrainJob(jobID, strconv.FormatInt(job.VersionID, 10)) if err != nil { ctx.NotFound(err) @@ -189,12 +192,50 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) { if err != nil { log.Error("UpdateJob failed:", err) } + } else if job.Type == models.TypeC2Net { + result, err := grampus.GetJob(jobID) + if err != nil { + log.Error("GetJob(%s) failed:%v", job.JobName, err) + ctx.NotFound(err) + return + } + + if job.StartTime == 0 && result.JobInfo.StartedAt > 0 { + job.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt) + } + job.Status = grampus.TransTrainJobStatus(result.JobInfo.Status) + job.Duration = result.JobInfo.RunSec + job.TrainJobDuration = models.ConvertDurationToStr(job.Duration) + + if job.EndTime == 0 && models.IsTrainJobTerminal(job.Status) && job.StartTime > 0 { + job.EndTime = job.StartTime.Add(job.Duration) + } + job.CorrectCreateUnix() + + if len(job.AiCenter) == 0 { + if len(result.JobInfo.Tasks) > 0 { + if len(result.JobInfo.Tasks[0].CenterID) > 0 && len(result.JobInfo.Tasks[0].CenterName) > 0 { + job.AiCenter = result.JobInfo.Tasks[0].CenterID[0] + "+" + result.JobInfo.Tasks[0].CenterName[0] + aiCenterName = result.JobInfo.Tasks[0].CenterName[0] + } + } + } else { + temp := strings.Split(job.AiCenter, "+") + if len(temp) > 1 { + aiCenterName = temp[1] + } + } + err = models.UpdateTrainJobVersion(job) + if err != nil { + log.Error("UpdateJob failed:", err) + } } ctx.JSON(http.StatusOK, map[string]interface{}{ "JobID": jobID, "JobStatus": job.Status, "JobDuration": job.TrainJobDuration, + "AiCenter": aiCenterName, }) } @@ -373,11 +414,29 @@ func ModelList(ctx *context.APIContext) { log.Error("GetCloudbrainByJobID(%s) failed:%v", task.JobName, err.Error()) return } - models, err := storage.GetObsListObject(task.JobName, "output/", parentDir, versionName) - if err != nil { - log.Info("get TrainJobListModel failed:", err) - 
ctx.ServerError("GetObsListObject:", err) - return + + var fileInfos []storage.FileInfo + if task.ComputeResource == models.NPUResource { + fileInfos, err = storage.GetObsListObject(task.JobName, "output/", parentDir, versionName) + if err != nil { + log.Info("get TrainJobListModel failed:", err) + ctx.ServerError("GetObsListObject:", err) + return + } + } else if task.ComputeResource == models.GPUResource { + files, err := routerRepo.GetModelDirs(task.JobName, parentDir) + if err != nil { + log.Info("GetModelDirs failed:", err) + ctx.ServerError("GetModelDirs:", err) + return + } + + err = json.Unmarshal([]byte(files), &fileInfos) + if err != nil { + log.Error("json.Unmarshal failed:%v", err.Error(), ctx.Data["msgID"]) + ctx.ServerError("json.Unmarshal failed:", err) + return + } } ctx.JSON(http.StatusOK, map[string]interface{}{ @@ -385,7 +444,7 @@ func ModelList(ctx *context.APIContext) { "VersionName": versionName, "StatusOK": 0, "Path": dirArray, - "Dirs": models, + "Dirs": fileInfos, "task": task, "PageIsCloudBrain": true, }) diff --git a/routers/api/v1/repo/repo_dashbord.go b/routers/api/v1/repo/repo_dashbord.go index b19c93371..b3a01cff1 100644 --- a/routers/api/v1/repo/repo_dashbord.go +++ b/routers/api/v1/repo/repo_dashbord.go @@ -887,19 +887,12 @@ func getTimePeroid(ctx *context.Context, recordBeginTime time.Time) (time.Time, if queryType == "all" { beginTime = recordBeginTimeTemp endTime = now - } else if queryType == "today" { + } else if queryType == "yesterday" { endTime = now beginTime = time.Date(endTime.Year(), endTime.Month(), endTime.Day(), 0, 0, 0, 0, now.Location()) - } else if queryType == "yesterday" { - endTime = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()) - beginTime = endTime.AddDate(0, 0, -1) - } else if queryType == "last_7day" { - beginTime = now.AddDate(0, 0, -7) - beginTime = time.Date(beginTime.Year(), beginTime.Month(), beginTime.Day(), 0, 0, 0, 0, now.Location()) - endTime = now - } else if queryType 
== "last_30day" { - beginTime = now.AddDate(0, 0, -30) + } else if queryType == "current_week" { + beginTime = now.AddDate(0, 0, -int(time.Now().Weekday())+2) //begin from monday beginTime = time.Date(beginTime.Year(), beginTime.Month(), beginTime.Day(), 0, 0, 0, 0, now.Location()) endTime = now } else if queryType == "current_month" { diff --git a/routers/authentication/wechat.go b/routers/authentication/wechat.go index 152348125..1337ed3d4 100644 --- a/routers/authentication/wechat.go +++ b/routers/authentication/wechat.go @@ -8,9 +8,11 @@ import ( "code.gitea.io/gitea/modules/redis/redis_client" "code.gitea.io/gitea/modules/redis/redis_key" "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/routers/response" "encoding/json" "errors" gouuid "github.com/satori/go.uuid" + "strconv" "time" ) @@ -125,3 +127,23 @@ func createQRCode4Bind(userId int64) (*QRCodeResponse, error) { } return result, nil } + +// GetMaterial +func GetMaterial(ctx *context.Context) { + mType := ctx.Query("type") + offsetStr := ctx.Query("offset") + countStr := ctx.Query("count") + var offset, count int + if offsetStr == "" { + offset = 0 + } else { + offset, _ = strconv.Atoi(offsetStr) + } + if countStr == "" { + count = 20 + } else { + count, _ = strconv.Atoi(countStr) + } + r := wechat.GetWechatMaterial(mType, offset, count) + ctx.JSON(200, response.SuccessWithData(r)) +} diff --git a/routers/authentication/wechat_event.go b/routers/authentication/wechat_event.go index 9b1cebec6..887bfba0d 100644 --- a/routers/authentication/wechat_event.go +++ b/routers/authentication/wechat_event.go @@ -14,24 +14,48 @@ import ( // https://developers.weixin.qq.com/doc/offiaccount/Message_Management/Passive_user_reply_message.html func AcceptWechatEvent(ctx *context.Context) { b, _ := ioutil.ReadAll(ctx.Req.Request.Body) - we := wechat.WechatEvent{} + we := wechat.WechatMsg{} xml.Unmarshal(b, &we) - + switch we.MsgType { + case wechat.WECHAT_MSG_TYPE_EVENT: + HandleEventMsg(ctx, we) + case 
wechat.WECHAT_MSG_TYPE_TEXT: + HandleTextMsg(ctx, we) + } log.Info("accept wechat event= %+v", we) - var replyStr string - switch we.Event { - case wechat.WECHAT_EVENT_SUBSCRIBE, wechat.WECHAT_EVENT_SCAN: - replyStr = wechat.HandleSubscribeEvent(we) - break + +} + +// ValidEventSource +func ValidEventSource(ctx *context.Context) { + echostr := ctx.Query("echostr") + ctx.Write([]byte(echostr)) + return +} + +func HandleEventMsg(ctx *context.Context, msg wechat.WechatMsg) { + switch msg.Event { + case wechat.WECHAT_EVENT_SCAN: + HandleEventScan(ctx, msg) + case wechat.WECHAT_EVENT_SUBSCRIBE: + if msg.EventKey != "" { + HandleEventScan(ctx, msg) + } else { + HandleEventSubscribe(ctx, msg) + } + } +} +func HandleEventScan(ctx *context.Context, msg wechat.WechatMsg) { + replyStr := wechat.HandleScanEvent(msg) if replyStr == "" { log.Info("reply str is empty") return } - reply := &wechat.EventReply{ - ToUserName: we.FromUserName, - FromUserName: we.ToUserName, + reply := &wechat.MsgReply{ + ToUserName: msg.FromUserName, + FromUserName: msg.ToUserName, CreateTime: time.Now().Unix(), MsgType: wechat.WECHAT_MSG_TYPE_TEXT, Content: replyStr, @@ -39,9 +63,99 @@ func AcceptWechatEvent(ctx *context.Context) { ctx.XML(200, reply) } -// ValidEventSource -func ValidEventSource(ctx *context.Context) { - echostr := ctx.Query("echostr") - ctx.Write([]byte(echostr)) - return +func HandleEventSubscribe(ctx *context.Context, msg wechat.WechatMsg) { + r := wechat.HandleSubscribeEvent(msg) + if r == nil { + return + } + reply := buildReplyContent(msg, r) + ctx.XML(200, reply) +} + +func HandleTextMsg(ctx *context.Context, msg wechat.WechatMsg) { + r := wechat.GetAutomaticReply(msg.Content) + if r == nil { + log.Info("TextMsg reply is empty") + return + } + reply := buildReplyContent(msg, r) + ctx.XML(200, reply) +} + +func buildReplyContent(msg wechat.WechatMsg, r *wechat.WechatReplyContent) interface{} { + reply := &wechat.MsgReply{ + ToUserName: msg.FromUserName, + FromUserName: 
msg.ToUserName, + CreateTime: time.Now().Unix(), + MsgType: r.ReplyType, + } + switch r.ReplyType { + case wechat.ReplyTypeText: + return &wechat.TextMsgReply{ + ToUserName: msg.FromUserName, + FromUserName: msg.ToUserName, + CreateTime: time.Now().Unix(), + MsgType: r.ReplyType, + Content: r.Reply.Content, + } + + case wechat.ReplyTypeImage: + return &wechat.ImageMsgReply{ + ToUserName: msg.FromUserName, + FromUserName: msg.ToUserName, + CreateTime: time.Now().Unix(), + MsgType: r.ReplyType, + Image: wechat.ImageContent{ + MediaId: r.Reply.MediaId, + }, + } + case wechat.ReplyTypeVoice: + return &wechat.VoiceMsgReply{ + ToUserName: msg.FromUserName, + FromUserName: msg.ToUserName, + CreateTime: time.Now().Unix(), + MsgType: r.ReplyType, + Voice: wechat.VoiceContent{ + MediaId: r.Reply.MediaId, + }, + } + case wechat.ReplyTypeVideo: + return &wechat.VideoMsgReply{ + ToUserName: msg.FromUserName, + FromUserName: msg.ToUserName, + CreateTime: time.Now().Unix(), + MsgType: r.ReplyType, + Video: wechat.VideoContent{ + MediaId: r.Reply.MediaId, + Title: r.Reply.Title, + Description: r.Reply.Description, + }, + } + case wechat.ReplyTypeMusic: + return &wechat.MusicMsgReply{ + ToUserName: msg.FromUserName, + FromUserName: msg.ToUserName, + CreateTime: time.Now().Unix(), + MsgType: r.ReplyType, + Music: wechat.MusicContent{ + Title: r.Reply.Title, + Description: r.Reply.Description, + MusicUrl: r.Reply.MusicUrl, + HQMusicUrl: r.Reply.HQMusicUrl, + ThumbMediaId: r.Reply.ThumbMediaId, + }, + } + case wechat.ReplyTypeNews: + return &wechat.NewsMsgReply{ + ToUserName: msg.FromUserName, + FromUserName: msg.ToUserName, + CreateTime: time.Now().Unix(), + MsgType: r.ReplyType, + ArticleCount: len(r.Reply.Articles), + Articles: wechat.ArticleItem{ + Item: r.Reply.Articles}, + } + + } + return reply } diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index a075f3b70..eb284e2f6 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -3,6 +3,7 @@ 
package repo import ( "bufio" "code.gitea.io/gitea/services/reward/point/account" + "code.gitea.io/gitea/modules/grampus" "encoding/json" "errors" "fmt" @@ -187,7 +188,7 @@ func cloudBrainNewDataPrepare(ctx *context.Context) error { ctx.Data["brainscore_path"] = cloudbrain.BrainScoreMountPath ctx.Data["is_brainscore_enabled"] = setting.IsBrainScoreEnabled - ctx.Data["cloudbraintype"] = models.TypeCloudBrainOne + ctx.Data["datasetType"] = models.TypeCloudBrainOne ctx.Data["benchmarkMode"] = ctx.Query("benchmarkMode") @@ -1049,6 +1050,7 @@ func GetPublicImages(ctx *context.Context) { IncludeOfficialOnly: ctx.QueryBool("recommend"), SearchOrderBy: "type desc, num_stars desc,id desc", Status: models.IMAGE_STATUS_SUCCESS, + CloudbrainType: ctx.QueryInt("cloudbrainType"), } getImages(ctx, &opts) @@ -1484,7 +1486,34 @@ func SyncCloudbrainStatus() { } else { log.Error("task.JobType(%s) is error:%s", task.JobName, task.JobType) } + } else if task.Type == models.TypeC2Net { + result, err := grampus.GetJob(task.JobID) + if err != nil { + log.Error("GetTrainJob(%s) failed:%v", task.JobName, err) + continue + } + + if result != nil { + if len(result.JobInfo.Tasks[0].CenterID) == 1 && len(result.JobInfo.Tasks[0].CenterName) == 1 { + task.AiCenter = result.JobInfo.Tasks[0].CenterID[0] + "+" + result.JobInfo.Tasks[0].CenterName[0] + } + task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status) + task.Duration = result.JobInfo.RunSec + task.TrainJobDuration = models.ConvertDurationToStr(task.Duration) + if task.StartTime == 0 && result.JobInfo.StartedAt > 0 { + task.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt) + } + if task.EndTime == 0 && models.IsTrainJobTerminal(task.Status) && task.StartTime > 0 { + task.EndTime = task.StartTime.Add(task.Duration) + } + task.CorrectCreateUnix() + err = models.UpdateJob(task) + if err != nil { + log.Error("UpdateJob(%s) failed:%v", task.JobName, err) + continue + } + } } else { log.Error("task.Type(%s) is error:%d", 
task.JobName, task.Type) } diff --git a/routers/repo/grampus.go b/routers/repo/grampus.go new file mode 100755 index 000000000..52f803d1d --- /dev/null +++ b/routers/repo/grampus.go @@ -0,0 +1,786 @@ +package repo + +import ( + "code.gitea.io/gitea/modules/auth" + "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/grampus" + "code.gitea.io/gitea/modules/modelarts" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/modules/util" + "encoding/json" + "errors" + "github.com/unknwon/com" + "io/ioutil" + "net/http" + "os" + "path" + "strconv" + "strings" + "time" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/base" + "code.gitea.io/gitea/modules/cloudbrain" + "code.gitea.io/gitea/modules/context" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" +) + +const ( + tplGrampusTrainJobShow base.TplName = "repo/grampus/trainjob/show" + + //GPU + tplGrampusTrainJobGPUNew base.TplName = "repo/grampus/trainjob/gpu/new" + + //NPU + tplGrampusTrainJobNPUNew base.TplName = "repo/grampus/trainjob/npu/new" +) + +func GrampusTrainJobGPUNew(ctx *context.Context) { + ctx.Data["datasetType"] = models.TypeCloudBrainOne + err := grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + if err != nil { + ctx.ServerError("get new train-job info failed", err) + return + } + ctx.HTML(http.StatusOK, tplGrampusTrainJobGPUNew) +} + +func GrampusTrainJobNPUNew(ctx *context.Context) { + ctx.Data["datasetType"] = models.TypeCloudBrainTwo + err := grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + if err != nil { + ctx.ServerError("get new train-job info failed", err) + return + } + ctx.HTML(200, tplGrampusTrainJobNPUNew) +} + +func grampusTrainJobNewDataPrepare(ctx *context.Context, processType string) error { + ctx.Data["PageIsCloudBrain"] = true + + t := time.Now() + var displayJobName = cutString(ctx.User.Name, 5) + t.Format("2006010215") + strconv.Itoa(int(t.Unix()))[5:] + ctx.Data["display_job_name"] = 
displayJobName + + //get valid images + images, err := grampus.GetImages(processType) + if err != nil { + log.Error("GetImages failed:", err.Error()) + } else { + ctx.Data["images"] = images.Infos + } + + grampus.InitSpecialPool() + + ctx.Data["GPUEnabled"] = true + ctx.Data["NPUEnabled"] = true + + if grampus.SpecialPools != nil { + for _, pool := range grampus.SpecialPools.Pools { + if pool.IsExclusive { + org, _ := models.GetOrgByName(pool.Org) + if org != nil { + isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID) + if !isOrgMember { + ctx.Data[pool.Type+"Enabled"] = false + } + } + } + } + } + + //get valid resource specs + specs, err := grampus.GetResourceSpecs(processType) + if err != nil { + log.Error("GetResourceSpecs failed:", err.Error()) + } else { + ctx.Data["flavor_infos"] = specs.Infos + } + + //get branches + branches, _, err := ctx.Repo.GitRepo.GetBranches(0, 0) + if err != nil { + log.Error("GetBranches error:", err.Error()) + } else { + ctx.Data["branches"] = branches + } + + ctx.Data["branchName"] = ctx.Repo.BranchName + + if processType == grampus.ProcessorTypeGPU { + ctx.Data["datasetType"] = models.TypeCloudBrainOne + } else if processType == grampus.ProcessorTypeNPU { + ctx.Data["datasetType"] = models.TypeCloudBrainTwo + } + + return nil +} + +func grampusParamCheckCreateTrainJob(form auth.CreateGrampusTrainJobForm) error { + if !strings.HasSuffix(strings.TrimSpace(form.BootFile), ".py") { + log.Error("the boot file(%s) must be a python file", form.BootFile) + return errors.New("启动文件必须是python文件") + } + + if form.BranchName == "" { + log.Error("the branch must not be null!", form.BranchName) + return errors.New("代码分支不能为空!") + } + + return nil +} + +func GrampusTrainJobGpuCreate(ctx *context.Context, form auth.CreateGrampusTrainJobForm) { + displayJobName := form.DisplayJobName + jobName := util.ConvertDisplayJobNameToJobName(displayJobName) + uuid := form.Attachment + description := form.Description + bootFile := 
strings.TrimSpace(form.BootFile) + params := form.Params + repo := ctx.Repo.Repository + codeLocalPath := setting.JobPath + jobName + cloudbrain.CodeMountPath + "/" + codeMinioPath := setting.CBCodePathPrefix + jobName + cloudbrain.CodeMountPath + "/" + dataMinioPath := setting.Attachment.Minio.BasePath + path.Join(uuid[0:1], uuid[1:2]) + "/" + uuid + branchName := form.BranchName + flavorName := form.FlavorName + image := strings.TrimSpace(form.Image) + + if !jobNamePattern.MatchString(displayJobName) { + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tplGrampusTrainJobGPUNew, &form) + return + } + + errStr := checkSpecialPool(ctx, "GPU") + if errStr != "" { + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr(errStr, tplGrampusTrainJobGPUNew, &form) + return + } + + //check count limit + count, err := models.GetGrampusCountByUserID(ctx.User.ID, string(models.JobTypeTrain), models.GPUResource) + if err != nil { + log.Error("GetGrampusCountByUserID failed:%v", err, ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr("system error", tplGrampusTrainJobGPUNew, &form) + return + } else { + if count >= 1 { + log.Error("the user already has running or waiting task", ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr("you have already a running or waiting task, can not create more", tplGrampusTrainJobGPUNew, &form) + return + } + } + + //check param + if err := grampusParamCheckCreateTrainJob(form); err != nil { + log.Error("paramCheckCreateTrainJob failed:(%v)", err, ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr(err.Error(), tplGrampusTrainJobGPUNew, &form) + return + } + + //check whether the task name in the project is duplicated + tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, 
string(models.JobTypeTrain), displayJobName) + if err == nil { + if len(tasks) != 0 { + log.Error("the job name did already exist", ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr("the job name did already exist", tplGrampusTrainJobGPUNew, &form) + return + } + } else { + if !models.IsErrJobNotExist(err) { + log.Error("system error, %v", err, ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr("system error", tplGrampusTrainJobGPUNew, &form) + return + } + } + + //check dataset + attachment, err := models.GetAttachmentByUUID(uuid) + if err != nil { + log.Error("GetAttachmentByUUID failed:", err.Error(), ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr("dataset is not exist", tplGrampusTrainJobGPUNew, &form) + return + } + + //prepare code and out path + _, err = ioutil.ReadDir(codeLocalPath) + if err == nil { + os.RemoveAll(codeLocalPath) + } + + if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil { + log.Error("downloadZipCode failed, server timed out: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr("Create task failed, internal error", tplGrampusTrainJobGPUNew, &form) + return + } + + //todo: upload code (send to file_server todo this work?) 
+ //upload code + if err := uploadCodeToMinio(codeLocalPath+"/", jobName, cloudbrain.CodeMountPath+"/"); err != nil { + log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr("Create task failed, internal error", tplGrampusTrainJobGPUNew, &form) + return + } + + modelPath := setting.JobPath + jobName + cloudbrain.ModelMountPath + "/" + if err := mkModelPath(modelPath); err != nil { + log.Error("Failed to mkModelPath: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr("Create task failed, internal error", tplGrampusTrainJobGPUNew, &form) + return + } + + //init model readme + if err := uploadCodeToMinio(modelPath, jobName, cloudbrain.ModelMountPath+"/"); err != nil { + log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr("Create task failed, internal error", tplGrampusTrainJobGPUNew, &form) + return + } + + //prepare command + command, err := generateCommand(repo.Name, grampus.ProcessorTypeGPU, codeMinioPath+cloudbrain.DefaultBranchName+".zip", dataMinioPath, bootFile, params, setting.CBCodePathPrefix+jobName+cloudbrain.ModelMountPath+"/", attachment.Name) + if err != nil { + log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr("Create task failed, internal error", tplGrampusTrainJobGPUNew, &form) + return + } + + commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName) + + req := &grampus.GenerateTrainJobReq{ + JobName: jobName, + DisplayJobName: displayJobName, + ComputeResource: models.GPUResource, + Command: command, + ResourceSpecId: form.FlavorID, + ImageUrl: image, + Description: description, + BootFile: bootFile, + Uuid: 
uuid, + CommitID: commitID, + BranchName: branchName, + Params: form.Params, + FlavorName: flavorName, + EngineName: image, + DatasetName: attachment.Name, + IsLatestVersion: modelarts.IsLatestVersion, + VersionCount: modelarts.VersionCount, + WorkServerNumber: 1, + } + + err = grampus.GenerateTrainJob(ctx, req) + if err != nil { + log.Error("GenerateTrainJob failed:%v", err.Error(), ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr(err.Error(), tplGrampusTrainJobGPUNew, &form) + return + } + ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job") +} + +func checkSpecialPool(ctx *context.Context, resourceType string) string { + grampus.InitSpecialPool() + if grampus.SpecialPools != nil { + for _, pool := range grampus.SpecialPools.Pools { + + if pool.IsExclusive && pool.Type == resourceType { + + org, _ := models.GetOrgByName(pool.Org) + if org != nil { + isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID) + if !isOrgMember { + return ctx.Tr("repo.grampus.no_operate_right") + } + } + } + + } + + } + return "" +} + +func GrampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrainJobForm) { + displayJobName := form.DisplayJobName + jobName := util.ConvertDisplayJobNameToJobName(displayJobName) + uuid := form.Attachment + description := form.Description + bootFile := strings.TrimSpace(form.BootFile) + params := form.Params + repo := ctx.Repo.Repository + codeLocalPath := setting.JobPath + jobName + modelarts.CodePath + codeObsPath := grampus.JobPath + jobName + modelarts.CodePath + dataObsPath := setting.BasePath + path.Join(uuid[0:1], uuid[1:2]) + "/" + uuid + "/" + branchName := form.BranchName + isLatestVersion := modelarts.IsLatestVersion + flavorName := form.FlavorName + versionCount := modelarts.VersionCount + engineName := form.EngineName + + if !jobNamePattern.MatchString(displayJobName) { + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + 
ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tplGrampusTrainJobNPUNew, &form) + return + } + + errStr := checkSpecialPool(ctx, "NPU") + if errStr != "" { + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + ctx.RenderWithErr(errStr, tplGrampusTrainJobGPUNew, &form) + return + } + + //check count limit + count, err := models.GetGrampusCountByUserID(ctx.User.ID, string(models.JobTypeTrain), models.NPUResource) + if err != nil { + log.Error("GetGrampusCountByUserID failed:%v", err, ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + ctx.RenderWithErr("system error", tplGrampusTrainJobNPUNew, &form) + return + } else { + if count >= 1 { + log.Error("the user already has running or waiting task", ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + ctx.RenderWithErr("you have already a running or waiting task, can not create more", tplGrampusTrainJobNPUNew, &form) + return + } + } + + //check param + if err := grampusParamCheckCreateTrainJob(form); err != nil { + log.Error("paramCheckCreateTrainJob failed:(%v)", err) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + ctx.RenderWithErr(err.Error(), tplGrampusTrainJobNPUNew, &form) + return + } + + //check whether the task name in the project is duplicated + tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeTrain), displayJobName) + if err == nil { + if len(tasks) != 0 { + log.Error("the job name did already exist", ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + ctx.RenderWithErr("the job name did already exist", tplGrampusTrainJobNPUNew, &form) + return + } + } else { + if !models.IsErrJobNotExist(err) { + log.Error("system error, %v", err, ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + ctx.RenderWithErr("system error", tplGrampusTrainJobNPUNew, &form) + return + } + } + + //check dataset + attachment, err := 
models.GetAttachmentByUUID(uuid) + if err != nil { + log.Error("GetAttachmentByUUID failed:", err.Error(), ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + ctx.RenderWithErr("dataset is not exist", tplGrampusTrainJobNPUNew, &form) + return + } + + //prepare code and out path + _, err = ioutil.ReadDir(codeLocalPath) + if err == nil { + os.RemoveAll(codeLocalPath) + } + + if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil { + log.Error("downloadZipCode failed, server timed out: %s (%v)", repo.FullName(), err) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + ctx.RenderWithErr("Create task failed, server timed out", tplGrampusTrainJobNPUNew, &form) + return + } + + //todo: upload code (send to file_server todo this work?) + if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.OutputPath); err != nil { + log.Error("Failed to obsMkdir_output: %s (%v)", repo.FullName(), err) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + ctx.RenderWithErr("Failed to obsMkdir_output", tplGrampusTrainJobNPUNew, &form) + return + } + + if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil { + log.Error("Failed to uploadCodeToObs: %s (%v)", repo.FullName(), err) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + ctx.RenderWithErr("Failed to uploadCodeToObs", tplGrampusTrainJobNPUNew, &form) + return + } + + //prepare command + command, err := generateCommand(repo.Name, grampus.ProcessorTypeNPU, codeObsPath+cloudbrain.DefaultBranchName+".zip", dataObsPath+attachment.Name, bootFile, params, setting.CodePathPrefix+jobName+modelarts.OutputPath, attachment.Name) + if err != nil { + log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"]) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + ctx.RenderWithErr("Create task failed, internal error", tplGrampusTrainJobNPUNew, &form) + return + } + + commitID, _ := 
ctx.Repo.GitRepo.GetBranchCommitID(branchName) + + req := &grampus.GenerateTrainJobReq{ + JobName: jobName, + DisplayJobName: displayJobName, + ComputeResource: models.NPUResource, + Command: command, + ResourceSpecId: form.FlavorID, + ImageId: form.ImageID, + DataUrl: dataObsPath, + Description: description, + CodeObsPath: codeObsPath, + BootFileUrl: codeObsPath + bootFile, + BootFile: bootFile, + WorkServerNumber: form.WorkServerNumber, + Uuid: uuid, + CommitID: commitID, + IsLatestVersion: isLatestVersion, + BranchName: branchName, + Params: form.Params, + FlavorName: flavorName, + EngineName: engineName, + VersionCount: versionCount, + TotalVersionCount: modelarts.TotalVersionCount, + DatasetName: attachment.Name, + } + + err = grampus.GenerateTrainJob(ctx, req) + if err != nil { + log.Error("GenerateTrainJob failed:%v", err.Error()) + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + ctx.RenderWithErr(err.Error(), tplGrampusTrainJobNPUNew, &form) + return + } + ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job") +} + +func GrampusStopJob(ctx *context.Context) { + var ID = ctx.Params(":jobid") + var resultCode = "0" + var errorMsg = "" + var status = "" + + task := ctx.Cloudbrain + for { + if task.Status == string(models.GrampusStatusStopped) || task.Status == string(models.GrampusStatusFailed) || task.Status == string(models.GrampusStatusSucceeded) { + log.Error("the job(%s) has been stopped", task.JobName, ctx.Data["msgID"]) + resultCode = "-1" + errorMsg = "system error" + break + } + + res, err := grampus.StopJob(task.JobID) + if err != nil { + log.Error("StopJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"]) + resultCode = strconv.Itoa(res.ErrorCode) + errorMsg = res.ErrorMsg + break + } + + task.Status = string(models.GrampusStatusStopped) + if task.EndTime == 0 { + task.EndTime = timeutil.TimeStampNow() + } + task.ComputeAndSetDuration() + err = models.UpdateJob(task) + if err != nil { + 
log.Error("UpdateJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"]) + resultCode = "-1" + errorMsg = "system error" + break + } + + status = task.Status + break + } + + ctx.JSON(200, map[string]interface{}{ + "result_code": resultCode, + "error_msg": errorMsg, + "status": status, + "id": ID, + "StatusOK": 0, + }) +} + +func GrampusTrainJobDel(ctx *context.Context) { + var listType = ctx.Query("listType") + if err := deleteGrampusJob(ctx); err != nil { + log.Error("deleteGrampusJob failed: %v", err, ctx.Data["msgID"]) + ctx.ServerError(err.Error(), err) + return + } + + var isAdminPage = ctx.Query("isadminpage") + var isHomePage = ctx.Query("ishomepage") + if ctx.IsUserSiteAdmin() && isAdminPage == "true" { + ctx.Redirect(setting.AppSubURL + "/admin" + "/cloudbrains") + } else if isHomePage == "true" { + ctx.Redirect(setting.AppSubURL + "/cloudbrains") + } else { + ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job?listType=" + listType) + } +} + +func deleteGrampusJob(ctx *context.Context) error { + task := ctx.Cloudbrain + + if task.Status != string(models.GrampusStatusStopped) && task.Status != string(models.GrampusStatusSucceeded) && task.Status != string(models.GrampusStatusFailed) { + log.Error("the job(%s) has not been stopped", task.JobName, ctx.Data["msgID"]) + return errors.New("the job has not been stopped") + } + + err := models.DeleteJob(task) + if err != nil { + log.Error("DeleteJob failed: %v", err, ctx.Data["msgID"]) + return err + } + + storageType := models.TypeCloudBrainOne + if task.ComputeResource == models.NPUResource { + storageType = models.TypeCloudBrainTwo + } + deleteJobStorage(task.JobName, storageType) + + return nil +} + +func GrampusTrainJobShow(ctx *context.Context) { + ctx.Data["PageIsCloudBrain"] = true + + var task *models.Cloudbrain + task, err := models.GetCloudbrainByJobIDWithDeleted(ctx.Params(":jobid")) + if err != nil { + log.Error("GetCloudbrainByJobID failed:" + err.Error()) + 
ctx.ServerError("system error", err) + return + } + + if task.DeletedAt.IsZero() { //normal record + result, err := grampus.GetJob(task.JobID) + if err != nil { + log.Error("GetJob failed:" + err.Error()) + //ctx.ServerError("GetJob failed", err) + //return + } + + if result != nil { + if len(result.JobInfo.Tasks[0].CenterID) == 1 && len(result.JobInfo.Tasks[0].CenterName) == 1 { + task.AiCenter = result.JobInfo.Tasks[0].CenterID[0] + "+" + result.JobInfo.Tasks[0].CenterName[0] + } + task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status) + if task.Status != result.JobInfo.Status || result.JobInfo.Status == models.GrampusStatusRunning { + task.Duration = result.JobInfo.RunSec + task.TrainJobDuration = models.ConvertDurationToStr(task.Duration) + + if task.StartTime == 0 && result.JobInfo.StartedAt > 0 { + task.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt) + } + if task.EndTime == 0 && models.IsTrainJobTerminal(task.Status) && task.StartTime > 0 { + task.EndTime = task.StartTime.Add(task.Duration) + } + task.CorrectCreateUnix() + err = models.UpdateJob(task) + if err != nil { + log.Error("UpdateJob failed:" + err.Error()) + } + } + } + } + + if len(task.Parameters) > 0 { + var parameters models.Parameters + err := json.Unmarshal([]byte(task.Parameters), ¶meters) + if err != nil { + log.Error("Failed to Unmarshal Parameters: %s (%v)", task.Parameters, err) + ctx.ServerError("system error", err) + return + } + + if len(parameters.Parameter) > 0 { + paramTemp := "" + for _, Parameter := range parameters.Parameter { + param := Parameter.Label + " = " + Parameter.Value + "; " + paramTemp = paramTemp + param + } + task.Parameters = paramTemp[:len(paramTemp)-2] + } else { + task.Parameters = "" + } + } + + taskList := make([]*models.Cloudbrain, 0) + taskList = append(taskList, task) + ctx.Data["version_list_task"] = taskList + + ctx.Data["canDownload"] = cloudbrain.CanModifyJob(ctx, task) + ctx.Data["displayJobName"] = task.DisplayJobName + + 
aiCenterInfo := strings.Split(task.AiCenter, "+") + if len(aiCenterInfo) == 2 { + ctx.Data["ai_center"] = aiCenterInfo[1] + } + + ctx.HTML(http.StatusOK, tplGrampusTrainJobShow) +} + +func GrampusGetLog(ctx *context.Context) { + jobID := ctx.Params(":jobid") + job, err := models.GetCloudbrainByJobID(jobID) + if err != nil { + log.Error("GetCloudbrainByJobID failed: %v", err, ctx.Data["MsgID"]) + ctx.ServerError(err.Error(), err) + return + } + + content, err := grampus.GetTrainJobLog(job.JobID) + if err != nil { + log.Error("GetTrainJobLog failed: %v", err, ctx.Data["MsgID"]) + ctx.ServerError(err.Error(), err) + return + } + + ctx.JSON(http.StatusOK, map[string]interface{}{ + "JobName": job.JobName, + "Content": content, + }) + + return +} + +func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bootFile, paramSrc, outputRemotePath, datasetName string) (string, error) { + var command string + + workDir := grampus.NpuWorkDir + if processorType == grampus.ProcessorTypeGPU { + workDir = grampus.GpuWorkDir + } + + command += "pwd;cd " + workDir + grampus.CommandPrepareScript + //download code & dataset + if processorType == grampus.ProcessorTypeNPU { + commandDownload := "./downloader_for_obs " + setting.Bucket + " " + codeRemotePath + " " + grampus.CodeArchiveName + " " + dataRemotePath + " " + datasetName + ";" + command += commandDownload + } else if processorType == grampus.ProcessorTypeGPU { + commandDownload := "./downloader_for_minio " + setting.Grampus.Env + " " + codeRemotePath + " " + grampus.CodeArchiveName + " " + dataRemotePath + " " + datasetName + ";" + command += commandDownload + } + + //check download result + commandCheckRes := "bash -c \"[[ $? 
-eq 0 ]] && exit 0 || exit -1;\";" + command += commandCheckRes + + //unzip code & dataset + toolUnzip := "unzip -q " + if strings.HasSuffix(datasetName, ".tar.gz") { + toolUnzip = "tar -zxvf " + } + commandUnzip := "cd " + workDir + "code;unzip -q master.zip;echo \"start to unzip dataset\";cd " + workDir + "dataset;" + toolUnzip + datasetName + ";" + command += commandUnzip + + //check unzip result + commandCheckRes = "bash -c \"[[ $? -eq 0 ]] && exit 0 || exit -1;\";" + command += commandCheckRes + + command += "echo \"unzip finished;start to exec code;\";" + + //exec code + var parameters models.Parameters + var paramCode string + param := make([]models.Parameter, 0) + if len(paramSrc) != 0 { + err := json.Unmarshal([]byte(paramSrc), ¶meters) + if err != nil { + log.Error("Failed to Unmarshal params: %s (%v)", paramSrc, err) + return command, err + } + + for _, parameter := range parameters.Parameter { + param = append(param, models.Parameter{ + Label: parameter.Label, + Value: parameter.Value, + }) + paramCode += " --" + parameter.Label + "=" + parameter.Value + } + } + + commandCode := "cd " + workDir + "code/" + strings.ToLower(repoName) + ";python " + bootFile + paramCode + ";" + command += commandCode + + //get exec result + commandGetRes := "result=$?;" + command += commandGetRes + + //upload models + if processorType == grampus.ProcessorTypeNPU { + commandUpload := "cd " + workDir + "script_for_grampus/;./uploader_for_obs " + setting.Bucket + " " + outputRemotePath + " " + workDir + "output/;" + command += commandUpload + } else if processorType == grampus.ProcessorTypeGPU { + commandUpload := "cd " + workDir + "script_for_grampus/;./uploader_for_minio " + setting.Grampus.Env + " " + outputRemotePath + " " + workDir + "output/;" + command += commandUpload + } + + //check exec result + commandCheckRes = "bash -c \"[[ $result -eq 0 ]] && exit 0 || exit -1;\"" + command += commandCheckRes + + return command, nil +} + +func downloadZipCode(ctx 
*context.Context, codePath, branchName string) error { + archiveType := git.ZIP + archivePath := codePath + + if !com.IsDir(archivePath) { + if err := os.MkdirAll(archivePath, os.ModePerm); err != nil { + log.Error("MkdirAll failed:" + err.Error()) + return err + } + } + + // Get corresponding commit. + var ( + commit *git.Commit + err error + ) + + gitRepo := ctx.Repo.GitRepo + if err != nil { + log.Error("OpenRepository failed:" + err.Error()) + return err + } + + if gitRepo.IsBranchExist(branchName) { + commit, err = gitRepo.GetBranchCommit(branchName) + if err != nil { + log.Error("GetBranchCommit failed:" + err.Error()) + return err + } + } + + archivePath = path.Join(archivePath, grampus.CodeArchiveName) + if !com.IsFile(archivePath) { + if err := commit.CreateArchive(archivePath, git.CreateArchiveOpts{ + Format: archiveType, + Prefix: setting.Repository.PrefixArchiveFiles, + }); err != nil { + log.Error("CreateArchive failed:" + err.Error()) + return err + } + } + + return nil +} diff --git a/routers/repo/modelarts.go b/routers/repo/modelarts.go index 0b33a6dd7..e679e8a3e 100755 --- a/routers/repo/modelarts.go +++ b/routers/repo/modelarts.go @@ -147,7 +147,7 @@ func notebookNewDataPrepare(ctx *context.Context) error { } ctx.Data["flavors"] = modelarts.FlavorInfos.FlavorInfo - ctx.Data["cloudbraintype"] = models.TypeCloudBrainTwo + ctx.Data["datasetType"] = models.TypeCloudBrainTwo return nil } @@ -573,24 +573,11 @@ func TrainJobIndex(ctx *context.Context) { } listType := ctx.Query("listType") - if len(listType) == 0 { - listType = models.AllResource - } ctx.Data["ListType"] = listType - typeCloudBrain := models.TypeCloudBrainAll - if listType == models.GPUResource { - typeCloudBrain = models.TypeCloudBrainOne - } else if listType == models.NPUResource { - typeCloudBrain = models.TypeCloudBrainTwo - } else if listType == models.AllResource { - typeCloudBrain = models.TypeCloudBrainAll + if listType == models.AllResource { + listType = "" } - //else { - // 
log.Error("listType(%s) error", listType) - // ctx.ServerError("listType error", errors.New("listType error")) - // return - //} var jobTypes []string jobTypes = append(jobTypes, string(models.JobTypeTrain)) @@ -600,10 +587,11 @@ func TrainJobIndex(ctx *context.Context) { PageSize: setting.UI.IssuePagingNum, }, RepoID: repo.ID, - Type: typeCloudBrain, JobTypeNot: false, JobTypes: jobTypes, IsLatestVersion: modelarts.IsLatestVersion, + ComputeResource: listType, + Type: models.TypeCloudBrainAll, }) if err != nil { ctx.ServerError("Cloudbrain", err) @@ -613,11 +601,6 @@ func TrainJobIndex(ctx *context.Context) { for i, task := range tasks { tasks[i].CanDel = cloudbrain.CanDeleteJob(ctx, &task.Cloudbrain) tasks[i].CanModify = cloudbrain.CanModifyJob(ctx, &task.Cloudbrain) - if task.Cloudbrain.Type == models.TypeCloudBrainOne { - tasks[i].ComputeResource = models.GPUResource - } else if task.Cloudbrain.Type == models.TypeCloudBrainTwo { - tasks[i].ComputeResource = models.NPUResource - } } pager := context.NewPagination(int(count), setting.UI.IssuePagingNum, page, 5) @@ -704,7 +687,7 @@ func trainJobNewDataPrepare(ctx *context.Context) error { return err } ctx.Data["config_list"] = configList.ParaConfigs - ctx.Data["cloudbraintype"] = models.TypeCloudBrainTwo + ctx.Data["datasetType"] = models.TypeCloudBrainTwo return nil } @@ -778,7 +761,7 @@ func trainJobErrorNewDataPrepare(ctx *context.Context, form auth.CreateModelArts ctx.Data["bootFile"] = form.BootFile ctx.Data["uuid"] = form.Attachment ctx.Data["branch_name"] = form.BranchName - ctx.Data["cloudbraintype"] = models.TypeCloudBrainTwo + ctx.Data["datasetType"] = models.TypeCloudBrainTwo return nil } @@ -872,7 +855,7 @@ func trainJobNewVersionDataPrepare(ctx *context.Context) error { ctx.Data["uuid"] = task.Uuid ctx.Data["flavor_code"] = task.FlavorCode ctx.Data["engine_id"] = task.EngineID - ctx.Data["cloudbraintype"] = models.TypeCloudBrainTwo + ctx.Data["datasetType"] = models.TypeCloudBrainTwo configList, err 
:= getConfigList(modelarts.PerPage, 1, modelarts.SortByCreateTime, "desc", "", modelarts.ConfigTypeCustom) if err != nil { @@ -969,7 +952,7 @@ func versionErrorDataPrepare(ctx *context.Context, form auth.CreateModelArtsTrai return err } ctx.Data["config_list"] = configList.ParaConfigs - ctx.Data["cloudbraintype"] = models.TypeCloudBrainTwo + ctx.Data["datasetType"] = models.TypeCloudBrainTwo return nil } @@ -2136,7 +2119,7 @@ func inferenceJobNewDataPrepare(ctx *context.Context) error { New: MODEL_LATEST, }) ctx.Data["MODEL_COUNT"] = model_count - ctx.Data["cloudbraintype"] = models.TypeCloudBrainTwo + ctx.Data["datasetType"] = models.TypeCloudBrainTwo return nil } @@ -2202,7 +2185,7 @@ func inferenceJobErrorNewDataPrepare(ctx *context.Context, form auth.CreateModel ctx.Data["model_version"] = form.ModelVersion ctx.Data["ckpt_name"] = form.CkptName ctx.Data["train_url"] = form.TrainUrl - ctx.Data["cloudbraintype"] = models.TypeCloudBrainTwo + ctx.Data["datasetType"] = models.TypeCloudBrainTwo return nil } @@ -2272,24 +2255,35 @@ func ModelDownload(ctx *context.Context) { err error ) - var jobID = ctx.Params(":jobid") + jobID := ctx.Params(":jobid") versionName := ctx.Query("version_name") parentDir := ctx.Query("parent_dir") fileName := ctx.Query("file_name") task, err := models.GetCloudbrainByJobIDAndVersionName(jobID, versionName) if err != nil { - log.Error("GetCloudbrainByJobID(%s) failed:%v", task.JobName, err.Error()) + log.Error("GetCloudbrainByJobIDAndVersionName(%s) failed:%v", task.JobName, err.Error()) return } - path := strings.TrimPrefix(path.Join(setting.TrainJobModelPath, task.JobName, setting.OutPutPath, versionName, parentDir, fileName), "/") - - url, err := storage.GetObsCreateSignedUrlByBucketAndKey(setting.Bucket, path) - if err != nil { - log.Error("GetObsCreateSignedUrl failed: %v", err.Error(), ctx.Data["msgID"]) - ctx.ServerError("GetObsCreateSignedUrl", err) - return + var url string + if task.ComputeResource == models.NPUResource { + path 
:= strings.TrimPrefix(path.Join(setting.TrainJobModelPath, task.JobName, setting.OutPutPath, versionName, parentDir, fileName), "/") + url, err = storage.GetObsCreateSignedUrlByBucketAndKey(setting.Bucket, path) + if err != nil { + log.Error("GetObsCreateSignedUrl failed: %v", err.Error(), ctx.Data["msgID"]) + ctx.ServerError("GetObsCreateSignedUrl", err) + return + } + } else if task.ComputeResource == models.GPUResource { + filePath := setting.CBCodePathPrefix + task.JobName + cloudbrain.ModelMountPath + "/" + parentDir + url, err = storage.Attachments.PresignedGetURL(filePath, fileName) + if err != nil { + log.Error("PresignedGetURL failed: %v", err.Error(), ctx.Data["msgID"]) + ctx.ServerError("PresignedGetURL", err) + return + } } + ctx.Resp.Header().Set("Cache-Control", "max-age=0") http.Redirect(ctx.Resp, ctx.Req.Request, url, http.StatusMovedPermanently) } diff --git a/routers/routes/routes.go b/routers/routes/routes.go index 6cf87b527..00a820fc9 100755 --- a/routers/routes/routes.go +++ b/routers/routes/routes.go @@ -1103,6 +1103,24 @@ func RegisterRoutes(m *macaron.Macaron) { m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateCloudBrainForm{}), repo.CloudBrainCreate) }) }, context.RepoRef()) + m.Group("/grampus", func() { + m.Group("/train-job", func() { + m.Group("/:jobid", func() { + m.Get("", reqRepoCloudBrainReader, repo.GrampusTrainJobShow) + m.Post("/stop", cloudbrain.AdminOrOwnerOrJobCreaterRight, repo.GrampusStopJob) + m.Post("/del", cloudbrain.AdminOrOwnerOrJobCreaterRightForTrain, repo.GrampusTrainJobDel) + m.Get("/model_download", cloudbrain.AdminOrJobCreaterRightForTrain, repo.ModelDownload) + }) + m.Group("/gpu", func() { + m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, repo.GrampusTrainJobGPUNew) + m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateGrampusTrainJobForm{}), repo.GrampusTrainJobGpuCreate) + }) + m.Group("/npu", func() { + m.Get("/create", reqWechatBind, 
reqRepoCloudBrainWriter, repo.GrampusTrainJobNPUNew) + m.Post("/create", reqWechatBind, reqRepoCloudBrainWriter, bindIgnErr(auth.CreateGrampusTrainJobForm{}), repo.GrampusTrainJobNpuCreate) + }) + }) + }, context.RepoRef()) m.Group("/modelmanage", func() { m.Post("/create_model", reqRepoModelManageWriter, repo.SaveModel) m.Post("/create_new_model", repo.SaveNewNameModel) diff --git a/routers/user/home.go b/routers/user/home.go index 53aff19b8..ab64e707f 100755 --- a/routers/user/home.go +++ b/routers/user/home.go @@ -769,12 +769,6 @@ func Cloudbrains(ctx *context.Context) { if page <= 0 { page = 1 } - debugType := models.TypeCloudBrainAll - if listType == models.GPUResource { - debugType = models.TypeCloudBrainOne - } else if listType == models.NPUResource { - debugType = models.TypeCloudBrainTwo - } var jobTypes []string jobTypeNot := false @@ -821,7 +815,6 @@ func Cloudbrains(ctx *context.Context) { }, Keyword: keyword, UserID: ctxUser.ID, - Type: debugType, JobTypeNot: jobTypeNot, JobStatusNot: jobStatusNot, JobStatus: jobStatuses, @@ -829,6 +822,8 @@ func Cloudbrains(ctx *context.Context) { NeedRepoInfo: true, IsLatestVersion: modelarts.IsLatestVersion, RepoIDList: repoIDList, + ComputeResource: listType, + Type: models.TypeCloudBrainAll, }) if err != nil { ctx.ServerError("Get job failed:", err) diff --git a/services/socketwrap/clientManager.go b/services/socketwrap/clientManager.go index 6ffa96933..98bcc8a85 100755 --- a/services/socketwrap/clientManager.go +++ b/services/socketwrap/clientManager.go @@ -10,7 +10,7 @@ import ( "github.com/elliotchance/orderedmap" ) -var opTypes = []int{1, 2, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15, 17, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31} +var opTypes = []int{1, 2, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15, 17, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33} type ClientsManager struct { Clients *orderedmap.OrderedMap diff --git a/templates/admin/cloudbrain/list.tmpl b/templates/admin/cloudbrain/list.tmpl index 4c63b167a..347b5658d 
100755 --- a/templates/admin/cloudbrain/list.tmpl +++ b/templates/admin/cloudbrain/list.tmpl @@ -102,7 +102,7 @@ {{else if eq .JobType "TRAIN"}} {{.DisplayJobName}} @@ -204,7 +204,7 @@ {{else}} {{$.i18n.Tr "repo.stop"}} @@ -212,7 +212,7 @@
{{$.CsrfTokenHtml}}
-
+ data-dataset-type="{{.datasetType}}">
+
- {{if eq .cloudbraintype 0}} + {{if eq .datasetType 0}} {{else}} @@ -18,7 +18,7 @@
- +
diff --git a/templates/custom/select_dataset_train.tmpl b/templates/custom/select_dataset_train.tmpl old mode 100644 new mode 100755 index f1d2abaf3..e96bfa3a8 --- a/templates/custom/select_dataset_train.tmpl +++ b/templates/custom/select_dataset_train.tmpl @@ -1,5 +1,5 @@ + data-dataset-type="{{.datasetType}}">
{{if or (.benchmarkMode) (.newInference)}}
- +
diff --git a/templates/repo/cloudbrain/trainjob/new.tmpl b/templates/repo/cloudbrain/trainjob/new.tmpl index e8b92162b..976865262 100755 --- a/templates/repo/cloudbrain/trainjob/new.tmpl +++ b/templates/repo/cloudbrain/trainjob/new.tmpl @@ -89,6 +89,19 @@

{{.i18n.Tr "repo.modelarts.train_job.basic_info"}}:

+ + +
+