Na publicación anterior , aprendemos o que ocorre cando se chama unha función decorada con transaction.atomic
e o que ocorre with transaction.atomic()
. En resumo:
BEGIN;
envíase á base de datos (se a base de datos é PSQL ou outra variante de SQL).
Agora, discutiremos que poñer nunha transacción e que evitar. Debido aos seguintes comportamentos de transacción, certas operacións son perigosas cando se colocan nun bloque de transaccións.
Continúa lendo alternativas e exemplos de código.
O risco principal é que as transaccións manteñan bloqueos ata que se fagan para evitar operacións conflitivas en táboas e filas e permitir que a transacción sexa reversible; isto é esencial para que as operacións de base de datos na transacción sexan atómicas. Isto significa que unha transacción de longa duración que opera en varias táboas ou algunhas críticas pode causar interrupcións ao acaparar bloqueos e impedir lecturas/escrituras nesas táboas/filas.
En esencia, se poñemos o código incorrecto nun bloque de transaccións, podemos eliminar a base de datos de forma efectiva bloqueando todas as outras conexións á base de datos para que non realicen operacións nela.
O risco secundario é que as transaccións deben ser reversibles e espérase que sexan reversibles. A base de datos inverte automaticamente cada operación se se produce un erro na transacción. Polo tanto, as operacións de base de datos que poñemos na transacción deberían ser reversibles; na súa maior parte, non necesitamos preocuparnos por isto con PSQL. Pero e outros códigos?
Moitas veces, cando cambiamos os nosos datos, necesitamos facer tarefas de seguimento como activar eventos, actualizar servizos, enviar notificacións push, etc. Estas tarefas NON son reversibles: non podemos anular o envío dun evento, solicitude ou notificación. Se se produce un erro, os cambios de datos revéranse, pero xa enviamos a notificación push dicindo: "Xerouse o teu informe; fai clic aquí para velo". Que ocorre cando o usuario ou outros servizos actúan sobre esta información falsa? Haberá unha cascada de fracasos. Polo tanto, calquera código que non se poida reverter non debería estar nunha transacción ou corremos o risco de deixar o noso sistema nun mal estado cando se produce un erro na transacción.
Estas son cousas que, dependendo da cantidade de datos que se procesen e do tráfico de base de datos, poden causar interrupcións debido a que se mantén os bloqueos durante demasiado tempo. Todas estas cousas están ben se non tardan moito.
Consultas lentas : a obtención de datos adoita ser rápida, excepto nestes tres escenarios. Estas consultas ralentizarán a transacción e prolongarán o tempo que mantén os bloqueos, o que terá efectos adversos sobre outros usuarios.
@transaction.atomic def process_large_order_report(start_date, end_date, min_order_value=1000): # Complex query with multiple joins and aggregations large_orders = Order.objects.filter( created_at__range=(start_date, end_date), total_amount__gte=min_order_value, status='completed' ).select_related( 'customer', 'shipping_address', 'billing_address' ).prefetch_related( 'items__product__category', 'items__product__supplier' ).annotate( item_count=Count('items'), total_weight=Sum('items__product__weight'), discount_percentage=F('discount_amount') * 100 / F('total_amount') ).filter( # Additional complex filtering Q(customer__user__is_active=True) & (Q(items__product__category__name='Electronics') | Q(items__product__category__name='Furniture')) & ~Q(shipping_address__country='US') ).order_by('-total_amount') # do the transactional work with the large_orders queryset
# fixed def process_large_order_report(start_date, end_date, min_order_value=1000): # Complex query with multiple joins and aggregations large_orders = Order.objects.filter( created_at__range=(start_date, end_date), total_amount__gte=min_order_value, status='completed' ).select_related( 'customer', 'shipping_address', 'billing_address' ).prefetch_related( 'items__product__category', 'items__product__supplier' ).annotate( item_count=Count('items'), total_weight=Sum('items__product__weight'), discount_percentage=F('discount_amount') * 100 / F('total_amount') ).filter( # Additional complex filtering Q(customer__user__is_active=True) & (Q(items__product__category__name='Electronics') | Q(items__product__category__name='Furniture')) & ~Q(shipping_address__country='US') ).order_by('-total_amount') # Start the transaction block with transaction.atomic(): # do the transactional work with the large_orders queryset
Operacións en varias táboas : unha transacción con operacións en varias táboas pode bloquear cada táboa ata que se faga. Isto é especialmente frecuente nas migracións de Django, outra razón para manter as migracións pequenas e centradas nunha ou algunhas táboas á vez.
class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] # too many operations operations = [ migrations.RemoveField("Author", "age"), migrations.AddField("Author", "rating", models.IntegerField(default=0)), migrations.AlterField("Book", "price", models.DecimalField(max_digits=5, decimal_places=2)), ]
# fixed # 1st migration class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] operations = [ migrations.RemoveField("Author", "age"), ] # 2nd migration class Migration(migrations.Migration): dependencies = [("migrations", "0002_initial")] operations = [ migrations.AddField("Author", "rating", models.IntegerField(default=0)), ] # 3rd migration class Migration(migrations.Migration): dependencies = [("migrations", "0003_initial")] operations = [ migrations.AlterField("Book", "price", models.DecimalField(max_digits=5, decimal_places=2)), ]
Migracións de datos — Dado que as transaccións manteñen un bloqueo desde que se executa a consulta ata que a transacción falla ou remata, unha migración que opera en cada fila da táboa acabará bloqueando toda a táboa, xa sexa impedindo lecturas ou escrituras en cada fila.
def migrate_user_profiles(): # Get all users with legacy profiles users_with_profiles = User.objects.filter( legacy_profile__isnull=False ).select_related('legacy_profile') # Process all users in a single transaction with transaction.atomic(): # Track progress total = users_with_profiles.count() print(f"Migrating {total} user profiles...") # Process each user for i, user in enumerate(users_with_profiles): if i % 100 == 0: print(f"Processed {i}/{total} profiles") legacy = user.legacy_profile legacy.update_new_user_profile()
# fixed def migrate_user_profiles(): # Get all users with legacy profiles users_with_profiles = User.objects.filter( legacy_profile__isnull=False ).select_related('legacy_profile') # Process all users in a single transaction # Track progress total = users_with_profiles.count() print(f"Migrating {total} user profiles...") # Process each user for i, user in enumerate(users_with_profiles): if i % 100 == 0: print(f"Processed {i}/{total} profiles") with transaction.atomic(): legacy = user.legacy_profile legacy.update_new_user_profile()
Execútalos máis tarde. Estas consultas son necesarias, PERO non temos que executalas en horario comercial. A mellor política aquí é reducir o risco dunha interrupción determinando o crucial que é a táboa, estimando canto tempo pode levar a migración, executar a migración cando a base de datos teña menos tráfico e preparando un plan de retroceso.
Reducir o tempo que leva a transacción. Isto pódese facer particionando a táboa e executando a migración en particións individuais. Particións PSQL e Django
*Django só envolve as transaccións en torno ás migracións para PSQL e SQLite.
class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] # this migration, if on a large table, can slow down and block other operations # do it later operations = [ migrations.RemoveField("Users", "middle_name"), ]
Operacións irreversibles : todo o que hai na transacción debería ser reversible se a transacción é revertida; se poñemos unha chamada API na transacción, non se pode desfacer.
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) async_notification_service.send_email(user.email, "You can login now!") Account.objects.create(user=user, balance=0) # rest of user creation proccess
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess # the transaction is still in progress, so it can still be rolled back, it is not # committed until the transaction block is exited, so putting the notification here # is not a good idea - especially if the job starts immediately tries to read the data # this creates a race condition async_notification_service.send_email(user.email, "You can login now!") def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess transaction.on_commit(partial(async_notification_service.send_email, user.email, "You can login now!"))
Bloqueo de chamadas : dado que as transaccións impiden que todas as outras consultas funcionen nas táboas/filas na que a transacción está a cambiar, calquera código que aumente a duración da transacción fará que a base de datos se bloquee, provocando tempo de espera e falta de resposta nas aplicacións dependentes da base de datos.
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) for file_data in user_files: # transaction waits for this upload and so do all other connections that need access to table/rows the transaction # uses url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url) Account.objects.create(user=user, balance=0) # rest of user creation proccess
# not bad def transaction(user_data, user_files): user = None with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess for file_data in user_files: url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url) # best fix from functools import partial def transaction(user_data, user_files): user = None with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess # partials create a callable with the function and arguments # so that the function is called with the arguments when the transaction is committed # TODO: diff between partial and lambda here??? transaction.on_commit(partial(create_user_files, user_files, user)) def create_user_files(user_files, user): for file_data in user_files: url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url)
Na seguinte publicación, mergullaremos en PSQL e descubriremos: