gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

manager.go (10131B)


      1 // GoToSocial
      2 // Copyright (C) GoToSocial Authors admin@gotosocial.org
      3 // SPDX-License-Identifier: AGPL-3.0-or-later
      4 //
      5 // This program is free software: you can redistribute it and/or modify
      6 // it under the terms of the GNU Affero General Public License as published by
      7 // the Free Software Foundation, either version 3 of the License, or
      8 // (at your option) any later version.
      9 //
     10 // This program is distributed in the hope that it will be useful,
     11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
     12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     13 // GNU Affero General Public License for more details.
     14 //
     15 // You should have received a copy of the GNU Affero General Public License
     16 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17 
     18 package timeline
     19 
     20 import (
     21 	"context"
     22 	"sync"
     23 	"time"
     24 
     25 	"github.com/superseriousbusiness/gotosocial/internal/gtserror"
     26 	"github.com/superseriousbusiness/gotosocial/internal/log"
     27 )
     28 
     29 const (
     30 	pruneLengthIndexed  = 400
     31 	pruneLengthPrepared = 50
     32 )
     33 
     34 // Manager abstracts functions for creating multiple timelines, and adding, removing, and fetching entries from those timelines.
     35 //
     36 // By the time a timelineable hits the manager interface, it should already have been filtered and it should be established that the item indeed
     37 // belongs in the given timeline.
     38 //
     39 // The manager makes a distinction between *indexed* items and *prepared* items.
     40 //
     41 // Indexed items consist of just that item's ID (in the database) and the time it was created. An indexed item takes up very little memory, so
     42 // it's not a huge priority to keep trimming the indexed items list.
     43 //
     44 // Prepared items consist of the item's database ID, the time it was created, AND the apimodel representation of that item, for quick serialization.
     45 // Prepared items of course take up more memory than indexed items, so they should be regularly pruned if they're not being actively served.
     46 type Manager interface {
     47 	// IngestOne takes one timelineable and indexes it into the given timeline, and then immediately prepares it for serving.
     48 	// This is useful in cases where we know the item will need to be shown at the top of a user's timeline immediately (eg., a new status is created).
     49 	//
     50 	// It should already be established before calling this function that the item actually belongs in the timeline!
     51 	//
     52 	// The returned bool indicates whether the item was actually put in the timeline. This could be false in cases where
     53 	// a status is a boost, but a boost of the original status or the status itself already exists recently in the timeline.
     54 	IngestOne(ctx context.Context, timelineID string, item Timelineable) (bool, error)
     55 
     56 	// GetTimeline returns limit n amount of prepared entries from the given timeline, in descending chronological order.
     57 	GetTimeline(ctx context.Context, timelineID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error)
     58 
     59 	// GetIndexedLength returns the amount of items that have been indexed for the given account ID.
     60 	GetIndexedLength(ctx context.Context, timelineID string) int
     61 
     62 	// GetOldestIndexedID returns the id ID for the oldest item that we have indexed for the given timeline.
     63 	// Will be an empty string if nothing is (yet) indexed.
     64 	GetOldestIndexedID(ctx context.Context, timelineID string) string
     65 
     66 	// Remove removes one item from the given timeline.
     67 	Remove(ctx context.Context, timelineID string, itemID string) (int, error)
     68 
     69 	// RemoveTimeline completely removes one timeline.
     70 	RemoveTimeline(ctx context.Context, timelineID string) error
     71 
     72 	// WipeItemFromAllTimelines removes one item from the index and prepared items of all timelines
     73 	WipeItemFromAllTimelines(ctx context.Context, itemID string) error
     74 
     75 	// WipeStatusesFromAccountID removes all items by the given accountID from the given timeline.
     76 	WipeItemsFromAccountID(ctx context.Context, timelineID string, accountID string) error
     77 
     78 	// UnprepareItem unprepares/uncaches the prepared version fo the given itemID from the given timelineID.
     79 	// Use this for cache invalidation when the prepared representation of an item has changed.
     80 	UnprepareItem(ctx context.Context, timelineID string, itemID string) error
     81 
     82 	// UnprepareItemFromAllTimelines unprepares/uncaches the prepared version of the given itemID from all timelines.
     83 	// Use this for cache invalidation when the prepared representation of an item has changed.
     84 	UnprepareItemFromAllTimelines(ctx context.Context, itemID string) error
     85 
     86 	// Prune manually triggers a prune operation for the given timelineID.
     87 	Prune(ctx context.Context, timelineID string, desiredPreparedItemsLength int, desiredIndexedItemsLength int) (int, error)
     88 
     89 	// Start starts hourly cleanup jobs for this timeline manager.
     90 	Start() error
     91 
     92 	// Stop stops the timeline manager (currently a stub, doesn't do anything).
     93 	Stop() error
     94 }
     95 
     96 // NewManager returns a new timeline manager.
     97 func NewManager(grabFunction GrabFunction, filterFunction FilterFunction, prepareFunction PrepareFunction, skipInsertFunction SkipInsertFunction) Manager {
     98 	return &manager{
     99 		timelines:          sync.Map{},
    100 		grabFunction:       grabFunction,
    101 		filterFunction:     filterFunction,
    102 		prepareFunction:    prepareFunction,
    103 		skipInsertFunction: skipInsertFunction,
    104 	}
    105 }
    106 
    107 type manager struct {
    108 	timelines          sync.Map
    109 	grabFunction       GrabFunction
    110 	filterFunction     FilterFunction
    111 	prepareFunction    PrepareFunction
    112 	skipInsertFunction SkipInsertFunction
    113 }
    114 
    115 func (m *manager) Start() error {
    116 	// Start a background goroutine which iterates
    117 	// through all stored timelines once per hour,
    118 	// and cleans up old entries if that timeline
    119 	// hasn't been accessed in the last hour.
    120 	go func() {
    121 		for now := range time.NewTicker(1 * time.Hour).C {
    122 			// Define the range function inside here,
    123 			// so that we can use the 'now' returned
    124 			// by the ticker, instead of having to call
    125 			// time.Now() multiple times.
    126 			//
    127 			// Unless it panics, this function always
    128 			// returns 'true', to continue the Range
    129 			// call through the sync.Map.
    130 			f := func(_ any, v any) bool {
    131 				timeline, ok := v.(Timeline)
    132 				if !ok {
    133 					log.Panic(nil, "couldn't parse timeline manager sync map value as Timeline, this should never happen so panic")
    134 				}
    135 
    136 				if now.Sub(timeline.LastGot()) < 1*time.Hour {
    137 					// Timeline has been fetched in the
    138 					// last hour, move on to the next one.
    139 					return true
    140 				}
    141 
    142 				if amountPruned := timeline.Prune(pruneLengthPrepared, pruneLengthIndexed); amountPruned > 0 {
    143 					log.WithField("accountID", timeline.TimelineID()).Infof("pruned %d indexed and prepared items from timeline", amountPruned)
    144 				}
    145 
    146 				return true
    147 			}
    148 
    149 			// Execute the function for each timeline.
    150 			m.timelines.Range(f)
    151 		}
    152 	}()
    153 
    154 	return nil
    155 }
    156 
    157 func (m *manager) Stop() error {
    158 	return nil
    159 }
    160 
    161 func (m *manager) IngestOne(ctx context.Context, timelineID string, item Timelineable) (bool, error) {
    162 	return m.getOrCreateTimeline(ctx, timelineID).IndexAndPrepareOne(
    163 		ctx,
    164 		item.GetID(),
    165 		item.GetBoostOfID(),
    166 		item.GetAccountID(),
    167 		item.GetBoostOfAccountID(),
    168 	)
    169 }
    170 
    171 func (m *manager) Remove(ctx context.Context, timelineID string, itemID string) (int, error) {
    172 	return m.getOrCreateTimeline(ctx, timelineID).Remove(ctx, itemID)
    173 }
    174 
    175 func (m *manager) RemoveTimeline(ctx context.Context, timelineID string) error {
    176 	m.timelines.Delete(timelineID)
    177 	return nil
    178 }
    179 
    180 func (m *manager) GetTimeline(ctx context.Context, timelineID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error) {
    181 	return m.getOrCreateTimeline(ctx, timelineID).Get(ctx, limit, maxID, sinceID, minID, true)
    182 }
    183 
    184 func (m *manager) GetIndexedLength(ctx context.Context, timelineID string) int {
    185 	return m.getOrCreateTimeline(ctx, timelineID).Len()
    186 }
    187 
    188 func (m *manager) GetOldestIndexedID(ctx context.Context, timelineID string) string {
    189 	return m.getOrCreateTimeline(ctx, timelineID).OldestIndexedItemID()
    190 }
    191 
    192 func (m *manager) WipeItemFromAllTimelines(ctx context.Context, itemID string) error {
    193 	errors := gtserror.MultiError{}
    194 
    195 	m.timelines.Range(func(_ any, v any) bool {
    196 		if _, err := v.(Timeline).Remove(ctx, itemID); err != nil {
    197 			errors.Append(err)
    198 		}
    199 
    200 		return true // always continue range
    201 	})
    202 
    203 	if len(errors) > 0 {
    204 		return gtserror.Newf("error(s) wiping status %s: %w", itemID, errors.Combine())
    205 	}
    206 
    207 	return nil
    208 }
    209 
    210 func (m *manager) WipeItemsFromAccountID(ctx context.Context, timelineID string, accountID string) error {
    211 	_, err := m.getOrCreateTimeline(ctx, timelineID).RemoveAllByOrBoosting(ctx, accountID)
    212 	return err
    213 }
    214 
    215 func (m *manager) UnprepareItemFromAllTimelines(ctx context.Context, itemID string) error {
    216 	errors := gtserror.MultiError{}
    217 
    218 	// Work through all timelines held by this
    219 	// manager, and call Unprepare for each.
    220 	m.timelines.Range(func(_ any, v any) bool {
    221 		// nolint:forcetypeassert
    222 		if err := v.(Timeline).Unprepare(ctx, itemID); err != nil {
    223 			errors.Append(err)
    224 		}
    225 
    226 		return true // always continue range
    227 	})
    228 
    229 	if len(errors) > 0 {
    230 		return gtserror.Newf("error(s) unpreparing status %s: %w", itemID, errors.Combine())
    231 	}
    232 
    233 	return nil
    234 }
    235 
    236 func (m *manager) UnprepareItem(ctx context.Context, timelineID string, itemID string) error {
    237 	return m.getOrCreateTimeline(ctx, timelineID).Unprepare(ctx, itemID)
    238 }
    239 
    240 func (m *manager) Prune(ctx context.Context, timelineID string, desiredPreparedItemsLength int, desiredIndexedItemsLength int) (int, error) {
    241 	return m.getOrCreateTimeline(ctx, timelineID).Prune(desiredPreparedItemsLength, desiredIndexedItemsLength), nil
    242 }
    243 
    244 // getOrCreateTimeline returns a timeline with the given id,
    245 // creating a new timeline with that id if necessary.
    246 func (m *manager) getOrCreateTimeline(ctx context.Context, timelineID string) Timeline {
    247 	i, ok := m.timelines.Load(timelineID)
    248 	if ok {
    249 		// Timeline already existed in sync.Map.
    250 		return i.(Timeline) //nolint:forcetypeassert
    251 	}
    252 
    253 	// Timeline did not yet exist in sync.Map.
    254 	// Create + store it.
    255 	timeline := NewTimeline(ctx, timelineID, m.grabFunction, m.filterFunction, m.prepareFunction, m.skipInsertFunction)
    256 	m.timelines.Store(timelineID, timeline)
    257 
    258 	return timeline
    259 }