""" Platform Gateway — AI Assistant endpoints (fitness + brain). Replaces the SvelteKit assistant routes so both web and iOS can use them. """ import hashlib import json import re import urllib.request import urllib.error from datetime import datetime from config import ( FITNESS_URL, BRAIN_URL, OPENAI_API_KEY, OPENAI_MODEL, ) from proxy import proxy_request # ── Utilities ── def today_iso(): now = datetime.now() return now.strftime("%Y-%m-%d") def to_number(value, fallback=0): if isinstance(value, (int, float)) and not (isinstance(value, float) and (value != value)): return value if isinstance(value, str): try: parsed = float(value) if parsed == parsed: # not NaN return parsed except ValueError: pass return fallback def clamp_draft(inp): if not inp or not isinstance(inp, dict): inp = {} meal = inp.get("meal_type") if meal not in ("breakfast", "lunch", "dinner", "snack"): meal = "snack" return { "food_name": str(inp.get("food_name") or "").strip(), "meal_type": meal, "entry_date": str(inp.get("entry_date") or "") or today_iso(), "quantity": max(to_number(inp.get("quantity"), 1), 0), "unit": str(inp.get("unit") or "").strip() or "serving", "calories": max(to_number(inp.get("calories")), 0), "protein": max(to_number(inp.get("protein")), 0), "carbs": max(to_number(inp.get("carbs")), 0), "fat": max(to_number(inp.get("fat")), 0), "sugar": max(to_number(inp.get("sugar")), 0), "fiber": max(to_number(inp.get("fiber")), 0), "note": str(inp.get("note") or "").strip(), "default_serving_label": str(inp.get("default_serving_label") or "").strip(), } def parse_leading_quantity(name): trimmed = name.strip() m = re.match(r'^(\d+(?:\.\d+)?)\s+(.+)$', trimmed) if not m: return None, trimmed return float(m.group(1)), m.group(2).strip() def canonical_food_name(name): cleaned = re.sub(r'^(?:a|an|the)\s+', '', name.strip(), flags=re.IGNORECASE) cleaned = re.sub(r'\s+', ' ', cleaned) parts = cleaned.split() result = [] for part in parts: if not part: continue if re.match(r'[A-Z]{2,}', part): result.append(part) else: result.append(part[0].upper() + part[1:].lower()) return " ".join(result) def normalized_food_key(name): canon = canonical_food_name(name).lower() canon = re.sub(r'[^a-z0-9\s]', ' ', canon) canon = re.sub(r'\b(\d+(?:\.\d+)?)\b', ' ', canon) return re.sub(r'\s+', ' ', canon).strip() def food_base_unit(draft): unit = (draft.get("unit") or "serving").strip().lower() if unit in ("piece", "slice", "cup", "scoop", "serving"): return unit return "serving" def default_serving_name(base_unit): return f"1 {base_unit}" def has_material_nutrition_mismatch(draft, matched_food, quantity): if quantity <= 0: quantity = 1 draft_per_base = { "calories": max((draft.get("calories") or 0) / quantity, 0), "protein": max((draft.get("protein") or 0) / quantity, 0), "carbs": max((draft.get("carbs") or 0) / quantity, 0), "fat": max((draft.get("fat") or 0) / quantity, 0), "sugar": max((draft.get("sugar") or 0) / quantity, 0), "fiber": max((draft.get("fiber") or 0) / quantity, 0), } current_per_base = { "calories": matched_food.get("calories_per_base") or 0, "protein": matched_food.get("protein_per_base") or 0, "carbs": matched_food.get("carbs_per_base") or 0, "fat": matched_food.get("fat_per_base") or 0, "sugar": matched_food.get("sugar_per_base") or 0, "fiber": matched_food.get("fiber_per_base") or 0, } for key in draft_per_base: nxt = draft_per_base[key] cur = current_per_base[key] if abs(nxt - cur) >= 5: return True if max(nxt, cur) <= 0: continue if abs(nxt - cur) / max(nxt, cur) >= 0.12: return True return False def entry_idempotency_key(draft, index=0): payload = json.dumps({ "index": index, "food_name": draft.get("food_name") or "", "meal_type": draft.get("meal_type") or "snack", "entry_date": draft.get("entry_date") or today_iso(), "quantity": draft.get("quantity") or 1, "unit": draft.get("unit") or "serving", "calories": draft.get("calories") or 0, "protein": draft.get("protein") or 0, "carbs": draft.get("carbs") or 0, "fat": draft.get("fat") or 0, "sugar": draft.get("sugar") or 0, "fiber": draft.get("fiber") or 0, "note": draft.get("note") or "", }, sort_keys=True) return hashlib.sha256(payload.encode()).hexdigest() def has_complete_draft(draft): return bool(draft.get("food_name")) and bool(draft.get("meal_type")) and isinstance(draft.get("calories"), (int, float)) def has_complete_drafts(drafts): return isinstance(drafts, list) and len(drafts) > 0 and all(has_complete_draft(d) for d in drafts) def is_explicit_confirmation(text): clean = text.strip().lower() if not clean: return False patterns = [ r'^add it[.!]?$', r'^log it[.!]?$', r'^save it[.!]?$', r'^looks good[.!]?$', r'^looks good add it[.!]?$', r'^that looks good[.!]?$', r'^that looks good add it[.!]?$', r'^confirm[.!]?$', r'^go ahead[.!]?$', r'^yes add it[.!]?$', r'^yes log it[.!]?$', r'^yes save it[.!]?$', ] return any(re.match(p, clean) for p in patterns) def is_retry_request(text): clean = text.strip().lower() if not clean: return False phrases = [ "that's not right", "that is not right", "wrong item", "wrong food", "wrong meal", "try again", "search again", "guess again", "redo that", "start over", "that is wrong", "that's wrong", ] return any(phrase in clean for phrase in phrases) def draft_for_retry(draft): return { "meal_type": draft.get("meal_type"), "entry_date": draft.get("entry_date") or today_iso(), "quantity": draft.get("quantity") or 1, "unit": draft.get("unit") or "serving", "food_name": "", "calories": 0, "protein": 0, "carbs": 0, "fat": 0, "sugar": 0, "fiber": 0, "note": draft.get("note") or "", "default_serving_label": "", } # ── Brain helper functions ── def is_confirmation(text): clean = text.strip().lower() return clean in ( "yes", "yes delete it", "delete it", "confirm", "yes do it", "do it", "go ahead", ) def is_undo(text): return bool(re.match(r'^(undo|undo last change|undo that|revert that)$', text.strip(), re.IGNORECASE)) def wants_new_note(text): return bool(re.search(r'create (?:a )?new note', text, re.IGNORECASE) or re.search(r'make (?:that|it) a new note', text, re.IGNORECASE)) def move_target(text): m = re.search(r'(?:add|move)\s+(?:that|it)\s+to\s+(.+)$', text, re.IGNORECASE) return m.group(1).strip() if m else None def wants_delete(text): return bool(re.search(r'\bdelete\b', text, re.IGNORECASE) and re.search(r'\b(note|item|that)\b', text, re.IGNORECASE)) def is_explicit_update(text): return bool(re.search(r'\b(update|edit|change|replace|correct|set)\b', text, re.IGNORECASE)) def is_listing_intent(text): return bool(re.search(r'\b(what|which|show|list)\b', text, re.IGNORECASE) and re.search(r'\b(do i have|have i saved|saved|notes|items|books?|pdfs?|links?|documents?|files?)\b', text, re.IGNORECASE)) def normalize_search_seed(text): t = text.strip() t = re.sub(r'^what\s+(books?|notes?|pdfs?|links?|documents?|files?)\s+do i have saved\s*', r'\1 ', t, flags=re.IGNORECASE) t = re.sub(r'^show me\s+(all\s+)?(books?|notes?|pdfs?|links?|documents?|files?)\s*', r'\2 ', t, flags=re.IGNORECASE) t = re.sub(r'^list\s+(my\s+)?(books?|notes?|pdfs?|links?|documents?|files?)\s*', r'\2 ', t, flags=re.IGNORECASE) t = re.sub(r'^add note\s+', '', t, flags=re.IGNORECASE) t = re.sub(r'^add\s+', '', t, flags=re.IGNORECASE) t = re.sub(r'^save\s+(?:this|that)\s+', '', t, flags=re.IGNORECASE) t = re.sub(r'^what do i have about\s+', '', t, flags=re.IGNORECASE) t = re.sub(r'^what have i saved about\s+', '', t, flags=re.IGNORECASE) t = re.sub(r'^find\s+', '', t, flags=re.IGNORECASE) t = re.sub(r'^search for\s+', '', t, flags=re.IGNORECASE) t = re.sub(r'^answer\s+', '', t, flags=re.IGNORECASE) return t.strip() def build_candidate_summary(items): if not items: return "No candidates found." lines = [] for i, item in enumerate(items): snippet = (item.get("raw_content") or item.get("extracted_text") or item.get("summary") or "") snippet = re.sub(r'\s+', ' ', snippet)[:220] lines.append( f"{i+1}. id={item.get('id')}\n" f"title={item.get('title') or 'Untitled'}\n" f"type={item.get('type')}\n" f"folder={item.get('folder') or ''}\n" f"tags={', '.join(item.get('tags') or [])}\n" f"snippet={snippet}" ) return "\n\n".join(lines) def to_source(item): return { "id": item.get("id"), "title": item.get("title") or "Untitled", "type": item.get("type"), "href": f"/brain?item={item.get('id')}", } def source_links_from_ids(items, ids): if not isinstance(ids, list) or not ids: return [] by_id = {item["id"]: item for item in items if "id" in item} return [to_source(by_id[i]) for i in ids if i in by_id] # ── Recent messages helper ── def recent_messages(messages, limit=10): if not isinstance(messages, list): return [] result = [] for m in messages: if not m or not isinstance(m, dict): continue role = "assistant" if m.get("role") == "assistant" else "user" content = str(m.get("content") or "")[:4000] if content.strip(): result.append({"role": role, "content": content}) return result[-limit:] def last_user_message(messages): for m in reversed(messages): if m.get("role") == "user": return m.get("content", "").strip() return "" # ── OpenAI helper ── def call_openai(system_prompt, messages, temperature=0.2, max_tokens=900, image_url=None): if not OPENAI_API_KEY: return None api_messages = [{"role": "system", "content": system_prompt}] for m in messages: api_messages.append({"role": m["role"], "content": m["content"]}) if image_url: latest_text = "" for m in reversed(messages): if m["role"] == "user": latest_text = m["content"] break if not latest_text: latest_text = "Analyze this image." api_messages.append({ "role": "user", "content": [ {"type": "text", "text": latest_text}, {"type": "image_url", "image_url": {"url": image_url}}, ], }) payload = json.dumps({ "model": OPENAI_MODEL or "gpt-5.2", "response_format": {"type": "json_object"}, "temperature": temperature, "max_completion_tokens": max_tokens, "messages": api_messages, }).encode() req = urllib.request.Request( "https://api.openai.com/v1/chat/completions", data=payload, method="POST", ) req.add_header("Content-Type", "application/json") req.add_header("Authorization", f"Bearer {OPENAI_API_KEY}") try: with urllib.request.urlopen(req, timeout=60) as resp: raw = json.loads(resp.read()) content = raw.get("choices", [{}])[0].get("message", {}).get("content") if isinstance(content, str): return json.loads(content) return None except Exception: return None def call_openai_raw(system_prompt, messages, temperature=0.2, max_tokens=900): """Like call_openai but returns (ok, parsed_or_error_text).""" if not OPENAI_API_KEY: return False, "AI not configured" api_messages = [{"role": "system", "content": system_prompt}] for m in messages: api_messages.append({"role": m["role"], "content": m["content"]}) payload = json.dumps({ "model": OPENAI_MODEL or "gpt-5.2", "response_format": {"type": "json_object"}, "temperature": temperature, "max_completion_tokens": max_tokens, "messages": api_messages, }).encode() req = urllib.request.Request( "https://api.openai.com/v1/chat/completions", data=payload, method="POST", ) req.add_header("Content-Type", "application/json") req.add_header("Authorization", f"Bearer {OPENAI_API_KEY}") try: with urllib.request.urlopen(req, timeout=60) as resp: raw = json.loads(resp.read()) content = raw.get("choices", [{}])[0].get("message", {}).get("content") if isinstance(content, str): return True, json.loads(content) return False, "Empty response" except urllib.error.HTTPError as e: return False, e.read().decode() if e.fp else str(e) except Exception as e: return False, str(e) # ── Internal service calls ── _current_user = {} # set by handler before calling def fitness_api(method, path, user_id, body_dict=None): url = f"{FITNESS_URL}/api{path}" headers = { "Content-Type": "application/json", "X-Gateway-User-Id": str(user_id), "X-Gateway-User-Name": _current_user.get("display_name") or _current_user.get("username", ""), } body = json.dumps(body_dict).encode() if body_dict else None status, resp_headers, resp_body = proxy_request(url, method, headers, body) try: parsed = json.loads(resp_body) except Exception: parsed = {} return status, parsed def brain_api(method, path, user_id, body_dict=None): url = f"{BRAIN_URL}/api{path}" headers = { "Content-Type": "application/json", "X-Gateway-User-Id": str(user_id), } body = json.dumps(body_dict).encode() if body_dict else None status, resp_headers, resp_body = proxy_request(url, method, headers, body) try: parsed = json.loads(resp_body) except Exception: parsed = {} return status, parsed # ── Domain detection ── def is_fitness_intent(text): return bool( re.search(r'\b(calories?|protein|carbs?|fat|sugar|fiber|breakfast|lunch|dinner|snack|meal|food|ate|eaten|log|track|entries|macros?)\b', text, re.IGNORECASE) or re.search(r'\bfor (breakfast|lunch|dinner|snack)\b', text, re.IGNORECASE) or re.search(r'\bhow many calories do i have left\b', text, re.IGNORECASE) ) def is_brain_intent(text): return bool(re.search( r'\b(note|notes|brain|remember|save this|save that|what do i have|what have i saved|find my|delete (?:that|this|note|item)|update (?:that|this|note)|from my notes)\b', text, re.IGNORECASE, )) def detect_domain(messages, state, image_data_url=None, allow_brain=True): text = last_user_message(messages) if image_data_url: return "fitness" if not text: return state.get("activeDomain") or "brain" fitness = is_fitness_intent(text) brain = is_brain_intent(text) if allow_brain else False if fitness and not brain: return "fitness" if brain and not fitness: return "brain" active = state.get("activeDomain") if active == "fitness" and not brain: return "fitness" if active == "brain" and not fitness: return "brain" return "fitness" if fitness else "brain" # ── Fitness: apply draft ── def should_reuse_resolved(canonical_name, resolved, confidence): if not resolved or not resolved.get("name"): return False if confidence < 0.9: return False draft_key = normalized_food_key(canonical_name) resolved_key = normalized_food_key(resolved["name"]) if not draft_key or not resolved_key: return False return draft_key == resolved_key def apply_draft(user_id, draft, index=0): parsed_qty, parsed_name = parse_leading_quantity(draft.get("food_name") or "") entry_quantity = max( draft.get("quantity") if draft.get("quantity") and draft["quantity"] > 0 else (parsed_qty or 1), 0.1, ) canonical_name = canonical_food_name(parsed_name or draft.get("food_name") or "Quick add") base_unit = food_base_unit(draft) # Resolve food status, resolve_body = fitness_api("POST", "/foods/resolve", user_id, { "raw_phrase": canonical_name, "meal_type": draft.get("meal_type") or "snack", "entry_date": draft.get("entry_date") or today_iso(), "source": "assistant", }) matched_food = None if status == 200 and should_reuse_resolved(canonical_name, resolve_body.get("matched_food"), to_number(resolve_body.get("confidence"), 0)): matched_food = resolve_body.get("matched_food") if not matched_food: per_base_calories = max((draft.get("calories") or 0) / entry_quantity, 0) per_base_protein = max((draft.get("protein") or 0) / entry_quantity, 0) per_base_carbs = max((draft.get("carbs") or 0) / entry_quantity, 0) per_base_fat = max((draft.get("fat") or 0) / entry_quantity, 0) per_base_sugar = max((draft.get("sugar") or 0) / entry_quantity, 0) per_base_fiber = max((draft.get("fiber") or 0) / entry_quantity, 0) status, created_body = fitness_api("POST", "/foods", user_id, { "name": canonical_name, "calories_per_base": per_base_calories, "protein_per_base": per_base_protein, "carbs_per_base": per_base_carbs, "fat_per_base": per_base_fat, "sugar_per_base": per_base_sugar, "fiber_per_base": per_base_fiber, "base_unit": base_unit, "status": "assistant_created", "notes": f"Assistant created from chat draft: {draft.get('food_name') or canonical_name}", "servings": [{ "name": (draft.get("default_serving_label") or "").strip() or default_serving_name(base_unit), "amount_in_base": 1.0, "is_default": True, }], }) if status < 200 or status >= 300: return {"ok": False, "status": status, "body": created_body} matched_food = created_body elif (matched_food.get("status") in ("assistant_created", "ai_created") and has_material_nutrition_mismatch(draft, matched_food, entry_quantity)): update_status, updated_body = fitness_api("PATCH", f"/foods/{matched_food['id']}", user_id, { "calories_per_base": max((draft.get("calories") or 0) / entry_quantity, 0), "protein_per_base": max((draft.get("protein") or 0) / entry_quantity, 0), "carbs_per_base": max((draft.get("carbs") or 0) / entry_quantity, 0), "fat_per_base": max((draft.get("fat") or 0) / entry_quantity, 0), "sugar_per_base": max((draft.get("sugar") or 0) / entry_quantity, 0), "fiber_per_base": max((draft.get("fiber") or 0) / entry_quantity, 0), }) if 200 <= update_status < 300: matched_food = updated_body # Find default serving serving_id = None for s in (matched_food.get("servings") or []): if s.get("is_default"): serving_id = s.get("id") break entry_payload = { "food_id": matched_food.get("id"), "quantity": entry_quantity, "unit": base_unit, "serving_id": serving_id, "meal_type": draft.get("meal_type") or "snack", "entry_date": draft.get("entry_date") or today_iso(), "entry_method": "assistant", "source": "assistant", "note": draft.get("note") or None, "idempotency_key": entry_idempotency_key(draft, index), } food_name = (draft.get("food_name") or "").strip() if food_name and food_name != canonical_name: entry_payload["snapshot_food_name_override"] = food_name status, entry_body = fitness_api("POST", "/entries", user_id, entry_payload) return {"ok": 200 <= status < 300, "status": status, "body": entry_body} # ── Fitness: split + resolve bundle ── def split_input_items(user_id, phrase): status, body = fitness_api("POST", "/foods/split", user_id, {"phrase": phrase}) if 200 <= status < 300 and isinstance(body.get("items"), list) and body["items"]: return [str(i).strip() for i in body["items"] if str(i).strip()] return [p.strip() for p in phrase.split(",") if p.strip()] def draft_from_resolved_item(resolved, entry_date): parsed = resolved.get("parsed") or {} matched_food = resolved.get("matched_food") or None ai_estimate = resolved.get("ai_estimate") or None quantity = max(to_number(parsed.get("quantity"), 1), 0.1) unit = str(parsed.get("unit") or "") or "serving" meal = parsed.get("meal_type") if meal not in ("breakfast", "lunch", "dinner", "snack"): meal = "snack" calories = protein = carbs = fat = sugar = fiber = 0 if matched_food: calories = to_number(matched_food.get("calories_per_base")) * quantity protein = to_number(matched_food.get("protein_per_base")) * quantity carbs = to_number(matched_food.get("carbs_per_base")) * quantity fat = to_number(matched_food.get("fat_per_base")) * quantity sugar = to_number(matched_food.get("sugar_per_base")) * quantity fiber = to_number(matched_food.get("fiber_per_base")) * quantity elif ai_estimate: calories = to_number(ai_estimate.get("calories_per_base")) * quantity protein = to_number(ai_estimate.get("protein_per_base")) * quantity carbs = to_number(ai_estimate.get("carbs_per_base")) * quantity fat = to_number(ai_estimate.get("fat_per_base")) * quantity sugar = to_number(ai_estimate.get("sugar_per_base")) * quantity fiber = to_number(ai_estimate.get("fiber_per_base")) * quantity elif resolved.get("resolution_type") == "quick_add": calories = to_number(parsed.get("quantity")) food_name = ( (resolved.get("snapshot_name_override") if isinstance(resolved.get("snapshot_name_override"), str) and resolved["snapshot_name_override"] else None) or (matched_food.get("name") if matched_food and isinstance(matched_food.get("name"), str) and matched_food["name"] else None) or (ai_estimate.get("food_name") if ai_estimate and isinstance(ai_estimate.get("food_name"), str) and ai_estimate["food_name"] else None) or (resolved.get("raw_text") if isinstance(resolved.get("raw_text"), str) and resolved["raw_text"] else None) or "Quick add" ) serving_label = "" if ai_estimate and isinstance(ai_estimate.get("serving_description"), str): serving_label = ai_estimate["serving_description"] elif quantity > 0 and unit: serving_label = f"{quantity} {unit}" return clamp_draft({ "food_name": food_name, "meal_type": meal, "entry_date": entry_date, "quantity": quantity, "unit": unit, "calories": round(calories), "protein": round(protein), "carbs": round(carbs), "fat": round(fat), "sugar": round(sugar), "fiber": round(fiber), "note": resolved.get("note") if isinstance(resolved.get("note"), str) else "", "default_serving_label": serving_label, }) def build_draft_bundle(user_id, phrase, entry_date): parts = split_input_items(user_id, phrase) if len(parts) < 2: return None results = [] for part in parts: status, body = fitness_api("POST", "/foods/resolve", user_id, { "raw_phrase": part, "entry_date": entry_date, "source": "assistant", }) if status < 200 or status >= 300: continue results.append(draft_from_resolved_item(body, entry_date)) valid = [d for d in results if has_complete_draft(d)] return valid if valid else None def revise_draft_bundle(messages, drafts, image_data_url=None): if not OPENAI_API_KEY or not drafts: return None system_prompt = ( "You are revising a bundled food draft inside a fitness app.\n\n" "Return ONLY JSON like:\n" '{\n "reply": "short assistant reply",\n' ' "drafts": [\n {\n "food_name": "string",\n' ' "meal_type": "breakfast|lunch|dinner|snack",\n' ' "entry_date": "YYYY-MM-DD",\n "quantity": 1,\n' ' "unit": "serving",\n "calories": 0,\n "protein": 0,\n' ' "carbs": 0,\n "fat": 0,\n "sugar": 0,\n "fiber": 0,\n' ' "note": "",\n "default_serving_label": ""\n }\n ]\n}\n\n' "Rules:\n" "- Update only the item or items the user is correcting.\n" "- Keep untouched items unchanged.\n" "- If the user says one item is wrong, replace that item without collapsing the bundle into one merged food.\n" "- Preserve meal and entry date unless the user changes them.\n" "- Keep replies brief and natural.\n" "- If a photo is attached, you may use it again for corrections.\n\n" f"Current bundle:\n{json.dumps(drafts, indent=2)}" ) parsed = call_openai(system_prompt, messages, temperature=0.2, max_tokens=1000, image_url=image_data_url) if not parsed: return None next_drafts = [clamp_draft(d) for d in parsed.get("drafts", [])] if isinstance(parsed.get("drafts"), list) else [] if not has_complete_drafts(next_drafts): return None return { "reply": parsed.get("reply") or "I updated those items.", "drafts": next_drafts, } # ── Fitness handler ── def handle_fitness_assistant(handler, body, user): global _current_user _current_user = user try: data = json.loads(body) except Exception: handler._send_json({"error": "Invalid JSON"}, 400) return messages_raw = data.get("messages") or [] draft_raw = data.get("draft") drafts_raw = data.get("drafts") action = data.get("action", "chat") image_data_url = data.get("imageDataUrl") entry_date = data.get("entryDate") user_id = user["id"] requested_date = entry_date if isinstance(entry_date, str) and re.match(r'^\d{4}-\d{2}-\d{2}$', entry_date) else today_iso() current_draft = clamp_draft({ "entry_date": requested_date, **(draft_raw if isinstance(draft_raw, dict) else {}), }) current_drafts = [] if isinstance(drafts_raw, list): for item in drafts_raw: if item and isinstance(item, dict): current_drafts.append(clamp_draft({"entry_date": requested_date, **item})) # ── Apply action ── if action == "apply": if has_complete_drafts(current_drafts): results = [apply_draft(user_id, item, i) for i, item in enumerate(current_drafts)] failed = next((r for r in results if not r["ok"]), None) if failed: handler._send_json({ "reply": "I couldn't add all of those entries yet. Try again in a moment.", "drafts": current_drafts, "applied": False, "error": failed["body"].get("error") or f"Fitness API returned {failed['status']}", }, 500) return handler._send_json({ "reply": f"Added {len(current_drafts)} items to {current_drafts[0].get('meal_type') or 'your log'}.", "drafts": current_drafts, "applied": True, "entries": [r["body"] for r in results], }) return if not has_complete_draft(current_draft): handler._send_json({ "reply": "I still need a food and calories before I can add it.", "draft": current_draft, "drafts": current_drafts, "applied": False, }) return result = apply_draft(user_id, current_draft) if not result["ok"]: handler._send_json({ "reply": "I couldn't add that entry yet. Try again in a moment.", "draft": current_draft, "applied": False, "error": result["body"].get("error") or f"Fitness API returned {result['status']}", }, 500) return handler._send_json({ "reply": f"Added {current_draft.get('food_name')} to {current_draft.get('meal_type')}.", "draft": current_draft, "drafts": [], "applied": True, "entry": result["body"], }) return # ── Chat action ── chat = recent_messages(messages_raw, 10) user_text = last_user_message(chat) allow_apply = is_explicit_confirmation(user_text) retry_requested = is_retry_request(user_text) has_photo = ( isinstance(image_data_url, str) and image_data_url.startswith("data:image/") and len(image_data_url) < 8_000_000 ) # Bundle revision if not allow_apply and len(current_drafts) > 1 and user_text.strip(): revised = revise_draft_bundle( chat, current_drafts, image_data_url if has_photo else None, ) if revised: handler._send_json({ "reply": revised["reply"], "drafts": revised["drafts"], "draft": None, "applied": False, }) return # Multi-item split if not has_photo and not retry_requested and not allow_apply and user_text.strip(): bundle = build_draft_bundle(user_id, user_text, requested_date) if bundle and len(bundle) > 1: meal = bundle[0].get("meal_type") or "snack" names = ", ".join(d.get("food_name", "") for d in bundle) handler._send_json({ "reply": f"I split that into {len(bundle)} items for {meal}: {names}. Add them when this looks right.", "drafts": bundle, "draft": None, "applied": False, }) return # Main chat with OpenAI system_prompt = ( "You are a conversational fitness logging assistant inside a personal app.\n\n" "Your job:\n" "- read the chat plus the current draft food entry\n" "- update the draft naturally\n" "- keep the reply short, plain, and useful\n" "- do not add an entry on the first food message\n" '- only set apply_now=true if the latest user message is a pure confirmation like "add it", "log it", "save it", or "looks good add it"\n' "- never say an item was added, logged, or saved unless apply_now=true for that response\n" "- if a food photo is attached, identify the likely food, portion, and meal context before drafting\n" "- if the user says the current guess is wrong, treat that as authoritative and replace the draft instead of defending the previous guess\n\n" "Return ONLY JSON with this shape:\n" '{\n' ' "reply": "short assistant reply",\n' ' "draft": {\n' ' "food_name": "string",\n' ' "meal_type": "breakfast|lunch|dinner|snack",\n' ' "entry_date": "YYYY-MM-DD",\n' ' "quantity": 1,\n' ' "unit": "serving",\n' ' "calories": 0,\n' ' "protein": 0,\n' ' "carbs": 0,\n' ' "fat": 0,\n' ' "sugar": 0,\n' ' "fiber": 0,\n' ' "note": "",\n' ' "default_serving_label": ""\n' ' },\n' ' "apply_now": false\n' '}\n\n' "Rules:\n" "- Preserve the current draft unless the user changes something.\n" "- If the latest user message says the guess is wrong, try again, or search again, do not cling to the old food guess. Replace the draft with a new best guess.\n" '- If the user says "make it 150 calories", update calories and keep the rest unless another field should obviously move with it.\n' "- If the user says a meal, move it to that meal.\n" "- Default meal_type to snack if not specified.\n" "- Default entry_date to today unless the user specifies another date.\n" "- Estimate realistic nutrition when needed.\n" "- Always include sugar and fiber estimates, even if rough.\n" '- Keep food_name human and concise, for example "2 boiled eggs".\n' "- If the photo is a nutrition label or package nutrition panel, extract the serving size from the label and put it in default_serving_label.\n" "- If the user gives a product name plus a nutrition label photo, use the label values instead of guessing from memory.\n" "- If the photo is ambiguous, briefly mention up to 2 likely alternatives instead of sounding overconfident.\n" "- After drafting or revising, summarize the draft with calories and key macros, then ask for confirmation.\n" "- If a photo is unclear, say what you think it is and mention the uncertainty briefly.\n" "- If retrying from a photo, use the image again and produce a different best guess or ask one short clarifying question.\n" "- If details are missing, ask one short follow-up instead of overexplaining.\n" "- When the user confirms, keep the reply brief because the app will add the entry next.\n\n" f"Today is {today_iso()}.\n" f"Current draft:\n{json.dumps(draft_for_retry(current_draft) if retry_requested else current_draft, indent=2)}" ) if not OPENAI_API_KEY: handler._send_json({ "reply": "Assistant is not configured yet.", "draft": current_draft, "drafts": current_drafts, "applied": False, }, 500) return parsed = call_openai( system_prompt, chat, temperature=0.2, max_tokens=900, image_url=image_data_url if has_photo else None, ) if not parsed: handler._send_json({ "reply": "The assistant did not respond cleanly.", "draft": current_draft, "drafts": current_drafts, "applied": False, }, 500) return next_draft = clamp_draft(parsed.get("draft") or current_draft) if parsed.get("apply_now") and allow_apply and has_complete_draft(next_draft): result = apply_draft(user_id, next_draft) if result["ok"]: handler._send_json({ "reply": f"Added {next_draft.get('food_name')} to {next_draft.get('meal_type')}.", "draft": next_draft, "drafts": [], "applied": True, "entry": result["body"], }) return reply = parsed.get("reply") or ( f"{next_draft.get('food_name')} is staged at {round(next_draft.get('calories') or 0)} calories." if has_complete_draft(next_draft) else "I updated the draft." ) handler._send_json({ "reply": reply, "draft": next_draft, "drafts": [], "applied": False, }) # ── Brain: internal API helpers ── def brain_search(user_id, q, limit=5): status, body = brain_api("POST", "/search/hybrid", user_id, {"q": q, "limit": limit}) if 200 <= status < 300 and isinstance(body.get("items"), list): return body["items"] return [] def create_brain_note(user_id, content, title=None): payload = {"type": "note", "raw_content": content} if title and title.strip(): payload["title"] = title.strip() status, body = brain_api("POST", "/items", user_id, payload) return {"ok": 200 <= status < 300, "status": status, "body": body} def update_brain_item(user_id, item_id, updates): status, body = brain_api("PATCH", f"/items/{item_id}", user_id, updates) return {"ok": 200 <= status < 300, "status": status, "body": body} def append_to_item(user_id, item_id, content): status, body = brain_api("POST", f"/items/{item_id}/additions", user_id, { "content": content, "source": "assistant", "kind": "append", }) return {"ok": 200 <= status < 300, "status": status, "body": body} def delete_addition(user_id, item_id, addition_id): status, _ = brain_api("DELETE", f"/items/{item_id}/additions/{addition_id}", user_id) return 200 <= status < 300 def delete_item(user_id, item_id): status, _ = brain_api("DELETE", f"/items/{item_id}", user_id) return 200 <= status < 300 def get_item(user_id, item_id): status, body = brain_api("GET", f"/items/{item_id}", user_id) if 200 <= status < 300 and body: return body return None # ── Brain: search + decide ── def derive_search_queries(messages, user_text): normalized = normalize_search_seed(user_text) fallback = [normalized or user_text.strip()] fallback = [q for q in fallback if q] system_prompt = ( "You extract concise retrieval queries for a personal knowledge base.\n\n" "Return ONLY JSON:\n" '{\n "queries": ["query one", "query two"]\n}\n\n' "Rules:\n" "- Return 1 to 3 short search queries.\n" "- Focus on the underlying topic, not chat filler.\n" '- For "what do I have about X", include just X.\n' "- For advice/tips, you may infer a likely broader topic if strongly implied.\n" "- Keep each query short, usually 1 to 5 words.\n" "- Do not include punctuation-heavy sentences.\n" "- Do not include explanatory text." ) parsed = call_openai(system_prompt, messages[-6:] + [{"role": "user", "content": f"Search intent: {user_text}"}], temperature=0.1, max_tokens=200) if not parsed: return fallback queries = parsed.get("queries") if isinstance(queries, list): queries = [str(q).strip() for q in queries if str(q).strip()][:3] return queries if queries else fallback return fallback def collect_candidates(user_id, messages, user_text, limit=8): queries = derive_search_queries(messages, user_text) merged = {} for q in queries: items = brain_search(user_id, q, limit) for item in items: if item.get("id") not in merged: merged[item["id"]] = item if not merged: for item in brain_search(user_id, user_text, limit): if item.get("id") not in merged: merged[item["id"]] = item return list(merged.values())[:limit] def decide_action(messages, candidates, state, listing_intent=False): system_prompt = ( "You are the Brain assistant inside a personal app.\n\n" "You help the user save notes naturally, append thoughts to existing items, answer questions from saved notes, and choose whether to create a new note.\n\n" "Return ONLY JSON with this shape:\n" '{\n' ' "action": "append_existing" | "create_new_note" | "update_existing" | "answer" | "list_items" | "delete_target",\n' ' "reply": "short reply preview",\n' ' "target_item_id": "optional item id",\n' ' "formatted_content": "text to append or create",\n' ' "create_title": "short AI-generated title when creating a new note",\n' ' "answer": "short answer when action=answer",\n' ' "source_item_ids": ["id1", "id2"],\n' ' "match_confidence": "high" | "low"\n' '}\n\n' "Rules:\n" "- Use existing items only when the topical match is strong.\n" "- If the match is weak or ambiguous, create a new note.\n" "- The user prefers speed: do not ask which note to use.\n" "- If the user explicitly says update, change, edit, replace, set, or correct, prefer update_existing when one strong existing note clearly matches.\n" "- If the user is asking to list what they have saved, prefer list_items instead of answer.\n" "- For short tips/advice, format as bullets when natural.\n" "- Only fix spelling and grammar. Do not rewrite the meaning.\n" "- For questions, answer briefly and cite up to 3 source ids.\n" "- For list_items, return a concise list-style reply and cite up to 12 source ids.\n" "- For delete requests, choose the most likely target and set action=delete_target.\n" "- Never choose append_existing unless target_item_id is one of the candidate ids.\n" "- Never choose update_existing unless target_item_id is one of the candidate ids.\n" "- If all candidates are weak, set action=create_new_note and match_confidence=low.\n" f"- The current assistant state is only for context:\n{json.dumps(state, indent=2)}\n" f"- listing_intent={listing_intent}\n" ) ok, result = call_openai_raw( system_prompt, messages + [{"role": "user", "content": f"Candidate items:\n{build_candidate_summary(candidates)}"}], temperature=0.2, max_tokens=1100, ) if not ok: raise Exception(result) return result def rewrite_existing_item_content(messages, target, user_instruction): existing = (target.get("raw_content") or target.get("extracted_text") or "").strip() if not existing: raise Exception("Target item has no editable content.") system_prompt = ( "You edit an existing personal note in place.\n\n" "Return ONLY JSON:\n" '{\n "updated_content": "full updated note body"\n}\n\n' "Rules:\n" "- Apply the user's requested update to the existing note.\n" "- Preserve the note's structure and wording as much as possible.\n" "- Make the smallest correct change needed.\n" "- Do not append a new line when the user clearly wants an existing value updated.\n" "- Only fix spelling and grammar where needed for the changed text.\n" "- Return the full updated note body, not a diff." ) ok, result = call_openai_raw( system_prompt, list(messages[-8:]) + [{ "role": "user", "content": f"Target title: {target.get('title') or 'Untitled'}\n\nExisting note:\n{existing}\n\nInstruction: {user_instruction}", }], temperature=0.1, max_tokens=900, ) if not ok: raise Exception(result) updated = (result.get("updated_content") or "").strip() if not updated: raise Exception("Updated content was empty.") return updated # ── Brain handler ── def handle_brain_assistant(handler, body, user): try: data = json.loads(body) except Exception: handler._send_json({"error": "Invalid JSON"}, 400) return if not OPENAI_API_KEY: handler._send_json({"error": "Assistant is not configured."}, 500) return messages_raw = data.get("messages") or [] state = data.get("state") or {} if not isinstance(state, dict): state = {} user_id = user["id"] chat = recent_messages(messages_raw, 12) user_text = last_user_message(chat) if not user_text: handler._send_json({ "reply": "Tell me what to save, find, answer, or delete.", "state": state, "sources": [], }) return # ── Pending delete confirmation ── pending_delete = state.get("pendingDelete") if pending_delete and is_confirmation(user_text): ok = delete_item(user_id, pending_delete["itemId"]) if not ok: handler._send_json({ "reply": f"I couldn't delete \"{pending_delete['itemTitle']}\" yet.", "state": state, "sources": [], }, 500) return handler._send_json({ "reply": f"Deleted \"{pending_delete['itemTitle']}\".", "state": {}, "sources": [], }) return # ── Undo ── last_mutation = state.get("lastMutation") if last_mutation and is_undo(user_text): if last_mutation.get("type") == "append" and last_mutation.get("additionId"): ok = delete_addition(user_id, last_mutation["itemId"], last_mutation["additionId"]) handler._send_json({ "reply": f"Undid the change in \"{last_mutation['itemTitle']}\"." if ok else "I could not undo that change yet.", "state": {} if ok else state, "sources": [{ "id": last_mutation["itemId"], "title": last_mutation["itemTitle"], "type": "note", "href": f"/brain?item={last_mutation['itemId']}", }] if ok else [], }) return if last_mutation.get("type") == "create" and last_mutation.get("createdItemId"): ok = delete_item(user_id, last_mutation["createdItemId"]) handler._send_json({ "reply": f"Removed \"{last_mutation['itemTitle']}\"." if ok else "I could not undo that note creation yet.", "state": {} if ok else state, "sources": [], }) return if last_mutation.get("type") == "update" and last_mutation.get("previousRawContent") is not None: restored = update_brain_item(user_id, last_mutation["itemId"], { "raw_content": last_mutation["previousRawContent"], }) handler._send_json({ "reply": f"Undid the update in \"{last_mutation['itemTitle']}\"." if restored["ok"] else "I could not undo that update yet.", "state": {} if restored["ok"] else state, "sources": [{ "id": last_mutation["itemId"], "title": last_mutation["itemTitle"], "type": "note", "href": f"/brain?item={last_mutation['itemId']}", }] if restored["ok"] else [], }) return # ── Make new note instead ── if last_mutation and wants_new_note(user_text): content = last_mutation.get("content", "") if last_mutation.get("type") == "append" and last_mutation.get("additionId"): delete_addition(user_id, last_mutation["itemId"], last_mutation["additionId"]) try: title_decision = decide_action( [{"role": "user", "content": f"Create a concise title for this note:\n{content}"}], [], state, ) except Exception: title_decision = {"create_title": "New Note"} created = create_brain_note(user_id, content, title_decision.get("create_title")) if not created["ok"]: handler._send_json({"reply": "I could not create the new note yet.", "state": state, "sources": []}, 500) return created_item = created["body"] title = created_item.get("title") or title_decision.get("create_title") or "New note" handler._send_json({ "reply": f"Created \"{title}\".", "state": { "lastMutation": { "type": "create", "itemId": created_item.get("id"), "createdItemId": created_item.get("id"), "itemTitle": title, "content": content, }, }, "sources": [to_source(created_item)], }) return # ── Move/retarget ── retarget = move_target(user_text) if last_mutation else None if last_mutation and retarget: if last_mutation.get("type") == "append" and last_mutation.get("additionId"): delete_addition(user_id, last_mutation["itemId"], last_mutation["additionId"]) elif last_mutation.get("type") == "create" and last_mutation.get("createdItemId"): delete_item(user_id, last_mutation["createdItemId"]) candidates = collect_candidates(user_id, chat, retarget, 8) target = candidates[0] if candidates else None if not target: created = create_brain_note(user_id, last_mutation["content"], retarget) if not created["ok"]: handler._send_json({"reply": "I could not move that into a new note yet.", "state": state, "sources": []}, 500) return created_item = created["body"] handler._send_json({ "reply": f"Created \"{created_item.get('title') or retarget}\".", "state": { "lastMutation": { "type": "create", "itemId": created_item.get("id"), "createdItemId": created_item.get("id"), "itemTitle": created_item.get("title") or retarget, "content": last_mutation["content"], }, }, "sources": [to_source(created_item)], }) return appended = append_to_item(user_id, target["id"], last_mutation["content"]) if not appended["ok"]: handler._send_json({ "reply": f"I couldn't move that to \"{target.get('title') or 'that item'}\" yet.", "state": state, "sources": [], }, 500) return handler._send_json({ "reply": f"Moved it to \"{target.get('title') or 'Untitled'}\".", "state": { "lastMutation": { "type": "append", "itemId": target["id"], "itemTitle": target.get("title") or "Untitled", "additionId": appended["body"].get("id"), "content": last_mutation["content"], }, }, "sources": [to_source(target)], }) return # ── Listing / explicit update / delete shortcut ── listing_intent = is_listing_intent(user_text) candidates = collect_candidates(user_id, chat, user_text, 24 if listing_intent else 8) # Explicit update shortcut if is_explicit_update(user_text) and candidates and (candidates[0].get("raw_content") or candidates[0].get("extracted_text")): target = candidates[0] try: updated_content = rewrite_existing_item_content(chat, target, user_text) updated = update_brain_item(user_id, target["id"], {"raw_content": updated_content}) if not updated["ok"]: handler._send_json({ "reply": f"I couldn't update \"{target.get('title') or 'Untitled'}\" yet.", "state": state, "sources": [], }, 500) return handler._send_json({ "reply": f"Updated \"{target.get('title') or 'Untitled'}\".", "state": { "lastMutation": { "type": "update", "itemId": target["id"], "itemTitle": target.get("title") or "Untitled", "content": updated_content, "previousRawContent": target.get("raw_content") or target.get("extracted_text") or "", }, }, "sources": [to_source(target)], }) return except Exception as e: handler._send_json({ "reply": f"I couldn't update \"{target.get('title') or 'Untitled'}\" yet.", "state": state, "sources": [to_source(target)], "error": str(e), }, 500) return # Delete shortcut if wants_delete(user_text) and not state.get("pendingDelete"): target = candidates[0] if candidates else None if not target: handler._send_json({"reply": "I couldn't find a note to delete.", "state": state, "sources": []}) return handler._send_json({ "reply": f"Delete \"{target.get('title') or 'Untitled'}\"?", "state": { **state, "pendingDelete": { "itemId": target["id"], "itemTitle": target.get("title") or "Untitled", }, }, "sources": [to_source(target)], }) return # ── AI decision ── try: decision = decide_action(chat, candidates, state, listing_intent) except Exception as e: handler._send_json({ "reply": "The Brain assistant did not respond cleanly.", "state": state, "sources": [], "error": str(e), }, 500) return action_type = decision.get("action") if action_type == "answer": handler._send_json({ "reply": decision.get("answer") or decision.get("reply") or "I found a few relevant notes.", "state": state, "sources": source_links_from_ids(candidates, decision.get("source_item_ids")), }) return if action_type == "list_items": handler._send_json({ "reply": decision.get("answer") or decision.get("reply") or "Here are the matching items I found.", "state": state, "sources": source_links_from_ids(candidates, decision.get("source_item_ids")), }) return if action_type == "delete_target" and decision.get("target_item_id"): target = None for c in candidates: if c.get("id") == decision["target_item_id"]: target = c break if not target: target = get_item(user_id, decision["target_item_id"]) if not target: handler._send_json({"reply": "I couldn't find that note to delete.", "state": state, "sources": []}) return handler._send_json({ "reply": f"Delete \"{target.get('title') or 'Untitled'}\"?", "state": { **state, "pendingDelete": { "itemId": target["id"], "itemTitle": target.get("title") or "Untitled", }, }, "sources": [to_source(target)], }) return if action_type == "update_existing" and decision.get("target_item_id") and decision.get("match_confidence") == "high": target = None for c in candidates: if c.get("id") == decision["target_item_id"]: target = c break if not target: target = get_item(user_id, decision["target_item_id"]) if target and (target.get("raw_content") or target.get("extracted_text")): try: updated_content = rewrite_existing_item_content(chat, target, user_text) updated = update_brain_item(user_id, target["id"], {"raw_content": updated_content}) if not updated["ok"]: handler._send_json({ "reply": f"I couldn't update \"{target.get('title') or 'Untitled'}\" yet.", "state": state, "sources": [], }, 500) return handler._send_json({ "reply": f"Updated \"{target.get('title') or 'Untitled'}\".", "state": { "lastMutation": { "type": "update", "itemId": target["id"], "itemTitle": target.get("title") or "Untitled", "content": updated_content, "previousRawContent": target.get("raw_content") or target.get("extracted_text") or "", }, }, "sources": [to_source(target)], }) return except Exception as e: handler._send_json({ "reply": f"I couldn't update \"{target.get('title') or 'Untitled'}\" yet.", "state": state, "sources": [to_source(target)] if target else [], "error": str(e), }, 500) return content = (decision.get("formatted_content") or "").strip() or user_text if action_type == "append_existing" and decision.get("target_item_id") and decision.get("match_confidence") == "high": target = None for c in candidates: if c.get("id") == decision["target_item_id"]: target = c break if not target: target = get_item(user_id, decision["target_item_id"]) if target: appended = append_to_item(user_id, target["id"], content) if not appended["ok"]: handler._send_json({ "reply": f"I couldn't add that to \"{target.get('title') or 'Untitled'}\" yet.", "state": state, "sources": [], }, 500) return handler._send_json({ "reply": f"Added to \"{target.get('title') or 'Untitled'}\".", "state": { "lastMutation": { "type": "append", "itemId": target["id"], "itemTitle": target.get("title") or "Untitled", "additionId": appended["body"].get("id"), "content": content, }, }, "sources": [to_source(target)], }) return # ── Fall through: create new note ── created = create_brain_note(user_id, content, decision.get("create_title")) if not created["ok"]: handler._send_json({"reply": "I could not create the note yet.", "state": state, "sources": []}, 500) return created_item = created["body"] title = created_item.get("title") or decision.get("create_title") or "New note" handler._send_json({ "reply": f"Created \"{title}\".", "state": { "lastMutation": { "type": "create", "itemId": created_item.get("id"), "createdItemId": created_item.get("id"), "itemTitle": title, "content": content, }, }, "sources": [to_source(created_item)], }) # ── Unified dispatcher ── def handle_assistant(handler, body, user): try: data = json.loads(body) except Exception: handler._send_json({"error": "Invalid JSON"}, 400) return messages_raw = data.get("messages") or [] state_raw = data.get("state") or {} if not isinstance(state_raw, dict): state_raw = {} image_data_url = data.get("imageDataUrl") entry_date = data.get("entryDate") action = data.get("action", "chat") allow_brain = bool(data.get("allowBrain", True)) # Disable brain for user "madiha" if user.get("username") == "madiha": allow_brain = False chat = recent_messages(messages_raw, 16) unified_state = state_raw domain = detect_domain(chat, unified_state, image_data_url, allow_brain) if allow_brain else "fitness" if domain == "fitness": fitness_state = unified_state.get("fitnessState") or {} print(f"[Dispatcher] fitnessState={json.dumps(fitness_state)[:300]}", file=sys.stderr) inner_body = json.dumps({ "action": action, "messages": messages_raw, "draft": fitness_state.get("draft") if isinstance(fitness_state, dict) and "draft" in fitness_state else None, "drafts": fitness_state.get("drafts") if isinstance(fitness_state, dict) and "drafts" in fitness_state else [], "entryDate": entry_date, "imageDataUrl": image_data_url, }).encode() # Call fitness handler directly, capturing the response result = _call_fitness(user, inner_body) else: brain_state = unified_state.get("brainState") or {} inner_body = json.dumps({ "messages": messages_raw, "state": brain_state, }).encode() result = _call_brain(user, inner_body) if result is None: handler._send_json({"error": "Internal assistant error"}, 500) return response_data = result.get("data", {}) response_status = result.get("status", 200) # Merge unified state if domain == "fitness": merged_state = { "activeDomain": domain, "fitnessState": { "draft": response_data.get("draft"), "drafts": response_data.get("drafts") if isinstance(response_data.get("drafts"), list) else [], }, "brainState": unified_state.get("brainState") or {}, } else: merged_state = { "activeDomain": domain, "fitnessState": unified_state.get("fitnessState") or {}, "brainState": response_data.get("state") or {}, } output = {**response_data, "domain": domain, "state": merged_state} handler._send_json(output, response_status) class _CaptureHandler: """Lightweight mock handler that captures _send_json calls.""" def __init__(self): self.result = None def _send_json(self, data, status=200): self.result = {"data": data, "status": status} def _call_fitness(user, body): cap = _CaptureHandler() handle_fitness_assistant(cap, body, user) return cap.result def _call_brain(user, body): cap = _CaptureHandler() handle_brain_assistant(cap, body, user) return cap.result