File size: 7,359 Bytes
ae44182
d25b499
 
 
 
 
 
 
7832e21
 
64ffc8f
d25b499
ae44182
d25b499
ae44182
d25b499
 
 
ae44182
d25b499
 
 
dd3df57
 
 
d25b499
dd3df57
64ffc8f
d25b499
 
dd3df57
 
 
 
d25b499
dd3df57
64ffc8f
d25b499
64d5a00
d25b499
 
 
ae44182
64ffc8f
 
 
d25b499
 
 
 
 
64ffc8f
c7f60fc
 
64ffc8f
 
c7f60fc
 
ae44182
64ffc8f
c7f60fc
 
64ffc8f
 
64d5a00
c7f60fc
d25b499
 
64d5a00
 
7832e21
64ffc8f
 
d25b499
ae44182
d25b499
ae44182
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d25b499
 
ae44182
 
d25b499
ae44182
 
 
628c80f
64ffc8f
628c80f
ae44182
 
 
 
 
 
 
 
7832e21
ae44182
64ffc8f
7c727fa
ae44182
7c727fa
ae44182
64d5a00
 
ae44182
 
c7f60fc
ae44182
c7f60fc
ae44182
7c727fa
ae44182
c7f60fc
 
ae44182
 
c7f60fc
 
64d5a00
c7f60fc
 
 
64d5a00
7c727fa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d25b499
64d5a00
d25b499
dd3df57
 
 
 
d25b499
dd3df57
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import logging
from typing import List, Tuple

import requests
from bs4 import BeautifulSoup
from transformers import (
    AutoModelForSequenceClassification,
    AutoModelForTokenClassification,
    AutoTokenizer,
    pipeline,
)

# ---------------------------------------------------------------------------
# Model identifiers – custom binary‑sentiment model hosted on Hugging Face
# ---------------------------------------------------------------------------
# Identifiers passed to ``from_pretrained`` below for the two models used here.
SENTIMENT_MODEL_ID = "LinkLinkWu/Stock_Analysis_Test_Ahamed"  # LABEL_0 = Negative, LABEL_1 = Positive
NER_MODEL_ID = "dslim/bert-base-NER"

# ---------------------------------------------------------------------------
# Pipeline singletons (initialised once per session)
# ---------------------------------------------------------------------------
# These are module-level statements, so they execute when the module is
# imported.  Presumably ``from_pretrained`` downloads or reads cached weights
# at that point, which would make the import slow – confirm for the
# deployment environment.

# Sentiment: tokenizer + sequence-classification model wrapped in a
# "sentiment-analysis" pipeline.  Used as the default by ``analyze_sentiment``.
sentiment_tokenizer = AutoTokenizer.from_pretrained(SENTIMENT_MODEL_ID)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(SENTIMENT_MODEL_ID)
sentiment_pipeline = pipeline(
    "sentiment-analysis",
    model=sentiment_model,
    tokenizer=sentiment_tokenizer,
)

# NER: tokenizer + token-classification model wrapped in a "ner" pipeline.
# Used as the default by ``extract_org_entities``, which reads the
# ``entity_group`` and ``word`` keys of each returned entity.
# NOTE(review): recent transformers releases appear to deprecate
# ``grouped_entities`` in favour of ``aggregation_strategy`` – confirm against
# the pinned transformers version before upgrading.
ner_tokenizer = AutoTokenizer.from_pretrained(NER_MODEL_ID)
ner_model = AutoModelForTokenClassification.from_pretrained(NER_MODEL_ID)
ner_pipeline = pipeline(
    "ner",
    model=ner_model,
    tokenizer=ner_tokenizer,
    grouped_entities=True,
)

# ---------------------------------------------------------------------------
# Web‑scraping helper (Finviz)
# ---------------------------------------------------------------------------

def fetch_news(ticker: str) -> List[dict]:
    """Return the latest Finviz headlines for *ticker*.

    The first 30 rows of the quote page's ``news-table`` are scanned; every row
    that contains a link with an ``href`` yields one
    ``{"title": ..., "link": ...}`` dict, so at most 30 items are returned.

    * *ticker* – stock symbol; surrounding whitespace is ignored.
    * Returns ``[]`` when the ticker is blank or not a string, the HTTP status
      is not 200, the page title does not mention the ticker (redirect /
      placeholder page), the news table is missing, or any error occurs.

    This is a best-effort helper: it never raises; failures are logged.
    """
    # A blank ticker would trivially pass the "ticker in page title" check
    # below, so reject it before touching the network.
    symbol = ticker.strip() if isinstance(ticker, str) else ""
    if not symbol:
        return []

    try:
        headers = {
            "User-Agent": "Mozilla/5.0",
            "Accept": "text/html",
            "Accept-Language": "en-US,en;q=0.5",
            "Referer": "https://finviz.com/",
            "Connection": "keep-alive",
        }
        # Pass the symbol through ``params`` so it is URL-encoded rather than
        # spliced verbatim into the query string.
        r = requests.get(
            "https://finviz.com/quote.ashx",
            params={"t": symbol},
            headers=headers,
            timeout=10,
        )
        if r.status_code != 200:
            return []

        soup = BeautifulSoup(r.text, "html.parser")
        page_title = soup.title.text if soup.title else ""
        if symbol.upper() not in page_title.upper():
            return []  # redirect / placeholder page

        table = soup.find(id="news-table")
        if table is None:
            return []

        headlines: List[dict] = []
        for row in table.find_all("tr")[:30]:
            link_tag = row.find("a")
            # Skip rows without a usable link instead of letting one malformed
            # anchor (missing ``href``) discard every headline.
            href = link_tag.get("href") if link_tag else None
            if href:
                headlines.append({"title": link_tag.get_text(strip=True), "link": href})
        return headlines
    except Exception as exc:
        # Deliberate best-effort: callers treat [] as "no news available".
        logging.getLogger(__name__).warning(
            "Fetching Finviz news for %r failed: %s", symbol, exc
        )
        return []

