File: //opt/microsoft/omsagent/plugin/out_oms_diag.rb
module Fluent
class OutputOMSDiag < BufferedOutput
Plugin.register_output('out_oms_diag', self)
# Builds the plugin instance and loads everything the plugin depends on.
# The libraries are required at instantiation time rather than at file load:
# first the standard-library HTTP/URI modules, then the OMS helper files that
# are resolved relative to this file. The order of both lists is preserved.
def initialize
  super
  %w[net/http net/https uri].each { |stdlib| require stdlib }
  %w[omslog oms_configuration oms_common oms_diag_lib agent_telemetry_script].each do |helper|
    require_relative helper
  end
end
# Plugin settings: each declaration gives the setting name, its type and its default.
# Paths handed to OMS::Configuration.load_configuration by write:
config_param :omsadmin_conf_path, :string, default: '/etc/opt/microsoft/omsagent/conf/omsadmin.conf'
config_param :cert_path,          :string, default: '/etc/opt/microsoft/omsagent/certs/oms.crt'
config_param :key_path,           :string, default: '/etc/opt/microsoft/omsagent/certs/oms.key'
# Path passed to OMS::Configuration.get_proxy_config in start:
config_param :proxy_conf_path,    :string, default: '/etc/opt/microsoft/omsagent/proxy.conf'
# Forwarded to OMS::Common.create_ods_request when a record is sent:
config_param :compress,           :bool,   default: true
# When true, write runs the upload through OMS::BackgroundJobs:
config_param :run_in_background,  :bool,   default: false
# Applies the plugin configuration.
# Nothing plugin-specific happens here; conf is handed to the parent class
# unchanged and the parent's result is returned.
def configure(conf)
  super(conf)
end
# Starts the plugin: runs the parent start-up, then loads the proxy settings
# from @proxy_conf_path and keeps them in @proxy_config for later requests.
# Returns the loaded proxy settings.
def start
  super
  proxy_settings = OMS::Configuration.get_proxy_config(@proxy_conf_path)
  @proxy_config = proxy_settings
end
# Stops the plugin: runs the parent shutdown, then asks the shared
# OMS::BackgroundJobs instance to clean up.
# Returns whatever that cleanup call returns.
def shutdown
  super
  background_jobs = OMS::BackgroundJobs.instance
  background_jobs.cleanup
end
# Uploads one diagnostic record to the ODS diagnostic endpoint.
#
# ipname - name the record's data items were grouped under; used in the log
#          lines and passed on to telemetry
# record - diagnostic record; its OMS::Diag::RECORD_DATAITEMS entry is counted
#
# Returns the value of OMS::Telemetry.push_qos_event after a successful send,
# nil when no request object could be created, or the error message String
# when an unexpected error made us drop the data.
# Raises RuntimeError carrying the message of the OMS::RetryRequestException
# when the send should be retried later.
def handle_record(ipname, record)
  @log.trace "Handling diagnostic records for ipname : #{ipname}"
  retry_header = {
    OMS::CaseSensitiveString.new('x-ms-client-request-retry-count') => @num_errors.to_s
  }
  request = OMS::Common.create_ods_request(
    OMS::Configuration.diagnostic_endpoint.path, record, @compress, retry_header
  )
  return nil if request.nil?

  connection = OMS::Common.create_ods_http(OMS::Configuration.diagnostic_endpoint, @proxy_config)
  started_at = Time.now
  # start_request raises on failure; that is how the engine learns it has to retry this data
  OMS::Common.start_request(request, connection)
  elapsed = Time.now - started_at
  item_count = record[OMS::Diag::RECORD_DATAITEMS].size
  @log.debug "Success sending diagnotic logs #{ipname} x #{item_count} in #{elapsed.round(2)}s"
  OMS::Telemetry.push_qos_event(OMS::SEND_BATCH, "true", "", ipname, record, item_count, elapsed)
rescue OMS::RetryRequestException => retryable
  @log.info "Encountered retryable exception. Will retry sending diagnostic data later."
  @log.debug "Error with diagnostic log:'#{retryable}'"
  # Raise again (as a RuntimeError with the same message) so the fluentd
  # engine retries this chunk of data later.
  raise retryable.message
rescue => unexpected
  # Something unforeseen happened. The data is dropped on purpose: if bad data
  # caused the error, the engine would otherwise resend it and fail forever.
  dropped = "Unexpected exception, dropping diagnostic data. Error:'#{unexpected}'"
  OMS::Log.error_once(dropped)
  dropped
end
# Called when an event reaches Fluentd; turns the event into the raw string
# that is stored in the buffer.
#
# tag, time - not used by this plugin
# record    - event payload
#
# Returns record.to_msgpack, or an empty String when record equals the
# empty Hash.
def format(tag, time, record)
  record != {} ? record.to_msgpack : ""
end
# Groups the data items of a buffer chunk by ipname, builds one diagnostic
# record per group and sends every record exactly once.
#
# chunk - buffer chunk; must respond to msgpack_each and yield the data items.
#         Items that are not Hashes are ignored.
#
# Returns an Array that holds, for every ipname in first-seen order, the
# handle_record result followed by a {source:, event:} Hash wrapping that
# same result.
def self_write(chunk)
  # ipname => array of data items; the block gives each ipname its own array
  items_by_ipname = Hash.new { |groups, name| groups[name] = [] }

  # Aggregation based on ipname
  chunk.msgpack_each do |dataitem|
    next unless dataitem.is_a?(Hash)

    # Items that carry no ipname key are filed under the default ipname
    ipname = dataitem.fetch(OMS::Diag::DI_KEY_IPNAME, OMS::Diag::DEFAULT_IPNAME)
    items_by_ipname[ipname] << dataitem
  end

  # Turn each group of data items into a diag record and send it
  items_by_ipname.each_with_object([]) do |(ipname, dataitems), results|
    OMS::Diag.ProcessDataItemsPostAggregation(dataitems, OMS::Configuration.agent_id)
    diag_record = OMS::Diag.CreateDiagRecord(dataitems, ipname)
    # Send once and reuse the outcome for both result entries; calling
    # handle_record per entry would upload the same record twice.
    outcome = handle_record(ipname, diag_record)
    results << outcome
    results << { source: ipname, event: outcome }
  end
end
# Sends a buffer chunk of diagnostic data.
#
# chunk - buffer chunk handed over to self_write
#
# Returns the result of self_write, or of
# OMS::BackgroundJobs#run_job_and_wait when run_in_background is set.
# Raises OMS::RetryRequestException when the OMS configuration cannot be loaded.
def write(chunk)
  # Quick exit if we are missing something
  config_loaded = OMS::Configuration.load_configuration(omsadmin_conf_path, cert_path, key_path)
  raise OMS::RetryRequestException, 'Missing configuration. Make sure to onboard.' unless config_loaded

  return self_write(chunk) unless run_in_background

  OMS::BackgroundJobs.instance.run_job_and_wait { self_write(chunk) }
end
end
end