From d39dd6f2ff0ffc8e7fd5e7cc8eacce3b5b6e8f56 Mon Sep 17 00:00:00 2001 From: Laszlo Fogas Date: Fri, 28 Jun 2019 08:29:57 +0200 Subject: [PATCH] Pause/Resume queue --- cncd/queue/fifo.go | 20 ++++++++++++++++++++ cncd/queue/queue.go | 7 +++++++ router/router.go | 12 ++++++++++++ server/hook.go | 11 +++++++++++ server/stream.go | 1 - 5 files changed, 50 insertions(+), 1 deletion(-) diff --git a/cncd/queue/fifo.go b/cncd/queue/fifo.go index c0428de9a..bc8936ba6 100644 --- a/cncd/queue/fifo.go +++ b/cncd/queue/fifo.go @@ -31,6 +31,7 @@ type fifo struct { running map[string]*entry pending *list.List extension time.Duration + paused bool } // New returns a new fifo queue. @@ -40,6 +41,7 @@ func New() Queue { running: map[string]*entry{}, pending: list.New(), extension: time.Minute * 10, + paused: false, } } @@ -167,14 +169,32 @@ func (q *fifo) Info(c context.Context) InfoT { for _, entry := range q.running { stats.Running = append(stats.Running, entry.item) } + stats.Paused = q.paused q.Unlock() return stats } +func (q *fifo) Pause() { + q.Lock() + q.paused = true + q.Unlock() +} + +func (q *fifo) Resume() { + q.Lock() + q.paused = false + q.Unlock() + go q.process() +} + // helper function that loops through the queue and attempts to // match the item to a single subscriber. func (q *fifo) process() { + if q.paused { + return + } + defer func() { // the risk of panic is low. This code can probably be removed // once the code has been used in real world installs without issue. diff --git a/cncd/queue/queue.go b/cncd/queue/queue.go index 2b70d3285..fc1641c36 100644 --- a/cncd/queue/queue.go +++ b/cncd/queue/queue.go @@ -93,6 +93,7 @@ type InfoT struct { Running int `json:"running_count"` Complete int `json:"completed_count"` } `json:"stats"` + Paused bool } // Filter filters tasks in the queue. If the Filter returns false, @@ -128,4 +129,10 @@ type Queue interface { // Info returns internal queue information. Info(c context.Context) InfoT + + // Stops the queue from handing out new work items in Poll + Pause() + + // Starts the queue again, Poll returns new items + Resume() } diff --git a/router/router.go b/router/router.go index e9956c709..5968d64b2 100644 --- a/router/router.go +++ b/router/router.go @@ -148,6 +148,18 @@ func Load(mux *httptreemux.ContextMux, middleware ...gin.HandlerFunc) http.Handl ) } + queue := e.Group("/api/queue") + { + queue.GET("/pause", + session.MustAdmin(), + server.PauseQueue, + ) + queue.GET("/resume", + session.MustAdmin(), + server.ResumeQueue, + ) + } + auth := e.Group("/authorize") { auth.GET("", server.HandleAuth) diff --git a/server/hook.go b/server/hook.go index c5f0af973..9f82a172d 100644 --- a/server/hook.go +++ b/server/hook.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "math/rand" + "net/http" "regexp" "strconv" "time" @@ -51,6 +52,16 @@ func GetQueueInfo(c *gin.Context) { ) } +func PauseQueue(c *gin.Context) { + Config.Services.Queue.Pause() + c.Status(http.StatusOK) +} + +func ResumeQueue(c *gin.Context) { + Config.Services.Queue.Resume() + c.Status(http.StatusOK) +} + func PostHook(c *gin.Context) { remote_ := remote.FromContext(c) diff --git a/server/stream.go b/server/stream.go index 56915e332..c4d42341c 100644 --- a/server/stream.go +++ b/server/stream.go @@ -77,7 +77,6 @@ func EventStreamSSE(c *gin.Context) { }() go func() { - // TODO remove this from global config Config.Services.Pubsub.Subscribe(ctx, "topic/events", func(m pubsub.Message) { defer func() { recover() // fix #2480