Initial commit

This commit is contained in:
2026-03-19 12:13:02 -04:00
commit b9e7e1bfca
23 changed files with 1021 additions and 0 deletions

1
src/watfag/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Version string of the package.
__version__ = "1.0.0"

View File

View File

@@ -0,0 +1,84 @@
import importlib
from pathlib import Path
from pkgutil import iter_modules
from typing import Optional, Type
from parsers.generic.parsers import DataParser
from parsers.generic.watfag import WATFAG, SeedStatus
class Release:
    """A single search result plus whatever the parsers extract from its name.

    Parsers store their findings as attributes on the instance. Every
    attribute whose value is a ``WATFAG`` member contributes to the score
    returned by :attr:`watfag`.
    """

    def __init__(self, unparsed_text, dl_link, **kwargs):
        """Store the raw data of a result.

        Args:
            unparsed_text: The release name as received.
            dl_link: Download link of the release.
            **kwargs: Optional ``view_link`` (falls back to ``dl_link``),
                ``size`` (default 0) and ``seeders`` (default 0).
        """
        self.original_text: str = unparsed_text
        # Text that has not been consumed by a parser yet.
        self.metadata_text: Optional[str] = ''
        self.dl_link: str = dl_link
        self.view_link: str = kwargs.get('view_link', dl_link)
        self.size: int = kwargs.get('size', 0)
        self.seeders: int = kwargs.get('seeders', 0)
        self.seed_status: Optional[SeedStatus] = None
        # Parser class name -> value returned by that parser's parse().
        self.parser_results: dict[str, bool] = {}

    def __lt__(self, other):
        """Order releases by their WATFAG score."""
        return self.watfag < other.watfag

    def fully_consumed(self):
        """Return True when the remaining metadata text is empty or whitespace only."""
        if self.metadata_text is None:
            return False
        return self.metadata_text.strip() == ""

    @property
    def watfag(self):
        """Weighted average of all WATFAG attributes set on this release (0 if none)."""
        weight_sum = 0
        score_sum = 0
        # Explicit accumulation keeps the floating point summation order stable.
        for value in self.__dict__.values():
            if not isinstance(value, WATFAG):
                continue
            weight_sum += value.weight()
            score_sum += value.score
        if weight_sum > 0:
            return score_sum / weight_sum
        return 0

    @property
    def str_size(self):
        """Human readable size using 1024-based units with two decimals."""
        for shift, unit in ((40, "TB"), (30, "GB"), (20, "MB"), (10, "KB")):
            threshold = 1 << shift
            if self.size >= threshold:
                return f"{self.size / threshold:.2f} {unit}"
        return f"{self.size} B"
class GenericParser:
    """
    Marker base class for parsers that apply to every kind of release.

    ParserManager.collect_parsers() registers each direct subclass of this
    class after importing the modules of this package.
    """
class ParserManager:
    """Discovers parser classes and runs them against releases."""

    def __init__(self):
        self.parsers: list[Type[DataParser]] = []
        self.collect_parsers()

    def collect_parsers(self):
        """Import every module of this package and register the generic parsers.

        Importing the modules is what makes their parser classes show up in
        ``GenericParser.__subclasses__()``. Calling this method more than once
        does not register a parser class twice.
        """
        package_dir = Path(__file__).parent
        # pkgutil expects path *strings*; Path objects are not handled
        # consistently across CPython releases, so convert explicitly.
        for _, module_name, _ in iter_modules([str(package_dir)]):
            importlib.import_module(f"{__package__}.{module_name}")
        for parser_cls in GenericParser.__subclasses__():
            if parser_cls not in self.parsers:
                self.parsers.append(parser_cls)

    def run_parsers(self, release: Release):
        """Run all parsers on the given release in ascending priority order.

        Each parser's return value is stored in ``release.parser_results``
        under the parser's class name.
        """
        instances = [parser_cls(release) for parser_cls in self.parsers]
        # list.sort() is stable: parsers with equal priority keep their
        # registration order.
        instances.sort()
        for parser in instances:
            release.parser_results[parser.__class__.__name__] = parser.parse()

View File

