Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/source/whatsnew/v3.0.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ All warnings for upcoming changes in pandas will have the base class :class:`pan

Other enhancements
^^^^^^^^^^^^^^^^^^
- :func:`DataFrame.to_sql` now accepts a ``hints`` parameter to pass database-specific query hints for optimizing insert performance. The hints are specified as a dictionary mapping dialect names to hint strings (e.g., ``{'oracle': '/*+ APPEND PARALLEL(4) */', 'mysql': 'DELAYED'}``). Users are responsible for providing correctly formatted hint strings for their target database (:issue:`61370`)
- :func:`pandas.merge` propagates the ``attrs`` attribute to the result if all
inputs have identical ``attrs``, as has so far already been the case for
:func:`pandas.concat`.
Expand Down
17 changes: 17 additions & 0 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2798,6 +2798,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
hints: dict[str, str] | None = None,
) -> int | None:
"""
Write records stored in a DataFrame to a SQL database.
Expand Down Expand Up @@ -2861,6 +2862,21 @@ def to_sql(

Details and a sample callable implementation can be found in the
section :ref:`insert method <io.sql.method>`.
hints : dict[str, str], optional
Dictionary of SQL hints to optimize insertion performance, keyed by
database dialect name (e.g., 'oracle', 'mysql', 'postgresql', 'mssql').
Each value should be a complete hint string formatted exactly as required
by the target database. The user is responsible for providing correctly
formatted hint strings.

Examples: ``{'oracle': '/*+ APPEND PARALLEL(4) */', 'mysql': 'DELAYED'}``

.. note::
- Hints are database-specific and ignored for unsupported dialects.
- SQLite raises a ``UserWarning`` (hints not supported).
- ADBC connections raise ``NotImplementedError``.

.. versionadded:: 3.0.0

Returns
-------
Expand Down Expand Up @@ -3044,6 +3060,7 @@ def to_sql(
chunksize=chunksize,
dtype=dtype,
method=method,
hints=hints,
)

@final
Expand Down
108 changes: 96 additions & 12 deletions pandas/io/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
datetime,
time,
)
from functools import partial
import re
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -235,6 +234,18 @@ def _wrap_result_adbc(
return df


def _process_sql_hints(hints: dict[str, str] | None, dialect_name: str) -> str | None:
if hints is None:
return None

dialect_name = dialect_name.lower()
for key, value in hints.items():
if key.lower() == dialect_name:
return value

return None


# -----------------------------------------------------------------------------
# -- Read and write to DataFrames

Expand Down Expand Up @@ -753,6 +764,7 @@ def to_sql(
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
engine: str = "auto",
hints: dict[str, str] | None = None,
**engine_kwargs,
) -> int | None:
"""
Expand Down Expand Up @@ -813,6 +825,23 @@ def to_sql(

.. versionadded:: 1.3.0

hints : dict[str, str], optional
SQL hints to optimize insertion performance, keyed by database dialect name.
Each hint should be a complete string formatted exactly as required by the
target database. The user is responsible for constructing dialect-specific
syntax.

Examples: ``{'oracle': '/*+ APPEND PARALLEL(4) */'}``
``{'mysql': 'DELAYED'}``
``{'mssql': 'WITH (TABLOCK)'}``

.. note::
- Hints are database-specific and will be ignored for unsupported dialects
- SQLite will raise a UserWarning (hints not supported)
- ADBC connections will raise NotImplementedError

.. versionadded:: 3.0.0

**engine_kwargs
Any additional kwargs are passed to the engine.

Expand Down Expand Up @@ -855,6 +884,7 @@ def to_sql(
dtype=dtype,
method=method,
engine=engine,
hints=hints,
**engine_kwargs,
)

Expand Down Expand Up @@ -1004,7 +1034,13 @@ def create(self) -> None:
else:
self._execute_create()

def _execute_insert(self, conn, keys: list[str], data_iter) -> int:
def _execute_insert(
self,
conn,
keys: list[str],
data_iter,
hint_str: str | None = None,
) -> int:
"""
Execute SQL statement inserting data

Expand All @@ -1016,11 +1052,23 @@ def _execute_insert(self, conn, keys: list[str], data_iter) -> int:
data_iter : generator of list
Each item contains a list of values to be inserted
"""
data = [dict(zip(keys, row, strict=True)) for row in data_iter]
result = self.pd_sql.execute(self.table.insert(), data)
data = [dict(zip(keys, row, strict=False)) for row in data_iter]

if hint_str:
stmt = self.table.insert().prefix_with(hint_str)
else:
stmt = self.table.insert()

result = self.pd_sql.execute(stmt, data)
return result.rowcount

def _execute_insert_multi(self, conn, keys: list[str], data_iter) -> int:
def _execute_insert_multi(
self,
conn,
keys: list[str],
data_iter,
hint_str: str | None = None,
) -> int:
"""
Alternative to _execute_insert for DBs support multi-value INSERT.

Expand All @@ -1029,11 +1077,15 @@ def _execute_insert_multi(self, conn, keys: list[str], data_iter) -> int:
but performance degrades quickly with increase of columns.

"""

from sqlalchemy import insert

data = [dict(zip(keys, row, strict=True)) for row in data_iter]
stmt = insert(self.table).values(data)
data = [dict(zip(keys, row, strict=False)) for row in data_iter]

if hint_str:
stmt = insert(self.table).values(data).prefix_with(hint_str)
else:
stmt = insert(self.table).values(data)

result = self.pd_sql.execute(stmt)
return result.rowcount

Expand Down Expand Up @@ -1090,14 +1142,20 @@ def insert(
self,
chunksize: int | None = None,
method: Literal["multi"] | Callable | None = None,
hints: dict[str, str] | None = None,
dialect_name: str | None = None,
) -> int | None:
# set insert method
if method is None:
exec_insert = self._execute_insert
elif method == "multi":
exec_insert = self._execute_insert_multi
elif callable(method):
exec_insert = partial(method, self)

def callable_wrapper(conn, keys, data_iter, hint_str=None):
return method(self, conn, keys, data_iter)

exec_insert = callable_wrapper
else:
raise ValueError(f"Invalid parameter `method`: {method}")

Expand All @@ -1114,6 +1172,9 @@ def insert(
raise ValueError("chunksize argument should be non-zero")

chunks = (nrows // chunksize) + 1

hint_str = _process_sql_hints(hints, dialect_name) if dialect_name else None

total_inserted = None
with self.pd_sql.run_transaction() as conn:
for i in range(chunks):
Expand All @@ -1125,7 +1186,7 @@ def insert(
chunk_iter = zip(
*(arr[start_i:end_i] for arr in data_list), strict=True
)
num_inserted = exec_insert(conn, keys, chunk_iter)
num_inserted = exec_insert(conn, keys, chunk_iter, hint_str)
# GH 46891
if num_inserted is not None:
if total_inserted is None:
Expand Down Expand Up @@ -1509,6 +1570,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
hints: dict[str, str] | None = None,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -1545,6 +1607,8 @@ def insert_records(
schema=None,
chunksize: int | None = None,
method=None,
hints: dict[str, str] | None = None,
dialect_name: str | None = None,
**engine_kwargs,
) -> int | None:
"""
Expand All @@ -1569,6 +1633,8 @@ def insert_records(
schema=None,
chunksize: int | None = None,
method=None,
hints: dict[str, str] | None = None,
dialect_name: str | None = None,
**engine_kwargs,
) -> int | None:
from sqlalchemy import exc
Expand Down Expand Up @@ -1980,6 +2046,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
hints: dict[str, str] | None = None,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -2053,6 +2120,8 @@ def to_sql(
schema=schema,
chunksize=chunksize,
method=method,
hints=hints,
dialect_name=self.con.dialect.name,
**engine_kwargs,
)

Expand Down Expand Up @@ -2344,6 +2413,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
hints: dict[str, str] | None = None,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -2394,6 +2464,8 @@ def to_sql(
raise NotImplementedError(
"engine != 'auto' not implemented for ADBC drivers"
)
if hints:
raise NotImplementedError("'hints' is not implemented for ADBC drivers")

if schema:
table_name = f"{schema}.{name}"
Expand Down Expand Up @@ -2575,7 +2647,9 @@ def insert_statement(self, *, num_rows: int) -> str:
)
return insert_statement

def _execute_insert(self, conn, keys, data_iter) -> int:
def _execute_insert(
self, conn, keys: list[str], data_iter, hint_str: str | None = None
) -> int:
from sqlite3 import Error

data_list = list(data_iter)
Expand All @@ -2585,7 +2659,9 @@ def _execute_insert(self, conn, keys, data_iter) -> int:
raise DatabaseError("Execution failed") from exc
return conn.rowcount

def _execute_insert_multi(self, conn, keys, data_iter) -> int:
def _execute_insert_multi(
self, conn, keys: list[str], data_iter, hint_str: str | None = None
) -> int:
data_list = list(data_iter)
flattened_data = [x for row in data_list for x in row]
conn.execute(self.insert_statement(num_rows=len(data_list)), flattened_data)
Expand Down Expand Up @@ -2821,6 +2897,7 @@ def to_sql(
chunksize: int | None = None,
dtype: DtypeArg | None = None,
method: Literal["multi"] | Callable | None = None,
hints: dict[str, str] | None = None,
engine: str = "auto",
**engine_kwargs,
) -> int | None:
Expand Down Expand Up @@ -2863,6 +2940,13 @@ def to_sql(
Details and a sample callable implementation can be found in the
section :ref:`insert method <io.sql.method>`.
"""
if hints:
warnings.warn(
"SQL hints are not supported for SQLite and will be ignored.",
UserWarning,
stacklevel=find_stack_level(),
)

if dtype:
if not is_dict_like(dtype):
# error: Value expression in dictionary comprehension has incompatible
Expand Down
Loading
Loading