#!/usr/bin/env python3

# Thanks to https://github.com/alfg/murmur-rest/blob/master/app/__init__.py

"""Command-line administration of Murmur (Mumble server) users over ZeroC Ice.

The connection settings are read from an INI file (see ``parseArgs`` for the
default location) and the connection can optionally be routed through an SSH
tunnel.
"""

import argparse
from collections import defaultdict
import configparser
import datetime
import email.utils
import getpass
import hashlib
import os
import pprint
import re
import sys
import tempfile
import time

try:
    import Ice  # python-zeroc-ice in AUR
    import IcePy  # python-zeroc-ice in AUR
except ImportError:
    # Deferred on purpose: argument parsing and --help keep working without
    # the Ice bindings, and IceMgr.connect() raises a descriptive ImportError.
    Ice = None
    IcePy = None


class IceMgr(object):
    """Manage a Murmur server through its Ice "Meta" interface.

    Instantiating the class reads the configuration file named by
    ``args['cfgfile']`` and immediately connects (starting an SSH tunnel
    first if the configuration enables one). Call :meth:`close` when done.
    """

    # A SHA-1 fingerprint is exactly 40 hexadecimal digits.
    _SHA1_RE = re.compile(r'[0-9a-fA-F]{40}')
    # Deliberately simple; see the comment in add() for the reasoning.
    _EMAIL_RE = re.compile(r'[^@]+@[^@]+\.[^@]+')

    def __init__(self, args):
        """Store *args* (a dict of options), load the config and connect.

        ``interactive`` and ``verbose`` default to False when absent so the
        class can be used as a library without the argparse front end.
        """
        self.args = args
        self.args.setdefault('interactive', False)
        self.args.setdefault('verbose', False)
        self.ssh = None  # SSHTunnelForwarder, set by sshTunnel() when used.
        self.murmur = None  # The generated Murmur module, set by connect().
        self.getCfg()
        self.connect()

    def getCfg(self):
        """Parse the INI file at ``self.args['cfgfile']`` into ``self.cfg``.

        ``self.cfg`` becomes a ``defaultdict(dict)`` mapping section name to
        a plain dict of option -> (interpolated) string value.

        Raises:
            FileNotFoundError: if the configuration file does not exist.
        """
        _cfg = os.path.abspath(os.path.expanduser(self.args['cfgfile']))
        if not os.path.isfile(_cfg):
            raise FileNotFoundError('{0} does not exist!'.format(_cfg))
        _parser = configparser.ConfigParser(
            interpolation=configparser.ExtendedInterpolation())
        _parser.read(_cfg)
        self.cfg = defaultdict(dict)
        for section in _parser.sections():
            self.cfg[section] = {
                option: _parser.get(section, option)
                for option in _parser.options(section)
            }

    def sshTunnel(self):
        """Start an SSH tunnel to the Ice endpoint and repoint the config.

        Reads the ``TUNNEL`` section (``host``, ``user``, ``port``, ``auth``,
        ``passphrase``, ``key``, ``key_passphrase``). After the tunnel is up,
        ``self.cfg['ICE']['host']``/``['port']`` are overwritten with the
        local end of the tunnel so connect() talks through it.

        Raises:
            ImportError: if the sshtunnel module is not installed.
        """
        try:
            from sshtunnel import SSHTunnelForwarder
        except ImportError as err:
            raise ImportError('You must install the sshtunnel Python module '
                              'to use SSH tunneling!') from err
        _sshcfg = self.cfg['TUNNEL']
        # Do some munging to make this easier to deal with.
        _host = _sshcfg['host']
        _user = _sshcfg.get('user', '') or getpass.getuser()
        _port = int(_sshcfg.get('port', '') or 22)
        if _sshcfg.get('auth', '').lower() == 'passphrase':
            # Password authentication: no key file is involved at all.
            _password = _sshcfg.get('passphrase', '')
            if not _password:
                _password = getpass.getpass(
                    ('What passphrase should ' +
                     'we use for {0}@{1}:{2}? (Will not ' +
                     'echo back.)\nPassphrase: ').format(_user, _host, _port))
            _key = None
            _keypass = None
        else:
            _password = None
            _key = os.path.abspath(os.path.expanduser(
                _sshcfg.get('key', '') or '~/.ssh/id_rsa'))
            # We need to get the passphrase for the key, if it's set.
            if _sshcfg.get('key_passphrase', '').lower() == 'true':
                _keypass = getpass.getpass(
                    ('What is the passphrase for {0}? ' +
                     '(Will not be echoed back.)\nPassphrase: ').format(_key)
                ).encode('utf-8')
            else:
                _keypass = None
        # To print debug info, import create_logger from sshtunnel and add
        # "logger=create_logger(loglevel=1)" to the params.
        self.ssh = SSHTunnelForwarder(
            _host,
            ssh_pkey=_key,
            ssh_private_key_password=_keypass,
            ssh_password=_password,
            ssh_username=_user,
            ssh_port=_port,
            local_bind_address=('127.0.0.1', ),
            remote_bind_address=(self.cfg['ICE']['host'],
                                 int(self.cfg['ICE']['port'])),
            set_keepalive=3.0)
        self.ssh.start()
        if self.args.get('verbose'):
            print('Configured tunneling for {0}:{1}({2}:{3}) => {4}:{5}'.format(
                _host,
                _port,
                self.cfg['ICE']['host'],
                self.cfg['ICE']['port'],
                self.ssh.local_bind_address[0],
                self.ssh.local_bind_address[1]))
        self.cfg['ICE']['port'] = int(self.ssh.local_bind_port)
        self.cfg['ICE']['host'] = self.ssh.local_bind_address[0]
        # Give the forwarder a moment before the first Ice request.
        time.sleep(3)

    def connect(self):
        """Initialise Ice, load the Murmur slice and build the Meta proxies.

        Populates ``self.ice`` (the communicator), ``self.murmur`` (the
        generated ``Murmur`` module) and ``self.conn`` with ``'read'`` and
        ``'write'`` entries; an entry is ``False`` when the matching secret
        in the ``AUTH`` section is empty.

        Raises:
            ImportError: if the ZeroC Ice Python bindings are not installed.
        """
        if Ice is None or IcePy is None:
            raise ImportError('You must install the ZeroC Ice Python module '
                              '(python-zeroc-ice in AUR) to connect to Murmur!')
        if self.cfg['TUNNEL'].get('enable', '').lower() == 'true':
            self.sshTunnel()
        _props = {'ImplicitContext': 'Shared',
                  'Default.EncodingVersion': '1.0',
                  'MessageSizeMax': str(self.cfg['ICE']['max_size'])}
        _prop_data = Ice.createProperties()
        for k, v in _props.items():
            _prop_data.setProperty('Ice.{0}'.format(k), v)
        _conn = Ice.InitializationData()
        _conn.properties = _prop_data
        self.ice = Ice.initialize(_conn)
        _host = 'Meta:{0} -h {1} -p {2} -t 1000'.format(self.cfg['ICE']['proto'],
                                                        self.cfg['ICE']['host'],
                                                        self.cfg['ICE']['port'])
        _ctx = self.ice.stringToProxy(_host)
        # I owe a lot of neat tricks here to:
        # https://raw.githubusercontent.com/mumble-voip/mumble-scripts/master/Helpers/mice.py
        # Namely, the load-slice-from-server stuff especially
        _slicedir = Ice.getSliceDir()
        if not _slicedir:
            _slicedir = ["-I/usr/share/Ice/slice", "-I/usr/share/slice"]
        else:
            _slicedir = ['-I' + _slicedir]
        _slicecfg = self.cfg['ICE'].get('slice', '')
        if _slicecfg == '':
            # No local .ice file configured: ask the server for its slice.
            if IcePy.intVersion() < 30500:
                # Old 3.4 signature with 9 parameters
                _op = IcePy.Operation('getSlice',
                                      Ice.OperationMode.Idempotent,
                                      Ice.OperationMode.Idempotent,
                                      True,
                                      (),
                                      (),
                                      (),
                                      IcePy._t_string,
                                      ())
            else:
                # New 3.5 signature with 10 parameters.
                _op = IcePy.Operation('getSlice',
                                      Ice.OperationMode.Idempotent,
                                      Ice.OperationMode.Idempotent,
                                      True,
                                      None,
                                      (),
                                      (),
                                      (),
                                      ((), IcePy._t_string, False, 0),
                                      ())
            _slice = _op.invoke(_ctx, ((), None))
            (_filedesc, _filepath) = tempfile.mkstemp(suffix='.ice')
            try:
                with os.fdopen(_filedesc, 'w') as _slicefile:
                    _slicefile.write(_slice)
                Ice.loadSlice('', _slicedir + [_filepath])
            finally:
                # Never leave the temporary slice behind, even on failure.
                os.remove(_filepath)
        else:
            # A .ice file was explicitly defined in the cfg
            _slicedir.append(os.path.abspath(os.path.expanduser(_slicecfg)))
            Ice.loadSlice('', _slicedir)
        # Only importable once the slice has been loaded; kept on the
        # instance so the other methods can reach the generated types.
        import Murmur
        self.murmur = Murmur
        self.conn = {}
        # NOTE(review): the implicit context is configured as 'Shared', so
        # the secret set last is presumably the one sent with every later
        # call on either proxy - confirm against a server with two secrets.
        for _mode in ('read', 'write'):
            _secret = self.cfg['AUTH'].get(_mode, '')
            if _secret != '':
                self.ice.getImplicitContext().put("secret", _secret)
                self.conn[_mode] = Murmur.MetaPrx.checkedCast(_ctx)
            else:
                self.conn[_mode] = False

    def dictify(self, obj):
        """Recursively convert *obj* into plain Python data for printing.

        Scalars (and None) are returned unchanged, lists/tuples become lists,
        dicts get string keys, and any other object is converted through its
        ``__dict__``. Objects without a ``__dict__`` fall back to ``str()``.
        """
        # Thanks to:
        # https://github.com/alfg/murmur-rest/blob/master/app/utils.py
        # (Modified to be python 3 compatible)
        if obj is None or isinstance(obj, (bool, int, float, str, bytes)):
            return obj
        if isinstance(obj, (list, tuple)):
            return [self.dictify(i) for i in obj]
        if isinstance(obj, dict):
            return {str(k): self.dictify(v) for k, v in obj.items()}
        _attrs = getattr(obj, '__dict__', None)
        if _attrs is None:
            return str(obj)
        return self.dictify(_attrs)

    @classmethod
    def _isCertHash(cls, value):
        """Return True if *value* is a 40-hex-digit SHA-1 fingerprint string."""
        return isinstance(value, str) and cls._SHA1_RE.fullmatch(value) is not None

    @staticmethod
    def _parseGroups(spec):
        """Parse ``CHANID:GROUP[,CHANID:GROUP...]`` into {chanid: [groups]}.

        Entries without a ``CHANID:`` prefix are assigned to the root
        channel (0). Empty entries are skipped.

        Raises:
            ValueError: on a non-numeric channel ID or an empty group name.
        """
        _groups = {}
        for _item in spec.split(','):
            _item = _item.strip()
            if not _item:
                continue
            _chan, _sep, _name = _item.partition(':')
            if not _sep:
                _chan, _name = '0', _item
            _chan = _chan.strip() or '0'
            _name = _name.strip()
            if not _name:
                raise ValueError('{0} does not name a group.'.format(_item))
            try:
                _chanid = int(_chan)
            except ValueError:
                raise ValueError(
                    '{0} is not a numerical channel ID.'.format(_chan)) from None
            _groups.setdefault(_chanid, []).append(_name)
        return _groups

    def add(self):
        """Register the user described by ``self.args`` on the server.

        Reads ``UserName``, ``UserHash``, ``UserPassword`` ('stdin' or
        'prompt'), ``UserEmail``, ``UserComment``, ``groups`` and ``server``
        from ``self.args``. All input is validated before the user is
        registered.

        Raises:
            PermissionError: if no write secret is configured.
            RuntimeError: if neither a certificate hash nor a password
                source was given.
            ValueError: on an invalid certificate hash, email address or
                group specification.
        """
        if not self.conn['write']:
            raise PermissionError('You do not have write access configured!')
        _certhash = self.args.get('UserHash')
        _pwmode = self.args.get('UserPassword')
        if not (_certhash or _pwmode):
            raise RuntimeError(('You must specify either a certificate hash ' +
                                'or a method for getting the password.'))
        _info = self.murmur.UserInfo
        _userinfo = {_info.UserName: self.args['UserName']}
        if _certhash:  # it's a certificate fingerprint hash
            # A SHA1 is 160 bits, i.e. exactly 40 hex digits. Check the text
            # itself: the integer's bit length varies with leading zero bits.
            if not self._isCertHash(_certhash):
                raise ValueError('{0} is not a valid certificate fingerprint '
                                 'hash.'.format(_certhash))
            _userinfo[_info.UserHash] = _certhash
        # Validate the group list before anything is written to the server.
        _groupspec = self.args.get('groups')
        _groups = self._parseGroups(_groupspec) if _groupspec else {}
        # Validate the email address
        _rawemail = self.args.get('UserEmail')
        if _rawemail:
            _email = email.utils.parseaddr(_rawemail)
            # This is a stupidly simplified regex. For reasons why, see:
            # https://stackoverflow.com/questions/8022530/python-check-for-valid-email-address
            # http://www.regular-expressions.info/email.html
            # TL;DR: email is really hard to regex against, even (especially)
            # if you follow RFC5322, and I don't want to have to rely on
            # https://pypi.python.org/pypi/validate_email
            if not self._EMAIL_RE.fullmatch(_email[1]):
                raise ValueError('{0} is not a valid email address!'.format(_rawemail))
            _userinfo[_info.UserEmail] = _email[1]
        if _pwmode:  # it's a password
            if _pwmode == 'stdin':
                _pass_in = sys.stdin.read()
            else:
                while True:
                    _pass_in = getpass.getpass(
                        'What password should {0} have (will not echo back)? '.format(
                            self.args['UserName']))
                    if _pass_in:
                        break
                    print('Invalid password. Please re-enter: ')
            # Ice's Python 3 mapping represents a Slice string as str, so the
            # password is passed as text rather than encoded bytes.
            _userinfo[_info.UserPassword] = _pass_in.replace('\n', '')
        if self.args.get('UserComment'):
            _userinfo[_info.UserComment] = self.args['UserComment']
        # Set a dummy LastActive
        _userinfo[_info.LastActive] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # Now we Do the Thing(TM)
        _server = self.conn['write'].getServer(self.args['server'])
        _regid = _server.registerUser(_userinfo)
        # Groups are *actually stored in the ACLs* of each channel. See
        # https://sourceforge.net/p/mumble/discussion/492607/thread/579de8f9/
        # TODO: fetch each channel's ACL/group list, add _regid to the
        # requested groups (creating missing ones) and write it back.
        if _groups:
            print('Group assignment is not implemented yet; {0} was NOT added '
                  'to: {1}'.format(
                      self.args['UserName'],
                      ', '.join('{0}:{1}'.format(_chan, _name)
                                for _chan, _names in sorted(_groups.items())
                                for _name in _names)),
                  file=sys.stderr)
        if sys.stdout.isatty():
            print('Added user {0} (UID: {1})'.format(self.args['UserName'], _regid))
        if self.args.get('verbose'):
            _u = _server.getRegistration(_regid)
            pprint.pprint(self.dictify(_u))

    def rm(self):
        """Remove a user. Not implemented yet; currently a no-op."""
        pass

    def lsUsers(self):
        """List users. Not implemented yet; currently a no-op."""
        pass

    def lsGroups(self):
        """List groups. Not implemented yet; currently a no-op."""
        pass

    def edit(self):
        """Edit a user. Not implemented yet; currently a no-op."""
        pass

    def status(self):
        """Report server status. Not implemented yet; currently a no-op."""
        # https://github.com/alfg/murmur-rest/blob/master/app/api.py#L71
        pass

    def close(self):
        """Destroy the Ice communicator and stop the SSH tunnel, if any."""
        self.ice.destroy()
        # Only stop a tunnel that was actually started by sshTunnel().
        _ssh = getattr(self, 'ssh', None)
        if _ssh is not None:
            _ssh.stop()
            self.ssh = None


