You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

elastic_search.go 5.3 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package issues
  5. import (
  6. "context"
  7. "errors"
  8. "fmt"
  9. "strconv"
  10. "time"
  11. "code.gitea.io/gitea/modules/log"
  12. "github.com/olivere/elastic/v7"
  13. )
  14. var (
  15. _ Indexer = &ElasticSearchIndexer{}
  16. )
  17. // ElasticSearchIndexer implements Indexer interface
  18. type ElasticSearchIndexer struct {
  19. client *elastic.Client
  20. indexerName string
  21. }
  22. type elasticLogger struct {
  23. *log.Logger
  24. }
  25. func (l elasticLogger) Printf(format string, args ...interface{}) {
  26. _ = l.Logger.Log(2, l.Logger.GetLevel(), format, args...)
  27. }
  28. // NewElasticSearchIndexer creates a new elasticsearch indexer
  29. func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, error) {
  30. opts := []elastic.ClientOptionFunc{
  31. elastic.SetURL(url),
  32. elastic.SetSniff(false),
  33. elastic.SetHealthcheckInterval(10 * time.Second),
  34. elastic.SetGzip(false),
  35. }
  36. logger := elasticLogger{log.GetLogger(log.DEFAULT)}
  37. if logger.GetLevel() == log.TRACE || logger.GetLevel() == log.DEBUG {
  38. opts = append(opts, elastic.SetTraceLog(logger))
  39. } else if logger.GetLevel() == log.ERROR || logger.GetLevel() == log.CRITICAL || logger.GetLevel() == log.FATAL {
  40. opts = append(opts, elastic.SetErrorLog(logger))
  41. } else if logger.GetLevel() == log.INFO || logger.GetLevel() == log.WARN {
  42. opts = append(opts, elastic.SetInfoLog(logger))
  43. }
  44. client, err := elastic.NewClient(opts...)
  45. if err != nil {
  46. return nil, err
  47. }
  48. return &ElasticSearchIndexer{
  49. client: client,
  50. indexerName: indexerName,
  51. }, nil
  52. }
  53. const (
  54. defaultMapping = `{
  55. "mappings": {
  56. "properties": {
  57. "id": {
  58. "type": "integer",
  59. "index": true
  60. },
  61. "repo_id": {
  62. "type": "integer",
  63. "index": true
  64. },
  65. "title": {
  66. "type": "text",
  67. "index": true
  68. },
  69. "content": {
  70. "type": "text",
  71. "index": true
  72. },
  73. "comments": {
  74. "type" : "text",
  75. "index": true
  76. }
  77. }
  78. }
  79. }`
  80. )
  81. // Init will initialize the indexer
  82. func (b *ElasticSearchIndexer) Init() (bool, error) {
  83. ctx := context.Background()
  84. exists, err := b.client.IndexExists(b.indexerName).Do(ctx)
  85. if err != nil {
  86. return false, err
  87. }
  88. if !exists {
  89. var mapping = defaultMapping
  90. createIndex, err := b.client.CreateIndex(b.indexerName).BodyString(mapping).Do(ctx)
  91. if err != nil {
  92. return false, err
  93. }
  94. if !createIndex.Acknowledged {
  95. return false, errors.New("init failed")
  96. }
  97. return false, nil
  98. }
  99. return true, nil
  100. }
  101. // Index will save the index data
  102. func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
  103. if len(issues) == 0 {
  104. return nil
  105. } else if len(issues) == 1 {
  106. issue := issues[0]
  107. _, err := b.client.Index().
  108. Index(b.indexerName).
  109. Id(fmt.Sprintf("%d", issue.ID)).
  110. BodyJson(map[string]interface{}{
  111. "id": issue.ID,
  112. "repo_id": issue.RepoID,
  113. "title": issue.Title,
  114. "content": issue.Content,
  115. "comments": issue.Comments,
  116. }).
  117. Do(context.Background())
  118. return err
  119. }
  120. reqs := make([]elastic.BulkableRequest, 0)
  121. for _, issue := range issues {
  122. reqs = append(reqs,
  123. elastic.NewBulkIndexRequest().
  124. Index(b.indexerName).
  125. Id(fmt.Sprintf("%d", issue.ID)).
  126. Doc(map[string]interface{}{
  127. "id": issue.ID,
  128. "repo_id": issue.RepoID,
  129. "title": issue.Title,
  130. "content": issue.Content,
  131. "comments": issue.Comments,
  132. }),
  133. )
  134. }
  135. _, err := b.client.Bulk().
  136. Index(b.indexerName).
  137. Add(reqs...).
  138. Do(context.Background())
  139. return err
  140. }
  141. // Delete deletes indexes by ids
  142. func (b *ElasticSearchIndexer) Delete(ids ...int64) error {
  143. if len(ids) == 0 {
  144. return nil
  145. } else if len(ids) == 1 {
  146. _, err := b.client.Delete().
  147. Index(b.indexerName).
  148. Id(fmt.Sprintf("%d", ids[0])).
  149. Do(context.Background())
  150. return err
  151. }
  152. reqs := make([]elastic.BulkableRequest, 0)
  153. for _, id := range ids {
  154. reqs = append(reqs,
  155. elastic.NewBulkDeleteRequest().
  156. Index(b.indexerName).
  157. Id(fmt.Sprintf("%d", id)),
  158. )
  159. }
  160. _, err := b.client.Bulk().
  161. Index(b.indexerName).
  162. Add(reqs...).
  163. Do(context.Background())
  164. return err
  165. }
  166. // Search searches for issues by given conditions.
  167. // Returns the matching issue IDs
  168. func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
  169. kwQuery := elastic.NewMultiMatchQuery(keyword, "title", "content", "comments")
  170. query := elastic.NewBoolQuery()
  171. query = query.Must(kwQuery)
  172. if len(repoIDs) > 0 {
  173. var repoStrs = make([]interface{}, 0, len(repoIDs))
  174. for _, repoID := range repoIDs {
  175. repoStrs = append(repoStrs, repoID)
  176. }
  177. repoQuery := elastic.NewTermsQuery("repo_id", repoStrs...)
  178. query = query.Must(repoQuery)
  179. }
  180. searchResult, err := b.client.Search().
  181. Index(b.indexerName).
  182. Query(query).
  183. Sort("id", true).
  184. From(start).Size(limit).
  185. Do(context.Background())
  186. if err != nil {
  187. return nil, err
  188. }
  189. hits := make([]Match, 0, limit)
  190. for _, hit := range searchResult.Hits.Hits {
  191. id, _ := strconv.ParseInt(hit.Id, 10, 64)
  192. hits = append(hits, Match{
  193. ID: id,
  194. })
  195. }
  196. return &SearchResult{
  197. Total: searchResult.TotalHits(),
  198. Hits: hits,
  199. }, nil
  200. }
  201. // Close implements indexer
  202. func (b *ElasticSearchIndexer) Close() {}