Skip to content

Commit 761250f

Browse files
committed
Allow search to work past the 10,000-result limit
1 parent e2bf235 commit 761250f

File tree

2 files changed

+59
-39
lines changed

2 files changed

+59
-39
lines changed

sources/hubspot/__init__.py

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
from .helpers import (
4242
_get_property_names_types,
4343
_to_dlt_columns_schema,
44-
search_data,
44+
search_data_since,
4545
fetch_data,
4646
fetch_property_history,
4747
get_properties_labels,
@@ -107,29 +107,14 @@ def fetch_data_for_properties(
107107
)
108108

109109
if last_modified is not None:
110-
logger.info(f"Attempting search starting at {last_modified}.")
111-
search_params: Dict[str, Any] = {
112-
"properties": sorted(props),
113-
"limit": 200,
114-
"filterGroups": [
115-
{
116-
"filters": [
117-
{
118-
"propertyName": LAST_MODIFIED_PROPERTY[object_type],
119-
"operator": "GTE",
120-
"value": last_modified,
121-
}
122-
]
123-
}
124-
],
125-
}
126-
127110
try:
128-
yield from search_data(
111+
yield from search_data_since(
129112
CRM_OBJECT_ENDPOINTS[object_type],
130113
api_key,
114+
last_modified,
115+
LAST_MODIFIED_PROPERTY[object_type],
116+
props=props,
131117
associations=associations,
132-
params=search_params,
133118
context=context,
134119
)
135120
except SearchOutOfBoundsException:
@@ -396,7 +381,7 @@ def properties_custom_labels(api_key: str = api_key) -> Iterator[TDataItems]:
396381
"""
397382

398383
def get_properties_description(
399-
properties_list_inner: List[Dict[str, Any]]
384+
properties_list_inner: List[Dict[str, Any]],
400385
) -> Iterator[Dict[str, Any]]:
401386
"""Fetch properties."""
402387
for property_info in properties_list_inner:

sources/hubspot/helpers.py

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import urllib.parse
66
from typing import Any, Dict, Iterator, List, Optional
77

8+
from dlt.common import logger
89
from dlt.common.schema.typing import TColumnSchema
910
from dlt.sources.helpers import requests
1011

@@ -63,11 +64,10 @@ def search_pagination(
6364
headers: Dict[str, Any],
6465
params: Optional[Dict[str, Any]] = None,
6566
) -> Optional[Dict[str, Any]]:
66-
_next = _data.get("paging", {}).get("next", False)
67-
if _next:
68-
after = _next["after"]
67+
_after = _data.get("paging", {}).get("next", {}).get("after", False)
68+
if _after and _after != "10000":
6969
# Get the next page response
70-
r = requests.post(url, headers=headers, json={**params, "after": after})
70+
r = requests.post(url, headers=headers, json={**params, "after": _after})
7171
return r.json() # type: ignore
7272
else:
7373
return None
@@ -151,35 +151,70 @@ def fetch_property_history(
151151
_data = None
152152

153153

154-
def search_data(
154+
def search_data_since(
155155
endpoint: str,
156156
api_key: str,
157+
last_modified: str,
158+
last_modified_prop: str,
159+
props: List[str],
157160
associations: Optional[List[str]] = None,
158-
params: Optional[Dict[str, Any]] = None,
159161
context: Optional[Dict[str, Any]] = None,
160162
) -> Iterator[List[Dict[str, Any]]]:
161163
# Construct the URL and headers for the API request
162164
url = get_url(CRM_SEARCH_ENDPOINT.format(crm_endpoint=endpoint))
163165
headers = _get_headers(api_key)
166+
body: Dict[str, Any] = {
167+
"properties": sorted(props),
168+
"limit": 200,
169+
"filterGroups": [
170+
{
171+
"filters": [
172+
{
173+
"propertyName": last_modified_prop,
174+
"operator": "GTE",
175+
"value": last_modified,
176+
}
177+
]
178+
}
179+
],
180+
"sorts": [{"propertyName": last_modified_prop, "direction": "ASCENDING"}],
181+
}
164182

165183
# Make the API request
166-
r = requests.post(url, headers=headers, json=params)
184+
r = requests.post(url, headers=headers, json=body)
167185
# Parse the API response and yield the properties of each result
168186
# Parse the response JSON data
169187
_data = r.json()
170188

171-
if _data.get("total", 0) > 9999:
172-
raise SearchOutOfBoundsException
173-
else:
174-
# Yield the properties of each result in the API response
175-
while _data is not None:
176-
if "results" in _data:
177-
yield _data_to_objects(
178-
_data, endpoint, headers, associations=associations, context=context
179-
)
189+
_total = _data.get("total", 0)
190+
logger.info(f"Getting {_total} new objects from {url} starting at {last_modified}")
191+
_max_last_modified = last_modified
192+
# Yield the properties of each result in the API response
193+
while _data is not None:
194+
if "results" in _data:
195+
for _result in _data["results"]:
196+
if _result["updatedAt"]:
197+
_max_last_modified = max(_max_last_modified, _result["updatedAt"])
198+
yield _data_to_objects(
199+
_data, endpoint, headers, associations=associations, context=context
200+
)
180201

181-
# Follow pagination links if they exist
182-
_data = search_pagination(url, _data, headers, params)
202+
# Follow pagination links if they exist
203+
_data = search_pagination(url, _data, headers, body)
204+
205+
if _total > 9999:
206+
if _max_last_modified == last_modified:
207+
raise SearchOutOfBoundsException
208+
logger.info(f"Starting new search iteration at {_max_last_modified}")
209+
yield from search_data_since(
210+
endpoint,
211+
api_key,
212+
_max_last_modified,
213+
last_modified_prop,
214+
props,
215+
associations,
216+
context,
217+
)
183218

184219

185220
def fetch_data(

0 commit comments

Comments
 (0)