class Orderbook
This class represents the current state of the CoinBase Exchange orderbook.
Orderbook
version number. I try to keep it semantic.
Constants
- PING_INTERVAL
seconds in between pinging the connection.
- VERSION
Attributes
Array of asks
Array of bids
Coinbase::Exchange::Client object
Thread running the EM loop for the websocket
DateTime of last successful pong
Sequence number of most recently received message
Thread running the processing loop
Product ID of the orderbook
Message queue for incoming messages.
Sequence number from the initial level 3 snapshot
Coinbase::Exchange::Websocket object
Public Class Methods
Creates a new live copy of the orderbook.
If start
is set to false, the orderbook will not start automatically.
If a block
is given it is passed each message as it is received.
# File lib/orderbook.rb, line 67 def initialize(product_id: "BTC-USD", start: true, &block) @product_id = product_id @bids = [] @asks = [] @snapshot_sequence = 0 @last_sequence = 0 @queue = Queue.new @websocket = Coinbase::Exchange::Websocket.new(keepalive: true, product_id: @product_id) @client = Coinbase::Exchange::Client.new('', '', '', product_id: @product_id) @on_message = block if block_given? start && start! end
Public Instance Methods
# File lib/orderbook.rb, line 110 def on_message(&block) @on_message = block end
Used to start the thread that listens to updates on the websocket and applies them to the current orderbook to create a live book.
# File lib/orderbook.rb, line 83 def start! start_em_thread # Wait to make sure the snapshot sequence ID is higher than the sequence of # the first message in the queue. # sleep 0.3 apply_orderbook_snapshot start_processing_thread end
Stops the processing thread, EM thread, and the websocket.
# File lib/orderbook.rb, line 96 def stop! @processing_thread.kill @em_thread.kill @websocket.stop! end
Private Instance Methods
Fetch orderbook snapshot from API and convert order arrays to hashes.
# File lib/orderbook.rb, line 127 def apply_orderbook_snapshot @client.orderbook(level: 3) do |resp| @bids = resp['bids'].map { |b| order_to_hash(*b) } @asks = resp['asks'].map { |a| order_to_hash(*a) } @snapshot_sequence = resp['sequence'] @last_sequence = resp['sequence'] end end
Converts an order array from the API into a hash.
# File lib/orderbook.rb, line 118 def order_to_hash(price, size, order_id) { price: BigDecimal.new(price), size: BigDecimal.new(size), order_id: order_id } end
# File lib/orderbook.rb, line 150 def setup_error_handler EM.error_handler do |e| print "Websocket Error: #{e.message} - #{e.backtrace.join("\n")}" end end
# File lib/orderbook.rb, line 142 def setup_ping_timer EM.add_periodic_timer(PING_INTERVAL) do @websocket.ping do @last_pong = Time.now end end end
# File lib/orderbook.rb, line 136 def setup_websocket_callback @websocket.message do |message| @queue.push(message) end end
# File lib/orderbook.rb, line 156 def start_em_thread @em_thread = Thread.new do setup_websocket_callback EM.run do @websocket.start! setup_ping_timer setup_error_handler end end end
# File lib/orderbook.rb, line 167 def start_processing_thread @processing_thread = Thread.new do loop do message = @queue.shift apply(message) @on_message.call(message) unless @on_message.nil? end end end