zotero-paper-fetcher/zotero_automator.py

169 lines
7.7 KiB
Python

import asyncio
from playwright.async_api import async_playwright
from playwright_stealth import Stealth
import os
import urllib.request
import zipfile
import io
import sys
import argparse
# Chrome Web Store identifier of the Zotero Connector extension.
EXTENSION_ID = 'ekhagklcjbdpajgpjgmbionohlpdbjgc'

# CRX download URL on Google's update-service endpoint. The extension id is
# embedded, percent-encoded, in the trailing `x=` query parameter.
EXTENSION_URL = (
    "https://clients2.google.com/service/update2/crx"
    "?response=redirect&os=mac&arch=x86-64&os_arch=x86-64&nacl_arch=x86-64"
    "&prod=chromecrx&prodchannel=&prodversion=114.0.5735.90"
    "&lang=en-US&acceptformat=crx3"
    f"&x=id%3D{EXTENSION_ID}%26installsource%3Dondemand%26uc"
)

# Both working directories live next to this script.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
EXTENSION_DIR = os.path.join(_SCRIPT_DIR, 'zotero_extension')
USER_DATA_DIR = os.path.join(_SCRIPT_DIR, 'chrome_profile')

# Echo the resolved extension directory when the module is loaded.
print(EXTENSION_DIR)
def setup_extension(timeout=60):
    """Download and unpack the Zotero Connector Chrome extension if needed.

    The presence of ``manifest.json`` inside ``EXTENSION_DIR`` is treated as
    the marker that a previous run unpacked the extension completely; in that
    case nothing is downloaded.

    Args:
        timeout: Seconds to wait on the network before giving up on the CRX
            download. Without a timeout a stalled connection would block
            forever.

    Returns:
        The absolute path of the unpacked extension directory.

    Raises:
        ValueError: If the download contains no ZIP header, or the archive
            has no ``manifest.json`` (i.e. it is not a usable extension).
        urllib.error.URLError: If the download fails or times out.
        zipfile.BadZipFile: If the payload is not a readable ZIP archive.
    """
    manifest_path = os.path.join(EXTENSION_DIR, 'manifest.json')
    if os.path.exists(manifest_path):
        print("[*] Zotero Extension already unpacked locally.")
        return os.path.abspath(EXTENSION_DIR)

    print("[*] Downloading Zotero Connector")
    req = urllib.request.Request(
        EXTENSION_URL,
        headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'},
    )
    with urllib.request.urlopen(req, timeout=timeout) as response:
        data = response.read()

    print("[*] Unpacking CRX file...")
    # .crx files are zip files with an extra header. Find the standard ZIP
    # header (PK\x03\x04) and hand everything from there on to zipfile.
    zip_start = data.find(b'PK\x03\x04')
    if zip_start == -1:
        raise ValueError("Could not find ZIP header in downloaded CRX.")

    with zipfile.ZipFile(io.BytesIO(data[zip_start:])) as z:
        names = z.namelist()
        if 'manifest.json' not in names:
            # Refuse before touching the disk so no unusable directory is left.
            raise ValueError("Downloaded CRX does not contain a manifest.json.")
        os.makedirs(EXTENSION_DIR, exist_ok=True)
        # manifest.json doubles as the "already unpacked" marker checked at
        # the top of this function, so write it last: an interrupted
        # extraction then never looks like a complete one on the next run.
        z.extractall(EXTENSION_DIR, members=[n for n in names if n != 'manifest.json'])
        z.extract('manifest.json', EXTENSION_DIR)

    print("[*] Zotero Extension setup complete.")
    return os.path.abspath(EXTENSION_DIR)
# JavaScript run inside the extension's service worker to start a save for the
# active tab. It returns either {sessionID: ...} or {error: "..."} so the
# Python side never has to deal with a thrown JS exception.
# NOTE(review): tabInfo.instanceID is used as the session id ("usually acts as
# sessionID" per the original author) - confirm against the Zotero Connector.
_TRIGGER_SAVE_JS = '''async () => {
    try {
        let tabs = await chrome.tabs.query({ active: true, currentWindow: true });
        if (!tabs || tabs.length === 0) return {error: "No active tab found."};
        let tab = tabs[0];
        let tabInfo = Zotero.Connector_Browser.getTabInfo(tab.id);
        if (tabInfo && tabInfo.translators && tabInfo.translators.length) {
            await Zotero.Connector_Browser.saveWithTranslator(tab, 0, {fallbackOnFailure: true});
            return { sessionID: tabInfo.instanceID }; // usually acts as sessionID
        } else if (tabInfo) {
            await Zotero.Connector_Browser.saveAsWebpage(tab, tabInfo.frameId, { snapshot: true });
            return { sessionID: tabInfo.instanceID };
        } else {
            return {error: "No translator or webpage saving options available."};
        }
    } catch(e) {
        return {error: e.message};
    }
}'''

# JavaScript run inside the service worker to ask for the progress of a save
# session. A thrown error is reported as {done: true, error: ...}; a connector
# that is not initialised yet is reported as {done: false, error: ...}.
_SESSION_PROGRESS_JS = '''async (sid) => {
    try {
        if (!Zotero.Connector) return {done: false, error: "Zotero.Connector uninitialized"};
        let res = await Zotero.Connector.callMethod("sessionProgress", { sessionID: sid });
        return res || {done: false};
    } catch(e) {
        return {done: true, error: e.message}; // typically throws when it's totally done/cleaned up
    }
}'''


