使用SQLAlchemy保存数据到SQLite

在测试中获得了一批结构化的数据,为了便于在后续工作中使用,打算将它们保存到数据库中。对我而言,SQLite轻量简单,是最好的选择。之所以不直接拼接SQL语句,是因为之前遇到过数据中包含特殊字符(如单引号)导致SQL语句报错的情况,所以改用ORM框架SQLAlchemy来完成。

PS:本文没技术含量,纯粹做记录。

关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 声明
import sqlalchemy
from sqlalchemy import Boolean, Column, Integer, Sequence, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker

# Connect to the database. URL format:
#   'dialect+driver://user:password@host:port/dbname'
#
# Examples for other backends. They are kept as comments because each
# create_engine() call needs the matching DBAPI driver to be installed, and
# assigning several engines in a row would only keep the last one anyway:
#   create_engine("postgresql://scott:tiger@localhost/test")
#   create_engine("mysql://scott:tiger@hostname/dbname", encoding='latin1', echo=True)
# NOTE(review): the ``encoding`` argument is deprecated in SQLAlchemy 1.4 and
# is believed to be rejected by 2.0 -- confirm against the installed version.
#
# The engine actually used here: a local SQLite file, SQL logging disabled.
engine = create_engine("sqlite:///" + "/Users/bingo/temp/test.db", echo=False)

# Declarative base class; every mapped model class inherits from it.
Base = declarative_base()

class MyOBJ(Base):
    """ORM model mapped to the ``data`` table.

    Inherits from the declarative ``Base`` so that
    ``Base.metadata.create_all(engine)`` creates its table.
    """

    __tablename__ = 'data'

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    Name = Column(String)
    # ``Boolean`` must be imported from ``sqlalchemy`` alongside the other
    # column types, otherwise this line raises NameError.
    Handsome = Column(Boolean)

# Create the schema: emits CREATE TABLE for every model registered on Base.
Base.metadata.create_all(engine)

# Build a Session factory bound to the engine, then open one session.
Session = sessionmaker(bind=engine)
session = Session()

# Build the object to store.
myobj = MyOBJ(Name="Bingo", Handsome=True)

try:
    # INSERT the new object.
    # For insert-or-update use ``session.merge(myobj)`` instead of add();
    # merge matches on the primary key only, not on unique columns.
    session.add(myobj)

    # Commit the transaction: persists the pending add/merge operations.
    session.commit()
except Exception:
    # Undo the pending operations when the commit fails, then re-raise so
    # the failure is not silently swallowed.
    session.rollback()
    raise
finally:
    # Always release the session, on success or failure.
    session.close()
`

实例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import pdb
import re
import json
import time
import pdb
import uuid
import sqlalchemy
from sqlalchemy import create_engine, Column, Integer, String, Sequence
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker

def collect2SQLite(dirPath="/Users/bingo/Downloads/test/", sqlDB=None):
    """Load the person records of every ``*.json`` file in a directory into SQLite.

    Each JSON file is expected to hold a list of person dicts under
    ``['Body']['ResultSet']``. Every record is merged (insert or update by
    primary key) into the ``data`` table, and the transaction is committed
    once per file because committing per record is considerably slower.

    Args:
        dirPath: Directory scanned (non-recursively) for ``*.json`` files.
        sqlDB: Path of the SQLite database file. When ``None`` a timestamped
            file under ``/Users/bingo/temp/`` is used, so each run writes to
            a new database.
    """
    filePattern = re.compile(r'.*\.json$')
    if sqlDB is None:
        sqlDB = "/Users/bingo/temp/testdata%d.db" % (int(time.time() * 10))
    # URL format: 'dialect+driver://user:password@host:port/dbname'
    engine = create_engine("sqlite:///" + sqlDB, echo=False)  # echo toggles SQL logging

    # Declarative base for the mapped class below.
    Base = declarative_base()

    class Data(Base):
        """One person record, keyed by ``PersonId``."""

        __tablename__ = 'data'
        # No surrogate integer id: session.merge() matches on the primary
        # key, so PersonId itself is the key.
        PersonId = Column(String, primary_key=True)
        # For SQLite a string length, e.g. Column(String(50)), is not required.
        PersonTypeString = Column(String)
        CreationTimeString = Column(String)
        DisplayName = Column(String)
        DisplayNameFirstLast = Column(String)
        DisplayNameLastFirst = Column(String)
        FileAs = Column(String)
        GivenName = Column(String)
        Surname = Column(String)
        CompanyName = Column(String)
        EmailAddress = Column(String)
        # NOTE(review): declared Integer although the name suggests a text
        # address -- confirm against the real data before changing the schema.
        ImAddress = Column(Integer)
        RelevanceScore = Column(Integer)
        ADObjectId = Column(String)

        def __init__(self, d):
            """Populate the instance from one raw person dict.

            Nested ``PersonId`` / ``EmailAddress`` dicts are flattened to
            their scalar value; any other value that is not an int or str is
            stored as ``None``.
            """
            # Work on a shallow copy so the caller's dict is left untouched.
            record = dict(d)
            try:
                record['PersonId'] = record['PersonId']['Id']
            except (KeyError, TypeError):
                # uuid1() returns a UUID object; it must be converted before
                # concatenation, otherwise this fallback raises TypeError.
                record['PersonId'] = "NoValue :: " + str(uuid.uuid1())
            try:
                record['EmailAddress'] = record['EmailAddress']['EmailAddress']
            except (KeyError, TypeError):
                record['EmailAddress'] = None
            for a, b in record.items():
                setattr(self, a, b if isinstance(b, (int, str)) else None)

        def __repr__(self):
            # Format used when an instance is printed directly.
            return "<Data(Name='%s', Company='%s', Email='%s')>" % (self.DisplayName, self.CompanyName, self.EmailAddress)

    # Create the schema (CREATE TABLE for the model above).
    Base.metadata.create_all(engine)

    # Session factory bound to the engine, and one session for the whole run.
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        for rawfile in os.listdir(dirPath):
            if not filePattern.match(rawfile):
                continue
            filePath = os.path.join(dirPath, rawfile)
            print("Processing :: %s" % (filePath))
            counter = 0
            with open(filePath, "r", encoding="utf-8") as fp:
                rawdict = json.load(fp)
            personList = rawdict['Body']['ResultSet']
            for p in personList:
                try:
                    data = Data(p)
                except Exception:
                    # Deliberate debugging aid: show the offending record and
                    # drop into the debugger.
                    print("❌ \n%s" % p)
                    pdb.set_trace()
                    time.sleep(0.5)
                else:
                    try:
                        # Insert or update; matches on the primary key only,
                        # not on unique columns.
                        session.merge(data)
                        counter += 1
                    except Exception as e:
                        print(e)
                        pdb.set_trace()

            # Commit once per file; committing per record is much slower.
            session.commit()
            print("✅ 处理 %d 条数据" % counter)
    finally:
        # Release the session even when a file fails to load or commit.
        session.close()

# Run the import only when executed as a script, not when imported.
if __name__ == "__main__":
    collect2SQLite()

遗留问题

  • SQLAlchemy的插入速度貌似比直接拼接SQL语句插入数据慢挺多
  • 非主键字段唯一时,如何优雅地避免冲突?
  • 存在子字典的json数据如何优雅地保存到数据库的多个关联表格中?
  • 数据库查询及使用相关姿势。