MOON
Server: Apache
System: Linux vps.erhabenn.com.br 3.10.0-1160.119.1.el7.tuxcare.els2.x86_64 #1 SMP Mon Jul 15 12:09:18 UTC 2024 x86_64
User: sonne (1011)
PHP: 8.2.31
Disabled: NONE
Upload Files
File: //opt/microsoft/omsagent/tst/modules/helpers.py
import errno
import os
import platform
import ssl
import subprocess

from error_codes import *
from errors      import error_info, print_errors

# urlopen() in different packages in Python 2 vs 3
try:
    from urllib.request import urlopen
except ImportError:
    from urllib2 import urlopen

# Path of the agent's omsadmin.conf; read by update_omsadmin()
CONF_PATH = "/etc/opt/microsoft/omsagent/conf/omsadmin.conf"

# Module-wide key/value store filled in by the functions in this file
general_info = {}

def add_geninfo(key, value):
    """Store `value` under `key` in the module-level general_info dict."""
    general_info.update({key: value})

def geninfo_lookup(key):
    """
    Look up `key` in general_info, refreshing the whole dict once on a miss.

    When the key is absent, update_geninfo_all() is run to populate
    general_info. If that refresh returns anything other than NO_ERROR, the
    returned code is passed to print_errors() and None is returned.

    Returns the stored value, or None when the stored value is the empty
    string, the refresh failed, or the key is still absent after a successful
    refresh.
    """
    if key not in general_info:
        refresh_status = update_geninfo_all()
        if refresh_status != NO_ERROR:
            print_errors(refresh_status)
            return None
    # Use .get() rather than indexing: a successful refresh does not guarantee
    # that this particular key was populated, and a lookup helper that already
    # returns None for "no value" should not raise KeyError in that case.
    val = general_info.get(key)
    if val == '':
        return None
    return val



# All functions that update general_info

# CPU Bits
def get_os_bits():
    """
    Read the CPU op-mode width from `lscpu` output.

    The value is the last six characters of the line mentioning "op-mode"
    (e.g. '64-bit'). If no such line exists, the second line of output is
    used, matching the previous positional behaviour. The result is stored in
    general_info['CPU_BITS'] and returned.

    Returns None, and stores nothing, if lscpu cannot be executed or exits
    with an error.
    """
    try:
        cpu_info = subprocess.check_output(['lscpu'], universal_newlines=True)
    except (subprocess.CalledProcessError, OSError):
        # lscpu missing or failing: report "unknown" instead of crashing
        return None

    cpu_lines = cpu_info.split('\n')
    # locate the op-mode line by its label rather than by its position
    opmode_lines = [line for line in cpu_lines if 'op-mode' in line]
    if opmode_lines:
        cpu_opmodes = opmode_lines[0]
    elif len(cpu_lines) > 1:
        cpu_opmodes = cpu_lines[1]
    else:
        cpu_opmodes = ''

    # trailing whitespace would otherwise shift the six-character window
    cpu_bits = cpu_opmodes.rstrip()[-6:]
    general_info['CPU_BITS'] = cpu_bits
    return cpu_bits

# Maps a lowercase distribution-name prefix to the short ID that
# get_os_version() stores in general_info['OS_READABLE_ID']
dist_to_id = {
    'redhat': 'rhel',
    'centos': 'centos',
    'red hat': 'rhel',
    'oracle': 'oracle',
    'debian': 'debian',
    'ubuntu': 'ubuntu',
    'suse': 'sles',
    'sles': 'sles',
    'amzn': 'amzn',
}



