Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pageindex/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
from .page_index import *
from .page_index_md import md_to_tree
from .client import AsyncPageIndexClient, PageIndexClient

try:
from .page_index import *
from .page_index_md import md_to_tree
except ImportError:
pass
240 changes: 240 additions & 0 deletions pageindex/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
import os
from typing import Any, AsyncIterator, Dict, Iterator, List, Optional, Union

import httpx


DocId = Union[str, List[str]]
Message = Dict[str, Any]


class PageIndexClient:
def __init__(
self,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
timeout: float = 60.0,
transport: Optional[httpx.BaseTransport] = None,
) -> None:
self.api_key = api_key or os.getenv("PAGEINDEX_API_KEY")
self.base_url = (base_url or os.getenv("PAGEINDEX_BASE_URL") or "https://api.pageindex.ai").rstrip("/")

headers: Dict[str, str] = {}
if self.api_key:
headers["api_key"] = self.api_key

self._client = httpx.Client(
base_url=self.base_url,
headers=headers,
timeout=timeout,
transport=transport,
)

def close(self) -> None:
self._client.close()

def __enter__(self) -> "PageIndexClient":
return self

def __exit__(self, exc_type, exc, tb) -> None:
self.close()

def _request_json(self, method: str, url: str, **kwargs: Any) -> Dict[str, Any]:
response = self._client.request(method, url, **kwargs)
response.raise_for_status()
if not response.content:
return {}
return response.json()

def submit_document(self, file_path: str) -> Dict[str, Any]:
with open(file_path, "rb") as file:
files = {"file": (os.path.basename(file_path), file, "application/pdf")}
return self._request_json("POST", "/doc/", files=files)

def submit_markdown(self, file_path: str) -> Dict[str, Any]:
with open(file_path, "rb") as file:
files = {"file": (os.path.basename(file_path), file, "text/markdown")}
return self._request_json("POST", "/markdown/", files=files)

def get_document(self, doc_id: str) -> Dict[str, Any]:
return self._request_json("GET", f"/doc/{doc_id}/")

def get_document_status(self, doc_id: str) -> Optional[str]:
return self.get_document(doc_id).get("status")

def get_tree(self, doc_id: str, summary: bool = False) -> Dict[str, Any]:
params = {"type": "tree", "summary": summary}
return self._request_json("GET", f"/doc/{doc_id}/", params=params)

def get_ocr(self, doc_id: str, format: str = "page") -> Dict[str, Any]:
params = {"type": "ocr", "format": format}
return self._request_json("GET", f"/doc/{doc_id}/", params=params)

def delete_document(self, doc_id: str) -> Dict[str, Any]:
response = self._client.request("DELETE", f"/doc/{doc_id}/")
response.raise_for_status()
if not response.content:
return {"status": "deleted"}
return response.json()

def is_retrieval_ready(self, doc_id: str) -> bool:
try:
result = self.get_tree(doc_id)
except httpx.HTTPError:
return False
return bool(result.get("retrieval_ready"))

def submit_retrieval_query(self, doc_id: str, query: str, thinking: bool = False) -> Dict[str, Any]:
payload = {"doc_id": doc_id, "query": query, "thinking": thinking}
return self._request_json("POST", "/retrieval/", json=payload)

def get_retrieval_result(self, retrieval_id: str) -> Dict[str, Any]:
return self._request_json("GET", f"/retrieval/{retrieval_id}/")

def chat_completions(self, messages: List[Message], doc_id: Optional[DocId] = None) -> Dict[str, Any]:
payload: Dict[str, Any] = {"messages": messages, "stream": False}
if doc_id is not None:
payload["doc_id"] = doc_id
return self._request_json("POST", "/chat/completions", json=payload)

def chat_completions_stream(
self, messages: List[Message], doc_id: Optional[DocId] = None
) -> Iterator[Dict[str, Any]]:
payload: Dict[str, Any] = {"messages": messages, "stream": True}
if doc_id is not None:
payload["doc_id"] = doc_id

with self._client.stream("POST", "/chat/completions", json=payload) as response:
response.raise_for_status()
for line in response.iter_lines():
if not line:
continue
if isinstance(line, bytes):
line = line.decode("utf-8", errors="replace")
if not line.startswith("data: "):
continue
data = line[6:]
if data == "[DONE]":
break
yield json_loads_safe(data)


class AsyncPageIndexClient:
def __init__(
self,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
timeout: float = 60.0,
transport: Optional[httpx.AsyncBaseTransport] = None,
) -> None:
self.api_key = api_key or os.getenv("PAGEINDEX_API_KEY")
self.base_url = (base_url or os.getenv("PAGEINDEX_BASE_URL") or "https://api.pageindex.ai").rstrip("/")

headers: Dict[str, str] = {}
if self.api_key:
headers["api_key"] = self.api_key

self._client = httpx.AsyncClient(
base_url=self.base_url,
headers=headers,
timeout=timeout,
transport=transport,
)

async def aclose(self) -> None:
await self._client.aclose()

async def __aenter__(self) -> "AsyncPageIndexClient":
return self

async def __aexit__(self, exc_type, exc, tb) -> None:
await self.aclose()

async def _request_json(self, method: str, url: str, **kwargs: Any) -> Dict[str, Any]:
response = await self._client.request(method, url, **kwargs)
response.raise_for_status()
if not response.content:
return {}
return response.json()

async def submit_document(self, file_path: str) -> Dict[str, Any]:
with open(file_path, "rb") as file:
files = {"file": (os.path.basename(file_path), file.read(), "application/pdf")}
return await self._request_json("POST", "/doc/", files=files)

