一、vllm对外暴露调用接口如下:
from vllm import AsyncEngineArgs, AsyncLLMEngine, SamplingParams
1)从模型路径里加载模型参数:
args=AsyncEngineArgs(MODEL_PATH)
2)根据模型参数加载和初始化模型:
model = AsyncLLMEngine.from_engine_args(args)
3)设置推理采样参数:
sampling_params = SamplingParams(temperature, top_p, top_k, max_tokens, repetition_penalty, stop)
4)输入请求prompt开始模型推理:
response_generator = model.generate(inputs=text_token_prompt, sampling_params=sampling_params, request_id=reqid )
二、vllm的整体框架如下图:
首先,vllm支持Parallel Sampling、Beam Search,即可能出现"1个prompt -> 多个outputs"的情况, 为了方便对1个prompt下所有的outputs进行集中管理,所以vllm引入SequenceGroup结构,即vllm对每一个输入请求会封装成一个SequenceGroup结构体,如下图:
然后,我们开始解析Scheduler模块,Scheduler负责对所有输入请求进行调度管理,决定当前哪些请求进行等待,哪些请求进行推理响应。
主要维护3个队列self.waiting, self.running, self.swapped(采用python的deque()实例,双端队列,允许从队列两侧添加或删除元素):
waiting队列用于存放所有还未开始做prefill推理的seq_group;
running队列用于存放当前正在做推理的seq_group;
swapped队列用于存放被抢占的seq_group。
具体调度流程如下:
1、如果当前swapped队列为空,则检查是否能从waiting队列中调度seq_group:
(1)如果设备上所有block数量-该seq_group实际需要的block数量<水位线block数量,则该seq_group不做推理;
(2)如果设备中可用block数量-该seq_group实际需要的block数量<水位数block数量,则该seq_group延迟推理;
(3)如果累计num_batched_tokens大于max阈值,则该seq_group延迟推理;
(4)如果累计num_new_seqs大于max阈值,则该seq_group延迟推理;
(5)否则该请求被调度进行推理,从waiting队列出队,加入到running队列中。
2、如果当前swapped队列为空,且waiting队列中没有可调度的seq_group,则检查running队列中的seq_group:
(1)如果设备上可用block数>seq_group.num_seqs,则该seq_group被调度进行推理;
(2)否则将running队列中最后进队列的seq_group出队,加入到waiting队列或者swapped队列。
3、如果当前swapped队列非空,则检查swapped队列中的seq_group;
(1)如果设备上可用block数<重新跑该seq_group需要的block数,则seq_group继续留在swapped队列中;
(2)如果累计num_new_seqs大于max阈值,则seq_group继续留在swapped队列中;
(3)否则该请求被调度进行推理,从swapped队列出队,加入到running队列中。
接下来,我们解析BlockManager模块,BlockManager负责对kv_cache进行管理,为每个请求分配缓存block id。
这里使用一个物理块表self.block_tables: Dict[int, List[PhysicalTokenBlock]],记录每个seq用了哪几个cache block,其中PhysicalTokenBlock定义如下:
class PhysicalTokenBlock:
def __init__(self, device:Device, block_number:int, block_size: int) -> None:
self.device = device # gpu/cpu
self.block_number = block_number # 该block在对应设备上的全局block index
self.block_size = block_size # 该物理块的尺寸(即槽位数量,默认为16)
self.ref_count = 0 # 该物理块被多少个逻辑块引用
同时根据num_gpu_blocks数量创建gpu_allocator对象:
self.gpu_allocator = BlockAllocator(Device.GPU, block_size, num_gpu_blocks)
其中BlockAllocator定义如下,主要维护self.free_blocks列表:1)如果需要给seq分配1个block,则self.free_blocks.pop(); 2)如果需要回收block,则self.free_blocks.append(block)。
class BlockAllocator:
def __init__(self, device:Device, block_size:int, num_blocks:int) -> None:
self.device = device
self.block_size = block_size
self.num_blocks = num_blocks
self.free_blocks: BlockTable = []
for i in range(num_blocks):
block = PhysicalTokenBlock(device=device, block_number=i, block_size=block_size)
self.free_blocks.append(block)
另外每个请求实例中维护着一个属性:逻辑块列表List[LogicalTokenBlock],其中LogicalTokenBlock定义如下:
class LogicalTokenBlock:
def __init__(self, block_number: int, block_size: int, ) -> None:
self.block_number = block_number # 逻辑块的block index
self.block_size = block_size # 每个逻辑块中有多少个槽位(默认为16)
self.token_ids = [_BLANK_TOKEN_ID] * block_size
self.num_tokens = 0 # 当前逻辑块中已经装下的token的数量
最后,我们来解析Worker模块,一个worker为绑定在1个gpu上的一个进程,主要负责两个工作:模型推理和kv_cache读写。
模型推理本质上类似于pytorch推理,在每个worker上会实例化一个ModelRunner对象,ModelRunner主要的成员函数有:
prepare_input_tensors( ):将请求信息转换输入tensor
profile_run( ):模型预热,伪输入评估性能
而kv_cache操作使用CacheEngine类对象实现,CacheEngine定义如下,注意这里CacheEngine区别于前面的BlockManager,BlockManager只是在维护物理块的id,而物理块的实际分配是模型在推理过程中根据物理块id来操作的。
class CacheEngine:
def __init__(self, cache_config:CacheConfig, model_config:ModelConfig, parallel_config:ParallelConfig) -> None:
self.gpu_cache = self.allocate_gpu_cache()
self.cpu_cache = self.allocate_cpu_cache()
其中allocate_gpu_cache定义如下:
def allocate_gpu_cache(self) -> List[KVCache]:
gpu_cache: List[KVCache] = []
key_block_shape = self.get_key_block_shape() # (num_heads, head_size//x, block_size, x), 其中x为8
value_block_shape = self.get_value_block_shape() # (num_heads, head_size, block_size)
for _ in range(self.num_layers):
key_blocks = torch.empty(size=(self.num_gpu_blocks, *key_block_shape), dtype=self.dtype, device="cuda")
value_blocks = torch.empty(size=(self.num_gpu_blocks, *value_block_shape), dtype=self.dtype, device="cuda")
gpu_cache.append((key_blocks, value_blocks))
return gpu_cache
三、昇腾适配
首先,我们需要重写worker中的ModelRunner对象,适配华为昇腾的atb_llm模型库,具体如下:
1、ModelRunner初始化时,对atb_llm.runner中的ModelRunner进行实例化;
from atb_llm.runner import ModelRunner as atb_model
class ModelRunner:
def __init__():
self.model = atb_model( )
2、load_model函数调用atb_llm里的模型加载函数:
def load_model():
self.model.load_weights()
3、prepare_input_tensors函数需要针对atb_llm中的模型输入参数构造对应的输入tensor,具体有:
input_ids:torch.Tensor,输入tokens
position_ids:torch.Tensor,输入位置,即记录每个token的所在位置
is_prefill:bool,是否prefill阶段
kv_cache:List[Tuple[torch.Tensor, torch.Tensor]],申请的kv缓存
block_table:torch.Tensor,当前输入对应的block维度的cache ids
slot:torch.Tensor,当前输入对应的token维度的cache id
input_lengths:torch.Tensor,每个请求输入的长度
max_seq_len: int,最长请求输入的长度
lm_head_indices:Optional[torch.Tensor] = None,每个请求输入的最后一个token所在位置,只在prefill阶段使用
4、execute_model函数调用self.model.forward函数进行模型推理。
其次,我们需要重写kv_cache大小的计算方法:
1、获取显卡本身总的显存
self.max_memory = NpuHbmInfo.get_hbm_capacity(self.local_rank, self.world_size, self.model.soc_info.need_nz)
2、获取初始已占用的显存
3、伪输入跑一遍模型后,获取已占用的显存:
4、总显存减去预留3gb再减去步骤3已占用的显存,得到的显存作为kv_cache:
然后,我们还要修改CacheEngine中allocate_gpu_cache函数,具体如下:
def allocate_gpu_cache(self) -> List[KVCache]:
gpu_cache: List[KVCache] = []
key_block_shape = (self.block_size, self.num_kv_heads, self.head_size)
value_block_shape = (self.block_size, self.num_kv_heads, self.head_size)
for _ in range(self.num_layers):
key_blocks = torch.empty(size=(self.num_gpu_blocks, *key_block_shape), dtype=self.dtype, device="npu")
value_blocks = torch.empty(size=(self.num_gpu_blocks, *value_block_shape), dtype=self.dtype, device="npu")
gpu_cache.append((key_blocks, value_blocks))
return gpu_cache
最后,需要将一些原先cuda的引用进行屏蔽或者改写,比如:
torch.cuda.empty_cache()改为torch.npu.empty_cache()
torch.cuda.synchronize()改为torch.npu.synchronize()
atb_llm没有cuda_graph的概念,也暂不支持流水线并行,所以vllm关于cuda_graph和流水线并行的代码需做相应的屏蔽或者隔离。