i am ALMOST done. just need to finish timestamp comparisons.

i can probably do it with a sorted list and comparing the last values in a remote vs. local list.
This commit is contained in:
brent s. 2020-06-16 02:34:05 -04:00
parent eed480c590
commit 66d1ad7af5
Signed by: bts
GPG Key ID: 8C004C2F93481F6B
11 changed files with 541 additions and 20 deletions

View File

@ -2,10 +2,11 @@ from . import logger
##
import logging
##
_logger = logging.getLogger()
from . import config
from . import constants
from . import fetcher
from . import sync


_logger = logging.getLogger()

Sync = sync.Sync

View File

@ -1,2 +1,12 @@
# Default port for each supported sync protocol, keyed by lowercase protocol name.
PROTO_DEF_PORTS = {'ftp': 21,
                   'rsync': 873}
# Default arguments handed to rsync when a distro does not configure its own.
# Each entry is a complete command-line argument (including the leading dashes) because the list is
# spliced verbatim into the rsync command line; a bare word would be treated by rsync as a path.
RSYNC_DEF_ARGS = ['--recursive',
                  '--times',
                  '--links',
                  '--hard-links',
                  '--delete-after',
                  '--delay-updates',
                  '--copy-links',
                  '--safe-links',
                  # rsync has no "delete-extended" option; "delete-excluded" is the documented flag.
                  '--delete-excluded',
                  '--exclude=.*']

View File

@ -0,0 +1,5 @@
from . import ftp
from . import rsync

FTP = ftp.FTP
RSync = rsync.RSync

View File

@ -0,0 +1,36 @@
import datetime
import logging
import os


_logger = logging.getLogger()


class BaseFetcher(object):
    """
    Shared state and timestamp checking for protocol-specific fetchers.

    Subclasses set ``type`` (used as the URL scheme) and override fetch_content().
    """
    type = None

    def __init__(self, domain, port, path, dest, owner = None, filechecks = None, *args, **kwargs):
        """
        domain: remote host name or address.
        port: remote port; coerced to int.
        path: path on the remote host.
        dest: local destination directory; created if missing.
        owner: optional mapping unpacked into os.chown() as keyword arguments.
        filechecks: optional mapping; its 'remote' entry is read by check().
        """
        self.domain = domain
        self.port = int(port)
        self.path = path
        self.dest = os.path.abspath(os.path.expanduser(dest))
        self.url = '{0}://{1}:{2}/{3}'.format(self.type, self.domain, self.port, self.path.lstrip('/'))
        self.owner = owner
        self.filechecks = filechecks
        self.timestamps = {}
        os.makedirs(self.dest, mode = 0o0755, exist_ok = True)
        if self.owner:
            os.chown(self.dest, **self.owner)

    @staticmethod
    def _parse_timestamp(raw, fmt):
        """
        Convert a raw timestamp string into a naive datetime.

        The formats '%s' and '%s.%f' are treated as seconds (optionally with a fractional part) since
        1970-01-01 and are parsed by hand, because datetime.strptime() does not accept a '%s' directive.
        Any other format is handed to datetime.strptime() unchanged.
        """
        if fmt in ('%s', '%s.%f'):
            secs, _, frac = raw.partition('.')
            # Pad/truncate the fraction to exactly six digits so it can be read as microseconds.
            micros = int((frac + '000000')[:6])
            return(datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds = int(secs),
                                                                       microseconds = micros))
        return(datetime.datetime.strptime(raw, fmt))

    def check(self):
        """
        Fetch every configured remote timestamp file and store the parsed value in self.timestamps.

        Entries whose value is falsy are skipped. Does nothing if no remote checks are configured.
        """
        remote_checks = (self.filechecks or {}).get('remote') or {}
        # .items() is required; iterating the dict itself would yield only the keys.
        for k, v in remote_checks.items():
            if not v:
                continue
            tstmp_raw = self.fetch_content(v.path).decode('utf-8').strip()
            self.timestamps[k] = self._parse_timestamp(tstmp_raw, v.fmt)
        _logger.debug('Updated timestamps: {0}'.format(self.timestamps))
        return(None)

    def fetch_content(self, path):
        """Return the raw bytes of a remote file. Placeholder; subclasses override this."""
        return(b'')

