commit 3caae376e77a270f57733093163eafa3db8c71bc
parent a7882fabc7831c7e43e1611b886eb3d3a1daa1b1
Author: tobi <31960611+tsmethurst@users.noreply.github.com>
Date: Mon, 22 Nov 2021 19:03:21 +0100
Fix streamed messages ending up in wrong timeline(s) (#325)
* define timeline consts
* remove double stream of status
* change test stream creation up a bit
* stream messages more selectively
* add test for streaming new status creation via clientAPI
* tidy code + comments a bit
* tidy up tests
* make sure new status isn't streamed to public
Diffstat:
10 files changed, 208 insertions(+), 50 deletions(-)
diff --git a/internal/processing/fromclientapi_test.go b/internal/processing/fromclientapi_test.go
@@ -0,0 +1,118 @@
+/*
+ GoToSocial
+ Copyright (C) 2021 GoToSocial Authors admin@gotosocial.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package processing_test
+
+import (
+ "context"
+ "encoding/json"
+ "testing"
+
+ "github.com/stretchr/testify/suite"
+ "github.com/superseriousbusiness/gotosocial/internal/ap"
+ "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/messages"
+ "github.com/superseriousbusiness/gotosocial/internal/stream"
+ "github.com/superseriousbusiness/gotosocial/testrig"
+)
+
+type FromClientAPITestSuite struct {
+ ProcessingStandardTestSuite
+}
+
+func (suite *FromClientAPITestSuite) TestProcessStreamNewStatus() {
+ ctx := context.Background()
+
+ // let's say that the admin account posts a new status: it should end up in the
+ // timeline of any account that follows it and has a stream open
+ postingAccount := suite.testAccounts["admin_account"]
+ receivingAccount := suite.testAccounts["local_account_1"]
+
+ // open a home timeline stream for zork
+ wssStream, errWithCode := suite.processor.OpenStreamForAccount(ctx, receivingAccount, stream.TimelineHome)
+ suite.NoError(errWithCode)
+
+ // open another stream for zork, but for a different timeline;
+ // this shouldn't get stuff streamed into it, since it's for the public timeline
+ irrelevantStream, errWithCode := suite.processor.OpenStreamForAccount(ctx, receivingAccount, stream.TimelinePublic)
+ suite.NoError(errWithCode)
+
+ // make a new status from admin account
+ newStatus := >smodel.Status{
+ ID: "01FN4B2F88TF9676DYNXWE1WSS",
+ URI: "http://localhost:8080/users/admin/statuses/01FN4B2F88TF9676DYNXWE1WSS",
+ URL: "http://localhost:8080/@admin/statuses/01FN4B2F88TF9676DYNXWE1WSS",
+ Content: "this status should stream :)",
+ AttachmentIDs: []string{},
+ TagIDs: []string{},
+ MentionIDs: []string{},
+ EmojiIDs: []string{},
+ CreatedAt: testrig.TimeMustParse("2021-10-20T11:36:45Z"),
+ UpdatedAt: testrig.TimeMustParse("2021-10-20T11:36:45Z"),
+ Local: true,
+ AccountURI: "http://localhost:8080/users/admin",
+ AccountID: "01F8MH17FWEB39HZJ76B6VXSKF",
+ InReplyToID: "",
+ BoostOfID: "",
+ ContentWarning: "",
+ Visibility: gtsmodel.VisibilityFollowersOnly,
+ Sensitive: false,
+ Language: "en",
+ CreatedWithApplicationID: "01F8MGXQRHYF5QPMTMXP78QC2F",
+ Federated: false, // set federated as false for this one, since we're not testing federation stuff now
+ Boostable: true,
+ Replyable: true,
+ Likeable: true,
+ ActivityStreamsType: ap.ObjectNote,
+ }
+
+ // put the status in the db first, to mimic what would have already happened earlier up the flow
+ err := suite.db.PutStatus(ctx, newStatus)
+ suite.NoError(err)
+
+ // process the new status
+ err = suite.processor.ProcessFromClientAPI(ctx, messages.FromClientAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: newStatus,
+ OriginAccount: postingAccount,
+ })
+ suite.NoError(err)
+
+ // zork's stream should have the newly created status in it now
+ msg := <-wssStream.Messages
+ suite.Equal(stream.EventTypeUpdate, msg.Event)
+ suite.NotEmpty(msg.Payload)
+ suite.EqualValues([]string{stream.TimelineHome}, msg.Stream)
+ statusStreamed := &model.Status{}
+ err = json.Unmarshal([]byte(msg.Payload), statusStreamed)
+ suite.NoError(err)
+ suite.Equal("01FN4B2F88TF9676DYNXWE1WSS", statusStreamed.ID)
+ suite.Equal("this status should stream :)", statusStreamed.Content)
+
+ // and stream should now be empty
+ suite.Empty(wssStream.Messages)
+
+ // the irrelevant messages stream should also be empty
+ suite.Empty(irrelevantStream.Messages)
+}
+
+func TestFromClientAPITestSuite(t *testing.T) {
+ suite.Run(t, &FromClientAPITestSuite{})
+}
diff --git a/internal/processing/fromcommon.go b/internal/processing/fromcommon.go
@@ -27,6 +27,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/id"
+ "github.com/superseriousbusiness/gotosocial/internal/stream"
)
func (p *processor) notifyStatus(ctx context.Context, status *gtsmodel.Status) error {
@@ -328,6 +329,8 @@ func (p *processor) notifyAnnounce(ctx context.Context, status *gtsmodel.Status)
return nil
}
+// timelineStatus processes the given new status and inserts it into
+// the HOME timelines of accounts that follow the status author.
func (p *processor) timelineStatus(ctx context.Context, status *gtsmodel.Status) error {
// make sure the author account is pinned onto the status
if status.Account == nil {
@@ -376,14 +379,18 @@ func (p *processor) timelineStatus(ctx context.Context, status *gtsmodel.Status)
close(errors)
if len(errs) != 0 {
- // we have some errors
+ // we have at least one error
return fmt.Errorf("timelineStatus: one or more errors timelining statuses: %s", strings.Join(errs, ";"))
}
- // no errors, nice
return nil
}
+// timelineStatusForAccount puts the given status in the HOME timeline
+// of the account with given accountID, if it's hometimelineable.
+//
+// If the status was inserted into the home timeline of the given account,
+// it will also be streamed via websockets to the user.
func (p *processor) timelineStatusForAccount(ctx context.Context, status *gtsmodel.Status, accountID string, errors chan error, wg *sync.WaitGroup) {
defer wg.Done()
@@ -412,28 +419,22 @@ func (p *processor) timelineStatusForAccount(ctx context.Context, status *gtsmod
return
}
- // the status was inserted to stream it to the user
+ // the status was inserted so stream it to the user
if inserted {
apiStatus, err := p.tc.StatusToAPIStatus(ctx, status, timelineAccount)
if err != nil {
errors <- fmt.Errorf("timelineStatusForAccount: error converting status %s to frontend representation: %s", status.ID, err)
- } else {
- if err := p.streamingProcessor.StreamUpdateToAccount(apiStatus, timelineAccount); err != nil {
- errors <- fmt.Errorf("timelineStatusForAccount: error streaming status %s: %s", status.ID, err)
- }
+ return
}
- }
- apiStatus, err := p.tc.StatusToAPIStatus(ctx, status, timelineAccount)
- if err != nil {
- errors <- fmt.Errorf("timelineStatusForAccount: error converting status %s to frontend representation: %s", status.ID, err)
- } else {
- if err := p.streamingProcessor.StreamUpdateToAccount(apiStatus, timelineAccount); err != nil {
+ if err := p.streamingProcessor.StreamUpdateToAccount(apiStatus, timelineAccount, stream.TimelineHome); err != nil {
errors <- fmt.Errorf("timelineStatusForAccount: error streaming status %s: %s", status.ID, err)
}
}
}
+// deleteStatusFromTimelines completely removes the given status from all timelines.
+// It will also stream deletion of the status to all open streams.
func (p *processor) deleteStatusFromTimelines(ctx context.Context, status *gtsmodel.Status) error {
if err := p.timelineManager.WipeStatusFromAllTimelines(ctx, status.ID); err != nil {
return err
diff --git a/internal/processing/fromfederator_test.go b/internal/processing/fromfederator_test.go
@@ -32,6 +32,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/messages"
+ "github.com/superseriousbusiness/gotosocial/internal/stream"
"github.com/superseriousbusiness/gotosocial/testrig"
)
@@ -115,6 +116,9 @@ func (suite *FromFederatorTestSuite) TestProcessReplyMention() {
Likeable: true,
}
+ wssStream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), repliedAccount, stream.TimelineHome)
+ suite.NoError(errWithCode)
+
// id the status based on the time it was created
statusID, err := id.NewULIDFromTime(replyingStatus.CreatedAt)
suite.NoError(err)
@@ -153,6 +157,17 @@ func (suite *FromFederatorTestSuite) TestProcessReplyMention() {
suite.Equal(replyingStatus.AccountID, notif.OriginAccountID)
suite.Equal(replyingStatus.ID, notif.StatusID)
suite.False(notif.Read)
+
+ // the notification should also be streamed
+ msg := <-wssStream.Messages
+ suite.Equal(stream.EventTypeNotification, msg.Event)
+ suite.NotEmpty(msg.Payload)
+ suite.EqualValues([]string{stream.TimelineHome}, msg.Stream)
+ notifStreamed := &model.Notification{}
+ err = json.Unmarshal([]byte(msg.Payload), notifStreamed)
+ suite.NoError(err)
+ suite.Equal("mention", notifStreamed.Type)
+ suite.Equal(replyingAccount.ID, notifStreamed.Account.ID)
}
func (suite *FromFederatorTestSuite) TestProcessFave() {
@@ -160,7 +175,7 @@ func (suite *FromFederatorTestSuite) TestProcessFave() {
favedStatus := suite.testStatuses["local_account_1_status_1"]
favingAccount := suite.testAccounts["remote_account_1"]
- stream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), favedAccount, "user")
+ wssStream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), favedAccount, stream.TimelineNotifications)
suite.NoError(errWithCode)
fave := >smodel.StatusFave{
@@ -210,10 +225,10 @@ func (suite *FromFederatorTestSuite) TestProcessFave() {
suite.False(notif.Read)
// 2. a notification should be streamed
- msg := <-stream.Messages
- suite.Equal("notification", msg.Event)
+ msg := <-wssStream.Messages
+ suite.Equal(stream.EventTypeNotification, msg.Event)
suite.NotEmpty(msg.Payload)
- suite.EqualValues([]string{"user"}, msg.Stream)
+ suite.EqualValues([]string{stream.TimelineNotifications}, msg.Stream)
}
// TestProcessFaveWithDifferentReceivingAccount ensures that when an account receives a fave that's for
@@ -227,7 +242,7 @@ func (suite *FromFederatorTestSuite) TestProcessFaveWithDifferentReceivingAccoun
favedStatus := suite.testStatuses["local_account_1_status_1"]
favingAccount := suite.testAccounts["remote_account_1"]
- stream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), receivingAccount, "user")
+ wssStream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), receivingAccount, stream.TimelineHome)
suite.NoError(errWithCode)
fave := >smodel.StatusFave{
@@ -277,7 +292,7 @@ func (suite *FromFederatorTestSuite) TestProcessFaveWithDifferentReceivingAccoun
suite.False(notif.Read)
// 2. no notification should be streamed to the account that received the fave message, because they weren't the target
- suite.Empty(stream.Messages)
+ suite.Empty(wssStream.Messages)
}
func (suite *FromFederatorTestSuite) TestProcessAccountDelete() {
@@ -368,7 +383,7 @@ func (suite *FromFederatorTestSuite) TestProcessFollowRequestLocked() {
// target is a locked account
targetAccount := suite.testAccounts["local_account_2"]
- stream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), targetAccount, "user")
+ wssStream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), targetAccount, stream.TimelineHome)
suite.NoError(errWithCode)
// put the follow request in the database as though it had passed through the federating db already
@@ -397,10 +412,10 @@ func (suite *FromFederatorTestSuite) TestProcessFollowRequestLocked() {
suite.NoError(err)
// a notification should be streamed
- msg := <-stream.Messages
- suite.Equal("notification", msg.Event)
+ msg := <-wssStream.Messages
+ suite.Equal(stream.EventTypeNotification, msg.Event)
suite.NotEmpty(msg.Payload)
- suite.EqualValues([]string{"user"}, msg.Stream)
+ suite.EqualValues([]string{stream.TimelineHome}, msg.Stream)
notif := &model.Notification{}
err = json.Unmarshal([]byte(msg.Payload), notif)
suite.NoError(err)
@@ -419,7 +434,7 @@ func (suite *FromFederatorTestSuite) TestProcessFollowRequestUnlocked() {
// target is an unlocked account
targetAccount := suite.testAccounts["local_account_1"]
- stream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), targetAccount, "user")
+ wssStream, errWithCode := suite.processor.OpenStreamForAccount(context.Background(), targetAccount, stream.TimelineHome)
suite.NoError(errWithCode)
// put the follow request in the database as though it had passed through the federating db already
@@ -448,10 +463,10 @@ func (suite *FromFederatorTestSuite) TestProcessFollowRequestUnlocked() {
suite.NoError(err)
// a notification should be streamed
- msg := <-stream.Messages
- suite.Equal("notification", msg.Event)
+ msg := <-wssStream.Messages
+ suite.Equal(stream.EventTypeNotification, msg.Event)
suite.NotEmpty(msg.Payload)
- suite.EqualValues([]string{"user"}, msg.Stream)
+ suite.EqualValues([]string{stream.TimelineHome}, msg.Stream)
notif := &model.Notification{}
err = json.Unmarshal([]byte(msg.Payload), notif)
suite.NoError(err)
diff --git a/internal/processing/streaming/notification.go b/internal/processing/streaming/notification.go
@@ -33,5 +33,5 @@ func (p *processor) StreamNotificationToAccount(n *apimodel.Notification, accoun
return fmt.Errorf("error marshalling notification to json: %s", err)
}
- return p.streamToAccount(string(bytes), stream.EventTypeNotification, account.ID)
+ return p.streamToAccount(string(bytes), stream.EventTypeNotification, []string{stream.TimelineNotifications, stream.TimelineHome}, account.ID)
}
diff --git a/internal/processing/streaming/openstream.go b/internal/processing/streaming/openstream.go
@@ -30,11 +30,11 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/stream"
)
-func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode) {
+func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamTimeline string) (*stream.Stream, gtserror.WithCode) {
l := logrus.WithFields(logrus.Fields{
"func": "OpenStreamForAccount",
"account": account.ID,
- "streamType": streamType,
+ "streamType": streamTimeline,
})
l.Debug("received open stream request")
@@ -46,7 +46,7 @@ func (p *processor) OpenStreamForAccount(ctx context.Context, account *gtsmodel.
thisStream := &stream.Stream{
ID: streamID,
- Type: streamType,
+ Timeline: streamTimeline,
Messages: make(chan *stream.Message, 100),
Hangup: make(chan interface{}, 1),
Connected: true,
diff --git a/internal/processing/streaming/streamdelete.go b/internal/processing/streaming/streamdelete.go
@@ -37,7 +37,7 @@ func (p *processor) StreamDelete(statusID string) error {
// stream the delete to every account
for _, accountID := range accountIDs {
- if err := p.streamToAccount(statusID, stream.EventTypeDelete, accountID); err != nil {
+ if err := p.streamToAccount(statusID, stream.EventTypeDelete, stream.AllStatusTimelines, accountID); err != nil {
errs = append(errs, err.Error())
}
}
diff --git a/internal/processing/streaming/streaming.go b/internal/processing/streaming/streaming.go
@@ -35,9 +35,9 @@ type Processor interface {
// AuthorizeStreamingRequest returns an oauth2 token info in response to an access token query from the streaming API
AuthorizeStreamingRequest(ctx context.Context, accessToken string) (*gtsmodel.Account, error)
// OpenStreamForAccount returns a new Stream for the given account, which will contain a channel for passing messages back to the caller.
- OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode)
+ OpenStreamForAccount(ctx context.Context, account *gtsmodel.Account, timeline string) (*stream.Stream, gtserror.WithCode)
// StreamUpdateToAccount streams the given update to any open, appropriate streams belonging to the given account.
- StreamUpdateToAccount(s *apimodel.Status, account *gtsmodel.Account) error
+ StreamUpdateToAccount(s *apimodel.Status, account *gtsmodel.Account, timeline string) error
// StreamNotificationToAccount streams the given notification to any open, appropriate streams belonging to the given account.
StreamNotificationToAccount(n *apimodel.Notification, account *gtsmodel.Account) error
// StreamDelete streams the delete of the given statusID to *ALL* open streams.
diff --git a/internal/processing/streaming/streamtoaccount.go b/internal/processing/streaming/streamtoaccount.go
@@ -25,7 +25,7 @@ import (
)
// streamToAccount streams the given payload with the given event type to any streams currently open for the given account ID.
-func (p *processor) streamToAccount(payload string, event stream.EventType, accountID string) error {
+func (p *processor) streamToAccount(payload string, event string, timelines []string, accountID string) error {
v, ok := p.streamMap.Load(accountID)
if !ok {
// no open connections so nothing to stream
@@ -42,11 +42,17 @@ func (p *processor) streamToAccount(payload string, event stream.EventType, acco
for _, s := range streamsForAccount.Streams {
s.Lock()
defer s.Unlock()
- if s.Connected {
- s.Messages <- &stream.Message{
- Stream: []string{s.Type},
- Event: string(event),
- Payload: payload,
+ if !s.Connected {
+ continue
+ }
+
+ for _, t := range timelines {
+ if s.Timeline == string(t) {
+ s.Messages <- &stream.Message{
+ Stream: []string{string(t)},
+ Event: string(event),
+ Payload: payload,
+ }
}
}
}
diff --git a/internal/processing/streaming/update.go b/internal/processing/streaming/update.go
@@ -27,11 +27,11 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/stream"
)
-func (p *processor) StreamUpdateToAccount(s *apimodel.Status, account *gtsmodel.Account) error {
+func (p *processor) StreamUpdateToAccount(s *apimodel.Status, account *gtsmodel.Account, timeline string) error {
bytes, err := json.Marshal(s)
if err != nil {
return fmt.Errorf("error marshalling status to json: %s", err)
}
- return p.streamToAccount(string(bytes), stream.EventTypeUpdate, account.ID)
+ return p.streamToAccount(string(bytes), stream.EventTypeUpdate, []string{timeline}, account.ID)
}
diff --git a/internal/stream/stream.go b/internal/stream/stream.go
@@ -2,18 +2,36 @@ package stream
import "sync"
-// EventType models a type of stream event.
-type EventType string
-
const (
// EventTypeNotification -- a user should be shown a notification
- EventTypeNotification EventType = "notification"
+ EventTypeNotification string = "notification"
// EventTypeUpdate -- a user should be shown an update in their timeline
- EventTypeUpdate EventType = "update"
+ EventTypeUpdate string = "update"
// EventTypeDelete -- something should be deleted from a user
- EventTypeDelete EventType = "delete"
+ EventTypeDelete string = "delete"
+)
+
+const (
+ // TimelineLocal -- public statuses from the LOCAL timeline.
+ TimelineLocal string = "public:local"
+ // TimelinePublic -- public statuses, including federated ones.
+ TimelinePublic string = "public"
+ // TimelineHome -- statuses for a user's Home timeline.
+ TimelineHome string = "user"
+ // TimelineNotifications -- notification events.
+ TimelineNotifications string = "user:notification"
+ // TimelineDirect -- statuses sent to a user directly.
+ TimelineDirect string = "direct"
)
+// AllStatusTimelines contains all Timelines that a status could conceivably be delivered to -- useful for doing deletes.
+var AllStatusTimelines = []string{
+ TimelineLocal,
+ TimelinePublic,
+ TimelineHome,
+ TimelineDirect,
+}
+
// StreamsForAccount is a wrapper for the multiple streams that one account can have running at the same time.
// TODO: put a limit on this
type StreamsForAccount struct {
@@ -27,8 +45,8 @@ type StreamsForAccount struct {
type Stream struct {
// ID of this stream, generated during creation.
ID string
- // Type of this stream: user/public/etc
- Type string
+ // Timeline of this stream: user/public/etc
+ Timeline string
// Channel of messages for the client to read from
Messages chan *Message
// Channel to close when the client drops away