# Utilities to build a RAG system to query information from the
# gwIAS search pipeline using LangChain
# Thanks to Pablo Villanueva Domingo for sharing his CAMELS template
# https://huggingface.co/spaces/PabloVD/CAMELSDocBot

from langchain import hub
from langchain_chroma import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain.schema import Document
import requests
import json
import base64
from bs4 import BeautifulSoup
import re

def github_to_raw(url):
    """Convert GitHub URL to raw content URL"""
    return url.replace("github.com", "raw.githubusercontent.com").replace("/blob/", "/")
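# Example of the transformation (hypothetical URL, for illustration only):
#   github_to_raw("https://github.com/owner/repo/blob/main/src/utils.py")
#   -> "https://raw.githubusercontent.com/owner/repo/main/src/utils.py"
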
def load_github_notebook(url):
    """Load Jupyter notebook from GitHub URL using GitHub API"""
    try:
        # Convert GitHub blob URL to API URL
        if "github.com" in url and "/blob/" in url:
            # Extract owner, repo, branch and path from URL
            parts = url.replace("https://github.com/", "").split("/")
            owner = parts[0]
            repo = parts[1]
            branch = parts[3]  # usually 'main' or 'master'
            path = "/".join(parts[4:])
            api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}?ref={branch}"
        else:
            raise ValueError("URL must be a GitHub blob URL")

        # Fetch notebook content
        response = requests.get(api_url)
        response.raise_for_status()
        content_data = response.json()

        if content_data.get('encoding') == 'base64':
            notebook_content = base64.b64decode(content_data['content']).decode('utf-8')
        else:
            notebook_content = content_data['content']

        # Parse notebook JSON
        notebook = json.loads(notebook_content)

        docs = []
        cell_count = 0

        # Process each cell
        for cell in notebook.get('cells', []):
            cell_count += 1
            cell_type = cell.get('cell_type', 'unknown')
            source = cell.get('source', [])

            # Join source lines
            if isinstance(source, list):
                content = ''.join(source)
            else:
                content = str(source)

            if content.strip():  # Only add non-empty cells
                metadata = {
                    'source': url,
                    'cell_type': cell_type,
                    'cell_number': cell_count,
                    'name': f"{url} - Cell {cell_count} ({cell_type})"
                }
                # Add cell type prefix for better context
                formatted_content = f"[{cell_type.upper()} CELL {cell_count}]\n{content}"
                docs.append(Document(page_content=formatted_content, metadata=metadata))

        return docs

    except Exception as e:
        print(f"Error loading notebook from {url}: {str(e)}")
        return []
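# Minimal usage sketch (hypothetical notebook URL, shown only for illustration):
#   nb_docs = load_github_notebook("https://github.com/owner/repo/blob/main/examples/demo.ipynb")
#   for d in nb_docs:
#       print(d.metadata["name"])   # e.g. "... - Cell 1 (markdown)"
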
def clean_text(text):
    """Clean text content from a webpage"""
    # Remove excessive newlines
    text = re.sub(r'\n{3,}', '\n\n', text)
    # Remove excessive whitespace
    text = re.sub(r'\s{2,}', ' ', text)
    return text.strip()
def clean_github_content(html_content):
    """Extract meaningful content from GitHub pages"""
    # Ensure we're working with a BeautifulSoup object
    if isinstance(html_content, str):
        soup = BeautifulSoup(html_content, 'html.parser')
    else:
        soup = html_content

    # Remove navigation, footer, and other boilerplate
    for element in soup.find_all(['nav', 'footer', 'header']):
        element.decompose()

    # For README and code files
    readme_content = soup.find('article', class_='markdown-body')
    if readme_content:
        return clean_text(readme_content.get_text())

    # For code files
    code_content = soup.find('table', class_='highlight')
    if code_content:
        return clean_text(code_content.get_text())

    # For directory listings
    file_list = soup.find('div', role='grid')
    if file_list:
        return clean_text(file_list.get_text())

    # Fallback to main content
    main_content = soup.find('main')
    if main_content:
        return clean_text(main_content.get_text())

    # If no specific content found, get text from body
    body = soup.find('body')
    if body:
        return clean_text(body.get_text())

    # Final fallback
    return clean_text(soup.get_text())
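# Sketch of how this helper could be applied directly (hypothetical URL; the
# function is not called elsewhere in this module):
#   html = requests.get("https://github.com/owner/repo").text
#   text = clean_github_content(html)
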
class GitHubLoader(WebBaseLoader):
    """Custom loader for GitHub pages with better content cleaning"""

    def clean_text(self, text):
        """Clean text content"""
        # Remove excessive newlines and spaces
        text = re.sub(r'\n{2,}', '\n', text)
        text = re.sub(r'\s{2,}', ' ', text)
        # Remove common GitHub boilerplate
        text = re.sub(r'Skip to content|Sign in|Search or jump to|Footer navigation|Terms|Privacy|Security|Status|Docs', '', text)
        return text.strip()

    def _scrape(self, url: str, *args, **kwargs) -> str:
        """Scrape data from URL and clean it.

        Args:
            url: The URL to scrape
            *args: Additional positional arguments
            **kwargs: Additional keyword arguments, including bs_kwargs

        Returns:
            str: The cleaned content
        """
        response = requests.get(url)
        response.raise_for_status()

        # For directory listings (tree URLs), use the API
        if '/tree/' in url:
            # Parse URL components
            parts = url.replace("https://github.com/", "").split("/")
            owner = parts[0]
            repo = parts[1]
            branch = parts[3]  # usually 'main' or 'master'
            path = "/".join(parts[4:]) if len(parts) > 4 else ""

            # Construct API URL
            api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}?ref={branch}"
            api_response = requests.get(api_url)
            api_response.raise_for_status()

            # Parse directory listing
            contents = api_response.json()
            if isinstance(contents, list):
                # Format directory contents
                files = [f"{item['name']} ({item['type']})" for item in contents]
                return "Directory contents:\n" + "\n".join(files)
            else:
                return f"Error: Unexpected API response for {url}"

        # For regular files, parse HTML
        soup = BeautifulSoup(response.text, 'html.parser')

        # For README and markdown files
        readme_content = soup.find('article', class_='markdown-body')
        if readme_content:
            return self.clean_text(readme_content.get_text())

        # For code files
        code_content = soup.find('table', class_='highlight')
        if code_content:
            return self.clean_text(code_content.get_text())

        # For other content, get main content
        main_content = soup.find('main')
        if main_content:
            return self.clean_text(main_content.get_text())

        # Final fallback
        return self.clean_text(soup.get_text())
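# Usage sketch (hypothetical URL): the class is used like the stock WebBaseLoader,
#   loader = GitHubLoader(["https://github.com/owner/repo"])
#   pages = loader.load()
# Note: _scrape returns a cleaned string here, which assumes the installed
# WebBaseLoader version tolerates a non-BeautifulSoup return value.
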
# Load documentation from urls
def load_docs():

    # Get urls
    with open("urls.txt") as urlsfile:
        urls = [url.strip() for url in urlsfile.readlines()]

    # Load documents from URLs
    docs = []
    for url in urls:
        url = url.strip()
        if not url:
            continue

        # Check if URL is a Jupyter notebook
        if url.endswith('.ipynb') and 'github.com' in url and '/blob/' in url:
            print(f"Loading notebook: {url}")
            notebook_docs = load_github_notebook(url)
            docs.extend(notebook_docs)
        # Handle Python and Markdown files using raw content
        elif url.endswith(('.py', '.md')) and 'github.com' in url and '/blob/' in url:
            print(f"Loading raw content: {url}")
            try:
                raw_url = github_to_raw(url)
                loader = WebBaseLoader([raw_url])
                web_docs = loader.load()
                # Preserve original URL in metadata
                for doc in web_docs:
                    doc.metadata['source'] = url
                docs.extend(web_docs)
            except Exception as e:
                print(f"Error loading {url}: {str(e)}")
        # Handle directory listings
        elif '/tree/' in url and 'github.com' in url:
            print(f"Loading directory: {url}")
            try:
                # Parse URL components
                parts = url.replace("https://github.com/", "").split("/")
                owner = parts[0]
                repo = parts[1]
                branch = parts[3]  # usually 'main' or 'master'
                path = "/".join(parts[4:]) if len(parts) > 4 else ""

                # Construct API URL
                api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}?ref={branch}"
                response = requests.get(api_url)
                response.raise_for_status()

                # Parse directory listing
                contents = response.json()
                if isinstance(contents, list):
                    # Format directory contents
                    content = "Directory contents:\n" + "\n".join([f"{item['name']} ({item['type']})" for item in contents])
                    docs.append(Document(page_content=content, metadata={'source': url}))
                else:
                    print(f"Error: Unexpected API response for {url}")
            except Exception as e:
                print(f"Error loading directory {url}: {str(e)}")
        else:
            print(f"Loading web page: {url}")
            try:
                loader = GitHubLoader([url])  # Use custom loader
                web_docs = loader.load()
                docs.extend(web_docs)
            except Exception as e:
                print(f"Error loading {url}: {str(e)}")

    # Add source URLs as document names for reference
    for i, doc in enumerate(docs):
        if 'source' in doc.metadata:
            doc.metadata['name'] = doc.metadata['source']
        else:
            doc.metadata['name'] = f"Document {i+1}"

    print(f"Loaded {len(docs)} documents:")
    for doc in docs:
        print(f"  - {doc.metadata.get('name')}")

    return docs
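# "urls.txt" is expected to contain one URL per line, e.g. (hypothetical entries):
#   https://github.com/owner/repo
#   https://github.com/owner/repo/blob/main/README.md
#   https://github.com/owner/repo/blob/main/examples/demo.ipynb
#   https://github.com/owner/repo/tree/main/src
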
def extract_reference(url):
    """Extract a reference keyword from the GitHub URL"""
    if "blob/main" in url:
        return url.split("blob/main/")[-1]
    elif "tree/main" in url:
        return url.split("tree/main/")[-1] or "root"
    return url
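# Example (hypothetical URL):
#   extract_reference("https://github.com/owner/repo/blob/main/src/utils.py")
#   -> "src/utils.py"
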
# Join content pages for processing
def format_docs(docs):
    formatted_docs = []
    for doc in docs:
        source = doc.metadata.get('source', 'Unknown source')
        reference = f"[{extract_reference(source)}]"
        content = doc.page_content
        formatted_docs.append(f"{content}\n\nReference: {reference}")
    return "\n\n---\n\n".join(formatted_docs)
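# Each retrieved chunk is rendered as its content followed by a bracketed
# reference, with chunks separated by "---", e.g. (illustrative layout):
#   <chunk text>
#
#   Reference: [src/utils.py]
#
#   ---
#
#   <next chunk text>
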
# Create a RAG chain
def RAG(llm, docs, embeddings):

    # Split text
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    splits = text_splitter.split_documents(docs)

    # Create vector store
    vectorstore = Chroma.from_documents(documents=splits, embedding=embeddings)

    # Retrieve and generate using the relevant snippets of the documents
    retriever = vectorstore.as_retriever()

    # Prompt basis example for RAG systems
    prompt = hub.pull("rlm/rag-prompt")

    # Adding custom instructions to the prompt
    template = prompt.messages[0].prompt.template
    template_parts = template.split("\nQuestion: {question}")
    combined_template = "You are an assistant for question-answering tasks. "\
        + "Use the following pieces of retrieved context to answer the question. "\
        + "If you don't know the answer, just say that you don't know. "\
        + "Try to keep the answer concise if possible. "\
        + "Write the names of the relevant functions from the retrieved code and include code snippets to aid the user's understanding. "\
        + "Include the references used in square brackets at the end of your answer."\
        + template_parts[1]
    prompt.messages[0].prompt.template = combined_template

    # Create the chain
    rag_chain = (
        {"context": retriever | format_docs, "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )

    return rag_chain
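# Minimal end-to-end sketch, assuming `llm` is any LangChain-compatible chat model
# and `embeddings` any LangChain embeddings object (both are placeholders here;
# the question is likewise illustrative):
#   docs = load_docs()
#   rag_chain = RAG(llm, docs, embeddings)
#   answer = rag_chain.invoke("Which functions implement the gwIAS search?")
#   print(answer)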