AnalysisWithMSR commited on
Commit
b248ec3
·
verified ·
1 Parent(s): 5552067

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +103 -79
app.py CHANGED
@@ -5,29 +5,16 @@ import whisper
5
  from pydub import AudioSegment
6
  import tempfile
7
  from transformers import pipeline
 
8
  from youtube_transcript_api import YouTubeTranscriptApi
9
  import torch
10
  import openai
11
  import json
12
  from urllib.parse import urlparse, parse_qs
13
  import os
14
- import gradio as gr
15
 
16
- # API Keys setup
17
- youtube_api_key = os.getenv("YOUTUBE_API_KEY") # Set these as environment variables
18
- openai_api_key = os.getenv("OPENAI_API_KEY")
19
- openai.api_key = openai_api_key
20
-
21
- # Validation for missing API keys
22
- if not youtube_api_key:
23
- raise ValueError("YOUTUBE_API_KEY is not set. Please set it as an environment variable.")
24
-
25
- if not openai_api_key:
26
- raise ValueError("OPENAI_API_KEY is not set. Please set it as an environment variable.")
27
-
28
- # Utility Functions
29
  def extract_video_id(url):
30
- """Extract the video ID from a YouTube URL."""
31
  try:
32
  parsed_url = urlparse(url)
33
  if "youtube.com" in parsed_url.netloc:
@@ -35,15 +22,17 @@ def extract_video_id(url):
35
  return query_params.get('v', [None])[0]
36
  elif "youtu.be" in parsed_url.netloc:
37
  return parsed_url.path.strip("/")
38
- return None
 
 
39
  except Exception as e:
40
  print(f"Error parsing URL: {e}")
41
  return None
42
 
43
- def get_video_duration(video_id):
44
- """Fetch the video duration."""
45
  try:
46
- youtube = googleapiclient.discovery.build("youtube", "v3", developerKey=youtube_api_key)
47
  request = youtube.videos().list(part="contentDetails", id=video_id)
48
  response = request.execute()
49
  if response["items"]:
@@ -53,86 +42,104 @@ def get_video_duration(video_id):
53
  minutes = int(match.group(2)) if match.group(2) else 0
54
  seconds = int(match.group(3)) if match.group(3) else 0
55
  return hours * 60 + minutes + seconds / 60
56
- return None
 
 
57
  except Exception as e:
58
- print(f"Error fetching duration: {e}")
59
  return None
60
 
61
  def download_and_transcribe_with_whisper(youtube_url):
62
- """Download audio and transcribe using Whisper."""
63
  try:
64
  with tempfile.TemporaryDirectory() as temp_dir:
65
  temp_audio_file = os.path.join(temp_dir, "audio.mp3")
 
66
  ydl_opts = {
67
  'format': 'bestaudio/best',
68
  'outtmpl': temp_audio_file,
69
- 'postprocessors': [{
70
- 'key': 'FFmpegExtractAudio',
71
- 'preferredcodec': 'mp3',
72
- 'preferredquality': '192',
73
- }],
74
  }
 
 
75
  with yt_dlp.YoutubeDL(ydl_opts) as ydl:
76
  ydl.download([youtube_url])
77
 
 
78
  audio = AudioSegment.from_file(temp_audio_file)
79
  wav_file = os.path.join(temp_dir, "audio.wav")
80
  audio.export(wav_file, format="wav")
81
 
 
82
  model = whisper.load_model("large")
83
  result = model.transcribe(wav_file)
84
- return result['text']
 
 
85
  except Exception as e:
86
- print(f"Error during Whisper transcription: {e}")
87
  return None
88
 
89
  def get_transcript_from_youtube_api(video_id, video_length):
90
- """Fetch transcript using YouTubeTranscriptApi."""
91
  try:
92
  transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
 
93
  for transcript in transcript_list:
94
  if not transcript.is_generated:
95
- return " ".join(segment['text'] for segment in transcript.fetch())
 
 
96
  if video_length > 15:
97
  auto_transcript = transcript_list.find_generated_transcript(['en'])
98
- return " ".join(segment['text'] for segment in auto_transcript.fetch())
 
 
 
 
99
  return None
 
100
  except Exception as e:
101
  print(f"Error fetching transcript: {e}")
102
  return None
103
 
104
- def get_transcript(youtube_url):
105
- """Fetch transcript or use Whisper fallback."""
106
  video_id = extract_video_id(youtube_url)
107
  if not video_id:
108
- return "Invalid or unsupported YouTube URL."
109
- video_length = get_video_duration(video_id)
110
- if video_length:
111
- transcript = get_transcript_from_youtube_api(video_id, video_length)
112
- return transcript if transcript else download_and_transcribe_with_whisper(youtube_url)
113
- return "Error fetching video details."
114
 
