Files
platform/gateway/assistant.py
Yusuf Suleman 3f3d290054
All checks were successful
Security Checks / dependency-audit (push) Successful in 13s
Security Checks / secret-scanning (push) Successful in 4s
Security Checks / dockerfile-lint (push) Successful in 4s
fix: remove leftover sys.stderr debug log causing NameError
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 14:00:54 -05:00

1579 lines
61 KiB
Python

"""
Platform Gateway — AI Assistant endpoints (fitness + brain).
Replaces the SvelteKit assistant routes so both web and iOS can use them.
"""
import hashlib
import json
import re
import urllib.request
import urllib.error
from datetime import datetime
from config import (
FITNESS_URL, BRAIN_URL, OPENAI_API_KEY, OPENAI_MODEL,
)
from proxy import proxy_request
# ── Utilities ──
def today_iso():
now = datetime.now()
return now.strftime("%Y-%m-%d")
def to_number(value, fallback=0):
if isinstance(value, (int, float)) and not (isinstance(value, float) and (value != value)):
return value
if isinstance(value, str):
try:
parsed = float(value)
if parsed == parsed: # not NaN
return parsed
except ValueError:
pass
return fallback
def clamp_draft(inp):
if not inp or not isinstance(inp, dict):
inp = {}
meal = inp.get("meal_type")
if meal not in ("breakfast", "lunch", "dinner", "snack"):
meal = "snack"
return {
"food_name": str(inp.get("food_name") or "").strip(),
"meal_type": meal,
"entry_date": str(inp.get("entry_date") or "") or today_iso(),
"quantity": max(to_number(inp.get("quantity"), 1), 0),
"unit": str(inp.get("unit") or "").strip() or "serving",
"calories": max(to_number(inp.get("calories")), 0),
"protein": max(to_number(inp.get("protein")), 0),
"carbs": max(to_number(inp.get("carbs")), 0),
"fat": max(to_number(inp.get("fat")), 0),
"sugar": max(to_number(inp.get("sugar")), 0),
"fiber": max(to_number(inp.get("fiber")), 0),
"note": str(inp.get("note") or "").strip(),
"default_serving_label": str(inp.get("default_serving_label") or "").strip(),
}
def parse_leading_quantity(name):
trimmed = name.strip()
m = re.match(r'^(\d+(?:\.\d+)?)\s+(.+)$', trimmed)
if not m:
return None, trimmed
return float(m.group(1)), m.group(2).strip()
def canonical_food_name(name):
cleaned = re.sub(r'^(?:a|an|the)\s+', '', name.strip(), flags=re.IGNORECASE)
cleaned = re.sub(r'\s+', ' ', cleaned)
parts = cleaned.split()
result = []
for part in parts:
if not part:
continue
if re.match(r'[A-Z]{2,}', part):
result.append(part)
else:
result.append(part[0].upper() + part[1:].lower())
return " ".join(result)
def normalized_food_key(name):
canon = canonical_food_name(name).lower()
canon = re.sub(r'[^a-z0-9\s]', ' ', canon)
canon = re.sub(r'\b(\d+(?:\.\d+)?)\b', ' ', canon)
return re.sub(r'\s+', ' ', canon).strip()
def food_base_unit(draft):
unit = (draft.get("unit") or "serving").strip().lower()
if unit in ("piece", "slice", "cup", "scoop", "serving"):
return unit
return "serving"
def default_serving_name(base_unit):
return f"1 {base_unit}"
def has_material_nutrition_mismatch(draft, matched_food, quantity):
if quantity <= 0:
quantity = 1
draft_per_base = {
"calories": max((draft.get("calories") or 0) / quantity, 0),
"protein": max((draft.get("protein") or 0) / quantity, 0),
"carbs": max((draft.get("carbs") or 0) / quantity, 0),
"fat": max((draft.get("fat") or 0) / quantity, 0),
"sugar": max((draft.get("sugar") or 0) / quantity, 0),
"fiber": max((draft.get("fiber") or 0) / quantity, 0),
}
current_per_base = {
"calories": matched_food.get("calories_per_base") or 0,
"protein": matched_food.get("protein_per_base") or 0,
"carbs": matched_food.get("carbs_per_base") or 0,
"fat": matched_food.get("fat_per_base") or 0,
"sugar": matched_food.get("sugar_per_base") or 0,
"fiber": matched_food.get("fiber_per_base") or 0,
}
for key in draft_per_base:
nxt = draft_per_base[key]
cur = current_per_base[key]
if abs(nxt - cur) >= 5:
return True
if max(nxt, cur) <= 0:
continue
if abs(nxt - cur) / max(nxt, cur) >= 0.12:
return True
return False
def entry_idempotency_key(draft, index=0):
payload = json.dumps({
"index": index,
"food_name": draft.get("food_name") or "",
"meal_type": draft.get("meal_type") or "snack",
"entry_date": draft.get("entry_date") or today_iso(),
"quantity": draft.get("quantity") or 1,
"unit": draft.get("unit") or "serving",
"calories": draft.get("calories") or 0,
"protein": draft.get("protein") or 0,
"carbs": draft.get("carbs") or 0,
"fat": draft.get("fat") or 0,
"sugar": draft.get("sugar") or 0,
"fiber": draft.get("fiber") or 0,
"note": draft.get("note") or "",
}, sort_keys=True)
return hashlib.sha256(payload.encode()).hexdigest()
def has_complete_draft(draft):
return bool(draft.get("food_name")) and bool(draft.get("meal_type")) and isinstance(draft.get("calories"), (int, float))
def has_complete_drafts(drafts):
return isinstance(drafts, list) and len(drafts) > 0 and all(has_complete_draft(d) for d in drafts)
def is_explicit_confirmation(text):
clean = text.strip().lower()
if not clean:
return False
patterns = [
r'^add it[.!]?$', r'^log it[.!]?$', r'^save it[.!]?$',
r'^looks good[.!]?$', r'^looks good add it[.!]?$',
r'^that looks good[.!]?$', r'^that looks good add it[.!]?$',
r'^confirm[.!]?$', r'^go ahead[.!]?$',
r'^yes add it[.!]?$', r'^yes log it[.!]?$', r'^yes save it[.!]?$',
]
return any(re.match(p, clean) for p in patterns)
def is_retry_request(text):
clean = text.strip().lower()
if not clean:
return False
phrases = [
"that's not right", "that is not right", "wrong item", "wrong food",
"wrong meal", "try again", "search again", "guess again", "redo that",
"start over", "that is wrong", "that's wrong",
]
return any(phrase in clean for phrase in phrases)
def draft_for_retry(draft):
return {
"meal_type": draft.get("meal_type"),
"entry_date": draft.get("entry_date") or today_iso(),
"quantity": draft.get("quantity") or 1,
"unit": draft.get("unit") or "serving",
"food_name": "",
"calories": 0, "protein": 0, "carbs": 0, "fat": 0, "sugar": 0, "fiber": 0,
"note": draft.get("note") or "",
"default_serving_label": "",
}
# ── Brain helper functions ──
def is_confirmation(text):
clean = text.strip().lower()
return clean in (
"yes", "yes delete it", "delete it", "confirm",
"yes do it", "do it", "go ahead",
)
def is_undo(text):
return bool(re.match(r'^(undo|undo last change|undo that|revert that)$', text.strip(), re.IGNORECASE))
def wants_new_note(text):
return bool(re.search(r'create (?:a )?new note', text, re.IGNORECASE) or
re.search(r'make (?:that|it) a new note', text, re.IGNORECASE))
def move_target(text):
m = re.search(r'(?:add|move)\s+(?:that|it)\s+to\s+(.+)$', text, re.IGNORECASE)
return m.group(1).strip() if m else None
def wants_delete(text):
return bool(re.search(r'\bdelete\b', text, re.IGNORECASE) and
re.search(r'\b(note|item|that)\b', text, re.IGNORECASE))
def is_explicit_update(text):
return bool(re.search(r'\b(update|edit|change|replace|correct|set)\b', text, re.IGNORECASE))
def is_listing_intent(text):
return bool(re.search(r'\b(what|which|show|list)\b', text, re.IGNORECASE) and
re.search(r'\b(do i have|have i saved|saved|notes|items|books?|pdfs?|links?|documents?|files?)\b', text, re.IGNORECASE))
def normalize_search_seed(text):
t = text.strip()
t = re.sub(r'^what\s+(books?|notes?|pdfs?|links?|documents?|files?)\s+do i have saved\s*', r'\1 ', t, flags=re.IGNORECASE)
t = re.sub(r'^show me\s+(all\s+)?(books?|notes?|pdfs?|links?|documents?|files?)\s*', r'\2 ', t, flags=re.IGNORECASE)
t = re.sub(r'^list\s+(my\s+)?(books?|notes?|pdfs?|links?|documents?|files?)\s*', r'\2 ', t, flags=re.IGNORECASE)
t = re.sub(r'^add note\s+', '', t, flags=re.IGNORECASE)
t = re.sub(r'^add\s+', '', t, flags=re.IGNORECASE)
t = re.sub(r'^save\s+(?:this|that)\s+', '', t, flags=re.IGNORECASE)
t = re.sub(r'^what do i have about\s+', '', t, flags=re.IGNORECASE)
t = re.sub(r'^what have i saved about\s+', '', t, flags=re.IGNORECASE)
t = re.sub(r'^find\s+', '', t, flags=re.IGNORECASE)
t = re.sub(r'^search for\s+', '', t, flags=re.IGNORECASE)
t = re.sub(r'^answer\s+', '', t, flags=re.IGNORECASE)
return t.strip()
def build_candidate_summary(items):
if not items:
return "No candidates found."
lines = []
for i, item in enumerate(items):
snippet = (item.get("raw_content") or item.get("extracted_text") or item.get("summary") or "")
snippet = re.sub(r'\s+', ' ', snippet)[:220]
lines.append(
f"{i+1}. id={item.get('id')}\n"
f"title={item.get('title') or 'Untitled'}\n"
f"type={item.get('type')}\n"
f"folder={item.get('folder') or ''}\n"
f"tags={', '.join(item.get('tags') or [])}\n"
f"snippet={snippet}"
)
return "\n\n".join(lines)
def to_source(item):
return {
"id": item.get("id"),
"title": item.get("title") or "Untitled",
"type": item.get("type"),
"href": f"/brain?item={item.get('id')}",
}
def source_links_from_ids(items, ids):
if not isinstance(ids, list) or not ids:
return []
by_id = {item["id"]: item for item in items if "id" in item}
return [to_source(by_id[i]) for i in ids if i in by_id]
# ── Recent messages helper ──
def recent_messages(messages, limit=10):
if not isinstance(messages, list):
return []
result = []
for m in messages:
if not m or not isinstance(m, dict):
continue
role = "assistant" if m.get("role") == "assistant" else "user"
content = str(m.get("content") or "")[:4000]
if content.strip():
result.append({"role": role, "content": content})
return result[-limit:]
def last_user_message(messages):
for m in reversed(messages):
if m.get("role") == "user":
return m.get("content", "").strip()
return ""
# ── OpenAI helper ──
def call_openai(system_prompt, messages, temperature=0.2, max_tokens=900, image_url=None):
if not OPENAI_API_KEY:
return None
api_messages = [{"role": "system", "content": system_prompt}]
for m in messages:
api_messages.append({"role": m["role"], "content": m["content"]})
if image_url:
latest_text = ""
for m in reversed(messages):
if m["role"] == "user":
latest_text = m["content"]
break
if not latest_text:
latest_text = "Analyze this image."
api_messages.append({
"role": "user",
"content": [
{"type": "text", "text": latest_text},
{"type": "image_url", "image_url": {"url": image_url}},
],
})
payload = json.dumps({
"model": OPENAI_MODEL or "gpt-5.2",
"response_format": {"type": "json_object"},
"temperature": temperature,
"max_completion_tokens": max_tokens,
"messages": api_messages,
}).encode()
req = urllib.request.Request(
"https://api.openai.com/v1/chat/completions",
data=payload,
method="POST",
)
req.add_header("Content-Type", "application/json")
req.add_header("Authorization", f"Bearer {OPENAI_API_KEY}")
try:
with urllib.request.urlopen(req, timeout=60) as resp:
raw = json.loads(resp.read())
content = raw.get("choices", [{}])[0].get("message", {}).get("content")
if isinstance(content, str):
return json.loads(content)
return None
except Exception:
return None
def call_openai_raw(system_prompt, messages, temperature=0.2, max_tokens=900):
"""Like call_openai but returns (ok, parsed_or_error_text)."""
if not OPENAI_API_KEY:
return False, "AI not configured"
api_messages = [{"role": "system", "content": system_prompt}]
for m in messages:
api_messages.append({"role": m["role"], "content": m["content"]})
payload = json.dumps({
"model": OPENAI_MODEL or "gpt-5.2",
"response_format": {"type": "json_object"},
"temperature": temperature,
"max_completion_tokens": max_tokens,
"messages": api_messages,
}).encode()
req = urllib.request.Request(
"https://api.openai.com/v1/chat/completions",
data=payload,
method="POST",
)
req.add_header("Content-Type", "application/json")
req.add_header("Authorization", f"Bearer {OPENAI_API_KEY}")
try:
with urllib.request.urlopen(req, timeout=60) as resp:
raw = json.loads(resp.read())
content = raw.get("choices", [{}])[0].get("message", {}).get("content")
if isinstance(content, str):
return True, json.loads(content)
return False, "Empty response"
except urllib.error.HTTPError as e:
return False, e.read().decode() if e.fp else str(e)
except Exception as e:
return False, str(e)
# ── Internal service calls ──
_current_user = {} # set by handler before calling
def fitness_api(method, path, user_id, body_dict=None):
url = f"{FITNESS_URL}/api{path}"
headers = {
"Content-Type": "application/json",
"X-Gateway-User-Id": str(user_id),
"X-Gateway-User-Name": _current_user.get("display_name") or _current_user.get("username", ""),
}
body = json.dumps(body_dict).encode() if body_dict else None
status, resp_headers, resp_body = proxy_request(url, method, headers, body)
try:
parsed = json.loads(resp_body)
except Exception:
parsed = {}
return status, parsed
def brain_api(method, path, user_id, body_dict=None):
url = f"{BRAIN_URL}/api{path}"
headers = {
"Content-Type": "application/json",
"X-Gateway-User-Id": str(user_id),
}
body = json.dumps(body_dict).encode() if body_dict else None
status, resp_headers, resp_body = proxy_request(url, method, headers, body)
try:
parsed = json.loads(resp_body)
except Exception:
parsed = {}
return status, parsed
# ── Domain detection ──
def is_fitness_intent(text):
return bool(
re.search(r'\b(calories?|protein|carbs?|fat|sugar|fiber|breakfast|lunch|dinner|snack|meal|food|ate|eaten|log|track|entries|macros?)\b', text, re.IGNORECASE)
or re.search(r'\bfor (breakfast|lunch|dinner|snack)\b', text, re.IGNORECASE)
or re.search(r'\bhow many calories do i have left\b', text, re.IGNORECASE)
)
def is_brain_intent(text):
return bool(re.search(
r'\b(note|notes|brain|remember|save this|save that|what do i have|what have i saved|find my|delete (?:that|this|note|item)|update (?:that|this|note)|from my notes)\b',
text, re.IGNORECASE,
))
def detect_domain(messages, state, image_data_url=None, allow_brain=True):
text = last_user_message(messages)
if image_data_url:
return "fitness"
if not text:
return state.get("activeDomain") or "brain"
fitness = is_fitness_intent(text)
brain = is_brain_intent(text) if allow_brain else False
if fitness and not brain:
return "fitness"
if brain and not fitness:
return "brain"
active = state.get("activeDomain")
if active == "fitness" and not brain:
return "fitness"
if active == "brain" and not fitness:
return "brain"
return "fitness" if fitness else "brain"
# ── Fitness: apply draft ──
def should_reuse_resolved(canonical_name, resolved, confidence):
if not resolved or not resolved.get("name"):
return False
if confidence < 0.9:
return False
draft_key = normalized_food_key(canonical_name)
resolved_key = normalized_food_key(resolved["name"])
if not draft_key or not resolved_key:
return False
return draft_key == resolved_key
def apply_draft(user_id, draft, index=0):
parsed_qty, parsed_name = parse_leading_quantity(draft.get("food_name") or "")
entry_quantity = max(
draft.get("quantity") if draft.get("quantity") and draft["quantity"] > 0 else (parsed_qty or 1),
0.1,
)
canonical_name = canonical_food_name(parsed_name or draft.get("food_name") or "Quick add")
base_unit = food_base_unit(draft)
# Resolve food
status, resolve_body = fitness_api("POST", "/foods/resolve", user_id, {
"raw_phrase": canonical_name,
"meal_type": draft.get("meal_type") or "snack",
"entry_date": draft.get("entry_date") or today_iso(),
"source": "assistant",
})
matched_food = None
if status == 200 and should_reuse_resolved(canonical_name, resolve_body.get("matched_food"), to_number(resolve_body.get("confidence"), 0)):
matched_food = resolve_body.get("matched_food")
if not matched_food:
per_base_calories = max((draft.get("calories") or 0) / entry_quantity, 0)
per_base_protein = max((draft.get("protein") or 0) / entry_quantity, 0)
per_base_carbs = max((draft.get("carbs") or 0) / entry_quantity, 0)
per_base_fat = max((draft.get("fat") or 0) / entry_quantity, 0)
per_base_sugar = max((draft.get("sugar") or 0) / entry_quantity, 0)
per_base_fiber = max((draft.get("fiber") or 0) / entry_quantity, 0)
status, created_body = fitness_api("POST", "/foods", user_id, {
"name": canonical_name,
"calories_per_base": per_base_calories,
"protein_per_base": per_base_protein,
"carbs_per_base": per_base_carbs,
"fat_per_base": per_base_fat,
"sugar_per_base": per_base_sugar,
"fiber_per_base": per_base_fiber,
"base_unit": base_unit,
"status": "assistant_created",
"notes": f"Assistant created from chat draft: {draft.get('food_name') or canonical_name}",
"servings": [{
"name": (draft.get("default_serving_label") or "").strip() or default_serving_name(base_unit),
"amount_in_base": 1.0,
"is_default": True,
}],
})
if status < 200 or status >= 300:
return {"ok": False, "status": status, "body": created_body}
matched_food = created_body
elif (matched_food.get("status") in ("assistant_created", "ai_created") and
has_material_nutrition_mismatch(draft, matched_food, entry_quantity)):
update_status, updated_body = fitness_api("PATCH", f"/foods/{matched_food['id']}", user_id, {
"calories_per_base": max((draft.get("calories") or 0) / entry_quantity, 0),
"protein_per_base": max((draft.get("protein") or 0) / entry_quantity, 0),
"carbs_per_base": max((draft.get("carbs") or 0) / entry_quantity, 0),
"fat_per_base": max((draft.get("fat") or 0) / entry_quantity, 0),
"sugar_per_base": max((draft.get("sugar") or 0) / entry_quantity, 0),
"fiber_per_base": max((draft.get("fiber") or 0) / entry_quantity, 0),
})
if 200 <= update_status < 300:
matched_food = updated_body
# Find default serving
serving_id = None
for s in (matched_food.get("servings") or []):
if s.get("is_default"):
serving_id = s.get("id")
break
entry_payload = {
"food_id": matched_food.get("id"),
"quantity": entry_quantity,
"unit": base_unit,
"serving_id": serving_id,
"meal_type": draft.get("meal_type") or "snack",
"entry_date": draft.get("entry_date") or today_iso(),
"entry_method": "assistant",
"source": "assistant",
"note": draft.get("note") or None,
"idempotency_key": entry_idempotency_key(draft, index),
}
food_name = (draft.get("food_name") or "").strip()
if food_name and food_name != canonical_name:
entry_payload["snapshot_food_name_override"] = food_name
status, entry_body = fitness_api("POST", "/entries", user_id, entry_payload)
return {"ok": 200 <= status < 300, "status": status, "body": entry_body}
# ── Fitness: split + resolve bundle ──
def split_input_items(user_id, phrase):
status, body = fitness_api("POST", "/foods/split", user_id, {"phrase": phrase})
if 200 <= status < 300 and isinstance(body.get("items"), list) and body["items"]:
return [str(i).strip() for i in body["items"] if str(i).strip()]
return [p.strip() for p in phrase.split(",") if p.strip()]
def draft_from_resolved_item(resolved, entry_date):
parsed = resolved.get("parsed") or {}
matched_food = resolved.get("matched_food") or None
ai_estimate = resolved.get("ai_estimate") or None
quantity = max(to_number(parsed.get("quantity"), 1), 0.1)
unit = str(parsed.get("unit") or "") or "serving"
meal = parsed.get("meal_type")
if meal not in ("breakfast", "lunch", "dinner", "snack"):
meal = "snack"
calories = protein = carbs = fat = sugar = fiber = 0
if matched_food:
calories = to_number(matched_food.get("calories_per_base")) * quantity
protein = to_number(matched_food.get("protein_per_base")) * quantity
carbs = to_number(matched_food.get("carbs_per_base")) * quantity
fat = to_number(matched_food.get("fat_per_base")) * quantity
sugar = to_number(matched_food.get("sugar_per_base")) * quantity
fiber = to_number(matched_food.get("fiber_per_base")) * quantity
elif ai_estimate:
calories = to_number(ai_estimate.get("calories_per_base")) * quantity
protein = to_number(ai_estimate.get("protein_per_base")) * quantity
carbs = to_number(ai_estimate.get("carbs_per_base")) * quantity
fat = to_number(ai_estimate.get("fat_per_base")) * quantity
sugar = to_number(ai_estimate.get("sugar_per_base")) * quantity
fiber = to_number(ai_estimate.get("fiber_per_base")) * quantity
elif resolved.get("resolution_type") == "quick_add":
calories = to_number(parsed.get("quantity"))
food_name = (
(resolved.get("snapshot_name_override") if isinstance(resolved.get("snapshot_name_override"), str) and resolved["snapshot_name_override"] else None)
or (matched_food.get("name") if matched_food and isinstance(matched_food.get("name"), str) and matched_food["name"] else None)
or (ai_estimate.get("food_name") if ai_estimate and isinstance(ai_estimate.get("food_name"), str) and ai_estimate["food_name"] else None)
or (resolved.get("raw_text") if isinstance(resolved.get("raw_text"), str) and resolved["raw_text"] else None)
or "Quick add"
)
serving_label = ""
if ai_estimate and isinstance(ai_estimate.get("serving_description"), str):
serving_label = ai_estimate["serving_description"]
elif quantity > 0 and unit:
serving_label = f"{quantity} {unit}"
return clamp_draft({
"food_name": food_name,
"meal_type": meal,
"entry_date": entry_date,
"quantity": quantity,
"unit": unit,
"calories": round(calories),
"protein": round(protein),
"carbs": round(carbs),
"fat": round(fat),
"sugar": round(sugar),
"fiber": round(fiber),
"note": resolved.get("note") if isinstance(resolved.get("note"), str) else "",
"default_serving_label": serving_label,
})
def build_draft_bundle(user_id, phrase, entry_date):
parts = split_input_items(user_id, phrase)
if len(parts) < 2:
return None
results = []
for part in parts:
status, body = fitness_api("POST", "/foods/resolve", user_id, {
"raw_phrase": part,
"entry_date": entry_date,
"source": "assistant",
})
if status < 200 or status >= 300:
continue
results.append(draft_from_resolved_item(body, entry_date))
valid = [d for d in results if has_complete_draft(d)]
return valid if valid else None
def revise_draft_bundle(messages, drafts, image_data_url=None):
if not OPENAI_API_KEY or not drafts:
return None
system_prompt = (
"You are revising a bundled food draft inside a fitness app.\n\n"
"Return ONLY JSON like:\n"
'{\n "reply": "short assistant reply",\n'
' "drafts": [\n {\n "food_name": "string",\n'
' "meal_type": "breakfast|lunch|dinner|snack",\n'
' "entry_date": "YYYY-MM-DD",\n "quantity": 1,\n'
' "unit": "serving",\n "calories": 0,\n "protein": 0,\n'
' "carbs": 0,\n "fat": 0,\n "sugar": 0,\n "fiber": 0,\n'
' "note": "",\n "default_serving_label": ""\n }\n ]\n}\n\n'
"Rules:\n"
"- Update only the item or items the user is correcting.\n"
"- Keep untouched items unchanged.\n"
"- If the user says one item is wrong, replace that item without collapsing the bundle into one merged food.\n"
"- Preserve meal and entry date unless the user changes them.\n"
"- Keep replies brief and natural.\n"
"- If a photo is attached, you may use it again for corrections.\n\n"
f"Current bundle:\n{json.dumps(drafts, indent=2)}"
)
parsed = call_openai(system_prompt, messages, temperature=0.2, max_tokens=1000, image_url=image_data_url)
if not parsed:
return None
next_drafts = [clamp_draft(d) for d in parsed.get("drafts", [])] if isinstance(parsed.get("drafts"), list) else []
if not has_complete_drafts(next_drafts):
return None
return {
"reply": parsed.get("reply") or "I updated those items.",
"drafts": next_drafts,
}
# ── Fitness handler ──
def handle_fitness_assistant(handler, body, user):
global _current_user
_current_user = user
try:
data = json.loads(body)
except Exception:
handler._send_json({"error": "Invalid JSON"}, 400)
return
messages_raw = data.get("messages") or []
draft_raw = data.get("draft")
drafts_raw = data.get("drafts")
action = data.get("action", "chat")
image_data_url = data.get("imageDataUrl")
entry_date = data.get("entryDate")
user_id = user["id"]
requested_date = entry_date if isinstance(entry_date, str) and re.match(r'^\d{4}-\d{2}-\d{2}$', entry_date) else today_iso()
current_draft = clamp_draft({
"entry_date": requested_date,
**(draft_raw if isinstance(draft_raw, dict) else {}),
})
current_drafts = []
if isinstance(drafts_raw, list):
for item in drafts_raw:
if item and isinstance(item, dict):
current_drafts.append(clamp_draft({"entry_date": requested_date, **item}))
# ── Apply action ──
if action == "apply":
if has_complete_drafts(current_drafts):
results = [apply_draft(user_id, item, i) for i, item in enumerate(current_drafts)]
failed = next((r for r in results if not r["ok"]), None)
if failed:
handler._send_json({
"reply": "I couldn't add all of those entries yet. Try again in a moment.",
"drafts": current_drafts,
"applied": False,
"error": failed["body"].get("error") or f"Fitness API returned {failed['status']}",
}, 500)
return
handler._send_json({
"reply": f"Added {len(current_drafts)} items to {current_drafts[0].get('meal_type') or 'your log'}.",
"drafts": current_drafts,
"applied": True,
"entries": [r["body"] for r in results],
})
return
if not has_complete_draft(current_draft):
handler._send_json({
"reply": "I still need a food and calories before I can add it.",
"draft": current_draft,
"drafts": current_drafts,
"applied": False,
})
return
result = apply_draft(user_id, current_draft)
if not result["ok"]:
handler._send_json({
"reply": "I couldn't add that entry yet. Try again in a moment.",
"draft": current_draft,
"applied": False,
"error": result["body"].get("error") or f"Fitness API returned {result['status']}",
}, 500)
return
handler._send_json({
"reply": f"Added {current_draft.get('food_name')} to {current_draft.get('meal_type')}.",
"draft": current_draft,
"drafts": [],
"applied": True,
"entry": result["body"],
})
return
# ── Chat action ──
chat = recent_messages(messages_raw, 10)
user_text = last_user_message(chat)
allow_apply = is_explicit_confirmation(user_text)
retry_requested = is_retry_request(user_text)
has_photo = (
isinstance(image_data_url, str)
and image_data_url.startswith("data:image/")
and len(image_data_url) < 8_000_000
)
# Bundle revision
if not allow_apply and len(current_drafts) > 1 and user_text.strip():
revised = revise_draft_bundle(
chat, current_drafts,
image_data_url if has_photo else None,
)
if revised:
handler._send_json({
"reply": revised["reply"],
"drafts": revised["drafts"],
"draft": None,
"applied": False,
})
return
# Multi-item split
if not has_photo and not retry_requested and not allow_apply and user_text.strip():
bundle = build_draft_bundle(user_id, user_text, requested_date)
if bundle and len(bundle) > 1:
meal = bundle[0].get("meal_type") or "snack"
names = ", ".join(d.get("food_name", "") for d in bundle)
handler._send_json({
"reply": f"I split that into {len(bundle)} items for {meal}: {names}. Add them when this looks right.",
"drafts": bundle,
"draft": None,
"applied": False,
})
return
# Main chat with OpenAI
system_prompt = (
"You are a conversational fitness logging assistant inside a personal app.\n\n"
"Your job:\n"
"- read the chat plus the current draft food entry\n"
"- update the draft naturally\n"
"- keep the reply short, plain, and useful\n"
"- do not add an entry on the first food message\n"
'- only set apply_now=true if the latest user message is a pure confirmation like "add it", "log it", "save it", or "looks good add it"\n'
"- never say an item was added, logged, or saved unless apply_now=true for that response\n"
"- if a food photo is attached, identify the likely food, portion, and meal context before drafting\n"
"- if the user says the current guess is wrong, treat that as authoritative and replace the draft instead of defending the previous guess\n\n"
"Return ONLY JSON with this shape:\n"
'{\n'
' "reply": "short assistant reply",\n'
' "draft": {\n'
' "food_name": "string",\n'
' "meal_type": "breakfast|lunch|dinner|snack",\n'
' "entry_date": "YYYY-MM-DD",\n'
' "quantity": 1,\n'
' "unit": "serving",\n'
' "calories": 0,\n'
' "protein": 0,\n'
' "carbs": 0,\n'
' "fat": 0,\n'
' "sugar": 0,\n'
' "fiber": 0,\n'
' "note": "",\n'
' "default_serving_label": ""\n'
' },\n'
' "apply_now": false\n'
'}\n\n'
"Rules:\n"
"- Preserve the current draft unless the user changes something.\n"
"- If the latest user message says the guess is wrong, try again, or search again, do not cling to the old food guess. Replace the draft with a new best guess.\n"
'- If the user says "make it 150 calories", update calories and keep the rest unless another field should obviously move with it.\n'
"- If the user says a meal, move it to that meal.\n"
"- Default meal_type to snack if not specified.\n"
"- Default entry_date to today unless the user specifies another date.\n"
"- Estimate realistic nutrition when needed.\n"
"- Always include sugar and fiber estimates, even if rough.\n"
'- Keep food_name human and concise, for example "2 boiled eggs".\n'
"- If the photo is a nutrition label or package nutrition panel, extract the serving size from the label and put it in default_serving_label.\n"
"- If the user gives a product name plus a nutrition label photo, use the label values instead of guessing from memory.\n"
"- If the photo is ambiguous, briefly mention up to 2 likely alternatives instead of sounding overconfident.\n"
"- After drafting or revising, summarize the draft with calories and key macros, then ask for confirmation.\n"
"- If a photo is unclear, say what you think it is and mention the uncertainty briefly.\n"
"- If retrying from a photo, use the image again and produce a different best guess or ask one short clarifying question.\n"
"- If details are missing, ask one short follow-up instead of overexplaining.\n"
"- When the user confirms, keep the reply brief because the app will add the entry next.\n\n"
f"Today is {today_iso()}.\n"
f"Current draft:\n{json.dumps(draft_for_retry(current_draft) if retry_requested else current_draft, indent=2)}"
)
if not OPENAI_API_KEY:
handler._send_json({
"reply": "Assistant is not configured yet.",
"draft": current_draft,
"drafts": current_drafts,
"applied": False,
}, 500)
return
parsed = call_openai(
system_prompt, chat, temperature=0.2, max_tokens=900,
image_url=image_data_url if has_photo else None,
)
if not parsed:
handler._send_json({
"reply": "The assistant did not respond cleanly.",
"draft": current_draft,
"drafts": current_drafts,
"applied": False,
}, 500)
return
next_draft = clamp_draft(parsed.get("draft") or current_draft)
if parsed.get("apply_now") and allow_apply and has_complete_draft(next_draft):
result = apply_draft(user_id, next_draft)
if result["ok"]:
handler._send_json({
"reply": f"Added {next_draft.get('food_name')} to {next_draft.get('meal_type')}.",
"draft": next_draft,
"drafts": [],
"applied": True,
"entry": result["body"],
})
return
reply = parsed.get("reply") or (
f"{next_draft.get('food_name')} is staged at {round(next_draft.get('calories') or 0)} calories."
if has_complete_draft(next_draft)
else "I updated the draft."
)
handler._send_json({
"reply": reply,
"draft": next_draft,
"drafts": [],
"applied": False,
})
# ── Brain: internal API helpers ──
def brain_search(user_id, q, limit=5):
status, body = brain_api("POST", "/search/hybrid", user_id, {"q": q, "limit": limit})
if 200 <= status < 300 and isinstance(body.get("items"), list):
return body["items"]
return []
def create_brain_note(user_id, content, title=None):
payload = {"type": "note", "raw_content": content}
if title and title.strip():
payload["title"] = title.strip()
status, body = brain_api("POST", "/items", user_id, payload)
return {"ok": 200 <= status < 300, "status": status, "body": body}
def update_brain_item(user_id, item_id, updates):
status, body = brain_api("PATCH", f"/items/{item_id}", user_id, updates)
return {"ok": 200 <= status < 300, "status": status, "body": body}
def append_to_item(user_id, item_id, content):
status, body = brain_api("POST", f"/items/{item_id}/additions", user_id, {
"content": content, "source": "assistant", "kind": "append",
})
return {"ok": 200 <= status < 300, "status": status, "body": body}
def delete_addition(user_id, item_id, addition_id):
status, _ = brain_api("DELETE", f"/items/{item_id}/additions/{addition_id}", user_id)
return 200 <= status < 300
def delete_item(user_id, item_id):
status, _ = brain_api("DELETE", f"/items/{item_id}", user_id)
return 200 <= status < 300
def get_item(user_id, item_id):
status, body = brain_api("GET", f"/items/{item_id}", user_id)
if 200 <= status < 300 and body:
return body
return None
# ── Brain: search + decide ──
def derive_search_queries(messages, user_text):
normalized = normalize_search_seed(user_text)
fallback = [normalized or user_text.strip()]
fallback = [q for q in fallback if q]
system_prompt = (
"You extract concise retrieval queries for a personal knowledge base.\n\n"
"Return ONLY JSON:\n"
'{\n "queries": ["query one", "query two"]\n}\n\n'
"Rules:\n"
"- Return 1 to 3 short search queries.\n"
"- Focus on the underlying topic, not chat filler.\n"
'- For "what do I have about X", include just X.\n'
"- For advice/tips, you may infer a likely broader topic if strongly implied.\n"
"- Keep each query short, usually 1 to 5 words.\n"
"- Do not include punctuation-heavy sentences.\n"
"- Do not include explanatory text."
)
parsed = call_openai(system_prompt, messages[-6:] + [{"role": "user", "content": f"Search intent: {user_text}"}], temperature=0.1, max_tokens=200)
if not parsed:
return fallback
queries = parsed.get("queries")
if isinstance(queries, list):
queries = [str(q).strip() for q in queries if str(q).strip()][:3]
return queries if queries else fallback
return fallback
def collect_candidates(user_id, messages, user_text, limit=8):
queries = derive_search_queries(messages, user_text)
merged = {}
for q in queries:
items = brain_search(user_id, q, limit)
for item in items:
if item.get("id") not in merged:
merged[item["id"]] = item
if not merged:
for item in brain_search(user_id, user_text, limit):
if item.get("id") not in merged:
merged[item["id"]] = item
return list(merged.values())[:limit]
def decide_action(messages, candidates, state, listing_intent=False):
system_prompt = (
"You are the Brain assistant inside a personal app.\n\n"
"You help the user save notes naturally, append thoughts to existing items, answer questions from saved notes, and choose whether to create a new note.\n\n"
"Return ONLY JSON with this shape:\n"
'{\n'
' "action": "append_existing" | "create_new_note" | "update_existing" | "answer" | "list_items" | "delete_target",\n'
' "reply": "short reply preview",\n'
' "target_item_id": "optional item id",\n'
' "formatted_content": "text to append or create",\n'
' "create_title": "short AI-generated title when creating a new note",\n'
' "answer": "short answer when action=answer",\n'
' "source_item_ids": ["id1", "id2"],\n'
' "match_confidence": "high" | "low"\n'
'}\n\n'
"Rules:\n"
"- Use existing items only when the topical match is strong.\n"
"- If the match is weak or ambiguous, create a new note.\n"
"- The user prefers speed: do not ask which note to use.\n"
"- If the user explicitly says update, change, edit, replace, set, or correct, prefer update_existing when one strong existing note clearly matches.\n"
"- If the user is asking to list what they have saved, prefer list_items instead of answer.\n"
"- For short tips/advice, format as bullets when natural.\n"
"- Only fix spelling and grammar. Do not rewrite the meaning.\n"
"- For questions, answer briefly and cite up to 3 source ids.\n"
"- For list_items, return a concise list-style reply and cite up to 12 source ids.\n"
"- For delete requests, choose the most likely target and set action=delete_target.\n"
"- Never choose append_existing unless target_item_id is one of the candidate ids.\n"
"- Never choose update_existing unless target_item_id is one of the candidate ids.\n"
"- If all candidates are weak, set action=create_new_note and match_confidence=low.\n"
f"- The current assistant state is only for context:\n{json.dumps(state, indent=2)}\n"
f"- listing_intent={listing_intent}\n"
)
ok, result = call_openai_raw(
system_prompt,
messages + [{"role": "user", "content": f"Candidate items:\n{build_candidate_summary(candidates)}"}],
temperature=0.2, max_tokens=1100,
)
if not ok:
raise Exception(result)
return result
def rewrite_existing_item_content(messages, target, user_instruction):
existing = (target.get("raw_content") or target.get("extracted_text") or "").strip()
if not existing:
raise Exception("Target item has no editable content.")
system_prompt = (
"You edit an existing personal note in place.\n\n"
"Return ONLY JSON:\n"
'{\n "updated_content": "full updated note body"\n}\n\n'
"Rules:\n"
"- Apply the user's requested update to the existing note.\n"
"- Preserve the note's structure and wording as much as possible.\n"
"- Make the smallest correct change needed.\n"
"- Do not append a new line when the user clearly wants an existing value updated.\n"
"- Only fix spelling and grammar where needed for the changed text.\n"
"- Return the full updated note body, not a diff."
)
ok, result = call_openai_raw(
system_prompt,
list(messages[-8:]) + [{
"role": "user",
"content": f"Target title: {target.get('title') or 'Untitled'}\n\nExisting note:\n{existing}\n\nInstruction: {user_instruction}",
}],
temperature=0.1, max_tokens=900,
)
if not ok:
raise Exception(result)
updated = (result.get("updated_content") or "").strip()
if not updated:
raise Exception("Updated content was empty.")
return updated
# ── Brain handler ──
def handle_brain_assistant(handler, body, user):
try:
data = json.loads(body)
except Exception:
handler._send_json({"error": "Invalid JSON"}, 400)
return
if not OPENAI_API_KEY:
handler._send_json({"error": "Assistant is not configured."}, 500)
return
messages_raw = data.get("messages") or []
state = data.get("state") or {}
if not isinstance(state, dict):
state = {}
user_id = user["id"]
chat = recent_messages(messages_raw, 12)
user_text = last_user_message(chat)
if not user_text:
handler._send_json({
"reply": "Tell me what to save, find, answer, or delete.",
"state": state,
"sources": [],
})
return
# ── Pending delete confirmation ──
pending_delete = state.get("pendingDelete")
if pending_delete and is_confirmation(user_text):
ok = delete_item(user_id, pending_delete["itemId"])
if not ok:
handler._send_json({
"reply": f"I couldn't delete \"{pending_delete['itemTitle']}\" yet.",
"state": state,
"sources": [],
}, 500)
return
handler._send_json({
"reply": f"Deleted \"{pending_delete['itemTitle']}\".",
"state": {},
"sources": [],
})
return
# ── Undo ──
last_mutation = state.get("lastMutation")
if last_mutation and is_undo(user_text):
if last_mutation.get("type") == "append" and last_mutation.get("additionId"):
ok = delete_addition(user_id, last_mutation["itemId"], last_mutation["additionId"])
handler._send_json({
"reply": f"Undid the change in \"{last_mutation['itemTitle']}\"." if ok else "I could not undo that change yet.",
"state": {} if ok else state,
"sources": [{
"id": last_mutation["itemId"],
"title": last_mutation["itemTitle"],
"type": "note",
"href": f"/brain?item={last_mutation['itemId']}",
}] if ok else [],
})
return
if last_mutation.get("type") == "create" and last_mutation.get("createdItemId"):
ok = delete_item(user_id, last_mutation["createdItemId"])
handler._send_json({
"reply": f"Removed \"{last_mutation['itemTitle']}\"." if ok else "I could not undo that note creation yet.",
"state": {} if ok else state,
"sources": [],
})
return
if last_mutation.get("type") == "update" and last_mutation.get("previousRawContent") is not None:
restored = update_brain_item(user_id, last_mutation["itemId"], {
"raw_content": last_mutation["previousRawContent"],
})
handler._send_json({
"reply": f"Undid the update in \"{last_mutation['itemTitle']}\"." if restored["ok"] else "I could not undo that update yet.",
"state": {} if restored["ok"] else state,
"sources": [{
"id": last_mutation["itemId"],
"title": last_mutation["itemTitle"],
"type": "note",
"href": f"/brain?item={last_mutation['itemId']}",
}] if restored["ok"] else [],
})
return
# ── Make new note instead ──
if last_mutation and wants_new_note(user_text):
content = last_mutation.get("content", "")
if last_mutation.get("type") == "append" and last_mutation.get("additionId"):
delete_addition(user_id, last_mutation["itemId"], last_mutation["additionId"])
try:
title_decision = decide_action(
[{"role": "user", "content": f"Create a concise title for this note:\n{content}"}],
[], state,
)
except Exception:
title_decision = {"create_title": "New Note"}
created = create_brain_note(user_id, content, title_decision.get("create_title"))
if not created["ok"]:
handler._send_json({"reply": "I could not create the new note yet.", "state": state, "sources": []}, 500)
return
created_item = created["body"]
title = created_item.get("title") or title_decision.get("create_title") or "New note"
handler._send_json({
"reply": f"Created \"{title}\".",
"state": {
"lastMutation": {
"type": "create",
"itemId": created_item.get("id"),
"createdItemId": created_item.get("id"),
"itemTitle": title,
"content": content,
},
},
"sources": [to_source(created_item)],
})
return
# ── Move/retarget ──
retarget = move_target(user_text) if last_mutation else None
if last_mutation and retarget:
if last_mutation.get("type") == "append" and last_mutation.get("additionId"):
delete_addition(user_id, last_mutation["itemId"], last_mutation["additionId"])
elif last_mutation.get("type") == "create" and last_mutation.get("createdItemId"):
delete_item(user_id, last_mutation["createdItemId"])
candidates = collect_candidates(user_id, chat, retarget, 8)
target = candidates[0] if candidates else None
if not target:
created = create_brain_note(user_id, last_mutation["content"], retarget)
if not created["ok"]:
handler._send_json({"reply": "I could not move that into a new note yet.", "state": state, "sources": []}, 500)
return
created_item = created["body"]
handler._send_json({
"reply": f"Created \"{created_item.get('title') or retarget}\".",
"state": {
"lastMutation": {
"type": "create",
"itemId": created_item.get("id"),
"createdItemId": created_item.get("id"),
"itemTitle": created_item.get("title") or retarget,
"content": last_mutation["content"],
},
},
"sources": [to_source(created_item)],
})
return
appended = append_to_item(user_id, target["id"], last_mutation["content"])
if not appended["ok"]:
handler._send_json({
"reply": f"I couldn't move that to \"{target.get('title') or 'that item'}\" yet.",
"state": state,
"sources": [],
}, 500)
return
handler._send_json({
"reply": f"Moved it to \"{target.get('title') or 'Untitled'}\".",
"state": {
"lastMutation": {
"type": "append",
"itemId": target["id"],
"itemTitle": target.get("title") or "Untitled",
"additionId": appended["body"].get("id"),
"content": last_mutation["content"],
},
},
"sources": [to_source(target)],
})
return
# ── Listing / explicit update / delete shortcut ──
listing_intent = is_listing_intent(user_text)
candidates = collect_candidates(user_id, chat, user_text, 24 if listing_intent else 8)
# Explicit update shortcut
if is_explicit_update(user_text) and candidates and (candidates[0].get("raw_content") or candidates[0].get("extracted_text")):
target = candidates[0]
try:
updated_content = rewrite_existing_item_content(chat, target, user_text)
updated = update_brain_item(user_id, target["id"], {"raw_content": updated_content})
if not updated["ok"]:
handler._send_json({
"reply": f"I couldn't update \"{target.get('title') or 'Untitled'}\" yet.",
"state": state, "sources": [],
}, 500)
return
handler._send_json({
"reply": f"Updated \"{target.get('title') or 'Untitled'}\".",
"state": {
"lastMutation": {
"type": "update",
"itemId": target["id"],
"itemTitle": target.get("title") or "Untitled",
"content": updated_content,
"previousRawContent": target.get("raw_content") or target.get("extracted_text") or "",
},
},
"sources": [to_source(target)],
})
return
except Exception as e:
handler._send_json({
"reply": f"I couldn't update \"{target.get('title') or 'Untitled'}\" yet.",
"state": state,
"sources": [to_source(target)],
"error": str(e),
}, 500)
return
# Delete shortcut
if wants_delete(user_text) and not state.get("pendingDelete"):
target = candidates[0] if candidates else None
if not target:
handler._send_json({"reply": "I couldn't find a note to delete.", "state": state, "sources": []})
return
handler._send_json({
"reply": f"Delete \"{target.get('title') or 'Untitled'}\"?",
"state": {
**state,
"pendingDelete": {
"itemId": target["id"],
"itemTitle": target.get("title") or "Untitled",
},
},
"sources": [to_source(target)],
})
return
# ── AI decision ──
try:
decision = decide_action(chat, candidates, state, listing_intent)
except Exception as e:
handler._send_json({
"reply": "The Brain assistant did not respond cleanly.",
"state": state,
"sources": [],
"error": str(e),
}, 500)
return
action_type = decision.get("action")
if action_type == "answer":
handler._send_json({
"reply": decision.get("answer") or decision.get("reply") or "I found a few relevant notes.",
"state": state,
"sources": source_links_from_ids(candidates, decision.get("source_item_ids")),
})
return
if action_type == "list_items":
handler._send_json({
"reply": decision.get("answer") or decision.get("reply") or "Here are the matching items I found.",
"state": state,
"sources": source_links_from_ids(candidates, decision.get("source_item_ids")),
})
return
if action_type == "delete_target" and decision.get("target_item_id"):
target = None
for c in candidates:
if c.get("id") == decision["target_item_id"]:
target = c
break
if not target:
target = get_item(user_id, decision["target_item_id"])
if not target:
handler._send_json({"reply": "I couldn't find that note to delete.", "state": state, "sources": []})
return
handler._send_json({
"reply": f"Delete \"{target.get('title') or 'Untitled'}\"?",
"state": {
**state,
"pendingDelete": {
"itemId": target["id"],
"itemTitle": target.get("title") or "Untitled",
},
},
"sources": [to_source(target)],
})
return
if action_type == "update_existing" and decision.get("target_item_id") and decision.get("match_confidence") == "high":
target = None
for c in candidates:
if c.get("id") == decision["target_item_id"]:
target = c
break
if not target:
target = get_item(user_id, decision["target_item_id"])
if target and (target.get("raw_content") or target.get("extracted_text")):
try:
updated_content = rewrite_existing_item_content(chat, target, user_text)
updated = update_brain_item(user_id, target["id"], {"raw_content": updated_content})
if not updated["ok"]:
handler._send_json({
"reply": f"I couldn't update \"{target.get('title') or 'Untitled'}\" yet.",
"state": state, "sources": [],
}, 500)
return
handler._send_json({
"reply": f"Updated \"{target.get('title') or 'Untitled'}\".",
"state": {
"lastMutation": {
"type": "update",
"itemId": target["id"],
"itemTitle": target.get("title") or "Untitled",
"content": updated_content,
"previousRawContent": target.get("raw_content") or target.get("extracted_text") or "",
},
},
"sources": [to_source(target)],
})
return
except Exception as e:
handler._send_json({
"reply": f"I couldn't update \"{target.get('title') or 'Untitled'}\" yet.",
"state": state,
"sources": [to_source(target)] if target else [],
"error": str(e),
}, 500)
return
content = (decision.get("formatted_content") or "").strip() or user_text
if action_type == "append_existing" and decision.get("target_item_id") and decision.get("match_confidence") == "high":
target = None
for c in candidates:
if c.get("id") == decision["target_item_id"]:
target = c
break
if not target:
target = get_item(user_id, decision["target_item_id"])
if target:
appended = append_to_item(user_id, target["id"], content)
if not appended["ok"]:
handler._send_json({
"reply": f"I couldn't add that to \"{target.get('title') or 'Untitled'}\" yet.",
"state": state, "sources": [],
}, 500)
return
handler._send_json({
"reply": f"Added to \"{target.get('title') or 'Untitled'}\".",
"state": {
"lastMutation": {
"type": "append",
"itemId": target["id"],
"itemTitle": target.get("title") or "Untitled",
"additionId": appended["body"].get("id"),
"content": content,
},
},
"sources": [to_source(target)],
})
return
# ── Fall through: create new note ──
created = create_brain_note(user_id, content, decision.get("create_title"))
if not created["ok"]:
handler._send_json({"reply": "I could not create the note yet.", "state": state, "sources": []}, 500)
return
created_item = created["body"]
title = created_item.get("title") or decision.get("create_title") or "New note"
handler._send_json({
"reply": f"Created \"{title}\".",
"state": {
"lastMutation": {
"type": "create",
"itemId": created_item.get("id"),
"createdItemId": created_item.get("id"),
"itemTitle": title,
"content": content,
},
},
"sources": [to_source(created_item)],
})
# ── Unified dispatcher ──
def handle_assistant(handler, body, user):
try:
data = json.loads(body)
except Exception:
handler._send_json({"error": "Invalid JSON"}, 400)
return
messages_raw = data.get("messages") or []
state_raw = data.get("state") or {}
if not isinstance(state_raw, dict):
state_raw = {}
image_data_url = data.get("imageDataUrl")
entry_date = data.get("entryDate")
action = data.get("action", "chat")
allow_brain = bool(data.get("allowBrain", True))
# Disable brain for user "madiha"
if user.get("username") == "madiha":
allow_brain = False
chat = recent_messages(messages_raw, 16)
unified_state = state_raw
domain = detect_domain(chat, unified_state, image_data_url, allow_brain) if allow_brain else "fitness"
if domain == "fitness":
fitness_state = unified_state.get("fitnessState") or {}
inner_body = json.dumps({
"action": action,
"messages": messages_raw,
"draft": fitness_state.get("draft") if isinstance(fitness_state, dict) and "draft" in fitness_state else None,
"drafts": fitness_state.get("drafts") if isinstance(fitness_state, dict) and "drafts" in fitness_state else [],
"entryDate": entry_date,
"imageDataUrl": image_data_url,
}).encode()
# Call fitness handler directly, capturing the response
result = _call_fitness(user, inner_body)
else:
brain_state = unified_state.get("brainState") or {}
inner_body = json.dumps({
"messages": messages_raw,
"state": brain_state,
}).encode()
result = _call_brain(user, inner_body)
if result is None:
handler._send_json({"error": "Internal assistant error"}, 500)
return
response_data = result.get("data", {})
response_status = result.get("status", 200)
# Merge unified state
if domain == "fitness":
merged_state = {
"activeDomain": domain,
"fitnessState": {
"draft": response_data.get("draft"),
"drafts": response_data.get("drafts") if isinstance(response_data.get("drafts"), list) else [],
},
"brainState": unified_state.get("brainState") or {},
}
else:
merged_state = {
"activeDomain": domain,
"fitnessState": unified_state.get("fitnessState") or {},
"brainState": response_data.get("state") or {},
}
output = {**response_data, "domain": domain, "state": merged_state}
handler._send_json(output, response_status)
class _CaptureHandler:
"""Lightweight mock handler that captures _send_json calls."""
def __init__(self):
self.result = None
def _send_json(self, data, status=200):
self.result = {"data": data, "status": status}
def _call_fitness(user, body):
cap = _CaptureHandler()
handle_fitness_assistant(cap, body, user)
return cap.result
def _call_brain(user, body):
cap = _CaptureHandler()
handle_brain_assistant(cap, body, user)
return cap.result