async def _find_extension_worker(browser_context, attempts=10, delay=0.5):
    """Return the service worker whose URL contains EXTENSION_ID, or None."""
    for _ in range(attempts):
        for candidate in browser_context.service_workers:
            if EXTENSION_ID in candidate.url:
                return candidate
        await asyncio.sleep(delay)
    return None


async def _wait_for_save(worker, sid, max_polls):
    """Poll session progress once per second; return True once it reports done."""
    for _ in range(max_polls):
        await asyncio.sleep(1)
        progress = await worker.evaluate(_SESSION_PROGRESS_JS, sid)
        if not progress or not progress.get("done"):
            # Not finished yet. This includes the "Zotero.Connector
            # uninitialized" result, which carries an error text but
            # done=false, so it must not end the polling.
            continue
        if progress.get("error"):
            # Normally, when the progress tracker cleans up,
            # callMethod("sessionProgress") throws.
            print(f"[*] Save completed or session ended. Error: {progress.get('error')}")
        else:
            print("[*] Translation and save processes finished successfully.")
        return True
    print(f"[!] Timed out after {max_polls} polls waiting for the save to finish.")
    return False


async def save_to_zotero(url, headless_mode="new", max_polls=60):
    """Automate Chromium to load a URL and trigger the Zotero Connector.

    Launches a persistent Chromium context with the unpacked extension
    loaded, navigates to ``url``, locates the extension's service worker and
    evaluates JavaScript in it to start a save, then polls for completion.
    Progress and failures are reported via ``print``. The browser context is
    always closed, even if a step raises.

    Args:
        url: Page to open and save.
        headless_mode: ``"new"`` adds the ``--headless=new`` Chromium flag,
            ``"false"`` shows the browser window, any other value uses
            Playwright's own headless mode (which may ignore extensions
            depending on the Chromium version).
        max_polls: Maximum number of one-second progress polls before the
            wait for completion is abandoned.

    Returns:
        None.
    """
    extension_path = setup_extension()
    print(f"[*] Launching Chromium browser (headless={headless_mode}) with Zotero Connector...")

    # Prepare playwright arguments
    args = [
        f"--disable-extensions-except={extension_path}",
        f"--load-extension={extension_path}",
    ]

    # Standard headless=True historically blocked extensions, so "new"
    # headless is requested through a Chromium flag while Playwright itself
    # is told to run headed.
    if headless_mode == "new":
        args.append("--headless=new")
        playwright_headless = False
    elif headless_mode == "false":
        playwright_headless = False
    else:
        playwright_headless = True  # May ignore extensions depending on Chromium version

    async with Stealth().use_async(async_playwright()) as p:
        browser_context = await p.chromium.launch_persistent_context(
            USER_DATA_DIR,
            headless=playwright_headless,
            args=args,
            viewport={'width': 1280, 'height': 800},
        )
        try:
            # Close any welcome tabs the extension might open on first run
            await asyncio.sleep(2)
            for extra_page in browser_context.pages[1:]:
                await extra_page.close()

            # Use the primary tab
            page = browser_context.pages[0] if browser_context.pages else await browser_context.new_page()
            print(f"[*] Navigating to {url}...")
            await page.goto(url, wait_until="load")
            print("[*] Page loaded. Waiting for Zotero translator to initialize...")
            # Give the Zotero translator a moment to inject and detect the metadata
            await asyncio.sleep(3)

            print("[*] Finding Zotero Connector service worker...")
            worker = await _find_extension_worker(browser_context)
            if worker is None:
                print("[!] Could not find Zotero extension service worker.")
                return

            print("[*] Triggering save via extension service worker evaluation...")
            outcome = await worker.evaluate(_TRIGGER_SAVE_JS)
            if not outcome or "error" in outcome:
                print(f"[!] Save trigger failed: {outcome.get('error') if outcome else 'Unknown error'}")
                return

            sid = outcome.get("sessionID")
            print(f"[*] Save triggered (Session ID: {sid}). Polling for progress...")
            await _wait_for_save(worker, sid, max_polls)
        finally:
            # Runs on success, early return and error alike so the browser
            # and its persistent profile are always shut down.
            print("[*] Operation finished. Closing browser.")
            await browser_context.close()
def _main():
    """Parse the command line and run a single save against the given URL."""
    cli = argparse.ArgumentParser(description="Automate Zotero Connector via Playwright.")
    cli.add_argument(
        "url",
        nargs="?",
        default="https://arxiv.org/abs/1706.03762",
        help="URL to save to Zotero",
    )
    cli.add_argument(
        "--headed",
        action="store_true",
        help="Show browser UI visually instead of headless=new",
    )
    options = cli.parse_args()

    # --headed shows the browser window; otherwise use Chromium's new headless mode.
    if options.headed:
        mode = "false"
    else:
        mode = "new"
    asyncio.run(save_to_zotero(options.url, headless_mode=mode))


if __name__ == "__main__":
    _main()