@@ -0,0 +1,86 @@
import regex as re
from parsers.generic import GenericParser
from parsers.generic.parsers import DataParser
from parsers.generic.watfag import AudioCodec, AudioLayout
# Audio tag patterns, tried in order by AudioParser.parse():
#   index 0 - codec immediately followed by a channel layout ("layout" group)
#   index 1 - codec only (no "layout" group in this pattern)
patterns = [
re.compile(
r"(?P<codec>(?:aac|dts(?:(?:[ -]?hd)?(?:[ -]?(?:ma))?)?|dd[p+]?a?|(?:e-?)?ac-?3|truehd|flac|mp3|opus|wav)(?: ?atmos)?)"
r"[ -]?(?P<layout>[257][. ]?[01])",
re.IGNORECASE
),
re.compile( # When audio layout is not specified, we can still try to extract the codec
r"(?P<codec>(?:aac|dts(?:(?:[ -]?hd)?(?:[ -]?(?:ma))?)?|dd[p+]?a?|(?:e-?)?ac-?3|truehd|flac|mp3|opus|wav)(?: ?atmos)?)",
re.IGNORECASE
)
]
# (spellings, member) pairs. AudioParser compares against the matched codec
# after upper-casing it and removing spaces, hyphens and "ATMOS".
codec_aliases = [
(["AAC"], AudioCodec.AAC),
(["DTS"], AudioCodec.DTS),
(["DTSHDMA", "DTSHD", "DTSMA"], AudioCodec.DTS_MA),
(["AC3", "DD"], AudioCodec.DD),
(["EAC3", "DDP", "DD+"], AudioCodec.DDP),
(["TRUEHD"], AudioCodec.TRUEHD),
(["FLAC"], AudioCodec.FLAC),
(["MP3"], AudioCodec.MP3),
(["OPUS"], AudioCodec.OPUS),
(["WAV"], AudioCodec.WAV)
]
# (spellings, member) pairs for the channel layout.
# NOTE(review): the "layout" group above only matches [257][. ]?[01], so the
# "mono"/"stereo"/"1.0"/"1"/"2" spellings cannot come from these patterns;
# presumably they are kept for other callers - confirm.
layout_aliases = [
(["mono", "1.0", "1"], AudioLayout.MONO),
(["stereo", "2.0", "2"], AudioLayout.STEREO),
(["5.1"], AudioLayout.SURROUND),
(["7.1"], AudioLayout.SURROUND_SIDE)
]
class AudioParser(DataParser, GenericParser):
    """Parses the audio codec and channel layout from the unparsed text."""

    def __init__(self, movie):
        super().__init__(movie)
        self.priority = 45

    def parse(self) -> bool:
        """Set ``audio_codec`` / ``audio_layout`` on the release.

        The matched tag (and any remaining "atmos" mention) is removed from
        ``release.metadata_text``. Anything that cannot be resolved is set to
        the ``UKNOWN`` member so both attributes are always assigned.

        Returns:
            True if an audio tag was found in the text, otherwise False.
        """
        text = self.release.metadata_text or ""
        # Prefer codec + layout; fall back to a bare codec.
        match = patterns[0].search(text) or patterns[1].search(text)
        if not match:
            self.release.audio_codec = AudioCodec.UKNOWN
            self.release.audio_layout = AudioLayout.UKNOWN
            return False

        codec_key = match.group("codec").upper().replace(" ", "").replace("-", "")
        # An "atmos" mention anywhere in the text counts, including inside the match.
        atmos = "atmos" in text.lower()
        codec_key = codec_key.replace("ATMOS", "")
        # The codec pattern accepts a trailing "A" on the Dolby Digital tags
        # (e.g. "DDPA" = Dolby Digital Plus Atmos); fold it into the Atmos flag.
        if codec_key in ("DDPA", "DD+A", "DDA"):
            atmos = True
            codec_key = codec_key[:-1]

        codec = AudioCodec.UKNOWN
        for aliases, standard in codec_aliases:
            if codec_key in aliases:
                codec = standard
                break
        self.release.audio_codec = codec

        layout = AudioLayout.UKNOWN
        raw_layout = match.groupdict().get("layout")  # absent for the codec-only pattern
        if raw_layout:
            layout_key = raw_layout.replace(" ", ".")
            if len(layout_key) == 2:
                # The pattern allows the separator to be omitted ("51" -> "5.1").
                layout_key = f"{layout_key[0]}.{layout_key[1]}"
            for aliases, standard in layout_aliases:
                if layout_key in aliases:
                    layout = standard
                    break
            if atmos and layout is not AudioLayout.UKNOWN:
                try:
                    layout = AudioLayout.from_string(f"{layout} + Atmos")
                except ValueError:
                    # No Atmos variant exists for this layout; keep the plain one.
                    pass
        self.release.audio_layout = layout

        start, end = match.span()
        remaining = text[:start] + text[end:]
        remaining = re.sub(r"atmos", "", remaining, flags=re.IGNORECASE)  # Remove any remaining "atmos" mentions
        self.release.metadata_text = re.sub(r"\s+", " ", remaining).strip()  # Clean up extra spaces
        return True

View File

@@ -0,0 +1,45 @@
import regex as re
from parsers.generic import GenericParser
from parsers.generic.watfag import Group
from parsers.generic.parsers import DataParser
# Release group patterns, tried in order by GroupParser.parse(). Both are
# anchored at the end of the text: the first expects the group after "-" or
# " - ", the second after a space, optionally wrapped in square brackets.
patterns = [
re.compile(r"(?:-| - )(?P<group>[a-zA-Z0-9 &]*)\)?$", re.UNICODE),
re.compile(r"(?: )\[?(?P<group>[a-zA-Z0-9]*?)]?\)?$", re.UNICODE)
]
# Compared against the lower-cased, unstripped group text.
invalid_group_substrs = [ # lowercase-only matches that can be any substring of a group name
' hevc ', # Can appear at end of release name while not being a group
' x264 ',
' x265 ',
' truehd ',
'bluray'
]
# Compared against the stripped group text.
invalid_groups = [ # Case sensitive full group names that are invalid
'MP4'
]
class GroupParser(DataParser, GenericParser):
    """Extracts the scene release group from the end of the unparsed text."""

    def parse(self) -> bool:
        """Set ``group`` / ``group_name`` from the first acceptable pattern match.

        Returns:
            True when a group was found and removed from the text, else False.
        """
        for pattern in patterns:
            match = pattern.search(self.release.metadata_text)
            if not match:
                continue
            raw = match.group("group")
            name = raw.strip()
            lowered = raw.lower()
            # Reject candidates that are really codec/source tags.
            if any(substr in lowered for substr in invalid_group_substrs):
                continue
            if name in invalid_groups:
                continue
            if len(name) < 2:  # too short to be a valid group
                continue
            self.release.group = Group.from_string(name)
            self.release.group_name = name
            start, end = match.span()
            remaining = self.release.metadata_text[:start] + self.release.metadata_text[end:]
            self.release.metadata_text = re.sub(r"\s+", " ", remaining).strip()
            return True
        return False

View File

@@ -0,0 +1,20 @@
import regex as re
from parsers.generic import GenericParser
from parsers.generic.parsers import CheckParser
from parsers.generic.watfag import DynamicRange
# Pattern -> DynamicRange. CheckParser.parse() walks this dict in insertion
# order; the first pattern that matches decides the value and later matches
# are only removed from the text, so the combined DV+HDR pattern must stay
# ahead of the plain DV / HDR ones.
# NOTE(review): r"hdr10\+?" also matches plain "HDR10" and maps it to HDR10P.
checks = {
re.compile(r"hybrid|do?vi? ?hdr(?:10)?[\+p]?", re.IGNORECASE): DynamicRange.HYBRID,
re.compile(r"do?vi?", re.IGNORECASE): DynamicRange.DV,
re.compile(r"hdr10\+?", re.IGNORECASE): DynamicRange.HDR10P,
re.compile(r"hdr", re.IGNORECASE): DynamicRange.HDR
}
class DynamicRangeParser(CheckParser, GenericParser):
    """Check-parser that fills in ``dynamic_range`` on the release."""

    def __init__(self, release):
        super().__init__(release)
        self.attribute_name = "dynamic_range"
        self.checks = checks
        self.remove_checks = []  # nothing to strip without scoring
        self.default = DynamicRange.SDR  # no HDR indicator found -> SDR

View File

