package models

import (
	"fmt"
	"strings"

	"code.gitea.io/gitea/modules/timeutil"

	"xorm.io/builder"
)

// Status values stored in ResourceSpecification.Status.
const (
	// SpecNotVerified (1) marks a spec that has not been put on shelf yet.
	SpecNotVerified int = iota + 1
	// SpecOnShelf (2) marks a spec that is currently on shelf.
	SpecOnShelf
	// SpecOffShelf (3) marks a spec that has been taken off shelf.
	SpecOffShelf
)

// SearchSpecOrderBy selects the ordering used by SearchResourceSpecification.
type SearchSpecOrderBy int

const (
	// SearchSpecOrderById orders by resource_specification.id descending.
	SearchSpecOrderById SearchSpecOrderBy = iota
	// SearchSpecOrder4Standard orders by compute resource, card type, card
	// count, CPU cores, memory and shared memory, all ascending.
	SearchSpecOrder4Standard
)

// ResourceSpecification is the xorm model of one resource specification row.
type ResourceSpecification struct {
	ID              int64  `xorm:"pk autoincr"`
	QueueId         int64  `xorm:"INDEX"`
	SourceSpecId    string `xorm:"INDEX"`
	AccCardsNum     int
	CpuCores        int
	MemGiB          float32
	GPUMemGiB       float32
	ShareMemGiB     float32
	UnitPrice       int
	Status          int
	IsAvailable     bool
	IsAutomaticSync bool
	CreatedTime     timeutil.TimeStamp `xorm:"created"`
	CreatedBy       int64
	UpdatedTime     timeutil.TimeStamp `xorm:"updated"`
	UpdatedBy       int64
}

// ConvertToRes copies the fields exposed to API consumers into a new
// ResourceSpecificationRes.
func (r ResourceSpecification) ConvertToRes() *ResourceSpecificationRes {
	return &ResourceSpecificationRes{
		ID:           r.ID,
		SourceSpecId: r.SourceSpecId,
		AccCardsNum:  r.AccCardsNum,
		CpuCores:     r.CpuCores,
		MemGiB:       r.MemGiB,
		ShareMemGiB:  r.ShareMemGiB,
		GPUMemGiB:    r.GPUMemGiB,
		UnitPrice:    r.UnitPrice,
		Status:       r.Status,
		IsAvailable:  r.IsAvailable,
		UpdatedTime:  r.UpdatedTime,
	}
}

// ResourceSpecificationReq is the request payload used to create or update a
// resource specification.
type ResourceSpecificationReq struct {
	QueueId         int64 `binding:"Required"`
	SourceSpecId    string
	AccCardsNum     int
	CpuCores        int
	MemGiB          float32
	GPUMemGiB       float32
	ShareMemGiB     float32
	UnitPrice       int
	Status          int
	IsAutomaticSync bool
	CreatorId       int64
}

// ToDTO builds a ResourceSpecification from the request. CreatorId is used
// for both CreatedBy and UpdatedBy, and IsAvailable is always set to true.
func (r ResourceSpecificationReq) ToDTO() ResourceSpecification {
	return ResourceSpecification{
		QueueId:         r.QueueId,
		SourceSpecId:    r.SourceSpecId,
		AccCardsNum:     r.AccCardsNum,
		CpuCores:        r.CpuCores,
		MemGiB:          r.MemGiB,
		GPUMemGiB:       r.GPUMemGiB,
		ShareMemGiB:     r.ShareMemGiB,
		UnitPrice:       r.UnitPrice,
		Status:          r.Status,
		IsAutomaticSync: r.IsAutomaticSync,
		CreatedBy:       r.CreatorId,
		UpdatedBy:       r.CreatorId,
		IsAvailable:     true,
	}
}

// SearchResourceSpecificationOptions holds the filters, paging and ordering
// accepted by SearchResourceSpecification.
type SearchResourceSpecificationOptions struct {
	ListOptions
	QueueId int64
	Status  int
	Cluster string
	// AvailableCode filters on is_available: 1 keeps available specs only,
	// 2 keeps unavailable specs only, any other value applies no filter.
	AvailableCode int
	OrderBy       SearchSpecOrderBy
}

