From 03c81839cba08cf9d57793f2586fb5b1e8e5cbe5 Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Tue, 29 Oct 2024 20:22:11 +0100 Subject: [PATCH 01/23] Implement prefixes parameter. --- pandas/core/generic.py | 5 +++++ pandas/io/sql.py | 15 +++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/pandas/core/generic.py b/pandas/core/generic.py index 42516f0a85e07..078a32aa43b2f 100644 --- a/pandas/core/generic.py +++ b/pandas/core/generic.py @@ -2772,6 +2772,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, + prefixes: Sequence[str] | None = None, ) -> int | None: """ Write records stored in a DataFrame to a SQL database. @@ -2828,6 +2829,9 @@ def to_sql( Details and a sample callable implementation can be found in the section :ref:`insert method `. + prefixs : sequence, optional + A list of strings to insert after CREATE in the CREATE TABLE statement. + They will be separated by spaces. Returns ------- @@ -3001,6 +3005,7 @@ def to_sql( chunksize=chunksize, dtype=dtype, method=method, + prefixes=prefixes, ) @final diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 9aff5600cf49b..857f717d415a9 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -70,6 +70,7 @@ Generator, Iterator, Mapping, + Sequence, ) from sqlalchemy import Table @@ -742,6 +743,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, + prefixes: Sequence[str] | None = None, engine: str = "auto", **engine_kwargs, ) -> int | None: @@ -789,6 +791,9 @@ def to_sql( Details and a sample callable implementation can be found in the section :ref:`insert method `. + prefixs : sequence, optional + A list of strings to insert after CREATE in the CREATE TABLE statement. + They will be separated by spaces. engine : {'auto', 'sqlalchemy'}, default 'auto' SQL engine library to use. If 'auto', then the option ``io.sql.engine`` is used. 
The default ``io.sql.engine`` @@ -837,6 +842,7 @@ def to_sql( chunksize=chunksize, dtype=dtype, method=method, + prefixes=prefixes, engine=engine, **engine_kwargs, ) @@ -930,6 +936,7 @@ def __init__( schema=None, keys=None, dtype: DtypeArg | None = None, + prefixes: Sequence[str] | None = None, ) -> None: self.name = name self.pd_sql = pandas_sql_engine @@ -940,6 +947,7 @@ def __init__( self.if_exists = if_exists self.keys = keys self.dtype = dtype + self.prefixes = prefixes if frame is not None: # We want to initialize based on a dataframe @@ -1859,6 +1867,7 @@ def prep_table( index_label=None, schema=None, dtype: DtypeArg | None = None, + prefixes: Sequence[str] | None = None, ) -> SQLTable: """ Prepares table in the database for data insertion. Creates it if needed, etc. @@ -1894,6 +1903,7 @@ def prep_table( index_label=index_label, schema=schema, dtype=dtype, + prefixes=prefixes, ) table.create() return table @@ -1938,6 +1948,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, + prefixes: Sequence[str] | None = None, engine: str = "auto", **engine_kwargs, ) -> int | None: @@ -1979,6 +1990,9 @@ def to_sql( Details and a sample callable implementation can be found in the section :ref:`insert method `. + prefixs : sequence, optional + A list of strings to insert after CREATE in the CREATE TABLE statement. + They will be separated by spaces. engine : {'auto', 'sqlalchemy'}, default 'auto' SQL engine library to use. If 'auto', then the option ``io.sql.engine`` is used. The default ``io.sql.engine`` @@ -1999,6 +2013,7 @@ def to_sql( index_label=index_label, schema=schema, dtype=dtype, + prefixes=prefixes, ) total_inserted = sql_engine.insert_records( From f09fd548e3a67b69764cd4ed918a4a58babdb8c8 Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Tue, 29 Oct 2024 20:39:42 +0100 Subject: [PATCH 02/23] [WIP] implent special cases for temporary tables. 
--- pandas/io/sql.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 857f717d415a9..95e1a429a9ae4 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -948,6 +948,10 @@ def __init__( self.keys = keys self.dtype = dtype self.prefixes = prefixes + # check if the table to be created is a temporary table + self.is_temporary = self.prefixes is not None and "TEMPORARY".casefold() in [ + prefix.casefold() for prefix in self.prefixes + ] if frame is not None: # We want to initialize based on a dataframe From 03b864276246f35980ea5085701fde5a5a6acae0 Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Sun, 17 Nov 2024 16:48:40 +0100 Subject: [PATCH 03/23] Specify Exception in _exists_temporary. --- pandas/io/sql.py | 32 +++++++++++-- pandas/tests/io/test_sql.py | 92 +++++++++++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+), 3 deletions(-) diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 95e1a429a9ae4..46674b09a0a8e 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -30,6 +30,7 @@ import warnings import numpy as np +from sqlalchemy.exc import ProgrammingError from pandas._config import using_string_dtype @@ -966,8 +967,30 @@ def __init__( if not len(self.name): raise ValueError("Empty table name specified") + def _drop_temporary_table(self): + if self.schema is None: + query = f"DROP TABLE {self.name}" + else: + query = f"DROP TABLE {self.schema}.{self.name}" + self.pd_sql.execute(query) + + def _exists_temporary(self): + if self.schema is None: + query = f"SELECT * FROM {self.name} LIMIT 1" + else: + query = f"SELECT * FROM {self.schema}.{self.name} LIMIT 1" + try: + _ = self.pd_sql.read_query(query) + return True + except ProgrammingError: + print("doesn't exist") + return False + def exists(self): - return self.pd_sql.has_table(self.name, self.schema) + if self.is_temporary: + return self._exists_temporary() + else: + return self.pd_sql.has_table(self.name, self.schema) def sql_schema(self) -> 
str: from sqlalchemy.schema import CreateTable @@ -985,7 +1008,10 @@ def create(self) -> None: if self.if_exists == "fail": raise ValueError(f"Table '{self.name}' already exists.") if self.if_exists == "replace": - self.pd_sql.drop_table(self.name, self.schema) + if self.is_temporary: + self._drop_temporary_table() + else: + self.pd_sql.drop_table(self.name, self.schema) self._execute_create() elif self.if_exists == "append": pass @@ -1279,7 +1305,7 @@ def _create_table_setup(self): # At this point, attach to new metadata, only attach to self.meta # once table is created. meta = MetaData() - return Table(self.name, meta, *columns, schema=schema) + return Table(self.name, meta, *columns, schema=schema, prefixes=self.prefixes) def _harmonize_columns( self, diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index c28a33069d23f..737a9eb55306c 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -39,6 +39,7 @@ to_timedelta, ) import pandas._testing as tm +from pandas.testing import assert_frame_equal from pandas.util.version import Version from pandas.io import sql @@ -4352,3 +4353,94 @@ def test_xsqlite_if_exists(sqlite_buildin): (5, "E"), ] drop_table(table_name, sqlite_buildin) + + +@pytest.mark.db +def test_exists_temporary_table(mysql_pymysql_engine): + df_true = DataFrame( + { + "id": [1, 2], + "name": ["Siegfried", "Isolde"], + } + ) + + pandas_sql = pandasSQL_builder( + mysql_pymysql_engine, schema=None, need_transaction=True + ) + table = sql.SQLTable( + name="DF_TRUE", + pandas_sql_engine=pandas_sql, + frame=df_true, + index=False, + if_exists="fail", + prefixes=["TEMPORARY"], + ) + + table.create() + + assert True if table.exists() else False + + +@pytest.mark.db +def test_to_sql_temporary_table_replace(mysql_pymysql_engine): + from sqlalchemy import text + + df_true = DataFrame( + { + "id": [1, 2], + "name": ["Siegried", "Isolde"], + } + ) + + query = """ + CREATE TEMPORARY TABLE DF_TRUE ( + ID SMALLINT, + NAME 
VARCHAR(20) + ) + """ + + with mysql_pymysql_engine.begin() as conn: + conn.execute(text(query)) + + df_true.to_sql( + name="DF_TRUE", + con=conn, + if_exists="replace", + index=False, + prefixes=["TEMPORARY"], + ) + + df_test = pd.read_sql("SELECT * FROM DF_TRUE", conn) + + assert_frame_equal(df_true, df_test) + + +@pytest.mark.db +def test_to_sql_temporary_table_fail(mysql_pymysql_engine): + from sqlalchemy import text + + df_true = DataFrame( + { + "id": [1, 2], + "name": ["Siegfried", "Isolde"], + } + ) + + query = """ + CREATE TEMPORARY TABLE DF_TRUE ( + ID SMALLINT, + NAME VARCHAR(20) + ) + """ + + with mysql_pymysql_engine.begin() as conn: + conn.execute(text(query)) + + with pytest.raises(ValueError, match=r"Table 'DF_TRUE' already exists."): + df_true.to_sql( + name="DF_TRUE", + con=conn, + if_exists="fail", + index=False, + prefixes=["TEMPORARY"], + ) From ccb9eacf352412470f42a7685d8277b1c83812be Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Sun, 17 Nov 2024 16:57:53 +0100 Subject: [PATCH 04/23] remove print --- pandas/io/sql.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 46674b09a0a8e..913fa344f8c55 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -983,7 +983,6 @@ def _exists_temporary(self): _ = self.pd_sql.read_query(query) return True except ProgrammingError: - print("doesn't exist") return False def exists(self): From d792f2774e7b8ba13499b65b047e7bcfc7beaa84 Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Sun, 17 Nov 2024 18:32:52 +0100 Subject: [PATCH 05/23] Finalize working mysql implementation. 
--- pandas/io/sql.py | 4 +- pandas/tests/io/test_sql.py | 113 +++++++++++++++++------------------- 2 files changed, 57 insertions(+), 60 deletions(-) diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 913fa344f8c55..2bab3f4431c0f 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -998,7 +998,9 @@ def sql_schema(self) -> str: def _execute_create(self) -> None: # Inserting table into database, add to MetaData object - self.table = self.table.to_metadata(self.pd_sql.meta) + if not self.is_temporary: + # only insert into meta data, if table is not temporary + self.table = self.table.to_metadata(self.pd_sql.meta) with self.pd_sql.run_transaction(): self.table.create(bind=self.pd_sql.con) diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index 737a9eb55306c..f4a48b0e3a775 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -17,6 +17,8 @@ import numpy as np import pytest +from sqlalchemy import text +from sqlalchemy.engine import Connection from pandas._config import using_string_dtype @@ -4355,22 +4357,15 @@ def test_xsqlite_if_exists(sqlite_buildin): drop_table(table_name, sqlite_buildin) -@pytest.mark.db -def test_exists_temporary_table(mysql_pymysql_engine): - df_true = DataFrame( - { - "id": [1, 2], - "name": ["Siegfried", "Isolde"], - } - ) +@pytest.mark.parametrize("conn", mysql_connectable) +def test_exists_temporary_table(conn, test_frame1, request): + conn = request.getfixturevalue(conn) - pandas_sql = pandasSQL_builder( - mysql_pymysql_engine, schema=None, need_transaction=True - ) + pandas_sql = pandasSQL_builder(conn, schema=None, need_transaction=True) table = sql.SQLTable( - name="DF_TRUE", + name="test_frame1", pandas_sql_engine=pandas_sql, - frame=df_true, + frame=test_frame1, index=False, if_exists="fail", prefixes=["TEMPORARY"], @@ -4381,66 +4376,66 @@ def test_exists_temporary_table(mysql_pymysql_engine): assert True if table.exists() else False -@pytest.mark.db -def 
test_to_sql_temporary_table_replace(mysql_pymysql_engine): - from sqlalchemy import text - - df_true = DataFrame( - { - "id": [1, 2], - "name": ["Siegried", "Isolde"], - } - ) +@pytest.mark.parametrize("conn", mysql_connectable) +def test_to_sql_temporary_table_replace(conn, test_frame1, request): + conn = request.getfixturevalue(conn) query = """ - CREATE TEMPORARY TABLE DF_TRUE ( - ID SMALLINT, - NAME VARCHAR(20) + CREATE TEMPORARY TABLE test_frame1 ( + `INDEX` TEXT, + A FLOAT(53), + B FLOAT(53), + C FLOAT(53), + D FLOAT(53) ) """ - with mysql_pymysql_engine.begin() as conn: - conn.execute(text(query)) + if isinstance(conn, Connection): + con = conn + else: + con = conn.connect() - df_true.to_sql( - name="DF_TRUE", - con=conn, - if_exists="replace", - index=False, - prefixes=["TEMPORARY"], - ) + con.execute(text(query)) - df_test = pd.read_sql("SELECT * FROM DF_TRUE", conn) + test_frame1.to_sql( + name="test_frame1", + con=con, + if_exists="replace", + index=False, + prefixes=["TEMPORARY"], + ) - assert_frame_equal(df_true, df_test) + df_test = pd.read_sql("SELECT * FROM test_frame1", con) + assert_frame_equal(test_frame1, df_test) -@pytest.mark.db -def test_to_sql_temporary_table_fail(mysql_pymysql_engine): - from sqlalchemy import text - df_true = DataFrame( - { - "id": [1, 2], - "name": ["Siegfried", "Isolde"], - } - ) +@pytest.mark.parametrize("conn", mysql_connectable) +def test_to_sql_temporary_table_fail(conn, test_frame1, request): + conn = request.getfixturevalue(conn) query = """ - CREATE TEMPORARY TABLE DF_TRUE ( - ID SMALLINT, - NAME VARCHAR(20) + CREATE TEMPORARY TABLE test_frame1 ( + `INDEX` TEXT, + A FLOAT(53), + B FLOAT(53), + C FLOAT(53), + D FLOAT(53) ) """ - with mysql_pymysql_engine.begin() as conn: - conn.execute(text(query)) + if isinstance(conn, Connection): + con = conn + else: + con = conn.connect() - with pytest.raises(ValueError, match=r"Table 'DF_TRUE' already exists."): - df_true.to_sql( - name="DF_TRUE", - con=conn, - 
if_exists="fail", - index=False, - prefixes=["TEMPORARY"], - ) + con.execute(text(query)) + + with pytest.raises(ValueError, match=r"Table 'test_frame1' already exists."): + test_frame1.to_sql( + name="test_frame1", + con=con, + if_exists="fail", + index=False, + prefixes=["TEMPORARY"], + ) From 258283468794f480f999d75086c2c9384d1a771f Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Wed, 20 Nov 2024 19:56:41 +0100 Subject: [PATCH 06/23] [WIP] Add rollback for postgres. --- pandas/io/sql.py | 2 ++ pandas/tests/io/test_sql.py | 46 ++++++++++++++----------------------- 2 files changed, 19 insertions(+), 29 deletions(-) diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 2bab3f4431c0f..72f778b96b5a0 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -983,6 +983,8 @@ def _exists_temporary(self): _ = self.pd_sql.read_query(query) return True except ProgrammingError: + # Some DBMS (e.g. postgres) require a rollback after a caught exception + self.pd_sql.execute("rollback") return False def exists(self): diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index f4a48b0e3a775..26db5e3a8f542 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -1247,7 +1247,6 @@ def test_read_procedure(conn, request): # GH 7324 # Although it is more an api test, it is added to the # mysql tests as sqlite does not have stored procedures - from sqlalchemy import text from sqlalchemy.engine import Engine df = DataFrame({"a": [1, 2, 3], "b": [0.1, 0.2, 0.3]}) @@ -2387,7 +2386,6 @@ def test_read_sql_delegate(conn, request): def test_not_reflect_all_tables(sqlite_conn): conn = sqlite_conn - from sqlalchemy import text from sqlalchemy.engine import Engine # create invalid table @@ -2532,7 +2530,6 @@ def test_query_by_text_obj(conn, request): # WIP : GH10846 conn_name = conn conn = request.getfixturevalue(conn) - from sqlalchemy import text if "postgres" in conn_name: name_text = text('select * from iris where "Name"=:name') @@ -3199,7 +3196,6 
@@ def test_get_schema_create_table(conn, request, test_frame3): conn = request.getfixturevalue(conn) - from sqlalchemy import text from sqlalchemy.engine import Engine tbl = "test_get_schema_create_table" @@ -4357,7 +4353,7 @@ def test_xsqlite_if_exists(sqlite_buildin): drop_table(table_name, sqlite_buildin) -@pytest.mark.parametrize("conn", mysql_connectable) +@pytest.mark.parametrize("conn", mysql_connectable + postgresql_connectable) def test_exists_temporary_table(conn, test_frame1, request): conn = request.getfixturevalue(conn) @@ -4376,26 +4372,22 @@ def test_exists_temporary_table(conn, test_frame1, request): assert True if table.exists() else False -@pytest.mark.parametrize("conn", mysql_connectable) +@pytest.mark.parametrize("conn", mysql_connectable + postgresql_connectable) def test_to_sql_temporary_table_replace(conn, test_frame1, request): conn = request.getfixturevalue(conn) - query = """ - CREATE TEMPORARY TABLE test_frame1 ( - `INDEX` TEXT, - A FLOAT(53), - B FLOAT(53), - C FLOAT(53), - D FLOAT(53) - ) - """ - if isinstance(conn, Connection): con = conn else: con = conn.connect() - con.execute(text(query)) + test_frame1.to_sql( + name="test_frame1", + con=con, + if_exists="fail", + index=False, + prefixes=["TEMPORARY"], + ) test_frame1.to_sql( name="test_frame1", @@ -4410,26 +4402,22 @@ def test_to_sql_temporary_table_replace(conn, test_frame1, request): assert_frame_equal(test_frame1, df_test) -@pytest.mark.parametrize("conn", mysql_connectable) +@pytest.mark.parametrize("conn", mysql_connectable + postgresql_connectable) def test_to_sql_temporary_table_fail(conn, test_frame1, request): conn = request.getfixturevalue(conn) - query = """ - CREATE TEMPORARY TABLE test_frame1 ( - `INDEX` TEXT, - A FLOAT(53), - B FLOAT(53), - C FLOAT(53), - D FLOAT(53) - ) - """ - if isinstance(conn, Connection): con = conn else: con = conn.connect() - con.execute(text(query)) + test_frame1.to_sql( + name="test_frame1", + con=con, + if_exists="fail", + index=False, + 
prefixes=["TEMPORARY"], + ) with pytest.raises(ValueError, match=r"Table 'test_frame1' already exists."): test_frame1.to_sql( From b138532ce0ddf50bcc43b0fd16b8d5b7cd29803e Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Thu, 21 Nov 2024 16:50:23 +0100 Subject: [PATCH 07/23] Add support for sqlite. --- pandas/io/sql.py | 13 ++++++++++++- pandas/tests/io/test_sql.py | 33 ++++++++++++++++++++++----------- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 72f778b96b5a0..942e6d69f6250 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -30,7 +30,6 @@ import warnings import numpy as np -from sqlalchemy.exc import ProgrammingError from pandas._config import using_string_dtype @@ -975,6 +974,11 @@ def _drop_temporary_table(self): self.pd_sql.execute(query) def _exists_temporary(self): + from sqlalchemy.exc import ( + OperationalError, + ProgrammingError, + ) + if self.schema is None: query = f"SELECT * FROM {self.name} LIMIT 1" else: @@ -986,6 +990,8 @@ def _exists_temporary(self): # Some DBMS (e.g. postgres) require a rollback after a caught exception self.pd_sql.execute("rollback") return False + except OperationalError: + return False def exists(self): if self.is_temporary: @@ -2816,6 +2822,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, + prefixes: Sequence[str] | None = None, engine: str = "auto", **engine_kwargs, ) -> int | None: @@ -2856,6 +2863,9 @@ def to_sql( Details and a sample callable implementation can be found in the section :ref:`insert method `. + prefixs : sequence, optional + A list of strings to insert after CREATE in the CREATE TABLE statement. + They will be separated by spaces. 
""" if dtype: if not is_dict_like(dtype): @@ -2881,6 +2891,7 @@ def to_sql( if_exists=if_exists, index_label=index_label, dtype=dtype, + prefixes=prefixes, ) table.create() return table.insert(chunksize, method) diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index 26db5e3a8f542..c7016eca2501d 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -17,8 +17,11 @@ import numpy as np import pytest -from sqlalchemy import text -from sqlalchemy.engine import Connection +from sqlalchemy import ( + create_engine, + text, +) +from sqlalchemy.engine import Engine from pandas._config import using_string_dtype @@ -4353,7 +4356,7 @@ def test_xsqlite_if_exists(sqlite_buildin): drop_table(table_name, sqlite_buildin) -@pytest.mark.parametrize("conn", mysql_connectable + postgresql_connectable) +@pytest.mark.parametrize("conn", sqlalchemy_connectable) def test_exists_temporary_table(conn, test_frame1, request): conn = request.getfixturevalue(conn) @@ -4372,14 +4375,18 @@ def test_exists_temporary_table(conn, test_frame1, request): assert True if table.exists() else False -@pytest.mark.parametrize("conn", mysql_connectable + postgresql_connectable) +@pytest.mark.parametrize("conn", sqlalchemy_connectable) def test_to_sql_temporary_table_replace(conn, test_frame1, request): conn = request.getfixturevalue(conn) - if isinstance(conn, Connection): - con = conn - else: + # some DBMS only allow temporary tables to exist within a connection, therefore + # we can only test for a connection and not all types of connectables. 
+ if isinstance(conn, Engine): con = conn.connect() + elif isinstance(conn, str): + con = create_engine(conn).connect() + else: + con = conn test_frame1.to_sql( name="test_frame1", @@ -4402,14 +4409,18 @@ def test_to_sql_temporary_table_replace(conn, test_frame1, request): assert_frame_equal(test_frame1, df_test) -@pytest.mark.parametrize("conn", mysql_connectable + postgresql_connectable) +@pytest.mark.parametrize("conn", sqlalchemy_connectable) def test_to_sql_temporary_table_fail(conn, test_frame1, request): conn = request.getfixturevalue(conn) - if isinstance(conn, Connection): - con = conn - else: + # some DBMS only allow temporary tables to exist within a connection, therefore + # we can only test for a connection and not all types of connectables. + if isinstance(conn, Engine): con = conn.connect() + elif isinstance(conn, str): + con = create_engine(conn).connect() + else: + con = conn test_frame1.to_sql( name="test_frame1", From f6962572a8100d8d0448506899efef4671230b9d Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Sun, 24 Nov 2024 19:18:56 +0100 Subject: [PATCH 08/23] Add connectables with default pool and add test for if_exists=append. 
--- pandas/tests/io/test_sql.py | 156 +++++++++++++++++++++++++++--------- 1 file changed, 119 insertions(+), 37 deletions(-) diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index c7016eca2501d..3dc2d4abcce6d 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -312,7 +312,6 @@ def create_and_load_types_postgresql(conn, types_data: list[dict]): def create_and_load_types(conn, types_data: list[dict], dialect: str): from sqlalchemy import insert - from sqlalchemy.engine import Engine types = types_table_metadata(dialect) @@ -338,7 +337,6 @@ def create_and_load_postgres_datetz(conn): Table, insert, ) - from sqlalchemy.engine import Engine metadata = MetaData() datetz = Table("datetz", metadata, Column("DateColWithTz", DateTime(timezone=True))) @@ -620,6 +618,22 @@ def mysql_pymysql_engine(): engine.dispose() +@pytest.fixture +def mysql_pymysql_engine_default_pool(): + sqlalchemy = pytest.importorskip("sqlalchemy") + pymysql = pytest.importorskip("pymysql") + engine = sqlalchemy.create_engine( + "mysql+pymysql://root@localhost:3306/pandas", + connect_args={"client_flag": pymysql.constants.CLIENT.MULTI_STATEMENTS}, + ) + yield engine + for view in get_all_views(engine): + drop_view(view, engine) + for tbl in get_all_tables(engine): + drop_table(tbl, engine) + engine.dispose() + + @pytest.fixture def mysql_pymysql_engine_iris(mysql_pymysql_engine, iris_path): create_and_load_iris(mysql_pymysql_engine, iris_path) @@ -639,6 +653,12 @@ def mysql_pymysql_conn(mysql_pymysql_engine): yield conn +@pytest.fixture +def mysql_pymysql_conn_default_pool(mysql_pymysql_engine_default_pool): + with mysql_pymysql_engine_default_pool.connect() as conn: + yield conn + + @pytest.fixture def mysql_pymysql_conn_iris(mysql_pymysql_engine_iris): with mysql_pymysql_engine_iris.connect() as conn: @@ -667,6 +687,21 @@ def postgresql_psycopg2_engine(): engine.dispose() +@pytest.fixture +def postgresql_psycopg2_engine_default_pool(): + sqlalchemy = 
pytest.importorskip("sqlalchemy") + pytest.importorskip("psycopg2") + engine = sqlalchemy.create_engine( + "postgresql+psycopg2://postgres:postgres@localhost:5432/pandas", + ) + yield engine + for view in get_all_views(engine): + drop_view(view, engine) + for tbl in get_all_tables(engine): + drop_table(tbl, engine) + engine.dispose() + + @pytest.fixture def postgresql_psycopg2_engine_iris(postgresql_psycopg2_engine, iris_path): create_and_load_iris(postgresql_psycopg2_engine, iris_path) @@ -686,6 +721,12 @@ def postgresql_psycopg2_conn(postgresql_psycopg2_engine): yield conn +@pytest.fixture +def postgresql_psycopg2_conn_default_pool(postgresql_psycopg2_engine_default_pool): + with postgresql_psycopg2_engine_default_pool.connect() as conn: + yield conn + + @pytest.fixture def postgresql_adbc_conn(): pytest.importorskip("adbc_driver_postgresql") @@ -768,12 +809,30 @@ def sqlite_engine(sqlite_str): engine.dispose() +@pytest.fixture +def sqlite_engine_default_pool(sqlite_str): + sqlalchemy = pytest.importorskip("sqlalchemy") + engine = sqlalchemy.create_engine(sqlite_str) + yield engine + for view in get_all_views(engine): + drop_view(view, engine) + for tbl in get_all_tables(engine): + drop_table(tbl, engine) + engine.dispose() + + @pytest.fixture def sqlite_conn(sqlite_engine): with sqlite_engine.connect() as conn: yield conn +@pytest.fixture +def sqlite_conn_default_pool(sqlite_engine_default_pool): + with sqlite_engine_default_pool.connect() as conn: + yield conn + + @pytest.fixture def sqlite_str_iris(sqlite_str, iris_path): sqlalchemy = pytest.importorskip("sqlalchemy") @@ -900,6 +959,11 @@ def sqlite_buildin_types(sqlite_buildin, types_data): pytest.param("mysql_pymysql_conn", marks=pytest.mark.db), ] +mysql_connectable_default_pool = [ + pytest.param("mysql_pymysql_engine_default_pool", marks=pytest.mark.db), + pytest.param("mysql_pymysql_conn_default_pool", marks=pytest.mark.db), +] + mysql_connectable_iris = [ pytest.param("mysql_pymysql_engine_iris", 
marks=pytest.mark.db), pytest.param("mysql_pymysql_conn_iris", marks=pytest.mark.db), @@ -915,6 +979,11 @@ def sqlite_buildin_types(sqlite_buildin, types_data): pytest.param("postgresql_psycopg2_conn", marks=pytest.mark.db), ] +postgresql_connectable_default_pool = [ + pytest.param("postgresql_psycopg2_engine_default_pool", marks=pytest.mark.db), + pytest.param("postgresql_psycopg2_conn_default_pool", marks=pytest.mark.db), +] + postgresql_connectable_iris = [ pytest.param("postgresql_psycopg2_engine_iris", marks=pytest.mark.db), pytest.param("postgresql_psycopg2_conn_iris", marks=pytest.mark.db), @@ -931,6 +1000,11 @@ def sqlite_buildin_types(sqlite_buildin, types_data): "sqlite_str", ] +sqlite_connectable_default_pool = [ + "sqlite_engine_default_pool", + "sqlite_conn_default_pool", +] + sqlite_connectable_iris = [ "sqlite_engine_iris", "sqlite_conn_iris", @@ -945,6 +1019,12 @@ def sqlite_buildin_types(sqlite_buildin, types_data): sqlalchemy_connectable = mysql_connectable + postgresql_connectable + sqlite_connectable +sqlalchemy_connectable_default_pool = ( + mysql_connectable_default_pool + + postgresql_connectable_default_pool + + sqlite_connectable_default_pool +) + sqlalchemy_connectable_iris = ( mysql_connectable_iris + postgresql_connectable_iris + sqlite_connectable_iris ) @@ -1135,7 +1215,6 @@ def test_read_iris_query_expression_with_parameter(conn, request): from sqlalchemy import ( MetaData, Table, - create_engine, select, ) @@ -1250,7 +1329,6 @@ def test_read_procedure(conn, request): # GH 7324 # Although it is more an api test, it is added to the # mysql tests as sqlite does not have stored procedures - from sqlalchemy.engine import Engine df = DataFrame({"a": [1, 2, 3], "b": [0.1, 0.2, 0.3]}) df.to_sql(name="test_frame", con=conn, index=False) @@ -1324,7 +1402,6 @@ def test_insertion_method_on_conflict_do_nothing(conn, request): conn = request.getfixturevalue(conn) from sqlalchemy.dialects.postgresql import insert - from sqlalchemy.engine import 
Engine from sqlalchemy.sql import text def insert_on_conflict(table, conn, keys, data_iter): @@ -1406,7 +1483,6 @@ def test_insertion_method_on_conflict_update(conn, request): conn = request.getfixturevalue(conn) from sqlalchemy.dialects.mysql import insert - from sqlalchemy.engine import Engine from sqlalchemy.sql import text def insert_on_conflict(table, conn, keys, data_iter): @@ -1458,7 +1534,6 @@ def test_read_view_postgres(conn, request): # GH 52969 conn = request.getfixturevalue(conn) - from sqlalchemy.engine import Engine from sqlalchemy.sql import text table_name = f"group_{uuid.uuid4().hex}" @@ -2389,7 +2464,6 @@ def test_read_sql_delegate(conn, request): def test_not_reflect_all_tables(sqlite_conn): conn = sqlite_conn - from sqlalchemy.engine import Engine # create invalid table query_list = [ @@ -3199,8 +3273,6 @@ def test_get_schema_create_table(conn, request, test_frame3): conn = request.getfixturevalue(conn) - from sqlalchemy.engine import Engine - tbl = "test_get_schema_create_table" create_sql = sql.get_schema(test_frame3, tbl, con=conn) blank_test_df = test_frame3.iloc[:0] @@ -3353,7 +3425,6 @@ def test_connectable_issue_example(conn, request): # This tests the example raised in issue # https://github.com/pandas-dev/pandas/issues/10104 - from sqlalchemy.engine import Engine def test_select(connection): query = "SELECT test_foo_data FROM test_foo_data" @@ -4356,7 +4427,7 @@ def test_xsqlite_if_exists(sqlite_buildin): drop_table(table_name, sqlite_buildin) -@pytest.mark.parametrize("conn", sqlalchemy_connectable) +@pytest.mark.parametrize("conn", sqlalchemy_connectable_default_pool) def test_exists_temporary_table(conn, test_frame1, request): conn = request.getfixturevalue(conn) @@ -4375,22 +4446,13 @@ def test_exists_temporary_table(conn, test_frame1, request): assert True if table.exists() else False -@pytest.mark.parametrize("conn", sqlalchemy_connectable) +@pytest.mark.parametrize("conn", sqlalchemy_connectable_default_pool) def 
test_to_sql_temporary_table_replace(conn, test_frame1, request): conn = request.getfixturevalue(conn) - # some DBMS only allow temporary tables to exist within a connection, therefore - # we can only test for a connection and not all types of connectables. - if isinstance(conn, Engine): - con = conn.connect() - elif isinstance(conn, str): - con = create_engine(conn).connect() - else: - con = conn - test_frame1.to_sql( name="test_frame1", - con=con, + con=conn, if_exists="fail", index=False, prefixes=["TEMPORARY"], @@ -4398,33 +4460,24 @@ def test_to_sql_temporary_table_replace(conn, test_frame1, request): test_frame1.to_sql( name="test_frame1", - con=con, + con=conn, if_exists="replace", index=False, prefixes=["TEMPORARY"], ) - df_test = pd.read_sql("SELECT * FROM test_frame1", con) + df_test = pd.read_sql("SELECT * FROM test_frame1", conn) assert_frame_equal(test_frame1, df_test) -@pytest.mark.parametrize("conn", sqlalchemy_connectable) +@pytest.mark.parametrize("conn", sqlalchemy_connectable_default_pool) def test_to_sql_temporary_table_fail(conn, test_frame1, request): conn = request.getfixturevalue(conn) - # some DBMS only allow temporary tables to exist within a connection, therefore - # we can only test for a connection and not all types of connectables. 
- if isinstance(conn, Engine): - con = conn.connect() - elif isinstance(conn, str): - con = create_engine(conn).connect() - else: - con = conn - test_frame1.to_sql( name="test_frame1", - con=con, + con=conn, if_exists="fail", index=False, prefixes=["TEMPORARY"], @@ -4433,8 +4486,37 @@ def test_to_sql_temporary_table_fail(conn, test_frame1, request): with pytest.raises(ValueError, match=r"Table 'test_frame1' already exists."): test_frame1.to_sql( name="test_frame1", - con=con, + con=conn, if_exists="fail", index=False, prefixes=["TEMPORARY"], ) + + +@pytest.mark.parametrize("conn", sqlalchemy_connectable_default_pool) +def test_to_sql_temporary_table_append(conn, test_frame1, request): + conn = request.getfixturevalue(conn) + + test_frame1.to_sql( + name="test_frame1", + con=conn, + if_exists="fail", + index=False, + prefixes=["TEMPORARY"], + ) + + test_frame1.to_sql( + name="test_frame1", + con=conn, + if_exists="append", + index=False, + prefixes=["TEMPORARY"], + ) + + df_test = pd.read_sql("SELECT * FROM test_frame1", conn) + + df_true = concat([test_frame1, test_frame1], axis=0, ignore_index=True).reset_index( + drop=True + ) + + assert_frame_equal(df_true, df_test) From 0bc6504ad110798b05eb0c1e52e5e602fa3b3a2f Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Mon, 25 Nov 2024 20:44:35 +0100 Subject: [PATCH 09/23] Fix typo in prefixes docstring. --- pandas/io/sql.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 942e6d69f6250..b61395b9deaf0 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -791,7 +791,7 @@ def to_sql( Details and a sample callable implementation can be found in the section :ref:`insert method `. - prefixs : sequence, optional + prefixes : sequence, optional A list of strings to insert after CREATE in the CREATE TABLE statement. They will be separated by spaces. 
engine : {'auto', 'sqlalchemy'}, default 'auto' @@ -2029,7 +2029,7 @@ def to_sql( Details and a sample callable implementation can be found in the section :ref:`insert method `. - prefixs : sequence, optional + prefixes : sequence, optional A list of strings to insert after CREATE in the CREATE TABLE statement. They will be separated by spaces. engine : {'auto', 'sqlalchemy'}, default 'auto' @@ -2863,7 +2863,7 @@ def to_sql( Details and a sample callable implementation can be found in the section :ref:`insert method `. - prefixs : sequence, optional + prefixes : sequence, optional A list of strings to insert after CREATE in the CREATE TABLE statement. They will be separated by spaces. """ From 8a94c2bb2250714c646057c51e936ce3691929ad Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Mon, 25 Nov 2024 20:55:45 +0100 Subject: [PATCH 10/23] Undo experimental import changes. --- pandas/tests/io/test_sql.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index 3dc2d4abcce6d..5bff62c154f56 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -17,11 +17,6 @@ import numpy as np import pytest -from sqlalchemy import ( - create_engine, - text, -) -from sqlalchemy.engine import Engine from pandas._config import using_string_dtype @@ -312,6 +307,7 @@ def create_and_load_types_postgresql(conn, types_data: list[dict]): def create_and_load_types(conn, types_data: list[dict], dialect: str): from sqlalchemy import insert + from sqlalchemy.engine import Engine types = types_table_metadata(dialect) @@ -337,6 +333,7 @@ def create_and_load_postgres_datetz(conn): Table, insert, ) + from sqlalchemy.engine import Engine metadata = MetaData() datetz = Table("datetz", metadata, Column("DateColWithTz", DateTime(timezone=True))) @@ -1215,6 +1212,7 @@ def test_read_iris_query_expression_with_parameter(conn, request): from sqlalchemy import ( MetaData, Table, + 
create_engine, select, ) @@ -1329,6 +1327,8 @@ def test_read_procedure(conn, request): # GH 7324 # Although it is more an api test, it is added to the # mysql tests as sqlite does not have stored procedures + from sqlalchemy import text + from sqlalchemy.engine import Engine df = DataFrame({"a": [1, 2, 3], "b": [0.1, 0.2, 0.3]}) df.to_sql(name="test_frame", con=conn, index=False) @@ -1402,6 +1402,7 @@ def test_insertion_method_on_conflict_do_nothing(conn, request): conn = request.getfixturevalue(conn) from sqlalchemy.dialects.postgresql import insert + from sqlalchemy.engine import Engine from sqlalchemy.sql import text def insert_on_conflict(table, conn, keys, data_iter): @@ -1483,6 +1484,7 @@ def test_insertion_method_on_conflict_update(conn, request): conn = request.getfixturevalue(conn) from sqlalchemy.dialects.mysql import insert + from sqlalchemy.engine import Engine from sqlalchemy.sql import text def insert_on_conflict(table, conn, keys, data_iter): @@ -1534,6 +1536,7 @@ def test_read_view_postgres(conn, request): # GH 52969 conn = request.getfixturevalue(conn) + from sqlalchemy.engine import Engine from sqlalchemy.sql import text table_name = f"group_{uuid.uuid4().hex}" @@ -2464,6 +2467,8 @@ def test_read_sql_delegate(conn, request): def test_not_reflect_all_tables(sqlite_conn): conn = sqlite_conn + from sqlalchemy import text + from sqlalchemy.engine import Engine # create invalid table query_list = [ @@ -2607,6 +2612,7 @@ def test_query_by_text_obj(conn, request): # WIP : GH10846 conn_name = conn conn = request.getfixturevalue(conn) + from sqlalchemy import text if "postgres" in conn_name: name_text = text('select * from iris where "Name"=:name') @@ -3273,6 +3279,9 @@ def test_get_schema_create_table(conn, request, test_frame3): conn = request.getfixturevalue(conn) + from sqlalchemy import text + from sqlalchemy.engine import Engine + tbl = "test_get_schema_create_table" create_sql = sql.get_schema(test_frame3, tbl, con=conn) blank_test_df = 
test_frame3.iloc[:0] @@ -3425,6 +3434,7 @@ def test_connectable_issue_example(conn, request): # This tests the example raised in issue # https://github.com/pandas-dev/pandas/issues/10104 + from sqlalchemy.engine import Engine def test_select(connection): query = "SELECT test_foo_data FROM test_foo_data" From be305ffbeba88b1508d8128cdfbf286c43ec5bc4 Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Mon, 25 Nov 2024 21:08:21 +0100 Subject: [PATCH 11/23] Add some documentation. --- pandas/io/sql.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 0b23c21b95260..2cb40db5828fb 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -969,6 +969,8 @@ def __init__( raise ValueError("Empty table name specified") def _drop_temporary_table(self): + """Drop a temporary table. Temporary tables are not in a database's meta data + and need to be dropped hard coded.""" if self.schema is None: query = f"DROP TABLE {self.name}" else: @@ -976,6 +978,8 @@ def _drop_temporary_table(self): self.pd_sql.execute(query) def _exists_temporary(self): + """Check if a temporary table exists. Temporary tables are not in a database's + meta data. The existence is duck tested by a SELECT statement.""" from sqlalchemy.exc import ( OperationalError, ProgrammingError, From 35a6394d84df1cc64b221ccb752a8f2dc79bf258 Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Tue, 26 Nov 2024 14:17:52 +0100 Subject: [PATCH 12/23] Fix typo in NDFrame.to_sql docstring. --- pandas/core/generic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/core/generic.py b/pandas/core/generic.py index 2a3a628bd8eff..ba369dc4a2aed 100644 --- a/pandas/core/generic.py +++ b/pandas/core/generic.py @@ -2845,7 +2845,7 @@ def to_sql( Details and a sample callable implementation can be found in the section :ref:`insert method `. 
- prefixs : sequence, optional + prefixes : sequence, optional A list of strings to insert after CREATE in the CREATE TABLE statement. They will be separated by spaces. From 145d18cc91bd259b3e706f3fffe65f2ed280b8fb Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Tue, 26 Nov 2024 14:57:27 +0100 Subject: [PATCH 13/23] Add prefixes parameter in to_sql subclass. --- pandas/io/sql.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 2cb40db5828fb..a8e209043f5a6 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -1538,6 +1538,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, + prefixes: Sequence[str] | None = None, engine: str = "auto", **engine_kwargs, ) -> int | None: From eddf687da131899f8025a56e3d52cae74a182c0e Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Tue, 26 Nov 2024 15:20:20 +0100 Subject: [PATCH 14/23] Add prefixes parameter to ADBC subclass method to_sql. --- pandas/io/sql.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pandas/io/sql.py b/pandas/io/sql.py index a8e209043f5a6..8bdda4a079d77 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -2362,6 +2362,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, + prefixes: Sequence[str] | None = None, engine: str = "auto", **engine_kwargs, ) -> int | None: @@ -2391,6 +2392,9 @@ def to_sql( Raises NotImplementedError method : {None', 'multi', callable}, default None Raises NotImplementedError + prefixes : sequence, optional + A list of strings to insert after CREATE in the CREATE TABLE statement. + They will be separated by spaces.
engine : {'auto', 'sqlalchemy'}, default 'auto' Raises NotImplementedError if not set to 'auto' """ From 3190142ad7b006cad4548e3fd94fc4c1408e2ec1 Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Tue, 26 Nov 2024 15:38:09 +0100 Subject: [PATCH 15/23] Disable case sensitivity check for temporary tables. --- pandas/io/sql.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 8bdda4a079d77..4e9fa5e41c93b 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -2084,7 +2084,9 @@ def to_sql( **engine_kwargs, ) - self.check_case_sensitive(name=name, schema=schema) + # only check case sensitivity for non temporary tables + if not table.is_temporary: + self.check_case_sensitive(name=name, schema=schema) return total_inserted @property From 8a0611dffcf807bf45dda9473f32093d28ec9000 Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Wed, 27 Nov 2024 12:56:03 +0100 Subject: [PATCH 16/23] [WIP] Add support for adbc driver. --- pandas/io/sql.py | 45 +++++++++++++++++++++++++++++++++++-- pandas/tests/io/test_sql.py | 6 ++--- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 4e9fa5e41c93b..6533db346ca27 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -2415,6 +2415,11 @@ def to_sql( "engine != 'auto' not implemented for ADBC drivers" ) + # check if the table to be created is a temporary table + temporary = prefixes is not None and "TEMPORARY".casefold() in [ + prefix.casefold() for prefix in prefixes + ] + if schema: table_name = f"{schema}.{name}" else: @@ -2425,7 +2430,14 @@ def to_sql( # as applicable modes, so the semantics get blurred across # the libraries mode = "create" - if self.has_table(name, schema): + + # for temporary tables use duck testing for existence check + if temporary: + exists = self._has_table_temporary(name, schema) + else: + exists = self.has_table(name, schema) + + if exists: if if_exists == "fail": raise ValueError(f"Table 
'{table_name}' already exists.") elif if_exists == "replace": @@ -2443,12 +2455,41 @@ def to_sql( with self.con.cursor() as cur: total_inserted = cur.adbc_ingest( - table_name=name, data=tbl, mode=mode, db_schema_name=schema + table_name=name, + data=tbl, + mode=mode, + db_schema_name=schema, + temporary=temporary, ) self.con.commit() return total_inserted + def _has_table_temporary(self, name: str, schema: str | None = None) -> bool: + """Check if a temporary table exists. Temporary tables are not in a database's + meta data. The existence is duck tested by a SELECT statement.""" + from adbc_driver_manager import ProgrammingError + + # sqlite doesn't allow a rollback at this point + rollback = ( + True if not self.con.adbc_get_info()["vendor_name"] == "SQLite" else False + ) + + if schema is None: + query = f"SELECT * FROM {name} LIMIT 1" + else: + query = f"SELECT * FROM {schema}.{name} LIMIT 1" + try: + with self.con.cursor() as cur: + cur.execute(query) + return True + except ProgrammingError: + if rollback: + # Some DBMS (e.g. 
postgres) require a rollback after a caught exception + with self.con.cursor() as cur: + cur.execute("rollback") + return False + def has_table(self, name: str, schema: str | None = None) -> bool: meta = self.con.adbc_get_objects( db_schema_filter=schema, table_name_filter=name diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index ecccceafad516..be9c90cdc96ef 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -4462,7 +4462,7 @@ def test_exists_temporary_table(conn, test_frame1, request): assert True if table.exists() else False -@pytest.mark.parametrize("conn", sqlalchemy_connectable_default_pool) +@pytest.mark.parametrize("conn", sqlalchemy_connectable_default_pool + adbc_connectable) def test_to_sql_temporary_table_replace(conn, test_frame1, request): conn = request.getfixturevalue(conn) @@ -4487,7 +4487,7 @@ def test_to_sql_temporary_table_replace(conn, test_frame1, request): assert_frame_equal(test_frame1, df_test) -@pytest.mark.parametrize("conn", sqlalchemy_connectable_default_pool) +@pytest.mark.parametrize("conn", sqlalchemy_connectable_default_pool + adbc_connectable) def test_to_sql_temporary_table_fail(conn, test_frame1, request): conn = request.getfixturevalue(conn) @@ -4509,7 +4509,7 @@ def test_to_sql_temporary_table_fail(conn, test_frame1, request): ) -@pytest.mark.parametrize("conn", sqlalchemy_connectable_default_pool) +@pytest.mark.parametrize("conn", sqlalchemy_connectable_default_pool + adbc_connectable) def test_to_sql_temporary_table_append(conn, test_frame1, request): conn = request.getfixturevalue(conn) From 522f8428fdaa6d3013700dd875d027122cc58ca9 Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Wed, 27 Nov 2024 14:11:53 +0100 Subject: [PATCH 17/23] Fix mypy unsupported operand types error. 
--- pandas/tests/io/test_sql.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index be9c90cdc96ef..f793f03a2ffd6 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -1061,6 +1061,8 @@ def sqlite_buildin_types(sqlite_buildin, types_data): sqlalchemy_connectable_types + ["sqlite_buildin_types"] + adbc_connectable_types ) +temporary_connectable = sqlalchemy_connectable_default_pool + adbc_connectable + @pytest.mark.parametrize("conn", all_connectable) def test_dataframe_to_sql(conn, test_frame1, request): @@ -4462,7 +4464,7 @@ def test_exists_temporary_table(conn, test_frame1, request): assert True if table.exists() else False -@pytest.mark.parametrize("conn", sqlalchemy_connectable_default_pool + adbc_connectable) +@pytest.mark.parametrize("conn", temporary_connectable) def test_to_sql_temporary_table_replace(conn, test_frame1, request): conn = request.getfixturevalue(conn) @@ -4487,7 +4489,7 @@ def test_to_sql_temporary_table_replace(conn, test_frame1, request): assert_frame_equal(test_frame1, df_test) -@pytest.mark.parametrize("conn", sqlalchemy_connectable_default_pool + adbc_connectable) +@pytest.mark.parametrize("conn", temporary_connectable) def test_to_sql_temporary_table_fail(conn, test_frame1, request): conn = request.getfixturevalue(conn) @@ -4509,7 +4511,7 @@ def test_to_sql_temporary_table_fail(conn, test_frame1, request): ) -@pytest.mark.parametrize("conn", sqlalchemy_connectable_default_pool + adbc_connectable) +@pytest.mark.parametrize("conn", temporary_connectable) def test_to_sql_temporary_table_append(conn, test_frame1, request): conn = request.getfixturevalue(conn) From efad6b4c7ec32a4e3b83bbbb238484520e58c1fc Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Tue, 28 Jan 2025 21:21:21 +0100 Subject: [PATCH 18/23] Replace prefixes parameter with temporary. 
--- pandas/core/generic.py | 9 +++-- pandas/io/sql.py | 70 ++++++++++++++++--------------------- pandas/tests/io/test_sql.py | 14 ++++---- 3 files changed, 42 insertions(+), 51 deletions(-) diff --git a/pandas/core/generic.py b/pandas/core/generic.py index 6de8c91bca528..725dee48236e7 100644 --- a/pandas/core/generic.py +++ b/pandas/core/generic.py @@ -2788,7 +2788,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, - prefixes: Sequence[str] | None = None, + temporary: bool = False, ) -> int | None: """ Write records stored in a DataFrame to a SQL database. @@ -2845,9 +2845,8 @@ def to_sql( Details and a sample callable implementation can be found in the section :ref:`insert method `. - prefixes : sequence, optional - A list of strings to insert after CREATE in the CREATE TABLE statement. - They will be separated by spaces. + temporary : bool, default False + Indicates if the created, replaced or appended table is temporary. Returns ------- @@ -3021,7 +3020,7 @@ def to_sql( chunksize=chunksize, dtype=dtype, method=method, - prefixes=prefixes, + temporary=temporary, ) @final diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 5396c22102c25..a9bcfa2d924e9 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -72,7 +72,6 @@ Generator, Iterator, Mapping, - Sequence, ) from sqlalchemy import Table @@ -745,7 +744,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, - prefixes: Sequence[str] | None = None, + temporary: bool = False, engine: str = "auto", **engine_kwargs, ) -> int | None: @@ -793,9 +792,8 @@ def to_sql( Details and a sample callable implementation can be found in the section :ref:`insert method `. - prefixes : sequence, optional - A list of strings to insert after CREATE in the CREATE TABLE statement. - They will be separated by spaces. 
+ temporary : bool, default False + Indicates if the created, replaced or appended table is temporary. engine : {'auto', 'sqlalchemy'}, default 'auto' SQL engine library to use. If 'auto', then the option ``io.sql.engine`` is used. The default ``io.sql.engine`` @@ -844,7 +842,7 @@ def to_sql( chunksize=chunksize, dtype=dtype, method=method, - prefixes=prefixes, + temporary=temporary, engine=engine, **engine_kwargs, ) @@ -938,7 +936,7 @@ def __init__( schema=None, keys=None, dtype: DtypeArg | None = None, - prefixes: Sequence[str] | None = None, + temporary: bool = False, ) -> None: self.name = name self.pd_sql = pandas_sql_engine @@ -949,11 +947,7 @@ def __init__( self.if_exists = if_exists self.keys = keys self.dtype = dtype - self.prefixes = prefixes - # check if the table to be created is a temporary table - self.is_temporary = self.prefixes is not None and "TEMPORARY".casefold() in [ - prefix.casefold() for prefix in self.prefixes - ] + self.temporary = temporary if frame is not None: # We want to initialize based on a dataframe @@ -1000,7 +994,7 @@ def _exists_temporary(self): return False def exists(self): - if self.is_temporary: + if self.temporary: return self._exists_temporary() else: return self.pd_sql.has_table(self.name, self.schema) @@ -1012,7 +1006,7 @@ def sql_schema(self) -> str: def _execute_create(self) -> None: # Inserting table into database, add to MetaData object - if not self.is_temporary: + if not self.temporary: # only insert into meta data, if table is not temporary self.table = self.table.to_metadata(self.pd_sql.meta) with self.pd_sql.run_transaction(): @@ -1023,7 +1017,7 @@ def create(self) -> None: if self.if_exists == "fail": raise ValueError(f"Table '{self.name}' already exists.") if self.if_exists == "replace": - if self.is_temporary: + if self.temporary: self._drop_temporary_table() else: self.pd_sql.drop_table(self.name, self.schema) @@ -1317,10 +1311,16 @@ def _create_table_setup(self): schema = self.schema or 
self.pd_sql.meta.schema + # check if table is temporary + if self.temporary: + prefixes = ["TEMPORARY"] + else: + prefixes = None + # At this point, attach to new metadata, only attach to self.meta # once table is created. meta = MetaData() - return Table(self.name, meta, *columns, schema=schema, prefixes=self.prefixes) + return Table(self.name, meta, *columns, schema=schema, prefixes=prefixes) def _harmonize_columns( self, @@ -1538,7 +1538,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, - prefixes: Sequence[str] | None = None, + temporary: bool = False, engine: str = "auto", **engine_kwargs, ) -> int | None: @@ -1923,7 +1923,7 @@ def prep_table( index_label=None, schema=None, dtype: DtypeArg | None = None, - prefixes: Sequence[str] | None = None, + temporary: bool = False, ) -> SQLTable: """ Prepares table in the database for data insertion. Creates it if needed, etc. @@ -1959,7 +1959,7 @@ def prep_table( index_label=index_label, schema=schema, dtype=dtype, - prefixes=prefixes, + temporary=temporary, ) table.create() return table @@ -2004,7 +2004,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, - prefixes: Sequence[str] | None = None, + temporary: bool = False, engine: str = "auto", **engine_kwargs, ) -> int | None: @@ -2046,9 +2046,8 @@ def to_sql( Details and a sample callable implementation can be found in the section :ref:`insert method `. - prefixes : sequence, optional - A list of strings to insert after CREATE in the CREATE TABLE statement. - They will be separated by spaces. + temporary : bool, default False + Indicates if the created, replaced or appended table is temporary. engine : {'auto', 'sqlalchemy'}, default 'auto' SQL engine library to use. If 'auto', then the option ``io.sql.engine`` is used. 
The default ``io.sql.engine`` @@ -2069,7 +2068,7 @@ def to_sql( index_label=index_label, schema=schema, dtype=dtype, - prefixes=prefixes, + temporary=temporary, ) total_inserted = sql_engine.insert_records( @@ -2085,7 +2084,7 @@ def to_sql( ) # only check case sensitivity for non temporary tables - if not table.is_temporary: + if not table.temporary: self.check_case_sensitive(name=name, schema=schema) return total_inserted @@ -2364,7 +2363,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, - prefixes: Sequence[str] | None = None, + temporary: bool = False, engine: str = "auto", **engine_kwargs, ) -> int | None: @@ -2394,9 +2393,8 @@ def to_sql( Raises NotImplementedError method : {None', 'multi', callable}, default None Raises NotImplementedError - prefixes : sequence, optional - A list of strings to insert after CREATE in the CREATE TABLE statement. - They will be separated by spaces. + temporary : bool, default False + Indicates if the created, replaced or appended table is temporary. engine : {'auto', 'sqlalchemy'}, default 'auto' Raises NotImplementedError if not set to 'auto' """ @@ -2415,11 +2413,6 @@ def to_sql( "engine != 'auto' not implemented for ADBC drivers" ) - # check if the table to be created is a temporary table - temporary = prefixes is not None and "TEMPORARY".casefold() in [ - prefix.casefold() for prefix in prefixes - ] - if schema: table_name = f"{schema}.{name}" else: @@ -2864,7 +2857,7 @@ def to_sql( chunksize: int | None = None, dtype: DtypeArg | None = None, method: Literal["multi"] | Callable | None = None, - prefixes: Sequence[str] | None = None, + temporary: bool = False, engine: str = "auto", **engine_kwargs, ) -> int | None: @@ -2905,9 +2898,8 @@ def to_sql( Details and a sample callable implementation can be found in the section :ref:`insert method `. - prefixes : sequence, optional - A list of strings to insert after CREATE in the CREATE TABLE statement. 
- They will be separated by spaces. + temporary : bool, default False + Indicates if the created, replaced or appended table is temporary. """ if dtype: if not is_dict_like(dtype): @@ -2933,7 +2925,7 @@ def to_sql( if_exists=if_exists, index_label=index_label, dtype=dtype, - prefixes=prefixes, + temporary=temporary, ) table.create() return table.insert(chunksize, method) diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index f793f03a2ffd6..df9fac70bcfa1 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -4456,7 +4456,7 @@ def test_exists_temporary_table(conn, test_frame1, request): frame=test_frame1, index=False, if_exists="fail", - prefixes=["TEMPORARY"], + temporary=True, ) table.create() @@ -4473,7 +4473,7 @@ def test_to_sql_temporary_table_replace(conn, test_frame1, request): con=conn, if_exists="fail", index=False, - prefixes=["TEMPORARY"], + temporary=True, ) test_frame1.to_sql( @@ -4481,7 +4481,7 @@ def test_to_sql_temporary_table_replace(conn, test_frame1, request): con=conn, if_exists="replace", index=False, - prefixes=["TEMPORARY"], + temporary=True, ) df_test = pd.read_sql("SELECT * FROM test_frame1", conn) @@ -4498,7 +4498,7 @@ def test_to_sql_temporary_table_fail(conn, test_frame1, request): con=conn, if_exists="fail", index=False, - prefixes=["TEMPORARY"], + temporary=True, ) with pytest.raises(ValueError, match=r"Table 'test_frame1' already exists."): @@ -4507,7 +4507,7 @@ def test_to_sql_temporary_table_fail(conn, test_frame1, request): con=conn, if_exists="fail", index=False, - prefixes=["TEMPORARY"], + temporary=True, ) @@ -4520,7 +4520,7 @@ def test_to_sql_temporary_table_append(conn, test_frame1, request): con=conn, if_exists="fail", index=False, - prefixes=["TEMPORARY"], + temporary=True, ) test_frame1.to_sql( @@ -4528,7 +4528,7 @@ def test_to_sql_temporary_table_append(conn, test_frame1, request): con=conn, if_exists="append", index=False, - prefixes=["TEMPORARY"], + temporary=True, ) df_test = 
pd.read_sql("SELECT * FROM test_frame1", conn) From 4da49dd0a13fd1184715725c3b6068691e999424 Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Wed, 5 Feb 2025 15:57:55 +0100 Subject: [PATCH 19/23] Add whatsnew entry. --- doc/source/whatsnew/v2.3.0.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/doc/source/whatsnew/v2.3.0.rst b/doc/source/whatsnew/v2.3.0.rst index 8bdddb5b7f85d..4070d70dde391 100644 --- a/doc/source/whatsnew/v2.3.0.rst +++ b/doc/source/whatsnew/v2.3.0.rst @@ -39,6 +39,9 @@ Other enhancements - :meth:`~Series.to_hdf` and :meth:`~DataFrame.to_hdf` now round-trip with ``StringDtype`` (:issue:`60663`) - The :meth:`~Series.cumsum`, :meth:`~Series.cummin`, and :meth:`~Series.cummax` reductions are now implemented for ``StringDtype`` columns when backed by PyArrow (:issue:`60633`) - The :meth:`~Series.sum` reduction is now implemented for ``StringDtype`` columns (:issue:`59853`) +- The ``to_sql`` method now supports a new parameter ``temporary: bool = False``. + Setting this parameter to ``True`` enables creating, appending or replacing temporary + tables via ``to_sql`` using ``sqlalchemy``, ``adbc`` and ``sqlite`` connectors. .. --------------------------------------------------------------------------- .. _whatsnew_230.notable_bug_fixes: From b0db943e120eeed1d50c1a27abb45085d439af43 Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Wed, 5 Feb 2025 16:00:50 +0100 Subject: [PATCH 20/23] Add issue number to whatsnew --- doc/source/whatsnew/v2.3.0.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/source/whatsnew/v2.3.0.rst b/doc/source/whatsnew/v2.3.0.rst index 4070d70dde391..e8630fd63b02b 100644 --- a/doc/source/whatsnew/v2.3.0.rst +++ b/doc/source/whatsnew/v2.3.0.rst @@ -42,6 +42,7 @@ Other enhancements - The ``to_sql`` method now supports a new parameter ``temporary: bool = False``. 
Setting this parameter to ``True`` enables creating, appending or replacing temporary tables via ``to_sql`` using ``sqlalchemy``, ``adbc`` and ``sqlite`` connectors. + (:issue:`60422`) .. --------------------------------------------------------------------------- .. _whatsnew_230.notable_bug_fixes: From cd51d7c6724c4d20f13bfbcbe178bc1cacfd6f77 Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Sat, 15 Feb 2025 20:02:21 +0100 Subject: [PATCH 21/23] Change error handling of _exists_temporary to a higher abstraction level. --- pandas/io/sql.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/pandas/io/sql.py b/pandas/io/sql.py index a9bcfa2d924e9..1c31b4554e22b 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -974,24 +974,22 @@ def _drop_temporary_table(self): def _exists_temporary(self): """Check if a temporary table exists. Temporary tables are not in a database's meta data. The existence is duck tested by a SELECT statement.""" - from sqlalchemy.exc import ( - OperationalError, - ProgrammingError, - ) + from sqlalchemy import text + from sqlalchemy.exc import DatabaseError if self.schema is None: query = f"SELECT * FROM {self.name} LIMIT 1" else: query = f"SELECT * FROM {self.schema}.{self.name} LIMIT 1" try: - _ = self.pd_sql.read_query(query) + _ = self.pd_sql.con.execute(text(query)) return True - except ProgrammingError: - # Some DBMS (e.g. postgres) require a rollback after a caught exception - self.pd_sql.execute("rollback") - return False - except OperationalError: - return False + except DatabaseError: + try: + self.pd_sql.con.execute(text("rollback")) + return False + except DatabaseError: + return False def exists(self): if self.temporary: From 5d306c26644cae12ffbaa6528c00bba7b9643d74 Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Sat, 15 Feb 2025 20:50:08 +0100 Subject: [PATCH 22/23] Use nested try except to handle rollbacks. 
--- pandas/io/sql.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 1c31b4554e22b..336f1a0acdbd5 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -985,6 +985,7 @@ def _exists_temporary(self): _ = self.pd_sql.con.execute(text(query)) return True except DatabaseError: + # Some DBMS (e.g. postgres) require a rollback after a caught exception try: self.pd_sql.con.execute(text("rollback")) return False @@ -2461,11 +2462,6 @@ def _has_table_temporary(self, name: str, schema: str | None = None) -> bool: meta data. The existence is duck tested by a SELECT statement.""" from adbc_driver_manager import ProgrammingError - # sqlite doesn't allow a rollback at this point - rollback = ( - True if not self.con.adbc_get_info()["vendor_name"] == "SQLite" else False - ) - if schema is None: query = f"SELECT * FROM {name} LIMIT 1" else: @@ -2475,11 +2471,13 @@ def _has_table_temporary(self, name: str, schema: str | None = None) -> bool: cur.execute(query) return True except ProgrammingError: - if rollback: - # Some DBMS (e.g. postgres) require a rollback after a caught exception + # Some DBMS (e.g. postgres) require a rollback after a caught exception + try: with self.con.cursor() as cur: cur.execute("rollback") - return False + return False + except ProgrammingError: + return False def has_table(self, name: str, schema: str | None = None) -> bool: meta = self.con.adbc_get_objects( From 8cafc4493cdbff18a3528bfb02ca0a4981407f39 Mon Sep 17 00:00:00 2001 From: Diadochokinetic Date: Sat, 15 Feb 2025 21:43:10 +0100 Subject: [PATCH 23/23] Nested try except blocks don't work with adbc. 
--- pandas/io/sql.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pandas/io/sql.py b/pandas/io/sql.py index 336f1a0acdbd5..c778f7997b653 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -2462,6 +2462,11 @@ def _has_table_temporary(self, name: str, schema: str | None = None) -> bool: meta data. The existence is duck tested by a SELECT statement.""" from adbc_driver_manager import ProgrammingError + # sqlite doesn't allow a rollback at this point + rollback = ( + True if not self.con.adbc_get_info()["vendor_name"] == "SQLite" else False + ) + if schema is None: query = f"SELECT * FROM {name} LIMIT 1" else: @@ -2471,13 +2476,11 @@ def _has_table_temporary(self, name: str, schema: str | None = None) -> bool: cur.execute(query) return True except ProgrammingError: - # Some DBMS (e.g. postgres) require a rollback after a caught exception - try: + if rollback: + # Some DBMS (e.g. postgres) require a rollback after a caught exception with self.con.cursor() as cur: cur.execute("rollback") - return False - except ProgrammingError: - return False + return False def has_table(self, name: str, schema: str | None = None) -> bool: meta = self.con.adbc_get_objects(