paint-brush
Uso de MinIO para crear una aplicación de chat de generación aumentada de recuperaciónpor@minio
5,705 lecturas
5,705 lecturas

Uso de MinIO para crear una aplicación de chat de generación aumentada de recuperación

por MinIO21m2024/09/18
Read on Terminal Reader

Demasiado Largo; Para Leer

La creación de una aplicación RAG de nivel de producción exige una infraestructura de datos adecuada para almacenar, versionar, procesar, evaluar y consultar fragmentos de datos que componen su corpus propietario.
featured image - Uso de MinIO para crear una aplicación de chat de generación aumentada de recuperación
MinIO HackerNoon profile picture
0-item


Se ha dicho a menudo que en la era de la IA, los datos son el foso. Por ello, crear una aplicación RAG de nivel de producción exige una infraestructura de datos adecuada para almacenar, versionar, procesar, evaluar y consultar fragmentos de datos que componen su corpus propietario. Dado que MinIO adopta un enfoque de IA que prioriza los datos, nuestra recomendación inicial predeterminada de infraestructura para un proyecto de este tipo es configurar un Modern Data Lake (MinIO) y una base de datos vectorial. Si bien es posible que sea necesario conectar otras herramientas auxiliares a lo largo del camino, estas dos unidades de infraestructura son fundamentales. Servirán como centro de gravedad para casi todas las tareas que se encuentren posteriormente para poner en producción su aplicación RAG.


Pero te encuentras en un dilema. Has oído hablar de estos términos LLM y RAG antes, pero no te has aventurado mucho más allá de eso debido a lo desconocido. Pero ¿no sería genial si hubiera una aplicación de tipo "Hola mundo" o estándar que pudiera ayudarte a empezar?


No se preocupen, yo estaba en la misma situación. Por eso, en este blog, demostraremos cómo usar MinIO para crear una aplicación de chat basada en Retrieval Augmented Generation (RAG) utilizando hardware básico.


  • Utilice MinIO para almacenar todos los documentos, fragmentos procesados y las incrustaciones utilizando la base de datos vectorial.


  • Utilice la función de notificación de depósito de MinIO para activar eventos al agregar o eliminar documentos de un depósito


  • Webhook que consume el evento y procesa los documentos utilizando Langchain y guarda los metadatos y los documentos fragmentados en un depósito de metadatos


  • Activar eventos de notificación de depósito MinIO para documentos fragmentados recientemente agregados o eliminados


  • Un webhook que consume los eventos y genera incrustaciones y las guarda en la base de datos vectorial (LanceDB) que se almacena en MinIO


Herramientas clave utilizadas

  • MinIO - Almacén de objetos para conservar todos los datos
  • LanceDB : base de datos vectorial de código abierto sin servidor que conserva los datos en un almacén de objetos
  • Ollama : para ejecutar LLM y incrustar modelos localmente (compatible con API de OpenAI)
  • Gradio - Interfaz a través de la cual interactuar con la aplicación RAG
  • FastAPI : servidor para los webhooks que reciben notificaciones de MinIO y exponen la aplicación Gradio
  • LangChain y Unstructured : para extraer texto útil de nuestros documentos y dividirlos en fragmentos para incrustarlos


Modelos utilizados

Iniciar MinIO Server

Puedes descargar el binario si aún no lo tienes desde aquí


 # Run MinIO detached !minio server ~/dev/data --console-address :9090 &


Iniciar el servidor Ollama + Descargar LLM y modelo de integración

Descarga Ollama desde aquí


 # Start the Server !ollama serve


 # Download Phi-3 LLM !ollama pull phi3:3.8b-mini-128k-instruct-q8_0


 # Download Nomic Embed Text v1.5 !ollama pull nomic-embed-text:v1.5


 # List All the Models !ollama ls


