import asyncio
import logging
import os
import re
from io import BytesIO

from fastapi import Depends, HTTPException, UploadFile, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from features.nepali_text_classifier.inferencer import classify_text
from features.nepali_text_classifier.preprocess import *  # parse_docx, parse_pdf, parse_txt, end_symbol_for_NP_text

security = HTTPBearer()

def contains_english(text: str) -> bool:
    """Return True if the text contains any Latin (English) letters."""
    # Drop newlines and tabs before scanning for Latin characters
    cleaned = text.replace("\n", "").replace("\t", "")
    return bool(re.search(r"[a-zA-Z]", cleaned))

async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the bearer token against the MY_SECRET_TOKEN environment variable."""
    token = credentials.credentials
    expected_token = os.getenv("MY_SECRET_TOKEN")
    if token != expected_token:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid or expired token",
        )
    return token

async def nepali_text_analysis(text: str):
    """Validate the input text and classify it in a worker thread."""
    end_symbol_for_NP_text(text)
    words = text.split()
    if len(words) < 10:
        raise HTTPException(status_code=400, detail="Text must contain at least 10 words")
    if len(text) > 10000:
        raise HTTPException(status_code=413, detail="Text must be less than 10,000 characters")
    # classify_text is CPU-bound, so run it off the event loop
    result = await asyncio.to_thread(classify_text, text)
    return result

# Extract text from uploaded files (.docx, .pdf, .txt)
async def extract_file_contents(file: UploadFile) -> str:
    content = await file.read()
    file_stream = BytesIO(content)
    if file.content_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
        return parse_docx(file_stream)
    elif file.content_type == "application/pdf":
        return parse_pdf(file_stream)
    elif file.content_type == "text/plain":
        return parse_txt(file_stream)
    else:
        raise HTTPException(status_code=415, detail="Invalid file type. Only .docx, .pdf and .txt are allowed")
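
# The parse_* helpers used above come from features.nepali_text_classifier.preprocess.
# Below is a minimal sketch of what they might look like, assuming python-docx and
# pypdf are available; the real implementations may differ. The underscore-prefixed
# names are illustrative only and are not used by the handlers in this module.
def _example_parse_docx(stream: BytesIO) -> str:
    from docx import Document  # python-docx
    return "\n".join(p.text for p in Document(stream).paragraphs)

def _example_parse_pdf(stream: BytesIO) -> str:
    from pypdf import PdfReader  # pypdf
    return "\n".join((page.extract_text() or "") for page in PdfReader(stream).pages)

def _example_parse_txt(stream: BytesIO) -> str:
    return stream.read().decode("utf-8", errors="ignore")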

async def handle_file_upload(file: UploadFile):
    try:
        file_contents = await extract_file_contents(file)
        end_symbol_for_NP_text(file_contents)
        if len(file_contents) > 10000:
            raise HTTPException(status_code=413, detail="Text must be less than 10,000 characters")
        cleaned_text = file_contents.replace("\n", " ").replace("\t", " ").strip()
        if not cleaned_text:
            raise HTTPException(status_code=404, detail="The file is empty or only contains whitespace.")
        result = await asyncio.to_thread(classify_text, cleaned_text)
        return result
    except HTTPException:
        # Propagate validation errors (413, 415, 404) unchanged instead of masking them as 500s
        raise
    except Exception as e:
        logging.error(f"Error processing file: {e}")
        raise HTTPException(status_code=500, detail="Error processing the file")

async def handle_sentence_level_analysis(text: str):
    text = text.strip()
    if len(text) > 10000:
        raise HTTPException(status_code=413, detail="Text must be less than 10,000 characters")
    end_symbol_for_NP_text(text)
    # Split text into sentences on the danda (।) and re-append it to each sentence
    sentences = [s.strip() + "।" for s in text.split("।") if s.strip()]
    results = []
    for sentence in sentences:
        end_symbol_for_NP_text(sentence)
        result = await asyncio.to_thread(classify_text, sentence)
        results.append({
            "text": sentence,
            "result": result["label"],
            "likelihood": result["confidence"]
        })
    return {"analysis": results}

async def handle_file_sentence(file: UploadFile):
    try:
        file_contents = await extract_file_contents(file)
        if len(file_contents) > 10000:
            raise HTTPException(status_code=413, detail="Text must be less than 10,000 characters")
        cleaned_text = file_contents.replace("\n", " ").replace("\t", " ").strip()
        if not cleaned_text:
            raise HTTPException(status_code=404, detail="The file is empty or only contains whitespace.")
        # Split text into sentences on the danda (।); each sentence is validated in the loop below
        sentences = [s.strip() + "।" for s in cleaned_text.split("।") if s.strip()]
        results = []
        for sentence in sentences:
            end_symbol_for_NP_text(sentence)
            result = await asyncio.to_thread(classify_text, sentence)
            results.append({
                "text": sentence,
                "result": result["label"],
                "likelihood": result["confidence"]
            })
        return {"analysis": results}
    except HTTPException:
        # Propagate validation errors unchanged instead of masking them as 500s
        raise
    except Exception as e:
        logging.error(f"Error processing file: {e}")
        raise HTTPException(status_code=500, detail="Error processing the file")

def classify(text: str):
    """Synchronous wrapper around classify_text for callers that are not async."""
    return classify_text(text)
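
# Illustrative only: one way these handlers might be wired into FastAPI routes.
# The router prefix, paths, and request model below are assumptions, not part of
# this module; the project's actual route definitions live elsewhere.
from fastapi import APIRouter, File
from pydantic import BaseModel

example_router = APIRouter(prefix="/NP")  # hypothetical prefix

class _TextIn(BaseModel):
    text: str

@example_router.post("/analyse")
async def _analyse(payload: _TextIn, token: str = Depends(verify_token)):
    # Whole-text classification behind the bearer-token check
    return await nepali_text_analysis(payload.text)

@example_router.post("/file-upload")
async def _file_upload(file: UploadFile = File(...), token: str = Depends(verify_token)):
    # File-based classification (.docx, .pdf, .txt)
    return await handle_file_upload(file)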