Kafka: why does the Ignite connector throw IgniteCheckedException about affinity nodes when reconnecting?

cbjzeqam, posted on 2023-05-21 in Apache

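My kafka-ignite connector stopped abruptly because the Ignite server restarted. When I restart the connector, it throws the same error about the affinity nodes for a particular key. Judging from the exception stack trace, it cannot obtain the node details for a particular partition. Please see the stack trace below and help me.
Thanks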

java.lang.IllegalStateException: Data streamer has been closed.
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.closedException(DataStreamerImpl.java:1102)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.lock(DataStreamerImpl.java:447)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:647)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:632)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:754)
    at org.apache.ignite.stream.kafka.connect.IgniteSinkTask.put(IgniteSinkTask.java:114)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to get affinity nodes [aff=AffinityInfo [affFunc=RendezvousAffinityFunction [parts=1024, mask=1023, exclNeighbors=false, exclNeighborsWarn=false, backupFilter=null, affinityBackupFilter=null], mapper=org.apache.ignite.internal.processors.cache.CacheDefaultBinaryAffinityKeyMapper@5c3c8bba, assignment=GridAffinityAssignment [topVer=AffinityTopologyVersion [topVer=13, minorTopVer=0], super=org.apache.ignite.internal.processors.affinity.GridAffinityAssignment@193], cacheObjCtx=org.apache.ignite.internal.processors.cache.CacheObjectContext@44a4e9d], key=UserKeyCacheObjectImpl [part=276, val=Person(name=123), hasValBytes=true]]
    at org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.primary(GridAffinityProcessor.java:686)
    at org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.affinityMap(GridAffinityProcessor.java:649)
    at org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.keysToNodes(GridAffinityProcessor.java:382)
    at org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.mapKeyToNode(GridAffinityProcessor.java:293)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.nodes(DataStreamerImpl.java:1124)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.load0(DataStreamerImpl.java:902)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.access$1500(DataStreamerImpl.java:132)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5$1.run(DataStreamerImpl.java:997)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5$2.call(DataStreamerImpl.java:1024)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5$2.call(DataStreamerImpl.java:1012)
    at org.apache.ignite.internal.util.IgniteUtils.wrapThreadLoader(IgniteUtils.java:7085)
    at org.apache.ignite.internal.processors.closure.GridClosureProcessor$2.body(GridClosureProcessor.java:971)
    at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:120)
    ... 3 more
[ERROR] 2021-09-10 15:56:28,439 [task-thread-map_2-0] org.apache.kafka.connect.runtime.WorkerTask doRun - WorkerSinkTask{id=map_2-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: Data streamer has been closed.
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.closedException(DataStreamerImpl.java:1102)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.lock(DataStreamerImpl.java:447)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:647)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:632)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:754)
    at org.apache.ignite.stream.kafka.connect.IgniteSinkTask.put(IgniteSinkTask.java:114)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
    ... 10 more
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to get affinity nodes [aff=AffinityInfo [affFunc=RendezvousAffinityFunction [parts=1024, mask=1023, exclNeighbors=false, exclNeighborsWarn=false, backupFilter=null, affinityBackupFilter=null], mapper=org.apache.ignite.internal.processors.cache.CacheDefaultBinaryAffinityKeyMapper@5c3c8bba, assignment=GridAffinityAssignment [topVer=AffinityTopologyVersion [topVer=13, minorTopVer=0], super=org.apache.ignite.internal.processors.affinity.GridAffinityAssignment@193], cacheObjCtx=org.apache.ignite.internal.processors.cache.CacheObjectContext@44a4e9d], key=UserKeyCacheObjectImpl [part=276, val=Person(name=123), hasValBytes=true]]
    at org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.primary(GridAffinityProcessor.java:686)
    at org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.affinityMap(GridAffinityProcessor.java:649)
    at org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.keysToNodes(GridAffinityProcessor.java:382)
    at org.apache.ignite.internal.processors.affinity.GridAffinityProcessor.mapKeyToNode(GridAffinityProcessor.java:293)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.nodes(DataStreamerImpl.java:1124)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.load0(DataStreamerImpl.java:902)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.access$1500(DataStreamerImpl.java:132)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5$1.run(DataStreamerImpl.java:997)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5$2.call(DataStreamerImpl.java:1024)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5$2.call(DataStreamerImpl.java:1012)
    at org.apache.ignite.internal.util.IgniteUtils.wrapThreadLoader(IgniteUtils.java:7085)
    at org.apache.ignite.internal.processors.closure.GridClosureProcessor$2.body(GridClosureProcessor.java:971)
    at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:120)

My server configuration is:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    
    <property name="workDirectory" value="/ignite/work"/>
    
    <property name="dataStorageConfiguration">
      <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
        <property name="defaultDataRegionConfiguration">
          <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
            <property name="persistenceEnabled" value="true"/>
          </bean>
        </property>
        <property name="walPath" value="/ignite/wal"/>
        <property name="walArchivePath" value="/ignite/walarchive"/>
      </bean>
    </property>
    <property name="cacheConfiguration">
      <list>
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
          <property name="name" value="myCache"/>
          <property name="cacheMode" value="PARTITIONED"/>
        </bean>
      </list>
    </property>
    <property name="discoverySpi">
      <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
        <property name="ipFinder">
          <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes.TcpDiscoveryKubernetesIpFinder">
            <constructor-arg>
              <bean class="org.apache.ignite.kubernetes.configuration.KubernetesConnectionConfiguration">
                <property name="namespace" value="myspacename"/>
                <property name="serviceName" value="myservicename"/>
              </bean>
            </constructor-arg>
          </bean>
        </property>
      </bean>
    </property>
  </bean>

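My connector is a thick client that uses an XML configuration file with the default configuration and the discovery SPI.
Another question: is this error thrown when my Ignite pod restarts because its memory is full? Can I change its behavior so that data goes to persistence instead of memory? Or, if the pod is restarting because of this, what is the ideal way to fix it?
Thanks in advance.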

koaltpgm (answer #1)

It looks like the Kafka connector cannot determine the primary node for the key
key=UserKeyCacheObjectImpl [part=276, val=Person(name=123), hasValBytes=true]
Try to determine which node this key maps to:

ignite.affinity(cacheName).mapKeyToNode(key);
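
To expand on that check, here is a minimal sketch of a standalone thick-client diagnostic. It assumes Ignite 2.9 or later (for `cluster().state()`), that `ignite-client.xml` is a hypothetical path to the same client configuration the connector uses, and that the cache is `myCache` as in the posted server configuration. It looks up partition 276 from the stack trace directly, so the `Person` key class is not needed on the classpath. Because persistence is enabled, it may also be worth checking whether the cluster came back active and whether all baseline nodes rejoined after the restart; that is a guess at the cause, not something the trace confirms.

```java
import java.util.Collection;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;

public class AffinityCheck {
    public static void main(String[] args) {
        // Hypothetical path: pass the client XML the connector uses as the first argument.
        String clientConfig = args.length > 0 ? args[0] : "ignite-client.xml";
        String cacheName = "myCache"; // cache name from the posted server configuration
        int part = 276;               // partition reported in the stack trace

        // Join the cluster as a thick client, like the connector does.
        Ignition.setClientMode(true);

        try (Ignite ignite = Ignition.start(clientConfig)) {
            ClusterState state = ignite.cluster().state();
            System.out.println("Cluster state: " + state);

            // With persistence enabled, partitions are assigned over the baseline topology.
            Collection<BaselineNode> baseline = ignite.cluster().currentBaselineTopology();
            System.out.println("Baseline nodes: " + (baseline == null ? "not set" : baseline.size()));
            System.out.println("Server nodes online: " + ignite.cluster().forServers().nodes().size());

            if (state == ClusterState.INACTIVE) {
                System.out.println("Cluster is inactive; cache operations will fail until it is activated.");
                return;
            }

            IgniteCache<Object, Object> cache = ignite.cache(cacheName);
            if (cache == null) {
                System.out.println("Cache not found: " + cacheName);
                return;
            }
            System.out.println("Lost partitions: " + cache.lostPartitions());

            try {
                // Ask the affinity function which nodes own the partition from the trace.
                Affinity<Object> aff = ignite.affinity(cacheName);
                Collection<ClusterNode> owners = aff.mapPartitionToPrimaryAndBackups(part);
                System.out.println("Owners of partition " + part + ": " + owners.size());
                for (ClusterNode node : owners)
                    System.out.println("  " + node.id() + " consistentId=" + node.consistentId());
            }
            catch (IgniteException e) {
                // The same failure the connector hits would surface here.
                System.out.println("Affinity lookup failed: " + e.getMessage());
            }
        }
    }
}
```

If the owners list comes back empty or the partition is reported as lost, the restarted server has probably not rejoined under its previous identity, which would be consistent with the exception above.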
