SFOSR / sfosr_core /sfosr_database.py
DanielSwift's picture
Initial SFOSR system with Gradio interface
2249e80
raw
history blame
18.6 kB
"""
SFOSR Database Module
Обеспечивает взаимодействие с базой данных SFOSR.
Предоставляет методы для извлечения аксиом, правил вывода,
концептов и их свойств, необходимых для работы системы.
"""
import sqlite3
import json
from typing import Dict, List, Any, Optional, Tuple, Union, Set
class SFOSRDatabase:
"""
Класс для работы с базой данных SFOSR
Предоставляет интерфейс для:
- Получения аксиом и правил вывода
- Извлечения информации о концептах
- Получения векторных связей между концептами
- Добавления новых данных в базу знаний
"""
def __init__(self, db_path="sfosr.db"):
"""Инициализация соединения с БД"""
self.db_path = db_path
self.connection = None
def connect(self):
"""Подключение к БД"""
# Возвращаем новое соединение каждый раз
# row_factory установим здесь же
connection = sqlite3.connect(self.db_path)
connection.row_factory = sqlite3.Row
return connection
# Добавляем контекстный менеджер
def __enter__(self):
self.connection = self.connect()
return self.connection
def __exit__(self, exc_type, exc_val, exc_tb):
if self.connection:
self.connection.close()
self.connection = None # Сбрасываем соединение
def get_axioms(self) -> List[Dict]:
"""Получение всех аксиом из БД"""
with self as conn: # Используем with
cursor = conn.cursor()
cursor.execute("SELECT * FROM axioms")
axioms = [dict(row) for row in cursor.fetchall()]
return axioms
def get_inference_rules(self) -> List[Dict]:
"""Получение всех правил вывода из БД"""
with self as conn:
cursor = conn.cursor()
cursor.execute("SELECT id, name, description, pattern, premise_types, conclusion_types, domain FROM inference_rules")
rules = [dict(row) for row in cursor.fetchall()]
return rules
def get_concept_by_name(self, name: str) -> Optional[Dict]:
"""Поиск концепта по имени"""
with self as conn:
cursor = conn.cursor()
cursor.execute("SELECT id, name, description, domain, level FROM concepts WHERE name=?", (name,))
concept = cursor.fetchone()
return dict(concept) if concept else None
def get_all_concepts(self) -> List[Dict]:
"""Получение всех концептов из БД"""
with self as conn:
cursor = conn.cursor()
cursor.execute("SELECT id, name, description, domain, level FROM concepts")
concepts = [dict(row) for row in cursor.fetchall()]
return concepts
def get_all_concept_names(self) -> Set[str]:
"""Получение имен всех концептов из БД"""
with self as conn:
cursor = conn.cursor()
cursor.execute("SELECT name FROM concepts")
names = {row['name'] for row in cursor.fetchall()}
return names
def get_vectors_for_concept(self, concept_id: int) -> List[Dict]:
"""
Получение всех векторов, связанных с концептом
Args:
concept_id: ID концепта
Returns:
Список векторов с именами источника и цели
"""
with self as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT v.id, v.source_id, v.target_id, v.vector_type, v.axis, v.justification,
c1.name as source_name, c2.name as target_name
FROM vectors v
JOIN concepts c1 ON v.source_id = c1.id
JOIN concepts c2 ON v.target_id = c2.id
WHERE v.source_id=? OR v.target_id=?
""", (concept_id, concept_id))
vectors = [dict(row) for row in cursor.fetchall()]
return vectors
def get_concept_properties(self, concept_id: int) -> Dict[str, Any]:
"""
Получение всех свойств концепта
Args:
concept_id: ID концепта
Returns:
Словарь свойств в формате {имя_свойства: значение}
"""
with self as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT property_name, property_value
FROM concept_properties
WHERE concept_id=?
""", (concept_id,))
properties = {}
for row in cursor.fetchall():
prop_name = row['property_name']
prop_value = row['property_value']
try:
if isinstance(prop_value, str) and (prop_value.startswith('[') or prop_value.startswith('{')):
prop_value = json.loads(prop_value)
except (json.JSONDecodeError, TypeError):
pass
properties[prop_name] = prop_value
return properties
def get_complete_concept_info(self, concept_name: str) -> Optional[Dict]:
"""
Получение полной информации о концепте
Args:
concept_name: Имя концепта
Returns:
Словарь с информацией о концепте, его свойствах и связях
"""
concept = self.get_concept_by_name(concept_name)
if not concept:
return None
concept_id = concept['id']
properties = self.get_concept_properties(concept_id)
vectors = self.get_vectors_for_concept(concept_id)
return {
"concept": concept,
"properties": properties,
"vectors": vectors
}
def get_related_concepts(self, concept_id: int, depth: int = 1) -> List[Dict]:
"""
Получение связанных концептов с заданной глубиной
Args:
concept_id: ID исходного концепта
depth: Глубина поиска связей (1 = только прямые связи)
Returns:
Список связанных концептов
"""
if depth <= 0:
return []
# Получаем прямые связи
vectors = self.get_vectors_for_concept(concept_id)
related_ids = set()
for vector in vectors:
if vector['source_id'] != concept_id:
related_ids.add(vector['source_id'])
if vector['target_id'] != concept_id:
related_ids.add(vector['target_id'])
# Рекурсивно получаем связи с заданной глубиной
all_related = []
for related_id in related_ids:
with self as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM concepts WHERE id=?", (related_id,))
concept = cursor.fetchone()
if concept:
related_concept = dict(concept)
all_related.append(related_concept)
# Если нужна большая глубина, рекурсивно получаем связанные концепты
if depth > 1:
deeper_related = self.get_related_concepts(related_id, depth - 1)
all_related.extend(deeper_related)
return all_related
def find_path_between_concepts(self, source_name: str, target_name: str, max_depth: int = 3) -> List[Dict]:
"""
Поиск пути между двумя концептами
Args:
source_name: Имя исходного концепта
target_name: Имя целевого концепта
max_depth: Максимальная глубина поиска
Returns:
Список векторов, образующих путь между концептами
"""
source = self.get_concept_by_name(source_name)
target = self.get_concept_by_name(target_name)
if not source or not target:
return []
# Поиск в ширину для нахождения пути
visited_concepts = {source['id']} # Храним ID концептов, чтобы не зацикливаться
queue = [(source['id'], [])] # (id_концепта, path_из_векторов_до_него)
# Ограничиваем глубину поиска
current_depth = 0
nodes_at_current_depth = 1
nodes_at_next_depth = 0
while queue and current_depth < max_depth:
if nodes_at_current_depth == 0:
current_depth += 1
nodes_at_current_depth = nodes_at_next_depth
nodes_at_next_depth = 0
if current_depth >= max_depth: # Проверка после инкремента глубины
break
current_id, path = queue.pop(0)
nodes_at_current_depth -= 1
# Проверка, не достигли ли мы цели
if current_id == target['id']:
return path # Возвращаем список векторов
# Получаем связанные векторы для текущего концепта
# Используем get_vectors_for_concept, так как он возвращает нужные данные
connected_vectors = self.get_vectors_for_concept(current_id)
for vector in connected_vectors:
next_id = None
# Определяем следующий концепт в пути
if vector['source_id'] == current_id and vector['target_id'] not in visited_concepts:
next_id = vector['target_id']
elif vector['target_id'] == current_id and vector['source_id'] not in visited_concepts:
next_id = vector['source_id']
if next_id:
visited_concepts.add(next_id)
# Добавляем сам вектор (как словарь) в путь
new_path = path + [vector]
queue.append((next_id, new_path))
nodes_at_next_depth += 1
return [] # Путь не найден в пределах max_depth
def convert_db_vector_to_system_format(self, db_vector: Dict) -> Dict:
"""
Преобразование вектора из формата БД в формат системы SFOSR
Args:
db_vector: Вектор в формате БД
Returns:
Вектор в формате системы SFOSR
"""
return {
"id": f"V{db_vector['id']}",
"source": db_vector['source_name'],
"target": db_vector['target_name'],
"type": db_vector['vector_type'],
"axis": db_vector['axis'],
"justification": db_vector['justification']
}
# Методы для обновления БД
def add_concept(self, name: str, description: str, domain: str, level: str) -> int:
"""
Добавление нового концепта в БД
Args:
name: Имя концепта
description: Описание концепта
domain: Домен (область знаний)
level: Уровень абстракции
Returns:
ID добавленного концепта
"""
with self as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO concepts (name, description, domain, level)
VALUES (?, ?, ?, ?)
""", (name, description, domain, level))
new_id = cursor.lastrowid
conn.commit()
return new_id
def add_concept_property(self, concept_id: int, property_name: str, property_value: Union[str, List, Dict]) -> int:
"""
Добавление свойства концепта
Args:
concept_id: ID концепта
property_name: Имя свойства
property_value: Значение свойства (строка или JSON)
Returns:
ID добавленного свойства
"""
# Если значение не строка, преобразуем в JSON
if not isinstance(property_value, str):
property_value = json.dumps(property_value)
with self as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO concept_properties (concept_id, property_name, property_value)
VALUES (?, ?, ?)
""", (concept_id, property_name, property_value))
new_id = cursor.lastrowid
conn.commit()
return new_id
def add_vector(self, source_id: int, target_id: int, vector_type: str,
axis: str, justification: Optional[str] = None) -> int:
"""
Добавление нового вектора (связи между концептами)
Args:
source_id: ID исходного концепта
target_id: ID целевого концепта
vector_type: Тип вектора
axis: Семантическая ось
justification: Обоснование связи
Returns:
ID добавленного вектора
"""
with self as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO vectors (source_id, target_id, vector_type, axis, justification)
VALUES (?, ?, ?, ?, ?)
""", (source_id, target_id, vector_type, axis, justification))
new_id = cursor.lastrowid
conn.commit()
return new_id
def add_axiom(self, name: str, description: str, formulation: str, domain: str) -> int:
"""
Добавление новой аксиомы
Args:
name: Имя аксиомы
description: Описание аксиомы
formulation: Формальная формулировка
domain: Домен (область применения)
Returns:
ID добавленной аксиомы
"""
with self as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO axioms (name, description, formulation, domain)
VALUES (?, ?, ?, ?)
""", (name, description, formulation, domain))
new_id = cursor.lastrowid
conn.commit()
return new_id
def add_inference_rule(self, name: str, description: str, pattern: str,
premise_types: str, conclusion_types: str, domain: str) -> int:
"""
Добавление нового правила вывода
Args:
name: Имя правила
description: Описание правила
pattern: Паттерн вывода
premise_types: Типы посылок
conclusion_types: Типы выводов
domain: Домен (область применения)
Returns:
ID добавленного правила
"""
with self as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO inference_rules (name, description, pattern, premise_types,
conclusion_types, domain)
VALUES (?, ?, ?, ?, ?, ?)
""", (name, description, pattern, premise_types, conclusion_types, domain))
new_id = cursor.lastrowid
conn.commit()
return new_id
def get_all_vectors(self):
"""Получить все векторы из базы данных"""
query = """
SELECT
v.id,
v.source_id,
v.target_id,
v.vector_type,
v.axis,
v.justification,
s.name as source_name,
t.name as target_name,
v.is_valid
FROM vectors v
JOIN concepts s ON v.source_id = s.id
JOIN concepts t ON v.target_id = t.id
WHERE v.is_valid = 1
"""
with self as conn:
cursor = conn.cursor()
cursor.execute(query)
rows = cursor.fetchall()
vectors = []
for row in rows:
vector = {
"id": f"V{row[0]}", # Добавляем префикс V к ID
"source_name": row[6],
"target_name": row[7],
"type": row[3], # vector_type из БД становится type в объекте
"axis": row[4],
"justification": row[5],
"is_valid": bool(row[8])
}
vectors.append(vector)
return vectors