pipe.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. // Copyright 2014 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package http2
  5. import (
  6. "errors"
  7. "io"
  8. "sync"
  9. )
  10. // pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
  11. // io.Pipe except there are no PipeReader/PipeWriter halves, and the
  12. // underlying buffer is an interface. (io.Pipe is always unbuffered)
  13. type pipe struct {
  14. mu sync.Mutex
  15. c sync.Cond // c.L lazily initialized to &p.mu
  16. b pipeBuffer
  17. err error // read error once empty. non-nil means closed.
  18. breakErr error // immediate read error (caller doesn't see rest of b)
  19. donec chan struct{} // closed on error
  20. readFn func() // optional code to run in Read before error
  21. }
  22. type pipeBuffer interface {
  23. Len() int
  24. io.Writer
  25. io.Reader
  26. }
  27. // Read waits until data is available and copies bytes
  28. // from the buffer into p.
  29. func (p *pipe) Read(d []byte) (n int, err error) {
  30. p.mu.Lock()
  31. defer p.mu.Unlock()
  32. if p.c.L == nil {
  33. p.c.L = &p.mu
  34. }
  35. for {
  36. if p.breakErr != nil {
  37. return 0, p.breakErr
  38. }
  39. if p.b.Len() > 0 {
  40. return p.b.Read(d)
  41. }
  42. if p.err != nil {
  43. if p.readFn != nil {
  44. p.readFn() // e.g. copy trailers
  45. p.readFn = nil // not sticky like p.err
  46. }
  47. return 0, p.err
  48. }
  49. p.c.Wait()
  50. }
  51. }
  52. var errClosedPipeWrite = errors.New("write on closed buffer")
  53. // Write copies bytes from p into the buffer and wakes a reader.
  54. // It is an error to write more data than the buffer can hold.
  55. func (p *pipe) Write(d []byte) (n int, err error) {
  56. p.mu.Lock()
  57. defer p.mu.Unlock()
  58. if p.c.L == nil {
  59. p.c.L = &p.mu
  60. }
  61. defer p.c.Signal()
  62. if p.err != nil {
  63. return 0, errClosedPipeWrite
  64. }
  65. return p.b.Write(d)
  66. }
  67. // CloseWithError causes the next Read (waking up a current blocked
  68. // Read if needed) to return the provided err after all data has been
  69. // read.
  70. //
  71. // The error must be non-nil.
  72. func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
  73. // BreakWithError causes the next Read (waking up a current blocked
  74. // Read if needed) to return the provided err immediately, without
  75. // waiting for unread data.
  76. func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
  77. // closeWithErrorAndCode is like CloseWithError but also sets some code to run
  78. // in the caller's goroutine before returning the error.
  79. func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
  80. func (p *pipe) closeWithError(dst *error, err error, fn func()) {
  81. if err == nil {
  82. panic("err must be non-nil")
  83. }
  84. p.mu.Lock()
  85. defer p.mu.Unlock()
  86. if p.c.L == nil {
  87. p.c.L = &p.mu
  88. }
  89. defer p.c.Signal()
  90. if *dst != nil {
  91. // Already been done.
  92. return
  93. }
  94. p.readFn = fn
  95. *dst = err
  96. p.closeDoneLocked()
  97. }
  98. // requires p.mu be held.
  99. func (p *pipe) closeDoneLocked() {
  100. if p.donec == nil {
  101. return
  102. }
  103. // Close if unclosed. This isn't racy since we always
  104. // hold p.mu while closing.
  105. select {
  106. case <-p.donec:
  107. default:
  108. close(p.donec)
  109. }
  110. }
  111. // Err returns the error (if any) first set by BreakWithError or CloseWithError.
  112. func (p *pipe) Err() error {
  113. p.mu.Lock()
  114. defer p.mu.Unlock()
  115. if p.breakErr != nil {
  116. return p.breakErr
  117. }
  118. return p.err
  119. }
  120. // Done returns a channel which is closed if and when this pipe is closed
  121. // with CloseWithError.
  122. func (p *pipe) Done() <-chan struct{} {
  123. p.mu.Lock()
  124. defer p.mu.Unlock()
  125. if p.donec == nil {
  126. p.donec = make(chan struct{})
  127. if p.err != nil || p.breakErr != nil {
  128. // Already hit an error.
  129. p.closeDoneLocked()
  130. }
  131. }
  132. return p.donec
  133. }