# Hugging Face Spaces page banner captured with this file ("Spaces: Sleeping"); not part of the module.
import logging
from typing import List, Tuple

import requests
from bs4 import BeautifulSoup
from transformers import (
    AutoModelForSequenceClassification,
    AutoModelForTokenClassification,
    AutoTokenizer,
    pipeline,
)
# ---------------------------------------------------------------------------
# Model identifiers - custom binary-sentiment model hosted on Hugging Face
# ---------------------------------------------------------------------------
SENTIMENT_MODEL_ID = "LinkLinkWu/Stock_Analysis_Test_Ahamed"  # LABEL_0 = Negative, LABEL_1 = Positive
NER_MODEL_ID = "dslim/bert-base-NER"

# ---------------------------------------------------------------------------
# Pipeline singletons (initialised once per session)
# ---------------------------------------------------------------------------
# NOTE: these are built at import time, so importing this module loads both
# models.
sentiment_tokenizer = AutoTokenizer.from_pretrained(SENTIMENT_MODEL_ID)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(SENTIMENT_MODEL_ID)
sentiment_pipeline = pipeline(
    "sentiment-analysis",
    model=sentiment_model,
    tokenizer=sentiment_tokenizer,
)

ner_tokenizer = AutoTokenizer.from_pretrained(NER_MODEL_ID)
ner_model = AutoModelForTokenClassification.from_pretrained(NER_MODEL_ID)
ner_pipeline = pipeline(
    "ner",
    model=ner_model,
    tokenizer=ner_tokenizer,
    # ``grouped_entities=True`` is deprecated; "simple" aggregation is its
    # documented replacement and still yields "entity_group"/"word" keys.
    aggregation_strategy="simple",
)
# ---------------------------------------------------------------------------
# Web-scraping helper (Finviz)
# ---------------------------------------------------------------------------
def fetch_news(ticker: str) -> List[dict]:
    """Return headlines scraped from the Finviz quote page for *ticker*.

    Only the first 30 rows of the page's ``news-table`` are inspected; each
    row that contains a link with an ``href`` yields one
    ``{"title": ..., "link": ...}`` dict.

    This is a best-effort helper: it returns an empty list (never raises)
    when *ticker* is blank, the HTTP status is not 200, the page title does
    not mention the ticker, the news table is missing, or any error occurs
    while fetching/parsing.
    """
    symbol = ticker.strip() if isinstance(ticker, str) else ""
    if not symbol:
        # An empty symbol would trivially pass the title check below.
        return []

    try:
        response = requests.get(
            "https://finviz.com/quote.ashx",
            # Let requests build the query string so the ticker is URL-encoded.
            params={"t": symbol},
            headers={
                "User-Agent": "Mozilla/5.0",
                "Accept": "text/html",
                "Accept-Language": "en-US,en;q=0.5",
                "Referer": "https://finviz.com/",
                "Connection": "keep-alive",
            },
            timeout=10,
        )
        if response.status_code != 200:
            return []

        soup = BeautifulSoup(response.text, "html.parser")
        page_title = soup.title.text if soup.title else ""
        if symbol.upper() not in page_title.upper():
            return []  # presumably a redirect / placeholder page

        table = soup.find(id="news-table")
        if table is None:
            return []

        headlines: List[dict] = []
        for row in table.find_all("tr")[:30]:
            link_tag = row.find("a")
            href = link_tag.get("href") if link_tag else None
            if not href:
                # Skip rows without a usable link instead of failing the
                # whole scrape on a single malformed anchor.
                continue
            headlines.append({"title": link_tag.get_text(strip=True), "link": href})
        return headlines
    except Exception:
        # Deliberate best-effort boundary: record the failure, return nothing.
        logging.getLogger(__name__).warning(
            "fetch_news failed for ticker %r", ticker, exc_info=True
        )
        return []
# ---------------------------------------------------------------------------
# Sentiment helpers - binary output, internal probabilities retained
# ---------------------------------------------------------------------------
_LABEL_MAP = {"LABEL_0": "Negative", "LABEL_1": "Positive"}
_POSITIVE_RAW = "LABEL_1"
_NEUTRAL_RAW = "NEUTRAL"  # retained for backward compatibility; not used by the binary logic
_SINGLE_THRESHOLD = 0.55  # per-headline cut-off


