class LogStash::Inputs::Salesforce
This Logstash input plugin allows you to query Salesforce
using SOQL and puts the results into Logstash, one row per event. You can configure it to pull entire sObjects or only specific fields.
NOTE: This input plugin will stop after all the results of the query are processed and will need to be re-run to fetch new results. It does not utilize the streaming API.
In order to use this plugin, you will need to create a new SFDC Application using OAuth. More details can be found here: https://help.salesforce.com/apex/HTViewHelpDoc?id=connected_app_create.htm
You will also need a username, password, and security token for your Salesforce instance. More details for generating a token can be found here: https://help.salesforce.com/apex/HTViewHelpDoc?id=user_security_token.htm
In addition to specifying an sObject, you can also supply a list of API fields that will be used in the SOQL query.
Example
This example prints all the Salesforce
Opportunities to standard output
- source,ruby
input {
  salesforce {
    client_id => 'OAUTH CLIENT ID FROM YOUR SFDC APP'
    client_secret => 'OAUTH CLIENT SECRET FROM YOUR SFDC APP'
    username => 'email@example.com'
    password => 'super-secret'
    security_token => 'SECURITY TOKEN FOR THIS USER'
    sfdc_object_name => 'Opportunity'
  }
}
output {
stdout { codec => rubydebug }
}
Public Instance Methods
Source
# File lib/logstash/inputs/salesforce.rb, line 138
#
# Prepares the plugin before #run is called.
#
# Loads the restforce library, describes the configured sObject
# (@sfdc_object_name) through the client, and caches the field-name => type
# map in @sfdc_field_types. When no fields were configured (@sfdc_fields is
# empty), every field found in the description is selected.
def register
  # Loaded here rather than at file load time, as in the original listing.
  require 'restforce'

  description = client.describe(@sfdc_object_name)
  @sfdc_field_types = get_field_types(description)

  # An explicit field list takes precedence over the described fields.
  return unless @sfdc_fields.empty?

  @sfdc_fields = get_all_fields
end
Source
# File lib/logstash/inputs/salesforce.rb, line 146
#
# Main polling loop. Repeats until stop? returns true.
#
# Each cycle:
# 1. runs the SOQL query built by get_query through the client;
# 2. pushes one LogStash::Event per result row onto +queue+, copying every
#    non-nil field listed in @sfdc_fields ('date'/'datetime' fields are
#    converted with format_time, keys are underscored when @to_underscores);
# 3. if @tracking_field_value_file is set, writes the tracking field value of
#    the last row processed to that file;
# 4. exits when @interval is -1, otherwise sleeps for whatever is left of
#    @interval (no sleep, and a warning, if the cycle overran).
#
# queue - the object events are appended to with <<.
def run(queue)
  until stop?
    # Monotonic clock: the measured duration is unaffected by wall-clock jumps.
    started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    results = client.query(get_query)
    latest_tracking_field_value = nil

    if results && results.first
      results.each do |result|
        event = LogStash::Event.new
        decorate(event)

        @sfdc_fields.each do |field|
          value = result.send(field)
          next if value.nil?

          event_key = @to_underscores ? underscore(field) : field
          case @sfdc_field_types[field]
          when 'datetime', 'date'
            event.set(event_key, format_time(value))
          else
            event.set(event_key, value)
          end
        end

        queue << event
        # Overwritten on every row, so the last row's value is the one kept.
        latest_tracking_field_value = result[@tracking_field] unless @tracking_field.nil?
      end
    end

    unless @tracking_field_value_file.nil?
      if latest_tracking_field_value.nil?
        @logger.debug("No tracking field value found in result, not updating #{@tracking_field_value_file}")
      else
        # Interpolation instead of String#+: the tracking value is not
        # necessarily a String, and String#+ raises TypeError for anything else.
        @logger.debug("Writing latest tracking field value #{latest_tracking_field_value} to #{@tracking_field_value_file}")
        File.write(@tracking_field_value_file, latest_tracking_field_value.to_s)
      end
    end

    # An interval of -1 means "run the query once and finish".
    break if @interval == -1

    duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    # Sleep for the remainder of the interval, or 0 if the duration ran
    # longer than the interval.
    sleeptime = [0, @interval - duration].max
    if sleeptime == 0
      @logger.warn("Execution ran longer than the interval. Skipping sleep.",
                   :duration => duration, :interval => @interval)
    end
    Stud.stoppable_sleep(sleeptime) { stop? }
  end
end
Private Instance Methods
Source
# File lib/logstash/inputs/salesforce.rb, line 262
#
# Appends one condition to the WHERE-clause accumulator +where+, in place.
#
# The condition is preceded by "WHERE" when +where+ is still empty and by
# "AND" otherwise. Both pieces are pushed as one-element arrays; get_query
# flattens the whole query before joining it.
#
# Returns +where+.
def append_to_where_clause(changed_data_filter_interpolated, where)
  keyword = where.empty? ? "WHERE" : "AND"
  where.push([keyword], [changed_data_filter_interpolated])
end
Source
# File lib/logstash/inputs/salesforce.rb, line 202
#
# Returns the Restforce client, creating it on first use and caching it in
# @client. The client is built with Restforce.tooling when @use_tooling_api
# is set and with Restforce.new otherwise; client_options is only evaluated
# when a client actually has to be created.
def client
  @client ||= @use_tooling_api ? Restforce.tooling(client_options) : Restforce.new(client_options)
