""" | |
SFOSR Integrated System (with Binary Validity) | |
Этот модуль объединяет основные компоненты системы SFOSR: | |
- Анализатор структуры (`SFOSRAnalyzer`) | |
- Верификатор контрактов (`ContractVerifier`) | |
- Систему построения доказательств (`ProofSystem`) | |
для обеспечения комплексной обработки и формальной оценки | |
смысловых структур на основе бинарной валидности. | |
""" | |
import json
import subprocess
import os
from typing import Dict, List, Any, Tuple, Optional, Set, Union

from .sfosr_database import SFOSRDatabase  # Use relative import within the package
# System configuration
SFOSR_CONFIG = {
    "version": "0.4.0",
    "description": "Integrated SFOSR System",
    "components": ["analyzer", "verifier", "proof_system"],
    "debug_mode": False,
    "auto_update_plausibility": True  # Automatic plausibility updates
}
# Common vector types
VECTOR_TYPES = {
    "Causality": {"weight": 2.0, "requires_justification": True, "description": "Cause-effect relation"},
    "Implication": {"weight": 1.8, "requires_justification": True, "description": "Logical consequence (if-then)"},
    "Transformation": {"weight": 1.5, "requires_justification": False, "description": "Transition from one state to another"},
    "Goal": {"weight": 1.3, "requires_justification": False, "description": "Goal-setting, intention"},
    "Prevention": {"weight": 1.3, "requires_justification": False, "description": "Prevention of an undesired outcome"},
    "Contrast": {"weight": 1.2, "requires_justification": False, "description": "Opposition"},
    "Comparison": {"weight": 1.0, "requires_justification": False, "description": "Comparison of elements"},
    "Inclusion": {"weight": 0.8, "requires_justification": False, "description": "Part-whole relation"},
    "Attribution": {"weight": 0.7, "requires_justification": False, "description": "Ascribing a property to an object"},
    "Temporal": {"weight": 1.1, "requires_justification": False, "description": "Temporal sequence"},
    "Qualification": {"weight": 0.6, "requires_justification": False, "description": "Restriction or refinement"},
    "Definition": {"weight": 1.4, "requires_justification": False, "description": "Definition of a concept"},
    "PartOf": {"constraints": [], "requires_justification": False},
    "Mechanism": {"constraints": [], "requires_justification": True},
    "Example": {"constraints": [], "requires_justification": False},
    "Requirement": {"constraints": [], "requires_justification": True},
    "Action": {"constraints": [], "requires_justification": False},
    "Capability": {"constraints": [], "requires_justification": False},
    "PropertyOf": {"constraints": [], "requires_justification": False},
    "Purpose": {"constraints": [], "requires_justification": False},
    "Governs": {"constraints": [], "requires_justification": False},
    "Contains": {"constraints": [], "requires_justification": False},
    "Represents": {"constraints": [], "requires_justification": False},
    "Context": {"constraints": [], "requires_justification": False},
    "IsA": {"constraints": [], "requires_justification": False},
    # "ActsOn": {"constraints": [], "requires_justification": False},  # Removed as it's not used now
    "Dependency": {"constraints": [], "requires_justification": True}
}
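
# Illustrative example (not executed): the shape of a single SFOSR vector as
# consumed by the components below. The field values here are hypothetical;
# only the keys ("id", "source", "target", "type", "axis", "justification")
# and the rule that types with requires_justification=True must carry a
# justification follow from VECTOR_TYPES above.
#
#     example_vector = {
#         "id": "V1",
#         "source": "concept_a",
#         "target": "concept_b",
#         "type": "Causality",          # requires_justification is True for this type
#         "axis": "cause <-> effect",
#         "justification": "A produces B under the stated conditions",
#     }
#     needs_reason = VECTOR_TYPES[example_vector["type"]]["requires_justification"]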
# Interface that integrates the components
class SFOSRSystem:
    """
    Main class of the integrated SFOSR system (based on binary validity)

    Combines:
    - structural analysis of vectors
    - contract checking and validity determination
    - construction of valid proofs
    """
    def __init__(self, db_path="sfosr.db", debug=False):
        self.db_path = db_path
        self.db = SFOSRDatabase(db_path)  # Database connection
        self._analyzer = SFOSRAnalyzer()
        # Prepare data for ContractVerifier
        all_concepts = self.db.get_all_concepts()
        known_concepts_names = {c['name'] for c in all_concepts}
        concepts_data_map = {c['name']: c for c in all_concepts}
        self._verifier = ContractVerifier(known_concepts=known_concepts_names, concepts_data=concepts_data_map)
        self._proof_system = ProofSystem(db_conn=self.db)  # Pass db connection
        self.concept_graph = None
        self.debug = debug
        # Load inference rules from DB
        try:
            db_rules = self.db.get_inference_rules()
            if db_rules:
                self._proof_system.load_rules(db_rules)
        except Exception as e:
            if self.debug:
                print(f"Error loading inference rules from database: {str(e)}")
        # Graphs
        self.concept_graph = {}
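
    # Illustrative usage (not executed): constructing the integrated system.
    # Assumes an existing SQLite database created by SFOSRDatabase; "sfosr.db"
    # is simply the constructor default shown above, and the concept name is
    # hypothetical.
    #
    #     system = SFOSRSystem(db_path="sfosr.db", debug=True)
    #     info = system.get_concept_info("multi_level_space")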
    def process(self, input_data):
        """
        Main entry point for processing input data (with binary validity)

        Performs, in order:
        1. Structural analysis
        2. Contract checking (determining is_valid)
        3. Proof construction (when applicable, determining is_valid)
        (Uses only the validated vectors from input_data, without enrichment from the DB)

        Args:
            input_data: Dictionary with the text and SFOSR vectors

        Returns:
            Dict: Processing results (with is_valid fields)
        """
        # Step 1: structural analysis
        analysis_result = self._analyzer.analyze(input_data)
        self.concept_graph = analysis_result["concept_graph"]
        # If the analysis did not complete
        if analysis_result["analysis_status"] != "Completed":
            return {
                "status": "Error",
                "message": f"Analysis failed: {analysis_result['analysis_status']}",
                "details": {
                    "validation_issues": analysis_result["validation_issues"]
                }
            }
        # Get instance definitions from the context, if present
        instance_definitions = input_data.get("instance_definitions", {})
        # Step 2: contract checking
        vectors_to_verify = analysis_result["vectors_analyzed"]
        # Pass the instance definitions to the verifier
        verification_result = self._verifier.verify_all(vectors_to_verify, instance_definitions)
        # Collect only the valid vectors for the proof system
        valid_input_vectors = []
        vectors_data = verification_result["vectors_data"]
        for vector in vectors_to_verify:
            v_id = vector.get("id")
            # Use get for safe access and check that the 'vector' key is present
            vector_dict = vectors_data.get(v_id, {}).get('vector')
            if vector_dict and vectors_data[v_id].get("is_valid", False):
                valid_input_vectors.append(vector_dict)  # Append the original vector
        # --- Generate temporary IsA vectors ---
        temporary_isa_vectors = []
        for instance_id, definition in instance_definitions.items():
            general_type = definition.get('is_a')
            instance_label = definition.get('label', instance_id)  # Use label or ID
            if general_type:
                # Check whether the general type exists in the DB
                if self.db.get_concept_by_name(general_type):
                    temporary_isa_vectors.append({
                        "id": f"isa_{instance_id}",  # Unique temporary ID
                        "source": instance_id,       # Use the temporary ID
                        "target": general_type,      # Reference to the general type in the DB
                        "type": "IsA",
                        "axis": "classification",
                        "justification": f"Instance '{instance_label}' defined as type '{general_type}' in input context.",
                        "is_valid": True  # Treat these links as a priori valid for proof construction
                    })
                else:
                    print(f"Warning: General type '{general_type}' for instance '{instance_id}' not found in DB. Skipping IsA vector generation.")
        # ----------------------------------------
        # Base processing result
        result = {
            "status": "Success",
            "input_text": input_data.get("text", ""),
            "analysis": {
                "status": analysis_result["analysis_status"],
                "is_compilable": analysis_result["is_compilable"],
                "graph_metrics": analysis_result["graph_metrics"]
            },
            "verification": {
                "total_vectors": verification_result["total_vectors_processed"],
                "valid_count": verification_result["valid_count"],
                "compliance_rate": verification_result["compliance_rate"],
                "vectors_data": verification_result["vectors_data"]
            }
        }
        # Step 3: proof construction
        vectors_for_proof = valid_input_vectors + temporary_isa_vectors
        # Run if a query is present AND there are ANY vectors at all (input or IsA)
        if "proof_query" in input_data and vectors_for_proof:
            query = input_data["proof_query"]
            source = query.get("source")
            target = query.get("target")
            if source and target:
                proof_result = self._proof_system.construct_proof(
                    vectors_for_proof, source, target
                )
                result["proof"] = proof_result
            else:  # No source/target
                result["proof"] = {"status": "Failed", "reason": "Missing source or target in proof query", "is_valid": False}
        # else: no proof_query or no vectors, so no proof is produced
        return result
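
    # Illustrative usage (not executed): a minimal call to process(). Concept
    # names and the instance definition are hypothetical; the structure of
    # "instance_definitions" (instance id -> {"is_a": ..., "label": ...}) and
    # "proof_query" follows the code above.
    #
    #     payload = {
    #         "text": "My cat knocks the cup, which causes a spill.",
    #         "vectors": [
    #             {"id": "V1", "source": "my_cat", "target": "cup_fall", "type": "Action",
    #              "axis": "agent <-> effect", "justification": "observed"},
    #             {"id": "V2", "source": "cup_fall", "target": "spill", "type": "Causality",
    #              "axis": "cause <-> effect", "justification": "gravity"},
    #         ],
    #         "instance_definitions": {"my_cat": {"is_a": "cat", "label": "My cat"}},
    #         "proof_query": {"source": "my_cat", "target": "spill"},
    #     }
    #     outcome = SFOSRSystem().process(payload)
    #     print(outcome["verification"]["valid_count"], outcome.get("proof", {}).get("is_valid"))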
    def analyze(self, input_data):
        """Convenience method that runs only the analysis step"""
        return self._analyzer.analyze(input_data)

    def verify(self, input_data):
        """Convenience method that runs only the verification step"""
        # First run the basic analysis to obtain structurally valid vectors
        analysis_result = self._analyzer.analyze(input_data)
        # Pass an empty instance_definitions dict, since verify does not work with context
        return self._verifier.verify_all(analysis_result["vectors_analyzed"], instance_definitions={})

    def prove(self, input_data, source, target):
        """Convenience method for building a proof.

        Analyzes, verifies and builds a proof using only the validated
        vectors from input_data (without enrichment from the DB).
        """
        # Analysis first
        analysis_res = self.analyze(input_data)
        if analysis_res["analysis_status"] != "Completed":
            return {"status": "Failed", "reason": "Analysis failed"}
        # Get the list of vectors that passed analysis
        vectors_analyzed = analysis_res.get("vectors_analyzed", [])
        if not vectors_analyzed:
            return {"status": "Failed", "reason": "No vectors passed analysis"}
        # Then verify those vectors.
        # An empty instance_definitions is passed, since prove works with prepared input_data.
        # Arguably the real instance_definitions from input_data should be passed here;
        # it is left empty for now for compatibility.
        verification_res = self._verifier.verify_all(vectors_analyzed, instance_definitions={})
        vectors_data = verification_res.get("vectors_data", {})
        # Extract the valid vectors, ITERATING OVER THE ORIGINAL LIST
        valid_vectors = [
            vector                          # take the original vector
            for vector in vectors_analyzed  # iterate over the analysis results
            if vectors_data.get(vector.get("id", ""), {}).get("is_valid", False)  # check validity in the verification results
        ]
        if not valid_vectors:
            return {"status": "Failed", "reason": "No valid vectors after verification"}
        # Enrichment is disabled, use valid_vectors directly
        vectors_for_proof = valid_vectors
        return self._proof_system.construct_proof(vectors_for_proof, source, target)
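
    # Illustrative usage (not executed): prove() returns the dict produced by
    # ProofSystem.construct_proof, e.g. on success a structure of the form
    # {"status": "Success", "source": ..., "target": ..., "steps": [...],
    #  "is_valid": True/False, "final_conclusion_type": ...}. The payload and
    # concept names are the hypothetical ones from the process() example above.
    #
    #     proof = SFOSRSystem().prove(payload, "my_cat", "spill")
    #     if proof["status"] == "Success" and proof["is_valid"]:
    #         for step in proof["steps"]:
    #             print(step["rule"], step["premises"])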
    def get_concept_info(self, concept_name):
        """
        Get information about a concept from the DB

        Args:
            concept_name: Concept name

        Returns:
            Dict: Concept information or None
        """
        return self.db.get_complete_concept_info(concept_name)

    def find_related_concepts(self, concept_name, depth=1):
        """
        Find related concepts

        Args:
            concept_name: Concept name
            depth: Search depth

        Returns:
            List: List of related concepts
        """
        concept = self.db.get_concept_by_name(concept_name)
        if not concept:
            return []
        return self.db.get_related_concepts(concept["id"], depth)

    def add_concept_to_db(self, name, description, domain, level):
        """
        Add a new concept to the DB

        Args:
            name: Concept name
            description: Description
            domain: Domain (knowledge area)
            level: Abstraction level

        Returns:
            int: ID of the added concept
        """
        return self.db.add_concept(name, description, domain, level)

    def add_vector_to_db(self, source_name, target_name, vector_type, axis, justification=None):
        """
        Add a new vector to the DB

        Args:
            source_name: Name of the source concept
            target_name: Name of the target concept
            vector_type: Vector type
            axis: Axis
            justification: Justification

        Returns:
            int: ID of the added vector, or None on error
        """
        source = self.db.get_concept_by_name(source_name)
        target = self.db.get_concept_by_name(target_name)
        if not source or not target:
            return None
        return self.db.add_vector(source["id"], target["id"], vector_type, axis, justification)
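
    # Illustrative usage (not executed): populating the database through the
    # wrappers above. Concept names, domain and level values are hypothetical;
    # only the call signatures follow from the methods above.
    #
    #     system = SFOSRSystem()
    #     system.add_concept_to_db("emergent_complexity", "emergence of structure", "systems", "macro")
    #     system.add_concept_to_db("multi_level_space", "space of nested levels", "systems", "macro")
    #     system.add_vector_to_db("emergent_complexity", "multi_level_space",
    #                             "Implication", "structure <-> hierarchy",
    #                             justification="Emergence creates levels")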
# Implementation of the system components
class SFOSRAnalyzer:
    """
    Analyzer of SFOSR vector structure

    Responsible for:
    - checking the syntax and basic structure of vectors
    - checking compilability (presence of the required fields)
    - building the concept graph
    """
    def __init__(self, vector_types=None):
        """Initialize the analyzer"""
        self.vector_types = vector_types or VECTOR_TYPES

    def build_concept_graph(self, vectors):
        """
        Builds the graph of concepts and the links between them

        Args:
            vectors: List of SFOSR vectors

        Returns:
            Dict: Graph structure with nodes and edges
        """
        # Structure holding the graph
        graph = {
            "nodes": set(),   # unique concepts
            "edges": [],      # links (tuples of source, target, vector_id)
            "adjacency": {},  # adjacency dict for fast access
        }
        # Collect all unique concepts and edges
        all_nodes = set()
        for vector in vectors:
            source = vector.get("source")
            target = vector.get("target")
            vector_id = vector.get("id")
            if source:
                all_nodes.add(source)
                if source not in graph["adjacency"]:
                    graph["adjacency"][source] = {"out": [], "in": []}
            if target:
                all_nodes.add(target)
                if target not in graph["adjacency"]:
                    graph["adjacency"][target] = {"out": [], "in": []}
            if source and target and vector_id:
                edge = (source, target, vector_id)
                graph["edges"].append(edge)
                graph["adjacency"][source]["out"].append((target, vector_id))
                graph["adjacency"][target]["in"].append((source, vector_id))
        graph["nodes"] = all_nodes
        return graph
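
    # Illustrative example (not executed): for two hypothetical vectors
    # A -> B (id "V1") and B -> C (id "V2"), the graph produced above has the shape
    #
    #     {"nodes": {"A", "B", "C"},
    #      "edges": [("A", "B", "V1"), ("B", "C", "V2")],
    #      "adjacency": {"A": {"out": [("B", "V1")], "in": []},
    #                    "B": {"out": [("C", "V2")], "in": [("A", "V1")]},
    #                    "C": {"out": [], "in": [("B", "V2")]}}}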
    def validate_vector_structure(self, vector):
        """
        Checks the vector structure against the basic requirements

        Args:
            vector: Dictionary with the vector data

        Returns:
            Tuple[bool, Optional[str]]: (validity, error message)
        """
        required_keys = ["id", "source", "target", "type", "axis"]
        missing_keys = [key for key in required_keys if key not in vector or not vector[key]]
        if missing_keys:
            return False, f"Vector {vector.get('id', 'Unknown')} missing keys: {', '.join(missing_keys)}"
        # Check that the given vector type exists
        vector_type = vector.get("type")
        if vector_type not in self.vector_types:
            return False, f"Vector {vector.get('id', 'Unknown')} has invalid type: {vector_type}"
        return True, None

    def validate_compilability(self, vector):
        """
        Checks compilability (sufficiency of the data)

        Args:
            vector: Dictionary with the vector data

        Returns:
            Tuple[bool, Optional[str]]: (compilability, error message)
        """
        vector_type = vector.get("type")
        # Check the justification requirement depending on the type
        if (vector_type in self.vector_types and
                self.vector_types[vector_type]["requires_justification"]):
            if not vector.get("justification"):
                return False, f"Vector {vector.get('id', 'Unknown')} requires justification for type {vector_type}."
        return True, None
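
    # Illustrative example (not executed): what the two checks above return for a
    # hypothetical Causality vector that lacks a justification.
    #
    #     analyzer = SFOSRAnalyzer()
    #     v = {"id": "V9", "source": "a", "target": "b", "type": "Causality", "axis": "x <-> y"}
    #     analyzer.validate_vector_structure(v)  # -> (True, None)
    #     analyzer.validate_compilability(v)     # -> (False, "Vector V9 requires justification for type Causality.")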
    def analyze(self, input_data):
        """
        Main SFOSR structural analysis function (simplified)

        Args:
            input_data: Dictionary with the text and vectors

        Returns:
            Dict: Analysis results (validation and graph)
        """
        input_text = input_data.get("text", "N/A")
        vectors = input_data.get("vectors", [])
        valid_vectors = []
        validation_issues = []
        analysis_status = "Completed"
        # 1. Validate the structure and compilability of every vector
        for vector in vectors:
            is_struct_valid, struct_error = self.validate_vector_structure(vector)
            if not is_struct_valid:
                validation_issues.append(struct_error)
                analysis_status = "Validation Error"
                continue  # Do not check an invalid structure any further
            is_comp_valid, comp_error = self.validate_compilability(vector)
            if not is_comp_valid:
                validation_issues.append(comp_error)
                # Continue the analysis, but record the issue
            # Collect only the structurally valid vectors
            valid_vectors.append(vector)
        # Determine compilability by the presence of issues
        is_compilable = len(validation_issues) == 0
        if not is_compilable and analysis_status == "Completed":
            analysis_status = "Compilability Error"  # Only compilability issues were found
        # 2. Build the concept graph (only from structurally valid vectors)
        graph = self.build_concept_graph(valid_vectors)
        # 3. Assemble the simplified result
        result = {
            "input_text": input_text,
            "analysis_status": analysis_status,
            "is_compilable": is_compilable,
            "validation_issues": validation_issues,
            "graph_metrics": {  # Simplified graph metrics
                "concepts_count": len(graph["nodes"]),
                "connections_count": len(graph["edges"]),
            },
            "vectors_analyzed": valid_vectors,  # Contains only structurally valid vectors
            "concept_graph": graph
        }
        return result
class ContractVerifier:
    """
    Verifier of SFOSR vector contracts

    Checks vectors against their formal contracts,
    determines the binary validity (`is_valid`) of each vector
    and collects metadata.
    """
    def __init__(self, contract_types=None, known_concepts: Optional[Set[str]] = None, concepts_data: Optional[Dict[str, Dict]] = None):
        """Initialize the verifier with the known concepts and their data (levels)."""
        self.contract_types = contract_types or set(VECTOR_TYPES.keys())
        self.known_concepts = known_concepts or set()
        # --- Store the level data ---
        self.concepts_data = concepts_data or {}
        # ----------------------------
        self.axis_registry = set()
    def verify_vector_contract(self, vector: Dict[str, Any], instance_definitions: Dict[str, Dict]) -> Tuple[bool, List[str], Dict[str, Any]]:
        """Checks a single vector against its contracts"""
        issues = []
        metadata = {}
        is_valid = True  # Start by assuming validity
        # --- Check that the concepts exist and resolve their types ---
        source_name = vector.get("source")
        target_name = vector.get("target")
        vector_type = vector.get("type")
        vector_id = vector.get("id", "Unknown")
        # --- Resolve the actual concept TYPES to check against ---
        source_type_name = source_name
        target_type_name = target_name
        is_source_instance = False
        is_target_instance = False
        if source_name in instance_definitions:
            source_type_name = instance_definitions[source_name].get("is_a")
            is_source_instance = True
            if not source_type_name:
                issues.append(f"Instance '{source_name}' in vector {vector_id} has no 'is_a' type defined in context.")
                is_valid = False
                source_type_name = None  # Cannot check any further
        if target_name in instance_definitions:
            target_type_name = instance_definitions[target_name].get("is_a")
            is_target_instance = True
            if not target_type_name:
                issues.append(f"Instance '{target_name}' in vector {vector_id} has no 'is_a' type defined in context.")
                is_valid = False
                target_type_name = None  # Cannot check any further
        # ----------------------------------------------------
        source_concept_data = None
        target_concept_data = None
        if source_type_name and is_valid:
            source_concept_data = self.concepts_data.get(source_type_name)
            if not source_concept_data:
                issues.append(f"Source concept/type '{source_type_name}' (for '{source_name}') not found in known concepts for vector {vector_id}.")
                is_valid = False
        if target_type_name and is_valid:
            target_concept_data = self.concepts_data.get(target_type_name)
            if not target_concept_data:
                issues.append(f"Target concept/type '{target_type_name}' (for '{target_name}') not found in known concepts for vector {vector_id}.")
                is_valid = False
        # --- Contract check for the Transformation type ---
        if is_valid and vector_type == "Transformation":
            if source_name == target_name:
                issues.append(f"Transformation vector {vector_id} cannot have the same source and target ('{source_name}').")
                is_valid = False
        # --- Contract check for Causality (different levels) ---
        if is_valid and vector_type == "Causality" and "level" in vector.get("axis", ""):
            if source_concept_data and target_concept_data:
                source_level = source_concept_data.get('level')
                target_level = target_concept_data.get('level')
                if source_level and target_level and source_level == target_level:
                    issues.append(f"Causality vector {vector_id} ('{source_type_name}' -> '{target_type_name}') links concepts on the same level '{source_level}' with axis containing 'level'.")
                    is_valid = False
        # --- Other type-specific checks could be added here ---
        # For example, for ActsOn: source should be a subtype of Action and target a subtype of Object?
        # That would require a hierarchy in the DB or more elaborate logic.
        # Axis registration stays as is
        if vector.get("axis") and vector["axis"] not in self.axis_registry:
            self.axis_registry.add(vector["axis"])
        # The vector data itself is no longer added to the metadata:
        # metadata['vector'] = vector  # removed - prove now receives the original list
        return is_valid, issues, metadata
    def verify_all(self, vectors: List[Dict[str, Any]], instance_definitions: Dict[str, Dict]) -> Dict[str, Any]:
        """Check all vectors and aggregate validity and metadata"""
        vectors_data = {}
        valid_count = 0
        processed_count = 0
        for vector in vectors:
            processed_count += 1
            vector_id = vector.get("id", f"unknown_{processed_count}")
            # Pass instance_definitions into the contract check
            is_valid, issues, metadata = self.verify_vector_contract(vector, instance_definitions)
            vectors_data[vector_id] = {
                "vector": vector,
                "is_valid": is_valid,
                "issues": issues,
                "metadata": metadata
            }
            if is_valid:
                valid_count += 1
        # Build the report
        report = {
            "total_vectors_processed": processed_count,
            "valid_count": valid_count,
            "compliance_rate": round(valid_count / processed_count, 3) if processed_count > 0 else 0.0,
            "vectors_data": vectors_data  # The main data now lives here
        }
        return report
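
    # Illustrative example (not executed): shape of the report returned above for
    # two hypothetical vectors, one of which fails its contract.
    #
    #     {"total_vectors_processed": 2,
    #      "valid_count": 1,
    #      "compliance_rate": 0.5,
    #      "vectors_data": {
    #          "V1": {"vector": {...}, "is_valid": True, "issues": [], "metadata": {}},
    #          "V2": {"vector": {...}, "is_valid": False,
    #                 "issues": ["Source concept/type 'x' (for 'x') not found in known concepts for vector V2."],
    #                 "metadata": {}}}}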
class ProofSystem:
    """
    SFOSR proof-construction system

    Responsible for:
    - building proofs from VALID vectors (and data from the DB)
    - checking the final validity (`is_valid`) of proofs
    - finding proof paths between concepts (using the DB)
    """
    def __init__(self, db_conn):
        """Initialize the proof system.

        Args:
            db_conn: SFOSRDatabase instance used for DB access.
        """
        self.db_conn = db_conn  # Store the database connection
        # Basic inference rules (with binary validity)
        self.inference_rules = {
            "chain_rule": {
                "pattern": "A → B, B → C ⊢ A → C",
                "premise_types": ["Implication", "Implication"],
                "conclusion_type": "Implication",
                "domain": "logical_inference"
            },
            "causality_transfer": {
                "pattern": "A → B (Causality), B → C (Causality) ⊢ A → C (Causality)",
                "premise_types": ["Causality", "Causality"],
                "conclusion_type": "Causality",
                "domain": "causal_inference"
            },
            "implication_causality_chain": {
                "pattern": "A → B (Implication), B → C (Causality) ⊢ A → C (Causality)",
                "premise_types": ["Implication", "Causality"],
                "conclusion_type": "Causality",
                "domain": "mixed_inference"
            },
            # --- New rule ---
            "part_of_transitivity": {
                "pattern": "A PartOf B, B PartOf C ⊢ A PartOf C",
                "premise_types": ["PartOf", "PartOf"],
                "conclusion_type": "PartOf",
                "domain": "mereology"
            },
            # --- New rule ---
            "action_causality_chain": {
                "pattern": "A -> B (Action), B -> C (Causality) |- A -> C (Causality)",
                "premise_types": ["Action", "Causality"],
                "conclusion_type": "Causality",
                "domain": "action_inference"
            },
            # --- Another new rule ---
            "action_isa_generalization": {
                "pattern": "A -> B_inst (Action), B_inst IsA B_type |- A -> B_type (Action)",
                "premise_types": ["Action", "IsA"],
                "conclusion_type": "Action",  # The result is a generalized action
                "domain": "inheritance_inference"
            }
        }
        # Cache of constructed proofs (derivation structure only)
        self.proof_cache = {}
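
    # Illustrative example (not executed): each rule record above is consumed by
    # apply_inference_rule below; "premise_types" is the expected ordered pair of
    # premise vector types and "conclusion_type" is the type of the derived vector.
    # A DB-provided rule passed to load_rules() is assumed to follow the same shape,
    # and only takes effect if apply_inference_rule has a handler for its name.
    #
    #     {"my_db_rule": {"pattern": "A → B, B → C ⊢ A → C",
    #                     "premise_types": ["Implication", "Implication"],
    #                     "conclusion_type": "Implication",
    #                     "domain": "logical_inference"}}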
    def load_rules(self, db_rules):
        """
        Load inference rules from the DB (ignoring any legacy plausibility data)

        Args:
            db_rules: Dictionary of inference rules from the DB
        """
        for name, rule_data in db_rules.items():
            rule_data.pop('plausibility', None)  # Make sure any plausibility value is removed
            self.inference_rules[name] = rule_data
    # --- Helper for input-only BFS ---
    def _find_path_using_input_graph(self, input_graph, source_concept, target_concept) -> Dict[str, Any]:
        """BFS using only the input graph."""
        if source_concept not in input_graph["nodes"]:
            return {"status": "Source node not found"}
        visited = {source_concept}
        queue: List[Tuple[str, List[Tuple[str, str, str, str]]]] = [(source_concept, [])]
        while queue:
            current_concept, path = queue.pop(0)
            if current_concept in input_graph["adjacency"]:
                for next_concept_input, vector_id_input in input_graph["adjacency"][current_concept].get("out", []):
                    if next_concept_input == target_concept:
                        final_path = path + [(current_concept, next_concept_input, vector_id_input, 'input')]
                        return {"status": "Path found", "path": final_path, "db_vectors_used": []}
                    if next_concept_input not in visited:
                        visited.add(next_concept_input)
                        new_path = path + [(current_concept, next_concept_input, vector_id_input, 'input')]
                        queue.append((next_concept_input, new_path))
        return {"status": "Path not found (input only)"}
    # --- Helper for combined BFS ---
    def _find_path_using_combined_graph(self, input_graph, source_concept, target_concept) -> Dict[str, Any]:
        """BFS using the input graph AND database lookups."""
        if source_concept not in input_graph["nodes"]:
            if not self.db_conn.get_concept_by_name(source_concept):
                return {"status": "Source node not found"}
        # visited is now a dict: {concept_name: origin ('input' or 'db')}
        visited: Dict[str, str] = {source_concept: 'start'}
        queue: List[Tuple[str, List[Tuple[str, str, str, str]], Set[int]]] = [(source_concept, [], set())]
        used_db_vector_ids = set()
        db_vector_cache = {}
        while queue:
            current_concept, path, current_used_db_ids = queue.pop(0)
            # --- Step 1: the input graph ---
            if current_concept in input_graph["adjacency"]:
                for next_concept_input, vector_id_input in input_graph["adjacency"][current_concept].get("out", []):
                    if next_concept_input == target_concept:
                        final_path = path + [(current_concept, next_concept_input, vector_id_input, 'input')]
                        final_db_vectors_list = [self.db_conn.convert_db_vector_to_system_format(db_vector_cache[vid]) for vid in current_used_db_ids if vid in db_vector_cache]
                        return {"status": "Path found", "path": final_path, "db_vectors_used": final_db_vectors_list}
                    if next_concept_input not in visited:
                        visited[next_concept_input] = 'input'  # Mark as visited via the input graph
                        new_path = path + [(current_concept, next_concept_input, vector_id_input, 'input')]
                        queue.append((next_concept_input, new_path, current_used_db_ids))
            # --- Step 2: the database ---
            try:
                current_concept_info = self.db_conn.get_concept_by_name(current_concept)
                if not current_concept_info:
                    continue
                current_concept_id = current_concept_info['id']
                db_vectors_raw = self.db_conn.get_vectors_for_concept(current_concept_id)
                for db_vector in db_vectors_raw:
                    if db_vector['source_id'] == current_concept_id:
                        next_concept_db = db_vector['target_name']
                        db_vector_actual_id = db_vector['id']
                        db_vector_system_id = f"V{db_vector_actual_id}"
                        if db_vector_actual_id not in db_vector_cache:
                            db_vector_cache[db_vector_actual_id] = db_vector
                        new_used_db_ids = current_used_db_ids.union({db_vector_actual_id})
                        if next_concept_db == target_concept:
                            final_path = path + [(current_concept, next_concept_db, db_vector_system_id, 'db')]
                            final_db_vectors_list = [self.db_conn.convert_db_vector_to_system_format(db_vector_cache[vid]) for vid in new_used_db_ids if vid in db_vector_cache]
                            return {"status": "Path found", "path": final_path, "db_vectors_used": final_db_vectors_list}
                        # Check whether the node was already visited, and from where
                        current_visit_status = visited.get(next_concept_db)
                        # Enqueue ONLY if it has not been visited via the input graph
                        if current_visit_status != 'input':
                            # If it has not been visited yet, or was visited via the db, update/add it
                            if current_visit_status is None or current_visit_status == 'db':
                                visited[next_concept_db] = 'db'  # Mark as visited via the db
                                new_path = path + [(current_concept, next_concept_db, db_vector_system_id, 'db')]
                                queue.append((next_concept_db, new_path, new_used_db_ids))
            except Exception as e:
                print(f"DB Error during path finding in combined search: {e}")
                return {"status": "DB error", "reason": str(e)}
        return {"status": "Path not found (combined)"}
    # --- Orchestrator method ---
    def find_proof_path(self, input_graph, source_concept, target_concept) -> Dict[str, Any]:
        """
        Searches for a proof path: first over the input data only, then with the DB.

        Args:
            input_graph: Graph built ONLY from the valid input vectors.
            source_concept: Name of the source concept.
            target_concept: Name of the target concept.

        Returns:
            Dict: Path search result (status, path, db_vectors_used).
        """
        # Phase 1: input vectors only
        input_path_info = self._find_path_using_input_graph(input_graph, source_concept, target_concept)
        if input_path_info["status"] == "Path found":
            return input_path_info
        # Phase 2: combined input and DB vectors
        combined_path_info = self._find_path_using_combined_graph(input_graph, source_concept, target_concept)
        if combined_path_info["status"] == "Path not found (combined)":
            combined_path_info["status"] = "Path not found"
        return combined_path_info
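
    # Illustrative example (not executed): a successful result from the search
    # above, for hypothetical concepts and vector ids. Each path segment is a
    # (source, target, vector_id, origin) tuple, where origin is 'input' or 'db'.
    #
    #     {"status": "Path found",
    #      "path": [("A", "B", "V1", "input"), ("B", "C", "V7", "db")],
    #      "db_vectors_used": [{"id": "V7", "source": "B", "target": "C", "type": "Causality", ...}]}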
    def _apply_chain_rule(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]:
        """Logic for chain_rule. Returns (conclusion, step validity)."""
        if len(premises) != 2:
            return None, False
        v1, v2 = premises
        # Check that the premises match the rule pattern
        if v1["target"] == v2["source"] and \
           v1["type"] == "Implication" and \
           v2["type"] == "Implication":  # The second premise must be an Implication
            # Determine the conclusion type (taken directly from the rule)
            conclusion_type = self.inference_rules["chain_rule"]["conclusion_type"]
            # Build the conclusion
            conclusion = {
                "id": f"S{len(self.proof_cache) + 1}",  # Generate an ID for the step
                "source": v1["source"],
                "target": v2["target"],
                "type": conclusion_type,
                "axis": v1["axis"],  # Take the axis from the first premise (could be refined)
                "justification": f"Derived by chain_rule from {v1['id']} and {v2['id']}",
                "derived": True  # Mark the vector as derived
            }
            return conclusion, True  # The step is valid
        return None, False  # The rule does not apply
    def _apply_causality_transfer(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]:
        """Logic for causality_transfer. Returns (conclusion, step validity)."""
        if len(premises) != 2:
            return None, False
        v1, v2 = premises
        # Check that the premises match the rule pattern
        if v1["target"] == v2["source"] and \
           v1["type"] == "Causality" and \
           v2["type"] == "Causality":  # The second premise must be a Causality
            conclusion_type = self.inference_rules["causality_transfer"]["conclusion_type"]
            conclusion = {
                "id": f"S{len(self.proof_cache) + 1}",
                "source": v1["source"],
                "target": v2["target"],
                "type": conclusion_type,
                "axis": v1["axis"],
                "justification": f"Derived by causality_transfer from {v1['id']} and {v2['id']}",
                "derived": True
            }
            return conclusion, True
        return None, False
    def _apply_implication_causality_chain(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]:
        """Logic for implication_causality_chain. Returns (conclusion, step validity)."""
        if len(premises) != 2:
            return None, False
        v1, v2 = premises
        # Check that the premises match the pattern
        if v1["target"] == v2["source"] and \
           v1["type"] == "Implication" and \
           v2["type"] == "Causality":
            conclusion_type = self.inference_rules["implication_causality_chain"]["conclusion_type"]
            conclusion = {
                "id": f"S{len(self.proof_cache) + 1}",
                "source": v1["source"],
                "target": v2["target"],
                "type": conclusion_type,
                "axis": v1["axis"],  # Take the axis from the first premise
                "justification": f"Derived by implication_causality_chain from {v1['id']} and {v2['id']}",
                "derived": True
            }
            return conclusion, True
        return None, False
    # --- New method for the PartOf rule ---
    def _apply_part_of_transitivity(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]:
        """Logic for part_of_transitivity."""
        if len(premises) != 2:
            return None, False
        v1, v2 = premises
        # Check the premise types and the link between them
        if v1["target"] == v2["source"] and \
           v1.get("type") == "PartOf" and \
           v2.get("type") == "PartOf":
            conclusion_type = self.inference_rules["part_of_transitivity"]["conclusion_type"]
            conclusion = {
                "id": f"S{len(self.proof_cache) + 1}",
                "source": v1["source"],
                "target": v2["target"],
                "type": conclusion_type,
                "axis": v1.get("axis", "partonomy"),  # Use axis from v1 or the default
                "justification": f"Derived by part_of_transitivity from {v1.get('id', '?')} and {v2.get('id', '?')}",
                "derived": True
            }
            return conclusion, True
        return None, False
    # --- New logic for the Action -> Causality rule ---
    def _apply_action_causality_chain(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]:
        """Logic for action_causality_chain. Returns (conclusion, step validity)."""
        if len(premises) != 2:
            return None, False
        v1, v2 = premises
        # Check that the premises match the rule pattern: A->B (Action), B->C (Causality)
        if v1["target"] == v2["source"] and \
           v1.get("type") == "Action" and \
           v2.get("type") == "Causality":
            conclusion_type = self.inference_rules["action_causality_chain"]["conclusion_type"]
            conclusion = {
                "id": f"S{len(self.proof_cache) + 1}",
                "source": v1["source"],
                "target": v2["target"],
                "type": conclusion_type,
                "axis": v1.get("axis", v2.get("axis")),  # The axis can come from the Action or the Causality
                "justification": f"Derived by action_causality_chain from {v1.get('id', '?')} and {v2.get('id', '?')}",
                "derived": True
            }
            return conclusion, True  # The step is considered valid once the rule applies
        return None, False
    # --- Logic for the Action -> IsA rule ---
    def _apply_action_isa_generalization(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]:
        """Logic for action_isa_generalization."""
        if len(premises) != 2:
            return None, False
        v_action, v_isa = premises  # Expect Action first, then IsA
        # Check the types and the link: A -> B_inst (Action), B_inst IsA B_type
        if v_action.get("type") == "Action" and \
           v_isa.get("type") == "IsA" and \
           v_action.get("target") == v_isa.get("source"):  # Target(Action) == Source(IsA)
            conclusion_type = self.inference_rules["action_isa_generalization"]["conclusion_type"]
            source_a = v_action.get("source")
            target_b_type = v_isa.get("target")  # Take the type from the IsA vector
            conclusion = {
                "id": f"S{len(self.proof_cache) + 1}",
                "source": source_a,
                "target": target_b_type,
                "type": conclusion_type,  # The type stays Action
                "axis": v_action.get("axis"),  # Take the axis from the Action
                "justification": f"Derived by action_isa_generalization from {v_action.get('id', '?')} and {v_isa.get('id', '?')}",
                "derived": True
            }
            return conclusion, True
        return None, False
    def apply_inference_rule(self, rule_name: str, premises: List[Dict]) -> Tuple[Optional[Dict], bool]:
        """Applies an inference rule, returning (conclusion, step validity)."""
        rule_functions = {
            "chain_rule": self._apply_chain_rule,
            "causality_transfer": self._apply_causality_transfer,
            "implication_causality_chain": self._apply_implication_causality_chain,
            "part_of_transitivity": self._apply_part_of_transitivity,
            "action_causality_chain": self._apply_action_causality_chain,        # new rule
            "action_isa_generalization": self._apply_action_isa_generalization   # another new rule
        }
        conclusion, is_step_valid = None, False
        if rule_name in rule_functions:
            # Assume that premises already contains only valid vectors
            conclusion, is_step_valid = rule_functions[rule_name](premises)
        if conclusion:
            # Cache only the structure of a successful derivation
            self.proof_cache[conclusion["id"]] = conclusion
        return conclusion, is_step_valid
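
    # Illustrative usage (not executed): applying chain_rule to two hypothetical
    # Implication premises A -> B and B -> C yields a derived A -> C vector.
    #
    #     ps = ProofSystem(db_conn=None)  # db_conn is not used by apply_inference_rule itself
    #     p1 = {"id": "V1", "source": "A", "target": "B", "type": "Implication", "axis": "x"}
    #     p2 = {"id": "V2", "source": "B", "target": "C", "type": "Implication", "axis": "x"}
    #     conclusion, ok = ps.apply_inference_rule("chain_rule", [p1, p2])
    #     # conclusion -> {"id": "S1", "source": "A", "target": "C", "type": "Implication",
    #     #                "axis": "x", "justification": "Derived by chain_rule from V1 and V2",
    #     #                "derived": True}; ok -> True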
    def construct_proof(self, vectors_for_proof: List[Dict], source_concept: str, target_concept: str) -> Dict:
        """Builds a proof from source_concept to target_concept.

        Uses the graph built from the provided vectors (input + temporary IsA)
        and dynamically pulls in vectors from the DB.
        """
        valid_vectors_input = vectors_for_proof  # Renamed for consistency with the code below
        if not valid_vectors_input and not self.db_conn.get_concept_by_name(source_concept):
            return {"status": "Failed", "reason": "No input vectors and source concept not found in DB", "is_valid": False}
        # --- Build the graph and vector_map from ALL provided vectors ---
        input_graph = self._build_proof_graph(valid_vectors_input)
        vector_map = {v["id"]: v for v in valid_vectors_input}
        # ----------------------------------------------------------------
        # Find a path: first over the input vectors only, then with the DB
        path_info = self.find_proof_path(input_graph, source_concept, target_concept)
        if path_info.get("status") != "Path found":
            return {"status": "Failed", "reason": f"Path not found: {path_info.get('status', 'Unknown')}", "is_valid": False}
        path = path_info["path"]
        db_vectors_used = path_info.get("db_vectors_used", [])
        # Add the DB vectors to vector_map
        for db_vec in db_vectors_used:
            if db_vec["id"] not in vector_map:
                vector_map[db_vec["id"]] = db_vec
        # --- Direct path check ---
        if len(path) == 1:
            direct_vector_id = path[0][2]
            direct_vector = vector_map.get(direct_vector_id)
            if direct_vector:
                return {
                    "status": "Success",
                    "source": source_concept,
                    "target": target_concept,
                    "steps": [],               # No steps for a direct proof
                    "rule": "direct",          # Mark this as a direct path
                    "direct_vector_id": direct_vector_id,
                    "is_valid": True,          # A direct path is considered valid
                    "final_conclusion_type": direct_vector.get("type"),
                    "metadata": {}             # No cycle metadata here for now
                }
            else:
                return {"status": "Failed", "reason": f"Direct vector {direct_vector_id} not found", "is_valid": False}
        # --- Step-by-step proof construction ---
        steps = []
        current_premise = None   # Will hold a VECTOR (dict)
        overall_validity = True  # Validity of the whole proof
        cycle_warning = None
        visited_nodes_in_proof = {source_concept}  # For cycle detection DURING construction
        for i, (seg_source, seg_target, vector_id, origin) in enumerate(path):
            # Guard against a vector missing from the map (just in case)
            premise2 = vector_map.get(vector_id)
            if not premise2:
                overall_validity = False
                return {"status": "Failed", "reason": f"Vector {vector_id} not found during step construction", "is_valid": False}
            premise2_source = origin  # 'input' or 'db'
            if current_premise is None:
                current_premise = premise2
                source1 = premise2_source  # The origin of the first premise is this vector itself
            else:
                premises = [current_premise, premise2]
                premise_ids = [p.get("id", "?") for p in premises]
                source1 = "derived" if current_premise.get("derived") else current_premise.get("origin", "input")  # Where does the first premise come from?
                conclusion = None
                is_step_valid = False
                rule_name = None
                # Apply the first rule that matches
                for key in self.inference_rules.keys():
                    temp_conclusion, temp_valid = self.apply_inference_rule(key, premises)
                    if temp_conclusion:
                        conclusion = temp_conclusion
                        is_step_valid = temp_valid
                        rule_name = key
                        break  # Found an applicable rule
                if conclusion:
                    conclusion["origin"] = "derived"  # Mark the conclusion as derived
                    step_detail = {
                        "id": conclusion["id"],
                        "rule": rule_name,
                        "premises": premise_ids,
                        "conclusion": conclusion,
                        "is_valid": is_step_valid,
                        "premise1_source": source1,
                        "premise2_source": premise2_source,  # 'input' or 'db'
                    }
                    steps.append(step_detail)
                    current_premise = conclusion  # The result of this step becomes the first premise of the next one
                    if not is_step_valid:
                        overall_validity = False
                        # Should construction stop when one step is invalid, or continue? For now it continues.
                    # Cycle check during construction
                    target_node = conclusion.get("target")
                    if target_node in visited_nodes_in_proof:
                        cycle_warning = f"Cycle detected during proof construction: revisiting node '{target_node}'"
                    else:
                        visited_nodes_in_proof.add(target_node)
                else:
                    # No rule could be applied, so the proof is invalid
                    overall_validity = False
                    return {"status": "Failed", "reason": f"No inference rule applicable for premises {premise_ids}", "is_valid": False}
        # --- Assemble the final result ---
        final_conclusion = current_premise  # The last conclusion is the result
        # Check that the final conclusion matches the query
        if not final_conclusion or \
           final_conclusion.get("source") != source_concept or \
           final_conclusion.get("target") != target_concept:
            overall_validity = False
            # The status could arguably remain Success with is_valid = False;
            # here the status is set to Failed when the conclusion does not match.
            return {
                "status": "Failed",
                "reason": f"Final conclusion mismatch: expected {source_concept}->{target_concept}, got {final_conclusion.get('source') if final_conclusion else 'N/A'}->{final_conclusion.get('target') if final_conclusion else 'N/A'}",
                "is_valid": False,
                "source": source_concept,
                "target": target_concept,
                "steps": steps,
                "metadata": {"cycle_warning": cycle_warning} if cycle_warning else {}
            }
        final_result = {
            "status": "Success",  # Reaching this point means the proof structure was built
            "source": source_concept,
            "target": target_concept,
            "steps": steps,
            "is_valid": overall_validity,  # Validity depends on the validity of every step
            "final_conclusion_type": final_conclusion.get("type"),
            "metadata": {"cycle_warning": cycle_warning} if cycle_warning else {}
        }
        return final_result
    def _build_proof_graph(self, vectors):
        """Helper that builds a graph from vectors"""
        graph = {"nodes": set(), "edges": [], "adjacency": {}}
        for v in vectors:
            source, target, v_id = v["source"], v["target"], v["id"]
            graph["nodes"].add(source)
            graph["nodes"].add(target)
            graph["edges"].append((source, target, v_id))
            # Update the adjacency list
            if source not in graph["adjacency"]:
                graph["adjacency"][source] = {"out": [], "in": []}
            if target not in graph["adjacency"]:
                graph["adjacency"][target] = {"out": [], "in": []}
            graph["adjacency"][source]["out"].append((target, v_id))
            graph["adjacency"][target]["in"].append((source, v_id))
        return graph
if __name__ == "__main__":
    print(f"SFOSR Integrated System v{SFOSR_CONFIG['version']}")
    print("Ready to analyze semantic structures.")
    # Example input data (uses concepts from the DB for demonstration)
    example = {
        "text": "Emergence leads to multi-level space, which causes cross-level causation, finally leading to regulatory flow.",
        "vectors": [
            {
                "id": "V_EC_MLS",
                "source": "emergent_complexity",
                "target": "multi_level_space",
                "type": "Implication",
                "axis": "structure <-> hierarchy",
                "justification": "Emergence creates levels"
            },
            {
                "id": "V_MLS_CLC",
                "source": "multi_level_space",
                "target": "cross_level_causation",
                "type": "Causality",  # Causality type
                "axis": "level <-> interaction",
                "justification": "Levels imply cross-level effects"
            },
            {
                "id": "V_CLC_RF",
                "source": "cross_level_causation",
                "target": "regulatory_flow",
                "type": "Causality",  # Causality type
                "axis": "cause <-> regulation",
                "justification": "Cross-level effects drive regulation"
            }
        ],
        "proof_query": {
            "source": "multi_level_space",
            "target": "regulatory_flow"
        }
    }
    # Create and exercise the integrated system
    system = SFOSRSystem()
    result = system.process(example)
    # Print the results (updated for binary validity)
    print("\n--- Processing Results ---")
    print(f"Status: {result['status']}")
    if result['status'] == 'Success':
        print(f"Compilable: {result['analysis']['is_compilable']}")
        print("\n--- Verification ---")
        # Note: process() exposes this count under the "total_vectors" key
        print(f"Total vectors processed: {result['verification']['total_vectors']}")
        print(f"Valid vectors: {result['verification']['valid_count']}")
        print(f"Compliance rate: {result['verification']['compliance_rate'] * 100:.1f}%")
        print("Per-vector data:")
        for v_id, data in result['verification']['vectors_data'].items():
            valid_str = "Valid" if data['is_valid'] else "Invalid"
            issues_str = ', '.join(data['issues']) if data['issues'] else 'None'
            print(f"  - {v_id}: Status={valid_str}, Issues: {issues_str}")
        if "proof" in result:
            print("\n--- Proof ---")
            proof = result['proof']
            print(f"Status: {proof['status']}")
            valid_proof_str = "Valid" if proof.get('is_valid', False) else "Invalid"
            print(f"Overall validity: {valid_proof_str}")
            if proof['status'] == 'Success':
                print(f"Proof: {proof['source']} → {proof['target']}")
                print(f"Final conclusion type: {proof.get('final_conclusion_type', 'N/A')}")
                print("Proof steps:")
                if proof['steps']:
                    for step in proof['steps']:
                        step_valid_str = "Valid" if step['is_valid'] else "Invalid"
                        print(f"  - {step['id']}: Rule={step['rule']}, Premises=({', '.join(step['premises'])}) | Status={step_valid_str}")
                        if step['conclusion']:
                            print(f"    Conclusion: {step['conclusion']['source']} → {step['conclusion']['target']} ({step['conclusion']['type']})")
                        else:
                            print(f"    Conclusion: None (Error: {step.get('reason', '')})")
                else:
                    print("  (No steps)")
            elif 'reason' in proof:
                print(f"Failure reason: {proof['reason']}")
    elif 'details' in result and 'validation_issues' in result['details']:
        print("\n--- Analysis Errors ---")
        for issue in result['details']['validation_issues']:
            print(f"  - {issue}")