"""Aoi-chan: Matrix notifications for Navidrome and discovery-playlist.""" from __future__ import annotations import html import json import mimetypes import os import re import sqlite3 import threading import time import traceback import uuid from collections import Counter from datetime import datetime, timezone, timedelta from pathlib import Path from typing import Any from urllib.parse import quote_plus import requests from flask import Flask, jsonify, request BASE_DIR = Path(__file__).resolve().parent CONFIG_PATH = Path(os.environ.get("AOI_CONFIG", BASE_DIR / "config.json")) STATE_DB = Path(os.environ.get("AOI_STATE_DB", BASE_DIR / "aoi.db")) LOG_PATH = Path(os.environ.get("AOI_LOG", BASE_DIR / "aoi.log")) STATE_LOCK = threading.Lock() DASHBOARD_LOCK = threading.Lock() DASHBOARD_CACHE: dict[str, tuple[float, dict]] = {} RELEASE_COVER_CACHE: dict[str, str] = {} DASHBOARD_CACHE_SECONDS = 60 ARTIST_SIGNAL_CACHE: dict[str, tuple[float, dict]] = {} ARTIST_SIGNAL_TTL_SECONDS = 12 * 60 * 60 def _load_config() -> dict: example = BASE_DIR / "config.example.json" path = CONFIG_PATH if CONFIG_PATH.exists() else example with path.open("r", encoding="utf-8-sig") as f: return json.load(f) CFG = _load_config() def log(message: str) -> None: line = f"{datetime.now(timezone.utc).isoformat()} {message}" print(line, flush=True) try: with LOG_PATH.open("a", encoding="utf-8") as f: f.write(line + "\n") except Exception: pass def now_utc() -> datetime: return datetime.now(timezone.utc) def parse_dt(value: Any) -> datetime | None: if not value: return None if isinstance(value, datetime): return value if value.tzinfo else value.replace(tzinfo=timezone.utc) text = str(value).strip() if not text: return None text = text.replace("Z", "+00:00") for candidate in (text, text.split(".")[0]): try: dt = datetime.fromisoformat(candidate) return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc) except Exception: pass for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%Y-%m", "%Y"): try: return datetime.strptime(text[: len(fmt)], fmt).replace(tzinfo=timezone.utc) except Exception: pass return None def parse_date(value: Any) -> datetime | None: dt = parse_dt(value) if dt: return dt text = str(value or "").strip() for fmt in ("%Y", "%Y-%m", "%Y-%m-%d"): try: return datetime.strptime(text, fmt).replace(tzinfo=timezone.utc) except Exception: pass return None def parse_timezone(value: Any) -> timezone: text = str(value or "").strip() match = re.fullmatch(r"UTC([+-])(\d{1,2})(?::(\d{2}))?", text) if not match: return timezone.utc sign = 1 if match.group(1) == "+" else -1 hours = int(match.group(2)) minutes = int(match.group(3) or 0) return timezone(sign * timedelta(hours=hours, minutes=minutes)) def human_delta(target: datetime | None, now: datetime | None = None) -> str: if not target: return "" now = now or now_utc() seconds = max(0, int((target.astimezone(timezone.utc) - now.astimezone(timezone.utc)).total_seconds())) days, rem = divmod(seconds, 86400) hours, rem = divmod(rem, 3600) minutes, _ = divmod(rem, 60) if days: return f"{days}d {hours}h" if hours: return f"{hours}h {minutes}m" return f"{minutes}m" def strip_html(text: str) -> str: text = re.sub(r"", "\n", text or "", flags=re.I) text = re.sub(r"<[^>]+>", "", text) return html.unescape(text).strip() def compact(text: str, limit: int = 700) -> str: text = re.sub(r"\s+", " ", strip_html(text)).strip() if len(text) <= limit: return text return text[: limit - 1].rstrip() + "…" def clean_lastfm_text(text: str) -> str: text = re.sub(r"Read more on Last\.fm.*$", "", text or "", flags=re.I | re.S) text = re.sub(r"User-contributed text.*$", "", text, flags=re.I | re.S) return re.sub(r"\s+", " ", strip_html(text)).strip() def parse_int(value: Any) -> int: try: return int(str(value or "0").replace(",", "").strip() or 0) except Exception: return 0 def init_state() -> None: with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn: conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA busy_timeout=30000") conn.execute( """ CREATE TABLE IF NOT EXISTS sent_events ( kind TEXT NOT NULL, event_key TEXT NOT NULL, payload_json TEXT, created_at TEXT NOT NULL, PRIMARY KEY(kind, event_key) ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS cursors ( key TEXT PRIMARY KEY, value TEXT, updated_at TEXT NOT NULL ) """ ) conn.commit() def already_sent(kind: str, key: str) -> bool: with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn: conn.execute("PRAGMA busy_timeout=30000") row = conn.execute( "SELECT 1 FROM sent_events WHERE kind=? AND event_key=?", (kind, key), ).fetchone() return bool(row) def mark_sent(kind: str, key: str, payload: dict | None = None) -> None: with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn: conn.execute("PRAGMA busy_timeout=30000") conn.execute( """ INSERT OR IGNORE INTO sent_events(kind,event_key,payload_json,created_at) VALUES(?,?,?,?) """, (kind, key, json.dumps(payload or {}, ensure_ascii=False), now_utc().isoformat()), ) conn.commit() def get_cursor(key: str) -> str: with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn: conn.execute("PRAGMA busy_timeout=30000") row = conn.execute("SELECT value FROM cursors WHERE key=?", (key,)).fetchone() return row[0] if row else "" def set_cursor(key: str, value: str) -> None: with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn: conn.execute("PRAGMA busy_timeout=30000") conn.execute( """ INSERT INTO cursors(key,value,updated_at) VALUES(?,?,?) ON CONFLICT(key) DO UPDATE SET value=excluded.value, updated_at=excluded.updated_at """, (key, value, now_utc().isoformat()), ) conn.commit() def request_session() -> requests.Session: sess = requests.Session() proxy = (CFG.get("metadata") or {}).get("proxy") or "" if proxy: sess.proxies.update({"http": proxy, "https": proxy}) return sess HTTP = request_session() LASTFM_API_KEY_CACHE: str | None = None class MatrixClient: def __init__(self, cfg: dict): self.homeserver = cfg["homeserver"].rstrip("/") self.room_id = cfg["room_id"] self.token = cfg["access_token"] self.user_id = cfg.get("user_id") or "" @property def headers(self) -> dict: return {"Authorization": f"Bearer {self.token}"} def set_profile(self) -> None: bot = CFG.get("bot") or {} name = bot.get("name") if name and self.user_id: url = f"{self.homeserver}/_matrix/client/v3/profile/{self.user_id}/displayname" r = requests.put(url, headers=self.headers, json={"displayname": name}, timeout=20) if not r.ok: log(f"matrix display name update failed: {r.status_code} {r.text[:200]}") avatar_path = bot.get("avatar_path") or "" if avatar_path: path = Path(avatar_path) if path.exists() and self.user_id: mxc = self.upload_file(path.read_bytes(), path.name, mimetypes.guess_type(path.name)[0] or "image/jpeg") if mxc: url = f"{self.homeserver}/_matrix/client/v3/profile/{self.user_id}/avatar_url" r = requests.put(url, headers=self.headers, json={"avatar_url": mxc}, timeout=20) if not r.ok: log(f"matrix avatar update failed: {r.status_code} {r.text[:200]}") def upload_file(self, data: bytes, filename: str, content_type: str) -> str: r = requests.post( f"{self.homeserver}/_matrix/media/v3/upload", headers={**self.headers, "Content-Type": content_type}, params={"filename": filename}, data=data, timeout=45, ) r.raise_for_status() return r.json()["content_uri"] def send_text(self, text: str, formatted_html: str | None = None) -> dict: body = {"msgtype": "m.text", "body": text} if formatted_html: body["format"] = "org.matrix.custom.html" body["formatted_body"] = formatted_html txn = uuid.uuid4().hex url = f"{self.homeserver}/_matrix/client/v3/rooms/{self.room_id}/send/m.room.message/{txn}" r = requests.put(url, headers=self.headers, json=body, timeout=30) r.raise_for_status() return r.json() def send_image_bytes(self, data: bytes, filename: str, content_type: str = "image/jpeg") -> dict: mxc = self.upload_file(data, filename, content_type) body = { "msgtype": "m.image", "body": filename, "url": mxc, "info": {"mimetype": content_type, "size": len(data)}, } txn = uuid.uuid4().hex url = f"{self.homeserver}/_matrix/client/v3/rooms/{self.room_id}/send/m.room.message/{txn}" r = requests.put(url, headers=self.headers, json=body, timeout=30) r.raise_for_status() return r.json() def send_card(self, text: str, formatted_html: str, image: tuple[bytes, str, str] | None = None) -> None: if image: try: self.send_image_bytes(*image) except Exception as e: log(f"matrix image send failed: {e}") self.send_text(text, formatted_html) class NavidromeClient: def __init__(self, cfg: dict): self.base_url = cfg["url"].rstrip("/") self.username = cfg["username"] self.password = cfg["password"] def _params(self) -> dict: return { "u": self.username, "p": self.password, "v": "1.16.1", "c": "AoiNotifier", "f": "json", } def get(self, endpoint: str, **params) -> dict: r = requests.get( f"{self.base_url}/rest/{endpoint}", params={**self._params(), **params}, timeout=35, ) r.raise_for_status() data = r.json().get("subsonic-response", {}) if data.get("status") != "ok": raise RuntimeError(f"Navidrome {endpoint} failed: {data}") return data def ping(self) -> bool: try: return self.get("ping").get("status") == "ok" except Exception: return False def newest_albums(self, music_folder_id: int, size: int) -> list[dict]: data = self.get("getAlbumList2", type="newest", size=size, musicFolderId=music_folder_id) return data.get("albumList2", {}).get("album", []) or [] def album(self, album_id: str) -> dict: return self.get("getAlbum", id=album_id).get("album", {}) or {} def playlists(self) -> list[dict]: data = self.get("getPlaylists") return data.get("playlists", {}).get("playlist", []) or [] def playlist(self, playlist_id: str) -> dict: return self.get("getPlaylist", id=playlist_id).get("playlist", {}) or {} def search3(self, query: str, song_count: int = 5, album_count: int = 0, artist_count: int = 0) -> dict: return self.get( "search3", query=query, songCount=song_count, albumCount=album_count, artistCount=artist_count, ) def cover(self, cover_art: str | None) -> tuple[bytes, str, str] | None: if not cover_art: return None try: r = requests.get( f"{self.base_url}/rest/getCoverArt", params={**self._params(), "id": cover_art, "size": 900}, timeout=35, ) r.raise_for_status() ctype = r.headers.get("Content-Type") or "image/jpeg" ext = mimetypes.guess_extension(ctype.split(";")[0]) or ".jpg" return (r.content, f"cover{ext}", ctype) except Exception as e: log(f"cover fetch failed: {e}") return None MATRIX = MatrixClient(CFG["matrix"]) NAV = NavidromeClient(CFG["navidrome"]) def metadata_headers() -> dict: ua = (CFG.get("metadata") or {}).get("user_agent") or "AoiNotifier/1.0" return {"User-Agent": ua, "Accept": "application/json"} def discovery_setting(key: str) -> str: db_path = (CFG.get("discovery") or {}).get("db_path") or "" if not db_path or not Path(db_path).exists(): return "" try: with sqlite3.connect(db_path) as conn: row = conn.execute("SELECT value FROM user_settings WHERE key=?", (key,)).fetchone() return str(row[0] or "") if row else "" except Exception as e: log(f"discovery setting lookup failed for {key}: {e}") return "" def lastfm_api_key() -> str: global LASTFM_API_KEY_CACHE if LASTFM_API_KEY_CACHE is not None: return LASTFM_API_KEY_CACHE LASTFM_API_KEY_CACHE = (CFG.get("metadata") or {}).get("lastfm_api_key") or discovery_setting("lastfm_api_key") return LASTFM_API_KEY_CACHE def lastfm_request(method: str, params: dict, timeout: int = 25) -> dict: api_key = lastfm_api_key() if not api_key: return {} req_params = { "method": method, "api_key": api_key, "format": "json", "autocorrect": 1, **params, } try: r = HTTP.get("https://ws.audioscrobbler.com/2.0/", params=req_params, timeout=timeout) if not r.ok: return {} return r.json() except Exception as e: log(f"lastfm {method} failed: {e}") return {} def lastfm_album_tags(artist: str, album: str, limit: int = 6) -> list[str]: data = lastfm_request("album.getTopTags", {"artist": artist, "album": album}, timeout=20) tags = [] for tag in ((data.get("toptags") or {}).get("tag") or [])[:limit]: name = (tag.get("name") or "").strip() if name and len(name) <= 32 and not name.isdigit(): tags.append(name) return tags def lastfm_artist_summary(artist: str) -> str: for params in ({"artist": artist, "lang": "ru"}, {"artist": artist}): data = lastfm_request("artist.getinfo", params, timeout=20) bio = ((data.get("artist") or {}).get("bio") or {}) summary = clean_lastfm_text(bio.get("summary") or bio.get("content") or "") if summary: return summary return "" def lastfm_artist_popularity(artist: str) -> dict: data = lastfm_request("artist.getinfo", {"artist": artist}, timeout=18) artist_data = data.get("artist") or {} stats = artist_data.get("stats") or {} listeners = parse_int(stats.get("listeners")) playcount = parse_int(stats.get("playcount")) mbid = artist_data.get("mbid") or "" if listeners or playcount or mbid: return {"listeners": listeners, "playcount": playcount, "mbid": mbid} data = lastfm_request("artist.search", {"artist": artist, "limit": 1}, timeout=18) matches = ((data.get("results") or {}).get("artistmatches") or {}).get("artist") or [] if isinstance(matches, dict): matches = [matches] match = matches[0] if matches else {} return { "listeners": parse_int(match.get("listeners")), "playcount": parse_int(match.get("playcount")), "mbid": match.get("mbid") or "", } def listenbrainz_headers() -> dict: token = (CFG.get("metadata") or {}).get("listenbrainz_token") or discovery_setting("lb_token") headers = metadata_headers() if token: headers["Authorization"] = f"Token {token}" return headers def listenbrainz_artist_popularity(artist_mbid: str) -> dict: if not artist_mbid: return {"listen_count": 0} try: r = HTTP.get( f"https://api.listenbrainz.org/1/popularity/top-recordings-for-artist/{artist_mbid}", params={"count": 25}, headers=listenbrainz_headers(), timeout=25, ) if not r.ok: return {"listen_count": 0} data = r.json() if isinstance(data, dict): items = data.get("payload") or [] elif isinstance(data, list): items = data else: items = [] return {"listen_count": sum(parse_int(item.get("listen_count")) for item in items if isinstance(item, dict))} except Exception as e: log(f"listenbrainz artist popularity failed: {e}") return {"listen_count": 0} def log10_signal(value: Any) -> float: import math return math.log10(max(parse_int(value), 0) + 1) def artist_signal_score(track_count: int, lastfm: dict, listenbrainz: dict) -> float: return ( track_count * 2.0 + log10_signal(lastfm.get("listeners")) * 2.5 + log10_signal(lastfm.get("playcount")) * 1.2 + log10_signal(listenbrainz.get("listen_count")) * 2.0 ) def lastfm_album_search_url(artist: str, album: str) -> str: return f"https://www.last.fm/search/albums?q={quote_plus(f'{artist} {album}')}" def bandcamp_album_search_url(artist: str, album: str) -> str: return f"https://bandcamp.com/search?q={quote_plus(f'{artist} {album}')}&item_type=a" def album_source_links(artist: str, album: str, lastfm_url: str = "") -> list[dict[str, str]]: return [ {"label": "Last.fm", "url": lastfm_url or lastfm_album_search_url(artist, album)}, {"label": "Bandcamp", "url": bandcamp_album_search_url(artist, album)}, ] def lastfm_album_info(artist: str, album: str) -> dict[str, str]: for params in ({"artist": artist, "album": album, "lang": "ru"}, {"artist": artist, "album": album}): data = lastfm_request("album.getinfo", params) album_data = data.get("album") or {} wiki = album_data.get("wiki") or {} summary = clean_lastfm_text(wiki.get("summary") or wiki.get("content") or "") if summary: return {"summary": summary, "url": album_data.get("url") or ""} tags = lastfm_album_tags(artist, album) artist_summary = lastfm_artist_summary(artist) parts = [] if tags: parts.append("Last.fm описывает релиз тегами: " + ", ".join(tags) + ".") if artist_summary: parts.append(artist_summary) if parts: return {"summary": " ".join(parts), "url": ""} return {"summary": "", "url": ""} def lastfm_album_summary(artist: str, album: str) -> str: return lastfm_album_info(artist, album).get("summary", "") def lastfm_album_image(artist: str, album: str) -> str: data = lastfm_request("album.getinfo", {"artist": artist, "album": album}, timeout=20) images = (data.get("album") or {}).get("image") or [] for image in reversed(images): url = image.get("#text") or "" if url: return url return "" def musicbrainz_release_hint(artist: str, album: str) -> dict: if not (CFG.get("metadata") or {}).get("musicbrainz_enabled", True): return {} try: query = f'artist:"{artist}" AND release:"{album}"' r = HTTP.get( "https://musicbrainz.org/ws/2/release/", params={"query": query, "fmt": "json", "limit": 3}, headers=metadata_headers(), timeout=25, ) if not r.ok: return {} releases = r.json().get("releases") or [] if not releases: return {} rel = releases[0] return { "date": rel.get("date") or "", "country": rel.get("country") or "", "status": rel.get("status") or "", "score": rel.get("score") or 0, } except Exception as e: log(f"musicbrainz lookup failed: {e}") return {} def lastfm_tags_for_tracks(tracks: list[dict]) -> Counter: api_key = lastfm_api_key() tags = Counter() if not api_key: return tags for tr in tracks[:8]: try: r = HTTP.get( "https://ws.audioscrobbler.com/2.0/", params={ "method": "track.getTopTags", "artist": tr.get("artist") or "", "track": tr.get("track") or tr.get("title") or "", "api_key": api_key, "format": "json", "autocorrect": 1, }, timeout=20, ) if not r.ok: continue for tag in ((r.json().get("toptags") or {}).get("tag") or [])[:5]: name = (tag.get("name") or "").lower().strip() if name and len(name) <= 32: tags[name] += 1 except Exception: continue return tags def ranked_playlist_artists(tracks: list[dict], limit: int = 5) -> list[dict]: grouped: dict[str, dict] = {} for track in tracks: artist = (track.get("artist") or "").strip() if not artist: continue key = re.sub(r"\s+", " ", artist.lower()).strip() item = grouped.setdefault(key, {"name": artist, "track_count": 0, "mbids": set()}) item["track_count"] += 1 if track.get("mbid_artist"): item["mbids"].add(track.get("mbid_artist")) ranked = [] now = time.time() for key, item in grouped.items(): cached = ARTIST_SIGNAL_CACHE.get(key) if cached and now - cached[0] < ARTIST_SIGNAL_TTL_SECONDS: signals = cached[1] else: lastfm = lastfm_artist_popularity(item["name"]) mbid = next(iter(item["mbids"]), "") or lastfm.get("mbid") or "" listenbrainz = listenbrainz_artist_popularity(mbid) signals = {"lastfm": lastfm, "listenbrainz": listenbrainz} ARTIST_SIGNAL_CACHE[key] = (now, signals) ranked.append({ "name": item["name"], "track_count": item["track_count"], "score": artist_signal_score(item["track_count"], signals["lastfm"], signals["listenbrainz"]), "lastfm_listeners": signals["lastfm"].get("listeners", 0), "lastfm_playcount": signals["lastfm"].get("playcount", 0), "listenbrainz_listens": signals["listenbrainz"].get("listen_count", 0), }) ranked.sort(key=lambda x: x["score"], reverse=True) return ranked[:limit] def album_plain_and_html(album: dict, library_label: str, songs: list[dict]) -> tuple[str, str]: artist = album.get("artist") or "Unknown artist" title = album.get("name") or album.get("album") or "Unknown album" year = album.get("year") or "" duration = int(album.get("duration") or sum(int(s.get("duration") or 0) for s in songs) or 0) minutes = duration // 60 if duration else 0 lastfm_info = lastfm_album_info(artist, title) summary = lastfm_info.get("summary") or "" links = album_source_links(artist, title, lastfm_info.get("url") or "") hint = musicbrainz_release_hint(artist, title) release_date = album.get("created") or album.get("starred") or hint.get("date") or "" track_lines = [] for idx, song in enumerate(songs[:30], 1): name = song.get("title") or "Untitled" dur = int(song.get("duration") or 0) suffix = f" ({dur // 60}:{dur % 60:02d})" if dur else "" track_lines.append(f"{idx:02d}. {name}{suffix}") if len(songs) > 30: track_lines.append(f"... и еще {len(songs) - 30}") header = f"🎧 Новый релиз в Navidrome / {library_label}" facts = [ f"{artist} — {title}" + (f" ({year})" if year else ""), f"Треков: {len(songs)}" + (f", длительность: {minutes} мин." if minutes else ""), ] if release_date: facts.append(f"Дата/добавление: {release_date}") if hint.get("country") or hint.get("status"): facts.append("MusicBrainz: " + ", ".join(x for x in [hint.get("country"), hint.get("status")] if x)) if summary: description_fact = f"Описание: {summary}" else: description_fact = "Описание: пока не нашла нормальное описание, оставляю чистый треклист." facts.append(description_fact) facts.append("Ссылки: " + ", ".join(f"{link['label']}: {link['url']}" for link in links)) plain = "\n".join([header, "", *facts, "", "━━ ТРЕКЛИСТ ━━", *track_lines]) html_parts = [ f"

