import platform
from contextlib import nullcontext
from typing import Tuple, List

import torch
from hfendpoints.http import Context, run
from hfendpoints.tasks import Usage
from hfendpoints.tasks.embedding import EmbeddingRequest, EmbeddingResponse
from intel_extension_for_pytorch.cpu.runtime import pin
from loguru import logger
from sentence_transformers import SentenceTransformer
from torch.backends.mkldnn import VERBOSE_ON_CREATION, VERBOSE_OFF
from torch.nn import Module

from hfendpoints import EndpointConfig, Handler, __version__

# Data types eligible for automatic mixed precision (AMP) on CPU
SUPPORTED_AMP_DTYPES = {torch.float32, torch.bfloat16}


def has_bf16_support() -> bool:
    """
    Helper to detect if the hardware supports bfloat16

    Note:
        Intel libraries, such as oneDNN, provide emulation for bfloat16 even if the underlying hardware does not support it.
    This means CPU ISA with AVX512 will work, even if not with the same performances as one could expect from CPU ISA with AVX512_BF16.
    Also, AMX_BF16 is implicitly assumed true when AVX512_BF16 is true (that's the case on Intel Sapphire Rapids).

    :return: True if the hardware supports (or can emulate) bfloat16, False otherwise
    """
    return torch.cpu._is_avx512_bf16_supported() or torch.cpu._is_avx512_supported()


def get_cores_pinning_strategy() -> "CPUPool":
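    """
    Build the CPU core pinning strategy used at inference time.

    Collects the physical core IDs of every NUMA node on the machine and returns a single
    IPEX CPUPool spanning all of them, so inference threads can be pinned to physical cores.

    :return: ipex.cpu.runtime.CPUPool covering the physical cores of all NUMA nodes
    """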
    import intel_extension_for_pytorch as ipex

    # Retrieve the NUMA nodes and the physical core IDs attached to each of them
    num_nodes = ipex.cpu.runtime.runtime_utils.get_num_nodes()
    cpu_cores_id = [ipex.cpu.runtime.runtime_utils.get_core_list_of_node_id(node_id) for node_id in range(num_nodes)]

    if num_nodes == 1:
        pinned_cpu_cores_id = cpu_cores_id[0]
    else:
        pinned_cpu_cores_id = [core_id for node in cpu_cores_id for core_id in node]

    logger.info(f"Pinning CPU cores to {pinned_cpu_cores_id}")
    return ipex.cpu.runtime.CPUPool(pinned_cpu_cores_id)


def get_usage(mask: List[torch.Tensor]) -> Usage:
    """
    Compute the number of processed tokens and return it as a Usage object matching the OpenAI specification.

    :param mask: Attention mask tensors, as returned by the model
    :return: Usage object matching the OpenAI specification
    """
    num_tokens = sum(x.sum().detach().item() for x in mask)
    return Usage(prompt_tokens=num_tokens, total_tokens=num_tokens)


class SentenceTransformerWithUsage(Module):
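    """
    Thin wrapper around a SentenceTransformer whose forward pass returns both the attention
    masks (used for token usage accounting) and the sentence embeddings.
    """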
    __slots__ = ("_model",)

    def __init__(self, model: SentenceTransformer):
        super().__init__()
        self._model = model

    def forward(self, sentences: List[str]) -> Tuple[List[torch.Tensor], List[List[float]]]:
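        """
        Encode the given sentences and return their attention masks along with the sentence embeddings.
        """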
        vectors = self._model.encode(sentences, output_value=None)
        return (
            [vector['attention_mask'] for vector in vectors],
            [vector['sentence_embedding'].tolist() for vector in vectors]
        )


class SentenceTransformerHandler(Handler):
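    """
    hfendpoints Handler serving embedding requests with a SentenceTransformer model on CPU.
    """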
    __slots__ = ("_config", "_dtype", "_model", "_model_name", "_pinned_cores", "_use_amp")

    def __init__(self, config: EndpointConfig):
        self._config = config
        self._dtype = torch.float32
        self._model_name = config.model_id
        # Populated with an IPEX CPUPool by _allocate_model on x86_64; stays None otherwise
        self._pinned_cores = None

        self._allocate_model()

    def _allocate_model(self):
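        """
        Instantiate the SentenceTransformer model and prepare it for CPU inference.

        The model is loaded in bfloat16 when the hardware supports (or can emulate) it.
        On x86_64, IPEX optimizations and torch.compile with the "ipex" backend are applied
        and a core pinning pool is created; on other platforms plain torch.compile is used.
        """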
        # Denormal numbers are used to store tiny values close to 0.
        # Computations involving denormal numbers are significantly slower than with
        # normalized numbers, so flush them to zero.
        torch.set_flush_denormal(True)

        dtype = torch.bfloat16 if has_bf16_support() else torch.float32
        model = SentenceTransformer(self._config.model_id, device="cpu", model_kwargs={"torch_dtype": dtype})

        if platform.machine() == "x86_64":
            import intel_extension_for_pytorch as ipex
            logger.info(f"x64 platform detected: {platform.processor()}")

            # Retrieve the physical core IDs of all the CPU NUMA nodes
            self._pinned_cores = get_cores_pinning_strategy()

            # Optimize the model for inference
            with torch.inference_mode():
                model = model.eval()
                model = model.to(memory_format=torch.channels_last)

                # Apply IPEX optimizations
                model = ipex.optimize(model, dtype=dtype, weights_prepack=True, graph_mode=True, concat_linear=True)
                model = torch.compile(model, dynamic=True, backend="ipex")

                # model = ipex.cpu.runtime.MultiStreamModule(SentenceTransformerWithUsage(model), num_streams=1)

        else:
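            # Non-x86_64 platforms: no IPEX, fall back to the default torch.compile backend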
            model = torch.compile(model)

        self._dtype = dtype
        self._use_amp = dtype in SUPPORTED_AMP_DTYPES
        self._model = SentenceTransformerWithUsage(model)

    async def __call__(self, request: EmbeddingRequest, ctx: Context) -> EmbeddingResponse:
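        """
        Embed the sentence(s) from an EmbeddingRequest and return an EmbeddingResponse.

        Inference runs under torch.inference_mode and CPU autocast (when AMP is enabled),
        with threads pinned to the selected CPU cores when a pinning pool is available.
        """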
        with torch.backends.mkldnn.verbose(VERBOSE_ON_CREATION if self._config.is_debug else VERBOSE_OFF):
            with torch.inference_mode(), torch.amp.autocast("cpu", dtype=self._dtype, enabled=self._use_amp):
                # Pin to the selected physical cores when a CPUPool was created (x86_64 path)
                pinning = pin(self._pinned_cores) if self._pinned_cores is not None else nullcontext()
                with pinning:
                    mask, vectors = self._model(request.input if request.is_batched else [request.input])

            # TODO: Change the way we return usage
            usage = get_usage(mask)
            vectors = vectors if request.is_batched else vectors[0]
            return EmbeddingResponse(embeddings=vectors, num_tokens=usage.total_tokens)


def entrypoint():
    # Read the endpoint configuration from the environment
    config = EndpointConfig.from_env()

    logger.info(f"[Hugging Face Endpoint v{__version__}] Serving: {config.model_id}")

    # Allocate handler
    handler = SentenceTransformerHandler(config)

    # Allocate endpoint
    from hfendpoints.openai.embedding import EmbeddingEndpoint
    endpoint = EmbeddingEndpoint(handler)
    run(endpoint, config.interface, config.port)


if __name__ == "__main__":
    entrypoint()
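
# A minimal sketch of how the running endpoint could be exercised, assuming the
# OpenAI-compatible /v1/embeddings route exposed by hfendpoints.openai.embedding and a
# hypothetical host/port (adjust to the interface and port in your EndpointConfig):
#
#   curl http://localhost:8000/v1/embeddings \
#        -H "Content-Type: application/json" \
#        -d '{"input": "Deep Learning is awesome!"}'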