// SearchResourceBriefSpecificationOptions holds the filters for a brief
// specification search.
type SearchResourceBriefSpecificationOptions struct {
	QueueId int64
	Cluster string
}

// ResourceSpecAndQueueListRes is a page of spec/queue pairs together with the
// total number of matching rows.
type ResourceSpecAndQueueListRes struct {
	TotalSize int64
	List      []*ResourceSpecAndQueueRes
}

// NewResourceSpecAndQueueListRes converts every element of list with
// ConvertToRes and wraps the result together with totalSize.
func NewResourceSpecAndQueueListRes(totalSize int64, list []ResourceSpecAndQueue) *ResourceSpecAndQueueListRes {
	resList := make([]*ResourceSpecAndQueueRes, len(list))
	for i, v := range list {
		resList[i] = v.ConvertToRes()
	}
	return &ResourceSpecAndQueueListRes{
		TotalSize: totalSize,
		List:      resList,
	}
}

// ResourceSpecificationRes is the response view of a resource specification.
type ResourceSpecificationRes struct {
	ID           int64
	SourceSpecId string
	AccCardsNum  int
	CpuCores     int
	MemGiB       float32
	GPUMemGiB    float32
	ShareMemGiB  float32
	UnitPrice    int
	Status       int
	IsAvailable  bool
	UpdatedTime  timeutil.TimeStamp
}

// TableName maps ResourceSpecificationRes onto the resource_specification table.
func (ResourceSpecificationRes) TableName() string {
	return "resource_specification"
}

// ResourceSpecAndQueueRes pairs the response views of a spec and its queue.
type ResourceSpecAndQueueRes struct {
	Spec  *ResourceSpecificationRes
	Queue *ResourceQueueRes
}

// ResourceSpecAndQueue is the joined row of a spec and its queue.
type ResourceSpecAndQueue struct {
	ResourceSpecification `xorm:"extends"`
	ResourceQueue         `xorm:"extends"`
}

// TableName maps ResourceSpecAndQueue onto the resource_specification table.
func (*ResourceSpecAndQueue) TableName() string {
	return "resource_specification"
}

// ConvertToRes converts both embedded models to their response views.
func (r ResourceSpecAndQueue) ConvertToRes() *ResourceSpecAndQueueRes {
	return &ResourceSpecAndQueueRes{
		Spec:  r.ResourceSpecification.ConvertToRes(),
		Queue: r.ResourceQueue.ConvertToRes(),
	}
}

// FindSpecsOptions holds the filters accepted by FindSpecs. String filters
// are applied when non-empty, SpecId and SpecStatus when greater than zero,
// and each numeric resource filter only when its UseXxx flag is set.
type FindSpecsOptions struct {
	JobType         JobType
	ComputeResource string
	Cluster         string
	AiCenterCode    string
	SpecId          int64
	QueueCode       string
	SourceSpecId    string
	AccCardsNum     int
	UseAccCardsNum  bool
	AccCardType     string
	CpuCores        int
	UseCpuCores     bool
	MemGiB          float32
	UseMemGiB       bool
	GPUMemGiB       float32
	UseGPUMemGiB    bool
	ShareMemGiB     float32
	UseShareMemGiB  bool
	// RequestAll: if true, find specs no matter whether they are used in a
	// scene or not; if false, only find specs that are used in a scene.
	RequestAll bool
	SpecStatus int
}

// Specification is a flattened view of a spec joined with its queue.
type Specification struct {
	ID              int64
	SourceSpecId    string
	AccCardsNum     int
	AccCardType     string
	CpuCores        int
	MemGiB          float32
	GPUMemGiB       float32
	ShareMemGiB     float32
	ComputeResource string
	UnitPrice       int
	QueueId         int64
	QueueCode       string
	Cluster         string
	AiCenterCode    string
	AiCenterName    string
	IsExclusive     bool
	ExclusiveOrg    string
	// RelatedSpecs holds the specs that have the same sourceSpecId,
	// computeResource and cluster as the current spec.
	RelatedSpecs []*Specification
}

// TableName maps Specification onto the resource_specification table.
func (Specification) TableName() string {
	return "resource_specification"
}

