Aggregation with filter (Django 2.0+) by Haki Benita

Prior to Django 2.0 if we wanted to get something like the total number of users and the total number of active users we had to resort to conditional expressions:

from django.contrib.auth.models import User
from django.db.models import (
    Count,
    Sum,
    Case,
    When,
    Value,
    IntegerField,
)

User.objects.aggregate(
    total_users=Count('id'),
    total_active_users=Sum(Case(
        When(is_active=True, then=Value(1)),
        default=Value(0),
        output_field=IntegerField(),
    )),
)

In Django 2.0 a filter argument to aggregate functions was added to make this a lot easier:

from django.contrib.auth.models import User
from django.db.models import Count, F

User.objects.aggregate(
    total_users=Count('id'),
    total_active_users=Count('id', filter=F('is_active')),
)

Nice, short and sweet.

If you are using PostgreSQL, the two queries will look like this:

SELECT
    COUNT(id) AS total_users,
    SUM(CASE WHEN is_active THEN 1 ELSE 0 END) AS total_active_users
FROM
    auth_users;

SELECT
    COUNT(id) AS total_users,
    COUNT(id) FILTER (WHERE is_active) AS total_active_users
FROM
    auth_users;

The second query uses the FILTER (WHERE …) clause.

Example 1

"""Gestion des tables mensuels de statistiques."""
import logging
from typing import Dict, Union, TypedDict

import django

# https://docs.djangoproject.com/en/dev/topics/db/sql/
from django.db import connection, models
from django.db.models import Avg, Case, Count, F, IntegerField, Min, Q, Sum, When
from django.db.models import QuerySet
from django.utils.translation import ugettext as _
from django_enumfield import enum
from log_transaction_notariale.models_enums import (
    EnumMatchBiographics,
    EnumMatchLocal,
    EnumMatchRNEC,
    EnumStatusTransaction,
)


