LinkLinkWu's picture
Update func.py
ae44182 verified
raw
history blame
7.36 kB
from typing import List, Tuple
from transformers import (
pipeline,
AutoTokenizer,
AutoModelForSequenceClassification,
AutoModelForTokenClassification,
)
from bs4 import BeautifulSoup
import requests
# ---------------------------------------------------------------------------
# Model identifiers – custom binary‑sentiment model hosted on Hugging Face
# ---------------------------------------------------------------------------
SENTIMENT_MODEL_ID = "LinkLinkWu/Stock_Analysis_Test_Ahamed" # LABEL_0 = Negative, LABEL_1 = Positive
NER_MODEL_ID = "dslim/bert-base-NER"
# ---------------------------------------------------------------------------
# Pipeline singletons (initialised once per session)
# ---------------------------------------------------------------------------
sentiment_tokenizer = AutoTokenizer.from_pretrained(SENTIMENT_MODEL_ID)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(SENTIMENT_MODEL_ID)
sentiment_pipeline = pipeline(
"sentiment-analysis",
model=sentiment_model,
tokenizer=sentiment_tokenizer,
)
ner_tokenizer = AutoTokenizer.from_pretrained(NER_MODEL_ID)
ner_model = AutoModelForTokenClassification.from_pretrained(NER_MODEL_ID)
ner_pipeline = pipeline(
"ner",
model=ner_model,
tokenizer=ner_tokenizer,
grouped_entities=True,
)
# ---------------------------------------------------------------------------
# Web‑scraping helper (Finviz)
# ---------------------------------------------------------------------------
def fetch_news(ticker: str) -> List[dict]:
"""Return ≤30 latest Finviz headlines for *ticker* ("title" & "link")."""
try:
url = f"https://finviz.com/quote.ashx?t={ticker}"
headers = {
"User-Agent": "Mozilla/5.0",
"Accept": "text/html",
"Accept-Language": "en-US,en;q=0.5",
"Referer": "https://finviz.com/",
"Connection": "keep-alive",
}
r = requests.get(url, headers=headers, timeout=10)
if r.status_code != 200:
return []
soup = BeautifulSoup(r.text, "html.parser")
if ticker.upper() not in (soup.title.text if soup.title else "").upper():
return [] # redirect / placeholder page
table = soup.find(id="news-table")
if table is None:
return []
headlines: List[dict] = []
for row in table.find_all("tr")[:30]:
link_tag = row.find("a")
if link_tag:
headlines.append({"title": link_tag.get_text(strip=True), "link": link_tag["href"]})
return headlines
except Exception:
return []
# ---------------------------------------------------------------------------
# Sentiment helpers – binary output, internal probabilities retained
# ---------------------------------------------------------------------------
_LABEL_MAP = {"LABEL_0": "Negative", "LABEL_1": "Positive", "NEUTRAL": "Positive"}
_POSITIVE_RAW = "LABEL_1"
_NEUTRAL_RAW = "NEUTRAL" # rarely returned; mapped to Positive on purpose
_SINGLE_THRESHOLD = 0.55 # per‑headline cut‑off
def analyze_sentiment(
text: str,
pipe=None,
threshold: float = _SINGLE_THRESHOLD,
) -> Tuple[str, float]:
"""Return ``(label, positive_probability)`` for *text*.
* Neutral predictions – if produced by the model – are **treated as Positive**.
* Numeric probability is kept for aggregation; front‑end may discard it to
satisfy the "no numbers" display requirement.
"""
try:
sentiment_pipe = pipe or sentiment_pipeline
all_scores = sentiment_pipe(text, return_all_scores=True, truncation=True)[0]
score_map = {item["label"].upper(): item["score"] for item in all_scores}
# Positive probability: include Neutral as positive when present
pos_prob = score_map.get(_POSITIVE_RAW, 0.0)
if _NEUTRAL_RAW in score_map:
pos_prob = max(pos_prob, score_map[_NEUTRAL_RAW])
# Determine final label (Neutral → Positive by design)
label = "Positive" if (
(_NEUTRAL_RAW in score_map) or (pos_prob >= threshold)
) else "Negative"
return label, pos_prob
except Exception:
return "Unknown", 0.0
# ---------------------------------------------------------------------------
_LABEL_MAP = {"LABEL_0": "Negative", "LABEL_1": "Positive"}
_POSITIVE_RAW = "LABEL_1"
_SINGLE_THRESHOLD = 0.55 # per‑headline cut‑off
def analyze_sentiment(text: str, pipe=None, threshold: float = _SINGLE_THRESHOLD) -> Tuple[str, float]:
"""Return ``(label, positive_probability)`` for *text*.
* Neutral is not expected from a binary model; if encountered, treat as Negative.
* Numeric probability is for internal aggregation only – front‑end can ignore
it to satisfy the "no numbers" requirement.
"""
try:
sentiment_pipe = pipe or sentiment_pipeline
scores = sentiment_pipe(text, return_all_scores=True, truncation=True)[0]
pos_prob = 0.0
for item in scores:
if item["label"].upper() == _POSITIVE_RAW:
pos_prob = item["score"]
break
label = "Positive" if pos_prob >= threshold else "Negative"
return label, pos_prob
except Exception:
return "Unknown", 0.0
# ---------------------------------------------------------------------------
# Aggregation – average positive probability → binary overall label
# ---------------------------------------------------------------------------
_AVG_THRESHOLD = 0.55 # ≥55 % mean positive probability → overall Positive
def aggregate_sentiments(results: List[Tuple[str, float]], avg_threshold: float = _AVG_THRESHOLD) -> str:
"""Compute overall **Positive/Negative** via *average positive probability*.
* *results* – list of tuples from ``analyze_sentiment``.
* Empty list → *Unknown*.
* The returned label is **binary**; numeric values remain internal.
"""
if not results:
return "Unknown"
avg_pos = sum(prob for _, prob in results) / len(results)
return "Positive" if avg_pos >= avg_threshold else "Negative"
# ---------------------------------------------------------------------------
# ORG‑entity extraction (ticker discovery)
# ---------------------------------------------------------------------------
def extract_org_entities(text: str, pipe=None, max_entities: int = 5) -> List[str]:
"""Extract up to *max_entities* unique ORG tokens (upper‑case, de‑hashed)."""
try:
ner_pipe = pipe or ner_pipeline
entities = ner_pipe(text)
orgs: List[str] = []
for ent in entities:
if ent.get("entity_group") == "ORG":
token = ent["word"].replace("##", "").strip().upper()
if token and token not in orgs:
orgs.append(token)
if len(orgs) >= max_entities:
break
return orgs
except Exception:
return []
# ---------------------------------------------------------------------------
# Public accessors (legacy compatibility)
# ---------------------------------------------------------------------------
def get_sentiment_pipeline():
return sentiment_pipeline
def get_ner_pipeline():
return ner_pipeline