stream.go (2275B)
1 // GoToSocial 2 // Copyright (C) GoToSocial Authors admin@gotosocial.org 3 // SPDX-License-Identifier: AGPL-3.0-or-later 4 // 5 // This program is free software: you can redistribute it and/or modify 6 // it under the terms of the GNU Affero General Public License as published by 7 // the Free Software Foundation, either version 3 of the License, or 8 // (at your option) any later version. 9 // 10 // This program is distributed in the hope that it will be useful, 11 // but WITHOUT ANY WARRANTY; without even the implied warranty of 12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 // GNU Affero General Public License for more details. 14 // 15 // You should have received a copy of the GNU Affero General Public License 16 // along with this program. If not, see <http://www.gnu.org/licenses/>. 17 18 package stream 19 20 import ( 21 "sync" 22 23 "github.com/superseriousbusiness/gotosocial/internal/oauth" 24 "github.com/superseriousbusiness/gotosocial/internal/state" 25 "github.com/superseriousbusiness/gotosocial/internal/stream" 26 ) 27 28 type Processor struct { 29 state *state.State 30 oauthServer oauth.Server 31 streamMap sync.Map 32 } 33 34 func New(state *state.State, oauthServer oauth.Server) Processor { 35 return Processor{ 36 state: state, 37 oauthServer: oauthServer, 38 } 39 } 40 41 // toAccount streams the given payload with the given event type to any streams currently open for the given account ID. 42 func (p *Processor) toAccount(payload string, event string, streamTypes []string, accountID string) error { 43 // Load all streams open for this account. 44 v, ok := p.streamMap.Load(accountID) 45 if !ok { 46 return nil // No entry = nothing to stream. 47 } 48 streamsForAccount := v.(*stream.StreamsForAccount) //nolint:forcetypeassert 49 50 streamsForAccount.Lock() 51 defer streamsForAccount.Unlock() 52 53 for _, s := range streamsForAccount.Streams { 54 s.Lock() 55 defer s.Unlock() 56 57 if !s.Connected { 58 continue 59 } 60 61 typeLoop: 62 for _, streamType := range streamTypes { 63 if _, found := s.StreamTypes[streamType]; found { 64 s.Messages <- &stream.Message{ 65 Stream: []string{streamType}, 66 Event: string(event), 67 Payload: payload, 68 } 69 70 // Break out to the outer loop, 71 // to avoid sending duplicates of 72 // the same event to the same stream. 73 break typeLoop 74 } 75 } 76 } 77 78 return nil 79 }