|
""" |
|
Blog Data Update Script |
|
|
|
This script updates the blog data vector store when new posts are added. |
|
It can be scheduled to run periodically or manually executed. |
|
|
|
Usage: |
|
python pipeline.py [--force-recreate] [--data-dir DATA_DIR] |
|
|
|
Options: |
|
--force-recreate Force recreation of the vector store even if it exists |
|
--data-dir DIR Directory containing the blog posts (default: data/) |
|
""" |
|
|
|
import os |
|
import sys |
|
import argparse |
|
from datetime import datetime |
|
import json |
|
from pathlib import Path |
|
from lets_talk.config import VECTOR_STORAGE_PATH, DATA_DIR |
|
|
|
|
|
import lets_talk.utils.blog as blog |
|
|
|
def parse_args():
    """Build the command-line parser for this script and parse the arguments.

    Recognised options:
        --force-recreate: boolean flag, ``True`` only when given.
        --data-dir: directory containing blog posts; falls back to ``DATA_DIR``.

    Returns:
        The ``argparse.Namespace`` produced by ``ArgumentParser.parse_args()``.
    """
    cli = argparse.ArgumentParser(description="Update blog data vector store")

    # Help text is built up-front so the add_argument calls stay short.
    data_dir_help = f"Directory containing blog posts (default: {DATA_DIR})"

    cli.add_argument(
        "--force-recreate",
        action="store_true",
        help="Force recreation of the vector store",
    )
    cli.add_argument("--data-dir", default=DATA_DIR, help=data_dir_help)

    namespace = cli.parse_args()
    return namespace
|
|
|
def save_stats(stats, output_dir="./stats"):
    """Write a timestamped JSON snapshot of the summary statistics.

    Only the summary fields listed below are copied from ``stats``; any other
    keys it carries are not written.

    Args:
        stats: Mapping that must provide ``total_documents``,
            ``total_characters``, ``min_length``, ``max_length`` and
            ``avg_length``.
        output_dir: Directory that receives the JSON file. It is created
            (including missing parents) when it does not exist.

    Returns:
        The path of the written file, as a string of the form
        ``<output_dir>/blog_stats_<YYYYmmdd_HHMMSS>.json``.
    """
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    # The same timestamp is used for the file name and the stored record.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{output_dir}/blog_stats_{timestamp}.json"

    summary_keys = (
        "total_documents",
        "total_characters",
        "min_length",
        "max_length",
        "avg_length",
    )
    basic_stats = {"timestamp": timestamp}
    for key in summary_keys:
        basic_stats[key] = stats[key]

    with open(filename, "w") as out_file:
        json.dump(basic_stats, out_file, indent=2)

    print(f"Saved stats to {filename}")
    return filename
|
|
|
def create_vector_database(data_dir=DATA_DIR, storage_path=VECTOR_STORAGE_PATH, force_recreate=False):
    """
    Load the blog posts, record their statistics and build the vector store if needed.

    Statistics are computed, displayed and saved before the store is
    considered. The store itself is only (re)built when nothing exists at
    ``storage_path`` or when ``force_recreate`` is set.

    Args:
        data_dir: Directory containing the blog posts
        storage_path: Path where the vector database will be stored
        force_recreate: Whether to force recreation of the vector store

    Returns:
        Tuple of (success status, message, stats, stats file path).
        The tuple always has four elements. On failure, ``stats`` and the
        stats file path hold whatever had been produced before the error,
        or ``None`` if that step was never reached.
    """
    # Pre-bind so the failure path can return a tuple of the same shape as
    # the success paths; callers unpack four values.
    stats = None
    stats_file = None

    try:
        documents = blog.load_blog_posts(data_dir)
        documents = blog.update_document_metadata(documents)

        stats = blog.get_document_stats(documents)
        blog.display_document_stats(stats)

        stats_file = save_stats(stats)

        if Path(storage_path).exists() and not force_recreate:
            return (
                True,
                f"Vector store already exists at {storage_path} (use --force-recreate to rebuild)",
                stats,
                stats_file,
            )

        print("\nAttempting to save vector store reference file...")
        vector_store = blog.create_vector_store(
            documents,
            storage_path=storage_path,
            force_recreate=force_recreate,
        )
        # The store is only needed for creation here, so its client is
        # closed right away.
        vector_store.client.close()
        print("Vector store reference file saved.")
        return True, f"Vector store successfully created at {storage_path}", stats, stats_file
    except Exception as e:
        # Boundary handler: report the failure as data rather than raising,
        # with the same tuple shape as the success paths.
        return False, f"Error creating vector store: {str(e)}", stats, stats_file
|
|
|
def main():
    """Run the blog data update and report the outcome.

    Parses the command-line arguments, calls ``create_vector_database`` and
    prints a summary of the result.

    Returns:
        Process exit code: ``0`` when the update succeeded, ``1`` when it
        reported failure or raised an exception.
    """
    args = parse_args()

    print("=== Blog Data Update ===")
    print(f"Data directory: {args.data_dir}")
    print(f"Force recreate: {args.force_recreate}")
    print("========================")

    try:
        result = create_vector_database(
            args.data_dir,
            storage_path=VECTOR_STORAGE_PATH,
            force_recreate=args.force_recreate,
        )

        # A failed run may return only (success, message) without the stats
        # entries; accept both shapes so the real error message is reported
        # instead of an unpacking error.
        success, message, *extras = result
        stats, stats_file = extras if len(extras) == 2 else (None, None)

        print("\n=== Update Summary ===")
        if stats is not None:
            print(f"Processed {stats['total_documents']} documents")
        if stats_file is not None:
            print(f"Stats saved to: {stats_file}")
        print(f"Vector DB status: {message}")
        print("=====================")

        return 0 if success else 1
    except Exception as e:
        # Top-level boundary: show the error and full traceback, then signal
        # failure through the exit code.
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
        return 1
|
|
|
# Script entry point: run the update and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)
|
|