blob_writer.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package client
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "net/http"
  8. "time"
  9. "github.com/docker/distribution"
  10. "github.com/docker/distribution/context"
  11. )
  12. type httpBlobUpload struct {
  13. statter distribution.BlobStatter
  14. client *http.Client
  15. uuid string
  16. startedAt time.Time
  17. location string // always the last value of the location header.
  18. offset int64
  19. closed bool
  20. }
  21. func (hbu *httpBlobUpload) Reader() (io.ReadCloser, error) {
  22. panic("Not implemented")
  23. }
  24. func (hbu *httpBlobUpload) handleErrorResponse(resp *http.Response) error {
  25. if resp.StatusCode == http.StatusNotFound {
  26. return distribution.ErrBlobUploadUnknown
  27. }
  28. return HandleErrorResponse(resp)
  29. }
  30. func (hbu *httpBlobUpload) ReadFrom(r io.Reader) (n int64, err error) {
  31. req, err := http.NewRequest("PATCH", hbu.location, ioutil.NopCloser(r))
  32. if err != nil {
  33. return 0, err
  34. }
  35. defer req.Body.Close()
  36. resp, err := hbu.client.Do(req)
  37. if err != nil {
  38. return 0, err
  39. }
  40. if !SuccessStatus(resp.StatusCode) {
  41. return 0, hbu.handleErrorResponse(resp)
  42. }
  43. hbu.uuid = resp.Header.Get("Docker-Upload-UUID")
  44. hbu.location, err = sanitizeLocation(resp.Header.Get("Location"), hbu.location)
  45. if err != nil {
  46. return 0, err
  47. }
  48. rng := resp.Header.Get("Range")
  49. var start, end int64
  50. if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil {
  51. return 0, err
  52. } else if n != 2 || end < start {
  53. return 0, fmt.Errorf("bad range format: %s", rng)
  54. }
  55. return (end - start + 1), nil
  56. }
  57. func (hbu *httpBlobUpload) Write(p []byte) (n int, err error) {
  58. req, err := http.NewRequest("PATCH", hbu.location, bytes.NewReader(p))
  59. if err != nil {
  60. return 0, err
  61. }
  62. req.Header.Set("Content-Range", fmt.Sprintf("%d-%d", hbu.offset, hbu.offset+int64(len(p)-1)))
  63. req.Header.Set("Content-Length", fmt.Sprintf("%d", len(p)))
  64. req.Header.Set("Content-Type", "application/octet-stream")
  65. resp, err := hbu.client.Do(req)
  66. if err != nil {
  67. return 0, err
  68. }
  69. if !SuccessStatus(resp.StatusCode) {
  70. return 0, hbu.handleErrorResponse(resp)
  71. }
  72. hbu.uuid = resp.Header.Get("Docker-Upload-UUID")
  73. hbu.location, err = sanitizeLocation(resp.Header.Get("Location"), hbu.location)
  74. if err != nil {
  75. return 0, err
  76. }
  77. rng := resp.Header.Get("Range")
  78. var start, end int
  79. if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil {
  80. return 0, err
  81. } else if n != 2 || end < start {
  82. return 0, fmt.Errorf("bad range format: %s", rng)
  83. }
  84. return (end - start + 1), nil
  85. }
  86. func (hbu *httpBlobUpload) Size() int64 {
  87. return hbu.offset
  88. }
  89. func (hbu *httpBlobUpload) ID() string {
  90. return hbu.uuid
  91. }
  92. func (hbu *httpBlobUpload) StartedAt() time.Time {
  93. return hbu.startedAt
  94. }
  95. func (hbu *httpBlobUpload) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
  96. // TODO(dmcgowan): Check if already finished, if so just fetch
  97. req, err := http.NewRequest("PUT", hbu.location, nil)
  98. if err != nil {
  99. return distribution.Descriptor{}, err
  100. }
  101. values := req.URL.Query()
  102. values.Set("digest", desc.Digest.String())
  103. req.URL.RawQuery = values.Encode()
  104. resp, err := hbu.client.Do(req)
  105. if err != nil {
  106. return distribution.Descriptor{}, err
  107. }
  108. defer resp.Body.Close()
  109. if !SuccessStatus(resp.StatusCode) {
  110. return distribution.Descriptor{}, hbu.handleErrorResponse(resp)
  111. }
  112. return hbu.statter.Stat(ctx, desc.Digest)
  113. }
  114. func (hbu *httpBlobUpload) Cancel(ctx context.Context) error {
  115. req, err := http.NewRequest("DELETE", hbu.location, nil)
  116. if err != nil {
  117. return err
  118. }
  119. resp, err := hbu.client.Do(req)
  120. if err != nil {
  121. return err
  122. }
  123. defer resp.Body.Close()
  124. if resp.StatusCode == http.StatusNotFound || SuccessStatus(resp.StatusCode) {
  125. return nil
  126. }
  127. return hbu.handleErrorResponse(resp)
  128. }
  129. func (hbu *httpBlobUpload) Close() error {
  130. hbu.closed = true
  131. return nil
  132. }