"""Aoi-chan: Matrix notifications for Navidrome and discovery-playlist."""
from __future__ import annotations
import html
import json
import mimetypes
import os
import re
import sqlite3
import threading
import time
import traceback
import uuid
from collections import Counter
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any
from urllib.parse import quote_plus
import requests
from flask import Flask, jsonify, request
# Directory containing this module; default home of the config, state DB and log.
BASE_DIR = Path(__file__).resolve().parent
# Each path below can be overridden through the named environment variable.
CONFIG_PATH = Path(os.environ.get("AOI_CONFIG", BASE_DIR / "config.json"))
STATE_DB = Path(os.environ.get("AOI_STATE_DB", BASE_DIR / "aoi.db"))
LOG_PATH = Path(os.environ.get("AOI_LOG", BASE_DIR / "aoi.log"))
# Held by the state helpers around every access to STATE_DB.
STATE_LOCK = threading.Lock()
def _load_config() -> dict:
    """Read the JSON configuration file and return its parsed content.

    ``CONFIG_PATH`` is used when it exists; otherwise the
    ``config.example.json`` file next to this module is read instead.
    """
    if CONFIG_PATH.exists():
        source = CONFIG_PATH
    else:
        source = BASE_DIR / "config.example.json"
    # utf-8-sig transparently drops a leading BOM if the file has one.
    with source.open("r", encoding="utf-8-sig") as handle:
        return json.load(handle)
