from fastapi import APIRouter, HTTPException, Depends
from typing import List, Dict, Any, Optional
from loguru import logger
from pydantic import BaseModel
import json

from services.test_service import test_service
from services.ai_service import ai_service

router = APIRouter()


class TestCase(BaseModel):
    id: str
    title: str
    preconditions: List[str]
    steps: List[str]
    expected_results: List[str]
    priority: str
    type: str
    requirement_id: str


class ExportRequest(BaseModel):
    test_cases: List[TestCase]
    target: str
    project_id: str
    section_id: Optional[str] = None
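

# Route registration is an assumed sketch: `router` is defined above, but the
# path and HTTP method here are illustrative and may differ in the real application.
@router.post("/generate")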
async def generate_test_cases(
    requirements: List[Dict[str, Any]],
    ai_provider: str = "openai",
    model: str = "gpt-3.5-turbo"
) -> List[Dict[str, Any]]:
    """
    Generate test cases from requirements.

    Parameters:
    - requirements: List of requirements
    - ai_provider: AI provider to use
    - model: Model to use
    """
    try:
        test_cases = await test_service.generate_test_cases(
            requirements=requirements,
            ai_service=ai_service
        )
        return test_cases
    except Exception as e:
        logger.error(f"Error generating test cases: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
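

# Assumed route registration (illustrative path and method).
@router.post("/export")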
async def export_test_cases(request: ExportRequest) -> Dict[str, Any]:
    """
    Export test cases to a test management tool.

    Parameters:
    - request: Export request containing test cases and target information
    """
    try:
        if request.target == "testrail":
            if not request.section_id:
                raise HTTPException(
                    status_code=400,
                    detail="section_id is required for TestRail export"
                )
            results = await test_service.export_to_testrail(
                test_cases=request.test_cases,
                project_id=int(request.project_id),
                section_id=int(request.section_id)
            )
        elif request.target == "jira":
            results = await test_service.export_to_jira(
                test_cases=request.test_cases,
                project_key=request.project_id
            )
        elif request.target == "qtest":
            results = await test_service.export_to_qtest(
                test_cases=request.test_cases,
                project_id=int(request.project_id)
            )
        else:
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported export target: {request.target}"
            )
        return {
            "status": "success",
            "results": results
        }
    except HTTPException:
        # Re-raise intentional client errors (400s) instead of masking them as 500s.
        raise
    except Exception as e:
        logger.error(f"Error exporting test cases: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
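

# Assumed route registration (illustrative path and method).
@router.post("/validate")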
async def validate_test_cases(
    test_cases: List[TestCase],
    requirements: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """
    Validate test cases against requirements.

    Parameters:
    - test_cases: List of test cases
    - requirements: List of requirements
    """
    try:
        # Build a requirement coverage matrix keyed by requirement id
        coverage = {}
        for req in requirements:
            coverage[req["id"]] = {
                "requirement": req,
                "test_cases": [],
                "covered": False
            }

        # Map test cases to the requirements they cover
        for test_case in test_cases:
            if test_case.requirement_id in coverage:
                coverage[test_case.requirement_id]["test_cases"].append(test_case)
                coverage[test_case.requirement_id]["covered"] = True

        # Calculate coverage metrics (guard against an empty requirement list)
        total_requirements = len(requirements)
        covered_requirements = sum(1 for req in coverage.values() if req["covered"])
        coverage_percentage = (
            (covered_requirements / total_requirements) * 100
            if total_requirements
            else 0.0
        )

        # Identify uncovered requirements
        uncovered_requirements = [
            req["requirement"]
            for req in coverage.values()
            if not req["covered"]
        ]

        return {
            "status": "success",
            "coverage_percentage": coverage_percentage,
            "total_requirements": total_requirements,
            "covered_requirements": covered_requirements,
            "uncovered_requirements": uncovered_requirements,
            "coverage_matrix": coverage
        }
    except Exception as e:
        logger.error(f"Error validating test cases: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
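

# Assumed route registration (illustrative path and method).
@router.post("/prioritize")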
async def prioritize_test_cases(
    test_cases: List[TestCase],
    requirements: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """
    Prioritize test cases based on requirements and risk.

    Parameters:
    - test_cases: List of test cases
    - requirements: List of requirements
    """
    try:
        # Create the risk assessment prompt
        prompt = f"""
        Analyze the following requirements and test cases to determine test case priority.
        Consider:
        1. Requirement priority
        2. Business impact
        3. Technical complexity
        4. Historical defect patterns

        Requirements:
        {json.dumps(requirements, indent=2)}

        Test Cases:
        {json.dumps([tc.dict() for tc in test_cases], indent=2)}

        For each test case, provide:
        1. Priority score (1-5)
        2. Risk level (High/Medium/Low)
        3. Justification
        """

        # Get the AI assessment
        assessment = await ai_service.generate_response(prompt=prompt)

        # Parse and apply the prioritization
        prioritized_cases = []
        for test_case in test_cases:
            # Find the assessment for this test case
            case_assessment = _find_case_assessment(
                assessment["response"],
                test_case.id
            )
            prioritized_cases.append({
                "test_case": test_case,
                "priority_score": case_assessment["priority_score"],
                "risk_level": case_assessment["risk_level"],
                "justification": case_assessment["justification"]
            })

        # Sort by priority score, highest first
        prioritized_cases.sort(
            key=lambda x: x["priority_score"],
            reverse=True
        )

        return prioritized_cases
    except Exception as e:
        logger.error(f"Error prioritizing test cases: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
def _find_case_assessment(assessment_text: str, case_id: str) -> Dict[str, Any]:
    """Extract the assessment for a specific test case."""
    # This is a simplified implementation.
    # In practice, you'd want more robust parsing.
    return {
        "priority_score": 3,
        "risk_level": "Medium",
        "justification": "Default assessment"
    }
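

# A minimal sketch of the "more robust parsing" the stub above mentions. It assumes
# (as an illustration, not part of the original module) that the AI is prompted to
# return a JSON array of per-case objects, e.g.
# [{"id": "TC-1", "priority_score": 4, "risk_level": "High", "justification": "..."}].
def _parse_case_assessment(assessment_text: str, case_id: str) -> Dict[str, Any]:
    """Best-effort extraction of a per-case assessment from a JSON-formatted response."""
    default = {
        "priority_score": 3,
        "risk_level": "Medium",
        "justification": "Default assessment"
    }
    # Tolerate prose around the JSON payload by slicing out the first bracketed array.
    start, end = assessment_text.find("["), assessment_text.rfind("]")
    if start == -1 or end == -1:
        return default
    try:
        entries = json.loads(assessment_text[start:end + 1])
    except ValueError:
        logger.warning(f"Could not parse assessment JSON for test case {case_id}")
        return default
    for entry in entries:
        if isinstance(entry, dict) and entry.get("id") == case_id:
            return {
                "priority_score": int(entry.get("priority_score", 3)),
                "risk_level": entry.get("risk_level", "Medium"),
                "justification": entry.get("justification", "")
            }
    return default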