Dado que nuestro servidor se ejecuta localmente, el Webhook de WhatsApp no puede llamar al punto final para su verificación. Lo que necesitamos es una URL pública que pueda ser utilizada por el webhook. Hay dos opciones: implementar la aplicación en un servidor en la nube o crear un túnel de servidor proxy. Como todavía estamos en el proceso de desarrollo, usaremos la segunda opción.
- Ir a Registrarse y crea una cuenta gratuita.
- Instale ngrok localmente. Dependiendo de su sistema, puede usar Brew, Chocolatey o simplemente descargarlo e instalarlo. Ver: Configuración e instalación.
- Después de la instalación, agregue su código de autenticación usando el siguiente comando en su terminal. Reemplazar
$YOUR-AUTHENTICATION_TOKEN
con su token de autenticación de ngrok, que se puede encontrar en “Su token de autenticación” en el panel de control de ngrok. - Comience a reenviar tráfico desde su host local en el puerto 8000 ejecutando el siguiente comando en su terminal:
> ngrok config add-authtoken $YOUR-AUTHENTICATION_TOKEN
> ngrok http http://localhost:8000Forwarding https://.ngrok.io -> http://localhost:8000
Ahora se puede acceder a su servidor local a través de URL públicas proporcionadas por ngrok. Deberías ver algo como esto:
Forwarding https://.ngrok.io -> http://localhost:8000
Utilice la URL HTTPS proporcionada por ngrok para la configuración del webhook.
Ahora volvamos a la API de la nube de Meta para implementar el webhook deseado.
- Navegar a <a target="_blank" class="af qa" href="https://developers.facebook.com/apps/” rel=”noopener ugc nofollow” target=”_blank”>Meta para desarrolladores y seleccione la aplicación creada antes.
- En el menú de la izquierda ve a WhatsApp > Configuración.
- En el gancho web sección pegue su URL de reenvío HTTPS de ngrok en el URL de devolución de llamada campo e ingrese el
VERIFICATION_TOKEN
definido enmain.py
en el Ficha de verificación campo. - Haga clic en el botón confirmar y guardar y espere a que el webhook verifique su backend.
- en la sección Campos de webhook habilitar el
messages
alternar debajo Campos suscritos.
¡Eso es todo! Ahora debería poder recibir mensajes de WhatsApp en su servidor backend de Python.
Los webhooks son devoluciones de llamada HTTP que permiten a los programas recibir actualizaciones en tiempo real cuando ocurren ciertos eventos, como un mensaje nuevo o un cambio de estado. Los webhooks hacen posible la integración y la automatización del sistema al entregar una solicitud HTTP que contiene datos de eventos a una URL preconfigurada (en nuestro caso, la URL del servidor proxy ngrok).
Para comprender la lógica y los precios detrás de los webhooks en el Metacosmos, es útil comprender algunos principios básicos sobre las conversaciones.
Una 'conversación' en la API de WhatsApp comienza cuando:
1. El Usuario envía un mensaje: esto abre una ventana de 24 horas, durante la cual puede responder con mensajes que incluyan texto, imágenes u otros medios. sin costos adicionales.
2. El contacto de iniciación empresarial: Si no se ha recibido ningún mensaje de usuario recientemente (no hay una ventana abierta de 24 horas), su asistente de IA debe usar un mensaje de plantilla preaprobada para iniciar la conversación. Puede agregar plantillas personalizadas, pero deben ser aprobadas por Meta.
Mientras el usuario siga respondiendo, la ventana de 24 horas se restablece con cada mensaje nuevo. Esto hace posible tener una interacción continua sin costos adicionales. Una conversación cuesta entre 0,00 y 0,08 USD. El precio concreto se basa en su tipo de conversación Marketing, Utilidad, Servicio y su ubicación. Para su información: Hoy en día, las conversaciones de servicio parecen ser gratuitas. Puede encontrar el precio concreto aquí: <a target="_blank" class="af qa" href="https://developers.facebook.com/docs/whatsapp/pricing” rel=”noopener ugc nofollow” target=”_blank”>Precios de WhatsApp
Ahora podemos recibir mensajes en nuestro backend. Dado que nos hemos suscrito a objetos de mensaje, cada vez que se envía un mensaje a su número de prueba, el webhook creará una solicitud POST a la URL de devolución de llamada que definió en el paso anterior. Lo que debemos hacer a continuación es crear un punto final para solicitudes POST en nuestra aplicación FastAPI.
Primero definamos los requisitos:
- Devuelve un código de estado HTTP 200: Esto es esencial para informar a CloudAPI que el mensaje se recibió correctamente. De no hacerlo, CloudAPI volverá a intentar enviar el mensaje durante un máximo de 7 días.
- Extraer número de teléfono y mensaje: La carga útil de la solicitud entrante contiene datos que incluyen el número de teléfono y el mensaje. Que necesitamos procesar en el backend.
- Filtrar objetos entrantes: Dado que CloudAPI puede enviar varios eventos para el mismo mensaje (como enviado, recibido y leído), el backend debe garantizar que solo se procese una instancia del mensaje.
- Manejar múltiples tipos de mensajes: El backend puede manejar diferentes tipos de mensajes, como texto, mensajes de voz e imágenes. Para no extender el alcance del artículo, solo sentaremos las bases para las imágenes, pero no las implementaremos hasta el final.
- Proceso con flujo de trabajo LLM-Agent: La información extraída se procesa utilizando el flujo de trabajo LLM-Agent, que hemos desarrollado en partes anteriores de esta serie. También puede utilizar otra implementación agente, por ejemplo, Langchain o Langgraph.
Recibiremos una carga útil de un webhook. Puede encontrar cargas útiles de ejemplo en la documentación de Meta: <a target="_blank" class="af qa" href="https://developers.facebook.com/docs/whatsapp/cloud-api/webhooks/payload-examples/” rel=”noopener ugc nofollow” target=”_blank”>Carga útil de ejemplo
Prefiero escribir mi código con Pydantic para agregar seguridad de tipos a mi código Python. Además, las anotaciones de tipo y Pydantic son una combinación óptima para las aplicaciones FastAPI. Entonces, primero definamos los modelos utilizados en nuestro punto final:
# app/schema.py
from typing import List, Optional
from pydantic import BaseModel, Field class Profile(BaseModel):
name: str
class Contact(BaseModel):
profile: Profile
wa_id: str
class Text(BaseModel):
body: str
class Image(BaseModel):
mime_type: str
sha256: str
id: str
class Audio(BaseModel):
mime_type: str
sha256: str
id: str
voice: bool
class Message(BaseModel):
from_: str = Field(..., alias="from")
id: str
timestamp: str
text: Text | None = None
image: Image | None = None
audio: Audio | None = None
type: str
class Metadata(BaseModel):
display_phone_number: str
phone_number_id: str
class Value(BaseModel):
messaging_product: str
metadata: Metadata
contacts: List(Contact) | None = None
messages: List(Message) | None = None
class Change(BaseModel):
value: Value
field: str
statuses: List(dict) | None = None
class Entry(BaseModel):
id: str
changes: List(Change)
class Payload(BaseModel):
object: str
entry: List(Entry)
class User(BaseModel):
id: int
first_name: str
last_name: str
phone: str
role: str
class UserMessage(BaseModel):
user: User
message: str | None = None
image: Image | None = None
audio: Audio | None = None
A continuación, crearemos algunas funciones auxiliares para usar la inyección de dependencia en FastAPI:
# app/main.pyfrom app.domain import message_service
def parse_message(payload: Payload) -> Message | None:
if not payload.entry(0).changes(0).value.messages:
return None
return payload.entry(0).changes(0).value.messages(0)
def get_current_user(message: Annotated(Message, Depends(parse_message))) -> User | None:
if not message:
return None
return message_service.authenticate_user_by_phone_number(message.from_)
def parse_audio_file(message: Annotated(Message, Depends(parse_message))) -> Audio | None:
if message and message.type == "audio":
return message.audio
return None
def parse_image_file(message: Annotated(Message, Depends(parse_message))) -> Image | None:
if message and message.type == "image":
return message.image
return None
def message_extractor(
message: Annotated(Message, Depends(parse_message)),
audio: Annotated(Audio, Depends(parse_audio_file)),
):
if audio:
return message_service.transcribe_audio(audio)
if message and message.text:
return message.text.body
return None
- Analizando la carga útil: El
parse_message
La función extrae el primer mensaje de la carga útil entrante si existe. Esta función devuelveNone
si no se encuentran mensajes, de modo que solo se procesen los mensajes válidos. - Autenticación de usuario: El
get_current_user
La función utiliza elparse_message
inyección de dependencia para extraer el mensaje y luego autentica al usuario en función del número de teléfono asociado con el mensaje. Aquí nos aseguramos de que sólo los usuarios autenticados puedan enviar mensajes. - Análisis de audio e imagen: Estas funciones extraen archivos de audio o imagen del mensaje si el tipo de mensaje es “audio” o “imagen”, respectivamente. Esto permite que la aplicación maneje diferentes tipos de medios.
- Extracción de mensajes: El
message_extractor
La función intenta extraer texto del mensaje o transcribir audio a texto. Esto garantiza que, independientemente del tipo de mensaje, el contenido pueda procesarse.
Aquí tenemos una importación desde nuestra capa de dominio. todo el guion message_service
es donde colocamos todo el código específico del dominio para esta implementación, como authenticate_user_by_phone_number
y transcribe_audio
.
# app/main.py
import threading
from typing_extensions import Annotated
from fastapi import APIRouter, Query, HTTPException, Depends
from app.domain import message_service
from app.schema import Payload, Message, Audio, Image, User # ... rest of the code ...
@app.post("/", status_code=200)
def receive_whatsapp(
user: Annotated(User, Depends(get_current_user)),
user_message: Annotated(str, Depends(message_extractor)),
image: Annotated(Image, Depends(parse_image_file)),
):
if not user and not user_message and not image:
return {"status": "ok"}
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
if image:
return print("Image received")
if user_message:
thread = threading.Thread(
target=message_service.respond_and_send_message,
args=(user_message, user)
)
thread.daemon = True
thread.start()
return {"status": "ok"}
- Implementación del punto final POST: Este punto final maneja la solicitud POST entrante. Comprueba si el usuario, mensaje o imagen es válido. Si ninguno es válido, simplemente devuelve un mensaje de estado a CloudAPI. Si el usuario no está autenticado, genera un
HTTPException
con un código de estado 401. - Procesamiento de imágenes y mensajes: Si se recibe una imagen, hacemos una impresión estándar simple como marcador de posición para el manejo futuro de la imagen. Si se recibe un mensaje de texto, se procesa de forma asincrónica utilizando un hilo separado para evitar bloquear el hilo principal de la aplicación. El
message_service.respond_and_send_message
Se invoca la función para manejar el mensaje de acuerdo con el flujo de trabajo de LLM-Agent.
Explicación sobre el uso de la agrupación de subprocesos para el webhook: WhatsApp reenviará el webhook hasta que obtenga una respuesta 200, por lo que se utiliza la agrupación de subprocesos para garantizar que el manejo de mensajes no bloquee la respuesta del webhook.
En nuestra capa de presentación donde previamente definimos nuestro punto final, usamos algunos message_service
funciones que deben definirse a continuación. Específicamente, necesitamos una implementación para procesar y transcribir cargas de audio, autenticar usuarios y, finalmente, invocar a nuestro agente y enviar una respuesta. Toda esta funcionalidad la colocaremos dentro domain/message_service.py
. En entornos de producción, a medida que su aplicación crece, recomendaría dividirlas en, por ejemplo, transcription_service.py
, message_service.py
y authentication_service.py
.
En múltiples funciones de esta sección, realizaremos solicitudes a la Meta API "https://graph.facebook.com/..."
. En todas estas solicitudes, debemos incluir encabezados de autorización con WHATSAPP_API_KEY
que creamos en paso 1.3como token al portador. Normalmente almaceno claves API y tokens en un .env
archivo y acceder a ellos con Python dotenv
biblioteca. También utilizamos el cliente OpenAI con su OPENAI_API_KEY
que también podría almacenarse en el .env
archivo.
Pero para simplificar, simplemente los colocaremos e inicializaremos en la parte superior de message_service.py
guiones de la siguiente manera:
import os
import json
import requests
from typing import BinaryIOWHATSAPP_API_KEY = "YOUR_ACCESS_TOKEN"
llm = OpenAI(api_key="YOUR_OPENAI_API_KEY")
Reemplace “YOUR_ACCESS_TOKEN” con su token de acceso real que creó en el paso 1.3.
Manejar registros de voz desde un webhook de WhatsApp no es tan sencillo como parece. En primer lugar, es importante saber que el webhook entrante solo nos dice el tipo de datos y un ID de objeto. Por lo tanto, no contiene el archivo de audio binario. Primero tenemos que descargar el archivo de audio usando la API Graph de Meta. Para descargar nuestro audio recibido, debemos realizar dos solicitudes secuenciales. La primera es una solicitud GET con el object_id
para obtener la URL de descarga. Esta URL de descarga es el objetivo de nuestra segunda solicitud GET.
def download_file_from_facebook(file_id: str, file_type: str, mime_type: str) -> str | None:
# First GET request to retrieve the download URL
url = f"https://graph.facebook.com/v19.0/{file_id}"
headers = {"Authorization": f"Bearer {WHATSAPP_API_KEY}"}
response = requests.get(url, headers=headers)
if response.status_code == 200:
download_url = response.json().get('url')
# Second GET request to download the file
response = requests.get(download_url, headers=headers)
if response.status_code == 200:
# Extract file extension from mime_type
file_extension = mime_type.split('/')(-1).split(';')(0)
# Create file_path with extension
file_path = f"{file_id}.{file_extension}"
with open(file_path, 'wb') as file:
file.write(response.content)
if file_type == "image" or file_type == "audio":
return file_path
raise ValueError(f"Failed to download file. Status code: {response.status_code}")
raise ValueError(f"Failed to retrieve download URL. Status code: {response.status_code}")
Aquí, básicamente obtenemos la URL de descarga y descargamos el archivo al sistema de archivos usando el ID del objeto y la extensión del archivo como su file_path
. Si algo falla, planteamos un ValueError
que indica dónde ocurrió el error.
A continuación, simplemente definimos una función que toma el binario de audio y lo transcribe usando Whisper:
def transcribe_audio_file(audio_file: BinaryIO) -> str:
if not audio_file:
return "No audio file provided"
try:
transcription = llm.audio.transcriptions.create(
file=audio_file,
model="whisper-1",
response_format="text"
)
return transcription
except Exception as e:
raise ValueError("Error transcribing audio") from e
Y finalmente, juntemos las funciones de descarga y transcripción:
def transcribe_audio(audio: Audio) -> str:
file_path = download_file_from_facebook(audio.id, "audio", audio.mime_type)
with open(file_path, 'rb') as audio_binary:
transcription = transcribe_audio_file(audio_binary)
try:
os.remove(file_path)
except Exception as e:
print(f"Failed to delete file: {e}")
return transcription
Mientras usamos el número de prueba proporcionado por Meta, tenemos que predefinir a qué números nuestro chatbot puede enviar mensajes. No estoy muy seguro y no he probado si algún número puede enviar un mensaje a nuestro chatbot. Pero de todos modos, tan pronto como cambiemos a un número personalizado, no queremos que nadie pueda ejecutar nuestro chatbot de agente. Entonces necesitamos un método para autenticar al usuario. Tenemos varias opciones para hacer esto. En primer lugar, tenemos que pensar en dónde almacenar la información del usuario. Podríamos utilizar, por ejemplo, una base de datos como PostgreSQL o una base de datos no relacional como Firestore. Podemos predefinir nuestros usuarios en el sistema de archivos en un archivo JSON o en un .env
archivo. Para este tutorial, optaré por la forma más sencilla y codificaré al usuario dentro de una lista en nuestra función de autenticación.
Una entrada de lista tiene la estructura de la User
modelo tal como se define en paso 5.1. Entonces, un usuario consta de una identificación, nombre, apellido y número de teléfono. Aún no hemos implementado un sistema de roles en el flujo de trabajo de nuestro agente. Pero en la mayoría de los casos de uso con diferentes usuarios, como en el caso de un asistente de pequeña empresa, diferentes usuarios tendrán diferentes derechos y alcances de acceso. Por ahora solo pasamos "default"
como rol de marcador de posición.
def authenticate_user_by_phone_number(phone_number: str) -> User | None:
allowed_users = (
{"id": 1, "phone": "+1234567890", "first_name": "John", "last_name": "Doe", "role": "default"},
{"id": 2, "phone": "+0987654321", "first_name": "Jane", "last_name": "Smith", "role": "default"}
)
for user in allowed_users:
if user("phone") == phone_number:
return User(**user)
return None
Así que simplemente verifique si el número de teléfono está en nuestra lista de allowed_users
y devolver al usuario si es así. De lo contrario, volvemos None
. Si observa nuestro punto final en paso 5.3verá que generamos un error si el usuario es None
para evitar el procesamiento posterior de mensajes de usuarios no autorizados.
Ahora, nuestra última función auxiliar antes de que podamos invocar a nuestro agente es send_whatsapp_message
. He incluido dos modos en esta función debido a alguna lógica metaespecífica de la API de WhatsApp.
Básicamente, no se le permite enviar un mensaje personalizado a un usuario para iniciar una conversación. Esto significa que puede responder con un mensaje de texto individual si el usuario inicia la conversación y escribe un mensaje al chatbot primero. De lo contrario, si desea que el chatbot inicie una conversación, está limitado a plantillas aprobadas, como la plantilla “Hola mundo”.
También es importante mencionar que cuando hablamos de Metalógica, una conversación luego de iniciarse abre una ventana de conversación de 24 horas en la que puedes enviar mensajes a ese usuario. Esta ventana de conversación también es la que se cobra, no el mensaje individual. Se vuelve un poco más complejo según el tipo de conversación, como marketing, soporte, etc.
También puedes definir una plantilla por tu cuenta y dejar que Meta la apruebe. No lo he hecho en este momento, así que para probar si podemos enviar un mensaje desde nuestro backend a un usuario, uso la plantilla “Hola mundo”. Si agrega algunas plantillas aprobadas personalizadas, también puede usar esta función para enviárselas al usuario.
Así que volvamos al código. Para enviar un mensaje, realizamos una solicitud POST y definimos una carga útil que incluye el cuerpo del texto o la plantilla:
def send_whatsapp_message(to, message, template=True):
url = f"https://graph.facebook.com/v18.0/289534840903017/messages"
headers = {
"Authorization": f"Bearer " + WHATSAPP_API_KEY,
"Content-Type": "application/json"
}
if not template:
data = {
"messaging_product": "whatsapp",
"preview_url": False,
"recipient_type": "individual",
"to": to,
"type": "text",
"text": {
"body": message
}
}
else:
data = {
"messaging_product": "whatsapp",
"to": to,
"type": "template",
"template": {
"name": "hello_world",
"language": {
"code": "en_US"
}
}
} response = requests.post(url, headers=headers, data=json.dumps(data))
return response.json()
Finalmente, podemos integrar nuestro agente de nuestros ejemplos anteriores. En esta etapa, también puedes integrar tu agente personalizado, un Langchain. AgentExecutor
Langgrafo AgentWorkflow
etc.
Entonces, nuestra función principal que se llamará en cada mensaje entrante es respond_and_send_message
que toma la user_message
cadena y la pasa a nuestro flujo de trabajo de agente como objeto de entrada.
# app/domain/message_service.py
import json
import requests
from app.domain.agents.routing_agent import RoutingAgent
from app.schema import User def respond_and_send_message(user_message: str, user: User):
agent = RoutingAgent()
response = agent.run(user_message, user.id)
send_whatsapp_message(user.phone, response, template=False)
Después de invocar a nuestro agente, recibimos un mensaje de respuesta que queremos enviar al usuario usando la función send_whatsapp_message.