diff --git a/agent/rpc/client_grpc.go b/agent/rpc/client_grpc.go index 21d6af30f..ff6d6d0ec 100644 --- a/agent/rpc/client_grpc.go +++ b/agent/rpc/client_grpc.go @@ -25,29 +25,44 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + grpcproto "google.golang.org/protobuf/proto" backend "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types" "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc" "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/proto" ) -// Set grpc version on compile time to compare against server version response. -const ClientGrpcVersion int32 = proto.Version +const ( + // Set grpc version on compile time to compare against server version response. + ClientGrpcVersion int32 = proto.Version + + // Maximum size of an outgoing log message. + // Picked to prevent it from going over GRPC size limit (4 MiB) with a large safety margin. + maxLogBatchSize int = 1 * 1024 * 1024 + + // Maximum amount of time between sending consecutive batched log messages. + // Controls the delay between the CI job generating a log record, and web users receiving it. + maxLogFlushPeriod time.Duration = time.Second +) type client struct { client proto.WoodpeckerClient conn *grpc.ClientConn + logs chan *proto.LogEntry } // NewGrpcClient returns a new grpc Client. -func NewGrpcClient(conn *grpc.ClientConn) rpc.Peer { +func NewGrpcClient(ctx context.Context, conn *grpc.ClientConn) rpc.Peer { client := new(client) client.client = proto.NewWoodpeckerClient(conn) client.conn = conn + client.logs = make(chan *proto.LogEntry, 10) // max memory use: 10 lines * 1 MiB + go client.processLogs(ctx) return client } func (c *client) Close() error { + close(c.logs) return c.conn.Close() } @@ -367,18 +382,69 @@ func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepSt return nil } -// Log writes the step log entry. -func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) { - retry := c.newBackOff() - req := new(proto.LogRequest) - req.LogEntry = new(proto.LogEntry) - req.LogEntry.StepUuid = logEntry.StepUUID - req.LogEntry.Data = logEntry.Data - req.LogEntry.Line = int32(logEntry.Line) - req.LogEntry.Time = logEntry.Time - req.LogEntry.Type = int32(logEntry.Type) +// EnqueueLog queues the log entry to be written in a batch later. +func (c *client) EnqueueLog(logEntry *rpc.LogEntry) { + c.logs <- &proto.LogEntry{ + StepUuid: logEntry.StepUUID, + Data: logEntry.Data, + Line: int32(logEntry.Line), + Time: logEntry.Time, + Type: int32(logEntry.Type), + } +} + +func (c *client) processLogs(ctx context.Context) { + var entries []*proto.LogEntry + var bytes int + + send := func() { + if len(entries) == 0 { + return + } + + log.Debug(). + Int("entries", len(entries)). + Int("bytes", bytes). + Msg("log drain: sending queued logs") + + if err := c.sendLogs(ctx, entries); err != nil { + log.Error().Err(err).Msg("log drain: could not send logs to server") + } + + // even if send failed, we don't have infinite memory; retry has already been used + entries = entries[:0] + bytes = 0 + } + + // ctx.Done() is covered by the log channel being closed for { - _, err = c.client.Log(ctx, req) + select { + case entry, ok := <-c.logs: + if !ok { + log.Info().Msg("log drain: channel closed") + send() + return + } + + entries = append(entries, entry) + bytes += grpcproto.Size(entry) // cspell:words grpcproto + + if bytes >= maxLogBatchSize { + send() + } + + case <-time.After(maxLogFlushPeriod): + send() + } + } +} + +func (c *client) sendLogs(ctx context.Context, entries []*proto.LogEntry) error { + req := &proto.LogRequest{LogEntries: entries} + retry := c.newBackOff() + + for { + _, err := c.client.Log(ctx, req) if err == nil { break } diff --git a/cmd/agent/core/agent.go b/cmd/agent/core/agent.go index dea39a8a5..f0c7f0071 100644 --- a/cmd/agent/core/agent.go +++ b/cmd/agent/core/agent.go @@ -156,7 +156,7 @@ func run(ctx context.Context, c *cli.Command, backends []types.Backend) error { } defer conn.Close() - client := agent_rpc.NewGrpcClient(conn) + client := agent_rpc.NewGrpcClient(ctx, conn) agentConfigPersisted := atomic.Bool{} grpcCtx := metadata.NewOutgoingContext(grpcClientCtx, metadata.Pairs("hostname", hostname)) diff --git a/pipeline/log/line_writer.go b/pipeline/log/line_writer.go index 0c5058f45..ff8706f36 100644 --- a/pipeline/log/line_writer.go +++ b/pipeline/log/line_writer.go @@ -16,7 +16,6 @@ package log import ( - "context" "io" "strings" "sync" @@ -67,9 +66,6 @@ func (w *LineWriter) Write(p []byte) (n int, err error) { w.num++ - if err := w.peer.Log(context.Background(), line); err != nil { - return 0, err - } - + w.peer.EnqueueLog(line) return len(data), nil } diff --git a/pipeline/log/line_writer_test.go b/pipeline/log/line_writer_test.go index 8bf8f6251..8a2d6b120 100644 --- a/pipeline/log/line_writer_test.go +++ b/pipeline/log/line_writer_test.go @@ -27,7 +27,7 @@ import ( func TestLineWriter(t *testing.T) { peer := mocks.NewPeer(t) - peer.On("Log", mock.Anything, mock.Anything).Return(nil) + peer.On("EnqueueLog", mock.Anything) secrets := []string{"world"} lw := log.NewLineWriter(peer, "e9ea76a5-44a1-4059-9c4a-6956c478b26d", secrets...) @@ -37,7 +37,7 @@ func TestLineWriter(t *testing.T) { _, err = lw.Write([]byte("the previous line had no newline at the end")) assert.NoError(t, err) - peer.AssertCalled(t, "Log", mock.Anything, &rpc.LogEntry{ + peer.AssertCalled(t, "EnqueueLog", &rpc.LogEntry{ StepUUID: "e9ea76a5-44a1-4059-9c4a-6956c478b26d", Time: 0, Type: rpc.LogEntryStdout, @@ -45,7 +45,7 @@ func TestLineWriter(t *testing.T) { Data: []byte("hello ********"), }) - peer.AssertCalled(t, "Log", mock.Anything, &rpc.LogEntry{ + peer.AssertCalled(t, "EnqueueLog", &rpc.LogEntry{ StepUUID: "e9ea76a5-44a1-4059-9c4a-6956c478b26d", Time: 0, Type: rpc.LogEntryStdout, diff --git a/pipeline/rpc/mocks/peer.go b/pipeline/rpc/mocks/peer.go index 8d0c45d18..c98456284 100644 --- a/pipeline/rpc/mocks/peer.go +++ b/pipeline/rpc/mocks/peer.go @@ -35,6 +35,11 @@ func (_m *Peer) Done(c context.Context, workflowID string, state rpc.WorkflowSta return r0 } +// EnqueueLog provides a mock function with given fields: logEntry +func (_m *Peer) EnqueueLog(logEntry *rpc.LogEntry) { + _m.Called(logEntry) +} + // Extend provides a mock function with given fields: c, workflowID func (_m *Peer) Extend(c context.Context, workflowID string) error { ret := _m.Called(c, workflowID) @@ -71,24 +76,6 @@ func (_m *Peer) Init(c context.Context, workflowID string, state rpc.WorkflowSta return r0 } -// Log provides a mock function with given fields: c, logEntry -func (_m *Peer) Log(c context.Context, logEntry *rpc.LogEntry) error { - ret := _m.Called(c, logEntry) - - if len(ret) == 0 { - panic("no return value specified for Log") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *rpc.LogEntry) error); ok { - r0 = rf(c, logEntry) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // Next provides a mock function with given fields: c, f func (_m *Peer) Next(c context.Context, f rpc.Filter) (*rpc.Workflow, error) { ret := _m.Called(c, f) diff --git a/pipeline/rpc/peer.go b/pipeline/rpc/peer.go index b38b01051..050a2ab11 100644 --- a/pipeline/rpc/peer.go +++ b/pipeline/rpc/peer.go @@ -82,8 +82,8 @@ type Peer interface { // Update updates the step state Update(c context.Context, workflowID string, state StepState) error - // Log writes the step log entry - Log(c context.Context, logEntry *LogEntry) error + // EnqueueLog queues the step log entry for delayed sending + EnqueueLog(logEntry *LogEntry) // RegisterAgent register our agent to the server RegisterAgent(ctx context.Context, platform, backend, version string, capacity int) (int64, error) diff --git a/pipeline/rpc/proto/version.go b/pipeline/rpc/proto/version.go index 2de7645f3..8f79192ce 100644 --- a/pipeline/rpc/proto/version.go +++ b/pipeline/rpc/proto/version.go @@ -16,4 +16,4 @@ package proto // Version is the version of the woodpecker.proto file, // IMPORTANT: increased by 1 each time it get changed. -const Version int32 = 9 +const Version int32 = 10 diff --git a/pipeline/rpc/proto/woodpecker.pb.go b/pipeline/rpc/proto/woodpecker.pb.go index 8ce1dbc12..dd3423175 100644 --- a/pipeline/rpc/proto/woodpecker.pb.go +++ b/pipeline/rpc/proto/woodpecker.pb.go @@ -685,7 +685,7 @@ type LogRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - LogEntry *LogEntry `protobuf:"bytes,1,opt,name=logEntry,proto3" json:"logEntry,omitempty"` + LogEntries []*LogEntry `protobuf:"bytes,1,rep,name=logEntries,proto3" json:"logEntries,omitempty"` } func (x *LogRequest) Reset() { @@ -720,9 +720,9 @@ func (*LogRequest) Descriptor() ([]byte, []int) { return file_woodpecker_proto_rawDescGZIP(), []int{11} } -func (x *LogRequest) GetLogEntry() *LogEntry { +func (x *LogRequest) GetLogEntries() []*LogEntry { if x != nil { - return x.LogEntry + return x.LogEntries } return nil } @@ -1212,91 +1212,91 @@ var file_woodpecker_proto_rawDesc = []byte{ 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x26, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x65, 0x70, 0x53, 0x74, 0x61, 0x74, - 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x39, 0x0a, 0x0a, 0x4c, 0x6f, 0x67, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2b, 0x0a, 0x08, 0x6c, 0x6f, 0x67, 0x45, 0x6e, 0x74, - 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x45, 0x6e, - 0x74, 0x72, 0x79, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x2d, 0x0a, 0x13, - 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x82, 0x01, 0x0a, 0x14, - 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, - 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x61, 0x70, 0x61, 0x63, 0x69, 0x74, 0x79, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x05, 0x52, 0x08, 0x63, 0x61, 0x70, 0x61, 0x63, 0x69, 0x74, 0x79, 0x12, 0x18, 0x0a, 0x07, - 0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x62, - 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, - 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, - 0x22, 0x5b, 0x0a, 0x0f, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x76, 0x65, 0x72, 0x73, - 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x67, 0x72, 0x70, 0x63, 0x56, - 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, - 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3b, 0x0a, - 0x0c, 0x4e, 0x65, 0x78, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2b, 0x0a, - 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, - 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x22, 0x32, 0x0a, 0x15, 0x52, 0x65, - 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x22, 0x49, - 0x0a, 0x0b, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a, - 0x0b, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0a, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x19, - 0x0a, 0x08, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, - 0x52, 0x07, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x22, 0x64, 0x0a, 0x0c, 0x41, 0x75, 0x74, - 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x12, 0x19, 0x0a, 0x08, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x03, 0x52, 0x07, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, - 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x32, - 0xbb, 0x04, 0x0a, 0x0a, 0x57, 0x6f, 0x6f, 0x64, 0x70, 0x65, 0x63, 0x6b, 0x65, 0x72, 0x12, 0x31, - 0x0a, 0x07, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, - 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x00, 0x12, 0x31, 0x0a, 0x04, 0x4e, 0x65, 0x78, 0x74, 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x2e, 0x4e, 0x65, 0x78, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4e, 0x65, 0x78, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x12, 0x2a, 0x0a, 0x04, 0x49, 0x6e, 0x69, 0x74, 0x12, 0x12, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x3d, 0x0a, 0x0a, 0x4c, 0x6f, 0x67, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2f, 0x0a, 0x0a, 0x6c, 0x6f, 0x67, 0x45, 0x6e, 0x74, + 0x72, 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0a, 0x6c, 0x6f, 0x67, + 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, + 0x22, 0x2d, 0x0a, 0x13, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, + 0x82, 0x01, 0x0a, 0x14, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x41, 0x67, 0x65, 0x6e, + 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x6c, 0x61, 0x74, + 0x66, 0x6f, 0x72, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x6c, 0x61, 0x74, + 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x61, 0x70, 0x61, 0x63, 0x69, 0x74, 0x79, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x63, 0x61, 0x70, 0x61, 0x63, 0x69, 0x74, 0x79, + 0x12, 0x18, 0x0a, 0x07, 0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x07, 0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x5b, 0x0a, 0x0f, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x67, 0x72, 0x70, 0x63, 0x5f, + 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x67, + 0x72, 0x70, 0x63, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, + 0x6e, 0x22, 0x3b, 0x0a, 0x0c, 0x4e, 0x65, 0x78, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x2b, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x6f, 0x72, 0x6b, + 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x22, 0x32, + 0x0a, 0x15, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x61, 0x67, 0x65, 0x6e, 0x74, + 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x61, 0x67, 0x65, 0x6e, 0x74, + 0x49, 0x64, 0x22, 0x49, 0x0a, 0x0b, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x6b, + 0x65, 0x6e, 0x12, 0x19, 0x0a, 0x08, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x22, 0x64, 0x0a, + 0x0c, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, + 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x19, 0x0a, 0x08, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x69, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x49, 0x64, + 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x54, 0x6f, + 0x6b, 0x65, 0x6e, 0x32, 0xbb, 0x04, 0x0a, 0x0a, 0x57, 0x6f, 0x6f, 0x64, 0x70, 0x65, 0x63, 0x6b, + 0x65, 0x72, 0x12, 0x31, 0x0a, 0x07, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x0c, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x31, 0x0a, 0x04, 0x4e, 0x65, 0x78, 0x74, 0x12, 0x12, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4e, 0x65, 0x78, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x13, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4e, 0x65, 0x78, 0x74, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x2a, 0x0a, 0x04, 0x49, 0x6e, 0x69, 0x74, + 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, + 0x74, 0x79, 0x22, 0x00, 0x12, 0x2a, 0x0a, 0x04, 0x57, 0x61, 0x69, 0x74, 0x12, 0x12, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x57, 0x61, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, - 0x12, 0x2a, 0x0a, 0x04, 0x57, 0x61, 0x69, 0x74, 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x57, 0x61, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x2a, 0x0a, 0x04, - 0x44, 0x6f, 0x6e, 0x65, 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x44, 0x6f, 0x6e, - 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x2e, 0x0a, 0x06, 0x45, 0x78, 0x74, 0x65, - 0x6e, 0x64, 0x12, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x78, 0x74, 0x65, 0x6e, - 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x2e, 0x0a, 0x06, 0x55, 0x70, 0x64, 0x61, - 0x74, 0x65, 0x12, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, - 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x28, 0x0a, 0x03, 0x4c, 0x6f, 0x67, 0x12, - 0x11, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, - 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x0d, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x41, 0x67, - 0x65, 0x6e, 0x74, 0x12, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x67, 0x69, - 0x73, 0x74, 0x65, 0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, - 0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, - 0x12, 0x2f, 0x0a, 0x0f, 0x55, 0x6e, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x41, 0x67, - 0x65, 0x6e, 0x74, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, - 0x79, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, - 0x00, 0x12, 0x3a, 0x0a, 0x0c, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x48, 0x65, 0x61, 0x6c, 0x74, - 0x68, 0x12, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, - 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x32, 0x43, 0x0a, - 0x0e, 0x57, 0x6f, 0x6f, 0x64, 0x70, 0x65, 0x63, 0x6b, 0x65, 0x72, 0x41, 0x75, 0x74, 0x68, 0x12, - 0x31, 0x0a, 0x04, 0x41, 0x75, 0x74, 0x68, 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, - 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x00, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x6f, 0x2e, 0x77, 0x6f, 0x6f, 0x64, 0x70, 0x65, 0x63, - 0x6b, 0x65, 0x72, 0x2d, 0x63, 0x69, 0x2e, 0x6f, 0x72, 0x67, 0x2f, 0x77, 0x6f, 0x6f, 0x64, 0x70, - 0x65, 0x63, 0x6b, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, - 0x65, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x12, 0x2a, 0x0a, 0x04, 0x44, 0x6f, 0x6e, 0x65, 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2e, 0x44, 0x6f, 0x6e, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x2e, 0x0a, 0x06, + 0x45, 0x78, 0x74, 0x65, 0x6e, 0x64, 0x12, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, + 0x78, 0x74, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x2e, 0x0a, 0x06, + 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x55, + 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x28, 0x0a, 0x03, + 0x4c, 0x6f, 0x67, 0x12, 0x11, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4c, 0x6f, 0x67, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, + 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x0d, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, + 0x65, 0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x67, + 0x69, 0x73, 0x74, 0x65, 0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x12, 0x2f, 0x0a, 0x0f, 0x55, 0x6e, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, + 0x65, 0x72, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3a, 0x0a, 0x0c, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x48, + 0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, + 0x70, 0x6f, 0x72, 0x74, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x0c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, + 0x00, 0x32, 0x43, 0x0a, 0x0e, 0x57, 0x6f, 0x6f, 0x64, 0x70, 0x65, 0x63, 0x6b, 0x65, 0x72, 0x41, + 0x75, 0x74, 0x68, 0x12, 0x31, 0x0a, 0x04, 0x41, 0x75, 0x74, 0x68, 0x12, 0x12, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x13, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x6f, 0x2e, 0x77, 0x6f, 0x6f, + 0x64, 0x70, 0x65, 0x63, 0x6b, 0x65, 0x72, 0x2d, 0x63, 0x69, 0x2e, 0x6f, 0x72, 0x67, 0x2f, 0x77, + 0x6f, 0x6f, 0x64, 0x70, 0x65, 0x63, 0x6b, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f, 0x70, 0x69, 0x70, + 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1341,7 +1341,7 @@ var file_woodpecker_proto_depIdxs = []int32{ 1, // 2: proto.InitRequest.state:type_name -> proto.WorkflowState 1, // 3: proto.DoneRequest.state:type_name -> proto.WorkflowState 0, // 4: proto.UpdateRequest.state:type_name -> proto.StepState - 2, // 5: proto.LogRequest.logEntry:type_name -> proto.LogEntry + 2, // 5: proto.LogRequest.logEntries:type_name -> proto.LogEntry 4, // 6: proto.NextResponse.workflow:type_name -> proto.Workflow 12, // 7: proto.Woodpecker.Version:input_type -> proto.Empty 5, // 8: proto.Woodpecker.Next:input_type -> proto.NextRequest diff --git a/pipeline/rpc/proto/woodpecker.proto b/pipeline/rpc/proto/woodpecker.proto index 8fc577586..ea6c987b9 100644 --- a/pipeline/rpc/proto/woodpecker.proto +++ b/pipeline/rpc/proto/woodpecker.proto @@ -106,7 +106,7 @@ message UpdateRequest { } message LogRequest { - LogEntry logEntry = 1; + repeated LogEntry logEntries = 1; } message Empty { diff --git a/server/api/stream.go b/server/api/stream.go index 2d3c7ee86..a620dd068 100644 --- a/server/api/stream.go +++ b/server/api/stream.go @@ -28,12 +28,19 @@ import ( "github.com/rs/zerolog/log" "go.woodpecker-ci.org/woodpecker/v2/server" + "go.woodpecker-ci.org/woodpecker/v2/server/logging" "go.woodpecker-ci.org/woodpecker/v2/server/model" "go.woodpecker-ci.org/woodpecker/v2/server/pubsub" "go.woodpecker-ci.org/woodpecker/v2/server/router/middleware/session" "go.woodpecker-ci.org/woodpecker/v2/server/store" ) +const ( + // How many batches of logs to keep for each client before starting to + // drop them if the client is not consuming them faster than they arrive. + maxQueuedBatchesPerClient int = 30 +) + // EventStreamSSE // // @Summary Stream events like pipeline updates @@ -213,17 +220,32 @@ func LogStreamSSE(c *gin.Context) { } go func() { - err := server.Config.Services.Logs.Tail(ctx, step.ID, func(entries ...*model.LogEntry) { - for _, entry := range entries { - select { - case <-ctx.Done(): - return - default: - ee, _ := json.Marshal(entry) - logChan <- ee + batches := make(logging.LogChan, maxQueuedBatchesPerClient) + + go func() { + defer func() { + if r := recover(); r != nil { + log.Error().Msgf("error sending log message: %v", r) + } + }() + + for entries := range batches { + for _, entry := range entries { + select { + case <-ctx.Done(): + return + default: + if ee, err := json.Marshal(entry); err == nil { + logChan <- ee + } else { + log.Error().Err(err).Msg("unable to serialize log entry") + } + } } } - }) + }() + + err := server.Config.Services.Logs.Tail(ctx, step.ID, batches) if err != nil { log.Error().Err(err).Msg("tail of logs failed") } diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index 718d82709..934d87469 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -336,27 +336,12 @@ func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowSt } // Log writes a log entry to the database and publishes it to the pubsub. -func (s *RPC) Log(c context.Context, rpcLogEntry *rpc.LogEntry) error { - // convert rpc log_entry to model.log_entry - step, err := s.store.StepByUUID(rpcLogEntry.StepUUID) +// An explicit stepUUID makes it obvious that all entries must come from the same step. +func (s *RPC) Log(c context.Context, stepUUID string, rpcLogEntries []*rpc.LogEntry) error { + step, err := s.store.StepByUUID(stepUUID) if err != nil { - return fmt.Errorf("could not find step with uuid %s in store: %w", rpcLogEntry.StepUUID, err) + return fmt.Errorf("could not find step with uuid %s in store: %w", stepUUID, err) } - logEntry := &model.LogEntry{ - StepID: step.ID, - Time: rpcLogEntry.Time, - Line: rpcLogEntry.Line, - Data: rpcLogEntry.Data, - Type: model.LogEntryType(rpcLogEntry.Type), - } - - // make sure writes to pubsub are non blocking (https://github.com/woodpecker-ci/woodpecker/blob/c919f32e0b6432a95e1a6d3d0ad662f591adf73f/server/logging/log.go#L9) - go func() { - // write line to listening web clients - if err := s.logger.Write(c, logEntry.StepID, logEntry); err != nil { - log.Error().Err(err).Msgf("rpc server could not write to logger") - } - }() agent, err := s.getAgentFromContext(c) if err != nil { @@ -368,7 +353,34 @@ func (s *RPC) Log(c context.Context, rpcLogEntry *rpc.LogEntry) error { return err } - return server.Config.Services.LogStore.LogAppend(logEntry) + var logEntries []*model.LogEntry + + for _, rpcLogEntry := range rpcLogEntries { + if rpcLogEntry.StepUUID != stepUUID { + return fmt.Errorf("expected step UUID %s, got %s", stepUUID, rpcLogEntry.StepUUID) + } + logEntries = append(logEntries, &model.LogEntry{ + StepID: step.ID, + Time: rpcLogEntry.Time, + Line: rpcLogEntry.Line, + Data: rpcLogEntry.Data, + Type: model.LogEntryType(rpcLogEntry.Type), + }) + } + + // make sure writes to pubsub are non blocking (https://github.com/woodpecker-ci/woodpecker/blob/c919f32e0b6432a95e1a6d3d0ad662f591adf73f/server/logging/log.go#L9) + go func() { + // write line to listening web clients + if err := s.logger.Write(c, step.ID, logEntries); err != nil { + log.Error().Err(err).Msgf("rpc server could not write to logger") + } + }() + + if err = server.Config.Services.LogStore.LogAppend(step, logEntries); err != nil { + log.Error().Err(err).Msg("could not store log entries") + } + + return nil } func (s *RPC) RegisterAgent(ctx context.Context, platform, backend, version string, capacity int32) (int64, error) { diff --git a/server/grpc/server.go b/server/grpc/server.go index d6882c0f8..633a7ee99 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/client_golang/prometheus" prometheus_auto "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/rs/zerolog/log" "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc" "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/proto" @@ -133,15 +134,39 @@ func (s *WoodpeckerServer) Extend(c context.Context, req *proto.ExtendRequest) ( } func (s *WoodpeckerServer) Log(c context.Context, req *proto.LogRequest) (*proto.Empty, error) { - logEntry := &rpc.LogEntry{ - Data: req.GetLogEntry().GetData(), - Line: int(req.GetLogEntry().GetLine()), - Time: req.GetLogEntry().GetTime(), - StepUUID: req.GetLogEntry().GetStepUuid(), - Type: int(req.GetLogEntry().GetType()), + var ( + entries []*rpc.LogEntry + stepUUID string + ) + + write := func() error { + if len(entries) > 0 { + if err := s.peer.Log(c, stepUUID, entries); err != nil { + log.Error().Err(err).Msg("could not write log entries") + return err + } + } + return nil } + + for _, reqEntry := range req.GetLogEntries() { + entry := &rpc.LogEntry{ + Data: reqEntry.GetData(), + Line: int(reqEntry.GetLine()), + Time: reqEntry.GetTime(), + StepUUID: reqEntry.GetStepUuid(), + Type: int(reqEntry.GetType()), + } + if entry.StepUUID != stepUUID { + _ = write() + stepUUID = entry.StepUUID + entries = entries[:0] + } + entries = append(entries, entry) + } + res := new(proto.Empty) - err := s.peer.Log(c, logEntry) + err := write() return res, err } diff --git a/server/logging/log.go b/server/logging/log.go index 1db5475e5..0e713644f 100644 --- a/server/logging/log.go +++ b/server/logging/log.go @@ -18,6 +18,8 @@ import ( "context" "sync" + logger "github.com/rs/zerolog/log" + "go.woodpecker-ci.org/woodpecker/v2/server/model" ) @@ -38,7 +40,7 @@ import ( // sub.start()... event loop type subscriber struct { - handler Handler + receiver LogChan } type stream struct { @@ -77,7 +79,7 @@ func (l *log) Open(_ context.Context, stepID int64) error { return nil } -func (l *log) Write(ctx context.Context, stepID int64, logEntry *model.LogEntry) error { +func (l *log) Write(ctx context.Context, stepID int64, entries []*model.LogEntry) error { l.Lock() s, ok := l.streams[stepID] l.Unlock() @@ -92,15 +94,20 @@ func (l *log) Write(ctx context.Context, stepID int64, logEntry *model.LogEntry) } s.Lock() - s.list = append(s.list, logEntry) + s.list = append(s.list, entries...) for sub := range s.subs { - go sub.handler(logEntry) + select { + case sub.receiver <- entries: + default: + logger.Info().Msgf("subscriber channel is full -- dropping logs for step %d", stepID) + } } s.Unlock() + return nil } -func (l *log) Tail(c context.Context, stepID int64, handler Handler) error { +func (l *log) Tail(c context.Context, stepID int64, receiver LogChan) error { l.Lock() s, ok := l.streams[stepID] l.Unlock() @@ -109,11 +116,11 @@ func (l *log) Tail(c context.Context, stepID int64, handler Handler) error { } sub := &subscriber{ - handler: handler, + receiver: receiver, } s.Lock() if len(s.list) != 0 { - sub.handler(s.list...) + sub.receiver <- s.list } s.subs[sub] = struct{}{} s.Unlock() diff --git a/server/logging/log_test.go b/server/logging/log_test.go index 90565a48f..f84a5b8f3 100644 --- a/server/logging/log_test.go +++ b/server/logging/log_test.go @@ -39,28 +39,37 @@ func TestLogging(t *testing.T) { context.Background(), ) + receiver := make(LogChan, 10) + defer close(receiver) + + go func() { + for range receiver { + wg.Done() + } + }() + logger := New() assert.NoError(t, logger.Open(ctx, testStepID)) go func() { - assert.NoError(t, logger.Tail(ctx, testStepID, func(_ ...*model.LogEntry) { wg.Done() })) + assert.NoError(t, logger.Tail(ctx, testStepID, receiver)) }() go func() { - assert.NoError(t, logger.Tail(ctx, testStepID, func(_ ...*model.LogEntry) { wg.Done() })) + assert.NoError(t, logger.Tail(ctx, testStepID, receiver)) }() <-time.After(500 * time.Millisecond) wg.Add(4) go func() { - assert.NoError(t, logger.Write(ctx, testStepID, testEntry)) - assert.NoError(t, logger.Write(ctx, testStepID, testEntry)) + assert.NoError(t, logger.Write(ctx, testStepID, []*model.LogEntry{testEntry})) + assert.NoError(t, logger.Write(ctx, testStepID, []*model.LogEntry{testEntry})) }() wg.Wait() wg.Add(1) go func() { - assert.NoError(t, logger.Tail(ctx, testStepID, func(_ ...*model.LogEntry) { wg.Done() })) + assert.NoError(t, logger.Tail(ctx, testStepID, receiver)) }() <-time.After(500 * time.Millisecond) diff --git a/server/logging/logging.go b/server/logging/logging.go index 400273def..e8eb80117 100644 --- a/server/logging/logging.go +++ b/server/logging/logging.go @@ -24,8 +24,8 @@ import ( // ErrNotFound is returned when the log does not exist. var ErrNotFound = errors.New("stream: not found") -// Handler defines a callback function for handling log entries. -type Handler func(...*model.LogEntry) +// LogChan defines a channel type for receiving ordered batches of log entries. +type LogChan chan []*model.LogEntry // Log defines a log multiplexer. type Log interface { @@ -33,10 +33,10 @@ type Log interface { Open(c context.Context, stepID int64) error // Write writes the entry to the log. - Write(c context.Context, stepID int64, entry *model.LogEntry) error + Write(c context.Context, stepID int64, entries []*model.LogEntry) error // Tail tails the log. - Tail(c context.Context, stepID int64, handler Handler) error + Tail(c context.Context, stepID int64, handler LogChan) error // Close closes the log. Close(c context.Context, stepID int64) error diff --git a/server/services/log/file/file.go b/server/services/log/file/file.go index 551a05d89..f19931beb 100644 --- a/server/services/log/file/file.go +++ b/server/services/log/file/file.go @@ -8,6 +8,8 @@ import ( "path/filepath" "strings" + logger "github.com/rs/zerolog/log" + "go.woodpecker-ci.org/woodpecker/v2/pipeline" "go.woodpecker-ci.org/woodpecker/v2/server/model" "go.woodpecker-ci.org/woodpecker/v2/server/services/log" @@ -70,19 +72,30 @@ func (l logStore) LogFind(step *model.Step) ([]*model.LogEntry, error) { return entries, nil } -func (l logStore) LogAppend(logEntry *model.LogEntry) error { - file, err := os.OpenFile(l.filePath(logEntry.StepID), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600) +func (l logStore) LogAppend(step *model.Step, logEntries []*model.LogEntry) error { + path := l.filePath(step.ID) + + file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600) if err != nil { + logger.Error().Err(err).Msgf("could not open log file %s", path) return err } - jsonData, err := json.Marshal(logEntry) - if err != nil { - return err + + var bytes []byte + + for _, logEntry := range logEntries { + if jsonLine, err := json.Marshal(logEntry); err == nil { + bytes = append(bytes, jsonLine...) + bytes = append(bytes, byte('\n')) + } else { + logger.Error().Err(err).Msg("could not convert log entry to JSON") + } } - _, err = file.Write(append(jsonData, byte('\n'))) - if err != nil { - return err + + if _, err = file.Write(bytes); err != nil { + logger.Error().Err(err).Msg("could not write out log entries") } + return file.Close() } diff --git a/server/services/log/service.go b/server/services/log/service.go index 5cc53d1e9..da8f98146 100644 --- a/server/services/log/service.go +++ b/server/services/log/service.go @@ -4,6 +4,6 @@ import "go.woodpecker-ci.org/woodpecker/v2/server/model" type Service interface { LogFind(step *model.Step) ([]*model.LogEntry, error) - LogAppend(logEntry *model.LogEntry) error + LogAppend(step *model.Step, logEntries []*model.LogEntry) error LogDelete(step *model.Step) error } diff --git a/server/store/datastore/log.go b/server/store/datastore/log.go index 68c708c63..3aedf4650 100644 --- a/server/store/datastore/log.go +++ b/server/store/datastore/log.go @@ -15,16 +15,33 @@ package datastore import ( + "github.com/rs/zerolog/log" + "go.woodpecker-ci.org/woodpecker/v2/server/model" ) +// Maximum number of records to store in one PostgreSQL statement. +// Too large a value results in `pq: got XX parameters but PostgreSQL only supports 65535 parameters`. +const pgBatchSize = 1000 + func (s storage) LogFind(step *model.Step) ([]*model.LogEntry, error) { var logEntries []*model.LogEntry return logEntries, s.engine.Asc("id").Where("step_id = ?", step.ID).Find(&logEntries) } -func (s storage) LogAppend(logEntry *model.LogEntry) error { - _, err := s.engine.Insert(logEntry) +func (s storage) LogAppend(_ *model.Step, logEntries []*model.LogEntry) error { + var err error + + // TODO: adapted from slices.Chunk(); switch to it in Go 1.23+ + for i := 0; i < len(logEntries); i += pgBatchSize { + end := min(pgBatchSize, len(logEntries[i:])) + chunk := logEntries[i : i+end] + + if _, err = s.engine.Insert(chunk); err != nil { + log.Error().Err(err).Msg("could not store log entries to db") + } + } + return err } diff --git a/server/store/datastore/log_test.go b/server/store/datastore/log_test.go index f2ee1ffc6..94897268c 100644 --- a/server/store/datastore/log_test.go +++ b/server/store/datastore/log_test.go @@ -45,9 +45,7 @@ func TestLogCreateFindDelete(t *testing.T) { }, } - for _, logEntry := range logEntries { - assert.NoError(t, store.LogAppend(logEntry)) - } + assert.NoError(t, store.LogAppend(&step, logEntries)) // we want to find our inserted logs _logEntries, err := store.LogFind(&step) @@ -83,9 +81,7 @@ func TestLogAppend(t *testing.T) { }, } - for _, logEntry := range logEntries { - assert.NoError(t, store.LogAppend(logEntry)) - } + assert.NoError(t, store.LogAppend(&step, logEntries)) logEntry := &model.LogEntry{ StepID: step.ID, @@ -94,7 +90,7 @@ func TestLogAppend(t *testing.T) { Time: 20, } - assert.NoError(t, store.LogAppend(logEntry)) + assert.NoError(t, store.LogAppend(&step, []*model.LogEntry{logEntry})) _logEntries, err := store.LogFind(&step) assert.NoError(t, err) diff --git a/server/store/mocks/store.go b/server/store/mocks/store.go index 55d7257aa..df88e436d 100644 --- a/server/store/mocks/store.go +++ b/server/store/mocks/store.go @@ -1340,17 +1340,17 @@ func (_m *Store) HasRedirectionForRepo(_a0 int64, _a1 string) (bool, error) { return r0, r1 } -// LogAppend provides a mock function with given fields: logEntry -func (_m *Store) LogAppend(logEntry *model.LogEntry) error { - ret := _m.Called(logEntry) +// LogAppend provides a mock function with given fields: _a0, _a1 +func (_m *Store) LogAppend(_a0 *model.Step, _a1 []*model.LogEntry) error { + ret := _m.Called(_a0, _a1) if len(ret) == 0 { panic("no return value specified for LogAppend") } var r0 error - if rf, ok := ret.Get(0).(func(*model.LogEntry) error); ok { - r0 = rf(logEntry) + if rf, ok := ret.Get(0).(func(*model.Step, []*model.LogEntry) error); ok { + r0 = rf(_a0, _a1) } else { r0 = ret.Error(0) } diff --git a/server/store/store.go b/server/store/store.go index ff408f075..f1e958994 100644 --- a/server/store/store.go +++ b/server/store/store.go @@ -143,7 +143,7 @@ type Store interface { // Logs LogFind(*model.Step) ([]*model.LogEntry, error) - LogAppend(logEntry *model.LogEntry) error + LogAppend(*model.Step, []*model.LogEntry) error LogDelete(*model.Step) error // Tasks