websocket output working

This commit is contained in:
Brad Rydzewski 2015-04-29 19:57:43 -07:00
parent 7a75a17535
commit ad80facbbd
13 changed files with 150 additions and 36 deletions

View file

@ -3,7 +3,7 @@ package common
// Agent represents a worker that has connected // Agent represents a worker that has connected
// to the system in order to perform work // to the system in order to perform work
type Agent struct { type Agent struct {
Name string Name string `json:"name"`
Addr string Addr string `json:"addr"`
IsHealthy bool IsHealthy bool `json:"is_healthy"`
} }

View file

@ -59,6 +59,8 @@ func GetHost(r *http.Request) string {
return r.Header.Get("X-Host") return r.Header.Get("X-Host")
case len(r.Header.Get("XFF")) != 0: case len(r.Header.Get("XFF")) != 0:
return r.Header.Get("XFF") return r.Header.Get("XFF")
case len(r.Header.Get("X-Real-IP")) != 0:
return r.Header.Get("X-Real-IP")
default: default:
return "localhost:8080" return "localhost:8080"
} }

View file

@ -23,7 +23,7 @@ var (
bucketRepoParams = []byte("repo_params") bucketRepoParams = []byte("repo_params")
bucketRepoUsers = []byte("repo_users") bucketRepoUsers = []byte("repo_users")
bucketBuild = []byte("build") bucketBuild = []byte("build")
bucketBuildAgent = []byte("build_agent") bucketBuildAgent = []byte("build_agents")
bucketBuildStatus = []byte("build_status") bucketBuildStatus = []byte("build_status")
bucketBuildLogs = []byte("build_logs") bucketBuildLogs = []byte("build_logs")
bucketBuildSeq = []byte("build_seq") bucketBuildSeq = []byte("build_seq")

View file

@ -223,7 +223,7 @@ func (db *DB) SetBuildAgent(repo string, build int, agent *common.Agent) error {
}) })
} }
func (db *DB) DelBuildAgent(repo string, build int, agent *common.Agent) error { func (db *DB) DelBuildAgent(repo string, build int) error {
key := []byte(repo + "/" + strconv.Itoa(build)) key := []byte(repo + "/" + strconv.Itoa(build))
return db.Update(func(t *bolt.Tx) error { return db.Update(func(t *bolt.Tx) error {
return delete(t, bucketBuildAgent, key) return delete(t, bucketBuildAgent, key)

View file

@ -103,7 +103,7 @@ type Datastore interface {
// BuildAgent returns the agent that is being // BuildAgent returns the agent that is being
// used to execute the build. // used to execute the build.
// BuildAgent(string, int) (*common.Agent, error) BuildAgent(string, int) (*common.Agent, error)
// SetBuild inserts or updates a build for the named // SetBuild inserts or updates a build for the named
// repository. The build number is incremented and // repository. The build number is incremented and
@ -126,11 +126,11 @@ type Datastore interface {
// SetBuildAgent insert or updates the agent that is // SetBuildAgent insert or updates the agent that is
// running a build. // running a build.
// SetBuildAgent(string, int, *common.Agent) error SetBuildAgent(string, int, *common.Agent) error
// DelBuildAgent purges the referce to the agent // DelBuildAgent purges the referce to the agent
// that ran a build. // that ran a build.
// DelBuildAgent(string, int, *common.Agent) error DelBuildAgent(string, int) error
// LogReader gets the task logs at index N for // LogReader gets the task logs at index N for
// the named repository and build number. // the named repository and build number.

View file

@ -124,7 +124,13 @@ func main() {
events := api.Group("/stream") events := api.Group("/stream")
{ {
events.GET("/user", server.GetEvents) events.GET("/user", server.GetEvents)
//events.GET("/logs/:owner/:name/:build/:number")
stream := events.Group("/logs")
{
stream.Use(server.SetRepo())
stream.Use(server.SetPerm())
stream.GET("/:owner/:name/:build/:number", server.GetStream)
}
} }
auth := r.Group("/authorize") auth := r.Group("/authorize")

View file

@ -133,7 +133,7 @@ func RunBuild(c *gin.Context) {
// must not restart a running build // must not restart a running build
if build.State == common.StatePending || build.State == common.StateRunning { if build.State == common.StatePending || build.State == common.StateRunning {
c.Fail(409, err) c.AbortWithStatus(409)
return return
} }

View file

@ -3,8 +3,10 @@ package server
import ( import (
"io" "io"
"io/ioutil" "io/ioutil"
"net"
"strconv" "strconv"
log "github.com/Sirupsen/logrus"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding" "github.com/gin-gonic/gin/binding"
@ -19,12 +21,35 @@ import (
// GET /queue/pull // GET /queue/pull
func PollBuild(c *gin.Context) { func PollBuild(c *gin.Context) {
queue := ToQueue(c) queue := ToQueue(c)
store := ToDatastore(c)
agent := &common.Agent{
Addr: c.Request.RemoteAddr,
}
// extact the host port and name and
// replace with the default agent port (1999)
host, _, err := net.SplitHostPort(agent.Addr)
if err == nil {
agent.Addr = host
}
agent.Addr = net.JoinHostPort(agent.Addr, "1999")
log.Infof("agent connected and polling builds at %s", agent.Addr)
work := queue.PullClose(c.Writer) work := queue.PullClose(c.Writer)
if work == nil { if work == nil {
c.AbortWithStatus(500) c.AbortWithStatus(500)
} else { return
c.JSON(200, work)
} }
// TODO (bradrydzewski) decide how we want to handle a failure here
// still not sure exact behavior we want ...
err = store.SetBuildAgent(work.Repo.FullName, work.Build.Number, agent)
if err != nil {
log.Errorf("error persisting build agent. %s", err)
}
c.JSON(200, work)
} }
// GET /queue/push/:owner/:repo // GET /queue/push/:owner/:repo
@ -42,6 +67,10 @@ func PushBuild(c *gin.Context) {
return return
} }
if in.State != common.StatePending && in.State != common.StateRunning {
store.DelBuildAgent(repo.FullName, build.Number)
}
build.Duration = in.Duration build.Duration = in.Duration
build.Started = in.Started build.Started = in.Started
build.Finished = in.Finished build.Finished = in.Finished

View file

@ -4,7 +4,7 @@
* BuildsCtrl responsible for rendering the repo's * BuildsCtrl responsible for rendering the repo's
* recent build history. * recent build history.
*/ */
function BuildsCtrl($scope, $routeParams, builds, repos, users, feed) { function BuildsCtrl($scope, $routeParams, builds, repos, users, feed, logs) {
var owner = $routeParams.owner; var owner = $routeParams.owner;
var name = $routeParams.name; var name = $routeParams.name;
@ -83,6 +83,17 @@
var name = $routeParams.name; var name = $routeParams.name;
var fullName = owner+'/'+name; var fullName = owner+'/'+name;
// Initiates streaming a build.
var stream = function() {
var convert = new Filter({stream:true,newline:false});
var term = document.getElementById("term");
term.innerHTML = "";
logs.subscribe(fullName, number, step, function(data){
term.innerHTML += convert.toHtml(data)+"\n";
});
}
// Gets the currently authenticated user // Gets the currently authenticated user
users.getCached().then(function(payload){ users.getCached().then(function(payload){
$scope.user = payload.data; $scope.user = payload.data;
@ -104,6 +115,7 @@
// do nothing // do nothing
} else if ($scope.task.state === 'running') { } else if ($scope.task.state === 'running') {
// stream the build // stream the build
stream();
} else { } else {
// fetch the logs for the finished build. // fetch the logs for the finished build.
@ -154,6 +166,11 @@
if (!event.task || event.task.number !== step) { if (!event.task || event.task.number !== step) {
return; // ignore return; // ignore
} }
if (event.task.state === 'running' && $scope.task.state !== 'running') {
stream();
}
// update the task status // update the task status
$scope.task.state = event.task.state; $scope.task.state = event.task.state;
$scope.task.started_at = event.task.started_at; $scope.task.started_at = event.task.started_at;
@ -163,14 +180,7 @@
$scope.$apply(); $scope.$apply();
}); });
// var convert = new Filter({stream:true,newline:false});
// var term = document.getElementById("term")
// var stdout = document.getElementById("stdout").innerText.split("\n")
// stdout.forEach(function(line, i) {
// setTimeout(function () {
// term.innerHTML += convert.toHtml(line+"\n");
// }, i*i);
// });
} }
angular angular

View file

@ -119,9 +119,10 @@
} }
function RouteChange($rootScope, feed) { function RouteChange($rootScope, feed, logs) {
$rootScope.$on('$routeChangeStart', function (event, next) { $rootScope.$on('$routeChangeStart', function (event, next) {
feed.unsubscribe(); feed.unsubscribe();
logs.unsubscribe();
}); });
$rootScope.$on('$routeChangeSuccess', function (event, current, previous) { $rootScope.$on('$routeChangeSuccess', function (event, current, previous) {

View file

@ -3,25 +3,34 @@
(function () { (function () {
function FeedService($http, $window) { function FeedService($http, $window) {
var token = localStorage.getItem('access_token');
var proto = ($window.location.protocol == 'https:' ? 'wss' : 'ws');
var route = [proto, "://", $window.location.host, '/api/stream/user?access_token=', token].join('');
var wsCallback = undefined; var callback,
var ws = new WebSocket(route); websocket,
ws.onmessage = function(event) { token = localStorage.getItem('access_token');
var data = angular.fromJson(event.data);
if (wsCallback != undefined) { this.subscribe = function(_callback) {
wsCallback(data); callback = _callback;
var proto = ($window.location.protocol === 'https:' ? 'wss' : 'ws'),
route = [proto, "://", $window.location.host, '/api/stream/user?access_token=', token].join('');
websocket = new WebSocket(route);
websocket.onmessage = function (event) {
if (callback !== undefined) {
callback(angular.fromJson(event.data));
} }
}; };
websocket.onclose = function (event) {
this.subscribe = function(callback) { console.log('user websocket closed');
wsCallback = callback; };
}; };
this.unsubscribe = function() { this.unsubscribe = function() {
wsCallback = undefined; callback = undefined;
if (websocket !== undefined) {
websocket.close();
websocket = undefined;
}
}; };
} }

View file

@ -18,6 +18,35 @@
this.get = function(repoName, number, step) { this.get = function(repoName, number, step) {
return $http.get('/api/repos/'+repoName+'/logs/'+number+'/'+step); return $http.get('/api/repos/'+repoName+'/logs/'+number+'/'+step);
}; };
var callback,
websocket,
token = localStorage.getItem('access_token');
this.subscribe = function (repoName, number, step, _callback) {
callback = _callback;
var proto = ($window.location.protocol === 'https:' ? 'wss' : 'ws'),
route = [proto, "://", $window.location.host, '/api/stream/logs/', repoName, '/', number, '/', step, '?access_token=', token].join('');
websocket = new WebSocket(route);
websocket.onmessage = function (event) {
if (callback !== undefined) {
callback(event.data);
}
};
websocket.onclose = function (event) {
console.log('logs websocket closed');
};
};
this.unsubscribe = function () {
callback = undefined;
if (websocket !== undefined) {
websocket.close();
websocket = undefined;
}
};
} }
angular angular

View file

@ -1,6 +1,9 @@
package server package server
import ( import (
"fmt"
"net/url"
"strconv"
"time" "time"
"github.com/drone/drone/common" "github.com/drone/drone/common"
@ -9,6 +12,7 @@ import (
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/koding/websocketproxy"
) )
const ( const (
@ -96,6 +100,30 @@ func GetEvents(c *gin.Context) {
log.Debugf("closed websocket") log.Debugf("closed websocket")
} }
func GetStream(c *gin.Context) {
store := ToDatastore(c)
repo := ToRepo(c)
build, _ := strconv.Atoi(c.Params.ByName("build"))
task, _ := strconv.Atoi(c.Params.ByName("number"))
agent, err := store.BuildAgent(repo.FullName, build)
if err != nil {
c.Fail(404, err)
return
}
url_, err := url.Parse("ws://" + agent.Addr)
if err != nil {
c.Fail(500, err)
return
}
url_.Path = fmt.Sprintf("/stream/%s/%v/%v", repo.FullName, build, task)
proxy := websocketproxy.NewProxy(url_)
proxy.ServeHTTP(c.Writer, c.Request)
log.Debugf("closed websocket")
}
// readWebsocket will block while reading the websocket data // readWebsocket will block while reading the websocket data
func readWebsocket(ws *websocket.Conn) { func readWebsocket(ws *websocket.Conn) {
defer ws.Close() defer ws.Close()