"""Second Brain Task Manager — self-contained SQLite-backed task service."""

import calendar
import json
import os
import sqlite3
import time
import uuid
import urllib.parse
from contextlib import closing
from datetime import datetime, timedelta, timezone
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from pathlib import Path
from threading import Lock

PORT = int(os.environ.get("PORT", 8098))
DATA_DIR = Path(os.environ.get("DATA_DIR", "/app/data"))
DB_PATH = DATA_DIR / "tasks.db"

# Serialises first-time user/Inbox creation across the handler threads of
# ThreadingHTTPServer so two concurrent first requests cannot both insert.
_USER_SETUP_LOCK = Lock()

# API field name -> tasks column, for PATCH /api/tasks/<id>.
_TASK_FIELD_MAP = {
    "title": "title",
    "content": "content",
    "priority": "priority",
    "startDate": "start_date",
    "dueDate": "due_date",
    "isAllDay": "is_all_day",
    "repeatFlag": "repeat_flag",
    "projectId": "project_id",
    "sortOrder": "sort_order",
    "reminders": "reminders",
}

_TASK_WITH_PROJECT_SQL = (
    "SELECT t.*, p.name as project_name FROM tasks t "
    "JOIN projects p ON t.project_id = p.id WHERE t.id = ?"
)


# ── Database ──

def get_db():
    """Open a new SQLite connection with Row access, foreign keys and WAL enabled.

    The caller owns the connection and is responsible for closing it.
    """
    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA foreign_keys = ON")
    conn.execute("PRAGMA journal_mode = WAL")
    return conn


