Today, we are excited to announce support for continuous updates for long-running pipelines in CocoIndex. This feature automatically applies incremental source changes to keep your index up to date with minimal latency.
With continuous updates, your indexes remain synchronized with your source data in real time, ensuring that your applications always have access to the most current information without the performance overhead of full reindexing.
If you like our work, it would mean a lot to us if you could support CocoIndex on GitHub with a star. Thank you so much, with a warm coconut hug 🥥🤗.
When to use it?
It fits situations where you need continuous access to fresh target data most of the time.
How does it work?
It continuously captures changes from the source data and updates the target data accordingly. It is long-running and stops only when explicitly aborted.
A data source may enable one or more change capture mechanisms. CocoIndex supports two main categories of change detection mechanisms:
- General Mechanism:
  - Refresh Interval: A universal approach applicable to all data sources. It periodically checks for changes by scanning the source and comparing the current state with the previous state.
- Source-Specific Mechanisms:
  - Push Change Notification: Many data sources offer built-in mechanisms for real-time change notification, which CocoIndex can subscribe to (coming soon!).
  - Recent Changes Poll: Some data sources, such as Google Drive, provide APIs for tracking recently modified entries, which allows efficient detection of changes.
These mechanisms work together so that CocoIndex can detect and process changes soon after they happen, keeping your index closely in sync with the source data while limiting latency and resource usage.
Under the hood, once a change is detected, CocoIndex uses its incremental processing mechanism to update the target data.
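To make the idea of incremental processing concrete, here is a minimal sketch in plain Python. It is not CocoIndex's implementation: the names Change, apply_changes, and transform are hypothetical, and the sketch only illustrates that work is done per changed entry rather than for the whole source.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class Change:
    """One detected source change (hypothetical shape, for illustration only)."""
    key: str
    content: Optional[str]  # None means the source entry was deleted


def apply_changes(
    target: dict[str, str],
    changes: list[Change],
    transform: Callable[[str], str],
) -> int:
    """Update `target` in place; return how many entries were re-transformed."""
    processed = 0
    for change in changes:
        if change.content is None:
            # Deleted at the source: drop the derived entry, no transform needed.
            target.pop(change.key, None)
        else:
            # Added or modified: recompute only this entry.
            target[change.key] = transform(change.content)
            processed += 1
    return processed


# Existing target data derived from three source entries.
target = {"a.txt": "ALPHA", "b.txt": "BETA", "c.txt": "GAMMA"}

# One entry was modified and one was deleted, so only one transform call runs.
changes = [Change("b.txt", "beta v2"), Change("c.txt", None)]
processed = apply_changes(target, changes, str.upper)
```

The untouched entry (a.txt) is never re-transformed, which is the saving incremental processing provides over a full reindex.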
How to enable continuous updates?
Here is an example of how to enable continuous updates for Google Drive. It is pretty simple:
1. Configure the change capture mechanisms for the source
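import datetime
import cocoindex

# credential_path and root_folder_ids are assumed to be defined elsewhere.
@cocoindex.flow_def(name="GoogleDriveIndex")
def my_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.GoogleDrive(
            service_account_credential_path=credential_path,
            root_folder_ids=root_folder_ids,
            # Source-specific mechanism: poll for recently changed files.
            recent_changes_poll_interval=datetime.timedelta(seconds=10)),
        # General mechanism: periodic full refresh.
        refresh_interval=datetime.timedelta(minutes=1))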
In this example, we've configured two change detection mechanisms:
- recent_changes_poll_interval=datetime.timedelta(seconds=10): This is a Google Drive-specific mechanism that uses the Drive API's changes endpoint to detect modifications every 10 seconds. It is an efficient, fast scan that captures the most recently modified files, so it can be set to a short interval to get fresher data. It doesn't capture file deletions, so the fallback mechanism is needed to ensure all changes are eventually captured.
- refresh_interval=datetime.timedelta(minutes=1): This is the universal fallback mechanism that performs a complete scan of the data source every minute. It scans all files to ensure that all changes, including deleted files, are captured.
The refresh_interval parameter is particularly important because it serves as a safety net that ensures all changes are eventually captured, even if source-specific mechanisms miss something. It works by:
- Periodically scanning the list of entries (e.g. the file list with metadata such as last modified time) at the specified interval
- Comparing the current list with the previously known list
- Identifying any differences (additions, modifications, deletions)
- Triggering updates only for the changed items
While source-specific mechanisms like recent_changes_poll_interval are more efficient for near real-time updates, refresh_interval provides comprehensive coverage. We recommend setting it to a reasonable value based on your freshness requirements and resource constraints: shorter intervals provide fresher data but consume more resources.
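The scan-and-compare steps above, and the reason a recent-changes poll alone is not enough, can be sketched in plain Python. This is a simplified model, not CocoIndex's or Google Drive's actual logic: poll_recent_changes, diff_snapshots, and the timestamp values are hypothetical and chosen only for illustration.

```python
from typing import NamedTuple


class SnapshotDiff(NamedTuple):
    added: set[str]
    modified: set[str]
    deleted: set[str]


def poll_recent_changes(current: dict[str, float], since: float) -> set[str]:
    """Entries whose modified time is newer than `since`.

    Only entries that still exist can be reported, so deletions go unnoticed.
    """
    return {key for key, mtime in current.items() if mtime > since}


def diff_snapshots(previous: dict[str, float], current: dict[str, float]) -> SnapshotDiff:
    """Full comparison of two listings, as a refresh-interval scan would do."""
    added = set(current.keys() - previous.keys())
    deleted = set(previous.keys() - current.keys())
    modified = {
        key
        for key in current.keys() & previous.keys()
        if current[key] != previous[key]
    }
    return SnapshotDiff(added, modified, deleted)


# Listings map a file name to its last-modified timestamp (made-up values).
previous = {"a.txt": 100.0, "b.txt": 100.0, "c.txt": 100.0}
current = {"a.txt": 100.0, "b.txt": 150.0, "d.txt": 160.0}  # c.txt was deleted

recent = poll_recent_changes(current, since=120.0)
diff = diff_snapshots(previous, current)
```

In this model the poll reports b.txt and d.txt but has no way to notice that c.txt is gone, while the full comparison reports all three kinds of change. That is why the two mechanisms are configured together.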
You can read the full documentation:
- for source-specific change detection mechanisms here. Different sources support different mechanisms.
- for the universal fallback mechanism here.
2. Run the flow in live update mode
Option 1: CLI
Add a @cocoindex.main_fn() decorator to your main function so that the CocoIndex CLI takes over control when cocoindex is the first command-line argument.
import cocoindex

@cocoindex.main_fn()
def main():
    pass

if __name__ == "__main__":
    main()
To run the CLI in live update mode, use the following command:
python main.py cocoindex update -L
This will start the flow in live update mode, which will continuously capture changes from the source and update the target data accordingly.
Option 2: Python API
You can create a cocoindex.FlowLiveUpdater. For example:
import asyncio
import cocoindex

@cocoindex.main_fn()
async def main():
    # demo_flow is assumed to be a flow defined elsewhere.
    my_updater = cocoindex.FlowLiveUpdater(demo_flow)
    await my_updater.wait()
    ...

if __name__ == "__main__":
    asyncio.run(main())
Then run the flow with:
python main.py
You can also use the updater as a context manager. It will abort and wait for the updater to finish automatically when the context is exited. See full documentation here.
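The abort-and-wait behavior on context exit can be illustrated with a small stand-in class. SketchLiveUpdater below is hypothetical and is not the CocoIndex FlowLiveUpdater; it only mimics the described pattern with a background thread so the example runs without CocoIndex installed.

```python
import threading
import time


class SketchLiveUpdater:
    """Hypothetical stand-in for a live updater; not the CocoIndex class."""

    def __init__(self, poll_seconds: float = 0.01) -> None:
        self._poll_seconds = poll_seconds
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self.iterations = 0

    def _run(self) -> None:
        # Keep "updating" until abort() is called.
        while not self._stop.is_set():
            self.iterations += 1
            self._stop.wait(self._poll_seconds)

    def abort(self) -> None:
        self._stop.set()

    def wait(self) -> None:
        self._thread.join()

    def is_running(self) -> bool:
        return self._thread.is_alive()

    def __enter__(self) -> "SketchLiveUpdater":
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Leaving the block aborts the loop and waits for it to finish.
        self.abort()
        self.wait()


with SketchLiveUpdater() as updater:
    time.sleep(0.05)  # the application does its own work here
    running_inside = updater.is_running()

running_after = updater.is_running()
```

Because cleanup happens in __exit__, the background work is stopped even if the body of the with block raises an exception.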
Now you are all set! It is simple to get started with continuous updates for your data. Get started now with the quickstart guide 🚀