Merge pull request #40 from laszlocph/pause-queue

Pause queue
This commit is contained in:
Laszlo Fogas 2019-06-28 09:23:57 +02:00 committed by GitHub
commit 936fc8b4cd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 110 additions and 1 deletions

View file

@ -31,6 +31,7 @@ type fifo struct {
running map[string]*entry running map[string]*entry
pending *list.List pending *list.List
extension time.Duration extension time.Duration
paused bool
} }
// New returns a new fifo queue. // New returns a new fifo queue.
@ -40,6 +41,7 @@ func New() Queue {
running: map[string]*entry{}, running: map[string]*entry{},
pending: list.New(), pending: list.New(),
extension: time.Minute * 10, extension: time.Minute * 10,
paused: false,
} }
} }
@ -167,14 +169,32 @@ func (q *fifo) Info(c context.Context) InfoT {
for _, entry := range q.running { for _, entry := range q.running {
stats.Running = append(stats.Running, entry.item) stats.Running = append(stats.Running, entry.item)
} }
stats.Paused = q.paused
q.Unlock() q.Unlock()
return stats return stats
} }
func (q *fifo) Pause() {
q.Lock()
q.paused = true
q.Unlock()
}
func (q *fifo) Resume() {
q.Lock()
q.paused = false
q.Unlock()
go q.process()
}
// helper function that loops through the queue and attempts to // helper function that loops through the queue and attempts to
// match the item to a single subscriber. // match the item to a single subscriber.
func (q *fifo) process() { func (q *fifo) process() {
if q.paused {
return
}
defer func() { defer func() {
// the risk of panic is low. This code can probably be removed // the risk of panic is low. This code can probably be removed
// once the code has been used in real world installs without issue. // once the code has been used in real world installs without issue.

View file

@ -238,6 +238,52 @@ func TestFifoCancel(t *testing.T) {
} }
} }
func TestFifoPause(t *testing.T) {
task1 := &Task{
ID: "1",
}
q := New().(*fifo)
var wg sync.WaitGroup
wg.Add(1)
go func() {
_, _ = q.Poll(noContext, func(*Task) bool { return true })
wg.Done()
}()
q.Pause()
t0 := time.Now()
q.Push(noContext, task1)
time.Sleep(20 * time.Millisecond)
q.Resume()
wg.Wait()
t1 := time.Now()
if t1.Sub(t0) < 20 * time.Millisecond {
t.Errorf("Should have waited til resume")
}
q.Pause()
q.Push(noContext, task1)
q.Resume()
_, _ = q.Poll(noContext, func(*Task) bool { return true })
}
func TestFifoPauseResume(t *testing.T) {
task1 := &Task{
ID: "1",
}
q := New().(*fifo)
q.Pause()
q.Push(noContext, task1)
q.Resume()
_, _ = q.Poll(noContext, func(*Task) bool { return true })
}
func TestShouldRun(t *testing.T) { func TestShouldRun(t *testing.T) {
task := &Task{ task := &Task{
ID: "2", ID: "2",

View file

@ -93,6 +93,7 @@ type InfoT struct {
Running int `json:"running_count"` Running int `json:"running_count"`
Complete int `json:"completed_count"` Complete int `json:"completed_count"`
} `json:"stats"` } `json:"stats"`
Paused bool
} }
// Filter filters tasks in the queue. If the Filter returns false, // Filter filters tasks in the queue. If the Filter returns false,
@ -128,4 +129,10 @@ type Queue interface {
// Info returns internal queue information. // Info returns internal queue information.
Info(c context.Context) InfoT Info(c context.Context) InfoT
// Stops the queue from handing out new work items in Poll
Pause()
// Starts the queue again, Poll returns new items
Resume()
} }

View file

@ -148,6 +148,22 @@ func Load(mux *httptreemux.ContextMux, middleware ...gin.HandlerFunc) http.Handl
) )
} }
queue := e.Group("/api/queue")
{
queue.GET("/pause",
session.MustAdmin(),
server.PauseQueue,
)
queue.GET("/resume",
session.MustAdmin(),
server.ResumeQueue,
)
queue.GET("/norunningbuilds",
session.MustAdmin(),
server.BlockTilQueueHasRunningItem,
)
}
auth := e.Group("/authorize") auth := e.Group("/authorize")
{ {
auth.GET("", server.HandleAuth) auth.GET("", server.HandleAuth)

View file

@ -20,6 +20,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"math/rand" "math/rand"
"net/http"
"regexp" "regexp"
"strconv" "strconv"
"time" "time"
@ -51,6 +52,26 @@ func GetQueueInfo(c *gin.Context) {
) )
} }
func PauseQueue(c *gin.Context) {
Config.Services.Queue.Pause()
c.Status(http.StatusOK)
}
func ResumeQueue(c *gin.Context) {
Config.Services.Queue.Resume()
c.Status(http.StatusOK)
}
func BlockTilQueueHasRunningItem(c *gin.Context) {
for {
info := Config.Services.Queue.Info(c)
if info.Stats.Running == 0 {
break
}
}
c.Status(http.StatusOK)
}
func PostHook(c *gin.Context) { func PostHook(c *gin.Context) {
remote_ := remote.FromContext(c) remote_ := remote.FromContext(c)

View file

@ -77,7 +77,6 @@ func EventStreamSSE(c *gin.Context) {
}() }()
go func() { go func() {
// TODO remove this from global config
Config.Services.Pubsub.Subscribe(ctx, "topic/events", func(m pubsub.Message) { Config.Services.Pubsub.Subscribe(ctx, "topic/events", func(m pubsub.Message) {
defer func() { defer func() {
recover() // fix #2480 recover() // fix #2480