然后定义filter:
task_filter = { or_( and_( Task.task_department == current_user.user_department, Task.task_commit_status > 0, Task.task_complete_time >= prev, ), and_( Task.task_complete_time >= prev, Task.task_members.like(‘%,‘+str(current_user.user_id)+‘,%‘), ) )}
上面这个filter中有两个条件组,关系为or,每个条件组里有一些and关系的条件。
filter定义完之后就可以进行查询了:
Task.query.filter(*task_filter).all()
FLASK-SQLALCHEMY如何使用or和and条件进行组合查询
标签:组合 content 遇到 官方 members 工作 keyword constant import
小编还为您整理了以下内容,可能对您也有帮助:
在 Flask-SQLAlchemy 中联表查询
SQLAlchemy 是一个功能强大的 ORM 。 Flask-SQLAlchemy 是一个 Flask 插件,它让我们在 Flask 框架中使用 SQLAlchemy 变得更容易。
本篇介绍我在使用 Flask-SQLAlchemy 2.1 时进行联表查询的一些经验
这里有两个表,account 表保存帐号 ID 和昵称,bind 表保存 account 之间的绑定关系。
对应的 Model 如下:
先来看一个简单的例子:查询 gameuid 1000 账号下绑定的所有帐号。
看一看生成的 SQL 语句:
这里的联表查询使用的是 WHERE 语句。如果希望使用 JOIN 语句,可以这样写:
可以看出,现在生成的 SQL 语句已经使用 JOIN 语句了。但上面的语意有点奇怪,既然已经在 query 中使用了 Bind 和 Account,后面再 join 一次 Account 总觉得有点多余。那么 SQLAlchemy 如何选择 JOIN 的时候谁先谁后呢?看看这个错误就知道了:
这个错误显然说明,query 中参数的顺序很重要,第一个参数所代表的 table 就是 JOIN 时放在前面的那个 table。因此,此处 JOIN 的目标应该是 Account, 而不应该是 Bind 自身。
上面的例子已经解决了大多数需求了。我们再来看看分页。在 Flask-SQLAlchemy 中封装了一个 paginate 方法,可以方便地将查询记录进行分页:
报错的原因是 db.session.query 默认返回的是 orm.Query 对象,这个对象并不包含 paginate 方法。要解决这个问题,需要修改 Flask-SQLAlchemy 的源码。
找到 SQLAlchemy 对象的 __init__ 定义,在其中加入 session_options['query_cls'] = BaseQuery 即可:
在 Flask-SQLAlchemy 提供的 Model 对象中,可以使用 Model.query 这样的语法来直接得到一个查询对象,这是由于 Flask-SQLAlchemy 中存在一个 _QueryProperty 类,每次调用 Model.__get__ 时,会自动生成一个基于当前 session 的 query 对象:
使用 Model.query 得到的这个 query 对象可以直接进行 JOIN 操作,得到的结果是 Model 对象。这样就方便多了:
转换成 SQL 是这样的:
可以看出,这样的查询结果和使用 db.session.query 并没有什么不同。由于返回的是 Model 对象,使用上可能还更加方便了。
如何使用 Model.query.join 语法得到部分字段呢?这里可以使用 SQLAlchemy 提供的 with_entities 方法:
注意,列表中的项 (2, '玩家10001') 并不是标准的 Python tuple。你如果查看它的类型,会发现一个奇怪的名称: <class 'sqlalchemy.util._collections.result'> 。它是一个 AbstractKeyedTuple 对象,拥有一个 keys() 方法,这样可以很容易将其转换成 dict :
想了解 AbstractKeyedTuple ,可以看看这篇文档 New KeyedTuple implementation dramatically faster 。
除了筛选字段外,还可以用另一个方法获取多个 Model 的记录。那就是,返回两个 Model 的所有字段:
使用上面的语法直接返回 Account 和 Bind 对象,可以进行更加灵活的操作。
要联结超过 2 张以上的表,可以直接在 join 得到的结果之后链式调用 join 。也可以在 filter 的结果后面链式调用 join 。join 和 filter 返回的都是 query 对象,因此可以无限链式调用下去。
写完查询后,应该打印生成的 SQL 语句查看一下有没有性能问题。
python sqlalchemy 查询条件怎么写
通过Flask-SQLAlchemy提供的一个query属性,当你通过model类的query属性,你可以得到一个数据库表的查询结果集。
i.User.query.filter_by(username='peter').first(),通过filter_by方法里的条件表达式来对query所得到的结果集进行过滤,得到你想要得到的结果。
example:
Retrieve a user by username通过username属性为’peter‘过滤结果集:
>>> peter = User.query.filter_by(username='peter').first()
>>> peter.id
1
>>> peter.email
u'peter@example.org'
当不存在结果集时返回none:
>>> missing = User.query.filter_by(username='missing').first()
>>> missing is None
True
python sqlalchemy 查询条件怎么写
通过Flask-SQLAlchemy提供的一个query属性,当你通过model类的query属性,你可以得到一个数据库表的查询结果集。
i.User.query.filter_by(username='peter').first(),通过filter_by方法里的条件表达式来对query所得到的结果集进行过滤,得到你想要得到的结果。
example:
Retrieve a user by username通过username属性为’peter‘过滤结果集:
>>> peter = User.query.filter_by(username='peter').first()
>>> peter.id
1
>>> peter.email
u'peter@example.org'
当不存在结果集时返回none:
>>> missing = User.query.filter_by(username='missing').first()
>>> missing is None
True
flask怎样查询mysql并显示在页面上
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Database(object):
def __init__(self, configure):
self.configure = configure
self.engine = create_engine(self.configure.MYSQL_SQLALCHEMY_URL)
self.Session = sessionmaker(bind=self.engine)
self.session = self.Session()
from sqlalchemy import Column, Integer, String, Sequence
class User(Base):
"""
user table
"""
__tablename__ = 'user'
id = Column(Integer, Sequence("USER_ID_SEQ"), primary_key=True)
username = Column(String(256), doc=u"用户名")
password = Column(String(256), doc=u"密码")
def __repr__(self):
return "<User(username='%s', password='%s', id='%s')" % (self.username, self.password, self.id)
使用sqlalchemy来连接mysql并且查询其中的user表的方法,你可以试试。
显示页面的话:
from flask import jsonify会把结果显示在页面上,是以json方式显示的
flask怎样查询mysql并显示在页面上
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Database(object):
def __init__(self, configure):
self.configure = configure
self.engine = create_engine(self.configure.MYSQL_SQLALCHEMY_URL)
self.Session = sessionmaker(bind=self.engine)
self.session = self.Session()
from sqlalchemy import Column, Integer, String, Sequence
class User(Base):
"""
user table
"""
__tablename__ = 'user'
id = Column(Integer, Sequence("USER_ID_SEQ"), primary_key=True)
username = Column(String(256), doc=u"用户名")
password = Column(String(256), doc=u"密码")
def __repr__(self):
return "<User(username='%s', password='%s', id='%s')" % (self.username, self.password, self.id)
使用sqlalchemy来连接mysql并且查询其中的user表的方法,你可以试试。
显示页面的话:
from flask import jsonify会把结果显示在页面上,是以json方式显示的
在python3下怎样用flask-sqlalchemy对mysql数据库操作
以 Debian/Ubuntu 为例(请确保有管理员权限):
1.MySQL
代码如下:
apt-get install mysql-server
apt-get install mysql-client
apt-get install libmysqlclient15-dev
2.python-mysqldb
代码如下:
apt-get install python-mysqldb
3.easy_install
代码如下:
wget http:// peak.telecommunity.com/dist/ez_setup.py
python ez_setup.py
4.MySQL-Python
代码如下:
easy_install MySQL-Python
5.SQLAlchemy
代码如下:
easy_install SQLAlchemy
6、安装完成后使用下面代码测试连接
代码如下:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
DB_CONNECT_STRING = 'mysql+mysqldb://root:123@localhost/ooxx?charset=utf8'
engine = create_engine(DB_CONNECT_STRING, echo=True)
DB_Session = sessionmaker(bind=engine)
session = DB_Session()
7、数据操作(增删改查)
代码如下:
from sqlalchemy import func, or_, not_
user = User(name='a')
session.add(user)
user = User(name='b')
session.add(user)
user = User(name='a')
session.add(user)
user = User()
session.add(user)
session.commit()
query = session.query(User)
print query # 显示SQL 语句
print query.statement # 同上
for user in query: # 遍历时查询
print user.name
print query.all() # 返回的是一个类似列表的对象
print query.first().name # 记录不存在时,first() 会返回 None
# print query.one().name # 不存在,或有多行记录时会抛出异常
print query.filter(User.id == 2).first().name
print query.get(2).name # 以主键获取,等效于上句
print query.filter('id = 2').first().name # 支持字符串
query2 = session.query(User.name)
print query2.all() # 每行是个元组
print query2.limit(1).all() # 最多返回 1 条记录
print query2.offset(1).all() # 从第 2 条记录开始返回
print query2.order_by(User.name).all()
print query2.order_by('name').all()
print query2.order_by(User.name.desc()).all()
print query2.order_by('name desc').all()
print session.query(User.id).order_by(User.name.desc(), User.id).all()
print query2.filter(User.id == 1).scalar() # 如果有记录,返回第一条记录的第一个元素
print session.query('id').select_from(User).filter('id = 1').scalar()
print query2.filter(User.id > 1, User.name != 'a').scalar() # and
query3 = query2.filter(User.id > 1) # 多次拼接的 filter 也是 and
query3 = query3.filter(User.name != 'a')
print query3.scalar()
print query2.filter(or_(User.id == 1, User.id == 2)).all() # or
print query2.filter(User.id.in_((1, 2))).all() # in
query4 = session.query(User.id)
print query4.filter(User.name == None).scalar()
print query4.filter('name is null').scalar()
print query4.filter(not_(User.name == None)).all() # not
print query4.filter(User.name != None).all()
print query4.count()
print session.query(func.count('*')).select_from(User).scalar()
print session.query(func.count('1')).select_from(User).scalar()
print session.query(func.count(User.id)).scalar()
print session.query(func.count('*')).filter(User.id > 0).scalar() # filter() 中包含 User,因此不需要指定表
print session.query(func.count('*')).filter(User.name == 'a').limit(1).scalar() == 1 # 可以用 limit() count() 的返回数
print session.query(func.sum(User.id)).scalar()
print session.query(func.now()).scalar() # func 后可以跟任意函数名,只要该数据库支持
print session.query(func.current_timestamp()).scalar()
print session.query(func.md5(User.name)).filter(User.id == 1).scalar()
query.filter(User.id == 1).update({User.name: 'c'})
user = query.get(1)
print user.name
user.name = 'd'
session.flush() # 写数据库,但并不提交
print query.get(1).name
session.delete(user)
session.flush()
print query.get(1)
session.rollback()
print query.get(1).name
query.filter(User.id == 1).delete()
session.commit()
print query.get(1)
如何在flask-sqlalchemy查询最后一条记录
思路:先过滤出一个班级的所有记录,然后倒序查询这些记录中的第一个记录即为原来的最后一个记录ModelName.query.filter_by(RowName='a').order_by(ModelName.RowName.desc()).first()