gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

open.go (4406B)


      1 // GoToSocial
      2 // Copyright (C) GoToSocial Authors admin@gotosocial.org
      3 // SPDX-License-Identifier: AGPL-3.0-or-later
      4 //
      5 // This program is free software: you can redistribute it and/or modify
      6 // it under the terms of the GNU Affero General Public License as published by
      7 // the Free Software Foundation, either version 3 of the License, or
      8 // (at your option) any later version.
      9 //
     10 // This program is distributed in the hope that it will be useful,
     11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
     12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     13 // GNU Affero General Public License for more details.
     14 //
     15 // You should have received a copy of the GNU Affero General Public License
     16 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17 
     18 package stream
     19 
     20 import (
     21 	"context"
     22 	"errors"
     23 	"fmt"
     24 
     25 	"codeberg.org/gruf/go-kv"
     26 	"github.com/superseriousbusiness/gotosocial/internal/gtserror"
     27 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
     28 	"github.com/superseriousbusiness/gotosocial/internal/id"
     29 	"github.com/superseriousbusiness/gotosocial/internal/log"
     30 	"github.com/superseriousbusiness/gotosocial/internal/stream"
     31 )
     32 
     33 // Open returns a new Stream for the given account, which will contain a channel for passing messages back to the caller.
     34 func (p *Processor) Open(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode) {
     35 	l := log.WithContext(ctx).WithFields(kv.Fields{
     36 		{"account", account.ID},
     37 		{"streamType", streamType},
     38 	}...)
     39 	l.Debug("received open stream request")
     40 
     41 	var (
     42 		streamID string
     43 		err      error
     44 	)
     45 
     46 	// Each stream needs a unique ID so we know to close it.
     47 	streamID, err = id.NewRandomULID()
     48 	if err != nil {
     49 		return nil, gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %w", err))
     50 	}
     51 
     52 	// Each stream can be subscibed to multiple types.
     53 	// Record them in a set, and include the initial one
     54 	// if it was given to us.
     55 	streamTypes := map[string]any{}
     56 	if streamType != "" {
     57 		streamTypes[streamType] = true
     58 	}
     59 
     60 	newStream := &stream.Stream{
     61 		ID:          streamID,
     62 		StreamTypes: streamTypes,
     63 		Messages:    make(chan *stream.Message, 100),
     64 		Hangup:      make(chan interface{}, 1),
     65 		Connected:   true,
     66 	}
     67 	go p.waitToCloseStream(account, newStream)
     68 
     69 	v, ok := p.streamMap.Load(account.ID)
     70 	if ok {
     71 		// There is an entry in the streamMap
     72 		// for this account. Parse it out.
     73 		streamsForAccount, ok := v.(*stream.StreamsForAccount)
     74 		if !ok {
     75 			return nil, gtserror.NewErrorInternalError(errors.New("stream map error"))
     76 		}
     77 
     78 		// Append new stream to existing entry.
     79 		streamsForAccount.Lock()
     80 		streamsForAccount.Streams = append(streamsForAccount.Streams, newStream)
     81 		streamsForAccount.Unlock()
     82 	} else {
     83 		// There is no entry in the streamMap for
     84 		// this account yet. Create one and store it.
     85 		p.streamMap.Store(account.ID, &stream.StreamsForAccount{
     86 			Streams: []*stream.Stream{
     87 				newStream,
     88 			},
     89 		})
     90 	}
     91 
     92 	return newStream, nil
     93 }
     94 
     95 // waitToCloseStream waits until the hangup channel is closed for the given stream.
     96 // It then iterates through the map of streams stored by the processor, removes the stream from it,
     97 // and then closes the messages channel of the stream to indicate that the channel should no longer be read from.
     98 func (p *Processor) waitToCloseStream(account *gtsmodel.Account, thisStream *stream.Stream) {
     99 	<-thisStream.Hangup // wait for a hangup message
    100 
    101 	// lock the stream to prevent more messages being put in it while we work
    102 	thisStream.Lock()
    103 	defer thisStream.Unlock()
    104 
    105 	// indicate the stream is no longer connected
    106 	thisStream.Connected = false
    107 
    108 	// load and parse the entry for this account from the stream map
    109 	v, ok := p.streamMap.Load(account.ID)
    110 	if !ok || v == nil {
    111 		return
    112 	}
    113 	streamsForAccount, ok := v.(*stream.StreamsForAccount)
    114 	if !ok {
    115 		return
    116 	}
    117 
    118 	// lock the streams for account while we remove this stream from its slice
    119 	streamsForAccount.Lock()
    120 	defer streamsForAccount.Unlock()
    121 
    122 	// put everything into modified streams *except* the stream we're removing
    123 	modifiedStreams := []*stream.Stream{}
    124 	for _, s := range streamsForAccount.Streams {
    125 		if s.ID != thisStream.ID {
    126 			modifiedStreams = append(modifiedStreams, s)
    127 		}
    128 	}
    129 	streamsForAccount.Streams = modifiedStreams
    130 
    131 	// finally close the messages channel so no more messages can be read from it
    132 	close(thisStream.Messages)
    133 }