from typing import List, Tuple

from transformers import (
    pipeline,
    AutoTokenizer,
    AutoModelForSequenceClassification,
    AutoModelForTokenClassification,
)
from bs4 import BeautifulSoup
import requests

# ---------------------------------------------------------------------------
# Model identifiers – custom binary-sentiment model hosted on Hugging Face
# ---------------------------------------------------------------------------
SENTIMENT_MODEL_ID = "LinkLinkWu/Stock_Analysis_Test_Ahamed"  # LABEL_0 = Negative, LABEL_1 = Positive
NER_MODEL_ID = "dslim/bert-base-NER"

# ---------------------------------------------------------------------------
# Pipeline singletons (initialised once per session)
# ---------------------------------------------------------------------------
sentiment_tokenizer = AutoTokenizer.from_pretrained(SENTIMENT_MODEL_ID)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(SENTIMENT_MODEL_ID)
sentiment_pipeline = pipeline(
    "sentiment-analysis",
    model=sentiment_model,
    tokenizer=sentiment_tokenizer,
)

ner_tokenizer = AutoTokenizer.from_pretrained(NER_MODEL_ID)
ner_model = AutoModelForTokenClassification.from_pretrained(NER_MODEL_ID)
ner_pipeline = pipeline(
    "ner",
    model=ner_model,
    tokenizer=ner_tokenizer,
    aggregation_strategy="simple",  # modern equivalent of the deprecated grouped_entities=True
)

# ---------------------------------------------------------------------------
# Web-scraping helper (Finviz)
# ---------------------------------------------------------------------------
def fetch_news(ticker: str) -> List[dict]:
    """Return ≤30 latest Finviz headlines for *ticker* ("title" & "link")."""
    try:
        url = f"https://finviz.com/quote.ashx?t={ticker}"
        headers = {
            "User-Agent": "Mozilla/5.0",
            "Accept": "text/html",
            "Accept-Language": "en-US,en;q=0.5",
            "Referer": "https://finviz.com/",
            "Connection": "keep-alive",
        }
        r = requests.get(url, headers=headers, timeout=10)
        if r.status_code != 200:
            return []
        soup = BeautifulSoup(r.text, "html.parser")
        if ticker.upper() not in (soup.title.text if soup.title else "").upper():
            return []  # redirect / placeholder page
        table = soup.find(id="news-table")
        if table is None:
            return []
        headlines: List[dict] = []
        for row in table.find_all("tr")[:30]:
            link_tag = row.find("a")
            if link_tag:
                headlines.append(
                    {"title": link_tag.get_text(strip=True), "link": link_tag["href"]}
                )
        return headlines
    except Exception:
        return []

# ---------------------------------------------------------------------------
# Sentiment helpers – binary output, internal probabilities retained
# ---------------------------------------------------------------------------
_LABEL_MAP = {"LABEL_0": "Negative", "LABEL_1": "Positive", "NEUTRAL": "Positive"}
_POSITIVE_RAW = "LABEL_1"
_NEUTRAL_RAW = "NEUTRAL"  # rarely returned; mapped to Positive on purpose
_SINGLE_THRESHOLD = 0.55  # per-headline cut-off


def analyze_sentiment(
    text: str,
    pipe=None,
    threshold: float = _SINGLE_THRESHOLD,
) -> Tuple[str, float]:
    """Return ``(label, positive_probability)`` for *text*.

    * Neutral predictions – if produced by the model – are **treated as
      Positive**.
    * The numeric probability is kept for aggregation; the front-end may
      discard it to satisfy the "no numbers" display requirement.
    """
    try:
        sentiment_pipe = pipe or sentiment_pipeline
        all_scores = sentiment_pipe(text, return_all_scores=True, truncation=True)[0]
        score_map = {item["label"].upper(): item["score"] for item in all_scores}
        # Positive probability: include Neutral as positive when present
        pos_prob = score_map.get(_POSITIVE_RAW, 0.0)
        if _NEUTRAL_RAW in score_map:
            pos_prob = max(pos_prob, score_map[_NEUTRAL_RAW])
        # Determine final label (Neutral → Positive by design)
        label = (
            "Positive"
            if (_NEUTRAL_RAW in score_map) or (pos_prob >= threshold)
            else "Negative"
        )
        return label, pos_prob
    except Exception:
        return "Unknown", 0.0
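# ---------------------------------------------------------------------------
# Hypothetical batched variant (not part of the original API). Scoring all
# headlines in a single pipeline call avoids per-call overhead; this is a
# minimal sketch assuming the same binary model and label conventions as
# ``analyze_sentiment`` above.
# ---------------------------------------------------------------------------
def analyze_sentiments_batch(
    texts: List[str],
    pipe=None,
    threshold: float = _SINGLE_THRESHOLD,
) -> List[Tuple[str, float]]:
    """Return ``(label, positive_probability)`` for every text in *texts*."""
    if not texts:
        return []
    try:
        sentiment_pipe = pipe or sentiment_pipeline
        # One batched call; the pipeline returns one score-list per input text.
        outputs = sentiment_pipe(texts, return_all_scores=True, truncation=True)
        results: List[Tuple[str, float]] = []
        for all_scores in outputs:
            score_map = {item["label"].upper(): item["score"] for item in all_scores}
            pos_prob = score_map.get(_POSITIVE_RAW, 0.0)
            if _NEUTRAL_RAW in score_map:
                pos_prob = max(pos_prob, score_map[_NEUTRAL_RAW])
            label = (
                "Positive"
                if (_NEUTRAL_RAW in score_map) or (pos_prob >= threshold)
                else "Negative"
            )
            results.append((label, pos_prob))
        return results
    except Exception:
        return [("Unknown", 0.0)] * len(texts)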
""" try: sentiment_pipe = pipe or sentiment_pipeline all_scores = sentiment_pipe(text, return_all_scores=True, truncation=True)[0] score_map = {item["label"].upper(): item["score"] for item in all_scores} # Positive probability: include Neutral as positive when present pos_prob = score_map.get(_POSITIVE_RAW, 0.0) if _NEUTRAL_RAW in score_map: pos_prob = max(pos_prob, score_map[_NEUTRAL_RAW]) # Determine final label (Neutral → Positive by design) label = "Positive" if ( (_NEUTRAL_RAW in score_map) or (pos_prob >= threshold) ) else "Negative" return label, pos_prob except Exception: return "Unknown", 0.0 # --------------------------------------------------------------------------- _LABEL_MAP = {"LABEL_0": "Negative", "LABEL_1": "Positive"} _POSITIVE_RAW = "LABEL_1" _SINGLE_THRESHOLD = 0.55 # per‑headline cut‑off def analyze_sentiment(text: str, pipe=None, threshold: float = _SINGLE_THRESHOLD) -> Tuple[str, float]: """Return ``(label, positive_probability)`` for *text*. * Neutral is not expected from a binary model; if encountered, treat as Negative. * Numeric probability is for internal aggregation only – front‑end can ignore it to satisfy the "no numbers" requirement. """ try: sentiment_pipe = pipe or sentiment_pipeline scores = sentiment_pipe(text, return_all_scores=True, truncation=True)[0] pos_prob = 0.0 for item in scores: if item["label"].upper() == _POSITIVE_RAW: pos_prob = item["score"] break label = "Positive" if pos_prob >= threshold else "Negative" return label, pos_prob except Exception: return "Unknown", 0.0 # --------------------------------------------------------------------------- # Aggregation – average positive probability → binary overall label # --------------------------------------------------------------------------- _AVG_THRESHOLD = 0.55 # ≥55 % mean positive probability → overall Positive def aggregate_sentiments(results: List[Tuple[str, float]], avg_threshold: float = _AVG_THRESHOLD) -> str: """Compute overall **Positive/Negative** via *average positive probability*. * *results* – list of tuples from ``analyze_sentiment``. * Empty list → *Unknown*. * The returned label is **binary**; numeric values remain internal. """ if not results: return "Unknown" avg_pos = sum(prob for _, prob in results) / len(results) return "Positive" if avg_pos >= avg_threshold else "Negative" # --------------------------------------------------------------------------- # ORG‑entity extraction (ticker discovery) # --------------------------------------------------------------------------- def extract_org_entities(text: str, pipe=None, max_entities: int = 5) -> List[str]: """Extract up to *max_entities* unique ORG tokens (upper‑case, de‑hashed).""" try: ner_pipe = pipe or ner_pipeline entities = ner_pipe(text) orgs: List[str] = [] for ent in entities: if ent.get("entity_group") == "ORG": token = ent["word"].replace("##", "").strip().upper() if token and token not in orgs: orgs.append(token) if len(orgs) >= max_entities: break return orgs except Exception: return [] # --------------------------------------------------------------------------- # Public accessors (legacy compatibility) # --------------------------------------------------------------------------- def get_sentiment_pipeline(): return sentiment_pipeline def get_ner_pipeline(): return ner_pipeline