end
Source
# File lib/logstash/inputs/salesforce.rb, line 211
#
# Builds the options hash handed to Restforce.
#
# Always contains :username, :password, :security_token, :client_id,
# :client_secret and :timeout (the password, token and secret are unwrapped
# with #value). :host is added from @sfdc_instance_url, or set to
# "test.salesforce.com" when @use_test_sandbox is set; :api_version is added
# when @api_version is set.
#
# Raises LogStash::ConfigurationError when both @sfdc_instance_url and
# @use_test_sandbox are set.
def client_options
  options = {
    :username => @username,
    :password => @password.value,
    :security_token => @security_token.value,
    :client_id => @client_id,
    :client_secret => @client_secret.value,
    :timeout => @timeout
  }

  # The two settings both pick the authentication endpoint, so they conflict.
  if @sfdc_instance_url && @use_test_sandbox
    raise ::LogStash::ConfigurationError.new("Both \"use_test_sandbox\" and \"sfdc_instance_url\" can't be set simultaneously. Please specify either \"use_test_sandbox\" or \"sfdc_instance_url\"")
  end

  # Endpoint restforce connects to for authentication, if one was configured.
  host = @sfdc_instance_url || (@use_test_sandbox ? "test.salesforce.com" : nil)
  options[:host] = host if host
  options[:api_version] = @api_version if @api_version
  options
end
Source
# File lib/logstash/inputs/salesforce.rb, line 297
#
# Converts a Salesforce date/time string into a LogStash::Timestamp.
#
# Salesforce can use different time formats, so Time.parse is used for its
# leniency; should performance ever matter, a sequence of DateTime.strptime
# calls is a possible replacement.
def format_time(string)
  parsed = Time.parse(string)
  LogStash::Timestamp.new(parsed)
end
Source
# File lib/logstash/inputs/salesforce.rb, line 282
#
# Returns the names of all known fields, i.e. the keys of @sfdc_field_types.
def get_all_fields
  @sfdc_field_types.keys
end
Source
# File lib/logstash/inputs/salesforce.rb, line 272
#
# Builds a field-name => field-type hash from an sObject description.
#
# obj_desc - an object whose #fields yields entries responding to #name and
#            #type.
#
# The resulting map is logged at debug level (only when debug logging is
# enabled) and returned.
def get_field_types(obj_desc)
  field_types = obj_desc.fields.each_with_object({}) do |field, types|
    types[field.name] = field.type
  end
  @logger.debug("Field types", :field_types => field_types.to_s) if @logger.debug?
  field_types
end
Source
# File lib/logstash/inputs/salesforce.rb, line 233
#
# Builds the SOQL query string for the next poll.
#
# - SELECT list: @sfdc_fields, plus @tracking_field when one is configured
#   and not already listed (@sfdc_fields itself is not modified).
# - WHERE clause: @sfdc_filters when non-empty, AND-ed with
#   @changed_data_filter when that is configured and the tracking file
#   exists. The filter is a format string; %{last_tracking_field_value} is
#   replaced with the content of the tracking file.
# - ORDER BY @tracking_field ASC when a tracking field is configured.
#
# Returns the query as a single String and logs it at debug level.
def get_query
  sfdc_fields = @sfdc_fields.dup
  unless @tracking_field.nil? || sfdc_fields.include?(@tracking_field)
    sfdc_fields << @tracking_field
  end

  where = []
  append_to_where_clause(@sfdc_filters, where) unless @sfdc_filters.empty?

  # File.exist?(nil) raises TypeError, so the changed-data filter is skipped
  # when no tracking file is configured, as it is when the file does not
  # exist yet.
  unless @changed_data_filter.nil? || @tracking_field_value_file.nil?
    if File.exist?(@tracking_field_value_file)
      last_tracking_field_value = File.read(@tracking_field_value_file)
      changed_data_filter_interpolated =
        @changed_data_filter % { :last_tracking_field_value => last_tracking_field_value }
      append_to_where_clause(changed_data_filter_interpolated, where)
    end
  end

  # Nested arrays (including an empty WHERE) disappear in the flatten below.
  query = ["SELECT", sfdc_fields.join(','), "FROM", @sfdc_object_name, where]
  query << ["ORDER BY", @tracking_field, "ASC"] unless @tracking_field.nil?

  query_str = query.flatten.join(" ")
  @logger.debug("SFDC Query", :query => query_str) if @logger.debug?
  query_str
end
Source
# File lib/logstash/inputs/salesforce.rb, line 288
#
# Converts a CamelCase name to snake_case.
#
# The argument is stringified, "::" becomes "/", an underscore is inserted at
# each lower-to-upper (or digit-to-upper) boundary and before the last capital
# of an acronym that is followed by a lowercase letter, "-" becomes "_", and
# the result is lowercased.
def underscore(camel_cased_word)
  word = camel_cased_word.to_s.gsub(/::/, '/')
  # "HTTPServer" -> "HTTP_Server"
  word = word.gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
  # "lastModified" -> "last_Modified"
  word = word.gsub(/([a-z\d])([A-Z])/, '\1_\2')
  word.tr("-", "_").downcase
end