Cree una aplicación Gradio básica con FastAPI para probar el modelo

 LLM_MODEL = "phi3:3.8b-mini-128k-instruct-q8_0" EMBEDDING_MODEL = "nomic-embed-text:v1.5" LLM_ENDPOINT = "http://localhost:11434/api/chat" CHAT_API_PATH = "/chat" def llm_chat(user_question, history): history = history or [] user_message = f"**You**: {user_question}" llm_resp = requests.post(LLM_ENDPOINT, json={"model": LLM_MODEL, "keep_alive": "48h", # Keep the model in-memory for 48 hours "messages": [ {"role": "user", "content": user_question } ]}, stream=True) bot_response = "**AI:** " for resp in llm_resp.iter_lines(): json_data = json.loads(resp) bot_response += json_data["message"]["content"] yield bot_response


 import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)

Modelo de prueba de incrustación

 import numpy as np EMBEDDING_ENDPOINT = "http://localhost:11434/api/embeddings" EMBEDDINGS_DIM = 768 def get_embedding(text): resp = requests.post(EMBEDDING_ENDPOINT, json={"model": EMBEDDING_MODEL, "prompt": text}) return np.array(resp.json()["embedding"][:EMBEDDINGS_DIM], dtype=np.float16)


 ## Test with sample text get_embedding("What is MinIO?")


Descripción general del proceso de ingestión

Crear depósitos MinIO

Utilice el comando mc o hágalo desde la interfaz de usuario

  • custom-corpus - Para almacenar todos los documentos
  • almacén: para almacenar todos los metadatos, fragmentos e incrustaciones vectoriales


 !mc alias set 'myminio' 'http://localhost:9000' 'minioadmin' 'minioadmin'


 !mc mb myminio/custom-corpus !mc mb myminio/warehouse

Crear un webhook que consume notificaciones de buckets desde un bucket de corpus personalizado

 import json import gradio as gr import requests from fastapi import FastAPI, Request from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/document/notification") async def receive_webhook(request: Request): json_data = await request.json() print(json.dumps(json_data, indent=2)) with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)


 ## Test with sample text get_embedding("What is MinIO?")


Crear notificaciones de eventos de MinIO y vincularlas a un depósito de corpus personalizado

Crear evento de Webhook

En la consola, vaya a Eventos-> Agregar destino de evento -> Webhook


Complete los campos con los siguientes valores y presione guardar


Identificador - doc-webhook


Punto final : http://localhost:8808/api/v1/document/notification


Haga clic en Reiniciar MinIO en la parte superior cuando se le solicite.


( Nota : También puedes usar mc para esto)

Vincular el evento de Webhook a los eventos del depósito de corpus personalizado

En la consola, vaya a Buckets (Administrador) -> corpus personalizado -> Eventos


Complete los campos con los siguientes valores y presione guardar


ARN - Seleccione el doc-webhook del menú desplegable


Seleccionar eventos - Marcar PUT y DELETE


( Nota : También puedes usar mc para esto)


Tenemos nuestra primera configuración de webhook

Ahora prueba agregando y quitando un objeto

Extraer datos de los documentos y fragmentos

Usaremos Langchain y Unstructured para leer un objeto de MinIO y dividir documentos en múltiples fragmentos.


 from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_community.document_loaders import S3FileLoader MINIO_ENDPOINT = "http://localhost:9000" MINIO_ACCESS_KEY = "minioadmin" MINIO_SECRET_KEY = "minioadmin" # Split Text from a given document using chunk_size number of characters text_splitter = RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=64, length_function=len) def split_doc_by_chunks(bucket_name, object_key): loader = S3FileLoader(bucket_name, object_key, endpoint_url=MINIO_ENDPOINT, aws_access_key_id=MINIO_ACCESS_KEY, aws_secret_access_key=MINIO_SECRET_KEY) docs = loader.load() doc_splits = text_splitter.split_documents(docs) return doc_splits


 # test the chunking split_doc_by_chunks("custom-corpus", "The-Enterprise-Object-Store-Feature-Set.pdf")

Añadir la lógica de fragmentación al webhook

Agregue la lógica del fragmento al webhook y guarde los metadatos y los fragmentos en el depósito del almacén


 import urllib.parse import s3fs METADATA_PREFIX = "metadata" # Using s3fs to save and delete objects from MinIO s3 = s3fs.S3FileSystem() # Split the documents and save the metadata to warehouse bucket def create_object_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) print(record["s3"]["bucket"]["name"], record["s3"]["object"]["key"]) doc_splits = split_doc_by_chunks(bucket_name, object_key) for i, chunk in enumerate(doc_splits): source = f"warehouse/{METADATA_PREFIX}/{bucket_name}/{object_key}/chunk_{i:05d}.json" with s3.open(source, "w") as f: f.write(chunk.json()) return "Task completed!" def delete_object_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) s3.delete(f"warehouse/{METADATA_PREFIX}/{bucket_name}/{object_key}", recursive=True) return "Task completed!"

Actualizar el servidor FastAPI con la nueva lógica

 import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)

Agregar nuevo webhook para procesar metadatos/fragmentos de documentos