// loadRelatedSpecs fills s.RelatedSpecs once. It stores an empty, non-nil
// slice when SourceSpecId is empty or the lookup fails, so the lookup is not
// attempted again on the same value.
func (s *Specification) loadRelatedSpecs() {
	if s.RelatedSpecs != nil {
		return
	}
	defaultSpecs := make([]*Specification, 0)
	if s.SourceSpecId == "" {
		s.RelatedSpecs = defaultSpecs
		return
	}
	r, err := FindSpecs(FindSpecsOptions{
		ComputeResource: s.ComputeResource,
		Cluster:         s.Cluster,
		SourceSpecId:    s.SourceSpecId,
		RequestAll:      true,
		SpecStatus:      SpecOnShelf,
	})
	if err != nil {
		// Best effort: a failed lookup is treated as "no related specs".
		s.RelatedSpecs = defaultSpecs
		return
	}
	s.RelatedSpecs = r
}

// GetAvailableCenterIds returns the AiCenterCode of every related spec. When
// at least one user id is passed, the first one is used to filter exclusive
// specs through FilterExclusiveSpecs. The result is never nil.
func (s *Specification) GetAvailableCenterIds(userIds ...int64) []string {
	s.loadRelatedSpecs()

	if len(s.RelatedSpecs) == 0 {
		return make([]string, 0)
	}

	var uId int64
	if len(userIds) > 0 {
		uId = userIds[0]
	}
	// filter exclusive specs
	specs := FilterExclusiveSpecs(s.RelatedSpecs, uId)

	centerIds := make([]string, len(specs))
	for i, v := range specs {
		centerIds[i] = v.AiCenterCode
	}
	return centerIds
}

// FilterExclusiveSpecs returns the specs in r that userId may use: every
// non-exclusive spec, plus each exclusive spec for which the user is a member
// of at least one organization listed (";"-separated) in ExclusiveOrg.
// Specs are de-duplicated by ID. When userId is 0, r is returned unchanged.
func FilterExclusiveSpecs(r []*Specification, userId int64) []*Specification {
	if userId == 0 {
		return r
	}
	specs := make([]*Specification, 0, len(r))
	seen := make(map[int64]struct{}, len(r))
	for _, spec := range r {
		if _, has := seen[spec.ID]; has {
			continue
		}
		if !spec.IsExclusive {
			specs = append(specs, spec)
			seen[spec.ID] = struct{}{}
			continue
		}
		for _, org := range strings.Split(spec.ExclusiveOrg, ";") {
			// The error is deliberately ignored: a failed membership
			// check is treated the same as "not a member".
			isMember, _ := IsOrganizationMemberByOrgName(org, userId)
			if isMember {
				specs = append(specs, spec)
				seen[spec.ID] = struct{}{}
				break
			}
		}
	}
	return specs
}

// DistinctSpecs keeps the first spec for each non-empty SourceSpecId and
// every spec whose SourceSpecId is empty, preserving the input order.
func DistinctSpecs(r []*Specification) []*Specification {
	specs := make([]*Specification, 0, len(r))
	seen := make(map[string]struct{}, len(r))
	for _, spec := range r {
		if spec.SourceSpecId == "" {
			specs = append(specs, spec)
			continue
		}
		if _, has := seen[spec.SourceSpecId]; has {
			continue
		}
		specs = append(specs, spec)
		seen[spec.SourceSpecId] = struct{}{}
	}
	return specs
}

// InsertResourceSpecification inserts r and returns the value reported by
// the insert call together with any error.
func InsertResourceSpecification(r ResourceSpecification) (int64, error) {
	return x.Insert(&r)
}

// UpdateResourceSpecificationById updates the row selected by x.ID(queueId)
// with spec.
//
// NOTE(review): despite its name, queueId is passed to x.ID and therefore
// selects the row by primary key, not by queue id. The parameter name is kept
// for compatibility.
func UpdateResourceSpecificationById(queueId int64, spec ResourceSpecification) (int64, error) {
	return x.ID(queueId).Update(&spec)
}

