Source code for spinifex.asyncio_wrapper

"""Async utility functions for Spinifex."""

from __future__ import annotations

import asyncio
import sys
from collections.abc import Coroutine
from functools import wraps
from typing import Callable, TypeVar

import nest_asyncio

if sys.version_info < (3, 10):
    from typing_extensions import ParamSpec
else:
    from typing import ParamSpec

[docs] P = ParamSpec("P")
[docs] T = TypeVar("T")
[docs] def sync_wrapper(coro: Callable[P, Coroutine[None, None, T]]) -> Callable[P, T]: # Apply the `nest_asyncio` magic to allow nested event loops # Need for running async functions in a Jupyter notebook nest_asyncio.apply() @wraps(coro) def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: try: # Check if we are in an event loop loop = asyncio.get_running_loop() except RuntimeError: # Nope, we are not in an event loop # Create a new event loop with `asyncio.run` return asyncio.run(coro(*args, **kwargs)) # We are in an event loop # Use the current event loop to run the coroutine # This is useful when we are in a Jupyter notebook return loop.run_until_complete(coro(*args, **kwargs)) return wrapper