import gradio as gr | |
import spaces | |
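# ZeroGPU note (assumption): Spaces "Running on Zero" typically decorate GPU-bound
# inference functions with @spaces.GPU so a GPU is attached for the duration of each call.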
import torch | |
import numpy as np | |
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline as hf_pipeline | |
import re | |
import matplotlib.pyplot as plt | |
import io | |
from PIL import Image | |
from datetime import datetime | |
from torch.nn.functional import sigmoid | |
from collections import Counter | |
import logging | |
import traceback | |
# Set up logging | |
logging.basicConfig(level=logging.DEBUG) | |
logger = logging.getLogger(__name__) | |
# Device configuration | |
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
logger.info(f"Using device: {device}") | |
# Set up custom logging
class CustomFormatter(logging.Formatter): | |
"""Custom formatter with colors and better formatting""" | |
grey = "\x1b[38;21m" | |
blue = "\x1b[38;5;39m" | |
yellow = "\x1b[38;5;226m" | |
red = "\x1b[38;5;196m" | |
bold_red = "\x1b[31;1m" | |
reset = "\x1b[0m" | |
def format(self, record): | |
# Remove the logger name from the output | |
if record.levelno == logging.DEBUG: | |
return f"{self.blue}{record.getMessage()}{self.reset}" | |
elif record.levelno == logging.INFO: | |
return f"{self.grey}{record.getMessage()}{self.reset}" | |
elif record.levelno == logging.WARNING: | |
return f"{self.yellow}{record.getMessage()}{self.reset}" | |
elif record.levelno == logging.ERROR: | |
return f"{self.red}{record.getMessage()}{self.reset}" | |
elif record.levelno == logging.CRITICAL: | |
return f"{self.bold_red}{record.getMessage()}{self.reset}" | |
return record.getMessage() | |
# Setup logger | |
logger = logging.getLogger(__name__) | |
logger.setLevel(logging.DEBUG) | |
# Remove any existing handlers | |
logger.handlers = [] | |
# Create console handler with custom formatter | |
ch = logging.StreamHandler() | |
ch.setLevel(logging.DEBUG) | |
ch.setFormatter(CustomFormatter()) | |
logger.addHandler(ch)
# Prevent duplicate output through the root handler configured by basicConfig above
logger.propagate = False
# Suppress matplotlib font debugging | |
matplotlib_logger = logging.getLogger('matplotlib.font_manager') | |
matplotlib_logger.setLevel(logging.WARNING) | |
# Also suppress the UserWarning about tight layout | |
import warnings | |
warnings.filterwarnings("ignore", message="Tight layout not applied") | |
# Model initialization
model_name = "SamanthaStorm/tether-multilabel-v4"
model = AutoModelForSequenceClassification.from_pretrained(model_name).to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)

# Sentiment model (force a fresh download rather than relying on the local cache)
sentiment_model = AutoModelForSequenceClassification.from_pretrained( | |
"SamanthaStorm/tether-sentiment-v3", | |
force_download=True, | |
local_files_only=False | |
).to(device) | |
sentiment_tokenizer = AutoTokenizer.from_pretrained( | |
"SamanthaStorm/tether-sentiment-v3", | |
use_fast=False, | |
force_download=True, | |
local_files_only=False | |
) | |
# After loading the sentiment model, confirm which checkpoint was actually loaded
logger.debug("\nSentiment Model Config:")
logger.debug(f"Model name: {sentiment_model.config.name_or_path}")
emotion_pipeline = hf_pipeline(
    "text-classification",
    model="j-hartmann/emotion-english-distilroberta-base",
    top_k=None,  # return scores for every emotion class, not just the top prediction
    truncation=True,
    device=0 if torch.cuda.is_available() else -1
)
# DARVO model | |
darvo_model = AutoModelForSequenceClassification.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1").to(device) | |
darvo_tokenizer = AutoTokenizer.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1", use_fast=False) | |
darvo_model.eval() | |
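# The DARVO model is a regressor: its single logit is passed through a sigmoid in
# predict_darvo_score() below, yielding a 0-1 likelihood of deny/attack/reverse-victim-and-offender framing.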
# Constants and Labels | |
LABELS = [ | |
"recovery phase", "control", "gaslighting", "guilt tripping", "dismissiveness", | |
"blame shifting", "nonabusive", "projection", "insults", | |
"contradictory statements", "obscure language" | |
] | |
SENTIMENT_LABELS = ["supportive", "undermining"] | |
THRESHOLDS = { | |
"recovery phase": 0.324, | |
"control": 0.33, | |
"gaslighting": 0.285, | |
"guilt tripping": 0.267, | |
"dismissiveness": 0.123, | |
"blame shifting": 0.116, | |
"projection": 0.425, | |
"insults": 0.347, | |
"contradictory statements": 0.378, | |
"obscure language": 0.206, | |
"nonabusive": 0.094 | |
} | |
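# Per-label probability cutoffs: a pattern counts as detected when its sigmoid score
# exceeds its threshold (see analyze_single_message below).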
PATTERN_WEIGHTS = { | |
"recovery phase": 0.7, | |
"control": 1.4, | |
"gaslighting": 1.3, | |
"guilt tripping": 1.2, | |
"dismissiveness": 0.9, | |
"blame shifting": 1.0, # Increased from 0.8 | |
"projection": 0.5, | |
"insults": 1.4, # Reduced from 2.1 | |
"contradictory statements": 1.0, | |
"obscure language": 0.9, | |
"nonabusive": 0.0 | |
} | |
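# Relative weights applied to each detected pattern when computing the composite
# abuse score; "nonabusive" is weighted 0 so it never raises the score.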
ESCALATION_QUESTIONS = [ | |
("Partner has access to firearms or weapons", 4), | |
("Partner threatened to kill you", 3), | |
("Partner threatened you with a weapon", 3), | |
("Partner has ever choked you, even if you considered it consensual at the time", 4), | |
("Partner injured or threatened your pet(s)", 3), | |
("Partner has broken your things, punched or kicked walls, or thrown things ", 2), | |
("Partner forced or coerced you into unwanted sexual acts", 3), | |
("Partner threatened to take away your children", 2), | |
("Violence has increased in frequency or severity", 3), | |
("Partner monitors your calls/GPS/social media", 2) | |
] | |
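# Each tuple is (checklist question, point value); checked items are summed into the
# checklist escalation score (maximum 29 points).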
RISK_STAGE_LABELS = { | |
1: "🌀 Risk Stage: Tension-Building\nThis message reflects rising emotional pressure or subtle control attempts.", | |
2: "🔥 Risk Stage: Escalation\nThis message includes direct or aggressive patterns, suggesting active harm.", | |
3: "🌧️ Risk Stage: Reconciliation\nThis message reflects a reset attempt—apologies or emotional repair without accountability.", | |
4: "🌸 Risk Stage: Calm / Honeymoon\nThis message appears supportive but may follow prior harm, minimizing it." | |
} | |
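# Keyed by the integer stage returned from get_risk_stage() / analyze_single_message();
# the label for the most common stage is appended to the final report.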
THREAT_MOTIFS = [ | |
"i'll kill you", "i'm going to hurt you", "you're dead", "you won't survive this", | |
"i'll break your face", "i'll bash your head in", "i'll snap your neck", | |
"i'll come over there and make you shut up", "i'll knock your teeth out", | |
"you're going to bleed", "you want me to hit you?", "i won't hold back next time", | |
"i swear to god i'll beat you", "next time, i won't miss", "i'll make you scream", | |
"i know where you live", "i'm outside", "i'll be waiting", "i saw you with him", | |
"you can't hide from me", "i'm coming to get you", "i'll find you", "i know your schedule", | |
"i watched you leave", "i followed you home", "you'll regret this", "you'll be sorry", | |
"you're going to wish you hadn't", "you brought this on yourself", "don't push me", | |
"you have no idea what i'm capable of", "you better watch yourself", | |
"i don't care what happens to you anymore", "i'll make you suffer", "you'll pay for this", | |
"i'll never let you go", "you're nothing without me", "if you leave me, i'll kill myself", | |
"i'll ruin you", "i'll tell everyone what you did", "i'll make sure everyone knows", | |
"i'm going to destroy your name", "you'll lose everyone", "i'll expose you", | |
"your friends will hate you", "i'll post everything", "you'll be cancelled", | |
"you'll lose everything", "i'll take the house", "i'll drain your account", | |
"you'll never see a dime", "you'll be broke when i'm done", "i'll make sure you lose your job", | |
"i'll take your kids", "i'll make sure you have nothing", "you can't afford to leave me", | |
"don't make me do this", "you know what happens when i'm mad", "you're forcing my hand", | |
"if you just behaved, this wouldn't happen", "this is your fault", | |
"you're making me hurt you", "i warned you", "you should have listened" | |
] | |
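# Threat motifs are matched as normalized substrings of each message
# (see detect_threat_motifs inside analyze_composite).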
def get_emotion_profile(text): | |
"""Get emotion profile from text with all scores""" | |
try: | |
logger.debug("\n🎭 EMOTION ANALYSIS") | |
logger.debug(f"Analyzing text: {text}") | |
emotions = emotion_pipeline(text) | |
logger.debug(f"Raw emotion pipeline output: {emotions}") | |
if isinstance(emotions, list) and isinstance(emotions[0], list): | |
# Extract all scores from the first prediction | |
emotion_scores = emotions[0] | |
# Log raw scores | |
logger.debug("\nRaw emotion scores:") | |
for e in emotion_scores: | |
logger.debug(f" • {e['label']}: {e['score']:.3f}") | |
# Convert to dictionary | |
emotion_dict = {e['label'].lower(): round(e['score'], 3) for e in emotion_scores} | |
# Log final processed emotions | |
logger.debug("\nProcessed emotion profile:") | |
for emotion, score in emotion_dict.items(): | |
logger.debug(f" • {emotion}: {score:.3f}") | |
return emotion_dict | |
logger.debug("No valid emotions detected, returning empty dict") | |
return {} | |
except Exception as e: | |
logger.error(f"Error in get_emotion_profile: {e}") | |
logger.error(f"Traceback: {traceback.format_exc()}") | |
default_emotions = { | |
"sadness": 0.0, | |
"joy": 0.0, | |
"neutral": 0.0, | |
"disgust": 0.0, | |
"anger": 0.0, | |
"fear": 0.0 | |
} | |
logger.debug(f"Returning default emotions: {default_emotions}") | |
return default_emotions | |
def get_emotional_tone_tag(text, sentiment, patterns, abuse_score): | |
"""Get emotional tone tag based on emotions and patterns""" | |
emotions = get_emotion_profile(text) | |
sadness = emotions.get("sadness", 0) | |
joy = emotions.get("joy", 0) | |
neutral = emotions.get("neutral", 0) | |
disgust = emotions.get("disgust", 0) | |
anger = emotions.get("anger", 0) | |
fear = emotions.get("fear", 0) | |
# Direct Threat (New) | |
text_lower = text.lower() | |
threat_indicators = [ | |
"if you", "i'll make", "don't forget", "remember", "regret", | |
"i control", "i'll take", "you'll lose", "make sure", | |
"never see", "won't let" | |
] | |
if ( | |
any(indicator in text_lower for indicator in threat_indicators) and | |
any(p in patterns for p in ["control", "insults"]) and | |
(anger > 0.2 or disgust > 0.2 or abuse_score > 70) | |
): | |
return "direct threat" | |
    # Prophetic Punishment (New)
future_consequences = [ | |
"will end up", "you'll be", "you will be", "going to be", | |
"will become", "will find yourself", "will realize", | |
"you'll regret", "you'll see", "will learn", "truly will", | |
"end up alone", "end up miserable" | |
] | |
dismissive_endings = [ | |
"i'm out", "i'm done", "whatever", "good luck", | |
"your choice", "your problem", "regardless", | |
"keep", "keep on" | |
] | |
if ( | |
(any(phrase in text_lower for phrase in future_consequences) or | |
any(end in text_lower for end in dismissive_endings)) and | |
any(p in ["dismissiveness", "control"] for p in patterns) and | |
(disgust > 0.2 or neutral > 0.3 or anger > 0.2) # Lowered thresholds | |
): | |
return "predictive punishment" | |
if ( | |
(any(phrase in text_lower for phrase in future_consequences) or | |
any(end in text_lower for end in dismissive_endings)) and | |
any(p in ["dismissiveness", "control"] for p in patterns) and | |
sadness > 0.6 and | |
all(e < 0.1 for e in [anger, disgust, neutral]) | |
): | |
return "predictive punishment" | |
# 1. Performative Regret | |
if ( | |
sadness > 0.3 and | |
any(p in patterns for p in ["blame shifting", "guilt tripping", "recovery"]) and | |
(sentiment == "undermining" or abuse_score > 40) | |
): | |
return "performative regret" | |
# 2. Coercive Warmth | |
if ( | |
(joy > 0.2 or sadness > 0.3) and | |
any(p in patterns for p in ["control", "gaslighting"]) and | |
sentiment == "undermining" | |
): | |
return "coercive warmth" | |
# 3. Cold Invalidation | |
if ( | |
(neutral + disgust) > 0.4 and | |
any(p in patterns for p in ["dismissiveness", "projection", "obscure language"]) and | |
sentiment == "undermining" | |
): | |
return "cold invalidation" | |
# 4. Genuine Vulnerability | |
if ( | |
(sadness + fear) > 0.4 and | |
sentiment == "supportive" and | |
all(p in ["recovery"] for p in patterns) | |
): | |
return "genuine vulnerability" | |
# 5. Emotional Threat | |
if ( | |
(anger + disgust) > 0.4 and | |
any(p in patterns for p in ["control", "insults", "dismissiveness"]) and | |
sentiment == "undermining" | |
): | |
return "emotional threat" | |
# 6. Weaponized Sadness | |
if ( | |
sadness > 0.5 and | |
any(p in patterns for p in ["guilt tripping", "projection"]) and | |
sentiment == "undermining" | |
): | |
return "weaponized sadness" | |
# 7. Toxic Resignation | |
if ( | |
neutral > 0.4 and | |
any(p in patterns for p in ["dismissiveness", "obscure language"]) and | |
sentiment == "undermining" | |
): | |
return "toxic resignation" | |
# 8. Aggressive Dismissal | |
if ( | |
anger > 0.4 and | |
any(p in patterns for p in ["insults", "control"]) and | |
sentiment == "undermining" | |
): | |
return "aggressive dismissal" | |
# 9. Deflective Hostility | |
if ( | |
(0.15 < anger < 0.6 or 0.15 < disgust < 0.6) and | |
any(p in patterns for p in ["projection"]) and | |
sentiment == "undermining" | |
): | |
return "deflective hostility" | |
# 10. Contradictory Gaslight | |
if ( | |
(joy + anger + sadness) > 0.4 and | |
any(p in patterns for p in ["gaslighting", "contradictory statements"]) and | |
sentiment == "undermining" | |
): | |
return "contradictory gaslight" | |
# 11. Forced Accountability Flip | |
if ( | |
(anger + disgust) > 0.4 and | |
any(p in patterns for p in ["blame shifting", "projection"]) and | |
sentiment == "undermining" | |
): | |
return "forced accountability flip" | |
# Emotional Instability Fallback | |
if ( | |
(anger + sadness + disgust) > 0.5 and | |
sentiment == "undermining" | |
): | |
return "emotional instability" | |
return "neutral" | |
def predict_darvo_score(text): | |
"""Predict DARVO score for given text""" | |
try: | |
inputs = darvo_tokenizer(text, return_tensors="pt", truncation=True, padding=True) | |
inputs = {k: v.to(device) for k, v in inputs.items()} | |
with torch.no_grad(): | |
logits = darvo_model(**inputs).logits | |
return round(sigmoid(logits.cpu()).item(), 4) | |
except Exception as e: | |
logger.error(f"Error in DARVO prediction: {e}") | |
return 0.0 | |
def detect_weapon_language(text): | |
"""Detect weapon-related language in text""" | |
weapon_keywords = ["knife", "gun", "bomb", "weapon", "kill", "stab"] | |
t = text.lower() | |
return any(w in t for w in weapon_keywords) | |
def get_risk_stage(patterns, sentiment): | |
"""Determine risk stage based on patterns and sentiment""" | |
try: | |
if "insults" in patterns: | |
return 2 | |
elif "recovery" in patterns: | |
return 3 | |
elif "control" in patterns or "guilt tripping" in patterns: | |
return 1 | |
elif sentiment == "supportive" and any(p in patterns for p in ["projection", "dismissiveness"]): | |
return 4 | |
return 1 | |
except Exception as e: | |
logger.error(f"Error determining risk stage: {e}") | |
return 1 | |
def detect_threat_pattern(text, patterns): | |
"""Detect if a message contains threat patterns""" | |
# Threat indicators in text | |
threat_words = [ | |
"regret", "sorry", "pay", "hurt", "suffer", "destroy", "ruin", | |
"expose", "tell everyone", "never see", "take away", "lose", | |
"control", "make sure", "won't let", "force", "warn", "never", | |
"punish", "teach you", "learn", "show you", "remember", | |
"if you", "don't forget", "i control", "i'll make sure", # Added these specific phrases | |
"bank account", "phone", "money", "access" # Added financial control indicators | |
] | |
# Check for conditional threats (if/then structures) | |
text_lower = text.lower() | |
conditional_threat = ( | |
"if" in text_lower and | |
any(word in text_lower for word in ["regret", "make sure", "control"]) | |
) | |
has_threat_words = any(word in text_lower for word in threat_words) | |
# Check for threat patterns | |
threat_patterns = {"control", "gaslighting", "blame shifting", "insults"} | |
has_threat_patterns = any(p in threat_patterns for p in patterns) | |
return has_threat_words or has_threat_patterns or conditional_threat | |
def detect_compound_threat(text, patterns): | |
"""Detect compound threats in a single message""" | |
try: | |
# Rule A: Single Message Multiple Patterns | |
high_risk_patterns = {"control", "gaslighting", "blame shifting", "insults"} | |
high_risk_count = sum(1 for p in patterns if p in high_risk_patterns) | |
has_threat = detect_threat_pattern(text, patterns) | |
# Special case for control + threats | |
has_control = "control" in patterns | |
has_conditional_threat = "if" in text.lower() and any(word in text.lower() | |
for word in ["regret", "make sure", "control"]) | |
# Single message compound threat | |
if (has_threat and high_risk_count >= 2) or (has_control and has_conditional_threat): | |
return True, "single_message" | |
return False, None | |
except Exception as e: | |
logger.error(f"Error in compound threat detection: {e}") | |
return False, None | |
def analyze_message_batch_threats(messages, results): | |
"""Analyze multiple messages for compound threats""" | |
threat_messages = [] | |
support_messages = [] | |
for i, (msg, (result, _)) in enumerate(zip(messages, results)): | |
if not msg.strip(): # Skip empty messages | |
continue | |
patterns = result[1] # Get detected patterns | |
# Check for threat in this message | |
if detect_threat_pattern(msg, patterns): | |
threat_messages.append(i) | |
# Check for supporting patterns | |
if any(p in {"control", "gaslighting", "blame shifting"} for p in patterns): | |
support_messages.append(i) | |
# Rule B: Multi-Message Accumulation | |
if len(threat_messages) >= 2: | |
return True, "multiple_threats" | |
elif len(threat_messages) == 1 and len(support_messages) >= 2: | |
return True, "threat_with_support" | |
return False, None | |
def compute_abuse_score(matched_scores, sentiment): | |
"""Compute abuse score from matched patterns and sentiment""" | |
try: | |
if not matched_scores: | |
logger.debug("No matched scores, returning 0") | |
return 0.0 | |
# Calculate weighted score | |
total_weight = sum(weight for _, _, weight in matched_scores) | |
if total_weight == 0: | |
logger.debug("Total weight is 0, returning 0") | |
return 0.0 | |
# Get highest pattern scores | |
pattern_scores = [(label, score) for label, score, _ in matched_scores] | |
sorted_scores = sorted(pattern_scores, key=lambda x: x[1], reverse=True) | |
logger.debug(f"Sorted pattern scores: {sorted_scores}") | |
# Base score calculation | |
weighted_sum = sum(score * weight for _, score, weight in matched_scores) | |
base_score = (weighted_sum / total_weight) * 100 | |
logger.debug(f"Initial base score: {base_score:.1f}") | |
# Cap maximum score based on pattern severity | |
max_score = 85.0 # Set maximum possible score | |
if any(label in {'control', 'gaslighting'} for label, _, _ in matched_scores): | |
max_score = 90.0 | |
logger.debug(f"Increased max score to {max_score} due to high severity patterns") | |
# Apply diminishing returns for multiple patterns | |
if len(matched_scores) > 1: | |
multiplier = 1 + (0.1 * (len(matched_scores) - 1)) | |
base_score *= multiplier | |
logger.debug(f"Applied multiplier {multiplier:.2f} for {len(matched_scores)} patterns") | |
# Apply sentiment modifier | |
if sentiment == "supportive": | |
base_score *= 0.85 | |
logger.debug("Applied 15% reduction for supportive sentiment") | |
final_score = min(round(base_score, 1), max_score) | |
logger.debug(f"Final abuse score: {final_score}") | |
return final_score | |
except Exception as e: | |
logger.error(f"Error computing abuse score: {e}") | |
return 0.0 | |
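# Illustrative example of compute_abuse_score (hypothetical inputs):
#   matched_scores = [("control", 0.60, 1.4), ("insults", 0.50, 1.4)], sentiment = "undermining"
#   weighted mean = (0.60*1.4 + 0.50*1.4) / 2.8 * 100 = 55.0
#   two-pattern multiplier 1.1 -> 60.5; cap raised to 90 because "control" is present
#   final score = 60.5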
def analyze_single_message(text, thresholds): | |
"""Analyze a single message for abuse patterns""" | |
logger.debug("\n=== DEBUG START ===") | |
logger.debug(f"Input text: {text}") | |
try: | |
if not text.strip(): | |
logger.debug("Empty text, returning zeros") | |
return 0.0, [], [], {"label": "none"}, 1, 0.0, None | |
# Check for explicit abuse | |
explicit_abuse_words = ['fuck', 'bitch', 'shit', 'ass', 'dick'] | |
explicit_abuse = any(word in text.lower() for word in explicit_abuse_words) | |
logger.debug(f"Explicit abuse detected: {explicit_abuse}") | |
# Abuse model inference | |
inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True) | |
inputs = {k: v.to(device) for k, v in inputs.items()} | |
with torch.no_grad(): | |
outputs = model(**inputs) | |
raw_scores = torch.sigmoid(outputs.logits.squeeze(0)).cpu().numpy() | |
# Log raw model outputs | |
logger.debug("\nRaw model scores:") | |
for label, score in zip(LABELS, raw_scores): | |
logger.debug(f"{label}: {score:.3f}") | |
# Get predictions and sort them | |
predictions = list(zip(LABELS, raw_scores)) | |
sorted_predictions = sorted(predictions, key=lambda x: x[1], reverse=True) | |
logger.debug("\nTop 3 predictions:") | |
for label, score in sorted_predictions[:3]: | |
logger.debug(f"{label}: {score:.3f}") | |
# Apply thresholds | |
threshold_labels = [] | |
if explicit_abuse: | |
threshold_labels.append("insults") | |
logger.debug("\nForced inclusion of 'insults' due to explicit abuse") | |
for label, score in sorted_predictions: | |
base_threshold = thresholds.get(label, 0.25) | |
if explicit_abuse: | |
base_threshold *= 0.5 | |
if score > base_threshold: | |
if label not in threshold_labels: | |
threshold_labels.append(label) | |
logger.debug("\nLabels that passed thresholds:", threshold_labels) | |
# Calculate matched scores | |
matched_scores = [] | |
for label in threshold_labels: | |
score = raw_scores[LABELS.index(label)] | |
weight = PATTERN_WEIGHTS.get(label, 1.0) | |
if explicit_abuse and label == "insults": | |
weight *= 1.5 | |
matched_scores.append((label, score, weight)) | |
# In analyze_single_message, modify the sentiment section: | |
# Get sentiment | |
sent_inputs = sentiment_tokenizer(text, return_tensors="pt", truncation=True, padding=True) | |
sent_inputs = {k: v.to(device) for k, v in sent_inputs.items()} | |
with torch.no_grad(): | |
sent_logits = sentiment_model(**sent_inputs).logits[0] | |
sent_probs = torch.softmax(sent_logits, dim=-1).cpu().numpy() | |
# Add detailed logging | |
logger.debug("\n🎭 SENTIMENT ANALYSIS DETAILS") | |
logger.debug(f"Raw logits: {sent_logits}") | |
logger.debug(f"Probabilities: undermining={sent_probs[0]:.3f}, supportive={sent_probs[1]:.3f}") | |
sentiment = SENTIMENT_LABELS[int(np.argmax(sent_probs))] | |
logger.debug(f"Selected sentiment: {sentiment}") | |
# Calculate abuse score | |
abuse_score = compute_abuse_score(matched_scores, sentiment) | |
if explicit_abuse: | |
abuse_score = max(abuse_score, 70.0) | |
# Check for compound threats | |
compound_threat_flag, threat_type = detect_compound_threat( | |
text, threshold_labels | |
) | |
if compound_threat_flag: | |
logger.debug(f"⚠️ Compound threat detected in message: {threat_type}") | |
abuse_score = max(abuse_score, 85.0) # Force high score for compound threats | |
# Get DARVO score | |
darvo_score = predict_darvo_score(text) | |
# Get tone using emotion-based approach | |
tone_tag = get_emotional_tone_tag(text, sentiment, threshold_labels, abuse_score) | |
# Check for the specific combination | |
highest_pattern = max(matched_scores, key=lambda x: x[1])[0] if matched_scores else None # Get highest pattern | |
if sentiment == "supportive" and tone_tag == "neutral" and highest_pattern == "obscure language": | |
logger.debug("Message classified as likely non-abusive (supportive, neutral, and obscure language). Returning low risk.") | |
return 0.0, [], [], {"label": "supportive"}, 1, 0.0, "neutral" # Return non-abusive values | |
# Set stage | |
stage = 2 if explicit_abuse or abuse_score > 70 else 1 | |
logger.debug("=== DEBUG END ===\n") | |
return abuse_score, threshold_labels, matched_scores, {"label": sentiment}, stage, darvo_score, tone_tag | |
except Exception as e: | |
logger.error(f"Error in analyze_single_message: {e}") | |
return 0.0, [], [], {"label": "error"}, 1, 0.0, None | |
def generate_abuse_score_chart(dates, scores, patterns): | |
"""Generate a timeline chart of abuse scores""" | |
try: | |
        # Create a fresh figure for this chart
        fig, ax = plt.subplots(figsize=(10, 6))
# Plot points and lines | |
x = range(len(scores)) | |
plt.plot(x, scores, 'bo-', linewidth=2, markersize=8) | |
# Add labels for each point with highest scoring pattern | |
for i, (score, pattern) in enumerate(zip(scores, patterns)): | |
# Get the pattern and its score | |
plt.annotate( | |
f'{pattern}\n{score:.0f}%', | |
(i, score), | |
textcoords="offset points", | |
xytext=(0, 10), | |
ha='center', | |
bbox=dict( | |
boxstyle='round,pad=0.5', | |
fc='white', | |
ec='gray', | |
alpha=0.8 | |
) | |
) | |
# Customize the plot | |
plt.ylim(-5, 105) | |
plt.grid(True, linestyle='--', alpha=0.7) | |
plt.title('Abuse Pattern Timeline', pad=20, fontsize=12) | |
plt.ylabel('Abuse Score %') | |
# X-axis labels | |
plt.xticks(x, dates, rotation=45) | |
# Risk level bands with better colors | |
plt.axhspan(0, 50, color='#90EE90', alpha=0.2) # light green - Low Risk | |
plt.axhspan(50, 70, color='#FFD700', alpha=0.2) # gold - Moderate Risk | |
plt.axhspan(70, 85, color='#FFA500', alpha=0.2) # orange - High Risk | |
plt.axhspan(85, 100, color='#FF6B6B', alpha=0.2) # light red - Critical Risk | |
# Add risk level labels | |
plt.text(-0.2, 25, 'Low Risk', rotation=90, va='center') | |
plt.text(-0.2, 60, 'Moderate Risk', rotation=90, va='center') | |
plt.text(-0.2, 77.5, 'High Risk', rotation=90, va='center') | |
plt.text(-0.2, 92.5, 'Critical Risk', rotation=90, va='center') | |
# Adjust layout | |
plt.tight_layout() | |
# Convert plot to image | |
buf = io.BytesIO() | |
plt.savefig(buf, format='png', bbox_inches='tight') | |
buf.seek(0) | |
plt.close('all') # Close all figures to prevent memory leaks | |
return Image.open(buf) | |
except Exception as e: | |
logger.error(f"Error generating abuse score chart: {e}") | |
return None | |
def analyze_composite(msg1, msg2, msg3, *answers_and_none): | |
"""Analyze multiple messages and checklist responses""" | |
logger.debug("\n🔄 STARTING NEW ANALYSIS") | |
logger.debug("=" * 50) | |
# Define severity categories at the start | |
high = {'control'} | |
    moderate = {'gaslighting', 'dismissiveness', 'obscure language', 'insults',
                'contradictory statements', 'guilt tripping'}
    low = {'blame shifting', 'projection', 'recovery phase'}
try: | |
# Process checklist responses | |
logger.debug("\n📋 CHECKLIST PROCESSING") | |
logger.debug("=" * 50) | |
none_selected_checked = answers_and_none[-1] | |
responses_checked = any(answers_and_none[:-1]) | |
none_selected = not responses_checked and none_selected_checked | |
logger.debug("Checklist Status:") | |
logger.debug(f" • None Selected Box: {'✓' if none_selected_checked else '✗'}") | |
logger.debug(f" • Has Responses: {'✓' if responses_checked else '✗'}") | |
logger.debug(f" • Final Status: {'None Selected' if none_selected else 'Has Selections'}") | |
if none_selected: | |
escalation_score = 0 | |
escalation_note = "Checklist completed: no danger items reported." | |
escalation_completed = True | |
logger.debug("\n✓ Checklist: No items selected") | |
elif responses_checked: | |
escalation_score = sum(w for (_, w), a in zip(ESCALATION_QUESTIONS, answers_and_none[:-1]) if a) | |
escalation_note = "Checklist completed." | |
escalation_completed = True | |
logger.debug(f"\n📊 Checklist Score: {escalation_score}") | |
# Log checked items | |
logger.debug("\n⚠️ Selected Risk Factors:") | |
for (q, w), a in zip(ESCALATION_QUESTIONS, answers_and_none[:-1]): | |
if a: | |
logger.debug(f" • [{w} points] {q}") | |
else: | |
escalation_score = None | |
escalation_note = "Checklist not completed." | |
escalation_completed = False | |
logger.debug("\n❗ Checklist: Not completed") | |
# Process messages | |
logger.debug("\n📝 MESSAGE PROCESSING") | |
logger.debug("=" * 50) | |
messages = [msg1, msg2, msg3] | |
active = [(m, f"Message {i+1}") for i, m in enumerate(messages) if m.strip()] | |
logger.debug(f"Active Messages: {len(active)} of 3") | |
if not active: | |
logger.debug("❌ Error: No messages provided") | |
return "Please enter at least one message.", None | |
# Detect threats | |
logger.debug("\n🚨 THREAT DETECTION") | |
logger.debug("=" * 50) | |
def normalize(text): | |
import unicodedata | |
text = text.lower().strip() | |
text = unicodedata.normalize("NFKD", text) | |
            text = text.replace("’", "'")  # normalize curly apostrophes to straight quotes
return re.sub(r"[^a-z0-9 ]", "", text) | |
def detect_threat_motifs(message, motif_list): | |
norm_msg = normalize(message) | |
return [motif for motif in motif_list if normalize(motif) in norm_msg] | |
# Analyze threats and patterns | |
immediate_threats = [detect_threat_motifs(m, THREAT_MOTIFS) for m, _ in active] | |
flat_threats = [t for sublist in immediate_threats for t in sublist] | |
threat_risk = "Yes" if flat_threats else "No" | |
# Analyze each message | |
logger.debug("\n🔍 INDIVIDUAL MESSAGE ANALYSIS") | |
logger.debug("=" * 50) | |
results = [] | |
for m, d in active: | |
logger.debug(f"\n📝 ANALYZING {d}") | |
logger.debug("-" * 40) # Separator for each message | |
result = analyze_single_message(m, THRESHOLDS.copy()) | |
# Check for non-abusive classification and skip further analysis | |
if result[0] == 0.0 and result[1] == [] and result[3] == {"label": "supportive"} and result[4] == 1 and result[5] == 0.0 and result[6] == "neutral": | |
logger.debug(f"✓ {d} classified as non-abusive, skipping further analysis.") | |
# Option to include in final output (uncomment if needed): | |
# results.append(({"abuse_score": 0.0, "patterns": [], "sentiment": {"label": "supportive"}, "stage": 1, "darvo_score": 0.0, "tone": "neutral"}, d)) | |
continue # Skip to the next message | |
results.append((result, d)) | |
# Log the detailed results for the current message (if not skipped) | |
abuse_score, patterns, matched_scores, sentiment, stage, darvo_score, tone = result | |
logger.debug(f"\n📊 Results for {d}:") | |
logger.debug(f" • Abuse Score: {abuse_score:.1f}%") | |
logger.debug(f" • DARVO Score: {darvo_score:.3f}") | |
logger.debug(f" • Risk Stage: {stage}") | |
logger.debug(f" • Sentiment: {sentiment['label']}") | |
logger.debug(f" • Tone: {tone}") | |
            # Log detected patterns with severity and weights
            if patterns:
                logger.debug("\n🎯 DETECTED PATTERNS")
                for label, score, weight in matched_scores:
                    severity = "❗HIGH" if label in high else "⚠️ MODERATE" if label in moderate else "📝 LOW"
                    logger.debug(f"  • {severity} | {label}: {score:.3f} (weight: {weight})")
            else:
                logger.debug("\n✓ No abuse patterns detected")
        # Guard: if every message was skipped as non-abusive, there is nothing to aggregate
        if not results:
            logger.debug("✓ All messages classified as non-abusive")
            return "No abuse patterns detected in the provided messages.", None

        # Extract scores and metadata
        abuse_scores = [r[0][0] for r in results]
stages = [r[0][4] for r in results] | |
darvo_scores = [r[0][5] for r in results] | |
tone_tags = [r[0][6] for r in results] | |
dates_used = [r[1] for r in results] | |
# Pattern Analysis Summary | |
logger.debug("\n📈 PATTERN ANALYSIS SUMMARY") | |
logger.debug("=" * 50) | |
predicted_labels = [label for r in results for label in r[0][1]] | |
if predicted_labels: | |
logger.debug("Detected Patterns Across All Messages:") | |
pattern_counts = Counter(predicted_labels) | |
# Log high severity patterns first | |
high_patterns = [p for p in pattern_counts if p in high] | |
if high_patterns: | |
logger.debug("\n❗ HIGH SEVERITY PATTERNS:") | |
for p in high_patterns: | |
logger.debug(f" • {p} (×{pattern_counts[p]})") | |
# Then moderate | |
moderate_patterns = [p for p in pattern_counts if p in moderate] | |
if moderate_patterns: | |
logger.debug("\n⚠️ MODERATE SEVERITY PATTERNS:") | |
for p in moderate_patterns: | |
logger.debug(f" • {p} (×{pattern_counts[p]})") | |
# Then low | |
low_patterns = [p for p in pattern_counts if p in low] | |
if low_patterns: | |
logger.debug("\n📝 LOW SEVERITY PATTERNS:") | |
for p in low_patterns: | |
logger.debug(f" • {p} (×{pattern_counts[p]})") | |
else: | |
logger.debug("✓ No patterns detected across messages") | |
# Pattern Severity Analysis | |
logger.debug("\n⚖️ SEVERITY ANALYSIS") | |
logger.debug("=" * 50) | |
counts = {'high': 0, 'moderate': 0, 'low': 0} | |
for label in predicted_labels: | |
if label in high: | |
counts['high'] += 1 | |
elif label in moderate: | |
counts['moderate'] += 1 | |
elif label in low: | |
counts['low'] += 1 | |
logger.debug("Pattern Distribution:") | |
if counts['high'] > 0: | |
logger.debug(f" ❗ High Severity: {counts['high']}") | |
if counts['moderate'] > 0: | |
logger.debug(f" ⚠️ Moderate Severity: {counts['moderate']}") | |
if counts['low'] > 0: | |
logger.debug(f" 📝 Low Severity: {counts['low']}") | |
total_patterns = sum(counts.values()) | |
if total_patterns > 0: | |
logger.debug(f"\nSeverity Percentages:") | |
logger.debug(f" • High: {(counts['high']/total_patterns)*100:.1f}%") | |
logger.debug(f" • Moderate: {(counts['moderate']/total_patterns)*100:.1f}%") | |
logger.debug(f" • Low: {(counts['low']/total_patterns)*100:.1f}%") | |
# Risk Assessment | |
logger.debug("\n🎯 RISK ASSESSMENT") | |
logger.debug("=" * 50) | |
if counts['high'] >= 2 and counts['moderate'] >= 2: | |
pattern_escalation_risk = "Critical" | |
logger.debug("❗❗ CRITICAL RISK") | |
logger.debug(" • Multiple high and moderate patterns detected") | |
logger.debug(f" • High patterns: {counts['high']}") | |
logger.debug(f" • Moderate patterns: {counts['moderate']}") | |
elif (counts['high'] >= 2 and counts['moderate'] >= 1) or \ | |
(counts['moderate'] >= 3) or \ | |
(counts['high'] >= 1 and counts['moderate'] >= 2): | |
pattern_escalation_risk = "High" | |
logger.debug("❗ HIGH RISK") | |
logger.debug(" • Significant pattern combination detected") | |
logger.debug(f" • High patterns: {counts['high']}") | |
logger.debug(f" • Moderate patterns: {counts['moderate']}") | |
elif (counts['moderate'] == 2) or \ | |
(counts['high'] == 1 and counts['moderate'] == 1) or \ | |
(counts['moderate'] == 1 and counts['low'] >= 2) or \ | |
(counts['high'] == 1 and sum(counts.values()) == 1): | |
pattern_escalation_risk = "Moderate" | |
logger.debug("⚠️ MODERATE RISK") | |
logger.debug(" • Concerning pattern combination detected") | |
logger.debug(f" • Pattern distribution: H:{counts['high']}, M:{counts['moderate']}, L:{counts['low']}") | |
else: | |
pattern_escalation_risk = "Low" | |
logger.debug("📝 LOW RISK") | |
logger.debug(" • Limited pattern severity detected") | |
logger.debug(f" • Pattern distribution: H:{counts['high']}, M:{counts['moderate']}, L:{counts['low']}") | |
# Checklist Risk Assessment | |
logger.debug("\n📋 CHECKLIST RISK ASSESSMENT") | |
logger.debug("=" * 50) | |
checklist_escalation_risk = "Unknown" if escalation_score is None else ( | |
"Critical" if escalation_score >= 20 else | |
"Moderate" if escalation_score >= 10 else | |
"Low" | |
) | |
if escalation_score is not None: | |
logger.debug(f"Score: {escalation_score}/29") | |
logger.debug(f"Risk Level: {checklist_escalation_risk}") | |
if escalation_score >= 20: | |
logger.debug("❗❗ CRITICAL: Score indicates severe risk") | |
elif escalation_score >= 10: | |
logger.debug("⚠️ MODERATE: Score indicates concerning risk") | |
else: | |
logger.debug("📝 LOW: Score indicates limited risk") | |
else: | |
logger.debug("❓ Risk Level: Unknown (checklist not completed)") | |
# Escalation Analysis | |
logger.debug("\n📈 ESCALATION ANALYSIS") | |
logger.debug("=" * 50) | |
escalation_bump = 0 | |
for result, msg_id in results: | |
abuse_score, _, _, sentiment, stage, darvo_score, tone_tag = result | |
logger.debug(f"\n🔍 Message {msg_id} Risk Factors:") | |
factors = [] | |
if darvo_score > 0.65: | |
escalation_bump += 3 | |
factors.append(f"▲ +3: High DARVO score ({darvo_score:.3f})") | |
if tone_tag in ["forced accountability flip", "emotional threat"]: | |
escalation_bump += 2 | |
factors.append(f"▲ +2: Concerning tone ({tone_tag})") | |
if abuse_score > 80: | |
escalation_bump += 2 | |
factors.append(f"▲ +2: High abuse score ({abuse_score:.1f}%)") | |
if stage == 2: | |
escalation_bump += 3 | |
factors.append("▲ +3: Escalation stage") | |
if factors: | |
for factor in factors: | |
logger.debug(f" {factor}") | |
else: | |
logger.debug(" ✓ No escalation factors") | |
logger.debug(f"\n📊 Total Escalation Bump: +{escalation_bump}") | |
# Check for compound threats across messages | |
compound_threat_flag, threat_type = analyze_message_batch_threats( | |
[msg1, msg2, msg3], results | |
) | |
if compound_threat_flag: | |
logger.debug(f"⚠️ Compound threat detected across messages: {threat_type}") | |
pattern_escalation_risk = "Critical" # Override risk level | |
logger.debug("Risk level elevated to CRITICAL due to compound threats") | |
# Combined Risk Calculation | |
logger.debug("\n🎯 FINAL RISK CALCULATION") | |
logger.debug("=" * 50) | |
def rank(label): | |
return {"Low": 0, "Moderate": 1, "High": 2, "Critical": 3, "Unknown": 0}.get(label, 0) | |
combined_score = rank(pattern_escalation_risk) + rank(checklist_escalation_risk) + escalation_bump | |
logger.debug("Risk Components:") | |
logger.debug(f" • Pattern Risk ({pattern_escalation_risk}): +{rank(pattern_escalation_risk)}") | |
logger.debug(f" • Checklist Risk ({checklist_escalation_risk}): +{rank(checklist_escalation_risk)}") | |
logger.debug(f" • Escalation Bump: +{escalation_bump}") | |
logger.debug(f" = Combined Score: {combined_score}") | |
escalation_risk = ( | |
"Critical" if combined_score >= 6 else | |
"High" if combined_score >= 4 else | |
"Moderate" if combined_score >= 2 else | |
"Low" | |
) | |
logger.debug(f"\n⚠️ Final Escalation Risk: {escalation_risk}") | |
# Generate Output Text | |
logger.debug("\n📝 GENERATING OUTPUT") | |
logger.debug("=" * 50) | |
if escalation_score is None: | |
escalation_text = ( | |
"🚫 **Escalation Potential: Unknown** (Checklist not completed)\n" | |
"⚠️ This section was not completed. Escalation potential is estimated using message data only.\n" | |
) | |
hybrid_score = 0 | |
logger.debug("Generated output for incomplete checklist") | |
elif escalation_score == 0: | |
escalation_text = ( | |
"✅ **Escalation Checklist Completed:** No danger items reported.\n" | |
"🧭 **Escalation potential estimated from detected message patterns only.**\n" | |
f"• Pattern Risk: {pattern_escalation_risk}\n" | |
f"• Checklist Risk: None reported\n" | |
f"• Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)" | |
) | |
hybrid_score = escalation_bump | |
logger.debug("Generated output for no-risk checklist") | |
else: | |
hybrid_score = escalation_score + escalation_bump | |
escalation_text = ( | |
f"📈 **Escalation Potential: {escalation_risk} ({hybrid_score}/29)**\n" | |
"📋 This score combines your safety checklist answers *and* detected high-risk behavior.\n" | |
f"• Pattern Risk: {pattern_escalation_risk}\n" | |
f"• Checklist Risk: {checklist_escalation_risk}\n" | |
f"• Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)" | |
) | |
logger.debug(f"Generated output with hybrid score: {hybrid_score}/29") | |
# Final Metrics | |
logger.debug("\n📊 FINAL METRICS") | |
logger.debug("=" * 50) | |
composite_abuse = int(round(sum(abuse_scores) / len(abuse_scores))) | |
logger.debug(f"Composite Abuse Score: {composite_abuse}%") | |
most_common_stage = max(set(stages), key=stages.count) | |
logger.debug(f"Most Common Stage: {most_common_stage}") | |
avg_darvo = round(sum(darvo_scores) / len(darvo_scores), 3) | |
logger.debug(f"Average DARVO Score: {avg_darvo}") | |
# Generate Final Report | |
logger.debug("\n📄 GENERATING FINAL REPORT") | |
logger.debug("=" * 50) | |
out = f"Abuse Intensity: {composite_abuse}%\n" | |
# Add detected patterns to output | |
if predicted_labels: | |
out += "🔍 Detected Patterns:\n" | |
if high_patterns: | |
patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in high_patterns) | |
out += f"❗ High Severity: {patterns_str}\n" | |
if moderate_patterns: | |
patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in moderate_patterns) | |
out += f"⚠️ Moderate Severity: {patterns_str}\n" | |
if low_patterns: | |
patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in low_patterns) | |
out += f"📝 Low Severity: {patterns_str}\n" | |
out += "\n" | |
out += "📊 This reflects the strength and severity of detected abuse patterns in the message(s).\n\n" | |
# Risk Level Assessment | |
risk_level = ( | |
"Critical" if composite_abuse >= 85 or hybrid_score >= 20 else | |
"High" if composite_abuse >= 70 or hybrid_score >= 15 else | |
"Moderate" if composite_abuse >= 50 or hybrid_score >= 10 else | |
"Low" | |
) | |
logger.debug(f"Final Risk Level: {risk_level}") | |
# Add Risk Description | |
risk_descriptions = { | |
"Critical": ( | |
"🚨 **Risk Level: Critical**\n" | |
"Multiple severe abuse patterns detected. This situation shows signs of " | |
"dangerous escalation and immediate intervention may be needed." | |
), | |
"High": ( | |
"⚠️ **Risk Level: High**\n" | |
"Strong abuse patterns detected. This situation shows concerning " | |
"signs of manipulation and control." | |
), | |
"Moderate": ( | |
"⚡ **Risk Level: Moderate**\n" | |
"Concerning patterns detected. While not severe, these behaviors " | |
"indicate unhealthy relationship dynamics." | |
), | |
"Low": ( | |
"📝 **Risk Level: Low**\n" | |
"Minor concerning patterns detected. While present, the detected " | |
"behaviors are subtle or infrequent." | |
) | |
} | |
out += risk_descriptions[risk_level] | |
out += f"\n\n{RISK_STAGE_LABELS[most_common_stage]}" | |
logger.debug("Added risk description and stage information") | |
# Add DARVO Analysis | |
if avg_darvo > 0.25: | |
level = "moderate" if avg_darvo < 0.65 else "high" | |
out += f"\n\n🎭 **DARVO Score: {avg_darvo}** → This indicates a **{level} likelihood** of narrative reversal (DARVO), where the speaker may be denying, attacking, or reversing blame." | |
logger.debug(f"Added DARVO analysis ({level} level)") | |
# Add Emotional Tones | |
logger.debug("\n🎭 Adding Emotional Tones") | |
out += "\n\n🎭 **Emotional Tones Detected:**\n" | |
for i, tone in enumerate(tone_tags): | |
out += f"• Message {i+1}: *{tone or 'none'}*\n" | |
logger.debug(f"Message {i+1} tone: {tone}") | |
# Add Threats Section | |
logger.debug("\n⚠️ Adding Threat Analysis") | |
if flat_threats: | |
out += "\n\n🚨 **Immediate Danger Threats Detected:**\n" | |
for t in set(flat_threats): | |
out += f"• \"{t}\"\n" | |
out += "\n⚠️ These phrases may indicate an imminent risk to physical safety." | |
logger.debug(f"Added {len(set(flat_threats))} unique threat warnings") | |
else: | |
out += "\n\n🧩 **Immediate Danger Threats:** None explicitly detected.\n" | |
out += "This does *not* rule out risk, but no direct threat phrases were matched." | |
logger.debug("No threats to add") | |
# Generate Timeline | |
logger.debug("\n📈 Generating Timeline") | |
pattern_labels = [] | |
for result, _ in results: | |
matched_scores = result[2] # Get the matched_scores from the result tuple | |
if matched_scores: | |
# Sort matched_scores by score and get the highest scoring pattern | |
highest_pattern = max(matched_scores, key=lambda x: x[1]) | |
pattern_labels.append(highest_pattern[0]) # Add the pattern name | |
else: | |
pattern_labels.append("none") | |
logger.debug("Pattern labels for timeline:") | |
for i, (pattern, score) in enumerate(zip(pattern_labels, abuse_scores)): | |
logger.debug(f"Message {i+1}: {pattern} ({score:.1f}%)") | |
timeline_image = generate_abuse_score_chart(dates_used, abuse_scores, pattern_labels) | |
logger.debug("Timeline generated successfully") | |
# Add Escalation Text | |
out += "\n\n" + escalation_text | |
logger.debug("Added escalation text to output") | |
logger.debug("\n✅ ANALYSIS COMPLETE") | |
logger.debug("=" * 50) | |
return out, timeline_image | |
except Exception as e: | |
logger.error("\n❌ ERROR IN ANALYSIS") | |
logger.error("=" * 50) | |
logger.error(f"Error type: {type(e).__name__}") | |
logger.error(f"Error message: {str(e)}") | |
logger.error(f"Traceback:\n{traceback.format_exc()}") | |
return "An error occurred during analysis.", None | |
# Gradio Interface Setup | |
def create_interface(): | |
try: | |
textbox_inputs = [gr.Textbox(label=f"Message {i+1}") for i in range(3)] | |
quiz_boxes = [gr.Checkbox(label=q) for q, _ in ESCALATION_QUESTIONS] | |
none_box = gr.Checkbox(label="None of the above") | |
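        # Input order must mirror analyze_composite(msg1, msg2, msg3, *answers_and_none):
        # three message boxes, then the checklist boxes, with "None of the above" last.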
demo = gr.Interface( | |
fn=analyze_composite, | |
inputs=textbox_inputs + quiz_boxes + [none_box], | |
outputs=[ | |
gr.Textbox(label="Results"), | |
gr.Image(label="Abuse Score Timeline", type="pil") | |
], | |
title="Abuse Pattern Detector + Escalation Quiz", | |
description=( | |
"Enter up to three messages that concern you. " | |
"For the most accurate results, include messages from a recent emotionally intense period." | |
), | |
flagging_mode="manual" | |
) | |
return demo | |
except Exception as e: | |
logger.error(f"Error creating interface: {e}") | |
raise | |
# Main execution | |
if __name__ == "__main__": | |
try: | |
demo = create_interface() | |
demo.launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
share=False | |
) | |
except Exception as e: | |
logger.error(f"Failed to launch app: {e}") | |
raise | |