
x33g5p2x  于2022-01-28 转载在 其他  



[英]A RocksDB is a persistent ordered map from keys to values. It is safe for concurrent access from multiple threads without any external synchronization. All methods of this class could potentially throw RocksDBException, which indicates sth wrong at the RocksDB library side and the call failed.


代码示例来源:origin: alibaba/jstorm

  1. @SuppressWarnings("unused")
  2. public void initDb(List<Integer> list, Options dbOptions) throws Exception {
  3."Begin to init rocksDB of {}", rootDir);
  4. try {
  5. //List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<ColumnFamilyHandle>();
  6. db =, rootDir);
  7."Successfully init rocksDB of {}", rootDir);
  8. } finally {
  9. if (dbOptions != null) {
  10. dbOptions.dispose();
  11. }
  12. }
  13. }

代码示例来源:origin: alibaba/jstorm

  1. @Override
  2. public void cleanup() {
  3."Begin to close rocketDb of {}", rootDir);
  4. if (db != null) {
  5. db.close();
  6. }
  7."Successfully closed rocketDb of {}", rootDir);
  8. }

代码示例来源:origin: apache/flink

  1. @Override
  2. public boolean contains(UK userKey) throws IOException, RocksDBException {
  3. byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
  4. byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
  5. return (rawValueBytes != null);
  6. }

代码示例来源:origin: alibaba/jstorm

  1. public static RocksDB create(Map conf, String rocksDbDir, int ttlTimeSec) throws IOException {
  2. Options options = getOptions(conf);
  3. try {
  4. RocksDB rocksDb = ttlTimeSec > 0 ?, rocksDbDir, ttlTimeSec, false) :
  5., rocksDbDir);
  6."Finished loading RocksDB");
  7. // enable compaction
  8. rocksDb.compactRange();
  9. return rocksDb;
  10. } catch (RocksDBException e) {
  11. throw new IOException("Failed to initialize RocksDb.", e);
  12. }
  13. }

代码示例来源:origin: ethereum/ethereumj

  1. @Override
  2. public void put(byte[] key, byte[] val) {
  3. resetDbLock.readLock().lock();
  4. try {
  5. if (logger.isTraceEnabled()) logger.trace("~> RocksDbDataSource.put(): " + name + ", key: " + toHexString(key) + ", " + (val == null ? "null" : val.length));
  6. if (val != null) {
  7. db.put(key, val);
  8. } else {
  9. db.delete(key);
  10. }
  11. if (logger.isTraceEnabled()) logger.trace("<~ RocksDbDataSource.put(): " + name + ", key: " + toHexString(key) + ", " + (val == null ? "null" : val.length));
  12. } catch (RocksDBException e) {
  13. logger.error("Failed to put into db '{}'", name, e);
  14. hintOnTooManyOpenFiles(e);
  15. throw new RuntimeException(e);
  16. } finally {
  17. resetDbLock.readLock().unlock();
  18. }
  19. }

代码示例来源:origin: alibaba/jstorm

  1. public static RocksDB createWithColumnFamily(Map conf, String rocksDbDir, final Map<String, ColumnFamilyHandle> columnFamilyHandleMap, int ttlTimeSec) throws IOException {
  2. List<ColumnFamilyDescriptor> columnFamilyDescriptors = getExistingColumnFamilyDesc(conf, rocksDbDir);
  3. List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
  4. DBOptions dbOptions = getDBOptions(conf);
  5. try {
  6. RocksDB rocksDb = ttlTimeSec > 0 ?
  7. dbOptions, rocksDbDir, columnFamilyDescriptors, columnFamilyHandles, getTtlValues(ttlTimeSec, columnFamilyDescriptors), false) :
  8., rocksDbDir, columnFamilyDescriptors, columnFamilyHandles);
  9. int n = Math.min(columnFamilyDescriptors.size(), columnFamilyHandles.size());
  10. // skip default column
  11. columnFamilyHandleMap.put(DEFAULT_COLUMN_FAMILY, rocksDb.getDefaultColumnFamily());
  12. for (int i = 1; i < n; i++) {
  13. ColumnFamilyDescriptor descriptor = columnFamilyDescriptors.get(i);
  14. columnFamilyHandleMap.put(new String(descriptor.columnFamilyName()), columnFamilyHandles.get(i));
  15. }
  16."Finished loading RocksDB with existing column family={}, dbPath={}, ttlSec={}",
  17. columnFamilyHandleMap.keySet(), rocksDbDir, ttlTimeSec);
  18. // enable compaction
  19. rocksDb.compactRange();
  20. return rocksDb;
  21. } catch (RocksDBException e) {
  22. throw new IOException("Failed to initialize RocksDb.", e);
  23. }
  24. }

