Gracefully shutdown server (#3896)

This commit is contained in:
6543 2024-07-13 16:46:01 -07:00 committed by GitHub
parent 30cd800110
commit 757f5a58e2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 375 additions and 164 deletions

88
cmd/server/grpc_server.go Normal file
View file

@ -0,0 +1,88 @@
// Copyright 2024 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"fmt"
"net"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/proto"
"go.woodpecker-ci.org/woodpecker/v2/server"
woodpeckerGrpcServer "go.woodpecker-ci.org/woodpecker/v2/server/grpc"
"go.woodpecker-ci.org/woodpecker/v2/server/store"
)
func runGrpcServer(ctx context.Context, c *cli.Context, _store store.Store) error {
lis, err := net.Listen("tcp", c.String("grpc-addr"))
if err != nil {
log.Fatal().Err(err).Msg("failed to listen on grpc-addr") //nolint:forbidigo
}
jwtSecret := c.String("grpc-secret")
jwtManager := woodpeckerGrpcServer.NewJWTManager(jwtSecret)
authorizer := woodpeckerGrpcServer.NewAuthorizer(jwtManager)
grpcServer := grpc.NewServer(
grpc.StreamInterceptor(authorizer.StreamInterceptor),
grpc.UnaryInterceptor(authorizer.UnaryInterceptor),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: c.Duration("keepalive-min-time"),
}),
)
woodpeckerServer := woodpeckerGrpcServer.NewWoodpeckerServer(
server.Config.Services.Queue,
server.Config.Services.Logs,
server.Config.Services.Pubsub,
_store,
)
proto.RegisterWoodpeckerServer(grpcServer, woodpeckerServer)
woodpeckerAuthServer := woodpeckerGrpcServer.NewWoodpeckerAuthServer(
jwtManager,
server.Config.Server.AgentToken,
_store,
)
proto.RegisterWoodpeckerAuthServer(grpcServer, woodpeckerAuthServer)
grpcCtx, cancel := context.WithCancelCause(ctx)
defer cancel(nil)
go func() {
<-grpcCtx.Done()
if grpcServer == nil {
return
}
log.Info().Msg("terminating grpc service gracefully")
grpcServer.GracefulStop()
log.Info().Msg("grpc service stopped")
}()
if err := grpcServer.Serve(lis); err != nil {
// signal that we don't have to stop the server gracefully anymore
grpcServer = nil
// wrap the error so we know where it did come from
return fmt.Errorf("grpc server failed: %w", err)
}
return nil
}

View file

@ -43,6 +43,6 @@ func main() {
setupSwaggerStaticConfig() setupSwaggerStaticConfig()
if err := app.Run(os.Args); err != nil { if err := app.Run(os.Args); err != nil {
log.Fatal().Err(err).Msgf("error running server") //nolint:forbidigo log.Error().Err(err).Msgf("error running server")
} }
} }

View file

@ -0,0 +1,108 @@
// Copyright 2024 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"errors"
"time"
"github.com/prometheus/client_golang/prometheus"
prometheus_auto "github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog/log"
"go.woodpecker-ci.org/woodpecker/v2/server"
"go.woodpecker-ci.org/woodpecker/v2/server/store"
)
func startMetricsCollector(ctx context.Context, _store store.Store) {
pendingSteps := prometheus_auto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "pending_steps",
Help: "Total number of pending pipeline steps.",
})
waitingSteps := prometheus_auto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "waiting_steps",
Help: "Total number of pipeline waiting on deps.",
})
runningSteps := prometheus_auto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "running_steps",
Help: "Total number of running pipeline steps.",
})
workers := prometheus_auto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "worker_count",
Help: "Total number of workers.",
})
pipelines := prometheus_auto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "pipeline_total_count",
Help: "Total number of pipelines.",
})
users := prometheus_auto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "user_count",
Help: "Total number of users.",
})
repos := prometheus_auto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "repo_count",
Help: "Total number of repos.",
})
go func() {
log.Info().Msg("queue metric collector started")
for {
stats := server.Config.Services.Queue.Info(ctx)
pendingSteps.Set(float64(stats.Stats.Pending))
waitingSteps.Set(float64(stats.Stats.WaitingOnDeps))
runningSteps.Set(float64(stats.Stats.Running))
workers.Set(float64(stats.Stats.Workers))
select {
case <-ctx.Done():
log.Info().Msg("queue metric collector stopped")
return
case <-time.After(queueInfoRefreshInterval):
}
}
}()
go func() {
log.Info().Msg("store metric collector started")
for {
repoCount, repoErr := _store.GetRepoCount()
userCount, userErr := _store.GetUserCount()
pipelineCount, pipelineErr := _store.GetPipelineCount()
pipelines.Set(float64(pipelineCount))
users.Set(float64(userCount))
repos.Set(float64(repoCount))
if err := errors.Join(repoErr, userErr, pipelineErr); err != nil {
log.Error().Err(err).Msg("could not update store information for metrics")
}
select {
case <-ctx.Done():
log.Info().Msg("store metric collector stopped")
return
case <-time.After(storeInfoRefreshInterval):
}
}
}()
}

