Skip to content

Commit 544d2e7

Browse files
git init
0 parents  commit 544d2e7

File tree

1,299 files changed

+228265
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

1,299 files changed

+228265
-0
lines changed

.env

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
2+
MYSQL_HOST="localhost"
3+
MYSQL_PORT="3306"
4+
MYSQL_USER="root"
5+
MYSQL_PASSWORD="rootPwd@2022"

.env.example

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
MYSQL_HOST="0.0.0.0"
2+
MYSQL_PORT="3306"
3+
MYSQL_USER="root"
4+
MYSQL_PASSWORD="root"

.idea/.gitignore

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/Mysql-DB-Sinker.iml

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/modules.xml

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/vcs.xml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Producer/Producer.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
const { Kafka } = require('kafkajs')
2+
3+
const kafka = new Kafka({
4+
clientId: 'my-app',
5+
brokers: ['localhost:9092']
6+
})
7+
const producer = kafka.producer()
8+
9+
const sendMessage = async (message) => {
10+
await producer.connect()
11+
await producer.send(message)
12+
await producer.disconnect()
13+
}
14+
15+
module.exports=sendMessage

Source-Connector/mysql-source.js

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
const mysql = require('mysql');
2+
const MySQLEvents = require('@rodrigogs/mysql-events');
3+
const sendMessage = require('../Producer/Producer');
4+
5+
6+
const program = async () => {
7+
const connection = mysql.createConnection({
8+
user: 'root',
9+
password: 'root'
10+
});
11+
12+
const instance = new MySQLEvents(connection, {
13+
startAtEnd: true,
14+
excludedSchemas: {
15+
mysql: true,
16+
},
17+
});
18+
19+
await instance.start();
20+
21+
instance.addTrigger({
22+
name: 'OPERATIONS',
23+
expression: '*',
24+
statement: MySQLEvents.STATEMENTS.ALL,
25+
onEvent: (event) => { // You will receive the events here
26+
const message=
27+
{
28+
topic: event.table,
29+
messages: [
30+
{ key: event.table, value: JSON.stringify(event) }
31+
],
32+
}
33+
console.log(message);
34+
sendMessage(message);
35+
},
36+
});
37+
38+
instance.on(MySQLEvents.EVENTS.CONNECTION_ERROR, console.error);
39+
instance.on(MySQLEvents.EVENTS.ZONGJI_ERROR, console.error);
40+
};
41+
42+
program()
43+
.then(() => console.log('Waiting for database events...'))
44+
.catch(console.error);

app.js

Whitespace-only changes.

consumer/mongoDb.js

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
//mongodb://localhost:27017
2+
const { MongoClient } = require("mongodb");
3+
4+
// Replace the uri string with your MongoDB deployment's connection string.
5+
const uri ="mongodb://localhost:27017"
6+
7+
const client = new MongoClient(uri);
8+
9+
const updateOrCreate = async (collectionName,payload)=> {
10+
try {
11+
await client.connect();
12+
const db=client.db('sink_db');
13+
const collection = db.collection(collectionName);
14+
const query = { id: payload.id };
15+
const update = { $set: payload};
16+
const options = { upsert: true };
17+
const result= await collection.updateOne(query, update, options);
18+
console.log(result);
19+
} catch(error) {
20+
console.log("error: ", error)
21+
}
22+
};
23+
24+
const deleteDoc = async (collectionName,payload)=> {
25+
try {
26+
await client.connect();
27+
const db=client.db('sink_db');
28+
const query = { id: payload.id };
29+
const collection = db.collection(collectionName);
30+
const result = await collection.deleteOne(query);
31+
console.log(result);
32+
} catch(error) {
33+
console.log("error: ", error)
34+
}
35+
}
36+
37+
module.exports={
38+
updateOrCreate,
39+
deleteDoc
40+
}

0 commit comments

Comments
 (0)