From f6d476058fb0e0ecf0184fe4af9462ed0e51b57a Mon Sep 17 00:00:00 2001
From: DankerMu
Date: Tue, 27 Jan 2026 08:18:15 +0800
Subject: [PATCH] Add sync/async PageIndex API clients

---
 pageindex/__init__.py          |   9 +-
 pageindex/client.py            | 240 +++++++++++++++++++++++++++++++++
 requirements.txt               |   1 +
 tests/test_pageindex_client.py |  75 +++++++++++
 4 files changed, 323 insertions(+), 2 deletions(-)
 create mode 100644 pageindex/client.py
 create mode 100644 tests/test_pageindex_client.py

diff --git a/pageindex/__init__.py b/pageindex/__init__.py
index 4606eb396..23f0a9c64 100644
--- a/pageindex/__init__.py
+++ b/pageindex/__init__.py
@@ -1,2 +1,7 @@
-from .page_index import *
-from .page_index_md import md_to_tree
\ No newline at end of file
+from .client import AsyncPageIndexClient, PageIndexClient
+
+try:
+    from .page_index import *
+    from .page_index_md import md_to_tree
+except ImportError:
+    pass
diff --git a/pageindex/client.py b/pageindex/client.py
new file mode 100644
index 000000000..53e5c4886
--- /dev/null
+++ b/pageindex/client.py
@@ -0,0 +1,240 @@
+import os
+from typing import Any, AsyncIterator, Dict, Iterator, List, Optional, Union
+
+import httpx
+
+
+DocId = Union[str, List[str]]
+Message = Dict[str, Any]
+
+
+class PageIndexClient:
+    def __init__(
+        self,
+        api_key: Optional[str] = None,
+        base_url: Optional[str] = None,
+        timeout: float = 60.0,
+        transport: Optional[httpx.BaseTransport] = None,
+    ) -> None:
+        self.api_key = api_key or os.getenv("PAGEINDEX_API_KEY")
+        self.base_url = (base_url or os.getenv("PAGEINDEX_BASE_URL") or "https://api.pageindex.ai").rstrip("/")
+
+        headers: Dict[str, str] = {}
+        if self.api_key:
+            headers["api_key"] = self.api_key
+
+        self._client = httpx.Client(
+            base_url=self.base_url,
+            headers=headers,
+            timeout=timeout,
+            transport=transport,
+        )
+
+    def close(self) -> None:
+        self._client.close()
+
+    def __enter__(self) -> "PageIndexClient":
+        return self
+
+    def __exit__(self, exc_type, exc, tb) -> None:
+        self.close()
+
+    def _request_json(self, method: str, url: str, **kwargs: Any) -> Dict[str, Any]:
+        response = self._client.request(method, url, **kwargs)
+        response.raise_for_status()
+        if not response.content:
+            return {}
+        return response.json()
+
+    def submit_document(self, file_path: str) -> Dict[str, Any]:
+        with open(file_path, "rb") as file:
+            files = {"file": (os.path.basename(file_path), file, "application/pdf")}
+            return self._request_json("POST", "/doc/", files=files)
+
+    def submit_markdown(self, file_path: str) -> Dict[str, Any]:
+        with open(file_path, "rb") as file:
+            files = {"file": (os.path.basename(file_path), file, "text/markdown")}
+            return self._request_json("POST", "/markdown/", files=files)
+
+    def get_document(self, doc_id: str) -> Dict[str, Any]:
+        return self._request_json("GET", f"/doc/{doc_id}/")
+
+    def get_document_status(self, doc_id: str) -> Optional[str]:
+        return self.get_document(doc_id).get("status")
+
+    def get_tree(self, doc_id: str, summary: bool = False) -> Dict[str, Any]:
+        params = {"type": "tree", "summary": summary}
+        return self._request_json("GET", f"/doc/{doc_id}/", params=params)
+
+    def get_ocr(self, doc_id: str, format: str = "page") -> Dict[str, Any]:
+        params = {"type": "ocr", "format": format}
+        return self._request_json("GET", f"/doc/{doc_id}/", params=params)
+
+    def delete_document(self, doc_id: str) -> Dict[str, Any]:
+        response = self._client.request("DELETE", f"/doc/{doc_id}/")
+        response.raise_for_status()
+        if not response.content:
+            return {"status": "deleted"}
+        return response.json()
+
+    def is_retrieval_ready(self, doc_id: str) -> bool:
+        try:
+            result = self.get_tree(doc_id)
+        except httpx.HTTPError:
+            return False
+        return bool(result.get("retrieval_ready"))
+
+    def submit_retrieval_query(self, doc_id: str, query: str, thinking: bool = False) -> Dict[str, Any]:
+        payload = {"doc_id": doc_id, "query": query, "thinking": thinking}
+        return self._request_json("POST", "/retrieval/", json=payload)
+
+    def get_retrieval_result(self, retrieval_id: str) -> Dict[str, Any]:
+        return self._request_json("GET", f"/retrieval/{retrieval_id}/")
+
+    def chat_completions(self, messages: List[Message], doc_id: Optional[DocId] = None) -> Dict[str, Any]:
+        payload: Dict[str, Any] = {"messages": messages, "stream": False}
+        if doc_id is not None:
+            payload["doc_id"] = doc_id
+        return self._request_json("POST", "/chat/completions", json=payload)
+
+    def chat_completions_stream(
+        self, messages: List[Message], doc_id: Optional[DocId] = None
+    ) -> Iterator[Dict[str, Any]]:
+        payload: Dict[str, Any] = {"messages": messages, "stream": True}
+        if doc_id is not None:
+            payload["doc_id"] = doc_id
+
+        with self._client.stream("POST", "/chat/completions", json=payload) as response:
+            response.raise_for_status()
+            for line in response.iter_lines():
+                if not line:
+                    continue
+                if isinstance(line, bytes):
+                    line = line.decode("utf-8", errors="replace")
+                if not line.startswith("data: "):
+                    continue
+                data = line[6:]
+                if data == "[DONE]":
+                    break
+                yield json_loads_safe(data)
+
+
+class AsyncPageIndexClient:
+    def __init__(
+        self,
+        api_key: Optional[str] = None,
+        base_url: Optional[str] = None,
+        timeout: float = 60.0,
+        transport: Optional[httpx.AsyncBaseTransport] = None,
+    ) -> None:
+        self.api_key = api_key or os.getenv("PAGEINDEX_API_KEY")
+        self.base_url = (base_url or os.getenv("PAGEINDEX_BASE_URL") or "https://api.pageindex.ai").rstrip("/")
+
+        headers: Dict[str, str] = {}
+        if self.api_key:
+            headers["api_key"] = self.api_key
+
+        self._client = httpx.AsyncClient(
+            base_url=self.base_url,
+            headers=headers,
+            timeout=timeout,
+            transport=transport,
+        )
+
+    async def aclose(self) -> None:
+        await self._client.aclose()
+
+    async def __aenter__(self) -> "AsyncPageIndexClient":
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb) -> None:
+        await self.aclose()
+
+    async def _request_json(self, method: str, url: str, **kwargs: Any) -> Dict[str, Any]:
+        response = await self._client.request(method, url, **kwargs)
+        response.raise_for_status()
+        if not response.content:
+            return {}
+        return response.json()
+
+    async def submit_document(self, file_path: str) -> Dict[str, Any]:
+        with open(file_path, "rb") as file:
+            files = {"file": (os.path.basename(file_path), file.read(), "application/pdf")}
+            return await self._request_json("POST", "/doc/", files=files)
+
+    async def submit_markdown(self, file_path: str) -> Dict[str, Any]:
+        with open(file_path, "rb") as file:
+            files = {"file": (os.path.basename(file_path), file.read(), "text/markdown")}
+            return await self._request_json("POST", "/markdown/", files=files)
+
+    async def get_document(self, doc_id: str) -> Dict[str, Any]:
+        return await self._request_json("GET", f"/doc/{doc_id}/")
+
+    async def get_document_status(self, doc_id: str) -> Optional[str]:
+        return (await self.get_document(doc_id)).get("status")
+
+    async def get_tree(self, doc_id: str, summary: bool = False) -> Dict[str, Any]:
+        params = {"type": "tree", "summary": summary}
+        return await self._request_json("GET", f"/doc/{doc_id}/", params=params)
+
+    async def get_ocr(self, doc_id: str, format: str = "page") -> Dict[str, Any]:
+        params = {"type": "ocr", "format": format}
+        return await self._request_json("GET", f"/doc/{doc_id}/", params=params)
+
+    async def delete_document(self, doc_id: str) -> Dict[str, Any]:
+        response = await self._client.request("DELETE", f"/doc/{doc_id}/")
+        response.raise_for_status()
+        if not response.content:
+            return {"status": "deleted"}
+        return response.json()
+
+    async def is_retrieval_ready(self, doc_id: str) -> bool:
+        try:
+            result = await self.get_tree(doc_id)
+        except httpx.HTTPError:
+            return False
+        return bool(result.get("retrieval_ready"))
+
+    async def submit_retrieval_query(self, doc_id: str, query: str, thinking: bool = False) -> Dict[str, Any]:
+        payload = {"doc_id": doc_id, "query": query, "thinking": thinking}
+        return await self._request_json("POST", "/retrieval/", json=payload)
+
+    async def get_retrieval_result(self, retrieval_id: str) -> Dict[str, Any]:
+        return await self._request_json("GET", f"/retrieval/{retrieval_id}/")
+
+    async def chat_completions(self, messages: List[Message], doc_id: Optional[DocId] = None) -> Dict[str, Any]:
+        payload: Dict[str, Any] = {"messages": messages, "stream": False}
+        if doc_id is not None:
+            payload["doc_id"] = doc_id
+        return await self._request_json("POST", "/chat/completions", json=payload)
+
+    async def chat_completions_stream(
+        self, messages: List[Message], doc_id: Optional[DocId] = None
+    ) -> AsyncIterator[Dict[str, Any]]:
+        payload: Dict[str, Any] = {"messages": messages, "stream": True}
+        if doc_id is not None:
+            payload["doc_id"] = doc_id
+
+        async with self._client.stream("POST", "/chat/completions", json=payload) as response:
+            response.raise_for_status()
+            async for line in response.aiter_lines():
+                if not line:
+                    continue
+                if not line.startswith("data: "):
+                    continue
+                data = line[6:]
+                if data == "[DONE]":
+                    break
+                yield json_loads_safe(data)
+
+
+def json_loads_safe(data: str) -> Dict[str, Any]:
+    try:
+        import json
+
+        value = json.loads(data)
+    except Exception:
+        return {"raw": data}
+    if isinstance(value, dict):
+        return value
+    return {"value": value}
diff --git a/requirements.txt b/requirements.txt
index 463db58f1..aaa362184 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,5 @@
 openai==1.101.0
+httpx>=0.27.0
 pymupdf==1.26.4
 PyPDF2==3.0.1
 python-dotenv==1.1.0
diff --git a/tests/test_pageindex_client.py b/tests/test_pageindex_client.py
new file mode 100644
index 000000000..cab293ec6
--- /dev/null
+++ b/tests/test_pageindex_client.py
@@ -0,0 +1,75 @@
+import json
+import unittest
+
+import httpx
+
+from pageindex.client import AsyncPageIndexClient, PageIndexClient
+
+
+class TestPageIndexClient(unittest.TestCase):
+    def test_chat_completions_builds_expected_request(self) -> None:
+        captured = {}
+
+        def handler(request: httpx.Request) -> httpx.Response:
+            captured["method"] = request.method
+            captured["url"] = str(request.url)
+            captured["headers"] = dict(request.headers)
+            captured["json"] = json.loads(request.content.decode("utf-8"))
+            return httpx.Response(200, json={"choices": [{"message": {"content": "ok"}}]})
+
+        transport = httpx.MockTransport(handler)
+
+        with PageIndexClient(api_key="test-key", base_url="https://api.pageindex.ai/", transport=transport) as client:
+            response = client.chat_completions(
+                messages=[{"role": "user", "content": "hi"}],
+                doc_id="pi-abc123",
+            )
+
+        self.assertEqual(captured["method"], "POST")
+        self.assertEqual(captured["url"], "https://api.pageindex.ai/chat/completions")
+        self.assertEqual(captured["headers"].get("api_key"), "test-key")
+        self.assertEqual(captured["json"]["doc_id"], "pi-abc123")
+        self.assertFalse(captured["json"]["stream"])
+        self.assertIn("choices", response)
+
+    def test_get_tree_sets_query_params(self) -> None:
+        captured = {}
+
+        def handler(request: httpx.Request) -> httpx.Response:
+            captured["url"] = request.url
+            return httpx.Response(200, json={"status": "completed", "result": [], "retrieval_ready": True})
+
+        transport = httpx.MockTransport(handler)
+
+        with PageIndexClient(api_key="test-key", base_url="https://api.pageindex.ai", transport=transport) as client:
+            response = client.get_tree("pi-abc123", summary=True)
+
+        self.assertEqual(str(captured["url"]).split("?")[0], "https://api.pageindex.ai/doc/pi-abc123/")
+        self.assertEqual(captured["url"].params.get("type"), "tree")
+        self.assertEqual(captured["url"].params.get("summary"), "true")
+        self.assertTrue(response.get("retrieval_ready"))
+
+
+class TestAsyncPageIndexClient(unittest.IsolatedAsyncioTestCase):
+    async def test_async_get_document(self) -> None:
+        captured = {}
+
+        def handler(request: httpx.Request) -> httpx.Response:
+            captured["method"] = request.method
+            captured["url"] = str(request.url)
+            captured["headers"] = dict(request.headers)
+            return httpx.Response(200, json={"doc_id": "pi-abc123", "status": "processing"})
+
+        transport = httpx.MockTransport(handler)
+
+        async with AsyncPageIndexClient(
+            api_key="test-key",
+            base_url="https://api.pageindex.ai",
+            transport=transport,
+        ) as client:
+            result = await client.get_document("pi-abc123")
+
+        self.assertEqual(captured["method"], "GET")
+        self.assertEqual(captured["url"], "https://api.pageindex.ai/doc/pi-abc123/")
+        self.assertEqual(captured["headers"].get("api_key"), "test-key")
+        self.assertEqual(result.get("status"), "processing")
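Usage sketch (outside the diff): a minimal example of how the new sync client might be driven end to end, assuming PAGEINDEX_API_KEY is exported. The file name "example.pdf", the doc_id field in the submit response, the prompt text, and the five-second polling interval are illustrative assumptions that mirror the shapes used in the tests above, not documented API guarantees.

import time

from pageindex import PageIndexClient


def main() -> None:
    # PAGEINDEX_API_KEY is read from the environment by the client constructor.
    with PageIndexClient() as client:
        # "example.pdf" is a placeholder path; the submit response is assumed to carry a doc_id field.
        doc = client.submit_document("example.pdf")
        doc_id = doc["doc_id"]

        # Poll until the document tree is ready for retrieval; the interval is arbitrary.
        while not client.is_retrieval_ready(doc_id):
            time.sleep(5)

        # Ask a question grounded in the document via the OpenAI-style chat endpoint;
        # the response shape mirrors the mocked payload in the tests.
        response = client.chat_completions(
            messages=[{"role": "user", "content": "What is this document about?"}],
            doc_id=doc_id,
        )
        print(response["choices"][0]["message"]["content"])


if __name__ == "__main__":
    main()

The async client follows the same call sequence with "async with AsyncPageIndexClient() as client" and awaited methods.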