gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

stream.go (2275B)


      1 // GoToSocial
      2 // Copyright (C) GoToSocial Authors admin@gotosocial.org
      3 // SPDX-License-Identifier: AGPL-3.0-or-later
      4 //
      5 // This program is free software: you can redistribute it and/or modify
      6 // it under the terms of the GNU Affero General Public License as published by
      7 // the Free Software Foundation, either version 3 of the License, or
      8 // (at your option) any later version.
      9 //
     10 // This program is distributed in the hope that it will be useful,
     11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
     12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     13 // GNU Affero General Public License for more details.
     14 //
     15 // You should have received a copy of the GNU Affero General Public License
     16 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17 
     18 package stream
     19 
     20 import (
     21 	"sync"
     22 
     23 	"github.com/superseriousbusiness/gotosocial/internal/oauth"
     24 	"github.com/superseriousbusiness/gotosocial/internal/state"
     25 	"github.com/superseriousbusiness/gotosocial/internal/stream"
     26 )
     27 
     28 type Processor struct {
     29 	state       *state.State
     30 	oauthServer oauth.Server
     31 	streamMap   sync.Map
     32 }
     33 
     34 func New(state *state.State, oauthServer oauth.Server) Processor {
     35 	return Processor{
     36 		state:       state,
     37 		oauthServer: oauthServer,
     38 	}
     39 }
     40 
     41 // toAccount streams the given payload with the given event type to any streams currently open for the given account ID.
     42 func (p *Processor) toAccount(payload string, event string, streamTypes []string, accountID string) error {
     43 	// Load all streams open for this account.
     44 	v, ok := p.streamMap.Load(accountID)
     45 	if !ok {
     46 		return nil // No entry = nothing to stream.
     47 	}
     48 	streamsForAccount := v.(*stream.StreamsForAccount) //nolint:forcetypeassert
     49 
     50 	streamsForAccount.Lock()
     51 	defer streamsForAccount.Unlock()
     52 
     53 	for _, s := range streamsForAccount.Streams {
     54 		s.Lock()
     55 		defer s.Unlock()
     56 
     57 		if !s.Connected {
     58 			continue
     59 		}
     60 
     61 	typeLoop:
     62 		for _, streamType := range streamTypes {
     63 			if _, found := s.StreamTypes[streamType]; found {
     64 				s.Messages <- &stream.Message{
     65 					Stream:  []string{streamType},
     66 					Event:   string(event),
     67 					Payload: payload,
     68 				}
     69 
     70 				// Break out to the outer loop,
     71 				// to avoid sending duplicates of
     72 				// the same event to the same stream.
     73 				break typeLoop
     74 			}
     75 		}
     76 	}
     77 
     78 	return nil
     79 }