"""One-time migration: pull all TickTick tasks into the custom task manager SQLite DB.""" import json import os import sqlite3 import uuid import urllib.request from pathlib import Path TICKTICK_TOKEN = os.environ.get("TICKTICK_ACCESS_TOKEN", "") # Handle JSON-wrapped token try: parsed = json.loads(TICKTICK_TOKEN) TICKTICK_TOKEN = parsed.get("access_token", TICKTICK_TOKEN) except (json.JSONDecodeError, TypeError): pass TICKTICK_BASE = "https://api.ticktick.com/open/v1" DB_PATH = Path(os.environ.get("DB_PATH", "/app/data/tasks.db")) GATEWAY_USER_ID = os.environ.get("GATEWAY_USER_ID", "3") # Yusuf's gateway user ID GATEWAY_USER_NAME = os.environ.get("GATEWAY_USER_NAME", "Yusuf") def tt_request(path): url = f"{TICKTICK_BASE}/{path.lstrip('/')}" req = urllib.request.Request(url) req.add_header("Authorization", f"Bearer {TICKTICK_TOKEN}") with urllib.request.urlopen(req, timeout=15) as resp: return json.loads(resp.read()) def get_db(): conn = sqlite3.connect(str(DB_PATH)) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") conn.execute("PRAGMA journal_mode = WAL") return conn def migrate(): if not TICKTICK_TOKEN: print("ERROR: TICKTICK_ACCESS_TOKEN not set") return conn = get_db() c = conn.cursor() # Ensure user exists existing = c.execute("SELECT id FROM users WHERE id = ?", (GATEWAY_USER_ID,)).fetchone() if not existing: c.execute("INSERT INTO users (id, username, display_name) VALUES (?, ?, ?)", (GATEWAY_USER_ID, GATEWAY_USER_NAME.lower(), GATEWAY_USER_NAME)) conn.commit() # Ensure Inbox exists inbox = c.execute("SELECT id FROM projects WHERE user_id = ? AND is_inbox = 1", (GATEWAY_USER_ID,)).fetchone() if not inbox: inbox_id = str(uuid.uuid4()) c.execute("INSERT INTO projects (id, user_id, name, is_inbox, sort_order) VALUES (?, ?, 'Inbox', 1, -1)", (inbox_id, GATEWAY_USER_ID)) conn.commit() inbox = c.execute("SELECT id FROM projects WHERE user_id = ? AND is_inbox = 1", (GATEWAY_USER_ID,)).fetchone() inbox_id = inbox["id"] # Fetch TickTick projects print("Fetching TickTick projects...") tt_projects = tt_request("/project") print(f" Found {len(tt_projects)} projects") # Map TickTick project IDs to our project IDs project_map = {} # tt_project_id -> our_project_id for tp in tt_projects: tt_id = tp["id"] name = tp.get("name", "Untitled") # Check if we already migrated this project (by name match) existing_proj = c.execute("SELECT id FROM projects WHERE user_id = ? AND name = ? AND is_inbox = 0", (GATEWAY_USER_ID, name)).fetchone() if existing_proj: project_map[tt_id] = existing_proj["id"] print(f" Project '{name}' already exists, skipping creation") else: new_id = str(uuid.uuid4()) is_shared = 1 if any(kw in name.lower() for kw in ["family", "shared"]) else 0 c.execute("INSERT INTO projects (id, user_id, name, is_shared, sort_order) VALUES (?, ?, ?, ?, ?)", (new_id, GATEWAY_USER_ID, name, is_shared, tp.get("sortOrder", 0))) project_map[tt_id] = new_id print(f" Created project '{name}' (shared={is_shared})") conn.commit() # Fetch all tasks from each project + inbox all_tasks = [] # Inbox print("Fetching Inbox tasks...") try: inbox_data = tt_request("/project/inbox/data") inbox_tasks = inbox_data.get("tasks", []) for t in inbox_tasks: t["_our_project_id"] = inbox_id t["_project_name"] = "Inbox" all_tasks.extend(inbox_tasks) print(f" Inbox: {len(inbox_tasks)} tasks") except Exception as e: print(f" Inbox error: {e}") # Other projects for tp in tt_projects: tt_id = tp["id"] name = tp.get("name", "?") try: data = tt_request(f"/project/{tt_id}/data") tasks = data.get("tasks", []) for t in tasks: t["_our_project_id"] = project_map.get(tt_id, inbox_id) t["_project_name"] = name all_tasks.extend(tasks) print(f" {name}: {len(tasks)} tasks") except Exception as e: print(f" {name} error: {e}") print(f"\nTotal tasks to migrate: {len(all_tasks)}") # Insert tasks migrated = 0 skipped = 0 for t in all_tasks: title = t.get("title", "").strip() if not title: skipped += 1 continue # Check for duplicate by title + project existing_task = c.execute( "SELECT id FROM tasks WHERE title = ? AND project_id = ? AND user_id = ?", (title, t["_our_project_id"], GATEWAY_USER_ID)).fetchone() if existing_task: skipped += 1 continue task_id = str(uuid.uuid4()) status = t.get("status", 0) completed_at = None if status != 0: completed_at = t.get("completedTime") or t.get("modifiedTime") c.execute("""INSERT INTO tasks (id, project_id, user_id, title, content, status, priority, start_date, due_date, is_all_day, completed_at, repeat_flag, sort_order, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (task_id, t["_our_project_id"], GATEWAY_USER_ID, title, t.get("content", ""), status, t.get("priority", 0), t.get("startDate"), t.get("dueDate"), 1 if t.get("isAllDay", True) else 0, completed_at, t.get("repeatFlag"), t.get("sortOrder", 0), t.get("createdTime") or t.get("modifiedTime"))) migrated += 1 conn.commit() conn.close() print(f"\nMigration complete!") print(f" Migrated: {migrated} tasks") print(f" Skipped: {skipped} (duplicates or empty)") print(f" Projects: {len(project_map) + 1} (including Inbox)") if __name__ == "__main__": migrate()