class InfosTransactionMixin(models.Model):
    @property
    def percent_nb_transactions_ok(self):
        percent = ""
        if self.nb_transactions > 0:
            p = self.nb_transactions_ok / self.nb_transactions
            percent = f"({p:.2%})"
        return percent

    @property
    def percent_nb_transactions_pb(self):
        percent = ""
        if self.nb_transactions > 0:
            p = self.nb_transactions_pb / self.nb_transactions
            percent = f"({p:.2%})"
        return percent

    @property
    def percent_nb_match_rnec_disabled(self):
        percent = ""
        if self.nb_transactions > 0:
            p = self.nb_match_rnec_disabled / self.nb_transactions
            percent = f"({p:.2%})"
        return percent

    @property
    def percent_nb_global_match_rnec_ok(self):
        percent = ""
        if self.nb_transactions > 0:
            p = self.nb_global_match_rnec_ok / self.nb_transactions
            percent = f"({p:.2%})"
        return percent

    @property
    def percent_nb_global_match_rnec_pb(self):
        percent = ""
        if self.nb_transactions > 0:
            p = self.nb_global_match_rnec_pb / self.nb_transactions
            percent = f"({p:.2%})"
        return percent

    @property
    def percent_nb_global_match_rnec_inconnu(self):
        percent = ""
        if self.nb_transactions > 0:
            p = self.nb_global_match_rnec_inconnu / self.nb_transactions
            percent = f"({p:.2%})"
        return percent

    @property
    def percent_nb_global_match_rnec_incorrect(self):
        percent = ""
        if self.nb_transactions > 0:
            p = self.nb_global_match_rnec_incorrect / self.nb_transactions
            percent = f"({p:.2%})"
        return percent

    @property
    def percent_nb_match_local_ok(self):
        percent = ""
        if self.nb_transactions > 0:
            p = self.nb_match_local_ok / self.nb_transactions
            percent = f"({p:.2%})"
        return percent

    @property
    def percent_nb_match_local_pb(self):
        percent = ""
        if self.nb_transactions > 0:
            p = self.nb_match_local_pb / self.nb_transactions
            percent = f"({p:.2%})"
        return percent

    @property
    def percent_nb_match_local_inconnu(self):
        percent = ""
        if self.nb_transactions > 0:
            p = self.nb_match_local_inconnu / self.nb_transactions
            percent = f"({p:.2%})"
        return percent


    @staticmethod
    def get_cumul_compteurs(transactions: QuerySet) -> Union[Dict[str, int], None]:
        """Cette fonction a l'air très lente.

        Il faut la réécrire par une simple boucle sur les transactions ?

        Emploi des expressions conditionnelles le lundi 20 janvier 2020

        - https://docs.djangoproject.com/en/dev/ref/models/conditional-expressions/

        Exemple
        =======

        In [42]: cumul_compteurs
        Out[42]:
        {'total_transactions': 543826,
         'nb_transactions_ok': 540612,
         'nb_transactions_pb': 3214,
         'nb_transactions_inconnu': 0,
         'nb_match_local_ok': 501,
         'nb_match_local_pb': 129487,
         'nb_match_local_inconnu': 413838,
         'nb_global_match_rnec_ok': 393649,
         'nb_global_match_rnec_pb': 132174,
         'nb_global_match_rnec_inconnu': 14789,
         'nb_global_match_rnec_incorrect': 3214,
         'nb_match_rnec1_ok': 318902,
         'nb_match_rnec1_pb': 206844,
         'nb_match_rnec1_inconnu': 14866,
         'nb_match_rnec1_incorrect': 3214,
         'nb_match_rnec2_ok': 321571,
         'nb_match_rnec2_pb': 204185,
         'nb_match_rnec2_inconnu': 14856,
         'nb_match_rnec2_incorrect': 3214,
         'nb_match_rnec_disabled': 14789}


        Appels
        =======

        find . -name "*.py"  | xargs grep -A 2 "InfosTransactionMixin.get_cumul_compteurs"

        - stats/models/models_agence.py:  cumul_compteurs = InfosTransactionMixin.get_cumul_compteurs(transactions_agence)
        - stats/models/models_day.py:     cumul_compteurs = InfosTransactionMixin.get_cumul_compteurs(transactions_day)
        - stats/models/models_month.py:   cumul_compteurs = InfosTransactionMixin.get_cumul_compteurs(
        - stats/models/models_month_agence.py:  cumul_compteurs = InfosTransactionMixin.get_cumul_compteurs(
        - stats/models/models_day_agence.py:    cumul_compteurs = InfosTransactionMixin.get_cumul_compteurs(


        """
        if not transactions.exists():
            return None

        # https://docs.djangoproject.com/en/dev/ref/models/conditional-expressions/#case
        dict_compteurs = transactions.aggregate(
            total_transactions=Count("id"),
            nb_transactions_ok=Count(
                "id", filter=Q(status_transaction=EnumStatusTransaction.OK_1)
            ),
            nb_transactions_pb=Count(
                "id", filter=Q(status_transaction=EnumStatusTransaction.PB_0)
            ),
            nb_transactions_inconnu=Count(
                "id", filter=Q(status_transaction=EnumStatusTransaction.INCONNU_MOINS_1)
            ),
            nb_match_local_ok=Count("id", filter=Q(match_local=EnumMatchLocal.OK_1)),
            nb_match_local_pb=Count("id", filter=Q(match_local=EnumMatchLocal.PB_0)),
            nb_match_local_inconnu=Count(
                "id", filter=Q(match_local=EnumMatchLocal.INCONNU_MOINS_1)
            ),
            nb_global_match_rnec_ok=Count(
                "id", filter=Q(global_match_rnec=EnumMatchRNEC.OK_1)
            ),
            nb_global_match_rnec_pb=Count(
                "id", filter=Q(global_match_rnec=EnumMatchRNEC.PB_0)
            ),
            nb_global_match_rnec_inconnu=Count(
                "id", filter=Q(global_match_rnec=EnumMatchRNEC.INCONNU_MOINS_1)
            ),
            nb_global_match_rnec_incorrect=Count(
                "id", filter=Q(global_match_rnec=EnumMatchRNEC.REPONSE_NA_RNEC_MOINS_2)
            ),
            nb_match_rnec1_ok=Count("id", filter=Q(match_rnec1=EnumMatchRNEC.OK_1)),
            nb_match_rnec1_pb=Count("id", filter=Q(match_rnec1=EnumMatchRNEC.PB_0)),
            nb_match_rnec1_inconnu=Count(
                "id", filter=Q(match_rnec1=EnumMatchRNEC.INCONNU_MOINS_1)
            ),
            nb_match_rnec1_incorrect=Count(
                "id", filter=Q(match_rnec1=EnumMatchRNEC.REPONSE_NA_RNEC_MOINS_2)
            ),
            nb_match_rnec2_ok=Count("id", filter=Q(match_rnec2=EnumMatchRNEC.OK_1)),
            nb_match_rnec2_pb=Count("id", filter=Q(match_rnec2=EnumMatchRNEC.PB_0)),
            nb_match_rnec2_inconnu=Count(
                "id", filter=Q(match_rnec2=EnumMatchRNEC.INCONNU_MOINS_1)
            ),
            nb_match_rnec2_incorrect=Count(
                "id", filter=Q(match_rnec2=EnumMatchRNEC.REPONSE_NA_RNEC_MOINS_2)
            ),
            nb_match_rnec_disabled=Count("id", filter=Q(match_rnec_enabled=False)),
        )

        # logger.info(f"get_cumul_compteurs() {dict_compteurs=}")
        return dict_compteurs

