Skip to content

Commit a433a29

Browse files
authored
Deprecated JobScaler.concurrency_modifier (#331)
1 parent 01fb5e7 commit a433a29

File tree

5 files changed

+17
-98
lines changed

5 files changed

+17
-98
lines changed

examples/serverless/concurrent_handler.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,6 @@ async def async_generator_handler(job):
1111
return job
1212

1313

14-
# --------------------------- Concurrency Modifier --------------------------- #
15-
def concurrency_modifier(current_concurrency=1):
16-
'''
17-
Concurrency modifier.
18-
'''
19-
desired_concurrency = current_concurrency
20-
21-
# Do some logic to determine the desired concurrency.
22-
23-
return desired_concurrency
24-
25-
2614
runpod.serverless.start({
2715
"handler": async_generator_handler,
28-
"concurrency_modifier": concurrency_modifier
2916
})

runpod/serverless/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@ def start(config: Dict[str, Any]):
111111
config (Dict[str, Any]): Configuration parameters for the worker.
112112
113113
config["handler"] (Callable): The handler function to run.
114-
config["concurrency_modifier"] (Callable): Concurrency modifier function to run.
115114
116115
config["rp_args"] (Dict[str, Any]): Arguments for the worker, populated by runtime arguments.
117116
"""

runpod/serverless/modules/rp_scale.py

Lines changed: 16 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44
'''
55

66
import asyncio
7-
import typing
8-
7+
import warnings
98
from runpod.serverless.modules.rp_logger import RunPodLogger
109
from .rp_job import get_job
1110
from .worker_state import Jobs
@@ -14,35 +13,20 @@
1413
job_list = Jobs()
1514

1615

17-
def _default_concurrency_modifier(current_concurrency: int) -> int:
18-
"""
19-
Default concurrency modifier.
20-
21-
This function returns the current concurrency without any modification.
22-
23-
Args:
24-
current_concurrency (int): The current concurrency.
25-
26-
Returns:
27-
int: The current concurrency.
28-
"""
29-
return current_concurrency
30-
31-
3216
class JobScaler():
3317
"""
3418
Job Scaler. This class is responsible for scaling the number of concurrent requests.
3519
"""
3620

37-
def __init__(self, concurrency_modifier: typing.Any):
38-
if concurrency_modifier is None:
39-
self.concurrency_modifier = _default_concurrency_modifier
40-
else:
41-
self.concurrency_modifier = concurrency_modifier
42-
43-
self.background_get_job_tasks = set()
44-
self.job_history = []
45-
self.current_concurrency = 1
21+
def __init__(self, concurrency_modifier = None):
22+
if concurrency_modifier:
23+
warnings.warn(
24+
"JobScaler(concurrency_modifier) is deprecated ",
25+
"and will be removed in a future version. "
26+
"Please remove `concurrency_modifier` parameter.",
27+
DeprecationWarning,
28+
stacklevel=2
29+
)
4630
self._is_alive = True
4731

4832
def is_alive(self):
@@ -65,22 +49,12 @@ async def get_jobs(self, session):
6549
List[Any]: A list of job data retrieved from the server.
6650
"""
6751
while self.is_alive():
68-
self.current_concurrency = self.concurrency_modifier(self.current_concurrency)
69-
log.debug(f"Concurrency set to: {self.current_concurrency}")
70-
7152
log.debug(f"Jobs in progress: {job_list.get_job_count()}")
72-
if job_list.get_job_count() < self.current_concurrency and self.is_alive():
73-
log.debug("Job list is less than concurrency, getting more jobs.")
74-
75-
tasks = [
76-
asyncio.create_task(get_job(session, retry=False))
77-
for _ in range(self.current_concurrency if job_list.get_job_list() else 1)
78-
]
7953

80-
for job_future in asyncio.as_completed(tasks):
81-
job = await job_future
82-
self.job_history.append(1 if job else 0)
83-
if job:
84-
yield job
54+
tasks = [
55+
asyncio.create_task(get_job(session, retry=False))
56+
]
8557

86-
await asyncio.sleep(0)
58+
for job_future in asyncio.as_completed(tasks):
59+
if job := await job_future:
60+
yield job

runpod/serverless/worker.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,7 @@ async def run_worker(config: Dict[str, Any]) -> None:
8787
client_session = AsyncClientSession()
8888

8989
async with client_session as session:
90-
job_scaler = rp_scale.JobScaler(
91-
concurrency_modifier=config.get('concurrency_modifier', None)
92-
)
90+
job_scaler = rp_scale.JobScaler()
9391

9492
while job_scaler.is_alive():
9593

tests/test_serverless/test_worker.py

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -316,45 +316,6 @@ async def test_run_worker_generator_aggregate_handler(
316316
_, args, _ = mock_send_result.mock_calls[0]
317317
assert args[1] == {'output': ['test1', 'test2'], 'stopPod': True}
318318

319-
@patch("runpod.serverless.worker.AsyncClientSession")
320-
@patch("runpod.serverless.modules.rp_scale.get_job")
321-
@patch("runpod.serverless.worker.run_job")
322-
@patch("runpod.serverless.worker.stream_result")
323-
@patch("runpod.serverless.worker.send_result")
324-
# pylint: disable=too-many-arguments
325-
async def test_run_worker_concurrency(
326-
self, mock_send_result, mock_stream_result, mock_run_job, mock_get_job, mock_session):
327-
'''
328-
Test run_worker with synchronous handler.
329-
330-
Args:
331-
mock_send_result (_type_): _description_
332-
mock_stream_result (_type_): _description_
333-
mock_run_job (_type_): _description_
334-
mock_get_job (_type_): _description_
335-
mock_session (_type_): _description_
336-
'''
337-
# Define the mock behaviors
338-
mock_get_job.return_value = {"id": "123", "input": {"number": 1}}
339-
mock_run_job.return_value = {"output": {"result": "odd"}}
340-
341-
def concurrency_modifier(current_concurrency):
342-
return current_concurrency + 1
343-
344-
config_with_concurrency = self.config.copy()
345-
config_with_concurrency['concurrency_modifier'] = concurrency_modifier
346-
347-
# Call the function
348-
runpod.serverless.start(config_with_concurrency)
349-
350-
# Make assertions about the behaviors
351-
mock_get_job.assert_called_once()
352-
mock_run_job.assert_called_once()
353-
mock_send_result.assert_called_once()
354-
355-
assert mock_stream_result.called is False
356-
assert mock_session.called
357-
358319
@patch("runpod.serverless.worker.AsyncClientSession")
359320
@patch("runpod.serverless.modules.rp_scale.get_job")
360321
@patch("runpod.serverless.worker.run_job")

0 commit comments

Comments
 (0)