diff --git a/README.md b/README.md index 0b9221d..d68b7ff 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ ## M3U Logo Matcher -Simple application that will match logos with TV programmes provided by you. +Simple script that will match logos with TV programmes provided by you. ### NOTE! This may **NOT** be 100% accurate. @@ -9,7 +9,7 @@ This may **NOT** be 100% accurate. - urllib - pathlib - difflib -- [m3u8](https://github.com/globocom/m3u8) +- [ipytv](https://github.com/Beer4Ever83/ipytv) - re - os @@ -26,7 +26,7 @@ Any that display logos as a file tree. A good example is [this provider](http:// ![Alt text](image.png) ### Result -When everything is done, result is written in **result_\.txt** file, in m3u parent directory. +When everything is done, playlist file is **OVERWRITTEN**! ### Example usage ```bash diff --git a/__main__.py b/__main__.py index 0d2af42..8968a76 100644 --- a/__main__.py +++ b/__main__.py @@ -1,15 +1,16 @@ from argparse import ArgumentParser from difflib import SequenceMatcher -import re, m3u8 +import re + +from ipytv import playlist as tv_playlist from pathlib import Path -from os.path import join import urllib.request import urllib.error verbose = False -def debug(message): +def debug(message: str): global verbose if verbose: @@ -23,15 +24,16 @@ def parse_arguments(): return parser.parse_args() -def fetch_logos(logo_url): - debug(f"Fetching logos from {logo_url}...") +def fetch_logos(base_logo_url: str): + debug(f"Fetching logos from {base_logo_url}...") - with urllib.request.urlopen(logo_url) as url: + with urllib.request.urlopen(base_logo_url) as url: data = url.read().decode() # Get logo names for logo in re.findall(r'(.*\..+)', data): - yield logo + logo_url, channel_name = logo + yield f"{base_logo_url}/{logo_url}", channel_name def process(arguments): debug("Processing...") @@ -43,39 +45,55 @@ def process(arguments): # Get m3u data debug(f"Loading m3u file {arguments.m3u}...") - playlist = m3u8.load(str(arguments.m3u)) - debug(f"Loaded {len(playlist.segments)} segments") + playlist = tv_playlist.loadf(str(arguments.m3u)) + debug(f"Loaded {playlist.length()} channels") matched_logos = list(match_similar_logos(logos, get_channels(playlist))) - debug(f"Matched {len(matched_logos)}/{len(playlist.segments)} logos.") - debug(f"Missing logos: {len(playlist.segments) - len(matched_logos)}") + debug(f"Matched {len(matched_logos)}/{playlist.length()} logos.") + debug(f"Missing logos: {playlist.length() - len(matched_logos)}") - # Give result - result_path = join(arguments.m3u.parent, f"result_{arguments.m3u.name}.txt") - if Path(result_path).exists(): - response = input(f"Result file {result_path} already exists. Overwrite? [y/N] ") - if response.lower() == "y": - print("Overwriting...") - save_result(result_path, matched_logos) - else: - print("Aborting...") - return + replace_logos(playlist, matched_logos) + debug("Logos replaced.") + debug("Saving result...") - save_result(result_path, matched_logos) - print(f"Result written to result_{arguments.m3u}.txt") + save_result(arguments.m3u, playlist) -def save_result(result_path, matched_logos): - with open(result_path, "w+") as rf: - debug(f"Writing result to result_{arguments.m3u.name}.txt...") - for channel, logo in matched_logos: - parsed_logo_url = f"{arguments.url}/{logo}" - - rf.write(f"{channel} -> {parsed_logo_url}\n") +def replace_logos(playlist: tv_playlist.M3UPlaylist, matched_logos: list[tuple[str, str]]): + debug("Replacing logos...") - print(f"Result written to {result_path}") + for matched_channel, logo in matched_logos: + debug(f"Replacing logo for {matched_channel}...") + + channels = playlist.get_channels() + channel: tv_playlist.IPTVChannel = None + channel_index: int = -1 + + for i, c in enumerate(channels): + if c.name == matched_channel: + channel = c + channel_index = i + break + + if channel is None: + # Something weird happened. + print(f"Channel {matched_channel} not found! How did this happen?") + continue + + channel_copy = channel.copy() + channel_copy.attributes["tvg-logo"] = logo + playlist.update_channel(channel_index, channel_copy) + +def save_result( + input_path: str, + playlist: tv_playlist.M3UPlaylist +): + with open(input_path, "w+") as f: + f.write(playlist.to_m3u_plus_playlist()) + + print("File overwritten.") print("Thank you for using this script!") -def match_similar_logos(logos, channels): +def match_similar_logos(logos: list[tuple[str, str]], channels: list[str]): debug("Matching logos...") for channel in channels: @@ -103,11 +121,11 @@ def match_similar_logos(logos, channels): debug(f"No match for {channel}") continue -def get_channels(playlist): - for segment in playlist.segments: - yield segment.title +def get_channels(playlist: tv_playlist.M3UPlaylist): + for channel in playlist: + yield channel.name -def get_similarity_ratio(a, b): +def get_similarity_ratio(a: str, b: str): return SequenceMatcher(None, a, b).ratio() if __name__ == "__main__":