Skip to content

Commit 67d2713

Browse files
v1.22.0 (#205)
* Links in the notification center navigate to a non existing docs * fix: integration progress calculation * fix: llm_config for extraction configs * perf: always perform md chunking regardless of transform config * fix: revert splitting on NO_TRANSFORMATION * perf: get_or_create etl task * fix: azure di config presets * fix: update supported file extensions --------- Co-authored-by: andhreljaKern <andrea.hrelja@kern.ai>
1 parent df3a8d5 commit 67d2713

File tree

5 files changed

+123
-122
lines changed

5 files changed

+123
-122
lines changed

cognition_objects/integration.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,13 +183,20 @@ def get_all_etl_tasks(
183183
def get_integration_progress(
184184
integration_id: str,
185185
) -> float:
186-
count_all_records = integration_records_bo.count(integration_id)
186+
integration = get_by_id(integration_id)
187+
count_all_records = integration_records_bo.count(integration)
187188
all_tasks = get_all_etl_tasks(integration_id)
188189
finished_tasks = [task for task in all_tasks if task.state in FINISHED_STATES]
189190

190-
if count_all_records == 0:
191+
if (
192+
count_all_records == 0
193+
or integration.state == enums.CognitionMarkdownFileState.FAILED.value
194+
):
191195
return 0.0
192-
return round((len(finished_tasks) / count_all_records) * 100.0, 2)
196+
integration_progress = round((len(finished_tasks) / count_all_records) * 100.0, 2)
197+
if integration.state not in FINISHED_STATES:
198+
integration_progress = min(integration_progress - 1, 0)
199+
return integration_progress
193200

194201

195202
def count_org_integrations(org_id: str) -> Dict[str, int]:

enums.py

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -400,17 +400,6 @@ class Pages(Enum):
400400
SETTINGS = "settings"
401401

402402

403-
class DOCS(Enum):
404-
UPLOADING_DATA = "https://docs.kern.ai/refinery/project-creation-and-data-upload"
405-
KNOWLEDGE_BASE = "https://docs.kern.ai/refinery/heuristics#labeling-functions"
406-
WORKFLOW = "https://docs.kern.ai/refinery/manual-labeling#labeling-workflow"
407-
CREATING_PROJECTS = "https://docs.kern.ai/refinery/project-creation-and-data-upload#project-creation-workflow"
408-
WEAK_SUPERVISION = "https://docs.kern.ai/refinery/weak-supervision"
409-
CREATE_EMBEDDINGS = "https://docs.kern.ai/refinery/embedding-integration"
410-
INFORMATION_SOURCES = "https://docs.kern.ai/refinery/heuristics#labeling-functions"
411-
DATA_BROWSER = "https://docs.kern.ai/refinery/data-management"
412-
413-
414403
class SliceTypes(Enum):
415404
STATIC_DEFAULT = "STATIC_DEFAULT"
416405
STATIC_OUTLIER = "STATIC_OUTLIER"
@@ -1091,7 +1080,7 @@ def get_supported_file_extensions(self) -> List[str]:
10911080
".webp",
10921081
".avif",
10931082
]
1094-
return ["txt"]
1083+
return [".txt"]
10951084

10961085
@staticmethod
10971086
def from_extension(value: str):

etl_utils.py

Lines changed: 35 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
FileReference,
1212
CognitionIntegration,
1313
IntegrationSharepoint,
14-
CognitionMarkdownDataset,
15-
CognitionMarkdownFile,
1614
)
1715

1816
ETL_DIR = Path(os.getenv("ETL_DIR", "/app/data/etl"))
@@ -24,14 +22,20 @@ def get_full_config_and_tokenizer_from_config_id(
2422
etl_config_id: Optional[str] = None, # or in file_reference.meta_data
2523
content_type: Optional[str] = None, # or in file_reference.content_type
2624
chunk_size: Optional[int] = 1000,
25+
# only set for markdown datasets
26+
markdown_file_id: Optional[str] = None, # or in file_reference.meta_data
2727
# only set for chat messages
28-
project_id: Optional[str] = None,
29-
conversation_id: Optional[str] = None,
28+
project_id: Optional[str] = None, # or in file_reference.meta_data
29+
conversation_id: Optional[str] = None, # or in file_reference.meta_data
3030
) -> Tuple[Dict[str, Any], str]:
31+
for_dataset = False
3132
for_project = False
3233
if project_id and conversation_id:
3334
# project related load
3435
for_project = True
36+
elif markdown_file_id:
37+
# dataset related load
38+
for_dataset = True
3539