async def submit_markdown(self, file_path: str) -> Dict[str, Any]:
with open(file_path, "rb") as file:
files = {"file": (os.path.basename(file_path), file.read(), "text/markdown")}
return await self._request_json("POST", "/markdown/", files=files)

async def get_document(self, doc_id: str) -> Dict[str, Any]:
return await self._request_json("GET", f"/doc/{doc_id}/")

async def get_document_status(self, doc_id: str) -> Optional[str]:
return (await self.get_document(doc_id)).get("status")

async def get_tree(self, doc_id: str, summary: bool = False) -> Dict[str, Any]:
params = {"type": "tree", "summary": summary}
return await self._request_json("GET", f"/doc/{doc_id}/", params=params)

async def get_ocr(self, doc_id: str, format: str = "page") -> Dict[str, Any]:
params = {"type": "ocr", "format": format}
return await self._request_json("GET", f"/doc/{doc_id}/", params=params)

async def delete_document(self, doc_id: str) -> Dict[str, Any]:
response = await self._client.request("DELETE", f"/doc/{doc_id}/")
response.raise_for_status()
if not response.content:
return {"status": "deleted"}
return response.json()

async def is_retrieval_ready(self, doc_id: str) -> bool:
try:
result = await self.get_tree(doc_id)
except httpx.HTTPError:
return False
return bool(result.get("retrieval_ready"))

async def submit_retrieval_query(self, doc_id: str, query: str, thinking: bool = False) -> Dict[str, Any]:
payload = {"doc_id": doc_id, "query": query, "thinking": thinking}
return await self._request_json("POST", "/retrieval/", json=payload)

async def get_retrieval_result(self, retrieval_id: str) -> Dict[str, Any]:
return await self._request_json("GET", f"/retrieval/{retrieval_id}/")

async def chat_completions(self, messages: List[Message], doc_id: Optional[DocId] = None) -> Dict[str, Any]:
payload: Dict[str, Any] = {"messages": messages, "stream": False}
if doc_id is not None:
payload["doc_id"] = doc_id
return await self._request_json("POST", "/chat/completions", json=payload)

async def chat_completions_stream(
self, messages: List[Message], doc_id: Optional[DocId] = None
) -> AsyncIterator[Dict[str, Any]]:
payload: Dict[str, Any] = {"messages": messages, "stream": True}
if doc_id is not None:
payload["doc_id"] = doc_id

async with self._client.stream("POST", "/chat/completions", json=payload) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if not line:
continue
if not line.startswith("data: "):
continue
data = line[6:]
if data == "[DONE]":
break
yield json_loads_safe(data)


def json_loads_safe(data: str) -> Dict[str, Any]:
try:
import json

value = json.loads(data)
except Exception:
return {"raw": data}
if isinstance(value, dict):
return value
return {"value": value}
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
openai==1.101.0
httpx>=0.27.0
pymupdf==1.26.4
PyPDF2==3.0.1
python-dotenv==1.1.0
Expand Down
75 changes: 75 additions & 0 deletions tests/test_pageindex_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import json
import unittest

import httpx

from pageindex.client import AsyncPageIndexClient, PageIndexClient


class TestPageIndexClient(unittest.TestCase):
def test_chat_completions_builds_expected_request(self) -> None:
captured = {}

def handler(request: httpx.Request) -> httpx.Response:
captured["method"] = request.method
captured["url"] = str(request.url)
captured["headers"] = dict(request.headers)
captured["json"] = json.loads(request.content.decode("utf-8"))
return httpx.Response(200, json={"choices": [{"message": {"content": "ok"}}]})

transport = httpx.MockTransport(handler)

with PageIndexClient(api_key="test-key", base_url="https://api.pageindex.ai/", transport=transport) as client:
response = client.chat_completions(
messages=[{"role": "user", "content": "hi"}],
doc_id="pi-abc123",
)

self.assertEqual(captured["method"], "POST")
self.assertEqual(captured["url"], "https://api.pageindex.ai/chat/completions")
self.assertEqual(captured["headers"].get("api_key"), "test-key")
self.assertEqual(captured["json"]["doc_id"], "pi-abc123")
self.assertFalse(captured["json"]["stream"])
self.assertIn("choices", response)

def test_get_tree_sets_query_params(self) -> None:
captured = {}

def handler(request: httpx.Request) -> httpx.Response:
captured["url"] = request.url
return httpx.Response(200, json={"status": "completed", "result": [], "retrieval_ready": True})

transport = httpx.MockTransport(handler)

with PageIndexClient(api_key="test-key", base_url="https://api.pageindex.ai", transport=transport) as client:
response = client.get_tree("pi-abc123", summary=True)

self.assertEqual(str(captured["url"]).split("?")[0], "https://api.pageindex.ai/doc/pi-abc123/")
self.assertEqual(captured["url"].params.get("type"), "tree")
self.assertEqual(captured["url"].params.get("summary"), "true")
self.assertTrue(response.get("retrieval_ready"))


class TestAsyncPageIndexClient(unittest.IsolatedAsyncioTestCase):
async def test_async_get_document(self) -> None:
captured = {}

def handler(request: httpx.Request) -> httpx.Response:
captured["method"] = request.method
captured["url"] = str(request.url)
captured["headers"] = dict(request.headers)
return httpx.Response(200, json={"doc_id": "pi-abc123", "status": "processing"})

transport = httpx.MockTransport(handler)

async with AsyncPageIndexClient(
api_key="test-key",
base_url="https://api.pageindex.ai",
transport=transport,
) as client:
result = await client.get_document("pi-abc123")

self.assertEqual(captured["method"], "GET")
self.assertEqual(captured["url"], "https://api.pageindex.ai/doc/pi-abc123/")
self.assertEqual(captured["headers"].get("api_key"), "test-key")
self.assertEqual(result.get("status"), "processing")