# Spaces: Sleeping  (page-header residue from the hosting site; not part of the program)
import gradio as gr | |
import torch | |
from transformers import pipeline as hf_pipeline, AutoModelForSequenceClassification, AutoTokenizer | |
from PIL import Image | |
import io | |
import easyocr | |
import numpy as np | |
import pandas as pd | |
# ——— Load and preprocess NRC EmoLex ——————————————————————————————————
# File name of the lexicon; it has to match the uploaded file exactly.
EMOLEX_PATH = "NRC-Emotion-Lexicon-Wordlevel-v0.92.txt"

# Raw (word, emotion, flag) triples read from the tab-separated file.
# Lines starting with "#" are treated as comments and skipped.
emo_raw = pd.read_csv(
    EMOLEX_PATH,
    sep="\t",
    header=None,
    names=["word", "emotion", "flag"],
    comment="#",
)

# Wide table: one row per word, one column per emotion, cells hold the flag.
# Word/emotion pairs missing from the file become 0 before the int cast.
emo_wide = emo_raw.pivot(index="word", columns="emotion", values="flag")
emo_df = emo_wide.fillna(0).astype(int)

# Lookup dict keyed by word, e.g. EMOLEX["happy"]["joy"] == 1
EMOLEX = emo_df.to_dict(orient="index")
def score_emolex(text_lower):
    """
    Count lexicon emotion flags over the whitespace-separated tokens of a text.

    Args:
        text_lower: the text to score; lexicon lookups are done on it as given,
            so callers pass it already lowercased.

    Returns:
        dict mapping every emotion column of ``emo_df`` to the summed flags of
        all tokens found in ``EMOLEX`` (0 for emotions that never occur).
    """
    import string  # local import: keeps the block independent of file-level imports

    # Characters removed from both ends of a token before the second lookup.
    edge_chars = string.punctuation + "“”‘’…—–"

    counts = {emo: 0 for emo in emo_df.columns}
    for token in text_lower.split():
        # Try the token exactly as written first so earlier matches are unchanged,
        # then retry without surrounding punctuation ("sorry." -> "sorry").
        flags = EMOLEX.get(token)
        if flags is None:
            flags = EMOLEX.get(token.strip(edge_chars))
        if flags is None:
            continue
        for emo, flag in flags.items():
            counts[emo] += flag
    return counts
# ——— 1) Emotion Pipeline ————————————————————————————————————————————————
# Text classifier used by get_emotion_profile.
# top_k=None requests a score for every label rather than only the best one;
# truncation=True lets over-long inputs be cut instead of rejected.
emotion_pipeline = hf_pipeline(
    task="text-classification",
    model="j-hartmann/emotion-english-distilroberta-base",
    truncation=True,
    top_k=None,
)
def get_emotion_profile(text):
    """
    Run the emotion classifier on ``text`` and return its scores as a dict.

    Returns:
        dict mapping each lowercased label to its score rounded to 3 decimals.
    """
    predictions = emotion_pipeline(text)

    # The pipeline may hand back a nested list (one inner list per input);
    # in that case only the first inner list is used.
    nested = isinstance(predictions, list) and isinstance(predictions[0], list)
    if nested:
        predictions = predictions[0]

    profile = {}
    for entry in predictions:
        profile[entry["label"].lower()] = round(entry["score"], 3)
    return profile
# Apology keywords (substring-matched against the lowercased message text)
APOLOGY_KEYWORDS = ["sorry", "apolog", "forgive"]

# ——— 2) Abuse-Patterns Model ——————————————————————————————————————————————
model_name = "SamanthaStorm/tether-multilabel-v3"
model = AutoModelForSequenceClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)

# Per-label cut-off: a pattern is reported when its sigmoid score is >= this value.
# Key order matters: LABELS is derived from it and is zipped positionally
# with the model's output scores in analyze_message.
THRESHOLDS = {
    "blame shifting": 0.28,
    "contradictory statements": 0.27,
    "control": 0.08,
    "dismissiveness": 0.32,
    "gaslighting": 0.27,
    "guilt tripping": 0.31,
    "insults": 0.10,
    "obscure language": 0.55,
    "projection": 0.09,
    "recovery phase": 0.33,
    "threat": 0.15,
}

# Pattern labels in the same order as the threshold table above.
LABELS = list(THRESHOLDS)

