clientconn.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "errors"
  36. "fmt"
  37. "net"
  38. "strings"
  39. "sync"
  40. "time"
  41. "golang.org/x/net/context"
  42. "golang.org/x/net/trace"
  43. "google.golang.org/grpc/credentials"
  44. "google.golang.org/grpc/grpclog"
  45. "google.golang.org/grpc/transport"
  46. )
  47. var (
  48. // ErrUnspecTarget indicates that the target address is unspecified.
  49. ErrUnspecTarget = errors.New("grpc: target is unspecified")
  50. // ErrNoTransportSecurity indicates that there is no transport security
  51. // being set for ClientConn. Users should either set one or explicitly
  52. // call WithInsecure DialOption to disable security.
  53. ErrNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
  54. // ErrCredentialsMisuse indicates that users want to transmit security information
  55. // (e.g., oauth2 token) which requires secure connection on an insecure
  56. // connection.
  57. ErrCredentialsMisuse = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportAuthenticator() to set)")
  58. // ErrClientConnClosing indicates that the operation is illegal because
  59. // the session is closing.
  60. ErrClientConnClosing = errors.New("grpc: the client connection is closing")
  61. // ErrClientConnTimeout indicates that the connection could not be
  62. // established or re-established within the specified timeout.
  63. ErrClientConnTimeout = errors.New("grpc: timed out trying to connect")
  64. // minimum time to give a connection to complete
  65. minConnectTimeout = 20 * time.Second
  66. )
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
	// codec is set by WithCodec; Dial falls back to protoCodec when nil.
	codec Codec
	// cp is set by WithCompressor.
	cp Compressor
	// dc is set by WithDecompressor.
	dc Decompressor
	// bs is the backoff strategy consulted between failed connection
	// attempts; Dial falls back to DefaultBackoffConfig when nil.
	bs backoffStrategy
	// picker is set by WithPicker; Dial falls back to a unicastPicker for
	// the dial target when nil.
	picker Picker
	// block is set by WithBlock and makes NewConn connect synchronously.
	block bool
	// insecure is set by WithInsecure and disables the transport security
	// requirement checked in NewConn.
	insecure bool
	// copts carries the transport-level settings (auth options, timeout,
	// dialer, user agent) handed to transport.NewClientTransport.
	copts transport.ConnectOptions
}
// DialOption configures how we set up the connection. Each option is a
// function that mutates the dialOptions it is given; Dial applies the options
// in the order they are passed.
type DialOption func(*dialOptions)
  81. // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
  82. func WithCodec(c Codec) DialOption {
  83. return func(o *dialOptions) {
  84. o.codec = c
  85. }
  86. }
  87. // WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
  88. // compressor.
  89. func WithCompressor(cp Compressor) DialOption {
  90. return func(o *dialOptions) {
  91. o.cp = cp
  92. }
  93. }
  94. // WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
  95. // message decompressor.
  96. func WithDecompressor(dc Decompressor) DialOption {
  97. return func(o *dialOptions) {
  98. o.dc = dc
  99. }
  100. }
  101. // WithPicker returns a DialOption which sets a picker for connection selection.
  102. func WithPicker(p Picker) DialOption {
  103. return func(o *dialOptions) {
  104. o.picker = p
  105. }
  106. }
  107. // WithBackoffMaxDelay configures the dialer to use the provided maximum delay
  108. // when backing off after failed connection attempts.
  109. func WithBackoffMaxDelay(md time.Duration) DialOption {
  110. return WithBackoffConfig(BackoffConfig{MaxDelay: md})
  111. }
  112. // WithBackoffConfig configures the dialer to use the provided backoff
  113. // parameters after connection failures.
  114. //
  115. // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
  116. // for use.
  117. func WithBackoffConfig(b BackoffConfig) DialOption {
  118. // Set defaults to ensure that provided BackoffConfig is valid and
  119. // unexported fields get default values.
  120. setDefaults(&b)
  121. return withBackoff(b)
  122. }
  123. // withBackoff sets the backoff strategy used for retries after a
  124. // failed connection attempt.
  125. //
  126. // This can be exported if arbitrary backoff strategies are allowed by GRPC.
  127. func withBackoff(bs backoffStrategy) DialOption {
  128. return func(o *dialOptions) {
  129. o.bs = bs
  130. }
  131. }
  132. // WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
  133. // connection is up. Without this, Dial returns immediately and connecting the server
  134. // happens in background.
  135. func WithBlock() DialOption {
  136. return func(o *dialOptions) {
  137. o.block = true
  138. }
  139. }
  140. // WithInsecure returns a DialOption which disables transport security for this ClientConn.
  141. // Note that transport security is required unless WithInsecure is set.
  142. func WithInsecure() DialOption {
  143. return func(o *dialOptions) {
  144. o.insecure = true
  145. }
  146. }
  147. // WithTransportCredentials returns a DialOption which configures a
  148. // connection level security credentials (e.g., TLS/SSL).
  149. func WithTransportCredentials(creds credentials.TransportAuthenticator) DialOption {
  150. return func(o *dialOptions) {
  151. o.copts.AuthOptions = append(o.copts.AuthOptions, creds)
  152. }
  153. }
  154. // WithPerRPCCredentials returns a DialOption which sets
  155. // credentials which will place auth state on each outbound RPC.
  156. func WithPerRPCCredentials(creds credentials.Credentials) DialOption {
  157. return func(o *dialOptions) {
  158. o.copts.AuthOptions = append(o.copts.AuthOptions, creds)
  159. }
  160. }
  161. // WithTimeout returns a DialOption that configures a timeout for dialing a client connection.
  162. func WithTimeout(d time.Duration) DialOption {
  163. return func(o *dialOptions) {
  164. o.copts.Timeout = d
  165. }
  166. }
  167. // WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
  168. func WithDialer(f func(addr string, timeout time.Duration) (net.Conn, error)) DialOption {
  169. return func(o *dialOptions) {
  170. o.copts.Dialer = f
  171. }
  172. }
  173. // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
  174. func WithUserAgent(s string) DialOption {
  175. return func(o *dialOptions) {
  176. o.copts.UserAgent = s
  177. }
  178. }
  179. // Dial creates a client connection the given target.
  180. func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  181. cc := &ClientConn{
  182. target: target,
  183. }
  184. for _, opt := range opts {
  185. opt(&cc.dopts)
  186. }
  187. if cc.dopts.codec == nil {
  188. // Set the default codec.
  189. cc.dopts.codec = protoCodec{}
  190. }
  191. if cc.dopts.bs == nil {
  192. cc.dopts.bs = DefaultBackoffConfig
  193. }
  194. if cc.dopts.picker == nil {
  195. cc.dopts.picker = &unicastPicker{
  196. target: target,
  197. }
  198. }
  199. if err := cc.dopts.picker.Init(cc); err != nil {
  200. return nil, err
  201. }
  202. colonPos := strings.LastIndex(target, ":")
  203. if colonPos == -1 {
  204. colonPos = len(target)
  205. }
  206. cc.authority = target[:colonPos]
  207. return cc, nil
  208. }
  209. // ConnectivityState indicates the state of a client connection.
  210. type ConnectivityState int
  211. const (
  212. // Idle indicates the ClientConn is idle.
  213. Idle ConnectivityState = iota
  214. // Connecting indicates the ClienConn is connecting.
  215. Connecting
  216. // Ready indicates the ClientConn is ready for work.
  217. Ready
  218. // TransientFailure indicates the ClientConn has seen a failure but expects to recover.
  219. TransientFailure
  220. // Shutdown indicates the ClientConn has started shutting down.
  221. Shutdown
  222. )
  223. func (s ConnectivityState) String() string {
  224. switch s {
  225. case Idle:
  226. return "IDLE"
  227. case Connecting:
  228. return "CONNECTING"
  229. case Ready:
  230. return "READY"
  231. case TransientFailure:
  232. return "TRANSIENT_FAILURE"
  233. case Shutdown:
  234. return "SHUTDOWN"
  235. default:
  236. panic(fmt.Sprintf("unknown connectivity state: %d", s))
  237. }
  238. }
// ClientConn represents a client connection to an RPC service.
type ClientConn struct {
	// target is the address string passed to Dial.
	target string
	// authority is target with everything from the last ':' on removed
	// (computed in Dial).
	authority string
	// dopts holds the dial options after defaults have been applied; the
	// picker stored here backs State, WaitForStateChange and Close.
	dopts dialOptions
}
  245. // State returns the connectivity state of cc.
  246. // This is EXPERIMENTAL API.
  247. func (cc *ClientConn) State() (ConnectivityState, error) {
  248. return cc.dopts.picker.State()
  249. }
  250. // WaitForStateChange blocks until the state changes to something other than the sourceState.
  251. // It returns the new state or error.
  252. // This is EXPERIMENTAL API.
  253. func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
  254. return cc.dopts.picker.WaitForStateChange(ctx, sourceState)
  255. }
  256. // Close starts to tear down the ClientConn.
  257. func (cc *ClientConn) Close() error {
  258. return cc.dopts.picker.Close()
  259. }
// Conn is a client connection to a single destination.
type Conn struct {
	// target and dopts are copied from the owning ClientConn in NewConn.
	target string
	dopts  dialOptions
	// resetChan carries reset requests from NotifyReset to
	// transportMonitor. It is created with capacity 1, so at most one
	// request is pending at a time.
	resetChan chan int
	// shutdownChan is closed by Close to stop transportMonitor.
	shutdownChan chan struct{}
	// events is the trace event log; it is only set when EnableTracing is
	// true and is reset to nil by Close.
	events trace.EventLog

	mu    sync.Mutex
	state ConnectivityState
	// stateCV is a condition variable on mu that is broadcast whenever
	// state changes.
	stateCV *sync.Cond
	// ready is closed and becomes nil when a new transport is up or failed
	// due to timeout.
	ready chan struct{}
	// transport is the current client transport; it is installed by
	// resetTransport once a connection attempt succeeds.
	transport transport.ClientTransport
}
  275. // NewConn creates a Conn.
  276. func NewConn(cc *ClientConn) (*Conn, error) {
  277. if cc.target == "" {
  278. return nil, ErrUnspecTarget
  279. }
  280. c := &Conn{
  281. target: cc.target,
  282. dopts: cc.dopts,
  283. resetChan: make(chan int, 1),
  284. shutdownChan: make(chan struct{}),
  285. }
  286. if EnableTracing {
  287. c.events = trace.NewEventLog("grpc.ClientConn", c.target)
  288. }
  289. if !c.dopts.insecure {
  290. var ok bool
  291. for _, cd := range c.dopts.copts.AuthOptions {
  292. if _, ok = cd.(credentials.TransportAuthenticator); ok {
  293. break
  294. }
  295. }
  296. if !ok {
  297. return nil, ErrNoTransportSecurity
  298. }
  299. } else {
  300. for _, cd := range c.dopts.copts.AuthOptions {
  301. if cd.RequireTransportSecurity() {
  302. return nil, ErrCredentialsMisuse
  303. }
  304. }
  305. }
  306. c.stateCV = sync.NewCond(&c.mu)
  307. if c.dopts.block {
  308. if err := c.resetTransport(false); err != nil {
  309. c.Close()
  310. return nil, err
  311. }
  312. // Start to monitor the error status of transport.
  313. go c.transportMonitor()
  314. } else {
  315. // Start a goroutine connecting to the server asynchronously.
  316. go func() {
  317. if err := c.resetTransport(false); err != nil {
  318. grpclog.Printf("Failed to dial %s: %v; please retry.", c.target, err)
  319. c.Close()
  320. return
  321. }
  322. c.transportMonitor()
  323. }()
  324. }
  325. return c, nil
  326. }
  327. // printf records an event in cc's event log, unless cc has been closed.
  328. // REQUIRES cc.mu is held.
  329. func (cc *Conn) printf(format string, a ...interface{}) {
  330. if cc.events != nil {
  331. cc.events.Printf(format, a...)
  332. }
  333. }
  334. // errorf records an error in cc's event log, unless cc has been closed.
  335. // REQUIRES cc.mu is held.
  336. func (cc *Conn) errorf(format string, a ...interface{}) {
  337. if cc.events != nil {
  338. cc.events.Errorf(format, a...)
  339. }
  340. }
  341. // State returns the connectivity state of the Conn
  342. func (cc *Conn) State() ConnectivityState {
  343. cc.mu.Lock()
  344. defer cc.mu.Unlock()
  345. return cc.state
  346. }
// WaitForStateChange blocks until the state changes to something other than the sourceState.
// It returns immediately with the current state when that already differs
// from sourceState. If ctx is done while waiting, it returns the state at
// that moment together with ctx.Err().
func (cc *Conn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	if sourceState != cc.state {
		return cc.state, nil
	}
	done := make(chan struct{})
	var err error
	// sync.Cond cannot wait on a context, so a helper goroutine turns
	// context cancellation into a Broadcast. err is written and read only
	// with cc.mu held.
	go func() {
		select {
		case <-ctx.Done():
			cc.mu.Lock()
			err = ctx.Err()
			cc.stateCV.Broadcast()
			cc.mu.Unlock()
		case <-done:
		}
	}()
	// Release the helper goroutine when this call returns.
	defer close(done)
	for sourceState == cc.state {
		cc.stateCV.Wait()
		// Woken either by a state change or by the helper goroutine.
		if err != nil {
			return cc.state, err
		}
	}
	return cc.state, nil
}
// NotifyReset tries to signal that the underlying transport needs to be reset
// due to, for example, a name resolution change in flight. The send never
// blocks: if resetChan cannot accept the value right now, the request is
// dropped.
func (cc *Conn) NotifyReset() {
	select {
	case cc.resetChan <- 0:
	default:
	}
}
  383. func (cc *Conn) resetTransport(closeTransport bool) error {
  384. var retries int
  385. start := time.Now()
  386. for {
  387. cc.mu.Lock()
  388. cc.printf("connecting")
  389. if cc.state == Shutdown {
  390. // cc.Close() has been invoked.
  391. cc.mu.Unlock()
  392. return ErrClientConnClosing
  393. }
  394. cc.state = Connecting
  395. cc.stateCV.Broadcast()
  396. cc.mu.Unlock()
  397. if closeTransport {
  398. cc.transport.Close()
  399. }
  400. // Adjust timeout for the current try.
  401. copts := cc.dopts.copts
  402. if copts.Timeout < 0 {
  403. cc.Close()
  404. return ErrClientConnTimeout
  405. }
  406. if copts.Timeout > 0 {
  407. copts.Timeout -= time.Since(start)
  408. if copts.Timeout <= 0 {
  409. cc.Close()
  410. return ErrClientConnTimeout
  411. }
  412. }
  413. sleepTime := cc.dopts.bs.backoff(retries)
  414. timeout := sleepTime
  415. if timeout < minConnectTimeout {
  416. timeout = minConnectTimeout
  417. }
  418. if copts.Timeout == 0 || copts.Timeout > timeout {
  419. copts.Timeout = timeout
  420. }
  421. connectTime := time.Now()
  422. addr, err := cc.dopts.picker.PickAddr()
  423. var newTransport transport.ClientTransport
  424. if err == nil {
  425. newTransport, err = transport.NewClientTransport(addr, &copts)
  426. }
  427. if err != nil {
  428. cc.mu.Lock()
  429. if cc.state == Shutdown {
  430. // cc.Close() has been invoked.
  431. cc.mu.Unlock()
  432. return ErrClientConnClosing
  433. }
  434. cc.errorf("transient failure: %v", err)
  435. cc.state = TransientFailure
  436. cc.stateCV.Broadcast()
  437. if cc.ready != nil {
  438. close(cc.ready)
  439. cc.ready = nil
  440. }
  441. cc.mu.Unlock()
  442. sleepTime -= time.Since(connectTime)
  443. if sleepTime < 0 {
  444. sleepTime = 0
  445. }
  446. // Fail early before falling into sleep.
  447. if cc.dopts.copts.Timeout > 0 && cc.dopts.copts.Timeout < sleepTime+time.Since(start) {
  448. cc.mu.Lock()
  449. cc.errorf("connection timeout")
  450. cc.mu.Unlock()
  451. cc.Close()
  452. return ErrClientConnTimeout
  453. }
  454. closeTransport = false
  455. time.Sleep(sleepTime)
  456. retries++
  457. grpclog.Printf("grpc: Conn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, cc.target)
  458. continue
  459. }
  460. cc.mu.Lock()
  461. cc.printf("ready")
  462. if cc.state == Shutdown {
  463. // cc.Close() has been invoked.
  464. cc.mu.Unlock()
  465. newTransport.Close()
  466. return ErrClientConnClosing
  467. }
  468. cc.state = Ready
  469. cc.stateCV.Broadcast()
  470. cc.transport = newTransport
  471. if cc.ready != nil {
  472. close(cc.ready)
  473. cc.ready = nil
  474. }
  475. cc.mu.Unlock()
  476. return nil
  477. }
  478. }
  479. func (cc *Conn) reconnect() bool {
  480. cc.mu.Lock()
  481. if cc.state == Shutdown {
  482. // cc.Close() has been invoked.
  483. cc.mu.Unlock()
  484. return false
  485. }
  486. cc.state = TransientFailure
  487. cc.stateCV.Broadcast()
  488. cc.mu.Unlock()
  489. if err := cc.resetTransport(true); err != nil {
  490. // The ClientConn is closing.
  491. cc.mu.Lock()
  492. cc.printf("transport exiting: %v", err)
  493. cc.mu.Unlock()
  494. grpclog.Printf("grpc: Conn.transportMonitor exits due to: %v", err)
  495. return false
  496. }
  497. return true
  498. }
// transportMonitor runs in a goroutine to track errors in the transport and
// create a new transport when an error happens or a reset is requested. It
// returns when the channel is closing or when reconnecting fails.
func (cc *Conn) transportMonitor() {
	for {
		select {
		// shutdownChan is needed to detect the teardown when
		// the ClientConn is idle (i.e., no RPC in flight).
		case <-cc.shutdownChan:
			return
		// A reset was requested via NotifyReset.
		case <-cc.resetChan:
			if !cc.reconnect() {
				return
			}
		// The current transport reported an error.
		case <-cc.transport.Error():
			if !cc.reconnect() {
				return
			}
			// Tries to drain reset signal if there is any since it is out-dated.
			select {
			case <-cc.resetChan:
			default:
			}
		}
	}
}
  524. // Wait blocks until i) the new transport is up or ii) ctx is done or iii) cc is closed.
  525. func (cc *Conn) Wait(ctx context.Context) (transport.ClientTransport, error) {
  526. for {
  527. cc.mu.Lock()
  528. switch {
  529. case cc.state == Shutdown:
  530. cc.mu.Unlock()
  531. return nil, ErrClientConnClosing
  532. case cc.state == Ready:
  533. ct := cc.transport
  534. cc.mu.Unlock()
  535. return ct, nil
  536. default:
  537. ready := cc.ready
  538. if ready == nil {
  539. ready = make(chan struct{})
  540. cc.ready = ready
  541. }
  542. cc.mu.Unlock()
  543. select {
  544. case <-ctx.Done():
  545. return nil, transport.ContextErr(ctx.Err())
  546. // Wait until the new transport is ready or failed.
  547. case <-ready:
  548. }
  549. }
  550. }
  551. }
  552. // Close starts to tear down the Conn. Returns ErrClientConnClosing if
  553. // it has been closed (mostly due to dial time-out).
  554. // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
  555. // some edge cases (e.g., the caller opens and closes many ClientConn's in a
  556. // tight loop.
  557. func (cc *Conn) Close() error {
  558. cc.mu.Lock()
  559. defer cc.mu.Unlock()
  560. if cc.state == Shutdown {
  561. return ErrClientConnClosing
  562. }
  563. cc.state = Shutdown
  564. cc.stateCV.Broadcast()
  565. if cc.events != nil {
  566. cc.events.Finish()
  567. cc.events = nil
  568. }
  569. if cc.ready != nil {
  570. close(cc.ready)
  571. cc.ready = nil
  572. }
  573. if cc.transport != nil {
  574. cc.transport.Close()
  575. }
  576. if cc.shutdownChan != nil {
  577. close(cc.shutdownChan)
  578. }
  579. return nil
  580. }