# IO密集型后台任务 - 异步循环
async def io_intensive_background_task():
    """
    IO密集型后台任务 - 以固定间隔运行
    直接使用异步操作,不需要run_in_executor
    """
    logging.info("Starting IO intensive background task")
    while True:
        try:
            logging.info("Running IO intensive task cycle")
            
            # 模拟IO密集型操作,如文件操作或网络请求
            start_time = time.time()
            
            # 模拟多个并发IO操作
            io_tasks = [simulate_io_operation() for _ in range(5)]
            results = await asyncio.gather(*io_tasks)
            
            total_io_time = sum(result['time'] for result in results)
            elapsed = time.time() - start_time
            
            logging.info(f"IO task completed: {len(results)} operations, "
                         f"total IO time: {total_io_time:.2f}s, "
                         f"wall time: {elapsed:.2f}s")
            
            # 等待一段时间再次执行
            await asyncio.sleep(30)  # 每30秒执行一次
            
        except asyncio.CancelledError:
            logging.info("IO intensive background task cancelled")
            break
        except Exception as e:
            logging.error(f"Error in IO intensive task: {e}")
            await asyncio.sleep(5)  # 出错后短暂等待再重试

async def simulate_io_operation():
    """模拟单个IO密集型操作,如网络请求或文件读取"""
    # 模拟IO延迟,如网络延迟
    delay = 0.5 + (1.5 * np.random.random())  # 0.5到2秒的随机延迟
    start_time = time.time()
    
    await asyncio.sleep(delay)  # 模拟IO等待
    
    # 模拟读取的数据
    data_size = int(np.random.random() * 1000)  # 0到1000的随机数
    
    elapsed = time.time() - start_time
    return {
        "time": elapsed,
        "data_size": data_size
    }

@asynccontextmanager
async def lifespan(app: FastAPI):
    """应用生命周期管理"""
    # 记录应用启动
    logging.info("Application starting up")
    
    # 创建进程池
    cpus = get_cpu_limit()
    workers = max(1, cpus - 1)  # 保留一个CPU给系统和事件循环
    app.state.process_pool = ProcessPoolExecutor(max_workers=workers)
    logging.info(f"Process pool created with {workers} workers (detected {cpus} CPUs)")
    
    # 启动后台任务
    app.state.background_tasks = []
    
    # 添加CPU密集型后台任务
    cpu_task = asyncio.create_task(cpu_intensive_background_task(app))
    app.state.background_tasks.append(cpu_task)
    logging.info("CPU intensive background task started")
    
    # 添加IO密集型后台任务
    io_task = asyncio.create_task(io_intensive_background_task())
    app.state.background_tasks.append(io_task)
    logging.info("IO intensive background task started")
    
    yield
    
    # 清理资源
    logging.info("Application shutting down, cleaning up resources")
    
    # 取消所有后台任务
    for task in app.state.background_tasks:
        task.cancel()
    
    if app.state.background_tasks:
        # 等待所有任务正确取消
        await asyncio.gather(*app.state.background_tasks, return_exceptions=True)
        logging.info("All background tasks cancelled")
    
    # 关闭进程池
    app.state.process_pool.shutdown(wait=True)
    logging.info("Process pool shut down")

app = FastAPI(lifespan=lifespan)

@app.get("/status")
async def status():
    return {"status": "ok", "message": "Server is running with background tasks"}

#架构设计思考
接上,补一下代码示例
import os
import time
import asyncio
import logging
import numpy as np
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

def get_cpu_limit():
    """获取容器环境中可用的CPU数量"""
    try:
        # 检查cgroups v1
        if os.path.exists("/sys/fs/cgroup/cpu/cpu.cfs_quota_us"):
            with open("/sys/fs/cgroup/cpu/cpu.cfs_quota_us") as fp:
                cfs_quota_us = int(fp.read().strip())
            # 只有当有实际限制时才继续
            if cfs_quota_us > 0:
                with open("/sys/fs/cgroup/cpu/cpu.cfs_period_us") as fp:
                    cfs_period_us = int(fp.read().strip())
                container_cpus = cfs_quota_us // cfs_period_us
                return container_cpus
        
        # 检查cgroups v2
        elif os.path.exists("/sys/fs/cgroup/cpu.max"):
            with open("/sys/fs/cgroup/cpu.max") as fp:
                content = fp.read().strip().split()
                if content[0] != "max":  # 有限制
                    quota = int(content[0])
                    period = int(content[1])
                    container_cpus = quota // period
                    return container_cpus
    
    except Exception as e:
        logging.warning(f"Error detecting container CPU limit: {e}")
    
    # 回退到系统CPU计数
    return cpu_count()