@@ -0,0 +1,17 @@
import regex as re
from parsers.generic import GenericParser
from parsers.generic.parsers import CheckParser
from parsers.generic.watfag import Multi
# Pattern -> Multi value, consumed by CheckParser.parse().
checks = {
re.compile(r"multi", re.IGNORECASE): Multi.MULTI
}
class MultiParser(CheckParser, GenericParser):
    """Check-parser that fills in ``multi`` on the release."""

    def __init__(self, release):
        super().__init__(release)
        self.attribute_name = "multi"
        self.checks = checks
        self.remove_checks = []  # nothing to strip without scoring
        self.default = Multi.NOT

View File

@@ -0,0 +1,73 @@
from typing import Optional, TYPE_CHECKING
from regex import Pattern
if TYPE_CHECKING:
from parsers.generic import WATFAG, Release
class DataParser:
    """Common base of all parsers that operate on a release."""

    def __init__(self, release: 'Release'):
        self.release = release
        # Lower values sort first (see __lt__); subclasses may override.
        self.priority = 50

    def __lt__(self, other):
        """Order parsers by ascending priority."""
        return self.priority < other.priority

    def parse(self) -> bool:
        """
        Perform the parsing; subclasses must override this.
        Should return True if parsing was successful.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("Subclasses must implement the parse() method.")
class CheckParser(DataParser):
    """
    Parser driven by a table of regex patterns mapped to WATFAG values.

    The first pattern that matches assigns its value to the release attribute
    named by ``attribute_name`` (only if that attribute is missing or None).
    Every match of every pattern is removed from the text, but the value is
    assigned only once.
    Patterns in ``remove_checks`` are removed from the text without assigning
    anything; this covers words that look like a WATFAG value but are used by
    releasers to mean something else and must not be scored.
    """
    checks: dict[Pattern, 'WATFAG']
    remove_checks: list[Pattern]
    attribute_name: str  # The name of the attribute to set on the release, e.g. "quality" or "source"

    def __init__(self, release):
        super().__init__(release)
        # Value assigned when none of the checks match; subclasses may set it.
        self.default: Optional['WATFAG'] = None

    def _cut(self, match) -> None:
        """Remove the matched span from the release text and strip the result."""
        start, end = match.span()
        text = self.release.metadata_text
        self.release.metadata_text = (text[:start] + text[end:]).strip()

    def parse(self) -> bool:
        """Apply ``checks`` and ``remove_checks`` to the release text.

        Returns:
            True if any check matched or a non-None default was assigned.
        """
        release = self.release
        matched_any = False
        for pattern, wf_value in self.checks.items():
            while match := pattern.search(release.metadata_text):
                matched_any = True
                if getattr(release, self.attribute_name, None) is None:
                    setattr(release, self.attribute_name, wf_value)
                self._cut(match)
        for pattern in self.remove_checks:
            while match := pattern.search(release.metadata_text):
                self._cut(match)
        if matched_any:
            return True
        if self.default is not None:
            setattr(release, self.attribute_name, self.default)
            return True
        return False

View File

@@ -0,0 +1,18 @@
import regex as re
from parsers.generic import GenericParser
from parsers.generic.parsers import CheckParser
from parsers.generic.watfag import Repack
# Pattern -> Repack value; CheckParser.parse() tries them in insertion order,
# so "repack" wins over "proper" when both appear.
checks = {
re.compile(r"repack", re.IGNORECASE): Repack.REPACK,
re.compile(r"proper", re.IGNORECASE): Repack.PROPER
}
class RepackParser(CheckParser, GenericParser):
    """Check-parser that fills in ``repack`` on the release."""

    def __init__(self, release):
        super().__init__(release)
        self.attribute_name = "repack"
        self.checks = checks
        self.remove_checks = []  # nothing to strip without scoring
        self.default = Repack.NOT

View File

@@ -0,0 +1,25 @@
import regex as re
from parsers.generic import GenericParser
from parsers.generic.parsers import CheckParser
from parsers.generic.watfag import Resolution
# Pattern -> Resolution, tried in insertion order by CheckParser.parse().
checks = {
re.compile(r"2160p", re.IGNORECASE): Resolution.UHD,
re.compile(r"1080p", re.IGNORECASE): Resolution.FHD,
re.compile(r"720p", re.IGNORECASE): Resolution.HD,
re.compile(r"576p", re.IGNORECASE): Resolution.SD_576,
re.compile(r"480p", re.IGNORECASE): Resolution.SD,
}
remove_checks = [ # Patterns that indicate a resolution-type word that should be removed without setting a resolution
# This is mostly used for releasers that use "UHD" to mean 1080p for some reason?
re.compile(r"UHD|4K", re.IGNORECASE)
]
class ResolutionParser(CheckParser, GenericParser):
    """Check-parser that fills in ``quality`` (the resolution) on the release.

    No default is configured, so the attribute is left untouched when no
    resolution pattern matches.
    """

    def __init__(self, release):
        super().__init__(release)
        self.priority = 30
        self.attribute_name = "quality"
        self.checks = checks
        self.remove_checks = remove_checks

View File

@@ -0,0 +1,17 @@
import regex as re
from parsers.generic import GenericParser
from parsers.generic.parsers import CheckParser
from parsers.generic.watfag import SeedStatus
class SeederParser(CheckParser, GenericParser):
    """Derives ``seed_status`` from the release's seeder count; no text is parsed."""

    def parse(self) -> bool:
        """Classify ``release.seeders`` into a SeedStatus.

        Non-positive counts are ZERO, 1-9 is LOW and 10 or more is GOOD.

        Returns:
            Always True.
        """
        seeders = self.release.seeders
        if seeders <= 0:
            # A negative count must not be mistaken for a well-seeded release.
            self.release.seed_status = SeedStatus.ZERO
        elif seeders < 10:
            self.release.seed_status = SeedStatus.LOW
        else:
            self.release.seed_status = SeedStatus.GOOD
        return True

View File