代码示例来源:origin: alibaba/jstorm

  1. @Override
  2. public Collection<K> listKeys() {
  3. Collection<K> keys = new ArrayList<>();
  4. RocksIterator itr = rocksDb.newIterator(columnFamily);
  5. itr.seekToFirst();
  6. while (itr.isValid()) {
  7. keys.add((K) serializer.deserialize(itr.key()));
  9. }
  10. return keys;
  11. }

代码示例来源:origin: Alluxio/alluxio

  1. @Override
  2. public List<BlockLocation> getLocations(long id) {
  3. RocksIterator iter =
  4. mDb.newIterator(mBlockLocationsColumn, new ReadOptions().setPrefixSameAsStart(true));
  6. List<BlockLocation> locations = new ArrayList<>();
  7. for (; iter.isValid(); {
  8. try {
  9. locations.add(BlockLocation.parseFrom(iter.value()));
  10. } catch (Exception e) {
  11. throw new RuntimeException(e);
  12. }
  13. }
  14. return locations;
  15. }

代码示例来源:origin: apache/storm

  1. RocksDB.loadLibrary();
  2. boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
  3. db =, path);
  4. } catch (RocksDBException e) {
  5. String message = "Error opening RockDB database";

代码示例来源:origin: alibaba/jstorm

  1. @Override
  2. public void put(String key, Object value) {
  3. byte[] data = serialize(value);
  4. try {
  5. db.put(key.getBytes(), data);
  6. } catch (Exception e) {
  7. LOG.error("Failed to put key into cache, " + key, e);
  8. }
  9. }

代码示例来源:origin: Alluxio/alluxio

  1. mBlockMetaColumn.close();
  2. mBlockLocationsColumn.close();
  3. mDb.close();
  4. } catch (Throwable t) {
  5. LOG.error("Failed to close previous rocks database", t);
  6. mDb =, mDbPath, cfDescriptors, columns);
  7. mDefaultColumn = columns.get(0);
  8. mBlockMetaColumn = columns.get(1);

代码示例来源:origin: Alluxio/alluxio

  1. @Override
  2. public boolean hasChildren(InodeDirectoryView inode) {
  3. RocksIterator iter = mDb.newIterator(mEdgesColumn, mReadPrefixSameAsStart);
  5. return iter.isValid();
  6. }

代码示例来源:origin: apache/flink

  1. setCurrentNamespace(source);
  2. final byte[] sourceKey = serializeCurrentKeyWithGroupAndNamespace();
  3. final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
  4. backend.db.delete(columnFamily, writeOptions, sourceKey);
  5. final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
  6. backend.db.put(columnFamily, writeOptions, targetKey, dataOutputView.getCopyOfBuffer());

代码示例来源:origin: alibaba/jstorm

  1. familyOptions.setNumLevels(levelNum);
  2. familyOptions.setLevelZeroFileNumCompactionTrigger(compactionTriggerNum);
  3. List<byte[]> families = RocksDB.listColumnFamilies(options, dbPath);
  4. List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
  5. if (families != null) {
  6. db =, dbPath, columnFamilyDescriptors, columnFamilyHandles);
  7. } else {
  8. db =, dbPath);
  9. db.close();

