diff --git a/agent/runner.go b/agent/runner.go new file mode 100644 index 000000000..89f428c28 --- /dev/null +++ b/agent/runner.go @@ -0,0 +1,363 @@ +// Copyright 2018 Drone.IO Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package agent + +import ( + "context" + "encoding/json" + "io" + "io/ioutil" + "strconv" + "sync" + "time" + + "google.golang.org/grpc/metadata" + + "github.com/woodpecker-ci/woodpecker/cncd/pipeline/pipeline" + "github.com/woodpecker-ci/woodpecker/cncd/pipeline/pipeline/backend" + "github.com/woodpecker-ci/woodpecker/cncd/pipeline/pipeline/multipart" + "github.com/woodpecker-ci/woodpecker/cncd/pipeline/pipeline/rpc" + + "github.com/rs/zerolog/log" + "github.com/tevino/abool" +) + +// TODO: Implement log streaming. +// Until now we need to limit the size of the logs and files that we upload. +// The maximum grpc payload size is 4194304. So we need to set these limits below the maximum. +const ( + maxLogsUpload = 2000000 // this is per step + maxFileUpload = 1000000 +) + +type Runner struct { + client rpc.Peer + filter rpc.Filter + hostname string + counter *State + engine *backend.Engine +} + +func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backend *backend.Engine) Runner { + return Runner{ + client: workEngine, + filter: f, + hostname: h, + counter: state, + engine: backend, + } +} + +func (r *Runner) Run(ctx context.Context) error { + log.Debug(). + Msg("request next execution") + + meta, _ := metadata.FromOutgoingContext(ctx) + ctxmeta := metadata.NewOutgoingContext(context.Background(), meta) + + // get the next job from the queue + work, err := r.client.Next(ctx, r.filter) + if err != nil { + return err + } + if work == nil { + return nil + } + + timeout := time.Hour + if minutes := work.Timeout; minutes != 0 { + timeout = time.Duration(minutes) * time.Minute + } + + r.counter.Add( + work.ID, + timeout, + extractRepositoryName(work.Config), // hack + extractBuildNumber(work.Config), // hack + ) + defer r.counter.Done(work.ID) + + logger := log.With(). + Str("repo", extractRepositoryName(work.Config)). // hack + Str("build", extractBuildNumber(work.Config)). // hack + Str("id", work.ID). + Logger() + + logger.Debug(). + Msg("received execution") + + ctx, cancel := context.WithTimeout(ctxmeta, timeout) + defer cancel() + + cancelled := abool.New() + go func() { + logger.Debug(). + Msg("listen for cancel signal") + + if werr := r.client.Wait(ctx, work.ID); werr != nil { + cancelled.SetTo(true) + logger.Warn(). + Err(werr). + Msg("cancel signal received") + + cancel() + } else { + logger.Debug(). + Msg("stop listening for cancel signal") + } + }() + + go func() { + for { + select { + case <-ctx.Done(): + logger.Debug(). + Msg("pipeline done") + + return + case <-time.After(time.Minute): + logger.Debug(). + Msg("pipeline lease renewed") + + r.client.Extend(ctx, work.ID) + } + } + }() + + state := rpc.State{} + state.Started = time.Now().Unix() + + err = r.client.Init(ctxmeta, work.ID, state) + if err != nil { + logger.Error(). + Err(err). + Msg("pipeline initialization failed") + } + + var uploads sync.WaitGroup + defaultLogger := pipeline.LogFunc(func(proc *backend.Step, rc multipart.Reader) error { + + loglogger := logger.With(). + Str("image", proc.Image). + Str("stage", proc.Alias). + Logger() + + part, rerr := rc.NextPart() + if rerr != nil { + return rerr + } + uploads.Add(1) + + var secrets []string + for _, secret := range work.Config.Secrets { + if secret.Mask { + secrets = append(secrets, secret.Value) + } + } + + loglogger.Debug().Msg("log stream opened") + + limitedPart := io.LimitReader(part, maxLogsUpload) + logstream := rpc.NewLineWriter(r.client, work.ID, proc.Alias, secrets...) + io.Copy(logstream, limitedPart) + + loglogger.Debug().Msg("log stream copied") + + file := &rpc.File{} + file.Mime = "application/json+logs" + file.Proc = proc.Alias + file.Name = "logs.json" + file.Data, _ = json.Marshal(logstream.Lines()) + file.Size = len(file.Data) + file.Time = time.Now().Unix() + + loglogger.Debug(). + Msg("log stream uploading") + + if serr := r.client.Upload(ctxmeta, work.ID, file); serr != nil { + loglogger.Error(). + Err(serr). + Msg("log stream upload error") + } + + loglogger.Debug(). + Msg("log stream upload complete") + + defer func() { + loglogger.Debug(). + Msg("log stream closed") + + uploads.Done() + }() + + part, rerr = rc.NextPart() + if rerr != nil { + return nil + } + // TODO should be configurable + limitedPart = io.LimitReader(part, maxFileUpload) + file = &rpc.File{} + file.Mime = part.Header().Get("Content-Type") + file.Proc = proc.Alias + file.Name = part.FileName() + file.Data, _ = ioutil.ReadAll(limitedPart) + file.Size = len(file.Data) + file.Time = time.Now().Unix() + file.Meta = map[string]string{} + + for key, value := range part.Header() { + file.Meta[key] = value[0] + } + + loglogger.Debug(). + Str("file", file.Name). + Str("mime", file.Mime). + Msg("file stream uploading") + + if serr := r.client.Upload(ctxmeta, work.ID, file); serr != nil { + loglogger.Error(). + Err(serr). + Str("file", file.Name). + Str("mime", file.Mime). + Msg("file stream upload error") + } + + loglogger.Debug(). + Str("file", file.Name). + Str("mime", file.Mime). + Msg("file stream upload complete") + return nil + }) + + defaultTracer := pipeline.TraceFunc(func(state *pipeline.State) error { + proclogger := logger.With(). + Str("image", state.Pipeline.Step.Image). + Str("stage", state.Pipeline.Step.Alias). + Int("exit_code", state.Process.ExitCode). + Bool("exited", state.Process.Exited). + Logger() + + procState := rpc.State{ + Proc: state.Pipeline.Step.Alias, + Exited: state.Process.Exited, + ExitCode: state.Process.ExitCode, + Started: time.Now().Unix(), // TODO do not do this + Finished: time.Now().Unix(), + } + defer func() { + proclogger.Debug(). + Msg("update step status") + + if uerr := r.client.Update(ctxmeta, work.ID, procState); uerr != nil { + proclogger.Debug(). + Err(uerr). + Msg("update step status error") + } + + proclogger.Debug(). + Msg("update step status complete") + }() + if state.Process.Exited { + return nil + } + if state.Pipeline.Step.Environment == nil { + state.Pipeline.Step.Environment = map[string]string{} + } + + state.Pipeline.Step.Environment["DRONE_MACHINE"] = r.hostname + state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "success" + state.Pipeline.Step.Environment["CI_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["CI_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + state.Pipeline.Step.Environment["DRONE_BUILD_STATUS"] = "success" + state.Pipeline.Step.Environment["DRONE_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["DRONE_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + + state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "success" + state.Pipeline.Step.Environment["CI_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["CI_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + state.Pipeline.Step.Environment["DRONE_JOB_STATUS"] = "success" + state.Pipeline.Step.Environment["DRONE_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) + state.Pipeline.Step.Environment["DRONE_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) + + if state.Pipeline.Error != nil { + state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "failure" + state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "failure" + state.Pipeline.Step.Environment["DRONE_BUILD_STATUS"] = "failure" + state.Pipeline.Step.Environment["DRONE_JOB_STATUS"] = "failure" + } + return nil + }) + + err = pipeline.New(work.Config, + pipeline.WithContext(ctx), + pipeline.WithLogger(defaultLogger), + pipeline.WithTracer(defaultTracer), + pipeline.WithEngine(*r.engine), + ).Run() + + state.Finished = time.Now().Unix() + state.Exited = true + if err != nil { + switch xerr := err.(type) { + case *pipeline.ExitError: + state.ExitCode = xerr.Code + default: + state.ExitCode = 1 + state.Error = err.Error() + } + if cancelled.IsSet() { + state.ExitCode = 137 + } + } + + logger.Debug(). + Str("error", state.Error). + Int("exit_code", state.ExitCode). + Msg("pipeline complete") + + logger.Debug(). + Msg("uploading logs") + + uploads.Wait() + + logger.Debug(). + Msg("uploading logs complete") + + logger.Debug(). + Str("error", state.Error). + Int("exit_code", state.ExitCode). + Msg("updating pipeline status") + + err = r.client.Done(ctxmeta, work.ID, state) + if err != nil { + logger.Error().Err(err). + Msg("updating pipeline status failed") + } else { + logger.Debug(). + Msg("updating pipeline status complete") + } + + return nil +} + +// extract repository name from the configuration +func extractRepositoryName(config *backend.Config) string { + return config.Stages[0].Steps[0].Environment["DRONE_REPO"] +} + +// extract build number from the configuration +func extractBuildNumber(config *backend.Config) string { + return config.Stages[0].Steps[0].Environment["DRONE_BUILD_NUMBER"] +} diff --git a/agent/state.go b/agent/state.go new file mode 100644 index 000000000..3690e954c --- /dev/null +++ b/agent/state.go @@ -0,0 +1,80 @@ +// Copyright 2018 Drone.IO Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package agent + +import ( + "encoding/json" + "io" + "sync" + "time" +) + +type State struct { + sync.Mutex `json:"-"` + Polling int `json:"polling_count"` + Running int `json:"running_count"` + Metadata map[string]Info `json:"running"` +} + +type Info struct { + ID string `json:"id"` + Repo string `json:"repository"` + Build string `json:"build_number"` + Started time.Time `json:"build_started"` + Timeout time.Duration `json:"build_timeout"` +} + +func (s *State) Add(id string, timeout time.Duration, repo, build string) { + s.Lock() + s.Polling-- + s.Running++ + s.Metadata[id] = Info{ + ID: id, + Repo: repo, + Build: build, + Timeout: timeout, + Started: time.Now().UTC(), + } + s.Unlock() +} + +func (s *State) Done(id string) { + s.Lock() + s.Polling++ + s.Running-- + delete(s.Metadata, id) + s.Unlock() +} + +func (s *State) Healthy() bool { + s.Lock() + defer s.Unlock() + now := time.Now() + buf := time.Hour // 1 hour buffer + for _, item := range s.Metadata { + if now.After(item.Started.Add(item.Timeout).Add(buf)) { + return false + } + } + return true +} + +func (s *State) WriteTo(w io.Writer) (int64, error) { + s.Lock() + out, _ := json.Marshal(s) + s.Unlock() + ret, err := w.Write(out) + return int64(ret), err +} diff --git a/cmd/agent/agent.go b/cmd/agent/agent.go index 46b311d72..7d29e4590 100644 --- a/cmd/agent/agent.go +++ b/cmd/agent/agent.go @@ -17,14 +17,9 @@ package main import ( "context" "crypto/tls" - "encoding/json" - "io" - "io/ioutil" "net/http" "os" - "strconv" "sync" - "time" grpccredentials "google.golang.org/grpc/credentials" @@ -32,10 +27,8 @@ import ( "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" - "github.com/woodpecker-ci/woodpecker/cncd/pipeline/pipeline" - "github.com/woodpecker-ci/woodpecker/cncd/pipeline/pipeline/backend" + "github.com/woodpecker-ci/woodpecker/agent" "github.com/woodpecker-ci/woodpecker/cncd/pipeline/pipeline/backend/docker" - "github.com/woodpecker-ci/woodpecker/cncd/pipeline/pipeline/multipart" "github.com/woodpecker-ci/woodpecker/cncd/pipeline/pipeline/rpc" "github.com/rs/zerolog" @@ -132,12 +125,16 @@ func loop(c *cli.Context) error { if sigterm.IsSet() { return } - r := runner{ - client: client, - filter: filter, - hostname: hostname, + + // new docker engine + engine, err := docker.NewEnv() + if err != nil { + log.Error().Err(err).Msg("cannot create docker client") + return } - if err := r.run(ctx); err != nil { + + r := agent.NewRunner(client, filter, hostname, counter, &engine) + if err := r.Run(ctx); err != nil { log.Error().Err(err).Msg("pipeline done with error") return } @@ -149,322 +146,6 @@ func loop(c *cli.Context) error { return nil } -// NOTE we need to limit the size of the logs and files that we upload. -// The maximum grpc payload size is 4194304. So until we implement streaming -// for uploads, we need to set these limits below the maximum. -const ( - maxLogsUpload = 2000000 // this is per step - maxFileUpload = 1000000 -) - -type runner struct { - client rpc.Peer - filter rpc.Filter - hostname string -} - -func (r *runner) run(ctx context.Context) error { - log.Debug(). - Msg("request next execution") - - meta, _ := metadata.FromOutgoingContext(ctx) - ctxmeta := metadata.NewOutgoingContext(context.Background(), meta) - - // get the next job from the queue - work, err := r.client.Next(ctx, r.filter) - if err != nil { - return err - } - if work == nil { - return nil - } - - timeout := time.Hour - if minutes := work.Timeout; minutes != 0 { - timeout = time.Duration(minutes) * time.Minute - } - - counter.Add( - work.ID, - timeout, - extractRepositoryName(work.Config), // hack - extractBuildNumber(work.Config), // hack - ) - defer counter.Done(work.ID) - - logger := log.With(). - Str("repo", extractRepositoryName(work.Config)). // hack - Str("build", extractBuildNumber(work.Config)). // hack - Str("id", work.ID). - Logger() - - logger.Debug(). - Msg("received execution") - - // new docker engine - engine, err := docker.NewEnv() - if err != nil { - logger.Error(). - Err(err). - Msg("cannot create docker client") - - return err - } - - ctx, cancel := context.WithTimeout(ctxmeta, timeout) - defer cancel() - - cancelled := abool.New() - go func() { - logger.Debug(). - Msg("listen for cancel signal") - - if werr := r.client.Wait(ctx, work.ID); werr != nil { - cancelled.SetTo(true) - logger.Warn(). - Err(werr). - Msg("cancel signal received") - - cancel() - } else { - logger.Debug(). - Msg("stop listening for cancel signal") - } - }() - - go func() { - for { - select { - case <-ctx.Done(): - logger.Debug(). - Msg("pipeline done") - - return - case <-time.After(time.Minute): - logger.Debug(). - Msg("pipeline lease renewed") - - r.client.Extend(ctx, work.ID) - } - } - }() - - state := rpc.State{} - state.Started = time.Now().Unix() - - err = r.client.Init(ctxmeta, work.ID, state) - if err != nil { - logger.Error(). - Err(err). - Msg("pipeline initialization failed") - } - - var uploads sync.WaitGroup - defaultLogger := pipeline.LogFunc(func(proc *backend.Step, rc multipart.Reader) error { - - loglogger := logger.With(). - Str("image", proc.Image). - Str("stage", proc.Alias). - Logger() - - part, rerr := rc.NextPart() - if rerr != nil { - return rerr - } - uploads.Add(1) - - var secrets []string - for _, secret := range work.Config.Secrets { - if secret.Mask { - secrets = append(secrets, secret.Value) - } - } - - loglogger.Debug().Msg("log stream opened") - - limitedPart := io.LimitReader(part, maxLogsUpload) - logstream := rpc.NewLineWriter(r.client, work.ID, proc.Alias, secrets...) - io.Copy(logstream, limitedPart) - - loglogger.Debug().Msg("log stream copied") - - file := &rpc.File{} - file.Mime = "application/json+logs" - file.Proc = proc.Alias - file.Name = "logs.json" - file.Data, _ = json.Marshal(logstream.Lines()) - file.Size = len(file.Data) - file.Time = time.Now().Unix() - - loglogger.Debug(). - Msg("log stream uploading") - - if serr := r.client.Upload(ctxmeta, work.ID, file); serr != nil { - loglogger.Error(). - Err(serr). - Msg("log stream upload error") - } - - loglogger.Debug(). - Msg("log stream upload complete") - - defer func() { - loglogger.Debug(). - Msg("log stream closed") - - uploads.Done() - }() - - part, rerr = rc.NextPart() - if rerr != nil { - return nil - } - // TODO should be configurable - limitedPart = io.LimitReader(part, maxFileUpload) - file = &rpc.File{} - file.Mime = part.Header().Get("Content-Type") - file.Proc = proc.Alias - file.Name = part.FileName() - file.Data, _ = ioutil.ReadAll(limitedPart) - file.Size = len(file.Data) - file.Time = time.Now().Unix() - file.Meta = map[string]string{} - - for key, value := range part.Header() { - file.Meta[key] = value[0] - } - - loglogger.Debug(). - Str("file", file.Name). - Str("mime", file.Mime). - Msg("file stream uploading") - - if serr := r.client.Upload(ctxmeta, work.ID, file); serr != nil { - loglogger.Error(). - Err(serr). - Str("file", file.Name). - Str("mime", file.Mime). - Msg("file stream upload error") - } - - loglogger.Debug(). - Str("file", file.Name). - Str("mime", file.Mime). - Msg("file stream upload complete") - return nil - }) - - defaultTracer := pipeline.TraceFunc(func(state *pipeline.State) error { - proclogger := logger.With(). - Str("image", state.Pipeline.Step.Image). - Str("stage", state.Pipeline.Step.Alias). - Int("exit_code", state.Process.ExitCode). - Bool("exited", state.Process.Exited). - Logger() - - procState := rpc.State{ - Proc: state.Pipeline.Step.Alias, - Exited: state.Process.Exited, - ExitCode: state.Process.ExitCode, - Started: time.Now().Unix(), // TODO do not do this - Finished: time.Now().Unix(), - } - defer func() { - proclogger.Debug(). - Msg("update step status") - - if uerr := r.client.Update(ctxmeta, work.ID, procState); uerr != nil { - proclogger.Debug(). - Err(uerr). - Msg("update step status error") - } - - proclogger.Debug(). - Msg("update step status complete") - }() - if state.Process.Exited { - return nil - } - if state.Pipeline.Step.Environment == nil { - state.Pipeline.Step.Environment = map[string]string{} - } - - state.Pipeline.Step.Environment["DRONE_MACHINE"] = r.hostname - state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "success" - state.Pipeline.Step.Environment["CI_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) - state.Pipeline.Step.Environment["CI_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) - state.Pipeline.Step.Environment["DRONE_BUILD_STATUS"] = "success" - state.Pipeline.Step.Environment["DRONE_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) - state.Pipeline.Step.Environment["DRONE_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) - - state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "success" - state.Pipeline.Step.Environment["CI_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) - state.Pipeline.Step.Environment["CI_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) - state.Pipeline.Step.Environment["DRONE_JOB_STATUS"] = "success" - state.Pipeline.Step.Environment["DRONE_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10) - state.Pipeline.Step.Environment["DRONE_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10) - - if state.Pipeline.Error != nil { - state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "failure" - state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "failure" - state.Pipeline.Step.Environment["DRONE_BUILD_STATUS"] = "failure" - state.Pipeline.Step.Environment["DRONE_JOB_STATUS"] = "failure" - } - return nil - }) - - err = pipeline.New(work.Config, - pipeline.WithContext(ctx), - pipeline.WithLogger(defaultLogger), - pipeline.WithTracer(defaultTracer), - pipeline.WithEngine(engine), - ).Run() - - state.Finished = time.Now().Unix() - state.Exited = true - if err != nil { - switch xerr := err.(type) { - case *pipeline.ExitError: - state.ExitCode = xerr.Code - default: - state.ExitCode = 1 - state.Error = err.Error() - } - if cancelled.IsSet() { - state.ExitCode = 137 - } - } - - logger.Debug(). - Str("error", state.Error). - Int("exit_code", state.ExitCode). - Msg("pipeline complete") - - logger.Debug(). - Msg("uploading logs") - - uploads.Wait() - - logger.Debug(). - Msg("uploading logs complete") - - logger.Debug(). - Str("error", state.Error). - Int("exit_code", state.ExitCode). - Msg("updating pipeline status") - - err = r.client.Done(ctxmeta, work.ID, state) - if err != nil { - logger.Error().Err(err). - Msg("updating pipeline status failed") - } else { - logger.Debug(). - Msg("updating pipeline status complete") - } - - return nil -} - type credentials struct { username string password string @@ -480,13 +161,3 @@ func (c *credentials) GetRequestMetadata(oldcontext.Context, ...string) (map[str func (c *credentials) RequireTransportSecurity() bool { return false } - -// extract repository name from the configuration -func extractRepositoryName(config *backend.Config) string { - return config.Stages[0].Steps[0].Environment["DRONE_REPO"] -} - -// extract build number from the configuration -func extractBuildNumber(config *backend.Config) string { - return config.Stages[0].Steps[0].Environment["DRONE_BUILD_NUMBER"] -} diff --git a/cmd/agent/health.go b/cmd/agent/health.go index 99c641459..282d222ab 100644 --- a/cmd/agent/health.go +++ b/cmd/agent/health.go @@ -17,12 +17,10 @@ package main import ( "encoding/json" "fmt" - "io" "net/http" - "sync" - "time" "github.com/urfave/cli" + "github.com/woodpecker-ci/woodpecker/agent" "github.com/woodpecker-ci/woodpecker/version" ) @@ -60,7 +58,7 @@ func handleStats(w http.ResponseWriter, r *http.Request) { w.WriteHeader(500) } w.Header().Add("Content-Type", "text/json") - counter.writeTo(w) + counter.WriteTo(w) } type versionResp struct { @@ -69,65 +67,8 @@ type versionResp struct { } // default statistics counter -var counter = &state{ - Metadata: map[string]info{}, -} - -type state struct { - sync.Mutex `json:"-"` - Polling int `json:"polling_count"` - Running int `json:"running_count"` - Metadata map[string]info `json:"running"` -} - -type info struct { - ID string `json:"id"` - Repo string `json:"repository"` - Build string `json:"build_number"` - Started time.Time `json:"build_started"` - Timeout time.Duration `json:"build_timeout"` -} - -func (s *state) Add(id string, timeout time.Duration, repo, build string) { - s.Lock() - s.Polling-- - s.Running++ - s.Metadata[id] = info{ - ID: id, - Repo: repo, - Build: build, - Timeout: timeout, - Started: time.Now().UTC(), - } - s.Unlock() -} - -func (s *state) Done(id string) { - s.Lock() - s.Polling++ - s.Running-- - delete(s.Metadata, id) - s.Unlock() -} - -func (s *state) Healthy() bool { - s.Lock() - defer s.Unlock() - now := time.Now() - buf := time.Hour // 1 hour buffer - for _, item := range s.Metadata { - if now.After(item.Started.Add(item.Timeout).Add(buf)) { - return false - } - } - return true -} - -func (s *state) writeTo(w io.Writer) (int, error) { - s.Lock() - out, _ := json.Marshal(s) - s.Unlock() - return w.Write(out) +var counter = &agent.State{ + Metadata: map[string]agent.Info{}, } // handles pinging the endpoint and returns an error if the diff --git a/cmd/agent/health_test.go b/cmd/agent/health_test.go index fbe23593f..2fb1bb21d 100644 --- a/cmd/agent/health_test.go +++ b/cmd/agent/health_test.go @@ -17,11 +17,13 @@ package main import ( "testing" "time" + + "github.com/woodpecker-ci/woodpecker/agent" ) func TestHealthy(t *testing.T) { - s := state{} - s.Metadata = map[string]info{} + s := agent.State{} + s.Metadata = map[string]agent.Info{} s.Add("1", time.Hour, "octocat/hello-world", "42") @@ -35,7 +37,7 @@ func TestHealthy(t *testing.T) { t.Errorf("got repository name %s, want %s", got, want) } - s.Metadata["1"] = info{ + s.Metadata["1"] = agent.Info{ Timeout: time.Hour, Started: time.Now().UTC(), } @@ -43,14 +45,14 @@ func TestHealthy(t *testing.T) { t.Error("want healthy status when timeout not exceeded, got false") } - s.Metadata["1"] = info{ + s.Metadata["1"] = agent.Info{ Started: time.Now().UTC().Add(-(time.Minute * 30)), } if s.Healthy() == false { t.Error("want healthy status when timeout+buffer not exceeded, got false") } - s.Metadata["1"] = info{ + s.Metadata["1"] = agent.Info{ Started: time.Now().UTC().Add(-(time.Hour + time.Minute)), } if s.Healthy() == true {