[mixcloud] Use compat_b64decode (closes #15394)
[youtube-dl] / youtube_dl / extractor / mixcloud.py
1 from __future__ import unicode_literals
2
3 import base64
4 import functools
5 import itertools
6 import re
7
8 from .common import InfoExtractor
9 from ..compat import (
10     compat_b64decode,
11     compat_chr,
12     compat_ord,
13     compat_str,
14     compat_urllib_parse_unquote,
15     compat_urlparse,
16     compat_zip
17 )
18 from ..utils import (
19     clean_html,
20     ExtractorError,
21     int_or_none,
22     OnDemandPagedList,
23     str_to_int,
24     try_get,
25     urljoin,
26 )
27
28
29 class MixcloudIE(InfoExtractor):
30     _VALID_URL = r'https?://(?:(?:www|beta|m)\.)?mixcloud\.com/([^/]+)/(?!stream|uploads|favorites|listens|playlists)([^/]+)'
31     IE_NAME = 'mixcloud'
32
33     _TESTS = [{
34         'url': 'http://www.mixcloud.com/dholbach/cryptkeeper/',
35         'info_dict': {
36             'id': 'dholbach-cryptkeeper',
37             'ext': 'm4a',
38             'title': 'Cryptkeeper',
39             'description': 'After quite a long silence from myself, finally another Drum\'n\'Bass mix with my favourite current dance floor bangers.',
40             'uploader': 'Daniel Holbach',
41             'uploader_id': 'dholbach',
42             'thumbnail': r're:https?://.*\.jpg',
43             'view_count': int,
44         },
45     }, {
46         'url': 'http://www.mixcloud.com/gillespeterson/caribou-7-inch-vinyl-mix-chat/',
47         'info_dict': {
48             'id': 'gillespeterson-caribou-7-inch-vinyl-mix-chat',
49             'ext': 'mp3',
50             'title': 'Caribou 7 inch Vinyl Mix & Chat',
51             'description': 'md5:2b8aec6adce69f9d41724647c65875e8',
52             'uploader': 'Gilles Peterson Worldwide',
53             'uploader_id': 'gillespeterson',
54             'thumbnail': 're:https?://.*',
55             'view_count': int,
56         },
57     }, {
58         'url': 'https://beta.mixcloud.com/RedLightRadio/nosedrip-15-red-light-radio-01-18-2016/',
59         'only_matching': True,
60     }]
61
62     @staticmethod
63     def _decrypt_xor_cipher(key, ciphertext):
64         """Encrypt/Decrypt XOR cipher. Both ways are possible because it's XOR."""
65         return ''.join([
66             compat_chr(compat_ord(ch) ^ compat_ord(k))
67             for ch, k in compat_zip(ciphertext, itertools.cycle(key))])
68
69     def _real_extract(self, url):
70         mobj = re.match(self._VALID_URL, url)
71         uploader = mobj.group(1)
72         cloudcast_name = mobj.group(2)
73         track_id = compat_urllib_parse_unquote('-'.join((uploader, cloudcast_name)))
74
75         webpage = self._download_webpage(url, track_id)
76
77         # Legacy path
78         encrypted_play_info = self._search_regex(
79             r'm-play-info="([^"]+)"', webpage, 'play info', default=None)
80
81         if encrypted_play_info is not None:
82             # Decode
83             encrypted_play_info = compat_b64decode(encrypted_play_info)
84         else:
85             # New path
86             full_info_json = self._parse_json(self._html_search_regex(
87                 r'<script id="relay-data" type="text/x-mixcloud">([^<]+)</script>',
88                 webpage, 'play info'), 'play info')
89             for item in full_info_json:
90                 item_data = try_get(
91                     item, lambda x: x['cloudcast']['data']['cloudcastLookup'],
92                     dict)
93                 if try_get(item_data, lambda x: x['streamInfo']['url']):
94                     info_json = item_data
95                     break
96             else:
97                 raise ExtractorError('Failed to extract matching stream info')
98
99         message = self._html_search_regex(
100             r'(?s)<div[^>]+class="global-message cloudcast-disabled-notice-light"[^>]*>(.+?)<(?:a|/div)',
101             webpage, 'error message', default=None)
102
103         js_url = self._search_regex(
104             r'<script[^>]+\bsrc=["\"](https://(?:www\.)?mixcloud\.com/media/(?:js2/www_js_4|js/www)\.[^>]+\.js)',
105             webpage, 'js url')
106         js = self._download_webpage(js_url, track_id, 'Downloading JS')
107         # Known plaintext attack
108         if encrypted_play_info:
109             kps = ['{"stream_url":']
110             kpa_target = encrypted_play_info
111         else:
112             kps = ['https://', 'http://']
113             kpa_target = compat_b64decode(info_json['streamInfo']['url'])
114         for kp in kps:
115             partial_key = self._decrypt_xor_cipher(kpa_target, kp)
116             for quote in ["'", '"']:
117                 key = self._search_regex(
118                     r'{0}({1}[^{0}]*){0}'.format(quote, re.escape(partial_key)),
119                     js, 'encryption key', default=None)
120                 if key is not None:
121                     break
122             else:
123                 continue
124             break
125         else:
126             raise ExtractorError('Failed to extract encryption key')
127
128         if encrypted_play_info is not None:
129             play_info = self._parse_json(self._decrypt_xor_cipher(key, encrypted_play_info), 'play info')
130             if message and 'stream_url' not in play_info:
131                 raise ExtractorError('%s said: %s' % (self.IE_NAME, message), expected=True)
132             song_url = play_info['stream_url']
133             formats = [{
134                 'format_id': 'normal',
135                 'url': song_url
136             }]
137
138             title = self._html_search_regex(r'm-title="([^"]+)"', webpage, 'title')
139             thumbnail = self._proto_relative_url(self._html_search_regex(
140                 r'm-thumbnail-url="([^"]+)"', webpage, 'thumbnail', fatal=False))
141             uploader = self._html_search_regex(
142                 r'm-owner-name="([^"]+)"', webpage, 'uploader', fatal=False)
143             uploader_id = self._search_regex(
144                 r'\s+"profile": "([^"]+)",', webpage, 'uploader id', fatal=False)
145             description = self._og_search_description(webpage)
146             view_count = str_to_int(self._search_regex(
147                 [r'<meta itemprop="interactionCount" content="UserPlays:([0-9]+)"',
148                  r'/listeners/?">([0-9,.]+)</a>',
149                  r'(?:m|data)-tooltip=["\']([\d,.]+) plays'],
150                 webpage, 'play count', default=None))
151
152         else:
153             title = info_json['name']
154             thumbnail = urljoin(
155                 'https://thumbnailer.mixcloud.com/unsafe/600x600/',
156                 try_get(info_json, lambda x: x['picture']['urlRoot'], compat_str))
157             uploader = try_get(info_json, lambda x: x['owner']['displayName'])
158             uploader_id = try_get(info_json, lambda x: x['owner']['username'])
159             description = try_get(info_json, lambda x: x['description'])
160             view_count = int_or_none(try_get(info_json, lambda x: x['plays']))
161
162             stream_info = info_json['streamInfo']
163             formats = []
164
165             for url_key in ('url', 'hlsUrl', 'dashUrl'):
166                 format_url = stream_info.get(url_key)
167                 if not format_url:
168                     continue
169                 decrypted = self._decrypt_xor_cipher(key, compat_b64decode(format_url))
170                 if not decrypted:
171                     continue
172                 if url_key == 'hlsUrl':
173                     formats.extend(self._extract_m3u8_formats(
174                         decrypted, track_id, 'mp4', entry_protocol='m3u8_native',
175                         m3u8_id='hls', fatal=False))
176                 elif url_key == 'dashUrl':
177                     formats.extend(self._extract_mpd_formats(
178                         decrypted, track_id, mpd_id='dash', fatal=False))
179                 else:
180                     formats.append({
181                         'format_id': 'http',
182                         'url': decrypted,
183                     })
184             self._sort_formats(formats)
185
186         return {
187             'id': track_id,
188             'title': title,
189             'formats': formats,
190             'description': description,
191             'thumbnail': thumbnail,
192             'uploader': uploader,
193             'uploader_id': uploader_id,
194             'view_count': view_count,
195         }
196
197
198 class MixcloudPlaylistBaseIE(InfoExtractor):
199     _PAGE_SIZE = 24
200
201     def _find_urls_in_page(self, page):
202         for url in re.findall(r'm-play-button m-url="(?P<url>[^"]+)"', page):
203             yield self.url_result(
204                 compat_urlparse.urljoin('https://www.mixcloud.com', clean_html(url)),
205                 MixcloudIE.ie_key())
206
207     def _fetch_tracks_page(self, path, video_id, page_name, current_page, real_page_number=None):
208         real_page_number = real_page_number or current_page + 1
209         return self._download_webpage(
210             'https://www.mixcloud.com/%s/' % path, video_id,
211             note='Download %s (page %d)' % (page_name, current_page + 1),
212             errnote='Unable to download %s' % page_name,
213             query={'page': real_page_number, 'list': 'main', '_ajax': '1'},
214             headers={'X-Requested-With': 'XMLHttpRequest'})
215
216     def _tracks_page_func(self, page, video_id, page_name, current_page):
217         resp = self._fetch_tracks_page(page, video_id, page_name, current_page)
218
219         for item in self._find_urls_in_page(resp):
220             yield item
221
222     def _get_user_description(self, page_content):
223         return self._html_search_regex(
224             r'<div[^>]+class="profile-bio"[^>]*>(.+?)</div>',
225             page_content, 'user description', fatal=False)
226
227
228 class MixcloudUserIE(MixcloudPlaylistBaseIE):
229     _VALID_URL = r'https?://(?:www\.)?mixcloud\.com/(?P<user>[^/]+)/(?P<type>uploads|favorites|listens)?/?$'
230     IE_NAME = 'mixcloud:user'
231
232     _TESTS = [{
233         'url': 'http://www.mixcloud.com/dholbach/',
234         'info_dict': {
235             'id': 'dholbach_uploads',
236             'title': 'Daniel Holbach (uploads)',
237             'description': 'md5:def36060ac8747b3aabca54924897e47',
238         },
239         'playlist_mincount': 11,
240     }, {
241         'url': 'http://www.mixcloud.com/dholbach/uploads/',
242         'info_dict': {
243             'id': 'dholbach_uploads',
244             'title': 'Daniel Holbach (uploads)',
245             'description': 'md5:def36060ac8747b3aabca54924897e47',
246         },
247         'playlist_mincount': 11,
248     }, {
249         'url': 'http://www.mixcloud.com/dholbach/favorites/',
250         'info_dict': {
251             'id': 'dholbach_favorites',
252             'title': 'Daniel Holbach (favorites)',
253             'description': 'md5:def36060ac8747b3aabca54924897e47',
254         },
255         'params': {
256             'playlist_items': '1-100',
257         },
258         'playlist_mincount': 100,
259     }, {
260         'url': 'http://www.mixcloud.com/dholbach/listens/',
261         'info_dict': {
262             'id': 'dholbach_listens',
263             'title': 'Daniel Holbach (listens)',
264             'description': 'md5:def36060ac8747b3aabca54924897e47',
265         },
266         'params': {
267             'playlist_items': '1-100',
268         },
269         'playlist_mincount': 100,
270     }]
271
272     def _real_extract(self, url):
273         mobj = re.match(self._VALID_URL, url)
274         user_id = mobj.group('user')
275         list_type = mobj.group('type')
276
277         # if only a profile URL was supplied, default to download all uploads
278         if list_type is None:
279             list_type = 'uploads'
280
281         video_id = '%s_%s' % (user_id, list_type)
282
283         profile = self._download_webpage(
284             'https://www.mixcloud.com/%s/' % user_id, video_id,
285             note='Downloading user profile',
286             errnote='Unable to download user profile')
287
288         username = self._og_search_title(profile)
289         description = self._get_user_description(profile)
290
291         entries = OnDemandPagedList(
292             functools.partial(
293                 self._tracks_page_func,
294                 '%s/%s' % (user_id, list_type), video_id, 'list of %s' % list_type),
295             self._PAGE_SIZE)
296
297         return self.playlist_result(
298             entries, video_id, '%s (%s)' % (username, list_type), description)
299
300
301 class MixcloudPlaylistIE(MixcloudPlaylistBaseIE):
302     _VALID_URL = r'https?://(?:www\.)?mixcloud\.com/(?P<user>[^/]+)/playlists/(?P<playlist>[^/]+)/?$'
303     IE_NAME = 'mixcloud:playlist'
304
305     _TESTS = [{
306         'url': 'https://www.mixcloud.com/RedBullThre3style/playlists/tokyo-finalists-2015/',
307         'info_dict': {
308             'id': 'RedBullThre3style_tokyo-finalists-2015',
309             'title': 'National Champions 2015',
310             'description': 'md5:6ff5fb01ac76a31abc9b3939c16243a3',
311         },
312         'playlist_mincount': 16,
313     }, {
314         'url': 'https://www.mixcloud.com/maxvibes/playlists/jazzcat-on-ness-radio/',
315         'only_matching': True,
316     }]
317
318     def _real_extract(self, url):
319         mobj = re.match(self._VALID_URL, url)
320         user_id = mobj.group('user')
321         playlist_id = mobj.group('playlist')
322         video_id = '%s_%s' % (user_id, playlist_id)
323
324         webpage = self._download_webpage(
325             url, user_id,
326             note='Downloading playlist page',
327             errnote='Unable to download playlist page')
328
329         title = self._html_search_regex(
330             r'<a[^>]+class="parent active"[^>]*><b>\d+</b><span[^>]*>([^<]+)',
331             webpage, 'playlist title',
332             default=None) or self._og_search_title(webpage, fatal=False)
333         description = self._get_user_description(webpage)
334
335         entries = OnDemandPagedList(
336             functools.partial(
337                 self._tracks_page_func,
338                 '%s/playlists/%s' % (user_id, playlist_id), video_id, 'tracklist'),
339             self._PAGE_SIZE)
340
341         return self.playlist_result(entries, video_id, title, description)
342
343
344 class MixcloudStreamIE(MixcloudPlaylistBaseIE):
345     _VALID_URL = r'https?://(?:www\.)?mixcloud\.com/(?P<id>[^/]+)/stream/?$'
346     IE_NAME = 'mixcloud:stream'
347
348     _TEST = {
349         'url': 'https://www.mixcloud.com/FirstEar/stream/',
350         'info_dict': {
351             'id': 'FirstEar',
352             'title': 'First Ear',
353             'description': 'Curators of good music\nfirstearmusic.com',
354         },
355         'playlist_mincount': 192,
356     }
357
358     def _real_extract(self, url):
359         user_id = self._match_id(url)
360
361         webpage = self._download_webpage(url, user_id)
362
363         entries = []
364         prev_page_url = None
365
366         def _handle_page(page):
367             entries.extend(self._find_urls_in_page(page))
368             return self._search_regex(
369                 r'm-next-page-url="([^"]+)"', page,
370                 'next page URL', default=None)
371
372         next_page_url = _handle_page(webpage)
373
374         for idx in itertools.count(0):
375             if not next_page_url or prev_page_url == next_page_url:
376                 break
377
378             prev_page_url = next_page_url
379             current_page = int(self._search_regex(
380                 r'\?page=(\d+)', next_page_url, 'next page number'))
381
382             next_page_url = _handle_page(self._fetch_tracks_page(
383                 '%s/stream' % user_id, user_id, 'stream', idx,
384                 real_page_number=current_page))
385
386         username = self._og_search_title(webpage)
387         description = self._get_user_description(webpage)
388
389         return self.playlist_result(entries, user_id, username, description)