diff --git a/README.md b/README.md
index 1b51d680..a2561726 100644
--- a/README.md
+++ b/README.md
@@ -61,13 +61,14 @@ Furthermore, GraphGen incorporates multi-hop neighborhood sampling to capture co
 After data generation, you can use [LLaMA-Factory](https://github.com/hiyouga/LLaMA-Factory) and [xtuner](https://github.com/InternLM/xtuner) to finetune your LLMs.
 
 ## 📌 Latest Updates
+- **2026.02.04**: Added support for HuggingFace Datasets as an input data source for data generation.
 - **2026.01.15**: **LLM benchmark synthesis** now supports single/multiple-choice & fill-in-the-blank & true-or-false—ideal for education 🌟🌟
 - **2025.12.26**: Knowledge graph evaluation metrics about accuracy (entity/relation), consistency (conflict detection), structural robustness (noise, connectivity, degree distribution)
-- **2025.12.16**: Added [rocksdb](https://github.com/facebook/rocksdb) for key-value storage backend and [kuzudb](https://github.com/kuzudb/kuzu) for graph database backend support.
 History
+- **2025.12.16**: Added [rocksdb](https://github.com/facebook/rocksdb) for key-value storage backend and [kuzudb](https://github.com/kuzudb/kuzu) for graph database backend support.
 - **2025.12.16**: Added [vllm](https://github.com/vllm-project/vllm) for local inference backend support.
 - **2025.12.16**: Refactored the data generation pipeline using [ray](https://github.com/ray-project/ray) to improve the efficiency of distributed execution and resource management.
 - **2025.12.1**: Added search support for [NCBI](https://www.ncbi.nlm.nih.gov/) and [RNAcentral](https://rnacentral.org/) databases, enabling extraction of DNA and RNA data from these bioinformatics databases.
diff --git a/README_zh.md b/README_zh.md
index 9ed66fed..6c964d65 100644
--- a/README_zh.md
+++ b/README_zh.md
@@ -62,14 +62,15 @@ GraphGen 首先根据源文本构建细粒度的知识图谱，然后利用期
 在数据生成后，您可以使用[LLaMA-Factory](https://github.com/hiyouga/LLaMA-Factory) 和 [xtuner](https://github.com/InternLM/xtuner)对大语言模型进行微调。
 
 ## 📌 最新功能
+- **2026.02.04**：支持直接读入 HuggingFace 数据集进行数据生成
 - **2026.01.15**：合成垂域评测数据（单选题、多选题、填空题和判断题型）🌟🌟
 - **2025.12.26**：引入知识图谱评估指标，包括准确度评估（实体/关系抽取质量）、一致性评估（冲突检测）和结构鲁棒性评估（噪声比、连通性、度分布）
-- **2025.12.16**：支持 [rocksdb](https://github.com/facebook/rocksdb) 作为键值存储后端, [kuzudb](https://github.com/kuzudb/kuzu) 作为图数据库后端
 历史更新记录
+- **2025.12.16**：支持 [rocksdb](https://github.com/facebook/rocksdb) 作为键值存储后端, [kuzudb](https://github.com/kuzudb/kuzu) 作为图数据库后端。
 - **2025.12.16**：支持 [vllm](https://github.com/vllm-project/vllm) 作为本地推理后端。
 - **2025.12.16**：使用 [ray](https://github.com/ray-project/ray) 重构了数据生成 pipeline，提升了分布式执行和资源管理的效率。
 - **2025.12.1**：新增对 [NCBI](https://www.ncbi.nlm.nih.gov/) 和 [RNAcentral](https://rnacentral.org/) 数据库的检索支持，现在可以从这些生物信息学数据库中提取DNA和RNA数据。
diff --git a/examples/generate/generate_aggregated_qa/generate_aggregated_from_hf.sh b/examples/generate/generate_aggregated_qa/generate_aggregated_from_hf.sh
new file mode 100644
index 00000000..54fee204
--- /dev/null
+++ b/examples/generate/generate_aggregated_qa/generate_aggregated_from_hf.sh
@@ -0,0 +1,2 @@
+python3 -m graphgen.run \
+--config_file examples/generate/generate_aggregated_qa/huggingface_config.yaml
diff --git a/examples/generate/generate_aggregated_qa/huggingface_config.yaml b/examples/generate/generate_aggregated_qa/huggingface_config.yaml
new file mode 100644
index 00000000..1bb60ef9
--- /dev/null
+++ b/examples/generate/generate_aggregated_qa/huggingface_config.yaml
@@ -0,0 +1,83 @@
+global_params:
+  working_dir: cache
+  graph_backend: networkx # graph database backend, support: kuzu, networkx
+  kv_backend: json_kv # key-value store backend, support: rocksdb, json_kv
+
+nodes:
+  - id: read_hf_dataset # Read from Hugging Face Hub
+    op_name: read
+    type: source
+    dependencies: []
+    params:
+      input_path:
+        - huggingface://wikitext:wikitext-103-v1:train # Format: huggingface://dataset_name:subset:split
+      # Optional parameters for HuggingFaceReader:
+      text_column: text # Column name containing text content (default: content)
+      # cache_dir: /path/to/cache # Optional: directory to cache downloaded datasets
+      # trust_remote_code: false # Optional: whether to trust remote code in datasets
+
+  - id: chunk_documents
+    op_name: chunk
+    type: map_batch
+    dependencies:
+      - read_hf_dataset
+    execution_params:
+      replicas: 4
+    params:
+      chunk_size: 1024
+      chunk_overlap: 100
+
+  - id: build_kg
+    op_name: build_kg
+    type: map_batch
+    dependencies:
+      - chunk_documents
+    execution_params:
+      replicas: 1
+      batch_size: 128
+
+  - id: quiz
+    op_name: quiz
+    type: map_batch
+    dependencies:
+      - build_kg
+    execution_params:
+      replicas: 1
+      batch_size: 128
+    params:
+      quiz_samples: 2
+
+  - id: judge
+    op_name: judge
+    type: map_batch
+    dependencies:
+      - quiz
+    execution_params:
+      replicas: 1
+      batch_size: 128
+
+  - id: partition
+    op_name: partition
+    type: aggregate
+    dependencies:
+      - judge
+    params:
+      method: ece
+      method_params:
+        max_units_per_community: 20
+        min_units_per_community: 5
+        max_tokens_per_community: 10240
+        unit_sampling: max_loss
+
+  - id: generate
+    op_name: generate
+    type: map_batch
+    dependencies:
+      - partition
+    execution_params:
+      replicas: 1
+      batch_size: 128
+    save_output: true
+    params:
+      method: aggregated
+      data_format: ChatML
diff --git a/graphgen/models/__init__.py b/graphgen/models/__init__.py
index 2381d9b1..6b75587c 100644
--- a/graphgen/models/__init__.py
+++ b/graphgen/models/__init__.py
@@ -33,6 +33,7 @@
 )
 from .reader import (
     CSVReader,
+    HuggingFaceReader,
     JSONReader,
     ParquetReader,
     PDFReader,
@@ -92,6 +93,7 @@
     "PickleReader": ".reader",
     "RDFReader": ".reader",
     "TXTReader": ".reader",
+    "HuggingFaceReader": ".reader",
     # Searcher
     "NCBISearch": ".searcher.db.ncbi_searcher",
     "RNACentralSearch": ".searcher.db.rnacentral_searcher",
diff --git a/graphgen/models/reader/__init__.py b/graphgen/models/reader/__init__.py
index 220460c3..1bb9a45f 100644
--- a/graphgen/models/reader/__init__.py
+++ b/graphgen/models/reader/__init__.py
@@ -1,4 +1,5 @@
 from .csv_reader import CSVReader
+from .huggingface_reader import HuggingFaceReader
 from .json_reader import JSONReader
 from .parquet_reader import ParquetReader
 from .pdf_reader import PDFReader
diff --git a/graphgen/models/reader/huggingface_reader.py b/graphgen/models/reader/huggingface_reader.py
new file mode 100644
index 00000000..3d3cd938
--- /dev/null
+++ b/graphgen/models/reader/huggingface_reader.py
@@ -0,0 +1,201 @@
+"""
+Hugging Face Datasets Reader
+This module provides a reader for accessing datasets from Hugging Face Hub.
+"""
+
+from typing import TYPE_CHECKING, List, Optional, Union
+
+from graphgen.bases.base_reader import BaseReader
+
+if TYPE_CHECKING:
+    import numpy as np
+    import ray
+    from ray.data import Dataset
+
+
+class HuggingFaceReader(BaseReader):
+    """
+    Reader for Hugging Face Datasets.
+
+    Supports loading datasets from the Hugging Face Hub.
+    Can specify a dataset by name and optional subset/split.
+
+    Columns:
+        - type: The type of the document (e.g., "text", "image", etc.)
+        - if type is "text", "content" column must be present (or specify via text_column).
+
+    Example:
+        reader = HuggingFaceReader(text_column="text")
+        ds = reader.read("wikitext")
+        # or with split and subset
+        ds = reader.read("wikitext:wikitext-103-v1:train")
+    """
+
+    def __init__(
+        self,
+        text_column: str = "content",
+        modalities: Optional[list] = None,
+        cache_dir: Optional[str] = None,
+        trust_remote_code: bool = False,
+    ):
+        """
+        Initialize HuggingFaceReader.
+
+        :param text_column: Column name containing text content
+        :param modalities: List of supported modalities
+        :param cache_dir: Directory to cache downloaded datasets
+        :param trust_remote_code: Whether to trust remote code in datasets
+        """
+        super().__init__(text_column=text_column, modalities=modalities)
+        self.cache_dir = cache_dir
+        self.trust_remote_code = trust_remote_code
+
+    def read(
+        self,
+        input_path: Union[str, List[str]],
+        split: Optional[str] = None,
+        subset: Optional[str] = None,
+        streaming: bool = False,
+        limit: Optional[int] = None,
+    ) -> "Dataset":
+        """
+        Read dataset from Hugging Face Hub.
+
+        :param input_path: Dataset identifier(s) from Hugging Face Hub
+            Format: "dataset_name" or "dataset_name:subset:split"
+            Example: "wikitext" or "wikitext:wikitext-103-v1:train"
+        :param split: Specific split to load (overrides split in path)
+        :param subset: Specific subset/configuration to load (overrides subset in path)
+        :param streaming: Whether to stream the dataset instead of downloading
+        :param limit: Maximum number of samples to load
+        :return: Ray Dataset containing the data
+        """
+        try:
+            import datasets as hf_datasets
+        except ImportError as exc:
+            raise ImportError(
+                "The 'datasets' package is required to use HuggingFaceReader. "
" + "Please install it with: pip install datasets" + ) from exc + + if isinstance(input_path, list): + # Handle multiple datasets + all_dss = [] + for path in input_path: + ds = self._load_single_dataset( + path, + split=split, + subset=subset, + streaming=streaming, + limit=limit, + hf_datasets=hf_datasets, + ) + all_dss.append(ds) + + if len(all_dss) == 1: + combined_ds = all_dss[0] + else: + combined_ds = all_dss[0].union(*all_dss[1:]) + else: + combined_ds = self._load_single_dataset( + input_path, + split=split, + subset=subset, + streaming=streaming, + limit=limit, + hf_datasets=hf_datasets, + ) + + # Validate and filter + combined_ds = combined_ds.map_batches( + self._validate_batch, batch_format="pandas" + ) + combined_ds = combined_ds.filter(self._should_keep_item) + + return combined_ds + + def _load_single_dataset( + self, + dataset_path: str, + split: Optional[str] = None, + subset: Optional[str] = None, + streaming: bool = False, + limit: Optional[int] = None, + hf_datasets=None, + ) -> "Dataset": + """ + Load a single dataset from Hugging Face Hub. + + :param dataset_path: Dataset path, can include subset and split + :param split: Override split + :param subset: Override subset + :param streaming: Whether to stream + :param limit: Max samples + :param hf_datasets: Imported datasets module + :return: Ray Dataset + """ + import numpy as np + import ray + + # Parse dataset path format: "dataset_name:subset:split" + parts = dataset_path.split(":") + dataset_name = parts[0] + parsed_subset = parts[1] if len(parts) > 1 else None + parsed_split = parts[2] if len(parts) > 2 else None + + # Override with explicit parameters + final_subset = subset or parsed_subset + final_split = split or parsed_split or "train" + + # Load dataset from Hugging Face + load_kwargs = { + "cache_dir": self.cache_dir, + "trust_remote_code": self.trust_remote_code, + "streaming": streaming, + } + + if final_subset: + load_kwargs["name"] = final_subset + + hf_dataset = hf_datasets.load_dataset( + dataset_name, split=final_split, **load_kwargs + ) + + # Apply limit before converting to Ray dataset for memory efficiency + if limit: + if streaming: + hf_dataset = hf_dataset.take(limit) + else: + hf_dataset = hf_dataset.select(range(limit)) + + # Convert to Ray dataset using lazy evaluation + ray_ds = ray.data.from_huggingface(hf_dataset) + + # Define batch processing function for lazy evaluation + def _process_batch(batch: dict[str, "np.ndarray"]) -> dict[str, "np.ndarray"]: + """ + Process a batch of data to add type field and rename text column. 
+
+            :param batch: A dictionary with column names as keys and numpy arrays
+            :return: Processed batch dictionary with numpy arrays
+            """
+            if not batch:
+                return {}
+
+            # Get the number of rows in the batch
+            num_rows = len(next(iter(batch.values())))
+
+            # Add type field if not present
+            if "type" not in batch:
+                batch["type"] = np.array(["text"] * num_rows)
+
+            # Rename text_column to 'content' if different
+            if self.text_column != "content" and self.text_column in batch:
+                batch["content"] = batch.pop(self.text_column)
+
+            return batch
+
+        # Apply post-processing using map_batches for distributed lazy evaluation
+        ray_ds = ray_ds.map_batches(_process_batch)
+
+        return ray_ds
diff --git a/graphgen/operators/read/read.py b/graphgen/operators/read/read.py
index ec623a76..3a2b585a 100644
--- a/graphgen/operators/read/read.py
+++ b/graphgen/operators/read/read.py
@@ -4,6 +4,7 @@
 from graphgen.common.init_storage import init_storage
 from graphgen.models import (
     CSVReader,
+    HuggingFaceReader,
     JSONReader,
     ParquetReader,
     PDFReader,
@@ -51,6 +52,103 @@ def _build_reader(suffix: str, cache_dir: str | None, **reader_kwargs):
     return reader_cls(**reader_kwargs)
 
 
+def _process_huggingface_datasets(hf_uris: List[str], reader_kwargs: dict) -> list:
+    """Process HuggingFace datasets and return list of Ray datasets."""
+    logger.info("[READ] Processing HuggingFace datasets: %s", hf_uris)
+    hf_reader = HuggingFaceReader(**reader_kwargs)
+    read_tasks = []
+    for hf_uri in hf_uris:
+        # Parse URI format: "huggingface://dataset_name:subset:split"
+        uri_part = hf_uri.replace("huggingface://", "")
+        ds = hf_reader.read(uri_part)
+        read_tasks.append(ds)
+    logger.info("[READ] Successfully loaded %d HuggingFace dataset(s)", len(hf_uris))
+    return read_tasks
+
+
+def _process_local_files(
+    local_paths: List[str],
+    allowed_suffix: Optional[List[str]],
+    kv_backend: str,
+    working_dir: str,
+    parallelism: int,
+    recursive: bool,
+    reader_kwargs: dict,
+) -> list:
+    """Process local files and return list of Ray datasets."""
+    logger.info("[READ] Scanning local paths: %s", local_paths)
+    read_tasks = []
+    input_path_cache = init_storage(
+        backend=kv_backend, working_dir=working_dir, namespace="input_path"
+    )
+    with ParallelFileScanner(
+        input_path_cache=input_path_cache,
+        allowed_suffix=allowed_suffix,
+        rescan=False,
+        max_workers=parallelism if parallelism > 0 else 1,
+    ) as scanner:
+        all_files = []
+        scan_results = scanner.scan(local_paths, recursive=recursive)
+
+        for result in scan_results.values():
+            all_files.extend(result.get("files", []))
+
+        logger.info("[READ] Found %d files to process", len(all_files))
+
+        if all_files:
+            # Group files by suffix to use appropriate reader
+            files_by_suffix = {}
+            for file_info in all_files:
+                suffix = Path(file_info["path"]).suffix.lower().lstrip(".")
+                if allowed_suffix and suffix not in [
+                    s.lower().lstrip(".") for s in allowed_suffix
+                ]:
+                    continue
+                files_by_suffix.setdefault(suffix, []).append(file_info["path"])
+
+            # Create read tasks for files
+            for suffix, file_paths in files_by_suffix.items():
+                reader = _build_reader(suffix, working_dir, **reader_kwargs)
+                ds = reader.read(file_paths)
+                read_tasks.append(ds)
+
+    return read_tasks
+
+
+def _combine_datasets(
+    read_tasks: list,
+    read_nums: Optional[int],
+    read_storage,
+    input_path: Union[str, List[str]],
+) -> "ray.data.Dataset":
+    """Combine datasets and apply post-processing."""
+    combined_ds = (
+        read_tasks[0] if len(read_tasks) == 1 else read_tasks[0].union(*read_tasks[1:])
+    )
+
+    if read_nums is not None:
+        combined_ds = combined_ds.limit(read_nums)
+
+    def add_trace_id(batch):
+        batch["_trace_id"] = batch.apply(
+            lambda row: compute_dict_hash(row, prefix="read-"), axis=1
+        )
+        records = batch.to_dict(orient="records")
+        data_to_upsert = {record["_trace_id"]: record for record in records}
+        read_storage.upsert(data_to_upsert)
+        read_storage.index_done_callback()
+        return batch
+
+    combined_ds = combined_ds.map_batches(add_trace_id, batch_format="pandas")
+
+    # sample record
+    for i, item in enumerate(combined_ds.take(1)):
+        logger.debug("[READ] Sample record %d: %s", i, item)
+
+    logger.info("[READ] Successfully read data from %s", input_path)
+    return combined_ds
+
+
 def read(
     input_path: Union[str, List[str]],
     allowed_suffix: Optional[List[str]] = None,
@@ -63,8 +161,11 @@
 ) -> "ray.data.Dataset":
     """
     Unified entry point to read files of multiple types using Ray Data.
+    Supports both local files and Hugging Face datasets.
 
-    :param input_path: File or directory path(s) to read from
+    :param input_path: File or directory path(s) to read from, or HuggingFace dataset URIs
+        Format for HuggingFace: "huggingface://dataset_name:subset:split"
+        Example: "huggingface://wikitext:wikitext-103-v1:train"
     :param allowed_suffix: List of allowed file suffixes (e.g., ['pdf', 'txt'])
     :param working_dir: Directory to cache intermediate files (PDF processing)
     :param kv_backend: Backend for key-value storage
@@ -76,80 +177,53 @@
     """
     import ray
 
-    input_path_cache = init_storage(
-        backend=kv_backend, working_dir=working_dir, namespace="input_path"
-    )
+    # Convert single input_path to list for uniform processing
+    if isinstance(input_path, str):
+        input_paths = [input_path]
+    else:
+        input_paths = input_path
+
+    # Separate HuggingFace URIs from local file paths
+    hf_uris = []
+    local_paths = []
+    for path in input_paths:
+        if isinstance(path, str) and path.startswith("huggingface://"):
+            hf_uris.append(path)
+        else:
+            local_paths.append(path)
+
     read_storage = init_storage(
         backend=kv_backend, working_dir=working_dir, namespace="read"
     )
-    try:
-        # 1. Scan all paths to discover files
-        logger.info("[READ] Scanning paths: %s", input_path)
-        with ParallelFileScanner(
-            input_path_cache=input_path_cache,
-            allowed_suffix=allowed_suffix,
-            rescan=False,
-            max_workers=parallelism if parallelism > 0 else 1,
-        ) as scanner:
-            all_files = []
-            scan_results = scanner.scan(input_path, recursive=recursive)
-
-            for result in scan_results.values():
-                all_files.extend(result.get("files", []))
-
-            logger.info("[READ] Found %d files to process", len(all_files))
-
-        if not all_files:
-            raise ValueError("No files found to read.")
-
-        # 2. Group files by suffix to use appropriate reader
-        files_by_suffix = {}
-        for file_info in all_files:
-            suffix = Path(file_info["path"]).suffix.lower().lstrip(".")
-            if allowed_suffix and suffix not in [
-                s.lower().lstrip(".") for s in allowed_suffix
-            ]:
-                continue
-            files_by_suffix.setdefault(suffix, []).append(file_info["path"])
-        # 3. Create read tasks
-        read_tasks = []
-        for suffix, file_paths in files_by_suffix.items():
-            reader = _build_reader(suffix, working_dir, **reader_kwargs)
-            ds = reader.read(file_paths)
-            read_tasks.append(ds)
-
-        # 4. Combine all datasets
-        if not read_tasks:
-            raise ValueError("No datasets created from the provided files.")
-
-        if len(read_tasks) == 1:
-            combined_ds = read_tasks[0]
-        else:
-            combined_ds = read_tasks[0].union(*read_tasks[1:])
-
-        if read_nums is not None:
-            combined_ds = combined_ds.limit(read_nums)
-
-        def add_trace_id(batch):
-            batch["_trace_id"] = batch.apply(
-                lambda row: compute_dict_hash(row, prefix="read-"), axis=1
+    try:
+        read_tasks = []
+
+        # 1. Process HuggingFace datasets if any
+        if hf_uris:
+            read_tasks.extend(_process_huggingface_datasets(hf_uris, reader_kwargs))
+
+        # 2. Process local file paths if any
+        if local_paths:
+            read_tasks.extend(
+                _process_local_files(
+                    local_paths,
+                    allowed_suffix,
+                    kv_backend,
+                    working_dir,
+                    parallelism,
+                    recursive,
+                    reader_kwargs,
                 )
-            records = batch.to_dict(orient="records")
-            data_to_upsert = {record["_trace_id"]: record for record in records}
-            read_storage.upsert(data_to_upsert)
-            read_storage.index_done_callback()
-            return batch
-
-        combined_ds = combined_ds.map_batches(add_trace_id, batch_format="pandas")
+            )
 
-        # sample record
-        for i, item in enumerate(combined_ds.take(1)):
-            logger.debug("[READ] Sample record %d: %s", i, item)
+        # 3. Validate we have at least one dataset
+        if not read_tasks:
+            raise ValueError("No datasets created from the provided input paths.")
 
-        logger.info("[READ] Successfully read files from %s", input_path)
-        return combined_ds
+        # 4. Combine and process datasets
+        return _combine_datasets(read_tasks, read_nums, read_storage, input_path)
     except Exception as e:
-        logger.error("[READ] Failed to read files from %s: %s", input_path, e)
+        logger.error("[READ] Failed to read data from %s: %s", input_path, e)
         raise
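
For reviewers who want to exercise the new reader outside the full pipeline, here is a minimal usage sketch. It is not part of the patch: it assumes a local GraphGen checkout with the `datasets` and `ray[data]` packages installed, and it reuses the `wikitext:wikitext-103-v1:train` identifier from `huggingface_config.yaml`, where `text_column="text"` matches that dataset's schema.

```python
# Minimal sketch (not part of the patch): call HuggingFaceReader directly.
# Assumes `pip install datasets "ray[data]"` and a local GraphGen checkout.
from graphgen.models import HuggingFaceReader

# wikitext stores its text in the "text" column, so map it to GraphGen's "content" column.
reader = HuggingFaceReader(text_column="text")

# "dataset:subset:split" is the same identifier that read() passes on after
# _process_huggingface_datasets strips the "huggingface://" prefix; limit keeps the run small.
ds = reader.read("wikitext:wikitext-103-v1:train", limit=100)

# The result is a Ray Dataset with "content" and "type" columns, ready for the chunk operator.
print(ds.take(2))
```

Because the sketch goes through the same `read()`/`_load_single_dataset()` path as the `huggingface://` URI in the example config, it is a quick way to check column mapping before launching the full `generate_aggregated_from_hf.sh` run.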