# Tether / app.py
# SamanthaStorm's picture
# Update app.py
# a84b6dc verified
import gradio as gr
import spaces
import torch
import numpy as np
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline as hf_pipeline
import re
import matplotlib.pyplot as plt
import io
from PIL import Image
from datetime import datetime
from torch.nn.functional import sigmoid
from collections import Counter
import logging
import traceback
# Root logging: DEBUG and above, default handler/format.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Run on the GPU when torch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
logger.info(f"Using device: {device}")
# Set up custom logging
class CustomFormatter(logging.Formatter):
    """Console formatter that colors each message according to its log level.

    Only the rendered message is emitted (no timestamp, level name or logger
    name). Records whose level has no entry in ``LEVEL_COLORS`` are printed
    uncolored. Exception and stack information attached to a record is appended
    after the message, so ``logger.exception(...)`` / ``exc_info=True`` keep
    their traceback instead of silently losing it.
    """

    # ANSI escape sequences used for the console colors.
    grey = "\x1b[38;21m"
    blue = "\x1b[38;5;39m"
    yellow = "\x1b[38;5;226m"
    red = "\x1b[38;5;196m"
    bold_red = "\x1b[31;1m"
    reset = "\x1b[0m"

    # Exact-level lookup: custom levels between the standard ones stay uncolored.
    LEVEL_COLORS = {
        logging.DEBUG: blue,
        logging.INFO: grey,
        logging.WARNING: yellow,
        logging.ERROR: red,
        logging.CRITICAL: bold_red,
    }

    def format(self, record):
        """Return the colored message for *record*, plus any traceback/stack text."""
        message = record.getMessage()
        color = self.LEVEL_COLORS.get(record.levelno)
        rendered = f"{color}{message}{self.reset}" if color is not None else message

        # Mirror logging.Formatter.format(): cache and append exception text.
        if record.exc_info and not record.exc_text:
            record.exc_text = self.formatException(record.exc_info)
        if record.exc_text:
            rendered = f"{rendered}\n{record.exc_text}"
        if record.stack_info:
            rendered = f"{rendered}\n{self.formatStack(record.stack_info)}"
        return rendered
# Setup logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Remove any existing handlers so re-executing this module does not stack them
logger.handlers.clear()
# Create console handler with custom formatter
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)
# logging.basicConfig() earlier in this file attached a handler to the root
# logger; without this, every record would be printed twice (once colored by
# the handler above, once in the root handler's default format).
logger.propagate = False
# Suppress matplotlib font debugging
matplotlib_logger = logging.getLogger('matplotlib.font_manager')
matplotlib_logger.setLevel(logging.WARNING)
# Also suppress the UserWarning about tight layout
import warnings
warnings.filterwarnings("ignore", message="Tight layout not applied")
# Model initialization
# Multi-label abuse-pattern classifier and its tokenizer. Loaded exactly once:
# the checkpoint used to be loaded twice back-to-back, doubling start-up time
# and transient memory for no benefit.
model_name = "SamanthaStorm/tether-multilabel-v4"
model = AutoModelForSequenceClassification.from_pretrained(model_name).to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)

# Sentiment model. force_download=True bypasses the local cache, so the
# checkpoint is fetched again on every start.
sentiment_model = AutoModelForSequenceClassification.from_pretrained(
    "SamanthaStorm/tether-sentiment-v3",
    force_download=True,
    local_files_only=False
).to(device)
sentiment_tokenizer = AutoTokenizer.from_pretrained(
    "SamanthaStorm/tether-sentiment-v3",
    use_fast=False,
    force_download=True,
    local_files_only=False
)

# After loading the sentiment model
logger.debug("\nSentiment Model Config:")
logger.debug(f"Model name: {sentiment_model.config.name_or_path}")
# NOTE(review): despite its label, this line logs the config's name/path again,
# not a modification timestamp.
logger.debug(f"Last modified: {sentiment_model.config._name_or_path}")

# Emotion classifier; configured to return a score for every emotion label.
emotion_pipeline = hf_pipeline(
    "text-classification",
    model="j-hartmann/emotion-english-distilroberta-base",
    return_all_scores=True,  # Get all emotion scores
    top_k=None,  # Don't limit to top k predictions
    truncation=True,
    device=0 if torch.cuda.is_available() else -1
)

