Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 42 additions & 43 deletions lm_eval/models/sglang_causallms.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def _collate_gen(_requests):
# we group requests by their generation_kwargs,
# so that we don't try to execute e.g. greedy sampling and temp=0.8 sampling
# in the same batch.
re_ords = Collator(requests, _collate_gen, group_by="gen_kwargs")
re_ords = Collator(requests, _collate_gen, group_by=None)
chunks = re_ords.get_batched(
n=int(self.batch_size) if self.batch_size != "auto" else 0, batch_fn=None
)
Expand All @@ -232,36 +232,41 @@ def _collate_gen(_requests):
context_and_encoding, all_gen_kwargs = zip(*chunk)
context, context_encoding = zip(*context_and_encoding)

# we assume all gen kwargs in the batch are the same
# this is safe to assume because the `grouper` object ensures it.
gen_kwargs = all_gen_kwargs[0]
# unpack our keyword arguments.
if isinstance(gen_kwargs, dict):
kwargs = copy.deepcopy(gen_kwargs) # edge case for repeats > 1
# add EOS token to stop sequences
until = handle_stop_sequences(kwargs.pop("until", None), eos=eos)
else:
raise ValueError(
f"Expected `kwargs` to be of type `dict` but got {type(gen_kwargs)}"
context_encoding_truncated = []
sampling_params = []
for x, gen_kwargs in zip(context_encoding, all_gen_kwargs):
# unpack our keyword arguments.
if isinstance(gen_kwargs, dict):
kwargs = copy.deepcopy(gen_kwargs) # edge case for repeats > 1
# add EOS token to stop sequences
until = handle_stop_sequences(kwargs.pop("until", None), eos=eos)
else:
raise ValueError(
f"Expected `kwargs` to be of type `dict` but got {type(gen_kwargs)}"
)
if "max_gen_toks" in kwargs.keys():
max_gen_toks = kwargs.pop("max_gen_toks")
else:
max_gen_toks = self.max_gen_toks

# set the max length in tokens of inputs ("context_enc")
# max len for inputs = max length, minus room to generate the max new tokens
max_ctx_len = self.max_length - max_gen_toks
if len(x) > max_ctx_len:
context_encoding_truncated.append(x[-max_ctx_len:])
else:
context_encoding_truncated.append(x)
# create sampling params
kwargs = self.modify_gen_kwargs(kwargs)
sampling_params.append(
kwargs | {"max_tokens": max_gen_toks, "stop": until}
)
if "max_gen_toks" in kwargs.keys():
max_gen_toks = kwargs.pop("max_gen_toks")
else:
max_gen_toks = self.max_gen_toks

# set the max length in tokens of inputs ("context_enc")
# max len for inputs = max length, minus room to generate the max new tokens
max_ctx_len = self.max_length - max_gen_toks
context_encoding = [x[-max_ctx_len:] for x in context_encoding]

# perform batched generation
# cont is a list of dic. See here https://github.com/sgl-project/sglang/blob/0a6f18f068e4095fc228e798454e8496c9749214/python/sglang/srt/entrypoints/engine.py#L111 .
cont = self._model_generate(
requests=context_encoding,
requests=context_encoding_truncated,
generate=True,
max_tokens=max_gen_toks,
stop=until,
**kwargs,
sampling_params=sampling_params,
)

# cache generations
Expand All @@ -284,28 +289,22 @@ def _model_generate(
self,
requests: List[List[int]] = None,
generate: bool = False,
max_tokens: int = None,
stop: Optional[List[str]] = None,
sampling_params: Union[List[Dict], Dict, None] = None,
return_logprob: bool = False,
top_logprobs_num: int = 1,
logprob_start_len: int = -1,
**kwargs,
):
# check sglang sampling parameters: https://github.com/sgl-project/sglang/blob/main/python/sglang/srt/sampling/sampling_params.py#L21 and https://docs.sglang.ai/references/sampling_params.html.
if generate:
kwargs = self.modify_gen_kwargs(kwargs)
sampling_params = {
"max_new_tokens": max_tokens,
"stop": stop,
}
sampling_params.update(kwargs)
else:
sampling_params = {
"temperature": 0,
"max_new_tokens": 1,
}
sampling_params.update(kwargs)

if not generate:
sampling_params = sampling_params if sampling_params else {}
sampling_params.update(
{
"temperature": 0,
"max_new_tokens": 1,
}
)
if not isinstance(sampling_params, List):
sampling_params = [sampling_params] * len(requests)
# Refer to: https://docs.sglang.ai/backend/offline_engine_api.html
outputs = self.model.generate(
input_ids=requests,
Expand Down
100 changes: 53 additions & 47 deletions lm_eval/models/vllm_causallms.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@

def _vllm_mp_worker(
model_args: dict,
sampling_params: "SamplingParams",
sampling_params: "list[SamplingParams]",
requests: list[list[int]],
lora_request: "LoRARequest",
result_queue: "Queue",
Expand Down Expand Up @@ -364,25 +364,22 @@ def _model_generate(
self,
requests: List[List[int]] = None,
generate: bool = False,
max_tokens: int = None,
stop: Optional[List[str]] = None,
**kwargs,
sampling_params: Union[List[SamplingParams], SamplingParams, None] = None,
):
if generate:
kwargs = self.modify_gen_kwargs(kwargs)
sampling_params = SamplingParams(max_tokens=max_tokens, stop=stop, **kwargs)
else:
if not generate or sampling_params is None:
sampling_params = SamplingParams(
temperature=0, prompt_logprobs=1, max_tokens=1, detokenize=False
)
if not isinstance(sampling_params, List):
sampling_params = [sampling_params] * len(requests)
if self.data_parallel_size > 1 and not self.V1:
# vLLM hangs if resources are set in ray.remote
# also seems to only work with decorator and not with ray.remote() fn
# see https://github.com/vllm-project/vllm/issues/973
@ray.remote
def run_inference_one_model(
model_args: dict,
sampling_params: SamplingParams,
sampling_params: List[SamplingParams],
requests: List[List[int]],
lora_request: LoRARequest,
):
Expand All @@ -396,9 +393,12 @@ def run_inference_one_model(
# dispatch requests to all self.data_parallel_size workers, in interleaved fashion
# interleaved important to balance context lengths across workers
requests = [list(x) for x in distribute(self.data_parallel_size, requests)]
sampling_params = [
list(sp) for sp in distribute(self.data_parallel_size, sampling_params)
]
inputs = (
(self.model_args, sampling_params, req, self.lora_request)
for req in requests
(self.model_args, sp, req, self.lora_request)
for req, sp in zip(requests, sampling_params)
)
object_refs = [run_inference_one_model.remote(*x) for x in inputs]
results = ray.get(object_refs)
Expand All @@ -413,16 +413,18 @@ def run_inference_one_model(
dp_master_port = os.environ.get("VLLM_DP_MASTER_PORT") or get_open_port()

requests = (list(x) for x in distribute(self.data_parallel_size, requests))

sampling_params = (
list(sp) for sp in distribute(self.data_parallel_size, sampling_params)
)
procs, resq = [], Queue()
# We use Process as it is non-daemonic
try:
for rank, req in enumerate(requests):
for rank, (sp, req) in enumerate(zip(requests, sampling_params)):
proc = Process(
target=_vllm_mp_worker,
args=(
self.model_args.copy(),
sampling_params,
sp,
req,
self.lora_request,
resq,
Expand Down Expand Up @@ -576,10 +578,11 @@ def _collate_gen(_requests):
# - any OOMs will happen right away rather than near the end
return -len(_requests[0][1]), _requests[0][0]

# we group requests by their generation_kwargs,
# so that we don't try to execute e.g. greedy sampling and temp=0.8 sampling
# in the same batch.
re_ords = Collator(requests, _collate_gen, group_by="gen_kwargs")
re_ords = Collator(
requests,
_collate_gen,
group_by=None,
)
chunks = re_ords.get_batched(
n=int(self.batch_size) if self.batch_size != "auto" else 0, batch_fn=None
)
Expand All @@ -594,41 +597,44 @@ def _collate_gen(_requests):
for chunk in chunks:
context_and_encoding, all_gen_kwargs = zip(*chunk)
context, context_encoding = zip(*context_and_encoding)
# we assume all gen kwargs in the batch are the same
# this is safe to assume because the `grouper` object ensures it.
gen_kwargs = all_gen_kwargs[0]
# unpack our keyword arguments.
if isinstance(gen_kwargs, dict):
kwargs = copy.deepcopy(gen_kwargs) # edge case for repeats > 1
# add EOS token to stop sequences
until = handle_stop_sequences(kwargs.pop("until", None), eos=eos)
else:
raise ValueError(
f"Expected `kwargs` to be of type `dict` but got {type(gen_kwargs)}"
)
if "max_gen_toks" in kwargs.keys():
max_gen_toks = kwargs.pop("max_gen_toks")
else:
max_gen_toks = self.max_gen_toks

# set the max length in tokens of inputs ("context_enc")
# max len for inputs = max length, minus room to generate the max new tokens
max_ctx_len = self.max_length - max_gen_toks
all_lengths = [len(x) for x in context_encoding]
for length in all_lengths:
if length > max_ctx_len:
context_encoding_truncated = []
sampling_params = []
for x, gen_kwargs in zip(context_encoding, all_gen_kwargs):
# unpack our keyword arguments.
if isinstance(gen_kwargs, dict):
kwargs = copy.deepcopy(gen_kwargs) # edge case for repeats > 1
# add EOS token to stop sequences
until = handle_stop_sequences(kwargs.pop("until", None), eos=eos)
else:
raise ValueError(
f"Expected `kwargs` to be of type `dict` but got {type(gen_kwargs)}"
)
if "max_gen_toks" in kwargs.keys():
max_gen_toks = kwargs.pop("max_gen_toks")
else:
max_gen_toks = self.max_gen_toks

# set the max length in tokens of inputs ("context_enc")
# max len for inputs = max length, minus room to generate the max new tokens
max_ctx_len = self.max_length - max_gen_toks
if len(x) > max_ctx_len:
eval_logger.warning(
f"Context length {length} exceeds max length (context + max gen tokens): {max_ctx_len}. Truncating context."
f"Context length {len(x)} exceeds max length (context + max gen tokens): {max_ctx_len}. Truncating context."
)
context_encoding = [x[-max_ctx_len:] for x in context_encoding]
context_encoding_truncated.append(x[-max_ctx_len:])
else:
context_encoding_truncated.append(x)
# create sampling params
kwargs = self.modify_gen_kwargs(kwargs)
sampling_params.append(
SamplingParams(max_tokens=max_gen_toks, stop=until, **kwargs)
)

# perform batched generation
cont = self._model_generate(
requests=context_encoding,
requests=context_encoding_truncated,
generate=True,
max_tokens=max_gen_toks,
stop=until,
**kwargs,
sampling_params=sampling_params,
)

# cache generations
Expand Down
Loading