By starting the workflow from the terminal, it is easy to see which step is being executed and the logs we emit in those steps. We can also enable human interaction in the workflow simply by calling user_feedback = input() inside a step. This pauses the workflow and waits for user input (see the human-in-the-loop example in the official LlamaIndex notebook at examples/workflow/human_in_the_loop_story_crafting/). However, to achieve the same functionality in an easy-to-use interface, additional modifications to the original workflow are needed.
The workflow can take a long time to execute, so for a better user experience, LlamaIndex provides a way to send streaming events to indicate the progress of the workflow (see understanding/workflows/stream/ in the LlamaIndex docs). In my workflow, I define a WorkflowStreamingEvent class that includes useful information about each event message, such as the type of the event and the step it is sent from:
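As a minimal sketch of this idea (hypothetical code, not the article's workflow step), a plain function using input() blocks until the user responds, which is exactly the pausing behavior described above:

```python
# Hypothetical helper, not the article's code: input() blocks the process
# until the user answers, pausing the flow at this point.
def gather_feedback(outline: str) -> dict:
    user_feedback = input(f"Outline:\n{outline}\nDo you approve? (yes/no): ")
    return {"approved": user_feedback.strip().lower() == "yes"}
```

In a real workflow step, the return value would decide which event to emit next.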
from typing import Any, Dict, Literal

from pydantic import BaseModel, Field

class WorkflowStreamingEvent(BaseModel):
    event_type: Literal["server_message", "request_user_input"] = Field(
        ..., description="Type of the event"
    )
    event_sender: str = Field(
        ..., description="Sender (workflow step name) of the event"
    )
    event_content: Dict[str, Any] = Field(..., description="Content of the event")
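As a quick usage illustration, the model can be serialized with Pydantic's model_dump() before being written to the event stream (the model definition is repeated here so the snippet is self-contained; the field values are made up):

```python
from typing import Any, Dict, Literal

from pydantic import BaseModel, Field

# The event model from the text, repeated so this snippet runs on its own
class WorkflowStreamingEvent(BaseModel):
    event_type: Literal["server_message", "request_user_input"] = Field(
        ..., description="Type of the event"
    )
    event_sender: str = Field(..., description="Sender (workflow step name) of the event")
    event_content: Dict[str, Any] = Field(..., description="Content of the event")

# Serialize to a plain dict, ready for json.dumps or the event stream
event = WorkflowStreamingEvent(
    event_type="server_message",
    event_sender="tavily_query",
    event_content={"message": "Querying Tavily..."},
)
payload = event.model_dump()
```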
To enable sending streaming events, the workflow step must have access to the shared context, which is done by adding the @step(pass_context=True) decorator to the step definition. Then, in the step body, we can send event messages about the progress via the context. For example, in the tavily_query() step:
@step(pass_context=True)
async def tavily_query(self, ctx: Context, ev: StartEvent) -> TavilyResultsEvent:
    ctx.data["research_topic"] = ev.user_query
    query = f"arxiv papers about the state of the art of {ev.user_query}"
    ctx.write_event_to_stream(
        Event(
            msg=WorkflowStreamingEvent(
                event_type="server_message",
                event_sender=inspect.currentframe().f_code.co_name,
                event_content={"message": f"Querying Tavily with: '{query}'"},
            ).model_dump()
        )
    )
In this example, we set the event_type to "server_message", which means it is an update message and no user action is required. The other event type, "request_user_input", indicates that user input is required. For example, in the gather_feedback_outline() step, after generating the slide text outline from the original paper summary, a message is sent asking the user to approve the outline or provide feedback on it:
@step(pass_context=True)
async def gather_feedback_outline(
    self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
    """Present user the original paper summary and the outlines generated, gather feedback from user"""
    ...
    # Send a special event indicating that user input is needed
    ctx.write_event_to_stream(
        Event(
            msg=json.dumps(
                {
                    "event_type": "request_user_input",
                    "event_sender": inspect.currentframe().f_code.co_name,
                    "event_content": {
                        "summary": ev.summary,
                        "outline": ev.outline.dict(),
                        "message": "Do you approve this outline? If not, please provide feedback.",
                    },
                }
            )
        )
    )
    ...
These events are handled differently in the backend API and in the frontend logic, as I will describe in detail in later sections of this article.
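To make the distinction between the two event types concrete, a toy dispatcher (hypothetical code, not the article's) might route a streamed JSON line by its event_type:

```python
import json

def dispatch_event(raw_line: str) -> str:
    """Route a streamed event line by its event_type: progress updates are
    just displayed, while "request_user_input" pauses for the user."""
    event = json.loads(raw_line)
    if event.get("event_type") == "request_user_input":
        return "pause-for-user-input"
    return "display-progress"
```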
When a "request_user_input" event is sent to the user, the workflow should only move on to the next step after the user input has been received. As shown in the workflow diagram above, it proceeds to the outlines_with_layout() step if the user approves the outline, or loops back to summary2outline() for a retry if the user does not.
This is achieved using a Future() object from Python's asyncio library. In the SlideGenerationWorkflow class, we set an attribute self.user_input_future = asyncio.Future() that can be awaited in the gather_feedback_outline() step. The subsequent execution of the workflow branches on the content of the user feedback:
@step(pass_context=True)
async def gather_feedback_outline(
    self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
    ...
    # Wait for user input
    if not self.user_input_future.done():
        user_response = await self.user_input_future
        logger.info(f"gather_feedback_outline: Got user response: {user_response}")

        # Process user_response, which should be a JSON string
        try:
            response_data = json.loads(user_response)
            approval = response_data.get("approval", "").lower().strip()
            feedback = response_data.get("feedback", "").strip()
        except json.JSONDecodeError:
            # Handle invalid JSON
            logger.error("Invalid user response format")
            raise Exception("Invalid user response format")

        if approval == ":material/thumb_up:":
            return OutlineOkEvent(summary=ev.summary, outline=ev.outline)
        else:
            return OutlineFeedbackEvent(
                summary=ev.summary, outline=ev.outline, feedback=feedback
            )
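The Future handshake can be illustrated in isolation (a self-contained toy, not the article's code): one task awaits the Future while another, standing in for the user-input endpoint, resolves it:

```python
import asyncio

async def demo() -> str:
    # The workflow side awaits a Future; another task (standing in for the
    # /submit_user_input endpoint) resolves it later.
    user_input_future: asyncio.Future = asyncio.get_running_loop().create_future()

    async def waiting_step() -> str:
        response = await user_input_future  # suspends here until set_result()
        return f"got: {response}"

    async def submit_endpoint() -> None:
        await asyncio.sleep(0.01)  # simulate the user taking time to respond
        user_input_future.set_result("approved")

    step_task = asyncio.create_task(waiting_step())
    await submit_endpoint()
    return await step_task

result = asyncio.run(demo())
```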
We configure the backend using FastAPI and expose a POST endpoint to handle requests and start the workflow execution. The asynchronous function run_workflow_endpoint() accepts a ResearchTopic as input. In the function, an asynchronous generator event_generator() is defined, which creates a task to run the workflow and streams events to the client as the workflow progresses. When the workflow completes, it also streams the final file results to the client.
class ResearchTopic(BaseModel):
    query: str = Field(..., example="example query")

@app.post("/run-slide-gen")
async def run_workflow_endpoint(topic: ResearchTopic):
    workflow_id = str(uuid.uuid4())

    wf = SummaryAndSlideGenerationWorkflow(wid=workflow_id, timeout=2000, verbose=True)
    wf.add_workflows(
        summary_gen_wf=SummaryGenerationWorkflow(
            wid=workflow_id, timeout=800, verbose=True
        )
    )
    wf.add_workflows(
        slide_gen_wf=SlideGenerationWorkflow(
            wid=workflow_id, timeout=1200, verbose=True
        )
    )
    # Register the workflow so other endpoints (e.g. /submit_user_input)
    # can look it up by its ID
    workflows[workflow_id] = wf

    async def event_generator():
        loop = asyncio.get_running_loop()
        logger.debug(f"event_generator: loop id {id(loop)}")
        yield f"{json.dumps({'workflow_id': workflow_id})}\n\n"

        task = asyncio.create_task(wf.run(user_query=topic.query))
        logger.debug(f"event_generator: Created task {task}")
        try:
            async for ev in wf.stream_events():
                logger.info(f"Sending message to frontend: {ev.msg}")
                yield f"{ev.msg}\n\n"
                await asyncio.sleep(0.1)  # Small sleep to ensure proper chunking
            final_result = await task

            # Construct the download URLs
            download_pptx_url = f"http://backend:80/download_pptx/{workflow_id}"
            download_pdf_url = f"http://backend:80/download_pdf/{workflow_id}"

            final_result_with_url = {
                "result": final_result,
                "download_pptx_url": download_pptx_url,
                "download_pdf_url": download_pdf_url,
            }
            yield f"{json.dumps({'final_result': final_result_with_url})}\n\n"
        except Exception as e:
            error_message = f"Error in workflow: {str(e)}"
            logger.error(error_message)
            yield f"{json.dumps({'event': 'error', 'message': error_message})}\n\n"
        finally:
            # Clean up
            workflows.pop(workflow_id, None)

    return StreamingResponse(event_generator(), media_type="text/event-stream")
In addition to this endpoint, there are endpoints for receiving user input from the client and handling file download requests. Since each workflow is assigned a unique workflow ID, we can route the user input received from the client to the correct workflow. By calling set_result() on the waiting Future, the pending workflow resumes execution.
@app.post("/submit_user_input")
async def submit_user_input(data: dict = Body(...)):
    workflow_id = data.get("workflow_id")
    user_input = data.get("user_input")
    wf = workflows.get(workflow_id)
    if wf and wf.user_input_future:
        loop = wf.user_input_future.get_loop()  # Get the loop from the future
        logger.info(f"submit_user_input: wf.user_input_future loop id {id(loop)}")
        if not wf.user_input_future.done():
            loop.call_soon_threadsafe(wf.user_input_future.set_result, user_input)
            logger.info("submit_user_input: set_result called")
        else:
            logger.info("submit_user_input: future already done")
        return {"status": "input received"}
    else:
        raise HTTPException(
            status_code=404, detail="Workflow not found or future not initialized"
        )
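The thread-safety aspect deserves a note: the Future belongs to the workflow's event loop, while the request handler may run on a different thread, so the result must be set via call_soon_threadsafe rather than directly. A self-contained sketch (hypothetical names, not the endpoint itself):

```python
import asyncio
import threading

async def workflow_side() -> str:
    loop = asyncio.get_running_loop()
    fut: asyncio.Future = loop.create_future()

    def from_other_thread():
        # Setting the result directly from a foreign thread is unsafe;
        # call_soon_threadsafe schedules it on the loop that owns the Future.
        loop.call_soon_threadsafe(fut.set_result, "user says yes")

    threading.Thread(target=from_other_thread).start()
    return await fut

result = asyncio.run(workflow_side())
```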
The download endpoint also identifies where the final file is located based on the workflow ID.
@app.get("/download_pptx/{workflow_id}")
async def download_pptx(workflow_id: str):
    file_path = (
        Path(settings.WORKFLOW_ARTIFACTS_PATH)
        / "SlideGenerationWorkflow"
        / workflow_id
        / "final.pptx"
    )
    if file_path.exists():
        return FileResponse(
            path=file_path,
            media_type="application/vnd.openxmlformats-officedocument.presentationml.presentation",
            filename="final.pptx",
        )
    else:
        raise HTTPException(status_code=404, detail="File not found")
On the frontend page, after the user submits a research topic through st.text_input(), a long-running task is started in a background thread with a new event loop to receive the events streamed from the backend, without interfering with the rest of the page:
def start_long_running_task(url, payload, message_queue, user_input_event):
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(
            get_stream_data(url, payload, message_queue, user_input_event)
        )
        loop.close()
    except Exception as e:
        message_queue.put(("error", f"Exception in background thread: {str(e)}"))

...
def main():
    ...
    with st.sidebar:
        with st.form(key="slide_gen_form"):
            query = st.text_input(
                "Enter the topic of your research:",
            )
            submit_button = st.form_submit_button(label="Submit")

        if submit_button:
            # Reset the workflow_complete flag for a new workflow
            st.session_state.workflow_complete = False
            # Start the long-running task in a separate thread
            if (
                st.session_state.workflow_thread is None
                or not st.session_state.workflow_thread.is_alive()
            ):
                st.write("Starting the background thread...")
                st.session_state.workflow_thread = threading.Thread(
                    target=start_long_running_task,
                    args=(
                        "http://backend:80/run-slide-gen",
                        {"query": query},
                        st.session_state.message_queue,
                        st.session_state.user_input_event,
                    ),
                )
                st.session_state.workflow_thread.start()
                st.session_state.received_lines = []
            else:
                st.write("Background thread is already running.")
The event data streamed from the backend is fetched with httpx.AsyncClient and placed on a message queue for further processing. Different information is extracted depending on the event type. For the "request_user_input" event type, the thread is also paused until user input is provided:
async def fetch_streaming_data(url: str, payload: dict = None):
    async with httpx.AsyncClient(timeout=1200.0) as client:
        async with client.stream("POST", url=url, json=payload) as response:
            async for line in response.aiter_lines():
                if line:
                    yield line

async def get_stream_data(url, payload, message_queue, user_input_event):
    data_json = None
    async for data in fetch_streaming_data(url, payload):
        if data:
            try:
                data_json = json.loads(data)
                if "workflow_id" in data_json:
                    # Send workflow_id to main thread
                    message_queue.put(("workflow_id", data_json["workflow_id"]))
                    continue
                elif "final_result" in data_json:
                    # Send final_result to main thread
                    message_queue.put(("final_result", data_json["final_result"]))
                    continue
                event_type = data_json.get("event_type")
                event_sender = data_json.get("event_sender")
                event_content = data_json.get("event_content")
                if event_type == "request_user_input":
                    # Send the message to the main thread
                    message_queue.put(("user_input_required", data_json))
                    # Wait until user input is provided
                    user_input_event.wait()
                    user_input_event.clear()
                    continue
                else:
                    # Send the line to the main thread
                    message_queue.put(("message", format_workflow_info(data_json)))
            except json.JSONDecodeError:
                # Non-JSON lines are forwarded as-is
                message_queue.put(("message", data))
        if (data_json and "final_result" in data_json) or "final_result" in str(data):
            break  # Stop processing after receiving the final result
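The pause mechanism on the frontend side can be shown in isolation (a toy example, not the article's code): the background thread posts a prompt to the queue and blocks on the threading.Event until the main thread sets it:

```python
import queue
import threading

# Toy illustration of the queue/event handshake: the background thread posts
# a prompt and blocks on the Event until the main thread has collected the
# user's answer, then resumes.
def background(message_queue: queue.Queue, user_input_event: threading.Event):
    message_queue.put(("user_input_required", {"message": "approve?"}))
    user_input_event.wait()          # paused here until the UI thread sets it
    message_queue.put(("message", "resumed"))

q: queue.Queue = queue.Queue()
evt = threading.Event()
t = threading.Thread(target=background, args=(q, evt))
t.start()
kind, payload = q.get(timeout=1)     # main thread receives the prompt
evt.set()                            # simulate the user submitting feedback
t.join(timeout=1)
```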
We store the messages in st.session_state and use an st.expander() to display and update the streamed data:
if st.session_state.received_lines:
    with expander_placeholder.container():
        # Create or update the expander with the latest truncated line
        expander = st.expander(st.session_state.expander_label)
        for line in st.session_state.received_lines:
            expander.write(line)
            expander.divider()
To ensure that the UI remains responsive and displays event messages while they are being processed on the background thread, we use the st_autorefresh component to refresh the page at a set interval:

if not st.session_state.workflow_complete:
    st_autorefresh(interval=2000, limit=None, key="data_refresh")
When the streamed event is of type "request_user_input", we display the related information in a separate container and collect user feedback. Since a single workflow run can produce multiple events requiring user input, we put them on a message queue and assign a unique key to the st.feedback(), st.text_area() and st.button() widgets bound to each event, so the widgets do not interfere with each other:
def gather_outline_feedback(placeholder):
    container = placeholder.container()
    with container:
        if st.session_state.user_input_required:
            data = st.session_state.user_input_prompt
            event_type = data.get("event_type")
            if event_type == "request_user_input":
                summary = data.get("event_content").get("summary")
                outline = data.get("event_content").get("outline")
                prompt_message = data.get("event_content").get(
                    "message", "Please review the outline."
                )

                # Display the content for user input
                st.markdown("## Original Summary:")
                st.text_area("Summary", summary, disabled=True, height=400)
                st.divider()
                st.markdown("## Generated Slide Outline:")
                st.json(outline)
                st.write(prompt_message)

                # Define unique keys for widgets
                current_prompt = st.session_state.prompt_counter
                approval_key = f"approval_state_{current_prompt}"
                feedback_key = f"user_feedback_{current_prompt}"

                # Display the approval feedback widget
                approval = st.feedback("thumbs", key=approval_key)
                st.write(f"Current Approval state is: {approval}")
                logging.info(f"Current Approval state is: {approval}")

                # Display the feedback text area
                feedback = st.text_area(
                    "Please provide feedback if you have any:", key=feedback_key
                )

                # Handle the submission of user response
                if st.button(
                    "Submit Feedback", key=f"submit_response_{current_prompt}"
                ):
                    if not st.session_state.user_response_submitted:
                        # Retrieve approval and feedback using unique keys
                        approval_state = st.session_state.get(approval_key)
                        user_feedback = st.session_state.get(feedback_key, "")

                        # Ensure approval_state is valid
                        if approval_state not in (0, 1):
                            st.error("Please select an approval option.")
                            return

                        user_response = {
                            "approval": (
                                ":material/thumb_down:"
                                if approval_state == 0
                                else ":material/thumb_up:"
                            ),
                            "feedback": user_feedback,
                        }
                        # Send the user's response to the backend
                        try:
                            response = requests.post(
                                "http://backend:80/submit_user_input",
                                json={
                                    "workflow_id": st.session_state.workflow_id,
                                    "user_input": json.dumps(user_response),
                                },
                            )
                            response.raise_for_status()
                            logging.info(
                                f"Backend response for submitting approval: {response.status_code}"
                            )
                        except requests.RequestException as e:
                            st.error(f"Failed to submit user input: {str(e)}")
                            return
    ...
Finally, when the workflow execution finishes, the frontend client receives a response containing the paths to the final generated files (the same slide deck in PDF format for rendering in the UI and in PPTX format for download). We display the PDF and create a button to download the PPTX file:
if "download_url_pdf" in st.session_state and st.session_state.download_url_pdf:
    download_url_pdf = st.session_state.download_url_pdf
    try:
        # Fetch the PDF content
        pdf_response = requests.get(download_url_pdf)
        pdf_response.raise_for_status()
        st.session_state.pdf_data = pdf_response.content

        st.markdown("### Generated Slide Deck:")
        # Display the PDF using an iframe (the original embed markup was lost;
        # a base64 data URI is one common way to render it inline)
        base64_pdf = base64.b64encode(st.session_state.pdf_data).decode("utf-8")
        st.markdown(
            f'<iframe src="data:application/pdf;base64,{base64_pdf}" '
            f'width="100%" height="600px" type="application/pdf"></iframe>',
            unsafe_allow_html=True,
        )
    except Exception as e:
        st.error(f"Failed to load the PDF file: {str(e)}")

    # Provide the download button for PPTX if available
    if (
        "download_url_pptx" in st.session_state
        and st.session_state.download_url_pptx
    ):
        download_url_pptx = st.session_state.download_url_pptx
        try:
            # Fetch the PPTX content
            pptx_response = requests.get(download_url_pptx)
            pptx_response.raise_for_status()
            pptx_data = pptx_response.content

            st.download_button(
                label="Download Generated PPTX",
                data=pptx_data,
                file_name="generated_slides.pptx",
                mime="application/vnd.openxmlformats-officedocument.presentationml.presentation",
            )
        except Exception as e:
            st.error(f"Failed to load the PPTX file: {str(e)}")
We create a multi-service Docker application with docker-compose to run the frontend and backend applications:
version: '3.8'

services:
  backend:
    build:
      context: ./backend
      args:
        - --no-cache
    ports:
      - "8000:80"
    networks:
      - app-network
    volumes:
      - .env:/app/.env
      - ./data:/app/data
      - ./workflow_artifacts:/app/workflow_artifacts
      - ~/.azure:/root/.azure
  frontend:
    build:
      context: ./frontend
      args:
        - --no-cache
    ports:
      - "8501:8501"
    networks:
      - app-network

networks:
  app-network:
That's all! Just run docker-compose up, and we have an application that can run a research workflow based on the user's query, ask for feedback during execution, and show the final result.