thread.go (8918B)
1 // GoToSocial 2 // Copyright (C) GoToSocial Authors admin@gotosocial.org 3 // SPDX-License-Identifier: AGPL-3.0-or-later 4 // 5 // This program is free software: you can redistribute it and/or modify 6 // it under the terms of the GNU Affero General Public License as published by 7 // the Free Software Foundation, either version 3 of the License, or 8 // (at your option) any later version. 9 // 10 // This program is distributed in the hope that it will be useful, 11 // but WITHOUT ANY WARRANTY; without even the implied warranty of 12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 // GNU Affero General Public License for more details. 14 // 15 // You should have received a copy of the GNU Affero General Public License 16 // along with this program. If not, see <http://www.gnu.org/licenses/>. 17 18 package dereferencing 19 20 import ( 21 "context" 22 "net/url" 23 24 "codeberg.org/gruf/go-kv" 25 "github.com/superseriousbusiness/activity/pub" 26 "github.com/superseriousbusiness/activity/streams/vocab" 27 "github.com/superseriousbusiness/gotosocial/internal/ap" 28 "github.com/superseriousbusiness/gotosocial/internal/config" 29 "github.com/superseriousbusiness/gotosocial/internal/gtserror" 30 "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" 31 "github.com/superseriousbusiness/gotosocial/internal/log" 32 "github.com/superseriousbusiness/gotosocial/internal/uris" 33 ) 34 35 // maxIter defines how many iterations of descendants or 36 // ancestors we are willing to follow before returning error. 37 const maxIter = 1000 38 39 // dereferenceThread will dereference statuses both above and below the given status in a thread, it returns no error and is intended to be called asynchronously.
40 func (d *deref) dereferenceThread(ctx context.Context, username string, statusIRI *url.URL, status *gtsmodel.Status, statusable ap.Statusable) { 41 // Ensure that ancestors have been fully dereferenced 42 if err := d.dereferenceStatusAncestors(ctx, username, status); err != nil { 43 log.Error(ctx, err) // log entry and error will include caller prefixes 44 } 45 46 // Ensure that descendants have been fully dereferenced 47 if err := d.dereferenceStatusDescendants(ctx, username, statusIRI, statusable); err != nil { 48 log.Error(ctx, err) // log entry and error will include caller prefixes 49 } 50 } 51 52 // dereferenceAncestors has the goal of reaching the oldest ancestor of a given status, and stashing all statuses along the way. 53 func (d *deref) dereferenceStatusAncestors(ctx context.Context, username string, status *gtsmodel.Status) error { 54 // Take ref to original 55 ogIRI := status.URI 56 57 // Start log entry with fields 58 l := log.WithContext(ctx). 59 WithFields(kv.Fields{ 60 {"username", username}, 61 {"statusIRI", ogIRI}, 62 }...) 
63 64 // Log function start 65 l.Trace("beginning") 66 67 for i := 0; i < maxIter; i++ { 68 if status.InReplyToURI == "" { 69 // status doesn't reply to anything 70 return nil 71 } 72 73 // Parse this status's replied IRI 74 replyIRI, err := url.Parse(status.InReplyToURI) 75 if err != nil { 76 return gtserror.Newf("invalid status InReplyToURI %q: %w", status.InReplyToURI, err) 77 } 78 79 if replyIRI.Host == config.GetHost() { 80 l.Tracef("following local status ancestors: %s", status.InReplyToURI) 81 82 // This is our status, extract ID from path 83 _, id, err := uris.ParseStatusesPath(replyIRI) 84 if err != nil { 85 return gtserror.Newf("invalid local status IRI %q: %w", status.InReplyToURI, err) 86 } 87 88 // Fetch this status from the database 89 localStatus, err := d.state.DB.GetStatusByID(ctx, id) 90 if err != nil { 91 return gtserror.Newf("error fetching local status %q: %w", id, err) 92 } 93 94 // Set the fetched status 95 status = localStatus 96 97 } else { 98 l.Tracef("following remote status ancestors: %s", status.InReplyToURI) 99 100 // Fetch the remote status found at this IRI 101 remoteStatus, _, err := d.getStatusByURI( 102 ctx, 103 username, 104 replyIRI, 105 ) 106 if err != nil { 107 return gtserror.Newf("error fetching remote status %q: %w", status.InReplyToURI, err) 108 } 109 110 // Set the fetched status 111 status = remoteStatus 112 } 113 } 114 115 return gtserror.Newf("reached %d ancestor iterations for %q", maxIter, ogIRI) 116 } 117 118 func (d *deref) dereferenceStatusDescendants(ctx context.Context, username string, statusIRI *url.URL, parent ap.Statusable) error { 119 // Take ref to original 120 ogIRI := statusIRI 121 122 // Start log entry with fields 123 l := log.WithContext(ctx). 124 WithFields(kv.Fields{ 125 {"username", username}, 126 {"statusIRI", ogIRI}, 127 }...) 128 129 // Log function start 130 l.Trace("beginning") 131 132 // frame represents a single stack frame when iteratively 133 // dereferencing status descendants. 
where statusIRI and 134 // statusable are of the status whose children we are to 135 // descend, page is the current activity streams collection 136 // page of entities we are on (as we often push a frame to 137 // stack mid-paging), and item___ are entity iterators for 138 // this activity streams collection page. 139 type frame struct { 140 statusIRI *url.URL 141 statusable ap.Statusable 142 page ap.CollectionPageable 143 itemIter vocab.ActivityStreamsItemsPropertyIterator 144 } 145 146 var ( 147 // current is the current stack frame 148 current *frame 149 150 // stack is a list of "shelved" descendand iterator 151 // frames. this is pushed to when a child status frame 152 // is found that we need to further iterate down, and 153 // popped from into 'current' when that child's tree 154 // of further descendants is exhausted. 155 stack = []*frame{ 156 { 157 // Starting input is first frame 158 statusIRI: statusIRI, 159 statusable: parent, 160 }, 161 } 162 163 // popStack will remove and return the top frame 164 // from the stack, or nil if currently empty. 
165 popStack = func() *frame { 166 if len(stack) == 0 { 167 return nil 168 } 169 170 // Get frame index 171 idx := len(stack) - 1 172 173 // Pop last frame 174 frame := stack[idx] 175 stack = stack[:idx] 176 177 return frame 178 } 179 ) 180 181 stackLoop: 182 for i := 0; i < maxIter; i++ { 183 // Pop next frame, nil means we are at end 184 if current = popStack(); current == nil { 185 return nil 186 } 187 188 if current.page == nil { 189 if current.statusIRI.Host == config.GetHost() { 190 // This is a local status, no looping to do 191 continue stackLoop 192 } 193 194 l.Tracef("following remote status descendants: %s", current.statusIRI) 195 196 // Look for an attached status replies (as collection) 197 replies := current.statusable.GetActivityStreamsReplies() 198 if replies == nil { 199 continue stackLoop 200 } 201 202 // Get the status replies collection 203 collection := replies.GetActivityStreamsCollection() 204 if collection == nil { 205 continue stackLoop 206 } 207 208 // Get the "first" property of the replies collection 209 first := collection.GetActivityStreamsFirst() 210 if first == nil { 211 continue stackLoop 212 } 213 214 // Set the first activity stream collection page 215 current.page = first.GetActivityStreamsCollectionPage() 216 if current.page == nil { 217 continue stackLoop 218 } 219 } 220 221 pageLoop: 222 for { 223 if current.itemIter == nil { 224 // Get the items associated with this page 225 items := current.page.GetActivityStreamsItems() 226 if items == nil { 227 continue stackLoop 228 } 229 230 // Start off the item iterator 231 current.itemIter = items.Begin() 232 } 233 234 itemLoop: 235 for { 236 // Check for remaining iter 237 if current.itemIter == nil { 238 break itemLoop 239 } 240 241 // Get current item iterator 242 itemIter := current.itemIter 243 244 // Set the next available iterator 245 current.itemIter = itemIter.Next() 246 247 // Check for available IRI on item 248 itemIRI, _ := pub.ToId(itemIter) 249 if itemIRI == nil { 250 
continue itemLoop 251 } 252 253 if itemIRI.Host == config.GetHost() { 254 // This child is one of ours, 255 continue itemLoop 256 } 257 258 // Dereference the remote status and store in the database. 259 _, statusable, err := d.getStatusByURI(ctx, username, itemIRI) 260 if err != nil { 261 l.Errorf("error dereferencing remote status %s: %v", itemIRI, err) 262 continue itemLoop 263 } 264 265 if statusable == nil { 266 // Already up-to-date. 267 continue itemLoop 268 } 269 270 // Put current and next frame at top of stack 271 stack = append(stack, current, &frame{ 272 statusIRI: itemIRI, 273 statusable: statusable, 274 }) 275 276 // Now start at top of loop 277 continue stackLoop 278 } 279 280 // Get the current page's "next" property 281 pageNext := current.page.GetActivityStreamsNext() 282 if pageNext == nil { 283 continue stackLoop 284 } 285 286 // Get the "next" page property IRI 287 pageNextIRI := pageNext.GetIRI() 288 if pageNextIRI == nil { 289 continue stackLoop 290 } 291 292 // Dereference this next collection page by its IRI 293 collectionPage, err := d.dereferenceCollectionPage(ctx, 294 username, 295 pageNextIRI, 296 ) 297 if err != nil { 298 l.Errorf("error dereferencing remote collection page %q: %s", pageNextIRI.String(), err) 299 continue stackLoop 300 } 301 302 // Set the updated collection page 303 current.page = collectionPage 304 continue pageLoop 305 } 306 } 307 308 return gtserror.Newf("reached %d descendant iterations for %q", maxIter, ogIRI.String()) 309 }