MOON
Server: Apache
System: Linux vps.erhabenn.com.br 3.10.0-1160.119.1.el7.tuxcare.els2.x86_64 #1 SMP Mon Jul 15 12:09:18 UTC 2024 x86_64
User: sonne (1011)
PHP: 8.2.31
Disabled: NONE
Upload Files
File: //opt/microsoft/omsagent/plugin/in_auoms.rb
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fileutils'
require 'socket'

require 'cool.io'
require 'yajl'

require 'fluent/input'
require 'fluent/event'

module Fluent
  class AuOMSInput < Input
    Plugin.register_input('auoms', self)

    config_param :blocking_timeout, :time, default: 0.5

    desc 'Tag of the output events.'
    config_param :tag, :string, default: nil
    desc 'The path to your Unix Domain Socket.'
    config_param :path, :string, default: nil
    desc 'The backlog of Unix Domain Socket.'
    config_param :backlog, :integer, default: nil

    def start
      @loop = Coolio::Loop.new
      @lsock = listen
      @loop.attach(@lsock)
      @thread = Thread.new(&method(:run))
    end

    def shutdown
      @loop.watchers.each {|w| w.detach }
      @loop.stop
      @lsock.close
      @thread.join
    end

    def listen
      if File.exist?(@path)
        File.unlink(@path)
      end
      FileUtils.mkdir_p File.dirname(@path)
      @log.info "Listening auoms socket at #{@path}"
      s = Coolio::UNIXServer.new(@path, Handler, @log, method(:on_message))
      s.listen(@backlog) unless @backlog.nil?
      s
    end

    def run
      @loop.run(@blocking_timeout)
    rescue
      @log.error "unexpected auoms error", error: $!.to_s
      @log.error_backtrace
    end

    private

    # message Message {
    #   1: float time
    #   2: object record
    # }
    def on_message(msg)
      return if !msg.is_a?(Array)
      return if msg.size < 2
      time = msg[0]
      time = Engine.now if time.to_i == 0
      record = msg[1]
      return if record.nil?

      router.emit(@tag, time, record)
    end

    class Handler < Coolio::Socket
      def initialize(io, log, on_message)
        super(io)
        @on_message = on_message
        @log = log
      end

      def on_connect
        @log.trace { "Connected auoms socket object_id=#{self.object_id}" }
      end

      def on_read(data)
        first = data[0]
        if first == '{' || first == '['
          m = method(:on_read_json)
          @y = Yajl::Parser.new
          @y.on_parse_complete = @on_message
        else
          m = method(:on_read_msgpack)
          @u = Fluent::Engine.msgpack_factory.unpacker
        end

        (class << self; self; end).module_eval do
          define_method(:on_read, m)
        end
        m.call(data)
      end

      def on_read_json(data)
        @y << data
      rescue
        @log.error "Unexpected auoms error while reading json", error: $!.to_s, data: data
        @log.error_backtrace
        close
      end

      def on_read_msgpack(data)
        @u.feed_each(data, &@on_message)
      rescue
        @log.error "Unexpected auoms error while reading msgpack", error: $!.to_s
        @log.error_backtrace
        close
      end

      def on_close
        @log.trace { "Closed auoms socket object_id=#{self.object_id}" }
      end
    end
  end
end