115
- def summarize_text(text):
116
- """Summarize text using Hugging Face's BART model."""
117
- try:
118
- summarizer = pipeline("summarization", model="facebook/bart-large-cnn", device=0 if torch.cuda.is_available() else -1)
119
- max_input_length = 1024
120
- chunk_overlap = 100
121
- text_chunks = [
122
- text[i:i + max_input_length]
123
- for i in range(0, len(text), max_input_length - chunk_overlap)
124
- ]
125
- summaries = [
126
- summarizer(chunk, max_length=100, min_length=50, do_sample=False)[0]['summary_text']
127
- for chunk in text_chunks
128
- ]
129
- return " ".join(summaries)
130
- except Exception as e:
131
- print(f"Error during summarization: {e}")
132
  return None
133
 
134
- def generate_optimized_content(summarized_text):
135
- """Generate optimized video metadata using GPT."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
136
  prompt = f"""
137
  Analyze the following summarized YouTube video transcript and:
138
  1. Extract the top 10 keywords.
@@ -141,7 +148,7 @@ def generate_optimized_content(summarized_text):
141
  4. Generate related tags for the video.
142
 
143
  Summarized Transcript:
144
- {summarized_text}
145
 
146
  Provide the results in the following JSON format:
147
  {{
@@ -151,7 +158,9 @@ def generate_optimized_content(summarized_text):
151
  "tags": ["tag1", "tag2", ..., "tag10"]
152
  }}
153
  """
 
154
  try:
 
155
  response = openai.ChatCompletion.create(
156
  model="gpt-3.5-turbo",
157
  messages=[
@@ -159,28 +168,43 @@ def generate_optimized_content(summarized_text):
159
  {"role": "user", "content": prompt}
160
  ]
161
  )
162
- return json.loads(response['choices'][0]['message']['content'])
163
- except Exception as e:
164
- print(f"Error generating metadata: {e}")
165
- return {"error": "Unable to generate metadata."}
166
 
167
- # Main Gradio Interface
168
- def process_video(youtube_url):
169
- """Complete video processing workflow."""
170
- transcript = get_transcript(youtube_url)
 
 
 
 
 
 
 
 
 
171
  if not transcript:
172
- return {"error": "Could not fetch the transcript. Please try another video."}
173
- summary = summarize_text(transcript)
174
- optimized_content = generate_optimized_content(summary)
175
- return optimized_content
176
-
 
 
 
 
 
177
  iface = gr.Interface(
178
- fn=process_video,
179
- inputs=gr.Textbox(label="Enter YouTube URL"),
180
- outputs=gr.JSON(label="Optimized Metadata"),
181
- title="YouTube Video SEO Optimizer",
182
- description="Paste a YouTube URL to generate an SEO-friendly title, description, tags, and keywords."
183
  )
184
 
 
185
  if __name__ == "__main__":
186
- iface.launch()
 
5
  from pydub import AudioSegment
6
  import tempfile
7
  from transformers import pipeline
8
+ from pytrends.request import TrendReq
9
  from youtube_transcript_api import YouTubeTranscriptApi
10
  import torch
11
  import openai
12
  import json
13
  from urllib.parse import urlparse, parse_qs
14
  import os
 
15
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
  def extract_video_id(url):
17
+ """Extracts the video ID from a YouTube URL."""
18
  try:
19
  parsed_url = urlparse(url)
20
  if "youtube.com" in parsed_url.netloc:
 
22
  return query_params.get('v', [None])[0]
23
  elif "youtu.be" in parsed_url.netloc:
24
  return parsed_url.path.strip("/")
25
+ else:
26
+ print("Invalid YouTube URL.")
27
+ return None
28
  except Exception as e:
29
  print(f"Error parsing URL: {e}")
30
  return None
31
 
32
+ def get_video_duration(video_id, api_key):
33
+ """Fetches the video duration in minutes."""
34
  try:
35
+ youtube = googleapiclient.discovery.build("youtube", "v3", developerKey=api_key)
36
  request = youtube.videos().list(part="contentDetails", id=video_id)
37
  response = request.execute()
38
  if response["items"]:
 
42
  minutes = int(match.group(2)) if match.group(2) else 0
43
  seconds = int(match.group(3)) if match.group(3) else 0
44
  return hours * 60 + minutes + seconds / 60
45
+ else:
46
+ print("No video details found.")
47
+ return None
48
  except Exception as e:
49
+ print(f"Error fetching video duration: {e}")
50
  return None
51
 
52
  def download_and_transcribe_with_whisper(youtube_url):
 
53
  try:
54
  with tempfile.TemporaryDirectory() as temp_dir:
55
  temp_audio_file = os.path.join(temp_dir, "audio.mp3")
56
+
57
  ydl_opts = {
58
  'format': 'bestaudio/best',
59
  'outtmpl': temp_audio_file,
60
+ 'extractaudio': True,
61
+ 'audioquality': 1,
 
 
 
62
  }
63
+
64
+ # Download audio using yt-dlp
65
  with yt_dlp.YoutubeDL(ydl_opts) as ydl:
66
  ydl.download([youtube_url])
67
 
68
+ # Convert to wav for Whisper
69
  audio = AudioSegment.from_file(temp_audio_file)
70
  wav_file = os.path.join(temp_dir, "audio.wav")
71
  audio.export(wav_file, format="wav")
72
 
73
+ # Run Whisper transcription
74
  model = whisper.load_model("large")
75
  result = model.transcribe(wav_file)
76
+ transcript = result['text']
77
+ return transcript
78
+
79
  except Exception as e:
80
+ print(f"Error during transcription: {e}")
81
  return None
82
 
83
  def get_transcript_from_youtube_api(video_id, video_length):
84
+ """Fetches transcript using YouTube API if available."""
85
  try:
86
  transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
87
+
88
  for transcript in transcript_list:
89
  if not transcript.is_generated:
90
+ segments = transcript.fetch()
91
+ return " ".join(segment['text'] for segment in segments)
92
+
93
  if video_length > 15:
94
  auto_transcript = transcript_list.find_generated_transcript(['en'])
95
+ if auto_transcript:
96
+ segments = auto_transcript.fetch()
97
+ return " ".join(segment['text'] for segment in segments)
98
+
99
+ print("Manual transcript not available, and video is too short for auto-transcript.")
100
  return None
101
+
102
  except Exception as e:
103
  print(f"Error fetching transcript: {e}")
104
  return None
105
 
106
+ def get_transcript(youtube_url, api_key):
107
+ """Gets transcript from YouTube API or Whisper if unavailable."""
108
  video_id = extract_video_id(youtube_url)
109
  if not video_id:
110
+ print("Invalid or unsupported YouTube URL.")
111
+ return None
 
 
 
 
112
 
113
+ video_length = get_video_duration(video_id, api_key)
114
+ if video_length is not None:
115
+ print(f"Video length: {video_length:.2f} minutes.")
116
+ transcript = get_transcript_from_youtube_api(video_id, video_length)
117
+ if transcript:
118
+ return transcript
119
+ print("Using Whisper for transcription.")
120
+ return download_and_transcribe_with_whisper(youtube_url)
121
+ else:
122
+ print("Error fetching video duration.")
 
 
 
 
 
 
 
123
  return None
124
 
125
+ def summarize_text_huggingface(text):
126
+ """Summarizes text using a Hugging Face summarization model."""
127
+ summarizer = pipeline("summarization", model="facebook/bart-large-cnn", device=0 if torch.cuda.is_available() else -1)
128
+ max_input_length = 1024
129
+ chunk_overlap = 100
130
+ text_chunks = [
131
+ text[i:i + max_input_length]
132
+ for i in range(0, len(text), max_input_length - chunk_overlap)
133
+ ]
134
+ summaries = [
135
+ summarizer(chunk, max_length=100, min_length=50, do_sample=False)[0]['summary_text']
136
+ for chunk in text_chunks
137
+ ]
138
+ return " ".join(summaries)
139
+
140
+ def generate_optimized_content(api_key, summarized_transcript):
141
+ openai.api_key = api_key
142
+
143
  prompt = f"""
