Process workflow logs in batches (#4045)

This commit is contained in:
hg 2024-09-18 19:29:56 +05:00 committed by GitHub
parent 4683968925
commit 276b279b7f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 359 additions and 209 deletions

View file

@ -25,29 +25,44 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
grpcproto "google.golang.org/protobuf/proto"
backend "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/proto"
)
// Set grpc version on compile time to compare against server version response.
const ClientGrpcVersion int32 = proto.Version
const (
// Set grpc version on compile time to compare against server version response.
ClientGrpcVersion int32 = proto.Version
// Maximum size of an outgoing log message.
// Picked to prevent it from going over GRPC size limit (4 MiB) with a large safety margin.
maxLogBatchSize int = 1 * 1024 * 1024
// Maximum amount of time between sending consecutive batched log messages.
// Controls the delay between the CI job generating a log record, and web users receiving it.
maxLogFlushPeriod time.Duration = time.Second
)
type client struct {
client proto.WoodpeckerClient
conn *grpc.ClientConn
logs chan *proto.LogEntry
}
// NewGrpcClient returns a new grpc Client.
func NewGrpcClient(conn *grpc.ClientConn) rpc.Peer {
func NewGrpcClient(ctx context.Context, conn *grpc.ClientConn) rpc.Peer {
client := new(client)
client.client = proto.NewWoodpeckerClient(conn)
client.conn = conn
client.logs = make(chan *proto.LogEntry, 10) // max memory use: 10 lines * 1 MiB
go client.processLogs(ctx)
return client
}
func (c *client) Close() error {
close(c.logs)
return c.conn.Close()
}
@ -367,18 +382,69 @@ func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepSt
return nil
}
// Log writes the step log entry.
func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) {
retry := c.newBackOff()
req := new(proto.LogRequest)
req.LogEntry = new(proto.LogEntry)
req.LogEntry.StepUuid = logEntry.StepUUID
req.LogEntry.Data = logEntry.Data
req.LogEntry.Line = int32(logEntry.Line)
req.LogEntry.Time = logEntry.Time
req.LogEntry.Type = int32(logEntry.Type)
// EnqueueLog queues the log entry to be written in a batch later.
func (c *client) EnqueueLog(logEntry *rpc.LogEntry) {
c.logs <- &proto.LogEntry{
StepUuid: logEntry.StepUUID,
Data: logEntry.Data,
Line: int32(logEntry.Line),
Time: logEntry.Time,
Type: int32(logEntry.Type),
}
}
func (c *client) processLogs(ctx context.Context) {
var entries []*proto.LogEntry
var bytes int
send := func() {
if len(entries) == 0 {
return
}
log.Debug().
Int("entries", len(entries)).
Int("bytes", bytes).
Msg("log drain: sending queued logs")
if err := c.sendLogs(ctx, entries); err != nil {
log.Error().Err(err).Msg("log drain: could not send logs to server")
}
// even if send failed, we don't have infinite memory; retry has already been used
entries = entries[:0]
bytes = 0
}
// ctx.Done() is covered by the log channel being closed
for {
_, err = c.client.Log(ctx, req)
select {
case entry, ok := <-c.logs:
if !ok {
log.Info().Msg("log drain: channel closed")
send()
return
}
entries = append(entries, entry)
bytes += grpcproto.Size(entry) // cspell:words grpcproto
if bytes >= maxLogBatchSize {
send()
}
case <-time.After(maxLogFlushPeriod):
send()
}
}
}
func (c *client) sendLogs(ctx context.Context, entries []*proto.LogEntry) error {
req := &proto.LogRequest{LogEntries: entries}
retry := c.newBackOff()
for {
_, err := c.client.Log(ctx, req)
if err == nil {
break
}

View file

@ -156,7 +156,7 @@ func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
}
defer conn.Close()
client := agent_rpc.NewGrpcClient(conn)
client := agent_rpc.NewGrpcClient(ctx, conn)
agentConfigPersisted := atomic.Bool{}
grpcCtx := metadata.NewOutgoingContext(grpcClientCtx, metadata.Pairs("hostname", hostname))

View file

@ -16,7 +16,6 @@
package log
import (
"context"
"io"
"strings"
"sync"
@ -67,9 +66,6 @@ func (w *LineWriter) Write(p []byte) (n int, err error) {
w.num++
if err := w.peer.Log(context.Background(), line); err != nil {
return 0, err
}
w.peer.EnqueueLog(line)
return len(data), nil
}

View file

@ -27,7 +27,7 @@ import (
func TestLineWriter(t *testing.T) {
peer := mocks.NewPeer(t)
peer.On("Log", mock.Anything, mock.Anything).Return(nil)
peer.On("EnqueueLog", mock.Anything)
secrets := []string{"world"}
lw := log.NewLineWriter(peer, "e9ea76a5-44a1-4059-9c4a-6956c478b26d", secrets...)
@ -37,7 +37,7 @@ func TestLineWriter(t *testing.T) {
_, err = lw.Write([]byte("the previous line had no newline at the end"))
assert.NoError(t, err)
peer.AssertCalled(t, "Log", mock.Anything, &rpc.LogEntry{
peer.AssertCalled(t, "EnqueueLog", &rpc.LogEntry{
StepUUID: "e9ea76a5-44a1-4059-9c4a-6956c478b26d",
Time: 0,
Type: rpc.LogEntryStdout,
@ -45,7 +45,7 @@ func TestLineWriter(t *testing.T) {
Data: []byte("hello ********"),
})
peer.AssertCalled(t, "Log", mock.Anything, &rpc.LogEntry{
peer.AssertCalled(t, "EnqueueLog", &rpc.LogEntry{
StepUUID: "e9ea76a5-44a1-4059-9c4a-6956c478b26d",
Time: 0,
Type: rpc.LogEntryStdout,

View file

@ -35,6 +35,11 @@ func (_m *Peer) Done(c context.Context, workflowID string, state rpc.WorkflowSta
return r0
}
// EnqueueLog provides a mock function with given fields: logEntry
func (_m *Peer) EnqueueLog(logEntry *rpc.LogEntry) {
_m.Called(logEntry)
}
// Extend provides a mock function with given fields: c, workflowID
func (_m *Peer) Extend(c context.Context, workflowID string) error {
ret := _m.Called(c, workflowID)
@ -71,24 +76,6 @@ func (_m *Peer) Init(c context.Context, workflowID string, state rpc.WorkflowSta
return r0
}
// Log provides a mock function with given fields: c, logEntry
func (_m *Peer) Log(c context.Context, logEntry *rpc.LogEntry) error {
ret := _m.Called(c, logEntry)
if len(ret) == 0 {
panic("no return value specified for Log")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *rpc.LogEntry) error); ok {
r0 = rf(c, logEntry)
} else {
r0 = ret.Error(0)
}
return r0
}
// Next provides a mock function with given fields: c, f
func (_m *Peer) Next(c context.Context, f rpc.Filter) (*rpc.Workflow, error) {
ret := _m.Called(c, f)

View file

@ -82,8 +82,8 @@ type Peer interface {
// Update updates the step state
Update(c context.Context, workflowID string, state StepState) error
// Log writes the step log entry
Log(c context.Context, logEntry *LogEntry) error
// EnqueueLog queues the step log entry for delayed sending
EnqueueLog(logEntry *LogEntry)
// RegisterAgent register our agent to the server
RegisterAgent(ctx context.Context, platform, backend, version string, capacity int) (int64, error)

View file

@ -16,4 +16,4 @@ package proto
// Version is the version of the woodpecker.proto file,
// IMPORTANT: increased by 1 each time it get changed.
const Version int32 = 9
const Version int32 = 10

View file

@ -685,7 +685,7 @@ type LogRequest struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
LogEntry *LogEntry `protobuf:"bytes,1,opt,name=logEntry,proto3" json:"logEntry,omitempty"`
LogEntries []*LogEntry `protobuf:"bytes,1,rep,name=logEntries,proto3" json:"logEntries,omitempty"`
}
func (x *LogRequest) Reset() {
@ -720,9 +720,9 @@ func (*LogRequest) Descriptor() ([]byte, []int) {
return file_woodpecker_proto_rawDescGZIP(), []int{11}
}
func (x *LogRequest) GetLogEntry() *LogEntry {
func (x *LogRequest) GetLogEntries() []*LogEntry {
if x != nil {
return x.LogEntry
return x.LogEntries
}
return nil
}
@ -1212,91 +1212,91 @@ var file_woodpecker_proto_rawDesc = []byte{
0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64,
0x12, 0x26, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x10, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x65, 0x70, 0x53, 0x74, 0x61, 0x74,
0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x39, 0x0a, 0x0a, 0x4c, 0x6f, 0x67, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2b, 0x0a, 0x08, 0x6c, 0x6f, 0x67, 0x45, 0x6e, 0x74,
0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x45, 0x6e,
0x74, 0x72, 0x79, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x2d, 0x0a, 0x13,
0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x82, 0x01, 0x0a, 0x14,
0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d,
0x12, 0x1a, 0x0a, 0x08, 0x63, 0x61, 0x70, 0x61, 0x63, 0x69, 0x74, 0x79, 0x18, 0x02, 0x20, 0x01,
0x28, 0x05, 0x52, 0x08, 0x63, 0x61, 0x70, 0x61, 0x63, 0x69, 0x74, 0x79, 0x12, 0x18, 0x0a, 0x07,
0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x62,
0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f,
0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e,
0x22, 0x5b, 0x0a, 0x0f, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x76, 0x65, 0x72, 0x73,
0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x67, 0x72, 0x70, 0x63, 0x56,
0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d,
0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3b, 0x0a,
0x0c, 0x4e, 0x65, 0x78, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2b, 0x0a,
0x08, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77,
0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x22, 0x32, 0x0a, 0x15, 0x52, 0x65,
0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18,
0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x22, 0x49,
0x0a, 0x0b, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a,
0x0b, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x0a, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x19,
0x0a, 0x08, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03,
0x52, 0x07, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x22, 0x64, 0x0a, 0x0c, 0x41, 0x75, 0x74,
0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61,
0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75,
0x73, 0x12, 0x19, 0x0a, 0x08, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20,
0x01, 0x28, 0x03, 0x52, 0x07, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c,
0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01,
0x28, 0x09, 0x52, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x32,
0xbb, 0x04, 0x0a, 0x0a, 0x57, 0x6f, 0x6f, 0x64, 0x70, 0x65, 0x63, 0x6b, 0x65, 0x72, 0x12, 0x31,
0x0a, 0x07, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e,
0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22,
0x00, 0x12, 0x31, 0x0a, 0x04, 0x4e, 0x65, 0x78, 0x74, 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x2e, 0x4e, 0x65, 0x78, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4e, 0x65, 0x78, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x22, 0x00, 0x12, 0x2a, 0x0a, 0x04, 0x49, 0x6e, 0x69, 0x74, 0x12, 0x12, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x3d, 0x0a, 0x0a, 0x4c, 0x6f, 0x67, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2f, 0x0a, 0x0a, 0x6c, 0x6f, 0x67, 0x45, 0x6e, 0x74,
0x72, 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0a, 0x6c, 0x6f, 0x67,
0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79,
0x22, 0x2d, 0x0a, 0x13, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75,
0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22,
0x82, 0x01, 0x0a, 0x14, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x41, 0x67, 0x65, 0x6e,
0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x6c, 0x61, 0x74,
0x66, 0x6f, 0x72, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x6c, 0x61, 0x74,
0x66, 0x6f, 0x72, 0x6d, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x61, 0x70, 0x61, 0x63, 0x69, 0x74, 0x79,
0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x63, 0x61, 0x70, 0x61, 0x63, 0x69, 0x74, 0x79,
0x12, 0x18, 0x0a, 0x07, 0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28,
0x09, 0x52, 0x07, 0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65,
0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72,
0x73, 0x69, 0x6f, 0x6e, 0x22, 0x5b, 0x0a, 0x0f, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x67, 0x72, 0x70, 0x63, 0x5f,
0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x67,
0x72, 0x70, 0x63, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65,
0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01,
0x28, 0x09, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f,
0x6e, 0x22, 0x3b, 0x0a, 0x0c, 0x4e, 0x65, 0x78, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x12, 0x2b, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x6f, 0x72, 0x6b,
0x66, 0x6c, 0x6f, 0x77, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x22, 0x32,
0x0a, 0x15, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x61, 0x67, 0x65, 0x6e, 0x74,
0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x61, 0x67, 0x65, 0x6e, 0x74,
0x49, 0x64, 0x22, 0x49, 0x0a, 0x0b, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x6b,
0x65, 0x6e, 0x12, 0x19, 0x0a, 0x08, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x02,
0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x22, 0x64, 0x0a,
0x0c, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a,
0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73,
0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x19, 0x0a, 0x08, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x69,
0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x49, 0x64,
0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e,
0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x54, 0x6f,
0x6b, 0x65, 0x6e, 0x32, 0xbb, 0x04, 0x0a, 0x0a, 0x57, 0x6f, 0x6f, 0x64, 0x70, 0x65, 0x63, 0x6b,
0x65, 0x72, 0x12, 0x31, 0x0a, 0x07, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x0c, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x2e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x31, 0x0a, 0x04, 0x4e, 0x65, 0x78, 0x74, 0x12, 0x12, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4e, 0x65, 0x78, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x13, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4e, 0x65, 0x78, 0x74, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x2a, 0x0a, 0x04, 0x49, 0x6e, 0x69, 0x74,
0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70,
0x74, 0x79, 0x22, 0x00, 0x12, 0x2a, 0x0a, 0x04, 0x57, 0x61, 0x69, 0x74, 0x12, 0x12, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x61, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00,
0x12, 0x2a, 0x0a, 0x04, 0x57, 0x61, 0x69, 0x74, 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x2e, 0x57, 0x61, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x2a, 0x0a, 0x04,
0x44, 0x6f, 0x6e, 0x65, 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x44, 0x6f, 0x6e,
0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x2e, 0x0a, 0x06, 0x45, 0x78, 0x74, 0x65,
0x6e, 0x64, 0x12, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x78, 0x74, 0x65, 0x6e,
0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x2e, 0x0a, 0x06, 0x55, 0x70, 0x64, 0x61,
0x74, 0x65, 0x12, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74,
0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x28, 0x0a, 0x03, 0x4c, 0x6f, 0x67, 0x12,
0x11, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79,
0x22, 0x00, 0x12, 0x4c, 0x0a, 0x0d, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x41, 0x67,
0x65, 0x6e, 0x74, 0x12, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x67, 0x69,
0x73, 0x74, 0x65, 0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x1c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65,
0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00,
0x12, 0x2f, 0x0a, 0x0f, 0x55, 0x6e, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x41, 0x67,
0x65, 0x6e, 0x74, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74,
0x79, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22,
0x00, 0x12, 0x3a, 0x0a, 0x0c, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x48, 0x65, 0x61, 0x6c, 0x74,
0x68, 0x12, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74,
0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x32, 0x43, 0x0a,
0x0e, 0x57, 0x6f, 0x6f, 0x64, 0x70, 0x65, 0x63, 0x6b, 0x65, 0x72, 0x41, 0x75, 0x74, 0x68, 0x12,
0x31, 0x0a, 0x04, 0x41, 0x75, 0x74, 0x68, 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e,
0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x22, 0x00, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x6f, 0x2e, 0x77, 0x6f, 0x6f, 0x64, 0x70, 0x65, 0x63,
0x6b, 0x65, 0x72, 0x2d, 0x63, 0x69, 0x2e, 0x6f, 0x72, 0x67, 0x2f, 0x77, 0x6f, 0x6f, 0x64, 0x70,
0x65, 0x63, 0x6b, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e,
0x65, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x33,
0x12, 0x2a, 0x0a, 0x04, 0x44, 0x6f, 0x6e, 0x65, 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x2e, 0x44, 0x6f, 0x6e, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x2e, 0x0a, 0x06,
0x45, 0x78, 0x74, 0x65, 0x6e, 0x64, 0x12, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45,
0x78, 0x74, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x2e, 0x0a, 0x06,
0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x55,
0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x28, 0x0a, 0x03,
0x4c, 0x6f, 0x67, 0x12, 0x11, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4c, 0x6f, 0x67, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45,
0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x0d, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74,
0x65, 0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e,
0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x67,
0x69, 0x73, 0x74, 0x65, 0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x22, 0x00, 0x12, 0x2f, 0x0a, 0x0f, 0x55, 0x6e, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74,
0x65, 0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e,
0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d,
0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3a, 0x0a, 0x0c, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x48,
0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65,
0x70, 0x6f, 0x72, 0x74, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22,
0x00, 0x32, 0x43, 0x0a, 0x0e, 0x57, 0x6f, 0x6f, 0x64, 0x70, 0x65, 0x63, 0x6b, 0x65, 0x72, 0x41,
0x75, 0x74, 0x68, 0x12, 0x31, 0x0a, 0x04, 0x41, 0x75, 0x74, 0x68, 0x12, 0x12, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x13, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x6f, 0x2e, 0x77, 0x6f, 0x6f,
0x64, 0x70, 0x65, 0x63, 0x6b, 0x65, 0x72, 0x2d, 0x63, 0x69, 0x2e, 0x6f, 0x72, 0x67, 0x2f, 0x77,
0x6f, 0x6f, 0x64, 0x70, 0x65, 0x63, 0x6b, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f, 0x70, 0x69, 0x70,
0x65, 0x6c, 0x69, 0x6e, 0x65, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -1341,7 +1341,7 @@ var file_woodpecker_proto_depIdxs = []int32{
1, // 2: proto.InitRequest.state:type_name -> proto.WorkflowState
1, // 3: proto.DoneRequest.state:type_name -> proto.WorkflowState
0, // 4: proto.UpdateRequest.state:type_name -> proto.StepState
2, // 5: proto.LogRequest.logEntry:type_name -> proto.LogEntry
2, // 5: proto.LogRequest.logEntries:type_name -> proto.LogEntry
4, // 6: proto.NextResponse.workflow:type_name -> proto.Workflow
12, // 7: proto.Woodpecker.Version:input_type -> proto.Empty
5, // 8: proto.Woodpecker.Next:input_type -> proto.NextRequest

View file

@ -106,7 +106,7 @@ message UpdateRequest {
}
message LogRequest {
LogEntry logEntry = 1;
repeated LogEntry logEntries = 1;
}
message Empty {

View file

@ -28,12 +28,19 @@ import (
"github.com/rs/zerolog/log"
"go.woodpecker-ci.org/woodpecker/v2/server"
"go.woodpecker-ci.org/woodpecker/v2/server/logging"
"go.woodpecker-ci.org/woodpecker/v2/server/model"
"go.woodpecker-ci.org/woodpecker/v2/server/pubsub"
"go.woodpecker-ci.org/woodpecker/v2/server/router/middleware/session"
"go.woodpecker-ci.org/woodpecker/v2/server/store"
)
const (
// How many batches of logs to keep for each client before starting to
// drop them if the client is not consuming them faster than they arrive.
maxQueuedBatchesPerClient int = 30
)
// EventStreamSSE
//
// @Summary Stream events like pipeline updates
@ -213,17 +220,32 @@ func LogStreamSSE(c *gin.Context) {
}
go func() {
err := server.Config.Services.Logs.Tail(ctx, step.ID, func(entries ...*model.LogEntry) {
for _, entry := range entries {
select {
case <-ctx.Done():
return
default:
ee, _ := json.Marshal(entry)
logChan <- ee
batches := make(logging.LogChan, maxQueuedBatchesPerClient)
go func() {
defer func() {
if r := recover(); r != nil {
log.Error().Msgf("error sending log message: %v", r)
}
}()
for entries := range batches {
for _, entry := range entries {
select {
case <-ctx.Done():
return
default:
if ee, err := json.Marshal(entry); err == nil {
logChan <- ee
} else {
log.Error().Err(err).Msg("unable to serialize log entry")
}
}
}
}
})
}()
err := server.Config.Services.Logs.Tail(ctx, step.ID, batches)
if err != nil {
log.Error().Err(err).Msg("tail of logs failed")
}

View file

@ -336,27 +336,12 @@ func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowSt
}
// Log writes a log entry to the database and publishes it to the pubsub.
func (s *RPC) Log(c context.Context, rpcLogEntry *rpc.LogEntry) error {
// convert rpc log_entry to model.log_entry
step, err := s.store.StepByUUID(rpcLogEntry.StepUUID)
// An explicit stepUUID makes it obvious that all entries must come from the same step.
func (s *RPC) Log(c context.Context, stepUUID string, rpcLogEntries []*rpc.LogEntry) error {
step, err := s.store.StepByUUID(stepUUID)
if err != nil {
return fmt.Errorf("could not find step with uuid %s in store: %w", rpcLogEntry.StepUUID, err)
return fmt.Errorf("could not find step with uuid %s in store: %w", stepUUID, err)
}
logEntry := &model.LogEntry{
StepID: step.ID,
Time: rpcLogEntry.Time,
Line: rpcLogEntry.Line,
Data: rpcLogEntry.Data,
Type: model.LogEntryType(rpcLogEntry.Type),
}
// make sure writes to pubsub are non blocking (https://github.com/woodpecker-ci/woodpecker/blob/c919f32e0b6432a95e1a6d3d0ad662f591adf73f/server/logging/log.go#L9)
go func() {
// write line to listening web clients
if err := s.logger.Write(c, logEntry.StepID, logEntry); err != nil {
log.Error().Err(err).Msgf("rpc server could not write to logger")
}
}()
agent, err := s.getAgentFromContext(c)
if err != nil {
@ -368,7 +353,34 @@ func (s *RPC) Log(c context.Context, rpcLogEntry *rpc.LogEntry) error {
return err
}
return server.Config.Services.LogStore.LogAppend(logEntry)
var logEntries []*model.LogEntry
for _, rpcLogEntry := range rpcLogEntries {
if rpcLogEntry.StepUUID != stepUUID {
return fmt.Errorf("expected step UUID %s, got %s", stepUUID, rpcLogEntry.StepUUID)
}
logEntries = append(logEntries, &model.LogEntry{
StepID: step.ID,
Time: rpcLogEntry.Time,
Line: rpcLogEntry.Line,
Data: rpcLogEntry.Data,
Type: model.LogEntryType(rpcLogEntry.Type),
})
}
// make sure writes to pubsub are non blocking (https://github.com/woodpecker-ci/woodpecker/blob/c919f32e0b6432a95e1a6d3d0ad662f591adf73f/server/logging/log.go#L9)
go func() {
// write line to listening web clients
if err := s.logger.Write(c, step.ID, logEntries); err != nil {
log.Error().Err(err).Msgf("rpc server could not write to logger")
}
}()
if err = server.Config.Services.LogStore.LogAppend(step, logEntries); err != nil {
log.Error().Err(err).Msg("could not store log entries")
}
return nil
}
func (s *RPC) RegisterAgent(ctx context.Context, platform, backend, version string, capacity int32) (int64, error) {

View file

@ -20,6 +20,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
prometheus_auto "github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog/log"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/proto"
@ -133,15 +134,39 @@ func (s *WoodpeckerServer) Extend(c context.Context, req *proto.ExtendRequest) (
}
func (s *WoodpeckerServer) Log(c context.Context, req *proto.LogRequest) (*proto.Empty, error) {
logEntry := &rpc.LogEntry{
Data: req.GetLogEntry().GetData(),
Line: int(req.GetLogEntry().GetLine()),
Time: req.GetLogEntry().GetTime(),
StepUUID: req.GetLogEntry().GetStepUuid(),
Type: int(req.GetLogEntry().GetType()),
var (
entries []*rpc.LogEntry
stepUUID string
)
write := func() error {
if len(entries) > 0 {
if err := s.peer.Log(c, stepUUID, entries); err != nil {
log.Error().Err(err).Msg("could not write log entries")
return err
}
}
return nil
}
for _, reqEntry := range req.GetLogEntries() {
entry := &rpc.LogEntry{
Data: reqEntry.GetData(),
Line: int(reqEntry.GetLine()),
Time: reqEntry.GetTime(),
StepUUID: reqEntry.GetStepUuid(),
Type: int(reqEntry.GetType()),
}
if entry.StepUUID != stepUUID {
_ = write()
stepUUID = entry.StepUUID
entries = entries[:0]
}
entries = append(entries, entry)
}
res := new(proto.Empty)
err := s.peer.Log(c, logEntry)
err := write()
return res, err
}

View file

@ -18,6 +18,8 @@ import (
"context"
"sync"
logger "github.com/rs/zerolog/log"
"go.woodpecker-ci.org/woodpecker/v2/server/model"
)
@ -38,7 +40,7 @@ import (
// sub.start()... event loop
type subscriber struct {
handler Handler
receiver LogChan
}
type stream struct {
@ -77,7 +79,7 @@ func (l *log) Open(_ context.Context, stepID int64) error {
return nil
}
func (l *log) Write(ctx context.Context, stepID int64, logEntry *model.LogEntry) error {
func (l *log) Write(ctx context.Context, stepID int64, entries []*model.LogEntry) error {
l.Lock()
s, ok := l.streams[stepID]
l.Unlock()
@ -92,15 +94,20 @@ func (l *log) Write(ctx context.Context, stepID int64, logEntry *model.LogEntry)
}
s.Lock()
s.list = append(s.list, logEntry)
s.list = append(s.list, entries...)
for sub := range s.subs {
go sub.handler(logEntry)
select {
case sub.receiver <- entries:
default:
logger.Info().Msgf("subscriber channel is full -- dropping logs for step %d", stepID)
}
}
s.Unlock()
return nil
}
func (l *log) Tail(c context.Context, stepID int64, handler Handler) error {
func (l *log) Tail(c context.Context, stepID int64, receiver LogChan) error {
l.Lock()
s, ok := l.streams[stepID]
l.Unlock()
@ -109,11 +116,11 @@ func (l *log) Tail(c context.Context, stepID int64, handler Handler) error {
}
sub := &subscriber{
handler: handler,
receiver: receiver,
}
s.Lock()
if len(s.list) != 0 {
sub.handler(s.list...)
sub.receiver <- s.list
}
s.subs[sub] = struct{}{}
s.Unlock()

View file

@ -39,28 +39,37 @@ func TestLogging(t *testing.T) {
context.Background(),
)
receiver := make(LogChan, 10)
defer close(receiver)
go func() {
for range receiver {
wg.Done()
}
}()
logger := New()
assert.NoError(t, logger.Open(ctx, testStepID))
go func() {
assert.NoError(t, logger.Tail(ctx, testStepID, func(_ ...*model.LogEntry) { wg.Done() }))
assert.NoError(t, logger.Tail(ctx, testStepID, receiver))
}()
go func() {
assert.NoError(t, logger.Tail(ctx, testStepID, func(_ ...*model.LogEntry) { wg.Done() }))
assert.NoError(t, logger.Tail(ctx, testStepID, receiver))
}()
<-time.After(500 * time.Millisecond)
wg.Add(4)
go func() {
assert.NoError(t, logger.Write(ctx, testStepID, testEntry))
assert.NoError(t, logger.Write(ctx, testStepID, testEntry))
assert.NoError(t, logger.Write(ctx, testStepID, []*model.LogEntry{testEntry}))
assert.NoError(t, logger.Write(ctx, testStepID, []*model.LogEntry{testEntry}))
}()
wg.Wait()
wg.Add(1)
go func() {
assert.NoError(t, logger.Tail(ctx, testStepID, func(_ ...*model.LogEntry) { wg.Done() }))
assert.NoError(t, logger.Tail(ctx, testStepID, receiver))
}()
<-time.After(500 * time.Millisecond)

View file

@ -24,8 +24,8 @@ import (
// ErrNotFound is returned when the log does not exist.
var ErrNotFound = errors.New("stream: not found")
// Handler defines a callback function for handling log entries.
type Handler func(...*model.LogEntry)
// LogChan defines a channel type for receiving ordered batches of log entries.
type LogChan chan []*model.LogEntry
// Log defines a log multiplexer.
type Log interface {
@ -33,10 +33,10 @@ type Log interface {
Open(c context.Context, stepID int64) error
// Write writes the entry to the log.
Write(c context.Context, stepID int64, entry *model.LogEntry) error
Write(c context.Context, stepID int64, entries []*model.LogEntry) error
// Tail tails the log.
Tail(c context.Context, stepID int64, handler Handler) error
Tail(c context.Context, stepID int64, handler LogChan) error
// Close closes the log.
Close(c context.Context, stepID int64) error

View file

@ -8,6 +8,8 @@ import (
"path/filepath"
"strings"
logger "github.com/rs/zerolog/log"
"go.woodpecker-ci.org/woodpecker/v2/pipeline"
"go.woodpecker-ci.org/woodpecker/v2/server/model"
"go.woodpecker-ci.org/woodpecker/v2/server/services/log"
@ -70,19 +72,30 @@ func (l logStore) LogFind(step *model.Step) ([]*model.LogEntry, error) {
return entries, nil
}
func (l logStore) LogAppend(logEntry *model.LogEntry) error {
file, err := os.OpenFile(l.filePath(logEntry.StepID), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
func (l logStore) LogAppend(step *model.Step, logEntries []*model.LogEntry) error {
path := l.filePath(step.ID)
file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
if err != nil {
logger.Error().Err(err).Msgf("could not open log file %s", path)
return err
}
jsonData, err := json.Marshal(logEntry)
if err != nil {
return err
var bytes []byte
for _, logEntry := range logEntries {
if jsonLine, err := json.Marshal(logEntry); err == nil {
bytes = append(bytes, jsonLine...)
bytes = append(bytes, byte('\n'))
} else {
logger.Error().Err(err).Msg("could not convert log entry to JSON")
}
}
_, err = file.Write(append(jsonData, byte('\n')))
if err != nil {
return err
if _, err = file.Write(bytes); err != nil {
logger.Error().Err(err).Msg("could not write out log entries")
}
return file.Close()
}

View file

@ -4,6 +4,6 @@ import "go.woodpecker-ci.org/woodpecker/v2/server/model"
type Service interface {
LogFind(step *model.Step) ([]*model.LogEntry, error)
LogAppend(logEntry *model.LogEntry) error
LogAppend(step *model.Step, logEntries []*model.LogEntry) error
LogDelete(step *model.Step) error
}

View file

@ -15,16 +15,33 @@
package datastore
import (
"github.com/rs/zerolog/log"
"go.woodpecker-ci.org/woodpecker/v2/server/model"
)
// Maximum number of records to store in one PostgreSQL statement.
// Too large a value results in `pq: got XX parameters but PostgreSQL only supports 65535 parameters`.
const pgBatchSize = 1000
func (s storage) LogFind(step *model.Step) ([]*model.LogEntry, error) {
var logEntries []*model.LogEntry
return logEntries, s.engine.Asc("id").Where("step_id = ?", step.ID).Find(&logEntries)
}
func (s storage) LogAppend(logEntry *model.LogEntry) error {
_, err := s.engine.Insert(logEntry)
func (s storage) LogAppend(_ *model.Step, logEntries []*model.LogEntry) error {
var err error
// TODO: adapted from slices.Chunk(); switch to it in Go 1.23+
for i := 0; i < len(logEntries); i += pgBatchSize {
end := min(pgBatchSize, len(logEntries[i:]))
chunk := logEntries[i : i+end]
if _, err = s.engine.Insert(chunk); err != nil {
log.Error().Err(err).Msg("could not store log entries to db")
}
}
return err
}

View file

@ -45,9 +45,7 @@ func TestLogCreateFindDelete(t *testing.T) {
},
}
for _, logEntry := range logEntries {
assert.NoError(t, store.LogAppend(logEntry))
}
assert.NoError(t, store.LogAppend(&step, logEntries))
// we want to find our inserted logs
_logEntries, err := store.LogFind(&step)
@ -83,9 +81,7 @@ func TestLogAppend(t *testing.T) {
},
}
for _, logEntry := range logEntries {
assert.NoError(t, store.LogAppend(logEntry))
}
assert.NoError(t, store.LogAppend(&step, logEntries))
logEntry := &model.LogEntry{
StepID: step.ID,
@ -94,7 +90,7 @@ func TestLogAppend(t *testing.T) {
Time: 20,
}
assert.NoError(t, store.LogAppend(logEntry))
assert.NoError(t, store.LogAppend(&step, []*model.LogEntry{logEntry}))
_logEntries, err := store.LogFind(&step)
assert.NoError(t, err)

View file

@ -1340,17 +1340,17 @@ func (_m *Store) HasRedirectionForRepo(_a0 int64, _a1 string) (bool, error) {
return r0, r1
}
// LogAppend provides a mock function with given fields: logEntry
func (_m *Store) LogAppend(logEntry *model.LogEntry) error {
ret := _m.Called(logEntry)
// LogAppend provides a mock function with given fields: _a0, _a1
func (_m *Store) LogAppend(_a0 *model.Step, _a1 []*model.LogEntry) error {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for LogAppend")
}
var r0 error
if rf, ok := ret.Get(0).(func(*model.LogEntry) error); ok {
r0 = rf(logEntry)
if rf, ok := ret.Get(0).(func(*model.Step, []*model.LogEntry) error); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Error(0)
}

View file

@ -143,7 +143,7 @@ type Store interface {
// Logs
LogFind(*model.Step) ([]*model.LogEntry, error)
LogAppend(logEntry *model.LogEntry) error
LogAppend(*model.Step, []*model.LogEntry) error
LogDelete(*model.Step) error
// Tasks