federating_wrapped_callbacks.go (28036B)
1 package pub 2 3 import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "net/url" 8 9 "github.com/superseriousbusiness/activity/streams" 10 "github.com/superseriousbusiness/activity/streams/vocab" 11 ) 12 13 // OnFollowBehavior enumerates the different default actions that the go-fed 14 // library can provide when receiving a Follow Activity from a peer. 15 type OnFollowBehavior int 16 17 const ( 18 // OnFollowDoNothing does not take any action when a Follow Activity 19 // is received. 20 OnFollowDoNothing OnFollowBehavior = iota 21 // OnFollowAutomaticallyAccept triggers the side effect of sending an 22 // Accept of this Follow request in response. 23 OnFollowAutomaticallyAccept 24 // OnFollowAutomaticallyAccept triggers the side effect of sending a 25 // Reject of this Follow request in response. 26 OnFollowAutomaticallyReject 27 ) 28 29 // FederatingWrappedCallbacks lists the callback functions that already have 30 // some side effect behavior provided by the pub library. 31 // 32 // These functions are wrapped for the Federating Protocol. 33 type FederatingWrappedCallbacks struct { 34 // Create handles additional side effects for the Create ActivityStreams 35 // type, specific to the application using go-fed. 36 // 37 // The wrapping callback for the Federating Protocol ensures the 38 // 'object' property is created in the database. 39 // 40 // Create calls Create for each object in the federated Activity. 41 Create func(context.Context, vocab.ActivityStreamsCreate) error 42 // Update handles additional side effects for the Update ActivityStreams 43 // type, specific to the application using go-fed. 44 // 45 // The wrapping callback for the Federating Protocol ensures the 46 // 'object' property is updated in the database. 47 // 48 // Update calls Update on the federated entry from the database, with a 49 // new value. 50 Update func(context.Context, vocab.ActivityStreamsUpdate) error 51 // Delete handles additional side effects for the Delete ActivityStreams 52 // type, specific to the application using go-fed. 53 // 54 // Delete removes the federated entry from the database. 55 Delete func(context.Context, vocab.ActivityStreamsDelete) error 56 // Follow handles additional side effects for the Follow ActivityStreams 57 // type, specific to the application using go-fed. 58 // 59 // The wrapping function can have one of several default behaviors, 60 // depending on the value of the OnFollow setting. 61 Follow func(context.Context, vocab.ActivityStreamsFollow) error 62 // OnFollow determines what action to take for this particular callback 63 // if a Follow Activity is handled. 64 OnFollow OnFollowBehavior 65 // Accept handles additional side effects for the Accept ActivityStreams 66 // type, specific to the application using go-fed. 67 // 68 // The wrapping function determines if this 'Accept' is in response to a 69 // 'Follow'. If so, then the 'actor' is added to the original 'actor's 70 // 'following' collection. 71 // 72 // Otherwise, no side effects are done by go-fed. 73 Accept func(context.Context, vocab.ActivityStreamsAccept) error 74 // Reject handles additional side effects for the Reject ActivityStreams 75 // type, specific to the application using go-fed. 76 // 77 // The wrapping function has no default side effects. However, if this 78 // 'Reject' is in response to a 'Follow' then the client MUST NOT go 79 // forward with adding the 'actor' to the original 'actor's 'following' 80 // collection by the client application. 81 Reject func(context.Context, vocab.ActivityStreamsReject) error 82 // Add handles additional side effects for the Add ActivityStreams 83 // type, specific to the application using go-fed. 84 // 85 // The wrapping function will add the 'object' IRIs to a specific 86 // 'target' collection if the 'target' collection(s) live on this 87 // server. 88 Add func(context.Context, vocab.ActivityStreamsAdd) error 89 // Remove handles additional side effects for the Remove ActivityStreams 90 // type, specific to the application using go-fed. 91 // 92 // The wrapping function will remove all 'object' IRIs from a specific 93 // 'target' collection if the 'target' collection(s) live on this 94 // server. 95 Remove func(context.Context, vocab.ActivityStreamsRemove) error 96 // Like handles additional side effects for the Like ActivityStreams 97 // type, specific to the application using go-fed. 98 // 99 // The wrapping function will add the activity to the "likes" collection 100 // on all 'object' targets owned by this server. 101 Like func(context.Context, vocab.ActivityStreamsLike) error 102 // Announce handles additional side effects for the Announce 103 // ActivityStreams type, specific to the application using go-fed. 104 // 105 // The wrapping function will add the activity to the "shares" 106 // collection on all 'object' targets owned by this server. 107 Announce func(context.Context, vocab.ActivityStreamsAnnounce) error 108 // Undo handles additional side effects for the Undo ActivityStreams 109 // type, specific to the application using go-fed. 110 // 111 // The wrapping function ensures the 'actor' on the 'Undo' 112 // is be the same as the 'actor' on all Activities being undone. 113 // It enforces that the actors on the Undo must correspond to all of the 114 // 'object' actors in some manner. 115 // 116 // It is expected that the application will implement the proper 117 // reversal of activities that are being undone. 118 Undo func(context.Context, vocab.ActivityStreamsUndo) error 119 // Block handles additional side effects for the Block ActivityStreams 120 // type, specific to the application using go-fed. 121 // 122 // The wrapping function provides no default side effects. It simply 123 // calls the wrapped function. However, note that Blocks should not be 124 // received from a federated peer, as delivering Blocks explicitly 125 // deviates from the original ActivityPub specification. 126 Block func(context.Context, vocab.ActivityStreamsBlock) error 127 128 // Sidechannel data -- this is set at request handling time. These must 129 // be set before the callbacks are used. 130 131 // db is the Database the FederatingWrappedCallbacks should use. 132 db Database 133 // inboxIRI is the inboxIRI that is handling this callback. 134 inboxIRI *url.URL 135 // addNewIds creates new 'id' entries on an activity and its objects if 136 // it is a Create activity. 137 addNewIds func(c context.Context, activity Activity) error 138 // deliver delivers an outgoing message. 139 deliver func(c context.Context, outboxIRI *url.URL, activity Activity) error 140 // newTransport creates a new Transport. 141 newTransport func(c context.Context, actorBoxIRI *url.URL, gofedAgent string) (t Transport, err error) 142 } 143 144 // callbacks returns the WrappedCallbacks members into a single interface slice 145 // for use in streams.Resolver callbacks. 146 // 147 // If the given functions have a type that collides with the default behavior, 148 // then disable our default behavior 149 func (w FederatingWrappedCallbacks) callbacks(fns []interface{}) []interface{} { 150 enableCreate := true 151 enableUpdate := true 152 enableDelete := true 153 enableFollow := true 154 enableAccept := true 155 enableReject := true 156 enableAdd := true 157 enableRemove := true 158 enableLike := true 159 enableAnnounce := true 160 enableUndo := true 161 enableBlock := true 162 for _, fn := range fns { 163 switch fn.(type) { 164 default: 165 continue 166 case func(context.Context, vocab.ActivityStreamsCreate) error: 167 enableCreate = false 168 case func(context.Context, vocab.ActivityStreamsUpdate) error: 169 enableUpdate = false 170 case func(context.Context, vocab.ActivityStreamsDelete) error: 171 enableDelete = false 172 case func(context.Context, vocab.ActivityStreamsFollow) error: 173 enableFollow = false 174 case func(context.Context, vocab.ActivityStreamsAccept) error: 175 enableAccept = false 176 case func(context.Context, vocab.ActivityStreamsReject) error: 177 enableReject = false 178 case func(context.Context, vocab.ActivityStreamsAdd) error: 179 enableAdd = false 180 case func(context.Context, vocab.ActivityStreamsRemove) error: 181 enableRemove = false 182 case func(context.Context, vocab.ActivityStreamsLike) error: 183 enableLike = false 184 case func(context.Context, vocab.ActivityStreamsAnnounce) error: 185 enableAnnounce = false 186 case func(context.Context, vocab.ActivityStreamsUndo) error: 187 enableUndo = false 188 case func(context.Context, vocab.ActivityStreamsBlock) error: 189 enableBlock = false 190 } 191 } 192 if enableCreate { 193 fns = append(fns, w.create) 194 } 195 if enableUpdate { 196 fns = append(fns, w.update) 197 } 198 if enableDelete { 199 fns = append(fns, w.deleteFn) 200 } 201 if enableFollow { 202 fns = append(fns, w.follow) 203 } 204 if enableAccept { 205 fns = append(fns, w.accept) 206 } 207 if enableReject { 208 fns = append(fns, w.reject) 209 } 210 if enableAdd { 211 fns = append(fns, w.add) 212 } 213 if enableRemove { 214 fns = append(fns, w.remove) 215 } 216 if enableLike { 217 fns = append(fns, w.like) 218 } 219 if enableAnnounce { 220 fns = append(fns, w.announce) 221 } 222 if enableUndo { 223 fns = append(fns, w.undo) 224 } 225 if enableBlock { 226 fns = append(fns, w.block) 227 } 228 return fns 229 } 230 231 // create implements the federating Create activity side effects. 232 func (w FederatingWrappedCallbacks) create(c context.Context, a vocab.ActivityStreamsCreate) error { 233 op := a.GetActivityStreamsObject() 234 if op == nil || op.Len() == 0 { 235 return ErrObjectRequired 236 } 237 // Create anonymous loop function to be able to properly scope the defer 238 // for the database lock at each iteration. 239 loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error { 240 t := iter.GetType() 241 if t == nil && iter.IsIRI() { 242 // Attempt to dereference the IRI instead 243 tport, err := w.newTransport(c, w.inboxIRI, goFedUserAgent()) 244 if err != nil { 245 return err 246 } 247 b, err := tport.Dereference(c, iter.GetIRI()) 248 if err != nil { 249 return err 250 } 251 var m map[string]interface{} 252 if err = json.Unmarshal(b, &m); err != nil { 253 return err 254 } 255 t, err = streams.ToType(c, m) 256 if err != nil { 257 return err 258 } 259 } else if t == nil { 260 return fmt.Errorf("cannot handle federated create: object is neither a value nor IRI") 261 } 262 id, err := GetId(t) 263 if err != nil { 264 return err 265 } 266 var unlock func() 267 unlock, err = w.db.Lock(c, id) 268 if err != nil { 269 return err 270 } 271 defer unlock() 272 if err := w.db.Create(c, t); err != nil { 273 return err 274 } 275 return nil 276 } 277 for iter := op.Begin(); iter != op.End(); iter = iter.Next() { 278 if err := loopFn(iter); err != nil { 279 return err 280 } 281 } 282 if w.Create != nil { 283 return w.Create(c, a) 284 } 285 return nil 286 } 287 288 // update implements the federating Update activity side effects. 289 func (w FederatingWrappedCallbacks) update(c context.Context, a vocab.ActivityStreamsUpdate) error { 290 op := a.GetActivityStreamsObject() 291 if op == nil || op.Len() == 0 { 292 return ErrObjectRequired 293 } 294 if err := mustHaveActivityOriginMatchObjects(a); err != nil { 295 return err 296 } 297 // Create anonymous loop function to be able to properly scope the defer 298 // for the database lock at each iteration. 299 loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error { 300 t := iter.GetType() 301 if t == nil { 302 return fmt.Errorf("update requires an object to be wholly provided") 303 } 304 id, err := GetId(t) 305 if err != nil { 306 return err 307 } 308 var unlock func() 309 unlock, err = w.db.Lock(c, id) 310 if err != nil { 311 return err 312 } 313 defer unlock() 314 if err := w.db.Update(c, t); err != nil { 315 return err 316 } 317 return nil 318 } 319 for iter := op.Begin(); iter != op.End(); iter = iter.Next() { 320 if err := loopFn(iter); err != nil { 321 return err 322 } 323 } 324 if w.Update != nil { 325 return w.Update(c, a) 326 } 327 return nil 328 } 329 330 // deleteFn implements the federating Delete activity side effects. 331 func (w FederatingWrappedCallbacks) deleteFn(c context.Context, a vocab.ActivityStreamsDelete) error { 332 op := a.GetActivityStreamsObject() 333 if op == nil || op.Len() == 0 { 334 return ErrObjectRequired 335 } 336 if err := mustHaveActivityOriginMatchObjects(a); err != nil { 337 return err 338 } 339 // Create anonymous loop function to be able to properly scope the defer 340 // for the database lock at each iteration. 341 loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error { 342 id, err := ToId(iter) 343 if err != nil { 344 return err 345 } 346 var unlock func() 347 unlock, err = w.db.Lock(c, id) 348 if err != nil { 349 return err 350 } 351 defer unlock() 352 if err := w.db.Delete(c, id); err != nil { 353 return err 354 } 355 return nil 356 } 357 for iter := op.Begin(); iter != op.End(); iter = iter.Next() { 358 if err := loopFn(iter); err != nil { 359 return err 360 } 361 } 362 if w.Delete != nil { 363 return w.Delete(c, a) 364 } 365 return nil 366 } 367 368 // follow implements the federating Follow activity side effects. 369 func (w FederatingWrappedCallbacks) follow(c context.Context, a vocab.ActivityStreamsFollow) error { 370 op := a.GetActivityStreamsObject() 371 if op == nil || op.Len() == 0 { 372 return ErrObjectRequired 373 } 374 // Check that we own at least one of the 'object' properties, and ensure 375 // it is to the actor that owns this inbox. 376 // 377 // If not then don't send a response. It was federated to us as an FYI, 378 // by mistake, or some other reason. 379 unlock, err := w.db.Lock(c, w.inboxIRI) 380 if err != nil { 381 return err 382 } 383 // WARNING: Unlock not deferred. 384 actorIRI, err := w.db.ActorForInbox(c, w.inboxIRI) 385 unlock() // unlock even on error 386 if err != nil { 387 return err 388 } 389 // Unlock must be called by now and every branch above. 390 isMe := false 391 if w.OnFollow != OnFollowDoNothing { 392 for iter := op.Begin(); iter != op.End(); iter = iter.Next() { 393 id, err := ToId(iter) 394 if err != nil { 395 return err 396 } 397 if id.String() == actorIRI.String() { 398 isMe = true 399 break 400 } 401 } 402 } 403 if isMe { 404 // Prepare the response. 405 var response Activity 406 if w.OnFollow == OnFollowAutomaticallyAccept { 407 response = streams.NewActivityStreamsAccept() 408 } else if w.OnFollow == OnFollowAutomaticallyReject { 409 response = streams.NewActivityStreamsReject() 410 } else { 411 return fmt.Errorf("unknown OnFollowBehavior: %d", w.OnFollow) 412 } 413 // Set us as the 'actor'. 414 me := streams.NewActivityStreamsActorProperty() 415 response.SetActivityStreamsActor(me) 416 me.AppendIRI(actorIRI) 417 // Set the Follow as the 'object' property. 418 op := streams.NewActivityStreamsObjectProperty() 419 response.SetActivityStreamsObject(op) 420 op.AppendActivityStreamsFollow(a) 421 // Add all actors on the original Follow to the 'to' property. 422 recipients := make([]*url.URL, 0) 423 to := streams.NewActivityStreamsToProperty() 424 response.SetActivityStreamsTo(to) 425 followActors := a.GetActivityStreamsActor() 426 for iter := followActors.Begin(); iter != followActors.End(); iter = iter.Next() { 427 id, err := ToId(iter) 428 if err != nil { 429 return err 430 } 431 to.AppendIRI(id) 432 recipients = append(recipients, id) 433 } 434 if w.OnFollow == OnFollowAutomaticallyAccept { 435 // If automatically accepting, then also update our 436 // followers collection with the new actors. 437 // 438 // If automatically rejecting, do not update the 439 // followers collection. 440 unlock, err := w.db.Lock(c, actorIRI) 441 if err != nil { 442 return err 443 } 444 // WARNING: Unlock not deferred. 445 followers, err := w.db.Followers(c, actorIRI) 446 if err != nil { 447 unlock() 448 return err 449 } 450 items := followers.GetActivityStreamsItems() 451 if items == nil { 452 items = streams.NewActivityStreamsItemsProperty() 453 followers.SetActivityStreamsItems(items) 454 } 455 for _, elem := range recipients { 456 items.PrependIRI(elem) 457 } 458 err = w.db.Update(c, followers) 459 unlock() // unlock even on error 460 if err != nil { 461 return err 462 } 463 // Unlock must be called by now and every branch above. 464 } 465 // Lock without defer! 466 unlock, err := w.db.Lock(c, w.inboxIRI) 467 if err != nil { 468 return err 469 } 470 outboxIRI, err := w.db.OutboxForInbox(c, w.inboxIRI) 471 unlock() // unlock after, regardless 472 if err != nil { 473 return err 474 } 475 // Everything must be unlocked by now. 476 if err := w.addNewIds(c, response); err != nil { 477 return err 478 } else if err := w.deliver(c, outboxIRI, response); err != nil { 479 return err 480 } 481 } 482 if w.Follow != nil { 483 return w.Follow(c, a) 484 } 485 return nil 486 } 487 488 // accept implements the federating Accept activity side effects. 489 func (w FederatingWrappedCallbacks) accept(c context.Context, a vocab.ActivityStreamsAccept) error { 490 op := a.GetActivityStreamsObject() 491 if op != nil && op.Len() > 0 { 492 // Get this actor's id. 493 unlock, err := w.db.Lock(c, w.inboxIRI) 494 if err != nil { 495 return err 496 } 497 // WARNING: Unlock not deferred. 498 actorIRI, err := w.db.ActorForInbox(c, w.inboxIRI) 499 unlock() // unlock after regardless 500 if err != nil { 501 return err 502 } 503 // Unlock must be called by now and every branch above. 504 // 505 // Determine if we are in a follow on the 'object' property. 506 // 507 // TODO: Handle Accept multiple Follow. 508 var maybeMyFollowIRI *url.URL 509 for iter := op.Begin(); iter != op.End(); iter = iter.Next() { 510 t := iter.GetType() 511 if t == nil && iter.IsIRI() { 512 // Attempt to dereference the IRI instead 513 tport, err := w.newTransport(c, w.inboxIRI, goFedUserAgent()) 514 if err != nil { 515 return err 516 } 517 b, err := tport.Dereference(c, iter.GetIRI()) 518 if err != nil { 519 return err 520 } 521 var m map[string]interface{} 522 if err = json.Unmarshal(b, &m); err != nil { 523 return err 524 } 525 t, err = streams.ToType(c, m) 526 if err != nil { 527 return err 528 } 529 } else if t == nil { 530 return fmt.Errorf("cannot handle federated create: object is neither a value nor IRI") 531 } 532 // Ensure it is a Follow. 533 if !streams.IsOrExtendsActivityStreamsFollow(t) { 534 continue 535 } 536 follow, ok := t.(Activity) 537 if !ok { 538 return fmt.Errorf("a Follow in an Accept does not satisfy the Activity interface") 539 } 540 followId, err := GetId(follow) 541 if err != nil { 542 return err 543 } 544 // Ensure that we are one of the actors on the Follow. 545 actors := follow.GetActivityStreamsActor() 546 for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() { 547 id, err := ToId(iter) 548 if err != nil { 549 return err 550 } 551 if id.String() == actorIRI.String() { 552 maybeMyFollowIRI = followId 553 break 554 } 555 } 556 // Continue breaking if we found ourselves 557 if maybeMyFollowIRI != nil { 558 break 559 } 560 } 561 // If we received an Accept whose 'object' is a Follow with an 562 // Accept that we sent, add to the following collection. 563 if maybeMyFollowIRI != nil { 564 // Verify our Follow request exists and the peer didn't 565 // fabricate it. 566 activityActors := a.GetActivityStreamsActor() 567 if activityActors == nil || activityActors.Len() == 0 { 568 return fmt.Errorf("an Accept with a Follow has no actors") 569 } 570 // This may be a duplicate check if we dereferenced the 571 // Follow above. TODO: Separate this logic to avoid 572 // redundancy. 573 // 574 // Use an anonymous function to properly scope the 575 // database lock, immediately call it. 576 err = func() error { 577 unlock, err := w.db.Lock(c, maybeMyFollowIRI) 578 if err != nil { 579 return err 580 } 581 defer unlock() 582 t, err := w.db.Get(c, maybeMyFollowIRI) 583 if err != nil { 584 return err 585 } 586 if !streams.IsOrExtendsActivityStreamsFollow(t) { 587 return fmt.Errorf("peer gave an Accept wrapping a Follow but provided a non-Follow id") 588 } 589 follow, ok := t.(Activity) 590 if !ok { 591 return fmt.Errorf("a Follow in an Accept does not satisfy the Activity interface") 592 } 593 // Ensure that we are one of the actors on the Follow. 594 ok = false 595 actors := follow.GetActivityStreamsActor() 596 for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() { 597 id, err := ToId(iter) 598 if err != nil { 599 return err 600 } 601 if id.String() == actorIRI.String() { 602 ok = true 603 break 604 } 605 } 606 if !ok { 607 return fmt.Errorf("peer gave an Accept wrapping a Follow but we are not the actor on that Follow") 608 } 609 // Build map of original Accept actors 610 acceptActors := make(map[string]bool) 611 for iter := activityActors.Begin(); iter != activityActors.End(); iter = iter.Next() { 612 id, err := ToId(iter) 613 if err != nil { 614 return err 615 } 616 acceptActors[id.String()] = false 617 } 618 // Verify all actor(s) were on the original Follow. 619 followObj := follow.GetActivityStreamsObject() 620 for iter := followObj.Begin(); iter != followObj.End(); iter = iter.Next() { 621 id, err := ToId(iter) 622 if err != nil { 623 return err 624 } 625 if _, ok := acceptActors[id.String()]; ok { 626 acceptActors[id.String()] = true 627 } 628 } 629 for _, found := range acceptActors { 630 if !found { 631 return fmt.Errorf("peer gave an Accept wrapping a Follow but was not an object in the original Follow") 632 } 633 } 634 return nil 635 }() 636 if err != nil { 637 return err 638 } 639 // Add the peer to our following collection. 640 unlock, err := w.db.Lock(c, actorIRI) 641 if err != nil { 642 return err 643 } 644 // WARNING: Unlock not deferred. 645 following, err := w.db.Following(c, actorIRI) 646 if err != nil { 647 unlock() 648 return err 649 } 650 items := following.GetActivityStreamsItems() 651 if items == nil { 652 items = streams.NewActivityStreamsItemsProperty() 653 following.SetActivityStreamsItems(items) 654 } 655 for iter := activityActors.Begin(); iter != activityActors.End(); iter = iter.Next() { 656 id, err := ToId(iter) 657 if err != nil { 658 unlock() 659 return err 660 } 661 items.PrependIRI(id) 662 } 663 err = w.db.Update(c, following) 664 unlock() // unlock after regardless 665 if err != nil { 666 return err 667 } 668 // Unlock must be called by now and every branch above. 669 } 670 } 671 if w.Accept != nil { 672 return w.Accept(c, a) 673 } 674 return nil 675 } 676 677 // reject implements the federating Reject activity side effects. 678 func (w FederatingWrappedCallbacks) reject(c context.Context, a vocab.ActivityStreamsReject) error { 679 if w.Reject != nil { 680 return w.Reject(c, a) 681 } 682 return nil 683 } 684 685 // add implements the federating Add activity side effects. 686 func (w FederatingWrappedCallbacks) add(c context.Context, a vocab.ActivityStreamsAdd) error { 687 op := a.GetActivityStreamsObject() 688 if op == nil || op.Len() == 0 { 689 return ErrObjectRequired 690 } 691 target := a.GetActivityStreamsTarget() 692 if target == nil || target.Len() == 0 { 693 return ErrTargetRequired 694 } 695 if err := add(c, op, target, w.db); err != nil { 696 return err 697 } 698 if w.Add != nil { 699 return w.Add(c, a) 700 } 701 return nil 702 } 703 704 // remove implements the federating Remove activity side effects. 705 func (w FederatingWrappedCallbacks) remove(c context.Context, a vocab.ActivityStreamsRemove) error { 706 op := a.GetActivityStreamsObject() 707 if op == nil || op.Len() == 0 { 708 return ErrObjectRequired 709 } 710 target := a.GetActivityStreamsTarget() 711 if target == nil || target.Len() == 0 { 712 return ErrTargetRequired 713 } 714 if err := remove(c, op, target, w.db); err != nil { 715 return err 716 } 717 if w.Remove != nil { 718 return w.Remove(c, a) 719 } 720 return nil 721 } 722 723 // like implements the federating Like activity side effects. 724 func (w FederatingWrappedCallbacks) like(c context.Context, a vocab.ActivityStreamsLike) error { 725 op := a.GetActivityStreamsObject() 726 if op == nil || op.Len() == 0 { 727 return ErrObjectRequired 728 } 729 id, err := GetId(a) 730 if err != nil { 731 return err 732 } 733 // Create anonymous loop function to be able to properly scope the defer 734 // for the database lock at each iteration. 735 loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error { 736 objId, err := ToId(iter) 737 if err != nil { 738 return err 739 } 740 unlock, err := w.db.Lock(c, objId) 741 if err != nil { 742 return err 743 } 744 defer unlock() 745 if owns, err := w.db.Owns(c, objId); err != nil { 746 return err 747 } else if !owns { 748 return nil 749 } 750 t, err := w.db.Get(c, objId) 751 if err != nil { 752 return err 753 } 754 l, ok := t.(likeser) 755 if !ok { 756 return fmt.Errorf("cannot add Like to likes collection for type %T", t) 757 } 758 // Get 'likes' property on the object, creating default if 759 // necessary. 760 likes := l.GetActivityStreamsLikes() 761 if likes == nil { 762 likes = streams.NewActivityStreamsLikesProperty() 763 l.SetActivityStreamsLikes(likes) 764 } 765 // Get 'likes' value, defaulting to a collection. 766 likesT := likes.GetType() 767 if likesT == nil { 768 col := streams.NewActivityStreamsCollection() 769 likesT = col 770 likes.SetActivityStreamsCollection(col) 771 } 772 // Prepend the activity's 'id' on the 'likes' Collection or 773 // OrderedCollection. 774 if col, ok := likesT.(itemser); ok { 775 items := col.GetActivityStreamsItems() 776 if items == nil { 777 items = streams.NewActivityStreamsItemsProperty() 778 col.SetActivityStreamsItems(items) 779 } 780 items.PrependIRI(id) 781 } else if oCol, ok := likesT.(orderedItemser); ok { 782 oItems := oCol.GetActivityStreamsOrderedItems() 783 if oItems == nil { 784 oItems = streams.NewActivityStreamsOrderedItemsProperty() 785 oCol.SetActivityStreamsOrderedItems(oItems) 786 } 787 oItems.PrependIRI(id) 788 } else { 789 return fmt.Errorf("likes type is neither a Collection nor an OrderedCollection: %T", likesT) 790 } 791 err = w.db.Update(c, t) 792 if err != nil { 793 return err 794 } 795 return nil 796 } 797 for iter := op.Begin(); iter != op.End(); iter = iter.Next() { 798 if err := loopFn(iter); err != nil { 799 return err 800 } 801 } 802 if w.Like != nil { 803 return w.Like(c, a) 804 } 805 return nil 806 } 807 808 // announce implements the federating Announce activity side effects. 809 func (w FederatingWrappedCallbacks) announce(c context.Context, a vocab.ActivityStreamsAnnounce) error { 810 id, err := GetId(a) 811 if err != nil { 812 return err 813 } 814 op := a.GetActivityStreamsObject() 815 // Create anonymous loop function to be able to properly scope the defer 816 // for the database lock at each iteration. 817 loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error { 818 objId, err := ToId(iter) 819 if err != nil { 820 return err 821 } 822 unlock, err := w.db.Lock(c, objId) 823 if err != nil { 824 return err 825 } 826 defer unlock() 827 if owns, err := w.db.Owns(c, objId); err != nil { 828 return err 829 } else if !owns { 830 return nil 831 } 832 t, err := w.db.Get(c, objId) 833 if err != nil { 834 return err 835 } 836 s, ok := t.(shareser) 837 if !ok { 838 return fmt.Errorf("cannot add Announce to Shares collection for type %T", t) 839 } 840 // Get 'shares' property on the object, creating default if 841 // necessary. 842 shares := s.GetActivityStreamsShares() 843 if shares == nil { 844 shares = streams.NewActivityStreamsSharesProperty() 845 s.SetActivityStreamsShares(shares) 846 } 847 // Get 'shares' value, defaulting to a collection. 848 sharesT := shares.GetType() 849 if sharesT == nil { 850 col := streams.NewActivityStreamsCollection() 851 sharesT = col 852 shares.SetActivityStreamsCollection(col) 853 } 854 // Prepend the activity's 'id' on the 'shares' Collection or 855 // OrderedCollection. 856 if col, ok := sharesT.(itemser); ok { 857 items := col.GetActivityStreamsItems() 858 if items == nil { 859 items = streams.NewActivityStreamsItemsProperty() 860 col.SetActivityStreamsItems(items) 861 } 862 items.PrependIRI(id) 863 } else if oCol, ok := sharesT.(orderedItemser); ok { 864 oItems := oCol.GetActivityStreamsOrderedItems() 865 if oItems == nil { 866 oItems = streams.NewActivityStreamsOrderedItemsProperty() 867 oCol.SetActivityStreamsOrderedItems(oItems) 868 } 869 oItems.PrependIRI(id) 870 } else { 871 return fmt.Errorf("shares type is neither a Collection nor an OrderedCollection: %T", sharesT) 872 } 873 err = w.db.Update(c, t) 874 if err != nil { 875 return err 876 } 877 return nil 878 } 879 if op != nil { 880 for iter := op.Begin(); iter != op.End(); iter = iter.Next() { 881 if err := loopFn(iter); err != nil { 882 return err 883 } 884 } 885 } 886 if w.Announce != nil { 887 return w.Announce(c, a) 888 } 889 return nil 890 } 891 892 // undo implements the federating Undo activity side effects. 893 func (w FederatingWrappedCallbacks) undo(c context.Context, a vocab.ActivityStreamsUndo) error { 894 op := a.GetActivityStreamsObject() 895 if op == nil || op.Len() == 0 { 896 return ErrObjectRequired 897 } 898 actors := a.GetActivityStreamsActor() 899 if err := mustHaveActivityActorsMatchObjectActors(c, actors, op, w.newTransport, w.inboxIRI); err != nil { 900 return err 901 } 902 if w.Undo != nil { 903 return w.Undo(c, a) 904 } 905 return nil 906 } 907 908 // block implements the federating Block activity side effects. 909 func (w FederatingWrappedCallbacks) block(c context.Context, a vocab.ActivityStreamsBlock) error { 910 op := a.GetActivityStreamsObject() 911 if op == nil || op.Len() == 0 { 912 return ErrObjectRequired 913 } 914 if w.Block != nil { 915 return w.Block(c, a) 916 } 917 return nil 918 }