# Developer Documentation

## Development Setup

### Prerequisites

- Python 3.9 or higher
- Git
- Azure OpenAI account
- Azure Document Intelligence account

### Local Development Environment

1. **Clone the repository**
   ```bash
   git clone <repository-url>
   cd doctorecord
   ```
2. **Create a virtual environment**
   ```bash
   python -m venv venv
   source venv/bin/activate  # On Windows: venv\Scripts\activate
   ```
3. **Install dependencies**
   ```bash
   pip install -r requirements.txt
   ```
4. **Set up environment variables**
   ```bash
   cp .env.example .env
   # Edit .env with your Azure credentials
   ```
5. **Run the application**
   ```bash
   streamlit run src/app.py
   ```

## Project Structure

```
doctorecord/
├── src/
│   ├── agents/                          # Agent implementations
│   │   ├── base_agent.py                # Base agent class
│   │   ├── pdf_agent.py                 # PDF text extraction
│   │   ├── table_agent.py               # Table processing
│   │   ├── field_mapper_agent.py        # Field extraction
│   │   ├── unique_indices_combinator.py # Unique combinations
│   │   └── unique_indices_loop_agent.py # Loop processing
│   ├── services/                        # Service layer
│   │   ├── llm_client.py                # Azure OpenAI client
│   │   ├── azure_di_service.py          # Document Intelligence
│   │   ├── cost_tracker.py              # Cost tracking
│   │   └── embedding_client.py          # Semantic search
│   ├── orchestrator/                    # Orchestration layer
│   │   ├── planner.py                   # Plan generation
│   │   └── executor.py                  # Plan execution
│   ├── config/                          # Configuration
│   │   └── settings.py                  # Settings management
│   └── app.py                           # Streamlit application
├── tests/                               # Test files
├── logs/                                # Log files
├── requirements.txt                     # Python dependencies
└── README.md                            # Project documentation
```

## Coding Standards

### Python Style Guide

- Follow PEP 8 style guidelines
- Use type hints for function parameters and return values
- Maximum line length: 88 characters (Black formatter)
- Use descriptive variable and function names

### Code Organization

```python
# Standard imports
import logging
from typing import Dict, Any, Optional, List

# Third-party imports
import pandas as pd
from azure.ai.documentintelligence import DocumentIntelligenceClient

# Local imports
from .base_agent import BaseAgent
from services.llm_client import LLMClient
```

### Logging Standards

```python
class MyAgent(BaseAgent):
    def __init__(self):
        super().__init__()
        self.logger = logging.getLogger(__name__)

    def execute(self, ctx: Dict[str, Any]) -> Optional[str]:
        self.logger.info("Starting execution")
        self.logger.debug(f"Context keys: {list(ctx.keys())}")
        try:
            result = ...  # implementation goes here
            self.logger.info("Execution completed successfully")
            return result
        except Exception as e:
            self.logger.error(f"Execution failed: {str(e)}", exc_info=True)
            return None
```

### Error Handling

```python
def safe_execution(self, operation):
    try:
        return operation()
    except Exception as e:
        self.logger.error(f"Operation failed: {str(e)}", exc_info=True)
        # Return an appropriate fallback or re-raise
        raise
```

## Agent Development

### Creating a New Agent

1. **Inherit from `BaseAgent`**
   ```python
   from .base_agent import BaseAgent

   class MyNewAgent(BaseAgent):
       def __init__(self):
           super().__init__()
           self.logger = logging.getLogger(__name__)
   ```
2. **Implement the `execute` method**
   ```python
   def execute(self, ctx: Dict[str, Any]) -> Optional[str]:
       """
       Execute the agent's main functionality.

       Args:
           ctx: Context dictionary containing input data

       Returns:
           Result string, or None on failure
       """
       self.logger.info("Starting MyNewAgent execution")

       # Store context for use in helper methods
       self.ctx = ctx

       # Implementation here
       result = self._process_data(ctx)
       return result
   ```
3. **Register the agent with the executor**
   ```python
   # In src/orchestrator/executor.py
   from agents.my_new_agent import MyNewAgent

   class Executor:
       def __init__(self, settings, cost_tracker=None):
           self.tools = {
               # ... existing tools
               "MyNewAgent": MyNewAgent(),
           }
   ```

### Agent Best Practices

1. **Context Management**
   ```python
   def execute(self, ctx: Dict[str, Any]) -> Optional[str]:
       # Store context for helper methods
       self.ctx = ctx

       # Access context data
       text = ctx.get("text", "")
       fields = ctx.get("fields", [])
   ```
2. **Cost Tracking Integration**
   ```python
   def _call_llm(self, prompt: str, description: str) -> str:
       # Get the cost tracker from context
       cost_tracker = self.ctx.get("cost_tracker") if hasattr(self, 'ctx') else None

       result = self.llm.responses(
           prompt,
           temperature=0.0,
           ctx={"cost_tracker": cost_tracker} if cost_tracker else None,
           description=description
       )
       return result
   ```
3. **Error Handling**
   ```python
   def execute(self, ctx: Dict[str, Any]) -> Optional[str]:
       try:
           result = ...  # implementation goes here
           return result
       except Exception as e:
           self.logger.error(f"Agent execution failed: {str(e)}", exc_info=True)
           return None
   ```

## Service Development

### LLM Client Usage

```python
from services.llm_client import LLMClient
from config.settings import settings

class MyAgent(BaseAgent):
    def __init__(self):
        super().__init__()
        self.llm = LLMClient(settings)

    def _extract_data(self, text: str) -> str:
        prompt = f"Extract data from: {text}"

        # Get the cost tracker from context
        cost_tracker = self.ctx.get("cost_tracker") if hasattr(self, 'ctx') else None

        result = self.llm.responses(
            prompt,
            temperature=0.0,
            ctx={"cost_tracker": cost_tracker} if cost_tracker else None,
            description="Data Extraction"
        )
        return result
```

### Cost Tracking Integration

```python
from services.cost_tracker import CostTracker

# In the executor or main application
cost_tracker = CostTracker()

# Pass it to agents via context
ctx = {
    "cost_tracker": cost_tracker,
    # ... other context data
}

# Track costs
costs = cost_tracker.calculate_current_file_costs()
print(f"Total cost: ${costs['openai']['total_cost']:.4f}")
```

## Testing

### Running Tests

```bash
# Run all tests
python -m pytest tests/

# Run a specific test file
python -m pytest tests/test_cost_tracking.py

# Run with coverage
python -m pytest --cov=src tests/
```

### Writing Tests

```python
import pytest
from unittest.mock import Mock, patch

from src.agents.my_agent import MyAgent

def test_my_agent_execution():
    """Test MyAgent execution with mock data."""
    agent = MyAgent()

    # Mock context
    ctx = {
        "text": "Test document content",
        "fields": ["field1", "field2"],
        "cost_tracker": Mock()
    }

    # Mock the LLM response
    with patch.object(agent.llm, 'responses') as mock_llm:
        mock_llm.return_value = '{"field1": "value1", "field2": "value2"}'

        result = agent.execute(ctx)

        assert result is not None
        assert "field1" in result
        assert "field2" in result
```

### Test Structure

```
tests/
├── test_agents/               # Agent tests
│   ├── test_field_mapper_agent.py
│   └── test_unique_indices_combinator.py
├── test_services/             # Service tests
│   ├── test_llm_client.py
│   └── test_cost_tracker.py
├── test_orchestrator/         # Orchestrator tests
│   ├── test_planner.py
│   └── test_executor.py
└── integration/               # Integration tests
    └── test_end_to_end.py
```

## Configuration Management

### Settings Structure

```python
# src/config/settings.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # Azure OpenAI
    AZURE_OPENAI_ENDPOINT: str
    AZURE_OPENAI_API_KEY: str
    AZURE_OPENAI_DEPLOYMENT: str
    AZURE_OPENAI_API_VERSION: str = "2025-03-01-preview"

    # Azure Document Intelligence
    AZURE_DI_ENDPOINT: str
    AZURE_DI_KEY: str

    # Retry Configuration
    LLM_MAX_RETRIES: int = 5
    LLM_BASE_DELAY: float = 1.0
    LLM_MAX_DELAY: float = 60.0

    class Config:
        env_file = ".env"
```

### Environment Variables

```bash
# .env file
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_API_KEY=your-api-key
AZURE_OPENAI_DEPLOYMENT=your-deployment-name
AZURE_DI_ENDPOINT=https://your-resource.cognitiveservices.azure.com/
AZURE_DI_KEY=your-di-key
```

## Debugging

### Logging Configuration

```python
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Set specific logger levels
logging.getLogger('azure').setLevel(logging.WARNING)
logging.getLogger('openai').setLevel(logging.WARNING)
```

### Debug Mode

```python
# Enable debug logging
logging.getLogger().setLevel(logging.DEBUG)

# In agents
self.logger.debug(f"Processing data: {data[:200]}...")
```

### Cost Tracking Debug

```python
# Check the cost tracker state
print(f"LLM calls: {len(cost_tracker.llm_calls)}")
print(f"Input tokens: {cost_tracker.llm_input_tokens}")
print(f"Output tokens: {cost_tracker.llm_output_tokens}")

# Get detailed costs
costs_df = cost_tracker.get_detailed_costs_table()
print(costs_df)
```

## Performance Optimization

### Memory Management

```python
# Process large documents in chunks
def process_large_document(self, text: str, chunk_size: int = 10000):
    chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    results = []
    for chunk in chunks:
        result = self._process_chunk(chunk)
        results.append(result)
    return self._combine_results(results)
```

### Caching

```python
# Use session state for caching
if 'processed_data' not in st.session_state:
    st.session_state.processed_data = {}

# Check the cache before processing
if key in st.session_state.processed_data:
    return st.session_state.processed_data[key]
```

### Batch Processing

```python
# Process multiple items efficiently
def process_batch(self, items: List[str]) -> List[str]:
    results = []
    for item in items:
        try:
            result = self._process_item(item)
            results.append(result)
        except Exception as e:
            self.logger.error(f"Failed to process item: {str(e)}")
            results.append(None)
    return results
```

## Deployment

### Production Setup

1. **Environment Configuration**
   ```bash
   # Set production environment variables
   export AZURE_OPENAI_ENDPOINT=...
   export AZURE_OPENAI_API_KEY=...
   ```
2. **Dependencies**
   ```bash
   pip install -r requirements.txt
   ```
3. **Run the Application**
   ```bash
   streamlit run src/app.py --server.port 8501
   ```

### Docker Deployment

```dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY src/ ./src/
COPY .env .

EXPOSE 8501

CMD ["streamlit", "run", "src/app.py", "--server.port=8501"]
```

## Contributing

### Development Workflow

1. Create a feature branch: `git checkout -b feature/new-feature`
2. Make changes following the coding standards
3. Add tests for new functionality
4. Run the tests: `python -m pytest tests/`
5. Update the documentation
6. Submit a pull request

### Code Review Checklist

- [ ] Code follows style guidelines
- [ ] Tests are included and passing
- [ ] Documentation is updated
- [ ] Error handling is implemented
- [ ] Cost tracking is integrated
- [ ] Logging is appropriate

### Release Process

1. Update the version in `__init__.py`
2. Update CHANGELOG.md
3. Create a release tag
4. Deploy to production
5. Update the documentation

## Troubleshooting

### Common Issues

**Azure OpenAI Connection Errors**

```python
# Check configuration
print(f"Endpoint: {settings.AZURE_OPENAI_ENDPOINT}")
print(f"Deployment: {settings.AZURE_OPENAI_DEPLOYMENT}")
print(f"API Version: {settings.AZURE_OPENAI_API_VERSION}")
```

**Cost Tracking Issues**

```python
# Verify the cost tracker is passed correctly
if 'cost_tracker' not in ctx:
    self.logger.warning("No cost tracker in context")

# Check whether the agent stores its context
if not hasattr(self, 'ctx'):
    self.logger.warning("Agent doesn't store context")
```

**Memory Issues**

```python
# Monitor memory usage
import psutil

process = psutil.Process()
print(f"Memory usage: {process.memory_info().rss / 1024 / 1024:.2f} MB")
```

### Debug Tools

- **Log Analysis**: Check logs for error patterns
- **Cost Monitoring**: Track API usage and costs
- **Performance Profiling**: Monitor execution times
- **Memory Profiling**: Track memory usage

## API Reference

### Agent Base Class

```python
class BaseAgent:
    def execute(self, ctx: Dict[str, Any]) -> Optional[str]:
        """Execute the agent's main functionality."""
        raise NotImplementedError
```

### LLM Client

```python
class LLMClient:
    def responses(self, prompt: str, **kwargs) -> str:
        """Send a prompt to Azure OpenAI and return the response."""
```

### Cost Tracker

```python
class CostTracker:
    def add_llm_tokens(self, input_tokens: int, output_tokens: int, description: str):
        """Track LLM token usage and costs."""

    def calculate_current_file_costs(self) -> Dict[str, Any]:
        """Calculate costs for current file processing."""
```

For more detailed information, refer to the inline documentation in the source code.
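The retry settings in the configuration section (`LLM_MAX_RETRIES`, `LLM_BASE_DELAY`, `LLM_MAX_DELAY`) imply exponential backoff around LLM calls. As a minimal sketch of how such a helper could work — `retry_with_backoff` is a hypothetical name, and the actual `LLMClient` retry logic may differ — consider:

```python
import random
import time


def retry_with_backoff(operation, max_retries=5, base_delay=1.0, max_delay=60.0,
                       sleep=time.sleep):
    """Retry `operation` with exponential backoff and jitter.

    Illustrative only: shows how LLM_MAX_RETRIES, LLM_BASE_DELAY and
    LLM_MAX_DELAY could drive retries of a transient-failure-prone call.
    """
    for attempt in range(max_retries):
        try:
            return operation()
        except Exception:
            if attempt == max_retries - 1:
                raise  # retries exhausted, propagate the last error
            # Delay doubles each attempt, capped at max_delay, plus jitter
            # so concurrent callers don't retry in lockstep.
            delay = min(base_delay * (2 ** attempt), max_delay)
            sleep(delay + random.uniform(0, 0.1 * delay))
```

The injectable `sleep` parameter keeps the helper testable without real delays, which fits the testing conventions above.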