Thumbnail

rani/matterbridge.git

Clone URL: https://git.buni.party/rani/matterbridge.git

Viewing file on branch master

1package bslack
2
3import (
4 "context"
5 "fmt"
6 "strings"
7 "sync"
8 "time"
9
10 "github.com/matterbridge-org/matterbridge/bridge/config"
11 "github.com/sirupsen/logrus"
12 "github.com/slack-go/slack"
13)
14
15const minimumRefreshInterval = 10 * time.Second
16
17type users struct {
18 log *logrus.Entry
19 sc *slack.Client
20
21 users map[string]*slack.User
22 usersMutex sync.RWMutex
23 usersSyncPoints map[string]chan struct{}
24
25 refreshInProgress bool
26 earliestRefresh time.Time
27 refreshMutex sync.Mutex
28}
29
30func newUserManager(log *logrus.Entry, sc *slack.Client) *users {
31 return &users{
32 log: log,
33 sc: sc,
34 users: make(map[string]*slack.User),
35 usersSyncPoints: make(map[string]chan struct{}),
36 earliestRefresh: time.Now(),
37 }
38}
39
40func (b *users) getUser(id string) *slack.User {
41 b.usersMutex.RLock()
42 user, ok := b.users[id]
43 b.usersMutex.RUnlock()
44 if ok {
45 return user
46 }
47 b.populateUser(id)
48 b.usersMutex.RLock()
49 defer b.usersMutex.RUnlock()
50
51 return b.users[id]
52}
53
54func (b *users) getUsername(id string) string {
55 if user := b.getUser(id); user != nil {
56 if user.Profile.DisplayName != "" {
57 return user.Profile.DisplayName
58 }
59 return user.Name
60 }
61 b.log.Warnf("Could not find user with ID '%s'", id)
62 return ""
63}
64
65func (b *users) getAvatar(id string) string {
66 if user := b.getUser(id); user != nil {
67 return user.Profile.Image48
68 }
69 return ""
70}
71
72func (b *users) populateUser(userID string) {
73 for {
74 b.usersMutex.Lock()
75 _, exists := b.users[userID]
76 if exists {
77 // already in cache
78 b.usersMutex.Unlock()
79 return
80 }
81
82 if syncPoint, ok := b.usersSyncPoints[userID]; ok {
83 // Another goroutine is already populating this user for us so wait on it to finish.
84 b.usersMutex.Unlock()
85 <-syncPoint
86 // We do not return and iterate again to check that the entry does indeed exist
87 // in case the previous query failed for some reason.
88 } else {
89 b.usersSyncPoints[userID] = make(chan struct{})
90 defer func() {
91 // Wake up any waiting goroutines and remove the synchronization point.
92 close(b.usersSyncPoints[userID])
93 delete(b.usersSyncPoints, userID)
94 }()
95 break
96 }
97 }
98
99 // Do not hold the lock while fetching information from Slack
100 // as this might take an unbounded amount of time.
101 b.usersMutex.Unlock()
102
103 user, err := b.sc.GetUserInfo(userID)
104 if err != nil {
105 b.log.Debugf("GetUserInfo failed for %v: %v", userID, err)
106 return
107 }
108
109 b.usersMutex.Lock()
110 defer b.usersMutex.Unlock()
111
112 // Register user information.
113 b.users[userID] = user
114}
115
116func (b *users) invalidateUser(userID string) {
117 b.usersMutex.Lock()
118 defer b.usersMutex.Unlock()
119 delete(b.users, userID)
120}
121
122func (b *users) populateUsers(wait bool) {
123 b.refreshMutex.Lock()
124 if !wait && (time.Now().Before(b.earliestRefresh) || b.refreshInProgress) {
125 b.log.Debugf("Not refreshing user list as it was done less than %v ago.", minimumRefreshInterval)
126 b.refreshMutex.Unlock()
127
128 return
129 }
130 for b.refreshInProgress {
131 b.refreshMutex.Unlock()
132 time.Sleep(time.Second)
133 b.refreshMutex.Lock()
134 }
135 b.refreshInProgress = true
136 b.refreshMutex.Unlock()
137
138 newUsers := map[string]*slack.User{}
139 pagination := b.sc.GetUsersPaginated(slack.GetUsersOptionLimit(200))
140 count := 0
141 for {
142 var err error
143 pagination, err = pagination.Next(context.Background())
144 time.Sleep(time.Second)
145 if err != nil {
146 if pagination.Done(err) {
147 break
148 }
149
150 if err = handleRateLimit(b.log, err); err != nil {
151 b.log.Errorf("Could not retrieve users: %#v", err)
152 return
153 }
154 continue
155 }
156
157 for i := range pagination.Users {
158 newUsers[pagination.Users[i].ID] = &pagination.Users[i]
159 }
160 b.log.Debugf("getting %d users", len(pagination.Users))
161 count++
162 // more > 2000 users, slack will complain and ratelimit. break
163 if count > 10 {
164 b.log.Info("Large slack detected > 2000 users, skipping loading complete userlist.")
165 break
166 }
167 }
168
169 b.usersMutex.Lock()
170 defer b.usersMutex.Unlock()
171 b.users = newUsers
172
173 b.refreshMutex.Lock()
174 defer b.refreshMutex.Unlock()
175 b.earliestRefresh = time.Now().Add(minimumRefreshInterval)
176 b.refreshInProgress = false
177}
178
179type channels struct {
180 log *logrus.Entry
181 sc *slack.Client
182
183 channelsByID map[string]*slack.Channel
184 channelsByName map[string]*slack.Channel
185 channelsMutex sync.RWMutex
186
187 channelMembers map[string][]string
188 channelMembersMutex sync.RWMutex
189
190 refreshInProgress bool
191 earliestRefresh time.Time
192 refreshMutex sync.Mutex
193}
194
195func newChannelManager(log *logrus.Entry, sc *slack.Client) *channels {
196 return &channels{
197 log: log,
198 sc: sc,
199 channelsByID: make(map[string]*slack.Channel),
200 channelsByName: make(map[string]*slack.Channel),
201 earliestRefresh: time.Now(),
202 }
203}
204
205func (b *channels) getChannel(channel string) (*slack.Channel, error) {
206 if strings.HasPrefix(channel, "ID:") {
207 return b.getChannelByID(strings.TrimPrefix(channel, "ID:"))
208 }
209 return b.getChannelByName(channel)
210}
211
212func (b *channels) getChannelByName(name string) (*slack.Channel, error) {
213 return b.getChannelBy(name, b.channelsByName)
214}
215
216func (b *channels) getChannelByID(id string) (*slack.Channel, error) {
217 return b.getChannelBy(id, b.channelsByID)
218}
219
220func (b *channels) getChannelBy(lookupKey string, lookupMap map[string]*slack.Channel) (*slack.Channel, error) {
221 b.channelsMutex.RLock()
222 defer b.channelsMutex.RUnlock()
223
224 if channel, ok := lookupMap[lookupKey]; ok {
225 return channel, nil
226 }
227 return nil, fmt.Errorf("channel %s not found", lookupKey)
228}
229
230func (b *channels) getChannelMembers(users *users) config.ChannelMembers {
231 b.channelMembersMutex.RLock()
232 defer b.channelMembersMutex.RUnlock()
233
234 membersInfo := config.ChannelMembers{}
235 for channelID, members := range b.channelMembers {
236 for _, member := range members {
237 channelName := ""
238 userName := ""
239 userNick := ""
240 user := users.getUser(member)
241 if user != nil {
242 userName = user.Name
243 userNick = user.Profile.DisplayName
244 }
245 channel, _ := b.getChannelByID(channelID)
246 if channel != nil {
247 channelName = channel.Name
248 }
249 memberInfo := config.ChannelMember{
250 Username: userName,
251 Nick: userNick,
252 UserID: member,
253 ChannelID: channelID,
254 ChannelName: channelName,
255 }
256 membersInfo = append(membersInfo, memberInfo)
257 }
258 }
259 return membersInfo
260}
261
262func (b *channels) registerChannel(channel slack.Channel) {
263 b.channelsMutex.Lock()
264 defer b.channelsMutex.Unlock()
265
266 b.channelsByID[channel.ID] = &channel
267 b.channelsByName[channel.Name] = &channel
268}
269
270func (b *channels) populateChannels(wait bool) {
271 b.refreshMutex.Lock()
272 if !wait && (time.Now().Before(b.earliestRefresh) || b.refreshInProgress) {
273 b.log.Debugf("Not refreshing channel list as it was done less than %v seconds ago.", minimumRefreshInterval)
274 b.refreshMutex.Unlock()
275 return
276 }
277 for b.refreshInProgress {
278 b.refreshMutex.Unlock()
279 time.Sleep(time.Second)
280 b.refreshMutex.Lock()
281 }
282 b.refreshInProgress = true
283 b.refreshMutex.Unlock()
284
285 newChannelsByID := map[string]*slack.Channel{}
286 newChannelsByName := map[string]*slack.Channel{}
287 newChannelMembers := make(map[string][]string)
288
289 // We only retrieve public and private channels, not IMs
290 // and MPIMs as those do not have a channel name.
291 queryParams := &slack.GetConversationsParameters{
292 ExcludeArchived: true,
293 Types: []string{"public_channel,private_channel"},
294 Limit: 1000,
295 }
296 for {
297 channels, nextCursor, err := b.sc.GetConversations(queryParams)
298 if err != nil {
299 if err = handleRateLimit(b.log, err); err != nil {
300 b.log.Errorf("Could not retrieve channels: %#v", err)
301 return
302 }
303 continue
304 }
305
306 for i := range channels {
307 newChannelsByID[channels[i].ID] = &channels[i]
308 newChannelsByName[channels[i].Name] = &channels[i]
309 // also find all the members in every channel
310 // comment for now, issues on big slacks
311 /*
312 members, err := b.getUsersInConversation(channels[i].ID)
313 if err != nil {
314 if err = b.handleRateLimit(err); err != nil {
315 b.Log.Errorf("Could not retrieve channel members: %#v", err)
316 return
317 }
318 continue
319 }
320 newChannelMembers[channels[i].ID] = members
321 */
322 }
323
324 if nextCursor == "" {
325 break
326 }
327 queryParams.Cursor = nextCursor
328 }
329
330 b.channelsMutex.Lock()
331 defer b.channelsMutex.Unlock()
332 b.channelsByID = newChannelsByID
333 b.channelsByName = newChannelsByName
334
335 b.channelMembersMutex.Lock()
336 defer b.channelMembersMutex.Unlock()
337 b.channelMembers = newChannelMembers
338
339 b.refreshMutex.Lock()
340 defer b.refreshMutex.Unlock()
341 b.earliestRefresh = time.Now().Add(minimumRefreshInterval)
342 b.refreshInProgress = false
343}
344