From 7a3cc57f9f76d1c620a3816872043b32b7e2a6f7 Mon Sep 17 00:00:00 2001 From: chenyifan01 Date: Fri, 24 Jun 2022 14:54:46 +0800 Subject: [PATCH] #2225 update --- models/action.go | 10 ++ models/attachment.go | 6 +- models/error.go | 12 +++ models/limit_config.go | 8 ++ models/models.go | 1 + models/repo_watch.go | 5 +- models/reward_admin_log.go | 46 ++++++++ models/reward_operate_record.go | 84 ++++++++++++++- models/reward_periodic_task.go | 11 +- models/task_accomplish_log.go | 11 +- models/task_config.go | 30 ------ modules/auth/wechat/event_handle.go | 7 +- modules/cloudbrain/resty.go | 8 +- modules/cron/tasks_basic.go | 2 +- modules/notification/action/action.go | 103 ++++++++++++++++++ modules/notification/base/notifier.go | 7 +- modules/notification/base/null.go | 8 +- modules/notification/notification.go | 15 ++- modules/notification/task/task.go | 157 ---------------------------- modules/redis/redis_client/client.go | 4 +- modules/redis/redis_key/reward_redis_key.go | 5 +- modules/redis/redis_key/serial_redis_key.go | 10 ++ modules/setting/setting.go | 10 +- routers/repo/cloudbrain.go | 12 +-- routers/repo/modelarts.go | 8 +- routers/reward/point/point.go | 46 ++++++++ routers/routes/routes.go | 4 + routers/task/task.go | 15 +++ routers/user/setting/profile.go | 2 +- services/reward/admin_operate.go | 50 +++++++++ services/reward/cloubrain_deduct.go | 17 +-- services/reward/limiter/limiter.go | 39 ++----- services/reward/operator.go | 54 ++++++++-- services/reward/period_task.go | 8 +- services/reward/point/point_operate.go | 2 +- services/reward/record.go | 20 ++++ services/reward/serial.go | 21 ++++ services/task/task.go | 31 ++++-- 38 files changed, 591 insertions(+), 298 deletions(-) create mode 100644 models/reward_admin_log.go delete mode 100644 modules/notification/task/task.go create mode 100644 modules/redis/redis_key/serial_redis_key.go create mode 100644 routers/task/task.go create mode 100644 services/reward/admin_operate.go create mode 100644 services/reward/record.go create mode 100644 services/reward/serial.go diff --git a/models/action.go b/models/action.go index 9b92b4192..456d5c6bc 100755 --- a/models/action.go +++ b/models/action.go @@ -58,6 +58,16 @@ const ( ActionCreateBenchMarkTask //29 ActionCreateNewModelTask //30 ActionCreateGPUTrainTask //31 + + ActionBindWechat //32issue_assignees + ActionCreateCloudbrainTask //33 + ActionDatasetRecommended //34 + ActionCreateImage //35 + ActionImageRecommend //36 + ActionChangeUserAvatar //37 + ActionPushCommits //38 + ActionForkRepo //39 + ) // Action represents user operation type and other information to diff --git a/models/attachment.go b/models/attachment.go index 0e4751ed2..453c819b1 100755 --- a/models/attachment.go +++ b/models/attachment.go @@ -654,9 +654,9 @@ func Attachments(opts *AttachmentsOptions) ([]*AttachmentInfo, int64, error) { return attachments, count, nil } -func GetAllUserIdByDatasetId(datasetId int64) ([]int64, error) { - r := make([]int64, 0) - if err := x.Table("attachment").Where("dataset_id = ?", datasetId).Distinct("uploader_id").Find(&r); err != nil { +func GetAllDatasetContributorByDatasetId(datasetId int64) ([]*User, error) { + r := make([]*User, 0) + if err := x.Select("distinct(user.*)").Table("attachment").Join("LEFT", "user", "user.ID = attachment.uploader_id").Where("attachment.dataset_id = ?", datasetId).Find(&r); err != nil { return nil, err } return r, nil diff --git a/models/error.go b/models/error.go index 19afa9d8b..7c7b0418b 100755 --- a/models/error.go +++ b/models/error.go @@ -2024,3 +2024,15 @@ func IsErrRecordNotExist(err error) bool { func (err ErrRecordNotExist) Error() string { return fmt.Sprintf("record not exist in database") } + +type ErrInsufficientPointsBalance struct { +} + +func IsErrInsufficientPointsBalance(err error) bool { + _, ok := err.(ErrInsufficientPointsBalance) + return ok +} + +func (err ErrInsufficientPointsBalance) Error() string { + return fmt.Sprintf("Insufficient points balance") +} diff --git a/models/limit_config.go b/models/limit_config.go index 62ff3bfbe..ce8d2cfc2 100644 --- a/models/limit_config.go +++ b/models/limit_config.go @@ -41,6 +41,14 @@ func (l LimitScope) Name() string { } } +type LimiterRejectPolicy string + +const ( + JustReject LimiterRejectPolicy = "JUST_REJECT" + PermittedOnce LimiterRejectPolicy = "PERMITTED_ONCE" + FillUp LimiterRejectPolicy = "FillUp" +) + type LimitConfig struct { ID int64 `xorm:"pk autoincr"` Tittle string diff --git a/models/models.go b/models/models.go index c6c0d6610..731b31960 100755 --- a/models/models.go +++ b/models/models.go @@ -151,6 +151,7 @@ func init() { new(RewardPeriodicTask), new(PointAccountLog), new(PointAccount), + new(RewardAdminLog), ) tablesStatistic = append(tablesStatistic, diff --git a/models/repo_watch.go b/models/repo_watch.go index 2d01bde1f..864aec254 100644 --- a/models/repo_watch.go +++ b/models/repo_watch.go @@ -25,6 +25,7 @@ const ( ) var ActionChan = make(chan *Action, 200) +var ActionChan4Task = make(chan Action, 200) // Watch is connection request for receiving repository notification. type Watch struct { @@ -199,6 +200,9 @@ func notifyWatchers(e Engine, actions ...*Action) error { if _, err = e.InsertOne(act); err != nil { return fmt.Errorf("insert new actioner: %v", err) } + // After InsertOne(act),the act has ID + // Send the act to task chan + ActionChan4Task <- *act if repoChanged { act.loadRepo() @@ -279,7 +283,6 @@ func notifyWatchers(e Engine, actions ...*Action) error { // NotifyWatchers creates batch of actions for every watcher. func NotifyWatchers(actions ...*Action) error { - error := notifyWatchers(x, actions...) producer(actions...) return error diff --git a/models/reward_admin_log.go b/models/reward_admin_log.go new file mode 100644 index 000000000..5e4258682 --- /dev/null +++ b/models/reward_admin_log.go @@ -0,0 +1,46 @@ +package models + +import ( + "code.gitea.io/gitea/modules/timeutil" +) + +const ( + RewardAdminLogProcessing = 1 + RewardAdminLogSuccess = 2 + RewardAdminLogFailed = 3 +) + +type RewardAdminLog struct { + ID int64 `xorm:"pk autoincr"` + LogId string `xorm:"INDEX NOT NULL"` + Amount int64 `xorm:"NOT NULL"` + RewardType string + Remark string + Status int + TargetUserId int64 `xorm:"INDEX NOT NULL"` + CreatorId int64 `xorm:"NOT NULL"` + CreatorName string + CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"` +} + +func getRewardAdminLog(ra *RewardAdminLog) (*RewardAdminLog, error) { + has, err := x.Get(ra) + if err != nil { + return nil, err + } else if !has { + return nil, ErrRecordNotExist{} + } + return ra, nil +} + +func InsertRewardAdminLog(ra *RewardAdminLog) (int64, error) { + return x.Insert(ra) +} + +func UpdateRewardAdminLogStatus(logId string, oldStatus, newStatus int) error { + _, err := x.Where("log_id = ? and status = ?", logId, oldStatus).Update(&RewardAdminLog{Status: newStatus}) + if err != nil { + return err + } + return nil +} diff --git a/models/reward_operate_record.go b/models/reward_operate_record.go index d3b2e0a10..d58accfa5 100644 --- a/models/reward_operate_record.go +++ b/models/reward_operate_record.go @@ -95,6 +95,7 @@ func GetRewardOperateTypeInstance(s string) RewardOperateType { const ( OperateTypeIncrease RewardOperateType = "INCREASE" OperateTypeDecrease RewardOperateType = "DECREASE" + OperateTypeNull RewardOperateType = "NIL" ) const ( @@ -105,11 +106,18 @@ const ( const Semicolon = ";" +type RewardOperateOrderBy string + +const ( + RewardOrderByID RewardOperateOrderBy = "id" +) + type RewardOperateRecord struct { ID int64 `xorm:"pk autoincr"` - RecordId string `xorm:"INDEX NOT NULL"` + SerialNo string `xorm:"INDEX NOT NULL"` UserId int64 `xorm:"INDEX NOT NULL"` Amount int64 `xorm:"NOT NULL"` + Tittle string RewardType string `xorm:"NOT NULL"` SourceType string `xorm:"NOT NULL"` SourceId string `xorm:"INDEX NOT NULL"` @@ -121,6 +129,32 @@ type RewardOperateRecord struct { UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"` } +type AdminRewardOperateReq struct { + TargetUserId int64 `binding:"Required"` + OperateType RewardOperateType `binding:"Required"` + Amount int64 `binding:"Required;Range(1,100000)"` + Remark string + RewardType RewardType +} + +func (r RewardOperateRecord) ToShow() RewardOperateRecordShow { + return RewardOperateRecordShow{ + SerialNo: r.SerialNo, + Date: r.CreatedUnix, + Tittle: r.Tittle, + OperateType: r.OperateType, + Amount: r.Amount, + } +} + +type RewardOperateRecordShow struct { + SerialNo string + Date timeutil.TimeStamp + Tittle string + OperateType string + Amount int64 +} + func getPointOperateRecord(tl *RewardOperateRecord) (*RewardOperateRecord, error) { has, err := x.Get(tl) if err != nil { @@ -140,14 +174,14 @@ func GetPointOperateRecordBySourceTypeAndRequestId(sourceType, requestId, operat return getPointOperateRecord(t) } -func GetPointOperateRecordByRecordId(recordId string) (*RewardOperateRecord, error) { +func GetPointOperateRecordBySerialNo(serialNo string) (*RewardOperateRecord, error) { t := &RewardOperateRecord{ - RecordId: recordId, + SerialNo: serialNo, } return getPointOperateRecord(t) } -func InsertAwardOperateRecord(tl *RewardOperateRecord) (int64, error) { +func InsertRewardOperateRecord(tl *RewardOperateRecord) (int64, error) { return x.Insert(tl) } @@ -175,11 +209,13 @@ func SumRewardAmountInTaskPeriod(rewardType string, sourceType string, userId in type RewardOperateContext struct { SourceType SourceType SourceId string + Tittle string Remark string Reward Reward TargetUserId int64 RequestId string OperateType RewardOperateType + RejectPolicy LimiterRejectPolicy } type Reward struct { @@ -202,3 +238,43 @@ type UserRewardOperation struct { func AppendRemark(remark, appendStr string) string { return strings.TrimPrefix(remark+Semicolon+appendStr, Semicolon) } + +type RewardRecordListOpts struct { + ListOptions + UserId int64 + OperateType RewardOperateType + RewardType RewardType + OrderBy RewardOperateOrderBy +} + +func GetRewardRecordList(opts RewardRecordListOpts) ([]RewardOperateRecord, int64, error) { + if opts.Page <= 0 { + opts.Page = 1 + } + + if len(opts.OrderBy) == 0 { + opts.OrderBy = RewardOrderByID + } + + r := make([]RewardOperateRecord, 0) + cond := builder.NewCond() + if opts.UserId > 0 { + cond = cond.And(builder.Eq{"user_id": opts.UserId}) + } + if opts.OperateType != OperateTypeNull { + cond = cond.And(builder.Eq{"operate_type": opts.OperateType.Name()}) + } + cond = cond.And(builder.Eq{"reward_type": opts.RewardType.Name()}) + cond = cond.And(builder.Gt{"amount": 0}) + + count, err := x.Where(cond).Count(&RewardOperateRecord{}) + if err != nil { + return nil, 0, err + } + + err = x.Where(cond).Limit(opts.PageSize, (opts.Page-1)*opts.PageSize).OrderBy(string(opts.OrderBy)).Find(&r) + if err != nil { + return nil, 0, err + } + return r, count, nil +} diff --git a/models/reward_periodic_task.go b/models/reward_periodic_task.go index e6ebd17c2..5db5301b5 100644 --- a/models/reward_periodic_task.go +++ b/models/reward_periodic_task.go @@ -29,7 +29,7 @@ func (r PeriodType) Name() string { type RewardPeriodicTask struct { ID int64 `xorm:"pk autoincr"` - OperateRecordId string `xorm:"INDEX NOT NULL"` + OperateSerialNo string `xorm:"INDEX NOT NULL"` DelaySeconds int64 IntervalSeconds int64 Amount int64 `xorm:"NOT NULL"` @@ -45,6 +45,7 @@ type StartPeriodicTaskOpts struct { SourceType SourceType SourceId string Remark string + Tittle string TargetUserId int64 RequestId string OperateType RewardOperateType @@ -76,7 +77,7 @@ func IncrRewardTaskSuccessCount(t RewardPeriodicTask, count int64, nextTime time sess.Rollback() return err } - _, err = sess.Exec("update reward_operate_record set amount = amount + ? ,updated_unix = ? where record_id = ?", count*t.Amount, timeutil.TimeStampNow(), t.OperateRecordId) + _, err = sess.Exec("update reward_operate_record set amount = amount + ? ,updated_unix = ? where serial_no = ?", count*t.Amount, timeutil.TimeStampNow(), t.OperateSerialNo) if err != nil { sess.Rollback() return err @@ -88,7 +89,7 @@ func IncrRewardTaskSuccessCount(t RewardPeriodicTask, count int64, nextTime time func GetPeriodicTaskBySourceIdAndType(sourceType SourceType, sourceId string, operateType RewardOperateType) (*RewardPeriodicTask, error) { r := RewardPeriodicTask{} _, err := x.SQL("select rpt.* from reward_periodic_task rpt "+ - "inner join reward_operate_record ror on rpt.operate_record_id = ror.record_id"+ + "inner join reward_operate_record ror on rpt.operate_serial_no = ror.serial_no"+ " where ror.source_type = ? and source_id = ? and operate_type = ? ", sourceType.Name(), sourceId, operateType.Name()).Get(&r) if err != nil { return nil, err @@ -96,7 +97,7 @@ func GetPeriodicTaskBySourceIdAndType(sourceType SourceType, sourceId string, op return &r, nil } -func StopPeriodicTask(taskId int64, operateRecordId string, stopTime time.Time) error { +func StopPeriodicTask(taskId int64, operateSerialNo string, stopTime time.Time) error { sess := x.NewSession() defer sess.Close() _, err := sess.Where("id = ? and status = ?", taskId, PeriodicTaskStatusRunning).Update(&RewardPeriodicTask{Status: PeriodicTaskStatusFinished, FinishedUnix: timeutil.TimeStamp(stopTime.Unix())}) @@ -104,7 +105,7 @@ func StopPeriodicTask(taskId int64, operateRecordId string, stopTime time.Time) sess.Rollback() return err } - _, err = sess.Where("record_id = ? and status = ?", operateRecordId, OperateStatusOperating).Update(&RewardOperateRecord{Status: OperateStatusSucceeded}) + _, err = sess.Where("serial_no = ? and status = ?", operateSerialNo, OperateStatusOperating).Update(&RewardOperateRecord{Status: OperateStatusSucceeded}) if err != nil { sess.Rollback() return err diff --git a/models/task_accomplish_log.go b/models/task_accomplish_log.go index 3736d1c41..a1edb71ee 100644 --- a/models/task_accomplish_log.go +++ b/models/task_accomplish_log.go @@ -6,11 +6,12 @@ import ( ) type TaskAccomplishLog struct { - ID int64 `xorm:"pk autoincr"` - LogId string `xorm:"INDEX NOT NULL"` - ConfigId int64 `xorm:"NOT NULL"` - TaskCode string `xorm:"NOT NULL"` - UserId int64 `xorm:"INDEX NOT NULL"` + ID int64 `xorm:"pk autoincr"` + LogId string `xorm:"INDEX NOT NULL"` + ConfigId int64 `xorm:"NOT NULL"` + TaskCode string `xorm:"NOT NULL"` + UserId int64 `xorm:"INDEX NOT NULL"` + ActionId int64 CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"` } diff --git a/models/task_config.go b/models/task_config.go index cd4329834..eef3a6c33 100644 --- a/models/task_config.go +++ b/models/task_config.go @@ -5,36 +5,6 @@ import ( ) const ( - TaskTypeNewIssue = "NEW_ISSUE" - TaskTypeIssueChangeStatus = "ISSUE_CHANGE_STATUS" - TaskTypeCreateIssueComment = "CREATE_ISSUE_COMMENT" - TaskTypeNewPullRequest = "NEW_PULL_REQUEST" - TaskTypeRenameRepository = "RENAME_REPOSITORY" - TaskTypeAliasRepository = "ALIAS_REPOSITORY" - TaskTypeTransferRepository = "TRANSFER_REPOSITORY" - TaskTypeCreateRepository = "CREATE_REPOSITORY" - TaskTypeCreatePublicRepository = "CREATE_PUBLIC_REPOSITORY" - TaskTypeForkRepository = "FORK_REPOSITORY" - TaskTypePullRequestReview = "PULL_REQUEST_REVIEW" - TaskTypeCommentPull = "COMMENT_PULL" - TaskTypeApprovePullRequest = "APPROVE_PULL_REQUEST" - TaskTypeRejectPullRequest = "REJECT_PULL_REQUEST" - TaskTypeMergePullRequest = "MERGE_PULL_REQUEST" - TaskTypeSyncPushCommits = "SYNC_PUSH_COMMITS" - TaskTypeSyncCreateRef = "SYNC_CREATE_REF" - TaskTypeSyncDeleteRef = "SYNC_DELETE_REF" - TaskTypeBindWechat = "BIND_WECHAT" - TaskTypeUploadAttachment = "UPLOAD_ATTACHMENT" - TaskTypeCreateCloudbrainTask = "CREATE_CLOUDBRAIN_TASK" - TaskTypeDatasetRecommended = "DATASET_RECOMMENDED" - TaskTypeCreateModel = "CREATE_MODEL" - TaskTypeCreatePublicImage = "CREATE_PUBLIC_IMAGE" - TaskTypeImageRecommend = "IMAGE_RECOMMEND" - TaskTypeChangeUserAvatar = "CHANGE_USER_AVATAR" - TaskTypePushCommits = "PUSH_COMMITS" -) - -const ( PeriodNotCycle = "NOT_CYCLE" PeriodDaily = "DAILY" ) diff --git a/modules/auth/wechat/event_handle.go b/modules/auth/wechat/event_handle.go index 67c3a7265..399537f1e 100644 --- a/modules/auth/wechat/event_handle.go +++ b/modules/auth/wechat/event_handle.go @@ -1,6 +1,7 @@ package wechat import ( + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/redis/redis_client" "code.gitea.io/gitea/modules/redis/redis_key" @@ -72,6 +73,10 @@ func HandleSubscribeEvent(we WechatEvent) string { jsonStr, _ := json.Marshal(qrCache) redis_client.Setex(redis_key.WechatBindingUserIdKey(sceneStr), string(jsonStr), 60*time.Second) } - notification.NotifyWechatBind(qrCache.UserId, we.FromUserName) + u, err := models.GetUserByID(qrCache.UserId) + if err == nil { + notification.NotifyWechatBind(u, we.FromUserName) + } + return BIND_REPLY_SUCCESS } diff --git a/modules/cloudbrain/resty.go b/modules/cloudbrain/resty.go index 75614e571..d45468ddb 100755 --- a/modules/cloudbrain/resty.go +++ b/modules/cloudbrain/resty.go @@ -212,7 +212,7 @@ func getQueryString(page int, size int, name string) string { return fmt.Sprintf("pageIndex=%d&pageSize=%d&name=%s", page, size, name) } -func CommitImage(jobID string, params models.CommitImageParams) error { +func CommitImage(jobID string, params models.CommitImageParams, doer *models.User) error { imageTag := strings.TrimSpace(params.ImageTag) dbImage, err := models.GetImageByTag(imageTag) @@ -314,12 +314,12 @@ sendjob: }) if err == nil { go updateImageStatus(image, isSetCreatedUnix, createTime) - notification.NotifyCreateImage(params.UID, image) + notification.NotifyCreateImage(doer, image) } return err } -func CommitAdminImage(params models.CommitImageParams) error { +func CommitAdminImage(params models.CommitImageParams, doer *models.User) error { imageTag := strings.TrimSpace(params.ImageTag) exist, err := models.IsImageExist(imageTag) @@ -357,7 +357,7 @@ func CommitAdminImage(params models.CommitImageParams) error { return nil }) if err == nil { - notification.NotifyCreateImage(params.UID, image) + notification.NotifyCreateImage(doer, image) } return err } diff --git a/modules/cron/tasks_basic.go b/modules/cron/tasks_basic.go index 39100594d..5892699eb 100755 --- a/modules/cron/tasks_basic.go +++ b/modules/cron/tasks_basic.go @@ -212,7 +212,7 @@ func registerRewardPeriodTask() { RegisterTaskFatal("reward_period_task", &BaseConfig{ Enabled: true, RunAtStart: true, - Schedule: "@every 5m", + Schedule: "@every 1m", }, func(ctx context.Context, _ *models.User, _ Config) error { reward.StartRewardTask() return nil diff --git a/modules/notification/action/action.go b/modules/notification/action/action.go index 2ac73c2c3..6a43c6e9a 100644 --- a/modules/notification/action/action.go +++ b/modules/notification/action/action.go @@ -5,6 +5,7 @@ package action import ( + "code.gitea.io/gitea/modules/auth" "encoding/json" "fmt" "path" @@ -345,3 +346,105 @@ func (a *actionNotifier) NotifyOtherTask(doer *models.User, repo *models.Reposit log.Error("notifyWatchers: %v", err) } } + +func (t *actionNotifier) NotifyWechatBind(user *models.User, wechatOpenId string) { + act := &models.Action{ + ActUserID: user.ID, + ActUser: user, + OpType: models.ActionBindWechat, + IsPrivate: true, + Content: wechatOpenId, + } + if err := models.NotifyWatchers(act); err != nil { + log.Error("notifyWatchers: %v", err) + } +} + +func (t *actionNotifier) NotifyDatasetRecommend(optUser *models.User, dataset *models.Dataset, action string) { + switch action { + case "recommend": + users, err := models.GetAllDatasetContributorByDatasetId(dataset.ID) + if err != nil { + return + } + var actions = make([]*models.Action, 0) + for _, user := range users { + actions = append(actions, &models.Action{ + OpType: models.ActionDatasetRecommended, + ActUserID: user.ID, + ActUser: user, + RepoID: dataset.RepoID, + Repo: dataset.Repo, + Content: fmt.Sprint(dataset.ID), + }) + } + if err := models.NotifyWatchers(actions...); err != nil { + log.Error("notifyWatchers: %v", err) + } + } +} + +func (t *actionNotifier) NotifyCreateImage(doer *models.User, image models.Image) { + act := &models.Action{ + ActUserID: doer.ID, + ActUser: doer, + OpType: models.ActionCreateImage, + IsPrivate: image.IsPrivate, + Content: fmt.Sprint(image.ID), + } + if err := models.NotifyWatchers(act); err != nil { + log.Error("notifyWatchers: %v", err) + } +} + +func (t *actionNotifier) NotifyImageRecommend(optUser *models.User, imageId int64, action string) { + image, err := models.GetImageByID(imageId) + if err != nil { + return + } + u, err := models.GetUserByID(image.UID) + if err != nil { + return + } + switch action { + case "recommend": + act := &models.Action{ + ActUserID: u.ID, + ActUser: u, + OpType: models.ActionImageRecommend, + IsPrivate: false, + Content: fmt.Sprint(imageId), + } + if err := models.NotifyWatchers(act); err != nil { + log.Error("notifyWatchers: %v", err) + } + } +} + +func (t *actionNotifier) NotifyChangeUserAvatar(user *models.User, form auth.AvatarForm) { + act := &models.Action{ + ActUserID: user.ID, + ActUser: user, + OpType: models.ActionChangeUserAvatar, + IsPrivate: true, + } + if err := models.NotifyWatchers(act); err != nil { + log.Error("notifyWatchers: %v", err) + } +} + +func (t *actionNotifier) NotifyPushCommits(pusher *models.User, repo *models.Repository, refName, oldCommitID, newCommitID string, commits *repository.PushCommits) { + act := &models.Action{ + ActUserID: pusher.ID, + ActUser: pusher, + OpType: models.ActionPushCommits, + RepoID: repo.ID, + Repo: repo, + RefName: refName, + IsPrivate: repo.IsPrivate, + Content: fmt.Sprintf("%s|%s", oldCommitID, newCommitID), + } + if err := models.NotifyWatchers(act); err != nil { + log.Error("notifyWatchers: %v", err) + } +} diff --git a/modules/notification/base/notifier.go b/modules/notification/base/notifier.go index c3c7f404a..7673a5909 100644 --- a/modules/notification/base/notifier.go +++ b/modules/notification/base/notifier.go @@ -6,6 +6,7 @@ package base import ( "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/auth" "code.gitea.io/gitea/modules/repository" ) @@ -56,9 +57,9 @@ type Notifier interface { NotifySyncDeleteRef(doer *models.User, repo *models.Repository, refType, refFullName string) NotifyOtherTask(doer *models.User, repo *models.Repository, id string, name string, optype models.ActionType) - NotifyWechatBind(userId int64, wechatOpenId string) + NotifyWechatBind(user *models.User, wechatOpenId string) NotifyDatasetRecommend(optUser *models.User, dataset *models.Dataset, action string) - NotifyCreateImage(optUserId int64, image models.Image) + NotifyCreateImage(doer *models.User, image models.Image) NotifyImageRecommend(optUser *models.User, imageId int64, action string) - NotifyChangeUserAvatar(user *models.User) + NotifyChangeUserAvatar(user *models.User, form auth.AvatarForm) } diff --git a/modules/notification/base/null.go b/modules/notification/base/null.go index c0a224697..eea5c5e77 100644 --- a/modules/notification/base/null.go +++ b/modules/notification/base/null.go @@ -6,6 +6,7 @@ package base import ( "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/auth" "code.gitea.io/gitea/modules/repository" ) @@ -159,18 +160,19 @@ func (*NullNotifier) NotifyOtherTask(doer *models.User, repo *models.Repository, } -func (*NullNotifier) NotifyWechatBind(userId int64, wechatOpenId string) { +func (*NullNotifier) NotifyWechatBind(user *models.User, wechatOpenId string) { } func (*NullNotifier) NotifyDatasetRecommend(optUser *models.User, dataset *models.Dataset, action string) { } -func (*NullNotifier) NotifyCreateImage(optUserId int64, image models.Image) { +func (*NullNotifier) NotifyCreateImage(doer *models.User, image models.Image) { } func (*NullNotifier) NotifyImageRecommend(optUser *models.User, imageId int64, action string) { } -func (*NullNotifier) NotifyChangeUserAvatar(user *models.User) { +func (*NullNotifier) NotifyChangeUserAvatar(user *models.User, form auth.AvatarForm) { + } diff --git a/modules/notification/notification.go b/modules/notification/notification.go index 118bdf994..d652dc043 100644 --- a/modules/notification/notification.go +++ b/modules/notification/notification.go @@ -6,11 +6,11 @@ package notification import ( "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/auth" "code.gitea.io/gitea/modules/notification/action" "code.gitea.io/gitea/modules/notification/base" "code.gitea.io/gitea/modules/notification/indexer" "code.gitea.io/gitea/modules/notification/mail" - "code.gitea.io/gitea/modules/notification/task" "code.gitea.io/gitea/modules/notification/ui" "code.gitea.io/gitea/modules/notification/webhook" "code.gitea.io/gitea/modules/repository" @@ -36,7 +36,6 @@ func NewContext() { RegisterNotifier(indexer.NewNotifier()) RegisterNotifier(webhook.NewNotifier()) RegisterNotifier(action.NewNotifier()) - RegisterNotifier(task.NewNotifier()) } // NotifyUploadAttachment notifies attachment upload message to notifiers @@ -273,9 +272,9 @@ func NotifySyncDeleteRef(pusher *models.User, repo *models.Repository, refType, } // NotifyWechatBind notifies wechat bind -func NotifyWechatBind(userId int64, wechatOpenId string) { +func NotifyWechatBind(user *models.User, wechatOpenId string) { for _, notifier := range notifiers { - notifier.NotifyWechatBind(userId, wechatOpenId) + notifier.NotifyWechatBind(user, wechatOpenId) } } @@ -287,9 +286,9 @@ func NotifyDatasetRecommend(optUser *models.User, dataset *models.Dataset, actio } // NotifyDatasetRecommend -func NotifyCreateImage(optUserId int64, image models.Image) { +func NotifyCreateImage(doer *models.User, image models.Image) { for _, notifier := range notifiers { - notifier.NotifyCreateImage(optUserId, image) + notifier.NotifyCreateImage(doer, image) } } @@ -301,8 +300,8 @@ func NotifyImageRecommend(optUser *models.User, imageId int64, action string) { } // NotifyDatasetRecommend -func NotifyChangeUserAvatar(user *models.User) { +func NotifyChangeUserAvatar(user *models.User, form auth.AvatarForm) { for _, notifier := range notifiers { - notifier.NotifyChangeUserAvatar(user) + notifier.NotifyChangeUserAvatar(user, form) } } diff --git a/modules/notification/task/task.go b/modules/notification/task/task.go deleted file mode 100644 index 077d6699b..000000000 --- a/modules/notification/task/task.go +++ /dev/null @@ -1,157 +0,0 @@ -package task - -import ( - "code.gitea.io/gitea/models" - "code.gitea.io/gitea/modules/notification/base" - "code.gitea.io/gitea/modules/repository" - "code.gitea.io/gitea/services/task" - "strings" -) - -type taskNotifier struct { - base.NullNotifier -} - -var ( - _ base.Notifier = &taskNotifier{} -) - -// NewNotifier create a new actionNotifier notifier -func NewNotifier() base.Notifier { - return &taskNotifier{} -} - -func (t *taskNotifier) NotifyNewIssue(issue *models.Issue) { - task.Accomplish(issue.Poster.ID, models.TaskTypeNewIssue) -} - -// NotifyIssueChangeStatus notifies close or reopen issue to notifiers -func (t *taskNotifier) NotifyIssueChangeStatus(doer *models.User, issue *models.Issue, actionComment *models.Comment, closeOrReopen bool) { - task.Accomplish(doer.ID, models.TaskTypeIssueChangeStatus) -} - -// NotifyCreateIssueComment notifies comment on an issue to notifiers -func (t *taskNotifier) NotifyCreateIssueComment(doer *models.User, repo *models.Repository, - issue *models.Issue, comment *models.Comment) { - task.Accomplish(doer.ID, models.TaskTypeCreateIssueComment) -} - -func (t *taskNotifier) NotifyNewPullRequest(pull *models.PullRequest) { - task.Accomplish(pull.Issue.Poster.ID, models.TaskTypeNewPullRequest) -} - -func (t *taskNotifier) NotifyRenameRepository(doer *models.User, repo *models.Repository, oldRepoName string) { - task.Accomplish(doer.ID, models.TaskTypeRenameRepository) -} - -func (t *taskNotifier) NotifyAliasRepository(doer *models.User, repo *models.Repository, oldAlias string) { - task.Accomplish(doer.ID, models.TaskTypeAliasRepository) -} - -func (t *taskNotifier) NotifyTransferRepository(doer *models.User, repo *models.Repository, oldOwnerName string) { - task.Accomplish(doer.ID, models.TaskTypeTransferRepository) -} - -func (t *taskNotifier) NotifyCreateRepository(doer *models.User, u *models.User, repo *models.Repository) { - if !repo.IsPrivate { - task.Accomplish(doer.ID, models.TaskTypeCreatePublicRepository) - } - -} - -func (t *taskNotifier) NotifyForkRepository(doer *models.User, oldRepo, repo *models.Repository) { - task.Accomplish(doer.ID, models.TaskTypeForkRepository) -} - -func (t *taskNotifier) NotifyPullRequestReview(pr *models.PullRequest, review *models.Review, comment *models.Comment) { - for _, lines := range review.CodeComments { - for _, comments := range lines { - for _, _ = range comments { - task.Accomplish(review.Reviewer.ID, models.TaskTypePullRequestReview) - } - } - } - if review.Type != models.ReviewTypeComment || strings.TrimSpace(comment.Content) != "" { - - switch review.Type { - case models.ReviewTypeApprove: - task.Accomplish(review.Reviewer.ID, models.TaskTypeApprovePullRequest) - case models.ReviewTypeReject: - task.Accomplish(review.Reviewer.ID, models.TaskTypeRejectPullRequest) - default: - task.Accomplish(review.Reviewer.ID, models.TaskTypeCommentPull) - } - - } -} - -func (t *taskNotifier) NotifyMergePullRequest(pr *models.PullRequest, doer *models.User) { - task.Accomplish(doer.ID, models.TaskTypeMergePullRequest) -} - -func (t *taskNotifier) NotifySyncPushCommits(pusher *models.User, repo *models.Repository, refName, oldCommitID, newCommitID string, commits *repository.PushCommits) { - task.Accomplish(repo.OwnerID, models.TaskTypeSyncPushCommits) -} - -func (t *taskNotifier) NotifySyncCreateRef(doer *models.User, repo *models.Repository, refType, refFullName string) { - task.Accomplish(repo.OwnerID, models.TaskTypeSyncCreateRef) -} - -func (t *taskNotifier) NotifySyncDeleteRef(doer *models.User, repo *models.Repository, refType, refFullName string) { - task.Accomplish(repo.OwnerID, models.TaskTypeSyncDeleteRef) -} - -func (t *taskNotifier) NotifyOtherTask(doer *models.User, repo *models.Repository, id string, name string, optype models.ActionType) { - switch optype { - case models.ActionUploadAttachment: - task.Accomplish(doer.ID, models.TaskTypeUploadAttachment) - case models.ActionCreateDebugGPUTask, - models.ActionCreateDebugNPUTask, - models.ActionCreateTrainTask, - models.ActionCreateInferenceTask, - models.ActionCreateBenchMarkTask, - models.ActionCreateGPUTrainTask: - task.Accomplish(doer.ID, models.TaskTypeCreateCloudbrainTask) - case models.ActionCreateNewModelTask: - task.Accomplish(doer.ID, models.TaskTypeCreateModel) - } - return -} - -func (t *taskNotifier) NotifyWechatBind(userId int64, wechatOpenId string) { - task.Accomplish(userId, models.TaskTypeBindWechat) -} - -func (t *taskNotifier) NotifyDatasetRecommend(optUser *models.User, dataset *models.Dataset, action string) { - switch action { - case "recommend": - userIds, err := models.GetAllUserIdByDatasetId(dataset.ID) - if err != nil { - return - } - for _, userId := range userIds { - task.Accomplish(userId, models.TaskTypeDatasetRecommended) - } - } -} - -func (t *taskNotifier) NotifyCreateImage(optUserId int64, image models.Image) { - if !image.IsPrivate { - task.Accomplish(optUserId, models.TaskTypeCreatePublicImage) - } -} - -func (t *taskNotifier) NotifyImageRecommend(optUser *models.User, imageId int64, action string) { - switch action { - case "recommend": - task.Accomplish(optUser.ID, models.TaskTypeImageRecommend) - } -} - -func (t *taskNotifier) NotifyChangeUserAvatar(user *models.User) { - task.Accomplish(user.ID, models.TaskTypeChangeUserAvatar) -} - -func (t *taskNotifier) NotifyPushCommits(pusher *models.User, repo *models.Repository, refName, oldCommitID, newCommitID string, commits *repository.PushCommits) { - task.Accomplish(pusher.ID, models.TaskTypePushCommits) -} diff --git a/modules/redis/redis_client/client.go b/modules/redis/redis_client/client.go index c5cb936b3..e795234df 100644 --- a/modules/redis/redis_client/client.go +++ b/modules/redis/redis_client/client.go @@ -99,11 +99,11 @@ func IncrBy(key string, n int64) (int64, error) { } -func Expire(key string, expireSeconds int64) error { +func Expire(key string, expireTime time.Duration) error { redisClient := labelmsg.Get() defer redisClient.Close() - _, err := redisClient.Do("EXPIRE", key, expireSeconds) + _, err := redisClient.Do("EXPIRE", key, int64(expireTime.Seconds())) if err != nil { return err } diff --git a/modules/redis/redis_key/reward_redis_key.go b/modules/redis/redis_key/reward_redis_key.go index f6c9480a9..05c10ce4f 100644 --- a/modules/redis/redis_key/reward_redis_key.go +++ b/modules/redis/redis_key/reward_redis_key.go @@ -1,6 +1,8 @@ package redis_key -import "fmt" +import ( + "fmt" +) const REWARD_REDIS_PREFIX = "reward" @@ -11,6 +13,7 @@ func RewardOperateLock(requestId string, sourceType string, operateType string) func RewardOperateNotification() string { return KeyJoin(REWARD_REDIS_PREFIX, "operate", "notification") } + func RewardTaskRunningLock(taskId int64) string { return KeyJoin(REWARD_REDIS_PREFIX, "periodic_task", fmt.Sprint(taskId), "lock") } diff --git a/modules/redis/redis_key/serial_redis_key.go b/modules/redis/redis_key/serial_redis_key.go new file mode 100644 index 000000000..c0ecf39eb --- /dev/null +++ b/modules/redis/redis_key/serial_redis_key.go @@ -0,0 +1,10 @@ +package redis_key + +import "time" + +const SERIAL_REDIS_PREFIX = "serial" + +func RewardSerialCounter(now time.Time) string { + h := now.Format("200601021504") + return KeyJoin(SERIAL_REDIS_PREFIX, "reward_operate", h, "counter") +} diff --git a/modules/setting/setting.go b/modules/setting/setting.go index b5ffe6eab..217388789 100755 --- a/modules/setting/setting.go +++ b/modules/setting/setting.go @@ -232,7 +232,7 @@ var ( TimeoutStep: 10 * time.Second, MaxTimeout: 60 * time.Second, EventSourceUpdateTime: 10 * time.Second, - RewardNotifyUpdateTime: 3 * time.Second, + RewardNotifyUpdateTime: 2 * time.Second, }, Admin: struct { UserPagingNum int @@ -549,7 +549,9 @@ var ( WechatAuthSwitch bool //point config - CloudBrainTaskPointPaySwitch bool + CloudBrainPaySwitch bool + CloudBrainPayDelay time.Duration + CloudBrainPayInterval time.Duration //nginx proxy PROXYURL string @@ -1380,7 +1382,9 @@ func NewContext() { WechatAuthSwitch = sec.Key("AUTH_SWITCH").MustBool(false) sec = Cfg.Section("point") - CloudBrainTaskPointPaySwitch = sec.Key("CLOUDBRAIN_PAY_SWITCH").MustBool(false) + CloudBrainPaySwitch = sec.Key("CLOUDBRAIN_PAY_SWITCH").MustBool(false) + CloudBrainPayDelay = sec.Key("CLOUDBRAIN_PAY_DELAY").MustDuration(30 * time.Minute) + CloudBrainPayInterval = sec.Key("CLOUDBRAIN_PAY_INTERVAL").MustDuration(60 * time.Minute) SetRadarMapConfig() diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index b4d532ab0..bcbc5ea6d 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -233,7 +233,7 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { if !reward.IsPointBalanceEnough(ctx.User.ID, jobType, resourceSpecId) { log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, jobType, resourceSpecId) cloudBrainNewDataPrepare(ctx) - ctx.RenderWithErr("point balance not enough", tpl, &form) + ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tpl, &form) return } @@ -319,7 +319,7 @@ func CloudBrainRestart(ctx *context.Context) { if !reward.IsPointBalanceEnough(ctx.User.ID, task.JobType, task.ResourceSpecId) { log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, task.JobType, task.ResourceSpecId) resultCode = "-1" - errorMsg = "insufficient points balance" + errorMsg = models.ErrInsufficientPointsBalance{}.Error() break } @@ -737,7 +737,7 @@ func CloudBrainAdminCommitImage(ctx *context.Context, form auth.CommitAdminImage UID: ctx.User.ID, Type: models.GetRecommondType(form.IsRecommend), Place: form.Place, - }) + }, ctx.User) if err != nil { log.Error("CommitImagefailed") if models.IsErrImageTagExist(err) { @@ -784,7 +784,7 @@ func CloudBrainCommitImage(ctx *context.Context, form auth.CommitImageCloudBrain CloudBrainType: form.Type, Topics: validTopics, UID: ctx.User.ID, - }) + }, ctx.User) if err != nil { log.Error("CommitImage(%s) failed:%v", ctx.Cloudbrain.JobName, err.Error(), ctx.Data["msgID"]) if models.IsErrImageTagExist(err) { @@ -1862,7 +1862,7 @@ func BenchMarkAlgorithmCreate(ctx *context.Context, form auth.CreateCloudBrainFo if !reward.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeBenchmark), resourceSpecId) { log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, string(models.JobTypeBenchmark), resourceSpecId) cloudBrainNewDataPrepare(ctx) - ctx.RenderWithErr("point balance not enough", tplCloudBrainBenchmarkNew, &form) + ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tplCloudBrainBenchmarkNew, &form) return } @@ -2024,7 +2024,7 @@ func ModelBenchmarkCreate(ctx *context.Context, form auth.CreateCloudBrainForm) if !reward.IsPointBalanceEnough(ctx.User.ID, jobType, resourceSpecId) { log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, jobType, resourceSpecId) cloudBrainNewDataPrepare(ctx) - ctx.RenderWithErr("point balance not enough", tpl, &form) + ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tpl, &form) return } diff --git a/routers/repo/modelarts.go b/routers/repo/modelarts.go index dea996a50..6e5175e15 100755 --- a/routers/repo/modelarts.go +++ b/routers/repo/modelarts.go @@ -210,7 +210,7 @@ func Notebook2Create(ctx *context.Context, form auth.CreateModelArtsNotebookForm if !reward.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeDebug), resourceSpecId) { log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, string(models.JobTypeBenchmark), resourceSpecId) cloudBrainNewDataPrepare(ctx) - ctx.RenderWithErr("point balance not enough", tplModelArtsNotebookNew, &form) + ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tplModelArtsNotebookNew, &form) return } count, err := models.GetCloudbrainNotebookCountByUserID(ctx.User.ID) @@ -429,7 +429,7 @@ func NotebookManage(ctx *context.Context) { if !reward.IsPointBalanceEnough(ctx.User.ID, task.JobType, task.ResourceSpecId) { log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, task.JobType, task.ResourceSpecId) resultCode = "-1" - errorMsg = "point balance not enough" + errorMsg = models.ErrInsufficientPointsBalance{}.Error() break return } @@ -1005,7 +1005,7 @@ func TrainJobCreate(ctx *context.Context, form auth.CreateModelArtsTrainJobForm) if !reward.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeTrain), resourceSpecId) { log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, string(models.JobTypeBenchmark), resourceSpecId) cloudBrainNewDataPrepare(ctx) - ctx.RenderWithErr("point balance not enough", tplModelArtsTrainJobNew, &form) + ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tplModelArtsTrainJobNew, &form) return } count, err := models.GetCloudbrainTrainJobCountByUserID(ctx.User.ID) @@ -1854,7 +1854,7 @@ func InferenceJobCreate(ctx *context.Context, form auth.CreateModelArtsInference if !reward.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeInference), resourceSpecId) { log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, string(models.JobTypeBenchmark), resourceSpecId) inferenceJobErrorNewDataPrepare(ctx, form) - ctx.RenderWithErr("point balance not enough", tplModelArtsInferenceJobNew, &form) + ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tplModelArtsInferenceJobNew, &form) return } count, err := models.GetCloudbrainInferenceJobCountByUserID(ctx.User.ID) diff --git a/routers/reward/point/point.go b/routers/reward/point/point.go index eaae76c4f..3140b4c38 100644 --- a/routers/reward/point/point.go +++ b/routers/reward/point/point.go @@ -1,8 +1,10 @@ package point import ( + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/context" "code.gitea.io/gitea/routers/response" + "code.gitea.io/gitea/services/reward" "code.gitea.io/gitea/services/reward/point/account" "net/http" ) @@ -29,3 +31,47 @@ func GetPointAccount(ctx *context.Context) { } ctx.JSON(http.StatusOK, response.SuccessWithData(res)) } + +func GetPointRecordList(ctx *context.Context) { + operateType := ctx.Query("operate") + page := ctx.QueryInt("page") + var orderBy models.RewardOperateOrderBy + switch ctx.Query("sort") { + default: + orderBy = models.RewardOrderByID + } + t := models.GetRewardOperateTypeInstance(operateType) + if t == "" { + ctx.JSON(http.StatusOK, response.ServerError("param error")) + return + } + + r, err := reward.GetRewardRecordList(models.RewardRecordListOpts{ + ListOptions: models.ListOptions{PageSize: 20, Page: page}, + UserId: ctx.User.ID, + OperateType: t, + RewardType: models.RewardTypePoint, + OrderBy: orderBy, + }) + if err != nil { + ctx.JSON(http.StatusOK, response.ServerError(err.Error())) + return + } + + ctx.JSON(http.StatusOK, response.SuccessWithData(r)) + return +} + +func OperatePointAccountBalance(ctx *context.Context, req models.AdminRewardOperateReq) { + req.RewardType = models.RewardTypePoint + if req.OperateType.Name() == "" { + ctx.JSON(http.StatusOK, "param error") + return + } + err := reward.AdminBalanceOperate(req, ctx.User) + if err != nil { + ctx.JSON(http.StatusOK, response.ServerError(err.Error())) + return + } + ctx.JSON(http.StatusOK, response.Success()) +} diff --git a/routers/routes/routes.go b/routers/routes/routes.go index 3ce633f93..0658765ca 100755 --- a/routers/routes/routes.go +++ b/routers/routes/routes.go @@ -324,6 +324,7 @@ func RegisterRoutes(m *macaron.Macaron) { m.Get("/", routers.Home) m.Get("/dashboard", routers.Dashboard) go routers.SocketManager.Run() + go task.RunTask() m.Get("/action/notification", routers.ActionNotification) m.Get("/reward/notification", routers.ActionNotification) m.Get("/recommend/org", routers.RecommendOrgFromPromote) @@ -594,6 +595,7 @@ func RegisterRoutes(m *macaron.Macaron) { m.Group("/reward/point", func() { m.Get("/limiter/list", point.GetPointLimitConfigList) m.Post("/limiter/add", bindIgnErr(models.LimitConfigVO{}), point.AddPointLimitConfig) + m.Post("/operate", binding.Bind(models.AdminRewardOperateReq{}), point.OperatePointAccountBalance) }) m.Group("/task/config", func() { @@ -601,6 +603,7 @@ func RegisterRoutes(m *macaron.Macaron) { m.Post("/add", bindIgnErr(models.TaskConfigWithLimit{}), task.AddTaskConfig) m.Post("/add/batch", bindIgnErr(models.BatchLimitConfigVO{}), task.BatchAddTaskConfig) }) + }, adminReq) // ***** END: Admin ***** @@ -1330,6 +1333,7 @@ func RegisterRoutes(m *macaron.Macaron) { m.Group("/reward/point", func() { m.Get("/account", point.GetPointAccount) + m.Get("/record/list", point.GetPointRecordList) }, reqSignIn) if setting.API.EnableSwagger { diff --git a/routers/task/task.go b/routers/task/task.go new file mode 100644 index 000000000..1d3b8595b --- /dev/null +++ b/routers/task/task.go @@ -0,0 +1,15 @@ +package task + +import ( + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/services/task" +) + +func RunTask() { + for { + select { + case action := <-models.ActionChan4Task: + task.Accomplish(action) + } + } +} diff --git a/routers/user/setting/profile.go b/routers/user/setting/profile.go index 1c1e664d0..0d788b422 100755 --- a/routers/user/setting/profile.go +++ b/routers/user/setting/profile.go @@ -166,7 +166,7 @@ func AvatarPost(ctx *context.Context, form auth.AvatarForm) { if err := UpdateAvatarSetting(ctx, form, ctx.User); err != nil { ctx.Flash.Error(err.Error()) } else { - notification.NotifyChangeUserAvatar(ctx.User) + notification.NotifyChangeUserAvatar(ctx.User, form) ctx.Flash.Success(ctx.Tr("settings.update_avatar_success")) } diff --git a/services/reward/admin_operate.go b/services/reward/admin_operate.go new file mode 100644 index 000000000..1eec0f414 --- /dev/null +++ b/services/reward/admin_operate.go @@ -0,0 +1,50 @@ +package reward + +import ( + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/util" + "code.gitea.io/gitea/services/reward/limiter" +) + +func AdminBalanceOperate(req models.AdminRewardOperateReq, doer *models.User) error { + logId := util.UUID() + _, err := models.InsertRewardAdminLog(&models.RewardAdminLog{ + LogId: logId, + Amount: req.Amount, + RewardType: req.RewardType.Name(), + TargetUserId: req.TargetUserId, + CreatorId: doer.ID, + CreatorName: doer.Name, + Remark: req.Remark, + Status: models.RewardAdminLogProcessing, + }) + if err != nil { + log.Error("AdminBalanceOperate InsertRewardAdminLog error.%v", err) + return err + } + + //reward + err = Operate(&models.RewardOperateContext{ + SourceType: models.SourceTypeAdminOperate, + SourceId: logId, + Tittle: "管理员操作", + Reward: models.Reward{ + Amount: req.Amount, + Type: req.RewardType, + }, + TargetUserId: req.TargetUserId, + RequestId: logId, + OperateType: req.OperateType, + Remark: req.Remark, + RejectPolicy: limiter.JustReject, + }) + + if err != nil { + log.Error("AdminBalanceOperate operate error.%v", err) + models.UpdateRewardAdminLogStatus(logId, models.RewardAdminLogProcessing, models.RewardAdminLogFailed) + return err + } + models.UpdateRewardAdminLogStatus(logId, models.RewardAdminLogProcessing, models.RewardAdminLogSuccess) + return nil +} diff --git a/services/reward/cloubrain_deduct.go b/services/reward/cloubrain_deduct.go index 61068a87a..ce23e2dc7 100644 --- a/services/reward/cloubrain_deduct.go +++ b/services/reward/cloubrain_deduct.go @@ -15,9 +15,11 @@ var ( TrainResourceSpecs *models.ResourceSpecs ) +const RUN_CLOUDBRAIN_TASK_TITTLE = "运行云脑任务" + //IsPointBalanceEnough check whether the user's point balance is bigger than task unit price func IsPointBalanceEnough(targetUserId int64, jobType string, resourceSpecId int) bool { - if !setting.CloudBrainTaskPointPaySwitch { + if !setting.CloudBrainPaySwitch { return true } spec := getResourceSpec(jobType, resourceSpecId) @@ -33,7 +35,7 @@ func IsPointBalanceEnough(targetUserId int64, jobType string, resourceSpecId int } func StartCloudBrainPointDeductTask(task models.Cloudbrain) { - if !setting.CloudBrainTaskPointPaySwitch { + if !setting.CloudBrainPaySwitch { return } @@ -48,11 +50,12 @@ func StartCloudBrainPointDeductTask(task models.Cloudbrain) { TargetUserId: task.UserID, RequestId: getCloudBrainPointTaskSourceId(task), OperateType: models.OperateTypeDecrease, - Delay: 30 * time.Minute, - Interval: 60 * time.Minute, + Delay: setting.CloudBrainPayDelay, + Interval: setting.CloudBrainPayInterval, UnitAmount: spec.UnitPrice, RewardType: models.RewardTypePoint, StartTime: time.Unix(int64(task.StartTime), 0), + Tittle: RUN_CLOUDBRAIN_TASK_TITTLE, }) } @@ -61,7 +64,7 @@ func StopCloudBrainPointDeductTask(task models.Cloudbrain) { } func getCloudBrainPointTaskSourceId(task models.Cloudbrain) string { - return models.SourceTypeRunCloudbrainTask.Name() + "_" + task.JobType + "_" + fmt.Sprint(task.Type) + "_" + fmt.Sprint(task.ID) + return fmt.Sprint(task.ID) } func getResourceSpec(jobType string, resourceSpecId int) *models.ResourceSpec { @@ -100,11 +103,11 @@ func StartCloudbrainPointDeductTask() { }() log.Debug("try to run CloudbrainPointDeductTask") end := time.Now() - start := end.Add(5 * time.Minute) + start := end.Add(-5 * time.Minute) if firstTimeFlag { //When it is executed for the first time, it needs to process the tasks of the last 1 hours. //This is done to prevent the application from hanging for a long time - start = end.Add(1 * time.Hour) + start = end.Add(-1 * time.Hour) firstTimeFlag = false } diff --git a/services/reward/limiter/limiter.go b/services/reward/limiter/limiter.go index f094e3a43..88e72a1a1 100644 --- a/services/reward/limiter/limiter.go +++ b/services/reward/limiter/limiter.go @@ -12,14 +12,6 @@ import ( "time" ) -type limiterRejectPolicy string - -const ( - JustReject limiterRejectPolicy = "JUST_REJECT" - PermittedOnce limiterRejectPolicy = "PERMITTED_ONCE" - FillUp limiterRejectPolicy = "FillUp" -) - type limiterRunner struct { limiters []models.LimitConfig index int @@ -27,7 +19,7 @@ type limiterRunner struct { amount int64 limitCode string limitType models.LimitType - rejectPolicy limiterRejectPolicy + rejectPolicy models.LimiterRejectPolicy resultMap map[int]limitResult minRealAmount int64 } @@ -46,7 +38,7 @@ func newLimitResult(isLoss bool, planAmount int64, realAmount int64) limitResult } } -func newLimiterRunner(limitCode string, limitType models.LimitType, userId, amount int64, policy limiterRejectPolicy) *limiterRunner { +func newLimiterRunner(limitCode string, limitType models.LimitType, userId, amount int64, policy models.LimiterRejectPolicy) *limiterRunner { return &limiterRunner{ userId: userId, amount: amount, @@ -149,7 +141,7 @@ func (l *limiterRunner) limit(r models.LimitConfig) error { usedNum, err = redis_client.IncrBy(redisKey, n) } if p != nil { - redis_client.Expire(redisKey, int64(p.LeftTime.Seconds())) + redis_client.Expire(redisKey, p.LeftTime) } } if usedNum > r.LimitNum { @@ -158,16 +150,16 @@ func (l *limiterRunner) limit(r models.LimitConfig) error { return errors.New(fmt.Sprintf("%s:over limit", r.Tittle)) } switch l.rejectPolicy { - case FillUp: + case models.FillUp: exceed := usedNum - r.LimitNum realAmount := l.amount - exceed redis_client.IncrBy(redisKey, -1*exceed) l.resultMap[l.index] = newLimitResult(true, l.amount, realAmount) return nil - case JustReject: + case models.JustReject: redis_client.IncrBy(redisKey, -1*l.amount) return errors.New(fmt.Sprintf("%s:over limit", r.Tittle)) - case PermittedOnce: + case models.PermittedOnce: l.resultMap[l.index] = newLimitResult(false, l.amount, l.amount) return nil } @@ -200,8 +192,11 @@ func (l *limiterRunner) countInPeriod(r models.LimitConfig, p *models.PeriodResu } } -func CheckLimitWithFillUp(limitCode string, limitType models.LimitType, userId, amount int64) (int64, error) { - r := newLimiterRunner(limitCode, limitType, userId, amount, FillUp) +func CheckLimit(limitCode string, limitType models.LimitType, userId, amount int64, rejectPolicy models.LimiterRejectPolicy) (int64, error) { + if rejectPolicy == "" { + rejectPolicy = models.JustReject + } + r := newLimiterRunner(limitCode, limitType, userId, amount, rejectPolicy) err := r.Run() if err != nil { return 0, err @@ -209,18 +204,6 @@ func CheckLimitWithFillUp(limitCode string, limitType models.LimitType, userId, return r.minRealAmount, nil } -func CheckLimitWithPermittedOnce(limitCode string, limitType models.LimitType, userId, amount int64) error { - r := newLimiterRunner(limitCode, limitType, userId, amount, PermittedOnce) - err := r.Run() - return err -} - -func CheckLimit(limitCode string, limitType models.LimitType, userId, amount int64) error { - r := newLimiterRunner(limitCode, limitType, userId, amount, JustReject) - err := r.Run() - return err -} - func GetLimiters(limitCode string, limitType models.LimitType) ([]models.LimitConfig, error) { limiters, err := GetLimitersByLimitType(limitType) if err != nil { diff --git a/services/reward/operator.go b/services/reward/operator.go index 50ec01ff3..865ac10d0 100644 --- a/services/reward/operator.go +++ b/services/reward/operator.go @@ -5,7 +5,6 @@ import ( "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/redis/redis_key" "code.gitea.io/gitea/modules/redis/redis_lock" - "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/services/reward/point" "errors" "fmt" @@ -69,7 +68,7 @@ func Operate(ctx *models.RewardOperateContext) error { } //new reward operate record - recordId, err := initAwardOperateRecord(ctx) + recordId, err := initRewardOperateRecord(ctx) if err != nil { return err } @@ -110,9 +109,12 @@ func isHandled(sourceType string, requestId string, operateType string) (bool, e } -func initAwardOperateRecord(ctx *models.RewardOperateContext) (string, error) { +func initRewardOperateRecord(ctx *models.RewardOperateContext) (string, error) { + sn, err := generateOperateSerialNo(ctx.OperateType, ctx.Reward.Type) + if err != nil { + return "", err + } record := &models.RewardOperateRecord{ - RecordId: util.UUID(), UserId: ctx.TargetUserId, Amount: ctx.Reward.Amount, RewardType: ctx.Reward.Type.Name(), @@ -122,17 +124,22 @@ func initAwardOperateRecord(ctx *models.RewardOperateContext) (string, error) { OperateType: ctx.OperateType.Name(), Status: models.OperateStatusOperating, Remark: ctx.Remark, + Tittle: ctx.Tittle, + SerialNo: sn, } - _, err := models.InsertAwardOperateRecord(record) + _, err = models.InsertRewardOperateRecord(record) if err != nil { return "", err } - return record.RecordId, nil + return record.SerialNo, nil } func createPeriodicRewardOperateRecord(ctx *models.StartPeriodicTaskOpts) (string, error) { + sn, err := generateOperateSerialNo(ctx.OperateType, ctx.RewardType) + if err != nil { + return "", err + } record := &models.RewardOperateRecord{ - RecordId: util.UUID(), UserId: ctx.TargetUserId, Amount: 0, RewardType: ctx.RewardType.Name(), @@ -142,12 +149,14 @@ func createPeriodicRewardOperateRecord(ctx *models.StartPeriodicTaskOpts) (strin OperateType: ctx.OperateType.Name(), Status: models.OperateStatusOperating, Remark: ctx.Remark, + Tittle: ctx.Tittle, + SerialNo: sn, } - _, err := models.InsertAwardOperateRecord(record) + _, err = models.InsertRewardOperateRecord(record) if err != nil { return "", err } - return record.RecordId, nil + return record.SerialNo, nil } func updateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus string) error { @@ -230,5 +239,30 @@ func StopPeriodicTask(sourceType models.SourceType, sourceId string, operateType } now := time.Now() RunRewardTask(*task, now) - return models.StopPeriodicTask(task.ID, task.OperateRecordId, now) + return models.StopPeriodicTask(task.ID, task.OperateSerialNo, now) +} + +func generateOperateSerialNo(operateType models.RewardOperateType, rewardType models.RewardType) (string, error) { + s, err := GetSerialNoByRedis() + if err != nil { + return "", err + } + + switch operateType { + case models.OperateTypeIncrease: + s += "1" + case models.OperateTypeDecrease: + s += "2" + default: + s += "9" + } + + switch rewardType { + case models.RewardTypePoint: + s += "1" + default: + s += "9" + } + + return s, nil } diff --git a/services/reward/period_task.go b/services/reward/period_task.go index d00e8d0c4..846989652 100644 --- a/services/reward/period_task.go +++ b/services/reward/period_task.go @@ -15,7 +15,7 @@ func NewRewardPeriodicTask(operateRecordId string, opts *models.StartPeriodicTas task.DelaySeconds = int64(opts.Delay.Seconds()) task.IntervalSeconds = int64(opts.Interval.Seconds()) task.Amount = opts.UnitAmount - task.OperateRecordId = operateRecordId + task.OperateSerialNo = operateRecordId task.Status = models.PeriodicTaskStatusRunning task.NextExecuteTime = timeutil.TimeStamp(opts.StartTime.Add(opts.Delay).Unix()) @@ -54,9 +54,9 @@ func RunRewardTask(t models.RewardPeriodicTask, now time.Time) { return } defer lock.UnLock() - record, err := models.GetPointOperateRecordByRecordId(t.OperateRecordId) + record, err := models.GetPointOperateRecordBySerialNo(t.OperateSerialNo) if err != nil { - log.Error("RunRewardTask. GetPointOperateRecordByRecordId error. %v", err) + log.Error("RunRewardTask. GetPointOperateRecordBySerialNo error. %v", err) return } if record.Status != models.OperateStatusOperating { @@ -75,7 +75,7 @@ func RunRewardTask(t models.RewardPeriodicTask, now time.Time) { } err = operator.Operate(&models.RewardOperateContext{ SourceType: models.SourceTypeRunCloudbrainTask, - SourceId: t.OperateRecordId, + SourceId: t.OperateSerialNo, Reward: models.Reward{ Amount: n * t.Amount, Type: models.GetRewardTypeInstance(record.RewardType), diff --git a/services/reward/point/point_operate.go b/services/reward/point/point_operate.go index 4b84cdd0c..51a3657ad 100644 --- a/services/reward/point/point_operate.go +++ b/services/reward/point/point_operate.go @@ -18,7 +18,7 @@ type PointOperator struct { } func (operator *PointOperator) IsLimited(ctx *models.RewardOperateContext) bool { - realAmount, err := limiter.CheckLimitWithFillUp(ctx.SourceType.Name(), models.LimitTypeRewardPoint, ctx.TargetUserId, ctx.Reward.Amount) + realAmount, err := limiter.CheckLimit(ctx.SourceType.Name(), models.LimitTypeRewardPoint, ctx.TargetUserId, ctx.Reward.Amount, ctx.RejectPolicy) if err != nil { return true } diff --git a/services/reward/record.go b/services/reward/record.go new file mode 100644 index 000000000..ac28b3565 --- /dev/null +++ b/services/reward/record.go @@ -0,0 +1,20 @@ +package reward + +import "code.gitea.io/gitea/models" + +type RecordResponse struct { + Records []models.RewardOperateRecordShow + Total int64 +} + +func GetRewardRecordList(opts models.RewardRecordListOpts) (*RecordResponse, error) { + l, n, err := models.GetRewardRecordList(opts) + if err != nil { + return nil, err + } + r := make([]models.RewardOperateRecordShow, 0) + for _, v := range l { + r = append(r, v.ToShow()) + } + return &RecordResponse{Records: r, Total: n}, nil +} diff --git a/services/reward/serial.go b/services/reward/serial.go new file mode 100644 index 000000000..e9509c403 --- /dev/null +++ b/services/reward/serial.go @@ -0,0 +1,21 @@ +package reward + +import ( + "code.gitea.io/gitea/modules/redis/redis_client" + "code.gitea.io/gitea/modules/redis/redis_key" + "fmt" + "math/rand" + "time" +) + +func GetSerialNoByRedis() (string, error) { + now := time.Now() + n, err := redis_client.IncrBy(redis_key.RewardSerialCounter(now), 1) + if err != nil { + return "", err + } + if n == 1 { + redis_client.Expire(redis_key.RewardSerialCounter(now), 5*time.Minute) + } + return now.Format("200601021504") + fmt.Sprint(rand.Intn(10)) + fmt.Sprintf("%02d", n), nil +} diff --git a/services/task/task.go b/services/task/task.go index 4c85ce52e..c2c4861a3 100644 --- a/services/task/task.go +++ b/services/task/task.go @@ -9,17 +9,33 @@ import ( "fmt" ) -func Accomplish(userId int64, taskType string) { - go accomplish(userId, taskType) +func Accomplish(action models.Action) { + switch action.OpType { + case models.ActionCreateRepo, + models.ActionCreateImage: + if action.Repo.IsPrivate { + return + } + case models.ActionCreateDebugGPUTask, + models.ActionCreateDebugNPUTask, + models.ActionCreateTrainTask, + models.ActionCreateInferenceTask, + models.ActionCreateBenchMarkTask, + models.ActionCreateGPUTrainTask: + action.OpType = models.ActionCreateCloudbrainTask + } + go accomplish(action) } -func accomplish(userId int64, taskType string) error { +func accomplish(action models.Action) error { defer func() { if err := recover(); err != nil { combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2)) log.Error("PANIC:%v", combinedErr) } }() + userId := action.ActUserID + taskType := fmt.Sprint(action.OpType) //get task config config, err := GetTaskConfig(taskType) @@ -33,7 +49,7 @@ func accomplish(userId int64, taskType string) error { } //is limited? - if isLimited(userId, config) { + if isLimited(userId, config, limiter.JustReject) { log.Info("task accomplish maximum times are reached,userId=%d taskType=%s", userId, taskType) return nil } @@ -45,6 +61,7 @@ func accomplish(userId int64, taskType string) error { ConfigId: config.ID, TaskCode: config.TaskCode, UserId: userId, + ActionId: action.ID, }) if err != nil { return err @@ -54,6 +71,7 @@ func accomplish(userId int64, taskType string) error { reward.Operate(&models.RewardOperateContext{ SourceType: models.SourceTypeAccomplishTask, SourceId: logId, + Tittle: config.Tittle, Reward: models.Reward{ Amount: config.AwardAmount, Type: models.GetRewardTypeInstance(config.AwardType), @@ -61,13 +79,14 @@ func accomplish(userId int64, taskType string) error { TargetUserId: userId, RequestId: logId, OperateType: models.OperateTypeIncrease, + RejectPolicy: limiter.FillUp, }) return nil } -func isLimited(userId int64, config *models.TaskConfig) bool { - if err := limiter.CheckLimit(config.TaskCode, models.LimitTypeTask, userId, 1); err != nil { +func isLimited(userId int64, config *models.TaskConfig, rejectPolicy limiter.LimiterRejectPolicy) bool { + if _, err := limiter.CheckLimit(config.TaskCode, models.LimitTypeTask, userId, 1, rejectPolicy); err != nil { return true } return false