python utils常用模块 - clickhouse数据库的类
测试代码
常规操作类文件
manager_clickhouse.py
from clickhouse_driver import Client
from .config_db import select_config as Config
from .config_db import table_lis, sqlcmd_lis
from functools import lru_cache
from .func_decorators import retry
class ClickHouseManager:
    """Convenience wrapper around a ``clickhouse_driver.Client``.

    The manager resolves a configuration object from an environment name,
    builds a client from it, and offers helpers for creating databases and
    tables, running raw queries and bulk-inserting rows given as dicts.

    It can be used as a context manager; leaving the ``with`` block calls
    :meth:`disconnect`.
    """

    # NOTE(review): ``retry`` comes from ``.func_decorators``; presumably it
    # re-invokes ``__init__`` up to ``tries`` times on failure — confirm there.
    @retry(exceptions=(Exception,), tries=5, initial_delay=3)
    def __init__(self, env_db='testing'):
        """Resolve the configuration for *env_db* and create the client.

        Args:
            env_db: Environment name passed to the config selector.

        Raises:
            Exception: Whatever the config selector or client construction
                raises; the error is printed and then re-raised.
        """
        # Set defaults first so disconnect() is safe even if setup fails.
        self.client = None
        self._schema_ready = False
        try:
            self.config = Config(env_db)
            self.client = self.create_connection()
        except Exception as e:
            print(f"An error occurred during initialization: {e}")
            raise

    def create_connection(self):
        """Build a ``Client`` from the attributes of ``self.config``."""
        return Client(
            host=self.config.host,
            port=self.config.port,
            user=self.config.user,
            password=self.config.password,
            database=self.config.db,
        )

    def create_database(self, db_name):
        """Run ``CREATE DATABASE IF NOT EXISTS`` for *db_name*.

        *db_name* is interpolated into the statement as-is, so callers must
        pass a trusted identifier.

        Returns:
            The result of ``client.execute``.

        Raises:
            Exception: Re-raised after printing if the statement fails.
        """
        try:
            query = f"CREATE DATABASE IF NOT EXISTS {db_name}"
            return self.client.execute(query)
        except Exception as e:
            print(f"An error occurred while creating the database: {e}")
            raise

    def initialize_database(self):
        """Create every table from ``table_lis`` that does not exist yet.

        The work is done at most once per instance: after a successful run
        further calls return immediately. A per-instance flag is used instead
        of ``functools.lru_cache`` because caching a method keys on ``self``
        and keeps the instance alive in a class-level cache.

        Raises:
            Exception: Propagated from :meth:`query`; the instance is not
                marked as initialized in that case, so the call can be
                repeated.
        """
        if getattr(self, '_schema_ready', False):
            return None

        # One round trip for the table list instead of one per table.
        existing_tables = {row[0] for row in self.query('SHOW TABLES')}
        for table_name, create_table_query in zip(table_lis, sqlcmd_lis):
            if table_name in existing_tables:
                print(f'{table_name} 已经存在!')
            else:
                self.query(create_table_query)
                existing_tables.add(table_name)
                print(f"表 {table_name} 被创建。")

        self._schema_ready = True
        return None

    def query(self, query):
        """Execute an arbitrary statement and return the driver's result.

        Raises:
            Exception: Re-raised after printing if execution fails.
        """
        try:
            return self.client.execute(query)
        except Exception as e:
            print(f"An error occurred while executing the query: {e}")
            raise

    def insert_data(self, table_name, data: list):
        """Bulk-insert a list of row dicts into *table_name*.

        The column list is taken from the keys of the first row, and every
        row must provide those keys. ``None`` values are replaced by an empty
        string before sending.

        Args:
            table_name: Target table, interpolated into the statement as-is.
            data: List of dicts mapping column name to value.

        Returns:
            The result of ``client.execute``, or ``0`` when *data* is empty
            (nothing is sent to the server in that case).

        Raises:
            Exception: Re-raised after printing if building or executing the
                insert fails.
        """
        if not data:
            # Without a first row there is no column list to build.
            return 0
        try:
            columns = list(data[0])
            query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES"
            values = [
                tuple('' if row[col] is None else row[col] for col in columns)
                for row in data
            ]
            res = self.client.execute(query, values)
            print(f' └ 存储数据完毕 {len(data)}条 到{table_name}', end='\r')
            return res
        except Exception as e:
            print(f"An error occurred while inserting data: {e}")
            raise

    # Prefer using the manager in a ``with`` block so the client is released.
    def __enter__(self):
        """Return the manager itself as the context value."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Disconnect, report any in-flight exception and let it propagate."""
        self.disconnect()
        if exc_type:
            print(f"An error occurred: {exc_val}")
        # Returning None (falsy) means the exception is not suppressed.

    def disconnect(self):
        """Disconnect the client if there is one; errors are only printed."""
        client = getattr(self, 'client', None)
        if not client:
            return
        try:
            client.disconnect()
        except Exception as e:
            print(f"An error occurred while disconnecting: {e}")

    # Add further CRUD helpers (update, delete, select) here as needed.