# DARVO model
darvo_model = AutoModelForSequenceClassification.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1").to(device)
darvo_tokenizer = AutoTokenizer.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1", use_fast=False)
darvo_model.eval()
# Constants and Labels
# Output labels of the multi-label abuse model. Order matters: in
# analyze_single_message the list is zipped position-by-position with the
# model's sigmoid scores.
LABELS = [
"recovery phase", "control", "gaslighting", "guilt tripping", "dismissiveness",
"blame shifting", "nonabusive", "projection", "insults",
"contradictory statements", "obscure language"
]
# Sentiment class names, indexed by the argmax of the sentiment model's softmax output.
SENTIMENT_LABELS = ["supportive", "undermining"]
# Per-label score a prediction must exceed to count as detected
# (labels missing from this dict fall back to 0.25 in analyze_single_message).
THRESHOLDS = {
"recovery phase": 0.324,
"control": 0.33,
"gaslighting": 0.285,
"guilt tripping": 0.267,
"dismissiveness": 0.123,
"blame shifting": 0.116,
"projection": 0.425,
"insults": 0.347,
"contradictory statements": 0.378,
"obscure language": 0.206,
"nonabusive": 0.094
}
# Per-label weight applied to a detected label's score when the weighted
# abuse score is computed (labels missing from this dict default to 1.0).
PATTERN_WEIGHTS = {
"recovery phase": 0.7,
"control": 1.4,
"gaslighting": 1.3,
"guilt tripping": 1.2,
"dismissiveness": 0.9,
"blame shifting": 1.0, # Increased from 0.8
"projection": 0.5,
"insults": 1.4, # Reduced from 2.1
"contradictory statements": 1.0,
"obscure language": 0.9,
"nonabusive": 0.0
}
# Safety checklist: (question text, weight) pairs. analyze_composite sums the
# weights of the checked items; the weights below total 29.
ESCALATION_QUESTIONS = [
("Partner has access to firearms or weapons", 4),
("Partner threatened to kill you", 3),
("Partner threatened you with a weapon", 3),
("Partner has ever choked you, even if you considered it consensual at the time", 4),
("Partner injured or threatened your pet(s)", 3),
("Partner has broken your things, punched or kicked walls, or thrown things ", 2),
("Partner forced or coerced you into unwanted sexual acts", 3),
("Partner threatened to take away your children", 2),
("Violence has increased in frequency or severity", 3),
("Partner monitors your calls/GPS/social media", 2)
]
# Display text per risk stage number (1-4).
# NOTE(review): no use of this mapping is visible in this part of the file —
# presumably it is rendered in the final report; confirm against the caller.
RISK_STAGE_LABELS = {
1: "🌀 Risk Stage: Tension-Building\nThis message reflects rising emotional pressure or subtle control attempts.",
2: "🔥 Risk Stage: Escalation\nThis message includes direct or aggressive patterns, suggesting active harm.",
3: "🌧️ Risk Stage: Reconciliation\nThis message reflects a reset attempt—apologies or emotional repair without accountability.",
4: "🌸 Risk Stage: Calm / Honeymoon\nThis message appears supportive but may follow prior harm, minimizing it."
}
# Lower-case threat phrases. analyze_composite normalizes both the message and
# each phrase (lower-casing, stripping everything except a-z, 0-9 and spaces)
# and reports every phrase that occurs as a substring of the message.
THREAT_MOTIFS = [
"i'll kill you", "i'm going to hurt you", "you're dead", "you won't survive this",
"i'll break your face", "i'll bash your head in", "i'll snap your neck",
"i'll come over there and make you shut up", "i'll knock your teeth out",
"you're going to bleed", "you want me to hit you?", "i won't hold back next time",
"i swear to god i'll beat you", "next time, i won't miss", "i'll make you scream",
"i know where you live", "i'm outside", "i'll be waiting", "i saw you with him",
"you can't hide from me", "i'm coming to get you", "i'll find you", "i know your schedule",
"i watched you leave", "i followed you home", "you'll regret this", "you'll be sorry",
"you're going to wish you hadn't", "you brought this on yourself", "don't push me",
"you have no idea what i'm capable of", "you better watch yourself",
"i don't care what happens to you anymore", "i'll make you suffer", "you'll pay for this",
"i'll never let you go", "you're nothing without me", "if you leave me, i'll kill myself",
"i'll ruin you", "i'll tell everyone what you did", "i'll make sure everyone knows",
"i'm going to destroy your name", "you'll lose everyone", "i'll expose you",
"your friends will hate you", "i'll post everything", "you'll be cancelled",
"you'll lose everything", "i'll take the house", "i'll drain your account",
"you'll never see a dime", "you'll be broke when i'm done", "i'll make sure you lose your job",
"i'll take your kids", "i'll make sure you have nothing", "you can't afford to leave me",
"don't make me do this", "you know what happens when i'm mad", "you're forcing my hand",
"if you just behaved, this wouldn't happen", "this is your fault",
"you're making me hurt you", "i warned you", "you should have listened"
]
def get_emotion_profile(text):
    """Run the emotion pipeline on *text* and return ``{emotion: score}``.

    Keys are the pipeline's labels lower-cased; scores are rounded to three
    decimals. Returns an empty dict when the pipeline output is not a nested
    list, and an all-zero profile for six emotions when anything raises.
    """
    try:
        logger.debug("\n🎭 EMOTION ANALYSIS")
        logger.debug(f"Analyzing text: {text}")
        emotions = emotion_pipeline(text)
        logger.debug(f"Raw emotion pipeline output: {emotions}")

        nested_output = isinstance(emotions, list) and isinstance(emotions[0], list)
        if not nested_output:
            logger.debug("No valid emotions detected, returning empty dict")
            return {}

        # Scores for the (single) input text
        first_prediction = emotions[0]
        logger.debug("\nRaw emotion scores:")
        for entry in first_prediction:
            logger.debug(f" • {entry['label']}: {entry['score']:.3f}")

        # Lower-case the labels and round the scores
        profile = {}
        for entry in first_prediction:
            profile[entry['label'].lower()] = round(entry['score'], 3)

        logger.debug("\nProcessed emotion profile:")
        for name, value in profile.items():
            logger.debug(f" • {name}: {value:.3f}")
        return profile
    except Exception as exc:
        logger.error(f"Error in get_emotion_profile: {exc}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        default_emotions = dict.fromkeys(
            ("sadness", "joy", "neutral", "disgust", "anger", "fear"), 0.0
        )
        logger.debug(f"Returning default emotions: {default_emotions}")
        return default_emotions
def get_emotional_tone_tag(text, sentiment, patterns, abuse_score):
    """Classify the emotional tone of a message with an ordered rule list.

    Args:
        text: The message text.
        sentiment: Sentiment label for the message ("supportive"/"undermining").
        patterns: Abuse-pattern labels detected for the message.
        abuse_score: Abuse score for the message (compared against 40 and 70).

    Returns:
        The tag of the first matching rule, or "neutral" when none matches.
        Rule order is significant and is kept exactly as before.
    """
    emotions = get_emotion_profile(text)
    sadness = emotions.get("sadness", 0)
    joy = emotions.get("joy", 0)
    neutral = emotions.get("neutral", 0)
    disgust = emotions.get("disgust", 0)
    anger = emotions.get("anger", 0)
    fear = emotions.get("fear", 0)

    text_lower = text.lower()
    undermining = sentiment == "undermining"
    # The abuse model's label is "recovery phase"; the rules used to test for
    # "recovery" only, which can never appear in `patterns`. Both spellings are
    # accepted so existing callers passing "recovery" keep working.
    recovery_labels = ("recovery phase", "recovery")

    def has_any(*labels):
        """True when at least one of *labels* was detected."""
        return any(label in patterns for label in labels)

    def mentions_any(phrases):
        """True when any phrase occurs as a substring of the lower-cased text."""
        return any(phrase in text_lower for phrase in phrases)

    # Direct Threat
    threat_indicators = [
        "if you", "i'll make", "don't forget", "remember", "regret",
        "i control", "i'll take", "you'll lose", "make sure",
        "never see", "won't let"
    ]
    if (
        mentions_any(threat_indicators) and
        has_any("control", "insults") and
        (anger > 0.2 or disgust > 0.2 or abuse_score > 70)
    ):
        return "direct threat"

    # Prophetic Punishment
    future_consequences = [
        "will end up", "you'll be", "you will be", "going to be",
        "will become", "will find yourself", "will realize",
        "you'll regret", "you'll see", "will learn", "truly will",
        "end up alone", "end up miserable"
    ]
    dismissive_endings = [
        "i'm out", "i'm done", "whatever", "good luck",
        "your choice", "your problem", "regardless",
        "keep", "keep on"
    ]
    # Either a hostile/flat emotional profile, or sadness that clearly dominates.
    hostile_or_flat = disgust > 0.2 or neutral > 0.3 or anger > 0.2
    sadness_dominant = sadness > 0.6 and all(e < 0.1 for e in [anger, disgust, neutral])
    if (
        (mentions_any(future_consequences) or mentions_any(dismissive_endings)) and
        has_any("dismissiveness", "control") and
        (hostile_or_flat or sadness_dominant)
    ):
        return "predictive punishment"

    # 1. Performative Regret
    if (
        sadness > 0.3 and
        has_any("blame shifting", "guilt tripping", *recovery_labels) and
        (undermining or abuse_score > 40)
    ):
        return "performative regret"

    # 2. Coercive Warmth
    if (joy > 0.2 or sadness > 0.3) and has_any("control", "gaslighting") and undermining:
        return "coercive warmth"

    # 3. Cold Invalidation
    if (
        (neutral + disgust) > 0.4 and
        has_any("dismissiveness", "projection", "obscure language") and
        undermining
    ):
        return "cold invalidation"

    # 4. Genuine Vulnerability: every detected pattern (if any) is a recovery label.
    if (
        (sadness + fear) > 0.4 and
        sentiment == "supportive" and
        all(p in recovery_labels for p in patterns)
    ):
        return "genuine vulnerability"

    # 5. Emotional Threat
    if (
        (anger + disgust) > 0.4 and
        has_any("control", "insults", "dismissiveness") and
        undermining
    ):
        return "emotional threat"

    # 6. Weaponized Sadness
    if sadness > 0.5 and has_any("guilt tripping", "projection") and undermining:
        return "weaponized sadness"

    # 7. Toxic Resignation
    if neutral > 0.4 and has_any("dismissiveness", "obscure language") and undermining:
        return "toxic resignation"

    # 8. Aggressive Dismissal
    # NOTE(review): whenever this rule would match, rule 5 has already matched
    # (anger > 0.4 implies anger + disgust > 0.4 for non-negative scores, and
    # its pattern set is a subset), so this branch looks unreachable. Order is
    # left untouched to avoid changing existing tags.
    if anger > 0.4 and has_any("insults", "control") and undermining:
        return "aggressive dismissal"

    # 9. Deflective Hostility
    if (
        (0.15 < anger < 0.6 or 0.15 < disgust < 0.6) and
        has_any("projection") and
        undermining
    ):
        return "deflective hostility"

    # 10. Contradictory Gaslight
    if (
        (joy + anger + sadness) > 0.4 and
        has_any("gaslighting", "contradictory statements") and
        undermining
    ):
        return "contradictory gaslight"

    # 11. Forced Accountability Flip
    if (anger + disgust) > 0.4 and has_any("blame shifting", "projection") and undermining:
        return "forced accountability flip"

    # Emotional Instability Fallback
    if (anger + sadness + disgust) > 0.5 and undermining:
        return "emotional instability"

    return "neutral"
