pre-marshal websocket message to avoid possible race

This commit is contained in:
Brad Rydzewski 2015-05-05 19:46:26 -07:00
parent cecefe6f65
commit fa07d82461
10 changed files with 193 additions and 97 deletions

View file

@ -143,6 +143,7 @@ func main() {
{ {
stream.Use(server.SetRepo()) stream.Use(server.SetRepo())
stream.Use(server.SetPerm()) stream.Use(server.SetPerm())
stream.GET("/:owner/:name", server.GetRepoEvents)
stream.GET("/:owner/:name/:build/:number", server.GetStream) stream.GET("/:owner/:name/:build/:number", server.GetStream)
} }
} }

View file

@ -3,7 +3,6 @@ package builtin
import ( import (
"testing" "testing"
"github.com/drone/drone/common"
"github.com/drone/drone/eventbus" "github.com/drone/drone/eventbus"
. "github.com/franela/goblin" . "github.com/franela/goblin"
) )
@ -34,8 +33,8 @@ func TestBuild(t *testing.T) {
}) })
g.It("Should send", func() { g.It("Should send", func() {
e1 := &eventbus.Event{Repo: &common.Repo{Name: "foo"}} e1 := &eventbus.Event{Name: "foo"}
e2 := &eventbus.Event{Repo: &common.Repo{Name: "bar"}} e2 := &eventbus.Event{Name: "bar"}
c := make(chan *eventbus.Event) c := make(chan *eventbus.Event)
b := New() b := New()
b.Subscribe(c) b.Subscribe(c)

View file

@ -1,11 +1,15 @@
package eventbus package eventbus
import "github.com/drone/drone/common" const (
EventRepo = "repo"
EventUser = "user"
EventAgent = "agent"
)
type Event struct { type Event struct {
Build *common.Build `json:"build,omitempty"` Kind string
Repo *common.Repo `json:"repo,omitempty"` Name string
Task *common.Task `json:"task,omitempty"` Msg []byte
} }
type Bus interface { type Bus interface {

View file

@ -1,6 +1,7 @@
package builtin package builtin
import ( import (
"encoding/json"
"io" "io"
"io/ioutil" "io/ioutil"
@ -31,9 +32,16 @@ func (u *updater) SetBuild(r *common.Repo, b *common.Build) error {
if err != nil { if err != nil {
return err return err
} }
msg, err := json.Marshal(b)
if err != nil {
return err
}
u.bus.Send(&eventbus.Event{ u.bus.Send(&eventbus.Event{
Repo: r, Name: r.FullName,
Build: b, Kind: eventbus.EventRepo,
Msg: msg,
}) })
return nil return nil
} }
@ -43,10 +51,16 @@ func (u *updater) SetTask(r *common.Repo, b *common.Build, t *common.Task) error
if err != nil { if err != nil {
return err return err
} }
msg, err := json.Marshal(b)
if err != nil {
return err
}
u.bus.Send(&eventbus.Event{ u.bus.Send(&eventbus.Event{
Repo: r, Name: r.FullName,
Build: b, Kind: eventbus.EventRepo,
Task: t, Msg: msg,
}) })
return nil return nil
} }

View file

@ -1,6 +1,7 @@
package server package server
import ( import (
"encoding/json"
"io" "io"
"io/ioutil" "io/ioutil"
"net" "net"
@ -108,9 +109,16 @@ func PushBuild(c *gin.Context) {
} }
// END FIXME --> // END FIXME -->
msg, err := json.Marshal(build)
if err == nil {
c.String(200, err.Error()) // we can ignore this error
return
}
bus.Send(&eventbus.Event{ bus.Send(&eventbus.Event{
Build: build, Name: repo.FullName,
Repo: repo, Kind: eventbus.EventRepo,
Msg: msg,
}) })
c.Writer.WriteHeader(200) c.Writer.WriteHeader(200)
@ -136,11 +144,19 @@ func PushTask(c *gin.Context) {
c.Fail(404, err) c.Fail(404, err)
return return
} }
msg, err := json.Marshal(build)
if err == nil {
c.String(200, err.Error()) // we can ignore this error
return
}
bus.Send(&eventbus.Event{ bus.Send(&eventbus.Event{
Build: build, Name: repo.FullName,
Repo: repo, Kind: eventbus.EventRepo,
Task: in, Msg: msg,
}) })
c.Writer.WriteHeader(200) c.Writer.WriteHeader(200)
} }

View file

@ -4,7 +4,7 @@
* BuildsCtrl responsible for rendering the repo's * BuildsCtrl responsible for rendering the repo's
* recent build history. * recent build history.
*/ */
function BuildsCtrl($scope, $routeParams, builds, repos, users, feed, logs) { function BuildsCtrl($scope, $routeParams, builds, repos, users, logs) {
var owner = $routeParams.owner; var owner = $routeParams.owner;
var name = $routeParams.name; var name = $routeParams.name;
@ -41,32 +41,21 @@
}); });
} }
feed.subscribe(function(event) { repo.subscribe(fullName, function(event) {
if (event.repo.full_name !== fullName) {
return; // ignore
}
// update repository
$scope.repo = event.repo;
$scope.$apply();
var added = false; var added = false;
for (var i=0;i<$scope.builds.length;i++) { for (var i=0;i<$scope.builds.length;i++) {
var build = $scope.builds[i]; var build = $scope.builds[i];
if (event.build.number !== build.number) { if (event.number !== build.number) {
continue; // ignore continue; // ignore
} }
// update the build status // update the build status
build.state = event.build.state; $scope.builds[i] = event;
build.started_at = event.build.started_at;
build.finished_at = event.build.finished_at;
build.duration = event.build.duration;
$scope.builds[i] = build;
$scope.$apply(); $scope.$apply();
added = true; added = true;
} }
if (!added) { if (!added) {
$scope.builds.push(event.build); $scope.builds.push(event);
$scope.$apply(); $scope.$apply();
} }
}); });
@ -75,17 +64,23 @@
/** /**
* BuildCtrl responsible for rendering a build. * BuildCtrl responsible for rendering a build.
*/ */
function BuildCtrl($scope, $routeParams, $window, logs, builds, repos, users, feed) { function BuildCtrl($scope, $routeParams, $window, logs, builds, repos, users) {
var step = parseInt($routeParams.step) || 1; var step = parseInt($routeParams.step) || 1;
var number = $routeParams.number; var number = $routeParams.number;
var owner = $routeParams.owner; var owner = $routeParams.owner;
var name = $routeParams.name; var name = $routeParams.name;
var fullName = owner+'/'+name; var fullName = owner+'/'+name;
var streaming = false;
var tail = false; var tail = false;
// Initiates streaming a build. // Initiates streaming a build.
var stream = function() { var stream = function() {
if (streaming) {
return;
}
streaming = true;
var convert = new Filter({stream:true,newline:false}); var convert = new Filter({stream:true,newline:false});
var term = document.getElementById("term"); var term = document.getElementById("term");
term.innerHTML = ""; term.innerHTML = "";
@ -159,35 +154,22 @@
tail = !tail; tail = !tail;
}; };
feed.subscribe(function(event) { repos.subscribe(fullName, function(event) {
if (event.repo.full_name !== fullName) { if (event.number !== parseInt(number)) {
return; // ignore return; // ignore
} }
if (event.build.number !== parseInt(number)) { // update the build
return; // ignore $scope.build = event;
} $scope.task = event.tasks[step-1];
// update the build status
$scope.build.state = event.build.state;
$scope.build.started_at = event.build.started_at;
$scope.build.finished_at = event.build.finished_at;
$scope.build.duration = event.build.duration;
$scope.$apply(); $scope.$apply();
if (!event.task || event.task.number !== step) { // start streaming the current build
return; // ignore if ($scope.task.state === 'running') {
}
if (event.task.state === 'running' && $scope.task.state !== 'running') {
stream(); stream();
} else {
// resets our streaming state
streaming = false;
} }
// update the task status
$scope.task.state = event.task.state;
$scope.task.started_at = event.task.started_at;
$scope.task.finished_at = event.task.finished_at;
$scope.task.duration = event.task.duration;
$scope.task.exit_code = event.task.exit_code;
$scope.$apply();
}); });

View file

@ -19,18 +19,18 @@
$scope.error = err; $scope.error = err;
}); });
feed.subscribe(function(event) { // feed.subscribe(function(event) {
if (!$scope.repos) { // if (!$scope.repos) {
return; // return;
} // }
for (var i=0;i<$scope.repos.length;i++) { // for (var i=0;i<$scope.repos.length;i++) {
if ($scope.repos[i].full_name === event.repo.full_name) { // if ($scope.repos[i].full_name === event.repo.full_name) {
$scope.repos[i]=event.repo; // $scope.repos[i]=event.repo;
$scope.$apply(); // $scope.$apply();
break; // break;
} // }
}; // };
}); // });
} }
/** /**

View file

@ -72,6 +72,41 @@
this.unwatch = function(repoName) { this.unwatch = function(repoName) {
return $http.delete('/api/repos/'+repoName+'/unwatch'); return $http.delete('/api/repos/'+repoName+'/unwatch');
}; };
var callback,
websocket,
token = localStorage.getItem('access_token');
/**
* Subscribes to a live update feed for a repository
*
* @param {string} Name of the repository.
*/
this.subscribe = function(repo, _callback) {
callback = _callback;
var proto = ($window.location.protocol === 'https:' ? 'wss' : 'ws'),
route = [proto, "://", $window.location.host, '/api/stream/logs/'+ repo +'?access_token=', token].join('');
websocket = new WebSocket(route);
websocket.onmessage = function (event) {
if (callback !== undefined) {
callback(angular.fromJson(event.data));
}
};
websocket.onclose = function (event) {
console.log('user websocket closed');
};
};
this.unsubscribe = function() {
callback = undefined;
if (websocket !== undefined) {
websocket.close();
websocket = undefined;
}
};
} }
angular angular

