From f3a16083b9f09c132dce6f262a1fb30f672b9441 Mon Sep 17 00:00:00 2001 From: garret1317 Date: Sun, 14 Sep 2025 15:04:02 +0100 Subject: rename "misc" -> "contrib" --- contrib/generate_html.py | 84 +++++++++++++++++ contrib/how to do a release | 41 +++++++++ contrib/old_generate_changelog.py | 116 ++++++++++++++++++++++++ contrib/protostuff.py | 154 ++++++++++++++++++++++++++++++++ contrib/randominfo.py | 12 +++ contrib/streammon.py | 66 ++++++++++++++ contrib/test_areas.py | 26 ++++++ contrib/test_extractors.py | 183 ++++++++++++++++++++++++++++++++++++++ 8 files changed, 682 insertions(+) create mode 100755 contrib/generate_html.py create mode 100644 contrib/how to do a release create mode 100755 contrib/old_generate_changelog.py create mode 100755 contrib/protostuff.py create mode 100755 contrib/randominfo.py create mode 100755 contrib/streammon.py create mode 100755 contrib/test_areas.py create mode 100755 contrib/test_extractors.py (limited to 'contrib') diff --git a/contrib/generate_html.py b/contrib/generate_html.py new file mode 100755 index 0000000..0e15d6a --- /dev/null +++ b/contrib/generate_html.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 +import os +import hashlib +import re + +pip_index = open("index.html", "w") + +pip_index.write(""" + + + yt-dlp-rajiko pip index + + + + + + + +""") + +latest_tarball = tarballs[-1] +latest_wheel = wheels[-1] +print(latest_tarball, latest_wheel) + +os.remove("yt_dlp_rajiko-latest.tar.gz") +os.symlink(latest_tarball, "yt_dlp_rajiko-latest.tar.gz") + +os.remove("yt_dlp_rajiko-latest.whl") +os.symlink(latest_wheel, "yt_dlp_rajiko-latest.whl") + +site_sha256.reverse() + +latest_list = site_sha256[:2] +previous_list = site_sha256[2:] + +latest = "\n".join(["", "", "\n".join(latest_list), "", ""]) + +previous = "\n".join(["", "", "\n".join(previous_list), "", ""]) + +for i in ["../../index.html", "../../index.ja.html"]: + with open(i, "r+") as f: + page = f.read() + + page = re.sub(r".+", latest, page, flags=re.DOTALL) + page = re.sub(r".+", previous, page, flags=re.DOTALL) + + f.seek(0) + f.truncate(0) + f.write(page) diff --git a/contrib/how to do a release b/contrib/how to do a release new file mode 100644 index 0000000..6e91e14 --- /dev/null +++ b/contrib/how to do a release @@ -0,0 +1,41 @@ +putting this here because i'll forget how to do it otherwise + +update the pyproject.toml +tag it in git, eg v1.0 + +## build the builds +python3 -m build + +and then put BOTH items from `dist` into the pip index dir - ~/site2/yt-dlp-rajiko/pip/yt-dlp-rajiko/ +because without the .whl pip has to "build" it itself, with all the stuff that needs to be installed for that to work + +run script to update the pip index html and the dl/ "latest" symlinks +this also updates the sha256s on the site + +## update the changelog file + +write in html, paste into the feed xml like +make sure to set the link, date +to get date use: +git log --pretty --date=rfc2822 + +include the pip instructions, sha256sum etc + +now push to the server + +!!NEW!! +upload to pip proper as well +go to dl/ dir and do +twine upload yt_dlp_rajiko-1.x* + + +## update github + +paste the changelog output into a github release, upload the new builds +change link at the bottom to just "below" + +post in the radiko thread on 5ch if i can be bothered + +and thats probably all diff --git a/contrib/old_generate_changelog.py b/contrib/old_generate_changelog.py new file mode 100755 index 0000000..1bce073 --- /dev/null +++ b/contrib/old_generate_changelog.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +import email.utils +import feedgenerator + +def parse_changelog(lines): + got_version = False + got_date = False + got_url = False + done_remarks = False + releases = [] + release = {} + release_remarks = [] + release_changes = [] + current_change = "" + + for idx, line in enumerate(lines): + line = line.rstrip() + + if not got_version: + got_version = True + release["version"] = line + continue + + if not got_date: + release["date"] = email.utils.parsedate_to_datetime(line) + got_date = True + continue + + key, sep, val = line.partition(": ") + if key in ["url", "sha256", "released"] and val != "": + release[key] = val + continue + + if not done_remarks: + if line == "": + done_remarks = True + release["remarks"] = release_remarks + release_remarks = [] + continue + else: + release_remarks.append(line) + continue + + if line != "": + release_changes.append(line.rstrip()) + + if idx + 1 != len(lines): + continue + + release["changes"] = release_changes + if release.get("released") != "no": + releases.append(release) + + got_version = False + got_date = False + done_remarks = False + release = {} + release_changes = [] + + return releases + +def generate_rss_feed(releases): + feed = feedgenerator.Rss201rev2Feed( + title="yt-dlp-rajiko changelog", + description="Notifications for new yt-dlp-rajiko releases, with changelogs", + link="https://427738.xyz/yt-dlp-rajiko/", + language="en-GB", + ttl=180, # 3 hours + ) + + for release in releases: + title = "yt-dlp-rajiko " + release["version"] + " has been released" + description = "" + description += "

