from datetime import datetime
import os
import argparse
import logging
from io import BytesIO
from pytz import UTC

from ofxparse import OfxParser
from sqlalchemy.exc import InvalidRequestError, IntegrityError

from biweeklybudget.cliutils import set_log_debug, set_log_info
from biweeklybudget.ofxapi import apiclient
from biweeklybudget.ofxapi.exceptions import DuplicateFileException

logger = logging.getLogger(__name__)

class OfxBackfiller(object):
    """
    Class to backfill OFX in database from files on disk.

    Walks ``savedir`` looking for one sub-directory per account name (as
    returned by the API client), and submits every ``.ofx`` / ``.qfx`` file
    found there to the client, oldest file first.
    """

    def __init__(self, client, savedir):
        """
        Initialize the OFX Backfiller.

        :param client: API client
        :type client: Instance of :py:class:`~.OfxApiLocal` or
          :py:class:`~.OfxApiRemote`
        :param savedir: directory/path to save statements in
        :type savedir: str
        """
        logger.info('Initializing OfxBackfiller with savedir=%s', savedir)
        self.savedir = savedir
        self._client = client

    def run(self):
        """
        Main entry point - run the backfill.

        Asks the client for all accounts and, for each account name (in
        sorted order) that has a matching directory under ``self.savedir``,
        processes that directory with :py:meth:`~._do_account_dir`. Accounts
        without a directory are logged and skipped.
        """
        logger.debug('Checking for Accounts with statement directories')
        accounts = self._client.get_accounts()
        for acctname in sorted(accounts.keys()):
            p = os.path.join(self.savedir, acctname)
            data = accounts[acctname]
            if not os.path.isdir(p):
                logger.info(
                    'No statement directory for Account %d (%s)',
                    data['id'], p
                )
                continue
            logger.debug('Found directory %s for Account %d', p, data['id'])
            self._do_account_dir(data['id'], p)

    def _do_account_dir(self, acct_id, path):
        """
        Handle all OFX statements in a per-account directory.

        Only regular files whose name ends in ``.ofx`` or ``.qfx``
        (case-insensitive) are considered. Files are handled in order of
        ascending modification time.

        :param acct_id: account database ID
        :type acct_id: int
        :param path: absolute path to per-account directory
        :type path: str
        :raises: InvalidRequestError, IntegrityError or TypeError if raised
          while handling a file; any other exception is logged and the file
          is skipped.
        """
        logger.debug('Doing account %d directory (%s)', acct_id, path)
        # map of file path -> modification time, used for ordering below
        files = {}
        for f in os.listdir(path):
            p = os.path.join(path, f)
            if not os.path.isfile(p):
                continue
            extension = p.split('.')[-1].lower()
            if extension not in ['ofx', 'qfx']:
                continue
            files[p] = os.path.getmtime(p)
        logger.debug('Found %d files for account %d', len(files), acct_id)
        # run through the files, oldest to newest
        success = 0
        already = 0
        for p in sorted(files, key=files.get):
            try:
                self._do_one_file(acct_id, p)
                success += 1
            except DuplicateFileException:
                already += 1
                logger.warning('OFX is already parsed for account; skipping')
            except (InvalidRequestError, IntegrityError, TypeError):
                # these are deliberately not swallowed; propagate to caller
                raise
            except Exception:
                # best-effort: log the failure and carry on with other files
                logger.error('Exception parsing and inserting file %s',
                             p, exc_info=True)
        logger.info(
            'Successfully parsed and inserted %d of %d files for '
            'account %d; %d files already in DB',
            success, len(files), acct_id, already
        )

    def _do_one_file(self, acct_id, path):
        """
        Parse one OFX file and use OFXUpdater to upsert it into the DB.

        The file is read as bytes, parsed with ``OfxParser`` and handed to the
        client's ``update_statement_ofx`` along with the file's basename and
        its modification time (as a UTC-aware datetime).

        :param acct_id: Account ID number
        :type acct_id: int
        :param path: absolute path to OFX/QFX file
        :type path: str
        """
        logger.debug('Handle file %s for Account %d', path, acct_id)
        with open(path, 'rb') as fh:
            ofx_str = fh.read()
        ofx = OfxParser.parse(BytesIO(ofx_str))
        logger.debug('Parsed OFX')
        fname = os.path.basename(path)
        mtime = datetime.fromtimestamp(os.path.getmtime(path), tz=UTC)
        self._client.update_statement_ofx(
            acct_id, ofx, mtime=mtime, filename=fname
        )
        logger.debug('Done updating')


