Commit e00df1d

Authored by andhreljaKern, JWittmeyer, and lumburovskalina
Cognition ETL Provider (#201)
* perf: add etl task table
* perf(etl_task): add llm_config
* perf: rename EtlTask class
* perf: add business objects
* fix: optional file ids in etl task create
* perf: add file_path to etl_task
* perf: file paths
* perf: add tokenizer column
* perf: add file_size col
* perf: add split config
* perf: add split config to etl task
* fix: set split types to chunk and shrink
* perf: update enum name
* perf: default ETLExtractorPDF to PDF2MD
* perf: fkey alignment
* perf: add ETLTransformer
* perf: remove deleted cols
* perf: add monitor.set_etl_task_to_failed
* perf: EnumKern
* perf: add etl_task_id to integration records
* perf: enum alignment
* perf: add project update for integration
* perf: etl and integration deletions
* fix: thread start in general
* perf: add ETLExtractorPDF from_string
* perf: integration_objects cleanup
* perf: etl_task.cache_config
* perf: error print
* fix: overwrite etl_task_id fkey
* perf: make get_or_create_etl_task a submodule function
* fix: get_or_create_etl_task args
* perf: align to integrations
* perf: add ETLCacheKeys
* perf: add file-cache to ETLCacheKey enum
* perf: minor fixes
* New table
* Adds new enums
* Tmp commot
* tmp commit
* Helper method
* Helper for queue view
* util changes
* Adds optional fields for overwriting
* Adds transf key & extr key
* model iwth /
* PArse scope proj
* perf: etl utils update
* perf: make enum EnumKern
* fix: fallback config for integrations
* fix: cleanup integration fallback
* perf: update etl_utils
* perf: etl task original file name
* perf: models update etl utils enhancements
* fix: localhost reference
* perf: integration - refinery load
* perf: update integration etl config
* perf: add get_splitting_key method
* perf: update config generation
* perf: update cache keys
* perf: rename get hashed str fn
* perf: function defs
* perf: update cache keys
* perf: etl file cache deletion
* perf: rework delete_etl cache
* fix: delete etl cache
* perf: query task queue for active tmp doc
* perf: enhance fn args
* Config preset creation
* Change PDF to documents
* perf: add langchain enums
* Removes outdated project columns
* perf: file_type infer
* style: error message
* style: error message
* Submodule change for refinery-test
* perf: add from_mimetype
* small change
* perf: rename ETLExtractor enums
* update
* fix: enum fix
* fix: llmindicator -> llmidentifier
* perf: minor enhancements
* perf: enum file type enhancement
* fix: integration notify config
* Fix query
* Fix project creation without tokenizer
* Fix image types
* perf: enhance file type inference
* fix: hash prompt for transformation cache
* perf: integration records
* perf: track integration progress
* perf: add supported extractors enum fn
* perf: add extractor validations
* perf: update dataset model
* fix: markdown_file etl config loading step
* perf: add meta_data to etl_task
* perf: remove dataset.llm_config col
* perf: various enhancements

---------

Co-authored-by: JWittmeyer <jens.wittmeyer@kern.ai>
Co-authored-by: Lina <lina.lumburovska@kern.ai>
1 parent 4ed1d9f commit e00df1d

File tree

15 files changed: +1547 / -179 lines changed


business_objects/general.py

Lines changed: 1 addition & 1 deletion
@@ -77,7 +77,7 @@ def force_remove_and_refresh_session_by_id(session_id: str) -> bool:
     if session_id not in session_lookup:
         return False
     # context vars cant be closed from a different context but we can work around it by using a thread (which creates a new context) with the same id
-    daemon.run_without_db_token(__close_in_context(session_id))
+    daemon.run_without_db_token(__close_in_context, session_id)
     return True

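The one-line fix in general.py matters because of how the daemon helper receives its work: the old call evaluated __close_in_context(session_id) immediately in the caller's context and handed the runner only its return value, while the new call passes the callable plus its argument so the invocation happens inside the freshly created thread (and therefore a fresh context). A minimal sketch of the pattern, assuming a plain threading-based runner; the real run_without_db_token implementation is not part of this diff:

import threading

def run_without_db_token(target, *args) -> threading.Thread:
    # Assumed sketch: execute `target` in a new daemon thread so it runs in
    # its own context (and therefore its own context variables).
    thread = threading.Thread(target=target, args=args, daemon=True)
    thread.start()
    return thread

def __close_in_context(session_id: str) -> None:
    print(f"closing session {session_id} from a fresh context")

# Before the fix: the function was called right away in the current context,
# and only its return value (None) reached the thread runner.
# run_without_db_token(__close_in_context("some-session-id"))

# After the fix: callable and argument are passed separately, so the call
# happens inside the new thread.
run_without_db_token(__close_in_context, "some-session-id")
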
business_objects/monitor.py

Lines changed: 26 additions & 5 deletions
@@ -1,10 +1,11 @@
 from typing import Any, List, Optional
 import datetime
-from . import general
-from submodules.model import enums, telemetry
-from submodules.model.models import TaskQueue, Organization
+from submodules.model.business_objects import general
+from submodules.model import enums
+from submodules.model.models import TaskQueue, Organization, CognitionIntegration
 from submodules.model.util import prevent_sql_injection
 from submodules.model.session import session
+from submodules.model.global_objects import etl_task as etl_task_db_bo
 from submodules.model.cognition_objects import (
     macro as macro_db_bo,
     markdown_file as markdown_file_db_bo,
@@ -207,9 +208,9 @@ def set_integration_task_to_failed(
         enums.CognitionMarkdownFileState
     ] = enums.CognitionMarkdownFileState.FAILED,
     with_commit: bool = True,
-) -> None:
+) -> CognitionIntegration:
     # argument `state` is a workaround for cognition-gateway/api/routes/integrations.delete_many
-    integration_db_bo.update(
+    return integration_db_bo.update(
         id=integration_id,
         state=state,
         finished_at=datetime.datetime.now(datetime.timezone.utc),
@@ -220,6 +221,26 @@
     )


+def set_etl_task_to_failed(
+    id: str,
+    is_active: bool = False,
+    error_message: Optional[str] = None,
+    state: Optional[
+        enums.CognitionMarkdownFileState
+    ] = enums.CognitionMarkdownFileState.FAILED,
+    with_commit: bool = True,
+) -> None:
+    # argument `state` is a workaround for cognition-gateway/api/routes/integrations.delete_many
+    etl_task_db_bo.update(
+        id=id,
+        state=state,
+        finished_at=datetime.datetime.now(datetime.timezone.utc),
+        is_active=is_active,
+        error_message=error_message,
+        with_commit=with_commit,
+    )
+
+
 def __select_running_information_source_payloads(
     project_id: Optional[str] = None,
     only_running: bool = False,

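The new set_etl_task_to_failed mirrors set_integration_task_to_failed but writes the terminal state through the etl_task global object. A hedged usage sketch of how a worker might call it when an ETL run fails; the run_etl_task wrapper and the extract_transform_load step are illustrative assumptions, only the monitor call itself comes from this diff:

from submodules.model.business_objects import monitor  # import path as used elsewhere in the diff

def extract_transform_load(etl_task_id: str) -> None:
    # Placeholder for the actual pipeline step (assumption).
    raise RuntimeError("extractor failed")

def run_etl_task(etl_task_id: str) -> None:
    try:
        extract_transform_load(etl_task_id)
    except Exception as exc:
        # Record the failure so queue views and integration progress see a terminal state.
        monitor.set_etl_task_to_failed(
            id=etl_task_id,
            error_message=str(exc),
        )
        raise
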
business_objects/task_queue.py

Lines changed: 17 additions & 0 deletions
@@ -33,6 +33,23 @@ def get_orphan_tasks() -> List[TaskQueue]:
     )


+def get_all_queued_etl_task_for_conversation(
+    org_id: str, project_id: str, conversation_id: str
+) -> Optional[List[TaskQueue]]:
+    return (
+        session.query(TaskQueue)
+        .filter(
+            TaskQueue.organization_id == org_id,
+            TaskQueue.task_type == enums.TaskType.EXECUTE_ETL.value,
+            text(f"task_info->'tmp_doc_metadata'->>'project_id' = '{project_id}'"),
+            text(
+                f"task_info->'tmp_doc_metadata'->>'conversation_id' = '{conversation_id}'"
+            ),
+        )
+        .all()
+    )
+
+
 def get_likely_failed_tasks(days: int = 1) -> List[TaskQueue]:
     return (
         session.query(TaskQueue)

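get_all_queued_etl_task_for_conversation filters on keys inside the task_info JSON column by interpolating project_id and conversation_id into raw SQL via text(). For reference, the same two conditions can be written with bound parameters so the values never enter the SQL string; this is an alternative sketch, not what the committed code does:

from typing import List
from sqlalchemy import text
from sqlalchemy.sql.elements import TextClause

def conversation_etl_filters(project_id: str, conversation_id: str) -> List[TextClause]:
    # Same JSON-path conditions as the new query, with bound parameters
    # instead of f-string interpolation (sketch / assumption).
    return [
        text("task_info->'tmp_doc_metadata'->>'project_id' = :pid").bindparams(
            pid=project_id
        ),
        text("task_info->'tmp_doc_metadata'->>'conversation_id' = :cid").bindparams(
            cid=conversation_id
        ),
    ]

These clauses could be dropped into the same .filter(...) call in place of the interpolated text() conditions.
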
cognition_objects/environment_variable.py

Lines changed: 16 additions & 38 deletions
@@ -6,7 +6,6 @@
 from ..models import (
     CognitionEnvironmentVariable,
     CognitionMarkdownDataset,
-    CognitionProject,
     GraphRAGIndex,
 )
 from ..util import prevent_sql_injection
@@ -64,6 +63,22 @@ def get_by_name_and_org_id(
     )


+def get_by_id_and_org_id(
+    org_id: str,
+    id: str,
+) -> CognitionEnvironmentVariable:
+
+    return (
+        session.query(CognitionEnvironmentVariable)
+        .filter(
+            CognitionEnvironmentVariable.organization_id == org_id,
+            CognitionEnvironmentVariable.project_id == None,
+            CognitionEnvironmentVariable.id == id,
+        )
+        .first()
+    )
+
+
 def get_dataset_env_var_value(
     dataset_id: str, org_id: str, scope: Literal["extraction", "transformation"]
 ) -> Union[str, None]:
@@ -122,43 +137,6 @@ def get_all_by_project_id(project_id: str) -> List[CognitionEnvironmentVariable]
     )


-def get_cognition_project_env_var_value(cognition_project_id: str) -> str:
-
-    env_var_id = cast(
-        CognitionProject.llm_config.op("->")("transformation").op("->>")("envVarId"),
-        UUID,
-    )
-    v = (
-        session.query(CognitionEnvironmentVariable.value)
-        .join(CognitionProject, env_var_id == CognitionEnvironmentVariable.id)
-        .filter(
-            CognitionProject.id == cognition_project_id,
-        )
-        .first()
-    )
-    if v and v[0]:
-        return str(v[0])
-
-
-def get_cognition_project_extraction_env_var_value(
-    cognition_project_id: str, envVar: str
-) -> str:
-    env_var_id = cast(
-        CognitionProject.llm_config.op("->")("extraction").op("->>")(envVar),
-        UUID,
-    )
-    v = (
-        session.query(CognitionEnvironmentVariable.value)
-        .join(CognitionProject, env_var_id == CognitionEnvironmentVariable.id)
-        .filter(
-            CognitionProject.id == cognition_project_id,
-        )
-        .first()
-    )
-    if v and v[0]:
-        return str(v[0])
-
-
 def get_cognition_graphrag_env_var_value(org_id: str, graphrag_index_id: str) -> str:
     # currently not in use because of fixed env var

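The new get_by_id_and_org_id resolves an organization-level variable (project_id is NULL) directly by id, replacing the two removed lookups that walked CognitionProject.llm_config. A hedged usage sketch; the import path and the error handling are assumptions, and env_var_id would come from the new ETL/LLM config, which this diff does not show:

from submodules.model.cognition_objects import environment_variable

def resolve_org_env_var_value(org_id: str, env_var_id: str) -> str:
    env_var = environment_variable.get_by_id_and_org_id(org_id, env_var_id)
    if env_var is None:
        # Not found, or scoped to a project rather than the organization.
        raise ValueError(f"env var {env_var_id} not found for org {org_id}")
    return str(env_var.value)
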
Lines changed: 84 additions & 0 deletions
@@ -0,0 +1,84 @@
+from typing import Any, List, Optional, Dict
+from ..business_objects import general
+from ..session import session
+from ..models import (
+    ETLConfigPresets,
+)
+
+
+def get(config_id: str) -> ETLConfigPresets:
+    return (
+        session.query(ETLConfigPresets)
+        .filter(
+            ETLConfigPresets.id == config_id,
+        )
+        .first()
+    )
+
+
+def get_all_in_org(
+    org_id: str,
+) -> List[ETLConfigPresets]:
+
+    return (
+        session.query(ETLConfigPresets)
+        .filter(
+            ETLConfigPresets.organization_id == org_id,
+        )
+        .order_by(ETLConfigPresets.created_at.asc())
+        .all()
+    )
+
+
+def create(
+    org_id: str,
+    user_id: str,
+    name: str,
+    description: str,
+    etl_config: Dict[str, Any],
+    add_config: Dict[str, Any],
+    with_commit: bool = True,
+) -> ETLConfigPresets:
+    etl_config: ETLConfigPresets = ETLConfigPresets(
+        created_by=user_id,
+        organization_id=org_id,
+        name=name,
+        description=description,
+        etl_config=etl_config,
+        add_config=add_config,
+    )
+    general.add(etl_config, with_commit)
+    return etl_config
+
+
+def update(
+    org_id: str,
+    etl_config_id: str,
+    name: Optional[str] = None,
+    description: Optional[str] = None,
+    etl_config: Optional[Dict[str, Any]] = None,
+    add_config: Optional[Dict[str, Any]] = None,
+    with_commit: bool = True,
+) -> ETLConfigPresets:
+    etl_config_item: ETLConfigPresets = get(etl_config_id)
+    if not etl_config_item or str(etl_config_item.organization_id) != org_id:
+        raise Exception("ETL Config not found")
+
+    if name is not None:
+        etl_config_item.name = name
+    if description is not None:
+        etl_config_item.description = description
+    if etl_config is not None:
+        etl_config_item.etl_config = etl_config
+    if add_config is not None:
+        etl_config_item.add_config = add_config
+    general.flush_or_commit(with_commit)
+    return etl_config_item
+
+
+def delete(org_id: str, etl_config_id: str, with_commit: bool = True) -> None:
+    session.query(ETLConfigPresets).filter(
+        ETLConfigPresets.organization_id == org_id,
+        ETLConfigPresets.id == etl_config_id,
+    ).delete()
+    general.flush_or_commit(with_commit)

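This new module (its file name is not shown in this capture) is a thin CRUD layer over the ETLConfigPresets model: create, get, get_all_in_org, update, and delete, all scoped to an organization. A hedged usage sketch; the import path, the IDs, and the config dictionaries are assumptions, since the diff shows neither the module's location nor the expected config schema:

# Import path assumed; the diff header omits the new file's name.
from submodules.model.global_objects import etl_config_presets

preset = etl_config_presets.create(
    org_id="org-uuid",
    user_id="user-uuid",
    name="Default PDF pipeline",
    description="PDF2MD extraction with chunk splitting",
    etl_config={"extractor": "PDF2MD", "splitting": "chunk"},  # illustrative keys
    add_config={},
)

# Rename later; update() verifies the preset belongs to the organization
# and raises if it does not.
etl_config_presets.update(
    org_id="org-uuid",
    etl_config_id=str(preset.id),
    name="PDF pipeline (chunked)",
)
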
cognition_objects/integration.py

Lines changed: 60 additions & 1 deletion
@@ -3,9 +3,11 @@
 from sqlalchemy import func
 from sqlalchemy.orm.attributes import flag_modified

+
 from ..business_objects import general
+from ..integration_objects import manager as integration_records_bo
 from ..session import session
-from ..models import CognitionIntegration, CognitionGroup
+from ..models import CognitionIntegration, CognitionGroup, EtlTask
 from ..enums import (
     CognitionMarkdownFileState,
     CognitionIntegrationType,
@@ -147,6 +149,49 @@ def get_last_synced_at(
     return result[0] if result else None


+def get_active_etl_tasks(
+    integration_id: str,
+) -> List[EtlTask]:
+    IntegrationModel = integration_records_bo.integration_model(integration_id)
+    return (
+        session.query(EtlTask)
+        .filter(EtlTask.is_active == True)
+        .join(
+            IntegrationModel,
+            (EtlTask.id == IntegrationModel.etl_task_id)
+            & (IntegrationModel.integration_id == integration_id),
+        )
+        .all()
+    )
+
+
+def get_all_etl_tasks(
+    integration_id: str,
+) -> List[EtlTask]:
+    IntegrationModel = integration_records_bo.integration_model(integration_id)
+    return (
+        session.query(EtlTask)
+        .join(
+            IntegrationModel,
+            (IntegrationModel.etl_task_id == EtlTask.id)
+            & (IntegrationModel.integration_id == integration_id),
+        )
+        .all()
+    )
+
+
+def get_integration_progress(
+    integration_id: str,
+) -> float:
+    count_all_records = integration_records_bo.count(integration_id)
+    all_tasks = get_all_etl_tasks(integration_id)
+    finished_tasks = [task for task in all_tasks if task.state in FINISHED_STATES]
+
+    if count_all_records == 0:
+        return 0.0
+    return round((len(finished_tasks) / count_all_records) * 100.0, 2)
+
+
 def count_org_integrations(org_id: str) -> Dict[str, int]:
     counts = (
         session.query(CognitionIntegration.type, func.count(CognitionIntegration.id))
@@ -201,6 +246,7 @@ def create(

 def update(
     id: str,
+    project_id: Optional[str] = None,
     updated_by: Optional[str] = None,
     name: Optional[str] = None,
     description: Optional[str] = None,
@@ -220,6 +266,8 @@ def update(
     if not integration:
         return None

+    if project_id is not None and integration.project_id is None:
+        integration.project_id = project_id
     if updated_by is not None:
         integration.updated_by = updated_by
     if name is not None:
@@ -279,6 +327,16 @@ def execution_finished(id: str) -> bool:
 def delete_many(
     ids: List[str], delete_cognition_groups: bool = True, with_commit: bool = True
 ) -> None:
+    for id in ids:
+        integration_records, IntegrationModel = (
+            integration_records_bo.get_all_by_integration_id(id)
+        )
+        integration_records_bo.delete_many(
+            IntegrationModel,
+            ids=[rec.id for rec in integration_records],
+            with_commit=True,
+        )
+
     (
         session.query(CognitionIntegration)
        .filter(CognitionIntegration.id.in_(ids))
@@ -290,6 +348,7 @@ def delete_many(
         .filter(CognitionGroup.meta_data.op("->>")("integration_id").in_(ids))
         .delete(synchronize_session=False)
     )
+
     general.flush_or_commit(with_commit)

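get_integration_progress reports percent done as finished ETL tasks over all integration records, so records that have not yet spawned a task (or whose task is still running) count against the percentage, and an integration with zero records reports 0.0. A standalone sketch of the same arithmetic; the FINISHED_STATES values below are an assumption, since the real constant is defined elsewhere in the module and not shown in this diff:

from typing import List

FINISHED_STATES = {"FINISHED", "FAILED"}  # assumed terminal states

def integration_progress(task_states: List[str], count_all_records: int) -> float:
    finished = [state for state in task_states if state in FINISHED_STATES]
    if count_all_records == 0:
        return 0.0
    return round((len(finished) / count_all_records) * 100.0, 2)

# 3 of 4 records have reached a terminal state -> 75.0
print(integration_progress(["FINISHED", "FINISHED", "FAILED", "IN_PROGRESS"], 4))
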