gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

deliver.go (3517B)


      1 // GoToSocial
      2 // Copyright (C) GoToSocial Authors admin@gotosocial.org
      3 // SPDX-License-Identifier: AGPL-3.0-or-later
      4 //
      5 // This program is free software: you can redistribute it and/or modify
      6 // it under the terms of the GNU Affero General Public License as published by
      7 // the Free Software Foundation, either version 3 of the License, or
      8 // (at your option) any later version.
      9 //
     10 // This program is distributed in the hope that it will be useful,
     11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
     12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     13 // GNU Affero General Public License for more details.
     14 //
     15 // You should have received a copy of the GNU Affero General Public License
     16 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17 
     18 package transport
     19 
     20 import (
     21 	"context"
     22 	"net/http"
     23 	"net/url"
     24 	"sync"
     25 
     26 	"codeberg.org/gruf/go-byteutil"
     27 	apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
     28 	"github.com/superseriousbusiness/gotosocial/internal/config"
     29 	"github.com/superseriousbusiness/gotosocial/internal/gtserror"
     30 )
     31 
     32 func (t *transport) BatchDeliver(ctx context.Context, b []byte, recipients []*url.URL) error {
     33 	var (
     34 		// errs accumulates errors received during
     35 		// attempted delivery by deliverer routines.
     36 		errs gtserror.MultiError
     37 
     38 		// wait blocks until all sender
     39 		// routines have returned.
     40 		wait sync.WaitGroup
     41 
     42 		// mutex protects 'recipients' and
     43 		// 'errs' for concurrent access.
     44 		mutex sync.Mutex
     45 
     46 		// Get current instance host info.
     47 		domain = config.GetAccountDomain()
     48 		host   = config.GetHost()
     49 	)
     50 
     51 	// Block on expect no. senders.
     52 	wait.Add(t.controller.senders)
     53 
     54 	for i := 0; i < t.controller.senders; i++ {
     55 		go func() {
     56 			// Mark returned.
     57 			defer wait.Done()
     58 
     59 			for {
     60 				// Acquire lock.
     61 				mutex.Lock()
     62 
     63 				if len(recipients) == 0 {
     64 					// Reached end.
     65 					mutex.Unlock()
     66 					return
     67 				}
     68 
     69 				// Pop next recipient.
     70 				i := len(recipients) - 1
     71 				to := recipients[i]
     72 				recipients = recipients[:i]
     73 
     74 				// Done with lock.
     75 				mutex.Unlock()
     76 
     77 				// Skip delivery to recipient if it is "us".
     78 				if to.Host == host || to.Host == domain {
     79 					continue
     80 				}
     81 
     82 				// Attempt to deliver data to recipient.
     83 				if err := t.deliver(ctx, b, to); err != nil {
     84 					mutex.Lock() // safely append err to accumulator.
     85 					errs.Appendf("error delivering to %s: %v", to, err)
     86 					mutex.Unlock()
     87 				}
     88 			}
     89 		}()
     90 	}
     91 
     92 	// Wait for finish.
     93 	wait.Wait()
     94 
     95 	// Return combined err.
     96 	return errs.Combine()
     97 }
     98 
     99 func (t *transport) Deliver(ctx context.Context, b []byte, to *url.URL) error {
    100 	// if 'to' host is our own, skip as we don't need to deliver to ourselves...
    101 	if to.Host == config.GetHost() || to.Host == config.GetAccountDomain() {
    102 		return nil
    103 	}
    104 
    105 	// Deliver data to recipient.
    106 	return t.deliver(ctx, b, to)
    107 }
    108 
    109 func (t *transport) deliver(ctx context.Context, b []byte, to *url.URL) error {
    110 	url := to.String()
    111 
    112 	// Use rewindable bytes reader for body.
    113 	var body byteutil.ReadNopCloser
    114 	body.Reset(b)
    115 
    116 	req, err := http.NewRequestWithContext(ctx, "POST", url, &body)
    117 	if err != nil {
    118 		return err
    119 	}
    120 
    121 	req.Header.Add("Content-Type", string(apiutil.AppActivityLDJSON))
    122 	req.Header.Add("Accept-Charset", "utf-8")
    123 	req.Header.Set("Host", to.Host)
    124 
    125 	rsp, err := t.POST(req, b)
    126 	if err != nil {
    127 		return err
    128 	}
    129 	defer rsp.Body.Close()
    130 
    131 	if code := rsp.StatusCode; code != http.StatusOK &&
    132 		code != http.StatusCreated && code != http.StatusAccepted {
    133 		return gtserror.NewFromResponse(rsp)
    134 	}
    135 
    136 	return nil
    137 }