http2_server.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package transport
  34. import (
  35. "bytes"
  36. "errors"
  37. "io"
  38. "math"
  39. "net"
  40. "strconv"
  41. "sync"
  42. "golang.org/x/net/context"
  43. "golang.org/x/net/http2"
  44. "golang.org/x/net/http2/hpack"
  45. "google.golang.org/grpc/codes"
  46. "google.golang.org/grpc/credentials"
  47. "google.golang.org/grpc/grpclog"
  48. "google.golang.org/grpc/metadata"
  49. "google.golang.org/grpc/peer"
  50. )
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
// the stream's state. WriteHeader returns it when the header has already been
// written for the stream or the stream is done.
var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
	// conn is the underlying network connection; Close closes it.
	conn net.Conn
	maxStreamID uint32 // max stream ID ever seen
	authInfo credentials.AuthInfo // auth info about the connection
	// writableChan synchronizes write access to the transport.
	// A writer acquires the write lock by receiving a value on writableChan
	// and releases it by sending on writableChan.
	writableChan chan int
	// shutdownChan is closed when Close is called.
	// Blocking operations should select on shutdownChan to avoid
	// blocking forever after Close.
	shutdownChan chan struct{}
	// framer reads HTTP2 frames from, and writes them to, conn.
	framer *framer
	hBuf *bytes.Buffer // the buffer for HPACK encoding
	hEnc *hpack.Encoder // HPACK encoder
	// The max number of concurrent streams. A limit of 0 passed to
	// newHTTP2Server is stored here as math.MaxUint32 (no limit).
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *recvBuffer
	// fc accounts for inbound flow control on the whole connection.
	fc *inFlow
	// sendQuotaPool provides flow control to outbound message.
	sendQuotaPool *quotaPool
	mu sync.Mutex // guard the following
	// state is reachable after construction and becomes closing in Close.
	state transportState
	// activeStreams holds the streams registered by operateHeaders, keyed by
	// stream ID. Close sets it to nil.
	activeStreams map[uint32]*Stream
	// the per-stream outbound flow control window size set by the peer.
	streamSendQuota uint32
}
  84. // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
  85. // returned if something goes wrong.
  86. func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) {
  87. framer := newFramer(conn)
  88. // Send initial settings as connection preface to client.
  89. var settings []http2.Setting
  90. // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
  91. // permitted in the HTTP2 spec.
  92. if maxStreams == 0 {
  93. maxStreams = math.MaxUint32
  94. } else {
  95. settings = append(settings, http2.Setting{http2.SettingMaxConcurrentStreams, maxStreams})
  96. }
  97. if initialWindowSize != defaultWindowSize {
  98. settings = append(settings, http2.Setting{http2.SettingInitialWindowSize, uint32(initialWindowSize)})
  99. }
  100. if err := framer.writeSettings(true, settings...); err != nil {
  101. return nil, ConnectionErrorf("transport: %v", err)
  102. }
  103. // Adjust the connection flow control window if needed.
  104. if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
  105. if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
  106. return nil, ConnectionErrorf("transport: %v", err)
  107. }
  108. }
  109. var buf bytes.Buffer
  110. t := &http2Server{
  111. conn: conn,
  112. authInfo: authInfo,
  113. framer: framer,
  114. hBuf: &buf,
  115. hEnc: hpack.NewEncoder(&buf),
  116. maxStreams: maxStreams,
  117. controlBuf: newRecvBuffer(),
  118. fc: &inFlow{limit: initialConnWindowSize},
  119. sendQuotaPool: newQuotaPool(defaultWindowSize),
  120. state: reachable,
  121. writableChan: make(chan int, 1),
  122. shutdownChan: make(chan struct{}),
  123. activeStreams: make(map[uint32]*Stream),
  124. streamSendQuota: defaultWindowSize,
  125. }
  126. go t.controller()
  127. t.writableChan <- 0
  128. return t, nil
  129. }
  130. // operateHeader takes action on the decoded headers.
  131. func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) {
  132. buf := newRecvBuffer()
  133. s := &Stream{
  134. id: frame.Header().StreamID,
  135. st: t,
  136. buf: buf,
  137. fc: &inFlow{limit: initialWindowSize},
  138. }
  139. var state decodeState
  140. for _, hf := range frame.Fields {
  141. state.processHeaderField(hf)
  142. }
  143. if err := state.err; err != nil {
  144. if se, ok := err.(StreamError); ok {
  145. t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
  146. }
  147. return
  148. }
  149. if frame.StreamEnded() {
  150. // s is just created by the caller. No lock needed.
  151. s.state = streamReadDone
  152. }
  153. s.recvCompress = state.encoding
  154. if state.timeoutSet {
  155. s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout)
  156. } else {
  157. s.ctx, s.cancel = context.WithCancel(context.TODO())
  158. }
  159. pr := &peer.Peer{
  160. Addr: t.conn.RemoteAddr(),
  161. }
  162. // Attach Auth info if there is any.
  163. if t.authInfo != nil {
  164. pr.AuthInfo = t.authInfo
  165. }
  166. s.ctx = peer.NewContext(s.ctx, pr)
  167. // Cache the current stream to the context so that the server application
  168. // can find out. Required when the server wants to send some metadata
  169. // back to the client (unary call only).
  170. s.ctx = newContextWithStream(s.ctx, s)
  171. // Attach the received metadata to the context.
  172. if len(state.mdata) > 0 {
  173. s.ctx = metadata.NewContext(s.ctx, state.mdata)
  174. }
  175. s.dec = &recvBufferReader{
  176. ctx: s.ctx,
  177. recv: s.buf,
  178. }
  179. s.recvCompress = state.encoding
  180. s.method = state.method
  181. t.mu.Lock()
  182. if t.state != reachable {
  183. t.mu.Unlock()
  184. return
  185. }
  186. if uint32(len(t.activeStreams)) >= t.maxStreams {
  187. t.mu.Unlock()
  188. t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
  189. return
  190. }
  191. s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
  192. t.activeStreams[s.id] = s
  193. t.mu.Unlock()
  194. s.windowHandler = func(n int) {
  195. t.updateWindow(s, uint32(n))
  196. }
  197. handle(s)
  198. }
  199. // HandleStreams receives incoming streams using the given handler. This is
  200. // typically run in a separate goroutine.
  201. func (t *http2Server) HandleStreams(handle func(*Stream)) {
  202. // Check the validity of client preface.
  203. preface := make([]byte, len(clientPreface))
  204. if _, err := io.ReadFull(t.conn, preface); err != nil {
  205. grpclog.Printf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
  206. t.Close()
  207. return
  208. }
  209. if !bytes.Equal(preface, clientPreface) {
  210. grpclog.Printf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
  211. t.Close()
  212. return
  213. }
  214. frame, err := t.framer.readFrame()
  215. if err != nil {
  216. grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
  217. t.Close()
  218. return
  219. }
  220. sf, ok := frame.(*http2.SettingsFrame)
  221. if !ok {
  222. grpclog.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
  223. t.Close()
  224. return
  225. }
  226. t.handleSettings(sf)
  227. for {
  228. frame, err := t.framer.readFrame()
  229. if err != nil {
  230. if se, ok := err.(http2.StreamError); ok {
  231. t.mu.Lock()
  232. s := t.activeStreams[se.StreamID]
  233. t.mu.Unlock()
  234. if s != nil {
  235. t.closeStream(s)
  236. }
  237. t.controlBuf.put(&resetStream{se.StreamID, se.Code})
  238. continue
  239. }
  240. t.Close()
  241. return
  242. }
  243. switch frame := frame.(type) {
  244. case *http2.MetaHeadersFrame:
  245. id := frame.Header().StreamID
  246. if id%2 != 1 || id <= t.maxStreamID {
  247. // illegal gRPC stream id.
  248. grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", id)
  249. t.Close()
  250. break
  251. }
  252. t.maxStreamID = id
  253. t.operateHeaders(frame, handle)
  254. case *http2.DataFrame:
  255. t.handleData(frame)
  256. case *http2.RSTStreamFrame:
  257. t.handleRSTStream(frame)
  258. case *http2.SettingsFrame:
  259. t.handleSettings(frame)
  260. case *http2.PingFrame:
  261. t.handlePing(frame)
  262. case *http2.WindowUpdateFrame:
  263. t.handleWindowUpdate(frame)
  264. case *http2.GoAwayFrame:
  265. break
  266. default:
  267. grpclog.Printf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
  268. }
  269. }
  270. }
  271. func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
  272. t.mu.Lock()
  273. defer t.mu.Unlock()
  274. if t.activeStreams == nil {
  275. // The transport is closing.
  276. return nil, false
  277. }
  278. s, ok := t.activeStreams[f.Header().StreamID]
  279. if !ok {
  280. // The stream is already done.
  281. return nil, false
  282. }
  283. return s, true
  284. }
  285. // updateWindow adjusts the inbound quota for the stream and the transport.
  286. // Window updates will deliver to the controller for sending when
  287. // the cumulative quota exceeds the corresponding threshold.
  288. func (t *http2Server) updateWindow(s *Stream, n uint32) {
  289. s.mu.Lock()
  290. defer s.mu.Unlock()
  291. if s.state == streamDone {
  292. return
  293. }
  294. if w := t.fc.onRead(n); w > 0 {
  295. t.controlBuf.put(&windowUpdate{0, w})
  296. }
  297. if w := s.fc.onRead(n); w > 0 {
  298. t.controlBuf.put(&windowUpdate{s.id, w})
  299. }
  300. }
  301. func (t *http2Server) handleData(f *http2.DataFrame) {
  302. size := len(f.Data())
  303. if err := t.fc.onData(uint32(size)); err != nil {
  304. grpclog.Printf("transport: http2Server %v", err)
  305. t.Close()
  306. return
  307. }
  308. // Select the right stream to dispatch.
  309. s, ok := t.getStream(f)
  310. if !ok {
  311. if w := t.fc.onRead(uint32(size)); w > 0 {
  312. t.controlBuf.put(&windowUpdate{0, w})
  313. }
  314. return
  315. }
  316. if size > 0 {
  317. s.mu.Lock()
  318. if s.state == streamDone {
  319. s.mu.Unlock()
  320. // The stream has been closed. Release the corresponding quota.
  321. if w := t.fc.onRead(uint32(size)); w > 0 {
  322. t.controlBuf.put(&windowUpdate{0, w})
  323. }
  324. return
  325. }
  326. if err := s.fc.onData(uint32(size)); err != nil {
  327. s.mu.Unlock()
  328. t.closeStream(s)
  329. t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
  330. return
  331. }
  332. s.mu.Unlock()
  333. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  334. // guarantee f.Data() is consumed before the arrival of next frame.
  335. // Can this copy be eliminated?
  336. data := make([]byte, size)
  337. copy(data, f.Data())
  338. s.write(recvMsg{data: data})
  339. }
  340. if f.Header().Flags.Has(http2.FlagDataEndStream) {
  341. // Received the end of stream from the client.
  342. s.mu.Lock()
  343. if s.state != streamDone {
  344. if s.state == streamWriteDone {
  345. s.state = streamDone
  346. } else {
  347. s.state = streamReadDone
  348. }
  349. }
  350. s.mu.Unlock()
  351. s.write(recvMsg{err: io.EOF})
  352. }
  353. }
  354. func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
  355. s, ok := t.getStream(f)
  356. if !ok {
  357. return
  358. }
  359. t.closeStream(s)
  360. }
  361. func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
  362. if f.IsAck() {
  363. return
  364. }
  365. var ss []http2.Setting
  366. f.ForeachSetting(func(s http2.Setting) error {
  367. ss = append(ss, s)
  368. return nil
  369. })
  370. // The settings will be applied once the ack is sent.
  371. t.controlBuf.put(&settings{ack: true, ss: ss})
  372. }
  373. func (t *http2Server) handlePing(f *http2.PingFrame) {
  374. pingAck := &ping{ack: true}
  375. copy(pingAck.data[:], f.Data[:])
  376. t.controlBuf.put(pingAck)
  377. }
  378. func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  379. id := f.Header().StreamID
  380. incr := f.Increment
  381. if id == 0 {
  382. t.sendQuotaPool.add(int(incr))
  383. return
  384. }
  385. if s, ok := t.getStream(f); ok {
  386. s.sendQuotaPool.add(int(incr))
  387. }
  388. }
  389. func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) error {
  390. first := true
  391. endHeaders := false
  392. var err error
  393. // Sends the headers in a single batch.
  394. for !endHeaders {
  395. size := t.hBuf.Len()
  396. if size > http2MaxFrameLen {
  397. size = http2MaxFrameLen
  398. } else {
  399. endHeaders = true
  400. }
  401. if first {
  402. p := http2.HeadersFrameParam{
  403. StreamID: s.id,
  404. BlockFragment: b.Next(size),
  405. EndStream: endStream,
  406. EndHeaders: endHeaders,
  407. }
  408. err = t.framer.writeHeaders(endHeaders, p)
  409. first = false
  410. } else {
  411. err = t.framer.writeContinuation(endHeaders, s.id, endHeaders, b.Next(size))
  412. }
  413. if err != nil {
  414. t.Close()
  415. return ConnectionErrorf("transport: %v", err)
  416. }
  417. }
  418. return nil
  419. }
  420. // WriteHeader sends the header metedata md back to the client.
  421. func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
  422. s.mu.Lock()
  423. if s.headerOk || s.state == streamDone {
  424. s.mu.Unlock()
  425. return ErrIllegalHeaderWrite
  426. }
  427. s.headerOk = true
  428. s.mu.Unlock()
  429. if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
  430. return err
  431. }
  432. t.hBuf.Reset()
  433. t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
  434. t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
  435. if s.sendCompress != "" {
  436. t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
  437. }
  438. for k, v := range md {
  439. for _, entry := range v {
  440. t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
  441. }
  442. }
  443. if err := t.writeHeaders(s, t.hBuf, false); err != nil {
  444. return err
  445. }
  446. t.writableChan <- 0
  447. return nil
  448. }
  449. // WriteStatus sends stream status to the client and terminates the stream.
  450. // There is no further I/O operations being able to perform on this stream.
  451. // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
  452. // OK is adopted.
  453. func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error {
  454. var headersSent bool
  455. s.mu.Lock()
  456. if s.state == streamDone {
  457. s.mu.Unlock()
  458. return nil
  459. }
  460. if s.headerOk {
  461. headersSent = true
  462. }
  463. s.mu.Unlock()
  464. if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
  465. return err
  466. }
  467. t.hBuf.Reset()
  468. if !headersSent {
  469. t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
  470. t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
  471. }
  472. t.hEnc.WriteField(
  473. hpack.HeaderField{
  474. Name: "grpc-status",
  475. Value: strconv.Itoa(int(statusCode)),
  476. })
  477. t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: statusDesc})
  478. // Attach the trailer metadata.
  479. for k, v := range s.trailer {
  480. for _, entry := range v {
  481. t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
  482. }
  483. }
  484. if err := t.writeHeaders(s, t.hBuf, true); err != nil {
  485. t.Close()
  486. return err
  487. }
  488. t.closeStream(s)
  489. t.writableChan <- 0
  490. return nil
  491. }
  492. // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
  493. // is returns if it fails (e.g., framing error, transport error).
  494. func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
  495. // TODO(zhaoq): Support multi-writers for a single stream.
  496. var writeHeaderFrame bool
  497. s.mu.Lock()
  498. if s.state == streamDone {
  499. s.mu.Unlock()
  500. return StreamErrorf(codes.Unknown, "the stream has been done")
  501. }
  502. if !s.headerOk {
  503. writeHeaderFrame = true
  504. s.headerOk = true
  505. }
  506. s.mu.Unlock()
  507. if writeHeaderFrame {
  508. if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
  509. return err
  510. }
  511. t.hBuf.Reset()
  512. t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
  513. t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
  514. if s.sendCompress != "" {
  515. t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
  516. }
  517. p := http2.HeadersFrameParam{
  518. StreamID: s.id,
  519. BlockFragment: t.hBuf.Bytes(),
  520. EndHeaders: true,
  521. }
  522. if err := t.framer.writeHeaders(false, p); err != nil {
  523. t.Close()
  524. return ConnectionErrorf("transport: %v", err)
  525. }
  526. t.writableChan <- 0
  527. }
  528. r := bytes.NewBuffer(data)
  529. for {
  530. if r.Len() == 0 {
  531. return nil
  532. }
  533. size := http2MaxFrameLen
  534. s.sendQuotaPool.add(0)
  535. // Wait until the stream has some quota to send the data.
  536. sq, err := wait(s.ctx, t.shutdownChan, s.sendQuotaPool.acquire())
  537. if err != nil {
  538. return err
  539. }
  540. t.sendQuotaPool.add(0)
  541. // Wait until the transport has some quota to send the data.
  542. tq, err := wait(s.ctx, t.shutdownChan, t.sendQuotaPool.acquire())
  543. if err != nil {
  544. if _, ok := err.(StreamError); ok {
  545. t.sendQuotaPool.cancel()
  546. }
  547. return err
  548. }
  549. if sq < size {
  550. size = sq
  551. }
  552. if tq < size {
  553. size = tq
  554. }
  555. p := r.Next(size)
  556. ps := len(p)
  557. if ps < sq {
  558. // Overbooked stream quota. Return it back.
  559. s.sendQuotaPool.add(sq - ps)
  560. }
  561. if ps < tq {
  562. // Overbooked transport quota. Return it back.
  563. t.sendQuotaPool.add(tq - ps)
  564. }
  565. t.framer.adjustNumWriters(1)
  566. // Got some quota. Try to acquire writing privilege on the
  567. // transport.
  568. if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
  569. if _, ok := err.(StreamError); ok {
  570. // Return the connection quota back.
  571. t.sendQuotaPool.add(ps)
  572. }
  573. if t.framer.adjustNumWriters(-1) == 0 {
  574. // This writer is the last one in this batch and has the
  575. // responsibility to flush the buffered frames. It queues
  576. // a flush request to controlBuf instead of flushing directly
  577. // in order to avoid the race with other writing or flushing.
  578. t.controlBuf.put(&flushIO{})
  579. }
  580. return err
  581. }
  582. select {
  583. case <-s.ctx.Done():
  584. t.sendQuotaPool.add(ps)
  585. if t.framer.adjustNumWriters(-1) == 0 {
  586. t.controlBuf.put(&flushIO{})
  587. }
  588. t.writableChan <- 0
  589. return ContextErr(s.ctx.Err())
  590. default:
  591. }
  592. var forceFlush bool
  593. if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last {
  594. forceFlush = true
  595. }
  596. if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
  597. t.Close()
  598. return ConnectionErrorf("transport: %v", err)
  599. }
  600. if t.framer.adjustNumWriters(-1) == 0 {
  601. t.framer.flushWrite()
  602. }
  603. t.writableChan <- 0
  604. }
  605. }
  606. func (t *http2Server) applySettings(ss []http2.Setting) {
  607. for _, s := range ss {
  608. if s.ID == http2.SettingInitialWindowSize {
  609. t.mu.Lock()
  610. defer t.mu.Unlock()
  611. for _, stream := range t.activeStreams {
  612. stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
  613. }
  614. t.streamSendQuota = s.Val
  615. }
  616. }
  617. }
  618. // controller running in a separate goroutine takes charge of sending control
  619. // frames (e.g., window update, reset stream, setting, etc.) to the server.
  620. func (t *http2Server) controller() {
  621. for {
  622. select {
  623. case i := <-t.controlBuf.get():
  624. t.controlBuf.load()
  625. select {
  626. case <-t.writableChan:
  627. switch i := i.(type) {
  628. case *windowUpdate:
  629. t.framer.writeWindowUpdate(true, i.streamID, i.increment)
  630. case *settings:
  631. if i.ack {
  632. t.framer.writeSettingsAck(true)
  633. t.applySettings(i.ss)
  634. } else {
  635. t.framer.writeSettings(true, i.ss...)
  636. }
  637. case *resetStream:
  638. t.framer.writeRSTStream(true, i.streamID, i.code)
  639. case *flushIO:
  640. t.framer.flushWrite()
  641. case *ping:
  642. t.framer.writePing(true, i.ack, i.data)
  643. default:
  644. grpclog.Printf("transport: http2Server.controller got unexpected item type %v\n", i)
  645. }
  646. t.writableChan <- 0
  647. continue
  648. case <-t.shutdownChan:
  649. return
  650. }
  651. case <-t.shutdownChan:
  652. return
  653. }
  654. }
  655. }
  656. // Close starts shutting down the http2Server transport.
  657. // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
  658. // could cause some resource issue. Revisit this later.
  659. func (t *http2Server) Close() (err error) {
  660. t.mu.Lock()
  661. if t.state == closing {
  662. t.mu.Unlock()
  663. return errors.New("transport: Close() was already called")
  664. }
  665. t.state = closing
  666. streams := t.activeStreams
  667. t.activeStreams = nil
  668. t.mu.Unlock()
  669. close(t.shutdownChan)
  670. err = t.conn.Close()
  671. // Cancel all active streams.
  672. for _, s := range streams {
  673. s.cancel()
  674. }
  675. return
  676. }
  677. // closeStream clears the footprint of a stream when the stream is not needed
  678. // any more.
  679. func (t *http2Server) closeStream(s *Stream) {
  680. t.mu.Lock()
  681. delete(t.activeStreams, s.id)
  682. t.mu.Unlock()
  683. // In case stream sending and receiving are invoked in separate
  684. // goroutines (e.g., bi-directional streaming), cancel needs to be
  685. // called to interrupt the potential blocking on other goroutines.
  686. s.cancel()
  687. s.mu.Lock()
  688. if q := s.fc.resetPendingData(); q > 0 {
  689. if w := t.fc.onRead(q); w > 0 {
  690. t.controlBuf.put(&windowUpdate{0, w})
  691. }
  692. }
  693. if s.state == streamDone {
  694. s.mu.Unlock()
  695. return
  696. }
  697. s.state = streamDone
  698. s.mu.Unlock()
  699. }
// RemoteAddr returns the remote network address of the underlying connection.
func (t *http2Server) RemoteAddr() net.Addr {
	return t.conn.RemoteAddr()
}