LinkLinkWu's picture
Update func.py
c7f60fc verified
raw
history blame
5.81 kB
from typing import List, Tuple
from transformers import (
pipeline,
AutoTokenizer,
AutoModelForSequenceClassification,
AutoModelForTokenClassification,
)
from bs4 import BeautifulSoup
import requests
# ---------------------------------------------------------------------------
# Model identifiers
# ---------------------------------------------------------------------------
# Identifiers handed to ``from_pretrained`` further down to load the tokenizer
# and the model for each task.
# Loaded as a sequence-classification model for headline sentiment.
SENTIMENT_MODEL_ID = "LinkLinkWu/Stock_Analysis_Test_Ahamed"
# Loaded as a token-classification model for named-entity recognition.
NER_MODEL_ID = "dslim/bert-base-NER"
# ---------------------------------------------------------------------------
# Eager initialisation of Hugging Face pipelines (shared singletons)
# ---------------------------------------------------------------------------
# These are module-level statements, so they run when the module is imported:
# both tokenizers and both models are loaded before any helper is called.
# Sentiment: tokenizer + sequence-classification model wrapped in a
# "sentiment-analysis" pipeline. ``analyze_sentiment`` falls back to this
# object when no explicit ``pipe`` is passed.
sentiment_tokenizer = AutoTokenizer.from_pretrained(SENTIMENT_MODEL_ID)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(SENTIMENT_MODEL_ID)
sentiment_pipeline = pipeline(
"sentiment-analysis",
model=sentiment_model,
tokenizer=sentiment_tokenizer,
)
# NER: tokenizer + token-classification model wrapped in a "ner" pipeline.
# ``extract_org_entities`` falls back to this object when no explicit ``pipe``
# is passed.
ner_tokenizer = AutoTokenizer.from_pretrained(NER_MODEL_ID)
ner_model = AutoModelForTokenClassification.from_pretrained(NER_MODEL_ID)
ner_pipeline = pipeline(
"ner",
model=ner_model,
tokenizer=ner_tokenizer,
# ``extract_org_entities`` filters results on the "entity_group" key.
# NOTE(review): newer transformers releases reportedly deprecate
# ``grouped_entities`` in favour of ``aggregation_strategy`` -- confirm
# against the installed version before changing this.
grouped_entities=True,
)
# ---------------------------------------------------------------------------
# Web‑scraping helper
# ---------------------------------------------------------------------------
def fetch_news(ticker: str) -> List[dict]:
    """Return up to 30 latest Finviz headlines for *ticker* (title & link).

    Args:
        ticker: Stock symbol to look up. Surrounding whitespace is ignored.

    Returns:
        A list of ``{"title": ..., "link": ...}`` dicts built from the first
        30 rows of the page's ``news-table``. Rows without a usable link are
        skipped. The list is empty when:

        * *ticker* is not a string, or is empty / whitespace only;
        * the HTTP status is not 200;
        * the page title does not contain the symbol (Finviz served a generic
          page, e.g. for a wrong ticker);
        * the news table is missing;
        * any network or parsing error occurs.

    This function never raises; it is a best-effort scraper.
    """
    if not isinstance(ticker, str):
        return []
    symbol = ticker.strip()
    if not symbol:
        # An empty string is a substring of every page title, so the
        # placeholder-page check below could never reject it.
        return []

    headers = {
        "User-Agent": "Mozilla/5.0",
        "Accept": "text/html",
        "Accept-Language": "en-US,en;q=0.5",
        "Referer": "https://finviz.com/",
        "Connection": "keep-alive",
    }
    try:
        # Let requests build the query string so the symbol is URL-encoded.
        r = requests.get(
            "https://finviz.com/quote.ashx",
            params={"t": symbol},
            headers=headers,
            timeout=10,
        )
        if r.status_code != 200:
            return []

        soup = BeautifulSoup(r.text, "html.parser")
        page_title = soup.title.text if soup.title else ""
        if symbol.upper() not in page_title.upper():
            return []  # Finviz placeholder page

        table = soup.find(id="news-table")
        if table is None:
            return []

        news: List[dict] = []
        for row in table.find_all("tr")[:30]:
            link_tag = row.find("a")
            if link_tag is None:
                continue
            # ``link_tag["href"]`` would raise on an anchor without a target
            # and throw away every headline collected so far; skip that row
            # instead.
            href = link_tag.get("href")
            if not href:
                continue
            news.append({"title": link_tag.get_text(strip=True), "link": href})
        return news
    except Exception:
        # Deliberate best-effort boundary: callers expect an empty list, not
        # an exception, on any network or parsing failure.
        return []
# ---------------------------------------------------------------------------
# Sentiment helpers
# ---------------------------------------------------------------------------
# Label (compared case-insensitively) whose score is treated as the positive
# probability.
_POSITIVE = "positive"
_DEFAULT_THRESHOLD = 0.55  # per‑headline probability cut‑off