# CPU密集型函数 - 在单独进程中运行
def cpu_intensive_operation(size=1000):
    """
    一个模拟CPU密集型操作的函数,例如矩阵运算
    这个函数在一个单独的进程中运行
    """
    logging.info(f"Starting CPU intensive operation with size {size}")
    # 创建随机矩阵
    matrix_a = np.random.rand(size, size)
    matrix_b = np.random.rand(size, size)
    
    # 执行耗CPU的矩阵乘法
    start_time = time.time()
    result = np.matmul(matrix_a, matrix_b)
    
    # 计算一些统计数据
    mean_value = np.mean(result)
    std_value = np.std(result)
    
    elapsed = time.time() - start_time
    logging.info(f"Completed CPU intensive operation in {elapsed:.2f} seconds")
    
    return {
        "mean": float(mean_value),
        "std": float(std_value),
        "elapsed": elapsed
    }

# CPU密集型后台任务 - 异步循环
async def cpu_intensive_background_task(app):
    """
    CPU密集型后台任务 - 以固定间隔运行
    使用run_in_executor将耗CPU的操作卸载到进程池
    """
    logging.info("Starting CPU intensive background task")
    while True:
        try:
            logging.info("Running CPU intensive task cycle")
            
            # 获取事件循环
            loop = asyncio.get_running_loop()
            
            # 将CPU密集型操作卸载到进程池
            result = await loop.run_in_executor(
                app.state.process_pool, 
                cpu_intensive_operation,  # 传递CPU密集型函数
                500  # 矩阵大小参数
            )
            
            logging.info(f"CPU task result: mean={result['mean']:.4f}, time={result['elapsed']:.2f}s")
            
            # 等待一段时间再次执行
            await asyncio.sleep(60)  # 每分钟执行一次
            
        except asyncio.CancelledError:
            logging.info("CPU intensive background task cancelled")
            break
        except Exception as e:
            logging.error(f"Error in CPU intensive task: {e}")
            await asyncio.sleep(10)  # 出错后短暂等待再重试
我们组有个 python 服务,里面有一些机器学习的功能,例如人脸识别和 ocr 还有音频分析等。这些任务用到的通常都是阻塞性的 api。现在我是通过 twsited 将它们创建成单独的线程或进程。但是我更想结合最近实践到的 fastapi 和 lifespan 来管理。

但是 lifespan 启动后台任务比较适合做一些 io 密集的,如果是这种 cpu 的可以说是典型错误了。那么进一步说,到底如何让 CPU 密集和 IO 密集的代码在 async 中协同工作呢?

答案是使用 asyncio.run_in_executor.

这个方法可以将一些任务 offload 到线程或者进程,具体取决于你传入的 executor 是什么。返回值是一个 Future 所以你就可以把它无缝组合到自己的 async 代码中了!

我打算在 lifespan 中使用 asyncio.create_task 创建一系列 tasks 然后在 tasks 中涉及到 CPU 密集方法时使用 asyncio.run_in_executor 执行,这样可以保证主服务 fastapi 不受到影响。同时,我们可以对 executor 进行 max_worker 限制,通过一些手段感知容器或物理机中的 cpu 数量,这样 CPU 密集的任务可以充分地利用到多核

#架构设计思考
说一下关于 ginja 的背景,我的 dalbox 的架构设计会为插件提供一套组件模板,我打算使用类 jinja2 的模板实现,而搜了下发现 go 没有成熟的,流行的模板,而官方的 template 功能比较简陋,不支持 macro 无法实现我的需求。所以我就想把 minijinja port 到 go 来使用 #开源项目
打算把 minijinja port 到 go 但有个问题是官方的 c binding 把错误存到了 TLS 这样在 go 的环境下就需要加 LockOSThread 不太行。

所以我需要自己 port 一个 c binding 参考 opendal-c 我觉得就是一个很好的实现。项目名就决定叫 ginja 吧。还要有个存放 artifact 的库,存放不同架构的 bin

或者,考虑到 bin 产物比较小,只有1.4m 经过 zstd 可能更小就几十 kb 直接存放在库里也不是不能接受,以固定版本的方式,比如2.9.0

#开源项目
我决定把 dalbox 和 nastool 合并。或者说,把 nastool 的功能重构到 dalbox 上,同时继续保留之前的插件架构设计 #开源项目
要吃自己的狗粮!打算用 opendal-go 写一个文件管理器,名字我都想好了,就叫 dalbox
今日 vibe coding 感想
对了,还要用 opendal 实现一个类似 alist 可以上传的插件。我现在经常通过 alist 上传再使用 nas-tool 刮削
这个过程完全可以用上述的插件事件通知触发刮削(插件允许上传时指定媒体元数据)

#nastool
nastool 本身应该为插件提供前端组件,一种简单的,制式组件。插件返回页面布局描述,由 nastool 本身组装成真正的页面。
插件的路由统一用 /module/:pluginid/作为前缀,可以自由追加路由。

在插件管理列表,可以设置将插件固定到侧边栏,或者首页常用

#nastool
今天稍微思考了一下插件架构应该怎么设计……

我的场景是这样的:主程序是个 nas 管理工具,需要通过插件来扩展功能。比如下载插件负责下载文件,元数据插件负责处理下载完的媒体文件信息,字幕插件去找字幕。这些插件之间可以互相配合,但也可以独立工作。

