Spaces:
Paused
Paused
import os | |
import sqlite3 | |
from whoosh import index | |
from whoosh.fields import Schema, ID, TEXT | |
def extract_contents_from_db(db_path, max_len=25): | |
""" | |
Extract all non-null, unique text values of length <= max_len | |
from every table and column in the SQLite database. | |
Returns: | |
List of tuples [(doc_id, text), ...] | |
""" | |
conn = sqlite3.connect(db_path) | |
cur = conn.cursor() | |
docs = [] | |
# Iterate over all user tables in the database | |
for (table_name,) in cur.execute( | |
"SELECT name FROM sqlite_master WHERE type='table'" | |
): | |
if table_name == "sqlite_sequence": | |
continue | |
# PRAGMA table_info returns rows like (cid, name, type, ...) | |
# We want the column **name**, which is at index 1 | |
cols = [r[1] for r in cur.execute(f"PRAGMA table_info('{table_name}')")] | |
# Pull distinct non-null values from each column | |
for col in cols: | |
for (val,) in cur.execute( | |
f"SELECT DISTINCT `{col}` FROM `{table_name}` WHERE `{col}` IS NOT NULL" | |
): | |
text = str(val).strip() | |
if 0 < len(text) <= max_len: | |
# Generate a unique document ID | |
doc_id = f"{table_name}-{col}-{hash(text)}" | |
docs.append((doc_id, text)) | |
conn.close() | |
return docs | |
def build_index_for_db(db_id, db_path, index_root="db_contents_index"): | |
""" | |
Build (or open) a Whoosh index for a single database. | |
- If the index already exists in index_root/db_id, it will be opened. | |
- Otherwise, a new index is created and populated from the SQLite file. | |
""" | |
index_dir = os.path.join(index_root, db_id) | |
os.makedirs(index_dir, exist_ok=True) | |
# Define the schema: unique ID + stored text field | |
schema = Schema( | |
id=ID(stored=True, unique=True), | |
content=TEXT(stored=True) | |
) | |
# Open existing index if present | |
if index.exists_in(index_dir): | |
return index.open_dir(index_dir) | |
# Otherwise create a new index and add documents | |
ix = index.create_in(index_dir, schema) | |
writer = ix.writer() | |
docs = extract_contents_from_db(db_path) | |
for doc_id, text in docs: | |
writer.add_document(id=doc_id, content=text) | |
writer.commit() | |
return ix | |
if __name__ == "__main__": | |
DATABASE_ROOT = "databases" | |
INDEX_ROOT = "db_contents_index" | |
# Optionally remove any existing index directory to start fresh | |
if os.path.isdir(INDEX_ROOT): | |
import shutil | |
shutil.rmtree(INDEX_ROOT) | |
os.makedirs(INDEX_ROOT, exist_ok=True) | |
# Loop over each database folder in databases/ | |
for db_id in os.listdir(DATABASE_ROOT): | |
db_file = os.path.join(DATABASE_ROOT, db_id, f"{db_id}.sqlite") | |
if os.path.isfile(db_file): | |
print(f"Building Whoosh index for {db_id}...") | |
build_index_for_db(db_id, db_file, INDEX_ROOT) | |
print("All indexes built successfully.") |