processor.go (5671B)
1 // GoToSocial 2 // Copyright (C) GoToSocial Authors admin@gotosocial.org 3 // SPDX-License-Identifier: AGPL-3.0-or-later 4 // 5 // This program is free software: you can redistribute it and/or modify 6 // it under the terms of the GNU Affero General Public License as published by 7 // the Free Software Foundation, either version 3 of the License, or 8 // (at your option) any later version. 9 // 10 // This program is distributed in the hope that it will be useful, 11 // but WITHOUT ANY WARRANTY; without even the implied warranty of 12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 // GNU Affero General Public License for more details. 14 // 15 // You should have received a copy of the GNU Affero General Public License 16 // along with this program. If not, see <http://www.gnu.org/licenses/>. 17 18 package processing 19 20 import ( 21 "context" 22 23 "github.com/superseriousbusiness/gotosocial/internal/email" 24 "github.com/superseriousbusiness/gotosocial/internal/federation" 25 "github.com/superseriousbusiness/gotosocial/internal/log" 26 mm "github.com/superseriousbusiness/gotosocial/internal/media" 27 "github.com/superseriousbusiness/gotosocial/internal/messages" 28 "github.com/superseriousbusiness/gotosocial/internal/oauth" 29 "github.com/superseriousbusiness/gotosocial/internal/processing/account" 30 "github.com/superseriousbusiness/gotosocial/internal/processing/admin" 31 "github.com/superseriousbusiness/gotosocial/internal/processing/fedi" 32 "github.com/superseriousbusiness/gotosocial/internal/processing/list" 33 "github.com/superseriousbusiness/gotosocial/internal/processing/media" 34 "github.com/superseriousbusiness/gotosocial/internal/processing/report" 35 "github.com/superseriousbusiness/gotosocial/internal/processing/search" 36 "github.com/superseriousbusiness/gotosocial/internal/processing/status" 37 "github.com/superseriousbusiness/gotosocial/internal/processing/stream" 38 "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" 39 "github.com/superseriousbusiness/gotosocial/internal/processing/user" 40 "github.com/superseriousbusiness/gotosocial/internal/state" 41 "github.com/superseriousbusiness/gotosocial/internal/typeutils" 42 "github.com/superseriousbusiness/gotosocial/internal/visibility" 43 ) 44 45 type Processor struct { 46 federator federation.Federator 47 tc typeutils.TypeConverter 48 oauthServer oauth.Server 49 mediaManager *mm.Manager 50 state *state.State 51 emailSender email.Sender 52 filter *visibility.Filter 53 54 /* 55 SUB-PROCESSORS 56 */ 57 58 account account.Processor 59 admin admin.Processor 60 fedi fedi.Processor 61 list list.Processor 62 media media.Processor 63 report report.Processor 64 search search.Processor 65 status status.Processor 66 stream stream.Processor 67 timeline timeline.Processor 68 user user.Processor 69 } 70 71 func (p *Processor) Account() *account.Processor { 72 return &p.account 73 } 74 75 func (p *Processor) Admin() *admin.Processor { 76 return &p.admin 77 } 78 79 func (p *Processor) Fedi() *fedi.Processor { 80 return &p.fedi 81 } 82 83 func (p *Processor) List() *list.Processor { 84 return &p.list 85 } 86 87 func (p *Processor) Media() *media.Processor { 88 return &p.media 89 } 90 91 func (p *Processor) Report() *report.Processor { 92 return &p.report 93 } 94 95 func (p *Processor) Search() *search.Processor { 96 return &p.search 97 } 98 99 func (p *Processor) Status() *status.Processor { 100 return &p.status 101 } 102 103 func (p *Processor) Stream() *stream.Processor { 104 return &p.stream 105 } 106 107 func (p *Processor) Timeline() *timeline.Processor { 108 return &p.timeline 109 } 110 111 func (p *Processor) User() *user.Processor { 112 return &p.user 113 } 114 115 // NewProcessor returns a new Processor. 116 func NewProcessor( 117 tc typeutils.TypeConverter, 118 federator federation.Federator, 119 oauthServer oauth.Server, 120 mediaManager *mm.Manager, 121 state *state.State, 122 emailSender email.Sender, 123 ) *Processor { 124 parseMentionFunc := GetParseMentionFunc(state.DB, federator) 125 126 filter := visibility.NewFilter(state) 127 128 processor := &Processor{ 129 federator: federator, 130 tc: tc, 131 oauthServer: oauthServer, 132 mediaManager: mediaManager, 133 state: state, 134 filter: filter, 135 emailSender: emailSender, 136 } 137 138 // Instantiate sub processors. 139 processor.account = account.New(state, tc, mediaManager, oauthServer, federator, filter, parseMentionFunc) 140 processor.admin = admin.New(state, tc, mediaManager, federator.TransportController(), emailSender) 141 processor.fedi = fedi.New(state, tc, federator, filter) 142 processor.list = list.New(state, tc) 143 processor.media = media.New(state, tc, mediaManager, federator.TransportController()) 144 processor.report = report.New(state, tc) 145 processor.timeline = timeline.New(state, tc, filter) 146 processor.search = search.New(state, federator, tc, filter) 147 processor.status = status.New(state, federator, tc, filter, parseMentionFunc) 148 processor.stream = stream.New(state, oauthServer) 149 processor.user = user.New(state, emailSender) 150 151 return processor 152 } 153 154 func (p *Processor) EnqueueClientAPI(ctx context.Context, msgs ...messages.FromClientAPI) { 155 log.Trace(ctx, "enqueuing") 156 _ = p.state.Workers.ClientAPI.MustEnqueueCtx(ctx, func(ctx context.Context) { 157 for _, msg := range msgs { 158 log.Trace(ctx, "processing: %+v", msg) 159 if err := p.ProcessFromClientAPI(ctx, msg); err != nil { 160 log.Errorf(ctx, "error processing client API message: %v", err) 161 } 162 } 163 }) 164 } 165 166 func (p *Processor) EnqueueFederator(ctx context.Context, msgs ...messages.FromFederator) { 167 log.Trace(ctx, "enqueuing") 168 _ = p.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { 169 for _, msg := range msgs { 170 log.Trace(ctx, "processing: %+v", msg) 171 if err := p.ProcessFromFederator(ctx, msg); err != nil { 172 log.Errorf(ctx, "error processing federator message: %v", err) 173 } 174 } 175 }) 176 }