Spaces:
Running
on
Zero
Running
on
Zero
File size: 51,474 Bytes
456b288 da3b80d 456b288 7df97d2 456b288 68ee468 6c8beb4 025be57 68ee468 7df97d2 9879504 21f4a39 9879504 21f4a39 9879504 21f4a39 9879504 21f4a39 5d703a7 21f4a39 da3b80d 16d3234 e01a07a 16d3234 950da18 e01a07a 16d3234 950da18 16d3234 e01a07a 16d3234 394b179 da3b80d e01a07a ba75107 7df97d2 a2ab030 7df97d2 da3b80d 7df97d2 456b288 68ee468 456b288 c3a3860 68ee468 950da18 68ee468 456b288 6c8218a 9c4d0f5 6c8218a 456b288 bcfa072 456b288 9c4d0f5 456b288 bcfa072 456b288 bcfa072 456b288 bcfa072 456b288 68ee468 456b288 68ee468 456b288 7df97d2 9784a67 a2ab030 4096bc4 a2ab030 4096bc4 a2ab030 4096bc4 a2ab030 4096bc4 a2ab030 4096bc4 a2ab030 4096bc4 9784a67 7df97d2 9784a67 e5da456 b55c8f1 41b1d41 b55c8f1 41b1d41 b55c8f1 41b1d41 b55c8f1 41b1d41 b55c8f1 a84b6dc c39a1d0 b55c8f1 7df97d2 9784a67 a2ab030 7df97d2 9784a67 a2ab030 9784a67 a2ab030 9784a67 7df97d2 9784a67 a2ab030 9784a67 7df97d2 9784a67 a2ab030 9784a67 a2ab030 9784a67 a2ab030 9784a67 7df97d2 9784a67 a2ab030 7df97d2 9784a67 7df97d2 9784a67 a2ab030 7df97d2 9784a67 7df97d2 9784a67 a2ab030 9784a67 7df97d2 9784a67 a2ab030 7df97d2 9784a67 a2ab030 9784a67 456b288 7df97d2 456b288 68ee468 456b288 68ee468 456b288 68ee468 456b288 9c4d0f5 456b288 da3b80d 456b288 7df97d2 68ee468 bcfa072 68ee468 bcfa072 68ee468 bcfa072 68ee468 456b288 bcfa072 68ee468 bcfa072 456b288 bcfa072 68ee468 bcfa072 68ee468 bcfa072 68ee468 bcfa072 68ee468 bcfa072 68ee468 bcfa072 68ee468 bcfa072 68ee468 456b288 bcfa072 da3b80d 456b288 68ee468 025be57 68ee468 7df97d2 68ee468 456b288 68ee468 d0e2f9e 40e9611 025be57 6d45d2a 025be57 6d45d2a 025be57 68ee468 16d3234 68ee468 16d3234 68ee468 16d3234 68ee468 025be57 9c4d0f5 7df97d2 025be57 b79ebea 456b288 68ee468 025be57 68ee468 a95563f 7df97d2 68ee468 e345a71 68ee468 456b288 68ee468 5062436 68ee468 5062436 68ee468 456b288 68ee468 5062436 68ee468 456b288 5062436 456b288 68ee468 9879504 cbb0dfa 68ee468 9879504 68ee468 6c8beb4 9879504 68ee468 9879504 68ee468 9879504 6c8beb4 9879504 6c8beb4 9879504 68ee468 9879504 68ee468 9879504 68ee468 9879504 68ee468 9879504 68ee468 9879504 cbb0dfa 68ee468 b79ebea 9879504 6c8beb4 9879504 b79ebea 6c8beb4 b79ebea 6c8beb4 b79ebea 6c8beb4 9879504 6c8beb4 9879504 68ee468 cbb0dfa 9879504 68ee468 9879504 cbb0dfa 9879504 68ee468 6c8beb4 9879504 cbb0dfa 9879504 68ee468 cbb0dfa 68ee468 cbb0dfa 68ee468 cbb0dfa 68ee468 cbb0dfa 456b288 9879504 68ee468 456b288 9879504 cbb0dfa 9879504 68ee468 6c8beb4 68ee468 cbb0dfa 9879504 68ee468 cbb0dfa 68ee468 9879504 68ee468 9879504 68ee468 9879504 6c8beb4 9879504 9c4d0f5 68ee468 9879504 68ee468 9879504 cbb0dfa 9879504 6c8beb4 68ee468 e345a71 9879504 68ee468 9879504 68ee468 9879504 68ee468 9879504 e345a71 9879504 68ee468 9879504 68ee468 9879504 68ee468 cbb0dfa 9879504 7df97d2 e5da456 7df97d2 e5da456 68ee468 9879504 7df97d2 9879504 7df97d2 9879504 cbb0dfa 7df97d2 9879504 68ee468 7df97d2 9879504 68ee468 9879504 68ee468 6c8beb4 68ee468 9879504 68ee468 cbb0dfa 68ee468 6c8beb4 68ee468 9879504 5062436 68ee468 6c8beb4 68ee468 9879504 68ee468 cbb0dfa 6c8beb4 9879504 68ee468 9879504 68ee468 6c8beb4 9879504 cbb0dfa 68ee468 7df97d2 68ee468 da3b80d 68ee468 7df97d2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 |
import gradio as gr
import spaces
import torch
import numpy as np
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline as hf_pipeline
import re
import matplotlib.pyplot as plt
import io
from PIL import Image
from datetime import datetime
from torch.nn.functional import sigmoid
from collections import Counter
import logging
import traceback
# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
logger.info(f"Using device: {device}")
# Set up custom logging
# Set up custom logging
class CustomFormatter(logging.Formatter):
"""Custom formatter with colors and better formatting"""
grey = "\x1b[38;21m"
blue = "\x1b[38;5;39m"
yellow = "\x1b[38;5;226m"
red = "\x1b[38;5;196m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
def format(self, record):
# Remove the logger name from the output
if record.levelno == logging.DEBUG:
return f"{self.blue}{record.getMessage()}{self.reset}"
elif record.levelno == logging.INFO:
return f"{self.grey}{record.getMessage()}{self.reset}"
elif record.levelno == logging.WARNING:
return f"{self.yellow}{record.getMessage()}{self.reset}"
elif record.levelno == logging.ERROR:
return f"{self.red}{record.getMessage()}{self.reset}"
elif record.levelno == logging.CRITICAL:
return f"{self.bold_red}{record.getMessage()}{self.reset}"
return record.getMessage()
# Setup logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Remove any existing handlers
logger.handlers = []
# Create console handler with custom formatter
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)
# Suppress matplotlib font debugging
matplotlib_logger = logging.getLogger('matplotlib.font_manager')
matplotlib_logger.setLevel(logging.WARNING)
# Also suppress the UserWarning about tight layout
import warnings
warnings.filterwarnings("ignore", message="Tight layout not applied")
# Model initialization
model_name = "SamanthaStorm/tether-multilabel-v4"
model = AutoModelForSequenceClassification.from_pretrained(model_name).to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)
# sentiment model - add no_cache=True and force_download=True
# Model initialization
model_name = "SamanthaStorm/tether-multilabel-v4"
model = AutoModelForSequenceClassification.from_pretrained(model_name).to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)
# sentiment model
sentiment_model = AutoModelForSequenceClassification.from_pretrained(
"SamanthaStorm/tether-sentiment-v3",
force_download=True,
local_files_only=False
).to(device)
sentiment_tokenizer = AutoTokenizer.from_pretrained(
"SamanthaStorm/tether-sentiment-v3",
use_fast=False,
force_download=True,
local_files_only=False
)
# After loading the sentiment model
logger.debug(f"\nSentiment Model Config:")
logger.debug(f"Model name: {sentiment_model.config.name_or_path}")
logger.debug(f"Last modified: {sentiment_model.config._name_or_path}")
emotion_pipeline = hf_pipeline(
"text-classification",
model="j-hartmann/emotion-english-distilroberta-base",
return_all_scores=True, # Get all emotion scores
top_k=None, # Don't limit to top k predictions
truncation=True,
device=0 if torch.cuda.is_available() else -1
)
# DARVO model
darvo_model = AutoModelForSequenceClassification.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1").to(device)
darvo_tokenizer = AutoTokenizer.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1", use_fast=False)
darvo_model.eval()
# Constants and Labels
LABELS = [
"recovery phase", "control", "gaslighting", "guilt tripping", "dismissiveness",
"blame shifting", "nonabusive", "projection", "insults",
"contradictory statements", "obscure language"
]
SENTIMENT_LABELS = ["supportive", "undermining"]
THRESHOLDS = {
"recovery phase": 0.324,
"control": 0.33,
"gaslighting": 0.285,
"guilt tripping": 0.267,
"dismissiveness": 0.123,
"blame shifting": 0.116,
"projection": 0.425,
"insults": 0.347,
"contradictory statements": 0.378,
"obscure language": 0.206,
"nonabusive": 0.094
}
PATTERN_WEIGHTS = {
"recovery phase": 0.7,
"control": 1.4,
"gaslighting": 1.3,
"guilt tripping": 1.2,
"dismissiveness": 0.9,
"blame shifting": 1.0, # Increased from 0.8
"projection": 0.5,
"insults": 1.4, # Reduced from 2.1
"contradictory statements": 1.0,
"obscure language": 0.9,
"nonabusive": 0.0
}
ESCALATION_QUESTIONS = [
("Partner has access to firearms or weapons", 4),
("Partner threatened to kill you", 3),
("Partner threatened you with a weapon", 3),
("Partner has ever choked you, even if you considered it consensual at the time", 4),
("Partner injured or threatened your pet(s)", 3),
("Partner has broken your things, punched or kicked walls, or thrown things ", 2),
("Partner forced or coerced you into unwanted sexual acts", 3),
("Partner threatened to take away your children", 2),
("Violence has increased in frequency or severity", 3),
("Partner monitors your calls/GPS/social media", 2)
]
RISK_STAGE_LABELS = {
1: "π Risk Stage: Tension-Building\nThis message reflects rising emotional pressure or subtle control attempts.",
2: "π₯ Risk Stage: Escalation\nThis message includes direct or aggressive patterns, suggesting active harm.",
3: "π§οΈ Risk Stage: Reconciliation\nThis message reflects a reset attemptβapologies or emotional repair without accountability.",
4: "πΈ Risk Stage: Calm / Honeymoon\nThis message appears supportive but may follow prior harm, minimizing it."
}
THREAT_MOTIFS = [
"i'll kill you", "i'm going to hurt you", "you're dead", "you won't survive this",
"i'll break your face", "i'll bash your head in", "i'll snap your neck",
"i'll come over there and make you shut up", "i'll knock your teeth out",
"you're going to bleed", "you want me to hit you?", "i won't hold back next time",
"i swear to god i'll beat you", "next time, i won't miss", "i'll make you scream",
"i know where you live", "i'm outside", "i'll be waiting", "i saw you with him",
"you can't hide from me", "i'm coming to get you", "i'll find you", "i know your schedule",
"i watched you leave", "i followed you home", "you'll regret this", "you'll be sorry",
"you're going to wish you hadn't", "you brought this on yourself", "don't push me",
"you have no idea what i'm capable of", "you better watch yourself",
"i don't care what happens to you anymore", "i'll make you suffer", "you'll pay for this",
"i'll never let you go", "you're nothing without me", "if you leave me, i'll kill myself",
"i'll ruin you", "i'll tell everyone what you did", "i'll make sure everyone knows",
"i'm going to destroy your name", "you'll lose everyone", "i'll expose you",
"your friends will hate you", "i'll post everything", "you'll be cancelled",
"you'll lose everything", "i'll take the house", "i'll drain your account",
"you'll never see a dime", "you'll be broke when i'm done", "i'll make sure you lose your job",
"i'll take your kids", "i'll make sure you have nothing", "you can't afford to leave me",
"don't make me do this", "you know what happens when i'm mad", "you're forcing my hand",
"if you just behaved, this wouldn't happen", "this is your fault",
"you're making me hurt you", "i warned you", "you should have listened"
]
def get_emotion_profile(text):
"""Get emotion profile from text with all scores"""
try:
logger.debug("\nπ EMOTION ANALYSIS")
logger.debug(f"Analyzing text: {text}")
emotions = emotion_pipeline(text)
logger.debug(f"Raw emotion pipeline output: {emotions}")
if isinstance(emotions, list) and isinstance(emotions[0], list):
# Extract all scores from the first prediction
emotion_scores = emotions[0]
# Log raw scores
logger.debug("\nRaw emotion scores:")
for e in emotion_scores:
logger.debug(f" β’ {e['label']}: {e['score']:.3f}")
# Convert to dictionary
emotion_dict = {e['label'].lower(): round(e['score'], 3) for e in emotion_scores}
# Log final processed emotions
logger.debug("\nProcessed emotion profile:")
for emotion, score in emotion_dict.items():
logger.debug(f" β’ {emotion}: {score:.3f}")
return emotion_dict
logger.debug("No valid emotions detected, returning empty dict")
return {}
except Exception as e:
logger.error(f"Error in get_emotion_profile: {e}")
logger.error(f"Traceback: {traceback.format_exc()}")
default_emotions = {
"sadness": 0.0,
"joy": 0.0,
"neutral": 0.0,
"disgust": 0.0,
"anger": 0.0,
"fear": 0.0
}
logger.debug(f"Returning default emotions: {default_emotions}")
return default_emotions
def get_emotional_tone_tag(text, sentiment, patterns, abuse_score):
"""Get emotional tone tag based on emotions and patterns"""
emotions = get_emotion_profile(text)
sadness = emotions.get("sadness", 0)
joy = emotions.get("joy", 0)
neutral = emotions.get("neutral", 0)
disgust = emotions.get("disgust", 0)
anger = emotions.get("anger", 0)
fear = emotions.get("fear", 0)
# Direct Threat (New)
text_lower = text.lower()
threat_indicators = [
"if you", "i'll make", "don't forget", "remember", "regret",
"i control", "i'll take", "you'll lose", "make sure",
"never see", "won't let"
]
if (
any(indicator in text_lower for indicator in threat_indicators) and
any(p in patterns for p in ["control", "insults"]) and
(anger > 0.2 or disgust > 0.2 or abuse_score > 70)
):
return "direct threat"
# Prophetic Punishment (New)
text_lower = text.lower()
future_consequences = [
"will end up", "you'll be", "you will be", "going to be",
"will become", "will find yourself", "will realize",
"you'll regret", "you'll see", "will learn", "truly will",
"end up alone", "end up miserable"
]
dismissive_endings = [
"i'm out", "i'm done", "whatever", "good luck",
"your choice", "your problem", "regardless",
"keep", "keep on"
]
if (
(any(phrase in text_lower for phrase in future_consequences) or
any(end in text_lower for end in dismissive_endings)) and
any(p in ["dismissiveness", "control"] for p in patterns) and
(disgust > 0.2 or neutral > 0.3 or anger > 0.2) # Lowered thresholds
):
return "predictive punishment"
if (
(any(phrase in text_lower for phrase in future_consequences) or
any(end in text_lower for end in dismissive_endings)) and
any(p in ["dismissiveness", "control"] for p in patterns) and
sadness > 0.6 and
all(e < 0.1 for e in [anger, disgust, neutral])
):
return "predictive punishment"
# 1. Performative Regret
if (
sadness > 0.3 and
any(p in patterns for p in ["blame shifting", "guilt tripping", "recovery"]) and
(sentiment == "undermining" or abuse_score > 40)
):
return "performative regret"
# 2. Coercive Warmth
if (
(joy > 0.2 or sadness > 0.3) and
any(p in patterns for p in ["control", "gaslighting"]) and
sentiment == "undermining"
):
return "coercive warmth"
# 3. Cold Invalidation
if (
(neutral + disgust) > 0.4 and
any(p in patterns for p in ["dismissiveness", "projection", "obscure language"]) and
sentiment == "undermining"
):
return "cold invalidation"
# 4. Genuine Vulnerability
if (
(sadness + fear) > 0.4 and
sentiment == "supportive" and
all(p in ["recovery"] for p in patterns)
):
return "genuine vulnerability"
# 5. Emotional Threat
if (
(anger + disgust) > 0.4 and
any(p in patterns for p in ["control", "insults", "dismissiveness"]) and
sentiment == "undermining"
):
return "emotional threat"
# 6. Weaponized Sadness
if (
sadness > 0.5 and
any(p in patterns for p in ["guilt tripping", "projection"]) and
sentiment == "undermining"
):
return "weaponized sadness"
# 7. Toxic Resignation
if (
neutral > 0.4 and
any(p in patterns for p in ["dismissiveness", "obscure language"]) and
sentiment == "undermining"
):
return "toxic resignation"
# 8. Aggressive Dismissal
if (
anger > 0.4 and
any(p in patterns for p in ["insults", "control"]) and
sentiment == "undermining"
):
return "aggressive dismissal"
# 9. Deflective Hostility
if (
(0.15 < anger < 0.6 or 0.15 < disgust < 0.6) and
any(p in patterns for p in ["projection"]) and
sentiment == "undermining"
):
return "deflective hostility"
# 10. Contradictory Gaslight
if (
(joy + anger + sadness) > 0.4 and
any(p in patterns for p in ["gaslighting", "contradictory statements"]) and
sentiment == "undermining"
):
return "contradictory gaslight"
# 11. Forced Accountability Flip
if (
(anger + disgust) > 0.4 and
any(p in patterns for p in ["blame shifting", "projection"]) and
sentiment == "undermining"
):
return "forced accountability flip"
# Emotional Instability Fallback
if (
(anger + sadness + disgust) > 0.5 and
sentiment == "undermining"
):
return "emotional instability"
return "neutral"
@spaces.GPU
def predict_darvo_score(text):
"""Predict DARVO score for given text"""
try:
inputs = darvo_tokenizer(text, return_tensors="pt", truncation=True, padding=True)
inputs = {k: v.to(device) for k, v in inputs.items()}
with torch.no_grad():
logits = darvo_model(**inputs).logits
return round(sigmoid(logits.cpu()).item(), 4)
except Exception as e:
logger.error(f"Error in DARVO prediction: {e}")
return 0.0
def detect_weapon_language(text):
"""Detect weapon-related language in text"""
weapon_keywords = ["knife", "gun", "bomb", "weapon", "kill", "stab"]
t = text.lower()
return any(w in t for w in weapon_keywords)
def get_risk_stage(patterns, sentiment):
"""Determine risk stage based on patterns and sentiment"""
try:
if "insults" in patterns:
return 2
elif "recovery" in patterns:
return 3
elif "control" in patterns or "guilt tripping" in patterns:
return 1
elif sentiment == "supportive" and any(p in patterns for p in ["projection", "dismissiveness"]):
return 4
return 1
except Exception as e:
logger.error(f"Error determining risk stage: {e}")
return 1
def detect_threat_pattern(text, patterns):
"""Detect if a message contains threat patterns"""
# Threat indicators in text
threat_words = [
"regret", "sorry", "pay", "hurt", "suffer", "destroy", "ruin",
"expose", "tell everyone", "never see", "take away", "lose",
"control", "make sure", "won't let", "force", "warn", "never",
"punish", "teach you", "learn", "show you", "remember",
"if you", "don't forget", "i control", "i'll make sure", # Added these specific phrases
"bank account", "phone", "money", "access" # Added financial control indicators
]
# Check for conditional threats (if/then structures)
text_lower = text.lower()
conditional_threat = (
"if" in text_lower and
any(word in text_lower for word in ["regret", "make sure", "control"])
)
has_threat_words = any(word in text_lower for word in threat_words)
# Check for threat patterns
threat_patterns = {"control", "gaslighting", "blame shifting", "insults"}
has_threat_patterns = any(p in threat_patterns for p in patterns)
return has_threat_words or has_threat_patterns or conditional_threat
def detect_compound_threat(text, patterns):
"""Detect compound threats in a single message"""
try:
# Rule A: Single Message Multiple Patterns
high_risk_patterns = {"control", "gaslighting", "blame shifting", "insults"}
high_risk_count = sum(1 for p in patterns if p in high_risk_patterns)
has_threat = detect_threat_pattern(text, patterns)
# Special case for control + threats
has_control = "control" in patterns
has_conditional_threat = "if" in text.lower() and any(word in text.lower()
for word in ["regret", "make sure", "control"])
# Single message compound threat
if (has_threat and high_risk_count >= 2) or (has_control and has_conditional_threat):
return True, "single_message"
return False, None
except Exception as e:
logger.error(f"Error in compound threat detection: {e}")
return False, None
def analyze_message_batch_threats(messages, results):
"""Analyze multiple messages for compound threats"""
threat_messages = []
support_messages = []
for i, (msg, (result, _)) in enumerate(zip(messages, results)):
if not msg.strip(): # Skip empty messages
continue
patterns = result[1] # Get detected patterns
# Check for threat in this message
if detect_threat_pattern(msg, patterns):
threat_messages.append(i)
# Check for supporting patterns
if any(p in {"control", "gaslighting", "blame shifting"} for p in patterns):
support_messages.append(i)
# Rule B: Multi-Message Accumulation
if len(threat_messages) >= 2:
return True, "multiple_threats"
elif len(threat_messages) == 1 and len(support_messages) >= 2:
return True, "threat_with_support"
return False, None
@spaces.GPU
def compute_abuse_score(matched_scores, sentiment):
"""Compute abuse score from matched patterns and sentiment"""
try:
if not matched_scores:
logger.debug("No matched scores, returning 0")
return 0.0
# Calculate weighted score
total_weight = sum(weight for _, _, weight in matched_scores)
if total_weight == 0:
logger.debug("Total weight is 0, returning 0")
return 0.0
# Get highest pattern scores
pattern_scores = [(label, score) for label, score, _ in matched_scores]
sorted_scores = sorted(pattern_scores, key=lambda x: x[1], reverse=True)
logger.debug(f"Sorted pattern scores: {sorted_scores}")
# Base score calculation
weighted_sum = sum(score * weight for _, score, weight in matched_scores)
base_score = (weighted_sum / total_weight) * 100
logger.debug(f"Initial base score: {base_score:.1f}")
# Cap maximum score based on pattern severity
max_score = 85.0 # Set maximum possible score
if any(label in {'control', 'gaslighting'} for label, _, _ in matched_scores):
max_score = 90.0
logger.debug(f"Increased max score to {max_score} due to high severity patterns")
# Apply diminishing returns for multiple patterns
if len(matched_scores) > 1:
multiplier = 1 + (0.1 * (len(matched_scores) - 1))
base_score *= multiplier
logger.debug(f"Applied multiplier {multiplier:.2f} for {len(matched_scores)} patterns")
# Apply sentiment modifier
if sentiment == "supportive":
base_score *= 0.85
logger.debug("Applied 15% reduction for supportive sentiment")
final_score = min(round(base_score, 1), max_score)
logger.debug(f"Final abuse score: {final_score}")
return final_score
except Exception as e:
logger.error(f"Error computing abuse score: {e}")
return 0.0
@spaces.GPU
def analyze_single_message(text, thresholds):
"""Analyze a single message for abuse patterns"""
logger.debug("\n=== DEBUG START ===")
logger.debug(f"Input text: {text}")
try:
if not text.strip():
logger.debug("Empty text, returning zeros")
return 0.0, [], [], {"label": "none"}, 1, 0.0, None
# Check for explicit abuse
explicit_abuse_words = ['fuck', 'bitch', 'shit', 'ass', 'dick']
explicit_abuse = any(word in text.lower() for word in explicit_abuse_words)
logger.debug(f"Explicit abuse detected: {explicit_abuse}")
# Abuse model inference
inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
inputs = {k: v.to(device) for k, v in inputs.items()}
with torch.no_grad():
outputs = model(**inputs)
raw_scores = torch.sigmoid(outputs.logits.squeeze(0)).cpu().numpy()
# Log raw model outputs
logger.debug("\nRaw model scores:")
for label, score in zip(LABELS, raw_scores):
logger.debug(f"{label}: {score:.3f}")
# Get predictions and sort them
predictions = list(zip(LABELS, raw_scores))
sorted_predictions = sorted(predictions, key=lambda x: x[1], reverse=True)
logger.debug("\nTop 3 predictions:")
for label, score in sorted_predictions[:3]:
logger.debug(f"{label}: {score:.3f}")
# Apply thresholds
threshold_labels = []
if explicit_abuse:
threshold_labels.append("insults")
logger.debug("\nForced inclusion of 'insults' due to explicit abuse")
for label, score in sorted_predictions:
base_threshold = thresholds.get(label, 0.25)
if explicit_abuse:
base_threshold *= 0.5
if score > base_threshold:
if label not in threshold_labels:
threshold_labels.append(label)
logger.debug("\nLabels that passed thresholds:", threshold_labels)
# Calculate matched scores
matched_scores = []
for label in threshold_labels:
score = raw_scores[LABELS.index(label)]
weight = PATTERN_WEIGHTS.get(label, 1.0)
if explicit_abuse and label == "insults":
weight *= 1.5
matched_scores.append((label, score, weight))
# In analyze_single_message, modify the sentiment section:
# Get sentiment
sent_inputs = sentiment_tokenizer(text, return_tensors="pt", truncation=True, padding=True)
sent_inputs = {k: v.to(device) for k, v in sent_inputs.items()}
with torch.no_grad():
sent_logits = sentiment_model(**sent_inputs).logits[0]
sent_probs = torch.softmax(sent_logits, dim=-1).cpu().numpy()
# Add detailed logging
logger.debug("\nπ SENTIMENT ANALYSIS DETAILS")
logger.debug(f"Raw logits: {sent_logits}")
logger.debug(f"Probabilities: undermining={sent_probs[0]:.3f}, supportive={sent_probs[1]:.3f}")
sentiment = SENTIMENT_LABELS[int(np.argmax(sent_probs))]
logger.debug(f"Selected sentiment: {sentiment}")
# Calculate abuse score
abuse_score = compute_abuse_score(matched_scores, sentiment)
if explicit_abuse:
abuse_score = max(abuse_score, 70.0)
# Check for compound threats
compound_threat_flag, threat_type = detect_compound_threat(
text, threshold_labels
)
if compound_threat_flag:
logger.debug(f"β οΈ Compound threat detected in message: {threat_type}")
abuse_score = max(abuse_score, 85.0) # Force high score for compound threats
# Get DARVO score
darvo_score = predict_darvo_score(text)
# Get tone using emotion-based approach
tone_tag = get_emotional_tone_tag(text, sentiment, threshold_labels, abuse_score)
# Check for the specific combination
highest_pattern = max(matched_scores, key=lambda x: x[1])[0] if matched_scores else None # Get highest pattern
if sentiment == "supportive" and tone_tag == "neutral" and highest_pattern == "obscure language":
logger.debug("Message classified as likely non-abusive (supportive, neutral, and obscure language). Returning low risk.")
return 0.0, [], [], {"label": "supportive"}, 1, 0.0, "neutral" # Return non-abusive values
# Set stage
stage = 2 if explicit_abuse or abuse_score > 70 else 1
logger.debug("=== DEBUG END ===\n")
return abuse_score, threshold_labels, matched_scores, {"label": sentiment}, stage, darvo_score, tone_tag
except Exception as e:
logger.error(f"Error in analyze_single_message: {e}")
return 0.0, [], [], {"label": "error"}, 1, 0.0, None
def generate_abuse_score_chart(dates, scores, patterns):
"""Generate a timeline chart of abuse scores"""
try:
plt.figure(figsize=(10, 6))
plt.clf()
# Create new figure
fig, ax = plt.subplots(figsize=(10, 6))
# Plot points and lines
x = range(len(scores))
plt.plot(x, scores, 'bo-', linewidth=2, markersize=8)
# Add labels for each point with highest scoring pattern
for i, (score, pattern) in enumerate(zip(scores, patterns)):
# Get the pattern and its score
plt.annotate(
f'{pattern}\n{score:.0f}%',
(i, score),
textcoords="offset points",
xytext=(0, 10),
ha='center',
bbox=dict(
boxstyle='round,pad=0.5',
fc='white',
ec='gray',
alpha=0.8
)
)
# Customize the plot
plt.ylim(-5, 105)
plt.grid(True, linestyle='--', alpha=0.7)
plt.title('Abuse Pattern Timeline', pad=20, fontsize=12)
plt.ylabel('Abuse Score %')
# X-axis labels
plt.xticks(x, dates, rotation=45)
# Risk level bands with better colors
plt.axhspan(0, 50, color='#90EE90', alpha=0.2) # light green - Low Risk
plt.axhspan(50, 70, color='#FFD700', alpha=0.2) # gold - Moderate Risk
plt.axhspan(70, 85, color='#FFA500', alpha=0.2) # orange - High Risk
plt.axhspan(85, 100, color='#FF6B6B', alpha=0.2) # light red - Critical Risk
# Add risk level labels
plt.text(-0.2, 25, 'Low Risk', rotation=90, va='center')
plt.text(-0.2, 60, 'Moderate Risk', rotation=90, va='center')
plt.text(-0.2, 77.5, 'High Risk', rotation=90, va='center')
plt.text(-0.2, 92.5, 'Critical Risk', rotation=90, va='center')
# Adjust layout
plt.tight_layout()
# Convert plot to image
buf = io.BytesIO()
plt.savefig(buf, format='png', bbox_inches='tight')
buf.seek(0)
plt.close('all') # Close all figures to prevent memory leaks
return Image.open(buf)
except Exception as e:
logger.error(f"Error generating abuse score chart: {e}")
return None
def analyze_composite(msg1, msg2, msg3, *answers_and_none):
"""Analyze multiple messages and checklist responses"""
logger.debug("\nπ STARTING NEW ANALYSIS")
logger.debug("=" * 50)
# Define severity categories at the start
high = {'control'}
moderate = {'gaslighting', 'dismissiveness', 'obscure language', 'insults',
'contradictory statements', 'guilt tripping'}
low = {'blame shifting', 'projection', 'recovery'}
try:
# Process checklist responses
logger.debug("\nπ CHECKLIST PROCESSING")
logger.debug("=" * 50)
none_selected_checked = answers_and_none[-1]
responses_checked = any(answers_and_none[:-1])
none_selected = not responses_checked and none_selected_checked
logger.debug("Checklist Status:")
logger.debug(f" β’ None Selected Box: {'β' if none_selected_checked else 'β'}")
logger.debug(f" β’ Has Responses: {'β' if responses_checked else 'β'}")
logger.debug(f" β’ Final Status: {'None Selected' if none_selected else 'Has Selections'}")
if none_selected:
escalation_score = 0
escalation_note = "Checklist completed: no danger items reported."
escalation_completed = True
logger.debug("\nβ Checklist: No items selected")
elif responses_checked:
escalation_score = sum(w for (_, w), a in zip(ESCALATION_QUESTIONS, answers_and_none[:-1]) if a)
escalation_note = "Checklist completed."
escalation_completed = True
logger.debug(f"\nπ Checklist Score: {escalation_score}")
# Log checked items
logger.debug("\nβ οΈ Selected Risk Factors:")
for (q, w), a in zip(ESCALATION_QUESTIONS, answers_and_none[:-1]):
if a:
logger.debug(f" β’ [{w} points] {q}")
else:
escalation_score = None
escalation_note = "Checklist not completed."
escalation_completed = False
logger.debug("\nβ Checklist: Not completed")
# Process messages
logger.debug("\nπ MESSAGE PROCESSING")
logger.debug("=" * 50)
messages = [msg1, msg2, msg3]
active = [(m, f"Message {i+1}") for i, m in enumerate(messages) if m.strip()]
logger.debug(f"Active Messages: {len(active)} of 3")
if not active:
logger.debug("β Error: No messages provided")
return "Please enter at least one message.", None
# Detect threats
logger.debug("\nπ¨ THREAT DETECTION")
logger.debug("=" * 50)
def normalize(text):
import unicodedata
text = text.lower().strip()
text = unicodedata.normalize("NFKD", text)
text = text.replace("'", "'")
return re.sub(r"[^a-z0-9 ]", "", text)
def detect_threat_motifs(message, motif_list):
norm_msg = normalize(message)
return [motif for motif in motif_list if normalize(motif) in norm_msg]
# Analyze threats and patterns
immediate_threats = [detect_threat_motifs(m, THREAT_MOTIFS) for m, _ in active]
flat_threats = [t for sublist in immediate_threats for t in sublist]
threat_risk = "Yes" if flat_threats else "No"
# Analyze each message
logger.debug("\nπ INDIVIDUAL MESSAGE ANALYSIS")
logger.debug("=" * 50)
results = []
for m, d in active:
logger.debug(f"\nπ ANALYZING {d}")
logger.debug("-" * 40) # Separator for each message
result = analyze_single_message(m, THRESHOLDS.copy())
# Check for non-abusive classification and skip further analysis
if result[0] == 0.0 and result[1] == [] and result[3] == {"label": "supportive"} and result[4] == 1 and result[5] == 0.0 and result[6] == "neutral":
logger.debug(f"β {d} classified as non-abusive, skipping further analysis.")
# Option to include in final output (uncomment if needed):
# results.append(({"abuse_score": 0.0, "patterns": [], "sentiment": {"label": "supportive"}, "stage": 1, "darvo_score": 0.0, "tone": "neutral"}, d))
continue # Skip to the next message
results.append((result, d))
# Log the detailed results for the current message (if not skipped)
abuse_score, patterns, matched_scores, sentiment, stage, darvo_score, tone = result
logger.debug(f"\nπ Results for {d}:")
logger.debug(f" β’ Abuse Score: {abuse_score:.1f}%")
logger.debug(f" β’ DARVO Score: {darvo_score:.3f}")
logger.debug(f" β’ Risk Stage: {stage}")
logger.debug(f" β’ Sentiment: {sentiment['label']}")
logger.debug(f" β’ Tone: {tone}")
if patterns:
logger.debug(" β’ Patterns: " + ", ".join(patterns))
# Unpack results for cleaner logging
abuse_score, patterns, matched_scores, sentiment, stage, darvo_score, tone = result
# Log core metrics
logger.debug("\nπ CORE METRICS")
logger.debug(f" β’ Abuse Score: {abuse_score:.1f}%")
logger.debug(f" β’ DARVO Score: {darvo_score:.3f}")
logger.debug(f" β’ Risk Stage: {stage}")
logger.debug(f" β’ Sentiment: {sentiment['label']}")
logger.debug(f" β’ Tone: {tone}")
# Log detected patterns with scores
if patterns:
logger.debug("\nπ― DETECTED PATTERNS")
for label, score, weight in matched_scores:
severity = "βHIGH" if label in high else "β οΈ MODERATE" if label in moderate else "π LOW"
logger.debug(f" β’ {severity} | {label}: {score:.3f} (weight: {weight})")
else:
logger.debug("\nβ No abuse patterns detected")
# Extract scores and metadata
abuse_scores = [r[0][0] for r in results]
stages = [r[0][4] for r in results]
darvo_scores = [r[0][5] for r in results]
tone_tags = [r[0][6] for r in results]
dates_used = [r[1] for r in results]
# Pattern Analysis Summary
logger.debug("\nπ PATTERN ANALYSIS SUMMARY")
logger.debug("=" * 50)
predicted_labels = [label for r in results for label in r[0][1]]
if predicted_labels:
logger.debug("Detected Patterns Across All Messages:")
pattern_counts = Counter(predicted_labels)
# Log high severity patterns first
high_patterns = [p for p in pattern_counts if p in high]
if high_patterns:
logger.debug("\nβ HIGH SEVERITY PATTERNS:")
for p in high_patterns:
logger.debug(f" β’ {p} (Γ{pattern_counts[p]})")
# Then moderate
moderate_patterns = [p for p in pattern_counts if p in moderate]
if moderate_patterns:
logger.debug("\nβ οΈ MODERATE SEVERITY PATTERNS:")
for p in moderate_patterns:
logger.debug(f" β’ {p} (Γ{pattern_counts[p]})")
# Then low
low_patterns = [p for p in pattern_counts if p in low]
if low_patterns:
logger.debug("\nπ LOW SEVERITY PATTERNS:")
for p in low_patterns:
logger.debug(f" β’ {p} (Γ{pattern_counts[p]})")
else:
logger.debug("β No patterns detected across messages")
# Pattern Severity Analysis
logger.debug("\nβοΈ SEVERITY ANALYSIS")
logger.debug("=" * 50)
counts = {'high': 0, 'moderate': 0, 'low': 0}
for label in predicted_labels:
if label in high:
counts['high'] += 1
elif label in moderate:
counts['moderate'] += 1
elif label in low:
counts['low'] += 1
logger.debug("Pattern Distribution:")
if counts['high'] > 0:
logger.debug(f" β High Severity: {counts['high']}")
if counts['moderate'] > 0:
logger.debug(f" β οΈ Moderate Severity: {counts['moderate']}")
if counts['low'] > 0:
logger.debug(f" π Low Severity: {counts['low']}")
total_patterns = sum(counts.values())
if total_patterns > 0:
logger.debug(f"\nSeverity Percentages:")
logger.debug(f" β’ High: {(counts['high']/total_patterns)*100:.1f}%")
logger.debug(f" β’ Moderate: {(counts['moderate']/total_patterns)*100:.1f}%")
logger.debug(f" β’ Low: {(counts['low']/total_patterns)*100:.1f}%")
# Risk Assessment
logger.debug("\nπ― RISK ASSESSMENT")
logger.debug("=" * 50)
if counts['high'] >= 2 and counts['moderate'] >= 2:
pattern_escalation_risk = "Critical"
logger.debug("ββ CRITICAL RISK")
logger.debug(" β’ Multiple high and moderate patterns detected")
logger.debug(f" β’ High patterns: {counts['high']}")
logger.debug(f" β’ Moderate patterns: {counts['moderate']}")
elif (counts['high'] >= 2 and counts['moderate'] >= 1) or \
(counts['moderate'] >= 3) or \
(counts['high'] >= 1 and counts['moderate'] >= 2):
pattern_escalation_risk = "High"
logger.debug("β HIGH RISK")
logger.debug(" β’ Significant pattern combination detected")
logger.debug(f" β’ High patterns: {counts['high']}")
logger.debug(f" β’ Moderate patterns: {counts['moderate']}")
elif (counts['moderate'] == 2) or \
(counts['high'] == 1 and counts['moderate'] == 1) or \
(counts['moderate'] == 1 and counts['low'] >= 2) or \
(counts['high'] == 1 and sum(counts.values()) == 1):
pattern_escalation_risk = "Moderate"
logger.debug("β οΈ MODERATE RISK")
logger.debug(" β’ Concerning pattern combination detected")
logger.debug(f" β’ Pattern distribution: H:{counts['high']}, M:{counts['moderate']}, L:{counts['low']}")
else:
pattern_escalation_risk = "Low"
logger.debug("π LOW RISK")
logger.debug(" β’ Limited pattern severity detected")
logger.debug(f" β’ Pattern distribution: H:{counts['high']}, M:{counts['moderate']}, L:{counts['low']}")
# Checklist Risk Assessment
logger.debug("\nπ CHECKLIST RISK ASSESSMENT")
logger.debug("=" * 50)
checklist_escalation_risk = "Unknown" if escalation_score is None else (
"Critical" if escalation_score >= 20 else
"Moderate" if escalation_score >= 10 else
"Low"
)
if escalation_score is not None:
logger.debug(f"Score: {escalation_score}/29")
logger.debug(f"Risk Level: {checklist_escalation_risk}")
if escalation_score >= 20:
logger.debug("ββ CRITICAL: Score indicates severe risk")
elif escalation_score >= 10:
logger.debug("β οΈ MODERATE: Score indicates concerning risk")
else:
logger.debug("π LOW: Score indicates limited risk")
else:
logger.debug("β Risk Level: Unknown (checklist not completed)")
# Escalation Analysis
logger.debug("\nπ ESCALATION ANALYSIS")
logger.debug("=" * 50)
escalation_bump = 0
for result, msg_id in results:
abuse_score, _, _, sentiment, stage, darvo_score, tone_tag = result
logger.debug(f"\nπ Message {msg_id} Risk Factors:")
factors = []
if darvo_score > 0.65:
escalation_bump += 3
factors.append(f"β² +3: High DARVO score ({darvo_score:.3f})")
if tone_tag in ["forced accountability flip", "emotional threat"]:
escalation_bump += 2
factors.append(f"β² +2: Concerning tone ({tone_tag})")
if abuse_score > 80:
escalation_bump += 2
factors.append(f"β² +2: High abuse score ({abuse_score:.1f}%)")
if stage == 2:
escalation_bump += 3
factors.append("β² +3: Escalation stage")
if factors:
for factor in factors:
logger.debug(f" {factor}")
else:
logger.debug(" β No escalation factors")
logger.debug(f"\nπ Total Escalation Bump: +{escalation_bump}")
# Check for compound threats across messages
compound_threat_flag, threat_type = analyze_message_batch_threats(
[msg1, msg2, msg3], results
)
if compound_threat_flag:
logger.debug(f"β οΈ Compound threat detected across messages: {threat_type}")
pattern_escalation_risk = "Critical" # Override risk level
logger.debug("Risk level elevated to CRITICAL due to compound threats")
# Combined Risk Calculation
logger.debug("\nπ― FINAL RISK CALCULATION")
logger.debug("=" * 50)
def rank(label):
return {"Low": 0, "Moderate": 1, "High": 2, "Critical": 3, "Unknown": 0}.get(label, 0)
combined_score = rank(pattern_escalation_risk) + rank(checklist_escalation_risk) + escalation_bump
logger.debug("Risk Components:")
logger.debug(f" β’ Pattern Risk ({pattern_escalation_risk}): +{rank(pattern_escalation_risk)}")
logger.debug(f" β’ Checklist Risk ({checklist_escalation_risk}): +{rank(checklist_escalation_risk)}")
logger.debug(f" β’ Escalation Bump: +{escalation_bump}")
logger.debug(f" = Combined Score: {combined_score}")
escalation_risk = (
"Critical" if combined_score >= 6 else
"High" if combined_score >= 4 else
"Moderate" if combined_score >= 2 else
"Low"
)
logger.debug(f"\nβ οΈ Final Escalation Risk: {escalation_risk}")
# Generate Output Text
logger.debug("\nπ GENERATING OUTPUT")
logger.debug("=" * 50)
if escalation_score is None:
escalation_text = (
"π« **Escalation Potential: Unknown** (Checklist not completed)\n"
"β οΈ This section was not completed. Escalation potential is estimated using message data only.\n"
)
hybrid_score = 0
logger.debug("Generated output for incomplete checklist")
elif escalation_score == 0:
escalation_text = (
"β
**Escalation Checklist Completed:** No danger items reported.\n"
"π§ **Escalation potential estimated from detected message patterns only.**\n"
f"β’ Pattern Risk: {pattern_escalation_risk}\n"
f"β’ Checklist Risk: None reported\n"
f"β’ Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)"
)
hybrid_score = escalation_bump
logger.debug("Generated output for no-risk checklist")
else:
hybrid_score = escalation_score + escalation_bump
escalation_text = (
f"π **Escalation Potential: {escalation_risk} ({hybrid_score}/29)**\n"
"π This score combines your safety checklist answers *and* detected high-risk behavior.\n"
f"β’ Pattern Risk: {pattern_escalation_risk}\n"
f"β’ Checklist Risk: {checklist_escalation_risk}\n"
f"β’ Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)"
)
logger.debug(f"Generated output with hybrid score: {hybrid_score}/29")
# Final Metrics
logger.debug("\nπ FINAL METRICS")
logger.debug("=" * 50)
composite_abuse = int(round(sum(abuse_scores) / len(abuse_scores)))
logger.debug(f"Composite Abuse Score: {composite_abuse}%")
most_common_stage = max(set(stages), key=stages.count)
logger.debug(f"Most Common Stage: {most_common_stage}")
avg_darvo = round(sum(darvo_scores) / len(darvo_scores), 3)
logger.debug(f"Average DARVO Score: {avg_darvo}")
# Generate Final Report
logger.debug("\nπ GENERATING FINAL REPORT")
logger.debug("=" * 50)
out = f"Abuse Intensity: {composite_abuse}%\n"
# Add detected patterns to output
if predicted_labels:
out += "π Detected Patterns:\n"
if high_patterns:
patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in high_patterns)
out += f"β High Severity: {patterns_str}\n"
if moderate_patterns:
patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in moderate_patterns)
out += f"β οΈ Moderate Severity: {patterns_str}\n"
if low_patterns:
patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in low_patterns)
out += f"π Low Severity: {patterns_str}\n"
out += "\n"
out += "π This reflects the strength and severity of detected abuse patterns in the message(s).\n\n"
# Risk Level Assessment
risk_level = (
"Critical" if composite_abuse >= 85 or hybrid_score >= 20 else
"High" if composite_abuse >= 70 or hybrid_score >= 15 else
"Moderate" if composite_abuse >= 50 or hybrid_score >= 10 else
"Low"
)
logger.debug(f"Final Risk Level: {risk_level}")
# Add Risk Description
risk_descriptions = {
"Critical": (
"π¨ **Risk Level: Critical**\n"
"Multiple severe abuse patterns detected. This situation shows signs of "
"dangerous escalation and immediate intervention may be needed."
),
"High": (
"β οΈ **Risk Level: High**\n"
"Strong abuse patterns detected. This situation shows concerning "
"signs of manipulation and control."
),
"Moderate": (
"β‘ **Risk Level: Moderate**\n"
"Concerning patterns detected. While not severe, these behaviors "
"indicate unhealthy relationship dynamics."
),
"Low": (
"π **Risk Level: Low**\n"
"Minor concerning patterns detected. While present, the detected "
"behaviors are subtle or infrequent."
)
}
out += risk_descriptions[risk_level]
out += f"\n\n{RISK_STAGE_LABELS[most_common_stage]}"
logger.debug("Added risk description and stage information")
# Add DARVO Analysis
if avg_darvo > 0.25:
level = "moderate" if avg_darvo < 0.65 else "high"
out += f"\n\nπ **DARVO Score: {avg_darvo}** β This indicates a **{level} likelihood** of narrative reversal (DARVO), where the speaker may be denying, attacking, or reversing blame."
logger.debug(f"Added DARVO analysis ({level} level)")
# Add Emotional Tones
logger.debug("\nπ Adding Emotional Tones")
out += "\n\nπ **Emotional Tones Detected:**\n"
for i, tone in enumerate(tone_tags):
out += f"β’ Message {i+1}: *{tone or 'none'}*\n"
logger.debug(f"Message {i+1} tone: {tone}")
# Add Threats Section
logger.debug("\nβ οΈ Adding Threat Analysis")
if flat_threats:
out += "\n\nπ¨ **Immediate Danger Threats Detected:**\n"
for t in set(flat_threats):
out += f"β’ \"{t}\"\n"
out += "\nβ οΈ These phrases may indicate an imminent risk to physical safety."
logger.debug(f"Added {len(set(flat_threats))} unique threat warnings")
else:
out += "\n\nπ§© **Immediate Danger Threats:** None explicitly detected.\n"
out += "This does *not* rule out risk, but no direct threat phrases were matched."
logger.debug("No threats to add")
# Generate Timeline
logger.debug("\nπ Generating Timeline")
pattern_labels = []
for result, _ in results:
matched_scores = result[2] # Get the matched_scores from the result tuple
if matched_scores:
# Sort matched_scores by score and get the highest scoring pattern
highest_pattern = max(matched_scores, key=lambda x: x[1])
pattern_labels.append(highest_pattern[0]) # Add the pattern name
else:
pattern_labels.append("none")
logger.debug("Pattern labels for timeline:")
for i, (pattern, score) in enumerate(zip(pattern_labels, abuse_scores)):
logger.debug(f"Message {i+1}: {pattern} ({score:.1f}%)")
timeline_image = generate_abuse_score_chart(dates_used, abuse_scores, pattern_labels)
logger.debug("Timeline generated successfully")
# Add Escalation Text
out += "\n\n" + escalation_text
logger.debug("Added escalation text to output")
logger.debug("\nβ
ANALYSIS COMPLETE")
logger.debug("=" * 50)
return out, timeline_image
except Exception as e:
logger.error("\nβ ERROR IN ANALYSIS")
logger.error("=" * 50)
logger.error(f"Error type: {type(e).__name__}")
logger.error(f"Error message: {str(e)}")
logger.error(f"Traceback:\n{traceback.format_exc()}")
return "An error occurred during analysis.", None
# Gradio Interface Setup
def create_interface():
try:
textbox_inputs = [gr.Textbox(label=f"Message {i+1}") for i in range(3)]
quiz_boxes = [gr.Checkbox(label=q) for q, _ in ESCALATION_QUESTIONS]
none_box = gr.Checkbox(label="None of the above")
demo = gr.Interface(
fn=analyze_composite,
inputs=textbox_inputs + quiz_boxes + [none_box],
outputs=[
gr.Textbox(label="Results"),
gr.Image(label="Abuse Score Timeline", type="pil")
],
title="Abuse Pattern Detector + Escalation Quiz",
description=(
"Enter up to three messages that concern you. "
"For the most accurate results, include messages from a recent emotionally intense period."
),
flagging_mode="manual"
)
return demo
except Exception as e:
logger.error(f"Error creating interface: {e}")
raise
# Main execution
if __name__ == "__main__":
try:
demo = create_interface()
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False
)
except Exception as e:
logger.error(f"Failed to launch app: {e}")
raise
|