woodpecker/server/handler/ws.go

216 lines
5.1 KiB
Go

package handler
import (
"net/http"
"strconv"
"time"
"github.com/drone/drone/server/database"
"github.com/drone/drone/server/pubsub"
"github.com/drone/drone/server/session"
"github.com/drone/drone/server/worker"
"github.com/drone/drone/shared/model"
"github.com/gorilla/pat"
"github.com/gorilla/websocket"
)
const (
// Time allowed to write the message to the client.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the client.
pongWait = 60 * time.Second
// Send pings to client with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
type WsHandler struct {
pubsub *pubsub.PubSub
commits database.CommitManager
perms database.PermManager
repos database.RepoManager
sess session.Session
}
func NewWsHandler(repos database.RepoManager, commits database.CommitManager, perms database.PermManager, sess session.Session, pubsub *pubsub.PubSub) *WsHandler {
return &WsHandler{pubsub, commits, perms, repos, sess}
}
// WsUser will upgrade the connection to a Websocket and will stream
// all events to the browser pertinent to the authenticated user. If the user
// is not authenticated, only public events are streamed.
func (h *WsHandler) WsUser(w http.ResponseWriter, r *http.Request) error {
// get the user form the session
user := h.sess.UserCookie(r)
// upgrade the websocket
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return badRequest{err}
}
// register a channel for global events
channel := h.pubsub.Register("_global")
sub := channel.Subscribe()
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
sub.Close()
ws.Close()
}()
go func() {
for {
select {
case msg := <-sub.Read():
work, ok := msg.(*worker.Request)
if !ok {
break
}
// user must have read access to the repository
// in order to pass this message along
if ok, _ := h.perms.Member(user, work.Repo); !ok {
break
}
ws.SetWriteDeadline(time.Now().Add(writeWait))
err := ws.WriteJSON(work)
if err != nil {
ws.Close()
return
}
case <-sub.CloseNotify():
ws.Close()
return
case <-ticker.C:
ws.SetWriteDeadline(time.Now().Add(writeWait))
err := ws.WriteMessage(websocket.PingMessage, []byte{})
if err != nil {
ws.Close()
return
}
}
}
}()
readWebsocket(ws)
return nil
}
// WsConsole will upgrade the connection to a Websocket and will stream
// the build output to the browser.
func (h *WsHandler) WsConsole(w http.ResponseWriter, r *http.Request) error {
var commitID, _ = strconv.Atoi(r.FormValue(":id"))
commit, err := h.commits.Find(int64(commitID))
if err != nil {
return notFound{err}
}
repo, err := h.repos.Find(commit.RepoID)
if err != nil {
return notFound{err}
}
user := h.sess.UserCookie(r)
if ok, _ := h.perms.Read(user, repo); !ok {
return notFound{err}
}
// find a channel that we can subscribe to
// and listen for stream updates.
channel := h.pubsub.Lookup(commit.ID)
if channel == nil {
return notFound{}
}
sub := channel.Subscribe()
defer sub.Close()
// upgrade the websocket
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return badRequest{err}
}
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
ws.Close()
}()
go func() {
for {
select {
case msg := <-sub.Read():
data, ok := msg.([]byte)
if !ok {
break
}
ws.SetWriteDeadline(time.Now().Add(writeWait))
err := ws.WriteMessage(websocket.TextMessage, data)
if err != nil {
ws.Close()
return
}
case <-sub.CloseNotify():
ws.Close()
return
case <-ticker.C:
ws.SetWriteDeadline(time.Now().Add(writeWait))
err := ws.WriteMessage(websocket.PingMessage, []byte{})
if err != nil {
ws.Close()
return
}
}
}
}()
readWebsocket(ws)
return nil
}
// readWebsocket will block while reading the websocket data
func readWebsocket(ws *websocket.Conn) {
defer ws.Close()
ws.SetReadLimit(512)
ws.SetReadDeadline(time.Now().Add(pongWait))
ws.SetPongHandler(func(string) error {
ws.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
_, _, err := ws.ReadMessage()
if err != nil {
break
}
}
}
// Ping is a method that is being used for internal testing and
// will be removed prior to release
func (h *WsHandler) Ping(w http.ResponseWriter, r *http.Request) error {
channel := h.pubsub.Register("_global")
msg := worker.Request{
Repo: &model.Repo{ID: 1, Private: false, Host: "github.com", Owner: "drone", Name: "drone"},
Commit: &model.Commit{ID: 1, Status: "Started", Branch: "master", Sha: "113f4917ff9174945388d86395f902cd154074cb", Message: "Remove branches by SCM hook", Author: "bradrydzewski", Gravatar: "8c58a0be77ee441bb8f8595b7f1b4e87"},
}
channel.Publish(&msg)
w.WriteHeader(http.StatusOK)
return nil
}
func (h *WsHandler) Register(r *pat.Router) {
r.Post("/ws/ping", errorHandler(h.Ping))
r.Get("/ws/user", errorHandler(h.WsUser))
r.Get("/ws/stdout/{id}", errorHandler(h.WsConsole))
}