class Blazer::Adapters::SqlAdapter
Attributes
Public Class Methods
Source
# File lib/blazer/adapters/sql_adapter.rb, line 11
# Name reported by the receiver: a fixed prefix followed by its object_id.
def self.name
  format("Blazer::Connection::Adapter%s", object_id)
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 6
# Builds the adapter and an anonymous Blazer::Connection subclass for this
# data source, stored in @connection_model. The subclass establishes a
# connection only when the data source settings contain a "url".
def initialize(data_source)
  super
  @connection_model =
    Class.new(Blazer::Connection) do
      # an anonymous class has no name of its own, so derive one from object_id
      def self.name
        "Blazer::Connection::Adapter#{object_id}"
      end

      url = data_source.settings["url"]
      establish_connection(url) if url
    end
end
Calls superclass method
Blazer::Adapters::BaseAdapter::new
Public Instance Methods
Source
# File lib/blazer/adapters/sql_adapter.rb, line 162
# Returns false when the first whitespace-separated word of the statement
# (case-insensitive) is CREATE, ALTER, UPDATE, INSERT or DELETE; true otherwise.
def cachable?(statement)
  leading_keyword = statement.split.first.to_s.upcase
  %w[CREATE ALTER UPDATE INSERT DELETE].none? { |keyword| keyword == leading_keyword }
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 151
# Cancels the running query whose text contains ",run_id:<run_id>".
# On PostgreSQL it calls pg_cancel_backend for every matching backend other
# than the current one; on Redshift it looks up the first matching running
# query in stv_recents and issues CANCEL for its pid. Other adapters: no-op.
def cancel(run_id)
  like_params = ["%,run_id:#{run_id}%"]

  if postgresql?
    select_all("SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND query LIKE ?", like_params)
  elsif redshift?
    running = select_all("SELECT pid FROM stv_recents WHERE status = 'Running' AND query LIKE ?", like_params).first
    # pid is coerced to an integer before being interpolated into the statement
    select_all("CANCEL #{running["pid"].to_i}") if running
  end
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 171 def cohort_analysis_statement(statement, period:, days:) raise "Cohort analysis not supported" unless supports_cohort_analysis? cohort_column = statement.match?(/\bcohort_time\b/) ? "cohort_time" : "conversion_time" tzname = Blazer.time_zone.tzinfo.name if mysql? time_sql = "CONVERT_TZ(cohorts.cohort_time, '+00:00', ?)" case period when "day" date_sql = "CAST(DATE_FORMAT(#{time_sql}, '%Y-%m-%d') AS DATE)" date_params = [tzname] when "week" date_sql = "CAST(DATE_FORMAT(#{time_sql} - INTERVAL ((5 + DAYOFWEEK(#{time_sql})) % 7) DAY, '%Y-%m-%d') AS DATE)" date_params = [tzname, tzname] else date_sql = "CAST(DATE_FORMAT(#{time_sql}, '%Y-%m-01') AS DATE)" date_params = [tzname] end bucket_sql = "CAST(CEIL(TIMESTAMPDIFF(SECOND, cohorts.cohort_time, query.conversion_time) / ?) AS SIGNED)" else date_sql = "date_trunc(?, cohorts.cohort_time::timestamptz AT TIME ZONE ?)::date" date_params = [period, tzname] bucket_sql = "CEIL(EXTRACT(EPOCH FROM query.conversion_time - cohorts.cohort_time) / ?)::int" end # WITH not an optimization fence in Postgres 12+ statement = <<~SQL WITH query AS ( {placeholder} ), cohorts AS ( SELECT user_id, MIN(#{cohort_column}) AS cohort_time FROM query WHERE user_id IS NOT NULL AND #{cohort_column} IS NOT NULL GROUP BY 1 ) SELECT #{date_sql} AS period, 0 AS bucket, COUNT(DISTINCT cohorts.user_id) FROM cohorts GROUP BY 1 UNION ALL SELECT #{date_sql} AS period, #{bucket_sql} AS bucket, COUNT(DISTINCT query.user_id) FROM cohorts INNER JOIN query ON query.user_id = cohorts.user_id WHERE query.conversion_time IS NOT NULL AND query.conversion_time >= cohorts.cohort_time #{cohort_column == "conversion_time" ? "AND query.conversion_time != cohorts.cohort_time" : ""} GROUP BY 1, 2 SQL params = [statement] + date_params + date_params + [days.to_i * 86400] connection_model.send(:sanitize_sql_array, params) end
TODO treat date columns as already in time zone
Source
# File lib/blazer/adapters/sql_adapter.rb, line 125
# Estimates the cost of a statement from its query plan.
#
# Returns the "TotalSubtreeCost" entry of the plan row on SQL Server, or the
# upper bound of the "cost=<start>..<total>" range (as a String) parsed from
# the plan text otherwise. Returns nil when no plan is available (explain
# returns nil on failure or for unsupported adapters) or no cost is found.
def cost(statement)
  result = explain(statement)
  # explain swallows errors and returns nil; don't index into nil
  return nil if result.nil?

  if sqlserver?
    result["TotalSubtreeCost"]
  else
    # the two dots separating start and total cost are matched literally
    match = /cost=\d+\.\d+\.\.(\d+\.\d+) /.match(result)
    match[1] if match
  end
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 135
# Returns the query plan for a statement: the first cell of the EXPLAIN
# output on PostgreSQL/Redshift, or the first SHOWPLAN_ALL row on SQL Server.
# Returns nil for other adapters and whenever an error is raised.
def explain(statement)
  return select_all("EXPLAIN #{statement}").rows.first.first if postgresql? || redshift?
  return unless sqlserver?

  begin
    execute("SET SHOWPLAN_ALL ON")
    select_all(statement).each.first
  ensure
    # always switch plan mode back off, even if the query failed
    execute("SET SHOWPLAN_ALL OFF")
  end
