123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- /*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
- package grpc
- import (
- "container/list"
- "fmt"
- "sync"
- "golang.org/x/net/context"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/naming"
- "google.golang.org/grpc/transport"
- )
- // Picker picks a Conn for RPC requests.
- // This is EXPERIMENTAL and please do not implement your own Picker for now.
- type Picker interface {
- // Init does initial processing for the Picker, e.g., initiate some connections.
- Init(cc *ClientConn) error
- // Pick blocks until either a transport.ClientTransport is ready for the upcoming RPC
- // or some error happens.
- Pick(ctx context.Context) (transport.ClientTransport, error)
- // PickAddr picks a peer address for connecting. This will be called repeated for
- // connecting/reconnecting.
- PickAddr() (string, error)
- // State returns the connectivity state of the underlying connections.
- State() (ConnectivityState, error)
- // WaitForStateChange blocks until the state changes to something other than
- // the sourceState. It returns the new state or error.
- WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error)
- // Close closes all the Conn's owned by this Picker.
- Close() error
- }
- // unicastPicker is the default Picker which is used when there is no custom Picker
- // specified by users. It always picks the same Conn.
- type unicastPicker struct {
- target string
- conn *Conn
- }
- func (p *unicastPicker) Init(cc *ClientConn) error {
- c, err := NewConn(cc)
- if err != nil {
- return err
- }
- p.conn = c
- return nil
- }
- func (p *unicastPicker) Pick(ctx context.Context) (transport.ClientTransport, error) {
- return p.conn.Wait(ctx)
- }
- func (p *unicastPicker) PickAddr() (string, error) {
- return p.target, nil
- }
- func (p *unicastPicker) State() (ConnectivityState, error) {
- return p.conn.State(), nil
- }
- func (p *unicastPicker) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
- return p.conn.WaitForStateChange(ctx, sourceState)
- }
- func (p *unicastPicker) Close() error {
- if p.conn != nil {
- return p.conn.Close()
- }
- return nil
- }
- // unicastNamingPicker picks an address from a name resolver to set up the connection.
- type unicastNamingPicker struct {
- cc *ClientConn
- resolver naming.Resolver
- watcher naming.Watcher
- mu sync.Mutex
- // The list of the addresses are obtained from watcher.
- addrs *list.List
- // It tracks the current picked addr by PickAddr(). The next PickAddr may
- // push it forward on addrs.
- pickedAddr *list.Element
- conn *Conn
- }
- // NewUnicastNamingPicker creates a Picker to pick addresses from a name resolver
- // to connect.
- func NewUnicastNamingPicker(r naming.Resolver) Picker {
- return &unicastNamingPicker{
- resolver: r,
- addrs: list.New(),
- }
- }
- type addrInfo struct {
- addr string
- // Set to true if this addrInfo needs to be deleted in the next PickAddrr() call.
- deleting bool
- }
- // processUpdates calls Watcher.Next() once and processes the obtained updates.
- func (p *unicastNamingPicker) processUpdates() error {
- updates, err := p.watcher.Next()
- if err != nil {
- return err
- }
- for _, update := range updates {
- switch update.Op {
- case naming.Add:
- p.mu.Lock()
- p.addrs.PushBack(&addrInfo{
- addr: update.Addr,
- })
- p.mu.Unlock()
- // Initial connection setup
- if p.conn == nil {
- conn, err := NewConn(p.cc)
- if err != nil {
- return err
- }
- p.conn = conn
- }
- case naming.Delete:
- p.mu.Lock()
- for e := p.addrs.Front(); e != nil; e = e.Next() {
- if update.Addr == e.Value.(*addrInfo).addr {
- if e == p.pickedAddr {
- // Do not remove the element now if it is the current picked
- // one. We leave the deletion to the next PickAddr() call.
- e.Value.(*addrInfo).deleting = true
- // Notify Conn to close it. All the live RPCs on this connection
- // will be aborted.
- p.conn.NotifyReset()
- } else {
- p.addrs.Remove(e)
- }
- }
- }
- p.mu.Unlock()
- default:
- grpclog.Println("Unknown update.Op ", update.Op)
- }
- }
- return nil
- }
- // monitor runs in a standalone goroutine to keep watching name resolution updates until the watcher
- // is closed.
- func (p *unicastNamingPicker) monitor() {
- for {
- if err := p.processUpdates(); err != nil {
- return
- }
- }
- }
- func (p *unicastNamingPicker) Init(cc *ClientConn) error {
- w, err := p.resolver.Resolve(cc.target)
- if err != nil {
- return err
- }
- p.watcher = w
- p.cc = cc
- // Get the initial name resolution.
- if err := p.processUpdates(); err != nil {
- return err
- }
- go p.monitor()
- return nil
- }
- func (p *unicastNamingPicker) Pick(ctx context.Context) (transport.ClientTransport, error) {
- return p.conn.Wait(ctx)
- }
- func (p *unicastNamingPicker) PickAddr() (string, error) {
- p.mu.Lock()
- defer p.mu.Unlock()
- if p.pickedAddr == nil {
- p.pickedAddr = p.addrs.Front()
- } else {
- pa := p.pickedAddr
- p.pickedAddr = pa.Next()
- if pa.Value.(*addrInfo).deleting {
- p.addrs.Remove(pa)
- }
- if p.pickedAddr == nil {
- p.pickedAddr = p.addrs.Front()
- }
- }
- if p.pickedAddr == nil {
- return "", fmt.Errorf("there is no address available to pick")
- }
- return p.pickedAddr.Value.(*addrInfo).addr, nil
- }
- func (p *unicastNamingPicker) State() (ConnectivityState, error) {
- return 0, fmt.Errorf("State() is not supported for unicastNamingPicker")
- }
- func (p *unicastNamingPicker) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
- return 0, fmt.Errorf("WaitForStateChange is not supported for unicastNamingPciker")
- }
- func (p *unicastNamingPicker) Close() error {
- p.watcher.Close()
- p.conn.Close()
- return nil
- }
|