" + for remark in release["remarks"]: + description += remark + description += "
" + description += "

" + description += "

This release:

\n" + description += "

" + + if release.get("url"): + if release["version"] != "1.0": + description += "\n

If you use pip, you should be able to upgrade with pip install yt-dlp-rajiko --upgrade --extra-index-url https://427738.xyz/yt-dlp-rajiko/pip/.
" + description += "If you installed manually, you can download the updated .whl from this post's link." + if release.get("sha256"): + description += " The SHA256 checksum should be " + description += release.get("sha256") + description += "." + description += "

" + else: + description += '\n

Please see the homepage for initial installation instructions.

' + + feed.add_item( + title=title, + description=description, + link=release.get("url"), + pubdate=release["date"] + ) + return feed + +if __name__ == "__main__": + with open("CHANGELOG") as f: + releases = parse_changelog(f.readlines()) + + feed = generate_rss_feed(releases) + feed_contents = feed.writeString("utf-8") + feed_contents = feed_contents.replace("\nBI', compression_flag, message_length) + return header + protobuf_data + +def strip_grpc_response(response): + return response[5:].rpartition(b"grpc-status:")[0] + +print("SIGNUP") +# why do they have to make it so bloody complicated + +lsid = ''.join(random.choices('0123456789abcdef', k=32)) +big_funny = ("\n " + lsid).encode() + +signup = requests.post("https://api.annex.radiko.jp/radiko.UserService/SignUp", headers={ + 'Origin': 'https://radiko.jp', + 'Content-Type': 'application/grpc-web+proto', + 'X-User-Agent': 'grpc-web-javascript/0.1', + 'X-Grpc-Web': '1', + }, data=( add_grpc_header(big_funny)), +) + +print(signup.content) + +# youre meant to only do the sign up ^ once and then keep your id for later +# so that you can V sign in and get the token for the API to work + +print("SIGNIN") + +si=add_grpc_header(protobug.dumps(SignInRequest( + lsid=lsid, + area="JP13", +))) + +print(si) +print(base64.b64encode(si)) + +signin = requests.post("https://api.annex.radiko.jp/radiko.UserService/SignIn", headers={ + 'Origin': 'https://radiko.jp', + 'Content-Type': 'application/grpc-web+proto', + 'X-User-Agent': 'grpc-web-javascript/0.1', + 'X-Grpc-Web': '1', +}, data=si) + +print(signin.content) + +signin_result = protobug.loads(strip_grpc_response(signin.content), SignInResponse) + + +headers = { + 'Origin': 'https://radiko.jp', + 'Authorization': f'Bearer {signin_result.jwt}', + 'x-annex-proto-version': '1.0.0', + 'Content-Type': 'application/grpc-web+proto', + 'X-User-Agent': 'grpc-web-javascript/0.1', + 'X-Grpc-Web': '1', +} + +response = requests.post('https://api.annex.radiko.jp/radiko.PodcastService/ListPodcastEpisodes', headers=headers, + data=add_grpc_header(protobug.dumps(ListPodcastEpisodesRequest( + channel_id="0ce1d2d7-5e07-4ec5-901a-d0eacdacc332", + dontknow=1, + page_length=200, # site uses 20 +# cursor="ef693874-0ad2-48cc-8c52-ac4de31cbf54" # here you put the id of the last episode you've seen in the list + ))) +) + +print(response) + +episodes = strip_grpc_response(response.content) + + +with open("ListPodcastEpisodes.bin", "wb") as f: + f.write(episodes) + + +@protobug.message +class Audio: + revision: protobug.Int32 = protobug.field(1) + url: protobug.String = protobug.field(2) + fileSize: protobug.Int64 = protobug.field(3) + durationSec: protobug.Int64 = protobug.field(4) + transcoded: protobug.Bool = protobug.field(5) + +@protobug.message +class EpisodeStartAt: + seconds: protobug.UInt64 = protobug.field(1) + nanos: protobug.UInt64 = protobug.field(2, default=0) + + +@protobug.message +class PodcastEpisode: + id: protobug.String = protobug.field(1) + workspaceId: protobug.String = protobug.field(2) + channelId: protobug.String = protobug.field(3) + title: protobug.String = protobug.field(4) + description: protobug.String = protobug.field(5) + + audio: Audio = protobug.field(8) + channelImageUrl: protobug.String = protobug.field(16) + channelTitle: protobug.String = protobug.field(17) + channelStationName: protobug.String = protobug.field(18) + channelAuthor: protobug.String = protobug.field(19) + + channelThumbnailImageUrl: protobug.String = protobug.field(21) + channelStationType: protobug.UInt32 = protobug.field(22) + startAt: EpisodeStartAt = protobug.field(27) + isEnabled: protobug.Bool = protobug.field(29) + hasTranscription: protobug.Bool = protobug.field(32) + + imageUrl: protobug.String = protobug.field(7, default=None) + thumbnailImageUrl: protobug.String = protobug.field(20, default=None) + +@protobug.message +class ListPodcastEpisodesResponse: + episodes: list[PodcastEpisode] = protobug.field(1) + hasNextPage: protobug.Bool = protobug.field(2, default=False) + + +episodes_response = protobug.loads(episodes, ListPodcastEpisodesResponse) + +print(episodes_response) + +for e in episodes_response.episodes: + print(e.title, e.id) +print(episodes_response.hasNextPage) diff --git a/contrib/randominfo.py b/contrib/randominfo.py new file mode 100755 index 0000000..bdb7660 --- /dev/null +++ b/contrib/randominfo.py @@ -0,0 +1,12 @@ +#!/usr/bin/env python3 +from yt_dlp_plugins.extractor import radiko +from yt_dlp import YoutubeDL + + +ie = radiko._RadikoBaseIE() +ydl = YoutubeDL(auto_init=False) +ie.set_downloader(ydl) + +info = ie._generate_random_info() +print("random device info") +print(info) diff --git a/contrib/streammon.py b/contrib/streammon.py new file mode 100755 index 0000000..8f52bb4 --- /dev/null +++ b/contrib/streammon.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +# monitor stream APIs for any changes, so I can check they don't break anything +# run via cronjob every now and then + +import difflib +import os +import sys +import xml.etree.ElementTree as ET +from datetime import datetime + +import requests + +s = requests.Session() + +DISCORD_WEBHOOK = "PUT WEBHOOK HERE" +STREAMS_API = "https://radiko.jp/v3/station/stream/{device}/{station}.xml" + +if len(sys.argv) > 1: + PATH = sys.argv[1] +else: + PATH = "" + +devices = ('pc_html5', 'aSmartPhone7a', 'aSmartPhone8') +stations = ('FMT', 'CCL', 'NORTHWAVE', 'TBS') + +def format_xml(txt): + root = ET.fromstring(txt) + res = "" + for el in root.findall("url"): + res += el.find("playlist_create_url").text + for k, v in el.attrib.items(): + res += f" {k}:{v}" + + res += "\n" + return res + +for device in devices: + for station in stations: + url = STREAMS_API.format(device=device, station=station) + now_response = s.get(url) + now = now_response.text + now_modified = now_response.headers["last-modified"] + now_datetime = datetime.strptime(now_modified, "%a, %d %b %Y %H:%M:%S %Z") + + + filename = f"{PATH}{station}-{device}.xml" + with open(filename, "a+") as f: + f.seek(0) + past = f.read() + + modtime = datetime.fromtimestamp(os.path.getmtime(filename)) + diff = difflib.unified_diff( + format_xml(past).splitlines(), format_xml(now).splitlines(), + fromfile=url, tofile=url, + fromfiledate=str(modtime), tofiledate=str(now_datetime.now()), + ) + + diff_str = "\n".join(diff) + if diff_str != "": + f.truncate(0) + f.write(now) + + s.post(DISCORD_WEBHOOK, json={ + "content": f"**Streams changed: {station} {device}**\n" + "\n".join(("```diff", diff_str, "```")), + }) + os.utime(filename, (now_datetime.timestamp(), now_datetime.timestamp())) diff --git a/contrib/test_areas.py b/contrib/test_areas.py new file mode 100755 index 0000000..ba6475f --- /dev/null +++ b/contrib/test_areas.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 +import unittest + +from yt_dlp_plugins.extractor import radiko +from yt_dlp import YoutubeDL + + +class test_tokens(unittest.TestCase): + + def setUp(self): + self.ie = radiko._RadikoBaseIE() + ydl = YoutubeDL(auto_init=False) + self.ie.set_downloader(ydl) + + def test_area(self): + # check areas etc work + for i in range(1, 48): + area = "JP" + str(i) + with self.subTest(f"Negotiating token for {area}", area=area): + token = self.ie._negotiate_token(area) + self.assertEqual(token.get("X-Radiko-AreaId"), area) + + +if __name__ == '__main__': + unittest.main() + # may wish to set failfast=True diff --git a/contrib/test_extractors.py b/contrib/test_extractors.py new file mode 100755 index 0000000..21800c5 --- /dev/null +++ b/contrib/test_extractors.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python3 + +# programmes expire, so i have to update the times in the tests every time i run them +# but thats a massive ballache, so i end up just not running them, which leads to cockups +# so, this script has the tests automatically use the latest episode as you run it, by setting dynamically generated time values +# everything else is always the same so it should be fine lol + + +import datetime +import os +import sys +import unittest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +sys.path.insert(0, "/home/g/Downloads/yt-dlp/") # TODO: un-hardcode. has to be the source/git repo because pip doesnt carry the tests + +from yt_dlp_plugins.extractor import radiko_time as rtime + +MON, TUE, WED, THU, FRI, SAT, SUN = range(7) +weekdays = {0: "MON", 1: "TUE", 2: "WED", 3: "THU", 4: "FRI", 5: "SAT", 6: "SUN"} + +now = rtime.RadikoTime.now(tz = rtime.JST) +UTC = datetime.timezone.utc + +def get_latest_airtimes(now, weekday, hour, minute, duration): + days_after_weekday = (7 - (now.weekday() - weekday)) % 7 + latest_airdate = (now + datetime.timedelta(days=days_after_weekday)).replace(hour=hour, minute=minute, second=0, microsecond=0) + if (latest_airdate + duration) > now: + latest_airdate -= datetime.timedelta(days=7) + return latest_airdate, latest_airdate + duration + +def get_test_timefields(airtime, release_time): + return { + "timestamp": airtime.timestamp(), + "release_timestamp": release_time.timestamp(), + "upload_date": airtime.astimezone(UTC).strftime("%Y%m%d"), + "release_date": release_time.astimezone(UTC).strftime("%Y%m%d"), + + "duration": (release_time - airtime).total_seconds(), + } + + + + +from yt_dlp_plugins.extractor.radiko import ( + RadikoTimeFreeIE, RadikoShareIE, + RadikoLiveIE, RadikoPersonIE, RadikoStationButtonIE, + RadikoRSeasonsIE +) + +from yt_dlp_plugins.extractor.radiko_podcast import ( + RadikoPodcastEpisodeIE, RadikoPodcastChannelIE, +) +RadikoTimeFreeIE._TESTS = [] + + + +# TOKYO MOON - interfm - EVERY FRI 2300 +airtime, release_time = get_latest_airtimes(now, FRI, 23, 0, datetime.timedelta(hours=1)) +RadikoTimeFreeIE._TESTS.append({ + "url": f"https://radiko.jp/#!/ts/INT/{airtime.timestring()}", + "info_dict": { + "ext": "m4a", + "id": f"INT-{airtime.timestring()}", + + **get_test_timefields(airtime, release_time), + + 'title': 'TOKYO MOON', + 'description': r're:[\S\s]+Xハッシュタグは「#tokyomoon」$', + 'uploader': 'interfm', + 'uploader_id': 'INT', + 'uploader_url': 'https://www.interfm.co.jp/', + 'channel': 'interfm', + 'channel_id': 'INT', + 'channel_url': 'https://www.interfm.co.jp/', + 'thumbnail': 'https://program-static.cf.radiko.jp/ehwtw6mcvy.jpg', + 'chapters': list, + 'tags': ['松浦俊夫', 'ジャズの魅力を楽しめる'], + 'cast': ['松浦\u3000俊夫'], + 'series': 'Tokyo Moon', + 'live_status': 'was_live', + } +}) + + +# late-night/v. early morning show, to test broadcast day handling +# this should be monday 27:00 / tuesday 03:00 +airtime, release_time = get_latest_airtimes(now, TUE, 3, 0, datetime.timedelta(hours=2)) +RadikoTimeFreeIE._TESTS.append({ + "url": f"https://radiko.jp/#!/ts/TBS/{airtime.timestring()}", + "info_dict": { + "ext": "m4a", + "id": f"TBS-{airtime.timestring()}", + + **get_test_timefields(airtime, release_time), + 'title': 'CITY CHILL CLUB', + 'description': r"re:^目を閉じて…リラックスして[\S\s]+chill@tbs.co.jp$", + 'uploader': 'TBSラジオ', + 'uploader_id': 'TBS', + 'uploader_url': 'https://www.tbsradio.jp/', + 'channel': 'TBSラジオ', + 'channel_id': 'TBS', + 'channel_url': 'https://www.tbsradio.jp/', + 'thumbnail': 'https://program-static.cf.radiko.jp/nrf8fowbjo.jpg', + 'chapters': list, + 'tags': ['CCC905', '音楽との出会いが楽しめる', '人気アーティストトーク', '音楽プロデューサー出演', 'ドライブ中におすすめ', '寝る前におすすめ', '学生におすすめ'], + 'cast': list, + 'series': 'CITY CHILL CLUB', + 'live_status': 'was_live', + }, +}) + + +# testing 29-hour clock handling +airtime, release_time = get_latest_airtimes(now, WED, 0, 0, datetime.timedelta(minutes=55)) +share_timestring = (airtime - datetime.timedelta(days=1)).strftime("%Y%m%d") + "240000" + +RadikoShareIE._TESTS = [{ + "url": f"http://radiko.jp/share/?sid=FMT&t={share_timestring}", + "info_dict": { + "live_status": "was_live", + "ext": "m4a", + "id": f"FMT-{airtime.timestring()}", + + **get_test_timefields(airtime, release_time), + + "title": "JET STREAM", + "series": "JET STREAM", + "description": r"re:^JET STREAM・・・[\s\S]+https://www.tfm.co.jp/f/jetstream/message$", + "chapters": list, + "thumbnail": "https://program-static.cf.radiko.jp/greinlrspi.jpg", + + "channel": "TOKYO FM", + "channel_id": "FMT", + "channel_url": "https://www.tfm.co.jp/", + "uploader": "TOKYO FM", + "uploader_id": "FMT", + "uploader_url": "https://www.tfm.co.jp/", + + "cast": ["福山雅治"], + "tags": ["福山雅治", "夜間飛行", "音楽との出会いが楽しめる", "朗読を楽しめる", "寝る前に聴きたい"], + }, + }] + + + +IEs = [ + RadikoTimeFreeIE, RadikoShareIE, + RadikoLiveIE, RadikoPersonIE, RadikoStationButtonIE, + RadikoPodcastEpisodeIE, RadikoPodcastChannelIE, + RadikoRSeasonsIE, +] + +import test.helper as th + +# override to only get testcases from our IEs + +def _new_gettestcases(include_onlymatching=False): + import yt_dlp.plugins as plugins + plugins.load_all_plugins() + + for ie in IEs: + yield from ie.get_testcases(include_onlymatching) + +def _new_getwebpagetestcases(): + import yt_dlp.plugins as plugins + plugins.load_all_plugins() + + for ie in IEs: + for tc in ie.get_webpage_testcases(): + tc.setdefault('add_ie', []).append('Generic') + yield tc + +th.gettestcases = _new_gettestcases +th.getwebpagetestcases = _new_getwebpagetestcases + +import test.test_download as td + +class TestDownload(td.TestDownload): + pass + +if __name__ == "__main__": + unittest.main() -- cgit v1.2.3-70-g09d2