File size: 8,908 Bytes
9681c5d
 
 
 
 
 
 
4e87dd5
9681c5d
 
 
 
4e87dd5
 
9681c5d
 
 
 
 
 
 
4e87dd5
9681c5d
d5d262c
9681c5d
 
4dc95d7
9681c5d
4e87dd5
 
 
 
 
 
 
 
9681c5d
 
 
 
 
d5d262c
 
4e87dd5
 
 
 
9681c5d
 
4e87dd5
 
 
 
 
 
 
 
 
 
 
9681c5d
 
 
4e87dd5
9681c5d
4e87dd5
 
 
 
 
 
9681c5d
 
 
 
 
 
 
 
 
 
 
 
 
 
4e87dd5
 
 
 
 
 
 
 
 
9681c5d
4e87dd5
9ffcda2
f5df877
 
 
 
4e87dd5
f5df877
 
4e87dd5
 
f5df877
 
4e87dd5
f5df877
 
d5d262c
4e87dd5
d5d262c
 
 
9ffcda2
d5d262c
 
 
9ffcda2
d5d262c
9ffcda2
 
 
 
 
 
 
 
 
 
d5d262c
f5df877
 
 
4e87dd5
f5df877
 
 
 
 
 
4e87dd5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f5df877
4e87dd5
 
f5df877
4e87dd5
 
 
 
 
 
 
 
 
 
 
 
f5df877
9681c5d
 
 
 
4e87dd5
 
 
 
 
 
9681c5d
 
f5df877
4e87dd5
 
f5df877
4e87dd5
 
 
f5df877
9681c5d
4e87dd5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9681c5d
f5df877
 
9681c5d
 
4e87dd5
9681c5d
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
"""
Blog Data Update Script

This script updates the blog data vector store when new posts are added.
It can be scheduled to run periodically or manually executed.

Usage:
    python pipeline.py [--force-recreate] [--data-dir DATA_DIR] [--output-dir OUTPUT_DIR] [--ci]

Options:
    --force-recreate   Force recreation of the vector store even if it exists
    --data-dir DIR     Directory containing the blog posts (default: data/)
    --output-dir DIR   Directory to save stats and artifacts (default: ./stats)
    --ci               Run in CI mode (no interactive prompts, exit codes for CI)
"""

import os
import sys
import argparse
from datetime import datetime
import json
import logging
from pathlib import Path
from lets_talk.config import VECTOR_STORAGE_PATH, DATA_DIR

# Import the blog utilities module
import lets_talk.utils.blog as blog

# Set up logging: timestamped records at INFO level with the logger name
# and level in each line, emitted via an explicit StreamHandler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
# Module-wide logger used by every function in this script.
logger = logging.getLogger("blog-pipeline")

def parse_args():
    """Build the command-line parser and parse ``sys.argv``.

    Returns:
        argparse.Namespace with ``force_recreate``, ``data_dir``,
        ``output_dir`` and ``ci`` attributes.
    """
    arg_parser = argparse.ArgumentParser(
        description="Update blog data vector store"
    )
    arg_parser.add_argument(
        "--force-recreate",
        action="store_true",
        help="Force recreation of the vector store",
    )
    arg_parser.add_argument(
        "--data-dir",
        default=DATA_DIR,
        help=f"Directory containing blog posts (default: {DATA_DIR})",
    )
    arg_parser.add_argument(
        "--output-dir",
        default="./stats",
        help="Directory to save stats and artifacts (default: ./stats)",
    )
    arg_parser.add_argument(
        "--ci",
        action="store_true",
        help="Run in CI mode (no interactive prompts, exit codes for CI)",
    )
    return arg_parser.parse_args()

def save_stats(stats, output_dir="./stats", ci_mode=False):
    """Save stats to a JSON file for tracking changes over time.

    Args:
        stats: Dictionary containing statistics about the blog posts; must
            provide the keys total_documents, total_characters, min_length,
            max_length and avg_length.
        output_dir: Directory to save the stats file (created if missing).
        ci_mode: Whether to run in CI mode. In CI mode the stats are written
            to a fixed "latest" filename (a stable path for CI consumers)
            plus a timestamped copy for historical tracking.

    Returns:
        Tuple of (filename, basic_stats) where basic_stats is the dict
        actually written to disk (the full document list is excluded).
    """
    # Create directory if it doesn't exist
    Path(output_dir).mkdir(exist_ok=True, parents=True)

    # Create filename with timestamp or use fixed name for CI
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    history_filename = None  # only used in CI mode; bound here so it always exists
    if ci_mode:
        filename = f"{output_dir}/blog_stats_latest.json"
        # Also create a timestamped version for historical tracking
        history_filename = f"{output_dir}/blog_stats_{timestamp}.json"
    else:
        filename = f"{output_dir}/blog_stats_{timestamp}.json"

    # Save only the basic stats, not the full document list
    basic_stats = {
        "timestamp": timestamp,
        "total_documents": stats["total_documents"],
        "total_characters": stats["total_characters"],
        "min_length": stats["min_length"],
        "max_length": stats["max_length"],
        "avg_length": stats["avg_length"],
    }

    with open(filename, "w") as f:
        json.dump(basic_stats, f, indent=2)

    # In CI mode, also save the timestamped historical copy.
    # BUG FIX: both log messages previously printed the literal text
    # "(unknown)" instead of the path that was actually written.
    if ci_mode:
        with open(history_filename, "w") as f:
            json.dump(basic_stats, f, indent=2)
        logger.info(f"Saved stats to {filename} and {history_filename}")
    else:
        logger.info(f"Saved stats to {filename}")

    return filename, basic_stats

