From a14d21f5b98f120807d109acf66177948b7ef35e Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Sun, 17 May 2015 23:07:10 -0700 Subject: [PATCH] improved streaming w/ websocket. potential for auto re-connect, resume --- .../static/scripts/controllers/builds.js | 2 +- .../static/scripts/services/logs.js | 25 ++-- pkg/runner/builtin/runner.go | 22 +-- pkg/server/ws.go | 136 ++++-------------- 4 files changed, 55 insertions(+), 130 deletions(-) diff --git a/cmd/drone-server/static/scripts/controllers/builds.js b/cmd/drone-server/static/scripts/controllers/builds.js index 3f73ae937..408cd2d12 100644 --- a/cmd/drone-server/static/scripts/controllers/builds.js +++ b/cmd/drone-server/static/scripts/controllers/builds.js @@ -87,7 +87,7 @@ // subscribes to the build otuput. logs.subscribe(fullName, number, step, function(data){ - term.innerHTML += convert.toHtml(data); + term.innerHTML += convert.toHtml(data.replace("\\n","\n")); if (tail) { // scrolls to the bottom of the page if enabled $window.scrollTo(0, $window.document.body.scrollHeight); diff --git a/cmd/drone-server/static/scripts/services/logs.js b/cmd/drone-server/static/scripts/services/logs.js index a1e261807..58893282f 100644 --- a/cmd/drone-server/static/scripts/services/logs.js +++ b/cmd/drone-server/static/scripts/services/logs.js @@ -20,31 +20,34 @@ }; var callback, - websocket, + events, token = localStorage.getItem('access_token'); this.subscribe = function (repoName, number, step, _callback) { callback = _callback; - var proto = ($window.location.protocol === 'https:' ? 'wss' : 'ws'), - route = [proto, "://", $window.location.host, '/api/stream/', repoName, '/', number, '/', step, '?access_token=', token].join(''); - - websocket = new WebSocket(route); - websocket.onmessage = function (event) { + var route = ['/api/stream/', repoName, '/', number, '/', step, '?access_token=', token].join('') + events = new EventSource(route, { withCredentials: true }); + events.onmessage = function (event) { if (callback !== undefined) { callback(event.data); } }; - websocket.onclose = function (event) { - console.log('logs websocket closed'); + events.onerror = function (event) { + callback = undefined; + if (events !== undefined) { + events.close(); + events = undefined; + } + console.log('user event stream closed due to error.', event); }; }; this.unsubscribe = function () { callback = undefined; - if (websocket !== undefined) { - websocket.close(); - websocket = undefined; + if (events !== undefined) { + events.close(); + events = undefined; } }; } diff --git a/pkg/runner/builtin/runner.go b/pkg/runner/builtin/runner.go index 8bba9f45e..e182204ae 100644 --- a/pkg/runner/builtin/runner.go +++ b/pkg/runner/builtin/runner.go @@ -224,6 +224,7 @@ func (r *Runner) Logs(build *common.Build) (io.ReadCloser, error) { if err != nil { return nil, err } + // verify the container is running. if not we'll // do an exponential backoff and attempt to wait if !info.State.Running { @@ -242,16 +243,17 @@ func (r *Runner) Logs(build *common.Build) (io.ReadCloser, error) { } } - rc, err := client.ContainerLogs(info.Id, logOptsTail) - if err != nil { - return nil, err - } - pr, pw := io.Pipe() - go func() { - defer rc.Close() - docker.StdCopy(pw, pw, rc) - }() - return pr, nil + return client.ContainerLogs(info.Id, logOptsTail) + // rc, err := client.ContainerLogs(info.Id, logOptsTail) + // if err != nil { + // return nil, err + // } + // pr, pw := io.Pipe() + // go func() { + // defer rc.Close() + // docker.StdCopy(pw, pw, rc) + // }() + // return pr, nil } func cname(build *common.Build) string { diff --git a/pkg/server/ws.go b/pkg/server/ws.go index 40ca7696b..3a49f4f77 100644 --- a/pkg/server/ws.go +++ b/pkg/server/ws.go @@ -1,39 +1,17 @@ package server import ( - "bufio" - "encoding/json" "io" - "net/http" "strconv" - "time" "github.com/drone/drone/pkg/bus" log "github.com/Sirupsen/logrus" + "github.com/drone/drone/pkg/docker" "github.com/gin-gonic/gin" - "github.com/gorilla/websocket" - - // "github.com/koding/websocketproxy" + "github.com/manucorporat/sse" ) -const ( - // Time allowed to write the message to the client. - writeWait = 10 * time.Second - - // Time allowed to read the next pong message from the client. - pongWait = 60 * time.Second - - // Send pings to client with this period. Must be less than pongWait. - pingPeriod = (pongWait * 9) / 10 -) - -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { return true }, -} - // GetRepoEvents will upgrade the connection to a Websocket and will stream // event updates to the browser. func GetRepoEvents(c *gin.Context) { @@ -45,6 +23,7 @@ func GetRepoEvents(c *gin.Context) { bus_.Subscribe(eventc) defer func() { bus_.Unsubscribe(eventc) + close(eventc) log.Infof("closed event stream") }() @@ -57,9 +36,10 @@ func GetRepoEvents(c *gin.Context) { } if event.Kind == bus.EventRepo && event.Name == repo.FullName { - d := map[string]interface{}{} - json.Unmarshal(event.Msg, &d) - c.SSEvent("message", d) + sse.Encode(w, sse.Event{ + Event: "message", + Data: string(event.Msg), + }) } case <-c.Writer.CloseNotify(): return false @@ -75,11 +55,7 @@ func GetStream(c *gin.Context) { commitseq, _ := strconv.Atoi(c.Params.ByName("build")) buildseq, _ := strconv.Atoi(c.Params.ByName("number")) - // agent, err := store.BuildAgent(repo.FullName, build) - // if err != nil { - // c.Fail(404, err) - // return - // } + c.Writer.Header().Set("Content-Type", "text/event-stream") commit, err := store.CommitSeq(repo, commitseq) if err != nil { @@ -97,89 +73,33 @@ func GetStream(c *gin.Context) { c.Fail(404, err) return } + go func() { + <-c.Writer.CloseNotify() + rc.Close() + }() - // upgrade the websocket - ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) - if err != nil { - c.Fail(400, err) - return - } + rw := &StreamWriter{c.Writer, 0} - var ticker = time.NewTicker(pingPeriod) - var out = make(chan []byte) defer func() { - log.Infof("closed stdout websocket") - ticker.Stop() + log.Infof("closed log stream") rc.Close() - ws.Close() }() - go func() { - for { - select { - case <-c.Writer.CloseNotify(): - rc.Close() - ws.Close() - return - case line := <-out: - ws.WriteMessage(websocket.TextMessage, line) - case <-ticker.C: - ws.SetWriteDeadline(time.Now().Add(writeWait)) - err := ws.WriteMessage(websocket.PingMessage, []byte{}) - if err != nil { - rc.Close() - ws.Close() - return - } - } - } - }() - - go func() { - rd := bufio.NewReader(rc) - for { - str, err := rd.ReadBytes('\n') - - if err != nil { - break - } - if len(str) == 0 { - break - } - - out <- str - } - rc.Close() - ws.Close() - }() - - readWebsocket(ws) - - // url_, err := url.Parse("ws://" + agent.Addr) - // if err != nil { - // c.Fail(500, err) - // return - // } - // url_.Path = fmt.Sprintf("/stream/%s/%v/%v", repo.FullName, build, task) - // proxy := websocketproxy.NewProxy(url_) - // proxy.ServeHTTP(c.Writer, c.Request) - - // log.Debugf("closed websocket") + docker.StdCopy(rw, rw, rc) } -// readWebsocket will block while reading the websocket data -func readWebsocket(ws *websocket.Conn) { - defer ws.Close() - ws.SetReadLimit(512) - ws.SetReadDeadline(time.Now().Add(pongWait)) - ws.SetPongHandler(func(string) error { - ws.SetReadDeadline(time.Now().Add(pongWait)) - return nil +type StreamWriter struct { + writer gin.ResponseWriter + count int +} + +func (w *StreamWriter) Write(data []byte) (int, error) { + var err = sse.Encode(w.writer, sse.Event{ + Id: strconv.Itoa(w.count), + Event: "message", + Data: string(data), }) - for { - _, _, err := ws.ReadMessage() - if err != nil { - break - } - } + w.writer.Flush() + w.count += len(data) + return len(data), err }