searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

vllm框架梳理以及昇腾适配

2024-09-10 09:23:35
484
0

一、vllm对外暴露调用接口如下:

from vllm import AsyncEngineArgs, AsyncLLMEngine, SamplingParams

1)从模型路径里加载模型参数:

args=AsyncEngineArgs(MODEL_PATH) 

2)根据模型参数加载和初始化模型:

model = AsyncLLMEngine.from_engine_args(args) 

3)设置推理采样参数:

sampling_params = SamplingParams(temperature, top_p, top_k, max_tokens, repetition_penalty, stop) 

4)输入请求prompt开始模型推理:

response_generator = model.generate(inputs=text_token_prompt, sampling_params=sampling_params, request_id=reqid )

二、vllm的整体框架如下图:

首先,vllm支持Parallel Sampling、Beam Search,即可能出现"1个prompt -> 多个outputs"的情况, 为了方便对1个prompt下所有的outputs进行集中管理,所以vllm引入SequenceGroup结构,即vllm对每一个输入请求会封装成一个SequenceGroup结构体,如下图:

然后,我们开始解析Scheduler模块,Scheduler负责对所有输入请求进行调度管理,决定当前哪些请求进行等待,哪些请求进行推理响应。

主要维护3个队列self.waiting, self.running, self.swapped(采用python的deque()实例,双端队列,允许从队列两侧添加或删除元素):

waiting队列用于存放所有还未开始做prefill推理的seq_group;

running队列用于存放当前正在做推理的seq_group; 

swapped队列用于存放被抢占的seq_group。

具体调度流程如下:

1、如果当前swapped队列为空,则检查是否能从waiting队列中调度seq_group:

(1)如果设备上所有block数量-该seq_group实际需要的block数量<水位线block数量,则该seq_group不做推理;

(2)如果设备中可用block数量-该seq_group实际需要的block数量<水位数block数量,则该seq_group延迟推理;

(3)如果累计num_batched_tokens大于max阈值,则该seq_group延迟推理;

(4)如果累计num_new_seqs大于max阈值,则该seq_group延迟推理;

(5)否则该请求被调度进行推理,从waiting队列出队,加入到running队列中。

2、如果当前swapped队列为空,且waiting队列中没有可调度的seq_group,则检查running队列中的seq_group:

(1)如果设备上可用block数>seq_group.num_seqs,则该seq_group被调度进行推理;

(2)否则将running队列中最后进队列的seq_group出队,加入到waiting队列或者swapped队列。

3、如果当前swapped队列非空,则检查swapped队列中的seq_group; 

(1)如果设备上可用block数<重新跑该seq_group需要的block数,则seq_group继续留在swapped队列中;

(2)如果累计num_new_seqs大于max阈值,则seq_group继续留在swapped队列中;

(3)否则该请求被调度进行推理,从swapped队列出队,加入到running队列中。

接下来,我们解析BlockManager模块,BlockManager负责对kv_cache进行管理,为每个请求分配缓存block id。

这里使用一个物理块表self.block_tables: Dict[int, List[PhysicalTokenBlock]],记录每个seq用了哪几个cache block,其中PhysicalTokenBlock定义如下:

class PhysicalTokenBlock:

   def __init__(self,  device:Device,  block_number:int,  block_size: int) -> None:

        self.device = device  # gpu/cpu

        self.block_number = block_number  # 该block在对应设备上的全局block index

        self.block_size = block_size  # 该物理块的尺寸(即槽位数量,默认为16)

        self.ref_count = 0  # 该物理块被多少个逻辑块引用

同时根据num_gpu_blocks数量创建gpu_allocator对象:

self.gpu_allocator = BlockAllocator(Device.GPU, block_size, num_gpu_blocks)

其中BlockAllocator定义如下,主要维护self.free_blocks列表:1)如果需要给seq分配1个block,则self.free_blocks.pop(); 2)如果需要回收block,则self.free_blocks.append(block)。

