thechaiexperiment committed
Commit d825c91 · verified · 1 Parent(s): 8542521

Upload 17 files
.github/workflows/ci.yml ADDED
@@ -0,0 +1,89 @@
+ name: CI/CD
+
+ on:
+   push:
+     branches: [ main ]
+   pull_request:
+     branches: [ main ]
+
+ jobs:
+   test:
+     runs-on: ubuntu-latest
+     services:
+       postgres:
+         image: postgres:13
+         env:
+           POSTGRES_USER: postgres
+           POSTGRES_PASSWORD: postgres
+           POSTGRES_DB: testauto
+         ports:
+           - 5432:5432
+         options: >-
+           --health-cmd pg_isready
+           --health-interval 10s
+           --health-timeout 5s
+           --health-retries 5
+
+       rabbitmq:
+         image: rabbitmq:3-management
+         ports:
+           - 5672:5672
+           - 15672:15672
+         env:
+           RABBITMQ_DEFAULT_USER: guest
+           RABBITMQ_DEFAULT_PASS: guest
+
+     steps:
+       - uses: actions/checkout@v2
+
+       - name: Set up Python
+         uses: actions/setup-python@v2
+         with:
+           python-version: '3.9'
+
+       - name: Install dependencies
+         run: |
+           python -m pip install --upgrade pip
+           pip install -r requirements.txt
+           pip install pytest pytest-cov
+
+       - name: Run tests
+         env:
+           POSTGRES_SERVER: localhost
+           POSTGRES_USER: postgres
+           POSTGRES_PASSWORD: postgres
+           POSTGRES_DB: testauto
+           RABBITMQ_HOST: localhost
+         run: |
+           pytest tests/ --cov=./ --cov-report=xml
+
+       - name: Upload coverage to Codecov
+         uses: codecov/codecov-action@v2
+         with:
+           file: ./coverage.xml
+
+   docker:
+     needs: test
+     runs-on: ubuntu-latest
+     if: github.event_name == 'push' && github.ref == 'refs/heads/main'
+
+     steps:
+       - uses: actions/checkout@v2
+
+       - name: Set up Docker Buildx
+         uses: docker/setup-buildx-action@v1
+
+       - name: Login to DockerHub
+         uses: docker/login-action@v1
+         with:
+           username: ${{ secrets.DOCKERHUB_USERNAME }}
+           password: ${{ secrets.DOCKERHUB_TOKEN }}
+
+       - name: Build and push
+         uses: docker/build-push-action@v2
+         with:
+           context: .
+           push: true
+           tags: |
+             ${{ secrets.DOCKERHUB_USERNAME }}/testauto:latest
+             ${{ secrets.DOCKERHUB_USERNAME }}/testauto:${{ github.sha }}
api/routes/ai_routes.py ADDED
@@ -0,0 +1,134 @@
1
+ from fastapi import APIRouter, HTTPException, Depends
2
+ from typing import List, Dict, Any
3
+ from loguru import logger
4
+ from pydantic import BaseModel
5
+
6
+ from services.ai_service import ai_service
7
+
8
+ router = APIRouter()
9
+
10
+ class ModelConfig(BaseModel):
11
+ provider: str
12
+ model: str
13
+ max_tokens: int = 1000
14
+ temperature: float = 0.7
15
+
16
+ class BenchmarkRequest(BaseModel):
17
+ prompt: str
18
+ models: List[ModelConfig]
19
+
20
+ @router.post("/generate")
21
+ async def generate_response(
22
+ prompt: str,
23
+ provider: str = "openai",
24
+ model: str = "gpt-3.5-turbo",
25
+ max_tokens: int = 1000,
26
+ temperature: float = 0.7
27
+ ) -> Dict[str, Any]:
28
+ """
29
+ Generate response using specified AI model.
30
+
31
+ Parameters:
32
+ - prompt: The input prompt
33
+ - provider: AI provider to use (openai, local, openrouter)
34
+ - model: Model to use
35
+ - max_tokens: Maximum tokens in response
36
+ - temperature: Response temperature
37
+ """
38
+ try:
39
+ response = await ai_service.generate_response(
40
+ prompt=prompt,
41
+ provider=provider,
42
+ model=model,
43
+ max_tokens=max_tokens,
44
+ temperature=temperature
45
+ )
46
+ return response
47
+ except Exception as e:
48
+ logger.error(f"Error generating response: {str(e)}")
49
+ raise HTTPException(status_code=500, detail=str(e))
50
+
51
+ @router.post("/benchmark")
52
+ async def benchmark_models(request: BenchmarkRequest) -> List[Dict[str, Any]]:
53
+ """
54
+ Benchmark multiple models with the same prompt.
55
+
56
+ Parameters:
57
+ - request: Benchmark request containing prompt and model configurations
58
+ """
59
+ try:
60
+ results = await ai_service.benchmark_models(
61
+ prompt=request.prompt,
62
+ models=[model.dict() for model in request.models]
63
+ )
64
+ return results
65
+ except Exception as e:
66
+ logger.error(f"Error benchmarking models: {str(e)}")
67
+ raise HTTPException(status_code=500, detail=str(e))
68
+
69
+ @router.get("/available-models")
70
+ async def get_available_models() -> Dict[str, List[str]]:
71
+ """Get list of available models for each provider."""
72
+ return {
73
+ "openai": [
74
+ "gpt-3.5-turbo",
75
+ "gpt-4",
76
+ "gpt-4-turbo"
77
+ ],
78
+ "openrouter": [
79
+ "anthropic/claude-2",
80
+ "google/palm-2",
81
+ "meta-llama/llama-2-70b"
82
+ ],
83
+ "local": [
84
+ "llama-2-7b",
85
+ "llama-2-13b",
86
+ "llama-2-70b"
87
+ ]
88
+ }
89
+
90
+ @router.post("/validate-response")
91
+ async def validate_response(
92
+ prompt: str,
93
+ response: str,
94
+ provider: str = "openai",
95
+ model: str = "gpt-3.5-turbo"
96
+ ) -> Dict[str, Any]:
97
+ """
98
+ Validate AI response against the prompt.
99
+
100
+ Parameters:
101
+ - prompt: Original prompt
102
+ - response: AI-generated response
103
+ - provider: AI provider used
104
+ - model: Model used
105
+ """
106
+ try:
107
+ validation_prompt = f"""
108
+ Validate if the following response adequately addresses the prompt:
109
+
110
+ Prompt:
111
+ {prompt}
112
+
113
+ Response:
114
+ {response}
115
+
116
+ Provide:
117
+ 1. Relevance score (0-1)
118
+ 2. Completeness score (0-1)
119
+ 3. Specific feedback
120
+ """
121
+
122
+ validation = await ai_service.generate_response(
123
+ prompt=validation_prompt,
124
+ provider=provider,
125
+ model=model
126
+ )
127
+
128
+ return {
129
+ "status": "success",
130
+ "validation": validation["response"]
131
+ }
132
+ except Exception as e:
133
+ logger.error(f"Error validating response: {str(e)}")
134
+ raise HTTPException(status_code=500, detail=str(e))
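A minimal client-side sketch of how the /generate route above can be exercised once the router is mounted under /api/ai (see app/main.py); the base URL and prompt are illustrative assumptions, not part of this commit:

import httpx

BASE_URL = "http://localhost:8000"  # assumed local deployment

# The endpoint declares plain scalar parameters, so FastAPI reads them from the query string.
resp = httpx.post(
    f"{BASE_URL}/api/ai/generate",
    params={
        "prompt": "Summarize the login requirement in one sentence.",
        "provider": "openai",
        "model": "gpt-3.5-turbo",
        "max_tokens": 200,
        "temperature": 0.2,
    },
    timeout=60.0,
)
resp.raise_for_status()
print(resp.json())  # {"response": ..., "provider": ..., "model": ..., "latency": ...}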
api/routes/automation_routes.py ADDED
@@ -0,0 +1,253 @@
1
+ from fastapi import APIRouter, HTTPException, Depends
2
+ from typing import List, Dict, Any, Optional
3
+ from loguru import logger
4
+ from pydantic import BaseModel
5
+
6
+ from services.automation_service import automation_service
7
+ from services.test_service import test_service
8
+
9
+ router = APIRouter()
10
+
11
+ class TestScriptRequest(BaseModel):
12
+ test_cases: List[Dict[str, Any]]
13
+ framework: str = "pytest"
14
+ language: str = "python"
15
+ browser: str = "chrome"
16
+
17
+ class TestPlanRequest(BaseModel):
18
+ requirements: List[Dict[str, Any]]
19
+ test_cases: List[Dict[str, Any]]
20
+ project_info: Dict[str, Any]
21
+
22
+ @router.post("/generate-scripts")
23
+ async def generate_test_scripts(request: TestScriptRequest) -> Dict[str, Any]:
24
+ """
25
+ Generate test automation scripts.
26
+
27
+ Parameters:
28
+ - request: Test script generation request
29
+ """
30
+ try:
31
+ # Generate test scripts
32
+ scripts = await automation_service.generate_test_scripts(
33
+ test_cases=request.test_cases,
34
+ framework=request.framework,
35
+ language=request.language,
36
+ browser=request.browser
37
+ )
38
+
39
+ # Generate Gherkin feature file
40
+ feature = await automation_service.generate_gherkin_feature(
41
+ test_cases=request.test_cases
42
+ )
43
+
44
+ return {
45
+ "status": "success",
46
+ "scripts": scripts,
47
+ "feature": feature
48
+ }
49
+ except Exception as e:
50
+ logger.error(f"Error generating test scripts: {str(e)}")
51
+ raise HTTPException(status_code=500, detail=str(e))
52
+
53
+ @router.post("/generate-test-plan")
54
+ async def generate_test_plan(request: TestPlanRequest) -> Dict[str, Any]:
55
+ """
56
+ Generate comprehensive test plan.
57
+
58
+ Parameters:
59
+ - request: Test plan generation request
60
+ """
61
+ try:
62
+ # Calculate coverage metrics
63
+ coverage = await test_service.validate_test_cases(
64
+ test_cases=request.test_cases,
65
+ requirements=request.requirements
66
+ )
67
+
68
+ # Prioritize test cases
69
+ prioritized_cases = await test_service.prioritize_test_cases(
70
+ test_cases=request.test_cases,
71
+ requirements=request.requirements
72
+ )
73
+
74
+ # Generate test plan sections
75
+ test_plan = {
76
+ "project_info": request.project_info,
77
+ "scope": _generate_scope(request.requirements),
78
+ "approach": _generate_approach(request.test_cases),
79
+ "resources": _generate_resources(),
80
+ "schedule": _generate_schedule(request.test_cases),
81
+ "risk_assessment": _generate_risk_assessment(prioritized_cases),
82
+ "requirement_traceability": coverage["coverage_matrix"],
83
+ "coverage_metrics": {
84
+ "percentage": coverage["coverage_percentage"],
85
+ "total_requirements": coverage["total_requirements"],
86
+ "covered_requirements": coverage["covered_requirements"]
87
+ }
88
+ }
89
+
90
+ return {
91
+ "status": "success",
92
+ "test_plan": test_plan
93
+ }
94
+ except Exception as e:
95
+ logger.error(f"Error generating test plan: {str(e)}")
96
+ raise HTTPException(status_code=500, detail=str(e))
97
+
98
+ def _generate_scope(requirements: List[Dict[str, Any]]) -> Dict[str, Any]:
99
+ """Generate test scope section."""
100
+ return {
101
+ "in_scope": [
102
+ {
103
+ "id": req["id"],
104
+ "title": req["title"],
105
+ "priority": req.get("priority", "Medium")
106
+ }
107
+ for req in requirements
108
+ ],
109
+ "out_of_scope": [],
110
+ "assumptions": [
111
+ "Test environment is properly configured",
112
+ "Test data is available",
113
+ "Dependencies are stable"
114
+ ]
115
+ }
116
+
117
+ def _generate_approach(test_cases: List[Dict[str, Any]]) -> Dict[str, Any]:
118
+ """Generate test approach section."""
119
+ return {
120
+ "strategy": "Risk-based testing approach",
121
+ "test_levels": [
122
+ "Unit Testing",
123
+ "Integration Testing",
124
+ "System Testing",
125
+ "Acceptance Testing"
126
+ ],
127
+ "test_types": [
128
+ "Functional Testing",
129
+ "Non-functional Testing",
130
+ "Regression Testing"
131
+ ],
132
+ "automation_approach": {
133
+ "framework": "pytest",
134
+ "tools": [
135
+ "Selenium WebDriver",
136
+ "Playwright",
137
+ "pytest-html"
138
+ ],
139
+ "coverage_goal": "80%"
140
+ }
141
+ }
142
+
143
+ def _generate_resources() -> Dict[str, Any]:
144
+ """Generate resources section."""
145
+ return {
146
+ "team": [
147
+ {
148
+ "role": "Test Lead",
149
+ "responsibilities": [
150
+ "Test plan creation",
151
+ "Resource allocation",
152
+ "Progress tracking"
153
+ ]
154
+ },
155
+ {
156
+ "role": "Test Engineer",
157
+ "responsibilities": [
158
+ "Test case execution",
159
+ "Defect reporting",
160
+ "Test automation"
161
+ ]
162
+ }
163
+ ],
164
+ "tools": [
165
+ "Test Management Tool",
166
+ "Automation Framework",
167
+ "CI/CD Pipeline",
168
+ "Version Control System"
169
+ ],
170
+ "environments": [
171
+ "Development",
172
+ "Testing",
173
+ "Staging",
174
+ "Production"
175
+ ]
176
+ }
177
+
178
+ def _generate_schedule(test_cases: List[Dict[str, Any]]) -> Dict[str, Any]:
179
+ """Generate schedule section."""
180
+ return {
181
+ "phases": [
182
+ {
183
+ "name": "Planning",
184
+ "duration": "1 week",
185
+ "activities": [
186
+ "Test plan creation",
187
+ "Resource allocation",
188
+ "Tool setup"
189
+ ]
190
+ },
191
+ {
192
+ "name": "Design",
193
+ "duration": "2 weeks",
194
+ "activities": [
195
+ "Test case design",
196
+ "Automation framework setup",
197
+ "Test data preparation"
198
+ ]
199
+ },
200
+ {
201
+ "name": "Execution",
202
+ "duration": "3 weeks",
203
+ "activities": [
204
+ "Test case execution",
205
+ "Defect reporting",
206
+ "Regression testing"
207
+ ]
208
+ },
209
+ {
210
+ "name": "Closure",
211
+ "duration": "1 week",
212
+ "activities": [
213
+ "Test summary report",
214
+ "Lessons learned",
215
+ "Knowledge transfer"
216
+ ]
217
+ }
218
+ ],
219
+ "milestones": [
220
+ "Test plan approval",
221
+ "Test case design completion",
222
+ "Automation framework ready",
223
+ "Test execution completion",
224
+ "Test closure"
225
+ ]
226
+ }
227
+
228
+ def _generate_risk_assessment(
229
+ prioritized_cases: List[Dict[str, Any]]
230
+ ) -> Dict[str, Any]:
231
+ """Generate risk assessment section."""
232
+ return {
233
+ "high_risk_areas": [
234
+ {
235
+ "test_case": case["test_case"],
236
+ "risk_level": case["risk_level"],
237
+ "justification": case["justification"]
238
+ }
239
+ for case in prioritized_cases
240
+ if case["risk_level"] == "High"
241
+ ],
242
+ "mitigation_strategies": [
243
+ "Early testing of high-risk areas",
244
+ "Additional test coverage for critical features",
245
+ "Regular risk reassessment",
246
+ "Automated regression testing"
247
+ ],
248
+ "contingency_plans": [
249
+ "Resource reallocation if needed",
250
+ "Schedule adjustment for high-risk areas",
251
+ "Additional testing cycles if required"
252
+ ]
253
+ }
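For context, a hypothetical request body for /generate-scripts that matches the TestScriptRequest model and the fields the automation service reads from each test case (id, title, priority, type, preconditions, steps, expected_results); the values are made up for illustration:

import httpx

payload = {
    "test_cases": [
        {
            "id": "TC-1",
            "title": "Login with valid credentials",
            "priority": "High",
            "type": "Functional",
            "preconditions": ["User account exists"],
            "steps": ["Enter username and password", "Click the login button"],
            "expected_results": ["Dashboard page is displayed"],
        }
    ],
    "framework": "pytest",
    "language": "python",
    "browser": "chrome",
}

resp = httpx.post("http://localhost:8000/api/automation/generate-scripts", json=payload, timeout=120.0)
data = resp.json()
print(data["scripts"], data["feature"])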
api/routes/document_routes.py ADDED
@@ -0,0 +1,158 @@
1
+ from fastapi import APIRouter, UploadFile, File, HTTPException, Depends
2
+ from typing import List, Dict, Any
3
+ from loguru import logger
4
+
5
+ from services.document_service import document_service
6
+ from services.ai_service import ai_service
7
+ from services.test_service import test_service
8
+ from services.automation_service import automation_service
9
+
10
+ router = APIRouter()
11
+
12
+ @router.post("/upload")
13
+ async def upload_document(
14
+ file: UploadFile = File(...),
15
+ process_type: str = "requirements"
16
+ ) -> Dict[str, Any]:
17
+ """
18
+ Upload and process a document.
19
+
20
+ Parameters:
21
+ - file: The document file to upload
22
+ - process_type: Type of processing to perform (requirements, test_cases, etc.)
23
+ """
24
+ try:
25
+ # Save uploaded file
26
+ file_path = await document_service.save_upload_file(file)
27
+
28
+ # Process document
29
+ result = await document_service.process_document(file_path)
30
+
31
+ # Segment document if needed
32
+ segments = await document_service.segment_document(result["text"])
33
+
34
+ return {
35
+ "status": "success",
36
+ "file_path": file_path,
37
+ "segments": segments,
38
+ "type": result["type"]
39
+ }
40
+ except Exception as e:
41
+ logger.error(f"Error processing document: {str(e)}")
42
+ raise HTTPException(status_code=500, detail=str(e))
43
+
44
+ @router.post("/process-requirements")
45
+ async def process_requirements(
46
+ file: UploadFile = File(...),
47
+ ai_provider: str = "openai",
48
+ model: str = "gpt-3.5-turbo"
49
+ ) -> Dict[str, Any]:
50
+ """
51
+ Process requirements document and generate test cases.
52
+
53
+ Parameters:
54
+ - file: The requirements document
55
+ - ai_provider: AI provider to use (openai, local, openrouter)
56
+ - model: Model to use for processing
57
+ """
58
+ try:
59
+ # Upload and process document
60
+ file_path = await document_service.save_upload_file(file)
61
+ result = await document_service.process_document(file_path)
62
+
63
+ # Extract requirements using AI
64
+ prompt = f"""
65
+ Extract requirements from the following text. For each requirement, provide:
66
+ 1. ID
67
+ 2. Title
68
+ 3. Description
69
+ 4. Priority (High/Medium/Low)
70
+
71
+ Text:
72
+ {result["text"]}
73
+ """
74
+
75
+ ai_response = await ai_service.generate_response(
76
+ prompt=prompt,
77
+ provider=ai_provider,
78
+ model=model
79
+ )
80
+
81
+ # Generate test cases
82
+ requirements = _parse_requirements(ai_response["response"])
83
+ test_cases = await test_service.generate_test_cases(
84
+ requirements=requirements,
85
+ ai_service=ai_service
86
+ )
87
+
88
+ return {
89
+ "status": "success",
90
+ "requirements": requirements,
91
+ "test_cases": test_cases
92
+ }
93
+ except Exception as e:
94
+ logger.error(f"Error processing requirements: {str(e)}")
95
+ raise HTTPException(status_code=500, detail=str(e))
96
+
97
+ @router.post("/generate-test-scripts")
98
+ async def generate_test_scripts(
99
+ test_cases: List[Dict[str, Any]],
100
+ framework: str = "pytest",
101
+ language: str = "python",
102
+ browser: str = "chrome"
103
+ ) -> Dict[str, Any]:
104
+ """
105
+ Generate test scripts from test cases.
106
+
107
+ Parameters:
108
+ - test_cases: List of test cases
109
+ - framework: Test framework to use (pytest, playwright)
110
+ - language: Programming language (python)
111
+ - browser: Browser to use (chrome, firefox, etc.)
112
+ """
113
+ try:
114
+ # Generate test scripts
115
+ scripts = await automation_service.generate_test_scripts(
116
+ test_cases=test_cases,
117
+ framework=framework,
118
+ language=language,
119
+ browser=browser
120
+ )
121
+
122
+ # Generate Gherkin feature file
123
+ feature = await automation_service.generate_gherkin_feature(test_cases)
124
+
125
+ return {
126
+ "status": "success",
127
+ "scripts": scripts,
128
+ "feature": feature
129
+ }
130
+ except Exception as e:
131
+ logger.error(f"Error generating test scripts: {str(e)}")
132
+ raise HTTPException(status_code=500, detail=str(e))
133
+
134
+ def _parse_requirements(text: str) -> List[Dict[str, Any]]:
135
+ """Parse AI response into structured requirements."""
136
+ requirements = []
137
+ current_req = {}
138
+
139
+ for line in text.split('\n'):
140
+ line = line.strip()
141
+ if not line:
142
+ continue
143
+
144
+ if line.startswith('ID:'):
145
+ if current_req:
146
+ requirements.append(current_req)
147
+ current_req = {'id': line.split(':', 1)[1].strip()}
148
+ elif line.startswith('Title:'):
149
+ current_req['title'] = line.split(':', 1)[1].strip()
150
+ elif line.startswith('Description:'):
151
+ current_req['description'] = line.split(':', 1)[1].strip()
152
+ elif line.startswith('Priority:'):
153
+ current_req['priority'] = line.split(':', 1)[1].strip()
154
+
155
+ if current_req:
156
+ requirements.append(current_req)
157
+
158
+ return requirements
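A sketch of uploading a document to the /upload route above via a multipart request; the file name and local base URL are assumptions:

import httpx

with open("requirements.pdf", "rb") as f:
    resp = httpx.post(
        "http://localhost:8000/api/documents/upload",
        params={"process_type": "requirements"},
        files={"file": ("requirements.pdf", f, "application/pdf")},
        timeout=120.0,
    )
print(resp.json())  # {"status": "success", "file_path": ..., "segments": [...], "type": "pdf"}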
api/routes/test_routes.py ADDED
@@ -0,0 +1,220 @@
1
+ from fastapi import APIRouter, HTTPException, Depends
2
+ from typing import List, Dict, Any, Optional
3
+ from loguru import logger
4
+ from pydantic import BaseModel
5
+ import json
6
+
7
+ from services.test_service import test_service
8
+ from services.ai_service import ai_service
9
+
10
+ router = APIRouter()
11
+
12
+ class TestCase(BaseModel):
13
+ id: str
14
+ title: str
15
+ preconditions: List[str]
16
+ steps: List[str]
17
+ expected_results: List[str]
18
+ priority: str
19
+ type: str
20
+ requirement_id: str
21
+
22
+ class ExportRequest(BaseModel):
23
+ test_cases: List[TestCase]
24
+ target: str
25
+ project_id: str
26
+ section_id: Optional[str] = None
27
+
28
+ @router.post("/generate")
29
+ async def generate_test_cases(
30
+ requirements: List[Dict[str, Any]],
31
+ ai_provider: str = "openai",
32
+ model: str = "gpt-3.5-turbo"
33
+ ) -> List[Dict[str, Any]]:
34
+ """
35
+ Generate test cases from requirements.
36
+
37
+ Parameters:
38
+ - requirements: List of requirements
39
+ - ai_provider: AI provider to use
40
+ - model: Model to use
41
+ """
42
+ try:
43
+ test_cases = await test_service.generate_test_cases(
44
+ requirements=requirements,
45
+ ai_service=ai_service
46
+ )
47
+ return test_cases
48
+ except Exception as e:
49
+ logger.error(f"Error generating test cases: {str(e)}")
50
+ raise HTTPException(status_code=500, detail=str(e))
51
+
52
+ @router.post("/export")
53
+ async def export_test_cases(request: ExportRequest) -> Dict[str, Any]:
54
+ """
55
+ Export test cases to test management tool.
56
+
57
+ Parameters:
58
+ - request: Export request containing test cases and target information
59
+ """
60
+ try:
61
+ if request.target == "testrail":
62
+ if not request.section_id:
63
+ raise HTTPException(
64
+ status_code=400,
65
+ detail="section_id is required for TestRail export"
66
+ )
67
+ results = await test_service.export_to_testrail(
68
+ test_cases=[tc.dict() for tc in request.test_cases],  # services index test cases as plain dicts
69
+ project_id=int(request.project_id),
70
+ section_id=int(request.section_id)
71
+ )
72
+ elif request.target == "jira":
73
+ results = await test_service.export_to_jira(
74
+ test_cases=[tc.dict() for tc in request.test_cases],
75
+ project_key=request.project_id
76
+ )
77
+ elif request.target == "qtest":
78
+ results = await test_service.export_to_qtest(
79
+ test_cases=[tc.dict() for tc in request.test_cases],
80
+ project_id=int(request.project_id)
81
+ )
82
+ else:
83
+ raise HTTPException(
84
+ status_code=400,
85
+ detail=f"Unsupported export target: {request.target}"
86
+ )
87
+
88
+ return {
89
+ "status": "success",
90
+ "results": results
91
+ }
92
+ except Exception as e:
93
+ logger.error(f"Error exporting test cases: {str(e)}")
94
+ raise HTTPException(status_code=500, detail=str(e))
95
+
96
+ @router.post("/validate")
97
+ async def validate_test_cases(
98
+ test_cases: List[TestCase],
99
+ requirements: List[Dict[str, Any]]
100
+ ) -> Dict[str, Any]:
101
+ """
102
+ Validate test cases against requirements.
103
+
104
+ Parameters:
105
+ - test_cases: List of test cases
106
+ - requirements: List of requirements
107
+ """
108
+ try:
109
+ # Create requirement coverage matrix
110
+ coverage = {}
111
+ for req in requirements:
112
+ coverage[req["id"]] = {
113
+ "requirement": req,
114
+ "test_cases": [],
115
+ "covered": False
116
+ }
117
+
118
+ # Map test cases to requirements
119
+ for test_case in test_cases:
120
+ if test_case.requirement_id in coverage:
121
+ coverage[test_case.requirement_id]["test_cases"].append(test_case)
122
+ coverage[test_case.requirement_id]["covered"] = True
123
+
124
+ # Calculate coverage metrics
125
+ total_requirements = len(requirements)
126
+ covered_requirements = sum(1 for req in coverage.values() if req["covered"])
127
+ coverage_percentage = (covered_requirements / total_requirements) * 100 if total_requirements else 0.0
128
+
129
+ # Identify uncovered requirements
130
+ uncovered_requirements = [
131
+ req["requirement"]
132
+ for req in coverage.values()
133
+ if not req["covered"]
134
+ ]
135
+
136
+ return {
137
+ "status": "success",
138
+ "coverage_percentage": coverage_percentage,
139
+ "total_requirements": total_requirements,
140
+ "covered_requirements": covered_requirements,
141
+ "uncovered_requirements": uncovered_requirements,
142
+ "coverage_matrix": coverage
143
+ }
144
+ except Exception as e:
145
+ logger.error(f"Error validating test cases: {str(e)}")
146
+ raise HTTPException(status_code=500, detail=str(e))
147
+
148
+ @router.post("/prioritize")
149
+ async def prioritize_test_cases(
150
+ test_cases: List[TestCase],
151
+ requirements: List[Dict[str, Any]]
152
+ ) -> List[Dict[str, Any]]:
153
+ """
154
+ Prioritize test cases based on requirements and risk.
155
+
156
+ Parameters:
157
+ - test_cases: List of test cases
158
+ - requirements: List of requirements
159
+ """
160
+ try:
161
+ # Create risk assessment prompt
162
+ prompt = f"""
163
+ Analyze the following requirements and test cases to determine test case priority.
164
+ Consider:
165
+ 1. Requirement priority
166
+ 2. Business impact
167
+ 3. Technical complexity
168
+ 4. Historical defect patterns
169
+
170
+ Requirements:
171
+ {json.dumps(requirements, indent=2)}
172
+
173
+ Test Cases:
174
+ {json.dumps([tc.dict() for tc in test_cases], indent=2)}
175
+
176
+ For each test case, provide:
177
+ 1. Priority score (1-5)
178
+ 2. Risk level (High/Medium/Low)
179
+ 3. Justification
180
+ """
181
+
182
+ # Get AI assessment
183
+ assessment = await ai_service.generate_response(prompt=prompt)
184
+
185
+ # Parse and apply prioritization
186
+ prioritized_cases = []
187
+ for test_case in test_cases:
188
+ # Find assessment for this test case
189
+ case_assessment = _find_case_assessment(
190
+ assessment["response"],
191
+ test_case.id
192
+ )
193
+
194
+ prioritized_cases.append({
195
+ "test_case": test_case,
196
+ "priority_score": case_assessment["priority_score"],
197
+ "risk_level": case_assessment["risk_level"],
198
+ "justification": case_assessment["justification"]
199
+ })
200
+
201
+ # Sort by priority score
202
+ prioritized_cases.sort(
203
+ key=lambda x: x["priority_score"],
204
+ reverse=True
205
+ )
206
+
207
+ return prioritized_cases
208
+ except Exception as e:
209
+ logger.error(f"Error prioritizing test cases: {str(e)}")
210
+ raise HTTPException(status_code=500, detail=str(e))
211
+
212
+ def _find_case_assessment(assessment_text: str, case_id: str) -> Dict[str, Any]:
213
+ """Extract assessment for a specific test case."""
214
+ # This is a simplified implementation
215
+ # In practice, you'd want more robust parsing
216
+ return {
217
+ "priority_score": 3,
218
+ "risk_level": "Medium",
219
+ "justification": "Default assessment"
220
+ }
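A hypothetical call to the /validate route above; because the endpoint takes two body parameters, FastAPI expects them wrapped in a single JSON object keyed by parameter name:

import httpx

requirements = [{"id": "REQ-1", "title": "Login", "description": "Users can log in", "priority": "High"}]
test_cases = [{
    "id": "TC-1", "title": "Login with valid credentials",
    "preconditions": ["User account exists"], "steps": ["Log in"],
    "expected_results": ["Dashboard shown"], "priority": "High",
    "type": "Functional", "requirement_id": "REQ-1",
}]

resp = httpx.post(
    "http://localhost:8000/api/tests/validate",
    json={"test_cases": test_cases, "requirements": requirements},
    timeout=60.0,
)
print(resp.json()["coverage_percentage"])  # 100.0 for this single-requirement example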
app/main.py ADDED
@@ -0,0 +1,61 @@
+ from fastapi import FastAPI, HTTPException
+ from fastapi.middleware.cors import CORSMiddleware
+ from loguru import logger
+ import uvicorn
+
+ from services.database_service import db_service
+ from services.queue_service import queue_service
+ from api.routes import document_routes, ai_routes, test_routes, automation_routes
+
+ app = FastAPI(
+     title="AI-Powered Test Automation Framework",
+     description="An intelligent test automation framework powered by AI",
+     version="1.0.0"
+ )
+
+ # Add CORS middleware
+ app.add_middleware(
+     CORSMiddleware,
+     allow_origins=["*"],
+     allow_credentials=True,
+     allow_methods=["*"],
+     allow_headers=["*"],
+ )
+
+ # Include routers
+ app.include_router(document_routes.router, prefix="/api/documents", tags=["Documents"])
+ app.include_router(ai_routes.router, prefix="/api/ai", tags=["AI"])
+ app.include_router(test_routes.router, prefix="/api/tests", tags=["Tests"])
+ app.include_router(automation_routes.router, prefix="/api/automation", tags=["Automation"])
+
+ @app.on_event("startup")
+ async def startup_event():
+     """Initialize services on startup"""
+     try:
+         # Initialize database
+         db_service.init_db()
+         logger.info("Database initialized successfully")
+
+         # Initialize queue service
+         queue_service.connect()
+         logger.info("Queue service initialized successfully")
+     except Exception as e:
+         logger.error(f"Error during startup: {str(e)}")
+         raise
+
+ @app.on_event("shutdown")
+ async def shutdown_event():
+     """Cleanup on shutdown"""
+     try:
+         queue_service.close()
+         logger.info("Services shut down successfully")
+     except Exception as e:
+         logger.error(f"Error during shutdown: {str(e)}")
+
+ @app.get("/health")
+ async def health_check():
+     """Health check endpoint"""
+     return {"status": "healthy"}
+
+ if __name__ == "__main__":
+     uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)  # module path matches the Dockerfile CMD
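A minimal smoke test for the application above; note that importing app.main pulls in the database and queue singletons, so Postgres and RabbitMQ must be reachable with the configured settings for this sketch to run:

from fastapi.testclient import TestClient

from app.main import app

def test_health() -> None:
    # Using the client as a context manager also exercises the startup/shutdown hooks.
    with TestClient(app) as client:
        resp = client.get("/health")
        assert resp.status_code == 200
        assert resp.json() == {"status": "healthy"}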
config/config.py ADDED
@@ -0,0 +1,67 @@
+ from pydantic_settings import BaseSettings
+ from typing import Optional, Dict, Any
+ import os
+ from dotenv import load_dotenv
+
+ load_dotenv()
+
+ class Settings(BaseSettings):
+     # Application
+     APP_NAME: str = "TestAuto"
+     DEBUG: bool = False
+     VERSION: str = "1.0.0"
+
+     # API
+     API_V1_STR: str = "/api/v1"
+     PROJECT_NAME: str = "TestAuto API"
+
+     # Security
+     SECRET_KEY: str = os.getenv("SECRET_KEY", "your-secret-key")
+     ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8  # 8 days
+
+     # Database
+     POSTGRES_SERVER: str = os.getenv("POSTGRES_SERVER", "localhost")
+     POSTGRES_USER: str = os.getenv("POSTGRES_USER", "postgres")
+     POSTGRES_PASSWORD: str = os.getenv("POSTGRES_PASSWORD", "postgres")
+     POSTGRES_DB: str = os.getenv("POSTGRES_DB", "testauto")
+     SQLALCHEMY_DATABASE_URI: Optional[str] = None
+
+     # AI Providers
+     OPENAI_API_KEY: Optional[str] = os.getenv("OPENAI_API_KEY")
+     OPENROUTER_API_KEY: Optional[str] = os.getenv("OPENROUTER_API_KEY")
+     LOCAL_LLM_PATH: Optional[str] = os.getenv("LOCAL_LLM_PATH")
+
+     # Message Queue
+     RABBITMQ_HOST: str = os.getenv("RABBITMQ_HOST", "localhost")
+     RABBITMQ_PORT: int = int(os.getenv("RABBITMQ_PORT", "5672"))
+     RABBITMQ_USER: str = os.getenv("RABBITMQ_USER", "guest")
+     RABBITMQ_PASSWORD: str = os.getenv("RABBITMQ_PASSWORD", "guest")
+
+     # Test Management Tools
+     TESTRAIL_API_KEY: Optional[str] = os.getenv("TESTRAIL_API_KEY")
+     TESTRAIL_URL: Optional[str] = os.getenv("TESTRAIL_URL")
+     JIRA_API_TOKEN: Optional[str] = os.getenv("JIRA_API_TOKEN")
+     JIRA_URL: Optional[str] = os.getenv("JIRA_URL")
+     QTEST_API_TOKEN: Optional[str] = os.getenv("QTEST_API_TOKEN")
+     QTEST_URL: Optional[str] = os.getenv("QTEST_URL")
+
+     # Document Processing
+     UPLOAD_FOLDER: str = "uploads"
+     MAX_CONTENT_LENGTH: int = 16 * 1024 * 1024  # 16MB
+     ALLOWED_EXTENSIONS: set = {"pdf", "docx", "doc", "txt", "png", "jpg", "jpeg"}
+
+     # Logging
+     LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO")
+     LOG_FILE: str = "logs/testauto.log"
+
+     # Docker
+     DOCKER_IMAGE: str = "testauto:latest"
+
+     def __init__(self, **kwargs: Any):
+         super().__init__(**kwargs)
+         self.SQLALCHEMY_DATABASE_URI = (
+             f"postgresql://{self.POSTGRES_USER}:{self.POSTGRES_PASSWORD}"
+             f"@{self.POSTGRES_SERVER}/{self.POSTGRES_DB}"
+         )
+
+ settings = Settings()
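Since SQLALCHEMY_DATABASE_URI is assembled in Settings.__init__ from the POSTGRES_* fields, overriding any of those via environment variables (or .env) changes the URI as well; a quick check:

from config.config import settings

print(settings.SQLALCHEMY_DATABASE_URI)
# With the defaults above: postgresql://postgres:postgres@localhost/testauto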
docker/Dockerfile ADDED
@@ -0,0 +1,33 @@
+ FROM python:3.9-slim
+
+ # Install system dependencies
+ RUN apt-get update && apt-get install -y \
+     build-essential \
+     libpq-dev \
+     tesseract-ocr \
+     && rm -rf /var/lib/apt/lists/*
+
+ # Set working directory
+ WORKDIR /app
+
+ # Copy requirements file
+ COPY requirements.txt .
+
+ # Install Python dependencies
+ RUN pip install --no-cache-dir -r requirements.txt
+
+ # Copy application code
+ COPY . .
+
+ # Create necessary directories
+ RUN mkdir -p uploads generated_scripts logs
+
+ # Set environment variables
+ ENV PYTHONPATH=/app
+ ENV PYTHONUNBUFFERED=1
+
+ # Expose port
+ EXPOSE 8000
+
+ # Run the application
+ CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
models/base.py ADDED
@@ -0,0 +1,70 @@
+ from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, JSON, Text, Enum
+ from sqlalchemy.ext.declarative import declarative_base
+ from sqlalchemy.orm import relationship
+ from datetime import datetime
+ import enum
+
+ Base = declarative_base()
+
+ class DocumentType(enum.Enum):
+     PDF = "pdf"
+     DOCX = "docx"
+     IMAGE = "image"
+     TEXT = "text"
+
+ class Document(Base):
+     __tablename__ = "documents"
+
+     id = Column(Integer, primary_key=True)
+     filename = Column(String(255), nullable=False)
+     file_type = Column(Enum(DocumentType), nullable=False)
+     content = Column(Text)
+     doc_metadata = Column("metadata", JSON)  # "metadata" is reserved on declarative models, so the attribute is renamed
+     created_at = Column(DateTime, default=datetime.utcnow)
+     updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
+
+     test_cases = relationship("TestCase", back_populates="document")
+
+ class TestCase(Base):
+     __tablename__ = "test_cases"
+
+     id = Column(Integer, primary_key=True)
+     document_id = Column(Integer, ForeignKey("documents.id"))
+     title = Column(String(255), nullable=False)
+     description = Column(Text)
+     steps = Column(JSON)
+     expected_results = Column(JSON)
+     priority = Column(Integer)
+     status = Column(String(50))
+     created_at = Column(DateTime, default=datetime.utcnow)
+     updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
+
+     document = relationship("Document", back_populates="test_cases")
+     test_scripts = relationship("TestScript", back_populates="test_case")
+
+ class TestScript(Base):
+     __tablename__ = "test_scripts"
+
+     id = Column(Integer, primary_key=True)
+     test_case_id = Column(Integer, ForeignKey("test_cases.id"))
+     language = Column(String(50))
+     code = Column(Text)
+     framework = Column(String(50))
+     created_at = Column(DateTime, default=datetime.utcnow)
+     updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
+
+     test_case = relationship("TestCase", back_populates="test_scripts")
+
+ class AIModelInteraction(Base):
+     __tablename__ = "ai_model_interactions"
+
+     id = Column(Integer, primary_key=True)
+     provider = Column(String(50), nullable=False)
+     model = Column(String(100), nullable=False)
+     prompt = Column(Text, nullable=False)
+     response = Column(Text)
+     metrics = Column(JSON)
+     created_at = Column(DateTime, default=datetime.utcnow)
+
+     def __repr__(self):
+         return f"<AIModelInteraction(provider={self.provider}, model={self.model})>"
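An illustrative sketch of creating the tables and persisting a Document with these models; it uses an in-memory SQLite engine purely for demonstration, whereas the application itself builds a Postgres engine from config:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from models.base import Base, Document, DocumentType

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

with Session() as session:
    doc = Document(filename="requirements.pdf", file_type=DocumentType.PDF, content="...")
    session.add(doc)
    session.commit()
    print(doc.id, doc.created_at)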
services/ai_service.py ADDED
@@ -0,0 +1,163 @@
1
+ from typing import List, Dict, Any, Optional
2
+ import openai
3
+ from transformers import AutoModelForCausalLM, AutoTokenizer
4
+ import torch
5
+ from loguru import logger
6
+ import time
7
+ from config.config import settings
8
+
9
+ class AIService:
10
+ def __init__(self):
11
+ self.openai_client = openai.AsyncOpenAI(api_key=settings.OPENAI_API_KEY)  # async client; the calls below are awaited
12
+ self.local_model = None
13
+ self.local_tokenizer = None
14
+ if settings.LOCAL_LLM_PATH:
15
+ self._load_local_model()
16
+
17
+ def _load_local_model(self):
18
+ """Load local LLM model."""
19
+ try:
20
+ self.local_tokenizer = AutoTokenizer.from_pretrained(settings.LOCAL_LLM_PATH)
21
+ self.local_model = AutoModelForCausalLM.from_pretrained(settings.LOCAL_LLM_PATH)
22
+ logger.info("Local model loaded successfully")
23
+ except Exception as e:
24
+ logger.error(f"Error loading local model: {str(e)}")
25
+ raise
26
+
27
+ async def generate_response(
28
+ self,
29
+ prompt: str,
30
+ provider: str = "openai",
31
+ model: str = "gpt-3.5-turbo",
32
+ max_tokens: int = 1000,
33
+ temperature: float = 0.7
34
+ ) -> Dict[str, Any]:
35
+ """Generate response using specified provider and model."""
36
+ start_time = time.time()
37
+
38
+ try:
39
+ if provider == "openai":
40
+ response = await self._generate_openai_response(
41
+ prompt, model, max_tokens, temperature
42
+ )
43
+ elif provider == "local":
44
+ response = await self._generate_local_response(
45
+ prompt, max_tokens, temperature
46
+ )
47
+ elif provider == "openrouter":
48
+ response = await self._generate_openrouter_response(
49
+ prompt, model, max_tokens, temperature
50
+ )
51
+ else:
52
+ raise ValueError(f"Unsupported provider: {provider}")
53
+
54
+ end_time = time.time()
55
+ return {
56
+ "response": response,
57
+ "provider": provider,
58
+ "model": model,
59
+ "latency": end_time - start_time
60
+ }
61
+ except Exception as e:
62
+ logger.error(f"Error generating response: {str(e)}")
63
+ raise
64
+
65
+ async def _generate_openai_response(
66
+ self,
67
+ prompt: str,
68
+ model: str,
69
+ max_tokens: int,
70
+ temperature: float
71
+ ) -> str:
72
+ """Generate response using OpenAI API."""
73
+ try:
74
+ response = await self.openai_client.chat.completions.create(
75
+ model=model,
76
+ messages=[{"role": "user", "content": prompt}],
77
+ max_tokens=max_tokens,
78
+ temperature=temperature
79
+ )
80
+ return response.choices[0].message.content
81
+ except Exception as e:
82
+ logger.error(f"OpenAI API error: {str(e)}")
83
+ raise
84
+
85
+ async def _generate_local_response(
86
+ self,
87
+ prompt: str,
88
+ max_tokens: int,
89
+ temperature: float
90
+ ) -> str:
91
+ """Generate response using local model."""
92
+ if not self.local_model or not self.local_tokenizer:
93
+ raise ValueError("Local model not loaded")
94
+
95
+ try:
96
+ inputs = self.local_tokenizer(prompt, return_tensors="pt")
97
+ outputs = self.local_model.generate(
98
+ inputs["input_ids"],
99
+ max_length=max_tokens,
100
+ temperature=temperature,
101
+ do_sample=True
102
+ )
103
+ return self.local_tokenizer.decode(outputs[0], skip_special_tokens=True)
104
+ except Exception as e:
105
+ logger.error(f"Local model error: {str(e)}")
106
+ raise
107
+
108
+ async def _generate_openrouter_response(
109
+ self,
110
+ prompt: str,
111
+ model: str,
112
+ max_tokens: int,
113
+ temperature: float
114
+ ) -> str:
115
+ """Generate response using OpenRouter API."""
116
+ try:
+ # OpenRouter exposes an OpenAI-compatible API at openrouter.ai/api/v1; use a dedicated client for it.
+ openrouter_client = openai.AsyncOpenAI(
+ api_key=settings.OPENROUTER_API_KEY,
+ base_url="https://openrouter.ai/api/v1"
+ )
+ headers = {
+ "HTTP-Referer": "https://testauto.com",
+ "X-Title": "TestAuto"
+ }
+
+ response = await openrouter_client.chat.completions.create(
+ model=model,
+ messages=[{"role": "user", "content": prompt}],
+ max_tokens=max_tokens,
+ temperature=temperature,
+ extra_headers=headers
+ )
+ return response.choices[0].message.content
131
+ except Exception as e:
132
+ logger.error(f"OpenRouter API error: {str(e)}")
133
+ raise
134
+
135
+ async def benchmark_models(
136
+ self,
137
+ prompt: str,
138
+ models: List[Dict[str, Any]]
139
+ ) -> List[Dict[str, Any]]:
140
+ """Benchmark different models with the same prompt."""
141
+ results = []
142
+
143
+ for model_config in models:
144
+ try:
145
+ result = await self.generate_response(
146
+ prompt=prompt,
147
+ provider=model_config["provider"],
148
+ model=model_config["model"],
149
+ max_tokens=model_config.get("max_tokens", 1000),
150
+ temperature=model_config.get("temperature", 0.7)
151
+ )
152
+ results.append(result)
153
+ except Exception as e:
154
+ logger.error(f"Error benchmarking model {model_config['model']}: {str(e)}")
155
+ results.append({
156
+ "provider": model_config["provider"],
157
+ "model": model_config["model"],
158
+ "error": str(e)
159
+ })
160
+
161
+ return results
162
+
163
+ ai_service = AIService()
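A usage sketch for the benchmarking helper above; the prompt and model list are illustrative, and the corresponding API keys must be configured for the hosted providers:

import asyncio

from services.ai_service import ai_service

async def main() -> None:
    results = await ai_service.benchmark_models(
        prompt="Write a test case title for a password-reset feature.",
        models=[
            {"provider": "openai", "model": "gpt-3.5-turbo"},
            {"provider": "openrouter", "model": "anthropic/claude-2"},
        ],
    )
    for r in results:
        # Failed models carry an "error" key instead of "response"/"latency".
        print(r.get("model"), r.get("latency"), r.get("error"))

asyncio.run(main())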
services/automation_service.py ADDED
@@ -0,0 +1,189 @@
1
+ from typing import List, Dict, Any, Optional
2
+ from loguru import logger
3
+ import os
4
+ import json
5
+ from datetime import datetime
6
+
7
+ class AutomationService:
8
+ def __init__(self):
9
+ self.output_dir = "generated_scripts"
10
+ os.makedirs(self.output_dir, exist_ok=True)
11
+
12
+ async def generate_test_scripts(
13
+ self,
14
+ test_cases: List[Dict[str, Any]],
15
+ framework: str = "pytest",
16
+ language: str = "python",
17
+ browser: str = "chrome"
18
+ ) -> List[Dict[str, Any]]:
19
+ """Generate test scripts from test cases."""
20
+ results = []
21
+
22
+ for test_case in test_cases:
23
+ try:
24
+ if framework == "pytest":
25
+ script = await self._generate_pytest_script(test_case, browser)
26
+ elif framework == "playwright":
27
+ script = await self._generate_playwright_script(test_case, browser)
28
+ else:
29
+ raise ValueError(f"Unsupported framework: {framework}")
30
+
31
+ file_path = self._save_script(script, test_case["title"], language)
32
+ results.append({
33
+ "test_case_id": test_case["id"],
34
+ "file_path": file_path,
35
+ "status": "success"
36
+ })
37
+ except Exception as e:
38
+ logger.error(f"Error generating test script for test case {test_case['id']}: {str(e)}")
39
+ results.append({
40
+ "test_case_id": test_case["id"],
41
+ "error": str(e),
42
+ "status": "error"
43
+ })
44
+
45
+ return results
46
+
47
+ async def _generate_pytest_script(
48
+ self,
49
+ test_case: Dict[str, Any],
50
+ browser: str
51
+ ) -> str:
52
+ """Generate pytest script with Selenium."""
53
+ script = f'''import pytest
54
+ from selenium import webdriver
55
+ from selenium.webdriver.common.by import By
56
+ from selenium.webdriver.support.ui import WebDriverWait
57
+ from selenium.webdriver.support import expected_conditions as EC
58
+
59
+ class Test{test_case["title"].replace(" ", "")}:
60
+ @pytest.fixture(scope="function")
61
+ def driver(self):
62
+ options = webdriver.{browser.capitalize()}Options()
63
+ driver = webdriver.{browser.capitalize()}(options=options)
64
+ driver.implicitly_wait(10)
65
+ yield driver
66
+ driver.quit()
67
+
68
+ def test_{test_case["title"].lower().replace(" ", "_")}(self, driver):
69
+ """
70
+ Test Case: {test_case["title"]}
71
+ Priority: {test_case["priority"]}
72
+ Type: {test_case["type"]}
73
+ """
74
+ # Preconditions
75
+ '''
76
+ # Add preconditions
77
+ for precond in test_case["preconditions"]:
78
+ script += f" # {precond}\n"
79
+
80
+ # Add test steps
81
+ script += "\n # Test Steps\n"
82
+ for i, step in enumerate(test_case["steps"], 1):
83
+ script += f" # Step {i}: {step}\n"
84
+ # Add basic Selenium commands based on step description
85
+ if "click" in step.lower():
86
+ script += " element = WebDriverWait(driver, 10).until(\n"
87
+ script += " EC.element_to_be_clickable((By.XPATH, '//button'))\n"
88
+ script += " )\n"
89
+ script += " element.click()\n"
90
+ elif "input" in step.lower() or "enter" in step.lower():
91
+ script += " input_element = WebDriverWait(driver, 10).until(\n"
92
+ script += " EC.presence_of_element_located((By.XPATH, '//input'))\n"
93
+ script += " )\n"
94
+ script += " input_element.send_keys('test_input')\n"
95
+
96
+ # Add assertions
97
+ script += "\n # Expected Results\n"
98
+ for expected in test_case["expected_results"]:
99
+ script += f" # {expected}\n"
100
+ script += " assert True # Replace with actual assertion\n"
101
+
102
+ return script
103
+
104
+ async def _generate_playwright_script(
105
+ self,
106
+ test_case: Dict[str, Any],
107
+ browser: str
108
+ ) -> str:
109
+ """Generate Playwright script."""
110
+ script = f'''import pytest
111
+ from playwright.sync_api import sync_playwright
112
+
113
+ def test_{test_case["title"].lower().replace(" ", "_")}():
114
+ """
115
+ Test Case: {test_case["title"]}
116
+ Priority: {test_case["priority"]}
117
+ Type: {test_case["type"]}
118
+ """
119
+ with sync_playwright() as p:
120
+ browser = p.{browser}.launch()
121
+ page = browser.new_page()
122
+
123
+ # Preconditions
124
+ '''
125
+ # Add preconditions
126
+ for precond in test_case["preconditions"]:
127
+ script += f" # {precond}\n"
128
+
129
+ # Add test steps
130
+ script += "\n # Test Steps\n"
131
+ for i, step in enumerate(test_case["steps"], 1):
132
+ script += f" # Step {i}: {step}\n"
133
+ # Add basic Playwright commands based on step description
134
+ if "click" in step.lower():
135
+ script += " page.click('button')\n"
136
+ elif "input" in step.lower() or "enter" in step.lower():
137
+ script += " page.fill('input', 'test_input')\n"
138
+
139
+ # Add assertions
140
+ script += "\n # Expected Results\n"
141
+ for expected in test_case["expected_results"]:
142
+ script += f" # {expected}\n"
143
+ script += " assert True # Replace with actual assertion\n"
144
+
145
+ script += "\n browser.close()"
146
+
147
+ return script
148
+
149
+ def _save_script(self, script: str, title: str, language: str) -> str:
150
+ """Save generated script to file."""
151
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
152
+ filename = f"test_{title.lower().replace(' ', '_')}_{timestamp}.{language}"
153
+ file_path = os.path.join(self.output_dir, filename)
154
+
155
+ with open(file_path, "w") as f:
156
+ f.write(script)
157
+
158
+ return file_path
159
+
160
+ async def generate_gherkin_feature(
161
+ self,
162
+ test_cases: List[Dict[str, Any]]
163
+ ) -> str:
164
+ """Generate Gherkin feature file from test cases."""
165
+ feature = f'''Feature: Automated Test Cases
166
+ As a test engineer
167
+ I want to automate test cases
168
+ So that I can ensure software quality
169
+
170
+ '''
171
+ for test_case in test_cases:
172
+ feature += f''' Scenario: {test_case["title"]}
173
+ Given the following preconditions:
174
+ '''
175
+ for precond in test_case["preconditions"]:
176
+ feature += f" {precond}\n"
177
+
178
+ feature += " When I perform the following steps:\n"
179
+ for step in test_case["steps"]:
180
+ feature += f" {step}\n"
181
+
182
+ feature += " Then I should see the following results:\n"
183
+ for expected in test_case["expected_results"]:
184
+ feature += f" {expected}\n"
185
+ feature += "\n"
186
+
187
+ return feature
188
+
189
+ automation_service = AutomationService()
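A small sketch showing the Gherkin generator above on a hand-written test case (the same shape the routes pass in); the output is a plain feature-file string:

import asyncio

from services.automation_service import automation_service

test_cases = [{
    "id": "TC-1",
    "title": "Login with valid credentials",
    "priority": "High",
    "type": "Functional",
    "preconditions": ["User account exists"],
    "steps": ["Enter username and password", "Click the login button"],
    "expected_results": ["Dashboard page is displayed"],
}]

print(asyncio.run(automation_service.generate_gherkin_feature(test_cases)))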
services/database_service.py ADDED
@@ -0,0 +1,40 @@
+ from sqlalchemy import create_engine
+ from sqlalchemy.orm import sessionmaker, Session
+ from sqlalchemy.exc import SQLAlchemyError
+ from contextlib import contextmanager
+ from typing import Generator
+ from loguru import logger
+ from config.config import Settings
+
+ settings = Settings()
+
+ class DatabaseService:
+     def __init__(self):
+         self.engine = create_engine(settings.SQLALCHEMY_DATABASE_URI)  # URI assembled in config.Settings
+         self.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=self.engine)
+
+     @contextmanager
+     def get_session(self) -> Generator[Session, None, None]:
+         session = self.SessionLocal()
+         try:
+             yield session
+             session.commit()
+         except SQLAlchemyError as e:
+             session.rollback()
+             logger.error(f"Database error: {str(e)}")
+             raise
+         finally:
+             session.close()
+
+     def init_db(self):
+         """Initialize database tables"""
+         from models.base import Base
+         try:
+             Base.metadata.create_all(bind=self.engine)
+             logger.info("Database tables created successfully")
+         except SQLAlchemyError as e:
+             logger.error(f"Error creating database tables: {str(e)}")
+             raise
+
+ # Create a singleton instance
+ db_service = DatabaseService()
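Typical usage of the session helper above; the context manager commits when the block exits cleanly and rolls back on SQLAlchemyError, so no explicit commit is needed:

from models.base import Document, DocumentType
from services.database_service import db_service

db_service.init_db()
with db_service.get_session() as session:
    session.add(Document(filename="spec.docx", file_type=DocumentType.DOCX, content="..."))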
services/document_service.py ADDED
@@ -0,0 +1,98 @@
1
+ import os
2
+ from typing import List, Optional, Dict, Any
3
+ from fastapi import UploadFile, HTTPException
4
+ from loguru import logger
5
+ import pytesseract
6
+ from PIL import Image
7
+ import PyPDF2
8
+ from docx import Document
9
+ import io
10
+ import aiofiles
11
+ from config.config import settings
12
+
13
+ class DocumentService:
14
+ def __init__(self):
15
+ self.upload_folder = settings.UPLOAD_FOLDER
16
+ os.makedirs(self.upload_folder, exist_ok=True)
17
+
18
+ async def save_upload_file(self, upload_file: UploadFile) -> str:
19
+ """Save uploaded file to disk."""
20
+ try:
21
+ file_path = os.path.join(self.upload_folder, upload_file.filename)
22
+ async with aiofiles.open(file_path, 'wb') as out_file:
23
+ content = await upload_file.read()
24
+ await out_file.write(content)
25
+ return file_path
26
+ except Exception as e:
27
+ logger.error(f"Error saving file: {str(e)}")
28
+ raise HTTPException(status_code=500, detail="Error saving file")
29
+
30
+ async def process_document(self, file_path: str) -> Dict[str, Any]:
31
+ """Process document based on its type."""
32
+ file_ext = os.path.splitext(file_path)[1].lower()
33
+
34
+ try:
35
+ if file_ext == '.pdf':
36
+ return await self._process_pdf(file_path)
37
+ elif file_ext in ['.docx', '.doc']:
38
+ return await self._process_word(file_path)
39
+ elif file_ext in ['.png', '.jpg', '.jpeg']:
40
+ return await self._process_image(file_path)
41
+ else:
42
+ raise HTTPException(status_code=400, detail="Unsupported file type")
43
+ except Exception as e:
44
+ logger.error(f"Error processing document: {str(e)}")
45
+ raise HTTPException(status_code=500, detail="Error processing document")
46
+
47
+ async def _process_pdf(self, file_path: str) -> Dict[str, Any]:
48
+ """Extract text from PDF file."""
49
+ try:
50
+ text = ""
51
+ with open(file_path, 'rb') as file:
52
+ pdf_reader = PyPDF2.PdfReader(file)
53
+ for page in pdf_reader.pages:
54
+ text += page.extract_text()
55
+ return {"text": text, "type": "pdf"}
56
+ except Exception as e:
57
+ logger.error(f"Error processing PDF: {str(e)}")
58
+ raise HTTPException(status_code=500, detail="Error processing PDF")
59
+
60
+ async def _process_word(self, file_path: str) -> Dict[str, Any]:
61
+ """Extract text from Word document."""
62
+ try:
63
+ doc = Document(file_path)
64
+ text = "\n".join([paragraph.text for paragraph in doc.paragraphs])
65
+ return {"text": text, "type": "word"}
66
+ except Exception as e:
67
+ logger.error(f"Error processing Word document: {str(e)}")
68
+ raise HTTPException(status_code=500, detail="Error processing Word document")
69
+
70
+ async def _process_image(self, file_path: str) -> Dict[str, Any]:
71
+ """Extract text from image using OCR."""
72
+ try:
73
+ image = Image.open(file_path)
74
+ text = pytesseract.image_to_string(image)
75
+ return {"text": text, "type": "image"}
76
+ except Exception as e:
77
+ logger.error(f"Error processing image: {str(e)}")
78
+ raise HTTPException(status_code=500, detail="Error processing image")
79
+
80
+ async def segment_document(self, text: str, max_segment_size: int = 1000) -> List[str]:
81
+ """Segment large documents into smaller chunks."""
82
+ segments = []
83
+ current_segment = ""
84
+
85
+ for line in text.split('\n'):
86
+ if len(current_segment) + len(line) + 1 <= max_segment_size:
87
+ current_segment += line + '\n'
88
+ else:
89
+ if current_segment:
90
+ segments.append(current_segment.strip())
91
+ current_segment = line + '\n'
92
+
93
+ if current_segment:
94
+ segments.append(current_segment.strip())
95
+
96
+ return segments
97
+
98
+ document_service = DocumentService()
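A quick illustration of the segmentation helper above, which packs whole lines into chunks of at most max_segment_size characters:

import asyncio

from services.document_service import document_service

text = "\n".join(f"Requirement {i}: the system shall handle case {i}." for i in range(50))
segments = asyncio.run(document_service.segment_document(text, max_segment_size=500))
print(len(segments), [len(s) for s in segments])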
services/queue_service.py ADDED
@@ -0,0 +1,83 @@
+ import pika
+ import json
+ from typing import Callable, Dict, Any
+ from loguru import logger
+ from config.config import Settings
+
+ settings = Settings()
+
+ class QueueService:
+     def __init__(self):
+         self.connection = None
+         self.channel = None
+         self.connect()
+
+     def connect(self):
+         """Establish connection to RabbitMQ"""
+         try:
+             self.connection = pika.BlockingConnection(
+                 pika.ConnectionParameters(
+                     host=settings.RABBITMQ_HOST,
+                     port=settings.RABBITMQ_PORT,
+                     credentials=pika.PlainCredentials(
+                         settings.RABBITMQ_USER,
+                         settings.RABBITMQ_PASSWORD
+                     )
+                 )
+             )
+             self.channel = self.connection.channel()
+             self._declare_queues()
+             logger.info("Connected to RabbitMQ successfully")
+         except Exception as e:
+             logger.error(f"Failed to connect to RabbitMQ: {str(e)}")
+             raise
+
+     def _declare_queues(self):
+         """Declare all required queues"""
+         queues = [
+             "test_generation",
+             "test_execution",
+             "document_processing",
+             "ai_benchmarking"
+         ]
+         for queue in queues:
+             self.channel.queue_declare(queue=queue, durable=True)
+
+     def publish_message(self, queue: str, message: Dict[str, Any]):
+         """Publish a message to a specific queue"""
+         try:
+             self.channel.basic_publish(
+                 exchange='',
+                 routing_key=queue,
+                 body=json.dumps(message),
+                 properties=pika.BasicProperties(
+                     delivery_mode=2,  # make message persistent
+                 )
+             )
+             logger.info(f"Message published to queue {queue}")
+         except Exception as e:
+             logger.error(f"Failed to publish message: {str(e)}")
+             raise
+
+     def consume_messages(self, queue: str, callback: Callable):
+         """Start consuming messages from a queue"""
+         try:
+             self.channel.basic_qos(prefetch_count=1)
+             self.channel.basic_consume(
+                 queue=queue,
+                 on_message_callback=callback
+             )
+             logger.info(f"Started consuming messages from queue {queue}")
+             self.channel.start_consuming()
+         except Exception as e:
+             logger.error(f"Failed to consume messages: {str(e)}")
+             raise
+
+     def close(self):
+         """Close the connection"""
+         if self.connection and not self.connection.is_closed:
+             self.connection.close()
+             logger.info("RabbitMQ connection closed")
+
+ # Create a singleton instance
+ queue_service = QueueService()
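A usage sketch for the queue service above; the payload shape is illustrative, and since basic_consume is used without auto_ack the callback must acknowledge deliveries itself:

import json

from services.queue_service import queue_service

queue_service.publish_message("document_processing", {"file_path": "uploads/spec.pdf"})

def handle(ch, method, properties, body) -> None:
    job = json.loads(body)
    print("processing", job)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Blocking call; typically run in a dedicated worker process.
# queue_service.consume_messages("document_processing", handle)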
services/test_service.py ADDED
@@ -0,0 +1,259 @@
1
+ from typing import List, Dict, Any, Optional
2
+ from loguru import logger
3
+ import json
4
+ from datetime import datetime
5
+ from testrail_api import TestRailAPI
6
+ from jira import JIRA
7
+ from qtest import QTestAPI
+
+ from config.config import settings
8
+
9
+ class TestService:
10
+ def __init__(self):
11
+ self.testrail = None
12
+ self.jira = None
13
+ self.qtest = None
14
+ self._initialize_clients()
15
+
16
+ def _initialize_clients(self):
17
+ """Initialize test management tool clients."""
18
+ try:
19
+ if settings.TESTRAIL_API_KEY and settings.TESTRAIL_URL:
20
+ self.testrail = TestRailAPI(
21
+ settings.TESTRAIL_URL,
22
+ settings.TESTRAIL_API_KEY
23
+ )
24
+
25
+ if settings.JIRA_API_TOKEN and settings.JIRA_URL:
26
+ self.jira = JIRA(
27
+ server=settings.JIRA_URL,
28
+ token_auth=settings.JIRA_API_TOKEN
29
+ )
30
+
31
+ if settings.QTEST_API_TOKEN and settings.QTEST_URL:
32
+ self.qtest = QTestAPI(
33
+ settings.QTEST_URL,
34
+ settings.QTEST_API_TOKEN
35
+ )
36
+ except Exception as e:
37
+ logger.error(f"Error initializing test management clients: {str(e)}")
38
+
39
+ async def generate_test_cases(
40
+ self,
41
+ requirements: List[Dict[str, Any]],
42
+ ai_service: Any
43
+ ) -> List[Dict[str, Any]]:
44
+ """Generate test cases from requirements using AI."""
45
+ test_cases = []
46
+
47
+ for req in requirements:
48
+ try:
49
+ prompt = self._create_test_case_prompt(req)
50
+ response = await ai_service.generate_response(prompt)
51
+
52
+ test_case = self._parse_test_case_response(response["response"])
53
+ test_case["requirement_id"] = req["id"]
54
+ test_cases.append(test_case)
55
+ except Exception as e:
56
+ logger.error(f"Error generating test case for requirement {req['id']}: {str(e)}")
57
+
58
+ return test_cases
59
+
60
+ def _create_test_case_prompt(self, requirement: Dict[str, Any]) -> str:
61
+ """Create prompt for test case generation."""
62
+ return f"""
63
+ Generate a test case for the following requirement:
64
+
65
+ ID: {requirement['id']}
66
+ Title: {requirement['title']}
67
+ Description: {requirement['description']}
68
+
69
+ Please provide:
70
+ 1. Test case title
71
+ 2. Preconditions
72
+ 3. Test steps
73
+ 4. Expected results
74
+ 5. Priority (High/Medium/Low)
75
+ 6. Test type (Functional/Integration/System)
76
+ """
77
+
78
+ def _parse_test_case_response(self, response: str) -> Dict[str, Any]:
79
+ """Parse AI response into structured test case."""
80
+ # Implement parsing logic based on AI response format
81
+ # This is a simplified example
82
+ lines = response.split('\n')
83
+ test_case = {
84
+ "title": "",
85
+ "preconditions": [],
86
+ "steps": [],
87
+ "expected_results": [],
88
+ "priority": "Medium",
89
+ "type": "Functional"
90
+ }
91
+
92
+ current_section = None
93
+ for line in lines:
94
+ line = line.strip()
95
+ if not line:
96
+ continue
97
+
98
+ if "Test case title:" in line:
99
+ test_case["title"] = line.split(":", 1)[1].strip()
100
+ elif "Preconditions:" in line:
101
+ current_section = "preconditions"
102
+ elif "Test steps:" in line:
103
+ current_section = "steps"
104
+ elif "Expected results:" in line:
105
+ current_section = "expected_results"
106
+ elif "Priority:" in line:
107
+ test_case["priority"] = line.split(":", 1)[1].strip()
108
+ elif "Test type:" in line:
109
+ test_case["type"] = line.split(":", 1)[1].strip()
110
+ elif current_section:
111
+ test_case[current_section].append(line)
112
+
113
+ return test_case
114
+
115
+ async def export_to_testrail(
116
+ self,
117
+ test_cases: List[Dict[str, Any]],
118
+ project_id: int,
119
+ section_id: int
120
+ ) -> List[Dict[str, Any]]:
121
+ """Export test cases to TestRail."""
122
+ if not self.testrail:
123
+ raise ValueError("TestRail client not initialized")
124
+
125
+ results = []
126
+ for test_case in test_cases:
127
+ try:
128
+ result = self.testrail.cases.add_case(
129
+ section_id,
130
+ title=test_case["title"],
131
+ custom_steps=test_case["steps"],
132
+ custom_expected=test_case["expected_results"],
133
+ custom_preconds=test_case["preconditions"],
134
+ priority_id=self._get_testrail_priority_id(test_case["priority"])
135
+ )
136
+ results.append({
137
+ "test_case_id": test_case["id"],
138
+ "testrail_id": result["id"],
139
+ "status": "success"
140
+ })
141
+ except Exception as e:
142
+ logger.error(f"Error exporting test case to TestRail: {str(e)}")
143
+ results.append({
144
+ "test_case_id": test_case["id"],
145
+ "error": str(e),
146
+ "status": "error"
147
+ })
148
+
149
+ return results
150
+
151
+ async def export_to_jira(
152
+ self,
153
+ test_cases: List[Dict[str, Any]],
154
+ project_key: str
155
+ ) -> List[Dict[str, Any]]:
156
+ """Export test cases to JIRA."""
157
+ if not self.jira:
158
+ raise ValueError("JIRA client not initialized")
159
+
160
+ results = []
161
+ for test_case in test_cases:
162
+ try:
163
+ issue_dict = {
164
+ 'project': {'key': project_key},
165
+ 'summary': test_case["title"],
166
+ 'description': self._format_jira_description(test_case),
167
+ 'issuetype': {'name': 'Test'},
168
+ 'priority': {'name': test_case["priority"]}
169
+ }
170
+
171
+ issue = self.jira.create_issue(fields=issue_dict)
172
+ results.append({
173
+ "test_case_id": test_case["id"],
174
+ "jira_id": issue.key,
175
+ "status": "success"
176
+ })
177
+ except Exception as e:
178
+ logger.error(f"Error exporting test case to JIRA: {str(e)}")
179
+ results.append({
180
+ "test_case_id": test_case["id"],
181
+ "error": str(e),
182
+ "status": "error"
183
+ })
184
+
185
+ return results
186
+
187
+ async def export_to_qtest(
188
+ self,
189
+ test_cases: List[Dict[str, Any]],
190
+ project_id: int
191
+ ) -> List[Dict[str, Any]]:
192
+ """Export test cases to qTest."""
193
+ if not self.qtest:
194
+ raise ValueError("qTest client not initialized")
195
+
196
+ results = []
197
+ for test_case in test_cases:
198
+ try:
199
+ test_case_data = {
200
+ "name": test_case["title"],
201
+ "description": self._format_qtest_description(test_case),
202
+ "priority": test_case["priority"],
203
+ "type": test_case["type"]
204
+ }
205
+
206
+ result = self.qtest.create_test_case(project_id, test_case_data)
207
+ results.append({
208
+ "test_case_id": test_case["id"],
209
+ "qtest_id": result["id"],
210
+ "status": "success"
211
+ })
212
+ except Exception as e:
213
+ logger.error(f"Error exporting test case to qTest: {str(e)}")
214
+ results.append({
215
+ "test_case_id": test_case["id"],
216
+ "error": str(e),
217
+ "status": "error"
218
+ })
219
+
220
+ return results
221
+
222
+ def _get_testrail_priority_id(self, priority: str) -> int:
223
+ """Convert priority string to TestRail priority ID."""
224
+ priority_map = {
225
+ "High": 1,
226
+ "Medium": 2,
227
+ "Low": 3
228
+ }
229
+ return priority_map.get(priority, 2)
230
+
231
+ def _format_jira_description(self, test_case: Dict[str, Any]) -> str:
232
+ """Format test case for JIRA description."""
233
+ return f"""
234
+ *Preconditions:*
235
+ {chr(10).join(test_case['preconditions'])}
236
+
237
+ *Test Steps:*
238
+ {chr(10).join(test_case['steps'])}
239
+
240
+ *Expected Results:*
241
+ {chr(10).join(test_case['expected_results'])}
242
+
243
+ *Type:* {test_case['type']}
244
+ """
245
+
246
+ def _format_qtest_description(self, test_case: Dict[str, Any]) -> str:
247
+ """Format test case for qTest description."""
248
+ return f"""
249
+ Preconditions:
250
+ {chr(10).join(test_case['preconditions'])}
251
+
252
+ Test Steps:
253
+ {chr(10).join(test_case['steps'])}
254
+
255
+ Expected Results:
256
+ {chr(10).join(test_case['expected_results'])}
257
+ """
258
+
259
+ test_service = TestService()
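A rough usage sketch of this service with a stand-in AI client, assuming the third-party clients imported at the top of the file are installed and the settings attributes it checks are defined; the requirement values are placeholders, and the stub only mirrors the response["response"] shape that generate_test_cases reads:

    import asyncio
    from services.test_service import test_service

    class StubAIService:
        async def generate_response(self, prompt: str):
            # Mimics the dict shape consumed by _parse_test_case_response
            return {"response": "Test case title: Login works\nPriority: High\nTest type: Functional"}

    async def demo():
        requirements = [{
            "id": "REQ-1",  # placeholder requirement
            "title": "User login",
            "description": "Users can log in with email and password.",
        }]
        cases = await test_service.generate_test_cases(requirements, StubAIService())
        for case in cases:
            print(case["title"], case["priority"], case["type"])

    asyncio.run(demo())

Note that the export_to_* methods read test_case["id"], which generated cases do not set, so a caller would need to attach an id (for example after persisting the case) before exporting.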
tests/test_document_service.py ADDED
@@ -0,0 +1,67 @@
+ import pytest
+ from fastapi import UploadFile
+ from io import BytesIO
+ from services.document_service import document_service
+ import os
+
+ @pytest.fixture
+ def sample_pdf():
+     return BytesIO(b"%PDF-1.4\n%Test PDF content")
+
+ @pytest.fixture
+ def sample_docx():
+     return BytesIO(b"PK\x03\x04\x14\x00\x00\x00\x08\x00")
+
+ @pytest.fixture
+ def sample_image():
+     return BytesIO(b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR")
+
+ @pytest.mark.asyncio
+ async def test_save_upload_file(sample_pdf):
+     file = UploadFile(
+         filename="test.pdf",
+         file=sample_pdf
+     )
+     file_path = await document_service.save_upload_file(file)
+     assert file_path.endswith("test.pdf")
+     assert os.path.exists(file_path)
+
+ @pytest.mark.asyncio
+ async def test_process_pdf(sample_pdf):
+     file = UploadFile(
+         filename="test.pdf",
+         file=sample_pdf
+     )
+     file_path = await document_service.save_upload_file(file)
+     result = await document_service.process_document(file_path)
+     assert "text" in result
+     assert result["type"] == "pdf"
+
+ @pytest.mark.asyncio
+ async def test_process_docx(sample_docx):
+     file = UploadFile(
+         filename="test.docx",
+         file=sample_docx
+     )
+     file_path = await document_service.save_upload_file(file)
+     result = await document_service.process_document(file_path)
+     assert "text" in result
+     assert result["type"] == "word"
+
+ @pytest.mark.asyncio
+ async def test_process_image(sample_image):
+     file = UploadFile(
+         filename="test.png",
+         file=sample_image
+     )
+     file_path = await document_service.save_upload_file(file)
+     result = await document_service.process_document(file_path)
+     assert "text" in result
+     assert result["type"] == "image"
+
+ @pytest.mark.asyncio
+ async def test_segment_document():
+     text = "Line 1\nLine 2\nLine 3\nLine 4\nLine 5"
+     segments = await document_service.segment_document(text, max_segment_size=20)
+     assert len(segments) > 0
+     assert all(len(segment) <= 20 for segment in segments)
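These coroutine tests are marked with @pytest.mark.asyncio, so running them locally requires the pytest-asyncio plugin in addition to pytest (for example, pip install pytest pytest-asyncio, then pytest tests/test_document_service.py -v).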
workers/test_worker.py ADDED
@@ -0,0 +1,101 @@
+ import json
+ import pika
+ from loguru import logger
+ from services.queue_service import queue_service
+ from services.test_service import test_service
+ from services.database_service import db_service
+ from models.base import TestCase, TestScript
+
+ def process_test_generation(ch, method, properties, body):
+     """Process test generation messages"""
+     try:
+         data = json.loads(body)
+         document_id = data.get('document_id')
+
+         with db_service.get_session() as session:
+             # Generate test cases
+             test_cases = test_service.generate_test_cases(
+                 document_id=document_id,
+                 session=session
+             )
+
+             # Save each test case, then generate its scripts once it has a primary key
+             for test_case in test_cases:
+                 db_test_case = TestCase(
+                     document_id=document_id,
+                     title=test_case['title'],
+                     description=test_case['description'],
+                     steps=test_case['steps'],
+                     expected_results=test_case['expected_results'],
+                     priority=test_case.get('priority', 1)
+                 )
+                 session.add(db_test_case)
+                 session.flush()  # assign db_test_case.id before scripts reference it
+
+                 scripts = test_service.generate_test_scripts(test_case)
+                 for script in scripts:
+                     db_script = TestScript(
+                         test_case_id=db_test_case.id,
+                         language=script['language'],
+                         code=script['code'],
+                         framework=script['framework']
+                     )
+                     session.add(db_script)
+
+         ch.basic_ack(delivery_tag=method.delivery_tag)
+         logger.info(f"Successfully processed test generation for document {document_id}")
+
+     except Exception as e:
+         logger.error(f"Error processing test generation: {str(e)}")
+         ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
+
+ def process_test_execution(ch, method, properties, body):
+     """Process test execution messages"""
+     try:
+         data = json.loads(body)
+         test_case_id = data.get('test_case_id')
+
+         with db_service.get_session() as session:
+             test_case = session.query(TestCase).get(test_case_id)
+             if not test_case:
+                 raise ValueError(f"Test case {test_case_id} not found")
+
+             # Execute test case
+             result = test_service.execute_test_case(test_case)
+
+             # Update test case status
+             test_case.status = result['status']
+             session.commit()
+
+         ch.basic_ack(delivery_tag=method.delivery_tag)
+         logger.info(f"Successfully executed test case {test_case_id}")
+
+     except Exception as e:
+         logger.error(f"Error executing test case: {str(e)}")
+         ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
+
+ def main():
+     """Start the worker"""
+     try:
+         # Start consuming test generation messages
+         # NOTE: consume_messages blocks on start_consuming, so the second call below
+         # is only reached after the first consumer stops
+         queue_service.consume_messages(
+             queue="test_generation",
+             callback=process_test_generation
+         )
+
+         # Start consuming test execution messages
+         queue_service.consume_messages(
+             queue="test_execution",
+             callback=process_test_execution
+         )
+
+     except KeyboardInterrupt:
+         logger.info("Worker stopped by user")
+     except Exception as e:
+         logger.error(f"Worker error: {str(e)}")
+     finally:
+         queue_service.close()
+
+ if __name__ == "__main__":
+     main()
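For context, a producer-side sketch of the messages this worker expects. The publish call below is assumed to be the queue service's publish method (its exact name, signature, and serialization are not fully visible in this diff), while the payload keys mirror what the two callbacks read via json.loads:

    import json
    from services.queue_service import queue_service

    # process_test_generation reads data.get('document_id')
    queue_service.publish_message("test_generation", json.dumps({"document_id": 123}))

    # process_test_execution reads data.get('test_case_id')
    queue_service.publish_message("test_execution", json.dumps({"test_case_id": 456}))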