class Fluent::GoogleCloudOutput
Fluentd output plugin for the Stackdriver Logging API.
Constants
- GRPC_SEVERITY_MAPPING
- PLUGIN_NAME
- PLUGIN_VERSION
Follows semver.org format.
- SEVERITY_TRANSLATIONS
Translates other severity strings to one of the valid values above.
- VALID_SEVERITIES
Values permitted by the API for 'severity' (which is an enum).
Attributes
Expose attr_readers to make testing of metadata more direct than only testing it indirectly through metadata sent with logs.
Public Class Methods
# File lib/fluent/plugin/out_google_cloud.rb, line 436
def initialize
  super
  # use the global logger
  @log = $log # rubocop:disable Style/GlobalVars

  @failed_requests_count = nil
  @successful_requests_count = nil
  @dropped_entries_count = nil
  @ingested_entries_count = nil
  @retried_entries_count = nil
  @ok_code = nil
  @uptime_update_time = Time.now.to_i
end
# File lib/fluent/plugin/out_google_cloud.rb, line 811
def self.version_string
  @version_string ||= "google-fluentd/#{PLUGIN_VERSION}"
end
Public Instance Methods
# File lib/fluent/plugin/out_google_cloud.rb, line 451
def configure(conf)
  super

  # TODO(qingling128): Remove this warning after the support is added. Also
  # remove the comment in the description of this configuration.
  unless @logging_api_url == DEFAULT_LOGGING_API_URL || @use_grpc
    @log.warn 'Detected customized logging_api_url while use_grpc is not' \
              ' enabled. Customized logging_api_url for the non-gRPC path' \
              ' is not supported. The logging_api_url option will be' \
              ' ignored.'
  end

  # Alert on old authentication configuration.
  unless @auth_method.nil? && @private_key_email.nil? &&
         @private_key_path.nil? && @private_key_passphrase.nil?
    extra = []
    extra << 'auth_method' unless @auth_method.nil?
    extra << 'private_key_email' unless @private_key_email.nil?
    extra << 'private_key_path' unless @private_key_path.nil?
    extra << 'private_key_passphrase' unless @private_key_passphrase.nil?

    raise Fluent::ConfigError,
          "#{PLUGIN_NAME} no longer supports auth_method.\n" \
          "Please remove configuration parameters: #{extra.join(' ')}"
  end

  set_regexp_patterns

  @utils = Common::Utils.new(@log)
  @platform = @utils.detect_platform(@use_metadata_service)

  # Treat an empty setting of the credentials file path environment variable
  # as unset. This way the googleauth lib could fetch the credentials
  # following the fallback path.
  ENV.delete(CREDENTIALS_PATH_ENV_VAR) if ENV[CREDENTIALS_PATH_ENV_VAR] == ''

  # Set required variables: @project_id, @vm_id, @vm_name and @zone.
  @project_id = @utils.get_project_id(@platform, @project_id)
  @vm_id = @utils.get_vm_id(@platform, @vm_id)
  @vm_name = @utils.get_vm_name(@vm_name)
  @zone = @utils.get_location(@platform, @zone, @use_aws_availability_zone)

  # All metadata parameters must now be set.
  @utils.check_required_metadata_variables(
    @platform, @project_id, @zone, @vm_id
  )

  # Retrieve monitored resource.
  # Fail over to retrieve monitored resource via the legacy path if we fail
  # to get it from Metadata Agent.
  @resource ||= @utils.determine_agent_level_monitored_resource_via_legacy(
    @platform, @subservice_name, @detect_subservice, @vm_id, @zone
  )

  if @metrics_resource
    unless @metrics_resource[:type].is_a?(String)
      raise Fluent::ConfigError,
            'metrics_resource.type must be a string:' \
            " #{@metrics_resource}."
    end
    if @metrics_resource.key?(:labels)
      unless @metrics_resource[:labels].is_a?(Hash)
        raise Fluent::ConfigError,
              'metrics_resource.labels must be a hash:' \
              " #{@metrics_resource}."
      end
      extra_keys = @metrics_resource.reject do |k, _|
        %i[type labels].include?(k)
      end
      unless extra_keys.empty?
        raise Fluent::ConfigError,
              "metrics_resource has unrecognized keys: #{extra_keys.keys}."
      end
    else
      extra_keys = @metrics_resource.reject do |k, _|
        k == :type || k.to_s.start_with?('labels.')
      end
      unless extra_keys.empty?
        raise Fluent::ConfigError,
              "metrics_resource has unrecognized keys: #{extra_keys.keys}."
      end
      # Transform the Hash form of the metrics_resource config if necessary.
      resource_type = @metrics_resource[:type]
      resource_labels = @metrics_resource.each_with_object({}) \
        do |(k, v), h|
          h[k.to_s.sub('labels.', '')] = v if k.to_s.start_with? 'labels.'
        end
      @metrics_resource = { type: resource_type, labels: resource_labels }
    end
  end

  # If monitoring is enabled, register metrics in the default registry
  # and store metric objects for future use.
  if @enable_monitoring
    unless Monitoring::MonitoringRegistryFactory.supports_monitoring_type(
      @monitoring_type
    )
      @log.warn "monitoring_type '#{@monitoring_type}' is unknown; " \
                'there will be no metrics'
    end
    @monitoring_resource = if @metrics_resource
                             @utils.create_monitored_resource(
                               @metrics_resource[:type],
                               @metrics_resource[:labels]
                             )
                           else
                             @resource
                           end
    @registry = Monitoring::MonitoringRegistryFactory
                .create(@monitoring_type, @project_id,
                        @monitoring_resource, @gcm_service_address)
    # Export metrics every 60 seconds.
    timer_execute(:export_metrics, 60) { @registry.export }
    # Uptime should be a gauge, but the metric definition is a counter and
    # we can't change it.
    @uptime_metric = @registry.counter(
      :uptime, [:version], 'Uptime of Logging agent',
      'agent.googleapis.com/agent', 'CUMULATIVE'
    )
    update_uptime
    timer_execute(:update_uptime, 1) { update_uptime }
    @successful_requests_count = @registry.counter(
      :stackdriver_successful_requests_count,
      %i[grpc code],
      'A number of successful requests to the Stackdriver Logging API',
      'agent.googleapis.com/agent', 'CUMULATIVE'
    )
    @failed_requests_count = @registry.counter(
      :stackdriver_failed_requests_count,
      %i[grpc code],
      'A number of failed requests to the Stackdriver Logging ' \
      'API, broken down by the error code',
      'agent.googleapis.com/agent', 'CUMULATIVE'
    )
    @ingested_entries_count = @registry.counter(
      :stackdriver_ingested_entries_count,
      %i[grpc code],
      'A number of log entries ingested by Stackdriver Logging',
      'agent.googleapis.com/agent', 'CUMULATIVE'
    )
    @dropped_entries_count = @registry.counter(
      :stackdriver_dropped_entries_count,
      %i[grpc code],
      'A number of log entries dropped by the Stackdriver output plugin',
      'agent.googleapis.com/agent', 'CUMULATIVE'
    )
    @retried_entries_count = @registry.counter(
      :stackdriver_retried_entries_count,
      %i[grpc code],
      'The number of log entries that failed to be ingested by ' \
      'the Stackdriver output plugin due to a transient error ' \
      'and were retried',
      'agent.googleapis.com/agent', 'CUMULATIVE'
    )
    @ok_code = @use_grpc ? GRPC::Core::StatusCodes::OK : 200
  end

  # Set regexp that we should match tags against later on. Using a list
  # instead of a map to ensure order.
  @tag_regexp_list = []
  if @resource.type == GKE_CONSTANTS[:resource_type]
    @tag_regexp_list << [
      GKE_CONSTANTS[:resource_type], @compiled_kubernetes_tag_regexp
    ]
  end

  # Determine the common labels that should be added to all log entries
  # processed by this logging agent.
  @common_labels = determine_agent_level_common_labels(@resource)

  # The resource and labels are now set up; ensure they can't be modified
  # without first duping them.
  @resource.freeze
  @resource.labels.freeze
  @common_labels.freeze

  if @use_grpc
    @construct_log_entry = method(:construct_log_entry_in_grpc_format)
    @write_request = method(:write_request_via_grpc)
  else
    @construct_log_entry = method(:construct_log_entry_in_rest_format)
    @write_request = method(:write_request_via_rest)
  end

  return unless [Common::Platform::GCE,
                 Common::Platform::EC2].include?(@platform)

  # Log an informational message containing the Logs viewer URL
  @log.info 'Logs viewer address: https://console.cloud.google.com/logs/',
            "viewer?project=#{@project_id}&resource=#{@resource.type}/",
            "instance_id/#{@vm_id}"
end
# File lib/fluent/plugin/out_google_cloud.rb, line 807
def multi_workers_ready?
  true
end
# File lib/fluent/plugin/out_google_cloud.rb, line 664
def shutdown
  super
  # Export metrics on shutdown. This is a best-effort attempt, and it might
  # fail, for instance if there was a recent write to the same time series.
  @registry&.export
end
# File lib/fluent/plugin/out_google_cloud.rb, line 644
def start
  super
  init_api_client
  @successful_call = false
  @timenanos_warning = false

  return unless @statusz_port.positive?

  @log.info "Starting statusz server on port #{@statusz_port}"
  server_create(:out_google_cloud_statusz,
                @statusz_port,
                bind: '127.0.0.1') do |data, conn|
    if data.split(' ')[1] == '/statusz'
      write_html_response(data, conn, 200, Statusz.response(self))
    else
      write_html_response(data, conn, 404, "Not found\n")
    end
  end
end
# File lib/fluent/plugin/out_google_cloud.rb, line 815
def update_uptime
  now = Time.now.to_i
  @uptime_metric.increment(
    by: now - @uptime_update_time,
    labels: { version: Fluent::GoogleCloudOutput.version_string }
  )
  @uptime_update_time = now
end
# File lib/fluent/plugin/out_google_cloud.rb, line 671
def write(chunk)
  grouped_entries = group_log_entries_by_tag_and_local_resource_id(chunk)

  requests_to_send = []
  grouped_entries.each do |(tag, local_resource_id), arr|
    entries = []
    group_level_resource, group_level_common_labels =
      determine_group_level_monitored_resource_and_labels(
        tag, local_resource_id
      )

    arr.each do |time, record|
      entry_level_resource, entry_level_common_labels =
        determine_entry_level_monitored_resource_and_labels(
          group_level_resource, group_level_common_labels, record
        )

      is_json = false
      if @detect_json
        # Save the following fields if available, then clear them out to
        # allow for determining whether we should parse the log or message
        # field.
        # This list should be in sync with
        # https://cloud.google.com/logging/docs/agent/configuration#special-fields.
        preserved_keys = [
          'time',
          'timeNanos',
          'timestamp',
          'timestampNanos',
          'timestampSeconds',
          'severity',
          @http_request_key,
          @insert_id_key,
          @labels_key,
          @operation_key,
          @source_location_key,
          @span_id_key,
          @trace_key,
          @trace_sampled_key
        ]

        # If the log is json, we want to export it as a structured log
        # unless there is additional metadata that would be lost.
        record_json = nil
        if (record.keys - preserved_keys).length == 1
          %w[log message msg].each do |field|
            record_json = parse_json_or_nil(record[field]) if record.key?(field)
          end
        end
        unless record_json.nil?
          # Propagate these if necessary. Note that we don't want to
          # override these keys in the JSON we've just parsed.
          preserved_keys.each do |key|
            record_json[key] ||= record[key] if
              record.key?(key) && !record_json.key?(key)
          end

          record = record_json
          is_json = true
        end
      end

      ts_secs, ts_nanos, timestamp = compute_timestamp(record, time)
      ts_secs, ts_nanos = adjust_timestamp_if_invalid(timestamp, Time.now) \
        if @adjust_invalid_timestamps && timestamp

      severity = compute_severity(
        entry_level_resource.type, record, entry_level_common_labels
      )

      dynamic_labels_from_payload = parse_labels(record)
      if dynamic_labels_from_payload
        entry_level_common_labels.merge!(dynamic_labels_from_payload)
      end

      entry = @construct_log_entry.call(entry_level_common_labels,
                                        entry_level_resource,
                                        severity,
                                        ts_secs,
                                        ts_nanos)

      insert_id = record.delete(@insert_id_key)
      entry.insert_id = insert_id if insert_id
      span_id = record.delete(@span_id_key)
      entry.span_id = span_id if span_id
      trace = record.delete(@trace_key)
      entry.trace = compute_trace(trace) if trace
      trace_sampled = record.delete(@trace_sampled_key)
      entry.trace_sampled = parse_bool(trace_sampled) unless trace_sampled.nil?

      set_log_entry_fields(record, entry)
      set_payload(entry_level_resource.type, record, entry, is_json)

      entries.push(entry)
    end
    # Don't send an empty request if we rejected all the entries.
    next if entries.empty?

    log_name = "projects/#{@project_id}/logs/#{log_name(
      tag, group_level_resource
    )}"

    requests_to_send << {
      entries: entries,
      log_name: log_name,
      resource: group_level_resource,
      labels: group_level_common_labels
    }
  end

  if @split_logs_by_tag
    requests_to_send.each do |request|
      @write_request.call(**request)
    end
  else
    # Combine all requests into one. The request level "log_name" will be
    # ported to the entry level. The request level "resource" and "labels"
    # are ignored as they should have been folded into the entry level
    # "resource" and "labels" already anyway.
    combined_entries = []
    requests_to_send.each do |request|
      request[:entries].each do |entry|
        # Modify entries in-place as they are not needed later on.
        entry.log_name = request[:log_name]
      end
      combined_entries.concat(request[:entries])
    end
    @write_request.call(entries: combined_entries) unless combined_entries.empty?
  end
end
Private Instance Methods
Adjust timestamps from the future. The base case is:

- The parsed timestamp is less than one day into the future. This is allowed by the API and is left unchanged.

Beyond that, there are two cases:

- The parsed timestamp is later in the current year: this can happen when system log lines from previous years are missing the year, so the date parser assumes the current year. We treat these lines as coming from last year. This could mislabel logs that are more than a year old, but such logs are rare enough that the inaccuracy is acceptable.

- The parsed timestamp is past the end of the current year: since the year differs from the current year, this is not the missing-year case above, and it is unlikely that users explicitly write logs at a future date. It more likely results from an unsynchronized clock on a VM, or from some random value being parsed as the timestamp. We reset the timestamp on such lines to the default value and let the downstream API handle it.
# File lib/fluent/plugin/out_google_cloud.rb, line 1427
def adjust_timestamp_if_invalid(timestamp, current_time)
  ts_secs = timestamp.tv_sec
  ts_nanos = timestamp.tv_nsec

  next_year = Time.mktime(current_time.year + 1)
  one_day_later = current_time.to_datetime.next_day.to_time
  if timestamp < one_day_later # Case 0.
    # Leave the timestamp as-is.
  elsif timestamp >= next_year # Case 2.
    ts_secs = 0
    ts_nanos = 0
  else # Case 1.
    adjusted_timestamp = timestamp.to_datetime.prev_year.to_time
    ts_secs = adjusted_timestamp.tv_sec
    # The value of ts_nanos should not change when subtracting a year.
  end

  [ts_secs, ts_nanos]
end
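The three cases can be illustrated with a quick sketch (the dates are made up for the example, and exact boundaries depend on the local time zone used by Time.mktime):

now = Time.utc(2020, 6, 15)

# Case 0: less than one day in the future -- left unchanged.
adjust_timestamp_if_invalid(Time.utc(2020, 6, 15, 12), now)
# => the original [tv_sec, tv_nsec]

# Case 1: later in the current year -- treated as last year.
adjust_timestamp_if_invalid(Time.utc(2020, 11, 1), now)
# => seconds for 2019-11-01; nanos unchanged

# Case 2: past the end of the current year -- reset to the default.
adjust_timestamp_if_invalid(Time.utc(2027, 1, 1), now)
# => [0, 0]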
# File lib/fluent/plugin/out_google_cloud.rb, line 1853
def api_client
  # For gRPC side, the Channel will take care of tokens and their renewal
  # (https://grpc.io/docs/guides/auth.html#authentication-api).
  if !@use_grpc && @client.authorization.expired?
    begin
      @client.authorization.fetch_access_token!
    rescue MultiJson::ParseError
      # Workaround an issue in the API client; just re-raise a more
      # descriptive error for the user (which will still cause a retry).
      raise Google::APIClient::ClientError,
            'Unable to fetch access token (no scopes configured?)'
    end
  end
  @client
end
# File lib/fluent/plugin/out_google_cloud.rb, line 1447
def compute_severity(resource_type, record, entry_level_common_labels)
  if record.key?('severity')
    return parse_severity(record.delete('severity'))
  elsif resource_type == GKE_CONSTANTS[:resource_type]
    stream = entry_level_common_labels["#{GKE_CONSTANTS[:service]}/stream"]
    return GKE_CONSTANTS[:stream_severity_map].fetch(stream, 'DEFAULT')
  end

  'DEFAULT'
end
# File lib/fluent/plugin/out_google_cloud.rb, line 1353
def compute_timestamp(record, time)
  if record.key?('timestamp') &&
     record['timestamp'].is_a?(Hash) &&
     record['timestamp'].key?('seconds') &&
     record['timestamp'].key?('nanos')
    ts_secs = record['timestamp']['seconds']
    ts_nanos = record['timestamp']['nanos']
    record.delete('timestamp')
    timestamp = time_or_nil(ts_secs, ts_nanos)
  elsif record.key?('timestampSeconds') && record.key?('timestampNanos')
    ts_secs = record.delete('timestampSeconds')
    ts_nanos = record.delete('timestampNanos')
    timestamp = time_or_nil(ts_secs, ts_nanos)
  elsif record.key?('timeNanos')
    # This is deprecated since the precision is insufficient.
    # Use timestampSeconds/timestampNanos instead
    nanos = record.delete('timeNanos')
    ts_secs = (nanos / 1_000_000_000).to_i
    ts_nanos = nanos % 1_000_000_000
    unless @timenanos_warning
      # Warn the user this is deprecated, but only once to avoid spam.
      @timenanos_warning = true
      @log.warn 'timeNanos is deprecated - please use ' \
                'timestampSeconds and timestampNanos instead.'
    end
    timestamp = time_or_nil(ts_secs, ts_nanos)
  elsif record.key?('time')
    # k8s ISO8601 timestamp
    begin
      timestamp = Time.iso8601(record.delete('time'))
    rescue StandardError
      timestamp = Time.at(time)
    end
    ts_secs = timestamp.tv_sec
    ts_nanos = timestamp.tv_nsec
  else
    timestamp = Time.at(time)
    ts_secs = timestamp.tv_sec
    ts_nanos = timestamp.tv_nsec
  end
  ts_secs = begin
    Integer ts_secs
  rescue ArgumentError, TypeError
    ts_secs
  end
  ts_nanos = begin
    Integer ts_nanos
  rescue ArgumentError, TypeError
    ts_nanos
  end

  [ts_secs, ts_nanos, timestamp]
end
# File lib/fluent/plugin/out_google_cloud.rb, line 837
def compute_trace(trace)
  return trace unless @autoformat_stackdriver_trace &&
                      STACKDRIVER_TRACE_ID_REGEXP.match(trace)

  "projects/#{@project_id}/traces/#{trace}"
end
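For example, assuming autoformat_stackdriver_trace is enabled, @project_id is 'my-project' (an illustrative value), and STACKDRIVER_TRACE_ID_REGEXP matches a bare hex trace id:

compute_trace('0123456789abcdef0123456789abcdef')
# => "projects/my-project/traces/0123456789abcdef0123456789abcdef"

# Values that don't match the regexp pass through unchanged:
compute_trace('projects/other/traces/0123456789abcdef0123456789abcdef')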
Extract a map of error details from a potentially partially successful REST request.
The keys in this map are [error_code, error_message] pairs, and the values are a list of stringified indexes of log entries that failed due to this error.
A sample error.body looks like:

{
  "error": {
    "code": 403,
    "message": "User not authorized.",
    "status": "PERMISSION_DENIED",
    "details": [
      {
        "@type": "type.googleapis.com/google.logging.v2.WriteLogEntriesPartialErrors",
        "logEntryErrors": {
          "0": { "code": 7, "message": "User not authorized." },
          "1": { "code": 3, "message": "Log name contains illegal character :" },
          "3": { "code": 3, "message": "Log name contains illegal character :" }
        }
      },
      {
        "@type": "type.googleapis.com/google.rpc.DebugInfo",
        "detail": ...
      }
    ]
  }
}

The root level "code", "message", and "status" simply match the root cause of the first failed log entry. For example, if we switched the order of the log entries, we would get:

{
  "error": {
    "code": 400,
    "message": "Log name contains illegal character :",
    "status": "INVALID_ARGUMENT",
    "details": ...
  }
}

We ignore these root-level fields anyway and look at the details instead, which include info for all failed log entries.

In this example, the logEntryErrors we care about are:

{
  "0": { "code": 7, "message": "User not authorized." },
  "1": { "code": 3, "message": "Log name contains illegal character :" },
  "3": { "code": 3, "message": "Log name contains illegal character :" }
}

The ultimate map that is constructed is:

{
  [7, 'User not authorized.']: ['0'],
  [3, 'Log name contains illegal character :']: ['1', '3']
}
# File lib/fluent/plugin/out_google_cloud.rb, line 1969
def construct_error_details_map(error)
  error_details_map = Hash.new { |h, k| h[k] = [] }

  error_details = ensure_array(
    ensure_hash(ensure_hash(JSON.parse(error.body))['error'])['details']
  )

  partial_errors = error_details.detect(
    -> { raise JSON::ParserError, "No type #{PARTIAL_ERROR_FIELD}." }
  ) do |error_detail|
    ensure_hash(error_detail)['@type'] == PARTIAL_ERROR_FIELD
  end

  log_entry_errors = ensure_hash(
    ensure_hash(partial_errors)['logEntryErrors']
  )

  log_entry_errors.each do |index, log_entry_error|
    error_hash = ensure_hash(log_entry_error)
    unless error_hash['code'] && error_hash['message']
      raise JSON::ParserError,
            "Entry #{index} is missing 'code' or 'message'."
    end

    error_key = [error_hash['code'], error_hash['message']].freeze
    # TODO(qingling128): Convert indexes to integers.
    error_details_map[error_key] << index
  end
  error_details_map
rescue JSON::ParserError => e
  @log.warn 'Failed to extract log entry errors from the error details:' \
            " #{error.body}.", error: e
  {}
end
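A minimal sketch of the method applied to the sample body above (the Struct stands in for the real error object, which only needs to respond to body):

require 'json'

FakeError = Struct.new(:body)
error = FakeError.new(<<~JSON)
  {"error": {"code": 403, "message": "User not authorized.",
   "status": "PERMISSION_DENIED", "details": [{
     "@type": "type.googleapis.com/google.logging.v2.WriteLogEntriesPartialErrors",
     "logEntryErrors": {
       "0": {"code": 7, "message": "User not authorized."},
       "1": {"code": 3, "message": "Log name contains illegal character :"},
       "3": {"code": 3, "message": "Log name contains illegal character :"}}}]}}
JSON

construct_error_details_map(error)
# => { [7, "User not authorized."] => ["0"],
#      [3, "Log name contains illegal character :"] => ["1", "3"] }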
Extract a map of error details from a potentially partially successful gRPC request.
The keys in this map are [error_code, error_message] pairs, and the values are a list of indexes of log entries that failed due to this error.
A sample error looks like:

<Google::Cloud::PermissionDeniedError:
  message: 'User not authorized.',
  details: [
    <Google::Cloud::Logging::V2::WriteLogEntriesPartialErrors:
      log_entry_errors: {
        0 => <Google::Rpc::Status:
               code: 7,
               message: "User not authorized.",
               details: []>,
        1 => <Google::Rpc::Status:
               code: 3,
               message: "Log name contains illegal character :",
               details: []>,
        3 => <Google::Rpc::Status:
               code: 3,
               message: "Log name contains illegal character :",
               details: []>
      }
    >,
    <Google::Rpc::DebugInfo:
      stack_entries: [],
      detail: "..."
    >
  ]
  cause: <GRPC::PermissionDenied: 7:User not authorized.>
>

The ultimate map that is constructed is:

{
  [7, 'User not authorized.']: [1, 3]
  [3, 'Log name contains illegal character :']: [1, 3]
}
# File lib/fluent/plugin/out_google_cloud.rb, line 2039
def construct_error_details_map_grpc(gax_error)
  @log.error "construct_error_details_map_grpc: #{gax_error}"
  error_details_map = Hash.new { |h, k| h[k] = [] }
  error_details = ensure_array(gax_error.status_details)
  raise JSON::ParserError, 'The error details are empty.' if
    error_details.empty?
  raise JSON::ParserError, 'No partial error info in error details.' unless
    error_details[0].is_a?(
      Google::Cloud::Logging::V2::WriteLogEntriesPartialErrors
    )

  log_entry_errors = ensure_hash(error_details[0].log_entry_errors)
  log_entry_errors.each do |index, log_entry_error|
    error_key = [log_entry_error[:code], log_entry_error[:message]].freeze
    error_details_map[error_key] << index
  end
  error_details_map
rescue JSON::ParserError => e
  @log.warn 'Failed to extract log entry errors from the error details:' \
            " #{gax_error.details.inspect}.", error: e
  {}
end
# File lib/fluent/plugin/out_google_cloud.rb, line 844
def construct_log_entry_in_grpc_format(labels, resource, severity,
                                       ts_secs, ts_nanos)
  entry = Google::Cloud::Logging::V2::LogEntry.new(
    labels: labels,
    resource: Google::Api::MonitoredResource.new(
      type: resource.type,
      labels: resource.labels.to_h
    ),
    severity: grpc_severity(severity)
  )
  # If "seconds" is null or not an integer, we will omit the timestamp
  # field and defer the decision on how to handle it to the downstream
  # Logging API. If "nanos" is null or not an integer, it will be set
  # to 0.
  if ts_secs.is_a?(Integer)
    ts_nanos = 0 unless ts_nanos.is_a?(Integer)
    entry.timestamp = Google::Protobuf::Timestamp.new(
      seconds: ts_secs,
      nanos: ts_nanos
    )
  end
  entry
end
# File lib/fluent/plugin/out_google_cloud.rb, line 871
def construct_log_entry_in_rest_format(labels, resource, severity,
                                       ts_secs, ts_nanos)
  # Remove the labels if we didn't populate them with anything.
  resource.labels = nil if resource.labels.empty?
  Google::Apis::LoggingV2::LogEntry.new(
    labels: labels,
    resource: resource,
    severity: severity,
    timestamp: {
      seconds: ts_secs,
      nanos: ts_nanos
    }
  )
end
Encode as UTF-8. If 'coerce_to_utf8' is set to true in the config, any non-UTF-8 character is replaced by the string specified by 'non_utf8_replacement_string'. If 'coerce_to_utf8' is set to false, any non-UTF-8 character causes the plugin to error out.
# File lib/fluent/plugin/out_google_cloud.rb, line 1873
def convert_to_utf8(input)
  if @coerce_to_utf8
    input.encode(
      'utf-8',
      invalid: :replace,
      undef: :replace,
      replace: @non_utf8_replacement_string
    )
  else
    begin
      input.encode('utf-8')
    rescue EncodingError
      @log.error 'Encountered encoding issues potentially due to non ' \
                 'UTF-8 characters. To allow non-UTF-8 characters and ' \
                 'replace them with spaces, please set "coerce_to_utf8" ' \
                 'to true.'
      raise
    end
  end
end
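For instance, assuming coerce_to_utf8 is true and non_utf8_replacement_string is a single space:

convert_to_utf8('plain ascii')  # => "plain ascii" (unchanged)
convert_to_utf8("caf\xE9")      # invalid UTF-8 byte replaced => "caf "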
For every original_label => new_label pair in the label_map, delete the original_label from the hash map if it exists, and extract the value to form a map with the new_label as the key.
# File lib/fluent/plugin/out_google_cloud.rb, line 1697
def delete_and_extract_labels(hash, label_map)
  return {} if label_map.nil? || !label_map.is_a?(Hash) ||
               hash.nil? || !hash.is_a?(Hash)

  label_map.each_with_object({}) \
    do |(original_label, new_label), extracted_labels|
      value = hash.delete(original_label)
      extracted_labels[new_label] = convert_to_utf8(value.to_s) if value
    end
end
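For example, using the GKE stream mapping that appears in determine_entry_level_monitored_resource_and_labels (the service prefix container.googleapis.com is assumed here):

record = { 'stream' => 'stdout', 'log' => 'hello' }
delete_and_extract_labels(record,
                          'stream' => 'container.googleapis.com/stream')
# => { 'container.googleapis.com/stream' => 'stdout' }
# record is now { 'log' => 'hello' }: the original key was deleted.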
Determine the common labels that should be added to all log entries processed by this logging agent.
# File lib/fluent/plugin/out_google_cloud.rb, line 1117
def determine_agent_level_common_labels(resource)
  labels = {}
  # User can specify labels via config. We want to capture those as well.
  labels.merge!(@labels) if @labels

  case resource.type
  # GAE, Cloud Dataflow, Cloud Dataproc and Cloud ML.
  when APPENGINE_CONSTANTS[:resource_type],
       DATAFLOW_CONSTANTS[:resource_type],
       DATAPROC_CONSTANTS[:resource_type],
       ML_CONSTANTS[:resource_type]
    labels.merge!(
      "#{COMPUTE_CONSTANTS[:service]}/resource_id" => @vm_id,
      "#{COMPUTE_CONSTANTS[:service]}/resource_name" => @vm_name,
      "#{COMPUTE_CONSTANTS[:service]}/zone" => @zone
    )
  # GCE instance and GKE container.
  when COMPUTE_CONSTANTS[:resource_type],
       GKE_CONSTANTS[:resource_type]
    labels["#{COMPUTE_CONSTANTS[:service]}/resource_name"] = @vm_name
  # EC2.
  when EC2_CONSTANTS[:resource_type]
    labels["#{EC2_CONSTANTS[:service]}/resource_name"] = @vm_name
  end
  labels
end
Extract entry level monitored resource and common labels that should be applied to individual entries.
# File lib/fluent/plugin/out_google_cloud.rb, line 1274
def determine_entry_level_monitored_resource_and_labels(
  group_level_resource, group_level_common_labels, record
)
  resource = group_level_resource.dup
  resource.labels = group_level_resource.labels.dup
  common_labels = group_level_common_labels.dup

  case resource.type
  # GKE container.
  when GKE_CONSTANTS[:resource_type]
    # Move the stdout/stderr annotation from the record into a label.
    common_labels.merge!(
      delete_and_extract_labels(
        record, 'stream' => "#{GKE_CONSTANTS[:service]}/stream"
      )
    )
    # If the record has been annotated by the kubernetes_metadata_filter
    # plugin, then use that metadata. Otherwise, rely on commonLabels
    # populated from the group's tag.
    if record.key?('kubernetes')
      resource.labels.merge!(
        delete_and_extract_labels(
          record['kubernetes'],
          GKE_CONSTANTS[:extra_resource_labels]
            .map { |l| [l, l] }.to_h
        )
      )
      common_labels.merge!(
        delete_and_extract_labels(
          record['kubernetes'],
          GKE_CONSTANTS[:extra_common_labels]
            .map { |l| [l, "#{GKE_CONSTANTS[:service]}/#{l}"] }.to_h
        )
      )
      # Prepend label/ to all user-defined labels' keys.
      if record['kubernetes'].key?('labels')
        common_labels.merge!(
          delete_and_extract_labels(
            record['kubernetes']['labels'],
            record['kubernetes']['labels']
              .map { |key, _| [key, "label/#{key}"] }.to_h
          )
        )
      end
      # We've explicitly consumed all the fields we care about -- don't
      # litter the log entries with the remaining fields that the kubernetes
      # metadata filter plugin includes (or an empty 'kubernetes' field).
      record.delete('kubernetes')
      record.delete('docker')
    end
  end

  # If the name of a field in the record is present in the @label_map
  # configured by users, report its value as a label and do not send that
  # field as part of the payload.
  common_labels.merge!(delete_and_extract_labels(record, @label_map))

  # Cloud Dataflow and Cloud ML.
  # These labels can be set via the 'labels' or 'label_map' options.
  # Report them as monitored resource labels instead of common labels.
  # e.g. "dataflow.googleapis.com/job_id" => "job_id"
  [DATAFLOW_CONSTANTS, ML_CONSTANTS].each do |service_constants|
    next unless resource.type == service_constants[:resource_type]

    resource.labels.merge!(
      delete_and_extract_labels(
        common_labels,
        service_constants[:extra_resource_labels]
          .map { |l| ["#{service_constants[:service]}/#{l}", l] }.to_h
      )
    )
  end

  [resource, common_labels]
end
Determine the group level monitored resource and common labels shared by a collection of entries.
# File lib/fluent/plugin/out_google_cloud.rb, line 1174
def determine_group_level_monitored_resource_and_labels(tag,
                                                        local_resource_id)
  resource = @resource.dup
  resource.labels = @resource.labels.dup
  common_labels = @common_labels.dup

  # Change the resource type and set matched_regexp_group if the tag matches
  # certain regexp.
  matched_regexp_group = nil # @tag_regexp_list can be an empty list.
  @tag_regexp_list.each do |derived_type, tag_regexp|
    matched_regexp_group = tag_regexp.match(tag)
    if matched_regexp_group
      resource.type = derived_type
      break
    end
  end

  # Determine the monitored resource based on the local_resource_id.
  # Different monitored resource types have unique ids in different format.
  # We will query Metadata Agent for the monitored resource. Return the
  # legacy monitored resource (either the instance resource or the resource
  # inferred from the tag) if failed to get a monitored resource from
  # Metadata Agent with this key.
  #
  # Examples:
  # // GKE Pod.
  # "k8s_pod.<namespace_name>.<pod_name>"
  # // GKE container.
  # "k8s_container.<namespace_name>.<pod_name>.<container_name>"
  if local_resource_id
    converted_resource = monitored_resource_from_local_resource_id(
      local_resource_id
    )
    resource = converted_resource if converted_resource
  end

  # Once the resource type is settled down, determine the labels.
  case resource.type
  # GKE container.
  when GKE_CONSTANTS[:resource_type]
    if matched_regexp_group
      # We only expect one occurrence of each key in the match group.
      resource_labels_candidates =
        matched_regexp_group.names.zip(matched_regexp_group.captures).to_h
      common_labels_candidates = resource_labels_candidates.dup
      resource.labels.merge!(
        delete_and_extract_labels(
          resource_labels_candidates,
          # The kubernetes_tag_regexp is poorly named. 'namespace_name' is
          # in fact 'namespace_id'. 'pod_name' is in fact 'pod_id'.
          # TODO(qingling128): Figure out how to put this map into
          # constants like GKE_CONSTANTS[:extra_resource_labels].
          'container_name' => 'container_name',
          'namespace_name' => 'namespace_id',
          'pod_name' => 'pod_id'
        )
      )
      common_labels.merge!(
        delete_and_extract_labels(
          common_labels_candidates,
          GKE_CONSTANTS[:extra_common_labels]
            .map { |l| [l, "#{GKE_CONSTANTS[:service]}/#{l}"] }.to_h
        )
      )
    end
  # TODO(qingling128): Temporary fallback for metadata agent restarts.
  # K8s resources.
  when K8S_CONTAINER_CONSTANTS[:resource_type],
       K8S_POD_CONSTANTS[:resource_type],
       K8S_NODE_CONSTANTS[:resource_type]
    common_labels.delete("#{COMPUTE_CONSTANTS[:service]}/resource_name")
  end

  # Cloud Dataflow and Cloud ML.
  # These labels can be set via the 'labels' option.
  # Report them as monitored resource labels instead of common labels.
  # e.g. "dataflow.googleapis.com/job_id" => "job_id"
  [DATAFLOW_CONSTANTS, ML_CONSTANTS].each do |service_constants|
    next unless resource.type == service_constants[:resource_type]

    resource.labels.merge!(
      delete_and_extract_labels(
        common_labels,
        service_constants[:extra_resource_labels]
          .map { |l| ["#{service_constants[:service]}/#{l}", l] }.to_h
      )
    )
  end

  resource.freeze
  resource.labels.freeze
  common_labels.freeze

  [resource, common_labels]
end
Convert the value to a Ruby array.
# File lib/fluent/plugin/out_google_cloud.rb, line 2136
def ensure_array(value)
  Array.try_convert(value) || (raise JSON::ParserError, value.class.to_s)
end
Convert the value to a Ruby hash.
# File lib/fluent/plugin/out_google_cloud.rb, line 2141
def ensure_hash(value)
  Hash.try_convert(value) || (raise JSON::ParserError, value.class.to_s)
end
# File lib/fluent/plugin/out_google_cloud.rb, line 1671
def format(tag, time, record)
  Fluent::MessagePackFactory
    .engine_factory
    .packer
    .write([tag, time, record])
    .to_s
end
Group the log entries by tag and local_resource_id pairs. Also filter out invalid non-Hash entries.
# File lib/fluent/plugin/out_google_cloud.rb, line 1148
def group_log_entries_by_tag_and_local_resource_id(chunk)
  groups = {}
  chunk.msgpack_each do |tag, time, record|
    unless record.is_a?(Hash)
      @log.warn 'Dropping log entries with malformed record: ' \
                "'#{record.inspect}' from tag '#{tag}' at '#{time}'. " \
                'A log record should be in JSON format.'
      next
    end
    sanitized_tag = sanitize_tag(tag)
    if sanitized_tag.nil?
      @log.warn "Dropping log entries with invalid tag: '#{tag.inspect}'." \
                ' A tag should be a string with utf8 characters.'
      next
    end
    local_resource_id = record.delete(LOCAL_RESOURCE_ID_KEY)
    # A nil local_resource_id means "fall back to legacy".
    hash_key = [sanitized_tag, local_resource_id].freeze
    groups[hash_key] ||= []
    groups[hash_key].push([time, record])
  end
  groups
end
# File lib/fluent/plugin/out_google_cloud.rb, line 1622
def grpc_severity(severity)
  # TODO: find out why this doesn't work.
  # if severity.is_a? String
  #   return Google::Cloud::Logging::Type::LogSeverity.resolve(severity)
  # end
  return GRPC_SEVERITY_MAPPING[severity] if GRPC_SEVERITY_MAPPING.key?(severity)

  severity
end
Increment the metric for the number of log entries that were dropped and not ingested by the Stackdriver Logging API.
# File lib/fluent/plugin/out_google_cloud.rb, line 2176
def increment_dropped_entries_count(count, code)
  return unless @dropped_entries_count

  @dropped_entries_count.increment(
    labels: { grpc: @use_grpc, code: code },
    by: count
  )
end
Increment the metric for the number of failed requests, labeled by the provided status code.
# File lib/fluent/plugin/out_google_cloud.rb, line 2156
def increment_failed_requests_count(code)
  return unless @failed_requests_count

  @failed_requests_count.increment(
    labels: { grpc: @use_grpc, code: code }
  )
end
Increment the metric for the number of log entries successfully ingested by the Stackdriver Logging API.
# File lib/fluent/plugin/out_google_cloud.rb, line 2166
def increment_ingested_entries_count(count)
  return unless @ingested_entries_count

  @ingested_entries_count.increment(
    labels: { grpc: @use_grpc, code: @ok_code },
    by: count
  )
end
Increment the metric for the number of log entries that failed to be ingested by the Stackdriver Logging API due to a transient error and were retried.
# File lib/fluent/plugin/out_google_cloud.rb, line 2186
def increment_retried_entries_count(count, code)
  return unless @retried_entries_count

  @retried_entries_count.increment(
    labels: { grpc: @use_grpc, code: code },
    by: count
  )
end
Increment the metric for the number of successful requests.
# File lib/fluent/plugin/out_google_cloud.rb, line 2146
def increment_successful_requests_count
  return unless @successful_requests_count

  @successful_requests_count.increment(
    labels: { grpc: @use_grpc, code: @ok_code }
  )
end
# File lib/fluent/plugin/out_google_cloud.rb, line 1803
def init_api_client
  # Set up the logger for the auto-generated Google Cloud APIs.
  Google::Apis.logger = @log

  if @use_grpc
    uri = URI.parse(@logging_api_url)
    host = uri.host
    unless host
      raise Fluent::ConfigError,
            'The logging_api_url option specifies an invalid URL:' \
            " #{@logging_api_url}."
    end
    if @grpc_compression_algorithm
      compression_options = GRPC::Core::CompressionOptions.new(
        default_algorithm: @grpc_compression_algorithm
      )
      compression_channel_args = compression_options.to_channel_arg_hash
    else
      compression_channel_args = {}
    end
    if uri.scheme == 'https'
      ssl_creds = GRPC::Core::ChannelCredentials.new
      authentication = Google::Auth.get_application_default
      creds = GRPC::Core::CallCredentials.new(authentication.updater_proc)
      creds = ssl_creds.compose(creds)
    else
      creds = :this_channel_is_insecure
    end
    port = ":#{uri.port}" if uri.port
    user_agent = \
      "#{PLUGIN_NAME}/#{PLUGIN_VERSION} grpc-ruby/#{GRPC::VERSION} " \
      "#{Google::Apis::OS_VERSION}"
    channel_args = { 'grpc.primary_user_agent' => user_agent }
                   .merge!(compression_channel_args)
    @client = Google::Cloud::Logging::V2::LoggingService::Client.new do |config|
      config.credentials = GRPC::Core::Channel.new(
        "#{host}#{port}", channel_args, creds
      )
    end
  else
    # TODO: Use a non-default ClientOptions object.
    Google::Apis::ClientOptions.default.application_name = PLUGIN_NAME
    Google::Apis::ClientOptions.default.application_version = PLUGIN_VERSION
    @client = Google::Apis::LoggingV2::LoggingService.new
    @client.authorization = Google::Auth.get_application_default(
      Common::LOGGING_SCOPE
    )
  end
end
# File lib/fluent/plugin/out_google_cloud.rb, line 1736
def list_from_ruby(arr)
  ret = Google::Protobuf::ListValue.new
  arr.each do |v|
    ret.values << value_from_ruby(v)
  end
  ret
end
# File lib/fluent/plugin/out_google_cloud.rb, line 1788
def log_name(tag, resource)
  if resource.type == APPENGINE_CONSTANTS[:resource_type]
    # Add a prefix to Managed VM logs to prevent namespace collisions.
    tag = "#{APPENGINE_CONSTANTS[:service]}/#{tag}"
  elsif resource.type == GKE_CONSTANTS[:resource_type]
    # For Kubernetes logs, use just the container name as the log name
    # if we have it.
    if resource.labels&.key?('container_name')
      sanitized_tag = sanitize_tag(resource.labels['container_name'])
      tag = sanitized_tag unless sanitized_tag.nil?
    end
  end
  ERB::Util.url_encode(tag)
end
Take a locally unique resource id and convert it to the globally unique monitored resource.
# File lib/fluent/plugin/out_google_cloud.rb, line 2064
def monitored_resource_from_local_resource_id(local_resource_id)
  return unless
    /^
     (?<resource_type>k8s_container)
     \.(?<namespace_name>[0-9a-z-]+)
     \.(?<pod_name>[.0-9a-z-]+)
     \.(?<container_name>[0-9a-z-]+)$/x =~ local_resource_id ||
    /^
     (?<resource_type>k8s_pod)
     \.(?<namespace_name>[0-9a-z-]+)
     \.(?<pod_name>[.0-9a-z-]+)$/x =~ local_resource_id ||
    /^
     (?<resource_type>k8s_node)
     \.(?<node_name>[0-9a-z-]+)$/x =~ local_resource_id

  # Clear name and location if they're explicitly set to empty.
  @k8s_cluster_name = nil if @k8s_cluster_name == ''
  @k8s_cluster_location = nil if @k8s_cluster_location == ''

  begin
    @k8s_cluster_name ||= @utils.fetch_gce_metadata(
      @platform, 'instance/attributes/cluster-name'
    )
    @k8s_cluster_location ||= @utils.fetch_gce_metadata(
      @platform, 'instance/attributes/cluster-location'
    )
  rescue StandardError => e
    @log.error 'Failed to retrieve k8s cluster name and location.', \
               error: e
  end
  case resource_type
  when K8S_CONTAINER_CONSTANTS[:resource_type]
    labels = {
      'namespace_name' => namespace_name,
      'pod_name' => pod_name,
      'container_name' => container_name,
      'cluster_name' => @k8s_cluster_name,
      'location' => @k8s_cluster_location
    }
    fallback_resource = GKE_CONSTANTS[:resource_type]
  when K8S_POD_CONSTANTS[:resource_type]
    labels = {
      'namespace_name' => namespace_name,
      'pod_name' => pod_name,
      'cluster_name' => @k8s_cluster_name,
      'location' => @k8s_cluster_location
    }
    fallback_resource = GKE_CONSTANTS[:resource_type]
  when K8S_NODE_CONSTANTS[:resource_type]
    labels = {
      'node_name' => node_name,
      'cluster_name' => @k8s_cluster_name,
      'location' => @k8s_cluster_location
    }
    fallback_resource = COMPUTE_CONSTANTS[:resource_type]
  end
  unless @k8s_cluster_name && @k8s_cluster_location
    @log.error "Failed to construct #{resource_type} resource locally." \
               ' Falling back to writing logs against' \
               " #{fallback_resource} resource.", error: e
    return
  end
  constructed_resource = Google::Apis::LoggingV2::MonitoredResource.new(
    type: resource_type,
    labels: labels
  )
  @log.debug("Constructed #{resource_type} resource locally: " \
             "#{constructed_resource.inspect}")
  constructed_resource
end
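For instance, with an illustrative container id (cluster name and location would come from configuration or GCE metadata):

monitored_resource_from_local_resource_id(
  'k8s_container.default.nginx-4xv9b.nginx'
)
# => a MonitoredResource of type 'k8s_container' with labels
#    namespace_name: 'default', pod_name: 'nginx-4xv9b',
#    container_name: 'nginx', plus cluster_name and location.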
# File lib/fluent/plugin/out_google_cloud.rb, line 1640
def parse_bool(value)
  [true, 'true', 1].include?(value)
end
# File lib/fluent/plugin/out_google_cloud.rb, line 1636
def parse_int(value)
  value.to_i
end
# File lib/fluent/plugin/out_google_cloud.rb, line 1084
def parse_json_or_nil(input)
  return nil unless input.is_a?(String)

  input.each_codepoint do |c|
    if c == 123 # left curly bracket (U+007B)
      begin
        return JSON.parse(input)
      rescue JSON::ParserError
        return nil
      end
    else
      # Break (and return nil) unless the current character is whitespace,
      # in which case we continue to look for a left curly bracket.
      # Whitespace as per the JSON spec are: tabulation (U+0009),
      # line feed (U+000A), carriage return (U+000D), and space (U+0020).
      break unless [9, 10, 13, 32].include?(c)
    end
  end
  nil
end
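A few illustrative inputs:

parse_json_or_nil('{"a": 1}')    # => { "a" => 1 }
parse_json_or_nil('  {"a": 1}')  # => { "a" => 1 } (JSON whitespace skipped)
parse_json_or_nil('plain text')  # => nil (first character is not '{')
parse_json_or_nil('{not json')   # => nil (parse error)
parse_json_or_nil(42)            # => nil (not a String)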
Parse labels. Return nil if not set.
# File lib/fluent/plugin/out_google_cloud.rb, line 1509
def parse_labels(record)
  payload_labels = record.delete(@labels_key)
  return nil unless payload_labels

  unless payload_labels.is_a?(Hash)
    @log.error "Invalid value of '#{@labels_key}' in the payload: " \
               "#{payload_labels}. Labels need to be a JSON object."
    return nil
  end

  non_string_keys = payload_labels.each_with_object([]) do |(k, v), a|
    a << k unless k.is_a?(String) && v.is_a?(String)
  end
  unless non_string_keys.empty?
    @log.error "Invalid value of '#{@labels_key}' in the payload: " \
               "#{payload_labels}. Labels need string values for all " \
               "keys; keys #{non_string_keys} don't."
    return nil
  end
  payload_labels
rescue StandardError => e
  @log.error "Failed to extract '#{@labels_key}' from payload.", e
  nil
end
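For example, assuming @labels_key is the default 'logging.googleapis.com/labels' (the key is configurable):

record = {
  'message' => 'hi',
  'logging.googleapis.com/labels' => { 'team' => 'infra' }
}
parse_labels(record)
# => { 'team' => 'infra' }; the labels key is removed from record.

# Non-string keys or values are logged as errors and nil is returned:
parse_labels('logging.googleapis.com/labels' => { 'retries' => 3 })  # => nil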
# File lib/fluent/plugin/out_google_cloud.rb, line 1644
def parse_latency(latency)
  # Parse latency.
  # If no valid format is detected, return nil so we can later skip
  # setting latency.
  # Format: whitespace (opt.) + integer + point & decimal (opt.)
  # + whitespace (opt.) + "s" + whitespace (opt.)
  # e.g.: "1.42 s"
  match = @compiled_http_latency_regexp.match(latency)
  return nil unless match

  # Split the integer and decimal parts in order to calculate
  # seconds and nanos.
  seconds = match['seconds'].to_i
  nanos = (match['decimal'].to_f * 1000 * 1000 * 1000).round
  if @use_grpc
    Google::Protobuf::Duration.new(
      seconds: seconds,
      nanos: nanos
    )
  else
    {
      seconds: seconds,
      nanos: nanos
    }.delete_if { |_, v| v.zero? }
  end
end
# File lib/fluent/plugin/out_google_cloud.rb, line 1569
def parse_severity(severity_str)
  # The API is case insensitive, but uppercase to make things simpler.
  severity = severity_str.to_s.upcase.strip

  # If the severity is already valid, just return it.
  return severity if VALID_SEVERITIES.include?(severity)

  # If the severity is an integer (string) return it as an integer,
  # truncated to the closest valid value (multiples of 100 between 0-800).
  if /\A\d+\z/ =~ severity
    begin
      numeric_severity = (severity.to_i / 100) * 100
      case
      when numeric_severity.negative?
        return 0
      when numeric_severity > 800
        return 800
      else
        return numeric_severity
      end
    rescue StandardError
      return 'DEFAULT'
    end
  end

  # Try to translate the severity.
  return SEVERITY_TRANSLATIONS[severity] if SEVERITY_TRANSLATIONS.key?(severity)

  # If all else fails, use 'DEFAULT'.
  'DEFAULT'
end
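Some illustrative translations:

parse_severity('error')  # => 'ERROR' (case-folded, already valid)
parse_severity('250')    # => 200    (truncated to a multiple of 100)
parse_severity('950')    # => 800    (clamped to the maximum)
parse_severity('warn')   # => 'WARNING' (assuming SEVERITY_TRANSLATIONS
                         #    maps 'WARN' to 'WARNING')
parse_severity('bogus')  # => 'DEFAULT'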
# File lib/fluent/plugin/out_google_cloud.rb, line 1632
def parse_string(value)
  value.to_s
end
Given a tag, returns the corresponding valid tag if possible, or nil if the tag should be rejected. If 'require_valid_tags' is false, non-string tags are converted to strings, and invalid characters are sanitized; otherwise such tags are rejected.
# File lib/fluent/plugin/out_google_cloud.rb, line 1683
def sanitize_tag(tag)
  if @require_valid_tags &&
     (!tag.is_a?(String) || tag == '' || convert_to_utf8(tag) != tag)
    return nil
  end

  tag = convert_to_utf8(tag.to_s)
  tag = '_' if tag == ''
  tag
end
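For example:

# With @require_valid_tags disabled (assumed here):
sanitize_tag('app.log')  # => 'app.log'
sanitize_tag(123)        # => '123' (non-string converted to a string)
sanitize_tag('')         # => '_'  (empty tag replaced)

# With @require_valid_tags enabled, non-string or empty tags are rejected:
sanitize_tag(123)        # => nil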
# File lib/fluent/plugin/out_google_cloud.rb, line 1458
def set_log_entry_fields(record, entry)
  # TODO(qingling128) On the next major after 0.7.4, make all logEntry
  # subfields behave the same way: if the field is not in the correct
  # format, log an error in the Fluentd log and remove this field from
  # payload. This is the preferred behavior per PM decision.
  LOG_ENTRY_FIELDS_MAP.each do |field_name, config|
    payload_key, subfields, grpc_class, non_grpc_class = config
    begin
      payload_key = instance_variable_get(payload_key)
      fields = record[payload_key]
      record.delete(payload_key) if fields.nil?
      next unless fields.is_a?(Hash)

      extracted_subfields = subfields.each_with_object({}) \
        do |(original_key, destination_key, cast_fn), extracted_fields|
          value = fields.delete(original_key)
          next if value.nil?

          begin
            casted_value = send(cast_fn, value)
          rescue TypeError => err
            @log.error "Failed to #{cast_fn} for #{field_name}." \
                       "#{original_key} with value #{value.inspect}.", err
            next
          end
          next if casted_value.nil?

          extracted_fields[destination_key] = casted_value
        end
      next unless extracted_subfields

      output = if @use_grpc
                 Object.const_get(grpc_class).new
               else
                 Object.const_get(non_grpc_class).new
               end
      extracted_subfields.each do |key, value|
        output.send("#{key}=", value)
      end
      record.delete(payload_key) if fields.empty?
      entry.send("#{field_name}=", output)
    rescue StandardError => e
      @log.error "Failed to set log entry field for #{field_name}.", e
    end
  end
end
TODO(qingling128): Fix the inconsistent behavior of 'message', 'log' and 'msg' in the next major version 1.0.0.
# File lib/fluent/plugin/out_google_cloud.rb, line 1754
def set_payload(resource_type, record, entry, is_json)
  # Only one of {text_payload, json_payload} will be set.
  text_payload = nil
  json_payload = nil
  # Use JSON if we found valid JSON, or text payload in the following
  # cases:
  # 1. This is an unstructured Container log and the 'log' key is available
  # 2. The only remaining key is 'message'
  if is_json
    json_payload = record
  elsif GKE_CONSTANTS[:resource_type] == resource_type && record.key?('log')
    text_payload = record['log']
  elsif record.size == 1 && record.key?('message')
    text_payload = record['message']
  else
    json_payload = record
  end

  if json_payload
    entry.json_payload = if @use_grpc
                           struct_from_ruby(json_payload)
                         else
                           json_payload
                         end
  elsif text_payload
    text_payload = text_payload.to_s
    entry.text_payload = if @use_grpc
                           convert_to_utf8(text_payload)
                         else
                           text_payload
                         end
  end
end
Set regexp patterns to parse tags and logs.
# File lib/fluent/plugin/out_google_cloud.rb, line 1107
def set_regexp_patterns
  @compiled_kubernetes_tag_regexp =
    Regexp.new(@kubernetes_tag_regexp) if @kubernetes_tag_regexp

  @compiled_http_latency_regexp =
    /^\s*(?<seconds>\d+)(?<decimal>\.\d+)?\s*s\s*$/
end
# File lib/fluent/plugin/out_google_cloud.rb, line 1744
def struct_from_ruby(hash)
  ret = Google::Protobuf::Struct.new
  hash.each do |k, v|
    ret.fields[convert_to_utf8(k.to_s)] ||= value_from_ruby(v)
  end
  ret
end
# File lib/fluent/plugin/out_google_cloud.rb, line 1347
def time_or_nil(ts_secs, ts_nanos)
  Time.at((Integer ts_secs), (Integer ts_nanos) / 1_000.0)
rescue ArgumentError, TypeError
  nil
end
# File lib/fluent/plugin/out_google_cloud.rb, line 1708
def value_from_ruby(value)
  ret = Google::Protobuf::Value.new
  case value
  when NilClass
    ret.null_value = 0
  when Numeric
    ret.number_value = value
  when String
    ret.string_value = convert_to_utf8(value)
  when TrueClass
    ret.bool_value = true
  when FalseClass
    ret.bool_value = false
  when Google::Protobuf::Struct
    ret.struct_value = value
  when Hash
    ret.struct_value = struct_from_ruby(value)
  when Google::Protobuf::ListValue
    ret.list_value = value
  when Array
    ret.list_value = list_from_ruby(value)
  else
    @log.error "Unknown type: #{value.class}"
    raise Google::Protobuf::Error, "Unknown type: #{value.class}"
  end
  ret
end
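A minimal sketch of the recursive conversion (requires the google-protobuf gem):

value_from_ruby('hi').string_value  # => "hi"
value_from_ruby(3.14).number_value  # => 3.14

nested = value_from_ruby('a' => [1, true, nil])
# The Hash becomes a struct_value via struct_from_ruby, and the Array
# becomes a list_value via list_from_ruby.
nested.struct_value.fields['a'].list_value.values.length  # => 3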
# File lib/fluent/plugin/out_google_cloud.rb, line 826
def write_html_response(data, conn, code, response)
  @log.info "#{conn.remote_host} - - " \
            "#{Time.now.strftime('%d/%b/%Y:%H:%M:%S %z')} " \
            "\"#{data.lines.first.strip}\" #{code} #{response.bytesize}"
  conn.write "HTTP/1.1 #{code}\r\n"
  conn.write "Content-Type: text/html\r\n"
  conn.write "Content-Length: #{response.bytesize}\r\n"
  conn.write "\r\n"
  conn.write response
end
# File lib/fluent/plugin/out_google_cloud.rb, line 889
def write_request_via_grpc(entries:, log_name: '', resource: nil, labels: {})
  client = api_client
  entries_count = entries.length
  client.write_log_entries(
    entries: entries,
    log_name: log_name,
    # Leave resource nil if it's nil.
    resource: if resource
                Google::Api::MonitoredResource.new(
                  type: resource.type,
                  labels: resource.labels.to_h
                )
              end,
    labels: labels.map do |k, v|
      [k.encode('utf-8'), convert_to_utf8(v)]
    end.to_h,
    partial_success: true
  )

  increment_successful_requests_count
  increment_ingested_entries_count(entries_count)

  # Let the user explicitly know when the first call succeeded, to
  # aid with verification and troubleshooting.
  unless @successful_call
    @successful_call = true
    @log.info 'Successfully sent gRPC to Stackdriver Logging API.'
  end
rescue Google::Cloud::Error => e
  # GRPC::BadStatus is wrapped in error.cause.
  error = e.cause

  # See the mapping between HTTP status and gRPC status code at:
  # https://github.com/grpc/grpc/blob/master/src/core/lib/transport/status_conversion.cc
  case error
  # Server error, so retry via re-raising the error.
  when \
      # HTTP status 500 (Internal Server Error).
      GRPC::Internal,
      # HTTP status 501 (Not Implemented).
      GRPC::Unimplemented,
      # HTTP status 503 (Service Unavailable).
      GRPC::Unavailable,
      # HTTP status 504 (Gateway Timeout).
      GRPC::DeadlineExceeded
    increment_retried_entries_count(entries_count, error.code)
    @log.debug "Retrying #{entries_count} log message(s) later.",
               error: error.to_s, error_code: error.code.to_s
    raise error

  # Most client errors indicate a problem with the request itself and
  # should not be retried.
  when \
      # HTTP status 401 (Unauthorized).
      # These are usually solved via a `gcloud auth` call, or by modifying
      # the permissions on the Google Cloud project.
      GRPC::Unauthenticated,
      # HTTP status 404 (Not Found).
      GRPC::NotFound,
      # HTTP status 409 (Conflict).
      GRPC::Aborted,
      # HTTP status 412 (Precondition Failed).
      GRPC::FailedPrecondition,
      # HTTP status 429 (Too Many Requests).
      GRPC::ResourceExhausted,
      # HTTP status 499 (Client Closed Request).
      GRPC::Cancelled,
      # The remaining HTTP codes in both the 4xx and 5xx categories.
      # It's debatable whether to retry or drop these log entries.
      # This decision is made to avoid retrying forever due to
      # client errors.
      GRPC::Unknown
    increment_failed_requests_count(error.code)
    increment_dropped_entries_count(entries_count, error.code)
    @log.warn "Dropping #{entries_count} log message(s)",
              error: error.to_s, error_code: error.code.to_s

  # As partial_success is enabled, valid entries should have been
  # written even if some other entries fail due to InvalidArgument or
  # PermissionDenied errors. Only invalid entries will be dropped.
  when \
      # HTTP status 400 (Bad Request).
      GRPC::InvalidArgument,
      # HTTP status 403 (Forbidden).
      GRPC::PermissionDenied
    error_details_map = construct_error_details_map_grpc(e)
    if error_details_map.empty?
      increment_failed_requests_count(error.code)
      increment_dropped_entries_count(entries_count, error.code)
      @log.warn "Dropping #{entries_count} log message(s)",
                error: error.to_s, error_code: error.code.to_s
    else
      error_details_map.each do |(error_code, error_message), indexes|
        partial_errors_count = indexes.length
        increment_dropped_entries_count(partial_errors_count, error_code)
        entries_count -= partial_errors_count
        @log.warn "Dropping #{partial_errors_count} log message(s)",
                  error: error_message, error_code: error_code.to_s
      end
      # Consider partially successful requests successful.
      increment_successful_requests_count
      increment_ingested_entries_count(entries_count)
    end

  else
    # Assume it's a problem with the request itself and don't retry.
    error_code = if error.respond_to?(:code)
                   error.code
                 else
                   GRPC::Core::StatusCodes::UNKNOWN
                 end
    increment_failed_requests_count(error_code)
    increment_dropped_entries_count(entries_count, error_code)
    @log.error "Unknown response code #{error_code} from the server," \
               " dropping #{entries_count} log message(s)",
               error: error.to_s, error_code: error_code.to_s
  end

# Got an unexpected error (not Google::Cloud::Error) from the
# google-cloud-logging lib.
rescue StandardError => e
  increment_failed_requests_count(GRPC::Core::StatusCodes::UNKNOWN)
  increment_dropped_entries_count(entries_count,
                                  GRPC::Core::StatusCodes::UNKNOWN)
  @log.error "Unexpected error type #{e.class.name} from the client" \
             " library, dropping #{entries_count} log message(s)",
             error: e.to_s
end
# File lib/fluent/plugin/out_google_cloud.rb, line 1021
def write_request_via_rest(entries:, log_name: '', resource: nil, labels: {})
  client = api_client
  entries_count = entries.length
  client.write_entry_log_entries(
    Google::Apis::LoggingV2::WriteLogEntriesRequest.new(
      entries: entries,
      log_name: log_name,
      resource: resource,
      labels: labels,
      partial_success: true
    ),
    options: { api_format_version: '2' }
  )

  increment_successful_requests_count
  increment_ingested_entries_count(entries_count)

  # Let the user explicitly know when the first call succeeded, to aid
  # with verification and troubleshooting.
  unless @successful_call
    @successful_call = true
    @log.info 'Successfully sent to Stackdriver Logging API.'
  end
rescue Google::Apis::ServerError => e
  # 5xx server errors. Retry via re-raising the error.
  increment_retried_entries_count(entries_count, e.status_code)
  @log.debug "Retrying #{entries_count} log message(s) later.",
             error: e.to_s, error_code: e.status_code.to_s
  raise e
rescue Google::Apis::AuthorizationError => e
  # 401 authorization error.
  # These are usually solved via a `gcloud auth` call, or by modifying
  # the permissions on the Google Cloud project.
  increment_failed_requests_count(e.status_code)
  increment_dropped_entries_count(entries_count, e.status_code)
  @log.warn "Dropping #{entries_count} log message(s)",
            error: e.to_s, error_code: e.status_code.to_s
rescue Google::Apis::ClientError => e
  # 4xx client errors. Most client errors indicate a problem with the
  # request itself and should not be retried.
  error_details_map = construct_error_details_map(e)
  if error_details_map.empty?
    increment_failed_requests_count(e.status_code)
    increment_dropped_entries_count(entries_count, e.status_code)
    @log.warn "Dropping #{entries_count} log message(s)",
              error: e.to_s, error_code: e.status_code.to_s
  else
    error_details_map.each do |(error_code, error_message), indexes|
      partial_errors_count = indexes.length
      increment_dropped_entries_count(partial_errors_count, error_code)
      entries_count -= partial_errors_count
      @log.warn "Dropping #{partial_errors_count} log message(s)",
                error: error_message,
                error_code: "google.rpc.Code[#{error_code}]"
    end
    # Consider partially successful requests successful.
    increment_successful_requests_count
    increment_ingested_entries_count(entries_count)
  end
end