// UpdateSpecUnitPriceById sets unit_price and refreshes updated_time for the
// spec with the given id.
func UpdateSpecUnitPriceById(id int64, unitPrice int) error {
	_, err := x.Exec("update resource_specification set unit_price = ? ,updated_time = ? where id = ?", unitPrice, timeutil.TimeStampNow(), id)
	return err
}

// SearchResourceSpecification returns the total number of spec/queue rows
// matching opts and the requested page of them. A Page value <= 0 is treated
// as page 1.
func SearchResourceSpecification(opts SearchResourceSpecificationOptions) (int64, []ResourceSpecAndQueue, error) {
	cond := builder.NewCond()
	if opts.Page <= 0 {
		opts.Page = 1
	}
	if opts.QueueId > 0 {
		cond = cond.And(builder.Eq{"resource_specification.queue_id": opts.QueueId})
	}
	if opts.Status > 0 {
		cond = cond.And(builder.Eq{"resource_specification.status": opts.Status})
	}
	if opts.Cluster != "" {
		cond = cond.And(builder.Eq{"resource_queue.cluster": opts.Cluster})
	}
	switch opts.AvailableCode {
	case 1:
		cond = cond.And(builder.Eq{"resource_specification.is_available": true})
	case 2:
		cond = cond.And(builder.Eq{"resource_specification.is_available": false})
	}
	//cond = cond.And(builder.Or(builder.Eq{"resource_queue.deleted_time": 0}).Or(builder.IsNull{"resource_queue.deleted_time"}))
	n, err := x.Where(cond).Join("INNER", "resource_queue", "resource_queue.ID = resource_specification.queue_id").
		Unscoped().Count(&ResourceSpecAndQueue{})
	if err != nil {
		return 0, nil, err
	}

	var orderby string
	switch opts.OrderBy {
	case SearchSpecOrder4Standard:
		orderby = "resource_queue.compute_resource asc,resource_queue.acc_card_type asc,resource_specification.acc_cards_num asc,resource_specification.cpu_cores asc,resource_specification.mem_gi_b asc,resource_specification.share_mem_gi_b asc"
	default:
		orderby = "resource_specification.id desc"
	}

	r := make([]ResourceSpecAndQueue, 0)
	err = x.Where(cond).
		Join("INNER", "resource_queue", "resource_queue.ID = resource_specification.queue_id").
		OrderBy(orderby).
		Limit(opts.PageSize, (opts.Page-1)*opts.PageSize).
		Unscoped().Find(&r)
	if err != nil {
		return 0, nil, err
	}
	return n, r, nil
}

// GetSpecScenes returns the scenes linked to specId through the
// resource_scene_spec table.
func GetSpecScenes(specId int64) ([]ResourceSceneBriefRes, error) {
	r := make([]ResourceSceneBriefRes, 0)
	err := x.Where("resource_scene_spec.spec_id = ?", specId).
		Join("INNER", "resource_scene_spec", "resource_scene_spec.scene_id = resource_scene.id").
		Find(&r)
	if err != nil {
		return nil, err
	}
	return r, nil
}

// ResourceSpecOnShelf sets the unit price, refreshes updated_time and marks
// the spec with the given id as SpecOnShelf.
func ResourceSpecOnShelf(id int64, unitPrice int) error {
	_, err := x.Exec("update resource_specification set unit_price = ?,updated_time = ?,status = ? where id = ?", unitPrice, timeutil.TimeStampNow(), SpecOnShelf, id)
	return err
}

// ResourceSpecOffShelf marks the spec with the given id as SpecOffShelf, but
// only if it is currently SpecOnShelf. It returns the number of rows reported
// by the update.
func ResourceSpecOffShelf(id int64) (int64, error) {
	sess := x.NewSession()
	defer sess.Close()
	// Begin explicitly: without it Commit/Rollback have no transaction to act on.
	if err := sess.Begin(); err != nil {
		return 0, err
	}

	param := ResourceSpecification{
		Status: SpecOffShelf,
	}
	n, err := sess.Where("id = ? and status = ?", id, SpecOnShelf).Update(&param)
	if err != nil {
		sess.Rollback()
		return 0, err
	}
	if err = sess.Commit(); err != nil {
		return 0, err
	}
	return n, nil
}