144
  Analyze the following summarized YouTube video transcript and:
145
  1. Extract the top 10 keywords.
 
148
  4. Generate related tags for the video.
149
 
150
  Summarized Transcript:
151
+ {summarized_transcript}
152
 
153
  Provide the results in the following JSON format:
154
  {{
 
158
  "tags": ["tag1", "tag2", ..., "tag10"]
159
  }}
160
  """
161
+
162
  try:
163
+ # Use the updated OpenAI API format for chat completions
164
  response = openai.ChatCompletion.create(
165
  model="gpt-3.5-turbo",
166
  messages=[
 
168
  {"role": "user", "content": prompt}
169
  ]
170
  )
171
+ # Extract and parse the response
172
+ response_content = response['choices'][0]['message']['content']
173
+ content = json.loads(response_content)
174
+ return content
175
 
176
+ except Exception as e:
177
+ print(f"Error generating content: {e}")
178
+ return None
179
+ def youtube_seo_pipeline(youtube_url):
180
+ openai.api_key = OPENAI_API_KEY
181
+ if not YOUTUBE_API_KEY or not OPENAI_API_KEY:
182
+ return "API keys missing! Please check environment variables."
183
+
184
+ video_id = extract_video_id(youtube_url)
185
+ if not video_id:
186
+ return "Invalid YouTube URL."
187
+
188
+ transcript = get_transcript(youtube_url, YOUTUBE_API_KEY)
189
  if not transcript:
190
+ return "Failed to fetch transcript. Try another video."
191
+
192
+ summarized_text = summarize_text_huggingface(transcript)
193
+ optimized_content = generate_optimized_content(OPENAI_API_KEY, summarized_text)
194
+ if optimized_content:
195
+ return json.dumps(optimized_content, indent=4)
196
+ else:
197
+ return "Failed to generate SEO content."
198
+
199
+ # Define the Gradio Interface
200
  iface = gr.Interface(
201
+ fn=youtube_seo_pipeline,
202
+ inputs="text",
203
+ outputs="text",
204
+ title="YouTube SEO Optimizer",
205
+ description="Enter a YouTube video URL to fetch and optimize SEO content (title, description, tags, and keywords)."
206
  )
207
 
208
+ # Run the Gradio app
209
  if __name__ == "__main__":
210
+ iface.launch()