Skip to content

Commit 71af5cb

Browse files
authored
Merge pull request #107 from Erengokce03/master
Add DBMatik
2 parents 383612d + 2d7b397 commit 71af5cb

File tree

3 files changed

+1441
-0
lines changed

3 files changed

+1441
-0
lines changed

Projects/DBMatik/db.py

Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
import sqlite3
2+
import csv
3+
import json
4+
5+
class Database:
6+
def __init__(self, db_name="database.db"):
7+
self.db_name = db_name
8+
self.conn = None
9+
self.cursor = None
10+
11+
def create_database(self, name_user_gave):
12+
# Ensure the database name has a .db extension
13+
if not name_user_gave.endswith('.db'):
14+
name_user_gave += '.db'
15+
16+
try:
17+
self.conn = sqlite3.connect(name_user_gave)
18+
self.cursor = self.conn.cursor()
19+
print("Connection and cursor set successfully")
20+
except sqlite3.Error as e:
21+
self.handle_error(e)
22+
23+
def export_table_to_json(self, table_name, file_name):
24+
try:
25+
self.cursor.execute(f"SELECT * FROM {table_name}")
26+
rows = self.cursor.fetchall()
27+
column_names = [desc[0] for desc in self.cursor.description]
28+
29+
# Create a list of dictionaries where each dictionary represents a row
30+
data = [dict(zip(column_names, row)) for row in rows]
31+
32+
with open(file_name, 'w') as file:
33+
json.dump(data, file, indent=4)
34+
35+
print(f"Data exported to {file_name} successfully.")
36+
except sqlite3.Error as e:
37+
self.handle_error(e)
38+
39+
def create_table(self, table_name, columns, primary_key=None):
40+
try:
41+
columns_def = ', '.join([f"{col_name} {col_type}" for col_name, col_type in columns])
42+
if primary_key:
43+
columns_def += f", PRIMARY KEY ({primary_key})"
44+
create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_def})"
45+
self.cursor.execute(create_table_sql)
46+
self.conn.commit()
47+
except sqlite3.Error as e:
48+
self.handle_error(e)
49+
50+
51+
def delete_table(self , table_name):
52+
query = f"DROP TABLE IF EXISTS {table_name}"
53+
self.execute_query(query)
54+
55+
56+
def insert_data(self, data_column: str, data, table_name):
57+
try:
58+
self.cursor.execute(f"INSERT INTO {table_name} ({data_column}) VALUES (?)", (data,))
59+
except sqlite3.Error as e:
60+
self.handle_error(e)
61+
62+
def insert_all_data(self, data_list, table_name):
63+
try:
64+
if not data_list:
65+
return
66+
67+
columns = data_list[0].keys()
68+
placeholders = ', '.join(['?'] * len(columns))
69+
query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"
70+
data_values = [tuple(row.values()) for row in data_list]
71+
72+
self.cursor.executemany(query, data_values)
73+
self.conn.commit()
74+
except sqlite3.Error as e:
75+
self.handle_error(e)
76+
77+
def execute_query(self, query, params=None):
78+
print("------")
79+
print("------")
80+
print("------")
81+
print("1")
82+
83+
if self.cursor is None:
84+
raise Exception("Database connection is not established")
85+
print(f"Executing query: {query}") # Debug statement
86+
print("2")
87+
self.cursor.execute(query, params or [])
88+
print("3")
89+
column_names = [desc[0] for desc in self.cursor.description] if self.cursor.description else []
90+
print("4")
91+
result = self.cursor.fetchall()
92+
print("5")
93+
print(f"Query executed: {query}") # Debug statement
94+
print("6")
95+
return column_names, result
96+
97+
98+
def get_columns_of_table(self, table_name):
99+
try:
100+
self.cursor.execute(f"PRAGMA table_info({table_name})")
101+
columns = self.cursor.fetchall()
102+
column_names = [col[1] for col in columns]
103+
104+
return column_names
105+
except sqlite3.Error as e:
106+
self.handle_error(e)
107+
return []
108+
109+
110+
def update_data(self, table_name, data_list, unique_column):
111+
print("---------")
112+
print("---------")
113+
print("---------")
114+
115+
i = 0
116+
try:
117+
if not data_list:
118+
return
119+
120+
for row in data_list:
121+
columns = row.keys()
122+
print(columns)
123+
print(unique_column[i])
124+
set_clause = ', '.join([f"{col} = ?" if row[col] is not None else f"{col} = NULL" for col in columns if col != unique_column[i]])
125+
print(set_clause)
126+
query = f"UPDATE {table_name} SET {set_clause} WHERE {unique_column[i]} = ?"
127+
print(query)
128+
values = [row[col] for col in columns if col != unique_column[i]]
129+
print(values)
130+
unique_column_value = row.get(unique_column[i], None)
131+
print(unique_column_value)
132+
values.append(unique_column_value)
133+
print(values)
134+
self.cursor.execute(query, values)
135+
136+
self.conn.commit()
137+
except sqlite3.Error as e:
138+
self.handle_error(e)
139+
140+
141+
142+
143+
def delete_data(self, table, delete_conditions):
144+
try:
145+
if not delete_conditions:
146+
raise ValueError("Please enter at least one condition to delete")
147+
148+
where_clause = ' AND '.join([f'{column} = ?' for column in delete_conditions.keys()])
149+
query = f'DELETE FROM {table} WHERE {where_clause}'
150+
values = list(delete_conditions.values())
151+
152+
self.execute_query(query, values)
153+
self.conn.commit()
154+
except sqlite3.Error as e:
155+
self.handle_error(e)
156+
157+
158+
def get_tables(self):
159+
try:
160+
print("2")
161+
self.cursor.execute("SELECT name FROM sqlite_schema WHERE type='table' ORDER BY name;")
162+
tables = self.cursor.fetchall()
163+
164+
return [table[0] for table in tables]
165+
except sqlite3.Error as e:
166+
self.handle_error(e)
167+
return [] # Return an empty list if there's an error
168+
169+
def handle_error(self, error):
170+
print(f"An error occurred: {error}")
171+
172+
def close_connection(self):
173+
if self.conn:
174+
self.conn.close()
175+
176+
def export_table_to_text(self, table_name, file_name):
177+
try:
178+
self.cursor.execute(f"SELECT * FROM {table_name}")
179+
rows = self.cursor.fetchall()
180+
column_names = [desc[0] for desc in self.cursor.description]
181+
182+
with open(file_name, 'w', newline='') as file:
183+
writer = csv.writer(file)
184+
writer.writerow(column_names)
185+
writer.writerows(rows)
186+
187+
print(f"Data exported to {file_name} successfully.")
188+
except sqlite3.Error as e:
189+
self.handle_error(e)
190+
191+
def import_table_from_text(self, table_name, file_name):
192+
try:
193+
with open(file_name, 'r') as file:
194+
reader = csv.reader(file)
195+
columns = next(reader)
196+
data = [tuple(row) for row in reader]
197+
198+
placeholders = ', '.join(['?'] * len(columns))
199+
query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"
200+
201+
self.cursor.executemany(query, data)
202+
self.conn.commit()
203+
204+
print(f"Data imported from {file_name} successfully.")
205+
except sqlite3.Error as e:
206+
self.handle_error(e)
207+
208+
def export_table_to_csv(self, table_name, file_name):
209+
try:
210+
self.cursor.execute(f"SELECT * FROM {table_name}")
211+
rows = self.cursor.fetchall()
212+
column_names = [desc[0] for desc in self.cursor.description]
213+
214+
with open(file_name, 'w', newline='') as file:
215+
writer = csv.writer(file)
216+
writer.writerow(column_names)
217+
writer.writerows(rows)
218+
219+
print(f"Data exported to {file_name} successfully.")
220+
except sqlite3.Error as e:
221+
self.handle_error(e)
222+
223+
def import_table_from_csv(self, table_name, file_name):
224+
try:
225+
with open(file_name, 'r') as file:
226+
reader = csv.reader(file)
227+
columns = next(reader)
228+
data = [tuple(row) for row in reader]
229+
230+
placeholders = ', '.join(['?'] * len(columns))
231+
query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"
232+
233+
self.cursor.executemany(query, data)
234+
self.conn.commit()
235+
236+
print(f"Data imported from {file_name} successfully.")
237+
except sqlite3.Error as e:
238+
self.handle_error(e)
239+
240+
241+
242+
243+
if __name__ == "__main__":
244+
deneme_db2 = Database()
245+
deneme_db2.create_database("deneme2.db")
246+
247+
columns = [("User_id", "INTEGER"), ("User_name", "TEXT"), ("User_age", "INTEGER")]
248+
deneme_db2.create_table("Users1", columns, "User_id")
249+
250+
data_list = [
251+
{"User_id": 3, "User_name": "Metehan Efe", "User_age": 23},
252+
{"User_id": 4, "User_name": "Barış Bağçeci", "User_age": 21},
253+
{"User_id": 5, "User_name": "Kubilay Çakmak", "User_age": 25}
254+
]
255+
deneme_db2.insert_all_data(data_list, "Users1")
256+
257+
query = "SELECT * FROM Users1"
258+
##print(deneme_db2.execute_query(query))
259+
260+
# Exporting data to CSV
261+
deneme_db2.export_table_to_csv("Users1", "users1_export.csv")
262+
263+
# Importing data from CSV
264+
deneme_db2.create_table("Users2", columns, "User_id")
265+
deneme_db2.import_table_from_csv("Users2", "users1_export.csv")
266+
query = "SELECT * FROM Users2"
267+
deneme_db2.execute_query(query)
268+

0 commit comments

Comments
 (0)