import os
import logging
import codecs
import urllib
import json

from selenium import webdriver
from import WebDriverWait
from import Options
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities

logger = logging.getLogger(__name__)

[docs]class ScreenScraper(object): """ Base class for screen-scraping bank/financial websites. """ def __init__(self, savedir='./', screenshot=False): """ Initialize ScreenScraper. :param savedir: directory to save OFX in :type savedir: str :param screenshot: whether or not to take screenshots throughout the process :type screenshot: bool """ self._savedir = os.path.abspath(os.path.expanduser(savedir)) if not os.path.exists(self._savedir): os.makedirs(self._savedir) self._cookie_file = os.path.join(self._savedir, 'cookies.txt') logger.debug('Using savedir: %s', self._savedir) self._screenshot_num = 1 self._screenshot = screenshot if self._screenshot: logger.warning("screenshotting all actions") self.user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36' \ ' (KHTML, like Gecko) Chrome/62.0.3202.62 ' \ 'Safari/537.36'
[docs] def load_cookies(self, cookie_file): """ Load cookies from a JSON cookie file on disk. This file is not the format used natively by PhantomJS, but rather the JSON-serialized representation of the dict returned by :py:meth:`selenium.webdriver.remote.webdriver.WebDriver.get_cookies`. Cookies are loaded via :py:meth:`selenium.webdriver.remote.webdriver.WebDriver.add_cookie` :param cookie_file: path to the cookie file on disk :type cookie_file: str """ if not os.path.exists(cookie_file): logger.warning('Cookie file does not exist: %s', cookie_file) return'Loading cookies from: %s', cookie_file) with open(cookie_file, 'r') as fh: cookies = json.loads( count = 0 for c in cookies: try: self.browser.add_cookie(c) count += 1 except Exception as ex:'Error loading cookie %s: %s', c, ex) logger.debug('Loaded %d of %d cookies', count, len(cookies))
[docs] def save_cookies(self, cookie_file): """ Save cookies to a JSON cookie file on disk. This file is not the format used natively by PhantomJS, but rather the JSON-serialized representation of the dict returned by :py:meth:`selenium.webdriver.remote.webdriver.WebDriver.get_cookies`. :param cookie_file: path to the cookie file on disk :type cookie_file: str """ cookies = self.browser.get_cookies() raw = json.dumps(cookies) with open(cookie_file, 'w') as fh: fh.write(raw)'Wrote %d cookies to: %s', len(cookies), cookie_file)
[docs] def do_screenshot(self): """take a debug screenshot""" if not self._screenshot: return fname = os.path.join( os.getcwd(), '{n}.png'.format(n=self._screenshot_num) ) self._pre_screenshot() self.browser.get_screenshot_as_file(fname) self._post_screenshot() logger.debug( "Screenshot: {f} of: {s}".format( f=fname, s=self.browser.current_url ) ) self._screenshot_num += 1
[docs] def error_screenshot(self, fname=None): if fname is None: fname = os.path.join(os.getcwd(), 'webdriver_fail.png') self._pre_screenshot() self.browser.get_screenshot_as_file(fname) self._post_screenshot() logger.error("Screenshot saved to: {s}".format(s=fname)) logger.error("Page title: %s", self.browser.title) html_path = os.path.join(os.getcwd(), 'webdriver_fail.html') source = self.browser.execute_script( "return document.getElementsByTagName('html')[0].innerHTML" ) with, 'w', 'utf-8') as fh: fh.write(source) logger.error('Page source saved to: %s', html_path)
[docs] def _pre_screenshot(self): if not self._browser_name.startswith('chrome'): return height = self.browser.execute_script( "return Math.max(document.body.scrollHeight, " "document.body.offsetHeight, document.documentElement." "clientHeight, document.documentElement.scrollHeight, " "document.documentElement.offsetHeight);" ) height += 100'Resizing browser to %d high', height) self.browser.set_window_size(1920, height)
[docs] def _post_screenshot(self): if not self._browser_name.startswith('chrome'): return self.browser.set_window_size(1920, 1080)
[docs] def xhr_get_url(self, url): """ use JS to download a given URL, return its contents """ script = 'var xhr = new XMLHttpRequest(); ' script += 'var jantman_dl_response = null; ' script += '"GET", "{url}", false); '.format(url=url) script += 'xhr.send(null); ' script += 'jantman_dl_response = xhr.response; ' script += 'return jantman_dl_response;' logger.debug("executing in browser: {s}".format(s=script)) res = self.browser.execute_script(script) logger.debug("got {c} characters of return value from script".format( c=len(res))) return res
[docs] def xhr_post_urlencoded(self, url, data, headers={}): """ use JS to download a given URL, return its contents """ if not isinstance(data, type('')) and not isinstance(data, type(u'')): data = urllib.urlencode(data) script = 'var xhr = new XMLHttpRequest(); ' script += 'var jantman_dl_response = null; ' script += '"POST", "{url}", false); '.format(url=url) if isinstance(headers, type({})): headers["Content-type"] = "application/x-www-form-urlencoded" for k, v in headers.items(): script += 'xhr.setRequestHeader("%s", "%s"); ' % (k, v) else: for item in headers: script += 'xhr.setRequestHeader("%s", "%s"); ' % ( item[0], item[1] ) script += 'xhr.send("{p}"); '.format(p=data) script += 'jantman_dl_response = { resp: xhr.response, ' \ 'respText: xhr.responseText, status: xhr.status, ' \ 'headers: xhr.getAllResponseHeaders() }; ' script += 'return JSON.stringify(jantman_dl_response);' logger.debug("executing in browser: %s", script) res = self.browser.execute_script(script) j = json.loads(res) logger.debug("Script returned %d length result (status %s; " "headers: %s)", len(j['respText']), j['status'], j['headers'].replace("\r", "").replace("\n", "; ")) return j['respText']
[docs] def get_browser(self, browser_name): """get a webdriver browser instance """ self._browser_name = browser_name if browser_name == 'firefox': logger.debug("getting Firefox browser (local)") if 'DISPLAY' not in os.environ: logger.debug("exporting DISPLAY=:0") os.environ['DISPLAY'] = ":0" browser = webdriver.Firefox() elif browser_name == 'chrome': logger.debug("getting Chrome browser (local)") browser = webdriver.Chrome() browser.set_window_size(1920, 1080) browser.implicitly_wait(2) elif browser_name == 'chrome-headless': logger.debug('getting Chrome browser (local) with --headless') chrome_options = Options() chrome_options.add_argument("--headless") browser = webdriver.Chrome(chrome_options=chrome_options) browser.set_window_size(1920, 1080) browser.implicitly_wait(2) elif browser_name == 'phantomjs': logger.debug("getting PhantomJS browser (local)") dcap = dict(DesiredCapabilities.PHANTOMJS) dcap[""] = self.user_agent args = [ '--cookies-file={c}'.format(c=self._cookie_file), '--ssl-protocol=any', '--ignore-ssl-errors=true', '--web-security=false' ] browser = webdriver.PhantomJS( desired_capabilities=dcap, service_args=args ) browser.set_window_size(1024, 768) else: raise SystemExit( "ERROR: browser type must be one of 'firefox', 'chrome', " "'chrome-headless' or 'phantomjs', not '{b}'".format( b=browser_name ) ) logger.debug("returning browser") return browser
[docs] def doc_readystate_is_complete(self, foo): """ return true if document is ready/complete, false otherwise """ result_str = self.browser.execute_script("return document.readyState") if result_str == "complete": return True return False
[docs] def jquery_finished(self, foo): """ return true if == 0 else false """ active = self.browser.execute_script("return") if active == 0: return True return False
[docs] def wait_for_ajax_load(self, timeout=20): """ Function to wait for an ajax event to finish and trigger page load, like the Janrain login form. Pieced together from timeout is in seconds """ WebDriverWait(self.browser, timeout).until( self.doc_readystate_is_complete ) return True