403Webshell
Server IP : 103.88.176.108  /  Your IP : 216.73.216.82
Web Server : Apache/2.4.41 (Ubuntu)
System : Linux webserver 5.4.0-42-generic #46-Ubuntu SMP Fri Jul 10 00:24:02 UTC 2020 x86_64
User : www-data ( 33)
PHP Version : 7.4.3-4ubuntu2.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib/python3/dist-packages/uaclient/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib/python3/dist-packages/uaclient/cli.py
#!/usr/bin/env python

"""Client to manage Ubuntu Advantage services on a machine."""

import argparse
from functools import wraps
import json
import logging
import os
import pathlib
import sys
import textwrap

from uaclient import config
from uaclient import contract
from uaclient import entitlements
from uaclient import exceptions
from uaclient import status as ua_status
from uaclient import util
from uaclient import version
from uaclient.clouds import identity

NAME = "ua"

USAGE_TMPL = "{name} {command} [flags]"
EPILOG_TMPL = (
    "Use {name} {command} --help for more information about a command."
)

STATUS_HEADER_TMPL = """\
Account: {account}
Subscription: {subscription}
Valid until: {contract_expiry}
Technical support level: {tech_support_level}
"""
UA_AUTH_TOKEN_URL = "https://auth.contracts.canonical.com"

DEFAULT_LOG_FORMAT = (
    "%(asctime)s - %(filename)s:(%(lineno)d) [%(levelname)s]: %(message)s"
)

STATUS_FORMATS = ["tabular", "json"]


def assert_root(f):
    """Decorator asserting root user"""

    @wraps(f)
    def new_f(*args, **kwargs):
        if os.getuid() != 0:
            raise exceptions.NonRootUserError()
        return f(*args, **kwargs)

    return new_f


def assert_attached(unattached_msg_tmpl=None):
    """Decorator asserting attached config.

    :param unattached_msg_tmpl: Optional msg template to format if raising an
        UnattachedError
    """

    def wrapper(f):
        @wraps(f)
        def new_f(args, cfg):
            if not cfg.is_attached:
                if unattached_msg_tmpl:
                    name = getattr(args, "name", "None")
                    msg = unattached_msg_tmpl.format(name=name)
                    exception = exceptions.UnattachedError(msg)
                else:
                    exception = exceptions.UnattachedError()
                raise exception
            return f(args, cfg)

        return new_f

    return wrapper


def assert_not_attached(f):
    """Decorator asserting unattached config."""

    @wraps(f)
    def new_f(args, cfg):
        if cfg.is_attached:
            raise exceptions.AlreadyAttachedError(cfg)
        return f(args, cfg)

    return new_f


def require_valid_entitlement_name(operation: str):
    """Decorator ensuring that args.name is a valid service.

    :param operation: the operation name to use in error messages
    """

    def wrapper(f):
        @wraps(f)
        def new_f(args, cfg):
            if hasattr(args, "name"):
                name = args.name
                tmpl = ua_status.MESSAGE_INVALID_SERVICE_OP_FAILURE_TMPL
                if name not in entitlements.ENTITLEMENT_CLASS_BY_NAME:
                    raise exceptions.UserFacingError(
                        tmpl.format(operation=operation, name=name)
                    )
            return f(args, cfg)

        return new_f

    return wrapper


def auto_attach_parser(parser):
    """Build or extend an arg parser for auto-attach subcommand."""
    parser.prog = "auto-attach"
    parser.usage = USAGE_TMPL.format(name=NAME, command=parser.prog)
    parser._optionals.title = "Flags"
    return parser


def attach_parser(parser):
    """Build or extend an arg parser for attach subcommand."""
    parser.usage = USAGE_TMPL.format(name=NAME, command="attach <token>")
    parser.prog = "attach"
    parser._optionals.title = "Flags"
    parser.add_argument(
        "token",
        nargs="?",  # action_attach asserts this required argument
        help="token obtained for Ubuntu Advantage authentication: {}".format(
            UA_AUTH_TOKEN_URL
        ),
    )
    parser.add_argument(
        "--no-auto-enable",
        action="store_false",
        dest="auto_enable",
        help="do not enable any recommended services automatically",
    )
    return parser


