"""Save a web page to Zotero by driving the Zotero Connector Chrome extension.

The script downloads and unpacks the Zotero Connector extension on first use,
launches Chromium through Playwright with the extension loaded, navigates to
the requested URL and then triggers the extension's save routine by evaluating
JavaScript inside the extension's service worker.
"""
import argparse
import asyncio
import io
import os
import sys
import urllib.request
import zipfile

from playwright.async_api import async_playwright
from playwright_stealth import Stealth

EXTENSION_ID = 'ekhagklcjbdpajgpjgmbionohlpdbjgc'
# Fetch extension directly from the alternative Chrome Webstore endpoint
EXTENSION_URL = (
    "https://clients2.google.com/service/update2/crx"
    "?response=redirect&os=mac&arch=x86-64&os_arch=x86-64&nacl_arch=x86-64"
    "&prod=chromecrx&prodchannel=&prodversion=114.0.5735.90&lang=en-US"
    "&acceptformat=crx3"
    f"&x=id%3D{EXTENSION_ID}%26installsource%3Dondemand%26uc"
)
EXTENSION_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'zotero_extension')
USER_DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'chrome_profile')

# Seconds to wait on the extension download before giving up.
DOWNLOAD_TIMEOUT_SECONDS = 60

# Signature that starts the first local file header of a ZIP archive.
ZIP_MAGIC = b'PK\x03\x04'
# Signature that starts a CRX container.
CRX_MAGIC = b'Cr24'

# JavaScript evaluated inside the Zotero Connector service worker. It receives
# ``{libraryName, collectionId}``, optionally resolves a save target, triggers
# the save for the active tab and returns either ``{ok, mode, result, target}``
# or ``{error}``.
SAVE_SCRIPT = '''async ({ libraryName, collectionId }) => {
    function normalizeCollectionId(value) {
        if (!value) return null;
        const trimmed = value.trim();
        if (/^[cC]\\d+$/.test(trimmed)) return `C${trimmed.slice(1)}`;
        if (/^\\d+$/.test(trimmed)) return `C${trimmed}`;
        return trimmed;
    }

    function findOwningLibrary(targets, targetId) {
        let index = targets.findIndex((target) => target.id === targetId);
        if (index === -1) return null;
        let currentLevel = targets[index].level || 0;
        for (let i = index - 1; i >= 0; i--) {
            let candidate = targets[i];
            let candidateLevel = candidate.level || 0;
            if (candidateLevel < currentLevel) {
                if (candidate.id.startsWith("L")) {
                    return candidate;
                }
                currentLevel = candidateLevel;
            }
        }
        return targets[index].id.startsWith("L") ? targets[index] : null;
    }

    async function resolveTarget() {
        if (!libraryName && !collectionId) return null;

        let response = await Zotero.Connector.callMethod("getSelectedCollection", { switchToReadableLibrary: true });
        let targets = response.targets || [];
        if (!targets.length) {
            throw new Error("Zotero did not return any selectable targets.");
        }

        let libraryTarget = null;
        if (libraryName) {
            let normalizedLibraryName = libraryName.trim().toLowerCase();
            let matches = targets.filter((target) =>
                target.id.startsWith("L") &&
                target.name.trim().toLowerCase() === normalizedLibraryName
            );
            if (!matches.length) {
                throw new Error(`Library '${libraryName}' was not found.`);
            }
            if (matches.length > 1) {
                throw new Error(`Library '${libraryName}' is ambiguous.`);
            }
            libraryTarget = matches[0];
        }

        let collectionTarget = null;
        if (collectionId) {
            let normalizedCollectionId = normalizeCollectionId(collectionId);
            collectionTarget = targets.find((target) => target.id === normalizedCollectionId);
            if (!collectionTarget) {
                throw new Error(`Collection '${collectionId}' was not found.`);
            }
            if (libraryTarget) {
                let owningLibrary = findOwningLibrary(targets, collectionTarget.id);
                if (!owningLibrary || owningLibrary.id !== libraryTarget.id) {
                    throw new Error(
                        `Collection '${collectionId}' does not belong to library '${libraryName}'.`
                    );
                }
            }
        }

        return collectionTarget || libraryTarget;
    }

    // Report target-resolution failures through the same {error} channel as
    // save failures instead of rejecting the whole evaluation.
    let target;
    try {
        target = await resolveTarget();
    } catch (e) {
        return {error: e.message};
    }

    let applyTargetToSession = async (sessionID) => {
        if (!target || !sessionID) return;
        await Zotero.Connector.callMethod("updateSession", {
            sessionID,
            target: target.id
        });
    };

    let originalCallMethodWithCookies = Zotero.Connector.callMethodWithCookies.bind(Zotero.Connector);
    let originalSaveStandaloneAttachment = Zotero.ItemSaver?.saveStandaloneAttachmentToZotero?.bind(Zotero.ItemSaver);

    Zotero.Connector.callMethodWithCookies = async function(method, payload, ...args) {
        let result = await originalCallMethodWithCookies(method, payload, ...args);
        if ((method === "saveItems" || method === "saveSnapshot") && payload?.sessionID) {
            await applyTargetToSession(payload.sessionID);
        }
        return result;
    };

    if (originalSaveStandaloneAttachment) {
        Zotero.ItemSaver.saveStandaloneAttachmentToZotero = async function(attachment, sessionID, ...args) {
            let result = await originalSaveStandaloneAttachment(attachment, sessionID, ...args);
            await applyTargetToSession(sessionID);
            return result;
        };
    }

    try {
        let tabs = await chrome.tabs.query({ active: true, currentWindow: true });
        if (!tabs || tabs.length === 0) return {error: "No active tab found."};
        let tab = tabs[0];

        let tabInfo = Zotero.Connector_Browser.getTabInfo(tab.id);
        if (tabInfo && tabInfo.translators && tabInfo.translators.length) {
            let result = await Zotero.Connector_Browser.saveWithTranslator(tab, 0, {fallbackOnFailure: true});
            return { ok: true, mode: "translator", result, target };
        } else if (tabInfo) {
            let result = await Zotero.Connector_Browser.saveAsWebpage(tab, tabInfo.frameId, { snapshot: true });
            return { ok: true, mode: "webpage", result, target };
        } else {
            return {error: "No translator or webpage saving options available."};
        }
    } catch(e) {
        return {error: e.message};
    } finally {
        Zotero.Connector.callMethodWithCookies = originalCallMethodWithCookies;
        if (originalSaveStandaloneAttachment) {
            Zotero.ItemSaver.saveStandaloneAttachmentToZotero = originalSaveStandaloneAttachment;
        }
    }
}'''