代码示例来源:origin: apache/kylin

  1. public void build(ILookupTable srcLookupTable) {
  2. File dbFolder = new File(dbPath);
  3. if (dbFolder.exists()) {
  4."remove rocksdb folder:{} to rebuild table cache:{}", dbPath, tableDesc.getIdentity());
  5. FileUtils.deleteQuietly(dbFolder);
  6. } else {
  7."create new rocksdb folder:{} for table cache:{}", dbPath, tableDesc.getIdentity());
  8. dbFolder.mkdirs();
  9. }
  10."start to build lookup table:{} to rocks db:{}", tableDesc.getIdentity(), dbPath);
  11. try (RocksDB rocksDB =, dbPath)) {
  12. // todo use batch may improve write performance
  13. for (String[] row : srcLookupTable) {
  14. KV kv = encoder.encode(row);
  15. rocksDB.put(kv.getKey(), kv.getValue());
  16. }
  17. } catch (RocksDBException e) {
  18. logger.error("error when put data to rocksDB", e);
  19. throw new RuntimeException("error when write data to rocks db", e);
  20. }
  21."source table:{} has been written to rocks db:{}", tableDesc.getIdentity(), dbPath);
  22. }
  23. }

代码示例来源:origin: alibaba/jstorm

  1. handler2 = handlers.get(2);
  2. } else {
  3. handler1 = db.createColumnFamily(new ColumnFamilyDescriptor("test1".getBytes()));
  4. handler2 = db.createColumnFamily(new ColumnFamilyDescriptor("test2".getBytes()));
  5. db.compactRange();
  7. db.put(handler1, String.valueOf(i % 1000).getBytes(), String.valueOf(startValue1 + i).getBytes());
  8. db.put(handler2, String.valueOf(i % 1000).getBytes(), String.valueOf(startValue2 + i).getBytes());
  9. if (isFlush && flushWaitTime <= System.currentTimeMillis()) {
  10. db.flush(new FlushOptions());
  11. if (isCheckpoint) {
  12. cp.createCheckpoint(cpPath + "/" + i);

代码示例来源:origin: apache/flink

  1. @Override
  2. public void mergeNamespaces(N target, Collection<N> sources) {
  3. if (sources == null || sources.isEmpty()) {
  4. return;
  5. }
  6. try {
  7. // create the target full-binary-key
  8. setCurrentNamespace(target);
  9. final byte[] targetKey = serializeCurrentKeyWithGroupAndNamespace();
  10. // merge the sources to the target
  11. for (N source : sources) {
  12. if (source != null) {
  13. setCurrentNamespace(source);
  14. final byte[] sourceKey = serializeCurrentKeyWithGroupAndNamespace();
  15. byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
  16. backend.db.delete(columnFamily, writeOptions, sourceKey);
  17. if (valueBytes != null) {
  18. backend.db.merge(columnFamily, writeOptions, targetKey, valueBytes);
  19. }
  20. }
  21. }
  22. }
  23. catch (Exception e) {
  24. throw new FlinkRuntimeException("Error while merging state in RocksDB", e);
  25. }
  26. }

代码示例来源:origin: Alluxio/alluxio

  1. @Override
  2. public Iterator<Block> iterator() {
  3. RocksIterator iter =
  4. mDb.newIterator(mBlockMetaColumn, new ReadOptions().setPrefixSameAsStart(true));
  5. iter.seekToFirst();
  6. return new Iterator<Block>() {
  7. @Override
  8. public boolean hasNext() {
  9. return iter.isValid();
  10. }
  11. @Override
  12. public Block next() {
  13. try {
  14. return new Block(Longs.fromByteArray(iter.key()), BlockMeta.parseFrom(iter.value()));
  15. } catch (Exception e) {
  16. throw new RuntimeException(e);
  17. } finally {
  19. }
  20. }
  21. };
  22. }

代码示例来源:origin: apache/flink

  1. static RocksIteratorWrapper getRocksIterator(
  2. RocksDB db,
  3. ColumnFamilyHandle columnFamilyHandle) {
  4. return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
  5. }

代码示例来源:origin: Alluxio/alluxio

  1. @Override
  2. public Iterable<Long> getChildIds(Long inodeId) {
  3. RocksIterator iter = mDb.newIterator(mEdgesColumn, mReadPrefixSameAsStart);
  5. return () -> new Iterator<Long>() {
  6. @Override
  7. public boolean hasNext() {
  8. return iter.isValid();
  9. }
  10. @Override
  11. public Long next() {
  12. try {
  13. return Longs.fromByteArray(iter.value());
  14. } catch (Exception e) {
  15. throw new RuntimeException(e);
  16. } finally {
  18. }
  19. }
  20. };
  21. }