@@ -0,0 +1,26 @@
import regex as re
from parsers.generic import GenericParser
from parsers.generic.parsers import CheckParser
from parsers.generic.watfag import Source
# Pattern -> Source. CheckParser.parse() tries them in insertion order and the
# first match decides the value, so more specific tags (e.g. "web-dl") must
# stay ahead of the broader ones that would also match them (e.g. "web").
# NOTE(review): the patterns are unanchored, so short alternatives such as
# "tc", "cam", "scr" and "web" can match inside longer words - confirm this is
# acceptable for the text that is left when this parser runs.
checks = {
re.compile(r"remux", re.IGNORECASE): Source.REMUX,
re.compile(r"blu-?ray|bdrip|brrip", re.IGNORECASE): Source.BLURAY,
re.compile(r"web-?dl", re.IGNORECASE): Source.WEBDL,
re.compile(r"webrip|web", re.IGNORECASE): Source.WEBRIP,
re.compile(r"dvdrip|dvdscr|dvd", re.IGNORECASE): Source.DVDRIP,
re.compile(r"hdtv", re.IGNORECASE): Source.HDTV,
re.compile(r"hdrip", re.IGNORECASE): Source.HDRIP,
re.compile(r"camrip|tsrip|tc|cam", re.IGNORECASE): Source.CAM,
re.compile(r"screener|scr", re.IGNORECASE): Source.SCREENER
}
class SourceParser(CheckParser, GenericParser):
    """Check-parser that fills in ``source`` on the release."""

    def __init__(self, release):
        super().__init__(release)
        # Runs ahead of the default-priority (50) parsers.
        self.priority = 40
        self.attribute_name = "source"
        self.checks = checks
        self.remove_checks = []  # nothing to strip without scoring
        self.default = Source.UKNOWN

View File

@@ -0,0 +1,28 @@
import regex as re
from parsers.generic import GenericParser
from parsers.generic.parsers import CheckParser
from parsers.generic.watfag import StreamingService
# Pattern -> StreamingService, tried in insertion order by CheckParser.parse().
# These patterns are case-sensitive (no IGNORECASE flag) and unanchored.
# NOTE(review): short tags such as "MA", "NF", "iT" and "MAX" will match
# anywhere in the remaining text, including inside other words - consider
# word boundaries.
checks = {
re.compile(r"ATVP"): StreamingService.ATVP,
re.compile(r"NF"): StreamingService.NFLX,
re.compile(r"AMZN"): StreamingService.AMZN,
re.compile(r"DSNP"): StreamingService.DSNP,
re.compile(r"HMAX|MAX"): StreamingService.HMAX,
re.compile(r"HULU"): StreamingService.HULU,
re.compile(r"PCOK"): StreamingService.PCOK,
re.compile(r"PMTP|PTV"): StreamingService.PMTP,
re.compile(r"ROKU"): StreamingService.ROKU,
re.compile(r"TUBI"): StreamingService.TUBI,
re.compile(r"MGM[\+P]"): StreamingService.MGMP,
re.compile(r"iT"): StreamingService.ITUN,
re.compile(r"MA"): StreamingService.MOAN
}
class StreamingParser(CheckParser, GenericParser):
    """Check-parser that fills in ``streaming`` on the release.

    No default is configured, so the attribute is left untouched when no
    streaming service tag matches.
    """

    def __init__(self, release):
        super().__init__(release)
        self.attribute_name = "streaming"
        self.checks = checks
        self.remove_checks = []  # nothing to strip without scoring

View File

@@ -0,0 +1,20 @@
import regex as re
from parsers.generic import GenericParser
from parsers.generic.parsers import CheckParser
from parsers.generic.watfag import VideoCodec
# Pattern -> VideoCodec, tried in insertion order by CheckParser.parse().
checks = {
re.compile(r"avc|[hx][\. -]?264", re.IGNORECASE): VideoCodec.AVC,
re.compile(r"hevc|[hx][\. -]?265", re.IGNORECASE): VideoCodec.HEVC,
re.compile(r"vp9", re.IGNORECASE): VideoCodec.VP9,
re.compile(r"av1", re.IGNORECASE): VideoCodec.AV1,
re.compile(r"mpeg-4|mpeg4|mp4v|xvid", re.IGNORECASE): VideoCodec.XVID
}
class VideoCodecParser(CheckParser, GenericParser):
    """Check-parser that fills in ``video_codec`` on the release.

    No default is configured, so the attribute is left untouched when no
    codec tag matches.
    """

    def __init__(self, release):
        super().__init__(release)
        self.attribute_name = "video_codec"
        self.checks = checks
        self.remove_checks = []  # nothing to strip without scoring

View File

@@ -0,0 +1,211 @@
from enum import Enum
class WATFAG(Enum):
    """
    Base class for all WATFAG attributes. Each attribute should inherit from this class and implement the weight()
    method to provide a weight for the WATFAG score calculation. The value of each attribute can either be a single
    string (in which case it gets a default score of 5) or a tuple of (string, score). The __str__ method returns
    the string representation of the attribute, and the score property returns the score for the attribute.
    """

    def __str__(self):
        """Return the display string of the member."""
        if not isinstance(self.value, tuple):
            return self.value
        return self.value[0]

    @property
    def score(self) -> float:
        """
        The member's raw score (5 for plain string values, otherwise the second
        tuple element) multiplied by the class weight.
        """
        raw = self.value[1] if isinstance(self.value, tuple) else 5
        return raw * self.weight()

    @staticmethod
    def weight() -> float:
        """Override this method in subclasses to provide a weight for the WATFAG score calculation."""
        return 0.5

    @classmethod
    def from_string(cls, text: str):
        """Return the member whose display string equals ``text``, ignoring case.

        Raises:
            ValueError: If no member matches.
        """
        wanted = text.lower()
        for member in cls:
            # str(member) handles both value shapes; indexing value[0] on a
            # plain string value would only compare its first character.
            if str(member).lower() == wanted:
                return member
        raise ValueError(f"No matching {cls.__name__} for string: {text}")
class Resolution(WATFAG):
    """
    Video resolution. 2160p is the ideal; only the SD resolutions rate below average.
    """

    UHD = ("2160p", 10)
    FHD = ("1080p", 5)
    HD = ("720p", 5)
    SD_576 = ("576p", 2)
    SD = ("480p", 2)

    @staticmethod
    def weight() -> float:
        """Weight of the resolution in the overall score."""
        return 0.5
