import argparse
import logging
import atexit
import sys
from types import MethodType

from biweeklybudget.models.projects import Project, BoMItem
from biweeklybudget.db import init_db, db_session, cleanup_db
from biweeklybudget.cliutils import set_log_debug, set_log_info

logger = logging.getLogger(__name__)

# suppress requests logging
requests_log = logging.getLogger("requests")
requests_log.propagate = True

[docs]class WishlistToProject(object): def __init__(self): atexit.register(cleanup_db) init_db() try: from wishlist.core import Wishlist except ImportError: sys.stderr.write('ERROR: wishlist could not be imported. Please ' '"pip install wishlist".\n') raise SystemExit(1) self._wlist = Wishlist()
[docs] def run(self): """ Run the synchronization. :return: 2-tuple; count of successful syncs, total count of projects with associated wishlists :rtype: tuple """ logger.debug('Beginning wishlist sync run') success = 0 total = 0 for list_url, proj in self._get_wishlist_projects(): total += 1 try: if self._do_project(list_url, proj): success += 1 except Exception: logger.error('Exception updating project %s with list %s', proj, list_url, exc_info=True) return success, total
[docs] def _do_project(self, list_url, project): """ Update a project with information from its wishlist. :param list_url: Amazon wishlist URL :type list_url: str :param project: the project to update :type project: Project :return: whether or not the update was successful :rtype: bool """ logger.debug('Handling project: %s', project) pitems = self._project_items(project) witems = self._wishlist_items(list_url) logger.debug('Project has %d items; wishlist has %d', len(pitems), len(witems)) for url, item in pitems.items(): if url not in witems: '%s (%s) removed from amazon list; setting inactive', item, url ) item.is_active = False db_session.add(item) for url, item in witems.items(): if url in pitems: bitem = pitems[url]'Updating %s from Amazon wishlist', bitem) else: bitem = BoMItem() bitem.project = project'Adding new BoMItem for wishlist %s', url) bitem.url = url bitem.is_active = True bitem.quantity = item['quantity'] bitem.unit_cost = item['cost'] = item['name'] db_session.add(bitem)'Committing changes for project %s url %s', project, list_url) db_session.commit() return True
[docs] def _project_items(self, proj): """ Return all of the BoMItems for the specified project, as a dict of URL to BoMItem. :param proj: the project to get items for :type proj: Project :return: item URLs to BoMItems :rtype: dict """ res = {} for i in db_session.query(BoMItem).filter( BoMItem.project.__eq__(proj) ).all(): res[i.url] = i return res
[docs] def _wishlist_items(self, list_url): """ Get the items on the specified wishlist. :param list_url: wishlist URL :type list_url: str :return: dict of item URL to item details dict :rtype: dict """ res = {} list_name = list_url.split('/')[-1] logger.debug('Getting wishlist items for wishlist: %s', list_name) items = self._get_wishlist(list_name) logger.debug("Found %d items in list" % len(items)) for item in items: d = {'name': item.title, 'url': item.url} try: d['quantity'] = item.wanted_count except Exception: d['quantity'] = 1 if item.price > 0: d['cost'] = item.price else: d['cost'] = item.marketplace_price res[item.url] = d return res
[docs] def _get_wishlist(self, list_name): """ Workaround for a bug in wishlist==0.1.2 :param list_name: wishlist name to get :type list_name: str :return: list of items in wishlist :rtype: list """ logger.debug('Wishlist bug workaround - list name: %s', list_name) try: items = [i for i in self._wlist.get(list_name)] assert len(items) > 0 except Exception:'Hit pagination bug in wishlist package') def hack(cls, _): return 1 self._wlist.get_total_pages_from_body = MethodType( hack, self._wlist ) items = [ i for i in self._wlist.get(list_name, start_page=1, stop_page=1) ] return items
[docs] def _get_wishlist_projects(self): """ Find all projects with descriptions that begin with a wishlist URL. :return: list of (url, Project object) tuples :rtype: list """ res = [] logger.debug('Querying active projects for wishlist URLs') q = db_session.query(Project).filter(Project.is_active.__eq__(True)) total_p = 0 for p in q.all(): total_p += 1 if p.notes.strip() == '': continue u = p.notes.split(' ')[0] if self._url_is_wishlist(u): res.append((u, p))'Found %d of %d projects with wishlist URLs', len(res), total_p) return res
[docs] def _url_is_wishlist(url): """ Determine if the given string or URL matches a wishlist. :param url: URL or string to test :type url: str :return: whether url is a wishlist URL :rtype: bool """ return url.startswith('')
[docs]def parse_args(): p = argparse.ArgumentParser( description='Synchronize Amazon wishlists to projects, for projects ' 'with Notes fields beginning with an Amazon public ' 'wishlist URL' ) p.add_argument('-v', '--verbose', dest='verbose', action='count', default=0, help='verbose output. specify twice for debug-level output.') args = p.parse_args() return args
[docs]def main(): global logger format = "[%(asctime)s %(levelname)s] %(message)s" logging.basicConfig(level=logging.WARNING, format=format) logger = logging.getLogger() args = parse_args() # set logging level if args.verbose > 1: set_log_debug(logger) elif args.verbose == 1: set_log_info(logger) if args.verbose <= 1: # if we're not in verbose mode, suppress routine logging for cron lgr = logging.getLogger('alembic') lgr.setLevel(logging.WARNING) lgr = logging.getLogger('biweeklybudget.db') lgr.setLevel(logging.WARNING) syncer = WishlistToProject() success, total = if success != total: logger.warning('Synced %d of %d project wishlists', success, total) raise SystemExit(1) raise SystemExit(0)
if __name__ == "__main__": main()