В предыдущем посте мы узнали, что происходит при вызове функции, декорированной transaction.atomic
, и что происходит при with transaction.atomic()
. Подводя итог:
BEGIN;
отправляется в базу данных (если база данных — PSQL или другой вариант SQL).
Теперь мы обсудим, что следует помещать в транзакцию, а чего следует избегать. Из-за следующих особенностей поведения транзакций некоторые операции опасны при размещении в блоке транзакций.
Продолжайте читать, чтобы ознакомиться с альтернативами и примерами кода.
Основной риск заключается в том, что транзакции удерживают блокировки до тех пор, пока они не будут выполнены, чтобы предотвратить конфликтующие операции с таблицами и строками и позволить транзакции быть обратимой — это необходимо для того, чтобы сделать операции с базой данных в транзакции атомарными. Это означает, что длительная транзакция, которая работает с несколькими таблицами или несколькими критическими, может вызвать сбои, поглощая блокировки и предотвращая чтение/запись в этих таблицах/строках.
По сути, если мы поместим неправильный код в блок транзакции, мы можем фактически вывести из строя базу данных, заблокировав все другие подключения к базе данных и запретив им выполнять операции с ней.
Вторичный риск заключается в том, что транзакции должны быть обратимыми и ОЖИДАЕТСЯ, что они будут обратимыми. БД автоматически отменяет каждую операцию, если в транзакции происходит ошибка. Следовательно, операции БД, которые мы помещаем в транзакцию, должны быть обратимыми — по большей части, нам не нужно беспокоиться об этом с PSQL. Но как насчет других кодов?
Часто, когда мы меняем данные, нам нужно выполнить последующие задачи, такие как запуск событий, обновление служб, отправка push-уведомлений и т. д. Эти задачи НЕобратимы — мы не можем отменить отправку события, запроса или уведомления. Если происходит ошибка, изменения данных откатываются, но мы уже отправили push-уведомление со следующим текстом: «Ваш отчет был сформирован; нажмите здесь, чтобы просмотреть его». Что происходит, когда пользователь или другие службы действуют на основе этой ложной информации? Будет каскад сбоев. Поэтому любой код, который нельзя отменить, не должен находиться в транзакции, иначе мы рискуем оставить нашу систему в плохом состоянии, когда в транзакции возникает ошибка.
Это вещи, которые, в зависимости от того, сколько данных обрабатывается и трафика БД, могут вызывать сбои из-за слишком долгого удержания блокировок. Все эти вещи хороши, если они не занимают слишком много времени.
Медленные запросы — Извлечение данных обычно происходит быстро, за исключением этих трех сценариев. Эти запросы замедлят транзакцию и увеличат время удержания блокировок, что окажет неблагоприятное воздействие на других пользователей.
@transaction.atomic def process_large_order_report(start_date, end_date, min_order_value=1000): # Complex query with multiple joins and aggregations large_orders = Order.objects.filter( created_at__range=(start_date, end_date), total_amount__gte=min_order_value, status='completed' ).select_related( 'customer', 'shipping_address', 'billing_address' ).prefetch_related( 'items__product__category', 'items__product__supplier' ).annotate( item_count=Count('items'), total_weight=Sum('items__product__weight'), discount_percentage=F('discount_amount') * 100 / F('total_amount') ).filter( # Additional complex filtering Q(customer__user__is_active=True) & (Q(items__product__category__name='Electronics') | Q(items__product__category__name='Furniture')) & ~Q(shipping_address__country='US') ).order_by('-total_amount') # do the transactional work with the large_orders queryset
# fixed def process_large_order_report(start_date, end_date, min_order_value=1000): # Complex query with multiple joins and aggregations large_orders = Order.objects.filter( created_at__range=(start_date, end_date), total_amount__gte=min_order_value, status='completed' ).select_related( 'customer', 'shipping_address', 'billing_address' ).prefetch_related( 'items__product__category', 'items__product__supplier' ).annotate( item_count=Count('items'), total_weight=Sum('items__product__weight'), discount_percentage=F('discount_amount') * 100 / F('total_amount') ).filter( # Additional complex filtering Q(customer__user__is_active=True) & (Q(items__product__category__name='Electronics') | Q(items__product__category__name='Furniture')) & ~Q(shipping_address__country='US') ).order_by('-total_amount') # Start the transaction block with transaction.atomic(): # do the transactional work with the large_orders queryset
Операции над несколькими таблицами — транзакция с операциями над несколькими таблицами может заблокировать каждую таблицу до тех пор, пока она не будет выполнена. Это особенно распространено в миграциях Django — еще одна причина делать миграции небольшими и сосредоточенными на одной или нескольких таблицах за раз.
class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] # too many operations operations = [ migrations.RemoveField("Author", "age"), migrations.AddField("Author", "rating", models.IntegerField(default=0)), migrations.AlterField("Book", "price", models.DecimalField(max_digits=5, decimal_places=2)), ]
# fixed # 1st migration class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] operations = [ migrations.RemoveField("Author", "age"), ] # 2nd migration class Migration(migrations.Migration): dependencies = [("migrations", "0002_initial")] operations = [ migrations.AddField("Author", "rating", models.IntegerField(default=0)), ] # 3rd migration class Migration(migrations.Migration): dependencies = [("migrations", "0003_initial")] operations = [ migrations.AlterField("Book", "price", models.DecimalField(max_digits=5, decimal_places=2)), ]
Миграции данных — поскольку транзакции удерживают блокировку с момента выполнения запроса до тех пор, пока транзакция не завершится неудачей или не будет завершена, миграция, которая выполняется для каждой строки в таблице, в конечном итоге заблокирует всю таблицу, либо запретив чтение, либо запись для каждой строки.
def migrate_user_profiles(): # Get all users with legacy profiles users_with_profiles = User.objects.filter( legacy_profile__isnull=False ).select_related('legacy_profile') # Process all users in a single transaction with transaction.atomic(): # Track progress total = users_with_profiles.count() print(f"Migrating {total} user profiles...") # Process each user for i, user in enumerate(users_with_profiles): if i % 100 == 0: print(f"Processed {i}/{total} profiles") legacy = user.legacy_profile legacy.update_new_user_profile()
# fixed def migrate_user_profiles(): # Get all users with legacy profiles users_with_profiles = User.objects.filter( legacy_profile__isnull=False ).select_related('legacy_profile') # Process all users in a single transaction # Track progress total = users_with_profiles.count() print(f"Migrating {total} user profiles...") # Process each user for i, user in enumerate(users_with_profiles): if i % 100 == 0: print(f"Processed {i}/{total} profiles") with transaction.atomic(): legacy = user.legacy_profile legacy.update_new_user_profile()
Запустите их позже. Эти запросы необходимы, НО нам не нужно запускать их в рабочее время. Лучшая политика здесь — снизить риск сбоя, определив, насколько важна таблица, оценив, сколько времени может занять миграция, выполнив миграцию, когда в базе данных наименьший трафик, и подготовив план отката.
Сократить время выполнения транзакции. Это можно сделать, разбив таблицу на разделы и запустив миграцию на отдельные разделы. Разделы PSQL и Django
*Django оборачивает транзакции только вокруг миграций для PSQL и SQLite.
class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] # this migration, if on a large table, can slow down and block other operations # do it later operations = [ migrations.RemoveField("Users", "middle_name"), ]
Необратимые операции — все в транзакции должно быть обратимым, если транзакция откатывается; если мы помещаем вызов API в транзакцию, его нельзя отменить.
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) async_notification_service.send_email(user.email, "You can login now!") Account.objects.create(user=user, balance=0) # rest of user creation proccess
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess # the transaction is still in progress, so it can still be rolled back, it is not # committed until the transaction block is exited, so putting the notification here # is not a good idea - especially if the job starts immediately tries to read the data # this creates a race condition async_notification_service.send_email(user.email, "You can login now!") def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess transaction.on_commit(partial(async_notification_service.send_email, user.email, "You can login now!"))
Блокирующие вызовы — поскольку транзакции не позволяют всем другим запросам работать с таблицами/строками, которые изменяет транзакция, любой код, увеличивающий продолжительность транзакции, приведет к блокировке базы данных, что приведет к тайм-аутам и зависанию приложений, зависящих от базы данных.
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) for file_data in user_files: # transaction waits for this upload and so do all other connections that need access to table/rows the transaction # uses url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url) Account.objects.create(user=user, balance=0) # rest of user creation proccess
# not bad def transaction(user_data, user_files): user = None with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess for file_data in user_files: url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url) # best fix from functools import partial def transaction(user_data, user_files): user = None with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess # partials create a callable with the function and arguments # so that the function is called with the arguments when the transaction is committed # TODO: diff between partial and lambda here??? transaction.on_commit(partial(create_user_files, user_files, user)) def create_user_files(user_files, user): for file_data in user_files: url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url)
В следующем посте мы углубимся в PSQL и выясним: