Spaces:
Running
Running
import re
from enum import Enum
from time import sleep

import httpx
import requests
from fastapi import FastAPI
from fastapi.responses import JSONResponse, FileResponse
from pydantic import BaseModel, Field
from transformers import pipeline

from phishing_datasets import submit_entry
from url_tools import extract_urls, resolve_short_url, extract_domain_from_url
from urlscan_client import UrlscanClient
# Web application object, created once when the module is imported.
app = FastAPI()

# Shared urlscan client; `predict` uses it to search past scans, fetch their
# results and submit new scans.
urlscan = UrlscanClient()
class MessageModel(BaseModel):
    """Message part of an incoming filter query."""

    # Raw message text; `predict` reads it as `query.message.text`.
    text: str
class QueryModel(BaseModel):
    """Query part of an incoming filter request: who sent what."""

    # Sender identifier; `predict` pattern-matches it to spot 5-digit short codes.
    sender: str
    # The message to classify.
    message: MessageModel
class AppModel(BaseModel):
    """App part of an incoming filter request."""

    # Presumably the version of the client app issuing the request; it is not
    # read anywhere in the code visible here -- confirm against the caller.
    version: str
class InputModel(BaseModel):
    """Request payload accepted by `predict`.

    The payload version was originally declared as ``_version``. Pydantic does
    not turn underscore-prefixed annotations into model fields, so that key was
    never parsed from the request. It is now exposed as ``version`` and
    populated from the ``_version`` key through a field alias.
    """

    # Optional with a None default so payloads that omit the key keep
    # validating exactly as they did before this field was actually parsed.
    version: int | None = Field(default=None, alias="_version")
    # Sender and message to classify.
    query: QueryModel
    # Information about the requesting app.
    app: AppModel
class ActionModel(Enum):
    """Filtering action returned for a message."""

    # Not enough information to pick an action; in a query response this lets
    # the message be shown normally.
    NONE = 0

    # Show the message normally.
    ALLOW = 1

    # Keep the message out of the normal view and file it as Junk.
    JUNK = 2

    # Keep the message out of the normal view and file it as Promotional.
    PROMOTION = 3

    # Keep the message out of the normal view and file it as Transactional.
    TRANSACTION = 4
class SubActionModel(Enum):
    """Sub-action returned alongside the main action.

    Only ``NONE`` is defined; every response built in this file uses it.
    """

    NONE = 0
class OutputModel(BaseModel):
    """Response returned by `predict`."""

    # Main filtering decision for the message.
    action: ActionModel
    # Refinement of the decision; always SubActionModel.NONE in this file.
    sub_action: SubActionModel
# Text-classification pipeline, built once at import time and reused by every
# call to `predict`.
pipe = pipeline(
    task="text-classification",
    model="mrm8488/bert-tiny-finetuned-sms-spam-detection",
)
def get_well_known_aasa():
    """Return the app-association document as a JSON response.

    The document lists two application identifiers under
    ``messagefilter.apps``.

    NOTE(review): no route decorator is visible on this function in the code
    shown here; confirm how it is registered with ``app``.
    """
    app_identifiers = [
        "X9NN3FSS3T.com.lela.Serenity.SerenityMessageFilterExtension",
        "X9NN3FSS3T.com.lela.Serenity",
    ]
    document = {"messagefilter": {"apps": app_identifiers}}
    return JSONResponse(content=document, media_type="application/json")
def get_robots_txt():
    """Return the ``robots.txt`` file as a file response."""
    robots_path = "robots.txt"
    return FileResponse(robots_path)
def predict(model: InputModel) -> OutputModel:
    """Classify an incoming message and choose a filtering action.

    Steps, in order:

    1. Debug messages ("Sent from your Twilio trial account - ...") short-circuit
       with a fixed action.
    2. The text-classification pipeline yields a spam probability.
    3. The probability is reduced for 5-digit short-code senders, and again when
       the text carries a matching "STOP <short code>" opt-out.
    4. URLs in the text are checked against urlscan verdicts: a positive verdict
       forces the probability to 1.0, a negative one reduces it, and a message
       without URLs is reduced as well.
    5. Above 0.7 the action is JUNK. Between 0.5 and 0.7 it is PROMOTION when
       the opt-out was found and JUNK otherwise. At or below 0.5 it is NONE.

    Args:
        model: Parsed request; only ``query.sender`` and ``query.message.text``
            are read.

    Returns:
        The chosen action, always paired with ``SubActionModel.NONE``.
    """
    sender = model.query.sender
    text = model.query.message.text
    print(f"[{sender}] {text}")

    debug_action = _debug_override(text)
    if debug_action is not None:
        return OutputModel(action=debug_action, sub_action=SubActionModel.NONE)

    score = _classifier_spam_score(text)
    score, commercial_stop = _apply_sender_heuristics(score, sender, text)
    score = _apply_url_verdicts(score, text)
    print(f"final score {score}")

    action = ActionModel.NONE
    if score > 0.7:
        action = ActionModel.JUNK
    elif score > 0.5:
        # Mid-confidence: an explicit opt-out suggests legitimate marketing.
        action = ActionModel.PROMOTION if commercial_stop else ActionModel.JUNK
    print(f"final action {action}")
    return OutputModel(action=action, sub_action=SubActionModel.NONE)