print(EXTENSION_DIR)


def _find_zip_offset(data: bytes) -> int:
    """Return the offset of the ZIP payload inside CRX bytes, or -1.

    The offset is computed from the CRX header when one is present (version 3:
    a 12-byte preamble plus the declared header length; version 2: a 16-byte
    preamble plus the declared key and signature lengths). The computed offset
    is only trusted if a ZIP signature is actually found there; otherwise the
    function falls back to scanning for the first ZIP signature.
    """
    if data[:4] == CRX_MAGIC and len(data) >= 12:
        version = int.from_bytes(data[4:8], "little")
        offset = None
        if version == 3:
            offset = 12 + int.from_bytes(data[8:12], "little")
        elif version == 2 and len(data) >= 16:
            key_length = int.from_bytes(data[8:12], "little")
            signature_length = int.from_bytes(data[12:16], "little")
            offset = 16 + key_length + signature_length
        if offset is not None and data[offset:offset + 4] == ZIP_MAGIC:
            return offset
    # Unknown or inconsistent header: scan for the signature instead.
    return data.find(ZIP_MAGIC)


def setup_extension():
    """Download and unpack the Zotero Connector Chrome extension if needed.

    Returns:
        The absolute path of the directory holding the unpacked extension.

    Raises:
        ValueError: If no ZIP payload can be located in the downloaded CRX.
        urllib.error.URLError: If the download fails or times out.
    """
    manifest_path = os.path.join(EXTENSION_DIR, 'manifest.json')
    if os.path.isfile(manifest_path):
        print("[*] Zotero Extension already unpacked locally.")
        return os.path.abspath(EXTENSION_DIR)

    print("[*] Downloading Zotero Connector")
    req = urllib.request.Request(
        EXTENSION_URL,
        headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
    )
    # A timeout keeps a stalled connection from hanging the script forever.
    with urllib.request.urlopen(req, timeout=DOWNLOAD_TIMEOUT_SECONDS) as response:
        data = response.read()

    print("[*] Unpacking CRX file...")
    # .crx files are zip files with an extra header in front of the archive.
    zip_start = _find_zip_offset(data)
    if zip_start == -1:
        raise ValueError("Could not find ZIP header in downloaded CRX.")

    os.makedirs(EXTENSION_DIR, exist_ok=True)
    with zipfile.ZipFile(io.BytesIO(data[zip_start:])) as archive:
        archive.extractall(EXTENSION_DIR)

    print("[*] Zotero Extension setup complete.")
    return os.path.abspath(EXTENSION_DIR)


async def _find_service_worker(browser_context, attempts=60, delay=0.5):
    """Poll the context for the Zotero Connector service worker.

    Checks ``browser_context.service_workers`` up to ``attempts`` times,
    sleeping ``delay`` seconds after each unsuccessful check, and returns the
    first worker whose URL contains ``background-worker.js`` or ``zotero``.
    Returns ``None`` if no such worker shows up.
    """
    for _ in range(attempts):
        for candidate in browser_context.service_workers:
            if "background-worker.js" in candidate.url or "zotero" in candidate.url:
                return candidate
        await asyncio.sleep(delay)
    return None


