Update Aoi dashboard and cover proxy

This commit is contained in:
shu 2026-05-07 15:07:19 +03:00
parent 49503a78c4
commit 7f2572cec3
2 changed files with 394 additions and 44 deletions

435
aoi.py
View file

@ -19,7 +19,7 @@ from typing import Any
from urllib.parse import quote_plus
import requests
from flask import Flask, jsonify, request
from flask import Flask, Response, jsonify, request
BASE_DIR = Path(__file__).resolve().parent
@ -121,6 +121,24 @@ def human_delta(target: datetime | None, now: datetime | None = None) -> str:
return f"{minutes}m"
def relative_datetime_label(value: Any, tz_value: Any = None, now: datetime | None = None) -> str:
dt = parse_dt(value)
if not dt:
return ""
tz = parse_timezone(tz_value or "UTC+03:00")
local_dt = dt.astimezone(tz)
local_now = (now or now_utc()).astimezone(tz)
day_delta = (local_dt.date() - local_now.date()).days
time_text = local_dt.strftime("%H:%M")
if day_delta == -1:
return f"yesterday {time_text}"
if day_delta == 0:
return f"today {time_text}"
if day_delta == 1:
return f"tomorrow {time_text}"
return local_dt.strftime("%Y-%m-%d %H:%M")
def strip_html(text: str) -> str:
text = re.sub(r"<br\s*/?>", "\n", text or "", flags=re.I)
text = re.sub(r"<[^>]+>", "", text)
@ -342,6 +360,14 @@ class NavidromeClient:
data = self.get("getAlbumList2", type="newest", size=size, musicFolderId=music_folder_id)
return data.get("albumList2", {}).get("album", []) or []
def albums_by_year(self, size: int, music_folder_id: int | None = None) -> list[dict]:
year = datetime.now(timezone.utc).year + 1
params = {"type": "byYear", "fromYear": year, "toYear": 1900, "size": size}
if music_folder_id:
params["musicFolderId"] = music_folder_id
data = self.get("getAlbumList2", **params)
return data.get("albumList2", {}).get("album", []) or []
def album(self, album_id: str) -> dict:
return self.get("getAlbum", id=album_id).get("album", {}) or {}
@ -757,6 +783,19 @@ def poll_navidrome_albums(force: bool = False) -> dict:
continue
full_album = NAV.album(album_id)
songs = full_album.get("song") or []
duplicate_source = matching_downloaded_library_album(full_album or album)
if duplicate_source and not force:
mark_sent(
"navidrome_album",
key,
{
"album": album,
"sent_at": now_utc().isoformat(),
"skipped_duplicate_kind": "liked_library_album",
"downloaded_album": duplicate_source,
},
)
continue
plain, formatted = album_plain_and_html(full_album or album, label, songs)
image = NAV.cover((full_album or album).get("coverArt"))
MATRIX.send_card(plain, formatted, image)
@ -826,6 +865,34 @@ def downloaded_library_album_rows(limit: int = 50) -> list[dict]:
conn.close()
def matching_downloaded_library_album(album: dict, limit: int = 80) -> dict:
artist_key = _loose_text_key(album.get("artist") or "")
album_key = _loose_text_key(album.get("name") or album.get("album") or "")
if not artist_key or not album_key:
return {}
for row in downloaded_library_album_rows(limit):
row_artist_key = _loose_text_key(row.get("artist") or "")
row_album_key = _loose_text_key(row.get("album") or "")
if artist_key == row_artist_key and album_key == row_album_key:
return row
return {}
def suppress_navidrome_album_notification(album_id: Any, source_row: dict) -> None:
album_id = str(album_id or "").strip()
if not album_id:
return
payload = {
"album": source_row,
"sent_at": now_utc().isoformat(),
"skipped_duplicate_kind": "liked_library_album",
}
for kind in ("main", "anime"):
key = f"{kind}:{album_id}"
if not already_sent("navidrome_album", key):
mark_sent("navidrome_album", key, payload)
def liked_album_plain_and_html(row: dict, full_album: dict, songs: list[dict]) -> tuple[str, str]:
album = dict(full_album or {})
album.setdefault("artist", row.get("artist") or "Unknown artist")
@ -863,6 +930,7 @@ def poll_liked_library_albums(force: bool = False) -> dict:
plain, formatted = liked_album_plain_and_html(row, full_album or nav_album, songs)
image = NAV.cover((full_album or nav_album).get("coverArt"))
MATRIX.send_card(plain, formatted, image)
suppress_navidrome_album_notification(nav_album.get("id"), row)
mark_sent(
"liked_library_album",
key,
@ -1319,20 +1387,213 @@ def navidrome_cover_url(cover_art: Any) -> str:
return f"/covers/navidrome/{cover_art}" if cover_art else ""
def dashboard_album_item(album: dict, fallback_created: str = "") -> dict:
return {
"artist": album.get("artist") or album.get("displayArtist") or "Unknown artist",
"title": album.get("name") or album.get("album") or "Unknown album",
"year": album.get("year") or "",
"song_count": album.get("songCount") or album.get("song_count") or "",
"created": album.get("created") or fallback_created,
"cover_url": navidrome_cover_url(album.get("coverArt")),
}
def dashboard_music_folder_ids() -> list[int]:
cfg = CFG.get("navidrome") or {}
ids = []
for key in ("main_library_id", "anime_library_id"):
try:
value = int(cfg.get(key) or 0)
except Exception:
value = 0
if value and value not in ids:
ids.append(value)
return ids
def dashboard_allowed_album_ids(album_ids: list[str]) -> set[str]:
ids = [str(album_id) for album_id in album_ids if album_id]
if not ids:
return set()
library_ids = dashboard_music_folder_ids()
db_path = Path((CFG.get("navidrome") or {}).get("db_path") or "/storage-1/qbit/navidrome/navidrome.db")
if not db_path.exists():
return set(ids)
try:
placeholders = ",".join("?" for _ in ids)
library_placeholders = ",".join("?" for _ in library_ids)
with sqlite3.connect(db_path) as db:
rows = db.execute(
f"SELECT id FROM album WHERE id IN ({placeholders}) AND library_id IN ({library_placeholders})",
[*ids, *library_ids],
).fetchall()
return {row[0] for row in rows}
except Exception as e:
log(f"dashboard album library filter failed: {e}")
return set(ids)
def navidrome_db_path() -> Path:
return Path((CFG.get("navidrome") or {}).get("db_path") or "/storage-1/qbit/navidrome/navidrome.db")
def _album_year(row: sqlite3.Row) -> int | str:
for key in ("min_original_year", "max_year", "min_year"):
try:
value = int(row[key] or 0)
except Exception:
value = 0
if value:
return value
return ""
def _album_release_date_from_row(row: sqlite3.Row) -> str:
for key in ("original_date", "release_date", "date"):
value = str(row[key] or "").strip()
if value:
return value
year = _album_year(row)
return str(year) if year else ""
def _dashboard_album_from_db_row(row: sqlite3.Row) -> dict:
return {
"artist": row["album_artist"] or "Unknown artist",
"title": row["name"] or "Unknown album",
"year": _album_year(row),
"song_count": row["song_count"] or "",
"created": row["imported_at"] or row["created_at"] or "",
"cover_url": navidrome_cover_url(row["id"]),
}
def dashboard_recent_album_rows(limit: int) -> list[sqlite3.Row]:
db_path = navidrome_db_path()
if not db_path.exists():
return []
library_ids = dashboard_music_folder_ids()
placeholders = ",".join("?" for _ in library_ids)
with sqlite3.connect(db_path) as db:
db.row_factory = sqlite3.Row
return db.execute(
f"""
SELECT id, name, album_artist, song_count, library_id, imported_at, created_at,
min_year, max_year, min_original_year, original_date, release_date, date
FROM album
WHERE library_id IN ({placeholders})
ORDER BY datetime(NULLIF(imported_at, '0000-00-00 00:00:00')) DESC,
datetime(created_at) DESC
LIMIT ?
""",
[*library_ids, limit],
).fetchall()
def dashboard_release_album_rows(limit: int) -> list[sqlite3.Row]:
db_path = navidrome_db_path()
if not db_path.exists():
return []
library_ids = dashboard_music_folder_ids()
placeholders = ",".join("?" for _ in library_ids)
with sqlite3.connect(db_path) as db:
db.row_factory = sqlite3.Row
return db.execute(
f"""
SELECT id, name, album_artist, song_count, library_id, imported_at, created_at,
min_year, max_year, min_original_year, original_date, release_date, date
FROM album
WHERE library_id IN ({placeholders})
AND COALESCE(NULLIF(original_date, ''), NULLIF(release_date, ''), NULLIF(date, ''),
CASE
WHEN min_original_year > 0 THEN printf('%04d', min_original_year)
WHEN max_year > 0 THEN printf('%04d', max_year)
WHEN min_year > 0 THEN printf('%04d', min_year)
ELSE ''
END) != ''
ORDER BY COALESCE(NULLIF(original_date, ''), NULLIF(release_date, ''), NULLIF(date, ''),
CASE
WHEN min_original_year > 0 THEN printf('%04d', min_original_year)
WHEN max_year > 0 THEN printf('%04d', max_year)
WHEN min_year > 0 THEN printf('%04d', min_year)
ELSE ''
END) DESC,
datetime(NULLIF(imported_at, '0000-00-00 00:00:00')) DESC
LIMIT ?
""",
[*library_ids, limit],
).fetchall()
def _date_parts(value: dict) -> tuple[int, int, int]:
try:
year = int(value.get("year") or 0)
month = int(value.get("month") or 1)
day = int(value.get("day") or 1)
return year, month, day
except Exception:
return 0, 1, 1
def album_release_date(album: dict) -> str:
for key in ("originalReleaseDate", "releaseDate"):
parts = album.get(key) or {}
if not isinstance(parts, dict):
continue
year, month, day = _date_parts(parts)
if year:
if parts.get("month") and parts.get("day"):
return f"{year:04d}-{month:02d}-{day:02d}"
if parts.get("month"):
return f"{year:04d}-{month:02d}"
return f"{year:04d}"
if album.get("year"):
return str(album.get("year"))
return ""
def album_release_sort_key(album: dict) -> tuple[int, int, int, str]:
for key in ("originalReleaseDate", "releaseDate"):
parts = album.get(key) or {}
if isinstance(parts, dict):
year, month, day = _date_parts(parts)
if year:
return year, month, day, str(album.get("created") or "")
try:
year = int(album.get("year") or 0)
except Exception:
year = 0
return year, 1, 1, str(album.get("created") or "")
def dashboard_albums(limit: int) -> list[dict]:
try:
rows = dashboard_recent_album_rows(limit)
if rows:
return [_dashboard_album_from_db_row(row) for row in rows]
except Exception as e:
log(f"dashboard recent albums db query failed: {e}")
albums_by_id: dict[str, dict] = {}
size = max(limit * 4, 40)
for folder_id in dashboard_music_folder_ids():
try:
for album in NAV.newest_albums(folder_id, size):
album_id = str(album.get("id") or "")
if album_id:
albums_by_id[album_id] = album
except Exception as e:
log(f"dashboard newest albums failed for folder {folder_id}: {e}")
if albums_by_id:
allowed_ids = dashboard_allowed_album_ids(list(albums_by_id.keys()))
albums = [album for album_id, album in albums_by_id.items() if album_id in allowed_ids]
albums = sorted(albums, key=lambda item: item.get("created") or "", reverse=True)
return [dashboard_album_item(album) for album in albums[:limit]]
items = []
for row in sent_event_rows("navidrome_album", limit):
album = row["payload"].get("album") or row["payload"]
items.append(
{
"artist": album.get("artist") or "Unknown artist",
"title": album.get("name") or album.get("album") or "Unknown album",
"year": album.get("year") or "",
"song_count": album.get("songCount") or album.get("song_count") or "",
"created": album.get("created") or row["created_at"],
"cover_url": navidrome_cover_url(album.get("coverArt")),
}
)
items.append(dashboard_album_item(album, row["created_at"]))
return items
@ -1397,6 +1658,33 @@ def release_cover_art(artist: str, album: str) -> str:
def playlist_next_generation(playlist: dict) -> datetime | None:
now = now_utc()
time_text = str(playlist.get("generation_time") or "").strip()
days_raw = playlist.get("generation_days")
days: list[int] = []
if days_raw:
try:
parsed = json.loads(days_raw) if isinstance(days_raw, str) else days_raw
days = [int(day) for day in parsed]
except Exception:
days = []
if time_text and days:
tz = parse_timezone(playlist.get("generation_timezone"))
local_now = now.astimezone(tz)
try:
hour, minute = [int(part) for part in time_text.split(":", 1)]
except Exception:
hour = minute = -1
if hour >= 0 and minute >= 0:
for offset in range(0, 14):
candidate_date = local_now.date() + timedelta(days=offset)
candidate = datetime(candidate_date.year, candidate_date.month, candidate_date.day, hour, minute, tzinfo=tz)
# discovery-playlist stores Python weekday indices: Monday=0, Sunday=6.
if candidate.weekday() not in days:
continue
if candidate > local_now:
return candidate.astimezone(timezone.utc)
interval = playlist.get("generation_interval_minutes")
last_generated = parse_dt(playlist.get("last_generated")) or parse_dt(playlist.get("created_at"))
if interval and last_generated:
@ -1407,42 +1695,43 @@ def playlist_next_generation(playlist: dict) -> datetime | None:
return target
except Exception:
pass
time_text = str(playlist.get("generation_time") or "").strip()
if not time_text:
return None
tz = parse_timezone(playlist.get("generation_timezone"))
local_now = now.astimezone(tz)
try:
hour, minute = [int(part) for part in time_text.split(":", 1)]
except Exception:
return None
days_raw = playlist.get("generation_days")
days: list[int] = []
if days_raw:
try:
parsed = json.loads(days_raw) if isinstance(days_raw, str) else days_raw
days = [int(day) for day in parsed]
except Exception:
days = []
for offset in range(0, 14):
candidate_date = local_now.date() + timedelta(days=offset)
candidate = datetime(candidate_date.year, candidate_date.month, candidate_date.day, hour, minute, tzinfo=tz)
# Python Monday is 0; stored schedule uses 1 for Monday.
weekday = candidate.weekday() + 1
if days and weekday not in days:
continue
if candidate > local_now:
return candidate.astimezone(timezone.utc)
return None
def dashboard_playlists(limit: int) -> list[dict]:
items = []
seen_names = set()
for playlist_row in playlist_rows(limit):
playlist = dict(playlist_row)
name = playlist.get("name") or "Discovery"
name_key = name.casefold()
if name_key in seen_names:
continue
seen_names.add(name_key)
cover_art = ""
nav_pl = navidrome_find_playlist(name)
if nav_pl:
cover_art = playlist_cover_art(nav_pl)
if not cover_art:
cover_art = playlist_track_cover_art(playlist.get("id"))
generated = playlist.get("last_generated") or playlist.get("created_at") or ""
next_generation = playlist_next_generation(playlist)
tz_value = playlist.get("generation_timezone") or "UTC+03:00"
items.append(
{
"name": name,
"track_count": playlist.get("track_count") or playlist.get("songCount") or "",
"generated": generated,
"generated_label": relative_datetime_label(generated, tz_value),
"next_generation": next_generation.isoformat() if next_generation else "",
"next_generation_label": relative_datetime_label(next_generation, tz_value) if next_generation else "",
"next_generation_in": human_delta(next_generation) if next_generation else "",
"cover_url": navidrome_cover_url(cover_art),
}
)
if len(items) >= limit:
return items
for row in sent_event_rows("discovery_playlist", limit * 3):
payload = row["payload"]
playlist = payload.get("playlist") or payload
@ -1460,12 +1749,16 @@ def dashboard_playlists(limit: int) -> list[dict]:
if nav_pl:
cover_art = playlist_cover_art(nav_pl)
next_generation = playlist_next_generation(playlist)
tz_value = playlist.get("generation_timezone") or "UTC+03:00"
generated = playlist.get("last_generated") or playlist.get("created_at") or row["created_at"]
items.append(
{
"name": name,
"track_count": playlist.get("track_count") or playlist.get("songCount") or "",
"generated": playlist.get("last_generated") or playlist.get("created_at") or row["created_at"],
"generated": generated,
"generated_label": relative_datetime_label(generated, tz_value),
"next_generation": next_generation.isoformat() if next_generation else "",
"next_generation_label": relative_datetime_label(next_generation, tz_value) if next_generation else "",
"next_generation_in": human_delta(next_generation) if next_generation else "",
"cover_url": navidrome_cover_url(cover_art),
}
@ -1476,6 +1769,51 @@ def dashboard_playlists(limit: int) -> list[dict]:
def dashboard_releases(limit: int) -> list[dict]:
try:
rows = dashboard_release_album_rows(limit)
if rows:
return [
{
"artist": row["album_artist"] or "Unknown artist",
"title": row["name"] or "Unknown release",
"release_date": _album_release_date_from_row(row),
"status": "in library",
"source": "navidrome",
"cover_url": navidrome_cover_url(row["id"]),
}
for row in rows
]
except Exception as e:
log(f"dashboard releases db query failed: {e}")
albums_by_id: dict[str, dict] = {}
size = max(limit * 10, 120)
for folder_id in dashboard_music_folder_ids():
try:
for album in NAV.albums_by_year(size, folder_id):
album_id = str(album.get("id") or "")
if album_id:
albums_by_id[album_id] = album
except Exception as e:
log(f"dashboard library releases failed for folder {folder_id}: {e}")
if albums_by_id:
allowed_ids = dashboard_allowed_album_ids(list(albums_by_id.keys()))
albums = [album for album_id, album in albums_by_id.items() if album_id in allowed_ids]
albums = sorted(albums, key=album_release_sort_key, reverse=True)
items = []
for album in albums[:limit]:
items.append(
{
"artist": album.get("artist") or album.get("displayArtist") or "Unknown artist",
"title": album.get("name") or "Unknown release",
"release_date": album_release_date(album),
"status": "in library",
"source": "navidrome",
"cover_url": navidrome_cover_url(album.get("coverArt")),
}
)
return items
items = []
seen = set()
for row in sent_event_rows("release_out", limit * 2):
@ -1523,6 +1861,19 @@ def dashboard():
return jsonify(payload)
@app.get("/covers/navidrome/<path:cover_art>")
def navidrome_cover_proxy(cover_art: str):
image = NAV.cover(cover_art)
if not image:
return Response(status=404)
data, _filename, content_type = image
return Response(
data,
mimetype=content_type,
headers={"Cache-Control": "public, max-age=86400"},
)
@app.post("/run/navidrome-albums")
def run_navidrome_albums():
payload = request.get_json(silent=True) or {}