gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

federating_wrapped_callbacks.go (28036B)


      1 package pub
      2 
      3 import (
      4 	"context"
      5 	"encoding/json"
      6 	"fmt"
      7 	"net/url"
      8 
      9 	"github.com/superseriousbusiness/activity/streams"
     10 	"github.com/superseriousbusiness/activity/streams/vocab"
     11 )
     12 
     13 // OnFollowBehavior enumerates the different default actions that the go-fed
     14 // library can provide when receiving a Follow Activity from a peer.
     15 type OnFollowBehavior int
     16 
     17 const (
     18 	// OnFollowDoNothing does not take any action when a Follow Activity
     19 	// is received.
     20 	OnFollowDoNothing OnFollowBehavior = iota
     21 	// OnFollowAutomaticallyAccept triggers the side effect of sending an
     22 	// Accept of this Follow request in response.
     23 	OnFollowAutomaticallyAccept
     24 	// OnFollowAutomaticallyAccept triggers the side effect of sending a
     25 	// Reject of this Follow request in response.
     26 	OnFollowAutomaticallyReject
     27 )
     28 
     29 // FederatingWrappedCallbacks lists the callback functions that already have
     30 // some side effect behavior provided by the pub library.
     31 //
     32 // These functions are wrapped for the Federating Protocol.
     33 type FederatingWrappedCallbacks struct {
     34 	// Create handles additional side effects for the Create ActivityStreams
     35 	// type, specific to the application using go-fed.
     36 	//
     37 	// The wrapping callback for the Federating Protocol ensures the
     38 	// 'object' property is created in the database.
     39 	//
     40 	// Create calls Create for each object in the federated Activity.
     41 	Create func(context.Context, vocab.ActivityStreamsCreate) error
     42 	// Update handles additional side effects for the Update ActivityStreams
     43 	// type, specific to the application using go-fed.
     44 	//
     45 	// The wrapping callback for the Federating Protocol ensures the
     46 	// 'object' property is updated in the database.
     47 	//
     48 	// Update calls Update on the federated entry from the database, with a
     49 	// new value.
     50 	Update func(context.Context, vocab.ActivityStreamsUpdate) error
     51 	// Delete handles additional side effects for the Delete ActivityStreams
     52 	// type, specific to the application using go-fed.
     53 	//
     54 	// Delete removes the federated entry from the database.
     55 	Delete func(context.Context, vocab.ActivityStreamsDelete) error
     56 	// Follow handles additional side effects for the Follow ActivityStreams
     57 	// type, specific to the application using go-fed.
     58 	//
     59 	// The wrapping function can have one of several default behaviors,
     60 	// depending on the value of the OnFollow setting.
     61 	Follow func(context.Context, vocab.ActivityStreamsFollow) error
     62 	// OnFollow determines what action to take for this particular callback
     63 	// if a Follow Activity is handled.
     64 	OnFollow OnFollowBehavior
     65 	// Accept handles additional side effects for the Accept ActivityStreams
     66 	// type, specific to the application using go-fed.
     67 	//
     68 	// The wrapping function determines if this 'Accept' is in response to a
     69 	// 'Follow'. If so, then the 'actor' is added to the original 'actor's
     70 	// 'following' collection.
     71 	//
     72 	// Otherwise, no side effects are done by go-fed.
     73 	Accept func(context.Context, vocab.ActivityStreamsAccept) error
     74 	// Reject handles additional side effects for the Reject ActivityStreams
     75 	// type, specific to the application using go-fed.
     76 	//
     77 	// The wrapping function has no default side effects. However, if this
     78 	// 'Reject' is in response to a 'Follow' then the client MUST NOT go
     79 	// forward with adding the 'actor' to the original 'actor's 'following'
     80 	// collection by the client application.
     81 	Reject func(context.Context, vocab.ActivityStreamsReject) error
     82 	// Add handles additional side effects for the Add ActivityStreams
     83 	// type, specific to the application using go-fed.
     84 	//
     85 	// The wrapping function will add the 'object' IRIs to a specific
     86 	// 'target' collection if the 'target' collection(s) live on this
     87 	// server.
     88 	Add func(context.Context, vocab.ActivityStreamsAdd) error
     89 	// Remove handles additional side effects for the Remove ActivityStreams
     90 	// type, specific to the application using go-fed.
     91 	//
     92 	// The wrapping function will remove all 'object' IRIs from a specific
     93 	// 'target' collection if the 'target' collection(s) live on this
     94 	// server.
     95 	Remove func(context.Context, vocab.ActivityStreamsRemove) error
     96 	// Like handles additional side effects for the Like ActivityStreams
     97 	// type, specific to the application using go-fed.
     98 	//
     99 	// The wrapping function will add the activity to the "likes" collection
    100 	// on all 'object' targets owned by this server.
    101 	Like func(context.Context, vocab.ActivityStreamsLike) error
    102 	// Announce handles additional side effects for the Announce
    103 	// ActivityStreams type, specific to the application using go-fed.
    104 	//
    105 	// The wrapping function will add the activity to the "shares"
    106 	// collection on all 'object' targets owned by this server.
    107 	Announce func(context.Context, vocab.ActivityStreamsAnnounce) error
    108 	// Undo handles additional side effects for the Undo ActivityStreams
    109 	// type, specific to the application using go-fed.
    110 	//
    111 	// The wrapping function ensures the 'actor' on the 'Undo'
    112 	// is be the same as the 'actor' on all Activities being undone.
    113 	// It enforces that the actors on the Undo must correspond to all of the
    114 	// 'object' actors in some manner.
    115 	//
    116 	// It is expected that the application will implement the proper
    117 	// reversal of activities that are being undone.
    118 	Undo func(context.Context, vocab.ActivityStreamsUndo) error
    119 	// Block handles additional side effects for the Block ActivityStreams
    120 	// type, specific to the application using go-fed.
    121 	//
    122 	// The wrapping function provides no default side effects. It simply
    123 	// calls the wrapped function. However, note that Blocks should not be
    124 	// received from a federated peer, as delivering Blocks explicitly
    125 	// deviates from the original ActivityPub specification.
    126 	Block func(context.Context, vocab.ActivityStreamsBlock) error
    127 
    128 	// Sidechannel data -- this is set at request handling time. These must
    129 	// be set before the callbacks are used.
    130 
    131 	// db is the Database the FederatingWrappedCallbacks should use.
    132 	db Database
    133 	// inboxIRI is the inboxIRI that is handling this callback.
    134 	inboxIRI *url.URL
    135 	// addNewIds creates new 'id' entries on an activity and its objects if
    136 	// it is a Create activity.
    137 	addNewIds func(c context.Context, activity Activity) error
    138 	// deliver delivers an outgoing message.
    139 	deliver func(c context.Context, outboxIRI *url.URL, activity Activity) error
    140 	// newTransport creates a new Transport.
    141 	newTransport func(c context.Context, actorBoxIRI *url.URL, gofedAgent string) (t Transport, err error)
    142 }
    143 
    144 // callbacks returns the WrappedCallbacks members into a single interface slice
    145 // for use in streams.Resolver callbacks.
    146 //
    147 // If the given functions have a type that collides with the default behavior,
    148 // then disable our default behavior
    149 func (w FederatingWrappedCallbacks) callbacks(fns []interface{}) []interface{} {
    150 	enableCreate := true
    151 	enableUpdate := true
    152 	enableDelete := true
    153 	enableFollow := true
    154 	enableAccept := true
    155 	enableReject := true
    156 	enableAdd := true
    157 	enableRemove := true
    158 	enableLike := true
    159 	enableAnnounce := true
    160 	enableUndo := true
    161 	enableBlock := true
    162 	for _, fn := range fns {
    163 		switch fn.(type) {
    164 		default:
    165 			continue
    166 		case func(context.Context, vocab.ActivityStreamsCreate) error:
    167 			enableCreate = false
    168 		case func(context.Context, vocab.ActivityStreamsUpdate) error:
    169 			enableUpdate = false
    170 		case func(context.Context, vocab.ActivityStreamsDelete) error:
    171 			enableDelete = false
    172 		case func(context.Context, vocab.ActivityStreamsFollow) error:
    173 			enableFollow = false
    174 		case func(context.Context, vocab.ActivityStreamsAccept) error:
    175 			enableAccept = false
    176 		case func(context.Context, vocab.ActivityStreamsReject) error:
    177 			enableReject = false
    178 		case func(context.Context, vocab.ActivityStreamsAdd) error:
    179 			enableAdd = false
    180 		case func(context.Context, vocab.ActivityStreamsRemove) error:
    181 			enableRemove = false
    182 		case func(context.Context, vocab.ActivityStreamsLike) error:
    183 			enableLike = false
    184 		case func(context.Context, vocab.ActivityStreamsAnnounce) error:
    185 			enableAnnounce = false
    186 		case func(context.Context, vocab.ActivityStreamsUndo) error:
    187 			enableUndo = false
    188 		case func(context.Context, vocab.ActivityStreamsBlock) error:
    189 			enableBlock = false
    190 		}
    191 	}
    192 	if enableCreate {
    193 		fns = append(fns, w.create)
    194 	}
    195 	if enableUpdate {
    196 		fns = append(fns, w.update)
    197 	}
    198 	if enableDelete {
    199 		fns = append(fns, w.deleteFn)
    200 	}
    201 	if enableFollow {
    202 		fns = append(fns, w.follow)
    203 	}
    204 	if enableAccept {
    205 		fns = append(fns, w.accept)
    206 	}
    207 	if enableReject {
    208 		fns = append(fns, w.reject)
    209 	}
    210 	if enableAdd {
    211 		fns = append(fns, w.add)
    212 	}
    213 	if enableRemove {
    214 		fns = append(fns, w.remove)
    215 	}
    216 	if enableLike {
    217 		fns = append(fns, w.like)
    218 	}
    219 	if enableAnnounce {
    220 		fns = append(fns, w.announce)
    221 	}
    222 	if enableUndo {
    223 		fns = append(fns, w.undo)
    224 	}
    225 	if enableBlock {
    226 		fns = append(fns, w.block)
    227 	}
    228 	return fns
    229 }
    230 
    231 // create implements the federating Create activity side effects.
    232 func (w FederatingWrappedCallbacks) create(c context.Context, a vocab.ActivityStreamsCreate) error {
    233 	op := a.GetActivityStreamsObject()
    234 	if op == nil || op.Len() == 0 {
    235 		return ErrObjectRequired
    236 	}
    237 	// Create anonymous loop function to be able to properly scope the defer
    238 	// for the database lock at each iteration.
    239 	loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
    240 		t := iter.GetType()
    241 		if t == nil && iter.IsIRI() {
    242 			// Attempt to dereference the IRI instead
    243 			tport, err := w.newTransport(c, w.inboxIRI, goFedUserAgent())
    244 			if err != nil {
    245 				return err
    246 			}
    247 			b, err := tport.Dereference(c, iter.GetIRI())
    248 			if err != nil {
    249 				return err
    250 			}
    251 			var m map[string]interface{}
    252 			if err = json.Unmarshal(b, &m); err != nil {
    253 				return err
    254 			}
    255 			t, err = streams.ToType(c, m)
    256 			if err != nil {
    257 				return err
    258 			}
    259 		} else if t == nil {
    260 			return fmt.Errorf("cannot handle federated create: object is neither a value nor IRI")
    261 		}
    262 		id, err := GetId(t)
    263 		if err != nil {
    264 			return err
    265 		}
    266 		var unlock func()
    267 		unlock, err = w.db.Lock(c, id)
    268 		if err != nil {
    269 			return err
    270 		}
    271 		defer unlock()
    272 		if err := w.db.Create(c, t); err != nil {
    273 			return err
    274 		}
    275 		return nil
    276 	}
    277 	for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
    278 		if err := loopFn(iter); err != nil {
    279 			return err
    280 		}
    281 	}
    282 	if w.Create != nil {
    283 		return w.Create(c, a)
    284 	}
    285 	return nil
    286 }
    287 
    288 // update implements the federating Update activity side effects.
    289 func (w FederatingWrappedCallbacks) update(c context.Context, a vocab.ActivityStreamsUpdate) error {
    290 	op := a.GetActivityStreamsObject()
    291 	if op == nil || op.Len() == 0 {
    292 		return ErrObjectRequired
    293 	}
    294 	if err := mustHaveActivityOriginMatchObjects(a); err != nil {
    295 		return err
    296 	}
    297 	// Create anonymous loop function to be able to properly scope the defer
    298 	// for the database lock at each iteration.
    299 	loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
    300 		t := iter.GetType()
    301 		if t == nil {
    302 			return fmt.Errorf("update requires an object to be wholly provided")
    303 		}
    304 		id, err := GetId(t)
    305 		if err != nil {
    306 			return err
    307 		}
    308 		var unlock func()
    309 		unlock, err = w.db.Lock(c, id)
    310 		if err != nil {
    311 			return err
    312 		}
    313 		defer unlock()
    314 		if err := w.db.Update(c, t); err != nil {
    315 			return err
    316 		}
    317 		return nil
    318 	}
    319 	for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
    320 		if err := loopFn(iter); err != nil {
    321 			return err
    322 		}
    323 	}
    324 	if w.Update != nil {
    325 		return w.Update(c, a)
    326 	}
    327 	return nil
    328 }
    329 
    330 // deleteFn implements the federating Delete activity side effects.
    331 func (w FederatingWrappedCallbacks) deleteFn(c context.Context, a vocab.ActivityStreamsDelete) error {
    332 	op := a.GetActivityStreamsObject()
    333 	if op == nil || op.Len() == 0 {
    334 		return ErrObjectRequired
    335 	}
    336 	if err := mustHaveActivityOriginMatchObjects(a); err != nil {
    337 		return err
    338 	}
    339 	// Create anonymous loop function to be able to properly scope the defer
    340 	// for the database lock at each iteration.
    341 	loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
    342 		id, err := ToId(iter)
    343 		if err != nil {
    344 			return err
    345 		}
    346 		var unlock func()
    347 		unlock, err = w.db.Lock(c, id)
    348 		if err != nil {
    349 			return err
    350 		}
    351 		defer unlock()
    352 		if err := w.db.Delete(c, id); err != nil {
    353 			return err
    354 		}
    355 		return nil
    356 	}
    357 	for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
    358 		if err := loopFn(iter); err != nil {
    359 			return err
    360 		}
    361 	}
    362 	if w.Delete != nil {
    363 		return w.Delete(c, a)
    364 	}
    365 	return nil
    366 }
    367 
    368 // follow implements the federating Follow activity side effects.
    369 func (w FederatingWrappedCallbacks) follow(c context.Context, a vocab.ActivityStreamsFollow) error {
    370 	op := a.GetActivityStreamsObject()
    371 	if op == nil || op.Len() == 0 {
    372 		return ErrObjectRequired
    373 	}
    374 	// Check that we own at least one of the 'object' properties, and ensure
    375 	// it is to the actor that owns this inbox.
    376 	//
    377 	// If not then don't send a response. It was federated to us as an FYI,
    378 	// by mistake, or some other reason.
    379 	unlock, err := w.db.Lock(c, w.inboxIRI)
    380 	if err != nil {
    381 		return err
    382 	}
    383 	// WARNING: Unlock not deferred.
    384 	actorIRI, err := w.db.ActorForInbox(c, w.inboxIRI)
    385 	unlock() // unlock even on error
    386 	if err != nil {
    387 		return err
    388 	}
    389 	// Unlock must be called by now and every branch above.
    390 	isMe := false
    391 	if w.OnFollow != OnFollowDoNothing {
    392 		for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
    393 			id, err := ToId(iter)
    394 			if err != nil {
    395 				return err
    396 			}
    397 			if id.String() == actorIRI.String() {
    398 				isMe = true
    399 				break
    400 			}
    401 		}
    402 	}
    403 	if isMe {
    404 		// Prepare the response.
    405 		var response Activity
    406 		if w.OnFollow == OnFollowAutomaticallyAccept {
    407 			response = streams.NewActivityStreamsAccept()
    408 		} else if w.OnFollow == OnFollowAutomaticallyReject {
    409 			response = streams.NewActivityStreamsReject()
    410 		} else {
    411 			return fmt.Errorf("unknown OnFollowBehavior: %d", w.OnFollow)
    412 		}
    413 		// Set us as the 'actor'.
    414 		me := streams.NewActivityStreamsActorProperty()
    415 		response.SetActivityStreamsActor(me)
    416 		me.AppendIRI(actorIRI)
    417 		// Set the Follow as the 'object' property.
    418 		op := streams.NewActivityStreamsObjectProperty()
    419 		response.SetActivityStreamsObject(op)
    420 		op.AppendActivityStreamsFollow(a)
    421 		// Add all actors on the original Follow to the 'to' property.
    422 		recipients := make([]*url.URL, 0)
    423 		to := streams.NewActivityStreamsToProperty()
    424 		response.SetActivityStreamsTo(to)
    425 		followActors := a.GetActivityStreamsActor()
    426 		for iter := followActors.Begin(); iter != followActors.End(); iter = iter.Next() {
    427 			id, err := ToId(iter)
    428 			if err != nil {
    429 				return err
    430 			}
    431 			to.AppendIRI(id)
    432 			recipients = append(recipients, id)
    433 		}
    434 		if w.OnFollow == OnFollowAutomaticallyAccept {
    435 			// If automatically accepting, then also update our
    436 			// followers collection with the new actors.
    437 			//
    438 			// If automatically rejecting, do not update the
    439 			// followers collection.
    440 			unlock, err := w.db.Lock(c, actorIRI)
    441 			if err != nil {
    442 				return err
    443 			}
    444 			// WARNING: Unlock not deferred.
    445 			followers, err := w.db.Followers(c, actorIRI)
    446 			if err != nil {
    447 				unlock()
    448 				return err
    449 			}
    450 			items := followers.GetActivityStreamsItems()
    451 			if items == nil {
    452 				items = streams.NewActivityStreamsItemsProperty()
    453 				followers.SetActivityStreamsItems(items)
    454 			}
    455 			for _, elem := range recipients {
    456 				items.PrependIRI(elem)
    457 			}
    458 			err = w.db.Update(c, followers)
    459 			unlock() // unlock even on error
    460 			if err != nil {
    461 				return err
    462 			}
    463 			// Unlock must be called by now and every branch above.
    464 		}
    465 		// Lock without defer!
    466 		unlock, err := w.db.Lock(c, w.inboxIRI)
    467 		if err != nil {
    468 			return err
    469 		}
    470 		outboxIRI, err := w.db.OutboxForInbox(c, w.inboxIRI)
    471 		unlock() // unlock after, regardless
    472 		if err != nil {
    473 			return err
    474 		}
    475 		// Everything must be unlocked by now.
    476 		if err := w.addNewIds(c, response); err != nil {
    477 			return err
    478 		} else if err := w.deliver(c, outboxIRI, response); err != nil {
    479 			return err
    480 		}
    481 	}
    482 	if w.Follow != nil {
    483 		return w.Follow(c, a)
    484 	}
    485 	return nil
    486 }
    487 
    488 // accept implements the federating Accept activity side effects.
    489 func (w FederatingWrappedCallbacks) accept(c context.Context, a vocab.ActivityStreamsAccept) error {
    490 	op := a.GetActivityStreamsObject()
    491 	if op != nil && op.Len() > 0 {
    492 		// Get this actor's id.
    493 		unlock, err := w.db.Lock(c, w.inboxIRI)
    494 		if err != nil {
    495 			return err
    496 		}
    497 		// WARNING: Unlock not deferred.
    498 		actorIRI, err := w.db.ActorForInbox(c, w.inboxIRI)
    499 		unlock() // unlock after regardless
    500 		if err != nil {
    501 			return err
    502 		}
    503 		// Unlock must be called by now and every branch above.
    504 		//
    505 		// Determine if we are in a follow on the 'object' property.
    506 		//
    507 		// TODO: Handle Accept multiple Follow.
    508 		var maybeMyFollowIRI *url.URL
    509 		for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
    510 			t := iter.GetType()
    511 			if t == nil && iter.IsIRI() {
    512 				// Attempt to dereference the IRI instead
    513 				tport, err := w.newTransport(c, w.inboxIRI, goFedUserAgent())
    514 				if err != nil {
    515 					return err
    516 				}
    517 				b, err := tport.Dereference(c, iter.GetIRI())
    518 				if err != nil {
    519 					return err
    520 				}
    521 				var m map[string]interface{}
    522 				if err = json.Unmarshal(b, &m); err != nil {
    523 					return err
    524 				}
    525 				t, err = streams.ToType(c, m)
    526 				if err != nil {
    527 					return err
    528 				}
    529 			} else if t == nil {
    530 				return fmt.Errorf("cannot handle federated create: object is neither a value nor IRI")
    531 			}
    532 			// Ensure it is a Follow.
    533 			if !streams.IsOrExtendsActivityStreamsFollow(t) {
    534 				continue
    535 			}
    536 			follow, ok := t.(Activity)
    537 			if !ok {
    538 				return fmt.Errorf("a Follow in an Accept does not satisfy the Activity interface")
    539 			}
    540 			followId, err := GetId(follow)
    541 			if err != nil {
    542 				return err
    543 			}
    544 			// Ensure that we are one of the actors on the Follow.
    545 			actors := follow.GetActivityStreamsActor()
    546 			for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() {
    547 				id, err := ToId(iter)
    548 				if err != nil {
    549 					return err
    550 				}
    551 				if id.String() == actorIRI.String() {
    552 					maybeMyFollowIRI = followId
    553 					break
    554 				}
    555 			}
    556 			// Continue breaking if we found ourselves
    557 			if maybeMyFollowIRI != nil {
    558 				break
    559 			}
    560 		}
    561 		// If we received an Accept whose 'object' is a Follow with an
    562 		// Accept that we sent, add to the following collection.
    563 		if maybeMyFollowIRI != nil {
    564 			// Verify our Follow request exists and the peer didn't
    565 			// fabricate it.
    566 			activityActors := a.GetActivityStreamsActor()
    567 			if activityActors == nil || activityActors.Len() == 0 {
    568 				return fmt.Errorf("an Accept with a Follow has no actors")
    569 			}
    570 			// This may be a duplicate check if we dereferenced the
    571 			// Follow above. TODO: Separate this logic to avoid
    572 			// redundancy.
    573 			//
    574 			// Use an anonymous function to properly scope the
    575 			// database lock, immediately call it.
    576 			err = func() error {
    577 				unlock, err := w.db.Lock(c, maybeMyFollowIRI)
    578 				if err != nil {
    579 					return err
    580 				}
    581 				defer unlock()
    582 				t, err := w.db.Get(c, maybeMyFollowIRI)
    583 				if err != nil {
    584 					return err
    585 				}
    586 				if !streams.IsOrExtendsActivityStreamsFollow(t) {
    587 					return fmt.Errorf("peer gave an Accept wrapping a Follow but provided a non-Follow id")
    588 				}
    589 				follow, ok := t.(Activity)
    590 				if !ok {
    591 					return fmt.Errorf("a Follow in an Accept does not satisfy the Activity interface")
    592 				}
    593 				// Ensure that we are one of the actors on the Follow.
    594 				ok = false
    595 				actors := follow.GetActivityStreamsActor()
    596 				for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() {
    597 					id, err := ToId(iter)
    598 					if err != nil {
    599 						return err
    600 					}
    601 					if id.String() == actorIRI.String() {
    602 						ok = true
    603 						break
    604 					}
    605 				}
    606 				if !ok {
    607 					return fmt.Errorf("peer gave an Accept wrapping a Follow but we are not the actor on that Follow")
    608 				}
    609 				// Build map of original Accept actors
    610 				acceptActors := make(map[string]bool)
    611 				for iter := activityActors.Begin(); iter != activityActors.End(); iter = iter.Next() {
    612 					id, err := ToId(iter)
    613 					if err != nil {
    614 						return err
    615 					}
    616 					acceptActors[id.String()] = false
    617 				}
    618 				// Verify all actor(s) were on the original Follow.
    619 				followObj := follow.GetActivityStreamsObject()
    620 				for iter := followObj.Begin(); iter != followObj.End(); iter = iter.Next() {
    621 					id, err := ToId(iter)
    622 					if err != nil {
    623 						return err
    624 					}
    625 					if _, ok := acceptActors[id.String()]; ok {
    626 						acceptActors[id.String()] = true
    627 					}
    628 				}
    629 				for _, found := range acceptActors {
    630 					if !found {
    631 						return fmt.Errorf("peer gave an Accept wrapping a Follow but was not an object in the original Follow")
    632 					}
    633 				}
    634 				return nil
    635 			}()
    636 			if err != nil {
    637 				return err
    638 			}
    639 			// Add the peer to our following collection.
    640 			unlock, err := w.db.Lock(c, actorIRI)
    641 			if err != nil {
    642 				return err
    643 			}
    644 			// WARNING: Unlock not deferred.
    645 			following, err := w.db.Following(c, actorIRI)
    646 			if err != nil {
    647 				unlock()
    648 				return err
    649 			}
    650 			items := following.GetActivityStreamsItems()
    651 			if items == nil {
    652 				items = streams.NewActivityStreamsItemsProperty()
    653 				following.SetActivityStreamsItems(items)
    654 			}
    655 			for iter := activityActors.Begin(); iter != activityActors.End(); iter = iter.Next() {
    656 				id, err := ToId(iter)
    657 				if err != nil {
    658 					unlock()
    659 					return err
    660 				}
    661 				items.PrependIRI(id)
    662 			}
    663 			err = w.db.Update(c, following)
    664 			unlock() // unlock after regardless
    665 			if err != nil {
    666 				return err
    667 			}
    668 			// Unlock must be called by now and every branch above.
    669 		}
    670 	}
    671 	if w.Accept != nil {
    672 		return w.Accept(c, a)
    673 	}
    674 	return nil
    675 }
    676 
    677 // reject implements the federating Reject activity side effects.
    678 func (w FederatingWrappedCallbacks) reject(c context.Context, a vocab.ActivityStreamsReject) error {
    679 	if w.Reject != nil {
    680 		return w.Reject(c, a)
    681 	}
    682 	return nil
    683 }
    684 
    685 // add implements the federating Add activity side effects.
    686 func (w FederatingWrappedCallbacks) add(c context.Context, a vocab.ActivityStreamsAdd) error {
    687 	op := a.GetActivityStreamsObject()
    688 	if op == nil || op.Len() == 0 {
    689 		return ErrObjectRequired
    690 	}
    691 	target := a.GetActivityStreamsTarget()
    692 	if target == nil || target.Len() == 0 {
    693 		return ErrTargetRequired
    694 	}
    695 	if err := add(c, op, target, w.db); err != nil {
    696 		return err
    697 	}
    698 	if w.Add != nil {
    699 		return w.Add(c, a)
    700 	}
    701 	return nil
    702 }
    703 
    704 // remove implements the federating Remove activity side effects.
    705 func (w FederatingWrappedCallbacks) remove(c context.Context, a vocab.ActivityStreamsRemove) error {
    706 	op := a.GetActivityStreamsObject()
    707 	if op == nil || op.Len() == 0 {
    708 		return ErrObjectRequired
    709 	}
    710 	target := a.GetActivityStreamsTarget()
    711 	if target == nil || target.Len() == 0 {
    712 		return ErrTargetRequired
    713 	}
    714 	if err := remove(c, op, target, w.db); err != nil {
    715 		return err
    716 	}
    717 	if w.Remove != nil {
    718 		return w.Remove(c, a)
    719 	}
    720 	return nil
    721 }
    722 
    723 // like implements the federating Like activity side effects.
    724 func (w FederatingWrappedCallbacks) like(c context.Context, a vocab.ActivityStreamsLike) error {
    725 	op := a.GetActivityStreamsObject()
    726 	if op == nil || op.Len() == 0 {
    727 		return ErrObjectRequired
    728 	}
    729 	id, err := GetId(a)
    730 	if err != nil {
    731 		return err
    732 	}
    733 	// Create anonymous loop function to be able to properly scope the defer
    734 	// for the database lock at each iteration.
    735 	loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
    736 		objId, err := ToId(iter)
    737 		if err != nil {
    738 			return err
    739 		}
    740 		unlock, err := w.db.Lock(c, objId)
    741 		if err != nil {
    742 			return err
    743 		}
    744 		defer unlock()
    745 		if owns, err := w.db.Owns(c, objId); err != nil {
    746 			return err
    747 		} else if !owns {
    748 			return nil
    749 		}
    750 		t, err := w.db.Get(c, objId)
    751 		if err != nil {
    752 			return err
    753 		}
    754 		l, ok := t.(likeser)
    755 		if !ok {
    756 			return fmt.Errorf("cannot add Like to likes collection for type %T", t)
    757 		}
    758 		// Get 'likes' property on the object, creating default if
    759 		// necessary.
    760 		likes := l.GetActivityStreamsLikes()
    761 		if likes == nil {
    762 			likes = streams.NewActivityStreamsLikesProperty()
    763 			l.SetActivityStreamsLikes(likes)
    764 		}
    765 		// Get 'likes' value, defaulting to a collection.
    766 		likesT := likes.GetType()
    767 		if likesT == nil {
    768 			col := streams.NewActivityStreamsCollection()
    769 			likesT = col
    770 			likes.SetActivityStreamsCollection(col)
    771 		}
    772 		// Prepend the activity's 'id' on the 'likes' Collection or
    773 		// OrderedCollection.
    774 		if col, ok := likesT.(itemser); ok {
    775 			items := col.GetActivityStreamsItems()
    776 			if items == nil {
    777 				items = streams.NewActivityStreamsItemsProperty()
    778 				col.SetActivityStreamsItems(items)
    779 			}
    780 			items.PrependIRI(id)
    781 		} else if oCol, ok := likesT.(orderedItemser); ok {
    782 			oItems := oCol.GetActivityStreamsOrderedItems()
    783 			if oItems == nil {
    784 				oItems = streams.NewActivityStreamsOrderedItemsProperty()
    785 				oCol.SetActivityStreamsOrderedItems(oItems)
    786 			}
    787 			oItems.PrependIRI(id)
    788 		} else {
    789 			return fmt.Errorf("likes type is neither a Collection nor an OrderedCollection: %T", likesT)
    790 		}
    791 		err = w.db.Update(c, t)
    792 		if err != nil {
    793 			return err
    794 		}
    795 		return nil
    796 	}
    797 	for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
    798 		if err := loopFn(iter); err != nil {
    799 			return err
    800 		}
    801 	}
    802 	if w.Like != nil {
    803 		return w.Like(c, a)
    804 	}
    805 	return nil
    806 }
    807 
    808 // announce implements the federating Announce activity side effects.
    809 func (w FederatingWrappedCallbacks) announce(c context.Context, a vocab.ActivityStreamsAnnounce) error {
    810 	id, err := GetId(a)
    811 	if err != nil {
    812 		return err
    813 	}
    814 	op := a.GetActivityStreamsObject()
    815 	// Create anonymous loop function to be able to properly scope the defer
    816 	// for the database lock at each iteration.
    817 	loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
    818 		objId, err := ToId(iter)
    819 		if err != nil {
    820 			return err
    821 		}
    822 		unlock, err := w.db.Lock(c, objId)
    823 		if err != nil {
    824 			return err
    825 		}
    826 		defer unlock()
    827 		if owns, err := w.db.Owns(c, objId); err != nil {
    828 			return err
    829 		} else if !owns {
    830 			return nil
    831 		}
    832 		t, err := w.db.Get(c, objId)
    833 		if err != nil {
    834 			return err
    835 		}
    836 		s, ok := t.(shareser)
    837 		if !ok {
    838 			return fmt.Errorf("cannot add Announce to Shares collection for type %T", t)
    839 		}
    840 		// Get 'shares' property on the object, creating default if
    841 		// necessary.
    842 		shares := s.GetActivityStreamsShares()
    843 		if shares == nil {
    844 			shares = streams.NewActivityStreamsSharesProperty()
    845 			s.SetActivityStreamsShares(shares)
    846 		}
    847 		// Get 'shares' value, defaulting to a collection.
    848 		sharesT := shares.GetType()
    849 		if sharesT == nil {
    850 			col := streams.NewActivityStreamsCollection()
    851 			sharesT = col
    852 			shares.SetActivityStreamsCollection(col)
    853 		}
    854 		// Prepend the activity's 'id' on the 'shares' Collection or
    855 		// OrderedCollection.
    856 		if col, ok := sharesT.(itemser); ok {
    857 			items := col.GetActivityStreamsItems()
    858 			if items == nil {
    859 				items = streams.NewActivityStreamsItemsProperty()
    860 				col.SetActivityStreamsItems(items)
    861 			}
    862 			items.PrependIRI(id)
    863 		} else if oCol, ok := sharesT.(orderedItemser); ok {
    864 			oItems := oCol.GetActivityStreamsOrderedItems()
    865 			if oItems == nil {
    866 				oItems = streams.NewActivityStreamsOrderedItemsProperty()
    867 				oCol.SetActivityStreamsOrderedItems(oItems)
    868 			}
    869 			oItems.PrependIRI(id)
    870 		} else {
    871 			return fmt.Errorf("shares type is neither a Collection nor an OrderedCollection: %T", sharesT)
    872 		}
    873 		err = w.db.Update(c, t)
    874 		if err != nil {
    875 			return err
    876 		}
    877 		return nil
    878 	}
    879 	if op != nil {
    880 		for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
    881 			if err := loopFn(iter); err != nil {
    882 				return err
    883 			}
    884 		}
    885 	}
    886 	if w.Announce != nil {
    887 		return w.Announce(c, a)
    888 	}
    889 	return nil
    890 }
    891 
    892 // undo implements the federating Undo activity side effects.
    893 func (w FederatingWrappedCallbacks) undo(c context.Context, a vocab.ActivityStreamsUndo) error {
    894 	op := a.GetActivityStreamsObject()
    895 	if op == nil || op.Len() == 0 {
    896 		return ErrObjectRequired
    897 	}
    898 	actors := a.GetActivityStreamsActor()
    899 	if err := mustHaveActivityActorsMatchObjectActors(c, actors, op, w.newTransport, w.inboxIRI); err != nil {
    900 		return err
    901 	}
    902 	if w.Undo != nil {
    903 		return w.Undo(c, a)
    904 	}
    905 	return nil
    906 }
    907 
    908 // block implements the federating Block activity side effects.
    909 func (w FederatingWrappedCallbacks) block(c context.Context, a vocab.ActivityStreamsBlock) error {
    910 	op := a.GetActivityStreamsObject()
    911 	if op == nil || op.Len() == 0 {
    912 		return ErrObjectRequired
    913 	}
    914 	if w.Block != nil {
    915 		return w.Block(c, a)
    916 	}
    917 	return nil
    918 }