"""Save a web page to Zotero by driving the Zotero Connector extension.

The script downloads and unpacks the Zotero Connector Chrome extension (once),
launches Chromium with that extension loaded, navigates to the requested URL,
and then evaluates ``SAVE_SCRIPT`` inside the extension's service worker to
trigger a save, optionally into a specific library and/or collection.
"""

import argparse
import asyncio
import io
import json
import os
import tempfile
import urllib.request
import zipfile
from datetime import datetime

from patchright.async_api import async_playwright

EXTENSION_ID = "ekhagklcjbdpajgpjgmbionohlpdbjgc"
EXTENSION_URL = (
    "https://clients2.google.com/service/update2/crx"
    "?response=redirect&os=mac&arch=x86-64&os_arch=x86-64&nacl_arch=x86-64"
    "&prod=chromecrx&prodchannel=&prodversion=114.0.5735.90&lang=en-US"
    f"&acceptformat=crx3&x=id%3D{EXTENSION_ID}%26installsource%3Dondemand%26uc"
)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
EXTENSION_DIR = os.path.join(BASE_DIR, "zotero_extension")
WINDOW_SIZE = {"width": 1280, "height": 800}
# Upper bound for the extension download so a stalled connection cannot hang the run.
DOWNLOAD_TIMEOUT_SECONDS = 60
TRANSLATOR_WAIT_SECONDS = 3
WELCOME_TAB_WAIT_SECONDS = 2
POST_SAVE_WAIT_SECONDS = 5
SERVICE_WORKER_POLL_ATTEMPTS = 60
SERVICE_WORKER_POLL_INTERVAL = 0.5
# Zero-based poll attempts on which the list of known workers is logged.
SERVICE_WORKER_DEBUG_ATTEMPTS = {0, 5, 10, 20, 40, 59}

