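We are getting a StreamCorruptedException in Chronicle Queue (chronicle-queue-5.20.106, Red Hat Linux release 6.10); the stack trace is pasted below. At the time, a completely separate process was doing very heavy I/O (disk operations), which we believe stalled the queue for more than 15 seconds and caused this corruption.
Even after a restart the queue cannot start, because it is corrupted. The only option is to delete it and start over, which means losing millions of records.
Please help with a solution or any workaround. Thanks.
Stack trace: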
2020-11-18 09:55:38,905536 [4b54debf-f9e2-4c70-9152-f05fe840bc92] [TableStoreWriteLock] (WARN) Couldn't acquire write lock after 15000 ms for the lock file:/local/data/metadata.cq4t, overriding the lock. Lock was held by me
2020-11-18 09:55:38,905795 [4b54debf-f9e2-4c70-9152-f05fe840bc92] [TableStoreWriteLock] (WARN) Forced unlock for the lock file:/local/data/metadata.cq4t, unlocked: true net.openhft.chronicle.core.StackTrace: Forced unlock on Reader STRESSTEST01
at net.openhft.chronicle.queue.impl.table.AbstractTSQueueLock.forceUnlockIfProcessIsDead(AbstractTSQueueLock.java:52)
at net.openhft.chronicle.queue.impl.single.TableStoreWriteLock.lock(TableStoreWriteLock.java:70)
at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:349)
at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:325)
Then:
2020-11-18 09:55:42,364992 [] [ChronicleTxn] (ERROR) Error on commit java.lang.IllegalStateException: java.io.StreamCorruptedException: Data at 138604 overwritten? Expected: 0 was c3
at net.openhft.chronicle.queue.impl.single.StoreAppender$StoreAppenderContext.close(StoreAppender.java:842)
at net.openhft.chronicle.queue.impl.single.StoreAppender$StoreAppenderContext.close(StoreAppender.java:782)
Error on restart:
java.lang.UnsupportedOperationException: Unknown_4
at net.openhft.chronicle.wire.BinaryWire$BinaryValueIn.cantRead(BinaryWire.java:3648)
at net.openhft.chronicle.wire.BinaryWire$BinaryValueIn.bytes(BinaryWire.java:2591)
Test class that simulates the problem:
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.wire.DocumentContext;

import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

public class SimulateStreamCorruptedException {
    private static final int NO_OF_DOCUMENTS_TO_INSERT = 100_000; // per writer thread
    private static final int NO_OF_THREADS = 50;

    private final String textToWrite = "This is a sample text to be written and value is ";
    private final String dbFolder = System.getProperty("dbFolder", "/tmp/chroniclequeue");
    private final AtomicLong noOfDocuments = new AtomicLong(); // shared across all writer threads

    public static void main(String[] args) throws InterruptedException {
        SimulateStreamCorruptedException simulator = new SimulateStreamCorruptedException();
        simulator.simulateError();
    }

    private void simulateError() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(NO_OF_THREADS);
        ScheduledExecutorService preTouchScheduler = Executors.newScheduledThreadPool(1);
        try (ChronicleQueue queue = getQueue()) {
            // Pretouch the queue file once per second from a background thread
            preTouchScheduler.scheduleAtFixedRate(() -> queue.acquireAppender().pretouch(), 0, 1, TimeUnit.SECONDS);
            IntStream.rangeClosed(1, NO_OF_THREADS).forEach(i -> startWriterThread(queue, i, latch));
            latch.await(); // wait for every writer before the queue is closed
        } finally {
            preTouchScheduler.shutdownNow();
        }
    }

    private void startWriterThread(ChronicleQueue queue, int threadCount, CountDownLatch latch) {
        Runnable task = () -> {
            System.out.println("Starting the writing for Thread-" + threadCount);
            IntStream.rangeClosed(1, NO_OF_DOCUMENTS_TO_INSERT).forEach(i -> {
                // writingDocument() takes the queue's write lock until the context is closed
                try (DocumentContext dc = queue.acquireAppender().writingDocument()) {
                    String text = textToWrite + (threadCount + i);
                    dc.wire().write().bytes(text.getBytes());
                    simulatePause(); // may sleep while the write lock is still held
                }
            });
            System.out.println("Completed the writing for Thread-" + threadCount);
            latch.countDown();
        };
        new Thread(task).start();
    }

    // Every 100th document (counted across all threads), sleep 20 s: longer than the 15 s lock timeout
    private void simulatePause() {
        if (noOfDocuments.incrementAndGet() % 100 == 0) {
            try {
                Thread.sleep(20 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private ChronicleQueue getQueue() {
        File folder = new File(dbFolder);
        if (!folder.exists()) folder.mkdirs();
        return ChronicleQueue.singleBuilder(folder)
                .rollCycle(RollCycles.DAILY)
                .strongAppenders(true)
                .build();
    }
}
1 answer
vlju58qv:
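If your application can pause for 15 seconds, there is no possible solution on the Chronicle Queue side: you should reconsider how your software works, because Chronicle's tools are developed with ultra-low latency in mind, and we think in terms of microsecond latencies, not seconds.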
If the lock is forcibly unlocked (which is what happened here), the data will be irreversibly corrupted.
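To illustrate why a forced unlock leads to corruption, here is a toy model in plain Java. It is hypothetical and is NOT how Chronicle Queue lays out or validates its data: the class name, the single-byte markers and the simulated clock are all invented for illustration. It only reproduces the general shape of the failure in the logs: writer A stalls past the lock timeout while holding the lock, writer B overrides the "stale" lock and writes at the same position (because A never committed), and A's commit then detects that its data was overwritten.

```java
import java.nio.ByteBuffer;

// Toy model (hypothetical; NOT Chronicle Queue's implementation) of why forcing
// open a timed-out write lock can corrupt an append-only log.
public class ForcedUnlockDemo {
    static final long TIMEOUT_MS = 15_000;   // same value as the default quoted in the answer

    private final ByteBuffer log = ByteBuffer.allocate(16);
    private int writePosition = 0;           // next free offset; advanced only on commit
    private String lockOwner = null;
    private long lockAcquiredAt = 0;         // simulated clock, in ms

    // Take the lock if it is free, or override it if it has been held >= TIMEOUT_MS.
    boolean tryLock(String writer, long nowMs) {
        if (lockOwner == null || nowMs - lockAcquiredAt >= TIMEOUT_MS) {
            lockOwner = writer;              // a stalled but still-alive owner loses the lock here
            lockAcquiredAt = nowMs;
            return true;
        }
        return false;
    }

    // Start a record: stamp a marker byte at the current write position.
    int begin(byte marker) {
        log.put(writePosition, marker);
        return writePosition;
    }

    // Finish a record: check our marker survived, advance the position, release the lock.
    void commit(int start, byte marker) {
        byte found = log.get(start);
        if (found != marker) {
            throw new IllegalStateException(
                "slot " + start + " overwritten: expected " + marker + " but found " + found);
        }
        writePosition = start + 1;
        lockOwner = null;
    }

    // Writer A stalls past the timeout, writer B overrides the lock, A's commit fails.
    static String scenario() {
        ForcedUnlockDemo q = new ForcedUnlockDemo();
        long clock = 0;
        q.tryLock("A", clock);
        int aStart = q.begin((byte) 1);      // A reserves offset 0 ...
        clock += 20_000;                     // ... then pauses 20 s (e.g. starved of disk I/O)
        q.tryLock("B", clock);               // true: A's lock is older than TIMEOUT_MS
        int bStart = q.begin((byte) 2);      // B also gets offset 0, because A never committed
        q.commit(bStart, (byte) 2);          // B's record commits cleanly
        try {
            q.commit(aStart, (byte) 1);      // A resumes and finds B's bytes in its slot
            return "no corruption detected";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(scenario());
    }
}
```

Running it prints `slot 0 overwritten: expected 1 but found 2`. The point of the model is that no lock timeout can tell a dead owner apart from one that is merely stalled; once the lock is overridden, two writers can believe they own the same region.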
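One workaround, though, is to increase the timeout. The default is 15000 ms, but when you create the queue you can raise it to a value that works in your environment via SingleChronicleQueueBuilder#timeoutMS().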
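A minimal sketch of that workaround, assuming the `SingleChronicleQueueBuilder#timeoutMS(long)` setter and the `writeText` / `readText` convenience methods on the appender and tailer available in Chronicle Queue 5.x. It needs `net.openhft:chronicle-queue` on the classpath. The class name `QueueWithLongerLockTimeout`, the helper `roundTrip` and the 60 s value are hypothetical choices for illustration, not a recommendation. Pick a timeout longer than the worst stall you expect; note that a longer timeout also means other writers wait longer before overriding a lock whose holder really has died.

```java
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.RollCycles;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

public class QueueWithLongerLockTimeout {

    // Hypothetical value: 60 s instead of the 15 s default. Tune it for your environment.
    static final long LOCK_TIMEOUT_MS = 60_000;

    // Based on the question's getQueue(), plus a longer write-lock timeout.
    static ChronicleQueue open(File folder, long lockTimeoutMs) {
        if (!folder.exists()) folder.mkdirs();
        return ChronicleQueue.singleBuilder(folder)
                .rollCycle(RollCycles.DAILY)
                .timeoutMS(lockTimeoutMs)   // how long to wait for the write lock before overriding it
                .build();
    }

    // Hypothetical helper: write one text message to a fresh temp queue and read it back.
    static String roundTrip(String text) {
        try {
            File dir = Files.createTempDirectory("cq-timeout").toFile();
            try (ChronicleQueue queue = open(dir, LOCK_TIMEOUT_MS)) {
                queue.acquireAppender().writeText(text);
                return queue.createTailer().readText();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

This only widens the window before an override happens; a stall longer than the new timeout would still force the unlock.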