@spaces.GPU
def predict_darvo_score(text):
    """Return the DARVO model's sigmoid score for *text*, rounded to 4 decimals.

    Any failure during tokenization or inference is logged and reported as 0.0.
    """
    try:
        encoded = darvo_tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        # Move every input tensor to the device the model lives on
        encoded = {name: tensor.to(device) for name, tensor in encoded.items()}
        with torch.no_grad():
            output = darvo_model(**encoded)
        probability = sigmoid(output.logits.cpu()).item()
    except Exception as exc:
        logger.error(f"Error in DARVO prediction: {exc}")
        return 0.0
    return round(probability, 4)
def detect_weapon_language(text):
    """Return True when *text* contains a weapon-related keyword.

    Matching is a case-insensitive substring test, so compounds such as
    "handgun" match, and so do unrelated words like "skill" or "stable".
    """
    lowered = text.lower()
    for keyword in ("knife", "gun", "bomb", "weapon", "kill", "stab"):
        if keyword in lowered:
            return True
    return False
def get_risk_stage(patterns, sentiment):
    """Map detected patterns and sentiment to a risk stage number (1-4).

    Precedence: insults -> 2 (escalation); recovery -> 3 (reconciliation);
    control / guilt tripping -> 1 (tension-building); supportive sentiment with
    projection or dismissiveness -> 4 (calm / honeymoon); anything else -> 1.

    Args:
        patterns: Collection of detected abuse-pattern labels.
        sentiment: Sentiment label ("supportive" / "undermining").

    Returns:
        The stage number; 1 is also returned if evaluation raises.
    """
    try:
        if "insults" in patterns:
            return 2
        # The abuse model emits "recovery phase"; the bare "recovery" spelling
        # checked previously never occurs among its labels, which made stage 3
        # unreachable. Both are accepted for backward compatibility.
        if "recovery phase" in patterns or "recovery" in patterns:
            return 3
        if "control" in patterns or "guilt tripping" in patterns:
            return 1
        if sentiment == "supportive" and any(
            p in patterns for p in ("projection", "dismissiveness")
        ):
            return 4
        return 1
    except Exception as e:
        logger.error(f"Error determining risk stage: {e}")
        return 1
def detect_threat_pattern(text, patterns):
    """Return True when the text or its detected patterns look threatening.

    Three independent signals are combined with OR:
      * a threat word/phrase occurs as a substring of the lower-cased text,
      * at least one detected pattern is a threat-type pattern,
      * the text contains "if" together with a conditional-threat trigger.
    """
    lowered = text.lower()

    # Words and phrases treated as threat indicators (substring match)
    threat_words = (
        "regret", "sorry", "pay", "hurt", "suffer", "destroy", "ruin",
        "expose", "tell everyone", "never see", "take away", "lose",
        "control", "make sure", "won't let", "force", "warn", "never",
        "punish", "teach you", "learn", "show you", "remember",
        "if you", "don't forget", "i control", "i'll make sure",
        "bank account", "phone", "money", "access",
    )
    conditional_triggers = ("regret", "make sure", "control")
    threat_patterns = {"control", "gaslighting", "blame shifting", "insults"}

    # All three signals are evaluated before combining them.
    if "if" in lowered:
        conditional_hit = any(trigger in lowered for trigger in conditional_triggers)
    else:
        conditional_hit = False

    word_hit = False
    for candidate in threat_words:
        if candidate in lowered:
            word_hit = True
            break

    pattern_hit = any(label in threat_patterns for label in patterns)

    return word_hit or pattern_hit or conditional_hit
def detect_compound_threat(text, patterns):
    """Check one message for a compound threat.

    Returns ``(True, "single_message")`` when either
      * detect_threat_pattern() fires and at least two high-risk patterns were
        detected, or
      * "control" was detected and the text holds a conditional threat
        ("if" plus one of "regret" / "make sure" / "control");
    otherwise ``(False, None)``. Errors are logged and yield ``(False, None)``.
    """
    try:
        risky = {"control", "gaslighting", "blame shifting", "insults"}
        risky_hits = len([label for label in patterns if label in risky])
        threat_found = detect_threat_pattern(text, patterns)

        # Special case: control combined with an if/then style threat
        control_detected = "control" in patterns
        lowered = text.lower()
        conditional_found = "if" in lowered and any(
            trigger in lowered for trigger in ["regret", "make sure", "control"]
        )

        if threat_found and risky_hits >= 2:
            return True, "single_message"
        if control_detected and conditional_found:
            return True, "single_message"
        return False, None
    except Exception as exc:
        logger.error(f"Error in compound threat detection: {exc}")
        return False, None
def _message_index_from_label(label, message_count):
    """Return the 0-based index encoded in a "Message N" label, else None."""
    match = re.fullmatch(r"\s*Message\s+(\d+)\s*", str(label))
    if match is None:
        return None
    index = int(match.group(1)) - 1
    return index if 0 <= index < message_count else None