rescue StandardError
  nil
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 232
# Describes how variables are bound for the current adapter:
# :numeric, :positional, a proc (SQL Server), or nil when binds are not used.
def parameter_binding
  return :numeric if postgresql?

  # Active Record silently ignores binds with SQLite when prepared statements are disabled
  return :numeric if sqlite? && prepared_statements?

  # Active Record silently ignores binds with MySQL when prepared statements are disabled
  return :positional if mysql? && prepared_statements?

  return unless sqlserver?

  # replaces each {name} with @<index> (plus a trailing space) and returns
  # the rewritten statement together with the variable values
  proc do |statement, variables|
    variables.each_with_index do |(name, _value), index|
      statement = statement.gsub("{#{name}}", "@#{index} ")
    end
    [statement, variables.values]
  end
end
Redshift adapter silently ignores binds
Source
# File lib/blazer/adapters/sql_adapter.rb, line 113
# Template for previewing the first 10 rows of {table}; SQL Server uses
# TOP instead of LIMIT.
def preview_statement
  sqlserver? ? "SELECT TOP (10) * FROM {table}" : "SELECT * FROM {table} LIMIT 10"
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 227
# Returns a lambda that quotes a value using the adapter's connection.
def quoting
  lambda do |value|
    connection_model.connection.quote(value)
  end
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 121
# Re-establishes the connection using the "url" entry of the settings.
def reconnect
  url = settings["url"]
  connection_model.establish_connection(url)
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 18
# Runs a statement and returns [columns, rows, error].
#
# statement   - SQL to run.
# comment     - text appended to the statement as a trailing /* ... */ comment.
# bind_params - values bound to the statement (default: none).
#
# Errors are not raised: the exception message is normalized into the
# third element of the result, and columns/rows keep whatever was collected.
def run_statement(statement, comment, bind_params = [])
  columns = []
  rows = []
  error = nil

  begin
    types = []
    in_transaction do |connection|
      set_timeout(data_source.timeout) if data_source.timeout
      binds = bind_params.map { |v| ActiveRecord::Relation::QueryAttribute.new(nil, v, ActiveRecord::Type::Value.new) }
      if sqlite?
        # SQLite goes through the raw connection's prepared statement
        type_map = connection.send(:type_map)
        connection.raw_connection.prepare("#{statement} /*#{comment}*/") do |stmt|
          stmt.bind_params(connection.send(:type_casted_binds, binds))
          columns = stmt.columns
          rows = stmt.to_a
          types = stmt.types.map { |t| type_map.lookup(t) }
        end
      else
        result = connection.select_all("#{statement} /*#{comment}*/", nil, binds)
        columns = result.columns
        rows = result.rows
        if result.column_types.any?
          types = columns.size.times.map { |i| result.column_types[i] }
        end
      end
    end

    # cast values
    if types.any?
      rows =
        rows.map do |row|
          row.map.with_index do |v, i|
            # leave nil/false values and columns without a type untouched
            v && (t = types[i]) ? t.send(:cast_value, v) : v
          end
        end
    end

    # fix for non-ASCII column names and charts
    if adapter_name == "Trilogy"
      columns = columns.map { |k| k.dup.force_encoding(Encoding::UTF_8) }
    end
  rescue => e
    # drop everything up to and including "ERROR: "
    error = e.message.sub(/.+ERROR: /, "")
    # block parameter named distinctly so it does not shadow the rescued exception
    error = Blazer::TIMEOUT_MESSAGE if Blazer::TIMEOUT_ERRORS.any? { |fragment| error.include?(fragment) }
    error = Blazer::VARIABLE_MESSAGE if error.include?("syntax error at or near \"$") || error.include?("Incorrect syntax near '@") || error.include?("your MySQL server version for the right syntax to use near '?")
    if error.include?("could not determine data type of parameter")
      error += " - try adding casting to variables and make sure none are inside a string literal"
    end
    reconnect if error.include?("PG::ConnectionBad")
  end

  [columns, rows, error]
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 102
# Lists tables with their columns as an array of
# {schema:, table:, columns: [{name:, data_type:}, ...]} hashes.
# Columns are ordered by name; tables are ordered by schema (default schema
# first) and then by table name.
def schema
  sql =
    if sqlite?
      "SELECT NULL, t.name, c.name, c.type, c.cid FROM sqlite_master t INNER JOIN pragma_table_info(t.name) c WHERE t.type IN ('table', 'view')"
    else
      add_schemas("SELECT table_schema, table_name, column_name, data_type, ordinal_position FROM information_schema.columns")
    end

  result = data_source.run_statement(sql)

  # row layout: [schema, table, column name, data type, position]
  rows_by_table = result.rows.group_by { |row| [row[0], row[1]] }

  entries = rows_by_table.map do |(schema_name, table_name), table_rows|
    columns = table_rows.sort_by { |row| row[2] }.map { |row| {name: row[2], data_type: row[3]} }
    {schema: schema_name, table: table_name, columns: columns}
  end

  # an empty string sorts first, which puts the default schema at the top
  entries.sort_by { |entry| [entry[:schema] == default_schema ? "" : entry[:schema], entry[:table]] }
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 166
# Cohort analysis is available on PostgreSQL and MySQL adapters only.
def supports_cohort_analysis?
  return true if postgresql?

  mysql?
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 73
# Lists the tables (and views) of the data source, bypassing the cache.
#
# On PostgreSQL, Redshift and Snowflake returns {table:, value:} hashes with
# the default schema first, tables outside it prefixed with their schema,
# and value holding the quoted table name. Other adapters return a sorted
# array of table names.
def tables
  sql =
    if sqlite?
      "SELECT NULL, name FROM sqlite_master WHERE type IN ('table', 'view') ORDER BY name"
    else
      add_schemas("SELECT table_schema, table_name FROM information_schema.tables")
    end

  result = data_source.run_statement(sql, refresh_cache: true)
  return result.rows.map(&:second).sort unless postgresql? || redshift? || snowflake?

  # an empty string sorts first, which puts the default schema at the top
  ordered = result.rows.sort_by { |row| [row[0] == default_schema ? "" : row[0], row[1]] }
  ordered.map do |row|
    table = row[0] == default_schema ? row[1] : "#{row[0]}.#{row[1]}"
    table = table.downcase if snowflake?
    {table: table, value: connection_model.connection.quote_table_name(table)}
  end
