4
0
mirror of https://github.com/cwinfo/matterbridge.git synced 2025-07-14 03:46:28 +00:00

Update dependencies (#2180)

* Update dependencies

* Fix whatsmeow API changes
This commit is contained in:
Wim
2024-08-27 19:04:05 +02:00
committed by GitHub
parent d16645c952
commit c4157a4d5b
589 changed files with 681707 additions and 198856 deletions

View File

@ -193,7 +193,7 @@ type goAway struct {
code http2.ErrCode
debugData []byte
headsUp bool
closeConn error // if set, loopyWriter will exit, resulting in conn closure
closeConn error // if set, loopyWriter will exit with this error
}
func (*goAway) isTransportResponseFrame() bool { return false }
@ -336,7 +336,7 @@ func (c *controlBuffer) put(it cbItem) error {
return err
}
func (c *controlBuffer) executeAndPut(f func(it any) bool, it cbItem) (bool, error) {
func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
var wakeUp bool
c.mu.Lock()
if c.err != nil {
@ -344,7 +344,7 @@ func (c *controlBuffer) executeAndPut(f func(it any) bool, it cbItem) (bool, err
return false, c.err
}
if f != nil {
if !f(it) { // f wasn't successful
if !f() { // f wasn't successful
c.mu.Unlock()
return false, nil
}
@ -495,21 +495,22 @@ type loopyWriter struct {
ssGoAwayHandler func(*goAway) (bool, error)
}
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger) *loopyWriter {
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error)) *loopyWriter {
var buf bytes.Buffer
l := &loopyWriter{
side: s,
cbuf: cbuf,
sendQuota: defaultWindowSize,
oiws: defaultWindowSize,
estdStreams: make(map[uint32]*outStream),
activeStreams: newOutStreamList(),
framer: fr,
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
bdpEst: bdpEst,
conn: conn,
logger: logger,
side: s,
cbuf: cbuf,
sendQuota: defaultWindowSize,
oiws: defaultWindowSize,
estdStreams: make(map[uint32]*outStream),
activeStreams: newOutStreamList(),
framer: fr,
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
bdpEst: bdpEst,
conn: conn,
logger: logger,
ssGoAwayHandler: goAwayHandler,
}
return l
}

View File

@ -51,14 +51,10 @@ import (
// inside an http.Handler, or writes an HTTP error to w and returns an error.
// It requires that the http Server supports HTTP/2.
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
if r.ProtoMajor != 2 {
msg := "gRPC requires HTTP/2"
http.Error(w, msg, http.StatusBadRequest)
return nil, errors.New(msg)
}
if r.Method != "POST" {
if r.Method != http.MethodPost {
w.Header().Set("Allow", http.MethodPost)
msg := fmt.Sprintf("invalid gRPC request method %q", r.Method)
http.Error(w, msg, http.StatusBadRequest)
http.Error(w, msg, http.StatusMethodNotAllowed)
return nil, errors.New(msg)
}
contentType := r.Header.Get("Content-Type")
@ -69,6 +65,11 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
http.Error(w, msg, http.StatusUnsupportedMediaType)
return nil, errors.New(msg)
}
if r.ProtoMajor != 2 {
msg := "gRPC requires HTTP/2"
http.Error(w, msg, http.StatusHTTPVersionNotSupported)
return nil, errors.New(msg)
}
if _, ok := w.(http.Flusher); !ok {
msg := "gRPC requires a ResponseWriter supporting http.Flusher"
http.Error(w, msg, http.StatusInternalServerError)

View File

@ -114,11 +114,11 @@ type http2Client struct {
streamQuota int64
streamsQuotaAvailable chan struct{}
waitingStreams uint32
nextID uint32
registeredCompressors string
// Do not access controlBuf with mu held.
mu sync.Mutex // guard the following variables
nextID uint32
state transportState
activeStreams map[uint32]*Stream
// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
@ -140,9 +140,7 @@ type http2Client struct {
// variable.
kpDormant bool
// Fields below are for channelz metric collection.
channelzID *channelz.Identifier
czData *channelzData
channelz *channelz.Socket
onClose func(GoAwayReason)
@ -319,6 +317,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
if opts.MaxHeaderListSize != nil {
maxHeaderListSize = *opts.MaxHeaderListSize
}
t := &http2Client{
ctx: ctx,
ctxDone: ctx.Done(), // Cache Done chan.
@ -346,11 +345,25 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
maxConcurrentStreams: defaultMaxStreamsClient,
streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1),
czData: new(channelzData),
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
onClose: onClose,
}
var czSecurity credentials.ChannelzSecurityValue
if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
czSecurity = au.GetSecurityValue()
}
t.channelz = channelz.RegisterSocket(
&channelz.Socket{
SocketType: channelz.SocketTypeNormal,
Parent: opts.ChannelzParent,
SocketMetrics: channelz.SocketMetrics{},
EphemeralMetrics: t.socketMetrics,
LocalAddr: t.localAddr,
RemoteAddr: t.remoteAddr,
SocketOptions: channelz.GetSocketOption(t.conn),
Security: czSecurity,
})
t.logger = prefixLoggerForClientTransport(t)
// Add peer information to the http2client context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
@ -381,10 +394,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
sh.HandleConn(t.ctx, connBegin)
}
t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
if err != nil {
return nil, err
}
if t.keepaliveEnabled {
t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
@ -399,10 +408,10 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
readerErrCh := make(chan error, 1)
go t.reader(readerErrCh)
defer func() {
if err == nil {
err = <-readerErrCh
}
if err != nil {
// writerDone should be closed since the loopy goroutine
// wouldn't have started in the case this function returns an error.
close(t.writerDone)
t.Close(err)
}
}()
@ -449,8 +458,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
if err := t.framer.writer.Flush(); err != nil {
return nil, err
}
// Block until the server preface is received successfully or an error occurs.
if err = <-readerErrCh; err != nil {
return nil, err
}
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler)
if err := t.loopy.run(); !isIOError(err) {
// Immediately close the connection, as the loopy writer returns
// when there are no more active streams and we were draining (the
@ -508,6 +521,17 @@ func (t *http2Client) getPeer() *peer.Peer {
}
}
// OutgoingGoAwayHandler writes a GOAWAY to the connection. Always returns (false, err) as we want the GoAway
// to be the last frame loopy writes to the transport.
func (t *http2Client) outgoingGoAwayHandler(g *goAway) (bool, error) {
t.mu.Lock()
defer t.mu.Unlock()
if err := t.framer.fr.WriteGoAway(t.nextID-2, http2.ErrCodeNo, g.debugData); err != nil {
return false, err
}
return false, g.closeConn
}
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
aud := t.createAudience(callHdr)
ri := credentials.RequestInfo{
@ -756,8 +780,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return ErrConnClosing
}
if channelz.IsOn() {
atomic.AddInt64(&t.czData.streamsStarted, 1)
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
t.channelz.SocketMetrics.StreamsStarted.Add(1)
t.channelz.SocketMetrics.LastLocalStreamCreatedTimestamp.Store(time.Now().UnixNano())
}
// If the keepalive goroutine has gone dormant, wake it up.
if t.kpDormant {
@ -772,7 +796,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
firstTry := true
var ch chan struct{}
transportDrainRequired := false
checkForStreamQuota := func(it any) bool {
checkForStreamQuota := func() bool {
if t.streamQuota <= 0 { // Can go negative if server decreases it.
if firstTry {
t.waitingStreams++
@ -784,23 +808,24 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
t.waitingStreams--
}
t.streamQuota--
h := it.(*headerFrame)
h.streamID = t.nextID
t.nextID += 2
// Drain client transport if nextID > MaxStreamID which signals gRPC that
// the connection is closed and a new one must be created for subsequent RPCs.
transportDrainRequired = t.nextID > MaxStreamID
s.id = h.streamID
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
t.mu.Lock()
if t.state == draining || t.activeStreams == nil { // Can be niled from Close().
t.mu.Unlock()
return false // Don't create a stream if the transport is already closed.
}
hdr.streamID = t.nextID
t.nextID += 2
// Drain client transport if nextID > MaxStreamID which signals gRPC that
// the connection is closed and a new one must be created for subsequent RPCs.
transportDrainRequired = t.nextID > MaxStreamID
s.id = hdr.streamID
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
t.activeStreams[s.id] = s
t.mu.Unlock()
if t.streamQuota > 0 && t.waitingStreams > 0 {
select {
case t.streamsQuotaAvailable <- struct{}{}:
@ -810,13 +835,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return true
}
var hdrListSizeErr error
checkForHeaderListSize := func(it any) bool {
checkForHeaderListSize := func() bool {
if t.maxSendHeaderListSize == nil {
return true
}
hdrFrame := it.(*headerFrame)
var sz int64
for _, f := range hdrFrame.hf {
for _, f := range hdr.hf {
if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
return false
@ -825,8 +849,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return true
}
for {
success, err := t.controlBuf.executeAndPut(func(it any) bool {
return checkForHeaderListSize(it) && checkForStreamQuota(it)
success, err := t.controlBuf.executeAndPut(func() bool {
return checkForHeaderListSize() && checkForStreamQuota()
}, hdr)
if err != nil {
// Connection closed.
@ -928,16 +952,16 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
t.mu.Unlock()
if channelz.IsOn() {
if eosReceived {
atomic.AddInt64(&t.czData.streamsSucceeded, 1)
t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
} else {
atomic.AddInt64(&t.czData.streamsFailed, 1)
t.channelz.SocketMetrics.StreamsFailed.Add(1)
}
}
},
rst: rst,
rstCode: rstCode,
}
addBackStreamQuota := func(any) bool {
addBackStreamQuota := func() bool {
t.streamQuota++
if t.streamQuota > 0 && t.waitingStreams > 0 {
select {
@ -957,7 +981,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
// accessed anymore.
func (t *http2Client) Close(err error) {
t.mu.Lock()
// Make sure we only close once.
@ -982,10 +1006,13 @@ func (t *http2Client) Close(err error) {
t.kpDormancyCond.Signal()
}
t.mu.Unlock()
t.controlBuf.finish()
// Per HTTP/2 spec, a GOAWAY frame must be sent before closing the
// connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY.
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("client transport shutdown"), closeConn: err})
<-t.writerDone
t.cancel()
t.conn.Close()
channelz.RemoveEntry(t.channelzID)
channelz.RemoveEntry(t.channelz.ID)
// Append info about previous goaways if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
_, goAwayDebugMessage := t.GetGoAwayReason()
@ -1090,7 +1117,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
// for the transport and the stream based on the current bdp
// estimation.
func (t *http2Client) updateFlowControl(n uint32) {
updateIWS := func(any) bool {
updateIWS := func() bool {
t.initialWindowSize = int32(n)
t.mu.Lock()
for _, s := range t.activeStreams {
@ -1243,7 +1270,7 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
}
updateFuncs = append(updateFuncs, updateStreamQuota)
}
t.controlBuf.executeAndPut(func(any) bool {
t.controlBuf.executeAndPut(func() bool {
for _, f := range updateFuncs {
f()
}
@ -1708,7 +1735,7 @@ func (t *http2Client) keepalive() {
// keepalive timer expired. In both cases, we need to send a ping.
if !outstandingPing {
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
}
t.controlBuf.put(p)
timeoutLeft = t.kp.Timeout
@ -1738,40 +1765,23 @@ func (t *http2Client) GoAway() <-chan struct{} {
return t.goAway
}
func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
s := channelz.SocketInternalMetric{
StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
LocalFlowControlWindow: int64(t.fc.getSize()),
SocketOptions: channelz.GetSocketOption(t.conn),
LocalAddr: t.localAddr,
RemoteAddr: t.remoteAddr,
// RemoteName :
func (t *http2Client) socketMetrics() *channelz.EphemeralSocketMetrics {
return &channelz.EphemeralSocketMetrics{
LocalFlowControlWindow: int64(t.fc.getSize()),
RemoteFlowControlWindow: t.getOutFlowWindow(),
}
if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
s.Security = au.GetSecurityValue()
}
s.RemoteFlowControlWindow = t.getOutFlowWindow()
return &s
}
func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
func (t *http2Client) IncrMsgSent() {
atomic.AddInt64(&t.czData.msgSent, 1)
atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
t.channelz.SocketMetrics.MessagesSent.Add(1)
t.channelz.SocketMetrics.LastMessageSentTimestamp.Store(time.Now().UnixNano())
}
func (t *http2Client) IncrMsgRecv() {
atomic.AddInt64(&t.czData.msgRecv, 1)
atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
t.channelz.SocketMetrics.MessagesReceived.Add(1)
t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(time.Now().UnixNano())
}
func (t *http2Client) getOutFlowWindow() int64 {

View File

@ -25,6 +25,7 @@ import (
"fmt"
"io"
"math"
"math/rand"
"net"
"net/http"
"strconv"
@ -43,7 +44,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
@ -118,8 +118,7 @@ type http2Server struct {
idle time.Time
// Fields below are for channelz metric collection.
channelzID *channelz.Identifier
czData *channelzData
channelz *channelz.Socket
bufferPool *bufferPool
connectionID uint64
@ -262,9 +261,24 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
idle: time.Now(),
kep: kep,
initialWindowSize: iwz,
czData: new(channelzData),
bufferPool: newBufferPool(),
}
var czSecurity credentials.ChannelzSecurityValue
if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
czSecurity = au.GetSecurityValue()
}
t.channelz = channelz.RegisterSocket(
&channelz.Socket{
SocketType: channelz.SocketTypeNormal,
Parent: config.ChannelzParent,
SocketMetrics: channelz.SocketMetrics{},
EphemeralMetrics: t.socketMetrics,
LocalAddr: t.peer.LocalAddr,
RemoteAddr: t.peer.Addr,
SocketOptions: channelz.GetSocketOption(t.conn),
Security: czSecurity,
},
)
t.logger = prefixLoggerForServerTransport(t)
t.controlBuf = newControlBuffer(t.done)
@ -274,10 +288,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
updateFlowControl: t.updateFlowControl,
}
}
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.peer.Addr, t.peer.LocalAddr))
if err != nil {
return nil, err
}
t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
t.framer.writer.Flush()
@ -320,8 +330,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.handleSettings(sf)
go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler)
err := t.loopy.run()
close(t.loopyWriterDone)
if !isIOError(err) {
@ -334,9 +343,11 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
// closed, would lead to a TCP RST instead of FIN, and the client
// encountering errors. For more info:
// https://github.com/grpc/grpc-go/issues/5358
timer := time.NewTimer(time.Second)
defer timer.Stop()
select {
case <-t.readerDone:
case <-time.After(time.Second):
case <-timer.C:
}
t.conn.Close()
}
@ -592,8 +603,8 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
}
t.mu.Unlock()
if channelz.IsOn() {
atomic.AddInt64(&t.czData.streamsStarted, 1)
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
t.channelz.SocketMetrics.StreamsStarted.Add(1)
t.channelz.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(time.Now().UnixNano())
}
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
@ -658,8 +669,14 @@ func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if err := t.operateHeaders(ctx, frame, handle); err != nil {
t.Close(err)
break
// Any error processing client headers, e.g. invalid stream ID,
// is considered a protocol violation.
t.controlBuf.put(&goAway{
code: http2.ErrCodeProtocol,
debugData: []byte(err.Error()),
closeConn: err,
})
continue
}
case *http2.DataFrame:
t.handleData(frame)
@ -842,7 +859,7 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
}
return nil
})
t.controlBuf.executeAndPut(func(any) bool {
t.controlBuf.executeAndPut(func() bool {
for _, f := range updateFuncs {
f()
}
@ -996,12 +1013,13 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
}
headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
hf := &headerFrame{
streamID: s.id,
hf: headerFields,
endStream: false,
onWrite: t.setResetPingStrikes,
})
}
success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf) }, hf)
if !success {
if err != nil {
return err
@ -1190,12 +1208,12 @@ func (t *http2Server) keepalive() {
continue
}
if outstandingPing && kpTimeoutLeft <= 0 {
t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Time))
t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Timeout))
return
}
if !outstandingPing {
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
}
t.controlBuf.put(p)
kpTimeoutLeft = t.kp.Timeout
@ -1235,7 +1253,7 @@ func (t *http2Server) Close(err error) {
if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
}
channelz.RemoveEntry(t.channelzID)
channelz.RemoveEntry(t.channelz.ID)
// Cancel all active streams.
for _, s := range streams {
s.cancel()
@ -1256,9 +1274,9 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
if channelz.IsOn() {
if eosReceived {
atomic.AddInt64(&t.czData.streamsSucceeded, 1)
t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
} else {
atomic.AddInt64(&t.czData.streamsFailed, 1)
t.channelz.SocketMetrics.StreamsFailed.Add(1)
}
}
}
@ -1375,38 +1393,21 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
return false, nil
}
func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
s := channelz.SocketInternalMetric{
StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
LocalFlowControlWindow: int64(t.fc.getSize()),
SocketOptions: channelz.GetSocketOption(t.conn),
LocalAddr: t.peer.LocalAddr,
RemoteAddr: t.peer.Addr,
// RemoteName :
func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics {
return &channelz.EphemeralSocketMetrics{
LocalFlowControlWindow: int64(t.fc.getSize()),
RemoteFlowControlWindow: t.getOutFlowWindow(),
}
if au, ok := t.peer.AuthInfo.(credentials.ChannelzSecurityInfo); ok {
s.Security = au.GetSecurityValue()
}
s.RemoteFlowControlWindow = t.getOutFlowWindow()
return &s
}
func (t *http2Server) IncrMsgSent() {
atomic.AddInt64(&t.czData.msgSent, 1)
atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
t.channelz.SocketMetrics.MessagesSent.Add(1)
t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)
}
func (t *http2Server) IncrMsgRecv() {
atomic.AddInt64(&t.czData.msgRecv, 1)
atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
t.channelz.SocketMetrics.MessagesReceived.Add(1)
t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)
}
func (t *http2Server) getOutFlowWindow() int64 {
@ -1439,7 +1440,7 @@ func getJitter(v time.Duration) time.Duration {
}
// Generate a jitter between +/- 10% of the value.
r := int64(v / 10)
j := grpcrand.Int63n(2*r) - r
j := rand.Int63n(2*r) - r
return time.Duration(j)
}

View File

@ -418,10 +418,9 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBu
return f
}
func getWriteBufferPool(writeBufferSize int) *sync.Pool {
func getWriteBufferPool(size int) *sync.Pool {
writeBufferMutex.Lock()
defer writeBufferMutex.Unlock()
size := writeBufferSize * 2
pool, ok := writeBufferPoolMap[size]
if ok {
return pool

View File

@ -28,6 +28,7 @@ import (
"fmt"
"io"
"net"
"strings"
"sync"
"sync/atomic"
"time"
@ -303,7 +304,7 @@ func (s *Stream) isHeaderSent() bool {
}
// updateHeaderSent updates headerSent and returns true
// if it was alreay set. It is valid only on server-side.
// if it was already set. It is valid only on server-side.
func (s *Stream) updateHeaderSent() bool {
return atomic.SwapUint32(&s.headerSent, 1) == 1
}
@ -362,8 +363,12 @@ func (s *Stream) SendCompress() string {
// ClientAdvertisedCompressors returns the compressor names advertised by the
// client via grpc-accept-encoding header.
func (s *Stream) ClientAdvertisedCompressors() string {
return s.clientAdvertisedCompressors
func (s *Stream) ClientAdvertisedCompressors() []string {
values := strings.Split(s.clientAdvertisedCompressors, ",")
for i, v := range values {
values[i] = strings.TrimSpace(v)
}
return values
}
// Done returns a channel which is closed when it receives the final status
@ -566,7 +571,7 @@ type ServerConfig struct {
WriteBufferSize int
ReadBufferSize int
SharedWriteBuffer bool
ChannelzParentID *channelz.Identifier
ChannelzParent *channelz.Server
MaxHeaderListSize *uint32
HeaderTableSize *uint32
}
@ -601,8 +606,8 @@ type ConnectOptions struct {
ReadBufferSize int
// SharedWriteBuffer indicates whether connections should reuse write buffer
SharedWriteBuffer bool
// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
ChannelzParentID *channelz.Identifier
// ChannelzParent sets the addrConn id which initiated the creation of this client transport.
ChannelzParent *channelz.SubChannel
// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
MaxHeaderListSize *uint32
// UseProxy specifies if a proxy should be used.
@ -815,30 +820,6 @@ const (
GoAwayTooManyPings GoAwayReason = 2
)
// channelzData is used to store channelz related data for http2Client and http2Server.
// These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic
// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
type channelzData struct {
kpCount int64
// The number of streams that have started, including already finished ones.
streamsStarted int64
// Client side: The number of streams that have ended successfully by receiving
// EoS bit set frame from server.
// Server side: The number of streams that have ended successfully by sending
// frame with EoS bit set.
streamsSucceeded int64
streamsFailed int64
// lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type
// instead of time.Time since it's more costly to atomically update time.Time variable than int64
// variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
lastStreamCreatedTime int64
msgSent int64
msgRecv int64
lastMsgSentTime int64
lastMsgRecvTime int64
}
// ContextErr converts the error from context package into a status error.
func ContextErr(err error) error {
switch err {