+ def _signature_cache_id(self, example_sig):
+ """ Return a string representation of a signature """
+ return u'.'.join(compat_str(len(part)) for part in example_sig.split('.'))
+
+ def _extract_signature_function(self, video_id, player_url, example_sig):
+ id_m = re.match(
+ r'.*-(?P<id>[a-zA-Z0-9_-]+)(?:/watch_as3|/html5player)?\.(?P<ext>[a-z]+)$',
+ player_url)
+ if not id_m:
+ raise ExtractorError('Cannot identify player %r' % player_url)
+ player_type = id_m.group('ext')
+ player_id = id_m.group('id')
+
+ # Read from filesystem cache
+ func_id = '%s_%s_%s' % (
+ player_type, player_id, self._signature_cache_id(example_sig))
+ assert os.path.basename(func_id) == func_id
+ cache_dir = get_cachedir(self._downloader.params)
+
+ cache_enabled = cache_dir is not None
+ if cache_enabled:
+ cache_fn = os.path.join(os.path.expanduser(cache_dir),
+ u'youtube-sigfuncs',
+ func_id + '.json')
+ try:
+ with io.open(cache_fn, 'r', encoding='utf-8') as cachef:
+ cache_spec = json.load(cachef)
+ return lambda s: u''.join(s[i] for i in cache_spec)
+ except IOError:
+ pass # No cache available
+
+ if player_type == 'js':
+ code = self._download_webpage(
+ player_url, video_id,
+ note=u'Downloading %s player %s' % (player_type, player_id),
+ errnote=u'Download of %s failed' % player_url)
+ res = self._parse_sig_js(code)
+ elif player_type == 'swf':
+ urlh = self._request_webpage(
+ player_url, video_id,
+ note=u'Downloading %s player %s' % (player_type, player_id),
+ errnote=u'Download of %s failed' % player_url)
+ code = urlh.read()
+ res = self._parse_sig_swf(code)
+ else:
+ assert False, 'Invalid player type %r' % player_type
+
+ if cache_enabled:
+ try:
+ test_string = u''.join(map(compat_chr, range(len(example_sig))))
+ cache_res = res(test_string)
+ cache_spec = [ord(c) for c in cache_res]
+ try:
+ os.makedirs(os.path.dirname(cache_fn))
+ except OSError as ose:
+ if ose.errno != errno.EEXIST:
+ raise
+ write_json_file(cache_spec, cache_fn)
+ except Exception:
+ tb = traceback.format_exc()
+ self._downloader.report_warning(
+ u'Writing cache to %r failed: %s' % (cache_fn, tb))
+
+ return res
+
+ def _print_sig_code(self, func, example_sig):
+ def gen_sig_code(idxs):
+ def _genslice(start, end, step):
+ starts = u'' if start == 0 else str(start)
+ ends = (u':%d' % (end+step)) if end + step >= 0 else u':'
+ steps = u'' if step == 1 else (u':%d' % step)
+ return u's[%s%s%s]' % (starts, ends, steps)
+
+ step = None
+ start = '(Never used)' # Quelch pyflakes warnings - start will be
+ # set as soon as step is set
+ for i, prev in zip(idxs[1:], idxs[:-1]):
+ if step is not None:
+ if i - prev == step:
+ continue
+ yield _genslice(start, prev, step)
+ step = None
+ continue
+ if i - prev in [-1, 1]:
+ step = i - prev
+ start = prev
+ continue
+ else:
+ yield u's[%d]' % prev
+ if step is None:
+ yield u's[%d]' % i
+ else:
+ yield _genslice(start, i, step)
+
+ test_string = u''.join(map(compat_chr, range(len(example_sig))))
+ cache_res = func(test_string)
+ cache_spec = [ord(c) for c in cache_res]
+ expr_code = u' + '.join(gen_sig_code(cache_spec))
+ signature_id_tuple = '(%s)' % (
+ ', '.join(compat_str(len(p)) for p in example_sig.split('.')))
+ code = (u'if tuple(len(p) for p in s.split(\'.\')) == %s:\n'
+ u' return %s\n') % (signature_id_tuple, expr_code)
+ self.to_screen(u'Extracted signature function:\n' + code)
+
+ def _parse_sig_js(self, jscode):
+ funcname = self._search_regex(
+ r'signature=([$a-zA-Z]+)', jscode,
+ u'Initial JS player signature function name')
+
+ jsi = JSInterpreter(jscode)
+ initial_function = jsi.extract_function(funcname)
+ return lambda s: initial_function([s])
+
+ def _parse_sig_swf(self, file_contents):
+ swfi = SWFInterpreter(file_contents)
+ TARGET_CLASSNAME = u'SignatureDecipher'
+ searched_class = swfi.extract_class(TARGET_CLASSNAME)
+ initial_function = swfi.extract_function(searched_class, u'decipher')
+ return lambda s: initial_function([s])
+
+ def _decrypt_signature(self, s, video_id, player_url, age_gate=False):