stream.go (10635B)
1 // GoToSocial 2 // Copyright (C) GoToSocial Authors admin@gotosocial.org 3 // SPDX-License-Identifier: AGPL-3.0-or-later 4 // 5 // This program is free software: you can redistribute it and/or modify 6 // it under the terms of the GNU Affero General Public License as published by 7 // the Free Software Foundation, either version 3 of the License, or 8 // (at your option) any later version. 9 // 10 // This program is distributed in the hope that it will be useful, 11 // but WITHOUT ANY WARRANTY; without even the implied warranty of 12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 // GNU Affero General Public License for more details. 14 // 15 // You should have received a copy of the GNU Affero General Public License 16 // along with this program. If not, see <http://www.gnu.org/licenses/>. 17 18 package streaming 19 20 import ( 21 "context" 22 "time" 23 24 "codeberg.org/gruf/go-kv" 25 apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" 26 "github.com/superseriousbusiness/gotosocial/internal/gtserror" 27 "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" 28 "github.com/superseriousbusiness/gotosocial/internal/log" 29 "github.com/superseriousbusiness/gotosocial/internal/oauth" 30 streampkg "github.com/superseriousbusiness/gotosocial/internal/stream" 31 "golang.org/x/exp/slices" 32 33 "github.com/gin-gonic/gin" 34 "github.com/gorilla/websocket" 35 ) 36 37 // StreamGETHandler swagger:operation GET /api/v1/streaming streamGet 38 // 39 // Initiate a websocket connection for live streaming of statuses and notifications. 40 // 41 // The scheme used should *always* be `wss`. The streaming basepath can be viewed at `/api/v1/instance`. 42 // 43 // On a successful connection, a code `101` will be returned, which indicates that the connection is being upgraded to a secure websocket connection. 44 // 45 // As long as the connection is open, various message types will be streamed into it. 46 // 47 // GoToSocial will ping the connection every 30 seconds to check whether the client is still receiving. 48 // 49 // If the ping fails, or something else goes wrong during transmission, then the connection will be dropped, and the client will be expected to start it again. 50 // 51 // --- 52 // tags: 53 // - streaming 54 // 55 // produces: 56 // - application/json 57 // 58 // schemes: 59 // - wss 60 // 61 // parameters: 62 // - 63 // name: access_token 64 // type: string 65 // description: Access token for the requesting account. 66 // in: query 67 // required: true 68 // - 69 // name: stream 70 // type: string 71 // description: |- 72 // Type of stream to request. 73 // 74 // Options are: 75 // 76 // `user`: receive updates for the account's home timeline. 77 // `public`: receive updates for the public timeline. 78 // `public:local`: receive updates for the local timeline. 79 // `hashtag`: receive updates for a given hashtag. 80 // `hashtag:local`: receive local updates for a given hashtag. 81 // `list`: receive updates for a certain list of accounts. 82 // `direct`: receive updates for direct messages. 83 // in: query 84 // required: true 85 // - 86 // name: list 87 // type: string 88 // description: |- 89 // ID of the list to subscribe to. 90 // Only used if stream type is 'list'. 91 // in: query 92 // - 93 // name: tag 94 // type: string 95 // description: |- 96 // Name of the tag to subscribe to. 97 // Only used if stream type is 'hashtag' or 'hashtag:local'. 98 // in: query 99 // 100 // security: 101 // - OAuth2 Bearer: 102 // - read:streaming 103 // 104 // responses: 105 // '101': 106 // schema: 107 // type: object 108 // properties: 109 // stream: 110 // type: array 111 // items: 112 // type: string 113 // enum: 114 // - user 115 // - public 116 // - public:local 117 // - hashtag 118 // - hashtag:local 119 // - list 120 // - direct 121 // event: 122 // description: |- 123 // The type of event being received. 124 // 125 // `update`: a new status has been received. 126 // `notification`: a new notification has been received. 127 // `delete`: a status has been deleted. 128 // `filters_changed`: not implemented. 129 // type: string 130 // enum: 131 // - update 132 // - notification 133 // - delete 134 // - filters_changed 135 // payload: 136 // description: |- 137 // The payload of the streamed message. 138 // Different depending on the `event` type. 139 // 140 // If present, it should be parsed as a string. 141 // 142 // If `event` = `update`, then the payload will be a JSON string of a status. 143 // If `event` = `notification`, then the payload will be a JSON string of a notification. 144 // If `event` = `delete`, then the payload will be a status ID. 145 // type: string 146 // example: "{\"id\":\"01FC3TZ5CFG6H65GCKCJRKA669\",\"created_at\":\"2021-08-02T16:25:52Z\",\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://gts.superseriousbusiness.org/users/dumpsterqueer/statuses/01FC3TZ5CFG6H65GCKCJRKA669\",\"url\":\"https://gts.superseriousbusiness.org/@dumpsterqueer/statuses/01FC3TZ5CFG6H65GCKCJRKA669\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"bookmarked\":fals…//gts.superseriousbusiness.org/fileserver/01JNN207W98SGG3CBJ76R5MVDN/header/original/019036W043D8FXPJKSKCX7G965.png\",\"header_static\":\"https://gts.superseriousbusiness.org/fileserver/01JNN207W98SGG3CBJ76R5MVDN/header/small/019036W043D8FXPJKSKCX7G965.png\",\"followers_count\":33,\"following_count\":28,\"statuses_count\":126,\"last_status_at\":\"2021-08-02T16:25:52Z\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null,\"text\":\"a\"}" 147 // '401': 148 // description: unauthorized 149 // '400': 150 // description: bad request 151 func (m *Module) StreamGETHandler(c *gin.Context) { 152 153 // First we check for a query param provided access token 154 token := c.Query(AccessTokenQueryKey) 155 if token == "" { 156 // Else we check the HTTP header provided token 157 token = c.GetHeader(AccessTokenHeader) 158 } 159 160 var account *gtsmodel.Account 161 if token != "" { 162 // Check the explicit token 163 var errWithCode gtserror.WithCode 164 account, errWithCode = m.processor.Stream().Authorize(c.Request.Context(), token) 165 if errWithCode != nil { 166 apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1) 167 return 168 } 169 } else { 170 // If no explicit token was provided, try regular oauth 171 auth, errStr := oauth.Authed(c, true, true, true, true) 172 if errStr != nil { 173 err := gtserror.NewErrorUnauthorized(errStr, errStr.Error()) 174 apiutil.ErrorHandler(c, err, m.processor.InstanceGetV1) 175 return 176 } 177 account = auth.Account 178 } 179 180 // Get the initial stream type, if there is one. 181 // By appending other query params to the streamType, 182 // we can allow for streaming for specific list IDs 183 // or hashtags. 184 streamType := c.Query(StreamQueryKey) 185 if list := c.Query(StreamListKey); list != "" { 186 streamType += ":" + list 187 } else if tag := c.Query(StreamTagKey); tag != "" { 188 streamType += ":" + tag 189 } 190 191 stream, errWithCode := m.processor.Stream().Open(c.Request.Context(), account, streamType) 192 if errWithCode != nil { 193 apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1) 194 return 195 } 196 197 l := log.WithContext(c.Request.Context()). 198 WithFields(kv.Fields{ 199 {"account", account.Username}, 200 {"streamID", stream.ID}, 201 {"streamType", streamType}, 202 }...) 203 204 // Upgrade the incoming HTTP request, which hijacks the underlying 205 // connection and reuses it for the websocket (non-http) protocol. 206 wsConn, err := m.wsUpgrade.Upgrade(c.Writer, c.Request, nil) 207 if err != nil { 208 l.Errorf("error upgrading websocket connection: %v", err) 209 close(stream.Hangup) 210 return 211 } 212 213 go func() { 214 // We perform the main websocket send loop in a separate 215 // goroutine in order to let the upgrade handler return. 216 // This prevents the upgrade handler from holding open any 217 // throttle / rate-limit request tokens which could become 218 // problematic on instances with multiple users. 219 l.Info("opened websocket connection") 220 defer l.Info("closed websocket connection") 221 222 // Create new context for lifetime of the connection 223 ctx, cncl := context.WithCancel(context.Background()) 224 225 // Create ticker to send alive pings 226 pinger := time.NewTicker(m.dTicker) 227 228 defer func() { 229 // Signal done 230 cncl() 231 232 // Close websocket conn 233 _ = wsConn.Close() 234 235 // Close processor stream 236 close(stream.Hangup) 237 238 // Stop ping ticker 239 pinger.Stop() 240 }() 241 242 go func() { 243 // Signal done 244 defer cncl() 245 246 for { 247 // We have to listen for received websocket messages in 248 // order to trigger the underlying wsConn.PingHandler(). 249 // 250 // Read JSON objects from the client and act on them 251 var msg map[string]string 252 err := wsConn.ReadJSON(&msg) 253 if err != nil { 254 if ctx.Err() == nil { 255 // Only log error if the connection was not closed 256 // by us. Uncanceled context indicates this is the case. 257 l.Errorf("error reading from websocket: %v", err) 258 } 259 return 260 } 261 l.Tracef("received message from websocket: %v", msg) 262 263 // If the message contains 'stream' and 'type' fields, we can 264 // update the set of timelines that are subscribed for events. 265 updateType, ok := msg["type"] 266 if !ok { 267 l.Warn("'type' field not provided") 268 continue 269 } 270 271 updateStream, ok := msg["stream"] 272 if !ok { 273 l.Warn("'stream' field not provided") 274 continue 275 } 276 277 // Ignore if the updateStreamType is unknown (or missing), 278 // so a bad client can't cause extra memory allocations 279 if !slices.Contains(streampkg.AllStatusTimelines, updateStream) { 280 l.Warnf("unknown 'stream' field: %v", msg) 281 continue 282 } 283 284 updateList, ok := msg["list"] 285 if ok { 286 updateStream += ":" + updateList 287 } 288 289 switch updateType { 290 case "subscribe": 291 stream.Lock() 292 stream.StreamTypes[updateStream] = true 293 stream.Unlock() 294 case "unsubscribe": 295 stream.Lock() 296 delete(stream.StreamTypes, updateStream) 297 stream.Unlock() 298 default: 299 l.Warnf("invalid 'type' field: %v", msg) 300 } 301 } 302 }() 303 304 for { 305 select { 306 // Connection closed 307 case <-ctx.Done(): 308 return 309 310 // Received next stream message 311 case msg := <-stream.Messages: 312 l.Tracef("sending message to websocket: %+v", msg) 313 if err := wsConn.WriteJSON(msg); err != nil { 314 l.Debugf("error writing json to websocket: %v", err) 315 return 316 } 317 318 // Reset on each successful send. 319 pinger.Reset(m.dTicker) 320 321 // Send keep-alive "ping" 322 case <-pinger.C: 323 l.Trace("pinging websocket ...") 324 if err := wsConn.WriteMessage( 325 websocket.PingMessage, 326 []byte{}, 327 ); err != nil { 328 l.Debugf("error writing ping to websocket: %v", err) 329 return 330 } 331 } 332 } 333 }() 334 }