gtsocial-umbx

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

processingmedia.go (11185B)


      1 // GoToSocial
      2 // Copyright (C) GoToSocial Authors admin@gotosocial.org
      3 // SPDX-License-Identifier: AGPL-3.0-or-later
      4 //
      5 // This program is free software: you can redistribute it and/or modify
      6 // it under the terms of the GNU Affero General Public License as published by
      7 // the Free Software Foundation, either version 3 of the License, or
      8 // (at your option) any later version.
      9 //
     10 // This program is distributed in the hope that it will be useful,
     11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
     12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     13 // GNU Affero General Public License for more details.
     14 //
     15 // You should have received a copy of the GNU Affero General Public License
     16 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
     17 
     18 package media
     19 
     20 import (
     21 	"bytes"
     22 	"context"
     23 	"fmt"
     24 	"image/jpeg"
     25 	"io"
     26 	"time"
     27 
     28 	"codeberg.org/gruf/go-errors/v2"
     29 	"codeberg.org/gruf/go-runners"
     30 	"github.com/disintegration/imaging"
     31 	"github.com/h2non/filetype"
     32 	terminator "github.com/superseriousbusiness/exif-terminator"
     33 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
     34 	"github.com/superseriousbusiness/gotosocial/internal/log"
     35 	"github.com/superseriousbusiness/gotosocial/internal/uris"
     36 )
     37 
     38 // ProcessingMedia represents a piece of media that is currently being processed. It exposes
     39 // various functions for retrieving data from the process.
     40 type ProcessingMedia struct {
     41 	media   *gtsmodel.MediaAttachment // processing media attachment details
     42 	dataFn  DataFunc                  // load-data function, returns media stream
     43 	recache bool                      // recaching existing (uncached) media
     44 	done    bool                      // done is set when process finishes with non ctx canceled type error
     45 	proc    runners.Processor         // proc helps synchronize only a singular running processing instance
     46 	err     error                     // error stores permanent error value when done
     47 	mgr     *Manager                  // mgr instance (access to db / storage)
     48 }
     49 
     50 // AttachmentID returns the ID of the underlying media attachment without blocking processing.
     51 func (p *ProcessingMedia) AttachmentID() string {
     52 	return p.media.ID // immutable, safe outside mutex.
     53 }
     54 
     55 // LoadAttachment blocks until the thumbnail and fullsize content has been processed, and then returns the completed attachment.
     56 func (p *ProcessingMedia) LoadAttachment(ctx context.Context) (*gtsmodel.MediaAttachment, error) {
     57 	// Attempt to load synchronously.
     58 	media, done, err := p.load(ctx)
     59 
     60 	if err == nil {
     61 		// No issue, return media.
     62 		return media, nil
     63 	}
     64 
     65 	if !done {
     66 		// Provided context was cancelled, e.g. request cancelled
     67 		// early. Queue this item for asynchronous processing.
     68 		log.Warnf(ctx, "reprocessing media %s after canceled ctx", p.media.ID)
     69 		go p.mgr.state.Workers.Media.Enqueue(p.Process)
     70 	}
     71 
     72 	return nil, err
     73 }
     74 
     75 // Process allows the receiving object to fit the runners.WorkerFunc signature. It performs a (blocking) load and logs on error.
     76 func (p *ProcessingMedia) Process(ctx context.Context) {
     77 	if _, _, err := p.load(ctx); err != nil {
     78 		log.Errorf(ctx, "error processing media: %v", err)
     79 	}
     80 }
     81 
     82 // load performs a concurrency-safe load of ProcessingMedia, only marking itself as complete when returned error is NOT a context cancel.
     83 func (p *ProcessingMedia) load(ctx context.Context) (*gtsmodel.MediaAttachment, bool, error) {
     84 	var (
     85 		done bool
     86 		err  error
     87 	)
     88 
     89 	err = p.proc.Process(func() error {
     90 		if p.done {
     91 			// Already proc'd.
     92 			return p.err
     93 		}
     94 
     95 		defer func() {
     96 			// This is only done when ctx NOT cancelled.
     97 			done = err == nil || !errors.Comparable(err,
     98 				context.Canceled,
     99 				context.DeadlineExceeded,
    100 			)
    101 
    102 			if !done {
    103 				return
    104 			}
    105 
    106 			// Store final values.
    107 			p.done = true
    108 			p.err = err
    109 		}()
    110 
    111 		// Attempt to store media and calculate
    112 		// full-size media attachment details.
    113 		if err = p.store(ctx); err != nil {
    114 			return err
    115 		}
    116 
    117 		// Finish processing by reloading media into
    118 		// memory to get dimension and generate a thumb.
    119 		if err = p.finish(ctx); err != nil {
    120 			return err
    121 		}
    122 
    123 		if p.recache {
    124 			// Existing attachment we're recaching, so only update.
    125 			err = p.mgr.state.DB.UpdateAttachment(ctx, p.media)
    126 			return err
    127 		}
    128 
    129 		// First time caching this attachment, insert it.
    130 		err = p.mgr.state.DB.PutAttachment(ctx, p.media)
    131 		return err
    132 	})
    133 
    134 	if err != nil {
    135 		return nil, done, err
    136 	}
    137 
    138 	return p.media, done, nil
    139 }
    140 
    141 // store calls the data function attached to p if it hasn't been called yet,
    142 // and updates the underlying attachment fields as necessary. It will then stream
    143 // bytes from p's reader directly into storage so that it can be retrieved later.
    144 func (p *ProcessingMedia) store(ctx context.Context) error {
    145 	// Load media from provided data fun
    146 	rc, sz, err := p.dataFn(ctx)
    147 	if err != nil {
    148 		return fmt.Errorf("error executing data function: %w", err)
    149 	}
    150 
    151 	defer func() {
    152 		// Ensure data reader gets closed on return.
    153 		if err := rc.Close(); err != nil {
    154 			log.Errorf(ctx, "error closing data reader: %v", err)
    155 		}
    156 	}()
    157 
    158 	// Byte buffer to read file header into.
    159 	// See: https://en.wikipedia.org/wiki/File_format#File_header
    160 	// and https://github.com/h2non/filetype
    161 	hdrBuf := make([]byte, 261)
    162 
    163 	// Read the first 261 header bytes into buffer.
    164 	if _, err := io.ReadFull(rc, hdrBuf); err != nil {
    165 		return fmt.Errorf("error reading incoming media: %w", err)
    166 	}
    167 
    168 	// Parse file type info from header buffer.
    169 	info, err := filetype.Match(hdrBuf)
    170 	if err != nil {
    171 		return fmt.Errorf("error parsing file type: %w", err)
    172 	}
    173 
    174 	// Recombine header bytes with remaining stream
    175 	r := io.MultiReader(bytes.NewReader(hdrBuf), rc)
    176 
    177 	switch info.Extension {
    178 	case "mp4":
    179 		p.media.Type = gtsmodel.FileTypeVideo
    180 
    181 	case "gif":
    182 		p.media.Type = gtsmodel.FileTypeImage
    183 
    184 	case "jpg", "jpeg", "png", "webp":
    185 		p.media.Type = gtsmodel.FileTypeImage
    186 		if sz > 0 {
    187 			// A file size was provided so we can clean exif data from image.
    188 			r, err = terminator.Terminate(r, int(sz), info.Extension)
    189 			if err != nil {
    190 				return fmt.Errorf("error cleaning exif data: %w", err)
    191 			}
    192 		}
    193 
    194 	default:
    195 		return fmt.Errorf("unsupported file type: %s", info.Extension)
    196 	}
    197 
    198 	// Calculate attachment file path.
    199 	p.media.File.Path = fmt.Sprintf(
    200 		"%s/%s/%s/%s.%s",
    201 		p.media.AccountID,
    202 		TypeAttachment,
    203 		SizeOriginal,
    204 		p.media.ID,
    205 		info.Extension,
    206 	)
    207 
    208 	// This shouldn't already exist, but we do a check as it's worth logging.
    209 	if have, _ := p.mgr.state.Storage.Has(ctx, p.media.File.Path); have {
    210 		log.Warnf(ctx, "media already exists at storage path: %s", p.media.File.Path)
    211 
    212 		// Attempt to remove existing media at storage path (might be broken / out-of-date)
    213 		if err := p.mgr.state.Storage.Delete(ctx, p.media.File.Path); err != nil {
    214 			return fmt.Errorf("error removing media from storage: %v", err)
    215 		}
    216 	}
    217 
    218 	// Write the final image reader stream to our storage.
    219 	sz, err = p.mgr.state.Storage.PutStream(ctx, p.media.File.Path, r)
    220 	if err != nil {
    221 		return fmt.Errorf("error writing media to storage: %w", err)
    222 	}
    223 
    224 	// Set written image size.
    225 	p.media.File.FileSize = int(sz)
    226 
    227 	// Fill in remaining attachment data now it's stored.
    228 	p.media.URL = uris.GenerateURIForAttachment(
    229 		p.media.AccountID,
    230 		string(TypeAttachment),
    231 		string(SizeOriginal),
    232 		p.media.ID,
    233 		info.Extension,
    234 	)
    235 	p.media.File.ContentType = info.MIME.Value
    236 	p.media.Cached = func() *bool {
    237 		ok := true
    238 		return &ok
    239 	}()
    240 
    241 	return nil
    242 }
    243 
    244 func (p *ProcessingMedia) finish(ctx context.Context) error {
    245 	// Fetch a stream to the original file in storage.
    246 	rc, err := p.mgr.state.Storage.GetStream(ctx, p.media.File.Path)
    247 	if err != nil {
    248 		return fmt.Errorf("error loading file from storage: %w", err)
    249 	}
    250 	defer rc.Close()
    251 
    252 	var fullImg *gtsImage
    253 
    254 	switch p.media.File.ContentType {
    255 	// .jpeg, .gif, .webp image type
    256 	case mimeImageJpeg, mimeImageGif, mimeImageWebp:
    257 		fullImg, err = decodeImage(rc, imaging.AutoOrientation(true))
    258 		if err != nil {
    259 			return fmt.Errorf("error decoding image: %w", err)
    260 		}
    261 
    262 	// .png image (requires ancillary chunk stripping)
    263 	case mimeImagePng:
    264 		fullImg, err = decodeImage(&pngAncillaryChunkStripper{
    265 			Reader: rc,
    266 		}, imaging.AutoOrientation(true))
    267 		if err != nil {
    268 			return fmt.Errorf("error decoding image: %w", err)
    269 		}
    270 
    271 	// .mp4 video type
    272 	case mimeVideoMp4:
    273 		video, err := decodeVideoFrame(rc)
    274 		if err != nil {
    275 			return fmt.Errorf("error decoding video: %w", err)
    276 		}
    277 
    278 		// Set video frame as image.
    279 		fullImg = video.frame
    280 
    281 		// Set video metadata in attachment info.
    282 		p.media.FileMeta.Original.Duration = &video.duration
    283 		p.media.FileMeta.Original.Framerate = &video.framerate
    284 		p.media.FileMeta.Original.Bitrate = &video.bitrate
    285 	}
    286 
    287 	// The image should be in-memory by now.
    288 	if err := rc.Close(); err != nil {
    289 		return fmt.Errorf("error closing file: %w", err)
    290 	}
    291 
    292 	// Set full-size dimensions in attachment info.
    293 	p.media.FileMeta.Original.Width = int(fullImg.Width())
    294 	p.media.FileMeta.Original.Height = int(fullImg.Height())
    295 	p.media.FileMeta.Original.Size = int(fullImg.Size())
    296 	p.media.FileMeta.Original.Aspect = fullImg.AspectRatio()
    297 
    298 	// Calculate attachment thumbnail file path
    299 	p.media.Thumbnail.Path = fmt.Sprintf(
    300 		"%s/%s/%s/%s.jpg",
    301 		p.media.AccountID,
    302 		TypeAttachment,
    303 		SizeSmall,
    304 		p.media.ID,
    305 	)
    306 
    307 	// Get smaller thumbnail image
    308 	thumbImg := fullImg.Thumbnail()
    309 
    310 	// Garbage collector, you may
    311 	// now take our large son.
    312 	fullImg = nil
    313 
    314 	// Blurhash needs generating from thumb.
    315 	hash, err := thumbImg.Blurhash()
    316 	if err != nil {
    317 		return fmt.Errorf("error generating blurhash: %w", err)
    318 	}
    319 
    320 	// Set the attachment blurhash.
    321 	p.media.Blurhash = hash
    322 
    323 	// This shouldn't already exist, but we do a check as it's worth logging.
    324 	if have, _ := p.mgr.state.Storage.Has(ctx, p.media.Thumbnail.Path); have {
    325 		log.Warnf(ctx, "thumbnail already exists at storage path: %s", p.media.Thumbnail.Path)
    326 
    327 		// Attempt to remove existing thumbnail at storage path (might be broken / out-of-date)
    328 		if err := p.mgr.state.Storage.Delete(ctx, p.media.Thumbnail.Path); err != nil {
    329 			return fmt.Errorf("error removing thumbnail from storage: %v", err)
    330 		}
    331 	}
    332 
    333 	// Create a thumbnail JPEG encoder stream.
    334 	enc := thumbImg.ToJPEG(&jpeg.Options{
    335 		Quality: 70, // enough for a thumbnail.
    336 	})
    337 
    338 	// Stream-encode the JPEG thumbnail image into storage.
    339 	sz, err := p.mgr.state.Storage.PutStream(ctx, p.media.Thumbnail.Path, enc)
    340 	if err != nil {
    341 		return fmt.Errorf("error stream-encoding thumbnail to storage: %w", err)
    342 	}
    343 
    344 	// Fill in remaining thumbnail now it's stored
    345 	p.media.Thumbnail.ContentType = mimeImageJpeg
    346 	p.media.Thumbnail.URL = uris.GenerateURIForAttachment(
    347 		p.media.AccountID,
    348 		string(TypeAttachment),
    349 		string(SizeSmall),
    350 		p.media.ID,
    351 		"jpg", // always jpeg
    352 	)
    353 
    354 	// Set thumbnail dimensions in attachment info.
    355 	p.media.FileMeta.Small = gtsmodel.Small{
    356 		Width:  int(thumbImg.Width()),
    357 		Height: int(thumbImg.Height()),
    358 		Size:   int(thumbImg.Size()),
    359 		Aspect: thumbImg.AspectRatio(),
    360 	}
    361 
    362 	// Set written image size.
    363 	p.media.Thumbnail.FileSize = int(sz)
    364 
    365 	// Finally set the attachment as processed and update time.
    366 	p.media.Processing = gtsmodel.ProcessingStatusProcessed
    367 	p.media.File.UpdatedAt = time.Now()
    368 
    369 	return nil
    370 }