diff --git a/dsms/knowledge/data_format.py b/dsms/knowledge/data_format.py
new file mode 100644
index 0000000..6f1960a
--- /dev/null
+++ b/dsms/knowledge/data_format.py
@@ -0,0 +1,11 @@
+"""Data Formats"""
+
+from enum import Enum
+
+
+class DataFormat(Enum):
+    """Data formats"""
+
+    JSON = "json"
+    YAML = "yaml"
+    HDF5 = "hdf5"
diff --git a/dsms/knowledge/kitem.py b/dsms/knowledge/kitem.py
index abf4193..50567ff 100644
--- a/dsms/knowledge/kitem.py
+++ b/dsms/knowledge/kitem.py
@@ -1,5 +1,6 @@
 """Knowledge Item implementation of the DSMS"""
 
+import json
 import logging
 import warnings
 from datetime import datetime
@@ -9,6 +10,7 @@ from uuid import UUID, uuid4
 
 import pandas as pd
 
+import yaml
 from rdflib import Graph
 
 from pydantic import (  # isort:skip
@@ -48,6 +50,8 @@
     UserGroupsProperty,
 )
 
+from dsms.knowledge.data_format import DataFormat  # isort:skip
+
 from dsms.knowledge.ktype import KType  # isort:skip
 
 from dsms.knowledge.utils import (  # isort:skip
@@ -655,3 +659,42 @@ def is_a(self, to_be_compared: KType) -> bool:
     def refresh(self) -> None:
         """Refresh the KItem"""
         _refresh_kitem(self)
+
+    def export(self, data_format: DataFormat) -> Any:
+        """Export kitems to different formats"""
+
+        if data_format == DataFormat.HDF5:
+            from dsms.knowledge.knowledge_wrapper import (  # isort:skip
+                data_to_dict,
+                dict_to_hdf5,
+            )
+
+            return dict_to_hdf5(data_to_dict(self))
+
+        if data_format == DataFormat.JSON:
+            from dsms.knowledge.knowledge_wrapper import data_to_dict
+
+            return json.dumps(data_to_dict(self))
+
+        if data_format == DataFormat.YAML:
+            from dsms.knowledge.knowledge_wrapper import data_to_dict
+
+            return yaml.dump(data_to_dict(self), default_flow_style=False)
+
+        raise ValueError(f"Unsupported data format: {data_format}")
+
+    @staticmethod
+    def import_kitem(data, data_format: DataFormat) -> Any:
+        """Import objects in different formats to KItem"""
+
+        if data_format == DataFormat.HDF5:
+            from dsms.knowledge.knowledge_wrapper import hdf5_to_dict
+
+            return hdf5_to_dict(data)
+
+        if data_format == DataFormat.JSON:
+            return json.load(data)
+        if data_format == DataFormat.YAML:
+            return yaml.safe_load(data)
+
+        raise ValueError(f"Unsupported data format: {data_format}")
diff --git a/dsms/knowledge/knowledge_wrapper.py b/dsms/knowledge/knowledge_wrapper.py
new file mode 100644
index 0000000..f782ce8
--- /dev/null
+++ b/dsms/knowledge/knowledge_wrapper.py
@@ -0,0 +1,189 @@
+"""Wrapper for data conversion to and from different data formats"""
+
+import base64
+import io
+from typing import Any
+
+import h5py
+import numpy as np
+from pydantic import BaseModel
+
+
+def data_to_dict(data) -> Any:
+    """Convert data to python dictionary"""
+
+    data_dict = {}
+
+    def handle_value(key, value):
+        """Handles the values under different scenarios"""
+
+        result = None  # Default value for result
+
+        # Handle special cases based on 'key' and 'value'
+        if not isinstance(
+            value, (int, float, str, bytes, bool, type(None))
+        ) and hasattr(value, "__dict__"):
+            result = data_to_dict(value)
+
+        elif key == "id":
+            result = str(value)
+
+        elif key == "summary":
+            summary = getattr(data, "summary", None)
+            result = summary.text if summary else None
+
+        elif key == "dataframe":
+            dataframe = getattr(data, "dataframe", None)
+            if dataframe:
+                result = dataframe.to_df().to_json()
+
+        elif key == "file":
+            avatar = getattr(data, "avatar", None)
+            if avatar:
+                image = avatar.download()
+                image_bytes = io.BytesIO()
+                image.save(image_bytes, format="PNG")
+                image_bytes.seek(0)
+                result = base64.b64encode(image_bytes.getvalue()).decode(
+                    "utf-8"
+                )
+
+        elif key == "subgraph" and value is not None:
+            result = value.serialize()
+
+        elif key == "content":
+            content = data.download().encode("utf-8")
+            bytes_io = io.BytesIO(content) if content else None
+            result = base64.b64encode(bytes_io.getvalue()).decode("utf-8")
+
+        # Process the value for other cases (lists, dicts, models, etc.)
+        if result is None:
+            if isinstance(value, (int, float, str, bytes, bool, type(None))):
+                result = str(value)
+            elif isinstance(value, list):
+                result = [handle_value(key, v) for v in value]
+            elif isinstance(value, dict):
+                result = {k: handle_value(k, v) for k, v in value.items()}
+            elif isinstance(value, BaseModel):
+                result = {
+                    k: handle_value(k, v)
+                    for k, v in value.model_dump().items()
+                }
+            elif isinstance(value, io.BytesIO):
+                result = base64.b64encode(value.getvalue()).decode("utf-8")
+
+        return result
+
+    for k, v in data.model_dump().items():
+        if k == "attachments":
+            for attachment in getattr(data, "attachments"):
+                data_dict.setdefault("attachments", []).append(
+                    handle_value(k, attachment)
+                )
+            continue
+        if k == "linked_kitems":
+            for linked_kitem in getattr(data, "linked_kitems"):
+                item = {}
+                for key in ["id", "name", "slug", "ktype_id"]:
+                    value = getattr(linked_kitem, key)
+                    item[key] = str(value)
+                data_dict.setdefault("linked_kitems", []).append(item)
+            continue
+        data_dict[k] = handle_value(k, v)
+
+    return data_dict
+
+
+def dict_to_hdf5(dict_data):
+    """Converts data from a dictionary to HDF5"""
+    byte_data = io.BytesIO()
+
+    # Create an HDF5 file in memory
+    with h5py.File(byte_data, "w") as f:
+        # Recursively add dictionary contents
+        def add_to_hdf5(data, group):
+            for key, value in data.items():
+                if isinstance(value, dict):
+                    # Handle nested dictionaries recursively
+                    subgroup = group.create_group(key)
+                    add_to_hdf5(value, subgroup)
+                elif isinstance(value, list):
+                    # Handle lists, check if the list contains dictionaries
+                    subgroup = group.create_group(key)
+                    for idx, item in enumerate(value):
+                        if isinstance(item, dict):
+                            item_group = subgroup.create_group(f"item_{idx}")
+                            add_to_hdf5(item, item_group)
+                        else:
+                            subgroup.create_dataset(f"item_{idx}", data=item)
+                elif value is not None:
+                    group.create_dataset(key, data=value)
+                else:
+                    group.create_dataset(key, data="")
+
+        # Add data to the root group
+        add_to_hdf5(dict_data, f)
+
+    # Get the bytes data from the memory buffer
+    byte_data.seek(0)
+    return byte_data.read()
+
+
+def hdf5_to_dict(hdf5_file: io.BytesIO) -> dict:
+    """Convert an HDF5 file into a Python dictionary."""
+
+    def decode_if_bytes(value):
+        """Decode bytes to string if needed."""
+        if isinstance(value, bytes):
+            return value.decode("utf-8")
+        if isinstance(value, np.ndarray) and value.dtype.type is np.bytes_:
+            return [elem.decode("utf-8") for elem in value.tolist()]
+        return value
+
+    def convert_numpy(obj):
+        """Convert numpy data types to native Python types."""
+        if isinstance(obj, np.generic):
+            return obj.item()
+        if isinstance(obj, dict):
+            return {key: convert_numpy(value) for key, value in obj.items()}
+        if isinstance(obj, list):
+            return [convert_numpy(item) for item in obj]
+        return obj
+
+    def read_group(group):
+        """Recursively read HDF5 groups, grouping 'item_X' keys into lists efficiently."""
+        data_dict = {}
+        grouped_items = []
+
+        for key, value in group.attrs.items():
+            data_dict[key] = decode_if_bytes(value)
+
+        for key, dataset in group.items():
+            if isinstance(dataset, h5py.Dataset):
+                data = dataset[()]
+                if isinstance(data, np.ndarray) and data.dtype == np.uint8:
+                    try:
+                        value = data.tobytes().decode()
+                    except UnicodeDecodeError:
+                        value = data.tobytes()
+                elif isinstance(data, np.ndarray):
+                    value = decode_if_bytes(data.tolist())
+                else:
+                    value = decode_if_bytes(data)
+
+            elif isinstance(dataset, h5py.Group):
+                value = read_group(dataset)
+
+            if key.startswith("item_") and key[5:].isdigit():
+                grouped_items.append(value)
+            else:
+                data_dict[key] = value
+
+        # If there are grouped items, store them correctly
+        if grouped_items:
+            return grouped_items
+
+        return convert_numpy(data_dict)
+
+    with h5py.File(hdf5_file, "r") as hdf:
+        return read_group(hdf)
diff --git a/dsms/knowledge/ktype.py b/dsms/knowledge/ktype.py
index c4e2f3c..c8dbb46 100644
--- a/dsms/knowledge/ktype.py
+++ b/dsms/knowledge/ktype.py
@@ -1,13 +1,16 @@
 """KItem types"""
 
+import json
 import logging
 from datetime import datetime
 from typing import TYPE_CHECKING, Any, Optional, Union
 from uuid import UUID
 
+import yaml
 from pydantic import BaseModel, Field, model_serializer
 
 from dsms.core.logging import handler
 
+from dsms.knowledge.data_format import DataFormat
 from dsms.knowledge.utils import _ktype_exists, _refresh_ktype, print_ktype
 from dsms.knowledge.webform import Webform
@@ -137,3 +140,43 @@
             )
             for key, value in self.__dict__.items()
         }
