"""
The latest version of this package is available at:
<http://github.com/jantman/biweeklybudget>
################################################################################
Copyright 2016 Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>
This file is part of biweeklybudget, also known as biweeklybudget.
biweeklybudget is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
biweeklybudget is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with biweeklybudget. If not, see <http://www.gnu.org/licenses/>.
The Copyright and Authors attributions contained herein may not be removed or
otherwise altered, except to add the Author attribution of a contributor to
this work. (Additional Terms pursuant to Section 7b of the AGPL v3)
################################################################################
While not legally required, I sincerely request that anyone who finds
bugs please submit them at <https://github.com/jantman/biweeklybudget> or
to me via email, and that you send any contributions or improvements
either as a pull request on GitHub, or to me via email.
################################################################################
AUTHORS:
Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>
################################################################################
"""
import os
import logging
import codecs
import urllib
import json
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
logger = logging.getLogger(__name__)
[docs]class ScreenScraper(object):
"""
Base class for screen-scraping bank/financial websites.
"""
def __init__(self, savedir='./', screenshot=False):
"""
Initialize ScreenScraper.
:param savedir: directory to save OFX in
:type savedir: str
:param screenshot: whether or not to take screenshots throughout the
process
:type screenshot: bool
"""
self._savedir = os.path.abspath(os.path.expanduser(savedir))
if not os.path.exists(self._savedir):
os.makedirs(self._savedir)
self._cookie_file = os.path.join(self._savedir, 'cookies.txt')
logger.debug('Using savedir: %s', self._savedir)
self._screenshot_num = 1
self._screenshot = screenshot
if self._screenshot:
logger.warning("screenshotting all actions")
self.user_agent = 'Mozilla/5.0 (X11; Linux x86_64; rv:33.0) ' \
'Gecko/20100101 Firefox/33.0'
[docs] def load_cookies(self, cookie_file):
"""
Load cookies from a JSON cookie file on disk. This file is not the
format used natively by PhantomJS, but rather the JSON-serialized
representation of the dict returned by
:py:meth:`selenium.webdriver.remote.webdriver.WebDriver.get_cookies`.
Cookies are loaded via
:py:meth:`selenium.webdriver.remote.webdriver.WebDriver.add_cookie`
:param cookie_file: path to the cookie file on disk
:type cookie_file: str
"""
if not os.path.exists(cookie_file):
logger.warning('Cookie file does not exist: %s', cookie_file)
return
logger.info('Loading cookies from: %s', cookie_file)
with open(cookie_file, 'r') as fh:
cookies = json.loads(fh.read())
count = 0
for c in cookies:
try:
self.browser.add_cookie(c)
count += 1
except Exception as ex:
logger.info('Error loading cookie %s: %s', c, ex)
logger.debug('Loaded %d of %d cookies', count, len(cookies))
[docs] def save_cookies(self, cookie_file):
"""
Save cookies to a JSON cookie file on disk. This file is not the
format used natively by PhantomJS, but rather the JSON-serialized
representation of the dict returned by
:py:meth:`selenium.webdriver.remote.webdriver.WebDriver.get_cookies`.
:param cookie_file: path to the cookie file on disk
:type cookie_file: str
"""
cookies = self.browser.get_cookies()
raw = json.dumps(cookies)
with open(cookie_file, 'w') as fh:
fh.write(raw)
logger.info('Wrote %d cookies to: %s', len(cookies), cookie_file)
[docs] def do_screenshot(self):
"""take a debug screenshot"""
if not self._screenshot:
return
fname = os.path.join(
os.getcwd(), '{n}.png'.format(n=self._screenshot_num)
)
self.browser.get_screenshot_as_file(fname)
logger.debug(
"Screenshot: {f} of: {s}".format(
f=fname,
s=self.browser.current_url
)
)
self._screenshot_num += 1
[docs] def error_screenshot(self, fname=None):
if fname is None:
fname = os.path.join(os.getcwd(), 'webdriver_fail.png')
self.browser.get_screenshot_as_file(fname)
logger.error("Screenshot saved to: {s}".format(s=fname))
logger.error("Page title: %s", self.browser.title)
html_path = os.path.join(os.getcwd(), 'webdriver_fail.html')
source = self.browser.execute_script(
"return document.getElementsByTagName('html')[0].innerHTML"
)
with codecs.open(html_path, 'w', 'utf-8') as fh:
fh.write(source)
logger.error('Page source saved to: %s', html_path)
[docs] def xhr_get_url(self, url):
""" use JS to download a given URL, return its contents """
script = 'var xhr = new XMLHttpRequest(); '
script += 'var jantman_dl_response = null; '
script += 'xhr.open("GET", "{url}", false); '.format(url=url)
script += 'xhr.send(null); '
script += 'jantman_dl_response = xhr.response; '
script += 'return jantman_dl_response;'
logger.debug("executing in browser: {s}".format(s=script))
res = self.browser.execute_script(script)
logger.debug("got {c} characters of return value from script".format(
c=len(res)))
return res
[docs] def xhr_post_urlencoded(self, url, data, headers={}):
""" use JS to download a given URL, return its contents """
if not isinstance(data, type('')) and not isinstance(data, type(u'')):
data = urllib.urlencode(data)
script = 'var xhr = new XMLHttpRequest(); '
script += 'var jantman_dl_response = null; '
script += 'xhr.open("POST", "{url}", false); '.format(url=url)
if isinstance(headers, type({})):
headers["Content-type"] = "application/x-www-form-urlencoded"
for k, v in headers.items():
script += 'xhr.setRequestHeader("%s", "%s"); ' % (k, v)
else:
for item in headers:
script += 'xhr.setRequestHeader("%s", "%s"); ' % (
item[0], item[1]
)
script += 'xhr.send("{p}"); '.format(p=data)
script += 'jantman_dl_response = { resp: xhr.response, ' \
'respText: xhr.responseText, status: xhr.status, ' \
'headers: xhr.getAllResponseHeaders() }; '
script += 'return JSON.stringify(jantman_dl_response);'
logger.debug("executing in browser: %s", script)
res = self.browser.execute_script(script)
j = json.loads(res)
logger.debug("Script returned %d length result (status %s; "
"headers: %s)", len(j['respText']),
j['status'],
j['headers'].replace("\r", "").replace("\n", "; "))
return j['respText']
[docs] def get_browser(self, browser_name):
"""get a webdriver browser instance """
if browser_name == 'firefox':
logger.debug("getting Firefox browser (local)")
if 'DISPLAY' not in os.environ:
logger.debug("exporting DISPLAY=:0")
os.environ['DISPLAY'] = ":0"
browser = webdriver.Firefox()
elif browser_name == 'chrome':
logger.debug("getting Chrome browser (local)")
browser = webdriver.Chrome()
elif browser_name == 'phantomjs':
logger.debug("getting PhantomJS browser (local)")
dcap = dict(DesiredCapabilities.PHANTOMJS)
dcap["phantomjs.page.settings.userAgent"] = self.user_agent
args = [
'--cookies-file={c}'.format(c=self._cookie_file),
'--ssl-protocol=any',
'--ignore-ssl-errors=true',
'--web-security=false'
]
browser = webdriver.PhantomJS(
desired_capabilities=dcap, service_args=args
)
browser.set_window_size(1024, 768)
else:
raise SystemExit(
"ERROR: browser type must be one of 'firefox', 'chrome' or "
"'phantomjs', not '{b}'".format(b=browser_name))
logger.debug("returning browser")
return browser
[docs] def doc_readystate_is_complete(self, foo):
""" return true if document is ready/complete, false otherwise """
result_str = self.browser.execute_script("return document.readyState")
if result_str == "complete":
return True
return False
[docs] def jquery_finished(self, foo):
""" return true if jQuery.active == 0 else false """
active = self.browser.execute_script("return jQuery.active")
if active == 0:
return True
return False
[docs] def wait_for_ajax_load(self, timeout=20):
"""
Function to wait for an ajax event to finish and trigger page load,
like the Janrain login form.
Pieced together from
http://stackoverflow.com/a/15791319
timeout is in seconds
"""
WebDriverWait(self.browser, timeout).until(
self.doc_readystate_is_complete
)
return True