本文整理了Java中org.rocksdb.RocksDB.write方法的一些代码示例,展示了RocksDB.write的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。RocksDB.write方法的具体详情如下:
包路径:org.rocksdb.RocksDB
类名称:RocksDB
方法名:write
[英]Apply the specified updates to the database.
[中]将指定的更新应用于数据库。
代码示例来源:origin: Alluxio/alluxio
@Override
public void commit() {
    // Flush the buffered batch to RocksDB in one atomic write.
    // mDisableWAL is presumably a WriteOptions configured to skip the WAL -- confirm at field site.
    try {
        mDb.write(mDisableWAL, mBatch);
    } catch (RocksDBException cause) {
        // Callers of commit() do not expect a checked exception; surface as unchecked.
        throw new RuntimeException(cause);
    }
}
}
代码示例来源:origin: apache/flink
public void flush() throws RocksDBException {
    // Push the pending batch into RocksDB, then reset the batch for reuse.
    if (options == null) {
        // No caller-supplied options: use a throwaway default WriteOptions,
        // closed immediately after the write.
        try (WriteOptions defaults = new WriteOptions()) {
            db.write(defaults, batch);
        }
    } else {
        db.write(options, batch);
    }
    batch.clear();
}
代码示例来源:origin: apache/storm
/**
 * Writes all batched metric key/value pairs to RocksDB in a single atomic write.
 *
 * @param batchMap metric data accumulated for this batch, in key order
 * @throws MetricException if the RocksDB write (or batch construction) fails
 */
