5
0
mirror of https://github.com/cwinfo/matterbridge.git synced 2024-11-25 12:11:35 +00:00

Add more rate-limit handling (slack) (#581)

This commit is contained in:
Duco van Amstel 2018-11-10 21:09:41 +00:00 committed by Wim
parent ba70691877
commit 2f042ad915
2 changed files with 139 additions and 56 deletions

View File

@ -1,6 +1,7 @@
package bslack package bslack
import ( import (
"context"
"fmt" "fmt"
"regexp" "regexp"
"strings" "strings"
@ -76,17 +77,26 @@ func (b *Bslack) populateUsers() {
b.refreshInProgress = true b.refreshInProgress = true
b.refreshMutex.Unlock() b.refreshMutex.Unlock()
users, err := b.sc.GetUsers()
if err != nil {
b.Log.Errorf("Could not reload users: %#v", err)
return
}
newUsers := map[string]*slack.User{} newUsers := map[string]*slack.User{}
for i := range users { pagination := b.sc.GetUsersPaginated(slack.GetUsersOptionLimit(200))
// Use array index for pointer, not the copy for {
// See: https://stackoverflow.com/a/29498133/504018 var err error
newUsers[users[i].ID] = &users[i] pagination, err = pagination.Next(context.Background())
if err != nil {
if pagination.Done(err) {
break
}
if err = b.handleRateLimit(err); err != nil {
b.Log.Errorf("Could not retrieve users: %#v", err)
return
}
continue
}
for i := range pagination.Users {
newUsers[pagination.Users[i].ID] = &pagination.Users[i]
}
} }
b.usersMutex.Lock() b.usersMutex.Lock()
@ -122,10 +132,14 @@ func (b *Bslack) populateChannels() {
for { for {
channels, nextCursor, err := b.sc.GetConversations(queryParams) channels, nextCursor, err := b.sc.GetConversations(queryParams)
if err != nil { if err != nil {
b.Log.Errorf("Could not reload channels: %#v", err) if err = b.handleRateLimit(err); err != nil {
return b.Log.Errorf("Could not retrieve channels: %#v", err)
return
}
continue
} }
for i := 0; i < len(channels); i++ {
for i := range channels {
newChannelsByID[channels[i].ID] = &channels[i] newChannelsByID[channels[i].ID] = &channels[i]
newChannelsByName[channels[i].Name] = &channels[i] newChannelsByName[channels[i].Name] = &channels[i]
} }
@ -189,18 +203,8 @@ func (b *Bslack) populateMessageWithUserInfo(ev *slack.MessageEvent, rmsg *confi
// First, deal with bot-originating messages but only do so when not using webhooks: we // First, deal with bot-originating messages but only do so when not using webhooks: we
// would not be able to distinguish which bot would be sending them. // would not be able to distinguish which bot would be sending them.
if ev.BotID != "" && b.GetString(outgoingWebhookConfig) == "" { if err := b.populateMessageWithBotInfo(ev, rmsg); err != nil {
bot, err := b.rtm.GetBotInfo(ev.BotID) return err
if err != nil {
return err
}
if bot.Name != "" && bot.Name != "Slack API Tester" {
rmsg.Username = bot.Name
if ev.Username != "" {
rmsg.Username = ev.Username
}
rmsg.UserID = bot.ID
}
} }
// Second, deal with "real" users if we have the necessary information. // Second, deal with "real" users if we have the necessary information.
@ -227,6 +231,35 @@ func (b *Bslack) populateMessageWithUserInfo(ev *slack.MessageEvent, rmsg *confi
return nil return nil
} }
func (b *Bslack) populateMessageWithBotInfo(ev *slack.MessageEvent, rmsg *config.Message) error {
if ev.BotID == "" || b.GetString(outgoingWebhookConfig) != "" {
return nil
}
var err error
var bot *slack.Bot
for {
bot, err = b.rtm.GetBotInfo(ev.BotID)
if err == nil {
break
}
if err = b.handleRateLimit(err); err != nil {
b.Log.Errorf("Could not retrieve bot information: %#v", err)
return err
}
}
if bot.Name != "" && bot.Name != "Slack API Tester" {
rmsg.Username = bot.Name
if ev.Username != "" {
rmsg.Username = ev.Username
}
rmsg.UserID = bot.ID
}
return nil
}
var ( var (
mentionRE = regexp.MustCompile(`<@([a-zA-Z0-9]+)>`) mentionRE = regexp.MustCompile(`<@([a-zA-Z0-9]+)>`)
channelRE = regexp.MustCompile(`<#[a-zA-Z0-9]+\|(.+?)>`) channelRE = regexp.MustCompile(`<#[a-zA-Z0-9]+\|(.+?)>`)
@ -277,3 +310,13 @@ func (b *Bslack) replaceURL(text string) string {
} }
return text return text
} }
func (b *Bslack) handleRateLimit(err error) error {
rateLimit, ok := err.(*slack.RateLimitedError)
if !ok {
return err
}
b.Log.Infof("Rate-limited by Slack. Sleeping for %v", rateLimit.RetryAfter)
time.Sleep(rateLimit.RetryAfter)
return nil
}

View File

@ -278,34 +278,20 @@ func (b *Bslack) sendRTM(msg config.Message) (string, error) {
return "", nil return "", nil
} }
// Delete message // Handle message deletions.
if msg.Event == config.EVENT_MSG_DELETE { var handled bool
// some protocols echo deletes, but with empty ID if handled, err = b.deleteMessage(&msg, channelInfo); handled {
if msg.ID == "" { return msg.ID, err
return "", nil
}
// we get a "slack <ID>", split it
ts := strings.Fields(msg.ID)
_, _, err = b.rtm.DeleteMessage(channelInfo.ID, ts[1])
if err != nil {
return msg.ID, err
}
return msg.ID, nil
} }
// Prepend nick if configured // Prepend nickname if configured.
if b.GetBool(useNickPrefixConfig) { if b.GetBool(useNickPrefixConfig) {
msg.Text = msg.Username + msg.Text msg.Text = msg.Username + msg.Text
} }
// Edit message if we have an ID // Handle message edits.
if msg.ID != "" { if handled, err = b.editMessage(&msg, channelInfo); handled {
ts := strings.Fields(msg.ID) return msg.ID, err
_, _, _, err = b.rtm.UpdateMessage(channelInfo.ID, ts[1], msg.Text)
if err != nil {
return msg.ID, err
}
return msg.ID, nil
} }
messageParameters := b.prepareMessageParameters(&msg) messageParameters := b.prepareMessageParameters(&msg)
@ -319,19 +305,73 @@ func (b *Bslack) sendRTM(msg config.Message) (string, error) {
} }
} }
// Upload files if necessary (from Slack, Telegram or Mattermost). // Upload files if necessary (from Slack, Telegram or Mattermost).
b.handleUploadFile(&msg, channelInfo.ID) b.uploadFile(&msg, channelInfo.ID)
} }
// Post normal message // Post message.
_, id, err := b.rtm.PostMessage(channelInfo.ID, msg.Text, *messageParameters) return b.postMessage(&msg, messageParameters, channelInfo)
if err != nil {
return "", err
}
return "slack " + id, nil
} }
// handleUploadFile handles native upload of files func (b *Bslack) deleteMessage(msg *config.Message, channelInfo *slack.Channel) (bool, error) {
func (b *Bslack) handleUploadFile(msg *config.Message, channelID string) { if msg.Event != config.EVENT_MSG_DELETE {
return false, nil
}
// Some protocols echo deletes, but with an empty ID.
if msg.ID == "" {
return true, nil
}
// If we get a "slack <ID>", split it.
ts := strings.Fields(msg.ID)
for {
_, _, err := b.rtm.DeleteMessage(channelInfo.ID, ts[1])
if err == nil {
return true, nil
}
if err = b.handleRateLimit(err); err != nil {
b.Log.Errorf("Failed to delete user message from Slack: %#v", err)
return true, err
}
}
}
func (b *Bslack) editMessage(msg *config.Message, channelInfo *slack.Channel) (bool, error) {
if msg.ID == "" {
return false, nil
}
ts := strings.Fields(msg.ID)
for {
_, _, _, err := b.rtm.UpdateMessage(channelInfo.ID, ts[1], msg.Text)
if err == nil {
return true, nil
}
if err = b.handleRateLimit(err); err != nil {
b.Log.Errorf("Failed to edit user message on Slack: %#v", err)
return true, err
}
}
}
func (b *Bslack) postMessage(msg *config.Message, messageParameters *slack.PostMessageParameters, channelInfo *slack.Channel) (string, error) {
for {
_, id, err := b.rtm.PostMessage(channelInfo.ID, msg.Text, *messageParameters)
if err == nil {
return "slack " + id, nil
}
if err = b.handleRateLimit(err); err != nil {
b.Log.Errorf("Failed to sent user message to Slack: %#v", err)
return "", err
}
}
}
// uploadFile handles native upload of files
func (b *Bslack) uploadFile(msg *config.Message, channelID string) {
for _, f := range msg.Extra["file"] { for _, f := range msg.Extra["file"] {
fi := f.(config.FileInfo) fi := f.(config.FileInfo)
if msg.Text == fi.Comment { if msg.Text == fi.Comment {