Signed-off-by: Andrew Thornton <art27@cantab.net> Co-authored-by: techknowlogick <techknowlogick@gitea.io>tags/v1.15.0-dev
@@ -9,25 +9,56 @@ import ( | |||||
"reflect" | "reflect" | ||||
) | ) | ||||
// Mappable represents an interface that can MapTo another interface | |||||
type Mappable interface { | |||||
MapTo(v interface{}) error | |||||
} | |||||
// toConfig will attempt to convert a given configuration cfg into the provided exemplar type. | // toConfig will attempt to convert a given configuration cfg into the provided exemplar type. | ||||
// | // | ||||
// It will tolerate the cfg being passed as a []byte or string of a json representation of the | // It will tolerate the cfg being passed as a []byte or string of a json representation of the | ||||
// exemplar or the correct type of the exemplar itself | // exemplar or the correct type of the exemplar itself | ||||
func toConfig(exemplar, cfg interface{}) (interface{}, error) { | func toConfig(exemplar, cfg interface{}) (interface{}, error) { | ||||
// First of all check if we've got the same type as the exemplar - if so it's all fine. | |||||
if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) { | if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) { | ||||
return cfg, nil | return cfg, nil | ||||
} | } | ||||
// Now if not - does it provide a MapTo function we can try? | |||||
if mappable, ok := cfg.(Mappable); ok { | |||||
newVal := reflect.New(reflect.TypeOf(exemplar)) | |||||
if err := mappable.MapTo(newVal.Interface()); err == nil { | |||||
return newVal.Elem().Interface(), nil | |||||
} | |||||
// MapTo has failed us ... let's try the json route ... | |||||
} | |||||
// OK we've been passed a byte array right? | |||||
configBytes, ok := cfg.([]byte) | configBytes, ok := cfg.([]byte) | ||||
if !ok { | if !ok { | ||||
configStr, ok := cfg.(string) | |||||
if !ok { | |||||
return nil, ErrInvalidConfiguration{cfg: cfg} | |||||
} | |||||
// oh ... it's a string then? | |||||
var configStr string | |||||
configStr, ok = cfg.(string) | |||||
configBytes = []byte(configStr) | configBytes = []byte(configStr) | ||||
} | } | ||||
if !ok { | |||||
// hmm ... can we marshal it to json? | |||||
var err error | |||||
configBytes, err = json.Marshal(cfg) | |||||
ok = (err == nil) | |||||
} | |||||
if !ok { | |||||
// no ... we've tried hard enough at this point - throw an error! | |||||
return nil, ErrInvalidConfiguration{cfg: cfg} | |||||
} | |||||
// OK unmarshal the byte array into a new copy of the exemplar | |||||
newVal := reflect.New(reflect.TypeOf(exemplar)) | newVal := reflect.New(reflect.TypeOf(exemplar)) | ||||
if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil { | if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil { | ||||
// If we can't unmarshal it then return an error! | |||||
return nil, ErrInvalidConfiguration{cfg: cfg, err: err} | return nil, ErrInvalidConfiguration{cfg: cfg, err: err} | ||||
} | } | ||||
return newVal.Elem().Interface(), nil | return newVal.Elem().Interface(), nil | ||||
@@ -27,27 +27,10 @@ func validType(t string) (Type, error) { | |||||
func getQueueSettings(name string) (setting.QueueSettings, []byte) { | func getQueueSettings(name string) (setting.QueueSettings, []byte) { | ||||
q := setting.GetQueueSettings(name) | q := setting.GetQueueSettings(name) | ||||
opts := make(map[string]interface{}) | |||||
opts["Name"] = name | |||||
opts["QueueLength"] = q.Length | |||||
opts["BatchLength"] = q.BatchLength | |||||
opts["DataDir"] = q.DataDir | |||||
opts["Addresses"] = q.Addresses | |||||
opts["Network"] = q.Network | |||||
opts["Password"] = q.Password | |||||
opts["DBIndex"] = q.DBIndex | |||||
opts["QueueName"] = q.QueueName | |||||
opts["SetName"] = q.SetName | |||||
opts["Workers"] = q.Workers | |||||
opts["MaxWorkers"] = q.MaxWorkers | |||||
opts["BlockTimeout"] = q.BlockTimeout | |||||
opts["BoostTimeout"] = q.BoostTimeout | |||||
opts["BoostWorkers"] = q.BoostWorkers | |||||
opts["ConnectionString"] = q.ConnectionString | |||||
cfg, err := json.Marshal(opts) | |||||
cfg, err := json.Marshal(q) | |||||
if err != nil { | if err != nil { | ||||
log.Error("Unable to marshall generic options: %v Error: %v", opts, err) | |||||
log.Error("Unable to marshall generic options: %v Error: %v", q, err) | |||||
log.Error("Unable to create queue for %s", name, err) | log.Error("Unable to create queue for %s", name, err) | ||||
return q, []byte{} | return q, []byte{} | ||||
} | } | ||||
@@ -75,7 +58,7 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { | |||||
Timeout: q.Timeout, | Timeout: q.Timeout, | ||||
MaxAttempts: q.MaxAttempts, | MaxAttempts: q.MaxAttempts, | ||||
Config: cfg, | Config: cfg, | ||||
QueueLength: q.Length, | |||||
QueueLength: q.QueueLength, | |||||
Name: name, | Name: name, | ||||
}, exemplar) | }, exemplar) | ||||
} | } | ||||
@@ -114,7 +97,7 @@ func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) Un | |||||
Timeout: q.Timeout, | Timeout: q.Timeout, | ||||
MaxAttempts: q.MaxAttempts, | MaxAttempts: q.MaxAttempts, | ||||
Config: cfg, | Config: cfg, | ||||
QueueLength: q.Length, | |||||
QueueLength: q.QueueLength, | |||||
}, exemplar) | }, exemplar) | ||||
} | } | ||||
if err != nil { | if err != nil { | ||||
@@ -16,8 +16,9 @@ import ( | |||||
// QueueSettings represent the settings for a queue from the ini | // QueueSettings represent the settings for a queue from the ini | ||||
type QueueSettings struct { | type QueueSettings struct { | ||||
Name string | |||||
DataDir string | DataDir string | ||||
Length int | |||||
QueueLength int `ini:"LENGTH"` | |||||
BatchLength int | BatchLength int | ||||
ConnectionString string | ConnectionString string | ||||
Type string | Type string | ||||
@@ -44,6 +45,8 @@ var Queue = QueueSettings{} | |||||
func GetQueueSettings(name string) QueueSettings { | func GetQueueSettings(name string) QueueSettings { | ||||
q := QueueSettings{} | q := QueueSettings{} | ||||
sec := Cfg.Section("queue." + name) | sec := Cfg.Section("queue." + name) | ||||
q.Name = name | |||||
// DataDir is not directly inheritable | // DataDir is not directly inheritable | ||||
q.DataDir = filepath.Join(Queue.DataDir, name) | q.DataDir = filepath.Join(Queue.DataDir, name) | ||||
// QueueName is not directly inheritable either | // QueueName is not directly inheritable either | ||||
@@ -65,8 +68,9 @@ func GetQueueSettings(name string) QueueSettings { | |||||
q.DataDir = filepath.Join(AppDataPath, q.DataDir) | q.DataDir = filepath.Join(AppDataPath, q.DataDir) | ||||
} | } | ||||
_, _ = sec.NewKey("DATADIR", q.DataDir) | _, _ = sec.NewKey("DATADIR", q.DataDir) | ||||
// The rest are... | // The rest are... | ||||
q.Length = sec.Key("LENGTH").MustInt(Queue.Length) | |||||
q.QueueLength = sec.Key("LENGTH").MustInt(Queue.QueueLength) | |||||
q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength) | q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength) | ||||
q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString) | q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString) | ||||
q.Type = sec.Key("TYPE").MustString(Queue.Type) | q.Type = sec.Key("TYPE").MustString(Queue.Type) | ||||
@@ -91,7 +95,7 @@ func NewQueueService() { | |||||
if !filepath.IsAbs(Queue.DataDir) { | if !filepath.IsAbs(Queue.DataDir) { | ||||
Queue.DataDir = filepath.Join(AppDataPath, Queue.DataDir) | Queue.DataDir = filepath.Join(AppDataPath, Queue.DataDir) | ||||
} | } | ||||
Queue.Length = sec.Key("LENGTH").MustInt(20) | |||||
Queue.QueueLength = sec.Key("LENGTH").MustInt(20) | |||||
Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20) | Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20) | ||||
Queue.ConnectionString = sec.Key("CONN_STR").MustString("") | Queue.ConnectionString = sec.Key("CONN_STR").MustString("") | ||||
Queue.Type = sec.Key("TYPE").MustString("persistable-channel") | Queue.Type = sec.Key("TYPE").MustString("persistable-channel") | ||||