接上,补一下代码示例
import os
import time
import asyncio
import logging
import numpy as np
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

def get_cpu_limit():
    """获取容器环境中可用的CPU数量"""
    try:
        # 检查cgroups v1
        if os.path.exists("/sys/fs/cgroup/cpu/cpu.cfs_quota_us"):
            with open("/sys/fs/cgroup/cpu/cpu.cfs_quota_us") as fp:
                cfs_quota_us = int(fp.read().strip())
            # 只有当有实际限制时才继续
            if cfs_quota_us > 0:
                with open("/sys/fs/cgroup/cpu/cpu.cfs_period_us") as fp:
                    cfs_period_us = int(fp.read().strip())
                container_cpus = cfs_quota_us // cfs_period_us
                return container_cpus
        
        # 检查cgroups v2
        elif os.path.exists("/sys/fs/cgroup/cpu.max"):
            with open("/sys/fs/cgroup/cpu.max") as fp:
                content = fp.read().strip().split()
                if content[0] != "max":  # 有限制
                    quota = int(content[0])
                    period = int(content[1])
                    container_cpus = quota // period
                    return container_cpus
    
    except Exception as e:
        logging.warning(f"Error detecting container CPU limit: {e}")
    
    # 回退到系统CPU计数
    return cpu_count()

# CPU密集型函数 - 在单独进程中运行
def cpu_intensive_operation(size=1000):
    """
    一个模拟CPU密集型操作的函数,例如矩阵运算
    这个函数在一个单独的进程中运行
    """
    logging.info(f"Starting CPU intensive operation with size {size}")
    # 创建随机矩阵
    matrix_a = np.random.rand(size, size)
    matrix_b = np.random.rand(size, size)
    
    # 执行耗CPU的矩阵乘法
    start_time = time.time()
    result = np.matmul(matrix_a, matrix_b)
    
    # 计算一些统计数据
    mean_value = np.mean(result)
    std_value = np.std(result)
    
    elapsed = time.time() - start_time
    logging.info(f"Completed CPU intensive operation in {elapsed:.2f} seconds")
    
    return {
        "mean": float(mean_value),
        "std": float(std_value),
        "elapsed": elapsed
    }

# CPU密集型后台任务 - 异步循环
async def cpu_intensive_background_task(app):
    """
    CPU密集型后台任务 - 以固定间隔运行
    使用run_in_executor将耗CPU的操作卸载到进程池
    """
    logging.info("Starting CPU intensive background task")
    while True:
        try:
            logging.info("Running CPU intensive task cycle")
            
            # 获取事件循环
            loop = asyncio.get_running_loop()
            
            # 将CPU密集型操作卸载到进程池
            result = await loop.run_in_executor(
                app.state.process_pool, 
                cpu_intensive_operation,  # 传递CPU密集型函数
                500  # 矩阵大小参数
            )
            
            logging.info(f"CPU task result: mean={result['mean']:.4f}, time={result['elapsed']:.2f}s")
            
            # 等待一段时间再次执行
            await asyncio.sleep(60)  # 每分钟执行一次
            
        except asyncio.CancelledError:
            logging.info("CPU intensive background task cancelled")
            break
        except Exception as e:
            logging.error(f"Error in CPU intensive task: {e}")
            await asyncio.sleep(10)  # 出错后短暂等待再重试
 
 
Back to Top