Browse Source

init

cd-notebook
lewis 2 years ago
parent
commit
cdae759a09
5 changed files with 739 additions and 0 deletions
  1. +227
    -0
      modules/modelarts_cd/modelarts.go
  2. +246
    -0
      modules/modelarts_cd/resty.go
  3. +42
    -0
      modules/modelarts_gateway/core/escape.go
  4. +208
    -0
      modules/modelarts_gateway/core/signer.go
  5. +16
    -0
      modules/setting/setting.go

+ 227
- 0
modules/modelarts_cd/modelarts.go View File

@@ -0,0 +1,227 @@
package modelarts_cd

import (
"encoding/json"
"errors"
"strconv"
"strings"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/context"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
)

const (
//notebook
storageTypeOBS = "obs"
autoStopDuration = 4 * 60 * 60
autoStopDurationMs = 4 * 60 * 60 * 1000
MORDELART_USER_IMAGE_ENGINE_ID = -1
DataSetMountPath = "/home/ma-user/work"
NotebookEnv = "Python3"
NotebookType = "Ascend"
FlavorInfo = "Ascend: 1*Ascend 910 CPU: 24 核 96GiB (modelarts.kat1.xlarge)"

//train-job
CodePath = "/code/"
OutputPath = "/output/"
ResultPath = "/result/"
LogPath = "/log/"
JobPath = "/job/"
OrderDesc = "desc" //向下查询
OrderAsc = "asc" //向上查询
Lines = 500
TrainUrl = "train_url"
DataUrl = "data_url"
MultiDataUrl = "multi_data_url"
ResultUrl = "result_url"
CkptUrl = "ckpt_url"
DeviceTarget = "device_target"
Ascend = "Ascend"
PerPage = 10
IsLatestVersion = "1"
NotLatestVersion = "0"
VersionCountOne = 1

SortByCreateTime = "create_time"
ConfigTypeCustom = "custom"
TotalVersionCount = 1
)

var (
poolInfos *models.PoolInfos
FlavorInfos *models.FlavorInfos
ImageInfos *models.ImageInfosModelArts
TrainFlavorInfos *Flavor
SpecialPools *models.SpecialPools
)

type VersionInfo struct {
Version []struct {
ID int `json:"id"`
Value string `json:"value"`
Url string `json:"url"`
} `json:"version"`
}

type Flavor struct {
Info []struct {
Code string `json:"code"`
Value string `json:"value"`
} `json:"flavor"`
}

type Engine struct {
Info []struct {
ID int `json:"id"`
Value string `json:"value"`
} `json:"engine"`
}

type ResourcePool struct {
Info []struct {
ID string `json:"id"`
Value string `json:"value"`
} `json:"resource_pool"`
}

type Parameters struct {
Parameter []struct {
Label string `json:"label"`
Value string `json:"value"`
} `json:"parameter"`
}

func GenerateNotebook(ctx *context.Context, displayJobName, jobName, uuid, description, flavor, imageId string) error {
if poolInfos == nil {
json.Unmarshal([]byte(setting.PoolInfos), &poolInfos)
}

imageName, err := GetNotebookImageName(imageId)
if err != nil {
log.Error("GetNotebookImageName failed: %v", err.Error())
return err
}
createTime := timeutil.TimeStampNow()
jobResult, err := createNotebook(models.CreateNotebook2Params{
JobName: jobName,
Description: description,
Flavor: flavor,
Duration: autoStopDurationMs,
ImageID: imageId,
PoolID: poolInfos.PoolInfo[0].PoolId,
Feature: models.NotebookFeature,
Volume: models.VolumeReq{
Capacity: setting.Capacity,
Category: models.EVSCategory,
Ownership: models.ManagedOwnership,
},
WorkspaceID: "0",
})
if err != nil {
log.Error("createNotebook2 failed: %v", err.Error())
if strings.HasPrefix(err.Error(), UnknownErrorPrefix) {
log.Info("(%s)unknown error, set temp status", displayJobName)
errTemp := models.InsertCloudbrainTemp(&models.CloudbrainTemp{
JobID: models.TempJobId,
VersionID: models.TempVersionId,
Status: models.TempJobStatus,
Type: models.TypeCloudBrainTwo,
JobName: jobName,
JobType: string(models.JobTypeDebug),
})
if errTemp != nil {
log.Error("InsertCloudbrainTemp failed: %v", errTemp.Error())
return errTemp
}
}
return err
}
task := &models.Cloudbrain{
Status: jobResult.Status,
UserID: ctx.User.ID,
RepoID: ctx.Repo.Repository.ID,
JobID: jobResult.ID,
JobName: jobName,
FlavorCode: flavor,
DisplayJobName: displayJobName,
JobType: string(models.JobTypeDebug),
Type: models.TypeCloudBrainTwo,
Uuid: uuid,
ComputeResource: models.NPUResource,
Image: imageName,
Description: description,
CreatedUnix: createTime,
UpdatedUnix: createTime,
}

err = models.CreateCloudbrain(task)
if err != nil {
return err
}

stringId := strconv.FormatInt(task.ID, 10)
notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, stringId, displayJobName, models.ActionCreateDebugNPUTask)
return nil
}

