我大概明白了,是检查 LTASK_EXTERNAL_OPENLIBS 是否定义了。如果没有,就默认打开标准库。所以可以通过这个方式注入。不过 ant 的实现方式更彻底。直接替换掉 luaL_openlibs 。看来我的 lua binding 实现需要做一些调整,以便于支持这种操作
在为 ltask-go 编写 examples 时我陷入了困境,无法同时成功跑通三个平台的 bee.lua 。主要是无法跑通 windows 平台的。经过研究 ejoy/ant 的代码,我终于弄清楚了。嘿嘿,果然看不懂只是暂时的,迟早会弄懂。

https://github.com/ejoy/ant/blob/master/runtime/common/modules.c

ant 采用的方式是把所有的 clibs 的 luaopen 函数统一注入到自己的入口,这样 lua 代码就可以直接使用,而不需要寻找同名的 dylib

目前我还有一个疑惑,ltask 创建多个服务时,只会打开标准库。不知道是怎么注入的?难道是宏可以重写? ant/runtime/common/modules.c at master · ejoy/ant
rocknix 出场不支持中文,会导致中文字符全部显示为 ??? 即使在主题中安装了中文字体支持也没用。间接原因是每个 roms/*/gamtelist.xml 显示的游戏名字就是 ??? 因此可选的解决方法就是把 gamelist 里的显示名称改为中文字符,这样就可以在游戏机列表上显示为中文。这种事情可以写一个脚本来处理和生成
dalbox 是不是可以有个插件,把 retroarch 安装上,通过网页玩 retrogame
记忆碎片
lua binding 遇到一个问题,当 lua 取栈取到不存在的栈时,go 直接显示 exit syscall frame no longer availabe.这和 windows 环境下使用 yield 报错一样,不知道有没有什么关联
这个问题找到解决方案了: 之所以发生syscall frame 的错误,本质上还是因为 lua 在遇到错误的时候会进行 longjmp,即跳回安全点,这个过程对 goroutine 的栈造成了破坏。所以需要想办法避免 longjmp。在深入研究 lua 的源码后我发现 lua_error 会检查是否设置了安全位点setjmp来决定是否进行 longjmp 。如果没有设置,则检查用户是否设置了错误处理器,最后 fallback 到 panic 上。那么安全位点是什么时候设置的呢?其实很简单,就是调用 lua_pcall 的时候。这也是为什么该方法可以安全调用任何函数的原因。所以我们只要跳过该方法的调用,改为使用 call,同时设置错误处理器,就可以将不安全的错误用 go 接管。那么用 go 接管后,我们又该怎么返回错误呢?答案是进行简单的 panic 操作。然后在 call 的调用协程上 recover 就可以捕获这个错误。进一步讲,可以自己实现一个 lua_pcall,其底层实现是通过一个 lua_call 组合 panic 和 recover 来达成由 go 端接管 lua 错误处理的效果
lua binding 遇到一个问题,当 lua 取栈取到不存在的栈时,go 直接显示 exit syscall frame no longer availabe.这和 windows 环境下使用 yield 报错一样,不知道有没有什么关联
lua go binding 写得差不多了,有两个计划:1. 实现一个 ltask 。2. 打磨一下 binding 以支持多个版本: 所有版本共用一套封装,通过 tag 来标记版本,通过对比版本来决定是否注册到 purego
ginja 已经写好了第一个版本
go.yuchanns.xyz/ginja
但是性能很不理想,比起 go 标准库 template 慢了十几倍!
接下来想验证:
1. purego v.s. purego+libffi 性能
2. cgo v.s. purego 性能
如果确定是 libffi 拖后腿,我可能就需要考虑如何仅使用 purego 来实现了
今天开发 ginja 的时候发现和 opendal go binding 一样的 c 的部分有重复的代码,明天可以封装一个 typeffi
#开源项目 我又有个想法。想写一个 #mcptranslator 这个名字并不是说它是一个 mcp 工具,而是表示它会通过 mcp 协议使用一些工具进行翻译。例如,它会首先调用语言检测工具检测语种,然后使用对应语种的词典查阅单词信息,或者专有名词信息,接着根据上下文翻译到目标语言,最后查询 native speaker 常用表达语句,进行本地化润色。灵感来自于参与 yetone 的 avante.nvim 的开发与使用。这个项目促使 AI 使用各种工具优化代码,具有显著的质量提升
#dalbox 第四个插件,使用 emby api 进行内嵌播放能力
#dalbox 而 AI 字幕则作为一个单独的进程,以第一种插件的形式接入
#dalbox 按照之前的插件架构设计,一般插件都要以一个进程的形式而存在。感觉有时候不是很必要,尤其是这个插件只是作为一个垫片的形式存在时。例如: 下载插件,本质上是作为一个垫片帮助 dalbox 访问 aria2 的 API, 这种情况下 aria2 已经是一个进程了,垫片作为一个进程显得很不必要。应当考虑支持第二种形式的插件,比如 lua 脚本。这样就可以作为轻量的垫片来实现。运行在 goroutine 管理的 lua vm 下。像订阅这种插件似乎也没有必要以一个单独的进程存在,完全可以是一片 lua 代码, 进行简单地周期性轮询订阅
#dalbox 第三个重要的插件应该是字幕吧?而且是使用 AI 转音频转文字后翻译的字幕,支持设置作品上下文中特定的名词
#dalbox mvp 目标: 提供一个以 opendal 为核心的文件管理器本体,以及初步实现的插件架构和初版组件模板,并实现订阅插件(仅支持自定义订阅)和下载(aria2)的插件 #开源项目


# IO密集型后台任务 - 异步循环
async def io_intensive_background_task():
    """
    IO密集型后台任务 - 以固定间隔运行
    直接使用异步操作,不需要run_in_executor
    """
    logging.info("Starting IO intensive background task")
    while True:
        try:
            logging.info("Running IO intensive task cycle")
            
            # 模拟IO密集型操作,如文件操作或网络请求
            start_time = time.time()
            
            # 模拟多个并发IO操作
            io_tasks = [simulate_io_operation() for _ in range(5)]
            results = await asyncio.gather(*io_tasks)
            
            total_io_time = sum(result['time'] for result in results)
            elapsed = time.time() - start_time
            
            logging.info(f"IO task completed: {len(results)} operations, "
                         f"total IO time: {total_io_time:.2f}s, "
                         f"wall time: {elapsed:.2f}s")
            
            # 等待一段时间再次执行
            await asyncio.sleep(30)  # 每30秒执行一次
            
        except asyncio.CancelledError:
            logging.info("IO intensive background task cancelled")
            break
        except Exception as e:
            logging.error(f"Error in IO intensive task: {e}")
            await asyncio.sleep(5)  # 出错后短暂等待再重试

async def simulate_io_operation():
    """模拟单个IO密集型操作,如网络请求或文件读取"""
    # 模拟IO延迟,如网络延迟
    delay = 0.5 + (1.5 * np.random.random())  # 0.5到2秒的随机延迟
    start_time = time.time()
    
    await asyncio.sleep(delay)  # 模拟IO等待
    
    # 模拟读取的数据
    data_size = int(np.random.random() * 1000)  # 0到1000的随机数
    
    elapsed = time.time() - start_time
    return {
        "time": elapsed,
        "data_size": data_size
    }

@asynccontextmanager
async def lifespan(app: FastAPI):
    """应用生命周期管理"""
    # 记录应用启动
    logging.info("Application starting up")
    
    # 创建进程池
    cpus = get_cpu_limit()
    workers = max(1, cpus - 1)  # 保留一个CPU给系统和事件循环
    app.state.process_pool = ProcessPoolExecutor(max_workers=workers)
    logging.info(f"Process pool created with {workers} workers (detected {cpus} CPUs)")
    
    # 启动后台任务
    app.state.background_tasks = []
    
    # 添加CPU密集型后台任务
    cpu_task = asyncio.create_task(cpu_intensive_background_task(app))
    app.state.background_tasks.append(cpu_task)
    logging.info("CPU intensive background task started")
    
    # 添加IO密集型后台任务
    io_task = asyncio.create_task(io_intensive_background_task())
    app.state.background_tasks.append(io_task)
    logging.info("IO intensive background task started")
    
    yield
    
    # 清理资源
    logging.info("Application shutting down, cleaning up resources")
    
    # 取消所有后台任务
    for task in app.state.background_tasks:
        task.cancel()
    
    if app.state.background_tasks:
        # 等待所有任务正确取消
        await asyncio.gather(*app.state.background_tasks, return_exceptions=True)
        logging.info("All background tasks cancelled")
    
    # 关闭进程池
    app.state.process_pool.shutdown(wait=True)
    logging.info("Process pool shut down")

app = FastAPI(lifespan=lifespan)

@app.get("/status")
async def status():
    return {"status": "ok", "message": "Server is running with background tasks"}

#架构设计思考
接上,补一下代码示例
import os
import time
import asyncio
import logging
import numpy as np
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

def get_cpu_limit():
    """获取容器环境中可用的CPU数量"""
    try:
        # 检查cgroups v1
        if os.path.exists("/sys/fs/cgroup/cpu/cpu.cfs_quota_us"):
            with open("/sys/fs/cgroup/cpu/cpu.cfs_quota_us") as fp:
                cfs_quota_us = int(fp.read().strip())
            # 只有当有实际限制时才继续
            if cfs_quota_us > 0:
                with open("/sys/fs/cgroup/cpu/cpu.cfs_period_us") as fp:
                    cfs_period_us = int(fp.read().strip())
                container_cpus = cfs_quota_us // cfs_period_us
                return container_cpus
        
        # 检查cgroups v2
        elif os.path.exists("/sys/fs/cgroup/cpu.max"):
            with open("/sys/fs/cgroup/cpu.max") as fp:
                content = fp.read().strip().split()
                if content[0] != "max":  # 有限制
                    quota = int(content[0])
                    period = int(content[1])
                    container_cpus = quota // period
                    return container_cpus
    
    except Exception as e:
        logging.warning(f"Error detecting container CPU limit: {e}")
    
    # 回退到系统CPU计数
    return cpu_count()

# CPU密集型函数 - 在单独进程中运行
def cpu_intensive_operation(size=1000):
    """
    一个模拟CPU密集型操作的函数,例如矩阵运算
    这个函数在一个单独的进程中运行
    """
    logging.info(f"Starting CPU intensive operation with size {size}")
    # 创建随机矩阵
    matrix_a = np.random.rand(size, size)
    matrix_b = np.random.rand(size, size)
    
    # 执行耗CPU的矩阵乘法
    start_time = time.time()
    result = np.matmul(matrix_a, matrix_b)
    
    # 计算一些统计数据
    mean_value = np.mean(result)
    std_value = np.std(result)
    
    elapsed = time.time() - start_time
    logging.info(f"Completed CPU intensive operation in {elapsed:.2f} seconds")
    
    return {
        "mean": float(mean_value),
        "std": float(std_value),
        "elapsed": elapsed
    }

# CPU密集型后台任务 - 异步循环
async def cpu_intensive_background_task(app):
    """
    CPU密集型后台任务 - 以固定间隔运行
    使用run_in_executor将耗CPU的操作卸载到进程池
    """
    logging.info("Starting CPU intensive background task")
    while True:
        try:
            logging.info("Running CPU intensive task cycle")
            
            # 获取事件循环
            loop = asyncio.get_running_loop()
            
            # 将CPU密集型操作卸载到进程池
            result = await loop.run_in_executor(
                app.state.process_pool, 
                cpu_intensive_operation,  # 传递CPU密集型函数
                500  # 矩阵大小参数
            )
            
            logging.info(f"CPU task result: mean={result['mean']:.4f}, time={result['elapsed']:.2f}s")
            
            # 等待一段时间再次执行
            await asyncio.sleep(60)  # 每分钟执行一次
            
        except asyncio.CancelledError:
            logging.info("CPU intensive background task cancelled")
            break
        except Exception as e:
            logging.error(f"Error in CPU intensive task: {e}")
            await asyncio.sleep(10)  # 出错后短暂等待再重试
Back to Top