Thumbnail

rani/matterbridge.git

Clone URL: https://git.buni.party/rani/matterbridge.git

commit f31ba861996fa81ee3e9449549d4b92bb1b3778d Author: Wim <wim@42.be> Date: Fri Jan 18 18:35:31 2019 +0000 Add initial support for getting ChannelMember info of all bridges (#678) * Add initial support for getting ChannelMember info of all bridges. Adds an EventGetChannelMembers event, which gets send every x time to all bridges. Bridges should respond on this event with a Message containing ChannelMembers in the EventGetChannelMembers key in the Extra field. handleEventGetChannelMembers will handle this Message and sets the contained ChannelMembers to the Bridge struct. * Add ChannelMembers support to the slack bridge diff --git a/bridge/bridge.go b/bridge/bridge.go index 336e2e2..6b955a9 100644 --- a/bridge/bridge.go +++ b/bridge/bridge.go @@ -56 +57 @@ import (     "github.com/42wim/matterbridge/bridge/config"   "github.com/sirupsen/logrus" + "sync"  )    type Bridger interface { @@ -1614 +1716 @@ type Bridger interface {    type Bridge struct {   Bridger - Name string - Account string - Protocol string - Channels map[string]config.ChannelInfo - Joined map[string]bool - Log *logrus.Entry - Config config.Config - General *config.Protocol + Name string + Account string + Protocol string + Channels map[string]config.ChannelInfo + Joined map[string]bool + ChannelMembers *config.ChannelMembers + Log *logrus.Entry + Config config.Config + General *config.Protocol + *sync.RWMutex  }    type Config struct { @@ -3715 +4017 @@ type Config struct {  type Factory func(*Config) Bridger    func New(bridge *config.Bridge) *Bridge { - b := new(Bridge) - b.Channels = make(map[string]config.ChannelInfo) + b := &Bridge{ + Channels: make(map[string]config.ChannelInfo), + RWMutex: new(sync.RWMutex), + Joined: make(map[string]bool), + }   accInfo := strings.Split(bridge.Account, ".")   protocol := accInfo[0]   name := accInfo[1]   b.Name = name   b.Protocol = protocol   b.Account = bridge.Account - b.Joined = make(map[string]bool)   return b  }   @@ -546 +5913 @@ func (b *Bridge) JoinChannels() error {   return err  }   +// SetChannelMembers sets the newMembers to the bridge ChannelMembers +func (b *Bridge) SetChannelMembers(newMembers *config.ChannelMembers) { + b.Lock() + b.ChannelMembers = newMembers + b.Unlock() +} +  func (b *Bridge) joinChannels(channels map[string]config.ChannelInfo, exists map[string]bool) error {   for ID, channel := range channels {   if !exists[ID] { diff --git a/bridge/config/config.go b/bridge/config/config.go index 932de2e..3d1206c 100644 --- a/bridge/config/config.go +++ b/bridge/config/config.go @@ -1416 +1417 @@ import (  )    const ( - EventJoinLeave = "join_leave" - EventTopicChange = "topic_change" - EventFailure = "failure" - EventFileFailureSize = "file_failure_size" - EventAvatarDownload = "avatar_download" - EventRejoinChannels = "rejoin_channels" - EventUserAction = "user_action" - EventMsgDelete = "msg_delete" - EventAPIConnected = "api_connected" - EventUserTyping = "user_typing" + EventJoinLeave = "join_leave" + EventTopicChange = "topic_change" + EventFailure = "failure" + EventFileFailureSize = "file_failure_size" + EventAvatarDownload = "avatar_download" + EventRejoinChannels = "rejoin_channels" + EventUserAction = "user_action" + EventMsgDelete = "msg_delete" + EventAPIConnected = "api_connected" + EventUserTyping = "user_typing" + EventGetChannelMembers = "get_channel_members"  )    type Message struct { @@ -616 +6216 @@ type ChannelInfo struct {   Options ChannelOptions  }   +type ChannelMember struct { + Username string + Nick string + UserID string + ChannelID string + ChannelName string +} + +type ChannelMembers []ChannelMember +  type Protocol struct {   AuthCode string // steam   BindAddress string // mattermost, slack // DEPRECATED diff --git a/bridge/slack/handlers.go b/bridge/slack/handlers.go index f46eb59..5e601ea 100644 --- a/bridge/slack/handlers.go +++ b/bridge/slack/handlers.go @@ -3096 +30958 @@ func (b *Bslack) handleDownloadFile(rmsg *config.Message, file *slack.File, retr   return nil  }   +// handleGetChannelMembers handles messages containing the GetChannelMembers event +// Sends a message to the router containing *config.ChannelMembers +func (b *Bslack) handleGetChannelMembers(rmsg *config.Message) bool { + if rmsg.Event != config.EventGetChannelMembers { + return false + } + + cMembers := config.ChannelMembers{} + + b.channelMembersMutex.RLock() + + for channelID, members := range b.channelMembers { + for _, member := range members { + channelName := "" + userName := "" + userNick := "" + user := b.getUser(member) + if user != nil { + userName = user.Name + userNick = user.Profile.DisplayName + } + channel, _ := b.getChannelByID(channelID) + if channel != nil { + channelName = channel.Name + } + cMember := config.ChannelMember{ + Username: userName, + Nick: userNick, + UserID: member, + ChannelID: channelID, + ChannelName: channelName, + } + cMembers = append(cMembers, cMember) + } + } + + b.channelMembersMutex.RUnlock() + + extra := make(map[string][]interface{}) + extra[config.EventGetChannelMembers] = append(extra[config.EventGetChannelMembers], cMembers) + msg := config.Message{ + Extra: extra, + Event: config.EventGetChannelMembers, + Account: b.Account, + } + + b.Log.Debugf("sending msg to remote %#v", msg) + b.Remote <- msg + + return true +} +  // fileCached implements Matterbridge's caching logic for files  // shared via Slack.  // diff --git a/bridge/slack/helpers.go b/bridge/slack/helpers.go index 4e6e565..d84353f 100644 --- a/bridge/slack/helpers.go +++ b/bridge/slack/helpers.go @@ -937 +939 @@ func (b *Bslack) populateUsers(wait bool) {   return   }   for b.refreshInProgress { + b.refreshMutex.Unlock()   time.Sleep(time.Second) + b.refreshMutex.Lock()   }   b.refreshInProgress = true   b.refreshMutex.Unlock() @@ -13913 +14116 @@ func (b *Bslack) populateChannels(wait bool) {   return   }   for b.refreshInProgress { + b.refreshMutex.Unlock()   time.Sleep(time.Second) + b.refreshMutex.Lock()   }   b.refreshInProgress = true   b.refreshMutex.Unlock()     newChannelsByID := map[string]*slack.Channel{}   newChannelsByName := map[string]*slack.Channel{} + newChannelMembers := make(map[string][]string)     // We only retrieve public and private channels, not IMs   // and MPIMs as those do not have a channel name. @@ -1667 +17118 @@ func (b *Bslack) populateChannels(wait bool) {   for i := range channels {   newChannelsByID[channels[i].ID] = &channels[i]   newChannelsByName[channels[i].Name] = &channels[i] + // also find all the members in every channel + members, err := b.getUsersInConversation(channels[i].ID) + if err != nil { + if err = b.handleRateLimit(err); err != nil { + b.Log.Errorf("Could not retrieve channel members: %#v", err) + return + } + continue + } + newChannelMembers[channels[i].ID] = members   } +   if nextCursor == "" {   break   } @@ -1786 +19410 @@ func (b *Bslack) populateChannels(wait bool) {   b.channelsByID = newChannelsByID   b.channelsByName = newChannelsByName   + b.channelMembersMutex.Lock() + defer b.channelMembersMutex.Unlock() + b.channelMembers = newChannelMembers +   b.refreshMutex.Lock()   defer b.refreshMutex.Unlock()   b.earliestChannelRefresh = time.Now().Add(minimumRefreshInterval) @@ -3673 +38729 @@ func (b *Bslack) handleRateLimit(err error) error {   time.Sleep(rateLimit.RetryAfter)   return nil  } + +// getUsersInConversation returns an array of userIDs that are members of channelID +func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) { + channelMembers := []string{} + for { + queryParams := &slack.GetUsersInConversationParameters{ + ChannelID: channelID, + } + + members, nextCursor, err := b.sc.GetUsersInConversation(queryParams) + if err != nil { + if err = b.handleRateLimit(err); err != nil { + return channelMembers, fmt.Errorf("Could not retrieve users in channels: %#v", err) + } + continue + } + + channelMembers = append(channelMembers, members...) + + if nextCursor == "" { + break + } + queryParams.Cursor = nextCursor + } + return channelMembers, nil +} diff --git a/bridge/slack/slack.go b/bridge/slack/slack.go index b943027..001b126 100644 --- a/bridge/slack/slack.go +++ b/bridge/slack/slack.go @@ -376 +379 @@ type Bslack struct {   channelsByName map[string]*slack.Channel   channelsMutex sync.RWMutex   + channelMembers map[string][]string + channelMembersMutex sync.RWMutex +   refreshInProgress bool   earliestChannelRefresh time.Time   earliestUserRefresh time.Time @@ -2676 +27011 @@ func (b *Bslack) sendWebhook(msg config.Message) error {  }    func (b *Bslack) sendRTM(msg config.Message) (string, error) { + // Handle channelmember messages. + if handled := b.handleGetChannelMembers(&msg); handled { + return "", nil + } +   channelInfo, err := b.getChannel(msg.Channel)   if err != nil {   return "", fmt.Errorf("could not send message: %v", err) diff --git a/gateway/handlers.go b/gateway/handlers.go index 741c312..5af13c1 100644 --- a/gateway/handlers.go +++ b/gateway/handlers.go @@ -306 +3023 @@ func (r *Router) handleEventFailure(msg *config.Message) {   }  }   +// handleEventGetChannelMembers handles channel members +func (r *Router) handleEventGetChannelMembers(msg *config.Message) { + if msg.Event != config.EventGetChannelMembers { + return + } + for _, gw := range r.Gateways { + for _, br := range gw.Bridges { + if msg.Account == br.Account { + cMembers := msg.Extra[config.EventGetChannelMembers][0].(config.ChannelMembers) + flog.Debugf("Syncing channelmembers from %s", msg.Account) + br.SetChannelMembers(&cMembers) + return + } + } + } +} +  // handleEventRejoinChannels handles rejoining of channels.  func (r *Router) handleEventRejoinChannels(msg *config.Message) {   if msg.Event != config.EventRejoinChannels { diff --git a/gateway/router.go b/gateway/router.go index a7181b9..d3c33b2 100644 --- a/gateway/router.go +++ b/gateway/router.go @@ -26 +27 @@ package gateway    import (   "fmt" + "sync"   "time"     "github.com/42wim/matterbridge/bridge" @@ -166 +177 @@ type Router struct {   Gateways map[string]*Gateway   Message chan config.Message   MattermostPlugin chan config.Message + sync.RWMutex  }    func NewRouter(cfg config.Config, bridgeMap map[string]bridge.Factory) (*Router, error) { @@ -816 +837 @@ func (r *Router) Start() error {   }   }   go r.handleReceive() + go r.updateChannelMembers()   return nil  }   @@ -1086 +1117 @@ func (r *Router) getBridge(account string) *bridge.Bridge {  func (r *Router) handleReceive() {   for msg := range r.Message {   msg := msg // scopelint + r.handleEventGetChannelMembers(&msg)   r.handleEventFailure(&msg)   r.handleEventRejoinChannels(&msg)   for _, gw := range r.Gateways { @@ -1293 +13321 @@ func (r *Router) handleReceive() {   }   }  } + +// updateChannelMembers sends every minute an GetChannelMembers event to all bridges. +func (r *Router) updateChannelMembers() { + // TODO sleep a minute because slack can take a while + // fix this by having actually connectionDone events send to the router + time.Sleep(time.Minute) + for { + for _, gw := range r.Gateways { + for _, br := range gw.Bridges { + flog.Debugf("sending %s to %s", config.EventGetChannelMembers, br.Account) + if _, err := br.Send(config.Message{Event: config.EventGetChannelMembers}); err != nil { + flog.Errorf("updateChannelMembers: %s", err) + } + } + } + time.Sleep(time.Minute) + } +}