vcorelib.asyncio
/home/vkottler/src/vkottler/workspace/vcorelib/vcorelib/asyncio/__init__.py

A module for working with asyncio.

 
Package Contents
cli
subprocess

 
Functions
all_stop_signals() -> '_Set[int]'
Get a set of all stop signals on this platform.
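
The idea of a platform-dependent signal set can be sketched with the standard signal module alone. This is an illustration, not vcorelib's implementation: the helper name stop_signals_sketch and the choice of SIGINT and SIGTERM as "stop" signals are assumptions.

```python
import signal
from typing import Set


def stop_signals_sketch() -> Set[int]:
    """Collect the stop signals that exist on this platform."""
    # Assumption: these two names are what counts as a "stop" signal here.
    names = ("SIGINT", "SIGTERM")
    # Only keep the signals the running platform actually defines.
    return {int(getattr(signal, name)) for name in names if hasattr(signal, name)}
```

Looking names up with hasattr keeps the sketch portable, since not every platform defines every signal.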
event_setter(stop_sig: '_asyncio.Event', eloop: '_asyncio.AbstractEventLoop' = None, logger: '_LoggerType' = None) -> 'SignalHandler'
Create a function that sets an event.
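
A minimal sketch of such a handler factory, using only the standard library. The names event_setter_sketch, SignalHandlerSketch and demo are hypothetical, and scheduling the set() call with call_soon_threadsafe is an assumption about one reasonable approach, not a description of vcorelib's code.

```python
import asyncio
import signal
from types import FrameType
from typing import Callable, Optional

# Same shape as the SignalHandler alias listed under Data.
SignalHandlerSketch = Callable[[int, Optional[FrameType]], None]


def event_setter_sketch(
    stop_sig: asyncio.Event, eloop: asyncio.AbstractEventLoop
) -> SignalHandlerSketch:
    """Build a signal handler that sets the given event."""

    def handler(sig: int, frame: Optional[FrameType]) -> None:
        # Assumption: set the event from the loop's thread so waiters wake up.
        eloop.call_soon_threadsafe(stop_sig.set)

    return handler


async def demo() -> bool:
    stop = asyncio.Event()
    handler = event_setter_sketch(stop, asyncio.get_running_loop())
    # Call the handler directly instead of delivering a real signal.
    handler(int(signal.SIGINT), None)
    await asyncio.wait_for(stop.wait(), timeout=1.0)
    return stop.is_set()
```

In real use the returned handler would be registered with signal.signal() rather than called by hand.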
log_exceptions(tasks: '_Iterable[_asyncio.Task[T]]', logger: '_LoggerType' = None) -> '_List[_asyncio.Task[T]]'
Log task exceptions and return the list of tasks that aren't complete.
log_task_exception(task: '_asyncio.Task[_Any]', logger: '_LoggerType' = None) -> 'None'
If a task is done and raised an exception, log it.
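
The two logging helpers above can be approximated as follows. This is a sketch under stated assumptions: the *_sketch names, the SKETCH_LOG logger and the log message are hypothetical, and the real functions also accept a logger argument that is omitted here.

```python
import asyncio
import logging
from typing import Any, Iterable, List

# Hypothetical logger name, used only for this sketch.
SKETCH_LOG = logging.getLogger("asyncio_sketch")


def log_task_exception_sketch(task: "asyncio.Task[Any]") -> None:
    """Log the exception of a finished task, if it raised one."""
    if task.done() and not task.cancelled():
        exc = task.exception()
        if exc is not None:
            SKETCH_LOG.error("Task raised an exception:", exc_info=exc)


def log_exceptions_sketch(
    tasks: "Iterable[asyncio.Task[Any]]",
) -> "List[asyncio.Task[Any]]":
    """Log failures of finished tasks and return the unfinished ones."""
    pending = []
    for task in tasks:
        if task.done():
            log_task_exception_sketch(task)
        else:
            pending.append(task)
    return pending


async def demo() -> bool:
    async def fails() -> None:
        raise ValueError("boom")

    failed = asyncio.ensure_future(fails())
    slow = asyncio.ensure_future(asyncio.sleep(60))
    await asyncio.wait([failed], timeout=1.0)  # let the failing task finish

    remaining = log_exceptions_sketch([failed, slow])

    slow.cancel()  # clean up the task that is still running
    await asyncio.gather(slow, return_exceptions=True)
    return remaining == [slow]
```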
new_eloop(set_current: 'bool' = True) -> '_asyncio.AbstractEventLoop'
Get a new event loop.
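
A rough standard-library equivalent, assuming set_current controls whether the new loop is also installed as the current loop. The name new_eloop_sketch is hypothetical.

```python
import asyncio


def new_eloop_sketch(set_current: bool = True) -> asyncio.AbstractEventLoop:
    """Create an event loop and optionally install it as the current one."""
    eloop = asyncio.new_event_loop()
    if set_current:
        # Assumption: "set_current" maps to asyncio.set_event_loop().
        asyncio.set_event_loop(eloop)
    return eloop
```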
normalize_eloop(eloop: '_asyncio.AbstractEventLoop' = None) -> '_asyncio.AbstractEventLoop'
Get the active event loop if one isn't provided explicitly.
run_handle_interrupt(to_run: '_Awaitable[T]', eloop: '_asyncio.AbstractEventLoop' = None, enable_uvloop: 'bool' = True) -> '_Optional[T]'
Run a task in an event loop and gracefully handle keyboard interrupts.

Return the result of the awaitable or None if execution was interrupted.
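
The documented contract (result on success, None on interruption) can be illustrated with plain asyncio. The names below are hypothetical, and the sketch leaves out whatever cleanup and uvloop handling the real function performs.

```python
import asyncio
from typing import Awaitable, Optional, TypeVar

T = TypeVar("T")


def run_handle_interrupt_sketch(
    to_run: Awaitable[T], eloop: asyncio.AbstractEventLoop
) -> Optional[T]:
    """Run an awaitable to completion, returning None on Ctrl-C."""
    try:
        return eloop.run_until_complete(to_run)
    except KeyboardInterrupt:
        # Assumption: an interrupt is reported as "no result".
        return None


async def answer() -> int:
    await asyncio.sleep(0)
    return 42


async def interrupted() -> int:
    # Stand-in for the user pressing Ctrl-C while the loop is running.
    raise KeyboardInterrupt
```

A caller can therefore tell an interrupted run from a completed one only if None is not itself a valid result.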
run_handle_stop(stop_sig: '_asyncio.Event', task: '_Coroutine[None, None, T]', eloop: '_asyncio.AbstractEventLoop' = None, signals: '_Iterable[int]' = None, enable_uvloop: 'bool' = True) -> 'T'
Publish the stop signal on keyboard interrupt and wait for the task to
complete.
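
One way to read this description: on a keyboard interrupt the event is set, and the loop keeps running until the task returns on its own. The sketch below follows that reading with plain asyncio. The names are hypothetical, and the signals and enable_uvloop parameters are not modelled.

```python
import asyncio
from typing import Coroutine, TypeVar

T = TypeVar("T")


def run_handle_stop_sketch(
    stop_sig: asyncio.Event,
    task: "Coroutine[None, None, T]",
    eloop: asyncio.AbstractEventLoop,
) -> T:
    """Run a coroutine; on Ctrl-C set the stop event and keep waiting."""
    future = eloop.create_task(task)  # wrap once so it can be resumed
    while True:
        try:
            return eloop.run_until_complete(future)
        except KeyboardInterrupt:
            # Assumption: the task watches this event and exits cleanly.
            stop_sig.set()


async def worker(stop: asyncio.Event) -> str:
    """Hypothetical task that runs until it is told to stop."""
    await stop.wait()
    return "stopped"
```

The task is expected to cooperate: if it never checks the event, the loop in this sketch keeps waiting.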
shutdown_loop(eloop: '_asyncio.AbstractEventLoop' = None, logger: '_LoggerType' = None) -> 'None'
Attempt to shut down an event loop.
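
A common shutdown sequence for an asyncio loop is to cancel the remaining tasks, wait for them, finalize async generators, and close the loop. The sketch below shows that pattern. It is an assumption that shutdown_loop does something similar, and the name shutdown_loop_sketch is hypothetical.

```python
import asyncio


def shutdown_loop_sketch(eloop: asyncio.AbstractEventLoop) -> None:
    """Cancel outstanding tasks, then close a loop that is not running."""
    pending = asyncio.all_tasks(eloop)
    for task in pending:
        task.cancel()
    if pending:
        # Give every task a chance to process its cancellation.
        eloop.run_until_complete(
            asyncio.gather(*pending, return_exceptions=True)
        )
    # Finalize any async generators that are still open.
    eloop.run_until_complete(eloop.shutdown_asyncgens())
    eloop.close()
```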
try_uvloop_runner(debug: 'bool' = None, eloop: '_asyncio.AbstractEventLoop' = None, enable: 'bool' = True) -> 'Iterator[_asyncio.AbstractEventLoop]'
Try to set up an asyncio runner using uvloop.
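
The Iterator return type suggests a context manager. A hedged sketch of the "use uvloop if available, otherwise fall back" idea is shown below. The name loop_runner_sketch is hypothetical, the debug and eloop parameters are not modelled, and the sketch yields a bare event loop, which may differ from what the real function sets up.

```python
import asyncio
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def loop_runner_sketch(enable: bool = True) -> Iterator[asyncio.AbstractEventLoop]:
    """Yield an event loop, preferring uvloop when it can be imported."""
    factory = asyncio.new_event_loop
    if enable:
        try:
            import uvloop  # optional third-party dependency

            factory = uvloop.new_event_loop
        except ImportError:
            pass  # fall back to the standard asyncio loop
    eloop = factory()
    try:
        yield eloop
    finally:
        eloop.close()
```

Usage would look like `with loop_runner_sketch() as eloop: eloop.run_until_complete(...)`; passing enable=False always selects the standard loop.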

 
Data
Iterator = typing.Iterator
LOG = <Logger vcorelib.asyncio (WARNING)>
SignalHandler = typing.Callable[[int, typing.Optional[frame]], NoneType]
T = ~T