class Source(WATFAG):
    """
    Origin of the video. CAM and Screener are rated very low; WEB-DL is rated
    well above WEBRip, and Blu-ray shares the top rating.
    """

    BLURAY = ("Blu-ray", 10)
    REMUX = ("Remux", 3)
    WEBDL = ("WEB-DL", 10)
    WEBRIP = ("WEBRip", 5)
    DVDRIP = ("DVD", 3)
    HDTV = ("HDTV", 3)
    HDRIP = ("HDRip", 3)
    CAM = ("CAM", 1)
    SCREENER = ("Screener", 1)
    UKNOWN = ("Unknown", 5)  # (sic) member name is referenced by the parsers

    @staticmethod
    def weight() -> float:
        """Weight of the source in the overall score."""
        return 0.7
class StreamingService(WATFAG):
    """
    Streaming service a release was taken from. Most services rate the same;
    Amazon and Apple TV+ are rated highest.
    """

    AMZN = ("Amazon Prime Video", 10)
    ATVP = ("Apple TV+", 10)
    DSNP = ("Disney+", 5)
    HMAX = ("HBO Max", 5)
    HULU = ("Hulu", 5)
    MGMP = ("MGM+", 5)
    MOAN = ("MoviesAnywhere (Disney)", 5)
    NFLX = ("Netflix", 7)
    PCOK = ("Peacock", 5)
    PMTP = ("Paramount TV+", 5)
    ROKU = ("Roku Channel", 5)
    TUBI = ("Tubi", 5)
    ITUN = ("iTunes", 8)
class VideoCodec(WATFAG):
    """
    Video codec. HEVC is rated far above the rest; AV1 is rated lower because
    of playback issues with some Plex clients.
    """

    AVC = ("H.264", 5)
    HEVC = ("HEVC", 10)
    VP9 = ("VP9", 5)
    AV1 = ("AV1", 3)
    XVID = ("XviD", 5)
class AudioCodec(WATFAG):
    """
    Audio codec. The lossless TrueHD and DTS-HD MA rate best. FLAC, although
    lossless, is rated low for movies, mostly due to very large file sizes.
    """

    AAC = ("AAC", 3)
    DTS = ("DTS", 5)
    DTS_MA = ("DTS-HD Master Audio", 10)
    DD = ("Dolby Digital", 5)
    DDP = ("Dolby Digital Plus", 5)
    TRUEHD = ("Dolby TrueHD", 10)
    FLAC = ("FLAC", 3)
    MP3 = ("MP3", 1)
    OPUS = ("Opus", 5)
    WAV = ("WAV", 1)
    UKNOWN = ("Unknown", 5)  # (sic) member name is referenced by the parsers

    @staticmethod
    def weight() -> float:
        """Weight of the audio codec in the overall score."""
        return 0.7
class AudioLayout(WATFAG):
    """
    Channel layout. Atmos variants are preferred over their non-Atmos
    counterparts; the number of channels is mostly unimportant.
    """

    MONO = ("Mono", 1)
    STEREO = ("Stereo", 5)
    SURROUND = ("5.1", 5)
    SURROUND_A = ("5.1 + Atmos", 8)
    SURROUND_SIDE = ("7.1", 7)
    SURROUND_SIDE_A = ("7.1 + Atmos", 10)
    UKNOWN = ("Unknown", 5)  # (sic) member name is referenced by the parsers

    @staticmethod
    def weight() -> float:
        """Weight of the audio layout in the overall score."""
        return 0.8
class DynamicRange(WATFAG):
    """
    Dynamic range. Dolby Vision on its own causes playback issues on many
    devices, so hybrid releases are preferred whenever available.
    HDR10+ does not seem to cause any playback issues.
    """

    SDR = ("SDR", 5)
    HDR = ("HDR", 8)
    HDR10P = ("HDR10+", 8)
    DV = ("Dolby Vision", 1)
    HYBRID = ("Hybrid HDR10+ Dolby Vision", 10)

    @staticmethod
    def weight() -> float:
        """Weight of the dynamic range in the overall score."""
        return 0.7
class Repack(WATFAG):
    """
    Repack status. Repacks are generally preferred but matter less than other
    factors. The same goes for proper releases, which are often just repacks
    under a different name.
    """

    REPACK = ("Repack", 10)
    PROPER = ("Proper", 8)
    NOT = ("Not Repack", 5)

    @staticmethod
    def weight() -> float:
        """Weight of the repack status in the overall score."""
        return 0.4
class Multi(WATFAG):
    """
    MULTI flag. Multi releases generally include all extra audio dub tracks,
    which drives up the file size.
    """

    MULTI = ("MULTI", 2)
    NOT = ("Not MULTI", 5)
class SeedStatus(WATFAG):
    """
    Seeder availability. Zero seeders is a problem; 10 or more is preferred.
    """

    ZERO = ("0 seeders", 1)
    LOW = ("1-9 seeders", 5)
    GOOD = ("10+ seeders", 10)

    @staticmethod
    def weight() -> float:
        """Weight of the seed status in the overall score."""
        return 0.9
class Group(WATFAG):
    """
    Release group ratings. New groups should be added here as they are encountered.
    """

    FLUX = ("FLUX", 10)  # Very good WEB-DL releases and fast
    HONE = ("HONE", 10)  # High quality re-encodes
    PHOCIS = ("PHOCiS", 8)  # Same as FLUX
    LEGION = ("LEGi0N", 8)  # Same as FLUX
    AOC = ("AOC", 1)  # Often low quality CAM releases. While fast, not worth it for most movies.
    ETHEL = ("ETHEL", 4)  # WEB-DL releases, not tagged or named very well.
    OTHER = ("Other", 5)

    @staticmethod
    def weight() -> float:
        """Weight of the release group in the overall score."""
        return 1.0

    @classmethod
    def from_string(cls, text: str):
        """Return the first member whose name occurs in ``text`` (case-insensitive).

        Unlike the base implementation this is a substring test and it never
        raises: ``OTHER`` is returned when nothing matches.
        """
        haystack = text.lower()
        for member in cls:
            if member.value[0].lower() in haystack:
                return member
        return cls.OTHER
# Star-imports export every direct WATFAG subclass defined above; the WATFAG
# base class itself is not listed.
__all__ = [cls.__name__ for cls in WATFAG.__subclasses__()]

View File

