You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

clientManager.go 1.8 kB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package socketwrap
  2. import (
  3. "os"
  4. "os/signal"
  5. "syscall"
  6. "code.gitea.io/gitea/models"
  7. "code.gitea.io/gitea/modules/log"
  8. "github.com/elliotchance/orderedmap"
  9. )
  10. type ClientsManager struct {
  11. Clients *orderedmap.OrderedMap
  12. Register chan *Client
  13. Unregister chan *Client
  14. }
  15. func NewClientsManager() *ClientsManager {
  16. return &ClientsManager{
  17. Register: make(chan *Client),
  18. Unregister: make(chan *Client),
  19. Clients: orderedmap.NewOrderedMap(),
  20. }
  21. }
  22. const MaxClients = 100
  23. var LastActionsQueue = NewSyncQueue(15)
  24. func (h *ClientsManager) Run() {
  25. initActionQueue()
  26. sig := make(chan os.Signal, 1)
  27. signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
  28. var signalsReceived uint
  29. for {
  30. select {
  31. case client := <-h.Register:
  32. h.Clients.Set(client, true)
  33. if h.Clients.Len() > MaxClients {
  34. h.Clients.Delete(h.Clients.Front().Key)
  35. }
  36. case client := <-h.Unregister:
  37. if _, ok := h.Clients.Get(client); ok {
  38. h.Clients.Delete(client)
  39. close(client.Send)
  40. }
  41. case message := <-models.ActionChan:
  42. LastActionsQueue.Push(message)
  43. for _, client := range h.Clients.Keys() {
  44. select {
  45. case client.(*Client).Send <- message:
  46. default:
  47. close(client.(*Client).Send)
  48. h.Clients.Delete(client)
  49. }
  50. }
  51. case s := <-sig:
  52. log.Info("received signal", s)
  53. signalsReceived++
  54. if signalsReceived < 2 {
  55. for _, client := range h.Clients.Keys() {
  56. h.Clients.Delete(client)
  57. client.(*Client).Close()
  58. }
  59. break
  60. }
  61. }
  62. }
  63. }
  64. func initActionQueue() {
  65. actions, err := models.GetLast20PublicFeeds()
  66. if err == nil {
  67. for i := len(actions) - 1; i >= 0; i-- {
  68. user, err := models.GetUserByID(actions[i].UserID)
  69. if err == nil {
  70. if !user.IsOrganization() {
  71. LastActionsQueue.Push(actions[i])
  72. }
  73. }
  74. }
  75. }
  76. }