Improve step logging (#3722)

This commit is contained in:
Anbraten 2024-06-13 17:18:32 +02:00 committed by GitHub
parent 6a0c236d77
commit 8b387e73ee
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 716 additions and 150 deletions

View file

@ -19,16 +19,22 @@ import (
"sync"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"go.woodpecker-ci.org/woodpecker/v2/pipeline"
backend "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/log"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
)
func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, workflow *rpc.Workflow) pipeline.Logger {
const (
// Store not more than 1mb in a log-line as 4mb is the limit of a grpc message
// and log-lines needs to be parsed by the browsers later on.
maxLogLineLength = 1024 * 1024 // 1mb
)
func (r *Runner) createLogger(_logger zerolog.Logger, uploads *sync.WaitGroup, workflow *rpc.Workflow) pipeline.Logger {
return func(step *backend.Step, rc io.Reader) error {
logLogger := logger.With().
logger := _logger.With().
Str("image", step.Image).
Str("workflowID", workflow.ID).
Logger()
@ -40,14 +46,14 @@ func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, wo
secrets = append(secrets, secret.Value)
}
logLogger.Debug().Msg("log stream opened")
logger.Debug().Msg("log stream opened")
logStream := rpc.NewLineWriter(r.client, step.UUID, secrets...)
if _, err := io.Copy(logStream, rc); err != nil {
log.Error().Err(err).Msg("copy limited logStream part")
logStream := log.NewLineWriter(r.client, step.UUID, secrets...)
if err := log.CopyLineByLine(logStream, rc, maxLogLineLength); err != nil {
logger.Error().Err(err).Msg("copy limited logStream part")
}
logLogger.Debug().Msg("log stream copied, close ...")
logger.Debug().Msg("log stream copied, close ...")
uploads.Done()
return nil

View file

@ -39,6 +39,7 @@ import (
"go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/yaml/compiler"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/yaml/linter"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/yaml/matrix"
pipelineLog "go.woodpecker-ci.org/woodpecker/v2/pipeline/log"
"go.woodpecker-ci.org/woodpecker/v2/shared/utils"
)
@ -273,8 +274,8 @@ func convertPathForWindows(path string) string {
return filepath.ToSlash(path)
}
const maxLogLineLength = 1024 * 1024 // 1mb
var defaultLogger = pipeline.Logger(func(step *backendTypes.Step, rc io.Reader) error {
logStream := NewLineWriter(step.Name, step.UUID)
_, err := io.Copy(logStream, rc)
return err
logWriter := NewLineWriter(step.Name, step.UUID)
return pipelineLog.CopyLineByLine(logWriter, rc, maxLogLineLength)
})

View file

@ -16,50 +16,34 @@ package exec
import (
"fmt"
"io"
"os"
"strings"
"time"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
)
// LineWriter sends logs to the client.
type LineWriter struct {
stepName string
stepUUID string
num int
now time.Time
rep *strings.Replacer
lines []*rpc.LogEntry
stepName string
stepUUID string
num int
startTime time.Time
}
// NewLineWriter returns a new line reader.
func NewLineWriter(stepName, stepUUID string) *LineWriter {
func NewLineWriter(stepName, stepUUID string) io.WriteCloser {
return &LineWriter{
stepName: stepName,
stepUUID: stepUUID,
now: time.Now().UTC(),
stepName: stepName,
stepUUID: stepUUID,
startTime: time.Now().UTC(),
}
}
func (w *LineWriter) Write(p []byte) (n int, err error) {
data := string(p)
if w.rep != nil {
data = w.rep.Replace(data)
}
line := &rpc.LogEntry{
Data: data,
StepUUID: w.stepUUID,
Line: w.num,
Time: int64(time.Since(w.now).Seconds()),
Type: rpc.LogEntryStdout,
}
fmt.Fprintf(os.Stderr, "[%s:L%d:%ds] %s", w.stepName, w.num, int64(time.Since(w.now).Seconds()), data)
fmt.Fprintf(os.Stderr, "[%s:L%d:%ds] %s", w.stepName, w.num, int64(time.Since(w.startTime).Seconds()), p)
w.num++
w.lines = append(w.lines, line)
return len(p), nil
}
func (w *LineWriter) Close() error {
return nil
}

View file

@ -0,0 +1,79 @@
// Copyright 2022 Woodpecker Authors
// Copyright 2011 Drone.IO Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package log
import (
"context"
"io"
"strings"
"sync"
"time"
"github.com/rs/zerolog/log"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/shared"
)
// LineWriter sends logs to the client.
type LineWriter struct {
sync.Mutex
peer rpc.Peer
stepUUID string
num int
startTime time.Time
replacer *strings.Replacer
}
// NewLineWriter returns a new line reader.
func NewLineWriter(peer rpc.Peer, stepUUID string, secret ...string) io.WriteCloser {
lw := &LineWriter{
peer: peer,
stepUUID: stepUUID,
startTime: time.Now().UTC(),
replacer: shared.NewSecretsReplacer(secret),
}
return lw
}
func (w *LineWriter) Write(p []byte) (n int, err error) {
data := string(p)
if w.replacer != nil {
data = w.replacer.Replace(data)
}
log.Trace().Str("step-uuid", w.stepUUID).Msgf("grpc write line: %s", data)
line := &rpc.LogEntry{
Data: []byte(strings.TrimSuffix(data, "\n")), // remove trailing newline
StepUUID: w.stepUUID,
Time: int64(time.Since(w.startTime).Seconds()),
Type: rpc.LogEntryStdout,
Line: w.num,
}
w.num++
if err := w.peer.Log(context.Background(), line); err != nil {
return 0, err
}
return len(data), nil
}
func (w *LineWriter) Close() error {
return nil
}

View file

@ -0,0 +1,58 @@
// Copyright 2019 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package log_test
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/log"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/mocks"
)
func TestLineWriter(t *testing.T) {
peer := mocks.NewPeer(t)
peer.On("Log", mock.Anything, mock.Anything).Return(nil)
secrets := []string{"world"}
lw := log.NewLineWriter(peer, "e9ea76a5-44a1-4059-9c4a-6956c478b26d", secrets...)
defer lw.Close()
_, err := lw.Write([]byte("hello world\n"))
assert.NoError(t, err)
_, err = lw.Write([]byte("the previous line had no newline at the end"))
assert.NoError(t, err)
peer.AssertCalled(t, "Log", mock.Anything, &rpc.LogEntry{
StepUUID: "e9ea76a5-44a1-4059-9c4a-6956c478b26d",
Time: 0,
Type: rpc.LogEntryStdout,
Line: 0,
Data: []byte("hello ********"),
})
peer.AssertCalled(t, "Log", mock.Anything, &rpc.LogEntry{
StepUUID: "e9ea76a5-44a1-4059-9c4a-6956c478b26d",
Time: 0,
Type: rpc.LogEntryStdout,
Line: 1,
Data: []byte("the previous line had no newline at the end"),
})
peer.AssertExpectations(t)
}

62
pipeline/log/utils.go Normal file
View file

@ -0,0 +1,62 @@
// Copyright 2024 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package log
import (
"bufio"
"errors"
"io"
)
func writeChunks(dst io.WriteCloser, data []byte, size int) error {
if len(data) <= size {
_, err := dst.Write(data)
return err
}
for len(data) > size {
if _, err := dst.Write(data[:size]); err != nil {
return err
}
data = data[size:]
}
if len(data) > 0 {
_, err := dst.Write(data)
return err
}
return nil
}
func CopyLineByLine(dst io.WriteCloser, src io.Reader, maxSize int) error {
r := bufio.NewReader(src)
defer dst.Close()
for {
// TODO: read til newline or maxSize directly
line, err := r.ReadBytes('\n')
if errors.Is(err, io.EOF) {
return writeChunks(dst, line, maxSize)
} else if err != nil {
return err
}
err = writeChunks(dst, line, maxSize)
if err != nil {
return err
}
}
}

146
pipeline/log/utils_test.go Normal file
View file

@ -0,0 +1,146 @@
// Copyright 2024 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package log_test
import (
"io"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/log"
)
type testWriter struct {
*sync.Mutex
writes []string
}
func (b *testWriter) Write(p []byte) (n int, err error) {
b.Lock()
defer b.Unlock()
b.writes = append(b.writes, string(p))
return len(p), nil
}
func (b *testWriter) Close() error {
return nil
}
func (b *testWriter) GetWrites() []string {
b.Lock()
defer b.Unlock()
w := make([]string, len(b.writes))
copy(w, b.writes)
return w
}
func TestCopyLineByLine(t *testing.T) {
r, w := io.Pipe()
testWriter := &testWriter{
Mutex: &sync.Mutex{},
writes: make([]string, 0),
}
go func() {
err := log.CopyLineByLine(testWriter, r, 1024)
assert.NoError(t, err)
}()
// wait for the goroutine to start
time.Sleep(time.Millisecond)
// write 4 bytes without newline
if _, err := w.Write([]byte("1234")); err != nil {
t.Fatalf("unexpected error: %v", err)
}
writes := testWriter.GetWrites()
assert.Lenf(t, writes, 0, "expected 0 writes, got: %v", writes)
// write more bytes with newlines
if _, err := w.Write([]byte("5\n678\n90")); err != nil {
t.Fatalf("unexpected error: %v", err)
}
writes = testWriter.GetWrites()
assert.Lenf(t, writes, 2, "expected 2 writes, got: %v", writes)
// wait for the goroutine to write the data
time.Sleep(time.Millisecond)
writtenData := strings.Join(writes, "-")
assert.Equal(t, "12345\n-678\n", writtenData, "unexpected writtenData: %s", writtenData)
// closing the writer should flush the remaining data
w.Close()
// wait for the goroutine to finish
time.Sleep(10 * time.Millisecond)
// the written data contains all the data we wrote
writtenData = strings.Join(testWriter.GetWrites(), "-")
assert.Equal(t, "12345\n-678\n-90", writtenData, "unexpected writtenData: %s", writtenData)
}
func TestCopyLineByLineSizeLimit(t *testing.T) {
r, w := io.Pipe()
testWriter := &testWriter{
Mutex: &sync.Mutex{},
writes: make([]string, 0),
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
err := log.CopyLineByLine(testWriter, r, 4)
assert.NoError(t, err)
}()
// wait for the goroutine to start
time.Sleep(time.Millisecond)
// write 4 bytes without newline
if _, err := w.Write([]byte("12345")); err != nil {
t.Fatalf("unexpected error: %v", err)
}
writes := testWriter.GetWrites()
assert.Lenf(t, testWriter.GetWrites(), 0, "expected 0 writes, got: %v", writes)
// write more bytes
if _, err := w.Write([]byte("67\n89")); err != nil {
t.Fatalf("unexpected error: %v", err)
}
writes = testWriter.GetWrites()
assert.Lenf(t, testWriter.GetWrites(), 2, "expected 2 writes, got: %v", writes)
writes = testWriter.GetWrites()
writtenData := strings.Join(writes, "-")
assert.Equal(t, "1234-567\n", writtenData, "unexpected writtenData: %s", writtenData)
// closing the writer should flush the remaining data
w.Close()
wg.Wait()
}

View file

@ -16,14 +16,7 @@
package rpc
import (
"context"
"fmt"
"strings"
"time"
"github.com/rs/zerolog/log"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/shared"
)
// Identifies the type of line in the logs.
@ -41,7 +34,7 @@ type LogEntry struct {
Time int64 `json:"time,omitempty"`
Type int `json:"type,omitempty"`
Line int `json:"line,omitempty"`
Data string `json:"data,omitempty"`
Data []byte `json:"data,omitempty"`
}
func (l *LogEntry) String() string {
@ -52,57 +45,3 @@ func (l *LogEntry) String() string {
return fmt.Sprintf("[%s:L%v:%vs] %s", l.StepUUID, l.Line, l.Time, l.Data)
}
}
// LineWriter sends logs to the client.
type LineWriter struct {
peer Peer
stepUUID string
num int
now time.Time
rep *strings.Replacer
lines []*LogEntry
}
// NewLineWriter returns a new line reader.
func NewLineWriter(peer Peer, stepUUID string, secret ...string) *LineWriter {
return &LineWriter{
peer: peer,
stepUUID: stepUUID,
now: time.Now().UTC(),
rep: shared.NewSecretsReplacer(secret),
lines: nil,
}
}
func (w *LineWriter) Write(p []byte) (n int, err error) {
data := string(p)
if w.rep != nil {
data = w.rep.Replace(data)
}
log.Trace().Str("step-uuid", w.stepUUID).Msgf("grpc write line: %s", data)
line := &LogEntry{
Data: data,
StepUUID: w.stepUUID,
Time: int64(time.Since(w.now).Seconds()),
Type: LogEntryStdout,
Line: w.num,
}
if err := w.peer.Log(context.Background(), line); err != nil {
log.Error().Err(err).Str("step-uuid", w.stepUUID).Msg("fail to write pipeline log to peer")
}
w.num++
w.lines = append(w.lines, line)
return len(p), nil
}
// Lines returns the line history.
func (w *LineWriter) Lines() []*LogEntry {
return w.lines
}
// Clear clears the line history.
func (w *LineWriter) Clear() {
w.lines = w.lines[:0]
}

View file

@ -25,7 +25,7 @@ func TestLogEntry(t *testing.T) {
StepUUID: "e9ea76a5-44a1-4059-9c4a-6956c478b26d",
Time: 60,
Line: 1,
Data: "starting redis server",
Data: []byte("starting redis server"),
}
assert.Equal(t, "[e9ea76a5-44a1-4059-9c4a-6956c478b26d:L1:60s] starting redis server", line.String())
}

261
pipeline/rpc/mocks/peer.go Normal file
View file

@ -0,0 +1,261 @@
// Code generated by mockery. DO NOT EDIT.
package mocks
import (
context "context"
mock "github.com/stretchr/testify/mock"
rpc "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
)
// Peer is an autogenerated mock type for the Peer type
type Peer struct {
mock.Mock
}
// Done provides a mock function with given fields: c, id, state
func (_m *Peer) Done(c context.Context, id string, state rpc.State) error {
ret := _m.Called(c, id, state)
if len(ret) == 0 {
panic("no return value specified for Done")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, rpc.State) error); ok {
r0 = rf(c, id, state)
} else {
r0 = ret.Error(0)
}
return r0
}
// Extend provides a mock function with given fields: c, id
func (_m *Peer) Extend(c context.Context, id string) error {
ret := _m.Called(c, id)
if len(ret) == 0 {
panic("no return value specified for Extend")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
r0 = rf(c, id)
} else {
r0 = ret.Error(0)
}
return r0
}
// Init provides a mock function with given fields: c, id, state
func (_m *Peer) Init(c context.Context, id string, state rpc.State) error {
ret := _m.Called(c, id, state)
if len(ret) == 0 {
panic("no return value specified for Init")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, rpc.State) error); ok {
r0 = rf(c, id, state)
} else {
r0 = ret.Error(0)
}
return r0
}
// Log provides a mock function with given fields: c, logEntry
func (_m *Peer) Log(c context.Context, logEntry *rpc.LogEntry) error {
ret := _m.Called(c, logEntry)
if len(ret) == 0 {
panic("no return value specified for Log")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *rpc.LogEntry) error); ok {
r0 = rf(c, logEntry)
} else {
r0 = ret.Error(0)
}
return r0
}
// Next provides a mock function with given fields: c, f
func (_m *Peer) Next(c context.Context, f rpc.Filter) (*rpc.Workflow, error) {
ret := _m.Called(c, f)
if len(ret) == 0 {
panic("no return value specified for Next")
}
var r0 *rpc.Workflow
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, rpc.Filter) (*rpc.Workflow, error)); ok {
return rf(c, f)
}
if rf, ok := ret.Get(0).(func(context.Context, rpc.Filter) *rpc.Workflow); ok {
r0 = rf(c, f)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*rpc.Workflow)
}
}
if rf, ok := ret.Get(1).(func(context.Context, rpc.Filter) error); ok {
r1 = rf(c, f)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// RegisterAgent provides a mock function with given fields: ctx, platform, backend, version, capacity
func (_m *Peer) RegisterAgent(ctx context.Context, platform string, backend string, version string, capacity int) (int64, error) {
ret := _m.Called(ctx, platform, backend, version, capacity)
if len(ret) == 0 {
panic("no return value specified for RegisterAgent")
}
var r0 int64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, string, int) (int64, error)); ok {
return rf(ctx, platform, backend, version, capacity)
}
if rf, ok := ret.Get(0).(func(context.Context, string, string, string, int) int64); ok {
r0 = rf(ctx, platform, backend, version, capacity)
} else {
r0 = ret.Get(0).(int64)
}
if rf, ok := ret.Get(1).(func(context.Context, string, string, string, int) error); ok {
r1 = rf(ctx, platform, backend, version, capacity)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ReportHealth provides a mock function with given fields: c
func (_m *Peer) ReportHealth(c context.Context) error {
ret := _m.Called(c)
if len(ret) == 0 {
panic("no return value specified for ReportHealth")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(c)
} else {
r0 = ret.Error(0)
}
return r0
}
// UnregisterAgent provides a mock function with given fields: ctx
func (_m *Peer) UnregisterAgent(ctx context.Context) error {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for UnregisterAgent")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// Update provides a mock function with given fields: c, id, state
func (_m *Peer) Update(c context.Context, id string, state rpc.State) error {
ret := _m.Called(c, id, state)
if len(ret) == 0 {
panic("no return value specified for Update")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, rpc.State) error); ok {
r0 = rf(c, id, state)
} else {
r0 = ret.Error(0)
}
return r0
}
// Version provides a mock function with given fields: c
func (_m *Peer) Version(c context.Context) (*rpc.Version, error) {
ret := _m.Called(c)
if len(ret) == 0 {
panic("no return value specified for Version")
}
var r0 *rpc.Version
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) (*rpc.Version, error)); ok {
return rf(c)
}
if rf, ok := ret.Get(0).(func(context.Context) *rpc.Version); ok {
r0 = rf(c)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*rpc.Version)
}
}
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(c)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Wait provides a mock function with given fields: c, id
func (_m *Peer) Wait(c context.Context, id string) error {
ret := _m.Called(c, id)
if len(ret) == 0 {
panic("no return value specified for Wait")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
r0 = rf(c, id)
} else {
r0 = ret.Error(0)
}
return r0
}
// NewPeer creates a new instance of Peer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewPeer(t interface {
mock.TestingT
Cleanup(func())
}) *Peer {
mock := &Peer{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View file

@ -50,6 +50,8 @@ type (
}
)
//go:generate mockery --name Peer --output mocks --case underscore
// Peer defines a peer-to-peer connection.
type Peer interface {
// Version returns the server- & grpc-version

View file

@ -16,4 +16,4 @@ package proto
// Version is the version of the woodpecker.proto file,
// IMPORTANT: increased by 1 each time it get changed.
const Version int32 = 7
const Version int32 = 8

View file

@ -15,8 +15,8 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.33.0
// protoc v4.24.4
// protoc-gen-go v1.34.1
// protoc v4.25.1
// source: woodpecker.proto
package proto
@ -131,7 +131,7 @@ type LogEntry struct {
Time int64 `protobuf:"varint,2,opt,name=time,proto3" json:"time,omitempty"`
Line int32 `protobuf:"varint,3,opt,name=line,proto3" json:"line,omitempty"`
Type int32 `protobuf:"varint,4,opt,name=type,proto3" json:"type,omitempty"` // 0 = stdout, 1 = stderr, 2 = exit-code, 3 = metadata, 4 = progress
Data string `protobuf:"bytes,5,opt,name=data,proto3" json:"data,omitempty"`
Data []byte `protobuf:"bytes,5,opt,name=data,proto3" json:"data,omitempty"`
}
func (x *LogEntry) Reset() {
@ -194,11 +194,11 @@ func (x *LogEntry) GetType() int32 {
return 0
}
func (x *LogEntry) GetData() string {
func (x *LogEntry) GetData() []byte {
if x != nil {
return x.Data
}
return ""
return nil
}
type Filter struct {
@ -1109,7 +1109,7 @@ var file_woodpecker_proto_rawDesc = []byte{
0x0a, 0x04, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x6c, 0x69,
0x6e, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05,
0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05,
0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x76, 0x0a, 0x06, 0x46, 0x69,
0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x76, 0x0a, 0x06, 0x46, 0x69,
0x6c, 0x74, 0x65, 0x72, 0x12, 0x31, 0x0a, 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x18, 0x01,
0x20, 0x03, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x46, 0x69, 0x6c,
0x74, 0x65, 0x72, 0x2e, 0x4c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52,

View file

@ -55,7 +55,7 @@ message LogEntry {
int64 time = 2;
int32 line = 3;
int32 type = 4; // 0 = stdout, 1 = stderr, 2 = exit-code, 3 = metadata, 4 = progress
string data = 5;
bytes data = 5;
}
message Filter {

View file

@ -1,7 +1,22 @@
// Copyright 2021 Woodpecker Authors
// Copyright 2011 Drone.IO Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.24.4
// - protoc v4.25.1
// source: woodpecker.proto
package proto
@ -59,7 +74,7 @@ func NewWoodpeckerClient(cc grpc.ClientConnInterface) WoodpeckerClient {
func (c *woodpeckerClient) Version(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*VersionResponse, error) {
out := new(VersionResponse)
err := c.cc.Invoke(ctx, "/proto.Woodpecker/Version", in, out, opts...)
err := c.cc.Invoke(ctx, Woodpecker_Version_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -68,7 +83,7 @@ func (c *woodpeckerClient) Version(ctx context.Context, in *Empty, opts ...grpc.
func (c *woodpeckerClient) Next(ctx context.Context, in *NextRequest, opts ...grpc.CallOption) (*NextResponse, error) {
out := new(NextResponse)
err := c.cc.Invoke(ctx, "/proto.Woodpecker/Next", in, out, opts...)
err := c.cc.Invoke(ctx, Woodpecker_Next_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -77,7 +92,7 @@ func (c *woodpeckerClient) Next(ctx context.Context, in *NextRequest, opts ...gr
func (c *woodpeckerClient) Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (*Empty, error) {
out := new(Empty)
err := c.cc.Invoke(ctx, "/proto.Woodpecker/Init", in, out, opts...)
err := c.cc.Invoke(ctx, Woodpecker_Init_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -86,7 +101,7 @@ func (c *woodpeckerClient) Init(ctx context.Context, in *InitRequest, opts ...gr
func (c *woodpeckerClient) Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (*Empty, error) {
out := new(Empty)
err := c.cc.Invoke(ctx, "/proto.Woodpecker/Wait", in, out, opts...)
err := c.cc.Invoke(ctx, Woodpecker_Wait_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -95,7 +110,7 @@ func (c *woodpeckerClient) Wait(ctx context.Context, in *WaitRequest, opts ...gr
func (c *woodpeckerClient) Done(ctx context.Context, in *DoneRequest, opts ...grpc.CallOption) (*Empty, error) {
out := new(Empty)
err := c.cc.Invoke(ctx, "/proto.Woodpecker/Done", in, out, opts...)
err := c.cc.Invoke(ctx, Woodpecker_Done_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -104,7 +119,7 @@ func (c *woodpeckerClient) Done(ctx context.Context, in *DoneRequest, opts ...gr
func (c *woodpeckerClient) Extend(ctx context.Context, in *ExtendRequest, opts ...grpc.CallOption) (*Empty, error) {
out := new(Empty)
err := c.cc.Invoke(ctx, "/proto.Woodpecker/Extend", in, out, opts...)
err := c.cc.Invoke(ctx, Woodpecker_Extend_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -113,7 +128,7 @@ func (c *woodpeckerClient) Extend(ctx context.Context, in *ExtendRequest, opts .
func (c *woodpeckerClient) Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*Empty, error) {
out := new(Empty)
err := c.cc.Invoke(ctx, "/proto.Woodpecker/Update", in, out, opts...)
err := c.cc.Invoke(ctx, Woodpecker_Update_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -122,7 +137,7 @@ func (c *woodpeckerClient) Update(ctx context.Context, in *UpdateRequest, opts .
func (c *woodpeckerClient) Log(ctx context.Context, in *LogRequest, opts ...grpc.CallOption) (*Empty, error) {
out := new(Empty)
err := c.cc.Invoke(ctx, "/proto.Woodpecker/Log", in, out, opts...)
err := c.cc.Invoke(ctx, Woodpecker_Log_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -131,7 +146,7 @@ func (c *woodpeckerClient) Log(ctx context.Context, in *LogRequest, opts ...grpc
func (c *woodpeckerClient) RegisterAgent(ctx context.Context, in *RegisterAgentRequest, opts ...grpc.CallOption) (*RegisterAgentResponse, error) {
out := new(RegisterAgentResponse)
err := c.cc.Invoke(ctx, "/proto.Woodpecker/RegisterAgent", in, out, opts...)
err := c.cc.Invoke(ctx, Woodpecker_RegisterAgent_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -149,7 +164,7 @@ func (c *woodpeckerClient) UnregisterAgent(ctx context.Context, in *Empty, opts
func (c *woodpeckerClient) ReportHealth(ctx context.Context, in *ReportHealthRequest, opts ...grpc.CallOption) (*Empty, error) {
out := new(Empty)
err := c.cc.Invoke(ctx, "/proto.Woodpecker/ReportHealth", in, out, opts...)
err := c.cc.Invoke(ctx, Woodpecker_ReportHealth_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -234,7 +249,7 @@ func _Woodpecker_Version_Handler(srv interface{}, ctx context.Context, dec func(
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.Woodpecker/Version",
FullMethod: Woodpecker_Version_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(WoodpeckerServer).Version(ctx, req.(*Empty))
@ -252,7 +267,7 @@ func _Woodpecker_Next_Handler(srv interface{}, ctx context.Context, dec func(int
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.Woodpecker/Next",
FullMethod: Woodpecker_Next_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(WoodpeckerServer).Next(ctx, req.(*NextRequest))
@ -270,7 +285,7 @@ func _Woodpecker_Init_Handler(srv interface{}, ctx context.Context, dec func(int
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.Woodpecker/Init",
FullMethod: Woodpecker_Init_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(WoodpeckerServer).Init(ctx, req.(*InitRequest))
@ -288,7 +303,7 @@ func _Woodpecker_Wait_Handler(srv interface{}, ctx context.Context, dec func(int
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.Woodpecker/Wait",
FullMethod: Woodpecker_Wait_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(WoodpeckerServer).Wait(ctx, req.(*WaitRequest))
@ -306,7 +321,7 @@ func _Woodpecker_Done_Handler(srv interface{}, ctx context.Context, dec func(int
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.Woodpecker/Done",
FullMethod: Woodpecker_Done_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(WoodpeckerServer).Done(ctx, req.(*DoneRequest))
@ -324,7 +339,7 @@ func _Woodpecker_Extend_Handler(srv interface{}, ctx context.Context, dec func(i
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.Woodpecker/Extend",
FullMethod: Woodpecker_Extend_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(WoodpeckerServer).Extend(ctx, req.(*ExtendRequest))
@ -342,7 +357,7 @@ func _Woodpecker_Update_Handler(srv interface{}, ctx context.Context, dec func(i
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.Woodpecker/Update",
FullMethod: Woodpecker_Update_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(WoodpeckerServer).Update(ctx, req.(*UpdateRequest))
@ -360,7 +375,7 @@ func _Woodpecker_Log_Handler(srv interface{}, ctx context.Context, dec func(inte
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.Woodpecker/Log",
FullMethod: Woodpecker_Log_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(WoodpeckerServer).Log(ctx, req.(*LogRequest))
@ -378,7 +393,7 @@ func _Woodpecker_RegisterAgent_Handler(srv interface{}, ctx context.Context, dec
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.Woodpecker/RegisterAgent",
FullMethod: Woodpecker_RegisterAgent_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(WoodpeckerServer).RegisterAgent(ctx, req.(*RegisterAgentRequest))
@ -414,7 +429,7 @@ func _Woodpecker_ReportHealth_Handler(srv interface{}, ctx context.Context, dec
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.Woodpecker/ReportHealth",
FullMethod: Woodpecker_ReportHealth_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(WoodpeckerServer).ReportHealth(ctx, req.(*ReportHealthRequest))
@ -478,6 +493,10 @@ var Woodpecker_ServiceDesc = grpc.ServiceDesc{
Metadata: "woodpecker.proto",
}
const (
WoodpeckerAuth_Auth_FullMethodName = "/proto.WoodpeckerAuth/Auth"
)
// WoodpeckerAuthClient is the client API for WoodpeckerAuth service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
@ -495,7 +514,7 @@ func NewWoodpeckerAuthClient(cc grpc.ClientConnInterface) WoodpeckerAuthClient {
func (c *woodpeckerAuthClient) Auth(ctx context.Context, in *AuthRequest, opts ...grpc.CallOption) (*AuthResponse, error) {
out := new(AuthResponse)
err := c.cc.Invoke(ctx, "/proto.WoodpeckerAuth/Auth", in, out, opts...)
err := c.cc.Invoke(ctx, WoodpeckerAuth_Auth_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -540,7 +559,7 @@ func _WoodpeckerAuth_Auth_Handler(srv interface{}, ctx context.Context, dec func
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.WoodpeckerAuth/Auth",
FullMethod: WoodpeckerAuth_Auth_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(WoodpeckerAuthServer).Auth(ctx, req.(*AuthRequest))

View file

@ -186,7 +186,7 @@ func LogStreamSSE(c *gin.Context) {
return
}
if step.State != model.StatusRunning {
if step.State != model.StatusPending && step.State != model.StatusRunning {
log.Debug().Msg("step not running (anymore).")
logWriteStringErr(io.WriteString(rw, "event: error\ndata: step not running (anymore)\n\n"))
return
@ -205,6 +205,13 @@ func LogStreamSSE(c *gin.Context) {
log.Debug().Msg("log stream: connection closed")
}()
err = server.Config.Services.Logs.Open(ctx, step.ID)
if err != nil {
log.Error().Err(err).Msg("log stream: open failed")
logWriteStringErr(io.WriteString(rw, "event: error\ndata: can't open stream\n\n"))
return
}
go func() {
err := server.Config.Services.Logs.Tail(ctx, step.ID, func(entries ...*model.LogEntry) {
for _, entry := range entries {

View file

@ -328,7 +328,7 @@ func (s *RPC) Log(c context.Context, _logEntry *rpc.LogEntry) error {
StepID: step.ID,
Time: _logEntry.Time,
Line: _logEntry.Line,
Data: []byte(_logEntry.Data),
Data: _logEntry.Data,
Type: model.LogEntryType(_logEntry.Type),
}
// make sure writes to pubsub are non blocking (https://github.com/woodpecker-ci/woodpecker/blob/c919f32e0b6432a95e1a6d3d0ad662f591adf73f/server/logging/log.go#L9)

View file

@ -112,7 +112,7 @@ import { useStorage } from '@vueuse/core';
import { AnsiUp } from 'ansi_up';
import { decode } from 'js-base64';
import { debounce } from 'lodash';
import { computed, inject, nextTick, onMounted, ref, toRef, watch, type Ref } from 'vue';
import { computed, inject, nextTick, onBeforeUnmount, onMounted, ref, toRef, watch, type Ref } from 'vue';
import { useI18n } from 'vue-i18n';
import { useRoute } from 'vue-router';
@ -126,7 +126,7 @@ import { findStep, isStepFinished, isStepRunning } from '~/utils/helpers';
interface LogLine {
index: number;
number: number;
text: string;
text?: string;
time?: number;
type: 'error' | 'warning' | null;
}
@ -184,7 +184,7 @@ function writeLog(line: Partial<LogLine>) {
logBuffer.value.push({
index: line.index ?? 0,
number: (line.index ?? 0) + 1,
text: ansiUp.value.ansi_to_html(line.text ?? ''),
text: ansiUp.value.ansi_to_html(decode(line.text ?? '')),
time: line.time ?? 0,
type: null, // TODO: implement way to detect errors and warnings
});
@ -277,18 +277,16 @@ async function loadLogs() {
return;
}
if (!repo) {
throw new Error('Unexpected: "repo" should be provided at this place');
}
log.value = undefined;
logBuffer.value = [];
ansiUp.value = new AnsiUp();
ansiUp.value.use_classes = true;
if (!repo) {
throw new Error('Unexpected: "repo" should be provided at this place');
}
if (stream.value) {
stream.value.close();
}
stream.value?.close();
if (!hasLogs.value || !step.value) {
return;
@ -297,12 +295,12 @@ async function loadLogs() {
if (isStepFinished(step.value)) {
loadedStepSlug.value = stepSlug.value;
const logs = await apiClient.getLogs(repo.value.id, pipeline.value.number, step.value.id);
logs?.forEach((line) => writeLog({ index: line.line, text: decode(line.data), time: line.time }));
logs?.forEach((line) => writeLog({ index: line.line, text: line.data, time: line.time }));
flushLogs(false);
} else if (isStepRunning(step.value)) {
} else if (step.value.state === 'pending' || isStepRunning(step.value)) {
loadedStepSlug.value = stepSlug.value;
stream.value = apiClient.streamLogs(repo.value.id, pipeline.value.number, step.value.id, (line) => {
writeLog({ index: line.line, text: decode(line.data), time: line.time });
writeLog({ index: line.line, text: line.data, time: line.time });
flushLogs(true);
});
}
@ -331,6 +329,10 @@ onMounted(async () => {
await loadLogs();
});
onBeforeUnmount(() => {
stream.value?.close();
});
watch(stepSlug, async () => {
await loadLogs();
});