4
0
mirror of https://github.com/cwinfo/matterbridge.git synced 2025-07-03 07:17:44 +00:00

Update dependencies (#1831)

This commit is contained in:
Wim
2022-05-09 23:00:23 +02:00
committed by GitHub
parent 700b95546b
commit 1e55dd47f2
306 changed files with 435896 additions and 195113 deletions

View File

@ -112,6 +112,8 @@ type Client struct {
AutoTrustIdentity bool
DebugDecodeBeforeSend bool
OneMessageAtATime bool
messageSendLock sync.Mutex
uniqueID string
idCounter uint32
@ -262,7 +264,7 @@ func (cli *Client) onDisconnect(ns *socket.NoiseSocket, remote bool) {
defer cli.socketLock.Unlock()
if cli.socket == ns {
cli.socket = nil
cli.clearResponseWaiters()
cli.clearResponseWaiters(xmlStreamEndNode)
if !cli.isExpectedDisconnect() && remote {
cli.Log.Debugf("Emitting Disconnected event")
go cli.dispatchEvent(&events.Disconnected{})
@ -294,9 +296,9 @@ func (cli *Client) autoReconnect() {
return
}
for {
cli.AutoReconnectErrors++
autoReconnectDelay := time.Duration(cli.AutoReconnectErrors) * 2 * time.Second
cli.Log.Debugf("Automatically reconnecting after %v", autoReconnectDelay)
cli.AutoReconnectErrors++
time.Sleep(autoReconnectDelay)
err := cli.Connect()
if errors.Is(err, ErrAlreadyConnected) {
@ -489,30 +491,35 @@ func (cli *Client) handlerQueueLoop(ctx context.Context) {
}
}
func (cli *Client) sendNode(node waBinary.Node) error {
func (cli *Client) sendNodeDebug(node waBinary.Node) ([]byte, error) {
cli.socketLock.RLock()
sock := cli.socket
cli.socketLock.RUnlock()
if sock == nil {
return ErrNotConnected
return nil, ErrNotConnected
}
payload, err := waBinary.Marshal(node)
if err != nil {
return fmt.Errorf("failed to marshal node: %w", err)
return nil, fmt.Errorf("failed to marshal node: %w", err)
}
if cli.DebugDecodeBeforeSend {
var decoded *waBinary.Node
decoded, err = waBinary.Unmarshal(payload[1:])
if err != nil {
cli.Log.Infof("Malformed payload: %s", base64.URLEncoding.EncodeToString(payload))
return fmt.Errorf("failed to decode the binary we just produced: %w", err)
return nil, fmt.Errorf("failed to decode the binary we just produced: %w", err)
}
node = *decoded
}
cli.sendLog.Debugf("%s", node.XMLString())
return sock.SendFrame(payload)
return payload, sock.SendFrame(payload)
}
func (cli *Client) sendNode(node waBinary.Node) error {
_, err := cli.sendNodeDebug(node)
return err
}
func (cli *Client) dispatchEvent(evt interface{}) {

View File

@ -17,6 +17,7 @@ import (
func (cli *Client) handleStreamError(node *waBinary.Node) {
atomic.StoreUint32(&cli.isLoggedIn, 0)
cli.clearResponseWaiters(node)
code, _ := node.Attrs["code"].(string)
conflict, _ := node.GetOptionalChildByTag("conflict")
conflictType := conflict.AttrGetter().OptionalString("type")

View File

@ -15,11 +15,10 @@ import (
// Miscellaneous errors
var (
ErrNoSession = errors.New("can't encrypt message for device: no signal session established")
ErrIQTimedOut = errors.New("info query timed out")
ErrIQDisconnected = errors.New("websocket disconnected before info query returned response")
ErrNotConnected = errors.New("websocket not connected")
ErrNotLoggedIn = errors.New("the store doesn't contain a device JID")
ErrNoSession = errors.New("can't encrypt message for device: no signal session established")
ErrIQTimedOut = errors.New("info query timed out")
ErrNotConnected = errors.New("websocket not connected")
ErrNotLoggedIn = errors.New("the store doesn't contain a device JID")
ErrAlreadyConnected = errors.New("websocket is already connected")
@ -47,6 +46,10 @@ var (
ErrBusinessMessageLinkNotFound = errors.New("that business message link does not exist or has been revoked")
// ErrInvalidImageFormat is returned by SetGroupPhoto if the given photo is not in the correct format.
ErrInvalidImageFormat = errors.New("the given data is not a valid image")
// ErrMediaNotAvailableOnPhone is returned by DecryptMediaRetryNotification if the given event contains error code 2.
ErrMediaNotAvailableOnPhone = errors.New("media no longer available on phone")
// ErrUnknownMediaRetryError is returned by DecryptMediaRetryNotification if the given event contains an unknown error code.
ErrUnknownMediaRetryError = errors.New("unknown media retry error")
)
// Some errors that Client.SendMessage can return
@ -54,7 +57,6 @@ var (
ErrBroadcastListUnsupported = errors.New("sending to broadcast lists is not yet supported")
ErrUnknownServer = errors.New("can't send message to unknown server")
ErrRecipientADJID = errors.New("message recipient must be normal (non-AD) JID")
ErrSendDisconnected = errors.New("websocket disconnected before message send returned response")
)
// Some errors that Client.Download can return
@ -157,3 +159,23 @@ type ElementMissingError struct {
func (eme *ElementMissingError) Error() string {
return fmt.Sprintf("missing <%s> element in %s", eme.Tag, eme.In)
}
var ErrIQDisconnected = &DisconnectedError{Action: "info query"}
// DisconnectedError is returned if the websocket disconnects before an info query or other request gets a response.
type DisconnectedError struct {
Action string
Node *waBinary.Node
}
func (err *DisconnectedError) Error() string {
return fmt.Sprintf("websocket disconnected before %s returned response", err.Action)
}
func (err *DisconnectedError) Is(other error) bool {
otherDisc, ok := other.(*DisconnectedError)
if !ok {
return false
}
return otherDisc.Action == err.Action
}

View File

@ -105,20 +105,22 @@ func (cli *Client) SendMediaRetryReceipt(message *types.MessageInfo, mediaKey []
// DecryptMediaRetryNotification decrypts a media retry notification using the media key.
func DecryptMediaRetryNotification(evt *events.MediaRetry, mediaKey []byte) (*waProto.MediaRetryNotification, error) {
gcm, err := prepareMediaRetryGCM(mediaKey)
if err != nil {
return nil, err
}
plaintext, err := gcm.Open(nil, evt.IV, evt.Ciphertext, []byte(evt.MessageID))
if err != nil {
return nil, fmt.Errorf("failed to decrypt notification: %w", err)
}
var notif waProto.MediaRetryNotification
err = proto.Unmarshal(plaintext, &notif)
if err != nil {
var plaintext []byte
if evt.Error != nil && evt.Ciphertext == nil {
if evt.Error.Code == 2 {
return nil, ErrMediaNotAvailableOnPhone
}
return nil, fmt.Errorf("%w (code: %d)", ErrUnknownMediaRetryError, evt.Error.Code)
} else if gcm, err := prepareMediaRetryGCM(mediaKey); err != nil {
return nil, err
} else if plaintext, err = gcm.Open(nil, evt.IV, evt.Ciphertext, []byte(evt.MessageID)); err != nil {
return nil, fmt.Errorf("failed to decrypt notification: %w", err)
} else if err = proto.Unmarshal(plaintext, &notif); err != nil {
return nil, fmt.Errorf("failed to unmarshal notification (invalid encryption key?): %w", err)
} else {
return &notif, nil
}
return &notif, nil
}
func parseMediaRetryNotification(node *waBinary.Node) (*events.MediaRetry, error) {
@ -141,6 +143,14 @@ func parseMediaRetryNotification(node *waBinary.Node) (*events.MediaRetry, error
return nil, fmt.Errorf("missing attributes in <rmr> tag: %w", rmrAG.Error())
}
errNode, ok := node.GetOptionalChildByTag("error")
if ok {
evt.Error = &events.MediaRetryError{
Code: errNode.AttrGetter().Int("code"),
}
return &evt, nil
}
evt.Ciphertext, ok = node.GetChildByTag("encrypt", "enc_p").Content.([]byte)
if !ok {
return nil, &ElementMissingError{Tag: "enc_p", In: fmt.Sprintf("retry notification %s", evt.MessageID)}
@ -153,7 +163,6 @@ func parseMediaRetryNotification(node *waBinary.Node) (*events.MediaRetry, error
}
func (cli *Client) handleMediaRetryNotification(node *waBinary.Node) {
// TODO handle errors (e.g. <error code="2"/>)
evt, err := parseMediaRetryNotification(node)
if err != nil {
cli.Log.Warnf("Failed to parse media retry notification: %v", err)

View File

@ -8,6 +8,7 @@ package whatsmeow
import (
"context"
"encoding/base64"
"strconv"
"sync/atomic"
"time"
@ -20,13 +21,17 @@ func (cli *Client) generateRequestID() string {
return cli.uniqueID + strconv.FormatUint(uint64(atomic.AddUint32(&cli.idCounter, 1)), 10)
}
var closedNode = &waBinary.Node{Tag: "xmlstreamend"}
var xmlStreamEndNode = &waBinary.Node{Tag: "xmlstreamend"}
func (cli *Client) clearResponseWaiters() {
func isDisconnectNode(node *waBinary.Node) bool {
return node == xmlStreamEndNode || node.Tag == "stream:error"
}
func (cli *Client) clearResponseWaiters(node *waBinary.Node) {
cli.responseWaitersLock.Lock()
for _, waiter := range cli.responseWaiters {
select {
case waiter <- closedNode:
case waiter <- node:
default:
close(waiter)
}
@ -86,7 +91,7 @@ type infoQuery struct {
Context context.Context
}
func (cli *Client) sendIQAsync(query infoQuery) (<-chan *waBinary.Node, error) {
func (cli *Client) sendIQAsyncDebug(query infoQuery) (<-chan *waBinary.Node, []byte, error) {
if len(query.ID) == 0 {
query.ID = cli.generateRequestID()
}
@ -102,20 +107,25 @@ func (cli *Client) sendIQAsync(query infoQuery) (<-chan *waBinary.Node, error) {
if !query.Target.IsEmpty() {
attrs["target"] = query.Target
}
err := cli.sendNode(waBinary.Node{
data, err := cli.sendNodeDebug(waBinary.Node{
Tag: "iq",
Attrs: attrs,
Content: query.Content,
})
if err != nil {
cli.cancelResponse(query.ID, waiter)
return nil, err
return nil, data, err
}
return waiter, nil
return waiter, data, nil
}
func (cli *Client) sendIQAsync(query infoQuery) (<-chan *waBinary.Node, error) {
ch, _, err := cli.sendIQAsyncDebug(query)
return ch, err
}
func (cli *Client) sendIQ(query infoQuery) (*waBinary.Node, error) {
resChan, err := cli.sendIQAsync(query)
resChan, data, err := cli.sendIQAsyncDebug(query)
if err != nil {
return nil, err
}
@ -127,8 +137,11 @@ func (cli *Client) sendIQ(query infoQuery) (*waBinary.Node, error) {
}
select {
case res := <-resChan:
if res == closedNode {
return nil, ErrIQDisconnected
if isDisconnectNode(res) {
if cli.DebugDecodeBeforeSend && res.Tag == "stream:error" && res.GetChildByTag("xml-not-well-formed").Tag != "" {
cli.Log.Debugf("Info query that was interrupted by xml-not-well-formed: %s", base64.URLEncoding.EncodeToString(data))
}
return nil, &DisconnectedError{Action: "info query", Node: res}
}
resType, _ := res.Attrs["type"].(string)
if res.Tag != "iq" || (resType != "result" && resType != "error") {

View File

@ -148,6 +148,13 @@ func (cli *Client) handleRetryReceipt(receipt *events.Receipt, node *waBinary.No
} else if bundle == nil {
return fmt.Errorf("didn't get prekey bundle for %s (response size: %d)", senderAD, len(keys))
}
if retryCount > 3 {
cli.Log.Debugf("Erasing existing session for %s due to retry receipt with count>3", receipt.Sender)
err = cli.Store.Sessions.DeleteSession(receipt.Sender.SignalAddress().String())
if err != nil {
return fmt.Errorf("failed to delete session for %s: %w", senderAD, err)
}
}
}
encrypted, includeDeviceIdentity, err := cli.encryptMessageForDevice(plaintext, receipt.Sender, bundle)
if err != nil {
@ -161,6 +168,9 @@ func (cli *Client) handleRetryReceipt(receipt *events.Receipt, node *waBinary.No
"id": messageID,
"t": timestamp.Unix(),
}
if !receipt.IsGroup {
attrs["device_fanout"] = false
}
if participant, ok := node.Attrs["participant"]; ok {
attrs["participant"] = participant
}

View File

@ -73,15 +73,21 @@ func (cli *Client) SendMessage(to types.JID, id types.MessageID, message *waProt
id = GenerateMessageID()
}
if cli.OneMessageAtATime {
cli.messageSendLock.Lock()
defer cli.messageSendLock.Unlock()
}
cli.addRecentMessage(to, id, message)
respChan := cli.waitResponse(id)
var err error
var phash string
var data []byte
switch to.Server {
case types.GroupServer:
phash, err = cli.sendGroup(to, id, message)
phash, data, err = cli.sendGroup(to, id, message)
case types.DefaultUserServer:
err = cli.sendDM(to, id, message)
data, err = cli.sendDM(to, id, message)
case types.BroadcastServer:
err = ErrBroadcastListUnsupported
default:
@ -92,8 +98,11 @@ func (cli *Client) SendMessage(to types.JID, id types.MessageID, message *waProt
return time.Time{}, err
}
resp := <-respChan
if resp == closedNode {
return time.Time{}, ErrSendDisconnected
if isDisconnectNode(resp) {
if cli.DebugDecodeBeforeSend && resp.Tag == "stream:error" && resp.GetChildByTag("xml-not-well-formed").Tag != "" {
cli.Log.Debugf("Message that was interrupted by xml-not-well-formed: %s", base64.URLEncoding.EncodeToString(data))
}
return time.Time{}, &DisconnectedError{Action: "message send", Node: resp}
}
ag := resp.AttrGetter()
ts := time.Unix(ag.Int64("t"), 0)
@ -137,22 +146,22 @@ func participantListHashV2(participants []types.JID) string {
return fmt.Sprintf("2:%s", base64.RawStdEncoding.EncodeToString(hash[:6]))
}
func (cli *Client) sendGroup(to types.JID, id types.MessageID, message *waProto.Message) (string, error) {
func (cli *Client) sendGroup(to types.JID, id types.MessageID, message *waProto.Message) (string, []byte, error) {
participants, err := cli.getGroupMembers(to)
if err != nil {
return "", fmt.Errorf("failed to get group members: %w", err)
return "", nil, fmt.Errorf("failed to get group members: %w", err)
}
plaintext, _, err := marshalMessage(to, message)
if err != nil {
return "", err
return "", nil, err
}
builder := groups.NewGroupSessionBuilder(cli.Store, pbSerializer)
senderKeyName := protocol.NewSenderKeyName(to.String(), cli.Store.ID.SignalAddress())
signalSKDMessage, err := builder.Create(senderKeyName)
if err != nil {
return "", fmt.Errorf("failed to create sender key distribution message to send %s to %s: %w", id, to, err)
return "", nil, fmt.Errorf("failed to create sender key distribution message to send %s to %s: %w", id, to, err)
}
skdMessage := &waProto.Message{
SenderKeyDistributionMessage: &waProto.SenderKeyDistributionMessage{
@ -162,19 +171,19 @@ func (cli *Client) sendGroup(to types.JID, id types.MessageID, message *waProto.
}
skdPlaintext, err := proto.Marshal(skdMessage)
if err != nil {
return "", fmt.Errorf("failed to marshal sender key distribution message to send %s to %s: %w", id, to, err)
return "", nil, fmt.Errorf("failed to marshal sender key distribution message to send %s to %s: %w", id, to, err)
}
cipher := groups.NewGroupCipher(builder, senderKeyName, cli.Store)
encrypted, err := cipher.Encrypt(padMessage(plaintext))
if err != nil {
return "", fmt.Errorf("failed to encrypt group message to send %s to %s: %w", id, to, err)
return "", nil, fmt.Errorf("failed to encrypt group message to send %s to %s: %w", id, to, err)
}
ciphertext := encrypted.SignedSerialize()
node, allDevices, err := cli.prepareMessageNode(to, id, message, participants, skdPlaintext, nil)
if err != nil {
return "", err
return "", nil, err
}
phash := participantListHashV2(allDevices)
@ -185,28 +194,28 @@ func (cli *Client) sendGroup(to types.JID, id types.MessageID, message *waProto.
Attrs: waBinary.Attrs{"v": "2", "type": "skmsg"},
})
err = cli.sendNode(*node)
data, err := cli.sendNodeDebug(*node)
if err != nil {
return "", fmt.Errorf("failed to send message node: %w", err)
return "", nil, fmt.Errorf("failed to send message node: %w", err)
}
return phash, nil
return phash, data, nil
}
func (cli *Client) sendDM(to types.JID, id types.MessageID, message *waProto.Message) error {
func (cli *Client) sendDM(to types.JID, id types.MessageID, message *waProto.Message) ([]byte, error) {
messagePlaintext, deviceSentMessagePlaintext, err := marshalMessage(to, message)
if err != nil {
return err
return nil, err
}
node, _, err := cli.prepareMessageNode(to, id, message, []types.JID{to, *cli.Store.ID}, messagePlaintext, deviceSentMessagePlaintext)
node, _, err := cli.prepareMessageNode(to, id, message, []types.JID{to, cli.Store.ID.ToNonAD()}, messagePlaintext, deviceSentMessagePlaintext)
if err != nil {
return err
return nil, err
}
err = cli.sendNode(*node)
data, err := cli.sendNodeDebug(*node)
if err != nil {
return fmt.Errorf("failed to send message node: %w", err)
return nil, fmt.Errorf("failed to send message node: %w", err)
}
return nil
return data, nil
}
func (cli *Client) prepareMessageNode(to types.JID, id types.MessageID, message *waProto.Message, participants []types.JID, plaintext, dsmPlaintext []byte) (*waBinary.Node, []types.JID, error) {
@ -307,7 +316,7 @@ func (cli *Client) encryptMessageForDevices(allDevices []types.JID, id string, m
if len(retryDevices) > 0 {
bundles, err := cli.fetchPreKeys(retryDevices)
if err != nil {
cli.Log.Warnf("Failed to fetch prekeys for %d to retry encryption: %v", retryDevices, err)
cli.Log.Warnf("Failed to fetch prekeys for %v to retry encryption: %v", retryDevices, err)
} else {
for _, jid := range retryDevices {
resp := bundles[jid]

View File

@ -347,11 +347,18 @@ type OfflineSyncCompleted struct {
Count int
}
type MediaRetryError struct {
Code int
}
// MediaRetry is emitted when the phone sends a response to a media retry request.
type MediaRetry struct {
Ciphertext []byte
IV []byte
// Sometimes there's an unencrypted media retry error. In these cases, Ciphertext and IV will be nil.
Error *MediaRetryError
Timestamp time.Time // The time of the response.
MessageID types.MessageID // The ID of the message.