У попередній публікації ми дізналися, що відбувається, коли викликається функція, оформлена transaction.atomic
, і що відбувається with transaction.atomic()
. Підсумовуючи:
BEGIN;
надсилається до БД (якщо БД є PSQL або іншим варіантом SQL).
Тепер ми обговоримо, що включати в транзакцію, а чого уникати. Через наведену нижче поведінку транзакцій певні операції є небезпечними, якщо їх помістити в блок транзакцій.
Продовжуйте читати альтернативи та приклади коду.
Основний ризик полягає в тому, що транзакції утримують блокування, доки вони не будуть виконані, щоб запобігти конфліктним операціям над таблицями та рядками та дозволити транзакції бути оборотною - це важливо, щоб зробити операції БД у транзакції атомарними. Це означає, що тривала транзакція, яка працює з кількома таблицями або кількома критичними, може призвести до збоїв через блокування та запобігання читанню/запису в ці таблиці/рядки.
По суті, якщо ми розмістимо неправильний код у блоці транзакції, ми можемо ефективно знищити БД, заблокувавши всі інші підключення до БД від виконання операцій над нею.
Вторинний ризик полягає в тому, що транзакції повинні бути оборотними, і ОЧІКУЄТЬСЯ, що вони будуть оборотними. БД автоматично скасовує кожну операцію, якщо в транзакції виникає помилка. Таким чином, операції БД, які ми вставляємо в транзакцію, повинні бути оборотними - здебільшого нам не потрібно турбуватися про це з PSQL. Але як щодо інших кодів?
Часто, коли ми змінюємо наші дані, нам потрібно виконувати наступні завдання, як-от запуск подій, оновлення служб, надсилання push-сповіщень тощо. Ці завдання НЕ можна відмінити – ми не можемо скасувати надсилання події, запиту чи сповіщення. Якщо виникає помилка, зміни даних скасовуються, але ми вже надіслали push-повідомлення з повідомленням: «Ваш звіт створено; натисніть тут, щоб переглянути його». Що відбувається, коли користувач або інші служби діють на основі цієї неправдивої інформації? Буде каскад невдач. Таким чином, будь-який код, який не можна скасувати, не повинен бути в транзакції, інакше ми ризикуємо залишити нашу систему в поганому стані, коли в транзакції станеться помилка.
Це речі, які, залежно від того, скільки даних обробляється та трафіку БД, можуть спричинити збої через надто тривале утримання блокувань. Усе це добре, якщо не займає надто багато часу.
Повільні запити — отримання даних зазвичай відбувається швидко, за винятком цих трьох сценаріїв. Ці запити сповільнять транзакцію та подовжать час утримання блокувань, що матиме негативний вплив на інших користувачів.
@transaction.atomic def process_large_order_report(start_date, end_date, min_order_value=1000): # Complex query with multiple joins and aggregations large_orders = Order.objects.filter( created_at__range=(start_date, end_date), total_amount__gte=min_order_value, status='completed' ).select_related( 'customer', 'shipping_address', 'billing_address' ).prefetch_related( 'items__product__category', 'items__product__supplier' ).annotate( item_count=Count('items'), total_weight=Sum('items__product__weight'), discount_percentage=F('discount_amount') * 100 / F('total_amount') ).filter( # Additional complex filtering Q(customer__user__is_active=True) & (Q(items__product__category__name='Electronics') | Q(items__product__category__name='Furniture')) & ~Q(shipping_address__country='US') ).order_by('-total_amount') # do the transactional work with the large_orders queryset
# fixed def process_large_order_report(start_date, end_date, min_order_value=1000): # Complex query with multiple joins and aggregations large_orders = Order.objects.filter( created_at__range=(start_date, end_date), total_amount__gte=min_order_value, status='completed' ).select_related( 'customer', 'shipping_address', 'billing_address' ).prefetch_related( 'items__product__category', 'items__product__supplier' ).annotate( item_count=Count('items'), total_weight=Sum('items__product__weight'), discount_percentage=F('discount_amount') * 100 / F('total_amount') ).filter( # Additional complex filtering Q(customer__user__is_active=True) & (Q(items__product__category__name='Electronics') | Q(items__product__category__name='Furniture')) & ~Q(shipping_address__country='US') ).order_by('-total_amount') # Start the transaction block with transaction.atomic(): # do the transactional work with the large_orders queryset
Операції над кількома таблицями — транзакція з операціями над кількома таблицями може блокувати кожну таблицю, доки вона не буде виконана. Це особливо поширене в міграціях Django — ще одна причина, щоб міграції були невеликими та зосередженими на одній або кількох таблицях одночасно.
class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] # too many operations operations = [ migrations.RemoveField("Author", "age"), migrations.AddField("Author", "rating", models.IntegerField(default=0)), migrations.AlterField("Book", "price", models.DecimalField(max_digits=5, decimal_places=2)), ]
# fixed # 1st migration class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] operations = [ migrations.RemoveField("Author", "age"), ] # 2nd migration class Migration(migrations.Migration): dependencies = [("migrations", "0002_initial")] operations = [ migrations.AddField("Author", "rating", models.IntegerField(default=0)), ] # 3rd migration class Migration(migrations.Migration): dependencies = [("migrations", "0003_initial")] operations = [ migrations.AlterField("Book", "price", models.DecimalField(max_digits=5, decimal_places=2)), ]
Міграція даних . Оскільки транзакції зберігаються на блокуванні з моменту виконання запиту до моменту збою або завершення транзакції, міграція, яка виконується з кожним рядком таблиці, призведе до блокування всієї таблиці шляхом запобігання читанню або запису в кожному рядку.
def migrate_user_profiles(): # Get all users with legacy profiles users_with_profiles = User.objects.filter( legacy_profile__isnull=False ).select_related('legacy_profile') # Process all users in a single transaction with transaction.atomic(): # Track progress total = users_with_profiles.count() print(f"Migrating {total} user profiles...") # Process each user for i, user in enumerate(users_with_profiles): if i % 100 == 0: print(f"Processed {i}/{total} profiles") legacy = user.legacy_profile legacy.update_new_user_profile()
# fixed def migrate_user_profiles(): # Get all users with legacy profiles users_with_profiles = User.objects.filter( legacy_profile__isnull=False ).select_related('legacy_profile') # Process all users in a single transaction # Track progress total = users_with_profiles.count() print(f"Migrating {total} user profiles...") # Process each user for i, user in enumerate(users_with_profiles): if i % 100 == 0: print(f"Processed {i}/{total} profiles") with transaction.atomic(): legacy = user.legacy_profile legacy.update_new_user_profile()
Запустіть їх пізніше. Ці запити необхідні, АЛЕ нам не потрібно виконувати їх у робочий час. Найкраща політика тут полягає в тому, щоб зменшити ризик збою, визначивши, наскільки важливою є таблиця, оцінивши, скільки часу може зайняти міграція, виконавши міграцію, коли БД має найменший трафік, і підготувавши план відкату.
Скоротіть час, який займає транзакція. Це можна зробити шляхом розділення таблиці та запуску міграції на окремих розділах. Розділи PSQL і Django
*Django лише обертає транзакції навколо міграцій для PSQL і SQLite.
class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] # this migration, if on a large table, can slow down and block other operations # do it later operations = [ migrations.RemoveField("Users", "middle_name"), ]
Незворотні операції — все в транзакції має бути оборотним, якщо транзакцію буде відкочено; якщо ми додаємо виклик API до транзакції, його не можна скасувати.
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) async_notification_service.send_email(user.email, "You can login now!") Account.objects.create(user=user, balance=0) # rest of user creation proccess
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess # the transaction is still in progress, so it can still be rolled back, it is not # committed until the transaction block is exited, so putting the notification here # is not a good idea - especially if the job starts immediately tries to read the data # this creates a race condition async_notification_service.send_email(user.email, "You can login now!") def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess transaction.on_commit(partial(async_notification_service.send_email, user.email, "You can login now!"))
Блокування викликів — оскільки транзакції перешкоджають роботі всіх інших запитів із таблицями/рядками, які змінює транзакція, будь-який код, який збільшує тривалість транзакції, призведе до блокування БД, викликаючи тайм-аути та невідповідь у програмах, залежних від БД.
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) for file_data in user_files: # transaction waits for this upload and so do all other connections that need access to table/rows the transaction # uses url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url) Account.objects.create(user=user, balance=0) # rest of user creation proccess
# not bad def transaction(user_data, user_files): user = None with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess for file_data in user_files: url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url) # best fix from functools import partial def transaction(user_data, user_files): user = None with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess # partials create a callable with the function and arguments # so that the function is called with the arguments when the transaction is committed # TODO: diff between partial and lambda here??? transaction.on_commit(partial(create_user_files, user_files, user)) def create_user_files(user_files, user): for file_data in user_files: url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url)
У наступній публікації ми зануримося в PSQL і дізнаємося: