在fast csv中异步调用存储过程

cbjzeqam  于 2021-06-23  发布在  Mysql
关注(0)|答案(2)|浏览(509)

我正在node.js中编写一个后端api,需要用户能够上传包含数据的文件,然后调用存储过程将数据插入mysql。我正在考虑使用fastcsv作为解析器,但是我正在努力解决如何在csv流中设置对存储过程的调用。这个想法是这样的:

  1. var fs = require("fs");
  2. var csv = require("fast-csv");
  3. var stream1 = fs.createReadStream("files/testCsvFile.csv");
  4. csv
  5. .fromStream(stream2, { headers: true })
  6. .on("data", function(data) {
  7. //CALL TO SP with params from "data"//
  8. numlines++;
  9. })
  10. .on("end", function() {
  11. console.log("done");
  12. });

在应用程序的其他部分,我设置了如下路线:

  1. auth.post("/verified", async (req, res) => {
  2. var user = req.session.passwordless;
  3. if (user) {
  4. const rawCredentials = await admin.raw(getUserRoleCredentials(user));
  5. const { user_end, role } = await normalizeCredentials(rawCredentials);
  6. const user_data = { user_end, role };
  7. res.send(user_data);
  8. } else {
  9. res.sendStatus(401);
  10. }
  11. });

..也就是说-路由是以异步/等待方式编写的,查询(都是被调用的存储过程)被定义为承诺。。我想在upload/parse csv/call sp for every line函数中遵循这种模式

whhtz7ly

whhtz7ly1#

这是为我做的工作-你能描述一下如何用你的框架实现它吗-我相信应该有办法,我只需要正确地配置它

  1. //use fast-csv to stream data from a file
  2. csv
  3. .fromPath(form.FileName, { headers: true })
  4. .on("data", async data => {
  5. const query = await queryBuilder({
  6. schema,
  7. routine,
  8. parameters,
  9. request
  10. }); //here we prepare query for calling the SP with parameters from data
  11. winston.info(query + JSON.stringify(data));
  12. const rawResponse = await session.raw(query); //here the query gets executed
  13. fileRows.push(data); // push each row - for testing only
  14. })
  15. .on("end", function() {
  16. console.log(fileRows);
  17. fs.unlinkSync(form.FileName); // remove temp file
  18. //process "fileRows" and respond
  19. res.end(JSON.stringify(fileRows)) // - for testing
  20. });
展开查看全部
2jcobegt

2jcobegt2#

正如在评论中提到的,我让我的超燃冲压发动机轻松处理这样一个用例。。。请纠正我,如果我理解错了,但我知道你想调用两个等待行为每个csv行在测试中。
如果是这样,您的代码将如下所示(更新以匹配您的注解/答案):

  1. var fs = require("fs");
  2. var csv = require("fast-csv");
  3. var stream1 = fs.createReadStream("files/testCsvFile.csv");
  4. var {DataStream} = require("scramjet");
  5. DataStream
  6. // the following line will convert any stream to scramjet.DataStream
  7. .from(csv.fromStream(stream2, { headers: true }))
  8. // the next lines controls how many simultaneous operations are made
  9. // I assumed 16, but if you're fine with 40 or you want 1 - go for it.
  10. .setOptions({maxParallel: 16})
  11. // the next line will call your async function and wait until it's completed
  12. // and control the back-pressure of the stream
  13. .do(async (data) => {
  14. const query = await queryBuilder({
  15. schema,
  16. routine,
  17. parameters,
  18. request
  19. }); //here we prepare query for calling the SP with parameters from data
  20. winston.info(query + JSON.stringify(data));
  21. const rawResponse = await session.raw(query); //here the query gets executed
  22. return data; // push each row - for testing only)
  23. })
  24. // next line will run the stream until end and return a promise
  25. .toArray()
  26. .then(fileRows => {
  27. console.log(fileRows);
  28. fs.unlinkSync(form.FileName); // remove temp file
  29. //process "fileRows" and respond
  30. res.end(JSON.stringify(fileRows)); // - for testing
  31. })
  32. .catch(e => {
  33. res.writeHead(500); // some error handling
  34. res.end(e.message);
  35. })
  36. ;
  37. // you may want to put an await statement before this, or call then to check
  38. // for errors, which I assume is your use case.
  39. ;

回答您的评论问题-如果您在 on("data") 事件-您需要创建一个promises数组并等待promise。所有这些数组都在流中 end -但这需要同步完成,所以事件处理程序中的异步函数不能做到这一点。
scramjet 这发生在引擎盖下,所以你可以使用这个功能。

展开查看全部

相关问题