class Statlysis::Timely
Constants
- SqlColumns
Public Class Methods
new(source, opts = {})
click to toggle source
Calls superclass method
Statlysis::Cron::new
# File lib/statlysis/cron/timely.rb, line 8
#
# Build a Timely cron: verify the stat database is configured, copy each
# option listed in SqlColumns into an instance variable (defaulting to an
# empty array), and create the backing stat table/model.
def initialize source, opts = {}
  super
  Statlysis.check_set_database
  SqlColumns.each do |sym|
    instance_variable_set "@#{sym}", (opts[sym] || [])
  end
  cron.setup_stat_model
  cron
end
Public Instance Methods
multiple_dimensions_output()
click to toggle source
# File lib/statlysis/cron/timely/multiple_dimensions.rb, line 7 def multiple_dimensions_output self.send "multiple_dimensions_output_with#{cron.time_column ? '' : 'out'}_time_column" end
one_dimension_output()
click to toggle source
A one-dimension statistic must have a `time_column`; otherwise there is nothing to do.
TODO add to FAQ
-
if you want statistics on one column through the `group_by_columns`
param, and don't need a time column, then you can use the `always` DSL.
# File lib/statlysis/cron/timely/one_dimension.rb, line 13
#
# Produce one stat row per time slot in cron.time_range, aggregating
# counts, sums and group_concat values across all configured data sources.
# Rows whose numeric values are all zero are filtered out at the end.
def one_dimension_output
  cron.time_range.map do |time|
    _hash = {:t => time, :timely_c => 0, :totally_c => 0}
    # pre-seed every sum result column so we can accumulate below
    sum_column_to_result_columns_hash.each do |_sum_col, _result_cols|
      _result_cols.each do |_result_col|
        _hash[_result_col] = 0.0
      end
    end

    # support multiple data sources
    _first_source = nil
    cron.multiple_dataset.sources.each do |s|
      _t = DateTime1970
      _t = is_time_column_integer? ? _t.to_i : _t
      _scope_one = s.where(unit_range_query(time)) # TODO cache pre-result
      _scope_all = s.where(unit_range_query(time, _t))

      # 1. count
      _hash[:timely_c] += _scope_one.count
      _hash[:totally_c] += _scope_all.count

      # 2. sum
      # FIX: accumulate with += (was =, which discarded all but the last
      # source's totals whenever multiple data sources are configured;
      # identical result for a single source since columns start at 0.0)
      sum_column_to_result_columns_hash.each do |_sum_col, _result_cols|
        _hash[_result_cols[0]] += _scope_one.map(&_sum_col).reduce(:+).to_f
        _hash[_result_cols[1]] += _scope_all.map(&_sum_col).reduce(:+).to_f
      end

      # 3. group_concat
      # NOTE(review): :other_json is rebuilt per source, so with multiple
      # sources only the last source's values are kept — confirm intent.
      _other_json = {}
      _other_json[:group_concat_columns] ||= {}
      cron.group_concat_columns.each do |_group_concat_column|
        _other_json[:group_concat_columns][_group_concat_column] = _scope_one.map(&_group_concat_column).uniq
      end
      _hash[:other_json] = _other_json.to_json

      _first_source ||= _scope_one
    end
    logger.info "#{time.in_time_zone(cron.time_zone)} multiple_dataset:#{cron.multiple_dataset.name} _first_source:#{_first_source.inspect} timely_c:#{_hash[:timely_c]} totally_c:#{_hash[:totally_c]}" if ENV['DEBUG']
    _hash
  end.select {|r1| r1.except(:t, :other_json).values.reject {|r2| r2.zero? }.any? }
end
output()
click to toggle source
# File lib/statlysis/cron/timely.rb, line 129
#
# Memoized result rows: multi-dimensional when group_by columns are
# configured, otherwise the plain time-series output.
def output
  @output ||= if cron.group_by_columns.any?
                multiple_dimensions_output
              else
                one_dimension_output
              end
end
run()
click to toggle source
设置数据源,并保存结果入数据库 (set up the data sources and save the results into the database)
# File lib/statlysis/cron/timely.rb, line 17
#
# Persist cron.output into the stat table inside one transaction:
# delete the rows covered by the output's time range (or all rows for
# grouped stats), then batch-insert in chunks of 1000. Returns false
# when there is nothing to write, otherwise self.
def run
  if cron.output.blank?
    logger.info("#{cron.multiple_dataset.name} have no result!")
    return false
  end
  raise "cron.output has no Enumerable" unless cron.output.class.included_modules.include?(Enumerable)

  batch_size = 1000
  offset = 0
  Statlysis.sequel.transaction do
    # delete first in range
    cron.stat_model.where("t >= ? AND t <= ?", cron.output[0][:t], cron.output[-1][:t]).delete if cron.time_column?
    # TODO partial delete
    cron.stat_model.where("").delete if cron.group_by_columns?
    until (rows = cron.output[offset..(offset + batch_size - 1)]).blank?
      cron.stat_model.multi_insert rows # batch insert all
      offset += batch_size
    end
  end
  # record last executed time
  clock.update
  self
