from typing import List, Tuple

from transformers import (
    pipeline,
    AutoTokenizer,
    AutoModelForSequenceClassification,
    AutoModelForTokenClassification,
)
from bs4 import BeautifulSoup
import requests

# ---------------------------------------------------------------------------
# Model identifiers
# ---------------------------------------------------------------------------
SENTIMENT_MODEL_ID = "LinkLinkWu/Stock_Analysis_Test_Ahamed"
NER_MODEL_ID = "dslim/bert-base-NER"

# ---------------------------------------------------------------------------
# Eager initialisation of Hugging Face pipelines (shared singletons)
# ---------------------------------------------------------------------------
sentiment_tokenizer = AutoTokenizer.from_pretrained(SENTIMENT_MODEL_ID)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(SENTIMENT_MODEL_ID)
sentiment_pipeline = pipeline(
    "sentiment-analysis",
    model=sentiment_model,
    tokenizer=sentiment_tokenizer,
)

ner_tokenizer = AutoTokenizer.from_pretrained(NER_MODEL_ID)
ner_model = AutoModelForTokenClassification.from_pretrained(NER_MODEL_ID)
ner_pipeline = pipeline(
    "ner",
    model=ner_model,
    tokenizer=ner_tokenizer,
    grouped_entities=True,  # merge sub-word pieces into whole entity spans
)

# ---------------------------------------------------------------------------
# Web-scraping helper
# ---------------------------------------------------------------------------

def fetch_news(ticker: str) -> List[dict]:
    """Return up to 30 latest Finviz headlines for *ticker* (title & link).

    Empty list on network / parsing errors or if Finviz redirects to a generic
    page (e.g. wrong ticker).
    """
    try:
        url = f"https://finviz.com/quote.ashx?t={ticker}"
        headers = {
            "User-Agent": "Mozilla/5.0",
            "Accept": "text/html",
            "Accept-Language": "en-US,en;q=0.5",
            "Referer": "https://finviz.com/",
            "Connection": "keep-alive",
        }
        r = requests.get(url, headers=headers, timeout=10)
        if r.status_code != 200:
            return []

        soup = BeautifulSoup(r.text, "html.parser")
        if ticker.upper() not in (soup.title.text if soup.title else "").upper():
            return []  # Finviz placeholder page

        table = soup.find(id="news-table")
        if table is None:
            return []

        news: List[dict] = []
        for row in table.find_all("tr")[:30]:
            link_tag = row.find("a")
            # Skip rows without an href so a single malformed row cannot abort
            # the whole scrape via the blanket ``except`` below.
            if link_tag and link_tag.get("href"):
                news.append({"title": link_tag.get_text(strip=True), "link": link_tag["href"]})
        return news
    except Exception:
        return []
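
# Illustrative usage (a sketch: live output depends on Finviz's HTML and the
# ticker's news flow at request time; an empty list means the scrape failed):
#
#     for item in fetch_news("AAPL")[:3]:
#         print(item["title"], "->", item["link"])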

# ---------------------------------------------------------------------------
# Sentiment helpers
# ---------------------------------------------------------------------------
_POSITIVE = "positive"
_DEFAULT_THRESHOLD = 0.55  # per-headline positive-probability cut-off


def analyze_sentiment(
    text: str,
    pipe=None,
    threshold: float = _DEFAULT_THRESHOLD,
) -> Tuple[str, float]:
    """Classify *text* and return ``(label, positive_probability)``.

    * Binary label (*Positive* / *Negative*) is determined by comparing the
      *positive* probability with *threshold*.
    * Neutral headlines are mapped to *Negative* by design.
    * On any internal error → ("Unknown", 0.0).
    """
    try:
        sentiment_pipe = pipe or sentiment_pipeline
        scores = sentiment_pipe(text, return_all_scores=True, truncation=True)[0]  # full label distribution
        pos_prob = 0.0
        for item in scores:
            if item["label"].lower() == _POSITIVE:
                pos_prob = item["score"]
                break
        label = "Positive" if pos_prob >= threshold else "Negative"
        return label, pos_prob
    except Exception:
        return "Unknown", 0.0

# ---------------------------------------------------------------------------
# Aggregation – average positive probability → binary overall label
# ---------------------------------------------------------------------------

def aggregate_sentiments(
    results: List[Tuple[str, float]],
    avg_threshold: float = _DEFAULT_THRESHOLD,
) -> str:
    """Compute overall **Positive/Negative** based on *mean* positive probability.

    * *results* – list returned by ``analyze_sentiment`` for each headline.
    * If the average positive probability ≥ *avg_threshold* → *Positive*.
    * Empty list → *Unknown*.
    """
    if not results:
        return "Unknown"

    avg_pos = sum(prob for _, prob in results) / len(results)
    return "Positive" if avg_pos >= avg_threshold else "Negative"

# ---------------------------------------------------------------------------
# ORG-entity extraction (for ticker discovery)
# ---------------------------------------------------------------------------

def extract_org_entities(text: str, pipe=None, max_entities: int = 5) -> List[str]:
    """Return up to *max_entities* unique ORG tokens (upper‑case, de‑hashed)."""
    try:
        ner_pipe = pipe or ner_pipeline
        entities = ner_pipe(text)
        orgs: List[str] = []
        for ent in entities:
            if ent.get("entity_group") == "ORG":
                token = ent["word"].replace("##", "").strip().upper()
                if token and token not in orgs:
                    orgs.append(token)
                if len(orgs) >= max_entities:
                    break
        return orgs
    except Exception:
        return []
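
# Illustrative usage (a sketch: the exact spans returned depend on the NER
# model):
#
#     extract_org_entities("Apple and Microsoft lead the Nasdaq higher")
#     # -> e.g. ["APPLE", "MICROSOFT"]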

# ---------------------------------------------------------------------------
# Public accessors (backward compatibility with app.py)
# ---------------------------------------------------------------------------

def get_sentiment_pipeline():
    return sentiment_pipeline


def get_ner_pipeline():
    return ner_pipeline
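

if __name__ == "__main__":
    # Minimal end-to-end smoke test (a sketch, not part of the module's public
    # API): scrape headlines for one ticker, score each title, and print the
    # aggregate verdict. Requires network access plus the two models above.
    demo_ticker = "AAPL"  # hypothetical example ticker
    headlines = fetch_news(demo_ticker)
    results = [analyze_sentiment(item["title"]) for item in headlines]
    print(f"{demo_ticker}: {len(headlines)} headlines -> {aggregate_sentiments(results)}")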