migrate to m3u8-ipytv

This commit is contained in:
sadorowo 2024-01-14 18:09:53 +01:00
parent 39ec281d4b
commit 6dc81794c1
2 changed files with 57 additions and 39 deletions

View File

@ -1,6 +1,6 @@
## M3U Logo Matcher ## M3U Logo Matcher
Simple application that will match logos with TV programmes provided by you. Simple script that will match logos with TV programmes provided by you.
### NOTE! ### NOTE!
This may **NOT** be 100% accurate. This may **NOT** be 100% accurate.
@ -9,7 +9,7 @@ This may **NOT** be 100% accurate.
- urllib - urllib
- pathlib - pathlib
- difflib - difflib
- [m3u8](https://github.com/globocom/m3u8) - [ipytv](https://github.com/Beer4Ever83/ipytv)
- re - re
- os - os
@ -26,7 +26,7 @@ Any that display logos as a file tree. A good example is [this provider](http://
![Alt text](image.png) ![Alt text](image.png)
### Result ### Result
When everything is done, result is written in **result_\<m3u-file-name\>.txt** file, in m3u parent directory. When everything is done, playlist file is **OVERWRITTEN**!
### Example usage ### Example usage
```bash ```bash

View File

@ -1,15 +1,16 @@
from argparse import ArgumentParser from argparse import ArgumentParser
from difflib import SequenceMatcher from difflib import SequenceMatcher
import re, m3u8 import re
from ipytv import playlist as tv_playlist
from pathlib import Path from pathlib import Path
from os.path import join
import urllib.request import urllib.request
import urllib.error import urllib.error
verbose = False verbose = False
def debug(message): def debug(message: str):
global verbose global verbose
if verbose: if verbose:
@ -23,15 +24,16 @@ def parse_arguments():
return parser.parse_args() return parser.parse_args()
def fetch_logos(logo_url): def fetch_logos(base_logo_url: str):
debug(f"Fetching logos from {logo_url}...") debug(f"Fetching logos from {base_logo_url}...")
with urllib.request.urlopen(logo_url) as url: with urllib.request.urlopen(base_logo_url) as url:
data = url.read().decode() data = url.read().decode()
# Get logo names # Get logo names
for logo in re.findall(r'<a href="(.+\..+)">(.*\..+)</a>', data): for logo in re.findall(r'<a href="(.+\..+)">(.*\..+)</a>', data):
yield logo logo_url, channel_name = logo
yield f"{base_logo_url}/{logo_url}", channel_name
def process(arguments): def process(arguments):
debug("Processing...") debug("Processing...")
@ -43,39 +45,55 @@ def process(arguments):
# Get m3u data # Get m3u data
debug(f"Loading m3u file {arguments.m3u}...") debug(f"Loading m3u file {arguments.m3u}...")
playlist = m3u8.load(str(arguments.m3u)) playlist = tv_playlist.loadf(str(arguments.m3u))
debug(f"Loaded {len(playlist.segments)} segments") debug(f"Loaded {playlist.length()} channels")
matched_logos = list(match_similar_logos(logos, get_channels(playlist))) matched_logos = list(match_similar_logos(logos, get_channels(playlist)))
debug(f"Matched {len(matched_logos)}/{len(playlist.segments)} logos.") debug(f"Matched {len(matched_logos)}/{playlist.length()} logos.")
debug(f"Missing logos: {len(playlist.segments) - len(matched_logos)}") debug(f"Missing logos: {playlist.length() - len(matched_logos)}")
# Give result replace_logos(playlist, matched_logos)
result_path = join(arguments.m3u.parent, f"result_{arguments.m3u.name}.txt") debug("Logos replaced.")
if Path(result_path).exists(): debug("Saving result...")
response = input(f"Result file {result_path} already exists. Overwrite? [y/N] ")
if response.lower() == "y":
print("Overwriting...")
save_result(result_path, matched_logos)
else:
print("Aborting...")
return
save_result(result_path, matched_logos) save_result(arguments.m3u, playlist)
print(f"Result written to result_{arguments.m3u}.txt")
def save_result(result_path, matched_logos): def replace_logos(playlist: tv_playlist.M3UPlaylist, matched_logos: list[tuple[str, str]]):
with open(result_path, "w+") as rf: debug("Replacing logos...")
debug(f"Writing result to result_{arguments.m3u.name}.txt...")
for channel, logo in matched_logos:
parsed_logo_url = f"{arguments.url}/{logo}"
rf.write(f"{channel} -> {parsed_logo_url}\n") for matched_channel, logo in matched_logos:
debug(f"Replacing logo for {matched_channel}...")
print(f"Result written to {result_path}") channels = playlist.get_channels()
channel: tv_playlist.IPTVChannel = None
channel_index: int = -1
for i, c in enumerate(channels):
if c.name == matched_channel:
channel = c
channel_index = i
break
if channel is None:
# Something weird happened.
print(f"Channel {matched_channel} not found! How did this happen?")
continue
channel_copy = channel.copy()
channel_copy.attributes["tvg-logo"] = logo
playlist.update_channel(channel_index, channel_copy)
def save_result(
input_path: str,
playlist: tv_playlist.M3UPlaylist
):
with open(input_path, "w+") as f:
f.write(playlist.to_m3u_plus_playlist())
print("File overwritten.")
print("Thank you for using this script!") print("Thank you for using this script!")
def match_similar_logos(logos, channels): def match_similar_logos(logos: list[tuple[str, str]], channels: list[str]):
debug("Matching logos...") debug("Matching logos...")
for channel in channels: for channel in channels:
@ -103,11 +121,11 @@ def match_similar_logos(logos, channels):
debug(f"No match for {channel}") debug(f"No match for {channel}")
continue continue
def get_channels(playlist): def get_channels(playlist: tv_playlist.M3UPlaylist):
for segment in playlist.segments: for channel in playlist:
yield segment.title yield channel.name
def get_similarity_ratio(a, b): def get_similarity_ratio(a: str, b: str):
return SequenceMatcher(None, a, b).ratio() return SequenceMatcher(None, a, b).ratio()
if __name__ == "__main__": if __name__ == "__main__":