当前位置: 首页 > news >正文

基于TypeScript与NeuroLink构建企业级AI代理:架构设计与实战指南

1. 项目概述为什么我们需要一个自建的AI代理在当前的AI应用开发浪潮中直接调用各大语言模型LLM的API比如OpenAI的GPT-4或Anthropic的Claude似乎是最直接的路径。然而当你真正将AI能力集成到生产环境中尤其是面对高并发、多用户、成本敏感的场景时一系列“甜蜜的烦恼”就会接踵而至。想象一下你的应用突然因为一个意外的提示词循环调用而产生了天价账单或者某个关键业务因为单一AI服务提供商宕机而全面停摆又或者你根本无法回答老板“上个季度我们在AI上花了多少钱哪个团队用得最多”这类看似简单的问题。这正是我们团队在构建金融科技应用时亲身经历的痛点。一个自建的AI代理AI Proxy就是为解决这些问题而生的“交通枢纽”和“控制中心”。它不是一个简单的转发器而是一个具备路由、缓存、限流、监控和安全审计等核心能力的中间层。通过它你可以将杂乱的、供应商锁定的、不可控的AI调用转变为一个清晰、可管理、可观测的标准化流程。这就像为你的城市交通系统建立了一个智能调度中心而不是让每辆车都直接开上高速。使用TypeScript来构建这样一个代理是当下一个非常务实的选择。TypeScript的强类型系统能在开发阶段就捕捉到大量潜在的错误这对于处理复杂的API响应和中间件逻辑至关重要。同时Node.js生态的成熟度保证了高性能和丰富的库支持。本文将基于一个名为NeuroLink的通用AI SDK手把手带你构建一个功能完备的AI代理我会分享从架构设计到每一行代码的思考以及我们在实战中踩过的坑和总结的经验。2. 核心架构设计拆解一个AI代理的组成部分在动手写代码之前我们必须先想清楚一个健壮的AI代理应该由哪些核心模块构成以及它们之间如何协同工作。一个好的架构设计是项目成功的一半。2.1 请求路由层智能调度的大脑路由层是代理的“大脑”它决定了每个进来的请求应该由哪个AI模型来处理。最简单的路由策略是“写死”一个提供商但这完全失去了代理的价值。更高级的策略包括成本优先路由将请求自动导向当前最便宜的可用模型例如对于简单的分类任务使用gpt-3.5-turbo而非gpt-4。性能/能力路由根据请求的复杂度如提示词长度、特定关键词选择最合适的模型例如涉及复杂推理的请求路由到claude-3-opus而简单的文本补全则交给gemini-flash。故障转移路由当首选提供商出现高延迟或错误时自动切换到备用提供商保障服务的高可用性。负载均衡路由如果你有多个相同模型的API密钥可以在它们之间进行轮询避免单个密钥的速率限制。设计心得路由策略不应是静态配置而应该是一个可插拔的、基于规则的引擎。我们初期曾尝试用一个巨大的if-else链来实现路由结果代码迅速变得难以维护。后来我们将其抽象为一系列“路由规则”对象每个规则包含一个condition函数和一个action函数使得策略的增删改查变得非常清晰。2.2 缓存层提升性能与降低成本的利器LLM的API调用不仅慢几百毫秒到几秒而且贵。对于许多场景如常见问答、内容模板生成、参数固定的数据提取等其输出在一定时间内是确定或可重复使用的。缓存层的作用就是将完全相同的请求及其响应存储起来在后续相同请求到达时直接返回缓存结果。缓存键设计这是缓存有效性的核心。键必须包含所有影响输出的因素完整的提示词prompt、模型名称、温度temperature等参数。一个常见的错误是忽略了temperature0和temperature0.7的请求本质上是不同的不能共用缓存。缓存存储与失效可以选择内存缓存如lru-cache实现简单快速但服务重启后数据丢失也可以使用Redis等外部存储实现分布式缓存和持久化。需要设置合理的TTL生存时间因为模型的知识可能会更新过时的缓存可能返回错误信息。2.3 速率限制与配额管理成本的“刹车系统”没有限流的AI调用就像没有预算控制的采购随时可能酿成财务灾难。速率限制通常从两个维度进行全局速率限制保护你的钱包防止因代码bug或恶意攻击导致对AI API的疯狂调用。例如全局每分钟不超过1000次请求。用户/项目级配额在多租户系统中为每个用户或内部团队设置每日/每月的调用次数或费用上限。这需要与你的用户认证系统集成。踩坑实录我们曾遭遇过一次“午夜惊魂”。一个部署在测试环境的爬虫脚本发生了逻辑错误在凌晨疯狂调用AI接口短短几小时产生了远超预期的费用。自那以后我们在代理的最外层就加上了严格的、基于令牌桶算法的全局速率限制这是保护成本的第一道也是最重要的防线。2.4 监控、日志与成本追踪可观测性的基石“看不见”的系统是危险的。你需要清楚地知道谁在什么时候调用了什么模型花了多少钱精确到token响应耗时多长成功还是失败了结构化日志记录每一次请求的请求ID、用户标识、时间戳、请求参数、响应内容可脱敏、token用量、耗时和错误信息。这些日志应输出到像ELK Stack或Datadog这样的可观测性平台。成本聚合NeuroLink等SDK会在响应中返回本次调用使用的prompt_tokens和completion_tokens。你需要根据各提供商公开的定价表如OpenAI的$0.01 / 1K tokens实时计算并累加成本。这部分数据最好同时写入时间序列数据库如Prometheus和业务数据库以便做实时监控和后期对账。关键指标定义并暴露一些核心指标如请求速率QPS、平均/分位点延迟P95 P99、错误率、各模型/用户的token消耗速率。这些是判断系统健康度和进行容量规划的依据。2.5 安全与合规层不可或缺的守护者AI代理处理的数据可能包含敏感信息。这一层主要负责API密钥管理绝对不要在客户端或代码仓库中硬编码API密钥。代理应集中从环境变量或密钥管理服务如AWS Secrets Manager中加载密钥并为下游调用进行签名或注入。输入清洗与PII脱敏在将用户输入发送给外部AI提供商前应扫描并脱敏其中的个人身份信息PII如邮箱、手机号、身份证号、信用卡号等。这既保护用户隐私也符合GDPR等数据法规的要求。输出内容过滤对AI返回的内容进行安全检查过滤掉极端、有害或不适当的内容防止其流入你的应用。3. 基于NeuroLink构建代理从零开始的实战NeuroLink是一个TypeScript优先的通用AI SDK它的核心价值在于用一套统一的API抽象了超过13家不同的AI提供商。这意味着你的业务代码不再需要关心openai库和anthropic-ai/sdk的语法差异。我们将利用它的中间件系统高效地实现上述所有代理功能。3.1 项目初始化与基础配置首先创建一个新的TypeScript项目并安装依赖。# 初始化项目 mkdir my-ai-proxy cd my-ai-proxy npm init -y # 安装核心依赖 npm install juspay/neurolink # 安装开发依赖和工具库 npm install -D typescript ts-node types/node npm install express dotenv lru-cache接下来配置TypeScript和创建项目结构。// tsconfig.json { compilerOptions: { target: ES2022, module: commonjs, lib: [ES2022], outDir: ./dist, rootDir: ./src, strict: true, esModuleInterop: true, skipLibCheck: true, forceConsistentCasingInFileNames: true, resolveJsonModule: true }, include: [src/**/*], exclude: [node_modules, dist] }# 项目目录结构 src/ ├── index.ts # 应用入口 ├── config/ │ └── index.ts # 配置文件 ├── middleware/ # 自定义中间件 │ ├── logging.ts │ ├── cache.ts │ ├── router.ts │ ├── rateLimit.ts │ └── sanitize.ts ├── services/ │ └── neurolink.ts # NeuroLink服务封装 └── types/ └── index.ts # 自定义类型定义创建环境变量文件并确保它被添加到.gitignore中。# .env PORT3000 OPENAI_API_KEYsk-your-openai-key-here ANTHROPIC_API_KEYyour-anthropic-key-here GOOGLE_AI_API_KEYyour-google-ai-key-here # ... 其他提供商的密钥 # 缓存和限流配置 GLOBAL_RATE_LIMIT_MAX1000 GLOBAL_RATE_LIMIT_WINDOW_MS60000 # 1分钟 USER_CACHE_TTL_MS300000 # 5分钟3.2 核心服务层封装NeuroLink实例在services/neurolink.ts中我们初始化NeuroLink客户端并集中管理配置。这是整个代理的引擎。// src/services/neurolink.ts import { NeuroLink, type Middleware } from juspay/neurolink; import dotenv from dotenv; // 加载环境变量 dotenv.config(); // 初始化NeuroLink客户端 export const neurolink new NeuroLink({ // 配置支持的AI提供商 providers: { openai: { apiKey: process.env.OPENAI_API_KEY!, // 可以配置默认模型、超时等 defaultModel: gpt-3.5-turbo, }, anthropic: { apiKey: process.env.ANTHROPIC_API_KEY!, defaultModel: claude-3-haiku-20240307, }, googleAI: { apiKey: process.env.GOOGLE_AI_API_KEY!, defaultModel: gemini-1.5-flash, }, // 可以继续添加其他提供商如cohere, groq, ollama等 }, // 全局默认配置 defaultOptions: { temperature: 0.7, maxTokens: 1024, }, }); console.log(NeuroLink AI 客户端初始化成功已配置提供商:, Object.keys(neurolink.providers)); // 这是一个辅助函数用于后续中间件注册 export function registerMiddlewares(middlewares: Middleware[]) { middlewares.forEach(middleware neurolink.use(middleware)); console.log(已注册 ${middlewares.length} 个中间件); }实操要点将AI提供商的API密钥通过环境变量注入是安全实践的基本要求。在生产环境中你应该使用像HashiCorp Vault或云服务商提供的密钥管理服务。此外为每个提供商设置一个合理的defaultModel和timeout可以避免在业务代码中重复配置也便于统一调整。3.3 实现日志与监控中间件日志中间件是我们了解系统运行状况的“眼睛”。我们将实现一个功能更丰富的版本不仅打印日志还收集关键指标。// src/middleware/logging.ts import { type Middleware, type GenerateOptions, type GenerateResult, type Usage } from juspay/neurolink; // 定义扩展的选项类型用于在中间件链中传递元数据 interface ExtendedOptions extends GenerateOptions { _requestId?: string; _startTime?: number; _userId?: string; // 可以从请求头中解析 } export const loggingMiddleware: Middleware { name: advanced-logging, async onBeforeGenerate(options: GenerateOptions): PromiseGenerateOptions { const startTime Date.now(); const requestId req_${Date.now()}_${Math.random().toString(36).substr(2, 9)}; const extendedOptions: ExtendedOptions { ...options, _requestId: requestId, _startTime: startTime, // 在实际应用中这里可以从HTTP请求的JWT令牌或API Key中解析出userId _userId: (options as any).headers?.[x-user-id] || anonymous, }; // 结构化日志记录请求开始 const logEntry { level: INFO, timestamp: new Date().toISOString(), requestId, userId: extendedOptions._userId, event: LLM_REQUEST_START, data: { model: options.model, provider: options.provider, // 只记录提示词的前200个字符避免日志过大和泄露敏感信息 promptPreview: options.input?.text?.substring(0, 200), temperature: options.temperature, maxTokens: options.maxTokens, }, }; console.log(JSON.stringify(logEntry)); // 这里可以集成OpenTelemetry创建Span // const span tracer.startSpan(llm.generate, { attributes: { requestId, userId, model: options.model } }); return extendedOptions; }, async onAfterGenerate(result: GenerateResult, options: ExtendedOptions): PromiseGenerateResult { const endTime Date.now(); const duration endTime - (options._startTime || endTime); const usage: Usage result.usage || { promptTokens: 0, completionTokens: 0 }; // 计算预估成本这是一个简化示例实际成本需根据提供商和模型精确计算 const estimatedCost estimateCost(usage, options.provider, options.model); const logEntry { level: INFO, timestamp: new Date().toISOString(), requestId: options._requestId, userId: options._userId, event: LLM_REQUEST_COMPLETE, data: { model: options.model, provider: options.provider, durationMs: duration, usage, estimatedCost, // 同样只记录输出的预览 outputPreview: result.output?.text?.substring(0, 200), success: true, }, }; console.log(JSON.stringify(logEntry)); // 可以将指标推送到Prometheus或StatsD // metrics.llmRequestDuration.observe(duration / 1000); // 转换为秒 // metrics.llmTokensUsed.inc(usage.promptTokens usage.completionTokens); return result; }, async onError(error: Error, options: ExtendedOptions): Promisevoid { const errorLogEntry { level: ERROR, timestamp: new Date().toISOString(), requestId: options._requestId, userId: options._userId, event: LLM_REQUEST_FAILED, data: { model: options.model, provider: options.provider, error: error.message, stack: error.stack, }, }; console.error(JSON.stringify(errorLogEntry)); // 错误指标上报 // metrics.llmRequestError.inc(); // 注意这里需要重新抛出错误以便后续中间件或调用者处理 throw error; }, }; // 简单的成本估算函数需要根据各提供商最新价格表更新 function estimateCost(usage: Usage, provider?: string, model?: string): number { // 示例OpenAI GPT-3.5 Turbo 的输入$0.50/1M tokens 输出$1.50/1M tokens const costPerMillionInput 0.5; const costPerMillionOutput 1.5; const inputCost (usage.promptTokens / 1_000_000) * costPerMillionInput; const outputCost (usage.completionTokens / 1_000_000) * costPerMillionOutput; return parseFloat((inputCost outputCost).toFixed(6)); // 保留6位小数 }这个中间件做了几件关键事情为每个请求生成唯一ID便于追踪记录结构化的JSON日志方便后续用日志分析工具如Loki, Splunk处理在请求完成时计算预估成本并预留了集成OpenTelemetry和监控指标如Prometheus的入口。将日志输出为JSON格式而不是纯文本是生产环境的最佳实践。3.4 实现智能缓存中间件缓存是提升性能和降低成本最有效的手段之一。我们将实现一个基于内存LRU最近最少使用的缓存并考虑更复杂的缓存键设计。// src/middleware/cache.ts import { type Middleware, type GenerateOptions, type GenerateResult } from juspay/neurolink; import LRUCache from lru-cache; import { createHash } from crypto; interface CacheEntry { result: GenerateResult; timestamp: number; metadata: { model: string; provider?: string; tokenUsage: number; }; } // 初始化一个LRU缓存 const llmResponseCache new LRUCachestring, CacheEntry({ max: 500, // 缓存最多500条记录 ttl: 1000 * 60 * 10, // 默认每条记录存活10分钟 // 当条目被驱逐时可以在这里记录日志或更新指标 dispose: (key, value) { console.log(缓存条目被驱逐: ${key.substring(0, 50)}...); }, }); export const cachingMiddleware: Middleware { name: response-cache, async onBeforeGenerate(options: GenerateOptions): PromiseGenerateOptions { // 1. 决定哪些请求需要被缓存 // 通常只有确定性高的请求才适合缓存例如temperature0的请求 // 或者我们可以通过一个配置列表或函数来判断 const shouldCache shouldCacheThisRequest(options); if (!shouldCache) { (options as any)._skipCache true; return options; } // 2. 生成缓存键必须唯一标识一个请求 const cacheKey generateCacheKey(options); const cachedEntry llmResponseCache.get(cacheKey); if (cachedEntry) { console.log([${this.name}] 缓存命中 for key: ${cacheKey.substring(0, 30)}...); // 在结果上标记来自缓存并附加缓存元数据 const cachedResult { ...cachedEntry.result, _metadata: { ...cachedEntry.result._metadata, cached: true, cachedAt: cachedEntry.timestamp, }, }; // 通过一个特殊字段返回缓存结果在onAfterGenerate中拦截 return { ...options, _cachedResult: cachedResult, _cacheKey: cacheKey, }; } console.log([${this.name}] 缓存未命中 for key: ${cacheKey.substring(0, 30)}...); // 将缓存键附加到选项上以便在生成后存储 return { ...options, _cacheKey: cacheKey, }; }, async onAfterGenerate(result: GenerateResult, options: GenerateOptions any): PromiseGenerateResult { // 如果请求命中了缓存直接返回缓存的结果 if (options._cachedResult) { return options._cachedResult; } // 如果这个请求被标记为跳过缓存或者没有缓存键直接返回 if (options._skipCache || !options._cacheKey) { return result; } // 只有成功的请求才存入缓存 if (result.output?.text) { const cacheEntry: CacheEntry { result: { ...result, _metadata: { ...result._metadata, cached: false, }, }, timestamp: Date.now(), metadata: { model: options.model || unknown, provider: options.provider, tokenUsage: (result.usage?.promptTokens || 0) (result.usage?.completionTokens || 0), }, }; llmResponseCache.set(options._cacheKey, cacheEntry); console.log([${this.name}] 已缓存结果 for key: ${options._cacheKey.substring(0, 30)}...); } return result; }, }; // --- 辅助函数 --- function shouldCacheThisRequest(options: GenerateOptions): boolean { // 策略1温度0的请求通常是确定性的适合缓存 if (options.temperature 0) { return true; } // 策略2某些特定模型或提供商的请求总是缓存可配置 const alwaysCacheModels [gpt-3.5-turbo-instruct, claude-instant-1.2]; if (options.model alwaysCacheModels.includes(options.model)) { return true; } // 策略3根据提示词前缀或路径判断例如所有以“系统提示翻译以下内容”开头的请求 const prompt options.input?.text || ; if (prompt.startsWith(Translate the following text to Chinese:)) { return true; } // 默认不缓存 return false; } function generateCacheKey(options: GenerateOptions): string { // 使用SHA-256哈希来生成一个固定长度、唯一的键避免过长的字符串作为Map键 const contentToHash JSON.stringify({ // 核心影响输出的参数 input: options.input, model: options.model, provider: options.provider, temperature: options.temperature, maxTokens: options.maxTokens, topP: options.topP, // 注意我们通常不把stream选项包含在内因为流式和非流式响应本质不同 // 但如果你也缓存流式响应的第一个chunk那可能需要包含。 // 排除非功能性字段如自定义的metadata或用于追踪的ID }); return createHash(sha256).update(contentToHash).digest(hex); }关键细节解析generateCacheKey函数是缓存层的灵魂。我们使用SHA-256哈希而不是直接将JSON字符串作为键有两个好处一是哈希值长度固定且较短更适合作为缓存键二是可以避免因JSON序列化时键的顺序不同导致本应相同的请求生成不同的键。shouldCacheThisRequest函数体现了策略模式你可以在这里实现非常复杂的业务逻辑来决定是否缓存例如根据用户等级、请求频率或内容类型来动态决定。3.5 实现路由中间件路由中间件是代理“智能”的体现。我们将实现一个基于规则引擎的路由器它比简单的if-else更灵活、更易维护。// src/middleware/router.ts import { type Middleware, type GenerateOptions } from juspay/neurolink; // 定义路由规则的类型 interface RoutingRule { name: string; // 条件函数返回true则应用此规则 condition: (options: GenerateOptions) boolean; // 动作函数修改请求选项如目标提供商、模型、参数 action: (options: GenerateOptions) GenerateOptions; // 优先级数字越大优先级越高 priority: number; } // 预定义的路由规则集 const routingRules: RoutingRule[] [ { name: 财务分析路由, condition: (opt) { const prompt opt.input?.text?.toLowerCase() || ; return prompt.includes(financial analysis) || prompt.includes(earnings report) || prompt.includes(投资建议); }, action: (opt) ({ ...opt, provider: openai, // 指定提供商 model: gpt-4, // 使用能力更强的模型 temperature: 0.3, // 降低随机性提高确定性 }), priority: 100, }, { name: 快速摘要路由, condition: (opt) { const prompt opt.input?.text || ; return prompt.length 10 prompt.length 300; // 短文本摘要 }, action: (opt) ({ ...opt, provider: googleAI, model: gemini-1.5-flash, // 使用更快、更便宜的模型 maxTokens: 150, // 限制输出长度 }), priority: 90, }, { name: 代码生成路由, condition: (opt) { const prompt opt.input?.text?.toLowerCase() || ; return prompt.includes(code) || prompt.includes(function) || prompt.includes(implement) || prompt.includes(python) || prompt.includes(javascript); }, action: (opt) ({ ...opt, provider: anthropic, model: claude-3-sonnet-20240229, // Claude在代码生成上表现优异 temperature: 0.1, // 代码需要高确定性 }), priority: 80, }, { name: 降级路由成本优先, condition: (opt) { // 这是一个“兜底”规则当没有其他规则匹配且用户没有明确指定模型时 // 我们选择一个成本较低的默认模型。 return !opt.model !opt.provider; }, action: (opt) ({ ...opt, provider: openai, model: gpt-3.5-turbo, }), priority: 10, // 低优先级作为默认选项 }, ]; export const routingMiddleware: Middleware { name: intelligent-router, async onBeforeGenerate(options: GenerateOptions): PromiseGenerateOptions { console.log([${this.name}] 处理原始请求: provider${options.provider}, model${options.model}); // 如果调用者已经明确指定了provider和model则尊重其选择跳过自动路由。 // 这为特殊请求提供了覆盖默认规则的能力。 if (options.provider options.model) { console.log([${this.name}] 请求已指定目标跳过自动路由。); return options; } // 按优先级降序排序规则 const sortedRules [...routingRules].sort((a, b) b.priority - a.priority); let finalOptions { ...options }; let appliedRuleName none; // 遍历规则应用第一个匹配的条件 for (const rule of sortedRules) { if (rule.condition(finalOptions)) { console.log([${this.name}] 应用规则: ${rule.name}); finalOptions rule.action(finalOptions); appliedRuleName rule.name; break; // 只应用优先级最高的第一个匹配规则 } } console.log([${this.name}] 路由决策完成。最终目标: provider${finalOptions.provider}, model${finalOptions.model} (规则: ${appliedRuleName})); return finalOptions; }, };这个路由中间件展示了一个可扩展的规则引擎。你可以轻松地通过修改routingRules数组来添加、删除或调整路由策略。例如你可以添加一个规则在检测到请求来自“VIP用户”时总是路由到性能最好的模型。规则引擎的设计使得策略管理变得清晰且易于测试。3.6 实现速率限制中间件速率限制是保护后端API和控制成本的闸门。我们将实现一个基于内存的令牌桶算法同时支持全局和用户级别的限流。// src/middleware/rateLimit.ts import { type Middleware, type GenerateOptions } from juspay/neurolink; // 简单的令牌桶实现 class TokenBucket { private tokens: number; private lastRefillTime: number; constructor( private capacity: number, // 桶容量 private refillRate: number, // 每秒补充的令牌数 ) { this.tokens capacity; this.lastRefillTime Date.now(); } private refill(): void { const now Date.now(); const timePassed (now - this.lastRefillTime) / 1000; // 转换为秒 const tokensToAdd timePassed * this.refillRate; this.tokens Math.min(this.capacity, this.tokens tokensToAdd); this.lastRefillTime now; } tryConsume(tokens: number 1): boolean { this.refill(); if (this.tokens tokens) { this.tokens - tokens; return true; } return false; } getCurrentTokens(): number { this.refill(); return this.tokens; } } // 限流器管理器 class RateLimiter { private globalBucket: TokenBucket; private userBuckets: Mapstring, TokenBucket; constructor( globalCapacity: number, globalRefillRate: number, private userCapacity: number, private userRefillRate: number, ) { this.globalBucket new TokenBucket(globalCapacity, globalRefillRate); this.userBuckets new Map(); } isAllowed(userId: string): { allowed: boolean; retryAfter?: number } { // 1. 检查全局限制 if (!this.globalBucket.tryConsume()) { // 计算全局桶需要多久才能有1个令牌 const globalTokensNeeded 1; const globalRefillTime globalTokensNeeded / (this.globalBucket as any).refillRate; return { allowed: false, retryAfter: Math.ceil(globalRefillTime * 1000) }; // 返回毫秒 } // 2. 检查用户限制 let userBucket this.userBuckets.get(userId); if (!userBucket) { userBucket new TokenBucket(this.userCapacity, this.userRefillRate); this.userBuckets.set(userId, userBucket); } if (!userBucket.tryConsume()) { // 计算用户桶需要多久才能有1个令牌 const userTokensNeeded 1; const userRefillTime userTokensNeeded / (userBucket as any).refillRate; return { allowed: false, retryAfter: Math.ceil(userRefillTime * 1000) }; } return { allowed: true }; } // 清理长时间不活跃的用户桶防止内存泄漏 cleanupInactiveUsers(maxInactiveTimeMs: number): void { const now Date.now(); // 这里需要一个机制来记录用户桶的最后访问时间为简化示例省略。 // 实际可以使用带TTL的Map或定期扫描。 } } // 从环境变量读取配置 const GLOBAL_CAPACITY parseInt(process.env.GLOBAL_RATE_LIMIT_MAX || 1000); const GLOBAL_REFILL_RATE GLOBAL_CAPACITY / 60; // 假设时间窗口是1分钟则每秒补充的令牌数 const USER_CAPACITY 100; // 每个用户每分钟100次 const USER_REFILL_RATE USER_CAPACITY / 60; const rateLimiter new RateLimiter(GLOBAL_CAPACITY, GLOBAL_REFILL_RATE, USER_CAPACITY, USER_REFILL_RATE); export const rateLimitMiddleware: Middleware { name: rate-limiter, async onBeforeGenerate(options: GenerateOptions): PromiseGenerateOptions { // 从请求上下文中获取用户ID这里简化处理实际应从认证信息中获取 const userId (options as any).headers?.[x-user-id] || anonymous; const { allowed, retryAfter } rateLimiter.isAllowed(userId); if (!allowed) { console.warn([${this.name}] 速率限制触发。用户: ${userId}, 建议重试时间: ${retryAfter}ms后); // 抛出一个特定的错误方便上层如HTTP服务器捕获并返回429状态码 const error new Error(Rate limit exceeded. Try again in ${Math.ceil((retryAfter || 0)/1000)} seconds.); (error as any).statusCode 429; (error as any).retryAfter retryAfter; throw error; } console.log([${this.name}] 请求通过速率限制检查。用户: ${userId}); // 可以将当前剩余令牌数作为元数据传递下去用于监控 return { ...options, _rateLimitInfo: { userId }, }; }, };算法解析我们使用了令牌桶算法。它的优点是能够允许一定程度的突发流量因为桶里有初始容量并且以恒定速率补充令牌比简单的固定窗口计数器更平滑。例如全局配置为capacity1000, refillRate1000/60意味着系统最多允许1分钟内处理1000个请求突发并且会以每秒约16.67个请求的速率持续补充处理能力。cleanupInactiveUsers方法提示了在生产环境中需要注意内存泄漏问题对于用户级别的限流需要定期清理不活跃用户的令牌桶。3.7 组装中间件与创建HTTP服务器最后我们将所有中间件组装起来并创建一个简单的Express服务器来暴露代理的HTTP接口。// src/index.ts import express from express; import { neurolink, registerMiddlewares } from ./services/neurolink; import { loggingMiddleware } from ./middleware/logging; import { cachingMiddleware } from ./middleware/cache; import { routingMiddleware } from ./middleware/router; import { rateLimitMiddleware } from ./middleware/rateLimit; // 假设我们还有一个安全清洗中间件 // import { sanitizationMiddleware } from ./middleware/sanitize; const app express(); const PORT process.env.PORT || 3000; // 中间件解析JSON请求体 app.use(express.json()); // 注册NeuroLink中间件注意顺序很重要 registerMiddlewares([ rateLimitMiddleware, // 第一层限流保护下游API loggingMiddleware, // 第二层日志记录所有经过限流后的请求 // sanitizationMiddleware, // 第三层安全清洗如有 routingMiddleware, // 第四层智能路由 cachingMiddleware, // 第五层缓存在路由之后因为路由可能改变目标模型 ]); // 定义代理的POST端点 app.post(/v1/chat/completions, async (req, res) { try { const { messages, model, temperature, max_tokens, stream, ...otherOptions } req.body; // 1. 将通用API格式转换为NeuroLink格式 // 注意这是一个简化转换实际可能需要处理更多字段 const prompt messages?.map(m ${m.role}: ${m.content}).join(\n) || ; const options { input: { text: prompt }, model: model, // 如果前端指定了模型路由中间件可能会覆盖它 temperature: temperature ?? 0.7, maxTokens: max_tokens ?? 1024, stream: stream ?? false, // 传递用户身份信息应从JWT等认证机制中获取 headers: { x-user-id: req.headers[x-user-id] || anonymous, }, ...otherOptions, }; // 2. 调用NeuroLink生成内容 console.log(代理收到请求开始调用AI...); const result await neurolink.generate(options); // 3. 将NeuroLink响应转换回通用格式 const completion { id: chatcmpl-${Date.now()}, object: chat.completion, created: Math.floor(Date.now() / 1000), model: options.model || unknown, choices: [ { index: 0, message: { role: assistant, content: result.output?.text || , }, finish_reason: stop, // 简化处理 }, ], usage: result.usage ? { prompt_tokens: result.usage.promptTokens, completion_tokens: result.usage.completionTokens, total_tokens: (result.usage.promptTokens || 0) (result.usage.completionTokens || 0), } : undefined, // 传递我们自定义的元数据如是否来自缓存 _proxy_metadata: result._metadata, }; // 4. 返回响应 res.status(200).json(completion); } catch (error: any) { console.error(代理处理请求时发生错误:, error); // 处理特定的错误类型 if (error.statusCode 429) { res.setHeader(Retry-After, error.retryAfter ? Math.ceil(error.retryAfter / 1000) : 60); return res.status(429).json({ error: { message: error.message, type: rate_limit_error, }, }); } // 其他错误 res.status(500).json({ error: { message: error.message || Internal server error, type: internal_error, }, }); } }); // 健康检查端点 app.get(/health, (req, res) { res.status(200).json({ status: ok, timestamp: new Date().toISOString() }); }); // 启动服务器 app.listen(PORT, () { console.log( AI代理服务器运行在 http://localhost:${PORT}); console.log( 可用端点: POST /v1/chat/completions); console.log(❤️ 健康检查: GET /health); });这个HTTP服务器模拟了OpenAI Chat Completions API的接口使得现有的、基于OpenAI SDK的客户端可以几乎无缝地切换到我们的代理上只需修改API的Base URL即可。中间件的注册顺序体现了处理流水线先限流保护系统再日志记录所有合法请求然后路由和缓存。4. 部署、测试与进阶优化完成代码编写只是第一步让代理稳定可靠地运行起来才是真正的挑战。4.1 部署考量进程管理使用pm2或systemd来管理Node.js进程确保服务崩溃后能自动重启。反向代理与负载均衡使用Nginx或Caddy作为反向代理处理SSL/TLS终止、静态文件服务和负载均衡。如果你有多个代理实例需要一个负载均衡器。配置管理将所有配置API密钥、限流阈值、缓存TTL外部化使用环境变量或配置中心如Consul。绝对不要将密钥硬编码在代码中。日志聚合将JSON格式的日志输出到标准输出stdout然后由Docker或Kubernetes的日志驱动收集并发送到如Elasticsearch、Loki或云服务商的日志服务中。监控告警为前面定义的指标QPS、延迟、错误率、Token消耗设置监控仪表盘和告警规则。当错误率升高或延迟异常时能及时收到通知。4.2 测试策略单元测试为每个中间件路由、缓存、限流编写独立的单元测试模拟输入并验证输出。集成测试测试整个HTTP API验证从请求到响应的完整流程包括错误处理如限流返回429。端到端测试模拟真实用户场景发送一系列请求验证代理的路由决策、缓存命中、成本计算等功能是否符合预期。负载测试使用k6或artillery等工具进行压力测试找出系统的瓶颈是Node.js事件循环、内存还是下游AI API的速率限制。4.3 进阶优化方向分布式缓存将内存缓存lru-cache替换为Redis。这允许多个代理实例共享缓存提高了缓存利用率和系统的一致性。动态配置实现一个管理后台允许在不重启服务的情况下动态更新路由规则、限流参数和缓存策略。A/B测试与影子模式将一部分流量复制影子流量发送到新的模型或路由策略上在不影响线上用户的情况下对比效果如成本、质量、延迟。回退与降级机制当首选模型调用失败或超时时中间件应能自动尝试备用模型甚至返回一个预设的、友好的降级响应保证核心功能的可用性。成本分析与预测基于历史使用数据构建简单的预测模型预估未来的API花费并设置预算告警。构建自己的AI代理是一个持续迭代的过程。从最初满足基本的路由和缓存需求到逐步增加监控、安全、动态配置等高级功能它会随着你的业务一起成长。通过这个项目你不仅获得了一个强大的技术组件更重要的是你建立起了对AI基础设施的深刻理解和掌控力这是在AI时代构建可靠应用不可或缺的能力。
http://www.gsyq.cn/news/1387729.html