func GetNotebookImageName(imageId string) (string, error) {
var validImage = false
var imageName = ""

if ImageInfos == nil {
json.Unmarshal([]byte(setting.ImageInfos), &ImageInfos)
}

for _, imageInfo := range ImageInfos.ImageInfo {
if imageInfo.Id == imageId {
validImage = true
imageName = imageInfo.Value
}
}

if !validImage {
log.Error("the image id(%s) is invalid", imageId)
return imageName, errors.New("the image id is invalid")
}

return imageName, nil
}

func HandleNotebookInfo(task *models.Cloudbrain) error {

result, err := GetNotebook(task.JobID)
if err != nil {
log.Error("GetNotebook2(%s) failed:%v", task.DisplayJobName, err)
return err
}

if result != nil {
oldStatus := task.Status
task.Status = result.Status
if task.StartTime == 0 && result.Lease.UpdateTime > 0 {
task.StartTime = timeutil.TimeStamp(result.Lease.UpdateTime / 1000)
}
if task.EndTime == 0 && models.IsModelArtsDebugJobTerminal(task.Status) {
task.EndTime = timeutil.TimeStampNow()
}
task.CorrectCreateUnix()
task.ComputeAndSetDuration()
if oldStatus != task.Status {
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
}
if task.FlavorCode == "" {
task.FlavorCode = result.Flavor
}
err = models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.DisplayJobName, err)
return err
}
}

return nil
}

+ 246
- 0
modules/modelarts_cd/resty.go View File

@@ -0,0 +1,246 @@
package modelarts_cd

import (
"bytes"
"code.gitea.io/gitea/modules/modelarts_gateway/core"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"time"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
)

var (
httpClient *http.Client
HOST string
TOKEN string
)

const (
methodPassword = "password"

urlGetToken = "/v3/auth/tokens"
errorCodeExceedLimit = "ModelArts.0118"

//notebook 2.0
urlNotebook2 = "/notebooks"

//error code
modelartsIllegalToken = "ModelArts.6401"
NotebookNotFound = "ModelArts.6404"
NotebookNoPermission = "ModelArts.6407"
NotebookInvalid = "ModelArts.6400"
UnknownErrorPrefix = "UNKNOWN:"
)

func getHttpClient() *http.Client {
if httpClient == nil {
httpClient = &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
}
}
return httpClient
}

func GetNotebook(jobID string) (*models.GetNotebook2Result, error) {
client := getHttpClient()
var result models.GetNotebook2Result

retry := 0

sendjob:
res, err := client.R().
SetHeader("Content-Type", "application/json").
SetAuthToken(TOKEN).
SetResult(&result).
Get(HOST + "/v1/" + setting.ProjectID + urlNotebook2 + "/" + jobID)

if err != nil {
return nil, fmt.Errorf("resty GetJob: %v", err)
}

if res.StatusCode() == http.StatusUnauthorized && retry < 1 {
retry++
_ = getToken()
goto sendjob
}

var response models.NotebookResult
err = json.Unmarshal(res.Body(), &response)
if err != nil {
log.Error("json.Unmarshal failed: %s", err.Error())
return &result, fmt.Errorf("son.Unmarshal failed: %s", err.Error())
}

if len(response.ErrorCode) != 0 {
log.Error("GetJob failed(%s): %s", response.ErrorCode, response.ErrorMsg)
if response.ErrorCode == modelartsIllegalToken && retry < 1 {
retry++
_ = getToken()
goto sendjob
}
return &result, fmt.Errorf("GetJob failed(%s): %s", response.ErrorCode, response.ErrorMsg)
}

return &result, nil
}

