paint-brush
Python - Django: トランザクションにこのようなものを入れるべきではない@mta
新しい歴史

Python - Django: トランザクションにこのようなものを入れるべきではない

Michael T. Andemeskel17m2025/03/04
Read on Terminal Reader

長すぎる; 読むには

トランザクションは Django アプリの中心ですが、トランザクションに間違ったコードを入れると、停止が発生する可能性があります。
featured image - Python - Django: トランザクションにこのようなものを入れるべきではない
Michael T. Andemeskel HackerNoon profile picture


コンテンツ

  • パート 1 のまとめ - Django トランザクションの詳細
  • TLDR、要約
  • トランザクションによってアプリにどのような悪影響が出るのでしょうか?
  • 危険
  • 次へ - PSQL コマンドが互いにブロックする仕組み
  • 出典

パート 1 のまとめ - Django トランザクションの詳細

前回の投稿では、 transaction.atomicで装飾された関数が呼び出されたときに何が起こるか、またwith transaction.atomic()何が起こるかを学びました。要約すると、次のようになります。


  1. DB への接続が作成または取得されます。
  2. トランザクションが作成されます (例: BEGIN;が DB に送信されます (DB が PSQL または別の SQL バリアントの場合))。
  3. 今から関数が存在するか、または with ステートメントが終了するまで (エラーがあっても正常であっても)、トランザクションが実行され、DB が待機します。
  4. これは、(少なくとも PSQL のような ACID DB の場合) トランザクションが変更中のテーブルと行のロックを保持することを意味します。
  5. Django DB 操作が実行されると、DB は対応するロックを取得し、そのテーブルまたは行に対する競合する操作を防止します。
  6. これにより、ロックが解除されるのを待機するため、他の接続がタイムアウトになる可能性があります。
  7. 操作が失敗した場合、または実行時エラーが発生した場合、DB はトランザクション全体をロールバックし、ロックを解除します。
  8. トランザクション全体が成功すると、すべての変更がコミットされ、他の DB 接続で利用できるようになります。
  9. ロックが解除されました。


ここで、トランザクションに何を入れるべきか、また何を避けるべきかについて説明します。以下のトランザクションの動作により、特定の操作はトランザクション ブロックに配置すると危険になります。


  • トランザクションは、失敗するか完了するまでロックを保持します。
  • トランザクションは失敗すると、すべての DB 操作を元に戻します。
  • トランザクションは、DB 操作が実行されるとすぐにロックを要求します。

TLDR、要約

  • DB での元に戻せる操作- DB 操作を元に戻すことができず、トランザクションが失敗した場合、トランザクションは変更を自動的にロールバックしようとしますが、変更は元に戻せないため失敗し、DB は不良な状態のままになります。
  • DB 上で必要な関連操作- ユーザー レコードがないと新しい銀行口座レコードを作成できないため、両方を同じトランザクションで作成する必要があります。
  • 可逆的かつ関連するビジネス ロジック- 預金を含む新しいアカウント レコードを作成した後、顧客の合計残高を計算します (巧妙なモデリングと調整により、これもトランザクションから移動できます)。

危険

  • 低速クエリ- 通常、データの取得は高速ですが、次の 3 つのシナリオを除きます。これらのクエリはトランザクションを遅くし、ロックを保持する時間を延長するため、他のユーザーに悪影響を及ぼします。
  • 複数のテーブルに対する操作- 複数のテーブルに対する操作を含むトランザクションは、完了するまで各テーブルをロックできます。これは、Django の移行で特によく見られます。移行を小さく保ち、一度に 1 つまたは少数のテーブルに集中させるもう 1 つの理由です。
  • データ移行- トランザクションはクエリが実行されてからトランザクションが失敗または終了するまでロックを保持するため、テーブル内のすべての行に対して実行される移行では、各行の読み取りまたは書き込みが防止され、テーブル全体がロックされることになります。
  • (PSQL および SQLite のみ*)テーブルまたは列の変更- これらの操作には最も厳格な形式のロックが必要なため、テーブル全体の読み取り/書き込みが防止されます。これらの操作は、停止を引き起こす可能性が最も高くなります。

  • 元に戻せない操作- トランザクションがロールバックされた場合、トランザクション内のすべての操作は元に戻せる必要があります。トランザクションに API 呼び出しを含めた場合、元に戻すことはできません。
  • 呼び出しのブロック- トランザクションは、トランザクションが変更しているテーブル/行に対して他のすべてのクエリが操作されるのを防ぐため、トランザクションの期間を延長するコードによって DB がロックされ、DB に依存するアプリでタイムアウトが発生し、応答しなくなります。


代替案とコード例については読み続けてください。

トランザクションはアプリにどのような悪影響を与えるのでしょうか?

