接上,补一下代码示例
import os
import time
import asyncio
import logging
import numpy as np
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
def get_cpu_limit():
"""获取容器环境中可用的CPU数量"""
try:
# 检查cgroups v1
if os.path.exists("/sys/fs/cgroup/cpu/cpu.cfs_quota_us"):
with open("/sys/fs/cgroup/cpu/cpu.cfs_quota_us") as fp:
cfs_quota_us = int(fp.read().strip())
# 只有当有实际限制时才继续
if cfs_quota_us > 0:
with open("/sys/fs/cgroup/cpu/cpu.cfs_period_us") as fp:
cfs_period_us = int(fp.read().strip())
container_cpus = cfs_quota_us // cfs_period_us
return container_cpus
# 检查cgroups v2
elif os.path.exists("/sys/fs/cgroup/cpu.max"):
with open("/sys/fs/cgroup/cpu.max") as fp:
content = fp.read().strip().split()
if content[0] != "max": # 有限制
quota = int(content[0])
period = int(content[1])
container_cpus = quota // period
return container_cpus
except Exception as e:
logging.warning(f"Error detecting container CPU limit: {e}")
# 回退到系统CPU计数
return cpu_count()
# CPU密集型函数 - 在单独进程中运行
def cpu_intensive_operation(size=1000):
"""
一个模拟CPU密集型操作的函数,例如矩阵运算
这个函数在一个单独的进程中运行
"""
logging.info(f"Starting CPU intensive operation with size {size}")
# 创建随机矩阵
matrix_a = np.random.rand(size, size)
matrix_b = np.random.rand(size, size)
# 执行耗CPU的矩阵乘法
start_time = time.time()
result = np.matmul(matrix_a, matrix_b)
# 计算一些统计数据
mean_value = np.mean(result)
std_value = np.std(result)
elapsed = time.time() - start_time
logging.info(f"Completed CPU intensive operation in {elapsed:.2f} seconds")
return {
"mean": float(mean_value),
"std": float(std_value),
"elapsed": elapsed
}
# CPU密集型后台任务 - 异步循环
async def cpu_intensive_background_task(app):
"""
CPU密集型后台任务 - 以固定间隔运行
使用run_in_executor将耗CPU的操作卸载到进程池
"""
logging.info("Starting CPU intensive background task")
while True:
try:
logging.info("Running CPU intensive task cycle")
# 获取事件循环
loop = asyncio.get_running_loop()
# 将CPU密集型操作卸载到进程池
result = await loop.run_in_executor(
app.state.process_pool,
cpu_intensive_operation, # 传递CPU密集型函数
500 # 矩阵大小参数
)
logging.info(f"CPU task result: mean={result['mean']:.4f}, time={result['elapsed']:.2f}s")
# 等待一段时间再次执行
await asyncio.sleep(60) # 每分钟执行一次
except asyncio.CancelledError:
logging.info("CPU intensive background task cancelled")
break
except Exception as e:
logging.error(f"Error in CPU intensive task: {e}")
await asyncio.sleep(10) # 出错后短暂等待再重试