class LogStash::Outputs::Riak
Riak is a distributed key/value store from Basho. It is based on the Dynamo model.
Public Instance Methods
receive(event)
click to toggle source
# File lib/logstash/outputs/riak.rb, line 109
# Write +event+ as a JSON object into every bucket listed in @bucket.
#
# For each configured bucket name the method:
# 1. expands the name with event.sprintf and fetches the bucket from @client,
# 2. enables the search index on the bucket when @enable_search is set and
#    the bucket is not indexed yet,
# 3. builds a Riak::RObject keyed by event.sprintf(@key_name), or by nil when
#    no @key_name is configured (presumably Riak then assigns a key itself --
#    confirm against the riak-client documentation),
# 4. adds one "<field>_bin" index entry per name in @indices, and
# 5. stores the object.
#
# Nothing is done when output?(event) is false. A StandardError raised while
# populating or storing the object is logged as a warning and the remaining
# buckets are still processed.
def receive(event)
  return unless output?(event)

  @bucket.each do |b|
    # setup our bucket(s)
    bukkit = @client.bucket(event.sprintf(b))
    # Disable bucket props for now
    # Need to detect params passed that should be converted to int
    # otherwise setting props fails =(
    # Logstash syntax only supports strings and bools
    # likely fix is to either hack in is_numeric?
    # or whitelist certain params and call to_i
    ##@logger.debug("Setting bucket props", :props => @bucket_props)
    ##bukkit.props = @bucket_props if @bucket_props
    ##@logger.debug("Bucket", :bukkit => bukkit.inspect)
    if @enable_search
      @logger.debug("Enable search requested", :bucket => bukkit.inspect)
      # Check if search is enabled
      @logger.debug("Checking bucket status", :search_enabled => bukkit.is_indexed?)
      bukkit.enable_index! unless bukkit.is_indexed?
      @logger.debug("Rechecking bucket status", :search_enabled => bukkit.is_indexed?)
    end

    evt_key = @key_name.nil? ? nil : event.sprintf(@key_name)
    evt = Riak::RObject.new(bukkit, evt_key)
    @logger.debug("RObject", :robject => evt.to_s)
    begin
      evt.content_type = "application/json"
      evt.data = event
      if @indices
        @indices.each do |k|
          # Index names drop any '@' from the field name and get a "_bin" suffix.
          idx_name = "#{k.delete('@')}_bin"
          @logger.debug("Riak index name", :idx => idx_name)
          @logger.info("Indexes", :indexes => evt.indexes.to_s)
          evt.indexes[idx_name] << event.sprintf("%{#{k}}")
        end
      end
      evt.store
    rescue StandardError => e
      # Only ordinary errors are swallowed here. Rescuing Exception would also
      # trap Interrupt, SignalException and SystemExit and could keep the
      # process from shutting down.
      @logger.warn("Exception storing", :message => e.message)
    end
  end
end
register()
click to toggle source
Metadata (NYI): allow the user to set custom metadata on the object. Should consider converting Logstash data to metadata as well.
# File lib/logstash/outputs/riak.rb, line 82
# Build the Riak client used by this output and keep it in @client.
#
# Every entry of @nodes becomes a node description whose port key is named
# after the configured protocol ("<proto>_port"). When @enable_ssl is set, the
# entries of @ssl_opts (keys converted to symbols) are added to the client
# options; with no @ssl_opts, :ssl => true is used instead.
def register
  # Loaded here rather than at file load time, as in the original.
  require 'riak'

  client_options = {}

  @logger.debug("Setting protocol", :protocol => @proto)
  port_key = :"#{@proto}_port"

  node_list = @nodes.map do |node, port|
    @logger.debug("Adding node", :node => node, :port => port)
    { :host => node, port_key => port }
  end
  @logger.debug("Cluster nodes", :nodes => node_list)

  if @enable_ssl
    @logger.debug("SSL requested")
    if @ssl_opts
      @logger.debug("SSL options provided", @ssl_opts)
      # The configuration supplies string keys; the client options use symbols.
      @ssl_opts.each { |name, value| client_options[name.to_sym] = value }
    else
      client_options[:ssl] = true
    end
    @logger.debug("Riak options:", :riak_opts => client_options)
  end

  client_options[:nodes] = node_list
  @logger.debug("Riak options:", :riak_opts => client_options)
  @client = Riak::Client.new(client_options)
end