gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

stream.go (10635B)


      1 // GoToSocial
      2 // Copyright (C) GoToSocial Authors admin@gotosocial.org
      3 // SPDX-License-Identifier: AGPL-3.0-or-later
      4 //
      5 // This program is free software: you can redistribute it and/or modify
      6 // it under the terms of the GNU Affero General Public License as published by
      7 // the Free Software Foundation, either version 3 of the License, or
      8 // (at your option) any later version.
      9 //
     10 // This program is distributed in the hope that it will be useful,
     11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
     12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     13 // GNU Affero General Public License for more details.
     14 //
     15 // You should have received a copy of the GNU Affero General Public License
     16 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17 
     18 package streaming
     19 
     20 import (
     21 	"context"
     22 	"time"
     23 
     24 	"codeberg.org/gruf/go-kv"
     25 	apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
     26 	"github.com/superseriousbusiness/gotosocial/internal/gtserror"
     27 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
     28 	"github.com/superseriousbusiness/gotosocial/internal/log"
     29 	"github.com/superseriousbusiness/gotosocial/internal/oauth"
     30 	streampkg "github.com/superseriousbusiness/gotosocial/internal/stream"
     31 	"golang.org/x/exp/slices"
     32 
     33 	"github.com/gin-gonic/gin"
     34 	"github.com/gorilla/websocket"
     35 )
     36 
     37 // StreamGETHandler swagger:operation GET /api/v1/streaming streamGet
     38 //
     39 // Initiate a websocket connection for live streaming of statuses and notifications.
     40 //
     41 // The scheme used should *always* be `wss`. The streaming basepath can be viewed at `/api/v1/instance`.
     42 //
     43 // On a successful connection, a code `101` will be returned, which indicates that the connection is being upgraded to a secure websocket connection.
     44 //
     45 // As long as the connection is open, various message types will be streamed into it.
     46 //
     47 // GoToSocial will ping the connection every 30 seconds to check whether the client is still receiving.
     48 //
     49 // If the ping fails, or something else goes wrong during transmission, then the connection will be dropped, and the client will be expected to start it again.
     50 //
     51 //	---
     52 //	tags:
     53 //	- streaming
     54 //
     55 //	produces:
     56 //	- application/json
     57 //
     58 //	schemes:
     59 //	- wss
     60 //
     61 //	parameters:
     62 //	-
     63 //		name: access_token
     64 //		type: string
     65 //		description: Access token for the requesting account.
     66 //		in: query
     67 //		required: true
     68 //	-
     69 //		name: stream
     70 //		type: string
     71 //		description: |-
     72 //			Type of stream to request.
     73 //
     74 //			Options are:
     75 //
     76 //			`user`: receive updates for the account's home timeline.
     77 //			`public`: receive updates for the public timeline.
     78 //			`public:local`: receive updates for the local timeline.
     79 //			`hashtag`: receive updates for a given hashtag.
     80 //			`hashtag:local`: receive local updates for a given hashtag.
     81 //			`list`: receive updates for a certain list of accounts.
     82 //			`direct`: receive updates for direct messages.
     83 //		in: query
     84 //		required: true
     85 //	-
     86 //		name: list
     87 //		type: string
     88 //		description: |-
     89 //			ID of the list to subscribe to.
     90 //			Only used if stream type is 'list'.
     91 //		in: query
     92 //	-
     93 //		name: tag
     94 //		type: string
     95 //		description: |-
     96 //			Name of the tag to subscribe to.
     97 //			Only used if stream type is 'hashtag' or 'hashtag:local'.
     98 //		in: query
     99 //
    100 //	security:
    101 //	- OAuth2 Bearer:
    102 //		- read:streaming
    103 //
    104 //	responses:
    105 //		'101':
    106 //			schema:
    107 //				type: object
    108 //				properties:
    109 //					stream:
    110 //						type: array
    111 //						items:
    112 //							type: string
    113 //							enum:
    114 //							- user
    115 //							- public
    116 //							- public:local
    117 //							- hashtag
    118 //							- hashtag:local
    119 //							- list
    120 //							- direct
    121 //					event:
    122 //						description: |-
    123 //							The type of event being received.
    124 //
    125 //							`update`: a new status has been received.
    126 //							`notification`: a new notification has been received.
    127 //							`delete`: a status has been deleted.
    128 //							`filters_changed`: not implemented.
    129 //						type: string
    130 //						enum:
    131 //						- update
    132 //						- notification
    133 //						- delete
    134 //						- filters_changed
    135 //					payload:
    136 //						description: |-
    137 //							The payload of the streamed message.
    138 //							Different depending on the `event` type.
    139 //
    140 //							If present, it should be parsed as a string.
    141 //
    142 //							If `event` = `update`, then the payload will be a JSON string of a status.
    143 //							If `event` = `notification`, then the payload will be a JSON string of a notification.
    144 //							If `event` = `delete`, then the payload will be a status ID.
    145 //						type: string
    146 //						example: "{\"id\":\"01FC3TZ5CFG6H65GCKCJRKA669\",\"created_at\":\"2021-08-02T16:25:52Z\",\"sensitive\":false,\"spoiler_text\":\"\",\"visibility\":\"public\",\"language\":\"en\",\"uri\":\"https://gts.superseriousbusiness.org/users/dumpsterqueer/statuses/01FC3TZ5CFG6H65GCKCJRKA669\",\"url\":\"https://gts.superseriousbusiness.org/@dumpsterqueer/statuses/01FC3TZ5CFG6H65GCKCJRKA669\",\"replies_count\":0,\"reblogs_count\":0,\"favourites_count\":0,\"favourited\":false,\"reblogged\":false,\"muted\":false,\"bookmarked\":fals…//gts.superseriousbusiness.org/fileserver/01JNN207W98SGG3CBJ76R5MVDN/header/original/019036W043D8FXPJKSKCX7G965.png\",\"header_static\":\"https://gts.superseriousbusiness.org/fileserver/01JNN207W98SGG3CBJ76R5MVDN/header/small/019036W043D8FXPJKSKCX7G965.png\",\"followers_count\":33,\"following_count\":28,\"statuses_count\":126,\"last_status_at\":\"2021-08-02T16:25:52Z\",\"emojis\":[],\"fields\":[]},\"media_attachments\":[],\"mentions\":[],\"tags\":[],\"emojis\":[],\"card\":null,\"poll\":null,\"text\":\"a\"}"
    147 //		'401':
    148 //			description: unauthorized
    149 //		'400':
    150 //			description: bad request
    151 func (m *Module) StreamGETHandler(c *gin.Context) {
    152 
    153 	// First we check for a query param provided access token
    154 	token := c.Query(AccessTokenQueryKey)
    155 	if token == "" {
    156 		// Else we check the HTTP header provided token
    157 		token = c.GetHeader(AccessTokenHeader)
    158 	}
    159 
    160 	var account *gtsmodel.Account
    161 	if token != "" {
    162 		// Check the explicit token
    163 		var errWithCode gtserror.WithCode
    164 		account, errWithCode = m.processor.Stream().Authorize(c.Request.Context(), token)
    165 		if errWithCode != nil {
    166 			apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1)
    167 			return
    168 		}
    169 	} else {
    170 		// If no explicit token was provided, try regular oauth
    171 		auth, errStr := oauth.Authed(c, true, true, true, true)
    172 		if errStr != nil {
    173 			err := gtserror.NewErrorUnauthorized(errStr, errStr.Error())
    174 			apiutil.ErrorHandler(c, err, m.processor.InstanceGetV1)
    175 			return
    176 		}
    177 		account = auth.Account
    178 	}
    179 
    180 	// Get the initial stream type, if there is one.
    181 	// By appending other query params to the streamType,
    182 	// we can allow for streaming for specific list IDs
    183 	// or hashtags.
    184 	streamType := c.Query(StreamQueryKey)
    185 	if list := c.Query(StreamListKey); list != "" {
    186 		streamType += ":" + list
    187 	} else if tag := c.Query(StreamTagKey); tag != "" {
    188 		streamType += ":" + tag
    189 	}
    190 
    191 	stream, errWithCode := m.processor.Stream().Open(c.Request.Context(), account, streamType)
    192 	if errWithCode != nil {
    193 		apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1)
    194 		return
    195 	}
    196 
    197 	l := log.WithContext(c.Request.Context()).
    198 		WithFields(kv.Fields{
    199 			{"account", account.Username},
    200 			{"streamID", stream.ID},
    201 			{"streamType", streamType},
    202 		}...)
    203 
    204 	// Upgrade the incoming HTTP request, which hijacks the underlying
    205 	// connection and reuses it for the websocket (non-http) protocol.
    206 	wsConn, err := m.wsUpgrade.Upgrade(c.Writer, c.Request, nil)
    207 	if err != nil {
    208 		l.Errorf("error upgrading websocket connection: %v", err)
    209 		close(stream.Hangup)
    210 		return
    211 	}
    212 
    213 	go func() {
    214 		// We perform the main websocket send loop in a separate
    215 		// goroutine in order to let the upgrade handler return.
    216 		// This prevents the upgrade handler from holding open any
    217 		// throttle / rate-limit request tokens which could become
    218 		// problematic on instances with multiple users.
    219 		l.Info("opened websocket connection")
    220 		defer l.Info("closed websocket connection")
    221 
    222 		// Create new context for lifetime of the connection
    223 		ctx, cncl := context.WithCancel(context.Background())
    224 
    225 		// Create ticker to send alive pings
    226 		pinger := time.NewTicker(m.dTicker)
    227 
    228 		defer func() {
    229 			// Signal done
    230 			cncl()
    231 
    232 			// Close websocket conn
    233 			_ = wsConn.Close()
    234 
    235 			// Close processor stream
    236 			close(stream.Hangup)
    237 
    238 			// Stop ping ticker
    239 			pinger.Stop()
    240 		}()
    241 
    242 		go func() {
    243 			// Signal done
    244 			defer cncl()
    245 
    246 			for {
    247 				// We have to listen for received websocket messages in
    248 				// order to trigger the underlying wsConn.PingHandler().
    249 				//
    250 				// Read JSON objects from the client and act on them
    251 				var msg map[string]string
    252 				err := wsConn.ReadJSON(&msg)
    253 				if err != nil {
    254 					if ctx.Err() == nil {
    255 						// Only log error if the connection was not closed
    256 						// by us. Uncanceled context indicates this is the case.
    257 						l.Errorf("error reading from websocket: %v", err)
    258 					}
    259 					return
    260 				}
    261 				l.Tracef("received message from websocket: %v", msg)
    262 
    263 				// If the message contains 'stream' and 'type' fields, we can
    264 				// update the set of timelines that are subscribed for events.
    265 				updateType, ok := msg["type"]
    266 				if !ok {
    267 					l.Warn("'type' field not provided")
    268 					continue
    269 				}
    270 
    271 				updateStream, ok := msg["stream"]
    272 				if !ok {
    273 					l.Warn("'stream' field not provided")
    274 					continue
    275 				}
    276 
    277 				// Ignore if the updateStreamType is unknown (or missing),
    278 				// so a bad client can't cause extra memory allocations
    279 				if !slices.Contains(streampkg.AllStatusTimelines, updateStream) {
    280 					l.Warnf("unknown 'stream' field: %v", msg)
    281 					continue
    282 				}
    283 
    284 				updateList, ok := msg["list"]
    285 				if ok {
    286 					updateStream += ":" + updateList
    287 				}
    288 
    289 				switch updateType {
    290 				case "subscribe":
    291 					stream.Lock()
    292 					stream.StreamTypes[updateStream] = true
    293 					stream.Unlock()
    294 				case "unsubscribe":
    295 					stream.Lock()
    296 					delete(stream.StreamTypes, updateStream)
    297 					stream.Unlock()
    298 				default:
    299 					l.Warnf("invalid 'type' field: %v", msg)
    300 				}
    301 			}
    302 		}()
    303 
    304 		for {
    305 			select {
    306 			// Connection closed
    307 			case <-ctx.Done():
    308 				return
    309 
    310 			// Received next stream message
    311 			case msg := <-stream.Messages:
    312 				l.Tracef("sending message to websocket: %+v", msg)
    313 				if err := wsConn.WriteJSON(msg); err != nil {
    314 					l.Debugf("error writing json to websocket: %v", err)
    315 					return
    316 				}
    317 
    318 				// Reset on each successful send.
    319 				pinger.Reset(m.dTicker)
    320 
    321 			// Send keep-alive "ping"
    322 			case <-pinger.C:
    323 				l.Trace("pinging websocket ...")
    324 				if err := wsConn.WriteMessage(
    325 					websocket.PingMessage,
    326 					[]byte{},
    327 				); err != nil {
    328 					l.Debugf("error writing ping to websocket: %v", err)
    329 					return
    330 				}
    331 			}
    332 		}
    333 	}()
    334 }