diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 8f0593acff..8676561cf1 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -6,6 +6,7 @@ package issues import ( "context" + "fmt" "os" "sync" "time" @@ -51,9 +52,10 @@ type Indexer interface { } type indexerHolder struct { - indexer Indexer - mutex sync.RWMutex - cond *sync.Cond + indexer Indexer + mutex sync.RWMutex + cond *sync.Cond + cancelled bool } func newIndexerHolder() *indexerHolder { @@ -62,6 +64,13 @@ func newIndexerHolder() *indexerHolder { return h } +func (h *indexerHolder) cancel() { + h.mutex.Lock() + defer h.mutex.Unlock() + h.cancelled = true + h.cond.Broadcast() +} + func (h *indexerHolder) set(indexer Indexer) { h.mutex.Lock() defer h.mutex.Unlock() @@ -72,7 +81,7 @@ func (h *indexerHolder) set(indexer Indexer) { func (h *indexerHolder) get() Indexer { h.mutex.RLock() defer h.mutex.RUnlock() - if h.indexer == nil { + if h.indexer == nil && !h.cancelled { h.cond.Wait() } return h.indexer @@ -93,6 +102,12 @@ func InitIssueIndexer(syncReindex bool) { switch setting.Indexer.IssueType { case "bleve": handler := func(data ...queue.Data) { + indexer := holder.get() + if indexer == nil { + log.Error("Unable to get indexer!") + return + } + iData := make([]*IndexerData, 0, setting.Indexer.IssueQueueBatchNumber) for _, datum := range data { indexerData, ok := datum.(*IndexerData) @@ -102,12 +117,12 @@ func InitIssueIndexer(syncReindex bool) { } log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete) if indexerData.IsDelete { - _ = holder.get().Delete(indexerData.IDs...) + _ = indexer.Delete(indexerData.IDs...) continue } iData = append(iData, indexerData) } - if err := holder.get().Index(iData); err != nil { + if err := indexer.Index(iData); err != nil { log.Error("Error whilst indexing: %v Error: %v", iData, err) } } @@ -132,6 +147,7 @@ func InitIssueIndexer(syncReindex bool) { issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath) exist, err := issueIndexer.Init() if err != nil { + holder.cancel() log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err) } populate = !exist @@ -153,6 +169,7 @@ func InitIssueIndexer(syncReindex bool) { issueIndexer := &DBIndexer{} holder.set(issueIndexer) default: + holder.cancel() log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType) } @@ -168,10 +185,14 @@ func InitIssueIndexer(syncReindex bool) { } } waitChannel <- time.Since(start) + close(waitChannel) }() if syncReindex { - <-waitChannel + select { + case <-waitChannel: + case <-graceful.GetManager().IsShutdown(): + } } else if setting.Indexer.StartupTimeout > 0 { go func() { timeout := setting.Indexer.StartupTimeout @@ -181,6 +202,8 @@ func InitIssueIndexer(syncReindex bool) { select { case duration := <-waitChannel: log.Info("Issue Indexer Initialization took %v", duration) + case <-graceful.GetManager().IsShutdown(): + log.Warn("Shutdown occurred before issue index initialisation was complete") case <-time.After(timeout): if shutdownable, ok := issueIndexerQueue.(queue.Shutdownable); ok { shutdownable.Terminate() @@ -293,7 +316,13 @@ func DeleteRepoIssueIndexer(repo *models.Repository) { // SearchIssuesByKeyword search issue ids by keywords and repo id func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) { var issueIDs []int64 - res, err := holder.get().Search(keyword, repoIDs, 1000, 0) + indexer := holder.get() + + if indexer == nil { + log.Error("Unable to get indexer!") + return nil, fmt.Errorf("unable to get issue indexer") + } + res, err := indexer.Search(keyword, repoIDs, 1000, 0) if err != nil { return nil, err }