private void processBatchInsert(TreeMap<RocksDbKey, RocksDbValue> batchMap) throws MetricException {
    try (WriteBatch writeBatch = new WriteBatch()) {
        // Iterate entries directly instead of keySet()+get(): the original
        // performed a redundant O(log n) tree lookup for every key.
        for (java.util.Map.Entry<RocksDbKey, RocksDbValue> entry : batchMap.entrySet()) {
            writeBatch.put(entry.getKey().getRaw(), entry.getValue().getRaw());
        }
        store.db.write(writeOpts, writeBatch);
    } catch (Exception e) {
        String message = "Failed to store data to RocksDB";
        LOG.error(message, e);
        throw new MetricException(message, e);
    }
}
代码示例来源:origin: alibaba/jstorm
@Override
public void putBatch(Map<K, V> batch) throws IOException {
    // Serialize every entry into one WriteBatch and commit it atomically.
    // try-with-resources closes the native WriteBatch/WriteOptions handles,
    // which the original leaked (neither was ever closed/disposed).
    try (WriteBatch writeBatch = new WriteBatch();
         WriteOptions writeOptions = new WriteOptions()) {
        for (Map.Entry<K, V> entry : batch.entrySet()) {
            writeBatch.put(
                columnFamily,
                serializer.serialize(entry.getKey()),
                serializer.serialize(entry.getValue()));
        }
        rocksDb.write(writeOptions, writeBatch);
    } catch (RocksDBException e) {
        throw new IOException(String.format("Failed to put batch=%s", batch), e);
    }
}
代码示例来源:origin: ethereum/ethereumj
@Override
// Applies all row mutations in one atomic RocksDB write: a null value is a
// delete marker, anything else is a put. Holds the read side of resetDbLock
// so a concurrent reset/close cannot run mid-batch.
public void updateBatch(Map<byte[], byte[]> rows) {
resetDbLock.readLock().lock();
try {
if (logger.isTraceEnabled()) logger.trace("~> RocksDbDataSource.updateBatch(): " + name + ", " + rows.size());
try {
// WriteBatch and WriteOptions wrap native handles; try-with-resources
// guarantees they are released even if the write fails.
try (WriteBatch batch = new WriteBatch();
WriteOptions writeOptions = new WriteOptions()) {
for (Map.Entry<byte[], byte[]> entry : rows.entrySet()) {
if (entry.getValue() == null) {
// null value => remove the key
batch.remove(entry.getKey());
} else {
batch.put(entry.getKey(), entry.getValue());
}
}
// Single atomic commit for the whole batch.
db.write(writeOptions, batch);
}
if (logger.isTraceEnabled()) logger.trace("<~ RocksDbDataSource.updateBatch(): " + name + ", " + rows.size());
} catch (RocksDBException e) {
logger.error("Error in batch update on db '{}'", name, e);
// presumably logs a diagnostic when the cause is fd exhaustion -- TODO confirm helper
hintOnTooManyOpenFiles(e);
throw new RuntimeException(e);
}
} finally {
resetDbLock.readLock().unlock();
}
}
代码示例来源:origin: alibaba/jstorm
@Override
public void putBatch(Map<String, Object> map) {
    // Best-effort bulk insert: failures are logged, never thrown.
    WriteOptions opts = null;
    WriteBatch writeBatch = null;
    try {
        opts = new WriteOptions();
        writeBatch = new WriteBatch();
        for (Entry<String, Object> entry : map.entrySet()) {
            String key = entry.getKey();
            byte[] data = serialize(entry.getValue());
            // Entries with a blank key or an empty/null payload are silently skipped.
            boolean unusable = StringUtils.isBlank(key) || data == null || data.length == 0;
            if (!unusable) {
                writeBatch.put(key.getBytes(), data);
            }
        }
        db.write(opts, writeBatch);
    } catch (Exception e) {
        LOG.error("Failed to putBatch into DB, " + map.keySet(), e);
    } finally {
        // dispose() frees the native handles (pre-AutoCloseable RocksDB API).
        if (opts != null) {
            opts.dispose();
        }
        if (writeBatch != null) {
            writeBatch.dispose();
        }
    }
}
代码示例来源:origin: apache/storm
void deleteMetrics(FilterOptions filter) throws MetricException {
    // Collect every key matched by the filter into one batch, then remove
    // them all with a single atomic write.
    try (WriteBatch batch = new WriteBatch();
         WriteOptions batchOptions = new WriteOptions()) {
        scanRaw(filter, (RocksDbKey key, RocksDbValue value) -> {
            batch.remove(key.getRaw());
            return true; // keep scanning
        });
        if (batch.count() <= 0) {
            return; // nothing matched -- skip the write entirely
        }
        LOG.info("Deleting {} metrics", batch.count());
        try {
            db.write(batchOptions, batch);
        } catch (Exception e) {
            String message = "Failed delete metrics";
            LOG.error(message, e);
            if (this.failureMeter != null) {
                this.failureMeter.mark();
            }
            throw new MetricException(message, e);
        }
    }
}
代码示例来源:origin: apache/flink
@Override
public void clear() {
    try {
        // Batch-delete every entry whose key starts with the current
        // key + group + namespace prefix, then commit in one write.
        try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(backend.db, columnFamily);
             WriteBatch batch = new WriteBatch(128)) {
            final byte[] prefix = serializeCurrentKeyWithGroupAndNamespace();
            for (iterator.seek(prefix); iterator.isValid(); iterator.next()) {
                final byte[] candidate = iterator.key();
                if (!startWithKeyPrefix(prefix, candidate)) {
                    break; // left the prefix range: keys are ordered, so we are done
                }
                batch.remove(columnFamily, candidate);
            }
            backend.db.write(writeOptions, batch);
        }
    } catch (Exception e) {
        // Cleanup is best-effort; never propagate.
        LOG.warn("Error while cleaning the state.", e);
    }
}
代码示例来源:origin: apache/storm
LOG.info("Deleting {} metadata strings", writeBatch.count());
try {
db.write(writeOps, writeBatch);
} catch (Exception e) {
String message = "Failed delete metadata strings";
代码示例来源:origin: hugegraph/hugegraph
/**
 * Commit all updates(put/delete) to DB
 *
 * @return the number of buffered operations that were written (0 if none)
 */
@Override
public Integer commit() {
    final int pending = this.batch.count();
    if (pending <= 0) {
        // Nothing buffered -- nothing to write.
        return 0;
    }
    try {
        rocksdb().write(this.writeOptions, this.batch);
    } catch (RocksDBException e) {
        //this.batch.rollbackToSavePoint();
        throw new BackendException(e);
    }
    // Only clear on success: a failed batch stays buffered for retry.
    this.batch.clear();
    return pending;
}
代码示例来源:origin: org.apache.kafka/kafka-streams
// Applies the given batch atomically using this store's shared write options.
// The caller owns the WriteBatch lifecycle -- it is not closed here.
void write(final WriteBatch batch) throws RocksDBException {
db.write(wOptions, batch);
}
代码示例来源:origin: org.apache.bookkeeper/bookkeeper-server
@Override
// Commits the accumulated batch to RocksDB. optionSync presumably requests
// a synced (durable) write -- TODO confirm where optionSync is configured.
public void flush() throws IOException {
try {
db.write(optionSync, writeBatch);
} catch (RocksDBException e) {
// Translate to IOException to match the Flushable-style contract.
throw new IOException("Failed to flush RocksDB batch", e);
}
}
}
代码示例来源:origin: org.apache.bookkeeper/bookkeeper-server
@Override
// Durability barrier: writing an empty batch forces RocksDB to process the
// write options without adding data. This only syncs the WAL if optionSync
// has setSync(true) -- assumed from the name, TODO confirm at field site.
public void sync() throws IOException {
try {
db.write(optionSync, emptyBatch);
} catch (RocksDBException e) {
throw new IOException(e);
}
}
代码示例来源:origin: org.apache.flink/flink-statebackend-rocksdb
public void flush() throws RocksDBException {
    // Hand the buffered batch to RocksDB, then start a fresh one.
    if (options != null) {
        db.write(options, batch);
    } else {
        // Caller provided no WriteOptions: fall back to a short-lived default.
        try (WriteOptions fallback = new WriteOptions()) {
            db.write(fallback, batch);
        }
    }
    batch.clear();
}
代码示例来源:origin: org.apache.bookkeeper/statelib
// Atomically applies the batch with the store-wide write options, wrapping
// RocksDB checked failures in the store's unchecked exception type.
private void executeBatch(WriteBatch batch) {
try {
db.write(writeOpts, batch);
} catch (RocksDBException e) {
throw new StateStoreRuntimeException("Error while executing a multi operation from state store " + name, e);
}
}
代码示例来源:origin: criccomini/ezdb
@Override
// Commits the pending writeBatch; RocksDB's checked exception is converted
// to the library's unchecked DbException so flush() stays signature-free.
public void flush() {
try {
db.write(writeOptions, writeBatch);
} catch (RocksDBException e) {
throw new DbException(e);
}
}
代码示例来源:origin: org.apache.flink/flink-statebackend-rocksdb_2.11
public void flush() throws RocksDBException {
    // Write out everything buffered in `batch`, then empty it for reuse.
    if (options == null) {
        // No options supplied up-front: create a temporary default instance
        // and close it right after the write.
        try (WriteOptions temporary = new WriteOptions()) {
            db.write(temporary, batch);
        }
    } else {
        db.write(options, batch);
    }
    batch.clear();
}
代码示例来源:origin: com.github.ddth/ddth-commons-core
/**
 * See {@link RocksDB#write(WriteOptions, WriteBatch)}.
 *
 * <p>Delegates to the wrapped RocksDB instance, falling back to this
 * wrapper's default write options when {@code writeOptions} is null.</p>
 *
 * @param writeOptions options for this write, or {@code null} to use the instance default
 * @param batch the batch of updates to apply atomically
 * @throws RocksDbException if the underlying write fails
 */