# ---------------------------------------------------------------------------
# Sentiment helpers – binary output, internal probabilities retained
# ---------------------------------------------------------------------------
# NOTE: this section used to define ``analyze_sentiment`` (and its constants)
# twice; the later definition silently replaced the earlier one at import time.
# The single definition below keeps the behaviour that was actually in effect.
_LABEL_MAP = {"LABEL_0": "Negative", "LABEL_1": "Positive"}
_POSITIVE_RAW = "LABEL_1"
_NEUTRAL_RAW = "NEUTRAL"  # not used by the binary model; name kept for compatibility
_SINGLE_THRESHOLD = 0.55  # per-headline cut-off


def analyze_sentiment(
    text: str,
    pipe=None,
    threshold: float = _SINGLE_THRESHOLD,
) -> Tuple[str, float]:
    """Return ``(label, positive_probability)`` for *text*.

    * *text* – headline or sentence to classify.
    * *pipe* – optional callable used instead of the module-level sentiment
      pipeline; it is called as ``pipe(text, return_all_scores=True,
      truncation=True)`` and element ``[0]`` of its result must be a list of
      ``{"label": ..., "score": ...}`` dicts.
    * *threshold* – minimum ``LABEL_1`` score for the label to be "Positive".

    Only the score of the ``LABEL_1`` entry (matched case-insensitively) is
    used.  Any other label, including a Neutral one, is ignored; if no
    ``LABEL_1`` entry is present the probability is ``0.0`` and the label is
    "Negative".

    The numeric probability is for internal aggregation only – the front-end
    can ignore it to satisfy the "no numbers" requirement.

    Never raises: on any error the failure is logged and ``("Unknown", 0.0)``
    is returned.
    """
    try:
        # Explicit None check so an injected pipe is never replaced merely
        # because it happens to be falsy.
        sentiment_pipe = pipe if pipe is not None else sentiment_pipeline
        scores = sentiment_pipe(text, return_all_scores=True, truncation=True)[0]
        pos_prob = next(
            (item["score"] for item in scores if item["label"].upper() == _POSITIVE_RAW),
            0.0,
        )
        label = "Positive" if pos_prob >= threshold else "Negative"
        return label, pos_prob
    except Exception:
        # Best-effort by design, but leave a trace instead of failing silently.
        logging.getLogger(__name__).warning("Sentiment analysis failed", exc_info=True)
        return "Unknown", 0.0

# ---------------------------------------------------------------------------
# Aggregation – average positive probability → binary overall label
# ---------------------------------------------------------------------------
_AVG_THRESHOLD = 0.55  # ≥55 % mean positive probability → overall Positive


def aggregate_sentiments(results: List[Tuple[str, float]], avg_threshold: float = _AVG_THRESHOLD) -> str:
    """Compute overall **Positive/Negative** via *average positive probability*.

    * *results* – ``(label, positive_probability)`` tuples, as returned by
      ``analyze_sentiment``.
    * *avg_threshold* – minimum mean positive probability for "Positive".
    * Entries labelled "Unknown" (failed analyses, which carry a placeholder
      probability of ``0.0``) are left out of the average so that failures do
      not drag the result towards "Negative".
    * Empty / ``None`` input, or input containing only "Unknown" entries →
      *Unknown*.
    * The returned label is **binary**; numeric values remain internal.
    """
    usable = [prob for label, prob in (results or ()) if label != "Unknown"]
    if not usable:
        return "Unknown"

    avg_pos = sum(usable) / len(usable)
    return "Positive" if avg_pos >= avg_threshold else "Negative"

# ---------------------------------------------------------------------------
# ORG‑entity extraction (ticker discovery)
# ---------------------------------------------------------------------------

def extract_org_entities(text: str, pipe=None, max_entities: int = 5) -> List[str]:
    """Extract up to *max_entities* unique ORG names from *text*.

    * *text* – free text to scan; empty text yields ``[]`` without running the
      model.
    * *pipe* – optional callable used instead of the module-level NER
      pipeline; it is called as ``pipe(text)`` and must return dicts with
      ``entity_group`` and ``word`` keys.
    * *max_entities* – upper bound on the number of names returned; values
      ``<= 0`` yield ``[]``.

    Each name has word-piece markers (``##``) removed, is stripped and
    upper-cased; duplicates are dropped and first-seen order is kept.

    Never raises: on any error the failure is logged and ``[]`` is returned.
    """
    try:
        # Previously the cap was only checked after appending, so a
        # non-positive cap still returned one entity.
        if not text or max_entities <= 0:
            return []

        ner_pipe = pipe if pipe is not None else ner_pipeline
        orgs: List[str] = []
        for ent in ner_pipe(text):
            if ent.get("entity_group") != "ORG":
                continue
            token = ent["word"].replace("##", "").strip().upper()
            if token and token not in orgs:
                orgs.append(token)
                if len(orgs) >= max_entities:
                    break
        return orgs
    except Exception:
        # Best-effort by design, but leave a trace instead of failing silently.
        logging.getLogger(__name__).warning("ORG entity extraction failed", exc_info=True)
        return []

# ---------------------------------------------------------------------------
# Public accessors (legacy compatibility)
# ---------------------------------------------------------------------------

def get_sentiment_pipeline():
    """Return the module-level ``sentiment_pipeline`` object."""
    return sentiment_pipeline


def get_ner_pipeline():
    """Return the module-level ``ner_pipeline`` object."""
    return ner_pipeline