Unable to load DataFrame "dfTransformed" into a DB table (MS SQL)
import java.time.LocalDateTime
import java.util.TimeZone
import java.io.File
import com.typesafe.config._
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{BinaryType, DoubleType, LongType, MapType, StringType, StructField, StructType}
import org.apache.spark.sql.functions.{col, concat_ws, from_json, posexplode_outer, schema_of_json}
import java.nio.charset.StandardCharsets
import org.apache.commons.lang.StringUtils.{left, substring}
import org.xml.sax.SAXParseException
import org.apache.spark.sql.{SparkSession, _}
val loadType = "Load.Azure.Synapse"
val loadLocation = "UbiStaging.CEC"
val sc = SparkContext.getOrCreate()
val spark = SparkSession.builder().getOrCreate()
TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
val environment = "Development"
var avroPath = ""
val containerPath = "..."
val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd hh:mm:ss.SSS")
var batchStartDate = LocalDateTime.now()
object Config {
val parsedConfig = ConfigFactory.parseFile(new File("./src/main/Resources/application.conf"))
//val parsedConfig = ConfigFactory.parseFile(new File("E:/Dev/sparkstream/src/main/Resources/application.conf"))
val conf = ConfigFactory.load(parsedConfig)
def apply() = conf.getConfig(environment)
}
avroPath = Config().getString("avrobasepath") + batchStartDate.getYear + "/" + f"${batchStartDate.getMonthValue}%02d" + "/" + f"${batchStartDate.getDayOfMonth}%02d" + "/" + f"${(batchStartDate.getHour)}%02d" + "/*/*"
val checkpointLocation = "C:\\SparkStreams\\CheckPoints\\" + avroPath.replace("*", "x").replace("/", "_").toLowerCase() + "_" + loadLocation.replace(".", "_").toLowerCase()
sc.hadoopConfiguration.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set(Config().getString("fsazureaccountname"), Config().getString("fsazureaccountkey"))
val avroSchema = new StructType().
add("SequenceNumber", LongType, false).
add("Offset", StringType, false).
add("EnqueuedTimeUtc", StringType, false).
add("SystemProperties", MapType(StringType, StructType(Array(StructField("member0", LongType, true), StructField("member1", DoubleType, true), StructField("member2", StringType, true), StructField("member3", BinaryType, true))), false), false).
add("Properties", MapType(StringType, StructType(Array(StructField("member0", LongType, true), StructField("member1", DoubleType, true), StructField("member2", StringType, true), StructField("member3", BinaryType, true))), true), false).
add("Body", BinaryType, true)
val dfRawStream = spark.readStream.
schema(avroSchema).
option("path", Config().getString("containerpath") + avroPath).
format("avro").
load().
select(col("Offset").cast("string"), col("SequenceNumber").cast("string"), col("EnqueuedTimeUtc").cast("string"), col("Body").cast("string"))
//****Transform***
import com.google.common.net.HostAndPort
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, date_format, from_json, udf, posexplode, unbase64, lit}
import org.apache.spark.sql.types.{ArrayType, LongType, StringType, StructType}
val bodySchema = new StructType().
add("AppName", StringType, true).
add("ClientIP", StringType, true).
add("CommandInput", StringType, true).
add("CommandName", StringType, true).
add("CommandOutput", StringType, true).
add("Country", StringType, true).
add("EnableIDTransformation", StringType, true).
add("EnableSearchFiltering", StringType, true).
add("FromCachedData", StringType, true).
add("IsACSupported", StringType, true).
add("IsInternalOnly", StringType, true).
add("Misc", StringType, true).
add("OemId", StringType, true).
add("OperationName", ArrayType(StringType, true), true).
add("ProcessEndTime", StringType, true).
add("ProcessStartTime", StringType, true).
add("ProductCode", StringType, true).
add("Region", StringType, true).
add("ResultCode", LongType, true).
add("TransformLocalIDOnly", StringType, true).
add("UserName", StringType, true).
add("Version", StringType, true)
val commandInputSchema = new StructType().add("inputJson", StringType, true)
val inputJsonSchema = new StructType().add("FormatVersion", StringType, true).add("PredictiveOptions", StringType, true).add("Devices", ArrayType(StringType, true), true)
val devicesSchema = new StructType().add("FormatVersion", StringType, true).add("DeviceID", StringType, true).add("Signatures", StringType, true).add("DefiniteAttributes", StringType, true).add("PotentialAttributes", StringType, true).add("PredictiveOptions", StringType, true)
val signaturesSchema = new StructType().add("CEC", ArrayType(StringType, true), true)
val cecSchema = new StructType().add("vendorid", StringType, true).add("osd", StringType, true).add("logicaladdress", StringType, true)
val fnGetPort: (String => String) = (ipAddress: String) => {HostAndPort.fromString(ipAddress).getPortOrDefault(0).toString()}
val fnGetHost: (String => String) = (ipAddress: String) => {HostAndPort.fromString(ipAddress).getHostText()}
val sfnGetPort = udf(fnGetPort)
val sfnGetHost = udf(fnGetHost)
val dfBodyStream = dfRawStream.
select("Offset", "EnqueuedTimeUtc", "SequenceNumber", "Body").withColumn("a", from_json(col("Body"), bodySchema)).
select("a.CommandName", "a.CommandInput", "a.CommandOutput", "Offset", "a.ClientIP", "a.ProcessStartTime", "a.ProcessEndTime", "SequenceNumber", "EnqueuedTimeUtc").
filter(col("CommandName") === "PredictiveDiscovery")
val dfDevicesStream = dfBodyStream.
withColumn("a", from_json(col("CommandInput"), commandInputSchema)).
withColumn("inputJsonDecoded", unbase64(col("a.inputJson"))).
withColumn("inputJsonDecodedString", col("inputJsonDecoded").cast(StringType)).
withColumn("b", from_json(col("inputJsonDecodedString"), inputJsonSchema)).
select(col("Offset"), col("ClientIP"), col("ProcessStartTime"), col("ProcessEndTime"), col("SequenceNumber"), col("EnqueuedTimeUtc"), posexplode(col("b.Devices"))).
withColumn("c", from_json(col("col"), devicesSchema)).
select("Offset", "ClientIP", "ProcessStartTime", "ProcessEndTime", "SequenceNumber", "EnqueuedTimeUtc", "c.Signatures")
val dfSignaturesStream = dfDevicesStream.
withColumn("a", from_json(col("Signatures"), signaturesSchema)).
select(col("Offset"), col("ClientIP"), col("ProcessStartTime"), col("ProcessEndTime"), col("SequenceNumber"), col("EnqueuedTimeUtc"), posexplode(col("a.CEC"))).filter(col("col").isNotNull).
withColumn("CEC", col("col")).withColumn("SeqNbr", col("pos") + 1).
withColumn("CECDecoded", unbase64(col("col"))).
withColumn("CECDecodedString", col("CECDecoded").cast(StringType)).
withColumn("b", from_json(col("CECDecodedString"), cecSchema)).
withColumn("CECId", lit("hash_goes_here")).
withColumn("VendorIdParsed", lit("vendor_parsed_goes_here")).
withColumn("OSDParsed", lit("osd_parsed_goes_here")).
withColumn("HEX", lit("hex_goes_here")).
withColumn("OSD", col("b.osd")).
withColumn("VendorId", col("b.vendorid")).
withColumn("LogicalAddress", col("b.logicaladdress")).
select("Offset", "ProcessStartTime", "ProcessEndTime", "ClientIP", "SequenceNumber", "EnqueuedTimeUtc", "CECId", "CEC", "VendorId", "VendorIdParsed", "OSD", "OSDParsed", "HEX", "LogicalAddress", "SeqNbr")
val dfTransformed = dfSignaturesStream.
withColumn("IP", sfnGetHost(col("ClientIP"))).
withColumn("Port", sfnGetPort(col("ClientIP"))).
withColumn("ProcessStartDateUt", date_format(col("ProcessStartTime"), "yyyyMMdd").cast("integer")).
withColumn("ProcessStartTimeUt", date_format(col("ProcessStartTime"), "HH:mm:ss.S")).
withColumn("ProcessEndDateUt", date_format(col("ProcessEndTime"), "yyyyMMdd").cast("integer")).
withColumn("ProcessEndTimeUt", date_format(col("ProcessEndTime"), "HH:mm:ss.S")).
withColumn("EnqueuedTimeUt", col("EnqueuedTimeUtc")).as("a").
select("Offset", "a.ProcessStartDateUt", "a.ProcessStartTimeUt", "a.ProcessEndDateUt", "a.ProcessEndTimeUt", "a.IP", "a.Port", "CECId", "CEC", "VendorId", "VendorIdParsed", "LogicalAddress", "OSD", "OSDParsed", "HEX", "SeqNbr", "SequenceNumber", "EnqueuedTimeUt").na.fill("Unknown").na.fill(-1)
//.filter(col("Offset") === "4900106716867360")
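To isolate whether the problem is in the transformation or in the SQL sink, the same DataFrame can also be attached to a console sink. This is only a debugging sketch (the query name "debug_console" and the row limit are arbitrary choices, not part of the job):
// Debugging sketch only: print a few rows of dfTransformed per trigger to the
// driver console, so the SQL sink can be ruled in or out as the failure point.
val debugQuery = dfTransformed.writeStream.
  queryName("debug_console").
  outputMode("append").
  format("console").
  option("truncate", "false").
  option("numRows", 20).
  trigger(Trigger.ProcessingTime("2 minutes")).
  start()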
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
val query = dfTransformed.coalesce(1).writeStream.
queryName(checkpointLocation).
option("checkpointLocation", checkpointLocation).
outputMode("append").
trigger(Trigger.ProcessingTime("2 minutes")).
foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format("com.microsoft.sqlserver.jdbc.spark").mode("append").
    option("batchsize", 1000000).
    option("mssqlIsolationLevel", "READ_UNCOMMITTED").
    option("url", Config().getString("databaseurl") + ";databaseName=" + Config().getString("databasename") + ";applicationName=" + checkpointLocation).
    option("dbtable", "[" + loadLocation.replace(".", "].[") + "]").
    option("user", Config().getString("databaseusername")).
    option("password", Config().getString("databasepassword")).
    save()
}.start()
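As a cross-check on the sink itself, the same per-batch write can also go through Spark's built-in jdbc data source instead of com.microsoft.sqlserver.jdbc.spark. The following is a minimal sketch, assuming the mssql-jdbc driver (com.microsoft.sqlserver.jdbc.SQLServerDriver) is on the classpath; the "_jdbc" checkpoint suffix is only there to keep its state separate from the main query:
// Fallback sketch: identical stream, but the per-batch write uses Spark's generic
// JDBC sink. Assumes the mssql-jdbc driver jar is on the classpath; the "_jdbc"
// checkpoint suffix is arbitrary.
val queryJdbc = dfTransformed.coalesce(1).writeStream.
  option("checkpointLocation", checkpointLocation + "_jdbc").
  outputMode("append").
  trigger(Trigger.ProcessingTime("2 minutes")).
  foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write.format("jdbc").mode("append").
      option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").
      option("url", Config().getString("databaseurl") + ";databaseName=" + Config().getString("databasename")).
      option("dbtable", "[" + loadLocation.replace(".", "].[") + "]").
      option("user", Config().getString("databaseusername")).
      option("password", Config().getString("databasepassword")).
      save()
  }.start()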
Load command: :load ./src/test/scala/file_name.scala
Spark-shell launch command: spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.1 --packages com.databricks:spark-xml_2.12:0.10.0 --conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true --conf spark.executor.memory=2g --conf spark.dynamicAllocation.maxExecutors=1 --jars c:\bigdatalocalsetup\spark3.0\jars\hadoop-azure-3.3.0.jar,c:\bigdatalocalsetup\spark3.0\jars\azure-storage-8.6.jar,c:\bigdatalocalsetup\spark3.0\jars\jetty-util-ajax-9.4.20.v20190813.jar,c:\bigdatalocalsetup\spark3.0\jetty-util-9.4.20.v20190813.jar,c:\bigdatalocalsetup\spark3.0\jars\config-1.4.jar
IDE: VS Code