public void write(WriteOptions writeOptions, WriteBatch batch) throws RocksDbException {
    try {
        rocksDb.write(writeOptions != null ? writeOptions : this.writeOptions, batch);
    } catch (RocksDbException e) {
        // Narrowest catch first: rethrow the caller-facing type as-is instead
        // of the original catch(Exception) + instanceof test.
        throw e;
    } catch (Exception e) {
        throw new RocksDbException(e);
    }
}
代码示例来源:origin: locationtech/geogig
private void flush() {
    // Writes buffered mutations to RocksDB only once WRITE_THRESHOLD
    // operations have accumulated; below that, keep buffering.
    lock.writeLock().lock();
    try {
        if (batch.count() < WRITE_THRESHOLD) {
            return; // not enough buffered yet
        }
        db().write(writeOptions, batch);
        batch.clear();
    } catch (RocksDBException e) {
        throw new RuntimeException(e);
    } finally {
        lock.writeLock().unlock();
    }
}
代码示例来源:origin: locationtech/geogig
@Override
public boolean put(ObjectId commitId, ImmutableList<ObjectId> parentIds) {
    // Records the commit's parent list via an indexed batch, committed with a
    // synchronous (durable) write. All native handles are closed by
    // try-with-resources, in reverse declaration order.
    try (WriteBatchWithIndex indexedBatch = new WriteBatchWithIndex(); //
         RocksDBReference dbRef = dbhandle.getReference();
         WriteOptions syncOptions = new WriteOptions()) {
        syncOptions.setSync(true); // force a durable write for graph edges
        final boolean updated = put(dbRef, commitId, parentIds, indexedBatch);
        dbRef.db().write(syncOptions, indexedBatch);
        return updated;
    } catch (RocksDBException e) {
        throw new RuntimeException(e);
    }
}
内容来源于网络,如有侵权,请联系作者删除!