class BlockAllocator:

    def __init__(self,  device:Device,  block_size:int,  num_blocks:int) -> None:

        self.device = device

        self.block_size = block_size

        self.num_blocks = num_blocks

        self.free_blocks: BlockTable = []

        for i in range(num_blocks):

            block = PhysicalTokenBlock(device=device,  block_number=i,  block_size=block_size)

            self.free_blocks.append(block) 

另外每个请求实例中维护着一个属性:逻辑块列表List[LogicalTokenBlock],其中LogicalTokenBlock定义如下:

class LogicalTokenBlock:

    def __init__(self, block_number: int, block_size: int, ) -> None:

        self.block_number = block_number  # 逻辑块的block index

        self.block_size = block_size  # 每个逻辑块中有多少个槽位(默认为16)

        self.token_ids = [_BLANK_TOKEN_ID] * block_size

        self.num_tokens = 0  # 当前逻辑块中已经装下的token的数量

最后,我们来解析Worker模块,一个worker为绑定在1个gpu上的一个进程,主要负责两个工作:模型推理和kv_cache读写。

模型推理本质上类似于pytorch推理,在每个worker上会实例化一个ModelRunner对象,ModelRunner主要的成员函数有:

load_model( ):加载模型

prepare_input_tensors( ):将请求信息转换输入tensor

profile_run( ):模型预热,伪输入评估性能

execute_model( ):模型推理,随机性采样,输出结果

而kv_cache操作使用CacheEngine类对象实现,CacheEngine定义如下,注意这里CacheEngine区别于前面的BlockManager,BlockManager只是在维护物理块的id,而物理块的实际分配是模型在推理过程中根据物理块id来操作的。

class CacheEngine:

    def __init__(self,  cache_config:CacheConfig,  model_config:ModelConfig,  parallel_config:ParallelConfig) -> None:

        self.gpu_cache = self.allocate_gpu_cache()

        self.cpu_cache = self.allocate_cpu_cache()

其中allocate_gpu_cache定义如下:

def allocate_gpu_cache(self) -> List[KVCache]:

        gpu_cache: List[KVCache] = []

        key_block_shape = self.get_key_block_shape()  # (num_heads, head_size//x, block_size, x), 其中x为8

        value_block_shape = self.get_value_block_shape()  # (num_heads, head_size, block_size)

        for _ in range(self.num_layers):

            key_blocks = torch.empty(size=(self.num_gpu_blocks, *key_block_shape), dtype=self.dtype, device="cuda")

            value_blocks = torch.empty(size=(self.num_gpu_blocks, *value_block_shape), dtype=self.dtype, device="cuda")

            gpu_cache.append((key_blocks, value_blocks))

        return gpu_cache

三、昇腾适配

首先,我们需要重写worker中的ModelRunner对象,适配华为昇腾的atb_llm模型库,具体如下:

1、ModelRunner初始化时,对atb_llm.runner中的ModelRunner进行实例化;

from atb_llm.runner import ModelRunner as atb_model

class ModelRunner:

   def __init__():

       self.model = atb_model( )

2、load_model函数调用atb_llm里的模型加载函数:

def load_model():

   self.model.load_weights()

3、prepare_input_tensors函数需要针对atb_llm中的模型输入参数构造对应的输入tensor,具体有:

input_ids:torch.Tensor,输入tokens

position_ids:torch.Tensor,输入位置,即记录每个token的所在位置

is_prefill:bool,是否prefill阶段

kv_cache:List[Tuple[torch.Tensor, torch.Tensor]],申请的kv缓存

block_table:torch.Tensor,当前输入对应的block维度的cache ids

slot:torch.Tensor,当前输入对应的token维度的cache id

input_lengths:torch.Tensor,每个请求输入的长度

max_seq_len: int,最长请求输入的长度

lm_head_indices:Optional[torch.Tensor] = None,每个请求输入的最后一个token所在位置,只在prefill阶段使用

4、execute_model函数调用self.model.forward函数进行模型推理。

其次,我们需要重写kv_cache大小的计算方法:

1、获取显卡本身总的显存

self.max_memory = NpuHbmInfo.get_hbm_capacity(self.local_rank, self.world_size, self.model.soc_info.need_nz)

