package handler import ( "log" "net/http" "strconv" "time" "github.com/drone/drone/server/datastore" "github.com/drone/drone/server/pubsub" "github.com/drone/drone/server/worker" "github.com/goji/context" "github.com/gorilla/websocket" "github.com/zenazn/goji/web" ) const ( // Time allowed to write the message to the client. writeWait = 10 * time.Second // Time allowed to read the next pong message from the client. pongWait = 60 * time.Second // Send pings to client with this period. Must be less than pongWait. pingPeriod = (pongWait * 9) / 10 ) var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, } // WsUser will upgrade the connection to a Websocket and will stream // all events to the browser pertinent to the authenticated user. If the user // is not authenticated, only public events are streamed. func WsUser(c web.C, w http.ResponseWriter, r *http.Request) { var ctx = context.FromC(c) var user = ToUser(c) // upgrade the websocket ws, err := upgrader.Upgrade(w, r, nil) if err != nil { w.WriteHeader(http.StatusBadRequest) return } // register a channel for global events channel := pubsub.Register(ctx, "_global") sub := channel.Subscribe() ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() sub.Close() ws.Close() }() go func() { for { select { case msg := <-sub.Read(): work, ok := msg.(*worker.Work) if !ok { break } // user must have read access to the repository // in order to pass this message along if role, err := datastore.GetPerm(ctx, user, work.Repo); err != nil || role.Read == false { break } ws.SetWriteDeadline(time.Now().Add(writeWait)) err := ws.WriteJSON(work) if err != nil { ws.Close() return } case <-sub.CloseNotify(): ws.Close() return case <-ticker.C: ws.SetWriteDeadline(time.Now().Add(writeWait)) err := ws.WriteMessage(websocket.PingMessage, []byte{}) if err != nil { ws.Close() return } } } }() readWebsocket(ws) } // WsConsole will upgrade the connection to a Websocket and will stream // the build output to the browser. func WsConsole(c web.C, w http.ResponseWriter, r *http.Request) { var commitID, _ = strconv.Atoi(c.URLParams["id"]) var ctx = context.FromC(c) var user = ToUser(c) commit, err := datastore.GetCommit(ctx, int64(commitID)) if err != nil { w.WriteHeader(http.StatusNotFound) return } repo, err := datastore.GetRepo(ctx, commit.RepoID) if err != nil { w.WriteHeader(http.StatusNotFound) return } role, err := datastore.GetPerm(ctx, user, repo) if err != nil || role.Read == false { w.WriteHeader(http.StatusNotFound) return } // find a channel that we can subscribe to // and listen for stream updates. channel := pubsub.Lookup(ctx, commit.ID) if channel == nil { w.WriteHeader(http.StatusNotFound) return } sub := channel.Subscribe() defer sub.Close() // upgrade the websocket ws, err := upgrader.Upgrade(w, r, nil) if err != nil { w.WriteHeader(http.StatusBadRequest) return } ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() ws.Close() }() go func() { for { select { case msg := <-sub.Read(): data, ok := msg.([]byte) if !ok { break } ws.SetWriteDeadline(time.Now().Add(writeWait)) err := ws.WriteMessage(websocket.TextMessage, data) if err != nil { log.Printf("websocket for commit %d closed. Err: %s\n", commitID, err) ws.Close() return } case <-sub.CloseNotify(): log.Printf("websocket for commit %d closed by client\n", commitID) ws.Close() return case <-ticker.C: ws.SetWriteDeadline(time.Now().Add(writeWait)) err := ws.WriteMessage(websocket.PingMessage, []byte{}) if err != nil { log.Printf("websocket for commit %d closed. Err: %s\n", commitID, err) ws.Close() return } } } }() readWebsocket(ws) } // readWebsocket will block while reading the websocket data func readWebsocket(ws *websocket.Conn) { defer ws.Close() ws.SetReadLimit(512) ws.SetReadDeadline(time.Now().Add(pongWait)) ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(pongWait)) return nil }) for { _, _, err := ws.ReadMessage() if err != nil { break } } }