Pause/Resume queue

This commit is contained in:
Laszlo Fogas 2019-06-28 08:29:57 +02:00
parent 8f8276f219
commit d39dd6f2ff
5 changed files with 50 additions and 1 deletions

View file

@ -31,6 +31,7 @@ type fifo struct {
running map[string]*entry running map[string]*entry
pending *list.List pending *list.List
extension time.Duration extension time.Duration
paused bool
} }
// New returns a new fifo queue. // New returns a new fifo queue.
@ -40,6 +41,7 @@ func New() Queue {
running: map[string]*entry{}, running: map[string]*entry{},
pending: list.New(), pending: list.New(),
extension: time.Minute * 10, extension: time.Minute * 10,
paused: false,
} }
} }
@ -167,14 +169,32 @@ func (q *fifo) Info(c context.Context) InfoT {
for _, entry := range q.running { for _, entry := range q.running {
stats.Running = append(stats.Running, entry.item) stats.Running = append(stats.Running, entry.item)
} }
stats.Paused = q.paused
q.Unlock() q.Unlock()
return stats return stats
} }
func (q *fifo) Pause() {
q.Lock()
q.paused = true
q.Unlock()
}
func (q *fifo) Resume() {
q.Lock()
q.paused = false
q.Unlock()
go q.process()
}
// helper function that loops through the queue and attempts to // helper function that loops through the queue and attempts to
// match the item to a single subscriber. // match the item to a single subscriber.
func (q *fifo) process() { func (q *fifo) process() {
if q.paused {
return
}
defer func() { defer func() {
// the risk of panic is low. This code can probably be removed // the risk of panic is low. This code can probably be removed
// once the code has been used in real world installs without issue. // once the code has been used in real world installs without issue.

View file

@ -93,6 +93,7 @@ type InfoT struct {
Running int `json:"running_count"` Running int `json:"running_count"`
Complete int `json:"completed_count"` Complete int `json:"completed_count"`
} `json:"stats"` } `json:"stats"`
Paused bool
} }
// Filter filters tasks in the queue. If the Filter returns false, // Filter filters tasks in the queue. If the Filter returns false,
@ -128,4 +129,10 @@ type Queue interface {
// Info returns internal queue information. // Info returns internal queue information.
Info(c context.Context) InfoT Info(c context.Context) InfoT
// Stops the queue from handing out new work items in Poll
Pause()
// Starts the queue again, Poll returns new items
Resume()
} }

View file

@ -148,6 +148,18 @@ func Load(mux *httptreemux.ContextMux, middleware ...gin.HandlerFunc) http.Handl
) )
} }
queue := e.Group("/api/queue")
{
queue.GET("/pause",
session.MustAdmin(),
server.PauseQueue,
)
queue.GET("/resume",
session.MustAdmin(),
server.ResumeQueue,
)
}
auth := e.Group("/authorize") auth := e.Group("/authorize")
{ {
auth.GET("", server.HandleAuth) auth.GET("", server.HandleAuth)

View file

@ -20,6 +20,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"math/rand" "math/rand"
"net/http"
"regexp" "regexp"
"strconv" "strconv"
"time" "time"
@ -51,6 +52,16 @@ func GetQueueInfo(c *gin.Context) {
) )
} }
func PauseQueue(c *gin.Context) {
Config.Services.Queue.Pause()
c.Status(http.StatusOK)
}
func ResumeQueue(c *gin.Context) {
Config.Services.Queue.Resume()
c.Status(http.StatusOK)
}
func PostHook(c *gin.Context) { func PostHook(c *gin.Context) {
remote_ := remote.FromContext(c) remote_ := remote.FromContext(c)

View file

@ -77,7 +77,6 @@ func EventStreamSSE(c *gin.Context) {
}() }()
go func() { go func() {
// TODO remove this from global config
Config.Services.Pubsub.Subscribe(ctx, "topic/events", func(m pubsub.Message) { Config.Services.Pubsub.Subscribe(ctx, "topic/events", func(m pubsub.Message) {
defer func() { defer func() {
recover() // fix #2480 recover() // fix #2480