def analyze_message_batch_threats(messages, results):
    """Look for compound threats that build up across several messages.

    Args:
        messages: All message texts, including empty ones.
        results: ``(result, label)`` pairs for the messages that were actually
            analysed, where ``result[1]`` holds the detected pattern labels.
            This list can be shorter than *messages* (empty or skipped
            messages have no entry).

    Returns:
        ``(True, "multiple_threats")`` when at least two messages contain a
        threat, ``(True, "threat_with_support")`` for exactly one threat
        message plus at least two messages with supporting patterns, otherwise
        ``(False, None)``.
    """
    support_patterns = {"control", "gaslighting", "blame shifting"}
    non_empty = [i for i, m in enumerate(messages) if m and m.strip()]
    threat_messages = []
    support_messages = []

    for position, (result, label) in enumerate(results):
        # Pairing messages and results purely by position (the old zip) breaks
        # as soon as one message is empty or was skipped: each result would be
        # matched with the wrong text. Prefer the "Message N" label; otherwise
        # fall back to the position among the non-empty messages.
        index = _message_index_from_label(label, len(messages))
        if index is None or index not in non_empty:
            if position >= len(non_empty):
                continue
            index = non_empty[position]

        msg = messages[index]
        patterns = result[1]  # Get detected patterns
        # Check for threat in this message
        if detect_threat_pattern(msg, patterns):
            threat_messages.append(index)
        # Check for supporting patterns
        if any(p in support_patterns for p in patterns):
            support_messages.append(index)

    # Rule B: Multi-Message Accumulation
    if len(threat_messages) >= 2:
        return True, "multiple_threats"
    if len(threat_messages) == 1 and len(support_messages) >= 2:
        return True, "threat_with_support"
    return False, None
@spaces.GPU
def compute_abuse_score(matched_scores, sentiment):
    """Combine the matched pattern scores into a 0-90 abuse score.

    Args:
        matched_scores: ``(label, score, weight)`` triples for detected labels.
        sentiment: Sentiment label; "supportive" lowers the score by 15%.

    Returns:
        A plain Python float rounded to one decimal. 0.0 when nothing matched,
        when all weights are zero, or when the computation raises. The score
        is capped at 85, or at 90 if "control" or "gaslighting" was matched.
    """
    try:
        if not matched_scores:
            logger.debug("No matched scores, returning 0")
            return 0.0

        # Calculate weighted score
        total_weight = sum(weight for _, _, weight in matched_scores)
        if total_weight == 0:
            logger.debug("Total weight is 0, returning 0")
            return 0.0

        # Highest pattern scores first (logging only)
        sorted_scores = sorted(
            ((label, score) for label, score, _ in matched_scores),
            key=lambda item: item[1],
            reverse=True,
        )
        logger.debug(f"Sorted pattern scores: {sorted_scores}")

        # Base score calculation. Scores may arrive as numpy scalars; convert
        # them so the function always returns a built-in float.
        weighted_sum = sum(float(score) * weight for _, score, weight in matched_scores)
        base_score = (weighted_sum / total_weight) * 100
        logger.debug(f"Initial base score: {base_score:.1f}")

        # Cap maximum score based on pattern severity
        max_score = 85.0  # Set maximum possible score
        if any(label in {'control', 'gaslighting'} for label, _, _ in matched_scores):
            max_score = 90.0
            logger.debug(f"Increased max score to {max_score} due to high severity patterns")

        # Boost for multiple patterns: +10% per additional matched label
        # (this grows linearly; it is not a diminishing-returns curve).
        # NOTE(review): zero-weight labels such as "nonabusive" also count
        # toward this boost — confirm that is intended.
        if len(matched_scores) > 1:
            multiplier = 1 + (0.1 * (len(matched_scores) - 1))
            base_score *= multiplier
            logger.debug(f"Applied multiplier {multiplier:.2f} for {len(matched_scores)} patterns")

        # Apply sentiment modifier
        if sentiment == "supportive":
            base_score *= 0.85
            logger.debug("Applied 15% reduction for supportive sentiment")

        final_score = float(min(round(base_score, 1), max_score))
        logger.debug(f"Final abuse score: {final_score}")
        return final_score
    except Exception as e:
        logger.error(f"Error computing abuse score: {e}")
        return 0.0
# Profanity that forces the "insults" label. "fuck", "bitch" and "shit" are
# matched anywhere in the text; "ass" and "dick" only as whole words or common
# compounds, so words such as "class", "pass", "harass" or "Dickens" no longer
# trigger a forced high score.
_EXPLICIT_ABUSE_RE = re.compile(
    r"fuck|bitch|shit"
    r"|\b(?:dumb|jack|smart|bad|fat|lazy)?ass(?:es|holes?|hats?|wipes?)?\b"
    r"|\bdick(?:s|heads?)?\b"
)