end
Protected Instance Methods
Source
# File lib/blazer/adapters/sql_adapter.rb, line 287
# Name of the underlying connection adapter, or nil when it cannot be
# determined.
def adapter_name
  # prevent bad data source from taking down queries/new
  connection_model.connection.adapter_name
rescue StandardError
  nil
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 308
# Appends a table_schema filter to an information_schema query and returns
# the sanitized SQL. Uses the configured "schemas" setting when present, the
# default schema on MySQL, and otherwise excludes the system schemas.
def add_schemas(query)
  configured = settings["schemas"]

  where, schemas =
    if configured
      ["table_schema IN (?)", configured]
    elsif mysql?
      ["table_schema IN (?)", [default_schema]]
    else
      # the schema name is upper-cased for Snowflake
      excluded = [snowflake? ? "INFORMATION_SCHEMA" : "information_schema"]
      excluded << "pg_catalog" if postgresql? || redshift?
      ["table_schema NOT IN (?)", excluded]
    end

  connection_model.send(:sanitize_sql_array, ["#{query} WHERE #{where}", schemas])
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 292
# Default schema for the adapter: "public" (PostgreSQL/Redshift), "dbo"
# (SQL Server), nil (SQLite), otherwise the configured database name.
#
# The result is memoized, including nil: a `||=` guard would recompute the
# value (and re-probe the adapter) on every call for SQLite.
def default_schema
  return @default_schema if defined?(@default_schema)

  @default_schema =
    if postgresql? || redshift?
      "public"
    elsif sqlserver?
      "dbo"
    elsif sqlite?
      nil
    elsif connection_model.respond_to?(:connection_db_config)
      connection_model.connection_db_config.database
    else
      # older Active Record versions expose the config as a hash
      connection_model.connection_config[:database]
    end
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 259
# Executes a statement on the adapter's connection and returns its result.
def execute(statement)
  connection = connection_model.connection
  connection.execute(statement)
