5
0
mirror of https://github.com/cwinfo/matterbridge.git synced 2024-11-14 03:50:26 +00:00

Implement ratelimiting (matrix). Fixes #1238 (#1326)

This commit is contained in:
Wim 2020-12-06 17:18:25 +01:00 committed by GitHub
parent 8eba2d3e50
commit 2d3c26a4b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 169 additions and 51 deletions

View File

@ -181,3 +181,35 @@ func (b *Bmatrix) getAvatarURL(sender string) string {
return url return url
} }
// handleRatelimit handles the ratelimit errors and return if we're ratelimited and the amount of time to sleep
func (b *Bmatrix) handleRatelimit(err error) (time.Duration, bool) {
httpErr := handleError(err)
if httpErr.Errcode != "M_LIMIT_EXCEEDED" {
return 0, false
}
b.Log.Debugf("ratelimited: %s", httpErr.Err)
b.Log.Infof("getting ratelimited by matrix, sleeping approx %d seconds before retrying", httpErr.RetryAfterMs/1000)
return time.Duration(httpErr.RetryAfterMs) * time.Millisecond, true
}
// retry function will check if we're ratelimited and retries again when backoff time expired
// returns original error if not 429 ratelimit
func (b *Bmatrix) retry(f func() error) error {
b.rateMutex.Lock()
defer b.rateMutex.Unlock()
for {
if err := f(); err != nil {
if backoff, ok := b.handleRatelimit(err); ok {
time.Sleep(backoff)
} else {
return err
}
} else {
return nil
}
}
}

View File

