http2_client.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package transport
  34. import (
  35. "bytes"
  36. "errors"
  37. "io"
  38. "math"
  39. "net"
  40. "strings"
  41. "sync"
  42. "time"
  43. "golang.org/x/net/context"
  44. "golang.org/x/net/http2"
  45. "golang.org/x/net/http2/hpack"
  46. "google.golang.org/grpc/codes"
  47. "google.golang.org/grpc/credentials"
  48. "google.golang.org/grpc/grpclog"
  49. "google.golang.org/grpc/metadata"
  50. "google.golang.org/grpc/peer"
  51. )
  52. // http2Client implements the ClientTransport interface with HTTP2.
  53. type http2Client struct {
  54. target string // server name/addr
  55. userAgent string
  56. conn net.Conn // underlying communication channel
  57. authInfo credentials.AuthInfo // auth info about the connection
  58. nextID uint32 // the next stream ID to be used
  59. // writableChan synchronizes write access to the transport.
  60. // A writer acquires the write lock by sending a value on writableChan
  61. // and releases it by receiving from writableChan.
  62. writableChan chan int
  63. // shutdownChan is closed when Close is called.
  64. // Blocking operations should select on shutdownChan to avoid
  65. // blocking forever after Close.
  66. // TODO(zhaoq): Maybe have a channel context?
  67. shutdownChan chan struct{}
  68. // errorChan is closed to notify the I/O error to the caller.
  69. errorChan chan struct{}
  70. framer *framer
  71. hBuf *bytes.Buffer // the buffer for HPACK encoding
  72. hEnc *hpack.Encoder // HPACK encoder
  73. // controlBuf delivers all the control related tasks (e.g., window
  74. // updates, reset streams, and various settings) to the controller.
  75. controlBuf *recvBuffer
  76. fc *inFlow
  77. // sendQuotaPool provides flow control to outbound message.
  78. sendQuotaPool *quotaPool
  79. // streamsQuota limits the max number of concurrent streams.
  80. streamsQuota *quotaPool
  81. // The scheme used: https if TLS is on, http otherwise.
  82. scheme string
  83. authCreds []credentials.Credentials
  84. mu sync.Mutex // guard the following variables
  85. state transportState // the state of underlying connection
  86. activeStreams map[uint32]*Stream
  87. // The max number of concurrent streams
  88. maxStreams int
  89. // the per-stream outbound flow control window size set by the peer.
  90. streamSendQuota uint32
  91. }
  92. // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
  93. // and starts to receive messages on it. Non-nil error returns if construction
  94. // fails.
  95. func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err error) {
  96. if opts.Dialer == nil {
  97. // Set the default Dialer.
  98. opts.Dialer = func(addr string, timeout time.Duration) (net.Conn, error) {
  99. return net.DialTimeout("tcp", addr, timeout)
  100. }
  101. }
  102. scheme := "http"
  103. startT := time.Now()
  104. timeout := opts.Timeout
  105. conn, connErr := opts.Dialer(addr, timeout)
  106. if connErr != nil {
  107. return nil, ConnectionErrorf("transport: %v", connErr)
  108. }
  109. var authInfo credentials.AuthInfo
  110. for _, c := range opts.AuthOptions {
  111. if ccreds, ok := c.(credentials.TransportAuthenticator); ok {
  112. scheme = "https"
  113. // TODO(zhaoq): Now the first TransportAuthenticator is used if there are
  114. // multiple ones provided. Revisit this if it is not appropriate. Probably
  115. // place the ClientTransport construction into a separate function to make
  116. // things clear.
  117. if timeout > 0 {
  118. timeout -= time.Since(startT)
  119. }
  120. conn, authInfo, connErr = ccreds.ClientHandshake(addr, conn, timeout)
  121. break
  122. }
  123. }
  124. if connErr != nil {
  125. return nil, ConnectionErrorf("transport: %v", connErr)
  126. }
  127. defer func() {
  128. if err != nil {
  129. conn.Close()
  130. }
  131. }()
  132. ua := primaryUA
  133. if opts.UserAgent != "" {
  134. ua = opts.UserAgent + " " + ua
  135. }
  136. var buf bytes.Buffer
  137. t := &http2Client{
  138. target: addr,
  139. userAgent: ua,
  140. conn: conn,
  141. authInfo: authInfo,
  142. // The client initiated stream id is odd starting from 1.
  143. nextID: 1,
  144. writableChan: make(chan int, 1),
  145. shutdownChan: make(chan struct{}),
  146. errorChan: make(chan struct{}),
  147. framer: newFramer(conn),
  148. hBuf: &buf,
  149. hEnc: hpack.NewEncoder(&buf),
  150. controlBuf: newRecvBuffer(),
  151. fc: &inFlow{limit: initialConnWindowSize},
  152. sendQuotaPool: newQuotaPool(defaultWindowSize),
  153. scheme: scheme,
  154. state: reachable,
  155. activeStreams: make(map[uint32]*Stream),
  156. authCreds: opts.AuthOptions,
  157. maxStreams: math.MaxInt32,
  158. streamSendQuota: defaultWindowSize,
  159. }
  160. // Start the reader goroutine for incoming message. Each transport has
  161. // a dedicated goroutine which reads HTTP2 frame from network. Then it
  162. // dispatches the frame to the corresponding stream entity.
  163. go t.reader()
  164. // Send connection preface to server.
  165. n, err := t.conn.Write(clientPreface)
  166. if err != nil {
  167. t.Close()
  168. return nil, ConnectionErrorf("transport: %v", err)
  169. }
  170. if n != len(clientPreface) {
  171. t.Close()
  172. return nil, ConnectionErrorf("transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
  173. }
  174. if initialWindowSize != defaultWindowSize {
  175. err = t.framer.writeSettings(true, http2.Setting{http2.SettingInitialWindowSize, uint32(initialWindowSize)})
  176. } else {
  177. err = t.framer.writeSettings(true)
  178. }
  179. if err != nil {
  180. t.Close()
  181. return nil, ConnectionErrorf("transport: %v", err)
  182. }
  183. // Adjust the connection flow control window if needed.
  184. if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
  185. if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
  186. t.Close()
  187. return nil, ConnectionErrorf("transport: %v", err)
  188. }
  189. }
  190. go t.controller()
  191. t.writableChan <- 0
  192. return t, nil
  193. }
  194. func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
  195. // TODO(zhaoq): Handle uint32 overflow of Stream.id.
  196. s := &Stream{
  197. id: t.nextID,
  198. method: callHdr.Method,
  199. sendCompress: callHdr.SendCompress,
  200. buf: newRecvBuffer(),
  201. fc: &inFlow{limit: initialWindowSize},
  202. sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
  203. headerChan: make(chan struct{}),
  204. }
  205. t.nextID += 2
  206. s.windowHandler = func(n int) {
  207. t.updateWindow(s, uint32(n))
  208. }
  209. // Make a stream be able to cancel the pending operations by itself.
  210. s.ctx, s.cancel = context.WithCancel(ctx)
  211. s.dec = &recvBufferReader{
  212. ctx: s.ctx,
  213. recv: s.buf,
  214. }
  215. return s
  216. }
  217. // NewStream creates a stream and register it into the transport as "active"
  218. // streams.
  219. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
  220. // Record the timeout value on the context.
  221. var timeout time.Duration
  222. if dl, ok := ctx.Deadline(); ok {
  223. timeout = dl.Sub(time.Now())
  224. }
  225. select {
  226. case <-ctx.Done():
  227. return nil, ContextErr(ctx.Err())
  228. default:
  229. }
  230. pr := &peer.Peer{
  231. Addr: t.conn.RemoteAddr(),
  232. }
  233. // Attach Auth info if there is any.
  234. if t.authInfo != nil {
  235. pr.AuthInfo = t.authInfo
  236. }
  237. ctx = peer.NewContext(ctx, pr)
  238. authData := make(map[string]string)
  239. for _, c := range t.authCreds {
  240. // Construct URI required to get auth request metadata.
  241. var port string
  242. if pos := strings.LastIndex(t.target, ":"); pos != -1 {
  243. // Omit port if it is the default one.
  244. if t.target[pos+1:] != "443" {
  245. port = ":" + t.target[pos+1:]
  246. }
  247. }
  248. pos := strings.LastIndex(callHdr.Method, "/")
  249. if pos == -1 {
  250. return nil, StreamErrorf(codes.InvalidArgument, "transport: malformed method name: %q", callHdr.Method)
  251. }
  252. audience := "https://" + callHdr.Host + port + callHdr.Method[:pos]
  253. data, err := c.GetRequestMetadata(ctx, audience)
  254. if err != nil {
  255. return nil, StreamErrorf(codes.InvalidArgument, "transport: %v", err)
  256. }
  257. for k, v := range data {
  258. authData[k] = v
  259. }
  260. }
  261. t.mu.Lock()
  262. if t.state != reachable {
  263. t.mu.Unlock()
  264. return nil, ErrConnClosing
  265. }
  266. checkStreamsQuota := t.streamsQuota != nil
  267. t.mu.Unlock()
  268. if checkStreamsQuota {
  269. sq, err := wait(ctx, t.shutdownChan, t.streamsQuota.acquire())
  270. if err != nil {
  271. return nil, err
  272. }
  273. // Returns the quota balance back.
  274. if sq > 1 {
  275. t.streamsQuota.add(sq - 1)
  276. }
  277. }
  278. if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil {
  279. // t.streamsQuota will be updated when t.CloseStream is invoked.
  280. return nil, err
  281. }
  282. t.mu.Lock()
  283. if t.state != reachable {
  284. t.mu.Unlock()
  285. return nil, ErrConnClosing
  286. }
  287. s := t.newStream(ctx, callHdr)
  288. t.activeStreams[s.id] = s
  289. // This stream is not counted when applySetings(...) initialize t.streamsQuota.
  290. // Reset t.streamsQuota to the right value.
  291. var reset bool
  292. if !checkStreamsQuota && t.streamsQuota != nil {
  293. reset = true
  294. }
  295. t.mu.Unlock()
  296. if reset {
  297. t.streamsQuota.reset(-1)
  298. }
  299. // HPACK encodes various headers. Note that once WriteField(...) is
  300. // called, the corresponding headers/continuation frame has to be sent
  301. // because hpack.Encoder is stateful.
  302. t.hBuf.Reset()
  303. t.hEnc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"})
  304. t.hEnc.WriteField(hpack.HeaderField{Name: ":scheme", Value: t.scheme})
  305. t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method})
  306. t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
  307. t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
  308. t.hEnc.WriteField(hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
  309. t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"})
  310. if callHdr.SendCompress != "" {
  311. t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
  312. }
  313. if timeout > 0 {
  314. t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: timeoutEncode(timeout)})
  315. }
  316. for k, v := range authData {
  317. // Capital header names are illegal in HTTP/2.
  318. k = strings.ToLower(k)
  319. t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: v})
  320. }
  321. var (
  322. hasMD bool
  323. endHeaders bool
  324. )
  325. if md, ok := metadata.FromContext(ctx); ok {
  326. hasMD = true
  327. for k, v := range md {
  328. for _, entry := range v {
  329. t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
  330. }
  331. }
  332. }
  333. first := true
  334. // Sends the headers in a single batch even when they span multiple frames.
  335. for !endHeaders {
  336. size := t.hBuf.Len()
  337. if size > http2MaxFrameLen {
  338. size = http2MaxFrameLen
  339. } else {
  340. endHeaders = true
  341. }
  342. var flush bool
  343. if endHeaders && (hasMD || callHdr.Flush) {
  344. flush = true
  345. }
  346. if first {
  347. // Sends a HeadersFrame to server to start a new stream.
  348. p := http2.HeadersFrameParam{
  349. StreamID: s.id,
  350. BlockFragment: t.hBuf.Next(size),
  351. EndStream: false,
  352. EndHeaders: endHeaders,
  353. }
  354. // Do a force flush for the buffered frames iff it is the last headers frame
  355. // and there is header metadata to be sent. Otherwise, there is flushing until
  356. // the corresponding data frame is written.
  357. err = t.framer.writeHeaders(flush, p)
  358. first = false
  359. } else {
  360. // Sends Continuation frames for the leftover headers.
  361. err = t.framer.writeContinuation(flush, s.id, endHeaders, t.hBuf.Next(size))
  362. }
  363. if err != nil {
  364. t.notifyError(err)
  365. return nil, ConnectionErrorf("transport: %v", err)
  366. }
  367. }
  368. t.writableChan <- 0
  369. return s, nil
  370. }
  371. // CloseStream clears the footprint of a stream when the stream is not needed any more.
  372. // This must not be executed in reader's goroutine.
  373. func (t *http2Client) CloseStream(s *Stream, err error) {
  374. var updateStreams bool
  375. t.mu.Lock()
  376. if t.streamsQuota != nil {
  377. updateStreams = true
  378. }
  379. delete(t.activeStreams, s.id)
  380. t.mu.Unlock()
  381. if updateStreams {
  382. t.streamsQuota.add(1)
  383. }
  384. // In case stream sending and receiving are invoked in separate
  385. // goroutines (e.g., bi-directional streaming), the caller needs
  386. // to call cancel on the stream to interrupt the blocking on
  387. // other goroutines.
  388. s.cancel()
  389. s.mu.Lock()
  390. if q := s.fc.resetPendingData(); q > 0 {
  391. if n := t.fc.onRead(q); n > 0 {
  392. t.controlBuf.put(&windowUpdate{0, n})
  393. }
  394. }
  395. if s.state == streamDone {
  396. s.mu.Unlock()
  397. return
  398. }
  399. if !s.headerDone {
  400. close(s.headerChan)
  401. s.headerDone = true
  402. }
  403. s.state = streamDone
  404. s.mu.Unlock()
  405. if _, ok := err.(StreamError); ok {
  406. t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel})
  407. }
  408. }
  409. // Close kicks off the shutdown process of the transport. This should be called
  410. // only once on a transport. Once it is called, the transport should not be
  411. // accessed any more.
  412. func (t *http2Client) Close() (err error) {
  413. t.mu.Lock()
  414. if t.state == reachable {
  415. close(t.errorChan)
  416. }
  417. if t.state == closing {
  418. t.mu.Unlock()
  419. return errors.New("transport: Close() was already called")
  420. }
  421. t.state = closing
  422. t.mu.Unlock()
  423. close(t.shutdownChan)
  424. err = t.conn.Close()
  425. t.mu.Lock()
  426. streams := t.activeStreams
  427. t.activeStreams = nil
  428. t.mu.Unlock()
  429. // Notify all active streams.
  430. for _, s := range streams {
  431. s.mu.Lock()
  432. if !s.headerDone {
  433. close(s.headerChan)
  434. s.headerDone = true
  435. }
  436. s.mu.Unlock()
  437. s.write(recvMsg{err: ErrConnClosing})
  438. }
  439. return
  440. }
  441. // Write formats the data into HTTP2 data frame(s) and sends it out. The caller
  442. // should proceed only if Write returns nil.
  443. // TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later
  444. // if it improves the performance.
  445. func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
  446. r := bytes.NewBuffer(data)
  447. for {
  448. var p []byte
  449. if r.Len() > 0 {
  450. size := http2MaxFrameLen
  451. s.sendQuotaPool.add(0)
  452. // Wait until the stream has some quota to send the data.
  453. sq, err := wait(s.ctx, t.shutdownChan, s.sendQuotaPool.acquire())
  454. if err != nil {
  455. return err
  456. }
  457. t.sendQuotaPool.add(0)
  458. // Wait until the transport has some quota to send the data.
  459. tq, err := wait(s.ctx, t.shutdownChan, t.sendQuotaPool.acquire())
  460. if err != nil {
  461. if _, ok := err.(StreamError); ok {
  462. t.sendQuotaPool.cancel()
  463. }
  464. return err
  465. }
  466. if sq < size {
  467. size = sq
  468. }
  469. if tq < size {
  470. size = tq
  471. }
  472. p = r.Next(size)
  473. ps := len(p)
  474. if ps < sq {
  475. // Overbooked stream quota. Return it back.
  476. s.sendQuotaPool.add(sq - ps)
  477. }
  478. if ps < tq {
  479. // Overbooked transport quota. Return it back.
  480. t.sendQuotaPool.add(tq - ps)
  481. }
  482. }
  483. var (
  484. endStream bool
  485. forceFlush bool
  486. )
  487. if opts.Last && r.Len() == 0 {
  488. endStream = true
  489. }
  490. // Indicate there is a writer who is about to write a data frame.
  491. t.framer.adjustNumWriters(1)
  492. // Got some quota. Try to acquire writing privilege on the transport.
  493. if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
  494. if _, ok := err.(StreamError); ok {
  495. // Return the connection quota back.
  496. t.sendQuotaPool.add(len(p))
  497. }
  498. if t.framer.adjustNumWriters(-1) == 0 {
  499. // This writer is the last one in this batch and has the
  500. // responsibility to flush the buffered frames. It queues
  501. // a flush request to controlBuf instead of flushing directly
  502. // in order to avoid the race with other writing or flushing.
  503. t.controlBuf.put(&flushIO{})
  504. }
  505. return err
  506. }
  507. select {
  508. case <-s.ctx.Done():
  509. t.sendQuotaPool.add(len(p))
  510. if t.framer.adjustNumWriters(-1) == 0 {
  511. t.controlBuf.put(&flushIO{})
  512. }
  513. t.writableChan <- 0
  514. return ContextErr(s.ctx.Err())
  515. default:
  516. }
  517. if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 {
  518. // Do a force flush iff this is last frame for the entire gRPC message
  519. // and the caller is the only writer at this moment.
  520. forceFlush = true
  521. }
  522. // If WriteData fails, all the pending streams will be handled
  523. // by http2Client.Close(). No explicit CloseStream() needs to be
  524. // invoked.
  525. if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil {
  526. t.notifyError(err)
  527. return ConnectionErrorf("transport: %v", err)
  528. }
  529. if t.framer.adjustNumWriters(-1) == 0 {
  530. t.framer.flushWrite()
  531. }
  532. t.writableChan <- 0
  533. if r.Len() == 0 {
  534. break
  535. }
  536. }
  537. if !opts.Last {
  538. return nil
  539. }
  540. s.mu.Lock()
  541. if s.state != streamDone {
  542. if s.state == streamReadDone {
  543. s.state = streamDone
  544. } else {
  545. s.state = streamWriteDone
  546. }
  547. }
  548. s.mu.Unlock()
  549. return nil
  550. }
  551. func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
  552. t.mu.Lock()
  553. defer t.mu.Unlock()
  554. s, ok := t.activeStreams[f.Header().StreamID]
  555. return s, ok
  556. }
  557. // updateWindow adjusts the inbound quota for the stream and the transport.
  558. // Window updates will deliver to the controller for sending when
  559. // the cumulative quota exceeds the corresponding threshold.
  560. func (t *http2Client) updateWindow(s *Stream, n uint32) {
  561. s.mu.Lock()
  562. defer s.mu.Unlock()
  563. if s.state == streamDone {
  564. return
  565. }
  566. if w := t.fc.onRead(n); w > 0 {
  567. t.controlBuf.put(&windowUpdate{0, w})
  568. }
  569. if w := s.fc.onRead(n); w > 0 {
  570. t.controlBuf.put(&windowUpdate{s.id, w})
  571. }
  572. }
  573. func (t *http2Client) handleData(f *http2.DataFrame) {
  574. size := len(f.Data())
  575. if err := t.fc.onData(uint32(size)); err != nil {
  576. t.notifyError(ConnectionErrorf("%v", err))
  577. return
  578. }
  579. // Select the right stream to dispatch.
  580. s, ok := t.getStream(f)
  581. if !ok {
  582. if w := t.fc.onRead(uint32(size)); w > 0 {
  583. t.controlBuf.put(&windowUpdate{0, w})
  584. }
  585. return
  586. }
  587. if size > 0 {
  588. s.mu.Lock()
  589. if s.state == streamDone {
  590. s.mu.Unlock()
  591. // The stream has been closed. Release the corresponding quota.
  592. if w := t.fc.onRead(uint32(size)); w > 0 {
  593. t.controlBuf.put(&windowUpdate{0, w})
  594. }
  595. return
  596. }
  597. if err := s.fc.onData(uint32(size)); err != nil {
  598. s.state = streamDone
  599. s.statusCode = codes.Internal
  600. s.statusDesc = err.Error()
  601. s.mu.Unlock()
  602. s.write(recvMsg{err: io.EOF})
  603. t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
  604. return
  605. }
  606. s.mu.Unlock()
  607. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  608. // guarantee f.Data() is consumed before the arrival of next frame.
  609. // Can this copy be eliminated?
  610. data := make([]byte, size)
  611. copy(data, f.Data())
  612. s.write(recvMsg{data: data})
  613. }
  614. // The server has closed the stream without sending trailers. Record that
  615. // the read direction is closed, and set the status appropriately.
  616. if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
  617. s.mu.Lock()
  618. if s.state == streamWriteDone {
  619. s.state = streamDone
  620. } else {
  621. s.state = streamReadDone
  622. }
  623. s.statusCode = codes.Internal
  624. s.statusDesc = "server closed the stream without sending trailers"
  625. s.mu.Unlock()
  626. s.write(recvMsg{err: io.EOF})
  627. }
  628. }
  629. func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
  630. s, ok := t.getStream(f)
  631. if !ok {
  632. return
  633. }
  634. s.mu.Lock()
  635. if s.state == streamDone {
  636. s.mu.Unlock()
  637. return
  638. }
  639. s.state = streamDone
  640. if !s.headerDone {
  641. close(s.headerChan)
  642. s.headerDone = true
  643. }
  644. s.statusCode, ok = http2ErrConvTab[http2.ErrCode(f.ErrCode)]
  645. if !ok {
  646. grpclog.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode)
  647. s.statusCode = codes.Unknown
  648. }
  649. s.mu.Unlock()
  650. s.write(recvMsg{err: io.EOF})
  651. }
  652. func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
  653. if f.IsAck() {
  654. return
  655. }
  656. var ss []http2.Setting
  657. f.ForeachSetting(func(s http2.Setting) error {
  658. ss = append(ss, s)
  659. return nil
  660. })
  661. // The settings will be applied once the ack is sent.
  662. t.controlBuf.put(&settings{ack: true, ss: ss})
  663. }
  664. func (t *http2Client) handlePing(f *http2.PingFrame) {
  665. pingAck := &ping{ack: true}
  666. copy(pingAck.data[:], f.Data[:])
  667. t.controlBuf.put(pingAck)
  668. }
  669. func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
  670. // TODO(zhaoq): GoAwayFrame handler to be implemented
  671. }
  672. func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  673. id := f.Header().StreamID
  674. incr := f.Increment
  675. if id == 0 {
  676. t.sendQuotaPool.add(int(incr))
  677. return
  678. }
  679. if s, ok := t.getStream(f); ok {
  680. s.sendQuotaPool.add(int(incr))
  681. }
  682. }
  683. // operateHeaders takes action on the decoded headers.
  684. func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
  685. s, ok := t.getStream(frame)
  686. if !ok {
  687. return
  688. }
  689. var state decodeState
  690. for _, hf := range frame.Fields {
  691. state.processHeaderField(hf)
  692. }
  693. if state.err != nil {
  694. s.write(recvMsg{err: state.err})
  695. // Something wrong. Stops reading even when there is remaining.
  696. return
  697. }
  698. endStream := frame.StreamEnded()
  699. s.mu.Lock()
  700. if !endStream {
  701. s.recvCompress = state.encoding
  702. }
  703. if !s.headerDone {
  704. if !endStream && len(state.mdata) > 0 {
  705. s.header = state.mdata
  706. }
  707. close(s.headerChan)
  708. s.headerDone = true
  709. }
  710. if !endStream || s.state == streamDone {
  711. s.mu.Unlock()
  712. return
  713. }
  714. if len(state.mdata) > 0 {
  715. s.trailer = state.mdata
  716. }
  717. s.state = streamDone
  718. s.statusCode = state.statusCode
  719. s.statusDesc = state.statusDesc
  720. s.mu.Unlock()
  721. s.write(recvMsg{err: io.EOF})
  722. }
  723. func handleMalformedHTTP2(s *Stream, err error) {
  724. s.mu.Lock()
  725. if !s.headerDone {
  726. close(s.headerChan)
  727. s.headerDone = true
  728. }
  729. s.mu.Unlock()
  730. s.write(recvMsg{err: err})
  731. }
  732. // reader runs as a separate goroutine in charge of reading data from network
  733. // connection.
  734. //
  735. // TODO(zhaoq): currently one reader per transport. Investigate whether this is
  736. // optimal.
  737. // TODO(zhaoq): Check the validity of the incoming frame sequence.
  738. func (t *http2Client) reader() {
  739. // Check the validity of server preface.
  740. frame, err := t.framer.readFrame()
  741. if err != nil {
  742. t.notifyError(err)
  743. return
  744. }
  745. sf, ok := frame.(*http2.SettingsFrame)
  746. if !ok {
  747. t.notifyError(err)
  748. return
  749. }
  750. t.handleSettings(sf)
  751. // loop to keep reading incoming messages on this transport.
  752. for {
  753. frame, err := t.framer.readFrame()
  754. if err != nil {
  755. // Abort an active stream if the http2.Framer returns a
  756. // http2.StreamError. This can happen only if the server's response
  757. // is malformed http2.
  758. if se, ok := err.(http2.StreamError); ok {
  759. t.mu.Lock()
  760. s := t.activeStreams[se.StreamID]
  761. t.mu.Unlock()
  762. if s != nil {
  763. // use error detail to provide better err message
  764. handleMalformedHTTP2(s, StreamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail()))
  765. }
  766. continue
  767. } else {
  768. // Transport error.
  769. t.notifyError(err)
  770. return
  771. }
  772. }
  773. switch frame := frame.(type) {
  774. case *http2.MetaHeadersFrame:
  775. t.operateHeaders(frame)
  776. case *http2.DataFrame:
  777. t.handleData(frame)
  778. case *http2.RSTStreamFrame:
  779. t.handleRSTStream(frame)
  780. case *http2.SettingsFrame:
  781. t.handleSettings(frame)
  782. case *http2.PingFrame:
  783. t.handlePing(frame)
  784. case *http2.GoAwayFrame:
  785. t.handleGoAway(frame)
  786. case *http2.WindowUpdateFrame:
  787. t.handleWindowUpdate(frame)
  788. default:
  789. grpclog.Printf("transport: http2Client.reader got unhandled frame type %v.", frame)
  790. }
  791. }
  792. }
  793. func (t *http2Client) applySettings(ss []http2.Setting) {
  794. for _, s := range ss {
  795. switch s.ID {
  796. case http2.SettingMaxConcurrentStreams:
  797. // TODO(zhaoq): This is a hack to avoid significant refactoring of the
  798. // code to deal with the unrealistic int32 overflow. Probably will try
  799. // to find a better way to handle this later.
  800. if s.Val > math.MaxInt32 {
  801. s.Val = math.MaxInt32
  802. }
  803. t.mu.Lock()
  804. reset := t.streamsQuota != nil
  805. if !reset {
  806. t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams))
  807. }
  808. ms := t.maxStreams
  809. t.maxStreams = int(s.Val)
  810. t.mu.Unlock()
  811. if reset {
  812. t.streamsQuota.reset(int(s.Val) - ms)
  813. }
  814. case http2.SettingInitialWindowSize:
  815. t.mu.Lock()
  816. for _, stream := range t.activeStreams {
  817. // Adjust the sending quota for each stream.
  818. stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
  819. }
  820. t.streamSendQuota = s.Val
  821. t.mu.Unlock()
  822. }
  823. }
  824. }
  825. // controller running in a separate goroutine takes charge of sending control
  826. // frames (e.g., window update, reset stream, setting, etc.) to the server.
  827. func (t *http2Client) controller() {
  828. for {
  829. select {
  830. case i := <-t.controlBuf.get():
  831. t.controlBuf.load()
  832. select {
  833. case <-t.writableChan:
  834. switch i := i.(type) {
  835. case *windowUpdate:
  836. t.framer.writeWindowUpdate(true, i.streamID, i.increment)
  837. case *settings:
  838. if i.ack {
  839. t.framer.writeSettingsAck(true)
  840. t.applySettings(i.ss)
  841. } else {
  842. t.framer.writeSettings(true, i.ss...)
  843. }
  844. case *resetStream:
  845. t.framer.writeRSTStream(true, i.streamID, i.code)
  846. case *flushIO:
  847. t.framer.flushWrite()
  848. case *ping:
  849. t.framer.writePing(true, i.ack, i.data)
  850. default:
  851. grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i)
  852. }
  853. t.writableChan <- 0
  854. continue
  855. case <-t.shutdownChan:
  856. return
  857. }
  858. case <-t.shutdownChan:
  859. return
  860. }
  861. }
  862. }
  863. func (t *http2Client) Error() <-chan struct{} {
  864. return t.errorChan
  865. }
  866. func (t *http2Client) notifyError(err error) {
  867. t.mu.Lock()
  868. defer t.mu.Unlock()
  869. // make sure t.errorChan is closed only once.
  870. if t.state == reachable {
  871. t.state = unreachable
  872. close(t.errorChan)
  873. grpclog.Printf("transport: http2Client.notifyError got notified that the client transport was broken %v.", err)
  874. }
  875. }