- Custom SQLite task manager replacing TickTick wrapper
- 73 tasks migrated from TickTick across 15 projects
- RRULE recurrence engine with lazy materialization
- Dashboard tasks widget (desktop sidebar + mobile card)
- Tasks page with project tabs, add/edit/complete/delete
- Security: locked ports to localhost, removed old containers
- Gitea Actions runner configured and all 3 CI jobs passing
- Fixed mobile overflow on dashboard cards
- iOS Capacitor app shell (Second Brain)
- Frontend/backend guide docs for adding new services
- TickTick Google Calendar sync re-authorized

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
177 lines
6.1 KiB
Python
177 lines
6.1 KiB
Python
"""One-time migration: pull all TickTick tasks into the custom task manager SQLite DB."""
|
|
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import uuid
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
# Credential read from the environment. The variable may hold either the bare
# access token or a JSON object that carries it under the "access_token" key.
TICKTICK_TOKEN = os.environ.get("TICKTICK_ACCESS_TOKEN", "")
# Handle JSON-wrapped token
try:
    parsed = json.loads(TICKTICK_TOKEN)
except (json.JSONDecodeError, TypeError):
    # Not JSON at all: the variable already holds the bare token.
    pass
else:
    # Only a JSON object can wrap the token. A bare token that happens to be
    # valid JSON (all digits, "null", a quoted string, ...) parses to a
    # non-dict value with no .get(); keep the raw string in that case instead
    # of crashing at import time with AttributeError.
    if isinstance(parsed, dict):
        TICKTICK_TOKEN = parsed.get("access_token", TICKTICK_TOKEN)

# Base URL that every TickTick request path is appended to.
TICKTICK_BASE = "https://api.ticktick.com/open/v1"
# Location of the task manager's SQLite file; overridable through DB_PATH.
DB_PATH = Path(os.environ.get("DB_PATH", "/app/data/tasks.db"))
# Owner that all migrated projects and tasks are assigned to.
GATEWAY_USER_ID = os.environ.get("GATEWAY_USER_ID", "3")  # Yusuf's gateway user ID
GATEWAY_USER_NAME = os.environ.get("GATEWAY_USER_NAME", "Yusuf")
|
|
|
|
|
|
def tt_request(path):
    """Issue an authenticated GET against the TickTick API and decode the reply.

    Args:
        path: Endpoint path relative to TICKTICK_BASE; leading slashes are
            stripped before it is appended.

    Returns:
        The response body parsed as JSON.

    Raises:
        Whatever ``urllib.request.urlopen`` raises for transport/HTTP failures,
        and ``json.JSONDecodeError`` if the body is not valid JSON.
    """
    endpoint = path.lstrip("/")
    request = urllib.request.Request(
        f"{TICKTICK_BASE}/{endpoint}",
        headers={"Authorization": f"Bearer {TICKTICK_TOKEN}"},
    )
    # 15-second timeout so a stalled connection cannot hang the migration.
    with urllib.request.urlopen(request, timeout=15) as response:
        payload = response.read()
    return json.loads(payload)
|
|
|
|
|
|
def get_db():
    """Open the SQLite database at DB_PATH and return the configured connection.

    Rows come back as ``sqlite3.Row`` so callers can index columns by name.
    Foreign-key enforcement and WAL journaling are switched on for the
    connection before it is handed out.
    """
    connection = sqlite3.connect(str(DB_PATH))
    connection.row_factory = sqlite3.Row
    for pragma in ("PRAGMA foreign_keys = ON", "PRAGMA journal_mode = WAL"):
        connection.execute(pragma)
    return connection
|
|
|
|
|
|
def _ensure_user_and_inbox(conn):
    """Create the gateway user and their Inbox project if absent; return the Inbox id.

    Rows are read by column name, so ``conn`` must use a name-addressable row
    factory such as ``sqlite3.Row``.
    """
    cur = conn.cursor()

    user_row = cur.execute(
        "SELECT id FROM users WHERE id = ?", (GATEWAY_USER_ID,)
    ).fetchone()
    if user_row is None:
        cur.execute(
            "INSERT INTO users (id, username, display_name) VALUES (?, ?, ?)",
            (GATEWAY_USER_ID, GATEWAY_USER_NAME.lower(), GATEWAY_USER_NAME),
        )
        conn.commit()

    inbox_query = "SELECT id FROM projects WHERE user_id = ? AND is_inbox = 1"
    inbox = cur.execute(inbox_query, (GATEWAY_USER_ID,)).fetchone()
    if inbox is None:
        cur.execute(
            "INSERT INTO projects (id, user_id, name, is_inbox, sort_order) "
            "VALUES (?, ?, 'Inbox', 1, -1)",
            (str(uuid.uuid4()), GATEWAY_USER_ID),
        )
        conn.commit()
        # Re-read so the id always comes from what is actually stored.
        inbox = cur.execute(inbox_query, (GATEWAY_USER_ID,)).fetchone()
    return inbox["id"]


