class Kafka::Sasl::AwsMskIam
Constants
- AWS_MSK_IAM
Public Class Methods
new(aws_region:, access_key_id:, secret_key_id:, session_token: nil, logger:)
click to toggle source
# File lib/kafka/sasl/awsmskiam.rb, line 12
#
# Stores the AWS region and credentials used for signing and wraps the given
# logger in a TaggedLogger. A Mutex is also created and kept in @semaphore.
#
# @param aws_region [String] region name embedded in the credential scope
# @param access_key_id [String] AWS access key id
# @param secret_key_id [String] AWS secret key used to derive the signing key
# @param session_token [String, nil] optional session token; nil when absent
# @param logger [Object] logger passed to TaggedLogger.new
def initialize(aws_region:, access_key_id:, secret_key_id:, session_token: nil, logger:)
  @semaphore = Mutex.new

  @aws_region, @access_key_id = aws_region, access_key_id
  @secret_key_id, @session_token = secret_key_id, session_token

  @logger = TaggedLogger.new(logger)
end
Public Instance Methods
authenticate!(host, encoder, decoder)
click to toggle source
# File lib/kafka/sasl/awsmskiam.rb, line 30
#
# Performs the SASL AWS_MSK_IAM exchange: builds the signed authentication
# payload for the host (port stripped), writes it through the encoder and
# reads the server's reply through the decoder.
#
# @param host [String] broker address; everything after the first ':' is dropped
# @param encoder [#write_bytes] sink for the client message
# @param decoder [#bytes] source of the server reply
# @raise [Kafka::Error] when the reply is missing, or when reading it times
#   out or hits end-of-file
def authenticate!(host, encoder, decoder)
  @logger.debug "Authenticating #{@access_key_id} with SASL #{AWS_MSK_IAM}"

  # The signature covers the bare host name, so drop any ":port" suffix.
  hostname = host.split(':', -1)[0]
  payload = authentication_payload(host: hostname, time_now: Time.now.utc)

  @logger.debug "Sending first client SASL AWS_MSK_IAM message:"
  @logger.debug payload
  encoder.write_bytes(payload)

  begin
    @server_first_message = decoder.bytes
    @logger.debug "Received first server SASL AWS_MSK_IAM message: #{@server_first_message}"

    unless @server_first_message
      raise Kafka::Error, "SASL AWS_MSK_IAM authentication failed: unknown error"
    end
  rescue Errno::ETIMEDOUT, EOFError => error
    raise Kafka::Error, "SASL AWS_MSK_IAM authentication failed: #{error.message}"
  end

  @logger.debug "SASL #{AWS_MSK_IAM} authentication successful"
end
configured?()
click to toggle source
# File lib/kafka/sasl/awsmskiam.rb, line 26
#
# Reports whether the region, access key id and secret key are all set.
#
# Always returns a strict boolean. Returning the raw `&&` chain would hand the
# secret key itself back to the caller (the last operand is @secret_key_id),
# which could then end up in logs or inspect output.
#
# @return [Boolean] true when all three values are truthy, false otherwise
def configured?
  [@aws_region, @access_key_id, @secret_key_id].all?
end
ident()
click to toggle source
# File lib/kafka/sasl/awsmskiam.rb, line 22
#
# Identifier of this SASL mechanism.
#
# @return the value of the AWS_MSK_IAM constant
def ident
  AWS_MSK_IAM
end
Private Instance Methods
authentication_payload(host:, time_now:)
click to toggle source
# File lib/kafka/sasl/awsmskiam.rb, line 64
#
# Builds the JSON document sent to the broker as the client's SASL message.
# Entries whose value is nil are dropped before serialization, so
# 'x-amz-security-token' is omitted when no session token was supplied.
#
# @param host [String] broker host name placed in the payload and signed
# @param time_now [Time] timestamp used for the date fields and the signature
# @return [String] JSON-encoded payload
def authentication_payload(host:, time_now:)
  date_stamp = time_now.strftime("%Y%m%d")
  timestamp = time_now.strftime("%Y%m%dT%H%M%SZ")

  fields = {
    'version' => "2020_10_22",
    'host' => host,
    'user-agent' => "ruby-kafka",
    'action' => "kafka-cluster:Connect",
    'x-amz-algorithm' => "AWS4-HMAC-SHA256",
    'x-amz-credential' => @access_key_id + "/" + date_stamp + "/" + @aws_region + "/kafka-cluster/aws4_request",
    'x-amz-date' => timestamp,
    'x-amz-signedheaders' => "host",
    'x-amz-expires' => "900",
    'x-amz-security-token' => @session_token,
    'x-amz-signature' => signature(host: host, time_now: time_now)
  }

  fields.reject { |_, value| value.nil? }.to_json
end
bin_to_hex(s)
click to toggle source
# File lib/kafka/sasl/awsmskiam.rb, line 56
#
# Converts a binary string to its lowercase hexadecimal representation, two
# hex digits per byte (high nibble first).
#
# Uses String#unpack with the "H*" directive instead of formatting every byte
# by hand.
#
# @param s [String] raw bytes
# @return [String] lowercase hex string; "" for empty input
def bin_to_hex(s)
  s.unpack("H*").first
