diff --git a/models/limit_config.go b/models/limit_config.go new file mode 100644 index 000000000..273af0de1 --- /dev/null +++ b/models/limit_config.go @@ -0,0 +1,33 @@ +package models + +import "code.gitea.io/gitea/modules/timeutil" + +type LimitConfig struct { + ID int64 `xorm:"pk autoincr"` + Tittle string + RefreshRate string `xorm:"NOT NULL"` + Scope string `xorm:"NOT NULL"` + LimitNum int64 `xorm:"NOT NULL"` + LimitCode string `xorm:"NOT NULL"` + Creator int64 `xorm:"NOT NULL"` + CreatedUnix timeutil.TimeStamp `xorm:"created"` + DeletedAt timeutil.TimeStamp `xorm:"deleted"` +} + +func findLimitConfig(tl *LimitConfig) ([]LimitConfig, error) { + r := make([]LimitConfig, 0) + err := x.Find(r, tl) + if err != nil { + return nil, err + } else if len(r) == 0 { + return nil, ErrRecordNotExist{} + } + return r, nil +} + +func GetLimitConfigByLimitCode(limitCode string) ([]LimitConfig, error) { + t := &LimitConfig{ + LimitCode: limitCode, + } + return findLimitConfig(t) +} diff --git a/models/point_account.go b/models/point_account.go index f889d5d4f..7fa38cb7a 100644 --- a/models/point_account.go +++ b/models/point_account.go @@ -6,27 +6,92 @@ type PointAccountStatus int // Possible PointAccountStatus types. const ( - PointAccountNormal PointAccountStatus = iota + 1 // 1 - PointAccountFreeze // 2 - PointAccountDeleted // 3 + PointAccountNormal int = iota + 1 // 1 + PointAccountFreeze // 2 + PointAccountDeleted // 3 ) type PointAccount struct { ID int64 `xorm:"pk autoincr"` + AccountCode string `xorm:"INDEX NOT NULL"` Balance int64 `xorm:"NOT NULL DEFAULT 0"` TotalEarned int64 `xorm:"NOT NULL DEFAULT 0"` TotalConsumed int64 `xorm:"NOT NULL DEFAULT 0"` UserId int64 `xorm:"INDEX NOT NULL"` - Status string `xorm:"NOT NULL"` + Status int `xorm:"NOT NULL"` Version int64 `xorm:"NOT NULL"` CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"` UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"` } -func (account *PointAccount) Increase(amount int64) error { +func (account *PointAccount) Increase(amount int64, sourceId string) error { + sess := x.NewSession() + defer sess.Close() + sql := "update point_account set balance = balance + ?,total_earned = total_earned + ? ,version = version + 1 where account_code = ? " + _, err := sess.Exec(sql, amount, amount, account.AccountCode) + if err != nil { + sess.Rollback() + return err + } + accountLog := &PointAccountLog{ + AccountCode: account.AccountCode, + UserId: account.UserId, + Type: IncreaseAccountBalance, + SourceId: sourceId, + PointsAmount: amount, + BalanceBefore: account.Balance, + BalanceAfter: account.Balance + amount, + AccountVersion: account.Version, + } + _, err = sess.Insert(accountLog) + if err != nil { + sess.Rollback() + return err + } + sess.Commit() return nil } -func (account *PointAccount) Decrease(amount int64) error { +func (account *PointAccount) Decrease(amount int64, sourceId string) error { + sess := x.NewSession() + defer sess.Close() + sql := "update point_account set balance = balance - ?,total_consumed = total_consumed + ? ,version = version + 1 where account_code = ? " + _, err := sess.Exec(sql, amount, amount, account.AccountCode) + if err != nil { + sess.Rollback() + return err + } + accountLog := &PointAccountLog{ + AccountCode: account.AccountCode, + UserId: account.UserId, + Type: DecreaseAccountBalance, + SourceId: sourceId, + PointsAmount: amount, + BalanceBefore: account.Balance, + BalanceAfter: account.Balance - amount, + AccountVersion: account.Version, + } + _, err = sess.Insert(accountLog) + if err != nil { + sess.Rollback() + return err + } + sess.Commit() return nil } + +func GetAccountByUserId(userId int64) (*PointAccount, error) { + p := &PointAccount{} + has, err := x.Where("user_id = ?", userId).Get(p) + if err != nil { + return nil, err + } + if !has { + return nil, nil + } + return p, nil +} + +func InsertAccount(tl *PointAccount) (int64, error) { + return x.Insert(tl) +} diff --git a/models/point_account_log.go b/models/point_account_log.go index f699495e7..3ed39ed77 100644 --- a/models/point_account_log.go +++ b/models/point_account_log.go @@ -2,15 +2,20 @@ package models import "code.gitea.io/gitea/modules/timeutil" +const ( + IncreaseAccountBalance = "increase" + DecreaseAccountBalance = "decrease" +) + type PointAccountLog struct { ID int64 `xorm:"pk autoincr"` - AccountId int64 `xorm:"INDEX NOT NULL"` + AccountCode string `xorm:"INDEX NOT NULL"` UserId int64 `xorm:"INDEX NOT NULL"` Type string `xorm:"NOT NULL"` SourceId string `xorm:"INDEX NOT NULL"` PointsAmount int64 `xorm:"NOT NULL"` - AmountBefore int64 `xorm:"NOT NULL"` - AmountAfter int64 `xorm:"NOT NULL"` + BalanceBefore int64 `xorm:"NOT NULL"` + BalanceAfter int64 `xorm:"NOT NULL"` AccountVersion int64 `xorm:"NOT NULL"` CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"` } diff --git a/models/point_limit_config.go b/models/point_limit_config.go deleted file mode 100644 index 60fb5e735..000000000 --- a/models/point_limit_config.go +++ /dev/null @@ -1,23 +0,0 @@ -package models - -import "code.gitea.io/gitea/modules/timeutil" - -const ( - LimitConfigRefreshRateOnce = "ONCE" - LimitConfigRefreshRateDaily = "DAILY" -) -const ( - LimitTargetRangeAllUser = "ALL_USER" - LimitTargetRangeSingleUser = "SINGLE_USER" -) - -type PointLimitConfig struct { - ID int64 `xorm:"pk autoincr"` - Tittle string - RefreshRate string `xorm:"NOT NULL"` - TargetRange string `xorm:"NOT NULL"` - LimitNum int64 `xorm:"NOT NULL"` - Creator int64 `xorm:"NOT NULL"` - CreatedUnix timeutil.TimeStamp `xorm:"created"` - DeletedAt timeutil.TimeStamp `xorm:"deleted"` -} diff --git a/models/point_operate_record.go b/models/point_operate_record.go deleted file mode 100644 index b0ffb094c..000000000 --- a/models/point_operate_record.go +++ /dev/null @@ -1,42 +0,0 @@ -package models - -import "code.gitea.io/gitea/modules/timeutil" - -type RewardSourceType string - -const ( - SourceTypeAccomplishTask RewardSourceType = "ACCOMPLISH_TASK" - SourceTypeAdminOperate RewardSourceType = "ADMIN_OPERATE" - SourceTypeRunCloudbrainTask RewardSourceType = "RUN_CLOUBRAIN_TASK" -) - -type RewardType string - -const ( - RewardTypePoint RewardType = "POINT" -) - -const ( - OperateTypeIncrease = "INCREASE_POINT" - OperateTypeDecrease = "DECREASE_POINT" -) - -const ( - OperateStatusOperating = "OPERATING" - OperateStatusSucceeded = "SUCCEEDED" - OperateStatusFailed = "FAILED" -) - -type PointOperateRecord struct { - ID int64 `xorm:"pk autoincr"` - UserId int64 `xorm:"INDEX NOT NULL"` - PointsAmount int64 `xorm:"NOT NULL"` - RelatedType string `xorm:"NOT NULL"` - SourceId string `xorm:"INDEX NOT NULL"` - OperateType string `xorm:"NOT NULL"` - OperateRate string `xorm:"NOT NULL default once"` - Status string `xorm:"NOT NULL"` - Remark string - CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"` - UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"` -} diff --git a/models/reward_operate_record.go b/models/reward_operate_record.go new file mode 100644 index 000000000..ca1f52168 --- /dev/null +++ b/models/reward_operate_record.go @@ -0,0 +1,84 @@ +package models + +import ( + "code.gitea.io/gitea/modules/timeutil" + "fmt" +) + +type RewardSourceType string + +const ( + SourceTypeAccomplishTask RewardSourceType = "ACCOMPLISH_TASK" + SourceTypeAdminOperate RewardSourceType = "ADMIN_OPERATE" + SourceTypeRunCloudbrainTask RewardSourceType = "RUN_CLOUBRAIN_TASK" +) + +func (r *RewardSourceType) String() string { + return fmt.Sprint(r) +} + +type RewardType string + +const ( + RewardTypePoint RewardType = "POINT" +) + +func (r *RewardType) String() string { + return fmt.Sprint(r) +} + +const ( + OperateTypeIncrease = "INCREASE_POINT" + OperateTypeDecrease = "DECREASE_POINT" +) + +const ( + OperateStatusOperating = "OPERATING" + OperateStatusSucceeded = "SUCCEEDED" + OperateStatusFailed = "FAILED" +) + +type RewardOperateRecord struct { + ID int64 `xorm:"pk autoincr"` + UserId int64 `xorm:"INDEX NOT NULL"` + Amount int64 `xorm:"NOT NULL"` + RewardType string `xorm:"NOT NULL"` + SourceType string `xorm:"NOT NULL"` + SourceId string `xorm:"INDEX NOT NULL"` + RequestId string `xorm:"INDEX NOT NULL"` + OperateType string `xorm:"NOT NULL"` + CycleIntervalSeconds int64 `xorm:"NOT NULL default 0"` + Status string `xorm:"NOT NULL"` + Remark string + CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"` + UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"` +} + +func getPointOperateRecord(tl *RewardOperateRecord) (*RewardOperateRecord, error) { + has, err := x.Get(tl) + if err != nil { + return nil, err + } else if !has { + return nil, ErrRecordNotExist{} + } + return tl, nil +} + +func GetPointOperateRecordBySourceTypeAndRequestId(sourceType, requestId string) (*RewardOperateRecord, error) { + t := &RewardOperateRecord{ + SourceType: sourceType, + RequestId: requestId, + } + return getPointOperateRecord(t) +} + +func InsertAwardOperateRecord(tl *RewardOperateRecord) (int64, error) { + return x.Insert(tl) +} + +func UpdateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus string) (int64, error) { + r := &RewardOperateRecord{ + Status: newStatus, + } + return x.Cols("status").Where("source_type=? and requestId=? and status=?", sourceType, requestId, oldStatus).Update(r) +} diff --git a/models/task_accomplish_log.go b/models/task_accomplish_log.go index ed2927678..707c214f5 100644 --- a/models/task_accomplish_log.go +++ b/models/task_accomplish_log.go @@ -7,6 +7,7 @@ import ( type TaskAccomplishLog struct { ID int64 `xorm:"pk autoincr"` + LogId string `xorm:"INDEX NOT NULL"` ConfigId int64 `xorm:"NOT NULL"` TaskCode string `xorm:"NOT NULL"` UserId int64 `xorm:"INDEX NOT NULL"` @@ -14,9 +15,10 @@ type TaskAccomplishLog struct { CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"` } -type LimiterPeriod struct { +type PeriodResult struct { StartTime time.Time EndTime time.Time + LeftTime time.Duration } func getTaskAccomplishLog(tl *TaskAccomplishLog) (*TaskAccomplishLog, error) { @@ -37,7 +39,7 @@ func GetTaskAccomplishLogBySourceIdAndTaskCode(sourceId, taskCode string) (*Task return getTaskAccomplishLog(t) } -func CountInTaskPeriod(configId int64, userId int64, period *LimiterPeriod) (int64, error) { +func CountInTaskPeriod(configId int64, userId int64, period *PeriodResult) (int64, error) { if period == nil { return x.Where("config_id = ? and user_id = ?", configId, userId).Count(&TaskAccomplishLog{}) } else { diff --git a/models/task_config.go b/models/task_config.go index f74237b59..036f4e315 100644 --- a/models/task_config.go +++ b/models/task_config.go @@ -16,8 +16,8 @@ func (t *TaskType) String() string { } const ( - TaskConfigRefreshRateNotCycle = "NOT_CYCLE" - TaskConfigRefreshRateDaily = "DAILY" + PeriodNotCycle = "NOT_CYCLE" + PeriodDaily = "DAILY" ) //PointTaskConfig Only add and delete are allowed, edit is not allowed diff --git a/modules/redis/redis_client/client.go b/modules/redis/redis_client/client.go index 437aecdae..2c487a72c 100644 --- a/modules/redis/redis_client/client.go +++ b/modules/redis/redis_client/client.go @@ -85,3 +85,48 @@ func TTL(key string) (int, error) { return n, nil } + +func IncrBy(key string, n int64) (int64, error) { + redisClient := labelmsg.Get() + defer redisClient.Close() + + reply, err := redisClient.Do("INCRBY", key, n) + if err != nil { + return 0, err + } + i, err := strconv.ParseInt(fmt.Sprint(reply), 10, 64) + return i, nil + +} + +func Expire(key string, expireSeconds int64) error { + redisClient := labelmsg.Get() + defer redisClient.Close() + + _, err := redisClient.Do("EXPIRE ", key, expireSeconds) + if err != nil { + return err + } + return nil + +} + +//GetInt64 get redis value by Get(key) +//and then parse the value to int64 +//return {isExist(bool)} {value(int64)} {error(error)} +func GetInt64(key string) (bool, int64, error) { + str, err := Get(key) + if err != nil { + return false, 0, err + } + if str == "" { + return false, 0, nil + } + + i, err := strconv.ParseInt(str, 10, 64) + if err != nil { + return false, 0, err + } + return true, i, nil + +} diff --git a/modules/redis/redis_key/account_redis_key.go b/modules/redis/redis_key/account_redis_key.go new file mode 100644 index 000000000..f36a8ea5c --- /dev/null +++ b/modules/redis/redis_key/account_redis_key.go @@ -0,0 +1,17 @@ +package redis_key + +import "fmt" + +const ACCOUNT_REDIS_PREFIX = "account" + +func PointAccountOperateLock(accountCode string) string { + return KeyJoin(ACCOUNT_REDIS_PREFIX, accountCode, "operate", "lock") +} + +func PointAccountDetail(userId int64) string { + return KeyJoin(ACCOUNT_REDIS_PREFIX, fmt.Sprint(userId), "detail") +} + +func PointAccountInitLock(userId int64) string { + return KeyJoin(ACCOUNT_REDIS_PREFIX, fmt.Sprint(userId), "init", "lock") +} diff --git a/modules/redis/redis_key/limit_redis_key.go b/modules/redis/redis_key/limit_redis_key.go new file mode 100644 index 000000000..e9d8352a2 --- /dev/null +++ b/modules/redis/redis_key/limit_redis_key.go @@ -0,0 +1,26 @@ +package redis_key + +import ( + "code.gitea.io/gitea/models" + "fmt" +) + +const LIMIT_REDIS_PREFIX = "limit" + +func LimitCount(userId int64, limitCode string, period *models.PeriodResult) string { + if userId == 0 { + if period == nil { + return KeyJoin(LIMIT_REDIS_PREFIX, limitCode, "count") + } + return KeyJoin(LIMIT_REDIS_PREFIX, limitCode, fmt.Sprint(period.StartTime.Unix()), fmt.Sprint(period.EndTime.Unix()), "count") + } + if period == nil { + return KeyJoin(LIMIT_REDIS_PREFIX, "uid", fmt.Sprint(userId), limitCode, "count") + } + return KeyJoin(LIMIT_REDIS_PREFIX, "uid", fmt.Sprint(userId), limitCode, fmt.Sprint(period.StartTime.Unix()), fmt.Sprint(period.EndTime.Unix()), "count") + +} + +func LimitConfig(limitCode string) string { + return KeyJoin(LIMIT_REDIS_PREFIX, limitCode, "config") +} diff --git a/modules/redis/redis_key/reward_redis_key.go b/modules/redis/redis_key/reward_redis_key.go new file mode 100644 index 000000000..df8c0ca16 --- /dev/null +++ b/modules/redis/redis_key/reward_redis_key.go @@ -0,0 +1,7 @@ +package redis_key + +const REWARD_REDIS_PREFIX = "reward" + +func RewardSendLock(requestId string, sourceType string) string { + return KeyJoin(REWARD_REDIS_PREFIX, requestId, sourceType, "send") +} diff --git a/modules/redis/redis_key/task_redis_key.go b/modules/redis/redis_key/task_redis_key.go index b33e575fb..2eb8c21d1 100644 --- a/modules/redis/redis_key/task_redis_key.go +++ b/modules/redis/redis_key/task_redis_key.go @@ -2,13 +2,12 @@ package redis_key import ( "code.gitea.io/gitea/models" - "fmt" ) const TASK_REDIS_PREFIX = "task" -func TaskAccomplishLock(userId int64, sourceId string, taskType models.TaskType) string { - return KeyJoin(TASK_REDIS_PREFIX, fmt.Sprint(userId), sourceId, taskType.String(), "accomplish") +func TaskAccomplishLock(sourceId string, taskType models.TaskType) string { + return KeyJoin(TASK_REDIS_PREFIX, sourceId, taskType.String(), "accomplish") } func TaskConfig(taskType models.TaskType) string { diff --git a/modules/util/uuid_util.go b/modules/util/uuid_util.go new file mode 100644 index 000000000..301c6ff38 --- /dev/null +++ b/modules/util/uuid_util.go @@ -0,0 +1,10 @@ +package util + +import ( + gouuid "github.com/satori/go.uuid" + "strings" +) + +func UUID() string { + return strings.ReplaceAll(gouuid.NewV4().String(), "-", "") +} diff --git a/services/reward/account/account.go b/services/reward/account/account.go deleted file mode 100644 index 4967b368e..000000000 --- a/services/reward/account/account.go +++ /dev/null @@ -1,9 +0,0 @@ -package account - -func IncreaseAmount(userId int64, amount int64) error { - return nil -} - -func DecreaseAmount(userId int64, amount int64) error { - return nil -} diff --git a/services/reward/callback.go b/services/reward/callback.go deleted file mode 100644 index b67ffa673..000000000 --- a/services/reward/callback.go +++ /dev/null @@ -1,4 +0,0 @@ -package reward - -type CallbackHandler struct { -} diff --git a/services/reward/limiter/limiter.go b/services/reward/limiter/limiter.go new file mode 100644 index 000000000..aca8af22e --- /dev/null +++ b/services/reward/limiter/limiter.go @@ -0,0 +1,93 @@ +package limiter + +import ( + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/redis/redis_client" + "code.gitea.io/gitea/modules/redis/redis_key" + "code.gitea.io/gitea/services/task/period" + "encoding/json" + "errors" + "fmt" + "time" +) + +type limiterRunner struct { + limiters []models.LimitConfig + index int + userId int64 + amount int64 + limitCode string +} + +func newLimiterRunner(limitCode string, userId, amount int64) *limiterRunner { + return &limiterRunner{ + userId: userId, + amount: amount, + limitCode: limitCode, + index: 0, + } +} + +func (l *limiterRunner) Run() error { + if err := l.LoadLimiters(l.limitCode); err != nil { + return err + } + //todo 验证未配置的情况 + for l.index <= len(l.limiters) { + err := l.limit(l.limiters[l.index]) + if err != nil { + return err + } + l.index += 1 + } + return nil +} + +func (l *limiterRunner) limit(r models.LimitConfig) error { + p, err := period.GetPeriod(r.RefreshRate) + if err != nil { + return err + } + redisKey := redis_key.LimitCount(l.userId, r.LimitCode, p) + usedNum, err := redis_client.IncrBy(redisKey, l.amount) + //if it is the first time,set expire time + if usedNum == l.amount && p != nil { + //todo 验证浮点精确度 + redis_client.Expire(redisKey, int64(p.LeftTime.Seconds())) + } + if usedNum > r.LimitNum { + redis_client.IncrBy(redisKey, -1*l.amount) + return errors.New(fmt.Sprintf("%s:over limit", r.Tittle)) + } + return nil +} + +func (l *limiterRunner) LoadLimiters(limitCode string) error { + redisKey := redis_key.LimitConfig(limitCode) + val, _ := redis_client.Get(redisKey) + if val != "" { + if val == redis_key.EMPTY_REDIS_VAL { + return nil + } + limiters := make([]models.LimitConfig, 0) + json.Unmarshal([]byte(val), limiters) + return nil + } + limiters, err := models.GetLimitConfigByLimitCode(limitCode) + if err != nil { + if models.IsErrRecordNotExist(err) { + redis_client.Setex(redisKey, redis_key.EMPTY_REDIS_VAL, 5*time.Second) + return nil + } + return err + } + jsonStr, _ := json.Marshal(limiters) + redis_client.Setex(redisKey, string(jsonStr), 30*24*time.Hour) + + return nil +} + +func CheckLimit(limitCode string, userId, amount int64) error { + r := newLimiterRunner(limitCode, userId, amount) + return r.Run() +} diff --git a/services/reward/operator.go b/services/reward/operator.go index 848ba703d..321562474 100644 --- a/services/reward/operator.go +++ b/services/reward/operator.go @@ -2,9 +2,13 @@ package reward import ( "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/redis/redis_key" + "code.gitea.io/gitea/modules/redis/redis_lock" "code.gitea.io/gitea/services/reward/point" "errors" "fmt" + "time" ) var RewardOperatorMap = map[string]RewardOperator{ @@ -12,47 +16,109 @@ var RewardOperatorMap = map[string]RewardOperator{ } type RewardOperateContext struct { - SourceType models.RewardSourceType - SourceId string - Remark string - Reward Reward - TargetUserId int64 -} - -type RewardOperateResponse int - -const ( - RewardOperateSuccess RewardOperateResponse = iota + 1 - RewardOperateBalanceNotEnough -) - -func (t RewardOperateResponse) IsSuccess() bool { - return t == RewardOperateSuccess + SourceType models.RewardSourceType + SourceId string + Remark string + Reward Reward + TargetUserId int64 + RequestId string + OperateType string + CycleIntervalSeconds int64 } type RewardOperator interface { - IsOperated(ctx RewardOperateContext) bool IsLimited(ctx RewardOperateContext) bool Operate(ctx RewardOperateContext) error } func Send(ctx RewardOperateContext) error { + //add lock + var rewardLock = redis_lock.NewDistributeLock(redis_key.RewardSendLock(ctx.RequestId, ctx.SourceType.String())) + if !rewardLock.Lock(3 * time.Second) { + log.Info("duplicated reward request,targetUserId=%d requestId=%s", ctx.TargetUserId, ctx.RequestId) + return nil + } + defer rewardLock.UnLock() + + //is handled before? + isHandled, err := isHandled(ctx.SourceType, ctx.RequestId) + if err != nil { + log.Error("reward is handled error,%v", err) + return err + } + if isHandled { + log.Info("reward has been handled,ctx=%+v", ctx) + return nil + } + + //get operator operator := GetOperator(ctx.Reward.Type) if operator == nil { return errors.New("operator of reward type is not exist") } - if operator.IsOperated(ctx) { - return nil - } + + //is limited? if operator.IsLimited(ctx) { return nil } + + //new reward operate record + if err := initAwardOperateRecord(ctx); err != nil { + return err + } + + //operate if err := operator.Operate(ctx); err != nil { + updateAwardOperateRecordStatus(ctx.SourceType.String(), ctx.RequestId, models.OperateStatusOperating, models.OperateStatusFailed) return err } + + //if not a cycle operate,update status to success + if ctx.CycleIntervalSeconds > 0 { + updateAwardOperateRecordStatus(ctx.SourceType.String(), ctx.RequestId, models.OperateStatusOperating, models.OperateStatusSucceeded) + } return nil } func GetOperator(rewardType string) RewardOperator { return RewardOperatorMap[rewardType] } + +func isHandled(sourceType models.RewardSourceType, requestId string) (bool, error) { + _, err := models.GetPointOperateRecordBySourceTypeAndRequestId(sourceType.String(), requestId) + if err != nil { + if models.IsErrRecordNotExist(err) { + return false, nil + } + return false, err + } + return true, nil + +} + +func initAwardOperateRecord(ctx RewardOperateContext) error { + _, err := models.InsertAwardOperateRecord(&models.RewardOperateRecord{ + UserId: ctx.TargetUserId, + Amount: ctx.Reward.Amount, + RewardType: ctx.Reward.Type, + SourceType: ctx.SourceType.String(), + SourceId: ctx.SourceId, + RequestId: ctx.RequestId, + OperateType: ctx.OperateType, + CycleIntervalSeconds: ctx.CycleIntervalSeconds, + Status: models.OperateStatusOperating, + Remark: ctx.Remark, + }) + if err != nil { + return err + } + return nil +} + +func updateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus string) error { + _, err := models.UpdateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus) + if err != nil { + return err + } + return nil +} diff --git a/services/reward/point/account/point_account.go b/services/reward/point/account/point_account.go new file mode 100644 index 000000000..9ff5001fc --- /dev/null +++ b/services/reward/point/account/point_account.go @@ -0,0 +1,58 @@ +package account + +import ( + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/redis/redis_client" + "code.gitea.io/gitea/modules/redis/redis_key" + "code.gitea.io/gitea/modules/redis/redis_lock" + "code.gitea.io/gitea/modules/util" + "encoding/json" + "time" +) + +func GetAccount(userId int64) (*models.PointAccount, error) { + redisKey := redis_key.PointAccountDetail(userId) + val, _ := redis_client.Get(redisKey) + if val != "" { + account := &models.PointAccount{} + json.Unmarshal([]byte(val), account) + return account, nil + } + account, err := models.GetAccountByUserId(userId) + if err != nil { + if models.IsErrRecordNotExist(err) { + a, err := InitAccount(userId) + if err != nil { + return nil, err + } + return a, nil + } + return nil, err + } + jsonStr, _ := json.Marshal(account) + redis_client.Setex(redisKey, string(jsonStr), 24*time.Hour) + return account, nil +} + +func InitAccount(userId int64) (*models.PointAccount, error) { + lock := redis_lock.NewDistributeLock(redis_key.PointAccountInitLock(userId)) + if lock.LockWithWait(3*time.Second, 3*time.Second) { + defer lock.UnLock() + account, _ := models.GetAccountByUserId(userId) + if account == nil { + models.InsertAccount(&models.PointAccount{ + Balance: 0, + TotalEarned: 0, + TotalConsumed: 0, + UserId: userId, + Status: models.PointAccountNormal, + Version: 0, + AccountCode: util.UUID(), + }) + return models.GetAccountByUserId(userId) + } + return account, nil + } + return nil, nil + +} diff --git a/services/reward/point/point_operate.go b/services/reward/point/point_operate.go index 5a6c18bff..ddcac515b 100644 --- a/services/reward/point/point_operate.go +++ b/services/reward/point/point_operate.go @@ -1,19 +1,44 @@ package point import ( + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/redis/redis_key" + "code.gitea.io/gitea/modules/redis/redis_lock" "code.gitea.io/gitea/services/reward" + "code.gitea.io/gitea/services/reward/limiter" + "code.gitea.io/gitea/services/reward/point/account" + "errors" + "time" ) type PointOperator struct { } -func (operator *PointOperator) IsOperated(ctx reward.RewardOperateContext) bool { - //todo - return true -} func (operator *PointOperator) IsLimited(ctx reward.RewardOperateContext) bool { + if err := limiter.CheckLimit(ctx.Reward.Type, ctx.TargetUserId, ctx.Reward.Amount); err != nil { + return false + } return true } + func (operator *PointOperator) Operate(ctx reward.RewardOperateContext) error { + a, err := account.GetAccount(ctx.TargetUserId) + if err != nil || a == nil { + return errors.New("get account error") + } + + lock := redis_lock.NewDistributeLock(redis_key.PointAccountOperateLock(a.AccountCode)) + if lock.LockWithWait(3*time.Second, 3*time.Second) { + defer lock.UnLock() + na, _ := account.GetAccount(ctx.TargetUserId) + if ctx.OperateType == models.OperateTypeIncrease { + na.Increase(ctx.Reward.Amount, ctx.SourceId) + } else if ctx.OperateType == models.OperateTypeDecrease { + na.Decrease(ctx.Reward.Amount, ctx.SourceId) + } + + } else { + return errors.New("Get account operate lock failed") + } return nil } diff --git a/services/task/limiter.go b/services/task/limiter.go deleted file mode 100644 index 6c2cd4f44..000000000 --- a/services/task/limiter.go +++ /dev/null @@ -1,39 +0,0 @@ -package task - -import ( - "code.gitea.io/gitea/models" - "time" -) - -var LimiterMap = map[string]Limiter{ - models.TaskConfigRefreshRateNotCycle: new(NoCycleLimiter), - models.TaskConfigRefreshRateDaily: new(DailyLimiter), -} - -type Limiter interface { - GetCurrentPeriod() *models.LimiterPeriod -} - -type NoCycleLimiter struct { -} - -func (l *NoCycleLimiter) GetCurrentPeriod() *models.LimiterPeriod { - return nil -} - -type DailyLimiter struct { -} - -func (l *DailyLimiter) GetCurrentPeriod() *models.LimiterPeriod { - t := time.Now() - startTime := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) - endTime := startTime.Add(24 * time.Hour) - return &models.LimiterPeriod{ - StartTime: startTime, - EndTime: endTime, - } -} - -func GetLimiter(refreshRateype string) Limiter { - return LimiterMap[refreshRateype] -} diff --git a/services/task/period/handler.go b/services/task/period/handler.go new file mode 100644 index 000000000..c3e5443d3 --- /dev/null +++ b/services/task/period/handler.go @@ -0,0 +1,50 @@ +package period + +import ( + "code.gitea.io/gitea/models" + "errors" + "time" +) + +var PeriodHandlerMap = map[string]PeriodHandler{ + models.PeriodNotCycle: new(NoCycleHandler), + models.PeriodDaily: new(DailyHandler), +} + +type PeriodHandler interface { + GetCurrentPeriod() *models.PeriodResult +} + +type NoCycleHandler struct { +} + +func (l *NoCycleHandler) GetCurrentPeriod() *models.PeriodResult { + return nil +} + +type DailyHandler struct { +} + +func (l *DailyHandler) GetCurrentPeriod() *models.PeriodResult { + t := time.Now() + startTime := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) + endTime := startTime.Add(24 * time.Hour) + leftTime := endTime.Sub(t) + return &models.PeriodResult{ + StartTime: startTime, + EndTime: endTime, + LeftTime: leftTime, + } +} + +func getPeriodHandler(refreshRateype string) PeriodHandler { + return PeriodHandlerMap[refreshRateype] +} + +func GetPeriod(refreshRate string) (*models.PeriodResult, error) { + handler := getPeriodHandler(refreshRate) + if handler == nil { + return nil, errors.New("task config incorrect") + } + return handler.GetCurrentPeriod(), nil +} diff --git a/services/task/task.go b/services/task/task.go index c5f65240b..403a2ba8f 100644 --- a/services/task/task.go +++ b/services/task/task.go @@ -5,8 +5,9 @@ import ( "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/redis/redis_key" "code.gitea.io/gitea/modules/redis/redis_lock" + "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/services/reward" - "errors" + "code.gitea.io/gitea/services/task/period" "time" ) @@ -16,7 +17,7 @@ func Accomplish(userId int64, taskType models.TaskType, sourceId string) { func accomplish(userId int64, taskType models.TaskType, sourceId string) error { //lock - var taskLock = redis_lock.NewDistributeLock(redis_key.TaskAccomplishLock(userId, sourceId, taskType)) + var taskLock = redis_lock.NewDistributeLock(redis_key.TaskAccomplishLock(sourceId, taskType)) if !taskLock.Lock(3 * time.Second) { log.Info("duplicated task request,userId=%d taskType=%s sourceId=%s", userId, taskType, sourceId) return nil @@ -57,12 +58,17 @@ func accomplish(userId int64, taskType models.TaskType, sourceId string) error { } //add log - models.InsertTaskAccomplishLog(&models.TaskAccomplishLog{ + logId := util.UUID() + _, err = models.InsertTaskAccomplishLog(&models.TaskAccomplishLog{ + LogId: logId, ConfigId: config.ID, TaskCode: config.TaskCode, UserId: userId, SourceId: sourceId, }) + if err != nil { + return err + } //reward reward.Send(reward.RewardOperateContext{ @@ -73,6 +79,7 @@ func accomplish(userId int64, taskType models.TaskType, sourceId string) error { Type: config.AwardType, }, TargetUserId: userId, + RequestId: logId, }) return nil @@ -91,11 +98,11 @@ func isHandled(taskType models.TaskType, sourceId string) (bool, error) { } func IsLimited(userId int64, config *models.TaskConfig) (bool, error) { - limiter := GetLimiter(config.RefreshRate) - if limiter == nil { - return false, errors.New("task config incorrect") + p, err := period.GetPeriod(config.RefreshRate) + if err != nil { + return false, err } - n, err := models.CountInTaskPeriod(config.ID, userId, limiter.GetCurrentPeriod()) + n, err := models.CountInTaskPeriod(config.ID, userId, p) if err != nil { return false, err } diff --git a/services/task/task_config.go b/services/task/task_config.go index 6812f5d67..22e0b1828 100644 --- a/services/task/task_config.go +++ b/services/task/task_config.go @@ -24,9 +24,9 @@ func GetTaskConfig(taskType models.TaskType) (*models.TaskConfig, error) { config, err := models.GetTaskConfigByTaskCode(taskType.String()) if err != nil { if models.IsErrRecordNotExist(err) { + redis_client.Setex(redisKey, redis_key.EMPTY_REDIS_VAL, 5*time.Second) return nil, nil } - redis_client.Setex(redisKey, redis_key.EMPTY_REDIS_VAL, 5*time.Second) return nil, err } jsonStr, _ := json.Marshal(config)