python同步代码执行多个异步方法代码

作者: adm 分类: python 发布时间: 2025-02-25

在 同步代码(如普通 def 函数)中调用多个异步方法,你需要 协调异步代码的执行,通常有 4 种方法:

方法 1:asyncio.run()(适用于 Python 3.7+,推荐)

适用场景:同步代码需要运行多个异步函数(顺序执行或并行)。
限制:不能嵌套调用 asyncio.run()(每个线程只能调用一次)。

示例(顺序执行)

import asyncio

async def async_task1():
    print("Task 1 start")
    await asyncio.sleep(1)
    print("Task 1 end")

async def async_task2():
    print("Task 2 start")
    await asyncio.sleep(2)
    print("Task 2 end")

async def main():
    await async_task1()  # 先执行 task1
    await async_task2()  # 再执行 task2

def sync_function():
    asyncio.run(main())  # 同步代码调用异步函数

if __name__ == "__main__":
    sync_function()

示例(并行执行,推荐 asyncio.gather)

async def main():
    await asyncio.gather(
        async_task1(),  # 并行执行
        async_task2()
    )

def sync_function():
    asyncio.run(main())

方法 2:loop.run_until_complete()(适用于 Python 3.5+)

适用场景:需要更精细控制事件循环(如多次运行异步任务)。
限制:需手动管理事件循环,不能与 asyncio.run() 混用。

def sync_function():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_task1())  # 运行第一个任务
    loop.run_until_complete(async_task2())  # 运行第二个任务
    loop.close()  # 手动关闭

方法 3:asyncio.create_task() + await(适用于嵌套异步调用)

适用场景:在异步函数里调用多个异步任务,但外层是同步代码。

async def async_main():
    task1 = asyncio.create_task(async_task1())  # 创建任务1(不阻塞)
    task2 = asyncio.create_task(async_task2())  # 创建任务2(不阻塞)
    await task1  # 等待 task1 完成
    await task2  # 等待 task2 完成

def sync_function():
    asyncio.run(async_main())  # 同步代码调用

方法 4:anyio.run()(跨兼容方案,支持 trio 和 asyncio)

适用场景:需要兼容 asyncio 和 trio 异步运行时。

import anyio

async def async_task1():
    print("Task 1 start")
    await anyio.sleep(1)
    print("Task 1 end")

async def async_task2():
    print("Task 2 start")
    await anyio.sleep(2)
    print("Task 2 end")

def sync_function():
    anyio.run(asyncio.gather(async_task1(), async_task2()))

if __name__ == "__main__":
    sync_function()

总结

方法	适用场景	是否推荐
asyncio.run(main())	简单场景,顺序/并行执行	⭐⭐⭐⭐⭐
asyncio.gather()	并行执行多个任务	⭐⭐⭐⭐
loop.run_until_complete()	需要手动管理事件循环	⭐⭐
asyncio.create_task()	动态管理任务	⭐⭐⭐
anyio.run()	兼容 trio 和 asyncio	⭐⭐⭐

最佳实践:

推荐 asyncio.run(main()) + await 或 asyncio.gather(),代码最简洁。
避免多次调用 asyncio.run(),容易导致 RuntimeError。
如果任务可以并行,优先用 asyncio.gather() 提高效率。

如果觉得我的文章对您有用,请随意赞赏。您的支持将鼓励我继续创作!