7 changes: 7 additions & 0 deletions docs/changelog.rst
@@ -4,6 +4,13 @@
Changelog
===========

.. _unreleased:

Unreleased
----------

- The ``table.insert_all()`` and ``table.upsert_all()`` methods can now accept an iterator of lists or tuples as an alternative to an iterator of dictionaries. The first item should be a list or tuple of column names. See :ref:`python_api_insert_lists` for details. (:issue:`672`)

.. _v4_0a0:

4.0a0 (2025-05-08)
29 changes: 29 additions & 0 deletions docs/python-api.rst
@@ -810,6 +810,35 @@ You can delete all the existing rows in the table before inserting the new records

Pass ``analyze=True`` to run ``ANALYZE`` against the table after inserting the new records.

.. _python_api_insert_lists:

Inserting data from a list or tuple iterator
--------------------------------------------

As an alternative to passing an iterator of dictionaries, you can pass an iterator of lists or tuples. The first item yielded by the iterator must be a list or tuple of string column names, and subsequent items should be lists or tuples of values:

.. code-block:: python

db["creatures"].insert_all([
["name", "species"],
["Cleo", "dog"],
["Lila", "chicken"],
["Bants", "chicken"],
])
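
Rows are matched to the column names by position: rows shorter than the column list are padded with ``None``, and any values beyond the named columns are ignored. A quick sketch of that behavior:

.. code-block:: python

    db["creatures"].insert_all([
        ["name", "species"],
        ["Cleo", "dog"],
        ["Nibbles"],  # stored as name="Nibbles", species=None
    ])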

This also works with generators:

.. code-block:: python

    def creatures():
        yield "id", "name", "city"
        yield 1, "Cleo", "San Francisco"
        yield 2, "Lila", "Los Angeles"

    db["creatures"].insert_all(creatures())

Tuples and lists are both supported.
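
``table.upsert_all()`` accepts the same list-based format. A minimal sketch, assuming an ``id`` column to act as the required primary key:

.. code-block:: python

    db["creatures"].upsert_all([
        ["id", "name", "species"],
        [1, "Cleo", "dog"],
        [2, "Lila", "chicken"],
    ], pk="id")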

.. _python_api_insert_replace:

Insert-replacing data
192 changes: 146 additions & 46 deletions sqlite_utils/db.py
@@ -31,6 +31,7 @@
Dict,
Generator,
Iterable,
Sequence,
Union,
Optional,
List,
@@ -3010,6 +3011,7 @@ def build_insert_queries_and_params(
num_records_processed,
replace,
ignore,
list_mode=False,
):
"""
Given a list ``chunk`` of records that should be written to *this* table,
@@ -3024,24 +3026,47 @@
# Build a row-list ready for executemany-style flattening
values = []

for record in chunk:
record_values = []
for key in all_columns:
value = jsonify_if_needed(
record.get(
key,
(
None
if key != hash_id
else hash_record(record, hash_id_columns)
),
if list_mode:
# In list mode, records are already lists of values
num_columns = len(all_columns)
has_extracts = bool(extracts)
for record in chunk:
# Pad short records with None, truncate long ones
record_len = len(record)
if record_len < num_columns:
record_values = [jsonify_if_needed(v) for v in record] + [None] * (
num_columns - record_len
)
)
if key in extracts:
extract_table = extracts[key]
value = self.db[extract_table].lookup({"value": value})
record_values.append(value)
values.append(record_values)
else:
record_values = [jsonify_if_needed(v) for v in record[:num_columns]]
# Only process extracts if there are any
if has_extracts:
for i, key in enumerate(all_columns):
if key in extracts:
record_values[i] = self.db[extracts[key]].lookup(
{"value": record_values[i]}
)
values.append(record_values)
else:
# Dict mode: original logic
for record in chunk:
record_values = []
for key in all_columns:
value = jsonify_if_needed(
record.get(
key,
(
None
if key != hash_id
else hash_record(record, hash_id_columns)
),
)
)
if key in extracts:
extract_table = extracts[key]
value = self.db[extract_table].lookup({"value": value})
record_values.append(value)
values.append(record_values)

columns_sql = ", ".join(f"[{c}]" for c in all_columns)
placeholder_expr = ", ".join(conversions.get(c, "?") for c in all_columns)
@@ -3157,6 +3182,7 @@ def insert_chunk(
num_records_processed,
replace,
ignore,
list_mode=False,
) -> Optional[sqlite3.Cursor]:
queries_and_params = self.build_insert_queries_and_params(
extracts,
@@ -3171,6 +3197,7 @@
num_records_processed,
replace,
ignore,
list_mode,
)
result = None
with self.db.conn:
@@ -3200,6 +3227,7 @@
num_records_processed,
replace,
ignore,
list_mode,
)

result = self.insert_chunk(
@@ -3216,6 +3244,7 @@
num_records_processed,
replace,
ignore,
list_mode,
)

else:
@@ -3293,7 +3322,10 @@ def insert(

def insert_all(
self,
records,
records: Union[
Iterable[Dict[str, Any]],
Iterable[Sequence[Any]],
],
pk=DEFAULT,
foreign_keys=DEFAULT,
column_order=DEFAULT,
@@ -3353,17 +3385,54 @@ def insert_all(
all_columns = []
first = True
num_records_processed = 0
# Fix up any records with square braces in the column names
records = fix_square_braces(records)
# We can only handle a max of 999 variables in a SQL insert, so
# we need to adjust the batch_size down if we have too many cols
records = iter(records)
# Peek at first record to count its columns:

# Detect if we're using list-based iteration or dict-based iteration
list_mode = False
column_names: List[str] = []

# Square braces in column names are only fixed up in dict mode, below;
# list mode uses the column names exactly as provided
records_iter = iter(records)

# Peek at first record to determine mode:
try:
first_record = next(records)
first_record = next(records_iter)
except StopIteration:
return self # It was an empty list
num_columns = len(first_record.keys())

# Check if this is list mode or dict mode
if isinstance(first_record, (list, tuple)):
# List/tuple mode: first record should be column names
list_mode = True
if not all(isinstance(col, str) for col in first_record):
raise ValueError(
"When using list-based iteration, the first yielded value must be a list of column name strings"
)
column_names = list(first_record)
all_columns = column_names
num_columns = len(column_names)
# Get the actual first data record
try:
first_record = next(records_iter)
except StopIteration:
return self # Only headers, no data
if not isinstance(first_record, (list, tuple)):
raise ValueError(
"After column names list, all subsequent records must also be lists"
)
else:
# Dict mode: traditional behavior
records_iter = itertools.chain([first_record], records_iter)
records_iter = fix_square_braces(
cast(Iterable[Dict[str, Any]], records_iter)
)
try:
first_record = next(records_iter)
except StopIteration:
return self
first_record = cast(Dict[str, Any], first_record)
num_columns = len(first_record.keys())

assert (
num_columns <= SQLITE_MAX_VARS
), "Rows can have a maximum of {} columns".format(SQLITE_MAX_VARS)
@@ -3373,13 +3442,18 @@
if truncate and self.exists():
self.db.execute("DELETE FROM [{}];".format(self.name))
result = None
for chunk in chunks(itertools.chain([first_record], records), batch_size):
for chunk in chunks(itertools.chain([first_record], records_iter), batch_size):
chunk = list(chunk)
num_records_processed += len(chunk)
if first:
if not self.exists():
# Use the first batch to derive the column types
column_types = suggest_column_types(chunk)
if list_mode:
# Convert list records to dicts for type detection
chunk_as_dicts = [dict(zip(column_names, row)) for row in chunk]
column_types = suggest_column_types(chunk_as_dicts)
else:
column_types = suggest_column_types(chunk)
if extracts:
for col in extracts:
if col in column_types:
@@ -3399,17 +3473,24 @@
extracts=extracts,
strict=strict,
)
all_columns_set = set()
for record in chunk:
all_columns_set.update(record.keys())
all_columns = list(sorted(all_columns_set))
if hash_id:
all_columns.insert(0, hash_id)
if list_mode:
# In list mode, columns are already known
all_columns = list(column_names)
if hash_id:
all_columns.insert(0, hash_id)
else:
all_columns_set = set()
for record in chunk:
all_columns_set.update(record.keys())
all_columns = list(sorted(all_columns_set))
if hash_id:
all_columns.insert(0, hash_id)
else:
for record in chunk:
all_columns += [
column for column in record if column not in all_columns
]
if not list_mode:
for record in chunk:
all_columns += [
column for column in record if column not in all_columns
]

first = False

Expand All @@ -3427,6 +3508,7 @@ def insert_all(
num_records_processed,
replace,
ignore,
list_mode,
)

# If we only handled a single row populate self.last_pk
@@ -3447,14 +3529,29 @@
self.last_pk = self.last_rowid
else:
# For an upsert use first_record from earlier
if hash_id:
self.last_pk = hash_record(first_record, hash_id_columns)
if list_mode:
# In list mode, look up pk value by column index
first_record_list = cast(Sequence[Any], first_record)
if hash_id:
# hash_id is not supported in list mode, so last_pk is left unset
pass
elif isinstance(pk, str):
pk_index = column_names.index(pk)
self.last_pk = first_record_list[pk_index]
else:
self.last_pk = tuple(
first_record_list[column_names.index(p)] for p in pk
)
else:
self.last_pk = (
first_record[pk]
if isinstance(pk, str)
else tuple(first_record[p] for p in pk)
)
first_record_dict = cast(Dict[str, Any], first_record)
if hash_id:
self.last_pk = hash_record(first_record_dict, hash_id_columns)
else:
self.last_pk = (
first_record_dict[pk]
if isinstance(pk, str)
else tuple(first_record_dict[p] for p in pk)
)

if analyze:
self.analyze()
@@ -3501,7 +3598,10 @@ def upsert(

def upsert_all(
self,
records,
records: Union[
Iterable[Dict[str, Any]],
Iterable[Sequence[Any]],
],
pk=DEFAULT,
foreign_keys=DEFAULT,
column_order=DEFAULT,