Remove multipart logger (#3200)

This commit is contained in:
qwerty287 2024-01-14 10:54:02 +01:00 committed by GitHub
parent 685907ddf6
commit 45bf8600ef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 24 additions and 261 deletions

View file

@ -23,21 +23,16 @@ import (
"go.woodpecker-ci.org/woodpecker/v2/pipeline"
backend "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/multipart"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
)
func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, workflow *rpc.Workflow) pipeline.LogFunc {
return func(step *backend.Step, rc multipart.Reader) error {
func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, workflow *rpc.Workflow) pipeline.Logger {
return func(step *backend.Step, rc io.Reader) error {
loglogger := logger.With().
Str("image", step.Image).
Str("workflowID", workflow.ID).
Logger()
part, rerr := rc.NextPart()
if rerr != nil {
return rerr
}
uploads.Add(1)
var secrets []string
@ -50,7 +45,7 @@ func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, wo
loglogger.Debug().Msg("log stream opened")
logStream := rpc.NewLineWriter(r.client, step.UUID, secrets...)
if _, err := io.Copy(logStream, part); err != nil {
if _, err := io.Copy(logStream, rc); err != nil {
log.Error().Err(err).Msg("copy limited logStream part")
}

View file

@ -39,7 +39,6 @@ import (
"go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/yaml/compiler"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/yaml/linter"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/yaml/matrix"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/multipart"
"go.woodpecker-ci.org/woodpecker/v2/shared/utils"
)
@ -253,13 +252,8 @@ func convertPathForWindows(path string) string {
return filepath.ToSlash(path)
}
var defaultLogger = pipeline.LogFunc(func(step *backendTypes.Step, rc multipart.Reader) error {
part, err := rc.NextPart()
if err != nil {
return err
}
var defaultLogger = pipeline.Logger(func(step *backendTypes.Step, rc io.Reader) error {
logStream := NewLineWriter(step.Name, step.UUID)
_, err = io.Copy(logStream, part)
_, err := io.Copy(logStream, rc)
return err
})

View file

@ -15,20 +15,10 @@
package pipeline
import (
"io"
backend "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/multipart"
)
// Logger handles the process logging.
type Logger interface {
Log(*backend.Step, multipart.Reader) error
}
// LogFunc type is an adapter to allow the use of an ordinary
// function for process logging.
type LogFunc func(*backend.Step, multipart.Reader) error
// Log calls f(step, r).
func (f LogFunc) Log(step *backend.Step, r multipart.Reader) error {
return f(step, r)
}
type Logger func(*backend.Step, io.Reader) error

View file

@ -1,117 +0,0 @@
// Copyright 2023 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package multipart
import (
"bufio"
"bytes"
"io"
"mime/multipart"
"net/textproto"
)
type (
// Reader is an iterator over parts in a multipart log stream.
Reader interface {
// NextPart returns the next part in the multipart or
// an error. When there are no more parts, the error
// io.EOF is returned.
NextPart() (Part, error)
}
// A Part represents a single part in a multipart body.
Part interface {
io.Reader
// Header returns the headers of the body with the
// keys canonicalized.
Header() textproto.MIMEHeader
// FileName returns the filename parameter of the
// Content-Disposition header.
FileName() string
// FormName returns the name parameter if p has a
// Content-Disposition of type form-data.
FormName() string
}
)
// New returns a new multipart Reader.
func New(r io.Reader) Reader {
buf := bufio.NewReader(r)
out, _ := buf.Peek(8)
if bytes.Equal(out, []byte("PIPELINE")) {
return &multipartReader{
reader: multipart.NewReader(buf, "boundary"),
}
}
return &textReader{
reader: buf,
}
}
//
// wraps the stdlib multi-part reader
//
type multipartReader struct {
reader *multipart.Reader
}
func (r *multipartReader) NextPart() (Part, error) {
next, err := r.reader.NextPart()
if err != nil {
return nil, err
}
part := new(part)
part.Reader = next
part.filename = next.FileName()
part.formname = next.FormName()
part.header = next.Header
return part, nil
}
//
// wraps a simple io.Reader to satisfy the multi-part interface
//
type textReader struct {
reader io.Reader
done bool
}
func (r *textReader) NextPart() (Part, error) {
if r.done {
return nil, io.EOF
}
r.done = true
p := new(part)
p.Reader = r.reader
return p, nil
}
type part struct {
io.Reader
filename string
formname string
header textproto.MIMEHeader
}
func (p *part) Header() textproto.MIMEHeader { return p.header }
func (p *part) FileName() string { return p.filename }
func (p *part) FormName() string { return p.filename }

View file

@ -1,90 +0,0 @@
// Copyright 2023 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package multipart
import (
"bytes"
"io"
"testing"
)
func TestReader(t *testing.T) {
b := bytes.NewBufferString(sample)
m := New(b)
part, err := m.NextPart()
if err != nil {
t.Error(err)
return
}
header := part.Header()
if got, want := header.Get("Content-Type"), "text/plain"; got != want {
t.Errorf("Want Content-Type %q, got %q", want, got)
}
body, err := io.ReadAll(part)
if err != nil {
t.Error(err)
return
}
if got, want := string(body), sampleTextPlain; got != want {
t.Errorf("Want body %q, got %q", want, got)
}
part, err = m.NextPart()
if err != nil {
t.Error(err)
return
}
header = part.Header()
if got, want := header.Get("Content-Type"), "application/json+coverage"; got != want {
t.Errorf("Want Content-Type %q, got %q", want, got)
}
if got, want := header.Get("X-Covered"), "96.00"; got != want {
t.Errorf("Want X-Covered %q, got %q", want, got)
}
if got, want := header.Get("X-Covered-Lines"), "96"; got != want {
t.Errorf("Want X-Covered-Lines %q, got %q", want, got)
}
if got, want := header.Get("X-Total-Lines"), "100"; got != want {
t.Errorf("Want X-Total-Lines %q, got %q", want, got)
}
}
var sample = `PIPELINE
Content-Type: multipart/mixed; boundary=boundary
--boundary
Content-Type: text/plain
match: pipeline/frontend/yaml/compiler/coverage.out
match: pipeline/frontend/yaml/coverage.out
match: pipeline/frontend/yaml/linter/coverage.out
--boundary
Content-Type: application/json+coverage
X-Covered: 96.00
X-Covered-Lines: 96
X-Total-Lines: 100
{"metrics":{"covered_lines":96,"total_lines":100}}
--boundary--
`
var sampleTextPlain = `match: pipeline/frontend/yaml/compiler/coverage.out
match: pipeline/frontend/yaml/coverage.out
match: pipeline/frontend/yaml/linter/coverage.out
`

View file

@ -28,7 +28,6 @@ import (
backend "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/metadata"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/multipart"
)
// TODO: move runtime into "runtime" subpackage
@ -89,7 +88,7 @@ func (r *Runtime) MakeLogger() zerolog.Logger {
return logCtx.Logger()
}
// Starts the execution of an workflow and waits for it to complete
// Run starts the execution of a workflow and waits for it to complete
func (r *Runtime) Run(runnerCtx context.Context) error {
logger := r.MakeLogger()
logger.Debug().Msgf("executing %d stages, in order of:", len(r.spec.Stages))
@ -166,27 +165,27 @@ func (r *Runtime) execAll(steps []*backend.Step) <-chan error {
logger := r.MakeLogger()
for _, step := range steps {
// required since otherwise the loop variable
// Required since otherwise the loop variable
// will be captured by the function. This will
// recreate the step "variable"
step := step
g.Go(func() error {
// Case the pipeline was already complete.
logger.Debug().
Str("Step", step.Name).
Msg("Prepare")
Str("step", step.Name).
Msg("prepare")
switch {
case r.err != nil && !step.OnFailure:
logger.Debug().
Str("Step", step.Name).
Str("step", step.Name).
Err(r.err).
Msgf("Skipped due to OnFailure=%t", step.OnFailure)
Msgf("skipped due to OnFailure=%t", step.OnFailure)
return nil
case r.err == nil && !step.OnSuccess:
logger.Debug().
Str("Step", step.Name).
Msgf("Skipped due to OnSuccess=%t", step.OnSuccess)
Str("step", step.Name).
Msgf("skipped due to OnSuccess=%t", step.OnSuccess)
return nil
}
@ -200,22 +199,14 @@ func (r *Runtime) execAll(steps []*backend.Step) <-chan error {
metadata.SetDroneEnviron(step.Environment)
logger.Debug().
Str("Step", step.Name).
Msg("Executing")
Str("step", step.Name).
Msg("executing")
processState, err := r.exec(step)
logger.Debug().
Str("Step", step.Name).
Msg("Complete")
// if we got a nil process but an error state
// then we need to log the internal error to the step.
if r.logger != nil && err != nil && !errors.Is(err, ErrCancel) && processState == nil {
_ = r.logger.Log(step, multipart.New(strings.NewReader(
"Backend engine error while running step: "+err.Error(),
)))
}
Str("step", step.Name).
Msg("complete")
// Return the error after tracing it.
err = r.traceStep(processState, err, step)
@ -251,7 +242,7 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) {
defer wg.Done()
logger := r.MakeLogger()
if err := r.logger.Log(step, multipart.New(rc)); err != nil {
if err := r.logger(step, rc); err != nil {
logger.Error().Err(err).Msg("process logging failed")
}
_ = rc.Close()

View file

@ -153,9 +153,9 @@ func cancelPreviousPipelines(
if err = Cancel(ctx, _store, repo, user, active); err != nil {
log.Error().
Err(err).
Str("Ref", active.Ref).
Int64("ID", active.ID).
Msg("Failed to cancel pipeline")
Str("ref", active.Ref).
Int64("id", active.ID).
Msg("failed to cancel pipeline")
}
}