diff --git a/cluster/cluster.go b/cluster/cluster.go deleted file mode 100644 index ff2d740e6..000000000 --- a/cluster/cluster.go +++ /dev/null @@ -1,89 +0,0 @@ -package cluster - -import ( - "sync" - - "github.com/samalba/dockerclient" -) - -// TODO (bradrydzewski) ability to cancel work. -// TODO (bradrydzewski) ability to remove a worker. - -type Cluster struct { - sync.Mutex - clients map[dockerclient.Client]bool - clientc chan dockerclient.Client -} - -func New() *Cluster { - return &Cluster{ - clients: make(map[dockerclient.Client]bool), - clientc: make(chan dockerclient.Client, 999), - } -} - -// Allocate allocates a client to the pool to -// be available to accept work. -func (c *Cluster) Allocate(cli dockerclient.Client) bool { - if c.IsAllocated(cli) { - return false - } - - c.Lock() - c.clients[cli] = true - c.Unlock() - - c.clientc <- cli - return true -} - -// IsAllocated is a helper function that returns -// true if the client is currently allocated to -// the Pool. -func (c *Cluster) IsAllocated(cli dockerclient.Client) bool { - c.Lock() - defer c.Unlock() - _, ok := c.clients[cli] - return ok -} - -// Deallocate removes the worker from the pool of -// available clients. If the client is currently -// reserved and performing work it will finish, -// but no longer be given new work. -func (c *Cluster) Deallocate(cli dockerclient.Client) { - c.Lock() - defer c.Unlock() - delete(c.clients, cli) -} - -// List returns a list of all Workers currently -// allocated to the Pool. -func (c *Cluster) List() []dockerclient.Client { - c.Lock() - defer c.Unlock() - - var clients []dockerclient.Client - for cli := range c.clients { - clients = append(clients, cli) - } - return clients -} - -// Reserve reserves the next available worker to -// start doing work. Once work is complete, the -// worker should be released back to the pool. -func (p *Cluster) Reserve() <-chan dockerclient.Client { - return p.clientc -} - -// Release releases the worker back to the pool -// of available workers. -func (c *Cluster) Release(cli dockerclient.Client) bool { - if !c.IsAllocated(cli) { - return false - } - - c.clientc <- cli - return true -} diff --git a/drone.go b/drone.go index 782adb6cc..7083c5ea9 100644 --- a/drone.go +++ b/drone.go @@ -114,6 +114,12 @@ func main() { queue.POST("/push/:owner/:name/:build/:task/logs", server.PushLogs) } + events := api.Group("/stream") + { + events.GET("/user", server.GetEvents) + //events.GET("/logs/:owner/:name/:build/:number") + } + auth := r.Group("/authorize") { auth.Use(server.SetHeaders()) diff --git a/eventbus/bus.go b/eventbus/bus.go index 83dd48bb6..436a15111 100644 --- a/eventbus/bus.go +++ b/eventbus/bus.go @@ -3,9 +3,9 @@ package eventbus import "github.com/drone/drone/common" type Event struct { - Build *common.Build - Repo *common.Repo - Task *common.Task + Build *common.Build `json:"build,omitempty"` + Repo *common.Repo `json:"repo,omitempty"` + Task *common.Task `json:"task,omitempty"` } type Bus interface { diff --git a/server/queue.go b/server/queue.go index c8315e397..e23479a22 100644 --- a/server/queue.go +++ b/server/queue.go @@ -37,6 +37,7 @@ func PushBuild(c *gin.Context) { c.Fail(404, err) return } + build.Duration = in.Duration build.Started = in.Started build.Finished = in.Finished @@ -47,16 +48,33 @@ func PushBuild(c *gin.Context) { return } + if repo.Last == nil || build.Number >= repo.Last.Number { + repo.Last = build + store.SetRepo(repo) + } + + // <-- FIXME + // for some reason the Repo and Build fail to marshal to JSON. + // It has something to do with memory / pointers. So it goes away + // if I just refetch these items. Needs to be fixed in the future, + // but for now should be ok + repo, err = store.Repo(repo.FullName) + if err != nil { + c.Fail(500, err) + return + } + build, err = store.Build(repo.FullName, in.Number) + if err != nil { + c.Fail(404, err) + return + } + // END FIXME --> + bus.Send(&eventbus.Event{ Build: build, Repo: repo, }) - if repo.Last != nil && repo.Last.Number > build.Number { - c.Writer.WriteHeader(200) - return - } - repo.Last = build - store.SetRepo(repo) + c.Writer.WriteHeader(200) } @@ -83,6 +101,7 @@ func PushTask(c *gin.Context) { bus.Send(&eventbus.Event{ Build: build, Repo: repo, + Task: in, }) c.Writer.WriteHeader(200) } diff --git a/server/static/index.html b/server/static/index.html index 87a8fa4dc..4da4f2f68 100644 --- a/server/static/index.html +++ b/server/static/index.html @@ -31,6 +31,7 @@ + diff --git a/server/static/scripts/controllers/builds.js b/server/static/scripts/controllers/builds.js index 2c75c92aa..c504c7340 100644 --- a/server/static/scripts/controllers/builds.js +++ b/server/static/scripts/controllers/builds.js @@ -4,7 +4,7 @@ * BuildsCtrl responsible for rendering the repo's * recent build history. */ - function BuildsCtrl($scope, $routeParams, builds, repos, users) { + function BuildsCtrl($scope, $routeParams, builds, repos, users, feed) { var owner = $routeParams.owner; var name = $routeParams.name; @@ -40,12 +40,31 @@ delete $scope.repo.subscription; }); } + + feed.subscribe(function(event) { + if (event.repo.full_name !== fullName) { + return; // ignore + } + // update repository + $scope.repo = event.repo; + $scope.apply(); + + if (event.build.number !== parseInt(number)) { + return; // ignore + } + // update the build status + $scope.build.state = event.build.state; + $scope.build.started = event.build.started; + $scope.build.finished = event.build.finished; + $scope.build.duration = event.build.duration; + $scope.$apply(); + }); } /** * BuildCtrl responsible for rendering a build. */ - function BuildCtrl($scope, $routeParams, logs, builds, repos, users) { + function BuildCtrl($scope, $routeParams, logs, builds, repos, users, feed) { var step = parseInt($routeParams.step) || 1; var number = $routeParams.number; @@ -99,6 +118,32 @@ }); }; + feed.subscribe(function(event) { + if (event.repo.full_name !== fullName) { + return; // ignore + } + if (event.build.number !== parseInt(number)) { + return; // ignore + } + // update the build status + $scope.build.state = event.build.state; + $scope.build.started = event.build.started; + $scope.build.finished = event.build.finished; + $scope.build.duration = event.build.duration; + $scope.$apply(); + + if (!event.task || event.task.number !== step) { + return; // ignore + } + // update the task status + $scope.task.state = event.task.state; + $scope.task.started = event.task.started; + $scope.task.finished = event.task.finished; + $scope.task.duration = event.task.duration; + $scope.task.exit_code = event.task.exit_code; + $scope.$apply(); + }); + // var convert = new Filter({stream:true,newline:false}); // var term = document.getElementById("term") // var stdout = document.getElementById("stdout").innerText.split("\n") diff --git a/server/static/scripts/controllers/repos.js b/server/static/scripts/controllers/repos.js index cb86a7ae4..0443ceb58 100644 --- a/server/static/scripts/controllers/repos.js +++ b/server/static/scripts/controllers/repos.js @@ -3,14 +3,14 @@ /** * ReposCtrl responsible for rendering the user's * repository home screen. - */ - function ReposCtrl($scope, $routeParams, repos, users) { + */ + function ReposCtrl($scope, $routeParams, repos, users, feed) { - // Gets the currently authenticated user + // Gets the currently authenticated user users.getCached().then(function(payload){ $scope.user = payload.data; }); - + // Gets a list of repos to display in the // dropdown. repos.list().then(function(payload){ @@ -18,12 +18,25 @@ }).catch(function(err){ $scope.error = err; }); + + feed.subscribe(function(event) { + if (!$scope.repos) { + return; + } + for (var i=0;i<$scope.repos.length;i++) { + if ($scope.repos[i].full_name === event.repo.full_name) { + $scope.repos[i]=event.repo; + $scope.$apply(); + break; + } + }; + }); } /** * RepoAddCtrl responsible for activaing a new * repository. - */ + */ function RepoAddCtrl($scope, $location, repos, users) { $scope.add = function(slug) { repos.post(slug).then(function(payload) { @@ -36,17 +49,17 @@ /** * RepoEditCtrl responsible for editing a repository. - */ + */ function RepoEditCtrl($scope, $location, $routeParams, repos, users) { var owner = $routeParams.owner; var name = $routeParams.name; var fullName = owner+'/'+name; - // Gets the currently authenticated user + // Gets the currently authenticated user users.getCached().then(function(payload){ $scope.user = payload.data; }); - + // Gets a repository repos.get(fullName).then(function(payload){ $scope.repo = payload.data; @@ -89,4 +102,4 @@ .controller('ReposCtrl', ReposCtrl) .controller('RepoAddCtrl', RepoAddCtrl) .controller('RepoEditCtrl', RepoEditCtrl); -})(); \ No newline at end of file +})(); diff --git a/server/static/scripts/drone.js b/server/static/scripts/drone.js index 3f9db519e..e139d9ba6 100644 --- a/server/static/scripts/drone.js +++ b/server/static/scripts/drone.js @@ -12,7 +12,7 @@ /** * Bootstraps the application and retrieves the - * token from the + * token from the */ function Authorize() { // First, parse the query string @@ -117,24 +117,23 @@ }); } - // /** - // * - // */ - // function RouteChange($rootScope, stdout, projs) { - // $rootScope.$on('$routeChangeStart', function (event, next) { - // projs.unsubscribe(); - // stdout.unsubscribe(); - // }); - // //$rootScope.$on('$routeChangeSuccess', function (event, current, previous) { - // // document.title = current.$$route.title + ' · drone.io'; - // //}); - // } + function RouteChange($rootScope, feed) { + $rootScope.$on('$routeChangeStart', function (event, next) { + feed.unsubscribe(); + }); + + $rootScope.$on('$routeChangeSuccess', function (event, current, previous) { + if (current.$$route.title) { + document.title = current.$$route.title + ' · drone'; + } + }); + } angular .module('drone') .config(Authorize) - .config(Config); - // .run(RouteChange); + .config(Config) + .run(RouteChange); -})(); \ No newline at end of file +})(); diff --git a/server/static/scripts/services/feed.js b/server/static/scripts/services/feed.js new file mode 100644 index 000000000..33920f5b2 --- /dev/null +++ b/server/static/scripts/services/feed.js @@ -0,0 +1,31 @@ +'use strict'; + +(function () { + + function FeedService($http, $window) { + var token = localStorage.getItem('access_token'); + var proto = ($window.location.protocol == 'https:' ? 'wss' : 'ws'); + var route = [proto, "://", $window.location.host, '/api/stream/user?access_token=', token].join(''); + + var wsCallback = undefined; + var ws = new WebSocket(route); + ws.onmessage = function(event) { + var data = angular.fromJson(event.data); + if (wsCallback != undefined) { + wsCallback(data); + } + }; + + this.subscribe = function(callback) { + wsCallback = callback; + }; + + this.unsubscribe = function() { + wsCallback = undefined; + }; + } + + angular + .module('drone') + .service('feed', FeedService); +})(); diff --git a/server/ws.go b/server/ws.go index 670b5afcd..3634b8723 100644 --- a/server/ws.go +++ b/server/ws.go @@ -3,8 +3,10 @@ package server import ( "time" + "github.com/drone/drone/common" "github.com/drone/drone/eventbus" + log "github.com/Sirupsen/logrus" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" ) @@ -32,6 +34,16 @@ func GetEvents(c *gin.Context) { user := ToUser(c) remote := ToRemote(c) + // TODO (bradrydzewski) revisit this approach at some point. + // + // instead of constantly checking for remote permissions, we will + // cache them for the lifecycle of this websocket. The pro here is + // that we are making way less external calls (good). The con is that + // if a ton of developers conntect to websockets for long periods of + // time with heavy build traffic (not super likely, but possible) this + // caching strategy could take up a lot of memory. + perms_ := map[string]*common.Perm{} + // upgrade the websocket ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { @@ -56,15 +68,24 @@ func GetEvents(c *gin.Context) { if event == nil { return // why would this ever happen? } - perms := perms(remote, user, event.Repo) - if perms != nil && perms.Pull { - ws.WriteJSON(event) + perm, ok := perms_[event.Repo.FullName] + if !ok { + perm = perms(remote, user, event.Repo) + perms_[event.Repo.FullName] = perm + } + + if perm != nil && perm.Pull { + err := ws.WriteJSON(event) + if err != nil { + log.Errorln(err, event) + } } case <-ticker.C: ws.SetWriteDeadline(time.Now().Add(writeWait)) err := ws.WriteMessage(websocket.PingMessage, []byte{}) if err != nil { ws.Close() + log.Debugf("closed websocket") return } } @@ -72,6 +93,7 @@ func GetEvents(c *gin.Context) { }() readWebsocket(ws) + log.Debugf("closed websocket") } // readWebsocket will block while reading the websocket data