Thumbnail

rani/matterbridge.git

Clone URL: https://git.buni.party/rani/matterbridge.git

commit c03e9f5d028f674c2ad53e14224e3d05e36bd147 Author: Duco van Amstel <duco.vanamstel@gmail.com> Date: Fri Mar 15 20:23:09 2019 +0000 Refactor channel and user management (slack) (#766) diff --git a/bridge/slack/handlers.go b/bridge/slack/handlers.go index 5cfcc42..b756393 100644 --- a/bridge/slack/handlers.go +++ b/bridge/slack/handlers.go @@ -347 +347 @@ func (b *Bslack) handleSlack() {   message.Text = html.UnescapeString(message.Text)     // Add the avatar - message.Avatar = b.getAvatar(message.UserID) + message.Avatar = b.users.getAvatar(message.UserID)     b.Log.Debugf("<= Message is %#v", message)   b.Remote <- *message @@ -7520 +7517 @@ func (b *Bslack) handleSlackClient(messages chan *config.Message) {   // When we join a channel we update the full list of users as   // well as the information for the channel that we joined as this   // should now tell that we are a member of it. - b.channelsMutex.Lock() - b.channelsByID[ev.Channel.ID] = &ev.Channel - b.channelsByName[ev.Channel.Name] = &ev.Channel - b.channelsMutex.Unlock() + b.channels.registerChannel(ev.Channel)   case *slack.ConnectedEvent:   b.si = ev.Info - b.populateChannels(true) - b.populateUsers(true) + b.channels.populateChannels(true) + b.users.populateUsers(true)   case *slack.InvalidAuthEvent:   b.Log.Fatalf("Invalid Token %#v", ev)   case *slack.ConnectionErrorEvent:   b.Log.Errorf("Connection failed %#v %#v", ev.Error(), ev.ErrorObj)   case *slack.MemberJoinedChannelEvent: - b.populateUser(ev.User) + b.users.populateUser(ev.User)   case *slack.LatencyReport:   continue   default: @@ -2107 +2077 @@ func (b *Bslack) handleStatusEvent(ev *slack.MessageEvent, rmsg *config.Message)   rmsg.Username = sSystemUser   rmsg.Event = config.EventJoinLeave   case sChannelTopic, sChannelPurpose: - b.populateChannels(false) + b.channels.populateChannels(false)   rmsg.Event = config.EventTopicChange   case sMessageChanged:   rmsg.Text = ev.SubMessage.Text @@ -2667 +2637 @@ func (b *Bslack) handleAttachments(ev *slack.MessageEvent, rmsg *config.Message)  }    func (b *Bslack) handleTypingEvent(ev *slack.UserTypingEvent) (*config.Message, error) { - channelInfo, err := b.getChannelByID(ev.Channel) + channelInfo, err := b.channels.getChannelByID(ev.Channel)   if err != nil {   return nil, err   } @@ -31636 +3137 @@ func (b *Bslack) handleGetChannelMembers(rmsg *config.Message) bool {   return false   }   - cMembers := config.ChannelMembers{} - - b.channelMembersMutex.RLock() - - for channelID, members := range b.channelMembers { - for _, member := range members { - channelName := "" - userName := "" - userNick := "" - user := b.getUser(member) - if user != nil { - userName = user.Name - userNick = user.Profile.DisplayName - } - channel, _ := b.getChannelByID(channelID) - if channel != nil { - channelName = channel.Name - } - cMember := config.ChannelMember{ - Username: userName, - Nick: userNick, - UserID: member, - ChannelID: channelID, - ChannelName: channelName, - } - cMembers = append(cMembers, cMember) - } - } - - b.channelMembersMutex.RUnlock() + cMembers := b.channels.getChannelMembers(b.users)     extra := make(map[string][]interface{})   extra[config.EventGetChannelMembers] = append(extra[config.EventGetChannelMembers], cMembers) diff --git a/bridge/slack/helpers.go b/bridge/slack/helpers.go index ddbc770..7c012a4 100644 --- a/bridge/slack/helpers.go +++ b/bridge/slack/helpers.go @@ -157 +157 @@ import (  // router before we apply message-dependent modifications.  func (b *Bslack) populateReceivedMessage(ev *slack.MessageEvent) (*config.Message, error) {   // Use our own func because rtm.GetChannelInfo doesn't work for private channels. - channel, err := b.getChannelByID(ev.Channel) + channel, err := b.channels.getChannelByID(ev.Channel)   if err != nil {   return nil, err   } @@ -777 +777 @@ func (b *Bslack) populateMessageWithUserInfo(ev *slack.MessageEvent, rmsg *confi   return nil   }   - user := b.getUser(userID) + user := b.users.getUser(userID)   if user == nil {   return fmt.Errorf("could not find information for user with id %s", ev.User)   } @@ -1487 +1487 @@ func (b *Bslack) extractTopicOrPurpose(text string) (string, string) {  func (b *Bslack) replaceMention(text string) string {   replaceFunc := func(match string) string {   userID := strings.Trim(match, "@<>") - if username := b.getUsername(userID); userID != "" { + if username := b.users.getUsername(userID); userID != "" {   return "@" + username   }   return match diff --git a/bridge/slack/slack.go b/bridge/slack/slack.go index f0f7768..36b74b9 100644 --- a/bridge/slack/slack.go +++ b/bridge/slack/slack.go @@ -3020 +308 @@ type Bslack struct {   uuid string   useChannelID bool   - users map[string]*slack.User - usersMutex sync.RWMutex - - channelsByID map[string]*slack.Channel - channelsByName map[string]*slack.Channel - channelsMutex sync.RWMutex - - channelMembers map[string][]string - channelMembersMutex sync.RWMutex - - refreshInProgress bool - earliestChannelRefresh time.Time - earliestUserRefresh time.Time - refreshMutex sync.Mutex + channels *channels + users *users  }    const ( @@ -9414 +829 @@ func newBridge(cfg *bridge.Config) *Bslack {   cfg.Log.Fatalf("Could not create LRU cache for Slack bridge: %v", err)   }   b := &Bslack{ - Config: cfg, - uuid: xid.New().String(), - cache: newCache, - users: map[string]*slack.User{}, - channelsByID: map[string]*slack.Channel{}, - channelsByName: map[string]*slack.Channel{}, - earliestChannelRefresh: time.Now(), - earliestUserRefresh: time.Now(), + Config: cfg, + uuid: xid.New().String(), + cache: newCache,   }   return b  } @@ -1217 +10412 @@ func (b *Bslack) Connect() error {   // If we have a token we use the Slack websocket-based RTM for both sending and receiving.   if token := b.GetString(tokenConfig); token != "" {   b.Log.Info("Connecting using token") +   b.sc = slack.New(token, slack.OptionDebug(b.GetBool("Debug"))) + + b.channels = newChannelManager(b.Log, b.sc) + b.users = newUserManager(b.Log, b.sc) +   b.rtm = b.sc.NewRTM()   go b.rtm.ManageConnection()   go b.handleSlack() @@ -1639 +1519 @@ func (b *Bslack) JoinChannel(channel config.ChannelInfo) error {   return nil   }   - b.populateChannels(false) + b.channels.populateChannels(false)   - channelInfo, err := b.getChannel(channel.Name) + channelInfo, err := b.channels.getChannel(channel.Name)   if err != nil {   return fmt.Errorf("could not join channel: %#v", err)   } @@ -2757 +2637 @@ func (b *Bslack) sendRTM(msg config.Message) (string, error) {   return "", nil   }   - channelInfo, err := b.getChannel(msg.Channel) + channelInfo, err := b.channels.getChannel(msg.Channel)   if err != nil {   return "", fmt.Errorf("could not send message: %v", err)   } diff --git a/bridge/slack/users_channels.go b/bridge/slack/users_channels.go index 7eadd03..9a8568b 100644 --- a/bridge/slack/users_channels.go +++ b/bridge/slack/users_channels.go @@ -414 +438 @@ import (   "context"   "fmt"   "strings" + "sync"   "time"   + "github.com/42wim/matterbridge/bridge/config"   "github.com/nlopes/slack" + "github.com/sirupsen/logrus"  )    const minimumRefreshInterval = 10 * time.Second   -func (b *Bslack) getUser(id string) *slack.User { +type users struct { + log *logrus.Entry + sc *slack.Client + + users map[string]*slack.User + usersMutex sync.RWMutex + + refreshInProgress bool + earliestRefresh time.Time + refreshMutex sync.Mutex +} + +func newUserManager(log *logrus.Entry, sc *slack.Client) *users { + return &users{ + log: log, + sc: sc, + users: make(map[string]*slack.User), + earliestRefresh: time.Now(), + } +} + +func (b *users) getUser(id string) *slack.User {   b.usersMutex.RLock()   user, ok := b.users[id]   b.usersMutex.RUnlock() @@ -2525 +4925 @@ func (b *Bslack) getUser(id string) *slack.User {   return b.users[id]  }   -func (b *Bslack) getUsername(id string) string { +func (b *users) getUsername(id string) string {   if user := b.getUser(id); user != nil {   if user.Profile.DisplayName != "" {   return user.Profile.DisplayName   }   return user.Name   } - b.Log.Warnf("Could not find user with ID '%s'", id) + b.log.Warnf("Could not find user with ID '%s'", id)   return ""  }   -func (b *Bslack) getAvatar(id string) string { +func (b *users) getAvatar(id string) string {   if user := b.getUser(id); user != nil {   return user.Profile.Image48   }   return ""  }   -func (b *Bslack) populateUser(userID string) { +func (b *users) populateUser(userID string) {   b.usersMutex.RLock()   _, exists := b.users[userID]   b.usersMutex.RUnlock() @@ -547 +787 @@ func (b *Bslack) populateUser(userID string) {     user, err := b.sc.GetUserInfo(userID)   if err != nil { - b.Log.Debugf("GetUserInfo failed for %v: %v", userID, err) + b.log.Debugf("GetUserInfo failed for %v: %v", userID, err)   return   }   @@ -6310 +8710 @@ func (b *Bslack) populateUser(userID string) {   b.usersMutex.Unlock()  }   -func (b *Bslack) populateUsers(wait bool) { +func (b *users) populateUsers(wait bool) {   b.refreshMutex.Lock() - if !wait && (time.Now().Before(b.earliestUserRefresh) || b.refreshInProgress) { - b.Log.Debugf("Not refreshing user list as it was done less than %v ago.", + if !wait && (time.Now().Before(b.earliestRefresh) || b.refreshInProgress) { + b.log.Debugf("Not refreshing user list as it was done less than %v ago.",   minimumRefreshInterval)   b.refreshMutex.Unlock()   @@ -928 +1168 @@ func (b *Bslack) populateUsers(wait bool) {   break   }   - if err = handleRateLimit(b.Log, err); err != nil { - b.Log.Errorf("Could not retrieve users: %#v", err) + if err = handleRateLimit(b.log, err); err != nil { + b.log.Errorf("Could not retrieve users: %#v", err)   return   }   continue @@ -10211 +12611 @@ func (b *Bslack) populateUsers(wait bool) {   for i := range pagination.Users {   newUsers[pagination.Users[i].ID] = &pagination.Users[i]   } - b.Log.Debugf("getting %d users", len(pagination.Users)) + b.log.Debugf("getting %d users", len(pagination.Users))   count++   // more > 2000 users, slack will complain and ratelimit. break   if count > 10 { - b.Log.Info("Large slack detected > 2000 users, skipping loading complete userlist.") + b.log.Info("Large slack detected > 2000 users, skipping loading complete userlist.")   break   }   } @@ -11740 +141104 @@ func (b *Bslack) populateUsers(wait bool) {     b.refreshMutex.Lock()   defer b.refreshMutex.Unlock() - b.earliestUserRefresh = time.Now().Add(minimumRefreshInterval) + b.earliestRefresh = time.Now().Add(minimumRefreshInterval)   b.refreshInProgress = false  }   -func (b *Bslack) getChannel(channel string) (*slack.Channel, error) { +type channels struct { + log *logrus.Entry + sc *slack.Client + + channelsByID map[string]*slack.Channel + channelsByName map[string]*slack.Channel + channelsMutex sync.RWMutex + + channelMembers map[string][]string + channelMembersMutex sync.RWMutex + + refreshInProgress bool + earliestRefresh time.Time + refreshMutex sync.Mutex +} + +func newChannelManager(log *logrus.Entry, sc *slack.Client) *channels { + return &channels{ + log: log, + sc: sc, + channelsByID: make(map[string]*slack.Channel), + channelsByName: make(map[string]*slack.Channel), + earliestRefresh: time.Now(), + } +} + +func (b *channels) getChannel(channel string) (*slack.Channel, error) {   if strings.HasPrefix(channel, "ID:") {   return b.getChannelByID(strings.TrimPrefix(channel, "ID:"))   }   return b.getChannelByName(channel)  }   -func (b *Bslack) getChannelByName(name string) (*slack.Channel, error) { +func (b *channels) getChannelByName(name string) (*slack.Channel, error) {   return b.getChannelBy(name, b.channelsByName)  }   -func (b *Bslack) getChannelByID(id string) (*slack.Channel, error) { +func (b *channels) getChannelByID(id string) (*slack.Channel, error) {   return b.getChannelBy(id, b.channelsByID)  }   -func (b *Bslack) getChannelBy(lookupKey string, lookupMap map[string]*slack.Channel) (*slack.Channel, error) { +func (b *channels) getChannelBy(lookupKey string, lookupMap map[string]*slack.Channel) (*slack.Channel, error) {   b.channelsMutex.RLock()   defer b.channelsMutex.RUnlock()     if channel, ok := lookupMap[lookupKey]; ok {   return channel, nil   } - return nil, fmt.Errorf("%s: channel %s not found", b.Account, lookupKey) + return nil, fmt.Errorf("channel %s not found", lookupKey)  }   -func (b *Bslack) populateChannels(wait bool) { +func (b *channels) getChannelMembers(users *users) config.ChannelMembers { + b.channelMembersMutex.RLock() + defer b.channelMembersMutex.RUnlock() + + membersInfo := config.ChannelMembers{} + for channelID, members := range b.channelMembers { + for _, member := range members { + channelName := "" + userName := "" + userNick := "" + user := users.getUser(member) + if user != nil { + userName = user.Name + userNick = user.Profile.DisplayName + } + channel, _ := b.getChannelByID(channelID) + if channel != nil { + channelName = channel.Name + } + memberInfo := config.ChannelMember{ + Username: userName, + Nick: userNick, + UserID: member, + ChannelID: channelID, + ChannelName: channelName, + } + membersInfo = append(membersInfo, memberInfo) + } + } + return membersInfo +} + +func (b *channels) registerChannel(channel slack.Channel) { + b.channelsMutex.Lock() + b.channelsByID[channel.ID] = &channel + b.channelsByName[channel.Name] = &channel + b.channelsMutex.Unlock() +} + +func (b *channels) populateChannels(wait bool) {   b.refreshMutex.Lock() - if !wait && (time.Now().Before(b.earliestChannelRefresh) || b.refreshInProgress) { - b.Log.Debugf("Not refreshing channel list as it was done less than %v seconds ago.", - minimumRefreshInterval) + if !wait && (time.Now().Before(b.earliestRefresh) || b.refreshInProgress) { + b.log.Debugf("Not refreshing channel list as it was done less than %v seconds ago.", minimumRefreshInterval)   b.refreshMutex.Unlock()   return   } @@ -1758 +2638 @@ func (b *Bslack) populateChannels(wait bool) {   for {   channels, nextCursor, err := b.sc.GetConversations(queryParams)   if err != nil { - if err = handleRateLimit(b.Log, err); err != nil { - b.Log.Errorf("Could not retrieve channels: %#v", err) + if err = handleRateLimit(b.log, err); err != nil { + b.log.Errorf("Could not retrieve channels: %#v", err)   return   }   continue @@ -2176 +3056 @@ func (b *Bslack) populateChannels(wait bool) {     b.refreshMutex.Lock()   defer b.refreshMutex.Unlock() - b.earliestChannelRefresh = time.Now().Add(minimumRefreshInterval) + b.earliestRefresh = time.Now().Add(minimumRefreshInterval)   b.refreshInProgress = false  }