Source code for safe.gpg

# -*- coding: utf-8 -*-
"""
Facilities for interacting with GPG encrypted files.

:author: Joe Joyce <joe@decafjoe.com>
:copyright: Copyright (c) Joe Joyce and contributors, 2016-2019.
:license: BSD
"""
import os
import re
import shutil
import subprocess

from safe.util import get_executable, Subprocess, temporary_directory


#: Name of the cipher to use if unspecified in :meth:`GPGFile.save`.
#:
#: :type: :class:`str`
PREFERRED_CIPHER = 'aes256'


[docs]class GPGError(Exception): """Raised for errors from this module."""
[docs] def __init__(self, message, stdout, stderr): """ Instantiate the error. :param str message: Short message describing the error :param stdout: Standard output related to the error :type stdout: :class:`str` or ``None`` :param stderr: Standard error related to the error :type stderr: :class:`str` or ``None`` """ super(GPGError, self).__init__(message) #: Short message describing the error. #: #: :type: :class:`str` self.message = message #: Standard out associated with the error. #: #: :type: :class:`str` or ``None`` self.stdout = stdout #: Standard error associated with the error. #: #: :type: :class:`str` or ``None`` self.stderr = stderr
[docs]def get_gpg_executable(): """ Return GPG executable, raising a :exc:`GPGError` if not found. This will first look for an executable named ``gpg2``, returning it immediately if found. If ``gpg2`` does not exist but ``gpg`` does, this function runs ``gpg --version`` to check the version. If version 2, the absolute path to the executable is returned. Failure to find a GPG2 executable results in a :exc:`GPGError` being raised. :raise: :exc:`GPGError` if GPG executable is not found :return: Absolute path to the GPG executable :rtype: :class:`str` """ rv = get_executable('gpg2') if rv is not None: return rv rv = get_executable('gpg') if rv is None: msg = 'neither gpg2 nor gpg executables were found' raise GPGError(msg, None, None) process = Subprocess((rv, '--version'), stdout=subprocess.PIPE) stdout, stderr = process.communicate() if process.returncode: msg = '`gpg --version` exited non-zero: %s' % process.returncode raise GPGError(msg, stdout, stderr) regex = re.compile(r'^gpg[^\n]+\s+((?P<major>\d+)\.\d+\.\d+).+', re.DOTALL) match = regex.search(stdout) if not match: msg = 'could not extract version from `gpg --version`' raise GPGError(msg, stdout, stderr) major_version = match.groupdict()['major'] if major_version != '2': msg = 'safe requires gpg version 2, found version: %s' % major_version raise GPGError(msg, stdout, stderr) return rv
[docs]class GPGSubprocess(Subprocess): """Convenience class for running GPG commands.""" #: Absolute path to the GPG executable. #: #: This is populated the first time a :class:`GPGSubprocess` is #: instantiated and is subsequently reused. #: #: :type: ``None`` until populated with a :class:`str` on :meth:`__init__` _gpg = None
[docs] def __init__(self, command): """ Instantiate the subprocess. :param command: Arguments to pass to GPG :type command: :func:`tuple` of arguments, *not* including ``gpg`` itself at the beginning """ if self.__class__._gpg is None: self.__class__._gpg = get_gpg_executable() cmd = (self._gpg,) + command pipe = subprocess.PIPE kwargs = dict(stdin=pipe, stdout=pipe, stderr=pipe) super(GPGSubprocess, self).__init__(cmd, **kwargs)
[docs]class GPGFile(object): """Manage decryption and encryption of a GPG file.""" #: Regex matching the keyid output string from ``gpg #: --list-packets``. #: #: :type: :func:`re.compile` KEYID_RE = re.compile(r'keyid (?P<keyid>[0-9A-F]+)') # These are defined here so we can reference them in the internal # documentation in doc/development/internals/gpg.rst. _homedir = None _keyid = None _password = None _path = None _symmetric = None
[docs] def __init__(self, path): """ Instantiate the file wrapper. :param str path: Path to the GPG encrypted file :raise: :exc:`GPGError` if file cannot be read """ #: Home directory to use for GnuPG calls (i.e. the ``--homedir`` #: argument). Defaults to ~/.gnupg. This attribute exists to #: allow tests to tweak the GnuPG environment while running, #: and is not otherwise used. #: #: :type: :class:`str` self._homedir = os.path.join(os.path.expanduser('~'), '.gnupg') #: Keyid to which the file was encrypted. Populated when #: :meth:`decrypt_to` is called. #: #: :type: :class:`str` or ``None`` (if file is symmetrically #: encrypted) self._keyid = None #: Password with which file was encryted. Populated when #: :meth:`decrypt_to` is called. #: #: :type: :class:`str` or ``None`` (if file is asymmetrically #: encrypted) self._password = None #: Path to the encrypted file. #: #: :type: :class:`str` self._path = path #: Boolean indicating whether the file is symmetrically #: encrypted. If false, the file is asymmetrically encrypted. #: #: :type: :class:`bool` self._symmetric = None with temporary_directory() as tmp: command = ( '--batch', '--homedir', tmp, '--passphrase', '', '--quiet', '--list-packets', path, ) process = GPGSubprocess(command) stdout, stderr = process.communicate() for line in stdout.splitlines(): if line.startswith(':symkey'): self._symmetric = True break elif line.startswith(':pubkey'): self._symmetric = False error_msg = 'failed to extract keyid from packets' match = self.KEYID_RE.search(line) if not match: raise GPGError(error_msg, stdout, stderr) keyid = match.groupdict()['keyid'] if re.search('^0+$', keyid): raise GPGError(error_msg, stdout, stderr) self._keyid = keyid break if self._symmetric is None: msg = 'did not find encryption type packet in file (are you ' \ 'sure this is a gpg file?)' raise GPGError(msg, stdout, stderr)
@property def symmetric(self): """If true, file is encrypted symmetrically (i.e. with a password).""" return self._symmetric
[docs] def decrypt_to(self, path, password=None): """ Decrypt file to ``path`` using ``password``. If decryption is successful, this will cache the password/keyid for use in subsequent calls to :meth:`save`. :param str path: Path to which to decrypt file :param password: Password for file, if encrypted symmetrically :type password: :class:`str` if file is symmetrically encrypted else ``None`` :raise: :exc:`GPGError` if decryption fails :rtype: ``None`` """ if self.symmetric and password is None: raise Exception('password required when symmetrically encrypted') command = ( '--batch', '--homedir', self._homedir, '--output', path, '--quiet', ) if self.symmetric: command += ('--passphrase-fd', '0') command += ('--decrypt', self._path) process = GPGSubprocess(command) stdout, stderr = process.communicate(password) if process.returncode: raise GPGError('failed to decrypt file', stdout, stderr) self._password = password
[docs] def save(self, source, cipher=PREFERRED_CIPHER): """ Save plaintext file ``source`` back to the original path, encrypted. :meth:`decrypt_to` **must be called before calling this method.** Certain values needed by this method are cached when a file is decrypted. (Namely, password for symmetrically encrypted files and keyid for asymmetrically encrypted files.) :param str source: Path to file to save :param str cipher: Cipher to use for encryption (defaults to :data:`PREFERRED_CIPHER`) :raise: :exc:`GPGError` if encryption fails (original encrypted file is left untouched) :rtype: ``None`` """ with temporary_directory() as tmp: tmp_path = os.path.join(tmp, 'f') command = ( '--armor', '--batch', '--cipher-algo', cipher, '--homedir', self._homedir, '--output', tmp_path, '--quiet', ) if self.symmetric: command += ('--passphrase-fd', '0', '--symmetric') else: command += ('--recipient', self._keyid, '--encrypt') command += (source,) process = GPGSubprocess(command) stdout, stderr = process.communicate(self._password) if process.returncode: raise GPGError('failed to re-encrypt file', stdout, stderr) shutil.move(tmp_path, self._path)