encodeFilename,
escape_rfc3986,
escape_url,
+ extract_attributes,
ExtractorError,
find_xpath_attr,
fix_xml_ampersands,
orderedSet,
parse_duration,
parse_filesize,
+ parse_count,
parse_iso8601,
read_batch_urls,
sanitize_filename,
lowercase_escape,
url_basename,
urlencode_postdata,
+ update_url_query,
version_tuple,
xpath_with_ns,
xpath_element,
cli_bool_option,
)
from youtube_dl.compat import (
+ compat_chr,
compat_etree_fromstring,
+ compat_urlparse,
+ compat_parse_qs,
)
data = urlencode_postdata({'username': 'foo@bar.com', 'password': '1234'})
self.assertTrue(isinstance(data, bytes))
+ def test_update_url_query(self):
+ def query_dict(url):
+ return compat_parse_qs(compat_urlparse.urlparse(url).query)
+ self.assertEqual(query_dict(update_url_query(
+ 'http://example.com/path', {'quality': ['HD'], 'format': ['mp4']})),
+ query_dict('http://example.com/path?quality=HD&format=mp4'))
+ self.assertEqual(query_dict(update_url_query(
+ 'http://example.com/path', {'system': ['LINUX', 'WINDOWS']})),
+ query_dict('http://example.com/path?system=LINUX&system=WINDOWS'))
+ self.assertEqual(query_dict(update_url_query(
+ 'http://example.com/path', {'fields': 'id,formats,subtitles'})),
+ query_dict('http://example.com/path?fields=id,formats,subtitles'))
+ self.assertEqual(query_dict(update_url_query(
+ 'http://example.com/path', {'fields': ('id,formats,subtitles', 'thumbnails')})),
+ query_dict('http://example.com/path?fields=id,formats,subtitles&fields=thumbnails'))
+ self.assertEqual(query_dict(update_url_query(
+ 'http://example.com/path?manifest=f4m', {'manifest': []})),
+ query_dict('http://example.com/path'))
+ self.assertEqual(query_dict(update_url_query(
+ 'http://example.com/path?system=LINUX&system=WINDOWS', {'system': 'LINUX'})),
+ query_dict('http://example.com/path?system=LINUX'))
+ self.assertEqual(query_dict(update_url_query(
+ 'http://example.com/path', {'fields': b'id,formats,subtitles'})),
+ query_dict('http://example.com/path?fields=id,formats,subtitles'))
+ self.assertEqual(query_dict(update_url_query(
+ 'http://example.com/path', {'width': 1080, 'height': 720})),
+ query_dict('http://example.com/path?width=1080&height=720'))
+ self.assertEqual(query_dict(update_url_query(
+ 'http://example.com/path', {'bitrate': 5020.43})),
+ query_dict('http://example.com/path?bitrate=5020.43'))
+ self.assertEqual(query_dict(update_url_query(
+ 'http://example.com/path', {'test': '第二行тест'})),
+ query_dict('http://example.com/path?test=%E7%AC%AC%E4%BA%8C%E8%A1%8C%D1%82%D0%B5%D1%81%D1%82'))
+
def test_dict_get(self):
FALSE_VALUES = {
'none': None,
on = js_to_json('{"abc": "def",}')
self.assertEqual(json.loads(on), {'abc': 'def'})
+ def test_extract_attributes(self):
+ self.assertEqual(extract_attributes('<e x="y">'), {'x': 'y'})
+ self.assertEqual(extract_attributes("<e x='y'>"), {'x': 'y'})
+ self.assertEqual(extract_attributes('<e x=y>'), {'x': 'y'})
+ self.assertEqual(extract_attributes('<e x="a \'b\' c">'), {'x': "a 'b' c"})
+ self.assertEqual(extract_attributes('<e x=\'a "b" c\'>'), {'x': 'a "b" c'})
+ self.assertEqual(extract_attributes('<e x="y">'), {'x': 'y'})
+ self.assertEqual(extract_attributes('<e x="y">'), {'x': 'y'})
+ self.assertEqual(extract_attributes('<e x="&">'), {'x': '&'}) # XML
+ self.assertEqual(extract_attributes('<e x=""">'), {'x': '"'})
+ self.assertEqual(extract_attributes('<e x="£">'), {'x': '£'}) # HTML 3.2
+ self.assertEqual(extract_attributes('<e x="λ">'), {'x': 'λ'}) # HTML 4.0
+ self.assertEqual(extract_attributes('<e x="&foo">'), {'x': '&foo'})
+ self.assertEqual(extract_attributes('<e x="\'">'), {'x': "'"})
+ self.assertEqual(extract_attributes('<e x=\'"\'>'), {'x': '"'})
+ self.assertEqual(extract_attributes('<e x >'), {'x': None})
+ self.assertEqual(extract_attributes('<e x=y a>'), {'x': 'y', 'a': None})
+ self.assertEqual(extract_attributes('<e x= y>'), {'x': 'y'})
+ self.assertEqual(extract_attributes('<e x=1 y=2 x=3>'), {'y': '2', 'x': '3'})
+ self.assertEqual(extract_attributes('<e \nx=\ny\n>'), {'x': 'y'})
+ self.assertEqual(extract_attributes('<e \nx=\n"y"\n>'), {'x': 'y'})
+ self.assertEqual(extract_attributes("<e \nx=\n'y'\n>"), {'x': 'y'})
+ self.assertEqual(extract_attributes('<e \nx="\ny\n">'), {'x': '\ny\n'})
+ self.assertEqual(extract_attributes('<e CAPS=x>'), {'caps': 'x'}) # Names lowercased
+ self.assertEqual(extract_attributes('<e x=1 X=2>'), {'x': '2'})
+ self.assertEqual(extract_attributes('<e X=1 x=2>'), {'x': '2'})
+ self.assertEqual(extract_attributes('<e _:funny-name1=1>'), {'_:funny-name1': '1'})
+ self.assertEqual(extract_attributes('<e x="Fáilte 世界 \U0001f600">'), {'x': 'Fáilte 世界 \U0001f600'})
+ self.assertEqual(extract_attributes('<e x="décomposé">'), {'x': 'décompose\u0301'})
+ # "Narrow" Python builds don't support unicode code points outside BMP.
+ try:
+ compat_chr(0x10000)
+ supports_outside_bmp = True
+ except ValueError:
+ supports_outside_bmp = False
+ if supports_outside_bmp:
+ self.assertEqual(extract_attributes('<e x="Smile 😀!">'), {'x': 'Smile \U0001f600!'})
+
def test_clean_html(self):
self.assertEqual(clean_html('a:\nb'), 'a: b')
self.assertEqual(clean_html('a:\n "b"'), 'a: "b"')
self.assertEqual(parse_filesize('1.2Tb'), 1200000000000)
self.assertEqual(parse_filesize('1,24 KB'), 1240)
+ def test_parse_count(self):
+ self.assertEqual(parse_count(None), None)
+ self.assertEqual(parse_count(''), None)
+ self.assertEqual(parse_count('0'), 0)
+ self.assertEqual(parse_count('1000'), 1000)
+ self.assertEqual(parse_count('1.000'), 1000)
+ self.assertEqual(parse_count('1.1k'), 1100)
+ self.assertEqual(parse_count('1.1kk'), 1100000)
+
def test_version_tuple(self):
self.assertEqual(version_tuple('1'), (1,))
self.assertEqual(version_tuple('10.23.344'), (10, 23, 344))
import zlib
from .compat import (
+ compat_HTMLParser,
compat_basestring,
compat_chr,
compat_etree_fromstring,
return unescapeHTML(res)
+ class HTMLAttributeParser(compat_HTMLParser):
+ """Trivial HTML parser to gather the attributes for a single element"""
+ def __init__(self):
+ self.attrs = { }
+ compat_HTMLParser.__init__(self)
+
+ def handle_starttag(self, tag, attrs):
+ self.attrs = dict(attrs)
+
+ def extract_attributes(html_element):
+ """Given a string for an HTML element such as
+ <el
+ a="foo" B="bar" c="&98;az" d=boz
+ empty= noval entity="&"
+ sq='"' dq="'"
+ >
+ Decode and return a dictionary of attributes.
+ {
+ 'a': 'foo', 'b': 'bar', c: 'baz', d: 'boz',
+ 'empty': '', 'noval': None, 'entity': '&',
+ 'sq': '"', 'dq': '\''
+ }.
+ NB HTMLParser is stricter in Python 2.6 & 3.2 than in later versions,
+ but the cases in the unit test will work for all of 2.6, 2.7, 3.2-3.5.
+ """
+ parser = HTMLAttributeParser()
+ parser.feed(html_element)
+ parser.close()
+ return parser.attrs
def clean_html(html):
"""Clean an HTML snippet into a readable string"""
if not for_subprocess and sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5:
return s
+ # Jython assumes filenames are Unicode strings though reported as Python 2.x compatible
+ if sys.platform.startswith('java'):
+ return s
+
return s.encode(get_subprocess_encoding(), 'ignore')
raise OSError('Unlocking file failed: %r' % ctypes.FormatError())
else:
- import fcntl
+ # Some platforms, such as Jython, is missing fcntl
+ try:
+ import fcntl
- def _lock_file(f, exclusive):
- fcntl.flock(f, fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH)
+ def _lock_file(f, exclusive):
+ fcntl.flock(f, fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH)
- def _unlock_file(f):
- fcntl.flock(f, fcntl.LOCK_UN)
+ def _unlock_file(f):
+ fcntl.flock(f, fcntl.LOCK_UN)
+ except ImportError:
+ UNSUPPORTED_MSG = 'file locking is not supported on this platform'
+
+ def _lock_file(f, exclusive):
+ raise IOError(UNSUPPORTED_MSG)
+
+ def _unlock_file(f):
+ raise IOError(UNSUPPORTED_MSG)
class locked_file(object):
return '%.2f%s' % (converted, suffix)
+def lookup_unit_table(unit_table, s):
+ units_re = '|'.join(re.escape(u) for u in unit_table)
+ m = re.match(
+ r'(?P<num>[0-9]+(?:[,.][0-9]*)?)\s*(?P<unit>%s)' % units_re, s)
+ if not m:
+ return None
+ num_str = m.group('num').replace(',', '.')
+ mult = unit_table[m.group('unit')]
+ return int(float(num_str) * mult)
+
+
def parse_filesize(s):
if s is None:
return None
'Yb': 1000 ** 8,
}
- units_re = '|'.join(re.escape(u) for u in _UNIT_TABLE)
- m = re.match(
- r'(?P<num>[0-9]+(?:[,.][0-9]*)?)\s*(?P<unit>%s)' % units_re, s)
- if not m:
+ return lookup_unit_table(_UNIT_TABLE, s)
+
+
+def parse_count(s):
+ if s is None:
return None
- num_str = m.group('num').replace(',', '.')
- mult = _UNIT_TABLE[m.group('unit')]
- return int(float(num_str) * mult)
+ s = s.strip()
+
+ if re.match(r'^[\d,.]+$', s):
+ return str_to_int(s)
+
+ _UNIT_TABLE = {
+ 'k': 1000,
+ 'K': 1000,
+ 'm': 1000 ** 2,
+ 'M': 1000 ** 2,
+ 'kk': 1000 ** 2,
+ 'KK': 1000 ** 2,
+ }
+
+ return lookup_unit_table(_UNIT_TABLE, s)
def month_by_name(name):
def setproctitle(title):
assert isinstance(title, compat_str)
+
+ # ctypes in Jython is not complete
+ # http://bugs.jython.org/issue2148
+ if sys.platform.startswith('java'):
+ return
+
try:
libc = ctypes.cdll.LoadLibrary('libc.so.6')
except OSError:
return compat_urllib_parse.urlencode(*args, **kargs).encode('ascii')
+def update_url_query(url, query):
+ parsed_url = compat_urlparse.urlparse(url)
+ qs = compat_parse_qs(parsed_url.query)
+ qs.update(query)
+ qs = encode_dict(qs)
+ return compat_urlparse.urlunparse(parsed_url._replace(
+ query=compat_urllib_parse.urlencode(qs, True)))
+
+
def encode_dict(d, encoding='utf-8'):
def encode(v):
return v.encode(encoding) if isinstance(v, compat_basestring) else v