En la publicación anterior, aprendimos qué sucede cuando se llama a una función decorada con transaction.atomic
y qué sucede with transaction.atomic()
. En resumen:
BEGIN;
se envía a la base de datos (si la base de datos es PSQL u otra variante de SQL).
Ahora, analizaremos qué incluir en una transacción y qué evitar. Debido a los siguientes comportamientos de transacción, ciertas operaciones son peligrosas cuando se colocan en un bloque de transacción.
Continúe leyendo para conocer alternativas y ejemplos de código.
El riesgo principal es que las transacciones mantengan bloqueos hasta que finalicen para evitar operaciones conflictivas en tablas y filas y permitir que la transacción sea reversible; esto es esencial para que las operaciones de la base de datos en la transacción sean atómicas. Esto significa que una transacción de larga duración que opera en varias tablas o en unas pocas tablas críticas puede causar interrupciones al acaparar bloqueos e impedir lecturas/escrituras en esas tablas/filas.
En esencia, si colocamos el código incorrecto en un bloque de transacción, podemos efectivamente inutilizar la base de datos al bloquear todas las demás conexiones a la base de datos para que no puedan realizar operaciones en ella.
El riesgo secundario es que las transacciones deben ser reversibles y se ESPERA que lo sean. La base de datos revierte automáticamente cada operación si ocurre un error en la transacción. Por lo tanto, las operaciones de la base de datos que ponemos en la transacción deben ser reversibles; en su mayor parte, no necesitamos preocuparnos por esto con PSQL. Pero ¿qué sucede con otros códigos?
A menudo, cuando cambiamos nuestros datos, necesitamos realizar tareas de seguimiento como activar eventos, actualizar servicios, enviar notificaciones push, etc. Estas tareas NO son reversibles: no podemos anular el envío de un evento, una solicitud o una notificación. Si se produce un error, los cambios en los datos se revierten, pero ya hemos enviado la notificación push que dice: "Se ha generado su informe; haga clic aquí para verlo". ¿Qué sucede cuando el usuario u otros servicios actúan sobre esta información falsa? Habrá una cascada de errores. Por lo tanto, cualquier código que no se pueda revertir no debería estar en una transacción, o corremos el riesgo de dejar nuestro sistema en un mal estado cuando se produce un error en la transacción.
Se trata de cosas que, según la cantidad de datos que se estén procesando y el tráfico de la base de datos, pueden provocar interrupciones debido a la retención de bloqueos durante demasiado tiempo. Todas estas cosas están bien si no tardan demasiado.
Consultas lentas : la obtención de datos suele ser rápida, excepto en estos tres casos. Estas consultas ralentizarán la transacción y prolongarán el tiempo de bloqueo, lo que tendrá efectos adversos para otros usuarios.
@transaction.atomic def process_large_order_report(start_date, end_date, min_order_value=1000): # Complex query with multiple joins and aggregations large_orders = Order.objects.filter( created_at__range=(start_date, end_date), total_amount__gte=min_order_value, status='completed' ).select_related( 'customer', 'shipping_address', 'billing_address' ).prefetch_related( 'items__product__category', 'items__product__supplier' ).annotate( item_count=Count('items'), total_weight=Sum('items__product__weight'), discount_percentage=F('discount_amount') * 100 / F('total_amount') ).filter( # Additional complex filtering Q(customer__user__is_active=True) & (Q(items__product__category__name='Electronics') | Q(items__product__category__name='Furniture')) & ~Q(shipping_address__country='US') ).order_by('-total_amount') # do the transactional work with the large_orders queryset
# fixed def process_large_order_report(start_date, end_date, min_order_value=1000): # Complex query with multiple joins and aggregations large_orders = Order.objects.filter( created_at__range=(start_date, end_date), total_amount__gte=min_order_value, status='completed' ).select_related( 'customer', 'shipping_address', 'billing_address' ).prefetch_related( 'items__product__category', 'items__product__supplier' ).annotate( item_count=Count('items'), total_weight=Sum('items__product__weight'), discount_percentage=F('discount_amount') * 100 / F('total_amount') ).filter( # Additional complex filtering Q(customer__user__is_active=True) & (Q(items__product__category__name='Electronics') | Q(items__product__category__name='Furniture')) & ~Q(shipping_address__country='US') ).order_by('-total_amount') # Start the transaction block with transaction.atomic(): # do the transactional work with the large_orders queryset
Operaciones en varias tablas : una transacción con operaciones en varias tablas puede bloquear cada tabla hasta que finalice. Esto es especialmente frecuente en las migraciones de Django, otra razón para mantener las migraciones pequeñas y centradas en una o unas pocas tablas a la vez.
class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] # too many operations operations = [ migrations.RemoveField("Author", "age"), migrations.AddField("Author", "rating", models.IntegerField(default=0)), migrations.AlterField("Book", "price", models.DecimalField(max_digits=5, decimal_places=2)), ]
# fixed # 1st migration class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] operations = [ migrations.RemoveField("Author", "age"), ] # 2nd migration class Migration(migrations.Migration): dependencies = [("migrations", "0002_initial")] operations = [ migrations.AddField("Author", "rating", models.IntegerField(default=0)), ] # 3rd migration class Migration(migrations.Migration): dependencies = [("migrations", "0003_initial")] operations = [ migrations.AlterField("Book", "price", models.DecimalField(max_digits=5, decimal_places=2)), ]
Migraciones de datos : dado que las transacciones mantienen un bloqueo desde que se ejecuta la consulta hasta que la transacción falla o finaliza, una migración que opera en cada fila de la tabla terminará bloqueando toda la tabla, ya sea al evitar lecturas o escrituras en cada fila.
def migrate_user_profiles(): # Get all users with legacy profiles users_with_profiles = User.objects.filter( legacy_profile__isnull=False ).select_related('legacy_profile') # Process all users in a single transaction with transaction.atomic(): # Track progress total = users_with_profiles.count() print(f"Migrating {total} user profiles...") # Process each user for i, user in enumerate(users_with_profiles): if i % 100 == 0: print(f"Processed {i}/{total} profiles") legacy = user.legacy_profile legacy.update_new_user_profile()
# fixed def migrate_user_profiles(): # Get all users with legacy profiles users_with_profiles = User.objects.filter( legacy_profile__isnull=False ).select_related('legacy_profile') # Process all users in a single transaction # Track progress total = users_with_profiles.count() print(f"Migrating {total} user profiles...") # Process each user for i, user in enumerate(users_with_profiles): if i % 100 == 0: print(f"Processed {i}/{total} profiles") with transaction.atomic(): legacy = user.legacy_profile legacy.update_new_user_profile()
Ejecútelas más tarde. Estas consultas son necesarias, PERO no tenemos que ejecutarlas durante el horario comercial. La mejor política en este caso es reducir el riesgo de una interrupción determinando la importancia de la tabla, estimando cuánto tiempo puede llevar la migración, ejecutando la migración cuando la base de datos tenga menos tráfico y preparando un plan de recuperación.
Reduzca la cantidad de tiempo que demora la transacción. Esto se puede lograr dividiendo la tabla y ejecutando la migración en particiones individuales. Particiones PSQL y Django
*Django solo envuelve transacciones alrededor de migraciones para PSQL y SQLite.
class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] # this migration, if on a large table, can slow down and block other operations # do it later operations = [ migrations.RemoveField("Users", "middle_name"), ]
Operaciones irreversibles : todo en la transacción debe ser reversible si la transacción se revierte; si colocamos una llamada API en la transacción, no se puede deshacer.
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) async_notification_service.send_email(user.email, "You can login now!") Account.objects.create(user=user, balance=0) # rest of user creation proccess
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess # the transaction is still in progress, so it can still be rolled back, it is not # committed until the transaction block is exited, so putting the notification here # is not a good idea - especially if the job starts immediately tries to read the data # this creates a race condition async_notification_service.send_email(user.email, "You can login now!") def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess transaction.on_commit(partial(async_notification_service.send_email, user.email, "You can login now!"))
Bloqueo de llamadas : dado que las transacciones impiden que todas las demás consultas operen en las tablas/filas que la transacción está modificando, cualquier código que aumente la duración de la transacción hará que la base de datos se bloquee, lo que ocasiona tiempos de espera y falta de respuesta en las aplicaciones que dependen de la base de datos.
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) for file_data in user_files: # transaction waits for this upload and so do all other connections that need access to table/rows the transaction # uses url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url) Account.objects.create(user=user, balance=0) # rest of user creation proccess
# not bad def transaction(user_data, user_files): user = None with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess for file_data in user_files: url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url) # best fix from functools import partial def transaction(user_data, user_files): user = None with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess # partials create a callable with the function and arguments # so that the function is called with the arguments when the transaction is committed # TODO: diff between partial and lambda here??? transaction.on_commit(partial(create_user_files, user_files, user)) def create_user_files(user_files, user): for file_data in user_files: url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url)
En la próxima publicación, profundizaremos en PSQL y descubriremos: