springboot:整合TDengine

x33g5p2x  于2022-04-15 转载在 Spring  
字(13.8k)|赞(0)|评价(0)|浏览(2463)

springboot:整合TDengine

环境准备

服务端(ubuntu 20.04):TDengine-server:2.4.0.5

客户端(windows 10):TDengine-client:2.4.0.5

依赖:taos-jdbcdriver:2.0.34

springboot:spring-boot.version>2.3.7.RELEASE

JDBC-JNI方式

准备

  1. Linux或Windows操作系统
  2. Java 1.8以上运行时环境
  3. TDengine-client(使用JDBC-JNI时必须,使用JDBC-RESTful时非必须)
    注意:在 Windows 环境开发时需要安装 TDengine 对应的 windows 客户端

依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.taosdata.jdbc</groupId>
            <artifactId>taos-jdbcdriver</artifactId>
            <version>2.0.34</version>
        </dependency>

        <!-- MySQL的JDBC数据库驱动 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.34</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.17</version>
        </dependency>

        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper</artifactId>
            <version>4.1.0</version>
        </dependency>

        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>3.8.1</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

注意:这里taos-jdbcdriver的版本可以参考下面的参数

taos-jdbcdriver 版本TDengine 2.0.x.x 版本TDengine 2.2.x.x 版本TDengine 2.4.x.x 版本JDK 版本
2.0.38XX2.4.0.14 及以上1.8.x
2.0.37XX2.4.0.6 及以上1.8.x
2.0.36X2.2.2.11 及以上2.4.0.0 - 2.4.0.51.8.x
2.0.35X2.2.2.11 及以上2.3.0.0 - 2.4.0.51.8.x
2.0.33 - 2.0.342.0.3.0 及以上2.2.0.0 及以上2.4.0.0 - 2.4.0.51.8.x
2.0.31 - 2.0.322.1.3.0 - 2.1.7.7XX1.8.x
2.0.22 - 2.0.302.0.18.0 - 2.1.2.1XX1.8.x
2.0.12 - 2.0.212.0.8.0 - 2.0.17.4XX1.8.x
2.0.4 - 2.0.112.0.0.0 - 2.0.7.3XX1.8.x

实体类

@Data
public class Temperature {

    private Timestamp ts;
    private float temperature;
    private String location;
    private int tbIndex;
}

TDengine 类型对应Java类型

TDengine 目前支持时间戳、数字、字符、布尔类型,与 Java 对应类型转换如下

TDengine DataTypeJDBCType (driver 版本 < 2.0.24)JDBCType (driver 版本 >= 2.0.24)
TIMESTAMPjava.lang.Longjava.sql.Timestamp
INTjava.lang.Integerjava.lang.Integer
BIGINTjava.lang.Longjava.lang.Long
FLOATjava.lang.Floatjava.lang.Float
DOUBLEjava.lang.Doublejava.lang.Double
SMALLINTjava.lang.Shortjava.lang.Short
TINYINTjava.lang.Bytejava.lang.Byte
BOOLjava.lang.Booleanjava.lang.Boolean
BINARYjava.lang.Stringbyte array
NCHARjava.lang.Stringjava.lang.String
JSON-java.lang.String

注意:JSON类型仅在tag中支持

Mapper

@Repository
@Mapper
public interface TemperatureMapper{

    @Update("CREATE TABLE if not exists temperature(ts timestamp, temperature float) tags(location nchar(64), tbIndex int)")
    int createSuperTable();

    @Update("create table #{tbName} using temperature tags( #{location}, #{tbindex})")
    int createTable(@Param("tbName") String tbName, @Param("location") String location, @Param("tbindex") int tbindex);

    @Update("drop table if exists temperature")
    void dropSuperTable();

    @Insert("insert into t${tbIndex}(ts, temperature) values(#{ts}, #{temperature})")
    int insertOne(Temperature one);

    @Select("select * from temperature where location = #{location}")
    List<Temperature> selectTemperatureByLocation(@Param("location") String location);

    @Select("select * from temperature")
    List<Temperature> selectAll();