def _debug_override(text: str) -> "ActionModel | None":
    """Return the forced action for a debug message, or None for normal text."""
    sleep_match = re.search(
        r"^Sent from your Twilio trial account - sleep (\d+)$", text
    )
    if sleep_match:
        # The duration comes from untrusted message text, so cap it: an
        # unbounded value would let any sender stall a worker indefinitely
        # (or overflow `sleep`).
        max_sleep_seconds = 60
        sleep(min(int(sleep_match.group(1)), max_sleep_seconds))
        return ActionModel.JUNK

    category_match = re.search(
        r"^Sent from your Twilio trial account - (junk|transaction|promotion)$",
        text,
    )
    if category_match:
        forced_actions = {
            "junk": ActionModel.JUNK,
            "transaction": ActionModel.TRANSACTION,
            "promotion": ActionModel.PROMOTION,
        }
        return forced_actions[category_match.group(1)]
    return None


def _classifier_spam_score(text: str) -> float:
    """Run the classifier and return its result as a spam probability."""
    prediction = pipe(text)[0]
    label = prediction['label']
    score = prediction['score']
    print(f"classification {label} score {score}")
    if label == 'LABEL_0':
        # The pipeline reports confidence in the predicted label. LABEL_0 is
        # treated as the non-spam class (presumably "ham" for this model --
        # confirm against the model card), so invert it.
        score = 1 - score
    return score


def _apply_sender_heuristics(score: float, sender: str, text: str) -> "tuple[float, bool]":
    """Discount the score for short-code senders; report whether STOP was found."""
    commercial_sender_pattern = r'\b[2-8]\d{4}\b'
    commercial_stop_pattern = r'\bSTOP(?:\s+SMS)?(?:\s+au)?\s+([2-8]\d{4})\b'

    if not re.search(commercial_sender_pattern, sender):
        return score, False

    print("commercial sender")
    score = score * 0.9
    if re.search(commercial_stop_pattern, text):
        print("STOP founded")
        return score * 0.9, True
    print("STOP missing")
    return score, False


def _apply_url_verdicts(score: float, text: str) -> float:
    """Adjust the spam probability using urlscan verdicts for URLs in the text."""
    urls = extract_urls(text)
    if not urls:
        print("no URL found")
        return score * 0.9

    print(f"found URLs: {urls}")
    print("searching for past scans")
    search_results = [
        urlscan.search(f"domain:{extract_domain_from_url(url)}") for url in urls
    ]

    scan_results = []
    for search_result in search_results:
        for hit in (search_result or {}).get('results', []):
            result_uuid = hit.get('_id')
            if not result_uuid:
                # Previously the `str` type itself was passed to get_result
                # when a hit had no '_id'; skip such hits instead.
                continue
            scan_results.append(urlscan.get_result(result_uuid))

    if not scan_results:
        print("scanning...")
        scan_results = [urlscan.scan(url) for url in urls]

    for scan_result in scan_results:
        overall = (scan_result or {}).get('verdicts', {}).get('overall', {})
        print(f"overall verdict: {overall}")
        if not overall.get('hasVerdicts'):
            continue

        # Keep the verdict in its own variable. It used to overwrite the
        # running spam probability, so a non-positive verdict replaced the
        # classifier result and always led to ActionModel.NONE.
        # NOTE(review): urlscan verdict scores are believed to range from
        # -100 (benign) to 100 (malicious) -- confirm against the API docs.
        verdict_score = overall.get('score')
        print(f"verdict score {verdict_score}")
        if verdict_score is None:
            continue
        if verdict_score > 0:
            # One malicious verdict is decisive; no need to look further.
            return 1.0
        if verdict_score < 0:
            score = score * 0.9
    return score
class ReportModel(BaseModel):
    """Payload accepted by `report`."""

    # Sender of the reported message.
    sender: str
    # Text of the reported message.
    message: str
def report(model: ReportModel):
    """Forward a reported message to `submit_entry`.

    Args:
        model: Report whose ``sender`` and ``message`` are passed on, in that
            order.
    """
    reported_sender = model.sender
    reported_message = model.message
    submit_entry(reported_sender, reported_message)