主なリスクは、テーブルや行に対する競合する操作を防ぎ、トランザクションを元に戻せるようにするために、トランザクションが完了するまでロックを保持することです。これは、トランザクション内の DB 操作をアトミックにするために不可欠です。つまり、複数のテーブルまたはいくつかの重要なテーブルを操作する長時間実行トランザクションは、ロックを独占し、それらのテーブル/行への読み取り/書き込みを妨げることで、停止を引き起こす可能性があります。


本質的には、トランザクション ブロックに間違ったコードを配置すると、DB への他のすべての接続が操作を実行できないようにブロックされ、DB が事実上ダウンする可能性があります。


2 つ目のリスクは、トランザクションは元に戻せる必要があり、元に戻せることが期待されていることです。トランザクションでエラーが発生すると、DB はすべての操作を自動的に元に戻します。したがって、トランザクションに含める DB 操作は元に戻せる必要があります。ほとんどの場合、PSQL ではこの点について心配する必要はありません。しかし、他のコードはどうでしょうか。


多くの場合、データを変更すると、イベントの起動、サービスの更新、プッシュ通知の送信などのフォローアップ タスクを実行する必要があります。これらのタスクは元に戻せません。つまり、イベント、リクエスト、または通知の送信を取り消すことはできません。エラーが発生した場合、データの変更はロールバックされますが、「レポートが生成されました。ここをクリックして表示してください」というプッシュ通知はすでに送信されています。ユーザーまたは他のサービスがこの誤った情報に基づいて行動するとどうなるでしょうか。障害が連鎖的に発生します。したがって、元に戻せないコードはトランザクションに含めないでください。そうしないと、トランザクションでエラーが発生したときにシステムが不良な状態になるリスクがあります。

  • DB での元に戻せる操作- DB 操作を元に戻すことができず、トランザクションが失敗した場合、トランザクションは変更を自動的にロールバックしようとしますが、変更は元に戻せないため失敗し、DB は不良な状態のままになります。
  • DB 上で必要な関連操作- ユーザー レコードがないと新しい銀行口座レコードを作成できないため、両方を同じトランザクションで作成する必要があります。
  • 可逆的かつ関連するビジネス ロジック- 預金を含む新しいアカウント レコードを作成した後、顧客の合計残高を計算します (巧妙なモデリングと調整により、これもトランザクションから移動できます)。

危険

