picker.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "container/list"
  36. "fmt"
  37. "sync"
  38. "golang.org/x/net/context"
  39. "google.golang.org/grpc/grpclog"
  40. "google.golang.org/grpc/naming"
  41. "google.golang.org/grpc/transport"
  42. )
  43. // Picker picks a Conn for RPC requests.
  44. // This is EXPERIMENTAL and please do not implement your own Picker for now.
  45. type Picker interface {
  46. // Init does initial processing for the Picker, e.g., initiate some connections.
  47. Init(cc *ClientConn) error
  48. // Pick blocks until either a transport.ClientTransport is ready for the upcoming RPC
  49. // or some error happens.
  50. Pick(ctx context.Context) (transport.ClientTransport, error)
  51. // PickAddr picks a peer address for connecting. This will be called repeated for
  52. // connecting/reconnecting.
  53. PickAddr() (string, error)
  54. // State returns the connectivity state of the underlying connections.
  55. State() (ConnectivityState, error)
  56. // WaitForStateChange blocks until the state changes to something other than
  57. // the sourceState. It returns the new state or error.
  58. WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error)
  59. // Close closes all the Conn's owned by this Picker.
  60. Close() error
  61. }
  62. // unicastPicker is the default Picker which is used when there is no custom Picker
  63. // specified by users. It always picks the same Conn.
  64. type unicastPicker struct {
  65. target string
  66. conn *Conn
  67. }
  68. func (p *unicastPicker) Init(cc *ClientConn) error {
  69. c, err := NewConn(cc)
  70. if err != nil {
  71. return err
  72. }
  73. p.conn = c
  74. return nil
  75. }
  76. func (p *unicastPicker) Pick(ctx context.Context) (transport.ClientTransport, error) {
  77. return p.conn.Wait(ctx)
  78. }
  79. func (p *unicastPicker) PickAddr() (string, error) {
  80. return p.target, nil
  81. }
  82. func (p *unicastPicker) State() (ConnectivityState, error) {
  83. return p.conn.State(), nil
  84. }
  85. func (p *unicastPicker) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
  86. return p.conn.WaitForStateChange(ctx, sourceState)
  87. }
  88. func (p *unicastPicker) Close() error {
  89. if p.conn != nil {
  90. return p.conn.Close()
  91. }
  92. return nil
  93. }
  94. // unicastNamingPicker picks an address from a name resolver to set up the connection.
  95. type unicastNamingPicker struct {
  96. cc *ClientConn
  97. resolver naming.Resolver
  98. watcher naming.Watcher
  99. mu sync.Mutex
  100. // The list of the addresses are obtained from watcher.
  101. addrs *list.List
  102. // It tracks the current picked addr by PickAddr(). The next PickAddr may
  103. // push it forward on addrs.
  104. pickedAddr *list.Element
  105. conn *Conn
  106. }
  107. // NewUnicastNamingPicker creates a Picker to pick addresses from a name resolver
  108. // to connect.
  109. func NewUnicastNamingPicker(r naming.Resolver) Picker {
  110. return &unicastNamingPicker{
  111. resolver: r,
  112. addrs: list.New(),
  113. }
  114. }
  115. type addrInfo struct {
  116. addr string
  117. // Set to true if this addrInfo needs to be deleted in the next PickAddrr() call.
  118. deleting bool
  119. }
  120. // processUpdates calls Watcher.Next() once and processes the obtained updates.
  121. func (p *unicastNamingPicker) processUpdates() error {
  122. updates, err := p.watcher.Next()
  123. if err != nil {
  124. return err
  125. }
  126. for _, update := range updates {
  127. switch update.Op {
  128. case naming.Add:
  129. p.mu.Lock()
  130. p.addrs.PushBack(&addrInfo{
  131. addr: update.Addr,
  132. })
  133. p.mu.Unlock()
  134. // Initial connection setup
  135. if p.conn == nil {
  136. conn, err := NewConn(p.cc)
  137. if err != nil {
  138. return err
  139. }
  140. p.conn = conn
  141. }
  142. case naming.Delete:
  143. p.mu.Lock()
  144. for e := p.addrs.Front(); e != nil; e = e.Next() {
  145. if update.Addr == e.Value.(*addrInfo).addr {
  146. if e == p.pickedAddr {
  147. // Do not remove the element now if it is the current picked
  148. // one. We leave the deletion to the next PickAddr() call.
  149. e.Value.(*addrInfo).deleting = true
  150. // Notify Conn to close it. All the live RPCs on this connection
  151. // will be aborted.
  152. p.conn.NotifyReset()
  153. } else {
  154. p.addrs.Remove(e)
  155. }
  156. }
  157. }
  158. p.mu.Unlock()
  159. default:
  160. grpclog.Println("Unknown update.Op ", update.Op)
  161. }
  162. }
  163. return nil
  164. }
  165. // monitor runs in a standalone goroutine to keep watching name resolution updates until the watcher
  166. // is closed.
  167. func (p *unicastNamingPicker) monitor() {
  168. for {
  169. if err := p.processUpdates(); err != nil {
  170. return
  171. }
  172. }
  173. }
  174. func (p *unicastNamingPicker) Init(cc *ClientConn) error {
  175. w, err := p.resolver.Resolve(cc.target)
  176. if err != nil {
  177. return err
  178. }
  179. p.watcher = w
  180. p.cc = cc
  181. // Get the initial name resolution.
  182. if err := p.processUpdates(); err != nil {
  183. return err
  184. }
  185. go p.monitor()
  186. return nil
  187. }
  188. func (p *unicastNamingPicker) Pick(ctx context.Context) (transport.ClientTransport, error) {
  189. return p.conn.Wait(ctx)
  190. }
  191. func (p *unicastNamingPicker) PickAddr() (string, error) {
  192. p.mu.Lock()
  193. defer p.mu.Unlock()
  194. if p.pickedAddr == nil {
  195. p.pickedAddr = p.addrs.Front()
  196. } else {
  197. pa := p.pickedAddr
  198. p.pickedAddr = pa.Next()
  199. if pa.Value.(*addrInfo).deleting {
  200. p.addrs.Remove(pa)
  201. }
  202. if p.pickedAddr == nil {
  203. p.pickedAddr = p.addrs.Front()
  204. }
  205. }
  206. if p.pickedAddr == nil {
  207. return "", fmt.Errorf("there is no address available to pick")
  208. }
  209. return p.pickedAddr.Value.(*addrInfo).addr, nil
  210. }
  211. func (p *unicastNamingPicker) State() (ConnectivityState, error) {
  212. return 0, fmt.Errorf("State() is not supported for unicastNamingPicker")
  213. }
  214. func (p *unicastNamingPicker) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
  215. return 0, fmt.Errorf("WaitForStateChange is not supported for unicastNamingPciker")
  216. }
  217. func (p *unicastNamingPicker) Close() error {
  218. p.watcher.Close()
  219. p.conn.Close()
  220. return nil
  221. }