class Tus::Storage::Gridfs
Constants
- BATCH_SIZE
Attributes
bucket [R]
chunk_size [R]
client [R]
prefix [R]
Public Class Methods
Initializes the GridFS storage and creates necessary indexes.
  # File lib/tus/storage/gridfs.rb, line 21
  def initialize(client:, prefix: "fs", chunk_size: 256*1024)
    @client     = client
    @prefix     = prefix
    @chunk_size = chunk_size
    @bucket     = client.database.fs(bucket_name: prefix)

    @bucket.send(:ensure_indexes!)
  end
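A minimal setup sketch, assuming the mongo gem is available; the connection URL and database name are illustrative:

  require "mongo"
  require "tus/storage/gridfs"

  client  = Mongo::Client.new("mongodb://127.0.0.1:27017/mydb")
  storage = Tus::Storage::Gridfs.new(client: client)

  # a custom bucket prefix and chunk size can also be given:
  storage = Tus::Storage::Gridfs.new(client: client, prefix: "fs_temp", chunk_size: 1*1024*1024)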
Public Instance Methods
Concatenates multiple partial uploads into a single upload, and returns the size of the resulting upload. The partial uploads are deleted after concatenation.
It concatenates by updating the partial uploads' GridFS chunks to point to the new upload.
Raises Tus::Error if GridFS chunks of partial uploads don't exist or aren't completely filled.
  # File lib/tus/storage/gridfs.rb, line 49
  def concatenate(uid, part_uids, info = {})
    grid_infos = files_collection.find(filename: {"$in" => part_uids}).to_a
    grid_infos.sort_by! { |grid_info| part_uids.index(grid_info[:filename]) }

    validate_parts!(grid_infos, part_uids)

    length       = grid_infos.map { |doc| doc[:length] }.reduce(0, :+)
    content_type = Tus::Info.new(info).type

    grid_file = create_grid_file(
      filename:     uid,
      length:       length,
      content_type: content_type,
    )

    # Update the chunks belonging to parts so that they point to the new file.
    grid_infos.inject(0) do |offset, grid_info|
      result = chunks_collection
        .find(files_id: grid_info[:_id])
        .update_many(
          "$set" => { files_id: grid_file.id },
          "$inc" => { n: offset },
        )

      offset += result.modified_count
    end

    # Delete the parts after concatenation.
    files_collection.delete_many(filename: {"$in" => part_uids})

    # Return the size of the resulting upload.
    length
  end
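The chunk-renumbering arithmetic can be seen in isolation in this plain-Ruby sketch with hypothetical chunk counts: each part's chunks keep their relative order, shifted by the number of chunks in all preceding parts.

  # Hypothetical chunk counts for three partial uploads:
  part_chunk_counts = [3, 2, 4]

  # Chunk n of each part becomes chunk (n + offset) of the merged file,
  # where offset counts the chunks that precede that part.
  offset = 0
  part_chunk_counts.each_with_index do |count, i|
    puts "part #{i}: chunk n becomes chunk n + #{offset}"
    offset += count
  end
  # part 0: chunk n becomes chunk n + 0
  # part 1: chunk n becomes chunk n + 3
  # part 2: chunk n becomes chunk n + 5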
Creates a file for the specified upload.
  # File lib/tus/storage/gridfs.rb, line 31
  def create_file(uid, info = {})
    content_type = Tus::Info.new(info).metadata["content_type"]

    create_grid_file(
      filename:     uid,
      content_type: content_type,
    )
  end
Deletes the GridFS file and chunks for the specified upload.
  # File lib/tus/storage/gridfs.rb, line 200
  def delete_file(uid, info = {})
    grid_info = files_collection.find(filename: uid).first
    bucket.delete(grid_info[:_id]) if grid_info
  end
Deletes the GridFS files and chunks of all uploads older than the specified date.
  # File lib/tus/storage/gridfs.rb, line 206
  def expire_files(expiration_date)
    grid_infos = files_collection.find(uploadDate: {"$lte" => expiration_date}).to_a
    grid_info_ids = grid_infos.map { |info| info[:_id] }

    files_collection.delete_many(_id: {"$in" => grid_info_ids})
    chunks_collection.delete_many(files_id: {"$in" => grid_info_ids})
  end
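For example, a periodic cleanup task might delete uploads older than a week (storage is assumed to be set up as in the initializer example above):

  storage.expire_files(Time.now.utc - 7*24*60*60)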
Returns a Tus::Response object through which data of the specified upload can be retrieved in a streaming fashion. Accepts an optional range parameter for selecting a subset of bytes to retrieve.
  # File lib/tus/storage/gridfs.rb, line 151
  def get_file(uid, info = {}, range: nil)
    grid_info = files_collection.find(filename: uid).first

    filter = { files_id: grid_info[:_id] }

    if range
      chunk_start = range.begin / grid_info[:chunkSize]
      chunk_stop  = range.end / grid_info[:chunkSize]

      filter[:n] = {"$gte" => chunk_start, "$lte" => chunk_stop}
    end

    # Query only the subset of chunks specified by the range query. We
    # cannot use Mongo::FsBucket#open_download_stream here because it
    # doesn't support changing the filter.
    chunks_view = chunks_collection.find(filter).sort(n: 1)

    # Create an Enumerator which will yield chunks of the requested file
    # content, allowing tus server to efficiently stream requested content
    # to the client.
    chunks = Enumerator.new do |yielder|
      chunks_view.each do |document|
        data = document[:data].data

        if document[:n] == chunk_start && document[:n] == chunk_stop
          byte_start = range.begin % grid_info[:chunkSize]
          byte_stop  = range.end % grid_info[:chunkSize]
        elsif document[:n] == chunk_start
          byte_start = range.begin % grid_info[:chunkSize]
          byte_stop  = grid_info[:chunkSize] - 1
        elsif document[:n] == chunk_stop
          byte_start = 0
          byte_stop  = range.end % grid_info[:chunkSize]
        end

        # If we're on the first or last chunk, return a subset of the chunk
        # specified by the given range, otherwise return the full chunk.
        if byte_start && byte_stop
          yielder << data[byte_start..byte_stop]
        else
          yielder << data
        end
      end
    end

    Tus::Response.new(chunks: chunks, close: chunks_view.method(:close_query))
  end
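A hedged usage sketch, assuming storage is set up as in the initializer example and uid names an existing upload; it relies on Tus::Response yielding the chunks via #each and releasing the MongoDB query via #close:

  response = storage.get_file(uid, range: 0..99)
  response.each { |chunk| print chunk } # streams the first 100 bytes
  response.close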
Appends data to the specified upload in a streaming fashion, and returns the number of bytes it managed to save.
It does so by reading the input data in batches, creating a new GridFS chunk for each read piece of data and appending it to the file's existing chunk list.
  # File lib/tus/storage/gridfs.rb, line 86
  def patch_file(uid, input, info = {})
    grid_info      = files_collection.find(filename: uid).first
    current_length = grid_info[:length]
    chunk_size     = grid_info[:chunkSize]
    bytes_saved    = 0

    # It's possible that the previous data append didn't fill in the last
    # GridFS chunk completely, so we fill in that gap now before creating
    # new GridFS chunks.
    bytes_saved += patch_last_chunk(input, grid_info) if current_length % chunk_size != 0

    # Create an Enumerator which yields chunks of input data which have the
    # size of the configured :chunkSize of the GridFS file.
    chunks_enumerator = Enumerator.new do |yielder|
      while (data = input.read(chunk_size))
        yielder << data
      end
    end

    chunks_in_batch = (BATCH_SIZE.to_f / chunk_size).ceil
    chunks_offset   = chunks_collection.count(files_id: grid_info[:_id]) - 1

    # Iterate in batches of data chunks and bulk-insert new GridFS chunks.
    # This way we try to have a balance between bulking inserts and keeping
    # memory usage low.
    chunks_enumerator.each_slice(chunks_in_batch) do |chunks|
      grid_chunks = chunks.map do |data|
        Mongo::Grid::File::Chunk.new(
          data:     BSON::Binary.new(data),
          files_id: grid_info[:_id],
          n:        chunks_offset += 1,
        )
      end

      chunks_collection.insert_many(grid_chunks)

      # Update the total length and refresh the upload date on each update,
      # which are used in #get_file, #concatenate and #expire_files.
      files_collection.find(filename: uid).update_one(
        "$inc" => { length: chunks.map(&:bytesize).inject(0, :+) },
        "$set" => { uploadDate: Time.now.utc },
      )
      bytes_saved += chunks.map(&:bytesize).inject(0, :+)

      chunks.each(&:clear) # deallocate strings
    end

    bytes_saved
  end
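To make the batching concrete, here is the arithmetic in isolation, using the default 256KB chunk size and assuming BATCH_SIZE is 5MB (its exact value lives in the Constants section above; 5MB is an assumption here):

  chunk_size = 256 * 1024
  batch_size = 5 * 1024 * 1024        # assumed value of BATCH_SIZE
  (batch_size.to_f / chunk_size).ceil # => 20 GridFS chunks per bulk insert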
Returns info of the specified upload. Raises Tus::NotFound if the upload wasn't found.
  # File lib/tus/storage/gridfs.rb, line 138
  def read_info(uid)
    grid_info = files_collection.find(filename: uid).first or raise Tus::NotFound

    grid_info[:metadata]
  end
Updates info of the specified upload.
  # File lib/tus/storage/gridfs.rb, line 144
  def update_info(uid, info)
    files_collection.update_one({filename: uid}, {"$set" => {metadata: info}})
  end
Private Instance Methods
  # File lib/tus/storage/gridfs.rb, line 270
  def chunks_collection
    bucket.chunks_collection
  end
Creates a GridFS file.
  # File lib/tus/storage/gridfs.rb, line 217
  def create_grid_file(**options)
    grid_file = Mongo::Grid::File.new("", metadata: {}, chunk_size: chunk_size, **options)
    bucket.insert_one(grid_file)
    grid_file
  end
  # File lib/tus/storage/gridfs.rb, line 266
  def files_collection
    bucket.files_collection
  end
If the last GridFS chunk of the file is incomplete (meaning it's smaller than the configured :chunkSize of the GridFS file), fills the missing data by reading a chunk of the input data.
  # File lib/tus/storage/gridfs.rb, line 228
  def patch_last_chunk(input, grid_info)
    last_chunk = chunks_collection.find(files_id: grid_info[:_id]).sort(n: -1).limit(1).first
    data       = last_chunk[:data].data
    patch      = input.read(grid_info[:chunkSize] - data.bytesize)
    data << patch

    chunks_collection.find(files_id: grid_info[:_id], n: last_chunk[:n])
      .update_one("$set" => { data: BSON::Binary.new(data) })

    files_collection.find(_id: grid_info[:_id])
      .update_one("$inc" => { length: patch.bytesize })

    patch.bytesize
  end
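A worked example of the gap arithmetic: with the default 256KB :chunkSize, if the last chunk currently holds 100,000 bytes, the method asks the input for the remaining bytes needed to fill it:

  chunk_size = 256 * 1024 # 262144
  data_size  = 100_000    # bytes already in the last chunk
  chunk_size - data_size  # => 162144 bytes requested from input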
Validates that GridFS files of partial uploads are suitable for concatenation.
  # File lib/tus/storage/gridfs.rb, line 245
  def validate_parts!(grid_infos, part_uids)
    validate_parts_presence!(grid_infos, part_uids)
    validate_parts_full_chunks!(grid_infos)
  end
Validates that the GridFS chunks of each file are completely filled; only the last part is allowed to have a partially filled final chunk.
  # File lib/tus/storage/gridfs.rb, line 258
  def validate_parts_full_chunks!(grid_infos)
    grid_infos.each do |grid_info|
      if grid_info[:length] % grid_info[:chunkSize] != 0 && grid_info != grid_infos.last
        raise Tus::Error, "cannot concatenate parts which aren't evenly distributed across chunks"
      end
    end
  end
Validates that each partial upload has a corresponding GridFS file.
  # File lib/tus/storage/gridfs.rb, line 251
  def validate_parts_presence!(grid_infos, part_uids)
    if grid_infos.count != part_uids.count
      raise Tus::Error, "some parts for concatenation are missing"
    end
  end