前回の投稿では、 transaction.atomic
で装飾された関数が呼び出されたときに何が起こるか、またwith transaction.atomic()
何が起こるかを学びました。要約すると、次のようになります。
BEGIN;
が DB に送信されます (DB が PSQL または別の SQL バリアントの場合))。
ここで、トランザクションに何を入れるべきか、また何を避けるべきかについて説明します。以下のトランザクションの動作により、特定の操作はトランザクション ブロックに配置すると危険になります。
代替案とコード例については読み続けてください。
主なリスクは、テーブルや行に対する競合する操作を防ぎ、トランザクションを元に戻せるようにするために、トランザクションが完了するまでロックを保持することです。これは、トランザクション内の DB 操作をアトミックにするために不可欠です。つまり、複数のテーブルまたはいくつかの重要なテーブルを操作する長時間実行トランザクションは、ロックを独占し、それらのテーブル/行への読み取り/書き込みを妨げることで、停止を引き起こす可能性があります。
本質的には、トランザクション ブロックに間違ったコードを配置すると、DB への他のすべての接続が操作を実行できないようにブロックされ、DB が事実上ダウンする可能性があります。
2 つ目のリスクは、トランザクションは元に戻せる必要があり、元に戻せることが期待されていることです。トランザクションでエラーが発生すると、DB はすべての操作を自動的に元に戻します。したがって、トランザクションに含める DB 操作は元に戻せる必要があります。ほとんどの場合、PSQL ではこの点について心配する必要はありません。しかし、他のコードはどうでしょうか。
多くの場合、データを変更すると、イベントの起動、サービスの更新、プッシュ通知の送信などのフォローアップ タスクを実行する必要があります。これらのタスクは元に戻せません。つまり、イベント、リクエスト、または通知の送信を取り消すことはできません。エラーが発生した場合、データの変更はロールバックされますが、「レポートが生成されました。ここをクリックして表示してください」というプッシュ通知はすでに送信されています。ユーザーまたは他のサービスがこの誤った情報に基づいて行動するとどうなるでしょうか。障害が連鎖的に発生します。したがって、元に戻せないコードはトランザクションに含めないでください。そうしないと、トランザクションでエラーが発生したときにシステムが不良な状態になるリスクがあります。
これらは、処理されるデータの量や DB トラフィックによっては、ロックを長時間保持することで停止を引き起こす可能性があります。これらはすべて、時間がかかりすぎなければ問題ありません。
低速クエリ— 通常、データの取得は高速ですが、次の 3 つのシナリオを除きます。これらのクエリはトランザクションを遅くし、ロックを保持する時間を長くするため、他のユーザーに悪影響を及ぼします。
@transaction.atomic def process_large_order_report(start_date, end_date, min_order_value=1000): # Complex query with multiple joins and aggregations large_orders = Order.objects.filter( created_at__range=(start_date, end_date), total_amount__gte=min_order_value, status='completed' ).select_related( 'customer', 'shipping_address', 'billing_address' ).prefetch_related( 'items__product__category', 'items__product__supplier' ).annotate( item_count=Count('items'), total_weight=Sum('items__product__weight'), discount_percentage=F('discount_amount') * 100 / F('total_amount') ).filter( # Additional complex filtering Q(customer__user__is_active=True) & (Q(items__product__category__name='Electronics') | Q(items__product__category__name='Furniture')) & ~Q(shipping_address__country='US') ).order_by('-total_amount') # do the transactional work with the large_orders queryset
# fixed def process_large_order_report(start_date, end_date, min_order_value=1000): # Complex query with multiple joins and aggregations large_orders = Order.objects.filter( created_at__range=(start_date, end_date), total_amount__gte=min_order_value, status='completed' ).select_related( 'customer', 'shipping_address', 'billing_address' ).prefetch_related( 'items__product__category', 'items__product__supplier' ).annotate( item_count=Count('items'), total_weight=Sum('items__product__weight'), discount_percentage=F('discount_amount') * 100 / F('total_amount') ).filter( # Additional complex filtering Q(customer__user__is_active=True) & (Q(items__product__category__name='Electronics') | Q(items__product__category__name='Furniture')) & ~Q(shipping_address__country='US') ).order_by('-total_amount') # Start the transaction block with transaction.atomic(): # do the transactional work with the large_orders queryset
複数のテーブルに対する操作- 複数のテーブルに対する操作を含むトランザクションは、完了するまで各テーブルをロックできます。これは、Django の移行で特によく見られます。移行を小さく保ち、一度に 1 つまたは少数のテーブルに集中させるもう 1 つの理由です。
class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] # too many operations operations = [ migrations.RemoveField("Author", "age"), migrations.AddField("Author", "rating", models.IntegerField(default=0)), migrations.AlterField("Book", "price", models.DecimalField(max_digits=5, decimal_places=2)), ]
# fixed # 1st migration class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] operations = [ migrations.RemoveField("Author", "age"), ] # 2nd migration class Migration(migrations.Migration): dependencies = [("migrations", "0002_initial")] operations = [ migrations.AddField("Author", "rating", models.IntegerField(default=0)), ] # 3rd migration class Migration(migrations.Migration): dependencies = [("migrations", "0003_initial")] operations = [ migrations.AlterField("Book", "price", models.DecimalField(max_digits=5, decimal_places=2)), ]
データ移行- トランザクションはクエリが実行されてからトランザクションが失敗または終了するまでロックを保持するため、テーブル内のすべての行に対して実行される移行では、各行の読み取りまたは書き込みが防止され、テーブル全体がロックされることになります。
def migrate_user_profiles(): # Get all users with legacy profiles users_with_profiles = User.objects.filter( legacy_profile__isnull=False ).select_related('legacy_profile') # Process all users in a single transaction with transaction.atomic(): # Track progress total = users_with_profiles.count() print(f"Migrating {total} user profiles...") # Process each user for i, user in enumerate(users_with_profiles): if i % 100 == 0: print(f"Processed {i}/{total} profiles") legacy = user.legacy_profile legacy.update_new_user_profile()
# fixed def migrate_user_profiles(): # Get all users with legacy profiles users_with_profiles = User.objects.filter( legacy_profile__isnull=False ).select_related('legacy_profile') # Process all users in a single transaction # Track progress total = users_with_profiles.count() print(f"Migrating {total} user profiles...") # Process each user for i, user in enumerate(users_with_profiles): if i % 100 == 0: print(f"Processed {i}/{total} profiles") with transaction.atomic(): legacy = user.legacy_profile legacy.update_new_user_profile()
後で実行します。これらのクエリは必要ですが、業務時間中に実行する必要はありません。ここでの最善のポリシーは、テーブルの重要性を判断し、移行にかかる時間を見積もり、DB のトラフィックが最も少ないときに移行を実行し、ロールバック プランを準備することで、停止のリスクを軽減することです。
トランザクションにかかる時間を短縮します。これは、テーブルをパーティション分割し、個々のパーティションで移行を実行することで実現できます。PSQLパーティションと Django
*Django は、PSQL および SQLite の移行に対してのみトランザクションをラップします。
class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] # this migration, if on a large table, can slow down and block other operations # do it later operations = [ migrations.RemoveField("Users", "middle_name"), ]
元に戻せない操作- トランザクションがロールバックされた場合、トランザクション内のすべての操作は元に戻せる必要があります。トランザクションに API 呼び出しを含めた場合、元に戻すことはできません。
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) async_notification_service.send_email(user.email, "You can login now!") Account.objects.create(user=user, balance=0) # rest of user creation proccess
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess # the transaction is still in progress, so it can still be rolled back, it is not # committed until the transaction block is exited, so putting the notification here # is not a good idea - especially if the job starts immediately tries to read the data # this creates a race condition async_notification_service.send_email(user.email, "You can login now!") def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess transaction.on_commit(partial(async_notification_service.send_email, user.email, "You can login now!"))
呼び出しのブロック- トランザクションは、トランザクションが変更しているテーブル/行に対して他のすべてのクエリが操作されるのを防ぐため、トランザクションの期間を延長するコードによって DB がロックされ、DB に依存するアプリでタイムアウトが発生し、応答しなくなります。
def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) for file_data in user_files: # transaction waits for this upload and so do all other connections that need access to table/rows the transaction # uses url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url) Account.objects.create(user=user, balance=0) # rest of user creation proccess
# not bad def transaction(user_data, user_files): user = None with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess for file_data in user_files: url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url) # best fix from functools import partial def transaction(user_data, user_files): user = None with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess # partials create a callable with the function and arguments # so that the function is called with the arguments when the transaction is committed # TODO: diff between partial and lambda here??? transaction.on_commit(partial(create_user_files, user_files, user)) def create_user_files(user_files, user): for file_data in user_files: url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url)
次の投稿では、PSQL について詳しく調べて、次のことを調べます。