Thumbnail

rani/matterbridge.git

Clone URL: https://git.buni.party/rani/matterbridge.git

Viewing file on branch master

1package bxmpp
2
3import (
4 "crypto/sha1"
5 "encoding/hex"
6 "encoding/xml"
7 "fmt"
8 "strings"
9 "sync"
10 "time"
11
12 "github.com/jpillora/backoff"
13 "github.com/matterbridge-org/matterbridge/bridge"
14 "github.com/matterbridge-org/matterbridge/bridge/config"
15 "gosrc.io/xmpp"
16 "gosrc.io/xmpp/stanza"
17)
18
// xmppMsgCacheSize is the capacity of the ring of recently seen MUC messages
// that is consulted when rendering reply quotes.
const xmppMsgCacheSize = 100
20
// xmppReply maps the <reply/> element in the urn:xmpp:reply:0 namespace.
// ID is the "id" attribute (the message being replied to) and To is the
// "to" attribute.
type xmppReply struct {
	XMLName xml.Name `xml:"urn:xmpp:reply:0 reply"`
	ID      string   `xml:"id,attr"`
	To      string   `xml:"to,attr"`
}
26
// xmppStanzaID maps the <stanza-id/> element in the urn:xmpp:sid:0
// namespace. ID is the assigned identifier and By is the JID of the entity
// that assigned it.
type xmppStanzaID struct {
	XMLName xml.Name `xml:"urn:xmpp:sid:0 stanza-id"`
	ID      string   `xml:"id,attr"`
	By      string   `xml:"by,attr"`
}
32
// xmppFallbackBody maps the <body/> child of a fallback element. Start and
// End are passed to removeRuneRange as a half-open range of rune offsets.
type xmppFallbackBody struct {
	Start int `xml:"start,attr"`
	End   int `xml:"end,attr"`
}