func ManageNotebook(jobID string, param models.NotebookAction) (*models.NotebookActionResult, error) {
client := getHttpClient()
var result models.NotebookActionResult

retry := 0

sendjob:
res, err := client.R().
SetHeader("Content-Type", "application/json").
SetAuthToken(TOKEN).
SetResult(&result).
Post(HOST + "/v1/" + setting.ProjectID + urlNotebook2 + "/" + jobID + "/" + param.Action + "?duration=" + strconv.Itoa(autoStopDurationMs))

if err != nil {
return &result, fmt.Errorf("resty ManageNotebook2: %v", err)
}

if res.StatusCode() == http.StatusUnauthorized && retry < 1 {
retry++
_ = getToken()
goto sendjob
}

var response models.NotebookResult
err = json.Unmarshal(res.Body(), &response)
if err != nil {
log.Error("json.Unmarshal failed: %s", err.Error())
return &result, fmt.Errorf("son.Unmarshal failed: %s", err.Error())
}

if res.StatusCode() == http.StatusBadGateway {
return &result, fmt.Errorf(UnknownErrorPrefix+"createNotebook2 failed(%s): %s", response.ErrorCode, response.ErrorMsg)
}

if len(response.ErrorCode) != 0 {
log.Error("ManageNotebook2 failed(%s): %s", response.ErrorCode, response.ErrorMsg)
if response.ErrorCode == modelartsIllegalToken && retry < 1 {
retry++
_ = getToken()
goto sendjob
}
return &result, fmt.Errorf("ManageNotebook2 failed(%s): %s", response.ErrorCode, response.ErrorMsg)
}

return &result, nil
}

func DelNotebook(jobID string) (*models.NotebookDelResult, error) {
client := getHttpClient()
var result models.NotebookDelResult

retry := 0

sendjob:
res, err := client.R().
SetHeader("Content-Type", "application/json").
SetAuthToken(TOKEN).
SetResult(&result).
Delete(HOST + "/v1/" + setting.ProjectID + urlNotebook2 + "/" + jobID)

if err != nil {
return &result, fmt.Errorf("resty DelJob: %v", err)
}

if res.StatusCode() == http.StatusUnauthorized && retry < 1 {
retry++
_ = getToken()
goto sendjob
}

var response models.NotebookResult
err = json.Unmarshal(res.Body(), &response)
if err != nil {
log.Error("json.Unmarshal failed: %s", err.Error())
return &result, fmt.Errorf("son.Unmarshal failed: %s", err.Error())
}

if len(response.ErrorCode) != 0 {
log.Error("DelNotebook2 failed(%s): %s", response.ErrorCode, response.ErrorMsg)
if response.ErrorCode == modelartsIllegalToken && retry < 1 {
retry++
_ = getToken()
goto sendjob
}
return &result, fmt.Errorf("DelNotebook2 failed(%s): %s", response.ErrorCode, response.ErrorMsg)
}

return &result, nil
}

func createNotebook(createJobParams models.CreateNotebook2Params) (*models.CreateNotebookResult, error) {
client := getHttpClient()
var result models.CreateNotebookResult

retry := 0

s := core.Signer{
Key: "",
Secret: "",
}

r, _ := http.NewRequest(http.MethodPost, "", ioutil.NopCloser(bytes.NewBuffer([]byte(""))))

s.Sign(r)

resp, err := http.DefaultClient.Do(r)
body, err := ioutil.ReadAll(resp.Body)

sendjob:
res, err := client.
SetHeader("Content-Type", "application/json").
SetAuthToken(TOKEN).
SetBody(createJobParams).
SetResult(&result).
Post(HOST + "/v1/" + setting.ProjectID + urlNotebook2)

if err != nil {
return nil, fmt.Errorf("resty create notebook2: %s", err)
}

if res.StatusCode() == http.StatusUnauthorized && retry < 1 {
retry++
_ = getToken()
goto sendjob
}

var response models.NotebookResult
err = json.Unmarshal(res.Body(), &response)
if err != nil {
log.Error("json.Unmarshal failed: %s", err.Error())
return &result, fmt.Errorf("json.Unmarshal failed: %s", err.Error())
}

if res.StatusCode() == http.StatusBadGateway {
return &result, fmt.Errorf(UnknownErrorPrefix+"createNotebook2 failed(%s): %s", response.ErrorCode, response.ErrorMsg)
}

if len(response.ErrorCode) != 0 {
log.Error("createNotebook2 failed(%s): %s", response.ErrorCode, response.ErrorMsg)
if response.ErrorCode == errorCodeExceedLimit {
response.ErrorMsg = "所选规格使用数量已超过最大配额限制。"
}
if response.ErrorCode == modelartsIllegalToken && retry < 1 {
retry++
_ = getToken()
goto sendjob
}
return &result, fmt.Errorf("createNotebook2 failed(%s): %s", response.ErrorCode, response.ErrorMsg)
}

return &result, nil
}