// GetResourceSpecificationByIds returns the specs (joined with their queue)
// whose id is contained in ids.
func GetResourceSpecificationByIds(ids []int64) ([]*Specification, error) {
	r := make([]*Specification, 0)
	err := x.In("resource_specification.id", ids).
		Join("INNER", "resource_queue", "resource_queue.id = resource_specification.queue_id").
		Find(&r)
	return r, err
}

// GetResourceSpecification loads the row matching the non-zero fields of r
// into r. It returns (nil, nil) when no row matches.
func GetResourceSpecification(r *ResourceSpecification) (*ResourceSpecification, error) {
	has, err := x.Get(r)
	if err != nil {
		return nil, err
	}
	if !has {
		return nil, nil
	}
	return r, nil
}

// SyncGrampusSpecs synchronizes the C2Net specs inside one transaction:
//  1. C2Net specs whose id is not in existIds are set to SpecOffShelf and
//     marked unavailable;
//  2. every spec in updateList is updated by its ID;
//  3. every spec in insertList is inserted.
//
// Any failure rolls the whole transaction back.
func SyncGrampusSpecs(updateList []ResourceSpecification, insertList []ResourceSpecification, existIds []int64) error {
	sess := x.NewSession()
	defer sess.Close()
	// Begin explicitly: without it every statement is committed on its own
	// and the rollback below would have nothing to undo.
	if err := sess.Begin(); err != nil {
		return err
	}

	var err error
	defer func() {
		if err != nil {
			sess.Rollback()
		}
	}()

	// delete specs and scene that no longer exists
	deleteIds := make([]int64, 0)
	cond := builder.NewCond()
	cond = cond.And(builder.NotIn("resource_specification.id", existIds)).And(builder.Eq{"resource_queue.cluster": C2NetCluster})
	// Assign to the outer err (no :=) so the deferred rollback sees failures.
	if err = sess.Cols("resource_specification.id").Table("resource_specification").
		Where(cond).Join("INNER", "resource_queue", "resource_queue.id = resource_specification.queue_id").
		Find(&deleteIds); err != nil {
		return err
	}
	if len(deleteIds) > 0 {
		if _, err = sess.Cols("status", "is_available").In("id", deleteIds).Update(&ResourceSpecification{Status: SpecOffShelf, IsAvailable: false}); err != nil {
			return err
		}
	}

	// update exists specs
	for _, v := range updateList {
		if _, err = sess.ID(v.ID).UseBool("is_available").Update(&v); err != nil {
			return err
		}
	}

	// insert new specs
	if len(insertList) > 0 {
		if _, err = sess.Insert(insertList); err != nil {
			return err
		}
	}

	return sess.Commit()
}

