首页 热点资讯 义务教育 高等教育 出国留学 考研考公
您的当前位置:首页正文

FLASK-SQLALCHEMY如何使用or和and条件进行组合查询

2023-11-13 来源:花图问答

然后定义filter:

task_filter = { or_( and_( Task.task_department == current_user.user_department, Task.task_commit_status > 0, Task.task_complete_time >= prev, ), and_( Task.task_complete_time >= prev, Task.task_members.like(‘%,‘+str(current_user.user_id)+‘,%‘), ) )}

上面这个filter中有两个条件组,关系为or,每个条件组里有一些and关系的条件。

filter定义完之后就可以进行查询了:

Task.query.filter(*task_filter).all()

FLASK-SQLALCHEMY如何使用or和and条件进行组合查询

标签:组合   content   遇到   官方   members   工作   keyword   constant   import   

小编还为您整理了以下内容,可能对您也有帮助:

在 Flask-SQLAlchemy 中联表查询

SQLAlchemy 是一个功能强大的 ORM 。 Flask-SQLAlchemy 是一个 Flask 插件,它让我们在 Flask 框架中使用 SQLAlchemy 变得更容易。

本篇介绍我在使用 Flask-SQLAlchemy 2.1 时进行联表查询的一些经验

这里有两个表,account 表保存帐号 ID 和昵称,bind 表保存 account 之间的绑定关系。

对应的 Model 如下:

先来看一个简单的例子:查询 gameuid 1000 账号下绑定的所有帐号。

看一看生成的 SQL 语句:

这里的联表查询使用的是 WHERE 语句。如果希望使用 JOIN 语句,可以这样写:

可以看出,现在生成的 SQL 语句已经使用 JOIN 语句了。但上面的语意有点奇怪,既然已经在 query 中使用了 Bind 和 Account,后面再 join 一次 Account 总觉得有点多余。那么 SQLAlchemy 如何选择 JOIN 的时候谁先谁后呢?看看这个错误就知道了:

这个错误显然说明,query 中参数的顺序很重要,第一个参数所代表的 table 就是 JOIN 时放在前面的那个 table。因此,此处 JOIN 的目标应该是 Account, 而不应该是 Bind 自身。

上面的例子已经解决了大多数需求了。我们再来看看分页。在 Flask-SQLAlchemy 中封装了一个 paginate 方法,可以方便地将查询记录进行分页:

报错的原因是 db.session.query 默认返回的是 orm.Query 对象,这个对象并不包含 paginate 方法。要解决这个问题,需要修改 Flask-SQLAlchemy 的源码。

找到 SQLAlchemy 对象的 __init__ 定义,在其中加入 session_options['query_cls'] = BaseQuery 即可:

在 Flask-SQLAlchemy 提供的 Model 对象中,可以使用 Model.query 这样的语法来直接得到一个查询对象,这是由于 Flask-SQLAlchemy 中存在一个 _QueryProperty 类,每次调用 Model.__get__ 时,会自动生成一个基于当前 session 的 query 对象:

使用 Model.query 得到的这个 query 对象可以直接进行 JOIN 操作,得到的结果是 Model 对象。这样就方便多了:

转换成 SQL 是这样的:

可以看出,这样的查询结果和使用 db.session.query 并没有什么不同。由于返回的是 Model 对象,使用上可能还更加方便了。

如何使用 Model.query.join 语法得到部分字段呢?这里可以使用 SQLAlchemy 提供的 with_entities 方法:

注意,列表中的项 (2, '玩家10001') 并不是标准的 Python tuple。你如果查看它的类型,会发现一个奇怪的名称: <class 'sqlalchemy.util._collections.result'> 。它是一个 AbstractKeyedTuple 对象,拥有一个 keys() 方法,这样可以很容易将其转换成 dict :

想了解 AbstractKeyedTuple ,可以看看这篇文档 New KeyedTuple implementation dramatically faster 。

除了筛选字段外,还可以用另一个方法获取多个 Model 的记录。那就是,返回两个 Model 的所有字段:

