adding some testing tools

This commit is contained in:
brent s. 2020-04-01 11:37:21 -04:00
parent 236af1ea37
commit 616374754b
Signed by: bts
GPG Key ID: 8C004C2F93481F6B
6 changed files with 188 additions and 1 deletions

6
.gitignore vendored
View File

@ -18,6 +18,12 @@ pkg/
*.jar
*.deb
test*
!testing/
!testing/*
testing/data
testing/test.pid
testing/testserver.json
testing/vault.log
__pycache__/
logs/
docs/README.html

50
testing/serverconf.py Executable file
View File

@ -0,0 +1,50 @@
#!/usr/bin/env python3

import json
import os


conf_file = './testserver.json'
log_file = './vault.log'

default_conf = {'listener': [
{'tcp': '127.0.0.1:8200',
'tls_disable': True}
],
'storage': {'file': './data'},
'log_level': 'Debug', # highest is 'Trace'
'pid_file': './test.pid',
'raw_storage_endpoint': True,
'log_format': 'json', # or String
'ui': True}


conf_file = os.path.abspath(os.path.expanduser(conf_file))
log_file = os.path.abspath(os.path.expanduser(log_file))


def genConf(confdict = None):
if not confdict:
confdict = default_conf.copy()
storage = confdict.get('storage')
if storage:
if 'file' in storage.keys():
storage['file'] = os.path.abspath(os.path.expanduser(storage['file']))
confdict['storage'] = storage
if 'pid_file' in confdict:
confdict['pid_file'] = os.path.abspath(os.path.expanduser(confdict['pid_file']))
conf = os.path.abspath(os.path.expanduser(conf_file))
with open(conf, 'w') as fh:
fh.write(json.dumps(confdict, indent = 4))
return(None)


def parseHCL(hclcontent):
# We only load this on-demand.
import hcl
conf = hcl.loads(hclcontent)
return(conf)


if __name__ == '__main__':
genConf()

110
testing/spawn.py Executable file
View File

@ -0,0 +1,110 @@
#!/usr/bin/env python3

import json
import os
import re
import socket
import subprocess
##
import hvac
import psutil
##
from . import serverconf
from . import vauptpassconf


_url_re = re.compile(r'^(?P<proto>https?)://(?P<addr>[^:/]+)(:(?P<port>[0-9]+)?)?(?P<path>/.*)?$')


class VaultSpawner(object):
client = hvac.Client()
binary_name = 'vault'

def __init__(self, conf, genconf = True, is_dev = False):
self.conf = conf
self.genconf = genconf
self.pid = None
self.process = None
self._parseConf()

def _getProcess(self):
processes = [p for p in psutil.process_iter() if p.name() == self.binary_name]
if not processes:
self.process = None
self.pid = None
return(None)
r = _url_re.search(self.client.url)
if not r:
raise ValueError('Invalid server URL')
try:
port = int(r.groupdict().get('port', 8200))
except ValueError:
port = 8200
ip = socket.gethostbyname(r.groupdict('addr'))
pids = []
# First we try the easy way, but requires root privs even if you ran vault as your current user.
has_priv = True
for p in processes:
pids.append(p)
try:
p_port = p.connections()
except (psutil.AccessDenied, psutil.AccessDenied):
has_priv = False
break
if not has_priv:
conns = [c for c in psutil.net_connections() if c.laddr.port == port and c.laddr.ip == ip]
if not conns:
self.process = None
self.pid = None
return(None)
for c in conns:
if not c.pid:
continue
if c.pid in pids:
self.pid = c.pid
self.process = psutil.Process(pid = self.pid)
if not all((self.process, self.pid)):
if len(conns) == 1 and len(pids) == 1:
self.process = pids[0]
self.pid = self.process.pid
else:
raise RuntimeError('Could not reliably determine which Vault instance to manage')



def _parseConf(self):
is_hcl = False
rawconf = None
if not self.conf:
if os.path.isfile(serverconf.conf_file):
self.conf = serverconf.conf_file
else:
# Use the default.
self.genconf = True
self.conf = serverconf.default_conf
elif not isinstance(self.conf, dict):
# Assume it's a file.
self.conf = os.path.abspath(os.path.expanduser(self.conf))
with open(self.conf, 'r') as fh:
rawconf = fh.read()
try:
self.conf = json.loads(rawconf)
except json.decoder.JSONDecodeError:
is_hcl = True # It's probably HCL.
if is_hcl:
self.conf = serverconf.parseHCL(rawconf)
if self.genconf:
serverconf.genConf(confdict = self.conf)
listener = self.conf['listener'][0]['tcp']
addr = listener['address']
is_tls = listener.get('tls_disable', False)
url = '{0}://{1}'.format(('https' if is_tls else 'http'), addr)
if not _url_re.search(url):
raise ValueError('Invalid server address')
self.client.url = url
return(None)

def start(self):
if any((self.pid, self.process)):
# Already started.
return(None)

19
testing/test.config.xml Normal file
View File

@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8" ?>
<vaultpass xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="https://git.square-r00t.net/VaultPass/"
xsi:schemaLocation="https://git.square-r00t.net/VaultPass/ http://schema.xml.r00t2.io/projects/vaultpass.xsd">
<server>
<uri>http://localhost:8200/</uri>
<unseal>WU9VUiBVTlNFQUwgU0hBUkQgSEVSRQo=</unseal>
</server>
<auth>
<token/>
</auth>
<!--
<mounts>
<mount type="kv1">secret_legacy</mount>
<mount type="kv2">secret</mount>
<mount type="cubbyhole">cubbyhole</mount>
</mounts>
-->
</vaultpass>

1
testing/vauptpassconf.py Symbolic link
View File

@ -0,0 +1 @@
../vaultpass/config.py

View File

@ -108,9 +108,10 @@ class MountHandler(object):
if mount not in self.paths.keys():
self.paths[mount] = {}
try:
_logger.debug('Fetching path {0} on mount {1}...'.format(path, mount))
paths = handler.list_secrets(**args)
except hvac.exceptions.InvalidPath:
# It's a secret name.
# It's a secret name or doesn't exist.
_logger.debug('Path {0} on mount {1} is a secret, not a subdir.'.format(path, mount))
dpath.util.new(self.paths, fullpath, self.getSecretNames(path, mount, version = version))
continue