def parse_args(argv=None):
    """
    Parse command-line arguments.

    :param argv: list of argument strings to parse; when ``None`` (the
      default) argparse reads the process's command line instead
    :type argv: list
    :return: parsed arguments, with attributes ``verbose``, ``remote``,
      ``save_path``, ``ca_bundle``, ``client_cert`` and ``client_key``
    :rtype: argparse.Namespace
    """
    p = argparse.ArgumentParser(description='Backfill OFX from disk')
    p.add_argument('-v', '--verbose', dest='verbose', action='count',
                   default=0,
                   help='verbose output. specify twice for debug-level '
                        'output.')
    p.add_argument('-r', '--remote', dest='remote', action='store', type=str,
                   default=None,
                   help='biweeklybudget API URL to use instead of direct DB '
                        'access')
    p.add_argument('-s', '--save-path', dest='save_path', action='store',
                   type=str, default=None,
                   help='Statement save path; must be specified when running '
                        'in remote (-r) mode.')
    p.add_argument('--ca-bundle', dest='ca_bundle', action='store', type=str,
                   default=None,
                   help='Path to CA certificate bundle file or directory to '
                        'use for SSL verification')
    p.add_argument('--client-cert', dest='client_cert', action='store',
                   type=str, default=None,
                   help='path to client certificate to use for SSL client '
                        'cert auth')
    p.add_argument('--client-key', dest='client_key', action='store',
                   type=str, default=None,
                   help='path to unencrypted client key to use for SSL client '
                        'cert auth, if key is not contained in the cert file')
    # argv=None makes argparse fall back to the real command line
    return p.parse_args(argv)


def main():
    """
    Main entry point - instantiate and run :py:class:`~.OfxBackfiller`.

    Configures logging (WARNING by default; INFO with one ``-v``, DEBUG with
    two or more), builds the API client from the command-line options, picks
    the statement directory and runs the backfill.

    :raises: SystemExit with status 1 if ``-r|--remote`` is given without
      ``-s|--save-path``
    """
    # rebind the module-level logger to the root logger so the verbosity
    # set below applies to everything logged from this module
    global logger
    log_format = "[%(asctime)s %(levelname)s] %(message)s"
    logging.basicConfig(level=logging.WARNING, format=log_format)
    logger = logging.getLogger()

    args = parse_args()

    # set logging level
    if args.verbose > 1:
        set_log_debug(logger)
    elif args.verbose == 1:
        set_log_info(logger)

    client = apiclient(
        api_url=args.remote, ca_bundle=args.ca_bundle,
        client_cert=args.client_cert, client_key=args.client_key
    )

    if args.remote is None:
        # imported here, only on the local (non-remote) path
        from biweeklybudget import settings
        save_path = settings.STATEMENTS_SAVE_PATH
    else:
        if args.save_path is None:
            logger.error('ERROR: -s|--save-path must be specified when running '
                         'in remote mode.')
            raise SystemExit(1)
        save_path = os.path.abspath(args.save_path)

    backfiller = OfxBackfiller(client, save_path)
    # without this call the script would build the backfiller and do nothing
    backfiller.run()


# Only run the backfill when this file is executed as a script.
if __name__ == "__main__":
    main()