open.go (4406B)
1 // GoToSocial 2 // Copyright (C) GoToSocial Authors admin@gotosocial.org 3 // SPDX-License-Identifier: AGPL-3.0-or-later 4 // 5 // This program is free software: you can redistribute it and/or modify 6 // it under the terms of the GNU Affero General Public License as published by 7 // the Free Software Foundation, either version 3 of the License, or 8 // (at your option) any later version. 9 // 10 // This program is distributed in the hope that it will be useful, 11 // but WITHOUT ANY WARRANTY; without even the implied warranty of 12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 // GNU Affero General Public License for more details. 14 // 15 // You should have received a copy of the GNU Affero General Public License 16 // along with this program. If not, see <http://www.gnu.org/licenses/>. 17 18 package stream 19 20 import ( 21 "context" 22 "errors" 23 "fmt" 24 25 "codeberg.org/gruf/go-kv" 26 "github.com/superseriousbusiness/gotosocial/internal/gtserror" 27 "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" 28 "github.com/superseriousbusiness/gotosocial/internal/id" 29 "github.com/superseriousbusiness/gotosocial/internal/log" 30 "github.com/superseriousbusiness/gotosocial/internal/stream" 31 ) 32 33 // Open returns a new Stream for the given account, which will contain a channel for passing messages back to the caller. 34 func (p *Processor) Open(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode) { 35 l := log.WithContext(ctx).WithFields(kv.Fields{ 36 {"account", account.ID}, 37 {"streamType", streamType}, 38 }...) 39 l.Debug("received open stream request") 40 41 var ( 42 streamID string 43 err error 44 ) 45 46 // Each stream needs a unique ID so we know to close it. 47 streamID, err = id.NewRandomULID() 48 if err != nil { 49 return nil, gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %w", err)) 50 } 51 52 // Each stream can be subscibed to multiple types. 53 // Record them in a set, and include the initial one 54 // if it was given to us. 55 streamTypes := map[string]any{} 56 if streamType != "" { 57 streamTypes[streamType] = true 58 } 59 60 newStream := &stream.Stream{ 61 ID: streamID, 62 StreamTypes: streamTypes, 63 Messages: make(chan *stream.Message, 100), 64 Hangup: make(chan interface{}, 1), 65 Connected: true, 66 } 67 go p.waitToCloseStream(account, newStream) 68 69 v, ok := p.streamMap.Load(account.ID) 70 if ok { 71 // There is an entry in the streamMap 72 // for this account. Parse it out. 73 streamsForAccount, ok := v.(*stream.StreamsForAccount) 74 if !ok { 75 return nil, gtserror.NewErrorInternalError(errors.New("stream map error")) 76 } 77 78 // Append new stream to existing entry. 79 streamsForAccount.Lock() 80 streamsForAccount.Streams = append(streamsForAccount.Streams, newStream) 81 streamsForAccount.Unlock() 82 } else { 83 // There is no entry in the streamMap for 84 // this account yet. Create one and store it. 85 p.streamMap.Store(account.ID, &stream.StreamsForAccount{ 86 Streams: []*stream.Stream{ 87 newStream, 88 }, 89 }) 90 } 91 92 return newStream, nil 93 } 94 95 // waitToCloseStream waits until the hangup channel is closed for the given stream. 96 // It then iterates through the map of streams stored by the processor, removes the stream from it, 97 // and then closes the messages channel of the stream to indicate that the channel should no longer be read from. 98 func (p *Processor) waitToCloseStream(account *gtsmodel.Account, thisStream *stream.Stream) { 99 <-thisStream.Hangup // wait for a hangup message 100 101 // lock the stream to prevent more messages being put in it while we work 102 thisStream.Lock() 103 defer thisStream.Unlock() 104 105 // indicate the stream is no longer connected 106 thisStream.Connected = false 107 108 // load and parse the entry for this account from the stream map 109 v, ok := p.streamMap.Load(account.ID) 110 if !ok || v == nil { 111 return 112 } 113 streamsForAccount, ok := v.(*stream.StreamsForAccount) 114 if !ok { 115 return 116 } 117 118 // lock the streams for account while we remove this stream from its slice 119 streamsForAccount.Lock() 120 defer streamsForAccount.Unlock() 121 122 // put everything into modified streams *except* the stream we're removing 123 modifiedStreams := []*stream.Stream{} 124 for _, s := range streamsForAccount.Streams { 125 if s.ID != thisStream.ID { 126 modifiedStreams = append(modifiedStreams, s) 127 } 128 } 129 streamsForAccount.Streams = modifiedStreams 130 131 // finally close the messages channel so no more messages can be read from it 132 close(thisStream.Messages) 133 }