    @Select("select count(*) from temperature where temperature = 0.5")
    int selectCount();

    @Update("create database if not exists test")
    void createDB();

    @Update("drop database if exists test")
    void dropDB();
}

配置类

@Configuration
@EnableTransactionManagement
@MapperScan(basePackages = {"com.yolo.springboottdengine.mapper"}, sqlSessionFactoryRef = "TDengineSqlSessionFactory")
public class TDengineConfiguration {

    @Bean(name = "TDengineDataSource")
    public DataSource tdengineDataSource() throws Exception {

        // 125  TDengine测试环境
        String taosHost = "127.0.0.1";
        String taosPort = "6030";
        String taosUsername = "root";
        String taosPassword = "root";
        String taosDB = "test";

        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setDriverClassName("com.taosdata.jdbc.TSDBDriver");
        dataSource.setUrl("jdbc:TAOS://" + taosHost + ":" + taosPort + "/" + taosDB
               + "?charset=UTF-8&locale=zh_CN.UTF-8&timezone=UTC-8");

        dataSource.setPassword(taosPassword);
        dataSource.setUsername(taosUsername);
        dataSource.setInitialSize(5);
        dataSource.setMinIdle(10);
        dataSource.setMaxActive(100);
        dataSource.setMaxWait(30000);
        dataSource.setValidationQuery("select server_status()");
        return dataSource;
    }

    @Bean(name = "TDengineTransactionManager")
    public DataSourceTransactionManager tdengineTransactionManager() throws Exception {
        return new DataSourceTransactionManager(tdengineDataSource());
    }

    @Bean(name = "TDengineSqlSessionFactory")
    public SqlSessionFactory sqlSessionFactory(@Qualifier("TDengineDataSource") DataSource dataSource, PageHelper pageHelper) throws Exception {
        final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setDataSource(dataSource);
        sessionFactory.setPlugins(pageHelper);
        return sessionFactory.getObject();
    }

    @Bean
    public SqlSessionTemplate sqlSessionTemplate(@Qualifier("TDengineSqlSessionFactory")SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

    @Bean
    public PageHelper pageHelper() {
        PageHelper pageHelper = new PageHelper();
        Properties p = new Properties();
        p.setProperty("offsetAsPageNum", "true");
        p.setProperty("rowBoundsWithCount", "true");
        p.setProperty("reasonable", "true");
        p.setProperty("dialect", "mysql");
        pageHelper.setProperties(p);
        return pageHelper;
    }

}

测试类

@SpringBootTest
@RunWith(SpringRunner.class)
public class TemperatureTest {

    private static final Random random = new Random(System.currentTimeMillis());
    private static final String[] locations = {"北京", "上海", "深圳", "广州", "杭州"};

    @Autowired
    private TemperatureMapper temperatureMapper;

    @Test
    public void createDatabase(){
        temperatureMapper.dropDB();
        temperatureMapper.createDB();
    }

    @Test
    public void init() {
        temperatureMapper.dropSuperTable();
        // create table temperature
        temperatureMapper.createSuperTable();
        // create table t_X using temperature
        for (int i = 0; i < 10; i++) {
            temperatureMapper.createTable("t" + i, locations[random.nextInt(locations.length)], i);
        }
        // insert into table
        int affectRows = 0;
        // insert 10 tables
        for (int i = 0; i < 10; i++) {
            // each table insert 5 rows
            for (int j = 0; j < 5; j++) {
                Temperature one = new Temperature();
                one.setTs(new Timestamp(System.currentTimeMillis()));
                one.setTemperature(random.nextFloat() * 50);
                one.setLocation("望京");
                one.setTbIndex(i);
                affectRows += temperatureMapper.insertOne(one);
            }
        }
        Assert.assertEquals(50, affectRows);
    }

    /**
     * 根据名称查询
     */
    @Test
    public void testSelectByLocation() {
        List<Temperature> temperatureList = temperatureMapper.selectTemperatureByLocation("广州");
        System.out.println(temperatureList);
    }