所以插件系统需要:
1. 每个插件都要能说明自己能做什么
2. 插件之间要能发布和订阅事件
3. 长时间任务要能反馈进度

经过一番思考,整理出了这样的接口规范:

# 每个插件必须实现的基础接口
@dataclass
class PluginInfo:
    id: str          # 插件唯一标识
    name: str        # 插件名称
    version: str     # 版本
    description: str # 描述
    author: str      # 作者
    homepage: str    # 可选,项目主页

@dataclass
class ActionParameter:
    type: str
    description: str
    required: bool
    default: Any = None

@dataclass
class Action:
    description: str
    method: str      # GET/POST/PUT/DELETE
    path: str
    params: dict[str, ActionParameter]
    returns: dict    # 返回值描述

@dataclass
class Event:
    description: str
    schema: dict     # JSON Schema 描述事件数据结构

# 基础 HTTP 接口
GET /plugin/info -> PluginInfo
GET /plugin/capabilities -> {
    "actions": dict[str, Action],
    "events": dict[str, Event],
    "subscriptions": list[str]
}
GET /health -> {"status": "ok" | "error"}
POST /events -> 接收订阅的事件


然后拿下载插件举个例子:

# 下载插件接口定义
POST /actions/download {
    "request": {
        "url": str,
        "save_path": str,
        "headers": dict[str, str]  # 可选
    },
    "response": {
        "task_id": str,
        "status": "started"
    }
}

GET /tasks/{task_id} {
    "response": {
        "task_id": str,
        "status": "downloading" | "completed" | "error",
        "progress": float,  # 0-100
        "speed": int,      # bytes/s
        "downloaded": int,  # bytes
        "total": int       # bytes
    }
}

# 事件定义
events = {
    "download.completed": {
        "task_id": str,
        "file_path": str,
        "file_size": int,
        "mime_type": str
    }
}


这样的设计既简单又灵活,用 HTTP + JSON 的方式也让插件开发没有语言限制。每个插件都是一个独立的 HTTP 服务,主程序只要调用这些 API 就行了。

插件之间的协作通过事件来完成,比如下载完成时发个事件,元数据插件订阅这个事件就能自动开始处理文件,处理完再发事件给字幕插件……

而用户通过 nastool 的插件管理页面,设置插件的订阅事件,以及如何提取所订阅的事件里的参数映射到插件入参等等。

其实这个配置过程还是不够人性化,有点废手,可以考虑以 mcp 的形式提供,由 ai 来决定怎么填充!

想到这里,感觉这个方案还不错,先这样吧 😋

#开源项目 #nastool
今天已经把多进程架构实现了,也干掉了热重载和 win 托盘这些累赘。
写多进程的时候一直在思考怎么更优雅地信号控制子进程退出以及控制全局变量的实例化时机,令我想起了18年写 PHP 多进程的日子。

现在突然想到一些新的点子,以前一直很想要的:
1. 视频转音频
2. 音频转文字
3. 文字自动适配成字幕。另外支持按作品设置一些专用字典保持翻译一致性等

#开源项目 #nastool
对于 nastool 的后续改造计划一

1. 由于 GIL 的存在,即使 twisted 线程池也无法真正充分利用多核心并行,所以打算用多进程+reuseport 的方式其多个 twisted 进程
2. 然后我意识到像配置监听热重载以及写入对于多进程来说是个阻碍
3. 我的部署场景是只考虑支持容器化,实际上不需要热重载和源码升级重启这些能力,应该将这些部分清除,既可以减少累赘,也可以扫除多进程的障碍
4. 新的架构应该是,首先一个主进程,负责初始化数据,然后启动多线程背景任务,接着管理分叉多个 twisted 子进程等待。各自的子进程并行利用多核优势进行网络请求的处理响应

#开源项目 #nastool
发布了第一个 alpha 版本镜像

ghcr.io/yuchanns/nastool:v2.10.0-alpha.2

#开源项目 #nastool

功能上暂无什么实质性的变更,主要改动了:
1. HTTP 运行时从 flask 自带的 demo 改成使用 twisted
2. 调整了项目布局,以便于优化 Dockerfile 的构建流程
3. 为代码添加了 ruff format 和 lint 并通过验证
4. 打包镜像时限制了最小权限提高安全性
5. 增加开发脚本,优化了开发时的体验
6. 采用 PDM 管理依赖 GitHub
记录一下 fedora zram 如何调整:
1. 安装配置生成器
sudo dnf install zram-generator-defaults

2. 创建配置文件`/etc/systemd/zram-generator.conf`并写入:
[zram0]
zram-size = min(ram, 4096)

3. 重启 zram 服务:
systemctl restart systemd-zram-setup@zram0


#linux
先把仓库建起来,并作了 pdm 的适配,对目录进行了改造。
修改了 Dockerfile 移除了一些没用的功能脚本。把基础镜像改成了 debian。

此外添加了 ruff lint format,还有 mypy typecheck 。不过目前 typecheck 有太多不通过的地方了😅慢慢改

https://github.com/yuchanns/nastool

#开源项目 #nastool
Back to Top