processingmedia.go (11185B)
1 // GoToSocial 2 // Copyright (C) GoToSocial Authors admin@gotosocial.org 3 // SPDX-License-Identifier: AGPL-3.0-or-later 4 // 5 // This program is free software: you can redistribute it and/or modify 6 // it under the terms of the GNU Affero General Public License as published by 7 // the Free Software Foundation, either version 3 of the License, or 8 // (at your option) any later version. 9 // 10 // This program is distributed in the hope that it will be useful, 11 // but WITHOUT ANY WARRANTY; without even the implied warranty of 12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 // GNU Affero General Public License for more details. 14 // 15 // You should have received a copy of the GNU Affero General Public License 16 // along with this program. If not, see <http://www.gnu.org/licenses/>. 17 18 package media 19 20 import ( 21 "bytes" 22 "context" 23 "fmt" 24 "image/jpeg" 25 "io" 26 "time" 27 28 "codeberg.org/gruf/go-errors/v2" 29 "codeberg.org/gruf/go-runners" 30 "github.com/disintegration/imaging" 31 "github.com/h2non/filetype" 32 terminator "github.com/superseriousbusiness/exif-terminator" 33 "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" 34 "github.com/superseriousbusiness/gotosocial/internal/log" 35 "github.com/superseriousbusiness/gotosocial/internal/uris" 36 ) 37 38 // ProcessingMedia represents a piece of media that is currently being processed. It exposes 39 // various functions for retrieving data from the process. 40 type ProcessingMedia struct { 41 media *gtsmodel.MediaAttachment // processing media attachment details 42 dataFn DataFunc // load-data function, returns media stream 43 recache bool // recaching existing (uncached) media 44 done bool // done is set when process finishes with non ctx canceled type error 45 proc runners.Processor // proc helps synchronize only a singular running processing instance 46 err error // error stores permanent error value when done 47 mgr *Manager // mgr instance (access to db / storage) 48 } 49 50 // AttachmentID returns the ID of the underlying media attachment without blocking processing. 51 func (p *ProcessingMedia) AttachmentID() string { 52 return p.media.ID // immutable, safe outside mutex. 53 } 54 55 // LoadAttachment blocks until the thumbnail and fullsize content has been processed, and then returns the completed attachment. 56 func (p *ProcessingMedia) LoadAttachment(ctx context.Context) (*gtsmodel.MediaAttachment, error) { 57 // Attempt to load synchronously. 58 media, done, err := p.load(ctx) 59 60 if err == nil { 61 // No issue, return media. 62 return media, nil 63 } 64 65 if !done { 66 // Provided context was cancelled, e.g. request cancelled 67 // early. Queue this item for asynchronous processing. 68 log.Warnf(ctx, "reprocessing media %s after canceled ctx", p.media.ID) 69 go p.mgr.state.Workers.Media.Enqueue(p.Process) 70 } 71 72 return nil, err 73 } 74 75 // Process allows the receiving object to fit the runners.WorkerFunc signature. It performs a (blocking) load and logs on error. 76 func (p *ProcessingMedia) Process(ctx context.Context) { 77 if _, _, err := p.load(ctx); err != nil { 78 log.Errorf(ctx, "error processing media: %v", err) 79 } 80 } 81 82 // load performs a concurrency-safe load of ProcessingMedia, only marking itself as complete when returned error is NOT a context cancel. 83 func (p *ProcessingMedia) load(ctx context.Context) (*gtsmodel.MediaAttachment, bool, error) { 84 var ( 85 done bool 86 err error 87 ) 88 89 err = p.proc.Process(func() error { 90 if p.done { 91 // Already proc'd. 92 return p.err 93 } 94 95 defer func() { 96 // This is only done when ctx NOT cancelled. 97 done = err == nil || !errors.Comparable(err, 98 context.Canceled, 99 context.DeadlineExceeded, 100 ) 101 102 if !done { 103 return 104 } 105 106 // Store final values. 107 p.done = true 108 p.err = err 109 }() 110 111 // Attempt to store media and calculate 112 // full-size media attachment details. 113 if err = p.store(ctx); err != nil { 114 return err 115 } 116 117 // Finish processing by reloading media into 118 // memory to get dimension and generate a thumb. 119 if err = p.finish(ctx); err != nil { 120 return err 121 } 122 123 if p.recache { 124 // Existing attachment we're recaching, so only update. 125 err = p.mgr.state.DB.UpdateAttachment(ctx, p.media) 126 return err 127 } 128 129 // First time caching this attachment, insert it. 130 err = p.mgr.state.DB.PutAttachment(ctx, p.media) 131 return err 132 }) 133 134 if err != nil { 135 return nil, done, err 136 } 137 138 return p.media, done, nil 139 } 140 141 // store calls the data function attached to p if it hasn't been called yet, 142 // and updates the underlying attachment fields as necessary. It will then stream 143 // bytes from p's reader directly into storage so that it can be retrieved later. 144 func (p *ProcessingMedia) store(ctx context.Context) error { 145 // Load media from provided data fun 146 rc, sz, err := p.dataFn(ctx) 147 if err != nil { 148 return fmt.Errorf("error executing data function: %w", err) 149 } 150 151 defer func() { 152 // Ensure data reader gets closed on return. 153 if err := rc.Close(); err != nil { 154 log.Errorf(ctx, "error closing data reader: %v", err) 155 } 156 }() 157 158 // Byte buffer to read file header into. 159 // See: https://en.wikipedia.org/wiki/File_format#File_header 160 // and https://github.com/h2non/filetype 161 hdrBuf := make([]byte, 261) 162 163 // Read the first 261 header bytes into buffer. 164 if _, err := io.ReadFull(rc, hdrBuf); err != nil { 165 return fmt.Errorf("error reading incoming media: %w", err) 166 } 167 168 // Parse file type info from header buffer. 169 info, err := filetype.Match(hdrBuf) 170 if err != nil { 171 return fmt.Errorf("error parsing file type: %w", err) 172 } 173 174 // Recombine header bytes with remaining stream 175 r := io.MultiReader(bytes.NewReader(hdrBuf), rc) 176 177 switch info.Extension { 178 case "mp4": 179 p.media.Type = gtsmodel.FileTypeVideo 180 181 case "gif": 182 p.media.Type = gtsmodel.FileTypeImage 183 184 case "jpg", "jpeg", "png", "webp": 185 p.media.Type = gtsmodel.FileTypeImage 186 if sz > 0 { 187 // A file size was provided so we can clean exif data from image. 188 r, err = terminator.Terminate(r, int(sz), info.Extension) 189 if err != nil { 190 return fmt.Errorf("error cleaning exif data: %w", err) 191 } 192 } 193 194 default: 195 return fmt.Errorf("unsupported file type: %s", info.Extension) 196 } 197 198 // Calculate attachment file path. 199 p.media.File.Path = fmt.Sprintf( 200 "%s/%s/%s/%s.%s", 201 p.media.AccountID, 202 TypeAttachment, 203 SizeOriginal, 204 p.media.ID, 205 info.Extension, 206 ) 207 208 // This shouldn't already exist, but we do a check as it's worth logging. 209 if have, _ := p.mgr.state.Storage.Has(ctx, p.media.File.Path); have { 210 log.Warnf(ctx, "media already exists at storage path: %s", p.media.File.Path) 211 212 // Attempt to remove existing media at storage path (might be broken / out-of-date) 213 if err := p.mgr.state.Storage.Delete(ctx, p.media.File.Path); err != nil { 214 return fmt.Errorf("error removing media from storage: %v", err) 215 } 216 } 217 218 // Write the final image reader stream to our storage. 219 sz, err = p.mgr.state.Storage.PutStream(ctx, p.media.File.Path, r) 220 if err != nil { 221 return fmt.Errorf("error writing media to storage: %w", err) 222 } 223 224 // Set written image size. 225 p.media.File.FileSize = int(sz) 226 227 // Fill in remaining attachment data now it's stored. 228 p.media.URL = uris.GenerateURIForAttachment( 229 p.media.AccountID, 230 string(TypeAttachment), 231 string(SizeOriginal), 232 p.media.ID, 233 info.Extension, 234 ) 235 p.media.File.ContentType = info.MIME.Value 236 p.media.Cached = func() *bool { 237 ok := true 238 return &ok 239 }() 240 241 return nil 242 } 243 244 func (p *ProcessingMedia) finish(ctx context.Context) error { 245 // Fetch a stream to the original file in storage. 246 rc, err := p.mgr.state.Storage.GetStream(ctx, p.media.File.Path) 247 if err != nil { 248 return fmt.Errorf("error loading file from storage: %w", err) 249 } 250 defer rc.Close() 251 252 var fullImg *gtsImage 253 254 switch p.media.File.ContentType { 255 // .jpeg, .gif, .webp image type 256 case mimeImageJpeg, mimeImageGif, mimeImageWebp: 257 fullImg, err = decodeImage(rc, imaging.AutoOrientation(true)) 258 if err != nil { 259 return fmt.Errorf("error decoding image: %w", err) 260 } 261 262 // .png image (requires ancillary chunk stripping) 263 case mimeImagePng: 264 fullImg, err = decodeImage(&pngAncillaryChunkStripper{ 265 Reader: rc, 266 }, imaging.AutoOrientation(true)) 267 if err != nil { 268 return fmt.Errorf("error decoding image: %w", err) 269 } 270 271 // .mp4 video type 272 case mimeVideoMp4: 273 video, err := decodeVideoFrame(rc) 274 if err != nil { 275 return fmt.Errorf("error decoding video: %w", err) 276 } 277 278 // Set video frame as image. 279 fullImg = video.frame 280 281 // Set video metadata in attachment info. 282 p.media.FileMeta.Original.Duration = &video.duration 283 p.media.FileMeta.Original.Framerate = &video.framerate 284 p.media.FileMeta.Original.Bitrate = &video.bitrate 285 } 286 287 // The image should be in-memory by now. 288 if err := rc.Close(); err != nil { 289 return fmt.Errorf("error closing file: %w", err) 290 } 291 292 // Set full-size dimensions in attachment info. 293 p.media.FileMeta.Original.Width = int(fullImg.Width()) 294 p.media.FileMeta.Original.Height = int(fullImg.Height()) 295 p.media.FileMeta.Original.Size = int(fullImg.Size()) 296 p.media.FileMeta.Original.Aspect = fullImg.AspectRatio() 297 298 // Calculate attachment thumbnail file path 299 p.media.Thumbnail.Path = fmt.Sprintf( 300 "%s/%s/%s/%s.jpg", 301 p.media.AccountID, 302 TypeAttachment, 303 SizeSmall, 304 p.media.ID, 305 ) 306 307 // Get smaller thumbnail image 308 thumbImg := fullImg.Thumbnail() 309 310 // Garbage collector, you may 311 // now take our large son. 312 fullImg = nil 313 314 // Blurhash needs generating from thumb. 315 hash, err := thumbImg.Blurhash() 316 if err != nil { 317 return fmt.Errorf("error generating blurhash: %w", err) 318 } 319 320 // Set the attachment blurhash. 321 p.media.Blurhash = hash 322 323 // This shouldn't already exist, but we do a check as it's worth logging. 324 if have, _ := p.mgr.state.Storage.Has(ctx, p.media.Thumbnail.Path); have { 325 log.Warnf(ctx, "thumbnail already exists at storage path: %s", p.media.Thumbnail.Path) 326 327 // Attempt to remove existing thumbnail at storage path (might be broken / out-of-date) 328 if err := p.mgr.state.Storage.Delete(ctx, p.media.Thumbnail.Path); err != nil { 329 return fmt.Errorf("error removing thumbnail from storage: %v", err) 330 } 331 } 332 333 // Create a thumbnail JPEG encoder stream. 334 enc := thumbImg.ToJPEG(&jpeg.Options{ 335 Quality: 70, // enough for a thumbnail. 336 }) 337 338 // Stream-encode the JPEG thumbnail image into storage. 339 sz, err := p.mgr.state.Storage.PutStream(ctx, p.media.Thumbnail.Path, enc) 340 if err != nil { 341 return fmt.Errorf("error stream-encoding thumbnail to storage: %w", err) 342 } 343 344 // Fill in remaining thumbnail now it's stored 345 p.media.Thumbnail.ContentType = mimeImageJpeg 346 p.media.Thumbnail.URL = uris.GenerateURIForAttachment( 347 p.media.AccountID, 348 string(TypeAttachment), 349 string(SizeSmall), 350 p.media.ID, 351 "jpg", // always jpeg 352 ) 353 354 // Set thumbnail dimensions in attachment info. 355 p.media.FileMeta.Small = gtsmodel.Small{ 356 Width: int(thumbImg.Width()), 357 Height: int(thumbImg.Height()), 358 Size: int(thumbImg.Size()), 359 Aspect: thumbImg.AspectRatio(), 360 } 361 362 // Set written image size. 363 p.media.Thumbnail.FileSize = int(sz) 364 365 // Finally set the attachment as processed and update time. 366 p.media.Processing = gtsmodel.ProcessingStatusProcessed 367 p.media.File.UpdatedAt = time.Now() 368 369 return nil 370 }