// FindSpecs returns the specs (joined with their queue) matching opts,
// ordered by compute resource, card type, card count, CPU cores, memory and
// shared memory. Unless opts.RequestAll is set, only specs linked to a scene
// are returned and opts.JobType (when non-empty) filters on the scene's job
// type.
func FindSpecs(opts FindSpecsOptions) ([]*Specification, error) {
	cond := builder.NewCond()
	if !opts.RequestAll && opts.JobType != "" {
		cond = cond.And(builder.Eq{"resource_scene.job_type": opts.JobType})
	}
	if opts.ComputeResource != "" {
		cond = cond.And(builder.Eq{"resource_queue.compute_resource": opts.ComputeResource})
	}
	if opts.Cluster != "" {
		cond = cond.And(builder.Eq{"resource_queue.cluster": opts.Cluster})
	}
	if opts.AiCenterCode != "" {
		cond = cond.And(builder.Eq{"resource_queue.ai_center_code": opts.AiCenterCode})
	}
	if opts.SpecId > 0 {
		cond = cond.And(builder.Eq{"resource_specification.id": opts.SpecId})
	}
	if opts.QueueCode != "" {
		cond = cond.And(builder.Eq{"resource_queue.queue_code": opts.QueueCode})
	}
	if opts.SourceSpecId != "" {
		cond = cond.And(builder.Eq{"resource_specification.source_spec_id": opts.SourceSpecId})
	}
	if opts.UseAccCardsNum {
		cond = cond.And(builder.Eq{"resource_specification.acc_cards_num": opts.AccCardsNum})
	}
	if opts.AccCardType != "" {
		cond = cond.And(builder.Eq{"resource_queue.acc_card_type": opts.AccCardType})
	}
	if opts.UseCpuCores {
		cond = cond.And(builder.Eq{"resource_specification.cpu_cores": opts.CpuCores})
	}
	if opts.UseMemGiB {
		cond = cond.And(builder.Eq{"resource_specification.mem_gi_b": opts.MemGiB})
	}
	if opts.UseGPUMemGiB {
		cond = cond.And(builder.Eq{"resource_specification.gpu_mem_gi_b": opts.GPUMemGiB})
	}
	if opts.UseShareMemGiB {
		cond = cond.And(builder.Eq{"resource_specification.share_mem_gi_b": opts.ShareMemGiB})
	}
	if opts.SpecStatus > 0 {
		cond = cond.And(builder.Eq{"resource_specification.status": opts.SpecStatus})
	}

	r := make([]*Specification, 0)
	s := x.Where(cond).
		Join("INNER", "resource_queue", "resource_queue.id = resource_specification.queue_id")

	if !opts.RequestAll {
		s = s.Join("INNER", "resource_scene_spec", "resource_scene_spec.spec_id = resource_specification.id").
			Join("INNER", "resource_scene", "resource_scene_spec.scene_id = resource_scene.id")
	}
	err := s.OrderBy("resource_queue.compute_resource asc,resource_queue.acc_card_type asc,resource_specification.acc_cards_num asc,resource_specification.cpu_cores asc,resource_specification.mem_gi_b asc,resource_specification.share_mem_gi_b asc").
		Unscoped().Find(&r)
	if err != nil {
		return nil, err
	}
	return r, nil
}

// InitQueueAndSpec inserts spec inside one transaction, attached to the queue
// matching queue's code, cluster, AI center, compute resource and card type.
// If no such queue exists, queue itself is inserted first. It returns the
// Specification built from the resulting queue and spec.
func InitQueueAndSpec(queue ResourceQueue, spec ResourceSpecification) (*Specification, error) {
	sess := x.NewSession()
	defer sess.Close()

	if err := sess.Begin(); err != nil {
		return nil, err
	}
	param := ResourceQueue{
		QueueCode:       queue.QueueCode,
		Cluster:         queue.Cluster,
		AiCenterCode:    queue.AiCenterCode,
		ComputeResource: queue.ComputeResource,
		AccCardType:     queue.AccCardType,
	}
	if _, err := sess.Get(&param); err != nil {
		sess.Rollback()
		return nil, err
	}
	if param.ID == 0 {
		// No matching queue yet: create it.
		if _, err := sess.InsertOne(&queue); err != nil {
			sess.Rollback()
			return nil, err
		}
	} else {
		queue = param
	}

	spec.QueueId = queue.ID
	if _, err := sess.InsertOne(&spec); err != nil {
		sess.Rollback()
		return nil, err
	}
	// A failed commit must not be reported as success.
	if err := sess.Commit(); err != nil {
		return nil, err
	}
	return BuildSpecification(queue, spec), nil
}

// BuildSpecification flattens queue and spec into a Specification.
// IsExclusive, ExclusiveOrg and RelatedSpecs are left at their zero values.
func BuildSpecification(queue ResourceQueue, spec ResourceSpecification) *Specification {
	return &Specification{
		ID:              spec.ID,
		SourceSpecId:    spec.SourceSpecId,
		AccCardsNum:     spec.AccCardsNum,
		AccCardType:     queue.AccCardType,
		CpuCores:        spec.CpuCores,
		MemGiB:          spec.MemGiB,
		GPUMemGiB:       spec.GPUMemGiB,
		ShareMemGiB:     spec.ShareMemGiB,
		ComputeResource: queue.ComputeResource,
		UnitPrice:       spec.UnitPrice,
		QueueId:         queue.ID,
		QueueCode:       queue.QueueCode,
		Cluster:         queue.Cluster,
		AiCenterCode:    queue.AiCenterCode,
		AiCenterName:    queue.AiCenterName,
	}
}

