Thumbnail

rani/matterbridge.git

Clone URL: https://git.buni.party/rani/matterbridge.git

commit d9668c2d55d0ce48b47bd314dbcc2b45e76db296 Author: Duco van Amstel <duco.vanamstel@gmail.com> Date: Wed Mar 20 21:54:31 2019 +0000 Fix race-condition in populateUser() (#767) Fix the root-cause of #759 by introducing synchronisation points for individual user fetches. diff --git a/bridge/slack/users_channels.go b/bridge/slack/users_channels.go index 9a8568b..8712d26 100644 --- a/bridge/slack/users_channels.go +++ b/bridge/slack/users_channels.go @@ -188 +189 @@ type users struct {   log *logrus.Entry   sc *slack.Client   - users map[string]*slack.User - usersMutex sync.RWMutex + users map[string]*slack.User + usersMutex sync.RWMutex + usersSyncPoints map[string]chan struct{}     refreshInProgress bool   earliestRefresh time.Time @@ -316 +327 @@ func newUserManager(log *logrus.Entry, sc *slack.Client) *users {   log: log,   sc: sc,   users: make(map[string]*slack.User), + usersSyncPoints: make(map[string]chan struct{}),   earliestRefresh: time.Now(),   }  } @@ -6814 +7032 @@ func (b *users) getAvatar(id string) string {  }    func (b *users) populateUser(userID string) { - b.usersMutex.RLock() - _, exists := b.users[userID] - b.usersMutex.RUnlock() - if exists { - // already in cache - return + for { + b.usersMutex.Lock() + _, exists := b.users[userID] + if exists { + // already in cache + b.usersMutex.Unlock() + return + } + + if syncPoint, ok := b.usersSyncPoints[userID]; ok { + // Another goroutine is already populating this user for us so wait on it to finish. + b.usersMutex.Unlock() + <-syncPoint + // We do not return and iterate again to check that the entry does indeed exist + // in case the previous query failed for some reason. + } else { + b.usersSyncPoints[userID] = make(chan struct{}) + b.usersMutex.Unlock() + break + }   }   + // Do not hold the lock while fetching information from Slack + // as this might take an unbounded amount of time. + b.usersMutex.Unlock() +   user, err := b.sc.GetUserInfo(userID)   if err != nil {   b.log.Debugf("GetUserInfo failed for %v: %v", userID, err) @@ -8315 +10320 @@ func (b *users) populateUser(userID string) {   }     b.usersMutex.Lock() + defer b.usersMutex.Unlock() + + // Register user information.   b.users[userID] = user - b.usersMutex.Unlock() + + // Wake up any waiting goroutines and remove the synchronization point. + close(b.usersSyncPoints[userID]) + delete(b.usersSyncPoints, userID)  }    func (b *users) populateUsers(wait bool) {   b.refreshMutex.Lock()   if !wait && (time.Now().Before(b.earliestRefresh) || b.refreshInProgress) { - b.log.Debugf("Not refreshing user list as it was done less than %v ago.", - minimumRefreshInterval) + b.log.Debugf("Not refreshing user list as it was done less than %v ago.", minimumRefreshInterval)   b.refreshMutex.Unlock()     return @@ -2309 +25510 @@ func (b *channels) getChannelMembers(users *users) config.ChannelMembers {    func (b *channels) registerChannel(channel slack.Channel) {   b.channelsMutex.Lock() + defer b.channelsMutex.Unlock() +   b.channelsByID[channel.ID] = &channel   b.channelsByName[channel.Name] = &channel - b.channelsMutex.Unlock()  }    func (b *channels) populateChannels(wait bool) {