org.rocksdb.RocksDB.write()方法的使用及代码示例

x33g5p2x  于2022-01-28 转载在 其他  
字(7.4k)|赞(0)|评价(0)|浏览(255)

本文整理了Java中org.rocksdb.RocksDB.write方法的一些代码示例,展示了RocksDB.write的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助你。RocksDB.write方法的具体详情如下:
包路径:org.rocksdb.RocksDB
类名称:RocksDB
方法名:write

RocksDB.write介绍

[英]Apply the specified updates to the database.
[中]将指定的更新应用于数据库。

代码示例

代码示例来源:origin: Alluxio/alluxio

  1. @Override
  2. public void commit() {
  3. try {
  4. mDb.write(mDisableWAL, mBatch);
  5. } catch (RocksDBException e) {
  6. throw new RuntimeException(e);
  7. }
  8. }
  9. }

代码示例来源:origin: apache/flink

  1. public void flush() throws RocksDBException {
  2. if (options != null) {
  3. db.write(options, batch);
  4. } else {
  5. // use the default WriteOptions, if wasn't provided.
  6. try (WriteOptions writeOptions = new WriteOptions()) {
  7. db.write(writeOptions, batch);
  8. }
  9. }
  10. batch.clear();
  11. }

代码示例来源:origin: apache/storm

  1. private void processBatchInsert(TreeMap<RocksDbKey, RocksDbValue> batchMap) throws MetricException {
  2. try (WriteBatch writeBatch = new WriteBatch()) {
  3. // take the batched metric data and write to the database
  4. for (RocksDbKey k : batchMap.keySet()) {
  5. RocksDbValue v = batchMap.get(k);
  6. writeBatch.put(k.getRaw(), v.getRaw());
  7. }
  8. store.db.write(writeOpts, writeBatch);
  9. } catch (Exception e) {
  10. String message = "Failed to store data to RocksDB";
  11. LOG.error(message, e);
  12. throw new MetricException(message, e);
  13. }
  14. }

代码示例来源:origin: alibaba/jstorm

  1. @Override
  2. public void putBatch(Map<K, V> batch) throws IOException {
  3. try {
  4. WriteBatch writeBatch = new WriteBatch();
  5. for (Map.Entry<K, V> entry : batch.entrySet()) {
  6. writeBatch.put(
  7. columnFamily,
  8. serializer.serialize(entry.getKey()),
  9. serializer.serialize(entry.getValue()));
  10. }
  11. rocksDb.write(new WriteOptions(), writeBatch);
  12. } catch (RocksDBException e) {
  13. throw new IOException(String.format("Failed to put batch=%s", batch), e);
  14. }
  15. }

代码示例来源:origin: ethereum/ethereumj

  1. @Override
  2. public void updateBatch(Map<byte[], byte[]> rows) {
  3. resetDbLock.readLock().lock();
  4. try {
  5. if (logger.isTraceEnabled()) logger.trace("~> RocksDbDataSource.updateBatch(): " + name + ", " + rows.size());
  6. try {
  7. try (WriteBatch batch = new WriteBatch();
  8. WriteOptions writeOptions = new WriteOptions()) {
  9. for (Map.Entry<byte[], byte[]> entry : rows.entrySet()) {
  10. if (entry.getValue() == null) {
  11. batch.remove(entry.getKey());
  12. } else {
  13. batch.put(entry.getKey(), entry.getValue());
  14. }
  15. }
  16. db.write(writeOptions, batch);
  17. }
  18. if (logger.isTraceEnabled()) logger.trace("<~ RocksDbDataSource.updateBatch(): " + name + ", " + rows.size());
  19. } catch (RocksDBException e) {
  20. logger.error("Error in batch update on db '{}'", name, e);
  21. hintOnTooManyOpenFiles(e);
  22. throw new RuntimeException(e);
  23. }
  24. } finally {
  25. resetDbLock.readLock().unlock();
  26. }
  27. }

