Skip to content

Commit 3bfc440

Browse files
committed
move CRM endpoints to search first
1 parent b3ef9bf commit 3bfc440

File tree

4 files changed

+3399
-2983
lines changed

4 files changed

+3399
-2983
lines changed

sources/hubspot/__init__.py

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,28 +30,32 @@
3030
List,
3131
Literal,
3232
Optional,
33-
Sequence,
3433
)
3534
from urllib.parse import quote
3635

3736
import dlt
38-
from dlt.common import pendulum
37+
from dlt.common import logger, pendulum
3938
from dlt.common.typing import TDataItems
4039
from dlt.sources import DltResource
4140

4241
from .helpers import (
4342
_get_property_names_types,
4443
_to_dlt_columns_schema,
44+
search_data,
4545
fetch_data,
4646
fetch_property_history,
4747
get_properties_labels,
48+
SearchOutOfBoundsException,
4849
)
4950
from .settings import (
5051
ALL_OBJECTS,
5152
ARCHIVED_PARAM,
5253
CRM_OBJECT_ENDPOINTS,
5354
CRM_PIPELINES_ENDPOINT,
55+
CRM_SEARCH_OBJECT_ENDPOINTS,
5456
ENTITY_PROPERTIES,
57+
LAST_MODIFIED_PROPERTY,
58+
HUBSPOT_CREATION_DATE,
5559
MAX_PROPS_LENGTH,
5660
OBJECT_TYPE_PLURAL,
5761
OBJECT_TYPE_SINGULAR,
@@ -73,6 +77,7 @@ def fetch_data_for_properties(
7377
api_key: str,
7478
object_type: str,
7579
soft_delete: bool,
80+
last_modified: str = None,
7681
) -> Iterator[TDataItems]:
7782
"""
7883
Fetch data for a given set of properties from the HubSpot API.
@@ -82,20 +87,56 @@ def fetch_data_for_properties(
8287
api_key (str): HubSpot API key for authentication.
8388
object_type (str): The type of HubSpot object (e.g., 'company', 'contact').
8489
soft_delete (bool): Flag to fetch soft-deleted (archived) records.
90+
last_modified (str): The date from which to fetch records. If None, get all records.
8591
8692
Yields:
8793
Iterator[TDataItems]: Data retrieved from the HubSpot API.
8894
"""
95+
logger.info(f"Fetching data for {object_type}.")
8996
# The Hubspot API expects a comma separated string as properties
9097
joined_props = ",".join(sorted(props))
9198
params: Dict[str, Any] = {"properties": joined_props, "limit": 100}
9299
context: Optional[Dict[str, Any]] = (
93100
{SOFT_DELETE_KEY: False} if soft_delete else None
94101
)
95102

96-
yield from fetch_data(
97-
CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params, context=context
98-
)
103+
if last_modified is not None:
104+
logger.info(f"Attempting search starting at {last_modified}.")
105+
search_params: Dict[str, Any] = {
106+
"properties": sorted(props),
107+
"limit": 200,
108+
"filterGroups": [
109+
{
110+
"filters": [
111+
{
112+
"propertyName": LAST_MODIFIED_PROPERTY[object_type],
113+
"operator": "GTE",
114+
"value": last_modified,
115+
}
116+
]
117+
}
118+
],
119+
}
120+
121+
try:
122+
yield from search_data(
123+
CRM_SEARCH_OBJECT_ENDPOINTS[object_type],
124+
api_key,
125+
params=search_params,
126+
context=context,
127+
)
128+
except SearchOutOfBoundsException:
129+
logger.info("Search out of bounds, fetching all data")
130+
yield from fetch_data(
131+
CRM_OBJECT_ENDPOINTS[object_type],
132+
api_key,
133+
params=params,
134+
context=context,
135+
)
136+
else:
137+
yield from fetch_data(
138+
CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params, context=context
139+
)
99140
if soft_delete:
100141
yield from fetch_data(
101142
CRM_OBJECT_ENDPOINTS[object_type],
@@ -109,6 +150,7 @@ def crm_objects(
109150
object_type: str,
110151
api_key: str,
111152
props: List[str],
153+
last_modified: dlt.sources.incremental[str],
112154
include_custom_props: bool = True,
113155
archived: bool = False,
114156
) -> Iterator[TDataItems]:
@@ -119,6 +161,7 @@ def crm_objects(
119161
object_type (str): Type of HubSpot object (e.g., 'company', 'contact').
120162
api_key (str): API key for HubSpot authentication.
121163
props (List[str]): List of properties to retrieve.
164+
last_modified (str): The date from which to fetch records
122165
include_custom_props (bool, optional): Include custom properties in the result. Defaults to True.
123166
archived (bool, optional): Fetch archived (soft-deleted) objects. Defaults to False.
124167
@@ -135,8 +178,17 @@ def crm_objects(
135178
prop: _to_dlt_columns_schema({prop: hb_type})
136179
for prop, hb_type in props_to_type.items()
137180
}
181+
last_modified_on = (
182+
None
183+
if last_modified.start_value == last_modified.initial_value
184+
else last_modified.start_value
185+
)
138186
for batch in fetch_data_for_properties(
139-
list(props_to_type.keys()), api_key, object_type, archived
187+
list(props_to_type.keys()),
188+
api_key,
189+
object_type,
190+
archived,
191+
last_modified_on,
140192
):
141193
yield dlt.mark.with_hints(batch, dlt.mark.make_hints(columns=col_type_hints))
142194

@@ -176,7 +228,7 @@ def crm_object_history(
176228
# This is especially relevant for columns of type "number" in Hubspot
177229
# that are returned as strings by the API
178230
for batch in fetch_property_history(
179-
CRM_OBJECT_ENDPOINTS[object_type],
231+
CRM_SEARCH_OBJECT_ENDPOINTS[object_type],
180232
api_key,
181233
",".join(sorted(props_to_type.keys())),
182234
):
@@ -411,6 +463,10 @@ def get_pipelines(object_type: str) -> Iterator[TDataItems]:
411463
object_type=obj,
412464
api_key=api_key,
413465
props=properties.get(obj),
466+
last_modified=dlt.sources.incremental(
467+
LAST_MODIFIED_PROPERTY[obj],
468+
initial_value=HUBSPOT_CREATION_DATE.isoformat(),
469+
),
414470
include_custom_props=include_custom_props,
415471
archived=soft_delete,
416472
)

sources/hubspot/helpers.py

Lines changed: 77 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
BASE_URL = "https://api.hubapi.com/"
1414

1515

16+
class SearchOutOfBoundsException(Exception):
    """Raised by `search_data` when a search reports more than 9999 total results.

    Callers can catch it to fall back to a full, non-search fetch.
    """
18+
19+
1620
def get_url(endpoint: str) -> str:
1721
"""Get absolute hubspot endpoint URL"""
1822
return urllib.parse.urljoin(BASE_URL, endpoint)
@@ -48,6 +52,22 @@ def pagination(
4852
return None
4953

5054

55+
def search_pagination(
    url: str,
    _data: Dict[str, Any],
    headers: Dict[str, Any],
    params: Optional[Dict[str, Any]] = None,
) -> Optional[Dict[str, Any]]:
    """
    Request the next page of a search, if the current page points to one.

    Args:
        url (str): Absolute URL of the search endpoint the first page was posted to.
        _data (Dict[str, Any]): The JSON body of the page that was just processed.
        headers (Dict[str, Any]): Headers to send with the follow-up request.
        params (Optional[Dict[str, Any]]): The original search body; it is re-sent
            unchanged with the paging cursor added. Treated as empty when None.

    Returns:
        Optional[Dict[str, Any]]: The parsed JSON of the next page, or None when
        `_data` carries no `paging.next` entry.
    """
    _next = (_data.get("paging") or {}).get("next")
    if not _next:
        return None

    # The cursor has to be sent under the literal "after" key. Using the cursor
    # value itself as the key would re-post the same search without a cursor.
    body = {**(params or {}), "after": _next["after"]}
    r = requests.post(url, headers=headers, json=body)
    return r.json()  # type: ignore
69+
70+
5171
def extract_association_data(
5272
_obj: Dict[str, Any],
5373
data: Dict[str, Any],
@@ -126,6 +146,34 @@ def fetch_property_history(
126146
_data = None
127147

128148

149+
def search_data(
    endpoint: str,
    api_key: str,
    params: Optional[Dict[str, Any]] = None,
    context: Optional[Dict[str, Any]] = None,
) -> Iterator[List[Dict[str, Any]]]:
    """
    Post a search request and yield its results page by page.

    Args:
        endpoint (str): Search endpoint, resolved to an absolute URL with `get_url`.
        api_key (str): Key used to build the request headers.
        params (Optional[Dict[str, Any]]): JSON body of the search request.
        context (Optional[Dict[str, Any]]): Extra key/values merged into every
            yielded object.

    Yields:
        List[Dict[str, Any]]: The objects of one result page.

    Raises:
        SearchOutOfBoundsException: If the first response reports a `total`
            above 9999. Nothing is yielded in that case, so a caller can safely
            fall back to another fetch strategy.
            NOTE(review): presumably mirrors the search API's cap on reachable
            results - confirm against the HubSpot documentation.
    """
    search_url = get_url(endpoint)
    request_headers = _get_headers(api_key)

    # First page: the search body travels as JSON in a POST request.
    page = requests.post(search_url, headers=request_headers, json=params).json()

    if page.get("total", 0) > 9999:
        raise SearchOutOfBoundsException

    # Keep going until the pagination helper reports that no page is left.
    while page is not None:
        if "results" in page:
            yield _data_to_objects(page, request_headers, context)
        page = search_pagination(search_url, page, request_headers, params)
175+
176+
129177
def fetch_data(
130178
endpoint: str,
131179
api_key: str,
@@ -168,38 +216,43 @@ def fetch_data(
168216
# Parse the API response and yield the properties of each result
169217
# Parse the response JSON data
170218
_data = r.json()
219+
171220
# Yield the properties of each result in the API response
172221
while _data is not None:
173222
if "results" in _data:
174-
_objects: List[Dict[str, Any]] = []
175-
for _result in _data["results"]:
176-
_obj = _result.get("properties", _result)
177-
if "id" not in _obj and "id" in _result:
178-
# Move id from properties to top level
179-
_obj["id"] = _result["id"]
180-
if "associations" in _result:
181-
for association in _result["associations"]:
182-
__data = _result["associations"][association]
183-
184-
__values = extract_association_data(
185-
_obj, __data, association, headers
186-
)
187-
188-
# remove duplicates from list of dicts
189-
__values = [
190-
dict(t) for t in {tuple(d.items()) for d in __values}
191-
]
192-
193-
_obj[association] = __values
194-
if context:
195-
_obj.update(context)
196-
_objects.append(_obj)
197-
yield _objects
223+
yield _data_to_objects(_data, headers, context)
198224

199225
# Follow pagination links if they exist
200226
_data = pagination(_data, headers)
201227

202228

229+
def _data_to_objects(
230+
data: Any,
231+
headers: Dict[str, str],
232+
context: Optional[Dict[str, Any]] = None,
233+
) -> List[Dict[str, Any]]:
234+
_objects: List[Dict[str, Any]] = []
235+
for _result in data["results"]:
236+
_obj = _result.get("properties", _result)
237+
if "id" not in _obj and "id" in _result:
238+
# Move id from properties to top level
239+
_obj["id"] = _result["id"]
240+
if "associations" in _result:
241+
for association in _result["associations"]:
242+
__data = _result["associations"][association]
243+
244+
__values = extract_association_data(_obj, __data, association, headers)
245+
246+
# remove duplicates from list of dicts
247+
__values = [dict(t) for t in {tuple(d.items()) for d in __values}]
248+
249+
_obj[association] = __values
250+
if context:
251+
_obj.update(context)
252+
_objects.append(_obj)
253+
return _objects
254+
255+
203256
def _get_property_names_types(
204257
api_key: str, object_type: str
205258
) -> Dict[str, Union[str, None]]:

sources/hubspot/settings.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,25 @@
33
from dlt.common import pendulum
44
from dlt.common.data_types import TDataType
55

6+
HUBSPOT_CREATION_DATE = pendulum.datetime(year=2006, month=6, day=1)
67
STARTDATE = pendulum.datetime(year=2024, month=2, day=10)
78

9+
# Every CRM search endpoint follows the same path layout; only the plural
# object name differs.
_CRM_SEARCH_ENDPOINT_TEMPLATE = "/crm/v3/objects/{}/search"

CRM_CONTACTS_SEARCH_ENDPOINT = _CRM_SEARCH_ENDPOINT_TEMPLATE.format("contacts")
CRM_COMPANIES_SEARCH_ENDPOINT = _CRM_SEARCH_ENDPOINT_TEMPLATE.format("companies")
CRM_DEALS_SEARCH_ENDPOINT = _CRM_SEARCH_ENDPOINT_TEMPLATE.format("deals")
CRM_PRODUCTS_SEARCH_ENDPOINT = _CRM_SEARCH_ENDPOINT_TEMPLATE.format("products")
CRM_TICKETS_SEARCH_ENDPOINT = _CRM_SEARCH_ENDPOINT_TEMPLATE.format("tickets")
CRM_QUOTES_SEARCH_ENDPOINT = _CRM_SEARCH_ENDPOINT_TEMPLATE.format("quotes")

# Search endpoint for each singular object type.
CRM_SEARCH_OBJECT_ENDPOINTS = dict(
    contact=CRM_CONTACTS_SEARCH_ENDPOINT,
    company=CRM_COMPANIES_SEARCH_ENDPOINT,
    deal=CRM_DEALS_SEARCH_ENDPOINT,
    product=CRM_PRODUCTS_SEARCH_ENDPOINT,
    ticket=CRM_TICKETS_SEARCH_ENDPOINT,
    quote=CRM_QUOTES_SEARCH_ENDPOINT,
)
24+
825
CRM_CONTACTS_ENDPOINT = (
926
"/crm/v3/objects/contacts?associations=deals,products,tickets,quotes"
1027
)
@@ -112,6 +129,14 @@
112129
"quote": DEFAULT_QUOTE_PROPS,
113130
}
114131

132+
# Name of the property used as the last-modified filter/cursor for each
# singular object type. Contacts are the only type with a different name.
_HS_LAST_MODIFIED = "hs_lastmodifieddate"
LAST_MODIFIED_PROPERTY = dict(
    company=_HS_LAST_MODIFIED,
    contact="lastmodifieddate",
    deal=_HS_LAST_MODIFIED,
    ticket=_HS_LAST_MODIFIED,
    product=_HS_LAST_MODIFIED,
    quote=_HS_LAST_MODIFIED,
)
115140

116141
PIPELINES_OBJECTS = ["deals", "tickets"]
117142
SOFT_DELETE_KEY = "is_deleted"

0 commit comments

Comments
 (0)