def detach_parser(parser):
    """Build or extend an arg parser for detach subcommand."""
    usage = USAGE_TMPL.format(name=NAME, command="detach")
    parser.usage = usage
    parser.prog = "detach"
    parser._optionals.title = "Flags"
    parser.add_argument(
        "--assume-yes",
        action="store_true",
        help="do not prompt for confirmation before performing the detach",
    )
    return parser


def enable_parser(parser):
    """Build or extend an arg parser for enable subcommand."""
    usage = USAGE_TMPL.format(name=NAME, command="enable") + " <name>"
    parser.usage = usage
    parser.prog = "enable"
    parser._positionals.title = "Arguments"
    parser._optionals.title = "Flags"
    parser.add_argument(
        "name",
        action="store",
        help="the name of the Ubuntu Advantage service to enable",
    )
    return parser


def disable_parser(parser):
    """Build or extend an arg parser for disable subcommand."""
    usage = USAGE_TMPL.format(name=NAME, command="disable") + " <name>"
    parser.usage = usage
    parser.prog = "disable"
    parser._positionals.title = "Arguments"
    parser._optionals.title = "Flags"
    parser.add_argument(
        "name",
        action="store",
        help="the name of the Ubuntu Advantage service to disable",
    )
    return parser


def status_parser(parser):
    """Build or extend an arg parser for status subcommand."""
    usage = USAGE_TMPL.format(name=NAME, command="status")
    parser.usage = usage
    parser.prog = "status"
    # This formatter_class ensures that our formatting below isn't lost
    parser.formatter_class = argparse.RawDescriptionHelpFormatter
    parser.description = textwrap.dedent(
        """\
        Report current status of Ubuntu Advantage services on system.

        This shows whether this machine is attached to an Ubuntu Advantage
        support contract. When attached, the report includes the specific
        support contract details including contract name, expiry dates, and the
        status of each service on this system.

        The attached status output has four columns:

        * SERVICE: name of the service
        * ENTITLED: whether the contract to which this machine is attached
          entitles use of this service. Possible values are: yes or no
        * STATUS: whether the service is enabled on this machine. Possible
          values are: enabled, disabled, n/a (if your contract entitles
          you to the service, but it isn't available for this machine) or — (if
          you aren't entitled to this service)
        * DESCRIPTION: a brief description of the service

        The unattached status output instead has three columns. SERVICE
        and DESCRIPTION are the same as above, and there is the addition
        of:

        * AVAILABLE: whether this service would be available if this machine
          were attached. The possible values are yes or no.
        """
    )

    parser.add_argument(
        "--format",
        action="store",
        choices=STATUS_FORMATS,
        default=STATUS_FORMATS[0],
        help=(
            "output status in the specified format (default: {})".format(
                STATUS_FORMATS[0]
            )
        ),
    )
    parser._optionals.title = "Flags"
    return parser


@assert_root
@require_valid_entitlement_name("disable")
@assert_attached(ua_status.MESSAGE_ENABLE_FAILURE_UNATTACHED_TMPL)
def action_disable(args, cfg):
    """Perform the disable action on a named entitlement.

    @return: 0 on success, 1 otherwise
    """
    ent_cls = entitlements.ENTITLEMENT_CLASS_BY_NAME[args.name]
    entitlement = ent_cls(cfg)
    ret = 0 if entitlement.disable() else 1
    cfg.status()  # Update the status cache
    return ret


def _perform_enable(
    entitlement_name: str,
    cfg: config.UAConfig,
    *,
    silent_if_inapplicable: bool = False
) -> bool:
    """Perform the enable action on a named entitlement.

    (This helper excludes any messaging, so that different enablement code
    paths can message themselves.)

    :param entitlement_name: the name of the entitlement to enable
    :param cfg: the UAConfig to pass to the entitlement
    :param silent_if_inapplicable:
        don't output messages when determining if an entitlement can be
        enabled on this system

    @return: True on success, False otherwise
    """
    ent_cls = entitlements.ENTITLEMENT_CLASS_BY_NAME[entitlement_name]
    entitlement = ent_cls(cfg)
    ret = entitlement.enable(silent_if_inapplicable=silent_if_inapplicable)
    cfg.status()  # Update the status cache
    return ret


