gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

thread.go (8918B)


      1 // GoToSocial
      2 // Copyright (C) GoToSocial Authors admin@gotosocial.org
      3 // SPDX-License-Identifier: AGPL-3.0-or-later
      4 //
      5 // This program is free software: you can redistribute it and/or modify
      6 // it under the terms of the GNU Affero General Public License as published by
      7 // the Free Software Foundation, either version 3 of the License, or
      8 // (at your option) any later version.
      9 //
     10 // This program is distributed in the hope that it will be useful,
     11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
     12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     13 // GNU Affero General Public License for more details.
     14 //
     15 // You should have received a copy of the GNU Affero General Public License
     16 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17 
     18 package dereferencing
     19 
     20 import (
     21 	"context"
     22 	"net/url"
     23 
     24 	"codeberg.org/gruf/go-kv"
     25 	"github.com/superseriousbusiness/activity/pub"
     26 	"github.com/superseriousbusiness/activity/streams/vocab"
     27 	"github.com/superseriousbusiness/gotosocial/internal/ap"
     28 	"github.com/superseriousbusiness/gotosocial/internal/config"
     29 	"github.com/superseriousbusiness/gotosocial/internal/gtserror"
     30 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
     31 	"github.com/superseriousbusiness/gotosocial/internal/log"
     32 	"github.com/superseriousbusiness/gotosocial/internal/uris"
     33 )
     34 
     35 // maxIter defines how many iterations of descendants or
     36 // ancesters we are willing to follow before returning error.
     37 const maxIter = 1000
     38 
     39 // dereferenceThread will dereference statuses both above and below the given status in a thread, it returns no error and is intended to be called asychronously.
     40 func (d *deref) dereferenceThread(ctx context.Context, username string, statusIRI *url.URL, status *gtsmodel.Status, statusable ap.Statusable) {
     41 	// Ensure that ancestors have been fully dereferenced
     42 	if err := d.dereferenceStatusAncestors(ctx, username, status); err != nil {
     43 		log.Error(ctx, err) // log entry and error will include caller prefixes
     44 	}
     45 
     46 	// Ensure that descendants have been fully dereferenced
     47 	if err := d.dereferenceStatusDescendants(ctx, username, statusIRI, statusable); err != nil {
     48 		log.Error(ctx, err) // log entry and error will include caller prefixes
     49 	}
     50 }
     51 
     52 // dereferenceAncestors has the goal of reaching the oldest ancestor of a given status, and stashing all statuses along the way.
     53 func (d *deref) dereferenceStatusAncestors(ctx context.Context, username string, status *gtsmodel.Status) error {
     54 	// Take ref to original
     55 	ogIRI := status.URI
     56 
     57 	// Start log entry with fields
     58 	l := log.WithContext(ctx).
     59 		WithFields(kv.Fields{
     60 			{"username", username},
     61 			{"statusIRI", ogIRI},
     62 		}...)
     63 
     64 	// Log function start
     65 	l.Trace("beginning")
     66 
     67 	for i := 0; i < maxIter; i++ {
     68 		if status.InReplyToURI == "" {
     69 			// status doesn't reply to anything
     70 			return nil
     71 		}
     72 
     73 		// Parse this status's replied IRI
     74 		replyIRI, err := url.Parse(status.InReplyToURI)
     75 		if err != nil {
     76 			return gtserror.Newf("invalid status InReplyToURI %q: %w", status.InReplyToURI, err)
     77 		}
     78 
     79 		if replyIRI.Host == config.GetHost() {
     80 			l.Tracef("following local status ancestors: %s", status.InReplyToURI)
     81 
     82 			// This is our status, extract ID from path
     83 			_, id, err := uris.ParseStatusesPath(replyIRI)
     84 			if err != nil {
     85 				return gtserror.Newf("invalid local status IRI %q: %w", status.InReplyToURI, err)
     86 			}
     87 
     88 			// Fetch this status from the database
     89 			localStatus, err := d.state.DB.GetStatusByID(ctx, id)
     90 			if err != nil {
     91 				return gtserror.Newf("error fetching local status %q: %w", id, err)
     92 			}
     93 
     94 			// Set the fetched status
     95 			status = localStatus
     96 
     97 		} else {
     98 			l.Tracef("following remote status ancestors: %s", status.InReplyToURI)
     99 
    100 			// Fetch the remote status found at this IRI
    101 			remoteStatus, _, err := d.getStatusByURI(
    102 				ctx,
    103 				username,
    104 				replyIRI,
    105 			)
    106 			if err != nil {
    107 				return gtserror.Newf("error fetching remote status %q: %w", status.InReplyToURI, err)
    108 			}
    109 
    110 			// Set the fetched status
    111 			status = remoteStatus
    112 		}
    113 	}
    114 
    115 	return gtserror.Newf("reached %d ancestor iterations for %q", maxIter, ogIRI)
    116 }
    117 
    118 func (d *deref) dereferenceStatusDescendants(ctx context.Context, username string, statusIRI *url.URL, parent ap.Statusable) error {
    119 	// Take ref to original
    120 	ogIRI := statusIRI
    121 
    122 	// Start log entry with fields
    123 	l := log.WithContext(ctx).
    124 		WithFields(kv.Fields{
    125 			{"username", username},
    126 			{"statusIRI", ogIRI},
    127 		}...)
    128 
    129 	// Log function start
    130 	l.Trace("beginning")
    131 
    132 	// frame represents a single stack frame when iteratively
    133 	// dereferencing status descendants. where statusIRI and
    134 	// statusable are of the status whose children we are to
    135 	// descend, page is the current activity streams collection
    136 	// page of entities we are on (as we often push a frame to
    137 	// stack mid-paging), and item___ are entity iterators for
    138 	// this activity streams collection page.
    139 	type frame struct {
    140 		statusIRI  *url.URL
    141 		statusable ap.Statusable
    142 		page       ap.CollectionPageable
    143 		itemIter   vocab.ActivityStreamsItemsPropertyIterator
    144 	}
    145 
    146 	var (
    147 		// current is the current stack frame
    148 		current *frame
    149 
    150 		// stack is a list of "shelved" descendand iterator
    151 		// frames. this is pushed to when a child status frame
    152 		// is found that we need to further iterate down, and
    153 		// popped from into 'current' when that child's tree
    154 		// of further descendants is exhausted.
    155 		stack = []*frame{
    156 			{
    157 				// Starting input is first frame
    158 				statusIRI:  statusIRI,
    159 				statusable: parent,
    160 			},
    161 		}
    162 
    163 		// popStack will remove and return the top frame
    164 		// from the stack, or nil if currently empty.
    165 		popStack = func() *frame {
    166 			if len(stack) == 0 {
    167 				return nil
    168 			}
    169 
    170 			// Get frame index
    171 			idx := len(stack) - 1
    172 
    173 			// Pop last frame
    174 			frame := stack[idx]
    175 			stack = stack[:idx]
    176 
    177 			return frame
    178 		}
    179 	)
    180 
    181 stackLoop:
    182 	for i := 0; i < maxIter; i++ {
    183 		// Pop next frame, nil means we are at end
    184 		if current = popStack(); current == nil {
    185 			return nil
    186 		}
    187 
    188 		if current.page == nil {
    189 			if current.statusIRI.Host == config.GetHost() {
    190 				// This is a local status, no looping to do
    191 				continue stackLoop
    192 			}
    193 
    194 			l.Tracef("following remote status descendants: %s", current.statusIRI)
    195 
    196 			// Look for an attached status replies (as collection)
    197 			replies := current.statusable.GetActivityStreamsReplies()
    198 			if replies == nil {
    199 				continue stackLoop
    200 			}
    201 
    202 			// Get the status replies collection
    203 			collection := replies.GetActivityStreamsCollection()
    204 			if collection == nil {
    205 				continue stackLoop
    206 			}
    207 
    208 			// Get the "first" property of the replies collection
    209 			first := collection.GetActivityStreamsFirst()
    210 			if first == nil {
    211 				continue stackLoop
    212 			}
    213 
    214 			// Set the first activity stream collection page
    215 			current.page = first.GetActivityStreamsCollectionPage()
    216 			if current.page == nil {
    217 				continue stackLoop
    218 			}
    219 		}
    220 
    221 	pageLoop:
    222 		for {
    223 			if current.itemIter == nil {
    224 				// Get the items associated with this page
    225 				items := current.page.GetActivityStreamsItems()
    226 				if items == nil {
    227 					continue stackLoop
    228 				}
    229 
    230 				// Start off the item iterator
    231 				current.itemIter = items.Begin()
    232 			}
    233 
    234 		itemLoop:
    235 			for {
    236 				// Check for remaining iter
    237 				if current.itemIter == nil {
    238 					break itemLoop
    239 				}
    240 
    241 				// Get current item iterator
    242 				itemIter := current.itemIter
    243 
    244 				// Set the next available iterator
    245 				current.itemIter = itemIter.Next()
    246 
    247 				// Check for available IRI on item
    248 				itemIRI, _ := pub.ToId(itemIter)
    249 				if itemIRI == nil {
    250 					continue itemLoop
    251 				}
    252 
    253 				if itemIRI.Host == config.GetHost() {
    254 					// This child is one of ours,
    255 					continue itemLoop
    256 				}
    257 
    258 				// Dereference the remote status and store in the database.
    259 				_, statusable, err := d.getStatusByURI(ctx, username, itemIRI)
    260 				if err != nil {
    261 					l.Errorf("error dereferencing remote status %s: %v", itemIRI, err)
    262 					continue itemLoop
    263 				}
    264 
    265 				if statusable == nil {
    266 					// Already up-to-date.
    267 					continue itemLoop
    268 				}
    269 
    270 				// Put current and next frame at top of stack
    271 				stack = append(stack, current, &frame{
    272 					statusIRI:  itemIRI,
    273 					statusable: statusable,
    274 				})
    275 
    276 				// Now start at top of loop
    277 				continue stackLoop
    278 			}
    279 
    280 			// Get the current page's "next" property
    281 			pageNext := current.page.GetActivityStreamsNext()
    282 			if pageNext == nil {
    283 				continue stackLoop
    284 			}
    285 
    286 			// Get the "next" page property IRI
    287 			pageNextIRI := pageNext.GetIRI()
    288 			if pageNextIRI == nil {
    289 				continue stackLoop
    290 			}
    291 
    292 			// Dereference this next collection page by its IRI
    293 			collectionPage, err := d.dereferenceCollectionPage(ctx,
    294 				username,
    295 				pageNextIRI,
    296 			)
    297 			if err != nil {
    298 				l.Errorf("error dereferencing remote collection page %q: %s", pageNextIRI.String(), err)
    299 				continue stackLoop
    300 			}
    301 
    302 			// Set the updated collection page
    303 			current.page = collectionPage
    304 			continue pageLoop
    305 		}
    306 	}
    307 
    308 	return gtserror.Newf("reached %d descendant iterations for %q", maxIter, ogIRI.String())
    309 }