我试着遵循以下指南:https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html 但我不明白为什么我大部分时间都不向控制台写入数据,为什么它会滥发执行线程日志?
我需要配置什么吗?这是我的密码:
SparkSession spark = SparkSession
.builder()
.appName("Testing")
.config("spark.master", "local")
.getOrCreate();
StructType recordSchema = new StructType()
.add("description", "string")
.add("location", "string")
.add("id", "string")
.add("title", "string")
.add("company", "string")
.add("place", "string")
.add("date", "string")
.add("senorityLevel", "string")
.add("function", "string")
.add("employmentType", "string")
.add("industries", "string");
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", "linkedin-producer")
.option("startingOffsets", "earliest")
.option("kafka.group.id","test")
.load();
StreamingQuery query = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.select(from_json(col("value").cast("string"), recordSchema).as("data"))
.select("data.*")
.writeStream()
.outputMode(OutputMode.Append())
.format("console")
.start();
try {
query.awaitTermination(10000);
} catch (StreamingQueryException e) {
e.printStackTrace();
}
有时我会在控制台中获取df,但我的控制台中充满了这样的内容:
[Executor task launch worker for task 1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.7.0
[Executor task launch worker for task 1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 448719dc99a19793
[Executor task launch worker for task 1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1613492229792
[Executor task launch worker for task 1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-3, groupId=test] Subscribed to partition(s): linkedin-producer-0
[Executor task launch worker for task 1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-3, groupId=test] Seeking to offset 0 for partition linkedin-producer-0
[Executor task launch worker for task 1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-3, groupId=test] Cluster ID: N88wfukWTIS-ycMeSGhhng
[task-result-getter-0] INFO org.apache.spark.network.client.TransportClientFactory - Successfully created connection to /10.0.0.9:44237 after 76 ms (0 ms spent in bootstraps)
[Executor task launch worker for task 1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-3, groupId=test] Seeking to offset 500 for partition linkedin-producer-0
[task-result-getter-0] INFO org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 0.0 (TID 0) in 1069 ms on 10.0.0.9 (executor driver) (1/3)
[dispatcher-BlockManagerMaster] INFO org.apache.spark.storage.BlockManagerInfo - Removed taskresult_0 on 10.0.0.9:44237 in memory (size: 2.9 MiB, free: 848.4 MiB)
[Executor task launch worker for task 1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-3, groupId=test] Seeking to offset 909 for partition linkedin-producer-0
[Executor task launch worker for task 1] INFO org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask - Commit authorized for partition 1 (task 1, attempt 0, stage 0.0)
[Executor task launch worker for task 1] INFO org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask - Committed partition 1 (task 1, attempt 0, stage 0.0)
[Executor task launch worker for task 1] INFO org.apache.spark.storage.memory.MemoryStore - Block taskresult_1 stored as bytes in memory (estimated size 2.9 MiB, free 845.5 MiB)
[dispatcher-BlockManagerMaster] INFO org.apache.spark.storage.BlockManagerInfo - Added taskresult_1 in memory on 10.0.0.9:44237 (size: 2.9 MiB, free: 845.5 MiB)
[Executor task launch worker for task 1] INFO org.apache.spark.executor.Executor - Finished task 1.0 in stage 0.0 (TID 1). 3003495 bytes result sent via BlockManager)
[dispatcher-event-loop-1] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 2.0 in stage 0.0 (TID 2, 10.0.0.9, executor driver, partition 2, PROCESS_LOCAL, 8103 bytes)
[Executor task launch worker for task 2] INFO org.apache.spark.executor.Executor - Running task 2.0 in stage 0.0 (TID 2)
[task-result-getter-1] INFO org.apache.spark.scheduler.TaskSetManager - Finished task 1.0 in stage 0.0 (TID 1) in 304 ms on 10.0.0.9 (executor driver) (2/3)
[dispatcher-BlockManagerMaster] INFO org.apache.spark.storage.BlockManagerInfo - Removed taskresult_1 on 10.0.0.9:44237 in memory (size: 2.9 MiB, free: 848.4 MiB)
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [127.0.0.1:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-test-4
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = test
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
[Executor task launch worker for task 2] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.7.0
[Executor task launch worker for task 2] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 448719dc99a19793
[Executor task launch worker for task 2] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1613492230087
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-4, groupId=test] Subscribed to partition(s): linkedin-producer-2
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-4, groupId=test] Seeking to offset 0 for partition linkedin-producer-2
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-4, groupId=test] Cluster ID: N88wfukWTIS-ycMeSGhhng
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-4, groupId=test] Seeking to offset 500 for partition linkedin-producer-2
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-4, groupId=test] Seeking to offset 905 for partition linkedin-producer-2
[Executor task launch worker for task 2] INFO org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask - Commit authorized for partition 2 (task 2, attempt 0, stage 0.0)
[Executor task launch worker for task 2] INFO org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask - Committed partition 2 (task 2, attempt 0, stage 0.0)
[Executor task launch worker for task 2] INFO org.apache.spark.storage.memory.MemoryStore - Block taskresult_2 stored as bytes in memory (estimated size 2.9 MiB, free 845.5 MiB)
[dispatcher-BlockManagerMaster] INFO org.apache.spark.storage.BlockManagerInfo - Added taskresult_2 in memory on 10.0.0.9:44237 (size: 2.9 MiB, free: 845.5 MiB)
[Executor task launch worker for task 2] INFO org.apache.spark.executor.Executor - Finished task 2.0 in stage 0.0 (TID 2). 3001144 bytes result sent via BlockManager)
[task-result-getter-2] INFO org.apache.spark.scheduler.TaskSetManager - Finished task 2.0 in stage 0.0 (TID 2) in 240 ms on 10.0.0.9 (executor driver) (3/3)
[dispatcher-BlockManagerMaster] INFO org.apache.spark.storage.BlockManagerInfo - Removed taskresult_2 on 10.0.0.9:44237 in memory (size: 2.9 MiB, free: 848.4 MiB)
[task-result-getter-2] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 0.0, whose tasks have all completed, from pool
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - ResultStage 0 (start at Spark.java:73) finished in 1.730 s
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Killing all running tasks in stage 0: Stage finished
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.scheduler.DAGScheduler - Job 0 finished: start at Spark.java:73, took 1.768779 s
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec - Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@52eea1c3 is committing.
-------------------------------------------
Batch: 0
-------------------------------------------
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 29.841333 ms
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 30.563541 ms
+--------------------+--------+----------+--------------------+--------------------+--------------------+----------+----------------+--------------------+--------------+--------------------+
| description|location| id| title| company| place| date| senorityLevel| function|employmentType| industries|
+--------------------+--------+----------+--------------------+--------------------+--------------------+----------+----------------+--------------------+--------------+--------------------+
|Job Summary We ar...| Israel|2406654159| Retail Intern|Disney Media & En...|Tel Aviv, Tel Avi...|2021-02-11|Mid-Senior level|General Business,...| Full-time|Marketing and Adv...|
|We're looking for...| Israel|2404180635| Personal Assistant| Lemonade|Tel Aviv, Tel Avi...|2021-01-07| Entry level| Administrative| Full-time|Marketing and Adv...|
|Job Summary We ar...| Israel|2398561147|Retail intern -12...|The Walt Disney C...| Tel Aviv, Israel|2021-02-10| Internship| Marketing| Full-time| Entertainment|
|We're looking for...| Israel|2404180635| Personal Assistant| Lemonade|Tel Aviv, Tel Avi...|2021-01-07| Entry level| Administrative| Full-time|Marketing and Adv...|
|We're looking for...| Israel|2404180635| Personal Assistant| Lemonade|Tel Aviv, Tel Avi...|2021-01-07| Entry level| Administrative| Full-time|Marketing and Adv...|
|Job Summary We ar...| Israel|2406654159| Retail Intern|Disney Media & En...|Tel Aviv, Tel Avi...|2021-02-11|Mid-Senior level|General Business,...| Full-time|Marketing and Adv...|
|Job Summary We ar...| Israel|2398561147|Retail intern -12...|The Walt Disney C...| Tel Aviv, Israel|2021-02-10| Internship| Marketing| Full-time| Entertainment|
|At CrowdStrike we...| Israel|2406653801| HR Generalist| CrowdStrike|Ramat Gan, Tel Av...|2021-02-11| Associate| Human Resources| Full-time|Information Techn...|
|Job Description W...| Israel|2406699205|HR Administrator ...| Akamai Technologies|Tel Aviv, Tel Avi...|2021-02-11| Not Applicable| Human Resources| Full-time|Computer Networki...|
|JOB PURPOSE To as...| Israel|2403563715|Research, Campaig...|Amnesty Internati...|Jerusalem Municip...|2021-02-09| Entry level| Research| Contract|Nonprofit Organiz...|
|Job Description A...| Israel|2383126490|Receptionist – Pa...| Ceragon Networks|Tel Aviv, Tel Avi...|2021-02-01| Not Applicable| Administrative| Full-time|Computer Networki...|
|Fiverr is looking...| Israel|2419715658| Data Analyst| About Fiverr| Tel Aviv, Israel|2021-02-11|Mid-Senior level|Information Techn...| Full-time| Internet|
|חברת AlfaCloud - ...| Israel|2400094107| Project Manager|AlfaCloud - ERP S...| Tel Aviv, Israel|2021-02-11| Entry level|Project Managemen...| Full-time| Computer Software|
|טדי הפקות מחפשת א...| Israel|2396568054| Booking Agent| Tedy Productions| Tel Aviv, Israel|2021-02-09| Entry level|Design, Art/Creat...| Full-time| |
|The Norman Tel Av...| Israel|2418149015| Front Desk Staff| The Norman Tel Aviv| Tel Aviv, Israel|2021-02-10| Entry level| Administrative| Full-time| Hospitality|
|Are you a stellar...| Israel|2405797088|Regional Operatio...| Wolt|Tel Aviv, Tel Avi...|2021-02-11| Director| Management| Full-time|Marketing and Adv...|
|About CXBuzz Inte...| Israel|2400078284| Journalism Intern| CXBuzz| Tel Aviv, Israel|2021-02-11| Internship| Education, Training| Internship| Publishing|
|Job Summary We ar...| Israel|2406654159| Retail Intern|Disney Media & En...|Tel Aviv, Tel Avi...|2021-02-11|Mid-Senior level|General Business,...| Full-time|Marketing and Adv...|
|Job Summary We ar...| Israel|2398561147|Retail intern -12...|The Walt Disney C...| Tel Aviv, Israel|2021-02-10| Internship| Marketing| Full-time| Entertainment|
|At CrowdStrike we...| Israel|2406653801| HR Generalist| CrowdStrike|Ramat Gan, Tel Av...|2021-02-11| Associate| Human Resources| Full-time|Information Techn...|
+--------------------+--------+----------+--------------------+--------------------+--------------------+----------+----------------+--------------------+--------------+--------------------+
only showing top 20 rows
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec - Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@52eea1c3 committed.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.streaming.CheckpointFileManager - Writing atomically to file:/tmp/temporary-c9ccc957-c729-4f8f-8635-1a029de31511/commits/0 using temp file file:/tmp/temporary-c9ccc957-c729-4f8f-8635-1a029de31511/commits/.0.0cdf78cd-795c-4c3c-94d1-91341e38187f.tmp
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.streaming.CheckpointFileManager - Renamed temp file file:/tmp/temporary-c9ccc957-c729-4f8f-8635-1a029de31511/commits/.0.0cdf78cd-795c-4c3c-94d1-91341e38187f.tmp to file:/tmp/temporary-c9ccc957-c729-4f8f-8635-1a029de31511/commits/0
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.streaming.MicroBatchExecution - Streaming query made progress: {
"id" : "9d193cbf-379e-495e-87e3-18f9f09145ea",
"runId" : "2e9f6d84-23af-4b23-89cd-73ecef66d290",
"name" : null,
"timestamp" : "2021-02-16T16:17:06.949Z",
"batchId" : 0,
"numInputRows" : 3813,
"processedRowsPerSecond" : 1035.5784899511136,
"durationMs" : {
"addBatch" : 2786,
"getBatch" : 22,
"latestOffset" : 446,
"queryPlanning" : 363,
"triggerExecution" : 3681,
"walCommit" : 23
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[linkedin-producer]]",
"startOffset" : null,
"endOffset" : {
"linkedin-producer" : {
"2" : 1269,
"1" : 1272,
"0" : 1272
}
},
"numInputRows" : 3813,
"processedRowsPerSecond" : 1035.5784899511136
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@793ec5d7",
"numOutputRows" : 3813
}
}
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-0
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-2
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-1
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-0 to position FetchPosition{offset=1272, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-2 to position FetchPosition{offset=1269, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-1 to position FetchPosition{offset=1272, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.streaming.MicroBatchExecution - Streaming query made progress: {
"id" : "9d193cbf-379e-495e-87e3-18f9f09145ea",
"runId" : "2e9f6d84-23af-4b23-89cd-73ecef66d290",
"name" : null,
"timestamp" : "2021-02-16T16:17:10.664Z",
"batchId" : 1,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 3,
"triggerExecution" : 4
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[linkedin-producer]]",
"startOffset" : {
"linkedin-producer" : {
"2" : 1269,
"1" : 1272,
"0" : 1272
}
},
"endOffset" : {
"linkedin-producer" : {
"2" : 1269,
"1" : 1272,
"0" : 1272
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@793ec5d7",
"numOutputRows" : 0
}
}
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-0
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-2
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-1
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-0 to position FetchPosition{offset=1272, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-2 to position FetchPosition{offset=1269, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-1 to position FetchPosition{offset=1272, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-0
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-2
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-1
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-0 to position FetchPosition{offset=1272, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
.
.
.
pom.xml文件:
<!--Spark-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.12</artifactId>
<version>3.0.0</version>
</dependency>
2条答案
按热度按时间yi0zb3m41#
据我所知,您只想运行作业10秒,因为您正在使用中的超时
awaitTermination
.为了避免您的工作不断检查Kafka中是否有新消息可用,您可以在中设置触发器
writeStream
:这只会每秒检查Kafka中的新消息,因此每秒只会看到一次“seeking”和“reseting”日志。
你也可能对
Trigger.Once()
另一种选择。tjjdgumg2#
您正在获取记录器信息,因为您已使用默认日志级别作为信息。将日志记录级别设置为警告
spark.sparkContext.setLogLevel("WARN")
.