123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736 |
- /*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
- package transport
- import (
- "bytes"
- "errors"
- "io"
- "math"
- "net"
- "strconv"
- "sync"
- "golang.org/x/net/context"
- "golang.org/x/net/http2"
- "golang.org/x/net/http2/hpack"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/peer"
- )
- // ErrIllegalHeaderWrite indicates that setting header is illegal because of
- // the stream's state.
- var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
- // http2Server implements the ServerTransport interface with HTTP2.
- type http2Server struct {
- conn net.Conn
- maxStreamID uint32 // max stream ID ever seen
- authInfo credentials.AuthInfo // auth info about the connection
- // writableChan synchronizes write access to the transport.
- // A writer acquires the write lock by receiving a value on writableChan
- // and releases it by sending on writableChan.
- writableChan chan int
- // shutdownChan is closed when Close is called.
- // Blocking operations should select on shutdownChan to avoid
- // blocking forever after Close.
- shutdownChan chan struct{}
- framer *framer
- hBuf *bytes.Buffer // the buffer for HPACK encoding
- hEnc *hpack.Encoder // HPACK encoder
- // The max number of concurrent streams.
- maxStreams uint32
- // controlBuf delivers all the control related tasks (e.g., window
- // updates, reset streams, and various settings) to the controller.
- controlBuf *recvBuffer
- fc *inFlow
- // sendQuotaPool provides flow control to outbound message.
- sendQuotaPool *quotaPool
- mu sync.Mutex // guard the following
- state transportState
- activeStreams map[uint32]*Stream
- // the per-stream outbound flow control window size set by the peer.
- streamSendQuota uint32
- }
- // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
- // returned if something goes wrong.
- func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) {
- framer := newFramer(conn)
- // Send initial settings as connection preface to client.
- var settings []http2.Setting
- // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
- // permitted in the HTTP2 spec.
- if maxStreams == 0 {
- maxStreams = math.MaxUint32
- } else {
- settings = append(settings, http2.Setting{http2.SettingMaxConcurrentStreams, maxStreams})
- }
- if initialWindowSize != defaultWindowSize {
- settings = append(settings, http2.Setting{http2.SettingInitialWindowSize, uint32(initialWindowSize)})
- }
- if err := framer.writeSettings(true, settings...); err != nil {
- return nil, ConnectionErrorf("transport: %v", err)
- }
- // Adjust the connection flow control window if needed.
- if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
- if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
- return nil, ConnectionErrorf("transport: %v", err)
- }
- }
- var buf bytes.Buffer
- t := &http2Server{
- conn: conn,
- authInfo: authInfo,
- framer: framer,
- hBuf: &buf,
- hEnc: hpack.NewEncoder(&buf),
- maxStreams: maxStreams,
- controlBuf: newRecvBuffer(),
- fc: &inFlow{limit: initialConnWindowSize},
- sendQuotaPool: newQuotaPool(defaultWindowSize),
- state: reachable,
- writableChan: make(chan int, 1),
- shutdownChan: make(chan struct{}),
- activeStreams: make(map[uint32]*Stream),
- streamSendQuota: defaultWindowSize,
- }
- go t.controller()
- t.writableChan <- 0
- return t, nil
- }
- // operateHeader takes action on the decoded headers.
- func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) {
- buf := newRecvBuffer()
- s := &Stream{
- id: frame.Header().StreamID,
- st: t,
- buf: buf,
- fc: &inFlow{limit: initialWindowSize},
- }
- var state decodeState
- for _, hf := range frame.Fields {
- state.processHeaderField(hf)
- }
- if err := state.err; err != nil {
- if se, ok := err.(StreamError); ok {
- t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
- }
- return
- }
- if frame.StreamEnded() {
- // s is just created by the caller. No lock needed.
- s.state = streamReadDone
- }
- s.recvCompress = state.encoding
- if state.timeoutSet {
- s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout)
- } else {
- s.ctx, s.cancel = context.WithCancel(context.TODO())
- }
- pr := &peer.Peer{
- Addr: t.conn.RemoteAddr(),
- }
- // Attach Auth info if there is any.
- if t.authInfo != nil {
- pr.AuthInfo = t.authInfo
- }
- s.ctx = peer.NewContext(s.ctx, pr)
- // Cache the current stream to the context so that the server application
- // can find out. Required when the server wants to send some metadata
- // back to the client (unary call only).
- s.ctx = newContextWithStream(s.ctx, s)
- // Attach the received metadata to the context.
- if len(state.mdata) > 0 {
- s.ctx = metadata.NewContext(s.ctx, state.mdata)
- }
- s.dec = &recvBufferReader{
- ctx: s.ctx,
- recv: s.buf,
- }
- s.recvCompress = state.encoding
- s.method = state.method
- t.mu.Lock()
- if t.state != reachable {
- t.mu.Unlock()
- return
- }
- if uint32(len(t.activeStreams)) >= t.maxStreams {
- t.mu.Unlock()
- t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
- return
- }
- s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
- t.activeStreams[s.id] = s
- t.mu.Unlock()
- s.windowHandler = func(n int) {
- t.updateWindow(s, uint32(n))
- }
- handle(s)
- }
- // HandleStreams receives incoming streams using the given handler. This is
- // typically run in a separate goroutine.
- func (t *http2Server) HandleStreams(handle func(*Stream)) {
- // Check the validity of client preface.
- preface := make([]byte, len(clientPreface))
- if _, err := io.ReadFull(t.conn, preface); err != nil {
- grpclog.Printf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
- t.Close()
- return
- }
- if !bytes.Equal(preface, clientPreface) {
- grpclog.Printf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
- t.Close()
- return
- }
- frame, err := t.framer.readFrame()
- if err != nil {
- grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
- t.Close()
- return
- }
- sf, ok := frame.(*http2.SettingsFrame)
- if !ok {
- grpclog.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
- t.Close()
- return
- }
- t.handleSettings(sf)
- for {
- frame, err := t.framer.readFrame()
- if err != nil {
- if se, ok := err.(http2.StreamError); ok {
- t.mu.Lock()
- s := t.activeStreams[se.StreamID]
- t.mu.Unlock()
- if s != nil {
- t.closeStream(s)
- }
- t.controlBuf.put(&resetStream{se.StreamID, se.Code})
- continue
- }
- t.Close()
- return
- }
- switch frame := frame.(type) {
- case *http2.MetaHeadersFrame:
- id := frame.Header().StreamID
- if id%2 != 1 || id <= t.maxStreamID {
- // illegal gRPC stream id.
- grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", id)
- t.Close()
- break
- }
- t.maxStreamID = id
- t.operateHeaders(frame, handle)
- case *http2.DataFrame:
- t.handleData(frame)
- case *http2.RSTStreamFrame:
- t.handleRSTStream(frame)
- case *http2.SettingsFrame:
- t.handleSettings(frame)
- case *http2.PingFrame:
- t.handlePing(frame)
- case *http2.WindowUpdateFrame:
- t.handleWindowUpdate(frame)
- case *http2.GoAwayFrame:
- break
- default:
- grpclog.Printf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
- }
- }
- }
- func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
- t.mu.Lock()
- defer t.mu.Unlock()
- if t.activeStreams == nil {
- // The transport is closing.
- return nil, false
- }
- s, ok := t.activeStreams[f.Header().StreamID]
- if !ok {
- // The stream is already done.
- return nil, false
- }
- return s, true
- }
- // updateWindow adjusts the inbound quota for the stream and the transport.
- // Window updates will deliver to the controller for sending when
- // the cumulative quota exceeds the corresponding threshold.
- func (t *http2Server) updateWindow(s *Stream, n uint32) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.state == streamDone {
- return
- }
- if w := t.fc.onRead(n); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
- if w := s.fc.onRead(n); w > 0 {
- t.controlBuf.put(&windowUpdate{s.id, w})
- }
- }
- func (t *http2Server) handleData(f *http2.DataFrame) {
- size := len(f.Data())
- if err := t.fc.onData(uint32(size)); err != nil {
- grpclog.Printf("transport: http2Server %v", err)
- t.Close()
- return
- }
- // Select the right stream to dispatch.
- s, ok := t.getStream(f)
- if !ok {
- if w := t.fc.onRead(uint32(size)); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
- return
- }
- if size > 0 {
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- // The stream has been closed. Release the corresponding quota.
- if w := t.fc.onRead(uint32(size)); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
- return
- }
- if err := s.fc.onData(uint32(size)); err != nil {
- s.mu.Unlock()
- t.closeStream(s)
- t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
- return
- }
- s.mu.Unlock()
- // TODO(bradfitz, zhaoq): A copy is required here because there is no
- // guarantee f.Data() is consumed before the arrival of next frame.
- // Can this copy be eliminated?
- data := make([]byte, size)
- copy(data, f.Data())
- s.write(recvMsg{data: data})
- }
- if f.Header().Flags.Has(http2.FlagDataEndStream) {
- // Received the end of stream from the client.
- s.mu.Lock()
- if s.state != streamDone {
- if s.state == streamWriteDone {
- s.state = streamDone
- } else {
- s.state = streamReadDone
- }
- }
- s.mu.Unlock()
- s.write(recvMsg{err: io.EOF})
- }
- }
- func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
- s, ok := t.getStream(f)
- if !ok {
- return
- }
- t.closeStream(s)
- }
- func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
- if f.IsAck() {
- return
- }
- var ss []http2.Setting
- f.ForeachSetting(func(s http2.Setting) error {
- ss = append(ss, s)
- return nil
- })
- // The settings will be applied once the ack is sent.
- t.controlBuf.put(&settings{ack: true, ss: ss})
- }
- func (t *http2Server) handlePing(f *http2.PingFrame) {
- pingAck := &ping{ack: true}
- copy(pingAck.data[:], f.Data[:])
- t.controlBuf.put(pingAck)
- }
- func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
- id := f.Header().StreamID
- incr := f.Increment
- if id == 0 {
- t.sendQuotaPool.add(int(incr))
- return
- }
- if s, ok := t.getStream(f); ok {
- s.sendQuotaPool.add(int(incr))
- }
- }
- func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) error {
- first := true
- endHeaders := false
- var err error
- // Sends the headers in a single batch.
- for !endHeaders {
- size := t.hBuf.Len()
- if size > http2MaxFrameLen {
- size = http2MaxFrameLen
- } else {
- endHeaders = true
- }
- if first {
- p := http2.HeadersFrameParam{
- StreamID: s.id,
- BlockFragment: b.Next(size),
- EndStream: endStream,
- EndHeaders: endHeaders,
- }
- err = t.framer.writeHeaders(endHeaders, p)
- first = false
- } else {
- err = t.framer.writeContinuation(endHeaders, s.id, endHeaders, b.Next(size))
- }
- if err != nil {
- t.Close()
- return ConnectionErrorf("transport: %v", err)
- }
- }
- return nil
- }
- // WriteHeader sends the header metedata md back to the client.
- func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
- s.mu.Lock()
- if s.headerOk || s.state == streamDone {
- s.mu.Unlock()
- return ErrIllegalHeaderWrite
- }
- s.headerOk = true
- s.mu.Unlock()
- if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
- return err
- }
- t.hBuf.Reset()
- t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
- t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
- if s.sendCompress != "" {
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
- }
- for k, v := range md {
- for _, entry := range v {
- t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
- }
- }
- if err := t.writeHeaders(s, t.hBuf, false); err != nil {
- return err
- }
- t.writableChan <- 0
- return nil
- }
- // WriteStatus sends stream status to the client and terminates the stream.
- // There is no further I/O operations being able to perform on this stream.
- // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
- // OK is adopted.
- func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error {
- var headersSent bool
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- return nil
- }
- if s.headerOk {
- headersSent = true
- }
- s.mu.Unlock()
- if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
- return err
- }
- t.hBuf.Reset()
- if !headersSent {
- t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
- t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
- }
- t.hEnc.WriteField(
- hpack.HeaderField{
- Name: "grpc-status",
- Value: strconv.Itoa(int(statusCode)),
- })
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: statusDesc})
- // Attach the trailer metadata.
- for k, v := range s.trailer {
- for _, entry := range v {
- t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
- }
- }
- if err := t.writeHeaders(s, t.hBuf, true); err != nil {
- t.Close()
- return err
- }
- t.closeStream(s)
- t.writableChan <- 0
- return nil
- }
- // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
- // is returns if it fails (e.g., framing error, transport error).
- func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
- // TODO(zhaoq): Support multi-writers for a single stream.
- var writeHeaderFrame bool
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- return StreamErrorf(codes.Unknown, "the stream has been done")
- }
- if !s.headerOk {
- writeHeaderFrame = true
- s.headerOk = true
- }
- s.mu.Unlock()
- if writeHeaderFrame {
- if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
- return err
- }
- t.hBuf.Reset()
- t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
- t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
- if s.sendCompress != "" {
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
- }
- p := http2.HeadersFrameParam{
- StreamID: s.id,
- BlockFragment: t.hBuf.Bytes(),
- EndHeaders: true,
- }
- if err := t.framer.writeHeaders(false, p); err != nil {
- t.Close()
- return ConnectionErrorf("transport: %v", err)
- }
- t.writableChan <- 0
- }
- r := bytes.NewBuffer(data)
- for {
- if r.Len() == 0 {
- return nil
- }
- size := http2MaxFrameLen
- s.sendQuotaPool.add(0)
- // Wait until the stream has some quota to send the data.
- sq, err := wait(s.ctx, t.shutdownChan, s.sendQuotaPool.acquire())
- if err != nil {
- return err
- }
- t.sendQuotaPool.add(0)
- // Wait until the transport has some quota to send the data.
- tq, err := wait(s.ctx, t.shutdownChan, t.sendQuotaPool.acquire())
- if err != nil {
- if _, ok := err.(StreamError); ok {
- t.sendQuotaPool.cancel()
- }
- return err
- }
- if sq < size {
- size = sq
- }
- if tq < size {
- size = tq
- }
- p := r.Next(size)
- ps := len(p)
- if ps < sq {
- // Overbooked stream quota. Return it back.
- s.sendQuotaPool.add(sq - ps)
- }
- if ps < tq {
- // Overbooked transport quota. Return it back.
- t.sendQuotaPool.add(tq - ps)
- }
- t.framer.adjustNumWriters(1)
- // Got some quota. Try to acquire writing privilege on the
- // transport.
- if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
- if _, ok := err.(StreamError); ok {
- // Return the connection quota back.
- t.sendQuotaPool.add(ps)
- }
- if t.framer.adjustNumWriters(-1) == 0 {
- // This writer is the last one in this batch and has the
- // responsibility to flush the buffered frames. It queues
- // a flush request to controlBuf instead of flushing directly
- // in order to avoid the race with other writing or flushing.
- t.controlBuf.put(&flushIO{})
- }
- return err
- }
- select {
- case <-s.ctx.Done():
- t.sendQuotaPool.add(ps)
- if t.framer.adjustNumWriters(-1) == 0 {
- t.controlBuf.put(&flushIO{})
- }
- t.writableChan <- 0
- return ContextErr(s.ctx.Err())
- default:
- }
- var forceFlush bool
- if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last {
- forceFlush = true
- }
- if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
- t.Close()
- return ConnectionErrorf("transport: %v", err)
- }
- if t.framer.adjustNumWriters(-1) == 0 {
- t.framer.flushWrite()
- }
- t.writableChan <- 0
- }
- }
- func (t *http2Server) applySettings(ss []http2.Setting) {
- for _, s := range ss {
- if s.ID == http2.SettingInitialWindowSize {
- t.mu.Lock()
- defer t.mu.Unlock()
- for _, stream := range t.activeStreams {
- stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
- }
- t.streamSendQuota = s.Val
- }
- }
- }
- // controller running in a separate goroutine takes charge of sending control
- // frames (e.g., window update, reset stream, setting, etc.) to the server.
- func (t *http2Server) controller() {
- for {
- select {
- case i := <-t.controlBuf.get():
- t.controlBuf.load()
- select {
- case <-t.writableChan:
- switch i := i.(type) {
- case *windowUpdate:
- t.framer.writeWindowUpdate(true, i.streamID, i.increment)
- case *settings:
- if i.ack {
- t.framer.writeSettingsAck(true)
- t.applySettings(i.ss)
- } else {
- t.framer.writeSettings(true, i.ss...)
- }
- case *resetStream:
- t.framer.writeRSTStream(true, i.streamID, i.code)
- case *flushIO:
- t.framer.flushWrite()
- case *ping:
- t.framer.writePing(true, i.ack, i.data)
- default:
- grpclog.Printf("transport: http2Server.controller got unexpected item type %v\n", i)
- }
- t.writableChan <- 0
- continue
- case <-t.shutdownChan:
- return
- }
- case <-t.shutdownChan:
- return
- }
- }
- }
- // Close starts shutting down the http2Server transport.
- // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
- // could cause some resource issue. Revisit this later.
- func (t *http2Server) Close() (err error) {
- t.mu.Lock()
- if t.state == closing {
- t.mu.Unlock()
- return errors.New("transport: Close() was already called")
- }
- t.state = closing
- streams := t.activeStreams
- t.activeStreams = nil
- t.mu.Unlock()
- close(t.shutdownChan)
- err = t.conn.Close()
- // Cancel all active streams.
- for _, s := range streams {
- s.cancel()
- }
- return
- }
- // closeStream clears the footprint of a stream when the stream is not needed
- // any more.
- func (t *http2Server) closeStream(s *Stream) {
- t.mu.Lock()
- delete(t.activeStreams, s.id)
- t.mu.Unlock()
- // In case stream sending and receiving are invoked in separate
- // goroutines (e.g., bi-directional streaming), cancel needs to be
- // called to interrupt the potential blocking on other goroutines.
- s.cancel()
- s.mu.Lock()
- if q := s.fc.resetPendingData(); q > 0 {
- if w := t.fc.onRead(q); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
- }
- if s.state == streamDone {
- s.mu.Unlock()
- return
- }
- s.state = streamDone
- s.mu.Unlock()
- }
- func (t *http2Server) RemoteAddr() net.Addr {
- return t.conn.RemoteAddr()
- }
|