Thumbnail

rani/matterbridge.git

Clone URL: https://git.buni.party/rani/matterbridge.git

commit c5db519afeb688299d5695e09e6abfe1e1323f5f Author: Wim <wim@42.be> Date: Sun Dec 06 17:18:25 2020 +0000 Implement ratelimiting (matrix). Fixes #1238 (#1326) diff --git a/bridge/matrix/helpers.go b/bridge/matrix/helpers.go index 8256dc7..03e448d 100644 --- a/bridge/matrix/helpers.go +++ b/bridge/matrix/helpers.go @@ -181,3 +181,35 @@ func (b *Bmatrix) getAvatarURL(sender string) string {     return url  } + +// handleRatelimit handles the ratelimit errors and return if we're ratelimited and the amount of time to sleep +func (b *Bmatrix) handleRatelimit(err error) (time.Duration, bool) { + httpErr := handleError(err) + if httpErr.Errcode != "M_LIMIT_EXCEEDED" { + return 0, false + } + + b.Log.Debugf("ratelimited: %s", httpErr.Err) + b.Log.Infof("getting ratelimited by matrix, sleeping approx %d seconds before retrying", httpErr.RetryAfterMs/1000) + + return time.Duration(httpErr.RetryAfterMs) * time.Millisecond, true +} + +// retry function will check if we're ratelimited and retries again when backoff time expired +// returns original error if not 429 ratelimit +func (b *Bmatrix) retry(f func() error) error { + b.rateMutex.Lock() + defer b.rateMutex.Unlock() + + for { + if err := f(); err != nil { + if backoff, ok := b.handleRatelimit(err); ok { + time.Sleep(backoff) + } else { + return err + } + } else { + return nil + } + } +} diff --git a/bridge/matrix/matrix.go b/bridge/matrix/matrix.go index 725f49a..acb2026 100644 --- a/bridge/matrix/matrix.go +++ b/bridge/matrix/matrix.go @@ -30,6 +30,7 @@ type Bmatrix struct {   UserID string   NicknameMap map[string]NicknameCacheEntry   RoomMap map[string]string + rateMutex sync.RWMutex   sync.RWMutex   *bridge.Config  } @@ -99,25 +100,18 @@ func (b *Bmatrix) Disconnect() error {  }    func (b *Bmatrix) JoinChannel(channel config.ChannelInfo) error { -retry: - resp, err := b.mc.JoinRoom(channel.Name, "", nil) - if err != nil { - httpErr := handleError(err) - if httpErr.Errcode == "M_LIMIT_EXCEEDED" { - b.Log.Infof("getting 
ratelimited by matrix, sleeping approx %d seconds before joining %s", httpErr.RetryAfterMs/1000, channel.Name) - time.Sleep((time.Duration(httpErr.RetryAfterMs) * time.Millisecond)) - - goto retry + return b.retry(func() error { + resp, err := b.mc.JoinRoom(channel.Name, "", nil) + if err != nil { + return err   }   - return err - } + b.Lock() + b.RoomMap[resp.RoomID] = channel.Name + b.Unlock()   - b.Lock() - b.RoomMap[resp.RoomID] = channel.Name - b.Unlock() - - return nil + return nil + })  }    func (b *Bmatrix) Send(msg config.Message) (string, error) { @@ -135,11 +129,21 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {   Body: username.plain + msg.Text,   FormattedBody: username.formatted + msg.Text,   } - resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m) - if err != nil { - return "", err - } - return resp.EventID, err + + msgID := "" + + err := b.retry(func() error { + resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m) + if err != nil { + return err + } + + msgID = resp.EventID + + return err + }) + + return msgID, err   }     // Delete message @@ -147,17 +151,34 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {   if msg.ID == "" {   return "", nil   } - resp, err := b.mc.RedactEvent(channel, msg.ID, &matrix.ReqRedact{}) - if err != nil { - return "", err - } - return resp.EventID, err + + msgID := "" + + err := b.retry(func() error { + resp, err := b.mc.RedactEvent(channel, msg.ID, &matrix.ReqRedact{}) + if err != nil { + return err + } + + msgID = resp.EventID + + return err + }) + + return msgID, err   }     // Upload a file if it exists   if msg.Extra != nil {   for _, rmsg := range helper.HandleExtra(&msg, b.General) { - if _, err := b.mc.SendText(channel, rmsg.Username+rmsg.Text); err != nil { + rmsg := rmsg + + err := b.retry(func() error { + _, err := b.mc.SendText(channel, rmsg.Username+rmsg.Text) + + return err + }) + if err != nil {   b.Log.Errorf("sendText failed: %s", err)   }   } @@ -187,7 
+208,12 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {   EventID: msg.ID,   Type: "m.replace",   } - _, err := b.mc.SendMessageEvent(channel, "m.room.message", rmsg) + + err := b.retry(func() error { + _, err := b.mc.SendMessageEvent(channel, "m.room.message", rmsg) + + return err + })   if err != nil {   return "", err   } @@ -202,26 +228,58 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {   Body: username.plain + msg.Text,   FormattedBody: username.formatted + msg.Text,   } - resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m) + + var ( + resp *matrix.RespSendEvent + err error + ) + + err = b.retry(func() error { + resp, err = b.mc.SendMessageEvent(channel, "m.room.message", m) + + return err + })   if err != nil {   return "", err   } +   return resp.EventID, err   }     if b.GetBool("HTMLDisable") { - resp, err := b.mc.SendText(channel, username.plain+msg.Text) + var ( + resp *matrix.RespSendEvent + err error + ) + + err = b.retry(func() error { + resp, err = b.mc.SendText(channel, username.plain+msg.Text) + + return err + })   if err != nil {   return "", err   } +   return resp.EventID, err   }     // Post normal message with HTML support (eg riot.im) - resp, err := b.mc.SendFormattedText(channel, username.plain+msg.Text, username.formatted+helper.ParseMarkdown(msg.Text)) + var ( + resp *matrix.RespSendEvent + err error + ) + + err = b.retry(func() error { + resp, err = b.mc.SendFormattedText(channel, username.plain+msg.Text, + username.formatted+helper.ParseMarkdown(msg.Text)) + + return err + })   if err != nil {   return "", err   } +   return resp.EventID, err  }   @@ -420,13 +478,25 @@ func (b *Bmatrix) handleUploadFile(msg *config.Message, channel string, fi *conf   sp := strings.Split(fi.Name, ".")   mtype := mime.TypeByExtension("." 
+ sp[len(sp)-1])   // image and video uploads send no username, we have to do this ourself here #715 - _, err := b.mc.SendFormattedText(channel, username.plain+fi.Comment, username.formatted+fi.Comment) + err := b.retry(func() error { + _, err := b.mc.SendFormattedText(channel, username.plain+fi.Comment, username.formatted+fi.Comment) + + return err + })   if err != nil {   b.Log.Errorf("file comment failed: %#v", err)   }     b.Log.Debugf("uploading file: %s %s", fi.Name, mtype) - res, err := b.mc.UploadToContentRepo(content, mtype, int64(len(*fi.Data))) + + var res *matrix.RespMediaUpload + + err = b.retry(func() error { + res, err = b.mc.UploadToContentRepo(content, mtype, int64(len(*fi.Data))) + + return err + }) +   if err != nil {   b.Log.Errorf("file upload failed: %#v", err)   return @@ -435,40 +505,56 @@ func (b *Bmatrix) handleUploadFile(msg *config.Message, channel string, fi *conf   switch {   case strings.Contains(mtype, "video"):   b.Log.Debugf("sendVideo %s", res.ContentURI) - _, err = b.mc.SendVideo(channel, fi.Name, res.ContentURI) + err = b.retry(func() error { + _, err = b.mc.SendVideo(channel, fi.Name, res.ContentURI) + + return err + })   if err != nil {   b.Log.Errorf("sendVideo failed: %#v", err)   }   case strings.Contains(mtype, "image"):   b.Log.Debugf("sendImage %s", res.ContentURI) - _, err = b.mc.SendImage(channel, fi.Name, res.ContentURI) + err = b.retry(func() error { + _, err = b.mc.SendImage(channel, fi.Name, res.ContentURI) + + return err + })   if err != nil {   b.Log.Errorf("sendImage failed: %#v", err)   }   case strings.Contains(mtype, "audio"):   b.Log.Debugf("sendAudio %s", res.ContentURI) - _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.AudioMessage{ - MsgType: "m.audio", - Body: fi.Name, - URL: res.ContentURI, - Info: matrix.AudioInfo{ - Mimetype: mtype, - Size: uint(len(*fi.Data)), - }, + err = b.retry(func() error { + _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.AudioMessage{ + MsgType: 
"m.audio", + Body: fi.Name, + URL: res.ContentURI, + Info: matrix.AudioInfo{ + Mimetype: mtype, + Size: uint(len(*fi.Data)), + }, + }) + + return err   })   if err != nil {   b.Log.Errorf("sendAudio failed: %#v", err)   }   default:   b.Log.Debugf("sendFile %s", res.ContentURI) - _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.FileMessage{ - MsgType: "m.file", - Body: fi.Name, - URL: res.ContentURI, - Info: matrix.FileInfo{ - Mimetype: mtype, - Size: uint(len(*fi.Data)), - }, + err = b.retry(func() error { + _, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.FileMessage{ + MsgType: "m.file", + Body: fi.Name, + URL: res.ContentURI, + Info: matrix.FileInfo{ + Mimetype: mtype, + Size: uint(len(*fi.Data)), + }, + }) + + return err   })   if err != nil {   b.Log.Errorf("sendFile failed: %#v", err)