View file

@ -15,10 +15,10 @@
package main package main
import ( import (
"context"
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt" "fmt"
"net"
"net/http" "net/http"
"net/http/httputil" "net/http/httputil"
"net/url" "net/url"
@ -32,25 +32,48 @@ import (
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/proto"
"go.woodpecker-ci.org/woodpecker/v2/server" "go.woodpecker-ci.org/woodpecker/v2/server"
"go.woodpecker-ci.org/woodpecker/v2/server/cron" "go.woodpecker-ci.org/woodpecker/v2/server/cron"
woodpeckerGrpcServer "go.woodpecker-ci.org/woodpecker/v2/server/grpc"
"go.woodpecker-ci.org/woodpecker/v2/server/router" "go.woodpecker-ci.org/woodpecker/v2/server/router"
"go.woodpecker-ci.org/woodpecker/v2/server/router/middleware" "go.woodpecker-ci.org/woodpecker/v2/server/router/middleware"
"go.woodpecker-ci.org/woodpecker/v2/server/web" "go.woodpecker-ci.org/woodpecker/v2/server/web"
"go.woodpecker-ci.org/woodpecker/v2/shared/logger" "go.woodpecker-ci.org/woodpecker/v2/shared/logger"
"go.woodpecker-ci.org/woodpecker/v2/shared/utils"
"go.woodpecker-ci.org/woodpecker/v2/version" "go.woodpecker-ci.org/woodpecker/v2/version"
) )
const (
shutdownTimeout = time.Second * 5
)
var (
stopServerFunc context.CancelCauseFunc = func(error) {}
shutdownCancelFunc context.CancelFunc = func() {}
shutdownCtx = context.Background()
)
func run(c *cli.Context) error { func run(c *cli.Context) error {
if err := logger.SetupGlobalLogger(c, true); err != nil { if err := logger.SetupGlobalLogger(c, true); err != nil {
return err return err
} }
ctx := utils.WithContextSigtermCallback(c.Context, func() {
log.Info().Msg("termination signal is received, shutting down server")
})
ctx, ctxCancel := context.WithCancelCause(ctx)
stopServerFunc = func(err error) {
if err != nil {
log.Error().Err(err).Msg("shutdown of whole server")
}
stopServerFunc = func(error) {}
shutdownCtx, shutdownCancelFunc = context.WithTimeout(shutdownCtx, shutdownTimeout)
ctxCancel(err)
}
defer stopServerFunc(nil)
defer shutdownCancelFunc()
// set gin mode based on log level // set gin mode based on log level
if zerolog.GlobalLevel() > zerolog.DebugLevel { if zerolog.GlobalLevel() > zerolog.DebugLevel {
gin.SetMode(gin.ReleaseMode) gin.SetMode(gin.ReleaseMode)
@ -74,7 +97,7 @@ func run(c *cli.Context) error {
) )
} }
_store, err := setupStore(c) _store, err := setupStore(ctx, c)
if err != nil { if err != nil {
return fmt.Errorf("can't setup store: %w", err) return fmt.Errorf("can't setup store: %w", err)
} }
@ -84,56 +107,35 @@ func run(c *cli.Context) error {
} }
}() }()
err = setupEvilGlobals(c, _store) err = setupEvilGlobals(ctx, c, _store)
if err != nil { if err != nil {
return fmt.Errorf("can't setup globals: %w", err) return fmt.Errorf("can't setup globals: %w", err)
} }
var g errgroup.Group // wait for all services until one do stops with an error
serviceWaitingGroup := errgroup.Group{}
setupMetrics(&g, _store) log.Info().Msgf("starting Woodpecker server with version '%s'", version.String())
g.Go(func() error { startMetricsCollector(ctx, _store)
return cron.Start(c.Context, _store)
serviceWaitingGroup.Go(func() error {
log.Info().Msg("starting cron service ...")
if err := cron.Run(ctx, _store); err != nil {
go stopServerFunc(err)
return err
}
log.Info().Msg("cron service stopped")
return nil
}) })
// start the grpc server // start the grpc server
g.Go(func() error { serviceWaitingGroup.Go(func() error {
lis, err := net.Listen("tcp", c.String("grpc-addr")) log.Info().Msg("starting grpc server ...")
if err != nil { if err := runGrpcServer(ctx, c, _store); err != nil {
log.Fatal().Err(err).Msg("failed to listen on grpc-addr") //nolint:forbidigo // stop whole server as grpc is essential
} go stopServerFunc(err)
return err
jwtSecret := c.String("grpc-secret")
jwtManager := woodpeckerGrpcServer.NewJWTManager(jwtSecret)
authorizer := woodpeckerGrpcServer.NewAuthorizer(jwtManager)
grpcServer := grpc.NewServer(
grpc.StreamInterceptor(authorizer.StreamInterceptor),
grpc.UnaryInterceptor(authorizer.UnaryInterceptor),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: c.Duration("keepalive-min-time"),
}),
)
woodpeckerServer := woodpeckerGrpcServer.NewWoodpeckerServer(
server.Config.Services.Queue,
server.Config.Services.Logs,
server.Config.Services.Pubsub,
_store,
)
proto.RegisterWoodpeckerServer(grpcServer, woodpeckerServer)
woodpeckerAuthServer := woodpeckerGrpcServer.NewWoodpeckerAuthServer(
jwtManager,
server.Config.Server.AgentToken,
_store,
)
proto.RegisterWoodpeckerAuthServer(grpcServer, woodpeckerAuthServer)
err = grpcServer.Serve(lis)
if err != nil {
log.Fatal().Err(err).Msg("failed to serve grpc server") //nolint:forbidigo
} }
return nil return nil
}) })
@ -173,20 +175,33 @@ func run(c *cli.Context) error {
switch { switch {
case c.String("server-cert") != "": case c.String("server-cert") != "":
// start the server with tls enabled // start the server with tls enabled
g.Go(func() error { serviceWaitingGroup.Go(func() error {
serve := &http.Server{ tlsServer := &http.Server{
Addr: server.Config.Server.PortTLS, Addr: server.Config.Server.PortTLS,
Handler: handler, Handler: handler,
TLSConfig: &tls.Config{ TLSConfig: &tls.Config{
NextProtos: []string{"h2", "http/1.1"}, NextProtos: []string{"h2", "http/1.1"},
}, },
} }
err = serve.ListenAndServeTLS(
go func() {
<-ctx.Done()
log.Info().Msg("shutdown tls server ...")
if err := tlsServer.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck
log.Error().Err(err).Msg("shutdown tls server failed")
} else {
log.Info().Msg("tls server stopped")
}
}()
log.Info().Msg("starting tls server ...")
err := tlsServer.ListenAndServeTLS(
c.String("server-cert"), c.String("server-cert"),
c.String("server-key"), c.String("server-key"),
) )
if err != nil && !errors.Is(err, http.ErrServerClosed) { if err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatal().Err(err).Msg("failed to start server with tls") //nolint:forbidigo log.Error().Err(err).Msg("TLS server failed")
stopServerFunc(fmt.Errorf("TLS server failed: %w", err))
} }
return err return err
}) })
@ -202,12 +217,27 @@ func run(c *cli.Context) error {
http.Redirect(w, req, req.URL.String(), http.StatusMovedPermanently) http.Redirect(w, req, req.URL.String(), http.StatusMovedPermanently)
} }
g.Go(func() error { serviceWaitingGroup.Go(func() error {
err := http.ListenAndServe(server.Config.Server.Port, http.HandlerFunc(redirect)) redirectServer := &http.Server{
if err != nil && !errors.Is(err, http.ErrServerClosed) { Addr: server.Config.Server.Port,
log.Fatal().Err(err).Msg("unable to start server to redirect from http to https") //nolint:forbidigo Handler: http.HandlerFunc(redirect),
} }
return err go func() {
<-ctx.Done()
log.Info().Msg("shutdown redirect server ...")
if err := redirectServer.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck
log.Error().Err(err).Msg("shutdown redirect server failed")
} else {
log.Info().Msg("redirect server stopped")
}
}()
log.Info().Msg("starting redirect server ...")
if err := redirectServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Error().Err(err).Msg("redirect server failed")
stopServerFunc(fmt.Errorf("redirect server failed: %w", err))
}
return nil
}) })
case c.Bool("lets-encrypt"): case c.Bool("lets-encrypt"):
// start the server with lets-encrypt // start the server with lets-encrypt
@ -219,39 +249,76 @@ func run(c *cli.Context) error {
return err return err
} }
g.Go(func() error { serviceWaitingGroup.Go(func() error {
go func() {
<-ctx.Done()
log.Error().Msg("there is no certmagic.HTTPS alternative who is context aware we will fail in 2 seconds")
time.Sleep(time.Second * 2)
log.Fatal().Msg("we kill certmagic by fail") //nolint:forbidigo
}()
log.Info().Msg("starting certmagic server ...")
if err := certmagic.HTTPS([]string{address.Host}, handler); err != nil { if err := certmagic.HTTPS([]string{address.Host}, handler); err != nil {
log.Fatal().Err(err).Msg("certmagic does not work") //nolint:forbidigo log.Error().Err(err).Msg("certmagic does not work")
stopServerFunc(fmt.Errorf("certmagic failed: %w", err))
} }
return nil return nil
}) })
default: default:
// start the server without tls // start the server without tls
g.Go(func() error { serviceWaitingGroup.Go(func() error {
err := http.ListenAndServe( httpServer := &http.Server{
c.String("server-addr"), Addr: c.String("server-addr"),
handler, Handler: handler,
) }
if err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatal().Err(err).Msg("could not start server") //nolint:forbidigo go func() {
<-ctx.Done()
log.Info().Msg("shutdown http server ...")
if err := httpServer.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck
log.Error().Err(err).Msg("shutdown http server failed")
} else {
log.Info().Msg("http server stopped")
}
}()
log.Info().Msg("starting http server ...")
if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Error().Err(err).Msg("http server failed")
stopServerFunc(fmt.Errorf("http server failed: %w", err))
} }
return err return err
}) })
} }
if metricsServerAddr := c.String("metrics-server-addr"); metricsServerAddr != "" { if metricsServerAddr := c.String("metrics-server-addr"); metricsServerAddr != "" {
g.Go(func() error { serviceWaitingGroup.Go(func() error {
metricsRouter := gin.New() metricsRouter := gin.New()
metricsRouter.GET("/metrics", gin.WrapH(prometheus_http.Handler())) metricsRouter.GET("/metrics", gin.WrapH(prometheus_http.Handler()))
err := http.ListenAndServe(metricsServerAddr, metricsRouter)
if err != nil && !errors.Is(err, http.ErrServerClosed) { metricsServer := &http.Server{
log.Fatal().Err(err).Msg("could not start metrics server") //nolint:forbidigo Addr: metricsServerAddr,
Handler: metricsRouter,
}
go func() {
<-ctx.Done()
log.Info().Msg("shutdown metrics server ...")
if err := metricsServer.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck
log.Error().Err(err).Msg("shutdown metrics server failed")
} else {
log.Info().Msg("metrics server stopped")
}
}()
log.Info().Msg("starting metrics server ...")
if err := metricsServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Error().Err(err).Msg("metrics server failed")
stopServerFunc(fmt.Errorf("metrics server failed: %w", err))
} }
return err return err
}) })
} }
log.Info().Msgf("starting Woodpecker server with version '%s'", version.String()) return serviceWaitingGroup.Wait()
return g.Wait()
} }

