zotero-paper-fetcher/zotero_automator.py

356 lines
12 KiB
Python

import argparse
import asyncio
import io
import os
import tempfile
import urllib.request
import zipfile
from patchright.async_api import async_playwright
# Chrome extension ID that is interpolated into the CRX download URL below.
EXTENSION_ID = "ekhagklcjbdpajgpjgmbionohlpdbjgc"

# CRX download URL; the pieces are concatenated verbatim, with EXTENSION_ID
# substituted into the percent-encoded "x" query parameter.
EXTENSION_URL = "".join(
    (
        "https://clients2.google.com/service/update2/crx",
        "?response=redirect&os=mac&arch=x86-64&os_arch=x86-64&nacl_arch=x86-64",
        "&prod=chromecrx&prodchannel=&prodversion=114.0.5735.90&lang=en-US",
        f"&acceptformat=crx3&x=id%3D{EXTENSION_ID}%26installsource%3Dondemand%26uc",
    )
)

# Directory containing this script, and the folder next to it where the
# extension is unpacked.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
EXTENSION_DIR = os.path.join(BASE_DIR, "zotero_extension")

# Viewport passed to the browser context at launch.
WINDOW_SIZE = {
    "width": 1280,
    "height": 800,
}

# Fixed delays, in seconds.
TRANSLATOR_WAIT_SECONDS = 3  # after page load, before triggering the save
WELCOME_TAB_WAIT_SECONDS = 2  # before closing any extra tabs
POST_SAVE_WAIT_SECONDS = 10  # after the save, before closing the browser

# Service-worker discovery: number of polls and the pause between them.
SERVICE_WORKER_POLL_ATTEMPTS = 60
SERVICE_WORKER_POLL_INTERVAL = 0.5
# JavaScript source of an async function that save_to_zotero() evaluates inside
# the extension's service worker, passing {libraryName, collectionId}.
#
# What the script does, in order:
#   1. resolveTarget(): when a library name and/or collection ID was given,
#      asks the connector for its selectable targets ("getSelectedCollection")
#      and picks the matching library ("L..." id, case-insensitive name match)
#      and/or collection (ID normalized to the "C<digits>" form). It throws if
#      nothing matches, if the library name is ambiguous, or if the collection
#      is not nested under the requested library.
#   2. installSessionHooks(): temporarily wraps
#      Zotero.Connector.callMethodWithCookies and, when present,
#      Zotero.ItemSaver.saveStandaloneAttachmentToZotero so that after a
#      "saveItems"/"saveSnapshot" call (or a standalone attachment save) an
#      "updateSession" call assigns the session to the resolved target.
#      The returned function restores the wrapped methods.
#   3. runSave(): checks the connector is online, takes the active tab, and
#      saves with the first detected translator, or as a webpage snapshot when
#      no translator is available.
#
# Return value: {ok, mode, result, target} on success, or {error} on failure;
# exceptions thrown inside the script are converted to {error: message}.
SAVE_SCRIPT = r"""
async ({ libraryName, collectionId }) => {
function normalizeCollectionId(value) {
if (!value) return null;
const trimmed = value.trim();
if (/^[cC]\d+$/.test(trimmed)) return `C${trimmed.slice(1)}`;
if (/^\d+$/.test(trimmed)) return `C${trimmed}`;
return trimmed;
}
function findOwningLibrary(targets, targetId) {
const index = targets.findIndex((target) => target.id === targetId);
if (index === -1) return null;
let level = targets[index].level || 0;
for (let i = index - 1; i >= 0; i -= 1) {
const candidate = targets[i];
const candidateLevel = candidate.level || 0;
if (candidateLevel >= level) continue;
if (candidate.id.startsWith("L")) return candidate;
level = candidateLevel;
}
return targets[index].id.startsWith("L") ? targets[index] : null;
}
async function resolveTarget() {
if (!libraryName && !collectionId) return null;
const response = await Zotero.Connector.callMethod("getSelectedCollection", {
switchToReadableLibrary: true
});
const targets = response.targets || [];
if (!targets.length) {
throw new Error("Zotero did not return any selectable targets.");
}
let libraryTarget = null;
if (libraryName) {
const normalizedLibraryName = libraryName.trim().toLowerCase();
const matches = targets.filter(
(target) =>
target.id.startsWith("L") &&
target.name.trim().toLowerCase() === normalizedLibraryName
);
if (!matches.length) {
throw new Error(`Library '${libraryName}' was not found.`);
}
if (matches.length > 1) {
throw new Error(`Library '${libraryName}' is ambiguous.`);
}
libraryTarget = matches[0];
}
let collectionTarget = null;
if (collectionId) {
const normalizedCollectionId = normalizeCollectionId(collectionId);
collectionTarget = targets.find((target) => target.id === normalizedCollectionId);
if (!collectionTarget) {
throw new Error(`Collection '${collectionId}' was not found.`);
}
if (libraryTarget) {
const owningLibrary = findOwningLibrary(targets, collectionTarget.id);
if (!owningLibrary || owningLibrary.id !== libraryTarget.id) {
throw new Error(
`Collection '${collectionId}' does not belong to library '${libraryName}'.`
);
}
}
}
return collectionTarget || libraryTarget;
}
function installSessionHooks(target) {
const originalCallMethodWithCookies =
Zotero.Connector.callMethodWithCookies.bind(Zotero.Connector);
const originalSaveStandaloneAttachment =
Zotero.ItemSaver?.saveStandaloneAttachmentToZotero?.bind(Zotero.ItemSaver);
async function applyTargetToSession(sessionID) {
if (!target || !sessionID) return;
await Zotero.Connector.callMethod("updateSession", {
sessionID,
target: target.id
});
}
Zotero.Connector.callMethodWithCookies = async function(method, payload, ...args) {
const result = await originalCallMethodWithCookies(method, payload, ...args);
if ((method === "saveItems" || method === "saveSnapshot") && payload?.sessionID) {
await applyTargetToSession(payload.sessionID);
}
return result;
};
if (originalSaveStandaloneAttachment) {
Zotero.ItemSaver.saveStandaloneAttachmentToZotero = async function(
attachment,
sessionID,
...args
) {
const result = await originalSaveStandaloneAttachment(
attachment,
sessionID,
...args
);
await applyTargetToSession(sessionID);
return result;
};
}
return () => {
Zotero.Connector.callMethodWithCookies = originalCallMethodWithCookies;
if (originalSaveStandaloneAttachment) {
Zotero.ItemSaver.saveStandaloneAttachmentToZotero =
originalSaveStandaloneAttachment;
}
};
}
async function runSave() {
const isOnline = await Zotero.Connector.checkIsOnline();
if (!isOnline) {
return { error: "Zotero Connector is offline." };
}
const tabs = await chrome.tabs.query({ active: true, currentWindow: true });
if (!tabs?.length) {
return { error: "No active tab found." };
}
const tab = tabs[0];
const tabInfo = Zotero.Connector_Browser.getTabInfo(tab.id);
if (!tabInfo) {
return { error: "No translator or webpage saving options available." };
}
if (tabInfo.translators?.length) {
const result = await Zotero.Connector_Browser.saveWithTranslator(tab, 0, {
fallbackOnFailure: true
});
return { ok: true, mode: "translator", result };
}
const result = await Zotero.Connector_Browser.saveAsWebpage(tab, tabInfo.frameId, {
snapshot: true
});
return { ok: true, mode: "webpage", result };
}
try {
const target = await resolveTarget();
const restoreHooks = installSessionHooks(target);
try {
const result = await runSave();
return { ...result, target };
} finally {
restoreHooks();
}
} catch (error) {
return { error: error.message };
}
}
"""
_CRX_MAGIC = b"Cr24"
_ZIP_LOCAL_HEADER = b"PK\x03\x04"
# Per-socket-operation timeout so a stalled download cannot hang forever.
_DOWNLOAD_TIMEOUT_SECONDS = 60


def _locate_zip_payload(data):
    """Return the offset of the ZIP archive inside CRX bytes, or -1.

    A CRX file starts with the magic ``Cr24`` and a little-endian uint32
    version.  Version 3 follows with a uint32 header length; version 2 follows
    with uint32 public-key and signature lengths.  The ZIP archive begins right
    after that header, so the offset is computed from those lengths instead of
    searching, which could match ``PK\\x03\\x04`` inside key/signature bytes.

    If the header is missing, of an unknown version, or the computed offset
    does not point at a ZIP local-file header, fall back to searching for the
    first ZIP signature.
    """
    if len(data) >= 16 and data[:4] == _CRX_MAGIC:
        version = int.from_bytes(data[4:8], "little")
        if version == 3:
            offset = 12 + int.from_bytes(data[8:12], "little")
        elif version == 2:
            key_length = int.from_bytes(data[8:12], "little")
            signature_length = int.from_bytes(data[12:16], "little")
            offset = 16 + key_length + signature_length
        else:
            offset = -1
        # Only trust the computed offset when a ZIP entry really starts there.
        if offset != -1 and data[offset:offset + 4] == _ZIP_LOCAL_HEADER:
            return offset
    return data.find(_ZIP_LOCAL_HEADER)


