class LogStash::Outputs::SQS
Push events to an Amazon Web Services Simple Queue Service (SQS
) queue.
SQS
is a simple, scalable queue system that is part of the Amazon Web Services suite of tools.
Although SQS
is similar to other queuing systems like AMQP, it uses a custom API and requires that you have an AWS account. See aws.amazon.com/sqs/ for more details on how SQS
works, what the pricing schedule looks like and how to setup a queue.
To use this plugin, you must:
* Have an AWS account * Setup an SQS queue * Create an identify that has access to publish messages to the queue.
The “consumer” identity must have the following permissions on the queue:
* sqs:ChangeMessageVisibility * sqs:ChangeMessageVisibilityBatch * sqs:GetQueueAttributes * sqs:GetQueueUrl * sqs:ListQueues * sqs:SendMessage * sqs:SendMessageBatch
Typically, you should setup an IAM policy, create a user and apply the IAM policy to the user. A sample policy is as follows:
{ "Statement": [ { "Sid": "Stmt1347986764948", "Action": [ "sqs:ChangeMessageVisibility", "sqs:ChangeMessageVisibilityBatch", "sqs:DeleteMessage", "sqs:DeleteMessageBatch", "sqs:GetQueueAttributes", "sqs:GetQueueUrl", "sqs:ListQueues", "sqs:ReceiveMessage" ], "Effect": "Allow", "Resource": [ "arn:aws:sqs:us-east-1:200850199751:Logstash" ] } ] }
See aws.amazon.com/iam/ for more details on setting up AWS identities.
Public Instance Methods
# File lib/logstash/outputs/sqs.rb, line 82 def aws_service_endpoint(region) return { :sqs_endpoint => "sqs.#{region}.amazonaws.com" } end
called from Stud::Buffer#buffer_flush when there are events to flush
# File lib/logstash/outputs/sqs.rb, line 131 def flush(events, teardown=false) @sqs_queue.batch_send(events) end
# File lib/logstash/outputs/sqs.rb, line 122 def receive(event) if @batch buffer_receive(event.to_json) return end @sqs_queue.send_message(event.to_json) end
# File lib/logstash/outputs/sqs.rb, line 89 def register require "aws-sdk" @sqs = AWS::SQS.new(aws_options_hash) if @batch if @batch_events > 10 raise RuntimeError.new( "AWS only allows a batch_events parameter of 10 or less" ) elsif @batch_events <= 1 raise RuntimeError.new( "batch_events parameter must be greater than 1 (or its not a batch)" ) end buffer_initialize( :max_items => @batch_events, :max_interval => @batch_timeout, :logger => @logger ) end begin @logger.debug("Connecting to AWS SQS queue '#{@queue}'...") @sqs_queue = @sqs.queues.named(@queue) rescue Exception => e @logger.error("Unable to access SQS queue '#{@queue}': #{e.to_s}") end # begin/rescue @logger.info("Connected to AWS SQS queue '#{@queue}' successfully.") end
# File lib/logstash/outputs/sqs.rb, line 136 def teardown buffer_flush(:final => true) @sqs_queue = nil finished end