# File: //opt/microsoft/omsagent/plugin/filter_syslog.rb
module Fluent
class SyslogFilter < Filter
Fluent::Plugin.register_filter('filter_syslog', self)
# Runs the parent initializer, then loads this plugin's dependencies: the
# 'socket' library and the sibling files 'omslog' and 'oms_common'
# (resolved relative to this file).
# NOTE(review): presumably those two files provide the OMS::Log, OMS::Common
# and OMS::IPcache names used by the other methods — confirm.
def initialize
super
require 'socket'
require_relative 'omslog'
require_relative 'oms_common'
end
# Interval, in seconds, at which the IP cache is refreshed (defaults to 300).
# The configured value is passed to OMS::IPcache.new in #configure.
config_param :ip_cache_refresh_interval, :integer, :default => 300
# Applies the plugin configuration and creates the IP cache.
#
# conf - configuration object; forwarded unchanged to the parent class.
#
# After calling the parent implementation, an OMS::IPcache is built from the
# ip_cache_refresh_interval setting and stored in @ip_cache, where #filter
# uses it to look up host IPs.
#
# Returns the newly created cache object.
def configure(conf)
  super
  refresh_seconds = @ip_cache_refresh_interval
  @ip_cache = OMS::IPcache.new(refresh_seconds)
end
# Start hook: no plugin-specific work is done here; control is handed
# straight to the parent class implementation.
def start
super
end
# Shutdown hook.
#
# If the EPS reporter thread created by check_eps is present, it is killed
# and the EPS state (@eps_thread, @eps_counter) is cleared so the debugging
# thread does not outlive the plugin. Control then passes to the parent
# class implementation.
#
# Returns whatever the parent implementation returns.
def shutdown
  if @eps_thread
    @eps_thread.kill
    @eps_thread = nil
    # Clearing the counter lets a later check_eps call start a fresh reporter.
    @eps_counter = nil
  end
  super
end
# For perf debugging: counts calls and periodically logs an
# events-per-second (EPS) figure.
#
# The first call sets the counter to zero and starts a background thread
# (kept in @eps_thread). That thread wakes up once a second; when at least
# one second has passed since the previous report it writes the current
# count and the elapsed seconds to $log.info and resets the counter.
# Every call, including the first, then increments the counter.
#
# NOTE(review): the counter is shared with the reporter thread without any
# locking, so an increment may be lost around a reset — fine for a debugging
# aid, but do not rely on exact numbers.
#
# Returns the counter value after the increment.
def check_eps()
  if @eps_counter.nil?
    @eps_counter = 0
    @eps_thread = Thread.new {
      window_start = Time.now
      loop {
        now = Time.now
        elapsed_sec = now - window_start
        if elapsed_sec >= 1
          $log.info("EPS #{@eps_counter}, for #{elapsed_sec} second")
          @eps_counter = 0
          # Advance the window only after a report, so the logged duration
          # always matches the period over which the count was accumulated.
          window_start = now
        end
        sleep 1
      }
    }
  end
  @eps_counter += 1
end
# Converts one syslog event into a LINUX_SYSLOGS_BLOB envelope.
#
# tag    - event tag; expected to have four dot-separated parts such as
#          oms.syslog.authpriv.notice, where the third part becomes the
#          "Facility" and the fourth the "Severity".
# time   - event time; passed to Time.at to produce "EventTime".
# record - incoming record; the keys "ident", "message", "host" and "pid"
#          are read.
#
# Returns a Hash with "DataType", "IPName" and a one-element "DataItems"
# array holding the converted record.
def filter(tag, time, record)
  # check_eps()
  source_host = record["host"]
  process_id = record["pid"]
  # The tag should look like this: oms.syslog.authpriv.notice
  tag_parts = tag.split('.')

  # "Timestamp" uses Time.now because that is the only way to get subsecond
  # precision in version 0.12; it may be slightly in the future relative to
  # the ingestion time.
  item = {
    'ident' => record['ident'],
    'Timestamp' => OMS::Common::fast_utc_to_iso8601_format(Time.now.utc),
    'EventTime' => OMS::Common::fast_utc_to_iso8601_format(Time.at(time).utc),
    'Host' => source_host,
    'HostIP' => 'Unknown IP',
    'Message' => record['message']
  }
  item["ProcessId"] = process_id if process_id

  # Keep the 'Unknown IP' placeholder when the cache has no address.
  resolved_ip = @ip_cache.get_ip(source_host)
  OMS::Log.warn_once("Failed to get the IP for #{source_host}.") if resolved_ip.nil?
  item["HostIP"] = resolved_ip unless resolved_ip.nil?

  if tag_parts.size == 4
    item.merge!("Facility" => tag_parts[2], "Severity" => tag_parts[3])
  else
    $log.error "The syslog tag does not have 4 parts #{tag}"
  end

  {
    "DataType" => "LINUX_SYSLOGS_BLOB",
    "IPName" => "LOGMANAGEMENT",
    "DataItems" => [item]
  }
end
end
end