@ -30,6 +30,7 @@ type Bmatrix struct {
UserID string UserID string
NicknameMap map[string]NicknameCacheEntry NicknameMap map[string]NicknameCacheEntry
RoomMap map[string]string RoomMap map[string]string
rateMutex sync.RWMutex
sync.RWMutex sync.RWMutex
*bridge.Config *bridge.Config
} }
@ -99,17 +100,9 @@ func (b *Bmatrix) Disconnect() error {
} }
func (b *Bmatrix) JoinChannel(channel config.ChannelInfo) error { func (b *Bmatrix) JoinChannel(channel config.ChannelInfo) error {
retry: return b.retry(func() error {
resp, err := b.mc.JoinRoom(channel.Name, "", nil) resp, err := b.mc.JoinRoom(channel.Name, "", nil)
if err != nil { if err != nil {
httpErr := handleError(err)
if httpErr.Errcode == "M_LIMIT_EXCEEDED" {
b.Log.Infof("getting ratelimited by matrix, sleeping approx %d seconds before joining %s", httpErr.RetryAfterMs/1000, channel.Name)
time.Sleep((time.Duration(httpErr.RetryAfterMs) * time.Millisecond))
goto retry
}
return err return err
} }
@ -118,6 +111,7 @@ retry:
b.Unlock() b.Unlock()
return nil return nil
})
} }
func (b *Bmatrix) Send(msg config.Message) (string, error) { func (b *Bmatrix) Send(msg config.Message) (string, error) {
@ -135,11 +129,21 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
Body: username.plain + msg.Text, Body: username.plain + msg.Text,
FormattedBody: username.formatted + msg.Text, FormattedBody: username.formatted + msg.Text,
} }
msgID := ""
err := b.retry(func() error {
resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m) resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m)
if err != nil { if err != nil {
return "", err return err
} }
return resp.EventID, err
msgID = resp.EventID
return err
})
return msgID, err
} }
// Delete message // Delete message
@ -147,17 +151,34 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
if msg.ID == "" { if msg.ID == "" {
return "", nil return "", nil
} }
msgID := ""
err := b.retry(func() error {
resp, err := b.mc.RedactEvent(channel, msg.ID, &matrix.ReqRedact{}) resp, err := b.mc.RedactEvent(channel, msg.ID, &matrix.ReqRedact{})
if err != nil { if err != nil {
return "", err return err
} }
return resp.EventID, err
msgID = resp.EventID
return err
})
return msgID, err
} }
// Upload a file if it exists // Upload a file if it exists
if msg.Extra != nil { if msg.Extra != nil {
for _, rmsg := range helper.HandleExtra(&msg, b.General) { for _, rmsg := range helper.HandleExtra(&msg, b.General) {
if _, err := b.mc.SendText(channel, rmsg.Username+rmsg.Text); err != nil { rmsg := rmsg
err := b.retry(func() error {
_, err := b.mc.SendText(channel, rmsg.Username+rmsg.Text)
return err
})
if err != nil {
b.Log.Errorf("sendText failed: %s", err) b.Log.Errorf("sendText failed: %s", err)
} }
} }
@ -187,7 +208,12 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
EventID: msg.ID, EventID: msg.ID,
Type: "m.replace", Type: "m.replace",
} }
err := b.retry(func() error {
_, err := b.mc.SendMessageEvent(channel, "m.room.message", rmsg) _, err := b.mc.SendMessageEvent(channel, "m.room.message", rmsg)
return err
})
if err != nil { if err != nil {
return "", err return "", err
} }
@ -202,26 +228,58 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
Body: username.plain + msg.Text, Body: username.plain + msg.Text,
FormattedBody: username.formatted + msg.Text, FormattedBody: username.formatted + msg.Text,
} }
resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m)
var (
resp *matrix.RespSendEvent
err error
)
err = b.retry(func() error {
resp, err = b.mc.SendMessageEvent(channel, "m.room.message", m)
return err
})
if err != nil { if err != nil {
return "", err return "", err
} }
return resp.EventID, err return resp.EventID, err
} }
if b.GetBool("HTMLDisable") { if b.GetBool("HTMLDisable") {
resp, err := b.mc.SendText(channel, username.plain+msg.Text) var (
resp *matrix.RespSendEvent
err error
)
err = b.retry(func() error {
resp, err = b.mc.SendText(channel, username.plain+msg.Text)
return err
})
if err != nil { if err != nil {
return "", err return "", err
} }
return resp.EventID, err return resp.EventID, err
} }
// Post normal message with HTML support (eg riot.im) // Post normal message with HTML support (eg riot.im)
resp, err := b.mc.SendFormattedText(channel, username.plain+msg.Text, username.formatted+helper.ParseMarkdown(msg.Text)) var (
resp *matrix.RespSendEvent
err error
)
err = b.retry(func() error {
resp, err = b.mc.SendFormattedText(channel, username.plain+msg.Text,
username.formatted+helper.ParseMarkdown(msg.Text))
return err
})
if err != nil { if err != nil {
return "", err return "", err
} }
return resp.EventID, err return resp.EventID, err
} }
@ -420,13 +478,25 @@ func (b *Bmatrix) handleUploadFile(msg *config.Message, channel string, fi *conf
sp := strings.Split(fi.Name, ".") sp := strings.Split(fi.Name, ".")
mtype := mime.TypeByExtension("." + sp[len(sp)-1]) mtype := mime.TypeByExtension("." + sp[len(sp)-1])
// image and video uploads send no username, we have to do this ourself here #715 // image and video uploads send no username, we have to do this ourself here #715
err := b.retry(func() error {
_, err := b.mc.SendFormattedText(channel, username.plain+fi.Comment, username.formatted+fi.Comment) _, err := b.mc.SendFormattedText(channel, username.plain+fi.Comment, username.formatted+fi.Comment)
return err
})
if err != nil { if err != nil {
b.Log.Errorf("file comment failed: %#v", err) b.Log.Errorf("file comment failed: %#v", err)
} }
b.Log.Debugf("uploading file: %s %s", fi.Name, mtype) b.Log.Debugf("uploading file: %s %s", fi.Name, mtype)
res, err := b.mc.UploadToContentRepo(content, mtype, int64(len(*fi.Data)))
var res *matrix.RespMediaUpload
err = b.retry(func() error {
res, err = b.mc.UploadToContentRepo(content, mtype, int64(len(*fi.Data)))
return err
})
if err != nil { if err != nil {
b.Log.Errorf("file upload failed: %#v", err) b.Log.Errorf("file upload failed: %#v", err)
return return
@ -435,18 +505,27 @@ func (b *Bmatrix) handleUploadFile(msg *config.Message, channel string, fi *conf
switch { switch {
case strings.Contains(mtype, "video"): case strings.Contains(mtype, "video"):
b.Log.Debugf("sendVideo %s", res.ContentURI) b.Log.Debugf("sendVideo %s", res.ContentURI)
err = b.retry(func() error {
_, err = b.mc.SendVideo(channel, fi.Name, res.ContentURI) _, err = b.mc.SendVideo(channel, fi.Name, res.ContentURI)
return err
})
if err != nil { if err != nil {
b.Log.Errorf("sendVideo failed: %#v", err) b.Log.Errorf("sendVideo failed: %#v", err)
} }
case strings.Contains(mtype, "image"): case strings.Contains(mtype, "image"):
b.Log.Debugf("sendImage %s", res.ContentURI) b.Log.Debugf("sendImage %s", res.ContentURI)
err = b.retry(func() error {
_, err = b.mc.SendImage(channel, fi.Name, res.ContentURI) _, err = b.mc.SendImage(channel, fi.Name, res.ContentURI)
return err
})
if err != nil { if err != nil {
b.Log.Errorf("sendImage failed: %#v", err) b.Log.Errorf("sendImage failed: %#v", err)
} }
case strings.Contains(mtype, "audio"): case strings.Contains(mtype, "audio"):
b.Log.Debugf("sendAudio %s", res.ContentURI) b.Log.Debugf("sendAudio %s", res.ContentURI)
err = b.retry(func() error {
_, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.AudioMessage{ _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.AudioMessage{
MsgType: "m.audio", MsgType: "m.audio",
Body: fi.Name, Body: fi.Name,
@ -456,11 +535,15 @@ func (b *Bmatrix) handleUploadFile(msg *config.Message, channel string, fi *conf
Size: uint(len(*fi.Data)), Size: uint(len(*fi.Data)),
}, },
}) })
return err
})
if err != nil { if err != nil {
b.Log.Errorf("sendAudio failed: %#v", err) b.Log.Errorf("sendAudio failed: %#v", err)
} }
default: default:
b.Log.Debugf("sendFile %s", res.ContentURI) b.Log.Debugf("sendFile %s", res.ContentURI)
err = b.retry(func() error {
_, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.FileMessage{ _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.FileMessage{
MsgType: "m.file", MsgType: "m.file",
Body: fi.Name, Body: fi.Name,
@ -470,6 +553,9 @@ func (b *Bmatrix) handleUploadFile(msg *config.Message, channel string, fi *conf
Size: uint(len(*fi.Data)), Size: uint(len(*fi.Data)),
}, },
}) })
return err
})
if err != nil { if err != nil {
b.Log.Errorf("sendFile failed: %#v", err) b.Log.Errorf("sendFile failed: %#v", err)
} }