Django ORM查询在postgresql中作为查询执行时失败

62o28rlo  于 2022-12-18  发布在  PostgreSQL
关注(0)|答案(4)|浏览(171)

我有一个类似的模型

class model(models.Model):
    order_created_time = models.DateTimeField(blank=False, null=False)

我有一个比较日期时间的django查询-

filters = {'order_created_on__gte':'2018-10-10'}
queryset = model.objects.filter(**filters)
query = str(queryset.query)

它将创建一个查询-选择...其中订单创建日期〉= 2018-10-10 00:00:00
当在db上触发时,它给出一个错误-在“00”处或附近的语法错误。
如果我在数据库中手动启动相同的查询,将日期替换为“2018-10-10”,它将工作。
现在我实际上尝试了以下方法,但是所有查询在db查询中给予相同的文本

filters = {'order_created_on__gte':datetime(2018-10-10)}
filters = {'order_created_on__year__gte':2018, 'order_created_on__month__gte':10, 'order_created_on__day__gte':10}


我也试着把它做成这样的字符串-

filters['order_created_on__gte'] = "'{0}'".format(filters['order_created_on__gte'])

其声明为无效格式,预期为2018-10-10 00:00[:00][TZ]
还使用了范围过滤器,所有上述插入此文本在最终查询-

where order_created_on >= 2018-10-10 00:00:00

更新时区也没有任何效果,而只是从查询中删除了+5:30。

t3psigkw

t3psigkw1#

Django实际上并不使用str(queryset.query)的结果来查询数据库,原因很简单:
数据库接受参数化查询,所以Django从来不把参数放在查询中,它把查询字符串和参数列表分别发送到数据库,数据库会决定如何处理。

print(SomeModel.objects.filter(foo=1).query.sql_with_params())

('SELECT "app_model"."id", "app_model"."some_column", "app_model"."foo" FROM "app_model" WHERE "app_model"."foo" = %s',
 (1,))

为了简单起见,它所做的只是在查询中调用__str__()时进行普通字符串替换,而不是在内部使用,因此纯粹是为了方便用户。

query, params = SomeModel.objects.filter(foo=1).query.sql_with_params()
print(query % params)

SELECT "app_model"."id", "app_model"."some_column", "app_model"."foo" FROM "app_model" WHERE "app_model"."foo" = 1

当然,当一个日期被插入到这样的格式字符串中时,它周围没有引号,当你试图复制粘贴查询并在数据库中使用它时,这会导致错误。他们没有费心去实现这一点,因为类型太多,好处太少。
虽然您可以创建自己的函数来自动引用参数,甚至monkeypatch queryset类,以便__str__()方法返回您需要的内容,但我认为如果您经常这样做,django-debug-toolbar是最好的方法。

8hhllhi2

8hhllhi22#

最后在尝试了所有的过滤器后,通过用正则表达式替换查询文本来解决它。

jdg4fx2g

jdg4fx2g3#

厌倦了使用正则表达式来替换查询,使用了这种方法。只需将过滤器与选项或“ORM”或“RAW”放在一起。如果是原始的,则它将构造“where”查询。您也可以在其中添加其他过滤器。

# For ORM
    GET_SETTLEMENT_PARAM_QUERY_FORMATTING = {
    "settlement_no": {"query_text": "settlement_no__in", "list": True},
    "start_date": {"query_text": "start_date__gte", "list": False},
    "end_date": {"query_text": "end_date__lte", "list": False}
    }

    # For RAW query
    BAGSETTLEMENT_DOWNLOAD_PARAM_QUERY_FORMATTING_RAW = {
    "start_date": {"query_text": "created_on >= '{}'", "list": True, "ignore_format": True},
    "end_date": {"query_text": "created_on <= '{}'", "list": True, "ignore_format": True},
    "store_id": {"query_text": "store_id IN {}", "list": True},
    "brand_id": {"query_text": "brand_id IN {}", "list": True},
    "company_id": {"query_text": "company_id = {}", "list": False},
    "bag_id": {"query_text": "bag_id = {}", "list": False},
    "awb_number": {"query_text": "awb_number = {}", "list": False},
    "settlement_no": {"query_text": "settlement_no IN {}", "list": True},
    "settled": {"query_text": "settled = {}", "list": False}

}

    @query_params(formatting=GET_BAGSETTLEMENT_PARAM_QUERY_FORMATTING, query_type='ORM')
        def get(self, request):

            filters = parse_url_params(params=self.request.query_params, formatting=BAGSETTLEMENT_DOWNLOAD_PARAM_QUERY_FORMATTING_RAW, query_type='RAW')

            try:
                link = get_download_link(filters=filters,
                                             store_ids=request.store_ids,
                                             end_file_name=settlement_no)
                return Response({"link": link})
            except Exception as e:
                logger.critical('Bag Settlement Download Link Generation Failed')
                return Response({'link': None, 'error': 'Error while fetching data'})

def query_params(formatting, query_type):
    def assign_query_params(f):
        @wraps(f)
        def decorated_function(req, *args, **kwargs):
            try:
                request = req.request
            except AttributeError:
                request = req

            # print(request.store_ids)
            data = dict(request.GET)
            if data.get('store_id'):
                data['store_id'] = list(set(data.get('store_id')).intersection(set(request.store_ids)))
            else:
                data['store_id'] = request.store_ids

            request.filters = parse_url_params(params=data,
                                               formatting=formatting,
                                               query_type=query_type)
            return f(req, *args, **kwargs)

        return decorated_function

    return assign_query_params

def parse_url_params(params, formatting, query_type):
    """
    :param params: dictionary
    :param formatting: Formatting Dictionary
    :param query_type: ORM else RAW
    :return: filters
    """
    # print(params)
    filters = dict() if query_type == "ORM" else ''

    for key, value in formatting.items():
        param_value = params.get(key)
        if not param_value:
            continue
        query_text = value['query_text']

        if query_type == "ORM":
            query_value = param_value[0] if not value['list'] else param_value

            filters[query_text] = query_value

        else:
            if not value['list']:
                query_value = param_value[0]
            else:

                if value.get('ignore_format'):
                    query_value = param_value
                else:
                    z = str(param_value)
                    query_value = z.replace(z[0], '(').replace(z[len(z)-1], ')')
            syntax = query_text.format(query_value)
            filters += syntax if not len(filters) else ' AND '+syntax

    return filters

通过这个,你可以得到ORM过滤器和RAW查询过滤器。我所面临的问题是日期时间没有被作为字符串插入,所以这里我只是做了一个字符串的所有where条件。
您需要手动输入查询并将其存储在数据库中或存储在如下常量中-

select * from table where {filters};

您还可以在其中添加order by,但需要显式地将其传递到查询中,而不是在过滤器中添加,因此它是这样的-

select * from table where {filters} order by {order_by}


如果您需要添加其他显式过滤器,您可以这样做

select * from table where {filters} and store_id in {store_id};
1cklez4t

1cklez4t4#

您可以使用以下公式生成查询:-

import datetime

sql, params = queryset.query.sql_with_params()

params_with_quotes = []
for x in params:
    if isinstance(x, (str, list, datetime.date, datetime.datetime)):
        x = "'%s'" % str(x).replace("'", "\"").replace("[", "{").replace("]", "}")
    params_with_quotes.append(x)

query = sql % tuple(params_with_quotes)
print(query)

相关问题