
Merge branch 'api' of https://git.openi.org.cn/OpenI/aiforge into api

branch: api
zouap 2 years ago
commit 6b5badb07e
14 changed files with 1755 additions and 279 deletions
1. models/attachment.go (+21, -159)
2. models/base_message.go (+2, -2)
3. models/dataset.go (+20, -20)
4. models/repo.go (+60, -60)
5. modules/auth/modelarts.go (+20, -23)
6. modules/cloudbrain/cloudbrain.go (+6, -6)
7. modules/grampus/grampus.go (+4, -4)
8. modules/modelarts/modelarts.go (+5, -5)
9. routers/api/v1/api.go (+27, -0)
10. routers/api/v1/repo/cloudbrain.go (+26, -0)
11. routers/api/v1/repo/datasets.go (+114, -0)
12. routers/api/v1/repo/images.go (+141, -0)
13. services/cloudbrain/cloudbrainTask/inference.go (+631, -0)
14. services/cloudbrain/cloudbrainTask/train.go (+678, -0)

models/attachment.go (+21, -159)

@@ -33,27 +33,27 @@ const (

// Attachment represents an attachment of an issue/comment/release.
type Attachment struct {
ID int64 `xorm:"pk autoincr"`
UUID string `xorm:"uuid UNIQUE"`
IssueID int64 `xorm:"INDEX"`
DatasetID int64 `xorm:"INDEX DEFAULT 0"`
ReleaseID int64 `xorm:"INDEX"`
UploaderID int64 `xorm:"INDEX DEFAULT 0"` // Notice: will be zero before this column was added
CommentID int64
Name string
Description string `xorm:"TEXT"`
DownloadCount int64 `xorm:"DEFAULT 0"`
UseNumber int64 `xorm:"DEFAULT 0"`
Size int64 `xorm:"DEFAULT 0"`
IsPrivate bool `xorm:"DEFAULT false"`
DecompressState int32 `xorm:"DEFAULT 0"`
Type int `xorm:"DEFAULT 0"`
CreatedUnix timeutil.TimeStamp `xorm:"created"`
FileChunk *FileChunk `xorm:"-"`
CanDel bool `xorm:"-"`
Uploader *User `xorm:"-"`
Md5 string `xorm:"-"`
ID int64 `xorm:"pk autoincr" json:"id"`
UUID string `xorm:"uuid UNIQUE" json:"uuid"`
IssueID int64 `xorm:"INDEX" json:"issueId"`
DatasetID int64 `xorm:"INDEX DEFAULT 0" json:"datasetId"`
ReleaseID int64 `xorm:"INDEX" json:"releaseId"`
UploaderID int64 `xorm:"INDEX DEFAULT 0" json:"uploaderId"` // Notice: will be zero before this column was added
CommentID int64 `json:"commentId"`
Name string `json:"name"`
Description string `xorm:"TEXT" json:"description"`
DownloadCount int64 `xorm:"DEFAULT 0" json:"downloadCount"`
UseNumber int64 `xorm:"DEFAULT 0" json:"useNumber"`
Size int64 `xorm:"DEFAULT 0" json:"size"`
IsPrivate bool `xorm:"DEFAULT false" json:"isPrivate"`
DecompressState int32 `xorm:"DEFAULT 0" json:"decompressState"`
Type int `xorm:"DEFAULT 0" json:"type"`
CreatedUnix timeutil.TimeStamp `xorm:"created" json:"createdUnix"`
FileChunk *FileChunk `xorm:"-" json:"fileChunk"`
CanDel bool `xorm:"-" json:"canDel"`
Uploader *User `xorm:"-" json:"uploader"`
Md5 string `xorm:"-" json:"md5"`
}

type AttachmentUsername struct {
@@ -61,30 +61,6 @@ type AttachmentUsername struct {
Name string
}

type AttachmentInfo struct {
Attachment `xorm:"extends"`
Repo *Repository `xorm:"extends"`
RelAvatarLink string `xorm:"extends"`
UserName string `xorm:"extends"`
Recommend bool `xorm:"-"`
}

type AttachmentsOptions struct {
ListOptions
DatasetIDs []int64
DecompressState int
Type int
UploaderID int64
NeedDatasetIDs bool
NeedIsPrivate bool
IsPrivate bool
JustNeedZipFile bool
NeedRepoInfo bool
Keyword string
RecommendOnly bool
UserId int64
}

func (a *Attachment) AfterUpdate() {
if a.DatasetID > 0 {
datasetIsPublicCount, err := x.Where("dataset_id = ? AND is_private = ?", a.DatasetID, false).Count(new(Attachment))
@@ -493,19 +469,6 @@ func getPrivateAttachments(e Engine, userID int64) ([]*AttachmentUsername, error
return attachments, nil
}

func getAllUserAttachments(e Engine, userID int64) ([]*AttachmentUsername, error) {
attachments := make([]*AttachmentUsername, 0, 10)
if err := e.Table("attachment").Join("LEFT", "`user`", "attachment.uploader_id "+
"= `user`.id").Where("decompress_state= ? and attachment.type = ? and (uploader_id= ? or is_private = ?)", DecompressStateDone, TypeCloudBrainOne, userID, false).Find(&attachments); err != nil {
return nil, err
}
return attachments, nil
}

func GetAllUserAttachments(userID int64) ([]*AttachmentUsername, error) {
return getAllUserAttachments(x, userID)
}

func getModelArtsUserAttachments(e Engine, userID int64) ([]*AttachmentUsername, error) {
attachments := make([]*AttachmentUsername, 0, 10)
if err := e.Table("attachment").Join("LEFT", "`user`", "attachment.uploader_id "+
@@ -601,107 +564,6 @@ func GetAllAttachmentSize() (int64, error) {
return x.SumInt(&Attachment{}, "size")
}

func Attachments(opts *AttachmentsOptions) ([]*AttachmentInfo, int64, error) {
sess := x.NewSession()
defer sess.Close()

var cond = builder.NewCond()
if opts.NeedDatasetIDs {
cond = cond.And(
builder.In("attachment.dataset_id", opts.DatasetIDs),
)
}

if opts.UploaderID > 0 {
cond = cond.And(
builder.Eq{"attachment.uploader_id": opts.UploaderID},
)
}

if (opts.Type) >= 0 {
cond = cond.And(
builder.Eq{"attachment.type": opts.Type},
)
}

if opts.NeedIsPrivate {
cond = cond.And(
builder.Eq{"attachment.is_private": opts.IsPrivate},
)
}
if opts.RecommendOnly {
cond = cond.And(builder.In("attachment.id", builder.Select("attachment.id").
From("attachment").
Join("INNER", "dataset", "attachment.dataset_id = dataset.id and dataset.recommend=true")))
}

if opts.JustNeedZipFile {
var DecompressState []int32
DecompressState = append(DecompressState, DecompressStateDone, DecompressStateIng, DecompressStateFailed)
cond = cond.And(
builder.In("attachment.decompress_state", DecompressState),
)
}

var count int64
var err error
if len(opts.Keyword) == 0 {
count, err = sess.Where(cond).Count(new(Attachment))
} else {
lowerKeyWord := strings.ToLower(opts.Keyword)

cond = cond.And(builder.Or(builder.Like{"LOWER(attachment.name)", lowerKeyWord}, builder.Like{"LOWER(attachment.description)", lowerKeyWord}))
count, err = sess.Table(&Attachment{}).Where(cond).Count(new(AttachmentInfo))

}

if err != nil {
return nil, 0, fmt.Errorf("Count: %v", err)
}

if opts.Page >= 0 && opts.PageSize > 0 {
var start int
if opts.Page == 0 {
start = 0
} else {
start = (opts.Page - 1) * opts.PageSize
}
sess.Limit(opts.PageSize, start)
}

sess.OrderBy("attachment.created_unix DESC")
attachments := make([]*AttachmentInfo, 0, setting.UI.DatasetPagingNum)
if err := sess.Table(&Attachment{}).Where(cond).
Find(&attachments); err != nil {
return nil, 0, fmt.Errorf("Find: %v", err)
}

if opts.NeedRepoInfo {
for _, attachment := range attachments {
dataset, err := GetDatasetByID(attachment.DatasetID)
if err != nil {
return nil, 0, fmt.Errorf("GetDatasetByID failed error: %v", err)
}
attachment.Recommend = dataset.Recommend
repo, err := GetRepositoryByID(dataset.RepoID)
if err == nil {
attachment.Repo = repo
} else {
return nil, 0, fmt.Errorf("GetRepositoryByID failed error: %v", err)
}
user, err := GetUserByID(attachment.UploaderID)
if err == nil {
attachment.RelAvatarLink = user.RelAvatarLink()
attachment.UserName = user.Name
} else {
return nil, 0, fmt.Errorf("GetUserByID failed error: %v", err)
}
}
}

return attachments, count, nil
}

func GetAllDatasetContributorByDatasetId(datasetId int64) ([]*User, error) {
r := make([]*User, 0)
if err := x.Select("distinct(public.user.*)").Table("attachment").Join("LEFT", "user", "public.user.ID = attachment.uploader_id").Where("attachment.dataset_id = ?", datasetId).Find(&r); err != nil {

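The tag change in this file is what the new REST responses rely on. A standalone sketch, not part of the commit, with a trimmed, hypothetical stand-in for models.Attachment using the same tag style: encoding/json now emits camelCase keys instead of the default exported field names.

package main

import (
	"encoding/json"
	"fmt"
)

// attachment is a trimmed, hypothetical stand-in for models.Attachment.
type attachment struct {
	ID            int64  `json:"id"`
	Name          string `json:"name"`
	DownloadCount int64  `json:"downloadCount"`
}

func main() {
	b, _ := json.Marshal(attachment{ID: 7, Name: "train.zip", DownloadCount: 3})
	fmt.Println(string(b)) // {"id":7,"name":"train.zip","downloadCount":3}
}

Without the tags, the same call would print {"ID":7,"Name":"train.zip","DownloadCount":3}.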

models/base_message.go (+2, -2)

@@ -1,8 +1,8 @@
package models

type BaseMessage struct {
Code int
Message string
Code int `json:"code"`
Message string `json:"message"`
}

var BaseOKMessage = BaseMessage{


models/dataset.go (+20, -20)

@@ -21,26 +21,26 @@ const (
)

type Dataset struct {
ID int64 `xorm:"pk autoincr"`
Title string `xorm:"INDEX NOT NULL"`
Status int32 `xorm:"INDEX"` // normal_private: 0, public: 1, is_delete: 2
Category string
Description string `xorm:"TEXT"`
DownloadTimes int64
UseCount int64 `xorm:"DEFAULT 0"`
NumStars int `xorm:"INDEX NOT NULL DEFAULT 0"`
Recommend bool `xorm:"INDEX NOT NULL DEFAULT false"`
License string
Task string
ReleaseID int64 `xorm:"INDEX"`
UserID int64 `xorm:"INDEX"`
RepoID int64 `xorm:"INDEX"`
Repo *Repository `xorm:"-"`
CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"`
UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"`
User *User `xorm:"-"`
Attachments []*Attachment `xorm:"-"`
ID int64 `xorm:"pk autoincr" json:"id"`
Title string `xorm:"INDEX NOT NULL" json:"title"`
Status int32 `xorm:"INDEX" json:"status"` // normal_private: 0, public: 1, is_delete: 2
Category string `json:"category"`
Description string `xorm:"TEXT" json:"description"`
DownloadTimes int64 `json:"downloadTimes"`
UseCount int64 `xorm:"DEFAULT 0" json:"useCount"`
NumStars int `xorm:"INDEX NOT NULL DEFAULT 0" json:"numStars"`
Recommend bool `xorm:"INDEX NOT NULL DEFAULT false" json:"recommend"`
License string `json:"license"`
Task string `json:"task"`
ReleaseID int64 `xorm:"INDEX" json:"releaseId"`
UserID int64 `xorm:"INDEX" json:"userId"`
RepoID int64 `xorm:"INDEX" json:"repoId"`
Repo *Repository `xorm:"-" json:"repo"`
CreatedUnix timeutil.TimeStamp `xorm:"INDEX created" json:"createdUnix"`
UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated" json:"updatedUnix"`
User *User `xorm:"-" json:"user"`
Attachments []*Attachment `xorm:"-" json:"attachments"`
}

type DatasetWithStar struct {


models/repo.go (+60, -60)

@@ -162,79 +162,79 @@ const (

// Repository represents a git repository.
type Repository struct {
ID int64 `xorm:"pk autoincr"`
OwnerID int64 `xorm:"UNIQUE(s) index"`
OwnerName string
Owner *User `xorm:"-"`
LowerName string `xorm:"UNIQUE(s) INDEX NOT NULL"`
Name string `xorm:"INDEX NOT NULL"`
Description string `xorm:"TEXT"`
Website string `xorm:"VARCHAR(2048)"`
OriginalServiceType api.GitServiceType `xorm:"index"`
OriginalURL string `xorm:"VARCHAR(2048)"`
DefaultBranch string
CreatorID int64 `xorm:"INDEX NOT NULL DEFAULT 0"`
Creator *User `xorm:"-"`
NumWatches int
NumStars int
NumForks int
NumIssues int
NumClosedIssues int
NumOpenIssues int `xorm:"-"`
NumPulls int
NumClosedPulls int
NumOpenPulls int `xorm:"-"`
NumMilestones int `xorm:"NOT NULL DEFAULT 0"`
NumClosedMilestones int `xorm:"NOT NULL DEFAULT 0"`
NumOpenMilestones int `xorm:"-"`
NumCommit int64 `xorm:"NOT NULL DEFAULT 0"`
RepoType RepoType `xorm:"NOT NULL DEFAULT 0"`
IsPrivate bool `xorm:"INDEX"`
IsEmpty bool `xorm:"INDEX"`
IsArchived bool `xorm:"INDEX"`
IsMirror bool `xorm:"INDEX"`
ID int64 `xorm:"pk autoincr" json:"id"`
OwnerID int64 `xorm:"UNIQUE(s) index" json:"ownerId"`
OwnerName string `json:"ownerName"`
Owner *User `xorm:"-" json:"owner"`
LowerName string `xorm:"UNIQUE(s) INDEX NOT NULL" json:"lowerName"`
Name string `xorm:"INDEX NOT NULL" json:"name"`
Description string `xorm:"TEXT" json:"description"`
Website string `xorm:"VARCHAR(2048)" json:"website"`
OriginalServiceType api.GitServiceType `xorm:"index" json:"originalServiceType"`
OriginalURL string `xorm:"VARCHAR(2048)" json:"originalUrl"`
DefaultBranch string `json:"defaultBranch"`
CreatorID int64 `xorm:"INDEX NOT NULL DEFAULT 0" json:"creatorId"`
Creator *User `xorm:"-" json:"creator"`
NumWatches int `json:"numWatches"`
NumStars int `json:"numStars"`
NumForks int `json:"numForks"`
NumIssues int `json:"numIssues"`
NumClosedIssues int `json:"numClosedIssues"`
NumOpenIssues int `xorm:"-" json:"numOpenIssues"`
NumPulls int `json:"numPulls"`
NumClosedPulls int `json:"numClosedPulls"`
NumOpenPulls int `xorm:"-" json:"numOpenPulls"`
NumMilestones int `xorm:"NOT NULL DEFAULT 0" json:"numMilestones"`
NumClosedMilestones int `xorm:"NOT NULL DEFAULT 0" json:"numClosedMilestones"`
NumOpenMilestones int `xorm:"-" json:"numOpenMilestones"`
NumCommit int64 `xorm:"NOT NULL DEFAULT 0" json:"numCommit"`
RepoType RepoType `xorm:"NOT NULL DEFAULT 0" json:"repoType"`
IsPrivate bool `xorm:"INDEX" json:"isPrivate"`
IsEmpty bool `xorm:"INDEX" json:"isEmpty"`
IsArchived bool `xorm:"INDEX" json:"isArchived"`
IsMirror bool `xorm:"INDEX" json:"isMirror"`
*Mirror `xorm:"-"`
Status RepositoryStatus `xorm:"NOT NULL DEFAULT 0"`
Status RepositoryStatus `xorm:"NOT NULL DEFAULT 0" json:"status"`

RenderingMetas map[string]string `xorm:"-"`
Units []*RepoUnit `xorm:"-"`
PrimaryLanguage *LanguageStat `xorm:"-"`
IsFork bool `xorm:"INDEX NOT NULL DEFAULT false"`
ForkID int64 `xorm:"INDEX"`
BaseRepo *Repository `xorm:"-"`
IsTemplate bool `xorm:"INDEX NOT NULL DEFAULT false"`
TemplateID int64 `xorm:"INDEX"`
TemplateRepo *Repository `xorm:"-"`
Size int64 `xorm:"NOT NULL DEFAULT 0"`
CodeIndexerStatus *RepoIndexerStatus `xorm:"-"`
StatsIndexerStatus *RepoIndexerStatus `xorm:"-"`
IsFsckEnabled bool `xorm:"NOT NULL DEFAULT true"`
CloseIssuesViaCommitInAnyBranch bool `xorm:"NOT NULL DEFAULT false"`
Topics []string `xorm:"TEXT JSON"`
Units []*RepoUnit `xorm:"-" json:"units"`
PrimaryLanguage *LanguageStat `xorm:"-" json:"primaryLanguage"`
IsFork bool `xorm:"INDEX NOT NULL DEFAULT false" json:"isFork"`
ForkID int64 `xorm:"INDEX" json:"forkId"`
BaseRepo *Repository `xorm:"-" json:"baseRepo"`
IsTemplate bool `xorm:"INDEX NOT NULL DEFAULT false" json:"isTemplate"`
TemplateID int64 `xorm:"INDEX" json:"templateId"`
TemplateRepo *Repository `xorm:"-" json:"templateRepo"`
Size int64 `xorm:"NOT NULL DEFAULT 0" json:"size"`
CodeIndexerStatus *RepoIndexerStatus `xorm:"-" json:"codeIndexerStatus"`
StatsIndexerStatus *RepoIndexerStatus `xorm:"-" json:"statsIndexerStatus"`
IsFsckEnabled bool `xorm:"NOT NULL DEFAULT true" json:"isFsckEnabled"`
CloseIssuesViaCommitInAnyBranch bool `xorm:"NOT NULL DEFAULT false" json:"closeIssuesViaCommitInAnyBranch"`
Topics []string `xorm:"TEXT JSON" json:"topics"`

// Avatar: ID(10-20)-md5(32) - must fit into 64 symbols
Avatar string `xorm:"VARCHAR(64)"`
Avatar string `xorm:"VARCHAR(64)" json:"avatar"`

//blockchain
ContractAddress string `xorm:"INDEX"`
Balance string `xorm:"NOT NULL DEFAULT '0'"`
BlockChainStatus RepoBlockChainStatus `xorm:"NOT NULL DEFAULT 0"`
ContractAddress string `xorm:"INDEX" json:"contractAddress"`
Balance string `xorm:"NOT NULL DEFAULT '0'" json:"balance"`
BlockChainStatus RepoBlockChainStatus `xorm:"NOT NULL DEFAULT 0" json:"blockChainStatus"`

// git clone and git pull total count
CloneCnt int64 `xorm:"NOT NULL DEFAULT 0"`
CloneCnt int64 `xorm:"NOT NULL DEFAULT 0" json:"cloneCnt"`

// only git clone total count
GitCloneCnt int64 `xorm:"NOT NULL DEFAULT 0"`
GitCloneCnt int64 `xorm:"NOT NULL DEFAULT 0" json:"gitCloneCnt"`

CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"`
UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"`
CreatedUnix timeutil.TimeStamp `xorm:"INDEX created" json:"createdUnix"`
UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated" json:"updatedUnix"`

Hot int64 `xorm:"-"`
Active int64 `xorm:"-"`
Alias string `xorm:"INDEX"`
LowerAlias string `xorm:"INDEX"`
Hot int64 `xorm:"-" json:"hot"`
Active int64 `xorm:"-" json:"active"`
Alias string `xorm:"INDEX" json:"alias"`
LowerAlias string `xorm:"INDEX" json:"lowerAlias"`
}

type RepositoryShow struct {


modules/auth/modelarts.go (+20, -23)

@@ -57,29 +57,26 @@ type CreateModelArtsTrainJobForm struct {
}

type CreateModelArtsInferenceJobForm struct {
DisplayJobName string `form:"display_job_name" binding:"Required"`
JobName string `form:"job_name" binding:"Required"`
Attachment string `form:"attachment" binding:"Required"`
BootFile string `form:"boot_file" binding:"Required"`
WorkServerNumber int `form:"work_server_number" binding:"Required"`
EngineID int `form:"engine_id" binding:"Required"`
PoolID string `form:"pool_id" binding:"Required"`
Flavor string `form:"flavor" binding:"Required"`
Params string `form:"run_para_list" binding:"Required"`
Description string `form:"description"`
IsSaveParam string `form:"is_save_para"`
ParameterTemplateName string `form:"parameter_template_name"`
PrameterDescription string `form:"parameter_description"`
BranchName string `form:"branch_name" binding:"Required"`
VersionName string `form:"version_name" binding:"Required"`
FlavorName string `form:"flaver_names" binding:"Required"`
EngineName string `form:"engine_names" binding:"Required"`
LabelName string `form:"label_names" binding:"Required"`
TrainUrl string `form:"train_url" binding:"Required"`
ModelName string `form:"model_name" binding:"Required"`
ModelVersion string `form:"model_version" binding:"Required"`
CkptName string `form:"ckpt_name" binding:"Required"`
SpecId int64 `form:"spec_id" binding:"Required"`
DisplayJobName string `form:"display_job_name" binding:"Required"`
JobName string `form:"job_name" binding:"Required"`
Attachment string `form:"attachment" binding:"Required"`
BootFile string `form:"boot_file" binding:"Required"`
WorkServerNumber int `form:"work_server_number" binding:"Required"`
EngineID int `form:"engine_id" binding:"Required"`
PoolID string `form:"pool_id" binding:"Required"`
Flavor string `form:"flavor" binding:"Required"`
Params string `form:"run_para_list" binding:"Required"`
Description string `form:"description"`
BranchName string `form:"branch_name" binding:"Required"`
VersionName string `form:"version_name" binding:"Required"`
FlavorName string `form:"flaver_names" binding:"Required"`
EngineName string `form:"engine_names" binding:"Required"`
LabelName string `form:"label_names" binding:"Required"`
TrainUrl string `form:"train_url" binding:"Required"`
ModelName string `form:"model_name" binding:"Required"`
ModelVersion string `form:"model_version" binding:"Required"`
CkptName string `form:"ckpt_name" binding:"Required"`
SpecId int64 `form:"spec_id" binding:"Required"`
}

func (f *CreateModelArtsTrainJobForm) Validate(ctx *macaron.Context, errs binding.Errors) binding.Errors {


modules/cloudbrain/cloudbrain.go (+6, -6)

@@ -228,7 +228,7 @@ func AdminOrImageCreaterRight(ctx *context.Context) {

}

func GenerateTask(req GenerateCloudBrainTaskReq) error {
func GenerateTask(req GenerateCloudBrainTaskReq) (string, error) {
var versionCount int
if req.JobType == string(models.JobTypeTrain) {
versionCount = 1
@@ -335,11 +335,11 @@ func GenerateTask(req GenerateCloudBrainTaskReq) error {
})
if err != nil {
log.Error("CreateJob failed:", err.Error(), req.Ctx.Data["MsgID"])
return err
return "", err
}
if jobResult.Code != Success {
log.Error("CreateJob(%s) failed:%s", req.JobName, jobResult.Msg, req.Ctx.Data["MsgID"])
return errors.New(jobResult.Msg)
return "", errors.New(jobResult.Msg)
}

var jobID = jobResult.Payload["jobId"].(string)
@@ -380,13 +380,13 @@ func GenerateTask(req GenerateCloudBrainTaskReq) error {
})

if err != nil {
return err
return "", err
}

task, err := models.GetCloudbrainByJobID(jobID)
if err != nil {
log.Error("GetCloudbrainByJobID failed: %v", err.Error())
return err
return "", err
}

stringId := strconv.FormatInt(task.ID, 10)
@@ -401,7 +401,7 @@ func GenerateTask(req GenerateCloudBrainTaskReq) error {
notification.NotifyOtherTask(req.Ctx.User, req.Ctx.Repo.Repository, stringId, req.DisplayJobName, models.ActionCreateDebugGPUTask)
}

return nil
return jobID, nil
}

func IsBenchmarkJob(jobType string) bool {

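This signature change is the point of the hunk: GenerateTask now reports the created job's ID as well as an error, so the new API handlers can echo it to the client. A minimal runnable sketch of the contract, with generateTask as a hypothetical stub for cloudbrain.GenerateTask:

package main

import (
	"errors"
	"fmt"
)

// generateTask is a hypothetical stub for cloudbrain.GenerateTask.
func generateTask(ok bool) (string, error) {
	if !ok {
		return "", errors.New("CreateJob failed")
	}
	return "cb-0001", nil // the jobId taken from the cluster's response payload
}

func main() {
	jobID, err := generateTask(true)
	if err != nil {
		fmt.Println("code=1:", err)
		return
	}
	fmt.Println("code=0:", jobID) // handlers return models.BaseMessage{Code: 0, Message: jobID}
}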

modules/grampus/grampus.go (+4, -4)

@@ -98,7 +98,7 @@ func getDatasetGrampus(datasetInfos map[string]models.DatasetInfo) []models.Gram
return datasetGrampus
}

func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error) {
func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId string, err error) {
createTime := timeutil.TimeStampNow()

centerID, centerName := getCentersParamter(ctx, req)
@@ -146,7 +146,7 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error
})
if err != nil {
log.Error("createJob failed: %v", err.Error())
return err
return "", err
}

jobID := jobResult.JobInfo.JobID
@@ -187,7 +187,7 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error

if err != nil {
log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
return err
return "", err
}

var actionType models.ActionType
@@ -198,7 +198,7 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error
}
notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)

return nil
return jobID, nil
}

func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {


modules/modelarts/modelarts.go (+5, -5)

@@ -680,7 +680,7 @@ func GetOutputPathByCount(TotalVersionCount int) (VersionOutputPath string) {
return VersionOutputPath
}

func GenerateInferenceJob(ctx *context.Context, req *GenerateInferenceJobReq) (err error) {
func GenerateInferenceJob(ctx *context.Context, req *GenerateInferenceJobReq) (jobId string, err error) {
createTime := timeutil.TimeStampNow()
jobResult, err := createInferenceJob(models.CreateInferenceJobParams{
JobName: req.JobName,
@@ -715,10 +715,10 @@ func GenerateInferenceJob(ctx *context.Context, req *GenerateInferenceJobReq) (e
})
if err != nil {
log.Error("InsertCloudbrainTemp failed: %v", err.Error())
return err
return "", err
}
}
return err
return "", err
}

// attach, err := models.GetAttachmentByUUID(req.Uuid)
@@ -769,7 +769,7 @@ func GenerateInferenceJob(ctx *context.Context, req *GenerateInferenceJobReq) (e

if err != nil {
log.Error("CreateCloudbrain(%s) failed:%v", req.JobName, err.Error())
return err
return "", err
}
if req.JobType == string(models.JobTypeModelSafety) {
task, err := models.GetCloudbrainByJobID(jobID)
@@ -780,7 +780,7 @@ func GenerateInferenceJob(ctx *context.Context, req *GenerateInferenceJobReq) (e
notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, models.ActionCreateInferenceTask)
}

return nil
return jobID, nil
}

func GetNotebookImageName(imageId string) (string, error) {


routers/api/v1/api.go (+27, -0)

@@ -517,6 +517,15 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Post("/markdown", bind(api.MarkdownOption{}), misc.Markdown)
m.Post("/markdown/raw", misc.MarkdownRaw)

m.Group("/images", func() {

m.Get("public", repo.GetPublicImages)
m.Get("custom", repo.GetCustomImages)
m.Get("star", repo.GetStarImages)
m.Get("npu", repo.GetNpuImages)

}, reqToken())

// Notifications
m.Group("/notifications", func() {
m.Combo("").
@@ -696,6 +705,13 @@ func RegisterRoutes(m *macaron.Macaron) {

m.Combo("/repositories/:id", reqToken()).Get(repo.GetByID)

m.Group("/:username/:reponame/datasets", func() {
m.Get("/current_repo", repo.CurrentRepoDatasetMultiple)
m.Get("/my_datasets", repo.MyDatasetsMultiple)
m.Get("/public_datasets", repo.PublicDatasetMultiple)
m.Get("/my_favorite", repo.MyFavoriteDatasetMultiple)
}, reqToken())

m.Group("/repos", func() {
m.Get("/search", repo.Search)

@@ -955,6 +971,17 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Get("/:id/modelartlog", repo.TrainJobForModelConvertGetLog)
m.Get("/:id/model_list", repo.CloudBrainModelConvertList)
}, reqRepoReader(models.UnitTypeModelManage))
m.Group("/cloudbrain", func() {
m.Group("/train-job", func() {
m.Post("/create", bind(api.CreateTrainJobOption{}), repo.CreateCloudBrain)

})
m.Group("/inference-job", func() {
m.Post("/create", bind(api.CreateTrainJobOption{}), repo.CreateCloudBrainInferenceTask)

})

}, reqRepoReader(models.UnitTypeCloudBrain))
m.Group("/modelarts", func() {
m.Group("/notebook", func() {
//m.Get("/:jobid", repo.GetModelArtsNotebook)


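The new /images group and the per-repo dataset routes above are guarded by reqToken(), and the cloudbrain create routes by reqRepoReader(models.UnitTypeCloudBrain). A hedged sketch of calling the new image-list endpoint; the host and the access_token query parameter are assumptions for illustration, not part of this diff (q, page, and pageSize are read by getImages in images.go):

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	url := "https://example.com/api/v1/images/public?q=pytorch&page=1&pageSize=15&access_token=TOKEN"
	resp, err := http.Get(url)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}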
routers/api/v1/repo/cloudbrain.go (+26, -0)

@@ -16,6 +16,10 @@ import (
"strings"
"time"

"code.gitea.io/gitea/services/cloudbrain/cloudbrainTask"

api "code.gitea.io/gitea/modules/structs"

"code.gitea.io/gitea/modules/notification"

"code.gitea.io/gitea/modules/setting"
@@ -29,6 +33,28 @@ import (
routerRepo "code.gitea.io/gitea/routers/repo"
)

func CreateCloudBrain(ctx *context.APIContext, option api.CreateTrainJobOption) {

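// option.Type selects the cluster, per checkParameters in train.go: 2 = Grampus (C2Net) GPU training, 3 = Grampus (C2Net) NPU training.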
if option.Type == 2 {
cloudbrainTask.GrampusTrainJobGpuCreate(ctx.Context, option)
}
if option.Type == 3 {
cloudbrainTask.GrampusTrainJobNpuCreate(ctx.Context, option)
}

}

func CreateCloudBrainInferenceTask(ctx *context.APIContext, option api.CreateTrainJobOption) {

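// option.Type selects the target here: 0 = CloudBrain one GPU inference, 1 = CloudBrain two NPU (ModelArts) inference; see inference.go.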
if option.Type == 0 {
cloudbrainTask.CloudBrainInferenceJobCreate(ctx.Context, option)
}
if option.Type == 1 {
cloudbrainTask.ModelArtsInferenceJobCreate(ctx.Context, option)
}

}

// cloudbrain get job task by jobid
func GetCloudbrainTask(ctx *context.APIContext) {
// swagger:operation GET /repos/{owner}/{repo}/cloudbrain/{jobid} cloudbrain jobTask


routers/api/v1/repo/datasets.go (+114, -0)

@@ -0,0 +1,114 @@
package repo

import (
"fmt"
"strings"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/context"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
)

func PublicDatasetMultiple(ctx *context.APIContext) {

opts := &models.SearchDatasetOptions{
PublicOnly: true,
NeedAttachment: true,
CloudBrainType: ctx.QueryInt("type"),
}
datasetMultiple(ctx, opts)

}

func MyFavoriteDatasetMultiple(ctx *context.APIContext) {

opts := &models.SearchDatasetOptions{
StarByMe: true,
DatasetIDs: models.GetDatasetIdsStarByUser(ctx.User.ID),
NeedAttachment: true,
CloudBrainType: ctx.QueryInt("type"),
}
datasetMultiple(ctx, opts)
}

func CurrentRepoDatasetMultiple(ctx *context.APIContext) {
datasetIds := models.GetDatasetIdsByRepoID(ctx.Repo.Repository.ID)
searchOrderBy := getSearchOrderByInValues(datasetIds)
opts := &models.SearchDatasetOptions{
RepoID: ctx.Repo.Repository.ID,
NeedAttachment: true,
CloudBrainType: ctx.QueryInt("type"),
DatasetIDs: datasetIds,
SearchOrderBy: searchOrderBy,
}

datasetMultiple(ctx, opts)

}

func MyDatasetsMultiple(ctx *context.APIContext) {

opts := &models.SearchDatasetOptions{
UploadAttachmentByMe: true,
NeedAttachment: true,
CloudBrainType: ctx.QueryInt("type"),
}
datasetMultiple(ctx, opts)

}
func datasetMultiple(ctx *context.APIContext, opts *models.SearchDatasetOptions) {
page := ctx.QueryInt("page")
if page < 1 {
page = 1
}
pageSize := ctx.QueryInt("pageSize")
if pageSize < 1 {
pageSize = setting.UI.DatasetPagingNum
}

keyword := strings.Trim(ctx.Query("q"), " ")
opts.Keyword = keyword
if opts.SearchOrderBy.String() == "" {
opts.SearchOrderBy = models.SearchOrderByRecentUpdated
}

opts.RecommendOnly = ctx.QueryBool("recommend")
opts.ListOptions = models.ListOptions{
Page: page,
PageSize: pageSize,
}
opts.JustNeedZipFile = true
opts.User = ctx.User

datasets, count, err := models.SearchDataset(opts)

if err != nil {
log.Error("json.Marshal failed:", err.Error())
ctx.JSON(200, map[string]interface{}{
"code": 1,
"message": err.Error(),
"data": []*models.Dataset{},
"count": 0,
})
return
}
ctx.JSON(200, map[string]interface{}{
"code": 0,
"message": "",
"data": datasets,
"count": count,
})
}

func getSearchOrderByInValues(datasetIds []int64) models.SearchOrderBy {
if len(datasetIds) == 0 {
return ""
}
searchOrderBy := "CASE id "
for i, id := range datasetIds {
searchOrderBy += fmt.Sprintf(" WHEN %d THEN %d", id, i+1)
}
searchOrderBy += " ELSE 0 END"
return models.SearchOrderBy(searchOrderBy)
}

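getSearchOrderByInValues turns an id list into a SQL CASE expression that preserves the list's order under ORDER BY. A standalone sketch mirroring the function above:

package main

import "fmt"

func orderByInValues(ids []int64) string {
	if len(ids) == 0 {
		return ""
	}
	s := "CASE id "
	for i, id := range ids {
		s += fmt.Sprintf(" WHEN %d THEN %d", id, i+1)
	}
	return s + " ELSE 0 END"
}

func main() {
	fmt.Println(orderByInValues([]int64{42, 7, 13}))
	// CASE id  WHEN 42 THEN 1 WHEN 7 THEN 2 WHEN 13 THEN 3 ELSE 0 END
}

With an ascending sort, any id missing from the list falls into ELSE 0 and comes first.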
routers/api/v1/repo/images.go (+141, -0)

@@ -0,0 +1,141 @@
package repo

import (
"encoding/json"
"net/http"
"strconv"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/context"
"code.gitea.io/gitea/modules/grampus"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/modelarts"
"code.gitea.io/gitea/modules/setting"
)

type NPUImageINFO struct {
ID string `json:"id"`
Value string `json:"value"`
}

func GetPublicImages(ctx *context.APIContext) {
uid := getUID(ctx)
opts := models.SearchImageOptions{
IncludePublicOnly: true,
UID: uid,
Keyword: ctx.Query("q"),
Topics: ctx.Query("topic"),
IncludeOfficialOnly: ctx.QueryBool("recommend"),
SearchOrderBy: "type desc, num_stars desc,id desc",
Status: models.IMAGE_STATUS_SUCCESS,
CloudbrainType: ctx.QueryInt("cloudbrainType"),
}

getImages(ctx, &opts)

}

func GetCustomImages(ctx *context.APIContext) {
uid := getUID(ctx)
opts := models.SearchImageOptions{
UID: uid,
IncludeOwnerOnly: true,
Keyword: ctx.Query("q"),
Topics: ctx.Query("topic"),
Status: -1,
SearchOrderBy: "id desc",
}
getImages(ctx, &opts)

}
func GetStarImages(ctx *context.APIContext) {

uid := getUID(ctx)
opts := models.SearchImageOptions{
UID: uid,
IncludeStarByMe: true,
Keyword: ctx.Query("q"),
Topics: ctx.Query("topic"),
Status: models.IMAGE_STATUS_SUCCESS,
SearchOrderBy: "id desc",
}
getImages(ctx, &opts)

}

func GetNpuImages(ctx *context.APIContext) {
cloudbrainType := ctx.QueryInt("type")
if cloudbrainType == 0 { //modelarts
getModelArtsImages(ctx)
} else { //c2net
getC2netNpuImages(ctx)
}
}

func getModelArtsImages(ctx *context.APIContext) {

var versionInfos modelarts.VersionInfo
_ = json.Unmarshal([]byte(setting.EngineVersions), &versionInfos)
var npuImageInfos []NPUImageINFO
for _, info := range versionInfos.Version {
npuImageInfos = append(npuImageInfos, NPUImageINFO{
ID: strconv.Itoa(info.ID),
Value: info.Value,
})
}
ctx.JSON(http.StatusOK, npuImageInfos)

}

func getC2netNpuImages(ctx *context.APIContext) {
images, err := grampus.GetImages(grampus.ProcessorTypeNPU)
var npuImageInfos []NPUImageINFO
if err != nil {
log.Error("GetImages failed:", err.Error())
ctx.JSON(http.StatusOK, []NPUImageINFO{})
} else {
for _, info := range images.Infos {
npuImageInfos = append(npuImageInfos, NPUImageINFO{
ID: info.ID,
Value: info.Name,
})
}
ctx.JSON(http.StatusOK, npuImageInfos)
}
}
func getImages(ctx *context.APIContext, opts *models.SearchImageOptions) {
page := ctx.QueryInt("page")
if page <= 0 {
page = 1
}

pageSize := ctx.QueryInt("pageSize")
if pageSize <= 0 {
pageSize = 15
}
opts.ListOptions = models.ListOptions{
Page: page,
PageSize: pageSize,
}
imageList, total, err := models.SearchImage(opts)
if err != nil {
log.Error("Can not get images:%v", err)
ctx.JSON(http.StatusOK, models.ImagesPageResult{
Count: 0,
Images: []*models.Image{},
})
} else {
ctx.JSON(http.StatusOK, models.ImagesPageResult{
Count: total,
Images: imageList,
})
}
}

func getUID(ctx *context.APIContext) int64 {
var uid int64 = -1
if ctx.IsSigned {
uid = ctx.User.ID
}
return uid
}

services/cloudbrain/cloudbrainTask/inference.go (+631, -0)

@@ -0,0 +1,631 @@
package cloudbrainTask

import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"strconv"
"strings"
"unicode/utf8"

"code.gitea.io/gitea/modules/modelarts"

"code.gitea.io/gitea/modules/git"

api "code.gitea.io/gitea/modules/structs"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/cloudbrain"
"code.gitea.io/gitea/modules/context"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/util"
"code.gitea.io/gitea/services/cloudbrain/resource"
"code.gitea.io/gitea/services/reward/point/account"
)

const CLONE_FILE_PREFIX = "file:///"

func CloudBrainInferenceJobCreate(ctx *context.Context, option api.CreateTrainJobOption) {

displayJobName := option.DisplayJobName
jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
image := strings.TrimSpace(option.Image)
uuid := option.Attachment
jobType := string(models.JobTypeInference)
codePath := setting.JobPath + jobName + cloudbrain.CodeMountPath
branchName := option.BranchName
bootFile := strings.TrimSpace(option.BootFile)
labelName := option.LabelName
repo := ctx.Repo.Repository

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName))
defer lock.UnLock()
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])

ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("repo.cloudbrain_samejob_err")))
return
}

ckptUrl := setting.Attachment.Minio.RealPath + option.PreTrainModelUrl + option.CkptName
log.Info("ckpt url:" + ckptUrl)
command, err := getInferenceJobCommand(option)
if err != nil {
log.Error("getTrainJobCommand failed: %v", err)
ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error()))
return
}

tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName)
if err == nil {
if len(tasks) != 0 {
log.Error("the job name did already exist", ctx.Data["MsgID"])
ctx.JSON(http.StatusOK, models.BaseErrorMessage("the job name did already exist"))
return
}
} else {
if !models.IsErrJobNotExist(err) {
log.Error("system error, %v", err, ctx.Data["MsgID"])

ctx.JSON(http.StatusOK, models.BaseErrorMessage("system error"))
return
}
}

if !jobNamePattern.MatchString(displayJobName) {

ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("repo.cloudbrain_jobname_err")))
return
}

bootFileExist, err := ctx.Repo.FileExists(bootFile, branchName)
if err != nil || !bootFileExist {
log.Error("Get bootfile error:", err, ctx.Data["MsgID"])
ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("repo.cloudbrain_bootfile_err")))
return
}

count, err := models.GetCloudbrainCountByUserID(ctx.User.ID, jobType)
if err != nil {
log.Error("GetCloudbrainCountByUserID failed:%v", err, ctx.Data["MsgID"])
ctx.JSON(http.StatusOK, models.BaseErrorMessage("system error"))
return
} else {
if count >= 1 {
log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("repo.cloudbrain.morethanonejob")))
return
}
}

if branchName == "" {
branchName = cloudbrain.DefaultBranchName
}
errStr := loadCodeAndMakeModelPath(repo, codePath, branchName, jobName, cloudbrain.ResultPath)
if errStr != "" {

ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr(errStr)))
return
}

commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName)

datasetInfos, datasetNames, err := models.GetDatasetInfo(uuid)
if err != nil {
log.Error("GetDatasetInfo failed: %v", err, ctx.Data["MsgID"])

ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.error.dataset_select")))
return
}
spec, err := resource.GetAndCheckSpec(ctx.User.ID, option.SpecId, models.FindSpecsOptions{
JobType: models.JobTypeInference,
ComputeResource: models.GPU,
Cluster: models.OpenICluster,
AiCenterCode: models.AICenterOfCloudBrainOne})
if err != nil || spec == nil {

ctx.JSON(http.StatusOK, models.BaseErrorMessage("Resource specification is not available"))
return
}
if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) {
log.Error("point balance is not enough,userId=%d specId=%d", ctx.User.ID, spec.ID)
ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("points.insufficient_points_balance")))
return
}
req := cloudbrain.GenerateCloudBrainTaskReq{
Ctx: ctx,
DisplayJobName: displayJobName,
JobName: jobName,
Image: image,
Command: command,
Uuids: uuid,
DatasetNames: datasetNames,
DatasetInfos: datasetInfos,
CodePath: storage.GetMinioPath(jobName, cloudbrain.CodeMountPath+"/"),
ModelPath: setting.Attachment.Minio.RealPath + option.PreTrainModelUrl,
BenchmarkPath: storage.GetMinioPath(jobName, cloudbrain.BenchMarkMountPath+"/"),
Snn4ImageNetPath: storage.GetMinioPath(jobName, cloudbrain.Snn4imagenetMountPath+"/"),
BrainScorePath: storage.GetMinioPath(jobName, cloudbrain.BrainScoreMountPath+"/"),
JobType: jobType,
Description: option.Description,
BranchName: branchName,
BootFile: option.BootFile,
Params: option.Params,
CommitID: commitID,
ResultPath: storage.GetMinioPath(jobName, cloudbrain.ResultPath+"/"),
ModelName: option.ModelName,
ModelVersion: option.ModelVersion,
CkptName: option.CkptName,
TrainUrl: option.PreTrainModelUrl,
LabelName: labelName,
Spec: spec,
}

jobId, err := cloudbrain.GenerateTask(req)
if err != nil {

ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error()))
return
}
ctx.JSON(http.StatusOK, models.BaseMessage{Code: 0, Message: jobId})
}

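This create path ends by returning BaseMessage with Code 0 and the job ID in Message (the ModelArts path below does the same). A runnable sketch of the wire shape, using the json tags added to models.BaseMessage in this commit; the job ID value is a made-up sample:

package main

import (
	"encoding/json"
	"fmt"
)

type BaseMessage struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

func main() {
	b, _ := json.Marshal(BaseMessage{Code: 0, Message: "cb1a2b3c"})
	fmt.Println(string(b)) // {"code":0,"message":"cb1a2b3c"}
}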
func ModelArtsInferenceJobCreate(ctx *context.Context, option api.CreateTrainJobOption) {
ctx.Data["PageIsTrainJob"] = true
VersionOutputPath := modelarts.GetOutputPathByCount(modelarts.TotalVersionCount)
displayJobName := option.DisplayJobName
jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
uuid := option.Attachment
description := option.Description
workServerNumber := option.WorkServerNumber
engineID, _ := strconv.Atoi(option.ImageID)
bootFile := strings.TrimSpace(option.BootFile)
params := option.Params
repo := ctx.Repo.Repository
codeLocalPath := setting.JobPath + jobName + modelarts.CodePath
codeObsPath := "/" + setting.Bucket + modelarts.JobPath + jobName + modelarts.CodePath
resultObsPath := "/" + setting.Bucket + modelarts.JobPath + jobName + modelarts.ResultPath + VersionOutputPath + "/"
logObsPath := "/" + setting.Bucket + modelarts.JobPath + jobName + modelarts.LogPath + VersionOutputPath + "/"
//dataPath := "/" + setting.Bucket + "/" + setting.BasePath + path.Join(uuid[0:1], uuid[1:2]) + "/" + uuid + uuid + "/"
branchName := option.BranchName
EngineName := option.Image
LabelName := option.LabelName
isLatestVersion := modelarts.IsLatestVersion
VersionCount := modelarts.VersionCountOne
trainUrl := option.PreTrainModelUrl
modelName := option.ModelName
modelVersion := option.ModelVersion
ckptName := option.CkptName
ckptUrl := "/" + option.PreTrainModelUrl + option.CkptName

errStr := checkInferenceJobMultiNode(ctx.User.ID, option.WorkServerNumber)
if errStr != "" {

ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr(errStr)))
return
}

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeInference), displayJobName))
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])

ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("repo.cloudbrain_samejob_err")))
return
}
defer lock.UnLock()

count, err := models.GetCloudbrainInferenceJobCountByUserID(ctx.User.ID)
if err != nil {
log.Error("GetCloudbrainInferenceJobCountByUserID failed:%v", err, ctx.Data["MsgID"])

ctx.JSON(http.StatusOK, models.BaseErrorMessage("system error"))
return
} else {
if count >= 1 {
log.Error("the user already has running or waiting inference task", ctx.Data["MsgID"])

ctx.JSON(http.StatusOK, models.BaseErrorMessage("you have already a running or waiting inference task, can not create more"))
return
}
}

if err := paramCheckCreateInferenceJob(option); err != nil {
log.Error("paramCheckCreateInferenceJob failed:(%v)", err)

ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error()))
return
}

bootFileExist, err := ctx.Repo.FileExists(bootFile, branchName)
if err != nil || !bootFileExist {
log.Error("Get bootfile error:", err)
ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("repo.cloudbrain_bootfile_err")))
return
}

//Determine whether the task name in the project is duplicated
tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeInference), displayJobName)
if err == nil {
if len(tasks) != 0 {
log.Error("the job name did already exist", ctx.Data["MsgID"])

ctx.JSON(http.StatusOK, models.BaseErrorMessage("the job name did already exist"))
return
}
} else {
if !models.IsErrJobNotExist(err) {
log.Error("system error, %v", err, ctx.Data["MsgID"])

ctx.JSON(http.StatusOK, models.BaseErrorMessage("system error"))
return
}
}

spec, err := resource.GetAndCheckSpec(ctx.User.ID, option.SpecId, models.FindSpecsOptions{
JobType: models.JobTypeInference,
ComputeResource: models.NPU,
Cluster: models.OpenICluster,
AiCenterCode: models.AICenterOfCloudBrainTwo})
if err != nil || spec == nil {

ctx.JSON(http.StatusOK, models.BaseErrorMessage("Resource specification not available"))
return
}
if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) {
log.Error("point balance is not enough,userId=%d specId=%d ", ctx.User.ID, spec.ID)
ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("points.insufficient_points_balance")))
return
}

//todo: del the codeLocalPath
_, err = ioutil.ReadDir(codeLocalPath)
if err == nil {
os.RemoveAll(codeLocalPath)
}

gitRepo, _ := git.OpenRepository(repo.RepoPath())
commitID, _ := gitRepo.GetBranchCommitID(branchName)

if err := downloadCode(repo, codeLocalPath, branchName); err != nil {
log.Error("Create task failed, server timed out: %s (%v)", repo.FullName(), err)

ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed")))
return
}

//todo: upload code (have the file_server do this work?)
if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.ResultPath + VersionOutputPath + "/"); err != nil {
log.Error("Failed to obsMkdir_result: %s (%v)", repo.FullName(), err)

ctx.JSON(http.StatusOK, models.BaseErrorMessage("Failed to obsMkdir_result"))
return
}

if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.LogPath + VersionOutputPath + "/"); err != nil {
log.Error("Failed to obsMkdir_log: %s (%v)", repo.FullName(), err)
ctx.JSON(http.StatusOK, models.BaseErrorMessage("Failed to obsMkdir_log"))
return
}

if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil {
log.Error("Failed to uploadCodeToObs: %s (%v)", repo.FullName(), err)
ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed")))
return
}

var parameters models.Parameters
param := make([]models.Parameter, 0)
param = append(param, models.Parameter{
Label: modelarts.ResultUrl,
Value: "s3:/" + resultObsPath,
}, models.Parameter{
Label: modelarts.CkptUrl,
Value: "s3:/" + ckptUrl,
})

datasUrlList, dataUrl, datasetNames, isMultiDataset, err := getDatasUrlListByUUIDS(uuid)
if err != nil {

ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error()))
return
}
dataPath := dataUrl
jsondatas, err := json.Marshal(datasUrlList)
if err != nil {
log.Error("Failed to Marshal: %v", err)

ctx.JSON(http.StatusOK, models.BaseErrorMessage("json error:"+err.Error()))
return
}
if isMultiDataset {
param = append(param, models.Parameter{
Label: modelarts.MultiDataUrl,
Value: string(jsondatas),
})
}

existDeviceTarget := false
if len(params) != 0 {
err := json.Unmarshal([]byte(params), &parameters)
if err != nil {
log.Error("Failed to Unmarshal params: %s (%v)", params, err)

ctx.JSON(http.StatusOK, models.BaseErrorMessage("运行参数错误"))
return
}

for _, parameter := range parameters.Parameter {
if parameter.Label == modelarts.DeviceTarget {
existDeviceTarget = true
}
if parameter.Label != modelarts.TrainUrl && parameter.Label != modelarts.DataUrl {
param = append(param, models.Parameter{
Label: parameter.Label,
Value: parameter.Value,
})
}
}
}
if !existDeviceTarget {
param = append(param, models.Parameter{
Label: modelarts.DeviceTarget,
Value: modelarts.Ascend,
})
}

req := &modelarts.GenerateInferenceJobReq{
JobName: jobName,
DisplayJobName: displayJobName,
DataUrl: dataPath,
Description: description,
CodeObsPath: codeObsPath,
BootFileUrl: codeObsPath + bootFile,
BootFile: bootFile,
TrainUrl: trainUrl,
WorkServerNumber: workServerNumber,
EngineID: int64(engineID),
LogUrl: logObsPath,
PoolID: getPoolId(),
Uuid: uuid,
Parameters: param, //modelarts train parameters
CommitID: commitID,
BranchName: branchName,
Params: option.Params,
EngineName: EngineName,
LabelName: LabelName,
IsLatestVersion: isLatestVersion,
VersionCount: VersionCount,
TotalVersionCount: modelarts.TotalVersionCount,
ModelName: modelName,
ModelVersion: modelVersion,
CkptName: ckptName,
ResultUrl: resultObsPath,
Spec: spec,
DatasetName: datasetNames,
JobType: string(models.JobTypeInference),
}

jobId, err := modelarts.GenerateInferenceJob(ctx, req)
if err != nil {
log.Error("GenerateTrainJob failed:%v", err.Error())

ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error()))
return
}
ctx.JSON(http.StatusOK, models.BaseMessage{Code: 0, Message: jobId})
}

func getDatasUrlListByUUIDS(uuidStr string) ([]models.Datasurl, string, string, bool, error) {
var isMultiDataset bool
var dataUrl string
var datasetNames string
var datasUrlList []models.Datasurl
uuids := strings.Split(uuidStr, ";")
if len(uuids) > setting.MaxDatasetNum {
log.Error("the dataset count(%d) exceed the limit", len(uuids))
return datasUrlList, dataUrl, datasetNames, isMultiDataset, errors.New("the dataset count exceed the limit")
}

datasetInfos := make(map[string]models.DatasetInfo)
attachs, err := models.GetAttachmentsByUUIDs(uuids)
if err != nil || len(attachs) != len(uuids) {
log.Error("GetAttachmentsByUUIDs failed: %v", err)
return datasUrlList, dataUrl, datasetNames, isMultiDataset, errors.New("GetAttachmentsByUUIDs failed")
}

for i, tmpUuid := range uuids {
var attach *models.Attachment
for _, tmpAttach := range attachs {
if tmpAttach.UUID == tmpUuid {
attach = tmpAttach
break
}
}
if attach == nil {
log.Error("GetAttachmentsByUUIDs failed: %v", err)
return datasUrlList, dataUrl, datasetNames, isMultiDataset, errors.New("GetAttachmentsByUUIDs failed")
}
fileName := strings.TrimSuffix(strings.TrimSuffix(strings.TrimSuffix(attach.Name, ".zip"), ".tar.gz"), ".tgz")
for _, datasetInfo := range datasetInfos {
if fileName == datasetInfo.Name {
log.Error("the dataset name is same: %v", attach.Name)
return datasUrlList, dataUrl, datasetNames, isMultiDataset, errors.New("the dataset name is same")
}
}
if len(attachs) <= 1 {
dataUrl = "/" + setting.Bucket + "/" + setting.BasePath + path.Join(attach.UUID[0:1], attach.UUID[1:2]) + "/" + attach.UUID + attach.UUID + "/"
isMultiDataset = false
} else {
dataUrl = "/" + setting.Bucket + "/" + setting.BasePath + path.Join(attachs[0].UUID[0:1], attachs[0].UUID[1:2]) + "/" + attachs[0].UUID + attachs[0].UUID + "/"
datasetUrl := "s3://" + setting.Bucket + "/" + setting.BasePath + path.Join(attach.UUID[0:1], attach.UUID[1:2]) + "/" + attach.UUID + attach.UUID + "/"
datasUrlList = append(datasUrlList, models.Datasurl{
DatasetUrl: datasetUrl,
DatasetName: fileName,
})
isMultiDataset = true
}

if i == 0 {
datasetNames = attach.Name
} else {
datasetNames += ";" + attach.Name
}
}

return datasUrlList, dataUrl, datasetNames, isMultiDataset, nil
}
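
The object layout built above shards attachments by the first two characters of the UUID and then repeats the UUID in the object path. A standalone sketch; the bucket and base path values are assumptions:

package main

import (
	"fmt"
	"path"
)

func main() {
	bucket, basePath := "opendata", "attachment/"
	uuid := "3a7f0c2d9e514b6f"
	// Same expression as in getDatasUrlListByUUIDS: note the UUID really is doubled.
	dataURL := "/" + bucket + "/" + basePath + path.Join(uuid[0:1], uuid[1:2]) + "/" + uuid + uuid + "/"
	fmt.Println(dataURL) // /opendata/attachment/3/a/3a7f0c2d9e514b6f3a7f0c2d9e514b6f/
}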
func checkInferenceJobMultiNode(userId int64, serverNum int) string {
if serverNum == 1 {
return ""
}

return "repo.modelarts.no_node_right"

}

func paramCheckCreateInferenceJob(option api.CreateTrainJobOption) error {
if !strings.HasSuffix(strings.TrimSpace(option.BootFile), ".py") {
log.Error("the boot file(%s) must be a python file", strings.TrimSpace(option.BootFile))
return errors.New("the boot file must be a python file")
}

if option.ModelName == "" {
log.Error("the ModelName(%s) must not be empty", option.ModelName)
return errors.New("the model name must not be empty")
}
if option.ModelVersion == "" {
log.Error("the ModelVersion(%s) must not be empty", option.ModelVersion)
return errors.New("the model version must not be empty")
}
if option.CkptName == "" {
log.Error("the CkptName(%s) must not be empty", option.CkptName)
return errors.New("the checkpoint file name must not be empty")
}
if option.BranchName == "" {
log.Error("the Branch(%s) must not be empty", option.BranchName)
return errors.New("the branch name must not be empty")
}

if utf8.RuneCountInString(option.Description) > 255 {
log.Error("the Description must not be more than 255 characters: %s", option.Description)
return errors.New("the description must not exceed 255 characters")
}
}

return nil
}

func loadCodeAndMakeModelPath(repo *models.Repository, codePath string, branchName string, jobName string, resultPath string) string {
err := downloadCode(repo, codePath, branchName)
if err != nil {
return "cloudbrain.load_code_failed"
}

err = uploadCodeToMinio(codePath+"/", jobName, cloudbrain.CodeMountPath+"/")
if err != nil {
return "cloudbrain.load_code_failed"
}

modelPath := setting.JobPath + jobName + resultPath + "/"
err = mkModelPath(modelPath)
if err != nil {
return "cloudbrain.load_code_failed"
}
err = uploadCodeToMinio(modelPath, jobName, resultPath+"/")
if err != nil {
return "cloudbrain.load_code_failed"
}

return ""
}

func downloadCode(repo *models.Repository, codePath, branchName string) error {
//add "file:///" prefix to make the depth valid
if err := git.Clone(CLONE_FILE_PREFIX+repo.RepoPath(), codePath, git.CloneRepoOptions{Branch: branchName, Depth: 1}); err != nil {
log.Error("Failed to clone repository: %s (%v)", repo.FullName(), err)
return err
}

configFile, err := os.OpenFile(codePath+"/.git/config", os.O_RDWR, 0666)
if err != nil {
log.Error("open file(%s) failed:%v", codePath+"/,git/config", err)
return err
}

defer configFile.Close()

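// The clone above used a local file:/// path; scan .git/config for the url line and rewrite it in place so the checked-out code carries the repository's public HTTPS clone link instead.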
pos := int64(0)
reader := bufio.NewReader(configFile)
for {
line, err := reader.ReadString('\n')
if err != nil {
if err == io.EOF {
log.Error("not find the remote-url")
return nil
} else {
log.Error("read error: %v", err)
return err
}
}

if strings.Contains(line, "url") && strings.Contains(line, ".git") {
originUrl := "\turl = " + repo.CloneLink().HTTPS + "\n"
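// Pad with spaces up to the old line's length so the in-place WriteAt below fully overwrites it, leaving no stray bytes of the original url.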
if len(line) > len(originUrl) {
originUrl += strings.Repeat(" ", len(line)-len(originUrl))
}
bytes := []byte(originUrl)
_, err := configFile.WriteAt(bytes, pos)
if err != nil {
log.Error("WriteAt failed:%v", err)
return err
}
break
}

pos += int64(len(line))
}

return nil
}

func getInferenceJobCommand(option api.CreateTrainJobOption) (string, error) {
var command string
bootFile := strings.TrimSpace(option.BootFile)
params := option.Params

if !strings.HasSuffix(bootFile, ".py") {
log.Error("bootFile(%s) format error", bootFile)
return command, errors.New("bootFile format error")
}

var parameters models.Parameters
var param string
if len(params) != 0 {
err := json.Unmarshal([]byte(params), &parameters)
if err != nil {
log.Error("Failed to Unmarshal params: %s (%v)", params, err)
return command, err
}

for _, parameter := range parameters.Parameter {
param += " --" + parameter.Label + "=" + parameter.Value
}
}

param += " --modelname" + "=" + option.CkptName

command += "python /code/" + bootFile + param + " > " + cloudbrain.ResultPath + "/" + option.DisplayJobName + "-" + cloudbrain.LogFile

return command, nil
}

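For reference, the shape of the command string getInferenceJobCommand assembles. A standalone sketch; the result path and log file name are stand-ins for cloudbrain.ResultPath and cloudbrain.LogFile:

package main

import "fmt"

func main() {
	bootFile := "inference.py"
	param := " --batch_size=16" + " --modelname=model.ckpt" // from run_para_list, plus the appended CkptName
	resultPath, logFile := "/result", "log.txt"
	cmd := "python /code/" + bootFile + param + " > " + resultPath + "/" + "demo-job" + "-" + logFile
	fmt.Println(cmd)
	// python /code/inference.py --batch_size=16 --modelname=model.ckpt > /result/demo-job-log.txt
}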
services/cloudbrain/cloudbrainTask/train.go (+678, -0)

@@ -0,0 +1,678 @@
package cloudbrainTask

import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"regexp"
"strings"

"code.gitea.io/gitea/modules/obs"

"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/storage"
"github.com/unknwon/com"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/cloudbrain"
"code.gitea.io/gitea/modules/context"
"code.gitea.io/gitea/modules/grampus"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/modelarts"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/setting"
api "code.gitea.io/gitea/modules/structs"
"code.gitea.io/gitea/modules/util"
"code.gitea.io/gitea/services/cloudbrain/resource"
"code.gitea.io/gitea/services/reward/point/account"
)

var jobNamePattern = regexp.MustCompile(`^[a-z0-9][a-z0-9-_]{1,34}[a-z0-9-]$`)

func GrampusTrainJobGpuCreate(ctx *context.Context, option api.CreateTrainJobOption) {

displayJobName := option.DisplayJobName
jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
uuid := option.Attachment
description := option.Description
bootFile := strings.TrimSpace(option.BootFile)
params := option.Params
repo := ctx.Repo.Repository
codeLocalPath := setting.JobPath + jobName + cloudbrain.CodeMountPath + "/"
codeMinioPath := setting.CBCodePathPrefix + jobName + cloudbrain.CodeMountPath + "/"
branchName := option.BranchName
image := strings.TrimSpace(option.Image)

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
defer lock.UnLock()
spec, datasetInfos, datasetNames, err := checkParameters(ctx, option, lock, repo)
if err != nil {
ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error()))
return
}

//prepare code and out path
_, err = ioutil.ReadDir(codeLocalPath)
if err == nil {
os.RemoveAll(codeLocalPath)
}

if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil {
log.Error("downloadZipCode failed, server timed out: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"])
ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed")))
return
}

//todo: upload code (have the file_server do this work?)
//upload code
if err := uploadCodeToMinio(codeLocalPath+"/", jobName, cloudbrain.CodeMountPath+"/"); err != nil {
log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"])
ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed")))
return
}

modelPath := setting.JobPath + jobName + cloudbrain.ModelMountPath + "/"
if err := mkModelPath(modelPath); err != nil {
log.Error("Failed to mkModelPath: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"])
ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed")))
return
}

//init model readme
if err := uploadCodeToMinio(modelPath, jobName, cloudbrain.ModelMountPath+"/"); err != nil {
log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"])
ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed")))
return
}

var datasetRemotePath, allFileName string
for _, datasetInfo := range datasetInfos {
if datasetRemotePath == "" {
datasetRemotePath = datasetInfo.DataLocalPath
allFileName = datasetInfo.FullName
} else {
datasetRemotePath = datasetRemotePath + ";" + datasetInfo.DataLocalPath
allFileName = allFileName + ";" + datasetInfo.FullName
}

}

//prepare command
preTrainModelPath := getPreTrainModelPath(option.PreTrainModelUrl, option.CkptName)

command, err := generateCommand(repo.Name, grampus.ProcessorTypeGPU, codeMinioPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CBCodePathPrefix+jobName+cloudbrain.ModelMountPath+"/", allFileName, preTrainModelPath, option.CkptName)
if err != nil {
log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"])
ctx.JSON(http.StatusOK, models.BaseErrorMessage("Create task failed, internal error"))
return
}

commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName)

req := &grampus.GenerateTrainJobReq{
JobName: jobName,
DisplayJobName: displayJobName,
ComputeResource: models.GPUResource,
ProcessType: grampus.ProcessorTypeGPU,
Command: command,
ImageUrl: image,
Description: description,
BootFile: bootFile,
Uuid: uuid,
CommitID: commitID,
BranchName: branchName,
Params: option.Params,
EngineName: image,
DatasetNames: datasetNames,
DatasetInfos: datasetInfos,

IsLatestVersion: modelarts.IsLatestVersion,
VersionCount: modelarts.VersionCountOne,
WorkServerNumber: 1,
Spec: spec,
}

if option.ModelName != "" { //使用预训练模型训练
req.ModelName = option.ModelName
req.LabelName = option.LabelName
req.CkptName = option.CkptName
req.ModelVersion = option.ModelVersion
req.PreTrainModelUrl = option.PreTrainModelUrl

}

jobId, err := grampus.GenerateTrainJob(ctx, req)
if err != nil {
log.Error("GenerateTrainJob failed:%v", err.Error(), ctx.Data["MsgID"])
ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error()))
return
}
ctx.JSON(http.StatusOK, models.BaseMessage{Code: 0, Message: jobId})
}

func checkParameters(ctx *context.Context, option api.CreateTrainJobOption, lock *redis_lock.DistributeLock, repo *models.Repository) (*models.Specification, map[string]models.DatasetInfo, string, error) {
isOk, err := lock.Lock(models.CloudbrainKeyDuration)
if !isOk {
log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])

return nil, nil, "", fmt.Errorf(ctx.Tr("repo.cloudbrain_samejob_err"))
}

if !jobNamePattern.MatchString(option.DisplayJobName) {
return nil, nil, "", fmt.Errorf(ctx.Tr("repo.cloudbrain_jobname_err"))
}

bootFileExist, err := ctx.Repo.FileExists(option.BootFile, option.BranchName)
if err != nil || !bootFileExist {
log.Error("Get bootfile error:", err, ctx.Data["MsgID"])
return nil, nil, "", fmt.Errorf(ctx.Tr("repo.cloudbrain_bootfile_err"))
}

//check count limit
count, err := models.GetGrampusCountByUserID(ctx.User.ID, string(models.JobTypeTrain), models.GPUResource)
if err != nil {
log.Error("GetGrampusCountByUserID failed:%v", err, ctx.Data["MsgID"])
return nil, nil, "", fmt.Errorf("system error")
} else {
if count >= 1 {
log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
return nil, nil, "", fmt.Errorf("you have already a running or waiting task, can not create more.")
}
}

//check param
if err := grampusParamCheckCreateTrainJob(option.BootFile, option.BranchName); err != nil {
log.Error("paramCheckCreateTrainJob failed:(%v)", err, ctx.Data["MsgID"])
return nil, nil, "", err
}

//check whether the task name in the project is duplicated
tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeTrain), option.DisplayJobName)
if err == nil {
if len(tasks) != 0 {
log.Error("the job name did already exist", ctx.Data["MsgID"])
return nil, nil, "", fmt.Errorf("The job name did already exist.")
}
} else {
if !models.IsErrJobNotExist(err) {
log.Error("system error, %v", err, ctx.Data["MsgID"])
return nil, nil, "", fmt.Errorf("system error")
}
}

//check specification
computeResource := models.GPU
if option.Type == 3 {
computeResource = models.NPU
}
spec, err := resource.GetAndCheckSpec(ctx.User.ID, option.SpecId, models.FindSpecsOptions{
JobType: models.JobTypeTrain,
ComputeResource: computeResource,
Cluster: models.C2NetCluster,
})
	if err != nil || spec == nil {
		return nil, nil, "", fmt.Errorf("resource specification is not available")
	}

if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) {
log.Error("point balance is not enough,userId=%d specId=%d", ctx.User.ID, spec.ID)
return nil, nil, "", fmt.Errorf(ctx.Tr("points.insufficient_points_balance"))
}

//check dataset
datasetInfos, datasetNames, err := models.GetDatasetInfo(option.Attachment, computeResource)
if err != nil {
log.Error("GetDatasetInfo failed: %v", err, ctx.Data["MsgID"])
return nil, nil, "", fmt.Errorf(ctx.Tr("cloudbrain.error.dataset_select"))
}
	return spec, datasetInfos, datasetNames, nil
}

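// GrampusTrainJobNpuCreate creates an NPU train job on the Grampus (C2Net)
// cluster: it validates the request via checkParameters, downloads the
// repository code of the requested branch as a zip, uploads it together with
// an output directory to OBS, assembles the container command and submits the
// job through grampus.GenerateTrainJob.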
func GrampusTrainJobNpuCreate(ctx *context.Context, option api.CreateTrainJobOption) {

displayJobName := option.DisplayJobName
jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
uuid := option.Attachment
description := option.Description
bootFile := strings.TrimSpace(option.BootFile)
params := option.Params
repo := ctx.Repo.Repository
codeLocalPath := setting.JobPath + jobName + modelarts.CodePath
codeObsPath := grampus.JobPath + jobName + modelarts.CodePath
branchName := option.BranchName
isLatestVersion := modelarts.IsLatestVersion
versionCount := modelarts.VersionCountOne
engineName := option.Image

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
defer lock.UnLock()
spec, datasetInfos, datasetNames, err := checkParameters(ctx, option, lock, repo)
if err != nil {
ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error()))
return
}

	//prepare code and out path: remove any stale local copy of the code first
_, err = ioutil.ReadDir(codeLocalPath)
if err == nil {
os.RemoveAll(codeLocalPath)
}

if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil {
log.Error("downloadZipCode failed, server timed out: %s (%v)", repo.FullName(), err)

ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed")))
return
}

	//todo: upload code (should this work be delegated to the file server?)
if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.OutputPath); err != nil {
log.Error("Failed to obsMkdir_output: %s (%v)", repo.FullName(), err)

ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed")))
return
}

if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil {
log.Error("Failed to uploadCodeToObs: %s (%v)", repo.FullName(), err)

ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("cloudbrain.load_code_failed")))
return
}

var datasetRemotePath, allFileName string
for _, datasetInfo := range datasetInfos {
if datasetRemotePath == "" {
datasetRemotePath = datasetInfo.DataLocalPath + "'" + datasetInfo.FullName + "'"
allFileName = datasetInfo.FullName
} else {
datasetRemotePath = datasetRemotePath + ";" + datasetInfo.DataLocalPath + "'" + datasetInfo.FullName + "'"
allFileName = allFileName + ";" + datasetInfo.FullName
		}
	}

//prepare command
preTrainModelPath := getPreTrainModelPath(option.PreTrainModelUrl, option.CkptName)
command, err := generateCommand(repo.Name, grampus.ProcessorTypeNPU, codeObsPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CodePathPrefix+jobName+modelarts.OutputPath, allFileName, preTrainModelPath, option.CkptName)
if err != nil {
log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"])

ctx.JSON(http.StatusOK, models.BaseErrorMessage("Create task failed, internal error"))
return
}

commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName)

req := &grampus.GenerateTrainJobReq{
JobName: jobName,
DisplayJobName: displayJobName,
ComputeResource: models.NPUResource,
ProcessType: grampus.ProcessorTypeNPU,
Command: command,
ImageId: option.ImageID,
Description: description,
CodeObsPath: codeObsPath,
BootFileUrl: codeObsPath + bootFile,
BootFile: bootFile,
WorkServerNumber: option.WorkServerNumber,
Uuid: uuid,
CommitID: commitID,
IsLatestVersion: isLatestVersion,
BranchName: branchName,
Params: option.Params,
EngineName: engineName,
VersionCount: versionCount,
TotalVersionCount: modelarts.TotalVersionCount,
DatasetNames: datasetNames,
DatasetInfos: datasetInfos,
Spec: spec,
CodeName: strings.ToLower(repo.Name),
}
	if option.ModelName != "" { //train with a pre-trained model
req.ModelName = option.ModelName
req.LabelName = option.LabelName
req.CkptName = option.CkptName
req.ModelVersion = option.ModelVersion
req.PreTrainModelUrl = option.PreTrainModelUrl
req.PreTrainModelPath = preTrainModelPath
}

jobId, err := grampus.GenerateTrainJob(ctx, req)
if err != nil {
log.Error("GenerateTrainJob failed:%v", err.Error())

ctx.JSON(http.StatusOK, models.BaseErrorMessage(err.Error()))
return
}
ctx.JSON(http.StatusOK, models.BaseMessage{Code: 0, Message: jobId})
}

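// obsMkdir creates a "directory" in OBS by putting an empty object whose key
// is the directory path (OBS has no real directories, only key prefixes).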
func obsMkdir(dir string) error {
input := &obs.PutObjectInput{}
input.Bucket = setting.Bucket
input.Key = dir
_, err := storage.ObsCli.PutObject(input)
if err != nil {
log.Error("PutObject(%s) failed: %s", input.Key, err.Error())
return err
}

return nil
}
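
// uploadCodeToObs recursively mirrors the local code directory codePath to
// OBS, uploading each file to setting.CodePathPrefix + jobName + "/code/" +
// parentDir + name. Note that the empty marker objects for sub-directories
// are created under parentDir only, without that prefix, as written below.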
func uploadCodeToObs(codePath, jobName, parentDir string) error {
files, err := readDir(codePath)
if err != nil {
log.Error("readDir(%s) failed: %s", codePath, err.Error())
return err
}

for _, file := range files {
if file.IsDir() {
input := &obs.PutObjectInput{}
input.Bucket = setting.Bucket
input.Key = parentDir + file.Name() + "/"
_, err = storage.ObsCli.PutObject(input)
if err != nil {
log.Error("PutObject(%s) failed: %s", input.Key, err.Error())
return err
}

if err = uploadCodeToObs(codePath+file.Name()+"/", jobName, parentDir+file.Name()+"/"); err != nil {
log.Error("uploadCodeToObs(%s) failed: %s", file.Name(), err.Error())
return err
}
} else {
input := &obs.PutFileInput{}
input.Bucket = setting.Bucket
input.Key = setting.CodePathPrefix + jobName + "/code/" + parentDir + file.Name()
input.SourceFile = codePath + file.Name()
_, err = storage.ObsCli.PutFile(input)
if err != nil {
log.Error("PutFile(%s) failed: %s", input.SourceFile, err.Error())
return err
}
}
}

return nil
}

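// grampusParamCheckCreateTrainJob rejects a create-train-job request whose
// boot file is not a Python (.py) file or whose branch name is empty.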
func grampusParamCheckCreateTrainJob(bootFile string, branchName string) error {
	if !strings.HasSuffix(strings.TrimSpace(bootFile), ".py") {
		log.Error("the boot file(%s) must be a python file", bootFile)
		return errors.New("the boot file must be a Python file")
	}

	if branchName == "" {
		log.Error("the branch must not be empty: %s", branchName)
		return errors.New("the code branch must not be empty")
	}

return nil
}
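
// downloadZipCode archives the given branch of the repository on the context
// as a zip under codePath (created if missing); an existing archive is reused.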
func downloadZipCode(ctx *context.Context, codePath, branchName string) error {
archiveType := git.ZIP
archivePath := codePath

if !com.IsDir(archivePath) {
if err := os.MkdirAll(archivePath, os.ModePerm); err != nil {
log.Error("MkdirAll failed:" + err.Error())
return err
}
}

	// Get the corresponding commit from the repository already opened on the context.
	var (
		commit *git.Commit
		err    error
	)

	gitRepo := ctx.Repo.GitRepo

if gitRepo.IsBranchExist(branchName) {
commit, err = gitRepo.GetBranchCommit(branchName)
if err != nil {
log.Error("GetBranchCommit failed:" + err.Error())
return err
}
	} else {
		log.Error("the branch does not exist: " + branchName)
		return fmt.Errorf("the branch does not exist")
	}

archivePath = path.Join(archivePath, grampus.CodeArchiveName)
if !com.IsFile(archivePath) {
if err := commit.CreateArchive(archivePath, git.CreateArchiveOpts{
Format: archiveType,
Prefix: setting.Repository.PrefixArchiveFiles,
}); err != nil {
log.Error("CreateArchive failed:" + err.Error())
return err
}
}

return nil
}

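// uploadCodeToMinio recursively uploads the local code directory codePath to
// MinIO under setting.CBCodePathPrefix + jobName + parentDir; unlike the OBS
// variant it creates no directory marker objects.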
func uploadCodeToMinio(codePath, jobName, parentDir string) error {
files, err := readDir(codePath)
if err != nil {
log.Error("readDir(%s) failed: %s", codePath, err.Error())
return err
}

for _, file := range files {
if file.IsDir() {
if err = uploadCodeToMinio(codePath+file.Name()+"/", jobName, parentDir+file.Name()+"/"); err != nil {
log.Error("uploadCodeToMinio(%s) failed: %s", file.Name(), err.Error())
return err
}
} else {
destObject := setting.CBCodePathPrefix + jobName + parentDir + file.Name()
sourceFile := codePath + file.Name()
err = storage.Attachments.UploadObject(destObject, sourceFile)
if err != nil {
log.Error("UploadObject(%s) failed: %s", file.Name(), err.Error())
return err
}
}
}

return nil
}

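// readDir returns all entries of dirname in directory order (unsorted); an
// io.EOF from Readdir is treated as an empty directory.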
func readDir(dirname string) ([]os.FileInfo, error) {
f, err := os.Open(dirname)
if err != nil {
return nil, err
}

list, err := f.Readdir(0)
f.Close()
if err != nil {
		//todo: can not upload an empty folder (Readdir(0) returns a nil error for an empty dir, so this io.EOF check is defensive)
if err == io.EOF {
return nil, nil
}
return nil, err
}

//sort.Slice(list, func(i, j int) bool { return list[i].Name() < list[j].Name() })
return list, nil
}
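
// mkModelPath creates the local model output directory and, via
// mkPathAndReadMeFile, drops a README into it, presumably so the otherwise
// empty directory survives the upload step (see the todo in readDir).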
func mkModelPath(modelPath string) error {
return mkPathAndReadMeFile(modelPath, "You can put the files into this directory and download the files by the web page.")
}

func mkPathAndReadMeFile(path string, text string) error {
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
log.Error("MkdirAll(%s) failed:%v", path, err)
return err
}

fileName := path + "README"
f, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
if err != nil {
log.Error("OpenFile failed", err.Error())
return err
}

defer f.Close()

_, err = f.WriteString(text)
if err != nil {
log.Error("WriteString failed", err.Error())
return err
}

return nil
}

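// getPreTrainModelPath strips the leading bucket name from a pre-trained
// model location and appends the checkpoint file name. For example
// (illustrative values, not taken from the codebase):
// "grampus/pretrain/model/v1/" with file "model.ckpt" yields
// "pretrain/model/v1/model.ckpt"; an input without a "/" yields "".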
func getPreTrainModelPath(pretrainModelDir string, fileName string) string {
	index := strings.Index(pretrainModelDir, "/")
	if index > 0 {
		//strip the leading bucket name, keeping the object path inside the bucket
		filterBucket := pretrainModelDir[index+1:]
		return filterBucket + fileName
	}
	return ""
}

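// generateCommand assembles the single shell command executed inside the
// Grampus container. For a GPU job it is, roughly (illustrative placeholders,
// not verbatim output):
//
//	pwd;cd <workDir>;<prepare sync-script project>;
//	./downloader_for_minio <env> <codeRemotePath> <archive> '<dataRemotePath>' '<datasetName>';
//	cd <workDir>code;unzip -q master.zip;cd <workDir>dataset;<unzip datasets>;
//	export env=<env> && export remote_path=<outputRemotePath>;
//	cd <workDir>code/<repo>;python <bootFile> --<param>=<value> ...;result=$?;
//	cd <workDir><sync-script project>/;./uploader_for_gpu <env> <outputRemotePath> <workDir>output/;
//	bash -c "[[ $result -eq 0 ]] && exit 0 || exit -1"
//
// NPU jobs skip the download/unzip steps, run the platform's
// run_train_for_openi.sh wrapper instead of invoking python directly, and
// upload results to OBS with uploader_for_npu.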
func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bootFile, paramSrc, outputRemotePath, datasetName, pretrainModelPath, pretrainModelFileName string) (string, error) {
var command string

workDir := grampus.NpuWorkDir
if processorType == grampus.ProcessorTypeGPU {
workDir = grampus.GpuWorkDir
}

command += "pwd;cd " + workDir + fmt.Sprintf(grampus.CommandPrepareScript, setting.Grampus.SyncScriptProject, setting.Grampus.SyncScriptProject)
//download code & dataset
if processorType == grampus.ProcessorTypeNPU {
//no need to download code & dataset by internet
} else if processorType == grampus.ProcessorTypeGPU {
commandDownload := "./downloader_for_minio " + setting.Grampus.Env + " " + codeRemotePath + " " + grampus.CodeArchiveName + " '" + dataRemotePath + "' '" + datasetName + "'"
commandDownload = processPretrainModelParameter(pretrainModelPath, pretrainModelFileName, commandDownload)
command += commandDownload
}

//unzip code & dataset
if processorType == grampus.ProcessorTypeNPU {
//no need to process
} else if processorType == grampus.ProcessorTypeGPU {
unZipDatasetCommand := generateDatasetUnzipCommand(datasetName)
commandUnzip := "cd " + workDir + "code;unzip -q master.zip;echo \"start to unzip dataset\";cd " + workDir + "dataset;" + unZipDatasetCommand
command += commandUnzip
}

command += "echo \"unzip finished;start to exec code;\";"

// set export
var commandExport string
if processorType == grampus.ProcessorTypeNPU {
commandExport = "export bucket=" + setting.Bucket + " && export remote_path=" + outputRemotePath + ";"
} else if processorType == grampus.ProcessorTypeGPU {
commandExport = "export env=" + setting.Grampus.Env + " && export remote_path=" + outputRemotePath + ";"
}

command += commandExport

//exec code
var parameters models.Parameters
var paramCode string

if len(paramSrc) != 0 {
err := json.Unmarshal([]byte(paramSrc), &parameters)
if err != nil {
log.Error("Failed to Unmarshal params: %s (%v)", paramSrc, err)
return command, err
}

for _, parameter := range parameters.Parameter {
paramCode += " --" + parameter.Label + "=" + parameter.Value
}
}

var commandCode string
if processorType == grampus.ProcessorTypeNPU {
commandCode = "/bin/bash /home/work/run_train_for_openi.sh /home/work/openi.py /tmp/log/train.log" + paramCode + ";"
} else if processorType == grampus.ProcessorTypeGPU {
if pretrainModelFileName != "" {
paramCode += " --ckpt_url" + "=" + workDir + "pretrainmodel/" + pretrainModelFileName
}
commandCode = "cd " + workDir + "code/" + strings.ToLower(repoName) + ";python " + bootFile + paramCode + ";"
}

command += commandCode

//get exec result
commandGetRes := "result=$?;"
command += commandGetRes

//upload models
if processorType == grampus.ProcessorTypeNPU {
commandUpload := "cd " + workDir + setting.Grampus.SyncScriptProject + "/;./uploader_for_npu " + setting.Bucket + " " + outputRemotePath + " " + workDir + "output/;"
command += commandUpload
} else if processorType == grampus.ProcessorTypeGPU {
commandUpload := "cd " + workDir + setting.Grampus.SyncScriptProject + "/;./uploader_for_gpu " + setting.Grampus.Env + " " + outputRemotePath + " " + workDir + "output/;"
command += commandUpload
}

//check exec result
commandCheckRes := "bash -c \"[[ $result -eq 0 ]] && exit 0 || exit -1\""
command += commandCheckRes

return command, nil
}

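// processPretrainModelParameter appends the pre-trained model path and file
// name to the GPU download command when a model is configured, and terminates
// the command with ";" either way.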
func processPretrainModelParameter(pretrainModelPath string, pretrainModelFileName string, commandDownload string) string {
commandDownloadTemp := commandDownload
if pretrainModelPath != "" {
commandDownloadTemp += " '" + pretrainModelPath + "' '" + pretrainModelFileName + "'"
}
commandDownloadTemp += ";"
return commandDownloadTemp
}

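// generateDatasetUnzipCommand builds the shell fragment that unpacks the
// selected datasets. A single dataset is extracted in place (tar.gz archives
// with their top-level directory stripped); with multiple datasets, each zip
// is extracted into a directory named after the archive. For example
// (illustrative): "a.zip;b.tar.gz" yields
// "unzip -q 'a.zip' -d './a';tar -zxvf 'b.tar.gz';".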
func generateDatasetUnzipCommand(datasetName string) string {
var unZipDatasetCommand string

datasetNameArray := strings.Split(datasetName, ";")
	if len(datasetNameArray) == 1 { //single dataset
unZipDatasetCommand = "unzip -q '" + datasetName + "';"
if strings.HasSuffix(datasetNameArray[0], ".tar.gz") {
unZipDatasetCommand = "tar --strip-components=1 -zxvf '" + datasetName + "';"
}

	} else { //multiple datasets
for _, datasetNameTemp := range datasetNameArray {
if strings.HasSuffix(datasetNameTemp, ".tar.gz") {
unZipDatasetCommand = unZipDatasetCommand + "tar -zxvf '" + datasetNameTemp + "';"
} else {
unZipDatasetCommand = unZipDatasetCommand + "unzip -q '" + datasetNameTemp + "' -d './" + strings.TrimSuffix(datasetNameTemp, ".zip") + "';"
}
}

}
return unZipDatasetCommand
}

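// getPoolId returns the ID of the first configured ModelArts resource pool.
// Note: the Unmarshal error is ignored, so this assumes setting.ResourcePools
// is valid JSON with at least one entry; otherwise Info[0] would panic.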
func getPoolId() string {
var resourcePools modelarts.ResourcePool
json.Unmarshal([]byte(setting.ResourcePools), &resourcePools)

return resourcePools.Info[0].ID
}
