commit 969c194fcd8ea277a49244a6bf00ed92f4df4e09
parent c951ba1d238f63925decab8f9294e52239887d0f
Author: tobi <31960611+tsmethurst@users.noreply.github.com>
Date: Sat, 27 Aug 2022 12:00:19 +0200
[bugfix] Relax outgoing http request queue (#760)
* add request queue trace logging
* fix misleading wording
* implement request slots per host per method
* undo formatting change (?)
* remove gratuitous trace logging
* rename done -> release
avoids confusion with ctx.Done
Diffstat:
3 files changed, 187 insertions(+), 6 deletions(-)
diff --git a/internal/httpclient/client.go b/internal/httpclient/client.go
@@ -80,7 +80,7 @@ type Config struct {
// is available (context channels still respected)
type Client struct {
client http.Client
- queue chan struct{}
+ rc *requestQueue
bmax int64
}
@@ -118,7 +118,9 @@ func New(cfg Config) *Client {
// Prepare client fields
c.bmax = cfg.MaxBodySize
- c.queue = make(chan struct{}, cfg.MaxOpenConns)
+ c.rc = &requestQueue{
+ maxOpenConns: cfg.MaxOpenConns,
+ }
c.client.Timeout = cfg.Timeout
// Set underlying HTTP client roundtripper
@@ -143,13 +145,18 @@ func New(cfg Config) *Client {
// as the standard http.Client{}.Do() implementation except that response body will
// be wrapped by an io.LimitReader() to limit response body sizes.
func (c *Client) Do(req *http.Request) (*http.Response, error) {
+ // request a spot in the wait queue...
+ wait, release := c.rc.getWaitSpot(req.Host, req.Method)
+
+ // ... and wait our turn
select {
- // Request context cancelled
case <-req.Context().Done():
+ // the request was canceled before we
+ // got to our turn: no need to release
return nil, req.Context().Err()
+ case wait <- struct{}{}:
+ // it's our turn!
- // Slot in queue acquired
- case c.queue <- struct{}{}:
// NOTE:
// Ideally here we would set the slot release to happen either
// on error return, or via callback from the response body closer.
@@ -160,7 +167,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
// that connections may not be closed until response body is closed.
// The current implementation will reduce the viability of denial of
// service attacks, but if there are future issues heed this advice :]
- defer func() { <-c.queue }()
+ defer release()
}
// Firstly, ensure this is a valid request
diff --git a/internal/httpclient/queue.go b/internal/httpclient/queue.go
@@ -0,0 +1,68 @@
+/*
+ GoToSocial
+ Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package httpclient
+
+import (
+ "strings"
+ "sync"
+
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+)
+
+type requestQueue struct {
+ hostQueues sync.Map // map of `hostQueue`
+ maxOpenConns int // max open conns per host per request method
+}
+
+type hostQueue struct {
+ slotsByMethod sync.Map
+}
+
+// getWaitSpot returns a wait channel and release function for http clients
+// that want to do requests politely: that is, wait for their turn.
+//
+// To wait, a caller should do a select on an attempted insert into the
+// returned wait channel. Once the insert succeeds, then the caller should
+// proceed with the http request that pertains to the given host + method.
+// It doesn't matter what's put into the wait channel, just any interface{}.
+//
+// When the caller is finished with their http request, they should free up the
+// slot they were occupying in the wait queue, by calling the release function.
+//
+// The reason for the caller needing to provide host and method, is that each
+// remote host has a separate wait queue, and there's a separate wait queue
+// per method for that host as well. This ensures that outgoing requests can still
+// proceed for others hosts and methods while other requests are undergoing,
+// while also preventing one host from being spammed with, for example, a
+// shitload of GET requests all at once.
+func (rc *requestQueue) getWaitSpot(host string, method string) (wait chan<- interface{}, release func()) {
+ hostQueueI, _ := rc.hostQueues.LoadOrStore(host, new(hostQueue))
+ hostQueue, ok := hostQueueI.(*hostQueue)
+ if !ok {
+ log.Panic("hostQueueI was not a *hostQueue")
+ }
+
+ waitSlotI, _ := hostQueue.slotsByMethod.LoadOrStore(strings.ToUpper(method), make(chan interface{}, rc.maxOpenConns))
+ methodQueue, ok := waitSlotI.(chan interface{})
+ if !ok {
+ log.Panic("waitSlotI was not a chan interface{}")
+ }
+
+ return methodQueue, func() { <-methodQueue }
+}
diff --git a/internal/httpclient/queue_test.go b/internal/httpclient/queue_test.go
@@ -0,0 +1,106 @@
+/*
+ GoToSocial
+ Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package httpclient
+
+import (
+ "net/http"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/suite"
+)
+
+type QueueTestSuite struct {
+ suite.Suite
+}
+
+func (suite *QueueTestSuite) TestQueue() {
+ maxOpenConns := 5
+ waitTimeout := 1 * time.Second
+
+ rc := &requestQueue{
+ maxOpenConns: maxOpenConns,
+ }
+
+ // fill all the open connections
+ var release func()
+ for i, n := range make([]interface{}, maxOpenConns) {
+ w, r := rc.getWaitSpot("example.org", http.MethodPost)
+ w <- n
+ if i == maxOpenConns-1 {
+ // save the last release function
+ release = r
+ }
+ }
+
+ // try to wait again for the same host/method combo, it should timeout
+ waitAgain, _ := rc.getWaitSpot("example.org", "post")
+
+ select {
+ case waitAgain <- struct{}{}:
+ suite.FailNow("first wait did not time out")
+ case <-time.After(waitTimeout):
+ break
+ }
+
+ // now close the final release that we derived earlier
+ release()
+
+ // try waiting again, it should work this time
+ select {
+ case waitAgain <- struct{}{}:
+ break
+ case <-time.After(waitTimeout):
+ suite.FailNow("second wait timed out")
+ }
+
+ // the POST queue is now sitting on full
+ suite.Len(waitAgain, maxOpenConns)
+
+ // we should still be able to make a GET for the same host though
+ getWait, getRelease := rc.getWaitSpot("example.org", http.MethodGet)
+ select {
+ case getWait <- struct{}{}:
+ break
+ case <-time.After(waitTimeout):
+ suite.FailNow("get wait timed out")
+ }
+
+ // the GET queue has one request waiting
+ suite.Len(getWait, 1)
+ // clear it...
+ getRelease()
+ suite.Empty(getWait)
+
+ // even though the POST queue for example.org is full, we
+ // should still be able to make a POST request to another host :)
+ waitForAnotherHost, _ := rc.getWaitSpot("somewhere.else", http.MethodPost)
+ select {
+ case waitForAnotherHost <- struct{}{}:
+ break
+ case <-time.After(waitTimeout):
+ suite.FailNow("get wait timed out")
+ }
+
+ suite.Len(waitForAnotherHost, 1)
+}
+
+func TestQueueTestSuite(t *testing.T) {
+ suite.Run(t, &QueueTestSuite{})
+}