    /**
     * 查询所有
     */
    @Test
    public void testSelectAll() {
        List<Temperature> temperatures = temperatureMapper.selectAll();
        System.out.println(temperatures.size());
    }

    /**
     * 插入数据
     */
    @Test
    public void testInsert() {
        //时间一样的时候,数据不会发现改变   1604995200000
        Temperature one = new Temperature();
        one.setTs(new Timestamp(1604995222224L));
        one.setTemperature(1.2f);
        int i = temperatureMapper.insertOne(one);
        System.out.println(i);
    }

    /**
     * 查询数量
     */
    @Test
    public void testSelectCount() {
        int count = temperatureMapper.selectCount();
        System.out.println(count);
    }

    /**
     * 分页查询
     */
    @Test
    public void testPage() {
        //查询之前,设置当前页和当前页的数量
        PageHelper.startPage(1, 2);
        List<Temperature> temperatureList = temperatureMapper.selectAll();
        //把查询结果放入到pageInfo对象中
        PageInfo<Temperature> pageInfo = new PageInfo<>(temperatureList);
        long total = pageInfo.getTotal();
        int pageNum = pageInfo.getPageNum();
        List<Temperature> list = pageInfo.getList();
        System.out.println("总数:" + total);
        System.out.println("页数:" + pageNum);
        System.out.println(list);

    }
}

RESTful方式

实体类

public class Weather {
//    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS", timezone = "GMT+8")
    private Timestamp ts;
    private Float temperature;
    private Float humidity;
    private String location;
    private String note;
    private int groupId;
    
    //省略构造方法和get/set方法
}

配置类

app.td.rest.url=http://127.0.0.1:6041/rest/sql
app.td.rest.basic=Basic cm9vd90
app.td.db=test

测试类

@SpringBootTest
@RunWith(SpringRunner.class)
public class WeatherTest {

    private static final Logger logger = LoggerFactory.getLogger(WeatherTest.class);

    @Value("${app.td.rest.url}")
    @NotBlank
    private String tdRestUrl;

    @Value("${app.td.rest.basic}")
    @NotBlank
    private String tdRestBasic;

    @Value("${app.td.db}")
    @NotBlank
    private String db;

    private final Random random = new Random(System.currentTimeMillis());
    private final String[] locations = {"北京", "上海", "广州", "深圳", "天津"};

    @Test
    public void createDB() {
        String sql = "create database if not exists test";
        String url = tdRestUrl;
        tdengineRestful(url, sql);
    }

    @Test
    public void dropDB() {
        String sql = "drop database if exists test";
        tdengineRestful2(tdRestUrl, sql);
    }

    @Test
    public void createSuperTable() {
        String sql = "create table if not exists test.weather (ts timestamp,temperature float,humidity float,note binary(64)) tags(location nchar(64), groupId int)";
        tdengineRestful2(tdRestUrl, sql);
    }

    @Test
    public void createTable() {
        String url = tdRestUrl + "/" + db;
        long ts = System.currentTimeMillis();
        long thirtySec = 1000 * 30;
        Weather weather = new Weather(new Timestamp(ts + (thirtySec)), 30 * random.nextFloat(), random.nextInt(100));
        weather.setLocation(locations[random.nextInt(locations.length)]);
        weather.setGroupId(1);
        weather.setNote("note-" + 1);
        //create table if not exists test.t#{groupId} using test.weather tags(#{location},#{groupId})
        StringBuilder sb = new StringBuilder();
        sb.append("create table if not exists test.t")
                .append(weather.getGroupId()).append(" ")
                .append("using test.weather tags(")
                .append("'").append(weather.getLocation()).append("'").append(",")
                .append(weather.getGroupId()).append(")");
        String s = sb.toString();

        tdengineRestful2(url, sb.toString());
    }

