# news_verification/src/application/text/search_detection.py
import warnings
from difflib import SequenceMatcher
import nltk
import numpy as np
import torch
from sentence_transformers import (
SentenceTransformer,
util,
)
from src.application.text.helper import extract_equal_text
from src.application.text.preprocessing import split_into_paragraphs
from src.application.text.search import (
generate_search_phrases,
search_by_google,
)
from src.application.url_reader import URLReader
warnings.simplefilter(action="ignore", category=FutureWarning)
# Download necessary NLTK data files
nltk.download("punkt", quiet=True)
nltk.download("punkt_tab", quiet=True)
nltk.download("stopwords", quiet=True)
# Load the sentence-embedding model used for paraphrase detection
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
PARAPHRASE_MODEL = SentenceTransformer("paraphrase-MiniLM-L6-v2")
PARAPHRASE_MODEL.to(DEVICE)
# Similarity thresholds used by determine_label()
PARAPHRASE_THRESHOLD_HUMAN = 0.963
PARAPHRASE_THRESHOLD_MACHINE = 0.8
PARAPHRASE_THRESHOLD = 0.8

MIN_SAME_SENTENCE_LEN = 6  # minimum word count for an exact-sentence match
MIN_PHRASE_SENTENCE_LEN = 10  # minimum word count for a common-phrase match
MIN_RATIO_PARAPHRASE_NUM = 0.5
MAX_CHAR_SIZE = 30000  # skip pages longer than this many characters

def detect_text_by_relative_search(
input_text,
index,
is_support_opposite=False,
):
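    """
    Searches the web for the paragraph at input_text[index] and checks
    whether it, and the consecutive paragraphs after it, are paraphrased
    from a page found by the search.

    Returns:
        A tuple of (is_paraphrased, url, aligned_sentences, images, index).
    """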
checked_urls = set()
searched_phrases = generate_search_phrases(input_text[index])
for candidate in searched_phrases:
search_results = search_by_google(candidate)
urls = [item["link"] for item in search_results.get("items", [])]
for url in urls[:3]:
if url in checked_urls: # visited url
continue
if "bbc.com" not in url:
continue
checked_urls.add(url)
print(f"\t\tChecking URL: {url}")
content = URLReader(url)
if content.is_extracted is True:
if content.title is None or content.text is None:
print("\t\t\t↑↑↑ Title or text not found")
continue
page_text = content.title + "\n" + content.text
if len(page_text) > MAX_CHAR_SIZE:
print(f"\t\t\t↑↑↑ More than {MAX_CHAR_SIZE} characters")
continue
print(f"\t\t\t↑↑↑ Title: {content.title}")
aligned_first_sentences = check_paraphrase(
input_text[index],
page_text,
url,
)
                if not aligned_first_sentences:  # empty page text
                    continue
                is_paraphrased = aligned_first_sentences["paraphrase"]
                if is_paraphrased is False:
return (
is_paraphrased,
url,
aligned_first_sentences,
content.images,
index,
)
sub_paraphrase = True
while sub_paraphrase is True:
index += 1
print(f"----search {index} < {len(input_text)}----")
if index >= len(input_text):
print(f"input_text_last: {input_text[-1]}")
break
print(f"input_text: {input_text[index]}")
sub_sentences = check_paraphrase(
input_text[index],
page_text,
url,
)
                    sub_paraphrase = sub_sentences["paraphrase"]
print(f"sub_paraphrase: {sub_paraphrase}")
print(f"sub_sentences: {sub_sentences}")
if sub_paraphrase is True:
aligned_first_sentences["input"] += (
"<br>" + sub_sentences["input"]
)
aligned_first_sentences["source"] += (
"<br>" + sub_sentences["source"]
)
aligned_first_sentences["similarity"] += sub_sentences[
"similarity"
]
aligned_first_sentences["similarity"] /= 2
print(f"paraphrase: {is_paraphrased}")
print(f"aligned_first_sentences: {aligned_first_sentences}")
return (
is_paraphrased,
url,
aligned_first_sentences,
content.images,
index,
)
    return False, None, {}, [], index

def find_paragraph_source(text, text_index, sentences_df):
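    """
    Searches the web for a source of the paragraph at text[text_index] and
    fills the matching row of sentences_df (columns: input, source, label,
    similarity, paraphrase, url), then tries the same page for any other
    rows that still have no url.

    Returns:
        A tuple of (sentences_df, images), where images come from the
        matched page ([] if no source was found).
    """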
checked_urls = set()
searched_phrases = generate_search_phrases(text[text_index])
print(f"text[text_index]: {text[text_index]}")
print(f"searched_phrases: {searched_phrases}")
for candidate in searched_phrases:
search_results = search_by_google(candidate)
urls = [item["link"] for item in search_results.get("items", [])]
for url in urls[:3]:
if url in checked_urls: # visited url
continue
if "bbc.com" not in url:
continue
checked_urls.add(url)
print(f"\t\tChecking URL: {url}")
content = URLReader(url)
if content.is_extracted is True:
if content.title is None or content.text is None:
print("\t\t\t↑↑↑ Title or text not found")
continue
page_text = content.title + "\n" + content.text
if len(page_text) > MAX_CHAR_SIZE:
print(f"\t\t\t↑↑↑ More than {MAX_CHAR_SIZE} characters")
continue
print(f"\t\t\t↑↑↑ Title: {content.title}")
aligned_sentence = check_paraphrase(
text[text_index],
page_text,
url,
)
if aligned_sentence["paraphrase"] is False:
print(f'sentence_1: {sentences_df.loc[text_index, "input"]}')
print(f'sentence_2: {aligned_sentence["input"]}')
sentences_df.loc[text_index, "input"] = aligned_sentence["input"]
sentences_df.loc[text_index, "paraphrase"] = aligned_sentence["paraphrase"]
return sentences_df, []
# assign values
columns = [
"input",
"source",
"label",
"similarity",
"paraphrase",
"url",
]
for c in columns:
if c in sentences_df.columns:
sentences_df.loc[text_index, c] = aligned_sentence[c]
print(f"sen: {sentences_df}")
                # Try the same page for the other rows that lack a source
                for idx in range(len(sentences_df)):
                    print(f"{idx}")
                    if sentences_df.loc[idx, "url"] is not None:
                        continue
# find content in new url
aligned_sentence = check_paraphrase(
text[idx],
page_text,
url,
)
if aligned_sentence["url"] is None:
continue
columns = ["input", "source", "label", "similarity", "url"]
for c in columns:
if c in sentences_df.columns:
sentences_df.loc[text_index, c] = aligned_sentence[c]
return sentences_df, content.images
    return sentences_df, []

def longest_common_subsequence(arr1, arr2):
"""
Finds the length of the longest common subsequence (contiguous) between
two arrays.
Args:
arr1: The first array.
arr2: The second array.
Returns:
The length of the longest common subsequence.
Returns 0 if either input is invalid.
"""
if not isinstance(arr1, list) or not isinstance(arr2, list):
return 0
n = len(arr1)
m = len(arr2)
if n == 0 or m == 0: # handle empty list
return 0
# Create table dp with size (n+1) x (m+1)
dp = [[0] * (m + 1) for _ in range(n + 1)]
max_length = 0
for i in range(1, n + 1):
for j in range(1, m + 1):
if arr1[i - 1] == arr2[j - 1]:
dp[i][j] = dp[i - 1][j - 1] + 1
max_length = max(max_length, dp[i][j])
            else:
                dp[i][j] = 0  # reset: the matching run must be contiguous
return max_length
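
# For example (illustrative), longest_common_subsequence(
#     ["a", "b", "c", "d"], ["x", "b", "c", "d", "y"]) returns 3:
# the longest shared contiguous run is ["b", "c", "d"].
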
def check_sentence(
input_sentence,
source_sentence,
min_same_sentence_len,
min_phrase_sentence_len,
verbose=False,
):
"""
Checks if two sentences are similar based on exact match or
longest common subsequence.
Args:
input_sentence: The input sentence.
source_sentence: The source sentence.
min_same_sentence_len: Minimum length for exact sentence match.
min_phrase_sentence_len: Minimum length for common subsequence match.
verbose: If True, print debug information.
Returns:
True if the sentences are considered similar, False otherwise.
Returns False if input is not valid.
"""
if not isinstance(input_sentence, str) or not isinstance(
source_sentence,
str,
):
return False
input_sentence = input_sentence.strip()
source_sentence = source_sentence.strip()
if not input_sentence or not source_sentence: # handle empty string
return False
    input_words = input_sentence.split()  # split on whitespace
    source_words = source_sentence.split()  # split on whitespace
if (
input_sentence == source_sentence
and len(input_words) >= min_same_sentence_len
):
if verbose:
print("Exact match found.")
return True
max_overlap_len = longest_common_subsequence(input_words, source_words)
if verbose:
print(f"Max overlap length: {max_overlap_len}") # print overlap length
if max_overlap_len >= min_phrase_sentence_len:
return True
return False
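
# For example (illustrative), two identical sentences of at least
# MIN_SAME_SENTENCE_LEN words match exactly, while two different sentences
# sharing a contiguous run of MIN_PHRASE_SENTENCE_LEN words match as a phrase.
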
def check_paraphrase(input_text, page_text, url):
"""
Checks if the input text is paraphrased in the content at the given URL.
Args:
input_text: The text to check for paraphrase.
page_text: The text of the web page to compare with.
url
Returns:
A tuple containing:
"""
    # Treat the whole input as a single paragraph; the page is split below
    input_paragraphs = [input_text]
if not page_text:
return {}
page_paragraphs = split_into_paragraphs(page_text)
if not input_paragraphs or not page_paragraphs:
return {}
    # Some pages (e.g. BBC) append ", external" to outbound links; also
    # index a copy of those paragraphs with the marker stripped.
    additional_sentences = []
    for sentence in page_paragraphs:
        if ", external" in sentence:
            additional_sentences.append(sentence.replace(", external", ""))
    page_paragraphs.extend(additional_sentences)
    # Encode paragraphs into embeddings
    embeddings1 = PARAPHRASE_MODEL.encode(
        input_paragraphs,
        convert_to_tensor=True,
        device=DEVICE,
    )
    embeddings2 = PARAPHRASE_MODEL.encode(
        page_paragraphs,
        convert_to_tensor=True,
        device=DEVICE,
    )
# Compute cosine similarity matrix
similarity_matrix = util.cos_sim(embeddings1, embeddings2).cpu().numpy()
    # Align each input paragraph with its best-matching page paragraph
    alignment = {}
for i, paragraph in enumerate(input_paragraphs):
max_sim_index = np.argmax(similarity_matrix[i])
max_similarity = similarity_matrix[i][max_sim_index]
label, is_paraphrased = determine_label(max_similarity)
print(f"is_paraphrased: {is_paraphrased}")
if is_paraphrased is False:
url = None
best_matched_paragraph = None
else:
best_matched_paragraph = page_paragraphs[max_sim_index]
alignment = {
"input": paragraph,
"source": best_matched_paragraph,
"similarity": max_similarity,
"label": label,
"paraphrase": is_paraphrased,
"url": url,
}
return alignment
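
# A returned alignment looks like (illustrative values):
# {"input": "...", "source": "...", "similarity": 0.91,
#  "label": "MACHINE", "paraphrase": True, "url": "https://www.bbc.com/..."}
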
def similarity_ratio(a, b):
"""
Calculates the similarity ratio between two strings using SequenceMatcher.
Args:
a: The first string.
b: The second string.
Returns:
A float representing the similarity ratio between 0.0 and 1.0.
Returns 0.0 if either input is None or not a string.
"""
    if not isinstance(a, str) or not isinstance(b, str):
        return 0.0  # handles None and other non-string inputs
return SequenceMatcher(None, a, b).ratio()

def check_human(aligned_sentences):
    """
    Checks whether the aligned input is close enough to its source to be
    treated as human-written (a near-verbatim copy).

    Returns:
        bool: True if the similarity is at least 0.99, False otherwise.
    """
    if not aligned_sentences:  # Handle empty data case
        return False
    return aligned_sentences["similarity"] >= 0.99

def determine_label(similarity):
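    """
    Maps a similarity score to a (label, is_paraphrased) pair:
    PARAPHRASE_THRESHOLD_HUMAN and above -> ("HUMAN", True);
    PARAPHRASE_THRESHOLD_MACHINE and above -> ("MACHINE", True);
    otherwise ("", False).
    """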
if similarity >= PARAPHRASE_THRESHOLD_HUMAN:
return "HUMAN", True
elif similarity >= PARAPHRASE_THRESHOLD_MACHINE:
return "MACHINE", True
else:
return "", False
if __name__ == "__main__":
pass
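    # Minimal, illustrative smoke test of the pure helpers; the search-driven
    # entry points need Google Search credentials and live URLs, so they are
    # not exercised here.
    words_a = "the quick brown fox jumps over the lazy dog".split()
    words_b = "a quick brown fox leaps over the lazy dog".split()
    print(longest_common_subsequence(words_a, words_b))  # 4 ("over the lazy dog")
    print(
        check_sentence(
            "the quick brown fox jumps over the lazy dog",
            "the quick brown fox jumps over the lazy dog",
            MIN_SAME_SENTENCE_LEN,
            MIN_PHRASE_SENTENCE_LEN,
        ),
    )  # True: exact match with at least MIN_SAME_SENTENCE_LEN words
    print(similarity_ratio("hello world", "hello there world"))  # ~0.79
    print(determine_label(0.97))  # ("HUMAN", True)
    print(determine_label(0.85))  # ("MACHINE", True)
    print(determine_label(0.50))  # ("", False)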