module Dataflow::SchemaMixin
Constants
- SAMPLE_DATA_OUTPUT
- SEPARATOR
Public Instance Methods
infer_partial_schema(where:, extended: false)
# File lib/dataflow/schema_mixin.rb, line 53
def infer_partial_schema(where:, extended: false)
  if db_backend == :postgresql
    # Experimental
    sch = db_adapter.client.schema(read_dataset_name).to_h
    sch = sch.reject { |k, v| k == :_id }.map { |k, v| [k, { type: v[:type].to_s }] }.to_h
    self.inferred_schema = sch
    save
    return sch
  end

  data_count = count(where: where)
  return {} if data_count == 0

  max_per_process = 250
  max_per_process = limit_per_process if respond_to? :limit_per_process

  equal_split_per_process = (data_count / Parallel.processor_count.to_f).ceil
  count_per_process = [max_per_process, equal_split_per_process].min

  queries = ordered_system_id_queries(batch_size: count_per_process, where: where)
  sch = schema_inferrer.infer_schema(batch_count: queries.count, extended: extended) do |idx|
    all(where: queries[idx].merge(where))
  end
end
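A usage sketch, assuming `node` is a record-backed Dataflow node (e.g. a Dataflow::Nodes::DataNode) that includes this mixin; the node lookup and filter below are hypothetical:

node = Dataflow::Nodes::DataNode.find_by(name: 'users') # hypothetical lookup
# Only infer from records matching the filter; the filter is also merged
# into each batch query.
partial = node.infer_partial_schema(where: { 'status' => 'active' })
partial.each { |field, info| puts "#{field}: #{info[:type]}" }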
infer_schema(samples_count: 0, extended: false)
Generate a schema based on this collection's records. We evaluate the schema of each record and then merge all the information together.
@param extended [Boolean] Set to true to keep each field as a basic type. Set to false to reduce the terminal arrays to a single key (under the type array).
@return [Hash] with one entry per 'column'/'field'. The values contain information about the type and usage.
# File lib/dataflow/schema_mixin.rb, line 13
def infer_schema(samples_count: 0, extended: false)
  if db_backend == :postgresql
    # Experimental
    sch = db_adapter.client.schema(read_dataset_name).to_h
    sch = sch.reject { |k, v| k == :_id }.map { |k, v| [k, { type: v[:type].to_s }] }.to_h
    self.inferred_schema = sch
    save
    return sch
  end

  data_count = samples_count == 0 ? count : samples_count # invoked in the base class
  return {} if data_count == 0

  # find out how many batches are needed
  max_per_process = 1000
  max_per_process = limit_per_process if respond_to?(:limit_per_process) && limit_per_process > 0

  equal_split_per_process = (data_count / Parallel.processor_count.to_f).ceil
  count_per_process = [max_per_process, equal_split_per_process].min

  queries = ordered_system_id_queries(batch_size: count_per_process)[0...data_count]

  self.inferred_schema_at = Time.now
  self.inferred_schema_from = samples_count
  on_schema_inference_started

  sch = schema_inferrer.infer_schema(batch_count: queries.count, extended: extended) do |idx|
    progress = (idx / queries.count.to_f * 100).ceil
    on_schema_inference_progressed(pct_complete: progress)
    all(where: queries[idx])
  end

  self.inferred_schema = sch
  save
  on_schema_inference_finished

  sch
end
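A usage sketch with the same hypothetical `node` as above; samples_count: 0 (the default) scans every record:

# Infer from roughly the first 1_000 records instead of the whole collection.
schema = node.infer_schema(samples_count: 1_000)
schema.each { |field, info| puts "#{field}: #{info[:type]}" }

# The result is also persisted on the node itself: inferred_schema,
# inferred_schema_at and inferred_schema_from are set and saved.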
sample_data(count: 5, mode: 'tabular')
Outputs sample data. Supports either outputting the raw data (as-is) or tabular data.
# File lib/dataflow/schema_mixin.rb, line 88
def sample_data(count: 5, mode: 'tabular')
  mode = mode.to_s.downcase
  unless SAMPLE_DATA_OUTPUT.include?(mode)
    raise Errors::InvalidConfigurationError, "Mode must be one of '#{SAMPLE_DATA_OUTPUT.join(', ')}'. Given: #{mode}"
  end

  samples = all { |x| x.limit(count) }.to_a
  return samples if mode == 'raw'
  return {} if samples.count == 0

  # tabular output
  schm = schema_inferrer.infer_schema(dataset: samples, extended: true)
  keys = schm.keys
  res = samples.map do |sample|
    keys.map do |key|
      value = record_value(record: sample, key: key)
      next if value.nil?
      [key, value]
    end.compact.to_h
  end

  res
end
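A usage sketch of both modes, with the same hypothetical `node`:

# 'raw' returns the sampled records exactly as stored.
raw = node.sample_data(count: 3, mode: 'raw')

# 'tabular' flattens nested fields into one key per column, using the keys
# of a schema inferred from the samples themselves.
rows = node.sample_data(count: 3, mode: 'tabular')
rows.each { |row| p row }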
schema_inferrer()
# File lib/dataflow/schema_mixin.rb, line 79
def schema_inferrer
  Schema::Inference::SchemaInferrer.new(
    separator: SEPARATOR,
    convert_types_to_string: true
  )
end
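For reference, the same inferrer can also be run over an in-memory dataset, which is how sample_data builds its tabular keys above. A minimal sketch with made-up records (SchemaInferrer comes from the separate schema inference gem this module depends on):

inferrer = Schema::Inference::SchemaInferrer.new(
  separator: Dataflow::SchemaMixin::SEPARATOR,
  convert_types_to_string: true
)
dataset = [
  { 'name' => 'Ada', 'tags' => ['admin'] },
  { 'name' => 'Bob', 'age' => 42 }
]
inferrer.infer_schema(dataset: dataset, extended: true)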
Private Instance Methods
add_value_to_record(record:, key:, value:)
# File lib/dataflow/schema_mixin.rb, line 128
def add_value_to_record(record:, key:, value:)
  tokens = key.is_a?(String) ? key.split(SEPARATOR) : [key]
  current_ref = record
  previous_token = tokens[0]

  tokens[1..-1].each do |token|
    if is_integer?(token)
      current_ref[previous_token] ||= []
      current_ref = current_ref[previous_token]
      previous_token = token.to_i
    else
      current_ref[previous_token] ||= {}
      current_ref = current_ref[previous_token]
      previous_token = token
    end
  end

  current_ref[previous_token] = value
end
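A sketch of what this private helper does; it is called via send here purely for illustration. The key joins its path tokens with the module's SEPARATOR, and integer tokens create array indices:

record = {}
key = ['address', 'lines', '0'].join(Dataflow::SchemaMixin::SEPARATOR)
node.send(:add_value_to_record, record: record, key: key, value: '42 Main St')
# record is now { 'address' => { 'lines' => ['42 Main St'] } }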
is_integer?(value)
# File lib/dataflow/schema_mixin.rb, line 148
def is_integer?(value)
  (/^[+-]?[0-9]+$/ =~ value).present?
end
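The regexp only accepts whole (optionally signed) integer strings; a few illustrative calls via send:

node.send(:is_integer?, '42')   # => true
node.send(:is_integer?, '-7')   # => true
node.send(:is_integer?, '4.2')  # => false
node.send(:is_integer?, 'abc')  # => false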
record_dig_tokens(key:, use_sym: false)
# File lib/dataflow/schema_mixin.rb, line 113
def record_dig_tokens(key:, use_sym: false)
  return [key] unless key.is_a?(String)

  key.split(SEPARATOR).map do |token|
    # only parse integers for array indexing
    next token.to_i if is_integer?(token)
    next token.to_sym if use_sym
    token
  end
end
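An illustrative call via send, with a key built from the module's SEPARATOR; integer tokens become array indices and, with use_sym, the remaining tokens become symbols:

key = ['users', '0', 'email'].join(Dataflow::SchemaMixin::SEPARATOR)
node.send(:record_dig_tokens, key: key)                 # => ["users", 0, "email"]
node.send(:record_dig_tokens, key: key, use_sym: true)  # => [:users, 0, :email]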
record_value(record:, key:)
# File lib/dataflow/schema_mixin.rb, line 123
def record_value(record:, key:)
  tokens = record_dig_tokens(key: key)
  record.dig(*tokens)
end
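An illustrative call via send, digging through a nested record with the same key format:

record = { 'users' => [{ 'email' => 'a@example.com' }] }
key = ['users', '0', 'email'].join(Dataflow::SchemaMixin::SEPARATOR)
node.send(:record_value, record: record, key: key)  # => "a@example.com"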