def parseArgs():
    """Build and return the argparse parser for the command-line interface.

    The parser has the sub-commands ``add``, ``rm``, ``ls`` and ``edit``; the
    chosen one is stored under the ``operation`` key.
    """
    _cfgfile = os.path.abspath(os.path.join(os.path.expanduser('~'),
                                            '.config',
                                            'optools',
                                            'mumbleadmin.ini'))
    commonargs = argparse.ArgumentParser(add_help=False)
    reqcommon = commonargs.add_argument_group('REQUIRED common arguments')
    optcommon = argparse.ArgumentParser(add_help=False)
    reqcommon.add_argument('-u', '--user',
                           type=str,
                           dest='UserName',
                           required=True,
                           help='The username to perform the action for.')
    reqcommon.add_argument('-s', '--server',
                           type=int,
                           dest='server',
                           default=1,
                           help=('The server ID. ' +
                                 'Defaults to \033[1m{0}\033[0m').format(1))
    optcommon.add_argument('-f', '--config',
                           type=str,
                           dest='cfgfile',
                           metavar='/path/to/mumbleadmin.ini',
                           default=_cfgfile,
                           help=('The path to the configuration file ' +
                                 '("mumbleadmin.ini"). Default: \033[1m{0}\033[0m').format(_cfgfile))
    optcommon.add_argument('-v', '--verbose',
                           dest='verbose',
                           action='store_true',
                           help=('If specified, print more information than normal'))
    args = argparse.ArgumentParser(
        epilog='This program has context-sensitive help (e.g. try "... add --help")')
    subparsers = args.add_subparsers(help='Operation to perform', dest='operation')
    addargs = subparsers.add_parser('add',
                                    parents=[commonargs, optcommon],
                                    help='Add a user to the Murmur database')
    delargs = subparsers.add_parser('rm',
                                    parents=[commonargs, optcommon],
                                    help='Remove a user from the Murmur database')
    listargs = subparsers.add_parser('ls',
                                     parents=[optcommon],
                                     help='List users in the Murmur database')
    # Registered so "edit" is a valid operation; it has no options of its own.
    subparsers.add_parser('edit',
                          parents=[commonargs, optcommon],
                          help='Edit a user in the Murmur database')
    # Operation-specific optional arguments
    addargs.add_argument('-c', '--comment',
                         type=str,
                         metavar='"This comment becomes the user\'s profile."',
                         dest='UserComment',
                         default=None,
                         help='The comment for the new user')
    addargs.add_argument('-e', '--email',
                         type=str,
                         metavar='email@domain.tld',
                         dest='UserEmail',
                         default=None,
                         help='The email address for the new user')
    addargs.add_argument('-C', '--certhash',
                         type=str,
                         metavar='CERTIFICATE_FINGERPRINT_HASH',
                         default=None,
                         dest='UserHash',
                         help=('The certificate fingerprint hash. See gencerthash.py. ' +
                               'This is the preferred way. ' +
                               'If you do not specify this, you must specify -p/--password'))
    addargs.add_argument('-p', '--password',
                         type=str,
                         dest='UserPassword',
                         choices=['stdin', 'prompt'],
                         default=None,
                         help=('If not specified, you must specify -C/--certhash. Otherwise, either ' +
                               '\'stdin\' (the password is being piped into this program) or \'prompt\' ' +
                               '(a password will be asked for in a non-echoing prompt). "prompt" is much more secure and recommended.'))
    addargs.add_argument('-g', '--groups',
                         type=str,
                         metavar='CHANID:GROUP1(,CHANID:GROUP2,CHANID:GROUP3...)',
                         default=None,
                         help=('A comma-separated list of groups the user should be added to. If a group ' +
                               'doesn\'t exist, it will be created. CHANID is a ' +
                               'numerical ID of the channel to assign the group to. ' +
                               '(You can get channel IDs by doing "... ls -gv".) ' +
                               'If no CHANID is provided, the root channel (0) will be used.'))
    # Listing should only take the DB as the "common" arg
    listargs.add_argument('-g', '--groups',
                          action='store_true',
                          dest='groups',
                          help='If specified, list groups (and their members), not users')
    listargs.add_argument('-s', '--server',
                          type=str,
                          dest='server',
                          default=None,
                          help='The server ID. Defaults to all servers. Specify one by the numerical ID.')
    # Deleting args
    delargs.add_argument('-n', '--no-prune',
                         dest='noprune',
                         action='store_true',
                         help=('If specified, do NOT remove the ACLs and user info for the user as well (profile, ' +
                               'certificate fingerprint, etc.)'))
    delargs.add_argument('-P', '--prune-groups',
                         dest='prunegrps',
                         action='store_true',
                         help=('If specified, remove any groups the user was in ' +
                               'that are now empty (i.e. the user was the only member)'))
    return args


def main():
    """Parse the command line, run the requested operation and clean up."""
    args = vars(parseArgs().parse_args())
    if not args['operation']:
        sys.exit('You must specify an operation to perform. Try running with -h/--help.')
    args['interactive'] = True
    mgmt = IceMgr(args)
    try:
        if args['operation'] == 'add':
            mgmt.add()
        elif args['operation'] == 'rm':
            mgmt.rm()
        elif args['operation'] == 'ls':
            if not args['groups']:
                mgmt.lsUsers()
            else:
                mgmt.lsGroups()
        elif args['operation'] == 'edit':
            mgmt.edit()
        # Anything else is a no-op because something went SUPER wrong.
    finally:
        # Always release the communicator (and tunnel), even on errors.
        mgmt.close()


if __name__ == '__main__':
    main()