re-attempt pushing updates from agent on failure

This commit is contained in:
Brad Rydzewski 2016-04-21 18:05:54 -07:00
parent faf7ff675d
commit 5ca35c4c63
5 changed files with 272 additions and 273 deletions

View file

@ -1,225 +1,22 @@
package client
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"github.com/drone/drone/model"
"github.com/drone/drone/queue"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"golang.org/x/oauth2"
)
const (
pathPull = "%s/api/queue/pull/%s/%s"
pathWait = "%s/api/queue/wait/%d"
pathStream = "%s/api/queue/stream/%d"
pathPush = "%s/api/queue/status/%d"
)
// Client is used to communicate with a Drone server.
type Client interface {
// Pull pulls work from the server queue.
Pull(os, arch string) (*queue.Work, error)
type client struct {
client *http.Client
base string // base url
}
// NewClient returns a client at the specified url.
func NewClient(uri string) Client {
return &client{http.DefaultClient, uri}
}
// NewClientToken returns a client at the specified url that
// authenticates all outbound requests with the given token.
func NewClientToken(uri, token string) Client {
config := new(oauth2.Config)
auther := config.Client(oauth2.NoContext, &oauth2.Token{AccessToken: token})
return &client{auther, uri}
}
// Pull pulls work from the server queue.
func (c *client) Pull(os, arch string) (*queue.Work, error) {
out := new(queue.Work)
uri := fmt.Sprintf(pathPull, c.base, os, arch)
err := c.post(uri, nil, out)
return out, err
}
// Push pushes an update to the server.
func (c *client) Push(p *queue.Work) error {
uri := fmt.Sprintf(pathPush, c.base, p.Job.ID)
err := c.post(uri, p, nil)
return err
}
// Stream streams the build logs to the server.
func (c *client) Stream(id int64, rc io.ReadCloser) error {
uri := fmt.Sprintf(pathStream, c.base, id)
err := c.post(uri, rc, nil)
return err
}
// Wait watches and waits for the build to cancel or finish.
func (c *client) Wait(id int64) *Wait {
ctx, cancel := context.WithCancel(context.Background())
return &Wait{id, c, ctx, cancel}
}
type Wait struct {
id int64
client *client
ctx context.Context
cancel context.CancelFunc
}
func (w *Wait) Done() (*model.Job, error) {
uri := fmt.Sprintf(pathWait, w.client.base, w.id)
req, err := w.client.createRequest(uri, "POST", nil)
if err != nil {
return nil, err
}
res, err := ctxhttp.Do(w.ctx, w.client.client, req)
if err != nil {
return nil, err
}
defer res.Body.Close()
job := &model.Job{}
err = json.NewDecoder(res.Body).Decode(&job)
if err != nil {
return nil, err
}
return job, nil
}
func (w *Wait) Cancel() {
w.cancel()
}
//
// http request helper functions
//
// helper function for making an http GET request.
func (c *client) get(rawurl string, out interface{}) error {
return c.do(rawurl, "GET", nil, out)
}
// helper function for making an http POST request.
func (c *client) post(rawurl string, in, out interface{}) error {
return c.do(rawurl, "POST", in, out)
}
// helper function for making an http PUT request.
func (c *client) put(rawurl string, in, out interface{}) error {
return c.do(rawurl, "PUT", in, out)
}
// helper function for making an http PATCH request.
func (c *client) patch(rawurl string, in, out interface{}) error {
return c.do(rawurl, "PATCH", in, out)
}
// helper function for making an http DELETE request.
func (c *client) delete(rawurl string) error {
return c.do(rawurl, "DELETE", nil, nil)
}
// helper function to make an http request
func (c *client) do(rawurl, method string, in, out interface{}) error {
// executes the http request and returns the body as
// and io.ReadCloser
body, err := c.open(rawurl, method, in, out)
if err != nil {
return err
}
defer body.Close()
// if a json response is expected, parse and return
// the json response.
if out != nil {
return json.NewDecoder(body).Decode(out)
}
return nil
}
// helper function to open an http request
func (c *client) open(rawurl, method string, in, out interface{}) (io.ReadCloser, error) {
uri, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
// creates a new http request to bitbucket.
req, err := http.NewRequest(method, uri.String(), nil)
if err != nil {
return nil, err
}
// if we are posting or putting data, we need to
// write it to the body of the request.
if in != nil {
rc, ok := in.(io.ReadCloser)
if ok {
req.Body = rc
req.Header.Set("Content-Type", "plain/text")
} else {
inJson, err := json.Marshal(in)
if err != nil {
return nil, err
}
buf := bytes.NewBuffer(inJson)
req.Body = ioutil.NopCloser(buf)
req.ContentLength = int64(len(inJson))
req.Header.Set("Content-Length", strconv.Itoa(len(inJson)))
req.Header.Set("Content-Type", "application/json")
}
}
resp, err := c.client.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode > http.StatusPartialContent {
defer resp.Body.Close()
out, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf(string(out))
}
return resp.Body, nil
}
// createRequest is a helper function that builds an http.Request.
func (c *client) createRequest(rawurl, method string, in interface{}) (*http.Request, error) {
uri, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
// if we are posting or putting data, we need to
// write it to the body of the request.
var buf io.ReadWriter
if in != nil {
buf = new(bytes.Buffer)
err := json.NewEncoder(buf).Encode(in)
if err != nil {
return nil, err
}
}
// creates a new http request to bitbucket.
req, err := http.NewRequest(method, uri.String(), buf)
if err != nil {
return nil, err
}
if in != nil {
req.Header.Set("Content-Type", "application/json")
}
return req, nil
// Push pushes an update to the server.
Push(*queue.Work) error
// Stream streams the build logs to the server.
Stream(int64, io.ReadCloser) error
// Wait waits for the job to the complete.
Wait(int64) *Wait
}

225
client/client_impl.go Normal file
View file

@ -0,0 +1,225 @@
package client
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"github.com/drone/drone/model"
"github.com/drone/drone/queue"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"golang.org/x/oauth2"
)
const (
pathPull = "%s/api/queue/pull/%s/%s"
pathWait = "%s/api/queue/wait/%d"
pathStream = "%s/api/queue/stream/%d"
pathPush = "%s/api/queue/status/%d"
)
type client struct {
client *http.Client
base string // base url
}
// NewClient returns a client at the specified url.
func NewClient(uri string) Client {
return &client{http.DefaultClient, uri}
}
// NewClientToken returns a client at the specified url that
// authenticates all outbound requests with the given token.
func NewClientToken(uri, token string) Client {
config := new(oauth2.Config)
auther := config.Client(oauth2.NoContext, &oauth2.Token{AccessToken: token})
return &client{auther, uri}
}
// Pull pulls work from the server queue.
func (c *client) Pull(os, arch string) (*queue.Work, error) {
out := new(queue.Work)
uri := fmt.Sprintf(pathPull, c.base, os, arch)
err := c.post(uri, nil, out)
return out, err
}
// Push pushes an update to the server.
func (c *client) Push(p *queue.Work) error {
uri := fmt.Sprintf(pathPush, c.base, p.Job.ID)
err := c.post(uri, p, nil)
return err
}
// Stream streams the build logs to the server.
func (c *client) Stream(id int64, rc io.ReadCloser) error {
uri := fmt.Sprintf(pathStream, c.base, id)
err := c.post(uri, rc, nil)
return err
}
// Wait watches and waits for the build to cancel or finish.
func (c *client) Wait(id int64) *Wait {
ctx, cancel := context.WithCancel(context.Background())
return &Wait{id, c, ctx, cancel}
}
type Wait struct {
id int64
client *client
ctx context.Context
cancel context.CancelFunc
}
func (w *Wait) Done() (*model.Job, error) {
uri := fmt.Sprintf(pathWait, w.client.base, w.id)
req, err := w.client.createRequest(uri, "POST", nil)
if err != nil {
return nil, err
}
res, err := ctxhttp.Do(w.ctx, w.client.client, req)
if err != nil {
return nil, err
}
defer res.Body.Close()
job := &model.Job{}
err = json.NewDecoder(res.Body).Decode(&job)
if err != nil {
return nil, err
}
return job, nil
}
func (w *Wait) Cancel() {
w.cancel()
}
//
// http request helper functions
//
// helper function for making an http GET request.
func (c *client) get(rawurl string, out interface{}) error {
return c.do(rawurl, "GET", nil, out)
}
// helper function for making an http POST request.
func (c *client) post(rawurl string, in, out interface{}) error {
return c.do(rawurl, "POST", in, out)
}
// helper function for making an http PUT request.
func (c *client) put(rawurl string, in, out interface{}) error {
return c.do(rawurl, "PUT", in, out)
}
// helper function for making an http PATCH request.
func (c *client) patch(rawurl string, in, out interface{}) error {
return c.do(rawurl, "PATCH", in, out)
}
// helper function for making an http DELETE request.
func (c *client) delete(rawurl string) error {
return c.do(rawurl, "DELETE", nil, nil)
}
// helper function to make an http request
func (c *client) do(rawurl, method string, in, out interface{}) error {
// executes the http request and returns the body as
// and io.ReadCloser
body, err := c.open(rawurl, method, in, out)
if err != nil {
return err
}
defer body.Close()
// if a json response is expected, parse and return
// the json response.
if out != nil {
return json.NewDecoder(body).Decode(out)
}
return nil
}
// helper function to open an http request
func (c *client) open(rawurl, method string, in, out interface{}) (io.ReadCloser, error) {
uri, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
// creates a new http request to bitbucket.
req, err := http.NewRequest(method, uri.String(), nil)
if err != nil {
return nil, err
}
// if we are posting or putting data, we need to
// write it to the body of the request.
if in != nil {
rc, ok := in.(io.ReadCloser)
if ok {
req.Body = rc
req.Header.Set("Content-Type", "plain/text")
} else {
inJson, err := json.Marshal(in)
if err != nil {
return nil, err
}
buf := bytes.NewBuffer(inJson)
req.Body = ioutil.NopCloser(buf)
req.ContentLength = int64(len(inJson))
req.Header.Set("Content-Length", strconv.Itoa(len(inJson)))
req.Header.Set("Content-Type", "application/json")
}
}
resp, err := c.client.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode > http.StatusPartialContent {
defer resp.Body.Close()
out, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf(string(out))
}
return resp.Body, nil
}
// createRequest is a helper function that builds an http.Request.
func (c *client) createRequest(rawurl, method string, in interface{}) (*http.Request, error) {
uri, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
// if we are posting or putting data, we need to
// write it to the body of the request.
var buf io.ReadWriter
if in != nil {
buf = new(bytes.Buffer)
err := json.NewEncoder(buf).Encode(in)
if err != nil {
return nil, err
}
}
// creates a new http request to bitbucket.
req, err := http.NewRequest(method, uri.String(), buf)
if err != nil {
return nil, err
}
if in != nil {
req.Header.Set("Content-Type", "application/json")
}
return req, nil
}

