from time import sleep import httpx from fastapi import FastAPI from fastapi.responses import JSONResponse, FileResponse from pydantic import BaseModel from enum import Enum from transformers import pipeline from phishing_datasets import submit_entry from url_tools import extract_urls, resolve_short_url, extract_domain_from_url from urlscan_client import UrlscanClient import requests import re app = FastAPI() urlscan = UrlscanClient() class MessageModel(BaseModel): text: str class QueryModel(BaseModel): sender: str message: MessageModel class AppModel(BaseModel): version: str class InputModel(BaseModel): _version: int query: QueryModel app: AppModel class ActionModel(Enum): # Insufficient information to determine an action to take. In a query response, has the effect of allowing the message to be shown normally. NONE = 0 # Allow the message to be shown normally. ALLOW = 1 # Prevent the message from being shown normally, filtered as Junk message. JUNK = 2 # Prevent the message from being shown normally, filtered as Promotional message. PROMOTION = 3 # Prevent the message from being shown normally, filtered as Transactional message. TRANSACTION = 4 class SubActionModel(Enum): NONE = 0 class OutputModel(BaseModel): action: ActionModel sub_action: SubActionModel pipe = pipeline(task="text-classification", model="mrm8488/bert-tiny-finetuned-sms-spam-detection") @app.get("/.well-known/apple-app-site-association", include_in_schema=False) def get_well_known_aasa(): return JSONResponse( content={ "messagefilter": { "apps": [ "X9NN3FSS3T.com.lela.Serenity.SerenityMessageFilterExtension", "X9NN3FSS3T.com.lela.Serenity" ] } }, media_type="application/json" ) @app.get("/robots.txt", include_in_schema=False) def get_robots_txt(): return FileResponse("robots.txt") @app.post("/predict") def predict(model: InputModel) -> OutputModel: sender = model.query.sender text = model.query.message.text print(f"[{sender}] {text}") # Debug sleep pattern = r"^Sent from your Twilio trial account - sleep (\d+)$" match = re.search(pattern, text) if match: number_str = match.group(1) sleep_duration = int(number_str) sleep(sleep_duration) return OutputModel(action=ActionModel.JUNK, sub_action=SubActionModel.NONE) # Debug category pattern = r"^Sent from your Twilio trial account - (junk|transaction|promotion)$" match = re.search(pattern, text) if match: category_str = match.group(1) match category_str: case 'junk': return OutputModel(action=ActionModel.JUNK, sub_action=SubActionModel.NONE) case 'transaction': return OutputModel(action=ActionModel.TRANSACTION, sub_action=SubActionModel.NONE) case 'promotion': return OutputModel(action=ActionModel.PROMOTION, sub_action=SubActionModel.NONE) result = pipe(text) label = result[0]['label'] score = result[0]['score'] print(f"classification {label} score {score}") if label == 'LABEL_0': score = 1 - score commercial_sender_pattern = r'\b[2-8]\d{4}\b' commercial_stop_pattern = r'\bSTOP(?:\s+SMS)?(?:\s+au)?\s+([2-8]\d{4})\b' commercial_stop = False if re.search(commercial_sender_pattern, sender): print("commercial sender") score = score * 0.9 if re.search(commercial_stop_pattern, text): print("STOP founded") score = score * 0.9 commercial_stop = True else: print("STOP missing") urls = extract_urls(text) if urls: print(f"found URLs: {urls}") print("searching for past scans") search_results = [urlscan.search(f"domain:{extract_domain_from_url(url)}") for url in urls] scan_results = [] for search_result in search_results: results = search_result.get('results', []) for result in results: result_uuid = result.get('_id', str) scan_result = urlscan.get_result(result_uuid) scan_results.append(scan_result) if not scan_results: print("scanning...") scan_results = [urlscan.scan(url) for url in urls] for result in scan_results: overall = result.get('verdicts', {}).get('overall', {}) print(f"overall verdict: {overall}") if overall.get('hasVerdicts'): score = overall.get('score') print(f"verdict score {score}") if 0 < overall.get('score'): score = 1.0 break elif overall.get('score') < 0: score = score * 0.9 else: print(f"no URL found") score = score * 0.9 print(f"final score {score}") action = ActionModel.NONE if score > 0.7: action=ActionModel.JUNK elif score > 0.5: if commercial_stop: action=ActionModel.PROMOTION else: action=ActionModel.JUNK print(f"final action {action}") return OutputModel(action=action, sub_action=SubActionModel.NONE) class ReportModel(BaseModel): sender: str message: str @app.post("/report") def report(model: ReportModel): submit_entry(model.sender, model.message)