class Tus::Storage::Gridfs

Constants

BATCH_SIZE

Attributes

bucket[R]
chunk_size[R]
client[R]
prefix[R]

Public Class Methods

new(client:, prefix: "fs", chunk_size: 256*1024)

Initializes the GridFS storage and creates necessary indexes.

# File lib/tus/storage/gridfs.rb, line 21
def initialize(client:, prefix: "fs", chunk_size: 256*1024)
  @client     = client
  @prefix     = prefix
  @chunk_size = chunk_size
  @bucket     = client.database.fs(bucket_name: prefix)

  @bucket.send(:ensure_indexes!)
end
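
A minimal setup sketch (the connection URL and database name are illustrative):

require "tus/storage/gridfs"
require "mongo"

client  = Mongo::Client.new("mongodb://127.0.0.1:27017/mydb")
storage = Tus::Storage::Gridfs.new(client: client)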

Public Instance Methods

concatenate(uid, part_uids, info = {})

Concatenates multiple partial uploads into a single upload, and returns the size of the resulting upload. The partial uploads are deleted after concatenation.

It concatenates by updating the partial uploads' GridFS chunks to point to the new upload.

Raises Tus::Error if any partial upload's GridFS file is missing or its chunks aren't completely filled.

# File lib/tus/storage/gridfs.rb, line 49
def concatenate(uid, part_uids, info = {})
  grid_infos = files_collection.find(filename: {"$in" => part_uids}).to_a
  grid_infos.sort_by! { |grid_info| part_uids.index(grid_info[:filename]) }

  validate_parts!(grid_infos, part_uids)

  length       = grid_infos.map { |doc| doc[:length] }.reduce(0, :+)
  content_type = Tus::Info.new(info).type

  grid_file = create_grid_file(
    filename:     uid,
    length:       length,
    content_type: content_type,
  )

  # Update the chunks belonging to parts so that they point to the new file.
  grid_infos.inject(0) do |offset, grid_info|
    result = chunks_collection
      .find(files_id: grid_info[:_id])
      .update_many(
        "$set" => { files_id: grid_file.id },
        "$inc" => { n: offset },
      )

    offset += result.modified_count
  end

  # Delete the parts after concatenation.
  files_collection.delete_many(filename: {"$in" => part_uids})

  # Return the size of the concatenated upload, as documented above.
  length
end
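
For illustration, concatenating two finished 1 MiB partial uploads (the UIDs are hypothetical):

size = storage.concatenate("final-uid", ["part-uid-1", "part-uid-2"])
size #=> 2097152 (the combined length; the two partial uploads are deleted)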

create_file(uid, info = {})

Creates a file for the specified upload.

# File lib/tus/storage/gridfs.rb, line 31
def create_file(uid, info = {})
  content_type = Tus::Info.new(info).metadata["content_type"]

  create_grid_file(
    filename:     uid,
    content_type: content_type,
  )
end
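
An illustrative call, assuming the info hash carries the tus "Upload-Metadata" header value (here encoding a content_type of "text/plain" as Base64 "dGV4dC9wbGFpbg=="):

storage.create_file("some-uid", { "Upload-Metadata" => "content_type dGV4dC9wbGFpbg==" })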

delete_file(uid, info = {})

Deletes the GridFS file and chunks for the specified upload.

# File lib/tus/storage/gridfs.rb, line 200
def delete_file(uid, info = {})
  grid_info = files_collection.find(filename: uid).first
  bucket.delete(grid_info[:_id]) if grid_info
end

expire_files(expiration_date)

Deletes the GridFS files and chunks of uploads older than the specified date.

# File lib/tus/storage/gridfs.rb, line 206
def expire_files(expiration_date)
  grid_infos = files_collection.find(uploadDate: {"$lte" => expiration_date}).to_a
  grid_info_ids = grid_infos.map { |info| info[:_id] }

  files_collection.delete_many(_id: {"$in" => grid_info_ids})
  chunks_collection.delete_many(files_id: {"$in" => grid_info_ids})
end
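
For example, to purge uploads that haven't been written to in the last week (uploadDate is refreshed on every append):

storage.expire_files(Time.now.utc - 7*24*60*60)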

get_file(uid, info = {}, range: nil)

Returns a Tus::Response object through which data of the specified upload can be retrieved in a streaming fashion. Accepts an optional range parameter for selecting a subset of bytes we want to retrieve.

# File lib/tus/storage/gridfs.rb, line 151
def get_file(uid, info = {}, range: nil)
  grid_info = files_collection.find(filename: uid).first

  filter = { files_id: grid_info[:_id] }

  if range
    chunk_start = range.begin / grid_info[:chunkSize]
    chunk_stop  = range.end   / grid_info[:chunkSize]

    filter[:n] = {"$gte" => chunk_start, "$lte" => chunk_stop}
  end

  # Query only the subset of chunks specified by the range query. We
  # cannot use Mongo::Grid::FSBucket#open_download_stream here because
  # it doesn't support changing the filter.
  chunks_view = chunks_collection.find(filter).sort(n: 1)

  # Create an Enumerator which will yield chunks of the requested file
  # content, allowing tus server to efficiently stream requested content
  # to the client.
  chunks = Enumerator.new do |yielder|
    chunks_view.each do |document|
      data = document[:data].data

      # Reset slice bounds on each iteration so that middle chunks (and all
      # chunks when no range was requested) are yielded in full.
      byte_start = byte_stop = nil

      if document[:n] == chunk_start && document[:n] == chunk_stop
        byte_start = range.begin % grid_info[:chunkSize]
        byte_stop  = range.end   % grid_info[:chunkSize]
      elsif document[:n] == chunk_start
        byte_start = range.begin % grid_info[:chunkSize]
        byte_stop  = grid_info[:chunkSize] - 1
      elsif document[:n] == chunk_stop
        byte_start = 0
        byte_stop  = range.end % grid_info[:chunkSize]
      end

      # If we're on the first or last chunk, return a subset of the chunk
      # specified by the given range, otherwise return the full chunk.
      if byte_start && byte_stop
        yielder << data[byte_start..byte_stop]
      else
        yielder << data
      end
    end
  end

  Tus::Response.new(chunks: chunks, close: chunks_view.method(:close_query))
end
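
A worked example with illustrative numbers: with the default chunkSize of 262144 bytes, range: 300_000..600_000 selects chunks n=1 and n=2 (300_000 / 262_144 and 600_000 / 262_144), slicing the first from byte 37_856 (300_000 % 262_144) and the last up to byte 75_712 (600_000 % 262_144). A sketch, assuming Tus::Response delegates iteration and closing to the underlying enumerator and view:

response = storage.get_file("some-uid", range: 300_000..600_000)
response.each { |data| $stdout.write(data) } # stream wherever needed
response.close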

patch_file(uid, input, info = {})

Appends data to the specified upload in a streaming fashion, and returns the number of bytes it managed to save.

It does so by reading the input data in batches, creating a new GridFS chunk for each chunk-sized piece of data, and appending them to the file's existing chunk list.

# File lib/tus/storage/gridfs.rb, line 86
def patch_file(uid, input, info = {})
  grid_info      = files_collection.find(filename: uid).first
  current_length = grid_info[:length]
  chunk_size     = grid_info[:chunkSize]
  bytes_saved    = 0

  # It's possible that the previous data append didn't fill in the last
  # GridFS chunk completely, so we fill in that gap now before creating
  # new GridFS chunks.
  bytes_saved += patch_last_chunk(input, grid_info) if current_length % chunk_size != 0

  # Create an Enumerator which yields chunks of input data which have the
  # size of the configured :chunkSize of the GridFS file.
  chunks_enumerator = Enumerator.new do |yielder|
    while (data = input.read(chunk_size))
      yielder << data
    end
  end

  chunks_in_batch = (BATCH_SIZE.to_f / chunk_size).ceil
  chunks_offset   = chunks_collection.count(files_id: grid_info[:_id]) - 1

  # Iterate in batches of data chunks and bulk-insert new GridFS chunks.
  # This way we try to have a balance between bulking inserts and keeping
  # memory usage low.
  chunks_enumerator.each_slice(chunks_in_batch) do |chunks|
    grid_chunks = chunks.map do |data|
      Mongo::Grid::File::Chunk.new(
        data: BSON::Binary.new(data),
        files_id: grid_info[:_id],
        n: chunks_offset += 1,
      )
    end

    chunks_collection.insert_many(grid_chunks)

    # Update the total length and refresh the upload date on each update,
    # which are used in #get_file, #concatenate and #expire_files.
    files_collection.find(filename: uid).update_one(
      "$inc" => { length: chunks.map(&:bytesize).inject(0, :+) },
      "$set" => { uploadDate: Time.now.utc },
    )
    bytes_saved += chunks.map(&:bytesize).inject(0, :+)

    chunks.each(&:clear) # deallocate strings
  end

  bytes_saved
end
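
A sketch using StringIO as a stand-in for the request body (any object responding to read(length) works):

require "stringio"

input = StringIO.new("x" * 300_000)
storage.patch_file("some-uid", input) #=> 300000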

read_info(uid)

Returns info of the specified upload. Raises Tus::NotFound if the upload wasn't found.

# File lib/tus/storage/gridfs.rb, line 138
def read_info(uid)
  grid_info = files_collection.find(filename: uid).first or raise Tus::NotFound
  grid_info[:metadata]
end

update_info(uid, info)

Updates info of the specified upload.

# File lib/tus/storage/gridfs.rb, line 144
def update_info(uid, info)
  files_collection.update_one({filename: uid}, {"$set" => {metadata: info}})
end
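
A round-trip sketch (the key name follows the tus protocol's header-style info keys; values are illustrative):

info = storage.read_info("some-uid")  # raises Tus::NotFound if missing
info["Upload-Length"] = "2097152"
storage.update_info("some-uid", info)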

Private Instance Methods

chunks_collection()
# File lib/tus/storage/gridfs.rb, line 270
def chunks_collection
  bucket.chunks_collection
end

create_grid_file(**options)

Creates a GridFS file.

# File lib/tus/storage/gridfs.rb, line 217
def create_grid_file(**options)
  grid_file = Mongo::Grid::File.new("", metadata: {}, chunk_size: chunk_size, **options)

  bucket.insert_one(grid_file)

  grid_file
end

files_collection()
# File lib/tus/storage/gridfs.rb, line 266
def files_collection
  bucket.files_collection
end

patch_last_chunk(input, grid_info)

If the last GridFS chunk of the file is incomplete (meaning it's smaller than the configured :chunkSize of the GridFS file), fills the missing data by reading a chunk of the input data.
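For example, with the default 256 KiB chunkSize, if the last chunk holds 100,000 bytes, the next append first reads 262,144 - 100,000 = 162,144 bytes from the input to top that chunk off.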

# File lib/tus/storage/gridfs.rb, line 228
def patch_last_chunk(input, grid_info)
  last_chunk = chunks_collection.find(files_id: grid_info[:_id]).sort(n: -1).limit(1).first
  data = last_chunk[:data].data
  patch = input.read(grid_info[:chunkSize] - data.bytesize)
  data << patch

  chunks_collection.find(files_id: grid_info[:_id], n: last_chunk[:n])
    .update_one("$set" => { data: BSON::Binary.new(data) })

  files_collection.find(_id: grid_info[:_id])
    .update_one("$inc" => { length: patch.bytesize })

  patch.bytesize
end

validate_parts!(grid_infos, part_uids)

Validates that the GridFS files of the partial uploads are suitable for concatenation.

# File lib/tus/storage/gridfs.rb, line 245
def validate_parts!(grid_infos, part_uids)
  validate_parts_presence!(grid_infos, part_uids)
  validate_parts_full_chunks!(grid_infos)
end

validate_parts_full_chunks!(grid_infos)

Validates that each file's GridFS chunks are completely filled; only the final part may end with a partially filled chunk.
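For example, with a 256 KiB chunkSize, a 524,288-byte part (two full chunks) may appear at any position, while a 300,000-byte part passes validation only as the final part.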

# File lib/tus/storage/gridfs.rb, line 258
def validate_parts_full_chunks!(grid_infos)
  grid_infos.each do |grid_info|
    if grid_info[:length] % grid_info[:chunkSize] != 0 && grid_info != grid_infos.last
      raise Tus::Error, "cannot concatenate parts which aren't evenly distributed across chunks"
    end
  end
end

validate_parts_presence!(grid_infos, part_uids)

Validates that each partial upload has a corresponding GridFS file.

# File lib/tus/storage/gridfs.rb, line 251
def validate_parts_presence!(grid_infos, part_uids)
  if grid_infos.count != part_uids.count
    raise Tus::Error, "some parts for concatenation are missing"
  end
end