end
canonical_headers(host:)
click to toggle source
# File lib/kafka/sasl/awsmskiam.rb, line 103
#
# Canonical headers section of the request being signed: a single
# "host:<host>" line terminated by a newline.
#
# @param host [String] broker host name
# @return [String]
def canonical_headers(host:)
  header_line = "host:" + host
  header_line + "\n"
end
canonical_query_string(time_now:)
click to toggle source
# File lib/kafka/sasl/awsmskiam.rb, line 89
#
# Canonical query string of the request being signed. Parameters with a nil
# value are dropped (so X-Amz-Security-Token is omitted when there is no
# session token) and the rest are form-encoded with URI.encode_www_form.
#
# @param time_now [Time] timestamp used for the credential date and X-Amz-Date
# @return [String] encoded query string
def canonical_query_string(time_now:)
  date_stamp = time_now.strftime("%Y%m%d")
  timestamp = time_now.strftime("%Y%m%dT%H%M%SZ")

  query = {
    "Action" => "kafka-cluster:Connect",
    "X-Amz-Algorithm" => "AWS4-HMAC-SHA256",
    "X-Amz-Credential" => @access_key_id + "/" + date_stamp + "/" + @aws_region + "/kafka-cluster/aws4_request",
    "X-Amz-Date" => timestamp,
    "X-Amz-Expires" => "900",
    "X-Amz-Security-Token" => @session_token,
    "X-Amz-SignedHeaders" => "host"
  }

  URI.encode_www_form(query.reject { |_, value| value.nil? })
end
canonical_request(host:, time_now:)
click to toggle source
# File lib/kafka/sasl/awsmskiam.rb, line 80
#
# Assembles the canonical request that gets hashed into the string to sign:
# method "GET", path "/", the canonical query string, the canonical headers,
# the signed header list and the payload hash, separated by newlines.
#
# @param host [String] broker host name
# @param time_now [Time] timestamp used in the query string
# @return [String]
def canonical_request(host:, time_now:)
  sections = [
    "GET",
    "/",
    canonical_query_string(time_now: time_now),
    # Already ends with a newline; the join adds the blank separator line.
    canonical_headers(host: host),
    signed_headers,
    hashed_payload
  ]

  sections.join("\n")
end
digest()
click to toggle source
# File lib/kafka/sasl/awsmskiam.rb, line 60
#
# SHA-256 digest object, created on first use and cached in @digest.
#
# @return [OpenSSL::Digest::SHA256]
def digest
  return @digest if @digest

  @digest = OpenSSL::Digest::SHA256.new
end
hashed_payload()
click to toggle source
# File lib/kafka/sasl/awsmskiam.rb, line 111
#
# Hex-encoded SHA-256 hash of the empty string, used as the payload hash in
# the canonical request.
#
# @return [String]
def hashed_payload
  empty_body_hash = digest.digest("")
  bin_to_hex(empty_body_hash)
end
signature(host:, time_now:)
click to toggle source
# File lib/kafka/sasl/awsmskiam.rb, line 122
#
# Computes the hex-encoded request signature. The signing key is derived by
# chaining HMAC-SHA256 over the date, region, the service name
# "kafka-cluster" and "aws4_request", starting from "AWS4" + secret key; the
# string to sign is then HMAC'd with that key.
#
# @param host [String] broker host name
# @param time_now [Time] timestamp used for the date scope and string to sign
# @return [String] lowercase hex signature
def signature(host:, time_now:)
  scope_parts = [time_now.strftime("%Y%m%d"), @aws_region, "kafka-cluster", "aws4_request"]

  # Each step keys the next HMAC with the previous step's output.
  signing_key = scope_parts.reduce("AWS4" + @secret_key_id) do |key, part|
    OpenSSL::HMAC.digest("SHA256", key, part)
  end

  bin_to_hex(OpenSSL::HMAC.digest("SHA256", signing_key, string_to_sign(host: host, time_now: time_now)))
end
signed_headers()
click to toggle source
# File lib/kafka/sasl/awsmskiam.rb, line 107
#
# List of headers covered by the signature; only "host" is signed.
#
# @return [String]
def signed_headers
  "host"
end
string_to_sign(host:, time_now:)
click to toggle source
# File lib/kafka/sasl/awsmskiam.rb, line 115
#
# Builds the string that is signed: the algorithm name, the request
# timestamp, the credential scope (date/region/kafka-cluster/aws4_request)
# and the hex SHA-256 hash of the canonical request, separated by newlines.
#
# @param host [String] broker host name
# @param time_now [Time] timestamp used for the date fields
# @return [String]
def string_to_sign(host:, time_now:)
  timestamp = time_now.strftime("%Y%m%dT%H%M%SZ")
  credential_scope = time_now.strftime("%Y%m%d") + "/" + @aws_region + "/kafka-cluster/aws4_request"
  request_hash = bin_to_hex(digest.digest(canonical_request(host: host, time_now: time_now)))

  ["AWS4-HMAC-SHA256", timestamp, credential_scope, request_hash].join("\n")
end