18 changes: 15 additions & 3 deletions cognition_objects/integration.py
@@ -185,15 +185,27 @@ def get_integration_progress(
) -> float:
integration = get_by_id(integration_id)
count_all_records = integration_records_bo.count(integration)
all_tasks = get_all_etl_tasks(integration_id)
finished_tasks = [task for task in all_tasks if task.state in FINISHED_STATES]

if (
count_all_records == 0
or integration.state == enums.CognitionMarkdownFileState.FAILED.value
):
return 0.0
integration_progress = round((len(finished_tasks) / count_all_records) * 100.0, 2)

all_tasks = get_all_etl_tasks(integration_id)
finished_tasks = [task for task in all_tasks if task.state in FINISHED_STATES]
count_finished_tasks = len(finished_tasks)

# backward compatibility
if not all_tasks or len(all_tasks) != count_all_records:
all_records, _ = integration_records_bo.get_all_by_integration_id(
integration_id
)
count_finished_tasks += len(
[record for record in all_records if not record.etl_task_id]
)

integration_progress = round((count_finished_tasks / count_all_records) * 100.0, 2)
if integration.state not in FINISHED_STATES:
    # keep the reported value just below 100% until the integration itself finishes
    integration_progress = max(integration_progress - 1, 0.0)
return integration_progress
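
As a quick sanity check on the backward-compatibility branch, here is a minimal, self-contained sketch of the progress math; the `Record`/`Task` stand-ins and the `FINISHED_STATES` values are assumptions, not the real ORM models:

```python
# Minimal sketch of get_integration_progress, assuming simplified models.
from dataclasses import dataclass
from typing import List, Optional

FINISHED_STATES = ["FINISHED", "FAILED"]  # assumption: mirrors the module constant


@dataclass
class Task:
    state: str


@dataclass
class Record:
    etl_task_id: Optional[str]  # None for records ingested before ETL tasks existed


def progress(records: List[Record], tasks: List[Task], integration_state: str) -> float:
    if not records or integration_state == "FAILED":
        return 0.0
    finished = sum(1 for t in tasks if t.state in FINISHED_STATES)
    # backward compatibility: records without an ETL task count as finished
    if not tasks or len(tasks) != len(records):
        finished += sum(1 for r in records if not r.etl_task_id)
    pct = round(finished / len(records) * 100.0, 2)
    if integration_state not in FINISHED_STATES:
        pct = max(pct - 1, 0.0)  # never report 100% while still running
    return pct


# 3 legacy records (no task) + 1 finished task over 4 records -> 99.0 while running
print(progress([Record(None)] * 3 + [Record("t1")], [Task("FINISHED")], "RUNNING"))
```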
11 changes: 8 additions & 3 deletions cognition_objects/markdown_dataset.py
@@ -34,7 +34,7 @@ def __get_enriched_query(
category_origin: Optional[str] = None,
query_add: Optional[str] = "",
) -> str:
where_add = " AND (ed.config_ids->>'isDefault')::bool is true"
where_add = ""
if id:
id = prevent_sql_injection(id, isinstance(id, str))
where_add += f" AND md.id = '{id}'"
@@ -46,7 +46,8 @@
md.*,
COALESCE(mf.num_files, 0) AS num_files,
COALESCE(mf.num_reviewed_files, 0) AS num_reviewed_files,
ecp.etl_config
ecp.etl_config,
ecp.id as etl_config_id
FROM cognition.{Tablenames.MARKDOWN_DATASET.value} md
LEFT JOIN (
SELECT dataset_id, COUNT(*) as num_files, COUNT(CASE WHEN is_reviewed = TRUE THEN 1 END) AS num_reviewed_files
@@ -56,7 +57,7 @@
LEFT JOIN(
SELECT md.id, json_array_elements(md.useable_etl_configurations) config_ids
FROM cognition.{Tablenames.MARKDOWN_DATASET.value} md
) ed ON ed.id = md.id
) ed ON ed.id = md.id AND (ed.config_ids->>'isDefault')::bool is true
LEFT JOIN(
SELECT ecp.id, ecp.etl_config
FROM cognition.{Tablenames.ETL_CONFIG_PRESET.value} ecp
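
Worth noting for review: moving the `isDefault` predicate out of the top-level WHERE clause and into the LEFT JOIN's ON condition (the `ed` join above) changes the semantics — a dataset without a default ETL config is now kept with NULL config columns instead of being dropped. A contrived sqlite sketch of the general rule (table and column names are made up):

```python
# LEFT JOIN: a predicate in ON keeps unmatched left rows; in WHERE it drops them.
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript(
    """
    CREATE TABLE dataset (id INT);
    CREATE TABLE config (dataset_id INT, is_default BOOL);
    INSERT INTO dataset VALUES (1), (2);            -- dataset 2 has no default config
    INSERT INTO config VALUES (1, TRUE), (2, FALSE);
    """
)
in_on = con.execute(
    "SELECT d.id, c.is_default FROM dataset d "
    "LEFT JOIN config c ON c.dataset_id = d.id AND c.is_default"
).fetchall()
in_where = con.execute(
    "SELECT d.id, c.is_default FROM dataset d "
    "LEFT JOIN config c ON c.dataset_id = d.id WHERE c.is_default"
).fetchall()
print(in_on)     # [(1, 1), (2, None)] -> dataset 2 survives with NULLs
print(in_where)  # [(1, 1)]            -> dataset 2 is dropped entirely
```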
@@ -177,6 +178,7 @@ def update(
dataset_id: str,
name: Optional[str] = None,
description: Optional[str] = None,
useable_etl_configurations: Optional[List[Dict[str, Any]]] = None,
with_commit: bool = True,
) -> CognitionMarkdownDataset:
dataset = get(org_id, dataset_id)
@@ -187,6 +189,9 @@
if description:
dataset.description = description

if useable_etl_configurations:
dataset.useable_etl_configurations = useable_etl_configurations

general.flush_or_commit(with_commit)

return dataset
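
A hedged usage sketch for the new parameter; the entry shape is an assumption inferred from the enriched query's `config_ids->>'id'` / `config_ids->>'isDefault'` reads. Note that the falsy-checks above mean an empty list cannot be used to clear the column:

```python
# Hypothetical call; the config-entry shape is inferred, not confirmed.
dataset = update(
    org_id="org-123",     # placeholder
    dataset_id="ds-456",  # placeholder
    useable_etl_configurations=[
        {"id": "cfg-a", "isDefault": True},
        {"id": "cfg-b", "isDefault": False},
    ],
)
```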
16 changes: 11 additions & 5 deletions cognition_objects/markdown_file.py
@@ -93,14 +93,20 @@ def __get_enriched_query(
"etl_task",
"global",
prefix=et_prefix,
include_columns=["is_active", "error_message"],
include_columns=[
"started_at",
"finished_at",
"is_active",
"is_stale",
"llm_ops",
"error_message",
],
)

query = f"""SELECT
{mf_select}, {et_select}, LENGTH({mf_prefix}.content) as content_length,
COALESCE({et_prefix}.state, {mf_prefix}.state) state,
COALESCE({et_prefix}.started_at, {mf_prefix}.started_at) started_at,
COALESCE({et_prefix}.finished_at, {mf_prefix}.finished_at) finished_at
{mf_select}, {et_select}, LENGTH({mf_prefix}.content) AS content_length,
COALESCE({et_prefix}.state, {mf_prefix}.state) AS state,
{et_prefix}.meta_data->>'scope_readable' AS scope_readable
FROM cognition.markdown_file {mf_prefix}
LEFT JOIN global.etl_task {et_prefix} ON {mf_prefix}.etl_task_id = {et_prefix}.id
"""
3 changes: 0 additions & 3 deletions etl_utils.py
@@ -309,9 +309,6 @@ def rm_tree(path: Path):
rm_tree(etl_cache_dir)


# TODO: delete_etl_tasks for related file_reference_id


def get_download_key(org_id: str, download_id: str) -> Path:
return Path(org_id) / download_id / "download"

157 changes: 129 additions & 28 deletions global_objects/etl_task.py
@@ -1,20 +1,22 @@
from typing import Any, List, Optional, Dict, Union
from typing import Any, List, Optional, Dict, Tuple, Union
from sqlalchemy.sql.expression import cast
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.dialects.postgresql import UUID

import datetime
import mimetypes

from submodules.model import enums
from submodules.model import enums, etl_utils
from submodules.model.session import session
from submodules.model.business_objects import general
from submodules.model.cognition_objects import file_reference as file_reference_co_bo
from submodules.model.models import (
EtlTask,
CognitionIntegration,
IntegrationSharepoint,
)
from submodules.model.util import prevent_sql_injection
from submodules.model.etl_utils import get_hashed_string


FINISHED_STATES = [
@@ -31,6 +33,33 @@ def get_by_id(id: str) -> EtlTask:
return session.query(EtlTask).filter(EtlTask.id == id).first()


def is_stale(
etl_task_id: Optional[str] = None,
etl_task: Optional[Dict[str, Any]] = None,
) -> bool:
if not etl_task:
if not etl_task_id:
raise ValueError("Either etl_task_id or etl_task must be provided")
etl_task = get_enriched(etl_task_id)
if not etl_task:
return False

file_reference = file_reference_co_bo.get_by_id(
etl_task["organization_id"], etl_task["file_reference_id"]
)
if not file_reference:
return True

new_full_config, tokenizer = etl_utils.get_full_config_and_tokenizer_from_config_id(
file_reference=file_reference,
etl_config_id=etl_task["etl_config_id"],
markdown_file_id=etl_task["markdown_file_id"],
)
# stale if either the tokenizer or the resolved full config changed
return (
    etl_task["tokenizer"] != tokenizer
    or etl_task["full_config_hash"] != get_hashed_string(new_full_config)
)
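
A hedged caller sketch for the new helper; passing the already-enriched dict avoids the second lookup inside `is_stale` (the id is a placeholder):

```python
# Hypothetical caller; keys follow get_all_enriched's SELECT list.
task = get_enriched("11111111-1111-1111-1111-111111111111")  # placeholder id
if task and is_stale(etl_task=task):
    print(f"task {task['id']} is stale and should be re-run")
```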


def get_all(
exclude_failed: Optional[bool] = False,
only_active: Optional[bool] = False,
@@ -45,6 +74,61 @@ def get_all(
return query.order_by(EtlTask.created_at.desc()).all()


def get_enriched(etl_task_id: str) -> Dict[str, Any]:
etl_tasks = get_all_enriched(etl_task_id=etl_task_id)
return etl_tasks[0] if etl_tasks else {}


def get_all_enriched(
exclude_failed: Optional[bool] = False,
only_active: Optional[bool] = False,
only_markdown_files: Optional[bool] = False,
where_add: Optional[str] = "",
etl_task_id: Optional[str] = None,
dataset_id: Optional[str] = None,
) -> List[Dict[str, Any]]:
mf_join = ""
if etl_task_id:
where_add += " AND et.id::TEXT = '{}'".format(
prevent_sql_injection(etl_task_id, True)
)
if dataset_id:
where_add += " AND md.id::TEXT = '{}'".format(
prevent_sql_injection(dataset_id, True)
)
if exclude_failed:
where_add += " AND et.state != '{}'".format(
enums.CognitionMarkdownFileState.FAILED.value
)
if only_active:
where_add += " AND et.is_active IS TRUE"
if only_markdown_files:
mf_join = ""
else:
mf_join = "LEFT"

query = f"""
SELECT
et.*,
mf.id::TEXT AS markdown_file_id,
md.id::TEXT AS dataset_id,
md.config_ids->>'id' AS etl_config_id,
et.meta_data->>'file_reference_id' AS file_reference_id
FROM global.{enums.Tablenames.ETL_TASK.value} et
{mf_join} JOIN (
SELECT id, dataset_id
FROM cognition.{enums.Tablenames.MARKDOWN_FILE.value}
) mf ON et.meta_data->>'markdown_file_id' = mf.id::TEXT
LEFT JOIN(
SELECT id, json_array_elements(useable_etl_configurations) config_ids
FROM cognition.{enums.Tablenames.MARKDOWN_DATASET.value}
) md ON md.id = mf.dataset_id AND (md.config_ids->>'isDefault')::BOOL IS true
WHERE 1=1 {where_add}
ORDER BY et.created_at DESC
"""
return list(map(lambda x: x._asdict(), general.execute_all(query)))
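
Filtered usage, as a sketch; the dataset id is a placeholder, and the flag semantics follow the WHERE construction above:

```python
# Hypothetical: all non-failed, still-active tasks for one dataset.
rows = get_all_enriched(
    exclude_failed=True,
    only_active=True,
    dataset_id="22222222-2222-2222-2222-222222222222",  # placeholder
)
for row in rows:
    print(row["id"], row["state"], row["etl_config_id"])
```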


def get_all_in_org(
org_id: str,
exclude_failed: Optional[bool] = False,
@@ -127,17 +211,14 @@ def get_or_create(
tokenizer: Optional[str] = None,
full_config: Optional[Dict[str, Any]] = None,
file_path: Optional[str] = None,
meta_data: Optional[Dict[str, Any]] = None,
meta_data: Optional[Dict[str, Any]] = {},
priority: Optional[int] = -1,
id: Optional[str] = None,
with_commit: bool = True,
):
) -> Tuple[EtlTask, bool]:
meta_data = dict(meta_data or {})  # copy so the shared default dict is never mutated
if id:
    return get_by_id(id), True

file_reference_id = meta_data.get("file_reference_id") if meta_data else None
integration_id = meta_data.get("integration_id") if meta_data else None
markdown_file_id = meta_data.get("markdown_file_id") if meta_data else None
query: EtlTask = session.query(EtlTask).filter(
EtlTask.organization_id == org_id,
EtlTask.original_file_name == original_file_name,
@@ -146,40 +227,40 @@

if file_path:
query = query.filter(EtlTask.file_path == file_path)
if file_reference_id:

if file_reference_id := meta_data.get("file_reference_id"):
query = query.filter(
file_reference_id
== cast(EtlTask.meta_data.op("->>")("file_reference_id"), UUID)
)
if markdown_file_id:
if markdown_file_id := meta_data.get("markdown_file_id"):
query = query.filter(
markdown_file_id
== cast(EtlTask.meta_data.op("->>")("markdown_file_id"), UUID)
)
if integration_id:
if integration_id := meta_data.get("integration_id"):
query = query.filter(
integration_id == cast(EtlTask.meta_data.op("->>")("integration_id"), UUID)
)

# TODO: enhance
if with_commit is False:
    return query.first(), True

if etl_task := query.first():
return etl_task
return etl_task, True

return create(
org_id=org_id,
user_id=user_id,
original_file_name=original_file_name,
file_size_bytes=file_size_bytes,
tokenizer=tokenizer,
full_config=full_config,
meta_data=meta_data,
priority=priority,
file_path=file_path,
id=id,
with_commit=with_commit,
return (
create(
org_id=org_id,
user_id=user_id,
original_file_name=original_file_name,
file_size_bytes=file_size_bytes,
tokenizer=tokenizer,
full_config=full_config,
meta_data=meta_data,
priority=priority,
file_path=file_path,
id=id,
with_commit=with_commit,
),
False,
)
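
Since the return type changed from a bare `EtlTask` to `Tuple[EtlTask, bool]` (True when an existing task was found), existing callers need to unpack. A sketch with placeholder arguments:

```python
# Hypothetical caller of the new (task, found_existing) contract.
task, found = get_or_create(
    org_id="org-123",    # placeholder
    user_id="user-456",  # placeholder
    original_file_name="report.pdf",
    file_size_bytes=1024,
    meta_data={"integration_id": "33333333-3333-3333-3333-333333333333"},
)
if not found:
    print(f"created new ETL task {task.id}")
```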


Expand All @@ -205,8 +286,15 @@ def create(
file_size_bytes=file_size_bytes,
tokenizer=tokenizer,
full_config=full_config,
full_config_hash=get_hashed_string(full_config),
meta_data=meta_data,
priority=priority,
llm_ops={
"total_llm_calls": 0,
"total_tokens_input": 0,
"total_tokens_output": 0,
"total_cost_eur": 0.0,
},
)
general.add(etl_task, with_commit)
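
One caveat on the `llm_ops` counters seeded above: the non-overwrite branch in `update()` below uses `dict.update`, which replaces values per key rather than summing them, so callers have to pass running totals, not per-call deltas. A short demonstration:

```python
# dict.update replaces per key; it does not accumulate.
ops = {"total_llm_calls": 0, "total_tokens_input": 0,
       "total_tokens_output": 0, "total_cost_eur": 0.0}
ops.update({"total_llm_calls": 3, "total_cost_eur": 0.12})
print(ops["total_llm_calls"], ops["total_cost_eur"])  # 3 0.12
```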

Expand All @@ -221,13 +309,16 @@ def update(
file_path: Optional[str] = None,
file_size_bytes: Optional[int] = None,
full_config: Optional[Dict] = None,
tokenizer: Optional[str] = None,
started_at: Optional[datetime.datetime] = None,
finished_at: Optional[Union[str, datetime.datetime]] = None,
state: Optional[enums.CognitionMarkdownFileState] = None,
is_active: Optional[bool] = None,
meta_data: Optional[Dict[str, Any]] = None,
priority: Optional[int] = None,
error_message: Optional[str] = None,
is_stale: Optional[bool] = None,
llm_ops: Optional[Dict[str, Any]] = None,
overwrite_meta_data: bool = False,
with_commit: bool = True,
) -> Optional[EtlTask]:
@@ -244,10 +335,13 @@
etl_task.file_path = file_path
if file_size_bytes is not None and etl_task.file_size_bytes is None:
etl_task.file_size_bytes = file_size_bytes
if tokenizer is not None:
etl_task.tokenizer = tokenizer
if original_file_name is not None and etl_task.original_file_name is None:
etl_task.original_file_name = original_file_name
if full_config is not None:
etl_task.full_config = full_config
etl_task.full_config_hash = get_hashed_string(full_config)
flag_modified(etl_task, "full_config")
if started_at is not None:
etl_task.started_at = started_at
@@ -260,6 +354,14 @@
etl_task.state = state.value
if is_active is not None:
etl_task.is_active = is_active
if is_stale is not None:
etl_task.is_stale = is_stale
if llm_ops is not None:
    if overwrite_meta_data or etl_task.llm_ops is None:
        etl_task.llm_ops = llm_ops
    else:
        etl_task.llm_ops.update(llm_ops)
    flag_modified(etl_task, "llm_ops")
if meta_data is not None:
if overwrite_meta_data:
etl_task.meta_data = meta_data
Expand Down Expand Up @@ -292,7 +394,6 @@ def execution_finished(id: str) -> bool:


def delete_many(ids: List[str], with_commit: bool = True) -> None:
# TODO: cascade delete cached files
(
session.query(EtlTask)
.filter(EtlTask.id.in_(ids))