diff --git a/cpp/fory/util/buffer.h b/cpp/fory/util/buffer.h index 496e79fb00..eba918fac2 100644 --- a/cpp/fory/util/buffer.h +++ b/cpp/fory/util/buffer.h @@ -136,6 +136,12 @@ class Buffer { memcpy(data_ + offset, data, (size_t)length); } + FORY_ALWAYS_INLINE void PutInt24(uint32_t offset, int32_t value) { + data_[offset] = static_cast(value); + data_[offset + 1] = static_cast(value >> 8); + data_[offset + 2] = static_cast(value >> 16); + } + template FORY_ALWAYS_INLINE T Get(uint32_t relative_offset) { FORY_CHECK(relative_offset < size_) << "Out of range " << relative_offset << " should be less than " << size_; @@ -164,6 +170,15 @@ class Buffer { return Get(offset); } + FORY_ALWAYS_INLINE int32_t GetInt24(uint32_t offset) { + FORY_CHECK(offset + 3 <= size_) + << "Out of range " << offset << " should be less than " << size_; + int32_t b0 = data_[offset]; + int32_t b1 = data_[offset + 1]; + int32_t b2 = data_[offset + 2]; + return (b0 & 0xFF) | ((b1 & 0xFF) << 8) | ((b2 & 0xFF) << 16); + } + FORY_ALWAYS_INLINE int32_t GetInt32(uint32_t offset) { return Get(offset); } @@ -541,6 +556,14 @@ class Buffer { IncreaseWriterIndex(2); } + /// Write int24 value as fixed 3 bytes to buffer at current writer index. + /// Automatically grows buffer and advances writer index. + FORY_ALWAYS_INLINE void WriteInt24(int32_t value) { + Grow(3); + PutInt24(writer_index_, value); + IncreaseWriterIndex(3); + } + /// Write int32_t value as fixed 4 bytes to buffer at current writer index. /// Automatically grows buffer and advances writer index. FORY_ALWAYS_INLINE void WriteInt32(int32_t value) { @@ -713,6 +736,19 @@ class Buffer { return value; } + /// Read int24 value from buffer. Sets error on bounds violation. + FORY_ALWAYS_INLINE int32_t ReadInt24(Error &error) { + if (FORY_PREDICT_FALSE(reader_index_ + 3 > size_)) { + error.set_buffer_out_of_bound(reader_index_, 3, size_); + return 0; + } + int32_t b0 = data_[reader_index_]; + int32_t b1 = data_[reader_index_ + 1]; + int32_t b2 = data_[reader_index_ + 2]; + reader_index_ += 3; + return (b0 & 0xFF) | ((b1 & 0xFF) << 8) | ((b2 & 0xFF) << 16); + } + /// Read uint32_t value from buffer (fixed 4 bytes). Sets error on bounds /// violation. FORY_ALWAYS_INLINE uint32_t ReadUint32(Error &error) { diff --git a/python/pyfory/buffer.pxd b/python/pyfory/buffer.pxd index 16c87705e4..e2ab43986c 100644 --- a/python/pyfory/buffer.pxd +++ b/python/pyfory/buffer.pxd @@ -22,9 +22,9 @@ # cython: annotate = True from libc.stdint cimport * -from libcpp.memory cimport shared_ptr from libcpp cimport bool as c_bool -from pyfory.includes.libutil cimport CBuffer +from libcpp.memory cimport shared_ptr +from pyfory.includes.libutil cimport CBuffer, CError cdef class Buffer: @@ -32,19 +32,18 @@ cdef class Buffer: us to use it for calls into Python libraries without having to copy the data.""" cdef: - shared_ptr[CBuffer] c_buffer - CBuffer* c_buffer_ptr - uint8_t* _c_address - int32_t _c_size + CBuffer c_buffer + CError _error # hold python buffer reference count object data Py_ssize_t shape[1] Py_ssize_t stride[1] - public int32_t reader_index, writer_index @staticmethod cdef Buffer wrap(shared_ptr[CBuffer] c_buffer) + cdef void _raise_if_error(self) + cpdef inline check_bound(self, int32_t offset, int32_t length) cdef getitem(self, int64_t i) diff --git a/python/pyfory/buffer.pyx b/python/pyfory/buffer.pyx index 6794dc60cd..cccf976ea7 100644 --- a/python/pyfory/buffer.pyx +++ b/python/pyfory/buffer.pyx @@ -23,13 +23,18 @@ cimport cython from cpython cimport * from cpython.unicode cimport * -from libcpp.memory cimport shared_ptr, make_shared +from cpython.bytes cimport PyBytes_AsString, PyBytes_FromStringAndSize, PyBytes_AS_STRING +from libcpp.memory cimport shared_ptr +from libcpp.utility cimport move +from cython.operator cimport dereference as deref +from libcpp.string cimport string as c_string from libc.stdint cimport * from libcpp cimport bool as c_bool from pyfory.includes.libutil cimport( CBuffer, AllocateBuffer, GetBit, SetBit, ClearBit, SetBitTo, CError, CErrorCode, CResultVoidError, utf16HasSurrogatePairs ) import os +from pyfory.error import raise_fory_error cdef int32_t max_buffer_size = 2 ** 31 - 1 cdef int UTF16_LE = -1 @@ -37,6 +42,11 @@ cdef int UTF16_LE = -1 cdef c_bool _WINDOWS = os.name == 'nt' +@cython.final +cdef class _SharedBufferOwner: + cdef shared_ptr[CBuffer] buffer + + @cython.final cdef class Buffer: def __init__(self, data not None, int32_t offset=0, length=None): @@ -49,196 +59,191 @@ cdef class Buffer: length_ = length if offset < 0 or offset + length_ > buffer_len: raise ValueError(f'Wrong offset {offset} or length {length} for buffer with size {buffer_len}') + cdef uint8_t* address if length_ > 0: - self._c_address = get_address(data) + offset + address = get_address(data) + offset else: - self._c_address = NULL - self._c_size = length_ - self.c_buffer = make_shared[CBuffer](self._c_address, length_, False) - self.c_buffer_ptr = self.c_buffer.get() - # hold c_address directly to avoid pointer indirect cost. - self.reader_index = 0 - self.writer_index = 0 + address = NULL + self.c_buffer = CBuffer(address, length_, False) + self.c_buffer.ReaderIndex(0) + self.c_buffer.WriterIndex(0) @staticmethod cdef Buffer wrap(shared_ptr[CBuffer] c_buffer): cdef Buffer buffer = Buffer.__new__(Buffer) - buffer.c_buffer = c_buffer - buffer.c_buffer_ptr = c_buffer.get() - buffer._c_address = buffer.c_buffer_ptr.data() - buffer._c_size = buffer.c_buffer_ptr.size() - buffer.reader_index = 0 - buffer.writer_index = 0 + cdef CBuffer* ptr = c_buffer.get() + buffer.c_buffer = CBuffer(ptr.data(), ptr.size(), False) + cdef _SharedBufferOwner owner = _SharedBufferOwner.__new__(_SharedBufferOwner) + owner.buffer = c_buffer + buffer.data = owner + buffer.c_buffer.ReaderIndex(0) + buffer.c_buffer.WriterIndex(0) return buffer @classmethod def allocate(cls, int32_t size): - cdef shared_ptr[CBuffer] buf - if not AllocateBuffer(size, &buf): + cdef CBuffer* buf = AllocateBuffer(size) + if buf == NULL: raise MemoryError("out of memory") - return Buffer.wrap(buf) + cdef Buffer buffer = Buffer.__new__(Buffer) + buffer.c_buffer = move(deref(buf)) + del buf + buffer.data = None + buffer.c_buffer.ReaderIndex(0) + buffer.c_buffer.WriterIndex(0) + return buffer + + cdef inline void _raise_if_error(self): + cdef CErrorCode code + cdef c_string message + if not self._error.ok(): + code = self._error.code() + message = self._error.message() + self._error.reset() + raise_fory_error(code, message) + + property reader_index: + def __get__(self): + return self.c_buffer.reader_index() + + def __set__(self, int32_t value): + if value < 0: + raise ValueError("reader_index must be >= 0") + self.c_buffer.ReaderIndex(value) + + property writer_index: + def __get__(self): + return self.c_buffer.writer_index() + + def __set__(self, int32_t value): + if value < 0: + raise ValueError("writer_index must be >= 0") + self.c_buffer.WriterIndex(value) cpdef c_bool own_data(self): - return self.c_buffer_ptr.own_data() + return self.c_buffer.own_data() cpdef inline reserve(self, int32_t new_size): assert 0 < new_size < max_buffer_size - self.c_buffer_ptr.Reserve(new_size) - self._c_address = self.c_buffer_ptr.data() - self._c_size = self.c_buffer_ptr.size() + self.c_buffer.Reserve(new_size) cpdef inline put_bool(self, uint32_t offset, c_bool v): self.check_bound(offset, 1) - self.c_buffer_ptr.UnsafePutByte(offset, v) + self.c_buffer.UnsafePutByte(offset, v) cpdef inline put_uint8(self, uint32_t offset, uint8_t v): self.check_bound(offset, 1) - self.c_buffer_ptr.UnsafePutByte(offset, v) + self.c_buffer.UnsafePutByte(offset, v) cpdef inline put_int8(self, uint32_t offset, int8_t v): self.check_bound(offset, 1) - self.c_buffer_ptr.UnsafePutByte(offset, v) + self.c_buffer.UnsafePutByte(offset, v) cpdef inline put_int16(self, uint32_t offset, int16_t v): self.check_bound(offset, 2) - self.c_buffer_ptr.UnsafePut(offset, v) + self.c_buffer.UnsafePut(offset, v) cpdef inline put_int24(self, uint32_t offset, int32_t v): self.check_bound(offset, 3) - cdef uint8_t* arr = self._c_address + offset - arr[0] = v - arr[1] = (v >> 8) - arr[2] = (v >> 16) + self.c_buffer.PutInt24(offset, v) cpdef inline put_int32(self, uint32_t offset, int32_t v): self.check_bound(offset, 4) - self.c_buffer_ptr.UnsafePut(offset, v) + self.c_buffer.UnsafePut(offset, v) cpdef inline put_int64(self, uint32_t offset, int64_t v): self.check_bound(offset, 8) - self.c_buffer_ptr.UnsafePut(offset, v) + self.c_buffer.UnsafePut(offset, v) cpdef inline put_float(self, uint32_t offset, float v): self.check_bound(offset, 4) - self.c_buffer_ptr.UnsafePut(offset, v) + self.c_buffer.UnsafePut(offset, v) cpdef inline put_double(self, uint32_t offset, double v): self.check_bound(offset, 8) - self.c_buffer_ptr.UnsafePut(offset, v) + self.c_buffer.UnsafePut(offset, v) cpdef inline c_bool get_bool(self, uint32_t offset): self.check_bound(offset, 1) - return self.c_buffer_ptr.GetBool(offset) + return self.c_buffer.GetBool(offset) cpdef inline int8_t get_int8(self, uint32_t offset): self.check_bound(offset, 1) - return self.c_buffer_ptr.GetInt8(offset) + return self.c_buffer.GetInt8(offset) cpdef inline int16_t get_int16(self, uint32_t offset): self.check_bound(offset, 2) - return self.c_buffer_ptr.GetInt16(offset) + return self.c_buffer.GetInt16(offset) cpdef inline int32_t get_int24(self, uint32_t offset): self.check_bound(offset, 3) - cdef uint8_t* arr = self._c_address + offset - cdef int32_t result = arr[0] - return (result & 0xFF) | (((arr[1]) & 0xFF) << 8) |\ - (((arr[2]) & 0xFF) << 16) + return self.c_buffer.GetInt24(offset) cpdef inline int32_t get_int32(self, uint32_t offset): self.check_bound(offset, 4) - return self.c_buffer_ptr.GetInt32(offset) + return self.c_buffer.GetInt32(offset) cpdef inline int64_t get_int64(self, uint32_t offset): self.check_bound(offset, 8) - return self.c_buffer_ptr.GetInt64(offset) + return self.c_buffer.GetInt64(offset) cpdef inline float get_float(self, uint32_t offset): self.check_bound(offset, 4) - return self.c_buffer_ptr.GetFloat(offset) + return self.c_buffer.GetFloat(offset) cpdef inline double get_double(self, uint32_t offset): self.check_bound(offset, 8) - return self.c_buffer_ptr.GetDouble(offset) + return self.c_buffer.GetDouble(offset) cpdef inline check_bound(self, int32_t offset, int32_t length): - cdef int32_t size_ = self._c_size + cdef int32_t size_ = self.c_buffer.size() if offset | length | (offset + length) | (size_- (offset + length)) < 0: - raise ValueError(f"Address range {offset, offset + length} " - f"out of bound {0, size_}") + raise_fory_error( + CErrorCode.BufferOutOfBound, + f"Address range {offset, offset + length} out of bound {0, size_}", + ) cpdef inline write_bool(self, c_bool value): - self.grow(1) - ((self._c_address + self.writer_index))[0] = value - self.writer_index += 1 + self.c_buffer.WriteUint8(value) cpdef inline write_uint8(self, uint8_t value): - self.grow(1) - ((self._c_address + self.writer_index))[0] = value - self.writer_index += 1 + self.c_buffer.WriteUint8(value) cpdef inline write_int8(self, int8_t value): - self.grow(1) - ((self._c_address + self.writer_index))[0] = value - self.writer_index += 1 + self.c_buffer.WriteInt8(value) cpdef inline write_int16(self, int16_t value): - self.grow(2) - self.c_buffer_ptr.UnsafePut(self.writer_index, value) - self.writer_index += 2 + self.c_buffer.WriteInt16(value) cpdef inline write_int24(self, int32_t value): - self.grow(3) - cdef uint8_t* arr = self._c_address + self.writer_index - arr[0] = value - arr[1] = (value >> 8) - arr[2] = (value >> 16) - self.writer_index += 3 + self.c_buffer.WriteInt24(value) cpdef inline write_int32(self, int32_t value): - self.grow(4) - self.c_buffer_ptr.UnsafePut(self.writer_index, value) - self.writer_index += 4 + self.c_buffer.WriteInt32(value) cpdef inline write_int64(self, int64_t value): - self.grow(8) - self.c_buffer_ptr.UnsafePut(self.writer_index, value) - self.writer_index += 8 + self.c_buffer.WriteInt64(value) cpdef inline write_uint16(self, uint16_t value): - self.grow(2) - self.c_buffer_ptr.UnsafePut(self.writer_index, value) - self.writer_index += 2 + self.c_buffer.WriteUint16(value) cpdef inline write_uint32(self, uint32_t value): - self.grow(4) - self.c_buffer_ptr.UnsafePut(self.writer_index, value) - self.writer_index += 4 + self.c_buffer.WriteUint32(value) cpdef inline write_uint64(self, uint64_t value): - self.grow(8) - self.c_buffer_ptr.UnsafePut(self.writer_index, value) - self.writer_index += 8 + self.c_buffer.WriteInt64(value) cpdef inline write_float(self, float value): - self.grow(4) - self.c_buffer_ptr.UnsafePut(self.writer_index, value) - self.writer_index += 4 + self.c_buffer.WriteFloat(value) cpdef inline write_float32(self, float value): - self.grow(4) - self.c_buffer_ptr.UnsafePut(self.writer_index, value) - self.writer_index += 4 + self.c_buffer.WriteFloat(value) cpdef inline write_double(self, double value): - self.grow(8) - self.c_buffer_ptr.UnsafePut(self.writer_index, value) - self.writer_index += 8 + self.c_buffer.WriteDouble(value) cpdef inline write_float64(self, double value): - self.grow(8) - self.c_buffer_ptr.UnsafePut(self.writer_index, value) - self.writer_index += 8 + self.c_buffer.WriteDouble(value) cpdef put_buffer(self, uint32_t offset, v, int32_t src_index, int32_t length): if length == 0: # access an emtpy buffer may raise out-of-bound exception. @@ -250,42 +255,44 @@ cdef class Buffer: self.check_bound(offset, size) src_offset = src_index * itemsize cdef uint8_t* ptr = get_address(v) - self.c_buffer_ptr.CopyFrom(offset, ptr, src_offset, size) + self.c_buffer.CopyFrom(offset, ptr, src_offset, size) cpdef inline write_bytes_and_size(self, bytes value): cdef const unsigned char[:] data = value cdef int32_t length = data.nbytes self.write_varuint32(length) if length > 0: - self.grow(length) - self.c_buffer_ptr.CopyFrom(self.writer_index, &data[0], 0, length) - self.writer_index += length + self.c_buffer.WriteBytes(&data[0], length) cpdef inline bytes read_bytes_and_size(self): cdef int32_t length = self.read_varuint32() - value = self.get_bytes(self.reader_index, length) - self.reader_index += length - return value + return self.read_bytes(length) cpdef inline write_bytes(self, bytes value): cdef const unsigned char[:] data = value cdef int32_t length = data.nbytes if length > 0: - self.grow(length) - self.c_buffer_ptr.CopyFrom(self.writer_index, &data[0], 0, length) - self.writer_index += length + self.c_buffer.WriteBytes(&data[0], length) cpdef inline bytes read_bytes(self, int32_t length): - value = self.get_bytes(self.reader_index, length) - self.reader_index += length - return value + if length == 0: + return b"" + cdef bytes py_bytes = PyBytes_FromStringAndSize(NULL, length) + if py_bytes is None: + raise MemoryError("out of memory") + cdef char* buf = PyBytes_AS_STRING(py_bytes) + self.c_buffer.ReadBytes(buf, length, self._error) + if not self._error.ok(): + self._raise_if_error() + return py_bytes cpdef inline int64_t read_bytes_as_int64(self, int32_t length): cdef int64_t result = 0 - cdef CResultVoidError res = self.c_buffer_ptr.GetBytesAsInt64(self.reader_index, length, &result) + cdef uint32_t offset = self.c_buffer.reader_index() + cdef CResultVoidError res = self.c_buffer.GetBytesAsInt64(offset, length, &result) if not res.ok(): - raise ValueError(res.error().message()) - self.reader_index += length + raise_fory_error(res.error().code(), res.error().message()) + self.c_buffer.IncreaseReaderIndex(length) return result cpdef inline put_bytes(self, uint32_t offset, bytes value): @@ -293,13 +300,13 @@ cdef class Buffer: cdef int32_t length = data.nbytes if length > 0: self.grow(length) - self.c_buffer_ptr.CopyFrom(offset, &data[0], 0, length) + self.c_buffer.CopyFrom(offset, &data[0], 0, length) cpdef inline bytes get_bytes(self, uint32_t offset, uint32_t nbytes): if nbytes == 0: return b"" self.check_bound(offset, nbytes) - cdef unsigned char* binary_data = self._c_address + offset + cdef unsigned char* binary_data = self.c_buffer.data() + offset return binary_data[:nbytes] cpdef inline write_buffer(self, value, src_index=0, length_=None): @@ -311,106 +318,98 @@ cdef class Buffer: length = len(value) - src_index else: length = length_ - self.grow(length * itemsize) - self.put_buffer(self.writer_index, value, src_index, length) - self.writer_index += length * itemsize + if length <= 0: + return + cdef uint32_t offset = self.c_buffer.writer_index() + self.c_buffer.Grow(length * itemsize) + self.put_buffer(offset, value, src_index, length) + self.c_buffer.IncreaseWriterIndex(length * itemsize) cpdef inline write(self, value): cdef const unsigned char[:] data = value cdef int32_t length = data.nbytes if length > 0: - self.grow(length) - self.c_buffer_ptr.CopyFrom(self.writer_index, &data[0], 0, length) - self.writer_index += length + self.c_buffer.WriteBytes(&data[0], length) cpdef inline grow(self, int32_t needed_size): - cdef int32_t length = self.writer_index + needed_size - if length > self._c_size: - self.reserve(length * 2) + self.c_buffer.Grow(needed_size) cpdef inline ensure(self, int32_t length): - if length > self._c_size: + if length > self.c_buffer.size(): self.reserve(length * 2) cpdef inline skip(self, int32_t length): - cdef int32_t offset = self.reader_index - self.check_bound(offset, length) - self.reader_index = offset + length + self.c_buffer.Skip(length, self._error) + self._raise_if_error() cpdef inline c_bool read_bool(self): - cdef int32_t offset = self.reader_index - self.check_bound(offset, 1) - self.reader_index += 1 - return ((self._c_address + offset))[0] + cdef uint8_t value = self.c_buffer.ReadUint8(self._error) + self._raise_if_error() + return value != 0 cpdef inline uint8_t read_uint8(self): - cdef int32_t offset = self.reader_index - self.check_bound(offset, 1) - self.reader_index += 1 - return ((self._c_address + offset))[0] + cdef uint8_t value = self.c_buffer.ReadUint8(self._error) + self._raise_if_error() + return value cpdef inline int8_t read_int8(self): - cdef int32_t offset = self.reader_index - self.check_bound(offset, 1) - self.reader_index += 1 - return ((self._c_address + offset))[0] + cdef int8_t value = self.c_buffer.ReadInt8(self._error) + self._raise_if_error() + return value cpdef inline int16_t read_int16(self): - value = self.get_int16(self.reader_index) - self.reader_index += 2 + cdef int16_t value = self.c_buffer.ReadInt16(self._error) + self._raise_if_error() return value cpdef inline int16_t read_int24(self): - value = self.get_int24(self.reader_index) - self.reader_index += 3 + cdef int32_t value = self.c_buffer.ReadInt24(self._error) + self._raise_if_error() return value cpdef inline int32_t read_int32(self): - value = self.get_int32(self.reader_index) - self.reader_index += 4 + cdef int32_t value = self.c_buffer.ReadInt32(self._error) + self._raise_if_error() return value cpdef inline int64_t read_int64(self): - value = self.get_int64(self.reader_index) - self.reader_index += 8 + cdef int64_t value = self.c_buffer.ReadInt64(self._error) + self._raise_if_error() return value cpdef inline uint16_t read_uint16(self): - cdef int32_t offset = self.reader_index - self.check_bound(offset, 2) - self.reader_index = offset + 2 - return self.c_buffer_ptr.GetInt16(offset) + cdef uint16_t value = self.c_buffer.ReadUint16(self._error) + self._raise_if_error() + return value cpdef inline uint32_t read_uint32(self): - cdef int32_t offset = self.reader_index - self.check_bound(offset, 4) - self.reader_index = offset + 4 - return self.c_buffer_ptr.GetInt32(offset) + cdef uint32_t value = self.c_buffer.ReadUint32(self._error) + self._raise_if_error() + return value cpdef inline uint64_t read_uint64(self): - cdef int32_t offset = self.reader_index - self.check_bound(offset, 8) - self.reader_index = offset + 8 - return self.c_buffer_ptr.GetInt64(offset) + cdef uint64_t value = self.c_buffer.ReadUint64(self._error) + self._raise_if_error() + return value cpdef inline float read_float(self): - value = self.get_float(self.reader_index) - self.reader_index += 4 + cdef float value = self.c_buffer.ReadFloat(self._error) + self._raise_if_error() return value cpdef inline float read_float32(self): - value = self.get_float(self.reader_index) - self.reader_index += 4 + cdef float value = self.c_buffer.ReadFloat(self._error) + self._raise_if_error() return value cpdef inline double read_double(self): - value = self.get_double(self.reader_index) - self.reader_index += 8 + cdef double value = self.c_buffer.ReadDouble(self._error) + self._raise_if_error() return value cpdef inline double read_float64(self): - value = self.get_double(self.reader_index) - self.reader_index += 8 + cdef double value = self.c_buffer.ReadDouble(self._error) + self._raise_if_error() return value cpdef inline bytes read(self, int32_t length): @@ -419,287 +418,98 @@ cdef class Buffer: cpdef inline bytes readline(self, int32_t size=-1): if size != -1: raise ValueError(f"Specify size {size} is unsupported") - cdef uint8_t* arr = self._c_address - cdef int32_t target_index = self.reader_index + cdef uint8_t* arr = self.c_buffer.data() + cdef uint32_t start_index = self.c_buffer.reader_index() + cdef uint32_t target_index = start_index cdef uint8_t sep = 10 # '\n' - cdef int32_t buffer_size = self._c_size + cdef int32_t buffer_size = self.c_buffer.size() while arr[target_index] != sep and target_index < buffer_size: target_index += 1 - cdef bytes data = arr[self.reader_index:target_index] - self.reader_index = target_index + cdef bytes data = arr[start_index:target_index] + self.c_buffer.ReaderIndex(target_index) return data cpdef inline write_varint32(self, int32_t value): - return self.write_varuint32((value << 1) ^ (value >> 31)) + cdef uint32_t before = self.c_buffer.writer_index() + self.c_buffer.WriteVarInt32(value) + cdef uint32_t after = self.c_buffer.writer_index() + return after - before cpdef inline write_varuint32(self, uint32_t value): - # Need 8 bytes for safe bulk write (PutVarUint32 writes uint64_t for 5-byte varints) - self.grow(8) - cdef int32_t actual_bytes_written = self.c_buffer_ptr.PutVarUint32(self.writer_index, value) - self.writer_index += actual_bytes_written - return actual_bytes_written + cdef uint32_t before = self.c_buffer.writer_index() + self.c_buffer.WriteVarUint32(value) + cdef uint32_t after = self.c_buffer.writer_index() + return after - before cpdef inline int32_t read_varint32(self): - cdef uint32_t v = self.read_varuint32() - return (v >> 1) ^ -(v & 1) + cdef int32_t value = self.c_buffer.ReadVarInt32(self._error) + self._raise_if_error() + return value cpdef inline uint32_t read_varuint32(self): - cdef: - uint32_t read_length = 0 - int8_t b - uint32_t result - if self._c_size - self.reader_index > 5: - result = self.c_buffer_ptr.GetVarUint32(self.reader_index, &read_length) - self.reader_index += read_length - return result - else: - b = self.read_int8() - result = b & 0x7F - if (b & 0x80) != 0: - b = self.read_int8() - result |= (b & 0x7F) << 7 - if (b & 0x80) != 0: - b = self.read_int8() - result |= (b & 0x7F) << 14 - if (b & 0x80) != 0: - b = self.read_int8() - result |= (b & 0x7F) << 21 - if (b & 0x80) != 0: - b = self.read_int8() - result |= (b & 0x7F) << 28 - return result + cdef uint32_t value = self.c_buffer.ReadVarUint32(self._error) + self._raise_if_error() + return value cpdef inline write_varint64(self, int64_t value): - return self.write_varuint64((value << 1) ^ (value >> 63)) + cdef uint32_t before = self.c_buffer.writer_index() + self.c_buffer.WriteVarInt64(value) + cdef uint32_t after = self.c_buffer.writer_index() + return after - before cpdef inline write_varuint64(self, int64_t v): - cdef: - uint64_t value = v - int64_t offset = self.writer_index - self.grow(9) - cdef uint8_t* arr = self._c_address - if value >> 7 == 0: - arr[offset] = value - self.writer_index += 1 - return 1 - arr[offset] = ((value & 0x7F) | 0x80) - if value >> 14 == 0: - arr[offset+1] = (value >> 7) - self.writer_index += 2 - return 2 - arr[offset + 1] = (value >> 7 | 0x80) - if value >> 21 == 0: - arr[offset+2] = (value >> 14) - self.writer_index += 3 - return 3 - arr[offset + 2] = (value >> 14 | 0x80) - if value >> 28 == 0: - arr[offset+3] = (value >> 21) - self.writer_index += 4 - return 4 - arr[offset + 3] = (value >> 21 | 0x80) - if value >> 35 == 0: - arr[offset+4] = (value >> 28) - self.writer_index += 5 - return 5 - arr[offset + 4] = (value >> 28 | 0x80) - if value >> 42 == 0: - arr[offset+5] = (value >> 35) - self.writer_index += 6 - return 6 - arr[offset + 5] = (value >> 35 | 0x80) - if value >> 49 == 0: - arr[offset+6] = (value >> 42) - self.writer_index += 7 - return 7 - arr[offset + 6] = (value >> 42 | 0x80) - if value >> 56 == 0: - arr[offset+7] = (value >> 49) - self.writer_index += 8 - return 8 - arr[offset + 7] = (value >> 49 | 0x80) - arr[offset + 8] = (value >> 56) - self.writer_index += 9 - return 9 + cdef uint32_t before = self.c_buffer.writer_index() + self.c_buffer.WriteVarUint64(v) + cdef uint32_t after = self.c_buffer.writer_index() + return after - before cpdef inline int64_t read_varint64(self): - cdef uint64_t v = self.read_varuint64() - return ((v >> 1) ^ -(v & 1)) + cdef int64_t value = self.c_buffer.ReadVarInt64(self._error) + self._raise_if_error() + return value cpdef inline int64_t read_varuint64(self): - cdef: - uint32_t read_length = 1 - int64_t b - int64_t result - uint32_t position = self.reader_index - int8_t * arr = (self._c_address + position) - if self._c_size - self.reader_index > 9: - b = arr[0] - result = b & 0x7F - if (b & 0x80) != 0: - read_length += 1 - b = arr[1] - result |= (b & 0x7F) << 7 - if (b & 0x80) != 0: - read_length += 1 - b = arr[2] - result |= (b & 0x7F) << 14 - if (b & 0x80) != 0: - read_length += 1 - b = arr[3] - result |= (b & 0x7F) << 21 - if (b & 0x80) != 0: - read_length += 1 - b = arr[4] - result |= (b & 0x7F) << 28 - if (b & 0x80) != 0: - read_length += 1 - b = arr[5] - result |= (b & 0x7F) << 35 - if (b & 0x80) != 0: - read_length += 1 - b = arr[6] - result |= (b & 0x7F) << 42 - if (b & 0x80) != 0: - read_length += 1 - b = arr[7] - result |= (b & 0x7F) << 49 - if (b & 0x80) != 0: - read_length += 1 - b = arr[8] - # highest bit in last byte is symbols bit - result |= b << 56 - self.reader_index += read_length - return result - else: - b = self.read_int8() - result = b & 0x7F - if (b & 0x80) != 0: - b = self.read_int8() - result |= (b & 0x7F) << 7 - if (b & 0x80) != 0: - b = self.read_int8() - result |= (b & 0x7F) << 14 - if (b & 0x80) != 0: - b = self.read_int8() - result |= (b & 0x7F) << 21 - if (b & 0x80) != 0: - b = self.read_int8() - result |= (b & 0x7F) << 28 - if (b & 0x80) != 0: - b = self.read_int8() - result |= (b & 0x7F) << 35 - if (b & 0x80) != 0: - b = self.read_int8() - result |= (b & 0x7F) << 42 - if (b & 0x80) != 0: - b = self.read_int8() - result |= (b & 0x7F) << 49 - if (b & 0x80) != 0: - b = self.read_int8() - # highest bit in last byte is symbols bit - result |= b << 56 - return result + cdef uint64_t value = self.c_buffer.ReadVarUint64(self._error) + self._raise_if_error() + return value cpdef inline write_tagged_int64(self, int64_t value): - """Write signed int64 using fory Tagged(Small long as int) encoding. - - If value is in [-1073741824, 1073741823] (fits in 31 bits with sign), - encode as 4 bytes: ((value as i32) << 1). - Otherwise write as 9 bytes: 0b1 | little-endian 8 bytes i64. - """ - cdef int64_t HALF_MIN_INT_VALUE = -1073741824 # i32::MIN / 2 - cdef int64_t HALF_MAX_INT_VALUE = 1073741823 # i32::MAX / 2 - if HALF_MIN_INT_VALUE <= value <= HALF_MAX_INT_VALUE: - # Fits in 31 bits (with sign), encode as 4 bytes with bit 0 = 0 - self.write_int32((value) << 1) - else: - # Write flag byte (0b1) followed by 8-byte i64 - self.grow(9) - ((self._c_address + self.writer_index))[0] = 0b1 - self.writer_index += 1 - self.c_buffer_ptr.UnsafePut(self.writer_index, value) - self.writer_index += 8 + """Write signed int64 using fory Tagged(Small long as int) encoding.""" + self.c_buffer.WriteTaggedInt64(value) cpdef inline int64_t read_tagged_int64(self): - """Read signed fory Tagged(Small long as int) encoded int64. - - If bit 0 of the first 4 bytes is 0, return the value >> 1 (arithmetic shift). - Otherwise, skip the flag byte and read 8 bytes as int64. - """ - cdef int32_t offset = self.reader_index - cdef int32_t i - cdef int64_t value - self.check_bound(offset, 4) - i = self.c_buffer_ptr.GetInt32(offset) - if (i & 0b1) != 0b1: - # Bit 0 is 0, small value encoded in 4 bytes - self.reader_index = offset + 4 - return (i >> 1) # arithmetic right shift preserves sign - else: - # Bit 0 is 1, big value: skip flag byte and read 8 bytes - self.check_bound(offset, 9) - self.reader_index = offset + 1 - value = self.c_buffer_ptr.GetInt64(self.reader_index) - self.reader_index += 8 - return value + """Read signed fory Tagged(Small long as int) encoded int64.""" + cdef int64_t value = self.c_buffer.ReadTaggedInt64(self._error) + self._raise_if_error() + return value cpdef inline write_tagged_uint64(self, uint64_t value): - """Write unsigned uint64 using fory Tagged(Small long as int) encoding. - - If value is in [0, 0x7fffffff], encode as 4 bytes: ((value as u32) << 1). - Otherwise write as 9 bytes: 0b1 | little-endian 8 bytes u64. - """ - cdef uint64_t MAX_SMALL_VALUE = 0x7fffffff # i32::MAX as u64 - if value <= MAX_SMALL_VALUE: - # Fits in 31 bits, encode as 4 bytes with bit 0 = 0 - self.write_int32((value) << 1) - else: - # Write flag byte (0b1) followed by 8-byte u64 - self.grow(9) - ((self._c_address + self.writer_index))[0] = 0b1 - self.writer_index += 1 - self.c_buffer_ptr.UnsafePut(self.writer_index, value) - self.writer_index += 8 + """Write unsigned uint64 using fory Tagged(Small long as int) encoding.""" + self.c_buffer.WriteTaggedUint64(value) cpdef inline uint64_t read_tagged_uint64(self): - """Read unsigned fory Tagged(Small long as int) encoded uint64. - - If bit 0 of the first 4 bytes is 0, return the value >> 1. - Otherwise, skip the flag byte and read 8 bytes as uint64. - """ - cdef int32_t offset = self.reader_index - cdef uint32_t i - cdef uint64_t value - self.check_bound(offset, 4) - i = self.c_buffer_ptr.GetInt32(offset) - if (i & 0b1) != 0b1: - # Bit 0 is 0, small value encoded in 4 bytes - self.reader_index = offset + 4 - return (i >> 1) - else: - # Bit 0 is 1, big value: skip flag byte and read 8 bytes - self.check_bound(offset, 9) - self.reader_index = offset + 1 - value = self.c_buffer_ptr.GetInt64(self.reader_index) - self.reader_index += 8 - return value + """Read unsigned fory Tagged(Small long as int) encoded uint64.""" + cdef uint64_t value = self.c_buffer.ReadTaggedUint64(self._error) + self._raise_if_error() + return value cdef inline write_c_buffer(self, const uint8_t* value, int32_t length): self.write_varuint32(length) if length <= 0: # access an emtpy buffer may raise out-of-bound exception. return - self.grow(length) - self.check_bound(self.writer_index, length) - self.c_buffer_ptr.CopyFrom(self.writer_index, value, 0, length) - self.writer_index += length + cdef uint32_t offset = self.c_buffer.writer_index() + self.c_buffer.Grow(length) + self.check_bound(offset, length) + self.c_buffer.CopyFrom(offset, value, 0, length) + self.c_buffer.IncreaseWriterIndex(length) cdef inline int32_t read_c_buffer(self, uint8_t** buf): cdef int32_t length = self.read_varuint32() - cdef uint8_t* binary_data = self._c_address - self.check_bound(self.reader_index, length) - buf[0] = binary_data + self.reader_index - self.reader_index += length + cdef uint8_t* binary_data = self.c_buffer.data() + cdef uint32_t offset = self.c_buffer.reader_index() + self.check_bound(offset, length) + buf[0] = binary_data + offset + self.c_buffer.IncreaseReaderIndex(length) return length cpdef inline write_string(self, str value): @@ -722,17 +532,19 @@ cdef class Buffer: self.write_varuint64(header) if buffer_size == 0: # access an emtpy buffer may raise out-of-bound exception. return - self.grow(buffer_size) - self.check_bound(self.writer_index, buffer_size) - self.c_buffer_ptr.CopyFrom(self.writer_index, buffer, 0, buffer_size) - self.writer_index += buffer_size + cdef uint32_t offset = self.c_buffer.writer_index() + self.c_buffer.Grow(buffer_size) + self.check_bound(offset, buffer_size) + self.c_buffer.CopyFrom(offset, buffer, 0, buffer_size) + self.c_buffer.IncreaseWriterIndex(buffer_size) cpdef inline str read_string(self): cdef uint64_t header = self.read_varuint64() cdef uint32_t size = header >> 2 - self.check_bound(self.reader_index, size) - cdef const char * buf = (self._c_address + self.reader_index) - self.reader_index += size + cdef uint32_t offset = self.c_buffer.reader_index() + self.check_bound(offset, size) + cdef const char * buf = (self.c_buffer.data() + offset) + self.c_buffer.IncreaseReaderIndex(size) cdef uint32_t encoding = header & 0b11 if encoding == 0: # PyUnicode_FromASCII @@ -751,19 +563,19 @@ cdef class Buffer: return PyUnicode_DecodeUTF8(buf, size, "strict") def __len__(self): - return self._c_size + return self.c_buffer.size() cpdef inline int32_t size(self): - return self._c_size + return self.c_buffer.size() def to_bytes(self, int32_t offset=0, int32_t length=0) -> bytes: if length != 0: - assert 0 < length <= self._c_size,\ - f"length {length} size {self._c_size}" + assert 0 < length <= self.c_buffer.size(),\ + f"length {length} size {self.c_buffer.size()}" else: - length = self._c_size + length = self.c_buffer.size() cdef: - uint8_t* data = self._c_address + offset + uint8_t* data = self.c_buffer.data() + offset return data[:length] def to_pybytes(self) -> bytes: @@ -777,10 +589,10 @@ cdef class Buffer: if (key.step or 1) != 1: raise IndexError('only slices with step 1 supported') return _normalize_slice(self, key) - return self.getitem(_normalize_index(key, self._c_size)) + return self.getitem(_normalize_index(key, self.c_buffer.size())) cdef getitem(self, int64_t i): - return self._c_address[i] + return self.c_buffer.data()[i] def hex(self): """ @@ -790,17 +602,17 @@ cdef class Buffer: ------- : bytes """ - return self.c_buffer_ptr.Hex().decode("UTF-8") + return self.c_buffer.Hex().decode("UTF-8") def __getbuffer__(self, Py_buffer *buffer, int flags): cdef Py_ssize_t itemsize = 1 - self.shape[0] = self._c_size + self.shape[0] = self.c_buffer.size() self.stride[0] = itemsize - buffer.buf = (self._c_address) + buffer.buf = (self.c_buffer.data()) buffer.format = 'B' buffer.internal = NULL # see References buffer.itemsize = itemsize - buffer.len = self._c_size # product(shape) * itemsize + buffer.len = self.c_buffer.size() # product(shape) * itemsize buffer.ndim = 1 buffer.obj = self buffer.readonly = 0 @@ -929,15 +741,15 @@ cdef Py_ssize_t _normalize_index(Py_ssize_t index, def get_bit(Buffer buffer, uint32_t base_offset, uint32_t index) -> bool: - return GetBit(buffer._c_address + base_offset, index) + return GetBit(buffer.c_buffer.data() + base_offset, index) def set_bit(Buffer buffer, uint32_t base_offset, uint32_t index): - return SetBit(buffer._c_address + base_offset, index) + return SetBit(buffer.c_buffer.data() + base_offset, index) def clear_bit(Buffer buffer, uint32_t base_offset, uint32_t index): - return ClearBit(buffer._c_address + base_offset, index) + return ClearBit(buffer.c_buffer.data() + base_offset, index) def set_bit_to(Buffer buffer, @@ -945,4 +757,4 @@ def set_bit_to(Buffer buffer, uint32_t index, c_bool bit_is_set): return SetBitTo( - buffer._c_address + base_offset, index, bit_is_set) + buffer.c_buffer.data() + base_offset, index, bit_is_set) diff --git a/python/pyfory/collection.pxi b/python/pyfory/collection.pxi index 45b95654d2..e88a487f97 100644 --- a/python/pyfory/collection.pxi +++ b/python/pyfory/collection.pxi @@ -203,7 +203,7 @@ cdef class CollectionSerializer(Serializer): if value_type is list or value_type is tuple: size = sizeof(bool) * Py_SIZE(value) buffer.grow(size) - Fory_PyBooleanSequenceWriteToBuffer(value, buffer.c_buffer.get(), buffer.writer_index) + Fory_PyBooleanSequenceWriteToBuffer(value, &buffer.c_buffer, buffer.writer_index) buffer.writer_index += size else: for s in value: @@ -218,7 +218,7 @@ cdef class CollectionSerializer(Serializer): if value_type is list or value_type is tuple: size = sizeof(double) * Py_SIZE(value) buffer.grow(size) - Fory_PyFloatSequenceWriteToBuffer(value, buffer.c_buffer.get(), buffer.writer_index) + Fory_PyFloatSequenceWriteToBuffer(value, &buffer.c_buffer, buffer.writer_index) buffer.writer_index += size else: for s in value: diff --git a/python/pyfory/error.py b/python/pyfory/error.py index 84d3173aba..5b89a47699 100644 --- a/python/pyfory/error.py +++ b/python/pyfory/error.py @@ -20,6 +20,78 @@ class ForyError(Exception): pass +class ForyOutOfMemoryError(ForyError): + pass + + +class ForyOutOfBoundError(ForyError): + pass + + +class ForyKeyError(ForyError): + pass + + +class ForyTypeError(ForyError): + pass + + +class ForyInvalidError(ForyError): + pass + + +class ForyIOError(ForyError): + pass + + +class ForyUnknownError(ForyError): + pass + + +class ForyEncodeError(ForyError): + pass + + +class ForyInvalidDataError(ForyError): + pass + + +class ForyInvalidRefError(ForyError): + pass + + +class ForyUnknownEnumError(ForyError): + pass + + +class ForyEncodingError(ForyError): + pass + + +class ForyDepthExceedError(ForyError): + pass + + +class ForyUnsupportedError(ForyError): + pass + + +class ForyNotAllowedError(ForyError): + pass + + +class ForyStructVersionMismatchError(ForyError): + pass + + +class ForyTypeMismatchError(ForyError): + pass + + +class ForyBufferOutOfBoundError(ForyError): + pass + + class TypeNotCompatibleError(ForyError): pass @@ -30,3 +102,32 @@ class TypeUnregisteredError(ForyError): class CompileError(ForyError): pass + + +_ERROR_CODE_TO_EXCEPTION = { + 1: ForyOutOfMemoryError, + 2: ForyOutOfBoundError, + 3: ForyKeyError, + 4: ForyTypeError, + 5: ForyInvalidError, + 6: ForyIOError, + 7: ForyUnknownError, + 8: ForyEncodeError, + 9: ForyInvalidDataError, + 10: ForyInvalidRefError, + 11: ForyUnknownEnumError, + 12: ForyEncodingError, + 13: ForyDepthExceedError, + 14: ForyUnsupportedError, + 15: ForyNotAllowedError, + 16: ForyStructVersionMismatchError, + 17: ForyTypeMismatchError, + 18: ForyBufferOutOfBoundError, +} + + +def raise_fory_error(code, message): + if isinstance(message, bytes): + message = message.decode("utf-8", "replace") + exc_cls = _ERROR_CODE_TO_EXCEPTION.get(int(code), ForyError) + raise exc_cls(message) diff --git a/python/pyfory/format/row.pxi b/python/pyfory/format/row.pxi index 312ecda480..527bb219bf 100644 --- a/python/pyfory/format/row.pxi +++ b/python/pyfory/format/row.pxi @@ -24,6 +24,7 @@ from pyfory.includes.libformat cimport ( CSchema, CListType, CMapType, fory_schema ) from pyfory.buffer cimport Buffer +from pyfory.includes.libutil cimport CBuffer from libcpp.memory cimport shared_ptr from libcpp.vector cimport vector from datetime import datetime, date @@ -280,7 +281,12 @@ cdef class RowData(Getter): cdef: Buffer buf = buffer shared_ptr[CRow] row = make_shared[CRow]((schema).c_schema) - deref(row).PointTo(buf.c_buffer, offset, size_in_bytes) + shared_ptr[CBuffer] shared_buf = make_shared[CBuffer]( + buf.c_buffer.data(), + buf.c_buffer.size(), + False, + ) + deref(row).PointTo(shared_buf, offset, size_in_bytes) self.data = row self.getter = row.get() self.schema_ = schema diff --git a/python/pyfory/includes/libutil.pxd b/python/pyfory/includes/libutil.pxd index f344d93b3f..57fa776cc8 100644 --- a/python/pyfory/includes/libutil.pxd +++ b/python/pyfory/includes/libutil.pxd @@ -43,10 +43,12 @@ cdef extern from "fory/util/error.h" namespace "fory" nogil: BufferOutOfBound = 18 cdef cppclass CError "fory::Error": + c_bool ok() const CErrorCode code() const const c_string& message() const c_string to_string() const c_string code_as_string() const + void reset() cdef extern from "fory/util/result.h" namespace "fory" nogil: cdef cppclass CResultVoidError "fory::Result": @@ -56,7 +58,8 @@ cdef extern from "fory/util/result.h" namespace "fory" nogil: cdef extern from "fory/util/buffer.h" namespace "fory" nogil: cdef cppclass CBuffer "fory::Buffer": - CBuffer(uint8_t* data, uint32_t size, c_bool own_data=True) + CBuffer() + CBuffer(uint8_t* data, uint32_t size, c_bool own_data) inline uint8_t* data() @@ -64,7 +67,21 @@ cdef extern from "fory/util/buffer.h" namespace "fory" nogil: inline c_bool own_data() - inline c_bool Reserve(uint32_t new_size) + inline uint32_t writer_index() + + inline uint32_t reader_index() + + inline void WriterIndex(uint32_t writer_index) + + inline void IncreaseWriterIndex(uint32_t diff) + + inline void ReaderIndex(uint32_t reader_index) + + inline void IncreaseReaderIndex(uint32_t diff) + + void Grow(uint32_t min_capacity) + + void Reserve(uint32_t new_size) inline void UnsafePutByte(uint32_t offset, c_bool) @@ -91,6 +108,8 @@ cdef extern from "fory/util/buffer.h" namespace "fory" nogil: inline int16_t GetInt16(uint32_t offset) + inline int32_t GetInt24(uint32_t offset) + inline int32_t GetInt32(uint32_t offset) inline int64_t GetInt64(uint32_t offset) @@ -105,6 +124,82 @@ cdef extern from "fory/util/buffer.h" namespace "fory" nogil: inline int32_t GetVarUint32(uint32_t offset, uint32_t *readBytesLength) + inline void PutInt24(uint32_t offset, int32_t value) + + void WriteUint8(uint8_t value) + + void WriteInt8(int8_t value) + + void WriteUint16(uint16_t value) + + void WriteInt16(int16_t value) + + void WriteInt24(int32_t value) + + void WriteUint32(uint32_t value) + + void WriteInt32(int32_t value) + + void WriteInt64(int64_t value) + + void WriteFloat(float value) + + void WriteDouble(double value) + + void WriteVarUint32(uint32_t value) + + void WriteVarInt32(int32_t value) + + void WriteVarUint64(uint64_t value) + + void WriteVarInt64(int64_t value) + + void WriteTaggedInt64(int64_t value) + + void WriteTaggedUint64(uint64_t value) + + void WriteBytes(const void* data, uint32_t length) + + uint8_t ReadUint8(CError& error) + + int8_t ReadInt8(CError& error) + + uint16_t ReadUint16(CError& error) + + int16_t ReadInt16(CError& error) + + int32_t ReadInt24(CError& error) + + uint32_t ReadUint32(CError& error) + + int32_t ReadInt32(CError& error) + + uint64_t ReadUint64(CError& error) + + int64_t ReadInt64(CError& error) + + float ReadFloat(CError& error) + + double ReadDouble(CError& error) + + uint32_t ReadVarUint32(CError& error) + + int32_t ReadVarInt32(CError& error) + + uint64_t ReadVarUint64(CError& error) + + int64_t ReadVarInt64(CError& error) + + int64_t ReadTaggedInt64(CError& error) + + uint64_t ReadTaggedUint64(CError& error) + + uint64_t ReadVarUint36Small(CError& error) + + void ReadBytes(void* data, uint32_t length, CError& error) + + void Skip(uint32_t length, CError& error) + void Copy(uint32_t start, uint32_t nbytes, uint8_t* out, uint32_t offset) const