136
repomirror/fetcher/ftp.py Normal file
View File

@ -0,0 +1,136 @@
import ftplib
import logging
import io
import os
import pathlib
##
from . import _base


_logger = logging.getLogger()


class FTP(_base.BaseFetcher):
    """Fetcher that mirrors a remote directory tree over anonymous FTP."""
    type = 'ftp'

    def __init__(self, domain, port, path, dest, owner = None, *args, **kwargs):
        """
        Arguments are forwarded to the base fetcher. No network connection is opened here; the
        connection is established lazily by _connect().
        """
        super().__init__(domain, port, path, dest, owner = owner, *args, **kwargs)
        _logger.debug('Instantiated FTP fetcher')
        # Passing a host to ftplib.FTP() connects immediately on the default port, which would
        # ignore self.port. Create an unconnected handler and connect explicitly in _connect().
        self.handler = ftplib.FTP()
        _logger.debug('Configured handler for {0}'.format(self.domain))
        self.handler.port = self.port
        _logger.debug('Set port for {0} to {1}'.format(self.domain, self.port))
        self.connected = None

    def _connect(self):
        """Open the control connection and log in anonymously, unless already connected."""
        if not self.connected:
            # A fresh connect() is needed every time because quit() closes the socket.
            self.handler.connect(self.domain, self.port)
            self.handler.login()
            _logger.debug('Connected to {0}:{1} as Anonymous'.format(self.domain, self.port))
            self.connected = True
        return(None)

    def _disconnect(self):
        """Close the control connection if one is open."""
        if self.connected:
            self.handler.quit()
            _logger.debug('Disconnected from {0}:{1} as Anonymous'.format(self.domain, self.port))
            self.connected = False
        return(None)

    def _pathtuple(self, path):
        """
        Map a remote path to local paths.

        Returns (relpath, destdir, destpath): the remote path without leading slashes, the local
        directory that will contain the item, and the local path of the item itself.
        """
        relpath = path.lstrip('/')
        # Strip leading slashes from the base as well; relative_to() raises ValueError when one side
        # is absolute and the other is not.
        relpath_stripped = str(pathlib.Path(relpath).relative_to(self.path.lstrip('/')))
        destdir = os.path.join(self.dest, os.path.dirname(relpath_stripped))
        destpath = os.path.join(self.dest, relpath_stripped)
        return((relpath, destdir, destpath))

    def _prepdir(self, destdir):
        """Create destdir (if needed) and apply the configured ownership."""
        os.makedirs(destdir, mode = 0o0755, exist_ok = True)
        _logger.debug('Created directory {0} (if it did not exist)'.format(destdir))
        if self.owner:
            os.chown(destdir, **self.owner)
            _logger.debug('Chowned {0} to {uid}:{gid}'.format(destdir, **self.owner))
        return(None)

    def fetch(self):
        """Recursively download everything under self.path into self.dest."""
        def getter(path, relroot):
            # relroot is the remote directory to list; path is only used for logging.
            _logger.debug('getter invoked with path={0}, relroot={1}'.format(path, relroot))
            if relroot == path:
                _logger.debug('relroot and path are the same')
            else:
                _logger.debug('relroot and path are not the same')
            parentdir = relroot
            _logger.debug('parentdir set to {0}'.format(parentdir))
            _logger.debug('Executing LS on {0}'.format(parentdir))
            for itemspec in self.handler.mlsd(parentdir):
                relpath, spec = itemspec
                if relpath in ('.', '..'):
                    continue
                _logger.debug(('Parsing path ('
                               'relroot: {0}, '
                               'path: {1}, '
                               'relpath: {2}) with spec {3}').format(relroot, path, relpath, itemspec))
                ptype = spec['type']
                newpath = os.path.join(parentdir, relpath)
                itemspec = (newpath, spec)
                if ptype.startswith('OS.unix=slink'):
                    _logger.debug('Fetching symlink {0}'.format(newpath))
                    self.fetch_symlink(itemspec)
                elif ptype == 'dir':
                    _logger.debug('Fetching dir {0}'.format(newpath))
                    self.fetch_dir(itemspec)
                    _logger.debug('Recursing getter with relpath={0}, parentdir={1}'.format(relpath, parentdir))
                    getter(relpath, newpath)
                elif ptype == 'file':
                    _logger.debug('Fetching file {0}'.format(newpath))
                    self.fetch_file(itemspec)
            return(None)
        self._connect()
        try:
            getter(self.path, self.path)
        finally:
            # Always release the connection, even if a transfer fails midway.
            self._disconnect()
        return(None)

    def fetch_content(self, remote_filepath):
        """Download a single remote file and return its contents as bytes."""
        self._connect()
        buf = io.BytesIO()
        try:
            self.handler.retrbinary('RETR {0}'.format(remote_filepath), buf.write)
        finally:
            self._disconnect()
        return(buf.getvalue())

    def fetch_dir(self, pathspec):
        """Create the local directory for a remote directory entry, using the remote unix mode."""
        self._connect()
        # Relative to FTP root.
        relpath, destdir, destpath = self._pathtuple(pathspec[0])
        mode = int(pathspec[1]['unix.mode'], 8)
        os.makedirs(destpath, mode = mode, exist_ok = True)
        _logger.debug('Created directory {0} with mode {1} (if it did not exist)'.format(destpath, oct(mode)))
        if self.owner:
            os.chown(destpath, **self.owner)
            _logger.debug('Chowned {0} to {uid}:{gid}'.format(destpath, **self.owner))
        return(None)

    def fetch_file(self, pathspec):
        """Download a remote file entry, then apply the remote unix mode and configured ownership."""
        self._connect()
        relpath, destdir, destpath = self._pathtuple(pathspec[0])
        self._prepdir(destdir)
        with open(destpath, 'wb') as fh:
            self.handler.retrbinary('RETR {0}'.format(relpath), fh.write)
        _logger.debug('Created file {0}'.format(destpath))
        mode = int(pathspec[1]['unix.mode'], 8)
        os.chmod(destpath, mode)
        _logger.debug('Chmodded {0} to {1}'.format(destpath, oct(mode)))
        if self.owner:
            os.chown(destpath, **self.owner)
            _logger.debug('Chowned {0} to {uid}:{gid}'.format(destpath, **self.owner))
        return(None)

    def fetch_symlink(self, pathspec):
        """Recreate a remote symlink entry locally."""
        relpath, destdir, destpath = self._pathtuple(pathspec[0])
        self._prepdir(destdir)
        # For symlinks, this is something like: OS.unix=slink:path/to/target
        target = pathspec[1]['type'].split(':', 1)[1]
        # os.symlink() raises FileExistsError if the destination exists (e.g. on a repeated sync),
        # so clear out a previous link or file first.
        if os.path.islink(destpath) or os.path.isfile(destpath):
            os.remove(destpath)
        # We don't care if the target exists.
        os.symlink(target, destpath)
        _logger.debug('Created symlink {0} -> {1}'.format(destpath, target))
        return(None)