    @Test
    public void insertTable() {
        String url = tdRestUrl + "/" + db;
        long ts = System.currentTimeMillis();
        long thirtySec = 1000 * 30;

        for (int i = 0; i < 5; i++) {
            Weather weather = new Weather(new Timestamp(ts + (thirtySec * i)), 30 * random.nextFloat(), random.nextInt(100));
            weather.setLocation(locations[random.nextInt(locations.length)]);
            weather.setGroupId(1);
            weather.setNote("note-" + 1);
            //insert into test.t#{groupId} (ts, temperature, humidity, note)values (#{ts}, ${temperature}, ${humidity}, #{note})
            StringBuilder sb = new StringBuilder();
            sb.append("insert into test.t").append(weather.getGroupId()).append(" ")
                    .append("(ts, temperature, humidity, note)").append(" ")
                    .append("values (").append(weather.getTs().getTime()).append(",")
                    .append(weather.getTemperature()).append(",")
                    .append(weather.getHumidity()).append(",")
                    .append("'").append(weather.getNote()).append("'")
                    .append(")");
            String sql = sb.toString();
            tdengineRestful2(url,sql);
        }

    }

    @Test
    public void selectCount(){
        String url = tdRestUrl + "/" + db;
        String sql = "select count(*) from test.weather";
        TDengineRestfulInfo tDengineRestfulInfo = tdengineRestful2(url, sql);
        System.out.println(tDengineRestfulInfo);
    }

    @Test
    public void selectOne(){
        String url = tdRestUrl + "/" + db;
        String sql = "select * from test.weather where humidity = 13";
        TDengineRestfulInfo tDengineRestfulInfo = tdengineRestful2(url, sql);
        System.out.println(tDengineRestfulInfo);

    }

    @Test
    public void selectTbname(){
        String url = tdRestUrl + "/" + db;
        String sql = "select tbname from test.weather";
        TDengineRestfulInfo tDengineRestfulInfo = tdengineRestful2(url, sql);
        System.out.println(tDengineRestfulInfo);

    }

    @Test
    public void selectLastOne(){
        String url = tdRestUrl + "/" + db;
        String sql = "select last_row(*), location, groupid from test.weather";
        TDengineRestfulInfo tDengineRestfulInfo = tdengineRestful2(url, sql);
        System.out.println(tDengineRestfulInfo);
    }

    @Test
    public void selectAVG(){
        String url = tdRestUrl + "/" + db;
        String sql = "select avg(temperature), avg(humidity) from test.weather interval(1m)";
        TDengineRestfulInfo tDengineRestfulInfo = tdengineRestful2(url, sql);
        System.out.println(tDengineRestfulInfo);

    }

    @Test
    public void selectLimit(){
        String url = tdRestUrl + "/" + db;
        String sql = "select * from test.weather order by ts desc limit 3";
        TDengineRestfulInfo tDengineRestfulInfo = tdengineRestful2(url, sql);
        System.out.println(tDengineRestfulInfo);
    }


    public TDengineRestfulInfo tdengineRestful2(String url, String sql) {

        TDengineRestfulInfo tDengineRestfulInfo = null;

        // 获取默认配置 的OkHttpClient 对象
        OkHttpClient httpClient = new OkHttpClient.Builder().build();

        MediaType mediaType = okhttp3.MediaType.parse("application/json; charset=utf-8");
        RequestBody requestBody = RequestBody.create(mediaType, sql);

        Request request = new Request.Builder()
                .url(url)
                .addHeader("Authorization", tdRestBasic)
                .post(requestBody)
                .build();

        Response response = null;
        try {
            response = httpClient.newCall(request).execute();
            if (response.code() == HttpStatus.OK.value()) {
                if (response.body() != null) {
                    String s = response.body().string();
                    tDengineRestfulInfo = JSONUtil.toBean(s, TDengineRestfulInfo.class);
                }
            } else {
                logger.error("tdengineRestful 查询状态码异常,状态码是:" + response.code() + " ,异常消息是:" + response.message());
            }
        } catch (Exception e) {
            logger.error("tdengineRestful 查询出现错误:" + e);
        } finally {
            if (response != null) {
                response.close();
            }
        }
        return tDengineRestfulInfo;

    }
}

注意这里新增一条数据的时候ts,俩种方式,要注意引号
“ts”: 1626324781093
“ts”: “2021-07-19 14:53:01.093”

相关文章