import: anime utility scripts from /storage/scripts/
Перенос из vespaserver:/storage/scripts/ перед удалением оригиналов. Скрипты разовые — клонируются по мере необходимости.
This commit is contained in:
parent
67b460d40c
commit
70814ddc1f
5 changed files with 1918 additions and 2 deletions
31
README.md
31
README.md
|
|
@ -1,3 +1,30 @@
|
||||||
# utility-scripts
|
# Utility Scripts
|
||||||
|
|
||||||
One-shot utility scripts (anime tagging, cover downloads). Pull when needed.
|
Одноразовые утилиты для homelab Vespa. Не предназначены для постоянной работы — запускаются при необходимости.
|
||||||
|
|
||||||
|
## Скрипты
|
||||||
|
|
||||||
|
### `rename_anime.py`
|
||||||
|
Массовое переименование файлов аниме (приведение к стандартному формату).
|
||||||
|
|
||||||
|
### `tag_anime.py`
|
||||||
|
Тегирование аниме-файлов через MyAnimeList/Jikan API.
|
||||||
|
|
||||||
|
### `download_anime_covers.py`
|
||||||
|
Скачивание обложек аниме напрямую с источников.
|
||||||
|
|
||||||
|
### `download_anime_covers_proxy.py`
|
||||||
|
Версия с прокси (через xray на vespaserver: `http://192.168.31.216:1080`).
|
||||||
|
|
||||||
|
## Использование
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git clone https://git.vespahomelab.ru/shu/utility-scripts.git
|
||||||
|
cd utility-scripts
|
||||||
|
python3 -m pip install -r requirements.txt # если будет создан
|
||||||
|
python3 <script>.py
|
||||||
|
```
|
||||||
|
|
||||||
|
## Контекст
|
||||||
|
|
||||||
|
Перенесены сюда из `/storage/scripts/` на vespaserver в рамках реструктуризации homelab (см. Obsidian project `Реструктуризация-homelab`, 2026-04-29).
|
||||||
|
|
|
||||||
367
download_anime_covers.py
Normal file
367
download_anime_covers.py
Normal file
|
|
@ -0,0 +1,367 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Скачивает обложки аниме через Jikan API (jikan.moe) и встраивает их в аудиофайлы.
|
||||||
|
|
||||||
|
Название аниме берётся из первых квадратных скобок в названии файла:
|
||||||
|
TV-2 OP01 - Nobody Knows (Suga Shikao) [xxxHOLiC].mp3
|
||||||
|
^^^^^^^^
|
||||||
|
извлекаем это
|
||||||
|
|
||||||
|
Jikan API - бесплатный API без аутентификации (парсит MyAnimeList)
|
||||||
|
https://jikan.moe/
|
||||||
|
|
||||||
|
Использование:
|
||||||
|
python3 download_anime_covers.py run --dry-run /path/to/music
|
||||||
|
python3 download_anime_covers.py run /path/to/music
|
||||||
|
python3 download_anime_covers.py search "xxxHOLiC" # Поиск обложки для теста
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import argparse
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
import requests
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional, Tuple, Dict
|
||||||
|
|
||||||
|
try:
|
||||||
|
from mutagen.flac import FLAC
|
||||||
|
from mutagen.mp3 import MP3
|
||||||
|
from mutagen.oggvorbis import OggVorbis
|
||||||
|
from mutagen.oggopus import OggOpus
|
||||||
|
from mutagen.mp4 import MP4
|
||||||
|
from mutagen.id3 import APIC, ID3
|
||||||
|
except ImportError:
|
||||||
|
print("Установите mutagen: pip install mutagen")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
AUDIO_EXTENSIONS = {".flac", ".mp3", ".ogg", ".opus", ".m4a"}
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# КОНФИГУРАЦИЯ
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
# Jikan API (бесплатный, без ключа)
|
||||||
|
JIKAN_BASE_URL = "https://api.jikan.moe/v4"
|
||||||
|
|
||||||
|
# Кэш обложек (чтобы не скачивать одну и ту же много раз)
|
||||||
|
COVER_CACHE: Dict[str, bytes] = {}
|
||||||
|
|
||||||
|
# Задержки для API (Jikan требует ~2 секунды между запросами)
|
||||||
|
JIKAN_DELAY = 2.0
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# ФУНКЦИИ
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
def extract_anime_name(filepath: Path) -> Optional[str]:
|
||||||
|
"""
|
||||||
|
Извлекает название аниме из первых квадратных скобок в названии файла.
|
||||||
|
|
||||||
|
Пример:
|
||||||
|
"TV-2 OP01 - Nobody Knows (Suga) [xxxHOLiC].mp3" → "xxxHOLiC"
|
||||||
|
"Song [Fullmetal Alchemist Brotherhood].flac" → "Fullmetal Alchemist Brotherhood"
|
||||||
|
"""
|
||||||
|
stem = filepath.stem
|
||||||
|
|
||||||
|
# Ищем первые квадратные скобки
|
||||||
|
match = re.search(r'\[([^\]]+)\]', stem)
|
||||||
|
if match:
|
||||||
|
return match.group(1).strip()
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class JikanAPI:
|
||||||
|
"""Класс для работы с Jikan API (jikan.moe)"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.base_url = JIKAN_BASE_URL
|
||||||
|
self.session = requests.Session()
|
||||||
|
self.last_request_time = 0
|
||||||
|
|
||||||
|
# Jikan требует User-Agent
|
||||||
|
self.session.headers.update({
|
||||||
|
'User-Agent': 'Anime-Cover-Downloader/1.0 (shu@vespahomelab.ru)'
|
||||||
|
})
|
||||||
|
|
||||||
|
def _rate_limit(self):
|
||||||
|
"""Соблюдает лимиты API Jikan (~2 секунды между запросами)"""
|
||||||
|
elapsed = time.time() - self.last_request_time
|
||||||
|
if elapsed < JIKAN_DELAY:
|
||||||
|
time.sleep(JIKAN_DELAY - elapsed)
|
||||||
|
self.last_request_time = time.time()
|
||||||
|
|
||||||
|
def search_anime(self, query: str, limit: int = 5) -> list:
|
||||||
|
"""Ищет аниме по названию"""
|
||||||
|
self._rate_limit()
|
||||||
|
|
||||||
|
url = f"{self.base_url}/anime"
|
||||||
|
params = {
|
||||||
|
'q': query,
|
||||||
|
'limit': limit
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = self.session.get(url, params=params, timeout=30)
|
||||||
|
response.raise_for_status()
|
||||||
|
data = response.json()
|
||||||
|
|
||||||
|
results = []
|
||||||
|
for item in data.get('data', []):
|
||||||
|
title = item.get('title', '')
|
||||||
|
title_english = item.get('title_english', '')
|
||||||
|
images = item.get('images', {})
|
||||||
|
jpg = images.get('jpg', {})
|
||||||
|
image_url = jpg.get('large', jpg.get('image_url', ''))
|
||||||
|
|
||||||
|
if image_url:
|
||||||
|
results.append({
|
||||||
|
'title': title,
|
||||||
|
'title_english': title_english,
|
||||||
|
'image_url': image_url,
|
||||||
|
'source': 'Jikan'
|
||||||
|
})
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f" Ошибка поиска в Jikan: {e}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
def get_anime_cover(self, anime_name: str) -> Optional[str]:
|
||||||
|
"""Получает URL обложки для аниме"""
|
||||||
|
results = self.search_anime(anime_name)
|
||||||
|
|
||||||
|
if results:
|
||||||
|
# Возвращаем первый результат (наиболее релевантный)
|
||||||
|
return results[0]['image_url']
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def download_image(url: str) -> Optional[bytes]:
|
||||||
|
"""Скачивает изображение по URL"""
|
||||||
|
try:
|
||||||
|
response = requests.get(url, timeout=30)
|
||||||
|
response.raise_for_status()
|
||||||
|
return response.content
|
||||||
|
except Exception as e:
|
||||||
|
print(f" Ошибка скачивания изображения: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_cover_image(anime_name: str, jikan_api: JikanAPI) -> Optional[bytes]:
|
||||||
|
"""
|
||||||
|
Получает обложку для аниме через Jikan API.
|
||||||
|
Использует кэш для повторных запросов.
|
||||||
|
"""
|
||||||
|
# Проверяем кэш
|
||||||
|
cache_key = anime_name.lower()
|
||||||
|
if cache_key in COVER_CACHE:
|
||||||
|
print(f" Найдено в кэше: {anime_name}")
|
||||||
|
return COVER_CACHE[cache_key]
|
||||||
|
|
||||||
|
print(f" Поиск обложки для: {anime_name}")
|
||||||
|
|
||||||
|
# Ищем через Jikan
|
||||||
|
image_url = jikan_api.get_anime_cover(anime_name)
|
||||||
|
if image_url:
|
||||||
|
print(f" Найдено: {image_url}")
|
||||||
|
image_data = download_image(image_url)
|
||||||
|
if image_data:
|
||||||
|
COVER_CACHE[cache_key] = image_data
|
||||||
|
return image_data
|
||||||
|
|
||||||
|
print(f" Обложка не найдена")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def embed_cover(filepath: Path, image_data: bytes, dry_run: bool = False) -> bool:
|
||||||
|
"""
|
||||||
|
Встраивает обложку в аудиофайл.
|
||||||
|
"""
|
||||||
|
if dry_run:
|
||||||
|
return True
|
||||||
|
|
||||||
|
ext = filepath.suffix.lower()
|
||||||
|
|
||||||
|
try:
|
||||||
|
if ext == ".mp3":
|
||||||
|
audio = MP3(str(filepath), ID3=ID3)
|
||||||
|
if audio.tags is None:
|
||||||
|
audio.add_tags()
|
||||||
|
|
||||||
|
# Удаляем старую обложку если есть
|
||||||
|
audio.tags.delall('APIC')
|
||||||
|
|
||||||
|
# Добавляем новую
|
||||||
|
audio.tags.add(APIC(
|
||||||
|
encoding=3,
|
||||||
|
mime='image/jpeg',
|
||||||
|
type=3, # Front cover
|
||||||
|
desc='Cover',
|
||||||
|
data=image_data
|
||||||
|
))
|
||||||
|
audio.save()
|
||||||
|
|
||||||
|
elif ext == ".flac":
|
||||||
|
audio = FLAC(str(filepath))
|
||||||
|
audio.clear_pictures()
|
||||||
|
audio.add_picture(image_data)
|
||||||
|
audio.save()
|
||||||
|
|
||||||
|
elif ext == ".ogg":
|
||||||
|
audio = OggVorbis(str(filepath))
|
||||||
|
# Для Ogg Vorbis обложка хранится в METADATA_BLOCK_PICTURE
|
||||||
|
from mutagen.flac import Picture
|
||||||
|
pic = Picture()
|
||||||
|
pic.type = 3
|
||||||
|
pic.mime = 'image/jpeg'
|
||||||
|
pic.data = image_data
|
||||||
|
audio.add_picture(pic)
|
||||||
|
audio.save()
|
||||||
|
|
||||||
|
elif ext == ".opus":
|
||||||
|
audio = OggOpus(str(filepath))
|
||||||
|
from mutagen.flac import Picture
|
||||||
|
pic = Picture()
|
||||||
|
pic.type = 3
|
||||||
|
pic.mime = 'image/jpeg'
|
||||||
|
pic.data = image_data
|
||||||
|
audio.add_picture(pic)
|
||||||
|
audio.save()
|
||||||
|
|
||||||
|
elif ext == ".m4a":
|
||||||
|
audio = MP4(str(filepath))
|
||||||
|
audio['covr'] = [image_data]
|
||||||
|
audio.save()
|
||||||
|
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ✗ Ошибка встраивания обложки: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def process_directory(root_path: str, dry_run: bool = False):
|
||||||
|
"""Обработать директорию."""
|
||||||
|
root = Path(root_path)
|
||||||
|
|
||||||
|
if not root.is_dir():
|
||||||
|
print(f"Директория не найдена: {root_path}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Инициализируем Jikan API
|
||||||
|
jikan_api = JikanAPI()
|
||||||
|
|
||||||
|
prefix_str = "[DRY] " if dry_run else ""
|
||||||
|
processed = 0
|
||||||
|
skipped = 0
|
||||||
|
errors = 0
|
||||||
|
no_cover = 0
|
||||||
|
|
||||||
|
audio_files = sorted([
|
||||||
|
f for f in root.rglob("*")
|
||||||
|
if f.is_file() and f.suffix.lower() in AUDIO_EXTENSIONS
|
||||||
|
])
|
||||||
|
|
||||||
|
if not audio_files:
|
||||||
|
print("Аудиофайлы не найдены")
|
||||||
|
return
|
||||||
|
|
||||||
|
print(f"{prefix_str}Найдено файлов: {len(audio_files)}\n")
|
||||||
|
|
||||||
|
# Группируем файлы по названию аниме для кэширования обложек
|
||||||
|
anime_covers: Dict[str, Optional[bytes]] = {}
|
||||||
|
|
||||||
|
for idx, filepath in enumerate(audio_files, 1):
|
||||||
|
anime_name = extract_anime_name(filepath)
|
||||||
|
|
||||||
|
if not anime_name:
|
||||||
|
skipped += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
print(f"[{idx}/{len(audio_files)}] {filepath.name}")
|
||||||
|
|
||||||
|
# Получаем обложку (с кэшированием)
|
||||||
|
if anime_name not in anime_covers:
|
||||||
|
cover_data = get_cover_image(anime_name, jikan_api)
|
||||||
|
anime_covers[anime_name] = cover_data
|
||||||
|
else:
|
||||||
|
cover_data = anime_covers[anime_name]
|
||||||
|
print(f" Используем кэш для: {anime_name}")
|
||||||
|
|
||||||
|
if cover_data is None:
|
||||||
|
no_cover += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
success = embed_cover(filepath, cover_data, dry_run)
|
||||||
|
|
||||||
|
if success:
|
||||||
|
processed += 1
|
||||||
|
if dry_run:
|
||||||
|
print(f" ✓ Будет встроена обложка")
|
||||||
|
else:
|
||||||
|
print(f" ✓ Обложка встроена")
|
||||||
|
else:
|
||||||
|
errors += 1
|
||||||
|
|
||||||
|
print(f"\n{'=' * 60}")
|
||||||
|
print(f" {'Будет обработано' if dry_run else 'Обработано'}: {processed}")
|
||||||
|
print(f" Пропущено (нет названия аниме): {skipped}")
|
||||||
|
print(f" Не найдено обложек: {no_cover}")
|
||||||
|
print(f" Ошибок: {errors}")
|
||||||
|
print(f"{'=' * 60}")
|
||||||
|
|
||||||
|
|
||||||
|
def search_cover(anime_name: str):
|
||||||
|
"""Тестовый поиск обложки для аниме"""
|
||||||
|
jikan_api = JikanAPI()
|
||||||
|
|
||||||
|
print(f"Поиск обложки для: {anime_name}\n")
|
||||||
|
|
||||||
|
# Jikan
|
||||||
|
print("Jikan (MyAnimeList):")
|
||||||
|
results = jikan_api.search_anime(anime_name, limit=5)
|
||||||
|
for i, result in enumerate(results, 1):
|
||||||
|
print(f" {i}. {result['title']}")
|
||||||
|
if result['title_english']:
|
||||||
|
print(f" English: {result['title_english']}")
|
||||||
|
print(f" {result['image_url']}")
|
||||||
|
|
||||||
|
if not results:
|
||||||
|
print(" Не найдено")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(description="Скачивание и встраивание обложек аниме")
|
||||||
|
sub = parser.add_subparsers(dest="command")
|
||||||
|
|
||||||
|
p_run = sub.add_parser("run", help="Обработать директорию")
|
||||||
|
p_run.add_argument("directory", help="Директория с музыкой")
|
||||||
|
p_run.add_argument("--dry-run", action="store_true", help="Показать что будет сделано без изменений")
|
||||||
|
|
||||||
|
p_search = sub.add_parser("search", help="Поиск обложки для теста")
|
||||||
|
p_search.add_argument("anime_name", help="Название аниме для поиска")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if args.command == "search":
|
||||||
|
search_cover(args.anime_name)
|
||||||
|
elif args.command == "run":
|
||||||
|
process_directory(args.directory, args.dry_run)
|
||||||
|
else:
|
||||||
|
parser.print_help()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
389
download_anime_covers_proxy.py
Normal file
389
download_anime_covers_proxy.py
Normal file
|
|
@ -0,0 +1,389 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Скачивает обложки аниме через Jikan API (jikan.moe) и встраивает их в аудиофайлы.
|
||||||
|
Использует SOCKS5 прокси для доступа к API и скачивания изображений.
|
||||||
|
|
||||||
|
Название аниме берётся из первых квадратных скобок в названии файла:
|
||||||
|
TV-2 OP01 - Nobody Knows (Suga Shikao) [xxxHOLiC].mp3
|
||||||
|
^^^^^^^^
|
||||||
|
извлекаем это
|
||||||
|
|
||||||
|
Jikan API - бесплатный API без аутентификации (парсит MyAnimeList)
|
||||||
|
https://jikan.moe/
|
||||||
|
|
||||||
|
Использование:
|
||||||
|
python3 download_anime_covers_proxy.py run --dry-run /path/to/music
|
||||||
|
python3 download_anime_covers_proxy.py run /path/to/music
|
||||||
|
python3 download_anime_covers_proxy.py search "xxxHOLiC" # Поиск обложки для теста
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import argparse
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
import requests
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional, Tuple, Dict
|
||||||
|
|
||||||
|
try:
|
||||||
|
from mutagen.flac import FLAC
|
||||||
|
from mutagen.mp3 import MP3
|
||||||
|
from mutagen.oggvorbis import OggVorbis
|
||||||
|
from mutagen.oggopus import OggOpus
|
||||||
|
from mutagen.mp4 import MP4
|
||||||
|
from mutagen.id3 import APIC, ID3
|
||||||
|
except ImportError:
|
||||||
|
print("Установите mutagen: pip install mutagen")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
from urllib3.contrib.socks import SOCKSProxyManager
|
||||||
|
SOCKS_AVAILABLE = True
|
||||||
|
except ImportError:
|
||||||
|
SOCKS_AVAILABLE = False
|
||||||
|
print("Warning: SOCKS proxy not available. Install: pip install requests[socks]")
|
||||||
|
|
||||||
|
AUDIO_EXTENSIONS = {".flac", ".mp3", ".ogg", ".opus", ".m4a"}
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# КОНФИГУРАЦИЯ
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
# Jikan API (бесплатный, без ключа)
|
||||||
|
JIKAN_BASE_URL = "https://api.jikan.moe/v4"
|
||||||
|
|
||||||
|
# Прокси для доступа к API (SOCKS5)
|
||||||
|
PROXY_URL = "socks5://192.168.31.216:1080"
|
||||||
|
|
||||||
|
# Кэш обложек (чтобы не скачивать одну и ту же много раз)
|
||||||
|
COVER_CACHE: Dict[str, bytes] = {}
|
||||||
|
|
||||||
|
# Задержки для API (Jikan требует ~2 секунды между запросами)
|
||||||
|
JIKAN_DELAY = 2.0
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# ФУНКЦИИ
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
def extract_anime_name(filepath: Path) -> Optional[str]:
|
||||||
|
"""
|
||||||
|
Извлекает название аниме из первых квадратных скобок в названии файла.
|
||||||
|
"""
|
||||||
|
stem = filepath.stem
|
||||||
|
match = re.search(r'\[([^\]]+)\]', stem)
|
||||||
|
if match:
|
||||||
|
return match.group(1).strip()
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class JikanAPI:
|
||||||
|
"""Класс для работы с Jikan API (jikan.moe) через прокси"""
|
||||||
|
|
||||||
|
def __init__(self, proxy_url: str = None):
|
||||||
|
self.base_url = JIKAN_BASE_URL
|
||||||
|
self.session = self._create_session(proxy_url)
|
||||||
|
self.last_request_time = 0
|
||||||
|
|
||||||
|
def _create_session(self, proxy_url: str = None) -> requests.Session:
|
||||||
|
"""Создаёт сессию с прокси (если указан)"""
|
||||||
|
session = requests.Session()
|
||||||
|
|
||||||
|
# Jikan требует User-Agent
|
||||||
|
session.headers.update({
|
||||||
|
'User-Agent': 'Anime-Cover-Downloader/1.0 (shu@vespahomelab.ru)'
|
||||||
|
})
|
||||||
|
|
||||||
|
# Настраиваем прокси через SOCKS5
|
||||||
|
if proxy_url and SOCKS_AVAILABLE:
|
||||||
|
try:
|
||||||
|
proxy_manager = SOCKSProxyManager(proxy_url)
|
||||||
|
session.mount('http://', proxy_manager)
|
||||||
|
session.mount('https://', proxy_manager)
|
||||||
|
print(f" ✓ Прокси настроен: {proxy_url}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ⚠ Warning: Не удалось настроить прокси: {e}. Будет использовано прямое соединение.")
|
||||||
|
|
||||||
|
return session
|
||||||
|
|
||||||
|
def _rate_limit(self):
|
||||||
|
"""Соблюдает лимиты API Jikan (~2 секунды между запросами)"""
|
||||||
|
elapsed = time.time() - self.last_request_time
|
||||||
|
if elapsed < JIKAN_DELAY:
|
||||||
|
time.sleep(JIKAN_DELAY - elapsed)
|
||||||
|
self.last_request_time = time.time()
|
||||||
|
|
||||||
|
def search_anime(self, query: str, limit: int = 5) -> list:
|
||||||
|
"""Ищет аниме по названию"""
|
||||||
|
self._rate_limit()
|
||||||
|
|
||||||
|
url = f"{self.base_url}/anime"
|
||||||
|
params = {
|
||||||
|
'q': query,
|
||||||
|
'limit': limit
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = self.session.get(url, params=params, timeout=30)
|
||||||
|
response.raise_for_status()
|
||||||
|
data = response.json()
|
||||||
|
|
||||||
|
results = []
|
||||||
|
for item in data.get('data', []):
|
||||||
|
title = item.get('title', '')
|
||||||
|
title_english = item.get('title_english', '')
|
||||||
|
images = item.get('images', {})
|
||||||
|
jpg = images.get('jpg', {})
|
||||||
|
image_url = jpg.get('large', jpg.get('image_url', ''))
|
||||||
|
|
||||||
|
if image_url:
|
||||||
|
results.append({
|
||||||
|
'title': title,
|
||||||
|
'title_english': title_english,
|
||||||
|
'image_url': image_url,
|
||||||
|
'source': 'Jikan'
|
||||||
|
})
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ✗ Ошибка поиска в Jikan: {e}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
def get_anime_cover(self, anime_name: str) -> Optional[str]:
|
||||||
|
"""Получает URL обложки для аниме"""
|
||||||
|
results = self.search_anime(anime_name)
|
||||||
|
|
||||||
|
if results:
|
||||||
|
return results[0]['image_url']
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def download_image(url: str, proxy_url: str = None) -> Optional[bytes]:
|
||||||
|
"""Скачивает изображение по URL через прокси"""
|
||||||
|
try:
|
||||||
|
session = requests.Session()
|
||||||
|
|
||||||
|
if proxy_url and SOCKS_AVAILABLE:
|
||||||
|
try:
|
||||||
|
proxy_manager = SOCKSProxyManager(proxy_url)
|
||||||
|
session.mount('http://', proxy_manager)
|
||||||
|
session.mount('https://', proxy_manager)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
response = session.get(url, timeout=30)
|
||||||
|
response.raise_for_status()
|
||||||
|
return response.content
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ✗ Ошибка скачивания изображения: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_cover_image(anime_name: str, jikan_api: JikanAPI, proxy_url: str = None) -> Optional[bytes]:
|
||||||
|
"""
|
||||||
|
Получает обложку для аниме через Jikan API.
|
||||||
|
Использует кэш для повторных запросов.
|
||||||
|
"""
|
||||||
|
# Проверяем кэш
|
||||||
|
cache_key = anime_name.lower()
|
||||||
|
if cache_key in COVER_CACHE:
|
||||||
|
print(f" ✓ Найдено в кэше: {anime_name}")
|
||||||
|
return COVER_CACHE[cache_key]
|
||||||
|
|
||||||
|
print(f" Поиск обложки для: {anime_name}")
|
||||||
|
|
||||||
|
# Ищем через Jikan
|
||||||
|
image_url = jikan_api.get_anime_cover(anime_name)
|
||||||
|
if image_url:
|
||||||
|
print(f" ✓ Найдено: {image_url}")
|
||||||
|
image_data = download_image(image_url, proxy_url)
|
||||||
|
if image_data:
|
||||||
|
COVER_CACHE[cache_key] = image_data
|
||||||
|
return image_data
|
||||||
|
|
||||||
|
print(f" ✗ Обложка не найдена")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def embed_cover(filepath: Path, image_data: bytes, dry_run: bool = False) -> bool:
|
||||||
|
"""Встраивает обложку в аудиофайл."""
|
||||||
|
if dry_run:
|
||||||
|
return True
|
||||||
|
|
||||||
|
ext = filepath.suffix.lower()
|
||||||
|
|
||||||
|
try:
|
||||||
|
if ext == ".mp3":
|
||||||
|
audio = MP3(str(filepath), ID3=ID3)
|
||||||
|
if audio.tags is None:
|
||||||
|
audio.add_tags()
|
||||||
|
|
||||||
|
# Удаляем старую обложку если есть
|
||||||
|
audio.tags.delall('APIC')
|
||||||
|
|
||||||
|
# Добавляем новую
|
||||||
|
audio.tags.add(APIC(
|
||||||
|
encoding=3,
|
||||||
|
mime='image/jpeg',
|
||||||
|
type=3, # Front cover
|
||||||
|
desc='Cover',
|
||||||
|
data=image_data
|
||||||
|
))
|
||||||
|
audio.save()
|
||||||
|
|
||||||
|
elif ext == ".flac":
|
||||||
|
audio = FLAC(str(filepath))
|
||||||
|
audio.clear_pictures()
|
||||||
|
audio.add_picture(image_data)
|
||||||
|
audio.save()
|
||||||
|
|
||||||
|
elif ext == ".ogg":
|
||||||
|
audio = OggVorbis(str(filepath))
|
||||||
|
from mutagen.flac import Picture
|
||||||
|
pic = Picture()
|
||||||
|
pic.type = 3
|
||||||
|
pic.mime = 'image/jpeg'
|
||||||
|
pic.data = image_data
|
||||||
|
audio.add_picture(pic)
|
||||||
|
audio.save()
|
||||||
|
|
||||||
|
elif ext == ".opus":
|
||||||
|
audio = OggOpus(str(filepath))
|
||||||
|
from mutagen.flac import Picture
|
||||||
|
pic = Picture()
|
||||||
|
pic.type = 3
|
||||||
|
pic.mime = 'image/jpeg'
|
||||||
|
pic.data = image_data
|
||||||
|
audio.add_picture(pic)
|
||||||
|
audio.save()
|
||||||
|
|
||||||
|
elif ext == ".m4a":
|
||||||
|
audio = MP4(str(filepath))
|
||||||
|
audio['covr'] = [image_data]
|
||||||
|
audio.save()
|
||||||
|
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ✗ Ошибка встраивания обложки: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def process_directory(root_path: str, dry_run: bool = False):
|
||||||
|
"""Обработать директорию."""
|
||||||
|
root = Path(root_path)
|
||||||
|
|
||||||
|
if not root.is_dir():
|
||||||
|
print(f"Директория не найдена: {root_path}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Инициализируем Jikan API с прокси
|
||||||
|
jikan_api = JikanAPI(PROXY_URL)
|
||||||
|
|
||||||
|
prefix_str = "[DRY] " if dry_run else ""
|
||||||
|
processed = 0
|
||||||
|
skipped = 0
|
||||||
|
errors = 0
|
||||||
|
no_cover = 0
|
||||||
|
|
||||||
|
audio_files = sorted([
|
||||||
|
f for f in root.rglob("*")
|
||||||
|
if f.is_file() and f.suffix.lower() in AUDIO_EXTENSIONS
|
||||||
|
])
|
||||||
|
|
||||||
|
if not audio_files:
|
||||||
|
print("Аудиофайлы не найдены")
|
||||||
|
return
|
||||||
|
|
||||||
|
print(f"{prefix_str}Найдено файлов: {len(audio_files)}\n")
|
||||||
|
|
||||||
|
# Группируем файлы по названию аниме для кэширования обложек
|
||||||
|
anime_covers: Dict[str, Optional[bytes]] = {}
|
||||||
|
|
||||||
|
for idx, filepath in enumerate(audio_files, 1):
|
||||||
|
anime_name = extract_anime_name(filepath)
|
||||||
|
|
||||||
|
if not anime_name:
|
||||||
|
skipped += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
print(f"[{idx}/{len(audio_files)}] {filepath.name}")
|
||||||
|
|
||||||
|
# Получаем обложку (с кэшированием)
|
||||||
|
if anime_name not in anime_covers:
|
||||||
|
cover_data = get_cover_image(anime_name, jikan_api, PROXY_URL)
|
||||||
|
anime_covers[anime_name] = cover_data
|
||||||
|
else:
|
||||||
|
cover_data = anime_covers[anime_name]
|
||||||
|
print(f" ✓ Используем кэш для: {anime_name}")
|
||||||
|
|
||||||
|
if cover_data is None:
|
||||||
|
no_cover += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
success = embed_cover(filepath, cover_data, dry_run)
|
||||||
|
|
||||||
|
if success:
|
||||||
|
processed += 1
|
||||||
|
if dry_run:
|
||||||
|
print(f" ✓ Будет встроена обложка")
|
||||||
|
else:
|
||||||
|
print(f" ✓ Обложка встроена")
|
||||||
|
else:
|
||||||
|
errors += 1
|
||||||
|
|
||||||
|
print(f"\n{'=' * 60}")
|
||||||
|
print(f" {'Будет обработано' if dry_run else 'Обработано'}: {processed}")
|
||||||
|
print(f" Пропущено (нет названия аниме): {skipped}")
|
||||||
|
print(f" Не найдено обложек: {no_cover}")
|
||||||
|
print(f" Ошибок: {errors}")
|
||||||
|
print(f"{'=' * 60}")
|
||||||
|
|
||||||
|
|
||||||
|
def search_cover(anime_name: str):
|
||||||
|
"""Тестовый поиск обложки для аниме"""
|
||||||
|
jikan_api = JikanAPI(PROXY_URL)
|
||||||
|
|
||||||
|
print(f"Поиск обложки для: {anime_name}\n")
|
||||||
|
|
||||||
|
print("Jikan (MyAnimeList):")
|
||||||
|
results = jikan_api.search_anime(anime_name, limit=5)
|
||||||
|
for i, result in enumerate(results, 1):
|
||||||
|
print(f" {i}. {result['title']}")
|
||||||
|
if result['title_english']:
|
||||||
|
print(f" English: {result['title_english']}")
|
||||||
|
print(f" {result['image_url']}")
|
||||||
|
|
||||||
|
if not results:
|
||||||
|
print(" Не найдено")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(description="Скачивание и встраивание обложек аниме (с прокси)")
|
||||||
|
sub = parser.add_subparsers(dest="command")
|
||||||
|
|
||||||
|
p_run = sub.add_parser("run", help="Обработать директорию")
|
||||||
|
p_run.add_argument("directory", help="Директория с музыкой")
|
||||||
|
p_run.add_argument("--dry-run", action="store_true", help="Показать что будет сделано без изменений")
|
||||||
|
|
||||||
|
p_search = sub.add_parser("search", help="Поиск обложки для теста")
|
||||||
|
p_search.add_argument("anime_name", help="Название аниме для поиска")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if args.command == "search":
|
||||||
|
search_cover(args.anime_name)
|
||||||
|
elif args.command == "run":
|
||||||
|
process_directory(args.directory, args.dry_run)
|
||||||
|
else:
|
||||||
|
parser.print_help()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
209
rename_anime.py
Normal file
209
rename_anime.py
Normal file
|
|
@ -0,0 +1,209 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Добавляет название аниме (из директории) в конец названия файла перед расширением
|
||||||
|
и удаляет "[AniTousen]" из начала названия.
|
||||||
|
|
||||||
|
Пример:
|
||||||
|
Директория: Kuroshitsuji
|
||||||
|
Файл: [AniTousen] TV-1 ED01 - I'm ALIVE! (BECCA).mp3
|
||||||
|
|
||||||
|
Результат:
|
||||||
|
TV-1 ED01 - I'm ALIVE! (BECCA) [Kuroshitsuji].mp3
|
||||||
|
|
||||||
|
Использование:
|
||||||
|
python3 rename_anime.py test
|
||||||
|
python3 rename_anime.py run --dry-run /path/to/music
|
||||||
|
python3 rename_anime.py run /path/to/music
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import argparse
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
AUDIO_EXTENSIONS = {".flac", ".mp3", ".ogg", ".opus", ".m4a"}
|
||||||
|
|
||||||
|
|
||||||
|
def remove_ani_tousen(stem: str) -> str:
|
||||||
|
"""
|
||||||
|
Удаляет "[AniTousen]" из начала названия файла.
|
||||||
|
|
||||||
|
Было: [AniTousen] TV-1 ED01 - I'm ALIVE! (BECCA)
|
||||||
|
Стало: TV-1 ED01 - I'm ALIVE! (BECCA)
|
||||||
|
"""
|
||||||
|
# Проверяем начинается ли с [AniTousen]
|
||||||
|
if stem.startswith("[AniTousen] "):
|
||||||
|
return stem[len("[AniTousen] "):]
|
||||||
|
elif stem.startswith("[AniTousen]"):
|
||||||
|
return stem[len("[AniTousen]"):].lstrip()
|
||||||
|
return stem
|
||||||
|
|
||||||
|
|
||||||
|
def rename_file(filepath: Path, anime_source: str, dry_run: bool = False) -> bool:
|
||||||
|
"""
|
||||||
|
Переименовать файл: добавить название аниме в конец и удалить [AniTousen] из начала.
|
||||||
|
|
||||||
|
Было: [AniTousen] TV-1 ED01 - I'm ALIVE! (BECCA).mp3
|
||||||
|
Стало: TV-1 ED01 - I'm ALIVE! (BECCA) [Kuroshitsuji].mp3
|
||||||
|
"""
|
||||||
|
if dry_run:
|
||||||
|
return True
|
||||||
|
|
||||||
|
old_name = filepath.name
|
||||||
|
stem = filepath.stem
|
||||||
|
suffix = filepath.suffix
|
||||||
|
|
||||||
|
# Удаляем [AniTousen] из начала
|
||||||
|
new_stem = remove_ani_tousen(stem)
|
||||||
|
|
||||||
|
# Добавляем название аниме в конец
|
||||||
|
new_stem = f"{new_stem} [{anime_source}]"
|
||||||
|
|
||||||
|
new_name = f"{new_stem}{suffix}"
|
||||||
|
new_path = filepath.parent / new_name
|
||||||
|
|
||||||
|
# Проверяем, не существует ли уже файл с таким именем
|
||||||
|
if new_path.exists():
|
||||||
|
print(f" ⚠ Файл уже существует: {new_name}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
filepath.rename(new_path)
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ✗ Ошибка переименования: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def process_directory(root_path: str, dry_run: bool = False):
|
||||||
|
"""Обработать директорию."""
|
||||||
|
root = Path(root_path)
|
||||||
|
|
||||||
|
if not root.is_dir():
|
||||||
|
print(f"Директория не найдена: {root_path}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
prefix = "[DRY] " if dry_run else ""
|
||||||
|
renamed = 0
|
||||||
|
errors = 0
|
||||||
|
skipped = 0
|
||||||
|
|
||||||
|
for anime_dir in sorted(root.iterdir()):
|
||||||
|
if not anime_dir.is_dir():
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Пропускаем скрытые директории
|
||||||
|
if anime_dir.name.startswith('.'):
|
||||||
|
continue
|
||||||
|
|
||||||
|
anime_source = anime_dir.name
|
||||||
|
|
||||||
|
# Находим все аудиофайлы рекурсивно
|
||||||
|
audio_files = sorted([
|
||||||
|
f for f in anime_dir.rglob("*")
|
||||||
|
if f.is_file() and f.suffix.lower() in AUDIO_EXTENSIONS
|
||||||
|
])
|
||||||
|
|
||||||
|
if not audio_files:
|
||||||
|
continue
|
||||||
|
|
||||||
|
print(f"\n{prefix}📁 {anime_source} ({len(audio_files)} файлов)")
|
||||||
|
|
||||||
|
for filepath in audio_files:
|
||||||
|
# Проверяем, не добавлено ли уже название аниме
|
||||||
|
stem = filepath.stem
|
||||||
|
if stem.endswith(f" [{anime_source}]"):
|
||||||
|
skipped += 1
|
||||||
|
print(f" ⊘ {filepath.name} (уже переименован)")
|
||||||
|
continue
|
||||||
|
|
||||||
|
success = rename_file(filepath, anime_source, dry_run)
|
||||||
|
|
||||||
|
if success:
|
||||||
|
renamed += 1
|
||||||
|
old_name = filepath.name
|
||||||
|
new_name = f"{filepath.stem} [{anime_source}]{filepath.suffix}"
|
||||||
|
print(f" ✓ {old_name}")
|
||||||
|
print(f" → {new_name}")
|
||||||
|
else:
|
||||||
|
errors += 1
|
||||||
|
|
||||||
|
print(f"\n{'=' * 60}")
|
||||||
|
print(f" {'Будет переименовано' if dry_run else 'Переименовано'}: {renamed}")
|
||||||
|
print(f" Пропущено (уже переименовано): {skipped}")
|
||||||
|
print(f" Ошибок: {errors}")
|
||||||
|
print(f"{'=' * 60}")
|
||||||
|
|
||||||
|
|
||||||
|
def test():
|
||||||
|
"""Тест логики переименования."""
|
||||||
|
examples = [
|
||||||
|
# (старое_имя, название_папки, ожидаемое_новое_имя)
|
||||||
|
(
|
||||||
|
"[AniTousen] TV-1 ED01 - I'm ALIVE! (BECCA).mp3",
|
||||||
|
"Kuroshitsuji",
|
||||||
|
"TV-1 ED01 - I'm ALIVE! (BECCA) [Kuroshitsuji].mp3"
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"[AniTousen] TV-2 ED EP12 - Get up! (Asano Masumi).flac",
|
||||||
|
"Ikkitousen",
|
||||||
|
"TV-2 ED EP12 - Get up! (Asano Masumi) [Ikkitousen].flac"
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"[AniTousen] Movie ED - Song Name.wav",
|
||||||
|
"Your Name",
|
||||||
|
"Movie ED - Song Name [Your Name].wav"
|
||||||
|
),
|
||||||
|
# Файл без [AniTousen] - просто добавляем аниме
|
||||||
|
(
|
||||||
|
"TV OP - Song.mp3",
|
||||||
|
"Test Anime",
|
||||||
|
"TV OP - Song [Test Anime].mp3"
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
print("Тест переименования:\n")
|
||||||
|
ok = 0
|
||||||
|
|
||||||
|
for old_name, anime, expected_new in examples:
|
||||||
|
stem = Path(old_name).stem
|
||||||
|
suffix = Path(old_name).suffix
|
||||||
|
new_stem = f"{stem} [{anime}]"
|
||||||
|
new_name = f"{new_stem}{suffix}"
|
||||||
|
|
||||||
|
match = new_name == expected_new
|
||||||
|
status = "✓" if match else "✗"
|
||||||
|
if match:
|
||||||
|
ok += 1
|
||||||
|
|
||||||
|
print(f" {status} {old_name}")
|
||||||
|
print(f" → {new_name}")
|
||||||
|
if not match:
|
||||||
|
print(f" ОЖИДАЛОСЬ: {expected_new}")
|
||||||
|
print()
|
||||||
|
|
||||||
|
print(f"Пройдено: {ok}/{len(examples)}")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(description="Переименование аниме саундтреков")
|
||||||
|
sub = parser.add_subparsers(dest="command")
|
||||||
|
|
||||||
|
p_run = sub.add_parser("run", help="Переименовать файлы")
|
||||||
|
p_run.add_argument("directory", help="Директория с музыкой")
|
||||||
|
p_run.add_argument("--dry-run", action="store_true", help="Показать что будет сделано без изменений")
|
||||||
|
|
||||||
|
sub.add_parser("test", help="Тест логики")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if args.command == "test":
|
||||||
|
test()
|
||||||
|
elif args.command == "run":
|
||||||
|
process_directory(args.directory, args.dry_run)
|
||||||
|
else:
|
||||||
|
parser.print_help()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
924
tag_anime.py
Normal file
924
tag_anime.py
Normal file
|
|
@ -0,0 +1,924 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Записывает теги и обложки в аудиофайлы аниме саундтреков.
|
||||||
|
|
||||||
|
Теги:
|
||||||
|
ANIME_SOURCE = название аниме (из директории)
|
||||||
|
ANIME_SOURCE_FULL = "название аниме, тип"
|
||||||
|
COMMENT = "название аниме, тип"
|
||||||
|
|
||||||
|
Обложка:
|
||||||
|
Определяет сезон/тип из имени файла (TV-2, Movie 3, OVA и т.д.)
|
||||||
|
Ищет соответствующий сезон через Jikan API (MyAnimeList)
|
||||||
|
Если не находит конкретный сезон — берёт обложку основного аниме
|
||||||
|
Обрезает до квадрата (центрированный кроп)
|
||||||
|
Кешируется в директории аниме (_raw.jpg и .jpg)
|
||||||
|
|
||||||
|
Использование:
|
||||||
|
python3 tag_anime.py test
|
||||||
|
python3 tag_anime.py run --dry-run /path/to/music
|
||||||
|
python3 tag_anime.py run /path/to/music
|
||||||
|
python3 tag_anime.py run --no-covers /path/to/music
|
||||||
|
python3 tag_anime.py run --proxy "socks5h://127.0.0.1:1080" /path/to/music
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import argparse
|
||||||
|
from pathlib import Path
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
try:
|
||||||
|
import requests
|
||||||
|
except ImportError:
|
||||||
|
print("pip install requests")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
from PIL import Image
|
||||||
|
except ImportError:
|
||||||
|
print("pip install Pillow")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
import mutagen
|
||||||
|
from mutagen.flac import FLAC, Picture
|
||||||
|
from mutagen.mp3 import MP3
|
||||||
|
from mutagen.oggvorbis import OggVorbis
|
||||||
|
from mutagen.oggopus import OggOpus
|
||||||
|
from mutagen.mp4 import MP4, MP4Cover
|
||||||
|
from mutagen.id3 import TXXX, APIC, COMM
|
||||||
|
except ImportError:
|
||||||
|
print("pip install mutagen")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
AUDIO_EXTENSIONS = {".flac", ".mp3", ".ogg", ".opus", ".m4a"}
|
||||||
|
|
||||||
|
JIKAN_SEARCH_URL = "https://api.jikan.moe/v4/anime"
|
||||||
|
JIKAN_RATE_LIMIT = 1.5
|
||||||
|
|
||||||
|
_last_jikan_request = 0.0
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
# Обрезка изображения до квадрата
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
def crop_to_square(image_data: bytes) -> bytes:
|
||||||
|
"""
|
||||||
|
Обрезает изображение до квадрата (центрированный кроп).
|
||||||
|
Возвращает JPEG bytes.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
img = Image.open(BytesIO(image_data))
|
||||||
|
w, h = img.size
|
||||||
|
|
||||||
|
if w == h:
|
||||||
|
# Уже квадратное — просто конвертируем в JPEG
|
||||||
|
buf = BytesIO()
|
||||||
|
img = img.convert("RGB")
|
||||||
|
img.save(buf, format="JPEG", quality=95)
|
||||||
|
return buf.getvalue()
|
||||||
|
|
||||||
|
# Центрированный кроп
|
||||||
|
side = min(w, h)
|
||||||
|
left = (w - side) // 2
|
||||||
|
top = (h - side) // 2
|
||||||
|
right = left + side
|
||||||
|
bottom = top + side
|
||||||
|
|
||||||
|
img = img.crop((left, top, right, bottom))
|
||||||
|
img = img.convert("RGB")
|
||||||
|
|
||||||
|
buf = BytesIO()
|
||||||
|
img.save(buf, format="JPEG", quality=95)
|
||||||
|
|
||||||
|
print(f" ✂ Обрезано: {w}x{h} → {side}x{side}")
|
||||||
|
return buf.getvalue()
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ⚠ Ошибка обрезки: {e}")
|
||||||
|
return image_data
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
# Извлечение типа из имени файла
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
def extract_type(filename: str) -> str:
|
||||||
|
"""
|
||||||
|
Берёт всё между '] ' и ' - '
|
||||||
|
[AniTousen] TV-1 ED01 - I'm ALIVE! (BECCA).mp3 → "TV-1 ED01"
|
||||||
|
"""
|
||||||
|
stem = Path(filename).stem
|
||||||
|
|
||||||
|
bracket_end = stem.find('] ')
|
||||||
|
if bracket_end < 0:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
after_bracket = stem[bracket_end + 2:]
|
||||||
|
|
||||||
|
dash_pos = after_bracket.find(' - ')
|
||||||
|
if dash_pos < 0:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
return after_bracket[:dash_pos].strip()
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
# Определение сезона/типа из строки типа
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
def parse_media_info(track_type: str) -> dict:
|
||||||
|
"""
|
||||||
|
Парсит строку типа и возвращает информацию о медиа.
|
||||||
|
|
||||||
|
Примеры:
|
||||||
|
"TV-1 ED01" → {"media": "TV", "season": 1}
|
||||||
|
"TV-2 OP01" → {"media": "TV", "season": 2}
|
||||||
|
"TV ED01" → {"media": "TV", "season": None}
|
||||||
|
"Movie 1 ED01" → {"media": "Movie", "season": 1}
|
||||||
|
"OVA ED01" → {"media": "OVA", "season": None}
|
||||||
|
"ONA ED01" → {"media": "ONA", "season": None}
|
||||||
|
"Special ED01" → {"media": "Special", "season": None}
|
||||||
|
"Game 2 OP01" → {"media": "Game", "season": 2}
|
||||||
|
"""
|
||||||
|
if not track_type:
|
||||||
|
return {"media": None, "season": None}
|
||||||
|
|
||||||
|
m = re.match(
|
||||||
|
r'^(TV|Movie|OVA|ONA|Special|Game)(?:[-\s](\d+))?',
|
||||||
|
track_type,
|
||||||
|
re.IGNORECASE
|
||||||
|
)
|
||||||
|
|
||||||
|
if not m:
|
||||||
|
return {"media": None, "season": None}
|
||||||
|
|
||||||
|
media = m.group(1)
|
||||||
|
if media.lower() == "tv":
|
||||||
|
media = "TV"
|
||||||
|
elif media.lower() == "ova":
|
||||||
|
media = "OVA"
|
||||||
|
elif media.lower() == "ona":
|
||||||
|
media = "ONA"
|
||||||
|
else:
|
||||||
|
media = media.capitalize()
|
||||||
|
|
||||||
|
season = int(m.group(2)) if m.group(2) else None
|
||||||
|
|
||||||
|
return {"media": media, "season": season}
|
||||||
|
|
||||||
|
|
||||||
|
def build_search_queries(anime_name: str, media_info: dict) -> list:
|
||||||
|
"""
|
||||||
|
Формирует список поисковых запросов от конкретного к общему.
|
||||||
|
"""
|
||||||
|
queries = []
|
||||||
|
media = media_info.get("media")
|
||||||
|
season = media_info.get("season")
|
||||||
|
|
||||||
|
if media and season and season > 1:
|
||||||
|
ordinals = {2: "2nd", 3: "3rd", 4: "4th", 5: "5th", 6: "6th"}
|
||||||
|
roman = {2: "II", 3: "III", 4: "IV", 5: "V", 6: "VI"}
|
||||||
|
|
||||||
|
if media == "TV":
|
||||||
|
ordinal = ordinals.get(season, f"{season}th")
|
||||||
|
queries.append(f"{anime_name} {ordinal} Season")
|
||||||
|
queries.append(f"{anime_name} Season {season}")
|
||||||
|
queries.append(f"{anime_name} {season}")
|
||||||
|
if season in roman:
|
||||||
|
queries.append(f"{anime_name} {roman[season]}")
|
||||||
|
|
||||||
|
elif media == "Movie":
|
||||||
|
queries.append(f"{anime_name} Movie {season}")
|
||||||
|
queries.append(f"{anime_name} Movie")
|
||||||
|
|
||||||
|
elif media == "Game":
|
||||||
|
queries.append(f"{anime_name} Game {season}")
|
||||||
|
queries.append(f"{anime_name} Game")
|
||||||
|
|
||||||
|
else:
|
||||||
|
queries.append(f"{anime_name} {media} {season}")
|
||||||
|
queries.append(f"{anime_name} {media}")
|
||||||
|
|
||||||
|
elif media and media != "TV":
|
||||||
|
queries.append(f"{anime_name} {media}")
|
||||||
|
|
||||||
|
queries.append(anime_name)
|
||||||
|
|
||||||
|
seen = set()
|
||||||
|
unique = []
|
||||||
|
for q in queries:
|
||||||
|
if q not in seen:
|
||||||
|
seen.add(q)
|
||||||
|
unique.append(q)
|
||||||
|
|
||||||
|
return unique
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
# Поиск и загрузка обложки
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
def jikan_rate_limit():
|
||||||
|
"""Соблюдать лимит запросов к Jikan API."""
|
||||||
|
global _last_jikan_request
|
||||||
|
now = time.time()
|
||||||
|
elapsed = now - _last_jikan_request
|
||||||
|
if elapsed < JIKAN_RATE_LIMIT:
|
||||||
|
time.sleep(JIKAN_RATE_LIMIT - elapsed)
|
||||||
|
_last_jikan_request = time.time()
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_cover_url(query: str) -> str:
|
||||||
|
"""Найти URL обложки аниме через Jikan API."""
|
||||||
|
jikan_rate_limit()
|
||||||
|
|
||||||
|
try:
|
||||||
|
resp = requests.get(
|
||||||
|
JIKAN_SEARCH_URL,
|
||||||
|
params={"q": query, "limit": 5},
|
||||||
|
timeout=15,
|
||||||
|
)
|
||||||
|
resp.raise_for_status()
|
||||||
|
data = resp.json()
|
||||||
|
|
||||||
|
results = data.get("data", [])
|
||||||
|
if not results:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
best = results[0]
|
||||||
|
query_lower = query.lower()
|
||||||
|
|
||||||
|
for entry in results:
|
||||||
|
title = (entry.get("title", "") or "").lower()
|
||||||
|
title_en = (entry.get("title_english", "") or "").lower()
|
||||||
|
|
||||||
|
if query_lower == title or query_lower == title_en:
|
||||||
|
best = entry
|
||||||
|
break
|
||||||
|
|
||||||
|
images = best.get("images", {})
|
||||||
|
jpg = images.get("jpg", {})
|
||||||
|
|
||||||
|
for key in ("large_image_url", "image_url"):
|
||||||
|
url = jpg.get(key, "")
|
||||||
|
if url:
|
||||||
|
return url
|
||||||
|
|
||||||
|
return ""
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ⚠ Jikan API ошибка: {e}")
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def search_cover_with_fallback(anime_name: str, media_info: dict) -> tuple:
|
||||||
|
"""
|
||||||
|
Ищет обложку, пробуя запросы от конкретного к общему.
|
||||||
|
Возвращает (url, matched_query) или ("", "").
|
||||||
|
"""
|
||||||
|
queries = build_search_queries(anime_name, media_info)
|
||||||
|
|
||||||
|
for query in queries:
|
||||||
|
print(f" 🔍 Поиск: \"{query}\"")
|
||||||
|
url = fetch_cover_url(query)
|
||||||
|
if url:
|
||||||
|
print(f" ✓ Найдено по запросу: \"{query}\"")
|
||||||
|
return url, query
|
||||||
|
|
||||||
|
return "", ""
|
||||||
|
|
||||||
|
|
||||||
|
def download_image(url: str) -> bytes:
|
||||||
|
"""Скачать изображение по URL."""
|
||||||
|
try:
|
||||||
|
resp = requests.get(url, timeout=15)
|
||||||
|
resp.raise_for_status()
|
||||||
|
return resp.content
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ⚠ Ошибка загрузки обложки: {e}")
|
||||||
|
return b""
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
# Кеш обложек
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
def get_cache_base(media_info: dict) -> str:
|
||||||
|
"""
|
||||||
|
Базовое имя кеша без расширения.
|
||||||
|
|
||||||
|
cover — базовая обложка
|
||||||
|
cover_TV-2 — TV сезон 2
|
||||||
|
cover_Movie — Movie без номера
|
||||||
|
cover_Movie-1 — Movie 1
|
||||||
|
cover_OVA — OVA
|
||||||
|
"""
|
||||||
|
media = media_info.get("media")
|
||||||
|
season = media_info.get("season")
|
||||||
|
|
||||||
|
if not media or (media == "TV" and not season):
|
||||||
|
return "cover"
|
||||||
|
|
||||||
|
if season:
|
||||||
|
return f"cover_{media}-{season}"
|
||||||
|
else:
|
||||||
|
return f"cover_{media}"
|
||||||
|
|
||||||
|
|
||||||
|
def is_square(image_data: bytes) -> bool:
|
||||||
|
"""Проверить, квадратное ли изображение."""
|
||||||
|
try:
|
||||||
|
img = Image.open(BytesIO(image_data))
|
||||||
|
w, h = img.size
|
||||||
|
return w == h
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_cropped(image_data: bytes, cache_path: Path, dry_run: bool) -> bytes:
|
||||||
|
"""
|
||||||
|
Если изображение не квадратное — обрезать и перезаписать кеш.
|
||||||
|
Перед перезаписью сохранить оригинал в _raw.
|
||||||
|
"""
|
||||||
|
if not image_data:
|
||||||
|
return b""
|
||||||
|
|
||||||
|
if is_square(image_data):
|
||||||
|
return image_data
|
||||||
|
|
||||||
|
# Сохранить оригинал как _raw перед перезаписью
|
||||||
|
raw_path = cache_path.with_name(
|
||||||
|
cache_path.stem + "_raw" + cache_path.suffix
|
||||||
|
)
|
||||||
|
|
||||||
|
if not dry_run and not raw_path.exists():
|
||||||
|
try:
|
||||||
|
raw_path.write_bytes(image_data)
|
||||||
|
print(f" 💾 Оригинал сохранён: {raw_path.name}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ⚠ Не удалось сохранить оригинал: {e}")
|
||||||
|
|
||||||
|
cropped = crop_to_square(image_data)
|
||||||
|
|
||||||
|
if not dry_run:
|
||||||
|
try:
|
||||||
|
cache_path.write_bytes(cropped)
|
||||||
|
print(f" 💾 Перезаписан обрезанным: {cache_path.name}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ⚠ Не удалось перезаписать: {e}")
|
||||||
|
|
||||||
|
return cropped
|
||||||
|
|
||||||
|
|
||||||
|
def get_cover(anime_name: str, anime_dir: Path, media_info: dict,
|
||||||
|
dry_run: bool = False) -> bytes:
|
||||||
|
"""
|
||||||
|
Получить обложку (обрезанную до квадрата).
|
||||||
|
|
||||||
|
Логика кеша:
|
||||||
|
1. Есть cover_XX.jpg → проверить квадрат → если нет — обрезать, перезаписать
|
||||||
|
2. Есть cover_XX_raw.jpg → обрезать, сохранить .jpg
|
||||||
|
3. Нет ничего → скачать, сохранить _raw.jpg + обрезанный .jpg
|
||||||
|
4. Fallback на cover.jpg / cover_raw.jpg
|
||||||
|
"""
|
||||||
|
cache_base = get_cache_base(media_info)
|
||||||
|
cache_cropped = anime_dir / f"{cache_base}.jpg"
|
||||||
|
cache_raw = anime_dir / f"{cache_base}_raw.jpg"
|
||||||
|
|
||||||
|
fallback_base = "cover"
|
||||||
|
fallback_cropped = anime_dir / f"{fallback_base}.jpg"
|
||||||
|
fallback_raw = anime_dir / f"{fallback_base}_raw.jpg"
|
||||||
|
|
||||||
|
# 1. Кеш .jpg существует → проверить и при необходимости обрезать
|
||||||
|
if cache_cropped.exists() and cache_cropped.stat().st_size > 0:
|
||||||
|
print(f" 🖼 Обложка из кеша: {cache_cropped.name}")
|
||||||
|
data = cache_cropped.read_bytes()
|
||||||
|
return ensure_cropped(data, cache_cropped, dry_run)
|
||||||
|
|
||||||
|
# 2. Есть _raw → обрезать
|
||||||
|
if cache_raw.exists() and cache_raw.stat().st_size > 0:
|
||||||
|
print(f" 🖼 Найден raw кеш: {cache_raw.name}, обрезаю...")
|
||||||
|
raw_data = cache_raw.read_bytes()
|
||||||
|
cropped = crop_to_square(raw_data)
|
||||||
|
if not dry_run:
|
||||||
|
try:
|
||||||
|
cache_cropped.write_bytes(cropped)
|
||||||
|
print(f" 💾 Сохранён: {cache_cropped.name}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ⚠ Не удалось сохранить: {e}")
|
||||||
|
return cropped
|
||||||
|
|
||||||
|
# 3. Скачать через API (только если не базовый запрос)
|
||||||
|
is_base = (cache_base == fallback_base)
|
||||||
|
|
||||||
|
if not is_base:
|
||||||
|
print(f" 🔍 Поиск обложки: {anime_name} "
|
||||||
|
f"[{media_info.get('media', '')} "
|
||||||
|
f"{media_info.get('season', '') or ''}]")
|
||||||
|
url, matched = search_cover_with_fallback(anime_name, media_info)
|
||||||
|
|
||||||
|
if url:
|
||||||
|
print(f" 🌐 Скачиваю: {url}")
|
||||||
|
raw_data = download_image(url)
|
||||||
|
|
||||||
|
if raw_data:
|
||||||
|
cropped = crop_to_square(raw_data)
|
||||||
|
if not dry_run:
|
||||||
|
try:
|
||||||
|
cache_raw.write_bytes(raw_data)
|
||||||
|
cache_cropped.write_bytes(cropped)
|
||||||
|
print(f" 💾 Кеш: {cache_raw.name} + "
|
||||||
|
f"{cache_cropped.name}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ⚠ Не удалось сохранить кеш: {e}")
|
||||||
|
return cropped
|
||||||
|
|
||||||
|
# 4. Fallback: .jpg → проверить и обрезать
|
||||||
|
if fallback_cropped.exists() and fallback_cropped.stat().st_size > 0:
|
||||||
|
print(f" 🖼 Fallback: {fallback_cropped.name}")
|
||||||
|
data = fallback_cropped.read_bytes()
|
||||||
|
return ensure_cropped(data, fallback_cropped, dry_run)
|
||||||
|
|
||||||
|
# 5. Fallback: _raw → обрезать
|
||||||
|
if fallback_raw.exists() and fallback_raw.stat().st_size > 0:
|
||||||
|
print(f" 🖼 Fallback raw: {fallback_raw.name}, обрезаю...")
|
||||||
|
raw_data = fallback_raw.read_bytes()
|
||||||
|
cropped = crop_to_square(raw_data)
|
||||||
|
if not dry_run:
|
||||||
|
try:
|
||||||
|
fallback_cropped.write_bytes(cropped)
|
||||||
|
print(f" 💾 Сохранён: {fallback_cropped.name}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ⚠ Не удалось сохранить: {e}")
|
||||||
|
return cropped
|
||||||
|
|
||||||
|
# 6. Скачать базовую
|
||||||
|
print(f" 🔍 Поиск базовой обложки: {anime_name}")
|
||||||
|
base_info = {"media": None, "season": None}
|
||||||
|
url, matched = search_cover_with_fallback(anime_name, base_info)
|
||||||
|
|
||||||
|
if not url:
|
||||||
|
print(f" ⚠ Обложка не найдена")
|
||||||
|
return b""
|
||||||
|
|
||||||
|
print(f" 🌐 Скачиваю: {url}")
|
||||||
|
raw_data = download_image(url)
|
||||||
|
|
||||||
|
if not raw_data:
|
||||||
|
return b""
|
||||||
|
|
||||||
|
cropped = crop_to_square(raw_data)
|
||||||
|
|
||||||
|
if not dry_run:
|
||||||
|
try:
|
||||||
|
fallback_raw.write_bytes(raw_data)
|
||||||
|
fallback_cropped.write_bytes(cropped)
|
||||||
|
print(f" 💾 Кеш: {fallback_raw.name} + "
|
||||||
|
f"{fallback_cropped.name}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ⚠ Не удалось сохранить кеш: {e}")
|
||||||
|
|
||||||
|
return cropped
|
||||||
|
|
||||||
|
|
||||||
|
def detect_mime(image_data: bytes) -> str:
|
||||||
|
"""Определить MIME-тип по заголовку файла."""
|
||||||
|
if image_data[:3] == b'\xff\xd8\xff':
|
||||||
|
return "image/jpeg"
|
||||||
|
elif image_data[:8] == b'\x89PNG\r\n\x1a\n':
|
||||||
|
return "image/png"
|
||||||
|
elif image_data[:4] == b'RIFF' and image_data[8:12] == b'WEBP':
|
||||||
|
return "image/webp"
|
||||||
|
return "image/jpeg"
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
# Запись тегов и обложки
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
def write_tags(filepath: Path, anime_source: str, anime_source_full: str,
|
||||||
|
cover_data: bytes = b"", dry_run: bool = False) -> bool:
|
||||||
|
"""Записать теги и обложку в аудиофайл."""
|
||||||
|
if dry_run:
|
||||||
|
return True
|
||||||
|
|
||||||
|
ext = filepath.suffix.lower()
|
||||||
|
mime = detect_mime(cover_data) if cover_data else ""
|
||||||
|
|
||||||
|
try:
|
||||||
|
if ext == ".mp3":
|
||||||
|
audio = MP3(str(filepath))
|
||||||
|
if audio.tags is None:
|
||||||
|
audio.add_tags()
|
||||||
|
audio.tags.add(TXXX(encoding=3, desc="ANIME_SOURCE", text=[anime_source]))
|
||||||
|
audio.tags.add(TXXX(encoding=3, desc="ANIME_SOURCE_FULL", text=[anime_source_full]))
|
||||||
|
audio.tags.add(COMM(encoding=3, lang='eng', desc='', text=[anime_source_full]))
|
||||||
|
if cover_data:
|
||||||
|
audio.tags.delall("APIC")
|
||||||
|
audio.tags.add(APIC(
|
||||||
|
encoding=3,
|
||||||
|
mime=mime,
|
||||||
|
type=3,
|
||||||
|
desc='Cover',
|
||||||
|
data=cover_data,
|
||||||
|
))
|
||||||
|
audio.save()
|
||||||
|
|
||||||
|
elif ext == ".flac":
|
||||||
|
audio = FLAC(str(filepath))
|
||||||
|
audio["ANIME_SOURCE"] = anime_source
|
||||||
|
audio["ANIME_SOURCE_FULL"] = anime_source_full
|
||||||
|
audio["COMMENT"] = anime_source_full
|
||||||
|
if cover_data:
|
||||||
|
pic = Picture()
|
||||||
|
pic.type = 3
|
||||||
|
pic.mime = mime
|
||||||
|
pic.desc = 'Cover'
|
||||||
|
pic.data = cover_data
|
||||||
|
audio.clear_pictures()
|
||||||
|
audio.add_picture(pic)
|
||||||
|
audio.save()
|
||||||
|
|
||||||
|
elif ext == ".ogg":
|
||||||
|
audio = OggVorbis(str(filepath))
|
||||||
|
audio["ANIME_SOURCE"] = anime_source
|
||||||
|
audio["ANIME_SOURCE_FULL"] = anime_source_full
|
||||||
|
audio["COMMENT"] = anime_source_full
|
||||||
|
if cover_data:
|
||||||
|
import base64
|
||||||
|
pic = Picture()
|
||||||
|
pic.type = 3
|
||||||
|
pic.mime = mime
|
||||||
|
pic.desc = 'Cover'
|
||||||
|
pic.data = cover_data
|
||||||
|
audio["metadata_block_picture"] = [
|
||||||
|
base64.b64encode(pic.write()).decode("ascii")
|
||||||
|
]
|
||||||
|
audio.save()
|
||||||
|
|
||||||
|
elif ext == ".opus":
|
||||||
|
audio = OggOpus(str(filepath))
|
||||||
|
audio["ANIME_SOURCE"] = anime_source
|
||||||
|
audio["ANIME_SOURCE_FULL"] = anime_source_full
|
||||||
|
audio["COMMENT"] = anime_source_full
|
||||||
|
if cover_data:
|
||||||
|
import base64
|
||||||
|
pic = Picture()
|
||||||
|
pic.type = 3
|
||||||
|
pic.mime = mime
|
||||||
|
pic.desc = 'Cover'
|
||||||
|
pic.data = cover_data
|
||||||
|
audio["metadata_block_picture"] = [
|
||||||
|
base64.b64encode(pic.write()).decode("ascii")
|
||||||
|
]
|
||||||
|
audio.save()
|
||||||
|
|
||||||
|
elif ext == ".m4a":
|
||||||
|
audio = MP4(str(filepath))
|
||||||
|
audio["----:com.apple.iTunes:ANIME_SOURCE"] = [anime_source.encode("utf-8")]
|
||||||
|
audio["----:com.apple.iTunes:ANIME_SOURCE_FULL"] = [anime_source_full.encode("utf-8")]
|
||||||
|
audio["\xa9cmt"] = [anime_source_full]
|
||||||
|
if cover_data:
|
||||||
|
fmt = MP4Cover.FORMAT_JPEG
|
||||||
|
if mime == "image/png":
|
||||||
|
fmt = MP4Cover.FORMAT_PNG
|
||||||
|
audio["covr"] = [MP4Cover(cover_data, imageformat=fmt)]
|
||||||
|
audio.save()
|
||||||
|
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ✗ Ошибка: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
# Сбор уникальных обложек по директории
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
def collect_unique_covers(audio_files: list, anime_name: str, anime_dir: Path,
|
||||||
|
dry_run: bool) -> dict:
|
||||||
|
"""
|
||||||
|
Собирает все уникальные media_info из файлов директории,
|
||||||
|
скачивает обложки и возвращает словарь {cache_base: cover_bytes}.
|
||||||
|
"""
|
||||||
|
covers = {}
|
||||||
|
|
||||||
|
for filepath in audio_files:
|
||||||
|
track_type = extract_type(filepath.name)
|
||||||
|
media_info = parse_media_info(track_type)
|
||||||
|
cache_base = get_cache_base(media_info)
|
||||||
|
|
||||||
|
if cache_base not in covers:
|
||||||
|
cover_data = get_cover(anime_name, anime_dir, media_info, dry_run)
|
||||||
|
covers[cache_base] = cover_data
|
||||||
|
|
||||||
|
return covers
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
# Обработка директории
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
def process_directory(root_path: str, dry_run: bool = False,
|
||||||
|
no_covers: bool = False, proxy: str = ""):
|
||||||
|
"""Обработать директорию."""
|
||||||
|
if proxy:
|
||||||
|
os.environ["http_proxy"] = proxy
|
||||||
|
os.environ["https_proxy"] = proxy
|
||||||
|
|
||||||
|
root = Path(root_path)
|
||||||
|
|
||||||
|
if not root.is_dir():
|
||||||
|
print(f"Директория не найдена: {root_path}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
prefix = "[DRY] " if dry_run else ""
|
||||||
|
tagged = 0
|
||||||
|
errors = 0
|
||||||
|
covers_found = 0
|
||||||
|
covers_missing = 0
|
||||||
|
|
||||||
|
for anime_dir in sorted(root.iterdir()):
|
||||||
|
if not anime_dir.is_dir():
|
||||||
|
continue
|
||||||
|
|
||||||
|
anime_source = anime_dir.name
|
||||||
|
|
||||||
|
audio_files = sorted([
|
||||||
|
f for f in anime_dir.rglob("*")
|
||||||
|
if f.is_file() and f.suffix.lower() in AUDIO_EXTENSIONS
|
||||||
|
])
|
||||||
|
|
||||||
|
if not audio_files:
|
||||||
|
continue
|
||||||
|
|
||||||
|
print(f"\n{prefix}📁 {anime_source} ({len(audio_files)} файлов)")
|
||||||
|
|
||||||
|
# Собрать и скачать все уникальные обложки
|
||||||
|
covers_cache = {}
|
||||||
|
if not no_covers:
|
||||||
|
covers_cache = collect_unique_covers(
|
||||||
|
audio_files, anime_source, anime_dir, dry_run
|
||||||
|
)
|
||||||
|
|
||||||
|
for filepath in audio_files:
|
||||||
|
track_type = extract_type(filepath.name)
|
||||||
|
media_info = parse_media_info(track_type)
|
||||||
|
|
||||||
|
if track_type:
|
||||||
|
anime_source_full = f"{anime_source}, {track_type}"
|
||||||
|
else:
|
||||||
|
anime_source_full = anime_source
|
||||||
|
|
||||||
|
cover_data = b""
|
||||||
|
if not no_covers:
|
||||||
|
cache_base = get_cache_base(media_info)
|
||||||
|
cover_data = covers_cache.get(cache_base, b"")
|
||||||
|
|
||||||
|
success = write_tags(filepath, anime_source, anime_source_full,
|
||||||
|
cover_data, dry_run)
|
||||||
|
|
||||||
|
if success:
|
||||||
|
tagged += 1
|
||||||
|
cover_status = " 🖼" if cover_data else " ⚠ без обложки"
|
||||||
|
media_str = ""
|
||||||
|
if media_info.get("media"):
|
||||||
|
media_str = f" [{media_info['media']}"
|
||||||
|
if media_info.get("season"):
|
||||||
|
media_str += f"-{media_info['season']}"
|
||||||
|
media_str += "]"
|
||||||
|
print(f" {prefix}✓ {filepath.name}{media_str}{cover_status}")
|
||||||
|
print(f" ANIME_SOURCE = {anime_source}")
|
||||||
|
print(f" ANIME_SOURCE_FULL = {anime_source_full}")
|
||||||
|
|
||||||
|
if cover_data:
|
||||||
|
covers_found += 1
|
||||||
|
else:
|
||||||
|
covers_missing += 1
|
||||||
|
else:
|
||||||
|
errors += 1
|
||||||
|
|
||||||
|
print(f"\n{'=' * 60}")
|
||||||
|
print(f" {'Будет обработано' if dry_run else 'Обработано'}: {tagged}")
|
||||||
|
print(f" Ошибок: {errors}")
|
||||||
|
if not no_covers:
|
||||||
|
print(f" Треков с обложкой: {covers_found}")
|
||||||
|
print(f" Треков без обложки: {covers_missing}")
|
||||||
|
print(f"{'=' * 60}")
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
# Тест
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test():
|
||||||
|
"""Тест парсера и API."""
|
||||||
|
|
||||||
|
# Тест extract_type
|
||||||
|
type_examples = [
|
||||||
|
("[AniTousen] TV-1 ED01 - I'm ALIVE! (BECCA).mp3", "TV-1 ED01"),
|
||||||
|
("[AniTousen] TV-2 ED EP12 - Get up! (Asano Masumi).mp3", "TV-2 ED EP12"),
|
||||||
|
("[AniTousen] TV ED01 - Other Side (MIYAVI).mp3", "TV ED01"),
|
||||||
|
("[AniTousen] Movie 1 ED01 - Onaji Sora (Iguchi Yuka).mp3", "Movie 1 ED01"),
|
||||||
|
("[AniTousen] OVA ED01 - Wonderful Carnival (Endou).mp3", "OVA ED01"),
|
||||||
|
("[AniTousen] ONA ED01 - Collage (Sangatsu).mp3", "ONA ED01"),
|
||||||
|
("[AniTousen] Special ED01 - Hirogare Power (Uchida).mp3", "Special ED01"),
|
||||||
|
("[AniTousen] Game 2 OP01 - Gyakko (Sakamoto).mp3", "Game 2 OP01"),
|
||||||
|
]
|
||||||
|
|
||||||
|
print("═══ Тест extract_type ═══\n")
|
||||||
|
ok = 0
|
||||||
|
for filename, expected in type_examples:
|
||||||
|
got = extract_type(filename)
|
||||||
|
match = got == expected
|
||||||
|
status = "✓" if match else "✗"
|
||||||
|
if match:
|
||||||
|
ok += 1
|
||||||
|
print(f" {status} {filename}")
|
||||||
|
print(f" → \"{got}\"" + ("" if match else f" (ожидалось \"{expected}\")"))
|
||||||
|
print(f"\n Пройдено: {ok}/{len(type_examples)}\n")
|
||||||
|
|
||||||
|
# Тест parse_media_info
|
||||||
|
media_examples = [
|
||||||
|
("TV-1 ED01", {"media": "TV", "season": 1}),
|
||||||
|
("TV-2 OP01", {"media": "TV", "season": 2}),
|
||||||
|
("TV ED01", {"media": "TV", "season": None}),
|
||||||
|
("Movie 1 ED01", {"media": "Movie", "season": 1}),
|
||||||
|
("Movie ED01", {"media": "Movie", "season": None}),
|
||||||
|
("OVA ED01", {"media": "OVA", "season": None}),
|
||||||
|
("ONA ED01", {"media": "ONA", "season": None}),
|
||||||
|
("Special ED01", {"media": "Special", "season": None}),
|
||||||
|
("Game 2 OP01", {"media": "Game", "season": 2}),
|
||||||
|
("", {"media": None, "season": None}),
|
||||||
|
]
|
||||||
|
|
||||||
|
print("═══ Тест parse_media_info ═══\n")
|
||||||
|
ok = 0
|
||||||
|
for track_type, expected in media_examples:
|
||||||
|
got = parse_media_info(track_type)
|
||||||
|
match = got == expected
|
||||||
|
status = "✓" if match else "✗"
|
||||||
|
if match:
|
||||||
|
ok += 1
|
||||||
|
print(f" {status} \"{track_type}\" → {got}")
|
||||||
|
if not match:
|
||||||
|
print(f" ожидалось: {expected}")
|
||||||
|
print(f"\n Пройдено: {ok}/{len(media_examples)}\n")
|
||||||
|
|
||||||
|
# Тест build_search_queries
|
||||||
|
query_examples = [
|
||||||
|
("Kuroshitsuji", {"media": "TV", "season": 2}, [
|
||||||
|
"Kuroshitsuji 2nd Season",
|
||||||
|
"Kuroshitsuji Season 2",
|
||||||
|
"Kuroshitsuji 2",
|
||||||
|
"Kuroshitsuji II",
|
||||||
|
"Kuroshitsuji",
|
||||||
|
]),
|
||||||
|
("Fate", {"media": "Movie", "season": 1}, [
|
||||||
|
"Fate Movie 1",
|
||||||
|
"Fate Movie",
|
||||||
|
"Fate",
|
||||||
|
]),
|
||||||
|
("Naruto", {"media": "OVA", "season": None}, [
|
||||||
|
"Naruto OVA",
|
||||||
|
"Naruto",
|
||||||
|
]),
|
||||||
|
("Naruto", {"media": "TV", "season": None}, [
|
||||||
|
"Naruto",
|
||||||
|
]),
|
||||||
|
("Naruto", {"media": None, "season": None}, [
|
||||||
|
"Naruto",
|
||||||
|
]),
|
||||||
|
]
|
||||||
|
print("═══ Тест build_search_queries ═══\n")
|
||||||
|
ok = 0
|
||||||
|
for name, info, expected in query_examples:
|
||||||
|
got = build_search_queries(name, info)
|
||||||
|
match = got == expected
|
||||||
|
status = "✓" if match else "✗"
|
||||||
|
if match:
|
||||||
|
ok += 1
|
||||||
|
print(f" {status} {name} {info}")
|
||||||
|
print(f" → {got}")
|
||||||
|
if not match:
|
||||||
|
print(f" ожидалось: {expected}")
|
||||||
|
print(f"\n Пройдено: {ok}/{len(query_examples)}\n")
|
||||||
|
|
||||||
|
# Тест crop_to_square
|
||||||
|
print("═══ Тест crop_to_square ═══\n")
|
||||||
|
crop_tests = [
|
||||||
|
("Прямоугольник 300x400", (300, 400), (300, 300)),
|
||||||
|
("Прямоугольник 400x300", (400, 300), (300, 300)),
|
||||||
|
("Квадрат 500x500", (500, 500), (500, 500)),
|
||||||
|
]
|
||||||
|
ok = 0
|
||||||
|
for desc, (w, h), (ew, eh) in crop_tests:
|
||||||
|
img = Image.new("RGB", (w, h), color="red")
|
||||||
|
buf = BytesIO()
|
||||||
|
img.save(buf, format="JPEG")
|
||||||
|
raw = buf.getvalue()
|
||||||
|
|
||||||
|
cropped = crop_to_square(raw)
|
||||||
|
result = Image.open(BytesIO(cropped))
|
||||||
|
rw, rh = result.size
|
||||||
|
|
||||||
|
match = rw == ew and rh == eh
|
||||||
|
status = "✓" if match else "✗"
|
||||||
|
if match:
|
||||||
|
ok += 1
|
||||||
|
print(f" {status} {desc}: {w}x{h} → {rw}x{rh}" +
|
||||||
|
("" if match else f" (ожидалось {ew}x{eh})"))
|
||||||
|
print(f"\n Пройдено: {ok}/{len(crop_tests)}\n")
|
||||||
|
|
||||||
|
# Тест get_cache_base
|
||||||
|
print("═══ Тест get_cache_base ═══\n")
|
||||||
|
cache_tests = [
|
||||||
|
({"media": None, "season": None}, "cover"),
|
||||||
|
({"media": "TV", "season": None}, "cover"),
|
||||||
|
({"media": "TV", "season": 1}, "cover_TV-1"),
|
||||||
|
({"media": "TV", "season": 2}, "cover_TV-2"),
|
||||||
|
({"media": "Movie", "season": None}, "cover_Movie"),
|
||||||
|
({"media": "Movie", "season": 1}, "cover_Movie-1"),
|
||||||
|
({"media": "OVA", "season": None}, "cover_OVA"),
|
||||||
|
({"media": "ONA", "season": None}, "cover_ONA"),
|
||||||
|
({"media": "Special", "season": None}, "cover_Special"),
|
||||||
|
({"media": "Game", "season": 2}, "cover_Game-2"),
|
||||||
|
]
|
||||||
|
ok = 0
|
||||||
|
for info, expected in cache_tests:
|
||||||
|
got = get_cache_base(info)
|
||||||
|
match = got == expected
|
||||||
|
status = "✓" if match else "✗"
|
||||||
|
if match:
|
||||||
|
ok += 1
|
||||||
|
print(f" {status} {info} → \"{got}\"" +
|
||||||
|
("" if match else f" (ожидалось \"{expected}\")"))
|
||||||
|
print(f"\n Пройдено: {ok}/{len(cache_tests)}\n")
|
||||||
|
|
||||||
|
# Тест API
|
||||||
|
print("═══ Тест Jikan API ═══\n")
|
||||||
|
api_tests = [
|
||||||
|
("Kuroshitsuji", {"media": None, "season": None}),
|
||||||
|
("Kuroshitsuji", {"media": "TV", "season": 2}),
|
||||||
|
("Naruto", {"media": "Movie", "season": None}),
|
||||||
|
]
|
||||||
|
for name, info in api_tests:
|
||||||
|
queries = build_search_queries(name, info)
|
||||||
|
cache_base = get_cache_base(info)
|
||||||
|
print(f" {name} {info}")
|
||||||
|
print(f" Кеш: {cache_base}.jpg / {cache_base}_raw.jpg")
|
||||||
|
found = False
|
||||||
|
for q in queries:
|
||||||
|
url = fetch_cover_url(q)
|
||||||
|
if url:
|
||||||
|
print(f" ✓ \"{q}\" → {url}")
|
||||||
|
found = True
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
print(f" ✗ \"{q}\" → не найдено")
|
||||||
|
if not found:
|
||||||
|
print(f" ⚠ Обложка не найдена ни по одному запросу")
|
||||||
|
print()
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
# Точка входа
|
||||||
|
# ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(description="Тегирование аниме саундтреков")
|
||||||
|
sub = parser.add_subparsers(dest="command")
|
||||||
|
|
||||||
|
p_run = sub.add_parser("run", help="Записать теги")
|
||||||
|
p_run.add_argument("directory", help="Директория с музыкой")
|
||||||
|
p_run.add_argument("--dry-run", action="store_true",
|
||||||
|
help="Только показать, не записывать")
|
||||||
|
p_run.add_argument("--no-covers", action="store_true",
|
||||||
|
help="Не искать и не записывать обложки")
|
||||||
|
p_run.add_argument("--proxy", default="",
|
||||||
|
help="HTTP/SOCKS5 прокси (например socks5h://127.0.0.1:1080)")
|
||||||
|
|
||||||
|
sub.add_parser("test", help="Тест парсера и API")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if args.command == "test":
|
||||||
|
test()
|
||||||
|
elif args.command == "run":
|
||||||
|
process_directory(args.directory, args.dry_run, args.no_covers,
|
||||||
|
getattr(args, 'proxy', ''))
|
||||||
|
else:
|
||||||
|
parser.print_help()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Loading…
Add table
Add a link
Reference in a new issue