これらは、処理されるデータの量や DB トラフィックによっては、ロックを長時間保持することで停止を引き起こす可能性があります。これらはすべて、時間がかかりすぎなければ問題ありません。


  • 低速クエリ— 通常、データの取得は高速ですが、次の 3 つのシナリオを除きます。これらのクエリはトランザクションを遅くし、ロックを保持する時間を長くするため、他のユーザーに悪影響を及ぼします。

      • インデックスのない列に対するクエリ
      • 大きなテーブルに対するクエリ
      • 結合
    @transaction.atomic def process_large_order_report(start_date, end_date, min_order_value=1000): # Complex query with multiple joins and aggregations large_orders = Order.objects.filter( created_at__range=(start_date, end_date), total_amount__gte=min_order_value, status='completed' ).select_related( 'customer', 'shipping_address', 'billing_address' ).prefetch_related( 'items__product__category', 'items__product__supplier' ).annotate( item_count=Count('items'), total_weight=Sum('items__product__weight'), discount_percentage=F('discount_amount') * 100 / F('total_amount') ).filter( # Additional complex filtering Q(customer__user__is_active=True) & (Q(items__product__category__name='Electronics') | Q(items__product__category__name='Furniture')) & ~Q(shipping_address__country='US') ).order_by('-total_amount') # do the transactional work with the large_orders queryset
    • 代替
      • これらのクエリはトランザクションの前と外で実行します。
 # fixed def process_large_order_report(start_date, end_date, min_order_value=1000): # Complex query with multiple joins and aggregations large_orders = Order.objects.filter( created_at__range=(start_date, end_date), total_amount__gte=min_order_value, status='completed' ).select_related( 'customer', 'shipping_address', 'billing_address' ).prefetch_related( 'items__product__category', 'items__product__supplier' ).annotate( item_count=Count('items'), total_weight=Sum('items__product__weight'), discount_percentage=F('discount_amount') * 100 / F('total_amount') ).filter( # Additional complex filtering Q(customer__user__is_active=True) & (Q(items__product__category__name='Electronics') | Q(items__product__category__name='Furniture')) & ~Q(shipping_address__country='US') ).order_by('-total_amount') # Start the transaction block with transaction.atomic(): # do the transactional work with the large_orders queryset


  • 複数のテーブルに対する操作- 複数のテーブルに対する操作を含むトランザクションは、完了するまで各テーブルをロックできます。これは、Django の移行で特によく見られます。移行を小さく保ち、一度に 1 つまたは少数のテーブルに集中させるもう 1 つの理由です。


      • テーブルまたは列の構造の変更を参照してください
    class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] # too many operations operations = [ migrations.RemoveField("Author", "age"), migrations.AddField("Author", "rating", models.IntegerField(default=0)), migrations.AlterField("Book", "price", models.DecimalField(max_digits=5, decimal_places=2)), ]
    • 代替
      • トランザクションを複数の小さなトランザクションに分割し、それらを oncommit コールバックで連鎖します。
 # fixed # 1st migration class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] operations = [ migrations.RemoveField("Author", "age"), ] # 2nd migration class Migration(migrations.Migration): dependencies = [("migrations", "0002_initial")] operations = [ migrations.AddField("Author", "rating", models.IntegerField(default=0)), ] # 3rd migration class Migration(migrations.Migration): dependencies = [("migrations", "0003_initial")] operations = [ migrations.AlterField("Book", "price", models.DecimalField(max_digits=5, decimal_places=2)), ]


  • データ移行- トランザクションはクエリが実行されてからトランザクションが失敗または終了するまでロックを保持するため、テーブル内のすべての行に対して実行される移行では、各行の読み取りまたは書き込みが防止され、テーブル全体がロックされることになります。

     def migrate_user_profiles(): # Get all users with legacy profiles users_with_profiles = User.objects.filter( legacy_profile__isnull=False ).select_related('legacy_profile') # Process all users in a single transaction with transaction.atomic(): # Track progress total = users_with_profiles.count() print(f"Migrating {total} user profiles...") # Process each user for i, user in enumerate(users_with_profiles): if i % 100 == 0: print(f"Processed {i}/{total} profiles") legacy = user.legacy_profile legacy.update_new_user_profile()
    • 代替
      • トランザクションを移行全体ではなく、個々の操作にラップします。トランザクション内の各行への更新を配置することで、ロックを短時間だけ保持します。
 # fixed def migrate_user_profiles(): # Get all users with legacy profiles users_with_profiles = User.objects.filter( legacy_profile__isnull=False ).select_related('legacy_profile') # Process all users in a single transaction # Track progress total = users_with_profiles.count() print(f"Migrating {total} user profiles...") # Process each user for i, user in enumerate(users_with_profiles): if i % 100 == 0: print(f"Processed {i}/{total} profiles") with transaction.atomic(): legacy = user.legacy_profile legacy.update_new_user_profile()


  • (PSQL および SQLiteのみ*)テーブルまたは列の変更- これらの操作には最も厳格な形式のロックが必要なため、テーブル全体の読み取り/書き込みが防止されます。これらの操作は、停止を引き起こす可能性が最も高くなります。
      • 列の変更
      • テーブルの変更
    • 代替
      • 後で実行します。これらのクエリは必要ですが、業務時間中に実行する必要はありません。ここでの最善のポリシーは、テーブルの重要性を判断し、移行にかかる時間を見積もり、DB のトラフィックが最も少ないときに移行を実行し、ロールバック プランを準備することで、停止のリスクを軽減することです。

      • トランザクションにかかる時間を短縮します。これは、テーブルをパーティション分割し、個々のパーティションで移行を実行することで実現できます。PSQLパーティションと Django


