Files
platform/services/tasks/server.py
Yusuf Suleman 6023ebf9d0 feat: tasks app, security hardening, mobile fixes, iOS app shell
- Custom SQLite task manager replacing TickTick wrapper
- 73 tasks migrated from TickTick across 15 projects
- RRULE recurrence engine with lazy materialization
- Dashboard tasks widget (desktop sidebar + mobile card)
- Tasks page with project tabs, add/edit/complete/delete
- Security: locked ports to localhost, removed old containers
- Gitea Actions runner configured and all 3 CI jobs passing
- Fixed mobile overflow on dashboard cards
- iOS Capacitor app shell (Second Brain)
- Frontend/backend guide docs for adding new services
- TickTick Google Calendar sync re-authorized

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 15:35:57 -05:00

761 lines
30 KiB
Python

"""Second Brain Task Manager — self-contained SQLite-backed task service."""
import json
import os
import time
import uuid
import urllib.parse
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from datetime import datetime, timedelta
from pathlib import Path
from threading import Lock
PORT = int(os.environ.get("PORT", 8098))
DATA_DIR = Path(os.environ.get("DATA_DIR", "/app/data"))
DB_PATH = DATA_DIR / "tasks.db"
# ── Database ──
import sqlite3
def get_db():
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("PRAGMA journal_mode = WAL")
return conn
def init_db():
DATA_DIR.mkdir(parents=True, exist_ok=True)
conn = get_db()
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
username TEXT NOT NULL,
display_name TEXT NOT NULL DEFAULT '',
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)''')
c.execute('''CREATE TABLE IF NOT EXISTS projects (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
name TEXT NOT NULL,
color TEXT DEFAULT '',
sort_order INTEGER DEFAULT 0,
is_inbox INTEGER DEFAULT 0,
is_shared INTEGER DEFAULT 0,
archived INTEGER DEFAULT 0,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id)
)''')
c.execute('''CREATE TABLE IF NOT EXISTS project_members (
project_id TEXT NOT NULL,
user_id TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'member',
added_at TEXT DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (project_id, user_id),
FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users(id)
)''')
c.execute('''CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
user_id TEXT NOT NULL,
title TEXT NOT NULL,
content TEXT DEFAULT '',
status INTEGER DEFAULT 0,
priority INTEGER DEFAULT 0,
start_date TEXT,
due_date TEXT,
is_all_day INTEGER DEFAULT 1,
completed_at TEXT,
repeat_flag TEXT,
repeat_from TEXT DEFAULT 'due',
parent_task_id TEXT,
reminders TEXT DEFAULT '[]',
sort_order INTEGER DEFAULT 0,
gcal_event_id TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (project_id) REFERENCES projects(id),
FOREIGN KEY (user_id) REFERENCES users(id),
FOREIGN KEY (parent_task_id) REFERENCES tasks(id) ON DELETE SET NULL
)''')
c.execute('''CREATE TABLE IF NOT EXISTS tags (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
name TEXT NOT NULL,
color TEXT DEFAULT '',
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, name),
FOREIGN KEY (user_id) REFERENCES users(id)
)''')
c.execute('''CREATE TABLE IF NOT EXISTS task_tags (
task_id TEXT NOT NULL,
tag_id TEXT NOT NULL,
PRIMARY KEY (task_id, tag_id),
FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
)''')
c.execute('''CREATE TABLE IF NOT EXISTS completions (
id TEXT PRIMARY KEY,
task_id TEXT NOT NULL,
user_id TEXT NOT NULL,
completed_at TEXT NOT NULL,
FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users(id)
)''')
c.execute("CREATE INDEX IF NOT EXISTS idx_tasks_user_status ON tasks(user_id, status)")
c.execute("CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_id)")
c.execute("CREATE INDEX IF NOT EXISTS idx_tasks_due ON tasks(due_date)")
c.execute("CREATE INDEX IF NOT EXISTS idx_tasks_parent ON tasks(parent_task_id)")
c.execute("CREATE INDEX IF NOT EXISTS idx_project_members ON project_members(user_id)")
conn.commit()
conn.close()
# ── User + Inbox helpers ──
def ensure_user(user_id, username="", display_name=""):
"""Upsert a user record and ensure they have an Inbox project."""
conn = get_db()
c = conn.cursor()
existing = c.execute("SELECT id FROM users WHERE id = ?", (user_id,)).fetchone()
if not existing:
c.execute("INSERT INTO users (id, username, display_name) VALUES (?, ?, ?)",
(user_id, username, display_name))
# Create Inbox for new user
inbox_id = str(uuid.uuid4())
c.execute("INSERT INTO projects (id, user_id, name, is_inbox, sort_order) VALUES (?, ?, 'Inbox', 1, -1)",
(inbox_id, user_id))
conn.commit()
conn.close()
def get_inbox_id(user_id):
conn = get_db()
row = conn.execute("SELECT id FROM projects WHERE user_id = ? AND is_inbox = 1", (user_id,)).fetchone()
conn.close()
return row["id"] if row else None
def get_user_project_ids(user_id):
"""Get all project IDs the user can access (owned + shared membership)."""
conn = get_db()
owned = [r["id"] for r in conn.execute(
"SELECT id FROM projects WHERE user_id = ? AND archived = 0", (user_id,)).fetchall()]
shared = [r["project_id"] for r in conn.execute(
"SELECT project_id FROM project_members WHERE user_id = ?", (user_id,)).fetchall()]
# Also include projects marked is_shared (visible to all)
global_shared = [r["id"] for r in conn.execute(
"SELECT id FROM projects WHERE is_shared = 1 AND archived = 0 AND user_id != ?", (user_id,)).fetchall()]
conn.close()
return list(set(owned + shared + global_shared))
# ── RRULE Parser ──
def advance_rrule(due_date_str, repeat_flag, from_date_str=None):
"""Given a due date and RRULE string, compute the next occurrence.
Returns ISO date string or None if recurrence is exhausted."""
if not repeat_flag:
return None
base = datetime.fromisoformat(due_date_str.replace("+0000", "+00:00").replace("Z", "+00:00"))
if from_date_str:
base = datetime.fromisoformat(from_date_str.replace("+0000", "+00:00").replace("Z", "+00:00"))
# Parse RRULE components
parts = {}
for segment in repeat_flag.replace("RRULE:", "").split(";"):
if "=" in segment:
k, v = segment.split("=", 1)
parts[k.upper()] = v
freq = parts.get("FREQ", "DAILY").upper()
interval = int(parts.get("INTERVAL", "1"))
until = parts.get("UNTIL")
count = parts.get("COUNT") # not enforced here, checked by caller
# Compute next date
if freq == "DAILY":
next_dt = base + timedelta(days=interval)
elif freq == "WEEKLY":
byday = parts.get("BYDAY", "")
if byday:
day_map = {"MO": 0, "TU": 1, "WE": 2, "TH": 3, "FR": 4, "SA": 5, "SU": 6}
target_days = sorted([day_map[d.strip()] for d in byday.split(",") if d.strip() in day_map])
if target_days:
current_wd = base.weekday()
# Find next target day
found = False
for td in target_days:
if td > current_wd:
next_dt = base + timedelta(days=(td - current_wd))
found = True
break
if not found:
# Wrap to first day of next week(s)
days_to_next = (7 * interval) - current_wd + target_days[0]
next_dt = base + timedelta(days=days_to_next)
else:
next_dt = base + timedelta(weeks=interval)
else:
next_dt = base + timedelta(weeks=interval)
elif freq == "MONTHLY":
bymonthday = parts.get("BYMONTHDAY")
month = base.month + interval
year = base.year + (month - 1) // 12
month = ((month - 1) % 12) + 1
day = int(bymonthday) if bymonthday else base.day
# Clamp day to valid range
import calendar
max_day = calendar.monthrange(year, month)[1]
day = min(day, max_day)
next_dt = base.replace(year=year, month=month, day=day)
elif freq == "YEARLY":
next_dt = base.replace(year=base.year + interval)
else:
return None
# Check UNTIL
if until:
try:
until_dt = datetime.strptime(until[:8], "%Y%m%d").replace(tzinfo=base.tzinfo)
if next_dt > until_dt:
return None
except ValueError:
pass
return next_dt.isoformat()
# ── Task helpers ──
def task_to_dict(row, project_name=""):
"""Convert a SQLite row to the API response format (TickTick-compatible field names)."""
return {
"id": row["id"],
"title": row["title"],
"content": row["content"] or "",
"projectId": row["project_id"],
"_projectName": project_name or "",
"_projectId": row["project_id"],
"dueDate": row["due_date"],
"startDate": row["start_date"],
"isAllDay": bool(row["is_all_day"]),
"priority": row["priority"],
"status": row["status"],
"repeatFlag": row["repeat_flag"] or "",
"completedAt": row["completed_at"],
"reminders": json.loads(row["reminders"] or "[]"),
"createdAt": row["created_at"],
"sortOrder": row["sort_order"],
}
def fetch_tasks_with_projects(conn, where_clause, params, user_id):
"""Fetch tasks joined with project names, scoped to user's accessible projects."""
project_ids = get_user_project_ids(user_id)
if not project_ids:
return []
placeholders = ",".join("?" * len(project_ids))
sql = f"""SELECT t.*, p.name as project_name FROM tasks t
JOIN projects p ON t.project_id = p.id
WHERE t.project_id IN ({placeholders}) AND {where_clause}
ORDER BY t.start_date IS NULL, t.start_date, t.due_date, t.sort_order"""
rows = conn.execute(sql, project_ids + list(params)).fetchall()
return [task_to_dict(r, r["project_name"]) for r in rows]
# ── HTTP Handler ──
class Handler(BaseHTTPRequestHandler):
def _read_body(self):
length = int(self.headers.get("Content-Length", 0))
return json.loads(self.rfile.read(length)) if length else {}
def _send_json(self, data, status=200):
body = json.dumps(data).encode()
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _send_error(self, msg, status=500):
self._send_json({"error": msg}, status)
def _parse_query(self):
qs = self.path.split("?", 1)[1] if "?" in self.path else ""
return dict(urllib.parse.parse_qsl(qs))
def _get_user(self):
"""Get user identity from gateway-injected headers."""
user_id = self.headers.get("X-Gateway-User-Id")
if not user_id:
return None
username = self.headers.get("X-Gateway-User-Name", "")
ensure_user(user_id, username, username)
return user_id
# ── GET ──
def do_GET(self):
path = self.path.split("?")[0]
if path == "/health":
self._send_json({"status": "ok"})
return
user_id = self._get_user()
if not user_id:
self._send_error("Unauthorized", 401)
return
# List projects
if path == "/api/projects":
try:
conn = get_db()
project_ids = get_user_project_ids(user_id)
if not project_ids:
self._send_json({"projects": []})
conn.close()
return
placeholders = ",".join("?" * len(project_ids))
rows = conn.execute(
f"SELECT * FROM projects WHERE id IN ({placeholders}) ORDER BY is_inbox DESC, sort_order, name",
project_ids).fetchall()
conn.close()
projects = [{"id": r["id"], "name": r["name"], "color": r["color"],
"isInbox": bool(r["is_inbox"]), "isShared": bool(r["is_shared"]),
"sortOrder": r["sort_order"]} for r in rows]
self._send_json({"projects": projects})
except Exception as e:
self._send_error(str(e))
return
# List all tasks (optional project filter)
if path == "/api/tasks":
try:
params = self._parse_query()
project_id = params.get("project_id")
conn = get_db()
if project_id:
tasks = fetch_tasks_with_projects(conn, "t.status = 0 AND t.project_id = ?", (project_id,), user_id)
else:
tasks = fetch_tasks_with_projects(conn, "t.status = 0", (), user_id)
conn.close()
self._send_json({"tasks": tasks})
except Exception as e:
self._send_error(str(e))
return
# Today + overdue (dashboard widget)
if path == "/api/today":
try:
conn = get_db()
now = datetime.now()
today_str = now.strftime("%Y-%m-%d")
all_active = fetch_tasks_with_projects(conn, "t.status = 0", (), user_id)
conn.close()
today_tasks = []
overdue_tasks = []
for t in all_active:
due = t.get("startDate") or t.get("dueDate")
if not due:
continue
due_date = due[:10]
if due_date == today_str:
today_tasks.append(t)
elif due_date < today_str:
overdue_tasks.append(t)
self._send_json({
"today": today_tasks,
"overdue": overdue_tasks,
"todayCount": len(today_tasks),
"overdueCount": len(overdue_tasks),
})
except Exception as e:
self._send_error(str(e))
return
# Completed tasks
if path == "/api/tasks/completed":
try:
conn = get_db()
tasks = fetch_tasks_with_projects(conn, "t.status = 2", (), user_id)
conn.close()
# Sort by completed_at descending
tasks.sort(key=lambda t: t.get("completedAt") or "", reverse=True)
self._send_json({"tasks": tasks[:50]})
except Exception as e:
self._send_error(str(e))
return
# List tags
if path == "/api/tags":
try:
conn = get_db()
rows = conn.execute("SELECT * FROM tags WHERE user_id = ? ORDER BY name", (user_id,)).fetchall()
conn.close()
self._send_json({"tags": [{"id": r["id"], "name": r["name"], "color": r["color"]} for r in rows]})
except Exception as e:
self._send_error(str(e))
return
# Get single project with tasks
if path.startswith("/api/projects/") and path.count("/") == 3:
project_id = path.split("/")[3]
try:
conn = get_db()
proj = conn.execute("SELECT * FROM projects WHERE id = ?", (project_id,)).fetchone()
if not proj:
self._send_error("Not found", 404)
conn.close()
return
tasks = fetch_tasks_with_projects(conn, "t.status = 0 AND t.project_id = ?", (project_id,), user_id)
conn.close()
self._send_json({"project": {"id": proj["id"], "name": proj["name"]}, "tasks": tasks})
except Exception as e:
self._send_error(str(e))
return
self._send_json({"error": "Not found"}, 404)
# ── POST ──
def do_POST(self):
path = self.path.split("?")[0]
body = self._read_body()
user_id = self._get_user()
if not user_id:
self._send_error("Unauthorized", 401)
return
# Create task
if path == "/api/tasks":
try:
title = body.get("title", "").strip()
if not title:
self._send_error("Title required", 400)
return
project_id = body.get("projectId") or get_inbox_id(user_id)
task_id = str(uuid.uuid4())
conn = get_db()
conn.execute("""INSERT INTO tasks (id, project_id, user_id, title, content, priority,
start_date, due_date, is_all_day, repeat_flag, reminders, sort_order)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(task_id, project_id, user_id, title,
body.get("content", ""),
body.get("priority", 0),
body.get("startDate"),
body.get("dueDate"),
1 if body.get("isAllDay", True) else 0,
body.get("repeatFlag"),
json.dumps(body.get("reminders", [])),
body.get("sortOrder", 0)))
# Handle tags
for tag_name in body.get("tags", []):
tag_id = str(uuid.uuid4())
conn.execute("INSERT OR IGNORE INTO tags (id, user_id, name) VALUES (?, ?, ?)",
(tag_id, user_id, tag_name))
tag_row = conn.execute("SELECT id FROM tags WHERE user_id = ? AND name = ?",
(user_id, tag_name)).fetchone()
if tag_row:
conn.execute("INSERT OR IGNORE INTO task_tags (task_id, tag_id) VALUES (?, ?)",
(task_id, tag_row["id"]))
conn.commit()
# Return created task
row = conn.execute("SELECT t.*, p.name as project_name FROM tasks t JOIN projects p ON t.project_id = p.id WHERE t.id = ?",
(task_id,)).fetchone()
conn.close()
self._send_json(task_to_dict(row, row["project_name"]), 201)
except Exception as e:
self._send_error(str(e))
return
# Complete task
if path.startswith("/api/tasks/") and path.endswith("/complete"):
task_id = path.split("/")[3]
try:
conn = get_db()
task = conn.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)).fetchone()
if not task:
self._send_error("Not found", 404)
conn.close()
return
now_str = datetime.now().isoformat()
# Mark complete
conn.execute("UPDATE tasks SET status = 2, completed_at = ?, updated_at = ? WHERE id = ?",
(now_str, now_str, task_id))
# Record completion
conn.execute("INSERT INTO completions (id, task_id, user_id, completed_at) VALUES (?, ?, ?, ?)",
(str(uuid.uuid4()), task_id, user_id, now_str))
# Handle recurrence — spawn next instance
if task["repeat_flag"]:
base_date = task["due_date"] or task["start_date"]
if task["repeat_from"] == "completion":
base_date = now_str
next_date = advance_rrule(base_date, task["repeat_flag"])
if next_date:
new_id = str(uuid.uuid4())
# Calculate start_date offset if both existed
new_start = None
if task["start_date"] and task["due_date"]:
try:
orig_start = datetime.fromisoformat(task["start_date"].replace("+0000", "+00:00"))
orig_due = datetime.fromisoformat(task["due_date"].replace("+0000", "+00:00"))
new_due = datetime.fromisoformat(next_date.replace("+0000", "+00:00"))
offset = orig_due - orig_start
new_start = (new_due - offset).isoformat()
except:
new_start = next_date
elif task["start_date"]:
new_start = next_date
conn.execute("""INSERT INTO tasks (id, project_id, user_id, title, content, priority,
start_date, due_date, is_all_day, repeat_flag, repeat_from,
parent_task_id, reminders, sort_order)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(new_id, task["project_id"], task["user_id"], task["title"],
task["content"], task["priority"],
new_start, next_date, task["is_all_day"],
task["repeat_flag"], task["repeat_from"],
task["parent_task_id"] or task["id"],
task["reminders"], task["sort_order"]))
conn.commit()
conn.close()
self._send_json({"status": "completed"})
except Exception as e:
self._send_error(str(e))
return
# Create project
if path == "/api/projects":
try:
name = body.get("name", "").strip()
if not name:
self._send_error("Name required", 400)
return
project_id = str(uuid.uuid4())
conn = get_db()
conn.execute("INSERT INTO projects (id, user_id, name, color, is_shared) VALUES (?, ?, ?, ?, ?)",
(project_id, user_id, name, body.get("color", ""), 1 if body.get("isShared") else 0))
conn.commit()
conn.close()
self._send_json({"id": project_id, "name": name}, 201)
except Exception as e:
self._send_error(str(e))
return
# Create tag
if path == "/api/tags":
try:
name = body.get("name", "").strip()
if not name:
self._send_error("Name required", 400)
return
tag_id = str(uuid.uuid4())
conn = get_db()
conn.execute("INSERT INTO tags (id, user_id, name, color) VALUES (?, ?, ?, ?)",
(tag_id, user_id, name, body.get("color", "")))
conn.commit()
conn.close()
self._send_json({"id": tag_id, "name": name}, 201)
except Exception as e:
self._send_error(str(e))
return
# Share project
if path.startswith("/api/projects/") and path.endswith("/share"):
project_id = path.split("/")[3]
try:
target_user_id = body.get("userId")
if not target_user_id:
self._send_error("userId required", 400)
return
conn = get_db()
conn.execute("INSERT OR IGNORE INTO project_members (project_id, user_id) VALUES (?, ?)",
(project_id, target_user_id))
conn.commit()
conn.close()
self._send_json({"status": "shared"})
except Exception as e:
self._send_error(str(e))
return
self._send_json({"error": "Not found"}, 404)
# ── PATCH ──
def do_PATCH(self):
path = self.path.split("?")[0]
body = self._read_body()
user_id = self._get_user()
if not user_id:
self._send_error("Unauthorized", 401)
return
# Update task
if path.startswith("/api/tasks/") and path.count("/") == 3:
task_id = path.split("/")[3]
try:
conn = get_db()
task = conn.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)).fetchone()
if not task:
self._send_error("Not found", 404)
conn.close()
return
# Build SET clause from provided fields
updates = []
params = []
field_map = {
"title": "title", "content": "content", "priority": "priority",
"startDate": "start_date", "dueDate": "due_date",
"isAllDay": "is_all_day", "repeatFlag": "repeat_flag",
"projectId": "project_id", "sortOrder": "sort_order",
"reminders": "reminders",
}
for api_field, db_field in field_map.items():
if api_field in body:
val = body[api_field]
if api_field == "isAllDay":
val = 1 if val else 0
elif api_field == "reminders":
val = json.dumps(val)
updates.append(f"{db_field} = ?")
params.append(val)
if updates:
updates.append("updated_at = ?")
params.append(datetime.now().isoformat())
params.append(task_id)
conn.execute(f"UPDATE tasks SET {', '.join(updates)} WHERE id = ?", params)
conn.commit()
row = conn.execute("SELECT t.*, p.name as project_name FROM tasks t JOIN projects p ON t.project_id = p.id WHERE t.id = ?",
(task_id,)).fetchone()
conn.close()
self._send_json(task_to_dict(row, row["project_name"]))
except Exception as e:
self._send_error(str(e))
return
# Update project
if path.startswith("/api/projects/") and path.count("/") == 3:
project_id = path.split("/")[3]
try:
conn = get_db()
updates = []
params = []
if "name" in body:
updates.append("name = ?")
params.append(body["name"])
if "color" in body:
updates.append("color = ?")
params.append(body["color"])
if updates:
updates.append("updated_at = ?")
params.append(datetime.now().isoformat())
params.append(project_id)
conn.execute(f"UPDATE projects SET {', '.join(updates)} WHERE id = ?", params)
conn.commit()
conn.close()
self._send_json({"status": "updated"})
except Exception as e:
self._send_error(str(e))
return
self._send_json({"error": "Not found"}, 404)
# ── DELETE ──
def do_DELETE(self):
path = self.path.split("?")[0]
user_id = self._get_user()
if not user_id:
self._send_error("Unauthorized", 401)
return
# Delete task
if path.startswith("/api/tasks/") and path.count("/") == 3:
task_id = path.split("/")[3]
try:
conn = get_db()
conn.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
conn.commit()
conn.close()
self._send_json({"status": "deleted"})
except Exception as e:
self._send_error(str(e))
return
# Delete/archive project
if path.startswith("/api/projects/") and path.count("/") == 3:
project_id = path.split("/")[3]
try:
conn = get_db()
proj = conn.execute("SELECT is_inbox FROM projects WHERE id = ?", (project_id,)).fetchone()
if proj and proj["is_inbox"]:
self._send_error("Cannot delete Inbox", 400)
conn.close()
return
conn.execute("UPDATE projects SET archived = 1, updated_at = ? WHERE id = ?",
(datetime.now().isoformat(), project_id))
conn.commit()
conn.close()
self._send_json({"status": "archived"})
except Exception as e:
self._send_error(str(e))
return
# Delete tag
if path.startswith("/api/tags/") and path.count("/") == 3:
tag_id = path.split("/")[3]
try:
conn = get_db()
conn.execute("DELETE FROM tags WHERE id = ? AND user_id = ?", (tag_id, user_id))
conn.commit()
conn.close()
self._send_json({"status": "deleted"})
except Exception as e:
self._send_error(str(e))
return
self._send_json({"error": "Not found"}, 404)
def log_message(self, format, *args):
pass
# ── Start ──
if __name__ == "__main__":
init_db()
print(f"Task Manager listening on port {PORT}")
server = ThreadingHTTPServer(("0.0.0.0", PORT), Handler)
server.serve_forever()