manager.go (10131B)
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package timeline

import (
	"context"
	"sync"
	"time"

	"github.com/superseriousbusiness/gotosocial/internal/gtserror"
	"github.com/superseriousbusiness/gotosocial/internal/log"
)

const (
	// pruneLengthIndexed is the desired number of indexed items
	// passed to Timeline.Prune by the manager's hourly cleanup job.
	pruneLengthIndexed = 400
	// pruneLengthPrepared is the desired number of prepared items
	// passed to Timeline.Prune by the manager's hourly cleanup job.
	pruneLengthPrepared = 50
)

// Manager abstracts functions for creating multiple timelines, and adding, removing, and fetching entries from those timelines.
//
// By the time a timelineable hits the manager interface, it should already have been filtered and it should be established that the item indeed
// belongs in the given timeline.
//
// The manager makes a distinction between *indexed* items and *prepared* items.
//
// Indexed items consist of just that item's ID (in the database) and the time it was created. An indexed item takes up very little memory, so
// it's not a huge priority to keep trimming the indexed items list.
//
// Prepared items consist of the item's database ID, the time it was created, AND the apimodel representation of that item, for quick serialization.
// Prepared items of course take up more memory than indexed items, so they should be regularly pruned if they're not being actively served.
type Manager interface {
	// IngestOne takes one timelineable and indexes it into the given timeline, and then immediately prepares it for serving.
	// This is useful in cases where we know the item will need to be shown at the top of a user's timeline immediately (e.g., a new status is created).
	//
	// It should already be established before calling this function that the item actually belongs in the timeline!
	//
	// The returned bool indicates whether the item was actually put in the timeline. This could be false in cases where
	// a status is a boost, but a boost of the original status or the status itself already exists recently in the timeline.
	IngestOne(ctx context.Context, timelineID string, item Timelineable) (bool, error)

	// GetTimeline returns limit n amount of prepared entries from the given timeline, in descending chronological order.
	GetTimeline(ctx context.Context, timelineID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error)

	// GetIndexedLength returns the amount of items that have been indexed for the given timeline ID.
	GetIndexedLength(ctx context.Context, timelineID string) int

	// GetOldestIndexedID returns the ID of the oldest item that we have indexed for the given timeline.
	// Will be an empty string if nothing is (yet) indexed.
	GetOldestIndexedID(ctx context.Context, timelineID string) string

	// Remove removes one item from the given timeline.
	// NOTE(review): the returned int is presumably the number of entries removed -- confirm against Timeline.Remove.
	Remove(ctx context.Context, timelineID string, itemID string) (int, error)

	// RemoveTimeline completely removes one timeline.
	RemoveTimeline(ctx context.Context, timelineID string) error

	// WipeItemFromAllTimelines removes one item from the index and prepared items of all timelines.
	WipeItemFromAllTimelines(ctx context.Context, itemID string) error

	// WipeItemsFromAccountID removes all items by the given accountID from the given timeline.
	WipeItemsFromAccountID(ctx context.Context, timelineID string, accountID string) error

	// UnprepareItem unprepares/uncaches the prepared version of the given itemID from the given timelineID.
	// Use this for cache invalidation when the prepared representation of an item has changed.
	UnprepareItem(ctx context.Context, timelineID string, itemID string) error

	// UnprepareItemFromAllTimelines unprepares/uncaches the prepared version of the given itemID from all timelines.
	// Use this for cache invalidation when the prepared representation of an item has changed.
	UnprepareItemFromAllTimelines(ctx context.Context, itemID string) error

	// Prune manually triggers a prune operation for the given timelineID,
	// returning the amount of items that were pruned.
	Prune(ctx context.Context, timelineID string, desiredPreparedItemsLength int, desiredIndexedItemsLength int) (int, error)

	// Start starts hourly cleanup jobs for this timeline manager.
	Start() error

	// Stop stops the timeline manager (currently a stub, doesn't do anything).
	Stop() error
}

