X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=test%2Ftest_download.py;h=8fccdaf9e079e23de57856457fbc865be844fecd;hb=ff14fc496401b3318339e108f3db2825622c44c3;hp=16f2008094fca751f69ee01a38f16a6fa0c3fb53;hpb=6c5ad80cdcd3f51b61a9d21c55e21d51e6b2f39a;p=youtube-dl diff --git a/test/test_download.py b/test/test_download.py index 16f200809..8fccdaf9e 100644 --- a/test/test_download.py +++ b/test/test_download.py @@ -8,22 +8,22 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from test.helper import ( get_params, - get_testcases, - global_setup, + gettestcases, try_rm, md5, report_warning ) -global_setup() import hashlib import io import json +import re import socket import youtube_dl.YoutubeDL from youtube_dl.utils import ( + compat_http_client, compat_str, compat_urllib_error, compat_HTTPError, @@ -51,7 +51,7 @@ def _file_md5(fn): with open(fn, 'rb') as f: return hashlib.md5(f.read()).hexdigest() -defs = get_testcases() +defs = gettestcases() class TestDownload(unittest.TestCase): @@ -73,9 +73,7 @@ def generator(test_case): if 'playlist' not in test_case: info_dict = test_case.get('info_dict', {}) if not test_case.get('file') and not (info_dict.get('id') and info_dict.get('ext')): - print_skipping('The output file cannot be know, the "file" ' - 'key is missing or the info_dict is incomplete') - return + raise Exception('Test definition incorrect. The output file cannot be known. Are both \'id\' and \'ext\' keys present?') if 'skip' in test_case: print_skipping(test_case['skip']) return @@ -92,7 +90,7 @@ def generator(test_case): def _hook(status): if status['status'] == 'finished': finished_hook_called.add(status['filename']) - ydl.fd.add_progress_hook(_hook) + ydl.add_progress_hook(_hook) def get_tc_filename(tc): return tc.get('file') or ydl.prepare_filename(tc.get('info_dict', {})) @@ -103,7 +101,7 @@ def generator(test_case): tc_filename = get_tc_filename(tc) try_rm(tc_filename) try_rm(tc_filename + '.part') - try_rm(tc_filename + '.info.json') + try_rm(os.path.splitext(tc_filename)[0] + '.info.json') try_rm_tcs_files() try: try_num = 1 @@ -112,7 +110,7 @@ def generator(test_case): ydl.download([test_case['url']]) except (DownloadError, ExtractorError) as err: # Check if the exception is not a network related one - if not err.exc_info[0] in (compat_urllib_error.URLError, socket.timeout, UnavailableVideoError) or (err.exc_info[0] == compat_HTTPError and err.exc_info[1].code == 503): + if not err.exc_info[0] in (compat_urllib_error.URLError, socket.timeout, UnavailableVideoError, compat_http_client.BadStatusLine) or (err.exc_info[0] == compat_HTTPError and err.exc_info[1].code == 503): raise if try_num == RETRIES: @@ -130,26 +128,33 @@ def generator(test_case): if not test_case.get('params', {}).get('skip_download', False): self.assertTrue(os.path.exists(tc_filename), msg='Missing file ' + tc_filename) self.assertTrue(tc_filename in finished_hook_called) - self.assertTrue(os.path.exists(tc_filename + '.info.json')) + info_json_fn = os.path.splitext(tc_filename)[0] + '.info.json' + self.assertTrue(os.path.exists(info_json_fn)) if 'md5' in tc: md5_for_file = _file_md5(tc_filename) self.assertEqual(md5_for_file, tc['md5']) - with io.open(tc_filename + '.info.json', encoding='utf-8') as infof: + with io.open(info_json_fn, encoding='utf-8') as infof: info_dict = json.load(infof) for (info_field, expected) in tc.get('info_dict', {}).items(): - if isinstance(expected, compat_str) and expected.startswith('md5:'): - got = 'md5:' + md5(info_dict.get(info_field)) - else: + if isinstance(expected, compat_str) and expected.startswith('re:'): got = info_dict.get(info_field) - self.assertEqual(expected, got, - u'invalid value for field %s, expected %r, got %r' % (info_field, expected, got)) + match_str = expected[len('re:'):] + match_rex = re.compile(match_str) - # If checkable fields are missing from the test case, print the info_dict - test_info_dict = dict((key, value if not isinstance(value, compat_str) or len(value) < 250 else 'md5:' + md5(value)) - for key, value in info_dict.items() - if value and key in ('title', 'description', 'uploader', 'upload_date', 'uploader_id', 'location')) - if not all(key in tc.get('info_dict', {}).keys() for key in test_info_dict.keys()): - sys.stderr.write(u'\n"info_dict": ' + json.dumps(test_info_dict, ensure_ascii=False, indent=2) + u'\n') + self.assertTrue( + isinstance(got, compat_str) and match_rex.match(got), + u'field %s (value: %r) should match %r' % (info_field, got, match_str)) + elif isinstance(expected, type): + got = info_dict.get(info_field) + self.assertTrue(isinstance(got, expected), + u'Expected type %r, but got value %r of type %r' % (expected, got, type(got))) + else: + if isinstance(expected, compat_str) and expected.startswith('md5:'): + got = 'md5:' + md5(info_dict.get(info_field)) + else: + got = info_dict.get(info_field) + self.assertEqual(expected, got, + u'invalid value for field %s, expected %r, got %r' % (info_field, expected, got)) # Check for the presence of mandatory fields for key in ('id', 'url', 'title', 'ext'): @@ -157,6 +162,13 @@ def generator(test_case): # Check for mandatory fields that are automatically set by YoutubeDL for key in ['webpage_url', 'extractor', 'extractor_key']: self.assertTrue(info_dict.get(key), u'Missing field: %s' % key) + + # If checkable fields are missing from the test case, print the info_dict + test_info_dict = dict((key, value if not isinstance(value, compat_str) or len(value) < 250 else 'md5:' + md5(value)) + for key, value in info_dict.items() + if value and key in ('title', 'description', 'uploader', 'upload_date', 'timestamp', 'uploader_id', 'location')) + if not all(key in tc.get('info_dict', {}).keys() for key in test_info_dict.keys()): + sys.stderr.write(u'\n"info_dict": ' + json.dumps(test_info_dict, ensure_ascii=False, indent=4) + u'\n') finally: try_rm_tcs_files()