88from dlt .common .schema .typing import TColumnSchema
99from dlt .sources .helpers import requests
1010
11- from .settings import OBJECT_TYPE_PLURAL , HS_TO_DLT_TYPE
11+ from .settings import (
12+ CRM_ASSOCIATIONS_ENDPOINT ,
13+ CRM_SEARCH_ENDPOINT ,
14+ OBJECT_TYPE_PLURAL ,
15+ HS_TO_DLT_TYPE ,
16+ )
1217
1318BASE_URL = "https://api.hubapi.com/"
1419
@@ -62,7 +67,7 @@ def search_pagination(
6267 if _next :
6368 after = _next ["after" ]
6469 # Get the next page response
65- r = requests .post (url , headers = headers , json = {** params , after : after })
70+ r = requests .post (url , headers = headers , json = {** params , " after" : after })
6671 return r .json () # type: ignore
6772 else :
6873 return None
@@ -149,11 +154,12 @@ def fetch_property_history(
149154def search_data (
150155 endpoint : str ,
151156 api_key : str ,
157+ associations : Optional [List [str ]] = None ,
152158 params : Optional [Dict [str , Any ]] = None ,
153159 context : Optional [Dict [str , Any ]] = None ,
154160) -> Iterator [List [Dict [str , Any ]]]:
155161 # Construct the URL and headers for the API request
156- url = get_url (endpoint )
162+ url = get_url (CRM_SEARCH_ENDPOINT . format ( crm_endpoint = endpoint ) )
157163 headers = _get_headers (api_key )
158164
159165 # Make the API request
@@ -168,7 +174,9 @@ def search_data(
168174 # Yield the properties of each result in the API response
169175 while _data is not None :
170176 if "results" in _data :
171- yield _data_to_objects (_data , headers , context )
177+ yield _data_to_objects (
178+ _data , endpoint , headers , associations = associations , context = context
179+ )
172180
173181 # Follow pagination links if they exist
174182 _data = search_pagination (url , _data , headers , params )
@@ -220,15 +228,17 @@ def fetch_data(
220228 # Yield the properties of each result in the API response
221229 while _data is not None :
222230 if "results" in _data :
223- yield _data_to_objects (_data , headers , context )
231+ yield _data_to_objects (_data , endpoint , headers , context = context )
224232
225233 # Follow pagination links if they exist
226234 _data = pagination (_data , headers )
227235
228236
229237def _data_to_objects (
230238 data : Any ,
239+ endpoint : str ,
231240 headers : Dict [str , str ],
241+ associations : Optional [List [str ]] = None ,
232242 context : Optional [Dict [str , Any ]] = None ,
233243) -> List [Dict [str , Any ]]:
234244 _objects : List [Dict [str , Any ]] = []
@@ -240,19 +250,36 @@ def _data_to_objects(
240250 if "associations" in _result :
241251 for association in _result ["associations" ]:
242252 __data = _result ["associations" ][association ]
243-
244- __values = extract_association_data (_obj , __data , association , headers )
245-
246- # remove duplicates from list of dicts
247- __values = [dict (t ) for t in {tuple (d .items ()) for d in __values }]
248-
249- _obj [association ] = __values
253+ _add_association_data (__data , association , headers , _obj )
254+ elif associations is not None :
255+ for association in associations :
256+ __endpoint = get_url (
257+ CRM_ASSOCIATIONS_ENDPOINT .format (
258+ crm_endpoint = endpoint ,
259+ object_id = _result ["id" ],
260+ association = association ,
261+ )
262+ )
263+ r = requests .get (__endpoint , headers = headers , params = {"limit" : 500 })
264+ __data = r .json ()
265+ _add_association_data (__data , association , headers , _obj )
250266 if context :
251267 _obj .update (context )
252268 _objects .append (_obj )
253269 return _objects
254270
255271
272+ def _add_association_data (
273+ data : Any , association : str , headers : Dict [str , str ], obj : Any
274+ ) -> None :
275+ __values = extract_association_data (obj , data , association , headers )
276+
277+ # remove duplicates from list of dicts
278+ __values = [dict (t ) for t in {tuple (d .items ()) for d in __values }]
279+
280+ obj [association ] = __values
281+
282+
256283def _get_property_names_types (
257284 api_key : str , object_type : str
258285) -> Dict [str , Union [str , None ]]:
0 commit comments