class Hive::Stream

Hive::Stream allows a live view of the HIVE blockchain.

Example streaming blocks:

stream = Hive::Stream.new

stream.blocks do |block, block_num|
  puts "#{block_num} :: #{block.witness}"
end

Example streaming transactions:

stream = Hive::Stream.new

stream.transactions do |trx, trx_id, block_num|
  puts "#{block_num} :: #{trx_id} :: operations: #{trx.operations.size}"
end

Example streaming operations:

stream = Hive::Stream.new

stream.operations do |op, trx_id, block_num|
  puts "#{block_num} :: #{trx_id} :: #{op.type}: #{op.value.to_json}"
end

Allows streaming of block headers, full blocks, transactions, operations and virtual operations.

Constants

BLOCK_INTERVAL
MAX_BACKOFF_BLOCK_INTERVAL
MAX_RETRY_COUNT
MAX_VOP_READ_AHEAD
SHUFFLE_ROUND_LENGTH
VOP_TRX_ID

Attributes

account_history_api[R]
block_api[R]
database_api[R]
mode[R]

Public Class Methods

new(options = {mode: :irreversible}) click to toggle source

@param options [Hash] additional options @option options [Hive::DatabaseApi] :database_api @option options [Hive::BlockApi] :block_api @option options [Hive::AccountHistoryApi || Hive::CondenserApi] :account_history_api @option options [Symbol] :mode we have the choice between

* :head the last block
* :irreversible the block that is confirmed by 2/3 of all block producers and is thus irreversible!

@option options [Boolean] :no_warn do not generate warnings

# File lib/hive/stream.rb, line 49
def initialize(options = {mode: :irreversible})
  @instance_options = options
  @database_api = options[:database_api] || Hive::DatabaseApi.new(options)
  @block_api = options[:block_api] || Hive::BlockApi.new(options)
  @account_history_api = options[:account_history_api]
  @mode = options[:mode] || :irreversible
  @no_warn = !!options[:no_warn]
end

Public Instance Methods

block_headers(options = {}, &block) click to toggle source

Use this method to stream block headers. This is quite a bit faster than requesting full blocks.

@param options [Hash] additional options @option options [Integer] :at_block_num Starts the stream at the given block number. Default: nil. @option options [Integer] :until_block_num Ends the stream at the given block number. Default: nil.

# File lib/hive/stream.rb, line 76
def block_headers(options = {}, &block)
  block_objects(options.merge(object: :block_headers), block)
end
block_numbers(options = {}, &block) click to toggle source

