""" SFOSR Integrated System (with Binary Validity) Этот модуль объединяет основные компоненты системы SFOSR: - Анализатор структуры (`SFOSRAnalyzer`) - Верификатор контрактов (`ContractVerifier`) - Систему построения доказательств (`ProofSystem`) для обеспечения комплексной обработки и формальной оценки смысловых структур на основе бинарной валидности. """ import json import subprocess import os from typing import Dict, List, Any, Tuple, Optional, Set, Union from .sfosr_database import SFOSRDatabase # Use relative import within the package # Конфигурация системы SFOSR_CONFIG = { "version": "0.4.0", "description": "Integrated SFOSR System", "components": ["analyzer", "verifier", "proof_system"], "debug_mode": False, "auto_update_plausibility": True # Автоматическое обновление plausibility } # Общие типы векторов VECTOR_TYPES = { "Causality": {"weight": 2.0, "requires_justification": True, "description": "Причинно-следственная связь"}, "Implication": {"weight": 1.8, "requires_justification": True, "description": "Логическое следование (если-то)"}, "Transformation": {"weight": 1.5, "requires_justification": False, "description": "Превращение из одного состояния в другое"}, "Goal": {"weight": 1.3, "requires_justification": False, "description": "Целеполагание, намерение"}, "Prevention": {"weight": 1.3, "requires_justification": False, "description": "Предотвращение нежелательного исхода"}, "Contrast": {"weight": 1.2, "requires_justification": False, "description": "Противопоставление"}, "Comparison": {"weight": 1.0, "requires_justification": False, "description": "Сравнение элементов"}, "Inclusion": {"weight": 0.8, "requires_justification": False, "description": "Отношение часть-целое"}, "Attribution": {"weight": 0.7, "requires_justification": False, "description": "Приписывание свойства объекту"}, "Temporal": {"weight": 1.1, "requires_justification": False, "description": "Временная последовательность"}, "Qualification": {"weight": 0.6, "requires_justification": False, "description": "Ограничение или уточнение"}, "Definition": {"weight": 1.4, "requires_justification": False, "description": "Определение понятия"}, "PartOf": {"constraints": [], "requires_justification": False}, "Mechanism": {"constraints": [], "requires_justification": True}, "Example": {"constraints": [], "requires_justification": False}, "Requirement": {"constraints": [], "requires_justification": True}, "Action": {"constraints": [], "requires_justification": False}, "Capability": {"constraints": [], "requires_justification": False}, "PropertyOf": {"constraints": [], "requires_justification": False}, "Purpose": {"constraints": [], "requires_justification": False}, "Governs": {"constraints": [], "requires_justification": False}, "Contains": {"constraints": [], "requires_justification": False}, "Represents": {"constraints": [], "requires_justification": False}, "Context": {"constraints": [], "requires_justification": False}, "IsA": {"constraints": [], "requires_justification": False}, # "ActsOn": {"constraints": [], "requires_justification": False}, # Removed as it's not used now "Dependency": {"constraints": [], "requires_justification": True} } # Интерфейс для интеграции компонентов class SFOSRSystem: """ Основной класс интегрированной системы SFOSR (на бинарной валидности) Объединяет: - Анализ структуры векторов - Проверку контрактов и определение валидности - Построение валидных доказательств """ def __init__(self, db_path="sfosr.db", debug=False): self.db_path = db_path self.db = SFOSRDatabase(db_path) # Database connection self._analyzer = SFOSRAnalyzer() # Prepare data for ContractVerifier all_concepts = self.db.get_all_concepts() known_concepts_names = {c['name'] for c in all_concepts} concepts_data_map = {c['name']: c for c in all_concepts} self._verifier = ContractVerifier(known_concepts=known_concepts_names, concepts_data=concepts_data_map) self._proof_system = ProofSystem(db_conn=self.db) # Pass db connection self.concept_graph = None self.debug = debug # Load inference rules from DB try: db_rules = self.db.get_inference_rules() if db_rules: self._proof_system.load_rules(db_rules) except Exception as e: if self.debug: print(f"Error loading inference rules from database: {str(e)}") # Графы self.concept_graph = {} def process(self, input_data): """ Основной метод обработки входных данных (с бинарной валидностью) Последовательно выполняет: 1. Анализ структуры 2. Проверку контрактов (определение is_valid) 3. Построение доказательств (если применимо, определение is_valid) (Использует только валидированные векторы из input_data, без обогащения из БД) Args: input_data: Словарь с текстом и векторами SFOSR Returns: Dict: Результаты обработки (с полями is_valid) """ # Шаг 1: Анализ структуры analysis_result = self._analyzer.analyze(input_data) self.concept_graph = analysis_result["concept_graph"] # Если анализ не прошел if analysis_result["analysis_status"] != "Completed": return { "status": "Error", "message": f"Analysis failed: {analysis_result['analysis_status']}", "details": { "validation_issues": analysis_result["validation_issues"] } } # Получаем инстансы из контекста, если они есть instance_definitions = input_data.get("instance_definitions", {}) # Шаг 2: Проверка контрактов vectors_to_verify = analysis_result["vectors_analyzed"] # Передаем инстансы в верификатор verification_result = self._verifier.verify_all(vectors_to_verify, instance_definitions) # Собираем только валидные векторы для системы доказательств valid_input_vectors = [] vectors_data = verification_result["vectors_data"] for vector in vectors_to_verify: v_id = vector.get("id") # Используем get для безопасного доступа и проверяем наличие ключа 'vector' vector_dict = vectors_data.get(v_id, {}).get('vector') if vector_dict and vectors_data[v_id].get("is_valid", False): valid_input_vectors.append(vector_dict) # Добавляем исходный вектор # --- Генерация временных IsA векторов --- temporary_isa_vectors = [] for instance_id, definition in instance_definitions.items(): general_type = definition.get('is_a') instance_label = definition.get('label', instance_id) # Use label or ID if general_type: # Проверяем, существует ли общий тип в БД if self.db.get_concept_by_name(general_type): temporary_isa_vectors.append({ "id": f"isa_{instance_id}", # Уникальный временный ID "source": instance_id, # Используем временный ID "target": general_type, # Ссылка на общий тип в БД "type": "IsA", "axis": "classification", "justification": f"Instance '{instance_label}' defined as type '{general_type}' in input context.", "is_valid": True # Считаем эти связи априори валидными для доказательства }) else: print(f"Warning: General type '{general_type}' for instance '{instance_id}' not found in DB. Skipping IsA vector generation.") # ---------------------------------------- # Базовый результат обработки result = { "status": "Success", "input_text": input_data.get("text", ""), "analysis": { "status": analysis_result["analysis_status"], "is_compilable": analysis_result["is_compilable"], "graph_metrics": analysis_result["graph_metrics"] }, "verification": { "total_vectors": verification_result["total_vectors_processed"], "valid_count": verification_result["valid_count"], "compliance_rate": verification_result["compliance_rate"], "vectors_data": verification_result["vectors_data"] } } # Шаг 3: Построение доказательств vectors_for_proof = valid_input_vectors + temporary_isa_vectors # Запускаем, если есть запрос И есть ХОТЬ КАКИЕ-ТО векторы (входные или IsA) if "proof_query" in input_data and vectors_for_proof: query = input_data["proof_query"] source = query.get("source") target = query.get("target") if source and target: proof_result = self._proof_system.construct_proof( vectors_for_proof, source, target ) result["proof"] = proof_result else: # Нет source/target result["proof"] = {"status": "Failed", "reason": "Missing source or target in proof query", "is_valid": False} # else: # Нет proof_query или нет векторов - proof не создается # pass return result def analyze(self, input_data): """Удобный метод для выполнения только анализа""" return self._analyzer.analyze(input_data) def verify(self, input_data): """Удобный метод для выполнения только верификации""" vectors = input_data.get("vectors", []) # Сначала базовый анализ для получения структурно валидных векторов analysis_result = self._analyzer.analyze(input_data) # Передаем пустой словарь instance_definitions, т.к. verify не работает с контекстом return self._verifier.verify_all(analysis_result["vectors_analyzed"], instance_definitions={}) def prove(self, input_data, source, target): """Удобный метод для построения доказательства. Анализирует, верифицирует и строит доказательство, используя только валидированные векторы из input_data (без обогащения из БД). """ # Сначала анализ analysis_res = self.analyze(input_data) if analysis_res["analysis_status"] != "Completed": return {"status": "Failed", "reason": "Analysis failed"} # Получаем список векторов, прошедших анализ vectors_analyzed = analysis_res.get("vectors_analyzed", []) if not vectors_analyzed: return {"status": "Failed", "reason": "No vectors passed analysis"} # Затем верификация этих векторов # Передаем пустой instance_definitions, т.к. prove работает с готовым input_data # Хотя, возможно, стоило бы передавать реальный instance_definitions из input_data? # Пока оставим пустым для совместимости. verification_res = self._verifier.verify_all(vectors_analyzed, instance_definitions={}) vectors_data = verification_res.get("vectors_data", {}) # Извлекаем валидные векторы, ИТЕРРИРУЯ ПО ИСХОДНОМУ СПИСКУ valid_vectors = [ vector # Берем исходный вектор for vector in vectors_analyzed # Итерируем по результатам анализа if vectors_data.get(vector.get("id", ""), {}).get("is_valid", False) # Проверяем валидность в результатах верификации ] if not valid_vectors: return {"status": "Failed", "reason": "No valid vectors after verification"} # Enrichment is disabled, use valid_vectors directly vectors_for_proof = valid_vectors return self._proof_system.construct_proof(vectors_for_proof, source, target) def get_concept_info(self, concept_name): """ Получение информации о концепте из БД Args: concept_name: Имя концепта Returns: Dict: Информация о концепте или None """ return self.db.get_complete_concept_info(concept_name) def find_related_concepts(self, concept_name, depth=1): """ Поиск связанных концептов Args: concept_name: Имя концепта depth: Глубина поиска Returns: List: Список связанных концептов """ concept = self.db.get_concept_by_name(concept_name) if not concept: return [] return self.db.get_related_concepts(concept["id"], depth) def add_concept_to_db(self, name, description, domain, level): """ Добавление нового концепта в БД Args: name: Имя концепта description: Описание domain: Домен (область знаний) level: Уровень абстракции Returns: int: ID добавленного концепта """ return self.db.add_concept(name, description, domain, level) def add_vector_to_db(self, source_name, target_name, vector_type, axis, justification=None): """ Добавление нового вектора в БД Args: source_name: Имя исходного концепта target_name: Имя целевого концепта vector_type: Тип вектора axis: Ось justification: Обоснование Returns: int: ID добавленного вектора или None в случае ошибки """ source = self.db.get_concept_by_name(source_name) target = self.db.get_concept_by_name(target_name) if not source or not target: return None return self.db.add_vector(source["id"], target["id"], vector_type, axis, justification) # Реализация компонентов системы class SFOSRAnalyzer: """ Анализатор структуры векторов SFOSR Отвечает за: - Проверку синтаксиса и базовой структуры векторов - Проверку компилируемости (наличие необходимых полей) - Построение графа концептов """ def __init__(self, vector_types=None): """Инициализация анализатора""" self.vector_types = vector_types or VECTOR_TYPES def build_concept_graph(self, vectors): """ Строит граф концептов и связей между ними Args: vectors: Список векторов SFOSR Returns: Dict: Структура графа с узлами и связями """ # Структура для хранения графа graph = { "nodes": set(), # уникальные концепты "edges": [], # связи (кортежи source, target, vector_id) "adjacency": {}, # словарь смежности для быстрого доступа } # Собираем все уникальные концепты и ребра all_nodes = set() for vector in vectors: source = vector.get("source") target = vector.get("target") vector_id = vector.get("id") if source: all_nodes.add(source) if source not in graph["adjacency"]: graph["adjacency"][source] = {"out": [], "in": []} if target: all_nodes.add(target) if target not in graph["adjacency"]: graph["adjacency"][target] = {"out": [], "in": []} if source and target and vector_id: edge = (source, target, vector_id) graph["edges"].append(edge) graph["adjacency"][source]["out"].append((target, vector_id)) graph["adjacency"][target]["in"].append((source, vector_id)) graph["nodes"] = all_nodes return graph def validate_vector_structure(self, vector): """ Проверяет структуру вектора на соответствие базовым требованиям Args: vector: Словарь с данными вектора Returns: Tuple[bool, Optional[str]]: (валидность, сообщение об ошибке) """ required_keys = ["id", "source", "target", "type", "axis"] missing_keys = [key for key in required_keys if key not in vector or not vector[key]] if missing_keys: return False, f"Vector {vector.get('id', 'Unknown')} missing keys: {', '.join(missing_keys)}" # Проверяем, существует ли указанный тип вектора vector_type = vector.get("type") if vector_type not in self.vector_types: return False, f"Vector {vector.get('id', 'Unknown')} has invalid type: {vector_type}" return True, None def validate_compilability(self, vector): """ Проверяет на компилируемость (достаточность данных) Args: vector: Словарь с данными вектора Returns: Tuple[bool, Optional[str]]: (компилируемость, сообщение об ошибке) """ vector_type = vector.get("type") # Проверяем требования обоснования в зависимости от типа if (vector_type in self.vector_types and self.vector_types[vector_type]["requires_justification"]): if not vector.get("justification"): return False, f"Vector {vector.get('id', 'Unknown')} requires justification for type {vector_type}." return True, None def analyze(self, input_data): """ Главная функция анализа структуры SFOSR (упрощенная) Args: input_data: Словарь с текстом и векторами Returns: Dict: Результаты анализа (валидация и граф) """ input_text = input_data.get("text", "N/A") vectors = input_data.get("vectors", []) valid_vectors = [] validation_issues = [] analysis_status = "Completed" # 1. Валидация структуры и компилируемости каждого вектора for vector in vectors: is_struct_valid, struct_error = self.validate_vector_structure(vector) if not is_struct_valid: validation_issues.append(struct_error) analysis_status = "Validation Error" continue # Невалидную структуру дальше не проверяем is_comp_valid, comp_error = self.validate_compilability(vector) if not is_comp_valid: validation_issues.append(comp_error) # Продолжаем анализ, но помечаем проблему # Собираем только структурно валидные векторы valid_vectors.append(vector) # Определяем компилируемость по наличию проблем is_compilable = len(validation_issues) == 0 if not is_compilable and analysis_status == "Completed": analysis_status = "Compilability Error" # Если были только проблемы компилируемости # 2. Построение графа концептов (только из структурно валидных векторов) graph = self.build_concept_graph(valid_vectors) # 3. Формирование упрощенного результата result = { "input_text": input_text, "analysis_status": analysis_status, "is_compilable": is_compilable, "validation_issues": validation_issues, "graph_metrics": { # Упрощенные метрики графа "concepts_count": len(graph["nodes"]), "connections_count": len(graph["edges"]), }, "vectors_analyzed": valid_vectors, # Содержит только структурно валидные "concept_graph": graph } return result class ContractVerifier: """ Верификатор контрактов векторов SFOSR Проверяет соответствие векторов формальным контрактам, определяет бинарную валидность (`is_valid`) вектора и собирает метаданные. """ def __init__(self, contract_types=None, known_concepts: Optional[Set[str]] = None, concepts_data: Optional[Dict[str, Dict]] = None): """Инициализация верификатора с известными концептами и их данными (уровнями).""" self.contract_types = contract_types or set(VECTOR_TYPES.keys()) self.known_concepts = known_concepts or set() # --- Сохраняем данные об уровнях --- self.concepts_data = concepts_data or {} # ---------------------------------- self.axis_registry = set() def verify_vector_contract(self, vector: Dict[str, Any], instance_definitions: Dict[str, Dict]) -> Tuple[bool, List[str], Dict[str, Any]]: """Проверяет отдельный вектор на соответствие контрактам""" issues = [] metadata = {} is_valid = True # Начинаем с предположения о валидности # --- Проверка существования концептов и их типов --- source_name = vector.get("source") target_name = vector.get("target") vector_type = vector.get("type") vector_id = vector.get("id", "Unknown") # --- Получаем реальные ТИПЫ концептов для проверки --- source_type_name = source_name target_type_name = target_name is_source_instance = False is_target_instance = False if source_name in instance_definitions: source_type_name = instance_definitions[source_name].get("is_a") is_source_instance = True if not source_type_name: issues.append(f"Instance '{source_name}' in vector {vector_id} has no 'is_a' type defined in context.") is_valid = False source_type_name = None # Не можем проверить дальше if target_name in instance_definitions: target_type_name = instance_definitions[target_name].get("is_a") is_target_instance = True if not target_type_name: issues.append(f"Instance '{target_name}' in vector {vector_id} has no 'is_a' type defined in context.") is_valid = False target_type_name = None # Не можем проверить дальше # ---------------------------------------------------- source_concept_data = None target_concept_data = None if source_type_name and is_valid: source_concept_data = self.concepts_data.get(source_type_name) if not source_concept_data: issues.append(f"Source concept/type '{source_type_name}' (for '{source_name}') not found in known concepts for vector {vector_id}.") is_valid = False if target_type_name and is_valid: target_concept_data = self.concepts_data.get(target_type_name) if not target_concept_data: issues.append(f"Target concept/type '{target_type_name}' (for '{target_name}') not found in known concepts for vector {vector_id}.") is_valid = False # --- Проверка контрактов типа Transformation --- if is_valid and vector_type == "Transformation": if source_name == target_name: issues.append(f"Transformation vector {vector_id} cannot have the same source and target ('{source_name}').") is_valid = False # --- Проверка контракта для Causality (разные уровни) --- if is_valid and vector_type == "Causality" and "level" in vector.get("axis", ""): if source_concept_data and target_concept_data: source_level = source_concept_data.get('level') target_level = target_concept_data.get('level') if source_level and target_level and source_level == target_level: issues.append(f"Causality vector {vector_id} ('{source_type_name}' -> '{target_type_name}') links concepts on the same level '{source_level}' with axis containing 'level'.") is_valid = False # --- Добавить другие специфичные для типов векторов проверки --- # Например, для ActsOn: source должен быть подтипом Action, target - подтипом Object? # Это потребует иерархии в БД или более сложной логики. # Регистрация осей остается if vector.get("axis") and vector["axis"] not in self.axis_registry: self.axis_registry.add(vector["axis"]) # Добавляем сами данные вектора в метаданные для использования в `prove` # metadata['vector'] = vector # Убрали - теперь prove получает исходный список return is_valid, issues, metadata def verify_all(self, vectors: List[Dict[str, Any]], instance_definitions: Dict[str, Dict]) -> Dict[str, Any]: """Проверка всех векторов, агрегация валидности и метаданных""" vectors_data = {} valid_count = 0 processed_count = 0 for vector in vectors: processed_count += 1 vector_id = vector.get("id", f"unknown_{processed_count}") # Передаем instance_definitions в проверку контракта is_valid, issues, metadata = self.verify_vector_contract(vector, instance_definitions) vectors_data[vector_id] = { "vector": vector, "is_valid": is_valid, "issues": issues, "metadata": metadata } if is_valid: valid_count += 1 # Формируем отчет report = { "total_vectors_processed": processed_count, "valid_count": valid_count, "compliance_rate": round(valid_count / processed_count, 3) if processed_count > 0 else 0.0, "vectors_data": vectors_data # Основные данные теперь здесь } return report class ProofSystem: """ Система построения доказательств SFOSR Отвечает за: - Построение доказательств на основе ВАЛИДНЫХ векторов (и данных из БД) - Проверку итоговой валидности (`is_valid`) доказательств - Поиск путей доказательства между концептами (с использованием БД) """ def __init__(self, db_conn): """Инициализация системы доказательств. Args: db_conn: Экземпляр SFOSRDatabase для доступа к БД. """ self.db_conn = db_conn # Store the database connection # Базовые правила вывода (с бинарной валидностью) self.inference_rules = { "chain_rule": { "pattern": "A → B, B → C ⊢ A → C", "premise_types": ["Implication", "Implication"], "conclusion_type": "Implication", "domain": "logical_inference" }, "causality_transfer": { "pattern": "A → B (Causality), B → C (Causality) ⊢ A → C (Causality)", "premise_types": ["Causality", "Causality"], "conclusion_type": "Causality", "domain": "causal_inference" }, "implication_causality_chain": { "pattern": "A → B (Implication), B → C (Causality) ⊢ A → C (Causality)", "premise_types": ["Implication", "Causality"], "conclusion_type": "Causality", "domain": "mixed_inference" }, # --- New Rule --- "part_of_transitivity": { "pattern": "A PartOf B, B PartOf C ⊢ A PartOf C", "premise_types": ["PartOf", "PartOf"], "conclusion_type": "PartOf", "domain": "mereology" }, # --- НОВОЕ ПРАВИЛО --- "action_causality_chain": { "pattern": "A -> B (Action), B -> C (Causality) |- A -> C (Causality)", "premise_types": ["Action", "Causality"], "conclusion_type": "Causality", "domain": "action_inference" }, # --- ЕЩЕ ОДНО НОВОЕ ПРАВИЛО --- "action_isa_generalization": { "pattern": "A -> B_inst (Action), B_inst IsA B_type |- A -> B_type (Action)", "premise_types": ["Action", "IsA"], "conclusion_type": "Action", # Результат - обобщенное действие "domain": "inheritance_inference" } } # Кэш для хранения построенных доказательств (только структура вывода) self.proof_cache = {} def load_rules(self, db_rules): """ Загрузка правил вывода из БД (игнорируя любые старые данные plausibility) Args: db_rules: Словарь с правилами вывода из БД """ for name, rule_data in db_rules.items(): rule_data.pop('plausibility', None) # Убеждаемся, что plausibility удалено self.inference_rules[name] = rule_data # --- Helper for Input-Only BFS --- def _find_path_using_input_graph(self, input_graph, source_concept, target_concept) -> Dict[str, Any]: """BFS using only the input graph.""" # print(f"DEBUG _find_path_using_input_graph: Start {source_concept} -> {target_concept}") # UNCOMMENTED if source_concept not in input_graph["nodes"]: # --- DEBUG PRINT --- # print(f"DEBUG construct_proof: Source '{source_concept}' not in input graph nodes: {input_graph['nodes']}") # --- END DEBUG PRINT --- return {"status": "Source node not found"} visited = {source_concept} queue: List[Tuple[str, List[Tuple[str, str, str, str]]]] = [(source_concept, [])] while queue: current_concept, path = queue.pop(0) # print(f"DEBUG _find_path_using_input_graph: Dequeue '{current_concept}'") # UNCOMMENTED if current_concept in input_graph["adjacency"]: for next_concept_input, vector_id_input in input_graph["adjacency"][current_concept].get("out", []): # --- DEBUG PRINT --- # print(f"DEBUG construct_proof (Input BFS): Edge {current_concept} -> {next_concept_input} via {vector_id_input}") # UNCOMMENTED # --- END DEBUG PRINT --- if next_concept_input == target_concept: final_path = path + [(current_concept, next_concept_input, vector_id_input, 'input')] # print(f"DEBUG _find_path_using_input_graph: Target reached. Path: {final_path}") # UNCOMMENTED # --- DEBUG PRINT --- # print(f"DEBUG construct_proof (Input BFS): Target '{target_concept}' reached. Path: {final_path}") # --- END DEBUG PRINT --- return {"status": "Path found", "path": final_path, "db_vectors_used": []} if next_concept_input not in visited: visited.add(next_concept_input) new_path = path + [(current_concept, next_concept_input, vector_id_input, 'input')] queue.append((next_concept_input, new_path)) # print(f"DEBUG _find_path_using_input_graph: Enqueue '{next_concept_input}'") # UNCOMMENTED # print(f"DEBUG _find_path_using_input_graph: Path not found.") # UNCOMMENTED # --- DEBUG PRINT --- # print(f"DEBUG construct_proof (Input BFS): Path not found from '{source_concept}' to '{target_concept}'") # --- END DEBUG PRINT --- return {"status": "Path not found (input only)"} # --- Helper for Combined BFS --- def _find_path_using_combined_graph(self, input_graph, source_concept, target_concept) -> Dict[str, Any]: """BFS using input graph AND database lookups.""" # print(f"\\nDEBUG _find_path_using_combined_graph: Start {source_concept} -> {target_concept}") # --- DEBUG PRINT --- # print(f"DEBUG construct_proof (Combined BFS): Start {source_concept} -> {target_concept}") # --- END DEBUG PRINT --- if source_concept not in input_graph["nodes"]: if not self.db_conn.get_concept_by_name(source_concept): # --- DEBUG PRINT --- # print(f"DEBUG construct_proof (Combined BFS): Source '{source_concept}' not in input graph or DB.") # --- END DEBUG PRINT --- return {"status": "Source node not found"} # visited теперь словарь: {concept_name: origin ('input' или 'db')} visited: Dict[str, str] = {source_concept: 'start'} queue: List[Tuple[str, List[Tuple[str, str, str, str]], Set[int]]] = [(source_concept, [], set())] used_db_vector_ids = set() db_vector_cache = {} while queue: current_concept, path, current_used_db_ids = queue.pop(0) # --- DEBUG PRINT --- # print(f"DEBUG construct_proof (Combined BFS): Dequeue '{current_concept}'") # --- END DEBUG PRINT --- # --- Шаг 1: Входной граф --- if current_concept in input_graph["adjacency"]: for next_concept_input, vector_id_input in input_graph["adjacency"][current_concept].get("out", []): # --- DEBUG PRINT --- # print(f"DEBUG construct_proof (Combined BFS): Input Edge {current_concept} -> {next_concept_input} via {vector_id_input}") # --- END DEBUG PRINT --- if next_concept_input == target_concept: final_path = path + [(current_concept, next_concept_input, vector_id_input, 'input')] final_db_vectors_list = [ self.db_conn.convert_db_vector_to_system_format(db_vector_cache[vid]) for vid in current_used_db_ids if vid in db_vector_cache ] # --- DEBUG PRINT --- # print(f"DEBUG construct_proof (Combined BFS): Target '{target_concept}' reached via input edge. Path: {final_path}") # --- END DEBUG PRINT --- return {"status": "Path found", "path": final_path, "db_vectors_used": final_db_vectors_list} if next_concept_input not in visited: visited[next_concept_input] = 'input' # Помечаем как посещенный через input new_path = path + [(current_concept, next_concept_input, vector_id_input, 'input')] queue.append((next_concept_input, new_path, current_used_db_ids)) # --- Шаг 2: База Данных --- try: current_concept_info = self.db_conn.get_concept_by_name(current_concept) if not current_concept_info: # --- DEBUG PRINT --- # print(f"DEBUG construct_proof (Combined BFS): Concept '{current_concept}' not found in DB for DB search.") # --- END DEBUG PRINT --- continue current_concept_id = current_concept_info['id'] # --- MORE DEBUG --- # print(f"DEBUG _find_path_using_combined_graph: Querying DB vectors for concept '{current_concept}' (ID: {current_concept_id})") # --- END MORE DEBUG --- db_vectors_raw = self.db_conn.get_vectors_for_concept(current_concept_id) # --- MORE DEBUG --- # print(f"DEBUG _find_path_using_combined_graph: Received {len(db_vectors_raw)} vectors from DB for ID {current_concept_id}:") # for dbv in db_vectors_raw: # print(f" - ID: V{dbv.get('id')}, Type: {dbv.get('vector_type')}, Source: {dbv.get('source_name')}, Target: {dbv.get('target_name')}") # --- END MORE DEBUG --- for db_vector in db_vectors_raw: if db_vector['source_id'] == current_concept_id: next_concept_db = db_vector['target_name'] db_vector_actual_id = db_vector['id'] db_vector_system_id = f"V{db_vector_actual_id}" # --- DEBUG PRINT --- # print(f"DEBUG construct_proof (Combined BFS): DB Edge {current_concept} -> {next_concept_db} via {db_vector_system_id}") # --- END DEBUG PRINT --- if db_vector_actual_id not in db_vector_cache: db_vector_cache[db_vector_actual_id] = db_vector new_used_db_ids = current_used_db_ids.union({db_vector_actual_id}) if next_concept_db == target_concept: final_path = path + [(current_concept, next_concept_db, db_vector_system_id, 'db')] final_db_vectors_list = [ self.db_conn.convert_db_vector_to_system_format(db_vector_cache[vid]) for vid in new_used_db_ids if vid in db_vector_cache ] # --- DEBUG PRINT --- # print(f"DEBUG construct_proof (Combined BFS): Target '{target_concept}' reached via DB edge. Path: {final_path}") # --- END DEBUG PRINT --- return {"status": "Path found", "path": final_path, "db_vectors_used": final_db_vectors_list} # Проверяем, был ли узел посещен и откуда current_visit_status = visited.get(next_concept_db) # Добавляем в очередь, ТОЛЬКО если не посещен через input if current_visit_status != 'input': # Если еще не посещался или посещался через db, обновляем/добавляем if current_visit_status is None or current_visit_status == 'db': visited[next_concept_db] = 'db' # Помечаем как посещенный через db new_path = path + [(current_concept, next_concept_db, db_vector_system_id, 'db')] queue.append((next_concept_db, new_path, new_used_db_ids)) except Exception as e: print(f"DB Error during path finding in combined search: {e}") return {"status": "DB error", "reason": str(e)} return {"status": "Path not found (combined)"} # --- Orchestrator Method --- def find_proof_path(self, input_graph, source_concept, target_concept) -> Dict[str, Any]: """ Ищет путь доказательства: сначала только по входным данным, затем с БД. Args: input_graph: Граф, построенный ТОЛЬКО из валидных входных векторов. source_concept: Имя исходного концепта. target_concept: Имя целевого концепта. Returns: Dict: Результат поиска пути (статус, путь, db_vectors_used). """ # Phase 1: Input vectors only # print("DEBUG find_proof_path: Starting Phase 1 (Input Only)") input_path_info = self._find_path_using_input_graph(input_graph, source_concept, target_concept) if input_path_info["status"] == "Path found": # print("DEBUG find_proof_path: Path found in Phase 1. Returning.") return input_path_info # Phase 2: Combined input and DB vectors # print("DEBUG find_proof_path: Path not found in Phase 1. Starting Phase 2 (Combined Input+DB)") combined_path_info = self._find_path_using_combined_graph(input_graph, source_concept, target_concept) if combined_path_info["status"] == "Path not found (combined)": combined_path_info["status"] = "Path not found" # print(f"DEBUG find_proof_path: Phase 2 finished with status: {combined_path_info['status']}") return combined_path_info def _apply_chain_rule(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]: """Логика для правила chain_rule. Возвращает (вывод, валидность_шага).""" if len(premises) != 2: return None, False v1, v2 = premises # Проверяем соответствие паттерну правила if v1["target"] == v2["source"] and \ v1["type"] == "Implication" and \ v2["type"] == "Implication": # Вторая посылка должна быть Implication # Определяем тип вывода (просто берем из правила) conclusion_type = self.inference_rules["chain_rule"]["conclusion_type"] # Формируем вывод conclusion = { "id": f"S{len(self.proof_cache) + 1}", # Генерируем ID для шага "source": v1["source"], "target": v2["target"], "type": conclusion_type, "axis": v1["axis"], # Берем ось из первой посылки (можно уточнить) "justification": f"Derived by chain_rule from {v1['id']} and {v2['id']}", "derived": True # Помечаем, что вектор выведен } return conclusion, True # Шаг валиден return None, False # Правило неприменимо def _apply_causality_transfer(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]: """Логика для правила causality_transfer. Возвращает (вывод, валидность_шага).""" if len(premises) != 2: return None, False v1, v2 = premises # Проверяем соответствие паттерну правила if v1["target"] == v2["source"] and \ v1["type"] == "Causality" and \ v2["type"] == "Causality": # Вторая посылка должна быть Causality conclusion_type = self.inference_rules["causality_transfer"]["conclusion_type"] conclusion = { "id": f"S{len(self.proof_cache) + 1}", "source": v1["source"], "target": v2["target"], "type": conclusion_type, "axis": v1["axis"], "justification": f"Derived by causality_transfer from {v1['id']} and {v2['id']}", "derived": True } return conclusion, True return None, False def _apply_implication_causality_chain(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]: """Логика для правила implication_causality_chain. Возвращает (вывод, валидность_шага).""" if len(premises) != 2: return None, False v1, v2 = premises # Проверяем соответствие паттерну if v1["target"] == v2["source"] and \ v1["type"] == "Implication" and \ v2["type"] == "Causality": conclusion_type = self.inference_rules["implication_causality_chain"]["conclusion_type"] conclusion = { "id": f"S{len(self.proof_cache) + 1}", "source": v1["source"], "target": v2["target"], "type": conclusion_type, "axis": v1["axis"], # Берем ось из первой посылки "justification": f"Derived by implication_causality_chain from {v1['id']} and {v2['id']}", "derived": True } return conclusion, True return None, False # --- New Method for PartOf Rule --- def _apply_part_of_transitivity(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]: """Логика для правила part_of_transitivity.""" if len(premises) != 2: return None, False v1, v2 = premises # Проверяем типы посылок и связь if v1["target"] == v2["source"] and \ v1.get("type") == "PartOf" and \ v2.get("type") == "PartOf": conclusion_type = self.inference_rules["part_of_transitivity"]["conclusion_type"] conclusion = { "id": f"S{len(self.proof_cache) + 1}", "source": v1["source"], "target": v2["target"], "type": conclusion_type, "axis": v1.get("axis", "partonomy"), # Use axis from v1 or default "justification": f"Derived by part_of_transitivity from {v1.get('id', '?')} and {v2.get('id', '?')}", "derived": True } return conclusion, True return None, False # --- Новая логика для правила Action -> Causality --- def _apply_action_causality_chain(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]: """Логика для правила action_causality_chain. Возвращает (вывод, валидность_шага).""" if len(premises) != 2: return None, False v1, v2 = premises # Проверяем соответствие паттерну правила: A->B (Action), B->C (Causality) if v1["target"] == v2["source"] and \ v1.get("type") == "Action" and \ v2.get("type") == "Causality": conclusion_type = self.inference_rules["action_causality_chain"]["conclusion_type"] conclusion = { "id": f"S{len(self.proof_cache) + 1}", "source": v1["source"], "target": v2["target"], "type": conclusion_type, "axis": v1.get("axis", v2.get("axis")), # Ось можно взять из Action или Causality "justification": f"Derived by action_causality_chain from {v1.get('id', '?')} and {v2.get('id', '?')}", "derived": True } return conclusion, True # Шаг считаем валидным, если правило применилось return None, False # --- Логика для правила Action -> IsA --- def _apply_action_isa_generalization(self, premises: List[Dict]) -> Tuple[Optional[Dict], bool]: """Логика для правила action_isa_generalization.""" if len(premises) != 2: return None, False v_action, v_isa = premises # Ожидаем Action, затем IsA # Проверяем типы и связь: A -> B_inst (Action), B_inst IsA B_type if v_action.get("type") == "Action" and \ v_isa.get("type") == "IsA" and \ v_action.get("target") == v_isa.get("source"): # Target(Action) == Source(IsA) conclusion_type = self.inference_rules["action_isa_generalization"]["conclusion_type"] source_a = v_action.get("source") target_b_type = v_isa.get("target") # Берем тип из IsA conclusion = { "id": f"S{len(self.proof_cache) + 1}", "source": source_a, "target": target_b_type, "type": conclusion_type, # Тип сохраняется как Action "axis": v_action.get("axis"), # Ось берем из Action "justification": f"Derived by action_isa_generalization from {v_action.get('id', '?')} and {v_isa.get('id', '?')}", "derived": True } # print(f"DEBUG _apply_action_isa_generalization: Applied. Conclusion: {conclusion}") # Временный дебаг return conclusion, True # print(f"DEBUG _apply_action_isa_generalization: Rule not applicable. v_action type: {v_action.get('type')}, v_isa type: {v_isa.get('type')}, link: {v_action.get('target')} == {v_isa.get('source')}") # Временный дебаг return None, False def apply_inference_rule(self, rule_name: str, premises: List[Dict]) -> Tuple[Optional[Dict], bool]: """Применяет правило вывода, возвращая (вывод, валидность_шага).""" rule_functions = { "chain_rule": self._apply_chain_rule, "causality_transfer": self._apply_causality_transfer, "implication_causality_chain": self._apply_implication_causality_chain, "part_of_transitivity": self._apply_part_of_transitivity, "action_causality_chain": self._apply_action_causality_chain, # Добавляем новое правило "action_isa_generalization": self._apply_action_isa_generalization # Добавляем еще одно новое правило } conclusion, is_step_valid = None, False if rule_name in rule_functions: # Предполагаем, что в premises УЖЕ только валидные векторы conclusion, is_step_valid = rule_functions[rule_name](premises) # --- DEBUG PRINT --- # print(f"DEBUG apply_inference_rule: Rule='{rule_name}', Premises={[p.get('id', '?') for p in premises]}, Conclusion='{conclusion.get('id', None) if conclusion else None}', StepValid={is_step_valid}") # --- END DEBUG PRINT --- if conclusion: # Кэшируем только структуру успешного вывода self.proof_cache[conclusion["id"]] = conclusion return conclusion, is_step_valid def construct_proof(self, vectors_for_proof: List[Dict], source_concept: str, target_concept: str) -> Dict: """Построение доказательства от source_concept к target_concept. Использует граф из предоставленных векторов (входных + временных IsA) и динамически подгружает векторы из БД. """ # --- DEBUG PRINT --- # print(f"\\nDEBUG construct_proof: Start. Query: {source_concept} -> {target_concept}") # print(f"DEBUG construct_proof: Input vectors count: {len(vectors_for_proof)}") # --- END DEBUG PRINT --- # Убираем instance_definitions из параметров valid_vectors_input = vectors_for_proof # Переименуем для консистентности с кодом ниже if not valid_vectors_input and not self.db_conn.get_concept_by_name(source_concept): # --- DEBUG PRINT --- # print("DEBUG construct_proof: Failed - No input vectors and source concept not found in DB.") # --- END DEBUG PRINT --- return {"status": "Failed", "reason": "No input vectors and source concept not found in DB", "is_valid": False} # --- Теперь строим граф и vector_map из ВСЕХ предоставленных векторов --- input_graph = self._build_proof_graph(valid_vectors_input) vector_map = {v["id"]: v for v in valid_vectors_input} # --- DEBUG PRINT --- # print(f"DEBUG construct_proof: Built input graph with {len(input_graph['nodes'])} nodes and {len(input_graph['edges'])} edges.") # print(f"DEBUG construct_proof: Vector map keys: {list(vector_map.keys())}") # --- MORE DEBUG --- import pprint # print(f"DEBUG construct_proof: Input Graph Adjacency:\n{pprint.pformat(input_graph.get('adjacency', {}))}") # --- END MORE DEBUG --- # --- END DEBUG PRINT --- # --------------------------------------------------------------------- # Ищем путь: сначала только входные, потом с БД path_info = self.find_proof_path(input_graph, source_concept, target_concept) # --- DEBUG PRINT --- # print(f"DEBUG construct_proof: Path finding result: Status='{path_info.get('status')}', Path length={len(path_info.get('path', []))}") # --- END DEBUG PRINT --- if path_info.get("status") != "Path found": # --- DEBUG PRINT --- # print(f"DEBUG construct_proof: Failed - Path not found. Reason: {path_info.get('status', 'Unknown')}") # --- END DEBUG PRINT --- return {"status": "Failed", "reason": f"Path not found: {path_info.get('status', 'Unknown')}", "is_valid": False} path = path_info["path"] db_vectors_used = path_info.get("db_vectors_used", []) # --- MORE DEBUG --- # print(f"DEBUG construct_proof: Found Path: {path}") # print(f"DEBUG construct_proof: DB Vectors Used: {[v.get('id') for v in db_vectors_used]}") # --- END MORE DEBUG --- # Добавляем векторы из БД в vector_map for db_vec in db_vectors_used: if db_vec["id"] not in vector_map: vector_map[db_vec["id"]] = db_vec # --- MORE DEBUG --- # print(f"DEBUG construct_proof: Vector Map Contents:") # for vid, vdata in vector_map.items(): # print(f" - {vid}: Type={vdata.get('type')}, Source={vdata.get('source')}, Target={vdata.get('target')}") # --- END MORE DEBUG --- # --- Проверка на прямой путь --- if len(path) == 1: direct_vector_id = path[0][2] direct_vector = vector_map.get(direct_vector_id) # --- DEBUG PRINT --- # print(f"DEBUG construct_proof: Path length is 1. Direct vector ID: {direct_vector_id}") # --- END DEBUG PRINT --- if direct_vector: # --- DEBUG PRINT --- # print(f"DEBUG construct_proof: Success - Direct proof found using vector {direct_vector_id}.") # --- END DEBUG PRINT --- return { "status": "Success", "source": source_concept, "target": target_concept, "steps": [], # Нет шагов для прямого доказательства "rule": "direct", # Указываем, что это прямой путь "direct_vector_id": direct_vector_id, "is_valid": True, # Прямой путь считается валидным "final_conclusion_type": direct_vector.get("type"), "metadata": {} # Пока без метаданных о цикле здесь } else: # --- DEBUG PRINT --- # print(f"DEBUG construct_proof: Failed - Direct vector {direct_vector_id} not found in map.") # --- END DEBUG PRINT --- return {"status": "Failed", "reason": f"Direct vector {direct_vector_id} not found", "is_valid": False} # --- Построение доказательства по шагам --- steps = [] current_premise = None # Будет содержать ВЕКТОР (словарь) overall_validity = True # Валидность всего доказательства cycle_warning = None visited_nodes_in_proof = {source_concept} # Для обнаружения циклов во время ПОСТРОЕНИЯ # --- DEBUG PRINT --- # print("DEBUG construct_proof: Starting step-by-step construction...") # --- END DEBUG PRINT --- for i, (seg_source, seg_target, vector_id, origin) in enumerate(path): # --- DEBUG PRINT --- # print(f"DEBUG construct_proof: Processing segment {i+1}/{len(path)}: {seg_source} -> {seg_target} via {vector_id} (from {origin})") # --- END DEBUG PRINT --- # Защита от отсутствия вектора в карте (на всякий случай) premise2 = vector_map.get(vector_id) if not premise2: # --- DEBUG PRINT --- # print(f"DEBUG construct_proof: Failed - Vector {vector_id} for segment {i+1} not found in map.") # --- END DEBUG PRINT --- overall_validity = False return {"status": "Failed", "reason": f"Vector {vector_id} not found during step construction", "is_valid": False} premise2_source = origin # 'input' or 'db' if current_premise is None: current_premise = premise2 source1 = premise2_source # Источник первой посылки - сам этот вектор # --- DEBUG PRINT --- # print(f"DEBUG construct_proof: Segment {i+1}: Initial premise set to {current_premise.get('id')}") # --- MORE DEBUG --- # print(f"DEBUG construct_proof: Segment {i+1}: Initial premise set to: ID={current_premise.get('id', 'NO_ID')}, Type={current_premise.get('type', 'NO_TYPE')}, Source={current_premise.get('source')}, Target={current_premise.get('target')}") # --- END MORE DEBUG --- # --- END DEBUG PRINT --- else: premises = [current_premise, premise2] premise_ids = [p.get("id", "?") for p in premises] source1 = "derived" if current_premise.get("derived") else current_premise.get("origin", "input") # Откуда первая посылка? conclusion = None is_step_valid = False rule_name = None # --- DEBUG PRINT --- # print(f"DEBUG construct_proof: Segment {i+1}: Trying to apply rules. Premise1='{premises[0].get('id')}' ({source1}), Premise2='{premises[1].get('id')}' ({premise2_source})") # --- MORE DEBUG --- prem1_id = current_premise.get('id', 'NO_ID') prem1_type = current_premise.get('type', 'NO_TYPE') prem2_id = premise2.get('id', 'NO_ID') prem2_type = premise2.get('type', 'NO_TYPE') # print(f"DEBUG construct_proof: Applying rules. Premise1: ID={prem1_id}, Type={prem1_type} | Premise2: ID={prem2_id}, Type={prem2_type}") # --- MORE DEBUG --- # print(f" Premise1 Details: Source={current_premise.get('source')}, Target={current_premise.get('target')}") # print(f" Premise2 Details: Source={premise2.get('source')}, Target={premise2.get('target')}") # --- END MORE DEBUG --- # --- END DEBUG PRINT --- # Применяем подходящее правило for key in self.inference_rules.keys(): temp_conclusion, temp_valid = self.apply_inference_rule(key, premises) if temp_conclusion: conclusion = temp_conclusion is_step_valid = temp_valid rule_name = key # --- DEBUG PRINT --- # print(f"DEBUG construct_proof: Segment {i+1}: Applied rule '{rule_name}'. Conclusion='{conclusion.get('id')}', StepValid={is_step_valid}") # --- END DEBUG PRINT --- break # Нашли подходящее правило if conclusion: conclusion["origin"] = "derived" # Помечаем, что вывод получен step_detail = { "id": conclusion["id"], "rule": rule_name, "premises": premise_ids, "conclusion": conclusion, "is_valid": is_step_valid, "premise1_source": source1, "premise2_source": premise2_source, # 'input' or 'db' # --- DEBUG PRINT --- # "debug_premise1": current_premise, # "debug_premise2": premise2 # --- END DEBUG PRINT --- } steps.append(step_detail) current_premise = conclusion # Результат этого шага становится первой посылкой для следующего if not is_step_valid: overall_validity = False # --- DEBUG PRINT --- # print(f"DEBUG construct_proof: Segment {i+1}: Step marked invalid, setting overall validity to False.") # --- END DEBUG PRINT --- # Можно прервать, если один шаг невалиден? Или достроить? Пока достраиваем. # Проверка на цикл в построении target_node = conclusion.get("target") if target_node in visited_nodes_in_proof: cycle_warning = f"Cycle detected during proof construction: revisiting node '{target_node}'" # --- DEBUG PRINT --- # print(f"DEBUG construct_proof: Segment {i+1}: {cycle_warning}") # --- END DEBUG PRINT --- else: visited_nodes_in_proof.add(target_node) else: # Не смогли применить правило - доказательство невалидно overall_validity = False # --- DEBUG PRINT --- # print(f"DEBUG construct_proof: Failed - No applicable rule found for premises {premise_ids} in segment {i+1}.") # --- END DEBUG PRINT --- return {"status": "Failed", "reason": f"No inference rule applicable for premises {premise_ids}", "is_valid": False} # --- Финальное формирование результата --- final_conclusion = current_premise # Последний вывод - это и есть результат # Проверка, что финальный вывод соответствует запросу if not final_conclusion or \ final_conclusion.get("source") != source_concept or \ final_conclusion.get("target") != target_concept: # --- DEBUG PRINT --- final_src = final_conclusion.get('source') if final_conclusion else 'None' final_tgt = final_conclusion.get('target') if final_conclusion else 'None' # print(f"DEBUG construct_proof: Failed - Final conclusion mismatch. Expected={source_concept}->{target_concept}, Got={final_src}->{final_tgt}") # --- END DEBUG PRINT --- overall_validity = False # Статус все еще может быть Success, но is_valid = False? Или статус Failed? # Сделаем статус Failed, если вывод не совпал. return { "status": "Failed", "reason": f"Final conclusion mismatch: expected {source_concept}->{target_concept}, got {final_conclusion.get('source') if final_conclusion else 'N/A'}->{final_conclusion.get('target') if final_conclusion else 'N/A'}", "is_valid": False, "source": source_concept, "target": target_concept, "steps": steps, "metadata": {"cycle_warning": cycle_warning} if cycle_warning else {} } final_result = { "status": "Success", # Если дошли сюда, структура доказательства построена "source": source_concept, "target": target_concept, "steps": steps, "is_valid": overall_validity, # Валидность зависит от валидности всех шагов "final_conclusion_type": final_conclusion.get("type"), "metadata": {"cycle_warning": cycle_warning} if cycle_warning else {} } # --- DEBUG PRINT --- # print(f"DEBUG construct_proof: Finished successfully.") # print(f"DEBUG construct_proof: Final Result Status: {final_result['status']}") # print(f"DEBUG construct_proof: Final Result IsValid: {final_result['is_valid']}") # print(f"DEBUG construct_proof: Final Result Steps Count: {len(final_result['steps'])}") # if final_result['steps']: # for idx, step in enumerate(final_result['steps']): # print(f" Step {idx+1} ({step['id']}): Rule='{step['rule']}', Premises={step['premises']}, Valid={step['is_valid']}, Conc={step['conclusion']['source']}->{step['conclusion']['target']}") # print(f"DEBUG construct_proof: Final Conclusion Type: {final_result['final_conclusion_type']}") # print(f"DEBUG construct_proof: Metadata: {final_result['metadata']}") # --- END DEBUG PRINT --- return final_result def _build_proof_graph(self, vectors): """Вспомогательная функция для построения графа из векторов""" graph = {"nodes": set(), "edges": [], "adjacency": {}} for v in vectors: source, target, v_id = v["source"], v["target"], v["id"] graph["nodes"].add(source) graph["nodes"].add(target) graph["edges"].append((source, target, v_id)) # Обновление списка смежности if source not in graph["adjacency"]: graph["adjacency"][source] = {"out": [], "in": []} if target not in graph["adjacency"]: graph["adjacency"][target] = {"out": [], "in": []} graph["adjacency"][source]["out"].append((target, v_id)) graph["adjacency"][target]["in"].append((source, v_id)) return graph if __name__ == "__main__": print(f"SFOSR Integrated System v{SFOSR_CONFIG['version']}") print("Готов к анализу смысловых структур.") # Пример входных данных (используем концепты из БД для демонстрации) example = { "text": "Emergence leads to multi-level space, which causes cross-level causation, finally leading to regulatory flow.", "vectors": [ { "id": "V_EC_MLS", "source": "emergent_complexity", "target": "multi_level_space", "type": "Implication", "axis": "structure <-> hierarchy", "justification": "Emergence creates levels" }, { "id": "V_MLS_CLC", "source": "multi_level_space", "target": "cross_level_causation", "type": "Causality", # Тип Causality "axis": "level <-> interaction", "justification": "Levels imply cross-level effects" }, { "id": "V_CLC_RF", "source": "cross_level_causation", "target": "regulatory_flow", "type": "Causality", # Тип Causality "axis": "cause <-> regulation", "justification": "Cross-level effects drive regulation" } ], "proof_query": { "source": "multi_level_space", "target": "regulatory_flow" } } # Создаем и тестируем интегрированную систему system = SFOSRSystem() result = system.process(example) # Выводим результаты (обновлено для бинарной валидности) print("\n--- Результаты Обработки ---") print(f"Статус: {result['status']}") if result['status'] == 'Success': print(f"Компилируемость: {result['analysis']['is_compilable']}") print("\n--- Верификация ---") print(f"Всего обработано векторов: {result['verification']['total_vectors_processed']}") print(f"Валидных векторов: {result['verification']['valid_count']}") print(f"Уровень соответствия: {result['verification']['compliance_rate'] * 100:.1f}%") print("Данные по Векторам:") for v_id, data in result['verification']['vectors_data'].items(): valid_str = "Валиден" if data['is_valid'] else "Не валиден" issues_str = ', '.join(data['issues']) if data['issues'] else 'Нет' print(f" - {v_id}: Статус={valid_str}, Проблемы: {issues_str}") if "proof" in result: print("\n--- Доказательство ---") proof = result['proof'] print(f"Статус: {proof['status']}") valid_proof_str = "Валидно" if proof.get('is_valid', False) else "Не валидно" print(f"Общая Валидность: {valid_proof_str}") if proof['status'] == 'Success': print(f"Доказательство: {proof['source']} → {proof['target']}") print(f"Тип финального вывода: {proof.get('final_conclusion_type', 'N/A')}") print("Шаги доказательства:") if proof['steps']: for step in proof['steps']: step_valid_str = "Валиден" if step['is_valid'] else "Не валиден" print(f" - {step['id']}: Правило={step['rule']}, Посылки=({', '.join(step['premises'])}) | Статус={step_valid_str}") if step['conclusion']: print(f" Вывод: {step['conclusion']['source']} → {step['conclusion']['target']} ({step['conclusion']['type']})") else: print(f" Вывод: None (Ошибка: {step.get('reason', '')})") else: print(" (Нет шагов)") elif 'reason' in proof: print(f"Причина неудачи: {proof['reason']}") elif 'details' in result and 'validation_issues' in result['details']: print("\n--- Ошибки Анализа ---") for issue in result['details']['validation_issues']: print(f" - {issue}")