如何使用Node.js Express流式传输AI.JSX响应

ee7vknir  于 11个月前  发布在  Node.js
关注(0)|答案(1)|浏览(161)

我正在使用Node.js和Express编写一个小型REST API,并使用AI.JSX和OpenAI API。AI.JSX文档提供了如何使用SSE在Next.js应用程序的上下文中流式传输响应的示例,并使用Vercel Edge Function:
https://docs.ai-jsx.com/tutorials/aijsxTutorials/part5-nextjs#the-aijsx-edge-function
上面使用了函数toStreamResponse,它创建/返回一个新的响应对象。因此,这种方法不与Express集成,Express希望您使用路由处理程序中现有的响应对象。我如何将这两种方法结合在一起?我已经尝试使用toTextStream函数:

routes.post("/chat-stream", async (req, res) => {
  const { system = "", user = "" } = req.body || {};

  if (!user) {
    res.sendStatus(400);
    return;
  }

  const RequestEl = getRequestEl(user, system);

  const stream = toTextStream(RequestEl);
  const reader = stream.getReader();

  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("Cache-Control", "no-cache");
  res.flushHeaders();

  const pump = () => {
    reader
      .read()
      .then(({ value, done }) => {
        if (done) {
          res.end();
          return;
        }

        res.write(value);
        pump();
      })
      .catch((err) => {
        console.error(err);
        if (!res.headersSent) {
          res.status(500);
        }
        res.end();
      });
  };

  pump();
});

字符串
使用上面的代码,我可以在Chrome开发工具中看到一个响应流,但是客户端AI. JSON提供的React钩子并没有报告收到的任何数据。换句话说,这是:

const { error, current, fetchAI } = useAIStream({});


始终报告errorcurrent均为null。

gudnpqoy

gudnpqoy1#

最简单的方法是直接将toStreamResponse中的代码拉到我的Express应用程序中,并使用流管道(尽管我不确定为什么在管道调用中需要any):

import { Router } from "express";
import { toEventStream } from "ai-jsx/stream";
import { createRenderContext } from "ai-jsx";
import {
  ChatCompletion,
  SystemMessage,
  UserMessage,
} from "ai-jsx/core/completion";
import { JSX } from "ai-jsx/jsx-runtime";
import { pipeline } from "stream/promises";

const routes = Router();

const getRequestEl = (user: string = "", system: string = ""): JSX.Element => {
  return (
    <ChatCompletion>
      <SystemMessage>{system}</SystemMessage>
      <UserMessage>{user}</UserMessage>
    </ChatCompletion>
  );
};

routes.post("/chat-stream", async (req, res) => {
  const { system = "", user = "" } = req.body || {};

  if (!user) {
    res.sendStatus(400);
    return;
  }

  const RequestEl = getRequestEl(user, system);

  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("Cache-Control", "no-cache");
  res.flushHeaders();

  const renderResult = createRenderContext().render(RequestEl, {
    stop: () => false,
    map: (x) => x,
  });

  const eventStream = toEventStream(renderResult)
    .pipeThrough(
      new TransformStream({
        transform(streamEvent, controller) {
          controller.enqueue(`data: ${JSON.stringify(streamEvent)}\n\n`);
        },
      })
    )
    .pipeThrough(new TextEncoderStream());

  pipeline(eventStream as any, res)
    .then(() => {
      res.end();
    })
    .catch((err) => {
      console.error(err);
      if (!res.headersSent) {
        res.sendStatus(500);
      }
      res.end();
    });
});

export default routes;

字符串

相关问题