2、获取初始已占用的显存

self.init_memory = int(
            self.max_memory * NpuHbmInfo.get_hbm_usage(self.local_rank, self.world_size, self.model.soc_info.need_nz))

3、伪输入跑一遍模型后,获取已占用的显存:

self.warm_up_memory = int(
            self.max_memory * NpuHbmInfo.get_hbm_usage(self.local_rank, self.world_size, self.model.soc_info.need_nz))

4、总显存减去预留3gb再减去步骤3已占用的显存,得到的显存作为kv_cache:

free_memory = max_memory - ENV.reserved_memory_gb * (1 << 30) - (
                self.warm_up_memory if self.warm_up_memory != 0 else self.init_memory)

然后,我们还要修改CacheEngine中allocate_gpu_cache函数,具体如下:

def allocate_gpu_cache(self) -> List[KVCache]:

        gpu_cache: List[KVCache] = []

        key_block_shape = (self.block_size, self.num_kv_heads, self.head_size)

        value_block_shape = (self.block_size, self.num_kv_heads, self.head_size)

        for _ in range(self.num_layers):

            key_blocks = torch.empty(size=(self.num_gpu_blocks, *key_block_shape), dtype=self.dtype, device="npu")

            value_blocks = torch.empty(size=(self.num_gpu_blocks, *value_block_shape), dtype=self.dtype, device="npu")

            gpu_cache.append((key_blocks, value_blocks))

        return gpu_cache

最后,需要将一些原先cuda的引用进行屏蔽或者改写,比如:

torch.cuda.empty_cache()改为torch.npu.empty_cache()

torch.cuda.synchronize()改为torch.npu.synchronize()

atb_llm没有cuda_graph的概念,也暂不支持流水线并行,所以vllm关于cuda_graph和流水线并行的代码需做相应的屏蔽或者隔离。

 
0条评论
0 / 1000
郭逸豪
3文章数
0粉丝数
郭逸豪
3 文章 | 0 粉丝
郭逸豪
3文章数
0粉丝数
郭逸豪
3 文章 | 0 粉丝
原创

vllm框架梳理以及昇腾适配

2024-09-10 09:23:35
484
0

一、vllm对外暴露调用接口如下:

from vllm import AsyncEngineArgs, AsyncLLMEngine, SamplingParams

1)从模型路径里加载模型参数:

args=AsyncEngineArgs(MODEL_PATH) 

2)根据模型参数加载和初始化模型:

model = AsyncLLMEngine.from_engine_args(args) 

3)设置推理采样参数:

sampling_params = SamplingParams(temperature, top_p, top_k, max_tokens, repetition_penalty, stop) 

4)输入请求prompt开始模型推理:

response_generator = model.generate(inputs=text_token_prompt, sampling_params=sampling_params, request_id=reqid )

二、vllm的整体框架如下图:

首先,vllm支持Parallel Sampling、Beam Search,即可能出现"1个prompt -> 多个outputs"的情况, 为了方便对1个prompt下所有的outputs进行集中管理,所以vllm引入SequenceGroup结构,即vllm对每一个输入请求会封装成一个SequenceGroup结构体,如下图:

然后,我们开始解析Scheduler模块,Scheduler负责对所有输入请求进行调度管理,决定当前哪些请求进行等待,哪些请求进行推理响应。

主要维护3个队列self.waiting, self.running, self.swapped(采用python的deque()实例,双端队列,允许从队列两侧添加或删除元素):

waiting队列用于存放所有还未开始做prefill推理的seq_group;

running队列用于存放当前正在做推理的seq_group; 

swapped队列用于存放被抢占的seq_group。

具体调度流程如下:

1、如果当前swapped队列为空,则检查是否能从waiting队列中调度seq_group:

(1)如果设备上所有block数量-该seq_group实际需要的block数量<水位线block数量,则该seq_group不做推理;

(2)如果设备中可用block数量-该seq_group实际需要的block数量<水位数block数量,则该seq_group延迟推理;

