workers.go (3151B)
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package workers

import (
	"context"
	"log"
	"runtime"

	"codeberg.org/gruf/go-runners"
	"codeberg.org/gruf/go-sched"
	"github.com/superseriousbusiness/gotosocial/internal/messages"
)

// Workers groups the task scheduler, the client API, federator and
// media worker pools, and the enqueue hooks for the client API and
// federator message types. Start and Stop operate on the scheduler
// and all three pools together.
//
// Pass a Workers around by pointer only: the blank nocopy field
// exists so that linters can flag accidental copies.
type Workers struct {
	// Main task scheduler instance.
	Scheduler sched.Scheduler

	// ClientAPI provides a worker pool that handles both
	// incoming client actions, and our own side-effects.
	ClientAPI runners.WorkerPool

	// Federator provides a worker pool that handles both
	// incoming federated actions, and our own side-effects.
	Federator runners.WorkerPool

	// Enqueue functions for the clientAPI / federator worker pools.
	// These point to the Processor{}.Enqueue___() msg functions;
	// holding them as function values here (rather than importing
	// Processor) prevents a dependency cycle, since Processor
	// itself depends on Workers.
	EnqueueClientAPI func(context.Context, ...messages.FromClientAPI)
	EnqueueFederator func(context.Context, ...messages.FromFederator)

	// Media is the worker pool used by the media manager.
	Media runners.WorkerPool

	// prevent pass-by-value.
	_ nocopy
}

// Start will start all of the contained worker pools (and global scheduler).
56 func (w *Workers) Start() { 57 // Get currently set GOMAXPROCS. 58 maxprocs := runtime.GOMAXPROCS(0) 59 60 tryUntil("starting scheduler", 5, func() bool { 61 return w.Scheduler.Start(nil) 62 }) 63 64 tryUntil("starting client API workerpool", 5, func() bool { 65 return w.ClientAPI.Start(4*maxprocs, 400*maxprocs) 66 }) 67 68 tryUntil("starting federator workerpool", 5, func() bool { 69 return w.Federator.Start(4*maxprocs, 400*maxprocs) 70 }) 71 72 tryUntil("starting media workerpool", 5, func() bool { 73 return w.Media.Start(8*maxprocs, 80*maxprocs) 74 }) 75 } 76 77 // Stop will stop all of the contained worker pools (and global scheduler). 78 func (w *Workers) Stop() { 79 tryUntil("stopping scheduler", 5, w.Scheduler.Stop) 80 tryUntil("stopping client API workerpool", 5, w.ClientAPI.Stop) 81 tryUntil("stopping federator workerpool", 5, w.Federator.Stop) 82 tryUntil("stopping media workerpool", 5, w.Media.Stop) 83 } 84 85 // nocopy when embedded will signal linter to 86 // error on pass-by-value of parent struct. 87 type nocopy struct{} 88 89 func (*nocopy) Lock() {} 90 91 func (*nocopy) Unlock() {} 92 93 // tryUntil will attempt to call 'do' for 'count' attempts, before panicking with 'msg'. 94 func tryUntil(msg string, count int, do func() bool) { 95 for i := 0; i < count; i++ { 96 if do() { 97 return 98 } 99 } 100 log.Panicf("failed %s after %d tries", msg, count) 101 }