diff --git a/models/attachment.go b/models/attachment.go index 55c6dbad3..5ebb6f0f3 100755 --- a/models/attachment.go +++ b/models/attachment.go @@ -33,27 +33,27 @@ const ( // Attachment represent a attachment of issue/comment/release. type Attachment struct { - ID int64 `xorm:"pk autoincr"` - UUID string `xorm:"uuid UNIQUE"` - IssueID int64 `xorm:"INDEX"` - DatasetID int64 `xorm:"INDEX DEFAULT 0"` - ReleaseID int64 `xorm:"INDEX"` - UploaderID int64 `xorm:"INDEX DEFAULT 0"` // Notice: will be zero before this column added - CommentID int64 - Name string - Description string `xorm:"TEXT"` - DownloadCount int64 `xorm:"DEFAULT 0"` - UseNumber int64 `xorm:"DEFAULT 0"` - Size int64 `xorm:"DEFAULT 0"` - IsPrivate bool `xorm:"DEFAULT false"` - DecompressState int32 `xorm:"DEFAULT 0"` - Type int `xorm:"DEFAULT 0"` - CreatedUnix timeutil.TimeStamp `xorm:"created"` - - FileChunk *FileChunk `xorm:"-"` - CanDel bool `xorm:"-"` - Uploader *User `xorm:"-"` - Md5 string `xorm:"-"` + ID int64 `xorm:"pk autoincr" json:"id"` + UUID string `xorm:"uuid UNIQUE" json:"uuid"` + IssueID int64 `xorm:"INDEX" json:"issueId"` + DatasetID int64 `xorm:"INDEX DEFAULT 0" json:"datasetId"` + ReleaseID int64 `xorm:"INDEX" json:"releaseId"` + UploaderID int64 `xorm:"INDEX DEFAULT 0" json:"uploaderId"` // Notice: will be zero before this column added + CommentID int64 `json:"commentId"` + Name string `json:"name"` + Description string `xorm:"TEXT" json:"description"` + DownloadCount int64 `xorm:"DEFAULT 0" json:"downloadCount"` + UseNumber int64 `xorm:"DEFAULT 0" json:"useNumber"` + Size int64 `xorm:"DEFAULT 0" json:"size"` + IsPrivate bool `xorm:"DEFAULT false" json:"isPrivate"` + DecompressState int32 `xorm:"DEFAULT 0" json:"decompressState"` + Type int `xorm:"DEFAULT 0" json:"type"` + CreatedUnix timeutil.TimeStamp `xorm:"created" json:"createdUnix"` + + FileChunk *FileChunk `xorm:"-" json:"fileChunk"` + CanDel bool `xorm:"-" json:"canDel"` + Uploader *User `xorm:"-" json:"uploader"` + Md5 string `xorm:"-" json:"md5"` } type AttachmentUsername struct { @@ -61,30 +61,6 @@ type AttachmentUsername struct { Name string } -type AttachmentInfo struct { - Attachment `xorm:"extends"` - Repo *Repository `xorm:"extends"` - RelAvatarLink string `xorm:"extends"` - UserName string `xorm:"extends"` - Recommend bool `xorm:"-"` -} - -type AttachmentsOptions struct { - ListOptions - DatasetIDs []int64 - DecompressState int - Type int - UploaderID int64 - NeedDatasetIDs bool - NeedIsPrivate bool - IsPrivate bool - JustNeedZipFile bool - NeedRepoInfo bool - Keyword string - RecommendOnly bool - UserId int64 -} - func (a *Attachment) AfterUpdate() { if a.DatasetID > 0 { datasetIsPublicCount, err := x.Where("dataset_id = ? AND is_private = ?", a.DatasetID, false).Count(new(Attachment)) @@ -493,19 +469,6 @@ func getPrivateAttachments(e Engine, userID int64) ([]*AttachmentUsername, error return attachments, nil } -func getAllUserAttachments(e Engine, userID int64) ([]*AttachmentUsername, error) { - attachments := make([]*AttachmentUsername, 0, 10) - if err := e.Table("attachment").Join("LEFT", "`user`", "attachment.uploader_id "+ - "= `user`.id").Where("decompress_state= ? and attachment.type = ? and (uploader_id= ? 
or is_private = ?)", DecompressStateDone, TypeCloudBrainOne, userID, false).Find(&attachments); err != nil { - return nil, err - } - return attachments, nil -} - -func GetAllUserAttachments(userID int64) ([]*AttachmentUsername, error) { - return getAllUserAttachments(x, userID) -} - func getModelArtsUserAttachments(e Engine, userID int64) ([]*AttachmentUsername, error) { attachments := make([]*AttachmentUsername, 0, 10) if err := e.Table("attachment").Join("LEFT", "`user`", "attachment.uploader_id "+ @@ -601,107 +564,6 @@ func GetAllAttachmentSize() (int64, error) { return x.SumInt(&Attachment{}, "size") } -func Attachments(opts *AttachmentsOptions) ([]*AttachmentInfo, int64, error) { - sess := x.NewSession() - defer sess.Close() - - var cond = builder.NewCond() - if opts.NeedDatasetIDs { - cond = cond.And( - builder.In("attachment.dataset_id", opts.DatasetIDs), - ) - } - - if opts.UploaderID > 0 { - cond = cond.And( - builder.Eq{"attachment.uploader_id": opts.UploaderID}, - ) - } - - if (opts.Type) >= 0 { - cond = cond.And( - builder.Eq{"attachment.type": opts.Type}, - ) - } - - if opts.NeedIsPrivate { - cond = cond.And( - builder.Eq{"attachment.is_private": opts.IsPrivate}, - ) - } - if opts.RecommendOnly { - cond = cond.And(builder.In("attachment.id", builder.Select("attachment.id"). - From("attachment"). - Join("INNER", "dataset", "attachment.dataset_id = dataset.id and dataset.recommend=true"))) - } - - if opts.JustNeedZipFile { - var DecompressState []int32 - DecompressState = append(DecompressState, DecompressStateDone, DecompressStateIng, DecompressStateFailed) - cond = cond.And( - builder.In("attachment.decompress_state", DecompressState), - ) - } - - var count int64 - var err error - if len(opts.Keyword) == 0 { - count, err = sess.Where(cond).Count(new(Attachment)) - } else { - lowerKeyWord := strings.ToLower(opts.Keyword) - - cond = cond.And(builder.Or(builder.Like{"LOWER(attachment.name)", lowerKeyWord}, builder.Like{"LOWER(attachment.description)", lowerKeyWord})) - count, err = sess.Table(&Attachment{}).Where(cond).Count(new(AttachmentInfo)) - - } - - if err != nil { - return nil, 0, fmt.Errorf("Count: %v", err) - } - - if opts.Page >= 0 && opts.PageSize > 0 { - var start int - if opts.Page == 0 { - start = 0 - } else { - start = (opts.Page - 1) * opts.PageSize - } - sess.Limit(opts.PageSize, start) - } - - sess.OrderBy("attachment.created_unix DESC") - attachments := make([]*AttachmentInfo, 0, setting.UI.DatasetPagingNum) - if err := sess.Table(&Attachment{}).Where(cond). 
-		Find(&attachments); err != nil {
-		return nil, 0, fmt.Errorf("Find: %v", err)
-	}
-
-	if opts.NeedRepoInfo {
-		for _, attachment := range attachments {
-			dataset, err := GetDatasetByID(attachment.DatasetID)
-			if err != nil {
-				return nil, 0, fmt.Errorf("GetDatasetByID failed error: %v", err)
-			}
-			attachment.Recommend = dataset.Recommend
-			repo, err := GetRepositoryByID(dataset.RepoID)
-			if err == nil {
-				attachment.Repo = repo
-			} else {
-				return nil, 0, fmt.Errorf("GetRepositoryByID failed error: %v", err)
-			}
-			user, err := GetUserByID(attachment.UploaderID)
-			if err == nil {
-				attachment.RelAvatarLink = user.RelAvatarLink()
-				attachment.UserName = user.Name
-			} else {
-				return nil, 0, fmt.Errorf("GetUserByID failed error: %v", err)
-			}
-		}
-	}
-
-	return attachments, count, nil
-}
-
 func GetAllDatasetContributorByDatasetId(datasetId int64) ([]*User, error) {
 	r := make([]*User, 0)
 	if err := x.Select("distinct(public.user.*)").Table("attachment").Join("LEFT", "user", "public.user.ID = attachment.uploader_id").Where("attachment.dataset_id = ?", datasetId).Find(&r); err != nil {
diff --git a/models/base_message.go b/models/base_message.go
index 37f7668ad..31103d795 100644
--- a/models/base_message.go
+++ b/models/base_message.go
@@ -1,8 +1,8 @@
 package models
 
 type BaseMessage struct {
-	Code int
-	Message string
+	Code int `json:"code"`
+	Message string `json:"message"`
 }
 
 var BaseOKMessage = BaseMessage{
diff --git a/models/dataset.go b/models/dataset.go
index 720850ed9..f3c2adbd5 100755
--- a/models/dataset.go
+++ b/models/dataset.go
@@ -21,26 +21,26 @@ const (
 )
 
 type Dataset struct {
-	ID int64 `xorm:"pk autoincr"`
-	Title string `xorm:"INDEX NOT NULL"`
-	Status int32 `xorm:"INDEX"` // normal_private: 0, pulbic: 1, is_delete: 2
-	Category string
-	Description string `xorm:"TEXT"`
-	DownloadTimes int64
-	UseCount int64 `xorm:"DEFAULT 0"`
-	NumStars int `xorm:"INDEX NOT NULL DEFAULT 0"`
-	Recommend bool `xorm:"INDEX NOT NULL DEFAULT false"`
-	License string
-	Task string
-	ReleaseID int64 `xorm:"INDEX"`
-	UserID int64 `xorm:"INDEX"`
-	RepoID int64 `xorm:"INDEX"`
-	Repo *Repository `xorm:"-"`
-	CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"`
-	UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"`
-
-	User *User `xorm:"-"`
-	Attachments []*Attachment `xorm:"-"`
+	ID int64 `xorm:"pk autoincr" json:"id"`
+	Title string `xorm:"INDEX NOT NULL" json:"title"`
+	Status int32 `xorm:"INDEX" json:"status"` // normal_private: 0, public: 1, is_delete: 2
+	Category string `json:"category"`
+	Description string `xorm:"TEXT" json:"description"`
+	DownloadTimes int64 `json:"downloadTimes"`
+	UseCount int64 `xorm:"DEFAULT 0" json:"useCount"`
+	NumStars int `xorm:"INDEX NOT NULL DEFAULT 0" json:"numStars"`
+	Recommend bool `xorm:"INDEX NOT NULL DEFAULT false" json:"recommend"`
+	License string `json:"license"`
+	Task string `json:"task"`
+	ReleaseID int64 `xorm:"INDEX" json:"releaseId"`
+	UserID int64 `xorm:"INDEX" json:"userId"`
+	RepoID int64 `xorm:"INDEX" json:"repoId"`
+	Repo *Repository `xorm:"-" json:"repo"`
+	CreatedUnix timeutil.TimeStamp `xorm:"INDEX created" json:"createdUnix"`
+	UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated" json:"updatedUnix"`
+
+	User *User `xorm:"-" json:"user"`
+	Attachments []*Attachment `xorm:"-" json:"attachments"`
 }
 
 type DatasetWithStar struct {
diff --git a/models/repo.go b/models/repo.go
index 2c4fda39b..ccb0d680c 100755
--- a/models/repo.go
+++ b/models/repo.go
@@ -162,79 +162,79 @@ const (
 
 // Repository represents a git repository.
type Repository struct { - ID int64 `xorm:"pk autoincr"` - OwnerID int64 `xorm:"UNIQUE(s) index"` - OwnerName string - Owner *User `xorm:"-"` - LowerName string `xorm:"UNIQUE(s) INDEX NOT NULL"` - Name string `xorm:"INDEX NOT NULL"` - Description string `xorm:"TEXT"` - Website string `xorm:"VARCHAR(2048)"` - OriginalServiceType api.GitServiceType `xorm:"index"` - OriginalURL string `xorm:"VARCHAR(2048)"` - DefaultBranch string - CreatorID int64 `xorm:"INDEX NOT NULL DEFAULT 0"` - Creator *User `xorm:"-"` - NumWatches int - NumStars int - NumForks int - NumIssues int - NumClosedIssues int - NumOpenIssues int `xorm:"-"` - NumPulls int - NumClosedPulls int - NumOpenPulls int `xorm:"-"` - NumMilestones int `xorm:"NOT NULL DEFAULT 0"` - NumClosedMilestones int `xorm:"NOT NULL DEFAULT 0"` - NumOpenMilestones int `xorm:"-"` - NumCommit int64 `xorm:"NOT NULL DEFAULT 0"` - RepoType RepoType `xorm:"NOT NULL DEFAULT 0"` - - IsPrivate bool `xorm:"INDEX"` - IsEmpty bool `xorm:"INDEX"` - IsArchived bool `xorm:"INDEX"` - IsMirror bool `xorm:"INDEX"` + ID int64 `xorm:"pk autoincr" json:"id"` + OwnerID int64 `xorm:"UNIQUE(s) index" json:"ownerId"` + OwnerName string `json:"ownerName"` + Owner *User `xorm:"-" json:"owner"` + LowerName string `xorm:"UNIQUE(s) INDEX NOT NULL" json:"lowerName"` + Name string `xorm:"INDEX NOT NULL" json:"name"` + Description string `xorm:"TEXT" json:"description"` + Website string `xorm:"VARCHAR(2048)" json:"website"` + OriginalServiceType api.GitServiceType `xorm:"index" json:"originalServiceType"` + OriginalURL string `xorm:"VARCHAR(2048)" json:"originalUrl"` + DefaultBranch string `json:"defaultBranch"` + CreatorID int64 `xorm:"INDEX NOT NULL DEFAULT 0" json:"creatorId"` + Creator *User `xorm:"-" json:"creator"` + NumWatches int `json:"numWatches"` + NumStars int `json:"numStars"` + NumForks int `json:"numForks"` + NumIssues int `json:"numIssues"` + NumClosedIssues int `json:"numClosedIssues"` + NumOpenIssues int `xorm:"-" json:"numOpenIssues"` + NumPulls int `json:"numPulls"` + NumClosedPulls int `json:"numClosedPulls"` + NumOpenPulls int `xorm:"-" json:"numOpenPulls"` + NumMilestones int `xorm:"NOT NULL DEFAULT 0" json:"numMilestones"` + NumClosedMilestones int `xorm:"NOT NULL DEFAULT 0" json:"numClosedMilestones"` + NumOpenMilestones int `xorm:"-" json:"numOpenMilestones"` + NumCommit int64 `xorm:"NOT NULL DEFAULT 0" json:"numCommit"` + RepoType RepoType `xorm:"NOT NULL DEFAULT 0" json:"repoType"` + + IsPrivate bool `xorm:"INDEX" json:"isPrivate"` + IsEmpty bool `xorm:"INDEX" json:"isEmpty"` + IsArchived bool `xorm:"INDEX" json:"isArchived"` + IsMirror bool `xorm:"INDEX" json:"isMirror"` *Mirror `xorm:"-"` - Status RepositoryStatus `xorm:"NOT NULL DEFAULT 0"` + Status RepositoryStatus `xorm:"NOT NULL DEFAULT 0" json:"status"` RenderingMetas map[string]string `xorm:"-"` - Units []*RepoUnit `xorm:"-"` - PrimaryLanguage *LanguageStat `xorm:"-"` - - IsFork bool `xorm:"INDEX NOT NULL DEFAULT false"` - ForkID int64 `xorm:"INDEX"` - BaseRepo *Repository `xorm:"-"` - IsTemplate bool `xorm:"INDEX NOT NULL DEFAULT false"` - TemplateID int64 `xorm:"INDEX"` - TemplateRepo *Repository `xorm:"-"` - Size int64 `xorm:"NOT NULL DEFAULT 0"` - CodeIndexerStatus *RepoIndexerStatus `xorm:"-"` - StatsIndexerStatus *RepoIndexerStatus `xorm:"-"` - IsFsckEnabled bool `xorm:"NOT NULL DEFAULT true"` - CloseIssuesViaCommitInAnyBranch bool `xorm:"NOT NULL DEFAULT false"` - Topics []string `xorm:"TEXT JSON"` + Units []*RepoUnit `xorm:"-" json:"units"` + PrimaryLanguage *LanguageStat `xorm:"-" 
json:"primaryLanguage"` + + IsFork bool `xorm:"INDEX NOT NULL DEFAULT false" json:"isFork"` + ForkID int64 `xorm:"INDEX" json:"forkId"` + BaseRepo *Repository `xorm:"-" json:"baseRepo"` + IsTemplate bool `xorm:"INDEX NOT NULL DEFAULT false" json:"isTemplate"` + TemplateID int64 `xorm:"INDEX" json:"templateId"` + TemplateRepo *Repository `xorm:"-" json:"templateRepo"` + Size int64 `xorm:"NOT NULL DEFAULT 0" json:"size"` + CodeIndexerStatus *RepoIndexerStatus `xorm:"-" json:"codeIndexerStatus"` + StatsIndexerStatus *RepoIndexerStatus `xorm:"-" json:"statsIndexerStatus"` + IsFsckEnabled bool `xorm:"NOT NULL DEFAULT true" json:"isFsckEnabled"` + CloseIssuesViaCommitInAnyBranch bool `xorm:"NOT NULL DEFAULT false" json:"closeIssuesViaCommitInAnyBranch"` + Topics []string `xorm:"TEXT JSON" json:"topics"` // Avatar: ID(10-20)-md5(32) - must fit into 64 symbols - Avatar string `xorm:"VARCHAR(64)"` + Avatar string `xorm:"VARCHAR(64)" json:"avatar"` //blockchain - ContractAddress string `xorm:"INDEX"` - Balance string `xorm:"NOT NULL DEFAULT '0'"` - BlockChainStatus RepoBlockChainStatus `xorm:"NOT NULL DEFAULT 0"` + ContractAddress string `xorm:"INDEX" json:"contractAddress"` + Balance string `xorm:"NOT NULL DEFAULT '0'" json:"balance"` + BlockChainStatus RepoBlockChainStatus `xorm:"NOT NULL DEFAULT 0" json:"blockChainStatus"` // git clone and git pull total count - CloneCnt int64 `xorm:"NOT NULL DEFAULT 0"` + CloneCnt int64 `xorm:"NOT NULL DEFAULT 0" json:"clone_cnt" json:"cloneCnt"` // only git clone total count - GitCloneCnt int64 `xorm:"NOT NULL DEFAULT 0"` + GitCloneCnt int64 `xorm:"NOT NULL DEFAULT 0" json:"git_clone_cnt" json:"gitCloneCnt"` - CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"` - UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"` + CreatedUnix timeutil.TimeStamp `xorm:"INDEX created" json:"createdUnix"` + UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated" json:"updatedUnix"` - Hot int64 `xorm:"-"` - Active int64 `xorm:"-"` - Alias string `xorm:"INDEX"` - LowerAlias string `xorm:"INDEX"` + Hot int64 `xorm:"-" json:"hot"` + Active int64 `xorm:"-" json:"active"` + Alias string `xorm:"INDEX" json:"alias"` + LowerAlias string `xorm:"INDEX" json:"lowerAlias"` } type RepositoryShow struct { diff --git a/modules/auth/modelarts.go b/modules/auth/modelarts.go index 9744bc387..0221c51d8 100755 --- a/modules/auth/modelarts.go +++ b/modules/auth/modelarts.go @@ -57,29 +57,26 @@ type CreateModelArtsTrainJobForm struct { } type CreateModelArtsInferenceJobForm struct { - DisplayJobName string `form:"display_job_name" binding:"Required"` - JobName string `form:"job_name" binding:"Required"` - Attachment string `form:"attachment" binding:"Required"` - BootFile string `form:"boot_file" binding:"Required"` - WorkServerNumber int `form:"work_server_number" binding:"Required"` - EngineID int `form:"engine_id" binding:"Required"` - PoolID string `form:"pool_id" binding:"Required"` - Flavor string `form:"flavor" binding:"Required"` - Params string `form:"run_para_list" binding:"Required"` - Description string `form:"description"` - IsSaveParam string `form:"is_save_para"` - ParameterTemplateName string `form:"parameter_template_name"` - PrameterDescription string `form:"parameter_description"` - BranchName string `form:"branch_name" binding:"Required"` - VersionName string `form:"version_name" binding:"Required"` - FlavorName string `form:"flaver_names" binding:"Required"` - EngineName string `form:"engine_names" binding:"Required"` - LabelName string `form:"label_names" binding:"Required"` - 
TrainUrl string `form:"train_url" binding:"Required"` - ModelName string `form:"model_name" binding:"Required"` - ModelVersion string `form:"model_version" binding:"Required"` - CkptName string `form:"ckpt_name" binding:"Required"` - SpecId int64 `form:"spec_id" binding:"Required"` + DisplayJobName string `form:"display_job_name" binding:"Required"` + JobName string `form:"job_name" binding:"Required"` + Attachment string `form:"attachment" binding:"Required"` + BootFile string `form:"boot_file" binding:"Required"` + WorkServerNumber int `form:"work_server_number" binding:"Required"` + EngineID int `form:"engine_id" binding:"Required"` + PoolID string `form:"pool_id" binding:"Required"` + Flavor string `form:"flavor" binding:"Required"` + Params string `form:"run_para_list" binding:"Required"` + Description string `form:"description"` + BranchName string `form:"branch_name" binding:"Required"` + VersionName string `form:"version_name" binding:"Required"` + FlavorName string `form:"flaver_names" binding:"Required"` + EngineName string `form:"engine_names" binding:"Required"` + LabelName string `form:"label_names" binding:"Required"` + TrainUrl string `form:"train_url" binding:"Required"` + ModelName string `form:"model_name" binding:"Required"` + ModelVersion string `form:"model_version" binding:"Required"` + CkptName string `form:"ckpt_name" binding:"Required"` + SpecId int64 `form:"spec_id" binding:"Required"` } func (f *CreateModelArtsTrainJobForm) Validate(ctx *macaron.Context, errs binding.Errors) binding.Errors { diff --git a/modules/cloudbrain/cloudbrain.go b/modules/cloudbrain/cloudbrain.go index e57bd8d7e..8d4e57670 100755 --- a/modules/cloudbrain/cloudbrain.go +++ b/modules/cloudbrain/cloudbrain.go @@ -228,7 +228,7 @@ func AdminOrImageCreaterRight(ctx *context.Context) { } -func GenerateTask(req GenerateCloudBrainTaskReq) error { +func GenerateTask(req GenerateCloudBrainTaskReq) (string, error) { var versionCount int if req.JobType == string(models.JobTypeTrain) { versionCount = 1 @@ -335,11 +335,11 @@ func GenerateTask(req GenerateCloudBrainTaskReq) error { }) if err != nil { log.Error("CreateJob failed:", err.Error(), req.Ctx.Data["MsgID"]) - return err + return "", err } if jobResult.Code != Success { log.Error("CreateJob(%s) failed:%s", req.JobName, jobResult.Msg, req.Ctx.Data["MsgID"]) - return errors.New(jobResult.Msg) + return "", errors.New(jobResult.Msg) } var jobID = jobResult.Payload["jobId"].(string) @@ -380,13 +380,13 @@ func GenerateTask(req GenerateCloudBrainTaskReq) error { }) if err != nil { - return err + return "", err } task, err := models.GetCloudbrainByJobID(jobID) if err != nil { log.Error("GetCloudbrainByJobID failed: %v", err.Error()) - return err + return "", err } stringId := strconv.FormatInt(task.ID, 10) @@ -401,7 +401,7 @@ func GenerateTask(req GenerateCloudBrainTaskReq) error { notification.NotifyOtherTask(req.Ctx.User, req.Ctx.Repo.Repository, stringId, req.DisplayJobName, models.ActionCreateDebugGPUTask) } - return nil + return jobID, nil } func IsBenchmarkJob(jobType string) bool { diff --git a/modules/grampus/grampus.go b/modules/grampus/grampus.go index 7bacb46d3..a827b408f 100755 --- a/modules/grampus/grampus.go +++ b/modules/grampus/grampus.go @@ -98,7 +98,7 @@ func getDatasetGrampus(datasetInfos map[string]models.DatasetInfo) []models.Gram return datasetGrampus } -func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error) { +func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId string, err error) 
{
 
 	createTime := timeutil.TimeStampNow()
 	centerID, centerName := getCentersParamter(ctx, req)
@@ -146,7 +146,7 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error
 	})
 	if err != nil {
 		log.Error("createJob failed: %v", err.Error())
-		return err
+		return "", err
 	}
 
 	jobID := jobResult.JobInfo.JobID
@@ -187,7 +187,7 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error
 
 	if err != nil {
 		log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
-		return err
+		return "", err
 	}
 
 	var actionType models.ActionType
@@ -198,7 +198,7 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error
 	}
 	notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
 
-	return nil
+	return jobID, nil
 }
 
 func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {
diff --git a/modules/modelarts/modelarts.go b/modules/modelarts/modelarts.go
index 06521993e..4158174cb 100755
--- a/modules/modelarts/modelarts.go
+++ b/modules/modelarts/modelarts.go
@@ -680,7 +680,7 @@ func GetOutputPathByCount(TotalVersionCount int) (VersionOutputPath string) {
 	return VersionOutputPath
 }
 
-func GenerateInferenceJob(ctx *context.Context, req *GenerateInferenceJobReq) (err error) {
+func GenerateInferenceJob(ctx *context.Context, req *GenerateInferenceJobReq) (jobId string, err error) {
 	createTime := timeutil.TimeStampNow()
 	jobResult, err := createInferenceJob(models.CreateInferenceJobParams{
 		JobName: req.JobName,
@@ -715,10 +715,10 @@ func GenerateInferenceJob(ctx *context.Context, req *GenerateInferenceJobReq) (e
 			})
 			if err != nil {
 				log.Error("InsertCloudbrainTemp failed: %v", err.Error())
-				return err
+				return "", err
 			}
 		}
-		return err
+		return "", err
 	}
 
 	// attach, err := models.GetAttachmentByUUID(req.Uuid)
@@ -769,7 +769,7 @@
 
 	if err != nil {
 		log.Error("CreateCloudbrain(%s) failed:%v", req.JobName, err.Error())
-		return err
+		return "", err
 	}
 	if req.JobType == string(models.JobTypeModelSafety) {
 		task, err := models.GetCloudbrainByJobID(jobID)
@@ -780,7 +780,7 @@
 		notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, models.ActionCreateInferenceTask)
 	}
 
-	return nil
+	return jobID, nil
 }
 
 func GetNotebookImageName(imageId string) (string, error) {
diff --git a/routers/api/v1/api.go b/routers/api/v1/api.go
index 7667cfcfa..b3694ced9 100755
--- a/routers/api/v1/api.go
+++ b/routers/api/v1/api.go
@@ -517,6 +517,15 @@ func RegisterRoutes(m *macaron.Macaron) {
 		m.Post("/markdown", bind(api.MarkdownOption{}), misc.Markdown)
 		m.Post("/markdown/raw", misc.MarkdownRaw)
 
+		m.Group("/images", func() {
+
+			m.Get("/public", repo.GetPublicImages)
+			m.Get("/custom", repo.GetCustomImages)
+			m.Get("/star", repo.GetStarImages)
+			m.Get("/npu", repo.GetNpuImages)
+
+		}, reqToken())
+
 		// Notifications
 		m.Group("/notifications", func() {
 			m.Combo("").
@@ -696,6 +705,13 @@ func RegisterRoutes(m *macaron.Macaron) {
 
 		m.Combo("/repositories/:id", reqToken()).Get(repo.GetByID)
 
+		m.Group("/:username/:reponame/datasets", func() {
+			m.Get("/current_repo", repo.CurrentRepoDatasetMultiple)
+			m.Get("/my_datasets", repo.MyDatasetsMultiple)
+			m.Get("/public_datasets", repo.PublicDatasetMultiple)
+			m.Get("/my_favorite", repo.MyFavoriteDatasetMultiple)
+		}, reqToken())
+
 		m.Group("/repos", func() {
 			m.Get("/search", repo.Search)
 
@@ -955,6 +971,17 @@
 			m.Get("/:id/modelartlog", repo.TrainJobForModelConvertGetLog)
 			m.Get("/:id/model_list", repo.CloudBrainModelConvertList)
 		}, reqRepoReader(models.UnitTypeModelManage))
+		m.Group("/cloudbrain", func() {
+			m.Group("/train-job", func() {
+				m.Post("/create", bind(api.CreateTrainJobOption{}), repo.CreateCloudBrain)
+
+			})
+			m.Group("/inference-job", func() {
+				m.Post("/create", bind(api.CreateTrainJobOption{}), repo.CreateCloudBrainInferenceTask)
+
+			})
+
+		}, reqRepoReader(models.UnitTypeCloudBrain))
 		m.Group("/modelarts", func() {
 			m.Group("/notebook", func() {
 				//m.Get("/:jobid", repo.GetModelArtsNotebook)
diff --git a/routers/api/v1/repo/cloudbrain.go b/routers/api/v1/repo/cloudbrain.go
index ba46ab58c..58dd5b801 100755
--- a/routers/api/v1/repo/cloudbrain.go
+++ b/routers/api/v1/repo/cloudbrain.go
@@ -16,6 +16,10 @@ import (
 	"strings"
 	"time"
 
+	"code.gitea.io/gitea/services/cloudbrain/cloudbrainTask"
+
+	api "code.gitea.io/gitea/modules/structs"
+
 	"code.gitea.io/gitea/modules/notification"
 
 	"code.gitea.io/gitea/modules/setting"
@@ -29,6 +33,28 @@ import (
 	routerRepo "code.gitea.io/gitea/routers/repo"
 )
 
+func CreateCloudBrain(ctx *context.APIContext, option api.CreateTrainJobOption) {
+
+	if option.Type == 2 {
+		cloudbrainTask.GrampusTrainJobGpuCreate(ctx.Context, option)
+	}
+	if option.Type == 3 {
+		cloudbrainTask.GrampusTrainJobNpuCreate(ctx.Context, option)
+	}
+
+}
+
+func CreateCloudBrainInferenceTask(ctx *context.APIContext, option api.CreateTrainJobOption) {
+
+	if option.Type == 0 {
+		cloudbrainTask.CloudBrainInferenceJobCreate(ctx.Context, option)
+	}
+	if option.Type == 1 {
+		cloudbrainTask.ModelArtsInferenceJobCreate(ctx.Context, option)
+	}
+
+}
+
 // cloudbrain get job task by jobid
 func GetCloudbrainTask(ctx *context.APIContext) {
 	// swagger:operation GET /repos/{owner}/{repo}/cloudbrain/{jobid} cloudbrain jobTask
diff --git a/routers/api/v1/repo/datasets.go b/routers/api/v1/repo/datasets.go
new file mode 100644
index 000000000..0539a22e9
--- /dev/null
+++ b/routers/api/v1/repo/datasets.go
@@ -0,0 +1,114 @@
+package repo
+
+import (
+	"fmt"
+	"strings"
+
+	"code.gitea.io/gitea/models"
+	"code.gitea.io/gitea/modules/context"
+	"code.gitea.io/gitea/modules/log"
+	"code.gitea.io/gitea/modules/setting"
+)
+
+func PublicDatasetMultiple(ctx *context.APIContext) {
+
+	opts := &models.SearchDatasetOptions{
+		PublicOnly: true,
+		NeedAttachment: true,
+		CloudBrainType: ctx.QueryInt("type"),
+	}
+	datasetMultiple(ctx, opts)
+
+}
+
+func MyFavoriteDatasetMultiple(ctx *context.APIContext) {
+
+	opts := &models.SearchDatasetOptions{
+		StarByMe: true,
+		DatasetIDs: models.GetDatasetIdsStarByUser(ctx.User.ID),
+		NeedAttachment: true,
+		CloudBrainType: ctx.QueryInt("type"),
+	}
+	datasetMultiple(ctx, opts)
+}
+
+func CurrentRepoDatasetMultiple(ctx *context.APIContext) {
+	datasetIds := models.GetDatasetIdsByRepoID(ctx.Repo.Repository.ID)
+	searchOrderBy := getSearchOrderByInValues(datasetIds)
+	opts := &models.SearchDatasetOptions{
+		RepoID: ctx.Repo.Repository.ID,
+		NeedAttachment: 
true,
+		CloudBrainType: ctx.QueryInt("type"),
+		DatasetIDs: datasetIds,
+		SearchOrderBy: searchOrderBy,
+	}
+
+	datasetMultiple(ctx, opts)
+
+}
+
+func MyDatasetsMultiple(ctx *context.APIContext) {
+
+	opts := &models.SearchDatasetOptions{
+		UploadAttachmentByMe: true,
+		NeedAttachment: true,
+		CloudBrainType: ctx.QueryInt("type"),
+	}
+	datasetMultiple(ctx, opts)
+
+}
+func datasetMultiple(ctx *context.APIContext, opts *models.SearchDatasetOptions) {
+	page := ctx.QueryInt("page")
+	if page < 1 {
+		page = 1
+	}
+	pageSize := ctx.QueryInt("pageSize")
+	if pageSize < 1 {
+		pageSize = setting.UI.DatasetPagingNum
+	}
+
+	keyword := strings.Trim(ctx.Query("q"), " ")
+	opts.Keyword = keyword
+	if opts.SearchOrderBy.String() == "" {
+		opts.SearchOrderBy = models.SearchOrderByRecentUpdated
+	}
+
+	opts.RecommendOnly = ctx.QueryBool("recommend")
+	opts.ListOptions = models.ListOptions{
+		Page: page,
+		PageSize: pageSize,
+	}
+	opts.JustNeedZipFile = true
+	opts.User = ctx.User
+
+	datasets, count, err := models.SearchDataset(opts)
+
+	if err != nil {
+		log.Error("SearchDataset failed: %v", err)
+		ctx.JSON(200, map[string]interface{}{
+			"code": 1,
+			"message": err.Error(),
+			"data": []*models.Dataset{},
+			"count": 0,
+		})
+		return
+	}
+	ctx.JSON(200, map[string]interface{}{
+		"code": 0,
+		"message": "",
+		"data": datasets,
+		"count": count,
+	})
+}
+
+func getSearchOrderByInValues(datasetIds []int64) models.SearchOrderBy {
+	if len(datasetIds) == 0 {
+		return ""
+	}
+	searchOrderBy := "CASE id "
+	for i, id := range datasetIds {
+		searchOrderBy += fmt.Sprintf(" WHEN %d THEN %d", id, i+1)
+	}
+	searchOrderBy += " ELSE 0 END"
+	return models.SearchOrderBy(searchOrderBy)
+}
diff --git a/routers/api/v1/repo/images.go b/routers/api/v1/repo/images.go
new file mode 100644
index 000000000..f0cb62980
--- /dev/null
+++ b/routers/api/v1/repo/images.go
@@ -0,0 +1,141 @@
+package repo
+
+import (
+	"encoding/json"
+	"net/http"
+	"strconv"
+
+	"code.gitea.io/gitea/models"
+	"code.gitea.io/gitea/modules/context"
+	"code.gitea.io/gitea/modules/grampus"
+	"code.gitea.io/gitea/modules/log"
+	"code.gitea.io/gitea/modules/modelarts"
+	"code.gitea.io/gitea/modules/setting"
+)
+
+type NPUImageINFO struct {
+	ID string `json:"id"`
+	Value string `json:"value"`
+}
+
+func GetPublicImages(ctx *context.APIContext) {
+	uid := getUID(ctx)
+	opts := models.SearchImageOptions{
+		IncludePublicOnly: true,
+		UID: uid,
+		Keyword: ctx.Query("q"),
+		Topics: ctx.Query("topic"),
+		IncludeOfficialOnly: ctx.QueryBool("recommend"),
+		SearchOrderBy: "type desc, num_stars desc,id desc",
+		Status: models.IMAGE_STATUS_SUCCESS,
+		CloudbrainType: ctx.QueryInt("cloudbrainType"),
+	}
+
+	getImages(ctx, &opts)
+
+}
+
+func GetCustomImages(ctx *context.APIContext) {
+	uid := getUID(ctx)
+	opts := models.SearchImageOptions{
+		UID: uid,
+		IncludeOwnerOnly: true,
+		Keyword: ctx.Query("q"),
+		Topics: ctx.Query("topic"),
+		Status: -1,
+		SearchOrderBy: "id desc",
+	}
+	getImages(ctx, &opts)
+
+}
+func GetStarImages(ctx *context.APIContext) {
+
+	uid := getUID(ctx)
+	opts := models.SearchImageOptions{
+		UID: uid,
+		IncludeStarByMe: true,
+		Keyword: ctx.Query("q"),
+		Topics: ctx.Query("topic"),
+		Status: models.IMAGE_STATUS_SUCCESS,
+		SearchOrderBy: "id desc",
+	}
+	getImages(ctx, &opts)
+
+}
+
+func GetNpuImages(ctx *context.APIContext) {
+	cloudbrainType := ctx.QueryInt("type")
+	if cloudbrainType == 0 { //modelarts
+		getModelArtsImages(ctx)
+	} else { //c2net
+		getC2netNpuImages(ctx)
+	}
+}
+
+func getModelArtsImages(ctx *context.APIContext) {
+	
var versionInfos modelarts.VersionInfo + _ = json.Unmarshal([]byte(setting.EngineVersions), &versionInfos) + var npuImageInfos []NPUImageINFO + for _, info := range versionInfos.Version { + npuImageInfos = append(npuImageInfos, NPUImageINFO{ + ID: strconv.Itoa(info.ID), + Value: info.Value, + }) + } + ctx.JSON(http.StatusOK, npuImageInfos) + +} + +func getC2netNpuImages(ctx *context.APIContext) { + images, err := grampus.GetImages(grampus.ProcessorTypeNPU) + var npuImageInfos []NPUImageINFO + if err != nil { + log.Error("GetImages failed:", err.Error()) + ctx.JSON(http.StatusOK, []NPUImageINFO{}) + } else { + for _, info := range images.Infos { + npuImageInfos = append(npuImageInfos, NPUImageINFO{ + ID: info.ID, + Value: info.Name, + }) + } + ctx.JSON(http.StatusOK, npuImageInfos) + } +} +func getImages(ctx *context.APIContext, opts *models.SearchImageOptions) { + page := ctx.QueryInt("page") + if page <= 0 { + page = 1 + } + + pageSize := ctx.QueryInt("pageSize") + if pageSize <= 0 { + pageSize = 15 + } + opts.ListOptions = models.ListOptions{ + Page: page, + PageSize: pageSize, + } + imageList, total, err := models.SearchImage(opts) + if err != nil { + log.Error("Can not get images:%v", err) + ctx.JSON(http.StatusOK, models.ImagesPageResult{ + Count: 0, + Images: []*models.Image{}, + }) + } else { + ctx.JSON(http.StatusOK, models.ImagesPageResult{ + Count: total, + Images: imageList, + }) + } +} + +func getUID(ctx *context.APIContext) int64 { + var uid int64 = -1 + if ctx.IsSigned { + uid = ctx.User.ID + } + return uid +} diff --git a/services/cloudbrain/cloudbrainTask/inference.go b/services/cloudbrain/cloudbrainTask/inference.go new file mode 100644 index 000000000..ba5ba7cf1 --- /dev/null +++ b/services/cloudbrain/cloudbrainTask/inference.go @@ -0,0 +1,631 @@ +package cloudbrainTask + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + "path" + "strconv" + "strings" + "unicode/utf8" + + "code.gitea.io/gitea/modules/modelarts" + + "code.gitea.io/gitea/modules/git" + + api "code.gitea.io/gitea/modules/structs" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/cloudbrain" + "code.gitea.io/gitea/modules/context" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/redis/redis_key" + "code.gitea.io/gitea/modules/redis/redis_lock" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/storage" + "code.gitea.io/gitea/modules/util" + "code.gitea.io/gitea/services/cloudbrain/resource" + "code.gitea.io/gitea/services/reward/point/account" +) + +const CLONE_FILE_PREFIX = "file:///" + +func CloudBrainInferenceJobCreate(ctx *context.Context, option api.CreateTrainJobOption) { + + displayJobName := option.DisplayJobName + jobName := util.ConvertDisplayJobNameToJobName(displayJobName) + image := strings.TrimSpace(option.Image) + uuid := option.Attachment + jobType := string(models.JobTypeInference) + codePath := setting.JobPath + jobName + cloudbrain.CodeMountPath + branchName := option.BranchName + bootFile := strings.TrimSpace(option.BootFile) + labelName := option.LabelName + repo := ctx.Repo.Repository + + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName)) + defer lock.UnLock() + isOk, err := lock.Lock(models.CloudbrainKeyDuration) + if !isOk { + log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + + ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("repo.cloudbrain_samejob_err"))) + return + } + + ckptUrl := 
setting.Attachment.Minio.RealPath + option.PreTrainModelUrl + option.CkptName + log.Info("ckpt url:" + ckptUrl) + command, err := getInferenceJobCommand(option) + if err != nil { + log.Error("getTrainJobCommand failed: %v", err) + ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error())) + return + } + + tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName) + if err == nil { + if len(tasks) != 0 { + log.Error("the job name did already exist", ctx.Data["MsgID"]) + ctx.JSON(http.StatusOK, models.BaseErrorMessage("the job name did already exist")) + return + } + } else { + if !models.IsErrJobNotExist(err) { + log.Error("system error, %v", err, ctx.Data["MsgID"]) + + ctx.JSON(http.StatusOK, models.BaseErrorMessage("system error")) + return + } + } + + if !jobNamePattern.MatchString(displayJobName) { + + ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("repo.cloudbrain_jobname_err"))) + return + } + + bootFileExist, err := ctx.Repo.FileExists(bootFile, branchName) + if err != nil || !bootFileExist { + log.Error("Get bootfile error:", err, ctx.Data["MsgID"]) + ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("repo.cloudbrain_bootfile_err"))) + return + } + + count, err := models.GetCloudbrainCountByUserID(ctx.User.ID, jobType) + if err != nil { + log.Error("GetCloudbrainCountByUserID failed:%v", err, ctx.Data["MsgID"]) + ctx.JSON(http.StatusOK, models.BaseErrorMessage("system error")) + return + } else { + if count >= 1 { + log.Error("the user already has running or waiting task", ctx.Data["MsgID"]) + ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("repo.cloudbrain.morethanonejob"))) + return + } + } + + if branchName == "" { + branchName = cloudbrain.DefaultBranchName + } + errStr := loadCodeAndMakeModelPath(repo, codePath, branchName, jobName, cloudbrain.ResultPath) + if errStr != "" { + + ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr(errStr))) + return + } + + commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName) + + datasetInfos, datasetNames, err := models.GetDatasetInfo(uuid) + if err != nil { + log.Error("GetDatasetInfo failed: %v", err, ctx.Data["MsgID"]) + + ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.error.dataset_select"))) + return + } + spec, err := resource.GetAndCheckSpec(ctx.User.ID, option.SpecId, models.FindSpecsOptions{ + JobType: models.JobTypeInference, + ComputeResource: models.GPU, + Cluster: models.OpenICluster, + AiCenterCode: models.AICenterOfCloudBrainOne}) + if err != nil || spec == nil { + + ctx.JSON(http.StatusOK, models.BaseErrorMessage("Resource specification is not available")) + return + } + if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) { + log.Error("point balance is not enough,userId=%d specId=%d", ctx.User.ID, spec.ID) + ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("points.insufficient_points_balance"))) + return + } + req := cloudbrain.GenerateCloudBrainTaskReq{ + Ctx: ctx, + DisplayJobName: displayJobName, + JobName: jobName, + Image: image, + Command: command, + Uuids: uuid, + DatasetNames: datasetNames, + DatasetInfos: datasetInfos, + CodePath: storage.GetMinioPath(jobName, cloudbrain.CodeMountPath+"/"), + ModelPath: setting.Attachment.Minio.RealPath + option.PreTrainModelUrl, + BenchmarkPath: storage.GetMinioPath(jobName, cloudbrain.BenchMarkMountPath+"/"), + Snn4ImageNetPath: storage.GetMinioPath(jobName, cloudbrain.Snn4imagenetMountPath+"/"), + BrainScorePath: storage.GetMinioPath(jobName, cloudbrain.BrainScoreMountPath+"/"), + 
JobType: jobType, + Description: option.Description, + BranchName: branchName, + BootFile: option.BootFile, + Params: option.Params, + CommitID: commitID, + ResultPath: storage.GetMinioPath(jobName, cloudbrain.ResultPath+"/"), + ModelName: option.ModelName, + ModelVersion: option.ModelVersion, + CkptName: option.CkptName, + TrainUrl: option.PreTrainModelUrl, + LabelName: labelName, + Spec: spec, + } + + jobId, err := cloudbrain.GenerateTask(req) + if err != nil { + + ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error())) + return + } + ctx.JSON(http.StatusOK, models.BaseMessage{Code: 0, Message: jobId}) +} + +func ModelArtsInferenceJobCreate(ctx *context.Context, option api.CreateTrainJobOption) { + ctx.Data["PageIsTrainJob"] = true + VersionOutputPath := modelarts.GetOutputPathByCount(modelarts.TotalVersionCount) + displayJobName := option.DisplayJobName + jobName := util.ConvertDisplayJobNameToJobName(displayJobName) + uuid := option.Attachment + description := option.Description + workServerNumber := option.WorkServerNumber + engineID, _ := strconv.Atoi(option.ImageID) + bootFile := strings.TrimSpace(option.BootFile) + params := option.Params + repo := ctx.Repo.Repository + codeLocalPath := setting.JobPath + jobName + modelarts.CodePath + codeObsPath := "/" + setting.Bucket + modelarts.JobPath + jobName + modelarts.CodePath + resultObsPath := "/" + setting.Bucket + modelarts.JobPath + jobName + modelarts.ResultPath + VersionOutputPath + "/" + logObsPath := "/" + setting.Bucket + modelarts.JobPath + jobName + modelarts.LogPath + VersionOutputPath + "/" + //dataPath := "/" + setting.Bucket + "/" + setting.BasePath + path.Join(uuid[0:1], uuid[1:2]) + "/" + uuid + uuid + "/" + branchName := option.BranchName + EngineName := option.Image + LabelName := option.LabelName + isLatestVersion := modelarts.IsLatestVersion + VersionCount := modelarts.VersionCountOne + trainUrl := option.PreTrainModelUrl + modelName := option.ModelName + modelVersion := option.ModelVersion + ckptName := option.CkptName + ckptUrl := "/" + option.PreTrainModelUrl + option.CkptName + + errStr := checkInferenceJobMultiNode(ctx.User.ID, option.WorkServerNumber) + if errStr != "" { + + ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr(errStr))) + return + } + + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeInference), displayJobName)) + isOk, err := lock.Lock(models.CloudbrainKeyDuration) + if !isOk { + log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + + ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("repo.cloudbrain_samejob_err"))) + return + } + defer lock.UnLock() + + count, err := models.GetCloudbrainInferenceJobCountByUserID(ctx.User.ID) + if err != nil { + log.Error("GetCloudbrainInferenceJobCountByUserID failed:%v", err, ctx.Data["MsgID"]) + + ctx.JSON(http.StatusOK, models.BaseErrorMessage("system error")) + return + } else { + if count >= 1 { + log.Error("the user already has running or waiting inference task", ctx.Data["MsgID"]) + + ctx.JSON(http.StatusOK, models.BaseErrorMessage("you have already a running or waiting inference task, can not create more")) + return + } + } + + if err := paramCheckCreateInferenceJob(option); err != nil { + log.Error("paramCheckCreateInferenceJob failed:(%v)", err) + + ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error())) + return + } + + bootFileExist, err := ctx.Repo.FileExists(bootFile, branchName) + if err != nil || !bootFileExist { + log.Error("Get bootfile error:", 
err) + ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("repo.cloudbrain_bootfile_err"))) + return + } + + //Determine whether the task name of the task in the project is duplicated + tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeInference), displayJobName) + if err == nil { + if len(tasks) != 0 { + log.Error("the job name did already exist", ctx.Data["MsgID"]) + + ctx.JSON(http.StatusOK, models.BaseErrorMessage("the job name did already exist")) + return + } + } else { + if !models.IsErrJobNotExist(err) { + log.Error("system error, %v", err, ctx.Data["MsgID"]) + + ctx.JSON(http.StatusOK, models.BaseErrorMessage("system error")) + return + } + } + + spec, err := resource.GetAndCheckSpec(ctx.User.ID, option.SpecId, models.FindSpecsOptions{ + JobType: models.JobTypeInference, + ComputeResource: models.NPU, + Cluster: models.OpenICluster, + AiCenterCode: models.AICenterOfCloudBrainTwo}) + if err != nil || spec == nil { + + ctx.JSON(http.StatusOK, models.BaseErrorMessage("Resource specification not available")) + return + } + if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) { + log.Error("point balance is not enough,userId=%d specId=%d ", ctx.User.ID, spec.ID) + ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("points.insufficient_points_balance"))) + return + } + + //todo: del the codeLocalPath + _, err = ioutil.ReadDir(codeLocalPath) + if err == nil { + os.RemoveAll(codeLocalPath) + } + + gitRepo, _ := git.OpenRepository(repo.RepoPath()) + commitID, _ := gitRepo.GetBranchCommitID(branchName) + + if err := downloadCode(repo, codeLocalPath, branchName); err != nil { + log.Error("Create task failed, server timed out: %s (%v)", repo.FullName(), err) + + ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed"))) + return + } + + //todo: upload code (send to file_server todo this work?) 
+ if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.ResultPath + VersionOutputPath + "/"); err != nil { + log.Error("Failed to obsMkdir_result: %s (%v)", repo.FullName(), err) + + ctx.JSON(http.StatusOK, models.BaseErrorMessage("Failed to obsMkdir_result")) + return + } + + if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.LogPath + VersionOutputPath + "/"); err != nil { + log.Error("Failed to obsMkdir_log: %s (%v)", repo.FullName(), err) + ctx.JSON(http.StatusOK, models.BaseErrorMessage("Failed to obsMkdir_log")) + return + } + + if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil { + log.Error("Failed to uploadCodeToObs: %s (%v)", repo.FullName(), err) + ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed"))) + return + } + + var parameters models.Parameters + param := make([]models.Parameter, 0) + param = append(param, models.Parameter{ + Label: modelarts.ResultUrl, + Value: "s3:/" + resultObsPath, + }, models.Parameter{ + Label: modelarts.CkptUrl, + Value: "s3:/" + ckptUrl, + }) + + datasUrlList, dataUrl, datasetNames, isMultiDataset, err := getDatasUrlListByUUIDS(uuid) + if err != nil { + + ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error())) + return + } + dataPath := dataUrl + jsondatas, err := json.Marshal(datasUrlList) + if err != nil { + log.Error("Failed to Marshal: %v", err) + + ctx.JSON(http.StatusOK, models.BaseErrorMessage("json error:"+err.Error())) + return + } + if isMultiDataset { + param = append(param, models.Parameter{ + Label: modelarts.MultiDataUrl, + Value: string(jsondatas), + }) + } + + existDeviceTarget := false + if len(params) != 0 { + err := json.Unmarshal([]byte(params), ¶meters) + if err != nil { + log.Error("Failed to Unmarshal params: %s (%v)", params, err) + + ctx.JSON(http.StatusOK, models.BaseErrorMessage("运行参数错误")) + return + } + + for _, parameter := range parameters.Parameter { + if parameter.Label == modelarts.DeviceTarget { + existDeviceTarget = true + } + if parameter.Label != modelarts.TrainUrl && parameter.Label != modelarts.DataUrl { + param = append(param, models.Parameter{ + Label: parameter.Label, + Value: parameter.Value, + }) + } + } + } + if !existDeviceTarget { + param = append(param, models.Parameter{ + Label: modelarts.DeviceTarget, + Value: modelarts.Ascend, + }) + } + + req := &modelarts.GenerateInferenceJobReq{ + JobName: jobName, + DisplayJobName: displayJobName, + DataUrl: dataPath, + Description: description, + CodeObsPath: codeObsPath, + BootFileUrl: codeObsPath + bootFile, + BootFile: bootFile, + TrainUrl: trainUrl, + WorkServerNumber: workServerNumber, + EngineID: int64(engineID), + LogUrl: logObsPath, + PoolID: getPoolId(), + Uuid: uuid, + Parameters: param, //modelarts train parameters + CommitID: commitID, + BranchName: branchName, + Params: option.Params, + EngineName: EngineName, + LabelName: LabelName, + IsLatestVersion: isLatestVersion, + VersionCount: VersionCount, + TotalVersionCount: modelarts.TotalVersionCount, + ModelName: modelName, + ModelVersion: modelVersion, + CkptName: ckptName, + ResultUrl: resultObsPath, + Spec: spec, + DatasetName: datasetNames, + JobType: string(models.JobTypeInference), + } + + jobId, err := modelarts.GenerateInferenceJob(ctx, req) + if err != nil { + log.Error("GenerateTrainJob failed:%v", err.Error()) + + ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error())) + return + } + ctx.JSON(http.StatusOK, models.BaseMessage{Code: 0, Message: jobId}) +} + +func getDatasUrlListByUUIDS(uuidStr string) 
([]models.Datasurl, string, string, bool, error) {
+	var isMultiDataset bool
+	var dataUrl string
+	var datasetNames string
+	var datasUrlList []models.Datasurl
+	uuids := strings.Split(uuidStr, ";")
+	if len(uuids) > setting.MaxDatasetNum {
+		log.Error("the dataset count(%d) exceed the limit", len(uuids))
+		return datasUrlList, dataUrl, datasetNames, isMultiDataset, errors.New("the dataset count exceed the limit")
+	}
+
+	datasetInfos := make(map[string]models.DatasetInfo)
+	attachs, err := models.GetAttachmentsByUUIDs(uuids)
+	if err != nil || len(attachs) != len(uuids) {
+		log.Error("GetAttachmentsByUUIDs failed: %v", err)
+		return datasUrlList, dataUrl, datasetNames, isMultiDataset, errors.New("GetAttachmentsByUUIDs failed")
+	}
+
+	for i, tmpUuid := range uuids {
+		var attach *models.Attachment
+		for _, tmpAttach := range attachs {
+			if tmpAttach.UUID == tmpUuid {
+				attach = tmpAttach
+				break
+			}
+		}
+		if attach == nil {
+			log.Error("GetAttachmentsByUUIDs failed: %v", err)
+			return datasUrlList, dataUrl, datasetNames, isMultiDataset, errors.New("GetAttachmentsByUUIDs failed")
+		}
+		fileName := strings.TrimSuffix(strings.TrimSuffix(strings.TrimSuffix(attach.Name, ".zip"), ".tar.gz"), ".tgz")
+		for _, datasetInfo := range datasetInfos {
+			if fileName == datasetInfo.Name {
+				log.Error("the dataset name is same: %v", attach.Name)
+				return datasUrlList, dataUrl, datasetNames, isMultiDataset, errors.New("the dataset name is same")
+			}
+		}
+		if len(attachs) <= 1 {
+			dataUrl = "/" + setting.Bucket + "/" + setting.BasePath + path.Join(attach.UUID[0:1], attach.UUID[1:2]) + "/" + attach.UUID + attach.UUID + "/"
+			isMultiDataset = false
+		} else {
+			dataUrl = "/" + setting.Bucket + "/" + setting.BasePath + path.Join(attachs[0].UUID[0:1], attachs[0].UUID[1:2]) + "/" + attachs[0].UUID + attachs[0].UUID + "/"
+			datasetUrl := "s3://" + setting.Bucket + "/" + setting.BasePath + path.Join(attach.UUID[0:1], attach.UUID[1:2]) + "/" + attach.UUID + attach.UUID + "/"
+			datasUrlList = append(datasUrlList, models.Datasurl{
+				DatasetUrl: datasetUrl,
+				DatasetName: fileName,
+			})
+			isMultiDataset = true
+		}
+
+		if i == 0 {
+			datasetNames = attach.Name
+		} else {
+			datasetNames += ";" + attach.Name
+		}
+	}
+
+	return datasUrlList, dataUrl, datasetNames, isMultiDataset, nil
+}
+func checkInferenceJobMultiNode(userId int64, serverNum int) string {
+	if serverNum == 1 {
+		return ""
+	}
+
+	return "repo.modelarts.no_node_right"
+
+}
+
+func paramCheckCreateInferenceJob(option api.CreateTrainJobOption) error {
+	if !strings.HasSuffix(strings.TrimSpace(option.BootFile), ".py") {
+		log.Error("the boot file(%s) must be a python file", strings.TrimSpace(option.BootFile))
+		return errors.New("启动文件必须是python文件")
+	}
+
+	if option.ModelName == "" {
+		log.Error("the ModelName(%s) must not be nil", option.ModelName)
+		return errors.New("模型名称不能为空")
+	}
+	if option.ModelVersion == "" {
+		log.Error("the ModelVersion(%s) must not be nil", option.ModelVersion)
+		return errors.New("模型版本不能为空")
+	}
+	if option.CkptName == "" {
+		log.Error("the CkptName(%s) must not be nil", option.CkptName)
+		return errors.New("权重文件不能为空")
+	}
+	if option.BranchName == "" {
+		log.Error("the Branch(%s) must not be nil", option.BranchName)
+		return errors.New("分支名不能为空")
+	}
+
+	if utf8.RuneCountInString(option.Description) > 255 {
+		log.Error("the Description length(%d) must not be more than 255", utf8.RuneCountInString(option.Description))
+		return errors.New("描述字符不能超过255个字符")
+	}
+
+	return nil
+}
+
+func loadCodeAndMakeModelPath(repo *models.Repository, codePath string, 
branchName string, jobName string, resultPath string) string {
+	err := downloadCode(repo, codePath, branchName)
+	if err != nil {
+		return "cloudbrain.load_code_failed"
+	}
+
+	err = uploadCodeToMinio(codePath+"/", jobName, cloudbrain.CodeMountPath+"/")
+	if err != nil {
+		return "cloudbrain.load_code_failed"
+	}
+
+	modelPath := setting.JobPath + jobName + resultPath + "/"
+	err = mkModelPath(modelPath)
+	if err != nil {
+		return "cloudbrain.load_code_failed"
+	}
+	err = uploadCodeToMinio(modelPath, jobName, resultPath+"/")
+	if err != nil {
+		return "cloudbrain.load_code_failed"
+	}
+
+	return ""
+}
+
+func downloadCode(repo *models.Repository, codePath, branchName string) error {
+	//add "file:///" prefix to make the depth valid
+	if err := git.Clone(CLONE_FILE_PREFIX+repo.RepoPath(), codePath, git.CloneRepoOptions{Branch: branchName, Depth: 1}); err != nil {
+		log.Error("Failed to clone repository: %s (%v)", repo.FullName(), err)
+		return err
+	}
+
+	configFile, err := os.OpenFile(codePath+"/.git/config", os.O_RDWR, 0666)
+	if err != nil {
+		log.Error("open file(%s) failed:%v", codePath+"/.git/config", err)
+		return err
+	}
+
+	defer configFile.Close()
+
+	pos := int64(0)
+	reader := bufio.NewReader(configFile)
+	for {
+		line, err := reader.ReadString('\n')
+		if err != nil {
+			if err == io.EOF {
+				log.Error("not find the remote-url")
+				return nil
+			} else {
+				log.Error("read error: %v", err)
+				return err
+			}
+		}
+
+		if strings.Contains(line, "url") && strings.Contains(line, ".git") {
+			originUrl := "\turl = " + repo.CloneLink().HTTPS + "\n"
+			if len(line) > len(originUrl) {
+				originUrl += strings.Repeat(" ", len(line)-len(originUrl))
+			}
+			bytes := []byte(originUrl)
+			_, err := configFile.WriteAt(bytes, pos)
+			if err != nil {
+				log.Error("WriteAt failed:%v", err)
+				return err
+			}
+			break
+		}
+
+		pos += int64(len(line))
+	}
+
+	return nil
+}
+
+func getInferenceJobCommand(option api.CreateTrainJobOption) (string, error) {
+	var command string
+	bootFile := strings.TrimSpace(option.BootFile)
+	params := option.Params
+
+	if !strings.HasSuffix(bootFile, ".py") {
+		log.Error("bootFile(%s) format error", bootFile)
+		return command, errors.New("bootFile format error")
+	}
+
+	var parameters models.Parameters
+	var param string
+	if len(params) != 0 {
+		err := json.Unmarshal([]byte(params), &parameters)
+		if err != nil {
+			log.Error("Failed to Unmarshal params: %s (%v)", params, err)
+			return command, err
+		}
+
+		for _, parameter := range parameters.Parameter {
+			param += " --" + parameter.Label + "=" + parameter.Value
+		}
+	}
+
+	param += " --modelname" + "=" + option.CkptName
+
+	command += "python /code/" + bootFile + param + " > " + cloudbrain.ResultPath + "/" + option.DisplayJobName + "-" + cloudbrain.LogFile
+
+	return command, nil
+}
diff --git a/services/cloudbrain/cloudbrainTask/train.go b/services/cloudbrain/cloudbrainTask/train.go
new file mode 100644
index 000000000..28d5038bf
--- /dev/null
+++ b/services/cloudbrain/cloudbrainTask/train.go
@@ -0,0 +1,678 @@
+package cloudbrainTask
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"os"
+	"path"
+	"regexp"
+	"strings"
+
+	"code.gitea.io/gitea/modules/obs"
+
+	"code.gitea.io/gitea/modules/git"
+	"code.gitea.io/gitea/modules/storage"
+	"github.com/unknwon/com"
+
+	"code.gitea.io/gitea/models"
+	"code.gitea.io/gitea/modules/cloudbrain"
+	"code.gitea.io/gitea/modules/context"
+	"code.gitea.io/gitea/modules/grampus"
+	"code.gitea.io/gitea/modules/log"
+	"code.gitea.io/gitea/modules/modelarts"
+	
"code.gitea.io/gitea/modules/redis/redis_key" + "code.gitea.io/gitea/modules/redis/redis_lock" + "code.gitea.io/gitea/modules/setting" + api "code.gitea.io/gitea/modules/structs" + "code.gitea.io/gitea/modules/util" + "code.gitea.io/gitea/services/cloudbrain/resource" + "code.gitea.io/gitea/services/reward/point/account" +) + +var jobNamePattern = regexp.MustCompile(`^[a-z0-9][a-z0-9-_]{1,34}[a-z0-9-]$`) + +func GrampusTrainJobGpuCreate(ctx *context.Context, option api.CreateTrainJobOption) { + + displayJobName := option.DisplayJobName + jobName := util.ConvertDisplayJobNameToJobName(displayJobName) + uuid := option.Attachment + description := option.Description + bootFile := strings.TrimSpace(option.BootFile) + params := option.Params + repo := ctx.Repo.Repository + codeLocalPath := setting.JobPath + jobName + cloudbrain.CodeMountPath + "/" + codeMinioPath := setting.CBCodePathPrefix + jobName + cloudbrain.CodeMountPath + "/" + branchName := option.BranchName + image := strings.TrimSpace(option.Image) + + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)) + defer lock.UnLock() + spec, datasetInfos, datasetNames, err := checkParameters(ctx, option, lock, repo) + if err != nil { + ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error())) + return + } + + //prepare code and out path + _, err = ioutil.ReadDir(codeLocalPath) + if err == nil { + os.RemoveAll(codeLocalPath) + } + + if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil { + log.Error("downloadZipCode failed, server timed out: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) + ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed"))) + + } + + //todo: upload code (send to file_server todo this work?) 
+ //upload code + if err := uploadCodeToMinio(codeLocalPath+"/", jobName, cloudbrain.CodeMountPath+"/"); err != nil { + log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) + ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed"))) + return + } + + modelPath := setting.JobPath + jobName + cloudbrain.ModelMountPath + "/" + if err := mkModelPath(modelPath); err != nil { + log.Error("Failed to mkModelPath: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) + ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed"))) + return + } + + //init model readme + if err := uploadCodeToMinio(modelPath, jobName, cloudbrain.ModelMountPath+"/"); err != nil { + log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) + ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed"))) + return + } + + var datasetRemotePath, allFileName string + for _, datasetInfo := range datasetInfos { + if datasetRemotePath == "" { + datasetRemotePath = datasetInfo.DataLocalPath + allFileName = datasetInfo.FullName + } else { + datasetRemotePath = datasetRemotePath + ";" + datasetInfo.DataLocalPath + allFileName = allFileName + ";" + datasetInfo.FullName + } + + } + + //prepare command + preTrainModelPath := getPreTrainModelPath(option.PreTrainModelUrl, option.CkptName) + + command, err := generateCommand(repo.Name, grampus.ProcessorTypeGPU, codeMinioPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CBCodePathPrefix+jobName+cloudbrain.ModelMountPath+"/", allFileName, preTrainModelPath, option.CkptName) + if err != nil { + log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"]) + ctx.JSON(http.StatusOK, models.BaseErrorMessage("Create task failed, internal error")) + return + } + + commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName) + + req := &grampus.GenerateTrainJobReq{ + JobName: jobName, + DisplayJobName: displayJobName, + ComputeResource: models.GPUResource, + ProcessType: grampus.ProcessorTypeGPU, + Command: command, + ImageUrl: image, + Description: description, + BootFile: bootFile, + Uuid: uuid, + CommitID: commitID, + BranchName: branchName, + Params: option.Params, + EngineName: image, + DatasetNames: datasetNames, + DatasetInfos: datasetInfos, + + IsLatestVersion: modelarts.IsLatestVersion, + VersionCount: modelarts.VersionCountOne, + WorkServerNumber: 1, + Spec: spec, + } + + if option.ModelName != "" { //使用预训练模型训练 + req.ModelName = option.ModelName + req.LabelName = option.LabelName + req.CkptName = option.CkptName + req.ModelVersion = option.ModelVersion + req.PreTrainModelUrl = option.PreTrainModelUrl + + } + + jobId, err := grampus.GenerateTrainJob(ctx, req) + if err != nil { + log.Error("GenerateTrainJob failed:%v", err.Error(), ctx.Data["MsgID"]) + ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error())) + return + } + ctx.JSON(http.StatusOK, models.BaseMessage{Code: 0, Message: jobId}) +} + +func checkParameters(ctx *context.Context, option api.CreateTrainJobOption, lock *redis_lock.DistributeLock, repo *models.Repository) (*models.Specification, map[string]models.DatasetInfo, string, error) { + isOk, err := lock.Lock(models.CloudbrainKeyDuration) + if !isOk { + log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) + + return nil, nil, "", fmt.Errorf(ctx.Tr("repo.cloudbrain_samejob_err")) + } + + if !jobNamePattern.MatchString(option.DisplayJobName) { + return 
nil, nil, "", fmt.Errorf(ctx.Tr("repo.cloudbrain_jobname_err")) + } + + bootFileExist, err := ctx.Repo.FileExists(option.BootFile, option.BranchName) + if err != nil || !bootFileExist { + log.Error("Get bootfile error:", err, ctx.Data["MsgID"]) + return nil, nil, "", fmt.Errorf(ctx.Tr("repo.cloudbrain_bootfile_err")) + } + + //check count limit + count, err := models.GetGrampusCountByUserID(ctx.User.ID, string(models.JobTypeTrain), models.GPUResource) + if err != nil { + log.Error("GetGrampusCountByUserID failed:%v", err, ctx.Data["MsgID"]) + return nil, nil, "", fmt.Errorf("system error") + } else { + if count >= 1 { + log.Error("the user already has running or waiting task", ctx.Data["MsgID"]) + return nil, nil, "", fmt.Errorf("you have already a running or waiting task, can not create more.") + } + } + + //check param + if err := grampusParamCheckCreateTrainJob(option.BootFile, option.BranchName); err != nil { + log.Error("paramCheckCreateTrainJob failed:(%v)", err, ctx.Data["MsgID"]) + return nil, nil, "", err + } + + //check whether the task name in the project is duplicated + tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeTrain), option.DisplayJobName) + if err == nil { + if len(tasks) != 0 { + log.Error("the job name did already exist", ctx.Data["MsgID"]) + return nil, nil, "", fmt.Errorf("The job name did already exist.") + } + } else { + if !models.IsErrJobNotExist(err) { + log.Error("system error, %v", err, ctx.Data["MsgID"]) + return nil, nil, "", fmt.Errorf("system error") + } + } + + //check specification + computeResource := models.GPU + if option.Type == 3 { + computeResource = models.NPU + } + spec, err := resource.GetAndCheckSpec(ctx.User.ID, option.SpecId, models.FindSpecsOptions{ + JobType: models.JobTypeTrain, + ComputeResource: computeResource, + Cluster: models.C2NetCluster, + }) + if err != nil || spec == nil { + return nil, nil, "", fmt.Errorf("Resource specification is not available.") + } + + if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) { + log.Error("point balance is not enough,userId=%d specId=%d", ctx.User.ID, spec.ID) + return nil, nil, "", fmt.Errorf(ctx.Tr("points.insufficient_points_balance")) + } + + //check dataset + datasetInfos, datasetNames, err := models.GetDatasetInfo(option.Attachment, computeResource) + if err != nil { + log.Error("GetDatasetInfo failed: %v", err, ctx.Data["MsgID"]) + return nil, nil, "", fmt.Errorf(ctx.Tr("cloudbrain.error.dataset_select")) + } + return spec, datasetInfos, datasetNames, err +} + +func GrampusTrainJobNpuCreate(ctx *context.Context, option api.CreateTrainJobOption) { + + displayJobName := option.DisplayJobName + jobName := util.ConvertDisplayJobNameToJobName(displayJobName) + uuid := option.Attachment + description := option.Description + bootFile := strings.TrimSpace(option.BootFile) + params := option.Params + repo := ctx.Repo.Repository + codeLocalPath := setting.JobPath + jobName + modelarts.CodePath + codeObsPath := grampus.JobPath + jobName + modelarts.CodePath + branchName := option.BranchName + isLatestVersion := modelarts.IsLatestVersion + versionCount := modelarts.VersionCountOne + engineName := option.Image + + lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)) + defer lock.UnLock() + spec, datasetInfos, datasetNames, err := checkParameters(ctx, option, lock, repo) + if err != nil { + ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error())) + return + } + + 
+func GrampusTrainJobNpuCreate(ctx *context.Context, option api.CreateTrainJobOption) {
+	displayJobName := option.DisplayJobName
+	jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
+	uuid := option.Attachment
+	description := option.Description
+	bootFile := strings.TrimSpace(option.BootFile)
+	params := option.Params
+	repo := ctx.Repo.Repository
+	codeLocalPath := setting.JobPath + jobName + modelarts.CodePath
+	codeObsPath := grampus.JobPath + jobName + modelarts.CodePath
+	branchName := option.BranchName
+	isLatestVersion := modelarts.IsLatestVersion
+	versionCount := modelarts.VersionCountOne
+	engineName := option.Image
+
+	lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
+	defer lock.UnLock()
+	spec, datasetInfos, datasetNames, err := checkParameters(ctx, option, lock, repo)
+	if err != nil {
+		ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error()))
+		return
+	}
+
+	//prepare code and out path
+	_, err = ioutil.ReadDir(codeLocalPath)
+	if err == nil {
+		os.RemoveAll(codeLocalPath)
+	}
+
+	if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil {
+		log.Error("downloadZipCode failed: %s (%v)", repo.FullName(), err)
+		ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed")))
+		return
+	}
+
+	//todo: upload code (send to file_server to do this work?)
+	if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.OutputPath); err != nil {
+		log.Error("Failed to obsMkdir_output: %s (%v)", repo.FullName(), err)
+		ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed")))
+		return
+	}
+
+	if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil {
+		log.Error("Failed to uploadCodeToObs: %s (%v)", repo.FullName(), err)
+		ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed")))
+		return
+	}
+
+	var datasetRemotePath, allFileName string
+	for _, datasetInfo := range datasetInfos {
+		if datasetRemotePath == "" {
+			datasetRemotePath = datasetInfo.DataLocalPath + "'" + datasetInfo.FullName + "'"
+			allFileName = datasetInfo.FullName
+		} else {
+			datasetRemotePath = datasetRemotePath + ";" + datasetInfo.DataLocalPath + "'" + datasetInfo.FullName + "'"
+			allFileName = allFileName + ";" + datasetInfo.FullName
+		}
+	}
+
+	//prepare command
+	preTrainModelPath := getPreTrainModelPath(option.PreTrainModelUrl, option.CkptName)
+	command, err := generateCommand(repo.Name, grampus.ProcessorTypeNPU, codeObsPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CodePathPrefix+jobName+modelarts.OutputPath, allFileName, preTrainModelPath, option.CkptName)
+	if err != nil {
+		log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"])
+		ctx.JSON(http.StatusOK, models.BaseErrorMessage("Create task failed, internal error"))
+		return
+	}
+
+	commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName)
+
+	req := &grampus.GenerateTrainJobReq{
+		JobName:           jobName,
+		DisplayJobName:    displayJobName,
+		ComputeResource:   models.NPUResource,
+		ProcessType:       grampus.ProcessorTypeNPU,
+		Command:           command,
+		ImageId:           option.ImageID,
+		Description:       description,
+		CodeObsPath:       codeObsPath,
+		BootFileUrl:       codeObsPath + bootFile,
+		BootFile:          bootFile,
+		WorkServerNumber:  option.WorkServerNumber,
+		Uuid:              uuid,
+		CommitID:          commitID,
+		IsLatestVersion:   isLatestVersion,
+		BranchName:        branchName,
+		Params:            option.Params,
+		EngineName:        engineName,
+		VersionCount:      versionCount,
+		TotalVersionCount: modelarts.TotalVersionCount,
+		DatasetNames:      datasetNames,
+		DatasetInfos:      datasetInfos,
+		Spec:              spec,
+		CodeName:          strings.ToLower(repo.Name),
+	}
+	if option.ModelName != "" { // train with a pre-trained model
+		req.ModelName = option.ModelName
+		req.LabelName = option.LabelName
+		req.CkptName = option.CkptName
+		req.ModelVersion = option.ModelVersion
+		req.PreTrainModelUrl = option.PreTrainModelUrl
+		req.PreTrainModelPath = preTrainModelPath
+	}
+
+	jobId, err := grampus.GenerateTrainJob(ctx, req)
+	if err != nil {
+		log.Error("GenerateTrainJob failed:%v", err.Error())
+		ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error()))
+		return
+	}
+	ctx.JSON(http.StatusOK, models.BaseMessage{Code: 0, Message: jobId})
+}
+
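+// NOTE (editorial sketch): the NPU flow stages everything in OBS rather than
+// MinIO. Assuming setting.CodePathPrefix = "job/" and modelarts.OutputPath =
+// "/output/" (illustrative values, not confirmed by this patch), the object
+// layout produced above for a job named "t123456" would be roughly:
+//   job/t123456/code/...   (written by uploadCodeToObs)
+//   job/t123456/output/    (created by obsMkdir; the trained model is uploaded back here)
+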
%s", input.Key, err.Error()) + return err + } + + return nil +} +func uploadCodeToObs(codePath, jobName, parentDir string) error { + files, err := readDir(codePath) + if err != nil { + log.Error("readDir(%s) failed: %s", codePath, err.Error()) + return err + } + + for _, file := range files { + if file.IsDir() { + input := &obs.PutObjectInput{} + input.Bucket = setting.Bucket + input.Key = parentDir + file.Name() + "/" + _, err = storage.ObsCli.PutObject(input) + if err != nil { + log.Error("PutObject(%s) failed: %s", input.Key, err.Error()) + return err + } + + if err = uploadCodeToObs(codePath+file.Name()+"/", jobName, parentDir+file.Name()+"/"); err != nil { + log.Error("uploadCodeToObs(%s) failed: %s", file.Name(), err.Error()) + return err + } + } else { + input := &obs.PutFileInput{} + input.Bucket = setting.Bucket + input.Key = setting.CodePathPrefix + jobName + "/code/" + parentDir + file.Name() + input.SourceFile = codePath + file.Name() + _, err = storage.ObsCli.PutFile(input) + if err != nil { + log.Error("PutFile(%s) failed: %s", input.SourceFile, err.Error()) + return err + } + } + } + + return nil +} + +func grampusParamCheckCreateTrainJob(bootFile string, branchName string) error { + if !strings.HasSuffix(strings.TrimSpace(bootFile), ".py") { + log.Error("the boot file(%s) must be a python file", bootFile) + return errors.New("启动文件必须是python文件") + } + + if branchName == "" { + log.Error("the branch must not be null!", branchName) + return errors.New("代码分支不能为空!") + } + + return nil +} +func downloadZipCode(ctx *context.Context, codePath, branchName string) error { + archiveType := git.ZIP + archivePath := codePath + + if !com.IsDir(archivePath) { + if err := os.MkdirAll(archivePath, os.ModePerm); err != nil { + log.Error("MkdirAll failed:" + err.Error()) + return err + } + } + + // Get corresponding commit. 
+func downloadZipCode(ctx *context.Context, codePath, branchName string) error {
+	archiveType := git.ZIP
+	archivePath := codePath
+
+	if !com.IsDir(archivePath) {
+		if err := os.MkdirAll(archivePath, os.ModePerm); err != nil {
+			log.Error("MkdirAll failed:" + err.Error())
+			return err
+		}
+	}
+
+	// Get the corresponding commit; the repository is already open on the
+	// request context, so no extra OpenRepository call (or error check) is needed.
+	var (
+		commit *git.Commit
+		err    error
+	)
+
+	gitRepo := ctx.Repo.GitRepo
+	if gitRepo.IsBranchExist(branchName) {
+		commit, err = gitRepo.GetBranchCommit(branchName)
+		if err != nil {
+			log.Error("GetBranchCommit failed:" + err.Error())
+			return err
+		}
+	} else {
+		log.Error("the branch does not exist: " + branchName)
+		return fmt.Errorf("the branch does not exist")
+	}
+
+	archivePath = path.Join(archivePath, grampus.CodeArchiveName)
+	if !com.IsFile(archivePath) {
+		if err := commit.CreateArchive(archivePath, git.CreateArchiveOpts{
+			Format: archiveType,
+			Prefix: setting.Repository.PrefixArchiveFiles,
+		}); err != nil {
+			log.Error("CreateArchive failed:" + err.Error())
+			return err
+		}
+	}
+
+	return nil
+}
+
+func uploadCodeToMinio(codePath, jobName, parentDir string) error {
+	files, err := readDir(codePath)
+	if err != nil {
+		log.Error("readDir(%s) failed: %s", codePath, err.Error())
+		return err
+	}
+
+	for _, file := range files {
+		if file.IsDir() {
+			if err = uploadCodeToMinio(codePath+file.Name()+"/", jobName, parentDir+file.Name()+"/"); err != nil {
+				log.Error("uploadCodeToMinio(%s) failed: %s", file.Name(), err.Error())
+				return err
+			}
+		} else {
+			destObject := setting.CBCodePathPrefix + jobName + parentDir + file.Name()
+			sourceFile := codePath + file.Name()
+			err = storage.Attachments.UploadObject(destObject, sourceFile)
+			if err != nil {
+				log.Error("UploadObject(%s) failed: %s", file.Name(), err.Error())
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+func readDir(dirname string) ([]os.FileInfo, error) {
+	f, err := os.Open(dirname)
+	if err != nil {
+		return nil, err
+	}
+
+	list, err := f.Readdir(0)
+	f.Close()
+	if err != nil {
+		//todo: can not upload empty folder
+		if err == io.EOF {
+			return nil, nil
+		}
+		return nil, err
+	}
+
+	//sort.Slice(list, func(i, j int) bool { return list[i].Name() < list[j].Name() })
+	return list, nil
+}
+
+func mkModelPath(modelPath string) error {
+	return mkPathAndReadMeFile(modelPath, "You can put the files into this directory and download the files by the web page.")
+}
+
+func mkPathAndReadMeFile(path string, text string) error {
+	err := os.MkdirAll(path, os.ModePerm)
+	if err != nil {
+		log.Error("MkdirAll(%s) failed:%v", path, err)
+		return err
+	}
+
+	fileName := path + "README"
+	f, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
+	if err != nil {
+		log.Error("OpenFile failed: %s", err.Error())
+		return err
+	}
+	defer f.Close()
+
+	_, err = f.WriteString(text)
+	if err != nil {
+		log.Error("WriteString failed: %s", err.Error())
+		return err
+	}
+
+	return nil
+}
+
+func getPreTrainModelPath(pretrainModelDir string, fileName string) string {
+	index := strings.Index(pretrainModelDir, "/")
+	if index > 0 {
+		filterBucket := pretrainModelDir[index+1:]
+		return filterBucket + fileName
+	}
+	return ""
+}
+
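+// NOTE (editorial sketch): getPreTrainModelPath strips the bucket segment
+// before the first "/" and appends the checkpoint file name; the directory is
+// expected to end with "/". Values below are illustrative:
+//   getPreTrainModelPath("grampus/model/user/repo/", "model.ckpt") -> "model/user/repo/model.ckpt"
+//   getPreTrainModelPath("", "model.ckpt")                         -> ""
+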
+func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bootFile, paramSrc, outputRemotePath, datasetName, pretrainModelPath, pretrainModelFileName string) (string, error) {
+	var command string
+
+	workDir := grampus.NpuWorkDir
+	if processorType == grampus.ProcessorTypeGPU {
+		workDir = grampus.GpuWorkDir
+	}
+
+	command += "pwd;cd " + workDir + fmt.Sprintf(grampus.CommandPrepareScript, setting.Grampus.SyncScriptProject, setting.Grampus.SyncScriptProject)
+
+	//download code & dataset
+	if processorType == grampus.ProcessorTypeNPU {
+		//no need to download code & dataset over the internet
+	} else if processorType == grampus.ProcessorTypeGPU {
+		commandDownload := "./downloader_for_minio " + setting.Grampus.Env + " " + codeRemotePath + " " + grampus.CodeArchiveName + " '" + dataRemotePath + "' '" + datasetName + "'"
+		commandDownload = processPretrainModelParameter(pretrainModelPath, pretrainModelFileName, commandDownload)
+		command += commandDownload
+	}
+
+	//unzip code & dataset
+	if processorType == grampus.ProcessorTypeNPU {
+		//no need to process
+	} else if processorType == grampus.ProcessorTypeGPU {
+		unZipDatasetCommand := generateDatasetUnzipCommand(datasetName)
+		commandUnzip := "cd " + workDir + "code;unzip -q master.zip;echo \"start to unzip dataset\";cd " + workDir + "dataset;" + unZipDatasetCommand
+		command += commandUnzip
+	}
+
+	command += "echo \"unzip finished;start to exec code;\";"
+
+	//set export
+	var commandExport string
+	if processorType == grampus.ProcessorTypeNPU {
+		commandExport = "export bucket=" + setting.Bucket + " && export remote_path=" + outputRemotePath + ";"
+	} else if processorType == grampus.ProcessorTypeGPU {
+		commandExport = "export env=" + setting.Grampus.Env + " && export remote_path=" + outputRemotePath + ";"
+	}
+
+	command += commandExport
+
+	//exec code
+	var parameters models.Parameters
+	var paramCode string
+
+	if len(paramSrc) != 0 {
+		err := json.Unmarshal([]byte(paramSrc), &parameters)
+		if err != nil {
+			log.Error("Failed to Unmarshal params: %s (%v)", paramSrc, err)
+			return command, err
+		}
+
+		for _, parameter := range parameters.Parameter {
+			paramCode += " --" + parameter.Label + "=" + parameter.Value
+		}
+	}
+
+	var commandCode string
+	if processorType == grampus.ProcessorTypeNPU {
+		commandCode = "/bin/bash /home/work/run_train_for_openi.sh /home/work/openi.py /tmp/log/train.log" + paramCode + ";"
+	} else if processorType == grampus.ProcessorTypeGPU {
+		if pretrainModelFileName != "" {
+			paramCode += " --ckpt_url=" + workDir + "pretrainmodel/" + pretrainModelFileName
+		}
+		commandCode = "cd " + workDir + "code/" + strings.ToLower(repoName) + ";python " + bootFile + paramCode + ";"
+	}
+
+	command += commandCode
+
+	//get exec result
+	commandGetRes := "result=$?;"
+	command += commandGetRes
+
+	//upload models
+	if processorType == grampus.ProcessorTypeNPU {
+		commandUpload := "cd " + workDir + setting.Grampus.SyncScriptProject + "/;./uploader_for_npu " + setting.Bucket + " " + outputRemotePath + " " + workDir + "output/;"
+		command += commandUpload
+	} else if processorType == grampus.ProcessorTypeGPU {
+		commandUpload := "cd " + workDir + setting.Grampus.SyncScriptProject + "/;./uploader_for_gpu " + setting.Grampus.Env + " " + outputRemotePath + " " + workDir + "output/;"
+		command += commandUpload
+	}
+
+	//check exec result
+	commandCheckRes := "bash -c \"[[ $result -eq 0 ]] && exit 0 || exit -1\""
+	command += commandCheckRes
+
+	return command, nil
+}
+
+func processPretrainModelParameter(pretrainModelPath string, pretrainModelFileName string, commandDownload string) string {
+	commandDownloadTemp := commandDownload
+	if pretrainModelPath != "" {
+		commandDownloadTemp += " '" + pretrainModelPath + "' '" + pretrainModelFileName + "'"
+	}
+	commandDownloadTemp += ";"
+	return commandDownloadTemp
+}
+
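+// NOTE (editorial sketch): for a GPU job with one pre-trained model, the
+// download step assembled by generateCommand plus processPretrainModelParameter
+// expands to something like the following. All concrete values (env name,
+// paths, archive name) are illustrative, not taken from this patch:
+//   ./downloader_for_minio test job/t123456/code/master.zip master.zip \
+//       'attachment/a1/uuid1' 'MNIST.zip' 'model/user/repo/' 'model.ckpt';
+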
".tar.gz") { + unZipDatasetCommand = "tar --strip-components=1 -zxvf '" + datasetName + "';" + } + + } else { //多数据集 + for _, datasetNameTemp := range datasetNameArray { + if strings.HasSuffix(datasetNameTemp, ".tar.gz") { + unZipDatasetCommand = unZipDatasetCommand + "tar -zxvf '" + datasetNameTemp + "';" + } else { + unZipDatasetCommand = unZipDatasetCommand + "unzip -q '" + datasetNameTemp + "' -d './" + strings.TrimSuffix(datasetNameTemp, ".zip") + "';" + } + } + + } + return unZipDatasetCommand +} + +func getPoolId() string { + var resourcePools modelarts.ResourcePool + json.Unmarshal([]byte(setting.ResourcePools), &resourcePools) + + return resourcePools.Info[0].ID +}