initial sort of working mq integration

This commit is contained in:
Brad Rydzewski 2016-09-27 19:33:13 -05:00
parent 0b2f1c8e51
commit 6f44450ef8
6 changed files with 67 additions and 42 deletions

View file

@ -3,18 +3,17 @@ package agent
import ( import (
"os" "os"
"os/signal" "os/signal"
"strings"
"sync" "sync"
"syscall" "syscall"
"time" "time"
"github.com/drone/drone/queue" "github.com/drone/drone/queue"
"github.com/drone/mq/stomp" "github.com/drone/mq/stomp"
"github.com/samalba/dockerclient"
"strings"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/codegangsta/cli" "github.com/codegangsta/cli"
"github.com/samalba/dockerclient"
) )
// AgentCmd is the exported command for starting the drone agent. // AgentCmd is the exported command for starting the drone agent.
@ -160,16 +159,6 @@ func start(c *cli.Context) {
) )
server := strings.TrimRight(c.String("drone-server"), "/") server := strings.TrimRight(c.String("drone-server"), "/")
client, err := stomp.Dial(server)
if err != nil {
logrus.Fatalf("Cannot connect to host %s. %s", server, err)
}
opts := []stomp.MessageOption{
stomp.WithCredentials("x-token", accessToken),
}
if err = client.Connect(opts...); err != nil {
logrus.Fatalf("Cannot connect to host %s. %s", server, err)
}
tls, err := dockerclient.TLSConfigFromCertPath(c.String("docker-cert-path")) tls, err := dockerclient.TLSConfigFromCertPath(c.String("docker-cert-path"))
if err == nil { if err == nil {
@ -180,6 +169,8 @@ func start(c *cli.Context) {
logrus.Fatal(err) logrus.Fatal(err)
} }
var client *stomp.Client
handler := func(m *stomp.Message) { handler := func(m *stomp.Message) {
running.Add(1) running.Add(1)
defer func() { defer func() {
@ -205,24 +196,44 @@ func start(c *cli.Context) {
r.run(work) r.run(work)
} }
_, err = client.Subscribe("/queue/pending", stomp.HandlerFunc(handler),
stomp.WithAck("client"),
stomp.WithPrefetch(
c.Int("docker-max-procs"),
),
// stomp.WithSelector(
// fmt.Sprintf("platorm == '%s/%s'",
// c.String("drone-os"),
// c.String("drone-arch"),
// ),
// ),
)
if err != nil {
logrus.Fatalf("Unable to connect to queue. %s", err)
}
handleSignals() handleSignals()
<-client.Done() backoff := c.Duration("backoff")
for {
// dial the drone server to establish a TCP connection.
client, err = stomp.Dial(server)
if err != nil {
logrus.Errorf("Failed to establish server connection, %s, retry in %v", err, backoff)
<-time.After(backoff)
continue
}
opts := []stomp.MessageOption{
stomp.WithCredentials("x-token", accessToken),
}
// initialize the stomp session and authenticate.
if err = client.Connect(opts...); err != nil {
logrus.Errorf("Failed to establish server session, %s, retry in %v", err, backoff)
<-time.After(backoff)
continue
}
// subscribe to the pending build queue.
client.Subscribe("/queue/pending", stomp.HandlerFunc(func(m *stomp.Message) {
go handler(m) // HACK until we a channel based Subscribe implementation
}),
stomp.WithAck("client"),
stomp.WithPrefetch(
c.Int("docker-max-procs"),
),
)
logrus.Infof("Server connection establish, ready to process builds.")
<-client.Done()
logrus.Warnf("Server connection interrupted, attempting to reconnect.")
}
} }
// tracks running builds // tracks running builds

View file

@ -62,9 +62,9 @@ func (r *pipeline) run(w *queue.Work) {
} }
// signal for canceling the build. // signal for canceling the build.
sub, err := r.drone.Subscribe("/topic/cancels", stomp.HandlerFunc(cancelFunc)) sub, err := r.drone.Subscribe("/topic/cancel", stomp.HandlerFunc(cancelFunc))
if err != nil { if err != nil {
logrus.Errorf("Error subscribing to /topic/cancels. %s", err) logrus.Errorf("Error subscribing to /topic/cancel. %s", err)
} }
defer func() { defer func() {
r.drone.Unsubscribe(sub) r.drone.Unsubscribe(sub)

View file

@ -6,10 +6,10 @@ import (
"github.com/drone/drone/router" "github.com/drone/drone/router"
"github.com/drone/drone/router/middleware" "github.com/drone/drone/router/middleware"
"github.com/gin-gonic/contrib/ginrus"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/codegangsta/cli" "github.com/codegangsta/cli"
"github.com/gin-gonic/contrib/ginrus"
) )
var serverCmd = cli.Command{ var serverCmd = cli.Command{

View file

@ -156,7 +156,7 @@ func DeleteBuild(c *gin.Context) {
Repo: *repo, Repo: *repo,
Build: *build, Build: *build,
Job: *job, Job: *job,
}) }, stomp.WithHeader("job-id", strconv.FormatInt(job.ID, 10)))
c.String(204, "") c.String(204, "")
} }