View file

@ -26,11 +26,8 @@ import (
"time" "time"
"github.com/gorilla/securecookie" "github.com/gorilla/securecookie"
"github.com/prometheus/client_golang/prometheus"
prometheus_auto "github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"
"go.woodpecker-ci.org/woodpecker/v2/server" "go.woodpecker-ci.org/woodpecker/v2/server"
"go.woodpecker-ci.org/woodpecker/v2/server/cache" "go.woodpecker-ci.org/woodpecker/v2/server/cache"
@ -49,7 +46,12 @@ import (
"go.woodpecker-ci.org/woodpecker/v2/shared/constant" "go.woodpecker-ci.org/woodpecker/v2/shared/constant"
) )
func setupStore(c *cli.Context) (store.Store, error) { const (
queueInfoRefreshInterval = 500 * time.Millisecond
storeInfoRefreshInterval = 10 * time.Second
)
func setupStore(ctx context.Context, c *cli.Context) (store.Store, error) {
datasource := c.String("datasource") datasource := c.String("datasource")
driver := c.String("driver") driver := c.String("driver")
xorm := store.XORM{ xorm := store.XORM{
@ -86,7 +88,7 @@ func setupStore(c *cli.Context) (store.Store, error) {
return nil, fmt.Errorf("could not open datastore: %w", err) return nil, fmt.Errorf("could not open datastore: %w", err)
} }
if err := store.Migrate(c.Bool("migrations-allow-long")); err != nil { if err := store.Migrate(ctx, c.Bool("migrations-allow-long")); err != nil {
return nil, fmt.Errorf("could not migrate datastore: %w", err) return nil, fmt.Errorf("could not migrate datastore: %w", err)
} }
@ -102,74 +104,14 @@ func checkSqliteFileExist(path string) error {
return err return err
} }
func setupQueue(c *cli.Context, s store.Store) queue.Queue { func setupQueue(ctx context.Context, s store.Store) queue.Queue {
return queue.WithTaskStore(queue.New(c.Context), s) return queue.WithTaskStore(ctx, queue.New(ctx), s)
} }
func setupMembershipService(_ *cli.Context, _store store.Store) cache.MembershipService { func setupMembershipService(_ context.Context, _store store.Store) cache.MembershipService {
return cache.NewMembershipService(_store) return cache.NewMembershipService(_store)
} }
func setupMetrics(g *errgroup.Group, _store store.Store) {
pendingSteps := prometheus_auto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "pending_steps",
Help: "Total number of pending pipeline steps.",
})
waitingSteps := prometheus_auto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "waiting_steps",
Help: "Total number of pipeline waiting on deps.",
})
runningSteps := prometheus_auto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "running_steps",
Help: "Total number of running pipeline steps.",
})
workers := prometheus_auto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "worker_count",
Help: "Total number of workers.",
})
pipelines := prometheus_auto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "pipeline_total_count",
Help: "Total number of pipelines.",
})
users := prometheus_auto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "user_count",
Help: "Total number of users.",
})
repos := prometheus_auto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "repo_count",
Help: "Total number of repos.",
})
g.Go(func() error {
for {
stats := server.Config.Services.Queue.Info(context.TODO())
pendingSteps.Set(float64(stats.Stats.Pending))
waitingSteps.Set(float64(stats.Stats.WaitingOnDeps))
runningSteps.Set(float64(stats.Stats.Running))
workers.Set(float64(stats.Stats.Workers))
time.Sleep(500 * time.Millisecond)
}
})
g.Go(func() error {
for {
repoCount, _ := _store.GetRepoCount()
userCount, _ := _store.GetUserCount()
pipelineCount, _ := _store.GetPipelineCount()
pipelines.Set(float64(pipelineCount))
users.Set(float64(userCount))
repos.Set(float64(repoCount))
time.Sleep(10 * time.Second)
}
})
}
func setupLogStore(c *cli.Context, s store.Store) (logService.Service, error) { func setupLogStore(c *cli.Context, s store.Store) (logService.Service, error) {
switch c.String("log-store") { switch c.String("log-store") {
case "file": case "file":
@ -202,12 +144,12 @@ func setupJWTSecret(_store store.Store) (string, error) {
return jwtSecret, nil return jwtSecret, nil
} }
func setupEvilGlobals(c *cli.Context, s store.Store) error { func setupEvilGlobals(ctx context.Context, c *cli.Context, s store.Store) error {
// services // services
server.Config.Services.Queue = setupQueue(c, s) server.Config.Services.Queue = setupQueue(ctx, s)
server.Config.Services.Logs = logging.New() server.Config.Services.Logs = logging.New()
server.Config.Services.Pubsub = pubsub.New() server.Config.Services.Pubsub = pubsub.New()
server.Config.Services.Membership = setupMembershipService(c, s) server.Config.Services.Membership = setupMembershipService(ctx, s)
serviceManager, err := services.NewManager(c, s, setup.Forge) serviceManager, err := services.NewManager(c, s, setup.Forge)
if err != nil { if err != nil {
return fmt.Errorf("could not setup service manager: %w", err) return fmt.Errorf("could not setup service manager: %w", err)

View file

@ -37,8 +37,8 @@ const (
checkItems = 10 checkItems = 10
) )
// Start starts the cron scheduler loop. // Run starts the cron scheduler loop.
func Start(ctx context.Context, store store.Store) error { func Run(ctx context.Context, store store.Store) error {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():

View file

@ -27,9 +27,9 @@ import (
// WithTaskStore returns a queue that is backed by the TaskStore. This // WithTaskStore returns a queue that is backed by the TaskStore. This
// ensures the task Queue can be restored when the system starts. // ensures the task Queue can be restored when the system starts.
func WithTaskStore(q Queue, s store.Store) Queue { func WithTaskStore(ctx context.Context, q Queue, s store.Store) Queue {
tasks, _ := s.TaskList() tasks, _ := s.TaskList()
if err := q.PushAtOnce(context.Background(), tasks); err != nil { if err := q.PushAtOnce(ctx, tasks); err != nil {
log.Error().Err(err).Msg("PushAtOnce failed") log.Error().Err(err).Msg("PushAtOnce failed")
} }
return &persistentQueue{q, s} return &persistentQueue{q, s}

View file

@ -15,6 +15,8 @@
package datastore package datastore
import ( import (
"context"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"xorm.io/xorm" "xorm.io/xorm"
xlog "xorm.io/xorm/log" xlog "xorm.io/xorm/log"
@ -54,8 +56,8 @@ func (s storage) Ping() error {
} }
// Migrate old storage or init new one. // Migrate old storage or init new one.
func (s storage) Migrate(allowLong bool) error { func (s storage) Migrate(ctx context.Context, allowLong bool) error {
return migration.Migrate(s.engine, allowLong) return migration.Migrate(ctx, s.engine, allowLong)
} }
func (s storage) Close() error { func (s storage) Close() error {

View file

@ -21,7 +21,6 @@ import (
"runtime" "runtime"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/tevino/abool/v2"
"src.techknowlogick.com/xormigrate" "src.techknowlogick.com/xormigrate"
"xorm.io/xorm" "xorm.io/xorm"
@ -90,16 +89,14 @@ var migrateLogs2LogEntries = xormigrate.Migration{
logs := make([]*oldLogs020, 0, perPage020) logs := make([]*oldLogs020, 0, perPage020)
logEntries := make([]*oldLogEntry020, 0, 50) logEntries := make([]*oldLogEntry020, 0, 50)
sigterm := abool.New()
ctx, cancelCtx := context.WithCancelCause(context.Background()) ctx, cancelCtx := context.WithCancelCause(context.Background())
defer cancelCtx(nil) defer cancelCtx(nil)
_ = utils.WithContextSigtermCallback(ctx, func() { sigtermCtx := utils.WithContextSigtermCallback(ctx, func() {
log.Info().Msg("ctrl+c received, stopping current migration") log.Info().Msg("ctrl+c received, stopping current migration")
sigterm.Set()
}) })
for { for {
if sigterm.IsSet() { if sigtermCtx.Err() != nil {
return fmt.Errorf("migration 'migrate-logs-to-log_entries' gracefully aborted") return fmt.Errorf("migration 'migrate-logs-to-log_entries' gracefully aborted")
} }

View file

@ -15,6 +15,7 @@
package migration package migration
import ( import (
"context"
"fmt" "fmt"
"reflect" "reflect"
@ -85,7 +86,8 @@ var allBeans = []any{
new(model.Org), new(model.Org),
} }
func Migrate(e *xorm.Engine, allowLong bool) error { // TODO: make xormigrate context aware
func Migrate(_ context.Context, e *xorm.Engine, allowLong bool) error {
e.SetDisableGlobalCache(true) e.SetDisableGlobalCache(true)
m := xormigrate.New(e, migrationTasks) m := xormigrate.New(e, migrationTasks)

View file

@ -15,6 +15,7 @@
package migration package migration
import ( import (
"context"
"os" "os"
"testing" "testing"
"time" "time"
@ -95,7 +96,7 @@ func testDB(t *testing.T, new bool) (engine *xorm.Engine, closeDB func()) {
func TestMigrate(t *testing.T) { func TestMigrate(t *testing.T) {
// init new db // init new db
engine, closeDB := testDB(t, true) engine, closeDB := testDB(t, true)
assert.NoError(t, Migrate(engine, true)) assert.NoError(t, Migrate(context.Background(), engine, true))
closeDB() closeDB()
dbType := engine.Dialect().URI().DBType dbType := engine.Dialect().URI().DBType
@ -106,6 +107,6 @@ func TestMigrate(t *testing.T) {
// migrate old db // migrate old db
engine, closeDB = testDB(t, false) engine, closeDB = testDB(t, false)
assert.NoError(t, Migrate(engine, true)) assert.NoError(t, Migrate(context.Background(), engine, true))
closeDB() closeDB()
} }

View file

@ -6,6 +6,8 @@
package mocks package mocks
import ( import (
context "context"
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
model "go.woodpecker-ci.org/woodpecker/v2/server/model" model "go.woodpecker-ci.org/woodpecker/v2/server/model"
) )
@ -1404,17 +1406,17 @@ func (_m *Store) LogFind(_a0 *model.Step) ([]*model.LogEntry, error) {
return r0, r1 return r0, r1
} }
// Migrate provides a mock function with given fields: _a0 // Migrate provides a mock function with given fields: _a0, _a1
func (_m *Store) Migrate(_a0 bool) error { func (_m *Store) Migrate(_a0 context.Context, _a1 bool) error {
ret := _m.Called(_a0) ret := _m.Called(_a0, _a1)
if len(ret) == 0 { if len(ret) == 0 {
panic("no return value specified for Migrate") panic("no return value specified for Migrate")
} }
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(bool) error); ok { if rf, ok := ret.Get(0).(func(context.Context, bool) error); ok {
r0 = rf(_a0) r0 = rf(_a0, _a1)
} else { } else {
r0 = ret.Error(0) r0 = ret.Error(0)
} }

View file

@ -17,6 +17,8 @@ package store
//go:generate mockery --name Store --output mocks --case underscore --note "+build test" //go:generate mockery --name Store --output mocks --case underscore --note "+build test"
import ( import (
"context"
"go.woodpecker-ci.org/woodpecker/v2/server/model" "go.woodpecker-ci.org/woodpecker/v2/server/model"
) )
@ -200,5 +202,5 @@ type Store interface {
// Store operations // Store operations
Ping() error Ping() error
Close() error Close() error
Migrate(bool) error Migrate(context.Context, bool) error
} }