使用上面的语法直接返回 Account 和 Bind 对象,可以进行更加灵活的操作。

要联结超过 2 张以上的表,可以直接在 join 得到的结果之后链式调用 join 。也可以在 filter 的结果后面链式调用 join 。join 和 filter 返回的都是 query 对象,因此可以无限链式调用下去。

写完查询后,应该打印生成的 SQL 语句查看一下有没有性能问题。

python sqlalchemy 查询条件怎么写

通过Flask-SQLAlchemy提供的一个query属性,当你通过model类的query属性,你可以得到一个数据库表的查询结果集。
  i.User.query.filter_by(username='peter').first(),通过filter_by方法里的条件表达式来对query所得到的结果集进行过滤,得到你想要得到的结果。
   example:
    Retrieve a user by username通过username属性为’peter‘过滤结果集:
    >>> peter = User.query.filter_by(username='peter').first()
    >>> peter.id
    1
    >>> peter.email
    u'peter@example.org'
   当不存在结果集时返回none:
    >>> missing = User.query.filter_by(username='missing').first()
    >>> missing is None
    True 

python sqlalchemy 查询条件怎么写

通过Flask-SQLAlchemy提供的一个query属性,当你通过model类的query属性,你可以得到一个数据库表的查询结果集。
  i.User.query.filter_by(username='peter').first(),通过filter_by方法里的条件表达式来对query所得到的结果集进行过滤,得到你想要得到的结果。
   example:
    Retrieve a user by username通过username属性为’peter‘过滤结果集:
    >>> peter = User.query.filter_by(username='peter').first()
    >>> peter.id
    1
    >>> peter.email
    u'peter@example.org'
   当不存在结果集时返回none:
    >>> missing = User.query.filter_by(username='missing').first()
    >>> missing is None
    True 

flask怎样查询mysql并显示在页面上

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
class Database(object):    
    def __init__(self, configure):        
        self.configure = configure        
        self.engine = create_engine(self.configure.MYSQL_SQLALCHEMY_URL)        
        self.Session = sessionmaker(bind=self.engine)        
        self.session = self.Session()

from sqlalchemy import Column, Integer, String, Sequence
class User(Base):
    """
       user table
    """
    __tablename__ = 'user'
    id = Column(Integer, Sequence("USER_ID_SEQ"), primary_key=True)
    username = Column(String(256), doc=u"用户名")
    password = Column(String(256), doc=u"密码")

    def __repr__(self):
        return "<User(username='%s', password='%s', id='%s')" % (self.username, self.password, self.id)

使用sqlalchemy来连接mysql并且查询其中的user表的方法,你可以试试。

显示页面的话:

from flask import jsonify
@app.route("/")
def main():
    db = Database(config)
    result = db.session.query(User).filter_by(username="aa").first()
    return jsonify({"result":result})

会把结果显示在页面上,是以json方式显示的

flask怎样查询mysql并显示在页面上

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
class Database(object):    
    def __init__(self, configure):        
        self.configure = configure        
        self.engine = create_engine(self.configure.MYSQL_SQLALCHEMY_URL)        
        self.Session = sessionmaker(bind=self.engine)        
        self.session = self.Session()

from sqlalchemy import Column, Integer, String, Sequence
class User(Base):
    """
       user table
    """
    __tablename__ = 'user'
    id = Column(Integer, Sequence("USER_ID_SEQ"), primary_key=True)
    username = Column(String(256), doc=u"用户名")
    password = Column(String(256), doc=u"密码")

    def __repr__(self):
        return "<User(username='%s', password='%s', id='%s')" % (self.username, self.password, self.id)

使用sqlalchemy来连接mysql并且查询其中的user表的方法,你可以试试。

显示页面的话:

from flask import jsonify
@app.route("/")
def main():
    db = Database(config)
    result = db.session.query(User).filter_by(username="aa").first()
    return jsonify({"result":result})

会把结果显示在页面上,是以json方式显示的

在python3下怎样用flask-sqlalchemy对mysql数据库操作

以 Debian/Ubuntu 为例(请确保有管理员权限):

1.MySQL

代码如下:

