Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ Flowise support different environment variables to configure your instance. You
| DATABASE_USER | Database username (When DATABASE_TYPE is not sqlite) | String | |
| DATABASE_PASSWORD | Database password (When DATABASE_TYPE is not sqlite) | String | |
| DATABASE_NAME | Database name (When DATABASE_TYPE is not sqlite) | String | |
| DATABASE_SCHEMA | Database schema (When DATABASE_TYPE is postgres) | String | |
| DATABASE_SSL_KEY_BASE64 | Database SSL client cert in base64 (takes priority over DATABASE_SSL) | Boolean | false |
| DATABASE_SSL | Database connection overssl (When DATABASE_TYPE is postgre) | Boolean | false |
| SECRETKEY_PATH | Location where encryption key (used to encrypt/decrypt credentials) is saved | String | `your-path/Flowise/packages/server` |
Expand Down
1 change: 1 addition & 0 deletions docker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ DATABASE_PATH=/root/.flowise
# DATABASE_PORT=5432
# DATABASE_HOST=""
# DATABASE_NAME=flowise
# DATABASE_SCHEMA: default,
# DATABASE_USER=root
# DATABASE_PASSWORD=mypassword
# DATABASE_SSL=true
Expand Down
2 changes: 2 additions & 0 deletions docker/docker-compose-queue-prebuilt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ services:
- DATABASE_PORT=${DATABASE_PORT}
- DATABASE_HOST=${DATABASE_HOST}
- DATABASE_NAME=${DATABASE_NAME}
- DATABASE_SCHEMA=${DATABASE_SCHEMA}
- DATABASE_USER=${DATABASE_USER}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_SSL=${DATABASE_SSL}
Expand Down Expand Up @@ -172,6 +173,7 @@ services:
- DATABASE_PORT=${DATABASE_PORT}
- DATABASE_HOST=${DATABASE_HOST}
- DATABASE_NAME=${DATABASE_NAME}
- DATABASE_SCHEMA=${DATABASE_SCHEMA}
- DATABASE_USER=${DATABASE_USER}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_SSL=${DATABASE_SSL}
Expand Down
1 change: 1 addition & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
- DATABASE_PORT=${DATABASE_PORT}
- DATABASE_HOST=${DATABASE_HOST}
- DATABASE_NAME=${DATABASE_NAME}
- DATABASE_SCHEMA=${DATABASE_SCHEMA}
- DATABASE_USER=${DATABASE_USER}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_SSL=${DATABASE_SSL}
Expand Down
1 change: 1 addition & 0 deletions docker/worker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ DATABASE_PATH=/root/.flowise
# DATABASE_PORT=5432
# DATABASE_HOST=""
# DATABASE_NAME=flowise
# # DATABASE_SCHEMA: default,
# DATABASE_USER=root
# DATABASE_PASSWORD=mypassword
# DATABASE_SSL=true
Expand Down
1 change: 1 addition & 0 deletions docker/worker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
- DATABASE_PORT=${DATABASE_PORT}
- DATABASE_HOST=${DATABASE_HOST}
- DATABASE_NAME=${DATABASE_NAME}
- DATABASE_SCHEMA=${DATABASE_SCHEMA}
- DATABASE_USER=${DATABASE_USER}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_SSL=${DATABASE_SSL}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { DataSource } from 'typeorm'
import { CheckpointTuple, SaverOptions, SerializerProtocol } from '../interface'
import { IMessage, MemoryMethods } from '../../../../src/Interface'
import { mapChatMessageToBaseMessage } from '../../../../src/utils'
import { getSchema } from '../../../vectorstores/Postgres/utils'

export class PostgresSaver extends BaseCheckpointSaver implements MemoryMethods {
protected isSetup: boolean
Expand All @@ -17,6 +18,7 @@ export class PostgresSaver extends BaseCheckpointSaver implements MemoryMethods
this.config = config
const { threadId } = config
this.threadId = threadId
this.tableName = getSchema() != 'public' ? `${getSchema()}.checkpoints` : 'checkpoints'
}