end
separate from select_all
to prevent mysql error
Source
# File lib/blazer/adapters/sql_adapter.rb, line 343
# Yields a pooled connection to the block. When transactions are enabled the
# block runs inside a transaction that is always rolled back afterwards.
def in_transaction
  connection_model.connection_pool.with_connection do |connection|
    next yield(connection) unless use_transaction?

    connection_model.transaction do
      yield connection
      # roll back everything the block did
      raise ActiveRecord::Rollback
    end
  end
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 271
# True when the connection adapter is one of the MySQL-family adapters.
def mysql?
  %w[MySQL Mysql2 Mysql2Spatial Trilogy].include?(adapter_name)
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 263
# True when the connection adapter is PostgreSQL or PostGIS.
def postgresql?
  %w[PostgreSQL PostGIS].include?(adapter_name)
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 356
# Returns the connection's prepared_statements setting.
def prepared_statements?
  connection = connection_model.connection
  connection.prepared_statements
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 267
# True when the connection adapter is Redshift.
def redshift?
  %w[Redshift].include?(adapter_name)
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 253
# Runs a query on the adapter's connection and returns its result.
#
# statement - SQL, optionally containing `?` placeholders.
# params    - values for the placeholders (default: none). When any are
#             given, they are interpolated with sanitize_sql_array first.
def select_all(statement, params = [])
  # check emptiness, not truthiness: `[nil].any?` is false, which would
  # leave a `?` placeholder unbound when the only bind value is nil/false
  statement = connection_model.send(:sanitize_sql_array, [statement] + params) unless params.empty?
  connection_model.connection.select_all(statement)
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 324
# Applies a statement timeout to the current connection.
#
# timeout - timeout in seconds (coerced with to_i).
#
# PostgreSQL/Redshift statement_timeout and MySQL max_execution_time are
# given in milliseconds; MariaDB max_statement_time is given in seconds.
# Raises Blazer::TimeoutNotSupported for any other adapter.
def set_timeout(timeout)
  if postgresql? || redshift?
    # SET LOCAL limits the setting to the surrounding transaction
    execute("SET #{use_transaction? ? "LOCAL " : ""}statement_timeout = #{timeout.to_i * 1000}")
  elsif mysql?
    mariadb =
      begin
        connection_model.connection.mariadb?
      rescue StandardError
        # best effort: treat the server as MySQL when detection fails
        false
      end

    if mariadb
      # MariaDB expects seconds here, not milliseconds
      execute("SET max_statement_time = #{timeout.to_i}")
    else
      execute("SET max_execution_time = #{timeout.to_i * 1000}")
    end
  else
    raise Blazer::TimeoutNotSupported, "Timeout not supported for #{adapter_name} adapter"
  end
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 283
# True when the data source is configured with the "snowflake" adapter.
def snowflake?
  configured_adapter = data_source.adapter
  configured_adapter == "snowflake"
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 275
# True when the connection adapter is SQLite.
def sqlite?
  %w[SQLite].include?(adapter_name)
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 279
# True when the connection adapter is one of the SQL Server adapters.
def sqlserver?
  %w[SQLServer tinytds mssql].include?(adapter_name)
end
Source
# File lib/blazer/adapters/sql_adapter.rb, line 339
# Returns the "use_transaction" setting, defaulting to true when the key is
# absent.
def use_transaction?
  settings.fetch("use_transaction", true)
end