encoding.py: write_embeddings_batch accepts commit parameter for transactional composition

Adds an optional commit=True parameter to write_embeddings_batch. When True
(default, matching prior behavior), the function commits the connection
after the per-row UPSERT loop. When False, the caller manages the
transaction.
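
A minimal sketch of the two modes, assuming a stand-in connection that only counts commits. CountingConn and write_batch_sketch are hypothetical names used for illustration; the real helper runs an UPSERT per row against Postgres.

```python
class CountingConn:
    """Hypothetical stand-in for a DB connection; it only records commits."""

    def __init__(self) -> None:
        self.commits = 0
        self.rows: list[dict] = []

    def commit(self) -> None:
        self.commits += 1


def write_batch_sketch(conn: CountingConn, batch: list[dict],
                       commit: bool = True) -> int:
    """Illustrative analogue of write_embeddings_batch, not the real code."""
    if not batch:
        return 0
    for row in batch:
        conn.rows.append(row)  # the real helper executes an UPSERT here
    if commit:
        conn.commit()
    return len(batch)


conn = CountingConn()
write_batch_sketch(conn, [{"id": "a"}])                # default: helper commits
write_batch_sketch(conn, [{"id": "b"}], commit=False)  # caller commits later
print(conn.commits)
```

Only the first call commits; after the second call the row is written but the transaction is still open, so the caller decides when it lands.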

This unblocks fix #1 (pgvector-bypass paths) and fix #2 (watcher
two-transaction pattern), both of which need to compose embedding writes
with other database writes in the same transaction. Without this
parameter, either fix would require duplicating the UPSERT logic outside
this helper or introducing a second commit boundary inside an otherwise
atomic operation.
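
The composition described above might look roughly like the following sketch. It uses an in-memory SQLite database as a stand-in for Postgres so that it runs on its own; the documents table, write_embeddings_sketch, and index_document are hypothetical and do not come from this repository.

```python
import json
import sqlite3


def write_embeddings_sketch(conn: sqlite3.Connection, batch: list[dict],
                            commit: bool = True) -> int:
    """Hypothetical SQLite analogue of write_embeddings_batch."""
    if not batch:
        return 0
    for row in batch:
        conn.execute(
            "INSERT INTO embeddings (id, document, metadata) VALUES (?, ?, ?) "
            "ON CONFLICT(id) DO UPDATE SET document = excluded.document, "
            "metadata = excluded.metadata",
            (row["id"], row["document"], json.dumps(row["metadata"])),
        )
    if commit:
        conn.commit()
    return len(batch)


def index_document(conn: sqlite3.Connection, doc_id: str,
                   batch: list[dict]) -> None:
    """Write a documents row and its embeddings as one atomic unit."""
    try:
        conn.execute("INSERT INTO documents (id) VALUES (?)", (doc_id,))
        write_embeddings_sketch(conn, batch, commit=False)
        conn.commit()  # the single commit boundary, owned by the caller
    except Exception:
        conn.rollback()  # undoes the documents row and any embeddings rows
        raise


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE documents (id TEXT PRIMARY KEY)")
conn.execute(
    "CREATE TABLE embeddings "
    "(id TEXT PRIMARY KEY, document TEXT NOT NULL, metadata TEXT)"
)
conn.commit()
index_document(conn, "doc-1",
               [{"id": "e1", "document": "hello", "metadata": {}}])
```

Because the helper does not commit here, a failure in either write rolls back both, which is the property a second commit boundary inside the helper would break.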

No behavior change for existing callers: they all rely on the default
commit=True and continue working unchanged.
2026-05-05 02:52:33 +00:00
parent b09e35892c
commit 5b4a299414
+9 -3
@@ -202,8 +202,8 @@ def chunk_and_embed(text: str,
     return rows
 
 
-def write_embeddings_batch(conn, batch: list[dict]) -> int:
-    """Single canonical INSERT. Sets created_at = NOW() server-side. Commits.
+def write_embeddings_batch(conn, batch: list[dict], commit: bool = True) -> int:
+    """Single canonical INSERT. Sets created_at = NOW() server-side.
 
     Every row dict must supply 'type'. created_at is SQL-supplied (NOW()), so
     callers do not need to provide it. The application-layer assertion is the
@@ -211,6 +211,11 @@ def write_embeddings_batch(conn, batch: list[dict]) -> int:
     historical NULLs were resolved by the Improvement #2 backfill, and a
     Python-level raise gives a faster, more debuggable failure than a
     Postgres constraint error.
+
+    When commit=True (default), this function commits the connection itself.
+    When commit=False, the caller is responsible for committing. Use
+    commit=False when composing this write with other writes that must land
+    atomically in the same transaction.
     """
     if not batch:
         return 0
@@ -233,5 +238,6 @@ def write_embeddings_batch(conn, batch: list[dict]) -> int:
                     metadata = EXCLUDED.metadata
             """, (row["id"], row["document"], row["embedding"],
                   row["source"], row["type"], json.dumps(row["metadata"])))
-    conn.commit()
+    if commit:
+        conn.commit()
     return len(batch)