mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2025-01-22 07:08:58 +00:00
438 lines
12 KiB
Go
438 lines
12 KiB
Go
// Copyright 2022 Woodpecker Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package kubernetes
|
|
|
|
import (
|
|
"context"
|
|
std_errs "errors"
|
|
"fmt"
|
|
"io"
|
|
"maps"
|
|
"os"
|
|
"runtime"
|
|
"slices"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
"github.com/urfave/cli/v3"
|
|
"gopkg.in/yaml.v3"
|
|
v1 "k8s.io/api/core/v1"
|
|
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/informers"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/kubernetes/scheme"
|
|
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // To authenticate to GCP K8s clusters
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/cache"
|
|
|
|
"go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
|
|
)
|
|
|
|
const (
|
|
EngineName = "kubernetes"
|
|
// TODO: 5 seconds is against best practice, k3s didn't work otherwise
|
|
defaultResyncDuration = 5 * time.Second
|
|
)
|
|
|
|
var defaultDeleteOptions = newDefaultDeleteOptions()
|
|
|
|
type kube struct {
|
|
client kubernetes.Interface
|
|
config *config
|
|
goos string
|
|
}
|
|
|
|
type config struct {
|
|
Namespace string
|
|
StorageClass string
|
|
VolumeSize string
|
|
StorageRwx bool
|
|
PodLabels map[string]string
|
|
PodLabelsAllowFromStep bool
|
|
PodAnnotations map[string]string
|
|
PodAnnotationsAllowFromStep bool
|
|
PodNodeSelector map[string]string
|
|
ImagePullSecretNames []string
|
|
SecurityContext SecurityContextConfig
|
|
NativeSecretsAllowFromStep bool
|
|
}
|
|
type SecurityContextConfig struct {
|
|
RunAsNonRoot bool
|
|
FSGroup *int64
|
|
}
|
|
|
|
func newDefaultDeleteOptions() meta_v1.DeleteOptions {
|
|
gracePeriodSeconds := int64(0) // immediately
|
|
propagationPolicy := meta_v1.DeletePropagationBackground
|
|
|
|
return meta_v1.DeleteOptions{
|
|
GracePeriodSeconds: &gracePeriodSeconds,
|
|
PropagationPolicy: &propagationPolicy,
|
|
}
|
|
}
|
|
|
|
func configFromCliContext(ctx context.Context) (*config, error) {
|
|
if ctx != nil {
|
|
if c, ok := ctx.Value(types.CliCommand).(*cli.Command); ok {
|
|
config := config{
|
|
Namespace: c.String("backend-k8s-namespace"),
|
|
StorageClass: c.String("backend-k8s-storage-class"),
|
|
VolumeSize: c.String("backend-k8s-volume-size"),
|
|
StorageRwx: c.Bool("backend-k8s-storage-rwx"),
|
|
PodLabels: make(map[string]string), // just init empty map to prevent nil panic
|
|
PodLabelsAllowFromStep: c.Bool("backend-k8s-pod-labels-allow-from-step"),
|
|
PodAnnotations: make(map[string]string), // just init empty map to prevent nil panic
|
|
PodAnnotationsAllowFromStep: c.Bool("backend-k8s-pod-annotations-allow-from-step"),
|
|
PodNodeSelector: make(map[string]string), // just init empty map to prevent nil panic
|
|
ImagePullSecretNames: c.StringSlice("backend-k8s-pod-image-pull-secret-names"),
|
|
SecurityContext: SecurityContextConfig{
|
|
RunAsNonRoot: c.Bool("backend-k8s-secctx-nonroot"), // cspell:words secctx nonroot
|
|
FSGroup: newInt64(defaultFSGroup),
|
|
},
|
|
NativeSecretsAllowFromStep: c.Bool("backend-k8s-allow-native-secrets"),
|
|
}
|
|
// Unmarshal label and annotation settings here to ensure they're valid on startup
|
|
if labels := c.String("backend-k8s-pod-labels"); labels != "" {
|
|
if err := yaml.Unmarshal([]byte(labels), &config.PodLabels); err != nil {
|
|
log.Error().Err(err).Msgf("could not unmarshal pod labels '%s'", c.String("backend-k8s-pod-labels"))
|
|
return nil, err
|
|
}
|
|
}
|
|
if annotations := c.String("backend-k8s-pod-annotations"); annotations != "" {
|
|
if err := yaml.Unmarshal([]byte(c.String("backend-k8s-pod-annotations")), &config.PodAnnotations); err != nil {
|
|
log.Error().Err(err).Msgf("could not unmarshal pod annotations '%s'", c.String("backend-k8s-pod-annotations"))
|
|
return nil, err
|
|
}
|
|
}
|
|
if nodeSelector := c.String("backend-k8s-pod-node-selector"); nodeSelector != "" {
|
|
if err := yaml.Unmarshal([]byte(nodeSelector), &config.PodNodeSelector); err != nil {
|
|
log.Error().Err(err).Msgf("could not unmarshal pod node selector '%s'", nodeSelector)
|
|
return nil, err
|
|
}
|
|
}
|
|
return &config, nil
|
|
}
|
|
}
|
|
|
|
return nil, types.ErrNoCliContextFound
|
|
}
|
|
|
|
// New returns a new Kubernetes Backend.
|
|
func New() types.Backend {
|
|
return &kube{}
|
|
}
|
|
|
|
func (e *kube) Name() string {
|
|
return EngineName
|
|
}
|
|
|
|
func (e *kube) IsAvailable(context.Context) bool {
|
|
host := os.Getenv("KUBERNETES_SERVICE_HOST")
|
|
return len(host) > 0
|
|
}
|
|
|
|
func (e *kube) Flags() []cli.Flag {
|
|
return Flags
|
|
}
|
|
|
|
func (e *kube) Load(ctx context.Context) (*types.BackendInfo, error) {
|
|
config, err := configFromCliContext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
e.config = config
|
|
|
|
var kubeClient kubernetes.Interface
|
|
_, err = rest.InClusterConfig()
|
|
if err != nil {
|
|
kubeClient, err = getClientOutOfCluster()
|
|
} else {
|
|
kubeClient, err = getClientInsideOfCluster()
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
e.client = kubeClient
|
|
|
|
// TODO(2693): use info resp of kubeClient to define platform var
|
|
e.goos = runtime.GOOS
|
|
return &types.BackendInfo{
|
|
Platform: runtime.GOOS + "/" + runtime.GOARCH,
|
|
}, nil
|
|
}
|
|
|
|
func (e *kube) getConfig() *config {
|
|
if e.config == nil {
|
|
return nil
|
|
}
|
|
c := *e.config
|
|
c.PodLabels = maps.Clone(e.config.PodLabels)
|
|
c.PodAnnotations = maps.Clone(e.config.PodAnnotations)
|
|
c.PodNodeSelector = maps.Clone(e.config.PodNodeSelector)
|
|
c.ImagePullSecretNames = slices.Clone(e.config.ImagePullSecretNames)
|
|
return &c
|
|
}
|
|
|
|
// SetupWorkflow sets up the pipeline environment.
|
|
func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID string) error {
|
|
log.Trace().Str("taskUUID", taskUUID).Msgf("Setting up Kubernetes primitives")
|
|
|
|
for _, vol := range conf.Volumes {
|
|
_, err := startVolume(ctx, e, vol.Name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
var extraHosts []types.HostAlias
|
|
for _, stage := range conf.Stages {
|
|
for _, step := range stage.Steps {
|
|
if step.Type == types.StepTypeService {
|
|
svc, err := startService(ctx, e, step)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
hostAlias := types.HostAlias{Name: step.Networks[0].Aliases[0], IP: svc.Spec.ClusterIP}
|
|
extraHosts = append(extraHosts, hostAlias)
|
|
}
|
|
}
|
|
}
|
|
log.Trace().Msgf("adding extra hosts: %v", extraHosts)
|
|
for _, stage := range conf.Stages {
|
|
for _, step := range stage.Steps {
|
|
step.ExtraHosts = extraHosts
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// StartStep starts the pipeline step.
|
|
func (e *kube) StartStep(ctx context.Context, step *types.Step, taskUUID string) error {
|
|
options, err := parseBackendOptions(step)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("could not parse backend options")
|
|
}
|
|
|
|
if needsRegistrySecret(step) {
|
|
err = startRegistrySecret(ctx, e, step)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
log.Trace().Str("taskUUID", taskUUID).Msgf("starting step: %s", step.Name)
|
|
_, err = startPod(ctx, e, step, options)
|
|
return err
|
|
}
|
|
|
|
// WaitStep waits for the pipeline step to complete and returns
|
|
// the completion results.
|
|
func (e *kube) WaitStep(ctx context.Context, step *types.Step, taskUUID string) (*types.State, error) {
|
|
podName, err := stepToPodName(step)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
log.Trace().Str("taskUUID", taskUUID).Msgf("waiting for pod: %s", podName)
|
|
|
|
finished := make(chan bool)
|
|
|
|
podUpdated := func(_, newPod any) {
|
|
pod, ok := newPod.(*v1.Pod)
|
|
if !ok {
|
|
log.Error().Msgf("could not parse pod: %v", newPod)
|
|
return
|
|
}
|
|
|
|
if pod.Name == podName {
|
|
if isImagePullBackOffState(pod) || isInvalidImageName(pod) {
|
|
finished <- true
|
|
}
|
|
|
|
switch pod.Status.Phase {
|
|
case v1.PodSucceeded, v1.PodFailed, v1.PodUnknown:
|
|
finished <- true
|
|
}
|
|
}
|
|
}
|
|
|
|
si := informers.NewSharedInformerFactoryWithOptions(e.client, defaultResyncDuration, informers.WithNamespace(e.config.Namespace))
|
|
if _, err := si.Core().V1().Pods().Informer().AddEventHandler(
|
|
cache.ResourceEventHandlerFuncs{
|
|
UpdateFunc: podUpdated,
|
|
},
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
stop := make(chan struct{})
|
|
si.Start(stop)
|
|
defer close(stop)
|
|
|
|
// TODO: Cancel on ctx.Done
|
|
<-finished
|
|
|
|
pod, err := e.client.CoreV1().Pods(e.config.Namespace).Get(ctx, podName, meta_v1.GetOptions{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if isImagePullBackOffState(pod) || isInvalidImageName(pod) {
|
|
return nil, fmt.Errorf("could not pull image for pod %s", podName)
|
|
}
|
|
|
|
if len(pod.Status.ContainerStatuses) == 0 {
|
|
return nil, fmt.Errorf("no container statuses found for pod %s", podName)
|
|
}
|
|
|
|
cs := pod.Status.ContainerStatuses[0]
|
|
|
|
if cs.State.Terminated == nil {
|
|
err := fmt.Errorf("no terminated state found for container %s/%s", podName, cs.Name)
|
|
log.Error().Str("taskUUID", taskUUID).Str("pod", podName).Str("container", cs.Name).Interface("state", cs.State).Msg(err.Error())
|
|
return nil, err
|
|
}
|
|
|
|
bs := &types.State{
|
|
ExitCode: int(cs.State.Terminated.ExitCode),
|
|
Exited: true,
|
|
OOMKilled: false,
|
|
}
|
|
|
|
return bs, nil
|
|
}
|
|
|
|
// TailStep tails the pipeline step logs.
|
|
func (e *kube) TailStep(ctx context.Context, step *types.Step, taskUUID string) (io.ReadCloser, error) {
|
|
podName, err := stepToPodName(step)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
log.Trace().Str("taskUUID", taskUUID).Msgf("tail logs of pod: %s", podName)
|
|
|
|
up := make(chan bool)
|
|
|
|
podUpdated := func(_, newPod any) {
|
|
pod, ok := newPod.(*v1.Pod)
|
|
if !ok {
|
|
log.Error().Msgf("could not parse pod: %v", newPod)
|
|
return
|
|
}
|
|
|
|
if pod.Name == podName {
|
|
if isImagePullBackOffState(pod) || isInvalidImageName(pod) {
|
|
up <- true
|
|
}
|
|
switch pod.Status.Phase {
|
|
case v1.PodRunning, v1.PodSucceeded, v1.PodFailed:
|
|
up <- true
|
|
}
|
|
}
|
|
}
|
|
|
|
si := informers.NewSharedInformerFactoryWithOptions(e.client, defaultResyncDuration, informers.WithNamespace(e.config.Namespace))
|
|
if _, err := si.Core().V1().Pods().Informer().AddEventHandler(
|
|
cache.ResourceEventHandlerFuncs{
|
|
UpdateFunc: podUpdated,
|
|
},
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
stop := make(chan struct{})
|
|
si.Start(stop)
|
|
defer close(stop)
|
|
|
|
<-up
|
|
|
|
opts := &v1.PodLogOptions{
|
|
Follow: true,
|
|
Container: podName,
|
|
}
|
|
|
|
logs, err := e.client.CoreV1().RESTClient().Get().
|
|
Namespace(e.config.Namespace).
|
|
Name(podName).
|
|
Resource("pods").
|
|
SubResource("log").
|
|
VersionedParams(opts, scheme.ParameterCodec).
|
|
Stream(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rc, wc := io.Pipe()
|
|
|
|
go func() {
|
|
defer logs.Close()
|
|
defer wc.Close()
|
|
|
|
_, err = io.Copy(wc, logs)
|
|
if err != nil {
|
|
return
|
|
}
|
|
}()
|
|
return rc, nil
|
|
}
|
|
|
|
func (e *kube) DestroyStep(ctx context.Context, step *types.Step, taskUUID string) error {
|
|
var errs []error
|
|
log.Trace().Str("taskUUID", taskUUID).Msgf("Stopping step: %s", step.Name)
|
|
if needsRegistrySecret(step) {
|
|
err := stopRegistrySecret(ctx, e, step, defaultDeleteOptions)
|
|
if err != nil {
|
|
errs = append(errs, err)
|
|
}
|
|
}
|
|
|
|
err := stopPod(ctx, e, step, defaultDeleteOptions)
|
|
if err != nil {
|
|
errs = append(errs, err)
|
|
}
|
|
return std_errs.Join(errs...)
|
|
}
|
|
|
|
// DestroyWorkflow destroys the pipeline environment.
|
|
func (e *kube) DestroyWorkflow(ctx context.Context, conf *types.Config, taskUUID string) error {
|
|
log.Trace().Str("taskUUID", taskUUID).Msg("deleting Kubernetes primitives")
|
|
|
|
for _, stage := range conf.Stages {
|
|
for _, step := range stage.Steps {
|
|
err := stopPod(ctx, e, step, defaultDeleteOptions)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if step.Type == types.StepTypeService {
|
|
err := stopService(ctx, e, step, defaultDeleteOptions)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, vol := range conf.Volumes {
|
|
err := stopVolume(ctx, e, vol.Name, defaultDeleteOptions)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|