class LogStash::Inputs::SQS
Pull events from an Amazon Web Services Simple Queue Service (SQS
) queue.
SQS
is a simple, scalable queue system that is part of the Amazon Web Services suite of tools.
Although SQS
is similar to other queuing systems like AMQP, it uses a custom API and requires that you have an AWS account. See aws.amazon.com/sqs/ for more details on how SQS
works, what the pricing schedule looks like and how to setup a queue.
To use this plugin, you must:
* Have an AWS account * Setup an SQS queue * Create an identify that has access to consume messages from the queue.
The “consumer” identity must have the following permissions on the queue:
* sqs:ChangeMessageVisibility * sqs:ChangeMessageVisibilityBatch * sqs:DeleteMessage * sqs:DeleteMessageBatch * sqs:GetQueueAttributes * sqs:GetQueueUrl * sqs:ListQueues * sqs:ReceiveMessage
Typically, you should setup an IAM policy, create a user and apply the IAM policy to the user. A sample policy is as follows:
{ "Statement": [ { "Action": [ "sqs:ChangeMessageVisibility", "sqs:ChangeMessageVisibilityBatch", "sqs:GetQueueAttributes", "sqs:GetQueueUrl", "sqs:ListQueues", "sqs:SendMessage", "sqs:SendMessageBatch" ], "Effect": "Allow", "Resource": [ "arn:aws:sqs:us-east-1:123456789012:Logstash" ] } ] }
See aws.amazon.com/iam/ for more details on setting up AWS identities.
Public Instance Methods
# File lib/logstash/inputs/sqs.rb, line 80 def aws_service_endpoint(region) return { :sqs_endpoint => "sqs.#{region}.amazonaws.com" } end
# File lib/logstash/inputs/sqs.rb, line 87 def register @logger.info("Registering SQS input", :queue => @queue) require "aws-sdk" @sqs = AWS::SQS.new(aws_options_hash) begin @logger.debug("Connecting to AWS SQS queue", :queue => @queue) @sqs_queue = @sqs.queues.named(@queue) @logger.info("Connected to AWS SQS queue successfully.", :queue => @queue) rescue Exception => e @logger.error("Unable to access SQS queue.", :error => e.to_s, :queue => @queue) throw e end # begin/rescue end
# File lib/logstash/inputs/sqs.rb, line 104 def run(output_queue) @logger.debug("Polling SQS queue", :queue => @queue) receive_opts = { :limit => 10, :visibility_timeout => 30, :attributes => [:sent_at] } continue_polling = true while running? && continue_polling continue_polling = run_with_backoff(60, 1) do @sqs_queue.receive_message(receive_opts) do |message| if message @codec.decode(message.body) do |event| decorate(event) if @id_field event[@id_field] = message.id end if @md5_field event[@md5_field] = message.md5 end if @sent_timestamp_field event[@sent_timestamp_field] = message.sent_timestamp.utc end @logger.debug? && @logger.debug("Processed SQS message", :message_id => message.id, :message_md5 => message.md5, :sent_timestamp => message.sent_timestamp, :queue => @queue) output_queue << event message.delete end # codec.decode end # valid SQS message end # receive_message end # run_with_backoff end # polling loop end
# File lib/logstash/inputs/sqs.rb, line 139 def teardown @sqs_queue = nil finished end
Private Instance Methods
Runs an AWS request inside a Ruby block with an exponential backoff in case we exceed the allowed AWS RequestLimit.
@param [Integer] max_time maximum amount of time to sleep before giving up. @param [Integer] sleep_time the initial amount of time to sleep before retrying. @param [Block] block Ruby code block to execute.
# File lib/logstash/inputs/sqs.rb, line 151 def run_with_backoff(max_time, sleep_time, &block) if sleep_time > max_time @logger.error("AWS::EC2::Errors::RequestLimitExceeded ... failed.", :queue => @queue) return false end # retry limit exceeded begin block.call rescue AWS::EC2::Errors::RequestLimitExceeded @logger.info("AWS::EC2::Errors::RequestLimitExceeded ... retrying SQS request", :queue => @queue, :sleep_time => sleep_time) sleep sleep_time run_with_backoff(max_time, sleep_time * 2, &block) rescue AWS::EC2::Errors::InstanceLimitExceeded @logger.warn("AWS::EC2::Errors::InstanceLimitExceeded ... aborting SQS message retreival.") return false rescue Exception => bang @logger.error("Error reading SQS queue.", :error => bang, :queue => @queue) return false end # begin/rescue return true end