SFOSR / sfosr_core /sfosr_system.py
DanielSwift's picture
Initial SFOSR system with Gradio interface
2249e80
raw
history blame
77.2 kB
"""
SFOSR Integrated System (with Binary Validity)
Этот модуль объединяет основные компоненты системы SFOSR:
- Анализатор структуры (`SFOSRAnalyzer`)
- Верификатор контрактов (`ContractVerifier`)
- Систему построения доказательств (`ProofSystem`)
для обеспечения комплексной обработки и формальной оценки
смысловых структур на основе бинарной валидности.
"""
import json
import subprocess
import os
from typing import Dict, List, Any, Tuple, Optional, Set, Union
from .sfosr_database import SFOSRDatabase # Use relative import within the package
# Конфигурация системы
SFOSR_CONFIG = {
"version": "0.4.0",
"description": "Integrated SFOSR System",
"components": ["analyzer", "verifier", "proof_system"],
"debug_mode": False,
"auto_update_plausibility": True # Автоматическое обновление plausibility
}
# Общие типы векторов
VECTOR_TYPES = {
"Causality": {"weight": 2.0, "requires_justification": True, "description": "Причинно-следственная связь"},
"Implication": {"weight": 1.8, "requires_justification": True, "description": "Логическое следование (если-то)"},
"Transformation": {"weight": 1.5, "requires_justification": False, "description": "Превращение из одного состояния в другое"},
"Goal": {"weight": 1.3, "requires_justification": False, "description": "Целеполагание, намерение"},
"Prevention": {"weight": 1.3, "requires_justification": False, "description": "Предотвращение нежелательного исхода"},
"Contrast": {"weight": 1.2, "requires_justification": False, "description": "Противопоставление"},
"Comparison": {"weight": 1.0, "requires_justification": False, "description": "Сравнение элементов"},
"Inclusion": {"weight": 0.8, "requires_justification": False, "description": "Отношение часть-целое"},
"Attribution": {"weight": 0.7, "requires_justification": False, "description": "Приписывание свойства объекту"},
"Temporal": {"weight": 1.1, "requires_justification": False, "description": "Временная последовательность"},
"Qualification": {"weight": 0.6, "requires_justification": False, "description": "Ограничение или уточнение"},
"Definition": {"weight": 1.4, "requires_justification": False, "description": "Определение понятия"},
"PartOf": {"constraints": [], "requires_justification": False},
"Mechanism": {"constraints": [], "requires_justification": True},
"Example": {"constraints": [], "requires_justification": False},
"Requirement": {"constraints": [], "requires_justification": True},
"Action": {"constraints": [], "requires_justification": False},
"Capability": {"constraints": [], "requires_justification": False},
"PropertyOf": {"constraints": [], "requires_justification": False},
"Purpose": {"constraints": [], "requires_justification": False},
"Governs": {"constraints": [], "requires_justification": False},
"Contains": {"constraints": [], "requires_justification": False},
"Represents": {"constraints": [], "requires_justification": False},
"Context": {"constraints": [], "requires_justification": False},
"IsA": {"constraints": [], "requires_justification": False},
# "ActsOn": {"constraints": [], "requires_justification": False}, # Removed as it's not used now
"Dependency": {"constraints": [], "requires_justification": True}
}
# Интерфейс для интеграции компонентов
class SFOSRSystem:
"""
Основной класс интегрированной системы SFOSR (на бинарной валидности)
Объединяет:
- Анализ структуры векторов
- Проверку контрактов и определение валидности
- Построение валидных доказательств
"""
def __init__(self, db_path="sfosr.db", debug=False):
self.db_path = db_path
self.db = SFOSRDatabase(db_path) # Database connection
self._analyzer = SFOSRAnalyzer()
# Prepare data for ContractVerifier
all_concepts = self.db.get_all_concepts()
known_concepts_names = {c['name'] for c in all_concepts}
concepts_data_map = {c['name']: c for c in all_concepts}
self._verifier = ContractVerifier(known_concepts=known_concepts_names, concepts_data=concepts_data_map)
self._proof_system = ProofSystem(db_conn=self.db) # Pass db connection
self.concept_graph = None
self.debug = debug
# Load inference rules from DB
try:
db_rules = self.db.get_inference_rules()
if db_rules:
self._proof_system.load_rules(db_rules)
except Exception as e:
if self.debug:
print(f"Error loading inference rules from database: {str(e)}")
# Графы
self.concept_graph = {}
def process(self, input_data):
"""
Основной метод обработки входных данных (с бинарной валидностью)
Последовательно выполняет:
1. Анализ структуры
2. Проверку контрактов (определение is_valid)
3. Построение доказательств (если применимо, определение is_valid)
(Использует только валидированные векторы из input_data, без обогащения из БД)
Args:
input_data: Словарь с текстом и векторами SFOSR
Returns:
Dict: Результаты обработки (с полями is_valid)
"""
# Шаг 1: Анализ структуры
analysis_result = self._analyzer.analyze(input_data)
self.concept_graph = analysis_result["concept_graph"]
# Если анализ не прошел
if analysis_result["analysis_status"] != "Completed":
return {
"status": "Error",
"message": f"Analysis failed: {analysis_result['analysis_status']}",
"details": {
"validation_issues": analysis_result["validation_issues"]
}
}
# Получаем инстансы из контекста, если они есть
instance_definitions = input_data.get("instance_definitions", {})
# Шаг 2: Проверка контрактов
vectors_to_verify = analysis_result["vectors_analyzed"]
# Передаем инстансы в верификатор
verification_result = self._verifier.verify_all(vectors_to_verify, instance_definitions)
# Собираем только валидные векторы для системы доказательств
valid_input_vectors = []
vectors_data = verification_result["vectors_data"]
for vector in vectors_to_verify:
v_id = vector.get("id")
# Используем get для безопасного доступа и проверяем наличие ключа 'vector'
vector_dict = vectors_data.get(v_id, {}).get('vector')
if vector_dict and vectors_data[v_id].get("is_valid", False):
valid_input_vectors.append(vector_dict) # Добавляем исходный вектор
# --- Генерация временных IsA векторов ---
temporary_isa_vectors = []
for instance_id, definition in instance_definitions.items():
general_type = definition.get('is_a')
instance_label = definition.get('label', instance_id) # Use label or ID
if general_type:
# Проверяем, существует ли общий тип в БД
if self.db.get_concept_by_name(general_type):
temporary_isa_vectors.append({
"id": f"isa_{instance_id}", # Уникальный временный ID
"source": instance_id, # Используем временный ID
"target": general_type, # Ссылка на общий тип в БД
"type": "IsA",
"axis": "classification",
"justification": f"Instance '{instance_label}' defined as type '{general_type}' in input context.",
"is_valid": True # Считаем эти связи априори валидными для доказательства
})
else:
print(f"Warning: General type '{general_type}' for instance '{instance_id}' not found in DB. Skipping IsA vector generation.")
# ----------------------------------------
# Базовый результат обработки
result = {
"status": "Success",
"input_text": input_data.get("text", ""),
"analysis": {
"status": analysis_result["analysis_status"],
"is_compilable": analysis_result["is_compilable"],
"graph_metrics": analysis_result["graph_metrics"]
},
"verification": {
"total_vectors": verification_result["total_vectors_processed"],
"valid_count": verification_result["valid_count"],
"compliance_rate": verification_result["compliance_rate"],
"vectors_data": verification_result["vectors_data"]
}
}
# Шаг 3: Построение доказательств
vectors_for_proof = valid_input_vectors + temporary_isa_vectors
# Запускаем, если есть запрос И есть ХОТЬ КАКИЕ-ТО векторы (входные или IsA)
if "proof_query" in input_data and vectors_for_proof:
query = input_data["proof_query"]
source = query.get("source")
target = query.get("target")
if source and target:
proof_result = self._proof_system.construct_proof(
vectors_for_proof, source, target
)
result["proof"] = proof_result
else: # Нет source/target
result["proof"] = {"status": "Failed", "reason": "Missing source or target in proof query", "is_valid": False}
# else: # Нет proof_query или нет векторов - proof не создается
# pass
return result
def analyze(self, input_data):
"""Удобный метод для выполнения только анализа"""
return self._analyzer.analyze(input_data)
def verify(self, input_data):
"""Удобный метод для выполнения только верификации"""
vectors = input_data.get("vectors", [])
# Сначала базовый анализ для получения структурно валидных векторов
analysis_result = self._analyzer.analyze(input_data)
# Передаем пустой словарь instance_definitions, т.к. verify не работает с контекстом
return self._verifier.verify_all(analysis_result["vectors_analyzed"], instance_definitions={})
def prove(self, input_data, source, target):
"""Удобный метод для построения доказательства.
Анализирует, верифицирует и строит доказательство, используя только
валидированные векторы из input_data (без обогащения из БД).
"""
# Сначала анализ
analysis_res = self.analyze(input_data)
if analysis_res["analysis_status"] != "Completed":
return {"status": "Failed", "reason": "Analysis failed"}
# Получаем список векторов, прошедших анализ
vectors_analyzed = analysis_res.get("vectors_analyzed", [])
if not vectors_analyzed:
return {"status": "Failed", "reason": "No vectors passed analysis"}
# Затем верификация этих векторов
# Передаем пустой instance_definitions, т.к. prove работает с готовым input_data
# Хотя, возможно, стоило бы передавать реальный instance_definitions из input_data?
# Пока оставим пустым для совместимости.
verification_res = self._verifier.verify_all(vectors_analyzed, instance_definitions={})
vectors_data = verification_res.get("vectors_data", {})
# Извлекаем валидные векторы, ИТЕРРИРУЯ ПО ИСХОДНОМУ СПИСКУ
valid_vectors = [
vector # Берем исходный вектор
for vector in vectors_analyzed # Итерируем по результатам анализа
if vectors_data.get(vector.get("id", ""), {}).get("is_valid", False) # Проверяем валидность в результатах верификации
]
if not valid_vectors:
return {"status": "Failed", "reason": "No valid vectors after verification"}
# Enrichment is disabled, use valid_vectors directly
vectors_for_proof = valid_vectors
return self._proof_system.construct_proof(vectors_for_proof, source, target)
def get_concept_info(self, concept_name):
"""
Получение информации о концепте из БД
Args:
concept_name: Имя концепта
Returns:
Dict: Информация о концепте или None
"""
return self.db.get_complete_concept_info(concept_name)
def find_related_concepts(self, concept_name, depth=1):
"""
Поиск связанных концептов
Args:
concept_name: Имя концепта
depth: Глубина поиска
Returns:
List: Список связанных концептов
"""
concept = self.db.get_concept_by_name(concept_name)
if not concept:
return []
return self.db.get_related_concepts(concept["id"], depth)
def add_concept_to_db(self, name, description, domain, level):
"""
Добавление нового концепта в БД
Args:
name: Имя концепта
description: Описание
domain: Домен (область знаний)
level: Уровень абстракции
Returns:
int: ID добавленного концепта
"""
return self.db.add_concept(name, description, domain, level)
def add_vector_to_db(self, source_name, target_name, vector_type, axis, justification=None):
"""
Добавление нового вектора в БД
Args:
source_name: Имя исходного концепта
target_name: Имя целевого концепта
vector_type: Тип вектора
axis: Ось
justification: Обоснование
Returns:
int: ID добавленного вектора или None в случае ошибки
"""
source = self.db.get_concept_by_name(source_name)
target = self.db.get_concept_by_name(target_name)
if not source or not target:
return None
return self.db.add_vector(source["id"], target["id"], vector_type, axis, justification)
# Реализация компонентов системы
class SFOSRAnalyzer:
"""
Анализатор структуры векторов SFOSR
Отвечает за:
- Проверку синтаксиса и базовой структуры векторов
- Проверку компилируемости (наличие необходимых полей)
- Построение графа концептов
"""
def __init__(self, vector_types=None):
"""Инициализация анализатора"""
self.vector_types = vector_types or VECTOR_TYPES
def build_concept_graph(self, vectors):
"""
Строит граф концептов и связей между ними
Args:
vectors: Список векторов SFOSR
Returns:
Dict: Структура графа с узлами и связями
"""
# Структура для хранения графа
graph = {
"nodes": set(), # уникальные концепты
"edges": [], # связи (кортежи source, target, vector_id)
"adjacency": {}, # словарь смежности для быстрого доступа
}
# Собираем все уникальные концепты и ребра
all_nodes = set()
for vector in vectors:
source = vector.get("source")
target = vector.get("target")
vector_id = vector.get("id")
if source:
all_nodes.add(source)
if source not in graph["adjacency"]:
graph["adjacency"][source] = {"out": [], "in": []}
if target:
all_nodes.add(target)
if target not in graph["adjacency"]:
graph["adjacency"][target] = {"out": [], "in": []}
if source and target and vector_id:
edge = (source, target, vector_id)
graph["edges"].append(edge)
graph["adjacency"][source]["out"].append((target, vector_id))
graph["adjacency"][target]["in"].append((source, vector_id))
graph["nodes"] = all_nodes
return graph
def validate_vector_structure(self, vector):
"""
Проверяет структуру вектора на соответствие базовым требованиям
Args:
vector: Словарь с данными вектора
Returns:
Tuple[bool, Optional[str]]: (валидность, сообщение об ошибке)
"""
required_keys = ["id", "source", "target", "type", "axis"]
missing_keys = [key for key in required_keys if key not in vector or not vector[key]]
if missing_keys:
return False, f"Vector {vector.get('id', 'Unknown')} missing keys: {', '.join(missing_keys)}"
# Проверяем, существует ли указанный тип вектора
vector_type = vector.get("type")
if vector_type not in self.vector_types:
return False, f"Vector {vector.get('id', 'Unknown')} has invalid type: {vector_type}"
return True, None
def validate_compilability(self, vector):
"""
Проверяет на компилируемость (достаточность данных)
Args:
vector: Словарь с данными вектора
Returns:
Tuple[bool, Optional[str]]: (компилируемость, сообщение об ошибке)
"""
vector_type = vector.get("type")
# Проверяем требования обоснования в зависимости от типа
if (vector_type in self.vector_types and
self.vector_types[vector_type]["requires_justification"]):
if not vector.get("justification"):
return False, f"Vector {vector.get('id', 'Unknown')} requires justification for type {vector_type}."
return True, None
def analyze(self, input_data):
"""
Главная функция анализа структуры SFOSR (упрощенная)
Args:
input_data: Словарь с текстом и векторами
Returns:
Dict: Результаты анализа (валидация и граф)
"""
input_text = input_data.get("text", "N/A")
vectors = input_data.get("vectors", [])
valid_vectors = []
validation_issues = []
analysis_status = "Completed"
# 1. Валидация структуры и компилируемости каждого вектора
for vector in vectors:
is_struct_valid, struct_error = self.validate_vector_structure(vector)
if not is_struct_valid:
validation_issues.append(struct_error)
analysis_status = "Validation Error"
continue # Невалидную структуру дальше не проверяем
is_comp_valid, comp_error = self.validate_compilability(vector)
if not is_comp_valid:
validation_issues.append(comp_error)
# Продолжаем анализ, но помечаем проблему
# Собираем только структурно валидные векторы
valid_vectors.append(vector)
# Определяем компилируемость по наличию проблем
is_compilable = len(validation_issues) == 0
if not is_compilable and analysis_status == "Completed":
analysis_status = "Compilability Error" # Если были только проблемы компилируемости
# 2. Построение графа концептов (только из структурно валидных векторов)
graph = self.build_concept_graph(valid_vectors)
# 3. Формирование упрощенного результата
result = {
"input_text": input_text,
"analysis_status": analysis_status,
"is_compilable": is_compilable,
"validation_issues": validation_issues,
"graph_metrics": { # Упрощенные метрики графа
"concepts_count": len(graph["nodes"]),
"connections_count": len(graph["edges"]),
},
"vectors_analyzed": valid_vectors, # Содержит только структурно валидные
"concept_graph": graph
}
return result
class ContractVerifier:
"""
Верификатор контрактов векторов SFOSR
Проверяет соответствие векторов формальным контрактам,
определяет бинарную валидность (`is_valid`) вектора
и собирает метаданные.
"""
def __init__(self, contract_types=None, known_concepts: Optional[Set[str]] = None, concepts_data: Optional[Dict[str, Dict]] = None):
"""Инициализация верификатора с известными концептами и их данными (уровнями)."""
self.contract_types = contract_types or set(VECTOR_TYPES.keys())
self.known_concepts = known_concepts or set()
# --- Сохраняем данные об уровнях ---
self.concepts_data = concepts_data or {}
# ----------------------------------
self.axis_registry = set()
def verify_vector_contract(self, vector: Dict[str, Any], instance_definitions: Dict[str, Dict]) -> Tuple[bool, List[str], Dict[str, Any]]:
"""Проверяет отдельный вектор на соответствие контрактам"""
issues = []
metadata = {}
is_valid = True # Начинаем с предположения о валидности
# --- Проверка существования концептов и их типов ---
source_name = vector.get("source")
target_name = vector.get("target")
vector_type = vector.get("type")
vector_id = vector.get("id", "Unknown")
# --- Получаем реальные ТИПЫ концептов для проверки ---
source_type_name = source_name
target_type_name = target_name
is_source_instance = False
is_target_instance = False
if source_name in instance_definitions:
source_type_name = instance_definitions[source_name].get("is_a")
is_source_instance = True
if not source_type_name:
issues.append(f"Instance '{source_name}' in vector {vector_id} has no 'is_a' type defined in context.")
is_valid = False
source_type_name = None # Не можем проверить дальше
if target_name in instance_definitions:
target_type_name = instance_definitions[target_name].get("is_a")
is_target_instance = True
if not target_type_name:
issues.append(f"Instance '{target_name}' in vector {vector_id} has no 'is_a' type defined in context.")
is_valid = False
target_type_name = None # Не можем проверить дальше
# ----------------------------------------------------
source_concept_data = None
target_concept_data = None
if source_type_name and is_valid:
source_concept_data = self.concepts_data.get(source_type_name)
if not source_concept_data:
issues.append(f"Source concept/type '{source_type_name}' (for '{source_name}') not found in known concepts for vector {vector_id}.")
is_valid = False
if target_type_name and is_valid:
target_concept_data = self.concepts_data.get(target_type_name)
if not target_concept_data:
issues.append(f"Target concept/type '{target_type_name}' (for '{target_name}') not found in known concepts for vector {vector_id}.")
is_valid = False
# --- Проверка контрактов типа Transformation ---
if is_valid and vector_type == "Transformation":
if source_name == target_name:
issues.append(f"Transformation vector {vector_id} cannot have the same source and target ('{source_name}').")
is_valid = False
# --- Проверка контракта для Causality (разные уровни) ---
if is_valid and vector_type == "Causality" and "level" in vector.get("axis", ""):
if source_concept_data and target_concept_data:
source_level = source_concept_data.get('level')
target_level = target_concept_data.get('level')
if source_level and target_level and source_level == target_level:
issues.append(f"Causality vector {vector_id} ('{source_type_name}' -> '{target_type_name}') links concepts on the same level '{source_level}' with axis containing 'level'.")
is_valid = False
# --- Добавить другие специфичные для типов векторов проверки ---
# Например, для ActsOn: source должен быть подтипом Action, target - подтипом Object?
# Это потребует иерархии в БД или более сложной логики.
# Регистрация осей остается
if vector.get("axis") and vector["axis"] not in self.axis_registry:
self.axis_registry.add(vector["axis"])
# Добавляем сами данные вектора в метаданные для использования в `prove`
# metadata['vector'] = vector # Убрали - теперь prove получает исходный список
return is_valid, issues, metadata
def verify_all(self, vectors: List[Dict[str, Any]], instance_definitions: Dict[str, Dict]) -> Dict[str, Any]:
"""Проверка всех векторов, агрегация валидности и метаданных"""
vectors_data = {}
valid_count = 0
processed_count = 0
for vector in vectors:
processed_count += 1
vector_id = vector.get("id", f"unknown_{processed_count}")
# Передаем instance_definitions в проверку контракта
is_valid, issues, metadata = self.verify_vector_contract(vector, instance_definitions)
vectors_data[vector_id] = {
"vector": vector,
"is_valid": is_valid,
"issues": issues,
"metadata": metadata
}
if is_valid:
valid_count += 1
# Формируем отчет
report = {
"total_vectors_processed": processed_count,
"valid_count": valid_count,
"compliance_rate": round(valid_count / processed_count, 3) if processed_count > 0 else 0.0,
"vectors_data": vectors_data # Основные данные теперь здесь
}
return report
class ProofSystem:
"""
Система построения доказательств SFOSR
Отвечает за:
- Построение доказательств на основе ВАЛИДНЫХ векторов (и данных из БД)
- Проверку итоговой валидности (`is_valid`) доказательств
- Поиск путей доказательства между концептами (с использованием БД)
"""
def __init__(self, db_conn):
"""Инициализация системы доказательств.
Args:
db_conn: Экземпляр SFOSRDatabase для доступа к БД.
"""
self.db_conn = db_conn # Store the database connection
# Базовые правила вывода (с бинарной валидностью)
self.inference_rules = {
"chain_rule": {
"pattern": "A → B, B → C ⊢ A → C",
"premise_types": ["Implication", "Implication"],
"conclusion_type": "Implication",
"domain": "logical_inference"
},
"causality_transfer": {
"pattern": "A → B (Causality), B → C (Causality) ⊢ A → C (Causality)",
"premise_types": ["Causality", "Causality"],
"conclusion_type": "Causality",
"domain": "causal_inference"
},
"implication_causality_chain": {
"pattern": "A → B (Implication), B → C (Causality) ⊢ A → C (Causality)",
"premise_types": ["Implication", "Causality"],
"conclusion_type": "Causality",
"domain": "mixed_inference"
},
# --- New Rule ---
"part_of_transitivity": {
"pattern": "A PartOf B, B PartOf C ⊢ A PartOf C",
"premise_types": ["PartOf", "PartOf"],
"conclusion_type": "PartOf",
"domain": "mereology"
},
# --- НОВОЕ ПРАВИЛО ---
"action_causality_chain": {
"pattern": "A -> B (Action), B -> C (Causality) |- A -> C (Causality)",
"premise_types": ["Action", "Causality"],
"conclusion_type": "Causality",
"domain": "action_inference"
},
# --- ЕЩЕ ОДНО НОВОЕ ПРАВИЛО ---
"action_isa_generalization": {
"pattern": "A -> B_inst (Action), B_inst IsA B_type |- A -> B_type (Action)",
"premise_types": ["Action", "IsA"],
"conclusion_type": "Action", # Результат - обобщенное действие
"domain": "inheritance_inference"
}
}
# Кэш для хранения построенных доказательств (только структура вывода)
self.proof_cache = {}
def load_rules(self, db_rules):
"""
Загрузка правил вывода из БД (игнорируя любые старые данные plausibility)
Args:
db_rules: Словарь с правилами вывода из БД
"""
for name, rule_data in db_rules.items():
rule_data.pop('plausibility', None) # Убеждаемся, что plausibility удалено
self.inference_rules[name] = rule_data
# --- Helper for Input-Only BFS ---
def _find_path_using_input_graph(self, input_graph, source_concept, target_concept) -> Dict[str, Any]:
"""BFS using only the input graph."""
# print(f"DEBUG _find_path_using_input_graph: Start {source_concept} -> {target_concept}") # UNCOMMENTED
if source_concept not in input_graph["nodes"]:
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof: Source '{source_concept}' not in input graph nodes: {input_graph['nodes']}")
# --- END DEBUG PRINT ---
return {"status": "Source node not found"}
visited = {source_concept}
queue: List[Tuple[str, List[Tuple[str, str, str, str]]]] = [(source_concept, [])]
while queue:
current_concept, path = queue.pop(0)
# print(f"DEBUG _find_path_using_input_graph: Dequeue '{current_concept}'") # UNCOMMENTED
if current_concept in input_graph["adjacency"]:
for next_concept_input, vector_id_input in input_graph["adjacency"][current_concept].get("out", []):
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof (Input BFS): Edge {current_concept} -> {next_concept_input} via {vector_id_input}") # UNCOMMENTED
# --- END DEBUG PRINT ---
if next_concept_input == target_concept:
final_path = path + [(current_concept, next_concept_input, vector_id_input, 'input')]
# print(f"DEBUG _find_path_using_input_graph: Target reached. Path: {final_path}") # UNCOMMENTED
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof (Input BFS): Target '{target_concept}' reached. Path: {final_path}")
# --- END DEBUG PRINT ---
return {"status": "Path found", "path": final_path, "db_vectors_used": []}
if next_concept_input not in visited:
visited.add(next_concept_input)
new_path = path + [(current_concept, next_concept_input, vector_id_input, 'input')]
queue.append((next_concept_input, new_path))
# print(f"DEBUG _find_path_using_input_graph: Enqueue '{next_concept_input}'") # UNCOMMENTED
# print(f"DEBUG _find_path_using_input_graph: Path not found.") # UNCOMMENTED
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof (Input BFS): Path not found from '{source_concept}' to '{target_concept}'")
# --- END DEBUG PRINT ---
return {"status": "Path not found (input only)"}
# --- Helper for Combined BFS ---
def _find_path_using_combined_graph(self, input_graph, source_concept, target_concept) -> Dict[str, Any]:
"""BFS using input graph AND database lookups."""
# print(f"\\nDEBUG _find_path_using_combined_graph: Start {source_concept} -> {target_concept}")
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof (Combined BFS): Start {source_concept} -> {target_concept}")
# --- END DEBUG PRINT ---
if source_concept not in input_graph["nodes"]:
if not self.db_conn.get_concept_by_name(source_concept):
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof (Combined BFS): Source '{source_concept}' not in input graph or DB.")
# --- END DEBUG PRINT ---
return {"status": "Source node not found"}
# visited теперь словарь: {concept_name: origin ('input' или 'db')}
visited: Dict[str, str] = {source_concept: 'start'}
queue: List[Tuple[str, List[Tuple[str, str, str, str]], Set[int]]] = [(source_concept, [], set())]
used_db_vector_ids = set()
db_vector_cache = {}
while queue:
current_concept, path, current_used_db_ids = queue.pop(0)
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof (Combined BFS): Dequeue '{current_concept}'")
# --- END DEBUG PRINT ---
# --- Шаг 1: Входной граф ---
if current_concept in input_graph["adjacency"]:
for next_concept_input, vector_id_input in input_graph["adjacency"][current_concept].get("out", []):
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof (Combined BFS): Input Edge {current_concept} -> {next_concept_input} via {vector_id_input}")
# --- END DEBUG PRINT ---
if next_concept_input == target_concept:
final_path = path + [(current_concept, next_concept_input, vector_id_input, 'input')]
final_db_vectors_list = [ self.db_conn.convert_db_vector_to_system_format(db_vector_cache[vid]) for vid in current_used_db_ids if vid in db_vector_cache ]
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof (Combined BFS): Target '{target_concept}' reached via input edge. Path: {final_path}")
# --- END DEBUG PRINT ---
return {"status": "Path found", "path": final_path, "db_vectors_used": final_db_vectors_list}
if next_concept_input not in visited:
visited[next_concept_input] = 'input' # Помечаем как посещенный через input
new_path = path + [(current_concept, next_concept_input, vector_id_input, 'input')]
queue.append((next_concept_input, new_path, current_used_db_ids))
# --- Шаг 2: База Данных ---
try:
current_concept_info = self.db_conn.get_concept_by_name(current_concept)
if not current_concept_info:
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof (Combined BFS): Concept '{current_concept}' not found in DB for DB search.")
# --- END DEBUG PRINT ---
continue
current_concept_id = current_concept_info['id']
# --- MORE DEBUG ---
# print(f"DEBUG _find_path_using_combined_graph: Querying DB vectors for concept '{current_concept}' (ID: {current_concept_id})")
# --- END MORE DEBUG ---
db_vectors_raw = self.db_conn.get_vectors_for_concept(current_concept_id)
# --- MORE DEBUG ---
# print(f"DEBUG _find_path_using_combined_graph: Received {len(db_vectors_raw)} vectors from DB for ID {current_concept_id}:")
# for dbv in db_vectors_raw:
# print(f" - ID: V{dbv.get('id')}, Type: {dbv.get('vector_type')}, Source: {dbv.get('source_name')}, Target: {dbv.get('target_name')}")
# --- END MORE DEBUG ---
for db_vector in db_vectors_raw:
if db_vector['source_id'] == current_concept_id:
next_concept_db = db_vector['target_name']
db_vector_actual_id = db_vector['id']
db_vector_system_id = f"V{db_vector_actual_id}"
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof (Combined BFS): DB Edge {current_concept} -> {next_concept_db} via {db_vector_system_id}")
# --- END DEBUG PRINT ---
if db_vector_actual_id not in db_vector_cache:
db_vector_cache[db_vector_actual_id] = db_vector
new_used_db_ids = current_used_db_ids.union({db_vector_actual_id})
if next_concept_db == target_concept:
final_path = path + [(current_concept, next_concept_db, db_vector_system_id, 'db')]
final_db_vectors_list = [ self.db_conn.convert_db_vector_to_system_format(db_vector_cache[vid]) for vid in new_used_db_ids if vid in db_vector_cache ]
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof (Combined BFS): Target '{target_concept}' reached via DB edge. Path: {final_path}")
# --- END DEBUG PRINT ---
return {"status": "Path found", "path": final_path, "db_vectors_used": final_db_vectors_list}
# Проверяем, был ли узел посещен и откуда
current_visit_status = visited.get(next_concept_db)
# Добавляем в очередь, ТОЛЬКО если не посещен через input
if current_visit_status != 'input':
# Если еще не посещался или посещался через db, обновляем/добавляем
if current_visit_status is None or current_visit_status == 'db':
visited[next_concept_db] = 'db' # Помечаем как посещенный через db
new_path = path + [(current_concept, next_concept_db, db_vector_system_id, 'db')]
queue.append((next_concept_db, new_path, new_used_db_ids))
except Exception as e:
print(f"DB Error during path finding in combined search: {e}")
return {"status": "DB error", "reason": str(e)}
return {"status": "Path not found (combined)"}
# --- Orchestrator Method ---
def find_proof_path(self, input_graph, source_concept, target_concept) -> Dict[str, Any]:
"""
Ищет путь доказательства: сначала только по входным данным, затем с БД.
Args:
input_graph: Граф, построенный ТОЛЬКО из валидных входных векторов.
source_concept: Имя исходного концепта.
target_concept: Имя целевого концепта.
Returns:
Dict: Результат поиска пути (статус, путь, db_vectors_used).
"""
# Phase 1: Input vectors only
# print("DEBUG find_proof_path: Starting Phase 1 (Input Only)")
input_path_info = self._find_path_using_input_graph(input_graph, source_concept, target_concept)
if input_path_info["status"] == "Path found":
# print("DEBUG find_proof_path: Path found in Phase 1. Returning.")
return input_path_info
# Phase 2: Combined input and DB vectors
# print("DEBUG find_proof_path: Path not found in Phase 1. Starting Phase 2 (Combined Input+DB)")
combined_path_info = self._find_path_using_combined_graph(input_graph, source_concept, target_concept)
if combined_path_info["status"] == "Path not found (combined)":
combined_path_info["status"] = "Path not found"
# print(f"DEBUG find_proof_path: Phase 2 finished with status: {combined_path_info['status']}")
return combined_path_info
def _apply_chain_rule(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]:
"""Логика для правила chain_rule. Возвращает (вывод, валидность_шага)."""
if len(premises) != 2:
return None, False
v1, v2 = premises
# Проверяем соответствие паттерну правила
if v1["target"] == v2["source"] and \
v1["type"] == "Implication" and \
v2["type"] == "Implication": # Вторая посылка должна быть Implication
# Определяем тип вывода (просто берем из правила)
conclusion_type = self.inference_rules["chain_rule"]["conclusion_type"]
# Формируем вывод
conclusion = {
"id": f"S{len(self.proof_cache) + 1}", # Генерируем ID для шага
"source": v1["source"],
"target": v2["target"],
"type": conclusion_type,
"axis": v1["axis"], # Берем ось из первой посылки (можно уточнить)
"justification": f"Derived by chain_rule from {v1['id']} and {v2['id']}",
"derived": True # Помечаем, что вектор выведен
}
return conclusion, True # Шаг валиден
return None, False # Правило неприменимо
def _apply_causality_transfer(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]:
"""Логика для правила causality_transfer. Возвращает (вывод, валидность_шага)."""
if len(premises) != 2:
return None, False
v1, v2 = premises
# Проверяем соответствие паттерну правила
if v1["target"] == v2["source"] and \
v1["type"] == "Causality" and \
v2["type"] == "Causality": # Вторая посылка должна быть Causality
conclusion_type = self.inference_rules["causality_transfer"]["conclusion_type"]
conclusion = {
"id": f"S{len(self.proof_cache) + 1}",
"source": v1["source"],
"target": v2["target"],
"type": conclusion_type,
"axis": v1["axis"],
"justification": f"Derived by causality_transfer from {v1['id']} and {v2['id']}",
"derived": True
}
return conclusion, True
return None, False
def _apply_implication_causality_chain(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]:
"""Логика для правила implication_causality_chain. Возвращает (вывод, валидность_шага)."""
if len(premises) != 2:
return None, False
v1, v2 = premises
# Проверяем соответствие паттерну
if v1["target"] == v2["source"] and \
v1["type"] == "Implication" and \
v2["type"] == "Causality":
conclusion_type = self.inference_rules["implication_causality_chain"]["conclusion_type"]
conclusion = {
"id": f"S{len(self.proof_cache) + 1}",
"source": v1["source"],
"target": v2["target"],
"type": conclusion_type,
"axis": v1["axis"], # Берем ось из первой посылки
"justification": f"Derived by implication_causality_chain from {v1['id']} and {v2['id']}",
"derived": True
}
return conclusion, True
return None, False
# --- New Method for PartOf Rule ---
def _apply_part_of_transitivity(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]:
"""Логика для правила part_of_transitivity."""
if len(premises) != 2:
return None, False
v1, v2 = premises
# Проверяем типы посылок и связь
if v1["target"] == v2["source"] and \
v1.get("type") == "PartOf" and \
v2.get("type") == "PartOf":
conclusion_type = self.inference_rules["part_of_transitivity"]["conclusion_type"]
conclusion = {
"id": f"S{len(self.proof_cache) + 1}",
"source": v1["source"],
"target": v2["target"],
"type": conclusion_type,
"axis": v1.get("axis", "partonomy"), # Use axis from v1 or default
"justification": f"Derived by part_of_transitivity from {v1.get('id', '?')} and {v2.get('id', '?')}",
"derived": True
}
return conclusion, True
return None, False
# --- Новая логика для правила Action -> Causality ---
def _apply_action_causality_chain(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]:
"""Логика для правила action_causality_chain. Возвращает (вывод, валидность_шага)."""
if len(premises) != 2:
return None, False
v1, v2 = premises
# Проверяем соответствие паттерну правила: A->B (Action), B->C (Causality)
if v1["target"] == v2["source"] and \
v1.get("type") == "Action" and \
v2.get("type") == "Causality":
conclusion_type = self.inference_rules["action_causality_chain"]["conclusion_type"]
conclusion = {
"id": f"S{len(self.proof_cache) + 1}",
"source": v1["source"],
"target": v2["target"],
"type": conclusion_type,
"axis": v1.get("axis", v2.get("axis")), # Ось можно взять из Action или Causality
"justification": f"Derived by action_causality_chain from {v1.get('id', '?')} and {v2.get('id', '?')}",
"derived": True
}
return conclusion, True # Шаг считаем валидным, если правило применилось
return None, False
# --- Логика для правила Action -> IsA ---
def _apply_action_isa_generalization(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]:
"""Логика для правила action_isa_generalization."""
if len(premises) != 2:
return None, False
v_action, v_isa = premises # Ожидаем Action, затем IsA
# Проверяем типы и связь: A -> B_inst (Action), B_inst IsA B_type
if v_action.get("type") == "Action" and \
v_isa.get("type") == "IsA" and \
v_action.get("target") == v_isa.get("source"): # Target(Action) == Source(IsA)
conclusion_type = self.inference_rules["action_isa_generalization"]["conclusion_type"]
source_a = v_action.get("source")
target_b_type = v_isa.get("target") # Берем тип из IsA
conclusion = {
"id": f"S{len(self.proof_cache) + 1}",
"source": source_a,
"target": target_b_type,
"type": conclusion_type, # Тип сохраняется как Action
"axis": v_action.get("axis"), # Ось берем из Action
"justification": f"Derived by action_isa_generalization from {v_action.get('id', '?')} and {v_isa.get('id', '?')}",
"derived": True
}
# print(f"DEBUG _apply_action_isa_generalization: Applied. Conclusion: {conclusion}") # Временный дебаг
return conclusion, True
# print(f"DEBUG _apply_action_isa_generalization: Rule not applicable. v_action type: {v_action.get('type')}, v_isa type: {v_isa.get('type')}, link: {v_action.get('target')} == {v_isa.get('source')}") # Временный дебаг
return None, False
def apply_inference_rule(self, rule_name: str, premises: List[Dict]) -> Tuple[Optional[Dict], bool]:
"""Применяет правило вывода, возвращая (вывод, валидность_шага)."""
rule_functions = {
"chain_rule": self._apply_chain_rule,
"causality_transfer": self._apply_causality_transfer,
"implication_causality_chain": self._apply_implication_causality_chain,
"part_of_transitivity": self._apply_part_of_transitivity,
"action_causality_chain": self._apply_action_causality_chain, # Добавляем новое правило
"action_isa_generalization": self._apply_action_isa_generalization # Добавляем еще одно новое правило
}
conclusion, is_step_valid = None, False
if rule_name in rule_functions:
# Предполагаем, что в premises УЖЕ только валидные векторы
conclusion, is_step_valid = rule_functions[rule_name](premises)
# --- DEBUG PRINT ---
# print(f"DEBUG apply_inference_rule: Rule='{rule_name}', Premises={[p.get('id', '?') for p in premises]}, Conclusion='{conclusion.get('id', None) if conclusion else None}', StepValid={is_step_valid}")
# --- END DEBUG PRINT ---
if conclusion:
# Кэшируем только структуру успешного вывода
self.proof_cache[conclusion["id"]] = conclusion
return conclusion, is_step_valid
def construct_proof(self, vectors_for_proof: List[Dict], source_concept: str, target_concept: str) -> Dict:
"""Построение доказательства от source_concept к target_concept.
Использует граф из предоставленных векторов (входных + временных IsA)
и динамически подгружает векторы из БД.
"""
# --- DEBUG PRINT ---
# print(f"\\nDEBUG construct_proof: Start. Query: {source_concept} -> {target_concept}")
# print(f"DEBUG construct_proof: Input vectors count: {len(vectors_for_proof)}")
# --- END DEBUG PRINT ---
# Убираем instance_definitions из параметров
valid_vectors_input = vectors_for_proof # Переименуем для консистентности с кодом ниже
if not valid_vectors_input and not self.db_conn.get_concept_by_name(source_concept):
# --- DEBUG PRINT ---
# print("DEBUG construct_proof: Failed - No input vectors and source concept not found in DB.")
# --- END DEBUG PRINT ---
return {"status": "Failed", "reason": "No input vectors and source concept not found in DB", "is_valid": False}
# --- Теперь строим граф и vector_map из ВСЕХ предоставленных векторов ---
input_graph = self._build_proof_graph(valid_vectors_input)
vector_map = {v["id"]: v for v in valid_vectors_input}
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof: Built input graph with {len(input_graph['nodes'])} nodes and {len(input_graph['edges'])} edges.")
# print(f"DEBUG construct_proof: Vector map keys: {list(vector_map.keys())}")
# --- MORE DEBUG ---
import pprint
# print(f"DEBUG construct_proof: Input Graph Adjacency:\n{pprint.pformat(input_graph.get('adjacency', {}))}")
# --- END MORE DEBUG ---
# --- END DEBUG PRINT ---
# ---------------------------------------------------------------------
# Ищем путь: сначала только входные, потом с БД
path_info = self.find_proof_path(input_graph, source_concept, target_concept)
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof: Path finding result: Status='{path_info.get('status')}', Path length={len(path_info.get('path', []))}")
# --- END DEBUG PRINT ---
if path_info.get("status") != "Path found":
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof: Failed - Path not found. Reason: {path_info.get('status', 'Unknown')}")
# --- END DEBUG PRINT ---
return {"status": "Failed", "reason": f"Path not found: {path_info.get('status', 'Unknown')}", "is_valid": False}
path = path_info["path"]
db_vectors_used = path_info.get("db_vectors_used", [])
# --- MORE DEBUG ---
# print(f"DEBUG construct_proof: Found Path: {path}")
# print(f"DEBUG construct_proof: DB Vectors Used: {[v.get('id') for v in db_vectors_used]}")
# --- END MORE DEBUG ---
# Добавляем векторы из БД в vector_map
for db_vec in db_vectors_used:
if db_vec["id"] not in vector_map:
vector_map[db_vec["id"]] = db_vec
# --- MORE DEBUG ---
# print(f"DEBUG construct_proof: Vector Map Contents:")
# for vid, vdata in vector_map.items():
# print(f" - {vid}: Type={vdata.get('type')}, Source={vdata.get('source')}, Target={vdata.get('target')}")
# --- END MORE DEBUG ---
# --- Проверка на прямой путь ---
if len(path) == 1:
direct_vector_id = path[0][2]
direct_vector = vector_map.get(direct_vector_id)
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof: Path length is 1. Direct vector ID: {direct_vector_id}")
# --- END DEBUG PRINT ---
if direct_vector:
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof: Success - Direct proof found using vector {direct_vector_id}.")
# --- END DEBUG PRINT ---
return {
"status": "Success",
"source": source_concept,
"target": target_concept,
"steps": [], # Нет шагов для прямого доказательства
"rule": "direct", # Указываем, что это прямой путь
"direct_vector_id": direct_vector_id,
"is_valid": True, # Прямой путь считается валидным
"final_conclusion_type": direct_vector.get("type"),
"metadata": {} # Пока без метаданных о цикле здесь
}
else:
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof: Failed - Direct vector {direct_vector_id} not found in map.")
# --- END DEBUG PRINT ---
return {"status": "Failed", "reason": f"Direct vector {direct_vector_id} not found", "is_valid": False}
# --- Построение доказательства по шагам ---
steps = []
current_premise = None # Будет содержать ВЕКТОР (словарь)
overall_validity = True # Валидность всего доказательства
cycle_warning = None
visited_nodes_in_proof = {source_concept} # Для обнаружения циклов во время ПОСТРОЕНИЯ
# --- DEBUG PRINT ---
# print("DEBUG construct_proof: Starting step-by-step construction...")
# --- END DEBUG PRINT ---
for i, (seg_source, seg_target, vector_id, origin) in enumerate(path):
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof: Processing segment {i+1}/{len(path)}: {seg_source} -> {seg_target} via {vector_id} (from {origin})")
# --- END DEBUG PRINT ---
# Защита от отсутствия вектора в карте (на всякий случай)
premise2 = vector_map.get(vector_id)
if not premise2:
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof: Failed - Vector {vector_id} for segment {i+1} not found in map.")
# --- END DEBUG PRINT ---
overall_validity = False
return {"status": "Failed", "reason": f"Vector {vector_id} not found during step construction", "is_valid": False}
premise2_source = origin # 'input' or 'db'
if current_premise is None:
current_premise = premise2
source1 = premise2_source # Источник первой посылки - сам этот вектор
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof: Segment {i+1}: Initial premise set to {current_premise.get('id')}")
# --- MORE DEBUG ---
# print(f"DEBUG construct_proof: Segment {i+1}: Initial premise set to: ID={current_premise.get('id', 'NO_ID')}, Type={current_premise.get('type', 'NO_TYPE')}, Source={current_premise.get('source')}, Target={current_premise.get('target')}")
# --- END MORE DEBUG ---
# --- END DEBUG PRINT ---
else:
premises = [current_premise, premise2]
premise_ids = [p.get("id", "?") for p in premises]
source1 = "derived" if current_premise.get("derived") else current_premise.get("origin", "input") # Откуда первая посылка?
conclusion = None
is_step_valid = False
rule_name = None
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof: Segment {i+1}: Trying to apply rules. Premise1='{premises[0].get('id')}' ({source1}), Premise2='{premises[1].get('id')}' ({premise2_source})")
# --- MORE DEBUG ---
prem1_id = current_premise.get('id', 'NO_ID')
prem1_type = current_premise.get('type', 'NO_TYPE')
prem2_id = premise2.get('id', 'NO_ID')
prem2_type = premise2.get('type', 'NO_TYPE')
# print(f"DEBUG construct_proof: Applying rules. Premise1: ID={prem1_id}, Type={prem1_type} | Premise2: ID={prem2_id}, Type={prem2_type}")
# --- MORE DEBUG ---
# print(f" Premise1 Details: Source={current_premise.get('source')}, Target={current_premise.get('target')}")
# print(f" Premise2 Details: Source={premise2.get('source')}, Target={premise2.get('target')}")
# --- END MORE DEBUG ---
# --- END DEBUG PRINT ---
# Применяем подходящее правило
for key in self.inference_rules.keys():
temp_conclusion, temp_valid = self.apply_inference_rule(key, premises)
if temp_conclusion:
conclusion = temp_conclusion
is_step_valid = temp_valid
rule_name = key
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof: Segment {i+1}: Applied rule '{rule_name}'. Conclusion='{conclusion.get('id')}', StepValid={is_step_valid}")
# --- END DEBUG PRINT ---
break # Нашли подходящее правило
if conclusion:
conclusion["origin"] = "derived" # Помечаем, что вывод получен
step_detail = {
"id": conclusion["id"],
"rule": rule_name,
"premises": premise_ids,
"conclusion": conclusion,
"is_valid": is_step_valid,
"premise1_source": source1,
"premise2_source": premise2_source, # 'input' or 'db'
# --- DEBUG PRINT ---
# "debug_premise1": current_premise,
# "debug_premise2": premise2
# --- END DEBUG PRINT ---
}
steps.append(step_detail)
current_premise = conclusion # Результат этого шага становится первой посылкой для следующего
if not is_step_valid:
overall_validity = False
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof: Segment {i+1}: Step marked invalid, setting overall validity to False.")
# --- END DEBUG PRINT ---
# Можно прервать, если один шаг невалиден? Или достроить? Пока достраиваем.
# Проверка на цикл в построении
target_node = conclusion.get("target")
if target_node in visited_nodes_in_proof:
cycle_warning = f"Cycle detected during proof construction: revisiting node '{target_node}'"
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof: Segment {i+1}: {cycle_warning}")
# --- END DEBUG PRINT ---
else:
visited_nodes_in_proof.add(target_node)
else:
# Не смогли применить правило - доказательство невалидно
overall_validity = False
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof: Failed - No applicable rule found for premises {premise_ids} in segment {i+1}.")
# --- END DEBUG PRINT ---
return {"status": "Failed", "reason": f"No inference rule applicable for premises {premise_ids}", "is_valid": False}
# --- Финальное формирование результата ---
final_conclusion = current_premise # Последний вывод - это и есть результат
# Проверка, что финальный вывод соответствует запросу
if not final_conclusion or \
final_conclusion.get("source") != source_concept or \
final_conclusion.get("target") != target_concept:
# --- DEBUG PRINT ---
final_src = final_conclusion.get('source') if final_conclusion else 'None'
final_tgt = final_conclusion.get('target') if final_conclusion else 'None'
# print(f"DEBUG construct_proof: Failed - Final conclusion mismatch. Expected={source_concept}->{target_concept}, Got={final_src}->{final_tgt}")
# --- END DEBUG PRINT ---
overall_validity = False
# Статус все еще может быть Success, но is_valid = False? Или статус Failed?
# Сделаем статус Failed, если вывод не совпал.
return {
"status": "Failed",
"reason": f"Final conclusion mismatch: expected {source_concept}->{target_concept}, got {final_conclusion.get('source') if final_conclusion else 'N/A'}->{final_conclusion.get('target') if final_conclusion else 'N/A'}",
"is_valid": False,
"source": source_concept,
"target": target_concept,
"steps": steps,
"metadata": {"cycle_warning": cycle_warning} if cycle_warning else {}
}
final_result = {
"status": "Success", # Если дошли сюда, структура доказательства построена
"source": source_concept,
"target": target_concept,
"steps": steps,
"is_valid": overall_validity, # Валидность зависит от валидности всех шагов
"final_conclusion_type": final_conclusion.get("type"),
"metadata": {"cycle_warning": cycle_warning} if cycle_warning else {}
}
# --- DEBUG PRINT ---
# print(f"DEBUG construct_proof: Finished successfully.")
# print(f"DEBUG construct_proof: Final Result Status: {final_result['status']}")
# print(f"DEBUG construct_proof: Final Result IsValid: {final_result['is_valid']}")
# print(f"DEBUG construct_proof: Final Result Steps Count: {len(final_result['steps'])}")
# if final_result['steps']:
# for idx, step in enumerate(final_result['steps']):
# print(f" Step {idx+1} ({step['id']}): Rule='{step['rule']}', Premises={step['premises']}, Valid={step['is_valid']}, Conc={step['conclusion']['source']}->{step['conclusion']['target']}")
# print(f"DEBUG construct_proof: Final Conclusion Type: {final_result['final_conclusion_type']}")
# print(f"DEBUG construct_proof: Metadata: {final_result['metadata']}")
# --- END DEBUG PRINT ---
return final_result
def _build_proof_graph(self, vectors):
"""Вспомогательная функция для построения графа из векторов"""
graph = {"nodes": set(), "edges": [], "adjacency": {}}
for v in vectors:
source, target, v_id = v["source"], v["target"], v["id"]
graph["nodes"].add(source)
graph["nodes"].add(target)
graph["edges"].append((source, target, v_id))
# Обновление списка смежности
if source not in graph["adjacency"]:
graph["adjacency"][source] = {"out": [], "in": []}
if target not in graph["adjacency"]:
graph["adjacency"][target] = {"out": [], "in": []}
graph["adjacency"][source]["out"].append((target, v_id))
graph["adjacency"][target]["in"].append((source, v_id))
return graph
if __name__ == "__main__":
print(f"SFOSR Integrated System v{SFOSR_CONFIG['version']}")
print("Готов к анализу смысловых структур.")
# Пример входных данных (используем концепты из БД для демонстрации)
example = {
"text": "Emergence leads to multi-level space, which causes cross-level causation, finally leading to regulatory flow.",
"vectors": [
{
"id": "V_EC_MLS",
"source": "emergent_complexity",
"target": "multi_level_space",
"type": "Implication",
"axis": "structure <-> hierarchy",
"justification": "Emergence creates levels"
},
{
"id": "V_MLS_CLC",
"source": "multi_level_space",
"target": "cross_level_causation",
"type": "Causality", # Тип Causality
"axis": "level <-> interaction",
"justification": "Levels imply cross-level effects"
},
{
"id": "V_CLC_RF",
"source": "cross_level_causation",
"target": "regulatory_flow",
"type": "Causality", # Тип Causality
"axis": "cause <-> regulation",
"justification": "Cross-level effects drive regulation"
}
],
"proof_query": {
"source": "multi_level_space",
"target": "regulatory_flow"
}
}
# Создаем и тестируем интегрированную систему
system = SFOSRSystem()
result = system.process(example)
# Выводим результаты (обновлено для бинарной валидности)
print("\n--- Результаты Обработки ---")
print(f"Статус: {result['status']}")
if result['status'] == 'Success':
print(f"Компилируемость: {result['analysis']['is_compilable']}")
print("\n--- Верификация ---")
print(f"Всего обработано векторов: {result['verification']['total_vectors_processed']}")
print(f"Валидных векторов: {result['verification']['valid_count']}")
print(f"Уровень соответствия: {result['verification']['compliance_rate'] * 100:.1f}%")
print("Данные по Векторам:")
for v_id, data in result['verification']['vectors_data'].items():
valid_str = "Валиден" if data['is_valid'] else "Не валиден"
issues_str = ', '.join(data['issues']) if data['issues'] else 'Нет'
print(f" - {v_id}: Статус={valid_str}, Проблемы: {issues_str}")
if "proof" in result:
print("\n--- Доказательство ---")
proof = result['proof']
print(f"Статус: {proof['status']}")
valid_proof_str = "Валидно" if proof.get('is_valid', False) else "Не валидно"
print(f"Общая Валидность: {valid_proof_str}")
if proof['status'] == 'Success':
print(f"Доказательство: {proof['source']}{proof['target']}")
print(f"Тип финального вывода: {proof.get('final_conclusion_type', 'N/A')}")
print("Шаги доказательства:")
if proof['steps']:
for step in proof['steps']:
step_valid_str = "Валиден" if step['is_valid'] else "Не валиден"
print(f" - {step['id']}: Правило={step['rule']}, Посылки=({', '.join(step['premises'])}) | Статус={step_valid_str}")
if step['conclusion']:
print(f" Вывод: {step['conclusion']['source']}{step['conclusion']['target']} ({step['conclusion']['type']})")
else:
print(f" Вывод: None (Ошибка: {step.get('reason', '')})")
else:
print(" (Нет шагов)")
elif 'reason' in proof:
print(f"Причина неудачи: {proof['reason']}")
elif 'details' in result and 'validation_issues' in result['details']:
print("\n--- Ошибки Анализа ---")
for issue in result['details']['validation_issues']:
print(f" - {issue}")