我正在尝试从sqlserver读取一个表,并在读取时应用分区。在读取数据之前,我想得到lowerbound和upperbound的界限,如下所示。
boundsDF = spark.read.format('jdbc')
.option('url', 'url')
.option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver')
.option('user', username)
.option('password', password)
.option('dbtable', f'(select min(updated_datetime) as mint, max(updated_datetime) as maxt from tablename)
.load()
我从boundsdf中提取了如下值:
maxdate = [x["maxt"] for x in boundsDF.rdd.collect()]
mindate = [x["mint"] for x in boundsDF.rdd.collect()]
这就是我在阅读时指定时间戳列的方式:
dataframe = spark.read.format('jdbc')
.option('url', url)
.option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver')
.option('user', user)
.option('password', password)
.option('dbtable', tablename)
.option('partitionColumn', timestamp_column)
.option('numPartitions', 3)
.option('lowerBound', mindate[0])
.option('upperBound', maxdate[0])
.option('fetchsize', 5000)
.load()
如果我打印mindate和maxdate的值,下面是它们的样子:
mindate[0]: datetime.datetime(2010, 10, 4, 11, 54, 13, 543000)
maxdate[0]: datetime.datetime(2021, 3, 5, 17, 59, 45, 880000)
当我打印时 dataframe.count()
,我看到一条异常消息,如下所示。例外情况:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 18.0 failed 1 times, most recent failure: Lost task 2.0 in stage 18.0 (TID 21, executor driver): com.microsoft.sqlserver.jdbc.SQLServerException: Conversion failed when converting date and/or time from character string.
自从我开始使用spark以来,我一直使用整数列作为分区列。这是我第一次使用timestamp列对数据进行分区。
mindate[0]和maxdate[0]的格式在我的read语句中是否正确?有人能告诉我我是否以正确的方式实现了代码吗?
1条答案
按热度按时间bpsygsoo1#
问题是在sql表中使用什么数据类型?
时间戳不是日期时间数据类型。它是一个内部行版本号(二进制),与时态数据无关
datetime是date+time的旧数据类型,第二个数据类型的小数点限制为3位
datetime2取代了datetime,是用于date+time的新数据类型,并且有一个限制,您可以选择0到7个小数位作为第二个小数位
现在说两句:
如果您使用timestamp,则将其替换为带有所需pr的datetime2é割礼(默认为7)。
如果您使用datetime并且不想将其替换为datetime2,则只能为第二位的小数部分指定3位数字,但我在代码中看到的是mindate[0]:datetime.datetime(2010,10,4,11,54,13,543000)
datetime2比datetime更精确,datetime限制为3毫秒,这会导致某些查询的解释错误