// xmppFallback maps the <fallback/> element in the urn:xmpp:fallback:0
// namespace. For names the feature the fallback text stands in for; Body is
// nil when the element carries no <body/> child.
type xmppFallback struct {
	XMLName xml.Name          `xml:"urn:xmpp:fallback:0 fallback"`
	For     string            `xml:"for,attr"`
	Body    *xmppFallbackBody `xml:"body"`
}
43
// xmppCachedMessage is one entry of the recent-message cache: the ID the
// message was stored under, the nick of its sender, and its body text.
type xmppCachedMessage struct {
	ID   string
	Nick string
	Body string
}
49
50func init() {
51 stanza.TypeRegistry.MapExtension(stanza.PKTMessage, xml.Name{Space: "urn:xmpp:reply:0", Local: "reply"}, xmppReply{})
52 stanza.TypeRegistry.MapExtension(stanza.PKTMessage, xml.Name{Space: "urn:xmpp:sid:0", Local: "stanza-id"}, xmppStanzaID{})
53 stanza.TypeRegistry.MapExtension(stanza.PKTMessage, xml.Name{Space: "urn:xmpp:fallback:0", Local: "fallback"}, xmppFallback{})
54}
55
56type Bxmpp struct {
57 *bridge.Config
58
59 component *xmpp.Component
60 router *xmpp.Router
61
62 domain string
63 mucDomain string
64 botNick string
65
66 mu sync.RWMutex
67 rooms map[string]*mucRoom
68 connected bool
69 reconnectCh chan struct{}
70
71 cacheMu sync.RWMutex
72 cacheRing []string
73 cacheMap map[string]xmppCachedMessage
74 cacheIdx int
75}
76
// mucRoom is the per-channel state of a joined MUC room.
type mucRoom struct {
	// Channel is the bridge channel name the room was created for.
	Channel string
	// MucJID is the bare JID of the room.
	MucJID string
	// BotNick is the nickname the bot uses in this room.
	BotNick string
	// PuppetNicks maps each puppet nickname to the JID it joined with.
	PuppetNicks map[string]string
}
83
84func New(cfg *bridge.Config) bridge.Bridger {
85 return &Bxmpp{
86 Config: cfg,
87 rooms: make(map[string]*mucRoom),
88 cacheRing: make([]string, 0, xmppMsgCacheSize),
89 cacheMap: make(map[string]xmppCachedMessage, xmppMsgCacheSize),
90 }
91}
92
93func (b *Bxmpp) Connect() error {
94 b.domain = b.GetString("Domain")
95 if b.domain == "" {
96 return fmt.Errorf("xmpp Domain is required for component gateway")
97 }
98
99 b.mucDomain = b.GetString("Muc")
100 if b.mucDomain == "" {
101 b.mucDomain = "muc." + b.domain
102 }
103
104 b.botNick = b.GetString("Nick")
105 if b.botNick == "" {
106 b.botNick = "matterbridge"
107 }
108
109 b.Log.Debugf("XMPP component config: server=%s domain=%s muc=%s bot=%s", b.GetString("Server"), b.domain, b.mucDomain, b.botNick)
110 b.reconnectCh = make(chan struct{}, 1)
111
112 opts := xmpp.ComponentOptions{
113 TransportConfiguration: xmpp.TransportConfiguration{
114 Address: b.GetString("Server"),
115 Domain: b.domain,
116 },
117 Domain: b.domain,
118 Secret: b.GetString("Secret"),
119 Name: "matterbridge",
120 }
121
122 b.router = xmpp.NewRouter()
123 b.router.HandleFunc("message", b.handleMessage)
124
125 b.Log.Debug("XMPP component: creating component")
126 component, err := xmpp.NewComponent(opts, b.router, func(err error) {
127 b.handleComponentError(err)
128 })
129 if err != nil {
130 return err
131 }
132 b.component = component
133
134 if err := b.component.Connect(); err != nil {
135 return err
136 }
137
138 b.setConnected(true)
139 go b.manageConnection()
140 b.Log.Info("XMPP component connected")
141 return nil
142}
143
144func (b *Bxmpp) Disconnect() error {
145 b.leaveAllRooms()
146 if b.reconnectCh != nil {
147 close(b.reconnectCh)
148 b.reconnectCh = nil
149 }
150 b.setConnected(false)
151 return nil
152}
153
154func (b *Bxmpp) JoinChannel(channel config.ChannelInfo) error {
155 room := b.ensureRoom(channel.Name)
156 b.Log.Debugf("XMPP join MUC: channel=%s muc=%s bot=%s", room.Channel, room.MucJID, room.BotNick)
157 return b.joinMuc(room.MucJID, room.BotNick, b.botJID())
158}
159
160func (b *Bxmpp) Send(msg config.Message) (string, error) {
161 if !b.Connected() {
162 b.Log.Warnf("XMPP not connected, dropping message on bridge %s", b.Account)
163 return "", fmt.Errorf("bridge %s not connected, dropping message", b.Account)
164 }
165
166 if msg.Event == config.EventMsgDelete || msg.Event == config.EventAvatarDownload {
167 return "", nil
168 }
169 if msg.Event != "" && msg.Event != config.EventUserAction {
170 return "", nil
171 }
172
173 room := b.ensureRoom(msg.Channel)
174 puppetJID, _ := b.ensurePuppet(room, msg.Username, msg.UserID, msg.Protocol)
175
176 body := msg.Text
177 if msg.Event == config.EventUserAction {
178 body = "/me " + body
179 }
180
181 return "", b.sendMucMessage(room.MucJID, puppetJID, body)
182}
183
184func (b *Bxmpp) handleMessage(s xmpp.Sender, p stanza.Packet) {
185 msg, ok := p.(stanza.Message)
186 if !ok {
187 return
188 }
189 if msg.Type != stanza.MessageTypeGroupchat {
190 return
191 }
192 if msg.Body == "" {
193 return
194 }
195 if msg.To != b.botJID() {
196 return
197 }
198
199 b.Log.Debugf("XMPP recv message: from=%s to=%s type=%s id=%s body_len=%d", msg.From, msg.To, msg.Type, msg.Id, len(msg.Body))
200
201 roomJID, nick := parseRoomNick(msg.From)
202 if roomJID == "" || nick == "" {
203 return
204 }
205
206 room := b.getRoomByJID(roomJID)
207 if room == nil {
208 return
209 }
210
211 text := msg.Body
212 event := ""
213 if strings.HasPrefix(text, "/me ") {
214 text = strings.TrimPrefix(text, "/me ")
215 event = config.EventUserAction
216 }
217
218 replyID := b.getReplyID(msg, roomJID)
219 b.cacheMessage(replyID, nick, text)
220
221 if nick == room.BotNick {
222 return
223 }
224 if _, ok := room.PuppetNicks[nick]; ok {
225 return
226 }
227
228 text = b.handleQuote(msg, text)
229
230 rmsg := config.Message{
231 Username: nick,
232 Text: text,
233 Channel: room.Channel,
234 Account: b.Account,
235 UserID: msg.From,
236 Event: event,
237 }
238 b.Remote <- rmsg
239}
240
241func (b *Bxmpp) getReplyID(msg stanza.Message, roomJID string) string {
242 if msg.Type != stanza.MessageTypeGroupchat {
243 return ""
244 }
245
246 var sid xmppStanzaID
247 if msg.Get(&sid) && sid.ID != "" {
248 if bareJID(sid.By) == roomJID {
249 return sid.ID
250 }
251 }
252 return ""
253}
254
255func (b *Bxmpp) stripReplyFallback(msg stanza.Message, text string) string {
256 var fb xmppFallback
257 if !msg.Get(&fb) || fb.For != "urn:xmpp:reply:0" || fb.Body == nil {
258 return text
259 }
260
261 return removeRuneRange(text, fb.Body.Start, fb.Body.End)
262}
263
264func (b *Bxmpp) cacheMessage(id, nick, body string) {
265 if id == "" {
266 return
267 }
268 b.cacheMu.Lock()
269 defer b.cacheMu.Unlock()
270
271 entry := xmppCachedMessage{ID: id, Nick: nick, Body: body}
272
273 if _, exists := b.cacheMap[id]; exists {
274 b.cacheMap[id] = entry
275 return
276 }
277
278 if len(b.cacheRing) < xmppMsgCacheSize {
279 b.cacheRing = append(b.cacheRing, id)
280 b.cacheMap[id] = entry
281 return
282 }
283
284 evictID := b.cacheRing[b.cacheIdx]
285 delete(b.cacheMap, evictID)
286
287 b.cacheRing[b.cacheIdx] = id
288 b.cacheMap[id] = entry
289 b.cacheIdx = (b.cacheIdx + 1) % xmppMsgCacheSize
290}
291
292func (b *Bxmpp) findCachedMessage(id string) (xmppCachedMessage, bool) {
293 b.cacheMu.RLock()
294 defer b.cacheMu.RUnlock()
295
296 msg, ok := b.cacheMap[id]
297 return msg, ok
298}
299
300func (b *Bxmpp) handleQuote(msg stanza.Message, text string) string {
301 if b.GetBool("QuoteDisable") {
302 return text
303 }
304
305 var reply xmppReply
306 if !msg.Get(&reply) || reply.ID == "" {
307 return text
308 }
309 b.Log.Debugf("XMPP reply details: id=%s to=%s", reply.ID, reply.To)
310
311 cached, ok := b.findCachedMessage(reply.ID)
312 if !ok {
313 return text
314 }
315
316 text = b.stripReplyFallback(msg, text)
317
318 quoteMessage := cached.Body
319 quoteNick := cached.Nick
320
321 format := b.GetString("quoteformat")
322 if format == "" {
323 format = "{MESSAGE} (re @{QUOTENICK}: {QUOTEMESSAGE})"
324 }
325
326 limit := b.GetInt("QuoteLengthLimit")
327 if limit != 0 {
328 runes := []rune(quoteMessage)
329 if len(runes) > limit {
330 quoteMessage = string(runes[:limit]) + "..."
331 }
332 }
333
334 replacer := strings.NewReplacer(
335 "{MESSAGE}", text,
336 "{QUOTENICK}", quoteNick,
337 "{QUOTEMESSAGE}", quoteMessage,
338 )
339 return replacer.Replace(format)
340}
341
342func (b *Bxmpp) ensureRoom(channel string) *mucRoom {
343 b.mu.Lock()
344 defer b.mu.Unlock()
345
346 if room, ok := b.rooms[channel]; ok {
347 return room
348 }
349
350 mucJID := b.buildMucJID(channel)
351 room := &mucRoom{
352 Channel: channel,
353 MucJID: mucJID,
354 BotNick: b.botNick,
355 PuppetNicks: make(map[string]string),
356 }
357 b.rooms[channel] = room
358 return room
359}
360
361func (b *Bxmpp) getRoomByJID(roomJID string) *mucRoom {
362 b.mu.RLock()
363 defer b.mu.RUnlock()
364
365 for _, room := range b.rooms {
366 if room.MucJID == roomJID {
367 return room
368 }
369 }
370 return nil
371}
372
373func (b *Bxmpp) buildMucJID(channel string) string {
374 return channel + "@" + b.mucDomain
375}
376
377func (b *Bxmpp) botJID() string {
378 return b.botNick + "@" + b.domain
379}
380
381func (b *Bxmpp) ensurePuppet(room *mucRoom, displayName, userID, protocol string) (string, string) {
382 puppetNick := displayName
383 puppetJID := b.buildPuppetJID(displayName, userID, protocol)
384
385 if _, ok := room.PuppetNicks[puppetNick]; !ok {
386 b.ensureMemberAffiliation(room.MucJID, puppetJID)
387 if err := b.joinMuc(room.MucJID, puppetNick, puppetJID); err != nil {
388 b.Log.WithError(err).Errorf("XMPP puppet join failed: room=%s nick=%s jid=%s", room.MucJID, puppetNick, puppetJID)
389 }
390 room.PuppetNicks[puppetNick] = puppetJID
391 }
392
393 return puppetJID, puppetNick
394}
395
396func (b *Bxmpp) buildPuppetJID(displayName, userID, protocol string) string {
397 if b.GetBool("UsePerProtocolJID") {
398 local := protocol + "-" + shortHash(userID)
399 return local + "@" + b.domain
400 } else {
401 local := shortHash(displayName)
402 return local + "@" + b.domain
403 }
404}
405
// shortHash returns the first 12 hexadecimal characters of the SHA-1 digest
// of value.
func shortHash(value string) string {
	// Two hex characters per digest byte.
	const hexLen = 12

	digest := sha1.Sum([]byte(value))
	return hex.EncodeToString(digest[:hexLen/2])
}
410
411func (b *Bxmpp) ensureMemberAffiliation(mucJID, puppetJID string) {
412 // horrible, but go-xmpp doesn't have muc#admin yet
413 iq := fmt.Sprintf("<iq type='set' from='%s' to='%s' id='mb-affil'><query xmlns='http://jabber.org/protocol/muc#admin'><item affiliation='member' jid='%s'/></query></iq>", b.botJID(), mucJID, puppetJID)
414 if err := b.component.SendRaw(iq); err != nil {
415 b.Log.WithError(err).Errorf("XMPP set affiliation failed: room=%s jid=%s", mucJID, puppetJID)
416 }
417}
418
419func (b *Bxmpp) joinMuc(mucJID, nick, fromJID string) error {
420 pres := stanza.NewPresence(stanza.Attrs{
421 To: mucJID + "/" + nick,
422 From: fromJID,
423 })
424
425 mucExt := stanza.MucPresence{
426 History: stanza.History{
427 MaxStanzas: stanza.NewNullableInt(0),
428 },
429 }
430 pres.Extensions = append(pres.Extensions, mucExt)
431
432 if err := b.component.Send(pres); err != nil {
433 b.Log.WithError(err).Errorf("XMPP join presence failed: to=%s from=%s", mucJID+"/"+nick, fromJID)
434 return err
435 }
436 return nil
437}
438
439func (b *Bxmpp) sendMucMessage(mucJID, fromJID, body string) error {
440 b.Log.Debugf("XMPP send message: to=%s from=%s body_len=%d", mucJID, fromJID, len(body))
441 msg := stanza.NewMessage(stanza.Attrs{
442 Type: stanza.MessageTypeGroupchat,
443 To: mucJID,
444 From: fromJID,
445 })
446 msg.Body = body
447
448 if err := b.component.Send(msg); err != nil {
449 b.Log.WithError(err).Errorf("XMPP send failed: to=%s from=%s", mucJID, fromJID)
450 return err
451 }
452 return nil
453}
454
455func (b *Bxmpp) leaveAllRooms() {
456 b.mu.RLock()
457 defer b.mu.RUnlock()
458
459 for _, room := range b.rooms {
460 pres := stanza.NewPresence(stanza.Attrs{
461 Type: stanza.PresenceTypeUnavailable,
462 To: room.MucJID + "/" + room.BotNick,
463 From: b.botJID(),
464 })
465 _ = b.component.Send(pres)
466
467 for nick, jid := range room.PuppetNicks {
468 pres := stanza.NewPresence(stanza.Attrs{
469 Type: stanza.PresenceTypeUnavailable,
470 To: room.MucJID + "/" + nick,
471 From: jid,
472 })
473 _ = b.component.Send(pres)
474 }
475 }
476}
477
478func (b *Bxmpp) handleComponentError(err error) {
479 b.Log.WithError(err).Error("XMPP component error")
480 _ = b.component.Disconnect()
481 b.setConnected(false)
482 b.signalReconnect()
483}
484
485func (b *Bxmpp) signalReconnect() {
486 if b.reconnectCh == nil {
487 return
488 }
489 select {
490 case b.reconnectCh <- struct{}{}:
491 default:
492 }
493}
494
495func (b *Bxmpp) manageConnection() {
496 bf := &backoff.Backoff{
497 Min: time.Second,
498 Max: 5 * time.Minute,
499 Jitter: true,
500 }
501
502 for {
503 _, ok := <-b.reconnectCh
504 if !ok {
505 return
506 }
507
508 for {
509 d := bf.Duration()
510 b.Log.Infof("Reconnecting in %s.", d)
511 time.Sleep(d)
512
513 b.Log.Infof("Reconnecting now.")
514 if err := b.component.Resume(); err == nil {
515 b.setConnected(true)
516 bf.Reset()
517
518 b.Remote <- config.Message{
519 Username: "system",
520 Text: "rejoin",
521 Channel: "",
522 Account: b.Account,
523 Event: config.EventRejoinChannels,
524 }
525 break
526 }
527 b.Log.Warn("Failed to reconnect.")
528 }
529 }
530}
531
// parseRoomNick splits a MUC occupant JID of the form "room@service/nick"
// into its bare room JID and nickname.
//
// The split happens at the first "/" only: the part before it cannot
// contain a slash, whereas the nickname (the resource part) may. Two empty
// strings are returned when from contains no "/" at all.
func parseRoomNick(from string) (string, string) {
	slash := strings.Index(from, "/")
	if slash < 0 {
		return "", ""
	}
	return from[:slash], from[slash+1:]
}
539
// bareJID returns jid without its resource, i.e. everything before the
// first "/". A JID that has no "/" is returned unchanged.
func bareJID(jid string) string {
	slash := strings.IndexByte(jid, '/')
	if slash < 0 {
		return jid
	}
	return jid[:slash]
}
546
// removeRuneRange deletes the runes in the half-open range [start, end)
// from text. Offsets count runes, not bytes. text is returned unchanged when
// the range is empty or inverted, when start is negative, or when the range
// reaches past the end of text.
func removeRuneRange(text string, start, end int) string {
	if start < 0 || end <= start {
		return text
	}

	runes := []rune(text)
	// Here end > start, so end being in range implies start is as well.
	if end > len(runes) {
		return text
	}

	kept := make([]rune, 0, len(runes)-(end-start))
	kept = append(kept, runes[:start]...)
	kept = append(kept, runes[end:]...)
	return string(kept)
}
557
558func (b *Bxmpp) setConnected(state bool) {
559 b.Lock()
560 b.connected = state
561 b.Unlock()
562}
563
564func (b *Bxmpp) Connected() bool {
565 b.RLock()
566 defer b.RUnlock()
567 return b.connected
568}
569