python操作Mysql数据库

目前工作中主要使用的还是mysql数据库,这里把常用的函数做下总结。

python3选择使用pymysql包进行数据库操作,使用pip3 install PyMySQL来进行安装

为了方便查看,这里建一个test数据库,people表进行举例。

1
2
3
4
5
6
7
CREATE TABLE `people` (   `name` char(20) DEFAULT NULL,   
`age` tinyint(2) DEFAULT NULL,
`weight` float DEFAULT NULL )
ENGINE=InnoDB DEFAULT CHARSET=latin1
#这里将表里插入两条语句
insert into people values("heng","30","170");
insert into people values("de","31","171");

连接数据库

1
2
3
4
5
6
7
8
9
10
import pymysql
def connect_local_db():
connect = pymysql.connect(host='localhost',
user='root',
password='root',
db='test',
port=3306,
charset='utf8')
cursor = connect.cursor()
return connect, cursor

这里可以把需要连接的数据库分别写成一个函数,将游标和db传出去,方便调用

查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 使用 execute()  方法执行 SQL 查询 
cursor.execute("select * from people")
# 使用 fetchone() 方法获取单条数据.
data = cursor.fetchone()
#使用 fetchall() 方法获取全部数据.
data = cursor.fetchall()

def test_select():
sql = "select * from people"
cursor.execute(sql)
data = cursor.fetchone()
print("fetchone data is",data)

cursor.execute(sql)
data = cursor.fetchall()
print("fetchall data is",data)
#需要注意的是这里抽取完的data是以元组形式存在的
1
2
3
fetchone data is ('heng', 30, 170.0)
fetchall data is (('heng', 30, 170.0), ('de', 31, 171.0))
可以看到fetchone取出的是一个元组,而fetchall取出的是一个元组的列表

插入数据

如果数据量少可以逐条插入,如果数据量多则需要批量插入(先将待插入数据写成元组,然后使用executemany函数进行批量插入)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def test_select():
sql = "select * from people"
cursor.execute(sql)
data = cursor.fetchall()
print("fetchall data is",data)

#逐条插入
def test_insert1():
sql = 'insert into people values("heng2","32","172")'
cursor.execute(sql)
connect.commit()

test_insert1()
test_select()
## out:fetchall data is (('heng', 30, 170.0), ('de', 31, 171.0), ('heng2', 32, 172.0))

#批量插入
def test_insert2():
sql = 'insert into people (name, age, weight) values(%s,%s,%s)'
insert_data = (("heng3","32","172"),("heng4","32","172"),("heng5","32","172"))
cursor.executemany(sql, insert_data)
connect.commit()
test_insert2()
test_select()
## out:fetchall data is (('heng', 30, 170.0), ('de', 31, 171.0), ('heng2', 32, 172.0),
## ('heng3', 32, 172.0), ('heng4', 32, 172.0), ('heng5', 32, 172.0))

对时间进行插入时,踩过一些坑。

在插入单条时,这里的%s必须带双引号,不然会报错

但是在插入多条时,%s不能加引号,加引号会报错

1
2
3
4
5
6
7
8
9
10
11
import datetime

create_time = datetime.datetime.now()
sql = ('insert into people (name, age, weight, create_time) values("heng2","32","172","%s")'%create_time)
# 在插入单条时,这里的%s必须带双引号,不然会报错
# out:fetchall data is ((1, 'heng2', 32, 172.0, datetime.datetime(2020, 12, 11, 18, 5, 32)),)

#但是在插入多条时,%s不能加引号,加引号会报错
sql = 'insert into people (name, age, weight, create_time) values(%s,%s,%s,%s)'
create_time = datetime.datetime.now()
insert_data = (("heng3","32","172",create_time),("heng4","32","172",create_time))

插入文本时需要去除特殊符号,或者进行转义

例如,想插入“!@#$%^&*()/{}[]”,或者包含特殊字符的字符串。如果直接插入会报sql错误。我们需要对%s加单引号(双引号不行),注意这种情况下第一个转义字符会被忽略

1
2
3
str_test = '\!@#$%^&*()\/{}[]'
sql = ('insert into people (name, age, weight, create_time) values('%s',"32","172","%s")'%(str_test,create_time))
#out:(6, '!@#$%^&*()/{}[]', 32, 172.0, datetime.datetime(2020, 12, 11, 18, 31, 17))

Rollback的使用

在批量插入时,如果中间报错了, 使用rollback函数会回撤上一条插入,谨慎使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def test_insert2():
sql = 'insert into people (name, age, weight, create_time) values(%s,%s,%s,%s)'
create_time = datetime.datetime.now()
data_all = [[("heng3","32","172",create_time),("heng4","32","172",create_time)],
[("heng3333333333333","32","172",create_time),("heng44444444444444444","32","172",create_time)],
[("heng5","32","172",create_time),("heng6","32","172",create_time)]]
for insert_data in data_all:
try:
cursor.executemany(sql, insert_data)
except Exception as e:
# connect.rollback()
print("error",e)
connect.commit()
test_insert2()
test_select()
#这里第二条插入是超出长度限制的,如果第11行打开则六条记录只能插入最后两条

更新、删除数据

和插入一样,语句执行完成后,使用connect.commit()进行提交,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
def test_update():
sql = 'update people set name= "de11", age="22" where id= 1'
cursor.execute(sql)
connect.commit()
test_update()
#id为1的数据被update,这里注意set之后的字段是用逗号分隔的而不是用and

def test_delete():
sql = 'delete from people where id= 1'
cursor.execute(sql)
connect.commit()
test_update()