*Django は、PSQL および SQLite の移行に対してのみトランザクションをラップします。

 class Migration(migrations.Migration): dependencies = [("migrations", "0001_initial")] # this migration, if on a large table, can slow down and block other operations # do it later operations = [ migrations.RemoveField("Users", "middle_name"), ]


  • 元に戻せない操作- トランザクションがロールバックされた場合、トランザクション内のすべての操作は元に戻せる必要があります。トランザクションに API 呼び出しを含めた場合、元に戻すことはできません。

      • キューにイベントを追加して他のイベントをトリガーする
      • API 呼び出しの送信 - ステータスの更新、ジョブのトリガーなど
    def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) async_notification_service.send_email(user.email, "You can login now!") Account.objects.create(user=user, balance=0) # rest of user creation proccess
    • 代替案
      • トランザクションの oncommit コールバックで実行する必要がある重要なビジネス ロジックを実行します。oncommit コールバックは、トランザクションが成功した後に必ず呼び出され、トランザクションからのすべての更新は oncommit コールバックで利用できます。
      • 操作を元に戻せるようにすることもできます。つまり、イベントを削除したり、元に戻す API 呼び出しを送信したりする方法を作成します。これは、イベント キューまたは API を所有するチームにとって簡単な作業ではありません。お勧めしません。
 def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess # the transaction is still in progress, so it can still be rolled back, it is not # committed until the transaction block is exited, so putting the notification here # is not a good idea - especially if the job starts immediately tries to read the data # this creates a race condition async_notification_service.send_email(user.email, "You can login now!") def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess transaction.on_commit(partial(async_notification_service.send_email, user.email, "You can login now!"))


  • 呼び出しのブロック- トランザクションは、トランザクションが変更しているテーブル/行に対して他のすべてのクエリが操作されるのを防ぐため、トランザクションの期間を延長するコードによって DB がロックされ、DB に依存するアプリでタイムアウトが発生し、応答しなくなります。


      • ネットワーク呼び出し - トランザクションに入れるデータを取得するために API にリクエストを送信します。
      • ログ記録はこれに該当します。使用するロガーに応じて、ライブラリはトランザクション中にログを送信するか、データドッグを使用している場合は別のプロセスでログを保存しますが、バッチジョブがログをファイルに保存するまで、ログをメモリに保存するという代償を払うことになります。
      • ディスク操作 - CSV をロードしてテーブルに挿入するか、テーブルを CSV にエクスポートします。
      • CPU を大量に使用するタスク — 行列の乗算や、負荷の高い計算/データ変換は、CPU とその他すべての操作をブロックします — Python のグローバル インタープリタ ロックは、すべてのスレッドが CPU を 1 つずつ使用するように強制し、Django は単一処理です (トランザクション内のすべての操作はシリアルに実行されます)。
     def transaction(user_data, user_files): with transaction.atomic(): user = User.objects.create(**user_data) for file_data in user_files: # transaction waits for this upload and so do all other connections that need access to table/rows the transaction # uses url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url) Account.objects.create(user=user, balance=0) # rest of user creation proccess
    • 代替案
      • トランザクションの前にデータを準備します - 事前にディスク/ネットワーク データをロードします。データの準備ができたら、それを使用してトランザクションを実行します。
      • プレースホルダー データを使用する - 簡単に取得および識別できる、明らかなプレースホルダー データ (実稼働データと間違える可能性がない) のセットを作成します。トランザクションの前にそれをロードして使用します。
      • データを生成する - フィールドに一意性の制約があり、プレースホルダー データを使用できない場合。
      • これは危険です。疑似ランダム アルゴリズムの性質上、予期しない障害が発生する可能性があるため、予期しない障害を回避するには、適切なシードを持つ適切なランダム アルゴリズムを使用してください。
      • 制約を削除または変更する - 制約によってレコードを安全に作成できない場合は、スキーマに問題があります。制約を、レコードがいつ完了し、監視する必要があるかを指定する状態列に依存させます。
 # not bad def transaction(user_data, user_files): user = None with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess for file_data in user_files: url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url) # best fix from functools import partial def transaction(user_data, user_files): user = None with transaction.atomic(): user = User.objects.create(**user_data) Account.objects.create(user=user, balance=0) # rest of user creation proccess # partials create a callable with the function and arguments # so that the function is called with the arguments when the transaction is committed # TODO: diff between partial and lambda here??? transaction.on_commit(partial(create_user_files, user_files, user)) def create_user_files(user_files, user): for file_data in user_files: url = Cloudinary.upload_file(file_data['data']) Files.objects.create(**file_data['meta_data'], user=user, url=url)


次へ — PSQL コマンドが互いにブロックする仕組み

次の投稿では、PSQL について詳しく調べて、次のことを調べます。

  • さまざまなコマンドがテーブルと行をロックする方法
  • 実行するのが最もリスクの高いコマンドはどれか

出典

  • アトミック
  • Django の移行とトランザクション
    • DDL トランザクションをサポートするデータベース (SQLite および PostgreSQL) では、すべての移行操作はデフォルトで単一のトランザクション内で実行されます。一方、データベースが DDL トランザクションをサポートしていない場合 (MySQL、Oracle など)、すべての操作はトランザクションなしで実行されます。
    • アトミック属性を False に設定することで、トランザクション内で移行が実行されないようにすることができます。例:
    • また、atomic()を使用するか、RunPythonにatomic=Trueを渡すことで、トランザクション内で移行の一部を実行することも可能である。
  • PSQL パーティションと Django
    • Django の ORM にはパーティション化されたテーブルのサポートが組み込まれていないため、アプリケーションでパーティションを使用する場合は、少し余分な作業が必要になります。
    • パーティションを使用する 1 つの方法は、生の SQL を実行する独自の移行を実行することです。これは機能しますが、将来テーブルに加えるすべての変更の移行を手動で管理する必要があることを意味します。
    • もう 1 つのオプションは、django-postgres-extra というパッケージを使用することです。Django-postgres-extra は、TRUNCATE TABLE やテーブル パーティションのサポートなど、Django の ORM に組み込まれていないいくつかの PostgreSQL 機能のサポートを提供します。
  • PSQL トランザクション
  • Python: Django トランザクションの仕組み