No post anterior, aprendemos o que acontece quando uma função decorada com transaction.atomic
é chamada e o que acontece with transaction.atomic()
. Em resumo:
BEGIN;
é enviada ao BD (se o BD for PSQL ou outra variante de SQL).
Agora, discutiremos o que colocar em uma transação e o que evitar. Devido aos seguintes comportamentos de transação, certas operações são perigosas quando colocadas em um bloco de transação.
Continue lendo para alternativas e exemplos de código.
O risco primário é que as transações mantenham bloqueios até que sejam feitas para evitar operações conflitantes em tabelas e linhas e permitir que a transação seja reversível - isso é essencial para tornar as operações do BD na transação atômicas. Isso significa que uma transação de longa duração que opera em várias tabelas ou algumas críticas pode causar interrupções ao monopolizar bloqueios e impedir leituras/gravações nessas tabelas/linhas.
Em essência, se colocarmos o código errado em um bloco de transação, podemos efetivamente derrubar o banco de dados, bloqueando todas as outras conexões com o banco de dados de realizar operações nele.
O risco secundário é que as transações precisam ser reversíveis e ESPERA-SE que sejam reversíveis. O BD reverte automaticamente todas as operações se ocorrer um erro na transação. Portanto, as operações do BD que colocamos na transação devem ser reversíveis - na maioria das vezes, não precisamos nos preocupar com isso com PSQL. Mas e quanto a outros códigos?
Muitas vezes, quando alteramos nossos dados, precisamos fazer tarefas de acompanhamento, como disparar eventos, atualizar serviços, enviar notificações push, etc. Essas tarefas NÃO são reversíveis - não podemos cancelar o envio de um evento, uma solicitação ou uma notificação. Se ocorrer um erro, as alterações de dados serão revertidas, mas já enviamos a notificação push dizendo: "Seu relatório foi gerado; clique aqui para visualizá-lo". O que acontece quando o usuário ou outros serviços agem com base nessas informações falsas? Haverá uma cascata de falhas. Portanto, qualquer código que não possa ser revertido não deve estar em uma transação, ou corremos o risco de deixar nosso sistema em um estado ruim quando ocorrer um erro na transação.
Essas são coisas que, dependendo de quantos dados estão sendo processados e do tráfego do BD, podem causar interrupções devido à retenção de bloqueios por muito tempo. Todas essas coisas são boas se não demorarem muito.
Consultas lentas — A busca de dados geralmente é rápida, exceto nesses três cenários. Essas consultas tornarão a transação mais lenta e estenderão o tempo que ela mantém os bloqueios, o que terá efeitos adversos em outros usuários.
@transaction.atomic def process_large_order_report(start_date, end_date, min_order_value=1000): # Complex query with multiple joins and aggregations large_orders = Order.objects.filter( created_at__range=(start_date, end_date), total_amount__gte=min_order_value, status='completed' ).select_related( 'customer', 'shipping_address', 'billing_address' ).prefetch_related( 'items__product__category', 'items__product__supplier' ).annotate( item_count=Count('items'), total_weight=Sum('items__product__weight'), discount_percentage=F('discount_amount') * 100 / F('total_amount') ).filter( # Additional complex filtering Q(customer__user__is_active=True) & (Q(items__product__category__name='Electronics') | Q(items__product__category__name='Furniture')) & ~Q(shipping_address__country='US') ).order_by('-total_amount') # do the transactional work with the large_orders queryset
# fixed def process_large_order_report(start_date, end_date, min_order_value=1000): # Complex query with multiple joins and aggregations large_orders = Order.objects.filter( created_at__range=(start_date, end_date), total_amount__gte=min_order_value, status='completed' ).select_related( 'customer', 'shipping_address', 'billing_address' ).prefetch_related( 'items__product__category', 'items__product__supplier' ).annotate( item_count=Count('items'), total_weight=Sum('items__product__weight'), discount_percentage=F('discount_amount') * 100 / F('total_amount') ).filter( # Additional complex filtering Q(customer__user__is_active=True) & (Q(items__product__category__name='Electronics') | Q(items__product__category__name='Furniture')) & ~Q(shipping_address__country='US') ).order_by('-total_amount') # Start the transaction block with transaction.atomic(): # do the transactional work with the large_orders queryset
Operações em múltiplas tabelas — uma transação com operações em múltiplas tabelas pode bloquear cada tabela até que seja feita. Isso é especialmente prevalente em migrações Django — outro motivo para manter migrações pequenas e focadas em uma ou algumas tabelas por vez.
class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] # too many operations operations = [ migrations.RemoveField("Author", "age"), migrations.AddField("Author", "rating", models.IntegerField(default=0)), migrations.AlterField("Book", "price", models.DecimalField(max_digits=5, decimal_places=2)), ]
# fixed # 1st migration class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] operations = [ migrations.RemoveField("Author", "age"), ] # 2nd migration class Migration(migrations.Migration): dependencies = [("migrations", "0002_initial")] operations = [ migrations.AddField("Author", "rating", models.IntegerField(default=0)), ] # 3rd migration class Migration(migrations.Migration): dependencies = [("migrations", "0003_initial")] operations = [ migrations.AlterField("Book", "price", models.DecimalField(max_digits=5, decimal_places=2)), ]
Migrações de dados — Como as transações mantêm um bloqueio desde o momento em que a consulta é executada até que a transação falhe ou termine, uma migração que opera em todas as linhas da tabela acabará bloqueando a tabela inteira, seja impedindo leituras ou gravações em cada linha.
def migrate_user_profiles(): # Get all users with legacy profiles users_with_profiles = User.objects.filter( legacy_profile__isnull=False ).select_related('legacy_profile') # Process all users in a single transaction with transaction.atomic(): # Track progress total = users_with_profiles.count() print(f"Migrating {total} user profiles...") # Process each user for i, user in enumerate(users_with_profiles): if i % 100 == 0: print(f"Processed {i}/{total} profiles") legacy = user.legacy_profile legacy.update_new_user_profile()
# fixed def migrate_user_profiles(): # Get all users with legacy profiles users_with_profiles = User.objects.filter( legacy_profile__isnull=False ).select_related('legacy_profile') # Process all users in a single transaction # Track progress total = users_with_profiles.count() print(f"Migrating {total} user profiles...") # Process each user for i, user in enumerate(users_with_profiles): if i % 100 == 0: print(f"Processed {i}/{total} profiles") with transaction.atomic(): legacy = user.legacy_profile legacy.update_new_user_profile()
Execute-as mais tarde. Essas consultas são necessárias, MAS não precisamos executá-las durante o horário comercial. A melhor política aqui é reduzir o risco de uma interrupção determinando o quão crucial é a tabela, estimando quanto tempo a migração pode levar, executando a migração quando o BD tiver menos tráfego e preparando um plano de reversão.
Reduza o tempo que a transação leva. Isso pode ser feito particionando a tabela e executando a migração em partições individuais. Partições PSQL e Django
*O Django só envolve transações em migrações para PSQL e SQLite.
class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] # this migration, if on a large table, can slow down and block other operations # do it later operations = [ migrations.RemoveField("Users", "middle_name"), ]
Operações irreversíveis — Tudo na transação deve ser reversível se a transação for revertida; se colocarmos uma chamada de API na transação, ela não poderá ser desfeita.
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) async_notification_service.send_email(user.email, "You can login now!") Account.objects.create(user=user, balance=0) # rest of user creation proccess
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess # the transaction is still in progress, so it can still be rolled back, it is not # committed until the transaction block is exited, so putting the notification here # is not a good idea - especially if the job starts immediately tries to read the data # this creates a race condition async_notification_service.send_email(user.email, "You can login now!") def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess transaction.on_commit(partial(async_notification_service.send_email, user.email, "You can login now!"))
Bloqueio de chamadas — como as transações impedem que todas as outras consultas operem nas tabelas/linhas que a transação está alterando, qualquer código que aumente a duração da transação fará com que o banco de dados seja bloqueado, causando tempos limite e falta de resposta nos aplicativos dependentes do banco de dados.
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) for file_data in user_files: # transaction waits for this upload and so do all other connections that need access to table/rows the transaction # uses url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url) Account.objects.create(user=user, balance=0) # rest of user creation proccess
# not bad def transaction(user_data, user_files): user = None with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess for file_data in user_files: url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url) # best fix from functools import partial def transaction(user_data, user_files): user = None with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess # partials create a callable with the function and arguments # so that the function is called with the arguments when the transaction is committed # TODO: diff between partial and lambda here??? transaction.on_commit(partial(create_user_files, user_files, user)) def create_user_files(user_files, user): for file_data in user_files: url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url)
No próximo post, vamos nos aprofundar no PSQL e descobrir: