client_conn_pool.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. // Copyright 2015 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Transport code's client connection pooling.
  5. package http2
  6. import (
  7. "crypto/tls"
  8. "net/http"
  9. "sync"
  10. )
  11. // ClientConnPool manages a pool of HTTP/2 client connections.
  12. type ClientConnPool interface {
  13. GetClientConn(req *http.Request, addr string) (*ClientConn, error)
  14. MarkDead(*ClientConn)
  15. }
  16. // TODO: use singleflight for dialing and addConnCalls?
  17. type clientConnPool struct {
  18. t *Transport
  19. mu sync.Mutex // TODO: maybe switch to RWMutex
  20. // TODO: add support for sharing conns based on cert names
  21. // (e.g. share conn for googleapis.com and appspot.com)
  22. conns map[string][]*ClientConn // key is host:port
  23. dialing map[string]*dialCall // currently in-flight dials
  24. keys map[*ClientConn][]string
  25. addConnCalls map[string]*addConnCall // in-flight addConnIfNeede calls
  26. }
  27. func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
  28. return p.getClientConn(req, addr, dialOnMiss)
  29. }
  30. const (
  31. dialOnMiss = true
  32. noDialOnMiss = false
  33. )
  34. func (p *clientConnPool) getClientConn(_ *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
  35. p.mu.Lock()
  36. for _, cc := range p.conns[addr] {
  37. if cc.CanTakeNewRequest() {
  38. p.mu.Unlock()
  39. return cc, nil
  40. }
  41. }
  42. if !dialOnMiss {
  43. p.mu.Unlock()
  44. return nil, ErrNoCachedConn
  45. }
  46. call := p.getStartDialLocked(addr)
  47. p.mu.Unlock()
  48. <-call.done
  49. return call.res, call.err
  50. }
  51. // dialCall is an in-flight Transport dial call to a host.
  52. type dialCall struct {
  53. p *clientConnPool
  54. done chan struct{} // closed when done
  55. res *ClientConn // valid after done is closed
  56. err error // valid after done is closed
  57. }
  58. // requires p.mu is held.
  59. func (p *clientConnPool) getStartDialLocked(addr string) *dialCall {
  60. if call, ok := p.dialing[addr]; ok {
  61. // A dial is already in-flight. Don't start another.
  62. return call
  63. }
  64. call := &dialCall{p: p, done: make(chan struct{})}
  65. if p.dialing == nil {
  66. p.dialing = make(map[string]*dialCall)
  67. }
  68. p.dialing[addr] = call
  69. go call.dial(addr)
  70. return call
  71. }
  72. // run in its own goroutine.
  73. func (c *dialCall) dial(addr string) {
  74. c.res, c.err = c.p.t.dialClientConn(addr)
  75. close(c.done)
  76. c.p.mu.Lock()
  77. delete(c.p.dialing, addr)
  78. if c.err == nil {
  79. c.p.addConnLocked(addr, c.res)
  80. }
  81. c.p.mu.Unlock()
  82. }
  83. // addConnIfNeeded makes a NewClientConn out of c if a connection for key doesn't
  84. // already exist. It coalesces concurrent calls with the same key.
  85. // This is used by the http1 Transport code when it creates a new connection. Because
  86. // the http1 Transport doesn't de-dup TCP dials to outbound hosts (because it doesn't know
  87. // the protocol), it can get into a situation where it has multiple TLS connections.
  88. // This code decides which ones live or die.
  89. // The return value used is whether c was used.
  90. // c is never closed.
  91. func (p *clientConnPool) addConnIfNeeded(key string, t *Transport, c *tls.Conn) (used bool, err error) {
  92. p.mu.Lock()
  93. for _, cc := range p.conns[key] {
  94. if cc.CanTakeNewRequest() {
  95. p.mu.Unlock()
  96. return false, nil
  97. }
  98. }
  99. call, dup := p.addConnCalls[key]
  100. if !dup {
  101. if p.addConnCalls == nil {
  102. p.addConnCalls = make(map[string]*addConnCall)
  103. }
  104. call = &addConnCall{
  105. p: p,
  106. done: make(chan struct{}),
  107. }
  108. p.addConnCalls[key] = call
  109. go call.run(t, key, c)
  110. }
  111. p.mu.Unlock()
  112. <-call.done
  113. if call.err != nil {
  114. return false, call.err
  115. }
  116. return !dup, nil
  117. }
  118. type addConnCall struct {
  119. p *clientConnPool
  120. done chan struct{} // closed when done
  121. err error
  122. }
  123. func (c *addConnCall) run(t *Transport, key string, tc *tls.Conn) {
  124. cc, err := t.NewClientConn(tc)
  125. p := c.p
  126. p.mu.Lock()
  127. if err != nil {
  128. c.err = err
  129. } else {
  130. p.addConnLocked(key, cc)
  131. }
  132. delete(p.addConnCalls, key)
  133. p.mu.Unlock()
  134. close(c.done)
  135. }
  136. func (p *clientConnPool) addConn(key string, cc *ClientConn) {
  137. p.mu.Lock()
  138. p.addConnLocked(key, cc)
  139. p.mu.Unlock()
  140. }
  141. // p.mu must be held
  142. func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {
  143. for _, v := range p.conns[key] {
  144. if v == cc {
  145. return
  146. }
  147. }
  148. if p.conns == nil {
  149. p.conns = make(map[string][]*ClientConn)
  150. }
  151. if p.keys == nil {
  152. p.keys = make(map[*ClientConn][]string)
  153. }
  154. p.conns[key] = append(p.conns[key], cc)
  155. p.keys[cc] = append(p.keys[cc], key)
  156. }
  157. func (p *clientConnPool) MarkDead(cc *ClientConn) {
  158. p.mu.Lock()
  159. defer p.mu.Unlock()
  160. for _, key := range p.keys[cc] {
  161. vv, ok := p.conns[key]
  162. if !ok {
  163. continue
  164. }
  165. newList := filterOutClientConn(vv, cc)
  166. if len(newList) > 0 {
  167. p.conns[key] = newList
  168. } else {
  169. delete(p.conns, key)
  170. }
  171. }
  172. delete(p.keys, cc)
  173. }
  174. func (p *clientConnPool) closeIdleConnections() {
  175. p.mu.Lock()
  176. defer p.mu.Unlock()
  177. // TODO: don't close a cc if it was just added to the pool
  178. // milliseconds ago and has never been used. There's currently
  179. // a small race window with the HTTP/1 Transport's integration
  180. // where it can add an idle conn just before using it, and
  181. // somebody else can concurrently call CloseIdleConns and
  182. // break some caller's RoundTrip.
  183. for _, vv := range p.conns {
  184. for _, cc := range vv {
  185. cc.closeIfIdle()
  186. }
  187. }
  188. }
  189. func filterOutClientConn(in []*ClientConn, exclude *ClientConn) []*ClientConn {
  190. out := in[:0]
  191. for _, v := range in {
  192. if v != exclude {
  193. out = append(out, v)
  194. }
  195. }
  196. // If we filtered it out, zero out the last item to prevent
  197. // the GC from seeing it.
  198. if len(in) != len(out) {
  199. in[len(in)-1] = nil
  200. }
  201. return out
  202. }