From 01a955ed0e21f82807ef75bfd3249613f63230c7 Mon Sep 17 00:00:00 2001 From: Thomas Anderson <127358482+zc-devs@users.noreply.github.com> Date: Tue, 19 Dec 2023 06:53:52 +0300 Subject: [PATCH] Kubernetes refactor (#2794) Kubernetes backend refactoring and tests --------- Co-authored-by: 6543 <6543@obermui.de> --- go.mod | 1 + go.sum | 2 + pipeline/backend/kubernetes/kubernetes.go | 120 ++---- pipeline/backend/kubernetes/pod.go | 438 +++++++++++++------- pipeline/backend/kubernetes/pod_test.go | 253 +++++++++++ pipeline/backend/kubernetes/service.go | 71 +++- pipeline/backend/kubernetes/service_test.go | 18 +- pipeline/backend/kubernetes/utils.go | 12 + pipeline/backend/kubernetes/volume.go | 45 +- pipeline/backend/kubernetes/volume_test.go | 27 +- 10 files changed, 719 insertions(+), 268 deletions(-) create mode 100644 pipeline/backend/kubernetes/pod_test.go diff --git a/go.mod b/go.mod index 08778ab7e..4a62179ba 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/gorilla/securecookie v1.1.2 github.com/jellydator/ttlcache/v3 v3.1.1 github.com/joho/godotenv v1.5.1 + github.com/kinbiko/jsonassert v1.1.1 github.com/lib/pq v1.10.9 github.com/mattn/go-sqlite3 v1.14.19 github.com/moby/moby v24.0.7+incompatible diff --git a/go.sum b/go.sum index 340d24751..92aaa845d 100644 --- a/go.sum +++ b/go.sum @@ -248,6 +248,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/kinbiko/jsonassert v1.1.1 h1:DB12divY+YB+cVpHULLuKePSi6+ui4M/shHSzJISkSE= +github.com/kinbiko/jsonassert v1.1.1/go.mod h1:NO4lzrogohtIdNUNzx8sdzB55M4R4Q1bsrWVdqQ7C+A= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= diff --git a/pipeline/backend/kubernetes/kubernetes.go b/pipeline/backend/kubernetes/kubernetes.go index c508c2103..c8accfb69 100644 --- a/pipeline/backend/kubernetes/kubernetes.go +++ b/pipeline/backend/kubernetes/kubernetes.go @@ -30,7 +30,6 @@ import ( "github.com/urfave/cli/v2" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -46,16 +45,16 @@ const ( EngineName = "kubernetes" ) -var noContext = context.Background() +var defaultDeleteOptions = newDefaultDeleteOptions() type kube struct { ctx context.Context client kubernetes.Interface - config *Config + config *config goos string } -type Config struct { +type config struct { Namespace string StorageClass string VolumeSize string @@ -68,10 +67,20 @@ type SecurityContextConfig struct { RunAsNonRoot bool } -func configFromCliContext(ctx context.Context) (*Config, error) { +func newDefaultDeleteOptions() metav1.DeleteOptions { + gracePeriodSeconds := int64(0) // immediately + propagationPolicy := metav1.DeletePropagationBackground + + return metav1.DeleteOptions{ + GracePeriodSeconds: &gracePeriodSeconds, + PropagationPolicy: &propagationPolicy, + } +} + +func configFromCliContext(ctx context.Context) (*config, error) { if ctx != nil { if c, ok := 
ctx.Value(types.CliContext).(*cli.Context); ok { - config := Config{ + config := config{ Namespace: c.String("backend-k8s-namespace"), StorageClass: c.String("backend-k8s-storage-class"), VolumeSize: c.String("backend-k8s-volume-size"), @@ -151,12 +160,7 @@ func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID s log.Trace().Str("taskUUID", taskUUID).Msgf("Setting up Kubernetes primitives") for _, vol := range conf.Volumes { - pvc, err := PersistentVolumeClaim(e.config.Namespace, vol.Name, e.config.StorageClass, e.config.VolumeSize, e.config.StorageRwx) - if err != nil { - return err - } - - _, err = e.client.CoreV1().PersistentVolumeClaims(e.config.Namespace).Create(ctx, pvc, metav1.CreateOptions{}) + _, err := startVolume(ctx, e, vol.Name) if err != nil { return err } @@ -167,21 +171,10 @@ func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID s for _, stage := range conf.Stages { if stage.Alias == "services" { for _, step := range stage.Steps { - stepName, err := dnsName(step.Name) + svc, err := startService(ctx, e, step) if err != nil { return err } - log.Trace().Str("pod-name", stepName).Msgf("Creating service: %s", step.Name) - svc, err := Service(e.config.Namespace, step.Name, step.Ports) - if err != nil { - return err - } - - svc, err = e.client.CoreV1().Services(e.config.Namespace).Create(ctx, svc, metav1.CreateOptions{}) - if err != nil { - return err - } - extraHosts = append(extraHosts, step.Networks[0].Aliases[0]+":"+svc.Spec.ClusterIP) } } @@ -199,13 +192,8 @@ func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID s // Start the pipeline step. func (e *kube) StartStep(ctx context.Context, step *types.Step, taskUUID string) error { - pod, err := Pod(e.config.Namespace, step, e.config.PodLabels, e.config.PodAnnotations, e.goos, e.config.SecurityContext) - if err != nil { - return err - } - - log.Trace().Str("taskUUID", taskUUID).Msgf("Creating pod: %s", pod.Name) - _, err = e.client.CoreV1().Pods(e.config.Namespace).Create(ctx, pod, metav1.CreateOptions{}) + log.Trace().Str("taskUUID", taskUUID).Msgf("Starting step: %s", step.Name) + _, err := startPod(ctx, e, step) return err } @@ -341,92 +329,38 @@ func (e *kube) TailStep(ctx context.Context, step *types.Step, taskUUID string) // return rc, nil } -func (e *kube) DestroyStep(ctx context.Context, step *types.Step, taskUUID string) error { - podName, err := dnsName(step.Name) - if err != nil { - return err - } - - log.Trace().Str("taskUUID", taskUUID).Msgf("Stopping pod: %s", podName) - - gracePeriodSeconds := int64(0) // immediately - dpb := metav1.DeletePropagationBackground - - deleteOpts := metav1.DeleteOptions{ - GracePeriodSeconds: &gracePeriodSeconds, - PropagationPolicy: &dpb, - } - - if err := e.client.CoreV1().Pods(e.config.Namespace).Delete(ctx, podName, deleteOpts); err != nil && !errors.IsNotFound(err) { - return err - } - - return nil +func (e *kube) DestroyStep(_ context.Context, step *types.Step, taskUUID string) error { + log.Trace().Str("taskUUID", taskUUID).Msgf("Stopping step: %s", step.Name) + err := stopPod(e.ctx, e, step, defaultDeleteOptions) + return err } // Destroy the pipeline environment. 
func (e *kube) DestroyWorkflow(_ context.Context, conf *types.Config, taskUUID string) error { log.Trace().Str("taskUUID", taskUUID).Msg("Deleting Kubernetes primitives") - gracePeriodSeconds := int64(0) // immediately - dpb := metav1.DeletePropagationBackground - - deleteOpts := metav1.DeleteOptions{ - GracePeriodSeconds: &gracePeriodSeconds, - PropagationPolicy: &dpb, - } - // Use noContext because the ctx sent to this function will be canceled/done in case of error or canceled by user. - // Don't abort on 404 errors from k8s, they most likely mean that the pod hasn't been created yet, usually because pipeline was canceled before running all steps. - // Trace log them in case the info could be useful when troubleshooting. - for _, stage := range conf.Stages { for _, step := range stage.Steps { - stepName, err := dnsName(step.Name) + err := stopPod(e.ctx, e, step, defaultDeleteOptions) if err != nil { return err } - log.Trace().Msgf("Deleting pod: %s", stepName) - if err := e.client.CoreV1().Pods(e.config.Namespace).Delete(noContext, stepName, deleteOpts); err != nil { - if !errors.IsNotFound(err) { - return err - } - } - } - } - for _, stage := range conf.Stages { - if stage.Alias == "services" { - for _, step := range stage.Steps { - log.Trace().Msgf("Deleting service: %s", step.Name) - svc, err := Service(e.config.Namespace, step.Name, step.Ports) + if step.Type == types.StepTypeService { + err := stopService(e.ctx, e, step, defaultDeleteOptions) if err != nil { return err } - if err := e.client.CoreV1().Services(e.config.Namespace).Delete(noContext, svc.Name, deleteOpts); err != nil { - if errors.IsNotFound(err) { - log.Trace().Err(err).Msgf("Unable to delete service %s", svc.Name) - } else { - return err - } - } } } } for _, vol := range conf.Volumes { - pvc, err := PersistentVolumeClaim(e.config.Namespace, vol.Name, e.config.StorageClass, e.config.VolumeSize, e.config.StorageRwx) + err := stopVolume(e.ctx, e, vol.Name, defaultDeleteOptions) if err != nil { return err } - err = e.client.CoreV1().PersistentVolumeClaims(e.config.Namespace).Delete(noContext, pvc.Name, deleteOpts) - if err != nil { - if errors.IsNotFound(err) { - log.Trace().Err(err).Msgf("Unable to delete pvc %s", pvc.Name) - } else { - return err - } - } } return nil diff --git a/pipeline/backend/kubernetes/pod.go b/pipeline/backend/kubernetes/pod.go index 48e046d32..5f62bc8b2 100644 --- a/pipeline/backend/kubernetes/pod.go +++ b/pipeline/backend/kubernetes/pod.go @@ -15,12 +15,14 @@ package kubernetes import ( + "context" "fmt" "maps" "strings" "github.com/rs/zerolog/log" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -28,176 +30,257 @@ import ( "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types" ) -func Pod(namespace string, step *types.Step, labels, annotations map[string]string, goos string, secCtxConf SecurityContextConfig) (*v1.Pod, error) { - var ( - vols []v1.Volume - volMounts []v1.VolumeMount - entrypoint []string - args []string - ) +const ( + StepLabel = "step" +) - if step.WorkingDir != "" { - for _, vol := range step.Volumes { - volumeName, err := dnsName(strings.Split(vol, ":")[0]) - if err != nil { - return nil, err - } - - vols = append(vols, v1.Volume{ - Name: volumeName, - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: volumeName, - ReadOnly: false, - }, - }, - }) - - volMounts = append(volMounts, v1.VolumeMount{ - Name: 
volumeName, - MountPath: volumeMountPath(vol), - }) - } - } - - var pullPolicy v1.PullPolicy - if step.Pull { - pullPolicy = v1.PullAlways - } - - if len(step.Commands) != 0 { - scriptEnv, entry, cmds := common.GenerateContainerConf(step.Commands, goos) - for k, v := range scriptEnv { - step.Environment[k] = v - } - entrypoint = entry - args = cmds - } - - hostAliases := []v1.HostAlias{} - for _, extraHost := range step.ExtraHosts { - host := strings.Split(extraHost, ":") - hostAliases = append(hostAliases, v1.HostAlias{IP: host[1], Hostnames: []string{host[0]}}) - } - - resourceRequirements := v1.ResourceRequirements{Requests: v1.ResourceList{}, Limits: v1.ResourceList{}} +func mkPod(namespace, name, image, workDir, goos, serviceAccountName string, + pool, privileged bool, + commands, vols, extraHosts []string, + labels, annotations, env, nodeSelector map[string]string, + tolerations []types.Toleration, resources types.Resources, + securityContext *types.SecurityContext, securityContextConfig SecurityContextConfig, +) (*v1.Pod, error) { var err error - for key, val := range step.BackendOptions.Kubernetes.Resources.Requests { - resourceKey := v1.ResourceName(key) - resourceRequirements.Requests[resourceKey], err = resource.ParseQuantity(val) - if err != nil { - return nil, fmt.Errorf("resource request '%v' quantity '%v': %w", key, val, err) - } - } - for key, val := range step.BackendOptions.Kubernetes.Resources.Limits { - resourceKey := v1.ResourceName(key) - resourceRequirements.Limits[resourceKey], err = resource.ParseQuantity(val) - if err != nil { - return nil, fmt.Errorf("resource limit '%v' quantity '%v': %w", key, val, err) - } - } - var serviceAccountName string - if step.BackendOptions.Kubernetes.ServiceAccountName != "" { - serviceAccountName = step.BackendOptions.Kubernetes.ServiceAccountName - } + meta := podMeta(name, namespace, labels, annotations) - podName, err := dnsName(step.Name) + spec, err := podSpec(serviceAccountName, vols, extraHosts, env, + nodeSelector, tolerations, securityContext, securityContextConfig) if err != nil { return nil, err } - labels["step"] = podName - - var nodeSelector map[string]string - platform, exist := step.Environment["CI_SYSTEM_PLATFORM"] - if exist && platform != "" { - arch := strings.Split(platform, "/")[1] - nodeSelector = map[string]string{v1.LabelArchStable: arch} - log.Trace().Msgf("Using the node selector from the Agent's platform: %v", nodeSelector) + container, err := podContainer(name, image, workDir, goos, pool, privileged, commands, vols, env, + resources, securityContext) + if err != nil { + return nil, err } - beOptNodeSelector := step.BackendOptions.Kubernetes.NodeSelector - if len(beOptNodeSelector) > 0 { - if len(nodeSelector) == 0 { - nodeSelector = beOptNodeSelector - } else { - log.Trace().Msgf("Appending labels to the node selector from the backend options: %v", beOptNodeSelector) - maps.Copy(nodeSelector, beOptNodeSelector) - } - } - - var tolerations []v1.Toleration - beTolerations := step.BackendOptions.Kubernetes.Tolerations - if len(beTolerations) > 0 { - for _, t := range step.BackendOptions.Kubernetes.Tolerations { - toleration := v1.Toleration{ - Key: t.Key, - Operator: v1.TolerationOperator(t.Operator), - Value: t.Value, - Effect: v1.TaintEffect(t.Effect), - TolerationSeconds: t.TolerationSeconds, - } - tolerations = append(tolerations, toleration) - } - log.Trace().Msgf("Tolerations that will be used in the backend options: %v", beTolerations) - } - - beSecurityContext := 
step.BackendOptions.Kubernetes.SecurityContext - log.Trace().Interface("Security context", beSecurityContext).Msg("Security context that will be used for pods/containers") - podSecCtx := podSecurityContext(beSecurityContext, secCtxConf) - containerSecCtx := containerSecurityContext(beSecurityContext, step.Privileged) + spec.Containers = append(spec.Containers, container) pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: podName, - Namespace: namespace, - Labels: labels, - Annotations: annotations, - }, - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyNever, - HostAliases: hostAliases, - NodeSelector: nodeSelector, - Tolerations: tolerations, - ServiceAccountName: serviceAccountName, - SecurityContext: podSecCtx, - Containers: []v1.Container{{ - Name: podName, - Image: step.Image, - ImagePullPolicy: pullPolicy, - Command: entrypoint, - Args: args, - WorkingDir: step.WorkingDir, - Env: mapToEnvVars(step.Environment), - VolumeMounts: volMounts, - Resources: resourceRequirements, - SecurityContext: containerSecCtx, - }}, - ImagePullSecrets: []v1.LocalObjectReference{{Name: "regcred"}}, - Volumes: vols, - }, + ObjectMeta: meta, + Spec: spec, } return pod, nil } -func mapToEnvVars(m map[string]string) []v1.EnvVar { - var ev []v1.EnvVar - for k, v := range m { - ev = append(ev, v1.EnvVar{ - Name: k, - Value: v, - }) - } - return ev +func podName(step *types.Step) (string, error) { + return dnsName(step.Name) } -func volumeMountPath(i string) string { - s := strings.Split(i, ":") - if len(s) > 1 { - return s[1] +func podMeta(name, namespace string, labels, annotations map[string]string) metav1.ObjectMeta { + meta := metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Annotations: annotations, + } + + if labels == nil { + labels = make(map[string]string, 1) + } + labels[StepLabel] = name + meta.Labels = labels + + return meta +} + +func podSpec(serviceAccountName string, vols, extraHosts []string, env, backendNodeSelector map[string]string, backendTolerations []types.Toleration, + securityContext *types.SecurityContext, securityContextConfig SecurityContextConfig, +) (v1.PodSpec, error) { + var err error + spec := v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + ServiceAccountName: serviceAccountName, + ImagePullSecrets: []v1.LocalObjectReference{{Name: "regcred"}}, + } + + spec.HostAliases = hostAliases(extraHosts) + spec.NodeSelector = nodeSelector(backendNodeSelector, env["CI_SYSTEM_PLATFORM"]) + spec.Tolerations = tolerations(backendTolerations) + spec.SecurityContext = podSecurityContext(securityContext, securityContextConfig) + spec.Volumes, err = volumes(vols) + if err != nil { + return spec, err + } + + return spec, nil +} + +func podContainer(name, image, workDir, goos string, pull, privileged bool, commands, volumes []string, env map[string]string, resources types.Resources, + securityContext *types.SecurityContext, +) (v1.Container, error) { + var err error + container := v1.Container{ + Name: name, + Image: image, + WorkingDir: workDir, + } + + if pull { + container.ImagePullPolicy = v1.PullAlways + } + + if len(commands) != 0 { + scriptEnv, command, args := common.GenerateContainerConf(commands, goos) + container.Command = command + container.Args = args + maps.Copy(env, scriptEnv) + } + + container.Env = mapToEnvVars(env) + container.SecurityContext = containerSecurityContext(securityContext, privileged) + + container.Resources, err = resourceRequirements(resources) + if err != nil { + return container, err + } + + container.VolumeMounts, err = 
volumeMounts(volumes)
+	if err != nil {
+		return container, err
+	}
+
+	return container, nil
+}
+
+func volumes(volumes []string) ([]v1.Volume, error) {
+	var vols []v1.Volume
+
+	for _, v := range volumes {
+		volumeName, err := volumeName(v)
+		if err != nil {
+			return nil, err
+		}
+		vols = append(vols, volume(volumeName))
+	}
+
+	return vols, nil
+}
+
+func volume(name string) v1.Volume {
+	pvcSource := v1.PersistentVolumeClaimVolumeSource{
+		ClaimName: name,
+		ReadOnly:  false,
+	}
+	return v1.Volume{
+		Name: name,
+		VolumeSource: v1.VolumeSource{
+			PersistentVolumeClaim: &pvcSource,
+		},
+	}
+}
+
+func volumeMounts(volumes []string) ([]v1.VolumeMount, error) {
+	var mounts []v1.VolumeMount
+
+	for _, v := range volumes {
+		volumeName, err := volumeName(v)
+		if err != nil {
+			return nil, err
+		}
+
+		mount := volumeMount(volumeName, volumeMountPath(v))
+		mounts = append(mounts, mount)
+	}
+	return mounts, nil
+}
+
+func volumeMount(name, path string) v1.VolumeMount {
+	return v1.VolumeMount{
+		Name:      name,
+		MountPath: path,
+	}
+}
+
+// Here are the service IPs (placed in /etc/hosts in the Pod)
+func hostAliases(extraHosts []string) []v1.HostAlias {
+	hostAliases := []v1.HostAlias{}
+	for _, extraHost := range extraHosts {
+		hostAlias := hostAlias(extraHost)
+		hostAliases = append(hostAliases, hostAlias)
+	}
+	return hostAliases
+}
+
+func hostAlias(extraHost string) v1.HostAlias {
+	host := strings.Split(extraHost, ":")
+	return v1.HostAlias{
+		IP:        host[1],
+		Hostnames: []string{host[0]},
+	}
+}
+
+func resourceRequirements(resources types.Resources) (v1.ResourceRequirements, error) {
+	var err error
+	requirements := v1.ResourceRequirements{}
+
+	requirements.Requests, err = resourceList(resources.Requests)
+	if err != nil {
+		return requirements, err
+	}
+
+	requirements.Limits, err = resourceList(resources.Limits)
+	if err != nil {
+		return requirements, err
+	}
+
+	return requirements, nil
+}
+
+func resourceList(resources map[string]string) (v1.ResourceList, error) {
+	requestResources := v1.ResourceList{}
+	for key, val := range resources {
+		resName := v1.ResourceName(key)
+		resVal, err := resource.ParseQuantity(val)
+		if err != nil {
+			return nil, fmt.Errorf("resource '%v' quantity '%v': %w", key, val, err)
+		}
+		requestResources[resName] = resVal
+	}
+	return requestResources, nil
+}
+
+func nodeSelector(backendNodeSelector map[string]string, platform string) map[string]string {
+	nodeSelector := make(map[string]string)
+
+	if platform != "" {
+		arch := strings.Split(platform, "/")[1]
+		nodeSelector[v1.LabelArchStable] = arch
+		log.Trace().Msgf("Using the node selector from the Agent's platform: %v", nodeSelector)
+	}
+
+	if len(backendNodeSelector) > 0 {
+		log.Trace().Msgf("Appending labels to the node selector from the backend options: %v", backendNodeSelector)
+		maps.Copy(nodeSelector, backendNodeSelector)
+	}
+
+	return nodeSelector
+}
+
+func tolerations(backendTolerations []types.Toleration) []v1.Toleration {
+	var tolerations []v1.Toleration
+
+	if len(backendTolerations) > 0 {
+		log.Trace().Msgf("Tolerations that will be used in the backend options: %v", backendTolerations)
+		for _, backendToleration := range backendTolerations {
+			toleration := toleration(backendToleration)
+			tolerations = append(tolerations, toleration)
+		}
+	}
+
+	return tolerations
+}
+
+func toleration(backendToleration types.Toleration) v1.Toleration {
+	return v1.Toleration{
+		Key:               backendToleration.Key,
+		Operator:          v1.TolerationOperator(backendToleration.Operator),
+		Value:             
backendToleration.Value, + Effect: v1.TaintEffect(backendToleration.Effect), + TolerationSeconds: backendToleration.TolerationSeconds, } - return s[0] } func podSecurityContext(sc *types.SecurityContext, secCtxConf SecurityContextConfig) *v1.PodSecurityContext { @@ -226,12 +309,14 @@ func podSecurityContext(sc *types.SecurityContext, secCtxConf SecurityContextCon return nil } - return &v1.PodSecurityContext{ + securityContext := &v1.PodSecurityContext{ RunAsNonRoot: nonRoot, RunAsUser: user, RunAsGroup: group, FSGroup: fsGroup, } + log.Trace().Msgf("Pod security context that will be used: %v", securityContext) + return securityContext } func containerSecurityContext(sc *types.SecurityContext, stepPrivileged bool) *v1.SecurityContext { @@ -247,7 +332,54 @@ func containerSecurityContext(sc *types.SecurityContext, stepPrivileged bool) *v return nil } - return &v1.SecurityContext{ + securityContext := &v1.SecurityContext{ Privileged: privileged, } + log.Trace().Msgf("Container security context that will be used: %v", securityContext) + return securityContext +} + +func mapToEnvVars(m map[string]string) []v1.EnvVar { + var ev []v1.EnvVar + for k, v := range m { + ev = append(ev, v1.EnvVar{ + Name: k, + Value: v, + }) + } + return ev +} + +func startPod(ctx context.Context, engine *kube, step *types.Step) (*v1.Pod, error) { + podName, err := podName(step) + if err != nil { + return nil, err + } + + pod, err := mkPod(engine.config.Namespace, podName, step.Image, step.WorkingDir, engine.goos, step.BackendOptions.Kubernetes.ServiceAccountName, + step.Pull, step.Privileged, + step.Commands, step.Volumes, step.ExtraHosts, + engine.config.PodLabels, engine.config.PodAnnotations, step.Environment, step.BackendOptions.Kubernetes.NodeSelector, + step.BackendOptions.Kubernetes.Tolerations, step.BackendOptions.Kubernetes.Resources, step.BackendOptions.Kubernetes.SecurityContext, engine.config.SecurityContext) + if err != nil { + return nil, err + } + + log.Trace().Msgf("Creating pod: %s", pod.Name) + return engine.client.CoreV1().Pods(engine.config.Namespace).Create(ctx, pod, metav1.CreateOptions{}) +} + +func stopPod(ctx context.Context, engine *kube, step *types.Step, deleteOpts metav1.DeleteOptions) error { + podName, err := podName(step) + if err != nil { + return err + } + log.Trace().Str("name", podName).Msg("Deleting pod") + + err = engine.client.CoreV1().Pods(engine.config.Namespace).Delete(ctx, podName, deleteOpts) + if errors.IsNotFound(err) { + // Don't abort on 404 errors from k8s, they most likely mean that the pod hasn't been created yet, usually because pipeline was canceled before running all steps. + return nil + } + return err } diff --git a/pipeline/backend/kubernetes/pod_test.go b/pipeline/backend/kubernetes/pod_test.go new file mode 100644 index 000000000..24f7d64b7 --- /dev/null +++ b/pipeline/backend/kubernetes/pod_test.go @@ -0,0 +1,253 @@ +// Copyright 2023 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package kubernetes
+
+import (
+	"encoding/json"
+	"testing"
+
+	"github.com/kinbiko/jsonassert"
+	"github.com/stretchr/testify/assert"
+	"go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
+)
+
+func TestPodName(t *testing.T) {
+	name, err := podName(&types.Step{Name: "wp_01he8bebctabr3kgk0qj36d2me_0"})
+	assert.NoError(t, err)
+	assert.Equal(t, "wp-01he8bebctabr3kgk0qj36d2me-0", name)
+
+	name, err = podName(&types.Step{Name: "wp\\01he8bebctabr3kgk0qj36d2me-0"})
+	assert.NoError(t, err)
+	assert.Equal(t, "wp\\01he8bebctabr3kgk0qj36d2me-0", name)
+
+	_, err = podName(&types.Step{Name: "wp-01he8bebctabr3kgk0qj36d2me-0-services-0.woodpecker-runtime.svc.cluster.local"})
+	assert.ErrorIs(t, err, ErrDNSPatternInvalid)
+}
+
+func TestTinyPod(t *testing.T) {
+	expected := `
+	{
+		"metadata": {
+			"name": "wp-01he8bebctabr3kgk0qj36d2me-0",
+			"namespace": "woodpecker",
+			"creationTimestamp": null,
+			"labels": {
+				"step": "wp-01he8bebctabr3kgk0qj36d2me-0"
+			}
+		},
+		"spec": {
+			"volumes": [
+				{
+					"name": "workspace",
+					"persistentVolumeClaim": {
+						"claimName": "workspace"
+					}
+				}
+			],
+			"containers": [
+				{
+					"name": "wp-01he8bebctabr3kgk0qj36d2me-0",
+					"image": "gradle:8.4.0-jdk21",
+					"command": [
+						"/bin/sh",
+						"-c"
+					],
+					"args": [
+						"echo $CI_SCRIPT | base64 -d | /bin/sh -e"
+					],
+					"workingDir": "/woodpecker/src",
+					"env": [
+						"<<UNORDERED>>",
+						{
+							"name": "CI",
+							"value": "woodpecker"
+						},
+						{
+							"name": "HOME",
+							"value": "/root"
+						},
+						{
+							"name": "SHELL",
+							"value": "/bin/sh"
+						},
+						{
+							"name": "CI_SCRIPT",
+							"value": "CmlmIFsgLW4gIiRDSV9ORVRSQ19NQUNISU5FIiBdOyB0aGVuCmNhdCA8PEVPRiA+ICRIT01FLy5uZXRyYwptYWNoaW5lICRDSV9ORVRSQ19NQUNISU5FCmxvZ2luICRDSV9ORVRSQ19VU0VSTkFNRQpwYXNzd29yZCAkQ0lfTkVUUkNfUEFTU1dPUkQKRU9GCmNobW9kIDA2MDAgJEhPTUUvLm5ldHJjCmZpCnVuc2V0IENJX05FVFJDX1VTRVJOQU1FCnVuc2V0IENJX05FVFJDX1BBU1NXT1JECnVuc2V0IENJX1NDUklQVAoKZWNobyArICdncmFkbGUgYnVpbGQnCmdyYWRsZSBidWlsZAo="
+						}
+					],
+					"resources": {},
+					"volumeMounts": [
+						{
+							"name": "workspace",
+							"mountPath": "/woodpecker/src"
+						}
+					]
+				}
+			],
+			"restartPolicy": "Never",
+			"imagePullSecrets": [
+				{
+					"name": "regcred"
+				}
+			]
+		},
+		"status": {}
+	}`
+
+	pod, err := mkPod("woodpecker", "wp-01he8bebctabr3kgk0qj36d2me-0", "gradle:8.4.0-jdk21", "/woodpecker/src", "linux/amd64", "",
+		false, false,
+		[]string{"gradle build"}, []string{"workspace:/woodpecker/src"}, nil,
+		nil, nil, map[string]string{"CI": "woodpecker"}, nil,
+		nil,
+		types.Resources{Requests: nil, Limits: nil}, nil, SecurityContextConfig{},
+	)
+	assert.NoError(t, err)
+
+	json, err := json.Marshal(pod)
+	assert.NoError(t, err)
+
+	ja := jsonassert.New(t)
+	ja.Assertf(string(json), expected)
+}
+
+func TestFullPod(t *testing.T) {
+	expected := `
+	{
+		"metadata": {
+			"name": "wp-01he8bebctabr3kgk0qj36d2me-0",
+			"namespace": "woodpecker",
+			"creationTimestamp": null,
+			"labels": {
+				"app": "test",
+				"step": "wp-01he8bebctabr3kgk0qj36d2me-0"
+			},
+			"annotations": {
+				"apparmor.security": "runtime/default"
+			}
+		},
+		"spec": {
+			"volumes": [
+				{
+					"name": "woodpecker-cache",
+					"persistentVolumeClaim": {
+						"claimName": "woodpecker-cache"
+					}
+				}
+			],
+			"containers": [
+				{
+					"name": "wp-01he8bebctabr3kgk0qj36d2me-0",
+					"image": "meltwater/drone-cache",
+					"command": [
+						"/bin/sh",
+						"-c"
+					],
+					"args": [
+						"echo $CI_SCRIPT | base64 -d | /bin/sh -e"
+					],
+					"workingDir": "/woodpecker/src",
+					"env": [
+						"<<UNORDERED>>",
+						{
+							"name": "CGO",
+							"value": "0"
+						},
+						{
+							"name": "CI_SCRIPT",
+							"value": 
"CmlmIFsgLW4gIiRDSV9ORVRSQ19NQUNISU5FIiBdOyB0aGVuCmNhdCA8PEVPRiA+ICRIT01FLy5uZXRyYwptYWNoaW5lICRDSV9ORVRSQ19NQUNISU5FCmxvZ2luICRDSV9ORVRSQ19VU0VSTkFNRQpwYXNzd29yZCAkQ0lfTkVUUkNfUEFTU1dPUkQKRU9GCmNobW9kIDA2MDAgJEhPTUUvLm5ldHJjCmZpCnVuc2V0IENJX05FVFJDX1VTRVJOQU1FCnVuc2V0IENJX05FVFJDX1BBU1NXT1JECnVuc2V0IENJX1NDUklQVAoKZWNobyArICdnbyBnZXQnCmdvIGdldAoKZWNobyArICdnbyB0ZXN0JwpnbyB0ZXN0Cg==" + }, + { + "name": "HOME", + "value": "/root" + }, + { + "name": "SHELL", + "value": "/bin/sh" + } + ], + "resources": { + "limits": { + "cpu": "2", + "memory": "256Mi" + }, + "requests": { + "cpu": "1", + "memory": "128Mi" + } + }, + "volumeMounts": [ + { + "name": "woodpecker-cache", + "mountPath": "/woodpecker/src/cache" + } + ], + "imagePullPolicy": "Always", + "securityContext": { + "privileged": true + } + } + ], + "restartPolicy": "Never", + "nodeSelector": { + "storage": "ssd" + }, + "serviceAccountName": "wp-svc-acc", + "securityContext": { + "runAsUser": 101, + "runAsGroup": 101, + "runAsNonRoot": true, + "fsGroup": 101 + }, + "imagePullSecrets": [ + { + "name": "regcred" + } + ], + "tolerations": [ + { + "key": "net-port", + "value": "100Mbit", + "effect": "NoSchedule" + } + ], + "hostAliases": [ + { + "ip": "1.1.1.1", + "hostnames": [ + "cloudflare" + ] + } + ] + }, + "status": {} + }` + + pod, err := mkPod("woodpecker", "wp-01he8bebctabr3kgk0qj36d2me-0", "meltwater/drone-cache", "/woodpecker/src", "linux/amd64", "wp-svc-acc", + true, true, + []string{"go get", "go test"}, []string{"woodpecker-cache:/woodpecker/src/cache"}, []string{"cloudflare:1.1.1.1"}, + map[string]string{"app": "test"}, map[string]string{"apparmor.security": "runtime/default"}, map[string]string{"CGO": "0"}, map[string]string{"storage": "ssd"}, + []types.Toleration{{Key: "net-port", Value: "100Mbit", Effect: types.TaintEffectNoSchedule}}, + types.Resources{Requests: map[string]string{"memory": "128Mi", "cpu": "1000m"}, Limits: map[string]string{"memory": "256Mi", "cpu": "2"}}, + &types.SecurityContext{Privileged: newBool(true), RunAsNonRoot: newBool(true), RunAsUser: newInt64(101), RunAsGroup: newInt64(101), FSGroup: newInt64(101)}, + SecurityContextConfig{RunAsNonRoot: false}, + ) + assert.NoError(t, err) + + json, err := json.Marshal(pod) + assert.NoError(t, err) + + ja := jsonassert.New(t) + ja.Assertf(string(json), expected) +} diff --git a/pipeline/backend/kubernetes/service.go b/pipeline/backend/kubernetes/service.go index b1233f056..2fa66a462 100644 --- a/pipeline/backend/kubernetes/service.go +++ b/pipeline/backend/kubernetes/service.go @@ -15,14 +15,20 @@ package kubernetes import ( + "context" "fmt" + "github.com/rs/zerolog/log" + "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" ) -func Service(namespace, name string, ports []uint16) (*v1.Service, error) { +func mkService(namespace, name string, ports []uint16, selector map[string]string) (*v1.Service, error) { + log.Trace().Str("name", name).Interface("selector", selector).Interface("ports", ports).Msg("Creating service") + var svcPorts []v1.ServicePort for _, port := range ports { svcPorts = append(svcPorts, v1.ServicePort{ @@ -32,22 +38,57 @@ func Service(namespace, name string, ports []uint16) (*v1.Service, error) { }) } - dnsName, err := dnsName(name) + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeClusterIP, + 
Selector: selector, + Ports: svcPorts, + }, + }, nil +} + +func serviceName(step *types.Step) (string, error) { + return dnsName(step.Name) +} + +func startService(ctx context.Context, engine *kube, step *types.Step) (*v1.Service, error) { + name, err := serviceName(step) + if err != nil { + return nil, err + } + podName, err := podName(step) if err != nil { return nil, err } - return &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: dnsName, - Namespace: namespace, - }, - Spec: v1.ServiceSpec{ - Type: v1.ServiceTypeClusterIP, - Selector: map[string]string{ - "step": dnsName, - }, - Ports: svcPorts, - }, - }, nil + selector := map[string]string{ + StepLabel: podName, + } + + svc, err := mkService(engine.config.Namespace, name, step.Ports, selector) + if err != nil { + return nil, err + } + + return engine.client.CoreV1().Services(engine.config.Namespace).Create(ctx, svc, metav1.CreateOptions{}) +} + +func stopService(ctx context.Context, engine *kube, step *types.Step, deleteOpts metav1.DeleteOptions) error { + svcName, err := serviceName(step) + if err != nil { + return err + } + log.Trace().Str("name", svcName).Msg("Deleting service") + + err = engine.client.CoreV1().Services(engine.config.Namespace).Delete(ctx, svcName, deleteOpts) + if errors.IsNotFound(err) { + // Don't abort on 404 errors from k8s, they most likely mean that the pod hasn't been created yet, usually because pipeline was canceled before running all steps. + log.Trace().Err(err).Msgf("Unable to delete service %s", svcName) + return nil + } + return err } diff --git a/pipeline/backend/kubernetes/service_test.go b/pipeline/backend/kubernetes/service_test.go index 59a631b15..5dc06509f 100644 --- a/pipeline/backend/kubernetes/service_test.go +++ b/pipeline/backend/kubernetes/service_test.go @@ -19,8 +19,22 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types" ) +func TestServiceName(t *testing.T) { + name, err := serviceName(&types.Step{Name: "wp_01he8bebctabr3kgk0qj36d2me_0_services_0"}) + assert.NoError(t, err) + assert.Equal(t, "wp-01he8bebctabr3kgk0qj36d2me-0-services-0", name) + + name, err = serviceName(&types.Step{Name: "wp-01he8bebctabr3kgk0qj36d2me-0\\services-0"}) + assert.NoError(t, err) + assert.Equal(t, "wp-01he8bebctabr3kgk0qj36d2me-0\\services-0", name) + + _, err = serviceName(&types.Step{Name: "wp-01he8bebctabr3kgk0qj36d2me-0-services-0.woodpecker-runtime.svc.cluster.local"}) + assert.ErrorIs(t, err, ErrDNSPatternInvalid) +} + func TestService(t *testing.T) { expected := ` { @@ -48,7 +62,7 @@ func TestService(t *testing.T) { } ], "selector": { - "step": "bar" + "step": "baz" }, "type": "ClusterIP" }, @@ -57,7 +71,7 @@ func TestService(t *testing.T) { } }` - s, _ := Service("foo", "bar", []uint16{1, 2, 3}) + s, _ := mkService("foo", "bar", []uint16{1, 2, 3}, map[string]string{"step": "baz"}) j, err := json.Marshal(s) assert.NoError(t, err) assert.JSONEq(t, expected, string(j)) diff --git a/pipeline/backend/kubernetes/utils.go b/pipeline/backend/kubernetes/utils.go index 3b177e2fc..fd924f357 100644 --- a/pipeline/backend/kubernetes/utils.go +++ b/pipeline/backend/kubernetes/utils.go @@ -78,3 +78,15 @@ func getClientInsideOfCluster() (kubernetes.Interface, error) { return kubernetes.NewForConfig(config) } + +func newBool(val bool) *bool { + ptr := new(bool) + *ptr = val + return ptr +} + +func newInt64(val int64) *int64 { + ptr := new(int64) + *ptr = val + return ptr +} diff --git a/pipeline/backend/kubernetes/volume.go 
b/pipeline/backend/kubernetes/volume.go index 899ee142a..92ea53222 100644 --- a/pipeline/backend/kubernetes/volume.go +++ b/pipeline/backend/kubernetes/volume.go @@ -15,14 +15,17 @@ package kubernetes import ( + "context" "strings" + "github.com/rs/zerolog/log" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func PersistentVolumeClaim(namespace, name, storageClass, size string, storageRwx bool) (*v1.PersistentVolumeClaim, error) { +func mkPersistentVolumeClaim(namespace, name, storageClass, size string, storageRwx bool) (*v1.PersistentVolumeClaim, error) { _storageClass := &storageClass if storageClass == "" { _storageClass = nil @@ -36,7 +39,7 @@ func PersistentVolumeClaim(namespace, name, storageClass, size string, storageRw accessMode = v1.ReadWriteOnce } - volumeName, err := dnsName(strings.Split(name, ":")[0]) + volumeName, err := volumeName(name) if err != nil { return nil, err } @@ -59,3 +62,41 @@ func PersistentVolumeClaim(namespace, name, storageClass, size string, storageRw return pvc, nil } + +func volumeName(name string) (string, error) { + return dnsName(strings.Split(name, ":")[0]) +} + +func volumeMountPath(name string) string { + s := strings.Split(name, ":") + if len(s) > 1 { + return s[1] + } + return s[0] +} + +func startVolume(ctx context.Context, engine *kube, name string) (*v1.PersistentVolumeClaim, error) { + pvc, err := mkPersistentVolumeClaim(engine.config.Namespace, name, engine.config.StorageClass, engine.config.VolumeSize, engine.config.StorageRwx) + if err != nil { + return nil, err + } + + log.Trace().Msgf("Creating volume: %s", pvc.Name) + return engine.client.CoreV1().PersistentVolumeClaims(engine.config.Namespace).Create(ctx, pvc, metav1.CreateOptions{}) +} + +func stopVolume(ctx context.Context, engine *kube, name string, deleteOpts metav1.DeleteOptions) error { + pvcName, err := volumeName(name) + if err != nil { + return err + } + log.Trace().Str("name", pvcName).Msg("Deleting volume") + + err = engine.client.CoreV1().PersistentVolumeClaims(engine.config.Namespace).Delete(ctx, pvcName, deleteOpts) + if errors.IsNotFound(err) { + // Don't abort on 404 errors from k8s, they most likely mean that the pod hasn't been created yet, usually because pipeline was canceled before running all steps. 
+		log.Trace().Err(err).Msgf("Unable to delete pvc %s", pvcName)
+		return nil
+	}
+	return err
+}
diff --git a/pipeline/backend/kubernetes/volume_test.go b/pipeline/backend/kubernetes/volume_test.go
index fb38a6825..90e180d78 100644
--- a/pipeline/backend/kubernetes/volume_test.go
+++ b/pipeline/backend/kubernetes/volume_test.go
@@ -21,6 +21,27 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
+func TestPvcName(t *testing.T) {
+	name, err := volumeName("woodpecker_cache:/woodpecker/src/cache")
+	assert.NoError(t, err)
+	assert.Equal(t, "woodpecker-cache", name)
+
+	name, err = volumeName("woodpecker\\cache")
+	assert.NoError(t, err)
+	assert.Equal(t, "woodpecker\\cache", name)
+
+	_, err = volumeName("-woodpecker.cache:/woodpecker/src/cache")
+	assert.ErrorIs(t, err, ErrDNSPatternInvalid)
+}
+
+func TestPvcMount(t *testing.T) {
+	mount := volumeMountPath("woodpecker-cache:/woodpecker/src/cache")
+	assert.Equal(t, "/woodpecker/src/cache", mount)
+
+	mount = volumeMountPath("/woodpecker/src/cache")
+	assert.Equal(t, "/woodpecker/src/cache", mount)
+}
+
 func TestPersistentVolumeClaim(t *testing.T) {
 	expectedRwx := `
 	{
@@ -64,20 +85,20 @@ func TestPersistentVolumeClaim(t *testing.T) {
 		"status": {}
 	}`
 
-	pvc, err := PersistentVolumeClaim("someNamespace", "somename", "local-storage", "1Gi", true)
+	pvc, err := mkPersistentVolumeClaim("someNamespace", "somename", "local-storage", "1Gi", true)
 	assert.NoError(t, err)
 
 	j, err := json.Marshal(pvc)
 	assert.NoError(t, err)
 	assert.JSONEq(t, expectedRwx, string(j))
 
-	pvc, err = PersistentVolumeClaim("someNamespace", "somename", "local-storage", "1Gi", false)
+	pvc, err = mkPersistentVolumeClaim("someNamespace", "somename", "local-storage", "1Gi", false)
 	assert.NoError(t, err)
 
 	j, err = json.Marshal(pvc)
 	assert.NoError(t, err)
 	assert.JSONEq(t, expectedRwo, string(j))
 
-	_, err = PersistentVolumeClaim("someNamespace", "some0INVALID3name", "local-storage", "1Gi", false)
+	_, err = mkPersistentVolumeClaim("someNamespace", "some0INVALID3name", "local-storage", "1Gi", false)
 	assert.Error(t, err)
 }
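---

Reviewer note, not part of the patch: because the refactor threads a context and the engine through small start*/stop* helpers, they can now be exercised end-to-end against client-go's fake clientset instead of a live cluster. A minimal sketch of such a test, assuming the kube and config types shown in this diff (the test name and step values are hypothetical, and no such test is added by this patch):

	package kubernetes

	import (
		"context"
		"testing"

		"github.com/stretchr/testify/assert"
		"go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
		"k8s.io/client-go/kubernetes/fake"
	)

	func TestStartStopPod(t *testing.T) {
		// The fake clientset records created objects in memory; no cluster needed.
		engine := &kube{
			ctx:    context.Background(),
			client: fake.NewSimpleClientset(),
			config: &config{Namespace: "woodpecker"},
			goos:   "linux",
		}
		step := &types.Step{Name: "wp_01he8bebctabr3kgk0qj36d2me_0", Image: "gradle:8.4.0-jdk21"}

		pod, err := startPod(context.Background(), engine, step)
		assert.NoError(t, err)
		assert.Equal(t, "wp-01he8bebctabr3kgk0qj36d2me-0", pod.Name)

		// stopPod swallows 404s, so deleting the same pod twice must both succeed.
		assert.NoError(t, stopPod(context.Background(), engine, step, defaultDeleteOptions))
		assert.NoError(t, stopPod(context.Background(), engine, step, defaultDeleteOptions))
	}

Because stopPod, stopService, and stopVolume all treat IsNotFound as success, DestroyWorkflow stays idempotent when a pipeline is canceled before every step has created its Kubernetes objects.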