# some patches for postman
+from typing import Any, cast
from django.db import models
from django.db.models.expressions import RawSQL
from django.db.models.query import QuerySet
MessageManager._folder_old = MessageManager._folder
-def _folder(self, related, filters, option=None, order_by=None, query_dict=None):
- """Base code, in common to the folders."""
- search = None
- if query_dict:
- search = query_dict.get('search')
- qs = self.all() if option == OPTION_MESSAGES else QuerySet(self.model, PostmanQuery(self.model), using=self._db)
- if related:
- qs = qs.select_related(*related)
- if order_by:
- qs = qs.order_by(order_by)
- if isinstance(filters, (list, tuple)):
- lookups = models.Q()
- for filter in filters:
- lookups |= models.Q(**filter)
- else:
- lookups = models.Q(**filters)
-
- if search:
- lookups &= models.Q(subject__icontains=search)\
- | models.Q(body__icontains=search)\
- | models.Q(sender__username__icontains=search)\
- | models.Q(sender__first_name__icontains=search)\
- | models.Q(sender__last_name__icontains=search)\
- | models.Q(recipient__first_name__icontains=search)\
- | models.Q(recipient__last_name__icontains=search)
- if option == OPTION_MESSAGES:
- qs = qs.filter(lookups)
- # Adding a 'count' attribute, to be similar to the by-conversation case,
- # should not be necessary. Otherwise add:
- # .extra(select={'count': 'SELECT 1'})
- else:
- qs = qs.annotate(count=RawSQL('{0}.count'.format(qs.query.pm_alias_prefix), ()))
- qs.query.pm_set_extra(table=(
- self.filter(lookups, thread_id__isnull=True).annotate(count=Value(0, IntegerField()))\
- .values_list('id', 'count').order_by(),
- # use separate annotate() to keep control of the necessary order
- self.filter(lookups, thread_id__isnull=False).values('thread').annotate(id=models.Max('pk')).annotate(count=models.Count('pk'))\
- .values_list('id', 'count').order_by(),
- ))
- if query_dict:
- limit = query_dict.get('limit')
- if limit: # at end because doc says "Further filtering or ordering of a sliced queryset is prohibited..."
- try:
- i = int(limit)
- qs = qs[:i]
- except: # just ignore any bad format
- pass
- return qs
-
-MessageManager._folder = _folder
\ No newline at end of file
+
+
+def _folder(
+ self,
+ related: tuple[str, ...] | None,
+ filters: dict[str, Any] | list[dict[str, Any]] | tuple[dict[str, Any], dict[str, Any]],
+ option: str | None = None,
+ order_by: str | None = None,
+ query_dict: dict[str, Any] | None = None
+ ):
+ """Base code, in common to the folders. Patched to add search"""
+ search = None
+ if query_dict:
+ search = query_dict.get('search')
+ qs = self.all() if option == OPTION_MESSAGES else QuerySet['Message'](self.model, PostmanQuery(self.model), using=self._db)
+ if related:
+ qs = qs.select_related(*related)
+ if order_by:
+ qs = qs.order_by(order_by)
+ if isinstance(filters, (list, tuple)):
+ lookups = models.Q()
+ for filter in filters:
+ lookups |= models.Q(**filter)
+ else:
+ lookups = models.Q(**filters)
+
+ if search:
+ lookups &= models.Q(subject__icontains=search)\
+ | models.Q(body__icontains=search)\
+ | models.Q(sender__username__icontains=search)\
+ | models.Q(sender__first_name__icontains=search)\
+ | models.Q(sender__last_name__icontains=search)\
+ | models.Q(recipient__first_name__icontains=search)\
+ | models.Q(recipient__last_name__icontains=search)
+ if option == OPTION_MESSAGES:
+ qs = qs.filter(lookups)
+ # Adding a 'count' attribute, to be similar to the by-conversation case,
+ # should not be necessary. Otherwise add:
+ # .extra(select={'count': 'SELECT 1'})
+ else:
+ qs = qs.annotate(count=RawSQL('{0}.count'.format(cast(PostmanQuery, qs.query).pm_alias_prefix), ()))
+ query = cast(PostmanQuery, qs.query)
+ alias_id = query.pm_alias_id
+ query.pm_set_extra(table=(
+ self.filter(lookups, thread_id__isnull=True)\
+ # Dj 4.0 imposes its col aliases (as 'colN'), for combinator sql, if not explicitly defined.
+ # can't reuse 'id' because "ValueError: The annotation 'id' conflicts with a field on the model.".
+ .annotate(**{alias_id: models.F('pk')})\
+ .annotate(count=models.Value(0, models.IntegerField()))\
+ .values_list(alias_id, 'count').order_by(),
+ # use separate annotate() to keep control of the necessary order
+ self.filter(lookups, thread_id__isnull=False).values('thread')\
+ .annotate(**{alias_id: models.Max('pk')})\
+ .annotate(count=models.Count('pk'))\
+ .values_list(alias_id, 'count').order_by(),
+ ))
+ if query_dict:
+ limit = query_dict.get('limit')
+ if limit: # at end because doc says "Further filtering or ordering of a sliced queryset is prohibited..."
+ try:
+ i = int(limit)
+ qs = qs[:i]
+ except: # just ignore any bad format
+ pass
+ return qs
+
+MessageManager._folder = _folder