""" SFOSR Database Module Обеспечивает взаимодействие с базой данных SFOSR. Предоставляет методы для извлечения аксиом, правил вывода, концептов и их свойств, необходимых для работы системы. """ import sqlite3 import json from typing import Dict, List, Any, Optional, Tuple, Union, Set class SFOSRDatabase: """ Класс для работы с базой данных SFOSR Предоставляет интерфейс для: - Получения аксиом и правил вывода - Извлечения информации о концептах - Получения векторных связей между концептами - Добавления новых данных в базу знаний """ def __init__(self, db_path="sfosr.db"): """Инициализация соединения с БД""" self.db_path = db_path self.connection = None def connect(self): """Подключение к БД""" # Возвращаем новое соединение каждый раз # row_factory установим здесь же connection = sqlite3.connect(self.db_path) connection.row_factory = sqlite3.Row return connection # Добавляем контекстный менеджер def __enter__(self): self.connection = self.connect() return self.connection def __exit__(self, exc_type, exc_val, exc_tb): if self.connection: self.connection.close() self.connection = None # Сбрасываем соединение def get_axioms(self) -> List[Dict]: """Получение всех аксиом из БД""" with self as conn: # Используем with cursor = conn.cursor() cursor.execute("SELECT * FROM axioms") axioms = [dict(row) for row in cursor.fetchall()] return axioms def get_inference_rules(self) -> List[Dict]: """Получение всех правил вывода из БД""" with self as conn: cursor = conn.cursor() cursor.execute("SELECT id, name, description, pattern, premise_types, conclusion_types, domain FROM inference_rules") rules = [dict(row) for row in cursor.fetchall()] return rules def get_concept_by_name(self, name: str) -> Optional[Dict]: """Поиск концепта по имени""" with self as conn: cursor = conn.cursor() cursor.execute("SELECT id, name, description, domain, level FROM concepts WHERE name=?", (name,)) concept = cursor.fetchone() return dict(concept) if concept else None def get_all_concepts(self) -> List[Dict]: """Получение всех концептов из БД""" with self as conn: cursor = conn.cursor() cursor.execute("SELECT id, name, description, domain, level FROM concepts") concepts = [dict(row) for row in cursor.fetchall()] return concepts def get_all_concept_names(self) -> Set[str]: """Получение имен всех концептов из БД""" with self as conn: cursor = conn.cursor() cursor.execute("SELECT name FROM concepts") names = {row['name'] for row in cursor.fetchall()} return names def get_vectors_for_concept(self, concept_id: int) -> List[Dict]: """ Получение всех векторов, связанных с концептом Args: concept_id: ID концепта Returns: Список векторов с именами источника и цели """ with self as conn: cursor = conn.cursor() cursor.execute(""" SELECT v.id, v.source_id, v.target_id, v.vector_type, v.axis, v.justification, c1.name as source_name, c2.name as target_name FROM vectors v JOIN concepts c1 ON v.source_id = c1.id JOIN concepts c2 ON v.target_id = c2.id WHERE v.source_id=? OR v.target_id=? """, (concept_id, concept_id)) vectors = [dict(row) for row in cursor.fetchall()] return vectors def get_concept_properties(self, concept_id: int) -> Dict[str, Any]: """ Получение всех свойств концепта Args: concept_id: ID концепта Returns: Словарь свойств в формате {имя_свойства: значение} """ with self as conn: cursor = conn.cursor() cursor.execute(""" SELECT property_name, property_value FROM concept_properties WHERE concept_id=? """, (concept_id,)) properties = {} for row in cursor.fetchall(): prop_name = row['property_name'] prop_value = row['property_value'] try: if isinstance(prop_value, str) and (prop_value.startswith('[') or prop_value.startswith('{')): prop_value = json.loads(prop_value) except (json.JSONDecodeError, TypeError): pass properties[prop_name] = prop_value return properties def get_complete_concept_info(self, concept_name: str) -> Optional[Dict]: """ Получение полной информации о концепте Args: concept_name: Имя концепта Returns: Словарь с информацией о концепте, его свойствах и связях """ concept = self.get_concept_by_name(concept_name) if not concept: return None concept_id = concept['id'] properties = self.get_concept_properties(concept_id) vectors = self.get_vectors_for_concept(concept_id) return { "concept": concept, "properties": properties, "vectors": vectors } def get_related_concepts(self, concept_id: int, depth: int = 1) -> List[Dict]: """ Получение связанных концептов с заданной глубиной Args: concept_id: ID исходного концепта depth: Глубина поиска связей (1 = только прямые связи) Returns: Список связанных концептов """ if depth <= 0: return [] # Получаем прямые связи vectors = self.get_vectors_for_concept(concept_id) related_ids = set() for vector in vectors: if vector['source_id'] != concept_id: related_ids.add(vector['source_id']) if vector['target_id'] != concept_id: related_ids.add(vector['target_id']) # Рекурсивно получаем связи с заданной глубиной all_related = [] for related_id in related_ids: with self as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM concepts WHERE id=?", (related_id,)) concept = cursor.fetchone() if concept: related_concept = dict(concept) all_related.append(related_concept) # Если нужна большая глубина, рекурсивно получаем связанные концепты if depth > 1: deeper_related = self.get_related_concepts(related_id, depth - 1) all_related.extend(deeper_related) return all_related def find_path_between_concepts(self, source_name: str, target_name: str, max_depth: int = 3) -> List[Dict]: """ Поиск пути между двумя концептами Args: source_name: Имя исходного концепта target_name: Имя целевого концепта max_depth: Максимальная глубина поиска Returns: Список векторов, образующих путь между концептами """ source = self.get_concept_by_name(source_name) target = self.get_concept_by_name(target_name) if not source or not target: return [] # Поиск в ширину для нахождения пути visited_concepts = {source['id']} # Храним ID концептов, чтобы не зацикливаться queue = [(source['id'], [])] # (id_концепта, path_из_векторов_до_него) # Ограничиваем глубину поиска current_depth = 0 nodes_at_current_depth = 1 nodes_at_next_depth = 0 while queue and current_depth < max_depth: if nodes_at_current_depth == 0: current_depth += 1 nodes_at_current_depth = nodes_at_next_depth nodes_at_next_depth = 0 if current_depth >= max_depth: # Проверка после инкремента глубины break current_id, path = queue.pop(0) nodes_at_current_depth -= 1 # Проверка, не достигли ли мы цели if current_id == target['id']: return path # Возвращаем список векторов # Получаем связанные векторы для текущего концепта # Используем get_vectors_for_concept, так как он возвращает нужные данные connected_vectors = self.get_vectors_for_concept(current_id) for vector in connected_vectors: next_id = None # Определяем следующий концепт в пути if vector['source_id'] == current_id and vector['target_id'] not in visited_concepts: next_id = vector['target_id'] elif vector['target_id'] == current_id and vector['source_id'] not in visited_concepts: next_id = vector['source_id'] if next_id: visited_concepts.add(next_id) # Добавляем сам вектор (как словарь) в путь new_path = path + [vector] queue.append((next_id, new_path)) nodes_at_next_depth += 1 return [] # Путь не найден в пределах max_depth def convert_db_vector_to_system_format(self, db_vector: Dict) -> Dict: """ Преобразование вектора из формата БД в формат системы SFOSR Args: db_vector: Вектор в формате БД Returns: Вектор в формате системы SFOSR """ return { "id": f"V{db_vector['id']}", "source": db_vector['source_name'], "target": db_vector['target_name'], "type": db_vector['vector_type'], "axis": db_vector['axis'], "justification": db_vector['justification'] } # Методы для обновления БД def add_concept(self, name: str, description: str, domain: str, level: str) -> int: """ Добавление нового концепта в БД Args: name: Имя концепта description: Описание концепта domain: Домен (область знаний) level: Уровень абстракции Returns: ID добавленного концепта """ with self as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO concepts (name, description, domain, level) VALUES (?, ?, ?, ?) """, (name, description, domain, level)) new_id = cursor.lastrowid conn.commit() return new_id def add_concept_property(self, concept_id: int, property_name: str, property_value: Union[str, List, Dict]) -> int: """ Добавление свойства концепта Args: concept_id: ID концепта property_name: Имя свойства property_value: Значение свойства (строка или JSON) Returns: ID добавленного свойства """ # Если значение не строка, преобразуем в JSON if not isinstance(property_value, str): property_value = json.dumps(property_value) with self as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO concept_properties (concept_id, property_name, property_value) VALUES (?, ?, ?) """, (concept_id, property_name, property_value)) new_id = cursor.lastrowid conn.commit() return new_id def add_vector(self, source_id: int, target_id: int, vector_type: str, axis: str, justification: Optional[str] = None) -> int: """ Добавление нового вектора (связи между концептами) Args: source_id: ID исходного концепта target_id: ID целевого концепта vector_type: Тип вектора axis: Семантическая ось justification: Обоснование связи Returns: ID добавленного вектора """ with self as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO vectors (source_id, target_id, vector_type, axis, justification) VALUES (?, ?, ?, ?, ?) """, (source_id, target_id, vector_type, axis, justification)) new_id = cursor.lastrowid conn.commit() return new_id def add_axiom(self, name: str, description: str, formulation: str, domain: str) -> int: """ Добавление новой аксиомы Args: name: Имя аксиомы description: Описание аксиомы formulation: Формальная формулировка domain: Домен (область применения) Returns: ID добавленной аксиомы """ with self as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO axioms (name, description, formulation, domain) VALUES (?, ?, ?, ?) """, (name, description, formulation, domain)) new_id = cursor.lastrowid conn.commit() return new_id def add_inference_rule(self, name: str, description: str, pattern: str, premise_types: str, conclusion_types: str, domain: str) -> int: """ Добавление нового правила вывода Args: name: Имя правила description: Описание правила pattern: Паттерн вывода premise_types: Типы посылок conclusion_types: Типы выводов domain: Домен (область применения) Returns: ID добавленного правила """ with self as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO inference_rules (name, description, pattern, premise_types, conclusion_types, domain) VALUES (?, ?, ?, ?, ?, ?) """, (name, description, pattern, premise_types, conclusion_types, domain)) new_id = cursor.lastrowid conn.commit() return new_id def get_all_vectors(self): """Получить все векторы из базы данных""" query = """ SELECT v.id, v.source_id, v.target_id, v.vector_type, v.axis, v.justification, s.name as source_name, t.name as target_name, v.is_valid FROM vectors v JOIN concepts s ON v.source_id = s.id JOIN concepts t ON v.target_id = t.id WHERE v.is_valid = 1 """ with self as conn: cursor = conn.cursor() cursor.execute(query) rows = cursor.fetchall() vectors = [] for row in rows: vector = { "id": f"V{row[0]}", # Добавляем префикс V к ID "source_name": row[6], "target_name": row[7], "type": row[3], # vector_type из БД становится type в объекте "axis": row[4], "justification": row[5], "is_valid": bool(row[8]) } vectors.append(vector) return vectors