2023-05-09 17:19:48 +00:00
/ *
*
* Copyright 2017 gRPC authors .
*
* Licensed under the Apache License , Version 2.0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
*
* http : //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an "AS IS" BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*
* /
package grpc
import (
"context"
"fmt"
"strings"
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
2023-09-07 11:20:37 +00:00
)
2023-05-09 17:19:48 +00:00
// ccBalancerWrapper sits between the ClientConn and the Balancer.
//
// ccBalancerWrapper implements methods corresponding to the ones on the
// balancer.Balancer interface. The ClientConn is free to call these methods
// concurrently and the ccBalancerWrapper ensures that calls from the ClientConn
2024-03-11 14:34:34 +00:00
// to the Balancer happen in order by performing them in the serializer, without
// any mutexes held.
2023-05-09 17:19:48 +00:00
//
// ccBalancerWrapper also implements the balancer.ClientConn interface and is
// passed to the Balancer implementations. It invokes unexported methods on the
// ClientConn to handle these calls from the Balancer.
//
// It uses the gracefulswitch.Balancer internally to ensure that balancer
// switches happen in a graceful manner.
type ccBalancerWrapper struct {
2023-09-07 11:20:37 +00:00
// The following fields are initialized when the wrapper is created and are
// read-only afterwards, and therefore can be accessed without a mutex.
2024-03-11 14:34:34 +00:00
cc * ClientConn
opts balancer . BuildOptions
serializer * grpcsync . CallbackSerializer
serializerCancel context . CancelFunc
2023-09-07 11:20:37 +00:00
2024-03-11 14:34:34 +00:00
// The following fields are only accessed within the serializer or during
// initialization.
2023-05-09 17:19:48 +00:00
curBalancerName string
2024-03-11 14:34:34 +00:00
balancer * gracefulswitch . Balancer
2023-05-09 17:19:48 +00:00
2024-03-11 14:34:34 +00:00
// The following field is protected by mu. Caller must take cc.mu before
// taking mu.
mu sync . Mutex
closed bool
2023-05-09 17:19:48 +00:00
}
2024-03-11 14:34:34 +00:00
// newCCBalancerWrapper creates a new balancer wrapper in idle state. The
// underlying balancer is not created until the switchTo() method is invoked.
func newCCBalancerWrapper ( cc * ClientConn ) * ccBalancerWrapper {
ctx , cancel := context . WithCancel ( cc . ctx )
2023-05-09 17:19:48 +00:00
ccb := & ccBalancerWrapper {
2024-03-11 14:34:34 +00:00
cc : cc ,
opts : balancer . BuildOptions {
DialCreds : cc . dopts . copts . TransportCredentials ,
CredsBundle : cc . dopts . copts . CredsBundle ,
Dialer : cc . dopts . copts . Dialer ,
Authority : cc . authority ,
CustomUserAgent : cc . dopts . copts . UserAgent ,
ChannelzParentID : cc . channelzID ,
Target : cc . parsedTarget ,
} ,
2023-09-07 11:20:37 +00:00
serializer : grpcsync . NewCallbackSerializer ( ctx ) ,
serializerCancel : cancel ,
2023-05-09 17:19:48 +00:00
}
2024-03-11 14:34:34 +00:00
ccb . balancer = gracefulswitch . NewBalancer ( ccb , ccb . opts )
2023-05-09 17:19:48 +00:00
return ccb
}
// updateClientConnState is invoked by grpc to push a ClientConnState update to
2024-03-11 14:34:34 +00:00
// the underlying balancer. This is always executed from the serializer, so
// it is safe to call into the balancer here.
2023-05-09 17:19:48 +00:00
func ( ccb * ccBalancerWrapper ) updateClientConnState ( ccs * balancer . ClientConnState ) error {
2024-03-11 14:34:34 +00:00
errCh := make ( chan error )
ok := ccb . serializer . Schedule ( func ( ctx context . Context ) {
defer close ( errCh )
if ctx . Err ( ) != nil || ccb . balancer == nil {
return
}
err := ccb . balancer . UpdateClientConnState ( * ccs )
if logger . V ( 2 ) && err != nil {
logger . Infof ( "error from balancer.UpdateClientConnState: %v" , err )
}
errCh <- err
2023-09-07 11:20:37 +00:00
} )
if ! ok {
2024-03-11 14:34:34 +00:00
return nil
2023-09-07 11:20:37 +00:00
}
2024-03-11 14:34:34 +00:00
return <- errCh
2023-05-09 17:19:48 +00:00
}
2024-03-11 14:34:34 +00:00
// resolverError is invoked by grpc to push a resolver error to the underlying
// balancer. The call to the balancer is executed from the serializer.
2023-05-09 17:19:48 +00:00
func ( ccb * ccBalancerWrapper ) resolverError ( err error ) {
2024-03-11 14:34:34 +00:00
ccb . serializer . Schedule ( func ( ctx context . Context ) {
if ctx . Err ( ) != nil || ccb . balancer == nil {
return
}
2023-09-07 11:20:37 +00:00
ccb . balancer . ResolverError ( err )
} )
2023-05-09 17:19:48 +00:00
}
// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
// LB policy identified by name.
//
// ClientConn calls newCCBalancerWrapper() at creation time. Upon receipt of the
// first good update from the name resolver, it determines the LB policy to use
// and invokes the switchTo() method. Upon receipt of every subsequent update
// from the name resolver, it invokes this method.
//
// the ccBalancerWrapper keeps track of the current LB policy name, and skips
// the graceful balancer switching process if the name does not change.
func ( ccb * ccBalancerWrapper ) switchTo ( name string ) {
2024-03-11 14:34:34 +00:00
ccb . serializer . Schedule ( func ( ctx context . Context ) {
if ctx . Err ( ) != nil || ccb . balancer == nil {
return
}
2023-09-07 11:20:37 +00:00
// TODO: Other languages use case-sensitive balancer registries. We should
// switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
if strings . EqualFold ( ccb . curBalancerName , name ) {
return
}
ccb . buildLoadBalancingPolicy ( name )
} )
2023-05-09 17:19:48 +00:00
}
2023-09-07 11:20:37 +00:00
// buildLoadBalancingPolicy performs the following:
// - retrieve a balancer builder for the given name. Use the default LB
// policy, pick_first, if no LB policy with name is found in the registry.
// - instruct the gracefulswitch balancer to switch to the above builder. This
// will actually build the new balancer.
// - update the `curBalancerName` field
//
// Must be called from a serializer callback.
func ( ccb * ccBalancerWrapper ) buildLoadBalancingPolicy ( name string ) {
2023-05-09 17:19:48 +00:00
builder := balancer . Get ( name )
if builder == nil {
channelz . Warningf ( logger , ccb . cc . channelzID , "Channel switches to new LB policy %q, since the specified LB policy %q was not registered" , PickFirstBalancerName , name )
builder = newPickfirstBuilder ( )
} else {
channelz . Infof ( logger , ccb . cc . channelzID , "Channel switches to new LB policy %q" , name )
}
if err := ccb . balancer . SwitchTo ( builder ) ; err != nil {
channelz . Errorf ( logger , ccb . cc . channelzID , "Channel failed to build new LB policy %q: %v" , name , err )
return
}
ccb . curBalancerName = builder . Name ( )
}
2024-03-11 14:34:34 +00:00
// close initiates async shutdown of the wrapper. cc.mu must be held when
// calling this function. To determine the wrapper has finished shutting down,
// the channel should block on ccb.serializer.Done() without cc.mu held.
2023-09-07 11:20:37 +00:00
func ( ccb * ccBalancerWrapper ) close ( ) {
ccb . mu . Lock ( )
2024-03-11 14:34:34 +00:00
ccb . closed = true
2023-09-07 11:20:37 +00:00
ccb . mu . Unlock ( )
2024-03-11 14:34:34 +00:00
channelz . Info ( logger , ccb . cc . channelzID , "ccBalancerWrapper: closing" )
ccb . serializer . Schedule ( func ( context . Context ) {
if ccb . balancer == nil {
2023-09-07 11:20:37 +00:00
return
}
2024-03-11 14:34:34 +00:00
ccb . balancer . Close ( )
ccb . balancer = nil
2023-09-07 11:20:37 +00:00
} )
2024-03-11 14:34:34 +00:00
ccb . serializerCancel ( )
2023-09-07 11:20:37 +00:00
}
2024-03-11 14:34:34 +00:00
// exitIdle invokes the balancer's exitIdle method in the serializer.
func ( ccb * ccBalancerWrapper ) exitIdle ( ) {
ccb . serializer . Schedule ( func ( ctx context . Context ) {
if ctx . Err ( ) != nil || ccb . balancer == nil {
return
}
ccb . balancer . ExitIdle ( )
} )
2023-05-09 17:19:48 +00:00
}
func ( ccb * ccBalancerWrapper ) NewSubConn ( addrs [ ] resolver . Address , opts balancer . NewSubConnOptions ) ( balancer . SubConn , error ) {
2024-03-11 14:34:34 +00:00
ccb . cc . mu . Lock ( )
defer ccb . cc . mu . Unlock ( )
ccb . mu . Lock ( )
if ccb . closed {
ccb . mu . Unlock ( )
return nil , fmt . Errorf ( "balancer is being closed; no new SubConns allowed" )
2023-09-07 11:20:37 +00:00
}
2024-03-11 14:34:34 +00:00
ccb . mu . Unlock ( )
2023-09-07 11:20:37 +00:00
if len ( addrs ) == 0 {
2023-05-09 17:19:48 +00:00
return nil , fmt . Errorf ( "grpc: cannot create SubConn with empty address list" )
}
2024-03-11 14:34:34 +00:00
ac , err := ccb . cc . newAddrConnLocked ( addrs , opts )
2023-05-09 17:19:48 +00:00
if err != nil {
channelz . Warningf ( logger , ccb . cc . channelzID , "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v" , err )
return nil , err
}
2023-09-18 12:47:28 +00:00
acbw := & acBalancerWrapper {
ccb : ccb ,
ac : ac ,
producers : make ( map [ balancer . ProducerBuilder ] * refCountedProducer ) ,
stateListener : opts . StateListener ,
}
2023-05-09 17:19:48 +00:00
ac . acbw = acbw
return acbw , nil
}
func ( ccb * ccBalancerWrapper ) RemoveSubConn ( sc balancer . SubConn ) {
2023-09-18 12:47:28 +00:00
// The graceful switch balancer will never call this.
logger . Errorf ( "ccb RemoveSubConn(%v) called unexpectedly, sc" )
2023-05-09 17:19:48 +00:00
}
func ( ccb * ccBalancerWrapper ) UpdateAddresses ( sc balancer . SubConn , addrs [ ] resolver . Address ) {
acbw , ok := sc . ( * acBalancerWrapper )
if ! ok {
return
}
acbw . UpdateAddresses ( addrs )
}
func ( ccb * ccBalancerWrapper ) UpdateState ( s balancer . State ) {
2024-03-11 14:34:34 +00:00
ccb . cc . mu . Lock ( )
defer ccb . cc . mu . Unlock ( )
ccb . mu . Lock ( )
if ccb . closed {
ccb . mu . Unlock ( )
2023-09-07 11:20:37 +00:00
return
}
2024-03-11 14:34:34 +00:00
ccb . mu . Unlock ( )
2023-05-09 17:19:48 +00:00
// Update picker before updating state. Even though the ordering here does
// not matter, it can lead to multiple calls of Pick in the common start-up
// case where we wait for ready and then perform an RPC. If the picker is
// updated later, we could call the "connecting" picker when the state is
// updated, and then call the "ready" picker after the picker gets updated.
2024-03-11 14:34:34 +00:00
// Note that there is no need to check if the balancer wrapper was closed,
// as we know the graceful switch LB policy will not call cc if it has been
// closed.
ccb . cc . pickerWrapper . updatePicker ( s . Picker )
2023-05-09 17:19:48 +00:00
ccb . cc . csMgr . updateState ( s . ConnectivityState )
}
func ( ccb * ccBalancerWrapper ) ResolveNow ( o resolver . ResolveNowOptions ) {
2024-03-11 14:34:34 +00:00
ccb . cc . mu . RLock ( )
defer ccb . cc . mu . RUnlock ( )
ccb . mu . Lock ( )
if ccb . closed {
ccb . mu . Unlock ( )
2023-09-07 11:20:37 +00:00
return
}
2024-03-11 14:34:34 +00:00
ccb . mu . Unlock ( )
ccb . cc . resolveNowLocked ( o )
2023-05-09 17:19:48 +00:00
}
func ( ccb * ccBalancerWrapper ) Target ( ) string {
return ccb . cc . target
}
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
2023-09-18 12:47:28 +00:00
ac * addrConn // read-only
ccb * ccBalancerWrapper // read-only
stateListener func ( balancer . SubConnState )
2023-09-07 11:20:37 +00:00
2023-05-09 17:19:48 +00:00
mu sync . Mutex
producers map [ balancer . ProducerBuilder ] * refCountedProducer
}
2024-03-11 14:34:34 +00:00
// updateState is invoked by grpc to push a subConn state update to the
// underlying balancer.
func ( acbw * acBalancerWrapper ) updateState ( s connectivity . State , err error ) {
acbw . ccb . serializer . Schedule ( func ( ctx context . Context ) {
if ctx . Err ( ) != nil || acbw . ccb . balancer == nil {
return
}
// Even though it is optional for balancers, gracefulswitch ensures
// opts.StateListener is set, so this cannot ever be nil.
// TODO: delete this comment when UpdateSubConnState is removed.
acbw . stateListener ( balancer . SubConnState { ConnectivityState : s , ConnectionError : err } )
} )
}
2023-09-07 11:20:37 +00:00
func ( acbw * acBalancerWrapper ) String ( ) string {
return fmt . Sprintf ( "SubConn(id:%d)" , acbw . ac . channelzID . Int ( ) )
}
2023-05-09 17:19:48 +00:00
2023-09-07 11:20:37 +00:00
func ( acbw * acBalancerWrapper ) UpdateAddresses ( addrs [ ] resolver . Address ) {
acbw . ac . updateAddrs ( addrs )
2023-05-09 17:19:48 +00:00
}
func ( acbw * acBalancerWrapper ) Connect ( ) {
go acbw . ac . connect ( )
}
2023-09-18 12:47:28 +00:00
func ( acbw * acBalancerWrapper ) Shutdown ( ) {
2024-03-11 14:34:34 +00:00
acbw . ccb . cc . removeAddrConn ( acbw . ac , errConnDrain )
2023-09-18 12:47:28 +00:00
}
2023-05-09 17:19:48 +00:00
// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
2023-09-07 11:20:37 +00:00
// ready, blocks until it is or ctx expires. Returns an error when the context
// expires or the addrConn is shut down.
2023-05-09 17:19:48 +00:00
func ( acbw * acBalancerWrapper ) NewStream ( ctx context . Context , desc * StreamDesc , method string , opts ... CallOption ) ( ClientStream , error ) {
2023-09-07 11:20:37 +00:00
transport , err := acbw . ac . getTransport ( ctx )
if err != nil {
return nil , err
2023-05-09 17:19:48 +00:00
}
return newNonRetryClientStream ( ctx , desc , method , transport , acbw . ac , opts ... )
}
// Invoke performs a unary RPC. If the addrConn is not ready, returns
// errSubConnNotReady.
2023-09-18 12:47:28 +00:00
func ( acbw * acBalancerWrapper ) Invoke ( ctx context . Context , method string , args any , reply any , opts ... CallOption ) error {
2023-05-09 17:19:48 +00:00
cs , err := acbw . NewStream ( ctx , unaryStreamDesc , method , opts ... )
if err != nil {
return err
}
if err := cs . SendMsg ( args ) ; err != nil {
return err
}
return cs . RecvMsg ( reply )
}
type refCountedProducer struct {
producer balancer . Producer
refs int // number of current refs to the producer
close func ( ) // underlying producer's close function
}
func ( acbw * acBalancerWrapper ) GetOrBuildProducer ( pb balancer . ProducerBuilder ) ( balancer . Producer , func ( ) ) {
acbw . mu . Lock ( )
defer acbw . mu . Unlock ( )
// Look up existing producer from this builder.
pData := acbw . producers [ pb ]
if pData == nil {
// Not found; create a new one and add it to the producers map.
p , close := pb . Build ( acbw )
pData = & refCountedProducer { producer : p , close : close }
acbw . producers [ pb ] = pData
}
// Account for this new reference.
pData . refs ++
// Return a cleanup function wrapped in a OnceFunc to remove this reference
// and delete the refCountedProducer from the map if the total reference
// count goes to zero.
unref := func ( ) {
acbw . mu . Lock ( )
pData . refs --
if pData . refs == 0 {
defer pData . close ( ) // Run outside the acbw mutex
delete ( acbw . producers , pb )
}
acbw . mu . Unlock ( )
}
return pData . producer , grpcsync . OnceFunc ( unref )
}