领先的免费Web技术教程,涵盖HTML到ASP.NET

网站首页 > 知识剖析 正文

python散装笔记——161: 数据库访问

nixiaole 2025-04-27 15:31:08 知识剖析 3 ℃

1: SQLite

SQLite是一种轻量级的、基于磁盘的数据库。由于它不需要单独的数据库服务器,因此通常用于原型设计或经常由单个用户或在给定时间内由一个用户使用的较小应用程序。

import sqlite3

conn = sqlite3.connect("users.db")
c = conn.cursor()

c.execute("CREATE TABLE user (name text, age integer)")

c.execute("INSERT INTO user VALUES ('User A', 42)")
c.execute("INSERT INTO user VALUES ('User B', 43)")

conn.commit()

c.execute("SELECT * FROM user")
print(c.fetchall())

conn.close()

上述代码连接到存储在名为users.db的文件中的数据库,如果该文件尚不存在,则会首先创建该文件。你可以通过SQL语句与数据库进行交互。

此示例的结果应为:

[(u'User A', 42), (u'User B', 43)]

SQLite语法:深入分析

开始使用

  1. 使用以下命令导入sqlite模块:
>>> import sqlite3
  1. 要使用该模块,你必须首先创建一个Connection对象,该对象代表数据库。这里的数据将存储在example.db文件中:
>>> conn = sqlite3.connect('users.db')

或者,你也可以提供特殊名称:memory:来创建一个临时的内存数据库,如下所示:

>>> conn = sqlite3.connect(':memory:')
  1. 一旦有了Connection,你就可以创建一个Cursor对象,并调用其execute()方法来执行SQL命令:
c = conn.cursor()

# Create table
c.execute('''CREATE TABLE stocks
(date text, trans text, symbol text, qty real, price real)''')

# Insert a row of data
c.execute("INSERT INTO stocks VALUES ('2006-01-05','BUY','RHAT',100,35.14)")

# Save (commit) the changes
conn.commit()

# We can also close the connection if we are done with it.
# Just be sure any changes have been committed or they will be lost.
conn.close()

Connection的重要属性和函数

  1. isolation_level

这是一个用于获取或设置当前隔离级别的属性。可以是None(自动提交模式),或者是DEFERREDIMMEDIATEEXCLUSIVE之一。

  1. cursor

cursor对象用于执行SQL命令和查询。

  1. commit()

提交当前事务。

  1. rollback()

回滚自上次调用commit()以来所做的任何更改。

  1. close()

关闭数据库连接。它不会自动调用commit()。如果在未调用commit()的情况下(假设你不在自动提交模式下)关闭了连接,那么所有更改都将丢失。

  1. total_changes

一个记录自数据库打开以来修改、删除或插入的总行数的属性。

  1. executeexecutemanyexecutescript

这些函数的执行方式与cursor对象的相同。这其实是一个快捷方式,因为通过Connection对象调用这些函数会导致创建一个中间的cursor对象,并调用该cursor对象的相应方法。

  1. row_factory

你可以将此属性更改为一个可调用对象,该对象接受cursor和原始行作为元组,并将返回实际的结果行。

def dict_factory(cursor, row):
  d = {}
  for i, col in enumerate(cursor.description):
    d[col[0]] = row[i]
  return d

conn = sqlite3.connect(":memory:")
conn.row_factory = dict_factory

Cursor的重要函数

  1. execute(sql[, parameters])

执行单个SQL语句。SQL语句可以参数化(即使用占位符而不是SQL字面量)。sqlite3模块支持两种占位符:问号?(“qmark风格”)和命名占位符:name(“命名风格”)。

import sqlite3
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("create table people (name, age)")

who = "Sophia"
age = 37
# This IS the qmark STYLE:
cur.execute("insert into people values (?, ?)", (who, age))

# AND this IS the named STYLE:
cur.execute("select * from people where name=:who and age=:age", {"who": who, "age": age}) 
# the KEYS correspond TO the placeholders IN SQL

print(cur.fetchone())

注意:不要使用%s将字符串插入SQL命令中,因为这会使你的程序容易受到SQL注入攻击(参见SQL注入)。

  1. executemany(sql, seq_of_parameters)