View File

@ -0,0 +1,99 @@
import logging
import os
import subprocess
import sys
import tempfile
import warnings
##
_cur_dir = os.path.dirname(os.path.abspath(os.path.expanduser(__file__)))
sys.path.append(os.path.abspath(os.path.join(_cur_dir, '..')))
import constants
# import logger
from . import _base


_logger = logging.getLogger()


class RSync(_base.BaseFetcher):
    """Fetcher that mirrors a remote tree by invoking the rsync command-line client."""
    type = 'rsync'

    def __init__(self,
                 domain,
                 port,
                 path,
                 dest,
                 rsync_args = None,
                 owner = None,
                 log = True,
                 filechecks = None,
                 *args,
                 **kwargs):
        """
        rsync_args: optional list of rsync command-line arguments, or an object exposing such a list
                    as its ``args`` attribute. Defaults to constants.RSYNC_DEF_ARGS.
        log: if true, add --verbose and, when the module logger has a rotating file handler, point
             rsync's own log file at that handler's file.
        Remaining arguments are forwarded to the base fetcher.
        """
        super().__init__(domain, port, path, dest, owner = owner, filechecks = filechecks, *args, **kwargs)
        _logger.debug('Instantiated RSync fetcher')
        # Accept either a plain list or a wrapper object carrying the list in .args.
        rsync_args = getattr(rsync_args, 'args', rsync_args)
        if rsync_args:
            # Copy so that extending below never mutates the caller's list.
            self.rsync_args = list(rsync_args)
        else:
            # Copy so the shared module-level default is never mutated, and make sure every default
            # is a real option (leading dashes) since it is passed verbatim to rsync.
            self.rsync_args = [(a if a.startswith('-') else '--{0}'.format(a))
                               for a in constants.RSYNC_DEF_ARGS]
        _logger.debug('RSync args given: {0}'.format(self.rsync_args))
        if log:
            # "import logging" alone does not guarantee that the handlers submodule is loaded.
            from logging.handlers import RotatingFileHandler
            # Do I want to do this in subprocess + logging module? Or keep this?
            # It looks a little ugly in the log but it makes more sense than doing it via subprocess just to write it
            # back out.
            _log_path = None
            for h in _logger.handlers:
                if isinstance(h, RotatingFileHandler):
                    _log_path = h.baseFilename
                    break
            self.rsync_args.append('--verbose')
            if _log_path:
                # No shell is involved, so the format must not be wrapped in quote characters; they
                # would end up literally in every log line.
                self.rsync_args.extend(['--log-file-format=[RSYNC {0}:{1}]:%l:%f%L'.format(self.domain,
                                                                                           self.port),
                                        '--log-file={0}'.format(_log_path)])

    def _run_rsync(self, src, dest):
        """Run rsync from src to dest with the configured arguments and log its output."""
        # Yes, I know it's named "cmd_*str*". Yes, I know it's a *list*.
        cmd_str = ['rsync',
                   *self.rsync_args,
                   src,
                   dest]
        cmd = subprocess.run(cmd_str,
                             stdout = subprocess.PIPE,
                             stderr = subprocess.PIPE)
        # subprocess.run() returns the captured streams as bytes, not file objects.
        stdout = cmd.stdout.decode('utf-8').strip()
        stderr = cmd.stderr.decode('utf-8').strip()
        if stdout != '':
            _logger.debug('STDOUT: {0}'.format(stdout))
        if stderr != '' or cmd.returncode != 0:
            _logger.error('Rsync to {0}:{1} returned exit status {2}'.format(self.domain, self.port, cmd.returncode))
            _logger.debug('STDERR: {0}'.format(stderr))
            warnings.warn('Rsync process returned non-zero')
        return(cmd)

    def fetch(self):
        """Sync the contents of the remote URL into the local destination directory."""
        path = self.url.rstrip('/')
        if not path.endswith('/.'):
            path += '/.'
        dest = self.dest
        if not dest.endswith('/.'):
            dest += '/.'
        self._run_rsync(path, dest)
        return(None)

    def fetch_content(self, remote_filepath):
        """Download a single remote file via rsync and return its contents as bytes."""
        # mkstemp() returns an open descriptor; close it so it is not leaked.
        fd, tf = tempfile.mkstemp()
        os.close(fd)
        url = os.path.join(self.url.rstrip('/'), remote_filepath.lstrip('/'))
        try:
            self._run_rsync(url, tf)
            with open(tf, 'rb') as fh:
                raw_content = fh.read()
        finally:
            if os.path.exists(tf):
                os.remove(tf)
        return(raw_content)