end
setup_stat_model()
click to toggle source
# File lib/statlysis/cron/timely.rb, line 44
#
# Create (or migrate) the stat table for this cron and bind its Sequel
# model: derive the table name, create the base structure, then add
# group_by columns, sum columns, indexes, the group_concat :other_json
# column, and reader methods for the group_concat values.
def setup_stat_model
  cron.stat_table_name = Utils.normalise_name cron.class.name.split("::")[-1],
                                              cron.multiple_dataset.name,
                                              cron.source_where_array,
                                              cron.group_by_columns.map {|i| i[:column_name] },
                                              TimeUnitToTableSuffixHash[cron.time_unit]
  raise "mysql only support table_name in 64 characters, the size of '#{cron.stat_table_name}' is #{cron.stat_table_name.to_s.size}. please set cron.stat_table_name when you create a Cron instance" if cron.stat_table_name.to_s.size > 64

  # create basic unchangeable table structure
  unless Statlysis.sequel.table_exists?(cron.stat_table_name)
    Statlysis.sequel.transaction do
      Statlysis.sequel.create_table cron.stat_table_name, DefaultTableOpts do
        # Add one column at least in this block to avoid
        # `SQLite3::SQLException: near ")": syntax error (Sequel::DatabaseError)`
        primary_key :id
      end
      Statlysis.sequel.add_column cron.stat_table_name, :t, DateTime if cron.time_column? # alias for :time
      # add count columns
      if cron.time_column?
        count_columns = [:timely_c, :totally_c] # alias for :count
        count_columns.each {|w| Statlysis.sequel.add_column cron.stat_table_name, w, Integer }
      else
        Statlysis.sequel.add_column cron.stat_table_name, :c, Integer # alias for :count
      end
    end
  end

  # add group_by columns & indexes
  remodel
  cron.stat_model.cron = cron
  if cron.group_by_columns.any?
    cron.group_by_columns.each do |_h|
      unless cron.stat_model.columns.include?(_h[:column_name])
        _h[:type] = SymbolToClassInDataType[_h[:type]] if _h[:type].is_a?(Symbol) # && (Statlysis.sequel.opts[:adapter] == :sqlite)
        Statlysis.sequel.add_column cron.stat_table_name, _h[:column_name], _h[:type]
      end
    end
  end

  # add sum columns
  remodel
  sum_column_to_result_columns_hash.each do |_sum_col, _result_cols|
    _result_cols.each do |_result_col|
      unless cron.stat_model.columns.include?(_result_col)
        # convert to Integer type in view if needed
        Statlysis.sequel.add_column cron.stat_table_name, _result_col, Float
      end
    end
  end

  # Fix there should be uniq index name between tables
  # `SQLite3::SQLException: index t_timely_c_totally_c already exists (Sequel::DatabaseError)`
  _group_by_columns_index_name = cron.group_by_columns.reject {|i| i[:no_index] }.map {|i| i[:column_name] }
  _truncated_columns = _group_by_columns_index_name.dup # only String column
  _group_by_columns_index_name = _group_by_columns_index_name.unshift :t if cron.time_column?
  # TODO use https://github.com/german/redis_orm to support full string indexes
  if !Statlysis.configuration.is_skip_database_index && _group_by_columns_index_name.any?
    mysql_per_column_length_limit_in_one_index = (1000 / 3.0 / _group_by_columns_index_name.size.to_f).to_i
    index_columns_str = _group_by_columns_index_name.map {|s| _truncated_columns.include?(s) ? "#{s.to_s}(#{mysql_per_column_length_limit_in_one_index})" : s.to_s }.join(", ")
    index_columns_str = "(#{index_columns_str})"
    begin
      # NOTE mysql indexes key length limit is 1000 bytes
      cron.stat_model.dataset.with_sql("CREATE INDEX #{Utils.sha1_name(_group_by_columns_index_name)} ON #{cron.stat_table_name} #{index_columns_str};").to_a
    rescue => e
      raise e if not e.inspect.match(/exists|duplicate/i)
    end
  end

  # add group_concat column
  remodel
  if cron.group_concat_columns.any? && !cron.stat_model.columns.include?(:other_json)
    Statlysis.sequel.add_column cron.stat_table_name, :other_json, :text
  end

  # add access to group_concat values in other_json
  remodel.class_eval do
    define_method("other_json_hash") do
      @__other_json_hash_cache ||= (JSON.parse(self.other_json) rescue {})
    end
    cron.group_concat_columns.each do |_group_concat_column|
      define_method("#{_group_concat_column}_values") do
        self.other_json_hash[_group_concat_column.to_s]
      end
    end
  end

  remodel
