Implement SharePoint data extraction sources and configuration classes #661
base: master
Conversation
…ion definitions in SharePoint modules
…tFilesConfig classes
Hi @WangCHEN9, thanks for the contribution! Could you add the tests you have written for the source? Also, please rebase on master; some import failures that were causing CI errors have been fixed.
- Updated `pyproject.toml` to include SharePoint dependencies.
- Created `README.md` for SharePoint source documentation.
- Implemented SharePoint client in `helpers.py` for API interaction.
- Added configuration classes in `sharepoint_files_config.py` for lists and files.
- Developed extraction functions in `__init__.py` for SharePoint lists and files.
- Created unit tests for SharePoint source in `test_sharepoint_source.py`.
- Added requirements file for SharePoint source dependencies.
…epointFilesSource tests for clarity
Added tests
It works well, but a few improvements are needed imo 🙂. Mainly around pagination (I know it was marked as a todo, but it should be fairly easy to add here), some edge cases that could cause runtime errors, and simplifying parts to better follow dlt conventions
If you have any questions, I’m happy to discuss further here or in the community Slack
Not specific to this file - it would be great if you could run `make format` and `make lint` and address any issues they surface, otherwise the linter checks will fail
```python
def get_pd_function(self):
    """Get the pandas read function for this file type.

    Returns:
        Callable pandas read function (e.g., pd.read_csv, pd.read_excel)
    """
    return {
        self.EXCEL: pd.read_excel,
        self.CSV: pd.read_csv,
        self.JSON: pd.read_json,
        self.PARQUET: pd.read_parquet,
        self.SAS: pd.read_sas,
        self.SPSS: pd.read_spss,
    }[self]
```
looks like FileType.SAV is missing here
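A sketch of what the completed mapping could look like: the enum member values here are placeholders (only the member names shown in the diff are taken from the PR), and the assumption is that `.sav` files should go through `pd.read_spss`, pandas' SPSS reader.

```python
from enum import Enum

import pandas as pd


class FileType(str, Enum):
    # Placeholder values; the real enum in the PR may use different ones.
    EXCEL = "excel"
    CSV = "csv"
    JSON = "json"
    PARQUET = "parquet"
    SAS = "sas"
    SPSS = "spss"
    SAV = "sav"

    def get_pd_function(self):
        """Return the pandas reader matching this file type."""
        return {
            FileType.EXCEL: pd.read_excel,
            FileType.CSV: pd.read_csv,
            FileType.JSON: pd.read_json,
            FileType.PARQUET: pd.read_parquet,
            FileType.SAS: pd.read_sas,
            FileType.SPSS: pd.read_spss,
            # .sav is the SPSS data format, so SAV can reuse pd.read_spss.
            FileType.SAV: pd.read_spss,
        }[self]
```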
```python
else:
    logger.warning(
        f"No items found in list: {list_title}, with select: {select}"
    )
```
This would return `None`, which is not reflected in the function's return type signature - this is probably being flagged by the linter as well
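One possible shape of the fix, as a standalone sketch rather than the real method (`get_fields` is a hypothetical stand-in; the actual code builds `items` from the Graph API response): log the warning and return an empty list so the declared return type stays truthful.

```python
import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


def get_fields(
    items: List[Dict[str, Any]], list_title: str, select: Optional[str] = None
) -> List[Dict[str, Any]]:
    """Illustrative stand-in: always return a list, never fall through to None."""
    if not items:
        logger.warning(f"No items found in list: {list_title}, with select: {select}")
        return []  # matches the annotation and is safe for callers to iterate
    return [item.get("fields", {}) for item in items]
```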
```python
else:
    logger.warning(f"No subsite found in {url}")
```
This is similar to the issue with the get_items_from_list function returning None
```python
filtered_lists = [
    x
    for x in all_lists
    if x.get("list", {}).get("template") == "genericList"
    and "Lists" in x.get("webUrl", "")
]
```
this seems redundant, as self.get_all_lists_in_site already handles the filtering
```python
        list_title=sharepoint_list_config.list_title,
        select=sharepoint_list_config.select,
    )
    yield from data
```
Since `get_items_from_list` may currently return None, this will raise a "'NoneType' object is not iterable" TypeError
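To make the failure mode concrete, here is a tiny self-contained illustration (not code from the PR; `items_or_empty` is a hypothetical helper name): `yield from None` raises `TypeError`, so either the helper should return an empty collection, or the caller should normalize before iterating.

```python
def items_or_empty(maybe_items):
    # Hypothetical helper: normalize None to an empty list before delegating.
    yield from (maybe_items or [])


assert list(items_or_empty(None)) == []  # no TypeError when the helper yields nothing
assert list(items_or_empty([{"Title": "a"}])) == [{"Title": "a"}]
```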
```python
if chunksize:
    with file_item["pd_function"](file_io, **file_item["pd_kwargs"]) as reader:
        for num, chunk in enumerate(reader):
            logger.info(f"Processing chunk {num} of {file_item['name']}")
            yield chunk
```
I think this will only work with pd.read_csv
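For context (based on general pandas behavior, not this repo's tests): `pd.read_csv` with `chunksize` returns a `TextFileReader` that can be iterated and, in current pandas, used as a context manager, whereas readers such as `pd.read_excel` or `pd.read_parquet` do not accept `chunksize` at all. A sketch of a guard, with `read_in_chunks` as a hypothetical helper name:

```python
import pandas as pd


def read_in_chunks(pd_function, file_io, pd_kwargs):
    """Hypothetical helper: only chunk when the reader actually supports it."""
    chunksize = pd_kwargs.get("chunksize")
    if chunksize and pd_function is pd.read_csv:
        # read_csv with chunksize yields DataFrame chunks via a TextFileReader.
        with pd_function(file_io, **pd_kwargs) as reader:
            for chunk in reader:
                yield chunk
    else:
        # Other readers don't take chunksize, so drop it and read in one go.
        kwargs = {k: v for k, v in pd_kwargs.items() if k != "chunksize"}
        yield pd_function(file_io, **kwargs)
```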
```python
    )
    logger.info(f"Connected to SharePoint site id: {self.site_id} successfully")
else:
    raise ConnectionError("Connection failed : ", token_response)
```
Was the intention here to raise a tuple, not an error string?
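To illustrate the point with a standalone demo (not the client code; `token_info` is a stand-in for whatever `token_response` holds): passing two positional arguments gives the exception a tuple of args, while formatting everything into one string gives a readable message.

```python
token_info = {"error": "invalid_client"}  # stand-in for the real token response payload

err_tuple = ConnectionError("Connection failed : ", token_info)
err_string = ConnectionError(f"Connection failed: {token_info}")

print(err_tuple.args)   # ('Connection failed : ', {'error': 'invalid_client'})
print(str(err_string))  # Connection failed: {'error': 'invalid_client'}
```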
```python
def get_files(
    sharepoint_files_config: SharepointFilesConfig,
    last_update_timestamp: dlt.sources.incremental = dlt.sources.incremental(
        cursor_path="lastModifiedDateTime",
        initial_value="2020-01-01T00:00:00Z",
        primary_key=(),
    ),
):
    current_last_value = last_update_timestamp.last_value
    logger.debug(f"current_last_value: {current_last_value}")
    for file_item in client.get_files_from_path(
        folder_path=sharepoint_files_config.folder_path,
        file_name_startswith=sharepoint_files_config.file_name_startswith,
        pattern=sharepoint_files_config.pattern,
    ):
        logger.debug(
            "filtering files based on lastModifiedDateTime, compare to last_value:"
            f" {current_last_value}"
        )
        if (
            file_item["lastModifiedDateTime"] > current_last_value
            or not sharepoint_files_config.is_file_incremental
        ):
            logger.info(
                f"Processing file after lastModifiedDateTime filter: {file_item['name']}"
```
wondering what the reason for doing manual incremental filtering here is - I'm thinking something like this is more idiomatic and readable:
@dlt.source(name="sharepoint_files", max_table_nesting=0)
def sharepoint_files(
folder_path: str = dlt.config.value,
table_name: str = dlt.config.value,
file_type: str = dlt.config.value,
file_name_prefix: Optional[str] = dlt.config.value,
file_name_pattern: Optional[str] = dlt.config.value,
pandas_kwargs: Optional[Dict] = dlt.config.value,
credentials: SharepointCredentials = dlt.secrets.value,
):
folder_path = validate_folder_path(folder_path)
file_type_enum = FileType(file_type)
client = SharepointClient(**credentials)
client.connect()
@dlt.resource(name=f"{table_name}__files", selected=False)
def file_items(
last_modified: dlt.sources.incremental[str] = dlt.sources.incremental(
cursor_path="lastModifiedDateTime",
initial_value="1970-01-01T00:00:00Z",
),
):
for item in client.get_files_from_path(
folder_path=folder_path,
file_name_startswith=file_name_prefix or "",
pattern=file_name_pattern,
):
yield item
@dlt.transformer(name=table_name)
def file_data(file_item: Dict):
file_io = client.get_file_bytes_io(file_item)
pd_func = file_type_enum.get_pd_function()
kwargs = pandas_kwargs or {}
chunksize = kwargs.get("chunksize")
if chunksize and file_type_enum == FileType.CSV:
with pd_func(file_io, **kwargs) as reader:
for chunk in reader:
yield chunk
else:
clean_kwargs = {k: v for k, v in kwargs.items() if k != "chunksize"}
yield pd_func(file_io, **clean_kwargs)
return file_items | file_dataThere was a problem hiding this comment.
same goes for the sharepoint_list source
@dlt.source(name="sharepoint_list", max_table_nesting=0)
def sharepoint_list(
list_title: str = dlt.config.value,
table_name: str = dlt.config.value,
select: Optional[str] = dlt.config.value,
credentials: SharepointCredentials = dlt.secrets.value,
):
client = SharepointClient(**credentials)
client.connect()
@dlt.resource(name=table_name)
def list_items():
data = client.get_items_from_list(
list_title=list_title,
select=select,
)
yield from data
return list_items| # TODO, pagination not yet implemented | ||
| logger.warning( | ||
| "Pagination is not implemented for get_items_from_list, " | ||
| "it will return only first page of items." | ||
| ) | ||
| all_lists = self.get_all_lists_in_site() |
since the client has the paginator already defined, I think we can just go with something like
```python
def get_items_from_list(self, list_title: str, select: Optional[str] = None) -> Iterator[Dict]:
    all_lists = self.get_all_lists_in_site()
    possible_list_titles = [x["displayName"] for x in all_lists]
    if list_title not in possible_list_titles:
        raise ValueError(
            f"List with title '{list_title}' not found in site {self.site_id}. "
            f"Available lists: {possible_list_titles}"
        )
    # Get the list ID
    list_id = next(
        x["id"] for x in all_lists if x["displayName"] == list_title
    )
    url = f"{self.graph_site_url}/lists/{list_id}/items?expand=fields"
    if select:
        url += f"(select={select})"
    # Paginate through all results
    item_count = 0
    for page in self.client.paginate(url):
        page.raise_for_status()
        for item in page.json().get("value", []):
            item_count += 1
            yield item.get("fields", {})
    logger.info(f"Got {item_count} items from list: {list_title}")
```

wdyt?
Introduce classes and functions for extracting data from SharePoint lists and files, including configuration for file types and paths. This implementation supports both list and file data extraction into a DuckDB pipeline.
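For readers who want to try the sources, here is a minimal pipeline sketch. It assumes the source signatures from the review suggestions above; the import path, list title, folder path, and table names are illustrative and may not match the final merged code, and credentials are expected to come from dlt secrets.

```python
import dlt

# Hypothetical import path - adjust to wherever the verified source ends up living.
from sharepoint import sharepoint_files, sharepoint_list

pipeline = dlt.pipeline(
    pipeline_name="sharepoint_demo",
    destination="duckdb",
    dataset_name="sharepoint_data",
)

# Load items from a SharePoint list into a DuckDB table.
pipeline.run(sharepoint_list(list_title="Projects", table_name="projects"))

# Load CSV files from a document library folder into another table.
pipeline.run(
    sharepoint_files(
        folder_path="Shared Documents/reports",
        table_name="reports",
        file_type="csv",
    )
)
```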