对序列sql中的所有参数序列或映射执行SQL命令。

sqlite3模块还允许使用返回参数的迭代器而不是序列。

L = [(1, 'abcd', 'dfj', 300), # A list OF tuples TO be inserted INTO the DATABASE
     (2, 'cfgd', 'dyfj', 400),
     (3, 'sdd', 'dfjh', 300.50)]

conn = sqlite3.connect("test1.db")
conn.execute("create table if not exists book (id int, name text, author text, price real)")
conn.executemany("insert into book values (?, ?, ?, ?)", L)

for row in conn.execute("select * from book"):
  print(ROW)

你也可以将迭代器对象作为executemany的参数传递,该函数将迭代迭代器返回的每个值元组。迭代器必须返回一个值元组。

import sqlite3

class IterChars:
  def __init__(self):
    self.count = ord('a')

  def __iter__(self):
    return self

  def __next__(self): # (USE NEXT(SELF) FOR Python 2)
    if self.count > ord('z'):
      raise StopIteration
    self.count += 1
    return (chr(SELF.count - 1),)

conn = sqlite3.connect("abc.db")
cur = conn.cursor()
cur.execute("create table characters(c)")

theIter = IterChars()
cur.executemany("insert into characters(c) values (?)", theIter)

rows = cur.execute("select c from characters")
for row in rows:
  print(row[0]),
  1. executescript(sql_script)

这是一个非标准的便捷方法,用于一次性执行多个SQL语句。它首先发出一个COMMIT语句,然后执行作为参数传递的SQL脚本。

sql_script可以是strbytes的实例。

import sqlite3
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript(
  """
  create table person(firstname, lastname, age);
  create table book(title, author, published);
  
  insert into book(title, author, published) 
  values (
  'Dirk Gently''s Holistic Detective Agency',
  'Douglas Adams',
  1987
  );
  """)

以下函数与SQL中的SELECT语句一起使用。执行SELECT语句后,你可以将cursor视为迭代器,调用cursorfetchone()方法来检索单个匹配行,或者调用fetchall()来获取匹配行的列表。

迭代器形式的示例:

import sqlite3
stocks = [('2006-01-05', 'BUY', 'RHAT', 100, 35.14),
          ('2006-03-28', 'BUY', 'IBM', 1000, 45.0),
          ('2006-04-06', 'SELL', 'IBM', 500, 53.0),
          ('2006-04-05', 'BUY', 'MSFT', 1000, 72.0)]
conn = sqlite3.connect(":memory:")
conn.execute("create table stocks (date text, buysell text, symb text, amount int, price real)")
conn.executemany("insert into stocks values (?, ?, ?, ?, ?)", stocks)
cur = conn.cursor()

for row in cur.execute('SELECT * FROM stocks ORDER BY price'):
  print(row)

# Output:
# ('2006-01-05', 'BUY', 'RHAT', 100, 35.14)
# ('2006-03-28', 'BUY', 'IBM', 1000, 45.0)
# ('2006-04-06', 'SELL', 'IBM', 500, 53.0)
# ('2006-04-05', 'BUY', 'MSFT', 1000, 72.0)
  1. fetchone()

检索查询结果集的下一行,返回一个单一序列,如果没有更多数据可用则返回None

cur.execute('SELECT * FROM stocks ORDER BY price')
i = cur.fetchone()
while(i):
  print(i)
  i = cur.fetchone()

# Output:
# ('2006-01-05', 'BUY', 'RHAT', 100, 35.14)
# ('2006-03-28', 'BUY', 'IBM', 1000, 45.0)
# ('2006-04-06', 'SELL', 'IBM', 500, 53.0)
# ('2006-04-05', 'BUY', 'MSFT', 1000, 72.0)
  1. fetchmany(size=cursor.arraysize)

检索查询结果的下一批行(由size指定),返回一个列表。如果省略sizefetchmany返回单行。如果没有更多行可用,则返回空列表。

cur.execute('SELECT * FROM stocks ORDER BY price')
print(cur.fetchmany(2))

