gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

social_wrapped_callbacks.go (15389B)


      1 package pub
      2 
      3 import (
      4 	"context"
      5 	"fmt"
      6 	"net/url"
      7 
      8 	"github.com/superseriousbusiness/activity/streams"
      9 	"github.com/superseriousbusiness/activity/streams/vocab"
     10 )
     11 
     12 // SocialWrappedCallbacks lists the callback functions that already have some
     13 // side effect behavior provided by the pub library.
     14 //
     15 // These functions are wrapped for the Social Protocol.
     16 type SocialWrappedCallbacks struct {
     17 	// Create handles additional side effects for the Create ActivityStreams
     18 	// type.
     19 	//
     20 	// The wrapping callback copies the actor(s) to the 'attributedTo'
     21 	// property and copies recipients between the Create activity and all
     22 	// objects. It then saves the entry in the database.
     23 	Create func(context.Context, vocab.ActivityStreamsCreate) error
     24 	// Update handles additional side effects for the Update ActivityStreams
     25 	// type.
     26 	//
     27 	// The wrapping callback applies new top-level values on an object to
     28 	// the stored objects. Any top-level null literals will be deleted on
     29 	// the stored objects as well.
     30 	Update func(context.Context, vocab.ActivityStreamsUpdate) error
     31 	// Delete handles additional side effects for the Delete ActivityStreams
     32 	// type.
     33 	//
     34 	// The wrapping callback replaces the object(s) with tombstones in the
     35 	// database.
     36 	Delete func(context.Context, vocab.ActivityStreamsDelete) error
     37 	// Follow handles additional side effects for the Follow ActivityStreams
     38 	// type.
     39 	//
     40 	// The wrapping callback only ensures the 'Follow' has at least one
     41 	// 'object' entry, but otherwise has no default side effect.
     42 	Follow func(context.Context, vocab.ActivityStreamsFollow) error
     43 	// Add handles additional side effects for the Add ActivityStreams
     44 	// type.
     45 	//
     46 	//
     47 	// The wrapping function will add the 'object' IRIs to a specific
     48 	// 'target' collection if the 'target' collection(s) live on this
     49 	// server.
     50 	Add func(context.Context, vocab.ActivityStreamsAdd) error
     51 	// Remove handles additional side effects for the Remove ActivityStreams
     52 	// type.
     53 	//
     54 	// The wrapping function will remove all 'object' IRIs from a specific
     55 	// 'target' collection if the 'target' collection(s) live on this
     56 	// server.
     57 	Remove func(context.Context, vocab.ActivityStreamsRemove) error
     58 	// Like handles additional side effects for the Like ActivityStreams
     59 	// type.
     60 	//
     61 	// The wrapping function will add the objects on the activity to the
     62 	// "liked" collection of this actor.
     63 	Like func(context.Context, vocab.ActivityStreamsLike) error
     64 	// Undo handles additional side effects for the Undo ActivityStreams
     65 	// type.
     66 	//
     67 	//
     68 	// The wrapping function ensures the 'actor' on the 'Undo'
     69 	// is be the same as the 'actor' on all Activities being undone.
     70 	// It enforces that the actors on the Undo must correspond to all of the
     71 	// 'object' actors in some manner.
     72 	//
     73 	// It is expected that the application will implement the proper
     74 	// reversal of activities that are being undone.
     75 	Undo func(context.Context, vocab.ActivityStreamsUndo) error
     76 	// Block handles additional side effects for the Block ActivityStreams
     77 	// type.
     78 	//
     79 	// The wrapping callback only ensures the 'Block' has at least one
     80 	// 'object' entry, but otherwise has no default side effect. It is up
     81 	// to the wrapped application function to properly enforce the new
     82 	// blocking behavior.
     83 	//
     84 	// Note that go-fed does not federate 'Block' activities received in the
     85 	// Social Protocol.
     86 	Block func(context.Context, vocab.ActivityStreamsBlock) error
     87 
     88 	// Sidechannel data -- this is set at request handling time. These must
     89 	// be set before the callbacks are used.
     90 
     91 	// db is the Database the SocialWrappedCallbacks should use. It must be
     92 	// set before calling the callbacks.
     93 	db Database
     94 	// outboxIRI is the outboxIRI that is handling this callback.
     95 	outboxIRI *url.URL
     96 	// rawActivity is the JSON map literal received when deserializing the
     97 	// request body.
     98 	rawActivity map[string]interface{}
     99 	// clock is the server's clock.
    100 	clock Clock
    101 	// newTransport creates a new Transport.
    102 	newTransport func(c context.Context, actorBoxIRI *url.URL, gofedAgent string) (t Transport, err error)
    103 	// undeliverable is a sidechannel out, indicating if the handled activity
    104 	// should not be delivered to a peer.
    105 	//
    106 	// Its provided default value will always be used when a custom function
    107 	// is called.
    108 	undeliverable *bool
    109 }
    110 
    111 // callbacks returns the WrappedCallbacks members into a single interface slice
    112 // for use in streams.Resolver callbacks.
    113 //
    114 // If the given functions have a type that collides with the default behavior,
    115 // then disable our default behavior
    116 func (w SocialWrappedCallbacks) callbacks(fns []interface{}) []interface{} {
    117 	enableCreate := true
    118 	enableUpdate := true
    119 	enableDelete := true
    120 	enableFollow := true
    121 	enableAdd := true
    122 	enableRemove := true
    123 	enableLike := true
    124 	enableUndo := true
    125 	enableBlock := true
    126 	for _, fn := range fns {
    127 		switch fn.(type) {
    128 		default:
    129 			continue
    130 		case func(context.Context, vocab.ActivityStreamsCreate) error:
    131 			enableCreate = false
    132 		case func(context.Context, vocab.ActivityStreamsUpdate) error:
    133 			enableUpdate = false
    134 		case func(context.Context, vocab.ActivityStreamsDelete) error:
    135 			enableDelete = false
    136 		case func(context.Context, vocab.ActivityStreamsFollow) error:
    137 			enableFollow = false
    138 		case func(context.Context, vocab.ActivityStreamsAdd) error:
    139 			enableAdd = false
    140 		case func(context.Context, vocab.ActivityStreamsRemove) error:
    141 			enableRemove = false
    142 		case func(context.Context, vocab.ActivityStreamsLike) error:
    143 			enableLike = false
    144 		case func(context.Context, vocab.ActivityStreamsUndo) error:
    145 			enableUndo = false
    146 		case func(context.Context, vocab.ActivityStreamsBlock) error:
    147 			enableBlock = false
    148 		}
    149 	}
    150 	if enableCreate {
    151 		fns = append(fns, w.create)
    152 	}
    153 	if enableUpdate {
    154 		fns = append(fns, w.update)
    155 	}
    156 	if enableDelete {
    157 		fns = append(fns, w.deleteFn)
    158 	}
    159 	if enableFollow {
    160 		fns = append(fns, w.follow)
    161 	}
    162 	if enableAdd {
    163 		fns = append(fns, w.add)
    164 	}
    165 	if enableRemove {
    166 		fns = append(fns, w.remove)
    167 	}
    168 	if enableLike {
    169 		fns = append(fns, w.like)
    170 	}
    171 	if enableUndo {
    172 		fns = append(fns, w.undo)
    173 	}
    174 	if enableBlock {
    175 		fns = append(fns, w.block)
    176 	}
    177 	return fns
    178 }
    179 
    180 // create implements the social Create activity side effects.
    181 func (w SocialWrappedCallbacks) create(c context.Context, a vocab.ActivityStreamsCreate) error {
    182 	*w.undeliverable = false
    183 	op := a.GetActivityStreamsObject()
    184 	if op == nil || op.Len() == 0 {
    185 		return ErrObjectRequired
    186 	}
    187 	// Obtain all actor IRIs.
    188 	actors := a.GetActivityStreamsActor()
    189 	createActorIds := make(map[string]*url.URL)
    190 	if actors != nil {
    191 		createActorIds = make(map[string]*url.URL, actors.Len())
    192 		for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() {
    193 			id, err := ToId(iter)
    194 			if err != nil {
    195 				return err
    196 			}
    197 			createActorIds[id.String()] = id
    198 		}
    199 	}
    200 	// Obtain each object's 'attributedTo' IRIs.
    201 	objectAttributedToIds := make([]map[string]*url.URL, op.Len())
    202 	for i := range objectAttributedToIds {
    203 		objectAttributedToIds[i] = make(map[string]*url.URL)
    204 	}
    205 	for i := 0; i < op.Len(); i++ {
    206 		t := op.At(i).GetType()
    207 		attrToer, ok := t.(attributedToer)
    208 		if !ok {
    209 			continue
    210 		}
    211 		attr := attrToer.GetActivityStreamsAttributedTo()
    212 		if attr == nil {
    213 			attr = streams.NewActivityStreamsAttributedToProperty()
    214 			attrToer.SetActivityStreamsAttributedTo(attr)
    215 		}
    216 		for iter := attr.Begin(); iter != attr.End(); iter = iter.Next() {
    217 			id, err := ToId(iter)
    218 			if err != nil {
    219 				return err
    220 			}
    221 			objectAttributedToIds[i][id.String()] = id
    222 		}
    223 	}
    224 	// Put all missing actor IRIs onto all object attributedTo properties.
    225 	for k, v := range createActorIds {
    226 		for i, attributedToMap := range objectAttributedToIds {
    227 			if _, ok := attributedToMap[k]; !ok {
    228 				t := op.At(i).GetType()
    229 				attrToer, ok := t.(attributedToer)
    230 				if !ok {
    231 					continue
    232 				}
    233 				attr := attrToer.GetActivityStreamsAttributedTo()
    234 				attr.AppendIRI(v)
    235 			}
    236 		}
    237 	}
    238 	// Put all missing object attributedTo IRIs onto the actor property
    239 	// if there is one.
    240 	if actors != nil {
    241 		for _, attributedToMap := range objectAttributedToIds {
    242 			for k, v := range attributedToMap {
    243 				if _, ok := createActorIds[k]; !ok {
    244 					actors.AppendIRI(v)
    245 				}
    246 			}
    247 		}
    248 	}
    249 	// Copy over the 'to', 'bto', 'cc', 'bcc', and 'audience' recipients
    250 	// between the activity and all child objects and vice versa.
    251 	if err := normalizeRecipients(a); err != nil {
    252 		return err
    253 	}
    254 	// Create anonymous loop function to be able to properly scope the defer
    255 	// for the database lock at each iteration.
    256 	loopFn := func(i int) error {
    257 		obj := op.At(i).GetType()
    258 		id, err := GetId(obj)
    259 		if err != nil {
    260 			return err
    261 		}
    262 		var unlock func()
    263 		unlock, err = w.db.Lock(c, id)
    264 		if err != nil {
    265 			return err
    266 		}
    267 		defer unlock()
    268 		if err := w.db.Create(c, obj); err != nil {
    269 			return err
    270 		}
    271 		return nil
    272 	}
    273 	// Persist all objects we've created, which will include sensitive
    274 	// recipients such as 'bcc' and 'bto'.
    275 	for i := 0; i < op.Len(); i++ {
    276 		if err := loopFn(i); err != nil {
    277 			return err
    278 		}
    279 	}
    280 	if w.Create != nil {
    281 		return w.Create(c, a)
    282 	}
    283 	return nil
    284 }
    285 
    286 // update implements the social Update activity side effects.
    287 func (w SocialWrappedCallbacks) update(c context.Context, a vocab.ActivityStreamsUpdate) error {
    288 	*w.undeliverable = false
    289 	op := a.GetActivityStreamsObject()
    290 	if op == nil || op.Len() == 0 {
    291 		return ErrObjectRequired
    292 	}
    293 	// Obtain all object ids, which should be owned by this server.
    294 	objIds := make([]*url.URL, 0, op.Len())
    295 	for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
    296 		id, err := ToId(iter)
    297 		if err != nil {
    298 			return err
    299 		}
    300 		objIds = append(objIds, id)
    301 	}
    302 	// Create anonymous loop function to be able to properly scope the defer
    303 	// for the database lock at each iteration.
    304 	loopFn := func(idx int, loopId *url.URL) error {
    305 		unlock, err := w.db.Lock(c, loopId)
    306 		if err != nil {
    307 			return err
    308 		}
    309 		defer unlock()
    310 		t, err := w.db.Get(c, loopId)
    311 		if err != nil {
    312 			return err
    313 		}
    314 		m, err := t.Serialize()
    315 		if err != nil {
    316 			return err
    317 		}
    318 		// Copy over new top-level values.
    319 		objType := op.At(idx).GetType()
    320 		if objType == nil {
    321 			return fmt.Errorf("object at index %d is not a literal type value", idx)
    322 		}
    323 		newM, err := objType.Serialize()
    324 		if err != nil {
    325 			return err
    326 		}
    327 		for k, v := range newM {
    328 			m[k] = v
    329 		}
    330 		// Delete top-level values where the raw Activity had nils.
    331 		for k, v := range w.rawActivity {
    332 			if _, ok := m[k]; v == nil && ok {
    333 				delete(m, k)
    334 			}
    335 		}
    336 		newT, err := streams.ToType(c, m)
    337 		if err != nil {
    338 			return err
    339 		}
    340 		if err = w.db.Update(c, newT); err != nil {
    341 			return err
    342 		}
    343 		return nil
    344 	}
    345 	for i, id := range objIds {
    346 		if err := loopFn(i, id); err != nil {
    347 			return err
    348 		}
    349 	}
    350 	if w.Update != nil {
    351 		return w.Update(c, a)
    352 	}
    353 	return nil
    354 }
    355 
    356 // deleteFn implements the social Delete activity side effects.
    357 func (w SocialWrappedCallbacks) deleteFn(c context.Context, a vocab.ActivityStreamsDelete) error {
    358 	*w.undeliverable = false
    359 	op := a.GetActivityStreamsObject()
    360 	if op == nil || op.Len() == 0 {
    361 		return ErrObjectRequired
    362 	}
    363 	// Obtain all object ids, which should be owned by this server.
    364 	objIds := make([]*url.URL, 0, op.Len())
    365 	for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
    366 		id, err := ToId(iter)
    367 		if err != nil {
    368 			return err
    369 		}
    370 		objIds = append(objIds, id)
    371 	}
    372 	// Create anonymous loop function to be able to properly scope the defer
    373 	// for the database lock at each iteration.
    374 	loopFn := func(idx int, loopId *url.URL) error {
    375 		unlock, err := w.db.Lock(c, loopId)
    376 		if err != nil {
    377 			return err
    378 		}
    379 		defer unlock()
    380 		t, err := w.db.Get(c, loopId)
    381 		if err != nil {
    382 			return err
    383 		}
    384 		tomb := toTombstone(t, loopId, w.clock.Now())
    385 		if err := w.db.Update(c, tomb); err != nil {
    386 			return err
    387 		}
    388 		return nil
    389 	}
    390 	for i, id := range objIds {
    391 		if err := loopFn(i, id); err != nil {
    392 			return err
    393 		}
    394 	}
    395 	if w.Delete != nil {
    396 		return w.Delete(c, a)
    397 	}
    398 	return nil
    399 }
    400 
    401 // follow implements the social Follow activity side effects.
    402 func (w SocialWrappedCallbacks) follow(c context.Context, a vocab.ActivityStreamsFollow) error {
    403 	*w.undeliverable = false
    404 	op := a.GetActivityStreamsObject()
    405 	if op == nil || op.Len() == 0 {
    406 		return ErrObjectRequired
    407 	}
    408 	if w.Follow != nil {
    409 		return w.Follow(c, a)
    410 	}
    411 	return nil
    412 }
    413 
    414 // add implements the social Add activity side effects.
    415 func (w SocialWrappedCallbacks) add(c context.Context, a vocab.ActivityStreamsAdd) error {
    416 	*w.undeliverable = false
    417 	op := a.GetActivityStreamsObject()
    418 	if op == nil || op.Len() == 0 {
    419 		return ErrObjectRequired
    420 	}
    421 	target := a.GetActivityStreamsTarget()
    422 	if target == nil || target.Len() == 0 {
    423 		return ErrTargetRequired
    424 	}
    425 	if err := add(c, op, target, w.db); err != nil {
    426 		return err
    427 	}
    428 	if w.Add != nil {
    429 		return w.Add(c, a)
    430 	}
    431 	return nil
    432 }
    433 
    434 // remove implements the social Remove activity side effects.
    435 func (w SocialWrappedCallbacks) remove(c context.Context, a vocab.ActivityStreamsRemove) error {
    436 	*w.undeliverable = false
    437 	op := a.GetActivityStreamsObject()
    438 	if op == nil || op.Len() == 0 {
    439 		return ErrObjectRequired
    440 	}
    441 	target := a.GetActivityStreamsTarget()
    442 	if target == nil || target.Len() == 0 {
    443 		return ErrTargetRequired
    444 	}
    445 	if err := remove(c, op, target, w.db); err != nil {
    446 		return err
    447 	}
    448 	if w.Remove != nil {
    449 		return w.Remove(c, a)
    450 	}
    451 	return nil
    452 }
    453 
    454 // like implements the social Like activity side effects.
    455 func (w SocialWrappedCallbacks) like(c context.Context, a vocab.ActivityStreamsLike) error {
    456 	*w.undeliverable = false
    457 	op := a.GetActivityStreamsObject()
    458 	if op == nil || op.Len() == 0 {
    459 		return ErrObjectRequired
    460 	}
    461 	// Get this actor's IRI.
    462 	unlock, err := w.db.Lock(c, w.outboxIRI)
    463 	if err != nil {
    464 		return err
    465 	}
    466 	// WARNING: Unlock not deferred.
    467 	actorIRI, err := w.db.ActorForOutbox(c, w.outboxIRI)
    468 	unlock() // unlock even on error
    469 	if err != nil {
    470 		return err
    471 	}
    472 	// Unlock must be called by now and every branch above.
    473 	//
    474 	// Now obtain this actor's 'liked' collection.
    475 	unlock, err = w.db.Lock(c, actorIRI)
    476 	if err != nil {
    477 		return err
    478 	}
    479 	defer unlock()
    480 	liked, err := w.db.Liked(c, actorIRI)
    481 	if err != nil {
    482 		return err
    483 	}
    484 	likedItems := liked.GetActivityStreamsItems()
    485 	if likedItems == nil {
    486 		likedItems = streams.NewActivityStreamsItemsProperty()
    487 		liked.SetActivityStreamsItems(likedItems)
    488 	}
    489 	for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
    490 		objId, err := ToId(iter)
    491 		if err != nil {
    492 			return err
    493 		}
    494 		likedItems.PrependIRI(objId)
    495 	}
    496 	err = w.db.Update(c, liked)
    497 	if err != nil {
    498 		return err
    499 	}
    500 	if w.Like != nil {
    501 		return w.Like(c, a)
    502 	}
    503 	return nil
    504 }
    505 
    506 // undo implements the social Undo activity side effects.
    507 func (w SocialWrappedCallbacks) undo(c context.Context, a vocab.ActivityStreamsUndo) error {
    508 	*w.undeliverable = false
    509 	op := a.GetActivityStreamsObject()
    510 	if op == nil || op.Len() == 0 {
    511 		return ErrObjectRequired
    512 	}
    513 	actors := a.GetActivityStreamsActor()
    514 	if err := mustHaveActivityActorsMatchObjectActors(c, actors, op, w.newTransport, w.outboxIRI); err != nil {
    515 		return err
    516 	}
    517 	if w.Undo != nil {
    518 		return w.Undo(c, a)
    519 	}
    520 	return nil
    521 }
    522 
    523 // block implements the social Block activity side effects.
    524 func (w SocialWrappedCallbacks) block(c context.Context, a vocab.ActivityStreamsBlock) error {
    525 	*w.undeliverable = true
    526 	op := a.GetActivityStreamsObject()
    527 	if op == nil || op.Len() == 0 {
    528 		return ErrObjectRequired
    529 	}
    530 	if w.Block != nil {
    531 		return w.Block(c, a)
    532 	}
    533 	return nil
    534 }