ability to stream logs using websockets

This commit is contained in:
Brad Rydzewski 2016-05-23 14:35:58 -07:00
parent 5c9a22a58b
commit 5456f13e9c
7 changed files with 209 additions and 25 deletions

View file

@ -42,6 +42,28 @@ func NewClientUpdater(client client.Client) UpdateFunc {
}
}
func NewStreamLogger(stream client.StreamWriter, w io.Writer, limit int64) LoggerFunc {
var err error
var size int64
return func(line *build.Line) {
if size > limit {
return
}
// TODO remove this double-serialization
linejson, _ := json.Marshal(line)
w.Write(linejson)
w.Write([]byte{'\n'})
if err = stream.WriteJSON(line); err != nil {
logrus.Errorf("Error streaming build logs. %s", err)
}
size += int64(len(line.Out))
}
}
func NewClientLogger(client client.Client, id int64, rc io.ReadCloser, wc io.WriteCloser, limit int64) LoggerFunc {
var once sync.Once
var size int64

View file

@ -102,6 +102,10 @@ type Client interface {
// Stream streams the build logs to the server.
Stream(int64, io.ReadCloser) error
LogStream(int64) (StreamWriter, error)
LogPost(int64, io.ReadCloser) error
// Wait waits for the job to the complete.
Wait(int64) *Wait

View file

@ -13,17 +13,20 @@ import (
"github.com/drone/drone/model"
"github.com/drone/drone/queue"
"github.com/gorilla/websocket"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"golang.org/x/oauth2"
)
const (
pathPull = "%s/api/queue/pull/%s/%s"
pathWait = "%s/api/queue/wait/%d"
pathStream = "%s/api/queue/stream/%d"
pathPush = "%s/api/queue/status/%d"
pathPing = "%s/api/queue/ping"
pathPull = "%s/api/queue/pull/%s/%s"
pathWait = "%s/api/queue/wait/%d"
pathStream = "%s/api/queue/stream/%d"
pathPush = "%s/api/queue/status/%d"
pathPing = "%s/api/queue/ping"
pathLogs = "%s/api/queue/logs/%d"
pathLogsAuth = "%s/api/queue/logs/%d?access_token=%s"
pathSelf = "%s/api/user"
pathFeed = "%s/api/user/feed"
@ -48,12 +51,13 @@ const (
type client struct {
client *http.Client
token string // auth token
base string // base url
}
// NewClient returns a client at the specified url.
func NewClient(uri string) Client {
return &client{http.DefaultClient, uri}
return &client{client: http.DefaultClient, base: uri}
}
// NewClientToken returns a client at the specified url that authenticates all
@ -61,7 +65,7 @@ func NewClient(uri string) Client {
func NewClientToken(uri, token string) Client {
config := new(oauth2.Config)
auther := config.Client(oauth2.NoContext, &oauth2.Token{AccessToken: token})
return &client{auther, uri}
return &client{client: auther, base: uri, token: token}
}
// NewClientTokenTLS returns a client at the specified url that authenticates
@ -74,7 +78,7 @@ func NewClientTokenTLS(uri, token string, c *tls.Config) Client {
trans.Base = &http.Transport{TLSClientConfig: c}
}
}
return &client{auther, uri}
return &client{client: auther, base: uri, token: token}
}
// Self returns the currently authenticated user.
@ -304,9 +308,42 @@ func (c *client) Ping() error {
func (c *client) Stream(id int64, rc io.ReadCloser) error {
uri := fmt.Sprintf(pathStream, c.base, id)
err := c.post(uri, rc, nil)
return err
}
// LogPost sends the full build logs to the server.
func (c *client) LogPost(id int64, rc io.ReadCloser) error {
uri := fmt.Sprintf(pathLogs, c.base, id)
return c.post(uri, rc, nil)
}
// StreamWriter implements a special writer for streaming log entries to the
// central Drone server. The standard implementation is the gorilla.Connection.
type StreamWriter interface {
Close() error
WriteJSON(interface{}) error
}
// LogStream streams the build logs to the server.
func (c *client) LogStream(id int64) (StreamWriter, error) {
rawurl := fmt.Sprintf(pathLogsAuth, c.base, id, c.token)
uri, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
if uri.Scheme == "https" {
uri.Scheme = "wss"
} else {
uri.Scheme = "ws"
}
// TODO need TLS client configuration
conn, _, err := websocket.DefaultDialer.Dial(uri.String(), nil)
return conn, err
}
// Wait watches and waits for the build to cancel or finish.
func (c *client) Wait(id int64) *Wait {
ctx, cancel := context.WithCancel(context.Background())

View file

@ -1,7 +1,8 @@
package agent
import (
"io"
"bytes"
"io/ioutil"
"time"
"github.com/Sirupsen/logrus"
@ -40,15 +41,23 @@ func (r *pipeline) run() error {
engine := docker.NewClient(r.docker)
// streaming the logs
rc, wc := io.Pipe()
defer func() {
wc.Close()
rc.Close()
}()
// rc, wc := io.Pipe()
// defer func() {
// wc.Close()
// rc.Close()
// }()
var buf bytes.Buffer
stream, err := r.drone.LogStream(w.Job.ID)
if err != nil {
return err
}
a := agent.Agent{
Update: agent.NewClientUpdater(r.drone),
Logger: agent.NewClientLogger(r.drone, w.Job.ID, rc, wc, r.config.logs),
Update: agent.NewClientUpdater(r.drone),
// Logger: agent.NewClientLogger(r.drone, w.Job.ID, rc, wc, r.config.logs),
Logger: agent.NewStreamLogger(stream, &buf, r.config.logs),
Engine: engine,
Timeout: r.config.timeout,
Platform: r.config.platform,
@ -70,8 +79,11 @@ func (r *pipeline) run() error {
a.Run(w, cancel)
wc.Close()
rc.Close()
if err := r.drone.LogPost(w.Job.ID, ioutil.NopCloser(&buf)); err != nil {
logrus.Errorf("Error sending logs for %s/%s#%d.%d",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
}
stream.Close()
logrus.Infof("Finished build %s/%s#%d.%d",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)

View file

@ -156,6 +156,9 @@ func Load(middleware ...gin.HandlerFunc) http.Handler {
queue.POST("/stream/:id", server.Stream)
queue.POST("/status/:id", server.Update)
queue.POST("/ping", server.Ping)
queue.POST("/logs/:id", server.PostLogs)
queue.GET("/logs/:id", server.WriteLogs)
}
// DELETE THESE

View file

@ -3,6 +3,7 @@ package server
import (
"fmt"
"io"
"net/http"
"strconv"
"sync"
"time"
@ -15,6 +16,7 @@ import (
"github.com/drone/drone/store"
"github.com/drone/drone/stream"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
// Pull is a long request that polls and attemts to pull work off the queue stack.
@ -25,6 +27,12 @@ func Pull(c *gin.Context) {
if w == nil {
logrus.Debugf("Agent %s could not pull work.", c.ClientIP())
} else {
// setup the channel to stream logs
if err := stream.Create(c, stream.ToKey(w.Job.ID)); err != nil {
logrus.Errorf("Unable to create stream. %s", err)
}
c.JSON(202, w)
logrus.Debugf("Agent %s assigned work. %s/%s#%d.%d",
@ -63,7 +71,7 @@ func Wait(c *gin.Context) {
}
}
// Update handles build updates from the agent and persists to the database.
// Update handles build updates from the agent and persists to the database.
func Update(c *gin.Context) {
work := &queue.Work{}
if err := c.BindJSON(work); err != nil {
@ -99,12 +107,12 @@ func Update(c *gin.Context) {
store.UpdateBuild(c, build)
}
if job.Status == model.StatusRunning {
err := stream.Create(c, stream.ToKey(job.ID))
if err != nil {
logrus.Errorf("Unable to create stream. %s", err)
}
}
// if job.Status == model.StatusRunning {
// err := stream.Create(c, stream.ToKey(job.ID))
// if err != nil {
// logrus.Errorf("Unable to create stream. %s", err)
// }
// }
ok, err := store.UpdateBuildJob(c, build, job)
if err != nil {
@ -199,3 +207,98 @@ func Ping(c *gin.Context) {
}
c.String(200, "PONG")
}
//
//
// Below are alternate implementations for the Queue that use websockets.
//
//
// PostLogs handles an http request from the agent to post build logs. These
// logs are posted at the end of the build process.
func PostLogs(c *gin.Context) {
id, _ := strconv.ParseInt(c.Param("id"), 10, 64)
job, err := store.GetJob(c, id)
if err != nil {
c.String(404, "Cannot upload logs. %s", err)
return
}
if err := store.WriteLog(c, job, c.Request.Body); err != nil {
c.String(500, "Cannot persist logs", err)
return
}
c.String(200, "")
}
// WriteLogs handles an http request from the agent to stream build logs from
// the agent to the server to enable real time streamings to the client.
func WriteLogs(c *gin.Context) {
id, err := strconv.ParseInt(c.Param("id"), 10, 64)
if err != nil {
c.String(500, "Invalid input. %s", err)
return
}
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
c.String(500, "Cannot upgrade to websocket. %s", err)
return
}
defer conn.Close()
wc, err := stream.Writer(c, stream.ToKey(id))
if err != nil {
c.String(500, "Cannot create stream writer. %s", err)
return
}
defer func() {
wc.Close()
stream.Delete(c, stream.ToKey(id))
}()
var msg []byte
for {
_, msg, err = conn.ReadMessage()
if err != nil {
break
}
wc.Write(msg)
wc.Write(newline)
}
if err != nil && err != io.EOF {
c.String(500, "Error reading logs. %s", err)
return
}
//
// rc, err := stream.Reader(c, stream.ToKey(id))
// if err != nil {
// c.String(500, "Failed to create stream reader. %s", err)
// return
// }
//
// wg := sync.WaitGroup{}
// wg.Add(1)
//
// go func() {
// defer recover()
// store.WriteLog(c, &model.Job{ID: id}, rc)
// wg.Done()
// }()
//
// wc.Close()
// wg.Wait()
}
// newline defines a newline constant to separate lines in the build output
var newline = []byte{'\n'}
// upgrader defines the default behavior for upgrading the websocket.
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}

View file

@ -11,6 +11,9 @@ func Clone(c *yaml.Config, plugin string) error {
return nil
}
}
if plugin == "" {
plugin = "git"
}
s := &yaml.Container{
Image: plugin,