
Conversation

@WangCHEN9

Introduce classes and functions for extracting data from SharePoint lists and files, including configuration for file types and paths. This implementation supports both list and file data extraction into a DuckDB pipeline.

@WangCHEN9
Author

sources\sharepoint_pipeline.py tested locally OK

@anuunchin
Copy link
Collaborator

anuunchin commented Nov 17, 2025

Hi @WangCHEN9, thanks for the contribution! Could you add the tests you have written for the source?

btw, please rebase on master, some import failures that were causing CI errors were fixed

@anuunchin added the "question" label (Further information is requested) on Nov 17, 2025
sd41847 added 2 commits December 8, 2025 15:19
- Updated `pyproject.toml` to include SharePoint dependencies.
- Created `README.md` for SharePoint source documentation.
- Implemented SharePoint client in `helpers.py` for API interaction.
- Added configuration classes in `sharepoint_files_config.py` for lists and files.
- Developed extraction functions in `__init__.py` for SharePoint lists and files.
- Created unit tests for SharePoint source in `test_sharepoint_source.py`.
- Added requirements file for SharePoint source dependencies.
@WangCHEN9
Author

> Hi @WangCHEN9, thanks for the contribution! Could you add the tests you have written for the source?
>
> btw, please rebase on master, some import failures that were causing CI errors were fixed

Added tests

@anuunchin self-assigned this Dec 9, 2025
Collaborator

@anuunchin left a comment


It works well, but a few improvements are needed imo 🙂, mainly around pagination (I know it was marked as a todo, but it should be fairly easy to add here), some edge cases that could cause runtime errors, and simplifying parts to better follow dlt conventions

If you have any questions, I’m happy to discuss further here or in the community Slack

Collaborator


Not specific to this file - it would be great if you run make format and make lint and address any issues they surface; otherwise the linter checks will fail

Comment on lines +36 to +49
def get_pd_function(self):
    """Get the pandas read function for this file type.

    Returns:
        Callable pandas read function (e.g., pd.read_csv, pd.read_excel)
    """
    return {
        self.EXCEL: pd.read_excel,
        self.CSV: pd.read_csv,
        self.JSON: pd.read_json,
        self.PARQUET: pd.read_parquet,
        self.SAS: pd.read_sas,
        self.SPSS: pd.read_spss,
    }[self]
Collaborator


looks like FileType.SAV is missing here
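A minimal sketch of the fix, assuming the enum really has a SAV member and that .sav files should go through pandas' SPSS reader like the existing SPSS entry:

    return {
        self.EXCEL: pd.read_excel,
        self.CSV: pd.read_csv,
        self.JSON: pd.read_json,
        self.PARQUET: pd.read_parquet,
        self.SAS: pd.read_sas,
        self.SPSS: pd.read_spss,
        self.SAV: pd.read_spss,  # assumption: .sav is the SPSS binary format, also read via pd.read_spss
    }[self]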

Comment on lines +209 to +212
else:
    logger.warning(
        f"No items found in list: {list_title}, with select: {select}"
    )
Collaborator


This would be returning None, which is not reflected in the return type signature of the function - this is probably being flagged by the linter as well
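One option (a sketch, not necessarily the fix the author prefers) is to keep the warning but return an empty iterable, so the signature and the callers stay consistent:

else:
    logger.warning(
        f"No items found in list: {list_title}, with select: {select}"
    )
    return []  # or raise a ValueError, if a missing/empty list should be treated as an error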

Comment on lines +109 to +110
else:
    logger.warning(f"No subsite found in {url}")
Collaborator


This is similar to the issue with the get_items_from_list function returning None

Comment on lines +176 to +181
filtered_lists = [
    x
    for x in all_lists
    if x.get("list", {}).get("template") == "genericList"
    and "Lists" in x.get("webUrl", "")
]
Collaborator


this seems redundant, as self.get_all_lists_in_site already handles the filtering

    list_title=sharepoint_list_config.list_title,
    select=sharepoint_list_config.select,
)
yield from data
Collaborator


since get_items_from_list may currently return None, this will raise a TypeError (NoneType is not iterable)
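Until the helper is changed, a defensive sketch on the caller side (variable names taken from the diff above) would avoid the crash:

data = client.get_items_from_list(
    list_title=sharepoint_list_config.list_title,
    select=sharepoint_list_config.select,
)
yield from data or []  # guards against the current None return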

Comment on lines +155 to +159
if chunksize:
    with file_item["pd_function"](file_io, **file_item["pd_kwargs"]) as reader:
        for num, chunk in enumerate(reader):
            logger.info(f"Processing chunk {num} of {file_item['name']}")
            yield chunk
Collaborator


I think this will only work with pd.read_csv (most of the other readers, e.g. pd.read_excel and pd.read_parquet, don't accept a chunksize argument or return an iterator)

    )
    logger.info(f"Connected to SharePoint site id: {self.site_id} successfully")
else:
    raise ConnectionError("Connection failed : ", token_response)
Collaborator


Was the intention here to pass a tuple to ConnectionError rather than a single formatted error message?
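If a single message was intended, a small sketch of the fix:

# format the response into one message instead of passing two positional args
raise ConnectionError(f"Connection failed: {token_response}")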

Comment on lines +114 to +138
def get_files(
    sharepoint_files_config: SharepointFilesConfig,
    last_update_timestamp: dlt.sources.incremental = dlt.sources.incremental(
        cursor_path="lastModifiedDateTime",
        initial_value="2020-01-01T00:00:00Z",
        primary_key=(),
    ),
):
    current_last_value = last_update_timestamp.last_value
    logger.debug(f"current_last_value: {current_last_value}")
    for file_item in client.get_files_from_path(
        folder_path=sharepoint_files_config.folder_path,
        file_name_startswith=sharepoint_files_config.file_name_startswith,
        pattern=sharepoint_files_config.pattern,
    ):
        logger.debug(
            "filtering files based on lastModifiedDateTime, compare to last_value:"
            f" {current_last_value}"
        )
        if (
            file_item["lastModifiedDateTime"] > current_last_value
            or not sharepoint_files_config.is_file_incremental
        ):
            logger.info(
                f"Processing file after lastModifiedDateTime filter: {file_item['name']}"
Collaborator


wondering what the reason is for doing manual incremental filtering here - I'm thinking something like this is more idiomatic and readable:

@dlt.source(name="sharepoint_files", max_table_nesting=0)
def sharepoint_files(
    folder_path: str = dlt.config.value,
    table_name: str = dlt.config.value,
    file_type: str = dlt.config.value,
    file_name_prefix: Optional[str] = dlt.config.value,
    file_name_pattern: Optional[str] = dlt.config.value,
    pandas_kwargs: Optional[Dict] = dlt.config.value,
    credentials: SharepointCredentials = dlt.secrets.value,
):
    folder_path = validate_folder_path(folder_path)
    file_type_enum = FileType(file_type)
    
    client = SharepointClient(**credentials)
    client.connect()

    @dlt.resource(name=f"{table_name}__files", selected=False)
    def file_items(
        last_modified: dlt.sources.incremental[str] = dlt.sources.incremental(
            cursor_path="lastModifiedDateTime",
            initial_value="1970-01-01T00:00:00Z",
        ),
    ):
        for item in client.get_files_from_path(
            folder_path=folder_path,
            file_name_startswith=file_name_prefix or "",
            pattern=file_name_pattern,
        ):
            yield item

    @dlt.transformer(name=table_name)
    def file_data(file_item: Dict):
        file_io = client.get_file_bytes_io(file_item)
        pd_func = file_type_enum.get_pd_function()
        kwargs = pandas_kwargs or {}
        chunksize = kwargs.get("chunksize")
        
        if chunksize and file_type_enum == FileType.CSV:
            with pd_func(file_io, **kwargs) as reader:
                for chunk in reader:
                    yield chunk
        else:
            clean_kwargs = {k: v for k, v in kwargs.items() if k != "chunksize"}
            yield pd_func(file_io, **clean_kwargs)

    return file_items | file_data
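For context, a hypothetical usage sketch of the suggested source, loading into DuckDB as the PR description mentions (the folder_path / table_name / file_type values below are placeholders, and credentials would come from secrets.toml):

import dlt

pipeline = dlt.pipeline(
    pipeline_name="sharepoint_pipeline",
    destination="duckdb",
    dataset_name="sharepoint_data",
)
load_info = pipeline.run(
    sharepoint_files(
        folder_path="Shared Documents/reports",  # placeholder
        table_name="reports",                    # placeholder
        file_type="csv",                         # placeholder
    )
)
print(load_info)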

Collaborator


same goes for the sharepoint_list source

@dlt.source(name="sharepoint_list", max_table_nesting=0)
def sharepoint_list(
    list_title: str = dlt.config.value,
    table_name: str = dlt.config.value,
    select: Optional[str] = dlt.config.value,
    credentials: SharepointCredentials = dlt.secrets.value,
):
    client = SharepointClient(**credentials)
    client.connect()

    @dlt.resource(name=table_name)
    def list_items():
        data = client.get_items_from_list(
            list_title=list_title,
            select=select,
        )
        yield from data

    return list_items
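And the analogous hypothetical usage for the list source (list_title / table_name are placeholders; credentials again come from secrets.toml):

# run the list source through the same pipeline as above
pipeline.run(sharepoint_list(list_title="Projects", table_name="projects"))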

Comment on lines +170 to +175
# TODO, pagination not yet implemented
logger.warning(
    "Pagination is not implemented for get_items_from_list, "
    "it will return only first page of items."
)
all_lists = self.get_all_lists_in_site()
Collaborator


since the client has the paginator already defined, I think we can just go with something like

def get_items_from_list(self, list_title: str, select: Optional[str] = None) -> Iterator[Dict]:
    all_lists = self.get_all_lists_in_site()
    
    possible_list_titles = [x["displayName"] for x in all_lists]
    if list_title not in possible_list_titles:
        raise ValueError(
            f"List with title '{list_title}' not found in site {self.site_id}. "
            f"Available lists: {possible_list_titles}"
        )

    # Get the list ID
    list_id = next(
        x["id"] for x in all_lists if x["displayName"] == list_title
    )

    url = f"{self.graph_site_url}/lists/{list_id}/items?expand=fields"
    if select:
        url += f"(select={select})"
    
    # Paginate through all results
    item_count = 0
    for page in self.client.paginate(url):
        page.raise_for_status()
        for item in page.json().get("value", []):
            item_count += 1
            yield item.get("fields", {})
    
    logger.info(f"Got {item_count} items from list: {list_title}")

wdyt?
