"""Aoi-chan: Matrix notifications for Navidrome and discovery-playlist."""
from __future__ import annotations
import html
import json
import mimetypes
import os
import re
import sqlite3
import threading
import time
import traceback
import uuid
from collections import Counter
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any
from urllib.parse import quote_plus
import requests
from flask import Flask, Response, jsonify, request
BASE_DIR = Path(__file__).resolve().parent
CONFIG_PATH = Path(os.environ.get("AOI_CONFIG", BASE_DIR / "config.json"))
STATE_DB = Path(os.environ.get("AOI_STATE_DB", BASE_DIR / "aoi.db"))
LOG_PATH = Path(os.environ.get("AOI_LOG", BASE_DIR / "aoi.log"))
STATE_LOCK = threading.Lock()
DASHBOARD_LOCK = threading.Lock()
DASHBOARD_CACHE: dict[str, tuple[float, dict]] = {}
RELEASE_COVER_CACHE: dict[str, str] = {}
DASHBOARD_CACHE_SECONDS = 60
ARTIST_SIGNAL_CACHE: dict[str, tuple[float, dict]] = {}
ARTIST_SIGNAL_TTL_SECONDS = 12 * 60 * 60
def _load_config() -> dict:
example = BASE_DIR / "config.example.json"
path = CONFIG_PATH if CONFIG_PATH.exists() else example
with path.open("r", encoding="utf-8-sig") as f:
return json.load(f)
CFG = _load_config()
def log(message: str) -> None:
line = f"{datetime.now(timezone.utc).isoformat()} {message}"
print(line, flush=True)
try:
with LOG_PATH.open("a", encoding="utf-8") as f:
f.write(line + "\n")
except Exception:
pass
def now_utc() -> datetime:
return datetime.now(timezone.utc)
def parse_dt(value: Any) -> datetime | None:
if not value:
return None
if isinstance(value, datetime):
return value if value.tzinfo else value.replace(tzinfo=timezone.utc)
text = str(value).strip()
if not text:
return None
text = text.replace("Z", "+00:00")
for candidate in (text, text.split(".")[0]):
try:
dt = datetime.fromisoformat(candidate)
return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
except Exception:
pass
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%Y-%m", "%Y"):
try:
return datetime.strptime(text[: len(fmt)], fmt).replace(tzinfo=timezone.utc)
except Exception:
pass
return None
def parse_date(value: Any) -> datetime | None:
dt = parse_dt(value)
if dt:
return dt
text = str(value or "").strip()
for fmt in ("%Y", "%Y-%m", "%Y-%m-%d"):
try:
return datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
except Exception:
pass
return None
def parse_timezone(value: Any) -> timezone:
text = str(value or "").strip()
match = re.fullmatch(r"UTC([+-])(\d{1,2})(?::(\d{2}))?", text)
if not match:
return timezone.utc
sign = 1 if match.group(1) == "+" else -1
hours = int(match.group(2))
minutes = int(match.group(3) or 0)
return timezone(sign * timedelta(hours=hours, minutes=minutes))
def human_delta(target: datetime | None, now: datetime | None = None) -> str:
if not target:
return ""
now = now or now_utc()
seconds = max(0, int((target.astimezone(timezone.utc) - now.astimezone(timezone.utc)).total_seconds()))
days, rem = divmod(seconds, 86400)
hours, rem = divmod(rem, 3600)
minutes, _ = divmod(rem, 60)
if days:
return f"{days}d {hours}h"
if hours:
return f"{hours}h {minutes}m"
return f"{minutes}m"
def relative_datetime_label(value: Any, tz_value: Any = None, now: datetime | None = None) -> str:
dt = parse_dt(value)
if not dt:
return ""
tz = parse_timezone(tz_value or "UTC+03:00")
local_dt = dt.astimezone(tz)
local_now = (now or now_utc()).astimezone(tz)
day_delta = (local_dt.date() - local_now.date()).days
time_text = local_dt.strftime("%H:%M")
if day_delta == -1:
return f"yesterday {time_text}"
if day_delta == 0:
return f"today {time_text}"
if day_delta == 1:
return f"tomorrow {time_text}"
return local_dt.strftime("%Y-%m-%d %H:%M")
def strip_html(text: str) -> str:
text = re.sub(r"
", "\n", text or "", flags=re.I)
text = re.sub(r"<[^>]+>", "", text)
return html.unescape(text).strip()
def compact(text: str, limit: int = 700) -> str:
text = re.sub(r"\s+", " ", strip_html(text)).strip()
if len(text) <= limit:
return text
return text[: limit - 1].rstrip() + "…"
def clean_lastfm_text(text: str) -> str:
text = re.sub(r"Read more on Last\.fm.*$", "", text or "", flags=re.I | re.S)
text = re.sub(r"User-contributed text.*$", "", text, flags=re.I | re.S)
return re.sub(r"\s+", " ", strip_html(text)).strip()
def parse_int(value: Any) -> int:
try:
return int(str(value or "0").replace(",", "").strip() or 0)
except Exception:
return 0
def init_state() -> None:
with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn:
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=30000")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS sent_events (
kind TEXT NOT NULL,
event_key TEXT NOT NULL,
payload_json TEXT,
created_at TEXT NOT NULL,
PRIMARY KEY(kind, event_key)
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS cursors (
key TEXT PRIMARY KEY,
value TEXT,
updated_at TEXT NOT NULL
)
"""
)
conn.commit()
def already_sent(kind: str, key: str) -> bool:
with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn:
conn.execute("PRAGMA busy_timeout=30000")
row = conn.execute(
"SELECT 1 FROM sent_events WHERE kind=? AND event_key=?",
(kind, key),
).fetchone()
return bool(row)
def mark_sent(kind: str, key: str, payload: dict | None = None) -> None:
with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn:
conn.execute("PRAGMA busy_timeout=30000")
conn.execute(
"""
INSERT OR IGNORE INTO sent_events(kind,event_key,payload_json,created_at)
VALUES(?,?,?,?)
""",
(kind, key, json.dumps(payload or {}, ensure_ascii=False), now_utc().isoformat()),
)
conn.commit()
def get_cursor(key: str) -> str:
with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn:
conn.execute("PRAGMA busy_timeout=30000")
row = conn.execute("SELECT value FROM cursors WHERE key=?", (key,)).fetchone()
return row[0] if row else ""
def set_cursor(key: str, value: str) -> None:
with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn:
conn.execute("PRAGMA busy_timeout=30000")
conn.execute(
"""
INSERT INTO cursors(key,value,updated_at) VALUES(?,?,?)
ON CONFLICT(key) DO UPDATE SET value=excluded.value, updated_at=excluded.updated_at
""",
(key, value, now_utc().isoformat()),
)
conn.commit()
def request_session() -> requests.Session:
sess = requests.Session()
proxy = (CFG.get("metadata") or {}).get("proxy") or ""
if proxy:
sess.proxies.update({"http": proxy, "https": proxy})
return sess
HTTP = request_session()
LASTFM_API_KEY_CACHE: str | None = None
class MatrixClient:
def __init__(self, cfg: dict):
self.homeserver = cfg["homeserver"].rstrip("/")
self.room_id = cfg["room_id"]
self.token = cfg["access_token"]
self.user_id = cfg.get("user_id") or ""
@property
def headers(self) -> dict:
return {"Authorization": f"Bearer {self.token}"}
def set_profile(self) -> None:
bot = CFG.get("bot") or {}
name = bot.get("name")
if name and self.user_id:
url = f"{self.homeserver}/_matrix/client/v3/profile/{self.user_id}/displayname"
r = requests.put(url, headers=self.headers, json={"displayname": name}, timeout=20)
if not r.ok:
log(f"matrix display name update failed: {r.status_code} {r.text[:200]}")
avatar_path = bot.get("avatar_path") or ""
if avatar_path:
path = Path(avatar_path)
if path.exists() and self.user_id:
mxc = self.upload_file(path.read_bytes(), path.name, mimetypes.guess_type(path.name)[0] or "image/jpeg")
if mxc:
url = f"{self.homeserver}/_matrix/client/v3/profile/{self.user_id}/avatar_url"
r = requests.put(url, headers=self.headers, json={"avatar_url": mxc}, timeout=20)
if not r.ok:
log(f"matrix avatar update failed: {r.status_code} {r.text[:200]}")
def upload_file(self, data: bytes, filename: str, content_type: str) -> str:
r = requests.post(
f"{self.homeserver}/_matrix/media/v3/upload",
headers={**self.headers, "Content-Type": content_type},
params={"filename": filename},
data=data,
timeout=45,
)
r.raise_for_status()
return r.json()["content_uri"]
def send_text(self, text: str, formatted_html: str | None = None) -> dict:
body = {"msgtype": "m.text", "body": text}
if formatted_html:
body["format"] = "org.matrix.custom.html"
body["formatted_body"] = formatted_html
txn = uuid.uuid4().hex
url = f"{self.homeserver}/_matrix/client/v3/rooms/{self.room_id}/send/m.room.message/{txn}"
r = requests.put(url, headers=self.headers, json=body, timeout=30)
r.raise_for_status()
return r.json()
def send_image_bytes(self, data: bytes, filename: str, content_type: str = "image/jpeg") -> dict:
mxc = self.upload_file(data, filename, content_type)
body = {
"msgtype": "m.image",
"body": filename,
"url": mxc,
"info": {"mimetype": content_type, "size": len(data)},
}
txn = uuid.uuid4().hex
url = f"{self.homeserver}/_matrix/client/v3/rooms/{self.room_id}/send/m.room.message/{txn}"
r = requests.put(url, headers=self.headers, json=body, timeout=30)
r.raise_for_status()
return r.json()
def send_card(self, text: str, formatted_html: str, image: tuple[bytes, str, str] | None = None) -> None:
if image:
try:
self.send_image_bytes(*image)
except Exception as e:
log(f"matrix image send failed: {e}")
self.send_text(text, formatted_html)
class NavidromeClient:
def __init__(self, cfg: dict):
self.base_url = cfg["url"].rstrip("/")
self.username = cfg["username"]
self.password = cfg["password"]
def _params(self) -> dict:
return {
"u": self.username,
"p": self.password,
"v": "1.16.1",
"c": "AoiNotifier",
"f": "json",
}
def get(self, endpoint: str, **params) -> dict:
r = requests.get(
f"{self.base_url}/rest/{endpoint}",
params={**self._params(), **params},
timeout=35,
)
r.raise_for_status()
data = r.json().get("subsonic-response", {})
if data.get("status") != "ok":
raise RuntimeError(f"Navidrome {endpoint} failed: {data}")
return data
def ping(self) -> bool:
try:
return self.get("ping").get("status") == "ok"
except Exception:
return False
def newest_albums(self, music_folder_id: int, size: int) -> list[dict]:
data = self.get("getAlbumList2", type="newest", size=size, musicFolderId=music_folder_id)
return data.get("albumList2", {}).get("album", []) or []
def albums_by_year(self, size: int, music_folder_id: int | None = None) -> list[dict]:
year = datetime.now(timezone.utc).year + 1
params = {"type": "byYear", "fromYear": year, "toYear": 1900, "size": size}
if music_folder_id:
params["musicFolderId"] = music_folder_id
data = self.get("getAlbumList2", **params)
return data.get("albumList2", {}).get("album", []) or []
def album(self, album_id: str) -> dict:
return self.get("getAlbum", id=album_id).get("album", {}) or {}
def playlists(self) -> list[dict]:
data = self.get("getPlaylists")
return data.get("playlists", {}).get("playlist", []) or []
def playlist(self, playlist_id: str) -> dict:
return self.get("getPlaylist", id=playlist_id).get("playlist", {}) or {}
def search3(self, query: str, song_count: int = 5, album_count: int = 0, artist_count: int = 0) -> dict:
return self.get(
"search3",
query=query,
songCount=song_count,
albumCount=album_count,
artistCount=artist_count,
)
def cover(self, cover_art: str | None) -> tuple[bytes, str, str] | None:
if not cover_art:
return None
try:
r = requests.get(
f"{self.base_url}/rest/getCoverArt",
params={**self._params(), "id": cover_art, "size": 900},
timeout=35,
)
r.raise_for_status()
ctype = r.headers.get("Content-Type") or "image/jpeg"
ext = mimetypes.guess_extension(ctype.split(";")[0]) or ".jpg"
return (r.content, f"cover{ext}", ctype)
except Exception as e:
log(f"cover fetch failed: {e}")
return None
MATRIX = MatrixClient(CFG["matrix"])
NAV = NavidromeClient(CFG["navidrome"])
def metadata_headers() -> dict:
ua = (CFG.get("metadata") or {}).get("user_agent") or "AoiNotifier/1.0"
return {"User-Agent": ua, "Accept": "application/json"}
def discovery_setting(key: str) -> str:
db_path = (CFG.get("discovery") or {}).get("db_path") or ""
if not db_path or not Path(db_path).exists():
return ""
try:
with sqlite3.connect(db_path) as conn:
row = conn.execute("SELECT value FROM user_settings WHERE key=?", (key,)).fetchone()
return str(row[0] or "") if row else ""
except Exception as e:
log(f"discovery setting lookup failed for {key}: {e}")
return ""
def lastfm_api_key() -> str:
global LASTFM_API_KEY_CACHE
if LASTFM_API_KEY_CACHE is not None:
return LASTFM_API_KEY_CACHE
LASTFM_API_KEY_CACHE = (CFG.get("metadata") or {}).get("lastfm_api_key") or discovery_setting("lastfm_api_key")
return LASTFM_API_KEY_CACHE
def lastfm_request(method: str, params: dict, timeout: int = 25) -> dict:
api_key = lastfm_api_key()
if not api_key:
return {}
req_params = {
"method": method,
"api_key": api_key,
"format": "json",
"autocorrect": 1,
**params,
}
try:
r = HTTP.get("https://ws.audioscrobbler.com/2.0/", params=req_params, timeout=timeout)
if not r.ok:
return {}
return r.json()
except Exception as e:
log(f"lastfm {method} failed: {e}")
return {}
def lastfm_album_tags(artist: str, album: str, limit: int = 6) -> list[str]:
data = lastfm_request("album.getTopTags", {"artist": artist, "album": album}, timeout=20)
tags = []
for tag in ((data.get("toptags") or {}).get("tag") or [])[:limit]:
name = (tag.get("name") or "").strip()
if name and len(name) <= 32 and not name.isdigit():
tags.append(name)
return tags
def lastfm_artist_summary(artist: str) -> str:
for params in ({"artist": artist, "lang": "ru"}, {"artist": artist}):
data = lastfm_request("artist.getinfo", params, timeout=20)
bio = ((data.get("artist") or {}).get("bio") or {})
summary = clean_lastfm_text(bio.get("summary") or bio.get("content") or "")
if summary:
return summary
return ""
def lastfm_artist_popularity(artist: str) -> dict:
data = lastfm_request("artist.getinfo", {"artist": artist}, timeout=18)
artist_data = data.get("artist") or {}
stats = artist_data.get("stats") or {}
listeners = parse_int(stats.get("listeners"))
playcount = parse_int(stats.get("playcount"))
mbid = artist_data.get("mbid") or ""
if listeners or playcount or mbid:
return {"listeners": listeners, "playcount": playcount, "mbid": mbid}
data = lastfm_request("artist.search", {"artist": artist, "limit": 1}, timeout=18)
matches = ((data.get("results") or {}).get("artistmatches") or {}).get("artist") or []
if isinstance(matches, dict):
matches = [matches]
match = matches[0] if matches else {}
return {
"listeners": parse_int(match.get("listeners")),
"playcount": parse_int(match.get("playcount")),
"mbid": match.get("mbid") or "",
}
def listenbrainz_headers() -> dict:
token = (CFG.get("metadata") or {}).get("listenbrainz_token") or discovery_setting("lb_token")
headers = metadata_headers()
if token:
headers["Authorization"] = f"Token {token}"
return headers
def listenbrainz_artist_popularity(artist_mbid: str) -> dict:
if not artist_mbid:
return {"listen_count": 0}
try:
r = HTTP.get(
f"https://api.listenbrainz.org/1/popularity/top-recordings-for-artist/{artist_mbid}",
params={"count": 25},
headers=listenbrainz_headers(),
timeout=25,
)
if not r.ok:
return {"listen_count": 0}
data = r.json()
if isinstance(data, dict):
items = data.get("payload") or []
elif isinstance(data, list):
items = data
else:
items = []
return {"listen_count": sum(parse_int(item.get("listen_count")) for item in items if isinstance(item, dict))}
except Exception as e:
log(f"listenbrainz artist popularity failed: {e}")
return {"listen_count": 0}
def log10_signal(value: Any) -> float:
import math
return math.log10(max(parse_int(value), 0) + 1)
def artist_signal_score(track_count: int, lastfm: dict, listenbrainz: dict) -> float:
return (
track_count * 2.0
+ log10_signal(lastfm.get("listeners")) * 2.5
+ log10_signal(lastfm.get("playcount")) * 1.2
+ log10_signal(listenbrainz.get("listen_count")) * 2.0
)
def lastfm_album_search_url(artist: str, album: str) -> str:
return f"https://www.last.fm/search/albums?q={quote_plus(f'{artist} {album}')}"
def bandcamp_album_search_url(artist: str, album: str) -> str:
return f"https://bandcamp.com/search?q={quote_plus(f'{artist} {album}')}&item_type=a"
def album_source_links(artist: str, album: str, lastfm_url: str = "") -> list[dict[str, str]]:
return [
{"label": "Last.fm", "url": lastfm_url or lastfm_album_search_url(artist, album)},
{"label": "Bandcamp", "url": bandcamp_album_search_url(artist, album)},
]
def lastfm_album_info(artist: str, album: str) -> dict[str, str]:
for params in ({"artist": artist, "album": album, "lang": "ru"}, {"artist": artist, "album": album}):
data = lastfm_request("album.getinfo", params)
album_data = data.get("album") or {}
wiki = album_data.get("wiki") or {}
summary = clean_lastfm_text(wiki.get("summary") or wiki.get("content") or "")
if summary:
return {"summary": summary, "url": album_data.get("url") or ""}
tags = lastfm_album_tags(artist, album)
artist_summary = lastfm_artist_summary(artist)
parts = []
if tags:
parts.append("Last.fm описывает релиз тегами: " + ", ".join(tags) + ".")
if artist_summary:
parts.append(artist_summary)
if parts:
return {"summary": " ".join(parts), "url": ""}
return {"summary": "", "url": ""}
def lastfm_album_summary(artist: str, album: str) -> str:
return lastfm_album_info(artist, album).get("summary", "")
def lastfm_album_image(artist: str, album: str) -> str:
data = lastfm_request("album.getinfo", {"artist": artist, "album": album}, timeout=20)
images = (data.get("album") or {}).get("image") or []
for image in reversed(images):
url = image.get("#text") or ""
if url:
return url
return ""
def musicbrainz_release_hint(artist: str, album: str) -> dict:
if not (CFG.get("metadata") or {}).get("musicbrainz_enabled", True):
return {}
try:
query = f'artist:"{artist}" AND release:"{album}"'
r = HTTP.get(
"https://musicbrainz.org/ws/2/release/",
params={"query": query, "fmt": "json", "limit": 3},
headers=metadata_headers(),
timeout=25,
)
if not r.ok:
return {}
releases = r.json().get("releases") or []
if not releases:
return {}
rel = releases[0]
return {
"date": rel.get("date") or "",
"country": rel.get("country") or "",
"status": rel.get("status") or "",
"score": rel.get("score") or 0,
}
except Exception as e:
log(f"musicbrainz lookup failed: {e}")
return {}
def lastfm_tags_for_tracks(tracks: list[dict]) -> Counter:
api_key = lastfm_api_key()
tags = Counter()
if not api_key:
return tags
for tr in tracks[:8]:
try:
r = HTTP.get(
"https://ws.audioscrobbler.com/2.0/",
params={
"method": "track.getTopTags",
"artist": tr.get("artist") or "",
"track": tr.get("track") or tr.get("title") or "",
"api_key": api_key,
"format": "json",
"autocorrect": 1,
},
timeout=20,
)
if not r.ok:
continue
for tag in ((r.json().get("toptags") or {}).get("tag") or [])[:5]:
name = (tag.get("name") or "").lower().strip()
if name and len(name) <= 32:
tags[name] += 1
except Exception:
continue
return tags
def ranked_playlist_artists(tracks: list[dict], limit: int = 5) -> list[dict]:
grouped: dict[str, dict] = {}
for track in tracks:
artist = (track.get("artist") or "").strip()
if not artist:
continue
key = re.sub(r"\s+", " ", artist.lower()).strip()
item = grouped.setdefault(key, {"name": artist, "track_count": 0, "mbids": set()})
item["track_count"] += 1
if track.get("mbid_artist"):
item["mbids"].add(track.get("mbid_artist"))
ranked = []
now = time.time()
for key, item in grouped.items():
cached = ARTIST_SIGNAL_CACHE.get(key)
if cached and now - cached[0] < ARTIST_SIGNAL_TTL_SECONDS:
signals = cached[1]
else:
lastfm = lastfm_artist_popularity(item["name"])
mbid = next(iter(item["mbids"]), "") or lastfm.get("mbid") or ""
listenbrainz = listenbrainz_artist_popularity(mbid)
signals = {"lastfm": lastfm, "listenbrainz": listenbrainz}
ARTIST_SIGNAL_CACHE[key] = (now, signals)
ranked.append({
"name": item["name"],
"track_count": item["track_count"],
"score": artist_signal_score(item["track_count"], signals["lastfm"], signals["listenbrainz"]),
"lastfm_listeners": signals["lastfm"].get("listeners", 0),
"lastfm_playcount": signals["lastfm"].get("playcount", 0),
"listenbrainz_listens": signals["listenbrainz"].get("listen_count", 0),
})
ranked.sort(key=lambda x: x["score"], reverse=True)
return ranked[:limit]
def album_plain_and_html(album: dict, library_label: str, songs: list[dict]) -> tuple[str, str]:
artist = album.get("artist") or "Unknown artist"
title = album.get("name") or album.get("album") or "Unknown album"
year = album.get("year") or ""
duration = int(album.get("duration") or sum(int(s.get("duration") or 0) for s in songs) or 0)
minutes = duration // 60 if duration else 0
lastfm_info = lastfm_album_info(artist, title)
summary = lastfm_info.get("summary") or ""
links = album_source_links(artist, title, lastfm_info.get("url") or "")
hint = musicbrainz_release_hint(artist, title)
release_date = album.get("created") or album.get("starred") or hint.get("date") or ""
track_lines = []
for idx, song in enumerate(songs[:30], 1):
name = song.get("title") or "Untitled"
dur = int(song.get("duration") or 0)
suffix = f" ({dur // 60}:{dur % 60:02d})" if dur else ""
track_lines.append(f"{idx:02d}. {name}{suffix}")
if len(songs) > 30:
track_lines.append(f"... и еще {len(songs) - 30}")
header = f"🎧 Новый релиз в Navidrome / {library_label}"
facts = [
f"{artist} — {title}" + (f" ({year})" if year else ""),
f"Треков: {len(songs)}" + (f", длительность: {minutes} мин." if minutes else ""),
]
if release_date:
facts.append(f"Дата/добавление: {release_date}")
if hint.get("country") or hint.get("status"):
facts.append("MusicBrainz: " + ", ".join(x for x in [hint.get("country"), hint.get("status")] if x))
if summary:
description_fact = f"Описание: {summary}"
else:
description_fact = "Описание: пока не нашла нормальное описание, оставляю чистый треклист."
facts.append(description_fact)
facts.append("Ссылки: " + ", ".join(f"{link['label']}: {link['url']}" for link in links))
plain = "\n".join([header, "", *facts, "", "━━ ТРЕКЛИСТ ━━", *track_lines])
html_parts = [
f"
{html.escape(header)}
",
f"{html.escape(artist)} — {html.escape(title)}" + (f" ({html.escape(str(year))})" if year else ""),
"",
f"- Треков: {len(songs)}" + (f", длительность: {minutes} мин." if minutes else "") + "
",
]
if release_date:
html_parts.append(f"- Дата/добавление: {html.escape(str(release_date))}
")
if hint.get("country") or hint.get("status"):
html_parts.append(f"- MusicBrainz: {html.escape(', '.join(x for x in [hint.get('country'), hint.get('status')] if x))}
")
html_parts.append("
")
html_parts.append(f"{html.escape(description_fact)}
")
html_parts.append(
"Ссылки: "
+ ", ".join(
f'{html.escape(link["label"])}'
for link in links
)
+ "
"
)
html_parts.append("Треклист
")
for line in track_lines:
html_parts.append(f"- {html.escape(line[4:] if re.match(r'^\\d\\d\\. ', line) else line)}
")
html_parts.append("
")
return plain, "\n".join(html_parts)
def poll_navidrome_albums(force: bool = False) -> dict:
cfg = CFG["navidrome"]
libraries = [
("main", int(cfg.get("main_library_id") or 0), "main"),
("anime", int(cfg.get("anime_library_id") or 0), "anime"),
]
size = int(cfg.get("album_poll_size") or 40)
baseline = (CFG.get("polling") or {}).get("baseline_existing_on_first_run", True)
first_run = not get_cursor("navidrome_albums_baselined")
sent = 0
seen = 0
for kind, folder_id, label in libraries:
if not folder_id:
continue
for album in NAV.newest_albums(folder_id, size):
album_id = str(album.get("id") or "")
if not album_id:
continue
key = f"{kind}:{album_id}"
if already_sent("navidrome_album", key):
continue
seen += 1
if first_run and baseline and not force:
mark_sent("navidrome_album", key, album)
continue
full_album = NAV.album(album_id)
songs = full_album.get("song") or []
duplicate_source = matching_downloaded_library_album(full_album or album)
if duplicate_source and not force:
mark_sent(
"navidrome_album",
key,
{
"album": album,
"sent_at": now_utc().isoformat(),
"skipped_duplicate_kind": "liked_library_album",
"downloaded_album": duplicate_source,
},
)
continue
plain, formatted = album_plain_and_html(full_album or album, label, songs)
image = NAV.cover((full_album or album).get("coverArt"))
MATRIX.send_card(plain, formatted, image)
mark_sent("navidrome_album", key, {"album": album, "sent_at": now_utc().isoformat()})
sent += 1
time.sleep(1)
set_cursor("navidrome_albums_baselined", now_utc().isoformat())
return {"ok": True, "seen_new": seen, "sent": sent, "first_run": first_run}
def _loose_text_key(value: Any) -> str:
text = str(value or "").casefold()
text = text.replace("’", "'").replace("`", "'")
return re.sub(r"[^a-z0-9а-яё]+", "", text, flags=re.I)
def find_navidrome_album(artist: str, album: str) -> dict:
artist_key = _loose_text_key(artist)
album_key = _loose_text_key(album)
if not artist_key or not album_key:
return {}
try:
results = NAV.search3(f"{artist} {album}", song_count=0, album_count=8)
albums = (results.get("searchResult3") or {}).get("album") or []
matches = []
for item in albums:
item_artist_key = _loose_text_key(item.get("artist") or "")
item_album_key = _loose_text_key(item.get("name") or item.get("album") or "")
if (
item.get("id")
and (artist_key in item_artist_key or item_artist_key in artist_key)
and (album_key in item_album_key or item_album_key in album_key)
):
try:
full = NAV.album(str(item.get("id")))
song_count = len(full.get("song") or [])
merged = {**item, **full, "_song_count": song_count}
matches.append(merged)
except Exception:
item["_song_count"] = int(item.get("songCount") or 0)
matches.append(item)
if matches:
matches.sort(key=lambda item: int(item.get("_song_count") or item.get("songCount") or 0), reverse=True)
return matches[0]
except Exception as e:
log(f"liked album Navidrome search failed for {artist} - {album}: {e}")
return {}
def downloaded_library_album_rows(limit: int = 50) -> list[dict]:
conn = discovery_conn()
if not conn:
return []
try:
rows = conn.execute(
"""
SELECT id, playlist_id, artist, album, generation_id, moved_to_library, created_at
FROM downloaded_albums
WHERE moved_to_library=1
ORDER BY id DESC
LIMIT ?
""",
(limit,),
).fetchall()
return [dict(r) for r in rows]
finally:
conn.close()
def matching_downloaded_library_album(album: dict, limit: int = 80) -> dict:
artist_key = _loose_text_key(album.get("artist") or "")
album_key = _loose_text_key(album.get("name") or album.get("album") or "")
if not artist_key or not album_key:
return {}
for row in downloaded_library_album_rows(limit):
row_artist_key = _loose_text_key(row.get("artist") or "")
row_album_key = _loose_text_key(row.get("album") or "")
if artist_key == row_artist_key and album_key == row_album_key:
return row
return {}
def suppress_navidrome_album_notification(album_id: Any, source_row: dict) -> None:
album_id = str(album_id or "").strip()
if not album_id:
return
payload = {
"album": source_row,
"sent_at": now_utc().isoformat(),
"skipped_duplicate_kind": "liked_library_album",
}
for kind in ("main", "anime"):
key = f"{kind}:{album_id}"
if not already_sent("navidrome_album", key):
mark_sent("navidrome_album", key, payload)
def liked_album_plain_and_html(row: dict, full_album: dict, songs: list[dict]) -> tuple[str, str]:
album = dict(full_album or {})
album.setdefault("artist", row.get("artist") or "Unknown artist")
album.setdefault("name", row.get("album") or "Unknown album")
plain, formatted = album_plain_and_html(album, "liked", songs)
plain = plain.replace("Новый релиз в Navidrome / liked", "Лайкнутый альбом добавлен в библиотеку", 1)
formatted = formatted.replace("Новый релиз в Navidrome / liked", "Лайкнутый альбом добавлен в библиотеку", 1)
return plain, formatted
def poll_liked_library_albums(force: bool = False) -> dict:
polling_cfg = CFG.get("polling") or {}
baseline = polling_cfg.get("baseline_existing_on_first_run", True)
first_run = not get_cursor("liked_library_albums_baselined")
sent = 0
seen = 0
waiting = 0
for row in downloaded_library_album_rows(int(polling_cfg.get("liked_album_poll_size") or 50)):
key = f"{row.get('id')}:{row.get('created_at')}"
if already_sent("liked_library_album", key):
continue
seen += 1
if first_run and baseline and not force:
mark_sent("liked_library_album", key, row)
continue
artist = row.get("artist") or ""
album = row.get("album") or ""
nav_album = find_navidrome_album(artist, album)
if not nav_album:
waiting += 1
log(f"liked album {artist!r} - {album!r} not visible in Navidrome yet — deferring")
continue
full_album = nav_album if nav_album.get("song") else NAV.album(str(nav_album.get("id")))
songs = full_album.get("song") or []
plain, formatted = liked_album_plain_and_html(row, full_album or nav_album, songs)
image = NAV.cover((full_album or nav_album).get("coverArt"))
MATRIX.send_card(plain, formatted, image)
suppress_navidrome_album_notification(nav_album.get("id"), row)
mark_sent(
"liked_library_album",
key,
{
"album": row,
"sent_at": now_utc().isoformat(),
"navidrome_id": nav_album.get("id"),
"song_count": len(songs),
},
)
sent += 1
time.sleep(1)
set_cursor("liked_library_albums_baselined", now_utc().isoformat())
return {"ok": True, "seen_new": seen, "sent": sent, "waiting": waiting, "first_run": first_run}
def discovery_conn() -> sqlite3.Connection | None:
db_path = (CFG.get("discovery") or {}).get("db_path") or ""
if not db_path or not Path(db_path).exists():
log(f"discovery db not found: {db_path}")
return None
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
return conn
def playlist_rows(limit: int = 20) -> list[sqlite3.Row]:
conn = discovery_conn()
if not conn:
return []
try:
return conn.execute(
"""
SELECT * FROM playlists
ORDER BY COALESCE(last_generated, created_at) DESC
LIMIT ?
""",
(limit,),
).fetchall()
finally:
conn.close()
def playlist_tracks(playlist_id: int) -> list[dict]:
conn = discovery_conn()
if not conn:
return []
try:
rows = conn.execute(
"""
SELECT * FROM playlist_tracks
WHERE playlist_id=?
ORDER BY score DESC, added_at DESC
""",
(playlist_id,),
).fetchall()
return [dict(r) for r in rows]
finally:
conn.close()
def playlist_download_stats(playlist_id: int) -> dict:
conn = discovery_conn()
if not conn:
return {"total": 0, "done": 0, "final": 0, "running": False, "done_track_ids": set()}
try:
playlist = conn.execute(
"SELECT download_running FROM playlists WHERE id=?",
(playlist_id,),
).fetchone()
rows = conn.execute(
"""
SELECT status, COUNT(*) AS count
FROM download_status
WHERE playlist_id=?
GROUP BY status
""",
(playlist_id,),
).fetchall()
done_rows = conn.execute(
"""
SELECT track_id
FROM download_status
WHERE playlist_id=? AND status='done' AND track_id IS NOT NULL
""",
(playlist_id,),
).fetchall()
counts = {str(row["status"]): int(row["count"]) for row in rows}
final = sum(counts.get(status, 0) for status in ("done", "error", "skipped", "failed"))
total = sum(counts.values())
return {
"total": total,
"done": counts.get("done", 0),
"error": counts.get("error", 0),
"skipped": counts.get("skipped", 0),
"failed": counts.get("failed", 0),
"final": final,
"running": bool(playlist["download_running"]) if playlist else False,
"done_track_ids": {int(row["track_id"]) for row in done_rows if row["track_id"] is not None},
}
finally:
conn.close()
def navidrome_find_playlist(name: str) -> dict | None:
"""Return Navidrome playlist (basic info from getPlaylists) whose name matches `name` exactly, case-insensitive."""
try:
needle = (name or "").strip().lower()
if not needle:
return None
for pl in NAV.playlists():
if (pl.get("name") or "").strip().lower() == needle:
return pl
except Exception as e:
log(f"navidrome lookup failed for {name!r}: {e}")
return None
def navidrome_playlist_cover_for(nav_pl: dict) -> tuple[bytes, str, str] | None:
try:
full = NAV.playlist(str(nav_pl.get("id")))
if full.get("coverArt"):
return NAV.cover(full.get("coverArt"))
entries = full.get("entry") or []
if entries:
return NAV.cover(entries[0].get("coverArt"))
except Exception as e:
log(f"navidrome playlist cover failed: {e}")
return None
def navidrome_playlist_cover(name: str) -> tuple[bytes, str, str] | None:
pl = navidrome_find_playlist(name)
if not pl:
return None
return navidrome_playlist_cover_for(pl)
def discovery_playlist_ready(pl: dict, nav_pl: dict | None, nav_min_tracks: int) -> tuple[bool, dict]:
stats = playlist_download_stats(int(pl["id"]))
if stats["running"]:
return False, {**stats, "reason": "download_running", "navidrome_tracks": int((nav_pl or {}).get("songCount") or 0)}
if stats["total"] <= 0 or stats["final"] < stats["total"]:
return False, {**stats, "reason": "download_not_final", "navidrome_tracks": int((nav_pl or {}).get("songCount") or 0)}
if stats["done"] <= 0:
return False, {**stats, "reason": "nothing_downloaded", "navidrome_tracks": int((nav_pl or {}).get("songCount") or 0)}
nav_song_count = int((nav_pl or {}).get("songCount") or 0)
required = max(nav_min_tracks, stats["done"])
if not nav_pl or nav_song_count < required:
return False, {**stats, "reason": "not_in_navidrome", "navidrome_tracks": nav_song_count, "required_navidrome_tracks": required}
return True, {**stats, "reason": "ready", "navidrome_tracks": nav_song_count, "required_navidrome_tracks": required}
PROFILE_LABELS = {
"deep_cuts": "глубокие находки",
"fresh_picks": "свежие рекомендации",
"comfort_zone": "комфортная зона",
}
def playlist_plain_and_html(pl: dict, tracks: list[dict], readiness: dict | None = None) -> tuple[str, str]:
name = pl.get("name") or "Discovery"
created = pl.get("last_generated") or pl.get("created_at") or ""
top_tracks = tracks[:8]
top_lines = [f"• {t.get('artist')} — {t.get('track')}" for t in top_tracks]
profile_counts = Counter((t.get("profile") or "").strip() for t in tracks if t.get("profile"))
profile_text = ", ".join(
f"{PROFILE_LABELS.get(k, k)}: {v}" for k, v in profile_counts.most_common()
) or "смешанный режим"
tags = lastfm_tags_for_tracks(tracks)
tag_text = ", ".join(tag for tag, _ in tags.most_common(6))
if not tag_text:
tag_text = profile_text
ranked_artists = ranked_playlist_artists(tracks)
artist_text = ", ".join(a["name"] for a in ranked_artists)
download_text = ""
if readiness:
download_text = (
f"Скачано: {readiness.get('done', 0)}, "
f"в Navidrome: {readiness.get('navidrome_tracks', 0)}"
)
summary = (
f"В этот раз уклон: {tag_text}. "
f"Из заметного: {', '.join(line[2:] for line in top_lines[:4])}."
)
plain = "\n".join([
"🧭 Новый Discovery-плейлист",
"",
name,
f"Треков: {len(tracks)}",
download_text,
f"Собран: {created}",
f"Профиль: {profile_text}",
f"Главные артисты: {artist_text or 'пока не выделяются'}",
"",
summary,
"",
"━━ ИНТЕРЕСНОЕ ━━",
*top_lines,
])
formatted = "\n".join([
"🧭 Новый Discovery-плейлист
",
f"{html.escape(name)}",
"",
f"- Треков: {len(tracks)}
",
f"- {html.escape(download_text)}
" if download_text else "",
f"- Собран: {html.escape(str(created))}
",
f"- Профиль: {html.escape(profile_text)}
",
f"- Главные артисты: {html.escape(artist_text or 'пока не выделяются')}
",
"
",
f"{html.escape(summary)}
",
"Интересное
",
*[f"- {html.escape(line[2:])}
" for line in top_lines],
"
",
])
return plain, formatted
def poll_discovery_playlists(force: bool = False) -> dict:
polling_cfg = CFG.get("polling") or {}
baseline = polling_cfg.get("baseline_existing_on_first_run", True)
nav_min_tracks = int(polling_cfg.get("discovery_min_tracks_in_navidrome", 1))
stale_after_hours = float(polling_cfg.get("discovery_stale_after_hours", 48))
first_run = not get_cursor("discovery_playlists_baselined")
sent = 0
seen = 0
waiting = 0
for row in playlist_rows():
pl = dict(row)
stamp = pl.get("last_generated") or pl.get("created_at") or ""
if not stamp:
continue
key = f"{pl.get('id')}:{stamp}:{pl.get('iteration_number', 0)}"
if already_sent("discovery_playlist", key):
continue
seen += 1
name = pl.get("name") or ""
nav_pl = navidrome_find_playlist(name)
ready, readiness = discovery_playlist_ready(pl, nav_pl, nav_min_tracks)
nav_song_count = int(readiness.get("navidrome_tracks") or 0)
if not ready:
# Not yet downloaded or visible in Navidrome — defer notification
# until the playlist is actually playable. Do NOT mark_sent so the
# next poll re-checks.
stamp_dt = parse_dt(stamp)
age_hours = ((now_utc() - stamp_dt).total_seconds() / 3600) if stamp_dt else 0.0
if age_hours > stale_after_hours:
log(
f"discovery_playlist {name!r} stale: age={age_hours:.1f}h, "
f"reason={readiness.get('reason')} navidrome_present={bool(nav_pl)}, "
f"nav_songs={nav_song_count} done={readiness.get('done', 0)} — "
f"marking sent without notification (will not retry)"
)
mark_sent("discovery_playlist", key, {
"playlist": pl,
"readiness": {k: v for k, v in readiness.items() if k != "done_track_ids"},
"skipped_stale": True,
})
else:
waiting += 1
log(
f"discovery_playlist {name!r} not ready for notification "
f"(reason={readiness.get('reason')} nav_present={bool(nav_pl)} "
f"nav_songs={nav_song_count} done={readiness.get('done', 0)} "
f"need>={readiness.get('required_navidrome_tracks', nav_min_tracks)} "
f"age={age_hours:.1f}h) — deferring"
)
continue
if first_run and baseline and not force:
mark_sent("discovery_playlist", key, {
"playlist": pl,
"readiness": {k: v for k, v in readiness.items() if k != "done_track_ids"},
"baselined_at": now_utc().isoformat(),
})
continue
tracks = playlist_tracks(int(pl["id"]))
done_track_ids = readiness.get("done_track_ids") or set()
ready_tracks = [track for track in tracks if int(track.get("id") or 0) in done_track_ids] or tracks
plain, formatted = playlist_plain_and_html(pl, ready_tracks, readiness)
image = navidrome_playlist_cover_for(nav_pl)
MATRIX.send_card(plain, formatted, image)
mark_sent(
"discovery_playlist",
key,
{
"playlist": pl,
"sent_at": now_utc().isoformat(),
"navidrome_id": nav_pl.get("id"),
"navidrome_song_count": nav_song_count,
"readiness": {k: v for k, v in readiness.items() if k != "done_track_ids"},
},
)
sent += 1
time.sleep(1)
set_cursor("discovery_playlists_baselined", now_utc().isoformat())
return {
"ok": True,
"seen_new": seen,
"sent": sent,
"waiting": waiting,
"first_run": first_run,
}
def release_rows(limit: int = 120) -> list[dict]:
conn = discovery_conn()
if not conn:
return []
try:
rows = conn.execute(
"""
SELECT * FROM artist_release_watch
ORDER BY COALESCE(downloaded_at, first_seen_at) DESC
LIMIT ?
""",
(limit,),
).fetchall()
return [dict(r) for r in rows]
finally:
conn.close()
def release_plain_and_html(row: dict, mode: str) -> tuple[str, str]:
artist = row.get("artist") or "Unknown artist"
album = row.get("album") or "Unknown release"
date = row.get("release_date") or "дата уточняется"
source = row.get("source") or "discovery-playlist"
tracks = int(row.get("desired_track_count") or 0)
status = row.get("status") or ""
if mode == "out":
title = "📀 Релиз вышел"
lead = f"{artist} — {album} уже можно забирать."
else:
title = "📡 Анонс релиза отслеживаемого артиста"
lead = f"{artist} — {album} появился в release watch."
facts = [
lead,
f"Дата релиза: {date}",
f"Статус: {status}",
f"Источник: {source}",
]
if tracks:
facts.append(f"Ожидаемый треклист: {tracks} треков")
if row.get("error_message"):
facts.append(f"Заметка: {compact(row.get('error_message'), 240)}")
plain = "\n".join([title, "", *facts])
formatted = "\n".join([
f"{html.escape(title)}
",
f"{html.escape(artist)} — {html.escape(album)}
",
"",
*[f"- {html.escape(f)}
" for f in facts[1:]],
"
",
])
return plain, formatted
def release_is_out(row: dict) -> bool:
dt = parse_date(row.get("release_date"))
return bool(dt and dt.date() == now_utc().date())
def release_is_announcement(row: dict) -> bool:
dt = parse_date(row.get("release_date"))
return bool(dt and dt.date() > now_utc().date())
def poll_release_watch(force: bool = False) -> dict:
baseline = (CFG.get("polling") or {}).get("baseline_existing_on_first_run", True)
first_run = not get_cursor("release_watch_baselined")
announced = 0
out = 0
skipped = 0
for row in release_rows():
row_id = row.get("id")
first_seen = row.get("first_seen_at") or ""
announce_key = f"{row_id}:{first_seen}"
if release_is_announcement(row) and not already_sent("release_announcement", announce_key):
if first_run and baseline and not force:
mark_sent("release_announcement", announce_key, row)
else:
plain, formatted = release_plain_and_html(row, "announce")
MATRIX.send_card(plain, formatted, None)
mark_sent("release_announcement", announce_key, row)
announced += 1
time.sleep(1)
elif not release_is_announcement(row):
skipped += 1
if release_is_out(row):
out_stamp = row.get("downloaded_at") or row.get("release_date") or row.get("last_checked_at") or ""
out_key = f"{row_id}:{out_stamp}:{row.get('status')}"
if already_sent("release_out", out_key):
continue
if first_run and baseline and not force:
mark_sent("release_out", out_key, row)
else:
plain, formatted = release_plain_and_html(row, "out")
MATRIX.send_card(plain, formatted, None)
mark_sent("release_out", out_key, row)
out += 1
time.sleep(1)
set_cursor("release_watch_baselined", now_utc().isoformat())
return {"ok": True, "announced": announced, "out": out, "skipped_not_current_or_future": skipped, "first_run": first_run}
def loop_forever(name: str, interval: int, fn) -> None:
while True:
try:
result = fn()
log(f"{name}: {result}")
except Exception as e:
log(f"{name} failed: {e}\n{traceback.format_exc()}")
time.sleep(interval)
app = Flask(__name__)
@app.get("/healthz")
def healthz():
return jsonify({"ok": True})
def sent_event_rows(kind: str, limit: int) -> list[dict]:
with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn:
conn.execute("PRAGMA busy_timeout=30000")
rows = conn.execute(
"""
SELECT payload_json, created_at
FROM sent_events
WHERE kind=?
ORDER BY created_at DESC
LIMIT ?
""",
(kind, limit),
).fetchall()
out = []
for payload_json, created_at in rows:
try:
payload = json.loads(payload_json or "{}")
except Exception:
payload = {}
out.append({"payload": payload, "created_at": created_at})
return out
def navidrome_cover_url(cover_art: Any) -> str:
cover_art = str(cover_art or "").strip()
return f"/covers/navidrome/{cover_art}" if cover_art else ""
def dashboard_album_item(album: dict, fallback_created: str = "") -> dict:
return {
"artist": album.get("artist") or album.get("displayArtist") or "Unknown artist",
"title": album.get("name") or album.get("album") or "Unknown album",
"year": album.get("year") or "",
"song_count": album.get("songCount") or album.get("song_count") or "",
"created": album.get("created") or fallback_created,
"cover_url": navidrome_cover_url(album.get("coverArt")),
}
def dashboard_music_folder_ids() -> list[int]:
cfg = CFG.get("navidrome") or {}
ids = []
for key in ("main_library_id", "anime_library_id"):
try:
value = int(cfg.get(key) or 0)
except Exception:
value = 0
if value and value not in ids:
ids.append(value)
return ids
def dashboard_allowed_album_ids(album_ids: list[str]) -> set[str]:
ids = [str(album_id) for album_id in album_ids if album_id]
if not ids:
return set()
library_ids = dashboard_music_folder_ids()
db_path = Path((CFG.get("navidrome") or {}).get("db_path") or "/storage-1/qbit/navidrome/navidrome.db")
if not db_path.exists():
return set(ids)
try:
placeholders = ",".join("?" for _ in ids)
library_placeholders = ",".join("?" for _ in library_ids)
with sqlite3.connect(db_path) as db:
rows = db.execute(
f"SELECT id FROM album WHERE id IN ({placeholders}) AND library_id IN ({library_placeholders})",
[*ids, *library_ids],
).fetchall()
return {row[0] for row in rows}
except Exception as e:
log(f"dashboard album library filter failed: {e}")
return set(ids)
def navidrome_db_path() -> Path:
return Path((CFG.get("navidrome") or {}).get("db_path") or "/storage-1/qbit/navidrome/navidrome.db")
def _album_year(row: sqlite3.Row) -> int | str:
for key in ("min_original_year", "max_year", "min_year"):
try:
value = int(row[key] or 0)
except Exception:
value = 0
if value:
return value
return ""
def _album_release_date_from_row(row: sqlite3.Row) -> str:
for key in ("original_date", "release_date", "date"):
value = str(row[key] or "").strip()
if value:
return value
year = _album_year(row)
return str(year) if year else ""
def _dashboard_album_from_db_row(row: sqlite3.Row) -> dict:
return {
"artist": row["album_artist"] or "Unknown artist",
"title": row["name"] or "Unknown album",
"year": _album_year(row),
"song_count": row["song_count"] or "",
"created": row["imported_at"] or row["created_at"] or "",
"cover_url": navidrome_cover_url(row["id"]),
}
def dashboard_recent_album_rows(limit: int) -> list[sqlite3.Row]:
db_path = navidrome_db_path()
if not db_path.exists():
return []
library_ids = dashboard_music_folder_ids()
placeholders = ",".join("?" for _ in library_ids)
with sqlite3.connect(db_path) as db:
db.row_factory = sqlite3.Row
return db.execute(
f"""
SELECT id, name, album_artist, song_count, library_id, imported_at, created_at,
min_year, max_year, min_original_year, original_date, release_date, date
FROM album
WHERE library_id IN ({placeholders})
ORDER BY datetime(NULLIF(imported_at, '0000-00-00 00:00:00')) DESC,
datetime(created_at) DESC
LIMIT ?
""",
[*library_ids, limit],
).fetchall()
def dashboard_release_album_rows(limit: int) -> list[sqlite3.Row]:
db_path = navidrome_db_path()
if not db_path.exists():
return []
library_ids = dashboard_music_folder_ids()
placeholders = ",".join("?" for _ in library_ids)
with sqlite3.connect(db_path) as db:
db.row_factory = sqlite3.Row
return db.execute(
f"""
SELECT id, name, album_artist, song_count, library_id, imported_at, created_at,
min_year, max_year, min_original_year, original_date, release_date, date
FROM album
WHERE library_id IN ({placeholders})
AND COALESCE(NULLIF(original_date, ''), NULLIF(release_date, ''), NULLIF(date, ''),
CASE
WHEN min_original_year > 0 THEN printf('%04d', min_original_year)
WHEN max_year > 0 THEN printf('%04d', max_year)
WHEN min_year > 0 THEN printf('%04d', min_year)
ELSE ''
END) != ''
ORDER BY COALESCE(NULLIF(original_date, ''), NULLIF(release_date, ''), NULLIF(date, ''),
CASE
WHEN min_original_year > 0 THEN printf('%04d', min_original_year)
WHEN max_year > 0 THEN printf('%04d', max_year)
WHEN min_year > 0 THEN printf('%04d', min_year)
ELSE ''
END) DESC,
datetime(NULLIF(imported_at, '0000-00-00 00:00:00')) DESC
LIMIT ?
""",
[*library_ids, limit],
).fetchall()
def _date_parts(value: dict) -> tuple[int, int, int]:
try:
year = int(value.get("year") or 0)
month = int(value.get("month") or 1)
day = int(value.get("day") or 1)
return year, month, day
except Exception:
return 0, 1, 1
def album_release_date(album: dict) -> str:
for key in ("originalReleaseDate", "releaseDate"):
parts = album.get(key) or {}
if not isinstance(parts, dict):
continue
year, month, day = _date_parts(parts)
if year:
if parts.get("month") and parts.get("day"):
return f"{year:04d}-{month:02d}-{day:02d}"
if parts.get("month"):
return f"{year:04d}-{month:02d}"
return f"{year:04d}"
if album.get("year"):
return str(album.get("year"))
return ""
def album_release_sort_key(album: dict) -> tuple[int, int, int, str]:
for key in ("originalReleaseDate", "releaseDate"):
parts = album.get(key) or {}
if isinstance(parts, dict):
year, month, day = _date_parts(parts)
if year:
return year, month, day, str(album.get("created") or "")
try:
year = int(album.get("year") or 0)
except Exception:
year = 0
return year, 1, 1, str(album.get("created") or "")
def dashboard_albums(limit: int) -> list[dict]:
try:
rows = dashboard_recent_album_rows(limit)
if rows:
return [_dashboard_album_from_db_row(row) for row in rows]
except Exception as e:
log(f"dashboard recent albums db query failed: {e}")
albums_by_id: dict[str, dict] = {}
size = max(limit * 4, 40)
for folder_id in dashboard_music_folder_ids():
try:
for album in NAV.newest_albums(folder_id, size):
album_id = str(album.get("id") or "")
if album_id:
albums_by_id[album_id] = album
except Exception as e:
log(f"dashboard newest albums failed for folder {folder_id}: {e}")
if albums_by_id:
allowed_ids = dashboard_allowed_album_ids(list(albums_by_id.keys()))
albums = [album for album_id, album in albums_by_id.items() if album_id in allowed_ids]
albums = sorted(albums, key=lambda item: item.get("created") or "", reverse=True)
return [dashboard_album_item(album) for album in albums[:limit]]
items = []
for row in sent_event_rows("navidrome_album", limit):
album = row["payload"].get("album") or row["payload"]
items.append(dashboard_album_item(album, row["created_at"]))
return items
def playlist_cover_art(nav_pl: dict) -> str:
cover_art = nav_pl.get("coverArt") or ""
if cover_art:
return cover_art
try:
full = NAV.playlist(str(nav_pl.get("id")))
cover_art = full.get("coverArt") or ""
if cover_art:
return cover_art
entries = full.get("entry") or []
if entries:
return entries[0].get("coverArt") or ""
except Exception as e:
if "playlist not found" not in str(e).lower():
log(f"dashboard playlist cover failed: {e}")
return ""
def playlist_track_cover_art(playlist_id: Any) -> str:
try:
for track in playlist_tracks(int(playlist_id))[:5]:
query = " ".join(str(v or "") for v in (track.get("artist"), track.get("track"))).strip()
if not query:
continue
results = NAV.search3(query, song_count=3)
songs = (results.get("searchResult3") or {}).get("song") or []
for song in songs:
if song.get("coverArt"):
return song.get("coverArt") or ""
except Exception as e:
log(f"dashboard playlist track cover failed: {e}")
return ""
def release_cover_art(artist: str, album: str) -> str:
cache_key = f"{artist}\0{album}".casefold()
if cache_key in RELEASE_COVER_CACHE:
return RELEASE_COVER_CACHE[cache_key]
try:
results = NAV.search3(f"{artist} {album}", song_count=0, album_count=3)
albums = (results.get("searchResult3") or {}).get("album") or []
artist_l = artist.casefold()
album_l = album.casefold()
for item in albums:
item_artist = str(item.get("artist") or "").casefold()
item_album = str(item.get("name") or "").casefold()
if item.get("coverArt") and (artist_l in item_artist or item_artist in artist_l) and (album_l in item_album or item_album in album_l):
RELEASE_COVER_CACHE[cache_key] = item.get("coverArt") or ""
return RELEASE_COVER_CACHE[cache_key]
for item in albums:
if item.get("coverArt"):
RELEASE_COVER_CACHE[cache_key] = item.get("coverArt") or ""
return RELEASE_COVER_CACHE[cache_key]
except Exception as e:
log(f"dashboard release cover failed: {e}")
RELEASE_COVER_CACHE[cache_key] = ""
return ""
def playlist_next_generation(playlist: dict) -> datetime | None:
now = now_utc()
time_text = str(playlist.get("generation_time") or "").strip()
days_raw = playlist.get("generation_days")
days: list[int] = []
if days_raw:
try:
parsed = json.loads(days_raw) if isinstance(days_raw, str) else days_raw
days = [int(day) for day in parsed]
except Exception:
days = []
if time_text and days:
tz = parse_timezone(playlist.get("generation_timezone"))
local_now = now.astimezone(tz)
try:
hour, minute = [int(part) for part in time_text.split(":", 1)]
except Exception:
hour = minute = -1
if hour >= 0 and minute >= 0:
for offset in range(0, 14):
candidate_date = local_now.date() + timedelta(days=offset)
candidate = datetime(candidate_date.year, candidate_date.month, candidate_date.day, hour, minute, tzinfo=tz)
# discovery-playlist stores Python weekday indices: Monday=0, Sunday=6.
if candidate.weekday() not in days:
continue
if candidate > local_now:
return candidate.astimezone(timezone.utc)
interval = playlist.get("generation_interval_minutes")
last_generated = parse_dt(playlist.get("last_generated")) or parse_dt(playlist.get("created_at"))
if interval and last_generated:
try:
target = last_generated + timedelta(minutes=int(interval))
while target <= now:
target += timedelta(minutes=int(interval))
return target
except Exception:
pass
return None
def dashboard_playlists(limit: int) -> list[dict]:
items = []
seen_names = set()
for playlist_row in playlist_rows(limit):
playlist = dict(playlist_row)
name = playlist.get("name") or "Discovery"
name_key = name.casefold()
if name_key in seen_names:
continue
seen_names.add(name_key)
cover_art = ""
nav_pl = navidrome_find_playlist(name)
if nav_pl:
cover_art = playlist_cover_art(nav_pl)
if not cover_art:
cover_art = playlist_track_cover_art(playlist.get("id"))
generated = playlist.get("last_generated") or playlist.get("created_at") or ""
next_generation = playlist_next_generation(playlist)
tz_value = playlist.get("generation_timezone") or "UTC+03:00"
items.append(
{
"name": name,
"track_count": playlist.get("track_count") or playlist.get("songCount") or "",
"generated": generated,
"generated_label": relative_datetime_label(generated, tz_value),
"next_generation": next_generation.isoformat() if next_generation else "",
"next_generation_label": relative_datetime_label(next_generation, tz_value) if next_generation else "",
"next_generation_in": human_delta(next_generation) if next_generation else "",
"cover_url": navidrome_cover_url(cover_art),
}
)
if len(items) >= limit:
return items
for row in sent_event_rows("discovery_playlist", limit * 3):
payload = row["payload"]
playlist = payload.get("playlist") or payload
name = playlist.get("name") or "Discovery"
name_key = name.casefold()
if name_key in seen_names:
continue
seen_names.add(name_key)
navidrome_id = payload.get("navidrome_id") or ""
cover_art = ""
if navidrome_id:
cover_art = playlist_cover_art({"id": navidrome_id})
if not cover_art:
nav_pl = navidrome_find_playlist(name)
if nav_pl:
cover_art = playlist_cover_art(nav_pl)
next_generation = playlist_next_generation(playlist)
tz_value = playlist.get("generation_timezone") or "UTC+03:00"
generated = playlist.get("last_generated") or playlist.get("created_at") or row["created_at"]
items.append(
{
"name": name,
"track_count": playlist.get("track_count") or playlist.get("songCount") or "",
"generated": generated,
"generated_label": relative_datetime_label(generated, tz_value),
"next_generation": next_generation.isoformat() if next_generation else "",
"next_generation_label": relative_datetime_label(next_generation, tz_value) if next_generation else "",
"next_generation_in": human_delta(next_generation) if next_generation else "",
"cover_url": navidrome_cover_url(cover_art),
}
)
if len(items) >= limit:
break
return items
def dashboard_releases(limit: int) -> list[dict]:
try:
rows = dashboard_release_album_rows(limit)
if rows:
return [
{
"artist": row["album_artist"] or "Unknown artist",
"title": row["name"] or "Unknown release",
"release_date": _album_release_date_from_row(row),
"status": "in library",
"source": "navidrome",
"cover_url": navidrome_cover_url(row["id"]),
}
for row in rows
]
except Exception as e:
log(f"dashboard releases db query failed: {e}")
albums_by_id: dict[str, dict] = {}
size = max(limit * 10, 120)
for folder_id in dashboard_music_folder_ids():
try:
for album in NAV.albums_by_year(size, folder_id):
album_id = str(album.get("id") or "")
if album_id:
albums_by_id[album_id] = album
except Exception as e:
log(f"dashboard library releases failed for folder {folder_id}: {e}")
if albums_by_id:
allowed_ids = dashboard_allowed_album_ids(list(albums_by_id.keys()))
albums = [album for album_id, album in albums_by_id.items() if album_id in allowed_ids]
albums = sorted(albums, key=album_release_sort_key, reverse=True)
items = []
for album in albums[:limit]:
items.append(
{
"artist": album.get("artist") or album.get("displayArtist") or "Unknown artist",
"title": album.get("name") or "Unknown release",
"release_date": album_release_date(album),
"status": "in library",
"source": "navidrome",
"cover_url": navidrome_cover_url(album.get("coverArt")),
}
)
return items
items = []
seen = set()
for row in sent_event_rows("release_out", limit * 2):
release = row["payload"]
artist = release.get("artist") or "Unknown artist"
album = release.get("album") or "Unknown release"
key = (artist.lower(), album.lower())
if key in seen:
continue
seen.add(key)
cover_art = release_cover_art(artist, album)
items.append(
{
"artist": artist,
"title": album,
"release_date": release.get("release_date") or "",
"status": release.get("status") or "",
"source": release.get("source") or "",
"cover_url": navidrome_cover_url(cover_art),
}
)
if len(items) >= limit:
break
return items
@app.get("/dashboard")
def dashboard():
album_limit = min(max(int(request.args.get("album_limit", 8)), 1), 24)
playlist_limit = min(max(int(request.args.get("playlist_limit", 8)), 1), 24)
release_limit = min(max(int(request.args.get("release_limit", 10)), 1), 24)
cache_key = f"{album_limit}:{playlist_limit}:{release_limit}"
now_ts = time.monotonic()
with DASHBOARD_LOCK:
cached = DASHBOARD_CACHE.get(cache_key)
if cached and now_ts - cached[0] < DASHBOARD_CACHE_SECONDS:
return jsonify(cached[1])
payload = {
"ok": True,
"albums": dashboard_albums(album_limit),
"playlists": dashboard_playlists(playlist_limit),
"releases": dashboard_releases(release_limit),
}
DASHBOARD_CACHE[cache_key] = (time.monotonic(), payload)
return jsonify(payload)
@app.get("/covers/navidrome/")
def navidrome_cover_proxy(cover_art: str):
image = NAV.cover(cover_art)
if not image:
return Response(status=404)
data, _filename, content_type = image
return Response(
data,
mimetype=content_type,
headers={"Cache-Control": "public, max-age=86400"},
)
@app.post("/run/navidrome-albums")
def run_navidrome_albums():
payload = request.get_json(silent=True) or {}
return jsonify(poll_navidrome_albums(force=bool(payload.get("force"))))
@app.post("/run/liked-albums")
def run_liked_albums():
payload = request.get_json(silent=True) or {}
return jsonify(poll_liked_library_albums(force=bool(payload.get("force"))))
@app.post("/run/discovery-playlists")
def run_discovery_playlists():
payload = request.get_json(silent=True) or {}
return jsonify(poll_discovery_playlists(force=bool(payload.get("force"))))
@app.post("/run/release-watch")
def run_release_watch():
payload = request.get_json(silent=True) or {}
return jsonify(poll_release_watch(force=bool(payload.get("force"))))
def start_background() -> None:
polling = CFG.get("polling") or {}
tasks = [
("navidrome_albums", int(polling.get("navidrome_album_interval_seconds") or 300), poll_navidrome_albums),
("liked_library_albums", int(polling.get("liked_album_interval_seconds") or 300), poll_liked_library_albums),
("discovery_playlists", int(polling.get("discovery_playlist_interval_seconds") or 180), poll_discovery_playlists),
("release_watch", int(polling.get("release_watch_interval_seconds") or 600), poll_release_watch),
]
for name, interval, fn in tasks:
threading.Thread(target=loop_forever, args=(name, interval, fn), daemon=True).start()
if __name__ == "__main__":
init_state()
try:
MATRIX.set_profile()
except Exception as e:
log(f"matrix profile setup failed: {e}")
if not NAV.ping():
log("warning: Navidrome ping failed")
start_background()
server = CFG.get("server") or {}
host = server.get("host") or "0.0.0.0"
port = int(server.get("port") or 18323)
try:
from waitress import serve
serve(app, host=host, port=port)
except Exception:
app.run(host=host, port=port)