File size: 10,752 Bytes
666750d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4955f2d
666750d
4955f2d
 
666750d
 
 
 
4955f2d
 
666750d
 
 
4955f2d
666750d
 
 
 
4955f2d
 
 
 
666750d
 
4955f2d
 
 
 
666750d
4955f2d
 
 
 
666750d
4955f2d
 
 
666750d
 
 
 
4955f2d
 
 
666750d
4955f2d
 
 
666750d
 
 
 
4955f2d
 
 
 
666750d
4955f2d
 
666750d
4955f2d
 
666750d
4955f2d
666750d
4955f2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
666750d
 
 
 
4955f2d
 
666750d
 
 
4955f2d
 
666750d
 
4955f2d
 
 
 
666750d
4955f2d
666750d
 
 
4955f2d
 
 
666750d
4955f2d
 
 
 
666750d
4955f2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import re
import gradio as gr
from youtube_transcript_api._api import YouTubeTranscriptApi
from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound


def _extract_video_id(youtube_url: str) -> str | None:
    """
    Extracts the YouTube video ID from a URL.
    Handles standard, shortened, and embed URLs.
    """
    # Standard URL: https://www.youtube.com/watch?v=VIDEO_ID
    match = re.search(r"watch\?v=([^&]+)", youtube_url)
    if match:
        return match.group(1)

    # Shortened URL: https://youtu.be/VIDEO_ID
    match = re.search(r"youtu\.be/([^?&]+)", youtube_url)
    if match:
        return match.group(1)

    # Embed URL: https://www.youtube.com/embed/VIDEO_ID
    match = re.search(r"youtube\.com/embed/([^?&]+)", youtube_url)
    if match:
        return match.group(1)

    # Video ID directly passed
    if re.fullmatch(r"^[a-zA-Z0-9_-]{11}$", youtube_url):
        return youtube_url
    return None


def _find_first_transcript(transcript_list, language_codes):
    """
    Return a transcript for ``language_codes`` from ``transcript_list``, or None.

    ``find_transcript`` is tried first, then ``find_generated_transcript``;
    both receive the codes in the caller's priority order. Per the
    youtube-transcript-api documentation ``find_transcript`` already prefers
    manually created captions and falls back to generated ones, so the second
    lookup is kept only as an explicit safety net.
    """
    for finder in (transcript_list.find_transcript,
                   transcript_list.find_generated_transcript):
        try:
            return finder(language_codes)
        except NoTranscriptFound:
            continue
    return None


def get_youtube_video_transcript_scraper(video_url_or_id: str, lang_preference_list: list[str]) -> str:
    """
    Retrieve the transcript text for a YouTube video via youtube_transcript_api.

    Lookup order:
      1. the languages in ``lang_preference_list``, in the given order;
      2. a fixed set of English variants, but only when no preferred code
         starts with "en";
      3. the first transcript the video lists, whatever its language.

    Args:
        video_url_or_id (str): The full YouTube video URL
            (e.g., "https://www.youtube.com/watch?v=VIDEO_ID") or just the
            11-character video ID.
        lang_preference_list (list[str]): Language codes to try, in order of
            preference. Example: ['en', 'en-US', 'es'].

    Returns:
        str: The transcript segments joined with single spaces on success,
             otherwise a message starting with "Error:". Failures are reported
             through the return value; exceptions raised inside the lookup are
             caught and converted to such a message.
    """
    video_id = _extract_video_id(video_url_or_id)

    if not video_id:
        return f"Error: Invalid YouTube video URL or ID: '{video_url_or_id}'. Could not extract a valid video ID."

    if not lang_preference_list:
        return "Error: Language preference list ('lang_preference_list') cannot be empty."

    try:
        transcript_list = YouTubeTranscriptApi().list(video_id)

        # 1. Preferred languages, in the caller's order.
        transcript = _find_first_transcript(transcript_list, lang_preference_list)

        # 2. English fallback, skipped when the caller already asked for an
        #    English variant (it would just repeat a failed lookup).
        english_fallbacks = ['en', 'en-US', 'en-GB', 'en-AU', 'en-CA', 'en-IN']
        already_tried_english = any(
            lang.lower().startswith('en') for lang in lang_preference_list)
        if transcript is None and not already_tried_english:
            transcript = _find_first_transcript(transcript_list, english_fallbacks)

        # 3. Last resort: whatever transcript the video lists first.
        if transcript is None:
            print(f"Notice: No transcript found in preferred languages or English for video ID '{video_id}'. "
                  "Attempting to fetch the first available original language transcript.")
            transcript = next(iter(transcript_list), None)

        if transcript is None:
            return (f"Error: No transcripts at all seem to be available for video ID '{video_id}'. "
                    f"Checked preferred: {lang_preference_list}, English fallbacks, and any original language.")

        fetched_segments = transcript.fetch()
        return " ".join(segment.text for segment in fetched_segments)

    except TranscriptsDisabled:
        return f"Error: Transcripts are disabled for video ID '{video_id}'."
    except NoTranscriptFound:
        # Reached only if the library raises this outside the guarded lookups
        # above (e.g. while listing or fetching).
        return f"Error: No transcripts whatsoever found for video ID '{video_id}'. The video might not have any captions initially."
    except Exception as e:
        # Boundary handler: this function reports every failure as a string.
        error_type = type(e).__name__
        if "VideoUnavailable" in error_type:
            return f"Error: Video '{video_id}' is unavailable. It might be private, deleted, or geo-restricted."
        # Compare against the lowercased message with lowercase needles; the
        # previous mixed-case "video ID" needle could never match.
        message = str(e).lower()
        if "video id" in message or "parameter" in message:
            return f"Error: Could not retrieve transcript for video ID '{video_id}'. It might be an invalid ID or other parameter issue. (API Error: {error_type})"
        return f"Error: An unexpected error occurred while fetching transcript for video ID '{video_id}': {error_type} - {str(e)}"