// NewManager returns a new timeline manager, which hands the
// given functions to each timeline that it goes on to create.
97 func NewManager(grabFunction GrabFunction, filterFunction FilterFunction, prepareFunction PrepareFunction, skipInsertFunction SkipInsertFunction) Manager { 98 return &manager{ 99 timelines: sync.Map{}, 100 grabFunction: grabFunction, 101 filterFunction: filterFunction, 102 prepareFunction: prepareFunction, 103 skipInsertFunction: skipInsertFunction, 104 } 105 } 106 107 type manager struct { 108 timelines sync.Map 109 grabFunction GrabFunction 110 filterFunction FilterFunction 111 prepareFunction PrepareFunction 112 skipInsertFunction SkipInsertFunction 113 } 114 115 func (m *manager) Start() error { 116 // Start a background goroutine which iterates 117 // through all stored timelines once per hour, 118 // and cleans up old entries if that timeline 119 // hasn't been accessed in the last hour. 120 go func() { 121 for now := range time.NewTicker(1 * time.Hour).C { 122 // Define the range function inside here, 123 // so that we can use the 'now' returned 124 // by the ticker, instead of having to call 125 // time.Now() multiple times. 126 // 127 // Unless it panics, this function always 128 // returns 'true', to continue the Range 129 // call through the sync.Map. 130 f := func(_ any, v any) bool { 131 timeline, ok := v.(Timeline) 132 if !ok { 133 log.Panic(nil, "couldn't parse timeline manager sync map value as Timeline, this should never happen so panic") 134 } 135 136 if now.Sub(timeline.LastGot()) < 1*time.Hour { 137 // Timeline has been fetched in the 138 // last hour, move on to the next one. 139 return true 140 } 141 142 if amountPruned := timeline.Prune(pruneLengthPrepared, pruneLengthIndexed); amountPruned > 0 { 143 log.WithField("accountID", timeline.TimelineID()).Infof("pruned %d indexed and prepared items from timeline", amountPruned) 144 } 145 146 return true 147 } 148 149 // Execute the function for each timeline. 
150 m.timelines.Range(f) 151 } 152 }() 153 154 return nil 155 } 156 157 func (m *manager) Stop() error { 158 return nil 159 } 160 161 func (m *manager) IngestOne(ctx context.Context, timelineID string, item Timelineable) (bool, error) { 162 return m.getOrCreateTimeline(ctx, timelineID).IndexAndPrepareOne( 163 ctx, 164 item.GetID(), 165 item.GetBoostOfID(), 166 item.GetAccountID(), 167 item.GetBoostOfAccountID(), 168 ) 169 } 170 171 func (m *manager) Remove(ctx context.Context, timelineID string, itemID string) (int, error) { 172 return m.getOrCreateTimeline(ctx, timelineID).Remove(ctx, itemID) 173 } 174 175 func (m *manager) RemoveTimeline(ctx context.Context, timelineID string) error { 176 m.timelines.Delete(timelineID) 177 return nil 178 } 179 180 func (m *manager) GetTimeline(ctx context.Context, timelineID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error) { 181 return m.getOrCreateTimeline(ctx, timelineID).Get(ctx, limit, maxID, sinceID, minID, true) 182 } 183 184 func (m *manager) GetIndexedLength(ctx context.Context, timelineID string) int { 185 return m.getOrCreateTimeline(ctx, timelineID).Len() 186 } 187 188 func (m *manager) GetOldestIndexedID(ctx context.Context, timelineID string) string { 189 return m.getOrCreateTimeline(ctx, timelineID).OldestIndexedItemID() 190 } 191 192 func (m *manager) WipeItemFromAllTimelines(ctx context.Context, itemID string) error { 193 errors := gtserror.MultiError{} 194 195 m.timelines.Range(func(_ any, v any) bool { 196 if _, err := v.(Timeline).Remove(ctx, itemID); err != nil { 197 errors.Append(err) 198 } 199 200 return true // always continue range 201 }) 202 203 if len(errors) > 0 { 204 return gtserror.Newf("error(s) wiping status %s: %w", itemID, errors.Combine()) 205 } 206 207 return nil 208 } 209 210 func (m *manager) WipeItemsFromAccountID(ctx context.Context, timelineID string, accountID string) error { 211 _, err := m.getOrCreateTimeline(ctx, 
timelineID).RemoveAllByOrBoosting(ctx, accountID) 212 return err 213 } 214 215 func (m *manager) UnprepareItemFromAllTimelines(ctx context.Context, itemID string) error { 216 errors := gtserror.MultiError{} 217 218 // Work through all timelines held by this 219 // manager, and call Unprepare for each. 220 m.timelines.Range(func(_ any, v any) bool { 221 // nolint:forcetypeassert 222 if err := v.(Timeline).Unprepare(ctx, itemID); err != nil { 223 errors.Append(err) 224 } 225 226 return true // always continue range 227 }) 228 229 if len(errors) > 0 { 230 return gtserror.Newf("error(s) unpreparing status %s: %w", itemID, errors.Combine()) 231 } 232 233 return nil 234 } 235 236 func (m *manager) UnprepareItem(ctx context.Context, timelineID string, itemID string) error { 237 return m.getOrCreateTimeline(ctx, timelineID).Unprepare(ctx, itemID) 238 } 239 240 func (m *manager) Prune(ctx context.Context, timelineID string, desiredPreparedItemsLength int, desiredIndexedItemsLength int) (int, error) { 241 return m.getOrCreateTimeline(ctx, timelineID).Prune(desiredPreparedItemsLength, desiredIndexedItemsLength), nil 242 } 243 244 // getOrCreateTimeline returns a timeline with the given id, 245 // creating a new timeline with that id if necessary. 246 func (m *manager) getOrCreateTimeline(ctx context.Context, timelineID string) Timeline { 247 i, ok := m.timelines.Load(timelineID) 248 if ok { 249 // Timeline already existed in sync.Map. 250 return i.(Timeline) //nolint:forcetypeassert 251 } 252 253 // Timeline did not yet exist in sync.Map. 254 // Create + store it. 255 timeline := NewTimeline(ctx, timelineID, m.grabFunction, m.filterFunction, m.prepareFunction, m.skipInsertFunction) 256 m.timelines.Store(timelineID, timeline) 257 258 return timeline 259 }