# JavaScript evaluated inside the extension service worker. It receives
# ``{libraryName, collectionId}`` and returns either
# ``{ok, mode, result, debug, target}`` or ``{error, debug}``.
SAVE_SCRIPT = r"""
async ({ libraryName, collectionId }) => {
  // Collected diagnostics; returned to the caller with every result.
  const debug = [];
  const pushDebug = (label, value = null) => debug.push({ label, value });

  // Accepts "13", "c13" or "C13" and yields "C13"; other values are only trimmed.
  function normalizeCollectionId(value) {
    if (!value) return null;
    const trimmed = value.trim();
    if (/^[cC]\d+$/.test(trimmed)) return `C${trimmed.slice(1)}`;
    if (/^\d+$/.test(trimmed)) return `C${trimmed}`;
    return trimmed;
  }

  function summarizeTargets(targets) {
    return targets.map((target) => ({
      id: target.id,
      name: target.name,
      level: target.level
    }));
  }

  function summarizeTranslators(translators = []) {
    return translators.map((translator) => ({
      translatorID: translator.translatorID,
      label: translator.label,
      itemType: translator.itemType,
      priority: translator.priority
    }));
  }

  // Walks backwards through the flat target list, following strictly
  // decreasing levels, until an entry whose id starts with "L" is found.
  function findOwningLibrary(targets, targetId) {
    const index = targets.findIndex((target) => target.id === targetId);
    if (index === -1) return null;
    let level = targets[index].level || 0;
    for (let i = index - 1; i >= 0; i -= 1) {
      const candidate = targets[i];
      const candidateLevel = candidate.level || 0;
      if (candidateLevel >= level) continue;
      if (candidate.id.startsWith("L")) return candidate;
      level = candidateLevel;
    }
    return targets[index].id.startsWith("L") ? targets[index] : null;
  }

  // Resolves the requested library/collection to one of the targets reported
  // by Zotero. Returns null when neither was requested; throws when a
  // requested target is missing, ambiguous, or in the wrong library.
  async function resolveTarget() {
    if (!libraryName && !collectionId) {
      pushDebug("resolved target", null);
      return null;
    }

    const response = await Zotero.Connector.callMethod("getSelectedCollection", {
      switchToReadableLibrary: true
    });
    const targets = response.targets || [];
    pushDebug("available targets", summarizeTargets(targets));
    if (!targets.length) {
      throw new Error("Zotero did not return any selectable targets.");
    }

    let libraryTarget = null;
    if (libraryName) {
      const normalizedLibraryName = libraryName.trim().toLowerCase();
      const matches = targets.filter(
        (target) =>
          target.id.startsWith("L") &&
          target.name.trim().toLowerCase() === normalizedLibraryName
      );
      if (!matches.length) {
        throw new Error(`Library '${libraryName}' was not found.`);
      }
      if (matches.length > 1) {
        throw new Error(`Library '${libraryName}' is ambiguous.`);
      }
      libraryTarget = matches[0];
    }

    let collectionTarget = null;
    if (collectionId) {
      const normalizedCollectionId = normalizeCollectionId(collectionId);
      collectionTarget = targets.find((target) => target.id === normalizedCollectionId);
      if (!collectionTarget) {
        throw new Error(`Collection '${collectionId}' was not found.`);
      }
      if (libraryTarget) {
        const owningLibrary = findOwningLibrary(targets, collectionTarget.id);
        if (!owningLibrary || owningLibrary.id !== libraryTarget.id) {
          throw new Error(
            `Collection '${collectionId}' does not belong to library '${libraryName}'.`
          );
        }
      }
    }

    // A collection, when given, takes precedence over the bare library.
    const target = collectionTarget || libraryTarget;
    pushDebug("resolved target", target ? { id: target.id, name: target.name } : null);
    return target;
  }

  function summarizeTabInfo(tabInfo) {
    if (!tabInfo) return null;
    return {
      url: tabInfo.url,
      isPDF: Boolean(tabInfo.isPDF),
      frameId: tabInfo.frameId,
      translatorCount: tabInfo.translators?.length || 0,
      translators: summarizeTranslators(tabInfo.translators)
    };
  }

  // Wraps the connector's save entry points so that, after a save call that
  // carries a session ID, "updateSession" is called with the resolved target.
  // Returns a function that removes the wrappers again.
  function installSessionHooks(target) {
    const originalCallMethodWithCookies =
      Zotero.Connector.callMethodWithCookies.bind(Zotero.Connector);
    const originalSaveStandaloneAttachment =
      Zotero.ItemSaver?.saveStandaloneAttachmentToZotero?.bind(Zotero.ItemSaver);

    async function applyTargetToSession(sessionID) {
      if (!target || !sessionID) return;
      pushDebug("apply target to session", { sessionID, targetId: target.id });
      await Zotero.Connector.callMethod("updateSession", {
        sessionID,
        target: target.id
      });
    }

    Zotero.Connector.callMethodWithCookies = async function(method, payload, ...args) {
      pushDebug("callMethodWithCookies request", {
        method,
        hasPayload: Boolean(payload),
        sessionID: payload?.sessionID || null
      });
      const result = await originalCallMethodWithCookies(method, payload, ...args);
      pushDebug("callMethodWithCookies response", { method, result });
      if ((method === "saveItems" || method === "saveSnapshot") && payload?.sessionID) {
        await applyTargetToSession(payload.sessionID);
      }
      return result;
    };

    if (originalSaveStandaloneAttachment) {
      Zotero.ItemSaver.saveStandaloneAttachmentToZotero = async function(
        attachment,
        sessionID,
        ...args
      ) {
        pushDebug("saveStandaloneAttachmentToZotero request", {
          title: attachment?.title || null,
          url: attachment?.url || null,
          sessionID
        });
        const result = await originalSaveStandaloneAttachment(
          attachment,
          sessionID,
          ...args
        );
        pushDebug("saveStandaloneAttachmentToZotero response", result);
        await applyTargetToSession(sessionID);
        return result;
      };
    }

    return () => {
      Zotero.Connector.callMethodWithCookies = originalCallMethodWithCookies;
      if (originalSaveStandaloneAttachment) {
        Zotero.ItemSaver.saveStandaloneAttachmentToZotero =
          originalSaveStandaloneAttachment;
      }
    };
  }

  // Saves the active tab: with the first translator when any is available,
  // otherwise as a webpage snapshot.
  async function runSave() {
    pushDebug("connector online", await Zotero.Connector.checkIsOnline());
    const tabs = await chrome.tabs.query({ active: true, currentWindow: true });
    if (!tabs?.length) {
      return { error: "No active tab found.", debug };
    }

    const tab = tabs[0];
    pushDebug("active tab", {
      id: tab.id,
      url: tab.url,
      title: tab.title,
      status: tab.status
    });

    const tabInfo = Zotero.Connector_Browser.getTabInfo(tab.id);
    pushDebug("tab info", summarizeTabInfo(tabInfo));
    if (!tabInfo) {
      return { error: "No translator or webpage saving options available.", debug };
    }

    if (tabInfo.translators?.length) {
      const result = await Zotero.Connector_Browser.saveWithTranslator(tab, 0, {
        fallbackOnFailure: true
      });
      pushDebug("saveWithTranslator result", result);
      return { ok: true, mode: "translator", result, debug };
    }

    const result = await Zotero.Connector_Browser.saveAsWebpage(tab, tabInfo.frameId, {
      snapshot: true
    });
    pushDebug("saveAsWebpage result", result);
    return { ok: true, mode: "webpage", result, debug };
  }

  try {
    const target = await resolveTarget();
    const restoreHooks = installSessionHooks(target);
    try {
      const result = await runSave();
      return { ...result, target };
    } finally {
      // Always undo the monkey-patching, even when the save throws.
      restoreHooks();
    }
  } catch (error) {
    pushDebug("caught error", { message: error.message, stack: error.stack });
    return { error: error.message, debug };
  }
}
"""


