@@ -0,0 +1,33 @@ | |||
package models | |||
import "code.gitea.io/gitea/modules/timeutil" | |||
type LimitConfig struct { | |||
ID int64 `xorm:"pk autoincr"` | |||
Tittle string | |||
RefreshRate string `xorm:"NOT NULL"` | |||
Scope string `xorm:"NOT NULL"` | |||
LimitNum int64 `xorm:"NOT NULL"` | |||
LimitCode string `xorm:"NOT NULL"` | |||
Creator int64 `xorm:"NOT NULL"` | |||
CreatedUnix timeutil.TimeStamp `xorm:"created"` | |||
DeletedAt timeutil.TimeStamp `xorm:"deleted"` | |||
} | |||
func findLimitConfig(tl *LimitConfig) ([]LimitConfig, error) { | |||
r := make([]LimitConfig, 0) | |||
err := x.Find(r, tl) | |||
if err != nil { | |||
return nil, err | |||
} else if len(r) == 0 { | |||
return nil, ErrRecordNotExist{} | |||
} | |||
return r, nil | |||
} | |||
func GetLimitConfigByLimitCode(limitCode string) ([]LimitConfig, error) { | |||
t := &LimitConfig{ | |||
LimitCode: limitCode, | |||
} | |||
return findLimitConfig(t) | |||
} |
@@ -6,27 +6,92 @@ type PointAccountStatus int | |||
// Possible PointAccountStatus types. | |||
const ( | |||
PointAccountNormal PointAccountStatus = iota + 1 // 1 | |||
PointAccountFreeze // 2 | |||
PointAccountDeleted // 3 | |||
PointAccountNormal int = iota + 1 // 1 | |||
PointAccountFreeze // 2 | |||
PointAccountDeleted // 3 | |||
) | |||
type PointAccount struct { | |||
ID int64 `xorm:"pk autoincr"` | |||
AccountCode string `xorm:"INDEX NOT NULL"` | |||
Balance int64 `xorm:"NOT NULL DEFAULT 0"` | |||
TotalEarned int64 `xorm:"NOT NULL DEFAULT 0"` | |||
TotalConsumed int64 `xorm:"NOT NULL DEFAULT 0"` | |||
UserId int64 `xorm:"INDEX NOT NULL"` | |||
Status string `xorm:"NOT NULL"` | |||
Status int `xorm:"NOT NULL"` | |||
Version int64 `xorm:"NOT NULL"` | |||
CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"` | |||
UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"` | |||
} | |||
func (account *PointAccount) Increase(amount int64) error { | |||
func (account *PointAccount) Increase(amount int64, sourceId string) error { | |||
sess := x.NewSession() | |||
defer sess.Close() | |||
sql := "update point_account set balance = balance + ?,total_earned = total_earned + ? ,version = version + 1 where account_code = ? " | |||
_, err := sess.Exec(sql, amount, amount, account.AccountCode) | |||
if err != nil { | |||
sess.Rollback() | |||
return err | |||
} | |||
accountLog := &PointAccountLog{ | |||
AccountCode: account.AccountCode, | |||
UserId: account.UserId, | |||
Type: IncreaseAccountBalance, | |||
SourceId: sourceId, | |||
PointsAmount: amount, | |||
BalanceBefore: account.Balance, | |||
BalanceAfter: account.Balance + amount, | |||
AccountVersion: account.Version, | |||
} | |||
_, err = sess.Insert(accountLog) | |||
if err != nil { | |||
sess.Rollback() | |||
return err | |||
} | |||
sess.Commit() | |||
return nil | |||
} | |||
func (account *PointAccount) Decrease(amount int64) error { | |||
func (account *PointAccount) Decrease(amount int64, sourceId string) error { | |||
sess := x.NewSession() | |||
defer sess.Close() | |||
sql := "update point_account set balance = balance - ?,total_consumed = total_consumed + ? ,version = version + 1 where account_code = ? " | |||
_, err := sess.Exec(sql, amount, amount, account.AccountCode) | |||
if err != nil { | |||
sess.Rollback() | |||
return err | |||
} | |||
accountLog := &PointAccountLog{ | |||
AccountCode: account.AccountCode, | |||
UserId: account.UserId, | |||
Type: DecreaseAccountBalance, | |||
SourceId: sourceId, | |||
PointsAmount: amount, | |||
BalanceBefore: account.Balance, | |||
BalanceAfter: account.Balance - amount, | |||
AccountVersion: account.Version, | |||
} | |||
_, err = sess.Insert(accountLog) | |||
if err != nil { | |||
sess.Rollback() | |||
return err | |||
} | |||
sess.Commit() | |||
return nil | |||
} | |||
func GetAccountByUserId(userId int64) (*PointAccount, error) { | |||
p := &PointAccount{} | |||
has, err := x.Where("user_id = ?", userId).Get(p) | |||
if err != nil { | |||
return nil, err | |||
} | |||
if !has { | |||
return nil, nil | |||
} | |||
return p, nil | |||
} | |||
func InsertAccount(tl *PointAccount) (int64, error) { | |||
return x.Insert(tl) | |||
} |
@@ -2,15 +2,20 @@ package models | |||
import "code.gitea.io/gitea/modules/timeutil" | |||
const ( | |||
IncreaseAccountBalance = "increase" | |||
DecreaseAccountBalance = "decrease" | |||
) | |||
type PointAccountLog struct { | |||
ID int64 `xorm:"pk autoincr"` | |||
AccountId int64 `xorm:"INDEX NOT NULL"` | |||
AccountCode string `xorm:"INDEX NOT NULL"` | |||
UserId int64 `xorm:"INDEX NOT NULL"` | |||
Type string `xorm:"NOT NULL"` | |||
SourceId string `xorm:"INDEX NOT NULL"` | |||
PointsAmount int64 `xorm:"NOT NULL"` | |||
AmountBefore int64 `xorm:"NOT NULL"` | |||
AmountAfter int64 `xorm:"NOT NULL"` | |||
BalanceBefore int64 `xorm:"NOT NULL"` | |||
BalanceAfter int64 `xorm:"NOT NULL"` | |||
AccountVersion int64 `xorm:"NOT NULL"` | |||
CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"` | |||
} |
@@ -1,23 +0,0 @@ | |||
package models | |||
import "code.gitea.io/gitea/modules/timeutil" | |||
const ( | |||
LimitConfigRefreshRateOnce = "ONCE" | |||
LimitConfigRefreshRateDaily = "DAILY" | |||
) | |||
const ( | |||
LimitTargetRangeAllUser = "ALL_USER" | |||
LimitTargetRangeSingleUser = "SINGLE_USER" | |||
) | |||
type PointLimitConfig struct { | |||
ID int64 `xorm:"pk autoincr"` | |||
Tittle string | |||
RefreshRate string `xorm:"NOT NULL"` | |||
TargetRange string `xorm:"NOT NULL"` | |||
LimitNum int64 `xorm:"NOT NULL"` | |||
Creator int64 `xorm:"NOT NULL"` | |||
CreatedUnix timeutil.TimeStamp `xorm:"created"` | |||
DeletedAt timeutil.TimeStamp `xorm:"deleted"` | |||
} |
@@ -1,42 +0,0 @@ | |||
package models | |||
import "code.gitea.io/gitea/modules/timeutil" | |||
type RewardSourceType string | |||
const ( | |||
SourceTypeAccomplishTask RewardSourceType = "ACCOMPLISH_TASK" | |||
SourceTypeAdminOperate RewardSourceType = "ADMIN_OPERATE" | |||
SourceTypeRunCloudbrainTask RewardSourceType = "RUN_CLOUBRAIN_TASK" | |||
) | |||
type RewardType string | |||
const ( | |||
RewardTypePoint RewardType = "POINT" | |||
) | |||
const ( | |||
OperateTypeIncrease = "INCREASE_POINT" | |||
OperateTypeDecrease = "DECREASE_POINT" | |||
) | |||
const ( | |||
OperateStatusOperating = "OPERATING" | |||
OperateStatusSucceeded = "SUCCEEDED" | |||
OperateStatusFailed = "FAILED" | |||
) | |||
type PointOperateRecord struct { | |||
ID int64 `xorm:"pk autoincr"` | |||
UserId int64 `xorm:"INDEX NOT NULL"` | |||
PointsAmount int64 `xorm:"NOT NULL"` | |||
RelatedType string `xorm:"NOT NULL"` | |||
SourceId string `xorm:"INDEX NOT NULL"` | |||
OperateType string `xorm:"NOT NULL"` | |||
OperateRate string `xorm:"NOT NULL default once"` | |||
Status string `xorm:"NOT NULL"` | |||
Remark string | |||
CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"` | |||
UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"` | |||
} |
@@ -0,0 +1,84 @@ | |||
package models | |||
import ( | |||
"code.gitea.io/gitea/modules/timeutil" | |||
"fmt" | |||
) | |||
type RewardSourceType string | |||
const ( | |||
SourceTypeAccomplishTask RewardSourceType = "ACCOMPLISH_TASK" | |||
SourceTypeAdminOperate RewardSourceType = "ADMIN_OPERATE" | |||
SourceTypeRunCloudbrainTask RewardSourceType = "RUN_CLOUBRAIN_TASK" | |||
) | |||
func (r *RewardSourceType) String() string { | |||
return fmt.Sprint(r) | |||
} | |||
type RewardType string | |||
const ( | |||
RewardTypePoint RewardType = "POINT" | |||
) | |||
func (r *RewardType) String() string { | |||
return fmt.Sprint(r) | |||
} | |||
const ( | |||
OperateTypeIncrease = "INCREASE_POINT" | |||
OperateTypeDecrease = "DECREASE_POINT" | |||
) | |||
const ( | |||
OperateStatusOperating = "OPERATING" | |||
OperateStatusSucceeded = "SUCCEEDED" | |||
OperateStatusFailed = "FAILED" | |||
) | |||
type RewardOperateRecord struct { | |||
ID int64 `xorm:"pk autoincr"` | |||
UserId int64 `xorm:"INDEX NOT NULL"` | |||
Amount int64 `xorm:"NOT NULL"` | |||
RewardType string `xorm:"NOT NULL"` | |||
SourceType string `xorm:"NOT NULL"` | |||
SourceId string `xorm:"INDEX NOT NULL"` | |||
RequestId string `xorm:"INDEX NOT NULL"` | |||
OperateType string `xorm:"NOT NULL"` | |||
CycleIntervalSeconds int64 `xorm:"NOT NULL default 0"` | |||
Status string `xorm:"NOT NULL"` | |||
Remark string | |||
CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"` | |||
UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"` | |||
} | |||
func getPointOperateRecord(tl *RewardOperateRecord) (*RewardOperateRecord, error) { | |||
has, err := x.Get(tl) | |||
if err != nil { | |||
return nil, err | |||
} else if !has { | |||
return nil, ErrRecordNotExist{} | |||
} | |||
return tl, nil | |||
} | |||
func GetPointOperateRecordBySourceTypeAndRequestId(sourceType, requestId string) (*RewardOperateRecord, error) { | |||
t := &RewardOperateRecord{ | |||
SourceType: sourceType, | |||
RequestId: requestId, | |||
} | |||
return getPointOperateRecord(t) | |||
} | |||
func InsertAwardOperateRecord(tl *RewardOperateRecord) (int64, error) { | |||
return x.Insert(tl) | |||
} | |||
func UpdateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus string) (int64, error) { | |||
r := &RewardOperateRecord{ | |||
Status: newStatus, | |||
} | |||
return x.Cols("status").Where("source_type=? and requestId=? and status=?", sourceType, requestId, oldStatus).Update(r) | |||
} |
@@ -7,6 +7,7 @@ import ( | |||
type TaskAccomplishLog struct { | |||
ID int64 `xorm:"pk autoincr"` | |||
LogId string `xorm:"INDEX NOT NULL"` | |||
ConfigId int64 `xorm:"NOT NULL"` | |||
TaskCode string `xorm:"NOT NULL"` | |||
UserId int64 `xorm:"INDEX NOT NULL"` | |||
@@ -14,9 +15,10 @@ type TaskAccomplishLog struct { | |||
CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"` | |||
} | |||
type LimiterPeriod struct { | |||
type PeriodResult struct { | |||
StartTime time.Time | |||
EndTime time.Time | |||
LeftTime time.Duration | |||
} | |||
func getTaskAccomplishLog(tl *TaskAccomplishLog) (*TaskAccomplishLog, error) { | |||
@@ -37,7 +39,7 @@ func GetTaskAccomplishLogBySourceIdAndTaskCode(sourceId, taskCode string) (*Task | |||
return getTaskAccomplishLog(t) | |||
} | |||
func CountInTaskPeriod(configId int64, userId int64, period *LimiterPeriod) (int64, error) { | |||
func CountInTaskPeriod(configId int64, userId int64, period *PeriodResult) (int64, error) { | |||
if period == nil { | |||
return x.Where("config_id = ? and user_id = ?", configId, userId).Count(&TaskAccomplishLog{}) | |||
} else { | |||
@@ -16,8 +16,8 @@ func (t *TaskType) String() string { | |||
} | |||
const ( | |||
TaskConfigRefreshRateNotCycle = "NOT_CYCLE" | |||
TaskConfigRefreshRateDaily = "DAILY" | |||
PeriodNotCycle = "NOT_CYCLE" | |||
PeriodDaily = "DAILY" | |||
) | |||
//PointTaskConfig Only add and delete are allowed, edit is not allowed | |||
@@ -85,3 +85,48 @@ func TTL(key string) (int, error) { | |||
return n, nil | |||
} | |||
func IncrBy(key string, n int64) (int64, error) { | |||
redisClient := labelmsg.Get() | |||
defer redisClient.Close() | |||
reply, err := redisClient.Do("INCRBY", key, n) | |||
if err != nil { | |||
return 0, err | |||
} | |||
i, err := strconv.ParseInt(fmt.Sprint(reply), 10, 64) | |||
return i, nil | |||
} | |||
func Expire(key string, expireSeconds int64) error { | |||
redisClient := labelmsg.Get() | |||
defer redisClient.Close() | |||
_, err := redisClient.Do("EXPIRE ", key, expireSeconds) | |||
if err != nil { | |||
return err | |||
} | |||
return nil | |||
} | |||
//GetInt64 get redis value by Get(key) | |||
//and then parse the value to int64 | |||
//return {isExist(bool)} {value(int64)} {error(error)} | |||
func GetInt64(key string) (bool, int64, error) { | |||
str, err := Get(key) | |||
if err != nil { | |||
return false, 0, err | |||
} | |||
if str == "" { | |||
return false, 0, nil | |||
} | |||
i, err := strconv.ParseInt(str, 10, 64) | |||
if err != nil { | |||
return false, 0, err | |||
} | |||
return true, i, nil | |||
} |
@@ -0,0 +1,17 @@ | |||
package redis_key | |||
import "fmt" | |||
const ACCOUNT_REDIS_PREFIX = "account" | |||
func PointAccountOperateLock(accountCode string) string { | |||
return KeyJoin(ACCOUNT_REDIS_PREFIX, accountCode, "operate", "lock") | |||
} | |||
func PointAccountDetail(userId int64) string { | |||
return KeyJoin(ACCOUNT_REDIS_PREFIX, fmt.Sprint(userId), "detail") | |||
} | |||
func PointAccountInitLock(userId int64) string { | |||
return KeyJoin(ACCOUNT_REDIS_PREFIX, fmt.Sprint(userId), "init", "lock") | |||
} |
@@ -0,0 +1,26 @@ | |||
package redis_key | |||
import ( | |||
"code.gitea.io/gitea/models" | |||
"fmt" | |||
) | |||
const LIMIT_REDIS_PREFIX = "limit" | |||
func LimitCount(userId int64, limitCode string, period *models.PeriodResult) string { | |||
if userId == 0 { | |||
if period == nil { | |||
return KeyJoin(LIMIT_REDIS_PREFIX, limitCode, "count") | |||
} | |||
return KeyJoin(LIMIT_REDIS_PREFIX, limitCode, fmt.Sprint(period.StartTime.Unix()), fmt.Sprint(period.EndTime.Unix()), "count") | |||
} | |||
if period == nil { | |||
return KeyJoin(LIMIT_REDIS_PREFIX, "uid", fmt.Sprint(userId), limitCode, "count") | |||
} | |||
return KeyJoin(LIMIT_REDIS_PREFIX, "uid", fmt.Sprint(userId), limitCode, fmt.Sprint(period.StartTime.Unix()), fmt.Sprint(period.EndTime.Unix()), "count") | |||
} | |||
func LimitConfig(limitCode string) string { | |||
return KeyJoin(LIMIT_REDIS_PREFIX, limitCode, "config") | |||
} |
@@ -0,0 +1,7 @@ | |||
package redis_key | |||
const REWARD_REDIS_PREFIX = "reward" | |||
func RewardSendLock(requestId string, sourceType string) string { | |||
return KeyJoin(REWARD_REDIS_PREFIX, requestId, sourceType, "send") | |||
} |
@@ -2,13 +2,12 @@ package redis_key | |||
import ( | |||
"code.gitea.io/gitea/models" | |||
"fmt" | |||
) | |||
const TASK_REDIS_PREFIX = "task" | |||
func TaskAccomplishLock(userId int64, sourceId string, taskType models.TaskType) string { | |||
return KeyJoin(TASK_REDIS_PREFIX, fmt.Sprint(userId), sourceId, taskType.String(), "accomplish") | |||
func TaskAccomplishLock(sourceId string, taskType models.TaskType) string { | |||
return KeyJoin(TASK_REDIS_PREFIX, sourceId, taskType.String(), "accomplish") | |||
} | |||
func TaskConfig(taskType models.TaskType) string { | |||
@@ -0,0 +1,10 @@ | |||
package util | |||
import ( | |||
gouuid "github.com/satori/go.uuid" | |||
"strings" | |||
) | |||
func UUID() string { | |||
return strings.ReplaceAll(gouuid.NewV4().String(), "-", "") | |||
} |
@@ -1,9 +0,0 @@ | |||
package account | |||
func IncreaseAmount(userId int64, amount int64) error { | |||
return nil | |||
} | |||
func DecreaseAmount(userId int64, amount int64) error { | |||
return nil | |||
} |
@@ -1,4 +0,0 @@ | |||
package reward | |||
type CallbackHandler struct { | |||
} |
@@ -0,0 +1,93 @@ | |||
package limiter | |||
import ( | |||
"code.gitea.io/gitea/models" | |||
"code.gitea.io/gitea/modules/redis/redis_client" | |||
"code.gitea.io/gitea/modules/redis/redis_key" | |||
"code.gitea.io/gitea/services/task/period" | |||
"encoding/json" | |||
"errors" | |||
"fmt" | |||
"time" | |||
) | |||
type limiterRunner struct { | |||
limiters []models.LimitConfig | |||
index int | |||
userId int64 | |||
amount int64 | |||
limitCode string | |||
} | |||
func newLimiterRunner(limitCode string, userId, amount int64) *limiterRunner { | |||
return &limiterRunner{ | |||
userId: userId, | |||
amount: amount, | |||
limitCode: limitCode, | |||
index: 0, | |||
} | |||
} | |||
func (l *limiterRunner) Run() error { | |||
if err := l.LoadLimiters(l.limitCode); err != nil { | |||
return err | |||
} | |||
//todo 验证未配置的情况 | |||
for l.index <= len(l.limiters) { | |||
err := l.limit(l.limiters[l.index]) | |||
if err != nil { | |||
return err | |||
} | |||
l.index += 1 | |||
} | |||
return nil | |||
} | |||
func (l *limiterRunner) limit(r models.LimitConfig) error { | |||
p, err := period.GetPeriod(r.RefreshRate) | |||
if err != nil { | |||
return err | |||
} | |||
redisKey := redis_key.LimitCount(l.userId, r.LimitCode, p) | |||
usedNum, err := redis_client.IncrBy(redisKey, l.amount) | |||
//if it is the first time,set expire time | |||
if usedNum == l.amount && p != nil { | |||
//todo 验证浮点精确度 | |||
redis_client.Expire(redisKey, int64(p.LeftTime.Seconds())) | |||
} | |||
if usedNum > r.LimitNum { | |||
redis_client.IncrBy(redisKey, -1*l.amount) | |||
return errors.New(fmt.Sprintf("%s:over limit", r.Tittle)) | |||
} | |||
return nil | |||
} | |||
func (l *limiterRunner) LoadLimiters(limitCode string) error { | |||
redisKey := redis_key.LimitConfig(limitCode) | |||
val, _ := redis_client.Get(redisKey) | |||
if val != "" { | |||
if val == redis_key.EMPTY_REDIS_VAL { | |||
return nil | |||
} | |||
limiters := make([]models.LimitConfig, 0) | |||
json.Unmarshal([]byte(val), limiters) | |||
return nil | |||
} | |||
limiters, err := models.GetLimitConfigByLimitCode(limitCode) | |||
if err != nil { | |||
if models.IsErrRecordNotExist(err) { | |||
redis_client.Setex(redisKey, redis_key.EMPTY_REDIS_VAL, 5*time.Second) | |||
return nil | |||
} | |||
return err | |||
} | |||
jsonStr, _ := json.Marshal(limiters) | |||
redis_client.Setex(redisKey, string(jsonStr), 30*24*time.Hour) | |||
return nil | |||
} | |||
func CheckLimit(limitCode string, userId, amount int64) error { | |||
r := newLimiterRunner(limitCode, userId, amount) | |||
return r.Run() | |||
} |
@@ -2,9 +2,13 @@ package reward | |||
import ( | |||
"code.gitea.io/gitea/models" | |||
"code.gitea.io/gitea/modules/log" | |||
"code.gitea.io/gitea/modules/redis/redis_key" | |||
"code.gitea.io/gitea/modules/redis/redis_lock" | |||
"code.gitea.io/gitea/services/reward/point" | |||
"errors" | |||
"fmt" | |||
"time" | |||
) | |||
var RewardOperatorMap = map[string]RewardOperator{ | |||
@@ -12,47 +16,109 @@ var RewardOperatorMap = map[string]RewardOperator{ | |||
} | |||
type RewardOperateContext struct { | |||
SourceType models.RewardSourceType | |||
SourceId string | |||
Remark string | |||
Reward Reward | |||
TargetUserId int64 | |||
} | |||
type RewardOperateResponse int | |||
const ( | |||
RewardOperateSuccess RewardOperateResponse = iota + 1 | |||
RewardOperateBalanceNotEnough | |||
) | |||
func (t RewardOperateResponse) IsSuccess() bool { | |||
return t == RewardOperateSuccess | |||
SourceType models.RewardSourceType | |||
SourceId string | |||
Remark string | |||
Reward Reward | |||
TargetUserId int64 | |||
RequestId string | |||
OperateType string | |||
CycleIntervalSeconds int64 | |||
} | |||
type RewardOperator interface { | |||
IsOperated(ctx RewardOperateContext) bool | |||
IsLimited(ctx RewardOperateContext) bool | |||
Operate(ctx RewardOperateContext) error | |||
} | |||
func Send(ctx RewardOperateContext) error { | |||
//add lock | |||
var rewardLock = redis_lock.NewDistributeLock(redis_key.RewardSendLock(ctx.RequestId, ctx.SourceType.String())) | |||
if !rewardLock.Lock(3 * time.Second) { | |||
log.Info("duplicated reward request,targetUserId=%d requestId=%s", ctx.TargetUserId, ctx.RequestId) | |||
return nil | |||
} | |||
defer rewardLock.UnLock() | |||
//is handled before? | |||
isHandled, err := isHandled(ctx.SourceType, ctx.RequestId) | |||
if err != nil { | |||
log.Error("reward is handled error,%v", err) | |||
return err | |||
} | |||
if isHandled { | |||
log.Info("reward has been handled,ctx=%+v", ctx) | |||
return nil | |||
} | |||
//get operator | |||
operator := GetOperator(ctx.Reward.Type) | |||
if operator == nil { | |||
return errors.New("operator of reward type is not exist") | |||
} | |||
if operator.IsOperated(ctx) { | |||
return nil | |||
} | |||
//is limited? | |||
if operator.IsLimited(ctx) { | |||
return nil | |||
} | |||
//new reward operate record | |||
if err := initAwardOperateRecord(ctx); err != nil { | |||
return err | |||
} | |||
//operate | |||
if err := operator.Operate(ctx); err != nil { | |||
updateAwardOperateRecordStatus(ctx.SourceType.String(), ctx.RequestId, models.OperateStatusOperating, models.OperateStatusFailed) | |||
return err | |||
} | |||
//if not a cycle operate,update status to success | |||
if ctx.CycleIntervalSeconds > 0 { | |||
updateAwardOperateRecordStatus(ctx.SourceType.String(), ctx.RequestId, models.OperateStatusOperating, models.OperateStatusSucceeded) | |||
} | |||
return nil | |||
} | |||
func GetOperator(rewardType string) RewardOperator { | |||
return RewardOperatorMap[rewardType] | |||
} | |||
func isHandled(sourceType models.RewardSourceType, requestId string) (bool, error) { | |||
_, err := models.GetPointOperateRecordBySourceTypeAndRequestId(sourceType.String(), requestId) | |||
if err != nil { | |||
if models.IsErrRecordNotExist(err) { | |||
return false, nil | |||
} | |||
return false, err | |||
} | |||
return true, nil | |||
} | |||
func initAwardOperateRecord(ctx RewardOperateContext) error { | |||
_, err := models.InsertAwardOperateRecord(&models.RewardOperateRecord{ | |||
UserId: ctx.TargetUserId, | |||
Amount: ctx.Reward.Amount, | |||
RewardType: ctx.Reward.Type, | |||
SourceType: ctx.SourceType.String(), | |||
SourceId: ctx.SourceId, | |||
RequestId: ctx.RequestId, | |||
OperateType: ctx.OperateType, | |||
CycleIntervalSeconds: ctx.CycleIntervalSeconds, | |||
Status: models.OperateStatusOperating, | |||
Remark: ctx.Remark, | |||
}) | |||
if err != nil { | |||
return err | |||
} | |||
return nil | |||
} | |||
func updateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus string) error { | |||
_, err := models.UpdateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus) | |||
if err != nil { | |||
return err | |||
} | |||
return nil | |||
} |
@@ -0,0 +1,58 @@ | |||
package account | |||
import ( | |||
"code.gitea.io/gitea/models" | |||
"code.gitea.io/gitea/modules/redis/redis_client" | |||
"code.gitea.io/gitea/modules/redis/redis_key" | |||
"code.gitea.io/gitea/modules/redis/redis_lock" | |||
"code.gitea.io/gitea/modules/util" | |||
"encoding/json" | |||
"time" | |||
) | |||
func GetAccount(userId int64) (*models.PointAccount, error) { | |||
redisKey := redis_key.PointAccountDetail(userId) | |||
val, _ := redis_client.Get(redisKey) | |||
if val != "" { | |||
account := &models.PointAccount{} | |||
json.Unmarshal([]byte(val), account) | |||
return account, nil | |||
} | |||
account, err := models.GetAccountByUserId(userId) | |||
if err != nil { | |||
if models.IsErrRecordNotExist(err) { | |||
a, err := InitAccount(userId) | |||
if err != nil { | |||
return nil, err | |||
} | |||
return a, nil | |||
} | |||
return nil, err | |||
} | |||
jsonStr, _ := json.Marshal(account) | |||
redis_client.Setex(redisKey, string(jsonStr), 24*time.Hour) | |||
return account, nil | |||
} | |||
func InitAccount(userId int64) (*models.PointAccount, error) { | |||
lock := redis_lock.NewDistributeLock(redis_key.PointAccountInitLock(userId)) | |||
if lock.LockWithWait(3*time.Second, 3*time.Second) { | |||
defer lock.UnLock() | |||
account, _ := models.GetAccountByUserId(userId) | |||
if account == nil { | |||
models.InsertAccount(&models.PointAccount{ | |||
Balance: 0, | |||
TotalEarned: 0, | |||
TotalConsumed: 0, | |||
UserId: userId, | |||
Status: models.PointAccountNormal, | |||
Version: 0, | |||
AccountCode: util.UUID(), | |||
}) | |||
return models.GetAccountByUserId(userId) | |||
} | |||
return account, nil | |||
} | |||
return nil, nil | |||
} |
@@ -1,19 +1,44 @@ | |||
package point | |||
import ( | |||
"code.gitea.io/gitea/models" | |||
"code.gitea.io/gitea/modules/redis/redis_key" | |||
"code.gitea.io/gitea/modules/redis/redis_lock" | |||
"code.gitea.io/gitea/services/reward" | |||
"code.gitea.io/gitea/services/reward/limiter" | |||
"code.gitea.io/gitea/services/reward/point/account" | |||
"errors" | |||
"time" | |||
) | |||
type PointOperator struct { | |||
} | |||
func (operator *PointOperator) IsOperated(ctx reward.RewardOperateContext) bool { | |||
//todo | |||
return true | |||
} | |||
func (operator *PointOperator) IsLimited(ctx reward.RewardOperateContext) bool { | |||
if err := limiter.CheckLimit(ctx.Reward.Type, ctx.TargetUserId, ctx.Reward.Amount); err != nil { | |||
return false | |||
} | |||
return true | |||
} | |||
func (operator *PointOperator) Operate(ctx reward.RewardOperateContext) error { | |||
a, err := account.GetAccount(ctx.TargetUserId) | |||
if err != nil || a == nil { | |||
return errors.New("get account error") | |||
} | |||
lock := redis_lock.NewDistributeLock(redis_key.PointAccountOperateLock(a.AccountCode)) | |||
if lock.LockWithWait(3*time.Second, 3*time.Second) { | |||
defer lock.UnLock() | |||
na, _ := account.GetAccount(ctx.TargetUserId) | |||
if ctx.OperateType == models.OperateTypeIncrease { | |||
na.Increase(ctx.Reward.Amount, ctx.SourceId) | |||
} else if ctx.OperateType == models.OperateTypeDecrease { | |||
na.Decrease(ctx.Reward.Amount, ctx.SourceId) | |||
} | |||
} else { | |||
return errors.New("Get account operate lock failed") | |||
} | |||
return nil | |||
} |
@@ -1,39 +0,0 @@ | |||
package task | |||
import ( | |||
"code.gitea.io/gitea/models" | |||
"time" | |||
) | |||
var LimiterMap = map[string]Limiter{ | |||
models.TaskConfigRefreshRateNotCycle: new(NoCycleLimiter), | |||
models.TaskConfigRefreshRateDaily: new(DailyLimiter), | |||
} | |||
type Limiter interface { | |||
GetCurrentPeriod() *models.LimiterPeriod | |||
} | |||
type NoCycleLimiter struct { | |||
} | |||
func (l *NoCycleLimiter) GetCurrentPeriod() *models.LimiterPeriod { | |||
return nil | |||
} | |||
type DailyLimiter struct { | |||
} | |||
func (l *DailyLimiter) GetCurrentPeriod() *models.LimiterPeriod { | |||
t := time.Now() | |||
startTime := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) | |||
endTime := startTime.Add(24 * time.Hour) | |||
return &models.LimiterPeriod{ | |||
StartTime: startTime, | |||
EndTime: endTime, | |||
} | |||
} | |||
func GetLimiter(refreshRateype string) Limiter { | |||
return LimiterMap[refreshRateype] | |||
} |
@@ -0,0 +1,50 @@ | |||
package period | |||
import ( | |||
"code.gitea.io/gitea/models" | |||
"errors" | |||
"time" | |||
) | |||
var PeriodHandlerMap = map[string]PeriodHandler{ | |||
models.PeriodNotCycle: new(NoCycleHandler), | |||
models.PeriodDaily: new(DailyHandler), | |||
} | |||
type PeriodHandler interface { | |||
GetCurrentPeriod() *models.PeriodResult | |||
} | |||
type NoCycleHandler struct { | |||
} | |||
func (l *NoCycleHandler) GetCurrentPeriod() *models.PeriodResult { | |||
return nil | |||
} | |||
type DailyHandler struct { | |||
} | |||
func (l *DailyHandler) GetCurrentPeriod() *models.PeriodResult { | |||
t := time.Now() | |||
startTime := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) | |||
endTime := startTime.Add(24 * time.Hour) | |||
leftTime := endTime.Sub(t) | |||
return &models.PeriodResult{ | |||
StartTime: startTime, | |||
EndTime: endTime, | |||
LeftTime: leftTime, | |||
} | |||
} | |||
func getPeriodHandler(refreshRateype string) PeriodHandler { | |||
return PeriodHandlerMap[refreshRateype] | |||
} | |||
func GetPeriod(refreshRate string) (*models.PeriodResult, error) { | |||
handler := getPeriodHandler(refreshRate) | |||
if handler == nil { | |||
return nil, errors.New("task config incorrect") | |||
} | |||
return handler.GetCurrentPeriod(), nil | |||
} |
@@ -5,8 +5,9 @@ import ( | |||
"code.gitea.io/gitea/modules/log" | |||
"code.gitea.io/gitea/modules/redis/redis_key" | |||
"code.gitea.io/gitea/modules/redis/redis_lock" | |||
"code.gitea.io/gitea/modules/util" | |||
"code.gitea.io/gitea/services/reward" | |||
"errors" | |||
"code.gitea.io/gitea/services/task/period" | |||
"time" | |||
) | |||
@@ -16,7 +17,7 @@ func Accomplish(userId int64, taskType models.TaskType, sourceId string) { | |||
func accomplish(userId int64, taskType models.TaskType, sourceId string) error { | |||
//lock | |||
var taskLock = redis_lock.NewDistributeLock(redis_key.TaskAccomplishLock(userId, sourceId, taskType)) | |||
var taskLock = redis_lock.NewDistributeLock(redis_key.TaskAccomplishLock(sourceId, taskType)) | |||
if !taskLock.Lock(3 * time.Second) { | |||
log.Info("duplicated task request,userId=%d taskType=%s sourceId=%s", userId, taskType, sourceId) | |||
return nil | |||
@@ -57,12 +58,17 @@ func accomplish(userId int64, taskType models.TaskType, sourceId string) error { | |||
} | |||
//add log | |||
models.InsertTaskAccomplishLog(&models.TaskAccomplishLog{ | |||
logId := util.UUID() | |||
_, err = models.InsertTaskAccomplishLog(&models.TaskAccomplishLog{ | |||
LogId: logId, | |||
ConfigId: config.ID, | |||
TaskCode: config.TaskCode, | |||
UserId: userId, | |||
SourceId: sourceId, | |||
}) | |||
if err != nil { | |||
return err | |||
} | |||
//reward | |||
reward.Send(reward.RewardOperateContext{ | |||
@@ -73,6 +79,7 @@ func accomplish(userId int64, taskType models.TaskType, sourceId string) error { | |||
Type: config.AwardType, | |||
}, | |||
TargetUserId: userId, | |||
RequestId: logId, | |||
}) | |||
return nil | |||
@@ -91,11 +98,11 @@ func isHandled(taskType models.TaskType, sourceId string) (bool, error) { | |||
} | |||
func IsLimited(userId int64, config *models.TaskConfig) (bool, error) { | |||
limiter := GetLimiter(config.RefreshRate) | |||
if limiter == nil { | |||
return false, errors.New("task config incorrect") | |||
p, err := period.GetPeriod(config.RefreshRate) | |||
if err != nil { | |||
return false, err | |||
} | |||
n, err := models.CountInTaskPeriod(config.ID, userId, limiter.GetCurrentPeriod()) | |||
n, err := models.CountInTaskPeriod(config.ID, userId, p) | |||
if err != nil { | |||
return false, err | |||
} | |||
@@ -24,9 +24,9 @@ func GetTaskConfig(taskType models.TaskType) (*models.TaskConfig, error) { | |||
config, err := models.GetTaskConfigByTaskCode(taskType.String()) | |||
if err != nil { | |||
if models.IsErrRecordNotExist(err) { | |||
redis_client.Setex(redisKey, redis_key.EMPTY_REDIS_VAL, 5*time.Second) | |||
return nil, nil | |||
} | |||
redis_client.Setex(redisKey, redis_key.EMPTY_REDIS_VAL, 5*time.Second) | |||
return nil, err | |||
} | |||
jsonStr, _ := json.Marshal(config) | |||