.PHONY: lint
-lint: ##H @General Run ruff lint
+lint: ##H @General Run lint checks (ruff, pylint, mypy)
	ruff check src/git_sqlite_filter test
+	pylint src/git_sqlite_filter test
+	mypy src/git_sqlite_filter test
.PHONY: test
test: ##H @General Run tests w/ coverage report
build==1.2.1
coverage==7.4.4
isort==5.13.2
+mypy>=1.14.1
+pylint==4.0.4
pytest==8.1.1
pytest-cov==5.0.0
ruff==0.3.4
            return True
        return False
+
+    def _find_shadow_tables(self):
+        """Identify actual FTS shadow tables by scanning virtual table definitions."""
+        shadow_tables = set()
+        # Scan for FTS3/4/5 tables
+        vtabs = self.conn.execute(
+            "SELECT name, sql FROM sqlite_master WHERE sql LIKE '%VIRTUAL TABLE%USING fts%'"
+        ).fetchall()
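+        # Note: LIKE is case-insensitive for ASCII in SQLite, so '%USING fts%'
+        # matches FTS3/FTS4/FTS5 declarations regardless of their original casing.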
+
+        for name, sql in vtabs:
+            # Check which FTS version is used
+            sql_upper = sql.upper()
+            if "USING FTS5" in sql_upper:
+                # FTS5 creates: _data, _idx, _content, _docsize, _config
+                suffixes = ["_data", "_idx", "_content", "_docsize", "_config"]
+            elif "USING FTS3" in sql_upper or "USING FTS4" in sql_upper:
+                # FTS3/4 creates: _content, _segments, _segdir, _docsize, _stat
+                suffixes = ["_content", "_segments", "_segdir", "_docsize", "_stat"]
+            else:
+                continue
+
+            for suffix in suffixes:
+                shadow_name = f"{name}{suffix}"
+                shadow_tables.add(shadow_name)
+
+        if self.debug and shadow_tables:
+            log(f"identified shadow tables: {shadow_tables}")
+        return shadow_tables
+
    def dump(self):
        """Perform the full semantic dump."""
        try:
+            # 0. Identify Shadow Tables (Pre-scan)
+            shadow_tables = self._find_shadow_tables()
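+            # The same set is reused below for schema output, data output, and _dump_extras.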
+
            # 1. Metadata / Versioning
            user_version = self.conn.execute("PRAGMA user_version").fetchone()[0]
            if not self.args.data_only:
            # 3. Output Schema (Tables/Views)
            if not self.args.data_only:
                for obj in objects:
-                    # Skip auto-generated FTS5 shadow tables
-                    if re.search(r"_(content|data|idx|docsize|config)$", obj["name"]):
+                    # Skip identified shadow tables
+                    if obj["name"] in shadow_tables:
                        if self.debug:
                            log(f"skipping shadow table schema: {obj['name']}")
                        continue
+
                    if obj["sql"]:
                        # Ensure we have a semicolon and newline
                        sql = obj["sql"].strip()
            # 4. Output Data (Sorted for Noise Reduction)
            if not self.args.schema_only:
                for obj in [o for o in objects if o["type"] == "table"]:
-                    if re.search(r"_(content|data|idx|docsize|config)$", obj["name"]):
+                    if obj["name"] in shadow_tables:
                        continue
                    self._dump_table_data(obj["name"])
            # 5. Finalize (Indexes/Triggers/Sequences)
            if not self.args.data_only:
-                self._dump_extras()
+                self._dump_extras(shadow_tables)
            sys.stdout.write("COMMIT;\n")
            return True
            if not self._ensure_collation(e):
                raise
-    def _dump_extras(self):
+    def _dump_extras(self, shadow_tables=None):
        """Dump triggers, indexes, and autoincrement sequences."""
+        if shadow_tables is None:
+            shadow_tables = set()
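+        # Defaulting to an empty set keeps the method safe for callers that skip the pre-scan.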
        # Triggers and Indexes (excluding auto-indexes)
        extras = self.conn.execute(
            """
            # Skip FTS5 internal triggers (they auto-recreate on FTS table creation)
            if "CREATE TRIGGER" in statement_upper:
-                parts = statement_upper.split()
-                if len(parts) > 2:
-                    trigger_name = parts[2]
+                # Regex to find the table name the trigger is ON
+                # Matches: ON table_name (optionally quoted); \b avoids matching
+                # an "ON" embedded in an identifier such as "VERSION"
+                match = re.search(r"\bON\s+[\"']?([a-zA-Z0-9_]+)[\"']?", statement_upper)
+                if match:
+                    table_name = match.group(1)
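+                    # statement_upper is uppercased, so the captured name compares
+                    # directly against the uppercase suffixes below.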
                    if any(
-                        x in statement_upper
-                        for x in ("_CONTENT", "_DOC", "_CONFIG", "_IDX", "_DATA")
+                        table_name.endswith(x)
+                        for x in ("_CONTENT", "_DOCSIZE", "_CONFIG", "_IDX", "_DATA")
                    ):
                        if debug:
-                            log(f"skipping FTS5 internal trigger: {trigger_name}")
+                            log(f"skipping FTS5 internal trigger on {table_name}")
                        continue
            # Skip ROLLBACK - if this appears, the dump is corrupted, just skip it
            for statement in sql_script_source:
                f_sql.write(statement)
            # Step 2: Attempt restoration (multi-pass if new collations are found)
            max_retries = 100
            for i in range(max_retries):
                self._create_temp_db()
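+                # retry_needed is set when a missing collation is registered mid-pass;
+                # the pass is then abandoned and re-run against a fresh temp database.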
+                retry_needed = False
+
                try:
                    if self.debug:
                        log(f"restoration attempt {i+1}...")
                    # (though it's already statement-per-yield from the iterator)
                    # We use a simple generator to re-yield statements from the file.
                    for statement in self._yield_statements(f_sql):
-                        if statement.strip():
+                        if not statement.strip():
+                            continue
+
+                        try:
                            self.conn.execute(statement)
-                    return True
+                        except sqlite3.OperationalError as e:
+                            if self.debug:
+                                log(f"caught operational error: {e}")
+
+                            # 1. Collation Error: MUST retry the whole process (broken state)
+                            if self._ensure_collation(e):
+                                retry_needed = True
+                                break
+
+                            # 2. Ignorable Error: WARN and continue to next statement
+                            e_str = str(e).lower()
+                            if "no such table" in e_str or (
+                                "index" in e_str and "already exists" in e_str
+                            ):
+                                log(f"warning: ignoring error: {e}")
+                                continue
+
+                            # 3. Fatal Error: Raise to outer block
+                            raise
+
                except sqlite3.OperationalError as e:
-                    if self.debug:
-                        log(f"caught operational error: {e}")
-                    if not self._ensure_collation(e):
-                        log(f"error: restore failed: {e}")
-                        return False
+                    log(f"error: restore failed: {e}")
+                    return False
                finally:
+                    if not retry_needed and self.conn:
+                        self.conn.close()
+
+                if retry_needed:
                    if self.conn:
                        self.conn.close()
+                    continue
+
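+                # Every statement was applied without forcing another pass.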
+                return True
            return False
        finally:
            if os.path.exists(sql_tmp_path):
--- /dev/null
+PRAGMA user_version = 0;
+PRAGMA foreign_keys=OFF;
+BEGIN TRANSACTION;
+CREATE TABLE access_log (id INTEGER PRIMARY KEY, user_id INTEGER);
+CREATE TABLE user_data (id INTEGER PRIMARY KEY, info TEXT);
+INSERT INTO "access_log" ("id", "user_id") VALUES (100, 1);
+INSERT INTO "user_data" ("id", "info") VALUES (1, 'alice');
+CREATE TRIGGER log_access AFTER INSERT ON access_log
+    BEGIN
+        SELECT id FROM user_data WHERE id = NEW.user_id;
+    END;
+COMMIT;
--- /dev/null
+import os
+import sqlite3
+import unittest
+from io import StringIO
+from unittest.mock import patch
+
+from git_sqlite_filter.smudge import DatabaseRestorer
+
+
+class TestSmudgeResilience(unittest.TestCase):
+    def test_missing_table_warning(self):
+        """
+        Verify that inserts into missing tables log a warning but do not abort the restoration.
+        """
+        # SQL with a valid table and an invalid table insert
+        sql_script = [
+            "PRAGMA foreign_keys=OFF;\n",
+            "BEGIN TRANSACTION;\n",
+            "CREATE TABLE valid_table (id INTEGER PRIMARY KEY, value TEXT);\n",
+            "INSERT INTO valid_table VALUES (1, 'foo');\n",
+            # This table does not exist, should trigger warning
+            "INSERT INTO missing_table VALUES (1, 'bar');\n",
+            "COMMIT;\n",
+        ]
+
+        with patch("sys.stderr", new_callable=StringIO) as mock_stderr:
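+            # log() writes to stderr, so capturing it lets us assert on the warning below.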
+            restorer = DatabaseRestorer(debug=True)
+            # Should return True (success) despite the error
+            success = restorer.restore(sql_script)
+
+        self.assertTrue(
+            success, "Restore should succeed even with missing table errors"
+        )
+
+        # Verify the database state
+        self.assertIsNotNone(restorer.conn)
+        # The original connection is closed, but the file exists at restorer.tmp_path
+        self.assertTrue(os.path.exists(restorer.tmp_path))
+
+        with sqlite3.connect(restorer.tmp_path) as check_conn:
+            cursor = check_conn.cursor()
+
+            # Valid data should exist
+            cursor.execute("SELECT * FROM valid_table")
+            rows = cursor.fetchall()
+            self.assertEqual(len(rows), 1)
+            self.assertEqual(rows[0], (1, "foo"))
+
+        # Verify warning was logged
+        log_output = mock_stderr.getvalue()
+        # Relaxed assertion to handle sqlite version differences (main.table vs table)
+        self.assertIn("warning: ignoring error: no such table:", log_output)
+        self.assertIn("missing_table", log_output)
+
+    def test_collation_retry_limit(self):
+        """
+        Verify that we can handle a reasonable number of missing collations.
+        Why retry up to 100 times instead of 2? Each pass only discovers ONE
+        missing collation, so a DB that uses 10 different custom collations
+        needs 10 passes.
+        """
+        # Create a script generating many collation errors
+        # The restorer is expected to auto-register each missing collation (one per pass)
+
+        # This script needs N passes to discover N collations
+        collations = [f"COLL_{i}" for i in range(5)]
+        sql_script = ["PRAGMA foreign_keys=OFF;\n", "BEGIN TRANSACTION;\n"]
+        for col in collations:
+            sql_script.append(f"CREATE TABLE t_{col} (x TEXT COLLATE {col});\n")
+            sql_script.append(
+                f"SELECT * FROM t_{col} ORDER BY x;\n"
+            )  # Trigger collation usage
+        sql_script.append("COMMIT;\n")
+
+        with patch("sys.stderr", new_callable=StringIO) as _:
+            restorer = DatabaseRestorer(debug=True)
+            success = restorer.restore(sql_script)
+
+        self.assertTrue(success)
+        self.assertEqual(len(restorer.registered_collations), 5)
+
+
+if __name__ == "__main__":
+    unittest.main()