def setup_extension():
    """Download and unpack the Zotero Connector extension if needed.

    The extension is considered present when ``manifest.json`` exists inside
    ``EXTENSION_DIR``; in that case nothing is downloaded.  Otherwise the CRX
    is fetched from ``EXTENSION_URL`` and its embedded ZIP archive is extracted
    into ``EXTENSION_DIR``.

    Returns:
        The absolute path of ``EXTENSION_DIR``.

    Raises:
        ValueError: If no ZIP archive can be located in the downloaded data.
        urllib.error.URLError: If the download fails or times out.
        zipfile.BadZipFile: If the located payload is not a valid ZIP archive.
    """
    manifest_path = os.path.join(EXTENSION_DIR, "manifest.json")
    if os.path.exists(manifest_path):
        print("[*] Zotero Extension already unpacked locally.")
        return os.path.abspath(EXTENSION_DIR)

    print("[*] Downloading Zotero Connector")
    request = urllib.request.Request(
        EXTENSION_URL,
        headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"},
    )
    with urllib.request.urlopen(request, timeout=_DOWNLOAD_TIMEOUT_SECONDS) as response:
        data = response.read()

    print("[*] Unpacking CRX file...")
    zip_start = _locate_zip_payload(data)
    if zip_start == -1:
        raise ValueError("Could not find ZIP header in downloaded CRX.")

    os.makedirs(EXTENSION_DIR, exist_ok=True)
    with zipfile.ZipFile(io.BytesIO(data[zip_start:])) as archive:
        archive.extractall(EXTENSION_DIR)

    print("[*] Zotero Extension setup complete.")
    return os.path.abspath(EXTENSION_DIR)
def get_browser_launch_config(extension_path, headless_mode):
    """Build the Playwright ``headless`` flag and Chromium arguments.

    Args:
        extension_path: Path of the unpacked extension to load.
        headless_mode: ``"new"`` adds ``--headless=new`` to the arguments,
            ``"false"`` leaves the arguments as they are; both return a
            ``False`` headless flag.  Any other value returns ``True``.

    Returns:
        A ``(headless, args)`` tuple.
    """
    chromium_flags = [
        f"--disable-extensions-except={extension_path}",
        f"--load-extension={extension_path}",
    ]
    if headless_mode == "new":
        chromium_flags += ["--headless=new"]

    # Only the two recognised modes turn Playwright's own headless flag off.
    use_playwright_headless = headless_mode not in ("new", "false")
    return use_playwright_headless, chromium_flags
async def close_extra_pages(browser_context):
    """Wait ``WELCOME_TAB_WAIT_SECONDS``, then close every page but the first."""
    await asyncio.sleep(WELCOME_TAB_WAIT_SECONDS)
    open_pages = browser_context.pages
    surplus_pages = open_pages[1:]
    for surplus_page in surplus_pages:
        await surplus_page.close()
async def get_primary_page(browser_context):
    """Return the page to work in after extra pages have been closed.

    Uses the context's first page when one exists; otherwise opens a new one.
    """
    await close_extra_pages(browser_context)
    remaining_pages = browser_context.pages
    if not remaining_pages:
        return await browser_context.new_page()
    return remaining_pages[0]
async def wait_for_service_worker(browser_context):
    """Poll the context for the Zotero Connector service worker.

    Checks ``browser_context.service_workers`` up to
    ``SERVICE_WORKER_POLL_ATTEMPTS`` times, sleeping
    ``SERVICE_WORKER_POLL_INTERVAL`` seconds after each unsuccessful check.

    Returns:
        The first worker whose URL contains ``background-worker.js`` or
        ``zotero``, or ``None`` when no such worker appears in time.
    """
    print("[*] Finding Zotero Connector service worker...")
    polls_left = SERVICE_WORKER_POLL_ATTEMPTS
    while polls_left > 0:
        polls_left -= 1
        connector_worker = next(
            (
                candidate
                for candidate in browser_context.service_workers
                if "background-worker.js" in candidate.url or "zotero" in candidate.url
            ),
            None,
        )
        if connector_worker is not None:
            return connector_worker
        await asyncio.sleep(SERVICE_WORKER_POLL_INTERVAL)
    return None
async def navigate_to_page(page, url):
    """Load ``url`` in ``page`` and pause ``TRANSLATOR_WAIT_SECONDS`` afterwards.

    Navigation waits for the page's ``load`` event; the fixed pause that
    follows gives the extension time to initialize its translator for the page.
    """
    print(f"[*] Navigating to {url}...")
    navigation = page.goto(url, wait_until="load")
    await navigation

    print("[*] Page loaded. Waiting for Zotero translator to initialize...")
    settle_delay = TRANSLATOR_WAIT_SECONDS
    await asyncio.sleep(settle_delay)