# Output:
# [('2006-01-05', 'BUY', 'RHAT', 100, 35.14), ('2006-03-28', 'BUY', 'IBM', 1000, 45.0)]
  1. fetchall()

检索查询结果的所有(剩余)行,返回一个列表。

cur.execute('SELECT * FROM stocks ORDER BY price')
print(cur.fetchall())

# Output:
# [('2006-01-05', 'BUY', 'RHAT', 100, 35.14), ('2006-03-28', 'BUY', 'IBM', 1000, 45.0), ('2006-04-06', 'SELL', 'IBM', 500, 53.0), ('2006-04-05', 'BUY', 'MSFT', 1000, 72.0)]

SQLite和Python数据类型

SQLite原生支持以下数据类型:NULLINTEGERREALTEXTBLOB

当从SQL移动到Python或反之亦然时,数据类型之间的转换如下:

None <-> NULL
int <-> INTEGER/INT
float <-> REAL/FLOAT
str <-> TEXT/VARCHAR(n)
bytes <-> BLOB

2: 使用MySQLdb访问MySQL数据库

首先,你需要使用connect方法创建一个与数据库的连接。之后,你需要一个游标,它将与该连接一起操作。

使用游标的execute方法与数据库交互,并不时地使用连接对象的commit方法提交更改。

完成所有操作后,不要忘记关闭游标和连接。

以下是一个包含你需要的一切的Dbconnect类。

import MySQLdb

class Dbconnect(object):
  
  def __init__(self):
    self.dbconection = MySQLdb.connect(host='host_example',
                                       port=int('port_example'),
                                       user='user_example',
                                       passwd='pass_example',
                                       db='schema_example')

    self.dbcursor = self.dbconection.cursor()

  def commit_db(self):
     elf.dbconection.commit()

  def close_db(self):
    self.dbcursor.close()
    self.dbconection.close()

与数据库交互非常简单。创建对象后,只需使用execute方法即可。

db = Dbconnect()
db.dbcursor.execute('SELECT * FROM %s' % 'table_example')

如果你想调用存储过程,请使用以下语法。注意,参数列表是可选的。

db = Dbconnect()
db.callproc('stored_procedure_name', [parameters] )

查询完成后,你可以通过多种方式访问结果。游标对象是一个生成器,可以检索所有结果或进行循环。

results = db.dbcursor.fetchall()
for individual_row in results:
  first_field = individual_row[0]

如果你想直接使用生成器进行循环:

for individual_row in db.dbcursor:
  first_field = individual_row[0]

如果你想将更改提交到数据库:

db.commit_db()

如果你想关闭游标和连接:

db.close_db()

3: 连接

创建连接

根据PEP 249,应使用connect()构造函数建立与数据库的连接,该函数返回一个Connection对象。此构造函数的参数取决于数据库。请参阅数据库特定的主题以了解相关的参数。

import MyDBAPI

con = MyDBAPI.connect(*database_dependent_args)

此连接对象具有四个方法:

1: close

con.close()

立即关闭连接。注意,如果调用了Connection.__del__方法,则连接将自动关闭。任何挂起的事务都将隐式回滚。

2: commit

con.commit()

将任何挂起的事务提交到数据库。

3: rollback

con.rollback()

回滚到任何挂起事务的开始。换句话说:这会取消对数据库的所有非提交事务。

4: cursor

cur = con.cursor()

返回一个Cursor对象。这用于在数据库上执行事务。

4: 使用psycopg2访问PostgreSQL数据库

psycopg2是最受欢迎的PostgreSQL数据库适配器,它既轻量级又高效。它是当前的PostgreSQL适配器实现。

它的主要特点是完全实现了Python DB API 2.0规范,并且线程安全(多个线程可以共享同一个连接)。

建立与数据库的连接并创建表

import psycopg2

# 建立与数据库的连接。
# 将参数值替换为数据库凭证。
conn = psycopg2.connect(database="testpython",
                        user="postgres",
                        host="localhost",
                        password="abc123",
                        port="5432")

# 创建一个游标。游标允许你执行数据库查询。
cur = conn.cursor()

