import hashlib
import os
import shutil
import sqlite3
from contextlib import closing
from typing import List, Tuple

from whoosh import index
from whoosh.fields import Schema, ID, TEXT


def _quote_identifier(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier with embedded quotes escaped."""
    return '"' + name.replace('"', '""') + '"'


def extract_contents_from_db(db_path: str, max_len: int = 25) -> List[Tuple[str, str]]:
    """
    Extract short text values from every user table and column of a SQLite database.

    Each non-null value is converted with ``str()`` and stripped of surrounding
    whitespace; values whose stripped length is between 1 and ``max_len``
    (inclusive) are kept, de-duplicated per (table, column).

    Args:
        db_path: Path to the SQLite database file.
        max_len: Maximum length (in characters) of a stripped value to keep.

    Returns:
        List of tuples [(doc_id, text), ...] where ``doc_id`` has the form
        ``"<table>-<column>-<digest>"`` and ``digest`` is a SHA-256 prefix of
        the text, so IDs are identical across runs (the built-in ``hash()``
        of a string is salted per process and is not).

    Raises:
        sqlite3.Error: If the database cannot be read.
    """
    docs: List[Tuple[str, str]] = []

    # closing() guarantees the connection is released even if a query fails.
    with closing(sqlite3.connect(db_path)) as conn:
        # Materialise the table list first. Each conn.execute() call uses its
        # own cursor, so the nested queries below cannot reset this result set
        # (re-using one cursor for the nested queries would end the table loop
        # after the first table).
        table_names = [
            name
            for (name,) in conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            )
        ]

        for table_name in table_names:
            # Names starting with "sqlite_" are reserved for SQLite's internal
            # tables (sqlite_sequence, sqlite_stat1, ...), not user data.
            if table_name.startswith("sqlite_"):
                continue

            quoted_table = _quote_identifier(table_name)

            # PRAGMA table_info returns rows like (cid, name, type, ...);
            # the column name is at index 1.
            cols = [
                row[1]
                for row in conn.execute(f"PRAGMA table_info({quoted_table})")
            ]

            for col in cols:
                quoted_col = _quote_identifier(col)
                # DISTINCT works on raw values; two raw values can still
                # collapse to the same text after str()/strip(), so track
                # what has already been emitted for this column.
                seen = set()
                rows = conn.execute(
                    f"SELECT DISTINCT {quoted_col} FROM {quoted_table} "
                    f"WHERE {quoted_col} IS NOT NULL"
                )
                for (val,) in rows:
                    text = str(val).strip()
                    if not 0 < len(text) <= max_len or text in seen:
                        continue
                    seen.add(text)
                    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
                    docs.append((f"{table_name}-{col}-{digest}", text))

    return docs


def build_index_for_db(
    db_id: str, db_path: str, index_root: str = "db_contents_index"
) -> index.Index:
    """
    Build (or open) a Whoosh index for a single database.

    - If an index already exists in index_root/db_id, it is opened and returned.
    - Otherwise, a new index is created and populated from the SQLite file.

    Args:
        db_id: Name of the sub-directory under ``index_root`` for this index.
        db_path: Path to the SQLite database file to read values from.
        index_root: Directory under which per-database indexes are stored.

    Returns:
        The opened or newly created Whoosh index.
    """
    index_dir = os.path.join(index_root, db_id)
    os.makedirs(index_dir, exist_ok=True)

    # Open existing index if present
    if index.exists_in(index_dir):
        return index.open_dir(index_dir)

    # Read the database BEFORE creating the index: if extraction fails, no
    # empty index is left behind for a later call to open as if it were complete.
    docs = extract_contents_from_db(db_path)

    # Schema: unique ID + stored text field
    schema = Schema(
        id=ID(stored=True, unique=True),
        content=TEXT(stored=True),
    )

    ix = index.create_in(index_dir, schema)
    # The writer context manager commits on success and cancels on error.
    with ix.writer() as writer:
        for doc_id, text in docs:
            writer.add_document(id=doc_id, content=text)
    return ix


def main(database_root: str = "databases", index_root: str = "db_contents_index") -> None:
    """
    Rebuild the Whoosh indexes for every database under ``database_root``.

    Any existing ``index_root`` directory is deleted first. A database is
    expected at ``<database_root>/<db_id>/<db_id>.sqlite``; entries without
    such a file are skipped.
    """
    # Remove any existing index directory to start fresh
    if os.path.isdir(index_root):
        shutil.rmtree(index_root)
    os.makedirs(index_root, exist_ok=True)

    # Sorted so databases are processed in the same order on every run.
    for db_id in sorted(os.listdir(database_root)):
        db_file = os.path.join(database_root, db_id, f"{db_id}.sqlite")
        if os.path.isfile(db_file):
            print(f"Building Whoosh index for {db_id}...")
            build_index_for_db(db_id, db_file, index_root)

    print("All indexes built successfully.")


if __name__ == "__main__":
    main()