streaming_test.go (7402B)
1 // GoToSocial 2 // Copyright (C) GoToSocial Authors admin@gotosocial.org 3 // SPDX-License-Identifier: AGPL-3.0-or-later 4 // 5 // This program is free software: you can redistribute it and/or modify 6 // it under the terms of the GNU Affero General Public License as published by 7 // the Free Software Foundation, either version 3 of the License, or 8 // (at your option) any later version. 9 // 10 // This program is distributed in the hope that it will be useful, 11 // but WITHOUT ANY WARRANTY; without even the implied warranty of 12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 // GNU Affero General Public License for more details. 14 // 15 // You should have received a copy of the GNU Affero General Public License 16 // along with this program. If not, see <http://www.gnu.org/licenses/>. 17 18 package streaming_test 19 20 import ( 21 "bufio" 22 "errors" 23 "fmt" 24 "io/ioutil" 25 "net" 26 "net/http" 27 "net/http/httptest" 28 "testing" 29 "time" 30 31 "github.com/gin-gonic/gin" 32 "github.com/stretchr/testify/suite" 33 "github.com/superseriousbusiness/gotosocial/internal/api/client/streaming" 34 "github.com/superseriousbusiness/gotosocial/internal/db" 35 "github.com/superseriousbusiness/gotosocial/internal/email" 36 "github.com/superseriousbusiness/gotosocial/internal/federation" 37 "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" 38 "github.com/superseriousbusiness/gotosocial/internal/media" 39 "github.com/superseriousbusiness/gotosocial/internal/oauth" 40 "github.com/superseriousbusiness/gotosocial/internal/processing" 41 "github.com/superseriousbusiness/gotosocial/internal/state" 42 "github.com/superseriousbusiness/gotosocial/internal/storage" 43 "github.com/superseriousbusiness/gotosocial/internal/typeutils" 44 "github.com/superseriousbusiness/gotosocial/internal/visibility" 45 "github.com/superseriousbusiness/gotosocial/testrig" 46 ) 47 48 type StreamingTestSuite struct { 49 // standard suite interfaces 50 suite.Suite 51 db db.DB 52 tc typeutils.TypeConverter 53 mediaManager *media.Manager 54 federator federation.Federator 55 emailSender email.Sender 56 processor *processing.Processor 57 storage *storage.Driver 58 state state.State 59 60 // standard suite models 61 testTokens map[string]*gtsmodel.Token 62 testClients map[string]*gtsmodel.Client 63 testApplications map[string]*gtsmodel.Application 64 testUsers map[string]*gtsmodel.User 65 testAccounts map[string]*gtsmodel.Account 66 testAttachments map[string]*gtsmodel.MediaAttachment 67 testStatuses map[string]*gtsmodel.Status 68 testFollows map[string]*gtsmodel.Follow 69 70 // module being tested 71 streamingModule *streaming.Module 72 } 73 74 func (suite *StreamingTestSuite) SetupSuite() { 75 suite.testTokens = testrig.NewTestTokens() 76 suite.testClients = testrig.NewTestClients() 77 suite.testApplications = testrig.NewTestApplications() 78 suite.testUsers = testrig.NewTestUsers() 79 suite.testAccounts = testrig.NewTestAccounts() 80 suite.testAttachments = testrig.NewTestAttachments() 81 suite.testStatuses = testrig.NewTestStatuses() 82 suite.testFollows = testrig.NewTestFollows() 83 } 84 85 func (suite *StreamingTestSuite) SetupTest() { 86 suite.state.Caches.Init() 87 testrig.StartWorkers(&suite.state) 88 89 testrig.InitTestConfig() 90 testrig.InitTestLog() 91 92 suite.db = testrig.NewTestDB(&suite.state) 93 suite.state.DB = suite.db 94 suite.storage = testrig.NewInMemoryStorage() 95 suite.state.Storage = suite.storage 96 97 suite.tc = testrig.NewTestTypeConverter(suite.db) 98 99 testrig.StartTimelines( 100 &suite.state, 101 visibility.NewFilter(&suite.state), 102 suite.tc, 103 ) 104 105 testrig.StandardDBSetup(suite.db, nil) 106 testrig.StandardStorageSetup(suite.storage, "../../../../testrig/media") 107 108 suite.mediaManager = testrig.NewTestMediaManager(&suite.state) 109 suite.federator = testrig.NewTestFederator(&suite.state, testrig.NewTestTransportController(&suite.state, testrig.NewMockHTTPClient(nil, "../../../../testrig/media")), suite.mediaManager) 110 suite.emailSender = testrig.NewEmailSender("../../../../web/template/", nil) 111 suite.processor = testrig.NewTestProcessor(&suite.state, suite.federator, suite.emailSender, suite.mediaManager) 112 suite.streamingModule = streaming.New(suite.processor, 1, 4096) 113 } 114 115 func (suite *StreamingTestSuite) TearDownTest() { 116 testrig.StandardDBTeardown(suite.db) 117 testrig.StandardStorageTeardown(suite.storage) 118 testrig.StopWorkers(&suite.state) 119 } 120 121 // Addr is a fake network interface which implements the net.Addr interface 122 type Addr struct { 123 NetworkString string 124 AddrString string 125 } 126 127 func (a Addr) Network() string { 128 return a.NetworkString 129 } 130 131 func (a Addr) String() string { 132 return a.AddrString 133 } 134 135 type connTester struct { 136 deadline time.Time 137 writes int 138 } 139 140 func (c *connTester) Read(b []byte) (n int, err error) { 141 return 0, nil 142 } 143 144 func (c *connTester) SetDeadline(t time.Time) error { 145 c.deadline = t 146 return nil 147 } 148 149 func (c *connTester) SetReadDeadline(t time.Time) error { 150 return nil 151 } 152 153 func (c *connTester) SetWriteDeadline(t time.Time) error { 154 return nil 155 } 156 157 func (c *connTester) Write(p []byte) (int, error) { 158 c.writes++ 159 if c.writes > 1 { 160 return 0, errors.New("timeout") 161 } 162 return 0, nil 163 } 164 165 func (c *connTester) Close() error { 166 return nil 167 } 168 169 func (c *connTester) LocalAddr() net.Addr { 170 return Addr{ 171 NetworkString: "tcp", 172 AddrString: "127.0.0.1", 173 } 174 } 175 176 func (c *connTester) RemoteAddr() net.Addr { 177 return Addr{ 178 NetworkString: "tcp", 179 AddrString: "127.0.0.1", 180 } 181 } 182 183 type TestResponseRecorder struct { 184 *httptest.ResponseRecorder 185 w gin.ResponseWriter 186 closeChannel chan bool 187 } 188 189 func (r *TestResponseRecorder) CloseNotify() <-chan bool { 190 return r.closeChannel 191 } 192 193 func (r *TestResponseRecorder) closeClient() { 194 r.closeChannel <- true 195 } 196 197 func (r *TestResponseRecorder) Hijack() (net.Conn, *bufio.ReadWriter, error) { 198 conn := &connTester{ 199 writes: 0, 200 } 201 brw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) 202 return conn, brw, nil 203 } 204 205 func CreateTestResponseRecorder() *TestResponseRecorder { 206 w := new(gin.ResponseWriter) 207 return &TestResponseRecorder{ 208 httptest.NewRecorder(), 209 *w, 210 make(chan bool, 1), 211 } 212 } 213 214 func (suite *StreamingTestSuite) TestSecurityHeader() { 215 // set up the context for the request 216 t := suite.testTokens["local_account_1"] 217 oauthToken := oauth.DBTokenToToken(t) 218 recorder := CreateTestResponseRecorder() 219 ctx, _ := testrig.CreateGinTestContext(recorder, nil) 220 ctx.Set(oauth.SessionAuthorizedApplication, suite.testApplications["application_1"]) 221 ctx.Set(oauth.SessionAuthorizedToken, oauthToken) 222 ctx.Set(oauth.SessionAuthorizedUser, suite.testUsers["local_account_1"]) 223 ctx.Set(oauth.SessionAuthorizedAccount, suite.testAccounts["local_account_1"]) 224 ctx.Request = httptest.NewRequest(http.MethodGet, fmt.Sprintf("http://localhost:8080/%s?stream=user", streaming.BasePath), nil) // the endpoint we're hitting 225 ctx.Request.Header.Set("accept", "application/json") 226 ctx.Request.Header.Set(streaming.AccessTokenHeader, oauthToken.Access) 227 ctx.Request.Header.Set("Connection", "upgrade") 228 ctx.Request.Header.Set("Upgrade", "websocket") 229 ctx.Request.Header.Set("Sec-Websocket-Version", "13") 230 ctx.Request.Header.Set("Sec-Websocket-Key", "abcd") 231 232 suite.streamingModule.StreamGETHandler(ctx) 233 234 // check response 235 suite.EqualValues(http.StatusOK, recorder.Code) 236 237 result := recorder.Result() 238 defer result.Body.Close() 239 _, err := ioutil.ReadAll(result.Body) 240 suite.NoError(err) 241 } 242 243 func TestStreamingTestSuite(t *testing.T) { 244 suite.Run(t, new(StreamingTestSuite)) 245 }