diff --git a/agent/rpc/auth_client_grpc.go b/agent/rpc/auth_client_grpc.go
index 9562f47ea..55c89a41a 100644
--- a/agent/rpc/auth_client_grpc.go
+++ b/agent/rpc/auth_client_grpc.go
@@ -23,6 +23,8 @@ import (
 	"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/proto"
 )
 
+const authClientTimeout = time.Second * 5
+
 type AuthClient struct {
 	client proto.WoodpeckerAuthClient
 	conn   *grpc.ClientConn
@@ -39,8 +41,8 @@ func NewAuthGrpcClient(conn *grpc.ClientConn, agentToken string, agentID int64)
 	return client
 }
 
-func (c *AuthClient) Auth() (string, int64, error) {
-	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) //nolint:mnd
+func (c *AuthClient) Auth(ctx context.Context) (string, int64, error) {
+	ctx, cancel := context.WithTimeout(ctx, authClientTimeout)
 	defer cancel()
 
 	req := &proto.AuthRequest{
diff --git a/agent/rpc/auth_interceptor.go b/agent/rpc/auth_interceptor.go
index 1283aa970..17aa2a587 100644
--- a/agent/rpc/auth_interceptor.go
+++ b/agent/rpc/auth_interceptor.go
@@ -30,15 +30,12 @@ type AuthInterceptor struct {
 }
 
 // NewAuthInterceptor returns a new auth interceptor.
-func NewAuthInterceptor(
-	authClient *AuthClient,
-	refreshDuration time.Duration,
-) (*AuthInterceptor, error) {
+func NewAuthInterceptor(ctx context.Context, authClient *AuthClient, refreshDuration time.Duration) (*AuthInterceptor, error) {
 	interceptor := &AuthInterceptor{
 		authClient: authClient,
 	}
 
-	err := interceptor.scheduleRefreshToken(refreshDuration)
+	err := interceptor.scheduleRefreshToken(ctx, refreshDuration)
 	if err != nil {
 		return nil, err
 	}
@@ -78,21 +75,26 @@ func (interceptor *AuthInterceptor) attachToken(ctx context.Context) context.Con
 	return metadata.AppendToOutgoingContext(ctx, "token", interceptor.accessToken)
 }
 
-func (interceptor *AuthInterceptor) scheduleRefreshToken(refreshDuration time.Duration) error {
-	err := interceptor.refreshToken()
+func (interceptor *AuthInterceptor) scheduleRefreshToken(ctx context.Context, refreshInterval time.Duration) error {
+	err := interceptor.refreshToken(ctx)
 	if err != nil {
 		return err
 	}
 
 	go func() {
-		wait := refreshDuration
+		wait := refreshInterval
+
 		for {
-			time.Sleep(wait)
-			err := interceptor.refreshToken()
-			if err != nil {
-				wait = time.Second
-			} else {
-				wait = refreshDuration
+			select {
+			case <-ctx.Done():
+				return
+			case <-time.After(wait):
+				err := interceptor.refreshToken(ctx)
+				if err != nil {
+					wait = time.Second
+				} else {
+					wait = refreshInterval
+				}
 			}
 		}
 	}()
@@ -100,8 +102,8 @@ func (interceptor *AuthInterceptor) scheduleRefreshToken(refreshDuration time.Du
 	return nil
 }
 
-func (interceptor *AuthInterceptor) refreshToken() error {
-	accessToken, _, err := interceptor.authClient.Auth()
+func (interceptor *AuthInterceptor) refreshToken(ctx context.Context) error {
+	accessToken, _, err := interceptor.authClient.Auth(ctx)
 	if err != nil {
 		return err
 	}
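The refresh goroutine above now exits with its context instead of sleeping unconditionally between token refreshes. A minimal standalone sketch of that pattern, assuming illustrative names (`scheduleRefresh`, `refresh`) that are not part of the Woodpecker API:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// scheduleRefresh re-runs refresh on a fixed interval, backs off to one second
// after a failure, and stops as soon as ctx is canceled.
func scheduleRefresh(ctx context.Context, interval time.Duration, refresh func(context.Context) error) {
	go func() {
		wait := interval
		for {
			select {
			case <-ctx.Done():
				return
			case <-time.After(wait):
				if err := refresh(ctx); err != nil {
					wait = time.Second // retry quickly after a failed refresh
				} else {
					wait = interval
				}
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	scheduleRefresh(ctx, 500*time.Millisecond, func(context.Context) error {
		fmt.Println("refreshing token")
		return nil
	})

	<-ctx.Done() // the refresh goroutine exits together with the context
}
```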
diff --git a/agent/rpc/client_grpc.go b/agent/rpc/client_grpc.go
index 4c42224b4..e1dc0d8ef 100644
--- a/agent/rpc/client_grpc.go
+++ b/agent/rpc/client_grpc.go
@@ -17,7 +17,6 @@ package rpc
 import (
 	"context"
 	"encoding/json"
-	"fmt"
 	"strings"
 	"time"
 
@@ -90,8 +89,10 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Workflow, error)
 		case codes.Canceled:
 			if ctx.Err() != nil {
 				// expected as context was canceled
+				log.Debug().Err(err).Msgf("grpc error: next(): context canceled")
 				return nil, nil
 			}
+			log.Error().Err(err).Msgf("grpc error: next(): code: %v", status.Code(err))
 			return nil, err
 		case codes.Aborted,
@@ -105,10 +106,11 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Workflow, error)
 				// https://github.com/woodpecker-ci/woodpecker/issues/717#issuecomment-1049365104
 				log.Trace().Err(err).Msg("grpc: to many keepalive pings without sending data")
 			} else {
-				log.Error().Err(err).Msgf("grpc error: next(): code: %v", status.Code(err))
+				log.Warn().Err(err).Msgf("grpc error: next(): code: %v", status.Code(err))
 			}
 		default:
-			return nil, fmt.Errorf("grpc error: next(): code: %v: %w", status.Code(err), err)
+			log.Error().Err(err).Msgf("grpc error: next(): code: %v", status.Code(err))
+			return nil, err
 		}
 
 		select {
@@ -143,9 +145,15 @@ func (c *client) Wait(ctx context.Context, id string) (err error) {
 			break
 		}
 
-		log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err))
-
 		switch status.Code(err) {
+		case codes.Canceled:
+			if ctx.Err() != nil {
+				// expected as context was canceled
+				log.Debug().Err(err).Msgf("grpc error: wait(): context canceled")
+				return nil
+			}
+			log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err))
+			return err
 		case codes.Aborted,
 			codes.DataLoss,
@@ -153,7 +161,9 @@ func (c *client) Wait(ctx context.Context, id string) (err error) {
 			codes.Internal,
 			codes.Unavailable:
 			// non-fatal errors
+			log.Warn().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err))
 		default:
+			log.Error().Err(err).Msgf("grpc error: wait(): code: %v", status.Code(err))
 			return err
 		}
 
@@ -184,6 +194,14 @@ func (c *client) Init(ctx context.Context, workflowID string, state rpc.Workflow
 		log.Error().Err(err).Msgf("grpc error: init(): code: %v", status.Code(err))
 
 		switch status.Code(err) {
+		case codes.Canceled:
+			if ctx.Err() != nil {
+				// expected as context was canceled
+				log.Debug().Err(err).Msgf("grpc error: init(): context canceled")
+				return nil
+			}
+			log.Error().Err(err).Msgf("grpc error: init(): code: %v", status.Code(err))
+			return err
 		case codes.Aborted,
 			codes.DataLoss,
@@ -191,7 +209,9 @@ func (c *client) Init(ctx context.Context, workflowID string, state rpc.Workflow
 			codes.Internal,
 			codes.Unavailable:
 			// non-fatal errors
+			log.Warn().Err(err).Msgf("grpc error: init(): code: %v", status.Code(err))
 		default:
+			log.Error().Err(err).Msgf("grpc error: init(): code: %v", status.Code(err))
 			return err
 		}
 
@@ -222,6 +242,14 @@ func (c *client) Done(ctx context.Context, workflowID string, state rpc.Workflow
 		log.Error().Err(err).Msgf("grpc error: done(): code: %v", status.Code(err))
 
 		switch status.Code(err) {
+		case codes.Canceled:
+			if ctx.Err() != nil {
+				// expected as context was canceled
+				log.Debug().Err(err).Msgf("grpc error: done(): context canceled")
+				return nil
+			}
+			log.Error().Err(err).Msgf("grpc error: done(): code: %v", status.Code(err))
+			return err
 		case codes.Aborted,
 			codes.DataLoss,
@@ -229,7 +257,9 @@ func (c *client) Done(ctx context.Context, workflowID string, state rpc.Workflow
 			codes.Internal,
 			codes.Unavailable:
 			// non-fatal errors
+			log.Warn().Err(err).Msgf("grpc error: done(): code: %v", status.Code(err))
 		default:
+			log.Error().Err(err).Msgf("grpc error: done(): code: %v", status.Code(err))
 			return err
 		}
 
@@ -256,6 +286,14 @@ func (c *client) Extend(ctx context.Context, id string) (err error) {
 		log.Error().Err(err).Msgf("grpc error: extend(): code: %v", status.Code(err))
 
 		switch status.Code(err) {
+		case codes.Canceled:
+			if ctx.Err() != nil {
+				// expected as context was canceled
+				log.Debug().Err(err).Msgf("grpc error: extend(): context canceled")
+				return nil
+			}
+			log.Error().Err(err).Msgf("grpc error: extend(): code: %v", status.Code(err))
+			return err
 		case codes.Aborted,
 			codes.DataLoss,
@@ -263,7 +301,9 @@ func (c *client) Extend(ctx context.Context, id string) (err error) {
 			codes.Internal,
 			codes.Unavailable:
 			// non-fatal errors
+			log.Warn().Err(err).Msgf("grpc error: extend(): code: %v", status.Code(err))
 		default:
+			log.Error().Err(err).Msgf("grpc error: extend(): code: %v", status.Code(err))
 			return err
 		}
 
@@ -297,6 +337,14 @@ func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (er
 		log.Error().Err(err).Msgf("grpc error: update(): code: %v", status.Code(err))
 
 		switch status.Code(err) {
+		case codes.Canceled:
+			if ctx.Err() != nil {
+				// expected as context was canceled
+				log.Debug().Err(err).Msgf("grpc error: update(): context canceled")
+				return nil
+			}
+			log.Error().Err(err).Msgf("grpc error: update(): code: %v", status.Code(err))
+			return err
 		case codes.Aborted,
 			codes.DataLoss,
@@ -304,7 +352,9 @@ func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (er
 			codes.Internal,
 			codes.Unavailable:
 			// non-fatal errors
+			log.Warn().Err(err).Msgf("grpc error: update(): code: %v", status.Code(err))
 		default:
+			log.Error().Err(err).Msgf("grpc error: update(): code: %v", status.Code(err))
 			return err
 		}
 
@@ -333,9 +383,15 @@ func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) {
 			break
 		}
 
-		log.Error().Err(err).Msgf("grpc error: log(): code: %v", status.Code(err))
-
 		switch status.Code(err) {
+		case codes.Canceled:
+			if ctx.Err() != nil {
+				// expected as context was canceled
+				log.Debug().Err(err).Msgf("grpc error: log(): context canceled")
+				return nil
+			}
+			log.Error().Err(err).Msgf("grpc error: log(): code: %v", status.Code(err))
+			return err
 		case codes.Aborted,
 			codes.DataLoss,
@@ -343,7 +399,9 @@ func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) {
 			codes.Internal,
 			codes.Unavailable:
 			// non-fatal errors
+			log.Warn().Err(err).Msgf("grpc error: log(): code: %v", status.Code(err))
 		default:
+			log.Error().Err(err).Msgf("grpc error: log(): code: %v", status.Code(err))
 			return err
 		}
 
@@ -383,6 +441,14 @@ func (c *client) ReportHealth(ctx context.Context) (err error) {
 			return nil
 		}
 
 		switch status.Code(err) {
+		case codes.Canceled:
+			if ctx.Err() != nil {
+				// expected as context was canceled
+				log.Debug().Err(err).Msgf("grpc error: report_health(): context canceled")
+				return nil
+			}
+			log.Error().Err(err).Msgf("grpc error: report_health(): code: %v", status.Code(err))
+			return err
 		case codes.Aborted,
 			codes.DataLoss,
@@ -390,7 +456,9 @@ func (c *client) ReportHealth(ctx context.Context) (err error) {
 			codes.Internal,
 			codes.Unavailable:
 			// non-fatal errors
+			log.Warn().Err(err).Msgf("grpc error: report_health(): code: %v", status.Code(err))
 		default:
+			log.Error().Err(err).Msgf("grpc error: report_health(): code: %v", status.Code(err))
 			return err
 		}
 
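Each RPC wrapper above now treats `codes.Canceled` as expected when the agent's own context is already done, logs the known transient codes as warnings, and keeps everything else an error. A hedged sketch of that classification pulled out into a helper; the helper name and the exact set of retryable codes shown here are illustrative:

```go
package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// classify decides what to do with an RPC error: ignore an expected
// cancellation, retry on transient codes, or give up and return the error.
func classify(ctx context.Context, err error) (retry bool, fatal error) {
	switch status.Code(err) {
	case codes.OK:
		return false, nil
	case codes.Canceled:
		if ctx.Err() != nil {
			return false, nil // we canceled the call ourselves: not an error
		}
		return false, err
	case codes.Aborted, codes.DataLoss, codes.DeadlineExceeded, codes.Internal, codes.Unavailable:
		return true, nil // treated as non-fatal in the diff above
	default:
		return false, err
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the agent shutting down mid-call

	retry, fatal := classify(ctx, status.Error(codes.Canceled, "call canceled"))
	fmt.Println(retry, fatal) // false <nil>
}
```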
Msg("updating workflow status") - if err := r.client.Done(runnerCtx, workflow.ID, state); err != nil { + doneCtx := runnerCtx + if doneCtx.Err() != nil { + doneCtx = shutdownCtx + } + if err := r.client.Done(doneCtx, workflow.ID, state); err != nil { logger.Error().Err(err).Msg("updating workflow status failed") } else { logger.Debug().Msg("updating workflow status complete") diff --git a/cmd/agent/core/agent.go b/cmd/agent/core/agent.go index d4c37fc4b..b4da071a2 100644 --- a/cmd/agent/core/agent.go +++ b/cmd/agent/core/agent.go @@ -23,12 +23,12 @@ import ( "net/http" "os" "strings" - "sync" + "sync/atomic" "time" "github.com/rs/zerolog/log" - "github.com/tevino/abool/v2" "github.com/urfave/cli/v2" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/codes" grpc_credentials "google.golang.org/grpc/credentials" @@ -47,7 +47,43 @@ import ( "go.woodpecker-ci.org/woodpecker/v2/version" ) +const ( + reportHealthInterval = time.Second * 10 + authInterceptorRefreshInterval = time.Minute * 30 +) + +const ( + shutdownTimeout = time.Second * 5 +) + +var ( + stopAgentFunc context.CancelCauseFunc = func(error) {} + shutdownCancelFunc context.CancelFunc = func() {} + shutdownCtx = context.Background() +) + func run(c *cli.Context, backends []types.Backend) error { + ctx := utils.WithContextSigtermCallback(c.Context, func() { + log.Info().Msg("termination signal is received, shutting down agent") + }) + + agentCtx, ctxCancel := context.WithCancelCause(ctx) + stopAgentFunc = func(err error) { + msg := "shutdown of whole agent" + if err != nil { + log.Error().Err(err).Msg(msg) + } else { + log.Info().Msg(msg) + } + stopAgentFunc = func(error) {} + shutdownCtx, shutdownCancelFunc = context.WithTimeout(shutdownCtx, shutdownTimeout) + ctxCancel(err) + } + defer stopAgentFunc(nil) + defer shutdownCancelFunc() + + serviceWaitingGroup := errgroup.Group{} + agentConfigPath := c.String("agent-config") hostname := c.String("hostname") if len(hostname) == 0 { @@ -58,11 +94,23 @@ func run(c *cli.Context, backends []types.Backend) error { counter.Running = 0 if c.Bool("healthcheck") { - go func() { - if err := http.ListenAndServe(c.String("healthcheck-addr"), nil); err != nil { - log.Error().Err(err).Msgf("cannot listen on address %s", c.String("healthcheck-addr")) - } - }() + serviceWaitingGroup.Go( + func() error { + server := &http.Server{Addr: c.String("healthcheck-addr")} + go func() { + <-agentCtx.Done() + log.Info().Msg("shutdown healthcheck server ...") + if err := server.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck + log.Error().Err(err).Msg("shutdown healthcheck server failed") + } else { + log.Info().Msg("healthcheck server stopped") + } + }() + if err := server.ListenAndServe(); err != nil { + log.Error().Err(err).Msgf("cannot listen on address %s", c.String("healthcheck-addr")) + } + return nil + }) } var transport grpc.DialOption @@ -88,8 +136,10 @@ func run(c *cli.Context, backends []types.Backend) error { agentConfig := readAgentConfig(agentConfigPath) agentToken := c.String("grpc-token") + grpcClientCtx, grpcClientCtxCancel := context.WithCancelCause(context.Background()) + defer grpcClientCtxCancel(nil) authClient := agent_rpc.NewAuthGrpcClient(authConn, agentToken, agentConfig.AgentID) - authInterceptor, err := agent_rpc.NewAuthInterceptor(authClient, 30*time.Minute) //nolint:mnd + authInterceptor, err := agent_rpc.NewAuthInterceptor(grpcClientCtx, authClient, authInterceptorRefreshInterval) if err != nil { return err } @@ -110,30 +160,12 @@ func run(c 
diff --git a/cmd/agent/core/agent.go b/cmd/agent/core/agent.go
index d4c37fc4b..b4da071a2 100644
--- a/cmd/agent/core/agent.go
+++ b/cmd/agent/core/agent.go
@@ -23,12 +23,12 @@ import (
 	"net/http"
 	"os"
 	"strings"
-	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/rs/zerolog/log"
-	"github.com/tevino/abool/v2"
 	"github.com/urfave/cli/v2"
+	"golang.org/x/sync/errgroup"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	grpc_credentials "google.golang.org/grpc/credentials"
@@ -47,7 +47,43 @@ import (
 	"go.woodpecker-ci.org/woodpecker/v2/version"
 )
 
+const (
+	reportHealthInterval           = time.Second * 10
+	authInterceptorRefreshInterval = time.Minute * 30
+)
+
+const (
+	shutdownTimeout = time.Second * 5
+)
+
+var (
+	stopAgentFunc      context.CancelCauseFunc = func(error) {}
+	shutdownCancelFunc context.CancelFunc      = func() {}
+	shutdownCtx                                = context.Background()
+)
+
 func run(c *cli.Context, backends []types.Backend) error {
+	ctx := utils.WithContextSigtermCallback(c.Context, func() {
+		log.Info().Msg("termination signal is received, shutting down agent")
+	})
+
+	agentCtx, ctxCancel := context.WithCancelCause(ctx)
+	stopAgentFunc = func(err error) {
+		msg := "shutdown of whole agent"
+		if err != nil {
+			log.Error().Err(err).Msg(msg)
+		} else {
+			log.Info().Msg(msg)
+		}
+		stopAgentFunc = func(error) {}
+		shutdownCtx, shutdownCancelFunc = context.WithTimeout(shutdownCtx, shutdownTimeout)
+		ctxCancel(err)
+	}
+	defer stopAgentFunc(nil)
+	defer shutdownCancelFunc()
+
+	serviceWaitingGroup := errgroup.Group{}
+
 	agentConfigPath := c.String("agent-config")
 	hostname := c.String("hostname")
 	if len(hostname) == 0 {
@@ -58,11 +94,23 @@ func run(c *cli.Context, backends []types.Backend) error {
 	counter.Running = 0
 
 	if c.Bool("healthcheck") {
-		go func() {
-			if err := http.ListenAndServe(c.String("healthcheck-addr"), nil); err != nil {
-				log.Error().Err(err).Msgf("cannot listen on address %s", c.String("healthcheck-addr"))
-			}
-		}()
+		serviceWaitingGroup.Go(
+			func() error {
+				server := &http.Server{Addr: c.String("healthcheck-addr")}
+				go func() {
+					<-agentCtx.Done()
+					log.Info().Msg("shutdown healthcheck server ...")
+					if err := server.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck
+						log.Error().Err(err).Msg("shutdown healthcheck server failed")
+					} else {
+						log.Info().Msg("healthcheck server stopped")
+					}
+				}()
+				if err := server.ListenAndServe(); err != nil {
+					log.Error().Err(err).Msgf("cannot listen on address %s", c.String("healthcheck-addr"))
+				}
+				return nil
+			})
 	}
 
 	var transport grpc.DialOption
@@ -88,8 +136,10 @@ func run(c *cli.Context, backends []types.Backend) error {
 	agentConfig := readAgentConfig(agentConfigPath)
 
 	agentToken := c.String("grpc-token")
+	grpcClientCtx, grpcClientCtxCancel := context.WithCancelCause(context.Background())
+	defer grpcClientCtxCancel(nil)
 	authClient := agent_rpc.NewAuthGrpcClient(authConn, agentToken, agentConfig.AgentID)
-	authInterceptor, err := agent_rpc.NewAuthInterceptor(authClient, 30*time.Minute) //nolint:mnd
+	authInterceptor, err := agent_rpc.NewAuthInterceptor(grpcClientCtx, authClient, authInterceptorRefreshInterval)
 	if err != nil {
 		return err
 	}
@@ -110,30 +160,12 @@ func run(c *cli.Context, backends []types.Backend) error {
 	defer conn.Close()
 
 	client := agent_rpc.NewGrpcClient(conn)
+	agentConfigPersisted := atomic.Bool{}
 
-	sigterm := abool.New()
-	ctx := metadata.NewOutgoingContext(
-		context.Background(),
-		metadata.Pairs("hostname", hostname),
-	)
-
-	agentConfigPersisted := abool.New()
-	ctx = utils.WithContextSigtermCallback(ctx, func() {
-		log.Info().Msg("termination signal is received, shutting down")
-		sigterm.Set()
-
-		// Remove stateless agents from server
-		if agentConfigPersisted.IsNotSet() {
-			log.Debug().Msg("unregistering agent from server")
-			err := client.UnregisterAgent(ctx)
-			if err != nil {
-				log.Err(err).Msg("failed to unregister agent from server")
-			}
-		}
-	})
+	grpcCtx := metadata.NewOutgoingContext(grpcClientCtx, metadata.Pairs("hostname", hostname))
 
 	// check if grpc server version is compatible with agent
-	grpcServerVersion, err := client.Version(ctx)
+	grpcServerVersion, err := client.Version(grpcCtx)
 	if err != nil {
 		log.Error().Err(err).Msg("could not get grpc server version")
 		return err
@@ -147,12 +179,8 @@ func run(c *cli.Context, backends []types.Backend) error {
 		return err
 	}
 
-	var wg sync.WaitGroup
-	parallel := c.Int("max-workflows")
-	wg.Add(parallel)
-
 	// new engine
-	backendCtx := context.WithValue(ctx, types.CliContext, c)
+	backendCtx := context.WithValue(agentCtx, types.CliContext, c)
 	backendName := c.String("backend-engine")
 	backendEngine, err := backend.FindBackend(backendCtx, backends, backendName)
 	if err != nil {
@@ -172,14 +200,34 @@ func run(c *cli.Context, backends []types.Backend) error {
 	}
 	log.Debug().Msgf("loaded %s backend engine", backendEngine.Name())
 
-	agentConfig.AgentID, err = client.RegisterAgent(ctx, engInfo.Platform, backendEngine.Name(), version.String(), parallel)
+	maxWorkflows := c.Int("max-workflows")
+	agentConfig.AgentID, err = client.RegisterAgent(grpcCtx, engInfo.Platform, backendEngine.Name(), version.String(), maxWorkflows)
 	if err != nil {
 		return err
 	}
 
+	serviceWaitingGroup.Go(func() error {
+		// close the grpc client context once unregistering has been handled
+		defer grpcClientCtxCancel(nil)
+		// wait until the agent context is done
+		<-agentCtx.Done()
+		// Remove stateless agents from server
+		if !agentConfigPersisted.Load() {
+			log.Debug().Msg("unregistering agent from server ...")
+			// this must still run after the agent context got canceled, so use the separate grpc client context
+			err := client.UnregisterAgent(grpcClientCtx)
+			if err != nil {
+				log.Err(err).Msg("failed to unregister agent from server")
+			} else {
+				log.Info().Msg("agent unregistered from server")
+			}
+		}
+		return nil
+	})
+
 	if agentConfigPath != "" {
 		if err := writeAgentConfig(agentConfig, agentConfigPath); err == nil {
-			agentConfigPersisted.Set()
+			agentConfigPersisted.Store(true)
 		}
 	}
 
@@ -200,66 +248,62 @@ func run(c *cli.Context, backends []types.Backend) error {
 	log.Debug().Msgf("agent registered with ID %d", agentConfig.AgentID)
 
-	go func() {
+	serviceWaitingGroup.Go(func() error {
 		for {
-			if sigterm.IsSet() {
-				log.Debug().Msg("terminating health reporting")
-				return
-			}
-
-			err := client.ReportHealth(ctx)
+			err := client.ReportHealth(grpcCtx)
 			if err != nil {
 				log.Err(err).Msg("failed to report health")
 			}
 
-			<-time.After(time.Second * 10)
+			select {
+			case <-agentCtx.Done():
+				log.Debug().Msg("terminating health reporting")
+				return nil
+			case <-time.After(reportHealthInterval):
+			}
 		}
-	}()
+	})
 
-	for i := 0; i < parallel; i++ {
+	for i := 0; i < maxWorkflows; i++ {
 		i := i
-		go func() {
-			defer wg.Done()
-
-			r := agent.NewRunner(client, filter, hostname, counter, &backendEngine)
+		serviceWaitingGroup.Go(func() error {
+			runner := agent.NewRunner(client, filter, hostname, counter, &backendEngine)
 			log.Debug().Msgf("created new runner %d", i)
 
 			for {
-				if sigterm.IsSet() {
-					log.Debug().Msgf("terminating runner %d", i)
-					return
+				if agentCtx.Err() != nil {
+					return nil
 				}
 
 				log.Debug().Msg("polling new steps")
-				if err := r.Run(ctx); err != nil {
-					log.Error().Err(err).Msg("pipeline done with error")
-					return
+				if err := runner.Run(agentCtx, shutdownCtx); err != nil {
+					log.Error().Err(err).Msg("runner done with error")
+					return err
 				}
 			}
-		}()
+		})
 	}
 
 	log.Info().Msgf(
 		"starting Woodpecker agent with version '%s' and backend '%s' using platform '%s' running up to %d pipelines in parallel",
-		version.String(), backendEngine.Name(), engInfo.Platform, parallel)
+		version.String(), backendEngine.Name(), engInfo.Platform, maxWorkflows)
 
-	wg.Wait()
-	return nil
+	return serviceWaitingGroup.Wait()
 }
 
-func runWithRetry(backendEngines []types.Backend) func(context *cli.Context) error {
-	return func(context *cli.Context) error {
-		if err := logger.SetupGlobalLogger(context, true); err != nil {
+func runWithRetry(backendEngines []types.Backend) func(c *cli.Context) error {
+	return func(c *cli.Context) error {
+		if err := logger.SetupGlobalLogger(c, true); err != nil {
 			return err
 		}
 
 		initHealth()
 
-		retryCount := context.Int("connect-retry-count")
-		retryDelay := context.Duration("connect-retry-delay")
+		retryCount := c.Int("connect-retry-count")
+		retryDelay := c.Duration("connect-retry-delay")
 
 		var err error
 		for i := 0; i < retryCount; i++ {
-			if err = run(context, backendEngines); status.Code(err) == codes.Unavailable {
+			if err = run(c, backendEngines); status.Code(err) == codes.Unavailable {
 				log.Warn().Err(err).Msg(fmt.Sprintf("cannot connect to server, retrying in %v", retryDelay))
 				time.Sleep(retryDelay)
 			} else {
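`run` now tracks all long-running services in an `errgroup.Group` instead of a `sync.WaitGroup` plus boolean flags, so the first failing service error is what `run` returns. A reduced sketch of that lifecycle, assuming placeholder service bodies rather than the agent's real ones:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	// shared context canceled on termination (here: a short timeout instead of SIGTERM)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	group := errgroup.Group{}

	// periodic service: runs until the shared context is done
	group.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				fmt.Println("terminating health reporting")
				return nil
			case <-time.After(200 * time.Millisecond):
				fmt.Println("report health")
			}
		}
	})

	// cleanup service: waits for shutdown, then does its final work
	group.Go(func() error {
		<-ctx.Done()
		fmt.Println("unregister agent")
		return nil
	})

	// Wait blocks until every registered goroutine has returned and surfaces
	// the first non-nil error, which is what run() now propagates.
	if err := group.Wait(); err != nil {
		fmt.Println("agent stopped with error:", err)
	}
}
```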
diff --git a/cmd/agent/core/health.go b/cmd/agent/core/health.go
index 263bd3269..fddf688c7 100644
--- a/cmd/agent/core/health.go
+++ b/cmd/agent/core/health.go
@@ -82,12 +82,18 @@ var counter = &agent.State{
 // handles pinging the endpoint and returns an error if the
 // agent is in an unhealthy state.
 func pinger(c *cli.Context) error {
+	ctx := c.Context
 	healthcheckAddress := c.String("healthcheck-addr")
 	if strings.HasPrefix(healthcheckAddress, ":") {
 		// this seems sufficient according to https://pkg.go.dev/net#Dial
 		healthcheckAddress = "localhost" + healthcheckAddress
 	}
-	resp, err := http.Get("http://" + healthcheckAddress + "/healthz")
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://"+healthcheckAddress+"/healthz", nil)
+	if err != nil {
+		return err
+	}
+	resp, err := http.DefaultClient.Do(req)
 	if err != nil {
 		return err
 	}
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index 1c23061da..6e9d9f181 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -105,7 +105,11 @@ func (r *Runtime) Run(runnerCtx context.Context) error {
 	}
 
 	defer func() {
-		if err := r.engine.DestroyWorkflow(runnerCtx, r.spec, r.taskUUID); err != nil {
+		ctx := runnerCtx //nolint:contextcheck
+		if ctx.Err() != nil {
+			ctx = GetShutdownCtx()
+		}
+		if err := r.engine.DestroyWorkflow(ctx, r.spec, r.taskUUID); err != nil {
 			logger.Error().Err(err).Msg("could not destroy engine")
 		}
 	}()
diff --git a/pipeline/shutdown.go b/pipeline/shutdown.go
new file mode 100644
index 000000000..b9fd98384
--- /dev/null
+++ b/pipeline/shutdown.go
@@ -0,0 +1,48 @@
+// Copyright 2024 Woodpecker Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipeline
+
+import (
+	"context"
+	"sync"
+	"time"
+)
+
+const shutdownTimeout = time.Second * 5
+
+var (
+	shutdownCtx       context.Context
+	shutdownCtxCancel context.CancelFunc
+	shutdownCtxLock   sync.Mutex
+)
+
+func GetShutdownCtx() context.Context {
+	shutdownCtxLock.Lock()
+	defer shutdownCtxLock.Unlock()
+	if shutdownCtx == nil {
+		shutdownCtx, shutdownCtxCancel = context.WithTimeout(context.Background(), shutdownTimeout)
+	}
+	return shutdownCtx
+}
+
+func CancelShutdown() {
+	shutdownCtxLock.Lock()
+	defer shutdownCtxLock.Unlock()
+	if shutdownCtxCancel == nil {
+		// we create an already-canceled context
+		shutdownCtx, shutdownCtxCancel = context.WithCancel(context.Background()) //nolint:forbidigo
+	}
+	shutdownCtxCancel()
+}
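For reference, a hedged usage sketch of the new `pipeline` shutdown helpers: `GetShutdownCtx` lazily creates a time-boxed context for teardown work, while `CancelShutdown` turns it into an already-canceled one. How callers outside this diff invoke `CancelShutdown` is an assumption:

```go
package main

import (
	"fmt"

	"go.woodpecker-ci.org/woodpecker/v2/pipeline"
)

func main() {
	// First call creates a context bounded by the 5s shutdown timeout;
	// cleanup code such as DestroyWorkflow uses it once the runner context is gone.
	ctx := pipeline.GetShutdownCtx()
	fmt.Println("shutdown ctx alive:", ctx.Err() == nil)

	// Cancel it, e.g. when an operator forces an immediate stop (assumed caller).
	pipeline.CancelShutdown()
	fmt.Println("after CancelShutdown:", ctx.Err())
}
```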