2 changes: 1 addition & 1 deletion rocksdb/__init__.py
@@ -1 +1 @@
from ._rocksdb import *
from ._rocksdb import *
154 changes: 119 additions & 35 deletions rocksdb/_rocksdb.pyx
@@ -2,6 +2,7 @@ import cython
from libcpp.string cimport string
from libcpp.deque cimport deque
from libcpp.vector cimport vector
from libcpp.map cimport map as cpp_map
from cpython cimport bool as py_bool
from libcpp cimport bool as cpp_bool
from libc.stdint cimport uint32_t
@@ -38,7 +39,8 @@ from options cimport kCompactionStyleLevel
from options cimport kCompactionStyleUniversal
from options cimport kCompactionStyleFIFO
from options cimport kCompactionStyleNone

from options cimport ColumnFamilyOptions
from db cimport ColumnFamilyHandle, ColumnFamilyDescriptor
from slice_ cimport Slice
from status cimport Status

@@ -56,6 +58,9 @@ ctypedef const filter_policy.FilterPolicy ConstFilterPolicy
cdef extern from "cpp/utils.hpp" namespace "py_rocks":
cdef const Slice* vector_data(vector[Slice]&)

cdef extern from "rocksdb/db.h" namespace "rocksdb":
cdef string kDefaultColumnFamilyName

# Prepare python for threaded usage.
# Python callbacks (merge, comparator)
# could be executed in a rocksdb background thread (eg. compaction).
@@ -105,6 +110,13 @@ cdef Slice bytes_to_slice(ob) except *:
cdef slice_to_bytes(Slice sl):
return PyBytes_FromStringAndSize(sl.data(), sl.size())

cdef ColumnFamilyDescriptor column_name_to_cfd(string column):
cdef ColumnFamilyDescriptor cdf
cdf.name = column
cdef ColumnFamilyOptions opt
cdf.options = opt
return cdf

## only for filesystem paths
cdef string path_to_string(object path) except *:
if isinstance(path, bytes):
@@ -579,6 +591,7 @@ cdef class BlockBasedTableFactory(PyTableFactory):
block_size=None,
block_size_deviation=None,
block_restart_interval=None,
cache_index_and_filter_blocks=True,
whole_key_filtering=None):

cdef table_factory.BlockBasedTableOptions table_options
@@ -1406,38 +1419,47 @@ cdef class WriteBatchIterator(object):
self.pos += 1
return ret


@cython.no_gc_clear
cdef class DB(object):
cdef Options opts
cdef db.DB* db
cdef vector[ColumnFamilyHandle*]* cfh
cdef cpp_map[string, ColumnFamilyHandle*]* cfh_map

def __cinit__(self, db_name, Options opts, read_only=False):
def __cinit__(self, db_name, Options opts, cfs, read_only=False):
cdef Status st
cdef string db_path
self.db = NULL
self.opts = None

self.cfh = new vector[ColumnFamilyHandle*]()
if opts.in_use:
raise Exception("Options object is already used by another DB")

cdef vector[ColumnFamilyDescriptor] cf
for c in cfs:
cf.push_back(column_name_to_cfd(c))
db_path = path_to_string(db_name)
if read_only:
with nogil:
st = db.DB_OpenForReadOnly(
st = db.DB_OpenForReadOnly_CF(
deref(opts.opts),
db_path,
cf,
self.cfh,
cython.address(self.db),
False)
else:
with nogil:
st = db.DB_Open(
st = db.DB_Open_CF(
deref(opts.opts),
db_path,
cf,
self.cfh,
cython.address(self.db))
check_status(st)

# Inject the loggers into the python callbacks
cdef shared_ptr[logger.Logger] info_log = self.db.GetOptions().info_log
cdef shared_ptr[logger.Logger] info_log = self.db.GetOptions(self.cfh[0][0]).info_log
if opts.py_comparator is not None:
opts.py_comparator.set_info_log(info_log)

@@ -1449,49 +1471,91 @@ cdef class DB(object):

self.opts = opts
self.opts.in_use = True
self.cfh_map = new cpp_map[string, ColumnFamilyHandle*]()
for cfh1 in self.cfh[0]:
self.cfh_map[0][cfh1.GetName()] = cfh1

def close(self):
if not self.db == NULL:
with nogil:
for cf in self.cfh[0]:
self.db.DestroyColumnFamilyHandle(cf)
del self.cfh
del self.db
self.db = NULL
if self.opts is not None:
self.opts.in_use = False

def __dealloc__(self):
if not self.db == NULL:
with nogil:
for cf in self.cfh[0]:
self.db.DestroyColumnFamilyHandle(cf)
del self.cfh
del self.db

if self.opts is not None:
self.opts.in_use = False

def put(self, key, value, sync=False, disable_wal=False):
@staticmethod
cdef ColumnFamilyHandle* get_cfh(cpp_map[string, ColumnFamilyHandle*]* cfh_map, string column):
if cfh_map[0].count(column) <= 0:
return NULL
return cfh_map[0][column]

def create_column_family(self, columns):
cdef ColumnFamilyOptions opt
cdef vector[ColumnFamilyHandle*] cfhs
cdef vector[string] cf
for c in columns:
cf.push_back(c)
with nogil:
self.db.CreateColumnFamilies(opt, cf, &cfhs)
for cfh in cfhs:
self.db.DestroyColumnFamilyHandle(cfh)

def put(self, column, key, value, sync=False, disable_wal=False):
cdef Status st
cdef options.WriteOptions opts
opts.sync = sync
opts.disableWAL = disable_wal

cdef Slice c_key = bytes_to_slice(key)
cdef Slice c_value = bytes_to_slice(value)

cdef ColumnFamilyHandle* cfh = DB.get_cfh(self.cfh_map, column)
if cfh == NULL:
raise ValueError("column family {} doesn't exist".format(column))
with nogil:
st = self.db.Put(opts, c_key, c_value)
st = self.db.Put(opts, cfh, c_key, c_value)
check_status(st)

def delete(self, key, sync=False, disable_wal=False):
def delete(self, column, key, sync=False, disable_wal=False):
cdef Status st
cdef options.WriteOptions opts
opts.sync = sync
opts.disableWAL = disable_wal

cdef Slice c_key = bytes_to_slice(key)
cdef ColumnFamilyHandle* cfh = DB.get_cfh(self.cfh_map, column)
if cfh == NULL:
raise ValueError("column family {} doesn't exist".format(column))
with nogil:
st = self.db.Delete(opts, c_key)
st = self.db.Delete(opts, cfh, c_key)
check_status(st)

def merge(self, key, value, sync=False, disable_wal=False):
def merge(self, column, key, value, sync=False, disable_wal=False):
cdef Status st
cdef options.WriteOptions opts
opts.sync = sync
opts.disableWAL = disable_wal

cdef Slice c_key = bytes_to_slice(key)
cdef Slice c_value = bytes_to_slice(value)
cdef ColumnFamilyHandle* cfh = DB.get_cfh(self.cfh_map, column)
if cfh == NULL:
raise ValueError("column family {} doesn't exist".format(column))
with nogil:
st = self.db.Merge(opts, c_key, c_value)
st = self.db.Merge(opts, cfh, c_key, c_value)
check_status(st)

def write(self, WriteBatch batch, sync=False, disable_wal=False):
@@ -1504,16 +1568,18 @@ cdef class DB(object):
st = self.db.Write(opts, batch.batch)
check_status(st)

def get(self, key, *args, **kwargs):
def get(self, column, key, *args, **kwargs):
cdef string res
cdef Status st
cdef options.ReadOptions opts

cdef ColumnFamilyHandle* cfh = DB.get_cfh(self.cfh_map, column)
if cfh == NULL:
raise ValueError("column family {} doesn't exist".format(column))
opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs))
cdef Slice c_key = bytes_to_slice(key)

with nogil:
st = self.db.Get(opts, c_key, cython.address(res))
st = self.db.Get(opts, cfh, c_key, cython.address(res))

if st.ok():
return string_to_bytes(res)
@@ -1522,13 +1588,16 @@ cdef class DB(object):
else:
check_status(st)

def multi_get(self, keys, *args, **kwargs):
def multi_get(self, columns, keys, *args, **kwargs):
cdef vector[string] values
values.resize(len(keys))

cdef vector[Slice] c_keys
for key in keys:
c_keys.push_back(bytes_to_slice(key))
cdef vector[ColumnFamilyHandle*] cfhs
for column in columns:
cfhs.push_back(DB.get_cfh(self.cfh_map, column))

cdef options.ReadOptions opts
opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs))
@@ -1537,6 +1606,7 @@ cdef class DB(object):
with nogil:
res = self.db.MultiGet(
opts,
cfhs,
c_keys,
cython.address(values))

@@ -1551,14 +1621,16 @@ cdef class DB(object):

return ret_dict

def key_may_exist(self, key, fetch=False, *args, **kwargs):
def key_may_exist(self, column, key, fetch=False, *args, **kwargs):
cdef string value
cdef cpp_bool value_found
cdef cpp_bool exists
cdef options.ReadOptions opts
cdef Slice c_key
opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs))

cdef ColumnFamilyHandle* cfh = DB.get_cfh(self.cfh_map, column)
if cfh == NULL:
raise ValueError("column family {} doesn't exist".format(column))
c_key = bytes_to_slice(key)
exists = False

@@ -1567,6 +1639,7 @@ cdef class DB(object):
with nogil:
exists = self.db.KeyMayExist(
opts,
cfh,
c_key,
cython.address(value),
cython.address(value_found))
@@ -1582,56 +1655,65 @@ cdef class DB(object):
with nogil:
exists = self.db.KeyMayExist(
opts,
cfh,
c_key,
cython.address(value))

return (exists, None)

def iterkeys(self, *args, **kwargs):
def iterkeys(self, column, *args, **kwargs):
cdef options.ReadOptions opts
cdef KeysIterator it

cdef ColumnFamilyHandle* cfh = DB.get_cfh(self.cfh_map, column)
if cfh == NULL:
raise ValueError("column family {} doesn't exist".format(column))
opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs))
it = KeysIterator(self)

with nogil:
it.ptr = self.db.NewIterator(opts)
it.ptr = self.db.NewIterator(opts, cfh)
return it

def itervalues(self, *args, **kwargs):
def itervalues(self, column, *args, **kwargs):
cdef options.ReadOptions opts
cdef ValuesIterator it

opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs))

cdef ColumnFamilyHandle* cfh = DB.get_cfh(self.cfh_map, column)
if cfh == NULL:
raise ValueError("column family {} doesn't exist".format(column))
it = ValuesIterator(self)

with nogil:
it.ptr = self.db.NewIterator(opts)
it.ptr = self.db.NewIterator(opts, cfh)
return it

def iteritems(self, *args, **kwargs):
def iteritems(self, column, *args, **kwargs):
cdef options.ReadOptions opts
cdef ItemsIterator it

cdef ColumnFamilyHandle* cfh = DB.get_cfh(self.cfh_map, column)
if cfh == NULL:
raise ValueError("column family {} doesn't exist".format(column))
opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs))

it = ItemsIterator(self)

with nogil:
it.ptr = self.db.NewIterator(opts)
it.ptr = self.db.NewIterator(opts, cfh)
return it

def snapshot(self):
return Snapshot(self)

def get_property(self, prop):
def get_property(self, column, prop):
cdef string value
cdef Slice c_prop = bytes_to_slice(prop)
cdef cpp_bool ret = False

cdef ColumnFamilyHandle* cfh = DB.get_cfh(self.cfh_map, column)
if cfh == NULL:
raise ValueError("column family {} doesn't exist".format(column))
with nogil:
ret = self.db.GetProperty(c_prop, cython.address(value))
ret = self.db.GetProperty(cfh, c_prop, cython.address(value))

if ret:
return string_to_bytes(value)
@@ -1659,9 +1741,11 @@ cdef class DB(object):

return ret

def compact_range(self, begin=None, end=None, **py_options):
def compact_range(self, column, begin=None, end=None, **py_options):
cdef options.CompactRangeOptions c_options

cdef ColumnFamilyHandle* cfh = DB.get_cfh(self.cfh_map, column)
if cfh == NULL:
raise ValueError("column family {} doesn't exist".format(column))
c_options.change_level = py_options.get('change_level', False)
c_options.target_level = py_options.get('target_level', -1)

@@ -1693,7 +1777,7 @@ cdef class DB(object):
end_val = bytes_to_slice(end)
end_ptr = cython.address(end_val)

st = self.db.CompactRange(c_options, begin_ptr, end_ptr)
st = self.db.CompactRange(c_options, cfh, begin_ptr, end_ptr)
check_status(st)

@staticmethod
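
For reviewers, a minimal usage sketch of the API as changed by this diff. It is not part of the patch: it assumes the extension builds and imports as `rocksdb`, that `Options(create_if_missing=True)` still works as before, and that the default column family (`b"default"`, i.e. `kDefaultColumnFamilyName`) has to be listed explicitly when opening; the column family names are made up for illustration.

```python
# Sketch only -- names and open semantics are assumptions, not taken from the diff.
import rocksdb

opts = rocksdb.Options(create_if_missing=True)

# DB() now takes the list of column families to open as its third argument.
db = rocksdb.DB("test.db", opts, [b"default", b"meta"])

# All data-path methods take the target column family as their first argument.
db.put(b"meta", b"key1", b"value1")
assert db.get(b"meta", b"key1") == b"value1"
print(db.key_may_exist(b"meta", b"key1"))      # (True, None) unless fetch=True

# multi_get pairs each key with a column family, positionally.
print(db.multi_get([b"meta", b"meta"], [b"key1", b"missing"]))

# Iterators are scoped to a single column family.
it = db.iterkeys(b"meta")
it.seek_to_first()
print(list(it))

# create_column_family() creates the families on disk; as written in this diff
# the returned handles are destroyed and not added to the handle map, so a new
# family only becomes usable after reopening the DB with it in the list.
db.create_column_family([b"new_cf"])

db.delete(b"meta", b"key1")
db.close()
```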