/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"fmt"
	"strings"
	"sync"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/internal/balancer/gracefulswitch"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/resolver"
)

type ccbMode int

const (
	ccbModeActive = iota
	ccbModeIdle
	ccbModeClosed
	ccbModeExitingIdle
)

// ccBalancerWrapper sits between the ClientConn and the Balancer.
//
// ccBalancerWrapper implements methods corresponding to the ones on the
// balancer.Balancer interface. The ClientConn is free to call these methods
// concurrently and the ccBalancerWrapper ensures that calls from the ClientConn
// to the Balancer happen synchronously and in order.
//
// ccBalancerWrapper also implements the balancer.ClientConn interface and is
// passed to the Balancer implementations. It invokes unexported methods on the
// ClientConn to handle these calls from the Balancer.
//
// It uses the gracefulswitch.Balancer internally to ensure that balancer
// switches happen in a graceful manner.
type ccBalancerWrapper struct {
	// The following fields are initialized when the wrapper is created and are
	// read-only afterwards, and therefore can be accessed without a mutex.
	cc   *ClientConn
	opts balancer.BuildOptions

	// Outgoing (gRPC --> balancer) calls are guaranteed to execute in a
	// mutually exclusive manner as they are scheduled in the serializer. Fields
	// accessed *only* in these serializer callbacks can therefore be accessed
	// without a mutex.
	balancer        *gracefulswitch.Balancer
	curBalancerName string

	// mu guards access to the below fields. Access to the serializer and its
	// cancel function needs to be mutex protected because they are overwritten
	// when the wrapper exits idle mode.
	mu               sync.Mutex
	serializer       *grpcsync.CallbackSerializer // To serialize all outgoing calls.
	serializerCancel context.CancelFunc           // To close the serializer at close/enterIdle time.
	mode             ccbMode                      // Tracks the current mode of the wrapper.
}

// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer
// is not created until the switchTo() method is invoked.
func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
	ctx, cancel := context.WithCancel(context.Background())
	ccb := &ccBalancerWrapper{
		cc:               cc,
		opts:             bopts,
		serializer:       grpcsync.NewCallbackSerializer(ctx),
		serializerCancel: cancel,
	}
	ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
	return ccb
}
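// The scheduling pattern used throughout this file relies on the
// grpcsync.CallbackSerializer contract as exercised below: callbacks run one
// at a time, in the order scheduled, and Schedule reports whether the callback
// was accepted. A minimal usage sketch (illustrative only; the variable names
// are hypothetical):
//
//	ctx, cancel := context.WithCancel(context.Background())
//	s := grpcsync.NewCallbackSerializer(ctx)
//	ok := s.Schedule(func(ctx context.Context) {
//		// Runs on the serializer goroutine, serialized with other callbacks.
//	})
//	cancel()   // Stop accepting new work; Schedule returns false afterwards.
//	<-s.Done() // Closed once already-scheduled callbacks have drained.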
// updateClientConnState is invoked by grpc to push a ClientConnState update to
// the underlying balancer.
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
	ccb.mu.Lock()
	errCh := make(chan error, 1)
	// Here and everywhere else where Schedule() is called, it is done with the
	// lock held. But the lock guards only the scheduling part. The actual
	// callback is called asynchronously without the lock being held.
	ok := ccb.serializer.Schedule(func(_ context.Context) {
		errCh <- ccb.balancer.UpdateClientConnState(*ccs)
	})
	if !ok {
		// If we are unable to schedule a function with the serializer, it
		// indicates that it has been closed. A serializer is only closed when
		// the wrapper is closed or is in idle.
		ccb.mu.Unlock()
		return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer")
	}
	ccb.mu.Unlock()

	// We get here only if the above call to Schedule succeeds, in which case it
	// is guaranteed that the scheduled function will run. Therefore it is safe
	// to block on this channel.
	err := <-errCh
	if logger.V(2) && err != nil {
		logger.Infof("error from balancer.UpdateClientConnState: %v", err)
	}
	return err
}

// updateSubConnState is invoked by grpc to push a subConn state update to the
// underlying balancer.
func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
	ccb.mu.Lock()
	ccb.serializer.Schedule(func(_ context.Context) {
		// Even though it is optional for balancers, gracefulswitch ensures
		// opts.StateListener is set, so this cannot ever be nil.
		sc.(*acBalancerWrapper).stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
	})
	ccb.mu.Unlock()
}

func (ccb *ccBalancerWrapper) resolverError(err error) {
	ccb.mu.Lock()
	ccb.serializer.Schedule(func(_ context.Context) {
		ccb.balancer.ResolverError(err)
	})
	ccb.mu.Unlock()
}

// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
// LB policy identified by name.
//
// ClientConn calls newCCBalancerWrapper() at creation time. Upon receipt of the
// first good update from the name resolver, it determines the LB policy to use
// and invokes the switchTo() method. Upon receipt of every subsequent update
// from the name resolver, it invokes this method.
//
// The ccBalancerWrapper keeps track of the current LB policy name, and skips
// the graceful balancer switching process if the name does not change.
func (ccb *ccBalancerWrapper) switchTo(name string) {
	ccb.mu.Lock()
	ccb.serializer.Schedule(func(_ context.Context) {
		// TODO: Other languages use case-sensitive balancer registries. We should
		// switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
		if strings.EqualFold(ccb.curBalancerName, name) {
			return
		}
		ccb.buildLoadBalancingPolicy(name)
	})
	ccb.mu.Unlock()
}
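// For reference, an LB policy becomes reachable via switchTo by registering a
// builder with the balancer package, typically from an init function. A
// hedged sketch (myBuilder and "my_policy" are hypothetical, not part of this
// file):
//
//	func init() {
//		// myBuilder implements balancer.Builder and returns "my_policy"
//		// from its Name() method.
//		balancer.Register(myBuilder{})
//	}
//
// A service config selecting "my_policy" then results in a call to
// switchTo("my_policy"), and balancer.Get below finds the registered builder.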
// buildLoadBalancingPolicy performs the following:
//   - retrieve a balancer builder for the given name. Use the default LB
//     policy, pick_first, if no LB policy with name is found in the registry.
//   - instruct the gracefulswitch balancer to switch to the above builder. This
//     will actually build the new balancer.
//   - update the `curBalancerName` field
//
// Must be called from a serializer callback.
func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) {
	builder := balancer.Get(name)
	if builder == nil {
		channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
		builder = newPickfirstBuilder()
	} else {
		channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
	}

	if err := ccb.balancer.SwitchTo(builder); err != nil {
		channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
		return
	}
	ccb.curBalancerName = builder.Name()
}

func (ccb *ccBalancerWrapper) close() {
	channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing")
	ccb.closeBalancer(ccbModeClosed)
}

// enterIdleMode is invoked by grpc when the channel enters idle mode upon
// expiry of idle_timeout. This call blocks until the balancer is closed.
func (ccb *ccBalancerWrapper) enterIdleMode() {
	channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode")
	ccb.closeBalancer(ccbModeIdle)
}

// closeBalancer is invoked when the channel is being closed or when it enters
// idle mode upon expiry of idle_timeout.
func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) {
	ccb.mu.Lock()
	if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle {
		ccb.mu.Unlock()
		return
	}

	ccb.mode = m
	done := ccb.serializer.Done()
	b := ccb.balancer
	ok := ccb.serializer.Schedule(func(_ context.Context) {
		// Close the serializer to ensure that no more calls from gRPC are sent
		// to the balancer.
		ccb.serializerCancel()
		// Empty the current balancer name because we don't have a balancer
		// anymore and also so that we act on the next call to switchTo by
		// creating a new balancer specified by the new resolver.
		ccb.curBalancerName = ""
	})
	if !ok {
		ccb.mu.Unlock()
		return
	}
	ccb.mu.Unlock()

	// Give enqueued callbacks a chance to finish before closing the balancer.
	<-done
	b.Close()
}
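// Mode transitions handled by this wrapper, as a sketch:
//
//	ccbModeActive --(enterIdleMode)--> ccbModeIdle --(exitIdleMode)--> ccbModeActive
//	ccbModeActive --(close)--> ccbModeClosed (terminal)
//
// closeBalancer above is a no-op when the wrapper is already idle or closed;
// exitIdleMode below recreates the serializer and, when actually coming out
// of idle, the gracefulswitch balancer as well.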
// exitIdleMode is invoked by grpc when the channel exits idle mode either
// because of an RPC or because of an invocation of the Connect() API. This
// recreates the balancer that was closed previously when entering idle mode.
//
// If the channel is not in idle mode, we know for a fact that we are here as a
// result of the user calling the Connect() method on the ClientConn. In this
// case, we can simply forward the call to the underlying balancer, instructing
// it to reconnect to the backends.
func (ccb *ccBalancerWrapper) exitIdleMode() {
	ccb.mu.Lock()
	if ccb.mode == ccbModeClosed {
		// Request to exit idle is a no-op when wrapper is already closed.
		ccb.mu.Unlock()
		return
	}

	if ccb.mode == ccbModeIdle {
		// Recreate the serializer which was closed when we entered idle.
		ctx, cancel := context.WithCancel(context.Background())
		ccb.serializer = grpcsync.NewCallbackSerializer(ctx)
		ccb.serializerCancel = cancel
	}

	// The ClientConn guarantees mutual exclusion between close() and
	// exitIdleMode(), and since we just created a new serializer, we can be
	// sure that the below function will be scheduled.
	done := make(chan struct{})
	ccb.serializer.Schedule(func(_ context.Context) {
		defer close(done)

		ccb.mu.Lock()
		defer ccb.mu.Unlock()

		if ccb.mode != ccbModeIdle {
			ccb.balancer.ExitIdle()
			return
		}

		// Gracefulswitch balancer does not support a switchTo operation after
		// being closed. Hence we need to create a new one here.
		ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
		ccb.mode = ccbModeActive
		channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode")
	})
	ccb.mu.Unlock()

	<-done
}

func (ccb *ccBalancerWrapper) isIdleOrClosed() bool {
	ccb.mu.Lock()
	defer ccb.mu.Unlock()
	return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed
}

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
	if ccb.isIdleOrClosed() {
		return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle")
	}

	if len(addrs) == 0 {
		return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
	}
	ac, err := ccb.cc.newAddrConn(addrs, opts)
	if err != nil {
		channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
		return nil, err
	}
	acbw := &acBalancerWrapper{
		ccb:           ccb,
		ac:            ac,
		producers:     make(map[balancer.ProducerBuilder]*refCountedProducer),
		stateListener: opts.StateListener,
	}
	ac.acbw = acbw
	return acbw, nil
}

func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
	// The graceful switch balancer will never call this.
	logger.Errorf("ccb RemoveSubConn(%v) called unexpectedly", sc)
}

func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
	if ccb.isIdleOrClosed() {
		return
	}

	acbw, ok := sc.(*acBalancerWrapper)
	if !ok {
		return
	}
	acbw.UpdateAddresses(addrs)
}

func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
	if ccb.isIdleOrClosed() {
		return
	}

	// Update picker before updating state. The ordering does not matter for
	// correctness, but updating the state first can lead to multiple calls of
	// Pick in the common start-up case where we wait for ready and then
	// perform an RPC: we could call the "connecting" picker when the state is
	// updated, and then call the "ready" picker after the picker gets updated.
	ccb.cc.blockingpicker.updatePicker(s.Picker)
	ccb.cc.csMgr.updateState(s.ConnectivityState)
}

func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
	if ccb.isIdleOrClosed() {
		return
	}

	ccb.cc.resolveNow(o)
}

func (ccb *ccBalancerWrapper) Target() string {
	return ccb.cc.target
}

// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
	ac            *addrConn          // read-only
	ccb           *ccBalancerWrapper // read-only
	stateListener func(balancer.SubConnState)

	mu        sync.Mutex
	producers map[balancer.ProducerBuilder]*refCountedProducer
}

func (acbw *acBalancerWrapper) String() string {
	return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelzID.Int())
}

func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
	acbw.ac.updateAddrs(addrs)
}

func (acbw *acBalancerWrapper) Connect() {
	go acbw.ac.connect()
}

func (acbw *acBalancerWrapper) Shutdown() {
	ccb := acbw.ccb
	if ccb.isIdleOrClosed() {
		// It is safe to ignore this call when the balancer is closed or in idle
		// because the ClientConn takes care of closing the connections.
		//
		// Not returning early from here when the balancer is closed or in idle
		// leads to a deadlock though, because of the following sequence of
		// calls when holding cc.mu:
		// cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
		// acbw.Shutdown --> cc.removeAddrConn
		return
	}

	ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
}
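// A hedged sketch of a SubConn's life cycle from an LB policy's perspective,
// in terms of the balancer.ClientConn and balancer.SubConn methods implemented
// above (variable names are hypothetical):
//
//	sc, err := balancerClientConn.NewSubConn(addrs, balancer.NewSubConnOptions{
//		StateListener: func(scs balancer.SubConnState) { /* react to state changes */ },
//	})
//	sc.Connect()  // Starts connecting asynchronously via the addrConn.
//	// ... StateListener observes CONNECTING / READY / TRANSIENT_FAILURE ...
//	sc.Shutdown() // Releases the addrConn (ignored here when idle/closed).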
// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
// ready, it blocks until it is or until ctx expires. Returns an error when the
// context expires or the addrConn is shut down.
func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
	transport, err := acbw.ac.getTransport(ctx)
	if err != nil {
		return nil, err
	}
	return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...)
}

// Invoke performs a unary RPC. If the addrConn is not ready, returns
// errSubConnNotReady.
func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error {
	cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...)
	if err != nil {
		return err
	}
	if err := cs.SendMsg(args); err != nil {
		return err
	}
	return cs.RecvMsg(reply)
}

type refCountedProducer struct {
	producer balancer.Producer
	refs     int    // number of current refs to the producer
	close    func() // underlying producer's close function
}

func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) {
	acbw.mu.Lock()
	defer acbw.mu.Unlock()

	// Look up existing producer from this builder.
	pData := acbw.producers[pb]
	if pData == nil {
		// Not found; create a new one and add it to the producers map.
		p, close := pb.Build(acbw)
		pData = &refCountedProducer{producer: p, close: close}
		acbw.producers[pb] = pData
	}
	// Account for this new reference.
	pData.refs++

	// Return a cleanup function wrapped in an OnceFunc to remove this
	// reference and delete the refCountedProducer from the map if the total
	// reference count goes to zero.
	unref := func() {
		acbw.mu.Lock()
		pData.refs--
		if pData.refs == 0 {
			defer pData.close() // Run outside the acbw mutex.
			delete(acbw.producers, pb)
		}
		acbw.mu.Unlock()
	}
	return pData.producer, grpcsync.OnceFunc(unref)
}
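// A hedged sketch of how a consumer uses GetOrBuildProducer (oobBuilder and
// oobProducer are hypothetical; real producers, such as ORCA's out-of-band
// metrics producer, follow this shape):
//
//	p, stop := sc.GetOrBuildProducer(oobBuilder{}) // sc is a balancer.SubConn
//	oob := p.(*oobProducer) // Concrete type is known to the builder's callers.
//	// ... use oob until done ...
//	stop() // Drops this reference; Build's close func runs at zero refs.
//
// The acBalancerWrapper passed to pb.Build satisfies grpc.ClientConnInterface
// (via NewStream and Invoke above), letting a producer issue RPCs on this
// specific SubConn.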