# IO密集型后台任务 - 异步循环
async def io_intensive_background_task():
"""
IO密集型后台任务 - 以固定间隔运行
直接使用异步操作,不需要run_in_executor
"""
logging.info("Starting IO intensive background task")
while True:
try:
logging.info("Running IO intensive task cycle")
# 模拟IO密集型操作,如文件操作或网络请求
start_time = time.time()
# 模拟多个并发IO操作
io_tasks = [simulate_io_operation() for _ in range(5)]
results = await asyncio.gather(*io_tasks)
total_io_time = sum(result['time'] for result in results)
elapsed = time.time() - start_time
logging.info(f"IO task completed: {len(results)} operations, "
f"total IO time: {total_io_time:.2f}s, "
f"wall time: {elapsed:.2f}s")
# 等待一段时间再次执行
await asyncio.sleep(30) # 每30秒执行一次
except asyncio.CancelledError:
logging.info("IO intensive background task cancelled")
break
except Exception as e:
logging.error(f"Error in IO intensive task: {e}")
await asyncio.sleep(5) # 出错后短暂等待再重试
async def simulate_io_operation():
"""模拟单个IO密集型操作,如网络请求或文件读取"""
# 模拟IO延迟,如网络延迟
delay = 0.5 + (1.5 * np.random.random()) # 0.5到2秒的随机延迟
start_time = time.time()
await asyncio.sleep(delay) # 模拟IO等待
# 模拟读取的数据
data_size = int(np.random.random() * 1000) # 0到1000的随机数
elapsed = time.time() - start_time
return {
"time": elapsed,
"data_size": data_size
}
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
# 记录应用启动
logging.info("Application starting up")
# 创建进程池
cpus = get_cpu_limit()
workers = max(1, cpus - 1) # 保留一个CPU给系统和事件循环
app.state.process_pool = ProcessPoolExecutor(max_workers=workers)
logging.info(f"Process pool created with {workers} workers (detected {cpus} CPUs)")
# 启动后台任务
app.state.background_tasks = []
# 添加CPU密集型后台任务
cpu_task = asyncio.create_task(cpu_intensive_background_task(app))
app.state.background_tasks.append(cpu_task)
logging.info("CPU intensive background task started")
# 添加IO密集型后台任务
io_task = asyncio.create_task(io_intensive_background_task())
app.state.background_tasks.append(io_task)
logging.info("IO intensive background task started")
yield
# 清理资源
logging.info("Application shutting down, cleaning up resources")
# 取消所有后台任务
for task in app.state.background_tasks:
task.cancel()
if app.state.background_tasks:
# 等待所有任务正确取消
await asyncio.gather(*app.state.background_tasks, return_exceptions=True)
logging.info("All background tasks cancelled")
# 关闭进程池
app.state.process_pool.shutdown(wait=True)
logging.info("Process pool shut down")
app = FastAPI(lifespan=lifespan)
@app.get("/status")
async def status():
return {"status": "ok", "message": "Server is running with background tasks"}
#架构设计思考