def _map_projects(conn, tt_projects):
    """Create any missing local projects; return {TickTick project id: local project id}."""
    cur = conn.cursor()
    project_map = {}
    for tp in tt_projects:
        tt_id = tp["id"]
        name = tp.get("name", "Untitled")

        # A project counts as already migrated when a non-inbox project with
        # the same name exists for this user; this keeps re-runs idempotent.
        existing = cur.execute(
            "SELECT id FROM projects WHERE user_id = ? AND name = ? AND is_inbox = 0",
            (GATEWAY_USER_ID, name),
        ).fetchone()
        if existing is not None:
            project_map[tt_id] = existing["id"]
            print(f" Project '{name}' already exists, skipping creation")
            continue

        new_id = str(uuid.uuid4())
        # Heuristic: a project is flagged shared when its name mentions
        # "family" or "shared".
        lowered = name.lower()
        is_shared = 1 if any(kw in lowered for kw in ("family", "shared")) else 0
        cur.execute(
            "INSERT INTO projects (id, user_id, name, is_shared, sort_order) "
            "VALUES (?, ?, ?, ?, ?)",
            (new_id, GATEWAY_USER_ID, name, is_shared, tp.get("sortOrder", 0)),
        )
        project_map[tt_id] = new_id
        print(f" Created project '{name}' (shared={is_shared})")

    conn.commit()
    return project_map


def _fetch_project_tasks(path, our_project_id, label):
    """Fetch one TickTick project's tasks, tagged with their local destination.

    Returns an empty list (after printing the error) when the fetch fails.
    """
    try:
        tasks = tt_request(path).get("tasks", [])
        for task in tasks:
            task["_our_project_id"] = our_project_id
            task["_project_name"] = label
    except Exception as e:
        # Deliberately best-effort: one unreachable project must not abort the
        # whole migration, so report the failure and carry on.
        print(f" {label} error: {e}")
        return []
    print(f" {label}: {len(tasks)} tasks")
    return tasks


def _insert_tasks(conn, all_tasks):
    """Insert tagged TickTick tasks that are not stored yet; return (migrated, skipped)."""
    cur = conn.cursor()
    migrated = 0
    skipped = 0
    for t in all_tasks:
        # `or ""` also covers an explicit null title, not just a missing key,
        # which would otherwise raise AttributeError on .strip().
        title = (t.get("title") or "").strip()
        if not title:
            skipped += 1
            continue

        project_id = t["_our_project_id"]

        # Check for duplicate by title + project
        duplicate = cur.execute(
            "SELECT id FROM tasks WHERE title = ? AND project_id = ? AND user_id = ?",
            (title, project_id, GATEWAY_USER_ID),
        ).fetchone()
        if duplicate is not None:
            skipped += 1
            continue

        status = t.get("status", 0)
        completed_at = None
        if status != 0:
            # Any non-zero status is treated as completed; fall back to the
            # last-modified time when no completion time is provided.
            completed_at = t.get("completedTime") or t.get("modifiedTime")

        cur.execute(
            """INSERT INTO tasks (id, project_id, user_id, title, content, status, priority,
               start_date, due_date, is_all_day, completed_at, repeat_flag, sort_order, created_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                str(uuid.uuid4()),
                project_id,
                GATEWAY_USER_ID,
                title,
                t.get("content", ""),
                status,
                t.get("priority", 0),
                t.get("startDate"),
                t.get("dueDate"),
                1 if t.get("isAllDay", True) else 0,
                completed_at,
                t.get("repeatFlag"),
                t.get("sortOrder", 0),
                t.get("createdTime") or t.get("modifiedTime"),
            ),
        )
        migrated += 1

    conn.commit()
    return migrated, skipped


def migrate():
    """Copy every TickTick project and task into the local task-manager database.

    Steps: ensure the gateway user and Inbox exist, mirror the TickTick
    projects (matched by name), download the tasks of the Inbox and of each
    project, then insert every task whose title is not already present in its
    target project. Progress is printed to stdout.

    Does nothing except print an error when TICKTICK_TOKEN is empty. The
    database connection is closed even if a request or SQL statement raises;
    such exceptions propagate to the caller.
    """
    if not TICKTICK_TOKEN:
        print("ERROR: TICKTICK_ACCESS_TOKEN not set")
        return

    conn = get_db()
    try:
        inbox_id = _ensure_user_and_inbox(conn)

        # Fetch TickTick projects
        print("Fetching TickTick projects...")
        tt_projects = tt_request("/project")
        print(f" Found {len(tt_projects)} projects")

        project_map = _map_projects(conn, tt_projects)

        # Fetch all tasks from each project + inbox
        print("Fetching Inbox tasks...")
        all_tasks = _fetch_project_tasks("/project/inbox/data", inbox_id, "Inbox")
        for tp in tt_projects:
            tt_id = tp["id"]
            all_tasks.extend(
                _fetch_project_tasks(
                    f"/project/{tt_id}/data",
                    # Unmapped projects fall back to the Inbox.
                    project_map.get(tt_id, inbox_id),
                    tp.get("name", "?"),
                )
            )

        print(f"\nTotal tasks to migrate: {len(all_tasks)}")

        migrated, skipped = _insert_tasks(conn, all_tasks)
    finally:
        # Release the SQLite handle on every exit path, including failures.
        conn.close()

    print("\nMigration complete!")
    print(f" Migrated: {migrated} tasks")
    print(f" Skipped: {skipped} (duplicates or empty)")
    print(f" Projects: {len(project_map) + 1} (including Inbox)")
|
|
|
|
|
|
# Script entry point: run the one-time migration only when this file is
# executed directly, never as a side effect of importing it.
if __name__ == "__main__":
    migrate()
|