class Miner
Constants
- DEFAULTS
Attributes
active_files[R]
batch_lines[R]
eof_seconds[R]
files[R]
paths[R]
registry_path[R]
Public Class Methods
new(options = {})
click to toggle source
Create a new file miner instance
@param [Hash] options @option options [String] :registry_path (/var/lib/fileminer/registry) @option options [Array] :paths @option options [Integer] :eof_seconds (86400) @option options [Integer] :batch_lines (50) @option options [String] :host (Socket.gethostname)
# File lib/fileminer/miner.rb, line 25 def initialize(options = {}) # fix options by DEFAULTS DEFAULTS.each { |k, v| options[k] = v unless options.key? k } @registry_path = options[:registry_path] if @registry_path.nil? @registry_path = "#{options[:work_dir]}/registry" end @history_path = options[:history_path] if @history_path.nil? @history_path = "#{options[:work_dir]}/history" end @max_eof_files = options[:max_eof_files] @paths = options[:paths] @eof_seconds = options[:eof_seconds] @batch_lines = options[:batch_lines] @host = options[:host] if @host.nil? require 'socket' @host = Socket.gethostname end @files = [] @active_files = [] @history = [] if File.exist? @history_path @history = File.open(@history_path) { |io| JSON.parse(io.read) } else parent_dir = File.dirname @history_path Dir.mkdirs parent_dir unless Dir.exist? parent_dir end if File.exist? @registry_path @files = File.open(@registry_path) { |io| JSON.parse(io.read, {symbolize_names: true}) } @active_files = @files.select { |record| !record[:eof] } else parent_dir = File.dirname @registry_path Dir.mkdirs parent_dir unless Dir.exist? parent_dir end end
Public Instance Methods
files_need_refresh?(refresh_files_time_trigger)
click to toggle source
# File lib/fileminer/miner.rb, line 126 def files_need_refresh?(refresh_files_time_trigger) Time.now - @files_refresh_time >= refresh_files_time_trigger end
read_lines(record)
click to toggle source
Read lines
# File lib/fileminer/miner.rb, line 101 def read_lines(record) file_path = record[:path] File.open file_path do |io| lines = [] io.pos = record[:pos] while lines.size < @batch_lines line = {host: @host, path: file_path, pos: io.pos} begin data = io.readline break if data.nil? if data[-1] != "\n" io.pos = line[:pos] break end rescue EOFError break end line[:end] = io.pos line[:data] = data lines << line end lines end end
refresh_files()
click to toggle source
Refresh
# File lib/fileminer/miner.rb, line 80 def refresh_files now = Time.now file_paths = Set.new file_paths.merge Dir[*@paths].select { |path| File.file? path } active_files, eof_size = select_active_files now, file_paths if eof_size > @max_eof_files move_eof_to_history end history_files = Set.new @history file_paths.select do |path| ! history_files.member? path end.each do |path| record = {path: path, pos: 0, eof: false} @files << record active_files << record end @active_files = active_files @files_refresh_time = now end
save_history()
click to toggle source
Save history
# File lib/fileminer/miner.rb, line 75 def save_history File.open(@history_path, 'w') { |io| io.write @history.to_json } end
save_registry()
click to toggle source
Save registry
# File lib/fileminer/miner.rb, line 70 def save_registry File.open(@registry_path, 'w') { |io| io.write @files.to_json } end
save_work_status()
click to toggle source
Save work status
# File lib/fileminer/miner.rb, line 64 def save_work_status save_history save_registry end
Private Instance Methods
move_eof_to_history()
click to toggle source
# File lib/fileminer/miner.rb, line 158 def move_eof_to_history to_removed_paths = @files.select do |record| record[:eof] end.map do |record| record[:path] end path_set = Set.new to_removed_paths @files.delete_if { |record| path_set.member? record[:path] } @history.concat to_removed_paths end
select_active_files(now, file_paths)
click to toggle source
# File lib/fileminer/miner.rb, line 131 def select_active_files(now, file_paths) eof_seconds = @eof_seconds eof_size = 0 active_files = @files.select do |record| path = record[:path] file_exists = file_paths.delete? path if record[:eof] eof_size += 1 else if file_exists # check if EOF if record[:pos] == File.size(path) && now - File.mtime(path) > eof_seconds record[:eof] = true eof_size += 1 end else # missing file, set :eof to true record[:eof] = true eof_size += 1 end end !record[:eof] end [active_files, eof_size] end