class Statlysis::Timely

Constants

SqlColumns

Public Class Methods

new(source, opts = {}) click to toggle source
Calls superclass method Statlysis::Cron::new
# File lib/statlysis/cron/timely.rb, line 8
# Build a timely cron: let the superclass capture the source and options,
# ensure the database connection is configured, store every SqlColumns
# option as an instance variable (defaulting to []), and prepare the
# backing stat table/model.
def initialize source, opts = {}
  super
  Statlysis.check_set_database
  SqlColumns.each do |column_key|
    instance_variable_set "@#{column_key}", (opts[column_key] || [])
  end
  cron.setup_stat_model
  cron
end

Public Instance Methods

multiple_dimensions_output() click to toggle source
# File lib/statlysis/cron/timely/multiple_dimensions.rb, line 7
# Dispatch to the variant matching whether this cron has a time column:
# `multiple_dimensions_output_with_time_column` or `..._without_time_column`.
def multiple_dimensions_output
  suffix = cron.time_column ? '' : 'out'
  self.send "multiple_dimensions_output_with#{suffix}_time_column"
end
one_dimension_output() click to toggle source

A one-dimension statistic must have a `time_column`; otherwise there is nothing to do.

TODO add to FAQ

  • if you want statistics on a single column through the `group_by_columns`

param and don't need a time column, then you can use the `always` DSL.

# File lib/statlysis/cron/timely/one_dimension.rb, line 13
# Build one result row per time unit in `cron.time_range`, accumulated over
# every data source:
#   :t          — the unit's start time
#   :timely_c   — record count inside the unit
#   :totally_c  — record count from DateTime1970 up to the unit's end
#   sum result columns (see sum_column_to_result_columns_hash)
#   :other_json — serialized uniq values of the group_concat columns
# Rows whose numeric values are all zero are dropped at the end.
def one_dimension_output
  cron.time_range.map do |time|
    _hash = {:t => time, :timely_c => 0, :totally_c => 0}
    sum_column_to_result_columns_hash.each do |_sum_col, _result_cols|
      _result_cols.each do |_result_col|
        _hash[_result_col] = 0.0
      end
    end

    # support multiple data sources
    _first_source = nil
    _group_concat_values = {}
    cron.multiple_dataset.sources.each do |s|
      _t = DateTime1970
      _t = is_time_column_integer? ? _t.to_i : _t

      _scope_one = s.where(unit_range_query(time))
      # TODO cache pre-result
      _scope_all = s.where(unit_range_query(time, _t))

      # 1. count
      _hash[:timely_c]  += _scope_one.count
      _hash[:totally_c] += _scope_all.count

      # 2. sum
      # FIX accumulate with `+=` across sources (was `=`, which kept only the
      # last source's sums even though the counts above accumulate)
      sum_column_to_result_columns_hash.each do |_sum_col, _result_cols|
        _hash[_result_cols[0]] += _scope_one.map(&_sum_col).reduce(:+).to_f
        _hash[_result_cols[1]] += _scope_all.map(&_sum_col).reduce(:+).to_f
      end

      # 3. group_concat
      # FIX merge uniq values across sources instead of overwriting them with
      # the last source's values
      cron.group_concat_columns.each do |_group_concat_column|
        _group_concat_values[_group_concat_column] ||= []
        _group_concat_values[_group_concat_column] |= _scope_one.map(&_group_concat_column).uniq
      end
      _hash[:other_json] = {:group_concat_columns => _group_concat_values}.to_json

      # reuse the already-built unit scope (previously re-queried the source)
      _first_source ||= _scope_one
    end
    logger.info "#{time.in_time_zone(cron.time_zone)} multiple_dataset:#{cron.multiple_dataset.name} _first_source:#{_first_source.inspect} timely_c:#{_hash[:timely_c]} totally_c:#{_hash[:totally_c]}" if ENV['DEBUG']

    _hash
  end.select {|r1| r1.except(:t, :other_json).values.reject {|r2| r2.zero? }.any? }
end
output() click to toggle source
# File lib/statlysis/cron/timely.rb, line 129
# Memoised result rows: grouped crons produce the multi-dimension output,
# otherwise the single-dimension (per time unit) output is used.
def output
  @output ||= if cron.group_by_columns.any?
                multiple_dimensions_output
              else
                one_dimension_output
              end
end
run() click to toggle source

Set up the data sources and save the results into the database.

# File lib/statlysis/cron/timely.rb, line 17
# Execute the cron: compute the output rows, replace the affected rows in
# the stat table inside one transaction, and stamp the last-run clock.
# Returns false (after logging) when there is nothing to write, self otherwise.
def run
  if cron.output.blank?
    logger.info("#{cron.multiple_dataset.name} have no result!")
    return false
  end

  unless cron.output.class.included_modules.include?(Enumerable)
    raise "cron.output has no Enumerable"
  end

  batch_size = 1000
  Statlysis.sequel.transaction do
    # delete first in range
    if cron.time_column?
      cron.stat_model.where("t >= ? AND t <= ?", cron.output[0][:t], cron.output[-1][:t]).delete
    end

    # TODO partial delete
    cron.stat_model.where("").delete if cron.group_by_columns?

    # batch insert all
    offset = 0
    until (batch = cron.output[offset, batch_size]).blank?
      cron.stat_model.multi_insert batch
      offset += batch_size
    end
  end

  # record last executed time
  clock.update

  self
