如何在jython evaluator中获取streamsets记录字段类型

czq61nw1  于 2021-06-24  发布在  Kudu
关注(0)|答案(2)|浏览(540)

我有一个streamsets管道,在这里我使用jdbc组件从远程sqlserver数据库读取数据,并将数据放入一个hive和一个kudu数据湖。
我在类型二进制列方面遇到了一些问题,因为impala中不支持二进制类型,我使用它来访问hive和kudu。
我决定将二进制类型的列(在管道中作为byte\u数组类型流动)转换为string并这样插入。
我尝试使用字段类型转换器元素将所有字节数组类型转换为字符串,但没有成功。所以我使用jython组件将所有arr.arr类型转换为字符串。它工作得很好,直到我在该字段上得到一个空值,所以jython类型是none.type,我无法检测byte\u数组类型,也无法将其转换为string。所以我不能把它插入Kudu。
如何在jython evaluator中获取streamsets记录字段类型有什么帮助吗?或者有什么建议来解决我面临的问题?

fjaof16o

fjaof16o1#

我的最终解决方案是:
您可以使用下面的逻辑来检测jython组件中的任何streamsets类型,方法是使用null\ u常量:

NULL_BOOLEAN, NULL_CHAR, NULL_BYTE, NULL_SHORT, NULL_INTEGER, NULL_LONG, 
NULL_FLOAT, NULL_DOUBLE, NULL_DATE, NULL_DATETIME, NULL_TIME, NULL_DECIMAL, 
NULL_BYTE_ARRAY, NULL_STRING, NULL_LIST, NULL_MAP

其思想是将字段的值保存在一个temp变量中,将字段的值设置为none,并使用函数sdcfunctions.getfieldnull通过将其与一个null\u常量进行比较来了解streamsets类型。

import binascii

def toByteArrayToHexString(value):
  if value is None:
    return NULL_STRING
  value = '0x'+binascii.hexlify(value).upper()
  return value

for record in records:
  try:

    for colName,value in record.value.items():
      temp = record.value[colName]
      record.value[colName] = None
      if sdcFunctions.getFieldNull(record,'/'+colName) is NULL_BYTE_ARRAY:
        temp = toByteArrayToHexString(temp)
      record.value[colName] = temp

    output.write(record)
  except Exception as e
    error.write(record, str(e))

限制:上面的代码仅当日期类型有值时(当其不为null时)才将其转换为日期时间类型

drkbr07n

drkbr07n2#

你需要使用 sdcFunctions.getFieldNull() 测试字段是否 NULL_BYTE_ARRAY . 例如:

import array

def convert(item):
  return ':-)'

def is_byte_array(record, k, v):
  # getFieldNull expect a field path, so we need to prepend the '/'
  return (sdcFunctions.getFieldNull(record, '/'+k) == NULL_BYTE_ARRAY 
          or (type(v) == array.array and v.typecode == 'b'))

for record in records:
  try:
    record.value = {k: convert(v) if is_byte_array(record, k, v) else v 
                    for k, v in record.value.items()}
    output.write(record)

  except Exception as e:
    error.write(record, str(e))

相关问题