import json
import unittest
import sys
+import hashlib
import socket
# Allow direct execution
proxy_handler = compat_urllib_request.ProxyHandler()
opener = compat_urllib_request.build_opener(proxy_handler, cookie_processor, YoutubeDLHandler())
compat_urllib_request.install_opener(opener)
-socket.setdefaulttimeout(300) # 5 minutes should be enough (famous last words)
class FileDownloader(youtube_dl.FileDownloader):
def __init__(self, *args, **kwargs):
- youtube_dl.FileDownloader.__init__(self, *args, **kwargs)
self.to_stderr = self.to_screen
+ self.processed_info_dicts = []
+ return youtube_dl.FileDownloader.__init__(self, *args, **kwargs)
+ def process_info(self, info_dict):
+ self.processed_info_dicts.append(info_dict)
+ return youtube_dl.FileDownloader.process_info(self, info_dict)
def _file_md5(fn):
with open(fn, 'rb') as f:
with io.open(PARAMETERS_FILE, encoding='utf-8') as pf:
parameters = json.load(pf)
+
class TestDownload(unittest.TestCase):
def setUp(self):
self.parameters = parameters
self.tearDown()
def tearDown(self):
- for test in self.defs:
- fn = test['file']
- if fn and os.path.exists(fn):
- os.remove(fn)
+ for files in [ test['files'] for test in self.defs ]:
+ for fn, md5 in files:
+ if os.path.exists(fn):
+ os.remove(fn)
-def make_test_method(test_case):
+### Dinamically generate tests
+def generator(test_case):
+
def test_template(self):
ie = getattr(youtube_dl.InfoExtractors, test_case['name'] + 'IE')
if not ie._WORKING:
print('Skipping: IE marked as not _WORKING')
return
- if not test_case['file']:
- print('Skipping: No output file specified')
- return
if 'skip' in test_case:
print('Skipping: {0}'.format(test_case['skip']))
return
+
params = dict(self.parameters) # Duplicate it locally
for p in test_case.get('params', {}):
params[p] = test_case['params'][p]
+
fd = FileDownloader(params)
fd.add_info_extractor(ie())
for ien in test_case.get('add_ie', []):
fd.add_info_extractor(getattr(youtube_dl.InfoExtractors, ien + 'IE')())
fd.download([test_case['url']])
- self.assertTrue(os.path.exists(test_case['file']))
- if 'md5' in test_case:
- md5_for_file = _file_md5(test_case['file'])
- self.assertEqual(md5_for_file, test_case['md5'])
- # TODO proper skipping annotations
+ for filename, md5 in test_case['files']:
+ self.assertTrue(os.path.exists(filename))
+ if md5:
+ md5_for_file = _file_md5(filename)
+ self.assertEqual(md5_for_file, md5)
+ info_dict = fd.processed_info_dicts[0]
+ for (info_field, value) in test_case.get('info_dict', {}).items():
+ if value.startswith('md5:'):
+ md5_info_value = hashlib.md5(info_dict.get(info_field, '')).hexdigest()
+ self.assertEqual(value[3:], md5_info_value)
+ else:
+ self.assertEqual(value, info_dict.get(info_field))
+
return test_template
+### And add them to TestDownload
for test_case in defs:
- test_method = make_test_method(test_case)
+ test_method = generator(test_case)
test_method.__name__ = "test_{0}".format(test_case["name"])
setattr(TestDownload, test_method.__name__, test_method)
+ del test_method
if __name__ == '__main__':