manager.go (13356B)
1 // GoToSocial 2 // Copyright (C) GoToSocial Authors admin@gotosocial.org 3 // SPDX-License-Identifier: AGPL-3.0-or-later 4 // 5 // This program is free software: you can redistribute it and/or modify 6 // it under the terms of the GNU Affero General Public License as published by 7 // the Free Software Foundation, either version 3 of the License, or 8 // (at your option) any later version. 9 // 10 // This program is distributed in the hope that it will be useful, 11 // but WITHOUT ANY WARRANTY; without even the implied warranty of 12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 // GNU Affero General Public License for more details. 14 // 15 // You should have received a copy of the GNU Affero General Public License 16 // along with this program. If not, see <http://www.gnu.org/licenses/>. 17 18 package media 19 20 import ( 21 "context" 22 "errors" 23 "fmt" 24 "io" 25 "time" 26 27 "codeberg.org/gruf/go-iotools" 28 "codeberg.org/gruf/go-runners" 29 "codeberg.org/gruf/go-sched" 30 "codeberg.org/gruf/go-store/v2/storage" 31 "github.com/superseriousbusiness/gotosocial/internal/config" 32 "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" 33 "github.com/superseriousbusiness/gotosocial/internal/id" 34 "github.com/superseriousbusiness/gotosocial/internal/log" 35 "github.com/superseriousbusiness/gotosocial/internal/state" 36 "github.com/superseriousbusiness/gotosocial/internal/uris" 37 ) 38 39 var SupportedMIMETypes = []string{ 40 mimeImageJpeg, 41 mimeImageGif, 42 mimeImagePng, 43 mimeImageWebp, 44 mimeVideoMp4, 45 } 46 47 var SupportedEmojiMIMETypes = []string{ 48 mimeImageGif, 49 mimeImagePng, 50 } 51 52 type Manager struct { 53 state *state.State 54 } 55 56 // NewManager returns a media manager with the given db and underlying storage. 57 // 58 // A worker pool will also be initialized for the manager, to ensure that only 59 // a limited number of media will be processed in parallel. The numbers of workers 60 // is determined from the $GOMAXPROCS environment variable (usually no. CPU cores). 61 // See internal/concurrency.NewWorkerPool() documentation for further information. 62 func NewManager(state *state.State) *Manager { 63 m := &Manager{state: state} 64 scheduleCleanupJobs(m) 65 return m 66 } 67 68 // PreProcessMedia begins the process of decoding and storing the given data as an attachment. 69 // It will return a pointer to a ProcessingMedia struct upon which further actions can be performed, such as getting 70 // the finished media, thumbnail, attachment, etc. 71 // 72 // data should be a function that the media manager can call to return a reader containing the media data. 73 // 74 // accountID should be the account that the media belongs to. 75 // 76 // ai is optional and can be nil. Any additional information about the attachment provided will be put in the database. 77 // 78 // Note: unlike ProcessMedia, this will NOT queue the media to be asynchronously processed. 79 func (m *Manager) PreProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) { 80 id, err := id.NewRandomULID() 81 if err != nil { 82 return nil, err 83 } 84 85 avatar := false 86 header := false 87 cached := false 88 now := time.Now() 89 90 // populate initial fields on the media attachment -- some of these will be overwritten as we proceed 91 attachment := >smodel.MediaAttachment{ 92 ID: id, 93 CreatedAt: now, 94 UpdatedAt: now, 95 StatusID: "", 96 URL: "", // we don't know yet because it depends on the uncalled DataFunc 97 RemoteURL: "", 98 Type: gtsmodel.FileTypeUnknown, // we don't know yet because it depends on the uncalled DataFunc 99 FileMeta: gtsmodel.FileMeta{}, 100 AccountID: accountID, 101 Description: "", 102 ScheduledStatusID: "", 103 Blurhash: "", 104 Processing: gtsmodel.ProcessingStatusReceived, 105 File: gtsmodel.File{UpdatedAt: now}, 106 Thumbnail: gtsmodel.Thumbnail{UpdatedAt: now}, 107 Avatar: &avatar, 108 Header: &header, 109 Cached: &cached, 110 } 111 112 // check if we have additional info to add to the attachment, 113 // and overwrite some of the attachment fields if so 114 if ai != nil { 115 if ai.CreatedAt != nil { 116 attachment.CreatedAt = *ai.CreatedAt 117 } 118 119 if ai.StatusID != nil { 120 attachment.StatusID = *ai.StatusID 121 } 122 123 if ai.RemoteURL != nil { 124 attachment.RemoteURL = *ai.RemoteURL 125 } 126 127 if ai.Description != nil { 128 attachment.Description = *ai.Description 129 } 130 131 if ai.ScheduledStatusID != nil { 132 attachment.ScheduledStatusID = *ai.ScheduledStatusID 133 } 134 135 if ai.Blurhash != nil { 136 attachment.Blurhash = *ai.Blurhash 137 } 138 139 if ai.Avatar != nil { 140 attachment.Avatar = ai.Avatar 141 } 142 143 if ai.Header != nil { 144 attachment.Header = ai.Header 145 } 146 147 if ai.FocusX != nil { 148 attachment.FileMeta.Focus.X = *ai.FocusX 149 } 150 151 if ai.FocusY != nil { 152 attachment.FileMeta.Focus.Y = *ai.FocusY 153 } 154 } 155 156 processingMedia := &ProcessingMedia{ 157 media: attachment, 158 dataFn: data, 159 mgr: m, 160 } 161 162 return processingMedia, nil 163 } 164 165 // PreProcessMediaRecache refetches, reprocesses, and recaches an existing attachment that has been uncached via pruneRemote. 166 // 167 // Note: unlike ProcessMedia, this will NOT queue the media to be asychronously processed. 168 func (m *Manager) PreProcessMediaRecache(ctx context.Context, data DataFunc, attachmentID string) (*ProcessingMedia, error) { 169 // get the existing attachment from database. 170 attachment, err := m.state.DB.GetAttachmentByID(ctx, attachmentID) 171 if err != nil { 172 return nil, err 173 } 174 175 processingMedia := &ProcessingMedia{ 176 media: attachment, 177 dataFn: data, 178 recache: true, // indicate it's a recache 179 mgr: m, 180 } 181 182 return processingMedia, nil 183 } 184 185 // ProcessMedia will call PreProcessMedia, followed by queuing the media to be processing in the media worker queue. 186 func (m *Manager) ProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) { 187 // Create a new processing media object for this media request. 188 media, err := m.PreProcessMedia(ctx, data, accountID, ai) 189 if err != nil { 190 return nil, err 191 } 192 193 // Attempt to add this media processing item to the worker queue. 194 _ = m.state.Workers.Media.MustEnqueueCtx(ctx, media.Process) 195 196 return media, nil 197 } 198 199 // PreProcessEmoji begins the process of decoding and storing the given data as an emoji. 200 // It will return a pointer to a ProcessingEmoji struct upon which further actions can be performed, such as getting 201 // the finished media, thumbnail, attachment, etc. 202 // 203 // data should be a function that the media manager can call to return a reader containing the emoji data. 204 // 205 // shortcode should be the emoji shortcode without the ':'s around it. 206 // 207 // id is the database ID that should be used to store the emoji. 208 // 209 // uri is the ActivityPub URI/ID of the emoji. 210 // 211 // ai is optional and can be nil. Any additional information about the emoji provided will be put in the database. 212 // 213 // Note: unlike ProcessEmoji, this will NOT queue the emoji to be asynchronously processed. 214 func (m *Manager) PreProcessEmoji(ctx context.Context, data DataFunc, shortcode string, emojiID string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) { 215 instanceAccount, err := m.state.DB.GetInstanceAccount(ctx, "") 216 if err != nil { 217 return nil, fmt.Errorf("preProcessEmoji: error fetching this instance account from the db: %s", err) 218 } 219 220 var ( 221 newPathID string 222 emoji *gtsmodel.Emoji 223 now = time.Now() 224 ) 225 226 if refresh { 227 // Look for existing emoji by given ID. 228 emoji, err = m.state.DB.GetEmojiByID(ctx, emojiID) 229 if err != nil { 230 return nil, fmt.Errorf("preProcessEmoji: error fetching emoji to refresh from the db: %s", err) 231 } 232 233 // if this is a refresh, we will end up with new images 234 // stored for this emoji, so we can use an io.Closer callback 235 // to perform clean up of the old images from storage 236 originalData := data 237 originalImagePath := emoji.ImagePath 238 originalImageStaticPath := emoji.ImageStaticPath 239 data = func(ctx context.Context) (io.ReadCloser, int64, error) { 240 // Call original data func. 241 rc, sz, err := originalData(ctx) 242 if err != nil { 243 return nil, 0, err 244 } 245 246 // Wrap closer to cleanup old data. 247 c := iotools.CloserCallback(rc, func() { 248 if err := m.state.Storage.Delete(ctx, originalImagePath); err != nil && !errors.Is(err, storage.ErrNotFound) { 249 log.Errorf(ctx, "error removing old emoji %s@%s from storage: %v", emoji.Shortcode, emoji.Domain, err) 250 } 251 252 if err := m.state.Storage.Delete(ctx, originalImageStaticPath); err != nil && !errors.Is(err, storage.ErrNotFound) { 253 log.Errorf(ctx, "error removing old static emoji %s@%s from storage: %v", emoji.Shortcode, emoji.Domain, err) 254 } 255 }) 256 257 // Return newly wrapped readcloser and size. 258 return iotools.ReadCloser(rc, c), sz, nil 259 } 260 261 newPathID, err = id.NewRandomULID() 262 if err != nil { 263 return nil, fmt.Errorf("preProcessEmoji: error generating alternateID for emoji refresh: %s", err) 264 } 265 266 // store + serve static image at new path ID 267 emoji.ImageStaticURL = uris.GenerateURIForAttachment(instanceAccount.ID, string(TypeEmoji), string(SizeStatic), newPathID, mimePng) 268 emoji.ImageStaticPath = fmt.Sprintf("%s/%s/%s/%s.%s", instanceAccount.ID, TypeEmoji, SizeStatic, newPathID, mimePng) 269 270 emoji.Shortcode = shortcode 271 emoji.URI = uri 272 } else { 273 disabled := false 274 visibleInPicker := true 275 276 // populate initial fields on the emoji -- some of these will be overwritten as we proceed 277 emoji = >smodel.Emoji{ 278 ID: emojiID, 279 CreatedAt: now, 280 Shortcode: shortcode, 281 Domain: "", // assume our own domain unless told otherwise 282 ImageRemoteURL: "", 283 ImageStaticRemoteURL: "", 284 ImageURL: "", // we don't know yet 285 ImageStaticURL: uris.GenerateURIForAttachment(instanceAccount.ID, string(TypeEmoji), string(SizeStatic), emojiID, mimePng), // all static emojis are encoded as png 286 ImagePath: "", // we don't know yet 287 ImageStaticPath: fmt.Sprintf("%s/%s/%s/%s.%s", instanceAccount.ID, TypeEmoji, SizeStatic, emojiID, mimePng), // all static emojis are encoded as png 288 ImageContentType: "", // we don't know yet 289 ImageStaticContentType: mimeImagePng, // all static emojis are encoded as png 290 ImageFileSize: 0, 291 ImageStaticFileSize: 0, 292 Disabled: &disabled, 293 URI: uri, 294 VisibleInPicker: &visibleInPicker, 295 CategoryID: "", 296 } 297 } 298 299 emoji.ImageUpdatedAt = now 300 emoji.UpdatedAt = now 301 302 // check if we have additional info to add to the emoji, 303 // and overwrite some of the emoji fields if so 304 if ai != nil { 305 if ai.CreatedAt != nil { 306 emoji.CreatedAt = *ai.CreatedAt 307 } 308 309 if ai.Domain != nil { 310 emoji.Domain = *ai.Domain 311 } 312 313 if ai.ImageRemoteURL != nil { 314 emoji.ImageRemoteURL = *ai.ImageRemoteURL 315 } 316 317 if ai.ImageStaticRemoteURL != nil { 318 emoji.ImageStaticRemoteURL = *ai.ImageStaticRemoteURL 319 } 320 321 if ai.Disabled != nil { 322 emoji.Disabled = ai.Disabled 323 } 324 325 if ai.VisibleInPicker != nil { 326 emoji.VisibleInPicker = ai.VisibleInPicker 327 } 328 329 if ai.CategoryID != nil { 330 emoji.CategoryID = *ai.CategoryID 331 } 332 } 333 334 processingEmoji := &ProcessingEmoji{ 335 instAccID: instanceAccount.ID, 336 emoji: emoji, 337 refresh: refresh, 338 newPathID: newPathID, 339 dataFn: data, 340 mgr: m, 341 } 342 343 return processingEmoji, nil 344 } 345 346 // ProcessEmoji will call PreProcessEmoji, followed by queuing the emoji to be processing in the emoji worker queue. 347 func (m *Manager) ProcessEmoji(ctx context.Context, data DataFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) { 348 // Create a new processing emoji object for this emoji request. 349 emoji, err := m.PreProcessEmoji(ctx, data, shortcode, id, uri, ai, refresh) 350 if err != nil { 351 return nil, err 352 } 353 354 // Attempt to add this emoji processing item to the worker queue. 355 _ = m.state.Workers.Media.MustEnqueueCtx(ctx, emoji.Process) 356 357 return emoji, nil 358 } 359 360 func scheduleCleanupJobs(m *Manager) { 361 const day = time.Hour * 24 362 363 // Calculate closest midnight. 364 now := time.Now() 365 midnight := now.Round(day) 366 367 if midnight.Before(now) { 368 // since <= 11:59am rounds down. 369 midnight = midnight.Add(day) 370 } 371 372 // Get ctx associated with scheduler run state. 373 done := m.state.Workers.Scheduler.Done() 374 doneCtx := runners.CancelCtx(done) 375 376 // TODO: we'll need to do some thinking to make these 377 // jobs restartable if we want to implement reloads in 378 // the future that make call to Workers.Stop() -> Workers.Start(). 379 380 // Schedule the PruneAll task to execute every day at midnight. 381 m.state.Workers.Scheduler.Schedule(sched.NewJob(func(now time.Time) { 382 err := m.PruneAll(doneCtx, config.GetMediaRemoteCacheDays(), true) 383 if err != nil { 384 log.Errorf(nil, "error during prune: %v", err) 385 } 386 log.Infof(nil, "finished pruning all in %s", time.Since(now)) 387 }).EveryAt(midnight, day)) 388 }