Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:ojkastl_buildservice:Branch_devel_languages_python
python-proxmoxer
proxmoxer-2.1.0.obscpio
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File proxmoxer-2.1.0.obscpio of Package python-proxmoxer
07070100000000000081A400000000000000000000000166B7C6B900000058000000000000000000000000000000000000001800000000proxmoxer-2.1.0/.banditskips: - B105 - B106 assert_used: skips: - '*/*_test.py' - '*/test_*.py' 07070100000001000041ED00000000000000000000000266B7C6B900000000000000000000000000000000000000000000001E00000000proxmoxer-2.1.0/.devcontainer07070100000002000081A400000000000000000000000166B7C6B90000033B000000000000000000000000000000000000002900000000proxmoxer-2.1.0/.devcontainer/Dockerfile# See here for image contents: https://github.com/microsoft/vscode-dev-containers/tree/v0.166.1/containers/python-3/.devcontainer/base.Dockerfile # [Choice] Python version: 3, 3.11, 3.10, 3.9, 3.8, 3.7, 3.6 ARG VARIANT="3" FROM mcr.microsoft.com/vscode/devcontainers/python:${VARIANT} # [Optional] If your pip requirements rarely change, uncomment this section to add them to the image. COPY test_requirements.txt dev_requirements.txt /tmp/pip-tmp/ RUN pip3 --disable-pip-version-check --no-cache-dir install -r /tmp/pip-tmp/test_requirements.txt -r /tmp/pip-tmp/dev_requirements.txt \ && rm -rf /tmp/pip-tmp # [Optional] Uncomment this section to install additional OS packages. # RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ # && apt-get -y install --no-install-recommends <your-package-list-here> 07070100000003000081A400000000000000000000000166B7C6B9000007B7000000000000000000000000000000000000003000000000proxmoxer-2.1.0/.devcontainer/devcontainer.json// For format details, see https://aka.ms/devcontainer.json. For config options, see the // README at: https://github.com/devcontainers/templates/tree/main/src/python { "name": "Proxmoxer Development", "build": { "dockerfile": "Dockerfile", "context": "..", "args": { // Update 'VARIANT' to pick a Python version: 3, 3.6, 3.7, 3.8, 3.9, 3.10, 3.11 "VARIANT": "3.8" } }, // Set *default* container specific settings.json values on container create. "customizations": { "vscode": { "settings": { "terminal.integrated.shell.linux": "/bin/bash", "python.pythonPath": "/usr/local/bin/python", "python.formatting.autopep8Path": "/usr/local/py-utils/bin/autopep8", "python.formatting.blackPath": "/usr/local/py-utils/bin/black", "python.formatting.yapfPath": "/usr/local/py-utils/bin/yapf", "python.linting.banditPath": "/usr/local/py-utils/bin/bandit", "python.linting.flake8Path": "/usr/local/py-utils/bin/flake8", "python.linting.mypyPath": "/usr/local/py-utils/bin/mypy", "python.linting.pycodestylePath": "/usr/local/py-utils/bin/pycodestyle", "python.linting.pydocstylePath": "/usr/local/py-utils/bin/pydocstyle", "python.linting.pylintPath": "/usr/local/py-utils/bin/pylint" }, // Add the IDs of extensions you want installed when the container is created. "extensions": [ "mhutchie.git-graph", "ms-python.python", "njpwerner.autodocstring", "ryanluker.vscode-coverage-gutters", "streetsidesoftware.code-spell-checker" ] } }, // Use 'forwardPorts' to make a list of ports inside the container available locally. // "forwardPorts": [], // Run commands to prepare the container for use "postCreateCommand": ".devcontainer/setup.sh", // Comment out connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root. "remoteUser": "vscode" } 07070100000004000081ED00000000000000000000000166B7C6B900000184000000000000000000000000000000000000002700000000proxmoxer-2.1.0/.devcontainer/setup.sh#!/bin/bash # install proxmoxer as an editable package pip3 install -e . rm -rf proxmoxer.egg-info/ # hide the mass-formatting commits from git blames git config blame.ignorerevsfile .git-blame-ignore-revs # install the git hook for pre-commit pre-commit install # run pre-commit on a simple file to ensure it downloads all needed tools pre-commit run --files .pre-commit-config.yaml 07070100000005000081A400000000000000000000000166B7C6B90000008C000000000000000000000000000000000000002700000000proxmoxer-2.1.0/.git-blame-ignore-revs# use with `git config blame.ignorerevsfile .git-blame-ignore-revs` # Format code base with Black 7a976de985fc7b71fdf31d3161f223eeaada38da 07070100000006000041ED00000000000000000000000266B7C6B900000000000000000000000000000000000000000000001800000000proxmoxer-2.1.0/.github07070100000007000041ED00000000000000000000000266B7C6B900000000000000000000000000000000000000000000002200000000proxmoxer-2.1.0/.github/workflows07070100000008000081A400000000000000000000000166B7C6B900000786000000000000000000000000000000000000002A00000000proxmoxer-2.1.0/.github/workflows/ci.yamlname: CI on: push: pull_request: # Allows you to run this workflow manually from the Actions tab workflow_dispatch: jobs: unit-test: continue-on-error: ${{ github.repository == 'proxmoxer/proxmoxer' }} runs-on: ubuntu-latest strategy: fail-fast: false matrix: python-version: - "3.8" - "3.9" - "3.10" - "3.11" - "3.12" steps: - name: Checkout uses: actions/checkout@v3 - name: Set up Python uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - name: Cache PIP packages uses: actions/cache@v3 with: path: ~/.cache/pip key: ${{ runner.os }}-pip-python${{ matrix.python-version }}-${{ hashFiles('*requirements.txt') }} restore-keys: | ${{ runner.os }}-pip-python${{ matrix.python-version }}- ${{ runner.os }}-pip- - name: Install pip Packages run: pip install -r test_requirements.txt - name: Install Self as Package run: pip install . - name: Run Tests run: pytest -v --cov tests/ - name: Run pre-commit lint/format checks uses: pre-commit/action@v3.0.0 - name: Upload coverage data to coveralls.io if: github.repository == 'proxmoxer/proxmoxer' run: coveralls --service=github env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} COVERALLS_FLAG_NAME: Unit Test (${{ matrix.python-version }}) COVERALLS_PARALLEL: true complete: name: Finalize Coveralls Report if: github.repository == 'proxmoxer/proxmoxer' needs: unit-test runs-on: ubuntu-latest steps: - name: Coveralls Finished uses: coverallsapp/github-action@1.1.3 with: parallel-finished: true github-token: ${{ secrets.GITHUB_TOKEN }} 07070100000009000081A400000000000000000000000166B7C6B900000841000000000000000000000000000000000000001B00000000proxmoxer-2.1.0/.gitignore# IDE files .idea *.code-workspace coverage.* # generated files README.txt # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] *$py.class # C extensions *.so # Distribution / packaging .Python build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ share/python-wheels/ *.egg-info/ .installed.cfg *.egg MANIFEST # PyInstaller # Usually these files are written by a python script from a template # before PyInstaller builds the exe, so as to inject date/other infos into it. *.manifest *.spec # Installer logs pip-log.txt pip-delete-this-directory.txt # Unit test / coverage reports htmlcov/ .tox/ .nox/ .coverage .coverage.* .cache nosetests.xml coverage.xml *.cover *.py,cover .hypothesis/ .pytest_cache/ cover/ # Translations *.mo *.pot # Django stuff: *.log local_settings.py db.sqlite3 db.sqlite3-journal # Flask stuff: instance/ .webassets-cache # Scrapy stuff: .scrapy # Sphinx documentation docs/_build/ # PyBuilder .pybuilder/ target/ # Jupyter Notebook .ipynb_checkpoints # IPython profile_default/ ipython_config.py # pyenv # For a library or package, you might want to ignore these files since the code is # intended to run in multiple environments; otherwise, check them in: # .python-version # pipenv # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. # However, in case of collaboration, if having platform-specific dependencies or dependencies # having no cross-platform support, pipenv may install dependencies that don't work, or not # install all needed dependencies. #Pipfile.lock # PEP 582; used by e.g. github.com/David-OConnor/pyflow __pypackages__/ # Celery stuff celerybeat-schedule celerybeat.pid # SageMath parsed files *.sage.py # Environments .env .venv env/ venv/ ENV/ env.bak/ venv.bak/ # Spyder project settings .spyderproject .spyproject # Rope project settings .ropeproject # mkdocs documentation /site # mypy .mypy_cache/ .dmypy.json dmypy.json # Pyre type checker .pyre/ # pytype static type analyzer .pytype/ # Cython debug symbols cython_debug/ 0707010000000A000081A400000000000000000000000166B7C6B90000060B000000000000000000000000000000000000002800000000proxmoxer-2.1.0/.pre-commit-config.yamlrepos: ###### FORMATTING ###### - repo: https://github.com/psf/black-pre-commit-mirror rev: 23.11.0 hooks: - id: black language_version: python3 # Should be a command that runs python3.6+ - repo: https://github.com/PyCQA/isort rev: 5.12.0 hooks: - id: isort name: isort (python) - id: isort name: isort (pyi) types: [pyi] ###### LINTING ###### - repo: https://github.com/PyCQA/bandit rev: 1.7.5 hooks: - id: bandit args: ["--configfile", ".bandit", "--baseline", "tests/known_issues.json"] - repo: https://github.com/PyCQA/flake8 rev: 6.1.0 hooks: - id: flake8 # any flake8 plugins must be included in the hook venv # additional_dependencies: [flake8-docstrings] # - repo: https://github.com/PyCQA/pylint # rev: v2.8.2 # hooks: # - id: pylint - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.5.0 hooks: - id: check-case-conflict - id: check-symlinks - id: destroyed-symlinks - id: check-merge-conflict - id: check-docstring-first - id: mixed-line-ending args: [--fix=no] - repo: https://github.com/asottile/blacken-docs rev: 1.16.0 hooks: - id: blacken-docs additional_dependencies: [black==23.11.0] - repo: https://github.com/pre-commit/pygrep-hooks rev: v1.10.0 hooks: - id: python-no-eval - id: rst-backticks - id: rst-directive-colons - id: rst-inline-touching-normal 0707010000000B000041ED00000000000000000000000266B7C6B900000000000000000000000000000000000000000000001800000000proxmoxer-2.1.0/.vscode0707010000000C000081A400000000000000000000000166B7C6B900000413000000000000000000000000000000000000002600000000proxmoxer-2.1.0/.vscode/settings.json{ "python.linting.enabled": true, "python.formatting.provider": "black", "editor.formatOnPaste": false, "python.linting.flake8Enabled": true, "python.linting.pylintEnabled": true, "python.linting.banditEnabled": true, "python.linting.banditArgs": [ "--baseline", "tests/known_issues.json", "--configfile", ".bandit" ], "[python]": { "editor.codeActionsOnSave": { "source.organizeImports": true } }, "cSpell.words": [ "auths", "Butovich", "caplog", "cpus", "Oleg", "onboot", "ostemplate", "Paramiko", "proxmoxer", "pvesh", "resps", "rtype", "sess", "toolbelt", "vmid", "vztmpl" ], "autoDocstring.docstringFormat": "sphinx", "autoDocstring.startOnNewLine": true, "python.testing.pytestArgs": [ "--cov", "--cov-report", "xml:coverage.xml", "tests/", ], "python.testing.unittestEnabled": false, "python.testing.pytestEnabled": true, "coverage-gutters.coverageFileNames": [ "coverage.xml" ], } 0707010000000D000081A400000000000000000000000166B7C6B900000AD7000000000000000000000000000000000000002300000000proxmoxer-2.1.0/.vscode/tasks.json{ // See https://go.microsoft.com/fwlink/?LinkId=733558 // for the documentation about the tasks.json format "version": "2.0.0", "tasks": [ { "label": "Run Tests (with coverage file)", "type": "shell", "command": "pytest -v --cov --cov-report xml:coverage.xml tests/", "problemMatcher": [], "icon": { "id": "beaker", "color": "terminal.ansiGreen" }, "runOptions": { "instanceLimit": 1 }, "group": { "kind": "test", "isDefault": true }, "presentation": { "echo": true, "reveal": "silent", "focus": false, "panel": "dedicated", "showReuseMessage": false, "clear": true }, }, { "label": "Run Tests (with coverage)", "type": "shell", "command": "pytest --cov tests/", "problemMatcher": [], "icon": { "id": "beaker", "color": "terminal.ansiGreen" }, "runOptions": { "instanceLimit": 1 }, "group": { "kind": "test", "isDefault": false }, "presentation": { "echo": true, "reveal": "always", "focus": false, "panel": "dedicated", "showReuseMessage": true, "clear": true }, }, { "label": "Run Tests", "type": "shell", "command": "pytest -v tests/", "problemMatcher": [], "icon": { "id": "beaker", }, "group": { "kind": "test" }, "presentation": { "echo": true, "reveal": "always", "focus": false, "panel": "dedicated", "showReuseMessage": false, "clear": true }, }, { "label": "Update bandit baseline", "type": "shell", "command": "bandit --configfile .bandit -f json -r tests/ proxmoxer/ >| tests/known_issues.json", "problemMatcher": [], "runOptions": { "instanceLimit": 1 }, "group": { "kind": "none" }, "icon": { "id": "bookmark" }, "presentation": { "echo": true, "reveal": "never", "focus": false, "panel": "shared", "showReuseMessage": false, "clear": false }, }, { "label": "Clean Cache/tmp files", "type": "shell", "command": "rm -rf ./.mypy_cache/ ./.pytest_cache/ ./coverage.xml ./.coverage", "problemMatcher": [], "group": { "kind": "none" }, "icon": { "id": "trashcan" }, "presentation": { "echo": true, "reveal": "never", "focus": false, "panel": "shared", "showReuseMessage": false, "clear": false }, } ] } 0707010000000E000081A400000000000000000000000166B7C6B900002183000000000000000000000000000000000000001D00000000proxmoxer-2.1.0/CHANGELOG.md## 2.1.0 (2024-08-10) * Improvement (docs): Update Readme with updated example ([Rob Wolinski](https://github.com/trekie86)) * Addition (tools): Added Files tools ([John Hollowell](https://github.com/jhollowe)) * Improvement (all): Add repr to some classes and add to tests ([John Hollowell](https://github.com/jhollowe)) * Bugfix (all): Correct metadata to match supported Python versions (3.6+) ([Alexei Znamensky](https://github.com/russoz)) * Bugfix (https): Fix BytesWarning when logging response status/content ([Walter Doekes](https://github.com/wdoekes)) * Improvement (meta): Update devcontainer to modern unified schema ([John Hollowell](https://github.com/jhollowe)) * Improvement (meta): Add 3.12 to CI matrix, remove 3.7 testing ([John Hollowell](https://github.com/jhollowe)) * Improvement (all): Fix improper spliting of non-exec QEMU commands ([John Hollowell](https://github.com/jhollowe)) ## 2.0.1 (2022-12-19) * Bugfix (https): properly pass verify_ssl all the way to the backend auth ([Dominik Rimpf](https://github.com/domrim)) ## 2.0.0 (2022-11-27) * Improvement (all): Convert testing framework to use pytest ([John Hollowell](https://github.com/jhollowe)) * Improvement (all): Remove Python 2.x support (minimum version of 3.7) ([John Hollowell](https://github.com/jhollowe)) * Improvement (all): Refactor code to Python 3 standards ([John Hollowell](https://github.com/jhollowe)) * Bugfix (all): Remove None values from request data and params ([Kristian Heljas](https://github.com/kristianheljas)) * Addition (tools): Added Task tools ([John Hollowell](https://github.com/jhollowe)) * Bugfix (all): Allow specifying resource_id as 0 ([John Bergvall](https://github.com/johnbergvall)) * Improvement (all): Remove ProxmoxResourceBase ([John Hollowell](https://github.com/jhollowe)) * Bugfix (all): Add platform detection before using shlex functions ([Kevin Boyd](https://github.com/r3d07)) * Improvement (https): Added `path_prefix` argument which is appended after the root of the URL (before `api2/`) ([John Hollowell](https://github.com/jhollowe)) ### Breaking Changes * `ProxmoxResourceBase` removed * `proxmoxer.backends.https.AuthenticationError` moved to `proxmoxer.AuthenticationError` * Removed `ProxmoxHTTPTicketAuth` and its arguments `auth_token` and `csrf_token` * keyword arguments to backends order changed (should not affect users specifying arguments by name) ## 1.3.1 (2022-05-14) * Bugfix (all): fix error handling for APIs that don't give a dict in the response ([Alex Wuillaume](https://github.com/wuillaumea)) ## 1.3.0 (2022-03-13) * Addition (local): Added `local` backend for running directly on Proxmox hosts. ([Markus Reiter](https://github.com/reitermarkus)) * Bugfix (all): properly parse command string sent to QEMU guest agent ([John Hollowell](https://github.com/jhollowe)) * Improvement (command_base): Refactor code to have a unified CLI backend base for `openssh`, `ssh_paramiko`, and `local` backends ([Markus Reiter](https://github.com/reitermarkus)) * Improvement (https): Support IPv6 addresses ([Daviddcc](https://github.com/dcasier)) * Improvement: Move CI to GitHub actions from Travis.ci ([John Hollowell](https://github.com/jhollowe)) * Improvement: Cleanup documentaiton and move to dedicated site ([John Hollowell](https://github.com/jhollowe)) * Improvement: Add `pre-commit` hooks for formatting and linting and format all code ([John Hollowell](https://github.com/jhollowe)) ## 1.2.0 (2021-10-07) * Addition (https): Added OTP code support to authentication ([John Hollowell](https://github.com/jhollowe)) * Addition (https): Added support for large file uploads using requests_toolbelt module ([John Hollowell](https://github.com/jhollowe)) * Addition (all): Added support for Proxmox Mail Gateway (PMG) and Proxmox Backup Server (PBS) with parameter validation ([Gabriel Cardoso de Faria](https://github.com/gabrielcardoso21), [John Hollowell](https://github.com/jhollowe)) * Addition (all): Added detailed information to ResourceException ([mihailstoynov](https://github.com/mihailstoynov)) * Bugfix (base_ssh): Resolved issue with values containing spaces by encapsulating values in quotes ([mihailstoynov](https://github.com/mihailstoynov)) * Bugfix (all): Resolved issue with using get/post/push/delete on a base ProxmoxAPI object ([John Hollowell](https://github.com/jhollowe)) * Bugfix (all): Added support for responses which are not JSON ([John Hollowell](https://github.com/jhollowe)) * Improvement: Added and updated documentation ([Ananias Filho](https://github.com/ananiasfilho), [Thomas Baag](https://github.com/b2ag)) * Improvement: Tests are now not installed when using PIP ([Ville Skyttä](https://github.com/scop)) * Addition: Devcontainer definition now available to make development easier ([John Hollowell](https://github.com/jhollowe)) ## 1.1.1 (2020-06-23) * Bugfix (https): correctly renew ticket in the session, not just the auth ([John Hollowell](https://github.com/jhollowe)) ## 1.1.0 (2020-05-22) * Addition (https): Added API Token authentication ([John Hollowell](https://github.com/jhollowe)) * Improvement (https): user/password authentication refreshes ticket to prevent expiration ([CompileNix](https://github.com/compilenix), [John Hollowell](https://github.com/jhollowe)) * Bugfix (ssh_paramiko): Handle empty stderr from ssh connections ([morph027](https://github.com/morph027)) * DEPRECATED (https): using ``auth_token`` and ``csrf_token`` (ProxmoxHTTPTicketAuth) is now deprecated. Either pass the ``auth_token`` as the ``password`` or use the API Tokens. ## 1.0.4 (2020-01-24) * Improvement (https): Added timeout to authentication (James Lin) * Improvement (https): Handle AnyEvent::HTTP status codes gracefully (Georges Martin) * Improvement (https): Advanced error message with error code >=400 ([ssi444](https://github.com/ssi444)) * Bugfix (ssh): Fix pvesh output format for version > 5.3 ([timansky](https://github.com/timansky)) * Transferred development to proxmoxer organization ## 1.0.3 (2018-09-10) * Improvement (https): Added option to specify port in hostname parameter ([pvanagtmaal](https://github.com/pvanagtmaal)) * Improvement: Added stderr to the Response content ([Jérôme Schneider](https://github.com/merinos)) * Bugfix (ssh_paramiko): Paramiko python3: stdout and stderr must be a str not bytes ([Jérôme Schneider](https://github.com/merinos)) * New lxc example in docu ([Geert Stappers](https://github.com/stappersg)) ## 1.0.2 (2017-12-02) * Tarball repackaged with tests ## 1.0.1 (2017-12-02) * LICENSE file now included in tarball * Added verify_ssl parameter to ProxmoxHTTPAuth ([Walter Doekes](https://github.com/wdoekes)) ## 1.0.0 (2017-11-12) * Update Proxmoxer readme ([Emmanuel Kasper](https://github.com/EmmanuelKasper)) * Display the reason of API calls errors ([Emmanuel Kasper](https://github.com/EmmanuelKasper), [kantsdog](https://github.com/kantsdog)) * Filter for ssh response code ([Chris Plock](https://github.com/chrisplo)) ## 0.2.5 (2017-02-12) * Adding sudo to execute CLI with paramiko ssh backend ([Jason Meridth](https://github.com/jmeridth)) * Proxmoxer/backends/ssh_paramiko: improve file upload ([Jérôme Schneider](https://github.com/merinos)) ## 0.2.4 (2016-05-02) * Removed newline in tmp_filename string ([Jérôme Schneider](https://github.com/merinos)) * Fix to avoid module reloading ([jklang](https://github.com/jklang)) ## 0.2.3 (2016-01-20) * Minor typo fix ([Srinivas Sakhamuri](https://github.com/srsakhamuri)) ## 0.2.2 (2016-01-19) * Adding sudo to execute pvesh CLI in openssh backend ([Wei Tie](https://github.com/TieWei), [Srinivas Sakhamuri](https://github.com/srsakhamuri)) * Add support to specify an identity file for ssh connections ([Srinivas Sakhamuri](https://github.com/srsakhamuri)) ## 0.2.1 (2015-05-02) * fix for python 3.4 ([kokuev](https://github.com/kokuev)) ## 0.2.0 (2015-03-21) * Https will now raise AuthenticationError when appropriate. ([scap1784](https://github.com/scap1784)) * Preliminary python 3 compatibility. ([wdoekes](https://github.com/wdoekes)) * Additional example. ([wdoekes](https://github.com/wdoekes)) ## 0.1.7 (2014-11-16) * Added ignore of "InsecureRequestWarning: Unverified HTTPS request is being made..." warning while using https (requests) backend. ## 0.1.4 (2013-06-01) * Added logging * Added openssh backend * Tests are reorganized ## 0.1.3 (2013-05-30) * Added next tests * Bugfixes ## 0.1.2 (2013-05-27) * Added first tests * Added support for travis and coveralls * Bugfixes ## 0.1.1 (2013-05-13) * Initial try. 0707010000000F000081A400000000000000000000000166B7C6B900000431000000000000000000000000000000000000001C00000000proxmoxer-2.1.0/LICENSE.txtThe MIT License Copyright (c) 2013 Oleg Butovich Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.07070100000010000081A400000000000000000000000166B7C6B900000077000000000000000000000000000000000000001C00000000proxmoxer-2.1.0/MANIFEST.ininclude LICENSE.txt include README.txt include README.rst include CHANGELOG.md global-exclude *.orig *.pyc *.log *.swp 07070100000011000081A400000000000000000000000166B7C6B900001004000000000000000000000000000000000000001B00000000proxmoxer-2.1.0/README.rst================================================ Proxmoxer: A Python wrapper for Proxmox REST API ================================================ master branch: |master_build_status| |master_coverage_status| |pypi_version| |pypi_downloads| develop branch: |develop_build_status| |develop_coverage_status| Proxmoxer is a python wrapper around the `Proxmox REST API v2 <https://pve.proxmox.com/pve-docs/api-viewer/index.html>`_. It currently supports the Proxmox services of Proxmox Virtual Environment (PVE), Proxmox Mail Gateway (PMG), and Proxmox Backup Server (PBS). It was inspired by slumber, but it is dedicated only to Proxmox. It allows not only REST API use over HTTPS, but the same api over ssh and pvesh utility. Like `Proxmoxia <https://github.com/baseblack/Proxmoxia>`_, it dynamically creates attributes which responds to the attributes you've attempted to reach. Full Documentation is available at https://proxmoxer.github.io/docs/ -------------------------------------------------------------------- Migrating to version 2 ...................... Full instructions for the minimal steps needed to update to version 2 can be found in `Migration Docs <https://proxmoxer.github.io/docs/latest/v1_migration/>`_. Installation ............ .. code-block:: bash pip install proxmoxer To use the 'https' backend, install requests .. code-block:: bash pip install requests To use the 'ssh_paramiko' backend, install paramiko .. code-block:: bash pip install paramiko To use the 'openssh' backend, install openssh_wrapper .. code-block:: bash pip install openssh_wrapper Short usage information ....................... The first thing to do is import the proxmoxer library and create ProxmoxAPI instance. .. code-block:: python from proxmoxer import ProxmoxAPI proxmox = ProxmoxAPI( "proxmox_host", user="admin@pam", password="secret_word", verify_ssl=False ) This will connect by default to PVE through the 'https' backend. **Note: ensure you have the required libraries (listed above) for the connection method you are using** Queries are exposed via the access methods **get**, **post**, **put** and **delete**. For convenience two synonyms are available: **create** for **post**, and **set** for **put**. Using the paths from the `PVE API v2 <https://pve.proxmox.com/pve-docs/api-viewer/index.html>`_, you can create API calls using the access methods above. .. code-block:: pycon >>> for node in proxmox.nodes.get(): ... for vm in proxmox.nodes(node["node"]).qemu.get(): ... print(f"{vm['vmid']}. {vm['name']} => {vm['status']}") ... 141. puppet-2.london.example.com => running 101. munki.london.example.com => running 102. redmine.london.example.com => running 140. dns-1.london.example.com => running 126. ns-3.london.example.com => running 113. rabbitmq.london.example.com => running See Changelog in `CHANGELOG.md <https://github.com/proxmoxer/proxmoxer/blob/develop/CHANGELOG.md>`_ ................................................................................................... .. |master_build_status| image:: https://github.com/proxmoxer/proxmoxer/actions/workflows/ci.yaml/badge.svg?branch=master :target: https://github.com/proxmoxer/proxmoxer/actions .. |master_coverage_status| image:: https://img.shields.io/coveralls/github/proxmoxer/proxmoxer/master :target: https://coveralls.io/github/proxmoxer/proxmoxer?branch=master .. |develop_build_status| image:: https://github.com/proxmoxer/proxmoxer/actions/workflows/ci.yaml/badge.svg?branch=develop :target: https://github.com/proxmoxer/proxmoxer/actions .. |develop_coverage_status| image:: https://img.shields.io/coveralls/github/proxmoxer/proxmoxer/develop :target: https://coveralls.io/github/proxmoxer/proxmoxer?branch=develop .. |pypi_version| image:: https://img.shields.io/pypi/v/proxmoxer.svg :target: https://pypi.python.org/pypi/proxmoxer .. |pypi_downloads| image:: https://img.shields.io/pypi/dm/proxmoxer.svg :target: https://pypi.python.org/pypi/proxmoxer 07070100000012000081A400000000000000000000000166B7C6B90000001C000000000000000000000000000000000000002500000000proxmoxer-2.1.0/dev_requirements.txttwine setuptools pre-commit 07070100000013000041ED00000000000000000000000266B7C6B900000000000000000000000000000000000000000000001A00000000proxmoxer-2.1.0/proxmoxer07070100000014000081A400000000000000000000000166B7C6B900000092000000000000000000000000000000000000002600000000proxmoxer-2.1.0/proxmoxer/__init__.py__author__ = "Oleg Butovich" __copyright__ = "(c) Oleg Butovich 2013-2017" __version__ = "2.1.0" __license__ = "MIT" from .core import * # noqa 07070100000015000041ED00000000000000000000000266B7C6B900000000000000000000000000000000000000000000002300000000proxmoxer-2.1.0/proxmoxer/backends07070100000016000081A400000000000000000000000166B7C6B90000005F000000000000000000000000000000000000002F00000000proxmoxer-2.1.0/proxmoxer/backends/__init__.py__author__ = "Oleg Butovich" __copyright__ = "(c) Oleg Butovich 2013-2017" __license__ = "MIT" 07070100000017000081A400000000000000000000000166B7C6B9000012A7000000000000000000000000000000000000003300000000proxmoxer-2.1.0/proxmoxer/backends/command_base.py__author__ = "Oleg Butovich" __copyright__ = "(c) Oleg Butovich 2013-2017" __license__ = "MIT" import json import logging import platform import re from itertools import chain from shlex import split as shell_split from proxmoxer.core import SERVICES logger = logging.getLogger(__name__) logger.setLevel(level=logging.WARNING) try: from shlex import join def shell_join(args): return join(args) except ImportError: from shlex import quote def shell_join(args): return " ".join([quote(arg) for arg in args]) class Response: def __init__(self, content, status_code): self.status_code = status_code self.content = content self.text = str(content) self.headers = {"content-type": "application/json"} def __str__(self): return f"Response ({self.status_code}) {self.content}" class CommandBaseSession: def __init__( self, service="PVE", timeout=5, sudo=False, ): self.service = service.lower() self.timeout = timeout self.sudo = sudo def _exec(self, cmd): raise NotImplementedError() # noinspection PyUnusedLocal def request(self, method, url, data=None, params=None, headers=None): method = method.lower() data = data or {} params = params or {} url = url.strip() cmd = {"post": "create", "put": "set"}.get(method, method) # separate out qemu exec commands to split into multiple argument pairs (issue#89) data_command = None if "/agent/exec" in url: data_command = data.get("command") if data_command is not None: del data["command"] # for 'upload' call some workaround tmp_filename = "" if url.endswith("upload"): # copy file to temporary location on proxmox host tmp_filename, _ = self._exec( [ "python3", "-c", "import tempfile; import sys; tf = tempfile.NamedTemporaryFile(); sys.stdout.write(tf.name)", ] ) tmp_filename = str(tmp_filename, "utf-8") self.upload_file_obj(data["filename"], tmp_filename) data["filename"] = data["filename"].name data["tmpfilename"] = tmp_filename command = [f"{self.service}sh", cmd, url] # convert the options dict into a 2-tuple with the key formatted as a flag option_pairs = [(f"-{k}", str(v)) for k, v in chain(data.items(), params.items())] # add back in all the command arguments as their own pairs if data_command is not None: if isinstance(data_command, list): command_arr = data_command elif "Windows" not in platform.platform(): command_arr = shell_split(data_command) for arg in command_arr: option_pairs.append(("-command", arg)) # expand the list of 2-tuples into a flat list options = [val for pair in option_pairs for val in pair] additional_options = SERVICES[self.service.upper()].get("cli_additional_options", []) full_cmd = command + options + additional_options if self.sudo: full_cmd = ["sudo"] + full_cmd stdout, stderr = self._exec(full_cmd) def is_http_status_string(s): return re.match(r"\d\d\d [a-zA-Z]", str(s)) if stderr: # sometimes contains extra text like 'trying to acquire lock...OK' status_code = next( ( int(line.split()[0]) for line in stderr.splitlines() if is_http_status_string(line) ), 500, ) else: status_code = 200 if stdout: return Response(stdout, status_code) return Response(stderr, status_code) def upload_file_obj(self, file_obj, remote_path): raise NotImplementedError() class JsonSimpleSerializer: def loads(self, response): try: return json.loads(response.content) except (UnicodeDecodeError, ValueError): return {"errors": response.content} def loads_errors(self, response): try: return json.loads(response.text).get("errors") except (UnicodeDecodeError, ValueError): return {"errors": response.content} class CommandBaseBackend: def __init__(self): self.session = None self.target = "" def get_session(self): return self.session def get_base_url(self): return self.target def get_serializer(self): return JsonSimpleSerializer() 07070100000018000081A400000000000000000000000166B7C6B900002E01000000000000000000000000000000000000002C00000000proxmoxer-2.1.0/proxmoxer/backends/https.py__author__ = "Oleg Butovich" __copyright__ = "(c) Oleg Butovich 2013-2017" __license__ = "MIT" import io import json import logging import os import platform import sys import time from shlex import split as shell_split from proxmoxer.core import SERVICES, AuthenticationError, config_failure logger = logging.getLogger(__name__) logger.setLevel(level=logging.WARNING) STREAMING_SIZE_THRESHOLD = 10 * 1024 * 1024 # 10 MiB SSL_OVERFLOW_THRESHOLD = 2147483135 # 2^31 - 1 - 512 try: import requests from requests.auth import AuthBase from requests.cookies import cookiejar_from_dict # Disable warnings about using untrusted TLS requests.packages.urllib3.disable_warnings() except ImportError: logger.error("Chosen backend requires 'requests' module\n") sys.exit(1) class ProxmoxHTTPAuthBase(AuthBase): def __call__(self, req): return req def get_cookies(self): return cookiejar_from_dict({}) def get_tokens(self): return None, None def __init__(self, timeout=5, service="PVE", verify_ssl=False): self.timeout = timeout self.service = service self.verify_ssl = verify_ssl class ProxmoxHTTPAuth(ProxmoxHTTPAuthBase): # number of seconds between renewing access tickets (must be less than 7200 to function correctly) # if calls are made less frequently than 2 hrs, using the API token auth is recommended renew_age = 3600 def __init__(self, username, password, otp=None, base_url="", **kwargs): super().__init__(**kwargs) self.base_url = base_url self.username = username self.pve_auth_ticket = "" self._get_new_tokens(password=password, otp=otp) def _get_new_tokens(self, password=None, otp=None): if password is None: # refresh from existing (unexpired) ticket password = self.pve_auth_ticket data = {"username": self.username, "password": password} if otp: data["otp"] = otp response_data = requests.post( self.base_url + "/access/ticket", verify=self.verify_ssl, timeout=self.timeout, data=data, ).json()["data"] if response_data is None: raise AuthenticationError( "Couldn't authenticate user: {0} to {1}".format( self.username, self.base_url + "/access/ticket" ) ) if response_data.get("NeedTFA") is not None: raise AuthenticationError( "Couldn't authenticate user: missing Two Factor Authentication (TFA)" ) self.birth_time = time.monotonic() self.pve_auth_ticket = response_data["ticket"] self.csrf_prevention_token = response_data["CSRFPreventionToken"] def get_cookies(self): return cookiejar_from_dict({self.service + "AuthCookie": self.pve_auth_ticket}) def get_tokens(self): return self.pve_auth_ticket, self.csrf_prevention_token def __call__(self, req): # refresh ticket if older than `renew_age` time_diff = time.monotonic() - self.birth_time if time_diff >= self.renew_age: logger.debug(f"refreshing ticket (age {time_diff})") self._get_new_tokens() # only attach CSRF token if needed (reduce interception risk) if req.method != "GET": req.headers["CSRFPreventionToken"] = self.csrf_prevention_token return req class ProxmoxHTTPApiTokenAuth(ProxmoxHTTPAuthBase): def __init__(self, username, token_name, token_value, **kwargs): super().__init__(**kwargs) self.username = username self.token_name = token_name self.token_value = token_value def __call__(self, req): req.headers["Authorization"] = "{0}APIToken={1}!{2}{3}{4}".format( self.service, self.username, self.token_name, SERVICES[self.service]["token_separator"], self.token_value, ) return req class JsonSerializer: content_types = [ "application/json", "application/x-javascript", "text/javascript", "text/x-javascript", "text/x-json", ] def get_accept_types(self): return ", ".join(self.content_types) def loads(self, response): try: return json.loads(response.content.decode("utf-8"))["data"] except (UnicodeDecodeError, ValueError): return {"errors": response.content} def loads_errors(self, response): try: return json.loads(response.text).get("errors") except (UnicodeDecodeError, ValueError): return {"errors": response.content} # pylint:disable=arguments-renamed class ProxmoxHttpSession(requests.Session): def request( self, method, url, params=None, data=None, headers=None, cookies=None, files=None, auth=None, timeout=None, allow_redirects=True, proxies=None, hooks=None, stream=None, verify=None, cert=None, serializer=None, ): a = auth or self.auth c = cookies or self.cookies # set verify flag from auth if request does not have this parameter explicitly if verify is None: verify = a.verify_ssl if timeout is None: timeout = a.timeout # pull cookies from auth if not present if (not c) and a: cookies = a.get_cookies() # filter out streams files = files or {} data = data or {} total_file_size = 0 for k, v in data.copy().items(): # split qemu exec commands for proper parsing by PVE (issue#89) if k == "command" and url.endswith("agent/exec"): if isinstance(v, list): data[k] = v elif "Windows" not in platform.platform(): data[k] = shell_split(v) if isinstance(v, io.IOBase): total_file_size += get_file_size(v) # add in filename from file pointer (patch for https://github.com/requests/toolbelt/pull/316) # add Content-Type since Proxmox requires it (https://bugzilla.proxmox.com/show_bug.cgi?id=4344) files[k] = (requests.utils.guess_filename(v), v, "application/octet-stream") del data[k] # if there are any large files, send all data and files using streaming multipart encoding if total_file_size > STREAMING_SIZE_THRESHOLD: try: # pylint:disable=import-outside-toplevel from requests_toolbelt import MultipartEncoder encoder = MultipartEncoder(fields={**data, **files}) data = encoder files = None headers = {"Content-Type": encoder.content_type} except ImportError: # if the files will cause issues with the SSL 2GiB limit (https://bugs.python.org/issue42853#msg384566) if total_file_size > SSL_OVERFLOW_THRESHOLD: logger.warning( "Install 'requests_toolbelt' to add support for files larger than 2GiB" ) raise OverflowError("Unable to upload a payload larger than 2 GiB") else: logger.info( "Installing 'requests_toolbelt' will decrease memory used during upload" ) return super().request( method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream, verify, cert, ) class Backend: def __init__( self, host, user=None, password=None, otp=None, port=None, verify_ssl=True, mode="json", timeout=5, token_name=None, token_value=None, path_prefix=None, service="PVE", ): host_port = "" if len(host.split(":")) > 2: # IPv6 if host.startswith("["): if "]:" in host: host, host_port = host.rsplit(":", 1) else: host = f"[{host}]" elif ":" in host: host, host_port = host.split(":") port = host_port if host_port.isdigit() else port # if a port is not specified, use the default port for this service if not port: port = SERVICES[service]["default_port"] self.mode = mode if path_prefix is not None: self.base_url = f"https://{host}:{port}/{path_prefix}/api2/{mode}" else: self.base_url = f"https://{host}:{port}/api2/{mode}" if token_name is not None: if "token" not in SERVICES[service]["supported_https_auths"]: config_failure("{} does not support API Token authentication", service) self.auth = ProxmoxHTTPApiTokenAuth( user, token_name, token_value, verify_ssl=verify_ssl, timeout=timeout, service=service, ) elif password is not None: if "password" not in SERVICES[service]["supported_https_auths"]: config_failure("{} does not support password authentication", service) self.auth = ProxmoxHTTPAuth( user, password, otp, base_url=self.base_url, verify_ssl=verify_ssl, timeout=timeout, service=service, ) else: config_failure("No valid authentication credentials were supplied") def get_session(self): session = ProxmoxHttpSession() session.auth = self.auth # cookies are taken from the auth session.headers["Connection"] = "keep-alive" session.headers["accept"] = self.get_serializer().get_accept_types() return session def get_base_url(self): return self.base_url def get_serializer(self): assert self.mode == "json" return JsonSerializer() def get_tokens(self): """Return the in-use auth and csrf tokens if using user/password auth.""" return self.auth.get_tokens() def get_file_size(file_obj): """Returns the number of bytes in the given file object in total file cursor remains at the same location as when passed in :param fileObj: file object of which the get size :type fileObj: file object :return: total bytes in file object :rtype: int """ # store existing file cursor location starting_cursor = file_obj.tell() # seek to end of file file_obj.seek(0, os.SEEK_END) size = file_obj.tell() # reset cursor file_obj.seek(starting_cursor) return size def get_file_size_partial(file_obj): """Returns the number of bytes in the given file object from the current cursor to the end :param fileObj: file object of which the get size :type fileObj: file object :return: remaining bytes in file object :rtype: int """ # store existing file cursor location starting_cursor = file_obj.tell() file_obj.seek(0, os.SEEK_END) # get number of byte between where the cursor was set and the end size = file_obj.tell() - starting_cursor # reset cursor file_obj.seek(starting_cursor) return size 07070100000019000081A400000000000000000000000166B7C6B900000307000000000000000000000000000000000000002C00000000proxmoxer-2.1.0/proxmoxer/backends/local.py__author__ = "Markus Reiter" __copyright__ = "(c) Markus Reiter 2022" __license__ = "MIT" import shutil from subprocess import PIPE, Popen from proxmoxer.backends.command_base import CommandBaseBackend, CommandBaseSession class LocalSession(CommandBaseSession): def _exec(self, cmd): proc = Popen(cmd, stdout=PIPE, stderr=PIPE) stdout, stderr = proc.communicate(timeout=self.timeout) return stdout.decode(), stderr.decode() def upload_file_obj(self, file_obj, remote_path): with open(remote_path, "wb") as dest_fp: shutil.copyfileobj(file_obj, dest_fp) class Backend(CommandBaseBackend): def __init__(self, *args, **kwargs): self.session = LocalSession(*args, **kwargs) self.target = "localhost" 0707010000001A000081A400000000000000000000000166B7C6B9000006DD000000000000000000000000000000000000002E00000000proxmoxer-2.1.0/proxmoxer/backends/openssh.py__author__ = "Oleg Butovich" __copyright__ = "(c) Oleg Butovich 2013-2017" __license__ = "MIT" import logging from proxmoxer.backends.command_base import ( CommandBaseBackend, CommandBaseSession, shell_join, ) logger = logging.getLogger(__name__) logger.setLevel(level=logging.WARNING) try: import openssh_wrapper except ImportError: import sys logger.error("Chosen backend requires 'openssh_wrapper' module\n") sys.exit(1) class OpenSSHSession(CommandBaseSession): def __init__( self, host, user, config_file=None, port=22, identity_file=None, forward_ssh_agent=False, **kwargs, ): super().__init__(**kwargs) self.host = host self.user = user self.config_file = config_file self.port = port self.forward_ssh_agent = forward_ssh_agent self.identity_file = identity_file self.ssh_client = self._connect() def _connect(self): return openssh_wrapper.SSHConnection( self.host, login=self.user, port=str(self.port), # openssh_wrapper complains if this is an int configfile=self.config_file, identity_file=self.identity_file, timeout=self.timeout, ) def _exec(self, cmd): ret = self.ssh_client.run(shell_join(cmd), forward_ssh_agent=self.forward_ssh_agent) return ret.stdout, ret.stderr def upload_file_obj(self, file_obj, remote_path): self.ssh_client.scp((file_obj,), target=remote_path) class Backend(CommandBaseBackend): def __init__(self, *args, **kwargs): self.session = OpenSSHSession(*args, **kwargs) self.target = self.session.host 0707010000001B000081A400000000000000000000000166B7C6B900000845000000000000000000000000000000000000003300000000proxmoxer-2.1.0/proxmoxer/backends/ssh_paramiko.py__author__ = "Oleg Butovich" __copyright__ = "(c) Oleg Butovich 2013-2017" __license__ = "MIT" # spell-checker:ignore putfo import logging import os from proxmoxer.backends.command_base import ( CommandBaseBackend, CommandBaseSession, shell_join, ) logger = logging.getLogger(__name__) logger.setLevel(level=logging.WARNING) try: import paramiko except ImportError: import sys logger.error("Chosen backend requires 'paramiko' module\n") sys.exit(1) class SshParamikoSession(CommandBaseSession): def __init__(self, host, user, password=None, private_key_file=None, port=22, **kwargs): super().__init__(**kwargs) self.host = host self.user = user self.password = password self.private_key_file = private_key_file self.port = port self.ssh_client = self._connect() def _connect(self): ssh_client = paramiko.SSHClient() ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) if self.private_key_file: key_filename = os.path.expanduser(self.private_key_file) else: key_filename = None ssh_client.connect( self.host, username=self.user, allow_agent=(not self.password), look_for_keys=True, key_filename=key_filename, password=self.password, timeout=self.timeout, port=self.port, ) return ssh_client def _exec(self, cmd): session = self.ssh_client.get_transport().open_session() session.exec_command(shell_join(cmd)) stdout = session.makefile("rb", -1).read().decode() stderr = session.makefile_stderr("rb", -1).read().decode() return stdout, stderr def upload_file_obj(self, file_obj, remote_path): sftp = self.ssh_client.open_sftp() sftp.putfo(file_obj, remote_path) sftp.close() class Backend(CommandBaseBackend): def __init__(self, *args, **kwargs): self.session = SshParamikoSession(*args, **kwargs) self.target = self.session.host 0707010000001C000081A400000000000000000000000166B7C6B900001E8C000000000000000000000000000000000000002200000000proxmoxer-2.1.0/proxmoxer/core.py__author__ = "Oleg Butovich" __copyright__ = "(c) Oleg Butovich 2013-2017" __license__ = "MIT" # spell-checker:ignore urlunsplit import importlib import logging import posixpath from http import client as httplib from urllib import parse as urlparse logger = logging.getLogger(__name__) logger.setLevel(level=logging.WARNING) # https://metacpan.org/pod/AnyEvent::HTTP ANYEVENT_HTTP_STATUS_CODES = { 595: "Errors during connection establishment, proxy handshake", 596: "Errors during TLS negotiation, request sending and header processing", 597: "Errors during body receiving or processing", 598: "User aborted request via on_header or on_body", 599: "Other, usually nonretryable, errors (garbled URL etc.)", } SERVICES = { "PVE": { "supported_backends": ["local", "https", "openssh", "ssh_paramiko"], "supported_https_auths": ["password", "token"], "default_port": 8006, "token_separator": "=", "cli_additional_options": ["--output-format", "json"], }, "PMG": { "supported_backends": ["local", "https", "openssh", "ssh_paramiko"], "supported_https_auths": ["password"], "default_port": 8006, }, "PBS": { "supported_backends": ["https"], "supported_https_auths": ["password", "token"], "default_port": 8007, "token_separator": ":", }, } def config_failure(message, *args): raise NotImplementedError(message.format(*args)) class ResourceException(Exception): """ An Exception thrown when an Proxmox API call failed """ def __init__(self, status_code, status_message, content, errors=None): """ Create a new ResourceException :param status_code: The HTTP status code (faked by non-HTTP backends) :type status_code: int :param status_message: HTTP Status code (faked by non-HTTP backends) :type status_message: str :param content: Extended information on what went wrong :type content: str :param errors: Any specific errors that were encountered (converted to string), defaults to None :type errors: Optional[object], optional """ self.status_code = status_code self.status_message = status_message self.content = content self.errors = errors if errors is not None: content += f" - {errors}" message = f"{status_code} {status_message}: {content}".strip() super().__init__(message) class AuthenticationError(Exception): pass class ProxmoxResource: def __init__(self, **kwargs): self._store = kwargs def __repr__(self): return f"ProxmoxResource ({self._store.get('base_url')})" def __getattr__(self, item): if item.startswith("_"): raise AttributeError(item) kwargs = self._store.copy() kwargs["base_url"] = self.url_join(self._store["base_url"], item) return ProxmoxResource(**kwargs) def url_join(self, base, *args): scheme, netloc, path, query, fragment = urlparse.urlsplit(base) path = path if len(path) else "/" path = posixpath.join(path, *[str(x) for x in args]) return urlparse.urlunsplit([scheme, netloc, path, query, fragment]) def __call__(self, resource_id=None): if resource_id in (None, ""): return self if isinstance(resource_id, (bytes, str)): resource_id = resource_id.split("/") elif not isinstance(resource_id, (tuple, list)): resource_id = [str(resource_id)] kwargs = self._store.copy() if resource_id is not None: kwargs["base_url"] = self.url_join(self._store["base_url"], *resource_id) return ProxmoxResource(**kwargs) def _request(self, method, data=None, params=None): url = self._store["base_url"] if data: logger.info(f"{method} {url} {data}") else: logger.info(f"{method} {url}") # passing None values to pvesh command breaks it, let's remove them just as requests library does # helpful when dealing with function default values higher in the chain, no need to clean up in multiple places if params: # remove keys that are set to None params_none_keys = [k for (k, v) in params.items() if v is None] for key in params_none_keys: del params[key] if data: # remove keys that are set to None data_none_keys = [k for (k, v) in data.items() if v is None] for key in data_none_keys: del data[key] resp = self._store["session"].request(method, url, data=data, params=params) logger.debug(f"Status code: {resp.status_code}, output: {resp.content!r}") if resp.status_code >= 400: if hasattr(resp, "reason"): raise ResourceException( resp.status_code, httplib.responses.get( resp.status_code, ANYEVENT_HTTP_STATUS_CODES.get(resp.status_code) ), resp.reason, errors=(self._store["serializer"].loads_errors(resp)), ) else: raise ResourceException( resp.status_code, httplib.responses.get( resp.status_code, ANYEVENT_HTTP_STATUS_CODES.get(resp.status_code) ), resp.text, ) elif 200 <= resp.status_code <= 299: return self._store["serializer"].loads(resp) def get(self, *args, **params): return self(args)._request("GET", params=params) def post(self, *args, **data): return self(args)._request("POST", data=data) def put(self, *args, **data): return self(args)._request("PUT", data=data) def delete(self, *args, **params): return self(args)._request("DELETE", params=params) def create(self, *args, **data): return self.post(*args, **data) def set(self, *args, **data): return self.put(*args, **data) class ProxmoxAPI(ProxmoxResource): def __init__(self, host=None, backend="https", service="PVE", **kwargs): super().__init__(**kwargs) service = service.upper() backend = backend.lower() # throw error for unsupported services if service not in SERVICES.keys(): config_failure("{} service is not supported", service) # throw error for unsupported backend for service if backend not in SERVICES[service]["supported_backends"]: config_failure("{} service does not support {} backend", service, backend) if host is not None: if backend == "local": config_failure("{} backend does not support host keyword", backend) else: kwargs["host"] = host kwargs["service"] = service # load backend module self._backend = importlib.import_module(f".backends.{backend}", "proxmoxer").Backend( **kwargs ) self._backend_name = backend self._store = { "base_url": self._backend.get_base_url(), "session": self._backend.get_session(), "serializer": self._backend.get_serializer(), } def __repr__(self): return f"ProxmoxAPI ({self._backend_name} backend for {self._store['base_url']})" def get_tokens(self): """Return the auth and csrf tokens. Returns (None, None) if the backend is not https using password authentication. """ if self._backend_name != "https": return None, None return self._backend.get_tokens() 0707010000001D000041ED00000000000000000000000266B7C6B900000000000000000000000000000000000000000000002000000000proxmoxer-2.1.0/proxmoxer/tools0707010000001E000081A400000000000000000000000166B7C6B9000000D0000000000000000000000000000000000000002C00000000proxmoxer-2.1.0/proxmoxer/tools/__init__.py__author__ = "John Hollowell" __copyright__ = "(c) John Hollowell 2022" __license__ = "MIT" from . import * # noqa: F401 F403 from .files import * # noqa: F401 F403 from .tasks import * # noqa: F401 F403 0707010000001F000081A400000000000000000000000166B7C6B9000028DA000000000000000000000000000000000000002900000000proxmoxer-2.1.0/proxmoxer/tools/files.py__author__ = "John Hollowell" __copyright__ = "(c) John Hollowell 2023" __license__ = "MIT" import hashlib import logging import os import sys from enum import Enum from pathlib import Path from typing import Optional from urllib.parse import urljoin, urlparse from proxmoxer import ProxmoxResource, ResourceException from proxmoxer.tools.tasks import Tasks CHECKSUM_CHUNK_SIZE = 16384 # read 16k at a time while calculating the checksum for upload logger = logging.getLogger(__name__) logger.setLevel(level=logging.WARNING) try: import requests except ImportError: logger.error("Files tools requires 'requests' module\n") sys.exit(1) class ChecksumInfo: def __init__(self, name: str, hex_size: int): self.name = name self.hex_size = hex_size def __str__(self): return self.name def __repr__(self): return f"{self.name} ({self.hex_size} digits)" class SupportedChecksums(Enum): """ An Enum of the checksum types supported by Proxmox """ # ordered by preference for longer/stronger checksums first SHA512 = ChecksumInfo("sha512", 128) SHA256 = ChecksumInfo("sha256", 64) SHA224 = ChecksumInfo("sha224", 56) SHA384 = ChecksumInfo("sha384", 96) MD5 = ChecksumInfo("md5", 32) SHA1 = ChecksumInfo("sha1", 40) class Files: """ Ease-of-use tools for interacting with the uploading/downloading files in Proxmox VE """ def __init__(self, prox: ProxmoxResource, node: str, storage: str): self._prox = prox self._node = node self._storage = storage def __repr__(self): return f"Files ({self._node}/{self._storage} at {self._prox})" def upload_local_file_to_storage( self, filename: str, do_checksum_check: bool = True, blocking_status: bool = True, ): file_path = Path(filename) if not file_path.is_file(): logger.error(f'"{file_path.absolute()}" does not exist or is not a file') return None # init to None in case errors cause no values to be set upid: str = "" checksum: str = None checksum_type: str = None try: with open(file_path.absolute(), "rb") as f_obj: if do_checksum_check: # iterate through SupportedChecksums and find the first one in hashlib.algorithms_available for checksum_info in (v.value for v in SupportedChecksums): if checksum_info.name in hashlib.algorithms_available: checksum_type = checksum_info.name break if checksum_type is None: logger.warning( "There are no Proxmox supported checksums which are supported by hashlib. Skipping checksum validation" ) else: h = hashlib.new(checksum_type) # Iterate through the file in CHECKSUM_CHUNK_SIZE size for byte_block in iter(lambda: f_obj.read(CHECKSUM_CHUNK_SIZE), b""): h.update(byte_block) checksum = h.hexdigest() logger.debug( f"The {checksum_type} checksum of {file_path.absolute()} is {checksum}" ) # reset to the start of the file so the upload can use the same file handle f_obj.seek(0) params = { "content": "iso" if file_path.absolute().name.endswith("iso") else "vztmpl", "checksum-algorithm": checksum_type, "checksum": checksum, "filename": f_obj, } upid = self._prox.nodes(self._node).storage(self._storage).upload.post(**params) except OSError as e: logger.error(e) return None if blocking_status: return Tasks.blocking_status(self._prox, upid) else: return self._prox.nodes(self._node).tasks(upid).status.get() def download_file_to_storage( self, url: str, checksum: Optional[str] = None, checksum_type: Optional[str] = None, blocking_status: bool = True, ): file_info = self.get_file_info(url) filename = None if file_info is not None: filename = file_info.get("filename") if checksum is None and checksum_type is None: checksum, checksum_info = self.get_checksums_from_file_url(url, filename) checksum_type = checksum_info.name if checksum_info else None elif checksum is None or checksum_type is None: logger.error( "Must pass both checksum and checksum_type or leave both None for auto-discovery" ) return None if checksum is None or checksum_type is None: logger.warning("Unable to discover checksum. Will not do checksum validation") params = { "checksum-algorithm": checksum_type, "url": url, "checksum": checksum, "content": "iso" if url.endswith("iso") else "vztmpl", "filename": filename, } upid = self._prox.nodes(self._node).storage(self._storage)("download-url").post(**params) if blocking_status: return Tasks.blocking_status(self._prox, upid) else: return self._prox.nodes(self._node).tasks(upid).status.get() def get_file_info(self, url: str): try: return self._prox.nodes(self._node)("query-url-metadata").get(url=url) except ResourceException as e: logger.warning(f"Unable to get information for {url}: {e}") return None @staticmethod def get_checksums_from_file_url( url: str, filename: str = None, preferred_type=SupportedChecksums.SHA512.value ): getters_by_quality = [ Files._get_checksum_from_sibling_file, Files._get_checksum_from_extension, Files._get_checksum_from_extension_upper, ] # hacky way to try the preferred_type first while still trying all types with no duplicates all_types_with_priority = list( dict.fromkeys([preferred_type, *(map(lambda t: t.value, SupportedChecksums))]) ) for c_info in all_types_with_priority: for getter in getters_by_quality: checksum: str = getter(url, c_info, filename) if checksum is not None: logger.info(f"{getter} found {str(c_info)} checksum {checksum}") return (checksum, c_info) else: logger.debug(f"{getter} found no {str(c_info)} checksum") return (None, None) @staticmethod def _get_checksum_from_sibling_file( url: str, checksum_info: ChecksumInfo, filename: Optional[str] = None ) -> Optional[str]: """ Uses a checksum file in the same path as the target file to discover the checksum :param url: the URL string of the target file :type url: str :param checksum_info: the type of checksum to search for :type checksum_info: ChecksumInfo :param filename: the filename to use for finding the checksum. If None, it will be discovered from the url :type filename: str | None :return: a string of the checksum if found, else None :rtype: str | None """ sumfile_url = urljoin(url, (checksum_info.name + "SUMS").upper()) filename = filename or os.path.basename(urlparse(url).path) return Files._get_checksum_helper(sumfile_url, filename, checksum_info) @staticmethod def _get_checksum_from_extension( url: str, checksum_info: ChecksumInfo, filename: Optional[str] = None ) -> Optional[str]: """ Uses a checksum file with a checksum extension added to the target file to discover the checksum :param url: the URL string of the target file :type url: str :param checksum_info: the type of checksum to search for :type checksum_info: ChecksumInfo :param filename: the filename to use for finding the checksum. If None, it will be discovered from the url :type filename: str | None :return: a string of the checksum if found, else None :rtype: str | None """ sumfile_url = url + "." + checksum_info.name filename = filename or os.path.basename(urlparse(url).path) return Files._get_checksum_helper(sumfile_url, filename, checksum_info) @staticmethod def _get_checksum_from_extension_upper( url: str, checksum_info: ChecksumInfo, filename: Optional[str] = None ) -> Optional[str]: """ Uses a checksum file with a checksum extension added to the target file to discover the checksum :param url: the URL string of the target file :type url: str :param checksum_info: the type of checksum to search for :type checksum_info: ChecksumInfo :param filename: the filename to use for finding the checksum. If None, it will be discovered from the url :type filename: str | None :return: a string of the checksum if found, else None :rtype: str | None """ sumfile_url = url + "." + checksum_info.name.upper() filename = filename or os.path.basename(urlparse(url).path) return Files._get_checksum_helper(sumfile_url, filename, checksum_info) @staticmethod def _get_checksum_helper(sumfile_url: str, filename: str, checksum_info: ChecksumInfo): logger.debug(f"getting {sumfile_url}") try: resp = requests.get(sumfile_url, timeout=10) except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout): logger.info(f"Failed when trying to get {sumfile_url}") return None if resp.status_code == 200: for line in resp.iter_lines(): line_str = line.decode("utf-8") logger.debug(f"checking for '{filename}' in '{line_str}'") if filename in str(line_str): return line_str[0 : checksum_info.hex_size] return None 07070100000020000081A400000000000000000000000166B7C6B900000A9D000000000000000000000000000000000000002900000000proxmoxer-2.1.0/proxmoxer/tools/tasks.py__author__ = "John Hollowell" __copyright__ = "(c) John Hollowell 2022" __license__ = "MIT" import time class Tasks: """ Ease-of-use tools for interacting with the tasks endpoints in the Proxmox API. """ @staticmethod def blocking_status(prox, task_id, timeout=300, polling_interval=1): """ Turns getting the status of a Proxmox task into a blocking call by polling the API until the task completes :param prox: The Proxmox object used to query for status :type prox: ProxmoxAPI :param task_id: the UPID of the task :type task_id: str :param timeout: If the task does not complete in this time (in seconds) return None, defaults to 300 :type timeout: int, optional :param polling_interval: the time to wait between checking for status updates, defaults to 1 :type polling_interval: float, optional :return: the status of the task :rtype: dict """ node: str = Tasks.decode_upid(task_id)["node"] start_time: float = time.monotonic() data = {"status": ""} while data["status"] != "stopped": data = prox.nodes(node).tasks(task_id).status.get() if start_time + timeout <= time.monotonic(): data = None # type: ignore break time.sleep(polling_interval) return data @staticmethod def decode_upid(upid): """ Decodes the sections of a UPID into separate fields :param upid: a UPID string :type upid: str :return: The decoded information from the UPID :rtype: dict """ segments = upid.split(":") if segments[0] != "UPID" or len(segments) != 9: raise AssertionError("UPID is not in the correct format") data = { "upid": upid, "node": segments[1], "pid": int(segments[2], 16), "pstart": int(segments[3], 16), "starttime": int(segments[4], 16), "type": segments[5], "id": segments[6], "user": segments[7].split("!")[0], "comment": segments[8], } return data @staticmethod def decode_log(log_list): """ Takes in a task's log data and returns a multiline string representation :param log_list: The log formatting returned by the Proxmox API :type log_list: list of dicts :return: a multiline string of the log :rtype: str """ str_list = [""] * len(log_list) for line in log_list: str_list[line["n"] - 1] = line.get("t", "") return "\n".join(str_list) 07070100000021000081A400000000000000000000000166B7C6B900000039000000000000000000000000000000000000001F00000000proxmoxer-2.1.0/pyproject.toml[tool.black] line-length = 100 target-version = ['py37'] 07070100000022000081A400000000000000000000000166B7C6B9000001D2000000000000000000000000000000000000001A00000000proxmoxer-2.1.0/setup.cfg[pylint] max-line-length = 100 # allow single letter variables variable-rgx = [a-z0-9_]{1,30}$ [pylint.messages_control] # let black handle line length # ignore some pytohn3 only features (f-strings) disable = C0330, C0326, C0114, line-too-long, missing-function-docstring, consider-using-f-string,missing-class-docstring [flake8] max-line-length = 100 extend-ignore = E203, E501, F811 exclude = .git,__pycache__,old,build,dist,*.egg-info [isort] profile = black 07070100000023000081A400000000000000000000000166B7C6B9000007BF000000000000000000000000000000000000001900000000proxmoxer-2.1.0/setup.py#!/usr/bin/env python import codecs import os import re import sys from setuptools import setup from proxmoxer import __version__ as proxmoxer_version if not os.path.exists("README.txt") and "sdist" in sys.argv: with codecs.open("README.rst", encoding="utf8") as f: rst = f.read() code_block = r"(:\n\n)?\.\. code-block::.*" rst = re.sub(code_block, "::", rst) with codecs.open("README.txt", encoding="utf8", mode="wb") as f: f.write(rst) try: readme = "README.txt" if os.path.exists("README.txt") else "README.rst" long_description = codecs.open(readme, encoding="utf-8").read() except IOError: long_description = "Could not read README.txt" setup( name="proxmoxer", version=proxmoxer_version, description="Python Wrapper for the Proxmox 2.x API (HTTP and SSH)", author="Oleg Butovich", author_email="obutovich@gmail.com", license="MIT", url="https://proxmoxer.github.io/docs/", download_url="http://pypi.python.org/pypi/proxmoxer", keywords=["proxmox", "api"], packages=["proxmoxer", "proxmoxer.backends", "proxmoxer.tools"], classifiers=[ # http://pypi.python.org/pypi?%3Aaction=list_classifiers "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", "Intended Audience :: System Administrators", "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3", "Programming Language :: Python", "Topic :: Software Development :: Libraries :: Python Modules", "Topic :: System :: Clustering", "Topic :: System :: Monitoring", "Topic :: System :: Systems Administration", ], long_description=long_description, long_description_content_type="text/x-rst", ) 07070100000024000081A400000000000000000000000166B7C6B9000000A0000000000000000000000000000000000000002600000000proxmoxer-2.1.0/test_requirements.txt# required libraries for full functionality openssh_wrapper paramiko requests requests_toolbelt # used by test framework coveralls pytest pytest-cov responses 07070100000025000041ED00000000000000000000000266B7C6B900000000000000000000000000000000000000000000001600000000proxmoxer-2.1.0/tests07070100000026000081A400000000000000000000000166B7C6B90000005C000000000000000000000000000000000000002200000000proxmoxer-2.1.0/tests/__init__.py__author__ = "John Hollowell" __copyright__ = "(c) John Hollowell 2022" __license__ = "MIT" 07070100000027000081A400000000000000000000000166B7C6B900002E22000000000000000000000000000000000000002200000000proxmoxer-2.1.0/tests/api_mock.py__author__ = "John Hollowell" __copyright__ = "(c) John Hollowell 2022" __license__ = "MIT" import json import re from urllib.parse import parse_qsl, urlparse import pytest import responses from requests_toolbelt import MultipartEncoder @pytest.fixture() def mock_pve(): with responses.RequestsMock(registry=PVERegistry, assert_all_requests_are_fired=False) as rsps: yield rsps class PVERegistry(responses.registries.FirstMatchRegistry): base_url = "https://1.2.3.4:1234/api2/json" common_headers = { "Cache-Control": "max-age=0", "Connection": "close, Keep-Alive", "Pragma": "no-cache", "Server": "pve-api-daemon/3.0", "Content-Type": "application/json;charset=UTF-8", } def __init__(self): super().__init__() for resp in self._generate_static_responses(): self.add(resp) for resp in self._generate_dynamic_responses(): self.add(resp) def _generate_static_responses(self): resps = [] # Basic GET requests resps.append( responses.Response( method="GET", url=self.base_url + "/version", json={"data": {"version": "7.2-3", "release": "7.2", "repoid": "c743d6c1"}}, ) ) resps.append( responses.Response( method="POST", url=re.compile(self.base_url + r"/nodes/[^/]+/storage/[^/]+/download-url"), # "done" added to UPID so polling will terminate (status checking is tested elsewhere) json={ "data": "UPID:node:003094EA:095F1EFE:63E88772:download:file.iso:root@pam:done", "success": 1, }, ) ) resps.append( responses.Response( method="POST", url=re.compile(self.base_url + r"/nodes/[^/]+/storage/storage1/upload"), # "done" added to UPID so polling will terminate (status checking is tested elsewhere) json={"data": "UPID:node:0017C594:0ADB2769:63EC5455:imgcopy::root@pam:done"}, ) ) resps.append( responses.Response( method="POST", url=re.compile(self.base_url + r"/nodes/[^/]+/storage/missing/upload"), status=500, body="storage 'missing' does not exist", ) ) return resps def _generate_dynamic_responses(self): resps = [] # Authentication resps.append( responses.CallbackResponse( method="POST", url=self.base_url + "/access/ticket", callback=self._cb_password_auth, ) ) # Session testing resps.append( responses.CallbackResponse( method="GET", url=self.base_url + "/fake/echo", callback=self._cb_echo, ) ) resps.append( responses.CallbackResponse( method="GET", url=re.compile(self.base_url + r"/nodes/[^/]+/qemu/[^/]+/agent/exec"), callback=self._cb_echo, ) ) resps.append( responses.CallbackResponse( method="GET", url=re.compile(self.base_url + r"/nodes/[^/]+/qemu/[^/]+/monitor"), callback=self._cb_qemu_monitor, ) ) resps.append( responses.CallbackResponse( method="GET", url=re.compile(self.base_url + r"/nodes/[^/]+/tasks/[^/]+/status"), callback=self._cb_task_status, ) ) resps.append( responses.CallbackResponse( method="GET", url=re.compile(self.base_url + r"/nodes/[^/]+/query-url-metadata.*"), callback=self._cb_url_metadata, ) ) return resps ################################### # Callbacks for Dynamic Responses # ################################### def _cb_echo(self, request): body = request.body if body is not None: if isinstance(body, MultipartEncoder): body = body.to_string() # really, to byte string body = body if isinstance(body, str) else str(body, "utf-8") resp = { "method": request.method, "url": request.url, "headers": dict(request.headers), "cookies": request._cookies.get_dict(), "body": body, # "body_json": dict(parse_qsl(request.body)), } return (200, self.common_headers, json.dumps(resp)) def _cb_password_auth(self, request): form_data_dict = dict(parse_qsl(request.body)) # if this user should not be authenticated if form_data_dict.get("username") == "bad_auth": return ( 401, self.common_headers, json.dumps({"data": None}), ) # if this user requires OTP and it is not included if form_data_dict.get("username") == "otp" and form_data_dict.get("otp") is None: return ( 200, self.common_headers, json.dumps( { "data": { "ticket": "otp_ticket", "CSRFPreventionToken": "CSRFPreventionToken", "NeedTFA": 1, } } ), ) # if this is the first ticket if form_data_dict.get("password") != "ticket": return ( 200, self.common_headers, json.dumps( {"data": {"ticket": "ticket", "CSRFPreventionToken": "CSRFPreventionToken"}} ), ) # if this is refreshing the ticket, return new ticket else: return ( 200, self.common_headers, json.dumps( { "data": { "ticket": "new_ticket", "CSRFPreventionToken": "CSRFPreventionToken_2", } } ), ) def _cb_task_status(self, request): resp = {} if "keep-running" in request.url: resp = { "data": { "id": "110", "pid": 1044989, "node": "node1", "pstart": 284768076, "status": "running", "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", } } elif "stopped" in request.url: resp = { "data": { "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "interrupted by signal", "pid": 1044989, "id": "110", "node": "node1", } } elif "done" in request.url: resp = { "data": { "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "OK", "pid": 1044989, "id": "110", "node": "node1", } } elif "comment" in request.url: resp = { "data": { "upid": "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment", "node": "node", "pid": 0, "pstart": 0, "starttime": 0, "type": "task", "id": "id", "user": "root@pam", "status": "stopped", "exitstatus": "OK", } } return (200, self.common_headers, json.dumps(resp)) def _cb_url_metadata(self, request): form_data_dict = dict(parse_qsl((urlparse(request.url)).query)) if "file.iso" in form_data_dict.get("url", ""): return ( 200, self.common_headers, json.dumps( { "data": { "size": 123456, "filename": "file.iso", "mimetype": "application/x-iso9660-image", # "mimetype": "application/octet-stream", }, "success": 1, } ), ) elif "invalid.iso" in form_data_dict.get("url", ""): return ( 500, self.common_headers, json.dumps( { "status": 500, "message": "invalid server response: '500 Can't connect to sub.domain.tld:443 (certificate verify failed)'\n", "success": 0, "data": None, } ), ) elif "missing.iso" in form_data_dict.get("url", ""): return ( 500, self.common_headers, json.dumps( { "status": 500, "success": 0, "message": "invalid server response: '404 Not Found'\n", "data": None, } ), ) elif "index.html" in form_data_dict.get("url", ""): return ( 200, self.common_headers, json.dumps( { "success": 1, "data": {"filename": "index.html", "mimetype": "text/html", "size": 17664}, } ), ) def _cb_qemu_monitor(self, request): body = request.body if body is not None: body = body if isinstance(body, str) else str(body, "utf-8") # if the command is an array, throw the type error PVE would throw if "&" in body: return ( 400, self.common_headers, json.dumps( { "data": None, "errors": {"command": "type check ('string') failed - got ARRAY"}, } ), ) else: resp = { "method": request.method, "url": request.url, "headers": dict(request.headers), "cookies": request._cookies.get_dict(), "body": body, # "body_json": dict(parse_qsl(request.body)), } print(resp) return (200, self.common_headers, json.dumps(resp)) 07070100000028000081A400000000000000000000000166B7C6B900000E5A000000000000000000000000000000000000002400000000proxmoxer-2.1.0/tests/files_mock.py__author__ = "John Hollowell" __copyright__ = "(c) John Hollowell 2022" __license__ = "MIT" import re import pytest import responses from requests import exceptions from .api_mock import PVERegistry @pytest.fixture() def mock_files(): with responses.RequestsMock( registry=FilesRegistry, assert_all_requests_are_fired=False ) as rsps: yield rsps class FilesRegistry(responses.registries.FirstMatchRegistry): base_url = "https://sub.domain.tld" common_headers = { "Cache-Control": "max-age=0", "Connection": "close, Keep-Alive", "Pragma": "no-cache", "Server": "pve-api-daemon/3.0", "Content-Type": "application/json;charset=UTF-8", } def __init__(self): super().__init__() for resp in self._generate_static_responses(): self.add(resp) def _generate_static_responses(self): resps = [] # Basic GET requests resps.append(responses.Response(method="GET", url=self.base_url, body="hello world")) resps.append( responses.Response(method="GET", url=self.base_url + "/file.iso", body="CONTENTS") ) # sibling resps.append( responses.Response( method="GET", url=self.base_url + "/sibling/file.iso", body="CONTENTS\n" ) ) resps.append( responses.Response( method="GET", url=self.base_url + "/sibling/TESTINGSUMS", body="this_is_the_hash file.iso", ) ) # extension resps.append( responses.Response( method="GET", url=self.base_url + "/extension/file.iso", body="CONTENTS\n" ) ) resps.append( responses.Response( method="GET", url=self.base_url + "/extension/file.iso.testing", body="this_is_the_hash file.iso", ) ) resps.append( responses.Response( method="GET", url=self.base_url + "/extension/connectionerror.iso.testing", body=exceptions.ConnectionError(), ) ) resps.append( responses.Response( method="GET", url=self.base_url + "/extension/readtimeout.iso.testing", body=exceptions.ReadTimeout(), ) ) # extension upper resps.append( responses.Response( method="GET", url=self.base_url + "/upper/file.iso", body="CONTENTS\n" ) ) resps.append( responses.Response( method="GET", url=self.base_url + "/upper/file.iso.TESTING", body="this_is_the_hash file.iso", ) ) resps.append( responses.Response( method="GET", url=re.compile(self.base_url + r"/checksums/file.iso.\w+"), body="1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890 file.iso", ) ) return resps @pytest.fixture() def mock_files_and_pve(): with responses.RequestsMock(registry=BothRegistry, assert_all_requests_are_fired=False) as rsps: yield rsps class BothRegistry(responses.registries.FirstMatchRegistry): def __init__(self): super().__init__() registries = [FilesRegistry(), PVERegistry()] for reg in registries: for resp in reg.registered: self.add(resp) 07070100000029000081A400000000000000000000000166B7C6B900004215000000000000000000000000000000000000002800000000proxmoxer-2.1.0/tests/known_issues.json{ "errors": [], "generated_at": "2022-08-25T03:08:48Z", "metrics": { "_totals": { "CONFIDENCE.HIGH": 3, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 11, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 3, "SEVERITY.MEDIUM": 11, "SEVERITY.UNDEFINED": 0, "loc": 1947, "nosec": 0, "skipped_tests": 0 }, "proxmoxer/__init__.py": { "CONFIDENCE.HIGH": 0, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 0, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 0, "SEVERITY.MEDIUM": 0, "SEVERITY.UNDEFINED": 0, "loc": 5, "nosec": 0, "skipped_tests": 0 }, "proxmoxer/backends/__init__.py": { "CONFIDENCE.HIGH": 0, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 0, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 0, "SEVERITY.MEDIUM": 0, "SEVERITY.UNDEFINED": 0, "loc": 3, "nosec": 0, "skipped_tests": 0 }, "proxmoxer/backends/command_base.py": { "CONFIDENCE.HIGH": 0, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 0, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 0, "SEVERITY.MEDIUM": 0, "SEVERITY.UNDEFINED": 0, "loc": 115, "nosec": 0, "skipped_tests": 0 }, "proxmoxer/backends/https.py": { "CONFIDENCE.HIGH": 1, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 0, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 1, "SEVERITY.MEDIUM": 0, "SEVERITY.UNDEFINED": 0, "loc": 286, "nosec": 0, "skipped_tests": 0 }, "proxmoxer/backends/local.py": { "CONFIDENCE.HIGH": 2, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 0, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 2, "SEVERITY.MEDIUM": 0, "SEVERITY.UNDEFINED": 0, "loc": 14, "nosec": 0, "skipped_tests": 0 }, "proxmoxer/backends/openssh.py": { "CONFIDENCE.HIGH": 0, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 0, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 0, "SEVERITY.MEDIUM": 0, "SEVERITY.UNDEFINED": 0, "loc": 53, "nosec": 0, "skipped_tests": 0 }, "proxmoxer/backends/ssh_paramiko.py": { "CONFIDENCE.HIGH": 0, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 1, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 0, "SEVERITY.MEDIUM": 1, "SEVERITY.UNDEFINED": 0, "loc": 58, "nosec": 0, "skipped_tests": 0 }, "proxmoxer/core.py": { "CONFIDENCE.HIGH": 0, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 0, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 0, "SEVERITY.MEDIUM": 0, "SEVERITY.UNDEFINED": 0, "loc": 155, "nosec": 0, "skipped_tests": 0 }, "tests/api_mock.py": { "CONFIDENCE.HIGH": 0, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 0, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 0, "SEVERITY.MEDIUM": 0, "SEVERITY.UNDEFINED": 0, "loc": 108, "nosec": 0, "skipped_tests": 0 }, "tests/test_command_base.py": { "CONFIDENCE.HIGH": 0, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 2, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 0, "SEVERITY.MEDIUM": 2, "SEVERITY.UNDEFINED": 0, "loc": 195, "nosec": 0, "skipped_tests": 0 }, "tests/test_core.py": { "CONFIDENCE.HIGH": 0, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 0, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 0, "SEVERITY.MEDIUM": 0, "SEVERITY.UNDEFINED": 0, "loc": 241, "nosec": 0, "skipped_tests": 0 }, "tests/test_https.py": { "CONFIDENCE.HIGH": 0, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 0, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 0, "SEVERITY.MEDIUM": 0, "SEVERITY.UNDEFINED": 0, "loc": 362, "nosec": 0, "skipped_tests": 0 }, "tests/test_https_helpers.py": { "CONFIDENCE.HIGH": 0, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 0, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 0, "SEVERITY.MEDIUM": 0, "SEVERITY.UNDEFINED": 0, "loc": 62, "nosec": 0, "skipped_tests": 0 }, "tests/test_imports.py": { "CONFIDENCE.HIGH": 0, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 0, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 0, "SEVERITY.MEDIUM": 0, "SEVERITY.UNDEFINED": 0, "loc": 80, "nosec": 0, "skipped_tests": 0 }, "tests/test_local.py": { "CONFIDENCE.HIGH": 0, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 0, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 0, "SEVERITY.MEDIUM": 0, "SEVERITY.UNDEFINED": 0, "loc": 35, "nosec": 0, "skipped_tests": 0 }, "tests/test_openssh.py": { "CONFIDENCE.HIGH": 0, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 2, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 0, "SEVERITY.MEDIUM": 2, "SEVERITY.UNDEFINED": 0, "loc": 62, "nosec": 0, "skipped_tests": 0 }, "tests/test_paramiko.py": { "CONFIDENCE.HIGH": 0, "CONFIDENCE.LOW": 0, "CONFIDENCE.MEDIUM": 6, "CONFIDENCE.UNDEFINED": 0, "SEVERITY.HIGH": 0, "SEVERITY.LOW": 0, "SEVERITY.MEDIUM": 6, "SEVERITY.UNDEFINED": 0, "loc": 113, "nosec": 0, "skipped_tests": 0 } }, "results": [ { "code": "332 def get_serializer(self):\n333 assert self.mode == \"json\"\n334 return JsonSerializer()\n", "col_offset": 8, "filename": "proxmoxer/backends/https.py", "issue_confidence": "HIGH", "issue_cwe": { "id": 703, "link": "https://cwe.mitre.org/data/definitions/703.html" }, "issue_severity": "LOW", "issue_text": "Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.", "line_number": 333, "line_range": [ 333 ], "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b101_assert_used.html", "test_id": "B101", "test_name": "assert_used" }, { "code": "1 import shutil\n2 from subprocess import PIPE, Popen\n3 \n4 from proxmoxer.backends.command_base import CommandBaseBackend, CommandBaseSession\n", "col_offset": 0, "filename": "proxmoxer/backends/local.py", "issue_confidence": "HIGH", "issue_cwe": { "id": 78, "link": "https://cwe.mitre.org/data/definitions/78.html" }, "issue_severity": "LOW", "issue_text": "Consider possible security implications associated with the subprocess module.", "line_number": 2, "line_range": [ 2, 3 ], "more_info": "https://bandit.readthedocs.io/en/1.7.4/blacklists/blacklist_imports.html#b404-import-subprocess", "test_id": "B404", "test_name": "blacklist" }, { "code": "8 def _exec(self, cmd):\n9 proc = Popen(cmd, stdout=PIPE, stderr=PIPE)\n10 stdout, stderr = proc.communicate(timeout=self.timeout)\n", "col_offset": 15, "filename": "proxmoxer/backends/local.py", "issue_confidence": "HIGH", "issue_cwe": { "id": 78, "link": "https://cwe.mitre.org/data/definitions/78.html" }, "issue_severity": "LOW", "issue_text": "subprocess call - check for execution of untrusted input.", "line_number": 9, "line_range": [ 9 ], "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b603_subprocess_without_shell_equals_true.html", "test_id": "B603", "test_name": "subprocess_without_shell_equals_true" }, { "code": "62 session = self.ssh_client.get_transport().open_session()\n63 session.exec_command(shell_join(cmd))\n64 stdout = session.makefile(\"rb\", -1).read().decode()\n", "col_offset": 8, "filename": "proxmoxer/backends/ssh_paramiko.py", "issue_confidence": "MEDIUM", "issue_cwe": { "id": 78, "link": "https://cwe.mitre.org/data/definitions/78.html" }, "issue_severity": "MEDIUM", "issue_text": "Possible shell injection via Paramiko call, check inputs are properly sanitized.", "line_number": 63, "line_range": [ 63 ], "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b601_paramiko_calls.html", "test_id": "B601", "test_name": "paramiko_calls" }, { "code": "39 with pytest.raises(NotImplementedError), tempfile.TemporaryFile(\"w+b\") as f_obj:\n40 self._session.upload_file_obj(f_obj, \"/tmp/file.iso\")\n41 \n", "col_offset": 49, "filename": "tests/test_command_base.py", "issue_confidence": "MEDIUM", "issue_cwe": { "id": 377, "link": "https://cwe.mitre.org/data/definitions/377.html" }, "issue_severity": "MEDIUM", "issue_text": "Probable insecure usage of temp file/directory.", "line_number": 40, "line_range": [ 40 ], "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html", "test_id": "B108", "test_name": "hardcoded_tmp_directory" }, { "code": "160 \"-tmpfilename\",\n161 \"/tmp/tmpasdfasdf\",\n162 \"--output-format\",\n163 \"json\",\n164 ]\n165 \n166 \n167 class TestJsonSimpleSerializer:\n168 _serializer = command_base.JsonSimpleSerializer()\n169 \n170 def test_loads_pass(self):\n171 input_str = '{\"key1\": \"value1\", \"key2\": \"value2\"}'\n172 exp_output = {\"key1\": \"value1\", \"key2\": \"value2\"}\n173 \n", "col_offset": 16, "filename": "tests/test_command_base.py", "issue_confidence": "MEDIUM", "issue_cwe": { "id": 377, "link": "https://cwe.mitre.org/data/definitions/377.html" }, "issue_severity": "MEDIUM", "issue_text": "Probable insecure usage of temp file/directory.", "line_number": 161, "line_range": [ 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163 ], "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html", "test_id": "B108", "test_name": "hardcoded_tmp_directory" }, { "code": "61 with tempfile.NamedTemporaryFile(\"r\") as f_obj:\n62 mock_session.upload_file_obj(f_obj, \"/tmp/file\")\n63 \n", "col_offset": 48, "filename": "tests/test_openssh.py", "issue_confidence": "MEDIUM", "issue_cwe": { "id": 377, "link": "https://cwe.mitre.org/data/definitions/377.html" }, "issue_severity": "MEDIUM", "issue_text": "Probable insecure usage of temp file/directory.", "line_number": 62, "line_range": [ 62 ], "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html", "test_id": "B108", "test_name": "hardcoded_tmp_directory" }, { "code": "65 (f_obj,),\n66 target=\"/tmp/file\",\n67 )\n", "col_offset": 23, "filename": "tests/test_openssh.py", "issue_confidence": "MEDIUM", "issue_cwe": { "id": 377, "link": "https://cwe.mitre.org/data/definitions/377.html" }, "issue_severity": "MEDIUM", "issue_text": "Probable insecure usage of temp file/directory.", "line_number": 66, "line_range": [ 66 ], "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html", "test_id": "B108", "test_name": "hardcoded_tmp_directory" }, { "code": "23 sess = ssh_paramiko.SshParamikoSession(\n24 \"host\", \"user\", password=\"password\", private_key_file=\"/tmp/key_file\", port=1234\n25 )\n", "col_offset": 66, "filename": "tests/test_paramiko.py", "issue_confidence": "MEDIUM", "issue_cwe": { "id": 377, "link": "https://cwe.mitre.org/data/definitions/377.html" }, "issue_severity": "MEDIUM", "issue_text": "Probable insecure usage of temp file/directory.", "line_number": 24, "line_range": [ 24 ], "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html", "test_id": "B108", "test_name": "hardcoded_tmp_directory" }, { "code": "29 assert sess.password == \"password\"\n30 assert sess.private_key_file == \"/tmp/key_file\"\n31 assert sess.port == 1234\n", "col_offset": 40, "filename": "tests/test_paramiko.py", "issue_confidence": "MEDIUM", "issue_cwe": { "id": 377, "link": "https://cwe.mitre.org/data/definitions/377.html" }, "issue_severity": "MEDIUM", "issue_text": "Probable insecure usage of temp file/directory.", "line_number": 30, "line_range": [ 30 ], "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html", "test_id": "B108", "test_name": "hardcoded_tmp_directory" }, { "code": "55 sess = ssh_paramiko.SshParamikoSession(\n56 \"host\", \"user\", password=\"password\", private_key_file=\"/tmp/key_file\", port=1234\n57 )\n", "col_offset": 66, "filename": "tests/test_paramiko.py", "issue_confidence": "MEDIUM", "issue_cwe": { "id": 377, "link": "https://cwe.mitre.org/data/definitions/377.html" }, "issue_severity": "MEDIUM", "issue_text": "Probable insecure usage of temp file/directory.", "line_number": 56, "line_range": [ 56 ], "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html", "test_id": "B108", "test_name": "hardcoded_tmp_directory" }, { "code": "63 look_for_keys=True,\n64 key_filename=\"/tmp/key_file\",\n65 password=\"password\",\n", "col_offset": 25, "filename": "tests/test_paramiko.py", "issue_confidence": "MEDIUM", "issue_cwe": { "id": 377, "link": "https://cwe.mitre.org/data/definitions/377.html" }, "issue_severity": "MEDIUM", "issue_text": "Probable insecure usage of temp file/directory.", "line_number": 64, "line_range": [ 64 ], "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html", "test_id": "B108", "test_name": "hardcoded_tmp_directory" }, { "code": "110 with tempfile.NamedTemporaryFile(\"r\") as f_obj:\n111 sess.upload_file_obj(f_obj, \"/tmp/file\")\n112 \n", "col_offset": 40, "filename": "tests/test_paramiko.py", "issue_confidence": "MEDIUM", "issue_cwe": { "id": 377, "link": "https://cwe.mitre.org/data/definitions/377.html" }, "issue_severity": "MEDIUM", "issue_text": "Probable insecure usage of temp file/directory.", "line_number": 111, "line_range": [ 111 ], "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html", "test_id": "B108", "test_name": "hardcoded_tmp_directory" }, { "code": "112 \n113 mock_sftp.putfo.assert_called_once_with(f_obj, \"/tmp/file\")\n114 \n", "col_offset": 59, "filename": "tests/test_paramiko.py", "issue_confidence": "MEDIUM", "issue_cwe": { "id": 377, "link": "https://cwe.mitre.org/data/definitions/377.html" }, "issue_severity": "MEDIUM", "issue_text": "Probable insecure usage of temp file/directory.", "line_number": 113, "line_range": [ 113 ], "more_info": "https://bandit.readthedocs.io/en/1.7.4/plugins/b108_hardcoded_tmp_directory.html", "test_id": "B108", "test_name": "hardcoded_tmp_directory" } ] }0707010000002A000081A400000000000000000000000166B7C6B900001E2C000000000000000000000000000000000000002B00000000proxmoxer-2.1.0/tests/test_command_base.py__author__ = "John Hollowell" __copyright__ = "(c) John Hollowell 2022" __license__ = "MIT" import tempfile from unittest import mock import pytest from proxmoxer.backends import command_base from .api_mock import PVERegistry # pylint: disable=no-self-use class TestResponse: def test_init_all_args(self): resp = command_base.Response(b"content", 200) assert resp.content == b"content" assert resp.text == "b'content'" assert resp.status_code == 200 assert resp.headers == {"content-type": "application/json"} assert str(resp) == "Response (200) b'content'" class TestCommandBaseSession: base_url = PVERegistry.base_url _session = command_base.CommandBaseSession() def test_init_all_args(self): sess = command_base.CommandBaseSession(service="SERVICE", timeout=10, sudo=True) assert sess.service == "service" assert sess.timeout == 10 assert sess.sudo is True def test_exec(self): with pytest.raises(NotImplementedError): self._session._exec("command") def test_upload_file_obj(self): with pytest.raises(NotImplementedError), tempfile.TemporaryFile("w+b") as f_obj: self._session.upload_file_obj(f_obj, "/tmp/file.iso") def test_request_basic(self, mock_exec): resp = self._session.request("GET", self.base_url + "/fake/echo") assert resp.status_code == 200 assert resp.content == [ "pvesh", "get", self.base_url + "/fake/echo", "--output-format", "json", ] def test_request_error(self, mock_exec_err): resp = self._session.request( "GET", self.base_url + "/fake/echo", data={"thing": "403 Unauthorized"} ) assert resp.status_code == 403 assert ( resp.content == "pvesh\nget\nhttps://1.2.3.4:1234/api2/json/fake/echo\n-thing\n403 Unauthorized\n--output-format\njson" ) def test_request_error_generic(self, mock_exec_err): resp = self._session.request("GET", self.base_url + "/fake/echo", data={"thing": "failure"}) assert resp.status_code == 500 assert ( resp.content == "pvesh\nget\nhttps://1.2.3.4:1234/api2/json/fake/echo\n-thing\nfailure\n--output-format\njson" ) def test_request_sudo(self, mock_exec): resp = command_base.CommandBaseSession(sudo=True).request( "GET", self.base_url + "/fake/echo" ) assert resp.status_code == 200 assert resp.content == [ "sudo", "pvesh", "get", self.base_url + "/fake/echo", "--output-format", "json", ] def test_request_data(self, mock_exec): resp = self._session.request("GET", self.base_url + "/fake/echo", data={"key": "value"}) assert resp.status_code == 200 assert resp.content == [ "pvesh", "get", self.base_url + "/fake/echo", "-key", "value", "--output-format", "json", ] def test_request_qemu_exec(self, mock_exec): resp = self._session.request( "POST", self.base_url + "/node/node1/qemu/100/agent/exec", data={"command": "echo 'hello world'"}, ) assert resp.status_code == 200 assert resp.content == [ "pvesh", "create", self.base_url + "/node/node1/qemu/100/agent/exec", "-command", "echo", "-command", "hello world", "--output-format", "json", ] def test_request_qemu_exec_list(self, mock_exec): resp = self._session.request( "POST", self.base_url + "/node/node1/qemu/100/agent/exec", data={"command": ["echo", "hello world"]}, ) assert resp.status_code == 200 assert resp.content == [ "pvesh", "create", self.base_url + "/node/node1/qemu/100/agent/exec", "-command", "echo", "-command", "hello world", "--output-format", "json", ] def test_request_upload(self, mock_exec, mock_upload_file_obj): with tempfile.NamedTemporaryFile("w+b") as f_obj: resp = self._session.request( "POST", self.base_url + "/node/node1/storage/local/upload", data={"content": "iso", "filename": f_obj}, ) assert resp.status_code == 200 assert resp.content == [ "pvesh", "create", self.base_url + "/node/node1/storage/local/upload", "-content", "iso", "-filename", str(f_obj.name), "-tmpfilename", "/tmp/tmpasdfasdf", "--output-format", "json", ] class TestJsonSimpleSerializer: _serializer = command_base.JsonSimpleSerializer() def test_loads_pass(self): input_str = '{"key1": "value1", "key2": "value2"}' exp_output = {"key1": "value1", "key2": "value2"} response = command_base.Response(input_str.encode("utf-8"), 200) act_output = self._serializer.loads(response) assert act_output == exp_output def test_loads_not_json(self): input_str = "There was an error with the request" exp_output = {"errors": b"There was an error with the request"} response = command_base.Response(input_str.encode("utf-8"), 200) act_output = self._serializer.loads(response) assert act_output == exp_output def test_loads_not_unicode(self): input_str = '{"data": {"key1": "value1", "key2": "value2"}, "errors": {}}\x80' exp_output = {"errors": input_str.encode("utf-8")} response = command_base.Response(input_str.encode("utf-8"), 200) act_output = self._serializer.loads(response) assert act_output == exp_output class TestCommandBaseBackend: backend = command_base.CommandBaseBackend() sess = command_base.CommandBaseSession() backend.session = sess def test_init(self): b = command_base.CommandBaseBackend() assert b.session is None assert b.target == "" def test_get_session(self): assert self.backend.get_session() == self.sess def test_get_base_url(self): assert self.backend.get_base_url() == "" def test_get_serializer(self): assert isinstance(self.backend.get_serializer(), command_base.JsonSimpleSerializer) @classmethod def _exec_echo(_, cmd): # if getting a tmpfile on the remote, return fake tmpfile if cmd == [ "python3", "-c", "import tempfile; import sys; tf = tempfile.NamedTemporaryFile(); sys.stdout.write(tf.name)", ]: return b"/tmp/tmpasdfasdf", None return cmd, None @classmethod def _exec_err(_, cmd): return None, "\n".join(cmd) @classmethod def upload_file_obj_echo(_, file_obj, remote_path): return file_obj, remote_path @pytest.fixture def mock_upload_file_obj(): with mock.patch.object( command_base.CommandBaseSession, "upload_file_obj", upload_file_obj_echo ): yield @pytest.fixture def mock_exec(): with mock.patch.object(command_base.CommandBaseSession, "_exec", _exec_echo): yield @pytest.fixture def mock_exec_err(): with mock.patch.object(command_base.CommandBaseSession, "_exec", _exec_err): yield 0707010000002B000081A400000000000000000000000166B7C6B90000327E000000000000000000000000000000000000002300000000proxmoxer-2.1.0/tests/test_core.py__author__ = "John Hollowell" __copyright__ = "(c) John Hollowell 2022" __license__ = "MIT" import logging from unittest import mock import pytest from proxmoxer import core from proxmoxer.backends import https from proxmoxer.backends.command_base import JsonSimpleSerializer, Response from .api_mock import ( # pylint: disable=unused-import # noqa: F401 PVERegistry, mock_pve, ) from .test_paramiko import mock_ssh_client # pylint: disable=unused-import # noqa: F401 # pylint: disable=no-self-use,protected-access MODULE_LOGGER_NAME = "proxmoxer.core" class TestResourceException: def test_init_none(self): e = core.ResourceException(None, None, None) assert e.status_code is None assert e.status_message is None assert e.content is None assert e.errors is None assert str(e) == "None None: None" assert repr(e) == "ResourceException('None None: None')" def test_init_basic(self): e = core.ResourceException(500, "Internal Error", "Unable to do the thing") assert e.status_code == 500 assert e.status_message == "Internal Error" assert e.content == "Unable to do the thing" assert e.errors is None assert str(e) == "500 Internal Error: Unable to do the thing" assert repr(e) == "ResourceException('500 Internal Error: Unable to do the thing')" def test_init_error(self): e = core.ResourceException( 500, "Internal Error", "Unable to do the thing", "functionality not found" ) assert e.status_code == 500 assert e.status_message == "Internal Error" assert e.content == "Unable to do the thing" assert e.errors == "functionality not found" assert str(e) == "500 Internal Error: Unable to do the thing - functionality not found" assert ( repr(e) == "ResourceException('500 Internal Error: Unable to do the thing - functionality not found')" ) class TestProxmoxResource: obj = core.ProxmoxResource() base_url = "http://example.com/" def test_url_join_empty_base(self): assert "/" == self.obj.url_join("", "") def test_url_join_empty(self): assert "https://www.example.com:80/" == self.obj.url_join("https://www.example.com:80", "") def test_url_join_basic(self): assert "https://www.example.com/nodes/node1" == self.obj.url_join( "https://www.example.com", "nodes", "node1" ) def test_url_join_all_segments(self): assert "https://www.example.com/base/path#div1?search=query" == self.obj.url_join( "https://www.example.com/base#div1?search=query", "path" ) def test_repr(self): obj = core.ProxmoxResource(base_url="root") assert repr(obj.first.second("third")) == "ProxmoxResource (root/first/second/third)" def test_getattr_private(self): with pytest.raises(AttributeError) as exc_info: self.obj._thing assert str(exc_info.value) == "_thing" def test_getattr_single(self): test_obj = core.ProxmoxResource(base_url=self.base_url) ret = test_obj.nodes assert isinstance(ret, core.ProxmoxResource) assert ret._store["base_url"] == self.base_url + "nodes" def test_call_basic(self): test_obj = core.ProxmoxResource(base_url=self.base_url) ret = test_obj("nodes") assert isinstance(ret, core.ProxmoxResource) assert ret._store["base_url"] == self.base_url + "nodes" def test_call_emptystr(self): test_obj = core.ProxmoxResource(base_url=self.base_url) ret = test_obj("") assert isinstance(ret, core.ProxmoxResource) assert ret._store["base_url"] == self.base_url def test_call_list(self): test_obj = core.ProxmoxResource(base_url=self.base_url) ret = test_obj(["nodes", "node1"]) assert isinstance(ret, core.ProxmoxResource) assert ret._store["base_url"] == self.base_url + "nodes/node1" def test_call_stringable(self): test_obj = core.ProxmoxResource(base_url=self.base_url) class Thing: def __str__(self): return "string" ret = test_obj(Thing()) assert isinstance(ret, core.ProxmoxResource) assert ret._store["base_url"] == self.base_url + "string" def test_request_basic_get(self, mock_resource, caplog): caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME) ret = mock_resource._request("GET", params={"key": "value"}) assert caplog.record_tuples == [ (MODULE_LOGGER_NAME, logging.INFO, "GET " + self.base_url), ( MODULE_LOGGER_NAME, logging.DEBUG, 'Status code: 200, output: b\'{"data": {"key": "value"}}\'', ), ] assert ret == {"data": {"key": "value"}} def test_request_basic_post(self, mock_resource, caplog): caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME) ret = mock_resource._request("POST", data={"key": "value"}) assert caplog.record_tuples == [ ( MODULE_LOGGER_NAME, logging.INFO, "POST " + self.base_url + " " + str({"key": "value"}), ), ( MODULE_LOGGER_NAME, logging.DEBUG, 'Status code: 200, output: b\'{"data": {"key": "value"}}\'', ), ] assert ret == {"data": {"key": "value"}} def test_request_fail(self, mock_resource, caplog): caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME) with pytest.raises(core.ResourceException) as exc_info: mock_resource("fail")._request("GET") assert caplog.record_tuples == [ ( MODULE_LOGGER_NAME, logging.INFO, "GET " + self.base_url + "fail", ), ( MODULE_LOGGER_NAME, logging.DEBUG, "Status code: 500, output: b'this is the error'", ), ] assert exc_info.value.status_code == 500 assert exc_info.value.status_message == "Internal Server Error" assert exc_info.value.content == str(b"this is the error") assert exc_info.value.errors is None def test_request_fail_with_reason(self, mock_resource, caplog): caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME) with pytest.raises(core.ResourceException) as exc_info: mock_resource(["fail", "reason"])._request("GET") assert caplog.record_tuples == [ ( MODULE_LOGGER_NAME, logging.INFO, "GET " + self.base_url + "fail/reason", ), ( MODULE_LOGGER_NAME, logging.DEBUG, "Status code: 500, output: b'this is the error'", ), ] assert exc_info.value.status_code == 500 assert exc_info.value.status_message == "Internal Server Error" assert exc_info.value.content == "this is the reason" assert exc_info.value.errors == {"errors": b"this is the error"} def test_request_params_cleanup(self, mock_resource): mock_resource._request("GET", params={"key": "value", "remove_me": None}) assert mock_resource._store["session"].params == {"key": "value"} def test_request_data_cleanup(self, mock_resource): mock_resource._request("POST", data={"key": "value", "remove_me": None}) assert mock_resource._store["session"].data == {"key": "value"} class TestProxmoxResourceMethods: _resource = core.ProxmoxResource(base_url="https://example.com") def test_get(self, mock_private_request): ret = self._resource.get("nodes", key="value") ret_self = ret["self"] assert ret["method"] == "GET" assert ret["params"] == {"key": "value"} assert ret_self._store["base_url"] == "https://example.com/nodes" def test_post(self, mock_private_request): ret = self._resource.post("nodes", key="value") ret_self = ret["self"] assert ret["method"] == "POST" assert ret["data"] == {"key": "value"} assert ret_self._store["base_url"] == "https://example.com/nodes" def test_put(self, mock_private_request): ret = self._resource.put("nodes", key="value") ret_self = ret["self"] assert ret["method"] == "PUT" assert ret["data"] == {"key": "value"} assert ret_self._store["base_url"] == "https://example.com/nodes" def test_delete(self, mock_private_request): ret = self._resource.delete("nodes", key="value") ret_self = ret["self"] assert ret["method"] == "DELETE" assert ret["params"] == {"key": "value"} assert ret_self._store["base_url"] == "https://example.com/nodes" def test_create(self, mock_private_request): ret = self._resource.create("nodes", key="value") ret_self = ret["self"] assert ret["method"] == "POST" assert ret["data"] == {"key": "value"} assert ret_self._store["base_url"] == "https://example.com/nodes" def test_set(self, mock_private_request): ret = self._resource.set("nodes", key="value") ret_self = ret["self"] assert ret["method"] == "PUT" assert ret["data"] == {"key": "value"} assert ret_self._store["base_url"] == "https://example.com/nodes" class TestProxmoxAPI: def test_init_basic(self): prox = core.ProxmoxAPI( "host", token_name="name", token_value="value", service="pVe", backend="hTtPs" ) assert isinstance(prox, core.ProxmoxAPI) assert isinstance(prox, core.ProxmoxResource) assert isinstance(prox._backend, https.Backend) assert prox._backend.auth.service == "PVE" def test_init_invalid_service(self): with pytest.raises(NotImplementedError) as exc_info: core.ProxmoxAPI("host", service="NA") assert str(exc_info.value) == "NA service is not supported" def test_init_invalid_backend(self): with pytest.raises(NotImplementedError) as exc_info: core.ProxmoxAPI("host", service="pbs", backend="LocaL") assert str(exc_info.value) == "PBS service does not support local backend" def test_init_local_with_host(self): with pytest.raises(NotImplementedError) as exc_info: core.ProxmoxAPI("host", service="pve", backend="LocaL") assert str(exc_info.value) == "local backend does not support host keyword" def test_repr_https(self): prox = core.ProxmoxAPI("host", token_name="name", token_value="value", backend="hTtPs") assert repr(prox) == "ProxmoxAPI (https backend for https://host:8006/api2/json)" def test_repr_local(self): prox = core.ProxmoxAPI(backend="local") assert repr(prox) == "ProxmoxAPI (local backend for localhost)" def test_repr_openssh(self): prox = core.ProxmoxAPI("host", user="user", backend="openssh") assert repr(prox) == "ProxmoxAPI (openssh backend for host)" def test_repr_paramiko(self, mock_ssh_client): prox = core.ProxmoxAPI("host", user="user", backend="ssh_paramiko") assert repr(prox) == "ProxmoxAPI (ssh_paramiko backend for host)" def test_get_tokens_https(self, mock_pve): prox = core.ProxmoxAPI("1.2.3.4:1234", user="user", password="password", backend="https") ticket, csrf = prox.get_tokens() assert ticket == "ticket" assert csrf == "CSRFPreventionToken" def test_get_tokens_local(self): prox = core.ProxmoxAPI(service="pve", backend="local") ticket, csrf = prox.get_tokens() assert ticket is None assert csrf is None class MockSession: def request(self, method, url, data=None, params=None): # store the arguments in the session so they can be tested after the call self.data = data self.params = params self.method = method self.url = url if "fail" in url: r = Response(b"this is the error", 500) if "reason" in url: r.reason = "this is the reason" return r else: return Response(b'{"data": {"key": "value"}}', 200) @pytest.fixture def mock_private_request(): def mock_request(self, method, data=None, params=None): return {"self": self, "method": method, "data": data, "params": params} with mock.patch("proxmoxer.core.ProxmoxResource._request", mock_request): yield @pytest.fixture def mock_resource(): return core.ProxmoxResource( session=MockSession(), base_url="http://example.com/", serializer=JsonSimpleSerializer() ) 0707010000002C000081A400000000000000000000000166B7C6B900004F0C000000000000000000000000000000000000002400000000proxmoxer-2.1.0/tests/test_https.py__author__ = "John Hollowell" __copyright__ = "(c) John Hollowell 2022" __license__ = "MIT" import logging import re import sys import tempfile from unittest import mock import pytest from requests import Request, Response import proxmoxer as core from proxmoxer.backends import https from .api_mock import ( # pylint: disable=unused-import # noqa: F401 PVERegistry, mock_pve, ) # pylint: disable=no-self-use MODULE_LOGGER_NAME = "proxmoxer.backends.https" class TestHttpsBackend: """ Tests for the proxmox.backends.https file. Only tests the Backend class for correct setting of variables and selection of auth class. Other classes are separately tested. """ def test_init_no_auth(self): with pytest.raises(NotImplementedError) as exc_info: https.Backend("1.2.3.4:1234") assert str(exc_info.value) == "No valid authentication credentials were supplied" def test_init_ip4_separate_port(self): backend = https.Backend("1.2.3.4", port=1234, token_name="") exp_base_url = "https://1.2.3.4:1234/api2/json" assert backend.get_base_url() == exp_base_url def test_init_ip4_inline_port(self): backend = https.Backend("1.2.3.4:1234", token_name="") exp_base_url = "https://1.2.3.4:1234/api2/json" assert backend.get_base_url() == exp_base_url def test_init_ip6_separate_port(self): backend = https.Backend("2001:db8::1:2:3:4", port=1234, token_name="") exp_base_url = "https://[2001:db8::1:2:3:4]:1234/api2/json" assert backend.get_base_url() == exp_base_url def test_init_ip6_brackets_separate_port(self): backend = https.Backend("[2001:0db8::1:2:3:4]", port=1234, token_name="") exp_base_url = "https://[2001:0db8::1:2:3:4]:1234/api2/json" assert backend.get_base_url() == exp_base_url def test_init_ip6_inline_port(self): backend = https.Backend("[2001:db8::1:2:3:4]:1234", token_name="") exp_base_url = "https://[2001:db8::1:2:3:4]:1234/api2/json" assert backend.get_base_url() == exp_base_url def test_init_ip4_no_port(self): backend = https.Backend("1.2.3.4", token_name="") exp_base_url = "https://1.2.3.4:8006/api2/json" assert backend.get_base_url() == exp_base_url def test_init_path_prefix(self): backend = https.Backend("1.2.3.4:1234", path_prefix="path", token_name="") exp_base_url = "https://1.2.3.4:1234/path/api2/json" assert backend.get_base_url() == exp_base_url def test_init_token_pass(self): backend = https.Backend("1.2.3.4:1234", token_name="name") assert isinstance(backend.auth, https.ProxmoxHTTPApiTokenAuth) def test_init_token_not_supported(self, apply_none_service): with pytest.raises(NotImplementedError) as exc_info: https.Backend("1.2.3.4:1234", token_name="name", service="NONE") assert str(exc_info.value) == "NONE does not support API Token authentication" def test_init_password_not_supported(self, apply_none_service): with pytest.raises(NotImplementedError) as exc_info: https.Backend("1.2.3.4:1234", password="pass", service="NONE") assert str(exc_info.value) == "NONE does not support password authentication" def test_get_tokens_api_token(self): backend = https.Backend("1.2.3.4:1234", token_name="name") assert backend.get_tokens() == (None, None) def test_get_tokens_password(self, mock_pve): backend = https.Backend("1.2.3.4:1234", password="name") assert ("ticket", "CSRFPreventionToken") == backend.get_tokens() def test_verify_ssl_token(self): backend = https.Backend("1.2.3.4:1234", token_name="name") assert backend.auth.verify_ssl is True def test_verify_ssl_false_token(self): backend = https.Backend("1.2.3.4:1234", token_name="name", verify_ssl=False) assert backend.auth.verify_ssl is False def test_verify_ssl_password(self, mock_pve): backend = https.Backend("1.2.3.4:1234", password="name") assert backend.auth.verify_ssl is True def test_verify_ssl_false_password(self, mock_pve): backend = https.Backend("1.2.3.4:1234", password="name", verify_ssl=False) assert backend.auth.verify_ssl is False class TestProxmoxHTTPAuthBase: """ Tests the ProxmoxHTTPAuthBase class """ base_url = PVERegistry.base_url def test_init_all_args(self): auth = https.ProxmoxHTTPAuthBase(timeout=1234, service="PMG", verify_ssl=True) assert auth.timeout == 1234 assert auth.service == "PMG" assert auth.verify_ssl is True def test_call(self): auth = https.ProxmoxHTTPAuthBase() req = Request("HEAD", self.base_url + "/version").prepare() resp = auth(req) assert resp == req def test_get_cookies(self): auth = https.ProxmoxHTTPAuthBase() assert auth.get_cookies().get_dict() == {} class TestProxmoxHTTPApiTokenAuth: """ Tests the ProxmoxHTTPApiTokenAuth class """ base_url = PVERegistry.base_url def test_init_all_args(self): auth = https.ProxmoxHTTPApiTokenAuth( "user", "name", "value", service="PMG", timeout=1234, verify_ssl=True ) assert auth.username == "user" assert auth.token_name == "name" assert auth.token_value == "value" assert auth.service == "PMG" assert auth.timeout == 1234 assert auth.verify_ssl is True def test_call_pve(self): auth = https.ProxmoxHTTPApiTokenAuth("user", "name", "value", service="PVE") req = Request("HEAD", self.base_url + "/version").prepare() resp = auth(req) assert resp.headers["Authorization"] == "PVEAPIToken=user!name=value" def test_call_pbs(self): auth = https.ProxmoxHTTPApiTokenAuth("user", "name", "value", service="PBS") req = Request("HEAD", self.base_url + "/version").prepare() resp = auth(req) assert resp.headers["Authorization"] == "PBSAPIToken=user!name:value" class TestProxmoxHTTPAuth: """ Tests the ProxmoxHTTPApiTokenAuth class """ base_url = PVERegistry.base_url # pylint: disable=redefined-outer-name def test_init_all_args(self, mock_pve): auth = https.ProxmoxHTTPAuth( "otp", "password", otp="otp", base_url=self.base_url, service="PMG", timeout=1234, verify_ssl=True, ) assert auth.username == "otp" assert auth.pve_auth_ticket == "ticket" assert auth.csrf_prevention_token == "CSRFPreventionToken" assert auth.service == "PMG" assert auth.timeout == 1234 assert auth.verify_ssl is True def test_ticket_renewal(self, mock_pve): auth = https.ProxmoxHTTPAuth("user", "password", base_url=self.base_url) auth(Request("HEAD", self.base_url + "/version").prepare()) # check starting auth tokens assert auth.pve_auth_ticket == "ticket" assert auth.csrf_prevention_token == "CSRFPreventionToken" auth.renew_age = 0 # force renewing ticket now auth(Request("GET", self.base_url + "/version").prepare()) # check renewed auth tokens assert auth.pve_auth_ticket == "new_ticket" assert auth.csrf_prevention_token == "CSRFPreventionToken_2" def test_get_cookies(self, mock_pve): auth = https.ProxmoxHTTPAuth("user", "password", base_url=self.base_url, service="PVE") assert auth.get_cookies().get_dict() == {"PVEAuthCookie": "ticket"} def test_auth_failure(self, mock_pve): with pytest.raises(core.AuthenticationError) as exc_info: https.ProxmoxHTTPAuth("bad_auth", "", base_url=self.base_url) assert ( str(exc_info.value) == f"Couldn't authenticate user: bad_auth to {self.base_url}/access/ticket" ) assert ( repr(exc_info.value) == f'AuthenticationError("Couldn\'t authenticate user: bad_auth to {self.base_url}/access/ticket")' ) def test_auth_otp(self, mock_pve): https.ProxmoxHTTPAuth( "otp", "password", base_url=self.base_url, otp="123456", service="PVE" ) def test_auth_otp_missing(self, mock_pve): with pytest.raises(core.AuthenticationError) as exc_info: https.ProxmoxHTTPAuth("otp", "password", base_url=self.base_url, service="PVE") assert ( str(exc_info.value) == "Couldn't authenticate user: missing Two Factor Authentication (TFA)" ) assert ( repr(exc_info.value) == 'AuthenticationError("Couldn\'t authenticate user: missing Two Factor Authentication (TFA)")' ) class TestProxmoxHttpSession: """ Tests the ProxmoxHttpSession class """ base_url = PVERegistry.base_url _session = https.Backend("1.2.3.4", token_name="").get_session() def test_request_basic(self, mock_pve): resp = self._session.request("GET", self.base_url + "/fake/echo") content = resp.json() assert content["method"] == "GET" assert content["url"] == self.base_url + "/fake/echo" assert content["body"] is None assert content["headers"]["accept"] == https.JsonSerializer().get_accept_types() def test_request_data(self, mock_pve): resp = self._session.request("GET", self.base_url + "/fake/echo", data={"key": "value"}) content = resp.json() assert content["method"] == "GET" assert content["url"] == self.base_url + "/fake/echo" assert content["body"] == "key=value" assert content["headers"]["Content-Type"] == "application/x-www-form-urlencoded" def test_request_monitor_command_list(self, mock_pve): resp = self._session.request( "GET", self.base_url + "/nodes/node_name/qemu/100/monitor", data={"command": ["info", "block"]}, ) assert resp.status_code == 400 def test_request_exec_command_list(self, mock_pve): resp = self._session.request( "GET", self.base_url + "/nodes/node_name/qemu/100/agent/exec", data={"command": ["echo", "hello", "world"]}, ) content = resp.json() assert content["method"] == "GET" assert content["url"] == self.base_url + "/nodes/node_name/qemu/100/agent/exec" assert content["body"] == "command=echo&command=hello&command=world" assert content["headers"]["Content-Type"] == "application/x-www-form-urlencoded" def test_request_monitor_command_string(self, mock_pve): resp = self._session.request( "GET", self.base_url + "/nodes/node_name/qemu/100/monitor", data={"command": "echo hello world"}, ) content = resp.json() assert content["method"] == "GET" assert content["url"] == self.base_url + "/nodes/node_name/qemu/100/monitor" assert content["body"] == "command=echo+hello+world" assert content["headers"]["Content-Type"] == "application/x-www-form-urlencoded" def test_request_exec_command_string(self, mock_pve): resp = self._session.request( "GET", self.base_url + "/nodes/node_name/qemu/100/agent/exec", data={"command": "echo hello world"}, ) content = resp.json() assert content["method"] == "GET" assert content["url"] == self.base_url + "/nodes/node_name/qemu/100/agent/exec" assert content["body"] == "command=echo&command=hello&command=world" assert content["headers"]["Content-Type"] == "application/x-www-form-urlencoded" def test_request_file(self, mock_pve): size = 10 content = {} with tempfile.TemporaryFile("w+b") as f_obj: f_obj.write(b"a" * size) f_obj.seek(0) resp = self._session.request("GET", self.base_url + "/fake/echo", data={"iso": f_obj}) content = resp.json() # decode multipart file body_regex = f'--([0-9a-f]*)\r\nContent-Disposition: form-data; name="iso"\r\nContent-Type: application/octet-stream\r\n\r\na{{{size}}}\r\n--\\1--\r\n' m = re.match(body_regex, content["body"]) assert content["method"] == "GET" assert content["url"] == self.base_url + "/fake/echo" assert m is not None # content matches multipart for the created file assert content["headers"]["Content-Type"] == "multipart/form-data; boundary=" + m[1] def test_request_streaming(self, toolbelt_on_off, caplog, mock_pve): caplog.set_level(logging.INFO, logger=MODULE_LOGGER_NAME) size = https.STREAMING_SIZE_THRESHOLD + 1 content = {} with tempfile.TemporaryFile("w+b") as f_obj: f_obj.write(b"a" * size) f_obj.seek(0) resp = self._session.request("GET", self.base_url + "/fake/echo", data={"iso": f_obj}) content = resp.json() # decode multipart file body_regex = f'--([0-9a-f]*)\r\nContent-Disposition: form-data; name="iso"\r\nContent-Type: application/octet-stream\r\n\r\na{{{size}}}\r\n--\\1--\r\n' m = re.match(body_regex, content["body"]) assert content["method"] == "GET" assert content["url"] == self.base_url + "/fake/echo" assert m is not None # content matches multipart for the created file assert content["headers"]["Content-Type"] == "multipart/form-data; boundary=" + m[1] if not toolbelt_on_off: assert caplog.record_tuples == [ ( MODULE_LOGGER_NAME, logging.INFO, "Installing 'requests_toolbelt' will decrease memory used during upload", ) ] def test_request_large_file(self, shrink_thresholds, toolbelt_on_off, caplog, mock_pve): size = https.SSL_OVERFLOW_THRESHOLD + 1 content = {} with tempfile.TemporaryFile("w+b") as f_obj: f_obj.write(b"a" * size) f_obj.seek(0) if toolbelt_on_off: resp = self._session.request( "GET", self.base_url + "/fake/echo", data={"iso": f_obj} ) content = resp.json() # decode multipart file body_regex = f'--([0-9a-f]*)\r\nContent-Disposition: form-data; name="iso"\r\nContent-Type: application/octet-stream\r\n\r\na{{{size}}}\r\n--\\1--\r\n' m = re.match(body_regex, content["body"]) assert content["method"] == "GET" assert content["url"] == self.base_url + "/fake/echo" assert m is not None # content matches multipart for the created file assert content["headers"]["Content-Type"] == "multipart/form-data; boundary=" + m[1] else: # forcing an ImportError with pytest.raises(OverflowError) as exc_info: resp = self._session.request( "GET", self.base_url + "/fake/echo", data={"iso": f_obj} ) assert str(exc_info.value) == "Unable to upload a payload larger than 2 GiB" assert caplog.record_tuples == [ ( MODULE_LOGGER_NAME, logging.WARNING, "Install 'requests_toolbelt' to add support for files larger than 2GiB", ) ] def test_request_filename(self, mock_pve): resp = self._session.request( "GET", self.base_url + "/fake/echo", files={"file1": "content"}, serializer=https.JsonSerializer, ) content = resp.json() # decode multipart file body_regex = '--([0-9a-f]*)\r\nContent-Disposition: form-data; name="file1"; filename="file1"\r\n\r\ncontent\r\n--\\1--\r\n' m = re.match(body_regex, content["body"]) assert content["method"] == "GET" assert content["url"] == self.base_url + "/fake/echo" assert m is not None # content matches multipart for the created file assert content["headers"]["Content-Type"] == "multipart/form-data; boundary=" + m[1] # pylint: disable=protected-access class TestJsonSerializer: _serializer = https.JsonSerializer() def test_get_accept_types(self): ctypes = "application/json, application/x-javascript, text/javascript, text/x-javascript, text/x-json" assert ctypes == self._serializer.get_accept_types() def test_loads_pass(self): input_str = '{"data": {"key1": "value1", "key2": "value2"}, "errors": {}}' exp_output = {"key1": "value1", "key2": "value2"} response = Response() response._content = input_str.encode("utf-8") act_output = self._serializer.loads(response) assert act_output == exp_output def test_loads_not_json(self): input_str = "There was an error with the request" exp_output = {"errors": b"There was an error with the request"} response = Response() response._content = input_str.encode("utf-8") act_output = self._serializer.loads(response) assert act_output == exp_output def test_loads_not_unicode(self): input_str = '{"data": {"key1": "value1", "key2": "value2"}, "errors": {}}\x80' exp_output = {"errors": input_str.encode("utf-8")} response = Response() response._content = input_str.encode("utf-8") act_output = self._serializer.loads(response) assert act_output == exp_output def test_loads_errors_pass(self): input_str = ( '{"data": {}, "errors": ["missing required param 1", "missing required param 2"]}' ) exp_output = ["missing required param 1", "missing required param 2"] response = Response() response._content = input_str.encode("utf-8") act_output = self._serializer.loads_errors(response) assert act_output == exp_output def test_loads_errors_not_json(self): input_str = ( '{"data": {} "errors": ["missing required param 1", "missing required param 2"]}' ) exp_output = { "errors": b'{"data": {} "errors": ["missing required param 1", "missing required param 2"]}' } response = Response() response._content = input_str.encode("utf-8") act_output = self._serializer.loads_errors(response) assert act_output == exp_output def test_loads_errors_not_unicode(self): input_str = ( '{"data": {}, "errors": ["missing required param 1", "missing required param 2"]}\x80' ) exp_output = {"errors": input_str.encode("utf-8")} response = Response() response._content = input_str.encode("utf-8") act_output = self._serializer.loads_errors(response) assert act_output == exp_output @pytest.fixture(params=(False, True)) def toolbelt_on_off(request, monkeypatch): """ runs test twice, once with importing of 'requests_toolbelt' to be allowed and one with it disabled. Returns True if module is available, False if blocked. """ if not request.param: # ran once with requests_toolbelt available and once with it removed monkeypatch.setitem(sys.modules, "requests_toolbelt", None) return request.param @pytest.fixture def shrink_thresholds(): with mock.patch("proxmoxer.backends.https.STREAMING_SIZE_THRESHOLD", 100), mock.patch( "proxmoxer.backends.https.SSL_OVERFLOW_THRESHOLD", 1000 ): yield @pytest.fixture def apply_none_service(): serv = { "NONE": { "supported_backends": [], "supported_https_auths": [], "default_port": 1234, } } with mock.patch("proxmoxer.core.SERVICES", serv), mock.patch( "proxmoxer.backends.https.SERVICES", serv ): yield 0707010000002D000081A400000000000000000000000166B7C6B90000099F000000000000000000000000000000000000002C00000000proxmoxer-2.1.0/tests/test_https_helpers.py__author__ = "John Hollowell" __copyright__ = "(c) John Hollowell 2022" __license__ = "MIT" import tempfile from proxmoxer.backends import https class TestGetFileSize: """ Tests for the get_file_size() function within proxmoxer.backends.https """ def test_empty(self): with tempfile.TemporaryFile("w+b") as f_obj: assert https.get_file_size(f_obj) == 0 def test_small(self): size = 100 with tempfile.TemporaryFile("w+b") as f_obj: f_obj.write(b"a" * size) assert https.get_file_size(f_obj) == size def test_large(self): size = 10 * 1024 * 1024 # 10 MB with tempfile.TemporaryFile("w+b") as f_obj: f_obj.write(b"a" * size) assert https.get_file_size(f_obj) == size def test_half_seek(self): size = 200 with tempfile.TemporaryFile("w+b") as f_obj: f_obj.write(b"a" * size) f_obj.seek(int(size / 2)) assert https.get_file_size(f_obj) == size def test_full_seek(self): size = 200 with tempfile.TemporaryFile("w+b") as f_obj: f_obj.write(b"a" * size) f_obj.seek(size) assert https.get_file_size(f_obj) == size class TestGetFileSizePartial: """ Tests for the get_file_size_partial() function within proxmoxer.backends.https """ def test_empty(self): with tempfile.TemporaryFile("w+b") as f_obj: assert https.get_file_size_partial(f_obj) == 0 def test_small(self): size = 100 with tempfile.TemporaryFile("w+b") as f_obj: f_obj.write(b"a" * size) f_obj.seek(0) assert https.get_file_size_partial(f_obj) == size def test_large(self): size = 10 * 1024 * 1024 # 10 MB with tempfile.TemporaryFile("w+b") as f_obj: f_obj.write(b"a" * size) f_obj.seek(0) assert https.get_file_size_partial(f_obj) == size def test_half_seek(self): size = 200 with tempfile.TemporaryFile("w+b") as f_obj: f_obj.write(b"a" * size) f_obj.seek(int(size / 2)) assert https.get_file_size_partial(f_obj) == size / 2 def test_full_seek(self): size = 200 with tempfile.TemporaryFile("w+b") as f_obj: f_obj.write(b"a" * size) f_obj.seek(size) assert https.get_file_size_partial(f_obj) == 0 0707010000002E000081A400000000000000000000000166B7C6B900000FC3000000000000000000000000000000000000002600000000proxmoxer-2.1.0/tests/test_imports.py__author__ = "John Hollowell" __copyright__ = "(c) John Hollowell 2022" __license__ = "MIT" import logging import sys from importlib import reload import pytest def test_missing_requests(requests_off, caplog): with pytest.raises(SystemExit) as exit_exp: import proxmoxer.backends.https as test_https # force re-importing of the module with `requests` gone so the validation is triggered reload(test_https) assert exit_exp.value.code == 1 assert caplog.record_tuples == [ ( "proxmoxer.backends.https", logging.ERROR, "Chosen backend requires 'requests' module\n", ) ] def test_missing_requests_tools_files(requests_off, caplog): with pytest.raises(SystemExit) as exit_exp: import proxmoxer.tools.files as test_files # force re-importing of the module with `requests` gone so the validation is triggered reload(test_files) assert exit_exp.value.code == 1 assert caplog.record_tuples == [ ( "proxmoxer.tools.files", logging.ERROR, "Files tools requires 'requests' module\n", ) ] def test_missing_openssh_wrapper(openssh_off, caplog): with pytest.raises(SystemExit) as exit_exp: import proxmoxer.backends.openssh as test_openssh # force re-importing of the module with `openssh_wrapper` gone so the validation is triggered reload(test_openssh) assert exit_exp.value.code == 1 assert caplog.record_tuples == [ ( "proxmoxer.backends.openssh", logging.ERROR, "Chosen backend requires 'openssh_wrapper' module\n", ) ] def test_missing_paramiko_off(paramiko_off, caplog): with pytest.raises(SystemExit) as exit_exp: import proxmoxer.backends.ssh_paramiko as ssh_paramiko # force re-importing of the module with `ssh_paramiko` gone so the validation is triggered reload(ssh_paramiko) assert exit_exp.value.code == 1 assert caplog.record_tuples == [ ( "proxmoxer.backends.ssh_paramiko", logging.ERROR, "Chosen backend requires 'paramiko' module\n", ) ] class TestCommandBase: def test_join_empty(self, shlex_join_on_off): from proxmoxer.backends import command_base reload(command_base) arr = [] assert command_base.shell_join(arr) == "" def test_join_single(self, shlex_join_on_off): from proxmoxer.backends import command_base reload(command_base) arr = ["echo"] assert command_base.shell_join(arr) == "echo" def test_join_multiple(self, shlex_join_on_off): from proxmoxer.backends import command_base reload(command_base) arr = ["echo", "test"] assert command_base.shell_join(arr) == "echo test" def test_join_complex(self, shlex_join_on_off): from proxmoxer.backends import command_base reload(command_base) arr = ["echo", 'hello "world"'] assert command_base.shell_join(arr) == "echo 'hello \"world\"'" @pytest.fixture() def requests_off(monkeypatch): return monkeypatch.setitem(sys.modules, "requests", None) @pytest.fixture() def openssh_off(monkeypatch): return monkeypatch.setitem(sys.modules, "openssh_wrapper", None) @pytest.fixture() def paramiko_off(monkeypatch): return monkeypatch.setitem(sys.modules, "paramiko", None) @pytest.fixture(params=(False, True)) def shlex_join_on_off(request, monkeypatch): """ runs test twice, once with importing of 'shlex.join' to be allowed and one with it disabled. Returns True if module is available, False if blocked. """ if not request.param: # ran once with shlex available and once with it removed if getattr(sys.modules["shlex"], "join", None): monkeypatch.delattr(sys.modules["shlex"], "join") # else join already does not exist (py < 3.8) return request.param 0707010000002F000081A400000000000000000000000166B7C6B900000635000000000000000000000000000000000000002400000000proxmoxer-2.1.0/tests/test_local.py__author__ = "John Hollowell" __copyright__ = "(c) John Hollowell 2022" __license__ = "MIT" import tempfile from proxmoxer.backends import local # pylint: disable=no-self-use class TestLocalBackend: def test_init(self): back = local.Backend() assert isinstance(back.session, local.LocalSession) assert back.target == "localhost" class TestLocalSession: _session = local.LocalSession() def test_upload_file_obj(self): size = 100 with tempfile.NamedTemporaryFile("w+b") as f_obj, tempfile.NamedTemporaryFile( "rb" ) as dest_obj: f_obj.write(b"a" * size) f_obj.seek(0) self._session.upload_file_obj(f_obj, dest_obj.name) # reset file cursor to start of file after copy f_obj.seek(0) assert f_obj.read() == dest_obj.read() def test_upload_file_obj_end(self): size = 100 with tempfile.NamedTemporaryFile("w+b") as f_obj, tempfile.NamedTemporaryFile( "rb" ) as dest_obj: f_obj.write(b"a" * size) # do not seek to start of file before copy self._session.upload_file_obj(f_obj, dest_obj.name) assert b"" == dest_obj.read() def test_exec(self): cmd = [ "python3", "-c", 'import sys; sys.stdout.write("stdout content"); sys.stderr.write("stderr content")', ] stdout, stderr = self._session._exec(cmd) assert stdout == "stdout content" assert stderr == "stderr content" 07070100000030000081A400000000000000000000000166B7C6B9000009F1000000000000000000000000000000000000002600000000proxmoxer-2.1.0/tests/test_openssh.py__author__ = "John Hollowell" __copyright__ = "(c) John Hollowell 2022" __license__ = "MIT" import tempfile from unittest import mock import openssh_wrapper import pytest from proxmoxer.backends import openssh # pylint: disable=no-self-use class TestOpenSSHBackend: def test_init(self): back = openssh.Backend("host", "user") assert isinstance(back.session, openssh.OpenSSHSession) assert back.session.host == "host" assert back.session.user == "user" assert back.target == "host" class TestOpenSSHSession: _session = openssh.OpenSSHSession("host", "user") def test_init_all_args(self): with tempfile.NamedTemporaryFile("r") as conf_obj, tempfile.NamedTemporaryFile( "r" ) as ident_obj: sess = openssh.OpenSSHSession( "host", "user", config_file=conf_obj.name, port=123, identity_file=ident_obj.name, forward_ssh_agent=True, ) assert sess.host == "host" assert sess.user == "user" assert sess.config_file == conf_obj.name assert sess.port == 123 assert sess.identity_file == ident_obj.name assert sess.forward_ssh_agent is True def test_exec(self, mock_session): cmd = [ "echo", "hello", "world", ] stdout, stderr = mock_session._exec(cmd) assert stdout == "stdout content" assert stderr == "stderr content" mock_session.ssh_client.run.assert_called_once_with( "echo hello world", forward_ssh_agent=True, ) def test_upload_file_obj(self, mock_session): with tempfile.NamedTemporaryFile("r") as f_obj: mock_session.upload_file_obj(f_obj, "/tmp/file") mock_session.ssh_client.scp.assert_called_once_with( (f_obj,), target="/tmp/file", ) @pytest.fixture def mock_session(): with mock.patch("proxmoxer.backends.openssh.OpenSSHSession._connect", _get_mock_ssh_conn): yield openssh.OpenSSHSession("host", "user", forward_ssh_agent=True) def _get_mock_ssh_conn(_): ssh_conn = mock.Mock(spec=openssh_wrapper.SSHConnection) ssh_conn.run = mock.Mock( # spec=openssh_wrapper.SSHConnection.run, return_value=mock.Mock(stdout="stdout content", stderr="stderr content"), ) ssh_conn.scp = mock.Mock() return ssh_conn 07070100000031000081A400000000000000000000000166B7C6B90000144B000000000000000000000000000000000000002700000000proxmoxer-2.1.0/tests/test_paramiko.py__author__ = "John Hollowell" __copyright__ = "(c) John Hollowell 2022" __license__ = "MIT" import os.path import tempfile from unittest import mock import pytest from proxmoxer.backends import ssh_paramiko # pylint: disable=no-self-use class TestParamikoBackend: def test_init(self, mock_connect): back = ssh_paramiko.Backend("host", "user") assert isinstance(back.session, ssh_paramiko.SshParamikoSession) assert back.session.host == "host" assert back.session.user == "user" assert back.target == "host" class TestSshParamikoSession: def test_init_all_args(self, mock_connect): sess = ssh_paramiko.SshParamikoSession( "host", "user", password="password", private_key_file="/tmp/key_file", port=1234 ) assert sess.host == "host" assert sess.user == "user" assert sess.password == "password" assert sess.private_key_file == "/tmp/key_file" assert sess.port == 1234 assert sess.ssh_client == mock_connect() def test_connect_basic(self, mock_ssh_client): import paramiko sess = ssh_paramiko.SshParamikoSession("host", "user", password="password", port=1234) sess.ssh_client.connect.assert_called_once_with( "host", username="user", allow_agent=False, look_for_keys=True, key_filename=None, password="password", timeout=5, port=1234, ) policy_call_args, _ = sess.ssh_client.set_missing_host_key_policy.call_args_list[0] assert isinstance(policy_call_args[0], paramiko.AutoAddPolicy) def test_connect_key_file(self, mock_ssh_client): import paramiko sess = ssh_paramiko.SshParamikoSession( "host", "user", password="password", private_key_file="/tmp/key_file", port=1234 ) sess.ssh_client.connect.assert_called_once_with( "host", username="user", allow_agent=False, look_for_keys=True, key_filename="/tmp/key_file", password="password", timeout=5, port=1234, ) policy_call_args, _ = sess.ssh_client.set_missing_host_key_policy.call_args_list[0] assert isinstance(policy_call_args[0], paramiko.AutoAddPolicy) def test_connect_key_file_user(self, mock_ssh_client): import paramiko sess = ssh_paramiko.SshParamikoSession( "host", "user", password="password", private_key_file="~/key_file", port=1234 ) sess.ssh_client.connect.assert_called_once_with( "host", username="user", allow_agent=False, look_for_keys=True, key_filename=os.path.expanduser("~") + "/key_file", password="password", timeout=5, port=1234, ) policy_call_args, _ = sess.ssh_client.set_missing_host_key_policy.call_args_list[0] assert isinstance(policy_call_args[0], paramiko.AutoAddPolicy) def test_exec(self, mock_ssh_client): mock_client, mock_session, _ = mock_ssh_client sess = ssh_paramiko.SshParamikoSession("host", "user") sess.ssh_client = mock_client stdout, stderr = sess._exec(["echo", "hello", "world"]) assert stdout == "stdout contents" assert stderr == "stderr contents" mock_session.exec_command.assert_called_once_with("echo hello world") def test_upload_file_obj(self, mock_ssh_client): mock_client, _, mock_sftp = mock_ssh_client sess = ssh_paramiko.SshParamikoSession("host", "user") sess.ssh_client = mock_client with tempfile.NamedTemporaryFile("r") as f_obj: sess.upload_file_obj(f_obj, "/tmp/file") mock_sftp.putfo.assert_called_once_with(f_obj, "/tmp/file") mock_sftp.close.assert_called_once_with() @pytest.fixture def mock_connect(): m = mock.Mock(spec=ssh_paramiko.SshParamikoSession._connect) with mock.patch( "proxmoxer.backends.ssh_paramiko.SshParamikoSession._connect", m, ): yield m @pytest.fixture def mock_ssh_client(): # pylint: disable=import-outside-toplevel from paramiko import SFTPClient, SSHClient, Transport, channel mock_client = mock.Mock(spec=SSHClient) mock_transport = mock.Mock(spec=Transport) mock_channel = mock.Mock(spec=channel.Channel) mock_sftp = mock.Mock(spec=SFTPClient) # mock the return streams from the SSH connection mock_stdout = mock.Mock(spec=channel.ChannelFile) mock_stderr = mock.Mock(spec=channel.ChannelStderrFile) mock_stdout.read.return_value = b"stdout contents" mock_stderr.read.return_value = b"stderr contents" mock_channel.makefile.return_value = mock_stdout mock_channel.makefile_stderr.return_value = mock_stderr mock_transport.open_session.return_value = mock_channel mock_client.get_transport.return_value = mock_transport mock_client.open_sftp.return_value = mock_sftp with mock.patch("paramiko.SSHClient", mock_client): yield (mock_client, mock_channel, mock_sftp) 07070100000032000041ED00000000000000000000000266B7C6B900000000000000000000000000000000000000000000001C00000000proxmoxer-2.1.0/tests/tools07070100000033000081A400000000000000000000000166B7C6B90000005C000000000000000000000000000000000000002800000000proxmoxer-2.1.0/tests/tools/__init__.py__author__ = "John Hollowell" __copyright__ = "(c) John Hollowell 2022" __license__ = "MIT" 07070100000034000081A400000000000000000000000166B7C6B900003525000000000000000000000000000000000000002A00000000proxmoxer-2.1.0/tests/tools/test_files.py__author__ = "John Hollowell" __copyright__ = "(c) John Hollowell 2023" __license__ = "MIT" import logging import tempfile from unittest import mock import pytest from proxmoxer import ProxmoxAPI, core from proxmoxer.tools import ChecksumInfo, Files, SupportedChecksums from ..api_mock import mock_pve # pylint: disable=unused-import # noqa: F401 from ..files_mock import ( # pylint: disable=unused-import # noqa: F401 mock_files, mock_files_and_pve, ) MODULE_LOGGER_NAME = "proxmoxer.tools.files" class TestChecksumInfo: def test_basic(self): info = ChecksumInfo("name", 123) assert info.name == "name" assert info.hex_size == 123 def test_str(self): info = ChecksumInfo("name", 123) assert str(info) == "name" def test_repr(self): info = ChecksumInfo("name", 123) assert repr(info) == "name (123 digits)" class TestGetChecksum: def test_get_checksum_from_sibling_file_success(self, mock_files): url = "https://sub.domain.tld/sibling/file.iso" exp_hash = "this_is_the_hash" info = ChecksumInfo("testing", 16) res1 = Files._get_checksum_from_sibling_file(url, checksum_info=info) res2 = Files._get_checksum_from_sibling_file(url, checksum_info=info, filename="file.iso") assert res1 == exp_hash assert res2 == exp_hash def test_get_checksum_from_sibling_file_fail(self, mock_files): url = "https://sub.domain.tld/sibling/missing.iso" info = ChecksumInfo("testing", 16) res1 = Files._get_checksum_from_sibling_file(url, checksum_info=info) res2 = Files._get_checksum_from_sibling_file( url, checksum_info=info, filename="missing.iso" ) assert res1 is None assert res2 is None def test_get_checksum_from_extension_success(self, mock_files): url = "https://sub.domain.tld/extension/file.iso" exp_hash = "this_is_the_hash" info = ChecksumInfo("testing", 16) res1 = Files._get_checksum_from_extension(url, checksum_info=info) res2 = Files._get_checksum_from_extension(url, checksum_info=info, filename="file.iso") assert res1 == exp_hash assert res2 == exp_hash def test_get_checksum_from_extension_fail(self, mock_files): url = "https://sub.domain.tld/extension/missing.iso" info = ChecksumInfo("testing", 16) res1 = Files._get_checksum_from_extension(url, checksum_info=info) res2 = Files._get_checksum_from_extension( url, checksum_info=info, filename="connectionerror.iso" ) res3 = Files._get_checksum_from_extension( url, checksum_info=info, filename="readtimeout.iso" ) assert res1 is None assert res2 is None assert res3 is None def test_get_checksum_from_extension_upper_success(self, mock_files): url = "https://sub.domain.tld/upper/file.iso" exp_hash = "this_is_the_hash" info = ChecksumInfo("testing", 16) res1 = Files._get_checksum_from_extension_upper(url, checksum_info=info) res2 = Files._get_checksum_from_extension_upper( url, checksum_info=info, filename="file.iso" ) assert res1 == exp_hash assert res2 == exp_hash def test_get_checksum_from_extension_upper_fail(self, mock_files): url = "https://sub.domain.tld/upper/missing.iso" info = ChecksumInfo("testing", 16) res1 = Files._get_checksum_from_extension_upper(url, checksum_info=info) res2 = Files._get_checksum_from_extension_upper( url, checksum_info=info, filename="missing.iso" ) assert res1 is None assert res2 is None def test_get_checksums_from_file_url_all_checksums(self, mock_files): base_url = "https://sub.domain.tld/checksums/file.iso" full_checksum_string = "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" for types_enum in SupportedChecksums: checksum_info = types_enum.value data = Files.get_checksums_from_file_url(base_url, preferred_type=checksum_info) assert data[0] == full_checksum_string[0 : checksum_info.hex_size] assert data[1] == checksum_info def test_get_checksums_from_file_url_missing(self, mock_files): url = "https://sub.domain.tld/missing.iso" data = Files.get_checksums_from_file_url(url) assert data[0] is None assert data[1] is None class TestFiles: prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value") def test_init_basic(self): f = Files(self.prox, "node1", "storage1") assert f._prox == self.prox assert f._node == "node1" assert f._storage == "storage1" def test_repr(self): f = Files(self.prox, "node1", "storage1") assert ( repr(f) == "Files (node1/storage1 at ProxmoxAPI (https backend for https://1.2.3.4:1234/api2/json))" ) def test_get_file_info_pass(self, mock_pve): f = Files(self.prox, "node1", "storage1") info = f.get_file_info("https://sub.domain.tld/file.iso") assert info["filename"] == "file.iso" assert info["mimetype"] == "application/x-iso9660-image" assert info["size"] == 123456 def test_get_file_info_fail(self, mock_pve): f = Files(self.prox, "node1", "storage1") info = f.get_file_info("https://sub.domain.tld/invalid.iso") assert info is None class TestFilesDownload: prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value") f = Files(prox, "node1", "storage1") def test_download_discover_checksum(self, mock_files_and_pve, caplog): status = self.f.download_file_to_storage("https://sub.domain.tld/checksums/file.iso") # this is the default "done" task mock information assert status == { "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "OK", "pid": 1044989, "id": "110", "node": "node1", } assert caplog.record_tuples == [] def test_download_no_blocking(self, mock_files_and_pve, caplog): status = self.f.download_file_to_storage( "https://sub.domain.tld/checksums/file.iso", blocking_status=False ) # this is the default "done" task mock information assert status == { "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "OK", "pid": 1044989, "id": "110", "node": "node1", } assert caplog.record_tuples == [] def test_download_no_discover_checksum(self, mock_files_and_pve, caplog): caplog.set_level(logging.WARNING, logger=MODULE_LOGGER_NAME) status = self.f.download_file_to_storage("https://sub.domain.tld/file.iso") # this is the default "stopped" task mock information assert status == { "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "OK", "pid": 1044989, "id": "110", "node": "node1", } assert caplog.record_tuples == [ ( MODULE_LOGGER_NAME, logging.WARNING, "Unable to discover checksum. Will not do checksum validation", ), ] def test_uneven_checksum(self, caplog, mock_files_and_pve): caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME) status = self.f.download_file_to_storage("https://sub.domain.tld/file.iso", checksum="asdf") assert status is None assert caplog.record_tuples == [ ( MODULE_LOGGER_NAME, logging.ERROR, "Must pass both checksum and checksum_type or leave both None for auto-discovery", ), ] def test_uneven_checksum_type(self, caplog, mock_files_and_pve): caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME) status = self.f.download_file_to_storage( "https://sub.domain.tld/file.iso", checksum_type="asdf" ) assert status is None assert caplog.record_tuples == [ ( MODULE_LOGGER_NAME, logging.ERROR, "Must pass both checksum and checksum_type or leave both None for auto-discovery", ), ] def test_get_file_info_missing(self, mock_pve): f = Files(self.prox, "node1", "storage1") info = f.get_file_info("https://sub.domain.tld/missing.iso") assert info is None def test_get_file_info_non_iso(self, mock_pve): f = Files(self.prox, "node1", "storage1") info = f.get_file_info("https://sub.domain.tld/index.html") assert info["filename"] == "index.html" assert info["mimetype"] == "text/html" class TestFilesUpload: prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value") f = Files(prox, "node1", "storage1") def test_upload_no_file(self, mock_files_and_pve, caplog): status = self.f.upload_local_file_to_storage("/does-not-exist.iso") assert status is None assert caplog.record_tuples == [ ( MODULE_LOGGER_NAME, logging.ERROR, '"/does-not-exist.iso" does not exist or is not a file', ), ] def test_upload_dir(self, mock_files_and_pve, caplog): with tempfile.TemporaryDirectory() as tmp_dir: status = self.f.upload_local_file_to_storage(tmp_dir) assert status is None assert caplog.record_tuples == [ ( MODULE_LOGGER_NAME, logging.ERROR, f'"{tmp_dir}" does not exist or is not a file', ), ] def test_upload_empty_file(self, mock_files_and_pve, caplog): with tempfile.NamedTemporaryFile("rb") as f_obj: status = self.f.upload_local_file_to_storage(filename=f_obj.name) assert status is not None assert caplog.record_tuples == [] def test_upload_non_empty_file(self, mock_files_and_pve, caplog): with tempfile.NamedTemporaryFile("w+b") as f_obj: f_obj.write(b"a" * 100) f_obj.seek(0) status = self.f.upload_local_file_to_storage(filename=f_obj.name) assert status is not None assert caplog.record_tuples == [] def test_upload_no_checksum(self, mock_files_and_pve, caplog): with tempfile.NamedTemporaryFile("rb") as f_obj: status = self.f.upload_local_file_to_storage( filename=f_obj.name, do_checksum_check=False ) assert status is not None assert caplog.record_tuples == [] def test_upload_checksum_unavailable(self, mock_files_and_pve, caplog, apply_no_checksums): with tempfile.NamedTemporaryFile("rb") as f_obj: status = self.f.upload_local_file_to_storage(filename=f_obj.name) assert status is not None assert caplog.record_tuples == [ ( MODULE_LOGGER_NAME, logging.WARNING, "There are no Proxmox supported checksums which are supported by hashlib. Skipping checksum validation", ) ] def test_upload_non_blocking(self, mock_files_and_pve, caplog): with tempfile.NamedTemporaryFile("rb") as f_obj: status = self.f.upload_local_file_to_storage(filename=f_obj.name, blocking_status=False) assert status is not None assert caplog.record_tuples == [] def test_upload_proxmox_error(self, mock_files_and_pve, caplog): with tempfile.NamedTemporaryFile("rb") as f_obj: f_copy = Files(self.f._prox, self.f._node, "missing") with pytest.raises(core.ResourceException) as exc_info: f_copy.upload_local_file_to_storage(filename=f_obj.name) assert exc_info.value.status_code == 500 assert exc_info.value.status_message == "Internal Server Error" # assert exc_info.value.content == "storage 'missing' does not exist" def test_upload_io_error(self, mock_files_and_pve, caplog): with tempfile.NamedTemporaryFile("rb") as f_obj: mo = mock.mock_open() mo.side_effect = IOError("ERROR MESSAGE") with mock.patch("builtins.open", mo): status = self.f.upload_local_file_to_storage(filename=f_obj.name) assert status is None assert caplog.record_tuples == [(MODULE_LOGGER_NAME, logging.ERROR, "ERROR MESSAGE")] @pytest.fixture def apply_no_checksums(): with mock.patch("hashlib.algorithms_available", set()): yield 07070100000035000081A400000000000000000000000166B7C6B9000022B1000000000000000000000000000000000000002A00000000proxmoxer-2.1.0/tests/tools/test_tasks.py__author__ = "John Hollowell" __copyright__ = "(c) John Hollowell 2022" __license__ = "MIT" import logging import pytest from proxmoxer import ProxmoxAPI from proxmoxer.tools import Tasks from ..api_mock import mock_pve # pylint: disable=unused-import # noqa: F401 class TestBlockingStatus: def test_basic(self, mocked_prox, caplog): caplog.set_level(logging.DEBUG, logger="proxmoxer.core") status = Tasks.blocking_status( mocked_prox, "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done" ) assert status == { "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "OK", "pid": 1044989, "id": "110", "node": "node1", } assert caplog.record_tuples == [ ( "proxmoxer.core", 20, "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done/status", ), ( "proxmoxer.core", 10, 'Status code: 200, output: b\'{"data": {"upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "OK", "pid": 1044989, "id": "110", "node": "node1"}}\'', ), ] def test_zeroed(self, mocked_prox, caplog): caplog.set_level(logging.DEBUG, logger="proxmoxer.core") status = Tasks.blocking_status( mocked_prox, "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment" ) assert status == { "upid": "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment", "node": "node", "pid": 0, "pstart": 0, "starttime": 0, "type": "task", "id": "id", "user": "root@pam", "status": "stopped", "exitstatus": "OK", } assert caplog.record_tuples == [ ( "proxmoxer.core", 20, "GET https://1.2.3.4:1234/api2/json/nodes/node/tasks/UPID:node:00000000:00000000:00000000:task:id:root@pam:comment/status", ), ( "proxmoxer.core", 10, 'Status code: 200, output: b\'{"data": {"upid": "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment", "node": "node", "pid": 0, "pstart": 0, "starttime": 0, "type": "task", "id": "id", "user": "root@pam", "status": "stopped", "exitstatus": "OK"}}\'', ), ] def test_killed(self, mocked_prox, caplog): caplog.set_level(logging.DEBUG, logger="proxmoxer.core") status = Tasks.blocking_status( mocked_prox, "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped" ) assert status == { "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "interrupted by signal", "pid": 1044989, "id": "110", "node": "node1", } assert caplog.record_tuples == [ ( "proxmoxer.core", 20, "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped/status", ), ( "proxmoxer.core", 10, 'Status code: 200, output: b\'{"data": {"upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "interrupted by signal", "pid": 1044989, "id": "110", "node": "node1"}}\'', ), ] def test_timeout(self, mocked_prox, caplog): caplog.set_level(logging.DEBUG, logger="proxmoxer.core") status = Tasks.blocking_status( mocked_prox, "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running", timeout=0.021, polling_interval=0.01, ) assert status is None assert caplog.record_tuples == [ ( "proxmoxer.core", 20, "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running/status", ), ( "proxmoxer.core", 10, 'Status code: 200, output: b\'{"data": {"id": "110", "pid": 1044989, "node": "node1", "pstart": 284768076, "status": "running", "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running", "starttime": 1661825068, "user": "root@pam", "type": "vzdump"}}\'', ), ( "proxmoxer.core", 20, "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running/status", ), ( "proxmoxer.core", 10, 'Status code: 200, output: b\'{"data": {"id": "110", "pid": 1044989, "node": "node1", "pstart": 284768076, "status": "running", "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running", "starttime": 1661825068, "user": "root@pam", "type": "vzdump"}}\'', ), ( "proxmoxer.core", 20, "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running/status", ), ( "proxmoxer.core", 10, 'Status code: 200, output: b\'{"data": {"id": "110", "pid": 1044989, "node": "node1", "pstart": 284768076, "status": "running", "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running", "starttime": 1661825068, "user": "root@pam", "type": "vzdump"}}\'', ), ] class TestDecodeUpid: def test_basic(self): upid = "UPID:node:000CFC5C:03E8D0C3:6194806C:aptupdate::root@pam:" decoded = Tasks.decode_upid(upid) assert decoded["upid"] == upid assert decoded["node"] == "node" assert decoded["pid"] == 851036 assert decoded["pstart"] == 65589443 assert decoded["starttime"] == 1637122156 assert decoded["type"] == "aptupdate" assert decoded["id"] == "" assert decoded["user"] == "root@pam" assert decoded["comment"] == "" def test_all_values(self): upid = "UPID:node1:000CFFFA:03E8EF53:619480BA:vzdump:103:root@pam:local" decoded = Tasks.decode_upid(upid) assert decoded["upid"] == upid assert decoded["node"] == "node1" assert decoded["pid"] == 851962 assert decoded["pstart"] == 65597267 assert decoded["starttime"] == 1637122234 assert decoded["type"] == "vzdump" assert decoded["id"] == "103" assert decoded["user"] == "root@pam" assert decoded["comment"] == "local" def test_invalid_length(self): upid = "UPID:node1:000CFFFA:03E8EF53:619480BA:vzdump:103:root@pam" with pytest.raises(AssertionError) as exc_info: Tasks.decode_upid(upid) assert str(exc_info.value) == "UPID is not in the correct format" def test_invalid_start(self): upid = "ASDF:node1:000CFFFA:03E8EF53:619480BA:vzdump:103:root@pam:" with pytest.raises(AssertionError) as exc_info: Tasks.decode_upid(upid) assert str(exc_info.value) == "UPID is not in the correct format" class TestDecodeLog: def test_basic(self): log_list = [{"n": 1, "t": "client connection: 127.0.0.1:49608"}, {"t": "TASK OK", "n": 2}] log_str = Tasks.decode_log(log_list) assert log_str == "client connection: 127.0.0.1:49608\nTASK OK" def test_empty(self): log_list = [] log_str = Tasks.decode_log(log_list) assert log_str == "" def test_unordered(self): log_list = [{"n": 3, "t": "third"}, {"t": "first", "n": 1}, {"t": "second", "n": 2}] log_str = Tasks.decode_log(log_list) assert log_str == "first\nsecond\nthird" @pytest.fixture def mocked_prox(mock_pve): return ProxmoxAPI("1.2.3.4:1234", user="user", password="password") 07070100000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000B00000000TRAILER!!!375 blocks
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor