# Spaces:
# Running
# Running
import re
from enum import Enum
from time import sleep

import httpx
import requests
from fastapi import FastAPI
from fastapi.responses import JSONResponse, FileResponse
from pydantic import BaseModel
from transformers import pipeline

from phishing_datasets import submit_entry
from url_tools import extract_urls, resolve_short_url, extract_domain_from_url
from urlscan_client import UrlscanClient

# ASGI application object.
app = FastAPI()
# Shared urlscan client; ``predict`` uses it to search, fetch and submit scans.
urlscan = UrlscanClient()
class MessageModel(BaseModel):
    """Body of the message carried inside a filter query."""

    # Raw message text; ``predict`` feeds it to the classifier and URL checks.
    text: str
class QueryModel(BaseModel):
    """Filter query: the sender plus the message to evaluate."""

    # Sender identifier; ``predict`` tests it against a 5-digit short-code pattern.
    sender: str
    # The message whose text is classified.
    message: MessageModel
class AppModel(BaseModel):
    """Information about the client app that issued the request."""

    # Version string reported by the client.
    version: str
class InputModel(BaseModel):
    """Top-level request payload accepted by ``predict``."""

    # NOTE(review): pydantic gives leading-underscore names special treatment
    # (they are not ordinary validated fields) — confirm that ``_version`` is
    # really populated from the request body, or expose it through an alias.
    _version: int
    # The sender/message pair to evaluate.
    query: QueryModel
    # Client app information.
    app: AppModel
class ActionModel(Enum):
    """Filtering action returned to the client for a message."""

    # Insufficient information to determine an action to take. In a query
    # response, has the effect of allowing the message to be shown normally.
    NONE = 0
    # Allow the message to be shown normally.
    ALLOW = 1
    # Prevent the message from being shown normally, filtered as Junk message.
    JUNK = 2
    # Prevent the message from being shown normally, filtered as Promotional
    # message.
    PROMOTION = 3
    # Prevent the message from being shown normally, filtered as Transactional
    # message.
    TRANSACTION = 4
class SubActionModel(Enum):
    """Sub-category attached to an action; only ``NONE`` is defined here."""

    NONE = 0
class OutputModel(BaseModel):
    """Verdict returned by ``predict``."""

    # How the message should be filtered.
    action: ActionModel
    # Sub-category of the action; ``predict`` always sets SubActionModel.NONE.
    sub_action: SubActionModel
# Text-classification pipeline, built once at import time and reused by
# ``predict`` for every message.
pipe = pipeline(
    task="text-classification",
    model="mrm8488/bert-tiny-finetuned-sms-spam-detection",
)
def get_well_known_aasa():
    """Return the JSON document listing the app identifiers for ``messagefilter``.

    NOTE(review): no route decorator is visible on this handler in this view —
    confirm it is registered on ``app``.
    """
    return JSONResponse(
        content={
            "messagefilter": {
                "apps": [
                    "X9NN3FSS3T.com.lela.Serenity.SerenityMessageFilterExtension",
                    "X9NN3FSS3T.com.lela.Serenity",
                ]
            }
        },
        media_type="application/json",
    )
def get_robots_txt():
    """Serve the ``robots.txt`` file (path is relative to the working directory).

    NOTE(review): no route decorator is visible on this handler in this view —
    confirm it is registered on ``app``.
    """
    return FileResponse("robots.txt")
# Debug hooks: messages of exactly this shape bypass classification.
_DEBUG_SLEEP_RE = re.compile(r"^Sent from your Twilio trial account - sleep (\d+)$")
_DEBUG_CATEGORY_RE = re.compile(
    r"^Sent from your Twilio trial account - (junk|transaction|promotion)$"
)
# A standalone 5-digit number whose first digit is 2-8.
_COMMERCIAL_SENDER_RE = re.compile(r'\b[2-8]\d{4}\b')
# "STOP", optionally followed by "SMS" and/or "au", then such a 5-digit number.
_COMMERCIAL_STOP_RE = re.compile(r'\bSTOP(?:\s+SMS)?(?:\s+au)?\s+([2-8]\d{4})\b')


def _debug_response(text):
    """Return the forced verdict for a debug message, or None for normal text."""
    # NOTE(review): the sleep duration is taken verbatim from the message text,
    # so anyone able to submit a message can stall this handler for an
    # arbitrary time — consider capping it or disabling the hook in production.
    sleep_match = _DEBUG_SLEEP_RE.search(text)
    if sleep_match:
        sleep(int(sleep_match.group(1)))
        return OutputModel(action=ActionModel.JUNK, sub_action=SubActionModel.NONE)

    category_match = _DEBUG_CATEGORY_RE.search(text)
    if category_match:
        forced_actions = {
            'junk': ActionModel.JUNK,
            'transaction': ActionModel.TRANSACTION,
            'promotion': ActionModel.PROMOTION,
        }
        return OutputModel(
            action=forced_actions[category_match.group(1)],
            sub_action=SubActionModel.NONE,
        )
    return None