def debug_log(label, value=None):
    """Print a timestamped debug line.

    Args:
        label: Short description printed after the ``[debug HH:MM:SS]`` prefix.
        value: Optional payload. ``None`` prints the label only; dicts, lists
            and tuples are pretty-printed as JSON, falling back to ``repr``
            when they cannot be serialized.
    """
    timestamp = datetime.now().strftime("%H:%M:%S")
    if value is None:
        print(f"[debug {timestamp}] {label}")
        return
    if isinstance(value, (dict, list, tuple)):
        try:
            value = json.dumps(value, ensure_ascii=True, default=str, indent=2)
        except (TypeError, ValueError):
            # json.dumps raises ValueError for circular references and
            # TypeError for unsupported keys; logging must never crash the run.
            value = repr(value)
    print(f"[debug {timestamp}] {label}: {value}")


def setup_extension():
    """Download and unpack the Zotero Connector extension if needed.

    The extension is considered present when ``manifest.json`` exists inside
    ``EXTENSION_DIR``; otherwise the CRX is downloaded from ``EXTENSION_URL``
    and its embedded ZIP archive is extracted there.

    Returns:
        Absolute path of the unpacked extension directory.

    Raises:
        ValueError: If no ZIP header is found in the downloaded data.
    """
    manifest_path = os.path.join(EXTENSION_DIR, "manifest.json")
    if os.path.exists(manifest_path):
        print("[*] Zotero Extension already unpacked locally.")
        return os.path.abspath(EXTENSION_DIR)

    print("[*] Downloading Zotero Connector")
    request = urllib.request.Request(
        EXTENSION_URL,
        headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"},
    )
    # Without a timeout a stalled connection would block the script forever.
    with urllib.request.urlopen(request, timeout=DOWNLOAD_TIMEOUT_SECONDS) as response:
        data = response.read()

    print("[*] Unpacking CRX file...")
    # The CRX header precedes the ZIP payload; skip to the first ZIP
    # local-file-header signature.
    zip_start = data.find(b"PK\x03\x04")
    if zip_start == -1:
        raise ValueError("Could not find ZIP header in downloaded CRX.")

    os.makedirs(EXTENSION_DIR, exist_ok=True)
    with zipfile.ZipFile(io.BytesIO(data[zip_start:])) as archive:
        archive.extractall(EXTENSION_DIR)

    print("[*] Zotero Extension setup complete.")
    return os.path.abspath(EXTENSION_DIR)