def analyze_sentiment(
    text: str,
    pipe=None,
    threshold: float = _DEFAULT_THRESHOLD,
) -> Tuple[str, float]:
    """Classify *text* and return ``(label, positive_probability)``.

    Args:
        text: Headline (or any text) to classify.
        pipe: Optional callable used instead of the module-level
            ``sentiment_pipeline``. It is called as
            ``pipe(text, return_all_scores=True, truncation=True)`` and must
            return per-class ``{"label": ..., "score": ...}`` dicts.
        threshold: Minimum positive probability for a *Positive* label.

    Returns:
        ``("Positive", p)`` when ``p >= threshold``, otherwise
        ``("Negative", p)``, where ``p`` is the score of the class whose label
        equals "positive" (case-insensitive), or ``0.0`` when no such class is
        reported. Neutral headlines are therefore mapped to *Negative* by
        design. On any internal error, or when the pipeline returns nothing,
        the result is ``("Unknown", 0.0)``.
    """
    try:
        # Only fall back to the shared pipeline when no pipe was supplied.
        sentiment_pipe = pipe if pipe is not None else sentiment_pipeline
        raw = sentiment_pipe(text, return_all_scores=True, truncation=True)
        if not raw:
            return "Unknown", 0.0

        # The per-class scores for a single input may arrive nested
        # ([[{...}, ...]]) or flat ([{...}, ...]) depending on the pipeline /
        # library version; accept both instead of failing on the flat form.
        scores = raw[0] if isinstance(raw[0], (list, tuple)) else raw

        pos_prob = next(
            (
                float(item["score"])
                for item in scores
                if str(item["label"]).lower() == _POSITIVE
            ),
            0.0,
        )
        label = "Positive" if pos_prob >= threshold else "Negative"
        return label, pos_prob
    except Exception:
        # Deliberate best-effort boundary: a failed classification is
        # reported to the caller as "Unknown" rather than raised.
        return "Unknown", 0.0
# ---------------------------------------------------------------------------
# Aggregation – average positive probability → binary overall label
# ---------------------------------------------------------------------------
def aggregate_sentiments(
    results: List[Tuple[str, float]],
    avg_threshold: float = _DEFAULT_THRESHOLD,
) -> str:
    """Compute overall **Positive/Negative** based on *mean* positive probability.

    Args:
        results: ``(label, positive_probability)`` tuples as returned by
            ``analyze_sentiment`` for each headline.
        avg_threshold: Minimum mean positive probability for *Positive*.

    Returns:
        ``"Positive"`` when the mean positive probability of the usable
        results is ``>= avg_threshold``, otherwise ``"Negative"``. Returns
        ``"Unknown"`` when *results* is empty / ``None`` or when every entry
        is labelled ``"Unknown"``.
    """
    # ``analyze_sentiment`` reports a failed classification as
    # ("Unknown", 0.0). That 0.0 is a placeholder, not a measurement, so
    # averaging it in would bias the overall label towards "Negative".
    usable = [prob for label, prob in (results or ()) if label != "Unknown"]
    if not usable:
        return "Unknown"
    avg_pos = sum(usable) / len(usable)
    return "Positive" if avg_pos >= avg_threshold else "Negative"
# ---------------------------------------------------------------------------
# ORG‑entity extraction (for ticker discovery)
# ---------------------------------------------------------------------------
def extract_org_entities(text: str, pipe=None, max_entities: int = 5) -> List[str]:
    """Return up to *max_entities* unique ORG tokens (upper‑case, de‑hashed).

    Args:
        text: Text to scan for organisation names.
        pipe: Optional callable used instead of the module-level
            ``ner_pipeline``. It is called as ``pipe(text)`` and must return
            an iterable of dicts carrying ``"entity_group"`` and ``"word"``.
        max_entities: Maximum number of names to return. Values ``<= 0``
            yield an empty list.

    Returns:
        Organisation names in order of first appearance, upper-cased, with
        ``##`` word-piece markers removed and duplicates dropped. Returns an
        empty list on any internal error.
    """
    try:
        # Checked up front: the in-loop limit test only runs after an append,
        # so a non-positive limit would otherwise still let one entity in.
        if max_entities <= 0:
            return []

        # Only fall back to the shared pipeline when no pipe was supplied.
        ner_pipe = pipe if pipe is not None else ner_pipeline

        orgs: List[str] = []
        for ent in ner_pipe(text):
            if ent.get("entity_group") != "ORG":
                continue
            token = ent["word"].replace("##", "").strip().upper()
            if not token or token in orgs:
                continue
            orgs.append(token)
            if len(orgs) >= max_entities:
                break
        return orgs
    except Exception:
        # Deliberate best-effort boundary: callers get an empty list rather
        # than an exception when entity extraction fails.
        return []
# ---------------------------------------------------------------------------
# Public accessors (backward compatibility with app.py)
# ---------------------------------------------------------------------------
def get_sentiment_pipeline():
    """Return the shared module-level sentiment-analysis pipeline.

    Kept as a function so existing callers (app.py) can keep using the
    accessor instead of importing the global directly.
    """
    shared = sentiment_pipeline
    return shared
def get_ner_pipeline():
    """Return the shared module-level NER pipeline.

    Kept as a function so existing callers (app.py) can keep using the
    accessor instead of importing the global directly.
    """
    shared = ner_pipeline
    return shared