Example 2

class InfosDocsMixin(models.Model):
    """Informations communes aux tables de statistiques documents.

    """

    nb_documents = models.PositiveIntegerField(
        help_text=_("Le nombre total de documents")
    )
    nb_documents_type_doc = models.PositiveIntegerField(
        default=0, help_text=_("Le nombre total de documents de type 'Document'")
    )
    nb_documents_type_authorization = models.PositiveIntegerField(
        default=0, help_text=_("Le nombre total de documents de type 'Authorization'")
    )
    nb_documents_waiting = models.PositiveIntegerField(
        default=0, help_text=_("Le nombre total de documents waiting'")
    )
    nb_documents_uploaded = models.PositiveIntegerField(
        default=0, help_text=_("Le nombre total de documents uploaded")
    )
    # filter=Q(status_doc=EnumStatusDocument.UPLOADED_2)&Q(type_doc=EnumTypeDocument.DOCUMENT_1)
    nb_docs_uploaded = models.PositiveIntegerField(
        default=0,
        help_text=_("Le nombre total de documents de type 'Document' uploaded"),
    )
    # Q(status_doc=EnumStatusDocument.WAITING_1)&Q(type_doc=EnumTypeDocument.DOCUMENT_1)
    nb_docs_waiting = models.PositiveIntegerField(
        default=0,
        help_text=_("Le nombre total de documents de type 'Document' waiting"),
    )
    # Q(status_doc=EnumStatusDocument.UPLOADED_2)&Q(type_doc=EnumTypeDocument.AUTHORIZATION_2)
    nb_auths_uploaded = models.PositiveIntegerField(
        default=0,
        help_text=_("Le nombre total de documents de type 'Authorization' uploaded"),
    )
    # Q(status_doc=EnumStatusDocument.WAITING_1)&Q(type_doc=EnumTypeDocument.AUTHORIZATION_2
    nb_auths_waiting = models.PositiveIntegerField(
        default=0,
        help_text=_("Le nombre total de documents de type 'Authorization' waiting"),
    )

    class Meta:
        # https://docs.djangoproject.com/en/dev/ref/models/options/#abstract
        abstract = True

    def __str__(self):
        chaine = (
            f"{self.nb_documents=}\n"
            f"{self.nb_documents_type_doc=} "
            f"{self.nb_documents_type_authorization=} "
            f"{self.nb_documents_waiting=} "
            f"{self.nb_documents_uploaded=}\n"
            f"{self.nb_docs_uploaded=} "
            f"{self.nb_docs_waiting=} "
            f"{self.nb_auths_uploaded=} "
            f"{self.nb_auths_waiting=} "
        )
        return chaine

    @property
    def percent_nb_documents_type_doc(self):
        percent = ""
        if self.nb_documents > 0:
            p = self.nb_documents_type_doc / self.nb_documents
            percent = f"({p:.2%})"
        return percent

    @property
    def percent_nb_documents_type_authorization(self):
        percent = ""
        if self.nb_documents > 0:
            p = self.nb_documents_type_authorization / self.nb_documents
            percent = f"({p:.2%})"
        return percent

    @property
    def percent_nb_documents_waiting(self):
        percent = ""
        if self.nb_documents > 0:
            p = self.nb_documents_waiting / self.nb_documents
            percent = f"({p:.2%})"
        return percent

    @property
    def percent_nb_documents_uploaded(self):
        percent = ""
        if self.nb_documents > 0:
            p = self.nb_documents_uploaded / self.nb_documents
            percent = f"({p:.2%})"
        return percent

    @property
    def percent_nb_docs_uploaded(self):
        percent = ""
        if self.nb_documents > 0:
            p = self.nb_docs_uploaded / self.nb_documents
            percent = f"({p:.2%})"
        return percent

    @property
    def percent_nb_docs_waiting(self):
        percent = ""
        if self.nb_documents > 0:
            p = self.nb_docs_waiting / self.nb_documents
            percent = f"({p:.2%})"
        return percent

    @property
    def percent_nb_auths_uploaded(self):
        percent = ""
        if self.nb_documents > 0:
            p = self.nb_auths_uploaded / self.nb_documents
            percent = f"({p:.2%})"
        return percent

    @property
    def percent_nb_auths_waiting(self):
        percent = ""
        if self.nb_documents > 0:
            p = self.nb_auths_waiting / self.nb_documents
            percent = f"({p:.2%})"
        return percent



    @staticmethod
    def get_cumul_compteurs_documents(
        documents: QuerySet,
    ) -> Union[Dict[str, int], None]:
        """

        In [48]: dict_compteurs
        Out[48]:
        {'nb_documents': 4,
         'nb_documents_type_doc': 2,
         'nb_documents_type_authorization': 2,
         'nb_documents_waiting': 2,
         'nb_documents_uploaded': 2,
         'nb_docs_uploaded': 1,
         'nb_docs_waiting': 1,
         'nb_auths_uploaded': 1,
         'nb_auths_waiting': 1}

        """
        if not documents.exists():
            return None

        # https://docs.djangoproject.com/en/dev/ref/models/conditional-expressions/#case
        dict_compteurs = documents.aggregate(
            nb_documents=Count("id"),
            nb_documents_type_doc=Count(
                "id", filter=Q(type_doc=EnumTypeDocument.DOCUMENT_1)
            ),
            nb_documents_type_authorization=Count(
                "id", filter=Q(type_doc=EnumTypeDocument.AUTHORIZATION_2)
            ),
            nb_documents_waiting=Count(
                "id", filter=Q(status_doc=EnumStatusDocument.WAITING_1)
            ),
            nb_documents_uploaded=Count(
                "id", filter=Q(status_doc=EnumStatusDocument.UPLOADED_2)
            ),
            nb_docs_uploaded=Count(
                "id",
                filter=Q(status_doc=EnumStatusDocument.UPLOADED_2)
                & Q(type_doc=EnumTypeDocument.DOCUMENT_1),
            ),
            nb_docs_waiting=Count(
                "id",
                filter=Q(status_doc=EnumStatusDocument.WAITING_1)
                & Q(type_doc=EnumTypeDocument.DOCUMENT_1),
            ),
            nb_auths_uploaded=Count(
                "id",
                filter=Q(status_doc=EnumStatusDocument.UPLOADED_2)
                & Q(type_doc=EnumTypeDocument.AUTHORIZATION_2),
            ),
            nb_auths_waiting=Count(
                "id",
                filter=Q(status_doc=EnumStatusDocument.WAITING_1)
                & Q(type_doc=EnumTypeDocument.AUTHORIZATION_2),
            ),
        )

        logger.info(f"InfosDocsMixin.get_cumul_compteurs_documents() {dict_compteurs=}")
        return dict_compteurs