def get_browser_launch_config(extension_path, headless_mode):
    """Build the Playwright ``headless`` flag and Chromium arguments.

    Args:
        extension_path: Directory of the unpacked extension to load.
        headless_mode: ``"new"`` passes ``--headless=new`` to Chromium while
            keeping Playwright's own headless flag off; ``"false"`` runs
            headed; any other value turns Playwright's headless flag on.

    Returns:
        Tuple ``(playwright_headless, args)``.
    """
    args = [
        f"--disable-extensions-except={extension_path}",
        f"--load-extension={extension_path}",
    ]
    if headless_mode == "new":
        args.append("--headless=new")
        playwright_headless = False
    elif headless_mode == "false":
        playwright_headless = False
    else:
        playwright_headless = True
    return playwright_headless, args


async def close_extra_pages(browser_context):
    """Wait ``WELCOME_TAB_WAIT_SECONDS`` and close every page except the first."""
    # Give any tab opened at startup time to appear before inspecting pages.
    await asyncio.sleep(WELCOME_TAB_WAIT_SECONDS)
    extra_pages = browser_context.pages[1:]
    if not extra_pages:
        return
    debug_log("closing extra tabs", [page.url for page in extra_pages])
    for extra_page in extra_pages:
        await extra_page.close()


async def get_primary_page(browser_context):
    """Return the context's first page, creating one if none exists.

    Extra pages are closed first via ``close_extra_pages``.
    """
    await close_extra_pages(browser_context)
    if browser_context.pages:
        page = browser_context.pages[0]
    else:
        page = await browser_context.new_page()
    debug_log(
        "active page before navigation",
        {"url": page.url, "page_count": len(browser_context.pages)},
    )
    return page


async def wait_for_service_worker(browser_context):
    """Poll the context for the Zotero Connector service worker.

    Polls up to ``SERVICE_WORKER_POLL_ATTEMPTS`` times, sleeping
    ``SERVICE_WORKER_POLL_INTERVAL`` seconds between attempts.

    Returns:
        The first worker whose URL contains ``"background-worker.js"`` or
        ``"zotero"``, or ``None`` if none shows up in time.
    """
    print("[*] Finding Zotero Connector service worker...")
    for attempt in range(SERVICE_WORKER_POLL_ATTEMPTS):
        workers = list(browser_context.service_workers)
        if attempt in SERVICE_WORKER_DEBUG_ATTEMPTS:
            debug_log(
                "service worker poll",
                {
                    "attempt": attempt + 1,
                    "known_workers": [worker.url for worker in workers],
                },
            )
        for worker in workers:
            if "background-worker.js" in worker.url or "zotero" in worker.url:
                debug_log("selected service worker", worker.url)
                return worker
        await asyncio.sleep(SERVICE_WORKER_POLL_INTERVAL)
    return None


async def navigate_to_page(page, url, browser_context):
    """Load ``url`` in ``page`` and wait ``TRANSLATOR_WAIT_SECONDS`` afterwards.

    The navigation outcome and a post-wait snapshot (URL, title, known service
    workers) are written to the debug log.
    """
    print(f"[*] Navigating to {url}...")
    response = await page.goto(url, wait_until="load")
    debug_log(
        "navigation result",
        {
            "response_url": response.url if response else None,
            "status": response.status if response else None,
            "final_page_url": page.url,
            "title": await page.title(),
        },
    )

    print("[*] Page loaded. Waiting for Zotero translator to initialize...")
    await asyncio.sleep(TRANSLATOR_WAIT_SECONDS)
    debug_log(
        "post-load page snapshot",
        {
            "url": page.url,
            "title": await page.title(),
            "service_workers": [worker.url for worker in browser_context.service_workers],
        },
    )


def log_save_result(save_result):
    """Print a summary of the value returned by ``SAVE_SCRIPT``.

    Args:
        save_result: The dict returned by the worker evaluation. ``None`` or
            any other non-dict value is reported as an unknown error instead
            of raising.
    """
    # Work on a dict view so a missing/non-dict result cannot raise
    # AttributeError further down; the raw value is still logged as-is.
    result = save_result if isinstance(save_result, dict) else {}

    if not result or "error" in result:
        error = result.get("error", "Unknown error")
        print(f"[!] Save trigger failed: {error}")
    else:
        print(f"[*] Save completed successfully via {result.get('mode', 'unknown')}.")
        target = result.get("target")
        if target:
            print(f"[*] Save target: {target.get('name')} ({target.get('id')})")
        if result.get("result") is not None:
            print(f"[*] Save returned: {result['result']}")

    debug_log("save_result", save_result)
    # "debug" may be absent or null; treat both as "no entries".
    for index, entry in enumerate(result.get("debug") or [], start=1):
        debug_log(f"worker debug #{index} {entry.get('label')}", entry.get("value"))