# ——— 3) Initialize EasyOCR reader ————————————————————————————————————————————
# English-only reader, forced onto the CPU.
ocr_reader = easyocr.Reader(["en"], gpu=False)
# ——— 4) Emotional-Tone Tagging —————————————————————————————————————————————
def get_emotional_tone_tag(emotion_profile, patterns, text_lower):
    """
    Assign a single nuanced tone label from emotion scores, patterns, and text.

    The rules are checked from top to bottom and the first match wins, so an
    earlier rule shadows any later rule whose condition it covers.

    Args:
        emotion_profile: mapping of emotion name -> score; a missing emotion
            counts as 0.
        patterns: collection of active pattern labels; None is treated as
            "no patterns".
        text_lower: the message text, lowercased by the caller.

    Returns:
        The tone label as a string, or None when no rule matches.
    """
    import re  # local import: keeps the block independent of file-level imports

    patterns = patterns or []

    def has_any(*labels):
        # True when at least one of the given labels is an active pattern.
        return any(label in patterns for label in labels)

    # unpack all emotion scores before any rules
    sadness = emotion_profile.get("sadness", 0)
    joy = emotion_profile.get("joy", 0)
    neutral = emotion_profile.get("neutral", 0)
    disgust = emotion_profile.get("disgust", 0)
    anger = emotion_profile.get("anger", 0)
    fear = emotion_profile.get("fear", 0)
    surprise = emotion_profile.get("surprise", 0)

    # 0. Support override
    # Keywords must start a word ("supportive", "hopeful" still count) and
    # "-less" forms are excluded, so "disgrace" or "hopeless" no longer
    # trigger the override the way plain substring matching did.
    if re.search(r"\b(?:support|hope|grace)(?!\w*less)", text_lower):
        return "supportive"

    # 1. Performative Regret
    if sadness > 0.4 and has_any("blame shifting", "guilt tripping", "recovery phase"):
        return "performative regret"

    # 2. Coercive Warmth
    if (joy > 0.3 or sadness > 0.4) and has_any("control", "gaslighting"):
        return "coercive warmth"

    # 3. Cold Invalidation
    if (neutral + disgust) > 0.5 and has_any("dismissiveness", "projection", "obscure language"):
        return "cold invalidation"

    # 4. Genuine Vulnerability
    # all() over an empty collection is True, so this also fires with no patterns.
    if (sadness + fear) > 0.5 and all(p == "recovery phase" for p in patterns):
        return "genuine vulnerability"

    # 5. Emotional Threat
    if (anger + disgust) > 0.5 and has_any("control", "threat", "insults", "dismissiveness"):
        return "emotional threat"

    # 6. Weaponized Sadness
    if sadness > 0.6 and has_any("guilt tripping", "projection"):
        return "weaponized sadness"

    # 7. Toxic Resignation
    # NOTE(review): with non-negative scores rule 3 already covers every input
    # that satisfies this condition, so this branch looks unreachable — confirm
    # the intended rule order.
    if neutral > 0.5 and has_any("dismissiveness", "obscure language"):
        return "toxic resignation"

    # 8. Indignant Reproach
    if anger > 0.5 and has_any("guilt tripping", "contradictory statements"):
        return "indignant reproach"

    # 9. Confrontational
    if anger > 0.6 and patterns:
        return "confrontational"

    # 10. Passive Aggression
    # NOTE(review): shadowed by rule 3 in the same way as rule 7 — confirm.
    if neutral > 0.6 and has_any("dismissiveness", "projection"):
        return "passive aggression"

    # 11. Sarcastic Mockery
    if joy > 0.3 and "insults" in patterns:
        return "sarcastic mockery"

    # 12. Menacing Threat
    if fear > 0.3 and "threat" in patterns:
        return "menacing threat"

    # 13. Pleading Concern
    if sadness > 0.3 and any(k in text_lower for k in APOLOGY_KEYWORDS) and not patterns:
        return "pleading concern"

    # 14. Fear-mongering
    if (fear + disgust) > 0.5 and "projection" in patterns:
        return "fear-mongering"

    # 15. Disbelieving Accusation
    if surprise > 0.3 and "blame shifting" in patterns:
        return "disbelieving accusation"

    # 16. Empathetic Solidarity
    if joy > 0.2 and sadness > 0.2 and not patterns:
        return "empathetic solidarity"

    # 17. Assertive Boundary
    if anger > 0.4 and "control" in patterns:
        return "assertive boundary"

    # 18. Stonewalling
    if neutral > 0.7 and not patterns:
        return "stonewalling"

    return None