相关文章:

  • 为什么说 2026 是“Agentic Workflow”爆发元年?生态工具链全景图
  • 为什么架构师越老越值钱?越陈越香的IT界茅台
  • Unity PC单exe封装实战:嵌入式资源方案详解
  • Unity打包安卓报错?手把手教你修改build.gradle解决资源冲突(附Gradle模板配置)
  • 终极指南:如何用开源分屏工具实现单机游戏多人同乐
  • Hakira平台实战:模块化低代码数据探索与自动化分析工作流搭建
  • 别再手动复制粘贴了!用Stata的logout和esttab,5分钟搞定论文标准表格
  • 别再写重复代码了!用这个Spine动画管理器搞定Unity中的角色动作切换与回调
  • 低碳物流网络设计与评价【附代码】
  • Flutter原理与混合栈开发深度解析
  • 项目一拖再拖、成本失控?企业破局关键在这!
  • 告别外设不足:用MCP2517FD给ESP32或树莓派Pico扩展CAN FD接口实战
  • 告别SD卡!手把手教你为EBAZ4205矿卡配置NAND启动的JFFS2根文件系统(Petalinux 2018.3)
  • SAP财务凭证替代避坑指南:从VF01销售发票到MIRO发票校验,AC_DOCUMENT BADI的字段映射与性能考量
  • 从二极管门到TTL/CMOS:聊聊数字IC设计里那些‘古老’却至关重要的工程权衡
  • 别再死记硬背公式了!用Multisim 14.0仿真文件,带你玩转20个经典运放电路
  • 从‘纹波’看本质:手把手教你诊断并优化VNA去嵌后的S参数测量结果
  • 2026年评价高的常熟工作服/苏州工作服品牌厂家推荐 - 行业平台推荐
  • 机器学习工程师必学的容器化实战:Docker与Kubernetes在ML部署中的深度应用
  • ARM SVE2指令集与BFloat16运算优化实践
  • 链路预测:白盒物理模型与黑盒机器学习模型的性能对比与选择指南
  • 2026年口碑好的堵水气囊/市政气囊/衡水充气芯膜气囊/封堵气囊主流厂家对比评测 - 品牌宣传支持者
  • 告别串口打印!用JScope的HSS模式实时图形化监控GD32变量(附Keil工程配置)
  • Promptfoo实战:构建可版本化、自动化的LLM输出质量评估体系
  • 2026年靠谱的山东大型微波烘干机/小型微波烘干机/微波烘干机厂家选择推荐 - 行业平台推荐
  • 观测不同模型在Taotoken平台上的响应速度与可用性状态
  • 交通流预测模型对比:从短期精准到长期稳健的选型指南
  • Claude Code用户如何通过Taotoken解决访问不稳定与Token不足困扰
  • Harness到底是未来,还是过渡
  • Unity GPU加速Boids群体仿真实战指南