# IO密集型后台任务 - 异步循环
async def io_intensive_background_task():
"""
IO密集型后台任务 - 以固定间隔运行
直接使用异步操作,不需要run_in_executor
"""
logging.info("Starting IO intensive background task")
while True:
try:
logging.info("Running IO intensive task cycle")
# 模拟IO密集型操作,如文件操作或网络请求
start_time = time.time()
# 模拟多个并发IO操作
io_tasks = [simulate_io_operation() for _ in range(5)]
results = await asyncio.gather(*io_tasks)
total_io_time = sum(result['time'] for result in results)
elapsed = time.time() - start_time
logging.info(f"IO task completed: {len(results)} operations, "
f"total IO time: {total_io_time:.2f}s, "
f"wall time: {elapsed:.2f}s")
# 等待一段时间再次执行
await asyncio.sleep(30) # 每30秒执行一次
except asyncio.CancelledError:
logging.info("IO intensive background task cancelled")
break
except Exception as e:
logging.error(f"Error in IO intensive task: {e}")
await asyncio.sleep(5) # 出错后短暂等待再重试
async def simulate_io_operation():
"""模拟单个IO密集型操作,如网络请求或文件读取"""
# 模拟IO延迟,如网络延迟
delay = 0.5 + (1.5 * np.random.random()) # 0.5到2秒的随机延迟
start_time = time.time()
await asyncio.sleep(delay) # 模拟IO等待
# 模拟读取的数据
data_size = int(np.random.random() * 1000) # 0到1000的随机数
elapsed = time.time() - start_time
return {
"time": elapsed,
"data_size": data_size
}
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
# 记录应用启动
logging.info("Application starting up")
# 创建进程池
cpus = get_cpu_limit()
workers = max(1, cpus - 1) # 保留一个CPU给系统和事件循环
app.state.process_pool = ProcessPoolExecutor(max_workers=workers)
logging.info(f"Process pool created with {workers} workers (detected {cpus} CPUs)")
# 启动后台任务
app.state.background_tasks = []
# 添加CPU密集型后台任务
cpu_task = asyncio.create_task(cpu_intensive_background_task(app))
app.state.background_tasks.append(cpu_task)
logging.info("CPU intensive background task started")
# 添加IO密集型后台任务
io_task = asyncio.create_task(io_intensive_background_task())
app.state.background_tasks.append(io_task)
logging.info("IO intensive background task started")
yield
# 清理资源
logging.info("Application shutting down, cleaning up resources")
# 取消所有后台任务
for task in app.state.background_tasks:
task.cancel()
if app.state.background_tasks:
# 等待所有任务正确取消
await asyncio.gather(*app.state.background_tasks, return_exceptions=True)
logging.info("All background tasks cancelled")
# 关闭进程池
app.state.process_pool.shutdown(wait=True)
logging.info("Process pool shut down")
app = FastAPI(lifespan=lifespan)
@app.get("/status")
async def status():
return {"status": "ok", "message": "Server is running with background tasks"}
#架构设计思考
我们组有个 python 服务,里面有一些机器学习的功能,例如人脸识别和 ocr 还有音频分析等。这些任务用到的通常都是阻塞性的 api。现在我是通过 twsited 将它们创建成单独的线程或进程。但是我更想结合最近实践到的 fastapi 和 lifespan 来管理。
但是 lifespan 启动后台任务比较适合做一些 io 密集的,如果是这种 cpu 的可以说是典型错误了。那么进一步说,到底如何让 CPU 密集和 IO 密集的代码在 async 中协同工作呢?
答案是使用
这个方法可以将一些任务 offload 到线程或者进程,具体取决于你传入的 executor 是什么。返回值是一个 Future 所以你就可以把它无缝组合到自己的 async 代码中了!
我打算在 lifespan 中使用
#架构设计思考
但是 lifespan 启动后台任务比较适合做一些 io 密集的,如果是这种 cpu 的可以说是典型错误了。那么进一步说,到底如何让 CPU 密集和 IO 密集的代码在 async 中协同工作呢?
答案是使用
asyncio.run_in_executor
.这个方法可以将一些任务 offload 到线程或者进程,具体取决于你传入的 executor 是什么。返回值是一个 Future 所以你就可以把它无缝组合到自己的 async 代码中了!
我打算在 lifespan 中使用
asyncio.create_task
创建一系列 tasks 然后在 tasks 中涉及到 CPU 密集方法时使用 asyncio.run_in_executor
执行,这样可以保证主服务 fastapi 不受到影响。同时,我们可以对 executor 进行 max_worker 限制,通过一些手段感知容器或物理机中的 cpu 数量,这样 CPU 密集的任务可以充分地利用到多核#架构设计思考