python 操作DuckDB的类
import duckdb
class DuckDBManager:
def __init__(self, db_path):
"""
初始化 DuckDBManager 类。
:param db_path: 数据库文件路径或内存数据库的标识符。
"""
self.db_path = db_path
self.conn = None
def connect(self):
"""
建立数据库连接。
"""
if self.conn is None:
self.conn = duckdb.connect(self.db_path, read_only=False)
def create_table(self, table_name, schema):
"""
根据给定的模式创建一个新的表。
:param table_name: 要创建的表的名称。
:param schema: 表的列和数据类型的字典。
"""
self.connect() # 确保连接已经建立
columns = ', '.join([f"{k} {v}" for k, v in schema.items()])
query = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns})"
self.conn.execute(query)
def insert(self, table_name, data):
"""
向表中插入一条记录。
:param table_name: 要插入数据的表的名称。
:param data: 一个字典,包含要插入的数据。
"""
self.connect() # 确保连接已经建立
placeholders = ', '.join(['?' for _ in data])
columns = ', '.join(data.keys())
query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
self.conn.execute(query, list(data.values()))
def batch_insert(self, table_name, data_list):
"""
向表中批量插入多条记录。
:param table_name: 要插入数据的表的名称。
:param data_list: 包含多个字典的列表,每个字典代表一条记录。
"""
self.connect() # 确保连接已经建立
columns = ', '.join(data_list[0].keys())
placeholders = ', '.join(['?' for _ in data_list[0]])
query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
with self.conn.cursor() as cursor:
for data in data_list:
cursor.execute(query, list(data.values()))
def update(self, table_name, data, condition):
"""
根据条件更新表中的记录。
:param table_name: 要更新的表的名称。
:param data: 一个字典,包含要更新的数据。
:param condition: 更新条件的字符串。
"""
self.connect() # 确保连接已经建立
set_clause = ', '.join([f"{k} = ?" for k in data])
query = f"UPDATE {table_name} SET {set_clause} WHERE {condition}"
with self.conn.cursor() as cursor:
cursor.execute(query, list(data.values()))
def delete(self, table_name, condition):
"""
根据条件从表中删除记录。
:param table_name: 要删除记录的表的名称。
:param condition: 删除条件的字符串。
"""
self.connect() # 确保连接已经建立
query = f"DELETE FROM {table_name} WHERE {condition}"
self.conn.execute(query)
def select(self, table_name, columns='*'):
"""
从表中选择记录。
:param table_name: 要查询的表的名称。
:param columns: 要查询的列,用字符串表示,如 'id, name'。
:return: 查询结果。
"""
self.connect() # 确保连接已经建立
query = f"SELECT {columns} FROM {table_name}"
with self.conn.cursor() as cursor:
cursor.execute(query)
return cursor.fetchall()
def close(self):
"""
关闭数据库连接。
"""
if self.conn:
self.conn.close()
self.conn = None
# 使用示例
if __name__ == "__main__":
db_manager = DuckDBManager(':memory:') # 或者使用文件路径,例如 'my_database.duckdb'
try:
# 创建表
schema = {'id': 'INTEGER', 'name': 'VARCHAR', 'age': 'INTEGER'}
db_manager.create_table('people', schema)
# 插入数据
person_data = {'id': 1, 'name': 'Alice', 'age': 30}
db_manager.insert('people', person_data)
# 批量插入数据
batch_data = [
{'id': 2, 'name': 'Bob', 'age': 25},
{'id': 3, 'name': 'Charlie', 'age': 35}
]
db_manager.batch_insert('people', batch_data)
# 更新数据
update_data = {'name': 'Alicia'}
db_manager.update('people', update_data, "id = 1")
# 查询数据
results = db_manager.select('people')
print('results', results)
for row in results:
print(row)
# 删除数据
db_manager.delete('people', "id = 2")
except duckdb.Error as e:
print(f"An error occurred: {e}")
finally:
db_manager.close()
License:
CC BY 4.0