Ahora que tenemos el primer webhook funcionando, el siguiente paso es obtener todos los fragmentos con metadatos, generar las incrustaciones y almacenarlas en la base de datos vectorial.



 import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() print(json.dumps(json_data, indent=2)) @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)


Crear notificaciones de eventos de MinIO y vincularlas al depósito del almacén

Crear evento de Webhook

En la consola, vaya a Eventos-> Agregar destino de evento -> Webhook


Complete los campos con los siguientes valores y presione guardar


Identificador - metadata-webhook


Punto final : http://localhost:8808/api/v1/metadata/notification


Haga clic en Reiniciar MinIO en la parte superior cuando se le solicite.


( Nota : También puedes usar mc para esto)

Vincular el evento de Webhook a los eventos del depósito de corpus personalizado

En la consola, vaya a Buckets (Administrador) -> Warehouse -> Eventos


Complete los campos con los siguientes valores y presione guardar


ARN - Seleccione el webhook de metadatos del menú desplegable


Prefijo - metadatos/


Sufijo - .json


Seleccionar eventos - Marcar PUT y DELETE


( Nota : También puedes usar mc para esto)


Tenemos nuestra primera configuración de webhook

Ahora pruebe agregando y eliminando un objeto en el corpus personalizado y vea si se activa este webhook.

Crear una base de datos vectorial LanceDB en MinIO

Ahora que tenemos el webhook básico funcionando, configuremos la base de datos vectorial lanceDB en el depósito de MinIO en el que guardaremos todas las incrustaciones y los campos de metadatos adicionales.


 import os import lancedb # Set these environment variables for the lanceDB to connect to MinIO os.environ["AWS_DEFAULT_REGION"] = "us-east-1" os.environ["AWS_ACCESS_KEY_ID"] = MINIO_ACCESS_KEY os.environ["AWS_SECRET_ACCESS_KEY"] = MINIO_SECRET_KEY os.environ["AWS_ENDPOINT"] = MINIO_ENDPOINT os.environ["ALLOW_HTTP"] = "True" db = lancedb.connect("s3://warehouse/v-db/")


 # list existing tables db.table_names()


 # Create a new table with pydantic schema from lancedb.pydantic import LanceModel, Vector import pyarrow as pa DOCS_TABLE = "docs" EMBEDDINGS_DIM = 768 table = None class DocsModel(LanceModel): parent_source: str # Actual object/document source source: str # Chunk/Metadata source text: str # Chunked text vector: Vector(EMBEDDINGS_DIM, pa.float16()) # Vector to be stored def get_or_create_table(): global table if table is None and DOCS_TABLE not in list(db.table_names()): return db.create_table(DOCS_TABLE, schema=DocsModel) if table is None: table = db.open_table(DOCS_TABLE) return table


 # Check if that worked get_or_create_table()


 # list existing tables db.table_names()

Agregar almacenamiento/eliminación de datos de lanceDB a metadata-webhook

 import multiprocessing EMBEDDING_DOCUMENT_PREFIX = "search_document" # Add queue that keeps the processed meteadata in memory add_data_queue = multiprocessing.Queue() delete_data_queue = multiprocessing.Queue() def create_metadata_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) print(bucket_name, object_key) with s3.open(f"{bucket_name}/{object_key}", "r") as f: data = f.read() chunk_json = json.loads(data) embeddings = get_embedding(f"{EMBEDDING_DOCUMENT_PREFIX}: {chunk_json['page_content']}") add_data_queue.put({ "text": chunk_json["page_content"], "parent_source": chunk_json.get("metadata", "").get("source", ""), "source": f"{bucket_name}/{object_key}", "vector": embeddings }) return "Metadata Create Task Completed!" def delete_metadata_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) delete_data_queue.put(f"{bucket_name}/{object_key}") return "Metadata Delete Task completed!"

Agregar un programador que procese datos de las colas

 from apscheduler.schedulers.background import BackgroundScheduler import pandas as pd def add_vector_job(): data = [] table = get_or_create_table() while not add_data_queue.empty(): item = add_data_queue.get() data.append(item) if len(data) > 0: df = pd.DataFrame(data) table.add(df) table.compact_files() print(len(table.to_pandas())) def delete_vector_job(): table = get_or_create_table() source_data = [] while not delete_data_queue.empty(): item = delete_data_queue.get() source_data.append(item) if len(source_data) > 0: filter_data = ", ".join([f'"{d}"' for d in source_data]) table.delete(f'source IN ({filter_data})') table.compact_files() table.cleanup_old_versions() print(len(table.to_pandas())) scheduler = BackgroundScheduler() scheduler.add_job(add_vector_job, 'interval', seconds=10) scheduler.add_job(delete_vector_job, 'interval', seconds=10)

