iOS App (complete rebuild): - Audited all fitness API endpoints against live responses - Models match exact API field names (snapshot_ prefixes, UUID strings) - FoodEntry uses computed properties (foodName, calories, etc.) wrapping snapshot fields - Flexible Int/Double decoding for all numeric fields - AI assistant with raw JSON state management (JSONSerialization, not Codable) - Home dashboard with custom background, frosted glass calorie widget - Fitness: Today/Templates/Goals/Foods tabs - Food search with recent + all sections - Meal sections with colored accent bars, swipe to delete - 120fps ProMotion, iOS 17+ @Observable Podcast/Media Service: - FastAPI backend for podcast RSS + local audiobook folders - Shows, episodes, playback progress, queue management - RSS feed fetching with feedparser + ETag support - Local folder scanning with mutagen for audio metadata - HTTP Range streaming for local audio files - Playback events logging (play/pause/seek/complete) - Reuses brain's PostgreSQL + Redis - media_ prefixed tables Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
299 lines · 9.5 KiB · Python
"""Worker tasks — feed fetching, local folder scanning, scheduling loop."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import time
|
|
import uuid
|
|
from datetime import datetime
|
|
from time import mktime
|
|
|
|
import feedparser
|
|
import httpx
|
|
from sqlalchemy import create_engine, select, text
|
|
from sqlalchemy.orm import Session, sessionmaker
|
|
|
|
from app.config import DATABASE_URL_SYNC, FEED_FETCH_INTERVAL, AUDIO_EXTENSIONS
|
|
|
|
# Worker-process logging: timestamped, leveled lines via the root logger.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
log = logging.getLogger(__name__)

# Sync engine for worker — blocking SQLAlchemy with a small pool
# (5 persistent connections + 2 overflow). DATABASE_URL_SYNC suggests the
# API side uses a separate async URL; confirm in app.config.
engine = create_engine(DATABASE_URL_SYNC, pool_size=5, max_overflow=2)
# Session factory bound to the sync engine; refresh_all_shows opens one
# session per refresh cycle.
SessionLocal = sessionmaker(bind=engine)
|
|
|
|
|
|
def _parse_duration(value: str) -> int | None:
|
|
if not value:
|
|
return None
|
|
parts = value.strip().split(":")
|
|
try:
|
|
if len(parts) == 3:
|
|
return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
|
|
elif len(parts) == 2:
|
|
return int(parts[0]) * 60 + int(parts[1])
|
|
else:
|
|
return int(float(parts[0]))
|
|
except (ValueError, IndexError):
|
|
return None
|
|
|
|
|
|
def fetch_feed(feed_url: str, etag: str | None = None, last_modified: str | None = None):
    """Fetch and parse an RSS feed synchronously.

    Sends If-None-Match / If-Modified-Since when cached validators are
    supplied, so unchanged feeds cost a 304 instead of a full download.

    Args:
        feed_url: URL of the RSS/Atom feed.
        etag: Cached ETag validator from a previous fetch, if any.
        last_modified: Cached Last-Modified validator, if any.

    Returns:
        (parsed_feed, new_etag, new_last_modified); all three are None
        when the server answers 304 Not Modified.

    Raises:
        httpx.HTTPStatusError: for non-2xx, non-304 responses.
        httpx.HTTPError: on connection/timeout failures.
    """
    headers = {}
    if etag:
        headers["If-None-Match"] = etag
    if last_modified:
        headers["If-Modified-Since"] = last_modified

    with httpx.Client(timeout=30, follow_redirects=True) as client:
        resp = client.get(feed_url, headers=headers)

    if resp.status_code == 304:
        return None, None, None

    resp.raise_for_status()
    feed = feedparser.parse(resp.text)
    new_etag = resp.headers.get("ETag")
    new_lm = resp.headers.get("Last-Modified")
    return feed, new_etag, new_lm
|
|
|
|
|
|
def _coerce_size(raw) -> int | None:
    """Parse an enclosure byte-length defensively.

    Real-world feeds frequently publish length="" or non-numeric junk;
    a bare int() would raise ValueError and abort the whole refresh.
    Returns None for zero, empty, or unparseable values.
    """
    try:
        return int(raw) or None
    except (TypeError, ValueError):
        return None


def _entry_audio(entry) -> tuple:
    """Locate the audio URL (and advertised size) for one feed entry.

    Prefers enclosures whose MIME type is audio/* or whose URL path ends
    in a known audio extension; falls back to audio-typed <link> elements.
    Returns (audio_url, file_size_bytes), either of which may be None.
    """
    audio_url = None
    file_size = None
    for enc in getattr(entry, "enclosures", []):
        if enc.get("type", "").startswith("audio/") or enc.get("href", "").split("?")[0].endswith(
            (".mp3", ".m4a", ".ogg", ".opus")
        ):
            audio_url = enc.get("href")
            file_size = _coerce_size(enc.get("length", 0))
            break
    if not audio_url:
        for link in getattr(entry, "links", []):
            if link.get("type", "").startswith("audio/"):
                audio_url = link.get("href")
                file_size = _coerce_size(link.get("length", 0))
                break
    return audio_url, file_size


def refresh_rss_show(session: Session, show_id: str, feed_url: str, etag: str | None, last_modified: str | None, user_id: str) -> int:
    """Refresh a single RSS show, inserting new episodes.

    Fetches the feed (honoring ETag/Last-Modified), updates the show's
    caching metadata, and inserts any episodes whose guid is not already
    stored.

    Returns:
        Number of newly inserted episodes; 0 on fetch failure or 304.
    """
    try:
        feed, new_etag, new_lm = fetch_feed(feed_url, etag, last_modified)
    except Exception as e:
        # One bad feed must never break the whole refresh cycle.
        log.error("Failed to fetch feed %s: %s", feed_url, e)
        return 0

    if feed is None:  # 304 Not Modified
        log.debug("Feed %s not modified", feed_url)
        return 0

    # Record fresh cache validators + fetch timestamp on the show row.
    session.execute(
        text("""
            UPDATE media_shows
            SET etag = :etag, last_modified = :lm, last_fetched_at = NOW()
            WHERE id = :id
        """),
        {"etag": new_etag, "lm": new_lm, "id": show_id},
    )

    # Known guids for this show — used to skip already-stored episodes.
    rows = session.execute(
        text("SELECT guid FROM media_episodes WHERE show_id = :sid"),
        {"sid": show_id},
    ).fetchall()
    existing_guids = {r[0] for r in rows}

    new_count = 0
    for entry in feed.entries:
        audio_url, file_size = _entry_audio(entry)
        if not audio_url:
            continue  # no playable audio — skip the entry

        guid = getattr(entry, "id", None) or audio_url
        if guid in existing_guids:
            continue
        # Track within this run too, so a feed repeating a guid does not
        # inflate new_count (the DB upsert already ignores the duplicate).
        existing_guids.add(guid)

        duration = None
        itunes_duration = getattr(entry, "itunes_duration", None)
        if itunes_duration:
            duration = _parse_duration(str(itunes_duration))

        published = None
        if hasattr(entry, "published_parsed") and entry.published_parsed:
            try:
                published = datetime.fromtimestamp(mktime(entry.published_parsed))
            except (TypeError, ValueError, OverflowError):
                pass  # malformed date — leave published_at NULL

        ep_artwork = None
        itunes_image = getattr(entry, "itunes_image", None)
        if itunes_image and isinstance(itunes_image, dict):
            ep_artwork = itunes_image.get("href")

        session.execute(
            text("""
                INSERT INTO media_episodes
                (id, show_id, user_id, title, description, audio_url,
                 duration_seconds, file_size_bytes, published_at, guid, artwork_url)
                VALUES
                (:id, :show_id, :user_id, :title, :desc, :audio_url,
                 :dur, :size, :pub, :guid, :art)
                ON CONFLICT (show_id, guid) DO NOTHING
            """),
            {
                "id": str(uuid.uuid4()),
                "show_id": show_id,
                "user_id": user_id,
                "title": getattr(entry, "title", None),
                "desc": getattr(entry, "summary", None),
                "audio_url": audio_url,
                "dur": duration,
                "size": file_size,
                "pub": published,
                "guid": guid,
                "art": ep_artwork,
            },
        )
        new_count += 1

    session.commit()
    if new_count:
        log.info("Show %s: added %d new episodes", show_id, new_count)
    return new_count
|
|
|
|
|
|
def refresh_local_show(session: Session, show_id: str, local_path: str, user_id: str) -> int:
    """Re-scan a local folder for new audio files.

    Each previously-unseen audio file (by extension) becomes an episode
    row; title/duration come from mutagen tags when readable, otherwise
    the filename and NULL. guid is "local:<absolute path>".

    Returns:
        Number of new episode rows inserted (0 if the path is missing).
    """
    if not os.path.isdir(local_path):
        log.warning("Local path does not exist: %s", local_path)
        return 0

    # mutagen is optional; hoisted out of the per-file loop (the original
    # re-imported it for every file). Without it we still index files,
    # just with filename titles and no duration.
    try:
        from mutagen import File as MutagenFile
    except ImportError:
        MutagenFile = None

    rows = session.execute(
        text("SELECT guid FROM media_episodes WHERE show_id = :sid"),
        {"sid": show_id},
    ).fetchall()
    existing_guids = {r[0] for r in rows}

    new_count = 0
    for fname in sorted(os.listdir(local_path)):
        ext = os.path.splitext(fname)[1].lower()
        if ext not in AUDIO_EXTENSIONS:
            continue

        fpath = os.path.join(local_path, fname)
        if not os.path.isfile(fpath):
            continue

        guid = f"local:{fpath}"
        if guid in existing_guids:
            continue

        title = os.path.splitext(fname)[0]
        duration = None

        # Single stat() covers both size and mtime (the original called
        # getsize() and stat() separately — two syscalls per file).
        st = os.stat(fpath)
        file_size = st.st_size
        published = datetime.fromtimestamp(st.st_mtime)

        if MutagenFile is not None:
            try:
                audio = MutagenFile(fpath)
                if audio and audio.info:
                    duration = int(audio.info.length)
                if audio and audio.tags:
                    # Common title tags: generic, ID3 (TIT2), MP4 (\xa9nam).
                    for tag_key in ("title", "TIT2", "\xa9nam"):
                        tag_val = audio.tags.get(tag_key)
                        if tag_val:
                            title = str(tag_val[0]) if isinstance(tag_val, list) else str(tag_val)
                            break
            except Exception:
                # Metadata is best-effort — an unreadable file still gets indexed.
                pass

        session.execute(
            text("""
                INSERT INTO media_episodes
                (id, show_id, user_id, title, audio_url,
                 duration_seconds, file_size_bytes, published_at, guid)
                VALUES
                (:id, :show_id, :user_id, :title, :audio_url,
                 :dur, :size, :pub, :guid)
                ON CONFLICT (show_id, guid) DO NOTHING
            """),
            {
                "id": str(uuid.uuid4()),
                "show_id": show_id,
                "user_id": user_id,
                "title": title,
                "audio_url": fpath,
                "dur": duration,
                "size": file_size,
                "pub": published,
                "guid": guid,
            },
        )
        new_count += 1

    if new_count:
        session.execute(
            text("UPDATE media_shows SET last_fetched_at = NOW() WHERE id = :id"),
            {"id": show_id},
        )
        session.commit()
        log.info("Local show %s: added %d new files", show_id, new_count)
    return new_count
|
|
|
|
|
|
def refresh_all_shows():
    """Refresh every registered show — podcast feeds and local folders alike."""
    session = SessionLocal()
    try:
        show_rows = session.execute(
            text("SELECT id, user_id, feed_url, local_path, show_type, etag, last_modified FROM media_shows")
        ).fetchall()

        added = 0
        for show_id, owner, feed_url, folder, kind, cached_etag, cached_lm in show_rows:
            # Dispatch on show type; rows missing their source are skipped.
            if kind == "podcast" and feed_url:
                added += refresh_rss_show(session, str(show_id), feed_url, cached_etag, cached_lm, owner)
            elif kind == "local" and folder:
                added += refresh_local_show(session, str(show_id), folder, owner)

        if added:
            log.info("Feed refresh complete: %d new episodes total", added)
    except Exception:
        # Top-level boundary: log with traceback and discard partial work.
        log.exception("Error during feed refresh")
        session.rollback()
    finally:
        session.close()
|
|
|
|
|
|
def run_scheduler():
    """Simple scheduler loop — refresh feeds at configured interval.

    Blocks forever: waits (bounded) for the database, then calls
    refresh_all_shows() every FEED_FETCH_INTERVAL seconds. All per-cycle
    errors are logged and swallowed so the loop never dies.
    """
    log.info("Media worker started — refresh interval: %ds", FEED_FETCH_INTERVAL)

    # Wait for DB to be ready: up to 10 probes, 3 s apart.
    for attempt in range(10):
        try:
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            break
        except Exception:
            log.info("Waiting for database... (attempt %d)", attempt + 1)
            time.sleep(3)
    else:
        # Original fell through silently after 10 failures; make the
        # degraded start visible. refresh_all_shows handles later errors.
        log.warning("Database still unreachable after 10 attempts; starting loop anyway")

    while True:
        try:
            refresh_all_shows()
        except Exception:
            log.exception("Scheduler loop error")

        time.sleep(FEED_FETCH_INTERVAL)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the blocking scheduler loop forever.
    run_scheduler()
|