def print_save_result(save_result):
    """Print a human-readable summary of the save script's result.

    Args:
        save_result: Mapping returned by the save script, or a falsy value.
            A falsy value or a mapping containing ``"error"`` is reported as a
            failure; otherwise the mode, the target (if any) and a non-``None``
            ``"result"`` are printed.
    """
    failed = True
    if not save_result:
        error = "Unknown error"
    elif "error" in save_result:
        error = save_result.get("error")
    else:
        failed = False

    if failed:
        print(f"[!] Save trigger failed: {error}")
        return

    print(f"[*] Save completed successfully via {save_result.get('mode', 'unknown')}.")

    target = save_result.get("target")
    if target:
        print(f"[*] Save target: {target.get('name')} ({target.get('id')})")

    if save_result.get("result") is None:
        return
    print(f"[*] Save returned: {save_result['result']}")
async def save_to_zotero(url, headless_mode="new", library_name=None, collection_id=None):
    """Open ``url`` in Chromium with the Zotero Connector and trigger a save.

    Steps: ensure the extension is unpacked, launch a persistent browser
    context on a throwaway profile directory, navigate to ``url``, find the
    extension's service worker, evaluate ``SAVE_SCRIPT`` in it, print the
    outcome, then wait ``POST_SAVE_WAIT_SECONDS`` before closing the browser.

    Args:
        url: Page to save.
        headless_mode: Passed to ``get_browser_launch_config``; ``"new"`` or
            ``"false"`` are the recognised values.
        library_name: Optional library name forwarded to the save script.
        collection_id: Optional collection ID forwarded to the save script.

    Returns:
        ``None``.  If the service worker is not found, a message is printed
        and the function returns early; the browser is closed in every case.
    """
    extension_path = setup_extension()
    playwright_headless, browser_args = get_browser_launch_config(extension_path, headless_mode)
    print(f"[*] Launching Chromium browser (headless={headless_mode}) with Zotero Connector...")

    async with async_playwright() as playwright:
        # A temporary profile directory keeps each run isolated and is removed
        # when the block exits.
        with tempfile.TemporaryDirectory(prefix="zotero-paper-fetcher-") as user_data_dir:
            browser_context = await playwright.chromium.launch_persistent_context(
                user_data_dir,
                headless=playwright_headless,
                args=browser_args,
                viewport=WINDOW_SIZE,
            )
            try:
                page = await get_primary_page(browser_context)
                await navigate_to_page(page, url)

                worker = await wait_for_service_worker(browser_context)
                if not worker:
                    print("[!] Could not find Zotero extension service worker.")
                    return

                print("[*] Triggering save via extension service worker evaluation...")
                save_result = await worker.evaluate(
                    SAVE_SCRIPT,
                    {"libraryName": library_name, "collectionId": collection_id},
                )
                print_save_result(save_result)

                # Report the real delay; the message previously claimed a
                # hard-coded 5 seconds regardless of the configured value.
                print(
                    f"[*] Waiting {POST_SAVE_WAIT_SECONDS} seconds "
                    "for any delayed connector activity..."
                )
                await asyncio.sleep(POST_SAVE_WAIT_SECONDS)
            finally:
                print("[*] Operation finished. Closing browser.")
                await browser_context.close()
def parse_args():
    """Parse the command line (``sys.argv``) into an ``argparse.Namespace``.

    Recognised arguments: an optional positional ``url``, the ``--headed``
    switch, ``--library-name`` and ``--collection-id``.
    """
    cli = argparse.ArgumentParser(description="Automate Zotero Connector via Playwright.")

    # Positional URL is optional and falls back to a default page.
    cli.add_argument(
        "url",
        nargs="?",
        default="https://arxiv.org/abs/1706.03762",
        help="URL to save to Zotero",
    )
    cli.add_argument(
        "--headed",
        action="store_true",
        help="Show browser UI visually instead of headless=new",
    )

    # Optional save destination.
    cli.add_argument("--library-name", help="Save into the library with this exact name")
    cli.add_argument(
        "--collection-id",
        help="Save into the collection with this ID, such as 13 or C13",
    )

    parsed = cli.parse_args()
    return parsed
def main():
    """Command-line entry point: parse arguments and run the save coroutine."""
    # NOTE(review): prints the extension directory on every run; looks like
    # leftover debug output -- confirm before removing.
    print(EXTENSION_DIR)

    cli_options = parse_args()
    if cli_options.headed:
        headless_mode = "false"
    else:
        headless_mode = "new"

    save_job = save_to_zotero(
        cli_options.url,
        headless_mode=headless_mode,
        library_name=cli_options.library_name,
        collection_id=cli_options.collection_id,
    )
    asyncio.run(save_job)


if __name__ == "__main__":
    main()