Viewing file: plesk.py (26.7 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- coding: utf-8 -*-
from __future__ import absolute_import from __future__ import division from __future__ import print_function from __future__ import unicode_literals
import os import re import xml.etree.ElementTree as ET from collections import defaultdict from functools import wraps from traceback import format_exc import time
from typing import List, Any, Tuple, Dict, AnyStr, Optional # NOQA
from clcommon.const import Feature from clcommon.utils import find_module_param_in_config
try: import MySQLdb except ImportError: MySQLdb = None else: from MySQLdb.cursors import DictCursor
from clcommon.cpapi.cpapiexceptions import ( NotSupported, NoPanelUser, NoPackage, NoDomain, DuplicateData ) from clcommon import ClPwd from clcommon.clfunc import uid_max from clcommon.cpapi.GeneralPanel import GeneralPanelPluginV1 from clcommon.utils import run_command, ExternalProgramFailed
PSA_SHADOW_PATH = "/etc/psa/.psa.shadow" SUPPORTED_CPINFO = {'cplogin', 'package', 'mail', 'reseller', 'dns', 'locale'} UID_MAX = uid_max()
__cpname__ = 'Plesk'
# WARN: Probably will be deprecated for our "official" plugins.
# See pluginlib.detect_panel_fast()
def detect():
    """
    Tell whether Plesk is installed on this host.

    :return: True when the Plesk version file exists, False otherwise
    :rtype: bool
    """
    version_marker = '/usr/local/psa/version'
    return os.path.isfile(version_marker)
def db_access(_pass_path=PSA_SHADOW_PATH):
    """
    Build the credentials used to connect to the Plesk database.

    :param str _pass_path: path of the file that stores the admin password
    :return: dict with the keys 'login' (always 'admin') and 'pass'
        (the stripped content of ``_pass_path``)
    :rtype: dict
    """
    with open(_pass_path, 'r') as shadow_file:
        password = shadow_file.read().strip()
    return {'login': 'admin', 'pass': password}
def query_sql(query, data=None, _access=None, _dbname='psa', as_dict=False):
    """
    Run a query against the Plesk database and return all fetched rows.

    :param query: SQL query string with possible parameters
    :param data: arguments for the SQL parameter insertion
    :param _access: database authentication data; when falsy, db_access() is used
    :param _dbname: the name of the database
    :param as_dict: controls the format of the output data
    :type query: str
    :type _access: dict
    :type as_dict: bool
    :return: Tuple of rows according to the query in the format specified by as_dict
    :rtype: tuple(tuple) or tuple(dict)
    :raises NoPackage: the MySQLdb module could not be imported
    """
    # Example of returned data:
    # >>> query_sql('SELECT login from sys_users')
    # ((u'cltest',), (u'cltest3',), (u'user2',), (u'user1tst',))
    # >>> query_sql('SELECT login from sys_users', as_dict=True)
    # ({'login': u'cltest'},
    #  {'login': u'cltest3'},
    #  {'login': u'user2'},
    #  {'login': u'user1tst'})
    if not MySQLdb:
        raise NoPackage(
            'Can not connect to database; MySQL-client libraries package not installed.'
        )

    credentials = _access or db_access()
    connection = MySQLdb.connect(
        host=credentials.get('host', 'localhost'),
        user=credentials['login'],
        passwd=credentials['pass'],
        db=_dbname,
        use_unicode=True,
        charset='utf8',
    )
    try:
        cursor = connection.cursor(DictCursor) if as_dict else connection.cursor()
        if data is None:
            cursor.execute(query)
        else:
            cursor.execute(query, data)
        rows = cursor.fetchall()
    finally:
        # the connection is closed whether or not the query succeeded
        connection.close()
    return rows
def cpusers(_access=None, _dbname='psa'):
    """
    List the logins of all system users known to Plesk.

    :param dict|None _access: database authentication data, forwarded to
        query_sql(); None makes query_sql() use its default credentials
    :param str _dbname: name of the database to query
    :return: logins taken from the sys_users table
    :rtype: list[str]
    """
    sql = r"SELECT login FROM sys_users;"
    # Forward the connection overrides instead of silently ignoring them
    rows = query_sql(sql, _access=_access, _dbname=_dbname)
    return [row[0] for row in rows]
def resellers():
    """
    List the logins of all clients whose type is 'reseller'.

    :return: reseller logins
    :rtype: list[str]
    """
    sql = "SELECT clients.login FROM clients WHERE clients.type='reseller'"
    reseller_logins = []
    for (login, ) in query_sql(sql):
        reseller_logins.append(login)
    return reseller_logins
def admins():
    """
    Collect the logins of all clients whose type is 'admin'.

    :return: admin logins
    :rtype: set[str]
    """
    sql = "SELECT clients.login FROM clients WHERE clients.type='admin'"
    return {login for (login, ) in query_sql(sql)}
def is_reseller(username):
    """
    Check whether the given client login belongs to a reseller.

    :param str username: client login to look up
    :return: True if the client exists and its type is 'reseller'
    :rtype: bool
    """
    sql = "SELECT clients.type FROM clients WHERE clients.login=%s"
    try:
        client_type = query_sql(sql, (username,))[0][0]
    except IndexError:
        # no such client in the database
        return False
    return client_type == 'reseller'
def _sys_users_info(sys_login, keyls):
    # type: (Any[str, None], Tuple[str]) -> List[Tuple]
    """
    Fetch the requested columns for system users that own a main domain.

    :param sys_login: list/tuple of logins to restrict the result to;
        any other value (e.g. None) selects all users
    :param keyls: keys to select; each must be present in the column mapping
    :return: rows with values in the same order as ``keyls``
    """
    # Templates.name can be None and it is ok
    columns = {
        'cplogin': 'sys_users.login AS cplogin',
        'mail': 'clients.email AS email',
        'reseller': 'reseller.login AS reseller',
        'dns': 'domains.name AS dns',
        'locale': 'clients.locale AS local',
        'package': 'Templates.name AS package'
    }
    selected = ', '.join(columns[key] for key in keyls)

    sql = r"""SELECT {}
        FROM sys_users
            JOIN hosting ON hosting.sys_user_id=sys_users.id
            JOIN domains ON hosting.dom_id=domains.id AND domains.webspace_id=0
            JOIN clients ON clients.id=domains.cl_id
            JOIN clients reseller ON reseller.id=domains.vendor_id
            LEFT JOIN Subscriptions ON
                Subscriptions.object_type = "domain" AND domains.id = Subscriptions.object_id
            LEFT JOIN PlansSubscriptions ON PlansSubscriptions.subscription_id = Subscriptions.id
            LEFT JOIN Templates AS Templates ON
                Templates.id = PlansSubscriptions.plan_id AND "domain" = Templates.type
        """.format(selected)

    # make query like "where x in (%s, %s, %s, ...)"
    if isinstance(sys_login, (list, tuple)):
        placeholders = ','.join(['%s'] * len(sys_login))
        sql += r" WHERE sys_users.login IN ({})".format(placeholders)

    return query_sql(sql, data=sys_login)
def _resellers_info(sys_login, keyls):
    # type: (Any[str, None], Tuple[str]) -> List[Tuple]
    """
    Fetch the requested columns for clients of type reseller or admin.

    :param sys_login: list/tuple of client logins to restrict the result to;
        any other value (e.g. None) selects all resellers and admins
    :param keyls: keys to select; each must be present in the column mapping
    :return: rows with values in the same order as ``keyls``
    """
    # items with 'NULL' are not available for this panel
    columns = {
        'cplogin': 'clients.login AS cplogin',
        'mail': 'clients.email AS email',
        'reseller': 'NULL as reseller',
        'dns': 'NULL as dns',
        'locale': 'clients.locale AS local',
        'package': 'NULL as package'
    }
    selected = ', '.join(columns[key] for key in keyls)
    sql = 'SELECT {} FROM clients WHERE clients.type IN ("reseller", "admin")'.format(selected)

    # make query like "where x in (%s, %s, %s, ...)"
    if isinstance(sys_login, (list, tuple)):
        placeholders = ','.join(['%s'] * len(sys_login))
        sql += r" AND clients.login IN ({})".format(placeholders)

    return query_sql(sql, data=sys_login)
def cpinfo(cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns', 'locale'),
           search_sys_users=True):
    """
    Get info about user[s] or about reseller[s].

    :param str|None cpuser: get info about specified login, None for all
    :param list|tuple keyls: keys to return
    :param bool search_sys_users: work with sys users or with resellers
    :rtype: tuple[tuple]
    :raises NotSupported: a requested key is not in SUPPORTED_CPINFO
    """
    # a single login is wrapped so the helpers can build an IN (...) clause
    logins = [cpuser] if isinstance(cpuser, str) else cpuser

    # just for developers
    for requested in keyls:
        if requested in SUPPORTED_CPINFO:
            continue
        raise NotSupported('Key {} is not supported for this control panel. '
                           'Available keys: {}'.format(requested, SUPPORTED_CPINFO))

    fetch = _sys_users_info if search_sys_users else _resellers_info
    return fetch(logins, keyls)
def get_admin_email(*args, **kwargs):
    """
    Read the administrator e-mail address from the ``misc`` table.

    Positional and keyword arguments are accepted but not used.

    :return: value of the 'admin_email' parameter, or None when it is absent
    """
    sql = r"SELECT val FROM misc WHERE param='admin_email';"
    try:
        first_row = query_sql(sql)[0]
        return first_row[0]
    except IndexError:
        return None
def docroot(domain):
    # type: (str) -> Any[None, Tuple[str, str]]
    """
    Look up the document root and the owning system user of a domain.

    :param str domain: domain name
    :return: first matching row as (www_root, login)
    :raises NoDomain: no hosting record exists for the domain
    """
    sql = r"""
        SELECT hosting.www_root, sys_users.login
        FROM hosting
            JOIN domains ON hosting.dom_id=domains.id
            JOIN sys_users ON hosting.sys_user_id=sys_users.id
        WHERE domains.name=%s
    """
    try:
        matches = query_sql(sql, data=(domain,))
        return matches[0]
    except IndexError:
        raise NoDomain('Cannot obtain document root for {}'.format(domain))
def reseller_users(resellername):
    """
    Return the system users whose main domain is sold by the given reseller.

    :param resellername: reseller name; return empty list if None
    :return list[str]: user names list
    """
    if resellername is None:
        return []

    sql = """
        SELECT sys_users.login
        FROM clients as reseller
            JOIN domains ON domains.vendor_id=reseller.id
            JOIN hosting ON hosting.dom_id=domains.id
            JOIN sys_users ON hosting.sys_user_id=sys_users.id
        WHERE domains.webspace_id=0 AND reseller.login=%s;
    """
    logins = []
    for (login,) in query_sql(sql, data=(resellername,)):
        logins.append(login)
    return logins
def memoize(f):
    """
    Cache the user -> data mapping produced by ``f``.

    ``f`` must return a mapping keyed by user name. The wrapper calls ``f``
    only when the requested user is missing from the stored mapping, replaces
    the stored mapping with the fresh result and returns the entry of the
    requested user. Extra arguments are passed to ``f`` but do not take part
    in the cache lookup.

    :param f: function returning a mapping keyed by user name
    :return: wrapped function returning the mapping entry for ``cpuser``
    """
    state = {'userdomains_map': {}}

    @wraps(f)
    def wrapper(cpuser, *args, **kwargs):
        known = state['userdomains_map']
        if cpuser not in known:
            # refresh the whole mapping, not just one entry
            known = state['userdomains_map'] = f(cpuser, *args, **kwargs)
        return known[cpuser]

    return wrapper
@memoize
def userdomains(cpuser, _access=None, _dbname='psa'):
    """
    Return domains of given user

    The function body builds the mapping for ALL system users; the ``memoize``
    decorator caches that mapping and returns only the entry of ``cpuser``
    to the caller.

    :param str cpuser: Username
    :param dict|None _access: database authentication data, forwarded to
        query_sql(); None makes query_sql() use its default credentials
    :param str _dbname: Database name where is located data
    :return: (undecorated) dict mapping each login to its list of
        (domain_name, www_root) pairs, ordered by webspace_id ascending so the
        main domain comes first; callers of the decorated function receive the
        list for ``cpuser``
    :rtype: dict[str, list[tuple]]
    :raises NoPanelUser: User is not found in Plesk database.
    """
    # WARN: ORDER BY columns must be present in SELECT columns for newer Mysql
    # webspace_id == 0 is main domain
    sql = r"""
        SELECT DISTINCT su.login, d.name, h.www_root, d.webspace_id
        FROM domains as d, hosting as h, sys_users as su
        WHERE h.sys_user_id = su.id AND h.dom_id = d.id
        ORDER BY d.webspace_id ASC;
    """
    # rows (as_dict=True), e.g.:
    # ({'login': u'customer1', 'name': u'customer1.org',
    #   'www_root': u'/var/www/vhosts/customer1.org/httpdocs', 'webspace_id': 0},
    #  ...)
    # Forward the connection overrides instead of silently ignoring them
    rows = query_sql(sql, _access=_access, _dbname=_dbname, as_dict=True)

    # _user_to_domains_map:
    # { 'user1': [('user1.org', '/var/www/vhosts/user1.com/httpdocs'),
    #             ('mk.user1.org', '/var/www/vhosts/user1.com/mk.user1.org')] }
    _user_to_domains_map = defaultdict(list)
    for row in rows:
        _user_to_domains_map[row['login']].append((row['name'], row['www_root']))

    if cpuser not in _user_to_domains_map:
        raise NoPanelUser('User {} not found in the database'.format(cpuser))
    return _user_to_domains_map
def domain_owner(domain, _access=None, _dbname='psa'):
    """
    Return domain owner

    :param str domain: Domain/sub-domain/add-domain name
    :param dict|None _access: database authentication data, forwarded to
        query_sql(); None makes query_sql() use its default credentials
    :param str _dbname: Database name where is located data
    :return: user name or None if domain not found
    :rtype: str
    :raises DuplicateData: more than one distinct owner matches the domain
    """
    # Matches either the domain itself or the webspace (main domain)
    # that the named domain belongs to.
    sql = r"""
        SELECT DISTINCT `su`.`login`
        FROM `sys_users` `su`, `hosting` `h`, `domains` `d`, `domains` `sd`
        WHERE `h`.`sys_user_id`=`su`.`id` AND `h`.`dom_id`=`d`.`id`
            AND (`d`.`name`=%s OR `d`.`id`=`sd`.`webspace_id` AND `sd`.`name`=%s)"""
    # Forward the connection overrides instead of silently ignoring them
    rows = query_sql(sql, (domain, domain), _access=_access, _dbname=_dbname)
    users_list = [row[0] for row in rows]

    # FIXME: how this possible?
    if len(users_list) > 1:
        raise DuplicateData("domain %s belongs to few users: [%s]" % (
            domain, ','.join(users_list)))
    if not users_list:
        return None
    return users_list[0]
def dblogin_cplogin_pairs(cplogin_lst=None, with_system_users=False):
    """
    Mapping between database users and panel users; not available for Plesk.

    :param cplogin_lst: unused
    :param with_system_users: unused
    :raises NotSupported: always
    """
    message = ('Getting binding credentials in the database to the user name '
               'in the system is not currently supported.')
    raise NotSupported(message)
def homedirs(_sysusers=None, _cpusers=None):
    """
    Detects and returns list of folders contained the home dirs of users of the Plesk

    :param list|tuple|None _sysusers: for testing; (name, home_dir) pairs used
        instead of the system password database
    :param list|tuple|None _cpusers: for testing; panel users used instead of
        cpusers(). Items may be plain logins or rows whose first element is
        the login
    :return: list of folders, which are parent of home dirs of users of the panel
    """
    if _cpusers is None:
        try:
            results = cpusers()
        except NoPackage:
            # no MySQL client library: fall back to "no filtering"
            results = None
    else:
        results = _cpusers

    # cpusers() returns plain login strings; indexing them with [0] would keep
    # only the first character and filter out (almost) every user. Row-like
    # items (e.g. 1-tuples from a raw query) are still supported.
    panel_users = set()
    if results is not None:
        panel_users = {
            entry if isinstance(entry, str) else entry[0]
            for entry in results
        }

    # Plesk assumes MIN_UID as 10000
    clpwd = ClPwd(10000)
    users_dict = clpwd.get_user_dict()

    # for testing only
    if isinstance(_sysusers, (list, tuple)):
        class pw:
            def __init__(self, name, home_dir):
                self.pw_name = name
                self.pw_dir = home_dir

        users_dict = {name: pw(name, home_dir) for (name, home_dir) in _sysusers}

    parent_dirs = []
    for user_name, pw_entry in users_dict.items():
        # an empty panel user list means "do not filter"
        if panel_users and user_name not in panel_users:
            continue
        parent_dir = os.path.dirname(pw_entry.pw_dir)
        if parent_dir not in parent_dirs:
            parent_dirs.append(parent_dir)

    return parent_dirs
def get_user_login_url(domain):
    """
    Build the panel login URL (HTTPS, port 8443) for a domain.

    :param str domain: domain name
    :return: login URL
    :rtype: str
    """
    return f'https://{domain}:8443'
def get_reseller_id_pairs():
    """
    Plesk has no user associated with reseller, but we need some id
    for our internal purposes. Let's take it from database.

    :return: mapping of reseller login to (clients.id + UID_MAX)
    :rtype: dict
    """
    sql = """SELECT clients.login, clients.id + %s
             FROM clients
             WHERE clients.type='reseller'"""
    login_id_rows = query_sql(sql, data=[UID_MAX])
    return dict(login_id_rows)
def reseller_domains(resellername):
    # type: (str) -> Dict[str, str]
    """
    Map each system user of a reseller to the user's main domain.

    :param resellername: reseller login; a falsy value yields an empty dict
    :return: dict {system user login: main domain name}
    """
    if not resellername:
        return {}

    sql = r"""SELECT sys_users.login AS cplogin, domains.name AS dns
        FROM sys_users
            JOIN hosting ON hosting.sys_user_id=sys_users.id
            JOIN domains ON hosting.dom_id=domains.id AND domains.webspace_id=0
            JOIN clients reseller ON reseller.id=domains.vendor_id
        WHERE reseller.login=%s
        """
    login_domain_rows = query_sql(sql, data=[resellername])
    return dict(login_domain_rows)
def _extract_xml_value(xml_string, key): """ Plesk stores some information in simple xml formatted strings. """ try: elem = ET.fromstring(xml_string).find(key) except ET.ParseError: return None else: return elem.text if elem is not None else None
def get_domains_php_info():
    """
    Plesk stores the information about the handler in xml format.
    Return the php version info for each domain.
    Example output:
        {'cltest.com': {'handler_type': 'fpm',
                        'php_version_id': 'plesk-php71-fpm',
                        'username': 'cltest'},
         'cltest2.com': {'handler_type': 'fastcgi',
                         'php_version_id': 'x-httpd-lsphp-custom',
                         'username': 'kek_2'},
         'cltest3.com': {'handler_type': 'fastcgi',
                         'php_version_id': 'plesk-php56-fastcgi',
                         'username': 'cltest3'},
         'omg.kek': {'handler_type': 'fastcgi',
                     'php_version_id': 'plesk-php71-fastcgi',
                     'username': 'cltest'}}
    :rtype: dict[str, dict]
    """
    sql = r"""
        SELECT sys_users.login, d.name, h.php_handler_id, handlers.value
        FROM domains AS d
            JOIN hosting AS h ON h.dom_id=d.id
            JOIN sys_users ON h.sys_user_id=sys_users.id
            JOIN (SELECT ServiceNodeEnvironment.*
                  FROM ServiceNodeEnvironment
                  WHERE (serviceNodeId = '1' AND section = 'phphandlers')) AS handlers
                ON handlers.name=h.php_handler_id
        WHERE h.php='true'
    """

    # Php handler info xml example:
    #
    # <?xml version="1.0" encoding="UTF-8"?>
    # <handler>
    #     <id>plesk-php71-fpm</id>
    #     <type>fpm</type>
    #     <typeName>FPM application</typeName>
    #     <version>7.1</version>
    #     <fullVersion>7.1.22</fullVersion>
    #     <displayname>7.1.22</displayname>
    #     <path>/opt/plesk/php/7.1/sbin/php-fpm</path>
    #     <clipath>/opt/plesk/php/7.1/bin/php</clipath>
    #     <phpini>/opt/plesk/php/7.1/etc/php.ini</phpini>
    #     <custom>true</custom>
    #     <registered>true</registered>
    #     <service>plesk-php71-fpm</service>
    #     <poold>/opt/plesk/php/7.1/etc/php-fpm.d</poold>
    #     <outdated />
    # </handler>

    php_info_by_domain = {}
    for login, domain_name, handler_id, handler_xml in query_sql(sql):
        # a missing or empty <type> element is reported as 'unknown'
        handler_type = _extract_xml_value(handler_xml, key='type') or 'unknown'
        php_info_by_domain[domain_name] = {
            'username': login,
            'php_version_id': handler_id,
            'handler_type': handler_type
        }
    return php_info_by_domain
def get_main_username_by_uid(uid: int) -> str:
    """
    Get "main" panel username by uid.

    Uid 0 is always reported as 'root'. Otherwise the first system account
    with this uid that userdomains() recognises as a panel user is returned.

    :param uid: uid
    :return Username, or 'N/A' when no matching panel user is found
    """
    if uid == 0:
        return 'root'
    try:
        for pw_entry in ClPwd().get_pw_by_uid(uid):
            candidate = pw_entry.pw_name
            try:
                userdomains(candidate)
            except NoPanelUser:
                # a system account without panel domains; try the next one
                continue
            return candidate
    except ClPwd.NoSuchUserException:
        pass
    return 'N/A'
class PanelPlugin(GeneralPanelPluginV1):
    """
    Plesk implementation of the general control panel plugin.

    Most methods delegate to the module-level function of the same name.
    """

    def __init__(self):
        super(PanelPlugin, self).__init__()
        self.HTTPD_MPM_CONFIG = '/etc/httpd/conf.modules.d/01-cgi.conf'
        # Defaults of MaxRequestWorkers for all possible mpm modules
        self.MPM_MODULES = {
            "prefork": 256,
            "worker": 400,
            "event": 400
        }
        # Vars for httpd modules caching
        self.httpd_modules_ts = 0
        self.httpd_modules = ""

    def getCPName(self):
        """
        Return panel name
        :return: panel name
        """
        return __cpname__

    def get_cp_description(self):
        """
        Retrieve panel name and it's version
        :return: dict: { 'name': 'panel_name', 'version': 'panel_version', 'additional_info': 'add_info'}
            or None if can't get info
        """
        try:
            # the context manager closes the file even if reading fails
            with open("/usr/local/psa/version", "r") as version_file:
                # the version is the first whitespace-separated token
                version = version_file.read().split()[0]
        except (OSError, IOError, IndexError, ValueError):
            # unreadable, empty or undecodable version file
            return None
        return {'name': __cpname__, 'version': version, 'additional_info': None}

    def db_access(self):
        """
        Get the access data for the Plesk database.
        For example {'login': 'admin', 'pass': '9pJUv38sAqqW'}

        :return: access data for the database
        :rtype: dict
        """
        return db_access()

    def cpusers(self):
        """
        Generates a list of cpusers registered in the control panel

        :return: list of cpusers registered in the control panel
        :rtype: list
        """
        return cpusers()

    def resellers(self):
        """
        Generates a list of resellers in the control panel

        :return: list of resellers registered in the control panel
        :rtype: list
        """
        return resellers()

    def is_reseller(self, username):
        """
        Check if given user is reseller;
        :type username: str
        :rtype: bool
        """
        return is_reseller(username)

    def dblogin_cplogin_pairs(self, cplogin_lst=None, with_system_users=False):
        """
        Get mapping between system and DB users
        @param cplogin_lst :list: list with usernames for generate mapping
        @param with_system_users :bool: add system users to result list or no.
                                        default: False
        :raises NotSupported: always; the mapping is not available for Plesk
        """
        return dblogin_cplogin_pairs(cplogin_lst, with_system_users)

    def cpinfo(self, cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns', 'locale'),
               search_sys_users=True):
        """
        Retrieves info about panel user(s)
        :param str|unicode|list|tuple|None cpuser: user login
        :param keyls: list of data which is necessary to obtain the user,
                      the values can be:
           cplogin - name/login user control panel
           mail - Email users
           reseller - name reseller/owner users
           locale - localization of the user account
           package - User name of the package
           dns - domain of the user
        :param bool search_sys_users: search for cpuser in sys_users or in control panel users (e.g. for Plesk)
        :return: returns a tuple of tuples of data in the same sequence as specified keys in keylst
        :rtype: tuple
        """
        return cpinfo(cpuser, keyls, search_sys_users=search_sys_users)

    def get_admin_email(self):
        """
        Retrieve admin email address
        :return: Host admin's email
        """
        return get_admin_email()

    def docroot(self, domain):
        """
        Return document root for domain
        :param str|unicode domain:
        :return: row (document root, owner login) for the domain
        """
        return docroot(domain)

    @staticmethod
    def useraliases(cpuser, domain):
        """
        Return aliases from user domain
        :param str|unicode cpuser: user login
        :param str|unicode domain:
        :return list of aliases
        """
        sql = """
            SELECT a.name, d.name
            FROM domains AS d
                INNER JOIN domain_aliases AS a ON a.dom_id = d.id
                INNER JOIN hosting AS h ON h.dom_id = d.id
                INNER JOIN sys_users AS su ON h.sys_user_id = su.id
            WHERE su.login = %s AND d.name = %s
        """
        # only the alias name (first column) is returned
        return [item[0] for item in query_sql(sql, (cpuser, domain))]

    def userdomains(self, cpuser):
        """
        Return domain and document root pairs for control panel user
        first domain is main domain
        :param str|unicode cpuser: user login
        :return list of tuples (domain_name, document_root)
        """
        return userdomains(cpuser)

    def homedirs(self):
        """
        Detects and returns list of folders contained the home dirs of users of the Plesk
        :return: list of folders, which are parent of home dirs of users of the panel
        """
        return homedirs()

    def reseller_users(self, resellername=None):
        """
        Return reseller users
        :param resellername: reseller name; an empty list is returned if None
        :return list[str]: user names list
        """
        return reseller_users(resellername)

    def reseller_domains(self, reseller_name=None):
        """
        Get dict[user, domain]
        :param reseller_name: reseller's name
        :rtype: dict[str, str|None]
        """
        return reseller_domains(reseller_name)

    def get_user_login_url(self, domain):
        """
        Get login url for current panel;
        :type domain: str
        :rtype: str
        """
        return get_user_login_url(domain)

    def admins(self):
        """
        List all admins names in given control panel
        :return: set of strings
        """
        return admins()

    def get_reseller_id_pairs(self):
        """
        Plesk has no user associated with reseller, but we need some id
        for our internal purposes. Let's take it from database.
        """
        return get_reseller_id_pairs()

    def domain_owner(self, domain):
        """
        Return domain's owner
        :param domain: Domain/sub-domain/add-domain name
        :rtype: str
        :return: user name or None if domain not found
        """
        return domain_owner(domain)

    def get_domains_php_info(self):
        """
        Return php version information for each domain
        :return: domain to php info mapping
        :rtype: dict[str, dict]
        """
        return get_domains_php_info()

    def get_installed_php_versions(self):
        """
        Get the list of PHP version installed in panel
        in the form of 'versionXY', for example alt-php56 or plesk-php80
        "Versions by OS vendor" in Plesk DB have names:
        - module
        - fpm
        - fastcgi
        - cgi
        - synced
        They are FILTERED from the list
        :return: sorted list of unique version names
        """
        sql = """
            SELECT ServiceNodeEnvironment.name
            FROM ServiceNodeEnvironment
            WHERE (serviceNodeId = '1' AND section = 'phphandlers')
        """
        # handler list example:
        # ['alt-php44-cgi', 'alt-php44-fastcgi', 'alt-php51-cgi', 'alt-php51-fastcgi'... ]
        ver_name_pattern = re.compile(r'^(alt-|plesk-)')
        handler_names = [row[0] for row in query_sql(sql)]
        # 'alt-php44-fastcgi' -> 'alt-php44'; the set removes duplicates
        versions = {
            '-'.join(name.split('-')[:2])
            for name in handler_names
            if ver_name_pattern.match(name)
        }
        return sorted(versions)

    def get_supported_cl_features(self) -> Dict[str, bool]:
        """
        Return the feature support map of the base plugin with the
        Plesk-specific values applied on top.
        """
        supported_features = super(PanelPlugin, self).get_supported_cl_features()
        return {
            **supported_features,
            Feature.PHP_SELECTOR: True,
            Feature.RUBY_SELECTOR: False,
            Feature.PYTHON_SELECTOR: False,
            Feature.NODEJS_SELECTOR: False,
            Feature.LSAPI: True,
            Feature.GOVERNOR: True,
            Feature.CAGEFS: True,
            Feature.RESELLER_LIMITS: True,
            Feature.XRAY: True,
            Feature.WPOS: False,
        }

    def _get_active_apache_mpm_module(self) -> Optional[AnyStr]:
        """
        Determines active MPM module for Apache Web Server
        :return: apache_active_module_name
            apache_active_module_name: 'prefork', 'event', 'worker',
            or None when none of them is found in the `httpd -M` output
        """
        try:
            # Caching httpd output and refresh it only one time in hour
            if time.time() - self.httpd_modules_ts > 3600:
                self.httpd_modules = run_command(["httpd", "-M"])
                self.httpd_modules_ts = time.time()
        except (OSError, IOError, ExternalProgramFailed):
            # the failure is cached as well, so httpd is not re-run on every call
            self.httpd_modules = ""
            self.httpd_modules_ts = time.time()
        for mpm_module in self.MPM_MODULES:
            if f"mpm_{mpm_module}_module" in self.httpd_modules:
                return mpm_module
        return None

    def _get_max_request_workers_for_module(self, apache_module_name: str) -> Tuple[int, str]:
        """
        Determine MaxRequestWorkers directive value for specified apache module
        Reads config file /etc/httpd/conf.modules.d/01-cgi.conf
        :param apache_module_name: Current apache's module name: 'prefork', 'event', 'worker'
        :return: tuple (max_req_num, message)
            max_req_num - Maximum request apache workers number or 0 if error
            message - OK/Error message
        """
        try:
            return find_module_param_in_config(self.HTTPD_MPM_CONFIG,
                                               apache_module_name,
                                               'MaxRequestWorkers',
                                               self.MPM_MODULES[apache_module_name])
        except (OSError, IOError, IndexError, ValueError):
            return 0, format_exc()

    def get_apache_max_request_workers(self) -> Tuple[int, str]:
        """
        Get current maximum request apache workers from httpd's config
        :return: tuple (max_req_num, message)
            max_req_num - Maximum request apache workers number or 0 if error
            message - OK/Error message
        """
        apache_active_module = self._get_active_apache_mpm_module()
        if apache_active_module is None:
            return 0, "httpd service doesn't work or mpm modules are absent"
        return self._get_max_request_workers_for_module(apache_active_module)

    @staticmethod
    def get_main_username_by_uid(uid: int) -> str:
        """
        Get "main" panel username by uid.
        :param uid: uid
        :return Username
        """
        return get_main_username_by_uid(uid)
|