# Configuration is loaded once, at import time, and shared module-wide.
CFG = _load_config()
def log(message: str) -> None:
    """Print a UTC-timestamped line and append the same line to ``LOG_PATH``.

    Writing to the log file is best-effort: any failure there is ignored so
    that logging never interrupts the caller.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    entry = f"{stamp} {message}"
    print(entry, flush=True)
    try:
        with LOG_PATH.open("a", encoding="utf-8") as handle:
            handle.write(entry + "\n")
    except Exception:
        # Deliberately swallowed: stdout already carries the message.
        pass
def now_utc() -> datetime:
    """Return the current moment as a timezone-aware UTC ``datetime``."""
    return datetime.now(tz=timezone.utc)
def parse_dt(value: Any) -> datetime | None:
    """Parse *value* into a timezone-aware ``datetime``; return ``None`` on failure.

    Accepts ``datetime`` objects (naive ones are tagged as UTC) and strings.
    Strings are tried as ISO 8601 first (a ``Z`` is rewritten to ``+00:00``,
    and a second attempt drops everything from the first ``.`` onwards), then
    as a date-like *prefix* in one of ``YYYY-MM-DD HH:MM:SS``, ``YYYY-MM-DD``,
    ``YYYY-MM`` or ``YYYY``. Results without an offset are treated as UTC.
    """
    if not value:
        return None
    if isinstance(value, datetime):
        return value if value.tzinfo else value.replace(tzinfo=timezone.utc)
    text = str(value).strip()
    if not text:
        return None
    text = text.replace("Z", "+00:00")
    for candidate in (text, text.split(".")[0]):
        try:
            dt = datetime.fromisoformat(candidate)
        except ValueError:
            continue
        return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
    # Each format is paired with the width of the text it renders to. The
    # prefix must be cut at the rendered width (e.g. 10 for YYYY-MM-DD), not at
    # len(fmt), otherwise the slice is too short and strptime can never match.
    fallbacks = (
        ("%Y-%m-%d %H:%M:%S", 19),
        ("%Y-%m-%d", 10),
        ("%Y-%m", 7),
        ("%Y", 4),
    )
    for fmt, width in fallbacks:
        # A digit right after the prefix means we would be cutting a number in
        # half (e.g. an epoch value like "1700000000"); do not treat that as a date.
        if text[width:width + 1].isdigit():
            continue
        try:
            return datetime.strptime(text[:width], fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    return None
def parse_date(value: Any) -> datetime | None:
    """Parse *value* as a date, returning a UTC-aware ``datetime`` or ``None``.

    ``parse_dt`` is consulted first. If it yields nothing, the whole stripped
    string is matched strictly against ``YYYY``, ``YYYY-MM`` and ``YYYY-MM-DD``.
    """
    parsed = parse_dt(value)
    if parsed:
        return parsed
    raw = str(value or "").strip()
    for pattern in ("%Y", "%Y-%m", "%Y-%m-%d"):
        try:
            candidate = datetime.strptime(raw, pattern)
        except Exception:
            continue
        return candidate.replace(tzinfo=timezone.utc)
    return None
def strip_html(text: str) -> str:
    """Convert an HTML fragment to plain text.

    ``<br>`` tags (any case, optional self-closing slash) become newlines,
    every other tag is removed, HTML entities are decoded and surrounding
    whitespace is trimmed. ``None`` or an empty string yields ``""``.
    """
    # NOTE(review): the first pattern literal was corrupted in the source (the
    # tag text was lost, leaving a raw line break inside the string, which is a
    # syntax error). A <br> matcher is the reconstruction implied by the "\n"
    # replacement -- confirm against version control.
    text = re.sub(r"<br\s*/?>", "\n", text or "", flags=re.I)
    text = re.sub(r"<[^>]+>", "", text)
    return html.unescape(text).strip()
def compact(text: str, limit: int = 700) -> str:
    """Strip HTML from *text*, collapse whitespace and cap it at *limit* chars.

    Text longer than *limit* is cut to ``limit - 1`` characters, right-trimmed
    and terminated with an ellipsis.
    """
    flattened = re.sub(r"\s+", " ", strip_html(text)).strip()
    if len(flattened) > limit:
        return flattened[: limit - 1].rstrip() + "…"
    return flattened
def clean_lastfm_text(text: str) -> str:
    """Remove Last.fm boilerplate tails from *text* and return tidy plain text.

    Everything from "Read more on Last.fm" or "User-contributed text" onwards
    is dropped (case-insensitive, across line breaks); the remainder has its
    HTML stripped and its whitespace collapsed to single spaces.
    """
    tail_flags = re.I | re.S
    trimmed = text or ""
    for boilerplate in (r"Read more on Last\.fm.*$", r"User-contributed text.*$"):
        trimmed = re.sub(boilerplate, "", trimmed, flags=tail_flags)
    return re.sub(r"\s+", " ", strip_html(trimmed)).strip()
def init_state() -> None:
    """Create the state database schema if it does not exist yet.

    Switches the database to WAL journaling, sets a 30 s busy timeout and
    creates the ``sent_events`` and ``cursors`` tables. Access is serialised
    through ``STATE_LOCK``.
    """
    with STATE_LOCK:
        conn = sqlite3.connect(STATE_DB, timeout=30)
        # sqlite3's connection context manager only commits or rolls back; it
        # never closes the connection, so close it explicitly.
        try:
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA busy_timeout=30000")
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS sent_events (
                    kind TEXT NOT NULL,
                    event_key TEXT NOT NULL,
                    payload_json TEXT,
                    created_at TEXT NOT NULL,
                    PRIMARY KEY(kind, event_key)
                )
                """
            )
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS cursors (
                    key TEXT PRIMARY KEY,
                    value TEXT,
                    updated_at TEXT NOT NULL
                )
                """
            )
            conn.commit()
        finally:
            conn.close()
def already_sent(kind: str, key: str) -> bool:
    """Return ``True`` if an event with this *kind* and *key* is recorded.

    Looks the pair up in the ``sent_events`` table of the state database
    while holding ``STATE_LOCK``.
    """
    with STATE_LOCK:
        conn = sqlite3.connect(STATE_DB, timeout=30)
        # The connection's own context manager would not close it, so the
        # handle is released explicitly once the lookup is done.
        try:
            conn.execute("PRAGMA busy_timeout=30000")
            row = conn.execute(
                "SELECT 1 FROM sent_events WHERE kind=? AND event_key=?",
                (kind, key),
            ).fetchone()
        finally:
            conn.close()
    return bool(row)
def mark_sent(kind: str, key: str, payload: dict | None = None) -> None:
    """Record that the event identified by *kind* and *key* has been sent.

    The optional *payload* is stored as JSON next to the current UTC
    timestamp. An already-recorded pair is left untouched (``INSERT OR
    IGNORE``). Access is serialised through ``STATE_LOCK``.
    """
    with STATE_LOCK:
        conn = sqlite3.connect(STATE_DB, timeout=30)
        try:
            conn.execute("PRAGMA busy_timeout=30000")
            # `with conn` commits on success and rolls back on error, but it
            # does not close the connection -- hence the outer try/finally.
            with conn:
                conn.execute(
                    """
                    INSERT OR IGNORE INTO sent_events(kind,event_key,payload_json,created_at)
                    VALUES(?,?,?,?)
                    """,
                    (kind, key, json.dumps(payload or {}, ensure_ascii=False), now_utc().isoformat()),
                )
        finally:
            conn.close()
def get_cursor(key: str) -> str:
    """Return the stored cursor value for *key*, or ``""`` when none exists.

    Reads the ``cursors`` table of the state database while holding
    ``STATE_LOCK``.
    """
    with STATE_LOCK:
        conn = sqlite3.connect(STATE_DB, timeout=30)
        # Closed explicitly: sqlite3's connection context manager only handles
        # the transaction, not the connection lifetime.
        try:
            conn.execute("PRAGMA busy_timeout=30000")
            row = conn.execute("SELECT value FROM cursors WHERE key=?", (key,)).fetchone()
        finally:
            conn.close()
    return row[0] if row else ""
def set_cursor(key: str, value: str) -> None:
    """Store *value* as the cursor for *key*, replacing any previous value.

    The row's ``updated_at`` column is set to the current UTC timestamp.
    Access is serialised through ``STATE_LOCK``.
    """
    with STATE_LOCK:
        conn = sqlite3.connect(STATE_DB, timeout=30)
        try:
            conn.execute("PRAGMA busy_timeout=30000")
            # `with conn` commits on success and rolls back on error, but it
            # does not close the connection -- hence the outer try/finally.
            with conn:
                conn.execute(
                    """
                    INSERT INTO cursors(key,value,updated_at) VALUES(?,?,?)
                    ON CONFLICT(key) DO UPDATE SET value=excluded.value, updated_at=excluded.updated_at
                    """,
                    (key, value, now_utc().isoformat()),
                )
        finally:
            conn.close()
def request_session() -> requests.Session:
    """Build a ``requests`` session, routed through the metadata proxy if set.

    When ``metadata.proxy`` is present in the configuration it is applied to
    both ``http`` and ``https`` traffic of the returned session.
    """
    session = requests.Session()
    metadata_cfg = CFG.get("metadata") or {}
    proxy_url = metadata_cfg.get("proxy") or ""
    if proxy_url:
        session.proxies.update(dict.fromkeys(("http", "https"), proxy_url))
    return session
# Shared session for the Last.fm and MusicBrainz lookups; carries the optional metadata proxy.
HTTP = request_session()
# Memoised Last.fm API key; None means "not looked up yet".
LASTFM_API_KEY_CACHE: str | None = None
class MatrixClient:
    """Small Matrix client-server API wrapper bound to a single room.

    The config mapping must provide ``homeserver``, ``room_id`` and
    ``access_token``; ``user_id`` is optional and only needed for profile
    updates.
    """

    def __init__(self, cfg: dict):
        self.homeserver = cfg["homeserver"].rstrip("/")
        self.room_id = cfg["room_id"]
        self.token = cfg["access_token"]
        self.user_id = cfg.get("user_id") or ""

    @property
    def headers(self) -> dict:
        """Authorization header carrying the bearer access token."""
        return {"Authorization": f"Bearer {self.token}"}

    def set_profile(self) -> None:
        """Apply the display name and avatar from the ``bot`` config section.

        Both steps are skipped when ``user_id`` is empty. A rejected profile
        update is logged rather than raised; a failed avatar upload raises.
        """
        bot_cfg = CFG.get("bot") or {}
        profile_url = f"{self.homeserver}/_matrix/client/v3/profile/{self.user_id}"

        display_name = bot_cfg.get("name")
        if display_name and self.user_id:
            resp = requests.put(
                f"{profile_url}/displayname",
                headers=self.headers,
                json={"displayname": display_name},
                timeout=20,
            )
            if not resp.ok:
                log(f"matrix display name update failed: {resp.status_code} {resp.text[:200]}")

        avatar_path = bot_cfg.get("avatar_path") or ""
        if not avatar_path:
            return
        avatar_file = Path(avatar_path)
        if not (avatar_file.exists() and self.user_id):
            return
        avatar_type = mimetypes.guess_type(avatar_file.name)[0] or "image/jpeg"
        mxc = self.upload_file(avatar_file.read_bytes(), avatar_file.name, avatar_type)
        if not mxc:
            return
        resp = requests.put(
            f"{profile_url}/avatar_url",
            headers=self.headers,
            json={"avatar_url": mxc},
            timeout=20,
        )
        if not resp.ok:
            log(f"matrix avatar update failed: {resp.status_code} {resp.text[:200]}")

    def upload_file(self, data: bytes, filename: str, content_type: str) -> str:
        """Upload *data* to the media endpoint and return its ``content_uri``.

        Raises whatever ``raise_for_status`` raises on an HTTP error.
        """
        upload_headers = dict(self.headers)
        upload_headers["Content-Type"] = content_type
        resp = requests.post(
            f"{self.homeserver}/_matrix/media/v3/upload",
            headers=upload_headers,
            params={"filename": filename},
            data=data,
            timeout=45,
        )
        resp.raise_for_status()
        return resp.json()["content_uri"]

    def _put_room_message(self, content: dict) -> dict:
        """PUT one ``m.room.message`` event with a fresh transaction id."""
        txn = uuid.uuid4().hex
        url = f"{self.homeserver}/_matrix/client/v3/rooms/{self.room_id}/send/m.room.message/{txn}"
        resp = requests.put(url, headers=self.headers, json=content, timeout=30)
        resp.raise_for_status()
        return resp.json()

    def send_text(self, text: str, formatted_html: str | None = None) -> dict:
        """Send a text message, optionally with an HTML-formatted body."""
        content = {"msgtype": "m.text", "body": text}
        if formatted_html:
            content.update(
                {"format": "org.matrix.custom.html", "formatted_body": formatted_html}
            )
        return self._put_room_message(content)

    def send_image_bytes(self, data: bytes, filename: str, content_type: str = "image/jpeg") -> dict:
        """Upload *data* and post it to the room as an ``m.image`` message."""
        mxc = self.upload_file(data, filename, content_type)
        content = {
            "msgtype": "m.image",
            "body": filename,
            "url": mxc,
            "info": {"mimetype": content_type, "size": len(data)},
        }
        return self._put_room_message(content)

    def send_card(self, text: str, formatted_html: str, image: tuple[bytes, str, str] | None = None) -> None:
        """Send an optional image followed by the text message.

        A failure to send the image is logged and does not prevent the text
        from being sent.
        """
        if image:
            try:
                self.send_image_bytes(*image)
            except Exception as e:
                log(f"matrix image send failed: {e}")
        self.send_text(text, formatted_html)
class NavidromeClient:
    """Client for Navidrome's ``/rest/`` API (``subsonic-response`` envelopes).

    The config mapping must provide ``url``, ``username`` and ``password``.
    Credentials are sent as query parameters on every request.
    """

    def __init__(self, cfg: dict):
        self.base_url = cfg["url"].rstrip("/")
        self.username = cfg["username"]
        self.password = cfg["password"]

    def _params(self) -> dict:
        """Return the query parameters attached to every request."""
        return {
            "u": self.username,
            "p": self.password,
            "v": "1.16.1",
            "c": "AoiNotifier",
            "f": "json",
        }

    @staticmethod
    def _redact(text: str) -> str:
        """Mask the value of the ``p`` (password) query parameter in *text*."""
        return re.sub(r"([?&]p=)[^&\s]*", r"\1***", text)

    def get(self, endpoint: str, **params) -> dict:
        """Call ``/rest/<endpoint>`` and return the ``subsonic-response`` dict.

        Raises ``RuntimeError`` when the response status is not ``"ok"``;
        HTTP and transport errors from ``requests`` propagate unchanged.
        """
        # NOTE(review): exceptions raised by requests here can embed the full
        # request URL, which includes the credentials from _params(). Callers
        # that log them should mask the message first.
        r = requests.get(
            f"{self.base_url}/rest/{endpoint}",
            params={**self._params(), **params},
            timeout=35,
        )
        r.raise_for_status()
        data = r.json().get("subsonic-response", {})
        if data.get("status") != "ok":
            raise RuntimeError(f"Navidrome {endpoint} failed: {data}")
        return data

    def ping(self) -> bool:
        """Return ``True`` when the server answers ``ping`` with status ok."""
        try:
            return self.get("ping").get("status") == "ok"
        except Exception:
            return False

    def newest_albums(self, music_folder_id: int, size: int) -> list[dict]:
        """Return up to *size* newest albums of the given music folder."""
        data = self.get("getAlbumList2", type="newest", size=size, musicFolderId=music_folder_id)
        return data.get("albumList2", {}).get("album", []) or []

    def album(self, album_id: str) -> dict:
        """Return the ``album`` object for *album_id* (empty dict if absent)."""
        return self.get("getAlbum", id=album_id).get("album", {}) or {}

    def playlists(self) -> list[dict]:
        """Return the list of playlists (empty list if absent)."""
        data = self.get("getPlaylists")
        return data.get("playlists", {}).get("playlist", []) or []

    def playlist(self, playlist_id: str) -> dict:
        """Return the ``playlist`` object for *playlist_id* (empty dict if absent)."""
        return self.get("getPlaylist", id=playlist_id).get("playlist", {}) or {}

    def cover(self, cover_art: str | None) -> tuple[bytes, str, str] | None:
        """Fetch cover art as ``(bytes, filename, content_type)``.

        Returns ``None`` when *cover_art* is empty or the download fails; a
        failure is logged with the password query parameter masked.
        """
        if not cover_art:
            return None
        try:
            r = requests.get(
                f"{self.base_url}/rest/getCoverArt",
                params={**self._params(), "id": cover_art, "size": 900},
                timeout=35,
            )
            r.raise_for_status()
            ctype = r.headers.get("Content-Type") or "image/jpeg"
            ext = mimetypes.guess_extension(ctype.split(";")[0]) or ".jpg"
            return (r.content, f"cover{ext}", ctype)
        except Exception as e:
            # requests error messages contain the request URL, and the URL
            # carries p=<password>; never write that to the log verbatim.
            log(f"cover fetch failed: {self._redact(str(e))}")
            return None
# Module-wide client instances built from the "matrix" and "navidrome" config sections.
MATRIX = MatrixClient(CFG["matrix"])
NAV = NavidromeClient(CFG["navidrome"])
def metadata_headers() -> dict:
    """Return request headers for metadata lookups.

    The ``User-Agent`` comes from ``metadata.user_agent`` in the
    configuration and falls back to ``AoiNotifier/1.0``.
    """
    metadata_cfg = CFG.get("metadata") or {}
    user_agent = metadata_cfg.get("user_agent") or "AoiNotifier/1.0"
    return {"User-Agent": user_agent, "Accept": "application/json"}
def discovery_setting(key: str) -> str:
    """Read one value from the ``user_settings`` table of the discovery DB.

    The database path comes from ``discovery.db_path`` in the configuration.
    Returns ``""`` when the path is unset, the file is missing, the key is
    absent or the lookup fails (failures are logged).
    """
    db_path = (CFG.get("discovery") or {}).get("db_path") or ""
    if not db_path or not Path(db_path).exists():
        return ""
    try:
        conn = sqlite3.connect(db_path)
        # sqlite3's connection context manager does not close the connection,
        # so release the handle explicitly after the read.
        try:
            row = conn.execute("SELECT value FROM user_settings WHERE key=?", (key,)).fetchone()
        finally:
            conn.close()
    except Exception as e:
        log(f"discovery setting lookup failed for {key}: {e}")
        return ""
    return str(row[0] or "") if row else ""
def lastfm_api_key() -> str:
    """Return the Last.fm API key, resolving and caching it on first use.

    ``metadata.lastfm_api_key`` from the configuration wins; otherwise the
    ``lastfm_api_key`` discovery setting is used. The result, including an
    empty one, is cached in ``LASTFM_API_KEY_CACHE``.
    """
    global LASTFM_API_KEY_CACHE
    if LASTFM_API_KEY_CACHE is None:
        configured = (CFG.get("metadata") or {}).get("lastfm_api_key")
        LASTFM_API_KEY_CACHE = configured or discovery_setting("lastfm_api_key")
    return LASTFM_API_KEY_CACHE
def lastfm_request(method: str, params: dict, timeout: int = 25) -> dict:
    """Call a Last.fm API *method* and return the decoded JSON response.

    Returns ``{}`` when no API key is available, when the HTTP response is
    not ok, or when the request or JSON decoding fails (the latter is
    logged). Entries in *params* override the default query fields.
    """
    key = lastfm_api_key()
    if not key:
        return {}
    query = {"method": method, "api_key": key, "format": "json", "autocorrect": 1}
    query.update(params)
    try:
        resp = HTTP.get("https://ws.audioscrobbler.com/2.0/", params=query, timeout=timeout)
        return resp.json() if resp.ok else {}
    except Exception as e:
        log(f"lastfm {method} failed: {e}")
        return {}
def lastfm_album_tags(artist: str, album: str, limit: int = 6) -> list[str]:
    """Return up to *limit* top Last.fm tag names for an album.

    Only the first *limit* entries of the response are considered; empty
    names, names longer than 32 characters and purely numeric names are
    dropped. Returns ``[]`` when nothing usable is available.
    """
    data = lastfm_request("album.getTopTags", {"artist": artist, "album": album}, timeout=20)
    toptags = data.get("toptags") if isinstance(data, dict) else None
    raw = toptags.get("tag") if isinstance(toptags, dict) else None
    # NOTE(review): Last.fm's JSON is reported to collapse a one-element list
    # into a bare object -- confirm. Normalise it (and any other non-list value)
    # so the slice below cannot raise.
    if isinstance(raw, dict):
        raw = [raw]
    elif not isinstance(raw, list):
        raw = []
    tags = []
    for tag in raw[:limit]:
        if not isinstance(tag, dict):
            continue
        name = str(tag.get("name") or "").strip()
        if name and len(name) <= 32 and not name.isdigit():
            tags.append(name)
    return tags
def lastfm_artist_summary(artist: str) -> str:
    """Return a cleaned Last.fm biography summary for *artist*, or ``""``.

    The Russian-language biography is requested first, then the default
    one; the first non-empty cleaned text wins.
    """
    attempts = ({"artist": artist, "lang": "ru"}, {"artist": artist})
    for query in attempts:
        response = lastfm_request("artist.getinfo", query, timeout=20)
        bio = (response.get("artist") or {}).get("bio") or {}
        text = clean_lastfm_text(bio.get("summary") or bio.get("content") or "")
        if text:
            return text
    return ""
def lastfm_album_search_url(artist: str, album: str) -> str:
    """Return a Last.fm album-search URL for "<artist> <album>"."""
    query = quote_plus(f"{artist} {album}")
    return "https://www.last.fm/search/albums?q=" + query
def bandcamp_album_search_url(artist: str, album: str) -> str:
    """Return a Bandcamp search URL for "<artist> <album>" with ``item_type=a``."""
    query = quote_plus(f"{artist} {album}")
    return "https://bandcamp.com/search?q=" + query + "&item_type=a"
def album_source_links(artist: str, album: str, lastfm_url: str = "") -> list[dict[str, str]]:
    """Return labelled Last.fm and Bandcamp links for an album.

    *lastfm_url* is used for the Last.fm entry when given; otherwise a
    Last.fm search URL is generated. The Bandcamp entry is always a search URL.
    """
    lastfm_target = lastfm_url if lastfm_url else lastfm_album_search_url(artist, album)
    bandcamp_target = bandcamp_album_search_url(artist, album)
    return [
        {"label": "Last.fm", "url": lastfm_target},
        {"label": "Bandcamp", "url": bandcamp_target},
    ]
def lastfm_album_info(artist: str, album: str) -> dict[str, str]:
    """Return ``{"summary", "url"}`` describing an album via Last.fm.

    The album wiki is requested in Russian first, then in the default
    language; the first non-empty cleaned text is returned together with the
    album URL. Without a wiki text the summary is assembled from the album's
    top tags and the artist biography, and ``url`` is empty. Both values are
    empty strings when nothing is found.
    """
    base_query = {"artist": artist, "album": album}
    for query in ({**base_query, "lang": "ru"}, base_query):
        payload = lastfm_request("album.getinfo", query)
        album_node = payload.get("album") or {}
        wiki = album_node.get("wiki") or {}
        text = clean_lastfm_text(wiki.get("summary") or wiki.get("content") or "")
        if text:
            return {"summary": text, "url": album_node.get("url") or ""}

    # No wiki text: fall back to the tag list plus the artist biography.
    tag_names = lastfm_album_tags(artist, album)
    artist_text = lastfm_artist_summary(artist)
    fallback = []
    if tag_names:
        fallback.append("Last.fm описывает релиз тегами: " + ", ".join(tag_names) + ".")
    if artist_text:
        fallback.append(artist_text)
    # Joining an empty list yields "", which covers the nothing-found case.
    return {"summary": " ".join(fallback), "url": ""}
def lastfm_album_summary(artist: str, album: str) -> str:
    """Return only the summary text of ``lastfm_album_info`` for the album."""
    info = lastfm_album_info(artist, album)
    return info.get("summary", "")
def musicbrainz_release_hint(artist: str, album: str) -> dict:
    """Look up the best-matching MusicBrainz release for an album.

    Returns a dict with ``date``, ``country``, ``status`` and ``score`` taken
    from the first search result, or ``{}`` when the lookup is disabled via
    ``metadata.musicbrainz_enabled``, the response is not ok, nothing matches
    or the request fails (failures are logged).
    """
    if not (CFG.get("metadata") or {}).get("musicbrainz_enabled", True):
        return {}

    def phrase(value: str) -> str:
        # Inside a quoted Lucene phrase, backslash and double quote must be
        # escaped; an unescaped quote in a title would end the phrase early
        # and produce a malformed query.
        return str(value).replace("\\", "\\\\").replace('"', '\\"')

    try:
        query = f'artist:"{phrase(artist)}" AND release:"{phrase(album)}"'
        r = HTTP.get(
            "https://musicbrainz.org/ws/2/release/",
            params={"query": query, "fmt": "json", "limit": 3},
            headers=metadata_headers(),
            timeout=25,
        )
        if not r.ok:
            return {}
        releases = r.json().get("releases") or []
        if not releases:
            return {}
        rel = releases[0]
        return {
            "date": rel.get("date") or "",
            "country": rel.get("country") or "",
            "status": rel.get("status") or "",
            "score": rel.get("score") or 0,
        }
    except Exception as e:
        log(f"musicbrainz lookup failed: {e}")
        return {}
def lastfm_tags_for_tracks(tracks: list[dict]) -> Counter:
    """Count Last.fm top tags across the first eight *tracks*.

    For each track (keys ``artist`` and ``track`` or ``title``) the first
    five top tags are lower-cased and tallied; names longer than 32
    characters are skipped. Returns an empty ``Counter`` when no API key is
    configured. Tracks whose lookup fails contribute nothing.
    """
    tags: Counter = Counter()
    if not lastfm_api_key():
        return tags
    for tr in tracks[:8]:
        if not isinstance(tr, dict):
            continue
        # Go through the shared request helper so that failures are logged
        # instead of being silently swallowed here.
        data = lastfm_request(
            "track.getTopTags",
            {
                "artist": tr.get("artist") or "",
                "track": tr.get("track") or tr.get("title") or "",
            },
            timeout=20,
        )
        toptags = data.get("toptags") if isinstance(data, dict) else None
        raw = toptags.get("tag") if isinstance(toptags, dict) else None
        # NOTE(review): Last.fm's JSON is reported to collapse a one-element
        # list into a bare object -- confirm. Normalise so such a tag is kept.
        if isinstance(raw, dict):
            raw = [raw]
        elif not isinstance(raw, list):
            raw = []
        for tag in raw[:5]:
            if not isinstance(tag, dict):
                continue
            name = str(tag.get("name") or "").lower().strip()
            if name and len(name) <= 32:
                tags[name] += 1
    return tags
def album_plain_and_html(album: dict, library_label: str, songs: list[dict]) -> tuple[str, str]:
artist = album.get("artist") or "Unknown artist"
title = album.get("name") or album.get("album") or "Unknown album"
year = album.get("year") or ""
duration = int(album.get("duration") or sum(int(s.get("duration") or 0) for s in songs) or 0)
minutes = duration // 60 if duration else 0
lastfm_info = lastfm_album_info(artist, title)
summary = lastfm_info.get("summary") or ""
links = album_source_links(artist, title, lastfm_info.get("url") or "")
hint = musicbrainz_release_hint(artist, title)
release_date = album.get("created") or album.get("starred") or hint.get("date") or ""
track_lines = []
for idx, song in enumerate(songs[:30], 1):
name = song.get("title") or "Untitled"
dur = int(song.get("duration") or 0)
suffix = f" ({dur // 60}:{dur % 60:02d})" if dur else ""
track_lines.append(f"{idx:02d}. {name}{suffix}")
if len(songs) > 30:
track_lines.append(f"... и еще {len(songs) - 30}")
header = f"🎧 Новый релиз в Navidrome / {library_label}"
facts = [
f"{artist} — {title}" + (f" ({year})" if year else ""),
f"Треков: {len(songs)}" + (f", длительность: {minutes} мин." if minutes else ""),
]
if release_date:
facts.append(f"Дата/добавление: {release_date}")
if hint.get("country") or hint.get("status"):
facts.append("MusicBrainz: " + ", ".join(x for x in [hint.get("country"), hint.get("status")] if x))
if summary:
description_fact = f"Описание: {summary}"
else:
description_fact = "Описание: пока не нашла нормальное описание, оставляю чистый треклист."
facts.append(description_fact)
facts.append("Ссылки: " + ", ".join(f"{link['label']}: {link['url']}" for link in links))
plain = "\n".join([header, "", *facts, "", "━━ ТРЕКЛИСТ ━━", *track_lines])
html_parts = [
f"
{html.escape(description_fact)}
") html_parts.append( "Ссылки: " + ", ".join( f'{html.escape(link["label"])}' for link in links ) + "
" ) html_parts.append("Треклист
{html.escape(summary)}
", "Интересное
{html.escape(artist)} — {html.escape(album)}
", "