def _collect_scan_results(urls):
    """Return urlscan results for *urls*: stored scans if any, else fresh scans."""
    print("searching for past scans")
    search_results = [
        urlscan.search(f"domain:{extract_domain_from_url(url)}") for url in urls
    ]
    scan_results = []
    for search_result in search_results:
        for hit in search_result.get('results', []):
            result_uuid = hit.get('_id')
            if result_uuid is None:
                # A hit without an id cannot be fetched; skip it.
                continue
            scan_results.append(urlscan.get_result(result_uuid))
    if not scan_results:
        print("scanning...")
        scan_results = [urlscan.scan(url) for url in urls]
    return scan_results


def _apply_url_verdicts(score, scan_results):
    """Adjust the spam *score* according to the overall verdict of each scan.

    Any positive verdict score forces the result to 1.0. Otherwise, if at
    least one verdict score is negative, the spam score is scaled by 0.9 once
    (not once per scan, so a long scan history cannot drive it towards zero).
    The verdict score is kept in its own variable so it never replaces the
    classifier score.
    """
    negative_verdict_seen = False
    for scan_result in scan_results:
        overall = scan_result.get('verdicts', {}).get('overall', {})
        print(f"overall verdict: {overall}")
        if not overall.get('hasVerdicts'):
            continue
        verdict_score = overall.get('score')
        print(f"verdict score {verdict_score}")
        if verdict_score is None:
            continue
        if verdict_score > 0:
            return 1.0
        if verdict_score < 0:
            negative_verdict_seen = True
    if negative_verdict_seen:
        score = score * 0.9
    return score


def predict(model: InputModel) -> OutputModel:
    """Classify a message and choose the filtering action for it.

    Steps:
      1. Debug messages (see ``_debug_response``) return a forced verdict.
      2. The text-classification pipeline yields a label and a score; for
         ``LABEL_0`` the score is inverted (``1 - score``).
      3. The score is scaled by 0.9 when the sender matches the short-code
         pattern, and again when the text contains a matching STOP notice.
      4. If the text contains URLs, urlscan verdicts adjust the score (see
         ``_apply_url_verdicts``); if it contains none, the score is scaled
         by 0.9.
      5. score > 0.7 -> JUNK; score > 0.5 -> PROMOTION when a STOP notice was
         found, otherwise JUNK; anything else -> NONE.

    Args:
        model: Request payload; ``model.query.sender`` and
            ``model.query.message.text`` are read.

    Returns:
        The chosen action, always with ``SubActionModel.NONE``.
    """
    sender = model.query.sender
    text = model.query.message.text
    print(f"[{sender}] {text}")

    debug_output = _debug_response(text)
    if debug_output is not None:
        return debug_output

    classification = pipe(text)
    label = classification[0]['label']
    score = classification[0]['score']
    print(f"classification {label} score {score}")
    if label == 'LABEL_0':
        # The pipeline scores its predicted label; flip it so ``score`` always
        # refers to the other label (presumably "spam" — confirm with the model card).
        score = 1 - score

    # NOTE(review): indentation was lost in the reviewed source; the STOP check
    # is assumed to apply only to commercial senders — confirm.
    commercial_stop = False
    if _COMMERCIAL_SENDER_RE.search(sender):
        print("commercial sender")
        score = score * 0.9
        if _COMMERCIAL_STOP_RE.search(text):
            print("STOP founded")
            score = score * 0.9
            commercial_stop = True
        else:
            print("STOP missing")

    urls = extract_urls(text)
    if urls:
        print(f"found URLs: {urls}")
        score = _apply_url_verdicts(score, _collect_scan_results(urls))
    else:
        print("no URL found")
        score = score * 0.9

    print(f"final score {score}")
    action = ActionModel.NONE
    if score > 0.7:
        action = ActionModel.JUNK
    elif score > 0.5:
        action = ActionModel.PROMOTION if commercial_stop else ActionModel.JUNK
    print(f"final action {action}")
    return OutputModel(action=action, sub_action=SubActionModel.NONE)
class ReportModel(BaseModel):
    """Payload accepted by ``report``."""

    # Sender of the reported message.
    sender: str
    # Text of the reported message.
    message: str
def report(model: ReportModel):
    """Pass a reported message's sender and text to ``submit_entry``.

    Returns None; whatever ``submit_entry`` returns is discarded.
    """
    submit_entry(model.sender, model.message)