3640
etl_preset_item = etl_config_presets_db_co.get(
3741
etl_config_id or file_reference.meta_data.get("etl_config_id")
@@ -46,6 +50,11 @@ def get_full_config_and_tokenizer_from_config_id(
4650
"llmIdentifier": llm_indicator_extract,
4751
"overwriteVisionPrompt": extraction_config.get("overwriteVisionPrompt"),
4852
}
53+
elif extraction_config.get("azureDiApiBase"):
54+
llm_config = {
55+
"azureDiApiBase": extraction_config["azureDiApiBase"],
56+
"azureDiEnvVarId": extraction_config["azureDiEnvVarId"],
57+
}
4958
full_config = [
5059
{
5160
"task_type": enums.CognitionMarkdownFileState.EXTRACTING.value,
@@ -56,7 +65,7 @@ def get_full_config_and_tokenizer_from_config_id(
5665
"minio_path": file_reference.minio_path,
5766
"fallback": None, # later filled by config of project
5867
},
59-
**llm_config,
68+
"llm_config": llm_config,
6069
},
6170
]
6271

@@ -68,8 +77,9 @@ def get_full_config_and_tokenizer_from_config_id(
6877
"llmIdentifier": transformation_config.get("llmIdentifier"),
6978
}
7079

80+
# splitting strategy "CHUNK" needs llm_config to execute `split_large_sections_via_llm`
7181
splitting_config = {
72-
"llm_config": transformation_llm_config, # splitting strategy "CHUNK" needs llm_config to execute `split_large_sections_via_llm`
82+
"llm_config": transformation_llm_config,
7383
"task_type": enums.CognitionMarkdownFileState.SPLITTING.value,
7484
"task_config": {
7585
"use_cache": True,
@@ -79,8 +89,6 @@ def get_full_config_and_tokenizer_from_config_id(
7989
}
8090

8191
if transformation_type == "COMMON_ETL":
82-
# add default splitting for common etl
83-
8492
full_config.append(splitting_config)
8593
transformers = [
8694
{ # NOTE: __call_gpt_with_key only reads user_prompt
@@ -119,6 +127,7 @@ def get_full_config_and_tokenizer_from_config_id(
119127
},
120128
}
121129
)
130+
122131
if for_project:
123132
full_config.append(
124133
{
@@ -139,109 +148,24 @@ def get_full_config_and_tokenizer_from_config_id(
139148
},
140149
},
141150
)
142-
else:
151+
elif for_dataset:
143152
full_config.append(
144153
{
145154
"task_type": enums.CognitionMarkdownFileState.LOADING.value,
146155
"task_config": {
147156
"markdown_file": {
148157
"enabled": True,
149-
"id": file_reference.meta_data["markdown_file_id"],
158+
"id": (
159+
markdown_file_id
160+
or file_reference.meta_data["markdown_file_id"]
161+
),
150162
}
151163
},
152164
},
153165
)
154166
return full_config, etl_preset_item.etl_config.get("tokenizer")
155167

156168

157-
def get_full_config_for_markdown_file(
158-
file_reference: FileReference,
159-
markdown_dataset: CognitionMarkdownDataset,
160-
markdown_file: CognitionMarkdownFile,
161-
chunk_size: Optional[int] = 1000,
162-
) -> List[Dict[str, Any]]:
163-
extraction_llm_config, transformation_llm_config = __get_llm_config_from_dataset(
164-
markdown_dataset
165-
)
166-
extractor = markdown_file.meta_data.get("extractor")
167-
if extractor is None:
168-
print(
169-
f"WARNING: {__name__} - no extractor found in markdown_file meta_data for {file_reference.original_file_name}, will infer default"
170-
)
171-
172-
full_config = [
173-
{
174-
"llm_config": extraction_llm_config,
175-
"task_type": enums.CognitionMarkdownFileState.EXTRACTING.value,
176-
"task_config": {
177-
"use_cache": True,
178-
"extractor": extractor,
179-
"minio_path": file_reference.minio_path,
180-
"fallback": None, # later filled by config of project
181-
},
182-
},
183-
{
184-
"llm_config": extraction_llm_config,
185-
"task_type": enums.CognitionMarkdownFileState.SPLITTING.value,
186-
"task_config": {
187-
"use_cache": True,
188-
"strategy": enums.ETLSplitStrategy.CHUNK.value,
189-
"chunk_size": chunk_size,
190-
},
191-
},
192-
{
193-
"llm_config": transformation_llm_config,
194-
"task_type": enums.CognitionMarkdownFileState.TRANSFORMING.value,
195-
"task_config": {
196-
"use_cache": True,
197-
"transformers": [
198-
{ # NOTE: __call_gpt_with_key only reads user_prompt
199-
"enabled": False,
200-
"name": enums.ETLTransformer.CLEANSE.value,
201-
"system_prompt": None,
202-
"user_prompt": None,
203-
},
204-
{
205-
"enabled": True,
206-
"name": enums.ETLTransformer.TEXT_TO_TABLE.value,
207-
"system_prompt": None,
208-
"user_prompt": None,
209-
},
210-
{
211-
"enabled": False,
212-
"name": enums.ETLTransformer.SUMMARIZE.value,
213-
"system_prompt": None,
214-
"user_prompt": None,
215-
},
216-
],
217-
},
218-
},
219-
{
220-
"task_type": enums.CognitionMarkdownFileState.LOADING.value,
221-
"task_config": {
222-
"markdown_file": {
223-
"enabled": True,
224-
"id": str(markdown_file.id),
225-
},
226-
},
227-
},
228-
]
229-
return full_config
230-
231-
232-
def __get_llm_config_from_dataset(
233-
markdown_dataset: CognitionMarkdownDataset,
234-
) -> Tuple[Dict[str, Any], str]:
235-
extraction_llm_config = markdown_dataset.llm_config.get("extraction", {})
236-
transformation_llm_config = markdown_dataset.llm_config.get("transformation", {})
237-
if not extraction_llm_config or not transformation_llm_config:
238-
raise ValueError(
239-
f"Dataset with id {markdown_dataset.id} has incomplete llm_config"
240-
)
241-
242-
return extraction_llm_config, transformation_llm_config
243-
244-
245169
def get_full_config_for_integration(
246170
integration: CognitionIntegration,
247171
record: IntegrationSharepoint,
@@ -385,6 +309,9 @@ def rm_tree(path: Path):
385309
rm_tree(etl_cache_dir)
386310

387311

312+
# TODO: delete_etl_tasks for related file_reference_id
313+
314+
388315
def get_download_key(org_id: str, download_id: str) -> Path:
389316
return Path(org_id) / download_id / "download"
390317

@@ -490,10 +417,16 @@ def get_transformation_key(
490417
return transformation_key
491418

492419

493-
def get_hashed_string(*args, delimiter: str = "_") -> str:
494-
hash_string = delimiter.join(map(str, args))
495-
hasher = hashlib.new("sha256")
496-
hasher.update(hash_string.encode())
420+
def get_hashed_string(*args, delimiter: str = "_", from_bytes: bool = False) -> str:
421+
if not from_bytes:
422+
_hash = delimiter.join(map(str, args)).encode()
423+
else:
424+
try:
425+
_hash = next(map(bytes, args))
426+
except StopIteration:
427+
raise ValueError("ERROR: A 'bytes' argument is required to hash")
428+
429+
hasher = hashlib.sha256(_hash)
497430
return hasher.hexdigest()
498431

499432

global_objects/etl_task.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import Any, List, Optional, Dict, Union
2+
from sqlalchemy.sql.expression import cast
23
from sqlalchemy.orm.attributes import flag_modified
4+
from sqlalchemy.dialects.postgresql import UUID
35

46
import datetime
57
import mimetypes
@@ -117,6 +119,70 @@ def get_supported_file_extensions() -> Dict[str, List[str]]:
117119
return file_extensions
118120

119121

122+
def get_or_create(
123+
org_id: str,
124+
user_id: str,
125+
original_file_name: str,
126+
file_size_bytes: int,
127+
tokenizer: Optional[str] = None,
128+
full_config: Optional[Dict[str, Any]] = None,
129+
file_path: Optional[str] = None,
130+
meta_data: Optional[Dict[str, Any]] = None,
131+
priority: Optional[int] = -1,
132+
id: Optional[str] = None,
133+
with_commit: bool = True,
134+
):
135+
if id:
136+
return get_by_id(id)
137+
138+
file_reference_id = meta_data.get("file_reference_id") if meta_data else None
139+
integration_id = meta_data.get("integration_id") if meta_data else None
140+
markdown_file_id = meta_data.get("markdown_file_id") if meta_data else None
141+
query: EtlTask = session.query(EtlTask).filter(
142+
EtlTask.organization_id == org_id,
143+
EtlTask.original_file_name == original_file_name,
144+
EtlTask.file_size_bytes == file_size_bytes,
145+
)
146+
147+
if file_path:
148+
query = query.filter(EtlTask.file_path == file_path)
149+
if file_reference_id:
150+
query = query.filter(
151+
file_reference_id
152+
== cast(EtlTask.meta_data.op("->>")("file_reference_id"), UUID)
153+
)
154+
if markdown_file_id:
155+
query = query.filter(
156+
markdown_file_id
157+
== cast(EtlTask.meta_data.op("->>")("markdown_file_id"), UUID)
158+
)
159+
if integration_id:
160+
query = query.filter(
161+
integration_id == cast(EtlTask.meta_data.op("->>")("integration_id"), UUID)
162+
)
163+
164+
# TODO: enhance
165+
if with_commit is False:
166+
return query.first()
167+
168+
if etl_task := query.first():
169+
return etl_task
170+
171+
return create(
172+
org_id=org_id,
173+
user_id=user_id,
174+
original_file_name=original_file_name,
175+
file_size_bytes=file_size_bytes,
176+
tokenizer=tokenizer,
177+
full_config=full_config,
178+
meta_data=meta_data,
179+
priority=priority,
180+
file_path=file_path,
181+
id=id,
182+
with_commit=with_commit,
183+
)
184+
185+
120186
def create(
121187
org_id: str,
122188
user_id: str,

integration_objects/manager.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
IntegrationPdf,
1515
IntegrationGithubIssue,
1616
IntegrationGithubFile,
17+
CognitionIntegration,
1718
)
1819

1920

@@ -31,12 +32,12 @@ def get(
3132
return query.order_by(IntegrationModel.created_at.desc()).all()
3233

3334

34-
def count(integration_id: str) -> Union[List[object], object]:
35-
IntegrationModel = integration_model(integration_id)
35+
def count(integration: CognitionIntegration) -> int:
36+
IntegrationModel = integration_model(integration=integration)
3637
return (
3738
session.query(IntegrationModel)
3839
.filter(
39-
IntegrationModel.integration_id == integration_id,
40+
IntegrationModel.integration_id == integration.id,
4041
)
4142
.count()
4243
)
@@ -105,8 +106,13 @@ def get_all_by_integration_id(
105106
)
106107

107108

108-
def integration_model(integration_id: str) -> Type:
109-
integration = integration_db_bo.get_by_id(integration_id)
109+
def integration_model(
110+
integration_id: Optional[str] = None,
111+
integration: Optional[CognitionIntegration] = None,
112+
) -> Type:
113+
if not integration_id and not integration:
114+
raise ValueError("Either integration_id or integration must be provided")
115+
integration = integration or integration_db_bo.get_by_id(integration_id)
110116
if integration.type == CognitionIntegrationType.SHAREPOINT.value:
111117
return IntegrationSharepoint
112118
elif integration.type == CognitionIntegrationType.PDF.value:

0 commit comments

Comments
 (0)