Merge pull request #125 from vito/injectable-builder

Injectable builder (and friends)
This commit is contained in:
Brad Rydzewski 2014-02-25 15:42:37 -08:00
commit c0adf459f9
7 changed files with 342 additions and 263 deletions

View file

@ -11,6 +11,7 @@ import (
"time"
"github.com/drone/drone/pkg/build"
"github.com/drone/drone/pkg/build/docker"
"github.com/drone/drone/pkg/build/log"
"github.com/drone/drone/pkg/build/repo"
"github.com/drone/drone/pkg/build/script"
@ -174,7 +175,7 @@ func run(path string) {
// loop through and create builders
for _, b := range builds { //script.Builds {
builder := build.Builder{}
builder := build.New(docker.DefaultClient)
builder.Build = b
builder.Repo = &code
builder.Key = key
@ -186,7 +187,7 @@ func run(path string) {
builder.Stdout = &buf
}
builders = append(builders, &builder)
builders = append(builders, builder)
}
switch *parallel {

View file

@ -19,9 +19,6 @@ import (
"github.com/drone/drone/pkg/build/script"
)
// instance of the Docker client
var client = docker.New()
// BuildState stores information about a build
// process including the Exit status and various
// Runtime statistics (coming soon).
@ -35,6 +32,12 @@ type BuildState struct {
// Max RAM, Max Swap, Disk space, and more.
}
func New(dockerClient *docker.Client) *Builder {
return &Builder{
dockerClient: dockerClient,
}
}
// Builder represents a build process being prepared
// to run.
type Builder struct {
@ -82,6 +85,8 @@ type Builder struct {
// specified services and linked to
// this build.
services []*docker.Container
dockerClient *docker.Client
}
func (b *Builder) Run() error {
@ -178,19 +183,19 @@ func (b *Builder) setup() error {
log.Infof("starting service container %s", image.Tag)
// Run the contianer
run, err := client.Containers.RunDaemonPorts(image.Tag, image.Ports...)
run, err := b.dockerClient.Containers.RunDaemonPorts(image.Tag, image.Ports...)
if err != nil {
return err
}
// Get the container info
info, err := client.Containers.Inspect(run.ID)
info, err := b.dockerClient.Containers.Inspect(run.ID)
if err != nil {
// on error kill the container since it hasn't yet been
// added to the array and would therefore not get
// removed in the defer statement.
client.Containers.Stop(run.ID, 10)
client.Containers.Remove(run.ID)
b.dockerClient.Containers.Stop(run.ID, 10)
b.dockerClient.Containers.Remove(run.ID)
return err
}
@ -220,16 +225,16 @@ func (b *Builder) setup() error {
// check for build container (ie bradrydzewski/go:1.2)
// and download if it doesn't already exist
if _, err := client.Images.Inspect(b.Build.Image); err == docker.ErrNotFound {
if _, err := b.dockerClient.Images.Inspect(b.Build.Image); err == docker.ErrNotFound {
// download the image if it doesn't exist
if err := client.Images.Pull(b.Build.Image); err != nil {
if err := b.dockerClient.Images.Pull(b.Build.Image); err != nil {
return err
}
}
// create the Docker image
id := createUID()
if err := client.Images.Build(id, dir); err != nil {
if err := b.dockerClient.Images.Build(id, dir); err != nil {
return err
}
@ -237,11 +242,11 @@ func (b *Builder) setup() error {
log.Infof("copying repository to %s", b.Repo.Dir)
// get the image details
b.image, err = client.Images.Inspect(id)
b.image, err = b.dockerClient.Images.Inspect(id)
if err != nil {
// if we have problems with the image make sure
// we remove it before we exit
client.Images.Remove(id)
b.dockerClient.Images.Remove(id)
return err
}
@ -260,10 +265,10 @@ func (b *Builder) teardown() error {
log.Info("removing build container")
// stop the container, ignore error message
client.Containers.Stop(b.container.ID, 15)
b.dockerClient.Containers.Stop(b.container.ID, 15)
// remove the container, ignore error message
if err := client.Containers.Remove(b.container.ID); err != nil {
if err := b.dockerClient.Containers.Remove(b.container.ID); err != nil {
log.Errf("failed to delete build container %s", b.container.ID)
}
}
@ -274,10 +279,10 @@ func (b *Builder) teardown() error {
log.Infof("removing service container %s", b.Build.Services[i])
// stop the service container, ignore the error
client.Containers.Stop(container.ID, 15)
b.dockerClient.Containers.Stop(container.ID, 15)
// remove the service container, ignore the error
if err := client.Containers.Remove(container.ID); err != nil {
if err := b.dockerClient.Containers.Remove(container.ID); err != nil {
log.Errf("failed to delete service container %s", container.ID)
}
}
@ -287,7 +292,7 @@ func (b *Builder) teardown() error {
// debugging
log.Info("removing build image")
if _, err := client.Images.Remove(b.image.ID); err != nil {
if _, err := b.dockerClient.Images.Remove(b.image.ID); err != nil {
log.Errf("failed to completely delete build image %s. %s", b.image.ID, err.Error())
}
}
@ -322,7 +327,7 @@ func (b *Builder) run() error {
}
// create the container from the image
run, err := client.Containers.Create(&conf)
run, err := b.dockerClient.Containers.Create(&conf)
if err != nil {
return err
}
@ -332,18 +337,18 @@ func (b *Builder) run() error {
// attach to the container
go func() {
client.Containers.Attach(run.ID, &writer{b.Stdout})
b.dockerClient.Containers.Attach(run.ID, &writer{b.Stdout})
}()
// start the container
if err := client.Containers.Start(run.ID, &host); err != nil {
if err := b.dockerClient.Containers.Start(run.ID, &host); err != nil {
b.BuildState.ExitCode = 1
b.BuildState.Finished = time.Now().UTC().Unix()
return err
}
// wait for the container to stop
wait, err := client.Containers.Wait(run.ID)
wait, err := b.dockerClient.Containers.Wait(run.ID)
if err != nil {
b.BuildState.ExitCode = 1
b.BuildState.Finished = time.Now().UTC().Unix()

View file

@ -29,6 +29,8 @@ const (
// Enables verbose logging to the Terminal window
var Logging = true
var DefaultClient = New() // TEMPORARY; PLEASE CONSTRUCT/INJECT YOURSELF
// New creates an instance of the Docker Client
func New() *Client {
c := &Client{}

View file

@ -1,22 +0,0 @@
package queue
import (
"runtime"
)
func init() {
// get the number of CPUs. Since builds
// tend to be CPU-intensive we should only
// execute 1 build per CPU.
ncpu := runtime.NumCPU()
// must be at least 1
if ncpu < 1 {
ncpu = 1
}
// spawn a worker for each CPU
for i := 0; i < ncpu; i++ {
go work()
}
}

View file

@ -1,46 +1,16 @@
package queue
import (
"bytes"
"fmt"
bldr "github.com/drone/drone/pkg/build"
"github.com/drone/drone/pkg/build/git"
r "github.com/drone/drone/pkg/build/repo"
"github.com/drone/drone/pkg/build/docker"
"github.com/drone/drone/pkg/build/script"
"github.com/drone/drone/pkg/channel"
"github.com/drone/drone/pkg/database"
. "github.com/drone/drone/pkg/model"
"github.com/drone/drone/pkg/plugin/notify"
"github.com/drone/go-github/github"
"log"
"path/filepath"
"runtime"
"time"
)
// queue that will store all build tasks until
// they are processed by a worker.
var queue = make(chan *BuildTask)
// work is a function that will infinitely
// run in the background waiting for tasks that
// it can pull off the queue and execute.
func work() {
var task *BuildTask
for {
// get work item (pointer) from the queue
task = <-queue
if task == nil {
continue
}
// execute the task
task.execute()
}
}
// Add adds the task to the build queue.
func Add(task *BuildTask) {
queue <- task
// A Queue dispatches tasks to workers.
type Queue struct {
tasks chan<- *BuildTask
}
// BuildTasks represents a build that is pending
@ -55,193 +25,36 @@ type BuildTask struct {
Script *script.Build
}
// execute will execute the build task and persist
// the results to the datastore.
func (b *BuildTask) execute() error {
// we need to be sure that we can recover
// from any sort panic that could occur
// to avoid brining down the entire application
defer func() {
if e := recover(); e != nil {
b.Build.Finished = time.Now().UTC()
b.Commit.Finished = time.Now().UTC()
b.Build.Duration = b.Build.Finished.Unix() - b.Build.Started.Unix()
b.Commit.Duration = b.Build.Finished.Unix() - b.Build.Started.Unix()
b.Commit.Status = "Error"
b.Build.Status = "Error"
database.SaveBuild(b.Build)
database.SaveCommit(b.Commit)
var defaultQueue = Start(runtime.NumCPU(), newRunner(docker.DefaultClient, 300*time.Second)) // TEMPORARY; INJECT PLEASE
var Add = defaultQueue.Add // TEMPORARY; INJECT PLEASE
func Start(workers int, runner Runner) *Queue {
// get the number of CPUs. Since builds
// tend to be CPU-intensive we should only
// execute 1 build per CPU.
// must be at least 1
// if ncpu < 1 {
// ncpu = 1
// }
tasks := make(chan *BuildTask)
queue := &Queue{tasks: tasks}
// spawn a worker for each CPU
for i := 0; i < workers; i++ {
worker := worker{
runner: runner,
}
}()
// update commit and build status
b.Commit.Status = "Started"
b.Build.Status = "Started"
b.Build.Started = time.Now().UTC()
b.Commit.Started = time.Now().UTC()
// persist the commit to the database
if err := database.SaveCommit(b.Commit); err != nil {
return err
go worker.work(tasks)
}
// persist the build to the database
if err := database.SaveBuild(b.Build); err != nil {
return err
}
// get settings
settings, _ := database.GetSettings()
// notification context
context := &notify.Context{
Repo: b.Repo,
Commit: b.Commit,
Host: settings.URL().String(),
}
// send all "started" notifications
if b.Script.Notifications != nil {
b.Script.Notifications.Send(context)
}
// Send "started" notification to Github
if err := updateGitHubStatus(b.Repo, b.Commit); err != nil {
log.Printf("error updating github status: %s\n", err.Error())
}
// make sure a channel exists for the repository,
// the commit, and the commit output (TODO)
reposlug := fmt.Sprintf("%s/%s/%s", b.Repo.Host, b.Repo.Owner, b.Repo.Name)
commitslug := fmt.Sprintf("%s/%s/%s/commit/%s", b.Repo.Host, b.Repo.Owner, b.Repo.Name, b.Commit.Hash)
consoleslug := fmt.Sprintf("%s/%s/%s/commit/%s/builds/%s", b.Repo.Host, b.Repo.Owner, b.Repo.Name, b.Commit.Hash, b.Build.Slug)
channel.Create(reposlug)
channel.Create(commitslug)
channel.CreateStream(consoleslug)
// notify the channels that the commit and build started
channel.SendJSON(reposlug, b.Commit)
channel.SendJSON(commitslug, b.Build)
var buf = &bufferWrapper{channel: consoleslug}
// append private parameters to the environment
// variable section of the .drone.yml file
if b.Repo.Params != nil {
for k, v := range b.Repo.Params {
b.Script.Env = append(b.Script.Env, k+"="+v)
}
}
// execute the build
builder := bldr.Builder{}
builder.Build = b.Script
builder.Repo = &r.Repo{Path: b.Repo.URL, Branch: b.Commit.Branch, Commit: b.Commit.Hash, PR: b.Commit.PullRequest, Dir: filepath.Join("/var/cache/drone/src", b.Repo.Slug), Depth: git.GitDepth(b.Script.Git)}
builder.Key = []byte(b.Repo.PrivateKey)
builder.Stdout = buf
builder.Timeout = 300 * time.Minute
defer func() {
// update the status of the commit using the
// GitHub status API.
if err := updateGitHubStatus(b.Repo, b.Commit); err != nil {
log.Printf("error updating github status: %s\n", err.Error())
}
}()
buildErr := builder.Run()
b.Build.Finished = time.Now().UTC()
b.Commit.Finished = time.Now().UTC()
b.Build.Duration = b.Build.Finished.UnixNano() - b.Build.Started.UnixNano()
b.Commit.Duration = b.Build.Finished.UnixNano() - b.Build.Started.UnixNano()
b.Commit.Status = "Success"
b.Build.Status = "Success"
b.Build.Stdout = buf.buf.String()
// if exit code != 0 set to failure
if builder.BuildState == nil || builder.BuildState.ExitCode != 0 {
b.Commit.Status = "Failure"
b.Build.Status = "Failure"
if buildErr != nil && b.Build.Stdout == "" {
// TODO: If you wanted to have very friendly error messages, you could do that here
b.Build.Stdout = buildErr.Error() + "\n"
}
}
// persist the build to the database
if err := database.SaveBuild(b.Build); err != nil {
return err
}
// persist the commit to the database
if err := database.SaveCommit(b.Commit); err != nil {
return err
}
// notify the channels that the commit and build finished
channel.SendJSON(reposlug, b.Commit)
channel.SendJSON(commitslug, b.Build)
channel.Close(consoleslug)
// send all "finished" notifications
if b.Script.Notifications != nil {
b.Script.Notifications.Send(context)
}
return nil
return queue
}
// updateGitHubStatus is a helper function that will send
// the build status to GitHub using the Status API.
// see https://github.com/blog/1227-commit-status-api
func updateGitHubStatus(repo *Repo, commit *Commit) error {
// convert from drone status to github status
var message, status string
switch commit.Status {
case "Success":
status = "success"
message = "The build succeeded on drone.io"
case "Failure":
status = "failure"
message = "The build failed on drone.io"
case "Started":
status = "pending"
message = "The build is pending on drone.io"
default:
status = "error"
message = "The build errored on drone.io"
}
// get the system settings
settings, _ := database.GetSettings()
// get the user from the database
// since we need his / her GitHub token
user, err := database.GetUser(repo.UserID)
if err != nil {
return err
}
client := github.New(user.GithubToken)
client.ApiUrl = settings.GitHubApiUrl;
var url string
url = settings.URL().String() + "/" + repo.Slug + "/commit/" + commit.Hash
return client.Repos.CreateStatus(repo.Owner, repo.Name, status, url, message, commit.Hash)
}
type bufferWrapper struct {
buf bytes.Buffer
// name of the channel
channel string
}
func (b *bufferWrapper) Write(p []byte) (n int, err error) {
n, err = b.buf.Write(p)
channel.SendBytes(b.channel, p)
return
// Add adds the task to the build queue.
func (q *Queue) Add(task *BuildTask) {
q.tasks <- task
}

40
pkg/queue/runner.go Normal file
View file

@ -0,0 +1,40 @@
package queue
import (
"io"
"time"
"github.com/drone/drone/pkg/build"
"github.com/drone/drone/pkg/build/docker"
"github.com/drone/drone/pkg/build/repo"
"github.com/drone/drone/pkg/build/script"
)
type Runner interface {
Run(buildScript *script.Build, repo *repo.Repo, key []byte, buildOutput io.Writer) (success bool, err error)
}
type runner struct {
dockerClient *docker.Client
timeout time.Duration
}
func newRunner(dockerClient *docker.Client, timeout time.Duration) *runner {
return &runner{
dockerClient: dockerClient,
timeout: timeout,
}
}
func (r *runner) Run(buildScript *script.Build, repo *repo.Repo, key []byte, buildOutput io.Writer) (bool, error) {
builder := build.New(r.dockerClient)
builder.Build = buildScript
builder.Repo = repo
builder.Key = key
builder.Stdout = buildOutput
builder.Timeout = r.timeout
err := builder.Run()
return builder.BuildState == nil || builder.BuildState.ExitCode != 0, err
}

240
pkg/queue/worker.go Normal file
View file

@ -0,0 +1,240 @@
package queue
import (
"bytes"
"fmt"
"github.com/drone/drone/pkg/build/git"
r "github.com/drone/drone/pkg/build/repo"
"github.com/drone/drone/pkg/channel"
"github.com/drone/drone/pkg/database"
. "github.com/drone/drone/pkg/model"
"github.com/drone/drone/pkg/plugin/notify"
"github.com/drone/go-github/github"
"io"
"log"
"path/filepath"
"time"
)
type worker struct {
runner Runner
}
// work is a function that will infinitely
// run in the background waiting for tasks that
// it can pull off the queue and execute.
func (w *worker) work(queue <-chan *BuildTask) {
var task *BuildTask
for {
// get work item (pointer) from the queue
task = <-queue
if task == nil {
continue
}
// execute the task
w.execute(task)
}
}
// execute will execute the build task and persist
// the results to the datastore.
func (w *worker) execute(task *BuildTask) error {
// we need to be sure that we can recover
// from any sort panic that could occur
// to avoid brining down the entire application
defer func() {
if e := recover(); e != nil {
task.Build.Finished = time.Now().UTC()
task.Commit.Finished = time.Now().UTC()
task.Build.Duration = task.Build.Finished.Unix() - task.Build.Started.Unix()
task.Commit.Duration = task.Build.Finished.Unix() - task.Build.Started.Unix()
task.Commit.Status = "Error"
task.Build.Status = "Error"
database.SaveBuild(task.Build)
database.SaveCommit(task.Commit)
}
}()
// update commit and build status
task.Commit.Status = "Started"
task.Build.Status = "Started"
task.Build.Started = time.Now().UTC()
task.Commit.Started = time.Now().UTC()
// persist the commit to the database
if err := database.SaveCommit(task.Commit); err != nil {
return err
}
// persist the build to the database
if err := database.SaveBuild(task.Build); err != nil {
return err
}
// get settings
settings, _ := database.GetSettings()
// notification context
context := &notify.Context{
Repo: task.Repo,
Commit: task.Commit,
Host: settings.URL().String(),
}
// send all "started" notifications
if task.Script.Notifications != nil {
task.Script.Notifications.Send(context)
}
// Send "started" notification to Github
if err := updateGitHubStatus(task.Repo, task.Commit); err != nil {
log.Printf("error updating github status: %s\n", err.Error())
}
// make sure a channel exists for the repository,
// the commit, and the commit output (TODO)
reposlug := fmt.Sprintf("%s/%s/%s", task.Repo.Host, task.Repo.Owner, task.Repo.Name)
commitslug := fmt.Sprintf("%s/%s/%s/commit/%s", task.Repo.Host, task.Repo.Owner, task.Repo.Name, task.Commit.Hash)
consoleslug := fmt.Sprintf("%s/%s/%s/commit/%s/builds/%s", task.Repo.Host, task.Repo.Owner, task.Repo.Name, task.Commit.Hash, task.Build.Slug)
channel.Create(reposlug)
channel.Create(commitslug)
channel.CreateStream(consoleslug)
// notify the channels that the commit and build started
channel.SendJSON(reposlug, task.Commit)
channel.SendJSON(commitslug, task.Build)
var buf = &bufferWrapper{channel: consoleslug}
// append private parameters to the environment
// variable section of the .drone.yml file
if task.Repo.Params != nil {
for k, v := range task.Repo.Params {
task.Script.Env = append(task.Script.Env, k+"="+v)
}
}
defer func() {
// update the status of the commit using the
// GitHub status API.
if err := updateGitHubStatus(task.Repo, task.Commit); err != nil {
log.Printf("error updating github status: %s\n", err.Error())
}
}()
// execute the build
passed, buildErr := w.runBuild(task, buf)
task.Build.Finished = time.Now().UTC()
task.Commit.Finished = time.Now().UTC()
task.Build.Duration = task.Build.Finished.UnixNano() - task.Build.Started.UnixNano()
task.Commit.Duration = task.Build.Finished.UnixNano() - task.Build.Started.UnixNano()
task.Commit.Status = "Success"
task.Build.Status = "Success"
task.Build.Stdout = buf.buf.String()
// if exit code != 0 set to failure
if passed {
task.Commit.Status = "Failure"
task.Build.Status = "Failure"
if buildErr != nil && task.Build.Stdout == "" {
// TODO: If you wanted to have very friendly error messages, you could do that here
task.Build.Stdout = buildErr.Error() + "\n"
}
}
// persist the build to the database
if err := database.SaveBuild(task.Build); err != nil {
return err
}
// persist the commit to the database
if err := database.SaveCommit(task.Commit); err != nil {
return err
}
// notify the channels that the commit and build finished
channel.SendJSON(reposlug, task.Commit)
channel.SendJSON(commitslug, task.Build)
channel.Close(consoleslug)
// send all "finished" notifications
if task.Script.Notifications != nil {
task.Script.Notifications.Send(context)
}
return nil
}
func (w *worker) runBuild(task *BuildTask, buf io.Writer) (bool, error) {
repo := &r.Repo{
Path: task.Repo.URL,
Branch: task.Commit.Branch,
Commit: task.Commit.Hash,
PR: task.Commit.PullRequest,
Dir: filepath.Join("/var/cache/drone/src", task.Repo.Slug),
Depth: git.GitDepth(task.Script.Git),
}
return w.runner.Run(
task.Script,
repo,
[]byte(task.Repo.PrivateKey),
buf,
)
}
// updateGitHubStatus is a helper function that will send
// the build status to GitHub using the Status API.
// see https://github.com/blog/1227-commit-status-api
func updateGitHubStatus(repo *Repo, commit *Commit) error {
// convert from drone status to github status
var message, status string
switch commit.Status {
case "Success":
status = "success"
message = "The build succeeded on drone.io"
case "Failure":
status = "failure"
message = "The build failed on drone.io"
case "Started":
status = "pending"
message = "The build is pending on drone.io"
default:
status = "error"
message = "The build errored on drone.io"
}
// get the system settings
settings, _ := database.GetSettings()
// get the user from the database
// since we need his / her GitHub token
user, err := database.GetUser(repo.UserID)
if err != nil {
return err
}
client := github.New(user.GithubToken)
client.ApiUrl = settings.GitHubApiUrl
var url string
url = settings.URL().String() + "/" + repo.Slug + "/commit/" + commit.Hash
return client.Repos.CreateStatus(repo.Owner, repo.Name, status, url, message, commit.Hash)
}
type bufferWrapper struct {
buf bytes.Buffer
// name of the channel
channel string
}
func (b *bufferWrapper) Write(p []byte) (n int, err error) {
n, err = b.buf.Write(p)
channel.SendBytes(b.channel, p)
return
}