Thumbnail

rani/matterbridge.git

Clone URL: https://git.buni.party/rani/matterbridge.git

commit 6104caee01adbdee4fe5d20592528d703772fd16 Author: Duco van Amstel <duco.vanamstel@gmail.com> Date: Sat Nov 10 21:09:41 2018 +0000 Add more rate-limit handling (slack) (#581) diff --git a/bridge/slack/helpers.go b/bridge/slack/helpers.go index fdcef2e..861a875 100644 --- a/bridge/slack/helpers.go +++ b/bridge/slack/helpers.go @@ -1,6 +1,7 @@  package bslack    import ( + "context"   "fmt"   "regexp"   "strings" @@ -76,17 +77,26 @@ func (b *Bslack) populateUsers() {   b.refreshInProgress = true   b.refreshMutex.Unlock()   - users, err := b.sc.GetUsers() - if err != nil { - b.Log.Errorf("Could not reload users: %#v", err) - return - } -   newUsers := map[string]*slack.User{} - for i := range users { - // Use array index for pointer, not the copy - // See: https://stackoverflow.com/a/29498133/504018 - newUsers[users[i].ID] = &users[i] + pagination := b.sc.GetUsersPaginated(slack.GetUsersOptionLimit(200)) + for { + var err error + pagination, err = pagination.Next(context.Background()) + if err != nil { + if pagination.Done(err) { + break + } + + if err = b.handleRateLimit(err); err != nil { + b.Log.Errorf("Could not retrieve users: %#v", err) + return + } + continue + } + + for i := range pagination.Users { + newUsers[pagination.Users[i].ID] = &pagination.Users[i] + }   }     b.usersMutex.Lock() @@ -122,10 +132,14 @@ func (b *Bslack) populateChannels() {   for {   channels, nextCursor, err := b.sc.GetConversations(queryParams)   if err != nil { - b.Log.Errorf("Could not reload channels: %#v", err) - return + if err = b.handleRateLimit(err); err != nil { + b.Log.Errorf("Could not retrieve channels: %#v", err) + return + } + continue   } - for i := 0; i < len(channels); i++ { + + for i := range channels {   newChannelsByID[channels[i].ID] = &channels[i]   newChannelsByName[channels[i].Name] = &channels[i]   } @@ -189,18 +203,8 @@ func (b *Bslack) populateMessageWithUserInfo(ev *slack.MessageEvent, rmsg *confi     // First, deal with bot-originating messages but only do so 
when not using webhooks: we   // would not be able to distinguish which bot would be sending them. - if ev.BotID != "" && b.GetString(outgoingWebhookConfig) == "" { - bot, err := b.rtm.GetBotInfo(ev.BotID) - if err != nil { - return err - } - if bot.Name != "" && bot.Name != "Slack API Tester" { - rmsg.Username = bot.Name - if ev.Username != "" { - rmsg.Username = ev.Username - } - rmsg.UserID = bot.ID - } + if err := b.populateMessageWithBotInfo(ev, rmsg); err != nil { + return err   }     // Second, deal with "real" users if we have the necessary information. @@ -227,6 +231,35 @@ func (b *Bslack) populateMessageWithUserInfo(ev *slack.MessageEvent, rmsg *confi   return nil  }   +func (b *Bslack) populateMessageWithBotInfo(ev *slack.MessageEvent, rmsg *config.Message) error { + if ev.BotID == "" || b.GetString(outgoingWebhookConfig) != "" { + return nil + } + + var err error + var bot *slack.Bot + for { + bot, err = b.rtm.GetBotInfo(ev.BotID) + if err == nil { + break + } + + if err = b.handleRateLimit(err); err != nil { + b.Log.Errorf("Could not retrieve bot information: %#v", err) + return err + } + } + + if bot.Name != "" && bot.Name != "Slack API Tester" { + rmsg.Username = bot.Name + if ev.Username != "" { + rmsg.Username = ev.Username + } + rmsg.UserID = bot.ID + } + return nil +} +  var (   mentionRE = regexp.MustCompile(`<@([a-zA-Z0-9]+)>`)   channelRE = regexp.MustCompile(`<#[a-zA-Z0-9]+\|(.+?)>`) @@ -277,3 +310,13 @@ func (b *Bslack) replaceURL(text string) string {   }   return text  } + +func (b *Bslack) handleRateLimit(err error) error { + rateLimit, ok := err.(*slack.RateLimitedError) + if !ok { + return err + } + b.Log.Infof("Rate-limited by Slack. 
Sleeping for %v", rateLimit.RetryAfter) + time.Sleep(rateLimit.RetryAfter) + return nil +} diff --git a/bridge/slack/slack.go b/bridge/slack/slack.go index fd8e379..038e0d9 100644 --- a/bridge/slack/slack.go +++ b/bridge/slack/slack.go @@ -278,34 +278,20 @@ func (b *Bslack) sendRTM(msg config.Message) (string, error) {   return "", nil   }   - // Delete message - if msg.Event == config.EVENT_MSG_DELETE { - // some protocols echo deletes, but with empty ID - if msg.ID == "" { - return "", nil - } - // we get a "slack <ID>", split it - ts := strings.Fields(msg.ID) - _, _, err = b.rtm.DeleteMessage(channelInfo.ID, ts[1]) - if err != nil { - return msg.ID, err - } - return msg.ID, nil + // Handle message deletions. + var handled bool + if handled, err = b.deleteMessage(&msg, channelInfo); handled { + return msg.ID, err   }   - // Prepend nick if configured + // Prepend nickname if configured.   if b.GetBool(useNickPrefixConfig) {   msg.Text = msg.Username + msg.Text   }   - // Edit message if we have an ID - if msg.ID != "" { - ts := strings.Fields(msg.ID) - _, _, _, err = b.rtm.UpdateMessage(channelInfo.ID, ts[1], msg.Text) - if err != nil { - return msg.ID, err - } - return msg.ID, nil + // Handle message edits. + if handled, err = b.editMessage(&msg, channelInfo); handled { + return msg.ID, err   }     messageParameters := b.prepareMessageParameters(&msg) @@ -319,19 +305,73 @@ func (b *Bslack) sendRTM(msg config.Message) (string, error) {   }   }   // Upload files if necessary (from Slack, Telegram or Mattermost). - b.handleUploadFile(&msg, channelInfo.ID) + b.uploadFile(&msg, channelInfo.ID)   }   - // Post normal message - _, id, err := b.rtm.PostMessage(channelInfo.ID, msg.Text, *messageParameters) - if err != nil { - return "", err + // Post message. 
+ return b.postMessage(&msg, messageParameters, channelInfo) +} + +func (b *Bslack) deleteMessage(msg *config.Message, channelInfo *slack.Channel) (bool, error) { + if msg.Event != config.EVENT_MSG_DELETE { + return false, nil + } + + // Some protocols echo deletes, but with an empty ID. + if msg.ID == "" { + return true, nil + } + + // If we get a "slack <ID>", split it. + ts := strings.Fields(msg.ID) + for { + _, _, err := b.rtm.DeleteMessage(channelInfo.ID, ts[1]) + if err == nil { + return true, nil + } + + if err = b.handleRateLimit(err); err != nil { + b.Log.Errorf("Failed to delete user message from Slack: %#v", err) + return true, err + } + } +} + +func (b *Bslack) editMessage(msg *config.Message, channelInfo *slack.Channel) (bool, error) { + if msg.ID == "" { + return false, nil + } + + ts := strings.Fields(msg.ID) + for { + _, _, _, err := b.rtm.UpdateMessage(channelInfo.ID, ts[1], msg.Text) + if err == nil { + return true, nil + } + + if err = b.handleRateLimit(err); err != nil { + b.Log.Errorf("Failed to edit user message on Slack: %#v", err) + return true, err + } + } +} + +func (b *Bslack) postMessage(msg *config.Message, messageParameters *slack.PostMessageParameters, channelInfo *slack.Channel) (string, error) { + for { + _, id, err := b.rtm.PostMessage(channelInfo.ID, msg.Text, *messageParameters) + if err == nil { + return "slack " + id, nil + } + + if err = b.handleRateLimit(err); err != nil { + b.Log.Errorf("Failed to sent user message to Slack: %#v", err) + return "", err + }   } - return "slack " + id, nil  }   -// handleUploadFile handles native upload of files -func (b *Bslack) handleUploadFile(msg *config.Message, channelID string) { +// uploadFile handles native upload of files +func (b *Bslack) uploadFile(msg *config.Message, channelID string) {   for _, f := range msg.Extra["file"] {   fi := f.(config.FileInfo)   if msg.Text == fi.Comment {