代码示例来源:origin: alibaba/jstorm

  1. @Override
  2. public void putBatch(Map<String, Object> map) {
  3. WriteOptions writeOpts = null;
  4. WriteBatch writeBatch = null;
  5. try {
  6. writeOpts = new WriteOptions();
  7. writeBatch = new WriteBatch();
  8. for (Entry<String, Object> entry : map.entrySet()) {
  9. String key = entry.getKey();
  10. Object value = entry.getValue();
  11. byte[] data = serialize(value);
  12. if (StringUtils.isBlank(key) || data == null || data.length == 0) {
  13. continue;
  14. }
  15. byte[] keyByte = key.getBytes();
  16. writeBatch.put(keyByte, data);
  17. }
  18. db.write(writeOpts, writeBatch);
  19. } catch (Exception e) {
  20. LOG.error("Failed to putBatch into DB, " + map.keySet(), e);
  21. } finally {
  22. if (writeOpts != null) {
  23. writeOpts.dispose();
  24. }
  25. if (writeBatch != null) {
  26. writeBatch.dispose();
  27. }
  28. }
  29. }

代码示例来源:origin: apache/storm

  1. void deleteMetrics(FilterOptions filter) throws MetricException {
  2. try (WriteBatch writeBatch = new WriteBatch();
  3. WriteOptions writeOps = new WriteOptions()) {
  4. scanRaw(filter, (RocksDbKey key, RocksDbValue value) -> {
  5. writeBatch.remove(key.getRaw());
  6. return true;
  7. });
  8. if (writeBatch.count() > 0) {
  9. LOG.info("Deleting {} metrics", writeBatch.count());
  10. try {
  11. db.write(writeOps, writeBatch);
  12. } catch (Exception e) {
  13. String message = "Failed delete metrics";
  14. LOG.error(message, e);
  15. if (this.failureMeter != null) {
  16. this.failureMeter.mark();
  17. }
  18. throw new MetricException(message, e);
  19. }
  20. }
  21. }
  22. }

代码示例来源:origin: apache/flink

  1. @Override
  2. public void clear() {
  3. try {
  4. try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(backend.db, columnFamily);
  5. WriteBatch writeBatch = new WriteBatch(128)) {
  6. final byte[] keyPrefixBytes = serializeCurrentKeyWithGroupAndNamespace();
  7. iterator.seek(keyPrefixBytes);
  8. while (iterator.isValid()) {
  9. byte[] keyBytes = iterator.key();
  10. if (startWithKeyPrefix(keyPrefixBytes, keyBytes)) {
  11. writeBatch.remove(columnFamily, keyBytes);
  12. } else {
  13. break;
  14. }
  15. iterator.next();
  16. }
  17. backend.db.write(writeOptions, writeBatch);
  18. }
  19. } catch (Exception e) {
  20. LOG.warn("Error while cleaning the state.", e);
  21. }
  22. }

代码示例来源:origin: apache/storm

  1. LOG.info("Deleting {} metadata strings", writeBatch.count());
  2. try {
  3. db.write(writeOps, writeBatch);
  4. } catch (Exception e) {
  5. String message = "Failed delete metadata strings";

代码示例来源:origin: hugegraph/hugegraph

  1. /**
  2. * Commit all updates(put/delete) to DB
  3. */
  4. @Override
  5. public Integer commit() {
  6. int count = this.batch.count();
  7. if (count <= 0) {
  8. return 0;
  9. }
  10. try {
  11. rocksdb().write(this.writeOptions, this.batch);
  12. } catch (RocksDBException e) {
  13. //this.batch.rollbackToSavePoint();
  14. throw new BackendException(e);
  15. }
  16. // Clear batch if write() successfully (retained if failed)
  17. this.batch.clear();
  18. return count;
  19. }

代码示例来源:origin: org.apache.kafka/kafka-streams

  1. void write(final WriteBatch batch) throws RocksDBException {
  2. db.write(wOptions, batch);
  3. }

代码示例来源:origin: org.apache.bookkeeper/bookkeeper-server

  1. @Override
  2. public void flush() throws IOException {
  3. try {
  4. db.write(optionSync, writeBatch);
  5. } catch (RocksDBException e) {
  6. throw new IOException("Failed to flush RocksDB batch", e);
  7. }
  8. }
  9. }

