Sqlalchemy 基础学习

增删改查

添加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 添加单个对象
user = User(name='ed', fullname='Ed Jones', nickname='edsnickname')
session.add(user)
# 添加对象忽略重复错误
insert_command = User.__table__.insert().prefix_with(' IGNORE').values(data)
with db.engine.connect() as connect:
connect.execute(insert_command)
connect.commit()
# 批量添加
users = [
User(name='wendy', fullname='Wendy Williams', nickname='windy'),
User(name='mary', fullname='Mary Contrary', nickname='mary'),
User(name='fred', fullname='Fred Flintstone', nickname='freddy')
]
session.add_all(users)
session.commit() # 提交
session.rollback() # 回滚
# Upsert
insert_stmt = insert(User).values(**insert_data)
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(**update_data)
with db.engine.connect() as connect:
connect.execute(on_duplicate_key_stmt)

删除

1
2
session.delete(jack)
session.query(User).filter_by(name='jack').count()

修改

1
2
3
4
5
6
7
8
9
10
11
12
13
from sqlalchemy import update, bindparam
stmt = update(User).where(User.c.name == "iFan").values(fullname="iFanlw")

stmt = update(user_table).where(user_table.c.name == bindparam("oldname")).values(name=bindparam("newname"))
with engine.begin() as conn:
conn.execute(
stmt,
[
{"oldname": "jack", "newname": "ed"},
{"oldname": "wendy", "newname": "mary"},
{"oldname": "jim", "newname": "jake"},
],
)

查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 查询全部
session.query(User).all()

# 根据条件查询
session.query(User).filter(User.name == "iFan").all()
session.query(User).filter(User.name != "iFan").all()
session.query(User).filter(User.name.like("%ifan%")).all() # 区分大小写
session.query(User).filter(User.name.ilike("%ifan%")).all() # 不区分大小写
session.query(User).filter(User.name.in_(["ifan", "ifan1"])).all()
session.query(User).filter(User.name.in_(session.query(User.name).filter(User.name.like("%ifan%")))).all()
# 多个In条件组合
from sqlalchemy import tuple_
query.filter(tuple_(User.name, User.nickname).in_([('ed', 'edsnickname'), ('wendy', 'windy')]))
# not in
session.query.filter(~User.name.in_(['ed', 'wendy', 'jack']))
# is None
session.query(User).filter(User.name.is_not(None)).all()
# and
from sqlalchemy import and_
session.query.filter(and_(User.name == 'ed', User.fullname == 'Ed Jones'))
# or
from sqlalchemy import or_
session.query.filter(or_(User.name == 'ed', User.name == 'wendy'))
# 存在则返回,否则返回None
session.query.filter(User.id == 1).one_or_none()
# 使用文本的查询
from sqlalchemy import text
session.query.filter(text("id = 1")).order_by(text("id").all()
session.query(User).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(User.id).one()
session.query(User).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()
stmt = text("SELECT name, id, fullname, nickname FROM users where name=:name")
stmt = stmt.columns(User.name, User.id, User.fullname, User.nickname)
session.query(User).from_statement(stmt).params(name='ed').all()


# 限制返回
for user in session.query(User).filter(User.age == 18)[:18]:
print(user.username, user.age)

分组查询

1
2
3
4
5
6
# 分组查询
from sqlalchemy import func
session.query(func.count(User.name), User.name).group_by(User.name).all()
# count
session.query(func.count('*')).select_from(User).scalar()
session.query(func.count(User.id)).scalar()