pojo类型到tuple数据类型

p5fdfcr1  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(423)

我在javaflinkapi上创建了一个小实用程序来学习这些功能。我正在尝试读取csv文件并打印它,我已经为数据结构开发了一个pojo类。当我执行代码时,我看不到正确的值。如何Map属性的数据类型
我的主要课程:

  1. package org.karthick.flinkLab;
  2. import org.apache.flink.api.java.DataSet;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4. import javax.xml.crypto.Data;
  5. public class CSVFileRead {
  6. public static void main(String[] args) throws Exception {
  7. System.out.println("--CSV File Reader using Flink's Data Set API--");
  8. ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
  9. DataSet<DataModel> csvInput = execEnv.readCsvFile("C:\\Flink\\Data\\IndividualDetails.csv")
  10. .pojoType(DataModel.class);
  11. csvInput.print();
  12. }
  13. }

我的pojo类(datamodel.class)

  1. package org.karthick.flinkLab;
  2. import org.apache.flink.api.java.tuple.Tuple;
  3. import org.apache.flink.api.java.tuple.Tuple12;
  4. import java.io.Serializable;
  5. import java.util.Date;
  6. public class DataModel<T extends Tuple>
  7. extends Tuple12<Integer,String,Date,Integer,String,String,String,String,String,String,Date,String>
  8. implements Serializable
  9. {
  10. public Integer id;
  11. public String government_id;
  12. public Date diagnosed_date;
  13. public Integer age;
  14. public String detected_city;
  15. public String detected_district;
  16. public String detected_state;
  17. public String nationality;
  18. public String current_status;
  19. public Date status_change_date;
  20. public String notes;
  21. public DataModel() {};
  22. public String getNotes() {
  23. return notes;
  24. }
  25. public Date getStatus_change_date() {
  26. return status_change_date;
  27. }
  28. public String getCurrent_status() {
  29. return current_status;
  30. }
  31. public String getNationality() {
  32. return nationality;
  33. }
  34. public String getDetected_state() {
  35. return detected_state;
  36. }
  37. public String getDetected_district() {
  38. return detected_district;
  39. }
  40. public String getDetected_city() {
  41. return detected_city;
  42. }
  43. public String gender ;
  44. public Date getDiagnosed_date() {
  45. return diagnosed_date;
  46. }
  47. public String getGender() {
  48. return gender;
  49. }
  50. public Integer getAge() {
  51. return age;
  52. }
  53. public Integer getId() {
  54. return id;
  55. }
  56. public void setId(Integer id) {
  57. this.id = id;
  58. }
  59. public String getGovernment_id() {
  60. return government_id;
  61. }
  62. public void setGovernment_id(String government_id) {
  63. this.government_id = government_id;
  64. }
  65. }

当我执行main方法时,我看不到正确的值。样品结果

  1. (0,,Tue May 19 16:50:38 IST 2020,0,,,,,,,Tue May 19 16:50:38 IST 2020,)

像我期望的那样

  1. (2777,AP,Tue May 19 16:50:38 IST 2020,0,A,B,C,D,E,F,Tue May 19 16:50:38 IST 2020,G)

这里可能少了什么?

b0zn9rqh

b0zn9rqh1#

缺少从csv到pojo的列Map。添加Map将起作用。列名的Map必须遵循以下两个规则:
列名应该与pojo中的列名完全相同。
Map中的列顺序应与csv文件中的列顺序完全相同。
可以按以下方式定义Map:

  1. DataSet<DataModel> csvInput = execEnv.readCsvFile("C:\\Flink\\Data\\IndividualDetails.csv")
  2. .pojoType(DataModel.class, "id", "age",.........);

它应该抛出错误,但它没有。可能是虫子

相关问题