LangSQL / build_whoosh_index.py
Roxanne-WANG's picture
update
dedd8a5
import os
import sqlite3
from whoosh import index
from whoosh.fields import Schema, ID, TEXT
def extract_contents_from_db(db_path, max_len=25):
"""
Extract all non-null, unique text values of length <= max_len
from every table and column in the SQLite database.
Returns:
List of tuples [(doc_id, text), ...]
"""
conn = sqlite3.connect(db_path)
cur = conn.cursor()
docs = []
# Iterate over all user tables in the database
for (table_name,) in cur.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
):
if table_name == "sqlite_sequence":
continue
# PRAGMA table_info returns rows like (cid, name, type, ...)
# We want the column **name**, which is at index 1
cols = [r[1] for r in cur.execute(f"PRAGMA table_info('{table_name}')")]
# Pull distinct non-null values from each column
for col in cols:
for (val,) in cur.execute(
f"SELECT DISTINCT `{col}` FROM `{table_name}` WHERE `{col}` IS NOT NULL"
):
text = str(val).strip()
if 0 < len(text) <= max_len:
# Generate a unique document ID
doc_id = f"{table_name}-{col}-{hash(text)}"
docs.append((doc_id, text))
conn.close()
return docs
def build_index_for_db(db_id, db_path, index_root="db_contents_index"):
"""
Build (or open) a Whoosh index for a single database.
- If the index already exists in index_root/db_id, it will be opened.
- Otherwise, a new index is created and populated from the SQLite file.
"""
index_dir = os.path.join(index_root, db_id)
os.makedirs(index_dir, exist_ok=True)
# Define the schema: unique ID + stored text field
schema = Schema(
id=ID(stored=True, unique=True),
content=TEXT(stored=True)
)
# Open existing index if present
if index.exists_in(index_dir):
return index.open_dir(index_dir)
# Otherwise create a new index and add documents
ix = index.create_in(index_dir, schema)
writer = ix.writer()
docs = extract_contents_from_db(db_path)
for doc_id, text in docs:
writer.add_document(id=doc_id, content=text)
writer.commit()
return ix
if __name__ == "__main__":
DATABASE_ROOT = "databases"
INDEX_ROOT = "db_contents_index"
# Optionally remove any existing index directory to start fresh
if os.path.isdir(INDEX_ROOT):
import shutil
shutil.rmtree(INDEX_ROOT)
os.makedirs(INDEX_ROOT, exist_ok=True)
# Loop over each database folder in databases/
for db_id in os.listdir(DATABASE_ROOT):
db_file = os.path.join(DATABASE_ROOT, db_id, f"{db_id}.sqlite")
if os.path.isfile(db_file):
print(f"Building Whoosh index for {db_id}...")
build_index_for_db(db_id, db_file, INDEX_ROOT)
print("All indexes built successfully.")