mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2024-05-13 19:52:39 +00:00
Compare commits
21 commits
727d95d95a
...
018a3aa005
Author | SHA1 | Date | |
---|---|---|---|
018a3aa005 | |||
c6b2cd8a48 | |||
325b1b5e57 | |||
4b1ff6d1a7 | |||
2c3cd83402 | |||
a230e88c3a | |||
3a3e09d86d | |||
795b7c2ef9 | |||
41ac0fe00f | |||
c92aa6de4c | |||
f7a0e1b1f8 | |||
e08869716a | |||
7a545940d0 | |||
401fc45dac | |||
ce463b32b1 | |||
bc68e2fb08 | |||
00b66a72fa | |||
b3930d1733 | |||
436a101b1a | |||
bb20cf38ba | |||
3bb2b82965 |
|
@ -3,7 +3,7 @@ when:
|
|||
|
||||
variables:
|
||||
- &golang_image 'docker.io/golang:1.22.2'
|
||||
- &node_image 'docker.io/node:21-alpine'
|
||||
- &node_image 'docker.io/node:22-alpine'
|
||||
- &xgo_image 'docker.io/techknowlogick/xgo:go-1.22.1'
|
||||
- &xgo_version 'go-1.21.2'
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
variables:
|
||||
- &golang_image 'docker.io/golang:1.22.2'
|
||||
- &node_image 'docker.io/node:21-alpine'
|
||||
- &node_image 'docker.io/node:22-alpine'
|
||||
- &xgo_image 'docker.io/techknowlogick/xgo:go-1.22.1'
|
||||
- &xgo_version 'go-1.21.2'
|
||||
- &buildx_plugin 'docker.io/woodpeckerci/plugin-docker-buildx:3.2.1'
|
||||
|
|
|
@ -13,7 +13,7 @@ steps:
|
|||
branch: renovate/*
|
||||
|
||||
- name: spellcheck
|
||||
image: docker.io/node:21-alpine
|
||||
image: docker.io/node:22-alpine
|
||||
depends_on: []
|
||||
commands:
|
||||
- corepack enable
|
||||
|
|
|
@ -6,7 +6,7 @@ when:
|
|||
- renovate/*
|
||||
|
||||
variables:
|
||||
- &node_image 'docker.io/node:21-alpine'
|
||||
- &node_image 'docker.io/node:22-alpine'
|
||||
- &when
|
||||
path:
|
||||
# related config files
|
||||
|
|
|
@ -17,15 +17,22 @@ package agent
|
|||
import (
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
agentLogger "go.woodpecker-ci.org/woodpecker/v2/agent/logger"
|
||||
"go.woodpecker-ci.org/woodpecker/v2/pipeline"
|
||||
backend "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
|
||||
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
|
||||
)
|
||||
|
||||
const (
|
||||
writeBufferSize = 10240 // 10kb
|
||||
flushInterval = 1 * time.Second
|
||||
)
|
||||
|
||||
func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, workflow *rpc.Workflow) pipeline.Logger {
|
||||
return func(step *backend.Step, rc io.Reader) error {
|
||||
loglogger := logger.With().
|
||||
|
@ -43,7 +50,9 @@ func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, wo
|
|||
loglogger.Debug().Msg("log stream opened")
|
||||
|
||||
logStream := rpc.NewLineWriter(r.client, step.UUID, secrets...)
|
||||
if _, err := io.Copy(logStream, rc); err != nil {
|
||||
logStreamBufferWithTimeout := agentLogger.NewLogBuffer(logStream, writeBufferSize, flushInterval)
|
||||
defer logStreamBufferWithTimeout.Close()
|
||||
if _, err := io.Copy(logStreamBufferWithTimeout, rc); err != nil {
|
||||
log.Error().Err(err).Msg("copy limited logStream part")
|
||||
}
|
||||
|
||||
|
|
84
agent/logger/buf.go
Normal file
84
agent/logger/buf.go
Normal file
|
@ -0,0 +1,84 @@
|
|||
package logger
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// LogBuffer buffers writes to an underlying io.Writer and flushes them
// either when the internal bufio buffer fills up, or after flushInterval
// has elapsed without new writes. The embedded mutex guards the
// bufio.Writer, which is not safe for concurrent use.
type LogBuffer struct {
	*sync.Mutex
	// buffer accumulates written bytes; bufio flushes it to the wrapped
	// writer automatically once the configured buffer size is exceeded.
	buffer *bufio.Writer
	// flushInterval is the idle timeout after which buffered data is
	// flushed even though the buffer is not full.
	flushInterval time.Duration
	// timer fires after flushInterval of inactivity; it is re-armed on
	// every Write. Its channel is consumed by the background flush loop.
	timer *time.Timer
	// closeChan signals the background flush goroutine to stop.
	closeChan chan struct{}
}
|
||||
|
||||
func NewLogBuffer(writer io.Writer, bufferSize int, flushInterval time.Duration) *LogBuffer {
|
||||
lb := &LogBuffer{
|
||||
Mutex: &sync.Mutex{},
|
||||
buffer: bufio.NewWriterSize(writer, bufferSize),
|
||||
flushInterval: flushInterval,
|
||||
timer: time.NewTimer(flushInterval),
|
||||
closeChan: make(chan struct{}),
|
||||
}
|
||||
go lb.start()
|
||||
return lb
|
||||
}
|
||||
|
||||
func (lb *LogBuffer) Write(data []byte) (int, error) {
|
||||
n, err := lb.buffer.Write(data)
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
|
||||
// reset timer since there's new activity
|
||||
if !lb.timer.Stop() {
|
||||
<-lb.timer.C
|
||||
}
|
||||
lb.timer.Reset(lb.flushInterval)
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (lb *LogBuffer) start() {
|
||||
for {
|
||||
if !lb.waitForFlush() {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (lb *LogBuffer) waitForFlush() bool {
|
||||
// wait for either a timeout or a manual flush signal
|
||||
select {
|
||||
case <-lb.timer.C:
|
||||
// time limit reached, flush the buffer
|
||||
lb.Lock()
|
||||
defer lb.Unlock()
|
||||
err := lb.buffer.Flush()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
case <-lb.closeChan:
|
||||
// close signal received
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (lb *LogBuffer) Flush() error {
|
||||
lb.Lock()
|
||||
defer lb.Unlock()
|
||||
return lb.buffer.Flush()
|
||||
}
|
||||
|
||||
func (lb *LogBuffer) Close() error {
|
||||
lb.Lock()
|
||||
defer lb.Unlock()
|
||||
lb.timer.Stop()
|
||||
close(lb.closeChan)
|
||||
return lb.buffer.Flush()
|
||||
}
|
92
agent/logger/buf_test.go
Normal file
92
agent/logger/buf_test.go
Normal file
|
@ -0,0 +1,92 @@
|
|||
package logger_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.woodpecker-ci.org/woodpecker/v2/agent/logger"
|
||||
)
|
||||
|
||||
// testBuffer is an io.Writer test double that records every byte written
// to it and counts Write invocations. Since LogBuffer only writes to the
// underlying writer when bufio flushes, the call count doubles as a
// flush counter.
type testBuffer struct {
	// buf accumulates all bytes received so far.
	buf []byte
	// flushes counts Write calls (one per flush from LogBuffer).
	flushes int
}
|
||||
|
||||
func (b *testBuffer) Write(p []byte) (n int, err error) {
|
||||
b.buf = append(b.buf, p...)
|
||||
b.flushes++
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
// TestFlushAfterSize verifies size-based flushing: data must only reach
// the underlying writer when the 4-byte bufio buffer overflows. The flush
// interval is set far in the future so the timer never interferes.
func TestFlushAfterSize(t *testing.T) {
	bufSize := 4
	bufTime := 10 * time.Minute // using a high value to avoid the timer to trigger

	testBuffer := &testBuffer{
		buf:     make([]byte, 0),
		flushes: 0,
	}
	logBuffer := logger.NewLogBuffer(testBuffer, bufSize, bufTime)
	defer logBuffer.Close()

	// write 3 bytes: below the 4-byte buffer size, so nothing is flushed yet
	if _, err := logBuffer.Write([]byte("123")); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if string(testBuffer.buf) != "" {
		t.Fatalf("expected 0 bytes, got %s", testBuffer.buf)
	}

	// write 4 more bytes: the buffer overflows at "1234" and is flushed
	// once; "567" remains buffered
	if _, err := logBuffer.Write([]byte("4567")); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if string(testBuffer.buf) != "1234" {
		t.Fatalf("expected 1234, got %s", testBuffer.buf)
	}

	// write 2 more bytes: the buffer overflows at "5678" and is flushed again
	if _, err := logBuffer.Write([]byte("89")); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// exactly two size-based flushes should have hit the writer ("9" is
	// still buffered)
	if testBuffer.flushes != 2 {
		t.Fatalf("expected 2 flushes, got %d", testBuffer.flushes)
	}

	if string(testBuffer.buf) != "12345678" {
		t.Fatalf("expected 12345678, got %s", testBuffer.buf)
	}
}
|
||||
|
||||
// TestFlushAfterTime verifies time-based flushing: with a buffer too
// large to fill, buffered data must still reach the underlying writer
// once the flush interval elapses with no further writes.
func TestFlushAfterTime(t *testing.T) {
	bufSize := 1024 // using a high value to avoid the buffer to be flushed by size
	bufTime := 10 * time.Millisecond

	testBuffer := &testBuffer{
		buf: make([]byte, 0),
	}

	logBuffer := logger.NewLogBuffer(testBuffer, bufSize, bufTime)
	defer logBuffer.Close()

	// write 4 bytes — well under the 1024-byte buffer, so no size flush
	if _, err := logBuffer.Write([]byte("1234")); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// check if the buffer was not flushed yet (the 10ms timer has not fired)
	if string(testBuffer.buf) != "" {
		t.Fatalf("expected 0 bytes, got %d", len(testBuffer.buf))
	}

	// wait for the buffer to be flushed by the timer
	// NOTE(review): sleeping 2x the flush interval is assumed to be enough
	// headroom; on a heavily loaded CI machine this could still flake —
	// confirm, or poll with a deadline instead.
	time.Sleep(20 * time.Millisecond)

	if string(testBuffer.buf) != "1234" {
		t.Fatalf("expected 4 bytes, got %d", len(testBuffer.buf))
	}
}
|
|
@ -1,6 +1,6 @@
|
|||
# docker build --rm -f docker/Dockerfile.make -t woodpecker/make:local .
|
||||
FROM docker.io/golang:1.22-alpine3.19 as golang_image
|
||||
FROM docker.io/node:21-alpine3.19
|
||||
FROM docker.io/node:22-alpine3.19
|
||||
|
||||
# renovate: datasource=repology depName=alpine_3_19/make versioning=loose
|
||||
ENV MAKE_VERSION="4.4.1-r2"
|
||||
|
|
|
@ -53,8 +53,8 @@
|
|||
},
|
||||
"pnpm": {
|
||||
"overrides": {
|
||||
"trim": "^0.0.3",
|
||||
"got": "^11.8.5"
|
||||
"trim": "^1.0.0",
|
||||
"got": "^14.0.0"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
1045
docs/pnpm-lock.yaml
1045
docs/pnpm-lock.yaml
File diff suppressed because it is too large
Load diff
|
@ -18,6 +18,7 @@ package rpc
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -64,7 +65,7 @@ type LineWriter struct {
|
|||
}
|
||||
|
||||
// NewLineWriter returns a new line reader.
|
||||
func NewLineWriter(peer Peer, stepUUID string, secret ...string) *LineWriter {
|
||||
func NewLineWriter(peer Peer, stepUUID string, secret ...string) io.Writer {
|
||||
return &LineWriter{
|
||||
peer: peer,
|
||||
stepUUID: stepUUID,
|
||||
|
@ -96,13 +97,3 @@ func (w *LineWriter) Write(p []byte) (n int, err error) {
|
|||
w.lines = append(w.lines, line)
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
// Lines returns the line history
|
||||
func (w *LineWriter) Lines() []*LogEntry {
|
||||
return w.lines
|
||||
}
|
||||
|
||||
// Clear clears the line history
|
||||
func (w *LineWriter) Clear() {
|
||||
w.lines = w.lines[:0]
|
||||
}
|
||||
|
|
|
@ -186,7 +186,7 @@ func LogStreamSSE(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
if step.State != model.StatusRunning {
|
||||
if step.State != model.StatusPending && step.State != model.StatusRunning {
|
||||
log.Debug().Msg("step not running (anymore).")
|
||||
logWriteStringErr(io.WriteString(rw, "event: error\ndata: step not running (anymore)\n\n"))
|
||||
return
|
||||
|
|
|
@ -59,11 +59,11 @@ func (s storage) GetPipelineList(repo *model.Repo, p *model.ListOptions, f *mode
|
|||
|
||||
if f != nil {
|
||||
if f.After != 0 {
|
||||
cond = cond.And(builder.Gt{"pipeline_started": f.After})
|
||||
cond = cond.And(builder.Gt{"pipeline_created": f.After})
|
||||
}
|
||||
|
||||
if f.Before != 0 {
|
||||
cond = cond.And(builder.Lt{"pipeline_started": f.Before})
|
||||
cond = cond.And(builder.Lt{"pipeline_created": f.Before})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -231,21 +231,19 @@ func TestPipelines(t *testing.T) {
|
|||
})
|
||||
|
||||
g.It("Should get filtered pipelines", func() {
|
||||
dt1, _ := time.Parse(time.RFC3339, "2023-01-15T15:00:00Z")
|
||||
pipeline1 := &model.Pipeline{
|
||||
RepoID: repo.ID,
|
||||
Started: dt1.Unix(),
|
||||
RepoID: repo.ID,
|
||||
}
|
||||
dt2, _ := time.Parse(time.RFC3339, "2023-01-15T16:30:00Z")
|
||||
pipeline2 := &model.Pipeline{
|
||||
RepoID: repo.ID,
|
||||
Started: dt2.Unix(),
|
||||
RepoID: repo.ID,
|
||||
}
|
||||
err1 := store.CreatePipeline(pipeline1, []*model.Step{}...)
|
||||
g.Assert(err1).IsNil()
|
||||
time.Sleep(1 * time.Second)
|
||||
before := time.Now().Unix()
|
||||
err2 := store.CreatePipeline(pipeline2, []*model.Step{}...)
|
||||
g.Assert(err2).IsNil()
|
||||
pipelines, err3 := store.GetPipelineList(&model.Repo{ID: 1}, &model.ListOptions{Page: 1, PerPage: 50}, &model.PipelineFilter{Before: dt2.Unix()})
|
||||
pipelines, err3 := store.GetPipelineList(&model.Repo{ID: 1}, &model.ListOptions{Page: 1, PerPage: 50}, &model.PipelineFilter{Before: before})
|
||||
g.Assert(err3).IsNil()
|
||||
g.Assert(len(pipelines)).Equal(1)
|
||||
g.Assert(pipelines[0].ID).Equal(pipeline1.ID)
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -297,7 +297,7 @@ async function loadLogs() {
|
|||
const logs = await apiClient.getLogs(repo.value.id, pipeline.value.number, step.value.id);
|
||||
logs?.forEach((line) => writeLog({ index: line.line, text: decode(line.data), time: line.time }));
|
||||
flushLogs(false);
|
||||
} else if (isStepRunning(step.value)) {
|
||||
} else if (step.value.state === 'pending' || isStepRunning(step.value)) {
|
||||
loadedStepSlug.value = stepSlug.value;
|
||||
stream.value = apiClient.streamLogs(repo.value.id, pipeline.value.number, step.value.id, (line) => {
|
||||
writeLog({ index: line.line, text: decode(line.data), time: line.time });
|
||||
|
|
Loading…
Reference in a new issue