@assert_root
@require_valid_entitlement_name("enable")
@assert_attached(ua_status.MESSAGE_ENABLE_FAILURE_UNATTACHED_TMPL)
def action_enable(args, cfg):
    """Perform the enable action on a named entitlement.

    @return: 0 on success, 1 otherwise
    """
    print(ua_status.MESSAGE_REFRESH_ENABLE)
    try:
        contract.request_updated_contract(cfg)
    except (util.UrlError, exceptions.UserFacingError):
        # Inability to refresh is not a critical issue during enable
        logging.debug(ua_status.MESSAGE_REFRESH_FAILURE, exc_info=True)
    return 0 if _perform_enable(args.name, cfg) else 1


@assert_root
@assert_attached()
def action_detach(args, cfg) -> int:
    """Perform the detach action for this machine.

    @return: 0 on success, 1 otherwise
    """
    return _detach(cfg, assume_yes=args.assume_yes)


def _detach(cfg: config.UAConfig, assume_yes: bool) -> int:
    """Detach the machine from the active Ubuntu Advantage subscription,

    :param cfg: a ``config.UAConfig`` instance
    :param assume_yes: Assume a yes answer to any prompts requested.
         In this case, it means automatically disable any service during
         detach.

    @return: 0 on success, 1 otherwise
    """
    to_disable = []
    for ent_cls in entitlements.ENTITLEMENT_CLASSES:
        ent = ent_cls(cfg)
        if ent.can_disable(silent=True):
            to_disable.append(ent)
    if to_disable:
        suffix = "s" if len(to_disable) > 1 else ""
        print("Detach will disable the following service{}:".format(suffix))
        for ent in to_disable:
            print("    {}".format(ent.name))
    if not assume_yes and not util.prompt_for_confirmation():
        return 1
    for ent in to_disable:
        ent.disable(silent=True)
    contract_client = contract.UAContractClient(cfg)
    machine_token = cfg.machine_token["machineToken"]
    contract_id = cfg.machine_token["machineTokenInfo"]["contractInfo"]["id"]
    contract_client.detach_machine_from_contract(machine_token, contract_id)
    cfg.delete_cache()
    print(ua_status.MESSAGE_DETACH_SUCCESS)
    return 0


def _attach_with_token(
    cfg: config.UAConfig, token: str, allow_enable: bool
) -> int:
    """Common functionality to take a token and attach via contract backend"""
    try:
        contract.request_updated_contract(
            cfg, token, allow_enable=allow_enable
        )
    except util.UrlError as exc:
        with util.disable_log_to_console():
            logging.exception(exc)
        print(ua_status.MESSAGE_ATTACH_FAILURE)
        cfg.status()  # Persist updated status in the event of partial attach
        return 1
    except exceptions.UserFacingError as exc:
        logging.warning(exc.msg)
        cfg.status()  # Persist updated status in the event of partial attach
        return 1
    contract_name = cfg.machine_token["machineTokenInfo"]["contractInfo"][
        "name"
    ]
    print(
        ua_status.MESSAGE_ATTACH_SUCCESS_TMPL.format(
            contract_name=contract_name
        )
    )

    action_status(args=None, cfg=cfg)
    return 0


def _get_contract_token_from_cloud_identity(cfg: config.UAConfig) -> str:
    """Detect cloud_type and request a contract token from identity info.

    :param cfg: a ``config.UAConfig`` instance

    :raise NonAutoAttachImageError: When not on an auto-attach image type.
    :raise UrlError: On unexpected connectivity issues to contract
        server or inability to access identity doc from metadata service.
    :raise ContractAPIError: On unexpected errors when talking to the contract
        server.
    :raise NonAutoAttachImageError: If this cloud type does not have
        auto-attach support.

    :return: contract token obtained from identity doc
    """
    try:
        instance = identity.cloud_instance_factory()
    except exceptions.UserFacingError as e:
        if cfg.is_attached:
            # We are attached on non-Pro Image, just report already attached
            raise exceptions.AlreadyAttachedError(cfg)
        # Unattached on non-Pro return UserFacing error msg details
        raise e
    current_iid = identity.get_instance_id()
    if cfg.is_attached:
        prev_iid = cfg.read_cache("instance-id")
        if current_iid == prev_iid:
            raise exceptions.AlreadyAttachedError(cfg)
        print("Re-attaching Ubuntu Advantage subscription on new instance")
        if _detach(cfg, assume_yes=True) != 0:
            raise exceptions.UserFacingError(
                ua_status.MESSAGE_DETACH_AUTOMATION_FAILURE
            )
    contract_client = contract.UAContractClient(cfg)
    try:
        tokenResponse = contract_client.request_auto_attach_contract_token(
            instance=instance
        )
    except contract.ContractAPIError as e:
        if e.code and 400 <= e.code < 500:
            raise exceptions.NonAutoAttachImageError(
                ua_status.MESSAGE_UNSUPPORTED_AUTO_ATTACH
            )
        raise e
    if current_iid:
        cfg.write_cache("instance-id", current_iid)

    return tokenResponse["contractToken"]


