A context manager that acquires and releases concurrency slots from the
given concurrency limits.
Parameters:
Name
Type
Description
Default
names
Union[str, List[str]]
The names of the concurrency limits to acquire slots from.
required
occupy
int
The number of slots to acquire and hold from each limit.
1
timeout_seconds
Optional[float]
The number of seconds to wait for the slots to be acquired before
raising a TimeoutError. A timeout of None will wait indefinitely.
None
Raises:
Type
Description
TimeoutError
If the slots are not acquired within the given timeout.
A simple example of using the sync concurrency context manager:
fromprefect.concurrency.syncimportconcurrencydefresource_heavy():withconcurrency("test",occupy=1):print("Resource heavy task")defmain():resource_heavy()
@contextmanagerdefconcurrency(names:Union[str,List[str]],occupy:int=1,timeout_seconds:Optional[float]=None,):"""A context manager that acquires and releases concurrency slots from the given concurrency limits. Args: names: The names of the concurrency limits to acquire slots from. occupy: The number of slots to acquire and hold from each limit. timeout_seconds: The number of seconds to wait for the slots to be acquired before raising a `TimeoutError`. A timeout of `None` will wait indefinitely. Raises: TimeoutError: If the slots are not acquired within the given timeout. Example: A simple example of using the sync `concurrency` context manager: ```python from prefect.concurrency.sync import concurrency def resource_heavy(): with concurrency("test", occupy=1): print("Resource heavy task") def main(): resource_heavy() ``` """names=namesifisinstance(names,list)else[names]withtimeout(seconds=timeout_seconds):limits:List[MinimalConcurrencyLimitResponse]=_call_async_function_from_sync(_acquire_concurrency_slots,names,occupy)acquisition_time=pendulum.now("UTC")emitted_events=_emit_concurrency_acquisition_events(limits,occupy)try:yieldfinally:occupancy_period=cast(Interval,pendulum.now("UTC")-acquisition_time)_call_async_function_from_sync(_release_concurrency_slots,names,occupy,occupancy_period.total_seconds(),)_emit_concurrency_release_events(limits,occupy,emitted_events)
Block execution until an occupy number of slots of the concurrency
limits given in names are acquired. Requires that all given concurrency
limits have a slot decay.
Parameters:
Name
Type
Description
Default
names
Union[str, List[str]]
The names of the concurrency limits to acquire slots from.
required
occupy
int
The number of slots to acquire and hold from each limit.
1
Source code in src/prefect/concurrency/sync.py
8081828384858687888990919293
defrate_limit(names:Union[str,List[str]],occupy:int=1):"""Block execution until an `occupy` number of slots of the concurrency limits given in `names` are acquired. Requires that all given concurrency limits have a slot decay. Args: names: The names of the concurrency limits to acquire slots from. occupy: The number of slots to acquire and hold from each limit. """names=namesifisinstance(names,list)else[names]limits=_call_async_function_from_sync(_acquire_concurrency_slots,names,occupy,mode="rate_limit")_emit_concurrency_acquisition_events(limits,occupy)