From 757f5a58e2d72bb84361bfb4268c917a8dcacd72 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Sat, 13 Jul 2024 16:46:01 -0700 Subject: [PATCH] Gracefully shutdown server (#3896) --- cmd/server/grpc_server.go | 88 ++++++++ cmd/server/main.go | 2 +- cmd/server/metrics_server.go | 108 +++++++++ cmd/server/server.go | 211 ++++++++++++------ cmd/server/setup.go | 84 ++----- server/cron/cron.go | 4 +- server/queue/persistent.go | 4 +- server/store/datastore/engine.go | 6 +- .../migration/020_alter_logs_table.go | 7 +- server/store/datastore/migration/migration.go | 4 +- .../datastore/migration/migration_test.go | 5 +- server/store/mocks/store.go | 12 +- server/store/store.go | 4 +- 13 files changed, 375 insertions(+), 164 deletions(-) create mode 100644 cmd/server/grpc_server.go create mode 100644 cmd/server/metrics_server.go diff --git a/cmd/server/grpc_server.go b/cmd/server/grpc_server.go new file mode 100644 index 000000000..e573d8381 --- /dev/null +++ b/cmd/server/grpc_server.go @@ -0,0 +1,88 @@ +// Copyright 2024 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "fmt" + "net" + + "github.com/rs/zerolog/log" + "github.com/urfave/cli/v2" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" + + "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/proto" + "go.woodpecker-ci.org/woodpecker/v2/server" + woodpeckerGrpcServer "go.woodpecker-ci.org/woodpecker/v2/server/grpc" + "go.woodpecker-ci.org/woodpecker/v2/server/store" +) + +func runGrpcServer(ctx context.Context, c *cli.Context, _store store.Store) error { + lis, err := net.Listen("tcp", c.String("grpc-addr")) + if err != nil { + log.Fatal().Err(err).Msg("failed to listen on grpc-addr") //nolint:forbidigo + } + + jwtSecret := c.String("grpc-secret") + jwtManager := woodpeckerGrpcServer.NewJWTManager(jwtSecret) + + authorizer := woodpeckerGrpcServer.NewAuthorizer(jwtManager) + grpcServer := grpc.NewServer( + grpc.StreamInterceptor(authorizer.StreamInterceptor), + grpc.UnaryInterceptor(authorizer.UnaryInterceptor), + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: c.Duration("keepalive-min-time"), + }), + ) + + woodpeckerServer := woodpeckerGrpcServer.NewWoodpeckerServer( + server.Config.Services.Queue, + server.Config.Services.Logs, + server.Config.Services.Pubsub, + _store, + ) + proto.RegisterWoodpeckerServer(grpcServer, woodpeckerServer) + + woodpeckerAuthServer := woodpeckerGrpcServer.NewWoodpeckerAuthServer( + jwtManager, + server.Config.Server.AgentToken, + _store, + ) + proto.RegisterWoodpeckerAuthServer(grpcServer, woodpeckerAuthServer) + + grpcCtx, cancel := context.WithCancelCause(ctx) + defer cancel(nil) + + go func() { + <-grpcCtx.Done() + if grpcServer == nil { + return + } + log.Info().Msg("terminating grpc service gracefully") + grpcServer.GracefulStop() + log.Info().Msg("grpc service stopped") + }() + + if err := grpcServer.Serve(lis); err != nil { + // signal that we don't have to stop the server gracefully anymore + grpcServer = nil + + // wrap the error so we know where it did come from + return fmt.Errorf("grpc server failed: %w", err) + } + + return nil +} diff --git a/cmd/server/main.go b/cmd/server/main.go index da7f986b4..f8050418a 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -43,6 +43,6 @@ func main() { setupSwaggerStaticConfig() if err := app.Run(os.Args); err != nil { - log.Fatal().Err(err).Msgf("error running server") //nolint:forbidigo + log.Error().Err(err).Msgf("error running server") } } diff --git a/cmd/server/metrics_server.go b/cmd/server/metrics_server.go new file mode 100644 index 000000000..007d2e48d --- /dev/null +++ b/cmd/server/metrics_server.go @@ -0,0 +1,108 @@ +// Copyright 2024 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "errors" + "time" + + "github.com/prometheus/client_golang/prometheus" + prometheus_auto "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/rs/zerolog/log" + + "go.woodpecker-ci.org/woodpecker/v2/server" + "go.woodpecker-ci.org/woodpecker/v2/server/store" +) + +func startMetricsCollector(ctx context.Context, _store store.Store) { + pendingSteps := prometheus_auto.NewGauge(prometheus.GaugeOpts{ + Namespace: "woodpecker", + Name: "pending_steps", + Help: "Total number of pending pipeline steps.", + }) + waitingSteps := prometheus_auto.NewGauge(prometheus.GaugeOpts{ + Namespace: "woodpecker", + Name: "waiting_steps", + Help: "Total number of pipeline waiting on deps.", + }) + runningSteps := prometheus_auto.NewGauge(prometheus.GaugeOpts{ + Namespace: "woodpecker", + Name: "running_steps", + Help: "Total number of running pipeline steps.", + }) + workers := prometheus_auto.NewGauge(prometheus.GaugeOpts{ + Namespace: "woodpecker", + Name: "worker_count", + Help: "Total number of workers.", + }) + pipelines := prometheus_auto.NewGauge(prometheus.GaugeOpts{ + Namespace: "woodpecker", + Name: "pipeline_total_count", + Help: "Total number of pipelines.", + }) + users := prometheus_auto.NewGauge(prometheus.GaugeOpts{ + Namespace: "woodpecker", + Name: "user_count", + Help: "Total number of users.", + }) + repos := prometheus_auto.NewGauge(prometheus.GaugeOpts{ + Namespace: "woodpecker", + Name: "repo_count", + Help: "Total number of repos.", + }) + + go func() { + log.Info().Msg("queue metric collector started") + + for { + stats := server.Config.Services.Queue.Info(ctx) + pendingSteps.Set(float64(stats.Stats.Pending)) + waitingSteps.Set(float64(stats.Stats.WaitingOnDeps)) + runningSteps.Set(float64(stats.Stats.Running)) + workers.Set(float64(stats.Stats.Workers)) + + select { + case <-ctx.Done(): + log.Info().Msg("queue metric collector stopped") + return + case <-time.After(queueInfoRefreshInterval): + } + } + }() + go func() { + log.Info().Msg("store metric collector started") + + for { + repoCount, repoErr := _store.GetRepoCount() + userCount, userErr := _store.GetUserCount() + pipelineCount, pipelineErr := _store.GetPipelineCount() + pipelines.Set(float64(pipelineCount)) + users.Set(float64(userCount)) + repos.Set(float64(repoCount)) + + if err := errors.Join(repoErr, userErr, pipelineErr); err != nil { + log.Error().Err(err).Msg("could not update store information for metrics") + } + + select { + case <-ctx.Done(): + log.Info().Msg("store metric collector stopped") + return + case <-time.After(storeInfoRefreshInterval): + } + } + }() +} diff --git a/cmd/server/server.go b/cmd/server/server.go index 525b01034..c41186132 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -15,10 +15,10 @@ package main import ( + "context" "crypto/tls" "errors" "fmt" - "net" "net/http" "net/http/httputil" "net/url" @@ -32,25 +32,48 @@ import ( "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" "golang.org/x/sync/errgroup" - "google.golang.org/grpc" - "google.golang.org/grpc/keepalive" - "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/proto" "go.woodpecker-ci.org/woodpecker/v2/server" "go.woodpecker-ci.org/woodpecker/v2/server/cron" - woodpeckerGrpcServer "go.woodpecker-ci.org/woodpecker/v2/server/grpc" "go.woodpecker-ci.org/woodpecker/v2/server/router" "go.woodpecker-ci.org/woodpecker/v2/server/router/middleware" "go.woodpecker-ci.org/woodpecker/v2/server/web" "go.woodpecker-ci.org/woodpecker/v2/shared/logger" + "go.woodpecker-ci.org/woodpecker/v2/shared/utils" "go.woodpecker-ci.org/woodpecker/v2/version" ) +const ( + shutdownTimeout = time.Second * 5 +) + +var ( + stopServerFunc context.CancelCauseFunc = func(error) {} + shutdownCancelFunc context.CancelFunc = func() {} + shutdownCtx = context.Background() +) + func run(c *cli.Context) error { if err := logger.SetupGlobalLogger(c, true); err != nil { return err } + ctx := utils.WithContextSigtermCallback(c.Context, func() { + log.Info().Msg("termination signal is received, shutting down server") + }) + + ctx, ctxCancel := context.WithCancelCause(ctx) + stopServerFunc = func(err error) { + if err != nil { + log.Error().Err(err).Msg("shutdown of whole server") + } + stopServerFunc = func(error) {} + shutdownCtx, shutdownCancelFunc = context.WithTimeout(shutdownCtx, shutdownTimeout) + ctxCancel(err) + } + defer stopServerFunc(nil) + defer shutdownCancelFunc() + // set gin mode based on log level if zerolog.GlobalLevel() > zerolog.DebugLevel { gin.SetMode(gin.ReleaseMode) @@ -74,7 +97,7 @@ func run(c *cli.Context) error { ) } - _store, err := setupStore(c) + _store, err := setupStore(ctx, c) if err != nil { return fmt.Errorf("can't setup store: %w", err) } @@ -84,56 +107,35 @@ func run(c *cli.Context) error { } }() - err = setupEvilGlobals(c, _store) + err = setupEvilGlobals(ctx, c, _store) if err != nil { return fmt.Errorf("can't setup globals: %w", err) } - var g errgroup.Group + // wait for all services until one do stops with an error + serviceWaitingGroup := errgroup.Group{} - setupMetrics(&g, _store) + log.Info().Msgf("starting Woodpecker server with version '%s'", version.String()) - g.Go(func() error { - return cron.Start(c.Context, _store) + startMetricsCollector(ctx, _store) + + serviceWaitingGroup.Go(func() error { + log.Info().Msg("starting cron service ...") + if err := cron.Run(ctx, _store); err != nil { + go stopServerFunc(err) + return err + } + log.Info().Msg("cron service stopped") + return nil }) // start the grpc server - g.Go(func() error { - lis, err := net.Listen("tcp", c.String("grpc-addr")) - if err != nil { - log.Fatal().Err(err).Msg("failed to listen on grpc-addr") //nolint:forbidigo - } - - jwtSecret := c.String("grpc-secret") - jwtManager := woodpeckerGrpcServer.NewJWTManager(jwtSecret) - - authorizer := woodpeckerGrpcServer.NewAuthorizer(jwtManager) - grpcServer := grpc.NewServer( - grpc.StreamInterceptor(authorizer.StreamInterceptor), - grpc.UnaryInterceptor(authorizer.UnaryInterceptor), - grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ - MinTime: c.Duration("keepalive-min-time"), - }), - ) - - woodpeckerServer := woodpeckerGrpcServer.NewWoodpeckerServer( - server.Config.Services.Queue, - server.Config.Services.Logs, - server.Config.Services.Pubsub, - _store, - ) - proto.RegisterWoodpeckerServer(grpcServer, woodpeckerServer) - - woodpeckerAuthServer := woodpeckerGrpcServer.NewWoodpeckerAuthServer( - jwtManager, - server.Config.Server.AgentToken, - _store, - ) - proto.RegisterWoodpeckerAuthServer(grpcServer, woodpeckerAuthServer) - - err = grpcServer.Serve(lis) - if err != nil { - log.Fatal().Err(err).Msg("failed to serve grpc server") //nolint:forbidigo + serviceWaitingGroup.Go(func() error { + log.Info().Msg("starting grpc server ...") + if err := runGrpcServer(ctx, c, _store); err != nil { + // stop whole server as grpc is essential + go stopServerFunc(err) + return err } return nil }) @@ -173,20 +175,33 @@ func run(c *cli.Context) error { switch { case c.String("server-cert") != "": // start the server with tls enabled - g.Go(func() error { - serve := &http.Server{ + serviceWaitingGroup.Go(func() error { + tlsServer := &http.Server{ Addr: server.Config.Server.PortTLS, Handler: handler, TLSConfig: &tls.Config{ NextProtos: []string{"h2", "http/1.1"}, }, } - err = serve.ListenAndServeTLS( + + go func() { + <-ctx.Done() + log.Info().Msg("shutdown tls server ...") + if err := tlsServer.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck + log.Error().Err(err).Msg("shutdown tls server failed") + } else { + log.Info().Msg("tls server stopped") + } + }() + + log.Info().Msg("starting tls server ...") + err := tlsServer.ListenAndServeTLS( c.String("server-cert"), c.String("server-key"), ) if err != nil && !errors.Is(err, http.ErrServerClosed) { - log.Fatal().Err(err).Msg("failed to start server with tls") //nolint:forbidigo + log.Error().Err(err).Msg("TLS server failed") + stopServerFunc(fmt.Errorf("TLS server failed: %w", err)) } return err }) @@ -202,12 +217,27 @@ func run(c *cli.Context) error { http.Redirect(w, req, req.URL.String(), http.StatusMovedPermanently) } - g.Go(func() error { - err := http.ListenAndServe(server.Config.Server.Port, http.HandlerFunc(redirect)) - if err != nil && !errors.Is(err, http.ErrServerClosed) { - log.Fatal().Err(err).Msg("unable to start server to redirect from http to https") //nolint:forbidigo + serviceWaitingGroup.Go(func() error { + redirectServer := &http.Server{ + Addr: server.Config.Server.Port, + Handler: http.HandlerFunc(redirect), } - return err + go func() { + <-ctx.Done() + log.Info().Msg("shutdown redirect server ...") + if err := redirectServer.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck + log.Error().Err(err).Msg("shutdown redirect server failed") + } else { + log.Info().Msg("redirect server stopped") + } + }() + + log.Info().Msg("starting redirect server ...") + if err := redirectServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Error().Err(err).Msg("redirect server failed") + stopServerFunc(fmt.Errorf("redirect server failed: %w", err)) + } + return nil }) case c.Bool("lets-encrypt"): // start the server with lets-encrypt @@ -219,39 +249,76 @@ func run(c *cli.Context) error { return err } - g.Go(func() error { + serviceWaitingGroup.Go(func() error { + go func() { + <-ctx.Done() + log.Error().Msg("there is no certmagic.HTTPS alternative who is context aware we will fail in 2 seconds") + time.Sleep(time.Second * 2) + log.Fatal().Msg("we kill certmagic by fail") //nolint:forbidigo + }() + + log.Info().Msg("starting certmagic server ...") if err := certmagic.HTTPS([]string{address.Host}, handler); err != nil { - log.Fatal().Err(err).Msg("certmagic does not work") //nolint:forbidigo + log.Error().Err(err).Msg("certmagic does not work") + stopServerFunc(fmt.Errorf("certmagic failed: %w", err)) } return nil }) default: // start the server without tls - g.Go(func() error { - err := http.ListenAndServe( - c.String("server-addr"), - handler, - ) - if err != nil && !errors.Is(err, http.ErrServerClosed) { - log.Fatal().Err(err).Msg("could not start server") //nolint:forbidigo + serviceWaitingGroup.Go(func() error { + httpServer := &http.Server{ + Addr: c.String("server-addr"), + Handler: handler, + } + + go func() { + <-ctx.Done() + log.Info().Msg("shutdown http server ...") + if err := httpServer.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck + log.Error().Err(err).Msg("shutdown http server failed") + } else { + log.Info().Msg("http server stopped") + } + }() + + log.Info().Msg("starting http server ...") + if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Error().Err(err).Msg("http server failed") + stopServerFunc(fmt.Errorf("http server failed: %w", err)) } return err }) } if metricsServerAddr := c.String("metrics-server-addr"); metricsServerAddr != "" { - g.Go(func() error { + serviceWaitingGroup.Go(func() error { metricsRouter := gin.New() metricsRouter.GET("/metrics", gin.WrapH(prometheus_http.Handler())) - err := http.ListenAndServe(metricsServerAddr, metricsRouter) - if err != nil && !errors.Is(err, http.ErrServerClosed) { - log.Fatal().Err(err).Msg("could not start metrics server") //nolint:forbidigo + + metricsServer := &http.Server{ + Addr: metricsServerAddr, + Handler: metricsRouter, + } + + go func() { + <-ctx.Done() + log.Info().Msg("shutdown metrics server ...") + if err := metricsServer.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck + log.Error().Err(err).Msg("shutdown metrics server failed") + } else { + log.Info().Msg("metrics server stopped") + } + }() + + log.Info().Msg("starting metrics server ...") + if err := metricsServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Error().Err(err).Msg("metrics server failed") + stopServerFunc(fmt.Errorf("metrics server failed: %w", err)) } return err }) } - log.Info().Msgf("starting Woodpecker server with version '%s'", version.String()) - - return g.Wait() + return serviceWaitingGroup.Wait() } diff --git a/cmd/server/setup.go b/cmd/server/setup.go index 3c885dbaf..858d79375 100644 --- a/cmd/server/setup.go +++ b/cmd/server/setup.go @@ -26,11 +26,8 @@ import ( "time" "github.com/gorilla/securecookie" - "github.com/prometheus/client_golang/prometheus" - prometheus_auto "github.com/prometheus/client_golang/prometheus/promauto" "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" - "golang.org/x/sync/errgroup" "go.woodpecker-ci.org/woodpecker/v2/server" "go.woodpecker-ci.org/woodpecker/v2/server/cache" @@ -49,7 +46,12 @@ import ( "go.woodpecker-ci.org/woodpecker/v2/shared/constant" ) -func setupStore(c *cli.Context) (store.Store, error) { +const ( + queueInfoRefreshInterval = 500 * time.Millisecond + storeInfoRefreshInterval = 10 * time.Second +) + +func setupStore(ctx context.Context, c *cli.Context) (store.Store, error) { datasource := c.String("datasource") driver := c.String("driver") xorm := store.XORM{ @@ -86,7 +88,7 @@ func setupStore(c *cli.Context) (store.Store, error) { return nil, fmt.Errorf("could not open datastore: %w", err) } - if err := store.Migrate(c.Bool("migrations-allow-long")); err != nil { + if err := store.Migrate(ctx, c.Bool("migrations-allow-long")); err != nil { return nil, fmt.Errorf("could not migrate datastore: %w", err) } @@ -102,74 +104,14 @@ func checkSqliteFileExist(path string) error { return err } -func setupQueue(c *cli.Context, s store.Store) queue.Queue { - return queue.WithTaskStore(queue.New(c.Context), s) +func setupQueue(ctx context.Context, s store.Store) queue.Queue { + return queue.WithTaskStore(ctx, queue.New(ctx), s) } -func setupMembershipService(_ *cli.Context, _store store.Store) cache.MembershipService { +func setupMembershipService(_ context.Context, _store store.Store) cache.MembershipService { return cache.NewMembershipService(_store) } -func setupMetrics(g *errgroup.Group, _store store.Store) { - pendingSteps := prometheus_auto.NewGauge(prometheus.GaugeOpts{ - Namespace: "woodpecker", - Name: "pending_steps", - Help: "Total number of pending pipeline steps.", - }) - waitingSteps := prometheus_auto.NewGauge(prometheus.GaugeOpts{ - Namespace: "woodpecker", - Name: "waiting_steps", - Help: "Total number of pipeline waiting on deps.", - }) - runningSteps := prometheus_auto.NewGauge(prometheus.GaugeOpts{ - Namespace: "woodpecker", - Name: "running_steps", - Help: "Total number of running pipeline steps.", - }) - workers := prometheus_auto.NewGauge(prometheus.GaugeOpts{ - Namespace: "woodpecker", - Name: "worker_count", - Help: "Total number of workers.", - }) - pipelines := prometheus_auto.NewGauge(prometheus.GaugeOpts{ - Namespace: "woodpecker", - Name: "pipeline_total_count", - Help: "Total number of pipelines.", - }) - users := prometheus_auto.NewGauge(prometheus.GaugeOpts{ - Namespace: "woodpecker", - Name: "user_count", - Help: "Total number of users.", - }) - repos := prometheus_auto.NewGauge(prometheus.GaugeOpts{ - Namespace: "woodpecker", - Name: "repo_count", - Help: "Total number of repos.", - }) - - g.Go(func() error { - for { - stats := server.Config.Services.Queue.Info(context.TODO()) - pendingSteps.Set(float64(stats.Stats.Pending)) - waitingSteps.Set(float64(stats.Stats.WaitingOnDeps)) - runningSteps.Set(float64(stats.Stats.Running)) - workers.Set(float64(stats.Stats.Workers)) - time.Sleep(500 * time.Millisecond) - } - }) - g.Go(func() error { - for { - repoCount, _ := _store.GetRepoCount() - userCount, _ := _store.GetUserCount() - pipelineCount, _ := _store.GetPipelineCount() - pipelines.Set(float64(pipelineCount)) - users.Set(float64(userCount)) - repos.Set(float64(repoCount)) - time.Sleep(10 * time.Second) - } - }) -} - func setupLogStore(c *cli.Context, s store.Store) (logService.Service, error) { switch c.String("log-store") { case "file": @@ -202,12 +144,12 @@ func setupJWTSecret(_store store.Store) (string, error) { return jwtSecret, nil } -func setupEvilGlobals(c *cli.Context, s store.Store) error { +func setupEvilGlobals(ctx context.Context, c *cli.Context, s store.Store) error { // services - server.Config.Services.Queue = setupQueue(c, s) + server.Config.Services.Queue = setupQueue(ctx, s) server.Config.Services.Logs = logging.New() server.Config.Services.Pubsub = pubsub.New() - server.Config.Services.Membership = setupMembershipService(c, s) + server.Config.Services.Membership = setupMembershipService(ctx, s) serviceManager, err := services.NewManager(c, s, setup.Forge) if err != nil { return fmt.Errorf("could not setup service manager: %w", err) diff --git a/server/cron/cron.go b/server/cron/cron.go index a1dcddd29..0dc05ee65 100644 --- a/server/cron/cron.go +++ b/server/cron/cron.go @@ -37,8 +37,8 @@ const ( checkItems = 10 ) -// Start starts the cron scheduler loop. -func Start(ctx context.Context, store store.Store) error { +// Run starts the cron scheduler loop. +func Run(ctx context.Context, store store.Store) error { for { select { case <-ctx.Done(): diff --git a/server/queue/persistent.go b/server/queue/persistent.go index 96d9fff9f..e87f7c7b2 100644 --- a/server/queue/persistent.go +++ b/server/queue/persistent.go @@ -27,9 +27,9 @@ import ( // WithTaskStore returns a queue that is backed by the TaskStore. This // ensures the task Queue can be restored when the system starts. -func WithTaskStore(q Queue, s store.Store) Queue { +func WithTaskStore(ctx context.Context, q Queue, s store.Store) Queue { tasks, _ := s.TaskList() - if err := q.PushAtOnce(context.Background(), tasks); err != nil { + if err := q.PushAtOnce(ctx, tasks); err != nil { log.Error().Err(err).Msg("PushAtOnce failed") } return &persistentQueue{q, s} diff --git a/server/store/datastore/engine.go b/server/store/datastore/engine.go index bee837a1d..8499681ac 100644 --- a/server/store/datastore/engine.go +++ b/server/store/datastore/engine.go @@ -15,6 +15,8 @@ package datastore import ( + "context" + "github.com/rs/zerolog" "xorm.io/xorm" xlog "xorm.io/xorm/log" @@ -54,8 +56,8 @@ func (s storage) Ping() error { } // Migrate old storage or init new one. -func (s storage) Migrate(allowLong bool) error { - return migration.Migrate(s.engine, allowLong) +func (s storage) Migrate(ctx context.Context, allowLong bool) error { + return migration.Migrate(ctx, s.engine, allowLong) } func (s storage) Close() error { diff --git a/server/store/datastore/migration/020_alter_logs_table.go b/server/store/datastore/migration/020_alter_logs_table.go index 33e050f32..4f13434cb 100644 --- a/server/store/datastore/migration/020_alter_logs_table.go +++ b/server/store/datastore/migration/020_alter_logs_table.go @@ -21,7 +21,6 @@ import ( "runtime" "github.com/rs/zerolog/log" - "github.com/tevino/abool/v2" "src.techknowlogick.com/xormigrate" "xorm.io/xorm" @@ -90,16 +89,14 @@ var migrateLogs2LogEntries = xormigrate.Migration{ logs := make([]*oldLogs020, 0, perPage020) logEntries := make([]*oldLogEntry020, 0, 50) - sigterm := abool.New() ctx, cancelCtx := context.WithCancelCause(context.Background()) defer cancelCtx(nil) - _ = utils.WithContextSigtermCallback(ctx, func() { + sigtermCtx := utils.WithContextSigtermCallback(ctx, func() { log.Info().Msg("ctrl+c received, stopping current migration") - sigterm.Set() }) for { - if sigterm.IsSet() { + if sigtermCtx.Err() != nil { return fmt.Errorf("migration 'migrate-logs-to-log_entries' gracefully aborted") } diff --git a/server/store/datastore/migration/migration.go b/server/store/datastore/migration/migration.go index bdcead5d9..4c905e66e 100644 --- a/server/store/datastore/migration/migration.go +++ b/server/store/datastore/migration/migration.go @@ -15,6 +15,7 @@ package migration import ( + "context" "fmt" "reflect" @@ -85,7 +86,8 @@ var allBeans = []any{ new(model.Org), } -func Migrate(e *xorm.Engine, allowLong bool) error { +// TODO: make xormigrate context aware +func Migrate(_ context.Context, e *xorm.Engine, allowLong bool) error { e.SetDisableGlobalCache(true) m := xormigrate.New(e, migrationTasks) diff --git a/server/store/datastore/migration/migration_test.go b/server/store/datastore/migration/migration_test.go index cf4b1a761..661bd01ac 100644 --- a/server/store/datastore/migration/migration_test.go +++ b/server/store/datastore/migration/migration_test.go @@ -15,6 +15,7 @@ package migration import ( + "context" "os" "testing" "time" @@ -95,7 +96,7 @@ func testDB(t *testing.T, new bool) (engine *xorm.Engine, closeDB func()) { func TestMigrate(t *testing.T) { // init new db engine, closeDB := testDB(t, true) - assert.NoError(t, Migrate(engine, true)) + assert.NoError(t, Migrate(context.Background(), engine, true)) closeDB() dbType := engine.Dialect().URI().DBType @@ -106,6 +107,6 @@ func TestMigrate(t *testing.T) { // migrate old db engine, closeDB = testDB(t, false) - assert.NoError(t, Migrate(engine, true)) + assert.NoError(t, Migrate(context.Background(), engine, true)) closeDB() } diff --git a/server/store/mocks/store.go b/server/store/mocks/store.go index e0c2663ff..55d7257aa 100644 --- a/server/store/mocks/store.go +++ b/server/store/mocks/store.go @@ -6,6 +6,8 @@ package mocks import ( + context "context" + mock "github.com/stretchr/testify/mock" model "go.woodpecker-ci.org/woodpecker/v2/server/model" ) @@ -1404,17 +1406,17 @@ func (_m *Store) LogFind(_a0 *model.Step) ([]*model.LogEntry, error) { return r0, r1 } -// Migrate provides a mock function with given fields: _a0 -func (_m *Store) Migrate(_a0 bool) error { - ret := _m.Called(_a0) +// Migrate provides a mock function with given fields: _a0, _a1 +func (_m *Store) Migrate(_a0 context.Context, _a1 bool) error { + ret := _m.Called(_a0, _a1) if len(ret) == 0 { panic("no return value specified for Migrate") } var r0 error - if rf, ok := ret.Get(0).(func(bool) error); ok { - r0 = rf(_a0) + if rf, ok := ret.Get(0).(func(context.Context, bool) error); ok { + r0 = rf(_a0, _a1) } else { r0 = ret.Error(0) } diff --git a/server/store/store.go b/server/store/store.go index 8ff3b1052..ff408f075 100644 --- a/server/store/store.go +++ b/server/store/store.go @@ -17,6 +17,8 @@ package store //go:generate mockery --name Store --output mocks --case underscore --note "+build test" import ( + "context" + "go.woodpecker-ci.org/woodpecker/v2/server/model" ) @@ -200,5 +202,5 @@ type Store interface { // Store operations Ping() error Close() error - Migrate(bool) error + Migrate(context.Context, bool) error }