diff --git a/client/client.go b/client/client.go index 307a4e74e..783c2c936 100644 --- a/client/client.go +++ b/client/client.go @@ -1,225 +1,22 @@ package client import ( - "bytes" - "encoding/json" - "fmt" "io" - "io/ioutil" - "net/http" - "net/url" - "strconv" - "github.com/drone/drone/model" "github.com/drone/drone/queue" - "golang.org/x/net/context" - "golang.org/x/net/context/ctxhttp" - "golang.org/x/oauth2" ) -const ( - pathPull = "%s/api/queue/pull/%s/%s" - pathWait = "%s/api/queue/wait/%d" - pathStream = "%s/api/queue/stream/%d" - pathPush = "%s/api/queue/status/%d" -) +// Client is used to communicate with a Drone server. +type Client interface { + // Pull pulls work from the server queue. + Pull(os, arch string) (*queue.Work, error) -type client struct { - client *http.Client - base string // base url -} - -// NewClient returns a client at the specified url. -func NewClient(uri string) Client { - return &client{http.DefaultClient, uri} -} - -// NewClientToken returns a client at the specified url that -// authenticates all outbound requests with the given token. -func NewClientToken(uri, token string) Client { - config := new(oauth2.Config) - auther := config.Client(oauth2.NoContext, &oauth2.Token{AccessToken: token}) - return &client{auther, uri} -} - -// Pull pulls work from the server queue. -func (c *client) Pull(os, arch string) (*queue.Work, error) { - out := new(queue.Work) - uri := fmt.Sprintf(pathPull, c.base, os, arch) - err := c.post(uri, nil, out) - return out, err -} - -// Push pushes an update to the server. -func (c *client) Push(p *queue.Work) error { - uri := fmt.Sprintf(pathPush, c.base, p.Job.ID) - err := c.post(uri, p, nil) - return err -} - -// Stream streams the build logs to the server. -func (c *client) Stream(id int64, rc io.ReadCloser) error { - uri := fmt.Sprintf(pathStream, c.base, id) - err := c.post(uri, rc, nil) - return err -} - -// Wait watches and waits for the build to cancel or finish. -func (c *client) Wait(id int64) *Wait { - ctx, cancel := context.WithCancel(context.Background()) - return &Wait{id, c, ctx, cancel} -} - -type Wait struct { - id int64 - client *client - - ctx context.Context - cancel context.CancelFunc -} - -func (w *Wait) Done() (*model.Job, error) { - uri := fmt.Sprintf(pathWait, w.client.base, w.id) - req, err := w.client.createRequest(uri, "POST", nil) - if err != nil { - return nil, err - } - - res, err := ctxhttp.Do(w.ctx, w.client.client, req) - if err != nil { - return nil, err - } - defer res.Body.Close() - job := &model.Job{} - err = json.NewDecoder(res.Body).Decode(&job) - if err != nil { - return nil, err - } - return job, nil -} - -func (w *Wait) Cancel() { - w.cancel() -} - -// -// http request helper functions -// - -// helper function for making an http GET request. -func (c *client) get(rawurl string, out interface{}) error { - return c.do(rawurl, "GET", nil, out) -} - -// helper function for making an http POST request. -func (c *client) post(rawurl string, in, out interface{}) error { - return c.do(rawurl, "POST", in, out) -} - -// helper function for making an http PUT request. -func (c *client) put(rawurl string, in, out interface{}) error { - return c.do(rawurl, "PUT", in, out) -} - -// helper function for making an http PATCH request. -func (c *client) patch(rawurl string, in, out interface{}) error { - return c.do(rawurl, "PATCH", in, out) -} - -// helper function for making an http DELETE request. -func (c *client) delete(rawurl string) error { - return c.do(rawurl, "DELETE", nil, nil) -} - -// helper function to make an http request -func (c *client) do(rawurl, method string, in, out interface{}) error { - // executes the http request and returns the body as - // and io.ReadCloser - body, err := c.open(rawurl, method, in, out) - if err != nil { - return err - } - defer body.Close() - - // if a json response is expected, parse and return - // the json response. - if out != nil { - return json.NewDecoder(body).Decode(out) - } - return nil -} - -// helper function to open an http request -func (c *client) open(rawurl, method string, in, out interface{}) (io.ReadCloser, error) { - uri, err := url.Parse(rawurl) - if err != nil { - return nil, err - } - - // creates a new http request to bitbucket. - req, err := http.NewRequest(method, uri.String(), nil) - if err != nil { - return nil, err - } - - // if we are posting or putting data, we need to - // write it to the body of the request. - if in != nil { - rc, ok := in.(io.ReadCloser) - if ok { - req.Body = rc - req.Header.Set("Content-Type", "plain/text") - } else { - inJson, err := json.Marshal(in) - if err != nil { - return nil, err - } - - buf := bytes.NewBuffer(inJson) - req.Body = ioutil.NopCloser(buf) - - req.ContentLength = int64(len(inJson)) - req.Header.Set("Content-Length", strconv.Itoa(len(inJson))) - req.Header.Set("Content-Type", "application/json") - } - } - resp, err := c.client.Do(req) - if err != nil { - return nil, err - } - if resp.StatusCode > http.StatusPartialContent { - defer resp.Body.Close() - out, _ := ioutil.ReadAll(resp.Body) - return nil, fmt.Errorf(string(out)) - } - return resp.Body, nil -} - -// createRequest is a helper function that builds an http.Request. -func (c *client) createRequest(rawurl, method string, in interface{}) (*http.Request, error) { - uri, err := url.Parse(rawurl) - if err != nil { - return nil, err - } - - // if we are posting or putting data, we need to - // write it to the body of the request. - var buf io.ReadWriter - if in != nil { - buf = new(bytes.Buffer) - err := json.NewEncoder(buf).Encode(in) - if err != nil { - return nil, err - } - } - - // creates a new http request to bitbucket. - req, err := http.NewRequest(method, uri.String(), buf) - if err != nil { - return nil, err - } - if in != nil { - req.Header.Set("Content-Type", "application/json") - } - return req, nil + // Push pushes an update to the server. + Push(*queue.Work) error + + // Stream streams the build logs to the server. + Stream(int64, io.ReadCloser) error + + // Wait waits for the job to the complete. + Wait(int64) *Wait } diff --git a/client/client_impl.go b/client/client_impl.go new file mode 100644 index 000000000..307a4e74e --- /dev/null +++ b/client/client_impl.go @@ -0,0 +1,225 @@ +package client + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "strconv" + + "github.com/drone/drone/model" + "github.com/drone/drone/queue" + "golang.org/x/net/context" + "golang.org/x/net/context/ctxhttp" + "golang.org/x/oauth2" +) + +const ( + pathPull = "%s/api/queue/pull/%s/%s" + pathWait = "%s/api/queue/wait/%d" + pathStream = "%s/api/queue/stream/%d" + pathPush = "%s/api/queue/status/%d" +) + +type client struct { + client *http.Client + base string // base url +} + +// NewClient returns a client at the specified url. +func NewClient(uri string) Client { + return &client{http.DefaultClient, uri} +} + +// NewClientToken returns a client at the specified url that +// authenticates all outbound requests with the given token. +func NewClientToken(uri, token string) Client { + config := new(oauth2.Config) + auther := config.Client(oauth2.NoContext, &oauth2.Token{AccessToken: token}) + return &client{auther, uri} +} + +// Pull pulls work from the server queue. +func (c *client) Pull(os, arch string) (*queue.Work, error) { + out := new(queue.Work) + uri := fmt.Sprintf(pathPull, c.base, os, arch) + err := c.post(uri, nil, out) + return out, err +} + +// Push pushes an update to the server. +func (c *client) Push(p *queue.Work) error { + uri := fmt.Sprintf(pathPush, c.base, p.Job.ID) + err := c.post(uri, p, nil) + return err +} + +// Stream streams the build logs to the server. +func (c *client) Stream(id int64, rc io.ReadCloser) error { + uri := fmt.Sprintf(pathStream, c.base, id) + err := c.post(uri, rc, nil) + return err +} + +// Wait watches and waits for the build to cancel or finish. +func (c *client) Wait(id int64) *Wait { + ctx, cancel := context.WithCancel(context.Background()) + return &Wait{id, c, ctx, cancel} +} + +type Wait struct { + id int64 + client *client + + ctx context.Context + cancel context.CancelFunc +} + +func (w *Wait) Done() (*model.Job, error) { + uri := fmt.Sprintf(pathWait, w.client.base, w.id) + req, err := w.client.createRequest(uri, "POST", nil) + if err != nil { + return nil, err + } + + res, err := ctxhttp.Do(w.ctx, w.client.client, req) + if err != nil { + return nil, err + } + defer res.Body.Close() + job := &model.Job{} + err = json.NewDecoder(res.Body).Decode(&job) + if err != nil { + return nil, err + } + return job, nil +} + +func (w *Wait) Cancel() { + w.cancel() +} + +// +// http request helper functions +// + +// helper function for making an http GET request. +func (c *client) get(rawurl string, out interface{}) error { + return c.do(rawurl, "GET", nil, out) +} + +// helper function for making an http POST request. +func (c *client) post(rawurl string, in, out interface{}) error { + return c.do(rawurl, "POST", in, out) +} + +// helper function for making an http PUT request. +func (c *client) put(rawurl string, in, out interface{}) error { + return c.do(rawurl, "PUT", in, out) +} + +// helper function for making an http PATCH request. +func (c *client) patch(rawurl string, in, out interface{}) error { + return c.do(rawurl, "PATCH", in, out) +} + +// helper function for making an http DELETE request. +func (c *client) delete(rawurl string) error { + return c.do(rawurl, "DELETE", nil, nil) +} + +// helper function to make an http request +func (c *client) do(rawurl, method string, in, out interface{}) error { + // executes the http request and returns the body as + // and io.ReadCloser + body, err := c.open(rawurl, method, in, out) + if err != nil { + return err + } + defer body.Close() + + // if a json response is expected, parse and return + // the json response. + if out != nil { + return json.NewDecoder(body).Decode(out) + } + return nil +} + +// helper function to open an http request +func (c *client) open(rawurl, method string, in, out interface{}) (io.ReadCloser, error) { + uri, err := url.Parse(rawurl) + if err != nil { + return nil, err + } + + // creates a new http request to bitbucket. + req, err := http.NewRequest(method, uri.String(), nil) + if err != nil { + return nil, err + } + + // if we are posting or putting data, we need to + // write it to the body of the request. + if in != nil { + rc, ok := in.(io.ReadCloser) + if ok { + req.Body = rc + req.Header.Set("Content-Type", "plain/text") + } else { + inJson, err := json.Marshal(in) + if err != nil { + return nil, err + } + + buf := bytes.NewBuffer(inJson) + req.Body = ioutil.NopCloser(buf) + + req.ContentLength = int64(len(inJson)) + req.Header.Set("Content-Length", strconv.Itoa(len(inJson))) + req.Header.Set("Content-Type", "application/json") + } + } + resp, err := c.client.Do(req) + if err != nil { + return nil, err + } + if resp.StatusCode > http.StatusPartialContent { + defer resp.Body.Close() + out, _ := ioutil.ReadAll(resp.Body) + return nil, fmt.Errorf(string(out)) + } + return resp.Body, nil +} + +// createRequest is a helper function that builds an http.Request. +func (c *client) createRequest(rawurl, method string, in interface{}) (*http.Request, error) { + uri, err := url.Parse(rawurl) + if err != nil { + return nil, err + } + + // if we are posting or putting data, we need to + // write it to the body of the request. + var buf io.ReadWriter + if in != nil { + buf = new(bytes.Buffer) + err := json.NewEncoder(buf).Encode(in) + if err != nil { + return nil, err + } + } + + // creates a new http request to bitbucket. + req, err := http.NewRequest(method, uri.String(), buf) + if err != nil { + return nil, err + } + if in != nil { + req.Header.Set("Content-Type", "application/json") + } + return req, nil +} diff --git a/client/interface.go b/client/interface.go deleted file mode 100644 index 783c2c936..000000000 --- a/client/interface.go +++ /dev/null @@ -1,22 +0,0 @@ -package client - -import ( - "io" - - "github.com/drone/drone/queue" -) - -// Client is used to communicate with a Drone server. -type Client interface { - // Pull pulls work from the server queue. - Pull(os, arch string) (*queue.Work, error) - - // Push pushes an update to the server. - Push(*queue.Work) error - - // Stream streams the build logs to the server. - Stream(int64, io.ReadCloser) error - - // Wait waits for the job to the complete. - Wait(int64) *Wait -} diff --git a/drone/agent/exec.go b/drone/agent/exec.go index f26d94a72..2a3eafba4 100644 --- a/drone/agent/exec.go +++ b/drone/agent/exec.go @@ -54,12 +54,6 @@ func (r *pipeline) run() error { envs := toEnv(w) w.Yaml = expander.ExpandString(w.Yaml, envs) - if w.Verified { - - } - if w.Signed { - - } // inject the netrc file into the clone plugin if the repositroy is // private and requires authentication. @@ -123,45 +117,42 @@ func (r *pipeline) run() error { compile.Transforms(trans) spec, err := compile.CompileString(w.Yaml) if err != nil { - // TODO handle error - logrus.Infof("Error compiling Yaml %s/%s#%d %s", - w.Repo.Owner, w.Repo.Name, w.Build.Number, err.Error()) - return err + w.Job.Error = err.Error() + w.Job.ExitCode = 255 + w.Job.Finished = w.Job.Started + w.Job.Status = model.StatusError + pushRetry(r.drone, w) + return nil } - if err := r.drone.Push(w); err != nil { - logrus.Errorf("Error persisting update %s/%s#%d.%d. %s", - w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err) - return err - } + pushRetry(r.drone, w) conf := runner.Config{ Engine: docker.New(r.docker), } - ctx := context.TODO() - ctx, cancel := context.WithCancel(ctx) - - run := conf.Runner(ctx, spec) - run.Run() + c := context.TODO() + c, timout := context.WithTimeout(c, time.Minute*time.Duration(w.Repo.Timeout)) + c, cancel := context.WithCancel(c) defer cancel() + defer timout() + + run := conf.Runner(c, spec) + run.Run() wait := r.drone.Wait(w.Job.ID) - if err != nil { - return err - } + defer wait.Cancel() go func() { - _, werr := wait.Done() - if werr == nil { + if _, err := wait.Done(); err == nil { logrus.Infof("Cancel build %s/%s#%d.%d", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) cancel() } }() - defer wait.Cancel() rc, wc := io.Pipe() go func() { + // TODO(bradrydzewski) figure out how to resume upload on failure err := r.drone.Stream(w.Job.ID, rc) if err != nil && err != io.ErrClosedPipe { logrus.Errorf("Error streaming build logs. %s", err) @@ -207,7 +198,21 @@ func (r *pipeline) run() error { logrus.Infof("Finished build %s/%s#%d.%d", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) - return r.drone.Push(w) + pushRetry(r.drone, w) + return nil +} + +func pushRetry(client client.Client, w *queue.Work) { + for { + err := client.Push(w) + if err == nil { + return + } + logrus.Errorf("Error updating %s/%s#%d.%d. Retry in 30s. %s", + w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err) + logrus.Infof("Retry update in 30s") + time.Sleep(time.Second * 30) + } } func toEnv(w *queue.Work) map[string]string { @@ -240,15 +245,8 @@ func toEnv(w *queue.Work) map[string]string { "DRONE_BUILD_FINISHED": fmt.Sprintf("%d", w.Build.Finished), "DRONE_YAML_VERIFIED": fmt.Sprintf("%v", w.Verified), "DRONE_YAML_SIGNED": fmt.Sprintf("%v", w.Signed), - - // SHORTER ALIASES - "DRONE_BRANCH": w.Build.Branch, - "DRONE_COMMIT": w.Build.Commit, - - // TODO(bradrydzewski) netrc should only be injected via secrets - // "DRONE_NETRC_USERNAME": w.Netrc.Login, - // "DRONE_NETRC_PASSWORD": w.Netrc.Password, - // "DRONE_NETRC_MACHINE": w.Netrc.Machine, + "DRONE_BRANCH": w.Build.Branch, + "DRONE_COMMIT": w.Build.Commit, } if w.Build.Event == model.EventTag { diff --git a/model/job.go b/model/job.go index be39d201f..607d690f7 100644 --- a/model/job.go +++ b/model/job.go @@ -6,6 +6,7 @@ type Job struct { BuildID int64 `json:"-" meddler:"job_build_id"` NodeID int64 `json:"-" meddler:"job_node_id"` Number int `json:"number" meddler:"job_number"` + Error string `json:"error" meddler:"-"` Status string `json:"status" meddler:"job_status"` ExitCode int `json:"exit_code" meddler:"job_exit_code"` Enqueued int64 `json:"enqueued_at" meddler:"job_enqueued"`