SQLAlchemy(二)

目标

使用 SQLAlchemy 连接数据库,创建数据表
使用 SQLAlchemy 实现插入数据、查询数据的操作

环境:

  • Python==3.6.8
  • SQLAlchemy==1.2.16
  • Psycopg2==2.7.6.1
  • faker==1.0.1

插入一条数据

现在,我们可以直接使用 ORM 来操作数据了,先来插入一条数据:

1
2
3
4
5
6
7
>>> ed_user = User(name='ed', password='123456')
>>> ed_user.name
'ed'
>>> ed_user.password
'123456'
>>> str(ed_user.id)
'None'

虽然我们没有指定 id 的值,但是 id 仍然有个默认值,SQLAlchemy 的 instrumentation 会在初次访问某个属性的时候赋上默认值。如果已经赋上的值,则会直接使用之前赋予旳值。

这时,我们查一下数据库,看看数据有没有存进去:

1
2
3
4
mydb=# SELECT * FROM users;
id | name | password
----+------+----------
(0 rows)

???

怎么回事?肯定是漏了哪一步。

SQLAlchemy 是通过 Session 来和数据库交流通信的。

有两种时机来定义 Session

一、create_engine() 之后

1
2
>>> from sqlalchemy.orm import sessionmaker
>>> Session = sessionmaker(bind=engine)

二、create_engine() 之前

1
2
3
>>> Session = sessionmaker()
...
>>> Session.configure(bind=engine)

然后在需要和数据库交流的时候,可以直接实例化 Session

1
>>> session = Session()

在把数据存入数据库之前,需要先将对象数据加入 Session 之中,可以调用 Sessionadd() 方法:

1
2
>>> ed_user = User(name='ed', password='123456')
>>> session.add(ed_user)

这时候我们再去查数据库:

1
2
3
4
mydb=# SELECT * FROM users;
id | name | password
----+------+----------
(0 rows)

没错,数据库其实还不知道有这数据,这个时候,该对象存在于 Session 中,处于 pending 状态,还没有 SQL 语句发送给数据库,Session 只有在一定的时候才会去发送 SQL 持久化数据(这个过程称为 flush)。

我们现在通过 ORM 来查询下:

1
2
3
4
5
6
>>> our_user = session.query(User).filter_by(name='ed').first()
>>> our_user
<User (ed)>

>>> ed_user is our_user
True

Query 查询返回的结果对象居然和之前的对象一样。实际上 Session 发现返回的对象其实和之前对象引用的是内部映射对象中的同一行数据,所以返回的对象其实是同一个内存上的数据。

这个概念称为 ORM 的 identity map,这能确保一个 Session 上的所有特定行的操作都是操作的同样数据。Session 中指定 primary key 的对象,所有 SQL 查询都会只返回同样的 Python 对象,如果想新建一个已存在的 primary key 对象也是会直接报错的。

上述用户的密码太简单了,我们可以直接这样更改:

1
>>> ed_user.password = 'f8s7ccs'

Session 对象还能检测到 ed_user 被修改了:

1
2
>>> session.dirty
IdentitySet([<User (ed)>])

我们再往 Session 添加一条数据:

1
session.add(User(name='n', password='123'))

我们可以通过 Sessionnew 属性获知还有哪些处于 pending 状态的对象:

1
2
>>> session.new
IdentitySet([<User (n)>])

以上,所有操作的都是存在 Session 中的数据,现在,终于到了我们梦寐以求的时刻 —— 插入数据库。

通过 Sessioncommit() 方法,将所有的数据持久化进数据库:

1
>>> session.commit()

现在我们去数据库瞅瞅:

1
2
3
4
5
6
mydb=# SELECT * FROM users;
id | name | password
----+------+----------
1 | ed | f8s7ccs
2 | n | 123
(2 rows)

完美。

Session 将数据插入数据库之后,这些对象属性的值都能立马获取。

commit() 之后,会开启一个新的事务,所以所有的内部映射对象都会重新加载。SQLAlchemy 默认会在访问本次事务的时候,从上一次事务中刷新数据。

查询

Session 对象使用 query() 方法时,会创建一个 Query 对象。

一、query() 可以接受模型类作为参数,将返回该模型类的实例对象:

1
2
3
4
5
6
>>> for instance in session.query(User).order_by(User.id):
... print(instance.id, instance.name)

1 ed
2 n
3 tom

二、query() 也可以接受模型类的字段属性作为参数,返回这些属性值的 tuple:

1
2
3
4
5
6
>>> for id, name in session.query(User.id, User.name):
... print(id, name)

1 ed
2 n
3 tom

上面返回的 tuple 可以是命名 tule,也可以是 KeydTuple,长得和字典差不多。属性的话 key 就是属性名字,类的话就是类名:

1
2
3
4
5
6
>>> for row in session.query(User, User.name).all():
... print(row.User, row.name)

<User (ed)> ed
<User (n)> n
<User (tom)> tom

当然,如果不想使用上面这些默认的名称,也可以通过 label() 来自定义名字:

1
2
3
4
5
6
>>> for row in session.query(User.name.label('user_name')).all():
... print(row.user_name)

