123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624 |
- /*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
- package grpc
- import (
- "errors"
- "fmt"
- "net"
- "strings"
- "sync"
- "time"
- "golang.org/x/net/context"
- "golang.org/x/net/trace"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/transport"
- )
- var (
- // ErrUnspecTarget indicates that the target address is unspecified.
- ErrUnspecTarget = errors.New("grpc: target is unspecified")
- // ErrNoTransportSecurity indicates that there is no transport security
- // being set for ClientConn. Users should either set one or explicitly
- // call WithInsecure DialOption to disable security.
- ErrNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
- // ErrCredentialsMisuse indicates that users want to transmit security information
- // (e.g., oauth2 token) which requires secure connection on an insecure
- // connection.
- ErrCredentialsMisuse = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportAuthenticator() to set)")
- // ErrClientConnClosing indicates that the operation is illegal because
- // the session is closing.
- ErrClientConnClosing = errors.New("grpc: the client connection is closing")
- // ErrClientConnTimeout indicates that the connection could not be
- // established or re-established within the specified timeout.
- ErrClientConnTimeout = errors.New("grpc: timed out trying to connect")
- // minimum time to give a connection to complete
- minConnectTimeout = 20 * time.Second
- )
- // dialOptions configure a Dial call. dialOptions are set by the DialOption
- // values passed to Dial.
- type dialOptions struct {
- codec Codec
- cp Compressor
- dc Decompressor
- bs backoffStrategy
- picker Picker
- block bool
- insecure bool
- copts transport.ConnectOptions
- }
- // DialOption configures how we set up the connection.
- type DialOption func(*dialOptions)
- // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
- func WithCodec(c Codec) DialOption {
- return func(o *dialOptions) {
- o.codec = c
- }
- }
- // WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
- // compressor.
- func WithCompressor(cp Compressor) DialOption {
- return func(o *dialOptions) {
- o.cp = cp
- }
- }
- // WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
- // message decompressor.
- func WithDecompressor(dc Decompressor) DialOption {
- return func(o *dialOptions) {
- o.dc = dc
- }
- }
- // WithPicker returns a DialOption which sets a picker for connection selection.
- func WithPicker(p Picker) DialOption {
- return func(o *dialOptions) {
- o.picker = p
- }
- }
- // WithBackoffMaxDelay configures the dialer to use the provided maximum delay
- // when backing off after failed connection attempts.
- func WithBackoffMaxDelay(md time.Duration) DialOption {
- return WithBackoffConfig(BackoffConfig{MaxDelay: md})
- }
- // WithBackoffConfig configures the dialer to use the provided backoff
- // parameters after connection failures.
- //
- // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
- // for use.
- func WithBackoffConfig(b BackoffConfig) DialOption {
- // Set defaults to ensure that provided BackoffConfig is valid and
- // unexported fields get default values.
- setDefaults(&b)
- return withBackoff(b)
- }
- // withBackoff sets the backoff strategy used for retries after a
- // failed connection attempt.
- //
- // This can be exported if arbitrary backoff strategies are allowed by GRPC.
- func withBackoff(bs backoffStrategy) DialOption {
- return func(o *dialOptions) {
- o.bs = bs
- }
- }
- // WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
- // connection is up. Without this, Dial returns immediately and connecting the server
- // happens in background.
- func WithBlock() DialOption {
- return func(o *dialOptions) {
- o.block = true
- }
- }
- // WithInsecure returns a DialOption which disables transport security for this ClientConn.
- // Note that transport security is required unless WithInsecure is set.
- func WithInsecure() DialOption {
- return func(o *dialOptions) {
- o.insecure = true
- }
- }
- // WithTransportCredentials returns a DialOption which configures a
- // connection level security credentials (e.g., TLS/SSL).
- func WithTransportCredentials(creds credentials.TransportAuthenticator) DialOption {
- return func(o *dialOptions) {
- o.copts.AuthOptions = append(o.copts.AuthOptions, creds)
- }
- }
- // WithPerRPCCredentials returns a DialOption which sets
- // credentials which will place auth state on each outbound RPC.
- func WithPerRPCCredentials(creds credentials.Credentials) DialOption {
- return func(o *dialOptions) {
- o.copts.AuthOptions = append(o.copts.AuthOptions, creds)
- }
- }
- // WithTimeout returns a DialOption that configures a timeout for dialing a client connection.
- func WithTimeout(d time.Duration) DialOption {
- return func(o *dialOptions) {
- o.copts.Timeout = d
- }
- }
- // WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
- func WithDialer(f func(addr string, timeout time.Duration) (net.Conn, error)) DialOption {
- return func(o *dialOptions) {
- o.copts.Dialer = f
- }
- }
- // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
- func WithUserAgent(s string) DialOption {
- return func(o *dialOptions) {
- o.copts.UserAgent = s
- }
- }
- // Dial creates a client connection the given target.
- func Dial(target string, opts ...DialOption) (*ClientConn, error) {
- cc := &ClientConn{
- target: target,
- }
- for _, opt := range opts {
- opt(&cc.dopts)
- }
- if cc.dopts.codec == nil {
- // Set the default codec.
- cc.dopts.codec = protoCodec{}
- }
- if cc.dopts.bs == nil {
- cc.dopts.bs = DefaultBackoffConfig
- }
- if cc.dopts.picker == nil {
- cc.dopts.picker = &unicastPicker{
- target: target,
- }
- }
- if err := cc.dopts.picker.Init(cc); err != nil {
- return nil, err
- }
- colonPos := strings.LastIndex(target, ":")
- if colonPos == -1 {
- colonPos = len(target)
- }
- cc.authority = target[:colonPos]
- return cc, nil
- }
- // ConnectivityState indicates the state of a client connection.
- type ConnectivityState int
- const (
- // Idle indicates the ClientConn is idle.
- Idle ConnectivityState = iota
- // Connecting indicates the ClienConn is connecting.
- Connecting
- // Ready indicates the ClientConn is ready for work.
- Ready
- // TransientFailure indicates the ClientConn has seen a failure but expects to recover.
- TransientFailure
- // Shutdown indicates the ClientConn has started shutting down.
- Shutdown
- )
- func (s ConnectivityState) String() string {
- switch s {
- case Idle:
- return "IDLE"
- case Connecting:
- return "CONNECTING"
- case Ready:
- return "READY"
- case TransientFailure:
- return "TRANSIENT_FAILURE"
- case Shutdown:
- return "SHUTDOWN"
- default:
- panic(fmt.Sprintf("unknown connectivity state: %d", s))
- }
- }
- // ClientConn represents a client connection to an RPC service.
- type ClientConn struct {
- target string
- authority string
- dopts dialOptions
- }
- // State returns the connectivity state of cc.
- // This is EXPERIMENTAL API.
- func (cc *ClientConn) State() (ConnectivityState, error) {
- return cc.dopts.picker.State()
- }
- // WaitForStateChange blocks until the state changes to something other than the sourceState.
- // It returns the new state or error.
- // This is EXPERIMENTAL API.
- func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
- return cc.dopts.picker.WaitForStateChange(ctx, sourceState)
- }
- // Close starts to tear down the ClientConn.
- func (cc *ClientConn) Close() error {
- return cc.dopts.picker.Close()
- }
- // Conn is a client connection to a single destination.
- type Conn struct {
- target string
- dopts dialOptions
- resetChan chan int
- shutdownChan chan struct{}
- events trace.EventLog
- mu sync.Mutex
- state ConnectivityState
- stateCV *sync.Cond
- // ready is closed and becomes nil when a new transport is up or failed
- // due to timeout.
- ready chan struct{}
- transport transport.ClientTransport
- }
- // NewConn creates a Conn.
- func NewConn(cc *ClientConn) (*Conn, error) {
- if cc.target == "" {
- return nil, ErrUnspecTarget
- }
- c := &Conn{
- target: cc.target,
- dopts: cc.dopts,
- resetChan: make(chan int, 1),
- shutdownChan: make(chan struct{}),
- }
- if EnableTracing {
- c.events = trace.NewEventLog("grpc.ClientConn", c.target)
- }
- if !c.dopts.insecure {
- var ok bool
- for _, cd := range c.dopts.copts.AuthOptions {
- if _, ok = cd.(credentials.TransportAuthenticator); ok {
- break
- }
- }
- if !ok {
- return nil, ErrNoTransportSecurity
- }
- } else {
- for _, cd := range c.dopts.copts.AuthOptions {
- if cd.RequireTransportSecurity() {
- return nil, ErrCredentialsMisuse
- }
- }
- }
- c.stateCV = sync.NewCond(&c.mu)
- if c.dopts.block {
- if err := c.resetTransport(false); err != nil {
- c.Close()
- return nil, err
- }
- // Start to monitor the error status of transport.
- go c.transportMonitor()
- } else {
- // Start a goroutine connecting to the server asynchronously.
- go func() {
- if err := c.resetTransport(false); err != nil {
- grpclog.Printf("Failed to dial %s: %v; please retry.", c.target, err)
- c.Close()
- return
- }
- c.transportMonitor()
- }()
- }
- return c, nil
- }
- // printf records an event in cc's event log, unless cc has been closed.
- // REQUIRES cc.mu is held.
- func (cc *Conn) printf(format string, a ...interface{}) {
- if cc.events != nil {
- cc.events.Printf(format, a...)
- }
- }
- // errorf records an error in cc's event log, unless cc has been closed.
- // REQUIRES cc.mu is held.
- func (cc *Conn) errorf(format string, a ...interface{}) {
- if cc.events != nil {
- cc.events.Errorf(format, a...)
- }
- }
- // State returns the connectivity state of the Conn
- func (cc *Conn) State() ConnectivityState {
- cc.mu.Lock()
- defer cc.mu.Unlock()
- return cc.state
- }
- // WaitForStateChange blocks until the state changes to something other than the sourceState.
- func (cc *Conn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
- cc.mu.Lock()
- defer cc.mu.Unlock()
- if sourceState != cc.state {
- return cc.state, nil
- }
- done := make(chan struct{})
- var err error
- go func() {
- select {
- case <-ctx.Done():
- cc.mu.Lock()
- err = ctx.Err()
- cc.stateCV.Broadcast()
- cc.mu.Unlock()
- case <-done:
- }
- }()
- defer close(done)
- for sourceState == cc.state {
- cc.stateCV.Wait()
- if err != nil {
- return cc.state, err
- }
- }
- return cc.state, nil
- }
- // NotifyReset tries to signal the underlying transport needs to be reset due to
- // for example a name resolution change in flight.
- func (cc *Conn) NotifyReset() {
- select {
- case cc.resetChan <- 0:
- default:
- }
- }
- func (cc *Conn) resetTransport(closeTransport bool) error {
- var retries int
- start := time.Now()
- for {
- cc.mu.Lock()
- cc.printf("connecting")
- if cc.state == Shutdown {
- // cc.Close() has been invoked.
- cc.mu.Unlock()
- return ErrClientConnClosing
- }
- cc.state = Connecting
- cc.stateCV.Broadcast()
- cc.mu.Unlock()
- if closeTransport {
- cc.transport.Close()
- }
- // Adjust timeout for the current try.
- copts := cc.dopts.copts
- if copts.Timeout < 0 {
- cc.Close()
- return ErrClientConnTimeout
- }
- if copts.Timeout > 0 {
- copts.Timeout -= time.Since(start)
- if copts.Timeout <= 0 {
- cc.Close()
- return ErrClientConnTimeout
- }
- }
- sleepTime := cc.dopts.bs.backoff(retries)
- timeout := sleepTime
- if timeout < minConnectTimeout {
- timeout = minConnectTimeout
- }
- if copts.Timeout == 0 || copts.Timeout > timeout {
- copts.Timeout = timeout
- }
- connectTime := time.Now()
- addr, err := cc.dopts.picker.PickAddr()
- var newTransport transport.ClientTransport
- if err == nil {
- newTransport, err = transport.NewClientTransport(addr, &copts)
- }
- if err != nil {
- cc.mu.Lock()
- if cc.state == Shutdown {
- // cc.Close() has been invoked.
- cc.mu.Unlock()
- return ErrClientConnClosing
- }
- cc.errorf("transient failure: %v", err)
- cc.state = TransientFailure
- cc.stateCV.Broadcast()
- if cc.ready != nil {
- close(cc.ready)
- cc.ready = nil
- }
- cc.mu.Unlock()
- sleepTime -= time.Since(connectTime)
- if sleepTime < 0 {
- sleepTime = 0
- }
- // Fail early before falling into sleep.
- if cc.dopts.copts.Timeout > 0 && cc.dopts.copts.Timeout < sleepTime+time.Since(start) {
- cc.mu.Lock()
- cc.errorf("connection timeout")
- cc.mu.Unlock()
- cc.Close()
- return ErrClientConnTimeout
- }
- closeTransport = false
- time.Sleep(sleepTime)
- retries++
- grpclog.Printf("grpc: Conn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, cc.target)
- continue
- }
- cc.mu.Lock()
- cc.printf("ready")
- if cc.state == Shutdown {
- // cc.Close() has been invoked.
- cc.mu.Unlock()
- newTransport.Close()
- return ErrClientConnClosing
- }
- cc.state = Ready
- cc.stateCV.Broadcast()
- cc.transport = newTransport
- if cc.ready != nil {
- close(cc.ready)
- cc.ready = nil
- }
- cc.mu.Unlock()
- return nil
- }
- }
- func (cc *Conn) reconnect() bool {
- cc.mu.Lock()
- if cc.state == Shutdown {
- // cc.Close() has been invoked.
- cc.mu.Unlock()
- return false
- }
- cc.state = TransientFailure
- cc.stateCV.Broadcast()
- cc.mu.Unlock()
- if err := cc.resetTransport(true); err != nil {
- // The ClientConn is closing.
- cc.mu.Lock()
- cc.printf("transport exiting: %v", err)
- cc.mu.Unlock()
- grpclog.Printf("grpc: Conn.transportMonitor exits due to: %v", err)
- return false
- }
- return true
- }
- // Run in a goroutine to track the error in transport and create the
- // new transport if an error happens. It returns when the channel is closing.
- func (cc *Conn) transportMonitor() {
- for {
- select {
- // shutdownChan is needed to detect the teardown when
- // the ClientConn is idle (i.e., no RPC in flight).
- case <-cc.shutdownChan:
- return
- case <-cc.resetChan:
- if !cc.reconnect() {
- return
- }
- case <-cc.transport.Error():
- if !cc.reconnect() {
- return
- }
- // Tries to drain reset signal if there is any since it is out-dated.
- select {
- case <-cc.resetChan:
- default:
- }
- }
- }
- }
- // Wait blocks until i) the new transport is up or ii) ctx is done or iii) cc is closed.
- func (cc *Conn) Wait(ctx context.Context) (transport.ClientTransport, error) {
- for {
- cc.mu.Lock()
- switch {
- case cc.state == Shutdown:
- cc.mu.Unlock()
- return nil, ErrClientConnClosing
- case cc.state == Ready:
- ct := cc.transport
- cc.mu.Unlock()
- return ct, nil
- default:
- ready := cc.ready
- if ready == nil {
- ready = make(chan struct{})
- cc.ready = ready
- }
- cc.mu.Unlock()
- select {
- case <-ctx.Done():
- return nil, transport.ContextErr(ctx.Err())
- // Wait until the new transport is ready or failed.
- case <-ready:
- }
- }
- }
- }
- // Close starts to tear down the Conn. Returns ErrClientConnClosing if
- // it has been closed (mostly due to dial time-out).
- // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
- // some edge cases (e.g., the caller opens and closes many ClientConn's in a
- // tight loop.
- func (cc *Conn) Close() error {
- cc.mu.Lock()
- defer cc.mu.Unlock()
- if cc.state == Shutdown {
- return ErrClientConnClosing
- }
- cc.state = Shutdown
- cc.stateCV.Broadcast()
- if cc.events != nil {
- cc.events.Finish()
- cc.events = nil
- }
- if cc.ready != nil {
- close(cc.ready)
- cc.ready = nil
- }
- if cc.transport != nil {
- cc.transport.Close()
- }
- if cc.shutdownChan != nil {
- close(cc.shutdownChan)
- }
- return nil
- }
|