apt-get install mysql-server

apt-get install mysql-client

apt-get install libmysqlclient15-dev

2.python-mysqldb

代码如下:

apt-get install python-mysqldb

3.easy_install

代码如下:

wget http:// peak.telecommunity.com/dist/ez_setup.py

python ez_setup.py

4.MySQL-Python

代码如下:

easy_install MySQL-Python

5.SQLAlchemy

代码如下:

easy_install SQLAlchemy

6、安装完成后使用下面代码测试连接

代码如下:

from sqlalchemy import create_engine

from sqlalchemy.orm import sessionmaker

DB_CONNECT_STRING = 'mysql+mysqldb://root:123@localhost/ooxx?charset=utf8'

engine = create_engine(DB_CONNECT_STRING, echo=True)

DB_Session = sessionmaker(bind=engine)

session = DB_Session()

7、数据操作(增删改查)

代码如下:

from sqlalchemy import func, or_, not_

user = User(name='a')

session.add(user)

user = User(name='b')

session.add(user)

user = User(name='a')

session.add(user)

user = User()

session.add(user)

session.commit()

query = session.query(User)

print query # 显示SQL 语句

print query.statement # 同上

for user in query: # 遍历时查询

print user.name

print query.all() # 返回的是一个类似列表的对象

print query.first().name # 记录不存在时,first() 会返回 None

# print query.one().name # 不存在,或有多行记录时会抛出异常

print query.filter(User.id == 2).first().name

print query.get(2).name # 以主键获取,等效于上句

print query.filter('id = 2').first().name # 支持字符串

query2 = session.query(User.name)

print query2.all() # 每行是个元组

print query2.limit(1).all() # 最多返回 1 条记录

print query2.offset(1).all() # 从第 2 条记录开始返回

print query2.order_by(User.name).all()

print query2.order_by('name').all()

print query2.order_by(User.name.desc()).all()

print query2.order_by('name desc').all()

print session.query(User.id).order_by(User.name.desc(), User.id).all()

print query2.filter(User.id == 1).scalar() # 如果有记录,返回第一条记录的第一个元素

print session.query('id').select_from(User).filter('id = 1').scalar()

print query2.filter(User.id > 1, User.name != 'a').scalar() # and

query3 = query2.filter(User.id > 1) # 多次拼接的 filter 也是 and

query3 = query3.filter(User.name != 'a')

print query3.scalar()

print query2.filter(or_(User.id == 1, User.id == 2)).all() # or

print query2.filter(User.id.in_((1, 2))).all() # in

query4 = session.query(User.id)

print query4.filter(User.name == None).scalar()

print query4.filter('name is null').scalar()

print query4.filter(not_(User.name == None)).all() # not

print query4.filter(User.name != None).all()

print query4.count()

print session.query(func.count('*')).select_from(User).scalar()

print session.query(func.count('1')).select_from(User).scalar()

print session.query(func.count(User.id)).scalar()

print session.query(func.count('*')).filter(User.id > 0).scalar() # filter() 中包含 User,因此不需要指定表

print session.query(func.count('*')).filter(User.name == 'a').limit(1).scalar() == 1 # 可以用 limit() count() 的返回数

print session.query(func.sum(User.id)).scalar()

print session.query(func.now()).scalar() # func 后可以跟任意函数名,只要该数据库支持

print session.query(func.current_timestamp()).scalar()

print session.query(func.md5(User.name)).filter(User.id == 1).scalar()

query.filter(User.id == 1).update({User.name: 'c'})

user = query.get(1)

print user.name

user.name = 'd'

session.flush() # 写数据库,但并不提交

print query.get(1).name

session.delete(user)

session.flush()

print query.get(1)

session.rollback()

print query.get(1).name

query.filter(User.id == 1).delete()

session.commit()

print query.get(1)

如何在flask-sqlalchemy查询最后一条记录

思路:先过滤出一个班级的所有记录,然后倒序查询这些记录中的第一个记录即为原来的最后一个记录ModelName.query.filter_by(RowName='a').order_by(ModelName.RowName.desc()).first()

显示全文