View file

@ -1,22 +0,0 @@
package client
import (
"io"
"github.com/drone/drone/queue"
)
// Client is used to communicate with a Drone server.
type Client interface {
// Pull pulls work from the server queue.
Pull(os, arch string) (*queue.Work, error)
// Push pushes an update to the server.
Push(*queue.Work) error
// Stream streams the build logs to the server.
Stream(int64, io.ReadCloser) error
// Wait waits for the job to the complete.
Wait(int64) *Wait
}

View file

@ -54,12 +54,6 @@ func (r *pipeline) run() error {
envs := toEnv(w)
w.Yaml = expander.ExpandString(w.Yaml, envs)
if w.Verified {
}
if w.Signed {
}
// inject the netrc file into the clone plugin if the repositroy is
// private and requires authentication.
@ -123,45 +117,42 @@ func (r *pipeline) run() error {
compile.Transforms(trans)
spec, err := compile.CompileString(w.Yaml)
if err != nil {
// TODO handle error
logrus.Infof("Error compiling Yaml %s/%s#%d %s",
w.Repo.Owner, w.Repo.Name, w.Build.Number, err.Error())
return err
w.Job.Error = err.Error()
w.Job.ExitCode = 255
w.Job.Finished = w.Job.Started
w.Job.Status = model.StatusError
pushRetry(r.drone, w)
return nil
}
if err := r.drone.Push(w); err != nil {
logrus.Errorf("Error persisting update %s/%s#%d.%d. %s",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err)
return err
}
pushRetry(r.drone, w)
conf := runner.Config{
Engine: docker.New(r.docker),
}
ctx := context.TODO()
ctx, cancel := context.WithCancel(ctx)
run := conf.Runner(ctx, spec)
run.Run()
c := context.TODO()
c, timout := context.WithTimeout(c, time.Minute*time.Duration(w.Repo.Timeout))
c, cancel := context.WithCancel(c)
defer cancel()
defer timout()
run := conf.Runner(c, spec)
run.Run()
wait := r.drone.Wait(w.Job.ID)
if err != nil {
return err
}
defer wait.Cancel()
go func() {
_, werr := wait.Done()
if werr == nil {
if _, err := wait.Done(); err == nil {
logrus.Infof("Cancel build %s/%s#%d.%d",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
cancel()
}
}()
defer wait.Cancel()
rc, wc := io.Pipe()
go func() {
// TODO(bradrydzewski) figure out how to resume upload on failure
err := r.drone.Stream(w.Job.ID, rc)
if err != nil && err != io.ErrClosedPipe {
logrus.Errorf("Error streaming build logs. %s", err)
@ -207,7 +198,21 @@ func (r *pipeline) run() error {
logrus.Infof("Finished build %s/%s#%d.%d",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number)
return r.drone.Push(w)
pushRetry(r.drone, w)
return nil
}
func pushRetry(client client.Client, w *queue.Work) {
for {
err := client.Push(w)
if err == nil {
return
}
logrus.Errorf("Error updating %s/%s#%d.%d. Retry in 30s. %s",
w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err)
logrus.Infof("Retry update in 30s")
time.Sleep(time.Second * 30)
}
}
func toEnv(w *queue.Work) map[string]string {
@ -240,15 +245,8 @@ func toEnv(w *queue.Work) map[string]string {
"DRONE_BUILD_FINISHED": fmt.Sprintf("%d", w.Build.Finished),
"DRONE_YAML_VERIFIED": fmt.Sprintf("%v", w.Verified),
"DRONE_YAML_SIGNED": fmt.Sprintf("%v", w.Signed),
// SHORTER ALIASES
"DRONE_BRANCH": w.Build.Branch,
"DRONE_COMMIT": w.Build.Commit,
// TODO(bradrydzewski) netrc should only be injected via secrets
// "DRONE_NETRC_USERNAME": w.Netrc.Login,
// "DRONE_NETRC_PASSWORD": w.Netrc.Password,
// "DRONE_NETRC_MACHINE": w.Netrc.Machine,
"DRONE_BRANCH": w.Build.Branch,
"DRONE_COMMIT": w.Build.Commit,
}
if w.Build.Event == model.EventTag {

View file

@ -6,6 +6,7 @@ type Job struct {
BuildID int64 `json:"-" meddler:"job_build_id"`
NodeID int64 `json:"-" meddler:"job_node_id"`
Number int `json:"number" meddler:"job_number"`
Error string `json:"error" meddler:"-"`
Status string `json:"status" meddler:"job_status"`
ExitCode int `json:"exit_code" meddler:"job_exit_code"`
Enqueued int64 `json:"enqueued_at" meddler:"job_enqueued"`