Thumbnail

rani/matterbridge.git

Clone URL: https://git.buni.party/rani/matterbridge.git

Viewing file on branch master

1package gateway
2
3import (
4 "fmt"
5 "sync"
6 "time"
7
8 "github.com/matterbridge-org/matterbridge/bridge"
9 "github.com/matterbridge-org/matterbridge/bridge/config"
10 "github.com/matterbridge-org/matterbridge/gateway/samechannel"
11 "github.com/sirupsen/logrus"
12)
13
// Router holds the configured gateways together with the channels on which
// messages are handed to the routing loop.
type Router struct {
	// Embedded configuration; BridgeValues and GetString are reached through it.
	config.Config
	sync.RWMutex

	// BridgeMap is the set of bridge factories passed to NewRouter.
	BridgeMap map[string]bridge.Factory
	// Gateways contains every enabled gateway, keyed by gateway name.
	Gateways map[string]*Gateway
	// Message is an unbuffered channel drained by handleReceive.
	Message chan config.Message
	// MattermostPlugin is an unbuffered channel created by NewRouter.
	MattermostPlugin chan config.Message

	// logger is the router's log entry, tagged with prefix "router".
	logger *logrus.Entry
}
25
26// NewRouter initializes a new Matterbridge router for the specified configuration and
27// sets up all required gateways.
28func NewRouter(rootLogger *logrus.Logger, cfg config.Config, bridgeMap map[string]bridge.Factory) (*Router, error) {
29 logger := rootLogger.WithFields(logrus.Fields{"prefix": "router"})
30
31 r := &Router{
32 Config: cfg,
33 BridgeMap: bridgeMap,
34 Message: make(chan config.Message),
35 MattermostPlugin: make(chan config.Message),
36 Gateways: make(map[string]*Gateway),
37 logger: logger,
38 }
39 sgw := samechannel.New(cfg)
40 gwconfigs := append(sgw.GetConfig(), cfg.BridgeValues().Gateway...)
41
42 for idx := range gwconfigs {
43 entry := &gwconfigs[idx]
44 if !entry.Enable {
45 continue
46 }
47 if entry.Name == "" {
48 return nil, fmt.Errorf("%s", "Gateway without name found")
49 }
50 if _, ok := r.Gateways[entry.Name]; ok {
51 return nil, fmt.Errorf("Gateway with name %s already exists", entry.Name)
52 }
53 r.Gateways[entry.Name] = New(rootLogger, entry, r)
54 }
55 return r, nil
56}
57
58// Start will connect all gateways belonging to this router and subsequently route messages
59// between them.
60func (r *Router) Start() error {
61 // Deprecating MediaServerUpload. Remove in future v2.1 release
62 deprecatedValue, _ := r.GetString("MediaServerUpload")
63 if deprecatedValue != "" {
64 r.logger.Fatal("MediaServerUpload config option has been deprecated. You should either remove this option from your configuration, or help us document it.")
65 }
66
67 m := make(map[string]*bridge.Bridge)
68 if len(r.Gateways) == 0 {
69 return fmt.Errorf("no [[gateway]] configured. See https://github.com/42wim/matterbridge/wiki/How-to-create-your-config for more info")
70 }
71 for _, gw := range r.Gateways {
72 r.logger.Infof("Parsing gateway %s", gw.Name)
73 if len(gw.Bridges) == 0 {
74 return fmt.Errorf("no bridges configured for gateway %s. See https://github.com/42wim/matterbridge/wiki/How-to-create-your-config for more info", gw.Name)
75 }
76 for _, br := range gw.Bridges {
77 m[br.Account] = br
78 }
79 }
80 for _, br := range m {
81 r.logger.Infof("Starting bridge: %s ", br.Account)
82 err := br.Connect()
83 if err != nil {
84 e := fmt.Errorf("Bridge %s failed to start: %v", br.Account, err)
85 if r.disableBridge(br, e) {
86 continue
87 }
88 return e
89 }
90 err = br.JoinChannels()
91 if err != nil {
92 e := fmt.Errorf("Bridge %s failed to join channel: %v", br.Account, err)
93 if r.disableBridge(br, e) {
94 continue
95 }
96 return e
97 }
98 }
99 // remove unused bridges
100 for _, gw := range r.Gateways {
101 for i, br := range gw.Bridges {
102 if br.Bridger == nil {
103 r.logger.Errorf("removing failed bridge %s", i)
104 delete(gw.Bridges, i)
105 }
106 }
107 }
108 go r.handleReceive()
109 //go r.updateChannelMembers()
110 return nil
111}
112
113// disableBridge returns true and empties a bridge if we have IgnoreFailureOnStart configured
114// otherwise returns false
115func (r *Router) disableBridge(br *bridge.Bridge, err error) bool {
116 if r.BridgeValues().General.IgnoreFailureOnStart {
117 r.logger.Error(err)
118 // setting this bridge empty
119 *br = bridge.Bridge{
120 Log: br.Log,
121 }
122 return true
123 }
124 return false
125}
126
127func (r *Router) getBridge(account string) *bridge.Bridge {
128 for _, gw := range r.Gateways {
129 if br, ok := gw.Bridges[account]; ok {
130 return br
131 }
132 }
133 return nil
134}
135
136func (r *Router) handleReceive() {
137 for msg := range r.Message {
138 msg := msg // scopelint
139 r.handleEventGetChannelMembers(&msg)
140 r.handleEventFailure(&msg)
141 r.handleEventRejoinChannels(&msg)
142
143 // Set message protocol based on the account it came from
144 msg.Protocol = r.getBridge(msg.Account).Protocol
145
146 filesHandled := false
147 for _, gw := range r.Gateways {
148 // record all the message ID's of the different bridges
149 var msgIDs []*BrMsgID
150 if gw.ignoreMessage(&msg) {
151 continue
152 }
153 msg.Timestamp = time.Now()
154 gw.modifyMessage(&msg)
155 if !filesHandled {
156 gw.handleFiles(&msg)
157 filesHandled = true
158 }
159 for _, br := range gw.Bridges {
160 msgIDs = append(msgIDs, gw.handleMessage(&msg, br)...)
161 }
162
163 if msg.ID != "" {
164 _, exists := gw.Messages.Get(msg.Protocol + " " + msg.ID)
165
166 // Only add the message ID if it doesn't already exist
167 //
168 // For some bridges we always add/update the message ID.
169 // This is necessary as msgIDs will change if a bridge returns
170 // a different ID in response to edits.
171 if !exists {
172 gw.Messages.Add(msg.Protocol+" "+msg.ID, msgIDs)
173 }
174 }
175 }
176 }
177}
178
179// updateChannelMembers sends every minute an GetChannelMembers event to all bridges.
180func (r *Router) updateChannelMembers() {
181 // TODO sleep a minute because slack can take a while
182 // fix this by having actually connectionDone events send to the router
183 time.Sleep(time.Minute)
184 for {
185 for _, gw := range r.Gateways {
186 for _, br := range gw.Bridges {
187 // only for slack now
188 if br.Protocol != "slack" {
189 continue
190 }
191 r.logger.Debugf("sending %s to %s", config.EventGetChannelMembers, br.Account)
192 if _, err := br.Send(config.Message{Event: config.EventGetChannelMembers}); err != nil {
193 r.logger.Errorf("updateChannelMembers: %s", err)
194 }
195 }
196 }
197 time.Sleep(time.Minute)
198 }
199}
200