ed
n
tom

而,对于整个类名的自定义命名则需要使用 aliased() 方法:

1
2
3
4
5
6
7
8
9
>>> from sqlalchemy.orm import aliased
>>> user_alias = aliased(User, name='user_alias')

>>> for row in session.query(user_alias, user_alias.name).all():
... print(row.user_alias, row.name)

<User (ed)> ed
<User (n)> n
<User (tom)> tom

QueryLIMITOFFSET 操作,都可以直接使用 Python 的切片实现,同时还可以搭配 ORDER_BY 使用:

1
2
3
4
5
>>> for u in session.query(User).order_by(-User.id)[1:3]:
... print(u.id)

3
2

过滤操作有两种:

一、可以通过 filter_by() 传入关键字参数来实现:

1
2
3
4
>>> for u in session.query(User).filter_by(name='tom'):
... print(u.id)

3

二、通过 filter() 传入类的映射属性实现:

1
2
3
4
>>> for u in session.query(User).filter(User.name=='tom'):
... print(u.id)

3

可以看见,第二种过滤方法更灵活。其传入的是 Python 的操作符。

常用的操作符有以下几种:

  • equals:query.filter(User.name=='tom')
  • not equals:query.filter(User.name!='tom')
  • like:query.filter(User.name.like('%to%'))
  • ilike:query.filter(User.name.ilike('%to%'))
  • in:
    • query.filter(User.name.in_(['tom', 'ed']))
    • query.filter(User.name.in_(session.query(User.name).filter(User.name.like('%to%')))
  • not in:query.filter(~User.name.in_(['tom', 'ed']))
  • is null:
    • query.filter(User.name==None)
    • query.filter(User.name.is_(None))
  • is not null:
    • query.filter(User.name!=None)
    • query.filter(User.name.isnot(None))
  • and:from sqlalchemy import and_
    • query.filter(and_(User.name=='tom', User.id==2))
    • query.filter(User.name=='tom', User.id==2)
    • query.filter(User.name=='tom').filter(User.id==2)
  • or:from sqlalchemy import or_
    • query.filter(or_(User.name=='tom', User.id==2))
  • match:query.filter(User.name.match('tom'))

返回值

一般就分两种:对象列表、对象:

  • all():返回对象列表
  • first():返回第一个对象
  • one():返回一个对象,如果该 query 为空或者 不止一个对象都会报错
  • one_or_none():返回一个对象,query 为空返回 None,多个对象还是会报错
  • scalar():返回一个对象,会先调用 one(),获取对象,然后返回第一列的数据,query 为空返回 None,多个对象报错

使用字符SQL

Query 通过 text() 方法,可以接受文字字符串,大部分的 Query 方法都支持:

1
2
3
4
5
6
7
8
9
>>> from sqlalchemy import text
>>> for user in session.query(User).\
... filter(text('id<224')).\
... order_by(text('id')).all():
... print(user.name)

ed
n
tom

text() 里面还可以传递变量。变量名前面只需加上 :,然后在配合 params() 方法即可:

1
2
3
4
>>> session.query(User).filter(text('id<:value and name=:name')).\
... params(value=224, name='tom').order_by(User.id).one()

<User (tom)>

如果想直接使用完整的 SQL 语句可以在 from_statement() 中使用 text()

1
2
3
4
5
>>> session.query(User).from_statement(
... text("SELECT * FROM users where name=:name")).\
... params(name='tom').all()

[<User (tom)>]

另外,可能会有这么一种情况:当我们处理复杂案例时,可能会碰到多个映射类有重复的列名,或者还会有些匿名的 ORM 结构。我们可以使用 TextClause.columns() 方法指定具体列所对应的 ORM 映射:

1
2
3
4
5
>>> stmt = text('SELECT name, id, password FROM user where name=:name')
>>> stmt = stmt.columns(User.name, User.id, User.password)
>>> session.query(User).from_statement(stmt).params(name='ed').all()

[<User (ed)>]

计数

简单的 Query 计数可以直接使用 count()

1
2
3
>>> session.query(User).filter(User.name.like('%ed')).count()

1

但是这段语句会翻译成如下的 SQL 命令去执行:

1
2
3
4
5
6
7
8
SELECT count(*) AS count_1
FROM (SELECT users.id AS users_id,
users.name AS users_name,
users.fullname AS users_fullname,
users.password AS users_password
FROM users
WHERE users.name LIKE ?) AS anon_1
('%ed',)

SQLAlchemy 总是会把我们的查询作为子集,然后再从这里面计算行数。

有时候我们可能需要更明确的指定需要计数的内容,func.count() 表达式就可以完成:

1
2
3
4
>>> from sqlalchemy import func
>>> session.query(func.count(User.name), User.name).group_by(User.name).all()

[(1, 'n'), (1, 'tom'), (1, 'ed')]

如果我们想实现简单的 SELECT count(*) FROM table,可以这么做:

1
2
3
>>> session.query(func.count('*')).select_from(User).scalar()

3

还可以再简化,当我们直接以 User 主键来计数,select_from() 都可以省略不写:

1
2
3
>>> session.query(func.count(User.id)).scalar()

3