feat: rebuild iOS app from API audit + new podcast/media service
iOS App (complete rebuild): - Audited all fitness API endpoints against live responses - Models match exact API field names (snapshot_ prefixes, UUID strings) - FoodEntry uses computed properties (foodName, calories, etc.) wrapping snapshot fields - Flexible Int/Double decoding for all numeric fields - AI assistant with raw JSON state management (JSONSerialization, not Codable) - Home dashboard with custom background, frosted glass calorie widget - Fitness: Today/Templates/Goals/Foods tabs - Food search with recent + all sections - Meal sections with colored accent bars, swipe to delete - 120fps ProMotion, iOS 17+ @Observable Podcast/Media Service: - FastAPI backend for podcast RSS + local audiobook folders - Shows, episodes, playback progress, queue management - RSS feed fetching with feedparser + ETag support - Local folder scanning with mutagen for audio metadata - HTTP Range streaming for local audio files - Playback events logging (play/pause/seek/complete) - Reuses brain's PostgreSQL + Redis - media_ prefixed tables Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
298
services/media/app/worker/tasks.py
Normal file
298
services/media/app/worker/tasks.py
Normal file
@@ -0,0 +1,298 @@
|
||||
"""Worker tasks — feed fetching, local folder scanning, scheduling loop."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from time import mktime
|
||||
|
||||
import feedparser
|
||||
import httpx
|
||||
from sqlalchemy import create_engine, select, text
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
|
||||
from app.config import DATABASE_URL_SYNC, FEED_FETCH_INTERVAL, AUDIO_EXTENSIONS
|
||||
|
||||
# Worker-wide logging: timestamp, level, and logger name on every line.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
log = logging.getLogger(__name__)

# Sync engine for worker
# Small pool is deliberate: the worker runs one sequential refresh loop
# (see run_scheduler), so it never needs many concurrent connections.
engine = create_engine(DATABASE_URL_SYNC, pool_size=5, max_overflow=2)
SessionLocal = sessionmaker(bind=engine)
|
||||
|
||||
|
||||
def _parse_duration(value: str) -> int | None:
|
||||
if not value:
|
||||
return None
|
||||
parts = value.strip().split(":")
|
||||
try:
|
||||
if len(parts) == 3:
|
||||
return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
|
||||
elif len(parts) == 2:
|
||||
return int(parts[0]) * 60 + int(parts[1])
|
||||
else:
|
||||
return int(float(parts[0]))
|
||||
except (ValueError, IndexError):
|
||||
return None
|
||||
|
||||
|
||||
def fetch_feed(feed_url: str, etag: str | None = None, last_modified: str | None = None):
    """Fetch and parse an RSS feed synchronously.

    Sends conditional-request headers (If-None-Match / If-Modified-Since)
    when cached ETag / Last-Modified validators are supplied, so unchanged
    feeds cost only a 304 round trip.

    Returns:
        (feed, etag, last_modified) on success, where feed is the
        feedparser result and the validators come from the response
        headers (may be None if the server omits them).
        (None, None, None) when the server answers 304 Not Modified.

    Raises:
        httpx.HTTPStatusError: for non-2xx responses other than 304.
        httpx.HTTPError: for transport-level failures.
    """
    headers: dict[str, str] = {}
    if etag:
        headers["If-None-Match"] = etag
    if last_modified:
        headers["If-Modified-Since"] = last_modified

    with httpx.Client(timeout=30, follow_redirects=True) as client:
        resp = client.get(feed_url, headers=headers)

    # 304 means our cached copy is current — signal "nothing new" to the caller.
    if resp.status_code == 304:
        return None, None, None

    resp.raise_for_status()
    feed = feedparser.parse(resp.text)
    return feed, resp.headers.get("ETag"), resp.headers.get("Last-Modified")
|
||||
|
||||
|
||||
def refresh_rss_show(session: Session, show_id: str, feed_url: str, etag: str, last_modified: str, user_id: str):
|
||||
"""Refresh a single RSS show, inserting new episodes."""
|
||||
try:
|
||||
feed, new_etag, new_lm = fetch_feed(feed_url, etag, last_modified)
|
||||
except Exception as e:
|
||||
log.error("Failed to fetch feed %s: %s", feed_url, e)
|
||||
return 0
|
||||
|
||||
if feed is None:
|
||||
log.debug("Feed %s not modified", feed_url)
|
||||
return 0
|
||||
|
||||
# Update show metadata
|
||||
session.execute(
|
||||
text("""
|
||||
UPDATE media_shows
|
||||
SET etag = :etag, last_modified = :lm, last_fetched_at = NOW()
|
||||
WHERE id = :id
|
||||
"""),
|
||||
{"etag": new_etag, "lm": new_lm, "id": show_id},
|
||||
)
|
||||
|
||||
# Get existing guids
|
||||
rows = session.execute(
|
||||
text("SELECT guid FROM media_episodes WHERE show_id = :sid"),
|
||||
{"sid": show_id},
|
||||
).fetchall()
|
||||
existing_guids = {r[0] for r in rows}
|
||||
|
||||
new_count = 0
|
||||
for entry in feed.entries:
|
||||
audio_url = None
|
||||
file_size = None
|
||||
for enc in getattr(entry, "enclosures", []):
|
||||
if enc.get("type", "").startswith("audio/") or enc.get("href", "").split("?")[0].endswith(
|
||||
(".mp3", ".m4a", ".ogg", ".opus")
|
||||
):
|
||||
audio_url = enc.get("href")
|
||||
file_size = int(enc.get("length", 0)) or None
|
||||
break
|
||||
if not audio_url:
|
||||
for link in getattr(entry, "links", []):
|
||||
if link.get("type", "").startswith("audio/"):
|
||||
audio_url = link.get("href")
|
||||
file_size = int(link.get("length", 0)) or None
|
||||
break
|
||||
if not audio_url:
|
||||
continue
|
||||
|
||||
guid = getattr(entry, "id", None) or audio_url
|
||||
if guid in existing_guids:
|
||||
continue
|
||||
|
||||
duration = None
|
||||
itunes_duration = getattr(entry, "itunes_duration", None)
|
||||
if itunes_duration:
|
||||
duration = _parse_duration(str(itunes_duration))
|
||||
|
||||
published = None
|
||||
if hasattr(entry, "published_parsed") and entry.published_parsed:
|
||||
try:
|
||||
published = datetime.fromtimestamp(mktime(entry.published_parsed))
|
||||
except (TypeError, ValueError, OverflowError):
|
||||
pass
|
||||
|
||||
ep_artwork = None
|
||||
itunes_image = getattr(entry, "itunes_image", None)
|
||||
if itunes_image and isinstance(itunes_image, dict):
|
||||
ep_artwork = itunes_image.get("href")
|
||||
|
||||
session.execute(
|
||||
text("""
|
||||
INSERT INTO media_episodes
|
||||
(id, show_id, user_id, title, description, audio_url,
|
||||
duration_seconds, file_size_bytes, published_at, guid, artwork_url)
|
||||
VALUES
|
||||
(:id, :show_id, :user_id, :title, :desc, :audio_url,
|
||||
:dur, :size, :pub, :guid, :art)
|
||||
ON CONFLICT (show_id, guid) DO NOTHING
|
||||
"""),
|
||||
{
|
||||
"id": str(uuid.uuid4()),
|
||||
"show_id": show_id,
|
||||
"user_id": user_id,
|
||||
"title": getattr(entry, "title", None),
|
||||
"desc": getattr(entry, "summary", None),
|
||||
"audio_url": audio_url,
|
||||
"dur": duration,
|
||||
"size": file_size,
|
||||
"pub": published,
|
||||
"guid": guid,
|
||||
"art": ep_artwork,
|
||||
},
|
||||
)
|
||||
new_count += 1
|
||||
|
||||
session.commit()
|
||||
if new_count:
|
||||
log.info("Show %s: added %d new episodes", show_id, new_count)
|
||||
return new_count
|
||||
|
||||
|
||||
def refresh_local_show(session: Session, show_id: str, local_path: str, user_id: str):
    """Re-scan a local folder for new audio files.

    Args:
        session: open sync SQLAlchemy session (committed here when new
            files are found).
        show_id: media_shows.id as a string.
        local_path: folder to scan (non-recursive).
        user_id: owner, stamped on each inserted episode row.

    Returns:
        Number of newly inserted file-episodes (0 when the path is missing).
    """
    if not os.path.isdir(local_path):
        log.warning("Local path does not exist: %s", local_path)
        return 0

    # Import once per scan instead of once per file; degrade gracefully
    # (no duration/tag metadata) when mutagen is unavailable.
    try:
        from mutagen import File as MutagenFile
    except ImportError:
        MutagenFile = None

    # Known guids let us skip files we already ingested.
    rows = session.execute(
        text("SELECT guid FROM media_episodes WHERE show_id = :sid"),
        {"sid": show_id},
    ).fetchall()
    existing_guids = {r[0] for r in rows}

    new_count = 0
    for fname in sorted(os.listdir(local_path)):
        ext = os.path.splitext(fname)[1].lower()
        if ext not in AUDIO_EXTENSIONS:
            continue

        fpath = os.path.join(local_path, fname)
        if not os.path.isfile(fpath):
            continue

        # A local file's stable identity is its path.
        guid = f"local:{fpath}"
        if guid in existing_guids:
            continue

        # Filename (sans extension) is the fallback title if tags are absent.
        title = os.path.splitext(fname)[0]
        duration = None
        # One stat() call serves both size and mtime (was a getsize + stat pair).
        stat_result = os.stat(fpath)
        file_size = stat_result.st_size
        published = datetime.fromtimestamp(stat_result.st_mtime)

        if MutagenFile is not None:
            try:
                audio = MutagenFile(fpath)
                if audio and audio.info:
                    duration = int(audio.info.length)
                if audio and audio.tags:
                    # Try generic "title", ID3 (TIT2), and MP4 (\xa9nam) keys.
                    for tag_key in ("title", "TIT2", "\xa9nam"):
                        tag_val = audio.tags.get(tag_key)
                        if tag_val:
                            title = str(tag_val[0]) if isinstance(tag_val, list) else str(tag_val)
                            break
            except Exception:
                # Metadata extraction is best-effort; keep the filename title.
                pass

        session.execute(
            text("""
                INSERT INTO media_episodes
                    (id, show_id, user_id, title, audio_url,
                     duration_seconds, file_size_bytes, published_at, guid)
                VALUES
                    (:id, :show_id, :user_id, :title, :audio_url,
                     :dur, :size, :pub, :guid)
                ON CONFLICT (show_id, guid) DO NOTHING
            """),
            {
                "id": str(uuid.uuid4()),
                "show_id": show_id,
                "user_id": user_id,
                "title": title,
                "audio_url": fpath,
                "dur": duration,
                "size": file_size,
                "pub": published,
                "guid": guid,
            },
        )
        new_count += 1

    if new_count:
        session.execute(
            text("UPDATE media_shows SET last_fetched_at = NOW() WHERE id = :id"),
            {"id": show_id},
        )
        session.commit()
        log.info("Local show %s: added %d new files", show_id, new_count)
    return new_count
|
||||
|
||||
|
||||
def refresh_all_shows():
    """Refresh every registered show — RSS podcasts and local folders alike.

    Opens its own session, dispatches each show to the matching refresher,
    and rolls back / logs on any unexpected error so the caller's loop
    keeps running.
    """
    session = SessionLocal()
    try:
        show_rows = session.execute(
            text("SELECT id, user_id, feed_url, local_path, show_type, etag, last_modified FROM media_shows")
        ).fetchall()

        added = 0
        for show_id, owner, feed_url, local_path, show_type, cached_etag, cached_lm in show_rows:
            if show_type == "podcast" and feed_url:
                added += refresh_rss_show(session, str(show_id), feed_url, cached_etag, cached_lm, owner)
            elif show_type == "local" and local_path:
                added += refresh_local_show(session, str(show_id), local_path, owner)

        if added:
            log.info("Feed refresh complete: %d new episodes total", added)
    except Exception:
        log.exception("Error during feed refresh")
        session.rollback()
    finally:
        session.close()
|
||||
|
||||
|
||||
def run_scheduler():
    """Simple scheduler loop — refresh feeds at the configured interval.

    Blocks forever; intended as the worker process entry point. Waits up
    to ~30s for the database before starting, then refreshes all shows
    every FEED_FETCH_INTERVAL seconds, logging (never propagating) errors
    so one bad pass cannot kill the worker.
    """
    log.info("Media worker started — refresh interval: %ds", FEED_FETCH_INTERVAL)

    # Wait for DB to be ready (10 attempts, 3s apart).
    for attempt in range(10):
        try:
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            break
        except Exception:
            log.info("Waiting for database... (attempt %d)", attempt + 1)
            time.sleep(3)
    else:
        # Previously the code fell through silently after exhausting all
        # attempts; make that state visible before entering the loop.
        log.warning("Database not reachable after 10 attempts — starting loop anyway")

    while True:
        try:
            refresh_all_shows()
        except Exception:
            log.exception("Scheduler loop error")

        time.sleep(FEED_FETCH_INTERVAL)


if __name__ == "__main__":
    run_scheduler()
|
||||
Reference in New Issue
Block a user