Source code for biweeklybudget.plaid_updater

"""
The latest version of this package is available at:
<http://github.com/jantman/biweeklybudget>

################################################################################
Copyright 2020 Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>

    This file is part of biweeklybudget, also known as biweeklybudget.

    biweeklybudget is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    biweeklybudget is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with biweeklybudget.  If not, see <http://www.gnu.org/licenses/>.

The Copyright and Authors attributions contained herein may not be removed or
otherwise altered, except to add the Author attribution of a contributor to
this work. (Additional Terms pursuant to Section 7b of the AGPL v3)
################################################################################
While not legally required, I sincerely request that anyone who finds
bugs please submit them at <https://github.com/jantman/biweeklybudget> or
to me via email, and that you send any contributions or improvements
either as a pull request on GitHub, or to me via email.
################################################################################

AUTHORS:
Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>
################################################################################
"""

import logging
from datetime import datetime, timedelta, time
from decimal import Decimal, ROUND_HALF_DOWN
from typing import Optional, List, Dict

from pytz import UTC

from biweeklybudget.db import db_session, upsert_record
from biweeklybudget.models.ofx_transaction import OFXTransaction
from biweeklybudget.models.ofx_statement import OFXStatement
from biweeklybudget.models.account import Account
from biweeklybudget.models.plaid_items import PlaidItem
from biweeklybudget.models.plaid_accounts import PlaidAccount
from biweeklybudget.utils import plaid_client, dtnow

from plaid.models import (
    ItemGetRequest, TransactionsGetRequest, TransactionsGetRequestOptions
)

logger = logging.getLogger(__name__)


