* Add queue for code indexer
* Fix lint
* Fix test
* Fix lint
* Fix bug
* Fix bug
* Fix lint
* Add noqueue
* Fix tests
* Rename noqueue to immediate
@@ -14,6 +14,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-mssql/issues.bleve
REPO_INDEXER_ENABLED = true
REPO_INDEXER_PATH = integrations/indexers-mssql/repos.bleve

[queue.code_indexer]
TYPE = immediate

[repository]
ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-mssql/gitea-repositories
@@ -16,6 +16,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-mysql/issues.bleve
REPO_INDEXER_ENABLED = true
REPO_INDEXER_PATH = integrations/indexers-mysql/repos.bleve

[queue.code_indexer]
TYPE = immediate

[repository]
ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-mysql/gitea-repositories
@@ -14,6 +14,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-mysql8/issues.bleve
REPO_INDEXER_ENABLED = true
REPO_INDEXER_PATH = integrations/indexers-mysql8/repos.bleve

[queue.code_indexer]
TYPE = immediate

[repository]
ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-mysql8/gitea-repositories
@@ -15,6 +15,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-pgsql/issues.bleve
REPO_INDEXER_ENABLED = true
REPO_INDEXER_PATH = integrations/indexers-pgsql/repos.bleve

[queue.code_indexer]
TYPE = immediate

[repository]
ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-pgsql/gitea-repositories
@@ -7,7 +7,6 @@ package integrations

import (
	"net/http"
	"testing"
	"time"

	"code.gitea.io/gitea/models"
	code_indexer "code.gitea.io/gitea/modules/indexer/code"
@@ -62,14 +61,6 @@ func testSearch(t *testing.T, url string, expected []string) {
	assert.EqualValues(t, expected, filenames)
}

func executeIndexer(t *testing.T, repo *models.Repository, op func(*models.Repository, ...chan<- error)) {
	waiter := make(chan error, 1)
	op(repo, waiter)
	select {
	case err := <-waiter:
		assert.NoError(t, err)
	case <-time.After(1 * time.Minute):
		assert.Fail(t, "Repository indexer took too long")
	}

func executeIndexer(t *testing.T, repo *models.Repository, op func(*models.Repository)) {
	op(repo)
}
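Because the integration configs above select the immediate queue, UpdateRepoIndexer finishes its work before returning, which is why the watcher-channel and timeout plumbing could be dropped from the helper. A call site in the test presumably looks something like the sketch below; the test name, repository ID, search URL and expected result are illustrative, and the surrounding integrations test harness (fixtures, assert, testSearch) is assumed:

func TestRepoIndexerSketch(t *testing.T) { // illustrative, not a test added by this diff
	repo, err := models.GetRepositoryByID(1)
	assert.NoError(t, err)

	// With TYPE = immediate the push to the "code_indexer" queue is handled
	// synchronously, so no watcher channel is needed.
	executeIndexer(t, repo, code_indexer.UpdateRepoIndexer)

	// The index is current by the time executeIndexer returns.
	testSearch(t, "/user2/repo1/search?q=Description&page=1", []string{"README.md"})
}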
@@ -10,6 +10,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-sqlite/issues.bleve
REPO_INDEXER_ENABLED = true
REPO_INDEXER_PATH = integrations/indexers-sqlite/repos.bleve

[queue.code_indexer]
TYPE = immediate

[repository]
ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-sqlite/gitea-repositories
@@ -168,6 +168,11 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
}

func (b *ElasticSearchIndexer) addUpdate(sha string, update fileUpdate, repo *models.Repository) ([]elastic.BulkableRequest, error) {
	// Ignore vendored files in code search
	if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) {
		return nil, nil
	}

	stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha).
		RunInDir(repo.RepoPath())
	if err != nil {
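The vendored-file check relies on enry's path classifier rather than file contents. A minimal standalone sketch of the same filter, assuming the current go-enry module path (the sample filenames are made up for illustration):

package main

import (
	"fmt"

	"github.com/go-enry/go-enry/v2"
)

func main() {
	// Paths matching well-known vendoring conventions (vendor/, node_modules/, ...)
	// are the ones the indexer skips when ExcludeVendored is enabled.
	for _, f := range []string{"vendor/golang.org/x/net/http2/server.go", "routers/repo/search.go"} {
		fmt.Printf("%-45s vendored=%v\n", f, enry.IsVendor(f))
	}
}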
@@ -14,6 +14,7 @@ import (
	"code.gitea.io/gitea/models"
	"code.gitea.io/gitea/modules/graceful"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/queue"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/timeutil"
)
@@ -38,7 +39,7 @@ type SearchResultLanguages struct {
	Count int
}

// Indexer defines an interface to indexer issues contents
// Indexer defines an interface to index and search code contents
type Indexer interface {
	Index(repo *models.Repository, sha string, changes *repoChanges) error
	Delete(repoID int64) error
@@ -67,6 +68,40 @@ func filenameOfIndexerID(indexerID string) string {
	return indexerID[index+1:]
}
// IndexerData represents a payload pushed to the code indexer queue
type IndexerData struct {
	RepoID   int64
	IsDelete bool
}
var (
	indexerQueue queue.Queue
)

func index(indexer Indexer, repoID int64) error {
	repo, err := models.GetRepositoryByID(repoID)
	if err != nil {
		return err
	}
	sha, err := getDefaultBranchSha(repo)
	if err != nil {
		return err
	}
	changes, err := getRepoChanges(repo, sha)
	if err != nil {
		return err
	} else if changes == nil {
		return nil
	}
	if err := indexer.Index(repo, sha, changes); err != nil {
		return err
	}
	return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha)
}
// Init initializes the repo indexer
func Init() {
	if !setting.Indexer.RepoIndexerEnabled {
@@ -74,8 +109,6 @@ func Init() {
		return
	}

	initQueue(setting.Indexer.UpdateQueueLength)

	ctx, cancel := context.WithCancel(context.Background())

	graceful.GetManager().RunAtTerminate(ctx, func() {
@@ -85,6 +118,46 @@ func Init() {
	})

	waitChannel := make(chan time.Duration)

	// Create the Queue
	switch setting.Indexer.RepoType {
	case "bleve", "elasticsearch":
		handler := func(data ...queue.Data) {
			idx, err := indexer.get()
			if idx == nil || err != nil {
				log.Error("Code indexer handler: unable to get indexer!")
				return
			}
			for _, datum := range data {
				indexerData, ok := datum.(*IndexerData)
				if !ok {
					log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
					continue
				}
				log.Trace("IndexerData Process: %v %t", indexerData.RepoID, indexerData.IsDelete)
				if indexerData.IsDelete {
					if err := indexer.Delete(indexerData.RepoID); err != nil {
						log.Error("indexer.Delete: %v", err)
					}
				} else {
					if err := index(indexer, indexerData.RepoID); err != nil {
						log.Error("index: %v", err)
						continue
					}
				}
			}
		}

		indexerQueue = queue.CreateQueue("code_indexer", handler, &IndexerData{})
		if indexerQueue == nil {
			log.Fatal("Unable to create code indexer queue")
		}
	default:
		log.Fatal("Unknown code indexer type: %s", setting.Indexer.RepoType)
	}
	go func() {
		start := time.Now()
		var (
@@ -139,10 +212,11 @@ func Init() {
		indexer.set(rIndexer)

		go processRepoIndexerOperationQueue(indexer)

		// Start processing the queue
		go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run)

		if populate {
			go populateRepoIndexer()
			go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer)
		}

		select {
		case waitChannel <- time.Since(start):
@@ -179,3 +253,77 @@ func Init() {
	}()
}
}
// DeleteRepoFromIndexer removes all of a repository's entries from the indexer
func DeleteRepoFromIndexer(repo *models.Repository) {
	indexData := &IndexerData{RepoID: repo.ID, IsDelete: true}
	if err := indexerQueue.Push(indexData); err != nil {
		log.Error("Delete repo index data %v failed: %v", indexData, err)
	}
}

// UpdateRepoIndexer updates a repository's entries in the indexer
func UpdateRepoIndexer(repo *models.Repository) {
	indexData := &IndexerData{RepoID: repo.ID}
	if err := indexerQueue.Push(indexData); err != nil {
		log.Error("Update repo index data %v failed: %v", indexData, err)
	}
}
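These two helpers are now the whole public surface for callers: both simply push an IndexerData payload onto the queue and let the handler registered in Init() do the work. As a rough sketch of a call site (the package name, hook function and repository lookup are illustrative, not part of this diff):

package repoindexhook // illustrative

import (
	"code.gitea.io/gitea/models"
	code_indexer "code.gitea.io/gitea/modules/indexer/code"
)

// onRepositoryPushed is a hypothetical post-push hook that re-indexes a repository.
func onRepositoryPushed(repoID int64) error {
	repo, err := models.GetRepositoryByID(repoID)
	if err != nil {
		return err
	}
	// Push-and-forget: with the usual queue types this returns before indexing
	// happens; with TYPE = immediate (integration tests) it runs synchronously.
	code_indexer.UpdateRepoIndexer(repo)
	return nil
}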
// populateRepoIndexer populates the repo indexer with pre-existing data. This
// should only be run when the indexer is created for the first time.
func populateRepoIndexer(ctx context.Context) {
	log.Info("Populating the repo indexer with existing repositories")

	exist, err := models.IsTableNotEmpty("repository")
	if err != nil {
		log.Fatal("System error: %v", err)
	} else if !exist {
		return
	}

	// if there is any existing repo indexer metadata in the DB, delete it
	// since we are starting afresh. Also, xorm requires deletes to have a
	// condition, and we want to delete everything, thus 1=1.
	if err := models.DeleteAllRecords("repo_indexer_status"); err != nil {
		log.Fatal("System error: %v", err)
	}

	var maxRepoID int64
	if maxRepoID, err = models.GetMaxID("repository"); err != nil {
		log.Fatal("System error: %v", err)
	}

	// start with the maximum existing repo ID and work backwards, so that we
	// don't include repos that are created after gitea starts; such repos will
	// already be added to the indexer, and we don't need to add them again.
	for maxRepoID > 0 {
		select {
		case <-ctx.Done():
			log.Info("Repository Indexer population shutdown before completion")
			return
		default:
		}
		ids, err := models.GetUnindexedRepos(models.RepoIndexerTypeCode, maxRepoID, 0, 50)
		if err != nil {
			log.Error("populateRepoIndexer: %v", err)
			return
		} else if len(ids) == 0 {
			break
		}
		for _, id := range ids {
			select {
			case <-ctx.Done():
				log.Info("Repository Indexer population shutdown before completion")
				return
			default:
			}
			if err := indexerQueue.Push(&IndexerData{RepoID: id}); err != nil {
				log.Error("indexerQueue.Push: %v", err)
				return
			}
			maxRepoID = id - 1
		}
	}
	log.Info("Done (re)populating the repo indexer with existing repositories")
}
@@ -1,154 +0,0 @@
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package code

import (
	"os"

	"code.gitea.io/gitea/models"
	"code.gitea.io/gitea/modules/graceful"
	"code.gitea.io/gitea/modules/log"
)

type repoIndexerOperation struct {
	repoID   int64
	deleted  bool
	watchers []chan<- error
}

var repoIndexerOperationQueue chan repoIndexerOperation

func initQueue(queueLength int) {
	repoIndexerOperationQueue = make(chan repoIndexerOperation, queueLength)
}

func index(indexer Indexer, repoID int64) error {
	repo, err := models.GetRepositoryByID(repoID)
	if err != nil {
		return err
	}
	sha, err := getDefaultBranchSha(repo)
	if err != nil {
		return err
	}
	changes, err := getRepoChanges(repo, sha)
	if err != nil {
		return err
	} else if changes == nil {
		return nil
	}
	if err := indexer.Index(repo, sha, changes); err != nil {
		return err
	}
	return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha)
}

func processRepoIndexerOperationQueue(indexer Indexer) {
	for {
		select {
		case op := <-repoIndexerOperationQueue:
			var err error
			if op.deleted {
				if err = indexer.Delete(op.repoID); err != nil {
					log.Error("indexer.Delete: %v", err)
				}
			} else {
				if err = index(indexer, op.repoID); err != nil {
					log.Error("indexer.Index: %v", err)
				}
			}
			for _, watcher := range op.watchers {
				watcher <- err
			}
		case <-graceful.GetManager().IsShutdown():
			log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid())
			return
		}
	}
}

// DeleteRepoFromIndexer remove all of a repository's entries from the indexer
func DeleteRepoFromIndexer(repo *models.Repository, watchers ...chan<- error) {
	addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: true, watchers: watchers})
}

// UpdateRepoIndexer update a repository's entries in the indexer
func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) {
	addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: false, watchers: watchers})
}

func addOperationToQueue(op repoIndexerOperation) {
	select {
	case repoIndexerOperationQueue <- op:
		break
	default:
		go func() {
			repoIndexerOperationQueue <- op
		}()
	}
}

// populateRepoIndexer populate the repo indexer with pre-existing data. This
// should only be run when the indexer is created for the first time.
func populateRepoIndexer() {
	log.Info("Populating the repo indexer with existing repositories")
	isShutdown := graceful.GetManager().IsShutdown()
	exist, err := models.IsTableNotEmpty("repository")
	if err != nil {
		log.Fatal("System error: %v", err)
	} else if !exist {
		return
	}
	// if there is any existing repo indexer metadata in the DB, delete it
	// since we are starting afresh. Also, xorm requires deletes to have a
	// condition, and we want to delete everything, thus 1=1.
	if err := models.DeleteAllRecords("repo_indexer_status"); err != nil {
		log.Fatal("System error: %v", err)
	}
	var maxRepoID int64
	if maxRepoID, err = models.GetMaxID("repository"); err != nil {
		log.Fatal("System error: %v", err)
	}
	// start with the maximum existing repo ID and work backwards, so that we
	// don't include repos that are created after gitea starts; such repos will
	// already be added to the indexer, and we don't need to add them again.
	for maxRepoID > 0 {
		select {
		case <-isShutdown:
			log.Info("Repository Indexer population shutdown before completion")
			return
		default:
		}
		ids, err := models.GetUnindexedRepos(models.RepoIndexerTypeCode, maxRepoID, 0, 50)
		if err != nil {
			log.Error("populateRepoIndexer: %v", err)
			return
		} else if len(ids) == 0 {
			break
		}
		for _, id := range ids {
			select {
			case <-isShutdown:
				log.Info("Repository Indexer population shutdown before completion")
				return
			default:
			}
			repoIndexerOperationQueue <- repoIndexerOperation{
				repoID:  id,
				deleted: false,
			}
			maxRepoID = id - 1
		}
	}
	log.Info("Done (re)populating the repo indexer with existing repositories")
}
@@ -106,7 +106,64 @@ func (*DummyQueue) IsEmpty() bool {
	return true
}

var queuesMap = map[Type]NewQueueFunc{DummyQueueType: NewDummyQueue}

// ImmediateType is the type of queue that executes the handler immediately on push
const ImmediateType Type = "immediate"

// NewImmediate creates a new immediate queue, which runs the handler as soon as data is pushed
func NewImmediate(handler HandlerFunc, opts, exemplar interface{}) (Queue, error) {
	return &Immediate{
		handler: handler,
	}, nil
}

// Immediate represents a direct execution queue
type Immediate struct {
	handler HandlerFunc
}

// Run does nothing
func (*Immediate) Run(_, _ func(context.Context, func())) {}

// Push runs the handler immediately with the given data
func (q *Immediate) Push(data Data) error {
	return q.PushFunc(data, nil)
}

// PushFunc runs the provided function and, if it succeeds, runs the handler immediately with the given data
func (q *Immediate) PushFunc(data Data, f func() error) error {
	if f != nil {
		if err := f(); err != nil {
			return err
		}
	}
	q.handler(data)
	return nil
}

// Has always returns false, as data is handled immediately and never held in the queue
func (*Immediate) Has(Data) (bool, error) {
	return false, nil
}

// Flush always returns nil
func (*Immediate) Flush(time.Duration) error {
	return nil
}

// FlushWithContext always returns nil
func (*Immediate) FlushWithContext(context.Context) error {
	return nil
}

// IsEmpty always returns true, as data is handled immediately and never queued
func (*Immediate) IsEmpty() bool {
	return true
}

var queuesMap = map[Type]NewQueueFunc{
	DummyQueueType: NewDummyQueue,
	ImmediateType:  NewImmediate,
}

// RegisteredTypes provides the list of requested types of queues
func RegisteredTypes() []Type {
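To make the behaviour of the new type concrete, here is a small self-contained re-creation of the immediate-queue idea. It is not Gitea's modules/queue API (the Data, HandlerFunc and Immediate names only mirror the diff; the main package and output are invented for illustration), but it shows why callers can assert results right after a push:

package main

import "fmt"

// Data is an arbitrary queue payload; HandlerFunc processes a batch of payloads.
type Data interface{}
type HandlerFunc func(...Data)

// Immediate runs the handler synchronously on every push, so nothing is ever queued.
type Immediate struct {
	handler HandlerFunc
}

func (q *Immediate) Push(d Data) error {
	q.handler(d)
	return nil
}

// IndexerData mirrors the payload used by the code indexer queue.
type IndexerData struct {
	RepoID   int64
	IsDelete bool
}

func main() {
	q := &Immediate{handler: func(batch ...Data) {
		for _, d := range batch {
			payload := d.(*IndexerData)
			fmt.Printf("indexing repo %d (delete=%v)\n", payload.RepoID, payload.IsDelete)
		}
	}}

	// With TYPE = immediate the handler has already run by the time Push returns,
	// which is what lets the integration tests drop their watcher channels.
	_ = q.Push(&IndexerData{RepoID: 1})
}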