def init_db():
    """Create the data directory, tables and indexes if they do not exist yet."""
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    with closing(get_db()) as conn:
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS users (
            id TEXT PRIMARY KEY,
            username TEXT NOT NULL,
            display_name TEXT NOT NULL DEFAULT '',
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS projects (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            name TEXT NOT NULL,
            color TEXT DEFAULT '',
            sort_order INTEGER DEFAULT 0,
            is_inbox INTEGER DEFAULT 0,
            is_shared INTEGER DEFAULT 0,
            archived INTEGER DEFAULT 0,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users(id)
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS project_members (
            project_id TEXT NOT NULL,
            user_id TEXT NOT NULL,
            role TEXT NOT NULL DEFAULT 'member',
            added_at TEXT DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (project_id, user_id),
            FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE,
            FOREIGN KEY (user_id) REFERENCES users(id)
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS tasks (
            id TEXT PRIMARY KEY,
            project_id TEXT NOT NULL,
            user_id TEXT NOT NULL,
            title TEXT NOT NULL,
            content TEXT DEFAULT '',
            status INTEGER DEFAULT 0,
            priority INTEGER DEFAULT 0,
            start_date TEXT,
            due_date TEXT,
            is_all_day INTEGER DEFAULT 1,
            completed_at TEXT,
            repeat_flag TEXT,
            repeat_from TEXT DEFAULT 'due',
            parent_task_id TEXT,
            reminders TEXT DEFAULT '[]',
            sort_order INTEGER DEFAULT 0,
            gcal_event_id TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (project_id) REFERENCES projects(id),
            FOREIGN KEY (user_id) REFERENCES users(id),
            FOREIGN KEY (parent_task_id) REFERENCES tasks(id) ON DELETE SET NULL
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS tags (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            name TEXT NOT NULL,
            color TEXT DEFAULT '',
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(user_id, name),
            FOREIGN KEY (user_id) REFERENCES users(id)
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS task_tags (
            task_id TEXT NOT NULL,
            tag_id TEXT NOT NULL,
            PRIMARY KEY (task_id, tag_id),
            FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE,
            FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS completions (
            id TEXT PRIMARY KEY,
            task_id TEXT NOT NULL,
            user_id TEXT NOT NULL,
            completed_at TEXT NOT NULL,
            FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE,
            FOREIGN KEY (user_id) REFERENCES users(id)
        )''')
        c.execute("CREATE INDEX IF NOT EXISTS idx_tasks_user_status ON tasks(user_id, status)")
        c.execute("CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_id)")
        c.execute("CREATE INDEX IF NOT EXISTS idx_tasks_due ON tasks(due_date)")
        c.execute("CREATE INDEX IF NOT EXISTS idx_tasks_parent ON tasks(parent_task_id)")
        c.execute("CREATE INDEX IF NOT EXISTS idx_project_members ON project_members(user_id)")
        conn.commit()


# ── User + Inbox helpers ──

def ensure_user(user_id, username="", display_name=""):
    """Create the user record and their Inbox project if either is missing.

    An existing user row is left untouched (username/display_name are not
    refreshed). The common case — user and Inbox both present — is a single
    read; creation is serialised so concurrent first requests from the same
    user cannot hit a primary-key conflict or create two Inboxes.
    """
    with closing(get_db()) as conn:
        ready = conn.execute(
            "SELECT 1 FROM users u JOIN projects p ON p.user_id = u.id "
            "WHERE u.id = ? AND p.is_inbox = 1 LIMIT 1",
            (user_id,),
        ).fetchone()
        if ready:
            return
        with _USER_SETUP_LOCK:
            conn.execute(
                "INSERT OR IGNORE INTO users (id, username, display_name) VALUES (?, ?, ?)",
                (user_id, username, display_name),
            )
            # Re-check under the lock: another thread may have just created it.
            has_inbox = conn.execute(
                "SELECT 1 FROM projects WHERE user_id = ? AND is_inbox = 1 LIMIT 1",
                (user_id,),
            ).fetchone()
            if not has_inbox:
                conn.execute(
                    "INSERT INTO projects (id, user_id, name, is_inbox, sort_order) "
                    "VALUES (?, ?, 'Inbox', 1, -1)",
                    (str(uuid.uuid4()), user_id),
                )
            conn.commit()


def get_inbox_id(user_id):
    """Return the id of the user's Inbox project, or None if they have none."""
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT id FROM projects WHERE user_id = ? AND is_inbox = 1",
            (user_id,),
        ).fetchone()
    return row["id"] if row else None


def get_user_project_ids(user_id):
    """Get all project IDs the user can access (owned + shared membership).

    Includes non-archived projects the user owns, non-archived projects they
    are a member of, and non-archived projects flagged is_shared (visible to
    all). The result has no duplicates; its order is unspecified.
    """
    with closing(get_db()) as conn:
        rows = conn.execute(
            """SELECT id FROM projects WHERE user_id = ? AND archived = 0
               UNION
               SELECT pm.project_id FROM project_members pm
                   JOIN projects p ON p.id = pm.project_id
                   WHERE pm.user_id = ? AND p.archived = 0
               UNION
               SELECT id FROM projects
                   WHERE is_shared = 1 AND archived = 0 AND user_id != ?""",
            (user_id, user_id, user_id),
        ).fetchall()
    return [r[0] for r in rows]


# ── RRULE Parser ──

def _parse_iso(value):
    """Parse an ISO-8601 timestamp, accepting the '+0000' and 'Z' UTC suffixes."""
    return datetime.fromisoformat(value.replace("+0000", "+00:00").replace("Z", "+00:00"))


def advance_rrule(due_date_str, repeat_flag, from_date_str=None):
    """Given a due date and RRULE string, compute the next occurrence.

    Args:
        due_date_str: ISO date/datetime of the current occurrence.
        repeat_flag: RRULE string (with or without the "RRULE:" prefix).
            Supports FREQ=DAILY/WEEKLY/MONTHLY/YEARLY, INTERVAL, BYDAY
            (weekly), BYMONTHDAY (monthly, single value, negative counts from
            the month end) and UNTIL. COUNT is not enforced here; see
            remaining_repeat_flag().
        from_date_str: if given, used as the base instead of due_date_str.

    Returns:
        ISO date string, or None if there is no rule, the frequency is
        unsupported, or the next occurrence falls after UNTIL.

    Raises:
        ValueError: if the base date or a numeric rule component is malformed.
    """
    if not repeat_flag:
        return None

    base = _parse_iso(from_date_str or due_date_str)

    # Parse RRULE components
    parts = {}
    for segment in repeat_flag.replace("RRULE:", "").split(";"):
        if "=" in segment:
            k, v = segment.split("=", 1)
            parts[k.upper()] = v

    freq = parts.get("FREQ", "DAILY").upper()
    # An interval below 1 would never advance (or go backwards); treat as 1.
    interval = max(1, int(parts.get("INTERVAL", "1")))
    until = parts.get("UNTIL")

    # Compute next date
    if freq == "DAILY":
        next_dt = base + timedelta(days=interval)
    elif freq == "WEEKLY":
        day_map = {"MO": 0, "TU": 1, "WE": 2, "TH": 3, "FR": 4, "SA": 5, "SU": 6}
        wanted = (d.strip().upper() for d in parts.get("BYDAY", "").split(","))
        target_days = sorted({day_map[d] for d in wanted if d in day_map})
        if target_days:
            current_wd = base.weekday()
            later_this_week = [td for td in target_days if td > current_wd]
            if later_this_week:
                next_dt = base + timedelta(days=later_this_week[0] - current_wd)
            else:
                # Wrap to the first target day of the next interval-th week.
                next_dt = base + timedelta(days=7 * interval - current_wd + target_days[0])
        else:
            next_dt = base + timedelta(weeks=interval)
    elif freq == "MONTHLY":
        bymonthday = parts.get("BYMONTHDAY")
        month = base.month + interval
        year = base.year + (month - 1) // 12
        month = ((month - 1) % 12) + 1
        max_day = calendar.monthrange(year, month)[1]
        day = int(bymonthday) if bymonthday else base.day
        if day < 0:
            day = max_day + 1 + day  # -1 means the last day of the month
        # Clamp day to the valid range of the target month.
        day = min(max(day, 1), max_day)
        next_dt = base.replace(year=year, month=month, day=day)
    elif freq == "YEARLY":
        year = base.year + interval
        # Feb 29 has no counterpart in non-leap years; clamp to Feb 28.
        day = min(base.day, calendar.monthrange(year, base.month)[1])
        next_dt = base.replace(year=year, day=day)
    else:
        return None

    # Check UNTIL (a malformed UNTIL is ignored)
    if until:
        try:
            if "T" in until:
                until_dt = datetime.strptime(until[:15], "%Y%m%dT%H%M%S")
                if base.tzinfo is not None:
                    until_tz = timezone.utc if until.endswith("Z") else base.tzinfo
                    until_dt = until_dt.replace(tzinfo=until_tz)
                exhausted = next_dt > until_dt
            else:
                # A date-only UNTIL is inclusive of that whole day.
                until_day = datetime.strptime(until[:8], "%Y%m%d").date()
                exhausted = next_dt.date() > until_day
        except ValueError:
            exhausted = False
        if exhausted:
            return None

    return next_dt.isoformat()


def remaining_repeat_flag(repeat_flag):
    """Return the repeat_flag to store on the next occurrence of a recurring task.

    If the rule carries COUNT=n, the result carries COUNT=n-1, or is None when
    the occurrence being completed was the last one. Rules without COUNT (or
    with an unparsable COUNT) are returned unchanged.
    """
    if not repeat_flag:
        return repeat_flag
    prefix = "RRULE:" if repeat_flag.startswith("RRULE:") else ""
    segments = repeat_flag[len(prefix):].split(";")
    for idx, segment in enumerate(segments):
        key, sep, value = segment.partition("=")
        if not sep or key.strip().upper() != "COUNT":
            continue
        try:
            remaining = int(value) - 1
        except ValueError:
            return repeat_flag
        if remaining < 1:
            return None
        segments[idx] = f"{key}={remaining}"
        return prefix + ";".join(segments)
    return repeat_flag


# ── Task helpers ──

def _clean_text(value):
    """Return value stripped if it is a string, otherwise an empty string."""
    return value.strip() if isinstance(value, str) else ""


def task_to_dict(row, project_name=""):
    """Convert a SQLite row to the API response format (TickTick-compatible field names)."""
    return {
        "id": row["id"],
        "title": row["title"],
        "content": row["content"] or "",
        "projectId": row["project_id"],
        "_projectName": project_name or "",
        "_projectId": row["project_id"],
        "dueDate": row["due_date"],
        "startDate": row["start_date"],
        "isAllDay": bool(row["is_all_day"]),
        "priority": row["priority"],
        "status": row["status"],
        "repeatFlag": row["repeat_flag"] or "",
        "completedAt": row["completed_at"],
        "reminders": json.loads(row["reminders"] or "[]"),
        "createdAt": row["created_at"],
        "sortOrder": row["sort_order"],
    }


def fetch_tasks_with_projects(conn, where_clause, params, user_id):
    """Fetch tasks joined with project names, scoped to user's accessible projects.

    where_clause is trusted SQL supplied by this module (never by a client);
    its "?" placeholders are bound from params.
    """
    project_ids = get_user_project_ids(user_id)
    if not project_ids:
        return []
    placeholders = ",".join("?" * len(project_ids))
    sql = f"""SELECT t.*, p.name as project_name
              FROM tasks t JOIN projects p ON t.project_id = p.id
              WHERE t.project_id IN ({placeholders}) AND {where_clause}
              ORDER BY t.start_date IS NULL, t.start_date, t.due_date, t.sort_order"""
    rows = conn.execute(sql, project_ids + list(params)).fetchall()
    return [task_to_dict(r, r["project_name"]) for r in rows]


def _find_accessible_task(conn, task_id, user_id):
    """Return the task row if it exists in a project the user can access, else None."""
    task = conn.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)).fetchone()
    if task is None or task["project_id"] not in get_user_project_ids(user_id):
        return None
    return task


def _owns_project(conn, project_id, user_id):
    """Return True if the project exists and is owned by user_id."""
    row = conn.execute(
        "SELECT 1 FROM projects WHERE id = ? AND user_id = ?",
        (project_id, user_id),
    ).fetchone()
    return row is not None


def _spawn_next_occurrence(conn, task, now_str):
    """Insert the next instance of a recurring task (no commit).

    Does nothing when the rule's COUNT is used up, the rule is exhausted, or
    the stored date/rule is malformed — completing the task must still succeed.
    """
    next_flag = remaining_repeat_flag(task["repeat_flag"])
    if next_flag is None:
        return

    base_date = task["due_date"] or task["start_date"]
    if task["repeat_from"] == "completion" or not base_date:
        base_date = now_str
    try:
        next_date = advance_rrule(base_date, task["repeat_flag"])
    except ValueError:
        # Dates and rules are stored as received from clients and may be junk.
        return
    if not next_date:
        return

    # Keep the original start→due distance when both dates existed.
    new_start = None
    if task["start_date"] and task["due_date"]:
        try:
            offset = _parse_iso(task["due_date"]) - _parse_iso(task["start_date"])
            new_start = (_parse_iso(next_date) - offset).isoformat()
        except (ValueError, TypeError):
            # Unparsable dates, or a naive/aware mix that cannot be subtracted.
            new_start = next_date
    elif task["start_date"]:
        new_start = next_date

    conn.execute(
        """INSERT INTO tasks (id, project_id, user_id, title, content, priority,
           start_date, due_date, is_all_day, repeat_flag, repeat_from,
           parent_task_id, reminders, sort_order)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (str(uuid.uuid4()), task["project_id"], task["user_id"], task["title"],
         task["content"], task["priority"], new_start, next_date,
         task["is_all_day"], next_flag, task["repeat_from"],
         task["parent_task_id"] or task["id"], task["reminders"],
         task["sort_order"]),
    )


# ── HTTP Handler ──

class Handler(BaseHTTPRequestHandler):
    """JSON API handler; the caller's identity comes from gateway-injected headers.

    Tasks may be read and modified by anyone who can access their project
    (see get_user_project_ids); projects may only be renamed, shared or
    archived by their owner. Resources the caller may not touch are reported
    as 404 so their existence is not revealed.
    """

    # ── Plumbing ──

    def _read_body(self):
        """Read and decode the JSON request body; an absent body yields {}."""
        length = int(self.headers.get("Content-Length", 0))
        return json.loads(self.rfile.read(length)) if length > 0 else {}

    def _read_json_object(self):
        """Return the body as a dict, or send a 400 and return None."""
        try:
            body = self._read_body()
        except ValueError:  # bad Content-Length, bad encoding or bad JSON
            self._send_error("Invalid JSON body", 400)
            return None
        if not isinstance(body, dict):
            self._send_error("JSON object required", 400)
            return None
        return body

    def _send_json(self, data, status=200):
        body = json.dumps(data).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _send_error(self, msg, status=500):
        self._send_json({"error": msg}, status)

    def _parse_query(self):
        qs = self.path.split("?", 1)[1] if "?" in self.path else ""
        return dict(urllib.parse.parse_qsl(qs))

    def _get_user(self):
        """Get user identity from gateway-injected headers."""
        user_id = self.headers.get("X-Gateway-User-Id")
        if not user_id:
            return None
        username = self.headers.get("X-Gateway-User-Name", "")
        ensure_user(user_id, username, username)
        return user_id

    def _guarded(self, handle):
        """Run a request handler, turning any failure into a JSON 500."""
        try:
            handle()
        except Exception as e:  # top-level boundary: never drop the connection silently
            self._send_error(str(e))

    def _authenticate(self):
        """Return the caller's user id, or send a 401 and return None."""
        user_id = self._get_user()
        if not user_id:
            self._send_error("Unauthorized", 401)
        return user_id

    # ── GET ──

    def do_GET(self):
        self._guarded(self._handle_get)

    def _handle_get(self):
        path = self.path.split("?")[0]
        if path == "/health":
            self._send_json({"status": "ok"})
            return
        user_id = self._authenticate()
        if not user_id:
            return

        if path == "/api/projects":
            self._list_projects(user_id)
        elif path == "/api/tasks":
            self._list_tasks(user_id)
        elif path == "/api/today":
            self._list_today(user_id)
        elif path == "/api/tasks/completed":
            self._list_completed(user_id)
        elif path == "/api/tags":
            self._list_tags(user_id)
        elif path.startswith("/api/projects/") and path.count("/") == 3:
            self._get_project(user_id, path.split("/")[3])
        else:
            self._send_json({"error": "Not found"}, 404)

    def _list_projects(self, user_id):
        """List every project the user can access, Inbox first."""
        project_ids = get_user_project_ids(user_id)
        if not project_ids:
            self._send_json({"projects": []})
            return
        placeholders = ",".join("?" * len(project_ids))
        with closing(get_db()) as conn:
            rows = conn.execute(
                f"SELECT * FROM projects WHERE id IN ({placeholders}) "
                "ORDER BY is_inbox DESC, sort_order, name",
                project_ids,
            ).fetchall()
        projects = [{"id": r["id"], "name": r["name"], "color": r["color"],
                     "isInbox": bool(r["is_inbox"]), "isShared": bool(r["is_shared"]),
                     "sortOrder": r["sort_order"]} for r in rows]
        self._send_json({"projects": projects})

    def _list_tasks(self, user_id):
        """List active tasks, optionally filtered by ?project_id=."""
        project_id = self._parse_query().get("project_id")
        with closing(get_db()) as conn:
            if project_id:
                tasks = fetch_tasks_with_projects(
                    conn, "t.status = 0 AND t.project_id = ?", (project_id,), user_id)
            else:
                tasks = fetch_tasks_with_projects(conn, "t.status = 0", (), user_id)
        self._send_json({"tasks": tasks})

    def _list_today(self, user_id):
        """Today + overdue active tasks (dashboard widget), by server-local date."""
        today_str = datetime.now().strftime("%Y-%m-%d")
        with closing(get_db()) as conn:
            all_active = fetch_tasks_with_projects(conn, "t.status = 0", (), user_id)
        today_tasks = []
        overdue_tasks = []
        for t in all_active:
            due = t.get("startDate") or t.get("dueDate")
            if not due:
                continue
            due_date = due[:10]  # ISO strings compare correctly as text
            if due_date == today_str:
                today_tasks.append(t)
            elif due_date < today_str:
                overdue_tasks.append(t)
        self._send_json({
            "today": today_tasks,
            "overdue": overdue_tasks,
            "todayCount": len(today_tasks),
            "overdueCount": len(overdue_tasks),
        })

    def _list_completed(self, user_id):
        """Return the 50 most recently completed tasks."""
        with closing(get_db()) as conn:
            tasks = fetch_tasks_with_projects(conn, "t.status = 2", (), user_id)
        tasks.sort(key=lambda t: t.get("completedAt") or "", reverse=True)
        self._send_json({"tasks": tasks[:50]})

    def _list_tags(self, user_id):
        with closing(get_db()) as conn:
            rows = conn.execute(
                "SELECT * FROM tags WHERE user_id = ? ORDER BY name", (user_id,)
            ).fetchall()
        self._send_json({"tags": [{"id": r["id"], "name": r["name"], "color": r["color"]}
                                  for r in rows]})

    def _get_project(self, user_id, project_id):
        """Return one accessible project together with its active tasks."""
        if project_id not in get_user_project_ids(user_id):
            self._send_error("Not found", 404)
            return
        with closing(get_db()) as conn:
            proj = conn.execute(
                "SELECT * FROM projects WHERE id = ?", (project_id,)).fetchone()
            if not proj:
                self._send_error("Not found", 404)
                return
            tasks = fetch_tasks_with_projects(
                conn, "t.status = 0 AND t.project_id = ?", (project_id,), user_id)
        self._send_json({"project": {"id": proj["id"], "name": proj["name"]},
                         "tasks": tasks})

    # ── POST ──

    def do_POST(self):
        self._guarded(self._handle_post)

    def _handle_post(self):
        path = self.path.split("?")[0]
        body = self._read_json_object()
        if body is None:
            return
        user_id = self._authenticate()
        if not user_id:
            return

        if path == "/api/tasks":
            self._create_task(user_id, body)
        elif path.startswith("/api/tasks/") and path.endswith("/complete"):
            self._complete_task(user_id, path.split("/")[3])
        elif path == "/api/projects":
            self._create_project(user_id, body)
        elif path == "/api/tags":
            self._create_tag(user_id, body)
        elif path.startswith("/api/projects/") and path.endswith("/share"):
            self._share_project(user_id, path.split("/")[3], body)
        else:
            self._send_json({"error": "Not found"}, 404)

    def _create_task(self, user_id, body):
        """Create a task in the given project (default: the user's Inbox)."""
        title = _clean_text(body.get("title"))
        if not title:
            self._send_error("Title required", 400)
            return
        project_id = body.get("projectId") or get_inbox_id(user_id)
        # Clients may only file tasks into projects they can access.
        if project_id not in get_user_project_ids(user_id):
            self._send_error("Project not found", 404)
            return
        tag_names = body.get("tags", [])
        if not isinstance(tag_names, list):
            tag_names = []

        task_id = str(uuid.uuid4())
        with closing(get_db()) as conn:
            conn.execute(
                """INSERT INTO tasks (id, project_id, user_id, title, content, priority,
                   start_date, due_date, is_all_day, repeat_flag, reminders, sort_order)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (task_id, project_id, user_id, title,
                 body.get("content", ""), body.get("priority", 0),
                 body.get("startDate"), body.get("dueDate"),
                 1 if body.get("isAllDay", True) else 0,
                 body.get("repeatFlag"),
                 json.dumps(body.get("reminders", [])),
                 body.get("sortOrder", 0)),
            )
            # Handle tags: create on first use, then link.
            for raw_name in tag_names:
                tag_name = _clean_text(raw_name)
                if not tag_name:
                    continue
                conn.execute(
                    "INSERT OR IGNORE INTO tags (id, user_id, name) VALUES (?, ?, ?)",
                    (str(uuid.uuid4()), user_id, tag_name))
                tag_row = conn.execute(
                    "SELECT id FROM tags WHERE user_id = ? AND name = ?",
                    (user_id, tag_name)).fetchone()
                if tag_row:
                    conn.execute(
                        "INSERT OR IGNORE INTO task_tags (task_id, tag_id) VALUES (?, ?)",
                        (task_id, tag_row["id"]))
            conn.commit()
            row = conn.execute(_TASK_WITH_PROJECT_SQL, (task_id,)).fetchone()
        self._send_json(task_to_dict(row, row["project_name"]), 201)

    def _complete_task(self, user_id, task_id):
        """Mark a task completed and, for recurring tasks, spawn the next instance."""
        with closing(get_db()) as conn:
            task = _find_accessible_task(conn, task_id, user_id)
            if not task:
                self._send_error("Not found", 404)
                return
            # Completing twice must not spawn a second copy of the next occurrence.
            if task["status"] != 2:
                now_str = datetime.now().isoformat()
                conn.execute(
                    "UPDATE tasks SET status = 2, completed_at = ?, updated_at = ? WHERE id = ?",
                    (now_str, now_str, task_id))
                conn.execute(
                    "INSERT INTO completions (id, task_id, user_id, completed_at) "
                    "VALUES (?, ?, ?, ?)",
                    (str(uuid.uuid4()), task_id, user_id, now_str))
                if task["repeat_flag"]:
                    _spawn_next_occurrence(conn, task, now_str)
                conn.commit()
        self._send_json({"status": "completed"})

    def _create_project(self, user_id, body):
        name = _clean_text(body.get("name"))
        if not name:
            self._send_error("Name required", 400)
            return
        project_id = str(uuid.uuid4())
        with closing(get_db()) as conn:
            conn.execute(
                "INSERT INTO projects (id, user_id, name, color, is_shared) "
                "VALUES (?, ?, ?, ?, ?)",
                (project_id, user_id, name, body.get("color", ""),
                 1 if body.get("isShared") else 0))
            conn.commit()
        self._send_json({"id": project_id, "name": name}, 201)

    def _create_tag(self, user_id, body):
        name = _clean_text(body.get("name"))
        if not name:
            self._send_error("Name required", 400)
            return
        tag_id = str(uuid.uuid4())
        with closing(get_db()) as conn:
            try:
                conn.execute(
                    "INSERT INTO tags (id, user_id, name, color) VALUES (?, ?, ?, ?)",
                    (tag_id, user_id, name, body.get("color", "")))
            except sqlite3.IntegrityError:
                # UNIQUE(user_id, name): the user already has this tag.
                self._send_error("Tag already exists", 409)
                return
            conn.commit()
        self._send_json({"id": tag_id, "name": name}, 201)

    def _share_project(self, user_id, project_id, body):
        """Add another user as a member of a project the caller owns."""
        target_user_id = body.get("userId")
        if not target_user_id:
            self._send_error("userId required", 400)
            return
        with closing(get_db()) as conn:
            if not _owns_project(conn, project_id, user_id):
                self._send_error("Not found", 404)
                return
            known = conn.execute(
                "SELECT 1 FROM users WHERE id = ?", (target_user_id,)).fetchone()
            if not known:
                # OR IGNORE does not cover foreign-key failures; report cleanly.
                self._send_error("User not found", 404)
                return
            conn.execute(
                "INSERT OR IGNORE INTO project_members (project_id, user_id) VALUES (?, ?)",
                (project_id, target_user_id))
            conn.commit()
        self._send_json({"status": "shared"})

    # ── PATCH ──

    def do_PATCH(self):
        self._guarded(self._handle_patch)

    def _handle_patch(self):
        path = self.path.split("?")[0]
        body = self._read_json_object()
        if body is None:
            return
        user_id = self._authenticate()
        if not user_id:
            return

        if path.startswith("/api/tasks/") and path.count("/") == 3:
            self._update_task(user_id, path.split("/")[3], body)
        elif path.startswith("/api/projects/") and path.count("/") == 3:
            self._update_project(user_id, path.split("/")[3], body)
        else:
            self._send_json({"error": "Not found"}, 404)

    def _update_task(self, user_id, task_id, body):
        """Apply the provided fields to an accessible task and return it."""
        if "title" in body:
            title = _clean_text(body["title"])
            if not title:
                self._send_error("Title required", 400)
                return
            body = {**body, "title": title}
        if "projectId" in body and body["projectId"] not in get_user_project_ids(user_id):
            self._send_error("Project not found", 404)
            return

        with closing(get_db()) as conn:
            if _find_accessible_task(conn, task_id, user_id) is None:
                self._send_error("Not found", 404)
                return
            # Build SET clause from provided fields; column names come only
            # from _TASK_FIELD_MAP, never from the request.
            updates = []
            params = []
            for api_field, db_field in _TASK_FIELD_MAP.items():
                if api_field not in body:
                    continue
                val = body[api_field]
                if api_field == "isAllDay":
                    val = 1 if val else 0
                elif api_field == "reminders":
                    val = json.dumps(val)
                updates.append(f"{db_field} = ?")
                params.append(val)
            if updates:
                updates.append("updated_at = ?")
                params.append(datetime.now().isoformat())
                params.append(task_id)
                conn.execute(
                    f"UPDATE tasks SET {', '.join(updates)} WHERE id = ?", params)
                conn.commit()
            row = conn.execute(_TASK_WITH_PROJECT_SQL, (task_id,)).fetchone()
        self._send_json(task_to_dict(row, row["project_name"]))

    def _update_project(self, user_id, project_id, body):
        """Rename or recolor a project the caller owns."""
        updates = []
        params = []
        if "name" in body:
            name = _clean_text(body["name"])
            if not name:
                self._send_error("Name required", 400)
                return
            updates.append("name = ?")
            params.append(name)
        if "color" in body:
            updates.append("color = ?")
            params.append(body["color"])

        with closing(get_db()) as conn:
            if not _owns_project(conn, project_id, user_id):
                self._send_error("Not found", 404)
                return
            if updates:
                updates.append("updated_at = ?")
                params.append(datetime.now().isoformat())
                params.append(project_id)
                conn.execute(
                    f"UPDATE projects SET {', '.join(updates)} WHERE id = ?", params)
                conn.commit()
        self._send_json({"status": "updated"})

    # ── DELETE ──

    def do_DELETE(self):
        self._guarded(self._handle_delete)

    def _handle_delete(self):
        path = self.path.split("?")[0]
        user_id = self._authenticate()
        if not user_id:
            return

        if path.startswith("/api/tasks/") and path.count("/") == 3:
            self._delete_task(user_id, path.split("/")[3])
        elif path.startswith("/api/projects/") and path.count("/") == 3:
            self._archive_project(user_id, path.split("/")[3])
        elif path.startswith("/api/tags/") and path.count("/") == 3:
            self._delete_tag(user_id, path.split("/")[3])
        else:
            self._send_json({"error": "Not found"}, 404)

    def _delete_task(self, user_id, task_id):
        with closing(get_db()) as conn:
            if _find_accessible_task(conn, task_id, user_id) is None:
                self._send_error("Not found", 404)
                return
            conn.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
            conn.commit()
        self._send_json({"status": "deleted"})

    def _archive_project(self, user_id, project_id):
        """Archive (soft-delete) a project the caller owns; the Inbox is protected."""
        with closing(get_db()) as conn:
            proj = conn.execute(
                "SELECT is_inbox FROM projects WHERE id = ? AND user_id = ?",
                (project_id, user_id)).fetchone()
            if not proj:
                self._send_error("Not found", 404)
                return
            if proj["is_inbox"]:
                self._send_error("Cannot delete Inbox", 400)
                return
            conn.execute(
                "UPDATE projects SET archived = 1, updated_at = ? WHERE id = ?",
                (datetime.now().isoformat(), project_id))
            conn.commit()
        self._send_json({"status": "archived"})

    def _delete_tag(self, user_id, tag_id):
        with closing(get_db()) as conn:
            conn.execute(
                "DELETE FROM tags WHERE id = ? AND user_id = ?", (tag_id, user_id))
            conn.commit()
        self._send_json({"status": "deleted"})

    def log_message(self, format, *args):
        # Request logging is intentionally silenced.
        pass


# ── Start ──

if __name__ == "__main__":
    init_db()
    print(f"Task Manager listening on port {PORT}")
    server = ThreadingHTTPServer(("0.0.0.0", PORT), Handler)
    server.serve_forever()