end
setup_stat_model() click to toggle source
# File lib/statlysis/cron/timely.rb, line 44
# Create (or migrate) the SQL table that stores this cron's results, and
# (re)build the Sequel model class bound to it.
#
# The table name is derived from the cron class name, dataset name, where
# conditions, group_by column names and the time-unit suffix. MySQL caps
# table names at 64 characters, so longer derived names raise immediately.
def setup_stat_model
  cron.stat_table_name = Utils.normalise_name cron.class.name.split("::")[-1], cron.multiple_dataset.name, cron.source_where_array, cron.group_by_columns.map {|i| i[:column_name] }, TimeUnitToTableSuffixHash[cron.time_unit]
  raise "mysql only support table_name in 64 characters, the size of '#{cron.stat_table_name}' is #{cron.stat_table_name.to_s.size}. please set cron.stat_table_name when you create a Cron instance" if cron.stat_table_name.to_s.size > 64


  # create basic unchangeable table structure
  if not Statlysis.sequel.table_exists?(cron.stat_table_name)
    Statlysis.sequel.transaction do
      Statlysis.sequel.create_table cron.stat_table_name, DefaultTableOpts do
        primary_key :id # Add one column at least in this block to avoid `SQLite3::SQLException: near ")": syntax error (Sequel::DatabaseError)`
      end
      Statlysis.sequel.add_column   cron.stat_table_name, :t, DateTime if cron.time_column? # alias for :time

      # add count columns: per-unit + cumulative when a time column exists,
      # a single plain count otherwise
      if cron.time_column?
        count_columns = [:timely_c, :totally_c] # alias for :count
        count_columns.each {|w| Statlysis.sequel.add_column cron.stat_table_name, w, Integer }
      else
        Statlysis.sequel.add_column cron.stat_table_name, :c, Integer # alias for :count
      end

    end
  end
  # add group_by columns & indexes
  # NOTE remodel redefines the Sequel model so the columns added above are visible
  remodel
  cron.stat_model.cron = cron
  if cron.group_by_columns.any?
    cron.group_by_columns.each do |_h|
      if not cron.stat_model.columns.include?(_h[:column_name])
        _h[:type] = SymbolToClassInDataType[_h[:type]] if _h[:type].is_a?(Symbol) # && (Statlysis.sequel.opts[:adapter] == :sqlite)
        Statlysis.sequel.add_column cron.stat_table_name, _h[:column_name], _h[:type]
      end
    end
  end

  # add sum columns (one Float column per entry of each result-column pair)
  remodel
  sum_column_to_result_columns_hash.each do |_sum_col, _result_cols|
    _result_cols.each do |_result_col|
      if not cron.stat_model.columns.include?(_result_col)
        # convert to Interger type in view if needed
        Statlysis.sequel.add_column cron.stat_table_name, _result_col, Float
      end
    end
  end

  # Fix there should be uniq index name between tables
  # `SQLite3::SQLException: index t_timely_c_totally_c already exists (Sequel::DatabaseError)`
  _group_by_columns_index_name = cron.group_by_columns.reject {|i| i[:no_index] }.map {|i| i[:column_name] }
  _truncated_columns = _group_by_columns_index_name.dup # only String column
  _group_by_columns_index_name = _group_by_columns_index_name.unshift :t if cron.time_column?
  # TODO use https://github.com/german/redis_orm to support full string indexes
  if !Statlysis.configuration.is_skip_database_index && _group_by_columns_index_name.any?
    # split MySQL's 1000-byte index budget evenly across the indexed columns
    # (3 bytes per character assumed for utf8)
    mysql_per_column_length_limit_in_one_index = (1000 / 3.0 / _group_by_columns_index_name.size.to_f).to_i
    index_columns_str = _group_by_columns_index_name.map {|s| _truncated_columns.include?(s) ? "#{s.to_s}(#{mysql_per_column_length_limit_in_one_index})" : s.to_s }.join(", ")
    index_columns_str = "(#{index_columns_str})"
    begin
      # NOTE mysql indexes key length limit is 1000 bytes
      cron.stat_model.dataset.with_sql("CREATE INDEX #{Utils.sha1_name(_group_by_columns_index_name)} ON #{cron.stat_table_name} #{index_columns_str};").to_a
    rescue => e
      # index may already exist from a previous run; only re-raise other errors
      raise e if not e.inspect.match(/exists|duplicate/i)
    end
  end

  # add group_concat column
  remodel
  if cron.group_concat_columns.any? && !cron.stat_model.columns.include?(:other_json)
    Statlysis.sequel.add_column cron.stat_table_name, :other_json, :text
  end

  # add access to group_concat values in other_json
  remodel.class_eval do
    # lazily parse the serialized other_json column; {} on missing/bad JSON
    define_method("other_json_hash") do
      @__other_json_hash_cache ||= (JSON.parse(self.other_json) rescue {})
    end
    cron.group_concat_columns.each do |_group_concat_column|
      # e.g. `#tags_values` reads other_json_hash["tags"]
      define_method("#{_group_concat_column}_values") do
        self.other_json_hash[_group_concat_column.to_s]
      end
    end
  end

  remodel