# OS Info
def get_os_version():
    """
    Detect the Linux distribution name and version of this machine.

    The platform module is asked first. If it yields neither a name nor a
    version, /etc/os-release is parsed for its ID and VERSION_ID entries.

    Side effects: sets general_info['OS_ID'] and
    general_info['OS_VERSION_ID'], and sets general_info['OS_READABLE_ID']
    when the lowercased name starts with one of the prefixes in dist_to_id.

    Returns the tuple (vm_dist, vm_ver), or None if /etc/os-release had to be
    consulted and could not be read or parsed.
    """
    vm_dist, vm_ver = '', ''
    # platform.linux_distribution() and platform.dist() were both removed in
    # Python 3.8, so use whichever one this interpreter still provides and
    # fall through to /etc/os-release when neither exists
    for probe_name in ('linux_distribution', 'dist'):
        probe = getattr(platform, probe_name, None)
        if probe is not None:
            vm_dist, vm_ver = probe()[:2]
            break

    # if above didn't work, get vm info through os_release
    if not vm_dist and not vm_ver:
        try:
            with open('/etc/os-release', 'r') as os_file:
                for line in os_file:
                    parsed_line = line.split('=')
                    if parsed_line[0] == 'ID':
                        # keep only the text before the first '-'
                        vm_dist = parsed_line[1].split('-')[0]
                        vm_dist = vm_dist.replace('\"', '').replace('\n', '')
                    elif parsed_line[0] == 'VERSION_ID':
                        # keep only the text before the first '.'
                        vm_ver = parsed_line[1].split('.')[0]
                        vm_ver = vm_ver.replace('\"', '').replace('\n', '')
        except (IOError, OSError, IndexError, ValueError):
            # unreadable, undecodable or malformed os-release
            return None

    # update general_info
    general_info['OS_ID'] = vm_dist
    general_info['OS_VERSION_ID'] = vm_ver
    lowered_dist = vm_dist.lower()
    for dist in dist_to_id:
        if lowered_dist.startswith(dist):
            general_info['OS_READABLE_ID'] = dist_to_id[dist]
    return (vm_dist, vm_ver)



