#!/usr/bin/env python
# -*- coding: utf-8 -*-
+import contextlib
import ctypes
import datetime
import email.utils
import re
import ssl
import socket
+import struct
import subprocess
import sys
import traceback
except NameError:
compat_chr = chr
+try:
+ from xml.etree.ElementTree import ParseError as compat_xml_parse_error
+except ImportError: # Python 2.6
+ from xml.parsers.expat import ExpatError as compat_xml_parse_error
+
def compat_ord(c):
if type(c) is int: return c
else: return ord(c)
def find_xpath_attr(node, xpath, key, val):
""" Find the xpath xpath[@key=val] """
assert re.match(r'^[a-zA-Z]+$', key)
- assert re.match(r'^[a-zA-Z0-9@\s]*$', val)
+ assert re.match(r'^[a-zA-Z0-9@\s:._]*$', val)
expr = xpath + u"[@%s='%s']" % (key, val)
return node.find(expr)
else:
https_request = http_request
https_response = http_response
+
def unified_strdate(date_str):
"""Return a string with the date in the format YYYYMMDD"""
upload_date = None
#Replace commas
- date_str = date_str.replace(',',' ')
+ date_str = date_str.replace(',', ' ')
# %z (UTC offset) is only supported in python>=3.2
- date_str = re.sub(r' (\+|-)[\d]*$', '', date_str)
+ date_str = re.sub(r' ?(\+|-)[0-9]{2}:?[0-9]{2}$', '', date_str)
format_expressions = [
'%d %B %Y',
+ '%d %b %Y',
'%B %d %Y',
'%b %d %Y',
'%Y-%m-%d',
'%Y-%m-%dT%H:%M:%S.%fZ',
'%Y-%m-%dT%H:%M:%S.%f0Z',
'%Y-%m-%dT%H:%M:%S',
+ '%Y-%m-%dT%H:%M:%S.%f',
+ '%Y-%m-%dT%H:%M',
]
for expression in format_expressions:
try:
upload_date = datetime.datetime.strptime(date_str, expression).strftime('%Y%m%d')
- except:
+ except ValueError:
pass
if upload_date is None:
timetuple = email.utils.parsedate_tz(date_str)
return None
-def fix_xml_all_ampersand(xml_str):
+def fix_xml_ampersands(xml_str):
"""Replace all the '&' by '&' in XML"""
- return xml_str.replace(u'&', u'&')
+ return re.sub(
+ r'&(?!amp;|lt;|gt;|apos;|quot;|#x[0-9a-fA-F]{,4};|#[0-9]{,4};)',
+ u'&',
+ xml_str)
def setproctitle(title):
return "HEAD"
-def int_or_none(v):
- return v if v is None else int(v)
+def int_or_none(v, scale=1):
+ return v if v is None else (int(v) // scale)
def parse_duration(s):
return None
m = re.match(
- r'(?:(?:(?P<hours>[0-9]+):)?(?P<mins>[0-9]+):)?(?P<secs>[0-9]+)$', s)
+ r'(?:(?:(?P<hours>[0-9]+)[:h])?(?P<mins>[0-9]+)[:m])?(?P<secs>[0-9]+)s?$', s)
if not m:
return None
res = int(m.group('secs'))
self._pagefunc = pagefunc
self._pagesize = pagesize
+ def __len__(self):
+ # This is only useful for tests
+ return len(self.getslice())
+
def getslice(self, start=0, end=None):
res = []
for pagenum in itertools.count(start // self._pagesize):
if end == nextfirstid:
break
return res
+
+
+def uppercase_escape(s):
+ return re.sub(
+ r'\\U([0-9a-fA-F]{8})',
+ lambda m: compat_chr(int(m.group(1), base=16)), s)
+
+try:
+ struct.pack(u'!I', 0)
+except TypeError:
+ # In Python 2.6 (and some 2.7 versions), struct requires a bytes argument
+ def struct_pack(spec, *args):
+ if isinstance(spec, compat_str):
+ spec = spec.encode('ascii')
+ return struct.pack(spec, *args)
+
+ def struct_unpack(spec, *args):
+ if isinstance(spec, compat_str):
+ spec = spec.encode('ascii')
+ return struct.unpack(spec, *args)
+else:
+ struct_pack = struct.pack
+ struct_unpack = struct.unpack
+
+
+def read_batch_urls(batch_fd):
+ def fixup(url):
+ if not isinstance(url, compat_str):
+ url = url.decode('utf-8', 'replace')
+ BOM_UTF8 = u'\xef\xbb\xbf'
+ if url.startswith(BOM_UTF8):
+ url = url[len(BOM_UTF8):]
+ url = url.strip()
+ if url.startswith(('#', ';', ']')):
+ return False
+ return url
+
+ with contextlib.closing(batch_fd) as fd:
+ return [url for url in map(fixup, fd) if url]