deliver.go (3517B)
1 // GoToSocial 2 // Copyright (C) GoToSocial Authors admin@gotosocial.org 3 // SPDX-License-Identifier: AGPL-3.0-or-later 4 // 5 // This program is free software: you can redistribute it and/or modify 6 // it under the terms of the GNU Affero General Public License as published by 7 // the Free Software Foundation, either version 3 of the License, or 8 // (at your option) any later version. 9 // 10 // This program is distributed in the hope that it will be useful, 11 // but WITHOUT ANY WARRANTY; without even the implied warranty of 12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 // GNU Affero General Public License for more details. 14 // 15 // You should have received a copy of the GNU Affero General Public License 16 // along with this program. If not, see <http://www.gnu.org/licenses/>. 17 18 package transport 19 20 import ( 21 "context" 22 "net/http" 23 "net/url" 24 "sync" 25 26 "codeberg.org/gruf/go-byteutil" 27 apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" 28 "github.com/superseriousbusiness/gotosocial/internal/config" 29 "github.com/superseriousbusiness/gotosocial/internal/gtserror" 30 ) 31 32 func (t *transport) BatchDeliver(ctx context.Context, b []byte, recipients []*url.URL) error { 33 var ( 34 // errs accumulates errors received during 35 // attempted delivery by deliverer routines. 36 errs gtserror.MultiError 37 38 // wait blocks until all sender 39 // routines have returned. 40 wait sync.WaitGroup 41 42 // mutex protects 'recipients' and 43 // 'errs' for concurrent access. 44 mutex sync.Mutex 45 46 // Get current instance host info. 47 domain = config.GetAccountDomain() 48 host = config.GetHost() 49 ) 50 51 // Block on expect no. senders. 52 wait.Add(t.controller.senders) 53 54 for i := 0; i < t.controller.senders; i++ { 55 go func() { 56 // Mark returned. 57 defer wait.Done() 58 59 for { 60 // Acquire lock. 61 mutex.Lock() 62 63 if len(recipients) == 0 { 64 // Reached end. 65 mutex.Unlock() 66 return 67 } 68 69 // Pop next recipient. 70 i := len(recipients) - 1 71 to := recipients[i] 72 recipients = recipients[:i] 73 74 // Done with lock. 75 mutex.Unlock() 76 77 // Skip delivery to recipient if it is "us". 78 if to.Host == host || to.Host == domain { 79 continue 80 } 81 82 // Attempt to deliver data to recipient. 83 if err := t.deliver(ctx, b, to); err != nil { 84 mutex.Lock() // safely append err to accumulator. 85 errs.Appendf("error delivering to %s: %v", to, err) 86 mutex.Unlock() 87 } 88 } 89 }() 90 } 91 92 // Wait for finish. 93 wait.Wait() 94 95 // Return combined err. 96 return errs.Combine() 97 } 98 99 func (t *transport) Deliver(ctx context.Context, b []byte, to *url.URL) error { 100 // if 'to' host is our own, skip as we don't need to deliver to ourselves... 101 if to.Host == config.GetHost() || to.Host == config.GetAccountDomain() { 102 return nil 103 } 104 105 // Deliver data to recipient. 106 return t.deliver(ctx, b, to) 107 } 108 109 func (t *transport) deliver(ctx context.Context, b []byte, to *url.URL) error { 110 url := to.String() 111 112 // Use rewindable bytes reader for body. 113 var body byteutil.ReadNopCloser 114 body.Reset(b) 115 116 req, err := http.NewRequestWithContext(ctx, "POST", url, &body) 117 if err != nil { 118 return err 119 } 120 121 req.Header.Add("Content-Type", string(apiutil.AppActivityLDJSON)) 122 req.Header.Add("Accept-Charset", "utf-8") 123 req.Header.Set("Host", to.Host) 124 125 rsp, err := t.POST(req, b) 126 if err != nil { 127 return err 128 } 129 defer rsp.Body.Close() 130 131 if code := rsp.StatusCode; code != http.StatusOK && 132 code != http.StatusCreated && code != http.StatusAccepted { 133 return gtserror.NewFromResponse(rsp) 134 } 135 136 return nil 137 }