Use this method to stream block numbers. This is significantly faster than requesting full blocks and even block headers. Basically, the only thing this method does is call {Hive::Database#get_dynamic_global_properties} at 3 second intervals.

@param options [Hash] additional options @option options [Integer] :at_block_num Starts the stream at the given block number. Default: nil. @option options [Integer] :until_block_num Ends the stream at the given block number. Default: nil.

# File lib/hive/stream.rb, line 66
def block_numbers(options = {}, &block)
  block_objects(options.merge(object: :block_numbers), block)
end
blocks(options = {}, &block) click to toggle source

Use this method to stream full blocks.

@param options [Hash] additional options @option options [Integer] :at_block_num Starts the stream at the given block number. Default: nil. @option options [Integer] :until_block_num Ends the stream at the given block number. Default: nil.

# File lib/hive/stream.rb, line 85
def blocks(options = {}, &block)
  block_objects(options.merge(object: :blocks), block)
end
operations(*args) { |op, trx_id, block_num| ... } click to toggle source

Returns the latest operations from the blockchain.

stream = Hive::Stream.new
stream.operations do |op|
  puts op.to_json
end

If symbol are passed to `types` option, then only that operation is returned. Expected symbols are:

account_create_operation
account_create_with_delegation_operation
account_update_operation
account_witness_proxy_operation
account_witness_vote_operation
cancel_transfer_from_savings_operation
change_recovery_account_operation
claim_reward_balance_operation
comment_operation
comment_options_operation
convert_operation
custom_operation
custom_json_operation
decline_voting_rights_operation
delegate_vesting_shares_operation
delete_comment_operation
escrow_approve_operation
escrow_dispute_operation
escrow_release_operation
escrow_transfer_operation
feed_publish_operation
limit_order_cancel_operation
limit_order_create_operation
limit_order_create2_operation
pow_operation
pow2_operation
recover_account_operation
request_account_recovery_operation
set_withdraw_vesting_route_operation
transfer_operation
transfer_from_savings_operation
transfer_to_savings_operation
transfer_to_vesting_operation
vote_operation
withdraw_vesting_operation
witness_update_operation

For example, to stream only votes:

stream = Hive::Stream.new
stream.operations(types: :vote_operation) do |vote|
  puts vote.to_json
end

… Or …

stream = Hive::Stream.new
stream.operations(:vote_operation) do |vote|
  puts vote.to_json
end

You can also stream virtual operations:

stream = Hive::Stream.new
stream.operations(types: :author_reward_operation, only_virtual: true) do |vop|
  v = vop.value
  puts "#{v.author} got paid for #{v.permlink}: #{[v.hbd_payout, v.hive_payout, v.vesting_payout]}"
end

… or multiple virtual operation types;

stream = Hive::Stream.new
stream.operations(types: [:producer_reward_operation, :author_reward_operation], only_virtual: true) do |vop|
  puts vop.to_json
end

… or all types, including virtual operation types from the head block number:

stream = Hive::Stream.new(mode: :head)
stream.operations(include_virtual: true) do |op|
  puts op.to_json
end

Expected virtual operation types:

producer_reward_operation
author_reward_operation
curation_reward_operation
fill_convert_request_operation
fill_order_operation
fill_vesting_withdraw_operation
interest_operation
shutdown_witness_operation

@param args [Symbol || Array<Symbol> || Hash] the type(s) of operation or hash of expanded options, optional. @option args [Integer] :at_block_num Starts the stream at the given block number. Default: nil. @option args [Integer] :until_block_num Ends the stream at the given block number. Default: nil. @option args [Symbol || Array<Symbol>] :types the type(s) of operation, optional. @option args [Boolean] :only_virtual Only stream virtual options. Setting this true will improve performance because the stream only needs block numbers to then retrieve virtual operations. Default: false. @option args [Boolean] :include_virtual Also stream virtual options. Setting this true will impact performance. Default: false. @param block the block to execute for each result. Yields: |op, trx_id, block_num|

# File lib/hive/stream.rb, line 213
def operations(*args, &block)
  options = {}
  types = []
  only_virtual = false
  include_virtual = false
  last_block_num = nil
  within_shuffle_round = nil
  initial_head_block_number = database_api.get_dynamic_global_properties do |dgpo|
    dgpo.head_block_number
  end
  
  case args.first
  when Hash
    options = args.first
    types = transform_types(options[:types])
    only_virtual = !!options[:only_virtual] || false
    include_virtual = !!options[:include_virtual] || only_virtual || false
  when Symbol, Array then types = transform_types(args)
  end
  
  if only_virtual
    block_numbers(options) do |block_num|
      within_shuffle_round ||= initial_head_block_number - block_num < SHUFFLE_ROUND_LENGTH * 2

      get_virtual_ops(types, block_num, within_shuffle_round, block)
    end
  else
    transactions(options) do |transaction, trx_id, block_num|
      transaction.operations.each do |op|
        yield op, trx_id, block_num if types.none? || types.include?(op.type)
        
        next unless last_block_num != block_num
        
        last_block_num = block_num
        within_shuffle_round ||= initial_head_block_number - block_num < SHUFFLE_ROUND_LENGTH * 2
        
        get_virtual_ops(types, block_num, within_shuffle_round, block) if include_virtual
      end
    end
  end
end
transactions(options = {}) { |transaction, trx_id, block_num| ... } click to toggle source

Use this method to stream each transaction.

@param options [Hash] additional options @option options [Integer] :at_block_num Starts the stream at the given block number. Default: nil. @option options [Integer] :until_block_num Ends the stream at the given block number. Default: nil.

# File lib/hive/stream.rb, line 94
def transactions(options = {}, &block)
  blocks(options) do |block, block_num|
    if block.nil?
      warn "Batch missing block_num: #{block_num}, retrying ..." unless @no_warn
      
      block = block_api.get_block(block_num: block_num) do |result|
        result.block
      end
    end
    
    block.transactions.each_with_index do |transaction, index|
      trx_id = block.transaction_ids[index]
      
      yield transaction, trx_id, block_num
    end
  end
end

Private Instance Methods

block_objects(options = {}, block) click to toggle source

@private

# File lib/hive/stream.rb, line 265
def block_objects(options = {}, block)
  object = options[:object]
  object_method = "get_#{object}".to_sym
  block_interval = BLOCK_INTERVAL
  use_block_range = true
  
  at_block_num, until_block_num = if !!block_range = options[:block_range]
    [block_range.first, block_range.last]
  else
    [options[:at_block_num], options[:until_block_num]]
  end
  
  loop do
    break if !!until_block_num && !!at_block_num && until_block_num < at_block_num
    
    database_api.get_dynamic_global_properties do |properties|
      current_block_num = find_block_number(properties)
      current_block_num = [current_block_num, until_block_num].compact.min
      at_block_num ||= current_block_num
      
      if current_block_num >= at_block_num
        range = at_block_num..current_block_num
        
        if object == :block_numbers
          range.each do |n|
            block.call n
            block_interval = BLOCK_INTERVAL
          end
        else
          loop do
            begin
              if use_block_range
                block_api.send(object_method, block_range: range) do |b, n|
                  block.call b, n
                  block_interval = BLOCK_INTERVAL
                end
              else
                range.each do |block_num|
                  block_api.get_block(block_num: block_num) do |b, n|
                    block.call b.block, b.block.block_id[0..7].to_i(16)
                    block_interval = BLOCK_INTERVAL
                  end
                end
              end
            rescue Hive::UnknownError => e
              if e.message =~ /Could not find method get_block_range/
                use_block_range = false
                
                redo
              end
              
              raise e
            end
            
            break
          end
        end
        
        at_block_num = range.max + 1
      else
        # The stream has stalled, so let's back off and let the node sync
        # up.  We'll catch up with a bigger batch in the next cycle.
        block_interval = [block_interval * 2, MAX_BACKOFF_BLOCK_INTERVAL].min
      end
    end
    
    sleep block_interval
  end
end
find_block_number(properties) click to toggle source

@private

# File lib/hive/stream.rb, line 336
def find_block_number(properties)
  block_num = case mode
  when :head then properties.head_block_number
  when :irreversible then properties.last_irreversible_block_num
  else; raise Hive::ArgumentError, "Unknown mode: #{mode}"
  end
  
  block_num
end
get_virtual_ops(types, block_num, within_shuffle_round, block) click to toggle source

@private

# File lib/hive/stream.rb, line 361
def get_virtual_ops(types, block_num, within_shuffle_round, block)
  retries = 0
  vop_read_ahead = within_shuffle_round ? 1 : MAX_VOP_READ_AHEAD

  @virtual_ops_cache ||= {}
  @virtual_ops_cache = @virtual_ops_cache.reject do |k, v|
    if k < block_num
      warn "Found orphaned virtual operations for block_num #{k}: #{v.to_json}" unless @no_warn
      
      true
    end
    
    false
  end
  
  loop do
    vops_found = false
    
    if account_history_api.class == Hive::AccountHistoryApi || @enum_virtual_ops_supported.nil? && @enum_virtual_ops_supported != false
      begin
        # Use account_history_api.enum_virtual_ops, if supported.
        
        if @virtual_ops_cache.empty? || !@virtual_ops_cache.keys.include?(block_num)
          (block_num..(block_num + vop_read_ahead)).each do |block_num|
            @virtual_ops_cache[block_num] = []
          end
          
          enum_virtual_ops_options = {
            block_range_begin: block_num,
            block_range_end: block_num + vop_read_ahead,
            # TODO Use: mode != :irreversible
            include_reversible: true
          }
          
          account_history_api.enum_virtual_ops(enum_virtual_ops_options) do |result|
            @enum_virtual_ops_supported = true
            
            result.ops.each do |vop|
              @virtual_ops_cache[vop.block] << vop
            end
          end
        end
        
        vops_found = true
        
        if !!@virtual_ops_cache[block_num]
          @virtual_ops_cache[block_num].each do |vop|
            next unless block_num == vop.block
            next if types.any? && !types.include?(vop.op.type)
            
            if vop.virtual_op == 0
              # require 'pry' ; binding.pry if vop.op.type == 'producer_reward_operation'
              warn "Found non-virtual operation (#{vop.op.type}) in enum_virtual_ops result for block: #{block_num}" unless @no_warn
              
              next
            end
            
            block.call vop.op, vop.trx_id, block_num
          end
          
          @virtual_ops_cache.delete(block_num)
        end
      rescue Hive::UnknownError => e
        if e.message =~ /This API is not supported for account history backed by Chainbase/
          warn "Retrying with get_ops_in_block (api does not support enum_virtual_ops)" unless @no_warn
          @enum_virtual_ops_supported = false
          vops_found = false
        else
          raise e
        end
      end
    end
    
    break if vops_found
    
    # Fallback to previous method.
    warn "Retrying with get_ops_in_block (did not find ops for block #{block_num} using enum_virtual_ops)" unless @no_warn
    
    response = case account_history_api
    when Hive::CondenserApi
      account_history_api.get_ops_in_block(block_num, true)
    when Hive::AccountHistoryApi
      account_history_api.get_ops_in_block(
        block_num: block_num,
        only_virtual: true,
        # TODO Use: mode != :irreversible
        include_reversible: true
      )
    end
    
    if response.nil? || (result = response.result).nil?
      if retries < MAX_RETRY_COUNT
        warn "Retrying get_ops_in_block on block #{block_num}" unless @no_warn
        retries = retries + 1
        sleep 9
        redo
      else
        raise TooManyRetriesError, "unable to get valid result while finding virtual operations for block: #{block_num}"
      end
    end
    
    ops = case account_history_api
    when Hive::CondenserApi
      result.map do |trx|
        op = {type: trx.op[0] + '_operation', value: trx.op[1]}
        op = Hashie::Mash.new(op)
      end
    when Hive::AccountHistoryApi then result.ops.map { |trx| trx.op }
    end
    
    if ops.empty?
      if retries < MAX_RETRY_COUNT
        sleep 3
        retries = retries + 1
        redo
      else
        warn "unable to find virtual operations for block: #{block_num}" unless @no_warn
        # raise TooManyRetriesError, "unable to find virtual operations for block: #{block_num}"
      end
    end
    
    ops.each do |op|
      next if types.any? && !types.include?(op.type)
      
      block.call op, vop.trx_id, block_num
    end
    
    break
  end
end
transform_types(types) click to toggle source

@private

# File lib/hive/stream.rb, line 347
def transform_types(types)
  [types].compact.flatten.map do |type|
    type = type.to_s
    
    unless type.end_with? '_operation'
      warn "Op type #{type} is deprecated.  Use #{type}_operation instead." unless @no_warn
      type += '_operation'
    end
    
    type
  end
end