From 7b7d265592177c8dc591556e5c0c323a6bfea445 Mon Sep 17 00:00:00 2001 From: brent s Date: Tue, 7 Apr 2020 02:46:19 -0400 Subject: [PATCH] getting there --- vaultpass/QR.py | 56 +++++++++++++++++++ vaultpass/__init__.py | 125 ++++++++++++++++++++++++++++++++++++++++-- vaultpass/args.py | 11 ++-- vaultpass/mounts.py | 17 +++++- 4 files changed, 197 insertions(+), 12 deletions(-) create mode 100644 vaultpass/QR.py diff --git a/vaultpass/QR.py b/vaultpass/QR.py new file mode 100644 index 0000000..122e6ef --- /dev/null +++ b/vaultpass/QR.py @@ -0,0 +1,56 @@ +import io +import os +import subprocess +import logging +_logger = logging.getLogger() +## +try: + import qrcode + has_qrcode = True + _logger.warning(('Could not import qrcode; ' + 'library required for QR code generation')) +except ImportError: + has_qrcode = False +try: + import qrcode.image.svg + has_qrcode_svg = True +except ImportError: + has_qrcode_svg = False + _logger.error(('Could not import qrcode.image.svg; ' + 'library required for image QR code generation')) + + +def genQr(data, image = False): + _logger.debug('Generating QR code') + qr = qrcode.QRCode(error_correction = qrcode.constants.ERROR_CORRECT_H) + qr.add_data(data) + qr.make(fit = True) + termname = os.environ.get('TERM', 'linux') + if termname == 'linux': + # We can't spawn an xdg-open because we don't have X. + _logger.warning('Disabling image display/generation because we don\'t have X') + image = False + if not image: + _logger.debug('Rendering to terminal') + buf = io.StringIO() + if termname == 'linux' or termname.startswith('screen'): + invert = False + else: + _logger.debug('Inverting B/W for better visibility in environment (we cannot predict terminal colours)') + invert = True + qr.print_ascii(invert = invert, out = buf) + else: + _logger.debug('Rendering to image') + buf = io.BytesIO() + if not has_qrcode_svg: + _logger.warning('Falling back to PNG for image generation; could not support SVG') + # Generate a PNG + img = qr.make_image() + else: + _logger.debug('Generating an SVG') + # Preferred; generate an SVG. + factory = qrcode.image.svg.SvgPathFillImage + img = qrcode.make(data, image_factory = factory) + img.save(buf) + buf.seek(0, 0) + return(buf, image) diff --git a/vaultpass/__init__.py b/vaultpass/__init__.py index fa0486f..d1f3e5a 100644 --- a/vaultpass/__init__.py +++ b/vaultpass/__init__.py @@ -1,8 +1,13 @@ import logging +import tempfile import os +import subprocess ## from . import logger _logger = logging.getLogger('VaultPass') +## +import hvac.exceptions +## from . import args from . import auth from . import clipboard @@ -11,6 +16,7 @@ from . import constants from . import gpg_handler from . import mounts from . import pass_import +from . import QR class VaultPass(object): @@ -19,7 +25,8 @@ class VaultPass(object): uri = None mount = None - def __init__(self, cfg = '~/.config/vaultpass.xml'): + def __init__(self, mount, cfg = '~/.config/vaultpass.xml'): + self.mname = mount self.cfg = config.getConfig(cfg) self._getURI() self.getClient() @@ -44,9 +51,48 @@ class VaultPass(object): raise RuntimeError('Unable to unseal') return(None) + def _getHandler(self, mount = None, func = 'read', *args, **kwargs): + if func not in ('read', 'write', 'list'): + _logger.error('Invalid func') + _logger.debug('Invalid func; must be one of: read, write, list, update') + raise ValueError('Invalid func') + if not mount: + mount = self.mname + mtype = self.mount.getMountType(mount) + handler = None + if mtype == 'cubbyhole': + if func == 'read': + handler = self.mount.cubbyhandler.read_secret + elif func == 'write': + handler = self.mount.cubbyhandler.write_secret + elif func == 'list': + handler = self.mount.cubbyhandler.list_secrets + elif mtype == 'kv1': + if func == 'read': + handler = self.client.secrets.kv.v1.read_secret + elif func == 'write': + handler = self.client.secrets.kv.v1.create_or_update_secret + elif func == 'list': + handler = self.client.secrets.kv.v1.list_secrets + elif mtype == 'kv2': + if func == 'read': + handler = self.client.secrets.kv.v2.read_secret_version + elif func == 'write': + handler = self.client.secrets.kv.v2.create_or_update_secret + elif func == 'list': + handler = self.client.secrets.kv.v2.list_secrets + if not handler: + _logger.error('Could not get handler') + _logger.debug('Could not get handler for mount {0}'.format(mount)) + raise RuntimeError('Could not get handler') + return(handler) + def _getMount(self): mounts_xml = self.cfg.xml.find('.//mounts') self.mount = mounts.MountHandler(self.client, mounts_xml = mounts_xml) + if self.mname: + # Check that the mount exists + self.mount.getMountType(self.mname) return(None) def _getURI(self): @@ -65,6 +111,14 @@ class VaultPass(object): _logger.debug('Set URI to {0}'.format(self.uri)) return(None) + def _pathExists(self, path, mount = None, *args, **kwargs): + if not mount: + mount = self.mname + exists = False + if self.mount.getPath(path, mount): + exists = True + return(exists) + def convert(self, mount, force = False, @@ -74,7 +128,15 @@ class VaultPass(object): pass # TODO def copySecret(self, oldpath, newpath, mount, newmount, force = False, remove_old = False, *args, **kwargs): - pass # TODO + mtype = self.mount.getMountType(mount) + oldexists = self._pathExists(oldpath, mount = mount) + if not oldexists: + _logger.error('oldpath does not exist') + _logger.debug('The oldpath {0} does not exist'.format(oldpath)) + raise ValueError('oldpath does not exist') + data = self.getSecret(oldpath, mount) + # TODO: left off here + newexists = self._pathExists(newpath, mount = mount) if remove_old: self.deleteSecret(oldpath, mount, force = force) @@ -152,8 +214,63 @@ class VaultPass(object): raise RuntimeError('Not initialized') return(None) - def getSecret(self, path, mount, clip = None, qr = None, seconds = constants.CLIP_TIMEOUT, *args, **kwargs): - pass # TODO + def getSecret(self, + path, + mount, + kname = None, + clip = None, + qr = None, + seconds = constants.CLIP_TIMEOUT, + printme = False, + *args, **kwargs): + mtype = self.mount.getMountType(mount) + args = {'path': path, + 'mount_point': mount} + handler = self._getHandler(mount, func = 'read') + try: + data = handler(**args) + if mtype in ('cubbyhole', 'kv1'): + data = data['data'] + elif mtype == 'kv2': + data = data['data']['data'] + if kname: + data = data.get(kname) + except hvac.exceptions.InvalidPath as e: + lpath = path.split('/') + path = '/'.join(lpath[0:-1]) + args = {'path': path, + 'kname': lpath[-1], + 'mount': mount, + 'clip': clip, + 'qr': qr, + 'seconds': seconds, + 'printme': printme} + data = self.getSecret(**args) + if qr is not None: + data, has_x = QR.genQr(data, image = True) + if has_x: + fpath = tempfile.mkstemp(prefix = '.vaultpass.qr.', suffix = '.svg', dir = '/dev/shm')[1] + _logger.debug('Writing to {0} so it can be displayed'.format(fpath)) + with open(fpath, 'wb') as fh: + fh.write(data.read()) + if printme: + _logger.debug('Opening {0} in the default image viwer application'.format(fpath)) + cmd = subprocess.run(['xdg-open', fpath], stdout = subprocess.PIPE, stderr = subprocess.PIPE) + if cmd.returncode != 0: + _logger.error('xdg-open returned non-zero status code') + for x in ('stdin', 'stdout'): + o = getattr(cmd, x) + if not o: + continue + o = o.decode('utf-8').strip() + if o != '': + _logger.debug('{0}: {1}'.format(x.upper(), o)) + os.remove(fpath) + elif printme: + print(data.read()) + data.seek(0, 0) + # TODO: clip, etc. + return(data) def initVault(self, *args, **kwargs): pass # TODO diff --git a/vaultpass/args.py b/vaultpass/args.py index f0dcb71..0bf7958 100644 --- a/vaultpass/args.py +++ b/vaultpass/args.py @@ -408,7 +408,7 @@ def parseArgs(): metavar = 'PATH/TO/SECRET', help = ('The path to the secret or subdirectory')) # SHOW - # vp.getSecret() ? plus QR etc. printing + # vp.getSecret(printme = True) # TODO: does the default overwrite the None if not specified? show.add_argument('-c', '--clip', nargs = '?', @@ -424,11 +424,10 @@ def parseArgs(): nargs = '?', type = int, metavar = 'LINE_NUMBER', - default = constants.SHOW_CLIP_LINENUM, - help = ('If specified, do not print the secret at line number LINE_NUMBER (Default: {0}) but ' - 'instead generate a QR code of it (either graphically or in-terminal depending on ' - 'environment). ' - 'Use 0 for LINE_NUMBER for the entire secret').format(constants.SHOW_CLIP_LINENUM)) + default = None, + help = ('If specified, do not print the secret but instead generate a QR code of it (either ' + 'graphically or in-terminal depending on environment). ' + 'LINE_NUMBER has no effect and is kept for compatibility reasons')) show.add_argument('-s', '--seconds', dest = 'seconds', type = int, diff --git a/vaultpass/mounts.py b/vaultpass/mounts.py index dc0939a..37488c5 100644 --- a/vaultpass/mounts.py +++ b/vaultpass/mounts.py @@ -26,6 +26,10 @@ class CubbyHandler(object): def __init__(self, client): self.client = client + def create_or_update_secret(self, *args, **kwargs): + # Alias function + return(self.write_secret(*args, **kwargs)) + def list_secrets(self, path, mount_point = 'cubbyhole'): path = path.lstrip('/') uri = '/v1/{0}/{1}'.format(mount_point, path) @@ -34,11 +38,12 @@ class CubbyHandler(object): def read_secret(self, path, mount_point = 'cubbyhole'): path = path.lstrip('/') - uri = '/v1/{0}/{1}'.format(mount_point, path) + # uri = '/v1/{0}/{1}'.format(mount_point, path) + uri = '{0}/{1}'.format(mount_point, path) resp = self.client._adapter.get(url = uri) return(resp.json()) - def write_secret(self, path, secret, mount_point = 'cubbyhole'): + def write_secret(self, path, secret, mount_point = 'cubbyhole', *args, **kwargs): path = path.lstrip('/') args = {'path': '/'.join((mount_point, path))} for k, v in secret.items(): @@ -104,6 +109,14 @@ class MountHandler(object): raise ValueError('Mount not found in defined mounts') return(mtype) + def getPath(self, path, mount): + relpath = path.lstrip('/') + fullpath = '/'.join((mount, relpath)) + if not self.paths: + self.getSecretsTree() + obj = dpath.util.get(self.paths, fullpath, None) + return(obj) + def getSecret(self, path, mount, version = None): if not self.mounts: self.getSysMounts()