diff --git a/drone.go b/drone.go index 98fb1e7fe..66e364a64 100644 --- a/drone.go +++ b/drone.go @@ -143,6 +143,7 @@ func main() { { stream.Use(server.SetRepo()) stream.Use(server.SetPerm()) + stream.GET("/:owner/:name", server.GetRepoEvents) stream.GET("/:owner/:name/:build/:number", server.GetStream) } } diff --git a/eventbus/builtin/bus_test.go b/eventbus/builtin/bus_test.go index 1ce215842..7e67b6643 100644 --- a/eventbus/builtin/bus_test.go +++ b/eventbus/builtin/bus_test.go @@ -3,7 +3,6 @@ package builtin import ( "testing" - "github.com/drone/drone/common" "github.com/drone/drone/eventbus" . "github.com/franela/goblin" ) @@ -34,8 +33,8 @@ func TestBuild(t *testing.T) { }) g.It("Should send", func() { - e1 := &eventbus.Event{Repo: &common.Repo{Name: "foo"}} - e2 := &eventbus.Event{Repo: &common.Repo{Name: "bar"}} + e1 := &eventbus.Event{Name: "foo"} + e2 := &eventbus.Event{Name: "bar"} c := make(chan *eventbus.Event) b := New() b.Subscribe(c) diff --git a/eventbus/bus.go b/eventbus/bus.go index 436a15111..7e61ddf56 100644 --- a/eventbus/bus.go +++ b/eventbus/bus.go @@ -1,11 +1,15 @@ package eventbus -import "github.com/drone/drone/common" +const ( + EventRepo = "repo" + EventUser = "user" + EventAgent = "agent" +) type Event struct { - Build *common.Build `json:"build,omitempty"` - Repo *common.Repo `json:"repo,omitempty"` - Task *common.Task `json:"task,omitempty"` + Kind string + Name string + Msg []byte } type Bus interface { diff --git a/runner/builtin/updater.go b/runner/builtin/updater.go index 776ed9d24..5dbd38448 100644 --- a/runner/builtin/updater.go +++ b/runner/builtin/updater.go @@ -1,6 +1,7 @@ package builtin import ( + "encoding/json" "io" "io/ioutil" @@ -31,9 +32,16 @@ func (u *updater) SetBuild(r *common.Repo, b *common.Build) error { if err != nil { return err } + + msg, err := json.Marshal(b) + if err != nil { + return err + } + u.bus.Send(&eventbus.Event{ - Repo: r, - Build: b, + Name: r.FullName, + Kind: eventbus.EventRepo, + Msg: msg, }) return nil } @@ -43,10 +51,16 @@ func (u *updater) SetTask(r *common.Repo, b *common.Build, t *common.Task) error if err != nil { return err } + + msg, err := json.Marshal(b) + if err != nil { + return err + } + u.bus.Send(&eventbus.Event{ - Repo: r, - Build: b, - Task: t, + Name: r.FullName, + Kind: eventbus.EventRepo, + Msg: msg, }) return nil } diff --git a/server/queue.go b/server/queue.go index 0251298e2..284d3245f 100644 --- a/server/queue.go +++ b/server/queue.go @@ -1,6 +1,7 @@ package server import ( + "encoding/json" "io" "io/ioutil" "net" @@ -108,9 +109,16 @@ func PushBuild(c *gin.Context) { } // END FIXME --> + msg, err := json.Marshal(build) + if err == nil { + c.String(200, err.Error()) // we can ignore this error + return + } + bus.Send(&eventbus.Event{ - Build: build, - Repo: repo, + Name: repo.FullName, + Kind: eventbus.EventRepo, + Msg: msg, }) c.Writer.WriteHeader(200) @@ -136,11 +144,19 @@ func PushTask(c *gin.Context) { c.Fail(404, err) return } + + msg, err := json.Marshal(build) + if err == nil { + c.String(200, err.Error()) // we can ignore this error + return + } + bus.Send(&eventbus.Event{ - Build: build, - Repo: repo, - Task: in, + Name: repo.FullName, + Kind: eventbus.EventRepo, + Msg: msg, }) + c.Writer.WriteHeader(200) } diff --git a/server/static/scripts/controllers/builds.js b/server/static/scripts/controllers/builds.js index 1c0707ddf..be1085103 100644 --- a/server/static/scripts/controllers/builds.js +++ b/server/static/scripts/controllers/builds.js @@ -4,7 +4,7 @@ * BuildsCtrl responsible for rendering the repo's * recent build history. */ - function BuildsCtrl($scope, $routeParams, builds, repos, users, feed, logs) { + function BuildsCtrl($scope, $routeParams, builds, repos, users, logs) { var owner = $routeParams.owner; var name = $routeParams.name; @@ -41,32 +41,21 @@ }); } - feed.subscribe(function(event) { - if (event.repo.full_name !== fullName) { - return; // ignore - } - // update repository - $scope.repo = event.repo; - $scope.$apply(); - + repo.subscribe(fullName, function(event) { var added = false; for (var i=0;i<$scope.builds.length;i++) { var build = $scope.builds[i]; - if (event.build.number !== build.number) { + if (event.number !== build.number) { continue; // ignore } // update the build status - build.state = event.build.state; - build.started_at = event.build.started_at; - build.finished_at = event.build.finished_at; - build.duration = event.build.duration; - $scope.builds[i] = build; + $scope.builds[i] = event; $scope.$apply(); added = true; } if (!added) { - $scope.builds.push(event.build); + $scope.builds.push(event); $scope.$apply(); } }); @@ -75,17 +64,23 @@ /** * BuildCtrl responsible for rendering a build. */ - function BuildCtrl($scope, $routeParams, $window, logs, builds, repos, users, feed) { + function BuildCtrl($scope, $routeParams, $window, logs, builds, repos, users) { var step = parseInt($routeParams.step) || 1; var number = $routeParams.number; var owner = $routeParams.owner; var name = $routeParams.name; var fullName = owner+'/'+name; + var streaming = false; var tail = false; // Initiates streaming a build. var stream = function() { + if (streaming) { + return; + } + streaming = true; + var convert = new Filter({stream:true,newline:false}); var term = document.getElementById("term"); term.innerHTML = ""; @@ -159,35 +154,22 @@ tail = !tail; }; - feed.subscribe(function(event) { - if (event.repo.full_name !== fullName) { + repos.subscribe(fullName, function(event) { + if (event.number !== parseInt(number)) { return; // ignore } - if (event.build.number !== parseInt(number)) { - return; // ignore - } - // update the build status - $scope.build.state = event.build.state; - $scope.build.started_at = event.build.started_at; - $scope.build.finished_at = event.build.finished_at; - $scope.build.duration = event.build.duration; + // update the build + $scope.build = event; + $scope.task = event.tasks[step-1]; $scope.$apply(); - if (!event.task || event.task.number !== step) { - return; // ignore - } - - if (event.task.state === 'running' && $scope.task.state !== 'running') { + // start streaming the current build + if ($scope.task.state === 'running') { stream(); + } else { + // resets our streaming state + streaming = false; } - - // update the task status - $scope.task.state = event.task.state; - $scope.task.started_at = event.task.started_at; - $scope.task.finished_at = event.task.finished_at; - $scope.task.duration = event.task.duration; - $scope.task.exit_code = event.task.exit_code; - $scope.$apply(); }); diff --git a/server/static/scripts/controllers/repos.js b/server/static/scripts/controllers/repos.js index 0443ceb58..037e94eaf 100644 --- a/server/static/scripts/controllers/repos.js +++ b/server/static/scripts/controllers/repos.js @@ -19,18 +19,18 @@ $scope.error = err; }); - feed.subscribe(function(event) { - if (!$scope.repos) { - return; - } - for (var i=0;i<$scope.repos.length;i++) { - if ($scope.repos[i].full_name === event.repo.full_name) { - $scope.repos[i]=event.repo; - $scope.$apply(); - break; - } - }; - }); + // feed.subscribe(function(event) { + // if (!$scope.repos) { + // return; + // } + // for (var i=0;i<$scope.repos.length;i++) { + // if ($scope.repos[i].full_name === event.repo.full_name) { + // $scope.repos[i]=event.repo; + // $scope.$apply(); + // break; + // } + // }; + // }); } /** diff --git a/server/static/scripts/services/repos.js b/server/static/scripts/services/repos.js index 4927ef676..0c9ce7135 100644 --- a/server/static/scripts/services/repos.js +++ b/server/static/scripts/services/repos.js @@ -72,6 +72,41 @@ this.unwatch = function(repoName) { return $http.delete('/api/repos/'+repoName+'/unwatch'); }; + + + var callback, + websocket, + token = localStorage.getItem('access_token'); + + /** + * Subscribes to a live update feed for a repository + * + * @param {string} Name of the repository. + */ + this.subscribe = function(repo, _callback) { + callback = _callback; + + var proto = ($window.location.protocol === 'https:' ? 'wss' : 'ws'), + route = [proto, "://", $window.location.host, '/api/stream/logs/'+ repo +'?access_token=', token].join(''); + + websocket = new WebSocket(route); + websocket.onmessage = function (event) { + if (callback !== undefined) { + callback(angular.fromJson(event.data)); + } + }; + websocket.onclose = function (event) { + console.log('user websocket closed'); + }; + }; + + this.unsubscribe = function() { + callback = undefined; + if (websocket !== undefined) { + websocket.close(); + websocket = undefined; + } + }; } angular diff --git a/server/static/scripts/views/agents.html b/server/static/scripts/views/agents.html index 7bc572e88..144813c88 100644 --- a/server/static/scripts/views/agents.html +++ b/server/static/scripts/views/agents.html @@ -8,5 +8,5 @@
-docker run drone/drone-agent --addr={{ addr }} --token={{ token }}
+docker run -d drone/drone-agent --addr={{ addr }} --token={{ token }}
 
diff --git a/server/ws.go b/server/ws.go index ccbbe2cba..6c5009e89 100644 --- a/server/ws.go +++ b/server/ws.go @@ -7,7 +7,6 @@ import ( "strconv" "time" - "github.com/drone/drone/common" "github.com/drone/drone/eventbus" log "github.com/Sirupsen/logrus" @@ -36,19 +35,77 @@ var upgrader = websocket.Upgrader{ // GetEvents will upgrade the connection to a Websocket and will stream // event updates to the browser. func GetEvents(c *gin.Context) { - bus := ToBus(c) - user := ToUser(c) - remote := ToRemote(c) - - // TODO (bradrydzewski) revisit this approach at some point. + // bus := ToBus(c) + // user := ToUser(c) + // remote := ToRemote(c) // - // instead of constantly checking for remote permissions, we will - // cache them for the lifecycle of this websocket. The pro here is - // that we are making way less external calls (good). The con is that - // if a ton of developers conntect to websockets for long periods of - // time with heavy build traffic (not super likely, but possible) this - // caching strategy could take up a lot of memory. - perms_ := map[string]*common.Perm{} + // // TODO (bradrydzewski) revisit this approach at some point. + // // + // // instead of constantly checking for remote permissions, we will + // // cache them for the lifecycle of this websocket. The pro here is + // // that we are making way less external calls (good). The con is that + // // if a ton of developers conntect to websockets for long periods of + // // time with heavy build traffic (not super likely, but possible) this + // // caching strategy could take up a lot of memory. + // perms_ := map[string]*common.Perm{} + // + // // upgrade the websocket + // ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) + // if err != nil { + // c.Fail(400, err) + // return + // } + // + // ticker := time.NewTicker(pingPeriod) + // eventc := make(chan *eventbus.Event, 1) + // bus.Subscribe(eventc) + // defer func() { + // bus.Unsubscribe(eventc) + // ticker.Stop() + // ws.Close() + // close(eventc) + // }() + // + // go func() { + // for { + // select { + // case event := <-eventc: + // if event == nil { + // return // why would this ever happen? + // } + // perm, ok := perms_[event.Repo.FullName] + // if !ok { + // perm = perms(remote, user, event.Repo) + // perms_[event.Repo.FullName] = perm + // } + // + // if perm != nil && perm.Pull { + // err := ws.WriteJSON(event) + // if err != nil { + // log.Errorln(err, event) + // } + // } + // case <-ticker.C: + // ws.SetWriteDeadline(time.Now().Add(writeWait)) + // err := ws.WriteMessage(websocket.PingMessage, []byte{}) + // if err != nil { + // ws.Close() + // log.Debugf("closed websocket") + // return + // } + // } + // } + // }() + // + // readWebsocket(ws) + // log.Debugf("closed websocket") +} + +// GetRepoEvents will upgrade the connection to a Websocket and will stream +// event updates to the browser. +func GetRepoEvents(c *gin.Context) { + bus := ToBus(c) + repo := ToRepo(c) // upgrade the websocket ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) @@ -71,20 +128,8 @@ func GetEvents(c *gin.Context) { for { select { case event := <-eventc: - if event == nil { - return // why would this ever happen? - } - perm, ok := perms_[event.Repo.FullName] - if !ok { - perm = perms(remote, user, event.Repo) - perms_[event.Repo.FullName] = perm - } - - if perm != nil && perm.Pull { - err := ws.WriteJSON(event) - if err != nil { - log.Errorln(err, event) - } + if event.Kind == eventbus.EventRepo && event.Name == repo.FullName { + ws.WriteMessage(websocket.TextMessage, event.Msg) } case <-ticker.C: ws.SetWriteDeadline(time.Now().Add(writeWait))