def gradio_mcp_handler(video_url_or_id: str, lang_preference_str: str):
    """
    MCP tool handler to retrieve YouTube video transcript using youtube_transcript_api.

    Args:
        video_url_or_id (str): The YouTube video URL or its 11-character ID.
        lang_preference_str (str): A comma-separated string of preferred language codes for the transcript
                                   (e.g., "en,en-US,es"). Defaults to "en" if empty or invalid.

    Returns:
        str: The fetched transcript or an error message.
    """
    # A missing value (None) is treated like an empty string so the caller
    # gets the error message instead of an AttributeError.
    video_ref = (video_url_or_id or "").strip()
    if not video_ref:
        return "Error: 'video_url_or_id' argument cannot be empty."

    # Split on commas and drop blank entries; inputs such as "", "  " or ","
    # all leave the list empty and fall back to English.
    lang_list = [code.strip()
                 for code in (lang_preference_str or "").split(',') if code.strip()]
    if not lang_list:
        lang_list = ['en']

    # Forward the stripped value so stray whitespace cannot end up in the ID.
    return get_youtube_video_transcript_scraper(video_ref, lang_list)


# Gradio input components; their order matches the positional parameters of
# gradio_mcp_handler (video URL/ID first, language codes second).
inputs = [
    gr.Textbox(
        label="YouTube Video URL or ID",
        placeholder="e.g., https://www.youtube.com/watch?v=VIDEO_ID or VIDEO_ID"
    ),
    gr.Textbox(
        label="Preferred Language Codes (comma-separated)",
        value="en,en-US",
        placeholder="e.g., en,es,fr (default: en)"
    )
]

# Single text area that shows either the transcript or the error message.
outputs = gr.Textbox(
    label="Transcript Output",
    lines=15,
    show_copy_button=True
)

demo = gr.Interface(
    fn=gradio_mcp_handler,
    inputs=inputs,
    outputs=outputs,
    title="YouTube Transcript Retriever (youtube-transcript-api)",
    description=(
        "Enter YouTube video URL/ID and comma-separated language codes to fetch transcript using 'youtube-transcript-api'. "
        "MCP argument descriptions from handler's docstring."
    ),
    # `flagging_mode` is the Gradio 5 name for the deprecated `allow_flagging`;
    # launching with mcp_server=True already requires Gradio 5.
    flagging_mode='never',
    examples=[
        ["https://www.youtube.com/watch?v=Sd6F2pfKJmk", "en"],
        ["Sd6F2pfKJmk", "en,ja"],
        ["https://www.youtube.com/watch?v=rokGy0huYEA", "ja,en"]
    ],
    article=(
        "**How to Use:**\n"
        "1. Paste YouTube video URL or 11-character video ID.\n"
        "2. Enter comma-separated language codes (e.g., `en-GB,en,es`). Defaults to `en` if empty.\n"
        "3. Click 'Submit'.\n\n"
        "**MCP Server Information:**\n"
        "Launched with `mcp_server=True`, exposes an MCP tool.\n"
        "- Tool arguments `video_url_or_id` (str) and `lang_preference_str` (str) are defined in handler docstring.\n"
        "- Schema: `/gradio_api/mcp/schema`. Endpoint: `/gradio_api/mcp/sse`."
    )
)

if __name__ == '__main__':
    # Print the startup notes, then serve the UI with the MCP server enabled.
    startup_messages = (
        "Gradio app starting with your specified youtube-transcript-api methods...",
        "MCP Server integration enabled (mcp_server=True).",
        "Ensure 'gradio[mcp]' and 'youtube-transcript-api' are installed.",
    )
    for message in startup_messages:
        print(message)
    demo.launch(mcp_server=True)