# IO密集型后台任务 - 异步循环
async def io_intensive_background_task():
"""
IO密集型后台任务 - 以固定间隔运行
直接使用异步操作,不需要run_in_executor
"""
logging.info("Starting IO intensive background task")
while True:
try:
logging.info("Running IO intensive task cycle")
# 模拟IO密集型操作,如文件操作或网络请求
start_time = time.time()
# 模拟多个并发IO操作
io_tasks = [simulate_io_operation() for _ in range(5)]
results = await asyncio.gather(*io_tasks)
total_io_time = sum(result['time'] for result in results)
elapsed = time.time() - start_time
logging.info(f"IO task completed: {len(results)} operations, "
f"total IO time: {total_io_time:.2f}s, "
f"wall time: {elapsed:.2f}s")
# 等待一段时间再次执行
await asyncio.sleep(30) # 每30秒执行一次
except asyncio.CancelledError:
logging.info("IO intensive background task cancelled")
break
except Exception as e:
logging.error(f"Error in IO intensive task: {e}")
await asyncio.sleep(5) # 出错后短暂等待再重试
async def simulate_io_operation():
"""模拟单个IO密集型操作,如网络请求或文件读取"""
# 模拟IO延迟,如网络延迟
delay = 0.5 + (1.5 * np.random.random()) # 0.5到2秒的随机延迟
start_time = time.time()
await asyncio.sleep(delay) # 模拟IO等待
# 模拟读取的数据
data_size = int(np.random.random() * 1000) # 0到1000的随机数
elapsed = time.time() - start_time
return {
"time": elapsed,
"data_size": data_size
}
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
# 记录应用启动
logging.info("Application starting up")
# 创建进程池
cpus = get_cpu_limit()
workers = max(1, cpus - 1) # 保留一个CPU给系统和事件循环
app.state.process_pool = ProcessPoolExecutor(max_workers=workers)
logging.info(f"Process pool created with {workers} workers (detected {cpus} CPUs)")
# 启动后台任务
app.state.background_tasks = []
# 添加CPU密集型后台任务
cpu_task = asyncio.create_task(cpu_intensive_background_task(app))
app.state.background_tasks.append(cpu_task)
logging.info("CPU intensive background task started")
# 添加IO密集型后台任务
io_task = asyncio.create_task(io_intensive_background_task())
app.state.background_tasks.append(io_task)
logging.info("IO intensive background task started")
yield
# 清理资源
logging.info("Application shutting down, cleaning up resources")
# 取消所有后台任务
for task in app.state.background_tasks:
task.cancel()
if app.state.background_tasks:
# 等待所有任务正确取消
await asyncio.gather(*app.state.background_tasks, return_exceptions=True)
logging.info("All background tasks cancelled")
# 关闭进程池
app.state.process_pool.shutdown(wait=True)
logging.info("Process pool shut down")
app = FastAPI(lifespan=lifespan)
@app.get("/status")
async def status():
return {"status": "ok", "message": "Server is running with background tasks"}
#架构设计思考
接上,补一下代码示例
import os
import time
import asyncio
import logging
import numpy as np
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
def get_cpu_limit():
"""获取容器环境中可用的CPU数量"""
try:
# 检查cgroups v1
if os.path.exists("/sys/fs/cgroup/cpu/cpu.cfs_quota_us"):
with open("/sys/fs/cgroup/cpu/cpu.cfs_quota_us") as fp:
cfs_quota_us = int(fp.read().strip())
# 只有当有实际限制时才继续
if cfs_quota_us > 0:
with open("/sys/fs/cgroup/cpu/cpu.cfs_period_us") as fp:
cfs_period_us = int(fp.read().strip())
container_cpus = cfs_quota_us // cfs_period_us
return container_cpus
# 检查cgroups v2
elif os.path.exists("/sys/fs/cgroup/cpu.max"):
with open("/sys/fs/cgroup/cpu.max") as fp:
content = fp.read().strip().split()
if content[0] != "max": # 有限制
quota = int(content[0])
period = int(content[1])
container_cpus = quota // period
return container_cpus
except Exception as e:
logging.warning(f"Error detecting container CPU limit: {e}")
# 回退到系统CPU计数
return cpu_count()
# CPU密集型函数 - 在单独进程中运行
def cpu_intensive_operation(size=1000):
"""
一个模拟CPU密集型操作的函数,例如矩阵运算
这个函数在一个单独的进程中运行
"""
logging.info(f"Starting CPU intensive operation with size {size}")
# 创建随机矩阵
matrix_a = np.random.rand(size, size)
matrix_b = np.random.rand(size, size)
# 执行耗CPU的矩阵乘法
start_time = time.time()
result = np.matmul(matrix_a, matrix_b)
# 计算一些统计数据
mean_value = np.mean(result)
std_value = np.std(result)
elapsed = time.time() - start_time
logging.info(f"Completed CPU intensive operation in {elapsed:.2f} seconds")
return {
"mean": float(mean_value),
"std": float(std_value),
"elapsed": elapsed
}
# CPU密集型后台任务 - 异步循环
async def cpu_intensive_background_task(app):
"""
CPU密集型后台任务 - 以固定间隔运行
使用run_in_executor将耗CPU的操作卸载到进程池
"""
logging.info("Starting CPU intensive background task")
while True:
try:
logging.info("Running CPU intensive task cycle")
# 获取事件循环
loop = asyncio.get_running_loop()
# 将CPU密集型操作卸载到进程池
result = await loop.run_in_executor(
app.state.process_pool,
cpu_intensive_operation, # 传递CPU密集型函数
500 # 矩阵大小参数
)
logging.info(f"CPU task result: mean={result['mean']:.4f}, time={result['elapsed']:.2f}s")
# 等待一段时间再次执行
await asyncio.sleep(60) # 每分钟执行一次
except asyncio.CancelledError:
logging.info("CPU intensive background task cancelled")
break
except Exception as e:
logging.error(f"Error in CPU intensive task: {e}")
await asyncio.sleep(10) # 出错后短暂等待再重试
我们组有个 python 服务,里面有一些机器学习的功能,例如人脸识别和 ocr 还有音频分析等。这些任务用到的通常都是阻塞性的 api。现在我是通过 twsited 将它们创建成单独的线程或进程。但是我更想结合最近实践到的 fastapi 和 lifespan 来管理。
但是 lifespan 启动后台任务比较适合做一些 io 密集的,如果是这种 cpu 的可以说是典型错误了。那么进一步说,到底如何让 CPU 密集和 IO 密集的代码在 async 中协同工作呢?
答案是使用
这个方法可以将一些任务 offload 到线程或者进程,具体取决于你传入的 executor 是什么。返回值是一个 Future 所以你就可以把它无缝组合到自己的 async 代码中了!
我打算在 lifespan 中使用
#架构设计思考
但是 lifespan 启动后台任务比较适合做一些 io 密集的,如果是这种 cpu 的可以说是典型错误了。那么进一步说,到底如何让 CPU 密集和 IO 密集的代码在 async 中协同工作呢?
答案是使用
asyncio.run_in_executor
.这个方法可以将一些任务 offload 到线程或者进程,具体取决于你传入的 executor 是什么。返回值是一个 Future 所以你就可以把它无缝组合到自己的 async 代码中了!
我打算在 lifespan 中使用
asyncio.create_task
创建一系列 tasks 然后在 tasks 中涉及到 CPU 密集方法时使用 asyncio.run_in_executor
执行,这样可以保证主服务 fastapi 不受到影响。同时,我们可以对 executor 进行 max_worker 限制,通过一些手段感知容器或物理机中的 cpu 数量,这样 CPU 密集的任务可以充分地利用到多核#架构设计思考
说一下关于 ginja 的背景,我的 dalbox 的架构设计会为插件提供一套组件模板,我打算使用类 jinja2 的模板实现,而搜了下发现 go 没有成熟的,流行的模板,而官方的 template 功能比较简陋,不支持 macro 无法实现我的需求。所以我就想把 minijinja port 到 go 来使用 #开源项目
打算把 minijinja port 到 go 但有个问题是官方的 c binding 把错误存到了 TLS 这样在 go 的环境下就需要加 LockOSThread 不太行。
所以我需要自己 port 一个 c binding 参考 opendal-c 我觉得就是一个很好的实现。项目名就决定叫 ginja 吧。还要有个存放 artifact 的库,存放不同架构的 bin
或者,考虑到 bin 产物比较小,只有1.4m 经过 zstd 可能更小就几十 kb 直接存放在库里也不是不能接受,以固定版本的方式,比如2.9.0
#开源项目
所以我需要自己 port 一个 c binding 参考 opendal-c 我觉得就是一个很好的实现。项目名就决定叫 ginja 吧。还要有个存放 artifact 的库,存放不同架构的 bin
或者,考虑到 bin 产物比较小,只有1.4m 经过 zstd 可能更小就几十 kb 直接存放在库里也不是不能接受,以固定版本的方式,比如2.9.0
#开源项目
写了一个小工具把 copilot 转换成 open ai api server
https://github.com/yuchanns/copilot-openai-api
这样就可以在各种软件里充分利用我的 copilot 了
#开源项目
https://github.com/yuchanns/copilot-openai-api
这样就可以在各种软件里充分利用我的 copilot 了
#开源项目
对了,还要用 opendal 实现一个类似 alist 可以上传的插件。我现在经常通过 alist 上传再使用 nas-tool 刮削
这个过程完全可以用上述的插件事件通知触发刮削(插件允许上传时指定媒体元数据)
#nastool
这个过程完全可以用上述的插件事件通知触发刮削(插件允许上传时指定媒体元数据)
#nastool
nastool 本身应该为插件提供前端组件,一种简单的,制式组件。插件返回页面布局描述,由 nastool 本身组装成真正的页面。
插件的路由统一用 /module/:pluginid/作为前缀,可以自由追加路由。
在插件管理列表,可以设置将插件固定到侧边栏,或者首页常用
#nastool
插件的路由统一用 /module/:pluginid/作为前缀,可以自由追加路由。
在插件管理列表,可以设置将插件固定到侧边栏,或者首页常用
#nastool
今天稍微思考了一下插件架构应该怎么设计……
我的场景是这样的:主程序是个 nas 管理工具,需要通过插件来扩展功能。比如下载插件负责下载文件,元数据插件负责处理下载完的媒体文件信息,字幕插件去找字幕。这些插件之间可以互相配合,但也可以独立工作。
所以插件系统需要:
1. 每个插件都要能说明自己能做什么
2. 插件之间要能发布和订阅事件
3. 长时间任务要能反馈进度
经过一番思考,整理出了这样的接口规范:
然后拿下载插件举个例子:
这样的设计既简单又灵活,用 HTTP + JSON 的方式也让插件开发没有语言限制。每个插件都是一个独立的 HTTP 服务,主程序只要调用这些 API 就行了。
插件之间的协作通过事件来完成,比如下载完成时发个事件,元数据插件订阅这个事件就能自动开始处理文件,处理完再发事件给字幕插件……
而用户通过 nastool 的插件管理页面,设置插件的订阅事件,以及如何提取所订阅的事件里的参数映射到插件入参等等。
其实这个配置过程还是不够人性化,有点废手,可以考虑以 mcp 的形式提供,由 ai 来决定怎么填充!
想到这里,感觉这个方案还不错,先这样吧 😋
#开源项目 #nastool
我的场景是这样的:主程序是个 nas 管理工具,需要通过插件来扩展功能。比如下载插件负责下载文件,元数据插件负责处理下载完的媒体文件信息,字幕插件去找字幕。这些插件之间可以互相配合,但也可以独立工作。
所以插件系统需要:
1. 每个插件都要能说明自己能做什么
2. 插件之间要能发布和订阅事件
3. 长时间任务要能反馈进度
经过一番思考,整理出了这样的接口规范:
# 每个插件必须实现的基础接口
@dataclass
class PluginInfo:
id: str # 插件唯一标识
name: str # 插件名称
version: str # 版本
description: str # 描述
author: str # 作者
homepage: str # 可选,项目主页
@dataclass
class ActionParameter:
type: str
description: str
required: bool
default: Any = None
@dataclass
class Action:
description: str
method: str # GET/POST/PUT/DELETE
path: str
params: dict[str, ActionParameter]
returns: dict # 返回值描述
@dataclass
class Event:
description: str
schema: dict # JSON Schema 描述事件数据结构
# 基础 HTTP 接口
GET /plugin/info -> PluginInfo
GET /plugin/capabilities -> {
"actions": dict[str, Action],
"events": dict[str, Event],
"subscriptions": list[str]
}
GET /health -> {"status": "ok" | "error"}
POST /events -> 接收订阅的事件
然后拿下载插件举个例子:
# 下载插件接口定义
POST /actions/download {
"request": {
"url": str,
"save_path": str,
"headers": dict[str, str] # 可选
},
"response": {
"task_id": str,
"status": "started"
}
}
GET /tasks/{task_id} {
"response": {
"task_id": str,
"status": "downloading" | "completed" | "error",
"progress": float, # 0-100
"speed": int, # bytes/s
"downloaded": int, # bytes
"total": int # bytes
}
}
# 事件定义
events = {
"download.completed": {
"task_id": str,
"file_path": str,
"file_size": int,
"mime_type": str
}
}
这样的设计既简单又灵活,用 HTTP + JSON 的方式也让插件开发没有语言限制。每个插件都是一个独立的 HTTP 服务,主程序只要调用这些 API 就行了。
插件之间的协作通过事件来完成,比如下载完成时发个事件,元数据插件订阅这个事件就能自动开始处理文件,处理完再发事件给字幕插件……
而用户通过 nastool 的插件管理页面,设置插件的订阅事件,以及如何提取所订阅的事件里的参数映射到插件入参等等。
其实这个配置过程还是不够人性化,有点废手,可以考虑以 mcp 的形式提供,由 ai 来决定怎么填充!
想到这里,感觉这个方案还不错,先这样吧 😋
#开源项目 #nastool
对于 nastool 的后续改造计划一
1. 由于 GIL 的存在,即使 twisted 线程池也无法真正充分利用多核心并行,所以打算用多进程+reuseport 的方式其多个 twisted 进程
2. 然后我意识到像配置监听热重载以及写入对于多进程来说是个阻碍
3. 我的部署场景是只考虑支持容器化,实际上不需要热重载和源码升级重启这些能力,应该将这些部分清除,既可以减少累赘,也可以扫除多进程的障碍
4. 新的架构应该是,首先一个主进程,负责初始化数据,然后启动多线程背景任务,接着管理分叉多个 twisted 子进程等待。各自的子进程并行利用多核优势进行网络请求的处理响应
#开源项目 #nastool
1. 由于 GIL 的存在,即使 twisted 线程池也无法真正充分利用多核心并行,所以打算用多进程+reuseport 的方式其多个 twisted 进程
2. 然后我意识到像配置监听热重载以及写入对于多进程来说是个阻碍
3. 我的部署场景是只考虑支持容器化,实际上不需要热重载和源码升级重启这些能力,应该将这些部分清除,既可以减少累赘,也可以扫除多进程的障碍
4. 新的架构应该是,首先一个主进程,负责初始化数据,然后启动多线程背景任务,接着管理分叉多个 twisted 子进程等待。各自的子进程并行利用多核优势进行网络请求的处理响应
#开源项目 #nastool
发布了第一个 alpha 版本镜像
ghcr.io/yuchanns/nastool:v2.10.0-alpha.2
#开源项目 #nastool
功能上暂无什么实质性的变更,主要改动了:
1. HTTP 运行时从 flask 自带的 demo 改成使用 twisted
2. 调整了项目布局,以便于优化 Dockerfile 的构建流程
3. 为代码添加了 ruff format 和 lint 并通过验证
4. 打包镜像时限制了最小权限提高安全性
5. 增加开发脚本,优化了开发时的体验
6. 采用 PDM 管理依赖
ghcr.io/yuchanns/nastool:v2.10.0-alpha.2
#开源项目 #nastool
功能上暂无什么实质性的变更,主要改动了:
1. HTTP 运行时从 flask 自带的 demo 改成使用 twisted
2. 调整了项目布局,以便于优化 Dockerfile 的构建流程
3. 为代码添加了 ruff format 和 lint 并通过验证
4. 打包镜像时限制了最小权限提高安全性
5. 增加开发脚本,优化了开发时的体验
6. 采用 PDM 管理依赖
记录一下 fedora zram 如何调整:
1. 安装配置生成器
2. 创建配置文件`/etc/systemd/zram-generator.conf`并写入:
3. 重启 zram 服务:
#linux
1. 安装配置生成器
sudo dnf install zram-generator-defaults
2. 创建配置文件`/etc/systemd/zram-generator.conf`并写入:
[zram0]
zram-size = min(ram, 4096)
3. 重启 zram 服务:
systemctl restart systemd-zram-setup@zram0
#linux
刚刚发现原来 nastool 的原作者开了个新坑 https://github.com/jxxghp/MoviePilot
😂不过我还是决定基于原来的 2.9.1 分支继续开发我想要的版本,因为我感觉到了乐趣
#开源项目 #nastool
😂不过我还是决定基于原来的 2.9.1 分支继续开发我想要的版本,因为我感觉到了乐趣
#开源项目 #nastool
看了一下似乎 ruff 对 mypy 的支持也在路上了
typecheck 先放着不管,等后面再修吧
https://x.com/charliermarsh/status/1884651482009477368
#开源项目
typecheck 先放着不管,等后面再修吧
https://x.com/charliermarsh/status/1884651482009477368
#开源项目
先把仓库建起来,并作了 pdm 的适配,对目录进行了改造。
修改了 Dockerfile 移除了一些没用的功能脚本。把基础镜像改成了 debian。
此外添加了 ruff lint format,还有 mypy typecheck 。不过目前 typecheck 有太多不通过的地方了😅慢慢改
https://github.com/yuchanns/nastool
#开源项目 #nastool
修改了 Dockerfile 移除了一些没用的功能脚本。把基础镜像改成了 debian。
此外添加了 ruff lint format,还有 mypy typecheck 。不过目前 typecheck 有太多不通过的地方了😅慢慢改
https://github.com/yuchanns/nastool
#开源项目 #nastool