+
+    def export(self, data_format: DataFormat) -> Any:
+        """Export ktypes to different formats"""
+
+        if data_format == DataFormat.HDF5:
+            from dsms.knowledge.knowledge_wrapper import (  # isort:skip
+                data_to_dict,
+                dict_to_hdf5,
+            )
+
+            return dict_to_hdf5(data_to_dict(self))
+
+        if data_format == DataFormat.JSON:
+            from dsms.knowledge.knowledge_wrapper import data_to_dict
+
+            return json.dumps(data_to_dict(self))
+
+        if data_format == DataFormat.YAML:
+            from dsms.knowledge.knowledge_wrapper import data_to_dict
+
+            return yaml.dump(data_to_dict(self), default_flow_style=False)
+
+        raise ValueError(f"Unsupported data format: {data_format}")
+
+    @staticmethod
+    def import_ktype(data, data_format: DataFormat) -> Any:
+        """Import objects in different formats to KType"""
+
+        if data_format == DataFormat.HDF5:
+            from dsms.knowledge.knowledge_wrapper import hdf5_to_dict
+
+            return hdf5_to_dict(data)
+
+        if data_format == DataFormat.JSON:
+            return json.load(data)
+
+        if data_format == DataFormat.YAML:
+            return yaml.safe_load(data)
+
+        raise ValueError(f"Unsupported data format: {data_format}")
diff --git a/setup.cfg b/setup.cfg
index 7a3ef24..2e7f80b 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -21,6 +21,7 @@ packages = find:
 install_requires =
     PyYAML>=6,<7
     click>=8,<9
+    h5py>=3,<4
    html5lib>=1,<2
    lru-cache<1
    oyaml==1