+ 42
- 0
modules/modelarts_gateway/core/escape.go View File

@@ -0,0 +1,42 @@
// based on https://github.com/golang/go/blob/master/src/net/url/url.go
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package core

func shouldEscape(c byte) bool {
if 'A' <= c && c <= 'Z' || 'a' <= c && c <= 'z' || '0' <= c && c <= '9' || c == '_' || c == '-' || c == '~' || c == '.' {
return false
}
return true
}
func escape(s string) string {
hexCount := 0
for i := 0; i < len(s); i++ {
c := s[i]
if shouldEscape(c) {
hexCount++
}
}

if hexCount == 0 {
return s
}

t := make([]byte, len(s)+2*hexCount)
j := 0
for i := 0; i < len(s); i++ {
switch c := s[i]; {
case shouldEscape(c):
t[j] = '%'
t[j+1] = "0123456789ABCDEF"[c>>4]
t[j+2] = "0123456789ABCDEF"[c&15]
j += 3
default:
t[j] = s[i]
j++
}
}
return string(t)
}

+ 208
- 0
modules/modelarts_gateway/core/signer.go View File

@@ -0,0 +1,208 @@
// HWS API Gateway Signature
// based on https://github.com/datastream/aws/blob/master/signv4.go
// Copyright (c) 2014, Xianjie

package core

import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"fmt"
"io/ioutil"
"net/http"
"sort"
"strings"
"time"
)

const (
BasicDateFormat = "20060102T150405Z"
Algorithm = "SDK-HMAC-SHA256"
HeaderXDate = "X-Sdk-Date"
HeaderHost = "host"
HeaderAuthorization = "Authorization"
HeaderContentSha256 = "X-Sdk-Content-Sha256"
)

func hmacsha256(key []byte, data string) ([]byte, error) {
h := hmac.New(sha256.New, []byte(key))
if _, err := h.Write([]byte(data)); err != nil {
return nil, err
}
return h.Sum(nil), nil
}

// Build a CanonicalRequest from a regular request string
//
// CanonicalRequest =
// HTTPRequestMethod + '\n' +
// CanonicalURI + '\n' +
// CanonicalQueryString + '\n' +
// CanonicalHeaders + '\n' +
// SignedHeaders + '\n' +
// HexEncode(Hash(RequestPayload))
func CanonicalRequest(r *http.Request, signedHeaders []string) (string, error) {
var hexencode string
var err error
if hex := r.Header.Get(HeaderContentSha256); hex != "" {
hexencode = hex
} else {
data, err := RequestPayload(r)
if err != nil {
return "", err
}
hexencode, err = HexEncodeSHA256Hash(data)
if err != nil {
return "", err
}
}
return fmt.Sprintf("%s\n%s\n%s\n%s\n%s\n%s", r.Method, CanonicalURI(r), CanonicalQueryString(r), CanonicalHeaders(r, signedHeaders), strings.Join(signedHeaders, ";"), hexencode), err
}

// CanonicalURI returns request uri
func CanonicalURI(r *http.Request) string {
pattens := strings.Split(r.URL.Path, "/")
var uri []string
for _, v := range pattens {
uri = append(uri, escape(v))
}
urlpath := strings.Join(uri, "/")
if len(urlpath) == 0 || urlpath[len(urlpath)-1] != '/' {
urlpath = urlpath + "/"
}
return urlpath
}