# ——— 5) Single message analysis ———————————————————————————————————————————
def analyze_message(text):
    """
    Analyze one message: emotion profile, active abuse patterns, and tone tag.

    Returns:
        dict with the keys "emotion_profile", "active_patterns" and "tone_tag".
    """
    lowered = text.lower()
    profile = get_emotion_profile(text)

    # Lexicon counts scaled by the largest count; "or 1.0" avoids dividing by
    # zero when no token matched the lexicon.
    lexicon_hits = score_emolex(lowered)
    peak = max(lexicon_hits.values()) or 1.0
    lexicon_scores = {}
    for emotion, hits in lexicon_hits.items():
        lexicon_scores[emotion] = hits / peak

    # Blend: each classifier emotion keeps the larger of its own score and the
    # lexicon score; lexicon-only emotions are not added to the profile.
    for emotion, score in profile.items():
        profile[emotion] = max(score, lexicon_scores.get(emotion, 0))

    # Multi-label pattern model: independent sigmoid score per label.
    encoded = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    with torch.no_grad():
        logits = model(**encoded).logits.squeeze(0)
    probabilities = torch.sigmoid(logits).cpu().numpy()

    found = []
    for label, prob in zip(LABELS, probabilities):
        if prob >= THRESHOLDS[label]:
            found.append(label)

    # Any apology keyword forces the "recovery phase" pattern on.
    mentions_apology = any(k in lowered for k in APOLOGY_KEYWORDS)
    if mentions_apology and "recovery phase" not in found:
        found.append("recovery phase")

    tone = get_emotional_tone_tag(profile, found, lowered)
    return {"emotion_profile": profile, "active_patterns": found, "tone_tag": tone}
# ——— 6) Composite wrapper ———————————————————————————————————————————————
def _read_upload(uploaded_file):
    """Return (payload, lowercased name) for an uploaded file object or path."""
    payload = None
    reader = getattr(uploaded_file, "read", None)
    if callable(reader):
        try:
            payload = reader()
        except (OSError, ValueError):
            # Closed or unreadable handle: reopen it by path below.
            payload = None
        # A file object cannot be passed to open(); use its path instead.
        source = getattr(uploaded_file, "name", uploaded_file)
    else:
        source = uploaded_file
    if payload is None:
        with open(source, "rb") as handle:
            payload = handle.read()
    name = uploaded_file.name if hasattr(uploaded_file, "name") else uploaded_file
    return payload, name.lower()


def _extract_text(payload, name):
    """Turn an upload into text: OCR for image names, decoding for the rest."""
    if name.endswith((".png", ".jpg", ".jpeg", ".tiff", ".bmp", ".gif")):
        # The context manager releases the image once the pixels are copied out.
        with Image.open(io.BytesIO(payload)) as img:
            pixels = np.array(img.convert("RGB"))
        return "\n".join(ocr_reader.readtext(pixels, detail=0))
    if isinstance(payload, str):
        # A text-mode handle already produced str; nothing to decode.
        return payload
    try:
        return payload.decode("utf-8")
    except UnicodeDecodeError:
        # latin-1 maps every byte to a character, so this fallback cannot fail.
        return payload.decode("latin-1")


def _format_section(title, result):
    """Render one analyze_message result as a titled text section."""
    return (
        f"── {title} ──\n"
        f"Emotion Profile : {result['emotion_profile']}\n"
        f"Active Patterns : {result['active_patterns']}\n"
        f"Emotional Tone  : {result['tone_tag']}\n"
    )


def analyze_composite(uploaded_file, *texts):
    """
    Analyze an optional uploaded file plus any number of text messages.

    Args:
        uploaded_file: None, a file-like object (with ``read`` and ``name``),
            or a path to the uploaded file.
        *texts: message strings; empty or None entries are skipped, but they
            still consume a message number.

    Returns:
        The per-input report sections joined with newlines, or a prompt asking
        for input when there was nothing to analyze.
    """
    sections = []

    if uploaded_file is not None:
        payload, name = _read_upload(uploaded_file)
        content = _extract_text(payload, name)
        sections.append(_format_section("Uploaded File", analyze_message(content)))

    for idx, txt in enumerate(texts, start=1):
        if not txt:
            continue
        sections.append(_format_section(f"Message {idx}", analyze_message(txt)))

    if not sections:
        return "Please enter at least one message."
    return "\n".join(sections)
# ——— 7) Gradio interface ———————————————————————————————————————————————
# Text inputs are passed to analyze_composite as its *texts arguments.
message_inputs = [gr.Textbox(label="Message")]

# Optional upload; it is the first input and so becomes analyze_composite's
# uploaded_file argument.
upload_input = gr.File(
    label="Upload text or image",
    file_types=[".txt", ".png", ".jpg", ".jpeg"],
)

iface = gr.Interface(
    fn=analyze_composite,
    inputs=[upload_input, *message_inputs],
    outputs=gr.Textbox(label="Analysis"),
    title="Tether Analyzer (extended tone tags)",
    description="Emotion profiling, pattern tags, and a wide set of nuanced tone categories—no abuse score or DARVO.",
)

# Start the web UI only when the file is executed as a script.
if __name__ == "__main__":
    iface.launch()