diff --git a/aoi.py b/aoi.py index 9a78aa9..bd271d4 100644 --- a/aoi.py +++ b/aoi.py @@ -27,6 +27,12 @@ CONFIG_PATH = Path(os.environ.get("AOI_CONFIG", BASE_DIR / "config.json")) STATE_DB = Path(os.environ.get("AOI_STATE_DB", BASE_DIR / "aoi.db")) LOG_PATH = Path(os.environ.get("AOI_LOG", BASE_DIR / "aoi.log")) STATE_LOCK = threading.Lock() +DASHBOARD_LOCK = threading.Lock() +DASHBOARD_CACHE: dict[str, tuple[float, dict]] = {} +RELEASE_COVER_CACHE: dict[str, str] = {} +DASHBOARD_CACHE_SECONDS = 60 +ARTIST_SIGNAL_CACHE: dict[str, tuple[float, dict]] = {} +ARTIST_SIGNAL_TTL_SECONDS = 12 * 60 * 60 def _load_config() -> dict: @@ -89,6 +95,32 @@ def parse_date(value: Any) -> datetime | None: return None +def parse_timezone(value: Any) -> timezone: + text = str(value or "").strip() + match = re.fullmatch(r"UTC([+-])(\d{1,2})(?::(\d{2}))?", text) + if not match: + return timezone.utc + sign = 1 if match.group(1) == "+" else -1 + hours = int(match.group(2)) + minutes = int(match.group(3) or 0) + return timezone(sign * timedelta(hours=hours, minutes=minutes)) + + +def human_delta(target: datetime | None, now: datetime | None = None) -> str: + if not target: + return "" + now = now or now_utc() + seconds = max(0, int((target.astimezone(timezone.utc) - now.astimezone(timezone.utc)).total_seconds())) + days, rem = divmod(seconds, 86400) + hours, rem = divmod(rem, 3600) + minutes, _ = divmod(rem, 60) + if days: + return f"{days}d {hours}h" + if hours: + return f"{hours}h {minutes}m" + return f"{minutes}m" + + def strip_html(text: str) -> str: text = re.sub(r"", "\n", text or "", flags=re.I) text = re.sub(r"<[^>]+>", "", text) @@ -108,6 +140,13 @@ def clean_lastfm_text(text: str) -> str: return re.sub(r"\s+", " ", strip_html(text)).strip() +def parse_int(value: Any) -> int: + try: + return int(str(value or "0").replace(",", "").strip() or 0) + except Exception: + return 0 + + def init_state() -> None: with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn: conn.execute("PRAGMA journal_mode=WAL") @@ -313,6 +352,15 @@ class NavidromeClient: def playlist(self, playlist_id: str) -> dict: return self.get("getPlaylist", id=playlist_id).get("playlist", {}) or {} + def search3(self, query: str, song_count: int = 5, album_count: int = 0, artist_count: int = 0) -> dict: + return self.get( + "search3", + query=query, + songCount=song_count, + albumCount=album_count, + artistCount=artist_count, + ) + def cover(self, cover_art: str | None) -> tuple[bytes, str, str] | None: if not cover_art: return None @@ -402,6 +450,76 @@ def lastfm_artist_summary(artist: str) -> str: return "" +def lastfm_artist_popularity(artist: str) -> dict: + data = lastfm_request("artist.getinfo", {"artist": artist}, timeout=18) + artist_data = data.get("artist") or {} + stats = artist_data.get("stats") or {} + listeners = parse_int(stats.get("listeners")) + playcount = parse_int(stats.get("playcount")) + mbid = artist_data.get("mbid") or "" + if listeners or playcount or mbid: + return {"listeners": listeners, "playcount": playcount, "mbid": mbid} + + data = lastfm_request("artist.search", {"artist": artist, "limit": 1}, timeout=18) + matches = ((data.get("results") or {}).get("artistmatches") or {}).get("artist") or [] + if isinstance(matches, dict): + matches = [matches] + match = matches[0] if matches else {} + return { + "listeners": parse_int(match.get("listeners")), + "playcount": parse_int(match.get("playcount")), + "mbid": match.get("mbid") or "", + } + + +def listenbrainz_headers() -> dict: + token = (CFG.get("metadata") or {}).get("listenbrainz_token") or discovery_setting("lb_token") + headers = metadata_headers() + if token: + headers["Authorization"] = f"Token {token}" + return headers + + +def listenbrainz_artist_popularity(artist_mbid: str) -> dict: + if not artist_mbid: + return {"listen_count": 0} + try: + r = HTTP.get( + f"https://api.listenbrainz.org/1/popularity/top-recordings-for-artist/{artist_mbid}", + params={"count": 25}, + headers=listenbrainz_headers(), + timeout=25, + ) + if not r.ok: + return {"listen_count": 0} + data = r.json() + if isinstance(data, dict): + items = data.get("payload") or [] + elif isinstance(data, list): + items = data + else: + items = [] + return {"listen_count": sum(parse_int(item.get("listen_count")) for item in items if isinstance(item, dict))} + except Exception as e: + log(f"listenbrainz artist popularity failed: {e}") + return {"listen_count": 0} + + +def log10_signal(value: Any) -> float: + import math + + return math.log10(max(parse_int(value), 0) + 1) + + +def artist_signal_score(track_count: int, lastfm: dict, listenbrainz: dict) -> float: + return ( + track_count * 2.0 + + log10_signal(lastfm.get("listeners")) * 2.5 + + log10_signal(lastfm.get("playcount")) * 1.2 + + log10_signal(listenbrainz.get("listen_count")) * 2.0 + ) + + def lastfm_album_search_url(artist: str, album: str) -> str: return f"https://www.last.fm/search/albums?q={quote_plus(f'{artist} {album}')}" @@ -441,6 +559,17 @@ def lastfm_album_info(artist: str, album: str) -> dict[str, str]: def lastfm_album_summary(artist: str, album: str) -> str: return lastfm_album_info(artist, album).get("summary", "") + +def lastfm_album_image(artist: str, album: str) -> str: + data = lastfm_request("album.getinfo", {"artist": artist, "album": album}, timeout=20) + images = (data.get("album") or {}).get("image") or [] + for image in reversed(images): + url = image.get("#text") or "" + if url: + return url + return "" + + def musicbrainz_release_hint(artist: str, album: str) -> dict: if not (CFG.get("metadata") or {}).get("musicbrainz_enabled", True): return {} @@ -499,6 +628,44 @@ def lastfm_tags_for_tracks(tracks: list[dict]) -> Counter: return tags +def ranked_playlist_artists(tracks: list[dict], limit: int = 5) -> list[dict]: + grouped: dict[str, dict] = {} + for track in tracks: + artist = (track.get("artist") or "").strip() + if not artist: + continue + key = re.sub(r"\s+", " ", artist.lower()).strip() + item = grouped.setdefault(key, {"name": artist, "track_count": 0, "mbids": set()}) + item["track_count"] += 1 + if track.get("mbid_artist"): + item["mbids"].add(track.get("mbid_artist")) + + ranked = [] + now = time.time() + for key, item in grouped.items(): + cached = ARTIST_SIGNAL_CACHE.get(key) + if cached and now - cached[0] < ARTIST_SIGNAL_TTL_SECONDS: + signals = cached[1] + else: + lastfm = lastfm_artist_popularity(item["name"]) + mbid = next(iter(item["mbids"]), "") or lastfm.get("mbid") or "" + listenbrainz = listenbrainz_artist_popularity(mbid) + signals = {"lastfm": lastfm, "listenbrainz": listenbrainz} + ARTIST_SIGNAL_CACHE[key] = (now, signals) + + ranked.append({ + "name": item["name"], + "track_count": item["track_count"], + "score": artist_signal_score(item["track_count"], signals["lastfm"], signals["listenbrainz"]), + "lastfm_listeners": signals["lastfm"].get("listeners", 0), + "lastfm_playcount": signals["lastfm"].get("playcount", 0), + "listenbrainz_listens": signals["listenbrainz"].get("listen_count", 0), + }) + + ranked.sort(key=lambda x: x["score"], reverse=True) + return ranked[:limit] + + def album_plain_and_html(album: dict, library_label: str, songs: list[dict]) -> tuple[str, str]: artist = album.get("artist") or "Unknown artist" title = album.get("name") or album.get("album") or "Unknown album" @@ -600,6 +767,118 @@ def poll_navidrome_albums(force: bool = False) -> dict: return {"ok": True, "seen_new": seen, "sent": sent, "first_run": first_run} +def _loose_text_key(value: Any) -> str: + text = str(value or "").casefold() + text = text.replace("’", "'").replace("`", "'") + return re.sub(r"[^a-z0-9а-яё]+", "", text, flags=re.I) + + +def find_navidrome_album(artist: str, album: str) -> dict: + artist_key = _loose_text_key(artist) + album_key = _loose_text_key(album) + if not artist_key or not album_key: + return {} + try: + results = NAV.search3(f"{artist} {album}", song_count=0, album_count=8) + albums = (results.get("searchResult3") or {}).get("album") or [] + matches = [] + for item in albums: + item_artist_key = _loose_text_key(item.get("artist") or "") + item_album_key = _loose_text_key(item.get("name") or item.get("album") or "") + if ( + item.get("id") + and (artist_key in item_artist_key or item_artist_key in artist_key) + and (album_key in item_album_key or item_album_key in album_key) + ): + try: + full = NAV.album(str(item.get("id"))) + song_count = len(full.get("song") or []) + merged = {**item, **full, "_song_count": song_count} + matches.append(merged) + except Exception: + item["_song_count"] = int(item.get("songCount") or 0) + matches.append(item) + if matches: + matches.sort(key=lambda item: int(item.get("_song_count") or item.get("songCount") or 0), reverse=True) + return matches[0] + except Exception as e: + log(f"liked album Navidrome search failed for {artist} - {album}: {e}") + return {} + + +def downloaded_library_album_rows(limit: int = 50) -> list[dict]: + conn = discovery_conn() + if not conn: + return [] + try: + rows = conn.execute( + """ + SELECT id, playlist_id, artist, album, generation_id, moved_to_library, created_at + FROM downloaded_albums + WHERE moved_to_library=1 + ORDER BY id DESC + LIMIT ? + """, + (limit,), + ).fetchall() + return [dict(r) for r in rows] + finally: + conn.close() + + +def liked_album_plain_and_html(row: dict, full_album: dict, songs: list[dict]) -> tuple[str, str]: + album = dict(full_album or {}) + album.setdefault("artist", row.get("artist") or "Unknown artist") + album.setdefault("name", row.get("album") or "Unknown album") + plain, formatted = album_plain_and_html(album, "liked", songs) + plain = plain.replace("Новый релиз в Navidrome / liked", "Лайкнутый альбом добавлен в библиотеку", 1) + formatted = formatted.replace("Новый релиз в Navidrome / liked", "Лайкнутый альбом добавлен в библиотеку", 1) + return plain, formatted + + +def poll_liked_library_albums(force: bool = False) -> dict: + polling_cfg = CFG.get("polling") or {} + baseline = polling_cfg.get("baseline_existing_on_first_run", True) + first_run = not get_cursor("liked_library_albums_baselined") + sent = 0 + seen = 0 + waiting = 0 + for row in downloaded_library_album_rows(int(polling_cfg.get("liked_album_poll_size") or 50)): + key = f"{row.get('id')}:{row.get('created_at')}" + if already_sent("liked_library_album", key): + continue + seen += 1 + if first_run and baseline and not force: + mark_sent("liked_library_album", key, row) + continue + artist = row.get("artist") or "" + album = row.get("album") or "" + nav_album = find_navidrome_album(artist, album) + if not nav_album: + waiting += 1 + log(f"liked album {artist!r} - {album!r} not visible in Navidrome yet — deferring") + continue + full_album = nav_album if nav_album.get("song") else NAV.album(str(nav_album.get("id"))) + songs = full_album.get("song") or [] + plain, formatted = liked_album_plain_and_html(row, full_album or nav_album, songs) + image = NAV.cover((full_album or nav_album).get("coverArt")) + MATRIX.send_card(plain, formatted, image) + mark_sent( + "liked_library_album", + key, + { + "album": row, + "sent_at": now_utc().isoformat(), + "navidrome_id": nav_album.get("id"), + "song_count": len(songs), + }, + ) + sent += 1 + time.sleep(1) + set_cursor("liked_library_albums_baselined", now_utc().isoformat()) + return {"ok": True, "seen_new": seen, "sent": sent, "waiting": waiting, "first_run": first_run} + + def discovery_conn() -> sqlite3.Connection | None: db_path = (CFG.get("discovery") or {}).get("db_path") or "" if not db_path or not Path(db_path).exists(): @@ -645,6 +924,49 @@ def playlist_tracks(playlist_id: int) -> list[dict]: conn.close() +def playlist_download_stats(playlist_id: int) -> dict: + conn = discovery_conn() + if not conn: + return {"total": 0, "done": 0, "final": 0, "running": False, "done_track_ids": set()} + try: + playlist = conn.execute( + "SELECT download_running FROM playlists WHERE id=?", + (playlist_id,), + ).fetchone() + rows = conn.execute( + """ + SELECT status, COUNT(*) AS count + FROM download_status + WHERE playlist_id=? + GROUP BY status + """, + (playlist_id,), + ).fetchall() + done_rows = conn.execute( + """ + SELECT track_id + FROM download_status + WHERE playlist_id=? AND status='done' AND track_id IS NOT NULL + """, + (playlist_id,), + ).fetchall() + counts = {str(row["status"]): int(row["count"]) for row in rows} + final = sum(counts.get(status, 0) for status in ("done", "error", "skipped", "failed")) + total = sum(counts.values()) + return { + "total": total, + "done": counts.get("done", 0), + "error": counts.get("error", 0), + "skipped": counts.get("skipped", 0), + "failed": counts.get("failed", 0), + "final": final, + "running": bool(playlist["download_running"]) if playlist else False, + "done_track_ids": {int(row["track_id"]) for row in done_rows if row["track_id"] is not None}, + } + finally: + conn.close() + + def navidrome_find_playlist(name: str) -> dict | None: """Return Navidrome playlist (basic info from getPlaylists) whose name matches `name` exactly, case-insensitive.""" try: @@ -679,6 +1001,23 @@ def navidrome_playlist_cover(name: str) -> tuple[bytes, str, str] | None: return navidrome_playlist_cover_for(pl) +def discovery_playlist_ready(pl: dict, nav_pl: dict | None, nav_min_tracks: int) -> tuple[bool, dict]: + stats = playlist_download_stats(int(pl["id"])) + if stats["running"]: + return False, {**stats, "reason": "download_running", "navidrome_tracks": int((nav_pl or {}).get("songCount") or 0)} + if stats["total"] <= 0 or stats["final"] < stats["total"]: + return False, {**stats, "reason": "download_not_final", "navidrome_tracks": int((nav_pl or {}).get("songCount") or 0)} + if stats["done"] <= 0: + return False, {**stats, "reason": "nothing_downloaded", "navidrome_tracks": int((nav_pl or {}).get("songCount") or 0)} + + nav_song_count = int((nav_pl or {}).get("songCount") or 0) + required = max(nav_min_tracks, stats["done"]) + if not nav_pl or nav_song_count < required: + return False, {**stats, "reason": "not_in_navidrome", "navidrome_tracks": nav_song_count, "required_navidrome_tracks": required} + + return True, {**stats, "reason": "ready", "navidrome_tracks": nav_song_count, "required_navidrome_tracks": required} + + PROFILE_LABELS = { "deep_cuts": "глубокие находки", "fresh_picks": "свежие рекомендации", @@ -686,7 +1025,7 @@ PROFILE_LABELS = { } -def playlist_plain_and_html(pl: dict, tracks: list[dict]) -> tuple[str, str]: +def playlist_plain_and_html(pl: dict, tracks: list[dict], readiness: dict | None = None) -> tuple[str, str]: name = pl.get("name") or "Discovery" created = pl.get("last_generated") or pl.get("created_at") or "" top_tracks = tracks[:8] @@ -699,8 +1038,14 @@ def playlist_plain_and_html(pl: dict, tracks: list[dict]) -> tuple[str, str]: tag_text = ", ".join(tag for tag, _ in tags.most_common(6)) if not tag_text: tag_text = profile_text - artists = Counter((t.get("artist") or "").strip() for t in tracks if t.get("artist")) - artist_text = ", ".join(a for a, _ in artists.most_common(5)) + ranked_artists = ranked_playlist_artists(tracks) + artist_text = ", ".join(a["name"] for a in ranked_artists) + download_text = "" + if readiness: + download_text = ( + f"Скачано: {readiness.get('done', 0)}, " + f"в Navidrome: {readiness.get('navidrome_tracks', 0)}" + ) summary = ( f"В этот раз уклон: {tag_text}. " f"Из заметного: {', '.join(line[2:] for line in top_lines[:4])}." @@ -710,6 +1055,7 @@ def playlist_plain_and_html(pl: dict, tracks: list[dict]) -> tuple[str, str]: "", name, f"Треков: {len(tracks)}", + download_text, f"Собран: {created}", f"Профиль: {profile_text}", f"Главные артисты: {artist_text or 'пока не выделяются'}", @@ -724,6 +1070,7 @@ def playlist_plain_and_html(pl: dict, tracks: list[dict]) -> tuple[str, str]: f"{html.escape(name)}", "