implement cancel function

This commit is contained in:
Brad Rydzewski 2017-03-05 22:05:16 +11:00
parent 50baeaa41e
commit bb7453262a
7 changed files with 95 additions and 59 deletions

View file

@ -103,10 +103,9 @@ func run(ctx context.Context, client rpc.Peer) error {
cancelled := abool.New() cancelled := abool.New()
go func() { go func() {
ok, _ := client.Notify(ctx, work.ID) if err := client.Wait(ctx, work.ID); err != nil {
if ok {
cancelled.SetTo(true) cancelled.SetTo(true)
log.Printf("pipeline: cancel signal received: %s", work.ID) log.Printf("pipeline: cancel signal received: %s: %s", work.ID, err)
cancel() cancel()
} else { } else {
log.Printf("pipeline: cancel channel closed: %s", work.ID) log.Printf("pipeline: cancel channel closed: %s", work.ID)
@ -133,16 +132,19 @@ func run(ctx context.Context, client rpc.Peer) error {
log.Printf("pipeline: error updating pipeline status: %s: %s", work.ID, err) log.Printf("pipeline: error updating pipeline status: %s: %s", work.ID, err)
} }
var uploads sync.WaitGroup
defaultLogger := pipeline.LogFunc(func(proc *backend.Step, rc multipart.Reader) error { defaultLogger := pipeline.LogFunc(func(proc *backend.Step, rc multipart.Reader) error {
part, rerr := rc.NextPart() part, rerr := rc.NextPart()
if rerr != nil { if rerr != nil {
return rerr return rerr
} }
uploads.Add(1)
writer := rpc.NewLineWriter(client, work.ID, proc.Alias) writer := rpc.NewLineWriter(client, work.ID, proc.Alias)
io.Copy(writer, part) io.Copy(writer, part)
defer func() { defer func() {
log.Printf("pipeline: finish uploading logs: %s: step %s", work.ID, proc.Alias) log.Printf("pipeline: finish uploading logs: %s: step %s", work.ID, proc.Alias)
uploads.Done()
}() }()
part, rerr = rc.NextPart() part, rerr = rc.NextPart()
@ -174,7 +176,7 @@ func run(ctx context.Context, client rpc.Peer) error {
state.ExitCode = xerr.Code state.ExitCode = xerr.Code
} }
if cancelled.IsSet() { if cancelled.IsSet() {
state.ExitCode = 130 state.ExitCode = 137
} else if state.ExitCode == 0 { } else if state.ExitCode == 0 {
state.ExitCode = 1 state.ExitCode = 1
} }
@ -182,6 +184,7 @@ func run(ctx context.Context, client rpc.Peer) error {
log.Printf("pipeline: execution complete: %s", work.ID) log.Printf("pipeline: execution complete: %s", work.ID)
uploads.Wait()
err = client.Update(context.Background(), work.ID, state) err = client.Update(context.Background(), work.ID, state)
if err != nil { if err != nil {
log.Printf("Pipeine: error updating pipeline status: %s: %s", work.ID, err) log.Printf("Pipeine: error updating pipeline status: %s: %s", work.ID, err)

View file

@ -2,12 +2,16 @@ package server
import ( import (
"bufio" "bufio"
"context"
"fmt"
"io" "io"
"net/http" "net/http"
"os"
"strconv" "strconv"
"time" "time"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/cncd/queue"
"github.com/drone/drone/remote" "github.com/drone/drone/remote"
"github.com/drone/drone/shared/httputil" "github.com/drone/drone/shared/httputil"
"github.com/drone/drone/store" "github.com/drone/drone/store"
@ -149,6 +153,7 @@ func DeleteBuild(c *gin.Context) {
job.ExitCode = 137 job.ExitCode = 137
store.UpdateBuildJob(c, build, job) store.UpdateBuildJob(c, build, job)
if os.Getenv("DRONE_CANARY") == "" {
client := stomp.MustFromContext(c) client := stomp.MustFromContext(c)
client.SendJSON("/topic/cancel", model.Event{ client.SendJSON("/topic/cancel", model.Event{
Type: model.Cancelled, Type: model.Cancelled,
@ -156,7 +161,9 @@ func DeleteBuild(c *gin.Context) {
Build: *build, Build: *build,
Job: *job, Job: *job,
}, stomp.WithHeader("job-id", strconv.FormatInt(job.ID, 10))) }, stomp.WithHeader("job-id", strconv.FormatInt(job.ID, 10)))
} else {
config.queue.Error(context.Background(), fmt.Sprint(job.ID), queue.ErrCancel)
}
c.String(204, "") c.String(204, "")
} }
@ -197,8 +204,8 @@ func PostBuild(c *gin.Context) {
} }
// fetch the .drone.yml file from the database // fetch the .drone.yml file from the database
config := ToConfig(c) cfg := ToConfig(c)
raw, err := remote_.File(user, repo, build, config.Yaml) raw, err := remote_.File(user, repo, build, cfg.Yaml)
if err != nil { if err != nil {
log.Errorf("failure to get build config for %s. %s", repo.FullName, err) log.Errorf("failure to get build config for %s. %s", repo.FullName, err)
c.AbortWithError(404, err) c.AbortWithError(404, err)
@ -206,7 +213,7 @@ func PostBuild(c *gin.Context) {
} }
// Fetch secrets file but don't exit on error as it's optional // Fetch secrets file but don't exit on error as it's optional
sec, err := remote_.File(user, repo, build, config.Shasum) sec, err := remote_.File(user, repo, build, cfg.Shasum)
if err != nil { if err != nil {
log.Debugf("cannot find build secrets for %s. %s", repo.FullName, err) log.Debugf("cannot find build secrets for %s. %s", repo.FullName, err)
} }

View file

@ -91,10 +91,9 @@ func (s *RPC) Next(c context.Context) (*rpc.Pipeline, error) {
return pipeline, err return pipeline, err
} }
// Notify implements the rpc.Notify function // Wait implements the rpc.Wait function
func (s *RPC) Notify(c context.Context, id string) (bool, error) { func (s *RPC) Wait(c context.Context, id string) error {
err := s.queue.Wait(c, id) return s.queue.Wait(c, id)
return (err == queue.ErrCancel), nil
} }
// Extend implements the rpc.Extend function // Extend implements the rpc.Extend function
@ -211,6 +210,9 @@ func (s *RPC) Update(c context.Context, id string, state rpc.State) error {
// Save implements the rpc.Save function // Save implements the rpc.Save function
func (s *RPC) Save(c context.Context, id, mime string, file io.Reader) error { return nil } func (s *RPC) Save(c context.Context, id, mime string, file io.Reader) error { return nil }
// Done implements the rpc.Done function
func (s *RPC) Done(c context.Context, id string) error { return nil }
// Log implements the rpc.Log function // Log implements the rpc.Log function
func (s *RPC) Log(c context.Context, id string, line *rpc.Line) error { func (s *RPC) Log(c context.Context, id string, line *rpc.Line) error {
entry := new(logging.Entry) entry := new(logging.Entry)

View file

@ -17,11 +17,12 @@ import (
const ( const (
methodNext = "next" methodNext = "next"
methodNotify = "notify" methodWait = "wait"
methodDone = "done"
methodExtend = "extend" methodExtend = "extend"
methodUpdate = "update" methodUpdate = "update"
methodLog = "log"
methodSave = "save" methodSave = "save"
methodLog = "log"
) )
type ( type (
@ -80,11 +81,18 @@ func (t *Client) Next(c context.Context) (*Pipeline, error) {
return res, err return res, err
} }
// Notify returns true if the pipeline should be cancelled. // Wait blocks until the pipeline is complete.
func (t *Client) Notify(c context.Context, id string) (bool, error) { func (t *Client) Wait(c context.Context, id string) error {
out := false // err := t.call(c, methodWait, id, nil)
err := t.call(c, methodNotify, id, &out) // if err != nil && err.Error() == ErrCancelled.Error() {
return out, err // return ErrCancelled
// }
return t.call(c, methodWait, id, nil)
}
// Done signals the pipeline is complete.
func (t *Client) Done(c context.Context, id string) error {
return t.call(c, methodDone, id, nil)
} }
// Extend extends the pipeline deadline. // Extend extends the pipeline deadline.

View file

@ -7,6 +7,9 @@ import (
"github.com/cncd/pipeline/pipeline/backend" "github.com/cncd/pipeline/pipeline/backend"
) )
// ErrCancelled signals the pipeine is cancelled.
// var ErrCancelled = errors.New("cancelled")
type ( type (
// Filter defines filters for fetching items from the queue. // Filter defines filters for fetching items from the queue.
Filter struct { Filter struct {
@ -36,9 +39,11 @@ type Peer interface {
// Next returns the next pipeline in the queue. // Next returns the next pipeline in the queue.
Next(c context.Context) (*Pipeline, error) Next(c context.Context) (*Pipeline, error)
// Notify returns true if the pipeline should be cancelled. // Wait blocks untilthe pipeline is complete.
// TODO: rename to Done, Wait? Wait(c context.Context, id string) error
Notify(c context.Context, id string) (bool, error)
// Done signals the pipeline is complete.
Done(c context.Context, id string) error
// Extend extends the pipeline deadline // Extend extends the pipeline deadline
Extend(c context.Context, id string) error Extend(c context.Context, id string) error

View file

@ -52,8 +52,10 @@ func (s *Server) router(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.
switch req.Method { switch req.Method {
case methodNext: case methodNext:
return s.next(ctx, req) return s.next(ctx, req)
case methodNotify: case methodWait:
return s.notify(ctx, req) return s.wait(ctx, req)
case methodDone:
return s.done(ctx, req)
case methodExtend: case methodExtend:
return s.extend(ctx, req) return s.extend(ctx, req)
case methodUpdate: case methodUpdate:
@ -73,15 +75,26 @@ func (s *Server) next(ctx context.Context, req *jsonrpc2.Request) (interface{},
return s.peer.Next(ctx) return s.peer.Next(ctx)
} }
// notify unmarshals the rpc request parameters and invokes the peer.Notify // wait unmarshals the rpc request parameters and invokes the peer.Wait
// procedure. The results are retuned and written to the rpc response. // procedure. The results are retuned and written to the rpc response.
func (s *Server) notify(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) { func (s *Server) wait(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) {
var id string var id string
err := json.Unmarshal([]byte(*req.Params), &id) err := json.Unmarshal([]byte(*req.Params), &id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return s.peer.Notify(ctx, id) return nil, s.peer.Wait(ctx, id)
}
// done unmarshals the rpc request parameters and invokes the peer.Done
// procedure. The results are retuned and written to the rpc response.
func (s *Server) done(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) {
var id string
err := json.Unmarshal([]byte(*req.Params), &id)
if err != nil {
return nil, err
}
return nil, s.peer.Done(ctx, id)
} }
// extend unmarshals the rpc request parameters and invokes the peer.Extend // extend unmarshals the rpc request parameters and invokes the peer.Extend
@ -115,8 +128,6 @@ func (s *Server) log(req *jsonrpc2.Request) (interface{}, error) {
return nil, s.peer.Log(noContext, in.ID, in.Line) return nil, s.peer.Log(noContext, in.ID, in.Line)
} }
// save unmarshals the rpc request parameters and invokes the peer.Save
// procedure. The results are retuned and written to the rpc response.
func (s *Server) save(req *jsonrpc2.Request) (interface{}, error) { func (s *Server) save(req *jsonrpc2.Request) (interface{}, error) {
in := new(saveReq) in := new(saveReq)
if err := json.Unmarshal([]byte(*req.Params), in); err != nil { if err := json.Unmarshal([]byte(*req.Params), in); err != nil {

46
vendor/vendor.json vendored
View file

@ -33,68 +33,68 @@
{ {
"checksumSHA1": "W3AuK8ocqHwlUajGmQLFvnRhTZE=", "checksumSHA1": "W3AuK8ocqHwlUajGmQLFvnRhTZE=",
"path": "github.com/cncd/pipeline/pipeline", "path": "github.com/cncd/pipeline/pipeline",
"revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c",
"revisionTime": "2017-03-04T04:47:59Z" "revisionTime": "2017-03-05T09:53:47Z"
}, },
{ {
"checksumSHA1": "PSzh0ix/rlMrS/Cl3aH6GHGrJuo=", "checksumSHA1": "PSzh0ix/rlMrS/Cl3aH6GHGrJuo=",
"path": "github.com/cncd/pipeline/pipeline/backend", "path": "github.com/cncd/pipeline/pipeline/backend",
"revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c",
"revisionTime": "2017-03-04T04:47:59Z" "revisionTime": "2017-03-05T09:53:47Z"
}, },
{ {
"checksumSHA1": "0CGXRaYwZhJxGIrGhn8WGpkFqPo=", "checksumSHA1": "0CGXRaYwZhJxGIrGhn8WGpkFqPo=",
"path": "github.com/cncd/pipeline/pipeline/backend/docker", "path": "github.com/cncd/pipeline/pipeline/backend/docker",
"revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c",
"revisionTime": "2017-03-04T04:47:59Z" "revisionTime": "2017-03-05T09:53:47Z"
}, },
{ {
"checksumSHA1": "uUagpzha5ah/a3RO6IImvzHYFlY=", "checksumSHA1": "uUagpzha5ah/a3RO6IImvzHYFlY=",
"path": "github.com/cncd/pipeline/pipeline/frontend", "path": "github.com/cncd/pipeline/pipeline/frontend",
"revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c",
"revisionTime": "2017-03-04T04:47:59Z" "revisionTime": "2017-03-05T09:53:47Z"
}, },
{ {
"checksumSHA1": "O0sulBQAHJeNLg3lO38Cq5uf/eg=", "checksumSHA1": "O0sulBQAHJeNLg3lO38Cq5uf/eg=",
"path": "github.com/cncd/pipeline/pipeline/frontend/yaml", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml",
"revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c",
"revisionTime": "2017-03-04T04:47:59Z" "revisionTime": "2017-03-05T09:53:47Z"
}, },
{ {
"checksumSHA1": "e1lZWQdObXCKWqZOGlOeaeERQMc=", "checksumSHA1": "e1lZWQdObXCKWqZOGlOeaeERQMc=",
"path": "github.com/cncd/pipeline/pipeline/frontend/yaml/compiler", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/compiler",
"revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c",
"revisionTime": "2017-03-04T04:47:59Z" "revisionTime": "2017-03-05T09:53:47Z"
}, },
{ {
"checksumSHA1": "Q0GkNUFamVYIA1Fd8r0A5M6Gx54=", "checksumSHA1": "Q0GkNUFamVYIA1Fd8r0A5M6Gx54=",
"path": "github.com/cncd/pipeline/pipeline/frontend/yaml/linter", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/linter",
"revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c",
"revisionTime": "2017-03-04T04:47:59Z" "revisionTime": "2017-03-05T09:53:47Z"
}, },
{ {
"checksumSHA1": "kx2sPUIMozPC/g6E4w48h3FfH3k=", "checksumSHA1": "kx2sPUIMozPC/g6E4w48h3FfH3k=",
"path": "github.com/cncd/pipeline/pipeline/frontend/yaml/matrix", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/matrix",
"revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c",
"revisionTime": "2017-03-04T04:47:59Z" "revisionTime": "2017-03-05T09:53:47Z"
}, },
{ {
"checksumSHA1": "2/3f3oNmxXy5kcrRLCFa24Oc9O4=", "checksumSHA1": "2/3f3oNmxXy5kcrRLCFa24Oc9O4=",
"path": "github.com/cncd/pipeline/pipeline/interrupt", "path": "github.com/cncd/pipeline/pipeline/interrupt",
"revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c",
"revisionTime": "2017-03-04T04:47:59Z" "revisionTime": "2017-03-05T09:53:47Z"
}, },
{ {
"checksumSHA1": "8eTwXZPM/Kp9uE/mnhpWDTiX7nY=", "checksumSHA1": "8eTwXZPM/Kp9uE/mnhpWDTiX7nY=",
"path": "github.com/cncd/pipeline/pipeline/multipart", "path": "github.com/cncd/pipeline/pipeline/multipart",
"revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c",
"revisionTime": "2017-03-04T04:47:59Z" "revisionTime": "2017-03-05T09:53:47Z"
}, },
{ {
"checksumSHA1": "5axmtZsHaQ5uE/tuNQZygquNx8U=", "checksumSHA1": "UUmeGDBdpk+UXtexFnNmbWIHgG8=",
"path": "github.com/cncd/pipeline/pipeline/rpc", "path": "github.com/cncd/pipeline/pipeline/rpc",
"revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c",
"revisionTime": "2017-03-04T04:47:59Z" "revisionTime": "2017-03-05T09:53:47Z"
}, },
{ {
"checksumSHA1": "7Qj1DK0ceAXkYztW0l3+L6sn+V8=", "checksumSHA1": "7Qj1DK0ceAXkYztW0l3+L6sn+V8=",