4
0
mirror of https://github.com/cwinfo/matterbridge.git synced 2025-07-02 02:16:18 +00:00

Bump github.com/mattermost/mattermost-server/v6 from 6.1.0 to 6.3.0 (#1686)

Bumps [github.com/mattermost/mattermost-server/v6](https://github.com/mattermost/mattermost-server) from 6.1.0 to 6.3.0.
- [Release notes](https://github.com/mattermost/mattermost-server/releases)
- [Changelog](https://github.com/mattermost/mattermost-server/blob/master/CHANGELOG.md)
- [Commits](https://github.com/mattermost/mattermost-server/compare/v6.1.0...v6.3.0)

---
updated-dependencies:
- dependency-name: github.com/mattermost/mattermost-server/v6
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
This commit is contained in:
dependabot[bot]
2022-01-18 20:24:14 +01:00
committed by GitHub
parent fecca57507
commit aad60c882e
250 changed files with 43143 additions and 11479 deletions

View File

@ -140,25 +140,29 @@ func buildRootHuffmanNode() {
panic("unexpected size")
}
lazyRootHuffmanNode = newInternalNode()
for i, code := range huffmanCodes {
addDecoderNode(byte(i), code, huffmanCodeLen[i])
}
}
// allocate a leaf node for each of the 256 symbols
leaves := new([256]node)
func addDecoderNode(sym byte, code uint32, codeLen uint8) {
cur := lazyRootHuffmanNode
for codeLen > 8 {
codeLen -= 8
i := uint8(code >> codeLen)
if cur.children[i] == nil {
cur.children[i] = newInternalNode()
for sym, code := range huffmanCodes {
codeLen := huffmanCodeLen[sym]
cur := lazyRootHuffmanNode
for codeLen > 8 {
codeLen -= 8
i := uint8(code >> codeLen)
if cur.children[i] == nil {
cur.children[i] = newInternalNode()
}
cur = cur.children[i]
}
shift := 8 - codeLen
start, end := int(uint8(code<<shift)), int(1<<shift)
leaves[sym].sym = byte(sym)
leaves[sym].codeLen = codeLen
for i := start; i < start+end; i++ {
cur.children[i] = &leaves[sym]
}
cur = cur.children[i]
}
shift := 8 - codeLen
start, end := int(uint8(code<<shift)), int(1<<shift)
for i := start; i < start+end; i++ {
cur.children[i] = &node{sym: sym, codeLen: codeLen}
}
}

View File

@ -24,6 +24,7 @@ import (
"net/http"
"net/http/httptrace"
"net/textproto"
"os"
"sort"
"strconv"
"strings"
@ -130,6 +131,11 @@ type Transport struct {
// Defaults to 15s.
PingTimeout time.Duration
// WriteByteTimeout is the timeout after which the connection will be
// closed no data can be written to it. The timeout begins when data is
// available to write, and is extended whenever any bytes are written.
WriteByteTimeout time.Duration
// CountError, if non-nil, is called on HTTP/2 transport errors.
// It's intended to increment a metric for monitoring, such
// as an expvar or Prometheus metric.
@ -300,12 +306,17 @@ type ClientConn struct {
// clientStream is the state for a single HTTP/2 stream. One of these
// is created for each Transport.RoundTrip call.
type clientStream struct {
cc *ClientConn
req *http.Request
cc *ClientConn
// Fields of Request that we may access even after the response body is closed.
ctx context.Context
reqCancel <-chan struct{}
trace *httptrace.ClientTrace // or nil
ID uint32
bufPipe pipe // buffered pipe with the flow-controlled response payload
requestedGzip bool
isHead bool
abortOnce sync.Once
abort chan struct{} // closed to signal stream should end immediately
@ -322,7 +333,10 @@ type clientStream struct {
inflow flow // guarded by cc.mu
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
readErr error // sticky read error; owned by transportResponseBody.Read
stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu
reqBody io.ReadCloser
reqBodyContentLength int64 // -1 means unknown
reqBodyClosed bool // body has been closed; guarded by cc.mu
// owned by writeRequest:
sentEndStream bool // sent an END_STREAM flag to the peer
@ -362,6 +376,10 @@ func (cs *clientStream) abortStreamLocked(err error) {
cs.abortErr = err
close(cs.abort)
})
if cs.reqBody != nil && !cs.reqBodyClosed {
cs.reqBody.Close()
cs.reqBodyClosed = true
}
// TODO(dneil): Clean up tests where cs.cc.cond is nil.
if cs.cc.cond != nil {
// Wake up writeRequestBody if it is waiting on flow control.
@ -369,31 +387,43 @@ func (cs *clientStream) abortStreamLocked(err error) {
}
}
func (cs *clientStream) abortRequestBodyWrite(err error) {
if err == nil {
panic("nil error")
}
func (cs *clientStream) abortRequestBodyWrite() {
cc := cs.cc
cc.mu.Lock()
if cs.stopReqBody == nil {
cs.stopReqBody = err
defer cc.mu.Unlock()
if cs.reqBody != nil && !cs.reqBodyClosed {
cs.reqBody.Close()
cs.reqBodyClosed = true
cc.cond.Broadcast()
}
cc.mu.Unlock()
}
type stickyErrWriter struct {
w io.Writer
err *error
conn net.Conn
timeout time.Duration
err *error
}
func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
if *sew.err != nil {
return 0, *sew.err
}
n, err = sew.w.Write(p)
*sew.err = err
return
for {
if sew.timeout != 0 {
sew.conn.SetWriteDeadline(time.Now().Add(sew.timeout))
}
nn, err := sew.conn.Write(p[n:])
n += nn
if n < len(p) && nn > 0 && errors.Is(err, os.ErrDeadlineExceeded) {
// Keep extending the deadline so long as we're making progress.
continue
}
if sew.timeout != 0 {
sew.conn.SetWriteDeadline(time.Time{})
}
*sew.err = err
return n, err
}
}
// noCachedConnError is the concrete type of ErrNoCachedConn, which
@ -648,7 +678,11 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
// TODO: adjust this writer size to account for frame size +
// MTU + crypto/tls record padding.
cc.bw = bufio.NewWriter(stickyErrWriter{c, &cc.werr})
cc.bw = bufio.NewWriter(stickyErrWriter{
conn: c,
timeout: t.WriteByteTimeout,
err: &cc.werr,
})
cc.br = bufio.NewReader(c)
cc.fr = NewFramer(cc.bw, cc.br)
if t.CountError != nil {
@ -759,6 +793,61 @@ func (cc *ClientConn) ReserveNewRequest() bool {
return true
}
// ClientConnState describes the state of a ClientConn.
type ClientConnState struct {
// Closed is whether the connection is closed.
Closed bool
// Closing is whether the connection is in the process of
// closing. It may be closing due to shutdown, being a
// single-use connection, being marked as DoNotReuse, or
// having received a GOAWAY frame.
Closing bool
// StreamsActive is how many streams are active.
StreamsActive int
// StreamsReserved is how many streams have been reserved via
// ClientConn.ReserveNewRequest.
StreamsReserved int
// StreamsPending is how many requests have been sent in excess
// of the peer's advertised MaxConcurrentStreams setting and
// are waiting for other streams to complete.
StreamsPending int
// MaxConcurrentStreams is how many concurrent streams the
// peer advertised as acceptable. Zero means no SETTINGS
// frame has been received yet.
MaxConcurrentStreams uint32
// LastIdle, if non-zero, is when the connection last
// transitioned to idle state.
LastIdle time.Time
}
// State returns a snapshot of cc's state.
func (cc *ClientConn) State() ClientConnState {
cc.wmu.Lock()
maxConcurrent := cc.maxConcurrentStreams
if !cc.seenSettings {
maxConcurrent = 0
}
cc.wmu.Unlock()
cc.mu.Lock()
defer cc.mu.Unlock()
return ClientConnState{
Closed: cc.closed,
Closing: cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
StreamsActive: len(cc.streams),
StreamsReserved: cc.streamsReserved,
StreamsPending: cc.pendingRequests,
LastIdle: cc.lastIdle,
MaxConcurrentStreams: maxConcurrent,
}
}
// clientConnIdleState describes the suitability of a client
// connection to initiate a new RoundTrip request.
type clientConnIdleState struct {
@ -1010,15 +1099,19 @@ func (cc *ClientConn) decrStreamReservationsLocked() {
func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
ctx := req.Context()
cs := &clientStream{
cc: cc,
req: req,
trace: httptrace.ContextClientTrace(req.Context()),
peerClosed: make(chan struct{}),
abort: make(chan struct{}),
respHeaderRecv: make(chan struct{}),
donec: make(chan struct{}),
cc: cc,
ctx: ctx,
reqCancel: req.Cancel,
isHead: req.Method == "HEAD",
reqBody: req.Body,
reqBodyContentLength: actualContentLength(req),
trace: httptrace.ContextClientTrace(ctx),
peerClosed: make(chan struct{}),
abort: make(chan struct{}),
respHeaderRecv: make(chan struct{}),
donec: make(chan struct{}),
}
go cs.doRequest()
go cs.doRequest(req)
waitDone := func() error {
select {
@ -1026,44 +1119,60 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
return nil
case <-ctx.Done():
return ctx.Err()
case <-req.Cancel:
case <-cs.reqCancel:
return errRequestCanceled
}
}
handleResponseHeaders := func() (*http.Response, error) {
res := cs.res
if res.StatusCode > 299 {
// On error or status code 3xx, 4xx, 5xx, etc abort any
// ongoing write, assuming that the server doesn't care
// about our request body. If the server replied with 1xx or
// 2xx, however, then assume the server DOES potentially
// want our body (e.g. full-duplex streaming:
// golang.org/issue/13444). If it turns out the server
// doesn't, they'll RST_STREAM us soon enough. This is a
// heuristic to avoid adding knobs to Transport. Hopefully
// we can keep it.
cs.abortRequestBodyWrite()
}
res.Request = req
res.TLS = cc.tlsState
if res.Body == noBody && actualContentLength(req) == 0 {
// If there isn't a request or response body still being
// written, then wait for the stream to be closed before
// RoundTrip returns.
if err := waitDone(); err != nil {
return nil, err
}
}
return res, nil
}
for {
select {
case <-cs.respHeaderRecv:
res := cs.res
if res.StatusCode > 299 {
// On error or status code 3xx, 4xx, 5xx, etc abort any
// ongoing write, assuming that the server doesn't care
// about our request body. If the server replied with 1xx or
// 2xx, however, then assume the server DOES potentially
// want our body (e.g. full-duplex streaming:
// golang.org/issue/13444). If it turns out the server
// doesn't, they'll RST_STREAM us soon enough. This is a
// heuristic to avoid adding knobs to Transport. Hopefully
// we can keep it.
cs.abortRequestBodyWrite(errStopReqBodyWrite)
}
res.Request = req
res.TLS = cc.tlsState
if res.Body == noBody && actualContentLength(req) == 0 {
// If there isn't a request or response body still being
// written, then wait for the stream to be closed before
// RoundTrip returns.
if err := waitDone(); err != nil {
return nil, err
}
}
return res, nil
return handleResponseHeaders()
case <-cs.abort:
waitDone()
return nil, cs.abortErr
select {
case <-cs.respHeaderRecv:
// If both cs.respHeaderRecv and cs.abort are signaling,
// pick respHeaderRecv. The server probably wrote the
// response and immediately reset the stream.
// golang.org/issue/49645
return handleResponseHeaders()
default:
waitDone()
return nil, cs.abortErr
}
case <-ctx.Done():
return nil, ctx.Err()
case <-req.Cancel:
err := ctx.Err()
cs.abortStream(err)
return nil, err
case <-cs.reqCancel:
cs.abortStream(errRequestCanceled)
return nil, errRequestCanceled
}
}
@ -1072,8 +1181,8 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
// doRequest runs for the duration of the request lifetime.
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
func (cs *clientStream) doRequest() {
err := cs.writeRequest()
func (cs *clientStream) doRequest(req *http.Request) {
err := cs.writeRequest(req)
cs.cleanupWriteRequest(err)
}
@ -1084,12 +1193,11 @@ func (cs *clientStream) doRequest() {
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
func (cs *clientStream) writeRequest() (err error) {
func (cs *clientStream) writeRequest(req *http.Request) (err error) {
cc := cs.cc
req := cs.req
ctx := req.Context()
ctx := cs.ctx
if err := checkConnHeaders(cs.req); err != nil {
if err := checkConnHeaders(req); err != nil {
return err
}
@ -1101,7 +1209,7 @@ func (cs *clientStream) writeRequest() (err error) {
}
select {
case cc.reqHeaderMu <- struct{}{}:
case <-req.Cancel:
case <-cs.reqCancel:
return errRequestCanceled
case <-ctx.Done():
return ctx.Err()
@ -1118,13 +1226,16 @@ func (cs *clientStream) writeRequest() (err error) {
return err
}
cc.addStreamLocked(cs) // assigns stream ID
if isConnectionCloseRequest(req) {
cc.doNotReuse = true
}
cc.mu.Unlock()
// TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
if !cc.t.disableCompression() &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
req.Method != "HEAD" {
!cs.isHead {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
@ -1143,19 +1254,23 @@ func (cs *clientStream) writeRequest() (err error) {
continueTimeout := cc.t.expectContinueTimeout()
if continueTimeout != 0 &&
!httpguts.HeaderValuesContainsToken(
cs.req.Header["Expect"],
req.Header["Expect"],
"100-continue") {
continueTimeout = 0
cs.on100 = make(chan struct{}, 1)
}
err = cs.encodeAndWriteHeaders()
// Past this point (where we send request headers), it is possible for
// RoundTrip to return successfully. Since the RoundTrip contract permits
// the caller to "mutate or reuse" the Request after closing the Response's Body,
// we must take care when referencing the Request from here on.
err = cs.encodeAndWriteHeaders(req)
<-cc.reqHeaderMu
if err != nil {
return err
}
hasBody := actualContentLength(cs.req) != 0
hasBody := cs.reqBodyContentLength != 0
if !hasBody {
cs.sentEndStream = true
} else {
@ -1171,7 +1286,7 @@ func (cs *clientStream) writeRequest() (err error) {
err = cs.abortErr
case <-ctx.Done():
err = ctx.Err()
case <-req.Cancel:
case <-cs.reqCancel:
err = errRequestCanceled
}
timer.Stop()
@ -1181,7 +1296,7 @@ func (cs *clientStream) writeRequest() (err error) {
}
}
if err = cs.writeRequestBody(req.Body); err != nil {
if err = cs.writeRequestBody(req); err != nil {
if err != errStopReqBodyWrite {
traceWroteRequest(cs.trace, err)
return err
@ -1211,21 +1326,21 @@ func (cs *clientStream) writeRequest() (err error) {
case <-respHeaderTimer:
return errTimeout
case <-respHeaderRecv:
respHeaderRecv = nil
respHeaderTimer = nil // keep waiting for END_STREAM
case <-cs.abort:
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
case <-req.Cancel:
case <-cs.reqCancel:
return errRequestCanceled
}
}
}
func (cs *clientStream) encodeAndWriteHeaders() error {
func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
cc := cs.cc
req := cs.req
ctx := req.Context()
ctx := cs.ctx
cc.wmu.Lock()
defer cc.wmu.Unlock()
@ -1236,7 +1351,7 @@ func (cs *clientStream) encodeAndWriteHeaders() error {
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
case <-req.Cancel:
case <-cs.reqCancel:
return errRequestCanceled
default:
}
@ -1246,14 +1361,14 @@ func (cs *clientStream) encodeAndWriteHeaders() error {
// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
// sent by writeRequestBody below, along with any Trailers,
// again in form HEADERS{1}, CONTINUATION{0,})
trailers, err := commaSeparatedTrailers(cs.req)
trailers, err := commaSeparatedTrailers(req)
if err != nil {
return err
}
hasTrailers := trailers != ""
contentLen := actualContentLength(cs.req)
contentLen := actualContentLength(req)
hasBody := contentLen != 0
hdrs, err := cc.encodeHeaders(cs.req, cs.requestedGzip, trailers, contentLen)
hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
if err != nil {
return err
}
@ -1272,7 +1387,6 @@ func (cs *clientStream) encodeAndWriteHeaders() error {
// cleanupWriteRequest will send a reset to the peer.
func (cs *clientStream) cleanupWriteRequest(err error) {
cc := cs.cc
req := cs.req
if cs.ID == 0 {
// We were canceled before creating the stream, so return our reservation.
@ -1283,10 +1397,12 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
// Request.Body is closed by the Transport,
// and in multiple cases: server replies <=299 and >299
// while still writing request body
if req.Body != nil {
if e := req.Body.Close(); err == nil {
err = e
}
cc.mu.Lock()
bodyClosed := cs.reqBodyClosed
cs.reqBodyClosed = true
cc.mu.Unlock()
if !bodyClosed && cs.reqBody != nil {
cs.reqBody.Close()
}
if err != nil && cs.sentEndStream {
@ -1320,7 +1436,6 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
if cs.ID != 0 {
cc.forgetStreamID(cs.ID)
}
close(cs.donec)
cc.wmu.Lock()
werr := cc.werr
@ -1328,6 +1443,8 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
if werr != nil {
cc.Close()
}
close(cs.donec)
}
// awaitOpenSlotForStream waits until len(streams) < maxConcurrentStreams.
@ -1401,7 +1518,7 @@ func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
if n > max {
n = max
}
if cl := actualContentLength(cs.req); cl != -1 && cl+1 < n {
if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
// Add an extra byte past the declared content-length to
// give the caller's Request.Body io.Reader a chance to
// give us more bytes than they declared, so we can catch it
@ -1416,13 +1533,13 @@ func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
var bufPool sync.Pool // of *[]byte
func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
cc := cs.cc
body := cs.reqBody
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
req := cs.req
hasTrailers := req.Trailer != nil
remainLen := actualContentLength(req)
remainLen := cs.reqBodyContentLength
hasContentLen := remainLen != -1
cc.mu.Lock()
@ -1463,23 +1580,26 @@ func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
return err
}
}
if err == io.EOF {
sawEOF = true
err = nil
} else if err != nil {
return err
if err != nil {
cc.mu.Lock()
bodyClosed := cs.reqBodyClosed
cc.mu.Unlock()
switch {
case bodyClosed:
return errStopReqBodyWrite
case err == io.EOF:
sawEOF = true
err = nil
default:
return err
}
}
remain := buf[:n]
for len(remain) > 0 && err == nil {
var allowed int32
allowed, err = cs.awaitFlowControl(len(remain))
switch {
case err == errStopReqBodyWrite:
return err
case err == errStopReqBodyWriteAndCancel:
return err
case err != nil:
if err != nil {
return err
}
cc.wmu.Lock()
@ -1510,16 +1630,26 @@ func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
return nil
}
// Since the RoundTrip contract permits the caller to "mutate or reuse"
// a request after the Response's Body is closed, verify that this hasn't
// happened before accessing the trailers.
cc.mu.Lock()
trailer := req.Trailer
err = cs.abortErr
cc.mu.Unlock()
if err != nil {
return err
}
cc.wmu.Lock()
defer cc.wmu.Unlock()
var trls []byte
if hasTrailers {
trls, err = cc.encodeTrailers(req)
if len(trailer) > 0 {
trls, err = cc.encodeTrailers(trailer)
if err != nil {
cc.wmu.Unlock()
return err
}
}
defer cc.wmu.Unlock()
// Two ways to send END_STREAM: either with trailers, or
// with an empty DATA frame.
@ -1540,23 +1670,22 @@ func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
// if the stream is dead.
func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc
req := cs.req
ctx := req.Context()
ctx := cs.ctx
cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {
return 0, errClientConnClosed
}
if cs.stopReqBody != nil {
return 0, cs.stopReqBody
if cs.reqBodyClosed {
return 0, errStopReqBodyWrite
}
select {
case <-cs.abort:
return 0, cs.abortErr
case <-ctx.Done():
return 0, ctx.Err()
case <-req.Cancel:
case <-cs.reqCancel:
return 0, errRequestCanceled
default:
}
@ -1770,11 +1899,11 @@ func shouldSendReqContentLength(method string, contentLength int64) bool {
}
// requires cc.wmu be held.
func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
cc.hbuf.Reset()
hlSize := uint64(0)
for k, vv := range req.Trailer {
for k, vv := range trailer {
for _, v := range vv {
hf := hpack.HeaderField{Name: k, Value: v}
hlSize += uint64(hf.Size())
@ -1784,7 +1913,7 @@ func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
return nil, errRequestHeaderListSize
}
for k, vv := range req.Trailer {
for k, vv := range trailer {
lowKey, ascii := asciiToLower(k)
if !ascii {
// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
@ -1920,7 +2049,13 @@ func (rl *clientConnReadLoop) cleanup() {
}
cc.closed = true
for _, cs := range cc.streams {
cs.abortStreamLocked(err)
select {
case <-cs.peerClosed:
// The server closed the stream before closing the conn,
// so no need to interrupt it.
default:
cs.abortStreamLocked(err)
}
}
cc.cond.Broadcast()
cc.mu.Unlock()
@ -2162,33 +2297,40 @@ func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFra
return nil, nil
}
streamEnded := f.StreamEnded()
isHead := cs.req.Method == "HEAD"
if !streamEnded || isHead {
res.ContentLength = -1
if clens := res.Header["Content-Length"]; len(clens) == 1 {
if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
res.ContentLength = int64(cl)
} else {
// TODO: care? unlike http/1, it won't mess up our framing, so it's
// more safe smuggling-wise to ignore.
}
} else if len(clens) > 1 {
res.ContentLength = -1
if clens := res.Header["Content-Length"]; len(clens) == 1 {
if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
res.ContentLength = int64(cl)
} else {
// TODO: care? unlike http/1, it won't mess up our framing, so it's
// more safe smuggling-wise to ignore.
}
} else if len(clens) > 1 {
// TODO: care? unlike http/1, it won't mess up our framing, so it's
// more safe smuggling-wise to ignore.
} else if f.StreamEnded() && !cs.isHead {
res.ContentLength = 0
}
if streamEnded || isHead {
if cs.isHead {
res.Body = noBody
return res, nil
}
if f.StreamEnded() {
if res.ContentLength > 0 {
res.Body = missingBody{}
} else {
res.Body = noBody
}
return res, nil
}
cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
cs.bytesRemain = res.ContentLength
res.Body = transportResponseBody{cs}
if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {
if cs.requestedGzip && asciiEqualFold(res.Header.Get("Content-Encoding"), "gzip") {
res.Header.Del("Content-Encoding")
res.Header.Del("Content-Length")
res.ContentLength = -1
@ -2227,8 +2369,7 @@ func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFr
}
// transportResponseBody is the concrete type of Transport.RoundTrip's
// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
// On Close it sends RST_STREAM if EOF wasn't already seen.
// Response.Body. It is an io.ReadCloser.
type transportResponseBody struct {
cs *clientStream
}
@ -2311,6 +2452,8 @@ func (b transportResponseBody) Close() error {
}
cc.mu.Unlock()
// TODO(dneil): Acquiring this mutex can block indefinitely.
// Move flow control return to a goroutine?
cc.wmu.Lock()
// Return connection-level flow control.
if unread > 0 {
@ -2325,9 +2468,12 @@ func (b transportResponseBody) Close() error {
select {
case <-cs.donec:
case <-cs.req.Context().Done():
return cs.req.Context().Err()
case <-cs.req.Cancel:
case <-cs.ctx.Done():
// See golang/go#49366: The net/http package can cancel the
// request context after the response body is fully read.
// Don't treat this as an error.
return nil
case <-cs.reqCancel:
return errRequestCanceled
}
return nil
@ -2381,7 +2527,7 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
return nil
}
if f.Length > 0 {
if cs.req.Method == "HEAD" && len(data) > 0 {
if cs.isHead && len(data) > 0 {
cc.logf("protocol error: received DATA on a HEAD request")
rl.endStreamError(cs, StreamError{
StreamID: f.StreamID,
@ -2450,6 +2596,12 @@ func (rl *clientConnReadLoop) endStream(cs *clientStream) {
// server.go's (*stream).endStream method.
if !cs.readClosed {
cs.readClosed = true
// Close cs.bufPipe and cs.peerClosed with cc.mu held to avoid a
// race condition: The caller can read io.EOF from Response.Body
// and close the body before we close cs.peerClosed, causing
// cleanupWriteRequest to send a RST_STREAM.
rl.cc.mu.Lock()
defer rl.cc.mu.Unlock()
cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
close(cs.peerClosed)
}
@ -2731,6 +2883,11 @@ func (t *Transport) logf(format string, args ...interface{}) {
var noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
type missingBody struct{}
func (missingBody) Close() error { return nil }
func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
func strSliceContains(ss []string, s string) bool {
for _, v := range ss {
if v == s {