class LogStash::Outputs::SQS

Push events to an Amazon Web Services Simple Queue Service (SQS) queue.

SQS is a simple, scalable queue system that is part of the Amazon Web Services suite of tools.

Although SQS is similar to other queuing systems like AMQP, it uses a custom API and requires that you have an AWS account. See aws.amazon.com/sqs/ for more details on how SQS works, what the pricing schedule looks like, and how to set up a queue.

To use this plugin, you must:

* Have an AWS account
* Set up an SQS queue
* Create an identity that has access to publish messages to the queue

The identity that publishes messages must have the following permissions on the queue:

* sqs:ChangeMessageVisibility
* sqs:ChangeMessageVisibilityBatch
* sqs:GetQueueAttributes
* sqs:GetQueueUrl
* sqs:ListQueues
* sqs:SendMessage
* sqs:SendMessageBatch

Typically, you should set up an IAM policy, create a user, and apply the IAM policy to the user. A sample policy is as follows:

{
  "Statement": [
    {
      "Sid": "Stmt1347986764948",
      "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueues",
        "sqs:SendMessage",
        "sqs:SendMessageBatch"
      ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:sqs:us-east-1:200850199751:Logstash"
      ]
    }
  ]
}

See aws.amazon.com/iam/ for more details on setting up AWS identities.
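
If you prefer to generate the policy document rather than write it by hand, the permissions listed above can be assembled with Ruby's standard json library. This is only a sketch and is not part of the plugin: the helper name sqs_output_policy is hypothetical, and the ARN is the one from the sample policy.

```ruby
require "json"

# Hypothetical helper (not part of the plugin): builds an IAM policy
# document granting the documented permissions on a single queue ARN.
def sqs_output_policy(queue_arn)
  actions = %w[
    sqs:ChangeMessageVisibility
    sqs:ChangeMessageVisibilityBatch
    sqs:GetQueueAttributes
    sqs:GetQueueUrl
    sqs:ListQueues
    sqs:SendMessage
    sqs:SendMessageBatch
  ]
  {
    "Statement" => [
      {
        "Action"   => actions,
        "Effect"   => "Allow",
        "Resource" => [queue_arn]
      }
    ]
  }
end

# Print the policy as JSON, ready to paste into the IAM console.
puts JSON.pretty_generate(
  sqs_output_policy("arn:aws:sqs:us-east-1:200850199751:Logstash")
)
```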

Public Instance Methods

aws_service_endpoint(region)
# File lib/logstash/outputs/sqs.rb, line 82
def aws_service_endpoint(region)
  return {
      :sqs_endpoint => "sqs.#{region}.amazonaws.com"
  }
end
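
The method above simply interpolates the region name into the SQS hostname, with no validation of the region. A standalone copy that can be run outside Logstash illustrates the return value; the region "eu-west-1" is only an example input.

```ruby
# Standalone copy of aws_service_endpoint, runnable outside Logstash.
def aws_service_endpoint(region)
  { :sqs_endpoint => "sqs.#{region}.amazonaws.com" }
end

endpoint = aws_service_endpoint("eu-west-1")
puts endpoint[:sqs_endpoint]  # prints "sqs.eu-west-1.amazonaws.com"
```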
flush(events, teardown=false)

Called from Stud::Buffer#buffer_flush when there are events to flush.

# File lib/logstash/outputs/sqs.rb, line 131
def flush(events, teardown=false)
  @sqs_queue.batch_send(events)
end
receive(event)
# File lib/logstash/outputs/sqs.rb, line 122
def receive(event)
  if @batch
    buffer_receive(event.to_json)
    return
  end
  @sqs_queue.send_message(event.to_json)
end
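
The interplay between receive and flush can be modelled without AWS. The sketch below is a simplified illustration, not the plugin and not Stud::Buffer: FakeQueue and SketchOutput are hypothetical classes, and the time-based flush driven by batch_timeout is left out, so only the item-count trigger and the final flush are shown.

```ruby
require "json"

# Hypothetical stand-in for the SQS queue object: records calls instead
# of talking to AWS.
class FakeQueue
  attr_reader :calls

  def initialize
    @calls = []
  end

  def send_message(body)
    @calls << [:send_message, body]
  end

  def batch_send(bodies)
    @calls << [:batch_send, bodies.dup]
  end
end

# Simplified model of the output's two code paths (assumption: the real
# buffering is done by Stud::Buffer, which also flushes on a timer).
class SketchOutput
  def initialize(queue, batch, batch_events = 10)
    @queue = queue
    @batch = batch
    @batch_events = batch_events
    @pending = []
  end

  def receive(event)
    payload = JSON.generate(event)
    unless @batch
      @queue.send_message(payload)  # unbatched: one call per event
      return
    end
    @pending << payload
    flush if @pending.size >= @batch_events
  end

  # Sends whatever is pending as one batch; also used for the final flush.
  def flush
    return if @pending.empty?
    @queue.batch_send(@pending)
    @pending = []
  end
end

queue = FakeQueue.new
output = SketchOutput.new(queue, true, 2)
3.times { |i| output.receive({ "n" => i }) }
output.flush  # final flush, analogous to buffer_flush(:final => true)
queue.calls.each { |kind, data| puts "#{kind}: #{data.inspect}" }
```

With a batch size of 2 and three events, the fake queue records two batch_send calls: one holding two messages, and one holding the remaining message from the final flush.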
register()
# File lib/logstash/outputs/sqs.rb, line 89
def register
  require "aws-sdk"

  @sqs = AWS::SQS.new(aws_options_hash)

  if @batch
    if @batch_events > 10
      raise RuntimeError.new(
        "AWS only allows a batch_events parameter of 10 or less"
      )
    elsif @batch_events <= 1
      raise RuntimeError.new(
        "batch_events parameter must be greater than 1 (or it's not a batch)"
      )
    end
    buffer_initialize(
      :max_items => @batch_events,
      :max_interval => @batch_timeout,
      :logger => @logger
    )
  end

  begin
    @logger.debug("Connecting to AWS SQS queue '#{@queue}'...")
    @sqs_queue = @sqs.queues.named(@queue)
    # Log success only when the queue lookup did not raise.
    @logger.info("Connected to AWS SQS queue '#{@queue}' successfully.")
  rescue Exception => e
    @logger.error("Unable to access SQS queue '#{@queue}': #{e.to_s}")
  end # begin/rescue
end
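
The range check that register applies to batch_events accepts only values from 2 to 10 inclusive. A minimal standalone sketch of that check follows; the helper name validate_batch_events is hypothetical and does not exist in the plugin.

```ruby
# Hypothetical helper mirroring the batch_events range check in register.
# Raises RuntimeError, as the plugin does, for values outside 2..10.
def validate_batch_events(batch_events)
  if batch_events > 10
    raise "AWS only allows a batch_events parameter of 10 or less"
  elsif batch_events <= 1
    raise "batch_events parameter must be greater than 1"
  end
  batch_events
end

[2, 10, 1, 11].each do |n|
  begin
    validate_batch_events(n)
    puts "#{n}: accepted"
  rescue RuntimeError => e
    puts "#{n}: rejected (#{e.message})"
  end
end
```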
teardown()
# File lib/logstash/outputs/sqs.rb, line 136
def teardown
  buffer_flush(:final => true)
  @sqs_queue = nil
  finished
end