# Package Manager
def update_pkg_manager():
    """
    Work out which package manager is available, trying dpkg before rpm.

    The first tool that `which` locates is stored in
    general_info['PKG_MANAGER'] and NO_ERROR is returned. ERR_PKG_MANAGER is
    returned when neither tool is found.
    """
    for candidate in ('dpkg', 'rpm'):
        try:
            located = subprocess.check_output(['which', candidate],
                                              universal_newlines=True,
                                              stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError:
            # `which` exited non-zero; move on to the next candidate
            continue
        if located != '':
            general_info['PKG_MANAGER'] = candidate
            return NO_ERROR
    # neither
    return ERR_PKG_MANAGER



# Package Info
def get_dpkg_pkg_version(pkg):
    """
    Return the installed version of `pkg` as reported by `dpkg -s`.

    On success the version is also stored in general_info under
    '<PKG>_VERSION' (package name upper-cased).

    Returns None when dpkg cannot be executed or exits with an error, when
    the output describes a different package, when the final word of the
    Status line is not exactly 'installed', or when no Version line is found.
    """
    try:
        dpkg_info = subprocess.check_output(['dpkg', '-s', pkg],
                                            universal_newlines=True,
                                            stderr=subprocess.STDOUT)
    except (subprocess.CalledProcessError, OSError):
        # dpkg reported a failure, or could not be executed at all
        return None

    for line in dpkg_info.split('\n'):
        fields = line.split()
        if line.startswith('Package: ') and fields[-1] != pkg:
            # wrong package (exact match, so 'foo-omi' no longer passes for 'omi')
            return None
        if line.startswith('Status: ') and fields[-1] != 'installed':
            # not properly installed; compare the whole final word because
            # 'half-installed' and 'not-installed' also end with 'installed'
            return None
        if line.startswith('Version: '):
            version = fields[-1]
            general_info['{0}_VERSION'.format(pkg.upper())] = version
            return version
    return None

def get_rpm_pkg_version(pkg):
    """
    Return the installed version of `pkg` as reported by `rpm -qi`.

    On success the version is also stored in general_info under
    '<PKG>_VERSION' (package name upper-cased).

    Returns None when rpm cannot be executed or exits with an error, when rpm
    says the package is not installed, when the Name field differs from
    `pkg`, or when no Version field is found.
    """
    try:
        rpm_info = subprocess.check_output(['rpm', '-qi', pkg],
                                           universal_newlines=True,
                                           stderr=subprocess.STDOUT)
    except (subprocess.CalledProcessError, OSError):
        # rpm reported a failure, or could not be executed at all
        return None

    if "package {0} is not installed".format(pkg) in rpm_info:
        # didn't find package
        return None

    for line in rpm_info.split('\n'):
        # note: parsing is weird bc rpm puts various information into two columns
        parsed_line = line.split()
        if len(parsed_line) < 3:
            # blank line, or a field without a value: nothing to read here
            # (indexing it used to raise IndexError)
            continue
        if parsed_line[0] == 'Name' and parsed_line[2] != pkg:
            # ['Name', ':', name, 'Relocations', ':', relocations] -> wrong package
            return None
        if parsed_line[0] == 'Version':
            # ['Version', ':', version, 'Vendor', ':', 'MSFT']
            version = parsed_line[2]
            general_info['{0}_VERSION'.format(pkg.upper())] = version
            return version
    return None


# Recent OMS Version
def get_curr_oms_version(OMSAGENT_URL):
    """
    Fetch the document at OMSAGENT_URL and read the omsagent version from it.

    The document is scanned for the first line starting with "omsagent | ";
    that line is split on ' | ' and its second field is taken as the version,
    which is also stored in general_info['UPDATED_OMS_VERSION'].

    Returns a 2-tuple:
      (version, NO_ERROR)  when a matching line is found,
      (None, None)         when the document has no matching line,
      (None, exception)    when fetching or parsing raised an exception.
    """
    try:
        doc_file = urlopen(OMSAGENT_URL)
        try:
            for line in doc_file.readlines():
                line = line.decode('utf8')
                if line.startswith("omsagent | "):
                    parsed_line = line.split(' | ') # [package, version, description]
                    general_info['UPDATED_OMS_VERSION'] = parsed_line[1]
                    return (parsed_line[1], NO_ERROR)
            return (None, None)
        finally:
            # always release the connection; try/finally is used because the
            # Python 2 urllib2 response object is not a context manager
            doc_file.close()

    except Exception as e:
        return (None, e)



# omsadmin.conf
def update_omsadmin():
    """
    Load every line of the file at CONF_PATH into general_info.

    Each line is split at its first '='; the text before it becomes the key
    and everything after it the value (an empty string if there is no '=').

    Returns NO_ERROR on success. If the file cannot be opened, appends details
    to error_info and returns ERR_SUDO_PERMS (permission denied) or
    ERR_FILE_MISSING (no such file); any other IOError is re-raised.
    """
    try:
        with open(CONF_PATH, 'r') as conf_file:
            for raw_line in conf_file:
                name, _, setting = raw_line.rstrip('\n').partition('=')
                general_info[name] = setting
    except IOError as e:
        if e.errno == errno.EACCES:
            error_info.append((CONF_PATH,))
            return ERR_SUDO_PERMS
        if e.errno == errno.ENOENT:
            error_info.append(('file', CONF_PATH))
            return ERR_FILE_MISSING
        raise
    return NO_ERROR



# service controller
def check_service_controller():
    """
    Pick the service controller to use and store it in
    general_info['SERVICE_CONTROLLER'].

    'systemctl' is chosen when `which systemctl` succeeds. Otherwise the first
    executable file among /usr/sbin/invoke-rc.d, /sbin/service and
    /bin/systemctl is chosen, stored as its full path.

    Returns NO_ERROR when a controller was found, else ERR_SERVICE_CONTROLLER.
    """
    try:
        subprocess.check_output(['which', 'systemctl'],
                                universal_newlines=True,
                                stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError:
        # systemctl is not on the PATH; probe the known locations in order
        fallback_paths = ("/usr/sbin/invoke-rc.d",
                          "/sbin/service",
                          "/bin/systemctl")
        for candidate in fallback_paths:
            if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
                general_info['SERVICE_CONTROLLER'] = candidate
                return NO_ERROR
        # none worked
        return ERR_SERVICE_CONTROLLER

    general_info['SERVICE_CONTROLLER'] = 'systemctl'
    return NO_ERROR



# update all
def update_geninfo_all():
    """
    Run every general_info updater in order, stopping at the first failure.

    Order: CPU bits, OS version, package manager, the four agent packages
    (omsconfig, omi, scx, omsagent), omsadmin.conf, service controller.

    Returns NO_ERROR when every step succeeds, otherwise the error code of the
    first step that failed.
    """
    # cpu_bits
    if get_os_bits() not in ('32-bit', '64-bit'):
        return ERR_BITS

    # os info
    if get_os_version() is None:
        return ERR_FINDING_OS

    # package manager
    pkg_status = update_pkg_manager()
    if pkg_status != NO_ERROR:
        return pkg_status

    # packages, queried through whichever package manager was detected
    version_getters = {'dpkg': get_dpkg_pkg_version,
                       'rpm': get_rpm_pkg_version}
    get_version = version_getters.get(general_info['PKG_MANAGER'])
    if get_version is not None:
        required_pkgs = (('omsconfig', ERR_OMSCONFIG),
                         ('omi', ERR_OMI),
                         ('scx', ERR_SCX),
                         ('omsagent', ERR_OMS_INSTALL))
        for pkg_name, missing_code in required_pkgs:
            if get_version(pkg_name) is None:
                return missing_code

    # omsadmin info, then service controller
    for refresh in (update_omsadmin, check_service_controller):
        status = refresh()
        if status != NO_ERROR:
            return status

    # all successful
    return NO_ERROR