- aoi.py: poller for Navidrome (main + anime libraries), discovery playlists, release watch for liked/rated artists - Last.fm enrichment (bio, tags); Bandcamp search links - config.example.json: safe template; config.json gitignored - deploy/aoi.service: systemd unit (production) - assets/banner.png + aoi-avatar.png: persona banner + bot avatar - Russian README in line with sibling bots (rada/watcher)
921 lines
33 KiB
Python
921 lines
33 KiB
Python
"""Aoi-chan: Matrix notifications for Navidrome and discovery-playlist."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import html
|
||
import json
|
||
import mimetypes
|
||
import os
|
||
import re
|
||
import sqlite3
|
||
import threading
|
||
import time
|
||
import traceback
|
||
import uuid
|
||
from collections import Counter
|
||
from datetime import datetime, timezone, timedelta
|
||
from pathlib import Path
|
||
from typing import Any
|
||
from urllib.parse import quote_plus
|
||
|
||
import requests
|
||
from flask import Flask, jsonify, request
|
||
|
||
|
||
BASE_DIR = Path(__file__).resolve().parent
|
||
CONFIG_PATH = Path(os.environ.get("AOI_CONFIG", BASE_DIR / "config.json"))
|
||
STATE_DB = Path(os.environ.get("AOI_STATE_DB", BASE_DIR / "aoi.db"))
|
||
LOG_PATH = Path(os.environ.get("AOI_LOG", BASE_DIR / "aoi.log"))
|
||
STATE_LOCK = threading.Lock()
|
||
|
||
|
||
def _load_config() -> dict:
|
||
example = BASE_DIR / "config.example.json"
|
||
path = CONFIG_PATH if CONFIG_PATH.exists() else example
|
||
with path.open("r", encoding="utf-8-sig") as f:
|
||
return json.load(f)
|
||
|
||
|
||
CFG = _load_config()
|
||
|
||
|
||
def log(message: str) -> None:
|
||
line = f"{datetime.now(timezone.utc).isoformat()} {message}"
|
||
print(line, flush=True)
|
||
try:
|
||
with LOG_PATH.open("a", encoding="utf-8") as f:
|
||
f.write(line + "\n")
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def now_utc() -> datetime:
|
||
return datetime.now(timezone.utc)
|
||
|
||
|
||
def parse_dt(value: Any) -> datetime | None:
|
||
if not value:
|
||
return None
|
||
if isinstance(value, datetime):
|
||
return value if value.tzinfo else value.replace(tzinfo=timezone.utc)
|
||
text = str(value).strip()
|
||
if not text:
|
||
return None
|
||
text = text.replace("Z", "+00:00")
|
||
for candidate in (text, text.split(".")[0]):
|
||
try:
|
||
dt = datetime.fromisoformat(candidate)
|
||
return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
|
||
except Exception:
|
||
pass
|
||
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%Y-%m", "%Y"):
|
||
try:
|
||
return datetime.strptime(text[: len(fmt)], fmt).replace(tzinfo=timezone.utc)
|
||
except Exception:
|
||
pass
|
||
return None
|
||
|
||
|
||
def parse_date(value: Any) -> datetime | None:
|
||
dt = parse_dt(value)
|
||
if dt:
|
||
return dt
|
||
text = str(value or "").strip()
|
||
for fmt in ("%Y", "%Y-%m", "%Y-%m-%d"):
|
||
try:
|
||
return datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
|
||
except Exception:
|
||
pass
|
||
return None
|
||
|
||
|
||
def strip_html(text: str) -> str:
|
||
text = re.sub(r"<br\s*/?>", "\n", text or "", flags=re.I)
|
||
text = re.sub(r"<[^>]+>", "", text)
|
||
return html.unescape(text).strip()
|
||
|
||
|
||
def compact(text: str, limit: int = 700) -> str:
|
||
text = re.sub(r"\s+", " ", strip_html(text)).strip()
|
||
if len(text) <= limit:
|
||
return text
|
||
return text[: limit - 1].rstrip() + "…"
|
||
|
||
|
||
def clean_lastfm_text(text: str) -> str:
|
||
text = re.sub(r"Read more on Last\.fm.*$", "", text or "", flags=re.I | re.S)
|
||
text = re.sub(r"User-contributed text.*$", "", text, flags=re.I | re.S)
|
||
return re.sub(r"\s+", " ", strip_html(text)).strip()
|
||
|
||
|
||
def init_state() -> None:
|
||
with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn:
|
||
conn.execute("PRAGMA journal_mode=WAL")
|
||
conn.execute("PRAGMA busy_timeout=30000")
|
||
conn.execute(
|
||
"""
|
||
CREATE TABLE IF NOT EXISTS sent_events (
|
||
kind TEXT NOT NULL,
|
||
event_key TEXT NOT NULL,
|
||
payload_json TEXT,
|
||
created_at TEXT NOT NULL,
|
||
PRIMARY KEY(kind, event_key)
|
||
)
|
||
"""
|
||
)
|
||
conn.execute(
|
||
"""
|
||
CREATE TABLE IF NOT EXISTS cursors (
|
||
key TEXT PRIMARY KEY,
|
||
value TEXT,
|
||
updated_at TEXT NOT NULL
|
||
)
|
||
"""
|
||
)
|
||
conn.commit()
|
||
|
||
|
||
def already_sent(kind: str, key: str) -> bool:
|
||
with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn:
|
||
conn.execute("PRAGMA busy_timeout=30000")
|
||
row = conn.execute(
|
||
"SELECT 1 FROM sent_events WHERE kind=? AND event_key=?",
|
||
(kind, key),
|
||
).fetchone()
|
||
return bool(row)
|
||
|
||
|
||
def mark_sent(kind: str, key: str, payload: dict | None = None) -> None:
|
||
with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn:
|
||
conn.execute("PRAGMA busy_timeout=30000")
|
||
conn.execute(
|
||
"""
|
||
INSERT OR IGNORE INTO sent_events(kind,event_key,payload_json,created_at)
|
||
VALUES(?,?,?,?)
|
||
""",
|
||
(kind, key, json.dumps(payload or {}, ensure_ascii=False), now_utc().isoformat()),
|
||
)
|
||
conn.commit()
|
||
|
||
|
||
def get_cursor(key: str) -> str:
|
||
with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn:
|
||
conn.execute("PRAGMA busy_timeout=30000")
|
||
row = conn.execute("SELECT value FROM cursors WHERE key=?", (key,)).fetchone()
|
||
return row[0] if row else ""
|
||
|
||
|
||
def set_cursor(key: str, value: str) -> None:
|
||
with STATE_LOCK, sqlite3.connect(STATE_DB, timeout=30) as conn:
|
||
conn.execute("PRAGMA busy_timeout=30000")
|
||
conn.execute(
|
||
"""
|
||
INSERT INTO cursors(key,value,updated_at) VALUES(?,?,?)
|
||
ON CONFLICT(key) DO UPDATE SET value=excluded.value, updated_at=excluded.updated_at
|
||
""",
|
||
(key, value, now_utc().isoformat()),
|
||
)
|
||
conn.commit()
|
||
|
||
|
||
def request_session() -> requests.Session:
|
||
sess = requests.Session()
|
||
proxy = (CFG.get("metadata") or {}).get("proxy") or ""
|
||
if proxy:
|
||
sess.proxies.update({"http": proxy, "https": proxy})
|
||
return sess
|
||
|
||
|
||
HTTP = request_session()
|
||
LASTFM_API_KEY_CACHE: str | None = None
|
||
|
||
|
||
class MatrixClient:
|
||
def __init__(self, cfg: dict):
|
||
self.homeserver = cfg["homeserver"].rstrip("/")
|
||
self.room_id = cfg["room_id"]
|
||
self.token = cfg["access_token"]
|
||
self.user_id = cfg.get("user_id") or ""
|
||
|
||
@property
|
||
def headers(self) -> dict:
|
||
return {"Authorization": f"Bearer {self.token}"}
|
||
|
||
def set_profile(self) -> None:
|
||
bot = CFG.get("bot") or {}
|
||
name = bot.get("name")
|
||
if name and self.user_id:
|
||
url = f"{self.homeserver}/_matrix/client/v3/profile/{self.user_id}/displayname"
|
||
r = requests.put(url, headers=self.headers, json={"displayname": name}, timeout=20)
|
||
if not r.ok:
|
||
log(f"matrix display name update failed: {r.status_code} {r.text[:200]}")
|
||
|
||
avatar_path = bot.get("avatar_path") or ""
|
||
if avatar_path:
|
||
path = Path(avatar_path)
|
||
if path.exists() and self.user_id:
|
||
mxc = self.upload_file(path.read_bytes(), path.name, mimetypes.guess_type(path.name)[0] or "image/jpeg")
|
||
if mxc:
|
||
url = f"{self.homeserver}/_matrix/client/v3/profile/{self.user_id}/avatar_url"
|
||
r = requests.put(url, headers=self.headers, json={"avatar_url": mxc}, timeout=20)
|
||
if not r.ok:
|
||
log(f"matrix avatar update failed: {r.status_code} {r.text[:200]}")
|
||
|
||
def upload_file(self, data: bytes, filename: str, content_type: str) -> str:
|
||
r = requests.post(
|
||
f"{self.homeserver}/_matrix/media/v3/upload",
|
||
headers={**self.headers, "Content-Type": content_type},
|
||
params={"filename": filename},
|
||
data=data,
|
||
timeout=45,
|
||
)
|
||
r.raise_for_status()
|
||
return r.json()["content_uri"]
|
||
|
||
def send_text(self, text: str, formatted_html: str | None = None) -> dict:
|
||
body = {"msgtype": "m.text", "body": text}
|
||
if formatted_html:
|
||
body["format"] = "org.matrix.custom.html"
|
||
body["formatted_body"] = formatted_html
|
||
txn = uuid.uuid4().hex
|
||
url = f"{self.homeserver}/_matrix/client/v3/rooms/{self.room_id}/send/m.room.message/{txn}"
|
||
r = requests.put(url, headers=self.headers, json=body, timeout=30)
|
||
r.raise_for_status()
|
||
return r.json()
|
||
|
||
def send_image_bytes(self, data: bytes, filename: str, content_type: str = "image/jpeg") -> dict:
|
||
mxc = self.upload_file(data, filename, content_type)
|
||
body = {
|
||
"msgtype": "m.image",
|
||
"body": filename,
|
||
"url": mxc,
|
||
"info": {"mimetype": content_type, "size": len(data)},
|
||
}
|
||
txn = uuid.uuid4().hex
|
||
url = f"{self.homeserver}/_matrix/client/v3/rooms/{self.room_id}/send/m.room.message/{txn}"
|
||
r = requests.put(url, headers=self.headers, json=body, timeout=30)
|
||
r.raise_for_status()
|
||
return r.json()
|
||
|
||
def send_card(self, text: str, formatted_html: str, image: tuple[bytes, str, str] | None = None) -> None:
|
||
if image:
|
||
try:
|
||
self.send_image_bytes(*image)
|
||
except Exception as e:
|
||
log(f"matrix image send failed: {e}")
|
||
self.send_text(text, formatted_html)
|
||
|
||
|
||
class NavidromeClient:
|
||
def __init__(self, cfg: dict):
|
||
self.base_url = cfg["url"].rstrip("/")
|
||
self.username = cfg["username"]
|
||
self.password = cfg["password"]
|
||
|
||
def _params(self) -> dict:
|
||
return {
|
||
"u": self.username,
|
||
"p": self.password,
|
||
"v": "1.16.1",
|
||
"c": "AoiNotifier",
|
||
"f": "json",
|
||
}
|
||
|
||
def get(self, endpoint: str, **params) -> dict:
|
||
r = requests.get(
|
||
f"{self.base_url}/rest/{endpoint}",
|
||
params={**self._params(), **params},
|
||
timeout=35,
|
||
)
|
||
r.raise_for_status()
|
||
data = r.json().get("subsonic-response", {})
|
||
if data.get("status") != "ok":
|
||
raise RuntimeError(f"Navidrome {endpoint} failed: {data}")
|
||
return data
|
||
|
||
def ping(self) -> bool:
|
||
try:
|
||
return self.get("ping").get("status") == "ok"
|
||
except Exception:
|
||
return False
|
||
|
||
def newest_albums(self, music_folder_id: int, size: int) -> list[dict]:
|
||
data = self.get("getAlbumList2", type="newest", size=size, musicFolderId=music_folder_id)
|
||
return data.get("albumList2", {}).get("album", []) or []
|
||
|
||
def album(self, album_id: str) -> dict:
|
||
return self.get("getAlbum", id=album_id).get("album", {}) or {}
|
||
|
||
def playlists(self) -> list[dict]:
|
||
data = self.get("getPlaylists")
|
||
return data.get("playlists", {}).get("playlist", []) or []
|
||
|
||
def playlist(self, playlist_id: str) -> dict:
|
||
return self.get("getPlaylist", id=playlist_id).get("playlist", {}) or {}
|
||
|
||
def cover(self, cover_art: str | None) -> tuple[bytes, str, str] | None:
|
||
if not cover_art:
|
||
return None
|
||
try:
|
||
r = requests.get(
|
||
f"{self.base_url}/rest/getCoverArt",
|
||
params={**self._params(), "id": cover_art, "size": 900},
|
||
timeout=35,
|
||
)
|
||
r.raise_for_status()
|
||
ctype = r.headers.get("Content-Type") or "image/jpeg"
|
||
ext = mimetypes.guess_extension(ctype.split(";")[0]) or ".jpg"
|
||
return (r.content, f"cover{ext}", ctype)
|
||
except Exception as e:
|
||
log(f"cover fetch failed: {e}")
|
||
return None
|
||
|
||
|
||
MATRIX = MatrixClient(CFG["matrix"])
|
||
NAV = NavidromeClient(CFG["navidrome"])
|
||
|
||
|
||
def metadata_headers() -> dict:
|
||
ua = (CFG.get("metadata") or {}).get("user_agent") or "AoiNotifier/1.0"
|
||
return {"User-Agent": ua, "Accept": "application/json"}
|
||
|
||
|
||
def discovery_setting(key: str) -> str:
|
||
db_path = (CFG.get("discovery") or {}).get("db_path") or ""
|
||
if not db_path or not Path(db_path).exists():
|
||
return ""
|
||
try:
|
||
with sqlite3.connect(db_path) as conn:
|
||
row = conn.execute("SELECT value FROM user_settings WHERE key=?", (key,)).fetchone()
|
||
return str(row[0] or "") if row else ""
|
||
except Exception as e:
|
||
log(f"discovery setting lookup failed for {key}: {e}")
|
||
return ""
|
||
|
||
|
||
def lastfm_api_key() -> str:
|
||
global LASTFM_API_KEY_CACHE
|
||
if LASTFM_API_KEY_CACHE is not None:
|
||
return LASTFM_API_KEY_CACHE
|
||
LASTFM_API_KEY_CACHE = (CFG.get("metadata") or {}).get("lastfm_api_key") or discovery_setting("lastfm_api_key")
|
||
return LASTFM_API_KEY_CACHE
|
||
|
||
|
||
def lastfm_request(method: str, params: dict, timeout: int = 25) -> dict:
|
||
api_key = lastfm_api_key()
|
||
if not api_key:
|
||
return {}
|
||
req_params = {
|
||
"method": method,
|
||
"api_key": api_key,
|
||
"format": "json",
|
||
"autocorrect": 1,
|
||
**params,
|
||
}
|
||
try:
|
||
r = HTTP.get("https://ws.audioscrobbler.com/2.0/", params=req_params, timeout=timeout)
|
||
if not r.ok:
|
||
return {}
|
||
return r.json()
|
||
except Exception as e:
|
||
log(f"lastfm {method} failed: {e}")
|
||
return {}
|
||
|
||
|
||
def lastfm_album_tags(artist: str, album: str, limit: int = 6) -> list[str]:
|
||
data = lastfm_request("album.getTopTags", {"artist": artist, "album": album}, timeout=20)
|
||
tags = []
|
||
for tag in ((data.get("toptags") or {}).get("tag") or [])[:limit]:
|
||
name = (tag.get("name") or "").strip()
|
||
if name and len(name) <= 32 and not name.isdigit():
|
||
tags.append(name)
|
||
return tags
|
||
|
||
|
||
def lastfm_artist_summary(artist: str) -> str:
|
||
for params in ({"artist": artist, "lang": "ru"}, {"artist": artist}):
|
||
data = lastfm_request("artist.getinfo", params, timeout=20)
|
||
bio = ((data.get("artist") or {}).get("bio") or {})
|
||
summary = clean_lastfm_text(bio.get("summary") or bio.get("content") or "")
|
||
if summary:
|
||
return summary
|
||
return ""
|
||
|
||
|
||
def lastfm_album_search_url(artist: str, album: str) -> str:
|
||
return f"https://www.last.fm/search/albums?q={quote_plus(f'{artist} {album}')}"
|
||
|
||
|
||
def bandcamp_album_search_url(artist: str, album: str) -> str:
|
||
return f"https://bandcamp.com/search?q={quote_plus(f'{artist} {album}')}&item_type=a"
|
||
|
||
|
||
def album_source_links(artist: str, album: str, lastfm_url: str = "") -> list[dict[str, str]]:
|
||
return [
|
||
{"label": "Last.fm", "url": lastfm_url or lastfm_album_search_url(artist, album)},
|
||
{"label": "Bandcamp", "url": bandcamp_album_search_url(artist, album)},
|
||
]
|
||
|
||
|
||
def lastfm_album_info(artist: str, album: str) -> dict[str, str]:
|
||
for params in ({"artist": artist, "album": album, "lang": "ru"}, {"artist": artist, "album": album}):
|
||
data = lastfm_request("album.getinfo", params)
|
||
album_data = data.get("album") or {}
|
||
wiki = album_data.get("wiki") or {}
|
||
summary = clean_lastfm_text(wiki.get("summary") or wiki.get("content") or "")
|
||
if summary:
|
||
return {"summary": summary, "url": album_data.get("url") or ""}
|
||
|
||
tags = lastfm_album_tags(artist, album)
|
||
artist_summary = lastfm_artist_summary(artist)
|
||
parts = []
|
||
if tags:
|
||
parts.append("Last.fm описывает релиз тегами: " + ", ".join(tags) + ".")
|
||
if artist_summary:
|
||
parts.append(artist_summary)
|
||
if parts:
|
||
return {"summary": " ".join(parts), "url": ""}
|
||
return {"summary": "", "url": ""}
|
||
|
||
|
||
def lastfm_album_summary(artist: str, album: str) -> str:
|
||
return lastfm_album_info(artist, album).get("summary", "")
|
||
|
||
def musicbrainz_release_hint(artist: str, album: str) -> dict:
|
||
if not (CFG.get("metadata") or {}).get("musicbrainz_enabled", True):
|
||
return {}
|
||
try:
|
||
query = f'artist:"{artist}" AND release:"{album}"'
|
||
r = HTTP.get(
|
||
"https://musicbrainz.org/ws/2/release/",
|
||
params={"query": query, "fmt": "json", "limit": 3},
|
||
headers=metadata_headers(),
|
||
timeout=25,
|
||
)
|
||
if not r.ok:
|
||
return {}
|
||
releases = r.json().get("releases") or []
|
||
if not releases:
|
||
return {}
|
||
rel = releases[0]
|
||
return {
|
||
"date": rel.get("date") or "",
|
||
"country": rel.get("country") or "",
|
||
"status": rel.get("status") or "",
|
||
"score": rel.get("score") or 0,
|
||
}
|
||
except Exception as e:
|
||
log(f"musicbrainz lookup failed: {e}")
|
||
return {}
|
||
|
||
|
||
def lastfm_tags_for_tracks(tracks: list[dict]) -> Counter:
|
||
api_key = lastfm_api_key()
|
||
tags = Counter()
|
||
if not api_key:
|
||
return tags
|
||
for tr in tracks[:8]:
|
||
try:
|
||
r = HTTP.get(
|
||
"https://ws.audioscrobbler.com/2.0/",
|
||
params={
|
||
"method": "track.getTopTags",
|
||
"artist": tr.get("artist") or "",
|
||
"track": tr.get("track") or tr.get("title") or "",
|
||
"api_key": api_key,
|
||
"format": "json",
|
||
"autocorrect": 1,
|
||
},
|
||
timeout=20,
|
||
)
|
||
if not r.ok:
|
||
continue
|
||
for tag in ((r.json().get("toptags") or {}).get("tag") or [])[:5]:
|
||
name = (tag.get("name") or "").lower().strip()
|
||
if name and len(name) <= 32:
|
||
tags[name] += 1
|
||
except Exception:
|
||
continue
|
||
return tags
|
||
|
||
|
||
def album_plain_and_html(album: dict, library_label: str, songs: list[dict]) -> tuple[str, str]:
|
||
artist = album.get("artist") or "Unknown artist"
|
||
title = album.get("name") or album.get("album") or "Unknown album"
|
||
year = album.get("year") or ""
|
||
duration = int(album.get("duration") or sum(int(s.get("duration") or 0) for s in songs) or 0)
|
||
minutes = duration // 60 if duration else 0
|
||
lastfm_info = lastfm_album_info(artist, title)
|
||
summary = lastfm_info.get("summary") or ""
|
||
links = album_source_links(artist, title, lastfm_info.get("url") or "")
|
||
hint = musicbrainz_release_hint(artist, title)
|
||
release_date = album.get("created") or album.get("starred") or hint.get("date") or ""
|
||
track_lines = []
|
||
for idx, song in enumerate(songs[:30], 1):
|
||
name = song.get("title") or "Untitled"
|
||
dur = int(song.get("duration") or 0)
|
||
suffix = f" ({dur // 60}:{dur % 60:02d})" if dur else ""
|
||
track_lines.append(f"{idx:02d}. {name}{suffix}")
|
||
if len(songs) > 30:
|
||
track_lines.append(f"... и еще {len(songs) - 30}")
|
||
|
||
header = f"🎧 Новый релиз в Navidrome / {library_label}"
|
||
facts = [
|
||
f"{artist} — {title}" + (f" ({year})" if year else ""),
|
||
f"Треков: {len(songs)}" + (f", длительность: {minutes} мин." if minutes else ""),
|
||
]
|
||
if release_date:
|
||
facts.append(f"Дата/добавление: {release_date}")
|
||
if hint.get("country") or hint.get("status"):
|
||
facts.append("MusicBrainz: " + ", ".join(x for x in [hint.get("country"), hint.get("status")] if x))
|
||
if summary:
|
||
description_fact = f"Описание: {summary}"
|
||
else:
|
||
description_fact = "Описание: пока не нашла нормальное описание, оставляю чистый треклист."
|
||
facts.append(description_fact)
|
||
facts.append("Ссылки: " + ", ".join(f"{link['label']}: {link['url']}" for link in links))
|
||
plain = "\n".join([header, "", *facts, "", "━━ ТРЕКЛИСТ ━━", *track_lines])
|
||
|
||
html_parts = [
|
||
f"<h3>{html.escape(header)}</h3>",
|
||
f"<b>{html.escape(artist)} — {html.escape(title)}</b>" + (f" ({html.escape(str(year))})" if year else ""),
|
||
"<ul>",
|
||
f"<li>Треков: {len(songs)}" + (f", длительность: {minutes} мин." if minutes else "") + "</li>",
|
||
]
|
||
if release_date:
|
||
html_parts.append(f"<li>Дата/добавление: {html.escape(str(release_date))}</li>")
|
||
if hint.get("country") or hint.get("status"):
|
||
html_parts.append(f"<li>MusicBrainz: {html.escape(', '.join(x for x in [hint.get('country'), hint.get('status')] if x))}</li>")
|
||
html_parts.append("</ul>")
|
||
html_parts.append(f"<p>{html.escape(description_fact)}</p>")
|
||
html_parts.append(
|
||
"<p>Ссылки: "
|
||
+ ", ".join(
|
||
f'<a href="{html.escape(link["url"], quote=True)}">{html.escape(link["label"])}</a>'
|
||
for link in links
|
||
)
|
||
+ "</p>"
|
||
)
|
||
html_parts.append("<p><b>Треклист</b></p><ol>")
|
||
for line in track_lines:
|
||
html_parts.append(f"<li>{html.escape(line[4:] if re.match(r'^\\d\\d\\. ', line) else line)}</li>")
|
||
html_parts.append("</ol>")
|
||
return plain, "\n".join(html_parts)
|
||
|
||
|
||
def poll_navidrome_albums(force: bool = False) -> dict:
|
||
cfg = CFG["navidrome"]
|
||
libraries = [
|
||
("main", int(cfg.get("main_library_id") or 0), "main"),
|
||
("anime", int(cfg.get("anime_library_id") or 0), "anime"),
|
||
]
|
||
size = int(cfg.get("album_poll_size") or 40)
|
||
baseline = (CFG.get("polling") or {}).get("baseline_existing_on_first_run", True)
|
||
first_run = not get_cursor("navidrome_albums_baselined")
|
||
sent = 0
|
||
seen = 0
|
||
for kind, folder_id, label in libraries:
|
||
if not folder_id:
|
||
continue
|
||
for album in NAV.newest_albums(folder_id, size):
|
||
album_id = str(album.get("id") or "")
|
||
if not album_id:
|
||
continue
|
||
key = f"{kind}:{album_id}"
|
||
if already_sent("navidrome_album", key):
|
||
continue
|
||
seen += 1
|
||
if first_run and baseline and not force:
|
||
mark_sent("navidrome_album", key, album)
|
||
continue
|
||
full_album = NAV.album(album_id)
|
||
songs = full_album.get("song") or []
|
||
plain, formatted = album_plain_and_html(full_album or album, label, songs)
|
||
image = NAV.cover((full_album or album).get("coverArt"))
|
||
MATRIX.send_card(plain, formatted, image)
|
||
mark_sent("navidrome_album", key, {"album": album, "sent_at": now_utc().isoformat()})
|
||
sent += 1
|
||
time.sleep(1)
|
||
set_cursor("navidrome_albums_baselined", now_utc().isoformat())
|
||
return {"ok": True, "seen_new": seen, "sent": sent, "first_run": first_run}
|
||
|
||
|
||
def discovery_conn() -> sqlite3.Connection | None:
|
||
db_path = (CFG.get("discovery") or {}).get("db_path") or ""
|
||
if not db_path or not Path(db_path).exists():
|
||
log(f"discovery db not found: {db_path}")
|
||
return None
|
||
conn = sqlite3.connect(db_path)
|
||
conn.row_factory = sqlite3.Row
|
||
return conn
|
||
|
||
|
||
def playlist_rows(limit: int = 20) -> list[sqlite3.Row]:
|
||
conn = discovery_conn()
|
||
if not conn:
|
||
return []
|
||
try:
|
||
return conn.execute(
|
||
"""
|
||
SELECT * FROM playlists
|
||
ORDER BY COALESCE(last_generated, created_at) DESC
|
||
LIMIT ?
|
||
""",
|
||
(limit,),
|
||
).fetchall()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def playlist_tracks(playlist_id: int) -> list[dict]:
|
||
conn = discovery_conn()
|
||
if not conn:
|
||
return []
|
||
try:
|
||
rows = conn.execute(
|
||
"""
|
||
SELECT * FROM playlist_tracks
|
||
WHERE playlist_id=?
|
||
ORDER BY score DESC, added_at DESC
|
||
""",
|
||
(playlist_id,),
|
||
).fetchall()
|
||
return [dict(r) for r in rows]
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def navidrome_playlist_cover(name: str) -> tuple[bytes, str, str] | None:
|
||
try:
|
||
needle = name.strip().lower()
|
||
candidates = NAV.playlists()
|
||
chosen = None
|
||
for pl in candidates:
|
||
pl_name = (pl.get("name") or "").strip().lower()
|
||
if pl_name == needle or needle in pl_name or pl_name in needle:
|
||
chosen = pl
|
||
break
|
||
if not chosen:
|
||
return None
|
||
full = NAV.playlist(str(chosen.get("id")))
|
||
if full.get("coverArt"):
|
||
return NAV.cover(full.get("coverArt"))
|
||
entries = full.get("entry") or []
|
||
if entries:
|
||
return NAV.cover(entries[0].get("coverArt"))
|
||
except Exception as e:
|
||
log(f"navidrome playlist cover failed: {e}")
|
||
return None
|
||
|
||
|
||
PROFILE_LABELS = {
|
||
"deep_cuts": "глубокие находки",
|
||
"fresh_picks": "свежие рекомендации",
|
||
"comfort_zone": "комфортная зона",
|
||
}
|
||
|
||
|
||
def playlist_plain_and_html(pl: dict, tracks: list[dict]) -> tuple[str, str]:
|
||
name = pl.get("name") or "Discovery"
|
||
created = pl.get("last_generated") or pl.get("created_at") or ""
|
||
top_tracks = tracks[:8]
|
||
top_lines = [f"• {t.get('artist')} — {t.get('track')}" for t in top_tracks]
|
||
profile_counts = Counter((t.get("profile") or "").strip() for t in tracks if t.get("profile"))
|
||
profile_text = ", ".join(
|
||
f"{PROFILE_LABELS.get(k, k)}: {v}" for k, v in profile_counts.most_common()
|
||
) or "смешанный режим"
|
||
tags = lastfm_tags_for_tracks(tracks)
|
||
tag_text = ", ".join(tag for tag, _ in tags.most_common(6))
|
||
if not tag_text:
|
||
tag_text = profile_text
|
||
artists = Counter((t.get("artist") or "").strip() for t in tracks if t.get("artist"))
|
||
artist_text = ", ".join(a for a, _ in artists.most_common(5))
|
||
summary = (
|
||
f"В этот раз уклон: {tag_text}. "
|
||
f"Из заметного: {', '.join(line[2:] for line in top_lines[:4])}."
|
||
)
|
||
plain = "\n".join([
|
||
"🧭 Новый Discovery-плейлист",
|
||
"",
|
||
name,
|
||
f"Треков: {len(tracks)}",
|
||
f"Собран: {created}",
|
||
f"Профиль: {profile_text}",
|
||
f"Главные артисты: {artist_text or 'пока не выделяются'}",
|
||
"",
|
||
summary,
|
||
"",
|
||
"━━ ИНТЕРЕСНОЕ ━━",
|
||
*top_lines,
|
||
])
|
||
formatted = "\n".join([
|
||
"<h3>🧭 Новый Discovery-плейлист</h3>",
|
||
f"<b>{html.escape(name)}</b>",
|
||
"<ul>",
|
||
f"<li>Треков: {len(tracks)}</li>",
|
||
f"<li>Собран: {html.escape(str(created))}</li>",
|
||
f"<li>Профиль: {html.escape(profile_text)}</li>",
|
||
f"<li>Главные артисты: {html.escape(artist_text or 'пока не выделяются')}</li>",
|
||
"</ul>",
|
||
f"<p>{html.escape(summary)}</p>",
|
||
"<p><b>Интересное</b></p><ul>",
|
||
*[f"<li>{html.escape(line[2:])}</li>" for line in top_lines],
|
||
"</ul>",
|
||
])
|
||
return plain, formatted
|
||
|
||
|
||
def poll_discovery_playlists(force: bool = False) -> dict:
|
||
baseline = (CFG.get("polling") or {}).get("baseline_existing_on_first_run", True)
|
||
first_run = not get_cursor("discovery_playlists_baselined")
|
||
sent = 0
|
||
seen = 0
|
||
for row in playlist_rows():
|
||
pl = dict(row)
|
||
stamp = pl.get("last_generated") or pl.get("created_at") or ""
|
||
if not stamp:
|
||
continue
|
||
key = f"{pl.get('id')}:{stamp}:{pl.get('iteration_number', 0)}"
|
||
if already_sent("discovery_playlist", key):
|
||
continue
|
||
seen += 1
|
||
if first_run and baseline and not force:
|
||
mark_sent("discovery_playlist", key, pl)
|
||
continue
|
||
tracks = playlist_tracks(int(pl["id"]))
|
||
plain, formatted = playlist_plain_and_html(pl, tracks)
|
||
image = navidrome_playlist_cover(pl.get("name") or "")
|
||
MATRIX.send_card(plain, formatted, image)
|
||
mark_sent("discovery_playlist", key, {"playlist": pl, "sent_at": now_utc().isoformat()})
|
||
sent += 1
|
||
time.sleep(1)
|
||
set_cursor("discovery_playlists_baselined", now_utc().isoformat())
|
||
return {"ok": True, "seen_new": seen, "sent": sent, "first_run": first_run}
|
||
|
||
|
||
def release_rows(limit: int = 120) -> list[dict]:
|
||
conn = discovery_conn()
|
||
if not conn:
|
||
return []
|
||
try:
|
||
rows = conn.execute(
|
||
"""
|
||
SELECT * FROM artist_release_watch
|
||
ORDER BY COALESCE(downloaded_at, first_seen_at) DESC
|
||
LIMIT ?
|
||
""",
|
||
(limit,),
|
||
).fetchall()
|
||
return [dict(r) for r in rows]
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def release_plain_and_html(row: dict, mode: str) -> tuple[str, str]:
|
||
artist = row.get("artist") or "Unknown artist"
|
||
album = row.get("album") or "Unknown release"
|
||
date = row.get("release_date") or "дата уточняется"
|
||
source = row.get("source") or "discovery-playlist"
|
||
tracks = int(row.get("desired_track_count") or 0)
|
||
status = row.get("status") or ""
|
||
if mode == "out":
|
||
title = "📀 Релиз вышел"
|
||
lead = f"{artist} — {album} уже можно забирать."
|
||
else:
|
||
title = "📡 Анонс релиза отслеживаемого артиста"
|
||
lead = f"{artist} — {album} появился в release watch."
|
||
facts = [
|
||
lead,
|
||
f"Дата релиза: {date}",
|
||
f"Статус: {status}",
|
||
f"Источник: {source}",
|
||
]
|
||
if tracks:
|
||
facts.append(f"Ожидаемый треклист: {tracks} треков")
|
||
if row.get("error_message"):
|
||
facts.append(f"Заметка: {compact(row.get('error_message'), 240)}")
|
||
plain = "\n".join([title, "", *facts])
|
||
formatted = "\n".join([
|
||
f"<h3>{html.escape(title)}</h3>",
|
||
f"<p><b>{html.escape(artist)} — {html.escape(album)}</b></p>",
|
||
"<ul>",
|
||
*[f"<li>{html.escape(f)}</li>" for f in facts[1:]],
|
||
"</ul>",
|
||
])
|
||
return plain, formatted
|
||
|
||
|
||
def release_is_out(row: dict) -> bool:
|
||
dt = parse_date(row.get("release_date"))
|
||
return bool(dt and dt.date() == now_utc().date())
|
||
|
||
|
||
def release_is_announcement(row: dict) -> bool:
|
||
dt = parse_date(row.get("release_date"))
|
||
return bool(dt and dt.date() > now_utc().date())
|
||
|
||
|
||
def poll_release_watch(force: bool = False) -> dict:
|
||
baseline = (CFG.get("polling") or {}).get("baseline_existing_on_first_run", True)
|
||
first_run = not get_cursor("release_watch_baselined")
|
||
announced = 0
|
||
out = 0
|
||
skipped = 0
|
||
for row in release_rows():
|
||
row_id = row.get("id")
|
||
first_seen = row.get("first_seen_at") or ""
|
||
announce_key = f"{row_id}:{first_seen}"
|
||
if release_is_announcement(row) and not already_sent("release_announcement", announce_key):
|
||
if first_run and baseline and not force:
|
||
mark_sent("release_announcement", announce_key, row)
|
||
else:
|
||
plain, formatted = release_plain_and_html(row, "announce")
|
||
MATRIX.send_card(plain, formatted, None)
|
||
mark_sent("release_announcement", announce_key, row)
|
||
announced += 1
|
||
time.sleep(1)
|
||
elif not release_is_announcement(row):
|
||
skipped += 1
|
||
|
||
if release_is_out(row):
|
||
out_stamp = row.get("downloaded_at") or row.get("release_date") or row.get("last_checked_at") or ""
|
||
out_key = f"{row_id}:{out_stamp}:{row.get('status')}"
|
||
if already_sent("release_out", out_key):
|
||
continue
|
||
if first_run and baseline and not force:
|
||
mark_sent("release_out", out_key, row)
|
||
else:
|
||
plain, formatted = release_plain_and_html(row, "out")
|
||
MATRIX.send_card(plain, formatted, None)
|
||
mark_sent("release_out", out_key, row)
|
||
out += 1
|
||
time.sleep(1)
|
||
set_cursor("release_watch_baselined", now_utc().isoformat())
|
||
return {"ok": True, "announced": announced, "out": out, "skipped_not_current_or_future": skipped, "first_run": first_run}
|
||
|
||
|
||
def loop_forever(name: str, interval: int, fn) -> None:
|
||
while True:
|
||
try:
|
||
result = fn()
|
||
log(f"{name}: {result}")
|
||
except Exception as e:
|
||
log(f"{name} failed: {e}\n{traceback.format_exc()}")
|
||
time.sleep(interval)
|
||
|
||
|
||
app = Flask(__name__)
|
||
|
||
|
||
@app.get("/healthz")
|
||
def healthz():
|
||
return jsonify({"ok": True})
|
||
|
||
|
||
@app.post("/run/navidrome-albums")
|
||
def run_navidrome_albums():
|
||
payload = request.get_json(silent=True) or {}
|
||
return jsonify(poll_navidrome_albums(force=bool(payload.get("force"))))
|
||
|
||
|
||
@app.post("/run/discovery-playlists")
|
||
def run_discovery_playlists():
|
||
payload = request.get_json(silent=True) or {}
|
||
return jsonify(poll_discovery_playlists(force=bool(payload.get("force"))))
|
||
|
||
|
||
@app.post("/run/release-watch")
|
||
def run_release_watch():
|
||
payload = request.get_json(silent=True) or {}
|
||
return jsonify(poll_release_watch(force=bool(payload.get("force"))))
|
||
|
||
|
||
def start_background() -> None:
|
||
polling = CFG.get("polling") or {}
|
||
tasks = [
|
||
("navidrome_albums", int(polling.get("navidrome_album_interval_seconds") or 300), poll_navidrome_albums),
|
||
("discovery_playlists", int(polling.get("discovery_playlist_interval_seconds") or 180), poll_discovery_playlists),
|
||
("release_watch", int(polling.get("release_watch_interval_seconds") or 600), poll_release_watch),
|
||
]
|
||
for name, interval, fn in tasks:
|
||
threading.Thread(target=loop_forever, args=(name, interval, fn), daemon=True).start()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
init_state()
|
||
try:
|
||
MATRIX.set_profile()
|
||
except Exception as e:
|
||
log(f"matrix profile setup failed: {e}")
|
||
if not NAV.ping():
|
||
log("warning: Navidrome ping failed")
|
||
start_background()
|
||
server = CFG.get("server") or {}
|
||
host = server.get("host") or "0.0.0.0"
|
||
port = int(server.get("port") or 18323)
|
||
try:
|
||
from waitress import serve
|
||
|
||
serve(app, host=host, port=port)
|
||
except Exception:
|
||
app.run(host=host, port=port)
|