async def save_to_zotero(url, headless_mode="new", library_name=None, collection_id=None):
    """Open ``url`` in Chromium with the Zotero Connector and trigger a save.

    Args:
        url: Page to save.
        headless_mode: Passed to ``get_browser_launch_config``.
        library_name: Optional library name forwarded to ``SAVE_SCRIPT``.
        collection_id: Optional collection ID forwarded to ``SAVE_SCRIPT``.

    The browser runs against a temporary user-data directory and is always
    closed, including when the service worker is not found or an error occurs.
    """
    extension_path = setup_extension()
    playwright_headless, browser_args = get_browser_launch_config(extension_path, headless_mode)

    print(f"[*] Launching Chromium browser (headless={headless_mode}) with Zotero Connector...")
    debug_log(
        "launch configuration",
        {"playwright_headless": playwright_headless, "args": browser_args},
    )

    async with async_playwright() as playwright:
        with tempfile.TemporaryDirectory(prefix="zotero-paper-fetcher-") as user_data_dir:
            debug_log(
                "save_to_zotero arguments",
                {
                    "url": url,
                    "headless_mode": headless_mode,
                    "library_name": library_name,
                    "collection_id": collection_id,
                    "extension_path": extension_path,
                    "user_data_dir": user_data_dir,
                },
            )
            browser_context = await playwright.chromium.launch_persistent_context(
                user_data_dir,
                headless=playwright_headless,
                args=browser_args,
                viewport=WINDOW_SIZE,
            )
            try:
                debug_log(
                    "temporary context launched",
                    {
                        "initial_page_count": len(browser_context.pages),
                        "service_worker_count": len(browser_context.service_workers),
                        "user_data_dir": user_data_dir,
                    },
                )
                page = await get_primary_page(browser_context)
                await navigate_to_page(page, url, browser_context)

                worker = await wait_for_service_worker(browser_context)
                if not worker:
                    print("[!] Could not find Zotero extension service worker.")
                    return

                print("[*] Triggering save via extension service worker evaluation...")
                save_result = await worker.evaluate(
                    SAVE_SCRIPT,
                    {"libraryName": library_name, "collectionId": collection_id},
                )
                log_save_result(save_result)

                # Keep the message in sync with the configured delay.
                print(
                    f"[*] Waiting {POST_SAVE_WAIT_SECONDS} seconds "
                    "for any delayed connector activity..."
                )
                await asyncio.sleep(POST_SAVE_WAIT_SECONDS)
            finally:
                print("[*] Operation finished. Closing browser.")
                await browser_context.close()


def parse_args():
    """Parse the command-line arguments (URL, --headed, target selection)."""
    parser = argparse.ArgumentParser(description="Automate Zotero Connector via Playwright.")
    parser.add_argument(
        "url",
        nargs="?",
        default="https://arxiv.org/abs/1706.03762",
        help="URL to save to Zotero",
    )
    parser.add_argument(
        "--headed",
        action="store_true",
        help="Show browser UI visually instead of headless=new",
    )
    parser.add_argument("--library-name", help="Save into the library with this exact name")
    parser.add_argument(
        "--collection-id",
        help="Save into the collection with this ID, such as 13 or C13",
    )
    return parser.parse_args()


def main():
    """Command-line entry point: parse arguments and run the save."""
    print(EXTENSION_DIR)
    args = parse_args()
    headless_mode = "false" if args.headed else "new"
    asyncio.run(
        save_to_zotero(
            args.url,
            headless_mode=headless_mode,
            library_name=args.library_name,
            collection_id=args.collection_id,
        )
    )


if __name__ == "__main__":
    main()