From 27aadca029a081591f782ec508adb237d595f7b1 Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Sat, 16 May 2015 19:46:12 -0700 Subject: [PATCH] upgraded to latest version of gin to take advantage of SSE --- .drone.yml | 1 + server/server.go | 32 +++++++-------- server/static/scripts/services/repos.js | 20 +++++---- server/ws.go | 54 +++++++------------------ 4 files changed, 40 insertions(+), 67 deletions(-) diff --git a/.drone.yml b/.drone.yml index 47560fed0..8420f27a9 100644 --- a/.drone.yml +++ b/.drone.yml @@ -5,6 +5,7 @@ env: - GOROOT=/usr/local/go - PATH=$PATH:$GOROOT/bin:$GOPATH/bin script: + - git clone git://github.com/gin-gonic/gin.git $GOPATH/src/github.com/gin-gonic/gin - go get -u github.com/jteeuwen/go-bindata/... - make bindata deps - make build diff --git a/server/server.go b/server/server.go index c3a1a92b3..31f83d17a 100644 --- a/server/server.go +++ b/server/server.go @@ -24,8 +24,8 @@ func SetQueue(q queue.Queue) gin.HandlerFunc { } func ToQueue(c *gin.Context) queue.Queue { - v, err := c.Get("queue") - if err != nil { + v, ok := c.Get("queue") + if !ok { return nil } return v.(queue.Queue) @@ -39,16 +39,16 @@ func SetBus(r eventbus.Bus) gin.HandlerFunc { } func ToBus(c *gin.Context) eventbus.Bus { - v, err := c.Get("eventbus") - if err != nil { + v, ok := c.Get("eventbus") + if !ok { return nil } return v.(eventbus.Bus) } func ToRemote(c *gin.Context) remote.Remote { - v, err := c.Get("remote") - if err != nil { + v, ok := c.Get("remote") + if !ok { return nil } return v.(remote.Remote) @@ -62,8 +62,8 @@ func SetRemote(r remote.Remote) gin.HandlerFunc { } func ToRunner(c *gin.Context) runner.Runner { - v, err := c.Get("runner") - if err != nil { + v, ok := c.Get("runner") + if !ok { return nil } return v.(runner.Runner) @@ -77,8 +77,8 @@ func SetRunner(r runner.Runner) gin.HandlerFunc { } func ToSettings(c *gin.Context) *settings.Settings { - v, err := c.Get("settings") - if err != nil { + v, ok := c.Get("settings") + if !ok { return nil } return v.(*settings.Settings) @@ -92,24 +92,24 @@ func SetSettings(s *settings.Settings) gin.HandlerFunc { } func ToPerm(c *gin.Context) *common.Perm { - v, err := c.Get("perm") - if err != nil { + v, ok := c.Get("perm") + if !ok { return nil } return v.(*common.Perm) } func ToUser(c *gin.Context) *common.User { - v, err := c.Get("user") - if err != nil { + v, ok := c.Get("user") + if !ok { return nil } return v.(*common.User) } func ToRepo(c *gin.Context) *common.Repo { - v, err := c.Get("repo") - if err != nil { + v, ok := c.Get("repo") + if !ok { return nil } return v.(*common.Repo) diff --git a/server/static/scripts/services/repos.js b/server/static/scripts/services/repos.js index 4f2216604..5fc306fa8 100644 --- a/server/static/scripts/services/repos.js +++ b/server/static/scripts/services/repos.js @@ -75,7 +75,7 @@ var callback, - websocket, + events, token = localStorage.getItem('access_token'); /** @@ -86,25 +86,23 @@ this.subscribe = function(repo, _callback) { callback = _callback; - var proto = ($window.location.protocol === 'https:' ? 'wss' : 'ws'), - route = [proto, "://", $window.location.host, '/api/stream/'+ repo +'?access_token=', token].join(''); - - websocket = new WebSocket(route); - websocket.onmessage = function (event) { + events = new EventSource("/api/stream/" + repo + "?access_token=" + token, { withCredentials: true }); + events.onmessage = function (event) { + console.log(event); if (callback !== undefined) { callback(angular.fromJson(event.data)); } }; - websocket.onclose = function (event) { - console.log('user websocket closed'); + events.onerror = function (event) { + console.log('user event stream closed due to error.', event); }; }; this.unsubscribe = function() { callback = undefined; - if (websocket !== undefined) { - websocket.close(); - websocket = undefined; + if (events !== undefined) { + events.close(); + events = undefined; } }; } diff --git a/server/ws.go b/server/ws.go index 56da691fa..132b70ec0 100644 --- a/server/ws.go +++ b/server/ws.go @@ -2,6 +2,7 @@ package server import ( "bufio" + "io" "net/http" "strconv" "time" @@ -37,54 +38,27 @@ var upgrader = websocket.Upgrader{ func GetRepoEvents(c *gin.Context) { bus := ToBus(c) repo := ToRepo(c) + c.Writer.Header().Set("Content-Type", "text/event-stream") - // upgrade the websocket - ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) - if err != nil { - c.Fail(400, err) - return - } - - ticker := time.NewTicker(pingPeriod) - eventc := make(chan *eventbus.Event) + eventc := make(chan *eventbus.Event, 1) bus.Subscribe(eventc) defer func() { bus.Unsubscribe(eventc) - ticker.Stop() - ws.Close() - close(eventc) - log.Infof("closed websocket") + log.Infof("closed event stream") }() - go func() { - for { - select { - case <-c.Writer.CloseNotify(): - ws.Close() - return - case event := <-eventc: - if event == nil { - log.Infof("closed websocket") - ws.Close() - return - } - if event.Kind == eventbus.EventRepo && event.Name == repo.FullName { - ws.WriteMessage(websocket.TextMessage, event.Msg) - break - } - case <-ticker.C: - ws.SetWriteDeadline(time.Now().Add(writeWait)) - err := ws.WriteMessage(websocket.PingMessage, []byte{}) - if err != nil { - log.Infof("closed websocket") - ws.Close() - return - } - } + c.Stream(func(w io.Writer) bool { + event := <-eventc + if event == nil { + return false + } + if event.Kind == eventbus.EventRepo && + event.Name == repo.FullName { + c.SSEvent("message", event.Msg) } - }() - readWebsocket(ws) + return true + }) } func GetStream(c *gin.Context) {