如何使用DolphinDB C++ API提供的MultithreadedTableWriter将CSV文件导入分区表?

xu3bshqb  于 2023-09-27  发布在  其他
关注(0)|答案(1)|浏览(126)

我有一个CSV文件,该文件的前10条记录如下:

time    device_id   battery_level   battery_status  battery_temperature bssid   cpu_avg_1min    cpu_avg_5min    cpu_avg_15min   mem_free    mem_used    rssi    ssid
2016.11.15T07:00:00 demo000029  48  discharging 90.3    01:02:03:04:05:06   7.42    7.004   7.1213  460,781,455 539,218,545 (58)    demo-net
2016.11.15T07:00:00 demo000034  31  discharging 88.9    01:02:03:04:05:06   5.36    7.152   7.6373  649,945,465 350,054,535 (58)    demo-net
2016.11.15T07:00:00 demo000037  75  discharging 90.7    A0:B1:C5:D2:E0:F3   7.26    6.572   6.644   420,956,932 579,043,068 (49)    stealth-net
2016.11.15T07:00:00 demo000059  78  discharging 89.9    A0:B1:C5:D2:E0:F3   24.35   9.91    7.69    559,660,292 440,339,708 (48)    stealth-net
2016.11.15T07:00:00 demo000063  47  discharging 89.8    22:32:A2:B3:05:98   30.23   13.166  10.5087 730,143,821 269,856,179 (41)    demo-5ghz
2016.11.15T07:00:00 demo000084  56  discharging 92.4    A0:B1:C5:D2:E0:F3   6.97    8.354   8.7713  490,555,023 509,444,977 (44)    stealth-net
2016.11.15T07:00:00 demo000087  85  discharging 88.5    22:32:A2:B3:05:98   28.13   10.186  7.382   669,735,198 330,264,802 (47)    demo-5ghz
2016.11.15T07:00:00 demo000096  92  discharging 93  A0:B1:C5:D2:E0:F3   28.28   11.416  8.792   569,078,358 430,921,642 (66)    stealth-net
2016.11.15T07:00:00 demo000099  53  discharging 87.9    22:32:A2:B3:05:98   8.98    8.196   8.252   449,422,871 550,577,129 (57)    demo-5ghz
2016.11.15T07:00:00 demo000101  72  discharging 92.3    A0:B1:C5:D2:E0:F3   96.74   26.228  14.6627 619,974,710 380,025,290 (57)    stealth-net

下面是创建数据库和表的脚本:

// Log in as the admin user.
login(`admin, `123456)
// Start from a clean slate: drop dfs://iot if it already exists (this destroys its data).
if (exists('dfs://iot') ) dropDatabase('dfs://iot')

// Level-1 partitioning: VALUE partitions, one per date from 2016.11.15 to 2016.11.18.
// NOTE(review): rows whose date falls outside this range are presumably rejected unless the
// server is configured to add new value partitions automatically — confirm before loading other dates.
db1 = database('',VALUE,2016.11.15..2016.11.18)
// Level-2 partitioning: HASH partitions on SYMBOL values into 10 buckets.
db2 = database('',HASH,[SYMBOL,10])
// Composite (COMPO) database dfs://iot built from the two levels above.
db = database('dfs://iot',COMPO,[db1,db2])

// Empty table (capacity 1, size 0) that only defines the 13-column schema of the CSV file.
// The C++ writer must insert values in this column order with compatible types.
schema=table(1:0,`time`device_id`battery_level`battery_status`battery_temperature`bssid`cpu_avg_1min`cpu_avg_5min`cpu_avg_15min`mem_free`mem_used`rssi`ssid,
 [DATETIME,SYMBOL,INT,SYMBOL,DOUBLE,SYMBOL,DOUBLE,DOUBLE,DOUBLE,LONG,LONG,SHORT,SYMBOL])
 // Create the partitioned table dfs://iot/readings, partitioned by time (level 1) and device_id (level 2).
 db.createPartitionedTable(schema,`readings,`time`device_id)

我想使用DolphinDB C++ API提供的MultithreadedTableWriter将CSV文件导入分区表。我如何才能做到这一点?

rt4zxlrg

rt4zxlrg1#

您可以使用开源的 C++ CSV 解析库 rapidcsv 读取文件,然后使用MultithreadedTableWriter将数据写入分区表。下面是一个例子:

#include "MultithreadedTableWriter.h"
#include "DolphinDB.h"
#include "Util.h"
#include "rapidcsv.h"

#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

using namespace std;
using namespace dolphindb;