[docs]class PlaidUpdateResult: """Describes the result of updating a single account via Plaid.""" def __init__( self, item: PlaidItem, success: bool, updated: int, added: int, exc: Optional[Exception], stmt_ids: Optional[List[int]] ): """ Store the result of an update. :param item: the PlaidItem that this update pertains to :param success: whether the update succeeded or not :param updated: count of updated transactions :param added: count of added transactions :param exc: exception encountered, if any :param stmt_ids: list of added Statement IDs """ self.item = item self.success = success self.updated = updated self.added = added self.exc = exc self.stmt_ids = stmt_ids @property def as_dict(self): return { 'item_id': self.item.item_id, 'success': self.success, 'exception': str(self.exc), 'statement_ids': self.stmt_ids, 'added': self.added, 'updated': self.updated }
[docs]class PlaidUpdater: def __init__(self): self.client = plaid_client()
[docs] @classmethod def available_items(cls): """ Return a list of :py:class:`~.PlaidItem` objects that can be updated via Plaid. :return: PlaidItem that can be updated via Plaid :rtype: list of :py:class:`~.PlaidItem` objects """ return db_session.query(PlaidItem).order_by( PlaidItem.institution_name ).all()
[docs] def update(self, items=None, days=30): """ Update account balances and transactions from Plaid, for either all Plaid Items that are available or a specified list of Item IDs. :param items: a list of :py:class:`~.PlaidItem` objects to update :type items: list or None :param days: number of days of transactions to get from Plaid :type days: int :return: list of :py:class:`~.PlaidUpdateResult` instances :rtype: list """ if items is None: items = self.available_items() logger.debug( 'Running Plaid update for %d items: %s', len(items), items ) result = [] for item in items: result.append(self._do_item(item, days=days)) return result
[docs] def _do_item(self, item, days): """ Request transactions from Plaid for one Item. Update balances and transactions for each Account in that item. :param item: the item to update :type item: PlaidItem :param days: number of days of transactions to get from Plaid :type days: int :rtype: PlaidUpdateResult """ logger.info('Plaid update for %s', item) try: end_date: datetime = dtnow() start_date: datetime = end_date - timedelta(days=days) iteminfo = self.client.item_get( ItemGetRequest(access_token=item.access_token) ) logger.info( 'Item %s transactions status: %s', item, iteminfo.get('status', {}).get('transactions') ) logger.debug( 'Downloading Plaid transactions for item: %s from %s to %s', item, start_date, end_date ) txns: List[dict] accts: Dict txns, accts = self._get_transactions( item.access_token, start_date, end_date ) accounts: Dict[str, PlaidAccount] = {} pa: PlaidAccount for pa in db_session.query(PlaidAccount).filter( PlaidAccount.item_id == item.item_id ).all(): accounts[pa.account_id] = pa stmt_ids: List[int] = [] added: int = 0 updated: int = 0 txns_per_acct: Dict[str, list] = {} for t in txns: if t['account_id'] not in txns_per_acct: txns_per_acct[t['account_id']] = [] txns_per_acct[t['account_id']].append(t) for plaid_account_id in accts.keys(): plaid_acct: PlaidAccount = accounts[plaid_account_id] acct: Optional[Account] = plaid_acct.account if acct is None: logger.warning( 'Plaid item %s returned transactions for %s, but it is ' 'not mapped to an Account.', item, plaid_acct ) continue sid, a, u = self._stmt_for_acct( acct, accts[plaid_account_id], txns_per_acct.get(plaid_account_id, []), end_date ) added += a updated += u stmt_ids.append(sid) item.last_updated = dtnow() db_session.add(item) db_session.commit() return PlaidUpdateResult( item, True, updated, added, None, stmt_ids ) except Exception as ex: logger.error( 'Exception encountered when updating item: %s (%s)', item.institution_name, item.item_id, exc_info=True ) return PlaidUpdateResult( item, False, 0, 0, ex, None )
[docs] def _get_transactions( self, access_token: str, start_dt: datetime, end_dt: datetime ): kwargs = { 'access_token': access_token, 'start_date': start_dt.date(), 'end_date': end_dt.date(), 'options': TransactionsGetRequestOptions() } txns: List[dict] = [] accts: dict = {} req = TransactionsGetRequest(**kwargs) logger.debug('Issuing transactions get request') resp = self.client.transactions_get(req) txns = resp['transactions'] for acct in resp['accounts']: accts[acct['account_id']] = acct logger.debug( 'Got %d transactions; expecting %d total', len(txns), resp['total_transactions'] ) while len(txns) < resp['total_transactions']: kwargs['options'] = TransactionsGetRequestOptions( offset=len(txns) ) req = TransactionsGetRequest(**kwargs) logger.debug( 'Issuing transactions get request with offset=%d', len(txns) ) resp = self.client.transactions_get(req) txns.extend(resp['transactions']) for acct in resp['accounts']: accts[acct['account_id']] = acct return txns, accts
[docs] def _stmt_for_acct( self, account: Account, plaid_acct_info: dict, plaid_txns: List[dict], end_dt: datetime ): """ Put Plaid transaction data to the DB :param account: the account to update :param plaid_acct_info: dict of account information from Plaid :param txns: list of transactions from Plaid :param end_dt: current time, as of when transactions were retrieved """ stmt = OFXStatement( account_id=account.id, filename=f'Plaid_{account.name}_{int(end_dt.timestamp())}.ofx', file_mtime=end_dt, as_of=end_dt, currency=plaid_acct_info['balances']['iso_currency_code'], acctid=plaid_acct_info['mask'] ) stmt.bankid = account.plaid_account.plaid_item.institution_id if account.plaid_account.account_type == 'credit': stmt.type = 'CreditCard' self._update_bank_or_credit( end_dt, account, plaid_acct_info, plaid_txns, stmt ) elif account.plaid_account.account_type == 'depository': stmt.type = 'Bank' self._update_bank_or_credit( end_dt, account, plaid_acct_info, plaid_txns, stmt ) elif account.plaid_account.account_type == 'investment': stmt.type = 'Investment' self._update_investment(end_dt, account, plaid_acct_info, stmt) elif account.plaid_account.account_type == 'loan': # For now, this should work... stmt.type = 'Investment' self._update_investment(end_dt, account, plaid_acct_info, stmt) else: raise RuntimeError( 'ERROR: Unknown account type: ' + account.plaid_account.account_type ) count_new, count_upd = self._new_updated_counts() db_session.commit() logger.info('Account "%s" - inserted %d new OFXTransaction(s), updated ' '%d existing OFXTransaction(s)', account.name, count_new, count_upd) logger.debug('Done updating OFX in DB') return stmt.id, count_new, count_upd
[docs] def _new_updated_counts(self): """ Return integer counts of the number of :py:class:`~.OFXTransaction` objects that have been created and updated. :return: 2-tuple of new OFXTransactions created, OFXTransactions updated :rtype: tuple """ count_new = 0 count_upd = 0 for obj in db_session.dirty: if isinstance(obj, OFXTransaction): count_upd += 1 for obj in db_session.new: if isinstance(obj, OFXTransaction): count_new += 1 return count_new, count_upd
[docs] def _update_bank_or_credit( self, end_dt: datetime, account: Account, plaid_acct_info: dict, plaid_txns: List[dict], stmt: OFXStatement ): logger.debug('Generating statement for credit account') stmt.as_of = end_dt stmt.ledger_bal_as_of = end_dt stmt.ledger_bal = Decimal( plaid_acct_info['balances']['current'] ).quantize( Decimal('.01'), rounding=ROUND_HALF_DOWN ) if plaid_acct_info['balances'].get('available', None) is not None: stmt.avail_bal = Decimal( plaid_acct_info['balances']['available'] ).quantize(Decimal('.01'), rounding=ROUND_HALF_DOWN) stmt.avail_bal_as_of = end_dt stmt.currency = plaid_acct_info['balances']['iso_currency_code'] db_session.add(stmt) account.set_balance( overall_date=stmt.as_of, ledger=stmt.ledger_bal, ledger_date=stmt.ledger_bal_as_of, avail=stmt.avail_bal, avail_date=stmt.avail_bal_as_of ) for pt in plaid_txns: if pt['pending']: logger.info('Skipping pending transaction: %s', pt) continue kwargs = { 'amount': Decimal(str(pt['amount'])).quantize( Decimal('.01'), rounding=ROUND_HALF_DOWN ), 'date_posted': datetime.combine( pt["date"], time(0, 0, 0), tzinfo=UTC ), 'fitid': pt['payment_meta']['reference_number'], 'name': pt['name'], 'account_id': account.id, 'statement': stmt } if account.negate_ofx_amounts: kwargs['amount'] = kwargs['amount'] * Decimal('-1.0') if kwargs['fitid'] is None: kwargs['fitid'] = pt['transaction_id'] upsert_record( OFXTransaction, ['account_id', 'fitid'], **kwargs )
[docs] def _update_investment( self, end_dt: datetime, account: Account, plaid_acct_info: dict, stmt: OFXStatement ): logger.debug('Generating statement for investment account') stmt.as_of = end_dt stmt.ledger_bal = Decimal( plaid_acct_info['balances']['current'] ).quantize(Decimal('.01'), rounding=ROUND_HALF_DOWN) stmt.ledger_bal_as_of = end_dt stmt.currency = plaid_acct_info['balances']['iso_currency_code'] db_session.add(stmt) account.set_balance( overall_date=stmt.as_of, ledger=stmt.ledger_bal, ledger_date=stmt.ledger_bal_as_of )