def analyze_sentiment(
    text: str,
    pipe=None,
    threshold: float = _SINGLE_THRESHOLD,
) -> Tuple[str, float]:
    """Return ``(label, positive_probability)`` for *text*.

    The positive probability is the score the pipeline reports for the
    ``LABEL_1`` class (label comparison is case-insensitive); it is ``0.0``
    when that class is absent from the output.  The label is ``"Positive"``
    when the probability is at least *threshold*, otherwise ``"Negative"``.

    * Any other class (e.g. a Neutral label) does not contribute to the
      positive probability, so such predictions resolve via the threshold.
    * *pipe* overrides the module-level ``sentiment_pipeline`` when given.
      It is called as ``pipe(text, return_all_scores=True, truncation=True)``
      and its first element must be an iterable of
      ``{"label": ..., "score": ...}`` dicts.
    * Blank / non-string *text*, or any error raised while scoring, yields
      ``("Unknown", 0.0)`` instead of an exception.
    * The numeric probability is for internal aggregation only; the
      front-end can ignore it to satisfy the "no numbers" requirement.
    """
    if not isinstance(text, str) or not text.strip():
        return "Unknown", 0.0

    try:
        sentiment_pipe = pipe if pipe is not None else sentiment_pipeline
        scores = sentiment_pipe(text, return_all_scores=True, truncation=True)[0]
        pos_prob = next(
            (item["score"] for item in scores if item["label"].upper() == _POSITIVE_RAW),
            0.0,
        )
        label = "Positive" if pos_prob >= threshold else "Negative"
        return label, pos_prob
    except Exception:
        # Best-effort boundary: callers rely on the "Unknown" sentinel.
        logging.getLogger(__name__).warning("analyze_sentiment failed", exc_info=True)
        return "Unknown", 0.0
# ---------------------------------------------------------------------------
# Aggregation - average positive probability -> binary overall label
# ---------------------------------------------------------------------------
_AVG_THRESHOLD = 0.55  # >= 55 % mean positive probability -> overall Positive


def aggregate_sentiments(results: List[Tuple[str, float]], avg_threshold: float = _AVG_THRESHOLD) -> str:
    """Compute an overall **Positive/Negative** label from per-item results.

    * *results* - list of ``(label, positive_probability)`` tuples.
    * Entries labelled ``"Unknown"`` are skipped so that their placeholder
      probability does not pull the average toward Negative.
    * Empty input, or input containing only ``"Unknown"`` entries, returns
      ``"Unknown"``.
    * Otherwise returns ``"Positive"`` when the mean positive probability of
      the remaining entries is at least *avg_threshold*, else ``"Negative"``.
    """
    if not results:
        return "Unknown"

    usable = [prob for label, prob in results if label != "Unknown"]
    if not usable:
        return "Unknown"

    avg_pos = sum(usable) / len(usable)
    return "Positive" if avg_pos >= avg_threshold else "Negative"
# ---------------------------------------------------------------------------
# ORG-entity extraction (ticker discovery)
# ---------------------------------------------------------------------------
def extract_org_entities(text: str, pipe=None, max_entities: int = 5) -> List[str]:
    """Extract up to *max_entities* unique ORG names from *text*.

    Each entity whose ``"entity_group"`` is ``"ORG"`` has ``"##"`` markers
    removed from its ``"word"``, is stripped and upper-cased; duplicates and
    empty results are dropped, first-seen order is kept.

    * *pipe* overrides the module-level ``ner_pipeline`` when given; it is
      called as ``pipe(text)`` and must return an iterable of dicts.
    * Returns ``[]`` when *text* is empty, *max_entities* is not positive,
      or any error is raised while extracting.
    """
    if not text or max_entities <= 0:
        # Without this guard a limit of 0 could still let one entity through,
        # because the limit is only checked after an append.
        return []

    try:
        ner_pipe = pipe if pipe is not None else ner_pipeline
        orgs: List[str] = []
        for ent in ner_pipe(text):
            if ent.get("entity_group") != "ORG":
                continue
            token = (ent.get("word") or "").replace("##", "").strip().upper()
            if not token or token in orgs:
                continue
            orgs.append(token)
            if len(orgs) >= max_entities:
                break
        return orgs
    except Exception:
        # Best-effort boundary: record the failure, return nothing.
        logging.getLogger(__name__).warning("extract_org_entities failed", exc_info=True)
        return []
# ---------------------------------------------------------------------------
# Public accessors (legacy compatibility)
# ---------------------------------------------------------------------------
def get_sentiment_pipeline():
    """Return the module-level ``sentiment_pipeline`` object."""
    return sentiment_pipeline
def get_ner_pipeline():
    """Return the module-level ``ner_pipeline`` object."""
    return ner_pipeline