class Neo4j::Core::CypherSession::Adaptors::Bolt
Constants
- BYTE_STRINGS
These values don't need to be calculated every time, so they are cached in memory.
- GOGOBOLT
- STREAM_INSPECTOR
- SUPPORTED_VERSIONS
- VERSION
Public Class Methods
new(url, options = {})
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Stores the connection settings and opens the socket straight away.
#
# url     - passed to the +url=+ writer.
# options - Hash of adaptor options. The keys read here are :read_timeout and
#           :write_timeout (both default to -1), :connect_timeout (defaults
#           to 10) and :ssl (defaults to an empty Hash).
def initialize(url, options = {})
  self.url = url
  @options = options

  timeout_defaults = {read_timeout: -1, write_timeout: -1, connect_timeout: 10}
  @net_tcp_client_options = timeout_defaults.each_with_object({}) do |(name, default), client_options|
    client_options[name] = options.fetch(name, default)
  end
  @net_tcp_client_options[:ssl] = options.fetch(:ssl, {})

  open_socket
end
transaction_class()
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Returns the transaction class that goes with this adaptor.
# The defining file is required on each call, just before the constant
# is referenced.
def self.transaction_class
  require 'neo4j/core/cypher_session/transactions/bolt'

  Neo4j::Core::CypherSession::Transactions::Bolt
end
Public Instance Methods
connect()
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Runs the handshake, sends the init message and checks the first reply.
#
# Returns nil when the first reply is of type :success.
# Raises RuntimeError carrying the reply's 'code' and 'message' otherwise.
def connect
  handshake
  init

  reply = flush_messages[0]
  unless reply.type == :success
    data = reply.args[0]
    fail "Init did not complete successfully\n\n#{data['code']}\n#{data['message']}"
  end
end
connected?()
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# True when a TCP client has been created and it does not report itself
# as closed; false otherwise.
def connected?
  return false unless @tcp_client

  !@tcp_client.closed?
end
query_set(transaction, queries, options = {})
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Prepares the queries, sends them as a single job and builds the response,
# all inside the class-level request instrumentation.
#
# transaction - handed to setup_queries! together with the queries.
# queries     - the collection of queries to run.
# options     - Hash; :skip_instrumentation is forwarded to setup_queries!,
#               :wrap_level overrides the adaptor-wide @options[:wrap_level].
#
# Returns whatever build_response returns for the queries.
def query_set(transaction, queries, options = {})
  skip_instrumentation = options[:skip_instrumentation]
  setup_queries!(queries, transaction, skip_instrumentation: skip_instrumentation)

  self.class.instrument_request(self) do
    send_query_jobs(queries)

    # A per-call wrap level wins over the one given at construction time.
    wrap_level = options[:wrap_level]
    wrap_level ||= @options[:wrap_level]
    build_response(queries, wrap_level)
  end
end
ssl?()
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# True when the TCP client's underlying socket is an OpenSSL SSL socket.
def ssl?
  socket = @tcp_client.socket
  socket.is_a?(OpenSSL::SSL::SSLSocket)
end
Private Instance Methods
build_response(queries, wrap_level)
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Builds the results for +queries+ from the server's replies.
#
# The response object is created inside a catch(:cypher_bolt_failure) block,
# so a throw of that tag makes the thrown value the outcome. Anything that is
# not an Array is treated as error data and handed to handle_failure!.
#
# Returns the outcome of the catch block.
def build_response(queries, wrap_level)
  outcome = catch(:cypher_bolt_failure) do
    Responses::Bolt.new(queries, method(:flush_messages), wrap_level: wrap_level).results
  end

  handle_failure!(outcome) unless outcome.is_a?(Array)
  outcome
end
flush_messages()
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Reads the pending structures from the server and wraps each in a Message.
# Each message is logged as coming from the server when debug logging is on.
#
# Returns an Array of Message objects, or nil when flush_response returns a
# falsy value.
def flush_messages
  structures = flush_response
  return unless structures

  structures.map do |structure|
    message = Message.new(structure.signature, *structure.list)
    log_message :S, message.type, message.args.join(' ') if logger.debug?
    message
  end
end
flush_response()
click to toggle source
Replace with Enumerator?
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Reads chunks from the server and unpacks the values they contain.
#
# Each chunk is a 2-byte header holding the chunk size, followed by that many
# bytes. Reading stops at an empty read or at a zero-size header. The header
# is decoded as an UNSIGNED big-endian 16-bit integer ('n'): decoding it as
# signed turned any chunk larger than 32767 bytes into a negative size, which
# ended the loop early and truncated the message.
#
# Returns an Array of the values unpacked from the collected bytes. Unpacking
# stops at the first nil/false value returned by the unpacker.
def flush_response
  # dup keeps the buffer mutable even if string literals are frozen.
  chunk = ''.dup

  loop do
    header = recvmsg(2)
    break if header.empty?

    chunk_size = header.unpack('n')[0]
    break unless chunk_size > 0

    log_message :S, :chunk_size, chunk_size
    chunk << recvmsg(chunk_size)
  end

  unpacker = PackStream::Unpacker.new(StringIO.new(chunk))
  values = []
  while (value = unpacker.unpack_value!)
    values << value
  end
  values
end
handle_failure!(error_data)
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Recovers the session after a failure and raises the matching error.
#
# Flushes the pending replies, sends an :ack_failure message and expects the
# first reply to it to be of type :success.
#
# error_data - Hash-like object with 'code' and 'message' entries.
#
# Raises RuntimeError when the acknowledgement is not answered with :success.
# Raises the error built by CypherError.new_from otherwise; never returns.
def handle_failure!(error_data)
  flush_messages

  send_job { |job| job.add_message(:ack_failure) }

  acknowledgement = flush_messages[0]
  fail 'Expected SUCCESS for ACK_FAILURE' unless acknowledgement.type == :success

  fail CypherError.new_from(error_data['code'], error_data['message'])
end
handshake()
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Negotiates the protocol version with the server.
#
# Sends GOGOBOLT followed by SUPPORTED_VERSIONS packed as big-endian 32-bit
# integers, then reads a 4-byte reply holding the agreed version.
#
# Raises RuntimeError, after closing the TCP client, when the server answers
# with version 0.
def handshake
  log_message :C, :handshake, nil

  proposal = GOGOBOLT + SUPPORTED_VERSIONS.pack('l>*')
  sendmsg(proposal)

  reply = recvmsg(4)
  agreed_version = reply.unpack('l>*').first

  if agreed_version.zero?
    @tcp_client.close

    fail "Couldn't agree on a version (Sent versions #{SUPPORTED_VERSIONS.inspect})"
  end

  logger.debug { "Agreed to version: #{agreed_version}" }
end
init()
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Sends the :init message carrying the user agent string and the basic-scheme
# credentials taken from +user+ and +password+.
#
# Returns the job that was sent (the return value of send_job).
def init
  send_job do |job|
    job.add_message(
      :init,
      USER_AGENT_STRING,
      principal: user, credentials: password, scheme: 'basic'
    )
  end
end
log_message(side, *args)
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Writes a debug-level log line prefixed with +side+ (callers in this file
# pass :C or :S).
#
# With a single extra argument, that argument is rendered through
# STREAM_INSPECTOR. Otherwise the first two extra arguments are taken as a
# type (upcased and coloured cyan) and a message.
#
# The line is built inside the block given to logger.debug.
def log_message(side, *args)
  logger.debug do
    next "#{side}: #{STREAM_INSPECTOR.call(args[0])}" if args.size == 1

    type, message = args
    "#{side}: #{ANSI::CYAN}#{type.to_s.upcase}#{ANSI::CLEAR} #{message}"
  end
end
new_job()
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Builds a fresh Job, passing this adaptor to its constructor.
def new_job
  Job.new(self)
end
open_socket()
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Creates the TCP client for "host:port" with buffering turned off, using the
# timeouts and SSL settings gathered in the constructor.
#
# Raises Neo4j::Core::CypherSession::ConnectionFailedError (with the original
# message) when the connection is refused.
def open_socket
  client_options = @net_tcp_client_options.merge(buffered: false, server: "#{host}:#{port}")
  @tcp_client = Net::TCPClient.new(client_options)
rescue Errno::ECONNREFUSED => error
  raise Neo4j::Core::CypherSession::ConnectionFailedError, error.message
end
recvmsg(size)
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Reads +size+ bytes from the TCP client and returns what the client's read
# returns. A block that logs the result as server-side traffic is passed to
# the read call.
# NOTE(review): whether the client's read actually yields to this block is
# not visible here - confirm against the TCP client's API.
def recvmsg(size)
  @tcp_client.read(size) { |result| log_message :S, result }
end
secure_connection?()
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# True when the adaptor options contain an :ssl key, false otherwise.
#
# The answer is memoized in @is_secure_socket. The memo is filled only while
# it is nil, so a false answer is cached too; `||=` would recompute it on
# every call whenever the answer is false.
def secure_connection?
  @is_secure_socket = @options.key?(:ssl) if @is_secure_socket.nil?
  @is_secure_socket
end
send_job() { |job| ... }
click to toggle source
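Allows you to send messages to the server. Note: as written, this returns
the Job that was sent (via `tap`), not an array of Message objects; the server's replies are read separately with flush_messages.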
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Builds a new job, yields it so the caller can add messages, logs it as
# client-side traffic and writes its chunked packed stream to the server.
#
# Yields the new job.
# Returns that same job.
def send_job
  job = new_job
  yield job

  log_message :C, :job, job
  sendmsg(job.chunked_packed_stream)

  job
end
send_query_jobs(queries)
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Sends all +queries+ in one job. Each query contributes a :run message (its
# cypher plus its parameters, or an empty Hash when it has none) followed by
# a :pull_all message.
#
# Returns the job that was sent.
def send_query_jobs(queries)
  send_job do |job|
    queries.each do |query|
      cypher = query.cypher
      parameters = query.parameters || {}

      job.add_message(:run, cypher, parameters)
      job.add_message(:pull_all)
    end
  end
end
sendmsg(message)
click to toggle source
# File lib/neo4j/core/cypher_session/adaptors/bolt.rb
# Logs +message+ as client-side traffic, then writes it to the TCP client.
#
# Returns whatever the client's write returns.
def sendmsg(message)
  log_message(:C, message)
  @tcp_client.write(message)
end