@@ -0,0 +1,58 @@
import importlib
from pathlib import Path
from pkgutil import iter_modules
from typing import Optional
from parsers.generic import Release, ParserManager
from parsers.generic.watfag import *
class MovieRelease(Release):
    """A movie release together with the fields the movie parsers fill in."""

    def __init__(self, unparsed_text, dl_link, **kwargs):
        super().__init__(unparsed_text, dl_link, **kwargs)
        self.title: str = ""
        self.year: int = 0
        self.edition: Optional[str] = None
        # WATFAG attributes; they stay None until a parser sets them.
        self.group: Optional[Group] = None
        self.group_name: Optional[str] = None
        self.quality: Optional[Resolution] = None
        self.source: Optional[Source] = None
        self.streaming: Optional[StreamingService] = None
        self.video_codec: Optional[VideoCodec] = None
        self.audio_codec: Optional[AudioCodec] = None
        self.audio_layout: Optional[AudioLayout] = None
        self.dynamic_range: Optional[DynamicRange] = None
        self.repack: Optional[Repack] = None
        self.multi: Optional[Multi] = None

    def __str__(self):
        """Render the release as a single ' | '-separated summary line."""
        header = f"{self.title} ({self.year})"
        if self.edition:
            header += f" [{self.edition}]"
        parts = [header]
        shown_attrs = ('quality', 'video_codec', 'audio_codec', 'audio_layout',
                       'dynamic_range', 'repack', 'multi', 'source')
        for attr in shown_attrs:
            value = getattr(self, attr)
            shown = value if value else 'Unknown'
            parts.append(f"{attr.capitalize()}: {shown}")
        if self.streaming:
            parts.append(f"Streaming: {self.streaming}")
        group_label = self.group_name if self.group else 'Unknown'
        parts.append(f"Group: {group_label}")
        if not self.fully_consumed():
            # Show whatever text no parser claimed.
            parts.append(f"Unparsed: {self.metadata_text}")
        parts.append(f"WATFAG: {self.watfag:.2f}")
        return " | ".join(parts)
class MovieParser:
    """
    Marker base class for parsers that only apply to movie releases.

    MovieParserManager.collect_parsers() registers each direct subclass of
    this class after importing the modules of this package.
    """
class MovieParserManager(ParserManager):
    """Parses movie releases."""

    def collect_parsers(self):
        """Register the generic parsers, then import and register the movie parsers.

        Calling this method more than once does not register a parser class twice.
        """
        super().collect_parsers()
        package_dir = Path(__file__).parent
        # pkgutil expects path *strings*; Path objects are not handled
        # consistently across CPython releases, so convert explicitly.
        for _, module_name, _ in iter_modules([str(package_dir)]):
            importlib.import_module(f"{__package__}.{module_name}")
        for parser_cls in MovieParser.__subclasses__():
            if parser_cls not in self.parsers:
                self.parsers.append(parser_cls)

View File

@@ -0,0 +1,22 @@
import regex as re
from parsers.generic.parsers import DataParser
from parsers.movie import MovieParser, MovieRelease
from parsers.movie.title_year import edition_regex
class EditionParser(DataParser, MovieParser):
    """Parses the edition from the unparsed text."""

    # Compiled once. IGNORECASE mirrors the title/year patterns, which embed
    # the same edition_regex with that flag.
    _edition_pattern = re.compile(edition_regex, re.IGNORECASE)

    def __init__(self, movie: MovieRelease):
        super().__init__(movie)
        self.priority = 90

    def parse(self) -> bool:
        """Fill in ``release.edition`` if it has not been determined yet.

        An edition counts as undetermined when it is None or an empty string,
        because the title/year parser stores "" when its match carried no
        edition. On a match the edition text is removed from
        ``release.metadata_text``.

        Returns:
            Always True; a release without an edition is not an error.
        """
        if self.release.edition:
            return True
        text = self.release.metadata_text or ""
        if match := self._edition_pattern.search(text):
            self.release.edition = match.group("edition").strip()
            start, end = match.span()
            remaining = text[:start] + text[end:]
            self.release.metadata_text = re.sub(r"\s+", " ", remaining).strip()  # Clean up extra spaces
        return True

View File

@@ -0,0 +1,61 @@
import regex as re
from parsers.generic.parsers import DataParser
from parsers.movie import MovieParser, MovieRelease
# Shamelessly stolen from Radarr: https://github.com/Radarr/Radarr/blob/develop/src/NzbDrone.Core/Parser/Parser.cs
# The first capturing group of this expression is the named group "edition".
edition_regex = r"\(?\b(?P<edition>(((Recut.|Extended.|Ultimate.)?(Director.?s|Collector.?s|Theatrical|Ultimate|Extended|Despecialized|(Special|Rouge|Final|Assembly|Imperial|Diamond|Signature|Hunter|Rekall)(?=(.(Cut|Edition|Version)))|\d{2,3}(th)?.Anniversary)(?:.(Cut|Edition|Version))?(.(Extended|Uncensored|Remastered|Unrated|Uncut|Open.?Matte|IMAX|Fan.?Edit))?|((Uncensored|Remastered|Unrated|Uncut|Open?.Matte|IMAX|Fan.?Edit|Restored|((2|3|4)in1))))))\b\)?"
# Title/year patterns, tried in order by TitleYearParser.parse() against the
# start of the original text; the first one that matches wins. Only the first
# two embed edition_regex and therefore expose an "edition" group.
# NOTE(review): in the "scene name directly after" pattern the "year" group
# captures bracketed text (r"\[\w *\]"), not digits.
patterns = [
re.compile( # Special or funny edition movies
r"^(?P<title>(?![(\[]).+?)?(?:(?:[-_\W](?<![)\[!]))*"
+ edition_regex
+ r".{1,3}(?P<year>(1(8|9)|20)\d{2}(?!p|i|\d+|]|\W\d+)))+(\W+|_|$)(?!\\)",
re.IGNORECASE | re.UNICODE
),
re.compile(
r"^(?P<title>(?![(\[]).+?)?(?:(?:[-_\W](?<![)\[!]))*"
r"(?P<year>(1(8|9)|20)\d{2}(?!p|i|\d+|]|\W\d+)))+(\W+|_|$)(?!\\)"
+ edition_regex,
re.IGNORECASE | re.UNICODE
),
re.compile( # Normal movie format, will match 98% of movies
r"^(?P<title>(?![(\[]).+?)?(?:(?:[-_\W](?<![)\[!]))*"
r"(?P<year>(1(8|9)|20)\d{2}(?!p|i|(1(8|9)|20)\d{2}|]|\W(1(8|9)|20)\d{2})))+(\W+|_|$)(?!\\)",
re.IGNORECASE | re.UNICODE
),
re.compile( # Movies with scene name directly after them
r"^(?P<title>.+?)?(?:(?:[-_\W](?<![()\[!]))*(?P<year>(\[\w *\])))+(\W+|_|$)(?!\\)",
re.IGNORECASE | re.UNICODE
),
re.compile( # Movies with year in square brackets (for some reason)
r"^(?P<title>(?![(\[]).+?)?(?:(?:[-_\W](?<![)!]))*"
r"(?P<year>(1(8|9)|20)\d{2}(?!p|i|\d+|\W\d+)))+(\W+|_|$)(?!\\)",
re.IGNORECASE | re.UNICODE
),
re.compile( # Movies with brackets in their title, potentially
r"^(?P<title>.+?)?(?:(?:[-_\W](?<![)\[!]))*"
r"(?P<year>(1(8|9)|20)\d{2}(?!p|i|\d+|\]|\W\d+)))+(\W+|_|$)(?!\\)",
re.IGNORECASE | re.UNICODE
)
]
class TitleYearParser(DataParser, MovieParser):
    """Parses the title and year from the unparsed text."""

    def __init__(self, movie: MovieRelease):
        super().__init__(movie)
        self.priority = 0  # First parser to run

    def parse(self) -> bool:
        """Split the original text into title, year, edition and remaining text.

        The first pattern that matches the start of ``release.original_text``
        is used. ``release.metadata_text`` is set to the original text with the
        matched span removed. A missing title or edition becomes "", and a
        missing or non-numeric year becomes 0.

        Returns:
            True if a pattern matched, otherwise False (the release is left untouched).
        """
        text = self.release.original_text
        for pattern in patterns:
            match = pattern.match(text)
            if not match:
                continue
            groups = match.groupdict()
            title = groups.get("title") or ""
            self.release.title = title.replace(".", " ").replace("_", " ").strip()
            try:
                self.release.year = int(groups.get("year") or 0)
            except ValueError:
                # One pattern captures bracketed text in its "year" group.
                self.release.year = 0
            self.release.edition = groups.get("edition") or ""
            start, end = match.span()
            self.release.metadata_text = text[:start] + text[end:]
            return True
        return False

