# src/utils/drive_document_processor.py
from pathlib import Path
from typing import Dict, List, Any, Tuple
import logging

from fastapi import HTTPException

from src.utils.google_drive_service import GoogleDriveService
from src.utils.document_processor import DocumentProcessor
from src.vectorstores.chroma_vectorstore import ChromaVectorStore
from src.utils.logger import logger


class DriveDocumentProcessor:
    """Sync documents from a Google Drive folder into a Chroma vector store.

    Files are downloaded (or exported, for Google Docs) into a temporary
    directory, chunked by a ``DocumentProcessor`` and stored with per-chunk
    metadata. A file is only (re)processed when the store has no chunks for it
    or the stored ``modified_time`` differs from the file's Drive
    ``modifiedTime``.
    """

    def __init__(
        self,
        google_service_account_path: str,
        folder_id: str,
        temp_dir: str,
        doc_processor: DocumentProcessor
    ):
        """
        Initialize Drive Document Processor

        Args:
            google_service_account_path (str): Path to Google service account credentials
            folder_id (str): Google Drive folder ID to process
            temp_dir (str): Directory for temporary files
            doc_processor (DocumentProcessor): Instance of DocumentProcessor
        """
        self.google_drive_service = GoogleDriveService(google_service_account_path)
        self.folder_id = folder_id
        self.temp_dir = Path(temp_dir)
        self.doc_processor = doc_processor

        # Create temp directory (and any missing parents) if it doesn't exist
        self.temp_dir.mkdir(parents=True, exist_ok=True)

        # Supported source MIME types -> extension used for the temporary file.
        # Google Docs are exported, so they map to their export format's extension.
        self.supported_mime_types = {
            # Google Docs
            'application/vnd.google-apps.document': '.docx',  # Export Google Docs as DOCX

            # Microsoft Word Documents
            'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
            'application/msword': '.doc',

            # Microsoft Excel Documents
            'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': '.xlsx',
            'application/vnd.ms-excel': '.xls',

            # Text Documents
            'text/plain': '.txt',
            'text/csv': '.csv',
            'text/markdown': '.md',
            'text/html': '.html',
            'text/xml': '.xml',
            'application/json': '.json',
            'application/rtf': '.rtf',

            # PDF Documents
            'application/pdf': '.pdf'
        }

        # Google-native formats are exported rather than downloaded; map each
        # to the MIME type it is exported as.
        self.google_docs_export_types = {
            'application/vnd.google-apps.document': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
        }

    async def process_documents(
        self,
        vector_store: ChromaVectorStore
    ) -> Dict[str, Any]:
        """
        Process all documents in the specified Drive folder

        Args:
            vector_store (ChromaVectorStore): Vector store instance

        Returns:
            Dict[str, Any]: Processing results: ``status`` plus
            ``processed_files``, ``skipped_files`` and ``errors`` sections,
            each holding a ``count`` and the per-file ``details``.

        Raises:
            HTTPException: 500 if listing the folder (or anything outside the
            per-file handling) fails. Per-file failures are reported under
            ``errors`` instead of being raised.
        """
        try:
            # Get documents from folder
            files = self.google_drive_service.get_folder_contents(self.folder_id)

            processed_files = []
            skipped_files = []
            errors = []

            for file in files:
                result = await self._process_single_file(file, vector_store)
                if result['status'] == 'processed':
                    processed_files.append(result['data'])
                elif result['status'] == 'skipped':
                    skipped_files.append(result['data'])
                else:  # status == 'error'
                    errors.append(result['data'])

            # Clean up temporary directory if empty
            self._cleanup_temp_dir()

            return {
                "status": "completed",
                "processed_files": {
                    "count": len(processed_files),
                    "details": processed_files
                },
                "skipped_files": {
                    "count": len(skipped_files),
                    "details": skipped_files
                },
                "errors": {
                    "count": len(errors),
                    "details": errors
                }
            }

        except Exception as e:
            logger.error(f"Error processing Drive documents: {str(e)}")
            # Chain the original exception so its traceback is preserved.
            raise HTTPException(
                status_code=500,
                detail=f"Failed to process drive documents: {str(e)}"
            ) from e

    async def _process_single_file(
        self,
        file: Dict[str, Any],
        vector_store: ChromaVectorStore
    ) -> Dict[str, Any]:
        """Process a single Drive file.

        Args:
            file: Drive file metadata. ``id`` and ``name`` are read directly;
                ``mimeType`` and ``modifiedTime`` are optional.
            vector_store: Store that receives the file's chunks.

        Returns:
            Dict[str, Any]: ``{'status': 'processed' | 'skipped' | 'error',
            'data': {...}}``. Exceptions raised while handling the file are
            caught and reported with status ``'error'``.
        """
        mime_type = file.get('mimeType', '')

        # Skip if mime type not supported
        if mime_type not in self.supported_mime_types:
            return {
                'status': 'skipped',
                'data': {
                    'name': file['name'],
                    'reason': f'Unsupported mime type: {mime_type}'
                }
            }

        try:
            document_id = file['id']
            modified_time = file.get('modifiedTime', 'N/A')  # Get last modified time

            # save_document() deletes stale chunks and returns False only when
            # the stored copy is already up to date.
            if not self.save_document(document_id, vector_store, modified_time):
                return {
                    'status': 'skipped',
                    'data': {
                        'name': file['name'],
                        'reason': 'Document already exists in the memory.'
                    }
                }

            # Download and process file
            temp_file_path = await self._download_and_save_file(document_id, mime_type)
            try:
                processed_doc = await self.doc_processor.process_document(
                    str(temp_file_path)
                )

                self._add_to_vector_store(
                    processed_doc['chunks'],
                    file,
                    mime_type,
                    vector_store
                )

                return {
                    'status': 'processed',
                    'data': {
                        'name': file['name'],
                        'id': file['id'],
                        'chunks_processed': len(processed_doc['chunks'])
                    }
                }
            finally:
                # Clean up temporary file
                if temp_file_path.exists():
                    temp_file_path.unlink()

        except Exception as e:
            logger.error(f"Error processing file {file['name']}: {str(e)}")
            return {
                'status': 'error',
                'data': {
                    'file_name': file['name'],
                    'error': str(e)
                }
            }

    async def _download_and_save_file(
        self,
        file_id: str,
        mime_type: str
    ) -> Path:
        """Download (or export) a Drive file into the temporary directory.

        Args:
            file_id: Drive file ID; also used as the temporary file's name stem.
            mime_type: Source MIME type; must be a key of ``supported_mime_types``.

        Returns:
            Path: Location of the written temporary file. The caller is
            responsible for deleting it.
        """
        extension = self.supported_mime_types[mime_type]

        # _cleanup_temp_dir() removes the (empty) directory at the end of each
        # process_documents() run, so recreate it here; otherwise every write
        # on a later run fails with FileNotFoundError.
        self.temp_dir.mkdir(parents=True, exist_ok=True)
        temp_file_path = self.temp_dir / f"{file_id}{extension}"

        # NOTE(review): these Drive calls are made without await, so they run
        # synchronously on the event loop thread; consider asyncio.to_thread
        # if they turn out to be slow.
        if mime_type in self.google_docs_export_types:
            # Download Google Doc in the specified export format
            content = self.google_drive_service.export_file(
                file_id,
                self.google_docs_export_types[mime_type]
            )
        else:
            # Download regular file
            content = self.google_drive_service.download_file(file_id)

        if isinstance(content, str):
            content = content.encode('utf-8')

        try:
            with open(temp_file_path, 'wb') as f:
                f.write(content)
        except Exception:
            # Don't leave a partially written file behind.
            if temp_file_path.exists():
                temp_file_path.unlink()
            raise

        return temp_file_path

    def _add_to_vector_store(
        self,
        chunks: List[str],
        file: Dict[str, Any],
        mime_type: str,
        vector_store: ChromaVectorStore
    ) -> None:
        """Add processed chunks to the vector store.

        Each chunk is stored under the deterministic ID
        ``"<file id>-chunk-<index>"``. Its metadata records the source file, its
        position, and the file's Drive ``modifiedTime``, which ``save_document``
        later compares to detect changes.
        """
        document_id = file['id']
        modified_time = file.get('modifiedTime', 'N/A')  # Get last modified time
        total_chunks = len(chunks)
        # Per-file values, identical for every chunk.
        file_type = self.supported_mime_types[mime_type]
        is_google_doc = mime_type.startswith('application/vnd.google-apps')

        chunk_ids = [f"{document_id}-chunk-{i}" for i in range(total_chunks)]
        chunk_metadatas = [
            {
                "source": file['name'],
                "document_id": document_id,
                "chunk_index": i,
                "mime_type": mime_type,
                "modified_time": modified_time,
                "total_chunks": total_chunks,
                "file_type": file_type,
                "is_google_doc": is_google_doc,
            }
            for i in range(total_chunks)
        ]

        vector_store.add_documents(
            documents=chunks,
            metadatas=chunk_metadatas,
            ids=chunk_ids
        )

    def save_document(self, document_id: str, vector_store: ChromaVectorStore, modified_date: str) -> bool:
        """
        Decide whether a document must be (re)indexed, deleting stale chunks.

        When the store holds chunks for ``document_id`` and the first chunk's
        ``modified_time`` differs from ``modified_date``, all chunks for the
        document are deleted.

        Args:
            document_id (str): The ID of the document.
            vector_store (ChromaVectorStore): The Chroma vector store instance.
            modified_date (str): The document's current modification time.

        Returns:
            bool: True if the document should be processed. This covers three
            cases: no stored chunks, outdated chunks that were just deleted, or
            a failed check. False if the stored copy is up to date.
        """
        try:
            # Retrieve all chunks for the given document_id
            chunks = vector_store.get_document_chunks(document_id)

            if not chunks:
                # A new document is the normal case, so this is not a warning.
                logger.info(f"No chunks found for document_id: {document_id}. Nothing to delete.")
                return True

            # All chunks are written with the same modified_time, so the first
            # one is representative.
            first_chunk_metadata = chunks[0].get("metadata", {})
            if first_chunk_metadata.get("modified_time") != modified_date:
                # If modified_time doesn't match, delete all chunks
                vector_store.delete_document(document_id)
                logger.info(f"Deleted all chunks for document_id: {document_id} due to modified_time mismatch.")
                return True

            logger.info(f"No deletion needed for document_id: {document_id}, modified_time is unchanged.")
            return False

        except Exception as e:
            # Best effort: if the check fails, fall back to reprocessing.
            logger.error(f"Error while deleting chunks for document_id {document_id}: {str(e)}")
            return True

    def _cleanup_temp_dir(self) -> None:
        """Remove the temporary directory if it exists and is empty."""
        if self.temp_dir.exists() and not any(self.temp_dir.iterdir()):
            self.temp_dir.rmdir()