(3)如果累计num_batched_tokens大于max阈值,则该seq_group延迟推理;

(4)如果累计num_new_seqs大于max阈值,则该seq_group延迟推理;

(5)否则该请求被调度进行推理,从waiting队列出队,加入到running队列中。

2、如果当前swapped队列为空,且waiting队列中没有可调度的seq_group,则检查running队列中的seq_group:

(1)如果设备上可用block数>seq_group.num_seqs,则该seq_group被调度进行推理;

(2)否则将running队列中最后进队列的seq_group出队,加入到waiting队列或者swapped队列。

3、如果当前swapped队列非空,则检查swapped队列中的seq_group; 

(1)如果设备上可用block数<重新跑该seq_group需要的block数,则seq_group继续留在swapped队列中;

(2)如果累计num_new_seqs大于max阈值,则seq_group继续留在swapped队列中;

(3)否则该请求被调度进行推理,从swapped队列出队,加入到running队列中。

接下来,我们解析BlockManager模块,BlockManager负责对kv_cache进行管理,为每个请求分配缓存block id。

这里使用一个物理块表self.block_tables: Dict[int, List[PhysicalTokenBlock]],记录每个seq用了哪几个cache block,其中PhysicalTokenBlock定义如下:

class PhysicalTokenBlock:

   def __init__(self,  device:Device,  block_number:int,  block_size: int) -> None:

        self.device = device  # gpu/cpu

        self.block_number = block_number  # 该block在对应设备上的全局block index

        self.block_size = block_size  # 该物理块的尺寸(即槽位数量,默认为16)

        self.ref_count = 0  # 该物理块被多少个逻辑块引用

同时根据num_gpu_blocks数量创建gpu_allocator对象:

self.gpu_allocator = BlockAllocator(Device.GPU, block_size, num_gpu_blocks)

其中BlockAllocator定义如下,主要维护self.free_blocks列表:1)如果需要给seq分配1个block,则self.free_blocks.pop(); 2)如果需要回收block,则self.free_blocks.append(block)。

class BlockAllocator:

    def __init__(self,  device:Device,  block_size:int,  num_blocks:int) -> None:

        self.device = device

        self.block_size = block_size

        self.num_blocks = num_blocks

        self.free_blocks: BlockTable = []

        for i in range(num_blocks):

            block = PhysicalTokenBlock(device=device,  block_number=i,  block_size=block_size)

            self.free_blocks.append(block) 

另外每个请求实例中维护着一个属性:逻辑块列表List[LogicalTokenBlock],其中LogicalTokenBlock定义如下:

class LogicalTokenBlock:

    def __init__(self, block_number: int, block_size: int, ) -> None:

        self.block_number = block_number  # 逻辑块的block index

        self.block_size = block_size  # 每个逻辑块中有多少个槽位(默认为16)

        self.token_ids = [_BLANK_TOKEN_ID] * block_size

        self.num_tokens = 0  # 当前逻辑块中已经装下的token的数量

最后,我们来解析Worker模块,一个worker为绑定在1个gpu上的一个进程,主要负责两个工作:模型推理和kv_cache读写。

模型推理本质上类似于pytorch推理,在每个worker上会实例化一个ModelRunner对象,ModelRunner主要的成员函数有:

load_model( ):加载模型

prepare_input_tensors( ):将请求信息转换输入tensor

profile_run( ):模型预热,伪输入评估性能

execute_model( ):模型推理,随机性采样,输出结果

而kv_cache操作使用CacheEngine类对象实现,CacheEngine定义如下,注意这里CacheEngine区别于前面的BlockManager,BlockManager只是在维护物理块的id,而物理块的实际分配是模型在推理过程中根据物理块id来操作的。

class CacheEngine:

    def __init__(self,  cache_config:CacheConfig,  model_config:ModelConfig,  parallel_config:ParallelConfig) -> None:

        self.gpu_cache = self.allocate_gpu_cache()

        self.cpu_cache = self.allocate_cpu_cache()

其中allocate_gpu_cache定义如下:

