remote agent can now pull builds and push results

This commit is contained in:
Brad Rydzewski 2015-05-18 20:44:44 -07:00
parent ef1e09f073
commit 7aedd78015
15 changed files with 105 additions and 244 deletions

View file

@ -7,6 +7,7 @@ import (
"os"
"time"
log "github.com/Sirupsen/logrus"
"github.com/drone/drone/pkg/queue"
runner "github.com/drone/drone/pkg/runner/builtin"
@ -47,11 +48,19 @@ func main() {
for {
w, err := pull()
if err != nil {
log.Errorln(err)
time.Sleep(30 * time.Second)
continue
}
log.Infof("Pulled and running build %s / %d",
w.Repo.FullName, w.Commit.Sequence)
runner_ := runner.Runner{&updater{}}
runner_.Run(w)
err = runner_.Run(w)
if err != nil {
log.Errorln(err)
}
}
}()

View file

@ -11,20 +11,20 @@ import (
"strconv"
"time"
//logs "github.com/Sirupsen/logrus"
logs "github.com/Sirupsen/logrus"
common "github.com/drone/drone/pkg/types"
)
type updater struct{}
func (u *updater) SetCommit(user *common.User, r *common.Repo, c *common.Commit) error {
path := fmt.Sprintf("/api/queue/push/%s/%v", r.FullName, c.Sequence)
path := fmt.Sprintf("/api/queue/push/%s", r.FullName)
return sendBackoff("POST", path, c, nil)
}
func (u *updater) SetBuild(r *common.Repo, c *common.Commit, b *common.Build) error {
path := fmt.Sprintf("/api/queue/push/%s", r.FullName)
return sendBackoff("POST", path, c, nil)
path := fmt.Sprintf("/api/queue/push/%s/%v", r.FullName, c.Sequence)
return sendBackoff("POST", path, b, nil)
}
func (u *updater) SetLogs(r *common.Repo, c *common.Commit, b *common.Build, rc io.ReadCloser) error {
@ -40,7 +40,7 @@ func sendBackoff(method, path string, in, out interface{}) error {
if err == nil {
break
}
if attempts > 30 {
if attempts > 99 {
break
}
attempts++
@ -99,19 +99,21 @@ func send(method, path string, in, out interface{}) error {
// make the request using the default http client
resp, err := http.DefaultClient.Do(req)
if err != nil {
logs.Errorf("Error posting request. %s", err)
return err
}
defer resp.Body.Close()
// Check for an http error status (ie not 200 StatusOK)
if resp.StatusCode > 300 {
logs.Errorf("Error status code %d", resp.StatusCode)
return fmt.Errorf(resp.Status)
}
// Decode the JSON response
if out != nil {
return json.NewDecoder(resp.Body).Decode(out)
err = json.NewDecoder(resp.Body).Decode(out)
}
return nil
return err
}

View file

@ -13,6 +13,7 @@ import (
"github.com/drone/drone/pkg/settings"
"github.com/elazarl/go-bindata-assetfs"
log "github.com/Sirupsen/logrus"
eventbus "github.com/drone/drone/pkg/bus/builtin"
queue "github.com/drone/drone/pkg/queue/builtin"
runner "github.com/drone/drone/pkg/runner/builtin"
@ -48,7 +49,15 @@ func main() {
queue_ := queue.New()
updater := runner.NewUpdater(eventbus_, store, remote)
runner_ := runner.Runner{Updater: updater}
go run(&runner_, queue_)
// launch the local queue runner if the system
// is not conifugred to run in agent mode
if settings.Agents != nil && settings.Agents.Secret != "" {
log.Infof("Run builds using remote build agents")
} else {
log.Infof("Run builds using the embedded build runner")
go run(&runner_, queue_)
}
r := gin.Default()
@ -86,14 +95,6 @@ func main() {
users.DELETE("/:name", server.DeleteUser)
}
agents := api.Group("/agents")
{
agents.Use(server.MustAdmin())
agents.GET("", server.GetAgents)
agents.POST("", server.PostAgent)
agents.DELETE("/:id", server.DeleteAgent)
}
repos := api.Group("/repos/:owner/:name")
{
repos.POST("", server.PostRepo)
@ -136,6 +137,7 @@ func main() {
queue := api.Group("/queue")
{
queue.Use(server.MustAgent())
queue.Use(server.SetSettings(settings))
queue.Use(server.SetUpdater(updater))
queue.POST("/pull", server.PollBuild)

View file

@ -1,94 +0,0 @@
package server
import (
"strconv"
"github.com/drone/drone/pkg/types"
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
common "github.com/drone/drone/pkg/types"
)
// GetAgents accepts a request to retrieve all build
// agents from the datastore and return encoded in JSON
// format.
//
// GET /api/agents
//
func GetAgents(c *gin.Context) {
store := ToDatastore(c)
agents, err := store.AgentList()
if err != nil {
c.Fail(400, err)
} else {
c.JSON(200, agents)
}
}
// PostAgent accepts a request to register a new build
// agent with the system. The registered agent is returned
// from the datastore and return encoded in JSON format.
//
// POST /api/agents
//
func PostAgent(c *gin.Context) {
store := ToDatastore(c)
in := &common.Agent{}
if !c.BindWith(in, binding.JSON) {
return
}
// attept to fetch the agent from the
// datastore. If the agent already exists we
// should re-activate
agent, err := store.AgentAddr(in.Addr)
if err != nil {
agent = &common.Agent{}
agent.Addr = in.Addr
agent.Token = types.GenerateToken()
agent.Active = true
agent.IsHealthy = true
err = store.AddAgent(agent)
if err != nil {
c.Fail(400, err)
} else {
c.JSON(200, agent)
}
return
}
agent.Active = true
err = store.SetAgent(agent)
if err != nil {
c.Fail(400, err)
} else {
c.JSON(200, agent)
}
}
// DeleteAgent accepts a request to delete a build agent
// from the system.
//
// DELETE /api/agents/:id
//
func DeleteAgent(c *gin.Context) {
store := ToDatastore(c)
idstr := c.Params.ByName("id")
id, _ := strconv.Atoi(idstr)
agent, err := store.Agent(int64(id))
if err != nil {
c.Fail(404, err)
return
}
agent.Active = false
err = store.SetAgent(agent)
if err != nil {
c.Fail(400, err)
return
}
c.Writer.WriteHeader(200)
}

View file

@ -1,6 +1,7 @@
package server
import (
"net"
"strconv"
"github.com/gin-gonic/gin"
@ -14,9 +15,17 @@ import (
func PollBuild(c *gin.Context) {
queue := ToQueue(c)
store := ToDatastore(c)
agent := ToAgent(c)
log.Infof("agent connected and polling builds at %s", agent.Addr)
// extract the IP address from the agent that is
// polling for builds.
host := c.Request.RemoteAddr
addr, _, err := net.SplitHostPort(host)
if err != nil {
addr = host
}
addr = net.JoinHostPort(addr, "1999")
log.Infof("agent connected and polling builds at %s", addr)
// pull an item from the queue
work := queue.PullClose(c.Writer)
@ -25,14 +34,14 @@ func PollBuild(c *gin.Context) {
return
}
// store the agent details with the commit
work.Commit.AgentID = agent.ID
err := store.SetCommit(work.Commit)
// persist the relationship between agent and commit.
err = store.SetAgent(work.Commit, addr)
if err != nil {
log.Errorf("unable to associate agent with commit. %s", err)
// IMPORTANT: this should never happen, and even if it does
// it is an error scenario that will only impact live streaming
// so we choose it log and ignore.
// note the we are ignoring and just logging the error here.
// we consider this an acceptible failure because it doesn't
// impact anything other than live-streaming output.
log.Errorf("unable to store the agent address %s for build %s %v",
addr, work.Repo.FullName, work.Commit.Sequence)
}
c.JSON(200, work)

View file

@ -130,14 +130,6 @@ func ToRepo(c *gin.Context) *common.Repo {
return v.(*common.Repo)
}
func ToAgent(c *gin.Context) *common.Agent {
v, ok := c.Get("agent")
if !ok {
return nil
}
return v.(*common.Agent)
}
func ToDatastore(c *gin.Context) store.Store {
return c.MustGet("datastore").(store.Store)
}
@ -254,22 +246,19 @@ func MustAdmin() gin.HandlerFunc {
func MustAgent() gin.HandlerFunc {
return func(c *gin.Context) {
store := ToDatastore(c)
conf := ToSettings(c)
// verify remote agents are enabled
if conf.Agents == nil || len(conf.Agents.Secret) == 0 {
c.AbortWithStatus(405)
return
}
// verify the agent token matches
token := c.Request.FormValue("token")
if len(token) == 0 {
if token != conf.Agents.Secret {
c.AbortWithStatus(401)
return
}
agent, err := store.AgentToken(token)
if err != nil {
c.Fail(401, err)
return
}
if agent.Active == false {
c.AbortWithStatus(403)
return
}
c.Set("agent", agent)
c.Next()
}
}

View file

@ -71,21 +71,12 @@ func GetStream(c *gin.Context) {
var rc io.ReadCloser
// if no agent is assigned to the build we
// should stream the local logs
if commit.AgentID == 0 {
rc, err = runner.Logs(build)
if err != nil {
c.Fail(404, err)
return
}
} else {
agent, err := store.Agent(commit.AgentID)
if err != nil {
c.Fail(404, err)
return
}
resp, err := http.Get("http://" + agent.Addr)
addr, err := store.Agent(commit)
// if the commit is being executed by an agent
// we'll proxy the build output directly to the
// remote Docker client, through the agent.
if err == nil {
resp, err := http.Get("http://" + addr)
if err != nil {
c.Fail(500, err)
return
@ -95,6 +86,15 @@ func GetStream(c *gin.Context) {
return
}
rc = resp.Body
} else {
// else if the commit is not being executed
// by the build agent we can use the local runner
rc, err = runner.Logs(build)
if err != nil {
c.Fail(404, err)
return
}
}
defer func() {

View file

@ -91,6 +91,10 @@ type Database struct {
Datasource string `toml:"datasource"`
}
type Agents struct {
Secret string `toml:"secret"`
}
// Settings defines global settings for the Drone system.
type Settings struct {
Database *Database `toml:"database"`
@ -98,6 +102,7 @@ type Settings struct {
Service *Service `toml:"service"`
Server *Server `toml:"server"`
Session *Session `toml:"session"`
Agents *Agents `toml:"agents"`
Plugins map[string]interface{} `toml:"plugins"`
}

View file

@ -16,62 +16,38 @@ func NewAgentstore(db *sql.DB) *Agentstore {
}
// Agent returns an agent by ID.
func (db *Agentstore) Agent(id int64) (*common.Agent, error) {
func (db *Agentstore) Agent(commit *common.Commit) (string, error) {
var agent = new(common.Agent)
var err = meddler.Load(db, agentTable, agent, id)
return agent, err
}
// AgentAddr returns an agent by address.
func (db *Agentstore) AgentAddr(addr string) (*common.Agent, error) {
var agent = new(common.Agent)
var err = meddler.QueryRow(db, agent, rebind(agentAddrQuery), addr)
return agent, err
}
// AgentToken returns an agent by token.
func (db *Agentstore) AgentToken(token string) (*common.Agent, error) {
var agent = new(common.Agent)
var err = meddler.QueryRow(db, agent, rebind(agentTokenQuery), token)
return agent, err
}
// AgentList returns a list of all build agents.
func (db *Agentstore) AgentList() ([]*common.Agent, error) {
var agents []*common.Agent
var err = meddler.QueryAll(db, &agents, rebind(agentListQuery), true)
return agents, err
}
// AddAgent inserts an agent in the datastore.
func (db *Agentstore) AddAgent(agent *common.Agent) error {
return meddler.Insert(db, agentTable, agent)
var err = meddler.QueryRow(db, agent, rebind(agentQuery), commit.ID)
return agent.Addr, err
}
// SetAgent updates an agent in the datastore.
func (db *Agentstore) SetAgent(agent *common.Agent) error {
return meddler.Update(db, agentTable, agent)
func (db *Agentstore) SetAgent(commit *common.Commit, addr string) error {
agent := &agent{}
agent.Addr = addr
agent.CommitID = commit.ID
db.Exec(rebind(deleteAgentQuery), commit.ID)
return meddler.Insert(db, agentTable, agent)
}
// Agent table name in database.
type agent struct {
ID int64 `meddler:"agent_id,pk"`
Addr string `meddler:"agent_addr"`
CommitID int64 `meddler:"commit_id"`
}
// Build table name in database.
const agentTable = "agents"
const agentTokenQuery = `
const agentQuery = `
SELECT *
FROM agents
WHERE agent_token = ?
WHERE commit_id = ?
LIMIT 1;
`
const agentAddrQuery = `
SELECT *
FROM agents
WHERE agent_addr = ?
LIMIT 1;
`
const agentListQuery = `
SELECT *
FROM agents
WHERE agent_active = ?;
const deleteAgentQuery = `
DELETE FROM agents
WHERE commit_id = ?;
`

View file

@ -125,7 +125,6 @@ var commitTable = `
CREATE TABLE IF NOT EXISTS commits (
commit_id INTEGER PRIMARY KEY AUTOINCREMENT
,repo_id INTEGER
,agent_id INTEGER
,commit_seq INTEGER
,commit_state VARCHAR(255)
,commit_started INTEGER
@ -152,10 +151,6 @@ var commitRepoIndex = `
CREATE INDEX commits_repo_idx ON commits (repo_id);
`
var agentRepoIndex = `
CREATE INDEX commits_agent_idx ON commits (agent_id);
`
var tokenTable = `
CREATE TABLE IF NOT EXISTS tokens (
token_id INTEGER PRIMARY KEY AUTOINCREMENT
@ -222,13 +217,8 @@ CREATE TABLE IF NOT EXISTS blobs (
var agentTable = `
CREATE TABLE IF NOT EXISTS agents (
agent_id INTEGER PRIMARY KEY AUTOINCREMENT
,agent_kind VARCHAR(255)
,commit_id INTEGER
,agent_addr VARCHAR(2000)
,agent_token VARCHAR(2000)
,agent_active BOOL
,agent_cert BLOB
,agent_key BLOB
,UNIQUE(agent_addr)
,UNIQUE(agent_token)
,UNIQUE(commit_id)
);
`

View file

@ -152,20 +152,8 @@ type Store interface {
//
// Agent returns an agent by ID.
Agent(int64) (*common.Agent, error)
// AgentAddr returns an agent by address.
AgentAddr(string) (*common.Agent, error)
// AgentToken returns an agent by token.
AgentToken(string) (*common.Agent, error)
// AgentList returns a list of all build agents.
AgentList() ([]*common.Agent, error)
// AddAgent inserts an agent in the datastore.
AddAgent(*common.Agent) error
Agent(*common.Commit) (string, error)
// SetAgent updates an agent in the datastore.
SetAgent(*common.Agent) error
SetAgent(*common.Commit, string) error
}

View file

@ -1,7 +1,7 @@
package types
type Build struct {
ID int64 `meddler:"build_id,pk" json:"-"`
ID int64 `meddler:"build_id,pk" json:"id"`
CommitID int64 `meddler:"commit_id" json:"-"`
State string `meddler:"build_state" json:"state"`
ExitCode int `meddler:"build_exit" json:"exit_code"`

View file

@ -10,9 +10,8 @@ const (
)
type Commit struct {
ID int64 `meddler:"commit_id,pk" json:"-"`
ID int64 `meddler:"commit_id,pk" json:"id"`
RepoID int64 `meddler:"repo_id" json:"-"`
AgentID int64 `meddler:"agent_id" json:"-"`
Sequence int `meddler:"commit_seq" json:"sequence"`
State string `meddler:"commit_state" json:"state"`
Started int64 `meddler:"commit_started" json:"started_at"`

View file

@ -1,14 +0,0 @@
package types
type Task struct {
Number int `json:"number"`
State string `json:"state"`
ExitCode int `json:"exit_code"`
Duration int64 `json:"duration"`
Started int64 `json:"started_at"`
Finished int64 `json:"finished_at"`
// Environment represents the build environment
// combination from the matrix.
Environment map[string]string `json:"environment,omitempty"`
}

View file

@ -1,7 +1,7 @@
package types
type User struct {
ID int64 `meddler:"user_id,pk" json:"-"`
ID int64 `meddler:"user_id,pk" json:"id"`
Login string `meddler:"user_login" json:"login,omitempty"`
Token string `meddler:"user_token" json:"-"`
Secret string `meddler:"user_secret" json:"-"`