2022-10-18 01:24:12 +00:00
// Copyright 2022 Woodpecker Authors
2021-06-28 17:28:18 +00:00
// Copyright 2021 Informatyka Boguslawski sp. z o.o. sp.k., http://www.ib.pl/
2022-10-18 01:24:12 +00:00
// Copyright 2018 Drone.IO Inc.
2018-03-21 13:02:17 +00:00
//
2018-02-19 22:24:10 +00:00
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
2018-03-21 13:02:17 +00:00
//
2018-02-19 22:24:10 +00:00
// http://www.apache.org/licenses/LICENSE-2.0
2018-03-21 13:02:17 +00:00
//
2018-02-19 22:24:10 +00:00
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-09-22 18:48:01 +00:00
package grpc
2017-03-05 07:56:08 +00:00
import (
"context"
"encoding/json"
2023-01-28 13:13:04 +00:00
"errors"
2017-04-12 17:56:30 +00:00
"fmt"
2017-03-05 07:56:08 +00:00
"strconv"
2023-01-28 13:13:04 +00:00
"time"
2017-03-05 07:56:08 +00:00
2021-06-22 10:34:35 +00:00
"github.com/prometheus/client_golang/prometheus"
2023-04-30 01:40:13 +00:00
"github.com/rs/zerolog/log"
2021-10-12 07:25:13 +00:00
grpcMetadata "google.golang.org/grpc/metadata"
2024-12-22 09:44:34 +00:00
"go.woodpecker-ci.org/woodpecker/v3/pipeline/rpc"
"go.woodpecker-ci.org/woodpecker/v3/server"
"go.woodpecker-ci.org/woodpecker/v3/server/forge"
"go.woodpecker-ci.org/woodpecker/v3/server/logging"
"go.woodpecker-ci.org/woodpecker/v3/server/model"
"go.woodpecker-ci.org/woodpecker/v3/server/pipeline"
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub"
"go.woodpecker-ci.org/woodpecker/v3/server/queue"
"go.woodpecker-ci.org/woodpecker/v3/server/store"
2017-03-05 07:56:08 +00:00
)
2024-09-10 12:52:50 +00:00
// updateAgentLastWorkDelay is the minimum time that has to pass since an
// agent's recorded LastWork before updateAgentLastWork persists a new value.
const updateAgentLastWorkDelay = time.Minute
2017-03-05 07:56:08 +00:00
type RPC struct {
2022-10-18 01:24:12 +00:00
queue queue . Queue
2023-10-13 05:34:33 +00:00
pubsub * pubsub . Publisher
2022-10-18 01:24:12 +00:00
logger logging . Log
store store . Store
pipelineTime * prometheus . GaugeVec
pipelineCount * prometheus . CounterVec
2017-03-05 07:56:08 +00:00
}
2024-07-01 09:20:55 +00:00
// Next blocks until it provides the next workflow to execute.
2023-08-21 16:30:19 +00:00
func ( s * RPC ) Next ( c context . Context , agentFilter rpc . Filter ) ( * rpc . Workflow , error ) {
2023-12-19 13:17:36 +00:00
if hostname , err := s . getHostnameFromContext ( c ) ; err == nil {
log . Debug ( ) . Msgf ( "agent connected: %s: polling" , hostname )
2017-07-20 16:21:15 +00:00
}
2024-02-16 09:04:13 +00:00
agent , err := s . getAgentFromContext ( c )
if err != nil {
return nil , err
}
if agent . NoSchedule {
time . Sleep ( 1 * time . Second )
return nil , nil
}
2023-01-30 19:18:48 +00:00
2024-09-30 11:33:16 +00:00
agentServerLabels , err := agent . GetServerLabels ( )
if err != nil {
return nil , err
}
// enforce labels from server by overwriting agent labels
for k , v := range agentServerLabels {
agentFilter . Labels [ k ] = v
}
log . Trace ( ) . Msgf ( "Agent %s[%d] tries to pull task with labels: %v" , agent . Name , agent . ID , agentFilter . Labels )
filterFn := createFilterFunc ( agentFilter )
2024-02-16 09:04:13 +00:00
for {
// poll blocks until a task is available or the context is canceled / worker is kicked
task , err := s . queue . Poll ( c , agent . ID , filterFn )
if err != nil || task == nil {
2019-06-16 13:56:32 +00:00
return nil , err
}
if task . ShouldRun ( ) {
2023-08-21 16:30:19 +00:00
workflow := new ( rpc . Workflow )
err = json . Unmarshal ( task . Data , workflow )
return workflow , err
2021-12-01 13:22:06 +00:00
}
2023-01-30 19:18:48 +00:00
2024-02-16 09:04:13 +00:00
// task should not run, so mark it as done
2024-07-01 09:20:55 +00:00
if err := s . Done ( c , task . ID , rpc . WorkflowState { } ) ; err != nil {
log . Error ( ) . Err ( err ) . Msgf ( "marking workflow task '%s' as done failed" , task . ID )
2019-06-16 13:56:32 +00:00
}
}
2017-03-05 07:56:08 +00:00
}
2024-07-01 09:20:55 +00:00
// Wait blocks until the workflow with the given ID is done.
func ( s * RPC ) Wait ( c context . Context , workflowID string ) error {
2024-09-30 11:33:16 +00:00
agent , err := s . getAgentFromContext ( c )
if err != nil {
return err
}
if err := s . checkAgentPermissionByWorkflow ( c , agent , workflowID , nil , nil ) ; err != nil {
return err
}
2024-07-01 09:20:55 +00:00
return s . queue . Wait ( c , workflowID )
2017-03-05 07:56:08 +00:00
}
2024-07-01 09:20:55 +00:00
// Extend extends the lease for the workflow with the given ID.
func ( s * RPC ) Extend ( c context . Context , workflowID string ) error {
2024-07-01 17:34:47 +00:00
agent , err := s . getAgentFromContext ( c )
if err != nil {
return err
}
2024-08-14 19:53:35 +00:00
err = s . updateAgentLastWork ( agent )
2024-07-01 17:34:47 +00:00
if err != nil {
return err
}
2024-09-30 11:33:16 +00:00
if err := s . checkAgentPermissionByWorkflow ( c , agent , workflowID , nil , nil ) ; err != nil {
return err
}
return s . queue . Extend ( c , agent . ID , workflowID )
2017-03-05 07:56:08 +00:00
}
2024-07-01 09:20:55 +00:00
// Update updates the state of a step.
2024-09-30 11:33:16 +00:00
func ( s * RPC ) Update ( c context . Context , strWorkflowID string , state rpc . StepState ) error {
2024-07-01 09:20:55 +00:00
workflowID , err := strconv . ParseInt ( strWorkflowID , 10 , 64 )
2017-03-05 07:56:08 +00:00
if err != nil {
return err
}
2023-06-27 16:01:18 +00:00
workflow , err := s . store . WorkflowLoad ( workflowID )
2017-03-05 07:56:08 +00:00
if err != nil {
2024-01-09 14:39:09 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "rpc.update: cannot find workflow with id %d" , workflowID )
2017-03-05 07:56:08 +00:00
return err
}
2023-06-27 16:01:18 +00:00
currentPipeline , err := s . store . GetPipeline ( workflow . PipelineID )
2017-03-05 07:56:08 +00:00
if err != nil {
2024-01-09 14:39:09 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "cannot find pipeline with id %d" , workflow . PipelineID )
2017-04-03 09:34:37 +00:00
return err
}
2024-09-30 11:33:16 +00:00
agent , err := s . getAgentFromContext ( c )
if err != nil {
return err
}
2024-01-09 14:39:09 +00:00
step , err := s . store . StepByUUID ( state . StepUUID )
2017-04-03 09:34:37 +00:00
if err != nil {
2024-01-09 14:39:09 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "cannot find step with uuid %s" , state . StepUUID )
2017-03-05 07:56:08 +00:00
return err
}
2024-01-09 14:39:09 +00:00
if step . PipelineID != currentPipeline . ID {
msg := fmt . Sprintf ( "agent returned status with step uuid '%s' which does not belong to current pipeline" , state . StepUUID )
log . Error ( ) .
Int64 ( "stepPipelineID" , step . PipelineID ) .
Int64 ( "currentPipelineID" , currentPipeline . ID ) .
Msg ( msg )
2024-08-14 19:37:05 +00:00
return errors . New ( msg )
2024-01-09 14:39:09 +00:00
}
2022-11-06 11:44:04 +00:00
repo , err := s . store . GetRepo ( currentPipeline . RepoID )
2017-03-05 07:56:08 +00:00
if err != nil {
2024-01-09 14:39:09 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "cannot find repo with id %d" , currentPipeline . RepoID )
2017-03-05 07:56:08 +00:00
return err
}
2024-09-30 11:33:16 +00:00
// check before agent can alter some state
if err := s . checkAgentPermissionByWorkflow ( c , agent , strWorkflowID , currentPipeline , repo ) ; err != nil {
return err
}
2024-01-09 17:34:55 +00:00
if err := pipeline . UpdateStepStatus ( s . store , step , state ) ; err != nil {
2022-10-28 15:38:53 +00:00
log . Error ( ) . Err ( err ) . Msg ( "rpc.update: cannot update step" )
2017-04-01 11:17:04 +00:00
}
2023-06-27 16:01:18 +00:00
if currentPipeline . Workflows , err = s . store . WorkflowGetTree ( currentPipeline ) ; err != nil {
2024-01-10 21:56:42 +00:00
log . Error ( ) . Err ( err ) . Msg ( "cannot build tree from step list" )
2021-12-08 22:36:23 +00:00
return err
}
2017-04-01 11:17:04 +00:00
message := pubsub . Message {
Labels : map [ string ] string {
"repo" : repo . FullName ,
2021-11-22 11:55:13 +00:00
"private" : strconv . FormatBool ( repo . IsSCMPrivate ) ,
2017-04-01 11:17:04 +00:00
} ,
}
2024-01-09 20:35:37 +00:00
message . Data , err = json . Marshal ( model . Event {
2022-10-18 01:24:12 +00:00
Repo : * repo ,
2022-11-06 11:44:04 +00:00
Pipeline : * currentPipeline ,
2017-04-01 11:17:04 +00:00
} )
2024-01-09 20:35:37 +00:00
if err != nil {
return err
}
2023-10-13 05:34:33 +00:00
s . pubsub . Publish ( message )
2017-03-05 07:56:08 +00:00
2017-04-01 11:17:04 +00:00
return nil
}
2024-05-13 20:58:21 +00:00
// Init implements the rpc.Init function.
2024-07-01 09:20:55 +00:00
func ( s * RPC ) Init ( c context . Context , strWorkflowID string , state rpc . WorkflowState ) error {
workflowID , err := strconv . ParseInt ( strWorkflowID , 10 , 64 )
2017-04-01 11:17:04 +00:00
if err != nil {
return err
}
2024-07-01 09:20:55 +00:00
workflow , err := s . store . WorkflowLoad ( workflowID )
2017-04-01 11:17:04 +00:00
if err != nil {
2024-07-01 09:20:55 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "cannot find workflow with id %d" , workflowID )
2017-04-01 11:17:04 +00:00
return err
}
2023-03-21 13:10:43 +00:00
agent , err := s . getAgentFromContext ( c )
if err != nil {
return err
2017-07-19 21:46:03 +00:00
}
2024-09-30 11:33:16 +00:00
2023-06-27 16:01:18 +00:00
workflow . AgentID = agent . ID
2017-04-01 11:17:04 +00:00
2023-06-27 16:01:18 +00:00
currentPipeline , err := s . store . GetPipeline ( workflow . PipelineID )
2017-04-01 11:17:04 +00:00
if err != nil {
2024-01-11 18:17:07 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "cannot find pipeline with id %d" , workflow . PipelineID )
2017-04-01 11:17:04 +00:00
return err
}
2022-11-06 11:44:04 +00:00
repo , err := s . store . GetRepo ( currentPipeline . RepoID )
2017-04-01 11:17:04 +00:00
if err != nil {
2024-01-11 18:17:07 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "cannot find repo with id %d" , currentPipeline . RepoID )
2017-04-01 11:17:04 +00:00
return err
}
2017-03-05 07:56:08 +00:00
2024-09-30 11:33:16 +00:00
// check before agent can alter some state
if err := s . checkAgentPermissionByWorkflow ( c , agent , strWorkflowID , currentPipeline , repo ) ; err != nil {
return err
}
2022-11-06 11:44:04 +00:00
if currentPipeline . Status == model . StatusPending {
if currentPipeline , err = pipeline . UpdateToStatusRunning ( s . store , * currentPipeline , state . Started ) ; err != nil {
2024-07-01 09:20:55 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "init: cannot update pipeline %d state" , currentPipeline . ID )
2017-04-01 11:17:04 +00:00
}
2017-03-05 07:56:08 +00:00
}
2023-06-27 16:01:18 +00:00
s . updateForgeStatus ( c , repo , currentPipeline , workflow )
2023-05-14 12:18:43 +00:00
2017-04-01 11:17:04 +00:00
defer func ( ) {
2023-06-27 16:01:18 +00:00
currentPipeline . Workflows , _ = s . store . WorkflowGetTree ( currentPipeline )
2017-04-01 11:17:04 +00:00
message := pubsub . Message {
Labels : map [ string ] string {
"repo" : repo . FullName ,
2021-11-22 11:55:13 +00:00
"private" : strconv . FormatBool ( repo . IsSCMPrivate ) ,
2017-04-01 11:17:04 +00:00
} ,
}
2024-01-09 20:35:37 +00:00
message . Data , err = json . Marshal ( model . Event {
2022-10-18 01:24:12 +00:00
Repo : * repo ,
2022-11-06 11:44:04 +00:00
Pipeline : * currentPipeline ,
2017-04-01 11:17:04 +00:00
} )
2024-01-09 20:35:37 +00:00
if err != nil {
2024-01-10 19:57:12 +00:00
log . Error ( ) . Err ( err ) . Msg ( "could not marshal JSON" )
2024-01-09 20:35:37 +00:00
return
}
2023-10-13 05:34:33 +00:00
s . pubsub . Publish ( message )
2017-04-01 11:17:04 +00:00
} ( )
2017-03-05 07:56:08 +00:00
2024-07-01 09:20:55 +00:00
workflow , err = pipeline . UpdateWorkflowStatusToRunning ( s . store , * workflow , state )
2023-05-14 12:18:43 +00:00
if err != nil {
return err
}
2023-06-27 16:01:18 +00:00
s . updateForgeStatus ( c , repo , currentPipeline , workflow )
2024-07-01 09:20:55 +00:00
2024-08-14 19:53:35 +00:00
return s . updateAgentLastWork ( agent )
2017-04-01 11:17:04 +00:00
}
2024-07-01 09:20:55 +00:00
// Done marks the workflow with the given ID as done.
func ( s * RPC ) Done ( c context . Context , strWorkflowID string , state rpc . WorkflowState ) error {
workflowID , err := strconv . ParseInt ( strWorkflowID , 10 , 64 )
2017-04-01 11:17:04 +00:00
if err != nil {
return err
}
2017-03-05 07:56:08 +00:00
2023-06-27 16:01:18 +00:00
workflow , err := s . store . WorkflowLoad ( workflowID )
2017-04-01 11:17:04 +00:00
if err != nil {
2024-07-01 09:20:55 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "cannot find workflow with id %d" , workflowID )
2017-04-01 11:17:04 +00:00
return err
}
2023-06-27 16:01:18 +00:00
workflow . Children , err = s . store . StepListFromWorkflowFind ( workflow )
if err != nil {
return err
}
2022-11-23 14:35:24 +00:00
currentPipeline , err := s . store . GetPipeline ( workflow . PipelineID )
2017-04-01 11:17:04 +00:00
if err != nil {
2023-06-06 07:52:08 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "cannot find pipeline with id %d" , workflow . PipelineID )
2017-04-01 11:17:04 +00:00
return err
}
2022-11-06 11:44:04 +00:00
repo , err := s . store . GetRepo ( currentPipeline . RepoID )
2017-04-01 11:17:04 +00:00
if err != nil {
2023-06-06 07:52:08 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "cannot find repo with id %d" , currentPipeline . RepoID )
2017-04-01 11:17:04 +00:00
return err
}
2024-09-30 11:33:16 +00:00
agent , err := s . getAgentFromContext ( c )
if err != nil {
return err
}
// check before agent can alter some state
if err := s . checkAgentPermissionByWorkflow ( c , agent , strWorkflowID , currentPipeline , repo ) ; err != nil {
return err
}
2023-06-06 07:52:08 +00:00
logger := log . With ( ) .
2022-01-31 14:38:39 +00:00
Str ( "repo_id" , fmt . Sprint ( repo . ID ) ) .
2023-06-06 07:52:08 +00:00
Str ( "pipeline_id" , fmt . Sprint ( currentPipeline . ID ) ) .
2024-07-01 09:20:55 +00:00
Str ( "workflow_id" , strWorkflowID ) . Logger ( )
2023-06-06 07:52:08 +00:00
logger . Trace ( ) . Msgf ( "gRPC Done with state: %#v" , state )
2022-01-31 14:38:39 +00:00
2023-06-27 16:01:18 +00:00
if workflow , err = pipeline . UpdateWorkflowStatusToDone ( s . store , * workflow , state ) ; err != nil {
2024-07-01 09:20:55 +00:00
logger . Error ( ) . Err ( err ) . Msgf ( "pipeline.UpdateWorkflowStatusToDone: cannot update workflow state: %s" , err )
2019-09-15 05:29:45 +00:00
}
2019-06-16 08:54:31 +00:00
2019-06-16 13:56:32 +00:00
var queueErr error
2022-11-23 14:35:24 +00:00
if workflow . Failing ( ) {
2024-07-01 09:20:55 +00:00
queueErr = s . queue . Error ( c , strWorkflowID , fmt . Errorf ( "workflow finished with error %s" , state . Error ) )
2019-06-16 13:56:32 +00:00
} else {
2024-07-01 09:20:55 +00:00
queueErr = s . queue . Done ( c , strWorkflowID , workflow . State )
2019-06-16 13:56:32 +00:00
}
if queueErr != nil {
2023-06-06 07:52:08 +00:00
logger . Error ( ) . Err ( queueErr ) . Msg ( "queue.Done: cannot ack workflow" )
2019-06-16 08:54:31 +00:00
}
2023-06-27 16:01:18 +00:00
currentPipeline . Workflows , err = s . store . WorkflowGetTree ( currentPipeline )
2021-12-08 22:36:23 +00:00
if err != nil {
return err
}
2023-06-27 16:01:18 +00:00
s . completeChildrenIfParentCompleted ( workflow )
2019-06-16 08:54:31 +00:00
2023-06-27 16:01:18 +00:00
if ! model . IsThereRunningStage ( currentPipeline . Workflows ) {
2024-07-01 09:20:55 +00:00
if currentPipeline , err = pipeline . UpdateStatusToDone ( s . store , * currentPipeline , model . PipelineStatus ( currentPipeline . Workflows ) , workflow . Finished ) ; err != nil {
logger . Error ( ) . Err ( err ) . Msgf ( "pipeline.UpdateStatusToDone: cannot update workflows final state" )
2019-06-16 08:54:31 +00:00
}
2019-06-17 08:48:40 +00:00
}
2022-11-23 14:35:24 +00:00
s . updateForgeStatus ( c , repo , currentPipeline , workflow )
2019-06-16 08:54:31 +00:00
2023-06-06 07:52:08 +00:00
// make sure writes to pubsub are non blocking (https://github.com/woodpecker-ci/woodpecker/blob/c919f32e0b6432a95e1a6d3d0ad662f591adf73f/server/logging/log.go#L9)
go func ( ) {
2023-10-24 08:44:36 +00:00
for _ , step := range workflow . Children {
if err := s . logger . Close ( c , step . ID ) ; err != nil {
logger . Error ( ) . Err ( err ) . Msgf ( "done: cannot close log stream for step %d" , step . ID )
2023-06-06 07:52:08 +00:00
}
}
} ( )
2019-06-16 08:54:31 +00:00
2023-10-13 05:34:33 +00:00
if err := s . notify ( repo , currentPipeline ) ; err != nil {
2021-12-08 22:36:23 +00:00
return err
}
2019-06-16 08:54:31 +00:00
2022-11-06 11:44:04 +00:00
if currentPipeline . Status == model . StatusSuccess || currentPipeline . Status == model . StatusFailure {
s . pipelineCount . WithLabelValues ( repo . FullName , currentPipeline . Branch , string ( currentPipeline . Status ) , "total" ) . Inc ( )
s . pipelineTime . WithLabelValues ( repo . FullName , currentPipeline . Branch , string ( currentPipeline . Status ) , "total" ) . Set ( float64 ( currentPipeline . Finished - currentPipeline . Started ) )
2019-06-28 12:23:52 +00:00
}
2023-06-27 16:01:18 +00:00
if currentPipeline . IsMultiPipeline ( ) {
2024-07-01 09:20:55 +00:00
s . pipelineTime . WithLabelValues ( repo . FullName , currentPipeline . Branch , string ( workflow . State ) , workflow . Name ) . Set ( float64 ( workflow . Finished - workflow . Started ) )
2019-06-28 12:23:52 +00:00
}
2024-08-14 19:53:35 +00:00
return s . updateAgentLastWork ( agent )
2019-06-16 08:54:31 +00:00
}
2024-07-01 09:20:55 +00:00
// Log writes a log entry to the database and publishes it to the pubsub.
2024-09-18 14:29:56 +00:00
// An explicit stepUUID makes it obvious that all entries must come from the same step.
func ( s * RPC ) Log ( c context . Context , stepUUID string , rpcLogEntries [ ] * rpc . LogEntry ) error {
step , err := s . store . StepByUUID ( stepUUID )
2023-06-06 07:52:08 +00:00
if err != nil {
2024-09-18 14:29:56 +00:00
return fmt . Errorf ( "could not find step with uuid %s in store: %w" , stepUUID , err )
2023-06-06 07:52:08 +00:00
}
2024-07-01 17:34:47 +00:00
agent , err := s . getAgentFromContext ( c )
if err != nil {
return err
}
2024-08-14 19:53:35 +00:00
2024-09-30 11:33:16 +00:00
currentPipeline , err := s . store . GetPipeline ( step . PipelineID )
if err != nil {
log . Error ( ) . Err ( err ) . Msgf ( "cannot find pipeline with id %d" , step . PipelineID )
return err
}
// check before agent can alter some state
if err := s . checkAgentPermissionByWorkflow ( c , agent , "" , currentPipeline , nil ) ; err != nil {
return err
}
2024-08-14 19:53:35 +00:00
err = s . updateAgentLastWork ( agent )
if err != nil {
2024-07-01 17:34:47 +00:00
return err
}
2024-09-18 14:29:56 +00:00
var logEntries [ ] * model . LogEntry
for _ , rpcLogEntry := range rpcLogEntries {
if rpcLogEntry . StepUUID != stepUUID {
return fmt . Errorf ( "expected step UUID %s, got %s" , stepUUID , rpcLogEntry . StepUUID )
}
logEntries = append ( logEntries , & model . LogEntry {
StepID : step . ID ,
Time : rpcLogEntry . Time ,
Line : rpcLogEntry . Line ,
Data : rpcLogEntry . Data ,
Type : model . LogEntryType ( rpcLogEntry . Type ) ,
} )
}
// make sure writes to pubsub are non blocking (https://github.com/woodpecker-ci/woodpecker/blob/c919f32e0b6432a95e1a6d3d0ad662f591adf73f/server/logging/log.go#L9)
go func ( ) {
// write line to listening web clients
if err := s . logger . Write ( c , step . ID , logEntries ) ; err != nil {
log . Error ( ) . Err ( err ) . Msgf ( "rpc server could not write to logger" )
}
} ( )
if err = server . Config . Services . LogStore . LogAppend ( step , logEntries ) ; err != nil {
log . Error ( ) . Err ( err ) . Msg ( "could not store log entries" )
}
return nil
2019-06-16 08:54:31 +00:00
}
2024-10-06 15:13:41 +00:00
func ( s * RPC ) RegisterAgent ( ctx context . Context , info rpc . AgentInfo ) ( int64 , error ) {
2023-01-28 13:13:04 +00:00
agent , err := s . getAgentFromContext ( ctx )
if err != nil {
return - 1 , err
}
2023-12-19 13:17:36 +00:00
if agent . Name == "" {
if hostname , err := s . getHostnameFromContext ( ctx ) ; err == nil {
agent . Name = hostname
}
}
2024-10-06 15:13:41 +00:00
agent . Backend = info . Backend
agent . Platform = info . Platform
agent . Capacity = int32 ( info . Capacity )
agent . Version = info . Version
agent . CustomLabels = info . CustomLabels
2023-01-28 13:13:04 +00:00
err = s . store . AgentUpdate ( agent )
if err != nil {
return - 1 , err
}
return agent . ID , nil
}
2024-07-01 09:20:55 +00:00
// UnregisterAgent removes the agent from the database.
2023-11-01 23:53:47 +00:00
func ( s * RPC ) UnregisterAgent ( ctx context . Context ) error {
agent , err := s . getAgentFromContext ( ctx )
2023-12-24 11:14:30 +00:00
if ! agent . IsSystemAgent ( ) {
2023-11-24 17:19:38 +00:00
// registered with individual agent token -> do not unregister
return nil
}
2024-07-01 09:20:55 +00:00
log . Debug ( ) . Msgf ( "un-registering agent with ID %d" , agent . ID )
2023-11-01 23:53:47 +00:00
if err != nil {
return err
}
err = s . store . AgentDelete ( agent )
return err
}
2023-01-28 13:13:04 +00:00
func ( s * RPC ) ReportHealth ( ctx context . Context , status string ) error {
agent , err := s . getAgentFromContext ( ctx )
if err != nil {
return err
}
if status != "I am alive!" {
2024-01-10 21:56:42 +00:00
//nolint:stylecheck
2023-01-28 13:13:04 +00:00
return errors . New ( "Are you alive?" )
}
agent . LastContact = time . Now ( ) . Unix ( )
return s . store . AgentUpdate ( agent )
}
2024-09-30 11:33:16 +00:00
// checkAgentPermissionByWorkflow returns nil if agent may access the repo a
// workflow belongs to, and an error otherwise.
//
// The repo is taken from the repo argument if set. Otherwise it is loaded via
// pipeline.RepoID; if pipeline is nil as well, the pipeline is first resolved
// from the workflow identified by strWorkflowID (a decimal ID).
func (s *RPC) checkAgentPermissionByWorkflow(_ context.Context, agent *model.Agent, strWorkflowID string, pipeline *model.Pipeline, repo *model.Repo) error {
	if pipeline == nil && repo == nil {
		// nothing supplied by the caller: go workflow -> pipeline
		id, err := strconv.ParseInt(strWorkflowID, 10, 64)
		if err != nil {
			return err
		}

		wf, err := s.store.WorkflowLoad(id)
		if err != nil {
			log.Error().Err(err).Msgf("cannot find workflow with id %d", id)
			return err
		}

		loadedPipeline, err := s.store.GetPipeline(wf.PipelineID)
		if err != nil {
			log.Error().Err(err).Msgf("cannot find pipeline with id %d", wf.PipelineID)
			return err
		}
		pipeline = loadedPipeline
	}

	if repo == nil {
		loadedRepo, err := s.store.GetRepo(pipeline.RepoID)
		if err != nil {
			log.Error().Err(err).Msgf("cannot find repo with id %d", pipeline.RepoID)
			return err
		}
		repo = loadedRepo
	}

	if !agent.CanAccessRepo(repo) {
		msg := fmt.Sprintf("agent '%d' is not allowed to interact with repo[%d] '%s'", agent.ID, repo.ID, repo.FullName)
		log.Error().Int64("repoId", repo.ID).Msg(msg)
		return errors.New(msg)
	}

	return nil
}
2023-06-27 16:01:18 +00:00
func ( s * RPC ) completeChildrenIfParentCompleted ( completedWorkflow * model . Workflow ) {
for _ , c := range completedWorkflow . Children {
if c . Running ( ) {
2024-07-01 09:20:55 +00:00
if _ , err := pipeline . UpdateStepToStatusSkipped ( s . store , * c , completedWorkflow . Finished ) ; err != nil {
2024-01-11 18:17:07 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "done: cannot update step_id %d child state" , c . ID )
2017-04-01 11:17:04 +00:00
}
2017-03-05 07:56:08 +00:00
}
2017-04-04 09:30:06 +00:00
}
2019-06-16 08:54:31 +00:00
}
2017-04-04 09:30:06 +00:00
2023-06-27 16:01:18 +00:00
func ( s * RPC ) updateForgeStatus ( ctx context . Context , repo * model . Repo , pipeline * model . Pipeline , workflow * model . Workflow ) {
2019-06-16 08:54:31 +00:00
user , err := s . store . GetUser ( repo . UserID )
2021-12-28 16:02:49 +00:00
if err != nil {
2024-01-10 21:56:42 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "cannot get user with id '%d'" , repo . UserID )
2021-12-28 16:02:49 +00:00
return
}
2024-04-16 06:04:55 +00:00
_forge , err := server . Config . Services . Manager . ForgeFromRepo ( repo )
if err != nil {
log . Error ( ) . Err ( err ) . Msgf ( "can not get forge for repo '%s'" , repo . FullName )
return
}
forge . Refresh ( ctx , _forge , s . store , user )
2021-12-28 16:02:49 +00:00
2022-10-28 15:38:53 +00:00
// only do status updates for parent steps
2023-06-27 16:01:18 +00:00
if workflow != nil {
2024-04-16 06:04:55 +00:00
err = _forge . Status ( ctx , user , repo , pipeline , workflow )
2019-06-16 08:54:31 +00:00
if err != nil {
2022-10-18 01:24:12 +00:00
log . Error ( ) . Err ( err ) . Msgf ( "error setting commit status for %s/%d" , repo . FullName , pipeline . Number )
2019-06-16 08:54:31 +00:00
}
2017-03-05 07:56:08 +00:00
}
2019-06-16 08:54:31 +00:00
}
2017-03-05 07:56:08 +00:00
2023-10-13 05:34:33 +00:00
func ( s * RPC ) notify ( repo * model . Repo , pipeline * model . Pipeline ) ( err error ) {
2017-04-01 11:17:04 +00:00
message := pubsub . Message {
Labels : map [ string ] string {
"repo" : repo . FullName ,
2021-11-22 11:55:13 +00:00
"private" : strconv . FormatBool ( repo . IsSCMPrivate ) ,
2017-04-01 11:17:04 +00:00
} ,
}
2024-01-09 20:35:37 +00:00
message . Data , err = json . Marshal ( model . Event {
2022-10-18 01:24:12 +00:00
Repo : * repo ,
Pipeline : * pipeline ,
2017-03-05 07:56:08 +00:00
} )
2024-01-09 20:35:37 +00:00
if err != nil {
return err
}
2023-10-13 05:34:33 +00:00
s . pubsub . Publish ( message )
2021-12-08 22:36:23 +00:00
return nil
2017-03-05 07:56:08 +00:00
}
2023-01-28 13:13:04 +00:00
func ( s * RPC ) getAgentFromContext ( ctx context . Context ) ( * model . Agent , error ) {
2023-09-01 15:02:21 +00:00
md , ok := grpcMetadata . FromIncomingContext ( ctx )
2023-01-28 13:13:04 +00:00
if ! ok {
return nil , errors . New ( "metadata is not provided" )
}
values := md [ "agent_id" ]
if len ( values ) == 0 {
return nil , errors . New ( "agent_id is not provided" )
}
_agentID := values [ 0 ]
agentID , err := strconv . ParseInt ( _agentID , 10 , 64 )
if err != nil {
return nil , errors . New ( "agent_id is not a valid integer" )
}
return s . store . AgentFind ( agentID )
}
2023-12-19 13:17:36 +00:00
// getHostnameFromContext returns the first "hostname" value of the incoming
// gRPC metadata of ctx, or an error if there is no such value.
func (s *RPC) getHostnameFromContext(ctx context.Context) (string, error) {
	md, found := grpcMetadata.FromIncomingContext(ctx)
	if !found {
		return "", errors.New("no hostname in metadata")
	}

	// a missing key yields a nil slice, which has length zero as well
	if names := md["hostname"]; len(names) != 0 {
		return names[0], nil
	}

	return "", errors.New("no hostname in metadata")
}
2024-08-14 19:53:35 +00:00
func ( s * RPC ) updateAgentLastWork ( agent * model . Agent ) error {
2024-09-18 16:26:18 +00:00
// only update agent.LastWork if not recently updated
if time . Unix ( agent . LastWork , 0 ) . Add ( updateAgentLastWorkDelay ) . After ( time . Now ( ) ) {
2024-08-14 19:53:35 +00:00
return nil
}
agent . LastWork = time . Now ( ) . Unix ( )
if err := s . store . AgentUpdate ( agent ) ; err != nil {
return err
}
return nil
}