View File

@@ -0,0 +1,57 @@
import importlib
from pathlib import Path
from pkgutil import iter_modules
from typing import Optional
from parsers.generic import Release, ParserManager
from parsers.generic.watfag import *
class TVBoxSetRelease(Release):
    """A TV box set release together with the fields the box set parsers fill in."""

    def __init__(self, unparsed_text, dl_link, **kwargs):
        super().__init__(unparsed_text, dl_link, **kwargs)
        self.show_title: str = ""
        self.seasons: Optional[str] = None
        # WATFAG attributes; they stay None until a parser sets them.
        self.group: Optional[Group] = None
        self.group_name: Optional[str] = None
        self.quality: Optional[Resolution] = None
        self.source: Optional[Source] = None
        self.streaming: Optional[StreamingService] = None
        self.video_codec: Optional[VideoCodec] = None
        self.audio_codec: Optional[AudioCodec] = None
        self.audio_layout: Optional[AudioLayout] = None
        self.dynamic_range: Optional[DynamicRange] = None
        self.repack: Optional[Repack] = None
        self.multi: Optional[Multi] = None

    def __str__(self):
        """Render the release as a single ' | '-separated summary line."""
        parts = [f"{self.show_title} (Seasons: {self.seasons})"]
        shown_attrs = ('quality', 'video_codec', 'audio_codec', 'audio_layout',
                       'dynamic_range', 'repack', 'multi', 'source')
        for attr in shown_attrs:
            value = getattr(self, attr)
            shown = value if value else 'Unknown'
            parts.append(f"{attr.capitalize()}: {shown}")
        if self.streaming:
            parts.append(f"Streaming: {self.streaming}")
        group_label = self.group_name if self.group else 'Unknown'
        parts.append(f"Group: {group_label}")
        if not self.fully_consumed():
            # Show whatever text no parser claimed.
            parts.append(f"Unparsed: {self.metadata_text}")
        parts.append(f"WATFAG: {self.watfag:.2f}")
        return " | ".join(parts)
class TVBoxSetParser:
    """
    Marker base class for parsers that only apply to TV box set releases.

    TVBoxSetParserManager.collect_parsers() registers each direct subclass of
    this class after importing the modules of this package.
    """
class TVBoxSetParserManager(ParserManager):
    """Parses TV box set releases."""

    def collect_parsers(self):
        """Register the generic parsers, then import and register the TV box set parsers.

        Calling this method more than once does not register a parser class twice.
        """
        super().collect_parsers()
        package_dir = Path(__file__).parent
        # pkgutil expects path *strings*; Path objects are not handled
        # consistently across CPython releases, so convert explicitly.
        for _, module_name, _ in iter_modules([str(package_dir)]):
            importlib.import_module(f"{__package__}.{module_name}")
        for parser_cls in TVBoxSetParser.__subclasses__():
            if parser_cls not in self.parsers:
                self.parsers.append(parser_cls)

View File