// GetCloudbrainOneAccCardType maps a known queue code to its accelerator card
// type and returns "" for any other code.
func GetCloudbrainOneAccCardType(queueCode string) string {
	switch queueCode {
	case "a100":
		return "A100"
	case "openidebug":
		return "T4"
	case "openidgx":
		return "V100"
	}
	return ""
}

// Lazily initialised cache used by GetCloudbrainTwoSpecs.
var cloudbrainTwoSpecsInitFlag = false
var cloudbrainTwoSpecs map[string]*Specification

// GetCloudbrainTwoSpecs returns the cached result of InitCloudbrainTwoSpecs,
// computing it on the first successful call. A failed initialisation is not
// cached and is retried on the next call.
//
// NOTE(review): the flag and the map are read and written without any
// synchronization — confirm whether callers can run concurrently.
func GetCloudbrainTwoSpecs() (map[string]*Specification, error) {
	if !cloudbrainTwoSpecsInitFlag {
		r, err := InitCloudbrainTwoSpecs()
		if err != nil {
			return nil, err
		}
		cloudbrainTwoSpecsInitFlag = true
		cloudbrainTwoSpecs = r
	}
	return cloudbrainTwoSpecs, nil
}

// InitCloudbrainTwoSpecs makes sure the "openisupport" queue and its 1, 2, 4
// and 8 card specs exist, inserting whichever is missing, and returns the
// specs keyed by SourceSpecId.
func InitCloudbrainTwoSpecs() (map[string]*Specification, error) {
	r := make(map[string]*Specification, 0)

	queue, err := GetResourceQueue(&ResourceQueue{QueueCode: "openisupport"})
	if err != nil {
		return nil, err
	}
	if queue == nil {
		queue = &ResourceQueue{
			QueueCode:       "openisupport",
			Cluster:         OpenICluster,
			AiCenterCode:    AICenterOfCloudBrainTwo,
			AiCenterName:    "云脑二",
			ComputeResource: NPU,
			AccCardType:     "ASCEND910",
			Remark:          "处理历史云脑任务时自动生成",
		}
		_, err = x.InsertOne(queue)
		if err != nil {
			return nil, err
		}
	}
	// i takes the values 1, 2, 4 and 8.
	for i := 1; i <= 8; i = i * 2 {
		sourceSpecId := "modelarts.bm.910.arm.public." + fmt.Sprint(i)
		spec, err := GetResourceSpecification(&ResourceSpecification{
			SourceSpecId: sourceSpecId,
			QueueId:      queue.ID,
		})
		if err != nil {
			return nil, err
		}
		if spec == nil {
			spec = &ResourceSpecification{
				QueueId:      queue.ID,
				SourceSpecId: sourceSpecId,
				AccCardsNum:  i,
				CpuCores:     i * 24,
				MemGiB:       float32(i * 256),
				GPUMemGiB:    float32(32),
				Status:       SpecOffShelf,
				IsAvailable:  true,
			}
			_, err = x.Insert(spec)
			if err != nil {
				return nil, err
			}
		}
		r[sourceSpecId] = BuildSpecification(*queue, *spec)
	}
	return r, nil
}

// Lazily initialised cache used by GetGrampusSpecs.
var grampusSpecsInitFlag = false
var grampusSpecs map[string]*Specification

// GetGrampusSpecs returns a cached map of all C2Net specs, built on the first
// successful call. Each spec is stored under two keys: its SourceSpecId and
// SourceSpecId + "_" + AiCenterCode; a later spec overwrites an earlier one
// that shares a key.
//
// NOTE(review): the flag and the map are read and written without any
// synchronization — confirm whether callers can run concurrently.
func GetGrampusSpecs() (map[string]*Specification, error) {
	if !grampusSpecsInitFlag {
		specMap := make(map[string]*Specification, 0)
		r, err := FindSpecs(FindSpecsOptions{
			Cluster:    C2NetCluster,
			RequestAll: true,
		})
		if err != nil {
			return nil, err
		}
		for _, spec := range r {
			specMap[spec.SourceSpecId] = spec
			specMap[spec.SourceSpecId+"_"+spec.AiCenterCode] = spec
		}
		grampusSpecsInitFlag = true
		grampusSpecs = specMap
	}
	return grampusSpecs, nil
}