@assert_root
def action_auto_attach(args, cfg):
    token = _get_contract_token_from_cloud_identity(cfg)
    return _attach_with_token(cfg, token=token, allow_enable=True)


@assert_not_attached
@assert_root
def action_attach(args, cfg):
    if not args.token:
        raise exceptions.UserFacingError(
            ua_status.MESSAGE_ATTACH_REQUIRES_TOKEN
        )
    return _attach_with_token(
        cfg, token=args.token, allow_enable=args.auto_enable
    )


def get_parser():
    service_line_tmpl = " - {name}: {description}{url}"
    description_lines = [__doc__]
    sorted_classes = sorted(entitlements.ENTITLEMENT_CLASS_BY_NAME.items())
    for name, ent_cls in sorted_classes:
        if ent_cls.help_doc_url:
            url = " ({})".format(ent_cls.help_doc_url)
        else:
            url = ""
        service_line = service_line_tmpl.format(
            name=name, description=ent_cls.description, url=url
        )
        if len(service_line) <= 80:
            description_lines.append(service_line)
        else:
            wrapped_words = []
            line = service_line
            while len(line) > 80:
                [line, wrapped_word] = line.rsplit(" ", 1)
                wrapped_words.insert(0, wrapped_word)
            description_lines.extend([line, "   " + " ".join(wrapped_words)])

    parser = argparse.ArgumentParser(
        prog=NAME,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="\n".join(description_lines),
        usage=USAGE_TMPL.format(name=NAME, command="[command]"),
        epilog=EPILOG_TMPL.format(name=NAME, command="[command]"),
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="show all debug log messages to console",
    )
    parser._optionals.title = "Flags"
    subparsers = parser.add_subparsers(
        title="Available Commands", dest="command", metavar=""
    )
    subparsers.required = True
    parser_status = subparsers.add_parser(
        "status", help="current status of all Ubuntu Advantage services"
    )
    parser_status.set_defaults(action=action_status)
    status_parser(parser_status)
    parser_attach = subparsers.add_parser(
        "attach",
        help="attach this machine to an Ubuntu Advantage subscription",
    )
    attach_parser(parser_attach)
    parser_attach.set_defaults(action=action_attach)
    parser_auto_attach = subparsers.add_parser(
        "auto-attach",
        help="automatically attach Ubuntu Advantage on supported platforms",
    )
    auto_attach_parser(parser_auto_attach)
    parser_auto_attach.set_defaults(action=action_auto_attach)
    parser_detach = subparsers.add_parser(
        "detach",
        help="remove this machine from an Ubuntu Advantage subscription",
    )
    detach_parser(parser_detach)
    parser_detach.set_defaults(action=action_detach)
    parser_enable = subparsers.add_parser(
        "enable",
        help="enable a specific Ubuntu Advantage service on this machine",
    )
    enable_parser(parser_enable)
    parser_enable.set_defaults(action=action_enable)
    parser_disable = subparsers.add_parser(
        "disable",
        help="disable a specific Ubuntu Advantage service on this machine",
    )
    disable_parser(parser_disable)
    parser_disable.set_defaults(action=action_disable)
    parser_refresh = subparsers.add_parser(
        "refresh",
        help="refresh Ubuntu Advantage services from contracts server",
    )
    parser_refresh.set_defaults(action=action_refresh)
    parser_version = subparsers.add_parser(
        "version", help="show version of {}".format(NAME)
    )
    parser_version.set_defaults(action=print_version)
    parser_help = subparsers.add_parser(
        "help", help="show this help message and exit"
    )
    parser_help.set_defaults(action=action_help)
    return parser