// CanonicalQueryString
func CanonicalQueryString(r *http.Request) string {
var keys []string
query := r.URL.Query()
for key := range query {
keys = append(keys, key)
}
sort.Strings(keys)
var a []string
for _, key := range keys {
k := escape(key)
sort.Strings(query[key])
for _, v := range query[key] {
kv := fmt.Sprintf("%s=%s", k, escape(v))
a = append(a, kv)
}
}
queryStr := strings.Join(a, "&")
r.URL.RawQuery = queryStr
return queryStr
}

// CanonicalHeaders
func CanonicalHeaders(r *http.Request, signerHeaders []string) string {
var a []string
header := make(map[string][]string)
for k, v := range r.Header {
header[strings.ToLower(k)] = v
}
for _, key := range signerHeaders {
value := header[key]
if strings.EqualFold(key, HeaderHost) {
value = []string{r.Host}
}
sort.Strings(value)
for _, v := range value {
a = append(a, key+":"+strings.TrimSpace(v))
}
}
return fmt.Sprintf("%s\n", strings.Join(a, "\n"))
}

// SignedHeaders
func SignedHeaders(r *http.Request) []string {
var a []string
for key := range r.Header {
a = append(a, strings.ToLower(key))
}
sort.Strings(a)
return a
}

// RequestPayload
func RequestPayload(r *http.Request) ([]byte, error) {
if r.Body == nil {
return []byte(""), nil
}
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return []byte(""), err
}
r.Body = ioutil.NopCloser(bytes.NewBuffer(b))
return b, err
}

// Create a "String to Sign".
func StringToSign(canonicalRequest string, t time.Time) (string, error) {
hash := sha256.New()
_, err := hash.Write([]byte(canonicalRequest))
if err != nil {
return "", err
}
return fmt.Sprintf("%s\n%s\n%x",
Algorithm, t.UTC().Format(BasicDateFormat), hash.Sum(nil)), nil
}

// Create the HWS Signature.
func SignStringToSign(stringToSign string, signingKey []byte) (string, error) {
hm, err := hmacsha256(signingKey, stringToSign)
return fmt.Sprintf("%x", hm), err
}

// HexEncodeSHA256Hash returns hexcode of sha256
func HexEncodeSHA256Hash(body []byte) (string, error) {
hash := sha256.New()
if body == nil {
body = []byte("")
}
_, err := hash.Write(body)
return fmt.Sprintf("%x", hash.Sum(nil)), err
}

// Get the finalized value for the "Authorization" header. The signature parameter is the output from SignStringToSign
func AuthHeaderValue(signature, accessKey string, signedHeaders []string) string {
return fmt.Sprintf("%s Access=%s, SignedHeaders=%s, Signature=%s", Algorithm, accessKey, strings.Join(signedHeaders, ";"), signature)
}

// Signature HWS meta
type Signer struct {
Key string
Secret string
}

// SignRequest set Authorization header
func (s *Signer) Sign(r *http.Request) error {
var t time.Time
var err error
var dt string
if dt = r.Header.Get(HeaderXDate); dt != "" {
t, err = time.Parse(BasicDateFormat, dt)
}
if err != nil || dt == "" {
t = time.Now()
r.Header.Set(HeaderXDate, t.UTC().Format(BasicDateFormat))
}
signedHeaders := SignedHeaders(r)
canonicalRequest, err := CanonicalRequest(r, signedHeaders)
if err != nil {
return err
}
stringToSign, err := StringToSign(canonicalRequest, t)
if err != nil {
return err
}
signature, err := SignStringToSign(stringToSign, []byte(s.Secret))
if err != nil {
return err
}
authValue := AuthHeaderValue(signature, s.Key, signedHeaders)
r.Header.Set(HeaderAuthorization, authValue)
return nil
}

+ 16
- 0
modules/setting/setting.go View File

@@ -548,6 +548,22 @@ var (
TrainJobFLAVORINFOS string
ModelArtsSpecialPools string

// modelarts-cd config
ModelartsCD = struct {
ModelArtsHost string
IamHost string
ProjectID string
ProjectName string
ModelArtsUsername string
ModelArtsPassword string
ModelArtsDomain string
AllowedOrg string
ProfileID string
PoolInfos string
Flavor string
DebugHost string
}{}

//grampus config
Grampus = struct {
Env string


Loading…
Cancel
Save