/// Converts a fixed-width timestamp string such as "2016.11.15T07:00:00"
/// into a DolphinDB DATETIME scalar via Util::createDateTime.
///
/// Digits are read at fixed offsets: yyyy at 0, MM at 5, dd at 8, HH at 11,
/// mm at 14, ss at 17. The separator characters in between are not checked,
/// so "2016-11-15 07:00:00" is accepted as well. Characters after offset 18
/// are ignored.
///
/// @param str  timestamp text, at least 19 characters long.
/// @return     the scalar created by Util::createDateTime (raw pointer,
///             ownership passes to the caller).
/// @throws std::invalid_argument if str is shorter than 19 characters or a
///         digit field contains a non-digit character, instead of silently
///         producing a wrong timestamp.
Constant *createDateTime(const std::string &str) {
    if (str.size() < 19) {
        throw std::invalid_argument("createDateTime: expected \"yyyy.MM.ddTHH:mm:ss\", got \"" + str + "\"");
    }
    // Parses str[pos, pos + len) as an unsigned decimal number, rejecting
    // anything that is not a digit (atoi would silently return 0 / a prefix).
    auto digits = [&str](std::size_t pos, std::size_t len) {
        int value = 0;
        for (std::size_t i = pos; i < pos + len; ++i) {
            const char c = str[i];
            if (c < '0' || c > '9') {
                throw std::invalid_argument("createDateTime: non-digit character in \"" + str + "\"");
            }
            value = value * 10 + (c - '0');
        }
        return value;
    };
    return Util::createDateTime(digits(0, 4), digits(5, 2), digits(8, 2),
                                digits(11, 2), digits(14, 2), digits(17, 2));
}

int main(int argc, char *argv[]) {
    DBConnection::initialize();
    DBConnection conn;
    string host = "127.0.0.1";
    int port = 8848;
    string userId = "admin";
    string password = "123456";

    string path = "d:/data/devices_big/devices_big_readings.csv";

    try {
        vector<COMPRESS_METHOD> compress;
        compress.push_back(COMPRESS_DELTA);
        compress.push_back(COMPRESS_LZ4);
        compress.push_back(COMPRESS_DELTA);
        compress.push_back(COMPRESS_LZ4);
        compress.push_back(COMPRESS_LZ4);
        compress.push_back(COMPRESS_LZ4);
        compress.push_back(COMPRESS_LZ4);
        compress.push_back(COMPRESS_LZ4);
        compress.push_back(COMPRESS_LZ4);
        compress.push_back(COMPRESS_DELTA);
        compress.push_back(COMPRESS_DELTA);
        compress.push_back(COMPRESS_LZ4);
        compress.push_back(COMPRESS_LZ4);

        MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "dfs://iot", "readings",
            false, false, NULL, 10000, 1, 10, "device_id", &compress);

        rapidcsv::Document doc(path, rapidcsv::LabelParams(-1, -1));
        std::vector<string> time = doc.GetColumn<string>(0);
        std::vector<string> device_id = doc.GetColumn<string>(1);
        std::vector<int> battery_level = doc.GetColumn<int>(2);
        std::vector<string> battery_status = doc.GetColumn<string>(3);
        std::vector<double> battery_temperature = doc.GetColumn<double>(4);
        std::vector<string> bssid = doc.GetColumn<string>(5);
        std::vector<double> cpu_avg_1min = doc.GetColumn<double>(6);
        std::vector<double> cpu_avg_5min = doc.GetColumn<double>(7);
        std::vector<double> cpu_avg_15min = doc.GetColumn<double>(8);
        std::vector<long long> mem_free = doc.GetColumn<long long>(9);
        std::vector<long long> mem_used = doc.GetColumn<long long>(10);
        std::vector<int> rssi = doc.GetColumn<int>(11);
        std::vector<string> ssid = doc.GetColumn<string>(12);

        int rowNum = time.size();

        ErrorCodeInfo errorInfo;

        long long startTime = Util::getEpochTime();
        
        for (int i = 0; i < rowNum; i++) {
            if (writer.insert(errorInfo, 
                createDateTime(time[i]),
                device_id[i],
                battery_level[i],
                battery_status[i],
                battery_temperature[i],
                bssid[i],
                cpu_avg_1min[i],
                cpu_avg_5min[i],
                cpu_avg_15min[i],
                mem_free[i],
                mem_used[i],
                (short)rssi[i],
                ssid[i]
                ) == false) {
                //This part will not be executed.
                cout << "insert failed: " << errorInfo.errorInfo << endl;
                break;
            }
                
        }
        // Check the current status of MTW
        MultithreadedTableWriter::Status status;
        writer.getStatus(status);
        if (status.hasError()) {
            cout << "error in writing: " << status.errorInfo << endl;
        }
        writer.waitForThreadCompletion();
        // Check whether the tasks have been completed
        writer.getStatus(status);
        if (status.hasError()) {
            cout << "error after write complete: " << status.errorInfo << endl;
            // Obtain unwritten data
            std::vector<std::vector<ConstantSP>*> unwrittenData;
            writer.getUnwrittenData(unwrittenData);
            cout << "unwriterdata length " << unwrittenData.size() << endl;
        }
                     
        cout << "Insert Time " << Util::getEpochTime() - startTime << " ms" << endl;
        //Check the final result
        cout << conn.run("select count(*) from pt")->getString() << endl;

    }
    catch (std::exception &e) {
        cerr << "Failed to insert table, with exception: " << e.what() << endl;
    }
}

相关问题