def action_status(args, cfg):
    if not cfg:
        cfg = config.UAConfig()
    if args and args.format == "json":
        status = cfg.status()
        if status["expires"] != ua_status.UserFacingStatus.INAPPLICABLE.value:
            status["expires"] = str(status["expires"])
        print(json.dumps(status))
    else:
        output = ua_status.format_tabular(cfg.status())
        # Replace our Unicode dash with an ASCII dash if we aren't going to be
        # writing to a utf-8 output; see
        # https://github.com/CanonicalLtd/ubuntu-advantage-client/issues/859
        if (
            sys.stdout.encoding is None
            or "UTF-8" not in sys.stdout.encoding.upper()
        ):
            output = output.replace("\u2014", "-")
        print(output)
    return 0


def print_version(_args=None, _cfg=None):
    print(version.get_version())


@assert_root
@assert_attached()
def action_refresh(args, cfg):
    try:
        contract.request_updated_contract(cfg)
    except util.UrlError as exc:
        with util.disable_log_to_console():
            logging.exception(exc)
        raise exceptions.UserFacingError(ua_status.MESSAGE_REFRESH_FAILURE)
    print(ua_status.MESSAGE_REFRESH_SUCCESS)
    return 0


def action_help(_args, _cfg):
    get_parser().print_help()
    return 0


def setup_logging(console_level, log_level, log_file=None):
    """Setup console logging and debug logging to log_file"""
    if log_file is None:
        log_file = config.CONFIG_DEFAULTS["log_file"]
    console_formatter = util.LogFormatter()
    log_formatter = logging.Formatter(DEFAULT_LOG_FORMAT)
    root = logging.getLogger()
    root.setLevel(log_level)
    # Setup console logging
    stderr_found = False
    for handler in root.handlers:
        if hasattr(handler, "stream") and hasattr(handler.stream, "name"):
            if handler.stream.name == "<stderr>":
                handler.setLevel(console_level)
                handler.setFormatter(console_formatter)
                handler.set_name("console")  # Used to disable console logging
                stderr_found = True
                break
    if not stderr_found:
        console = logging.StreamHandler(sys.stderr)
        console.setFormatter(console_formatter)
        console.setLevel(console_level)
        console.set_name("console")  # Used to disable console logging
        root.addHandler(console)
    if os.getuid() == 0:
        # Setup readable-by-root-only debug file logging if running as root
        log_file_path = pathlib.Path(log_file)
        log_file_path.touch()
        log_file_path.chmod(0o600)

        filehandler = logging.FileHandler(log_file)
        filehandler.setLevel(log_level)
        filehandler.setFormatter(log_formatter)
        root.addHandler(filehandler)


def main_error_handler(func):
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except KeyboardInterrupt:
            with util.disable_log_to_console():
                logging.exception("KeyboardInterrupt")
            print("Interrupt received; exiting.", file=sys.stderr)
            sys.exit(1)
        except util.UrlError as exc:
            with util.disable_log_to_console():
                msg_args = {"url": exc.url, "error": exc}
                if exc.url:
                    msg_tmpl = ua_status.LOG_CONNECTIVITY_ERROR_WITH_URL_TMPL
                else:
                    msg_tmpl = ua_status.LOG_CONNECTIVITY_ERROR_TMPL
                logging.exception(msg_tmpl.format(**msg_args))
            print(ua_status.MESSAGE_CONNECTIVITY_ERROR, file=sys.stderr)
            sys.exit(1)
        except exceptions.UserFacingError as exc:
            with util.disable_log_to_console():
                logging.exception(exc.msg)
            print("{}".format(exc.msg), file=sys.stderr)
            sys.exit(exc.exit_code)
        except Exception:
            with util.disable_log_to_console():
                logging.exception("Unhandled exception, please file a bug")
            print(ua_status.MESSAGE_UNEXPECTED_ERROR, file=sys.stderr)
            sys.exit(1)

    return wrapper


@main_error_handler
def main(sys_argv=None):
    if not sys_argv:
        sys_argv = sys.argv
    parser = get_parser()
    cli_arguments = sys_argv[1:]
    if not cli_arguments:
        parser.print_usage()
        print("Try 'ua --help' for more information.")
        sys.exit(1)
    args = parser.parse_args(args=cli_arguments)
    cfg = config.UAConfig()
    log_level = cfg.log_level
    console_level = logging.DEBUG if args.debug else logging.INFO
    setup_logging(console_level, log_level, cfg.log_file)
    logging.debug("Executed with sys.argv: %r", sys_argv)
    return args.action(args, cfg)


if __name__ == "__main__":
    sys.exit(main())

Youez - 2016 - github.com/yon3zu
LinuXploit