Actualización de FastAPI con los cambios en la incrustación de vectores

 import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.on_event("startup") async def startup_event(): get_or_create_table() if not scheduler.running: scheduler.start() @app.on_event("shutdown") async def shutdown_event(): scheduler.shutdown() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New Metadata created!") background_tasks.add_task(create_metadata_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Metadata deleted!") background_tasks.add_task(delete_metadata_task, json_data) return {"status": "success"} @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808) 




Ahora que tenemos el pipeline de ingestión funcionando, integremos el pipeline RAG final.

Añadir capacidad de búsqueda de vectores

Ahora que tenemos el documento ingresado en lanceDB, agreguemos la capacidad de búsqueda.


 EMBEDDING_QUERY_PREFIX = "search_query" def search(query, limit=5): query_embedding = get_embedding(f"{EMBEDDING_QUERY_PREFIX}: {query}") res = get_or_create_table().search(query_embedding).metric("cosine").limit(limit) return res


 # Lets test to see if it works res = search("What is MinIO Enterprise Object Store Lite?") res.to_list()

Instar a los LLM a utilizar los documentos pertinentes

 RAG_PROMPT = """ DOCUMENT: {documents} QUESTION: {user_question} INSTRUCTIONS: Answer in detail the user's QUESTION using the DOCUMENT text above. Keep your answer ground in the facts of the DOCUMENT. Do not use sentence like "The document states" citing the document. If the DOCUMENT doesn't contain the facts to answer the QUESTION only Respond with "Sorry! I Don't know" """


 context_df = [] def llm_chat(user_question, history): history = history or [] global context_df # Search for relevant document chunks res = search(user_question) documents = " ".join([d["text"].strip() for d in res.to_list()]) # Pass the chunks to LLM for grounded response llm_resp = requests.post(LLM_ENDPOINT, json={"model": LLM_MODEL, "messages": [ {"role": "user", "content": RAG_PROMPT.format(user_question=user_question, documents=documents) } ], "options": { # "temperature": 0, "top_p": 0.90, }}, stream=True) bot_response = "**AI:** " for resp in llm_resp.iter_lines(): json_data = json.loads(resp) bot_response += json_data["message"]["content"] yield bot_response context_df = res.to_pandas() context_df = context_df.drop(columns=['source', 'vector']) def clear_events(): global context_df context_df = [] return context_df

Actualizar el punto final de chat de FastAPI para utilizar RAG

 import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.on_event("startup") async def startup_event(): get_or_create_table() if not scheduler.running: scheduler.start() @app.on_event("shutdown") async def shutdown_event(): scheduler.shutdown() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New Metadata created!") background_tasks.add_task(create_metadata_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Metadata deleted!") background_tasks.add_task(delete_metadata_task, json_data) return {"status": "success"} @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 gr.Markdown("### Context Supplied") context_dataframe = gr.DataFrame(headers=["parent_source", "text", "_distance"], wrap=True) ch_interface.clear_btn.click(clear_events, [], context_dataframe) @gr.on(ch_interface.output_components, inputs=[ch_interface.chatbot], outputs=[context_dataframe]) def update_chat_context_df(text): global context_df if context_df is not None: return context_df return "" demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)


¿Pudiste implementar un chat basado en RAG con MinIO como backend del lago de datos? En un futuro cercano, realizaremos un seminario web sobre este mismo tema en el que te brindaremos una demostración en vivo mientras construimos esta aplicación de chat basada en RAG.

Los trapos son nuestros

Como desarrollador enfocado en la integración de IA en MinIO, exploro constantemente cómo nuestras herramientas se pueden integrar sin problemas en las arquitecturas de IA modernas para mejorar la eficiencia y la escalabilidad. En este artículo, le mostramos cómo integrar MinIO con Retrieval-Augmented Generation (RAG) para crear una aplicación de chat. Esto es solo la punta del iceberg, para darle un impulso en su búsqueda para crear casos de uso más exclusivos para RAG y MinIO. Ahora tiene los elementos básicos para hacerlo. ¡Hagámoslo!


Si tiene alguna pregunta sobre la integración de MinIO RAG, asegúrese de comunicarse con nosotros en Flojo !