def allocate_gpu_cache(self) -> List[KVCache]:

        gpu_cache: List[KVCache] = []

        key_block_shape = self.get_key_block_shape()  # (num_heads, head_size//x, block_size, x), 其中x为8

        value_block_shape = self.get_value_block_shape()  # (num_heads, head_size, block_size)

        for _ in range(self.num_layers):

            key_blocks = torch.empty(size=(self.num_gpu_blocks, *key_block_shape), dtype=self.dtype, device="cuda")

            value_blocks = torch.empty(size=(self.num_gpu_blocks, *value_block_shape), dtype=self.dtype, device="cuda")

            gpu_cache.append((key_blocks, value_blocks))

        return gpu_cache

三、昇腾适配

首先,我们需要重写worker中的ModelRunner对象,适配华为昇腾的atb_llm模型库,具体如下:

1、ModelRunner初始化时,对atb_llm.runner中的ModelRunner进行实例化;

from atb_llm.runner import ModelRunner as atb_model

class ModelRunner:

   def __init__():

       self.model = atb_model( )

2、load_model函数调用atb_llm里的模型加载函数:

def load_model():

   self.model.load_weights()

3、prepare_input_tensors函数需要针对atb_llm中的模型输入参数构造对应的输入tensor,具体有:

input_ids:torch.Tensor,输入tokens

position_ids:torch.Tensor,输入位置,即记录每个token的所在位置

is_prefill:bool,是否prefill阶段

kv_cache:List[Tuple[torch.Tensor, torch.Tensor]],申请的kv缓存

block_table:torch.Tensor,当前输入对应的block维度的cache ids

slot:torch.Tensor,当前输入对应的token维度的cache id

input_lengths:torch.Tensor,每个请求输入的长度

max_seq_len: int,最长请求输入的长度

lm_head_indices:Optional[torch.Tensor] = None,每个请求输入的最后一个token所在位置,只在prefill阶段使用

4、execute_model函数调用self.model.forward函数进行模型推理。

其次,我们需要重写kv_cache大小的计算方法:

1、获取显卡本身总的显存

self.max_memory = NpuHbmInfo.get_hbm_capacity(self.local_rank, self.world_size, self.model.soc_info.need_nz)

2、获取初始已占用的显存

self.init_memory = int(
            self.max_memory * NpuHbmInfo.get_hbm_usage(self.local_rank, self.world_size, self.model.soc_info.need_nz))

3、伪输入跑一遍模型后,获取已占用的显存:

self.warm_up_memory = int(
            self.max_memory * NpuHbmInfo.get_hbm_usage(self.local_rank, self.world_size, self.model.soc_info.need_nz))

4、总显存减去预留3gb再减去步骤3已占用的显存,得到的显存作为kv_cache:

free_memory = max_memory - ENV.reserved_memory_gb * (1 << 30) - (
                self.warm_up_memory if self.warm_up_memory != 0 else self.init_memory)

然后,我们还要修改CacheEngine中allocate_gpu_cache函数,具体如下:

def allocate_gpu_cache(self) -> List[KVCache]:

        gpu_cache: List[KVCache] = []

        key_block_shape = (self.block_size, self.num_kv_heads, self.head_size)

        value_block_shape = (self.block_size, self.num_kv_heads, self.head_size)

        for _ in range(self.num_layers):

            key_blocks = torch.empty(size=(self.num_gpu_blocks, *key_block_shape), dtype=self.dtype, device="npu")

            value_blocks = torch.empty(size=(self.num_gpu_blocks, *value_block_shape), dtype=self.dtype, device="npu")

            gpu_cache.append((key_blocks, value_blocks))

        return gpu_cache

最后,需要将一些原先cuda的引用进行屏蔽或者改写,比如:

torch.cuda.empty_cache()改为torch.npu.empty_cache()

torch.cuda.synchronize()改为torch.npu.synchronize()

atb_llm没有cuda_graph的概念,也暂不支持流水线并行,所以vllm关于cuda_graph和流水线并行的代码需做相应的屏蔽或者隔离。

 
文章来自个人专栏
GYH
3 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0