MOON
Server: Apache
System: Linux vps.erhabenn.com.br 3.10.0-1160.119.1.el7.tuxcare.els2.x86_64 #1 SMP Mon Jul 15 12:09:18 UTC 2024 x86_64
User: sonne (1011)
PHP: 8.2.31
Disabled: NONE
Upload Files
File: //opt/microsoft/omsagent/plugin/out_oms_diag.rb
module Fluent

  class OutputOMSDiag < BufferedOutput

    Plugin.register_output('out_oms_diag', self)

    def initialize
      super
      require 'net/http'
      require 'net/https'
      require 'uri'
      require_relative 'omslog'
      require_relative 'oms_configuration'
      require_relative 'oms_common'
      require_relative 'oms_diag_lib'
      require_relative 'agent_telemetry_script'
    end

    config_param :omsadmin_conf_path, :string, :default => '/etc/opt/microsoft/omsagent/conf/omsadmin.conf'
    config_param :cert_path, :string, :default => '/etc/opt/microsoft/omsagent/certs/oms.crt'
    config_param :key_path, :string, :default => '/etc/opt/microsoft/omsagent/certs/oms.key'
    config_param :proxy_conf_path, :string, :default => '/etc/opt/microsoft/omsagent/proxy.conf'
    config_param :compress, :bool, :default => true
    config_param :run_in_background, :bool, :default => false

    def configure(conf)
      super
    end

    def start
      super
      @proxy_config = OMS::Configuration.get_proxy_config(@proxy_conf_path)
    end

    def shutdown
      super
      OMS::BackgroundJobs.instance.cleanup
    end

    def handle_record(ipname, record)
      @log.trace "Handling diagnostic records for ipname : #{ipname}"
      extra_headers = {
        OMS::CaseSensitiveString.new('x-ms-client-request-retry-count') => "#{@num_errors}"
      }
      req = OMS::Common.create_ods_request(OMS::Configuration.diagnostic_endpoint.path, record, @compress, extra_headers)
      unless req.nil?
        http = OMS::Common.create_ods_http(OMS::Configuration.diagnostic_endpoint, @proxy_config)
        start = Time.now

        # This method will raise on failure alerting the engine to retry sending this data
        OMS::Common.start_request(req, http)

        ends = Time.now
        time = ends - start
        count = record[OMS::Diag::RECORD_DATAITEMS].size
        @log.debug "Success sending diagnotic logs #{ipname} x #{count} in #{time.round(2)}s"
        return OMS::Telemetry.push_qos_event(OMS::SEND_BATCH, "true", "", ipname, record, count, time)
      end
    rescue OMS::RetryRequestException => e
      @log.info "Encountered retryable exception. Will retry sending diagnostic data later."
      @log.debug "Error with diagnostic log:'#{e}'"
      # Re-raise the exception to inform the fluentd engine we want to retry sending this chunk of data later.
      raise e.message
    rescue => e
      # We encountered something unexpected. We drop the data because
      # if bad data caused the exception, the engine will continuously
      # try and fail to resend it. (Infinite failure loop)
      msg = "Unexpected exception, dropping diagnostic data. Error:'#{e}'"
      OMS::Log.error_once(msg)
      return msg
    end

    # This method is called when an event reaches to Fluentd.
    # Convert the event to a raw string.
    def format(tag, time, record)
      if record != {}
        return record.to_msgpack
      else
        return ""
      end
    end

    def self_write(chunk)
      # ipname to dataitems array hash
      ipnameRecords = Hash.new

      # Aggregation based on ipname
      chunk.msgpack_each do |dataitem|
        ipname = OMS::Diag::DEFAULT_IPNAME
        if dataitem.is_a?(Hash)
          if dataitem.key?(OMS::Diag::DI_KEY_IPNAME)
            ipname = dataitem[OMS::Diag::DI_KEY_IPNAME]
          end
          if ipnameRecords.key?(ipname)
            ipnameRecords[ipname] << dataitem
          else
            ipnameRecords[ipname] = [dataitem]
          end
        end
      end

      # getting diag records out of aggregated dataitems for serialization
      ret = []
      ipnameRecords.each do |ipname, dataitemArray|
        OMS::Diag.ProcessDataItemsPostAggregation(dataitemArray, OMS::Configuration.agent_id)
        records = OMS::Diag.CreateDiagRecord(dataitemArray, ipname)
        ret << handle_record(ipname, records)
        ret << {'source': ipname, 'event':  handle_record(ipname, records)}
      end
      return ret
    end

    def write(chunk)
      # Quick exit if we are missing something
      if !OMS::Configuration.load_configuration(omsadmin_conf_path, cert_path, key_path)
        raise OMS::RetryRequestException, 'Missing configuration. Make sure to onboard.'
      end

      if run_in_background
        OMS::BackgroundJobs.instance.run_job_and_wait { self_write(chunk) }
      else
        self_write(chunk)
      end
    end

  end

end