Thumbnail

rani/matterbridge.git

Clone URL: https://git.buni.party/rani/matterbridge.git

Viewing file on branch master

1package api
2
import (
	"crypto/subtle"
	"encoding/base64"
	"encoding/json"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	"github.com/matterbridge-org/matterbridge/bridge"
	"github.com/matterbridge-org/matterbridge/bridge/config"
	"github.com/mitchellh/mapstructure"
	"github.com/olahol/melody"
	ring "github.com/zfjagann/golang-ring"
)
20
21type API struct {
22 Messages ring.Ring
23 sync.RWMutex
24 *bridge.Config
25 mrouter *melody.Melody
26}
27
// Message is a JSON-serializable chat message: a text body, the sender's
// name, user ID and avatar, and the name of a gateway.
type Message struct {
	Text     string `json:"text"`
	Username string `json:"username"`
	UserID   string `json:"userid"`
	Avatar   string `json:"avatar"`
	Gateway  string `json:"gateway"`
}
35
36func New(cfg *bridge.Config) bridge.Bridger {
37 b := &API{Config: cfg}
38 e := echo.New()
39 e.HideBanner = true
40 e.HidePort = true
41
42 b.mrouter = melody.New()
43 b.mrouter.HandleMessage(func(s *melody.Session, msg []byte) {
44 message := config.Message{}
45 err := json.Unmarshal(msg, &message)
46 if err != nil {
47 b.Log.Errorf("failed to decode message from byte[] '%s'", string(msg))
48 return
49 }
50 b.handleWebsocketMessage(message, s)
51 })
52 b.mrouter.HandleConnect(func(session *melody.Session) {
53 greet := b.getGreeting()
54 data, err := json.Marshal(greet)
55 if err != nil {
56 b.Log.Errorf("failed to encode message '%v'", greet)
57 return
58 }
59 err = session.Write(data)
60 if err != nil {
61 b.Log.Errorf("failed to write message '%s'", string(data))
62 return
63 }
64 // TODO: send message history buffer from `b.Messages` here
65 })
66
67 b.Messages = ring.Ring{}
68 if b.GetInt("Buffer") != 0 {
69 b.Messages.SetCapacity(b.GetInt("Buffer"))
70 }
71 if b.GetString("Token") != "" {
72 e.Use(middleware.KeyAuth(func(key string, c echo.Context) (bool, error) {
73 return key == b.GetString("Token"), nil
74 }))
75 }
76
77 // Set RemoteNickFormat to a sane default
78 if !b.IsKeySet("RemoteNickFormat") {
79 b.Log.Debugln("RemoteNickFormat is unset, defaulting to \"{NICK}\"")
80 b.Config.Config.Viper().Set(b.GetConfigKey("RemoteNickFormat"), "{NICK}")
81 }
82
83 e.GET("/api/health", b.handleHealthcheck)
84 e.GET("/api/messages", b.handleMessages)
85 e.GET("/api/stream", b.handleStream)
86 e.GET("/api/websocket", b.handleWebsocket)
87 e.POST("/api/message", b.handlePostMessage)
88 go func() {
89 if b.GetString("BindAddress") == "" {
90 b.Log.Fatalf("No BindAddress configured.")
91 }
92 b.Log.Infof("Listening on %s", b.GetString("BindAddress"))
93 b.Log.Fatal(e.Start(b.GetString("BindAddress")))
94 }()
95 return b
96}
97
98func (b *API) Connect() error {
99 return nil
100}
101
102func (b *API) Disconnect() error {
103 return nil
104}
105
106func (b *API) JoinChannel(channel config.ChannelInfo) error {
107 return nil
108}
109
110func (b *API) Send(msg config.Message) (string, error) {
111 b.Lock()
112 defer b.Unlock()
113 // ignore delete messages
114 if msg.Event == config.EventMsgDelete {
115 return "", nil
116 }
117 b.Log.Debugf("enqueueing message from %s on ring buffer", msg.Username)
118 b.Messages.Enqueue(msg)
119
120 data, err := json.Marshal(msg)
121 if err != nil {
122 b.Log.Errorf("failed to encode message '%s'", msg)
123 }
124 _ = b.mrouter.Broadcast(data)
125 return "", nil
126}
127
128func (b *API) handleHealthcheck(c echo.Context) error {
129 return c.String(http.StatusOK, "OK")
130}
131
132func (b *API) handlePostMessage(c echo.Context) error {
133 message := config.Message{}
134 if err := c.Bind(&message); err != nil {
135 return err
136 }
137 // these values are fixed
138 message.Channel = "api"
139 message.Protocol = "api"
140 message.Account = b.Account
141 message.ID = ""
142 message.Timestamp = time.Now()
143
144 var (
145 fm map[string]interface{}
146 ds string
147 ok bool
148 )
149
150 for i, f := range message.Extra["file"] {
151 fi := config.FileInfo{}
152 if fm, ok = f.(map[string]interface{}); !ok {
153 return echo.NewHTTPError(http.StatusInternalServerError, "invalid format for extra")
154 }
155 err := mapstructure.Decode(fm, &fi)
156 if err != nil {
157 if !strings.Contains(err.Error(), "got string") {
158 return err
159 }
160 }
161 // mapstructure doesn't decode base64 into []byte, so it must be done manually for fi.Data
162 if ds, ok = fm["Data"].(string); !ok {
163 return echo.NewHTTPError(http.StatusInternalServerError, "invalid format for data")
164 }
165
166 data, err := base64.StdEncoding.DecodeString(ds)
167 if err != nil {
168 return err
169 }
170 fi.Data = &data
171 message.Extra["file"][i] = fi
172 }
173 b.Log.Debugf("Sending message from %s on %s to gateway", message.Username, "api")
174 b.Remote <- message
175 return c.JSON(http.StatusOK, message)
176}
177
178func (b *API) handleMessages(c echo.Context) error {
179 b.Lock()
180 defer b.Unlock()
181 c.JSONPretty(http.StatusOK, b.Messages.Values(), " ")
182 b.Messages = ring.Ring{}
183 return nil
184}
185
186func (b *API) getGreeting() config.Message {
187 return config.Message{
188 Event: config.EventAPIConnected,
189 Timestamp: time.Now(),
190 }
191}
192
193func (b *API) handleStream(c echo.Context) error {
194 c.Response().Header().Set(echo.HeaderContentType, echo.MIMEApplicationJSON)
195 c.Response().WriteHeader(http.StatusOK)
196 greet := b.getGreeting()
197 if err := json.NewEncoder(c.Response()).Encode(greet); err != nil {
198 return err
199 }
200 c.Response().Flush()
201 for {
202 select {
203 // TODO: this causes issues, messages should be broadcasted to all connected clients
204 default:
205 msg := b.Messages.Dequeue()
206 if msg != nil {
207 if err := json.NewEncoder(c.Response()).Encode(msg); err != nil {
208 return err
209 }
210 c.Response().Flush()
211 }
212 time.Sleep(100 * time.Millisecond)
213 case <-c.Request().Context().Done():
214 return nil
215 }
216 }
217}
218
219func (b *API) handleWebsocketMessage(message config.Message, s *melody.Session) {
220 message.Channel = "api"
221 message.Protocol = "api"
222 message.Account = b.Account
223 message.ID = ""
224 message.Timestamp = time.Now()
225
226 data, err := json.Marshal(message)
227 if err != nil {
228 b.Log.Errorf("failed to encode message for loopback '%v'", message)
229 return
230 }
231 _ = b.mrouter.BroadcastOthers(data, s)
232
233 b.Log.Debugf("Sending websocket message from %s on %s to gateway", message.Username, "api")
234 b.Remote <- message
235}
236
237func (b *API) handleWebsocket(c echo.Context) error {
238 err := b.mrouter.HandleRequest(c.Response(), c.Request())
239 if err != nil {
240 b.Log.Errorf("error in websocket handling '%v'", err)
241 return err
242 }
243
244 return nil
245}
246