from typing import List, Optional, Union

import litellm
from openai import OpenAI

from litellm.llms.custom_httpx.http_handler import (
    AsyncHTTPHandler,
    HTTPHandler,
    get_async_httpx_client,
)
from litellm.llms.openai.openai import OpenAIChatCompletion
from litellm.types.llms.azure_ai import ImageEmbeddingRequest
from litellm.types.utils import EmbeddingResponse
from litellm.utils import convert_to_model_response_object

from .cohere_transformation import AzureAICohereConfig
class AzureAIEmbedding(OpenAIChatCompletion):
    """Embedding handler for Azure AI deployments that accept mixed inputs.

    Splits one embedding request into two upstream calls:
      - image items -> POST ``{api_base}/images/embeddings``
      - text items  -> OpenAI-compatible ``/v1/embeddings`` (parent class)
    then reassembles the per-item results in the caller's original input
    order and applies the Cohere response transformation.
    """

    def _process_response(
        self,
        image_embedding_responses: Optional[List],
        text_embedding_responses: Optional[List],
        image_embeddings_idx: List[int],
        model_response: EmbeddingResponse,
        input: List,
    ):
        """Merge image/text embedding results back into input order.

        ``image_embeddings_idx`` holds the positions in ``input`` that were
        routed to the image endpoint; every other position is treated as
        text. The merged list is written onto ``model_response.data`` and the
        response is run through the Cohere transformation before returning.
        """
        combined_responses = []
        if (
            image_embedding_responses is not None
            and text_embedding_responses is not None
        ):
            # Both routes ran: interleave results back into original order.
            # Set lookup avoids an O(n) list scan per input item.
            image_positions = set(image_embeddings_idx)
            text_idx = 0
            image_idx = 0
            for idx in range(len(input)):
                if idx in image_positions:
                    combined_responses.append(image_embedding_responses[image_idx])
                    image_idx += 1
                else:
                    combined_responses.append(text_embedding_responses[text_idx])
                    text_idx += 1
            model_response.data = combined_responses
        elif image_embedding_responses is not None:
            model_response.data = image_embedding_responses
        elif text_embedding_responses is not None:
            model_response.data = text_embedding_responses

        response = AzureAICohereConfig()._transform_response(response=model_response)  # type: ignore
        return response

    async def async_image_embedding(
        self,
        model: str,
        data: ImageEmbeddingRequest,
        timeout: float,
        logging_obj,
        model_response: litellm.EmbeddingResponse,
        optional_params: dict,
        api_key: Optional[str],
        api_base: Optional[str],
        client: Optional[Union[HTTPHandler, AsyncHTTPHandler]] = None,
    ) -> EmbeddingResponse:
        """Async POST of ``data`` to ``{api_base}/images/embeddings``.

        Raises:
            ValueError: if ``api_base`` or ``api_key`` is missing (mirrors the
                validation in the sync ``image_embedding`` path, which
                previously only existed there).
        """
        if api_base is None:
            raise ValueError(
                "api_base is None. Please set AZURE_AI_API_BASE or dynamically via `api_base` param, to make the request."
            )
        if api_key is None:
            raise ValueError(
                "api_key is None. Please set AZURE_AI_API_KEY or dynamically via `api_key` param, to make the request."
            )
        # Only reuse a caller-supplied client if it is actually async.
        if client is None or not isinstance(client, AsyncHTTPHandler):
            client = get_async_httpx_client(
                llm_provider=litellm.LlmProviders.AZURE_AI,
                params={"timeout": timeout},
            )

        url = "{}/images/embeddings".format(api_base)
        response = await client.post(
            url=url,
            json=data,  # type: ignore
            headers={"Authorization": "Bearer {}".format(api_key)},
        )

        embedding_response = response.json()
        embedding_headers = dict(response.headers)
        returned_response: EmbeddingResponse = convert_to_model_response_object(  # type: ignore
            response_object=embedding_response,
            model_response_object=model_response,
            response_type="embedding",
            stream=False,
            _response_headers=embedding_headers,
        )
        return returned_response

    def image_embedding(
        self,
        model: str,
        data: ImageEmbeddingRequest,
        timeout: float,
        logging_obj,
        model_response: EmbeddingResponse,
        optional_params: dict,
        api_key: Optional[str],
        api_base: Optional[str],
        client: Optional[Union[HTTPHandler, AsyncHTTPHandler]] = None,
    ):
        """Sync POST of ``data`` to ``{api_base}/images/embeddings``.

        Raises:
            ValueError: if ``api_base`` or ``api_key`` is missing.
        """
        if api_base is None:
            raise ValueError(
                "api_base is None. Please set AZURE_AI_API_BASE or dynamically via `api_base` param, to make the request."
            )
        if api_key is None:
            raise ValueError(
                "api_key is None. Please set AZURE_AI_API_KEY or dynamically via `api_key` param, to make the request."
            )
        # Only reuse a caller-supplied client if it is a sync HTTPHandler.
        if client is None or not isinstance(client, HTTPHandler):
            client = HTTPHandler(timeout=timeout, concurrent_limit=1)

        url = "{}/images/embeddings".format(api_base)
        response = client.post(
            url=url,
            json=data,  # type: ignore
            headers={"Authorization": "Bearer {}".format(api_key)},
        )

        embedding_response = response.json()
        embedding_headers = dict(response.headers)
        returned_response: EmbeddingResponse = convert_to_model_response_object(  # type: ignore
            response_object=embedding_response,
            model_response_object=model_response,
            response_type="embedding",
            stream=False,
            _response_headers=embedding_headers,
        )
        return returned_response

    async def async_embedding(
        self,
        model: str,
        input: List,
        timeout: float,
        logging_obj,
        model_response: litellm.EmbeddingResponse,
        optional_params: dict,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        client=None,
    ) -> EmbeddingResponse:
        """Async counterpart of ``embedding``: split, dispatch, reassemble."""
        (
            image_embeddings_request,
            v1_embeddings_request,
            image_embeddings_idx,
        ) = AzureAICohereConfig()._transform_request(
            input=input, optional_params=optional_params, model=model
        )

        image_embedding_responses: Optional[List] = None
        text_embedding_responses: Optional[List] = None

        if image_embeddings_request["input"]:
            image_response = await self.async_image_embedding(
                model=model,
                data=image_embeddings_request,
                timeout=timeout,
                logging_obj=logging_obj,
                model_response=model_response,
                optional_params=optional_params,
                api_key=api_key,
                api_base=api_base,
                client=client,
            )

            image_embedding_responses = image_response.data
            if image_embedding_responses is None:
                raise Exception("/image/embeddings route returned None Embeddings.")

        if v1_embeddings_request["input"]:
            response: EmbeddingResponse = await super().embedding(  # type: ignore
                model=model,
                input=input,
                timeout=timeout,
                logging_obj=logging_obj,
                model_response=model_response,
                optional_params=optional_params,
                api_key=api_key,
                api_base=api_base,
                client=client,
                aembedding=True,
            )
            text_embedding_responses = response.data
            if text_embedding_responses is None:
                raise Exception("/v1/embeddings route returned None Embeddings.")

        return self._process_response(
            image_embedding_responses=image_embedding_responses,
            text_embedding_responses=text_embedding_responses,
            image_embeddings_idx=image_embeddings_idx,
            model_response=model_response,
            input=input,
        )

    def embedding(
        self,
        model: str,
        input: List,
        timeout: float,
        logging_obj,
        model_response: EmbeddingResponse,
        optional_params: dict,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        client=None,
        aembedding=None,
        max_retries: Optional[int] = None,
    ) -> EmbeddingResponse:
        """
        - Separate image url from text
        -> route image url call to `/image/embeddings`
        -> route text call to `/v1/embeddings` (OpenAI route)

        assemble result in-order, and return
        """
        if aembedding is True:
            # NOTE: returns a coroutine here; the caller is expected to await it.
            return self.async_embedding(  # type: ignore
                model,
                input,
                timeout,
                logging_obj,
                model_response,
                optional_params,
                api_key,
                api_base,
                client,
            )

        (
            image_embeddings_request,
            v1_embeddings_request,
            image_embeddings_idx,
        ) = AzureAICohereConfig()._transform_request(
            input=input, optional_params=optional_params, model=model
        )

        image_embedding_responses: Optional[List] = None
        text_embedding_responses: Optional[List] = None

        if image_embeddings_request["input"]:
            image_response = self.image_embedding(
                model=model,
                data=image_embeddings_request,
                timeout=timeout,
                logging_obj=logging_obj,
                model_response=model_response,
                optional_params=optional_params,
                api_key=api_key,
                api_base=api_base,
                client=client,
            )

            image_embedding_responses = image_response.data
            if image_embedding_responses is None:
                raise Exception("/image/embeddings route returned None Embeddings.")

        if v1_embeddings_request["input"]:
            response: EmbeddingResponse = super().embedding(  # type: ignore
                model,
                input,
                timeout,
                logging_obj,
                model_response,
                optional_params,
                api_key,
                api_base,
                # Parent route only understands OpenAI SDK clients; drop
                # anything else rather than passing an incompatible handler.
                client=(
                    client
                    if client is not None and isinstance(client, OpenAI)
                    else None
                ),
                aembedding=aembedding,
            )
            text_embedding_responses = response.data
            if text_embedding_responses is None:
                raise Exception("/v1/embeddings route returned None Embeddings.")

        return self._process_response(
            image_embedding_responses=image_embedding_responses,
            text_embedding_responses=text_embedding_responses,
            image_embeddings_idx=image_embeddings_idx,
            model_response=model_response,
            input=input,
        )