2022-10-18 01:24:12 +00:00
// Copyright 2022 Woodpecker Authors
2021-06-28 17:28:18 +00:00
// Copyright 2021 Informatyka Boguslawski sp. z o.o. sp.k., http://www.ib.pl/
2022-10-18 01:24:12 +00:00
// Copyright 2018 Drone.IO Inc.
2018-03-21 13:02:17 +00:00
//
2018-02-19 22:24:10 +00:00
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
2018-03-21 13:02:17 +00:00
//
2018-02-19 22:24:10 +00:00
// http://www.apache.org/licenses/LICENSE-2.0
2018-03-21 13:02:17 +00:00
//
2018-02-19 22:24:10 +00:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-06-28 17:28:18 +00:00
//
// This file has been modified by Informatyka Boguslawski sp. z o.o. sp.k.
2018-02-19 22:24:10 +00:00
2021-09-22 18:48:01 +00:00
package grpc
2017-03-05 07:56:08 +00:00
import (
"context"
"encoding/json"
2023-01-28 13:13:04 +00:00
"errors"
2017-04-12 17:56:30 +00:00
"fmt"
2017-03-05 07:56:08 +00:00
"strconv"
2023-01-28 13:13:04 +00:00
"time"
2017-03-05 07:56:08 +00:00
2021-06-22 10:34:35 +00:00
"github.com/prometheus/client_golang/prometheus"
2023-04-30 01:40:13 +00:00
"github.com/rs/zerolog/log"
2021-10-12 07:25:13 +00:00
grpcMetadata "google.golang.org/grpc/metadata"
2021-09-24 11:18:34 +00:00
"github.com/woodpecker-ci/woodpecker/pipeline/rpc"
2022-11-04 23:35:06 +00:00
"github.com/woodpecker-ci/woodpecker/server/forge"
2021-09-23 20:29:09 +00:00
"github.com/woodpecker-ci/woodpecker/server/logging"
2021-10-12 07:25:13 +00:00
"github.com/woodpecker-ci/woodpecker/server/model"
2022-11-06 11:44:04 +00:00
"github.com/woodpecker-ci/woodpecker/server/pipeline"
2021-09-23 20:29:09 +00:00
"github.com/woodpecker-ci/woodpecker/server/pubsub"
"github.com/woodpecker-ci/woodpecker/server/queue"
2021-09-23 11:33:59 +00:00
"github.com/woodpecker-ci/woodpecker/server/store"
2017-03-05 07:56:08 +00:00
)
type RPC struct {
2022-11-04 23:35:06 +00:00
forge forge . Forge
2022-10-18 01:24:12 +00:00
queue queue . Queue
2023-10-13 05:34:33 +00:00
pubsub * pubsub . Publisher
2022-10-18 01:24:12 +00:00
logger logging . Log
store store . Store
pipelineTime * prometheus . GaugeVec
pipelineCount * prometheus . CounterVec
2017-03-05 07:56:08 +00:00
}
// Next implements the rpc.Next function
2023-08-21 16:30:19 +00:00
func ( s * RPC ) Next ( c context . Context , agentFilter rpc . Filter ) ( * rpc . Workflow , error ) {
2021-09-22 18:48:01 +00:00
metadata , ok := grpcMetadata . FromIncomingContext ( c )
2017-07-20 16:21:15 +00:00
if ok {
hostname , ok := metadata [ "hostname" ]
if ok && len ( hostname ) != 0 {
2021-10-12 07:25:13 +00:00
log . Debug ( ) . Msgf ( "agent connected: %s: polling" , hostname [ 0 ] )
2017-07-20 16:21:15 +00:00
}
}
2022-05-30 23:12:18 +00:00
fn , err := createFilterFunc ( agentFilter )
2017-09-08 23:45:17 +00:00
if err != nil {
return nil , err
2017-03-12 08:46:59 +00:00
}
2019-06-16 13:56:32 +00:00
for {
2023-01-30 19:18:48 +00:00
agent , err := s . getAgentFromContext ( c )
if err != nil {
return nil , err
} else if agent . NoSchedule {
return nil , nil
}
2023-03-21 13:10:43 +00:00
task , err := s . queue . Poll ( c , agent . ID , fn )
2019-06-16 13:56:32 +00:00
if err != nil {
return nil , err
} else if task == nil {
return nil , nil
}
if task . ShouldRun ( ) {
2023-08-21 16:30:19 +00:00
workflow := new ( rpc . Workflow )
err = json . Unmarshal ( task . Data , workflow )
return workflow , err
2021-12-01 13:22:06 +00:00
}
2023-01-30 19:18:48 +00:00
2021-12-01 13:22:06 +00:00
if err := s . Done ( c , task . ID , rpc . State { } ) ; err != nil {
log . Error ( ) . Err ( err ) . Msgf ( "mark task '%s' done failed" , task . ID )
2019-06-16 13:56:32 +00:00
}
}
2017-03-05 07:56:08 +00:00
}
2017-03-05 11:05:16 +00:00
// Wait implements the rpc.Wait function
func ( s * RPC ) Wait ( c context . Context , id string ) error {
return s . queue . Wait ( c , id )
2017-03-05 07:56:08 +00:00
}
// Extend implements the rpc.Extend function.
//
// The call is forwarded unchanged to the queue's Extend for the task with the
// given id, and the queue's result is returned as-is.
func (s *RPC) Extend(c context.Context, id string) error {
	return s.queue.Extend(c, id)
}
// Update implements the rpc.Update function
2023-10-13 05:34:33 +00:00
func ( s * RPC ) Update ( _ context . Context , id string , state rpc . State ) error {
2023-06-27 16:01:18 +00:00
workflowID , err := strconv . ParseInt ( id , 10 , 64 )
2017-03-05 07:56:08 +00:00
if err != nil {
return err
}
2023-06-27 16:01:18 +00:00
workflow , err := s . store . WorkflowLoad ( workflowID )
2017-03-05 07:56:08 +00:00
if err != nil {
2023-06-27 16:01:18 +00:00
log . Error ( ) . Msgf ( "error: rpc.update: cannot find workflow with id %d: %s" , workflowID , err )
2017-03-05 07:56:08 +00:00
return err
}
2023-06-27 16:01:18 +00:00
currentPipeline , err := s . store . GetPipeline ( workflow . PipelineID )
2017-03-05 07:56:08 +00:00
if err != nil {
2023-06-27 16:01:18 +00:00
log . Error ( ) . Msgf ( "error: cannot find pipeline with id %d: %s" , workflow . PipelineID , err )
2017-04-03 09:34:37 +00:00
return err
}
2023-06-27 16:01:18 +00:00
step , err := s . store . StepChild ( currentPipeline , workflow . PID , state . Step )
2017-04-03 09:34:37 +00:00
if err != nil {
2022-10-28 15:38:53 +00:00
log . Error ( ) . Msgf ( "error: cannot find step with name %s: %s" , state . Step , err )
2017-03-05 07:56:08 +00:00
return err
}
2022-11-06 11:44:04 +00:00
repo , err := s . store . GetRepo ( currentPipeline . RepoID )
2017-03-05 07:56:08 +00:00
if err != nil {
2022-11-06 11:44:04 +00:00
log . Error ( ) . Msgf ( "error: cannot find repo with id %d: %s" , currentPipeline . RepoID , err )
2017-03-05 07:56:08 +00:00
return err
}
2023-06-27 16:01:18 +00:00
if err := pipeline . UpdateStepStatus ( s . store , step , state , currentPipeline . Started ) ; err != nil {
2022-10-28 15:38:53 +00:00
log . Error ( ) . Err ( err ) . Msg ( "rpc.update: cannot update step" )
2017-04-01 11:17:04 +00:00
}
2023-06-27 16:01:18 +00:00
if currentPipeline . Workflows , err = s . store . WorkflowGetTree ( currentPipeline ) ; err != nil {
2022-10-28 15:38:53 +00:00
log . Error ( ) . Err ( err ) . Msg ( "can not build tree from step list" )
2021-12-08 22:36:23 +00:00
return err
}
2017-04-01 11:17:04 +00:00
message := pubsub . Message {
Labels : map [ string ] string {
"repo" : repo . FullName ,
2021-11-22 11:55:13 +00:00
"private" : strconv . FormatBool ( repo . IsSCMPrivate ) ,
2017-04-01 11:17:04 +00:00
} ,
}
message . Data , _ = json . Marshal ( model . Event {
2022-10-18 01:24:12 +00:00
Repo : * repo ,
2022-11-06 11:44:04 +00:00
Pipeline : * currentPipeline ,
2017-04-01 11:17:04 +00:00
} )
2023-10-13 05:34:33 +00:00
s . pubsub . Publish ( message )
2017-03-05 07:56:08 +00:00
2017-04-01 11:17:04 +00:00
return nil
}
// Init implements the rpc.Init function
func ( s * RPC ) Init ( c context . Context , id string , state rpc . State ) error {
2022-10-28 15:38:53 +00:00
stepID , err := strconv . ParseInt ( id , 10 , 64 )
2017-04-01 11:17:04 +00:00
if err != nil {
return err
}
2023-06-27 16:01:18 +00:00
workflow , err := s . store . WorkflowLoad ( stepID )
2017-04-01 11:17:04 +00:00
if err != nil {
2022-10-28 15:38:53 +00:00
log . Error ( ) . Msgf ( "error: cannot find step with id %d: %s" , stepID , err )
2017-04-01 11:17:04 +00:00
return err
}
2023-03-21 13:10:43 +00:00
agent , err := s . getAgentFromContext ( c )
if err != nil {
return err
2017-07-19 21:46:03 +00:00
}
2023-06-27 16:01:18 +00:00
workflow . AgentID = agent . ID
2017-04-01 11:17:04 +00:00
2023-06-27 16:01:18 +00:00
currentPipeline , err := s . store . GetPipeline ( workflow . PipelineID )
2017-04-01 11:17:04 +00:00
if err != nil {
2023-06-27 16:01:18 +00:00
log . Error ( ) . Msgf ( "error: cannot find pipeline with id %d: %s" , workflow . PipelineID , err )
2017-04-01 11:17:04 +00:00
return err
}
2022-11-06 11:44:04 +00:00
repo , err := s . store . GetRepo ( currentPipeline . RepoID )
2017-04-01 11:17:04 +00:00
if err != nil {
2022-11-06 11:44:04 +00:00
log . Error ( ) . Msgf ( "error: cannot find repo with id %d: %s" , currentPipeline . RepoID , err )
2017-04-01 11:17:04 +00:00
return err
}
2017-03-05 07:56:08 +00:00
2022-11-06 11:44:04 +00:00
if currentPipeline . Status == model . StatusPending {
if currentPipeline , err = pipeline . UpdateToStatusRunning ( s . store , * currentPipeline , state . Started ) ; err != nil {
log . Error ( ) . Msgf ( "error: init: cannot update build_id %d state: %s" , currentPipeline . ID , err )
2017-04-01 11:17:04 +00:00
}
2017-03-05 07:56:08 +00:00
}
2023-06-27 16:01:18 +00:00
s . updateForgeStatus ( c , repo , currentPipeline , workflow )
2023-05-14 12:18:43 +00:00
2017-04-01 11:17:04 +00:00
defer func ( ) {
2023-06-27 16:01:18 +00:00
currentPipeline . Workflows , _ = s . store . WorkflowGetTree ( currentPipeline )
2017-04-01 11:17:04 +00:00
message := pubsub . Message {
Labels : map [ string ] string {
"repo" : repo . FullName ,
2021-11-22 11:55:13 +00:00
"private" : strconv . FormatBool ( repo . IsSCMPrivate ) ,
2017-04-01 11:17:04 +00:00
} ,
}
message . Data , _ = json . Marshal ( model . Event {
2022-10-18 01:24:12 +00:00
Repo : * repo ,
2022-11-06 11:44:04 +00:00
Pipeline : * currentPipeline ,
2017-04-01 11:17:04 +00:00
} )
2023-10-13 05:34:33 +00:00
s . pubsub . Publish ( message )
2017-04-01 11:17:04 +00:00
} ( )
2017-03-05 07:56:08 +00:00
2023-06-27 16:01:18 +00:00
workflow , err = pipeline . UpdateWorkflowToStatusStarted ( s . store , * workflow , state )
2023-05-14 12:18:43 +00:00
if err != nil {
return err
}
2023-06-27 16:01:18 +00:00
s . updateForgeStatus ( c , repo , currentPipeline , workflow )
2023-05-14 12:18:43 +00:00
return nil
2017-04-01 11:17:04 +00:00
}
// Done implements the rpc.Done function
func ( s * RPC ) Done ( c context . Context , id string , state rpc . State ) error {
2022-11-23 14:35:24 +00:00
workflowID , err := strconv . ParseInt ( id , 10 , 64 )
2017-04-01 11:17:04 +00:00
if err != nil {
return err
}
2017-03-05 07:56:08 +00:00
2023-06-27 16:01:18 +00:00
workflow , err := s . store . WorkflowLoad ( workflowID )
2017-04-01 11:17:04 +00:00
if err != nil {
2023-06-06 07:52:08 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "cannot find step with id %d" , workflowID )
2017-04-01 11:17:04 +00:00
return err
}
2023-06-27 16:01:18 +00:00
workflow . Children , err = s . store . StepListFromWorkflowFind ( workflow )
if err != nil {
return err
}
2022-11-23 14:35:24 +00:00
currentPipeline , err := s . store . GetPipeline ( workflow . PipelineID )
2017-04-01 11:17:04 +00:00
if err != nil {
2023-06-06 07:52:08 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "cannot find pipeline with id %d" , workflow . PipelineID )
2017-04-01 11:17:04 +00:00
return err
}
2022-11-06 11:44:04 +00:00
repo , err := s . store . GetRepo ( currentPipeline . RepoID )
2017-04-01 11:17:04 +00:00
if err != nil {
2023-06-06 07:52:08 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "cannot find repo with id %d" , currentPipeline . RepoID )
2017-04-01 11:17:04 +00:00
return err
}
2023-06-06 07:52:08 +00:00
logger := log . With ( ) .
2022-01-31 14:38:39 +00:00
Str ( "repo_id" , fmt . Sprint ( repo . ID ) ) .
2023-06-06 07:52:08 +00:00
Str ( "pipeline_id" , fmt . Sprint ( currentPipeline . ID ) ) .
Str ( "workflow_id" , id ) . Logger ( )
logger . Trace ( ) . Msgf ( "gRPC Done with state: %#v" , state )
2022-01-31 14:38:39 +00:00
2023-06-27 16:01:18 +00:00
if workflow , err = pipeline . UpdateWorkflowStatusToDone ( s . store , * workflow , state ) ; err != nil {
2023-06-06 07:52:08 +00:00
logger . Error ( ) . Err ( err ) . Msgf ( "pipeline.UpdateStepStatusToDone: cannot update workflow state: %s" , err )
2019-09-15 05:29:45 +00:00
}
2019-06-16 08:54:31 +00:00
2019-06-16 13:56:32 +00:00
var queueErr error
2022-11-23 14:35:24 +00:00
if workflow . Failing ( ) {
2023-06-06 07:52:08 +00:00
queueErr = s . queue . Error ( c , id , fmt . Errorf ( "Step finished with exit code %d, %s" , state . ExitCode , state . Error ) )
2019-06-16 13:56:32 +00:00
} else {
2022-11-23 14:35:24 +00:00
queueErr = s . queue . Done ( c , id , workflow . State )
2019-06-16 13:56:32 +00:00
}
if queueErr != nil {
2023-06-06 07:52:08 +00:00
logger . Error ( ) . Err ( queueErr ) . Msg ( "queue.Done: cannot ack workflow" )
2019-06-16 08:54:31 +00:00
}
2023-06-27 16:01:18 +00:00
currentPipeline . Workflows , err = s . store . WorkflowGetTree ( currentPipeline )
2021-12-08 22:36:23 +00:00
if err != nil {
return err
}
2023-06-27 16:01:18 +00:00
s . completeChildrenIfParentCompleted ( workflow )
2019-06-16 08:54:31 +00:00
2023-06-27 16:01:18 +00:00
if ! model . IsThereRunningStage ( currentPipeline . Workflows ) {
if currentPipeline , err = pipeline . UpdateStatusToDone ( s . store , * currentPipeline , model . PipelineStatus ( currentPipeline . Workflows ) , workflow . Stopped ) ; err != nil {
2023-06-06 07:52:08 +00:00
logger . Error ( ) . Err ( err ) . Msgf ( "pipeline.UpdateStatusToDone: cannot update workflow final state" )
2019-06-16 08:54:31 +00:00
}
2019-06-17 08:48:40 +00:00
}
2022-11-23 14:35:24 +00:00
s . updateForgeStatus ( c , repo , currentPipeline , workflow )
2019-06-16 08:54:31 +00:00
2023-06-06 07:52:08 +00:00
// make sure writes to pubsub are non blocking (https://github.com/woodpecker-ci/woodpecker/blob/c919f32e0b6432a95e1a6d3d0ad662f591adf73f/server/logging/log.go#L9)
go func ( ) {
2023-06-27 16:01:18 +00:00
for _ , wf := range currentPipeline . Workflows {
for _ , step := range wf . Children {
if err := s . logger . Close ( c , step . ID ) ; err != nil {
logger . Error ( ) . Err ( err ) . Msgf ( "done: cannot close log stream for step %d" , step . ID )
}
2023-06-06 07:52:08 +00:00
}
}
} ( )
2019-06-16 08:54:31 +00:00
2023-10-13 05:34:33 +00:00
if err := s . notify ( repo , currentPipeline ) ; err != nil {
2021-12-08 22:36:23 +00:00
return err
}
2019-06-16 08:54:31 +00:00
2022-11-06 11:44:04 +00:00
if currentPipeline . Status == model . StatusSuccess || currentPipeline . Status == model . StatusFailure {
s . pipelineCount . WithLabelValues ( repo . FullName , currentPipeline . Branch , string ( currentPipeline . Status ) , "total" ) . Inc ( )
s . pipelineTime . WithLabelValues ( repo . FullName , currentPipeline . Branch , string ( currentPipeline . Status ) , "total" ) . Set ( float64 ( currentPipeline . Finished - currentPipeline . Started ) )
2019-06-28 12:23:52 +00:00
}
2023-06-27 16:01:18 +00:00
if currentPipeline . IsMultiPipeline ( ) {
2022-11-23 14:35:24 +00:00
s . pipelineTime . WithLabelValues ( repo . FullName , currentPipeline . Branch , string ( workflow . State ) , workflow . Name ) . Set ( float64 ( workflow . Stopped - workflow . Started ) )
2019-06-28 12:23:52 +00:00
}
2019-06-16 08:54:31 +00:00
return nil
}
// Log implements the rpc.Log function
2023-06-06 07:52:08 +00:00
func ( s * RPC ) Log ( c context . Context , _logEntry * rpc . LogEntry ) error {
// convert rpc log_entry to model.log_entry
step , err := s . store . StepByUUID ( _logEntry . StepUUID )
if err != nil {
return fmt . Errorf ( "could not find step with uuid %s in store: %w" , _logEntry . StepUUID , err )
}
logEntry := & model . LogEntry {
StepID : step . ID ,
Time : _logEntry . Time ,
Line : _logEntry . Line ,
Data : [ ] byte ( _logEntry . Data ) ,
Type : model . LogEntryType ( _logEntry . Type ) ,
}
// make sure writes to pubsub are non blocking (https://github.com/woodpecker-ci/woodpecker/blob/c919f32e0b6432a95e1a6d3d0ad662f591adf73f/server/logging/log.go#L9)
go func ( ) {
// write line to listening web clients
if err := s . logger . Write ( c , logEntry . StepID , logEntry ) ; err != nil {
log . Error ( ) . Err ( err ) . Msgf ( "rpc server could not write to logger" )
}
} ( )
// make line persistent in database
return s . store . LogAppend ( logEntry )
2019-06-16 08:54:31 +00:00
}
2023-01-28 13:13:04 +00:00
// RegisterAgent stores the platform, backend, version and capacity reported by
// the calling agent on its agent record and returns the agent's ID. On any
// failure it returns -1 together with the error.
func (s *RPC) RegisterAgent(ctx context.Context, platform, backend, version string, capacity int32) (int64, error) {
	agent, err := s.getAgentFromContext(ctx)
	if err != nil {
		return -1, err
	}

	agent.Platform, agent.Backend = platform, backend
	agent.Version, agent.Capacity = version, capacity

	if updateErr := s.store.AgentUpdate(agent); updateErr != nil {
		return -1, updateErr
	}
	return agent.ID, nil
}
// ReportHealth records the current time as the calling agent's last contact.
// The agent is resolved first; afterwards status must be exactly the expected
// heartbeat text, otherwise an error is returned and nothing is stored.
func (s *RPC) ReportHealth(ctx context.Context, status string) error {
	const heartbeat = "I am alive!"

	agent, err := s.getAgentFromContext(ctx)
	switch {
	case err != nil:
		return err
	case status != heartbeat:
		return errors.New("Are you alive?")
	}

	agent.LastContact = time.Now().Unix()
	return s.store.AgentUpdate(agent)
}
2023-06-27 16:01:18 +00:00
func ( s * RPC ) completeChildrenIfParentCompleted ( completedWorkflow * model . Workflow ) {
for _ , c := range completedWorkflow . Children {
if c . Running ( ) {
if _ , err := pipeline . UpdateStepToStatusSkipped ( s . store , * c , completedWorkflow . Stopped ) ; err != nil {
log . Error ( ) . Msgf ( "error: done: cannot update step_id %d child state: %s" , c . ID , err )
2017-04-01 11:17:04 +00:00
}
2017-03-05 07:56:08 +00:00
}
2017-04-04 09:30:06 +00:00
}
2019-06-16 08:54:31 +00:00
}
2017-04-04 09:30:06 +00:00
2023-06-27 16:01:18 +00:00
func ( s * RPC ) updateForgeStatus ( ctx context . Context , repo * model . Repo , pipeline * model . Pipeline , workflow * model . Workflow ) {
2019-06-16 08:54:31 +00:00
user , err := s . store . GetUser ( repo . UserID )
2021-12-28 16:02:49 +00:00
if err != nil {
log . Error ( ) . Err ( err ) . Msgf ( "can not get user with id '%d'" , repo . UserID )
return
}
2023-10-08 12:05:06 +00:00
forge . Refresh ( ctx , s . forge , s . store , user )
2021-12-28 16:02:49 +00:00
2022-10-28 15:38:53 +00:00
// only do status updates for parent steps
2023-06-27 16:01:18 +00:00
if workflow != nil {
err = s . forge . Status ( ctx , user , repo , pipeline , workflow )
2019-06-16 08:54:31 +00:00
if err != nil {
2022-10-18 01:24:12 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "error setting commit status for %s/%d" , repo . FullName , pipeline . Number )
2019-06-16 08:54:31 +00:00
}
2017-03-05 07:56:08 +00:00
}
2019-06-16 08:54:31 +00:00
}
2017-03-05 07:56:08 +00:00
2023-10-13 05:34:33 +00:00
func ( s * RPC ) notify ( repo * model . Repo , pipeline * model . Pipeline ) ( err error ) {
2017-04-01 11:17:04 +00:00
message := pubsub . Message {
Labels : map [ string ] string {
"repo" : repo . FullName ,
2021-11-22 11:55:13 +00:00
"private" : strconv . FormatBool ( repo . IsSCMPrivate ) ,
2017-04-01 11:17:04 +00:00
} ,
}
2017-03-05 07:56:08 +00:00
message . Data , _ = json . Marshal ( model . Event {
2022-10-18 01:24:12 +00:00
Repo : * repo ,
Pipeline : * pipeline ,
2017-03-05 07:56:08 +00:00
} )
2023-10-13 05:34:33 +00:00
s . pubsub . Publish ( message )
2021-12-08 22:36:23 +00:00
return nil
2017-03-05 07:56:08 +00:00
}
2023-01-28 13:13:04 +00:00
func ( s * RPC ) getAgentFromContext ( ctx context . Context ) ( * model . Agent , error ) {
2023-09-01 15:02:21 +00:00
md , ok := grpcMetadata . FromIncomingContext ( ctx )
2023-01-28 13:13:04 +00:00
if ! ok {
return nil , errors . New ( "metadata is not provided" )
}
values := md [ "agent_id" ]
if len ( values ) == 0 {
return nil , errors . New ( "agent_id is not provided" )
}
_agentID := values [ 0 ]
agentID , err := strconv . ParseInt ( _agentID , 10 , 64 )
if err != nil {
return nil , errors . New ( "agent_id is not a valid integer" )
}
return s . store . AgentFind ( agentID )
}