1
0
Fork 0
mirror of https://github.com/woodpecker-ci/woodpecker.git synced 2025-04-10 13:54:13 +00:00

extract pipeline_name from metadata to agent and add to container names

This commit is contained in:
pat-s 2025-01-05 20:57:57 +01:00
parent 7700e0e6c8
commit a0d8c70a2c
No known key found for this signature in database
GPG key ID: 3C6318841EF78925
17 changed files with 189 additions and 180 deletions

View file

@ -72,6 +72,7 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
repoName := extractRepositoryName(workflow.Config) // hack
pipelineNumber := extractPipelineNumber(workflow.Config) // hack
pipelineName := extractPipelineName(workflow.Config) // hack
r.counter.Add(
workflow.ID,
@ -149,6 +150,7 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
"workflow_id": workflow.ID,
"repo": repoName,
"pipeline_number": pipelineNumber,
"pipeline_name": pipelineName,
}),
).Run(runnerCtx)
@ -197,3 +199,7 @@ func extractRepositoryName(config *backend.Config) string {
func extractPipelineNumber(config *backend.Config) string {
return config.Stages[0].Steps[0].Environment["CI_PIPELINE_NUMBER"]
}
func extractPipelineName(config *backend.Config) string {
return config.Stages[0].Steps[0].Environment["CI_WORKFLOW_NAME"]
}

View file

@ -69,8 +69,8 @@ func (e *docker) toConfig(step *types.Step, options BackendOptions) *container.C
return config
}
func toContainerName(step *types.Step) string {
return "wp_" + step.UUID[:5] + "-" + step.Name
func toContainerName(step *types.Step, workflowName string) string {
return "wp_" + step.UUID[:5] + "-" + workflowName + "-" + step.Name
}
// returns a container host configuration.

View file

@ -125,8 +125,8 @@ var (
)
func TestToContainerName(t *testing.T) {
assert.EqualValues(t, "wp_f5182-hello", toContainerName(testCmdStep))
assert.EqualValues(t, "wp_d841e-lint", toContainerName(testPluginStep))
assert.EqualValues(t, "wp_f5182-workflowNameTest-hello", toContainerName(testCmdStep, "workflowNameTest"))
assert.EqualValues(t, "wp_d841e-workflowNameTest-lint", toContainerName(testPluginStep, "workflowNameTest"))
}
func TestStepToConfig(t *testing.T) {

View file

@ -141,8 +141,8 @@ func (e *docker) Load(ctx context.Context) (*backend.BackendInfo, error) {
}, nil
}
func (e *docker) SetupWorkflow(ctx context.Context, conf *backend.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msg("create workflow environment")
func (e *docker) SetupWorkflow(ctx context.Context, conf *backend.Config, taskUUID, workflowName string) error {
log.Trace().Str("taskUUID", taskUUID).Str("workflowName", workflowName).Msg("create workflow environment")
_, err := e.client.VolumeCreate(ctx, volume.CreateOptions{
Name: conf.Volume.Name,
@ -163,17 +163,17 @@ func (e *docker) SetupWorkflow(ctx context.Context, conf *backend.Config, taskUU
return err
}
func (e *docker) StartStep(ctx context.Context, step *backend.Step, taskUUID string) error {
func (e *docker) StartStep(ctx context.Context, step *backend.Step, taskUUID, workflowName string) error {
options, err := parseBackendOptions(step)
if err != nil {
log.Error().Err(err).Msg("could not parse backend options")
}
log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s", step.Name)
log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s-%s", workflowName, step.Name)
config := e.toConfig(step, options)
hostConfig := toHostConfig(step, &e.config)
containerName := toContainerName(step)
containerName := toContainerName(step, workflowName)
// create pull options with encoded authorization credentials.
pullOpts := image.PullOptions{}
@ -246,10 +246,10 @@ func (e *docker) StartStep(ctx context.Context, step *backend.Step, taskUUID str
return e.client.ContainerStart(ctx, containerName, container.StartOptions{})
}
func (e *docker) WaitStep(ctx context.Context, step *backend.Step, taskUUID string) (*backend.State, error) {
func (e *docker) WaitStep(ctx context.Context, step *backend.Step, taskUUID, workflowName string) (*backend.State, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("wait for step %s", step.Name)
containerName := toContainerName(step)
containerName := toContainerName(step, workflowName)
wait, errC := e.client.ContainerWait(ctx, containerName, "")
select {
@ -269,10 +269,10 @@ func (e *docker) WaitStep(ctx context.Context, step *backend.Step, taskUUID stri
}, nil
}
func (e *docker) TailStep(ctx context.Context, step *backend.Step, taskUUID string) (io.ReadCloser, error) {
func (e *docker) TailStep(ctx context.Context, step *backend.Step, taskUUID, workflowName string) (io.ReadCloser, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("tail logs of step %s", step.Name)
logs, err := e.client.ContainerLogs(ctx, toContainerName(step), container.LogsOptions{
logs, err := e.client.ContainerLogs(ctx, toContainerName(step, workflowName), container.LogsOptions{
Follow: true,
ShowStdout: true,
ShowStderr: true,
@ -293,10 +293,10 @@ func (e *docker) TailStep(ctx context.Context, step *backend.Step, taskUUID stri
return rc, nil
}
func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID string) error {
func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID, workflowName string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("stop step %s", step.Name)
containerName := toContainerName(step)
containerName := toContainerName(step, workflowName)
if err := e.client.ContainerKill(ctx, containerName, "9"); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
return err
@ -309,12 +309,12 @@ func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID s
return nil
}
func (e *docker) DestroyWorkflow(ctx context.Context, conf *backend.Config, taskUUID string) error {
func (e *docker) DestroyWorkflow(ctx context.Context, conf *backend.Config, taskUUID, workflowName string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("delete workflow environment")
for _, stage := range conf.Stages {
for _, step := range stage.Steps {
containerName := toContainerName(step)
containerName := toContainerName(step, workflowName)
if err := e.client.ContainerKill(ctx, containerName, "9"); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
log.Error().Err(err).Msgf("could not kill container '%s'", step.Name)
}

View file

@ -78,17 +78,20 @@ func (e *dummy) Load(_ context.Context) (*backend.BackendInfo, error) {
}, nil
}
func (e *dummy) SetupWorkflow(_ context.Context, _ *backend.Config, taskUUID string) error {
func (e *dummy) SetupWorkflow(_ context.Context, _ *backend.Config, taskUUID, workflowName string) error {
if taskUUID == WorkflowSetupFailUUID {
return fmt.Errorf("expected fail to setup workflow")
}
if workflowName == "" {
return fmt.Errorf("expected fail to setup workflow")
}
log.Trace().Str("taskUUID", taskUUID).Msg("create workflow environment")
e.kv.Store("task_"+taskUUID, "setup")
return nil
}
func (e *dummy) StartStep(_ context.Context, step *backend.Step, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s", step.Name)
func (e *dummy) StartStep(_ context.Context, step *backend.Step, taskUUID, workflowName string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s-%s", workflowName, step.Name)
// internal state checks
_, exist := e.kv.Load("task_" + taskUUID)
@ -114,8 +117,8 @@ func (e *dummy) StartStep(_ context.Context, step *backend.Step, taskUUID string
return nil
}
func (e *dummy) WaitStep(ctx context.Context, step *backend.Step, taskUUID string) (*backend.State, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("wait for step %s", step.Name)
func (e *dummy) WaitStep(ctx context.Context, step *backend.Step, taskUUID, workflowName string) (*backend.State, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s-%s", workflowName, step.Name)
_, exist := e.kv.Load("task_" + taskUUID)
if !exist {
@ -172,8 +175,8 @@ func (e *dummy) WaitStep(ctx context.Context, step *backend.Step, taskUUID strin
}, nil
}
func (e *dummy) TailStep(_ context.Context, step *backend.Step, taskUUID string) (io.ReadCloser, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("tail logs of step %s", step.Name)
func (e *dummy) TailStep(_ context.Context, step *backend.Step, taskUUID, workflowName string) (io.ReadCloser, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s-%s", workflowName, step.Name)
_, exist := e.kv.Load("task_" + taskUUID)
if !exist {
@ -196,8 +199,8 @@ func (e *dummy) TailStep(_ context.Context, step *backend.Step, taskUUID string)
return io.NopCloser(strings.NewReader(dummyExecStepOutput(step))), nil
}
func (e *dummy) DestroyStep(_ context.Context, step *backend.Step, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("stop step %s", step.Name)
func (e *dummy) DestroyStep(_ context.Context, step *backend.Step, taskUUID, workflowName string) error {
log.Trace().Str("taskUUID", taskUUID).Str("workflowName", workflowName).Msgf("stop step %s", step.Name)
_, exist := e.kv.Load("task_" + taskUUID)
if !exist {
@ -217,8 +220,8 @@ func (e *dummy) DestroyStep(_ context.Context, step *backend.Step, taskUUID stri
return nil
}
func (e *dummy) DestroyWorkflow(_ context.Context, _ *backend.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("delete workflow environment")
func (e *dummy) DestroyWorkflow(_ context.Context, _ *backend.Config, taskUUID, workflowName string) error {
log.Trace().Str("taskUUID", taskUUID).Str("workflowName", workflowName).Msgf("delete workflow environment")
_, exist := e.kv.Load("task_" + taskUUID)
if !exist {

View file

@ -34,22 +34,22 @@ func TestSmalPipelineDummyRun(t *testing.T) {
_, err := dummyEngine.Load(ctx)
assert.NoError(t, err)
assert.Error(t, dummyEngine.SetupWorkflow(ctx, nil, dummy.WorkflowSetupFailUUID))
assert.Error(t, dummyEngine.SetupWorkflow(ctx, nil, dummy.WorkflowSetupFailUUID, "workflowTestName"))
t.Run("expect fail of step func with non setup workflow", func(t *testing.T) {
step := &types.Step{Name: "step1", UUID: "SID_1"}
nonExistWorkflowID := "WID_NONE"
err := dummyEngine.StartStep(ctx, step, nonExistWorkflowID)
err := dummyEngine.StartStep(ctx, step, nonExistWorkflowID, "workflowTestName")
assert.Error(t, err)
_, err = dummyEngine.TailStep(ctx, step, nonExistWorkflowID)
_, err = dummyEngine.TailStep(ctx, step, nonExistWorkflowID, "workflowTestName")
assert.Error(t, err)
_, err = dummyEngine.WaitStep(ctx, step, nonExistWorkflowID)
_, err = dummyEngine.WaitStep(ctx, step, nonExistWorkflowID, "workflowTestName")
assert.Error(t, err)
err = dummyEngine.DestroyStep(ctx, step, nonExistWorkflowID)
err = dummyEngine.DestroyStep(ctx, step, nonExistWorkflowID, "workflowTestName")
assert.Error(t, err)
})
@ -63,11 +63,11 @@ func TestSmalPipelineDummyRun(t *testing.T) {
}
workflowUUID := "WID_1"
assert.NoError(t, dummyEngine.SetupWorkflow(ctx, nil, workflowUUID))
assert.NoError(t, dummyEngine.SetupWorkflow(ctx, nil, workflowUUID, "workflowTestName"))
assert.NoError(t, dummyEngine.StartStep(ctx, step, workflowUUID))
assert.NoError(t, dummyEngine.StartStep(ctx, step, workflowUUID, "workflowTestName"))
reader, err := dummyEngine.TailStep(ctx, step, workflowUUID)
reader, err := dummyEngine.TailStep(ctx, step, workflowUUID, "workflowTestName")
assert.NoError(t, err)
log, err := io.ReadAll(reader)
assert.NoError(t, err)
@ -81,14 +81,14 @@ echo nein
------------------
`, string(log))
state, err := dummyEngine.WaitStep(ctx, step, workflowUUID)
state, err := dummyEngine.WaitStep(ctx, step, workflowUUID, "workflowTestName")
assert.NoError(t, err)
assert.NoError(t, state.Error)
assert.EqualValues(t, 0, state.ExitCode)
assert.NoError(t, dummyEngine.DestroyStep(ctx, step, workflowUUID))
assert.NoError(t, dummyEngine.DestroyStep(ctx, step, workflowUUID, "workflowTestName"))
assert.NoError(t, dummyEngine.DestroyWorkflow(ctx, nil, workflowUUID))
assert.NoError(t, dummyEngine.DestroyWorkflow(ctx, nil, workflowUUID, "workflowTestName"))
})
t.Run("step exec error", func(t *testing.T) {
@ -100,21 +100,21 @@ echo nein
}
workflowUUID := "WID_1"
assert.NoError(t, dummyEngine.SetupWorkflow(ctx, nil, workflowUUID))
assert.NoError(t, dummyEngine.SetupWorkflow(ctx, nil, workflowUUID, "workflowTestName"))
assert.NoError(t, dummyEngine.StartStep(ctx, step, workflowUUID))
assert.NoError(t, dummyEngine.StartStep(ctx, step, workflowUUID, "workflowTestName"))
_, err := dummyEngine.TailStep(ctx, step, workflowUUID)
_, err := dummyEngine.TailStep(ctx, step, workflowUUID, "workflowTestName")
assert.NoError(t, err)
state, err := dummyEngine.WaitStep(ctx, step, workflowUUID)
state, err := dummyEngine.WaitStep(ctx, step, workflowUUID, "workflowTestName")
assert.NoError(t, err)
assert.NoError(t, state.Error)
assert.EqualValues(t, 1, state.ExitCode)
assert.NoError(t, dummyEngine.DestroyStep(ctx, step, workflowUUID))
assert.NoError(t, dummyEngine.DestroyStep(ctx, step, workflowUUID, "workflowTestName"))
assert.NoError(t, dummyEngine.DestroyWorkflow(ctx, nil, workflowUUID))
assert.NoError(t, dummyEngine.DestroyWorkflow(ctx, nil, workflowUUID, "workflowTestName"))
})
t.Run("step tail error", func(t *testing.T) {
@ -125,19 +125,19 @@ echo nein
}
workflowUUID := "WID_1"
assert.NoError(t, dummyEngine.SetupWorkflow(ctx, nil, workflowUUID))
assert.NoError(t, dummyEngine.SetupWorkflow(ctx, nil, workflowUUID, "workflowTestName"))
assert.NoError(t, dummyEngine.StartStep(ctx, step, workflowUUID))
assert.NoError(t, dummyEngine.StartStep(ctx, step, workflowUUID, "workflowTestName"))
_, err := dummyEngine.TailStep(ctx, step, workflowUUID)
_, err := dummyEngine.TailStep(ctx, step, workflowUUID, "workflowTestName")
assert.Error(t, err)
_, err = dummyEngine.WaitStep(ctx, step, workflowUUID)
_, err = dummyEngine.WaitStep(ctx, step, workflowUUID, "workflowTestName")
assert.NoError(t, err)
assert.NoError(t, dummyEngine.DestroyStep(ctx, step, workflowUUID))
assert.NoError(t, dummyEngine.DestroyStep(ctx, step, workflowUUID, "workflowTestName"))
assert.NoError(t, dummyEngine.DestroyWorkflow(ctx, nil, workflowUUID))
assert.NoError(t, dummyEngine.DestroyWorkflow(ctx, nil, workflowUUID, "workflowTestName"))
})
t.Run("step start fail", func(t *testing.T) {
@ -149,20 +149,20 @@ echo nein
}
workflowUUID := "WID_1"
assert.NoError(t, dummyEngine.SetupWorkflow(ctx, nil, workflowUUID))
assert.NoError(t, dummyEngine.SetupWorkflow(ctx, nil, workflowUUID, "workflowTestName"))
assert.Error(t, dummyEngine.StartStep(ctx, step, workflowUUID))
assert.Error(t, dummyEngine.StartStep(ctx, step, workflowUUID, "workflowTestName"))
_, err := dummyEngine.TailStep(ctx, step, workflowUUID)
_, err := dummyEngine.TailStep(ctx, step, workflowUUID, "workflowTestName")
assert.Error(t, err)
state, err := dummyEngine.WaitStep(ctx, step, workflowUUID)
state, err := dummyEngine.WaitStep(ctx, step, workflowUUID, "workflowTestName")
assert.Error(t, err)
assert.Error(t, state.Error)
assert.EqualValues(t, 0, state.ExitCode)
assert.Error(t, dummyEngine.DestroyStep(ctx, step, workflowUUID))
assert.Error(t, dummyEngine.DestroyStep(ctx, step, workflowUUID, "workflowTestName"))
assert.NoError(t, dummyEngine.DestroyWorkflow(ctx, nil, workflowUUID))
assert.NoError(t, dummyEngine.DestroyWorkflow(ctx, nil, workflowUUID, "workflowTestName"))
})
}

View file

@ -188,7 +188,7 @@ func (e *kube) getConfig() *config {
}
// SetupWorkflow sets up the pipeline environment.
func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID string) error {
func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID, workflowName string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("Setting up Kubernetes primitives")
_, err := startVolume(ctx, e, conf.Volume.Name)
@ -200,7 +200,7 @@ func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID s
for _, stage := range conf.Stages {
for _, step := range stage.Steps {
if step.Type == types.StepTypeService {
svc, err := startService(ctx, e, step)
svc, err := startService(ctx, e, step, workflowName)
if err != nil {
return err
}
@ -220,28 +220,28 @@ func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID s
}
// StartStep starts the pipeline step.
func (e *kube) StartStep(ctx context.Context, step *types.Step, taskUUID string) error {
func (e *kube) StartStep(ctx context.Context, step *types.Step, taskUUID, workflowName string) error {
options, err := parseBackendOptions(step)
if err != nil {
log.Error().Err(err).Msg("could not parse backend options")
}
if needsRegistrySecret(step) {
err = startRegistrySecret(ctx, e, step)
err = startRegistrySecret(ctx, e, step, workflowName)
if err != nil {
return err
}
}
log.Trace().Str("taskUUID", taskUUID).Msgf("starting step: %s", step.Name)
_, err = startPod(ctx, e, step, options)
_, err = startPod(ctx, e, step, options, workflowName)
return err
}
// WaitStep waits for the pipeline step to complete and returns
// the completion results.
func (e *kube) WaitStep(ctx context.Context, step *types.Step, taskUUID string) (*types.State, error) {
podName, err := stepToPodName(step)
func (e *kube) WaitStep(ctx context.Context, step *types.Step, taskUUID, workflowName string) (*types.State, error) {
podName, err := stepToPodName(step, workflowName)
if err != nil {
return nil, err
}
@ -316,8 +316,8 @@ func (e *kube) WaitStep(ctx context.Context, step *types.Step, taskUUID string)
}
// TailStep tails the pipeline step logs.
func (e *kube) TailStep(ctx context.Context, step *types.Step, taskUUID string) (io.ReadCloser, error) {
podName, err := stepToPodName(step)
func (e *kube) TailStep(ctx context.Context, step *types.Step, taskUUID, workflowName string) (io.ReadCloser, error) {
podName, err := stepToPodName(step, workflowName)
if err != nil {
return nil, err
}
@ -388,17 +388,17 @@ func (e *kube) TailStep(ctx context.Context, step *types.Step, taskUUID string)
return rc, nil
}
func (e *kube) DestroyStep(ctx context.Context, step *types.Step, taskUUID string) error {
func (e *kube) DestroyStep(ctx context.Context, step *types.Step, taskUUID, workflowName string) error {
var errs []error
log.Trace().Str("taskUUID", taskUUID).Msgf("Stopping step: %s", step.Name)
if needsRegistrySecret(step) {
err := stopRegistrySecret(ctx, e, step, defaultDeleteOptions)
err := stopRegistrySecret(ctx, e, step, defaultDeleteOptions, workflowName)
if err != nil {
errs = append(errs, err)
}
}
err := stopPod(ctx, e, step, defaultDeleteOptions)
err := stopPod(ctx, e, step, defaultDeleteOptions, workflowName)
if err != nil {
errs = append(errs, err)
}
@ -406,18 +406,18 @@ func (e *kube) DestroyStep(ctx context.Context, step *types.Step, taskUUID strin
}
// DestroyWorkflow destroys the pipeline environment.
func (e *kube) DestroyWorkflow(ctx context.Context, conf *types.Config, taskUUID string) error {
func (e *kube) DestroyWorkflow(ctx context.Context, conf *types.Config, taskUUID, workflowName string) error {
log.Trace().Str("taskUUID", taskUUID).Msg("deleting Kubernetes primitives")
for _, stage := range conf.Stages {
for _, step := range stage.Steps {
err := stopPod(ctx, e, step, defaultDeleteOptions)
err := stopPod(ctx, e, step, defaultDeleteOptions, workflowName)
if err != nil {
return err
}
if step.Type == types.StepTypeService {
err := stopService(ctx, e, step, defaultDeleteOptions)
err := stopService(ctx, e, step, defaultDeleteOptions, workflowName)
if err != nil {
return err
}

View file

@ -36,7 +36,7 @@ const (
defaultFSGroup int64 = 1000
)
func mkPod(step *types.Step, config *config, podName, goos string, options BackendOptions) (*v1.Pod, error) {
func mkPod(step *types.Step, config *config, podName, goos string, options BackendOptions, workflowName string) (*v1.Pod, error) {
var err error
nsp := newNativeSecretsProcessor(config, options.Secrets)
@ -45,12 +45,12 @@ func mkPod(step *types.Step, config *config, podName, goos string, options Backe
return nil, err
}
meta, err := podMeta(step, config, options, podName)
meta, err := podMeta(step, config, options, podName, workflowName)
if err != nil {
return nil, err
}
spec, err := podSpec(step, config, options, nsp)
spec, err := podSpec(step, config, options, nsp, workflowName)
if err != nil {
return nil, err
}
@ -69,18 +69,18 @@ func mkPod(step *types.Step, config *config, podName, goos string, options Backe
return pod, nil
}
func stepToPodName(step *types.Step) (name string, err error) {
func stepToPodName(step *types.Step, workflowName string) (name string, err error) {
if step.Type == types.StepTypeService {
return serviceName(step)
return serviceName(step, workflowName)
}
return podName(step)
return podName(step, workflowName)
}
func podName(step *types.Step) (string, error) {
return dnsName(podPrefix + step.UUID[:5] + "-" + step.Name)
func podName(step *types.Step, workflowName string) (string, error) {
return dnsName(podPrefix + step.UUID[:5] + "-" + workflowName + "-" + step.Name)
}
func podMeta(step *types.Step, config *config, options BackendOptions, podName string) (meta_v1.ObjectMeta, error) {
func podMeta(step *types.Step, config *config, options BackendOptions, podName, workflowName string) (meta_v1.ObjectMeta, error) {
var err error
meta := meta_v1.ObjectMeta{
Name: podName,
@ -88,7 +88,7 @@ func podMeta(step *types.Step, config *config, options BackendOptions, podName s
Annotations: podAnnotations(config, options),
}
meta.Labels, err = podLabels(step, config, options)
meta.Labels, err = podLabels(step, config, options, workflowName)
if err != nil {
return meta, err
}
@ -96,7 +96,7 @@ func podMeta(step *types.Step, config *config, options BackendOptions, podName s
return meta, nil
}
func podLabels(step *types.Step, config *config, options BackendOptions) (map[string]string, error) {
func podLabels(step *types.Step, config *config, options BackendOptions, workflowName string) (map[string]string, error) {
var err error
labels := make(map[string]string)
@ -113,7 +113,7 @@ func podLabels(step *types.Step, config *config, options BackendOptions) (map[st
maps.Copy(labels, config.PodLabels)
}
if step.Type == types.StepTypeService {
labels[ServiceLabel], _ = serviceName(step)
labels[ServiceLabel], _ = serviceName(step, workflowName)
}
labels[StepLabel], err = stepLabel(step)
if err != nil {
@ -146,7 +146,7 @@ func podAnnotations(config *config, options BackendOptions) map[string]string {
return annotations
}
func podSpec(step *types.Step, config *config, options BackendOptions, nsp nativeSecretsProcessor) (v1.PodSpec, error) {
func podSpec(step *types.Step, config *config, options BackendOptions, nsp nativeSecretsProcessor, workflowName string) (v1.PodSpec, error) {
var err error
spec := v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
@ -176,7 +176,7 @@ func podSpec(step *types.Step, config *config, options BackendOptions, nsp nativ
spec.ImagePullSecrets = secretsReferences(config.ImagePullSecretNames)
if needsRegistrySecret(step) {
log.Trace().Msgf("using an image pull secret from registries")
name, err := registrySecretName(step)
name, err := registrySecretName(step, workflowName)
if err != nil {
return spec, err
}
@ -524,13 +524,13 @@ func mapToEnvVars(m map[string]string) []v1.EnvVar {
return ev
}
func startPod(ctx context.Context, engine *kube, step *types.Step, options BackendOptions) (*v1.Pod, error) {
podName, err := stepToPodName(step)
func startPod(ctx context.Context, engine *kube, step *types.Step, options BackendOptions, workflowName string) (*v1.Pod, error) {
podName, err := stepToPodName(step, workflowName)
if err != nil {
return nil, err
}
engineConfig := engine.getConfig()
pod, err := mkPod(step, engineConfig, podName, engine.goos, options)
pod, err := mkPod(step, engineConfig, podName, engine.goos, options, workflowName)
if err != nil {
return nil, err
}
@ -539,8 +539,8 @@ func startPod(ctx context.Context, engine *kube, step *types.Step, options Backe
return engine.client.CoreV1().Pods(engineConfig.Namespace).Create(ctx, pod, meta_v1.CreateOptions{})
}
func stopPod(ctx context.Context, engine *kube, step *types.Step, deleteOpts meta_v1.DeleteOptions) error {
podName, err := stepToPodName(step)
func stopPod(ctx context.Context, engine *kube, step *types.Step, deleteOpts meta_v1.DeleteOptions, workflowName string) error {
podName, err := stepToPodName(step, workflowName)
if err != nil {
return err
}

View file

@ -26,33 +26,33 @@ import (
)
func TestPodName(t *testing.T) {
name, err := podName(&types.Step{UUID: "01he8bebctabr3kgk0qj36d2me-0", Name: "go-test"})
name, err := podName(&types.Step{UUID: "01he8bebctabr3kgk0qj36d2me-0", Name: "go-test"}, "workflowNameTest")
assert.NoError(t, err)
assert.Equal(t, "wp-01he8-go-test", name)
assert.Equal(t, "wp-01he8-workflownametest-go-test", name)
_, err = podName(&types.Step{UUID: "01he8bebctabr3kgk0qj36d2me\\0a"})
_, err = podName(&types.Step{UUID: "01he8bebctabr3kgk0qj36d2me\\0a"}, "workflowNameTest")
assert.ErrorIs(t, err, ErrDNSPatternInvalid)
_, err = podName(&types.Step{UUID: "01he8bebctabr3kgk0qj36d2me-0-services-0..woodpecker-runtime.svc.cluster.local"})
_, err = podName(&types.Step{UUID: "01he8bebctabr3kgk0qj36d2me-0-services-0..woodpecker-runtime.svc.cluster.local"}, "workflowNameTest")
assert.ErrorIs(t, err, ErrDNSPatternInvalid)
}
func TestStepToPodName(t *testing.T) {
name, err := stepToPodName(&types.Step{UUID: "01he8bebctabr3kg", Name: "clone", Type: types.StepTypeClone})
name, err := stepToPodName(&types.Step{UUID: "01he8bebctabr3kg", Name: "clone", Type: types.StepTypeClone}, "workflowNameTest")
assert.NoError(t, err)
assert.EqualValues(t, "wp-01he8-clone", name)
name, err = stepToPodName(&types.Step{UUID: "01he8bebctabr3kg", Name: "cache", Type: types.StepTypeCache})
assert.EqualValues(t, "wp-01he8-workflownametest-clone", name)
name, err = stepToPodName(&types.Step{UUID: "01he8bebctabr3kg", Name: "cache", Type: types.StepTypeCache}, "workflowNameTest")
assert.NoError(t, err)
assert.EqualValues(t, "wp-01he8-cache", name)
name, err = stepToPodName(&types.Step{UUID: "01he8bebctabr3kg", Name: "release", Type: types.StepTypePlugin})
assert.EqualValues(t, "wp-01he8-workflownametest-cache", name)
name, err = stepToPodName(&types.Step{UUID: "01he8bebctabr3kg", Name: "release", Type: types.StepTypePlugin}, "workflowNameTest")
assert.NoError(t, err)
assert.EqualValues(t, "wp-01he8-release", name)
name, err = stepToPodName(&types.Step{UUID: "01he8bebctabr3kg", Name: "prepare-env", Type: types.StepTypeCommands})
assert.EqualValues(t, "wp-01he8-workflownametest-release", name)
name, err = stepToPodName(&types.Step{UUID: "01he8bebctabr3kg", Name: "prepare-env", Type: types.StepTypeCommands}, "workflowNameTest")
assert.NoError(t, err)
assert.EqualValues(t, "wp-01he8-prepare-env", name)
name, err = stepToPodName(&types.Step{UUID: "01he8bebctabr3kg", Name: "postgres", Type: types.StepTypeService})
assert.EqualValues(t, "wp-01he8-workflownametest-prepare-env", name)
name, err = stepToPodName(&types.Step{UUID: "01he8bebctabr3kg", Name: "postgres", Type: types.StepTypeService}, "workflowNameTest")
assert.NoError(t, err)
assert.EqualValues(t, "wp-svc-01he8-postgres", name)
assert.EqualValues(t, "wp-svc-01he8-workflownametest-postgres", name)
}
func TestStepLabel(t *testing.T) {
@ -68,7 +68,7 @@ func TestTinyPod(t *testing.T) {
const expected = `
{
"metadata": {
"name": "wp-01he8-go-test",
"name": "wp-01he8-workflownametest-go-test",
"namespace": "woodpecker",
"creationTimestamp": null,
"labels": {
@ -86,7 +86,7 @@ func TestTinyPod(t *testing.T) {
],
"containers": [
{
"name": "wp-01he8-go-test",
"name": "wp-01he8-workflownametest-go-test",
"image": "gradle:8.4.0-jdk21",
"command": [
"/bin/sh",
@ -133,7 +133,7 @@ func TestTinyPod(t *testing.T) {
Environment: map[string]string{"CI": "woodpecker"},
}, &config{
Namespace: "woodpecker",
}, "wp-01he8-go-test", "linux/amd64", BackendOptions{})
}, "wp-01he8-workflownametest-go-test", "linux/amd64", BackendOptions{}, "workflowNameTest")
assert.NoError(t, err)
podJSON, err := json.Marshal(pod)
@ -147,7 +147,7 @@ func TestFullPod(t *testing.T) {
const expected = `
{
"metadata": {
"name": "wp-01he8-go-test",
"name": "wp-01he8-workflownametest-go-test",
"namespace": "woodpecker",
"creationTimestamp": null,
"labels": {
@ -171,7 +171,7 @@ func TestFullPod(t *testing.T) {
],
"containers": [
{
"name": "wp-01he8-go-test",
"name": "wp-01he8-workflownametest-go-test",
"image": "meltwater/drone-cache",
"command": [
"/bin/sh",
@ -256,7 +256,7 @@ func TestFullPod(t *testing.T) {
"name": "another-pull-secret"
},
{
"name": "wp-01he8-go-test"
"name": "wp-01he8-workflownametest-go-test"
}
],
"tolerations": [
@ -335,7 +335,7 @@ func TestFullPod(t *testing.T) {
PodAnnotationsAllowFromStep: true,
PodNodeSelector: map[string]string{"topology.kubernetes.io/region": "eu-central-1"},
SecurityContext: SecurityContextConfig{RunAsNonRoot: false},
}, "wp-01he8-go-test", "linux/amd64", BackendOptions{
}, "wp-01he8-workflownametest-go-test", "linux/amd64", BackendOptions{
Labels: map[string]string{"part-of": "woodpecker-ci"},
Annotations: map[string]string{"kubernetes.io/limit-ranger": "LimitRanger plugin set: cpu, memory request and limit for container"},
NodeSelector: map[string]string{"storage": "ssd"},
@ -347,7 +347,7 @@ func TestFullPod(t *testing.T) {
Limits: map[string]string{"memory": "256Mi", "cpu": "2"},
},
SecurityContext: &secCtx,
})
}, "workflowNameTest")
assert.NoError(t, err)
podJSON, err := json.Marshal(pod)
@ -366,9 +366,9 @@ func TestPodPrivilege(t *testing.T) {
}, &config{
Namespace: "woodpecker",
SecurityContext: SecurityContextConfig{RunAsNonRoot: globalRunAsRoot},
}, "wp-01he8-go-test", "linux/amd64", BackendOptions{
}, "wp-01he8-workflownametest-go-test", "linux/amd64", BackendOptions{
SecurityContext: &secCtx,
})
}, "workflowNameTest")
}
// securty context is requesting user and group 101 (non-root)
@ -443,7 +443,7 @@ func TestScratchPod(t *testing.T) {
const expected = `
{
"metadata": {
"name": "wp-01he8-go-test",
"name": "wp-01he8-workflownametest-go-test",
"namespace": "woodpecker",
"creationTimestamp": null,
"labels": {
@ -453,7 +453,7 @@ func TestScratchPod(t *testing.T) {
"spec": {
"containers": [
{
"name": "wp-01he8-go-test",
"name": "wp-01he8-workflownametest-go-test",
"image": "quay.io/curl/curl",
"command": [
"/usr/bin/curl",
@ -474,7 +474,7 @@ func TestScratchPod(t *testing.T) {
Entrypoint: []string{"/usr/bin/curl", "-v", "google.com"},
}, &config{
Namespace: "woodpecker",
}, "wp-01he8-go-test", "linux/amd64", BackendOptions{})
}, "wp-01he8-workflownametest-go-test", "linux/amd64", BackendOptions{}, "workflowNameTest")
assert.NoError(t, err)
podJSON, err := json.Marshal(pod)
@ -593,7 +593,7 @@ func TestSecrets(t *testing.T) {
Target: SecretTarget{File: "~/.docker/config.json"},
},
},
})
}, "workflowNameTest")
assert.NoError(t, err)
podJSON, err := json.Marshal(pod)

View file

@ -204,13 +204,13 @@ func needsRegistrySecret(step *types.Step) bool {
return step.AuthConfig.Username != "" && step.AuthConfig.Password != ""
}
func mkRegistrySecret(step *types.Step, config *config) (*v1.Secret, error) {
name, err := registrySecretName(step)
func mkRegistrySecret(step *types.Step, config *config, workflowName string) (*v1.Secret, error) {
name, err := registrySecretName(step, workflowName)
if err != nil {
return nil, err
}
labels, err := registrySecretLabels(step)
labels, err := registrySecretLabels(step, workflowName)
if err != nil {
return nil, err
}
@ -247,16 +247,16 @@ func mkRegistrySecret(step *types.Step, config *config) (*v1.Secret, error) {
}, nil
}
func registrySecretName(step *types.Step) (string, error) {
return podName(step)
func registrySecretName(step *types.Step, workflowName string) (string, error) {
return podName(step, workflowName)
}
func registrySecretLabels(step *types.Step) (map[string]string, error) {
func registrySecretLabels(step *types.Step, workflowName string) (map[string]string, error) {
var err error
labels := make(map[string]string)
if step.Type == types.StepTypeService {
labels[ServiceLabel], _ = serviceName(step)
labels[ServiceLabel], _ = serviceName(step, workflowName)
}
labels[StepLabel], err = stepLabel(step)
if err != nil {
@ -266,8 +266,8 @@ func registrySecretLabels(step *types.Step) (map[string]string, error) {
return labels, nil
}
func startRegistrySecret(ctx context.Context, engine *kube, step *types.Step) error {
secret, err := mkRegistrySecret(step, engine.config)
func startRegistrySecret(ctx context.Context, engine *kube, step *types.Step, workflowName string) error {
secret, err := mkRegistrySecret(step, engine.config, workflowName)
if err != nil {
return err
}
@ -279,8 +279,8 @@ func startRegistrySecret(ctx context.Context, engine *kube, step *types.Step) er
return nil
}
func stopRegistrySecret(ctx context.Context, engine *kube, step *types.Step, deleteOpts meta_v1.DeleteOptions) error {
name, err := registrySecretName(step)
func stopRegistrySecret(ctx context.Context, engine *kube, step *types.Step, deleteOpts meta_v1.DeleteOptions, workflowName string) error {
name, err := registrySecretName(step, workflowName)
if err != nil {
return err
}

View file

@ -208,7 +208,7 @@ func TestUsernameAndPasswordNeedsSecret(t *testing.T) {
func TestRegistrySecret(t *testing.T) {
const expected = `{
"metadata": {
"name": "wp-01he8-go-test",
"name": "wp-01he8-workflownametest-go-test",
"namespace": "woodpecker",
"creationTimestamp": null,
"labels": {
@ -231,7 +231,7 @@ func TestRegistrySecret(t *testing.T) {
},
}, &config{
Namespace: "woodpecker",
})
}, "workflowNameTest")
assert.NoError(t, err)
secretJSON, err := json.Marshal(secret)

View file

@ -33,8 +33,8 @@ const (
servicePrefix = "wp-svc-"
)
func mkService(step *types.Step, config *config) (*v1.Service, error) {
name, err := serviceName(step)
func mkService(step *types.Step, config *config, workflowName string) (*v1.Service, error) {
name, err := serviceName(step, workflowName)
if err != nil {
return nil, err
}
@ -62,8 +62,8 @@ func mkService(step *types.Step, config *config) (*v1.Service, error) {
}, nil
}
func serviceName(step *types.Step) (string, error) {
return dnsName(servicePrefix + step.UUID[:5] + "-" + step.Name)
func serviceName(step *types.Step, workflowName string) (string, error) {
return dnsName(servicePrefix + step.UUID[:5] + "-" + workflowName + "-" + step.Name)
}
func servicePort(port types.Port) v1.ServicePort {
@ -77,9 +77,9 @@ func servicePort(port types.Port) v1.ServicePort {
}
}
func startService(ctx context.Context, engine *kube, step *types.Step) (*v1.Service, error) {
func startService(ctx context.Context, engine *kube, step *types.Step, workflowName string) (*v1.Service, error) {
engineConfig := engine.getConfig()
svc, err := mkService(step, engineConfig)
svc, err := mkService(step, engineConfig, workflowName)
if err != nil {
return nil, err
}
@ -88,8 +88,8 @@ func startService(ctx context.Context, engine *kube, step *types.Step) (*v1.Serv
return engine.client.CoreV1().Services(engineConfig.Namespace).Create(ctx, svc, meta_v1.CreateOptions{})
}
func stopService(ctx context.Context, engine *kube, step *types.Step, deleteOpts meta_v1.DeleteOptions) error {
svcName, err := serviceName(step)
func stopService(ctx context.Context, engine *kube, step *types.Step, deleteOpts meta_v1.DeleteOptions, workflowName string) error {
svcName, err := serviceName(step, workflowName)
if err != nil {
return err
}

View file

@ -24,24 +24,24 @@ import (
)
func TestServiceName(t *testing.T) {
name, err := serviceName(&types.Step{Name: "database", UUID: "01he8bebctabr3kgk0qj36d2me"})
name, err := serviceName(&types.Step{Name: "database", UUID: "01he8bebctabr3kgk0qj36d2me"}, "workflowNameTest")
assert.NoError(t, err)
assert.Equal(t, "wp-svc-01he8-database", name)
assert.Equal(t, "wp-svc-01he8-workflownametest-database", name)
name, err = serviceName(&types.Step{Name: "wp-01he8-clone-0-services-0.woodpecker-runtime.svc.cluster.local", UUID: "01he8bebctabr3kgk0qj36d2me"})
name, err = serviceName(&types.Step{Name: "wp-01he8-workflownametest-clone-0-services-0.woodpecker-runtime.svc.cluster.local", UUID: "01he8bebctabr3kgk0qj36d2me"}, "workflowNameTest")
assert.NoError(t, err)
assert.Equal(t, "wp-svc-01he8-wp-01he8-clone-0-services-0.woodpecker-runtime.svc.cluster.local", name)
assert.Equal(t, "wp-svc-01he8-workflownametest-wp-01he8-workflownametest-clone-0-services-0.woodpecker-runtime.svc.cluster.local", name)
name, err = serviceName(&types.Step{Name: "awesome_service", UUID: "01he8bebctabr3kgk0qj36d2me"})
name, err = serviceName(&types.Step{Name: "awesome_service", UUID: "01he8bebctabr3kgk0qj36d2me"}, "workflowNameTest")
assert.NoError(t, err)
assert.Equal(t, "wp-svc-01he8-awesome-service", name)
assert.Equal(t, "wp-svc-01he8-workflownametest-awesome-service", name)
}
func TestService(t *testing.T) {
expected := `
{
"metadata": {
"name": "wp-svc-01he8-bar",
"name": "wp-svc-01he8-workflownametest-bar",
"namespace": "foo",
"creationTimestamp": null
},
@ -66,7 +66,7 @@ func TestService(t *testing.T) {
}
],
"selector": {
"service": "wp-svc-01he8-bar"
"service": "wp-svc-01he8-workflownametest-bar"
},
"type": "ClusterIP"
},
@ -83,7 +83,7 @@ func TestService(t *testing.T) {
Name: "bar",
UUID: "01he8bebctabr3kgk0qj36d2me-0",
Ports: ports,
}, &config{Namespace: "foo"})
}, &config{Namespace: "foo"}, "workflowNameTest")
assert.NoError(t, err)
j, err := json.Marshal(s)
assert.NoError(t, err)

View file

@ -90,8 +90,8 @@ func (e *local) Load(ctx context.Context) (*types.BackendInfo, error) {
}
// SetupWorkflow the pipeline environment.
func (e *local) SetupWorkflow(_ context.Context, _ *types.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msg("create workflow environment")
func (e *local) SetupWorkflow(_ context.Context, _ *types.Config, taskUUID, workflowName string) error {
log.Trace().Str("taskUUID", taskUUID).Str("workflowName", workflowName).Msg("create workflow environment")
baseDir, err := os.MkdirTemp(e.tempDir, "woodpecker-local-*")
if err != nil {
@ -119,8 +119,8 @@ func (e *local) SetupWorkflow(_ context.Context, _ *types.Config, taskUUID strin
}
// StartStep the pipeline step.
func (e *local) StartStep(ctx context.Context, step *types.Step, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s", step.Name)
func (e *local) StartStep(ctx context.Context, step *types.Step, taskUUID, workflowName string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s-%s", workflowName, step.Name)
state, err := e.getState(taskUUID)
if err != nil {
@ -204,8 +204,8 @@ func (e *local) execPlugin(ctx context.Context, step *types.Step, state *workflo
// WaitStep for the pipeline step to complete and returns
// the completion results.
func (e *local) WaitStep(_ context.Context, step *types.Step, taskUUID string) (*types.State, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("wait for step %s", step.Name)
func (e *local) WaitStep(_ context.Context, step *types.Step, taskUUID, workflowName string) (*types.State, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s-%s", workflowName, step.Name)
state, err := e.getState(taskUUID)
if err != nil {
@ -234,19 +234,19 @@ func (e *local) WaitStep(_ context.Context, step *types.Step, taskUUID string) (
}
// TailStep the pipeline step logs.
func (e *local) TailStep(_ context.Context, step *types.Step, taskUUID string) (io.ReadCloser, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("tail logs of step %s", step.Name)
func (e *local) TailStep(_ context.Context, step *types.Step, taskUUID, workflowName string) (io.ReadCloser, error) {
log.Trace().Str("taskUUID", taskUUID).Str("workflowName", workflowName).Msgf("tail logs of step %s", step.Name)
return e.output, nil
}
func (e *local) DestroyStep(_ context.Context, _ *types.Step, _ string) error {
func (e *local) DestroyStep(_ context.Context, _ *types.Step, _, _ string) error {
// WaitStep already waits for the command to finish, so there is nothing to do here.
return nil
}
// DestroyWorkflow the pipeline environment.
func (e *local) DestroyWorkflow(_ context.Context, _ *types.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msg("delete workflow environment")
func (e *local) DestroyWorkflow(_ context.Context, _ *types.Config, taskUUID, workflowName string) error {
log.Trace().Str("taskUUID", taskUUID).Str("workflowName", workflowName).Msg("delete workflow environment")
state, err := e.getState(taskUUID)
if err != nil {

View file

@ -37,23 +37,23 @@ type Backend interface {
Load(ctx context.Context) (*BackendInfo, error)
// SetupWorkflow sets up the workflow environment.
SetupWorkflow(ctx context.Context, conf *Config, taskUUID string) error
SetupWorkflow(ctx context.Context, conf *Config, taskUUID, workflowName string) error
// StartStep starts the workflow step.
StartStep(ctx context.Context, step *Step, taskUUID string) error
StartStep(ctx context.Context, step *Step, taskUUID, workflowName string) error
// WaitStep waits for the workflow step to complete and returns
// the completion results.
WaitStep(ctx context.Context, step *Step, taskUUID string) (*State, error)
WaitStep(ctx context.Context, step *Step, taskUUID, workflowName string) (*State, error)
// TailStep tails the workflow step logs.
TailStep(ctx context.Context, step *Step, taskUUID string) (io.ReadCloser, error)
TailStep(ctx context.Context, step *Step, taskUUID, workflowName string) (io.ReadCloser, error)
// DestroyStep destroys the workflow step.
DestroyStep(ctx context.Context, step *Step, taskUUID string) error
DestroyStep(ctx context.Context, step *Step, taskUUID, workflowName string) error
// DestroyWorkflow destroys the workflow environment.
DestroyWorkflow(ctx context.Context, conf *Config, taskUUID string) error
DestroyWorkflow(ctx context.Context, conf *Config, taskUUID, workflowName string) error
}
// BackendInfo represents the reported information of a loaded backend.

View file

@ -147,7 +147,7 @@ pipeline:
commands: meh!
`
workflow1, err := ParseString(sampleYamlPipeline)
workflowNameTest, err := ParseString(sampleYamlPipeline)
if !assert.NoError(t, err) {
return
}
@ -157,9 +157,9 @@ pipeline:
return
}
assert.EqualValues(t, workflow1, workflow2)
assert.Len(t, workflow1.Steps.ContainerList, 1)
assert.EqualValues(t, "say hello", workflow1.Steps.ContainerList[0].Name)
assert.EqualValues(t, workflowNameTest, workflow2)
assert.Len(t, workflowNameTest.Steps.ContainerList, 1)
assert.EqualValues(t, "say hello", workflowNameTest.Steps.ContainerList[0].Name)
}
var sampleYaml = `

View file

@ -109,13 +109,13 @@ func (r *Runtime) Run(runnerCtx context.Context) error {
if ctx.Err() != nil {
ctx = GetShutdownCtx()
}
if err := r.engine.DestroyWorkflow(ctx, r.spec, r.taskUUID); err != nil {
if err := r.engine.DestroyWorkflow(ctx, r.spec, r.taskUUID, r.Description["pipeline_name"]); err != nil {
logger.Error().Err(err).Msg("could not destroy engine")
}
}()
r.started = time.Now().Unix()
if err := r.engine.SetupWorkflow(runnerCtx, r.spec, r.taskUUID); err != nil {
if err := r.engine.SetupWorkflow(runnerCtx, r.spec, r.taskUUID, r.Description["pipeline_name"]); err != nil {
return err
}
@ -123,7 +123,7 @@ func (r *Runtime) Run(runnerCtx context.Context) error {
select {
case <-r.ctx.Done():
return ErrCancel
case err := <-r.execAll(stage.Steps):
case err := <-r.execAll(stage.Steps, r.Description["pipeline_name"]):
if err != nil {
r.err = err
}
@ -163,7 +163,7 @@ func (r *Runtime) traceStep(processState *backend.State, err error, step *backen
}
// Executes a set of parallel steps.
func (r *Runtime) execAll(steps []*backend.Step) <-chan error {
func (r *Runtime) execAll(steps []*backend.Step, workflowName string) <-chan error {
var g errgroup.Group
done := make(chan error)
logger := r.MakeLogger()
@ -206,7 +206,7 @@ func (r *Runtime) execAll(steps []*backend.Step) <-chan error {
Str("step", step.Name).
Msg("executing")
processState, err := r.exec(step)
processState, err := r.exec(step, workflowName)
logger.Debug().
Str("step", step.Name).
@ -229,14 +229,14 @@ func (r *Runtime) execAll(steps []*backend.Step) <-chan error {
}
// Executes the step and returns the state and error.
func (r *Runtime) exec(step *backend.Step) (*backend.State, error) {
if err := r.engine.StartStep(r.ctx, step, r.taskUUID); err != nil {
func (r *Runtime) exec(step *backend.Step, workflowName string) (*backend.State, error) {
if err := r.engine.StartStep(r.ctx, step, r.taskUUID, workflowName); err != nil {
return nil, err
}
var wg sync.WaitGroup
if r.logger != nil {
rc, err := r.engine.TailStep(r.ctx, step, r.taskUUID)
rc, err := r.engine.TailStep(r.ctx, step, r.taskUUID, workflowName)
if err != nil {
return nil, err
}
@ -261,7 +261,7 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) {
// We wait until all data was logged. (Needed for some backends like local as WaitStep kills the log stream)
wg.Wait()
waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID)
waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID, workflowName)
if err != nil {
if errors.Is(err, context.Canceled) {
return waitState, ErrCancel
@ -269,7 +269,7 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) {
return nil, err
}
if err := r.engine.DestroyStep(r.ctx, step, r.taskUUID); err != nil {
if err := r.engine.DestroyStep(r.ctx, step, r.taskUUID, workflowName); err != nil {
return nil, err
}