end

Protected Instance Methods

sum_column_to_result_columns_hash() click to toggle source

e.g. {:fav_count=>[:timely_favcount_s, :totally_favcount_s]}

# File lib/statlysis/cron/timely.rb, line 145
# Map each configured sum column to its pair of result column names,
# e.g. {:fav_count => [:timely_favcount_s, :totally_favcount_s]}
def sum_column_to_result_columns_hash
  cron.sum_columns.each_with_object({}) do |_col, memo|
    _bucket = (memo[_col] ||= [])
    [:timely, :totally].each do |_prefix|
      _bucket << Utils.normalise_name(_prefix, _col, 's').to_sym
    end
  end
end
unit_range_query(time, time_begin = nil) click to toggle source
# File lib/statlysis/cron/timely.rb, line 134
# Build a datastore-specific query matching records whose time column falls
# within one `cron.time_unit` starting at `time`; when `time_begin` is given
# it replaces the lower bound (used for cumulative "totally" scopes).
def unit_range_query time, time_begin = nil
  range_start = time
  range_end   = time + 1.send(cron.time_unit) - 1.second
  if is_time_column_integer?
    range_start = range_start.to_i
    range_end   = range_end.to_i
  end
  range_start = time_begin if time_begin

  if is_activerecord?
    ["#{cron.time_column} >= ? AND #{cron.time_column} < ?", range_start, range_end]
  elsif is_mongoid?
    # .utc  [fix undefined method `__bson_dump__' for Sun, 16 Dec 2012 16:00:00 +0000:DateTime]
    {cron.time_column => {"$gte" => range_start.utc, "$lt" => range_end.utc}}
  end
end

Private Instance Methods

multiple_dimensions_output_with_time_column() click to toggle source
# File lib/statlysis/cron/timely/multiple_dimensions.rb, line 12
# Grouped statistics broken down per time unit — not implemented yet;
# raises on the first time unit when the range is non-empty.
def multiple_dimensions_output_with_time_column
  cron.time_range.map do |_time|
    raise DefaultNotImplementWrongMessage # TODO
  end
end
multiple_dimensions_output_without_time_column() click to toggle source

TODO encapsulate Mongoid MapReduce in collection output mode. TODO support large datasets, e.g. a million rows.

# File lib/statlysis/cron/timely/multiple_dimensions.rb, line 20
# Grouped (multi-dimension) counts computed via a Mongoid map/reduce, used
# when the cron has no time column. Returns an array of row hashes ready
# for insertion: {:c => count, <one key per group_by column>,
# :other_json => JSON tallying each group_concat column's values}.
def multiple_dimensions_output_without_time_column
  mr = Javascript::MultiDimensionalCount.new(cron)

  array = []
  cron.multiple_dataset.sources.each do |_source|
    # _source = _source.time_range # TODO
    array += _source.map_reduce(mr.map_func, mr.reduce_func)
                    .out(inline: 1) # TODO use replace mode
                    .to_a.map do |i|
                      v = i['value']
                      _h = {:c => v['count']}

                      # copy each grouped dimension value out of the reduced doc
                      cron.group_by_columns.each do |_group_by_column|
                        _h[_group_by_column[:column_name]] = v[_group_by_column[:column_name].to_s]
                      end

                      # tally occurrences of every group_concat value,
                      # e.g. {"tag_a" => 3, "tag_b" => 1}
                      _h[:other_json] = {}
                      cron.group_concat_columns.each do |_group_concat_column|
                        _h[:other_json][_group_concat_column] = v["#{_group_concat_column}_values"].inject({}) {|_h2, i2| _h2[i2] ||= 0; _h2[i2] += 1; _h2 }
                      end
                      _h[:other_json] = _h[:other_json].to_json

                      _h
                    end
  end
  array

  # TODO support sum_columns
end
reclock() click to toggle source
# File lib/statlysis/cron/timely.rb, line 170
# Build the Clock record used to remember when this cron last ran;
# keyed by the stat table name.
def reclock
  @clock = Clock.new("last_updated_at__#{cron.stat_table_name}")
end
remodel() click to toggle source
# File lib/statlysis/cron/timely.rb, line 156
    # (Re)define the Sequel model class for the current stat table and assign
    # it to `cron.stat_model`. Called after every schema change so the model
    # picks up newly added columns; also ensures the last-updated clock exists.
    # Returns the (re)defined model class.
    def remodel
      @clock ||= reclock

      # derive a constant name from the table name, e.g. :foo_hourlies → ::FooHourly
      n = cron.stat_table_name.to_s.singularize.camelize
      cron.stat_model = class_eval <<-MODEL, __FILE__, __LINE__+1
        class ::#{n} < Sequel::Model;
          self.set_dataset :#{cron.stat_table_name}

          cattr_accessor :cron
        end
        #{n}
      MODEL
    end