代码示例来源:origin: org.apache.bookkeeper/bookkeeper-server

  1. @Override
  2. public void sync() throws IOException {
  3. try {
  4. db.write(optionSync, emptyBatch);
  5. } catch (RocksDBException e) {
  6. throw new IOException(e);
  7. }
  8. }

代码示例来源:origin: org.apache.flink/flink-statebackend-rocksdb

  1. public void flush() throws RocksDBException {
  2. if (options != null) {
  3. db.write(options, batch);
  4. } else {
  5. // use the default WriteOptions, if wasn't provided.
  6. try (WriteOptions writeOptions = new WriteOptions()) {
  7. db.write(writeOptions, batch);
  8. }
  9. }
  10. batch.clear();
  11. }

代码示例来源:origin: org.apache.bookkeeper/statelib

  1. private void executeBatch(WriteBatch batch) {
  2. try {
  3. db.write(writeOpts, batch);
  4. } catch (RocksDBException e) {
  5. throw new StateStoreRuntimeException("Error while executing a multi operation from state store " + name, e);
  6. }
  7. }

代码示例来源:origin: criccomini/ezdb

  1. @Override
  2. public void flush() {
  3. try {
  4. db.write(writeOptions, writeBatch);
  5. } catch (RocksDBException e) {
  6. throw new DbException(e);
  7. }
  8. }

代码示例来源:origin: org.apache.flink/flink-statebackend-rocksdb_2.11

  1. public void flush() throws RocksDBException {
  2. if (options != null) {
  3. db.write(options, batch);
  4. } else {
  5. // use the default WriteOptions, if wasn't provided.
  6. try (WriteOptions writeOptions = new WriteOptions()) {
  7. db.write(writeOptions, batch);
  8. }
  9. }
  10. batch.clear();
  11. }

代码示例来源:origin: com.github.ddth/ddth-commons-core

  1. /**
  2. * See {@link RocksDB#write(WriteOptions, WriteBatch)}.
  3. *
  4. * @param writeOptions
  5. * @param batch
  6. * @throws RocksDbException
  7. */
  8. public void write(WriteOptions writeOptions, WriteBatch batch) throws RocksDbException {
  9. try {
  10. rocksDb.write(writeOptions != null ? writeOptions : this.writeOptions, batch);
  11. } catch (Exception e) {
  12. throw e instanceof RocksDbException ? (RocksDbException) e : new RocksDbException(e);
  13. }
  14. }

代码示例来源:origin: locationtech/geogig

  1. private void flush() {
  2. lock.writeLock().lock();
  3. try {
  4. if (batch.count() >= WRITE_THRESHOLD) {
  5. db().write(writeOptions, batch);
  6. batch.clear();
  7. }
  8. } catch (RocksDBException e) {
  9. throw new RuntimeException(e);
  10. } finally {
  11. lock.writeLock().unlock();
  12. }
  13. }

代码示例来源:origin: locationtech/geogig

  1. @Override
  2. public boolean put(ObjectId commitId, ImmutableList<ObjectId> parentIds) {
  3. try (WriteBatchWithIndex batch = new WriteBatchWithIndex(); //
  4. RocksDBReference dbRef = dbhandle.getReference();
  5. WriteOptions wo = new WriteOptions()) {
  6. wo.setSync(true);
  7. boolean updated = put(dbRef, commitId, parentIds, batch);
  8. dbRef.db().write(wo, batch);
  9. return updated;
  10. } catch (RocksDBException e) {
  11. throw new RuntimeException(e);
  12. }
  13. }

相关文章