@@ -346,6 +346,25 @@ func GetFeeds(opts GetFeedsOptions) ([]*Action, error) { | |||||
return actions, nil | return actions, nil | ||||
} | } | ||||
func GetLast20PublicFeeds() ([]*Action, error) { | |||||
cond := builder.NewCond() | |||||
cond = cond.And(builder.Eq{"is_private": false}) | |||||
cond = cond.And(builder.Eq{"is_deleted": false}) | |||||
actions := make([]*Action, 0, 20) | |||||
if err := x.Limit(20).Desc("id").Where(cond).Find(&actions); err != nil { | |||||
return nil, fmt.Errorf("Find: %v", err) | |||||
} | |||||
if err := ActionList(actions).LoadAttributes(); err != nil { | |||||
return nil, fmt.Errorf("LoadAttributes: %v", err) | |||||
} | |||||
return actions, nil | |||||
} | |||||
func GetUnTransformedActions() ([]*Action, error) { | func GetUnTransformedActions() ([]*Action, error) { | ||||
actions := make([]*Action, 0, 10) | actions := make([]*Action, 0, 10) | ||||
err := x.Where("op_type = ?", ActionCommitRepo). | err := x.Where("op_type = ?", ActionCommitRepo). | ||||
@@ -29,7 +29,7 @@ func ActionNotification(ctx *context.Context) { | |||||
} | } | ||||
client := &socketwrap.Client{Manager: SocketManager, Conn: conn, Send: make(chan *models.Action, 256)} | client := &socketwrap.Client{Manager: SocketManager, Conn: conn, Send: make(chan *models.Action, 256)} | ||||
WriteLastTenActionsIfHave(conn) | |||||
WriteLastActionsIfHave(conn) | |||||
client.Manager.Register <- client | client.Manager.Register <- client | ||||
@@ -37,12 +37,12 @@ func ActionNotification(ctx *context.Context) { | |||||
} | } | ||||
func WriteLastTenActionsIfHave(conn *websocket.Conn) { | |||||
socketwrap.LastTenActionsQueue.Mutex.RLock() | |||||
func WriteLastActionsIfHave(conn *websocket.Conn) { | |||||
socketwrap.LastActionsQueue.Mutex.RLock() | |||||
{ | { | ||||
size := socketwrap.LastTenActionsQueue.Queue.Len() | |||||
size := socketwrap.LastActionsQueue.Queue.Len() | |||||
if size > 0 { | if size > 0 { | ||||
tempE := socketwrap.LastTenActionsQueue.Queue.Front() | |||||
tempE := socketwrap.LastActionsQueue.Queue.Front() | |||||
conn.WriteJSON(tempE.Value) | conn.WriteJSON(tempE.Value) | ||||
for i := 1; i < size; i++ { | for i := 1; i < size; i++ { | ||||
tempE = tempE.Next() | tempE = tempE.Next() | ||||
@@ -52,5 +52,5 @@ func WriteLastTenActionsIfHave(conn *websocket.Conn) { | |||||
} | } | ||||
} | } | ||||
socketwrap.LastTenActionsQueue.Mutex.RUnlock() | |||||
socketwrap.LastActionsQueue.Mutex.RUnlock() | |||||
} | } |
@@ -18,9 +18,10 @@ func NewClientsManager() *ClientsManager { | |||||
} | } | ||||
} | } | ||||
var LastTenActionsQueue = NewSyncQueue(10) | |||||
var LastActionsQueue = NewSyncQueue(20) | |||||
func (h *ClientsManager) Run() { | func (h *ClientsManager) Run() { | ||||
initActionQueue() | |||||
for { | for { | ||||
select { | select { | ||||
case client := <-h.Register: | case client := <-h.Register: | ||||
@@ -31,7 +32,7 @@ func (h *ClientsManager) Run() { | |||||
close(client.Send) | close(client.Send) | ||||
} | } | ||||
case message := <-models.ActionChan: | case message := <-models.ActionChan: | ||||
LastTenActionsQueue.Push(message) | |||||
LastActionsQueue.Push(message) | |||||
for client := range h.Clients { | for client := range h.Clients { | ||||
select { | select { | ||||
case client.Send <- message: | case client.Send <- message: | ||||
@@ -44,3 +45,12 @@ func (h *ClientsManager) Run() { | |||||
} | } | ||||
} | } | ||||
} | } | ||||
func initActionQueue() { | |||||
actions, err := models.GetLast20PublicFeeds() | |||||
if err == nil { | |||||
for i := len(actions) - 1; i >= 0; i-- { | |||||
LastActionsQueue.Push(actions[i]) | |||||
} | |||||
} | |||||
} |