class Hive::Stream
Hive::Stream allows a live view of the HIVE blockchain.
Example streaming blocks:
    stream = Hive::Stream.new

    stream.blocks do |block, block_num|
      puts "#{block_num} :: #{block.witness}"
    end
Example streaming transactions:
    stream = Hive::Stream.new

    stream.transactions do |trx, trx_id, block_num|
      puts "#{block_num} :: #{trx_id} :: operations: #{trx.operations.size}"
    end
Example streaming operations:
    stream = Hive::Stream.new

    stream.operations do |op, trx_id, block_num|
      puts "#{block_num} :: #{trx_id} :: #{op.type}: #{op.value.to_json}"
    end
Allows streaming of block headers, full blocks, transactions, operations and virtual operations.
Constants
- BLOCK_INTERVAL
- MAX_BACKOFF_BLOCK_INTERVAL
- MAX_RETRY_COUNT
- MAX_VOP_READ_AHEAD
- SHUFFLE_ROUND_LENGTH
- VOP_TRX_ID
Attributes
Public Class Methods
@param options [Hash] additional options
@option options [Hive::DatabaseApi] :database_api
@option options [Hive::BlockApi] :block_api
@option options [Hive::AccountHistoryApi, Hive::CondenserApi] :account_history_api
@option options [Symbol] :mode selects which block the stream follows. Default: :irreversible.
  * :head - the last block
  * :irreversible - the block that is confirmed by 2/3 of all block producers and is thus irreversible
@option options [Boolean] :no_warn do not generate warnings
# File lib/hive/stream.rb, line 49
# Sets up a new stream from the given options.
#
# The options hash is remembered as-is and is also handed to any API client
# that has to be created here because the caller did not supply one.
#
# @param options [Hash] stream options (:database_api, :block_api,
#   :account_history_api, :mode, :no_warn)
def initialize(options = {mode: :irreversible})
  @instance_options = options

  # Use the caller's API clients when given; otherwise build our own.
  @database_api = options[:database_api] || Hive::DatabaseApi.new(options)
  @block_api = options[:block_api] || Hive::BlockApi.new(options)

  # Stored exactly as passed in (may be nil).
  @account_history_api = options[:account_history_api]

  @mode = options[:mode] || :irreversible

  # Coerce any truthy/falsy value into a strict boolean.
  @no_warn = options[:no_warn] ? true : false
end
Public Instance Methods
Use this method to stream block headers. This is quite a bit faster than requesting full blocks.
@param options [Hash] additional options @option options [Integer] :at_block_num Starts the stream at the given block number. Default: nil. @option options [Integer] :until_block_num Ends the stream at the given block number. Default: nil.
# File lib/hive/stream.rb, line 76
# Streams block headers by delegating to block_objects with
# object: :block_headers merged into the caller's options.
#
# @param options [Hash] additional options (:at_block_num, :until_block_num)
# @param block invoked by block_objects for each streamed item
def block_headers(options = {}, &block)
  header_options = options.merge(object: :block_headers)

  block_objects(header_options, block)