View File

@ -37,7 +37,7 @@ if _has_journald:
h = journal.JournaldLogHandler()
# Systemd includes times, so we don't need to.
h.setFormatter(logging.Formatter(style = '{',
fmt = ('{name}:{levelname}:{name}:{filename}:'
fmt = ('{name}:{levelname}:{filename}:'
'{funcName}:{lineno}: {message}')))
_cfg_args['handlers'].append(h)


View File

@ -1,13 +1,40 @@
import datetime
import logging
import pwd
import grp
import os
import socket
##
from . import config
from . import constants
from . import fetcher


_logger = logging.getLogger()


def get_owner(owner_xml):
    """
    Resolve an <owner> element into numeric ids.

    owner_xml: element that may contain <user> and/or <group> children holding account names.
    Returns a dict with 'uid' and 'gid'. A missing or empty <user> falls back to the effective user;
    a missing or empty <group> falls back to the effective user's primary group.
    Raises KeyError (from pwd/grp) if a named user or group does not exist.
    """
    owner = {}
    # Compare against None explicitly: an element without child elements is falsy, so a plain
    # truthiness test would ignore <user>name</user> entirely.
    user_xml = owner_xml.find('user')
    user = ((user_xml.text or '').strip() if user_xml is not None else None)
    group_xml = owner_xml.find('group')
    group = ((group_xml.text or '').strip() if group_xml is not None else None)
    if user:
        user_obj = pwd.getpwnam(user)
    else:
        user_obj = pwd.getpwuid(os.geteuid())
    owner['uid'] = user_obj.pw_uid
    if group:
        group_obj = grp.getgrnam(group)
    else:
        group_obj = grp.getgrgid(pwd.getpwuid(os.geteuid()).pw_gid)
    owner['gid'] = group_obj.gr_gid
    _logger.debug('Resolved owner xml to {0}'.format(owner))
    return(owner)


class Args(object):
def __init__(self, args_xml):
self.xml = args_xml
@ -15,8 +42,20 @@ class Args(object):
self._parse_xml()

def _parse_xml(self):
    """
    Build self.args from the <short>/<long> children of self.xml.

    <short> elements get a single-dash prefix and everything else a double-dash prefix; a non-empty
    "value" attribute is appended as "=value".
    """
    self.args = []
    for node in self.xml.xpath('(short|long)'):
        dashes = ('-' if node.tag == 'short' else '--')
        rendered = '{0}{1}'.format(dashes, node.text)
        value = node.attrib.get('value')
        if value:
            rendered = '{0}={1}'.format(rendered, value)
        self.args.append(rendered)
    _logger.debug('Generated args list: {0}'.format(self.args))
    return(None)


class Mount(object):
@ -26,36 +65,194 @@ class Mount(object):
self._check_mount()

def _check_mount(self):
    """Set self.is_mounted according to whether self.path appears as a mountpoint in /proc/mounts."""
    _logger.debug('Getting mount status for {0}'.format(self.path))
    with open('/proc/mounts', 'r') as fh:
        mount_table = fh.read()
    # The mountpoint is the second whitespace-separated field of each entry.
    mountpoints = (entry.split()[1] for entry in mount_table.splitlines())
    self.is_mounted = any(mountpoint == self.path for mountpoint in mountpoints)
    if self.is_mounted:
        _logger.debug('{0} is mounted.'.format(self.path))
    else:
        _logger.debug('{0} is not mounted.'.format(self.path))
    return(None)


class TimestampFile(object):
    """A file holding a single timestamp, read and written in a configurable format."""
    # Reference point for the epoch-based formats; naive, interpreted as UTC like utcnow().
    _EPOCH = datetime.datetime(1970, 1, 1)
    _EPOCH_FMTS = ('%s', '%s.%f')

    def __init__(self, ts_xml, owner_xml = None):
        """
        ts_xml: element whose text is the file path and whose optional "timeFormat" attribute is a
                strftime-style format, or UNIX_EPOCH (the default) / MICROSECOND_EPOCH.
        owner_xml: optional owner element; when given it is resolved with get_owner() and applied to
                   whatever write() creates.
        """
        self.fmt = ts_xml.attrib.get('timeFormat', 'UNIX_EPOCH')
        if self.fmt == 'UNIX_EPOCH':
            self.fmt = '%s'
        elif self.fmt == 'MICROSECOND_EPOCH':
            self.fmt = '%s.%f'
        _logger.debug('Set timestamp format string to {0}'.format(self.fmt))
        self.owner_xml = owner_xml
        self.owner = {}
        # Explicit None test: element truthiness depends on the number of child elements.
        if self.owner_xml is not None:
            self.owner = get_owner(self.owner_xml)
            _logger.debug('Owner set is {0}'.format(self.owner))
        self.path = os.path.abspath(os.path.expanduser(ts_xml.text))
        _logger.debug('Path resolved to {0}'.format(self.path))

    def _parse(self, raw):
        """
        Turn the raw file content into a naive datetime according to self.fmt.

        The epoch formats are parsed by hand because datetime.strptime() rejects '%s'.
        """
        if self.fmt in self._EPOCH_FMTS:
            secs, _, frac = raw.partition('.')
            # Pad/truncate the fraction to exactly six digits so it can be read as microseconds.
            micros = int((frac + '000000')[:6])
            return(self._EPOCH + datetime.timedelta(seconds = int(secs), microseconds = micros))
        return(datetime.datetime.strptime(raw, self.fmt))

    def _render(self, when):
        """
        Format a naive datetime according to self.fmt.

        The epoch formats are computed arithmetically; strftime('%s') is a platform extension.
        """
        if self.fmt in self._EPOCH_FMTS:
            delta = when - self._EPOCH
            secs = (delta.days * 86400) + delta.seconds
            if self.fmt == '%s':
                return(str(secs))
            return('{0}.{1:06d}'.format(secs, delta.microseconds))
        return(when.strftime(self.fmt))

    def read(self, parentdir = None):
        """
        Read and parse the timestamp.

        parentdir: optional directory under which self.path is looked up instead of at its absolute
                   location.
        Returns a datetime.datetime.
        """
        if parentdir:
            path = os.path.join(os.path.abspath(os.path.expanduser(parentdir)),
                                self.path.lstrip('/'))
        else:
            path = self.path
        with open(path, 'r') as fh:
            timestamp = self._parse(fh.read().strip())
        _logger.debug('Read timestamp {0} from {1}'.format(str(timestamp), path))
        return(timestamp)

    def write(self):
        """Write the current UTC time to self.path, creating the parent directory if necessary."""
        dname = os.path.dirname(self.path)
        if not os.path.isdir(dname):
            os.makedirs(dname, mode = 0o0755)
            if self.owner:
                os.chown(dname, **self.owner)
            _logger.debug('Created {0}'.format(dname))
        with open(self.path, 'w') as fh:
            fh.write(self._render(datetime.datetime.utcnow()))
            fh.write('\n')
        os.chmod(self.path, mode = 0o0644)
        if self.owner:
            os.chown(self.path, **self.owner)
        _logger.debug('Wrote timestamp to {0}'.format(self.path))
        return(None)


class Upstream(object):
    """One upstream mirror of a distro, together with the fetcher used to sync from it."""

    def __init__(self, upstream_xml, dest, rsync_args = None, owner = None, filechecks = None):
        """
        upstream_xml: element with required <syncType>, <domain> and <path> children and optional
                      <port> and <bwlimit> children.
        dest: local destination directory.
        rsync_args: forwarded to the rsync fetcher only.
        owner, filechecks: forwarded to the fetcher.
        """
        self.xml = upstream_xml
        # These are required for all upstreams.
        self.sync_type = self.xml.find('syncType').text.lower()
        self.domain = self.xml.find('domain').text
        self.path = self.xml.find('path').text
        self.dest = os.path.abspath(os.path.expanduser(dest))
        self.owner = owner
        self.filechecks = filechecks
        self.has_new = False
        # These are optional.
        for i in ('port', 'bwlimit'):
            e = self.xml.find(i)
            # Explicit None test: an element without children is falsy, so "if e:" would never
            # pick up a configured value.
            if e is not None and e.text:
                setattr(self, i, int(e.text))
            else:
                setattr(self, i, None)
        if not self.port:
            self.port = constants.PROTO_DEF_PORTS[self.sync_type]
        self.available = None
        if self.sync_type == 'rsync':
            self.fetcher = fetcher.RSync(self.domain,
                                         self.port,
                                         self.path,
                                         self.dest,
                                         rsync_args = rsync_args,
                                         filechecks = self.filechecks,
                                         owner = self.owner)
        else:
            self.fetcher = fetcher.FTP(self.domain,
                                       self.port,
                                       self.path,
                                       self.dest,
                                       owner = self.owner,
                                       filechecks = self.filechecks)
        self._check_conn()

    def _check_conn(self):
        """Set self.available according to whether a TCP connection can be opened within 7 seconds."""
        try:
            # The context manager closes the socket on both success and failure.
            with socket.socket() as sock:
                sock.settimeout(7)
                sock.connect((self.domain, self.port))
            self.available = True
        except (socket.timeout, socket.error):
            self.available = False
        return(None)

    def sync(self):
        """Run the fetcher for this upstream."""
        self.fetcher.fetch()
        return(None)


class Distro(object):
    """A configured distribution: its destination, timestamp files, and upstreams."""

    def __init__(self, distro_xml):
        """
        distro_xml: element with a "name" attribute, required <dest> and <mountCheck> children, and
                    optional <owner>, <rsyncArgs>, <lastLocalCheck>, <lastLocalSync>,
                    <lastRemoteSync>, <lastRemoteUpdate> and <upstream> children.
        """
        self.xml = distro_xml
        self.name = distro_xml.attrib['name']
        self.dest = os.path.abspath(os.path.expanduser(distro_xml.find('dest').text))
        self.mount = Mount(self.xml.find('mountCheck'))
        self.filechecks = {'local': {'check': None,
                                     'sync': None},
                           'remote': {'update': None,
                                      'sync': None}}
        self.timestamps = {}
        self.rsync_args = None
        self.owner = None
        self.upstreams = []
        # These are optional.
        # Elements are compared against None explicitly throughout: an element without child
        # elements is falsy, so a plain truthiness test would skip every text-only element.
        self.owner_xml = self.xml.find('owner')
        if self.owner_xml is not None:
            self.owner = get_owner(self.owner_xml)
        self.rsync_xml = self.xml.find('rsyncArgs')
        if self.rsync_xml is not None:
            self.rsync_args = Args(self.rsync_xml)
        for i in ('Check', 'Sync'):
            e = self.xml.find('lastLocal{0}'.format(i))
            if e is not None:
                self.filechecks['local'][i.lower()] = TimestampFile(e)
        for i in ('Sync', 'Update'):
            e = self.xml.find('lastRemote{0}'.format(i))
            if e is not None:
                self.filechecks['remote'][i.lower()] = TimestampFile(e)
        for u in self.xml.findall('upstream'):
            # Hand each upstream its own copy of the generated argument list (not the Args wrapper),
            # so one fetcher extending its list cannot affect another.
            upstream_args = (list(self.rsync_args.args) if self.rsync_args else None)
            self.upstreams.append(Upstream(u,
                                           self.dest,
                                           rsync_args = upstream_args,
                                           owner = self.owner,
                                           filechecks = self.filechecks))

    def check(self):
        """Read every configured local timestamp file that exists into self.timestamps."""
        # .items() is required; iterating the dict itself would yield only the keys.
        for k, v in self.filechecks['local'].items():
            if not v:
                continue
            if not os.path.isfile(v.path):
                # Nothing has been written yet (e.g. first run); there is no timestamp to read.
                _logger.debug('Timestamp file {0} does not exist yet; skipping'.format(v.path))
                continue
            tstmp = v.read()
            self.timestamps[k] = tstmp
        _logger.debug('Updated timestamps: {0}'.format(self.timestamps))

    def sync(self):
        """
        Sync from the first available upstream that reports new content, then record timestamps.

        NOTE(review): nothing visible here sets Upstream.has_new, so the comparison between
        self.timestamps and the fetcher's remote timestamps still has to be implemented.
        """
        self.check()
        for u in self.upstreams:
            if not u.available:
                continue
            # The fetcher reads the remote checks from the filechecks it was constructed with;
            # its check() takes no arguments.
            u.fetcher.check()
            if u.has_new:
                u.sync()
                if self.filechecks['local']['sync']:
                    self.filechecks['local']['sync'].write()
                break
        if self.filechecks['local']['check']:
            self.filechecks['local']['check'].write()
        return(None)


class Sync(object):
    """Top-level entry point: loads the configuration and syncs every populated distro."""

    def __init__(self, cfg = None, dummy = False, distro = None, logdir = None, *args, **kwargs):
        """
        cfg: passed to config.Config().
        dummy: stored as self.dummy.
        distro: optional value stored as self.distro; an empty list is used when it is falsy.
        logdir: stored as self.logdir.

        Any exception raised during setup is logged with its traceback and not re-raised.
        """
        # Defined before the try block so that sync() is a harmless no-op, rather than an
        # AttributeError, when setup fails and the exception is swallowed below.
        self._distro_objs = []
        try:
            _args = dict(locals())
            del(_args['self'])
            _logger.debug('Sync class instantiated with args: {0}'.format(_args))
            self.cfg = config.Config(cfg)
            self.dummy = dummy
            if distro:
                self.distro = distro
            else:
                self.distro = []
            self.logdir = logdir
            # NOTE(review): this builds a second Config from the same argument as self.cfg;
            # confirm whether the two are meant to be distinct objects.
            self.xml = config.Config(cfg)
            self._distro_populate()
        except Exception:
            _logger.error('FATAL ERROR. Stacktrace follows.', exc_info = True)

    def _distro_populate(self):
        """Placeholder; currently does nothing."""
        pass

    def sync(self):
        """Call sync() on every object in self._distro_objs."""
        for d in self._distro_objs:
            d.sync()

22
repomirror/test.py Executable file
View File

@ -0,0 +1,22 @@
#!/usr/bin/env python3

import os
import shutil
##
import logger
import fetcher

dest = '/tmp/ipxe_ftp'
path = 'ipxe'


def main():
    """Remove any previous copy of the destination directory, then fetch the test path over FTP."""
    stale_copy_present = os.path.isdir(dest)
    if stale_copy_present:
        shutil.rmtree(dest)
    ftp_fetcher = fetcher.FTP('10.11.12.12', 21, path, dest)
    ftp_fetcher.fetch()


if __name__ == '__main__':
main()

View File

@ -25,11 +25,12 @@ def parseArgs():
dest = 'cfg',
help = ('The path to the config file. If it does not exist, a bare version will be created. '
'Default: ~/.config/repomirror.xmlost'))
args.add_argument('-n', '--dry-run',
action = 'store_true',
dest = 'dummy',
help = ('If specified, do not actually sync anything (other than timestamp files if '
'applicable to determine logic); do not actually sync any repositories'))
# args.add_argument('-n', '--dry-run',
# action = 'store_true',
# dest = 'dummy',
# help = ('If specified, do not actually sync anything (other than timestamp files if '
# 'applicable to determine logic); do not actually sync any repositories. Useful for '
# 'generating logs to determine potential issues before they happen'))
args.add_argument('-d', '--distro',
dest = 'distro',
action = 'append',
@ -46,8 +47,8 @@ def parseArgs():

def main():
    """Parse the command-line arguments, build a Sync object from them, and run the sync."""
    args = parseArgs().parse_args()
    # Every parsed option is passed through as a keyword argument. The earlier argument-less
    # Sync() construction was superseded by this call and only created a throwaway object.
    r = repomirror.Sync(**vars(args))
    r.sync()
    return(None)



View File

@ -61,7 +61,21 @@
<mountCheck>/</mountCheck>
<!--
You cannot reliably use two dashes in XML strings, so this is a workaround.
The following is only used for rsync upstreams and is optional. The default is just archive and delete-after.
The following is only used for rsync upstreams and is optional. The default is the following:
<rsyncArgs>
<long>recursive</long>
<long>times</long>
<long>links</long>
<long>hard-links</long>
<long>delete-after</long>
<long>delay-updates</long>
<long>copy-links</long>
<long>safe-links</long>
<long>delete-extended</long>
<long value=".*">exclude</long>
</rsyncArgs>
These arguments should be sane for most, if not all, rsync-driven repository mirroring. The last one (exclude) may
be removed in future versions.
If arguments are provided, the defaults are overwritten so if you need the above, be sure to specify them.
See the rsync man page (rsync(1)) for more details and a listing of supported flags on your system.
-->
@ -96,7 +110,7 @@
-->
<syncType>rsync</syncType>
<!--
Required; ONLY the domain goes here.
Required; ONLY the domain (or IP) goes here.
-->
<domain>arch.mirror.constant.com</domain>
<!--