View file

@ -8,5 +8,5 @@
</dl> </dl>
<pre> <pre>
docker run drone/drone-agent --addr={{ addr }} --token={{ token }} docker run -d drone/drone-agent --addr={{ addr }} --token={{ token }}
</pre> </pre>

View file

@ -7,7 +7,6 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/drone/drone/common"
"github.com/drone/drone/eventbus" "github.com/drone/drone/eventbus"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
@ -36,19 +35,77 @@ var upgrader = websocket.Upgrader{
// GetEvents will upgrade the connection to a Websocket and will stream // GetEvents will upgrade the connection to a Websocket and will stream
// event updates to the browser. // event updates to the browser.
func GetEvents(c *gin.Context) { func GetEvents(c *gin.Context) {
bus := ToBus(c) // bus := ToBus(c)
user := ToUser(c) // user := ToUser(c)
remote := ToRemote(c) // remote := ToRemote(c)
// TODO (bradrydzewski) revisit this approach at some point.
// //
// instead of constantly checking for remote permissions, we will // // TODO (bradrydzewski) revisit this approach at some point.
// cache them for the lifecycle of this websocket. The pro here is // //
// that we are making way less external calls (good). The con is that // // instead of constantly checking for remote permissions, we will
// if a ton of developers conntect to websockets for long periods of // // cache them for the lifecycle of this websocket. The pro here is
// time with heavy build traffic (not super likely, but possible) this // // that we are making way less external calls (good). The con is that
// caching strategy could take up a lot of memory. // // if a ton of developers conntect to websockets for long periods of
perms_ := map[string]*common.Perm{} // // time with heavy build traffic (not super likely, but possible) this
// // caching strategy could take up a lot of memory.
// perms_ := map[string]*common.Perm{}
//
// // upgrade the websocket
// ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
// if err != nil {
// c.Fail(400, err)
// return
// }
//
// ticker := time.NewTicker(pingPeriod)
// eventc := make(chan *eventbus.Event, 1)
// bus.Subscribe(eventc)
// defer func() {
// bus.Unsubscribe(eventc)
// ticker.Stop()
// ws.Close()
// close(eventc)
// }()
//
// go func() {
// for {
// select {
// case event := <-eventc:
// if event == nil {
// return // why would this ever happen?
// }
// perm, ok := perms_[event.Repo.FullName]
// if !ok {
// perm = perms(remote, user, event.Repo)
// perms_[event.Repo.FullName] = perm
// }
//
// if perm != nil && perm.Pull {
// err := ws.WriteJSON(event)
// if err != nil {
// log.Errorln(err, event)
// }
// }
// case <-ticker.C:
// ws.SetWriteDeadline(time.Now().Add(writeWait))
// err := ws.WriteMessage(websocket.PingMessage, []byte{})
// if err != nil {
// ws.Close()
// log.Debugf("closed websocket")
// return
// }
// }
// }
// }()
//
// readWebsocket(ws)
// log.Debugf("closed websocket")
}
// GetRepoEvents will upgrade the connection to a Websocket and will stream
// event updates to the browser.
func GetRepoEvents(c *gin.Context) {
bus := ToBus(c)
repo := ToRepo(c)
// upgrade the websocket // upgrade the websocket
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
@ -71,20 +128,8 @@ func GetEvents(c *gin.Context) {
for { for {
select { select {
case event := <-eventc: case event := <-eventc:
if event == nil { if event.Kind == eventbus.EventRepo && event.Name == repo.FullName {
return // why would this ever happen? ws.WriteMessage(websocket.TextMessage, event.Msg)
}
perm, ok := perms_[event.Repo.FullName]
if !ok {
perm = perms(remote, user, event.Repo)
perms_[event.Repo.FullName] = perm
}
if perm != nil && perm.Pull {
err := ws.WriteJSON(event)
if err != nil {
log.Errorln(err, event)
}
} }
case <-ticker.C: case <-ticker.C:
ws.SetWriteDeadline(time.Now().Add(writeWait)) ws.SetWriteDeadline(time.Now().Add(writeWait))