end
Use this method to stream block numbers. This is significantly faster than requesting full blocks and even block headers. Basically, the only thing this method does is call {Hive::DatabaseApi#get_dynamic_global_properties} at 3 second intervals.
@param options [Hash] additional options @option options [Integer] :at_block_num Starts the stream at the given block number. Default: nil. @option options [Integer] :until_block_num Ends the stream at the given block number. Default: nil.
# File lib/hive/stream.rb, line 66
# Streams block numbers by delegating to block_objects with
# object: :block_numbers merged into the caller's options.
#
# @param options [Hash] additional options (:at_block_num, :until_block_num)
# @param block invoked by block_objects with each block number
def block_numbers(options = {}, &block)
  number_options = options.merge(object: :block_numbers)

  block_objects(number_options, block)
end
Use this method to stream full blocks.
@param options [Hash] additional options @option options [Integer] :at_block_num Starts the stream at the given block number. Default: nil. @option options [Integer] :until_block_num Ends the stream at the given block number. Default: nil.
# File lib/hive/stream.rb, line 85
# Streams full blocks by delegating to block_objects with
# object: :blocks merged into the caller's options.
#
# @param options [Hash] additional options (:at_block_num, :until_block_num)
# @param block invoked by block_objects for each streamed item
def blocks(options = {}, &block)
  full_block_options = options.merge(object: :blocks)

  block_objects(full_block_options, block)
end
Returns the latest operations from the blockchain.
    stream = Hive::Stream.new

    stream.operations do |op|
      puts op.to_json
    end
If a symbol (or an array of symbols) is passed to the `types` option, then only operations of those types are returned. Expected symbols are:
account_create_operation account_create_with_delegation_operation account_update_operation account_witness_proxy_operation account_witness_vote_operation cancel_transfer_from_savings_operation change_recovery_account_operation claim_reward_balance_operation comment_operation comment_options_operation convert_operation custom_operation custom_json_operation decline_voting_rights_operation delegate_vesting_shares_operation delete_comment_operation escrow_approve_operation escrow_dispute_operation escrow_release_operation escrow_transfer_operation feed_publish_operation limit_order_cancel_operation limit_order_create_operation limit_order_create2_operation pow_operation pow2_operation recover_account_operation request_account_recovery_operation set_withdraw_vesting_route_operation transfer_operation transfer_from_savings_operation transfer_to_savings_operation transfer_to_vesting_operation vote_operation withdraw_vesting_operation witness_update_operation
For example, to stream only votes:
    stream = Hive::Stream.new

    stream.operations(types: :vote_operation) do |vote|
      puts vote.to_json
    end
… Or …
    stream = Hive::Stream.new

    stream.operations(:vote_operation) do |vote|
      puts vote.to_json
    end
You can also stream virtual operations:
    stream = Hive::Stream.new

    stream.operations(types: :author_reward_operation, only_virtual: true) do |vop|
      v = vop.value

      puts "#{v.author} got paid for #{v.permlink}: #{[v.hbd_payout, v.hive_payout, v.vesting_payout]}"
    end
… or multiple virtual operation types:
    stream = Hive::Stream.new

    stream.operations(types: [:producer_reward_operation, :author_reward_operation], only_virtual: true) do |vop|
      puts vop.to_json
    end
… or all types, including virtual operation types from the head block number:
    stream = Hive::Stream.new(mode: :head)

    stream.operations(include_virtual: true) do |op|
      puts op.to_json
    end
Expected virtual operation types:
producer_reward_operation author_reward_operation curation_reward_operation fill_convert_request_operation fill_order_operation fill_vesting_withdraw_operation interest_operation shutdown_witness_operation
@param args [Symbol, Array<Symbol>, Hash] the type(s) of operation or hash of expanded options, optional.
@option args [Integer] :at_block_num Starts the stream at the given block number. Default: nil.
@option args [Integer] :until_block_num Ends the stream at the given block number. Default: nil.
@option args [Symbol, Array<Symbol>] :types the type(s) of operation, optional.
@option args [Boolean] :only_virtual Only stream virtual operations. Setting this true will improve performance because the stream only needs block numbers to then retrieve virtual operations. Default: false.
@option args [Boolean] :include_virtual Also stream virtual operations. Setting this true will impact performance. Default: false.
@param block the block to execute for each result. Yields: |op, trx_id, block_num|
# File lib/hive/stream.rb, line 213
# Streams operations, optionally filtered by type and optionally including
# (or restricted to) virtual operations.
#
# @param args [Symbol, Array<Symbol>, Hash] operation type(s), or a hash of
#   options (:types, :only_virtual, :include_virtual plus whatever the
#   underlying block stream accepts)
# @param block receives |op, trx_id, block_num|
def operations(*args, &block)
  options = {}
  types = []
  only_virtual = false
  include_virtual = false
  previous_block_num = nil
  within_shuffle_round = nil

  # Head block number at the time the stream is set up; used below to decide
  # whether a streamed block is close to the head.
  starting_head_block_number = database_api.get_dynamic_global_properties do |dgpo|
    dgpo.head_block_number
  end

  case args.first
  when Hash
    options = args.first
    types = transform_types(options[:types])
    only_virtual = !!options[:only_virtual]
    # Streaming only virtual operations implies including them.
    include_virtual = !!options[:include_virtual] || only_virtual
  when Symbol, Array
    types = transform_types(args)
  end

  # True when block_num is less than two shuffle rounds behind the head
  # block number captured above.
  near_starting_head = lambda do |block_num|
    starting_head_block_number - block_num < SHUFFLE_ROUND_LENGTH * 2
  end

  if only_virtual
    # Only block numbers are needed to look up virtual operations.
    block_numbers(options) do |block_num|
      within_shuffle_round ||= near_starting_head.call(block_num)

      get_virtual_ops(types, block_num, within_shuffle_round, block)
    end
  else
    transactions(options) do |transaction, trx_id, block_num|
      transaction.operations.each do |op|
        wanted = types.none? || types.include?(op.type)
        yield op, trx_id, block_num if wanted

        # Everything below runs once per block: on the first operation seen
        # for a block number that differs from the previous one.
        next if previous_block_num == block_num

        previous_block_num = block_num
        within_shuffle_round ||= near_starting_head.call(block_num)

        get_virtual_ops(types, block_num, within_shuffle_round, block) if include_virtual
      end
    end
  end
end
Use this method to stream each transaction.
@param options [Hash] additional options @option options [Integer] :at_block_num Starts the stream at the given block number. Default: nil. @option options [Integer] :until_block_num Ends the stream at the given block number. Default: nil.
# File lib/hive/stream.rb, line 94
# Streams every transaction of every streamed block.
#
# @param options [Hash] additional options (:at_block_num, :until_block_num)
# @param block receives |transaction, trx_id, block_num|
def transactions(options = {}, &block)
  blocks(options) do |current_block, block_num|
    # The batch can hand us nil for a block; fetch that block individually.
    if current_block.nil?
      warn "Batch missing block_num: #{block_num}, retrying ..." unless @no_warn

      current_block = block_api.get_block(block_num: block_num) { |result| result.block }
    end

    # transaction_ids is indexed in parallel with transactions.
    current_block.transactions.each_with_index do |transaction, position|
      yield transaction, current_block.transaction_ids[position], block_num
    end
  end
end
Private Instance Methods
@private
# File lib/hive/stream.rb, line 265
# Shared polling loop behind block_numbers, block_headers and blocks.
#
# On every pass it asks the database API for the dynamic global properties,
# works out the newest block number to stream (see find_block_number, capped
# by :until_block_num), hands every not-yet-seen block to +block+, then
# sleeps before polling again.
#
# @param options [Hash] :object (:block_numbers, :block_headers or :blocks),
#   and either :block_range or :at_block_num / :until_block_num
# @param block [#call] called with a block number for :block_numbers,
#   otherwise with two arguments per streamed item
def block_objects(options = {}, block)
  object = options[:object]
  object_method = :"get_#{object}"
  block_interval = BLOCK_INTERVAL
  use_block_range = true

  block_range = options[:block_range]
  at_block_num, until_block_num =
    if block_range
      [block_range.first, block_range.last]
    else
      [options[:at_block_num], options[:until_block_num]]
    end

  loop do
    # Done once the next block to stream lies past the requested end.
    break if until_block_num && at_block_num && until_block_num < at_block_num

    database_api.get_dynamic_global_properties do |properties|
      current_block_num = [find_block_number(properties), until_block_num].compact.min
      at_block_num ||= current_block_num

      if current_block_num < at_block_num
        # The stream has stalled, so let's back off and let the node sync
        # up.  We'll catch up with a bigger batch in the next cycle.
        block_interval = [block_interval * 2, MAX_BACKOFF_BLOCK_INTERVAL].min
      else
        range = at_block_num..current_block_num

        if object == :block_numbers
          range.each do |n|
            block.call n
            block_interval = BLOCK_INTERVAL
          end
        else
          # Runs once, or twice when the node turns out not to support
          # range requests and we redo the pass block-by-block.
          loop do
            begin
              if use_block_range
                block_api.send(object_method, block_range: range) do |b, n|
                  block.call b, n
                  block_interval = BLOCK_INTERVAL
                end
              else
                range.each do |block_num|
                  block_api.get_block(block_num: block_num) do |b, n|
                    block.call b.block, b.block.block_id[0..7].to_i(16)
                    block_interval = BLOCK_INTERVAL
                  end
                end
              end
            rescue Hive::UnknownError => e
              raise e unless e.message =~ /Could not find method get_block_range/

              use_block_range = false
              redo
            end

            break
          end
        end

        # Resume right after the last block of this batch.
        at_block_num = current_block_num + 1
      end
    end

    sleep block_interval
  end
end
@private
# File lib/hive/stream.rb, line 336
# Picks the block number to stream up to, according to the stream's mode.
#
# @param properties an object responding to head_block_number and
#   last_irreversible_block_num
# @return the head block number in :head mode, the last irreversible block
#   number in :irreversible mode
# @raise [Hive::ArgumentError] for any other mode
def find_block_number(properties)
  case mode
  when :head
    properties.head_block_number
  when :irreversible
    properties.last_irreversible_block_num
  else
    raise Hive::ArgumentError, "Unknown mode: #{mode}"
  end
end
@private
# File lib/hive/stream.rb, line 361
# Looks up the virtual operations of one block and passes each matching one
# to +block+ as (op, trx_id, block_num).
#
# enum_virtual_ops is tried first, reading a few blocks ahead into
# @virtual_ops_cache. When that yields nothing for the block, or the node
# reports that the call is unsupported, get_ops_in_block is used instead,
# with bounded retries.
#
# @param types [Array<String>] operation type names to keep; empty keeps all
# @param block_num [Integer] the block to look up
# @param within_shuffle_round [Boolean] when truthy, read ahead by only one
#   block instead of MAX_VOP_READ_AHEAD
# @param block [#call] receives |op, trx_id, block_num|
# @raise [TooManyRetriesError] when get_ops_in_block keeps returning no
#   result after MAX_RETRY_COUNT retries
def get_virtual_ops(types, block_num, within_shuffle_round, block)
  retries = 0
  vop_read_ahead = within_shuffle_round ? 1 : MAX_VOP_READ_AHEAD

  @virtual_ops_cache ||= {}

  # Drop cached entries for blocks we have already moved past.  The reject
  # block must evaluate to the orphan test itself, otherwise nothing is ever
  # removed and the same entries are reported on every call.
  @virtual_ops_cache = @virtual_ops_cache.reject do |k, v|
    orphaned = k < block_num
    warn "Found orphaned virtual operations for block_num #{k}: #{v.to_json}" if orphaned && !@no_warn
    orphaned
  end

  loop do
    vops_found = false

    if account_history_api.class == Hive::AccountHistoryApi || @enum_virtual_ops_supported.nil?
      begin
        # Use account_history_api.enum_virtual_ops, if supported.
        unless @virtual_ops_cache.key?(block_num)
          # Pre-seed every block of the read-ahead window so that a block
          # without virtual operations is still a cache hit later.
          (block_num..(block_num + vop_read_ahead)).each do |n|
            @virtual_ops_cache[n] = []
          end

          enum_virtual_ops_options = {
            block_range_begin: block_num,
            block_range_end: block_num + vop_read_ahead,
            # TODO Use: mode != :irreversible
            include_reversible: true
          }

          account_history_api.enum_virtual_ops(enum_virtual_ops_options) do |result|
            @enum_virtual_ops_supported = true

            result.ops.each do |vop|
              @virtual_ops_cache[vop.block] << vop
            end
          end
        end

        vops_found = true

        if !!@virtual_ops_cache[block_num]
          @virtual_ops_cache[block_num].each do |vop|
            next unless block_num == vop.block
            next if types.any? && !types.include?(vop.op.type)

            if vop.virtual_op == 0
              # require 'pry' ; binding.pry if vop.op.type == 'producer_reward_operation'
              warn "Found non-virtual operation (#{vop.op.type}) in enum_virtual_ops result for block: #{block_num}" unless @no_warn

              next
            end

            block.call vop.op, vop.trx_id, block_num
          end

          @virtual_ops_cache.delete(block_num)
        end
      rescue Hive::UnknownError => e
        if e.message =~ /This API is not supported for account history backed by Chainbase/
          warn "Retrying with get_ops_in_block (api does not support enum_virtual_ops)" unless @no_warn

          @enum_virtual_ops_supported = false
          vops_found = false
        else
          raise e
        end
      end
    end

    break if vops_found

    # Fallback to previous method.
    warn "Retrying with get_ops_in_block (did not find ops for block #{block_num} using enum_virtual_ops)" unless @no_warn

    response = case account_history_api
    when Hive::CondenserApi
      account_history_api.get_ops_in_block(block_num, true)
    when Hive::AccountHistoryApi
      account_history_api.get_ops_in_block(
        block_num: block_num,
        only_virtual: true,
        # TODO Use: mode != :irreversible
        include_reversible: true
      )
    end

    if response.nil? || (result = response.result).nil?
      if retries < MAX_RETRY_COUNT
        warn "Retrying get_ops_in_block on block #{block_num}" unless @no_warn
        retries = retries + 1
        sleep 9
        redo
      else
        raise TooManyRetriesError, "unable to get valid result while finding virtual operations for block: #{block_num}"
      end
    end

    # Keep every operation paired with the trx_id of the entry it came from,
    # so the id is still available when the operation is handed to +block+.
    # NOTE(review): assumes get_ops_in_block entries expose trx_id the same
    # way enum_virtual_ops entries do - confirm against the node's response.
    ops = case account_history_api
    when Hive::CondenserApi
      result.map do |trx|
        op = {type: trx.op[0] + '_operation', value: trx.op[1]}
        [Hashie::Mash.new(op), trx.trx_id]
      end
    when Hive::AccountHistoryApi
      result.ops.map { |trx| [trx.op, trx.trx_id] }
    end

    if ops.empty?
      if retries < MAX_RETRY_COUNT
        sleep 3
        retries = retries + 1
        redo
      else
        warn "unable to find virtual operations for block: #{block_num}" unless @no_warn
        # raise TooManyRetriesError, "unable to find virtual operations for block: #{block_num}"
      end
    end

    ops.each do |op, trx_id|
      next if types.any? && !types.include?(op.type)

      block.call op, trx_id, block_num
    end

    break
  end
end
@private
# File lib/hive/stream.rb, line 347
# Normalises operation type arguments into an array of type-name strings
# that all end in "_operation".
#
# Accepts nil, a single type, or (nested) arrays of types.  nil entries are
# discarded wherever they appear; flattening happens before compacting so a
# nil inside an array cannot turn into the bogus type "_operation".
#
# @param types [Symbol, String, Array, nil] the type(s) to normalise
# @return [Array<String>] normalised type names
def transform_types(types)
  [types].flatten.compact.map do |type|
    name = type.to_s

    if name.end_with?('_operation')
      name
    else
      # Short names are still accepted, with a deprecation warning.
      warn "Op type #{name} is deprecated. Use #{name}_operation instead." unless @no_warn

      "#{name}_operation"
    end
  end
end