social_wrapped_callbacks.go (15389B)
1 package pub 2 3 import ( 4 "context" 5 "fmt" 6 "net/url" 7 8 "github.com/superseriousbusiness/activity/streams" 9 "github.com/superseriousbusiness/activity/streams/vocab" 10 ) 11 12 // SocialWrappedCallbacks lists the callback functions that already have some 13 // side effect behavior provided by the pub library. 14 // 15 // These functions are wrapped for the Social Protocol. 16 type SocialWrappedCallbacks struct { 17 // Create handles additional side effects for the Create ActivityStreams 18 // type. 19 // 20 // The wrapping callback copies the actor(s) to the 'attributedTo' 21 // property and copies recipients between the Create activity and all 22 // objects. It then saves the entry in the database. 23 Create func(context.Context, vocab.ActivityStreamsCreate) error 24 // Update handles additional side effects for the Update ActivityStreams 25 // type. 26 // 27 // The wrapping callback applies new top-level values on an object to 28 // the stored objects. Any top-level null literals will be deleted on 29 // the stored objects as well. 30 Update func(context.Context, vocab.ActivityStreamsUpdate) error 31 // Delete handles additional side effects for the Delete ActivityStreams 32 // type. 33 // 34 // The wrapping callback replaces the object(s) with tombstones in the 35 // database. 36 Delete func(context.Context, vocab.ActivityStreamsDelete) error 37 // Follow handles additional side effects for the Follow ActivityStreams 38 // type. 39 // 40 // The wrapping callback only ensures the 'Follow' has at least one 41 // 'object' entry, but otherwise has no default side effect. 42 Follow func(context.Context, vocab.ActivityStreamsFollow) error 43 // Add handles additional side effects for the Add ActivityStreams 44 // type. 45 // 46 // 47 // The wrapping function will add the 'object' IRIs to a specific 48 // 'target' collection if the 'target' collection(s) live on this 49 // server. 50 Add func(context.Context, vocab.ActivityStreamsAdd) error 51 // Remove handles additional side effects for the Remove ActivityStreams 52 // type. 53 // 54 // The wrapping function will remove all 'object' IRIs from a specific 55 // 'target' collection if the 'target' collection(s) live on this 56 // server. 57 Remove func(context.Context, vocab.ActivityStreamsRemove) error 58 // Like handles additional side effects for the Like ActivityStreams 59 // type. 60 // 61 // The wrapping function will add the objects on the activity to the 62 // "liked" collection of this actor. 63 Like func(context.Context, vocab.ActivityStreamsLike) error 64 // Undo handles additional side effects for the Undo ActivityStreams 65 // type. 66 // 67 // 68 // The wrapping function ensures the 'actor' on the 'Undo' 69 // is be the same as the 'actor' on all Activities being undone. 70 // It enforces that the actors on the Undo must correspond to all of the 71 // 'object' actors in some manner. 72 // 73 // It is expected that the application will implement the proper 74 // reversal of activities that are being undone. 75 Undo func(context.Context, vocab.ActivityStreamsUndo) error 76 // Block handles additional side effects for the Block ActivityStreams 77 // type. 78 // 79 // The wrapping callback only ensures the 'Block' has at least one 80 // 'object' entry, but otherwise has no default side effect. It is up 81 // to the wrapped application function to properly enforce the new 82 // blocking behavior. 83 // 84 // Note that go-fed does not federate 'Block' activities received in the 85 // Social Protocol. 86 Block func(context.Context, vocab.ActivityStreamsBlock) error 87 88 // Sidechannel data -- this is set at request handling time. These must 89 // be set before the callbacks are used. 90 91 // db is the Database the SocialWrappedCallbacks should use. It must be 92 // set before calling the callbacks. 93 db Database 94 // outboxIRI is the outboxIRI that is handling this callback. 95 outboxIRI *url.URL 96 // rawActivity is the JSON map literal received when deserializing the 97 // request body. 98 rawActivity map[string]interface{} 99 // clock is the server's clock. 100 clock Clock 101 // newTransport creates a new Transport. 102 newTransport func(c context.Context, actorBoxIRI *url.URL, gofedAgent string) (t Transport, err error) 103 // undeliverable is a sidechannel out, indicating if the handled activity 104 // should not be delivered to a peer. 105 // 106 // Its provided default value will always be used when a custom function 107 // is called. 108 undeliverable *bool 109 } 110 111 // callbacks returns the WrappedCallbacks members into a single interface slice 112 // for use in streams.Resolver callbacks. 113 // 114 // If the given functions have a type that collides with the default behavior, 115 // then disable our default behavior 116 func (w SocialWrappedCallbacks) callbacks(fns []interface{}) []interface{} { 117 enableCreate := true 118 enableUpdate := true 119 enableDelete := true 120 enableFollow := true 121 enableAdd := true 122 enableRemove := true 123 enableLike := true 124 enableUndo := true 125 enableBlock := true 126 for _, fn := range fns { 127 switch fn.(type) { 128 default: 129 continue 130 case func(context.Context, vocab.ActivityStreamsCreate) error: 131 enableCreate = false 132 case func(context.Context, vocab.ActivityStreamsUpdate) error: 133 enableUpdate = false 134 case func(context.Context, vocab.ActivityStreamsDelete) error: 135 enableDelete = false 136 case func(context.Context, vocab.ActivityStreamsFollow) error: 137 enableFollow = false 138 case func(context.Context, vocab.ActivityStreamsAdd) error: 139 enableAdd = false 140 case func(context.Context, vocab.ActivityStreamsRemove) error: 141 enableRemove = false 142 case func(context.Context, vocab.ActivityStreamsLike) error: 143 enableLike = false 144 case func(context.Context, vocab.ActivityStreamsUndo) error: 145 enableUndo = false 146 case func(context.Context, vocab.ActivityStreamsBlock) error: 147 enableBlock = false 148 } 149 } 150 if enableCreate { 151 fns = append(fns, w.create) 152 } 153 if enableUpdate { 154 fns = append(fns, w.update) 155 } 156 if enableDelete { 157 fns = append(fns, w.deleteFn) 158 } 159 if enableFollow { 160 fns = append(fns, w.follow) 161 } 162 if enableAdd { 163 fns = append(fns, w.add) 164 } 165 if enableRemove { 166 fns = append(fns, w.remove) 167 } 168 if enableLike { 169 fns = append(fns, w.like) 170 } 171 if enableUndo { 172 fns = append(fns, w.undo) 173 } 174 if enableBlock { 175 fns = append(fns, w.block) 176 } 177 return fns 178 } 179 180 // create implements the social Create activity side effects. 181 func (w SocialWrappedCallbacks) create(c context.Context, a vocab.ActivityStreamsCreate) error { 182 *w.undeliverable = false 183 op := a.GetActivityStreamsObject() 184 if op == nil || op.Len() == 0 { 185 return ErrObjectRequired 186 } 187 // Obtain all actor IRIs. 188 actors := a.GetActivityStreamsActor() 189 createActorIds := make(map[string]*url.URL) 190 if actors != nil { 191 createActorIds = make(map[string]*url.URL, actors.Len()) 192 for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() { 193 id, err := ToId(iter) 194 if err != nil { 195 return err 196 } 197 createActorIds[id.String()] = id 198 } 199 } 200 // Obtain each object's 'attributedTo' IRIs. 201 objectAttributedToIds := make([]map[string]*url.URL, op.Len()) 202 for i := range objectAttributedToIds { 203 objectAttributedToIds[i] = make(map[string]*url.URL) 204 } 205 for i := 0; i < op.Len(); i++ { 206 t := op.At(i).GetType() 207 attrToer, ok := t.(attributedToer) 208 if !ok { 209 continue 210 } 211 attr := attrToer.GetActivityStreamsAttributedTo() 212 if attr == nil { 213 attr = streams.NewActivityStreamsAttributedToProperty() 214 attrToer.SetActivityStreamsAttributedTo(attr) 215 } 216 for iter := attr.Begin(); iter != attr.End(); iter = iter.Next() { 217 id, err := ToId(iter) 218 if err != nil { 219 return err 220 } 221 objectAttributedToIds[i][id.String()] = id 222 } 223 } 224 // Put all missing actor IRIs onto all object attributedTo properties. 225 for k, v := range createActorIds { 226 for i, attributedToMap := range objectAttributedToIds { 227 if _, ok := attributedToMap[k]; !ok { 228 t := op.At(i).GetType() 229 attrToer, ok := t.(attributedToer) 230 if !ok { 231 continue 232 } 233 attr := attrToer.GetActivityStreamsAttributedTo() 234 attr.AppendIRI(v) 235 } 236 } 237 } 238 // Put all missing object attributedTo IRIs onto the actor property 239 // if there is one. 240 if actors != nil { 241 for _, attributedToMap := range objectAttributedToIds { 242 for k, v := range attributedToMap { 243 if _, ok := createActorIds[k]; !ok { 244 actors.AppendIRI(v) 245 } 246 } 247 } 248 } 249 // Copy over the 'to', 'bto', 'cc', 'bcc', and 'audience' recipients 250 // between the activity and all child objects and vice versa. 251 if err := normalizeRecipients(a); err != nil { 252 return err 253 } 254 // Create anonymous loop function to be able to properly scope the defer 255 // for the database lock at each iteration. 256 loopFn := func(i int) error { 257 obj := op.At(i).GetType() 258 id, err := GetId(obj) 259 if err != nil { 260 return err 261 } 262 var unlock func() 263 unlock, err = w.db.Lock(c, id) 264 if err != nil { 265 return err 266 } 267 defer unlock() 268 if err := w.db.Create(c, obj); err != nil { 269 return err 270 } 271 return nil 272 } 273 // Persist all objects we've created, which will include sensitive 274 // recipients such as 'bcc' and 'bto'. 275 for i := 0; i < op.Len(); i++ { 276 if err := loopFn(i); err != nil { 277 return err 278 } 279 } 280 if w.Create != nil { 281 return w.Create(c, a) 282 } 283 return nil 284 } 285 286 // update implements the social Update activity side effects. 287 func (w SocialWrappedCallbacks) update(c context.Context, a vocab.ActivityStreamsUpdate) error { 288 *w.undeliverable = false 289 op := a.GetActivityStreamsObject() 290 if op == nil || op.Len() == 0 { 291 return ErrObjectRequired 292 } 293 // Obtain all object ids, which should be owned by this server. 294 objIds := make([]*url.URL, 0, op.Len()) 295 for iter := op.Begin(); iter != op.End(); iter = iter.Next() { 296 id, err := ToId(iter) 297 if err != nil { 298 return err 299 } 300 objIds = append(objIds, id) 301 } 302 // Create anonymous loop function to be able to properly scope the defer 303 // for the database lock at each iteration. 304 loopFn := func(idx int, loopId *url.URL) error { 305 unlock, err := w.db.Lock(c, loopId) 306 if err != nil { 307 return err 308 } 309 defer unlock() 310 t, err := w.db.Get(c, loopId) 311 if err != nil { 312 return err 313 } 314 m, err := t.Serialize() 315 if err != nil { 316 return err 317 } 318 // Copy over new top-level values. 319 objType := op.At(idx).GetType() 320 if objType == nil { 321 return fmt.Errorf("object at index %d is not a literal type value", idx) 322 } 323 newM, err := objType.Serialize() 324 if err != nil { 325 return err 326 } 327 for k, v := range newM { 328 m[k] = v 329 } 330 // Delete top-level values where the raw Activity had nils. 331 for k, v := range w.rawActivity { 332 if _, ok := m[k]; v == nil && ok { 333 delete(m, k) 334 } 335 } 336 newT, err := streams.ToType(c, m) 337 if err != nil { 338 return err 339 } 340 if err = w.db.Update(c, newT); err != nil { 341 return err 342 } 343 return nil 344 } 345 for i, id := range objIds { 346 if err := loopFn(i, id); err != nil { 347 return err 348 } 349 } 350 if w.Update != nil { 351 return w.Update(c, a) 352 } 353 return nil 354 } 355 356 // deleteFn implements the social Delete activity side effects. 357 func (w SocialWrappedCallbacks) deleteFn(c context.Context, a vocab.ActivityStreamsDelete) error { 358 *w.undeliverable = false 359 op := a.GetActivityStreamsObject() 360 if op == nil || op.Len() == 0 { 361 return ErrObjectRequired 362 } 363 // Obtain all object ids, which should be owned by this server. 364 objIds := make([]*url.URL, 0, op.Len()) 365 for iter := op.Begin(); iter != op.End(); iter = iter.Next() { 366 id, err := ToId(iter) 367 if err != nil { 368 return err 369 } 370 objIds = append(objIds, id) 371 } 372 // Create anonymous loop function to be able to properly scope the defer 373 // for the database lock at each iteration. 374 loopFn := func(idx int, loopId *url.URL) error { 375 unlock, err := w.db.Lock(c, loopId) 376 if err != nil { 377 return err 378 } 379 defer unlock() 380 t, err := w.db.Get(c, loopId) 381 if err != nil { 382 return err 383 } 384 tomb := toTombstone(t, loopId, w.clock.Now()) 385 if err := w.db.Update(c, tomb); err != nil { 386 return err 387 } 388 return nil 389 } 390 for i, id := range objIds { 391 if err := loopFn(i, id); err != nil { 392 return err 393 } 394 } 395 if w.Delete != nil { 396 return w.Delete(c, a) 397 } 398 return nil 399 } 400 401 // follow implements the social Follow activity side effects. 402 func (w SocialWrappedCallbacks) follow(c context.Context, a vocab.ActivityStreamsFollow) error { 403 *w.undeliverable = false 404 op := a.GetActivityStreamsObject() 405 if op == nil || op.Len() == 0 { 406 return ErrObjectRequired 407 } 408 if w.Follow != nil { 409 return w.Follow(c, a) 410 } 411 return nil 412 } 413 414 // add implements the social Add activity side effects. 415 func (w SocialWrappedCallbacks) add(c context.Context, a vocab.ActivityStreamsAdd) error { 416 *w.undeliverable = false 417 op := a.GetActivityStreamsObject() 418 if op == nil || op.Len() == 0 { 419 return ErrObjectRequired 420 } 421 target := a.GetActivityStreamsTarget() 422 if target == nil || target.Len() == 0 { 423 return ErrTargetRequired 424 } 425 if err := add(c, op, target, w.db); err != nil { 426 return err 427 } 428 if w.Add != nil { 429 return w.Add(c, a) 430 } 431 return nil 432 } 433 434 // remove implements the social Remove activity side effects. 435 func (w SocialWrappedCallbacks) remove(c context.Context, a vocab.ActivityStreamsRemove) error { 436 *w.undeliverable = false 437 op := a.GetActivityStreamsObject() 438 if op == nil || op.Len() == 0 { 439 return ErrObjectRequired 440 } 441 target := a.GetActivityStreamsTarget() 442 if target == nil || target.Len() == 0 { 443 return ErrTargetRequired 444 } 445 if err := remove(c, op, target, w.db); err != nil { 446 return err 447 } 448 if w.Remove != nil { 449 return w.Remove(c, a) 450 } 451 return nil 452 } 453 454 // like implements the social Like activity side effects. 455 func (w SocialWrappedCallbacks) like(c context.Context, a vocab.ActivityStreamsLike) error { 456 *w.undeliverable = false 457 op := a.GetActivityStreamsObject() 458 if op == nil || op.Len() == 0 { 459 return ErrObjectRequired 460 } 461 // Get this actor's IRI. 462 unlock, err := w.db.Lock(c, w.outboxIRI) 463 if err != nil { 464 return err 465 } 466 // WARNING: Unlock not deferred. 467 actorIRI, err := w.db.ActorForOutbox(c, w.outboxIRI) 468 unlock() // unlock even on error 469 if err != nil { 470 return err 471 } 472 // Unlock must be called by now and every branch above. 473 // 474 // Now obtain this actor's 'liked' collection. 475 unlock, err = w.db.Lock(c, actorIRI) 476 if err != nil { 477 return err 478 } 479 defer unlock() 480 liked, err := w.db.Liked(c, actorIRI) 481 if err != nil { 482 return err 483 } 484 likedItems := liked.GetActivityStreamsItems() 485 if likedItems == nil { 486 likedItems = streams.NewActivityStreamsItemsProperty() 487 liked.SetActivityStreamsItems(likedItems) 488 } 489 for iter := op.Begin(); iter != op.End(); iter = iter.Next() { 490 objId, err := ToId(iter) 491 if err != nil { 492 return err 493 } 494 likedItems.PrependIRI(objId) 495 } 496 err = w.db.Update(c, liked) 497 if err != nil { 498 return err 499 } 500 if w.Like != nil { 501 return w.Like(c, a) 502 } 503 return nil 504 } 505 506 // undo implements the social Undo activity side effects. 507 func (w SocialWrappedCallbacks) undo(c context.Context, a vocab.ActivityStreamsUndo) error { 508 *w.undeliverable = false 509 op := a.GetActivityStreamsObject() 510 if op == nil || op.Len() == 0 { 511 return ErrObjectRequired 512 } 513 actors := a.GetActivityStreamsActor() 514 if err := mustHaveActivityActorsMatchObjectActors(c, actors, op, w.newTransport, w.outboxIRI); err != nil { 515 return err 516 } 517 if w.Undo != nil { 518 return w.Undo(c, a) 519 } 520 return nil 521 } 522 523 // block implements the social Block activity side effects. 524 func (w SocialWrappedCallbacks) block(c context.Context, a vocab.ActivityStreamsBlock) error { 525 *w.undeliverable = true 526 op := a.GetActivityStreamsObject() 527 if op == nil || op.Len() == 0 { 528 return ErrObjectRequired 529 } 530 if w.Block != nil { 531 return w.Block(c, a) 532 } 533 return nil 534 }