# if __name__ == "__main__":
# clickhouse_manager = ClickHouseManager()
# clickhouse_manager.create_database("mydb199")
# r = clickhouse_manager.query("show databases;")
# print('r',r)
# input('稍等一下'*20)
数据库配置
config_db.py
import re
import time
class Config:
    """Base connection settings shared by every environment."""

    # Network endpoint of the server.
    host: str = "192.168.1.250"
    port: int = 19000

    # Credentials.
    user: str = "default"
    password: str = ""

    # Default database name; environment subclasses may override it.
    db: str = "test"
class ProductionConfig(Config):
    """Settings for the production environment."""

    db = "jp_qoo10"
    DEBUG = False
class DevelopmentConfig(Config):
    """Settings for the development environment."""

    db = "jp_qoo10"
    DEBUG = True
class TestingConfig(Config):
    """Settings for the testing environment."""

    db = "test"
    TESTING = True
# Pick the configuration object that matches the env_db name.
def select_config(env_db):
    """Return a fresh config instance for the named environment.

    Args:
        env_db: One of ``'production'``, ``'testing'`` or ``'development'``.

    Returns:
        A new instance of the matching configuration class.

    Raises:
        ValueError: If *env_db* is none of the known environment names.
    """
    # Reject unknown names up front, then dispatch with early returns.
    if env_db not in ('production', 'testing', 'development'):
        raise ValueError("Unknown configuration environment")
    if env_db == 'production':
        return ProductionConfig()
    if env_db == 'testing':
        return TestingConfig()
    return DevelopmentConfig()
# Presets: basic connection parameters for the known servers.
# NOTE(review): these credentials are committed in plain text; consider
# loading them from the environment or a secrets store instead.
# NOTE(review): nothing visible in this module reads these dicts, and the
# ports differ from Config.port — confirm they are still in use.
clickhousesrv_nt950xdc = dict(
    # host='100.73.47.104',  # tailscale
    # host='172.22.0.254',  # zerotier
    host='192.168.1.250',
    user='root',
    password='eS5@pjue9IcYPVTr',
    port=9306,
    db='jp_qoo10',
)

clickhousesrv_y700 = dict(
    host='100.102.68.125',  # tailscale
    # host='172.22.0.252',  # zerotier
    # host='192.168.1.252',  # LAN
    user='root',
    password='123456',
    port=3306,
    db='jp_qoo10',
)

clickhousesrv_jiwoo = dict(
    host='121.142.193.89',  # public (WAN) address
    # host='100.114.176.27',  # tailscale
    # host='192.168.0.250',  # internal address
    user='root',
    password='awsE0NY~Am-Ux61xTg!]',
    port=3366,
    db='jp_qoo10',
)
# Timestamp suffix (local time, digits only, e.g. "20240102030405").
# Formatting without separators directly avoids building a separated
# timestamp and then stripping the separators again with a regex.
timestr = time.strftime("%Y%m%d%H%M%S", time.localtime())