{html.escape(header)}

", f"{html.escape(artist)} — {html.escape(title)}" + (f" ({html.escape(str(year))})" if year else ""), "") html_parts.append(f"

{html.escape(description_fact)}

") html_parts.append( "

Ссылки: " + ", ".join( f'{html.escape(link["label"])}' for link in links ) + "

" ) html_parts.append("

Треклист

    ") for line in track_lines: html_parts.append(f"
  1. {html.escape(line[4:] if re.match(r'^\\d\\d\\. ', line) else line)}
  2. ") html_parts.append("
") return plain, "\n".join(html_parts) def poll_navidrome_albums(force: bool = False) -> dict: cfg = CFG["navidrome"] libraries = [ ("main", int(cfg.get("main_library_id") or 0), "main"), ("anime", int(cfg.get("anime_library_id") or 0), "anime"), ] size = int(cfg.get("album_poll_size") or 40) baseline = (CFG.get("polling") or {}).get("baseline_existing_on_first_run", True) first_run = not get_cursor("navidrome_albums_baselined") sent = 0 seen = 0 for kind, folder_id, label in libraries: if not folder_id: continue for album in NAV.newest_albums(folder_id, size): album_id = str(album.get("id") or "") if not album_id: continue key = f"{kind}:{album_id}" if already_sent("navidrome_album", key): continue seen += 1 if first_run and baseline and not force: mark_sent("navidrome_album", key, album) continue full_album = NAV.album(album_id) songs = full_album.get("song") or [] plain, formatted = album_plain_and_html(full_album or album, label, songs) image = NAV.cover((full_album or album).get("coverArt")) MATRIX.send_card(plain, formatted, image) mark_sent("navidrome_album", key, {"album": album, "sent_at": now_utc().isoformat()}) sent += 1 time.sleep(1) set_cursor("navidrome_albums_baselined", now_utc().isoformat()) return {"ok": True, "seen_new": seen, "sent": sent, "first_run": first_run} def _loose_text_key(value: Any) -> str: text = str(value or "").casefold() text = text.replace("’", "'").replace("`", "'") return re.sub(r"[^a-z0-9а-яё]+", "", text, flags=re.I) def find_navidrome_album(artist: str, album: str) -> dict: artist_key = _loose_text_key(artist) album_key = _loose_text_key(album) if not artist_key or not album_key: return {} try: results = NAV.search3(f"{artist} {album}", song_count=0, album_count=8) albums = (results.get("searchResult3") or {}).get("album") or [] matches = [] for item in albums: item_artist_key = _loose_text_key(item.get("artist") or "") item_album_key = _loose_text_key(item.get("name") or item.get("album") or "") if ( item.get("id") and (artist_key in item_artist_key or item_artist_key in artist_key) and (album_key in item_album_key or item_album_key in album_key) ): try: full = NAV.album(str(item.get("id"))) song_count = len(full.get("song") or []) merged = {**item, **full, "_song_count": song_count} matches.append(merged) except Exception: item["_song_count"] = int(item.get("songCount") or 0) matches.append(item) if matches: matches.sort(key=lambda item: int(item.get("_song_count") or item.get("songCount") or 0), reverse=True) return matches[0] except Exception as e: log(f"liked album Navidrome search failed for {artist} - {album}: {e}") return {} def downloaded_library_album_rows(limit: int = 50) -> list[dict]: conn = discovery_conn() if not conn: return [] try: rows = conn.execute( """ SELECT id, playlist_id, artist, album, generation_id, moved_to_library, created_at FROM downloaded_albums WHERE moved_to_library=1 ORDER BY id DESC LIMIT ? """, (limit,), ).fetchall() return [dict(r) for r in rows] finally: conn.close() def liked_album_plain_and_html(row: dict, full_album: dict, songs: list[dict]) -> tuple[str, str]: album = dict(full_album or {}) album.setdefault("artist", row.get("artist") or "Unknown artist") album.setdefault("name", row.get("album") or "Unknown album") plain, formatted = album_plain_and_html(album, "liked", songs) plain = plain.replace("Новый релиз в Navidrome / liked", "Лайкнутый альбом добавлен в библиотеку", 1) formatted = formatted.replace("Новый релиз в Navidrome / liked", "Лайкнутый альбом добавлен в библиотеку", 1) return plain, formatted def poll_liked_library_albums(force: bool = False) -> dict: polling_cfg = CFG.get("polling") or {} baseline = polling_cfg.get("baseline_existing_on_first_run", True) first_run = not get_cursor("liked_library_albums_baselined") sent = 0 seen = 0 waiting = 0 for row in downloaded_library_album_rows(int(polling_cfg.get("liked_album_poll_size") or 50)): key = f"{row.get('id')}:{row.get('created_at')}" if already_sent("liked_library_album", key): continue seen += 1 if first_run and baseline and not force: mark_sent("liked_library_album", key, row) continue artist = row.get("artist") or "" album = row.get("album") or "" nav_album = find_navidrome_album(artist, album) if not nav_album: waiting += 1 log(f"liked album {artist!r} - {album!r} not visible in Navidrome yet — deferring") continue full_album = nav_album if nav_album.get("song") else NAV.album(str(nav_album.get("id"))) songs = full_album.get("song") or [] plain, formatted = liked_album_plain_and_html(row, full_album or nav_album, songs) image = NAV.cover((full_album or nav_album).get("coverArt")) MATRIX.send_card(plain, formatted, image) mark_sent( "liked_library_album", key, { "album": row, "sent_at": now_utc().isoformat(), "navidrome_id": nav_album.get("id"), "song_count": len(songs), }, ) sent += 1 time.sleep(1) set_cursor("liked_library_albums_baselined", now_utc().isoformat()) return {"ok": True, "seen_new": seen, "sent": sent, "waiting": waiting, "first_run": first_run} def discovery_conn() -> sqlite3.Connection | None: db_path = (CFG.get("discovery") or {}).get("db_path") or "" if not db_path or not Path(db_path).exists(): log(f"discovery db not found: {db_path}") return None conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row return conn def playlist_rows(limit: int = 20) -> list[sqlite3.Row]: conn = discovery_conn() if not conn: return [] try: return conn.execute( """ SELECT * FROM playlists ORDER BY COALESCE(last_generated, created_at) DESC LIMIT ? """, (limit,), ).fetchall() finally: conn.close() def playlist_tracks(playlist_id: int) -> list[dict]: conn = discovery_conn() if not conn: return [] try: rows = conn.execute( """ SELECT * FROM playlist_tracks WHERE playlist_id=? ORDER BY score DESC, added_at DESC """, (playlist_id,), ).fetchall() return [dict(r) for r in rows] finally: conn.close() def playlist_download_stats(playlist_id: int) -> dict: conn = discovery_conn() if not conn: return {"total": 0, "done": 0, "final": 0, "running": False, "done_track_ids": set()} try: playlist = conn.execute( "SELECT download_running FROM playlists WHERE id=?", (playlist_id,), ).fetchone() rows = conn.execute( """ SELECT status, COUNT(*) AS count FROM download_status WHERE playlist_id=? GROUP BY status """, (playlist_id,), ).fetchall() done_rows = conn.execute( """ SELECT track_id FROM download_status WHERE playlist_id=? AND status='done' AND track_id IS NOT NULL """, (playlist_id,), ).fetchall() counts = {str(row["status"]): int(row["count"]) for row in rows} final = sum(counts.get(status, 0) for status in ("done", "error", "skipped", "failed")) total = sum(counts.values()) return { "total": total, "done": counts.get("done", 0), "error": counts.get("error", 0), "skipped": counts.get("skipped", 0), "failed": counts.get("failed", 0), "final": final, "running": bool(playlist["download_running"]) if playlist else False, "done_track_ids": {int(row["track_id"]) for row in done_rows if row["track_id"] is not None}, } finally: conn.close() def navidrome_find_playlist(name: str) -> dict | None: """Return Navidrome playlist (basic info from getPlaylists) whose name matches `name` exactly, case-insensitive.""" try: needle = (name or "").strip().lower() if not needle: return None for pl in NAV.playlists(): if (pl.get("name") or "").strip().lower() == needle: return pl except Exception as e: log(f"navidrome lookup failed for {name!r}: {e}") return None def navidrome_playlist_cover_for(nav_pl: dict) -> tuple[bytes, str, str] | None: try: full = NAV.playlist(str(nav_pl.get("id"))) if full.get("coverArt"): return NAV.cover(full.get("coverArt")) entries = full.get("entry") or [] if entries: return NAV.cover(entries[0].get("coverArt")) except Exception as e: log(f"navidrome playlist cover failed: {e}") return None def navidrome_playlist_cover(name: str) -> tuple[bytes, str, str] | None: pl = navidrome_find_playlist(name) if not pl: return None return navidrome_playlist_cover_for(pl) def discovery_playlist_ready(pl: dict, nav_pl: dict | None, nav_min_tracks: int) -> tuple[bool, dict]: stats = playlist_download_stats(int(pl["id"])) if stats["running"]: return False, {**stats, "reason": "download_running", "navidrome_tracks": int((nav_pl or {}).get("songCount") or 0)} if stats["total"] <= 0 or stats["final"] < stats["total"]: return False, {**stats, "reason": "download_not_final", "navidrome_tracks": int((nav_pl or {}).get("songCount") or 0)} if stats["done"] <= 0: return False, {**stats, "reason": "nothing_downloaded", "navidrome_tracks": int((nav_pl or {}).get("songCount") or 0)} nav_song_count = int((nav_pl or {}).get("songCount") or 0) required = max(nav_min_tracks, stats["done"]) if not nav_pl or nav_song_count < required: return False, {**stats, "reason": "not_in_navidrome", "navidrome_tracks": nav_song_count, "required_navidrome_tracks": required} return True, {**stats, "reason": "ready", "navidrome_tracks": nav_song_count, "required_navidrome_tracks": required} PROFILE_LABELS = { "deep_cuts": "глубокие находки", "fresh_picks": "свежие рекомендации", "comfort_zone": "комфортная зона", } def playlist_plain_and_html(pl: dict, tracks: list[dict], readiness: dict | None = None) -> tuple[str, str]: name = pl.get("name") or "Discovery" created = pl.get("last_generated") or pl.get("created_at") or "" top_tracks = tracks[:8] top_lines = [f"• {t.get('artist')} — {t.get('track')}" for t in top_tracks] profile_counts = Counter((t.get("profile") or "").strip() for t in tracks if t.get("profile")) profile_text = ", ".join( f"{PROFILE_LABELS.get(k, k)}: {v}" for k, v in profile_counts.most_common() ) or "смешанный режим" tags = lastfm_tags_for_tracks(tracks) tag_text = ", ".join(tag for tag, _ in tags.most_common(6)) if not tag_text: tag_text = profile_text ranked_artists = ranked_playlist_artists(tracks) artist_text = ", ".join(a["name"] for a in ranked_artists) download_text = "" if readiness: download_text = ( f"Скачано: {readiness.get('done', 0)}, " f"в Navidrome: {readiness.get('navidrome_tracks', 0)}" ) summary = ( f"В этот раз уклон: {tag_text}. " f"Из заметного: {', '.join(line[2:] for line in top_lines[:4])}." ) plain = "\n".join([ "🧭 Новый Discovery-плейлист", "", name, f"Треков: {len(tracks)}", download_text, f"Собран: {created}", f"Профиль: {profile_text}", f"Главные артисты: {artist_text or 'пока не выделяются'}", "", summary, "", "━━ ИНТЕРЕСНОЕ ━━", *top_lines, ]) formatted = "\n".join([ "

🧭 Новый Discovery-плейлист

", f"{html.escape(name)}", "", f"

{html.escape(summary)}

", "

Интересное

", ]) return plain, formatted def poll_discovery_playlists(force: bool = False) -> dict: polling_cfg = CFG.get("polling") or {} baseline = polling_cfg.get("baseline_existing_on_first_run", True) nav_min_tracks = int(polling_cfg.get("discovery_min_tracks_in_navidrome", 1)) stale_after_hours = float(polling_cfg.get("discovery_stale_after_hours", 48)) first_run = not get_cursor("discovery_playlists_baselined") sent = 0 seen = 0 waiting = 0 for row in playlist_rows(): pl = dict(row) stamp = pl.get("last_generated") or pl.get("created_at") or "" if not stamp: continue key = f"{pl.get('id')}:{stamp}:{pl.get('iteration_number', 0)}" if already_sent("discovery_playlist", key): continue seen += 1 name = pl.get("name") or "" nav_pl = navidrome_find_playlist(name) ready, readiness = discovery_playlist_ready(pl, nav_pl, nav_min_tracks) nav_song_count = int(readiness.get("navidrome_tracks") or 0) if not ready: # Not yet downloaded or visible in Navidrome — defer notification # until the playlist is actually playable. Do NOT mark_sent so the # next poll re-checks. stamp_dt = parse_dt(stamp) age_hours = ((now_utc() - stamp_dt).total_seconds() / 3600) if stamp_dt else 0.0 if age_hours > stale_after_hours: log( f"discovery_playlist {name!r} stale: age={age_hours:.1f}h, " f"reason={readiness.get('reason')} navidrome_present={bool(nav_pl)}, " f"nav_songs={nav_song_count} done={readiness.get('done', 0)} — " f"marking sent without notification (will not retry)" ) mark_sent("discovery_playlist", key, { "playlist": pl, "readiness": {k: v for k, v in readiness.items() if k != "done_track_ids"}, "skipped_stale": True, }) else: waiting += 1 log( f"discovery_playlist {name!r} not ready for notification " f"(reason={readiness.get('reason')} nav_present={bool(nav_pl)} " f"nav_songs={nav_song_count} done={readiness.get('done', 0)} " f"need>={readiness.get('required_navidrome_tracks', nav_min_tracks)} " f"age={age_hours:.1f}h) — deferring" ) continue if first_run and baseline and not force: mark_sent("discovery_playlist", key, { "playlist": pl, "readiness": {k: v for k, v in readiness.items() if k != "done_track_ids"}, "baselined_at": now_utc().isoformat(), }) continue tracks = playlist_tracks(int(pl["id"])) done_track_ids = readiness.get("done_track_ids") or set() ready_tracks = [track for track in tracks if int(track.get("id") or 0) in done_track_ids] or tracks plain, formatted = playlist_plain_and_html(pl, ready_tracks, readiness) image = navidrome_playlist_cover_for(nav_pl) MATRIX.send_card(plain, formatted, image) mark_sent( "discovery_playlist", key, { "playlist": pl, "sent_at": now_utc().isoformat(), "navidrome_id": nav_pl.get("id"), "navidrome_song_count": nav_song_count, "readiness": {k: v for k, v in readiness.items() if k != "done_track_ids"}, }, ) sent += 1 time.sleep(1) set_cursor("discovery_playlists_baselined", now_utc().isoformat()) return { "ok": True, "seen_new": seen, "sent": sent, "waiting": waiting, "first_run": first_run, } def release_rows(limit: int = 120) -> list[dict]: conn = discovery_conn() if not conn: return [] try: rows = conn.execute( """ SELECT * FROM artist_release_watch ORDER BY COALESCE(downloaded_at, first_seen_at) DESC LIMIT ? """, (limit,), ).fetchall() return [dict(r) for r in rows] finally: conn.close() def release_plain_and_html(row: dict, mode: str) -> tuple[str, str]: artist = row.get("artist") or "Unknown artist" album = row.get("album") or "Unknown release" date = row.get("release_date") or "дата уточняется" source = row.get("source") or "discovery-playlist" tracks = int(row.get("desired_track_count") or 0) status = row.get("status") or "" if mode == "out": title = "📀 Релиз вышел" lead = f"{artist} — {album} уже можно забирать." else: title = "📡 Анонс релиза отслеживаемого артиста" lead = f"{artist} — {album} появился в release watch." facts = [ lead, f"Дата релиза: {date}", f"Статус: {status}", f"Источник: {source}", ] if tracks: facts.append(f"Ожидаемый треклист: {tracks} треков") if row.get("error_message"): facts.append(f"Заметка: {compact(row.get('error_message'), 240)}") plain = "\n".join([title, "", *facts]) formatted = "\n".join([ f"

{html.escape(title)}

", f"

{html.escape(artist)} — {html.escape(album)}

", "", ]) return plain, formatted def release_is_out(row: dict) -> bool: dt = parse_date(row.get("release_date")) return bool(dt and dt.date() == now_utc().date()) def release_is_announcement(row: dict) -> bool: dt = parse_date(row.get("release_date")) return bool(dt and dt.date() > now_utc().date()) def poll_release_watch(force: bool = False) -> dict: baseline = (CFG.get("polling") or {}).get("baseline_existing_on_first_run", True) first_run = not get_cursor("release_watch_baselined") announced = 0 out = 0 skipped = 0 for row in release_rows(): row_id = row.get("id") first_seen = row.get("first_seen_at") or "" announce_key = f"{row_id}:{first_seen}" if release_is_announcement(row) and not already_sent("release_announcement", announce_key): if first_run and baseline and not force: mark_sent("release_announcement", announce_key, row) else: plain, formatted = release_plain_and_html(row, "announce") MATRIX.send_card(plain, formatted, None) mark_sent("release_announcement", announce_key, row) announced += 1 time.sleep(1) elif not release_is_announcement(row): skipped += 1 if release_is_out(row): out_stamp = row.get("downloaded_at") or row.get("release_date") or row.get("last_checked_at") or "" out_key = f"{row_id}:{out_stamp}:{row.get('status')}" if already_sent("release_out", out_key): continue if first_run and baseline and not force: mark_sent("release_out", out_key, row) else: plain, formatted = release_plain_and_html(row, "out") MATRIX.send_card(plain, formatted, None) mark_sent("release_out", out_key, row) out += 1 time.sleep(1) set_cursor("release_watch_baselined", now_utc().isoformat()) return {"ok": True, "announced": announced, "out": out, "skipped_not_current_or_future": skipped, "first_run": first_run} def loop_forever(name: str, interval: int, fn) -> None: while True: try: result = fn() log(f"{name}: {result}") except Exception as e: log(f"{name} failed: {e}\n{traceback.format_exc()}") time.sleep(interval) app = Flask(__name__) @app.get("/healthz") def healthz(): return jsonify({"ok": True}) def sent_event_rows(kind: str, limit: int) -> list[dict]: with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn: conn.execute("PRAGMA busy_timeout=30000") rows = conn.execute( """ SELECT payload_json, created_at FROM sent_events WHERE kind=? ORDER BY created_at DESC LIMIT ? """, (kind, limit), ).fetchall() out = [] for payload_json, created_at in rows: try: payload = json.loads(payload_json or "{}") except Exception: payload = {} out.append({"payload": payload, "created_at": created_at}) return out def navidrome_cover_url(cover_art: Any) -> str: cover_art = str(cover_art or "").strip() return f"/covers/navidrome/{cover_art}" if cover_art else "" def dashboard_albums(limit: int) -> list[dict]: items = [] for row in sent_event_rows("navidrome_album", limit): album = row["payload"].get("album") or row["payload"] items.append( { "artist": album.get("artist") or "Unknown artist", "title": album.get("name") or album.get("album") or "Unknown album", "year": album.get("year") or "", "song_count": album.get("songCount") or album.get("song_count") or "", "created": album.get("created") or row["created_at"], "cover_url": navidrome_cover_url(album.get("coverArt")), } ) return items def playlist_cover_art(nav_pl: dict) -> str: cover_art = nav_pl.get("coverArt") or "" if cover_art: return cover_art try: full = NAV.playlist(str(nav_pl.get("id"))) cover_art = full.get("coverArt") or "" if cover_art: return cover_art entries = full.get("entry") or [] if entries: return entries[0].get("coverArt") or "" except Exception as e: if "playlist not found" not in str(e).lower(): log(f"dashboard playlist cover failed: {e}") return "" def playlist_track_cover_art(playlist_id: Any) -> str: try: for track in playlist_tracks(int(playlist_id))[:5]: query = " ".join(str(v or "") for v in (track.get("artist"), track.get("track"))).strip() if not query: continue results = NAV.search3(query, song_count=3) songs = (results.get("searchResult3") or {}).get("song") or [] for song in songs: if song.get("coverArt"): return song.get("coverArt") or "" except Exception as e: log(f"dashboard playlist track cover failed: {e}") return "" def release_cover_art(artist: str, album: str) -> str: cache_key = f"{artist}\0{album}".casefold() if cache_key in RELEASE_COVER_CACHE: return RELEASE_COVER_CACHE[cache_key] try: results = NAV.search3(f"{artist} {album}", song_count=0, album_count=3) albums = (results.get("searchResult3") or {}).get("album") or [] artist_l = artist.casefold() album_l = album.casefold() for item in albums: item_artist = str(item.get("artist") or "").casefold() item_album = str(item.get("name") or "").casefold() if item.get("coverArt") and (artist_l in item_artist or item_artist in artist_l) and (album_l in item_album or item_album in album_l): RELEASE_COVER_CACHE[cache_key] = item.get("coverArt") or "" return RELEASE_COVER_CACHE[cache_key] for item in albums: if item.get("coverArt"): RELEASE_COVER_CACHE[cache_key] = item.get("coverArt") or "" return RELEASE_COVER_CACHE[cache_key] except Exception as e: log(f"dashboard release cover failed: {e}") RELEASE_COVER_CACHE[cache_key] = "" return "" def playlist_next_generation(playlist: dict) -> datetime | None: now = now_utc() interval = playlist.get("generation_interval_minutes") last_generated = parse_dt(playlist.get("last_generated")) or parse_dt(playlist.get("created_at")) if interval and last_generated: try: target = last_generated + timedelta(minutes=int(interval)) while target <= now: target += timedelta(minutes=int(interval)) return target except Exception: pass time_text = str(playlist.get("generation_time") or "").strip() if not time_text: return None tz = parse_timezone(playlist.get("generation_timezone")) local_now = now.astimezone(tz) try: hour, minute = [int(part) for part in time_text.split(":", 1)] except Exception: return None days_raw = playlist.get("generation_days") days: list[int] = [] if days_raw: try: parsed = json.loads(days_raw) if isinstance(days_raw, str) else days_raw days = [int(day) for day in parsed] except Exception: days = [] for offset in range(0, 14): candidate_date = local_now.date() + timedelta(days=offset) candidate = datetime(candidate_date.year, candidate_date.month, candidate_date.day, hour, minute, tzinfo=tz) # Python Monday is 0; stored schedule uses 1 for Monday. weekday = candidate.weekday() + 1 if days and weekday not in days: continue if candidate > local_now: return candidate.astimezone(timezone.utc) return None def dashboard_playlists(limit: int) -> list[dict]: items = [] seen_names = set() for row in sent_event_rows("discovery_playlist", limit * 3): payload = row["payload"] playlist = payload.get("playlist") or payload name = playlist.get("name") or "Discovery" name_key = name.casefold() if name_key in seen_names: continue seen_names.add(name_key) navidrome_id = payload.get("navidrome_id") or "" cover_art = "" if navidrome_id: cover_art = playlist_cover_art({"id": navidrome_id}) if not cover_art: nav_pl = navidrome_find_playlist(name) if nav_pl: cover_art = playlist_cover_art(nav_pl) next_generation = playlist_next_generation(playlist) items.append( { "name": name, "track_count": playlist.get("track_count") or playlist.get("songCount") or "", "generated": playlist.get("last_generated") or playlist.get("created_at") or row["created_at"], "next_generation": next_generation.isoformat() if next_generation else "", "next_generation_in": human_delta(next_generation) if next_generation else "", "cover_url": navidrome_cover_url(cover_art), } ) if len(items) >= limit: break return items def dashboard_releases(limit: int) -> list[dict]: items = [] seen = set() for row in sent_event_rows("release_out", limit * 2): release = row["payload"] artist = release.get("artist") or "Unknown artist" album = release.get("album") or "Unknown release" key = (artist.lower(), album.lower()) if key in seen: continue seen.add(key) cover_art = release_cover_art(artist, album) items.append( { "artist": artist, "title": album, "release_date": release.get("release_date") or "", "status": release.get("status") or "", "source": release.get("source") or "", "cover_url": navidrome_cover_url(cover_art), } ) if len(items) >= limit: break return items @app.get("/dashboard") def dashboard(): album_limit = min(max(int(request.args.get("album_limit", 8)), 1), 24) playlist_limit = min(max(int(request.args.get("playlist_limit", 8)), 1), 24) release_limit = min(max(int(request.args.get("release_limit", 10)), 1), 24) cache_key = f"{album_limit}:{playlist_limit}:{release_limit}" now_ts = time.monotonic() with DASHBOARD_LOCK: cached = DASHBOARD_CACHE.get(cache_key) if cached and now_ts - cached[0] < DASHBOARD_CACHE_SECONDS: return jsonify(cached[1]) payload = { "ok": True, "albums": dashboard_albums(album_limit), "playlists": dashboard_playlists(playlist_limit), "releases": dashboard_releases(release_limit), } DASHBOARD_CACHE[cache_key] = (time.monotonic(), payload) return jsonify(payload) @app.post("/run/navidrome-albums") def run_navidrome_albums(): payload = request.get_json(silent=True) or {} return jsonify(poll_navidrome_albums(force=bool(payload.get("force")))) @app.post("/run/liked-albums") def run_liked_albums(): payload = request.get_json(silent=True) or {} return jsonify(poll_liked_library_albums(force=bool(payload.get("force")))) @app.post("/run/discovery-playlists") def run_discovery_playlists(): payload = request.get_json(silent=True) or {} return jsonify(poll_discovery_playlists(force=bool(payload.get("force")))) @app.post("/run/release-watch") def run_release_watch(): payload = request.get_json(silent=True) or {} return jsonify(poll_release_watch(force=bool(payload.get("force")))) def start_background() -> None: polling = CFG.get("polling") or {} tasks = [ ("navidrome_albums", int(polling.get("navidrome_album_interval_seconds") or 300), poll_navidrome_albums), ("liked_library_albums", int(polling.get("liked_album_interval_seconds") or 300), poll_liked_library_albums), ("discovery_playlists", int(polling.get("discovery_playlist_interval_seconds") or 180), poll_discovery_playlists), ("release_watch", int(polling.get("release_watch_interval_seconds") or 600), poll_release_watch), ] for name, interval, fn in tasks: threading.Thread(target=loop_forever, args=(name, interval, fn), daemon=True).start() if __name__ == "__main__": init_state() try: MATRIX.set_profile() except Exception as e: log(f"matrix profile setup failed: {e}") if not NAV.ping(): log("warning: Navidrome ping failed") start_background() server = CFG.get("server") or {} host = server.get("host") or "0.0.0.0" port = int(server.get("port") or 18323) try: from waitress import serve serve(app, host=host, port=port) except Exception: app.run(host=host, port=port)