woodpecker/pipeline/backend/kubernetes/kubernetes.go
Fernando Barbosa c7467b9828
fix: agent panic when node is terminated during step execution (#3331)
Fixes https://github.com/woodpecker-ci/woodpecker/issues/3330

This adds error handling to the agent's WaitStep function in two places
where it could hit a `panic: runtime error: invalid memory address or nil
pointer dereference` when it could no longer access complete information
about a specific pod.

This error was found to happen when the node on which the pod was running
was terminated during the step's execution, despite active pipelines still
being executed on the node.

Instead of a panic in the agent's logs and undefined behavior in the UI,
the agent now reports a more helpful error message in the UI.
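
In short, `WaitStep` now checks both dereferences before reading the
container's terminated state (condensed from the code below):

```go
if len(pod.Status.ContainerStatuses) == 0 {
	return nil, fmt.Errorf("no container statuses found for pod %s", podName)
}

cs := pod.Status.ContainerStatuses[0]
if cs.State.Terminated == nil {
	return nil, fmt.Errorf("no terminated state found for container %s/%s", podName, cs.Name)
}
```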

### Additional context

We first observed the bug on v2.1.1, but tested the fix internally on top
of v2.3.0.


![image](https://github.com/woodpecker-ci/woodpecker/assets/7269710/dfbcf089-85f7-4b5d-8102-f21af95c5cda)
2024-02-05 22:46:14 +01:00


// Copyright 2022 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
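
// Package kubernetes implements a Woodpecker backend that runs pipeline
// steps as Kubernetes pods, service steps as Kubernetes services, and
// pipeline volumes as persistent volume claims.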
package kubernetes

import (
	"context"
	"fmt"
	"io"
	"maps"
	"os"
	"runtime"
	"slices"
	"time"

	"github.com/rs/zerolog/log"
	"github.com/urfave/cli/v2"
	"gopkg.in/yaml.v3"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"

	// To authenticate to GCP K8s clusters
	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"

	"go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
)

const (
	EngineName = "kubernetes"
)

var defaultDeleteOptions = newDefaultDeleteOptions()

type kube struct {
	client kubernetes.Interface
	config *config
	goos   string
}

type config struct {
	Namespace            string
	StorageClass         string
	VolumeSize           string
	StorageRwx           bool
	PodLabels            map[string]string
	PodAnnotations       map[string]string
	ImagePullSecretNames []string
	SecurityContext      SecurityContextConfig
}

type SecurityContextConfig struct {
	RunAsNonRoot bool
}

func newDefaultDeleteOptions() metav1.DeleteOptions {
	gracePeriodSeconds := int64(0) // immediately
	propagationPolicy := metav1.DeletePropagationBackground

	return metav1.DeleteOptions{
		GracePeriodSeconds: &gracePeriodSeconds,
		PropagationPolicy:  &propagationPolicy,
	}
}
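
// configFromCliContext reads the backend configuration from the CLI flags
// attached to the context; it returns types.ErrNoCliContextFound when no
// CLI context is available.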
func configFromCliContext(ctx context.Context) (*config, error) {
	if ctx != nil {
		if c, ok := ctx.Value(types.CliContext).(*cli.Context); ok {
			config := config{
				Namespace:            c.String("backend-k8s-namespace"),
				StorageClass:         c.String("backend-k8s-storage-class"),
				VolumeSize:           c.String("backend-k8s-volume-size"),
				StorageRwx:           c.Bool("backend-k8s-storage-rwx"),
				PodLabels:            make(map[string]string), // just init empty map to prevent nil panic
				PodAnnotations:       make(map[string]string), // just init empty map to prevent nil panic
				ImagePullSecretNames: c.StringSlice("backend-k8s-pod-image-pull-secret-names"),
				SecurityContext: SecurityContextConfig{
					RunAsNonRoot: c.Bool("backend-k8s-secctx-nonroot"),
				},
			}

			// TODO: remove in next major
			if len(config.ImagePullSecretNames) == 1 && config.ImagePullSecretNames[0] == "regcred" {
				log.Warn().Msg("WOODPECKER_BACKEND_K8S_PULL_SECRET_NAMES is set to the default ('regcred'). It will default to empty in Woodpecker 3.0. Set it explicitly before then.")
			}

			// Unmarshal label and annotation settings here to ensure they're valid on startup.
			if labels := c.String("backend-k8s-pod-labels"); labels != "" {
				if err := yaml.Unmarshal([]byte(labels), &config.PodLabels); err != nil {
					log.Error().Err(err).Msgf("could not unmarshal pod labels '%s'", labels)
					return nil, err
				}
			}
			if annotations := c.String("backend-k8s-pod-annotations"); annotations != "" {
				if err := yaml.Unmarshal([]byte(annotations), &config.PodAnnotations); err != nil {
					log.Error().Err(err).Msgf("could not unmarshal pod annotations '%s'", annotations)
					return nil, err
				}
			}

			return &config, nil
		}
	}

	return nil, types.ErrNoCliContextFound
}

// New returns a new Kubernetes Backend.
func New() types.Backend {
	return &kube{}
}

func (e *kube) Name() string {
	return EngineName
}
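
// IsAvailable reports whether the agent appears to run inside a Kubernetes
// cluster, based on the KUBERNETES_SERVICE_HOST environment variable that
// Kubernetes injects into every pod.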
func (e *kube) IsAvailable(context.Context) bool {
	host := os.Getenv("KUBERNETES_SERVICE_HOST")
	return len(host) > 0
}

func (e *kube) Load(ctx context.Context) (*types.BackendInfo, error) {
	config, err := configFromCliContext(ctx)
	if err != nil {
		return nil, err
	}
	e.config = config

	var kubeClient kubernetes.Interface
	_, err = rest.InClusterConfig()
	if err != nil {
		kubeClient, err = getClientOutOfCluster()
	} else {
		kubeClient, err = getClientInsideOfCluster()
	}
	if err != nil {
		return nil, err
	}
	e.client = kubeClient

	// TODO(2693): use info resp of kubeClient to define platform var
	e.goos = runtime.GOOS
	return &types.BackendInfo{
		Platform: runtime.GOOS + "/" + runtime.GOARCH,
	}, nil
}
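
// getConfig returns a defensive copy of the backend configuration: the
// label and annotation maps and the image pull secret slice are cloned so
// callers cannot mutate the shared config.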
func (e *kube) getConfig() *config {
	if e.config == nil {
		return nil
	}
	c := *e.config
	c.PodLabels = maps.Clone(e.config.PodLabels)
	c.PodAnnotations = maps.Clone(e.config.PodAnnotations)
	c.ImagePullSecretNames = slices.Clone(e.config.ImagePullSecretNames)
	return &c
}

// Setup the pipeline environment.
func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID string) error {
	log.Trace().Str("taskUUID", taskUUID).Msgf("Setting up Kubernetes primitives")

	for _, vol := range conf.Volumes {
		_, err := startVolume(ctx, e, vol.Name)
		if err != nil {
			return err
		}
	}

	extraHosts := []types.HostAlias{}
	for _, stage := range conf.Stages {
		for _, step := range stage.Steps {
			if step.Type == types.StepTypeService {
				svc, err := startService(ctx, e, step)
				if err != nil {
					return err
				}
				hostAlias := types.HostAlias{Name: step.Networks[0].Aliases[0], IP: svc.Spec.ClusterIP}
				extraHosts = append(extraHosts, hostAlias)
			}
		}
	}

	log.Trace().Msgf("adding extra hosts: %v", extraHosts)
	for _, stage := range conf.Stages {
		for _, step := range stage.Steps {
			step.ExtraHosts = extraHosts
		}
	}

	return nil
}

// Start the pipeline step.
func (e *kube) StartStep(ctx context.Context, step *types.Step, taskUUID string) error {
	log.Trace().Str("taskUUID", taskUUID).Msgf("starting step: %s", step.Name)
	_, err := startPod(ctx, e, step)
	return err
}

// Wait for the pipeline step to complete and returns
// the completion results.
func (e *kube) WaitStep(ctx context.Context, step *types.Step, taskUUID string) (*types.State, error) {
	podName, err := stepToPodName(step)
	if err != nil {
		return nil, err
	}

	log.Trace().Str("taskUUID", taskUUID).Msgf("waiting for pod: %s", podName)

	finished := make(chan bool)

	podUpdated := func(old, new any) {
		pod, ok := new.(*v1.Pod)
		if !ok {
			log.Error().Msgf("could not parse pod: %v", new)
			return
		}

		if pod.Name == podName {
			if isImagePullBackOffState(pod) {
				finished <- true
			}

			switch pod.Status.Phase {
			case v1.PodSucceeded, v1.PodFailed, v1.PodUnknown:
				finished <- true
			}
		}
	}

	// TODO 5 seconds is against best practice, k3s didn't work otherwise
	si := informers.NewSharedInformerFactoryWithOptions(e.client, 5*time.Second, informers.WithNamespace(e.config.Namespace))
	if _, err := si.Core().V1().Pods().Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			UpdateFunc: podUpdated,
		},
	); err != nil {
		return nil, err
	}

	stop := make(chan struct{})
	si.Start(stop)
	defer close(stop)

	// TODO Cancel on ctx.Done
	<-finished

	pod, err := e.client.CoreV1().Pods(e.config.Namespace).Get(ctx, podName, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}

	if isImagePullBackOffState(pod) {
		return nil, fmt.Errorf("could not pull image for pod %s", podName)
	}
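
	// If the node running the pod was terminated mid-step, the pod status may
	// no longer contain complete container information; guard the dereferences
	// below instead of panicking (see #3331).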
	if len(pod.Status.ContainerStatuses) == 0 {
		return nil, fmt.Errorf("no container statuses found for pod %s", podName)
	}

	cs := pod.Status.ContainerStatuses[0]
	if cs.State.Terminated == nil {
		err := fmt.Errorf("no terminated state found for container %s/%s", podName, cs.Name)
		log.Error().Str("taskUUID", taskUUID).Str("pod", podName).Str("container", cs.Name).Interface("state", cs.State).Msg(err.Error())
		return nil, err
	}

	bs := &types.State{
		ExitCode:  int(cs.State.Terminated.ExitCode),
		Exited:    true,
		OOMKilled: false,
	}

	return bs, nil
}

// Tail the pipeline step logs.
func (e *kube) TailStep(ctx context.Context, step *types.Step, taskUUID string) (io.ReadCloser, error) {
	podName, err := stepToPodName(step)
	if err != nil {
		return nil, err
	}

	log.Trace().Str("taskUUID", taskUUID).Msgf("tail logs of pod: %s", podName)

	up := make(chan bool)

	podUpdated := func(old, new any) {
		pod, ok := new.(*v1.Pod)
		if !ok {
			log.Error().Msgf("could not parse pod: %v", new)
			return
		}

		if pod.Name == podName {
			switch pod.Status.Phase {
			case v1.PodRunning, v1.PodSucceeded, v1.PodFailed:
				up <- true
			}
		}
	}

	// TODO 5 seconds is against best practice, k3s didn't work otherwise
	si := informers.NewSharedInformerFactoryWithOptions(e.client, 5*time.Second, informers.WithNamespace(e.config.Namespace))
	if _, err := si.Core().V1().Pods().Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			UpdateFunc: podUpdated,
		},
	); err != nil {
		return nil, err
	}

	stop := make(chan struct{})
	si.Start(stop)
	defer close(stop)

	<-up

	opts := &v1.PodLogOptions{
		Follow:    true,
		Container: podName,
	}

	logs, err := e.client.CoreV1().RESTClient().Get().
		Namespace(e.config.Namespace).
		Name(podName).
		Resource("pods").
		SubResource("log").
		VersionedParams(opts, scheme.ParameterCodec).
		Stream(ctx)
	if err != nil {
		return nil, err
	}

	rc, wc := io.Pipe()
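
	// Copy the pod's log stream into a pipe whose read side is returned to
	// the caller; all three ends are closed once the stream ends.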
	go func() {
		defer logs.Close()
		defer wc.Close()
		defer rc.Close()

		_, err = io.Copy(wc, logs)
		if err != nil {
			return
		}
	}()
	return rc, nil
}

func (e *kube) DestroyStep(ctx context.Context, step *types.Step, taskUUID string) error {
	log.Trace().Str("taskUUID", taskUUID).Msgf("Stopping step: %s", step.Name)
	err := stopPod(ctx, e, step, defaultDeleteOptions)
	return err
}

// Destroy the pipeline environment.
func (e *kube) DestroyWorkflow(ctx context.Context, conf *types.Config, taskUUID string) error {
	log.Trace().Str("taskUUID", taskUUID).Msg("deleting Kubernetes primitives")

	// Note: the ctx sent to this function may already be canceled/done in case of error or cancellation by the user.
	for _, stage := range conf.Stages {
		for _, step := range stage.Steps {
			err := stopPod(ctx, e, step, defaultDeleteOptions)
			if err != nil {
				return err
			}

			if step.Type == types.StepTypeService {
				err := stopService(ctx, e, step, defaultDeleteOptions)
				if err != nil {
					return err
				}
			}
		}
	}

	for _, vol := range conf.Volumes {
		err := stopVolume(ctx, e, vol.Name, defaultDeleteOptions)
		if err != nil {
			return err
		}
	}

	return nil
}