Skip to content

Commit 1f1cda2

Browse files
authored
Fix: stages_timing uses correct prefix (#647)
* stages_timing uses correct prefix
1 parent d4b1e3a commit 1f1cda2

File tree

3 files changed

+36
-32
lines changed

3 files changed

+36
-32
lines changed

sources/hubspot/__init__.py

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969

7070

7171
def fetch_data_for_properties(
72-
props: Sequence[str],
72+
props: List[str],
7373
api_key: str,
7474
object_type: str,
7575
soft_delete: bool,
@@ -78,16 +78,17 @@ def fetch_data_for_properties(
7878
Fetch data for a given set of properties from the HubSpot API.
7979
8080
Args:
81-
props (Sequence[str]): List of property names to fetch.
81+
props (List[str]): List of property names to fetch.
8282
api_key (str): HubSpot API key for authentication.
8383
object_type (str): The type of HubSpot object (e.g., 'company', 'contact').
8484
soft_delete (bool): Flag to fetch soft-deleted (archived) records.
8585
8686
Yields:
8787
Iterator[TDataItems]: Data retrieved from the HubSpot API.
8888
"""
89-
90-
params: Dict[str, Any] = {"properties": props, "limit": 100}
89+
# The Hubspot API expects a comma separated string as properties
90+
joined_props = ",".join(sorted(props))
91+
params: Dict[str, Any] = {"properties": joined_props, "limit": 100}
9192
context: Optional[Dict[str, Any]] = (
9293
{SOFT_DELETE_KEY: False} if soft_delete else None
9394
)
@@ -135,7 +136,7 @@ def crm_objects(
135136
for prop, hb_type in props_to_type.items()
136137
}
137138
for batch in fetch_data_for_properties(
138-
",".join(sorted(props_to_type.keys())), api_key, object_type, archived
139+
list(props_to_type.keys()), api_key, object_type, archived
139140
):
140141
yield dlt.mark.with_hints(batch, dlt.mark.make_hints(columns=col_type_hints))
141142

@@ -151,9 +152,9 @@ def crm_object_history(
151152
152153
Args:
153154
object_type (str): Type of HubSpot object (e.g., 'company', 'contact').
154-
api_key (str, optional): API key for HubSpot authentication.
155-
props (List[str], optional): List of properties to retrieve. Defaults to None.
156-
include_custom_props (bool, optional): Include custom properties in the result. Defaults to True.
155+
api_key (str): API key for HubSpot authentication.
156+
props (List[str]): List of properties to retrieve. Defaults to None.
157+
include_custom_props (bool): Include custom properties in the result. Defaults to True.
157158
158159
Yields:
159160
Iterator[TDataItems]: Historical property data.
@@ -207,7 +208,11 @@ def pivot_stages_properties(
207208
continue
208209
id_val = record_not_null.pop(id_prop)
209210
new_data += [
210-
{id_prop: id_val, property_prefix: v, "stage": k.split(property_prefix)[1]}
211+
{
212+
id_prop: id_val,
213+
property_prefix: v,
214+
"stage": k.split(property_prefix)[1],
215+
}
211216
for k, v in record_not_null.items()
212217
if k.startswith(property_prefix)
213218
]
@@ -221,12 +226,8 @@ def stages_timing(
221226
) -> Iterator[TDataItems]:
222227
"""
223228
Fetch stage timing data for a specific object type from the HubSpot API. Some entities, like,
224-
deals and tickets actually have pipelines with multiple stages, which they can enter and exit. This function fetches
225-
history of entering and exiting different stages for the given object.
226-
227-
We have to request them separately, because these properties has the pipeline stage_id in the name.
228-
For example, "hs_date_entered_12345678", where 12345678 is the stage_id.
229-
229+
deals and tickets have pipelines with multiple stages, which they can enter and exit. This function fetches
230+
history of entering different stages for the given object.
230231
231232
Args:
232233
object_type (str): Type of HubSpot object (e.g., 'deal', 'ticket').
@@ -236,7 +237,6 @@ def stages_timing(
236237
Yields:
237238
Iterator[TDataItems]: Stage timing data.
238239
"""
239-
240240
all_properties: List[str] = list(
241241
_get_property_names_types(api_key, object_type).keys()
242242
)
@@ -248,10 +248,7 @@ def stages_timing(
248248
# data for the whole properties list. Therefore, in the following lines we request
249249
# data iteratively for chunks of the properties list.
250250
for chunk in chunk_properties(date_entered_properties, MAX_PROPS_LENGTH):
251-
props_part = ",".join(chunk)
252-
for data in fetch_data_for_properties(
253-
props_part, api_key, object_type, soft_delete
254-
):
251+
for data in fetch_data_for_properties(chunk, api_key, object_type, soft_delete):
255252
yield pivot_stages_properties(data)
256253

257254

sources/hubspot/settings.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@
117117
SOFT_DELETE_KEY = "is_deleted"
118118
ARCHIVED_PARAM = {"archived": True}
119119
PREPROCESSING = {"split": ["hs_merged_object_ids"]}
120-
STAGE_PROPERTY_PREFIX = "hs_date_entered_"
120+
STAGE_PROPERTY_PREFIX = "hs_v2_date_entered_"
121121
MAX_PROPS_LENGTH = 2000
122122
PROPERTIES_WITH_CUSTOM_LABELS = ()
123123

tests/hubspot/test_hubspot_source.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -464,23 +464,30 @@ def test_all_resources(destination_name: str) -> None:
464464

465465
load_info = pipeline.run(
466466
hubspot(include_history=True).with_resources(
467-
"contacts", "deals", "companies", "contacts_property_history"
467+
"contacts",
468+
"deals",
469+
"companies",
470+
"contacts_property_history",
471+
"stages_timing_deals",
468472
)
469473
)
470474

471475
assert_load_info(load_info)
472-
table_names = [
473-
t["name"]
474-
for t in pipeline.default_schema.data_tables()
475-
if not t["name"].endswith("_property_history") and not t.get("parent")
476-
]
477476

478477
# make sure no duplicates (ie. pages wrongly overlap)
479-
assert (
480-
load_table_counts(pipeline, *table_names)
481-
== load_table_distinct_counts(pipeline, "hs_object_id", *table_names)
482-
== {"companies": 4, "contacts": 3, "deals": 3}
483-
)
478+
expected_counts = {
479+
"companies": 4,
480+
"contacts": 3,
481+
"deals": 3,
482+
"stages_timing_deals": 8,
483+
}
484+
for table, expected in expected_counts.items():
485+
distinct_col = "_dlt_id" if table == "stages_timing_deals" else "hs_object_id"
486+
assert (
487+
load_table_counts(pipeline, table)
488+
== load_table_distinct_counts(pipeline, distinct_col, table)
489+
== {table: expected}
490+
)
484491

485492
history_table_names = [
486493
t["name"]

0 commit comments

Comments (0)