From ad80facbbdd7de80489d02598efc3e703ad52a3e Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Wed, 29 Apr 2015 19:57:43 -0700 Subject: [PATCH] websocket output working --- common/agent.go | 6 ++-- common/httputil/httputil.go | 2 ++ datastore/bolt/bolt.go | 2 +- datastore/bolt/build.go | 2 +- datastore/datastore.go | 6 ++-- drone.go | 8 ++++- server/builds.go | 2 +- server/queue.go | 33 ++++++++++++++++-- server/static/scripts/controllers/builds.js | 28 +++++++++++----- server/static/scripts/drone.js | 3 +- server/static/scripts/services/feed.js | 37 +++++++++++++-------- server/static/scripts/services/logs.js | 29 ++++++++++++++++ server/ws.go | 28 ++++++++++++++++ 13 files changed, 150 insertions(+), 36 deletions(-) diff --git a/common/agent.go b/common/agent.go index 4e4a8f71a..fc35e0883 100644 --- a/common/agent.go +++ b/common/agent.go @@ -3,7 +3,7 @@ package common // Agent represents a worker that has connected // to the system in order to perform work type Agent struct { - Name string - Addr string - IsHealthy bool + Name string `json:"name"` + Addr string `json:"addr"` + IsHealthy bool `json:"is_healthy"` } diff --git a/common/httputil/httputil.go b/common/httputil/httputil.go index 4affaccec..c22c16933 100644 --- a/common/httputil/httputil.go +++ b/common/httputil/httputil.go @@ -59,6 +59,8 @@ func GetHost(r *http.Request) string { return r.Header.Get("X-Host") case len(r.Header.Get("XFF")) != 0: return r.Header.Get("XFF") + case len(r.Header.Get("X-Real-IP")) != 0: + return r.Header.Get("X-Real-IP") default: return "localhost:8080" } diff --git a/datastore/bolt/bolt.go b/datastore/bolt/bolt.go index dfcbc5e54..ffbd9dd10 100644 --- a/datastore/bolt/bolt.go +++ b/datastore/bolt/bolt.go @@ -23,7 +23,7 @@ var ( bucketRepoParams = []byte("repo_params") bucketRepoUsers = []byte("repo_users") bucketBuild = []byte("build") - bucketBuildAgent = []byte("build_agent") + bucketBuildAgent = []byte("build_agents") bucketBuildStatus = []byte("build_status") bucketBuildLogs = []byte("build_logs") bucketBuildSeq = []byte("build_seq") diff --git a/datastore/bolt/build.go b/datastore/bolt/build.go index 90e34b251..444a4035b 100644 --- a/datastore/bolt/build.go +++ b/datastore/bolt/build.go @@ -223,7 +223,7 @@ func (db *DB) SetBuildAgent(repo string, build int, agent *common.Agent) error { }) } -func (db *DB) DelBuildAgent(repo string, build int, agent *common.Agent) error { +func (db *DB) DelBuildAgent(repo string, build int) error { key := []byte(repo + "/" + strconv.Itoa(build)) return db.Update(func(t *bolt.Tx) error { return delete(t, bucketBuildAgent, key) diff --git a/datastore/datastore.go b/datastore/datastore.go index 31df86143..1438ee3e0 100644 --- a/datastore/datastore.go +++ b/datastore/datastore.go @@ -103,7 +103,7 @@ type Datastore interface { // BuildAgent returns the agent that is being // used to execute the build. - // BuildAgent(string, int) (*common.Agent, error) + BuildAgent(string, int) (*common.Agent, error) // SetBuild inserts or updates a build for the named // repository. The build number is incremented and @@ -126,11 +126,11 @@ type Datastore interface { // SetBuildAgent insert or updates the agent that is // running a build. - // SetBuildAgent(string, int, *common.Agent) error + SetBuildAgent(string, int, *common.Agent) error // DelBuildAgent purges the referce to the agent // that ran a build. - // DelBuildAgent(string, int, *common.Agent) error + DelBuildAgent(string, int) error // LogReader gets the task logs at index N for // the named repository and build number. diff --git a/drone.go b/drone.go index 4c457326e..04c72519c 100644 --- a/drone.go +++ b/drone.go @@ -124,7 +124,13 @@ func main() { events := api.Group("/stream") { events.GET("/user", server.GetEvents) - //events.GET("/logs/:owner/:name/:build/:number") + + stream := events.Group("/logs") + { + stream.Use(server.SetRepo()) + stream.Use(server.SetPerm()) + stream.GET("/:owner/:name/:build/:number", server.GetStream) + } } auth := r.Group("/authorize") diff --git a/server/builds.go b/server/builds.go index 2f89bacb7..265c36d4d 100644 --- a/server/builds.go +++ b/server/builds.go @@ -133,7 +133,7 @@ func RunBuild(c *gin.Context) { // must not restart a running build if build.State == common.StatePending || build.State == common.StateRunning { - c.Fail(409, err) + c.AbortWithStatus(409) return } diff --git a/server/queue.go b/server/queue.go index 1bdbdf2d9..486ef652b 100644 --- a/server/queue.go +++ b/server/queue.go @@ -3,8 +3,10 @@ package server import ( "io" "io/ioutil" + "net" "strconv" + log "github.com/Sirupsen/logrus" "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" @@ -19,12 +21,35 @@ import ( // GET /queue/pull func PollBuild(c *gin.Context) { queue := ToQueue(c) + store := ToDatastore(c) + agent := &common.Agent{ + Addr: c.Request.RemoteAddr, + } + + // extact the host port and name and + // replace with the default agent port (1999) + host, _, err := net.SplitHostPort(agent.Addr) + if err == nil { + agent.Addr = host + } + agent.Addr = net.JoinHostPort(agent.Addr, "1999") + + log.Infof("agent connected and polling builds at %s", agent.Addr) + work := queue.PullClose(c.Writer) if work == nil { c.AbortWithStatus(500) - } else { - c.JSON(200, work) + return } + + // TODO (bradrydzewski) decide how we want to handle a failure here + // still not sure exact behavior we want ... + err = store.SetBuildAgent(work.Repo.FullName, work.Build.Number, agent) + if err != nil { + log.Errorf("error persisting build agent. %s", err) + } + + c.JSON(200, work) } // GET /queue/push/:owner/:repo @@ -42,6 +67,10 @@ func PushBuild(c *gin.Context) { return } + if in.State != common.StatePending && in.State != common.StateRunning { + store.DelBuildAgent(repo.FullName, build.Number) + } + build.Duration = in.Duration build.Started = in.Started build.Finished = in.Finished diff --git a/server/static/scripts/controllers/builds.js b/server/static/scripts/controllers/builds.js index cd5ee8ed6..c05b0ed37 100644 --- a/server/static/scripts/controllers/builds.js +++ b/server/static/scripts/controllers/builds.js @@ -4,7 +4,7 @@ * BuildsCtrl responsible for rendering the repo's * recent build history. */ - function BuildsCtrl($scope, $routeParams, builds, repos, users, feed) { + function BuildsCtrl($scope, $routeParams, builds, repos, users, feed, logs) { var owner = $routeParams.owner; var name = $routeParams.name; @@ -83,6 +83,17 @@ var name = $routeParams.name; var fullName = owner+'/'+name; + // Initiates streaming a build. + var stream = function() { + var convert = new Filter({stream:true,newline:false}); + var term = document.getElementById("term"); + term.innerHTML = ""; + + logs.subscribe(fullName, number, step, function(data){ + term.innerHTML += convert.toHtml(data)+"\n"; + }); + } + // Gets the currently authenticated user users.getCached().then(function(payload){ $scope.user = payload.data; @@ -104,6 +115,7 @@ // do nothing } else if ($scope.task.state === 'running') { // stream the build + stream(); } else { // fetch the logs for the finished build. @@ -154,6 +166,11 @@ if (!event.task || event.task.number !== step) { return; // ignore } + + if (event.task.state === 'running' && $scope.task.state !== 'running') { + stream(); + } + // update the task status $scope.task.state = event.task.state; $scope.task.started_at = event.task.started_at; @@ -163,14 +180,7 @@ $scope.$apply(); }); - // var convert = new Filter({stream:true,newline:false}); - // var term = document.getElementById("term") - // var stdout = document.getElementById("stdout").innerText.split("\n") - // stdout.forEach(function(line, i) { - // setTimeout(function () { - // term.innerHTML += convert.toHtml(line+"\n"); - // }, i*i); - // }); + } angular diff --git a/server/static/scripts/drone.js b/server/static/scripts/drone.js index 9adf491dc..16f51c077 100644 --- a/server/static/scripts/drone.js +++ b/server/static/scripts/drone.js @@ -119,9 +119,10 @@ } - function RouteChange($rootScope, feed) { + function RouteChange($rootScope, feed, logs) { $rootScope.$on('$routeChangeStart', function (event, next) { feed.unsubscribe(); + logs.unsubscribe(); }); $rootScope.$on('$routeChangeSuccess', function (event, current, previous) { diff --git a/server/static/scripts/services/feed.js b/server/static/scripts/services/feed.js index 33920f5b2..d4a209cb2 100644 --- a/server/static/scripts/services/feed.js +++ b/server/static/scripts/services/feed.js @@ -3,25 +3,34 @@ (function () { function FeedService($http, $window) { - var token = localStorage.getItem('access_token'); - var proto = ($window.location.protocol == 'https:' ? 'wss' : 'ws'); - var route = [proto, "://", $window.location.host, '/api/stream/user?access_token=', token].join(''); - var wsCallback = undefined; - var ws = new WebSocket(route); - ws.onmessage = function(event) { - var data = angular.fromJson(event.data); - if (wsCallback != undefined) { - wsCallback(data); - } - }; + var callback, + websocket, + token = localStorage.getItem('access_token'); - this.subscribe = function(callback) { - wsCallback = callback; + this.subscribe = function(_callback) { + callback = _callback; + + var proto = ($window.location.protocol === 'https:' ? 'wss' : 'ws'), + route = [proto, "://", $window.location.host, '/api/stream/user?access_token=', token].join(''); + + websocket = new WebSocket(route); + websocket.onmessage = function (event) { + if (callback !== undefined) { + callback(angular.fromJson(event.data)); + } + }; + websocket.onclose = function (event) { + console.log('user websocket closed'); + }; }; this.unsubscribe = function() { - wsCallback = undefined; + callback = undefined; + if (websocket !== undefined) { + websocket.close(); + websocket = undefined; + } }; } diff --git a/server/static/scripts/services/logs.js b/server/static/scripts/services/logs.js index 685e57455..177f58818 100644 --- a/server/static/scripts/services/logs.js +++ b/server/static/scripts/services/logs.js @@ -18,6 +18,35 @@ this.get = function(repoName, number, step) { return $http.get('/api/repos/'+repoName+'/logs/'+number+'/'+step); }; + + var callback, + websocket, + token = localStorage.getItem('access_token'); + + this.subscribe = function (repoName, number, step, _callback) { + callback = _callback; + + var proto = ($window.location.protocol === 'https:' ? 'wss' : 'ws'), + route = [proto, "://", $window.location.host, '/api/stream/logs/', repoName, '/', number, '/', step, '?access_token=', token].join(''); + + websocket = new WebSocket(route); + websocket.onmessage = function (event) { + if (callback !== undefined) { + callback(event.data); + } + }; + websocket.onclose = function (event) { + console.log('logs websocket closed'); + }; + }; + + this.unsubscribe = function () { + callback = undefined; + if (websocket !== undefined) { + websocket.close(); + websocket = undefined; + } + }; } angular diff --git a/server/ws.go b/server/ws.go index 3634b8723..becc934f0 100644 --- a/server/ws.go +++ b/server/ws.go @@ -1,6 +1,9 @@ package server import ( + "fmt" + "net/url" + "strconv" "time" "github.com/drone/drone/common" @@ -9,6 +12,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" + "github.com/koding/websocketproxy" ) const ( @@ -96,6 +100,30 @@ func GetEvents(c *gin.Context) { log.Debugf("closed websocket") } +func GetStream(c *gin.Context) { + store := ToDatastore(c) + repo := ToRepo(c) + build, _ := strconv.Atoi(c.Params.ByName("build")) + task, _ := strconv.Atoi(c.Params.ByName("number")) + + agent, err := store.BuildAgent(repo.FullName, build) + if err != nil { + c.Fail(404, err) + return + } + + url_, err := url.Parse("ws://" + agent.Addr) + if err != nil { + c.Fail(500, err) + return + } + url_.Path = fmt.Sprintf("/stream/%s/%v/%v", repo.FullName, build, task) + proxy := websocketproxy.NewProxy(url_) + proxy.ServeHTTP(c.Writer, c.Request) + + log.Debugf("closed websocket") +} + // readWebsocket will block while reading the websocket data func readWebsocket(ws *websocket.Conn) { defer ws.Close()