end
Protected Instance Methods
sum_column_to_result_columns_hash()
click to toggle source
e.g. {:fav_count=>[:timely_favcount_s, :totally_favcount_s]}
# File lib/statlysis/cron/timely.rb, line 145
#
# Map each configured sum column to its pair of result column names,
# e.g. {:fav_count=>[:timely_favcount_s, :totally_favcount_s]}
def sum_column_to_result_columns_hash
  cron.sum_columns.inject({}) do |h, _col|
    h[_col] = [:timely, :totally].map do |_pre|
      Utils.normalise_name(_pre, _col, 's').to_sym
    end
    h
  end
end
unit_range_query(time, time_begin = nil)
click to toggle source
# File lib/statlysis/cron/timely.rb, line 134
#
# Build the time-window predicate for one stat slot starting at +time+,
# spanning one cron.time_unit. When +time_begin+ is given it replaces the
# lower bound (used for the "totally" scope). Returns an ActiveRecord
# conditions array or a Mongoid criteria hash depending on the adapter.
def unit_range_query time, time_begin = nil
  # time begin and end
  window_start = time
  window_end = time + 1.send(cron.time_unit) - 1.second
  if is_time_column_integer?
    window_start, window_end = window_start.to_i, window_end.to_i
  end
  window_start = time_begin || window_start
  return ["#{cron.time_column} >= ? AND #{cron.time_column} < ?", window_start, window_end] if is_activerecord?
  # .utc [fix undefined method `__bson_dump__' for Sun, 16 Dec 2012 16:00:00 +0000:DateTime]
  return {cron.time_column => {"$gte" => window_start.utc, "$lt" => window_end.utc}} if is_mongoid?
end
Private Instance Methods
multiple_dimensions_output_with_time_column()
click to toggle source
# File lib/statlysis/cron/timely/multiple_dimensions.rb, line 12
#
# Grouped statistics per time slot — not implemented yet; raises on use.
def multiple_dimensions_output_with_time_column
  cron.time_range.map do |time|
    raise DefaultNotImplementWrongMessage # TODO
  end
end
multiple_dimensions_output_without_time_column()
click to toggle source
TODO encapsulate Mongoid MapReduce
in collection output mode. TODO: support large datasets, e.g. a million records.
# File lib/statlysis/cron/timely/multiple_dimensions.rb, line 20
#
# Run the MultiDimensionalCount map-reduce over every data source and
# flatten the results into stat rows: a :c count, one key per group_by
# column, and an :other_json payload tallying group_concat values.
def multiple_dimensions_output_without_time_column
  mr = Javascript::MultiDimensionalCount.new(cron)
  rows = []
  cron.multiple_dataset.sources.each do |_source|
    # _source = _source.time_range # TODO
    reduced = _source.map_reduce(mr.map_func, mr.reduce_func)
                     .out(inline: 1) # TODO use replace mode
                     .to_a
    rows += reduced.map do |doc|
      v = doc['value']
      row = {:c => v['count']}
      cron.group_by_columns.each do |_group_by_column|
        row[_group_by_column[:column_name]] = v[_group_by_column[:column_name].to_s]
      end
      row[:other_json] = {}
      cron.group_concat_columns.each do |_group_concat_column|
        # tally occurrences of each value, e.g. ["a","a","b"] => {"a"=>2,"b"=>1}
        row[:other_json][_group_concat_column] = v["#{_group_concat_column}_values"].inject({}) {|_h2, i2| _h2[i2] ||= 0; _h2[i2] += 1; _h2 }
      end
      row[:other_json] = row[:other_json].to_json
      row
    end
  end
  rows
  # TODO support sum_columns
end
reclock()
click to toggle source
# File lib/statlysis/cron/timely.rb, line 170
#
# (Re)create the Clock that records when this cron last ran,
# keyed by the stat table name.
def reclock
  @clock = Clock.new "last_updated_at__#{cron.stat_table_name}"
end
remodel()
click to toggle source
# File lib/statlysis/cron/timely.rb, line 156
#
# Define (or redefine) the Sequel model class bound to the current
# cron.stat_table_name, assign it to cron.stat_model, and return it.
# Also ensures the last-updated Clock exists.
def remodel
  @clock ||= reclock
  model_name = cron.stat_table_name.to_s.singularize.camelize
  cron.stat_model = class_eval <<-MODEL, __FILE__, __LINE__+1
    class ::#{model_name} < Sequel::Model; self.set_dataset :#{cron.stat_table_name}
      cattr_accessor :cron
    end
    #{model_name}
  MODEL
end