@@ -0,0 +1,57 @@
import regex as re
from parsers.generic.parsers import DataParser
from parsers.tvboxset import TVBoxSetParser, TVBoxSetRelease
# Title/season patterns, tried in order by TitleSeasonsParser.parse() against
# the start of the original text; the first one that matches wins. Patterns
# without a "season_end" group describe a single season.
# The patterns mix the (?P<name>...) and (?<name>...) named-group spellings;
# this module compiles them with the third-party "regex" package.
patterns = [
re.compile( # Show Name S01-S02 (year)
r"^(?P<title>.+?)[-_. ]S(?:eason)?s?(?P<season_start>[0-9]{1,2}) ?[-_. ] ?(?:S(?:eason)?)?"
r"(?P<season_end>[0-9]{1,2})[-_. ]\(?(1(8|9)|20)\d{2}(?!p|i|(1(8|9)|20)\d{2}|\]|\W(1(8|9)|20)\d{2})\)?",
re.IGNORECASE | re.UNICODE
),
re.compile( # Show Name (year) S01-S02
r"^(?<title>.+?)[-_. ]\(?(1(8|9)|20)\d{2}(?!p|i|(1(8|9)|20)\d{2}|\]|\W(1(8|9)|20)\d{2})\)?[-_. ]S(?:eason)?s?"
r"(?<season_start>[0-9]{1,2}) ?[-_. ] ?(?:S(?:eason)?)?(?<season_end>[0-9]{1,2})[-_. ]",
re.IGNORECASE | re.UNICODE
),
re.compile( # Show Name (year) S01
r"^(?<title>.+?)[-_. ]\(?(1(8|9)|20)\d{2}(?!p|i|(1(8|9)|20)\d{2}|\]|\W(1(8|9)|20)\d{2})\)?[-_. ]"
r"S(?:eason)?s? ?(?<season_start>[0-9]{1,2}) ?[-_. ]",
re.IGNORECASE | re.UNICODE
),
re.compile( # Show Name S01 (year)
r"^(?<title>.+?)[-_. ]S(?:eason)?s? ?(?<season_start>[0-9]{1,2}) ?[-_. ]"
r"\(?(1(8|9)|20)\d{2}(?!p|i|(1(8|9)|20)\d{2}|\]|\W(1(8|9)|20)\d{2})\)?[-_. ]",
re.IGNORECASE | re.UNICODE
),
re.compile( # Show Name (Complete) S01-S02
r"^(?<title>.+?)[-_. ](?:Complete[-_. ]?(?:Series[-_. ])?)?\(?S(?:eason)?s?(?<season_start>[0-9]{1,2})"
r" ?[-_. ] ?(?:S(?:eason)?)?(?<season_end>[0-9]{1,2})\)?[-_. ](?:Complete[-_. ])?",
re.IGNORECASE | re.UNICODE
),
re.compile( # Nuclear option: Show Name S01
r"^(?<title>.+?)[-_. ]S(?:eason)?s? ?(?<season_start>[0-9]{1,2}) ?[-_. ]",
re.IGNORECASE | re.UNICODE
)
]
class TitleSeasonsParser(DataParser, TVBoxSetParser):
    """Parses the title and seasons from the unparsed text."""

    def __init__(self, release: TVBoxSetRelease):
        super().__init__(release)
        self.priority = 0  # First parser to run

    def parse(self) -> bool:
        """Split the original text into show title, season range and remaining text.

        ``release.seasons`` becomes "N" for a single season or "N-M" for a
        range; ``release.metadata_text`` is the original text with the matched
        span removed.

        Returns:
            True if a pattern matched, otherwise False.
        """
        text = self.release.original_text
        for pattern in patterns:
            match = pattern.match(text)
            if not match:
                continue
            title = match.group("title")
            self.release.show_title = title.replace(".", " ").replace("_", " ").strip() if title else ""
            first_raw = match.group("season_start")
            first = int(first_raw) if first_raw else 0
            # Not every pattern defines a "season_end" group.
            last_raw = match.groupdict().get("season_end")
            last = int(last_raw) if last_raw else first
            if first == last:
                self.release.seasons = f"{first}"
            else:
                self.release.seasons = f"{first}-{last}"
            start, end = match.span()
            self.release.metadata_text = text[:start] + text[end:]
            return True
        return False

67
src/watfag/search.py Normal file
View File

@@ -0,0 +1,67 @@
from xml.etree import ElementTree
from httpx import AsyncClient
from parsers.generic import Release
from parsers.movie import MovieRelease, MovieParserManager
from parsers.tvboxset import TVBoxSetRelease, TVBoxSetParserManager
class Jackett:
    """Async client for a Jackett Torznab endpoint that returns parsed releases."""

    _TORZNAB_NS = {'torznab': 'http://torznab.com/schemas/2015/feed'}

    def __init__(self, api_key, base_url):
        self.api_key = api_key
        self.base_url = base_url
        self.movie_parser = MovieParserManager()
        self.tvboxset_parser = TVBoxSetParserManager()

    async def _get(self, params) -> str:
        """Perform a GET against the endpoint and return the response body.

        Raises whatever ``response.raise_for_status()`` raises on an error status.
        """
        async with AsyncClient() as client:
            response = await client.get(self.base_url, params=params, timeout=30)
            response.raise_for_status()
            return response.text

    async def get_capabilities(self):
        """Return the raw response text of the ``t=caps`` request."""
        return await self._get({
            "apikey": self.api_key,
            "t": "caps",
        })

    async def search(self, query) -> list[Release]:
        """Search the indexer and return the parsed movie / TV box set releases.

        Items whose categories are neither a movie nor a TV box set, or which
        lack a title or link, are skipped.
        """
        xml_text = await self._get({
            "apikey": self.api_key,
            "t": "search",
            "q": query,
        })
        # NOTE: xml.etree is not hardened against maliciously constructed
        # documents; the response is expected to come from a trusted instance.
        root = ElementTree.fromstring(xml_text)
        channel = root.find('channel')
        if channel is None:
            return []
        releases = []
        for item in channel.findall('item'):
            release = self._build_release(item)
            if release is not None:
                releases.append(release)
        return releases

    @staticmethod
    def _to_int(value) -> int:
        """Convert ``value`` to int, returning 0 when it is missing or malformed."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return 0

    def _build_release(self, item):
        """Turn one feed ``<item>`` into a parsed release, or None if it is not usable."""
        # Collect the torznab attributes; a name may occur several times.
        attrs: dict[str, list[str]] = {}
        for attr in item.findall('torznab:attr', namespaces=self._TORZNAB_NS):
            attrs.setdefault(attr.get('name'), []).append(attr.get('value'))

        # The categories decide what kind of result this is.
        categories = [cat for cat in attrs.get('category', []) if cat]
        if any(cat.startswith('2') for cat in categories):  # This is a movie
            release_cls, manager = MovieRelease, self.movie_parser
        elif '100027' in categories:  # This is a TV boxset
            release_cls, manager = TVBoxSetRelease, self.tvboxset_parser
        else:
            return None

        title = item.findtext('title')
        link = item.findtext('link')
        if not title or not link:
            return None

        seeders = attrs.get('seeders') or [None]
        kwargs = {
            'size': self._to_int(item.findtext('size')),
            'seeders': self._to_int(seeders[0]),
        }
        comments = item.findtext('comments')
        if comments:
            # Without a comments link the release falls back to its download link.
            kwargs['view_link'] = comments

        release = release_cls(title, link, **kwargs)
        manager.run_parsers(release)
        return release