# 创建一个表。初始化表名、列名和数据类型。
cur.execute(
  """CREATE TABLE FRUITS (
  id INT ,
  fruit_name TEXT,
  color TEXT,
  price REAL
  )""")

conn.commit()
conn.close()

向表中插入数据:

# 按照上述方式创建表后,向其中插入值。
cur.execute(
  """INSERT INTO FRUITS (id, fruit_name, color, price) 
  VALUES (1, 'Apples', 'green', 1.00)
  """)
cur.execute(
  """INSERT INTO FRUITS (id, fruit_name, color, price) 
  VALUES (1, 'Bananas', 'yellow', 0.80)
  """)

检索表数据:

# 设置查询并执行
cur.execute("""SELECT id, fruit_name, color, price FROM fruits""")

# 检索数据
rows = cur.fetchall()

# 对数据进行操作
for row in rows:
  print("ID = {} ".format(row[0]))
  print("FRUIT NAME = {}".format(row[1]))
  print("COLOR = {}".format(row[2]))
  print("PRICE = {}".format(row[3]))

上述代码的输出为:

ID = 1
NAME = Apples
COLOR = green
PRICE = 1.0

ID = 2
NAME = Bananas
COLOR = yellow
PRICE = 0.8

就这样,你已经掌握了psycopg2所需了解的一半内容!:)

5: Oracle数据库

先决条件:

  • cx_Oracle包 - 点击此处查看所有版本
  • Oracle即时客户端 - Windows x64,Linux x64

设置:

  • 安装cx_Oracle包:
sudo rpm -i <YOUR_PACKAGE_FILENAME>
  • 解压Oracle即时客户端并设置环境变量:
ORACLE_HOME=<PATH_TO_INSTANTCLIENT>
PATH=$ORACLE_HOME:$PATH
LD_LIBRARY_PATH=<PATH_TO_INSTANTCLIENT>:$LD_LIBRARY_PATH

创建连接:

import cx_Oracle
class OraExec(object):
_db_connection = None
_db_cur = None
def __init__(self):
self._db_connection =
cx_Oracle.connect('<USERNAME>/<PASSWORD>@<HOSTNAME>:<PORT>/<SERVICE_NAME>')
self._db_cur = self._db_connection.cursor()

获取数据库版本:

ver = con.version.split(".")
print ver

示例:

# Output: ['12', '1', '0', '2', '0']

执行查询:SELECT

_db_cur.execute("select * from employees order by emp_id")
for result in _db_cur:
  print result

输出将以Python元组的形式呈现:

(10, 'SYSADMIN', 'IT-INFRA', 7)
(23, 'HR ASSOCIATE', 'HUMAN RESOURCES', 6)

执行查询:INSERT

_db_cur.execute("insert into employees(emp_id, title, dept, grade) values (31, 'MTS', 'ENGINEERING', 7)
_db_connection.commit()

在Oracle数据库中执行插入/更新/删除操作时,更改仅在你的会话中可用,直到发出提交命令。当更新的数据提交到数据库后,它才对其他用户和会话可用。

使用绑定变量执行查询:INSERT

参考

绑定变量使你能够使用新值重新执行语句,而无需重新解析语句的开销。绑定变量提高了代码的可重用性,并可以降低SQL注入攻击的风险。

rows = [ (1, "First" ),
        (2, "Second" ),
        (3, "Third" ) ]
_db_cur.bindarraysize = 3
_db_cur.setinputsizes(int, 10)
_db_cur.executemany("insert into mytab(id, data) values (:1, :2)", rows)
_db_connection.commit()

关闭连接:

_db_connection.close()

close()方法关闭连接。任何未明确关闭的连接将在脚本结束时自动释放。

6: 使用sqlalchemy

要使用sqlalchemy访问数据库:

from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL

url = URL(drivername='mysql',
          username='user',
          password='passwd',
          host='host',database='db')

engine = create_engine(url) # sqlalchemy engine

现在可以使用这个引擎了,例如与pandas一起使用,直接从mysql获取数据框:

import pandas as pd

con = engine.connect()
dataframe = pd.read_sql(sql=query, con=con)



最近发表
标签列表