# IO密集型后台任务 - 异步循环
async def io_intensive_background_task():
    """
    IO密集型后台任务 - 以固定间隔运行
    直接使用异步操作,不需要run_in_executor
    """
    logging.info("Starting IO intensive background task")
    while True:
        try:
            logging.info("Running IO intensive task cycle")
            
            # 模拟IO密集型操作,如文件操作或网络请求
            start_time = time.time()
            
            # 模拟多个并发IO操作
            io_tasks = [simulate_io_operation() for _ in range(5)]
            results = await asyncio.gather(*io_tasks)
            
            total_io_time = sum(result['time'] for result in results)
            elapsed = time.time() - start_time
            
            logging.info(f"IO task completed: {len(results)} operations, "
                         f"total IO time: {total_io_time:.2f}s, "
                         f"wall time: {elapsed:.2f}s")
            
            # 等待一段时间再次执行
            await asyncio.sleep(30)  # 每30秒执行一次
            
        except asyncio.CancelledError:
            logging.info("IO intensive background task cancelled")
            break
        except Exception as e:
            logging.error(f"Error in IO intensive task: {e}")
            await asyncio.sleep(5)  # 出错后短暂等待再重试

async def simulate_io_operation():
    """模拟单个IO密集型操作,如网络请求或文件读取"""
    # 模拟IO延迟,如网络延迟
    delay = 0.5 + (1.5 * np.random.random())  # 0.5到2秒的随机延迟
    start_time = time.time()
    
    await asyncio.sleep(delay)  # 模拟IO等待
    
    # 模拟读取的数据
    data_size = int(np.random.random() * 1000)  # 0到1000的随机数
    
    elapsed = time.time() - start_time
    return {
        "time": elapsed,
        "data_size": data_size
    }

@asynccontextmanager
async def lifespan(app: FastAPI):
    """应用生命周期管理"""
    # 记录应用启动
    logging.info("Application starting up")
    
    # 创建进程池
    cpus = get_cpu_limit()
    workers = max(1, cpus - 1)  # 保留一个CPU给系统和事件循环
    app.state.process_pool = ProcessPoolExecutor(max_workers=workers)
    logging.info(f"Process pool created with {workers} workers (detected {cpus} CPUs)")
    
    # 启动后台任务
    app.state.background_tasks = []
    
    # 添加CPU密集型后台任务
    cpu_task = asyncio.create_task(cpu_intensive_background_task(app))
    app.state.background_tasks.append(cpu_task)
    logging.info("CPU intensive background task started")
    
    # 添加IO密集型后台任务
    io_task = asyncio.create_task(io_intensive_background_task())
    app.state.background_tasks.append(io_task)
    logging.info("IO intensive background task started")
    
    yield
    
    # 清理资源
    logging.info("Application shutting down, cleaning up resources")
    
    # 取消所有后台任务
    for task in app.state.background_tasks:
        task.cancel()
    
    if app.state.background_tasks:
        # 等待所有任务正确取消
        await asyncio.gather(*app.state.background_tasks, return_exceptions=True)
        logging.info("All background tasks cancelled")
    
    # 关闭进程池
    app.state.process_pool.shutdown(wait=True)
    logging.info("Process pool shut down")

app = FastAPI(lifespan=lifespan)

@app.get("/status")
async def status():
    return {"status": "ok", "message": "Server is running with background tasks"}

#架构设计思考
 
 
Back to Top