View file

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"strconv" "strconv"
"time"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -411,13 +412,21 @@ func HandleUpdate(c context.Context, message *stomp.Message) {
logrus.Errorf("Unable to read logs from broker. %s", err) logrus.Errorf("Unable to read logs from broker. %s", err)
return return
} }
<-done
defer func() {
client.Unsubscribe(sub)
client.Send(dest, []byte{}, stomp.WithRetain("remove"))
}()
select {
case <-done:
case <-time.After(30 * time.Second):
logrus.Errorf("Unable to read logs from broker. Timeout. %s", err)
return
}
if err := store.WriteLog(c, job, &buf); err != nil { if err := store.WriteLog(c, job, &buf); err != nil {
logrus.Errorf("Unable to write logs to store. %s", err) logrus.Errorf("Unable to write logs to store. %s", err)
return return
} }
client.Unsubscribe(sub)
client.Send(dest, []byte{}, stomp.WithRetain("remove"))
} }

View file

@ -65,17 +65,17 @@ func LogStream(c *gin.Context) {
ticker := time.NewTicker(pingPeriod) ticker := time.NewTicker(pingPeriod)
defer ticker.Stop() defer ticker.Stop()
logs := make(chan []byte)
done := make(chan bool) done := make(chan bool)
dest := fmt.Sprintf("/topic/logs.%d", job.ID) dest := fmt.Sprintf("/topic/logs.%d", job.ID)
client, _ := stomp.FromContext(c) client, _ := stomp.FromContext(c)
sub, err := client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) { sub, err := client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) {
defer m.Release()
if m.Header.GetBool("eof") { if m.Header.GetBool("eof") {
done <- true done <- true
return } else {
logs <- m.Body
} }
ws.SetWriteDeadline(time.Now().Add(writeWait)) m.Release()
ws.WriteMessage(websocket.TextMessage, m.Body)
})) }))
if err != nil { if err != nil {
logrus.Errorf("Unable to read logs from broker. %s", err) logrus.Errorf("Unable to read logs from broker. %s", err)
@ -83,10 +83,15 @@ func LogStream(c *gin.Context) {
} }
defer func() { defer func() {
client.Unsubscribe(sub) client.Unsubscribe(sub)
close(done)
close(logs)
}() }()
for { for {
select { select {
case buf := <-logs:
ws.SetWriteDeadline(time.Now().Add(writeWait))
ws.WriteMessage(websocket.TextMessage, buf)
case <-done: case <-done:
return return
case <-ticker.C: case <-ticker.C:
@ -139,9 +144,9 @@ func EventStream(c *gin.Context) {
return return
} }
defer func() { defer func() {
client.Unsubscribe(sub)
close(quitc) close(quitc)
close(eventc) close(eventc)
client.Unsubscribe(sub)
}() }()
go func() { go func() {