provide runner to workers, use it for all builds

Signed-off-by: Abhijit Hiremagalur <abhi@pivotallabs.com>
This commit is contained in:
Alex Suraci 2014-02-24 17:51:25 -08:00 committed by Abhijit Hiremagalur
parent ddc8e7a56f
commit 4b52fcad1a
3 changed files with 62 additions and 45 deletions

View file

@ -1,9 +1,11 @@
package queue package queue
import ( import (
"github.com/drone/drone/pkg/build/docker"
"github.com/drone/drone/pkg/build/script" "github.com/drone/drone/pkg/build/script"
. "github.com/drone/drone/pkg/model" . "github.com/drone/drone/pkg/model"
"runtime" "runtime"
"time"
) )
// A Queue dispatches tasks to workers. // A Queue dispatches tasks to workers.
@ -23,11 +25,11 @@ type BuildTask struct {
Script *script.Build Script *script.Build
} }
var defaultQueue = Start(runtime.NumCPU()) // TEMPORARY; INJECT PLEASE var defaultQueue = Start(runtime.NumCPU(), newRunner(docker.DefaultClient, 300*time.Second)) // TEMPORARY; INJECT PLEASE
var Add = defaultQueue.Add // TEMPORARY; INJECT PLEASE var Add = defaultQueue.Add // TEMPORARY; INJECT PLEASE
func Start(workers int) *Queue { func Start(workers int, runner Runner) *Queue {
// get the number of CPUs. Since builds // get the number of CPUs. Since builds
// tend to be CPU-intensive we should only // tend to be CPU-intensive we should only
// execute 1 build per CPU. // execute 1 build per CPU.
@ -42,7 +44,10 @@ func Start(workers int) *Queue {
// spawn a worker for each CPU // spawn a worker for each CPU
for i := 0; i < workers; i++ { for i := 0; i < workers; i++ {
worker := worker{} worker := worker{
runner: runner,
}
go worker.work(tasks) go worker.work(tasks)
} }

40
pkg/queue/runner.go Normal file
View file

@ -0,0 +1,40 @@
package queue
import (
"io"
"time"
"github.com/drone/drone/pkg/build"
"github.com/drone/drone/pkg/build/docker"
"github.com/drone/drone/pkg/build/repo"
"github.com/drone/drone/pkg/build/script"
)
type Runner interface {
Run(buildScript *script.Build, repo *repo.Repo, key []byte, buildOutput io.Writer) (success bool, err error)
}
type runner struct {
dockerClient *docker.Client
timeout time.Duration
}
func newRunner(dockerClient *docker.Client, timeout time.Duration) *runner {
return &runner{
dockerClient: dockerClient,
timeout: timeout,
}
}
func (r *runner) Run(buildScript *script.Build, repo *repo.Repo, key []byte, buildOutput io.Writer) (bool, error) {
builder := build.New(r.dockerClient)
builder.Build = buildScript
builder.Repo = repo
builder.Key = key
builder.Stdout = buildOutput
builder.Timeout = r.timeout
err := builder.Run()
return builder.BuildState == nil || builder.BuildState.ExitCode != 0, err
}

View file

@ -3,11 +3,8 @@ package queue
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/drone/drone/pkg/build"
"github.com/drone/drone/pkg/build/docker"
"github.com/drone/drone/pkg/build/git" "github.com/drone/drone/pkg/build/git"
r "github.com/drone/drone/pkg/build/repo" r "github.com/drone/drone/pkg/build/repo"
"github.com/drone/drone/pkg/build/script"
"github.com/drone/drone/pkg/channel" "github.com/drone/drone/pkg/channel"
"github.com/drone/drone/pkg/database" "github.com/drone/drone/pkg/database"
. "github.com/drone/drone/pkg/model" . "github.com/drone/drone/pkg/model"
@ -19,7 +16,9 @@ import (
"time" "time"
) )
type worker struct{} type worker struct {
runner Runner
}
// work is a function that will infinitely // work is a function that will infinitely
// run in the background waiting for tasks that // run in the background waiting for tasks that
@ -125,7 +124,7 @@ func (w *worker) execute(task *BuildTask) error {
}() }()
// execute the build // execute the build
passed, buildErr := runBuild(task, buf) //w.builder.Build(script, repo, task.Repo.PrivateKey, buf) passed, buildErr := w.runBuild(task, buf)
task.Build.Finished = time.Now().UTC() task.Build.Finished = time.Now().UTC()
task.Commit.Finished = time.Now().UTC() task.Commit.Finished = time.Now().UTC()
@ -168,47 +167,20 @@ func (w *worker) execute(task *BuildTask) error {
return nil return nil
} }
type runner struct { func (w *worker) runBuild(task *BuildTask, buf io.Writer) (bool, error) {
dockerClient *docker.Client
timeout time.Duration
}
func (r *runner) Build(buildScript *script.Build, repo *r.Repo, key []byte, buildOutput io.Writer) (bool, error) {
builder := build.New(r.dockerClient)
builder.Build = buildScript
builder.Repo = repo
builder.Key = key
builder.Stdout = buildOutput
builder.Timeout = r.timeout
err := builder.Run()
return builder.BuildState == nil || builder.BuildState.ExitCode != 0, err
}
func newRunner(dockerClient *docker.Client, timeout time.Duration) *runner {
return &runner{
dockerClient: dockerClient,
timeout: timeout,
}
}
func runBuild(b *BuildTask, buf io.Writer) (bool, error) {
runner := newRunner(docker.DefaultClient, 300*time.Minute)
repo := &r.Repo{ repo := &r.Repo{
Path: b.Repo.URL, Path: task.Repo.URL,
Branch: b.Commit.Branch, Branch: task.Commit.Branch,
Commit: b.Commit.Hash, Commit: task.Commit.Hash,
PR: b.Commit.PullRequest, PR: task.Commit.PullRequest,
Dir: filepath.Join("/var/cache/drone/src", b.Repo.Slug), Dir: filepath.Join("/var/cache/drone/src", task.Repo.Slug),
Depth: git.GitDepth(b.Script.Git), Depth: git.GitDepth(task.Script.Git),
} }
return runner.Build( return w.runner.Run(
b.Script, task.Script,
repo, repo,
[]byte(b.Repo.PrivateKey), []byte(task.Repo.PrivateKey),
buf, buf,
) )
} }