Source code for biweeklybudget.screenscraper

"""
The latest version of this package is available at:
<http://github.com/jantman/biweeklybudget>

################################################################################
Copyright 2016 Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>

    This file is part of biweeklybudget, also known as biweeklybudget.

    biweeklybudget is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    biweeklybudget is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with biweeklybudget.  If not, see <http://www.gnu.org/licenses/>.

The Copyright and Authors attributions contained herein may not be removed or
otherwise altered, except to add the Author attribution of a contributor to
this work. (Additional Terms pursuant to Section 7b of the AGPL v3)
################################################################################
While not legally required, I sincerely request that anyone who finds
bugs please submit them at <https://github.com/jantman/biweeklybudget> or
to me via email, and that you send any contributions or improvements
either as a pull request on GitHub, or to me via email.
################################################################################

AUTHORS:
Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>
################################################################################
"""

import os
import logging
import codecs
import urllib
import json
from tempfile import mkstemp

from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities

logger = logging.getLogger(__name__)


[docs]class ScreenScraper(object): """ Base class for screen-scraping bank/financial websites. """ def __init__(self, savedir='./', screenshot=False): """ Initialize ScreenScraper. :param savedir: directory to save OFX in :type savedir: str :param screenshot: whether or not to take screenshots throughout the process :type screenshot: bool """ self._savedir = os.path.abspath(os.path.expanduser(savedir)) if not os.path.exists(self._savedir): os.makedirs(self._savedir) self._cookie_file = os.path.join(self._savedir, 'cookies.txt') logger.debug('Using savedir: %s', self._savedir) self._screenshot_num = 1 self._screenshot = screenshot if self._screenshot: logger.warning("screenshotting all actions") self.user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36' \ ' (KHTML, like Gecko) Chrome/62.0.3202.62 ' \ 'Safari/537.36' # temporary file for driver logs fp, self._service_log_path = mkstemp() os.close(fp) def __del__(self): try: os.unlink(self._service_log_path) except Exception: pass
[docs] def load_cookies(self, cookie_file): """ Load cookies from a JSON cookie file on disk. This file is not the format used natively by PhantomJS, but rather the JSON-serialized representation of the dict returned by :py:meth:`selenium.webdriver.remote.webdriver.WebDriver.get_cookies`. Cookies are loaded via :py:meth:`selenium.webdriver.remote.webdriver.WebDriver.add_cookie` :param cookie_file: path to the cookie file on disk :type cookie_file: str """ if not os.path.exists(cookie_file): logger.warning('Cookie file does not exist: %s', cookie_file) return logger.info('Loading cookies from: %s', cookie_file) with open(cookie_file, 'r') as fh: cookies = json.loads(fh.read()) count = 0 for c in cookies: try: self.browser.add_cookie(c) count += 1 except Exception as ex: logger.info('Error loading cookie %s: %s', c, ex) logger.debug('Loaded %d of %d cookies', count, len(cookies))
[docs] def save_cookies(self, cookie_file): """ Save cookies to a JSON cookie file on disk. This file is not the format used natively by PhantomJS, but rather the JSON-serialized representation of the dict returned by :py:meth:`selenium.webdriver.remote.webdriver.WebDriver.get_cookies`. :param cookie_file: path to the cookie file on disk :type cookie_file: str """ cookies = self.browser.get_cookies() raw = json.dumps(cookies) with open(cookie_file, 'w') as fh: fh.write(raw) logger.info('Wrote %d cookies to: %s', len(cookies), cookie_file)
[docs] def do_screenshot(self): """take a debug screenshot""" if not self._screenshot: return fname = os.path.join( os.getcwd(), '{n}.png'.format(n=self._screenshot_num) ) self._pre_screenshot() self.browser.get_screenshot_as_file(fname) self._post_screenshot() logger.debug( "Screenshot: {f} of: {s}".format( f=fname, s=self.browser.current_url ) ) self._screenshot_num += 1
[docs] def error_screenshot(self, fname=None): cwd = os.getcwd() if fname is None: fname = os.path.join(cwd, 'webdriver_fail.png') self._pre_screenshot() self.browser.get_screenshot_as_file(fname) self._post_screenshot() logger.error("Screenshot saved to: {s}".format(s=fname)) logger.error("Page title: %s", self.browser.title) logger.error('Page URL: %s', self.browser.current_url) html_path = os.path.join(cwd, 'webdriver_fail.html') source = self.browser.execute_script( "return document.getElementsByTagName('html')[0].innerHTML" ) with codecs.open(html_path, 'w', 'utf-8') as fh: fh.write(source) logger.error('Page source saved to: %s', html_path) if ( os.path.exists(self._service_log_path) and os.path.getsize(self._service_log_path) > 2 ): logpath = os.path.join(cwd, 'webdriver_service_log.txt') with open(logpath, 'w') as svclog: with open(self._service_log_path, 'r') as orig: svclog.write(orig.read()) logger.error('Webdriver driver log written to: %s', logpath) try: log_types = self.browser.log_types except Exception: logger.error('Failed to gather browser logs', excinfo=True) return for name in log_types: try: log = self.browser.get_log(name) except Exception: logger.error( 'Failed to get %s log from browser', name, excinfo=True ) continue logpath = os.path.join(cwd, 'webdriver_log_%s.txt' % name) with open(logpath, 'w') as fh: if isinstance(log, type([])): fh.write("\n".join([str(x) for x in log])) else: fh.write(log) logger.error( 'Wrote driver\'s "%s" log (length: %d) to: %s', name, len(log), logpath )
[docs] def _pre_screenshot(self): if not self._browser_name.startswith('chrome'): return height = self.browser.execute_script( "return Math.max(document.body.scrollHeight, " "document.body.offsetHeight, document.documentElement." "clientHeight, document.documentElement.scrollHeight, " "document.documentElement.offsetHeight);" ) height += 100 logger.info('Resizing browser to %d high', height) self.browser.set_window_size(1920, height)
[docs] def _post_screenshot(self): if not self._browser_name.startswith('chrome'): return self.browser.set_window_size(1920, 1080)
[docs] def xhr_get_url(self, url): """ use JS to download a given URL, return its contents """ script = 'var xhr = new XMLHttpRequest(); ' script += 'var jantman_dl_response = null; ' script += 'xhr.open("GET", "{url}", false); '.format(url=url) script += 'xhr.send(null); ' script += 'jantman_dl_response = xhr.response; ' script += 'return jantman_dl_response;' logger.debug("executing in browser: {s}".format(s=script)) res = self.browser.execute_script(script) logger.debug("got {c} characters of return value from script".format( c=len(res))) return res
[docs] def xhr_post_urlencoded(self, url, data, headers={}): """ use JS to download a given URL, return its contents """ if not isinstance(data, type('')) and not isinstance(data, type(u'')): data = urllib.urlencode(data) script = 'var xhr = new XMLHttpRequest(); ' script += 'var jantman_dl_response = null; ' script += 'xhr.open("POST", "{url}", false); '.format(url=url) if isinstance(headers, type({})): headers["Content-type"] = "application/x-www-form-urlencoded" for k, v in headers.items(): script += 'xhr.setRequestHeader("%s", "%s"); ' % (k, v) else: for item in headers: script += 'xhr.setRequestHeader("%s", "%s"); ' % ( item[0], item[1] ) script += 'xhr.send("{p}"); '.format(p=data) script += 'jantman_dl_response = { resp: xhr.response, ' \ 'respText: xhr.responseText, status: xhr.status, ' \ 'headers: xhr.getAllResponseHeaders() }; ' script += 'return JSON.stringify(jantman_dl_response);' logger.debug("executing in browser: %s", script) res = self.browser.execute_script(script) j = json.loads(res) logger.debug("Script returned %d length result (status %s; " "headers: %s)", len(j['respText']), j['status'], j['headers'].replace("\r", "").replace("\n", "; ")) return j['respText']
[docs] def get_browser(self, browser_name, useragent=None): """ get a webdriver browser instance :param browser_name: name of browser to get. Can be one of "firefox", "chrome", "chrome-headless", or "phantomjs" :type browser_name: str :param useragent: Optionally override the browser's default user-agent string with this value. Supported for phantomjs or chrome. :type useragent: str """ self._browser_name = browser_name if browser_name == 'firefox': logger.debug("getting Firefox browser (local)") if 'DISPLAY' not in os.environ: logger.debug("exporting DISPLAY=:0") os.environ['DISPLAY'] = ":0" browser = webdriver.Firefox() elif browser_name in ['chrome', 'chrome-headless']: chrome_options = Options() if browser_name == 'chrome-headless': logger.debug('getting Chrome browser (local) with --headless') chrome_options.add_argument("--headless") else: logger.debug("getting Chrome browser (local)") if useragent is not None: chrome_options.add_argument('--user-agent=%s' % useragent) logger.debug( 'Setting chrome user-agent to "%s"', useragent ) browser = webdriver.Chrome( chrome_options=chrome_options, desired_capabilities={ 'loggingPrefs': {'browser': 'ALL'} }, service_log_path=self._service_log_path ) browser.set_window_size(1920, 1080) browser.implicitly_wait(2) elif browser_name == 'phantomjs': logger.debug("getting PhantomJS browser (local)") dcap = dict(DesiredCapabilities.PHANTOMJS) if useragent is None: logger.debug( 'Setting phantomjs user-agent to "%s"', self.user_agent ) dcap["phantomjs.page.settings.userAgent"] = self.user_agent else: dcap["phantomjs.page.settings.userAgent"] = useragent logger.debug( 'Setting phantomjs user-agent to "%s"', useragent ) args = [ '--cookies-file={c}'.format(c=self._cookie_file), '--ssl-protocol=any', '--ignore-ssl-errors=true', '--web-security=false' ] browser = webdriver.PhantomJS( desired_capabilities=dcap, service_args=args ) browser.set_window_size(1024, 768) else: raise SystemExit( "ERROR: browser type must be one of 'firefox', 'chrome', " "'chrome-headless' or 'phantomjs', not '{b}'".format( b=browser_name ) ) logger.debug("returning browser") return browser
[docs] def doc_readystate_is_complete(self, foo): """ return true if document is ready/complete, false otherwise """ result_str = self.browser.execute_script("return document.readyState") if result_str == "complete": return True return False
[docs] def jquery_finished(self, foo): """ return true if jQuery.active == 0 else false """ active = self.browser.execute_script("return jQuery.active") if active == 0: return True return False
[docs] def wait_for_ajax_load(self, timeout=20): """ Function to wait for an ajax event to finish and trigger page load, like the Janrain login form. Pieced together from http://stackoverflow.com/a/15791319 timeout is in seconds """ WebDriverWait(self.browser, timeout).until( self.doc_readystate_is_complete ) return True