from __future__ import unicode_literals
+import base64
import calendar
import codecs
import contextlib
from .compat import (
compat_basestring,
compat_chr,
+ compat_etree_fromstring,
compat_html_entities,
compat_http_client,
compat_kwargs,
'January', 'February', 'March', 'April', 'May', 'June',
'July', 'August', 'September', 'October', 'November', 'December']
+KNOWN_EXTENSIONS = (
+ 'mp4', 'm4a', 'm4p', 'm4b', 'm4r', 'm4v', 'aac',
+ 'flv', 'f4v', 'f4a', 'f4b',
+ 'webm', 'ogg', 'ogv', 'oga', 'ogx', 'spx', 'opus',
+ 'mkv', 'mka', 'mk3d',
+ 'avi', 'divx',
+ 'mov',
+ 'asf', 'wmv', 'wma',
+ '3gp', '3g2',
+ 'mp3',
+ 'flac',
+ 'ape',
+ 'wav',
+ 'f4f', 'f4m', 'm3u8', 'smil')
+
def preferredencoding():
"""Get preferred encoding.
def xpath_element(node, xpath, name=None, fatal=False, default=NO_DEFAULT):
- if sys.version_info < (2, 7): # Crazy 2.6
- xpath = xpath.encode('ascii')
+ def _find_xpath(xpath):
+ if sys.version_info < (2, 7): # Crazy 2.6
+ xpath = xpath.encode('ascii')
+ return node.find(xpath)
+
+ if isinstance(xpath, (str, compat_str)):
+ n = _find_xpath(xpath)
+ else:
+ for xp in xpath:
+ n = _find_xpath(xp)
+ if n is not None:
+ break
- n = node.find(xpath)
if n is None:
if default is not NO_DEFAULT:
return default
if drive_or_unc:
norm_path.pop(0)
sanitized_path = [
- path_part if path_part in ['.', '..'] else re.sub('(?:[/<>:"\\|\\\\?\\*]|\.$)', '#', path_part)
+ path_part if path_part in ['.', '..'] else re.sub('(?:[/<>:"\\|\\\\?\\*]|[\s.]$)', '#', path_part)
for path_part in norm_path]
if drive_or_unc:
sanitized_path.insert(0, drive_or_unc + os.path.sep)
return os.path.join(*sanitized_path)
+# Prepend protocol-less URLs with `http:` scheme in order to mitigate the number of
+# unwanted failures due to missing protocol
+def sanitized_Request(url, *args, **kwargs):
+ return compat_urllib_request.Request(
+ 'http:%s' % url if url.startswith('//') else url, *args, **kwargs)
+
+
def orderedSet(iterable):
""" Remove all duplicates from the input iterable """
res = []
numstr = '0%s' % numstr
else:
base = 10
- return compat_chr(int(numstr, base))
+ # See https://github.com/rg3/youtube-dl/issues/7518
+ try:
+ return compat_chr(int(numstr, base))
+ except ValueError:
+ pass
# Unknown entity in name, return its literal representation
- return ('&%s;' % entity)
+ return '&%s;' % entity
def unescapeHTML(s):
# expected HTTP responses to meet HTTP/1.0 or later (see also
# https://github.com/rg3/youtube-dl/issues/6727)
if sys.version_info < (3, 0):
- kwargs['strict'] = True
+ kwargs[b'strict'] = True
hc = http_class(*args, **kwargs)
source_address = ydl_handler._params.get('source_address')
if source_address is not None:
return hc
+def handle_youtubedl_headers(headers):
+ filtered_headers = headers
+
+ if 'Youtubedl-no-compression' in filtered_headers:
+ filtered_headers = dict((k, v) for k, v in filtered_headers.items() if k.lower() != 'accept-encoding')
+ del filtered_headers['Youtubedl-no-compression']
+
+ return filtered_headers
+
+
class YoutubeDLHandler(compat_urllib_request.HTTPHandler):
"""Handler for HTTP requests and responses.
the standard headers to every HTTP request and handles gzipped and
deflated responses from web servers. If compression is to be avoided in
a particular request, the original request in the program code only has
- to include the HTTP header "Youtubedl-No-Compression", which will be
+ to include the HTTP header "Youtubedl-no-compression", which will be
removed before making the real request.
Part of this code was copied from:
# The dict keys are capitalized because of this bug by urllib
if h.capitalize() not in req.headers:
req.add_header(h, v)
- if 'Youtubedl-no-compression' in req.headers:
- if 'Accept-encoding' in req.headers:
- del req.headers['Accept-encoding']
- del req.headers['Youtubedl-no-compression']
+
+ req.headers = handle_youtubedl_headers(req.headers)
if sys.version_info < (2, 7) and '#' in req.get_full_url():
# Python 2.6 is brain-dead when it comes to fragments
raise original_ioerror
resp = self.addinfourl_wrapper(uncompressed, old_resp.headers, old_resp.url, old_resp.code)
resp.msg = old_resp.msg
+ del resp.headers['Content-encoding']
# deflate
if resp.headers.get('Content-encoding', '') == 'deflate':
gz = io.BytesIO(self.deflate(resp.read()))
resp = self.addinfourl_wrapper(gz, old_resp.headers, old_resp.url, old_resp.code)
resp.msg = old_resp.msg
+ del resp.headers['Content-encoding']
# Percent-encode redirect URL of Location HTTP header to satisfy RFC 3986 (see
# https://github.com/rg3/youtube-dl/issues/6457).
if 300 <= resp.code < 400:
# https://github.com/rg3/youtube-dl/issues/6769).
# In order to at least prevent crashing we will percent encode Set-Cookie
# header before HTTPCookieProcessor starts processing it.
- if sys.version_info < (3, 0) and response.headers:
- for set_cookie_header in ('Set-Cookie', 'Set-Cookie2'):
- set_cookie = response.headers.get(set_cookie_header)
- if set_cookie:
- set_cookie_escaped = '; '.join([
- escape_rfc3986(cookie_attr.strip())
- for cookie_attr in set_cookie.decode('iso-8859-1').split(';')]).encode('iso-8859-1')
- if set_cookie != set_cookie_escaped:
- del response.headers[set_cookie_header]
- response.headers[set_cookie_header] = set_cookie_escaped
+ # if sys.version_info < (3, 0) and response.headers:
+ # for set_cookie_header in ('Set-Cookie', 'Set-Cookie2'):
+ # set_cookie = response.headers.get(set_cookie_header)
+ # if set_cookie:
+ # set_cookie_escaped = compat_urllib_parse.quote(set_cookie, b"%/;:@&=+$,!~*'()?#[] ")
+ # if set_cookie != set_cookie_escaped:
+ # del response.headers[set_cookie_header]
+ # response.headers[set_cookie_header] = set_cookie_escaped
return compat_urllib_request.HTTPCookieProcessor.http_response(self, request, response)
https_request = compat_urllib_request.HTTPCookieProcessor.http_request
if date_str is None:
return None
+ date_str = re.sub(r'\.[0-9]+', '', date_str)
+
if timezone is None:
m = re.search(
- r'(\.[0-9]+)?(?:Z$| ?(?P<sign>\+|-)(?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2})$)',
+ r'(?:Z$| ?(?P<sign>\+|-)(?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2})$)',
date_str)
if not m:
timezone = datetime.timedelta()
timezone = datetime.timedelta(
hours=sign * int(m.group('hours')),
minutes=sign * int(m.group('minutes')))
- date_format = '%Y-%m-%d{0}%H:%M:%S'.format(delimiter)
- dt = datetime.datetime.strptime(date_str, date_format) - timezone
- return calendar.timegm(dt.timetuple())
+ try:
+ date_format = '%Y-%m-%d{0}%H:%M:%S'.format(delimiter)
+ dt = datetime.datetime.strptime(date_str, date_format) - timezone
+ return calendar.timegm(dt.timetuple())
+ except ValueError:
+ pass
def unified_strdate(date_str, day_first=True):
timetuple = email.utils.parsedate_tz(date_str)
if timetuple:
upload_date = datetime.datetime(*timetuple[:6]).strftime('%Y%m%d')
- return upload_date
+ if upload_date is not None:
+ return compat_str(upload_date)
def determine_ext(url, default_ext='unknown_video'):
guess = url.partition('?')[0].rpartition('.')[2]
if re.match(r'^[A-Za-z0-9]+$', guess):
return guess
+ # Try extract ext from URLs like http://example.com/foo/bar.mp4/?download
+ elif guess.rstrip('/') in KNOWN_EXTENSIONS:
+ return guess.rstrip('/')
else:
return default_ext
return s
+def remove_quotes(s):
+ if s is None or len(s) < 2:
+ return s
+ for quote in ('"', "'", ):
+ if s[0] == quote and s[-1] == quote:
+ return s[1:-1]
+ return s
+
+
def url_basename(url):
path = compat_urlparse.urlparse(url).path
return path.strip('/').split('/')[-1]
v = getattr(v, get_attr, None)
if v == '':
v = None
- return default if v is None else (int(v) * invscale // scale)
+ if v is None:
+ return default
+ try:
+ return int(v) * invscale // scale
+ except ValueError:
+ return default
def str_or_none(v, default=None):
def float_or_none(v, scale=1, invscale=1, default=None):
- return default if v is None else (float(v) * invscale / scale)
+ if v is None:
+ return default
+ try:
+ return float(v) * invscale / scale
+ except ValueError:
+ return default
def parse_duration(s):
return compat_urllib_parse.urlencode(*args, **kargs).encode('ascii')
-try:
- etree_iter = xml.etree.ElementTree.Element.iter
-except AttributeError: # Python <=2.6
- etree_iter = lambda n: n.findall('.//*')
-
+def encode_dict(d, encoding='utf-8'):
+ def encode(v):
+ return v.encode(encoding) if isinstance(v, compat_basestring) else v
+ return dict((encode(k), encode(v)) for k, v in d.items())
-def parse_xml(s):
- class TreeBuilder(xml.etree.ElementTree.TreeBuilder):
- def doctype(self, name, pubid, system):
- pass # Ignore doctypes
- parser = xml.etree.ElementTree.XMLParser(target=TreeBuilder())
- kwargs = {'parser': parser} if sys.version_info >= (2, 7) else {}
- tree = xml.etree.ElementTree.XML(s.encode('utf-8'), **kwargs)
- # Fix up XML parser in Python 2.x
- if sys.version_info < (3, 0):
- for n in etree_iter(tree):
- if n.text is not None:
- if not isinstance(n.text, compat_str):
- n.text = n.text.decode('utf-8')
- return tree
+def encode_compat_str(string, encoding=preferredencoding(), errors='strict'):
+ return string if isinstance(string, compat_str) else compat_str(string, encoding, errors)
US_RATINGS = {
if v in ('true', 'false', 'null'):
return v
if v.startswith('"'):
- return v
- if v.startswith("'"):
+ v = re.sub(r"\\'", "'", v[1:-1])
+ elif v.startswith("'"):
v = v[1:-1]
v = re.sub(r"\\\\|\\'|\"", lambda m: {
'\\\\': '\\\\',
return ' '.join(shlex_quote(a) for a in args)
+def error_to_compat_str(err):
+ err_str = str(err)
+ # On python 2 error byte string must be decoded with proper
+ # encoding rather than ascii
+ if sys.version_info[0] < 3:
+ err_str = err_str.decode(preferredencoding())
+ return err_str
+
+
def mimetype2ext(mt):
_, _, res = mt.rpartition('/')
return mimetype2ext(getheader('Content-Type'))
+def encode_data_uri(data, mime_type):
+ return 'data:%s;base64,%s' % (mime_type, base64.b64encode(data).decode('ascii'))
+
+
def age_restricted(content_limit, age_limit):
""" Returns True iff the content should be blocked """
def parse_dfxp_time_expr(time_expr):
if not time_expr:
- return 0.0
+ return
mobj = re.match(r'^(?P<time_offset>\d+(?:\.\d+)?)s?$', time_expr)
if mobj:
return float(mobj.group('time_offset'))
- mobj = re.match(r'^(\d+):(\d\d):(\d\d(?:\.\d+)?)$', time_expr)
+ mobj = re.match(r'^(\d+):(\d\d):(\d\d(?:(?:\.|:)\d+)?)$', time_expr)
if mobj:
- return 3600 * int(mobj.group(1)) + 60 * int(mobj.group(2)) + float(mobj.group(3))
+ return 3600 * int(mobj.group(1)) + 60 * int(mobj.group(2)) + float(mobj.group(3).replace(':', '.'))
def srt_subtitles_timecode(seconds):
return out
- dfxp = xml.etree.ElementTree.fromstring(dfxp_data.encode('utf-8'))
+ dfxp = compat_etree_fromstring(dfxp_data.encode('utf-8'))
out = []
paras = dfxp.findall(_x('.//ttml:p')) or dfxp.findall(_x('.//ttaf1:p')) or dfxp.findall('.//p')
raise ValueError('Invalid dfxp/TTML subtitle')
for para, index in zip(paras, itertools.count(1)):
- begin_time = parse_dfxp_time_expr(para.attrib['begin'])
+ begin_time = parse_dfxp_time_expr(para.attrib.get('begin'))
end_time = parse_dfxp_time_expr(para.attrib.get('end'))
+ dur = parse_dfxp_time_expr(para.attrib.get('dur'))
+ if begin_time is None:
+ continue
if not end_time:
- end_time = begin_time + parse_dfxp_time_expr(para.attrib['dur'])
+ if not dur:
+ continue
+ end_time = begin_time + dur
out.append('%d\n%s --> %s\n%s\n\n' % (
index,
srt_subtitles_timecode(begin_time),