camus的预期提交/回滚行为是什么?

bprjcwpo  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(333)

我们已经运行了大约一年的camus,成功地从kafka(版本0.82)中提取avro有效负载,并在hdfs中存储为.avro文件,只使用了几个kafka主题。最近,我们公司的一个新团队在我们的预生产环境中注册了大约60个新主题,并开始向这些主题发送数据。团队在将数据路由到kafka主题时犯了一些错误,这导致了当camus将这些主题的有效负载反序列化到avro时出错。由于超出“其他失败”错误阈值,camus作业失败。失败后在camus中产生的行为令人惊讶,我想与其他开发人员一起检查,看看我们观察到的行为是否符合预期,或者我们的实现是否存在一些问题。
当camus作业由于超过“failed other”阈值而失败时,我们注意到这种行为:1。所有Map器任务都成功了,因此允许taskattempt提交—这意味着由camus写入的所有数据都被复制到最终的hdfs位置。2camusjob在计算%错误率(这是在Map器提交之后)时引发异常,导致作业失败3。因为工作失败了(我想),Kafka的补偿没有进展
我们遇到的问题是,我们的gamus作业设置为每5分钟运行一次。因此,每5分钟我们就会看到数据被提交到hdfs,作业失败,并且kafka偏移量没有更新—这意味着我们会写入重复的数据,直到我们注意到磁盘已满。
我写了一个集成测试来确认结果——它向一个主题提交10条好记录,向同一个主题提交10条使用意外模式的记录,运行camus作业时只将该主题列为白名单,我们可以看到10条记录被写入hdfs,而kafka偏移量没有提前。下面是该测试的日志片段,以及我们在运行作业时使用的属性。
感谢您的帮助-我不确定这是否是camus的预期行为,或者我们的实现是否有问题,以及防止这种行为(复制数据)的最佳方法是什么。
谢谢~马特
测试的camusjob属性:

etl.destination.path=/user/camus/kafka/data
etl.execution.base.path=/user/camus/kafka/workspace
etl.execution.history.path=/user/camus/kafka/history
dfs.default.classpath.dir=/user/camus/kafka/libs

etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder

camus.message.timestamp.format=yyyy-MM-dd HH:mm:ss Z
mapreduce.output.fileoutputformat.compress=false

mapred.map.tasks=15
kafka.max.pull.hrs=1
kafka.max.historical.days=3

kafka.whitelist.topics=advertising.edmunds.admax
log4j.configuration=true

kafka.client.name=camus
kafka.brokers=<kafka brokers>
max.decoder.exceptions.to.print=5
post.tracking.counts.to.kafka=true
monitoring.event.class=class.that.generates.record.to.submit.counts.to.kafka
kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
etl.schema.registry.url=<schema repo url>
etl.run.tracking.post=false
kafka.monitor.time.granularity=10

etl.daily=daily
etl.ignore.schema.errors=false

etl.output.codec=deflate
etl.deflate.level=6
etl.default.timezone=America/Los_Angeles
mapred.output.compress=false
mapred.map.max.attempts=2

来自测试的日志片段,显示Map程序成功后的提交行为以及由于超过“其他”阈值而导致的后续作业失败:

LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map

[Task] - Task:attempt_local866350146_0001_m_000000_0 is done. And is in the process of committing

[LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map

[Task] - Task attempt_local866350146_0001_m_000000_0 is allowed to commit now

[EtlMultiOutputFormat] - work path: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0

[EtlMultiOutputFormat] - Destination base path: /user/camus/kafka/data

[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro

[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.3.2.2.1467979200000.avro

[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro

[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.7.8.8.1467979200000.avro

[Task] - Task 'attempt_local866350146_0001_m_000000_0' done.
[LocalJobRunner] - Finishing task: attempt_local866350146_0001_m_000000_0
[LocalJobRunner] - map task executor complete.
[Job] -  map 100% reduce 0%
[Job] - Job job_local866350146_0001 completed successfully
[Job] - Counters: 23
File System Counters
FILE: Number of bytes read=117251
FILE: Number of bytes written=350942
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=10
Map output records=15
Input split bytes=793
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=13
Total committed heap usage (bytes)=251658240
com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG
DECODE_SUCCESSFUL=10
SKIPPED_OTHER=10
File Input Format Counters 
Bytes Read=0
File Output Format Counters 
Bytes Written=5907
total
data-read=840
decode-time(ms)=123
event-count=20
mapper-time(ms)=58
request-time(ms)=12114
skip-old=0
[CamusJob] - Group: File System Counters
[CamusJob] - FILE: Number of bytes read:    117251
[CamusJob] - FILE: Number of bytes written: 350942
[CamusJob] - FILE: Number of read operations:   0
[CamusJob] - FILE: Number of large read operations: 0
[CamusJob] - FILE: Number of write operations:  0
[CamusJob] - Group: Map-Reduce Framework
[CamusJob] - Map input records: 10
[CamusJob] - Map output records:    15
[CamusJob] - Input split bytes: 793
[CamusJob] - Spilled Records:   0
[CamusJob] - Failed Shuffles:   0
[CamusJob] - Merged Map outputs:    0
[CamusJob] - GC time elapsed (ms):  13
[CamusJob] - Total committed heap usage (bytes):    251658240
[CamusJob] - Group: com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG
[CamusJob] - DECODE_SUCCESSFUL: 10
[CamusJob] - SKIPPED_OTHER: 10
[CamusJob] - job failed: 50.0% messages skipped due to other, maximum allowed is 0.1%
u0sqgete

u0sqgete1#

我面临着一个非常类似的问题:我的kafka/camus管道已经运行了大约一年,但是最近我在集成来自远程代理的接收时遇到了重复问题,连接非常不稳定,并且经常出现作业失败。
今天在检查gobblin文档时,我意识到camus sweeper是一个工具,可能是我们正在寻找的。尝试将其集成到您的管道中。
我还认为,最好是在不久的将来迁移到戈布林(加缪的继任者)。

相关问题