Thumbnail

rani/matterbridge.git

Clone URL: https://git.buni.party/rani/matterbridge.git

commit cf327804348fe4e3b8fefce05a9cb57e425b4fd5 Author: Wim <wim@42.be> Date: Tue Feb 14 21:12:02 2017 +0000 Refactor to handle disconnects/reconnects better. Now try to reconnect every 60 seconds until forever. diff --git a/bridge/bridge.go b/bridge/bridge.go index b387812..c8c6ac4 100644 --- a/bridge/bridge.go +++ b/bridge/bridge.go @@ -106 +108 @@ import (   "github.com/42wim/matterbridge/bridge/slack"   "github.com/42wim/matterbridge/bridge/telegram"   "github.com/42wim/matterbridge/bridge/xmpp" + log "github.com/Sirupsen/logrus" +   "strings"  )   @@ -1714 +1918 @@ type Bridger interface {   Send(msg config.Message) error   Connect() error   JoinChannel(channel string) error + Disconnect() error  }    type Bridge struct {   Config config.Protocol   Bridger - Name string - Account string - Protocol string + Name string + Account string + Protocol string + ChannelsOut []string + ChannelsIn []string + ChannelOptions config.ChannelOptions  }    func New(cfg *config.Config, bridge *config.Bridge, c chan config.Message) *Bridge { @@ -663 +7215 @@ func New(cfg *config.Config, bridge *config.Bridge, c chan config.Message) *Brid   }   return b  } + +func (b *Bridge) JoinChannels() error { + exists := make(map[string]bool) + for _, channel := range append(b.ChannelsIn, b.ChannelsOut...) { + if !exists[channel] { + log.Infof("%s: joining %s", b.Account, channel) + b.JoinChannel(channel) + exists[channel] = true + } + } + return nil +} diff --git a/bridge/config/config.go b/bridge/config/config.go index ac3e939..811c97a 100644 --- a/bridge/config/config.go +++ b/bridge/config/config.go @@ -106 +107 @@ import (    const (   EVENT_JOIN_LEAVE = "join_leave" + EVENT_FAILURE = "failure"  )    type Message struct { diff --git a/bridge/discord/discord.go b/bridge/discord/discord.go index 620493b..06508b8 100644 --- a/bridge/discord/discord.go +++ b/bridge/discord/discord.go @@ -806 +8010 @@ func (b *bdiscord) Connect() error {   return nil  }   +func (b *bdiscord) Disconnect() error { + return nil +} +  func (b *bdiscord) JoinChannel(channel string) error {   idcheck := strings.Split(channel, "ID:")   if len(idcheck) > 1 { diff --git a/bridge/gitter/gitter.go b/bridge/gitter/gitter.go index 0400b4b..d1f4b40 100644 --- a/bridge/gitter/gitter.go +++ b/bridge/gitter/gitter.go @@ -456 +4511 @@ func (b *Bgitter) Connect() error {   return nil  }   +func (b *Bgitter) Disconnect() error { + return nil + +} +  func (b *Bgitter) JoinChannel(channel string) error {   room := channel   roomID := b.getRoomID(room) diff --git a/bridge/irc/irc.go b/bridge/irc/irc.go index fe8dc74..db43080 100644 --- a/bridge/irc/irc.go +++ b/bridge/irc/irc.go @@ -467 +466 @@ func New(cfg config.Protocol, account string, c chan config.Message) *Birc {   if b.Config.MessageQueue == 0 {   b.Config.MessageQueue = 30   } - b.Local = make(chan config.Message, b.Config.MessageQueue+10)   return b  }   @@ -616 +607 @@ func (b *Birc) Command(msg *config.Message) string {  }    func (b *Birc) Connect() error { + b.Local = make(chan config.Message, b.Config.MessageQueue+10)   flog.Infof("Connecting %s", b.Config.Server)   i := irc.IRC(b.Config.Nick, b.Config.Nick)   if log.GetLevel() == log.DebugLevel { @@ -916 +9112 @@ func (b *Birc) Connect() error {   return nil  }   +func (b *Birc) Disconnect() error { + b.i.Disconnect() + close(b.Local) + return nil +} +  func (b *Birc) JoinChannel(channel string) error {   b.i.Join(channel)   return nil @@ -1707 +17611 @@ func (b *Birc) handleJoinPart(event *irc.Event) {   flog.Debugf("Sending JOIN_LEAVE event from %s to gateway", b.Account)   channel := event.Arguments[0]   if event.Code == "QUIT" { - channel = "" + if event.Nick == b.Nick && strings.Contains(event.Raw, "Ping timeout") { + flog.Infof("%s reconnecting ..", b.Account) + b.Remote <- config.Message{Username: "system", Text: "reconnect", Channel: channel, Account: b.Account, Event: config.EVENT_FAILURE} + return + }   }   b.Remote <- config.Message{Username: "system", Text: event.Nick + " " + strings.ToLower(event.Code) + "s", Channel: channel, Account: b.Account, Event: config.EVENT_JOIN_LEAVE}   flog.Debugf("handle %#v", event) diff --git a/bridge/mattermost/mattermost.go b/bridge/mattermost/mattermost.go index e2bf228..126bab4 100644 --- a/bridge/mattermost/mattermost.go +++ b/bridge/mattermost/mattermost.go @@ -776 +7710 @@ func (b *Bmattermost) Connect() error {   return nil  }   +func (b *Bmattermost) Disconnect() error { + return nil +} +  func (b *Bmattermost) JoinChannel(channel string) error {   // we can only join channels using the API   if b.Config.UseAPI { diff --git a/bridge/rocketchat/rocketchat.go b/bridge/rocketchat/rocketchat.go index d87450e..4590a89 100644 --- a/bridge/rocketchat/rocketchat.go +++ b/bridge/rocketchat/rocketchat.go @@ -496 +4911 @@ func (b *Brocketchat) Connect() error {   return nil  }   +func (b *Brocketchat) Disconnect() error { + return nil + +} +  func (b *Brocketchat) JoinChannel(channel string) error {   return nil  } diff --git a/bridge/slack/slack.go b/bridge/slack/slack.go index 763231d..0f8806a 100644 --- a/bridge/slack/slack.go +++ b/bridge/slack/slack.go @@ -656 +6511 @@ func (b *Bslack) Connect() error {   return nil  }   +func (b *Bslack) Disconnect() error { + return nil + +} +  func (b *Bslack) JoinChannel(channel string) error {   // we can only join channels using the API   if b.Config.UseAPI { diff --git a/bridge/telegram/telegram.go b/bridge/telegram/telegram.go index 38d7fd1..aa63745 100644 --- a/bridge/telegram/telegram.go +++ b/bridge/telegram/telegram.go @@ -516 +5111 @@ func (b *Btelegram) Connect() error {   return nil  }   +func (b *Btelegram) Disconnect() error { + return nil + +} +  func (b *Btelegram) JoinChannel(channel string) error {   return nil  } diff --git a/bridge/xmpp/xmpp.go b/bridge/xmpp/xmpp.go index 8899e71..4dcb8ef 100644 --- a/bridge/xmpp/xmpp.go +++ b/bridge/xmpp/xmpp.go @@ -110 +110 @@  package bxmpp    import ( + "crypto/tls"   "github.com/42wim/matterbridge/bridge/config"   log "github.com/Sirupsen/logrus"   "github.com/mattn/go-xmpp" - "crypto/tls"     "strings"   "time" @@ -476 +4710 @@ func (b *Bxmpp) Connect() error {   return nil  }   +func (b *Bxmpp) Disconnect() error { + return nil +} +  func (b *Bxmpp) JoinChannel(channel string) error {   b.xc.JoinMUCNoHistory(channel+"@"+b.Config.Muc, b.Config.Nick)   return nil @@ -6311 +6711 @@ func (b *Bxmpp) createXMPP() (*xmpp.Client, error) {   tc.InsecureSkipVerify = b.Config.SkipTLSVerify   tc.ServerName = strings.Split(b.Config.Server, ":")[0]   options := xmpp.Options{ - Host: b.Config.Server, - User: b.Config.Jid, - Password: b.Config.Password, - NoTLS: true, - StartTLS: true, + Host: b.Config.Server, + User: b.Config.Jid, + Password: b.Config.Password, + NoTLS: true, + StartTLS: true,   TLSConfig: tc,     //StartTLS: false, diff --git a/gateway/gateway.go b/gateway/gateway.go index 82a76ef..1957177 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -76 +77 @@ import (   log "github.com/Sirupsen/logrus"   "reflect"   "strings" + "time"  )    type Gateway struct { @@ -3924 +4016 @@ func (gw *Gateway) AddBridge(cfg *config.Bridge) error {   }   log.Infof("Starting bridge: %s ", cfg.Account)   br := bridge.New(gw.Config, cfg, gw.Message) + br.ChannelsOut = gw.ChannelsOut[br.Account] + br.ChannelsIn = gw.ChannelsIn[br.Account] + br.ChannelOptions = gw.ChannelOptions[br.Account] +   gw.Bridges[cfg.Account] = br   err := br.Connect()   if err != nil {   return fmt.Errorf("Bridge %s failed to start: %v", br.Account, err)   } - exists := make(map[string]bool) - for _, channel := range append(gw.ChannelsOut[br.Account], gw.ChannelsIn[br.Account]...) { - if !exists[br.Account+channel] { - mychannel := channel - log.Infof("%s: joining %s", br.Account, channel) - if br.Protocol == "irc" && gw.ChannelOptions[br.Account+channel].Key != "" { - log.Debugf("using key %s for channel %s", gw.ChannelOptions[br.Account+channel].Key, channel) - mychannel = mychannel + " " + gw.ChannelOptions[br.Account+channel].Key - } - br.JoinChannel(mychannel) - exists[br.Account+channel] = true - } - } + br.JoinChannels()   return nil  }   @@ -766 +6913 @@ func (gw *Gateway) handleReceive() {   for {   select {   case msg := <-gw.Message: + if msg.Event == config.EVENT_FAILURE { + for _, br := range gw.Bridges { + if msg.Account == br.Account { + go gw.reconnectBridge(br) + } + } + }   if !gw.ignoreMessage(&msg) {   for _, br := range gw.Bridges {   gw.handleMessage(msg, br) @@ -856 +8520 @@ func (gw *Gateway) handleReceive() {   }  }   +func (gw *Gateway) reconnectBridge(br *bridge.Bridge) { + br.Disconnect() + time.Sleep(time.Second * 5) +RECONNECT: + log.Infof("Reconnecting %s", br.Account) + err := br.Connect() + if err != nil { + log.Errorf("Reconnection failed: %s. Trying again in 60 seconds", err) + time.Sleep(time.Second * 60) + goto RECONNECT + } + br.JoinChannels() +} +  func (gw *Gateway) mapChannels() error {   options := make(map[string]config.ChannelOptions)   m := make(map[string][]string)