@spaces.GPU
def analyze_single_message(text, thresholds):
    """Analyze a single message for abuse patterns.

    Args:
        text: The message text.
        thresholds: Mapping of label -> minimum score for that label to count
            as detected (labels not present use 0.25). Thresholds are halved
            when explicit profanity is found.

    Returns:
        A 7-tuple ``(abuse_score, pattern_labels, matched_scores, sentiment,
        stage, darvo_score, tone_tag)`` where ``matched_scores`` holds
        ``(label, score, weight)`` triples and ``sentiment`` is a dict with a
        "label" key. Empty text returns zeros with sentiment "none"; any
        exception returns zeros with sentiment "error".
    """
    logger.debug("\n=== DEBUG START ===")
    logger.debug(f"Input text: {text}")
    try:
        if not text.strip():
            logger.debug("Empty text, returning zeros")
            return 0.0, [], [], {"label": "none"}, 1, 0.0, None

        # Check for explicit abuse
        explicit_abuse = _EXPLICIT_ABUSE_RE.search(text.lower()) is not None
        logger.debug(f"Explicit abuse detected: {explicit_abuse}")

        # Abuse model inference
        inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = model(**inputs)
            raw_scores = torch.sigmoid(outputs.logits.squeeze(0)).cpu().numpy()

        # Log raw model outputs
        logger.debug("\nRaw model scores:")
        for label, score in zip(LABELS, raw_scores):
            logger.debug(f"{label}: {score:.3f}")

        # Get predictions and sort them
        predictions = list(zip(LABELS, raw_scores))
        sorted_predictions = sorted(predictions, key=lambda x: x[1], reverse=True)
        logger.debug("\nTop 3 predictions:")
        for label, score in sorted_predictions[:3]:
            logger.debug(f"{label}: {score:.3f}")

        # Apply thresholds
        threshold_labels = []
        if explicit_abuse:
            threshold_labels.append("insults")
            logger.debug("\nForced inclusion of 'insults' due to explicit abuse")
        for label, score in sorted_predictions:
            base_threshold = thresholds.get(label, 0.25)
            if explicit_abuse:
                base_threshold *= 0.5
            if score > base_threshold and label not in threshold_labels:
                threshold_labels.append(label)
        # The labels used to be passed as a stray logging argument with no
        # placeholder, which produced a logging error instead of this line.
        logger.debug(f"\nLabels that passed thresholds: {threshold_labels}")

        # Calculate matched scores
        matched_scores = []
        for label in threshold_labels:
            # Plain float so downstream arithmetic/serialization sees no numpy scalars
            score = float(raw_scores[LABELS.index(label)])
            weight = PATTERN_WEIGHTS.get(label, 1.0)
            if explicit_abuse and label == "insults":
                weight *= 1.5
            matched_scores.append((label, score, weight))

        # Get sentiment
        sent_inputs = sentiment_tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        sent_inputs = {k: v.to(device) for k, v in sent_inputs.items()}
        with torch.no_grad():
            sent_logits = sentiment_model(**sent_inputs).logits[0]
            sent_probs = torch.softmax(sent_logits, dim=-1).cpu().numpy()

        # Add detailed logging. The names are taken from SENTIMENT_LABELS so the
        # log always agrees with the label selected below (the previous message
        # hard-coded the opposite index order).
        # NOTE(review): confirm SENTIMENT_LABELS matches the model's class order.
        logger.debug("\n🎭 SENTIMENT ANALYSIS DETAILS")
        logger.debug(f"Raw logits: {sent_logits}")
        probs_text = ", ".join(
            f"{name}={prob:.3f}" for name, prob in zip(SENTIMENT_LABELS, sent_probs)
        )
        logger.debug(f"Probabilities: {probs_text}")
        sentiment = SENTIMENT_LABELS[int(np.argmax(sent_probs))]
        logger.debug(f"Selected sentiment: {sentiment}")

        # Calculate abuse score
        abuse_score = compute_abuse_score(matched_scores, sentiment)
        if explicit_abuse:
            abuse_score = max(abuse_score, 70.0)

        # Check for compound threats
        compound_threat_flag, threat_type = detect_compound_threat(
            text, threshold_labels
        )
        if compound_threat_flag:
            logger.debug(f"⚠️ Compound threat detected in message: {threat_type}")
            abuse_score = max(abuse_score, 85.0)  # Force high score for compound threats

        # Get DARVO score
        darvo_score = predict_darvo_score(text)

        # Get tone using emotion-based approach
        tone_tag = get_emotional_tone_tag(text, sentiment, threshold_labels, abuse_score)

        # Supportive + neutral tone + "obscure language" as the top pattern is
        # treated as a likely false positive and reported as non-abusive.
        highest_pattern = max(matched_scores, key=lambda x: x[1])[0] if matched_scores else None
        if sentiment == "supportive" and tone_tag == "neutral" and highest_pattern == "obscure language":
            logger.debug("Message classified as likely non-abusive (supportive, neutral, and obscure language). Returning low risk.")
            return 0.0, [], [], {"label": "supportive"}, 1, 0.0, "neutral"  # Return non-abusive values

        # Set stage
        stage = 2 if explicit_abuse or abuse_score > 70 else 1
        logger.debug("=== DEBUG END ===\n")
        return abuse_score, threshold_labels, matched_scores, {"label": sentiment}, stage, darvo_score, tone_tag
    except Exception as e:
        logger.error(f"Error in analyze_single_message: {e}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        return 0.0, [], [], {"label": "error"}, 1, 0.0, None
def generate_abuse_score_chart(dates, scores, patterns):
    """Render a timeline chart of abuse scores and return it as a PIL image.

    Args:
        dates: X-axis tick labels, one per score.
        scores: Abuse scores (percent) in display order.
        patterns: Label annotated next to each point, one per score.

    Returns:
        A ``PIL.Image`` holding the PNG rendering, or None if rendering fails.
    """
    fig = None
    try:
        # A single figure, drawn through its Axes. The previous version opened
        # an extra unused figure and only closed figures on the success path,
        # leaking them whenever plotting raised.
        fig, ax = plt.subplots(figsize=(10, 6))

        # Plot points and lines
        x = list(range(len(scores)))
        ax.plot(x, scores, 'bo-', linewidth=2, markersize=8)

        # Label each point with its pattern and score
        for i, (score, pattern) in enumerate(zip(scores, patterns)):
            ax.annotate(
                f'{pattern}\n{score:.0f}%',
                (i, score),
                textcoords="offset points",
                xytext=(0, 10),
                ha='center',
                bbox=dict(
                    boxstyle='round,pad=0.5',
                    fc='white',
                    ec='gray',
                    alpha=0.8
                )
            )

        # Customize the plot
        ax.set_ylim(-5, 105)
        ax.grid(True, linestyle='--', alpha=0.7)
        ax.set_title('Abuse Pattern Timeline', pad=20, fontsize=12)
        ax.set_ylabel('Abuse Score %')

        # X-axis labels
        ax.set_xticks(x)
        ax.set_xticklabels(dates, rotation=45)

        # Risk level bands
        ax.axhspan(0, 50, color='#90EE90', alpha=0.2)    # light green - Low Risk
        ax.axhspan(50, 70, color='#FFD700', alpha=0.2)   # gold - Moderate Risk
        ax.axhspan(70, 85, color='#FFA500', alpha=0.2)   # orange - High Risk
        ax.axhspan(85, 100, color='#FF6B6B', alpha=0.2)  # light red - Critical Risk

        # Add risk level labels
        ax.text(-0.2, 25, 'Low Risk', rotation=90, va='center')
        ax.text(-0.2, 60, 'Moderate Risk', rotation=90, va='center')
        ax.text(-0.2, 77.5, 'High Risk', rotation=90, va='center')
        ax.text(-0.2, 92.5, 'Critical Risk', rotation=90, va='center')

        # Adjust layout
        fig.tight_layout()

        # Convert plot to image
        buf = io.BytesIO()
        fig.savefig(buf, format='png', bbox_inches='tight')
        buf.seek(0)
        return Image.open(buf)
    except Exception as e:
        logger.error(f"Error generating abuse score chart: {e}")
        return None
    finally:
        # Always release the figure, on success and on failure
        if fig is not None:
            plt.close(fig)
def analyze_composite(msg1, msg2, msg3, *answers_and_none):
"""Analyze multiple messages and checklist responses"""
logger.debug("\n🔄 STARTING NEW ANALYSIS")
logger.debug("=" * 50)
# Define severity categories at the start
high = {'control'}
moderate = {'gaslighting', 'dismissiveness', 'obscure language', 'insults',
'contradictory statements', 'guilt tripping'}
low = {'blame shifting', 'projection', 'recovery'}
try:
# Process checklist responses
logger.debug("\n📋 CHECKLIST PROCESSING")
logger.debug("=" * 50)
none_selected_checked = answers_and_none[-1]
responses_checked = any(answers_and_none[:-1])
none_selected = not responses_checked and none_selected_checked
logger.debug("Checklist Status:")
logger.debug(f" • None Selected Box: {'✓' if none_selected_checked else '✗'}")
logger.debug(f" • Has Responses: {'✓' if responses_checked else '✗'}")
logger.debug(f" • Final Status: {'None Selected' if none_selected else 'Has Selections'}")
if none_selected:
escalation_score = 0
escalation_note = "Checklist completed: no danger items reported."
escalation_completed = True
logger.debug("\n✓ Checklist: No items selected")
elif responses_checked:
escalation_score = sum(w for (_, w), a in zip(ESCALATION_QUESTIONS, answers_and_none[:-1]) if a)
escalation_note = "Checklist completed."
escalation_completed = True
logger.debug(f"\n📊 Checklist Score: {escalation_score}")
# Log checked items
logger.debug("\n⚠️ Selected Risk Factors:")
for (q, w), a in zip(ESCALATION_QUESTIONS, answers_and_none[:-1]):
if a:
logger.debug(f" • [{w} points] {q}")
else:
escalation_score = None
escalation_note = "Checklist not completed."
escalation_completed = False
logger.debug("\n❗ Checklist: Not completed")
# Process messages
logger.debug("\n📝 MESSAGE PROCESSING")
logger.debug("=" * 50)
messages = [msg1, msg2, msg3]
active = [(m, f"Message {i+1}") for i, m in enumerate(messages) if m.strip()]
logger.debug(f"Active Messages: {len(active)} of 3")
if not active:
logger.debug("❌ Error: No messages provided")
return "Please enter at least one message.", None
# Detect threats
logger.debug("\n🚨 THREAT DETECTION")
logger.debug("=" * 50)
def normalize(text):
import unicodedata
text = text.lower().strip()
text = unicodedata.normalize("NFKD", text)
text = text.replace("'", "'")
return re.sub(r"[^a-z0-9 ]", "", text)
def detect_threat_motifs(message, motif_list):
norm_msg = normalize(message)
return [motif for motif in motif_list if normalize(motif) in norm_msg]
# Analyze threats and patterns
immediate_threats = [detect_threat_motifs(m, THREAT_MOTIFS) for m, _ in active]
flat_threats = [t for sublist in immediate_threats for t in sublist]
threat_risk = "Yes" if flat_threats else "No"
# Analyze each message
logger.debug("\n🔍 INDIVIDUAL MESSAGE ANALYSIS")
logger.debug("=" * 50)
results = []
for m, d in active:
logger.debug(f"\n📝 ANALYZING {d}")
logger.debug("-" * 40) # Separator for each message
result = analyze_single_message(m, THRESHOLDS.copy())
# Check for non-abusive classification and skip further analysis
if result[0] == 0.0 and result[1] == [] and result[3] == {"label": "supportive"} and result[4] == 1 and result[5] == 0.0 and result[6] == "neutral":
logger.debug(f"✓ {d} classified as non-abusive, skipping further analysis.")
# Option to include in final output (uncomment if needed):
# results.append(({"abuse_score": 0.0, "patterns": [], "sentiment": {"label": "supportive"}, "stage": 1, "darvo_score": 0.0, "tone": "neutral"}, d))
continue # Skip to the next message
results.append((result, d))
# Log the detailed results for the current message (if not skipped)
abuse_score, patterns, matched_scores, sentiment, stage, darvo_score, tone = result
logger.debug(f"\n📊 Results for {d}:")
logger.debug(f" • Abuse Score: {abuse_score:.1f}%")
logger.debug(f" • DARVO Score: {darvo_score:.3f}")
logger.debug(f" • Risk Stage: {stage}")
logger.debug(f" • Sentiment: {sentiment['label']}")
logger.debug(f" • Tone: {tone}")
if patterns:
logger.debug(" • Patterns: " + ", ".join(patterns))
# Unpack results for cleaner logging
abuse_score, patterns, matched_scores, sentiment, stage, darvo_score, tone = result
# Log core metrics
logger.debug("\n📊 CORE METRICS")
logger.debug(f" • Abuse Score: {abuse_score:.1f}%")
logger.debug(f" • DARVO Score: {darvo_score:.3f}")
logger.debug(f" • Risk Stage: {stage}")
logger.debug(f" • Sentiment: {sentiment['label']}")
logger.debug(f" • Tone: {tone}")
# Log detected patterns with scores
if patterns:
logger.debug("\n🎯 DETECTED PATTERNS")
for label, score, weight in matched_scores:
severity = "❗HIGH" if label in high else "⚠️ MODERATE" if label in moderate else "📝 LOW"
logger.debug(f" • {severity} | {label}: {score:.3f} (weight: {weight})")
else:
logger.debug("\n✓ No abuse patterns detected")
# Extract scores and metadata
abuse_scores = [r[0][0] for r in results]
stages = [r[0][4] for r in results]
darvo_scores = [r[0][5] for r in results]
tone_tags = [r[0][6] for r in results]
dates_used = [r[1] for r in results]
# Pattern Analysis Summary
logger.debug("\n📈 PATTERN ANALYSIS SUMMARY")
logger.debug("=" * 50)
predicted_labels = [label for r in results for label in r[0][1]]
if predicted_labels:
logger.debug("Detected Patterns Across All Messages:")
pattern_counts = Counter(predicted_labels)
# Log high severity patterns first
high_patterns = [p for p in pattern_counts if p in high]
if high_patterns:
logger.debug("\n❗ HIGH SEVERITY PATTERNS:")
for p in high_patterns:
logger.debug(f" • {p}{pattern_counts[p]})")
# Then moderate
moderate_patterns = [p for p in pattern_counts if p in moderate]
if moderate_patterns:
logger.debug("\n⚠️ MODERATE SEVERITY PATTERNS:")
for p in moderate_patterns:
logger.debug(f" • {p}{pattern_counts[p]})")
# Then low
low_patterns = [p for p in pattern_counts if p in low]
if low_patterns:
logger.debug("\n📝 LOW SEVERITY PATTERNS:")
for p in low_patterns:
logger.debug(f" • {p}{pattern_counts[p]})")
else:
logger.debug("✓ No patterns detected across messages")
# Pattern Severity Analysis
logger.debug("\n⚖️ SEVERITY ANALYSIS")
logger.debug("=" * 50)
counts = {'high': 0, 'moderate': 0, 'low': 0}
for label in predicted_labels:
if label in high:
counts['high'] += 1
elif label in moderate:
counts['moderate'] += 1
elif label in low:
counts['low'] += 1
logger.debug("Pattern Distribution:")
if counts['high'] > 0:
logger.debug(f" ❗ High Severity: {counts['high']}")
if counts['moderate'] > 0:
logger.debug(f" ⚠️ Moderate Severity: {counts['moderate']}")
if counts['low'] > 0:
logger.debug(f" 📝 Low Severity: {counts['low']}")
total_patterns = sum(counts.values())
if total_patterns > 0:
logger.debug(f"\nSeverity Percentages:")
logger.debug(f" • High: {(counts['high']/total_patterns)*100:.1f}%")
logger.debug(f" • Moderate: {(counts['moderate']/total_patterns)*100:.1f}%")
logger.debug(f" • Low: {(counts['low']/total_patterns)*100:.1f}%")
# Risk Assessment
logger.debug("\n🎯 RISK ASSESSMENT")
logger.debug("=" * 50)
if counts['high'] >= 2 and counts['moderate'] >= 2:
pattern_escalation_risk = "Critical"
logger.debug("❗❗ CRITICAL RISK")
logger.debug(" • Multiple high and moderate patterns detected")
logger.debug(f" • High patterns: {counts['high']}")
logger.debug(f" • Moderate patterns: {counts['moderate']}")
elif (counts['high'] >= 2 and counts['moderate'] >= 1) or \
(counts['moderate'] >= 3) or \
(counts['high'] >= 1 and counts['moderate'] >= 2):
pattern_escalation_risk = "High"
logger.debug("❗ HIGH RISK")
logger.debug(" • Significant pattern combination detected")
logger.debug(f" • High patterns: {counts['high']}")
logger.debug(f" • Moderate patterns: {counts['moderate']}")
elif (counts['moderate'] == 2) or \
(counts['high'] == 1 and counts['moderate'] == 1) or \
(counts['moderate'] == 1 and counts['low'] >= 2) or \
(counts['high'] == 1 and sum(counts.values()) == 1):
pattern_escalation_risk = "Moderate"
logger.debug("⚠️ MODERATE RISK")
logger.debug(" • Concerning pattern combination detected")
logger.debug(f" • Pattern distribution: H:{counts['high']}, M:{counts['moderate']}, L:{counts['low']}")
else:
pattern_escalation_risk = "Low"
logger.debug("📝 LOW RISK")
logger.debug(" • Limited pattern severity detected")
logger.debug(f" • Pattern distribution: H:{counts['high']}, M:{counts['moderate']}, L:{counts['low']}")
# Checklist Risk Assessment
logger.debug("\n📋 CHECKLIST RISK ASSESSMENT")
logger.debug("=" * 50)
checklist_escalation_risk = "Unknown" if escalation_score is None else (
"Critical" if escalation_score >= 20 else
"Moderate" if escalation_score >= 10 else
"Low"
)
if escalation_score is not None:
logger.debug(f"Score: {escalation_score}/29")
logger.debug(f"Risk Level: {checklist_escalation_risk}")
if escalation_score >= 20:
logger.debug("❗❗ CRITICAL: Score indicates severe risk")
elif escalation_score >= 10:
logger.debug("⚠️ MODERATE: Score indicates concerning risk")
else:
logger.debug("📝 LOW: Score indicates limited risk")
else:
logger.debug("❓ Risk Level: Unknown (checklist not completed)")
# Escalation Analysis
logger.debug("\n📈 ESCALATION ANALYSIS")
logger.debug("=" * 50)
escalation_bump = 0
for result, msg_id in results:
abuse_score, _, _, sentiment, stage, darvo_score, tone_tag = result
logger.debug(f"\n🔍 Message {msg_id} Risk Factors:")
factors = []
if darvo_score > 0.65:
escalation_bump += 3
factors.append(f"▲ +3: High DARVO score ({darvo_score:.3f})")
if tone_tag in ["forced accountability flip", "emotional threat"]:
escalation_bump += 2
factors.append(f"▲ +2: Concerning tone ({tone_tag})")
if abuse_score > 80:
escalation_bump += 2
factors.append(f"▲ +2: High abuse score ({abuse_score:.1f}%)")
if stage == 2:
escalation_bump += 3
factors.append("▲ +3: Escalation stage")
if factors:
for factor in factors:
logger.debug(f" {factor}")
else:
logger.debug(" ✓ No escalation factors")
logger.debug(f"\n📊 Total Escalation Bump: +{escalation_bump}")
# Check for compound threats across messages
compound_threat_flag, threat_type = analyze_message_batch_threats(
[msg1, msg2, msg3], results
)
if compound_threat_flag:
logger.debug(f"⚠️ Compound threat detected across messages: {threat_type}")
pattern_escalation_risk = "Critical" # Override risk level
logger.debug("Risk level elevated to CRITICAL due to compound threats")
# Combined Risk Calculation
logger.debug("\n🎯 FINAL RISK CALCULATION")
logger.debug("=" * 50)
def rank(label):
return {"Low": 0, "Moderate": 1, "High": 2, "Critical": 3, "Unknown": 0}.get(label, 0)
combined_score = rank(pattern_escalation_risk) + rank(checklist_escalation_risk) + escalation_bump
logger.debug("Risk Components:")
logger.debug(f" • Pattern Risk ({pattern_escalation_risk}): +{rank(pattern_escalation_risk)}")
logger.debug(f" • Checklist Risk ({checklist_escalation_risk}): +{rank(checklist_escalation_risk)}")
logger.debug(f" • Escalation Bump: +{escalation_bump}")
logger.debug(f" = Combined Score: {combined_score}")
escalation_risk = (
"Critical" if combined_score >= 6 else
"High" if combined_score >= 4 else
"Moderate" if combined_score >= 2 else
"Low"
)
logger.debug(f"\n⚠️ Final Escalation Risk: {escalation_risk}")
# Generate Output Text
logger.debug("\n📝 GENERATING OUTPUT")
logger.debug("=" * 50)
if escalation_score is None:
escalation_text = (
"🚫 **Escalation Potential: Unknown** (Checklist not completed)\n"
"⚠️ This section was not completed. Escalation potential is estimated using message data only.\n"
)
hybrid_score = 0
logger.debug("Generated output for incomplete checklist")
elif escalation_score == 0:
escalation_text = (
"✅ **Escalation Checklist Completed:** No danger items reported.\n"
"🧭 **Escalation potential estimated from detected message patterns only.**\n"
f"• Pattern Risk: {pattern_escalation_risk}\n"
f"• Checklist Risk: None reported\n"
f"• Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)"
)
hybrid_score = escalation_bump
logger.debug("Generated output for no-risk checklist")
else:
hybrid_score = escalation_score + escalation_bump
escalation_text = (
f"📈 **Escalation Potential: {escalation_risk} ({hybrid_score}/29)**\n"
"📋 This score combines your safety checklist answers *and* detected high-risk behavior.\n"
f"• Pattern Risk: {pattern_escalation_risk}\n"
f"• Checklist Risk: {checklist_escalation_risk}\n"
f"• Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)"
)
logger.debug(f"Generated output with hybrid score: {hybrid_score}/29")
# Final Metrics
logger.debug("\n📊 FINAL METRICS")
logger.debug("=" * 50)
composite_abuse = int(round(sum(abuse_scores) / len(abuse_scores)))
logger.debug(f"Composite Abuse Score: {composite_abuse}%")
most_common_stage = max(set(stages), key=stages.count)
logger.debug(f"Most Common Stage: {most_common_stage}")
avg_darvo = round(sum(darvo_scores) / len(darvo_scores), 3)
logger.debug(f"Average DARVO Score: {avg_darvo}")
# Generate Final Report
logger.debug("\n📄 GENERATING FINAL REPORT")
logger.debug("=" * 50)
out = f"Abuse Intensity: {composite_abuse}%\n"
# Add detected patterns to output
if predicted_labels:
out += "🔍 Detected Patterns:\n"
if high_patterns:
patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in high_patterns)
out += f"❗ High Severity: {patterns_str}\n"
if moderate_patterns:
patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in moderate_patterns)
out += f"⚠️ Moderate Severity: {patterns_str}\n"
if low_patterns:
patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in low_patterns)
out += f"📝 Low Severity: {patterns_str}\n"
out += "\n"
out += "📊 This reflects the strength and severity of detected abuse patterns in the message(s).\n\n"
# Risk Level Assessment
risk_level = (
"Critical" if composite_abuse >= 85 or hybrid_score >= 20 else
"High" if composite_abuse >= 70 or hybrid_score >= 15 else
"Moderate" if composite_abuse >= 50 or hybrid_score >= 10 else
"Low"
)
logger.debug(f"Final Risk Level: {risk_level}")
# Add Risk Description
risk_descriptions = {
"Critical": (
"🚨 **Risk Level: Critical**\n"
"Multiple severe abuse patterns detected. This situation shows signs of "
"dangerous escalation and immediate intervention may be needed."
),
"High": (
"⚠️ **Risk Level: High**\n"
"Strong abuse patterns detected. This situation shows concerning "
"signs of manipulation and control."
),
"Moderate": (
"⚡ **Risk Level: Moderate**\n"
"Concerning patterns detected. While not severe, these behaviors "
"indicate unhealthy relationship dynamics."
),
"Low": (
"📝 **Risk Level: Low**\n"
"Minor concerning patterns detected. While present, the detected "
"behaviors are subtle or infrequent."
)
}
out += risk_descriptions[risk_level]
out += f"\n\n{RISK_STAGE_LABELS[most_common_stage]}"
logger.debug("Added risk description and stage information")
# Add DARVO Analysis
if avg_darvo > 0.25:
level = "moderate" if avg_darvo < 0.65 else "high"
out += f"\n\n🎭 **DARVO Score: {avg_darvo}** → This indicates a **{level} likelihood** of narrative reversal (DARVO), where the speaker may be denying, attacking, or reversing blame."
logger.debug(f"Added DARVO analysis ({level} level)")
# Add Emotional Tones
logger.debug("\n🎭 Adding Emotional Tones")
out += "\n\n🎭 **Emotional Tones Detected:**\n"
for i, tone in enumerate(tone_tags):
out += f"• Message {i+1}: *{tone or 'none'}*\n"
logger.debug(f"Message {i+1} tone: {tone}")
# Add Threats Section
logger.debug("\n⚠️ Adding Threat Analysis")
if flat_threats:
out += "\n\n🚨 **Immediate Danger Threats Detected:**\n"
for t in set(flat_threats):
out += f"• \"{t}\"\n"
out += "\n⚠️ These phrases may indicate an imminent risk to physical safety."
logger.debug(f"Added {len(set(flat_threats))} unique threat warnings")
else:
out += "\n\n🧩 **Immediate Danger Threats:** None explicitly detected.\n"
out += "This does *not* rule out risk, but no direct threat phrases were matched."
logger.debug("No threats to add")
# Generate Timeline
logger.debug("\n📈 Generating Timeline")
pattern_labels = []
for result, _ in results:
matched_scores = result[2] # Get the matched_scores from the result tuple
if matched_scores:
# Sort matched_scores by score and get the highest scoring pattern
highest_pattern = max(matched_scores, key=lambda x: x[1])
pattern_labels.append(highest_pattern[0]) # Add the pattern name
else:
pattern_labels.append("none")
logger.debug("Pattern labels for timeline:")
for i, (pattern, score) in enumerate(zip(pattern_labels, abuse_scores)):
logger.debug(f"Message {i+1}: {pattern} ({score:.1f}%)")
timeline_image = generate_abuse_score_chart(dates_used, abuse_scores, pattern_labels)
logger.debug("Timeline generated successfully")
# Add Escalation Text
out += "\n\n" + escalation_text
logger.debug("Added escalation text to output")
logger.debug("\n✅ ANALYSIS COMPLETE")
logger.debug("=" * 50)
return out, timeline_image
except Exception as e:
logger.error("\n❌ ERROR IN ANALYSIS")
logger.error("=" * 50)
logger.error(f"Error type: {type(e).__name__}")
logger.error(f"Error message: {str(e)}")
logger.error(f"Traceback:\n{traceback.format_exc()}")
return "An error occurred during analysis.", None
# Gradio Interface Setup
def create_interface():
    """Assemble and return the Gradio interface for the app.

    The input list is, in order: three message textboxes, one checkbox per
    entry of ESCALATION_QUESTIONS (labelled with the entry's first element),
    and a final "None of the above" checkbox. The outputs are a results
    textbox and a PIL image for the abuse score timeline. The interface
    dispatches to ``analyze_composite``.

    Any exception raised while building the components is logged and then
    re-raised to the caller.
    """
    try:
        # Inputs are created first, in the same order they are passed on.
        message_fields = []
        for idx in range(3):
            message_fields.append(gr.Textbox(label=f"Message {idx+1}"))

        checklist_fields = []
        for question, _ in ESCALATION_QUESTIONS:
            checklist_fields.append(gr.Checkbox(label=question))

        none_of_the_above = gr.Checkbox(label="None of the above")
        all_inputs = message_fields + checklist_fields + [none_of_the_above]

        intro_text = (
            "Enter up to three messages that concern you. "
            "For the most accurate results, include messages from a recent emotionally intense period."
        )

        interface = gr.Interface(
            fn=analyze_composite,
            inputs=all_inputs,
            outputs=[
                gr.Textbox(label="Results"),
                gr.Image(label="Abuse Score Timeline", type="pil"),
            ],
            title="Abuse Pattern Detector + Escalation Quiz",
            description=intro_text,
            flagging_mode="manual",
        )
    except Exception as err:
        # Log for diagnostics, then let the caller see the original failure.
        logger.error(f"Error creating interface: {err}")
        raise
    else:
        return interface
# Main execution
if __name__ == "__main__":
    # Script entry point: build the interface and serve it on all network
    # interfaces at port 7860, without a public share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    try:
        demo = create_interface()
        demo.launch(**launch_options)
    except Exception as launch_error:
        # Record the failure and re-raise so the process still exits with the
        # original exception.
        logger.error(f"Failed to launch app: {launch_error}")
        raise