Aggregation with filter (Django 2.0+) by Haki Benita ¶
See also
Prior to Django 2.0 if we wanted to get something like the total number of users and the total number of active users we had to resort to conditional expressions:
from django.contrib.auth.models import User
from django.db.models import (
Count,
Sum,
Case,
When,
Value,
IntegerField,
)
User.objects.aggregate(
total_users=Count('id'),
total_active_users=Sum(Case(
When(is_active=True, then=Value(1)),
default=Value(0),
output_field=IntegerField(),
)),
)
In Django 2.0 a filter argument to aggregate functions was added to make this a lot easier:
from django.contrib.auth.models import User
from django.db.models import Count, F
User.objects.aggregate(
total_users=Count('id'),
total_active_users=Count('id', filter=F('is_active')),
)
Nice, short and sweet.
If you are using PostgreSQL, the two queries will look like this:
SELECT
COUNT(id) AS total_users,
SUM(CASE WHEN is_active THEN 1 ELSE 0 END) AS total_active_users
FROM
auth_users;
SELECT
COUNT(id) AS total_users,
COUNT(id) FILTER (WHERE is_active) AS total_active_users
FROM
auth_users;
The second query uses the FILTER (WHERE …) clause.
Example 1 ¶
"""Gestion des tables mensuels de statistiques."""
import logging
from typing import Dict, Union, TypedDict
import django
# https://docs.djangoproject.com/en/dev/topics/db/sql/
from django.db import connection, models
from django.db.models import Avg, Case, Count, F, IntegerField, Min, Q, Sum, When
from django.db.models import QuerySet
from django.utils.translation import ugettext as _
from django_enumfield import enum
from log_transaction_notariale.models_enums import (
EnumMatchBiographics,
EnumMatchLocal,
EnumMatchRNEC,
EnumStatusTransaction,
)
class InfosTransactionMixin(models.Model):
@property
def percent_nb_transactions_ok(self):
percent = ""
if self.nb_transactions > 0:
p = self.nb_transactions_ok / self.nb_transactions
percent = f"({p:.2%})"
return percent
@property
def percent_nb_transactions_pb(self):
percent = ""
if self.nb_transactions > 0:
p = self.nb_transactions_pb / self.nb_transactions
percent = f"({p:.2%})"
return percent
@property
def percent_nb_match_rnec_disabled(self):
percent = ""
if self.nb_transactions > 0:
p = self.nb_match_rnec_disabled / self.nb_transactions
percent = f"({p:.2%})"
return percent
@property
def percent_nb_global_match_rnec_ok(self):
percent = ""
if self.nb_transactions > 0:
p = self.nb_global_match_rnec_ok / self.nb_transactions
percent = f"({p:.2%})"
return percent
@property
def percent_nb_global_match_rnec_pb(self):
percent = ""
if self.nb_transactions > 0:
p = self.nb_global_match_rnec_pb / self.nb_transactions
percent = f"({p:.2%})"
return percent
@property
def percent_nb_global_match_rnec_inconnu(self):
percent = ""
if self.nb_transactions > 0:
p = self.nb_global_match_rnec_inconnu / self.nb_transactions
percent = f"({p:.2%})"
return percent
@property
def percent_nb_global_match_rnec_incorrect(self):
percent = ""
if self.nb_transactions > 0:
p = self.nb_global_match_rnec_incorrect / self.nb_transactions
percent = f"({p:.2%})"
return percent
@property
def percent_nb_match_local_ok(self):
percent = ""
if self.nb_transactions > 0:
p = self.nb_match_local_ok / self.nb_transactions
percent = f"({p:.2%})"
return percent
@property
def percent_nb_match_local_pb(self):
percent = ""
if self.nb_transactions > 0:
p = self.nb_match_local_pb / self.nb_transactions
percent = f"({p:.2%})"
return percent
@property
def percent_nb_match_local_inconnu(self):
percent = ""
if self.nb_transactions > 0:
p = self.nb_match_local_inconnu / self.nb_transactions
percent = f"({p:.2%})"
return percent
@staticmethod
def get_cumul_compteurs(transactions: QuerySet) -> Union[Dict[str, int], None]:
"""Cette fonction a l'air très lente.
Il faut la réécrire par une simple boucle sur les transactions ?
Emploi des expressions conditionnelles le lundi 20 janvier 2020
- https://docs.djangoproject.com/en/dev/ref/models/conditional-expressions/
Exemple
=======
In [42]: cumul_compteurs
Out[42]:
{'total_transactions': 543826,
'nb_transactions_ok': 540612,
'nb_transactions_pb': 3214,
'nb_transactions_inconnu': 0,
'nb_match_local_ok': 501,
'nb_match_local_pb': 129487,
'nb_match_local_inconnu': 413838,
'nb_global_match_rnec_ok': 393649,
'nb_global_match_rnec_pb': 132174,
'nb_global_match_rnec_inconnu': 14789,
'nb_global_match_rnec_incorrect': 3214,
'nb_match_rnec1_ok': 318902,
'nb_match_rnec1_pb': 206844,
'nb_match_rnec1_inconnu': 14866,
'nb_match_rnec1_incorrect': 3214,
'nb_match_rnec2_ok': 321571,
'nb_match_rnec2_pb': 204185,
'nb_match_rnec2_inconnu': 14856,
'nb_match_rnec2_incorrect': 3214,
'nb_match_rnec_disabled': 14789}
Appels
=======
find . -name "*.py" | xargs grep -A 2 "InfosTransactionMixin.get_cumul_compteurs"
- stats/models/models_agence.py: cumul_compteurs = InfosTransactionMixin.get_cumul_compteurs(transactions_agence)
- stats/models/models_day.py: cumul_compteurs = InfosTransactionMixin.get_cumul_compteurs(transactions_day)
- stats/models/models_month.py: cumul_compteurs = InfosTransactionMixin.get_cumul_compteurs(
- stats/models/models_month_agence.py: cumul_compteurs = InfosTransactionMixin.get_cumul_compteurs(
- stats/models/models_day_agence.py: cumul_compteurs = InfosTransactionMixin.get_cumul_compteurs(
"""
if not transactions.exists():
return None
# https://docs.djangoproject.com/en/dev/ref/models/conditional-expressions/#case
dict_compteurs = transactions.aggregate(
total_transactions=Count("id"),
nb_transactions_ok=Count(
"id", filter=Q(status_transaction=EnumStatusTransaction.OK_1)
),
nb_transactions_pb=Count(
"id", filter=Q(status_transaction=EnumStatusTransaction.PB_0)
),
nb_transactions_inconnu=Count(
"id", filter=Q(status_transaction=EnumStatusTransaction.INCONNU_MOINS_1)
),
nb_match_local_ok=Count("id", filter=Q(match_local=EnumMatchLocal.OK_1)),
nb_match_local_pb=Count("id", filter=Q(match_local=EnumMatchLocal.PB_0)),
nb_match_local_inconnu=Count(
"id", filter=Q(match_local=EnumMatchLocal.INCONNU_MOINS_1)
),
nb_global_match_rnec_ok=Count(
"id", filter=Q(global_match_rnec=EnumMatchRNEC.OK_1)
),
nb_global_match_rnec_pb=Count(
"id", filter=Q(global_match_rnec=EnumMatchRNEC.PB_0)
),
nb_global_match_rnec_inconnu=Count(
"id", filter=Q(global_match_rnec=EnumMatchRNEC.INCONNU_MOINS_1)
),
nb_global_match_rnec_incorrect=Count(
"id", filter=Q(global_match_rnec=EnumMatchRNEC.REPONSE_NA_RNEC_MOINS_2)
),
nb_match_rnec1_ok=Count("id", filter=Q(match_rnec1=EnumMatchRNEC.OK_1)),
nb_match_rnec1_pb=Count("id", filter=Q(match_rnec1=EnumMatchRNEC.PB_0)),
nb_match_rnec1_inconnu=Count(
"id", filter=Q(match_rnec1=EnumMatchRNEC.INCONNU_MOINS_1)
),
nb_match_rnec1_incorrect=Count(
"id", filter=Q(match_rnec1=EnumMatchRNEC.REPONSE_NA_RNEC_MOINS_2)
),
nb_match_rnec2_ok=Count("id", filter=Q(match_rnec2=EnumMatchRNEC.OK_1)),
nb_match_rnec2_pb=Count("id", filter=Q(match_rnec2=EnumMatchRNEC.PB_0)),
nb_match_rnec2_inconnu=Count(
"id", filter=Q(match_rnec2=EnumMatchRNEC.INCONNU_MOINS_1)
),
nb_match_rnec2_incorrect=Count(
"id", filter=Q(match_rnec2=EnumMatchRNEC.REPONSE_NA_RNEC_MOINS_2)
),
nb_match_rnec_disabled=Count("id", filter=Q(match_rnec_enabled=False)),
)
# logger.info(f"get_cumul_compteurs() {dict_compteurs=}")
return dict_compteurs
Example 2 ¶
class InfosDocsMixin(models.Model):
"""Informations communes aux tables de statistiques documents.
"""
nb_documents = models.PositiveIntegerField(
help_text=_("Le nombre total de documents")
)
nb_documents_type_doc = models.PositiveIntegerField(
default=0, help_text=_("Le nombre total de documents de type 'Document'")
)
nb_documents_type_authorization = models.PositiveIntegerField(
default=0, help_text=_("Le nombre total de documents de type 'Authorization'")
)
nb_documents_waiting = models.PositiveIntegerField(
default=0, help_text=_("Le nombre total de documents waiting'")
)
nb_documents_uploaded = models.PositiveIntegerField(
default=0, help_text=_("Le nombre total de documents uploaded")
)
# filter=Q(status_doc=EnumStatusDocument.UPLOADED_2)&Q(type_doc=EnumTypeDocument.DOCUMENT_1)
nb_docs_uploaded = models.PositiveIntegerField(
default=0,
help_text=_("Le nombre total de documents de type 'Document' uploaded"),
)
# Q(status_doc=EnumStatusDocument.WAITING_1)&Q(type_doc=EnumTypeDocument.DOCUMENT_1)
nb_docs_waiting = models.PositiveIntegerField(
default=0,
help_text=_("Le nombre total de documents de type 'Document' waiting"),
)
# Q(status_doc=EnumStatusDocument.UPLOADED_2)&Q(type_doc=EnumTypeDocument.AUTHORIZATION_2)
nb_auths_uploaded = models.PositiveIntegerField(
default=0,
help_text=_("Le nombre total de documents de type 'Authorization' uploaded"),
)
# Q(status_doc=EnumStatusDocument.WAITING_1)&Q(type_doc=EnumTypeDocument.AUTHORIZATION_2
nb_auths_waiting = models.PositiveIntegerField(
default=0,
help_text=_("Le nombre total de documents de type 'Authorization' waiting"),
)
class Meta:
# https://docs.djangoproject.com/en/dev/ref/models/options/#abstract
abstract = True
def __str__(self):
chaine = (
f"{self.nb_documents=}\n"
f"{self.nb_documents_type_doc=} "
f"{self.nb_documents_type_authorization=} "
f"{self.nb_documents_waiting=} "
f"{self.nb_documents_uploaded=}\n"
f"{self.nb_docs_uploaded=} "
f"{self.nb_docs_waiting=} "
f"{self.nb_auths_uploaded=} "
f"{self.nb_auths_waiting=} "
)
return chaine
@property
def percent_nb_documents_type_doc(self):
percent = ""
if self.nb_documents > 0:
p = self.nb_documents_type_doc / self.nb_documents
percent = f"({p:.2%})"
return percent
@property
def percent_nb_documents_type_authorization(self):
percent = ""
if self.nb_documents > 0:
p = self.nb_documents_type_authorization / self.nb_documents
percent = f"({p:.2%})"
return percent
@property
def percent_nb_documents_waiting(self):
percent = ""
if self.nb_documents > 0:
p = self.nb_documents_waiting / self.nb_documents
percent = f"({p:.2%})"
return percent
@property
def percent_nb_documents_uploaded(self):
percent = ""
if self.nb_documents > 0:
p = self.nb_documents_uploaded / self.nb_documents
percent = f"({p:.2%})"
return percent
@property
def percent_nb_docs_uploaded(self):
percent = ""
if self.nb_documents > 0:
p = self.nb_docs_uploaded / self.nb_documents
percent = f"({p:.2%})"
return percent
@property
def percent_nb_docs_waiting(self):
percent = ""
if self.nb_documents > 0:
p = self.nb_docs_waiting / self.nb_documents
percent = f"({p:.2%})"
return percent
@property
def percent_nb_auths_uploaded(self):
percent = ""
if self.nb_documents > 0:
p = self.nb_auths_uploaded / self.nb_documents
percent = f"({p:.2%})"
return percent
@property
def percent_nb_auths_waiting(self):
percent = ""
if self.nb_documents > 0:
p = self.nb_auths_waiting / self.nb_documents
percent = f"({p:.2%})"
return percent
@staticmethod
def get_cumul_compteurs_documents(
documents: QuerySet,
) -> Union[Dict[str, int], None]:
"""
In [48]: dict_compteurs
Out[48]:
{'nb_documents': 4,
'nb_documents_type_doc': 2,
'nb_documents_type_authorization': 2,
'nb_documents_waiting': 2,
'nb_documents_uploaded': 2,
'nb_docs_uploaded': 1,
'nb_docs_waiting': 1,
'nb_auths_uploaded': 1,
'nb_auths_waiting': 1}
"""
if not documents.exists():
return None
# https://docs.djangoproject.com/en/dev/ref/models/conditional-expressions/#case
dict_compteurs = documents.aggregate(
nb_documents=Count("id"),
nb_documents_type_doc=Count(
"id", filter=Q(type_doc=EnumTypeDocument.DOCUMENT_1)
),
nb_documents_type_authorization=Count(
"id", filter=Q(type_doc=EnumTypeDocument.AUTHORIZATION_2)
),
nb_documents_waiting=Count(
"id", filter=Q(status_doc=EnumStatusDocument.WAITING_1)
),
nb_documents_uploaded=Count(
"id", filter=Q(status_doc=EnumStatusDocument.UPLOADED_2)
),
nb_docs_uploaded=Count(
"id",
filter=Q(status_doc=EnumStatusDocument.UPLOADED_2)
& Q(type_doc=EnumTypeDocument.DOCUMENT_1),
),
nb_docs_waiting=Count(
"id",
filter=Q(status_doc=EnumStatusDocument.WAITING_1)
& Q(type_doc=EnumTypeDocument.DOCUMENT_1),
),
nb_auths_uploaded=Count(
"id",
filter=Q(status_doc=EnumStatusDocument.UPLOADED_2)
& Q(type_doc=EnumTypeDocument.AUTHORIZATION_2),
),
nb_auths_waiting=Count(
"id",
filter=Q(status_doc=EnumStatusDocument.WAITING_1)
& Q(type_doc=EnumTypeDocument.AUTHORIZATION_2),
),
)
logger.info(f"InfosDocsMixin.get_cumul_compteurs_documents() {dict_compteurs=}")
return dict_compteurs