Spaces:
Build error
Build error
import tweepy | |
from transformers import pipeline, GPT2LMHeadModel, GPT2Tokenizer, AutoModelForSequenceClassification, AutoTokenizer | |
import os | |
import streamlit as st | |
from datetime import datetime | |
import time | |
from tenacity import retry, stop_after_attempt, wait_exponential | |
import torch | |
from collections import Counter | |
import re | |
def debug_print(message): | |
"""Função para imprimir mensagens de debug tanto no console quanto no Streamlit""" | |
print(message) | |
st.text(message) | |
def fetch_tweets(client, query, tweet_fields): | |
try: | |
debug_print(f"Iniciando busca com query: {query}") | |
debug_print(f"Campos solicitados: {tweet_fields}") | |
tweets = client.search_recent_tweets( | |
query=query, | |
max_results=100, # Aumentado para ter mais contexto | |
tweet_fields=tweet_fields | |
) | |
if tweets is None: | |
debug_print("Nenhum resultado retornado da API") | |
return None | |
if not hasattr(tweets, 'data'): | |
debug_print("Resposta não contém dados") | |
return None | |
debug_print(f"Tweets encontrados: {len(tweets.data) if tweets.data else 0}") | |
return tweets | |
except tweepy.errors.TooManyRequests as e: | |
debug_print(f"Rate limit atingido: {str(e)}") | |
raise e | |
except tweepy.errors.TwitterServerError as e: | |
debug_print(f"Erro do servidor Twitter: {str(e)}") | |
raise e | |
except tweepy.errors.BadRequest as e: | |
debug_print(f"Erro na requisição: {str(e)}") | |
raise e | |
except Exception as e: | |
debug_print(f"Erro inesperado na busca: {str(e)}") | |
raise e | |
def post_tweet(client, text): | |
try: | |
response = client.create_tweet(text=text) | |
return response | |
except Exception as e: | |
debug_print(f"Erro ao postar tweet: {str(e)}") | |
raise e | |
def initialize_text_generator(): | |
"""Inicializa o modelo de geração de texto""" | |
# Usando um modelo GPT-2 em português maior para melhor qualidade | |
model_name = "pierreguillou/gpt2-small-portuguese" | |
tokenizer = GPT2Tokenizer.from_pretrained(model_name) | |
model = GPT2LMHeadModel.from_pretrained(model_name) | |
return model, tokenizer | |
def extract_context_from_tweets(tweets_data): | |
"""Extrai contexto relevante dos tweets""" | |
all_text = " ".join([tweet.text for tweet in tweets_data]) | |
# Remover URLs, mentions, RTs e caracteres especiais | |
clean_text = re.sub(r'http\S+|@\S+|RT|[^\w\s]', ' ', all_text) | |
# Encontrar nomes de participantes (palavras capitalizadas frequentes) | |
words = clean_text.split() | |
capitalized_words = [word for word in words if word.istitle() and len(word) > 2] | |
participants = Counter(capitalized_words).most_common(5) | |
# Encontrar temas/eventos importantes | |
# Procurar por frases comuns que indicam eventos | |
event_patterns = [ | |
r'paredão entre.*?(?=\s|$)', | |
r'prova do líder.*?(?=\s|$)', | |
r'prova do anjo.*?(?=\s|$)', | |
r'eliminação.*?(?=\s|$)', | |
r'briga entre.*?(?=\s|$)', | |
r'jogo da discórdia.*?(?=\s|$)' | |
] | |
events = [] | |
for pattern in event_patterns: | |
matches = re.findall(pattern, all_text.lower()) | |
if matches: | |
events.extend(matches) | |
return { | |
'participants': [p[0] for p in participants], | |
'events': list(set(events))[:3], # Top 3 eventos únicos | |
'raw_text': clean_text | |
} | |
def generate_comment(context, sentiment_ratio, model, tokenizer): | |
"""Gera um comentário contextualizado sobre o BBB""" | |
# Criar prompt baseado no contexto e sentimento | |
sentiment_tone = "" | |
if sentiment_ratio['positive'] > 0.5: | |
sentiment_tone = "positivo" | |
elif sentiment_ratio['negative'] > 0.5: | |
sentiment_tone = "negativo" | |
else: | |
sentiment_tone = "dividido" | |
# Construir contexto para o prompt | |
prompt = f"No BBB25, com clima {sentiment_tone}" | |
# Adicionar participantes relevantes | |
if context['participants']: | |
participants_str = ", ".join(context['participants'][:2]) | |
prompt += f", {participants_str} se destacam" | |
# Adicionar eventos relevantes | |
if context['events']: | |
event_str = context['events'][0] | |
prompt += f". {event_str.capitalize()}" | |
# Gerar texto | |
inputs = tokenizer.encode(prompt, return_tensors='pt', max_length=150, truncation=True) | |
outputs = model.generate( | |
inputs, | |
max_length=200, | |
num_return_sequences=3, | |
temperature=0.9, | |
top_k=50, | |
top_p=0.95, | |
do_sample=True, | |
no_repeat_ngram_size=2, | |
pad_token_id=tokenizer.eos_token_id | |
) | |
# Gerar múltiplas opções e escolher a melhor | |
generated_texts = [] | |
for output in outputs: | |
text = tokenizer.decode(output, skip_special_tokens=True) | |
# Limpar e formatar o texto | |
text = re.sub(r'\s+', ' ', text).strip() | |
if len(text) > 30: # Garantir que temos um texto substancial | |
generated_texts.append(text) | |
# Escolher o melhor texto | |
best_text = max(generated_texts, key=len) if generated_texts else prompt | |
# Adicionar hashtags relevantes | |
hashtags = " #BBB25" | |
if context['participants']: | |
hashtags += f" #{context['participants'][0].replace(' ', '')}" | |
if context['events']: | |
event_tag = context['events'][0].split()[0].capitalize() | |
hashtags += f" #{event_tag}" | |
# Garantir que está dentro do limite do Twitter | |
max_length = 280 - len(hashtags) | |
if len(best_text) > max_length: | |
best_text = best_text[:max_length-3] + "..." | |
return best_text + hashtags | |
def main(): | |
try: | |
st.title("Análise de Sentimentos - BBB25") | |
# Verificar variáveis de ambiente | |
debug_print("Verificando variáveis de ambiente...") | |
required_vars = [ | |
'TWITTER_API_KEY', | |
'TWITTER_API_SECRET_KEY', | |
'TWITTER_ACCESS_TOKEN', | |
'TWITTER_ACCESS_TOKEN_SECRET', | |
'TWITTER_BEARER_TOKEN' | |
] | |
missing_vars = [] | |
for var in required_vars: | |
if os.getenv(var) is None: | |
missing_vars.append(var) | |
debug_print(f"Erro: A variável de ambiente '{var}' não está definida.") | |
else: | |
debug_print(f"{var} carregada com sucesso.") | |
if missing_vars: | |
raise ValueError(f"Variáveis de ambiente faltando: {', '.join(missing_vars)}") | |
debug_print("Iniciando autenticação com Twitter...") | |
# Autenticação com Twitter | |
client = tweepy.Client( | |
bearer_token=os.getenv('TWITTER_BEARER_TOKEN'), | |
consumer_key=os.getenv('TWITTER_API_KEY'), | |
consumer_secret=os.getenv('TWITTER_API_SECRET_KEY'), | |
access_token=os.getenv('TWITTER_ACCESS_TOKEN'), | |
access_token_secret=os.getenv('TWITTER_ACCESS_TOKEN_SECRET'), | |
wait_on_rate_limit=True | |
) | |
# Inicializar modelo de geração de texto | |
debug_print("Inicializando modelo de geração de texto...") | |
model, tokenizer = initialize_text_generator() | |
# Query principal | |
query = 'BBB25 lang:pt -is:retweet -is:reply' | |
tweet_fields = ['text', 'created_at', 'lang', 'public_metrics'] | |
debug_print("Iniciando busca principal de tweets...") | |
with st.spinner('Buscando tweets...'): | |
tweets = fetch_tweets(client, query, tweet_fields) | |
if tweets is None: | |
st.error("Não foi possível obter tweets") | |
return | |
if not tweets.data: | |
st.warning("Nenhum tweet encontrado com os critérios especificados") | |
debug_print("Busca retornou vazia") | |
return | |
debug_print(f"Encontrados {len(tweets.data)} tweets") | |
# Extrair contexto dos tweets | |
context = extract_context_from_tweets(tweets.data) | |
debug_print("Contexto extraído dos tweets:") | |
debug_print(f"Participantes mencionados: {context['participants']}") | |
debug_print(f"Eventos detectados: {context['events']}") | |
# Análise de sentimentos | |
with st.spinner('Analisando sentimentos...'): | |
debug_print("Iniciando análise de sentimentos...") | |
sentiment_pipeline = pipeline( | |
"sentiment-analysis", | |
model="nlptown/bert-base-multilingual-uncased-sentiment" | |
) | |
sentiments = [] | |
for tweet in tweets.data: | |
if hasattr(tweet, 'lang') and tweet.lang == 'pt': | |
result = sentiment_pipeline(tweet.text) | |
rating = int(result[0]['label'].split()[0]) | |
if rating >= 4: | |
sentiments.append('positive') | |
elif rating <= 2: | |
sentiments.append('negative') | |
else: | |
sentiments.append('neutral') | |
debug_print(f"Sentimento analisado: {rating} estrelas") | |
time.sleep(1) | |
# Calcular taxas | |
if sentiments: | |
positive = sentiments.count('positive') | |
negative = sentiments.count('negative') | |
neutral = sentiments.count('neutral') | |
total = len(sentiments) | |
debug_print(f"Total de sentimentos analisados: {total}") | |
sentiment_ratios = { | |
'positive': positive / total, | |
'negative': negative / total, | |
'neutral': neutral / total | |
} | |
# Gerar comentário usando IA | |
with st.spinner('Gerando novo comentário...'): | |
debug_print("Iniciando geração de comentário com IA...") | |
tweet_text = generate_comment(context, sentiment_ratios, model, tokenizer) | |
debug_print(f"Comentário gerado: {tweet_text}") | |
# Postar tweet | |
with st.spinner('Postando tweet...'): | |
debug_print("Tentando postar tweet...") | |
try: | |
post_tweet(client, tweet_text) | |
st.success("Tweet postado com sucesso!") | |
debug_print("Tweet postado com sucesso") | |
except Exception as e: | |
st.error(f"Erro ao postar tweet: {str(e)}") | |
debug_print(f"Erro ao postar tweet: {str(e)}") | |
# Interface Streamlit | |
st.title("Resultados da Análise") | |
# Mostrar estatísticas | |
col1, col2, col3 = st.columns(3) | |
with col1: | |
st.metric("Sentimento Positivo", f"{sentiment_ratios['positive']:.1%}") | |
with col2: | |
st.metric("Sentimento Neutro", f"{sentiment_ratios['neutral']:.1%}") | |
with col3: | |
st.metric("Sentimento Negativo", f"{sentiment_ratios['negative']:.1%}") | |
# Mostrar contexto extraído | |
st.subheader("Contexto Analisado") | |
st.write("Participantes em destaque:", ", ".join(context['participants'])) | |
st.write("Eventos detectados:", ", ".join(context['events'])) | |
# Mostrar tweet gerado | |
st.subheader("Tweet Gerado e Postado") | |
st.write(tweet_text) | |
# Logging | |
debug_print("Salvando log...") | |
log_entry = { | |
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), | |
'positive_ratio': sentiment_ratios['positive'], | |
'negative_ratio': sentiment_ratios['negative'], | |
'neutral_ratio': sentiment_ratios['neutral'], | |
'context': context, | |
'tweet': tweet_text | |
} | |
with open('posting_log.txt', 'a') as f: | |
f.write(f"{str(log_entry)}\n") | |
debug_print("Log salvo com sucesso") | |
except Exception as e: | |
st.error(f"Erro: {str(e)}") | |
debug_print(f"Erro fatal: {str(e)}") | |
raise e | |
finally: | |
st.markdown("---") | |
st.markdown( | |
""" | |
<div style='text-align: center'> | |
<small>Desenvolvido com ❤️ usando Streamlit e Transformers</small> | |
</div> | |
""", | |
unsafe_allow_html=True | |
) | |
if __name__ == "__main__": | |
main() |