领先的免费Web技术教程,涵盖HTML到ASP.NET

网站首页 > 知识剖析 正文

python执行.sql语法和文件(python执行py文件)

nixiaole 2025-04-01 21:05:38 知识剖析 22 ℃

前面我们执行了python文件对mysql数据库的单条语法操作,那么如果我们要对很多句的.sql文件如何操作呢?

用for循环来做,除去空格!但是这句没有清除sql的备注语言

try:

#db = pymysql.connect(host='localhost',user='root', password='12345678', port=3306, db='stocks', charset='utf8')

#c = db.cursor()

with open('000001_sz.sql',encoding='utf-8',mode='r') as f:

# 读取整个sql文件,以分号切割。[:-1]删除最后一个元素,也就是空字符串

sql_list = f.read().split(';')[:-1]

sql_listsure = ''

#print(sql_list)

#raise SystemExit

for x in sql_list:

# 判断包含空行的

if '\n' in x:

# 替换空行为1个空格

x = x.replace('\n', ' ')


# 判断多个空格时

if ' ' in x:

# 替换为空

x = x.replace(' ', '')


# sql语句添加分号结尾

sql_item = x+';'

c.execute(sql_item)

print("执行成功sql: %s"%sql_item)

except Exception as e:

print(e)

print('执行失败sql: %s'%sql_item)

finally:

那我们再添加上删除sql的备注的正则表达式

import re

def rehint_limit(sql_values):

write_tag = 0 # 用来控制是否写入新变量

write_limit = '' # 记录/或者*

sql_result = '' # 记录去除注释后的结果

for case in sql_values:

if (write_limit + case) == '/*':

sql_result = sql_result.strip('/') # 去除最后一个/

write_tag += 1

if write_tag == 0:

sql_result += case

if (write_limit + case) == '*/':

write_tag -= 1

write_limit = ''

if '/' == case or '*' == case:

write_limit = case

return sql_result

def rehint_line(sql_values):

rev = re.compile('--.*\\n?')

sql_values = re.sub(rev,'\n',sql_values)

return sql_values

if __name__ == '__main__':

sql = '''

--这是个sql

select '1' v1,'2' v2 from dual union all;

select '2' v1,'3' v2 from dual union all;

/* 这段select 1 v1,2 v2 from dual union all

select 2 v1,3 v2 from dual 写错了

*/

select /* 这个是select语句 */ 'a' v1, --v1列

'b' v2 --v2列

from dual; --dual是个伪表

'''

print(sql)

# 先去除段注释

sql = rehint_limit(sql)

print('rehint_limit: ' + sql)

# 去除行注释

sql = rehint_line(sql)

print('rehint_line: ' + sql)

好像很麻烦的样子!

那咱们封装一下,直接做一个封装的文件

例如import pymysql

from pathlib import Path

class ConnectMsql:

def __init__(self, host='localhost', port=3306, user='root',

password='12345678', database="stocks", filename: str = "test_sh.sql"):

"""

:param host: 域名

:param port: 端口

:param user: 用户名

:param password: 密码

:param database: 数据库名

:param filename: 文件名称

"""

self._host: str = host

self._port: int = port

self._user: str = user

self._password: str = password

self._database: str = database

self._file_path = Path(__file__).parent.joinpath(filename)

def _show_databases_and_create(self):

"""

查询数据库是否存在,不存在则进行新建操作

:return:

"""

connection = pymysql.connect(host=self._host, port=self._port, user=self._user, password=self._password,

cursorclass=pymysql.cursors.DictCursor)

with connection:

with connection.cursor() as cursor:

cursor.execute('show databases;')

result = cursor.fetchall()

results = self._database not in tuple(x["Database"] for x in result)

if results:

with connection.cursor() as cursor:

cursor.execute(f'create database {self._database};')

with connection.cursor() as cursor:

cursor.execute('show databases;')

result = cursor.fetchall()

results = self._database in tuple(x["Database"] for x in result)

return results if results else result

else:

return True

def _export_databases_data(self):

"""

读取.sql文件,解析处理后,执行sql语句

:return:

"""

if self._show_databases_and_create() is True:

connection = pymysql.connect(host=self._host, port=self._port, user=self._user, password=self._password,

database=self._database, charset='utf8')

# 读取sql文件,并提取出sql语句

results, results_list = "", []

with open(self._file_path, mode="r+", encoding="utf-8") as r:

for sql in r.readlines():

# 去除数据中的“\n”和“\r”字符

sql = sql.replace("\n", "").replace("\r", "")

# 获取不是“--”开头且不是“--”结束的数据

if not sql.startswith("--") and not sql.endswith("--"):

# 获取不是“--”的数据

if not sql.startswith("--"):

results = results + sql

# 根据“;”分割数据,处理后插入列表中

for i in results.split(";"):

if i.startswith("/*"):

results_list.append(i.split("*/")[1] + ";")

else:

results_list.append(i + ";")

# 执行sql语句

with connection:

with connection.cursor() as cursor:

# 循环获取sql语句

for x in results_list[:-1]:

# 执行sql语句

cursor.execute(x)

# 提交事务

connection.commit()

else:

return "sql全部语句执行成功 !"

@property

def sql_run(self):

"""

执行方法

:return:

"""

return self._export_databases_data()

if __name__ == '__main__':

res = ConnectMsql().sql_run

print(res)

这样就可以直接用python对*.sql进行操作了

Tags:

最近发表
标签列表