# Prefixes of the ClickHouse tables managed by this module.
prefixList = [
    'cate',
    'pl',
    'dp',
    'dpr',
]

# Table names are "<prefix>_qoo10".
# Alternative kept for reference: timestamped names, one set per run.
# table_lis = [prefix + '_' + timestr for prefix in prefixList]
table_lis = [f'{prefix}_qoo10' for prefix in prefixList]
# ClickHouse table definitions: column names, types and comments.
# DDL for the category table. The {table} placeholder is filled with
# table_lis[0] when this module is imported. MergeTree engine, ordered by
# (cate3, cate4); `timestamp` defaults to the current time in Asia/Seoul.
mktable_sql_cate = '''CREATE TABLE IF NOT EXISTS {table} (
cate1 String COMMENT '一级主类目',
cate1_code String COMMENT '一级类目编码',
url_cate1 String COMMENT '一级类目链接',
cate2 String COMMENT '二级类目',
cate2_code String COMMENT '二级类目编码',
url_cate2 String COMMENT '二级类目链接',
cate3 String COMMENT '三级类目',
cate3_code String COMMENT '三级类目编码',
url_cate3 String COMMENT '三级类目链接',
cate4 String COMMENT '四级类目',
cate4_code String COMMENT '四级类目编码',
url_cate4 String COMMENT '四级类目链接',
cate4_prdcnt Int32 COMMENT '四级类目下产品计数',
timestamp DateTime DEFAULT toDateTime(now(), 'Asia/Seoul') COMMENT '时间戳'
)
ENGINE = MergeTree()
ORDER BY (cate3, cate4)
'''.format(table=table_lis[0])
# DDL for the product-list table. The {table} placeholder is filled with
# table_lis[1] when this module is imported. MergeTree engine, ordered by
# (cate3, cate4, goods_title); `timestamp` defaults to the current time in
# Asia/Seoul.
mktable_sql_pl = '''CREATE TABLE IF NOT EXISTS {table} (
cate1 String COMMENT '一级主类目',
cate1_code String COMMENT '一级类目编码',
cate2 String COMMENT '二级类目',
cate2_code String COMMENT '二级类目编码',
cate3 String COMMENT '三级类目',
cate3_code String COMMENT '三级类目编码',
cate4 String COMMENT '四级分类',
cate4_code String COMMENT '四级分类编码',
cate4_prdcnt Int32 COMMENT '商品数量',
url_pl String COMMENT '产品列表url',
brand String COMMENT '品牌名',
brand_no String COMMENT '品牌序号',
brand_url String COMMENT '品牌链接',
goods_title String COMMENT '商品标题',
goods_code String COMMENT '商品编码',
url_dp String COMMENT '产品详情页链接',
goods_price Int32 COMMENT '商品价格',
goods_sellingprice Int32 COMMENT '销售价格',
ship_free String COMMENT '运费',
timestamp DateTime DEFAULT toDateTime(now(), 'Asia/Seoul') COMMENT '时间戳'
) ENGINE = MergeTree()
ORDER BY (cate3, cate4, goods_title)
'''.format(table=table_lis[1])
# DDL for the product-detail table. The {table} placeholder is filled with
# table_lis[2] when this module is imported. MergeTree engine, ordered by
# (cate3, cate4, brand, goods_title); `timestamp` defaults to the current
# time in Asia/Seoul.
mktable_sql_dp = '''CREATE TABLE IF NOT EXISTS {table} (
cate1 String COMMENT '一级主类目',
cate1_code String COMMENT '一级类目编码',
url_cate1 String COMMENT '一级类目链接',
cate2 String COMMENT '二级类目',
cate2_code String COMMENT '二级类目编码',
url_cate2 String COMMENT '二级类目链接',
cate3 String COMMENT '三级类目',
cate3_code String COMMENT '三级类目编码',
url_cate3 String COMMENT '三级类目链接',
cate4 String COMMENT '四级分类',
cate4_code String COMMENT '四级分类编码',
url_cate4 String COMMENT '四级类目链接',
cate5 String COMMENT '五级分类',
cate5_code String COMMENT '五级分类编码',
url_cate5 String COMMENT '五级类目链接',
cate6 String COMMENT '六级分类',
cate6_code String COMMENT '六级分类编码',
url_cate6 String COMMENT '六级类目链接',
goods_title String COMMENT '商品标题',
goods_thumbmain String COMMENT '商品主图',
goods_thumblis String COMMENT '商品缩略图清单',
goods_code String COMMENT '商品编码',
brand String COMMENT '品牌名',
brand_url String COMMENT '品牌链接',
brand_no String COMMENT '品牌id',
goods_price Int32 COMMENT '商品价格',
goods_sellingprice Int32 COMMENT '商品实际销售价格',
goods_departure String COMMENT '商品出发国家',
goods_delivery String COMMENT '快递公司名称',
goods_distributor String COMMENT '商品流通公司',
goods_jancode String COMMENT '商品识别码',
review_counts Int32 COMMENT '评论计数',
qna_counts Int32 COMMENT 'qna计数',
review_rating_score Float32 COMMENT '商品星标评级',
review_satisfied_items String COMMENT '商品满意程度',
goods_detailimglis String COMMENT '详情页图片链接',
sellershop_name String COMMENT '店铺名称',
sellershop_url String COMMENT '店铺链接',
sellershop_logo String COMMENT '店铺logo',
sellershop_prdcnt Int32 COMMENT '店铺商品数',
sellershop_followers Int32 COMMENT '店铺粉丝数',
seller_company String COMMENT '销售公司名称',
seller_staff String COMMENT '销售公司负责人姓名',
seller_tel String COMMENT '销售公司联系电话',
seller_mail String COMMENT '销售公司电子邮箱',
seller_address String COMMENT '销售公司地址',
seller_businesshours String COMMENT '销售公司工作时间',
url_dp String COMMENT '商品链接',
timestamp DateTime DEFAULT toDateTime(now(), 'Asia/Seoul') COMMENT '时间戳'
) ENGINE = MergeTree()
ORDER BY (cate3, cate4, brand, goods_title)
'''.format(table=table_lis[2])
# DDL for the product-review table. The {table} placeholder is filled with
# table_lis[3] when this module is imported. MergeTree engine, ordered by
# (goods_code, review_date); `timestamp` defaults to the current time in
# Asia/Seoul.
mktable_sql_dpr = '''CREATE TABLE IF NOT EXISTS {table} (
goods_code String COMMENT '商品编码',
review_userid String COMMENT '评论user_id',
review_score String COMMENT '评论星级评分',
review_date Date COMMENT '评论日期',
review_goods String COMMENT '评论商品',
review_eval1 String COMMENT '商品满意部分1',
review_eval2 String COMMENT '商品满意部分2',
review_eval3 String COMMENT '商品满意部分3',
review_txt String COMMENT '评论内容',
review_code String COMMENT '评论编码',
review_likecount Int32 COMMENT '评论喜欢计数',
url_dpr String COMMENT '评论链接',
timestamp DateTime DEFAULT toDateTime(now(), 'Asia/Seoul') COMMENT '时间戳'
) ENGINE = MergeTree()
ORDER BY (goods_code, review_date)
'''.format(table=table_lis[3])
# CREATE TABLE statements, in the same order as table_lis.
sqlcmd_lis = [
    mktable_sql_cate,
    mktable_sql_pl,
    mktable_sql_dp,
    mktable_sql_dpr,
]
数据库初始化
__init__.py
from .string_cleaner import string_to_float
from .func_decorators import timer, retry
from .request import r_get_selector
from .manager_clickhouse import ClickHouseManager
from .config_db import table_lis, sqlcmd_lis
# Shared manager instance, created when the package is imported. No env_db is
# passed, so ClickHouseManager's default environment ('testing') is used.
db = ClickHouseManager()
# Create any tables from table_lis that are missing; this also runs at
# import time.
db.initialize_database()
License:
CC BY 4.0