def _report_save_result(save_result):
    """Print the outcome returned by ``SAVE_SCRIPT``; return True on success."""
    if not save_result or "error" in save_result:
        print(f"[!] Save trigger failed: {save_result.get('error') if save_result else 'Unknown error'}")
        return False

    save_mode = save_result.get("mode", "unknown")
    returned = save_result.get("result")
    target = save_result.get("target")
    print(f"[*] Save completed successfully via {save_mode}.")
    if target:
        print(f"[*] Save target: {target.get('name')} ({target.get('id')})")
    if returned is not None:
        print(f"[*] Save returned: {returned}")
    return True


async def _run_save(browser_context, url, library_name, collection_id):
    """Load ``url`` in the primary tab and trigger the connector save.

    Returns True when the extension reports a successful save, False when the
    service worker cannot be found or the save reports an error.
    """
    # Close any welcome tabs the extension might open on first run
    await asyncio.sleep(2)
    for extra_page in browser_context.pages[1:]:
        await extra_page.close()

    # Use the primary tab
    if browser_context.pages:
        page = browser_context.pages[0]
    else:
        page = await browser_context.new_page()

    print(f"[*] Navigating to {url}...")
    await page.goto(url, wait_until="load")
    print("[*] Page loaded. Waiting for Zotero translator to initialize...")

    # Give Zotero connection translator a moment to inject and detect the metadata
    await asyncio.sleep(3)

    print("[*] Finding Zotero Connector service worker...")
    worker = await _find_service_worker(browser_context)
    if worker is None:
        print("[!] Could not find Zotero extension service worker.")
        return False

    print("[*] Triggering save via extension service worker evaluation...")
    save_result = await worker.evaluate(
        SAVE_SCRIPT,
        {"libraryName": library_name, "collectionId": collection_id},
    )
    return _report_save_result(save_result)


async def save_to_zotero(url, headless_mode="new", library_name=None, collection_id=None):
    """Automate Chrome to load a URL and trigger Zotero Connector.

    Args:
        url: Page to load and save.
        headless_mode: ``"new"`` passes ``--headless=new`` to Chromium,
            ``"false"`` shows the browser window, and any other value asks
            Playwright for its own headless mode.
        library_name: Optional name of the library to save into.
        collection_id: Optional collection ID to save into, such as ``13`` or
            ``C13``.

    Returns:
        True if the extension reported a successful save, otherwise False.
    """
    extension_path = setup_extension()

    print(f"[*] Launching Chromium browser (headless={headless_mode}) with Zotero Connector...")

    # Prepare playwright arguments
    args = [
        f"--disable-extensions-except={extension_path}",
        f"--load-extension={extension_path}",
    ]

    # Standard headless=True historically blocked extensions.
    # We use headless=False by default or can pass `--headless=new` parameter.
    if headless_mode == "new":
        args.append("--headless=new")
        playwright_headless = False
    elif headless_mode == "false":
        playwright_headless = False
    else:
        playwright_headless = True  # May ignore extensions depending on Chromium version

    async with Stealth().use_async(async_playwright()) as p:
        browser_context = await p.chromium.launch_persistent_context(
            USER_DATA_DIR,
            headless=playwright_headless,
            args=args,
            viewport={'width': 1280, 'height': 800}
        )
        try:
            succeeded = await _run_save(browser_context, url, library_name, collection_id)
            print("[*] Operation finished. Closing browser.")
        finally:
            # Close the context even when navigation or evaluation raised.
            await browser_context.close()

    return succeeded


def main(argv=None):
    """Parse command-line arguments, run the save and return an exit status."""
    parser = argparse.ArgumentParser(description="Automate Zotero Connector via Playwright.")
    parser.add_argument("url", nargs="?", default="https://arxiv.org/abs/1706.03762", help="URL to save to Zotero")
    parser.add_argument("--headed", action="store_true", help="Show browser UI visually instead of headless=new")
    parser.add_argument("--library-name", help="Save into the library with this exact name")
    parser.add_argument("--collection-id", help="Save into the collection with this ID, such as 13 or C13")
    args = parser.parse_args(argv)

    headless_arg = "false" if args.headed else "new"
    succeeded = asyncio.run(
        save_to_zotero(
            args.url,
            headless_mode=headless_arg,
            library_name=args.library_name,
            collection_id=args.collection_id,
        )
    )
    # A failed save is reported through the process exit status.
    return 0 if succeeded else 1


if __name__ == "__main__":
    sys.exit(main())