diff --git a/.eslintrc.json b/.eslintrc.json index d009780f37..e1092b9bef 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -285,6 +285,13 @@ } ] } + ], + "no-restricted-globals": [ + "error", + { + "name": "Buffer", + "message": "Use Uint8Array instead" + } ] } }, diff --git a/src/bson.ts b/src/bson.ts index c08cd03896..0b3782a97e 100644 --- a/src/bson.ts +++ b/src/bson.ts @@ -8,6 +8,7 @@ export { BSONRegExp, BSONSymbol, BSONType, + ByteUtils, calculateObjectSize, Code, DBRef, @@ -38,10 +39,56 @@ export function parseToElementsToArray(bytes: Uint8Array, offset?: number): BSON return Array.isArray(res) ? res : [...res]; } -export const getInt32LE = BSON.onDemand.NumberUtils.getInt32LE; -export const getFloat64LE = BSON.onDemand.NumberUtils.getFloat64LE; -export const getBigInt64LE = BSON.onDemand.NumberUtils.getBigInt64LE; -export const toUTF8 = BSON.onDemand.ByteUtils.toUTF8; +export const getInt32LE = BSON.NumberUtils.getInt32LE; +export const getFloat64LE = BSON.NumberUtils.getFloat64LE; +export const getBigInt64LE = BSON.NumberUtils.getBigInt64LE; +export const toUTF8 = BSON.ByteUtils.toUTF8; +export const fromUTF8 = BSON.ByteUtils.fromUTF8; +export const fromBase64 = BSON.ByteUtils.fromBase64; +export const fromNumberArray = BSON.ByteUtils.fromNumberArray; +export const concatBuffers = BSON.ByteUtils.concat; +export const allocateBuffer = BSON.ByteUtils.allocate; +export const allocateUnsafeBuffer = BSON.ByteUtils.allocateUnsafe; + +// writeInt32LE, same order of arguments as Buffer.writeInt32LE +export const writeInt32LE = (destination: Uint8Array, value: number, offset: number) => + BSON.NumberUtils.setInt32LE(destination, offset, value); + +// copyBuffer: copies from source buffer to target buffer, returns number of bytes copied +// inputs are explicitly named to avoid confusion +export const copyBuffer = (input: { + source: Uint8Array; + target: Uint8Array; + targetStart?: number; + sourceStart?: number; + sourceEnd?: number; +}): number => { + const { 
source, target, targetStart = 0, sourceStart = 0, sourceEnd } = input; + const sourceEndActual = sourceEnd ?? source.length; + const srcSlice = source.subarray(sourceStart, sourceEndActual); + const maxLen = Math.min(srcSlice.length, target.length - targetStart); + if (maxLen <= 0) { + return 0; + } + target.set(srcSlice.subarray(0, maxLen), targetStart); + return maxLen; +}; + +// validates buffer inputs, used for read operations +const validateBufferInputs = (buffer: Uint8Array, offset: number, length: number) => { + if (offset < 0 || offset + length > buffer.length) { + throw new RangeError( + `Attempt to access memory outside buffer bounds: buffer length: ${buffer.length}, offset: ${offset}, length: ${length}` + ); + } +}; + +// readInt32LE, reads a 32-bit integer from buffer at given offset +// throws if offset is out of bounds +export const readInt32LE = (buffer: Uint8Array, offset: number): number => { + validateBufferInputs(buffer, offset, 4); + return getInt32LE(buffer, offset); +}; /** * BSON Serialization options. diff --git a/src/client-side-encryption/auto_encrypter.ts b/src/client-side-encryption/auto_encrypter.ts index 0f9c79752a..53426609f9 100644 --- a/src/client-side-encryption/auto_encrypter.ts +++ b/src/client-side-encryption/auto_encrypter.ts @@ -1,7 +1,7 @@ import { type MongoCrypt, type MongoCryptOptions } from 'mongodb-client-encryption'; import * as net from 'net'; -import { deserialize, type Document, serialize } from '../bson'; +import { ByteUtils, deserialize, type Document, serialize } from '../bson'; import { type CommandOptions, type ProxyOptions } from '../cmap/connection'; import { kDecorateResult } from '../constants'; import { getMongoDBClientEncryption } from '../deps'; @@ -256,20 +256,26 @@ export class AutoEncrypter { errorWrapper: defaultErrorWrapper }; if (options.schemaMap) { - mongoCryptOptions.schemaMap = Buffer.isBuffer(options.schemaMap) - ? 
options.schemaMap - : (serialize(options.schemaMap) as Buffer); + if (ByteUtils.isUint8Array(options.schemaMap)) { + mongoCryptOptions.schemaMap = options.schemaMap; + } else { + mongoCryptOptions.schemaMap = serialize(options.schemaMap); + } } if (options.encryptedFieldsMap) { - mongoCryptOptions.encryptedFieldsMap = Buffer.isBuffer(options.encryptedFieldsMap) - ? options.encryptedFieldsMap - : (serialize(options.encryptedFieldsMap) as Buffer); + if (ByteUtils.isUint8Array(options.encryptedFieldsMap)) { + mongoCryptOptions.encryptedFieldsMap = options.encryptedFieldsMap; + } else { + mongoCryptOptions.encryptedFieldsMap = serialize(options.encryptedFieldsMap); + } } - mongoCryptOptions.kmsProviders = !Buffer.isBuffer(this._kmsProviders) - ? (serialize(this._kmsProviders) as Buffer) - : this._kmsProviders; + if (ByteUtils.isUint8Array(this._kmsProviders)) { + mongoCryptOptions.kmsProviders = this._kmsProviders; + } else { + mongoCryptOptions.kmsProviders = serialize(this._kmsProviders); + } if (options.options?.logger) { mongoCryptOptions.logger = options.options.logger; @@ -396,7 +402,7 @@ export class AutoEncrypter { return cmd; } - const commandBuffer = Buffer.isBuffer(cmd) ? cmd : serialize(cmd, options); + const commandBuffer: Uint8Array = serialize(cmd, options); const context = this._mongocrypt.makeEncryptionContext( MongoDBCollectionNamespace.fromString(ns).db, commandBuffer diff --git a/src/client-side-encryption/client_encryption.ts b/src/client-side-encryption/client_encryption.ts index 6303eba4ba..948f27256f 100644 --- a/src/client-side-encryption/client_encryption.ts +++ b/src/client-side-encryption/client_encryption.ts @@ -143,9 +143,7 @@ export class ClientEncryption { const mongoCryptOptions: MongoCryptOptions = { ...options, - kmsProviders: !Buffer.isBuffer(this._kmsProviders) - ? 
(serialize(this._kmsProviders) as Buffer) - : this._kmsProviders, + kmsProviders: serialize(this._kmsProviders), errorWrapper: defaultErrorWrapper }; diff --git a/src/cmap/auth/auth_provider.ts b/src/cmap/auth/auth_provider.ts index e40c791ea5..d39de6500c 100644 --- a/src/cmap/auth/auth_provider.ts +++ b/src/cmap/auth/auth_provider.ts @@ -21,7 +21,7 @@ export class AuthContext { /** A response from an initial auth attempt, only some mechanisms use this (e.g, SCRAM) */ response?: Document; /** A random nonce generated for use in an authentication conversation */ - nonce?: Buffer; + nonce?: Uint8Array; constructor( connection: Connection, diff --git a/src/cmap/auth/aws4.ts b/src/cmap/auth/aws4.ts index 912cdbdcaa..53bb0d356f 100644 --- a/src/cmap/auth/aws4.ts +++ b/src/cmap/auth/aws4.ts @@ -31,7 +31,7 @@ export type SignedHeaders = { const getHexSha256 = async (str: string): Promise => { const data = stringToBuffer(str); const hashBuffer = await crypto.subtle.digest('SHA-256', data); - const hashHex = BSON.onDemand.ByteUtils.toHex(new Uint8Array(hashBuffer)); + const hashHex = BSON.ByteUtils.toHex(new Uint8Array(hashBuffer)); return hashHex; }; @@ -81,8 +81,8 @@ const convertHeaderValue = (value: string | number) => { * @returns Uint8Array containing the UTF-8 encoded string. */ function stringToBuffer(str: string): Uint8Array { - const data = new Uint8Array(BSON.onDemand.ByteUtils.utf8ByteLength(str)); - BSON.onDemand.ByteUtils.encodeUTF8Into(data, str, 0); + const data = new Uint8Array(BSON.ByteUtils.utf8ByteLength(str)); + BSON.ByteUtils.encodeUTF8Into(data, str, 0); return data; } @@ -189,7 +189,7 @@ export async function aws4Sign( // 5. Calculate the signature const signatureBuffer = await getHmacSha256(signingKey, stringToSign); - const signature = BSON.onDemand.ByteUtils.toHex(signatureBuffer); + const signature = BSON.ByteUtils.toHex(signatureBuffer); // 6. 
Add the signature to the request // Calculate the Authorization header diff --git a/src/cmap/auth/mongodb_aws.ts b/src/cmap/auth/mongodb_aws.ts index b9a2cdef0a..868fdc30b9 100644 --- a/src/cmap/auth/mongodb_aws.ts +++ b/src/cmap/auth/mongodb_aws.ts @@ -5,7 +5,7 @@ import { MongoMissingCredentialsError, MongoRuntimeError } from '../../error'; -import { ByteUtils, maxWireVersion, ns, randomBytes } from '../../utils'; +import { maxWireVersion, ns, randomBytes } from '../../utils'; import { type AuthContext, AuthProvider } from './auth_provider'; import { type AWSCredentialProvider, @@ -92,7 +92,7 @@ export class MongoDBAWS extends AuthProvider { throw new MongoRuntimeError(`Invalid server nonce length ${serverNonce.length}, expected 64`); } - if (!ByteUtils.equals(serverNonce.subarray(0, nonce.byteLength), nonce)) { + if (!BSON.ByteUtils.equals(serverNonce.subarray(0, nonce.byteLength), nonce)) { // throw because the serverNonce's leading 32 bytes must equal the client nonce's 32 bytes // https://github.com/mongodb/specifications/blob/master/source/auth/auth.md#conversation-5 @@ -115,7 +115,7 @@ export class MongoDBAWS extends AuthProvider { headers: { 'Content-Type': 'application/x-www-form-urlencoded', 'Content-Length': body.length, - 'X-MongoDB-Server-Nonce': ByteUtils.toBase64(serverNonce), + 'X-MongoDB-Server-Nonce': BSON.ByteUtils.toBase64(serverNonce), 'X-MongoDB-GS2-CB-Flag': 'n' }, path: '/', diff --git a/src/cmap/auth/plain.ts b/src/cmap/auth/plain.ts index f5a4386311..fbebd3f82d 100644 --- a/src/cmap/auth/plain.ts +++ b/src/cmap/auth/plain.ts @@ -1,4 +1,4 @@ -import { Binary } from '../../bson'; +import { Binary, fromUTF8 } from '../../bson'; import { MongoMissingCredentialsError } from '../../error'; import { ns } from '../../utils'; import { type AuthContext, AuthProvider } from './auth_provider'; @@ -12,7 +12,7 @@ export class Plain extends AuthProvider { const { username, password } = credentials; - const payload = new 
Binary(Buffer.from(`\x00${username}\x00${password}`)); + const payload = new Binary(fromUTF8(`\x00${username}\x00${password}`)); const command = { saslStart: 1, mechanism: 'PLAIN', diff --git a/src/cmap/auth/scram.ts b/src/cmap/auth/scram.ts index b10b2007dc..254e780a26 100644 --- a/src/cmap/auth/scram.ts +++ b/src/cmap/auth/scram.ts @@ -1,7 +1,16 @@ import { saslprep } from '@mongodb-js/saslprep'; import * as crypto from 'crypto'; -import { Binary, type Document } from '../../bson'; +import { + allocateBuffer, + Binary, + ByteUtils, + concatBuffers, + type Document, + fromBase64, + fromNumberArray, + fromUTF8 +} from '../../bson'; import { MongoInvalidArgumentError, MongoMissingCredentialsError, @@ -65,21 +74,21 @@ function cleanUsername(username: string) { return username.replace('=', '=3D').replace(',', '=2C'); } -function clientFirstMessageBare(username: string, nonce: Buffer) { +function clientFirstMessageBare(username: string, nonce: Uint8Array) { // NOTE: This is done b/c Javascript uses UTF-16, but the server is hashing in UTF-8. // Since the username is not sasl-prep-d, we need to do this here. 
- return Buffer.concat([ - Buffer.from('n=', 'utf8'), - Buffer.from(username, 'utf8'), - Buffer.from(',r=', 'utf8'), - Buffer.from(nonce.toString('base64'), 'utf8') + return concatBuffers([ + fromUTF8('n='), + fromUTF8(username), + fromUTF8(',r='), + fromUTF8(ByteUtils.toBase64(nonce)) ]); } function makeFirstMessage( cryptoMethod: CryptoMethod, credentials: MongoCredentials, - nonce: Buffer + nonce: Uint8Array ) { const username = cleanUsername(credentials.username); const mechanism = @@ -90,9 +99,7 @@ function makeFirstMessage( return { saslStart: 1, mechanism, - payload: new Binary( - Buffer.concat([Buffer.from('n,,', 'utf8'), clientFirstMessageBare(username, nonce)]) - ), + payload: new Binary(concatBuffers([fromUTF8('n,,'), clientFirstMessageBare(username, nonce)])), autoAuthorize: 1, options: { skipEmptyExchange: true } }; @@ -136,7 +143,7 @@ async function continueScramConversation( const processedPassword = cryptoMethod === 'sha256' ? saslprep(password) : passwordDigest(username, password); - const payload: Binary = Buffer.isBuffer(response.payload) + const payload: Binary = ByteUtils.isUint8Array(response.payload) ? 
new Binary(response.payload) : response.payload; @@ -157,12 +164,7 @@ async function continueScramConversation( // Set up start of proof const withoutProof = `c=biws,r=${rnonce}`; - const saltedPassword = HI( - processedPassword, - Buffer.from(salt, 'base64'), - iterations, - cryptoMethod - ); + const saltedPassword = HI(processedPassword, fromBase64(salt), iterations, cryptoMethod); const clientKey = HMAC(cryptoMethod, saltedPassword, 'Client Key'); const serverKey = HMAC(cryptoMethod, saltedPassword, 'Server Key'); @@ -181,13 +183,13 @@ async function continueScramConversation( const saslContinueCmd = { saslContinue: 1, conversationId: response.conversationId, - payload: new Binary(Buffer.from(clientFinal)) + payload: new Binary(fromUTF8(clientFinal)) }; const r = await connection.command(ns(`${db}.$cmd`), saslContinueCmd, undefined); const parsedResponse = parsePayload(r.payload); - if (!compareDigest(Buffer.from(parsedResponse.v, 'base64'), serverSignature)) { + if (!compareDigest(fromBase64(parsedResponse.v), serverSignature)) { throw new MongoRuntimeError('Server returned an invalid signature'); } @@ -199,7 +201,7 @@ async function continueScramConversation( const retrySaslContinueCmd = { saslContinue: 1, conversationId: r.conversationId, - payload: Buffer.alloc(0) + payload: allocateBuffer(0) }; await connection.command(ns(`${db}.$cmd`), retrySaslContinueCmd, undefined); @@ -245,15 +247,7 @@ function passwordDigest(username: string, password: string) { } // XOR two buffers -function xor(a: Buffer, b: Buffer) { - if (!Buffer.isBuffer(a)) { - a = Buffer.from(a); - } - - if (!Buffer.isBuffer(b)) { - b = Buffer.from(b); - } - +function xor(a: Uint8Array, b: Uint8Array) { const length = Math.max(a.length, b.length); const res = []; @@ -261,19 +255,19 @@ function xor(a: Buffer, b: Buffer) { res.push(a[i] ^ b[i]); } - return Buffer.from(res).toString('base64'); + return ByteUtils.toBase64(fromNumberArray(res)); } -function H(method: CryptoMethod, text: Buffer) { 
+function H(method: CryptoMethod, text: Uint8Array): Uint8Array { return crypto.createHash(method).update(text).digest(); } -function HMAC(method: CryptoMethod, key: Buffer, text: Buffer | string) { +function HMAC(method: CryptoMethod, key: Uint8Array, text: Uint8Array | string): Uint8Array { return crypto.createHmac(method, key).update(text).digest(); } interface HICache { - [key: string]: Buffer; + [key: string]: Uint8Array; } let _hiCache: HICache = {}; @@ -288,9 +282,9 @@ const hiLengthMap = { sha1: 20 }; -function HI(data: string, salt: Buffer, iterations: number, cryptoMethod: CryptoMethod) { +function HI(data: string, salt: Uint8Array, iterations: number, cryptoMethod: CryptoMethod) { // omit the work if already generated - const key = [data, salt.toString('base64'), iterations].join('_'); + const key = [data, ByteUtils.toBase64(salt), iterations].join('_'); if (_hiCache[key] != null) { return _hiCache[key]; } @@ -314,7 +308,7 @@ function HI(data: string, salt: Buffer, iterations: number, cryptoMethod: Crypto return saltedData; } -function compareDigest(lhs: Buffer, rhs: Uint8Array) { +function compareDigest(lhs: Uint8Array, rhs: Uint8Array) { if (lhs.length !== rhs.length) { return false; } diff --git a/src/cmap/commands.ts b/src/cmap/commands.ts index 177ca3a783..fe80a2a062 100644 --- a/src/cmap/commands.ts +++ b/src/cmap/commands.ts @@ -1,5 +1,15 @@ -import type { BSONSerializeOptions, Document, Long } from '../bson'; -import * as BSON from '../bson'; +import { + allocateBuffer, + allocateUnsafeBuffer, + BSON, + type BSONSerializeOptions, + ByteUtils, + concatBuffers, + type Document, + type Long, + readInt32LE, + writeInt32LE +} from '../bson'; import { MongoInvalidArgumentError, MongoRuntimeError } from '../error'; import { type ReadPreference } from '../read_preference'; import type { ClientSession } from '../sessions'; @@ -30,7 +40,7 @@ const QUERY_FAILURE = 2; const SHARD_CONFIG_STALE = 4; const AWAIT_CAPABLE = 8; -const encodeUTF8Into = 
BSON.BSON.onDemand.ByteUtils.encodeUTF8Into; +const encodeUTF8Into = BSON.ByteUtils.encodeUTF8Into; /** @internal */ export type WriteProtocolMessageType = OpQueryRequest | OpMsgRequest; @@ -182,10 +192,10 @@ export class OpQueryRequest { if (this.batchSize !== this.numberToReturn) this.numberToReturn = this.batchSize; // Allocate write protocol header buffer - const header = Buffer.alloc( + const header = allocateBuffer( 4 * 4 + // Header 4 + // Flags - Buffer.byteLength(this.ns) + + ByteUtils.utf8ByteLength(this.ns) + 1 + // namespace 4 + // numberToSkip 4 // numberToReturn @@ -256,7 +266,7 @@ export class OpQueryRequest { index = index + 4; // Write collection name - index = index + header.write(this.ns, index, 'utf8') + 1; + index = index + encodeUTF8Into(header, this.ns, index) + 1; header[index - 1] = 0; // Write header information flags numberToSkip @@ -290,8 +300,8 @@ export interface MessageHeader { /** @internal */ export class OpReply { parsed: boolean; - raw: Buffer; - data: Buffer; + raw: Uint8Array; + data: Uint8Array; opts: BSONSerializeOptions; length: number; requestId: number; @@ -318,9 +328,9 @@ export class OpReply { moreToCome = false; constructor( - message: Buffer, + message: Uint8Array, msgHeader: MessageHeader, - msgBody: Buffer, + msgBody: Uint8Array, opts?: BSONSerializeOptions ) { this.parsed = false; @@ -364,10 +374,10 @@ export class OpReply { this.index = 20; // Read the message body - this.responseFlags = this.data.readInt32LE(0); - this.cursorId = new BSON.Long(this.data.readInt32LE(4), this.data.readInt32LE(8)); - this.startingFrom = this.data.readInt32LE(12); - this.numberReturned = this.data.readInt32LE(16); + this.responseFlags = readInt32LE(this.data, 0); + this.cursorId = new BSON.Long(readInt32LE(this.data, 4), readInt32LE(this.data, 8)); + this.startingFrom = readInt32LE(this.data, 12); + this.numberReturned = readInt32LE(this.data, 16); if (this.numberReturned < 0 || this.numberReturned > 2 ** 32 - 1) { throw new 
RangeError( @@ -433,7 +443,7 @@ export class DocumentSequence { documents: Document[]; serializedDocumentsLength: number; private chunks: Uint8Array[]; - private header: Buffer; + private header: Uint8Array; /** * Create a new document sequence for the provided field. @@ -446,7 +456,7 @@ export class DocumentSequence { this.serializedDocumentsLength = 0; // Document sequences starts with type 1 at the first byte. // Field strings must always be UTF-8. - const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1); + const buffer = allocateUnsafeBuffer(1 + 4 + this.field.length + 1); buffer[0] = 1; // Third part is the field name at offset 5 with trailing null byte. encodeUTF8Into(buffer, `${this.field}\0`, 5); @@ -473,7 +483,9 @@ export class DocumentSequence { // Push the document raw bson. this.chunks.push(buffer); // Write the new length. - this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1); + if (this.header) { + writeInt32LE(this.header, 4 + this.field.length + 1 + this.serializedDocumentsLength, 1); + } return this.serializedDocumentsLength + this.header.length; } @@ -482,7 +494,7 @@ export class DocumentSequence { * @returns The section bytes. */ toBin(): Uint8Array { - return Buffer.concat(this.chunks); + return concatBuffers(this.chunks); } } @@ -531,8 +543,8 @@ export class OpMsgRequest { typeof options.exhaustAllowed === 'boolean' ? 
options.exhaustAllowed : false; } - toBin(): Buffer[] { - const buffers: Buffer[] = []; + toBin(): Uint8Array[] { + const buffers: Uint8Array[] = []; let flags = 0; if (this.checksumPresent) { @@ -547,7 +559,7 @@ export class OpMsgRequest { flags |= OPTS_EXHAUST_ALLOWED; } - const header = Buffer.alloc( + const header = allocateBuffer( 4 * 4 + // Header 4 // Flags ); @@ -558,11 +570,11 @@ export class OpMsgRequest { const command = this.command; totalLength += this.makeSections(buffers, command); - header.writeInt32LE(totalLength, 0); // messageLength - header.writeInt32LE(this.requestId, 4); // requestID - header.writeInt32LE(0, 8); // responseTo - header.writeInt32LE(OP_MSG, 12); // opCode - header.writeUInt32LE(flags, 16); // flags + writeInt32LE(header, totalLength, 0); // messageLength + writeInt32LE(header, this.requestId, 4); // requestID + writeInt32LE(header, 0, 8); // responseTo + writeInt32LE(header, OP_MSG, 12); // opCode + writeInt32LE(header, flags, 16); // flags return buffers; } @@ -571,7 +583,7 @@ export class OpMsgRequest { */ makeSections(buffers: Uint8Array[], document: Document): number { const sequencesBuffer = this.extractDocumentSequences(document); - const payloadTypeBuffer = Buffer.allocUnsafe(1); + const payloadTypeBuffer = allocateUnsafeBuffer(1); payloadTypeBuffer[0] = 0; const documentBuffer = this.serializeBson(document); @@ -606,11 +618,11 @@ export class OpMsgRequest { } } if (chunks.length > 0) { - return Buffer.concat(chunks); + return concatBuffers(chunks); } // If we have no document sequences we return an empty buffer for nothing to add // to the payload. 
- return Buffer.alloc(0); + return allocateBuffer(0); } serializeBson(document: Document): Uint8Array { @@ -630,8 +642,8 @@ export class OpMsgRequest { /** @internal */ export class OpMsgResponse { parsed: boolean; - raw: Buffer; - data: Buffer; + raw: Uint8Array; + data: Uint8Array; opts: BSONSerializeOptions; length: number; requestId: number; @@ -652,9 +664,9 @@ export class OpMsgResponse { sections: Uint8Array[] = []; constructor( - message: Buffer, + message: Uint8Array, msgHeader: MessageHeader, - msgBody: Buffer, + msgBody: Uint8Array, opts?: BSONSerializeOptions ) { this.parsed = false; @@ -676,7 +688,7 @@ export class OpMsgResponse { this.fromCompressed = msgHeader.fromCompressed; // Read response flags - this.responseFlags = msgBody.readInt32LE(0); + this.responseFlags = readInt32LE(msgBody, 0); this.checksumPresent = (this.responseFlags & OPTS_CHECKSUM_PRESENT) !== 0; this.moreToCome = (this.responseFlags & OPTS_MORE_TO_COME) !== 0; this.exhaustAllowed = (this.responseFlags & OPTS_EXHAUST_ALLOWED) !== 0; @@ -700,9 +712,9 @@ export class OpMsgResponse { this.index = 4; while (this.index < this.data.length) { - const payloadType = this.data.readUInt8(this.index++); + const payloadType = this.data[this.index++]; if (payloadType === 0) { - const bsonSize = this.data.readUInt32LE(this.index); + const bsonSize = readInt32LE(this.data, this.index); const bin = this.data.subarray(this.index, this.index + bsonSize); this.sections.push(bin); @@ -758,31 +770,31 @@ export class OpCompressedRequest { return !uncompressibleCommands.has(commandName); } - async toBin(): Promise { - const concatenatedOriginalCommandBuffer = Buffer.concat(this.command.toBin()); + async toBin(): Promise { + const concatenatedOriginalCommandBuffer = concatBuffers(this.command.toBin()); // otherwise, compress the message const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE); // Extract information needed for OP_COMPRESSED from the uncompressed message - 
const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12); + const originalCommandOpCode = readInt32LE(concatenatedOriginalCommandBuffer, 12); // Compress the message body const compressedMessage = await compress(this.options, messageToBeCompressed); // Create the msgHeader of OP_COMPRESSED - const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE); - msgHeader.writeInt32LE( + const msgHeader = allocateBuffer(MESSAGE_HEADER_SIZE); + writeInt32LE( + msgHeader, MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length, 0 ); // messageLength - msgHeader.writeInt32LE(this.command.requestId, 4); // requestID - msgHeader.writeInt32LE(0, 8); // responseTo (zero) - msgHeader.writeInt32LE(OP_COMPRESSED, 12); // opCode - + writeInt32LE(msgHeader, this.command.requestId, 4); // requestID + writeInt32LE(msgHeader, 0, 8); // responseTo (zero) + writeInt32LE(msgHeader, OP_COMPRESSED, 12); // opCode // Create the compression details of OP_COMPRESSED - const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE); - compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode - compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader - compressionDetails.writeUInt8(Compressor[this.options.agreedCompressor], 8); // compressorID + const compressionDetails = allocateBuffer(COMPRESSION_DETAILS_SIZE); + writeInt32LE(compressionDetails, originalCommandOpCode, 0); // originalOpcode + writeInt32LE(compressionDetails, messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader + compressionDetails[8] = Compressor[this.options.agreedCompressor]; // compressorID (single byte; writeInt32LE here would write 4 bytes past offset 8 of this 9-byte buffer) return [msgHeader, compressionDetails, compressedMessage]; } } diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 9652e3a5e4..d812384847 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -2,7 +2,9 @@ import {
type Readable, Transform, type TransformCallback } from 'stream'; import { clearTimeout, setTimeout } from 'timers'; import { + BSON, type BSONSerializeOptions, + concatBuffers, deserialize, type DeserializeOptions, type Document, @@ -174,7 +176,7 @@ function streamIdentifier(stream: Stream, options: ConnectionOptions): string { return HostAddress.fromHostPort(remoteAddress, remotePort).toString(); } - return uuidV4().toString('hex'); + return BSON.ByteUtils.toHex(uuidV4()); } /** @internal */ @@ -204,7 +206,7 @@ export class Connection extends TypedEventEmitter { private lastUseTime: number; private clusterTime: Document | null = null; private error: Error | null = null; - private dataEvents: AsyncGenerator | null = null; + private dataEvents: AsyncGenerator | null = null; private readonly socketTimeoutMS: number; private readonly monitorCommands: boolean; @@ -696,7 +698,7 @@ export class Connection extends TypedEventEmitter { zlibCompressionLevel: options.zlibCompressionLevel ?? 0 }); - const buffer = Buffer.concat(await finalCommand.toBin()); + const buffer = concatBuffers(await finalCommand.toBin()); if (options.timeoutContext?.csotEnabled()) { if ( @@ -794,7 +796,7 @@ export class SizedMessageTransform extends Transform { this.connection = connection; } - override _transform(chunk: Buffer, encoding: unknown, callback: TransformCallback): void { + override _transform(chunk: Uint8Array, encoding: unknown, callback: TransformCallback): void { if (this.connection.delayedTimeoutId != null) { clearTimeout(this.connection.delayedTimeoutId); this.connection.delayedTimeoutId = null; diff --git a/src/cmap/handshake/client_metadata.ts b/src/cmap/handshake/client_metadata.ts index 48cb6a4735..7e26f7b6f3 100644 --- a/src/cmap/handshake/client_metadata.ts +++ b/src/cmap/handshake/client_metadata.ts @@ -1,7 +1,7 @@ import * as os from 'os'; import * as process from 'process'; -import { BSON, type Document, Int32, NumberUtils } from '../../bson'; +import { BSON, type 
Document, fromUTF8, Int32, toUTF8 } from '../../bson'; import { MongoInvalidArgumentError } from '../../error'; import type { DriverInfo, MongoOptions } from '../../mongo_client'; import { fileIsAccessible } from '../../utils'; @@ -114,9 +114,9 @@ export async function makeClientMetadata( // Add app name first, it must be sent if (appName.length > 0) { const name = - Buffer.byteLength(appName, 'utf8') <= 128 + BSON.ByteUtils.utf8ByteLength(appName) <= 128 ? appName - : Buffer.from(appName, 'utf8').subarray(0, 128).toString('utf8'); + : toUTF8(fromUTF8(appName), 0, 128, false); metadataDocument.ifItFitsItSits('application', { name }); } @@ -336,7 +336,7 @@ declare const Bun: { (): void; version?: string } | undefined; * with a future change to these global objects. */ function getRuntimeInfo(): string { - const endianness = NumberUtils.isBigEndian ? 'BE' : 'LE'; + const endianness = BSON.NumberUtils.isBigEndian ? 'BE' : 'LE'; if ('Deno' in globalThis) { const version = typeof Deno?.version?.deno === 'string' ? 
Deno?.version?.deno : '0.0.0-unknown'; diff --git a/src/cmap/wire_protocol/compression.ts b/src/cmap/wire_protocol/compression.ts index 4ee941ff3c..3ed4557938 100644 --- a/src/cmap/wire_protocol/compression.ts +++ b/src/cmap/wire_protocol/compression.ts @@ -1,5 +1,6 @@ import * as zlib from 'zlib'; +import { concatBuffers, readInt32LE } from '../../bson'; import { LEGACY_HELLO_COMMAND } from '../../constants'; import { getSnappy, getZstdLibrary, type SnappyLib, type ZStandard } from '../../deps'; import { MongoDecompressionError, MongoInvalidArgumentError } from '../../error'; @@ -43,7 +44,7 @@ export const uncompressibleCommands = new Set([ const ZSTD_COMPRESSION_LEVEL = 3; const zlibInflate = (buf: zlib.InputType) => { - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { zlib.inflate(buf, (error, result) => { if (error) return reject(error); resolve(result); @@ -52,7 +53,7 @@ const zlibInflate = (buf: zlib.InputType) => { }; const zlibDeflate = (buf: zlib.InputType, options: zlib.ZlibOptions) => { - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { zlib.deflate(buf, options, (error, result) => { if (error) return reject(error); resolve(result); @@ -76,8 +77,8 @@ function loadSnappy() { // Facilitate compressing a message using an agreed compressor export async function compress( options: OpCompressesRequestOptions, - dataToBeCompressed: Buffer -): Promise { + dataToBeCompressed: Uint8Array +): Promise { const zlibOptions = {} as zlib.ZlibOptions; switch (options.agreedCompressor) { case 'snappy': { @@ -106,7 +107,10 @@ export async function compress( } // Decompress a message using the given compressor -export async function decompress(compressorID: number, compressedData: Buffer): Promise { +export async function decompress( + compressorID: number, + compressedData: Uint8Array +): Promise { if ( compressorID !== Compressor.snappy && compressorID !== Compressor.zstd && @@ -159,7 +163,7 @@ 
const MESSAGE_HEADER_SIZE = 16; export async function compressCommand( command: WriteProtocolMessageType, description: { agreedCompressor?: CompressorName; zlibCompressionLevel?: number } -): Promise { +): Promise { const finalCommand = description.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command) ? command @@ -168,7 +172,7 @@ export async function compressCommand( zlibCompressionLevel: description.zlibCompressionLevel ?? 0 }); const data = await finalCommand.toBin(); - return Buffer.concat(data); + return concatBuffers(data); } /** @@ -178,12 +182,12 @@ export async function compressCommand( * * This method does not parse the response's BSON. */ -export async function decompressResponse(message: Buffer): Promise { +export async function decompressResponse(message: Uint8Array): Promise { const messageHeader: MessageHeader = { - length: message.readInt32LE(0), - requestId: message.readInt32LE(4), - responseTo: message.readInt32LE(8), - opCode: message.readInt32LE(12) + length: readInt32LE(message, 0), + requestId: readInt32LE(message, 4), + responseTo: readInt32LE(message, 8), + opCode: readInt32LE(message, 12) }; if (messageHeader.opCode !== OP_COMPRESSED) { @@ -195,8 +199,8 @@ export async function decompressResponse(message: Buffer): Promise>>, + ReturnType>>, 'promise' >; @@ -32,7 +32,7 @@ export function onData( * value from the event in this list. Next time they call .next() * we pull the first value out of this list and resolve a promise with it. */ - const unconsumedEvents = new List(); + const unconsumedEvents = new List(); /** * When there has not yet been an event, a new promise will be created * and implicitly stored in this list. When an event occurs we take the first @@ -49,7 +49,7 @@ export function onData( /** Set to true only after event listeners have been removed. 
*/ let finished = false; - const iterator: AsyncGenerator & AsyncDisposable = { + const iterator: AsyncGenerator & AsyncDisposable = { next() { // First, we consume all unread events const value = unconsumedEvents.shift(); @@ -71,7 +71,7 @@ export function onData( if (finished) return closeHandler(); // Wait until an event happens - const { promise, resolve, reject } = promiseWithResolvers>(); + const { promise, resolve, reject } = promiseWithResolvers>(); unconsumedPromises.push({ resolve, reject }); return promise; }, @@ -107,7 +107,7 @@ export function onData( return iterator; - function eventHandler(value: Buffer) { + function eventHandler(value: Uint8Array) { const promise = unconsumedPromises.shift(); if (promise != null) promise.resolve({ value, done: false }); else unconsumedEvents.push(value); diff --git a/src/deps.ts b/src/deps.ts index f4c0b0f9ca..fc92beda8b 100644 --- a/src/deps.ts +++ b/src/deps.ts @@ -50,12 +50,12 @@ type ZStandardLib = { * Compress using zstd. * @param buf - Buffer to be compressed. */ - compress(buf: Buffer, level?: number): Promise; + compress(buf: Uint8Array, level?: number): Promise; /** * Decompress using zstd. 
*/ - decompress(buf: Buffer): Promise; + decompress(buf: Uint8Array): Promise; }; export type ZStandard = ZStandardLib | { kModuleError: MongoMissingDependencyError }; @@ -144,13 +144,13 @@ export type SnappyLib = { * In order to support both we must check the return value of the function * @param buf - Buffer to be compressed */ - compress(buf: Buffer): Promise; + compress(buf: Uint8Array): Promise; /** * In order to support both we must check the return value of the function * @param buf - Buffer to be compressed */ - uncompress(buf: Buffer, opt: { asBuffer: true }): Promise; + uncompress(buf: Uint8Array, opt: { asBuffer: true }): Promise; }; export function getSnappy(): SnappyLib | { kModuleError: MongoMissingDependencyError } { diff --git a/src/gridfs/download.ts b/src/gridfs/download.ts index 563678c317..fb6691cb8c 100644 --- a/src/gridfs/download.ts +++ b/src/gridfs/download.ts @@ -1,6 +1,6 @@ import { Readable } from 'stream'; -import type { Document, ObjectId } from '../bson'; +import { ByteUtils, type Document, type ObjectId } from '../bson'; import type { Collection } from '../collection'; import { CursorTimeoutMode } from '../cursor/abstract_cursor'; import type { FindCursor } from '../cursor/find_cursor'; @@ -248,7 +248,7 @@ function doRead(stream: GridFSBucketReadStream): void { ); } - let buf = Buffer.isBuffer(doc.data) ? doc.data : doc.data.buffer; + let buf = ByteUtils.isUint8Array(doc.data) ? 
doc.data : doc.data.buffer; if (buf.byteLength !== expectedLength) { if (bytesRemaining <= 0) { diff --git a/src/gridfs/upload.ts b/src/gridfs/upload.ts index 9c1b265901..6aaab6465c 100644 --- a/src/gridfs/upload.ts +++ b/src/gridfs/upload.ts @@ -1,6 +1,6 @@ import { Writable } from 'stream'; -import { type Document, ObjectId } from '../bson'; +import { allocateBuffer, ByteUtils, copyBuffer, type Document, fromUTF8, ObjectId } from '../bson'; import type { Collection } from '../collection'; import { CursorTimeoutMode } from '../cursor/abstract_cursor'; import { @@ -62,7 +62,7 @@ export class GridFSBucketWriteStream extends Writable { /** The number of bytes that each chunk will be limited to */ chunkSizeBytes: number; /** Space used to store a chunk currently being inserted */ - bufToStore: Buffer; + bufToStore: Uint8Array; /** Accumulates the number of bytes inserted as the stream uploads chunks */ length: number; /** Accumulates the number of chunks inserted as the stream uploads file contents */ @@ -122,7 +122,7 @@ export class GridFSBucketWriteStream extends Writable { this.id = options.id ? options.id : new ObjectId(); // properly inherit the default chunksize from parent this.chunkSizeBytes = options.chunkSizeBytes || this.bucket.s.options.chunkSizeBytes; - this.bufToStore = Buffer.alloc(this.chunkSizeBytes); + this.bufToStore = allocateBuffer(this.chunkSizeBytes); this.length = 0; this.n = 0; this.pos = 0; @@ -178,7 +178,7 @@ export class GridFSBucketWriteStream extends Writable { * @param callback - Function to call when the chunk was added to the buffer, or if the entire chunk was persisted to MongoDB if this chunk caused a flush. 
*/ override _write( - chunk: Buffer | string, + chunk: Uint8Array | string, encoding: BufferEncoding, callback: Callback ): void { @@ -227,7 +227,7 @@ function handleError(stream: GridFSBucketWriteStream, error: Error, callback: Ca queueMicrotask(() => callback(error)); } -function createChunkDoc(filesId: ObjectId, n: number, data: Buffer): GridFSChunk { +function createChunkDoc(filesId: ObjectId, n: number, data: Uint8Array): GridFSChunk { return { _id: new ObjectId(), files_id: filesId, @@ -409,7 +409,7 @@ function createFilesDoc( function doWrite( stream: GridFSBucketWriteStream, - chunk: Buffer | string, + chunk: Uint8Array | string, encoding: BufferEncoding, callback: Callback ): void { @@ -417,13 +417,17 @@ function doWrite( return; } - const inputBuf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding); + const inputBuf = typeof chunk === 'string' ? fromUTF8(chunk) : ByteUtils.toLocalBufferType(chunk); stream.length += inputBuf.length; // Input is small enough to fit in our buffer if (stream.pos + inputBuf.length < stream.chunkSizeBytes) { - inputBuf.copy(stream.bufToStore, stream.pos); + copyBuffer({ + source: inputBuf, + target: stream.bufToStore, + targetStart: stream.pos + }); stream.pos += inputBuf.length; queueMicrotask(callback); return; @@ -437,12 +441,18 @@ function doWrite( let outstandingRequests = 0; while (inputBufRemaining > 0) { const inputBufPos = inputBuf.length - inputBufRemaining; - inputBuf.copy(stream.bufToStore, stream.pos, inputBufPos, inputBufPos + numToCopy); + copyBuffer({ + source: inputBuf, + target: stream.bufToStore, + targetStart: stream.pos, + sourceStart: inputBufPos, + sourceEnd: inputBufPos + numToCopy + }); stream.pos += numToCopy; spaceRemaining -= numToCopy; let doc: GridFSChunk; if (spaceRemaining === 0) { - doc = createChunkDoc(stream.id, stream.n, Buffer.from(stream.bufToStore)); + doc = createChunkDoc(stream.id, stream.n, new Uint8Array(stream.bufToStore)); const remainingTimeMS = 
stream.timeoutContext?.remainingTimeMS; if (remainingTimeMS != null && remainingTimeMS <= 0) { @@ -495,8 +505,14 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback: Callback): void // Create a new buffer to make sure the buffer isn't bigger than it needs // to be. - const remnant = Buffer.alloc(stream.pos); - stream.bufToStore.copy(remnant, 0, 0, stream.pos); + const remnant = allocateBuffer(stream.pos); + copyBuffer({ + source: stream.bufToStore, + target: remnant, + targetStart: 0, + sourceStart: 0, + sourceEnd: stream.pos + }); const doc = createChunkDoc(stream.id, stream.n, remnant); // If the stream was aborted, do not write remnant diff --git a/src/sessions.ts b/src/sessions.ts index c1d9ab70b0..775e1d6a61 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -1,6 +1,13 @@ import { setTimeout } from 'timers/promises'; -import { Binary, type Document, Long, type Timestamp } from './bson'; +import { + allocateUnsafeBuffer, + Binary, + ByteUtils, + type Document, + Long, + type Timestamp +} from './bson'; import type { CommandOptions, Connection } from './cmap/connection'; import { ConnectionPoolMetrics } from './cmap/metrics'; import { type MongoDBResponse } from './cmap/wire_protocol/responses'; @@ -37,7 +44,6 @@ import { TxnState } from './transactions'; import { - ByteUtils, calculateDurationInMs, commandSupportsReadConcern, isPromiseLike, @@ -1018,7 +1024,7 @@ export class ServerSession { /** @internal */ constructor(cloned?: ServerSession | null) { if (cloned != null) { - const idBytes = Buffer.allocUnsafe(16); + const idBytes = allocateUnsafeBuffer(16); idBytes.set(cloned.id.id.buffer); this.id = { id: new Binary(idBytes, cloned.id.id.sub_type) }; this.lastUse = cloned.lastUse; diff --git a/src/utils.ts b/src/utils.ts index 397850649d..ea0721f515 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -6,7 +6,16 @@ import * as http from 'http'; import * as process from 'process'; import { clearTimeout, setTimeout } from 'timers'; -import { 
deserialize, type Document, ObjectId, resolveBSONOptions } from './bson'; +import { + allocateBuffer, + allocateUnsafeBuffer, + ByteUtils, + deserialize, + type Document, + getInt32LE, + ObjectId, + resolveBSONOptions +} from './bson'; import type { Connection } from './cmap/connection'; import { MAX_SUPPORTED_WIRE_VERSION } from './cmap/wire_protocol/constants'; import type { Collection } from './collection'; @@ -44,26 +53,6 @@ export type Callback = (error?: AnyError, result?: T) => void; export type AnyOptions = Document; -export const ByteUtils = { - toLocalBufferType(this: void, buffer: Buffer | Uint8Array): Buffer { - return Buffer.isBuffer(buffer) - ? buffer - : Buffer.from(buffer.buffer, buffer.byteOffset, buffer.byteLength); - }, - - equals(this: void, seqA: Uint8Array, seqB: Uint8Array) { - return ByteUtils.toLocalBufferType(seqA).equals(seqB); - }, - - compare(this: void, seqA: Uint8Array, seqB: Uint8Array) { - return ByteUtils.toLocalBufferType(seqA).compare(seqB); - }, - - toBase64(this: void, uint8array: Uint8Array) { - return ByteUtils.toLocalBufferType(uint8array).toString('base64'); - } -}; - /** * Returns true if value is a Uint8Array or a Buffer * @param value - any value that may be a Uint8Array @@ -318,7 +307,7 @@ export function* makeCounter(seed = 0): Generator { * Synchronously Generate a UUIDv4 * @internal */ -export function uuidV4(): Buffer { +export function uuidV4(): Uint8Array { const result = crypto.randomBytes(16); result[6] = (result[6] & 0x0f) | 0x40; result[8] = (result[8] & 0x3f) | 0x80; @@ -793,7 +782,7 @@ export class List { * @internal */ export class BufferPool { - private buffers: List; + private buffers: List; private totalByteLength: number; constructor() { @@ -806,7 +795,7 @@ export class BufferPool { } /** Adds a buffer to the internal buffer pool list */ - append(buffer: Buffer): void { + append(buffer: Uint8Array): void { this.buffers.push(buffer); this.totalByteLength += buffer.length; } @@ -821,13 +810,13 @@ export 
class BufferPool { } const firstBuffer = this.buffers.first(); if (firstBuffer != null && firstBuffer.byteLength >= 4) { - return firstBuffer.readInt32LE(0); + return getInt32LE(firstBuffer, 0); } // Unlikely case: an int32 is split across buffers. // Use read and put the returned buffer back on top const top4Bytes = this.read(4); - const value = top4Bytes.readInt32LE(0); + const value = getInt32LE(top4Bytes, 0); // Put it back. this.totalByteLength += 4; @@ -837,19 +826,19 @@ export class BufferPool { } /** Reads the requested number of bytes, optionally consuming them */ - read(size: number): Buffer { + read(size: number): Uint8Array { if (typeof size !== 'number' || size < 0) { throw new MongoInvalidArgumentError('Argument "size" must be a non-negative number'); } // oversized request returns empty buffer if (size > this.totalByteLength) { - return Buffer.alloc(0); + return allocateBuffer(0); } // We know we have enough, we just don't know how it is spread across chunks // TODO(NODE-4732): alloc API should change based on raw option - const result = Buffer.allocUnsafe(size); + const result = allocateUnsafeBuffer(size); for (let bytesRead = 0; bytesRead < size; ) { const buffer = this.buffers.shift(); @@ -1240,8 +1229,8 @@ export function squashError(_error: unknown) { } export const randomBytes = (size: number) => { - return new Promise((resolve, reject) => { - crypto.randomBytes(size, (error: Error | null, buf: Buffer) => { + return new Promise((resolve, reject) => { + crypto.randomBytes(size, (error: Error | null, buf: Uint8Array) => { if (error) return reject(error); resolve(buf); }); @@ -1331,10 +1320,10 @@ export function decorateDecryptionResult( ): void { if (isTopLevelDecorateCall) { // The original value could have been either a JS object or a BSON buffer - if (Buffer.isBuffer(original)) { + if (ByteUtils.isUint8Array(original)) { original = deserialize(original); } - if (Buffer.isBuffer(decrypted)) { + if (ByteUtils.isUint8Array(decrypted)) { throw new 
MongoRuntimeError('Expected result of decryption to be deserialized BSON object'); } } diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index dc393fe099..6914bb6ce3 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -1,6 +1,6 @@ import { setTimeout } from 'node:timers'; -import { ObjectId } from 'bson'; +import { ByteUtils, ObjectId } from 'bson'; import { expect } from 'chai'; import * as sinon from 'sinon'; @@ -10,7 +10,6 @@ import { decorateWithExplain, Explain } from '../../src/explain'; import { abortable, BufferPool, - ByteUtils, checkParentDomainMatch, compareObjectId, hasAtomicOperators,