sanitizeTableName(tableName: string): string {
Expand Down Expand Up @@ -154,6 +156,7 @@ CREATE TABLE IF NOT EXISTS ${tableName} (
const queryRunner = dataSource.createQueryRunner()
const thread_id = config.configurable?.thread_id || this.threadId
const tableName = this.sanitizeTableName(this.tableName)

let sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${tableName} WHERE thread_id = $1`
const args = [thread_id]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../
import { ListKeyOptions, RecordManagerInterface, UpdateOptions } from '@langchain/community/indexes/base'
import { DataSource } from 'typeorm'
import { getHost, getSSL } from '../../vectorstores/Postgres/utils'
import { getDatabase, getPort, getTableName } from './utils'
import { getDatabase, getPort, getSchema, getTableName } from './utils'

const serverCredentialsExists = !!process.env.POSTGRES_RECORDMANAGER_USER && !!process.env.POSTGRES_RECORDMANAGER_PASSWORD

Expand Down Expand Up @@ -74,6 +74,14 @@ class PostgresRecordManager_RecordManager implements INode {
additionalParams: true,
optional: true
},
{
label: 'Schema',
name: 'schema',
type: 'string',
placeholder: getSchema(),
additionalParams: true,
optional: true
},
{
label: 'Namespace',
name: 'namespace',
Expand Down Expand Up @@ -160,7 +168,8 @@ class PostgresRecordManager_RecordManager implements INode {
ssl: getSSL(nodeData),
username: user,
password: password,
database: getDatabase(nodeData)
database: getDatabase(nodeData),
schema: getSchema(nodeData)
}

const args = {
Expand All @@ -186,12 +195,14 @@ class PostgresRecordManager implements RecordManagerInterface {
lc_namespace = ['langchain', 'recordmanagers', 'postgres']
config: PostgresRecordManagerOptions
tableName: string
schema: string
namespace: string

constructor(namespace: string, config: PostgresRecordManagerOptions) {
const { tableName } = config
const { tableName, postgresConnectionOptions: { schema }} = config
this.namespace = namespace
this.tableName = tableName
this.schema = schema
this.config = config
}

Expand All @@ -206,7 +217,17 @@ class PostgresRecordManager implements RecordManagerInterface {

return tableName
}
sanitizeSchema(schema: string): string {
// Trim and normalize case, turn whitespace into underscores
schema = schema.trim().toLowerCase().replace(/\s+/g, '_')

// Validate using a regex (alphanumeric and underscores only)
if (!/^[a-zA-Z0-9_]+$/.test(schema)) {
throw new Error('Invalid table name')
}

return schema
}
private async getDataSource(): Promise<DataSource> {
const { postgresConnectionOptions } = this.config
if (!postgresConnectionOptions) {
Expand All @@ -225,21 +246,20 @@ class PostgresRecordManager implements RecordManagerInterface {
try {
const dataSource = await this.getDataSource()
const queryRunner = dataSource.createQueryRunner()
const tableName = this.sanitizeTableName(this.tableName)

const fullTableName = this.getFullTableName()
await queryRunner.manager.query(`
CREATE TABLE IF NOT EXISTS "${tableName}" (
CREATE TABLE IF NOT EXISTS ${fullTableName} (
uuid UUID PRIMARY KEY DEFAULT gen_random_uuid(),
key TEXT NOT NULL,
namespace TEXT NOT NULL,
updated_at Double PRECISION NOT NULL,
group_id TEXT,
UNIQUE (key, namespace)
);
CREATE INDEX IF NOT EXISTS updated_at_index ON "${tableName}" (updated_at);
CREATE INDEX IF NOT EXISTS key_index ON "${tableName}" (key);
CREATE INDEX IF NOT EXISTS namespace_index ON "${tableName}" (namespace);
CREATE INDEX IF NOT EXISTS group_id_index ON "${tableName}" (group_id);`)
CREATE INDEX IF NOT EXISTS updated_at_index ON ${fullTableName} (updated_at);
CREATE INDEX IF NOT EXISTS key_index ON ${fullTableName} (key);
CREATE INDEX IF NOT EXISTS namespace_index ON ${fullTableName} (namespace);
CREATE INDEX IF NOT EXISTS group_id_index ON ${fullTableName} (group_id);`)

await queryRunner.release()
} catch (e: any) {
Expand Down Expand Up @@ -291,8 +311,7 @@ class PostgresRecordManager implements RecordManagerInterface {

const dataSource = await this.getDataSource()
const queryRunner = dataSource.createQueryRunner()
const tableName = this.sanitizeTableName(this.tableName)

const fullTableName = this.getFullTableName()
const updatedAt = await this.getTime()
const { timeAtLeast, groupIds: _groupIds } = updateOptions ?? {}

Expand All @@ -310,7 +329,7 @@ class PostgresRecordManager implements RecordManagerInterface {

const valuesPlaceholders = recordsToUpsert.map((_, j) => this.generatePlaceholderForRowAt(j, recordsToUpsert[0].length)).join(', ')

const query = `INSERT INTO "${tableName}" (key, namespace, updated_at, group_id) VALUES ${valuesPlaceholders} ON CONFLICT (key, namespace) DO UPDATE SET updated_at = EXCLUDED.updated_at;`
const query = `INSERT INTO ${fullTableName} (key, namespace, updated_at, group_id) VALUES ${valuesPlaceholders} ON CONFLICT (key, namespace) DO UPDATE SET updated_at = EXCLUDED.updated_at;`
try {
await queryRunner.manager.query(query, recordsToUpsert.flat())
await queryRunner.release()
Expand All @@ -329,13 +348,12 @@ class PostgresRecordManager implements RecordManagerInterface {

const dataSource = await this.getDataSource()
const queryRunner = dataSource.createQueryRunner()
const tableName = this.sanitizeTableName(this.tableName)

const fullTableName = this.getFullTableName()
const startIndex = 2
const arrayPlaceholders = keys.map((_, i) => `$${i + startIndex}`).join(', ')

const query = `
SELECT k, (key is not null) ex from unnest(ARRAY[${arrayPlaceholders}]) k left join "${tableName}" on k=key and namespace = $1;
SELECT k, (key is not null) ex from unnest(ARRAY[${arrayPlaceholders}]) k left join ${fullTableName} on k=key and namespace = $1;
`
try {
const res = await queryRunner.manager.query(query, [this.namespace, ...keys.flat()])
Expand All @@ -351,9 +369,9 @@ class PostgresRecordManager implements RecordManagerInterface {

async listKeys(options?: ListKeyOptions): Promise<string[]> {
const { before, after, limit, groupIds } = options ?? {}
const tableName = this.sanitizeTableName(this.tableName)
const fullTableName = this.getFullTableName()

let query = `SELECT key FROM "${tableName}" WHERE namespace = $1`
let query = `SELECT key FROM ${fullTableName} WHERE namespace = $1`
const values: (string | number | (string | null)[])[] = [this.namespace]

let index = 2
Expand Down Expand Up @@ -405,10 +423,10 @@ class PostgresRecordManager implements RecordManagerInterface {

const dataSource = await this.getDataSource()
const queryRunner = dataSource.createQueryRunner()
const tableName = this.sanitizeTableName(this.tableName)
const fullTableName = this.getFullTableName()

try {
const query = `DELETE FROM "${tableName}" WHERE namespace = $1 AND key = ANY($2);`
const query = `DELETE FROM ${fullTableName} WHERE namespace = $1 AND key = ANY($2);`
await queryRunner.manager.query(query, [this.namespace, keys])
await queryRunner.release()
} catch (error) {
Expand All @@ -418,6 +436,11 @@ class PostgresRecordManager implements RecordManagerInterface {
await dataSource.destroy()
}
}
getFullTableName(): string {
const tableName = this.sanitizeTableName(this.tableName)
const schema = this.sanitizeSchema(this.schema)
return schema ? `${schema}.${tableName}` : tableName
}
}

module.exports = { nodeClass: PostgresRecordManager_RecordManager }
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ export function getHost(nodeData?: INodeData) {
export function getDatabase(nodeData?: INodeData) {
return defaultChain(nodeData?.inputs?.database, process.env.POSTGRES_RECORDMANAGER_DATABASE)
}
export function getSchema(nodeData?: INodeData) {
return defaultChain(nodeData?.inputs?.schema, process.env.POSTGRES_RECORDMANAGER_SCHEMA, 'public')
}

export function getPort(nodeData?: INodeData) {
return defaultChain(nodeData?.inputs?.port, process.env.POSTGRES_RECORDMANAGER_PORT, '5432')
Expand Down
10 changes: 9 additions & 1 deletion packages/components/nodes/vectorstores/Postgres/Postgres.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { VectorStore } from '@langchain/core/vectorstores'
import { VectorStoreDriver } from './driver/Base'
import { TypeORMDriver } from './driver/TypeORM'
// import { PGVectorDriver } from './driver/PGVector'
import { getContentColumnName, getDatabase, getHost, getPort, getTableName } from './utils'
import { getContentColumnName, getDatabase, getHost, getPort, getTableName, getSchema } from './utils'

const serverCredentialsExists = !!process.env.POSTGRES_VECTORSTORE_USER && !!process.env.POSTGRES_VECTORSTORE_PASSWORD

Expand Down Expand Up @@ -119,6 +119,14 @@ class Postgres_VectorStores implements INode {
additionalParams: true,
optional: true
},
{
label: 'Schema',
name: 'schema',
type: 'string',
placeholder: getSchema(),
additionalParams: true,
optional: true
},
/*{
label: 'Driver',
name: 'driver',
Expand Down
16 changes: 14 additions & 2 deletions packages/components/nodes/vectorstores/Postgres/driver/Base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { VectorStore } from '@langchain/core/vectorstores'
import { getCredentialData, getCredentialParam, ICommonObject, INodeData } from '../../../../src'
import { Document } from '@langchain/core/documents'
import { Embeddings } from '@langchain/core/embeddings'
import { getDatabase, getHost, getPort, getSSL, getTableName } from '../utils'
import { getDatabase, getHost, getPort, getSchema, getSSL, getTableName } from '../utils'

export abstract class VectorStoreDriver {
constructor(protected nodeData: INodeData, protected options: ICommonObject) {}
Expand Down Expand Up @@ -34,7 +34,9 @@ export abstract class VectorStoreDriver {
getTableName() {
return this.sanitizeTableName(getTableName(this.nodeData))
}

getSchema(){
return this.sanitizeSchema(getSchema(this.nodeData))
}
getEmbeddings() {
return this.nodeData.inputs?.embeddings as Embeddings
}
Expand All @@ -50,7 +52,17 @@ export abstract class VectorStoreDriver {

return tableName
}
sanitizeSchema(schema: string): string {
// Trim and normalize case, turn whitespace into underscores
schema = schema.trim().toLowerCase().replace(/\s+/g, '_')

// Validate using a regex (alphanumeric and underscores only)
if (!/^[a-zA-Z0-9_]+$/.test(schema)) {
throw new Error('Invalid schema name')
}

return schema
}
async getCredentials() {
const credentialData = await getCredentialData(this.nodeData.credential ?? '', this.options)
const user = getCredentialParam('user', credentialData, this.nodeData, process.env.POSTGRES_VECTORSTORE_USER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export class TypeORMDriver extends VectorStoreDriver {
if (!this._postgresConnectionOptions) {
const { user, password } = await this.getCredentials()
const additionalConfig = this.nodeData.inputs?.additionalConfig as string
const schema = this.getSchema()

let additionalConfiguration = {}

Expand All @@ -33,7 +34,13 @@ export class TypeORMDriver extends VectorStoreDriver {
username: user, // Required by TypeORMVectorStore
user: user, // Required by Pool in similaritySearchVectorWithScore
password: password,
database: this.getDatabase()
// schema: this.getSchema(),
database: this.getDatabase(),
extra: {
...(additionalConfiguration as any)?.extra,
// Force PostgreSQL to use your schema
options: `-c search_path=${schema},public`
}
} as DataSourceOptions

// Prevent using default MySQL port, otherwise will throw uncaught error and crashing the app
Expand Down
4 changes: 3 additions & 1 deletion packages/components/nodes/vectorstores/Postgres/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ export function getSSL(nodeData?: INodeData) {
export function getTableName(nodeData?: INodeData) {
return defaultChain(nodeData?.inputs?.tableName, process.env.POSTGRES_VECTORSTORE_TABLE_NAME, 'documents')
}

export function getSchema(nodeData?: INodeData) {
return defaultChain(nodeData?.inputs?.schema, process.env.POSTGRES_VECTORSTORE_SCHEMA, 'public')
}
export function getContentColumnName(nodeData?: INodeData) {
return defaultChain(nodeData?.inputs?.contentColumnName, process.env.POSTGRES_VECTORSTORE_CONTENT_COLUMN_NAME, 'pageContent')
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class SingleStore_VectorStores implements INode {
database: nodeData.inputs?.database as string
},
...(nodeData.inputs?.tableName ? { tableName: nodeData.inputs.tableName as string } : {}),
...(nodeData.inputs?.schema ? { schema: nodeData.inputs.schema as string } : {}),
...(nodeData.inputs?.contentColumnName ? { contentColumnName: nodeData.inputs.contentColumnName as string } : {}),
...(nodeData.inputs?.vectorColumnName ? { vectorColumnName: nodeData.inputs.vectorColumnName as string } : {}),
...(nodeData.inputs?.metadataColumnName ? { metadataColumnName: nodeData.inputs.metadataColumnName as string } : {})
Expand Down Expand Up @@ -171,6 +172,7 @@ class SingleStore_VectorStores implements INode {
database: nodeData.inputs?.database as string
},
...(nodeData.inputs?.tableName ? { tableName: nodeData.inputs.tableName as string } : {}),
...(nodeData.inputs?.schema ? { schema: nodeData.inputs.schema as string } : {}),
...(nodeData.inputs?.contentColumnName ? { contentColumnName: nodeData.inputs.contentColumnName as string } : {}),
...(nodeData.inputs?.vectorColumnName ? { vectorColumnName: nodeData.inputs.vectorColumnName as string } : {}),
...(nodeData.inputs?.metadataColumnName ? { metadataColumnName: nodeData.inputs.metadataColumnName as string } : {})
Expand Down
Loading