def create_vector_database(data_dir=DATA_DIR, storage_path=VECTOR_STORAGE_PATH, 
                      force_recreate=False, output_dir="./stats", ci_mode=False, use_chunking=True, save_stats=True):
    """
    Create or update the vector database with blog documents.
    
    Args:
        data_dir: Directory containing the blog posts
        storage_path: Path where the vector database will be stored
        force_recreate: Whether to force recreation of the vector store
        output_dir: Directory to save stats and artifacts
        ci_mode: Whether to run in CI mode (writes a build-info JSON artifact)
        use_chunking: Whether to split documents into chunks before indexing
        save_stats: Whether to write the document stats JSON to output_dir
        
    Returns:
        Tuple of (success status, message, stats, stats_file, stats_file_content).
        On failure the last three elements are None; no exception escapes.
    """
    try:
        # Load and process documents
        logger.info(f"Loading blog posts from {data_dir}")
        documents = blog.load_blog_posts(data_dir)
        documents = blog.update_document_metadata(documents)

        # Get stats
        stats = blog.get_document_stats(documents)
        blog.display_document_stats(stats)

        # Save stats for tracking.
        # BUG FIX: the `save_stats` *parameter* shadows the module-level
        # save_stats() *function*; the previous code called the boolean flag
        # itself, raising TypeError whenever the flag was truthy (the
        # default). Resolve the function through globals() so the public
        # parameter name stays backward compatible.
        stats_file = None
        stats_content = None
        if save_stats:
            stats_file, stats_content = globals()["save_stats"](
                stats, output_dir=output_dir, ci_mode=ci_mode
            )

        if use_chunking:
            logger.info("Chunking documents...")
            documents = blog.split_documents(documents)

        # Only (re)build the store when it is missing or a rebuild was requested.
        should_build = force_recreate or not Path(storage_path).exists()

        if not should_build:
            logger.info(f"Vector store already exists at {storage_path}")
            return True, f"Vector store already exists at {storage_path} (use --force-recreate to rebuild)", stats, stats_file, stats_content

        logger.info("Creating vector store...")
        vector_store = blog.create_vector_store(
            documents, 
            storage_path=storage_path, 
            force_recreate=force_recreate
        )
        vector_store.client.close()
        logger.info(f"Vector store successfully created at {storage_path}")

        # In CI mode, create a metadata file with the build info
        if ci_mode:
            build_info = {
                "build_timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "document_count": stats["total_documents"],
                "storage_path": str(storage_path),
                "vector_store_size_bytes": get_directory_size(storage_path),
            }
            build_info_path = Path(output_dir) / "vector_store_build_info.json"
            with open(build_info_path, "w") as f:
                json.dump(build_info, f, indent=2)
            logger.info(f"Build info saved to {build_info_path}")

        return True, f"Vector store successfully created at {storage_path}", stats, stats_file, stats_content
    except Exception as e:
        logger.error(f"Error creating vector store: {str(e)}", exc_info=True)
        return False, f"Error creating vector store: {str(e)}", None, None, None

def get_directory_size(path):
    """Return the total size, in bytes, of all regular files under *path*.

    The tree is walked recursively; symbolic links are skipped so their
    targets are never counted (and broken links cannot raise).
    """
    def _entry_size(directory, name):
        # Size of one directory entry, or 0 for symlinks we deliberately skip.
        full_path = os.path.join(directory, name)
        if os.path.islink(full_path):
            return 0
        return os.path.getsize(full_path)

    return sum(
        _entry_size(directory, name)
        for directory, _subdirs, names in os.walk(path)
        for name in names
    )

def main():
    """Entry point: refresh the blog vector store and report the outcome.

    Returns:
        Process exit code — 0 on success, 1 on any failure.
    """
    args = parse_args()

    # Banner: echo the effective configuration before doing any work.
    for banner_line in (
        "=== Blog Data Update ===",
        f"Data directory: {args.data_dir}",
        f"Force recreate: {args.force_recreate}",
        f"Output directory: {args.output_dir}",
        f"CI mode: {args.ci}",
        "========================",
    ):
        logger.info(banner_line)

    try:
        # Create or update vector database
        success, message, stats, stats_file, stats_content = create_vector_database(
            data_dir=args.data_dir,
            storage_path=VECTOR_STORAGE_PATH,
            force_recreate=args.force_recreate,
            output_dir=args.output_dir,
            ci_mode=args.ci,
        )

        logger.info("\n=== Update Summary ===")
        if stats:
            logger.info(f"Processed {stats['total_documents']} documents")
            logger.info(f"Stats saved to: {stats_file}")
        logger.info(f"Vector DB status: {message}")
        logger.info("=====================")

        # In CI mode, create a summary file that GitHub Actions can use to set outputs
        if args.ci and stats:
            summary_path = Path(args.output_dir) / "ci_summary.json"
            summary = {
                "status": "success" if success else "failure",
                "message": message,
                "stats_file": stats_file,
                "document_count": stats["total_documents"],
                "vector_store_path": str(VECTOR_STORAGE_PATH),
            }
            with open(summary_path, "w") as f:
                json.dump(summary, f, indent=2)
            logger.info(f"CI summary saved to {summary_path}")

        return 0 if success else 1
    except Exception as e:
        logger.error(f"Error: {e}", exc_info=True)
        return 1

# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    sys.exit(main())