I embarked on a mission to integrate Apache Flink with Kafka and PostgreSQL using Docker. What makes this effort particularly exciting is the use of PyFlink, the Python API for Flink, which is powerful yet still relatively uncommon. This setup aims to handle real-time data processing and storage efficiently. In the following sections I will demonstrate how I achieved this, discussing the challenges I ran into and how I overcame them, and I'll conclude with a step-by-step guide so you can build and experiment with this streaming pipeline yourself.
The infrastructure we will build is illustrated below. Externally, a publishing module simulates messages from IoT sensors, similar to what was discussed in a previous post. Inside the Docker environment, we will create two Kafka topics. The first topic, sensors, stores incoming messages from the IoT devices in real time. A Flink application then consumes the messages from this topic, filters the ones with temperatures above 30°C, and publishes them to the second topic, alerts. Additionally, the Flink application inserts the consumed messages into a PostgreSQL table created specifically for this purpose. Keeping the sensor data in a structured, tabular format opens the door to further transformation and analysis, and visualization tools like Tableau or Power BI can connect to it to build real-time charts and dashboards.
Additionally, other clients can consume the alerts topic to trigger actions based on its messages, such as activating air-conditioning systems or initiating fire-safety protocols.
To follow the tutorial, you can clone the following repository. A docker-compose.yml file at the root of the project initializes the multi-container application, and detailed instructions are available in the README file.
Issues with Kafka ports in docker-compose.yml
Initially, I ran into issues with the Kafka port configuration when using the Confluent Kafka Docker image, a popular choice for this kind of setup. The problem became evident in the logs, which underlines the importance of not running docker-compose up in detached (-d) mode during the initial configuration and troubleshooting phases.
The reason for the failure was that the internal and external hosts were using the same port, which caused connectivity issues. I fixed this by changing the internal port to 19092. I found this pretty insightful blog post on the topic.
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092,PLAINTEXT_HOST://localhost:9092
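For context, the relevant part of the Kafka service ends up looking roughly like the sketch below (the service name, image tag, and non-listener settings are illustrative; check the repository's docker-compose.yml for the exact definition). The key point is that containers on the Docker network, including Flink, reach the broker through kafka:19092, while clients running on the host machine use localhost:9092.

kafka:
  image: confluentinc/cp-kafka:7.4.0
  ports:
    - "9092:9092"   # only the external listener is exposed to the host
  environment:
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:19092,PLAINTEXT_HOST://0.0.0.0:9092
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092,PLAINTEXT_HOST://localhost:9092
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT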
Configure Flink in session mode
To run Flink in session mode (allowing multiple jobs to run on a single cluster), I use the following directives in docker-compose.yml.
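The exact directives in the repository may differ slightly, but a session cluster in docker-compose.yml typically boils down to a job manager plus one or more task managers along these lines (a condensed sketch; service names, build context, and slot count are illustrative):

jobmanager:
  build: ./pyflink          # custom PyFlink image described in the next section
  command: jobmanager
  ports:
    - "8081:8081"           # Flink web UI
  environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: jobmanager

taskmanager:
  build: ./pyflink
  command: taskmanager
  depends_on:
    - jobmanager
  environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: jobmanager
      taskmanager.numberOfTaskSlots: 2

Because no job is baked into the containers, the cluster stays up and accepts any number of submitted jobs, which is exactly what session mode means.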
Custom Docker image for PyFlink
Given the limitations of the default Apache Flink Docker image, which does not include Python support, I created a custom Docker image for PyFlink. This image ensures that Flink can run Python jobs and includes the dependencies needed to integrate with Kafka and PostgreSQL. The Dockerfile used for this is located in the pyflink subdirectory; a condensed sketch of it follows the list below.
- Base image: we start from the official Flink image.
- Python installation: Python and pip are installed, and pip is upgraded to the latest version.
- Dependency management: dependencies are installed from requirements.txt. Commented-out lines show how to install them manually from local files instead, which is useful for deployments in environments without Internet access.
- Connector libraries: the Kafka and PostgreSQL connectors are downloaded straight into Flink's lib directory, so Flink can interact with Kafka and PostgreSQL during job execution.
- Script copying: the repository's job scripts are copied to the /opt/flink directory so the Flink task manager can execute them.
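Condensed, the Dockerfile looks roughly like the sketch below; the Flink and connector versions shown match the ones discussed in this post, but the exact file names and URLs in the repository may differ slightly.

FROM flink:1.18.1

# Python and pip, required by PyFlink
RUN apt-get update -y && \
    apt-get install -y python3 python3-pip wget && \
    ln -s /usr/bin/python3 /usr/bin/python && \
    pip3 install --upgrade pip

# Python dependencies (alternatively, install from local wheel files)
COPY requirements.txt .
RUN pip3 install -r requirements.txt

# Kafka and JDBC/PostgreSQL connectors go straight into Flink's lib directory
RUN wget -P /opt/flink/lib https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.18/flink-sql-connector-kafka-3.1.0-1.18.jar && \
    wget -P /opt/flink/lib https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.2-1.18/flink-connector-jdbc-3.1.2-1.18.jar && \
    wget -P /opt/flink/lib https://repo.maven.apache.org/maven2/org/postgresql/postgresql/42.7.3/postgresql-42.7.3.jar

# Job scripts, executed by the Flink task manager
COPY usr_jobs/ /opt/flink/usr_jobs/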
With this custom Docker image, PyFlink can run correctly inside the container, equipped with the libraries it needs to interact with Kafka and PostgreSQL. This approach provides flexibility and suits both development and production environments.
Note: Ensure that any networking or security considerations for downloading connectors and other dependencies are addressed according to the policies of your deployment environment.
Integrating PostgreSQL
To connect Apache Flink to a PostgreSQL database, a suitable JDBC connector is required. The custom PyFlink Docker image downloads the JDBC connector for PostgreSQL, which is compatible with PostgreSQL 16.
To simplify this process, a download_libs.sh script is included in the repository, which reflects the actions performed in the Flink Docker container. This script automates the download of the necessary libraries, ensuring consistency between Docker and local environments.
Note: Connector artifacts usually carry two version numbers. In this particular case, since I am using Flink 1.18, the latest stable release available at the time of writing, I downloaded 3.1.2-1.18. I assume the first number tracks the JDBC connector implementation for the various databases, while the second one matches the targeted Flink version. Both JARs are available in the Maven repository. In the Flink job, they are registered with the execution environment:
# Register the JDBC connector and the PostgreSQL driver JARs with the Flink environment
env.add_jars(
    f"file://{current_dir}/flink-connector-jdbc-3.1.2-1.18.jar",
    f"file://{current_dir}/postgresql-42.7.3.jar",
)
JDBC sink definition
In our Flink job there is a crucial function, configure_postgre_sink, located in the usr_jobs/postgres_sink.py file. It configures a generic PostgreSQL sink; to use it, you must provide the SQL data manipulation (DML) statement and the corresponding value types. The types of the streamed values are declared in TYPE_INFO, and it took me a while to find the correct declaration.
Also notice that JdbcSink takes an optional parameter to define ExecutionOptions. For this particular case, I use a 1-second flush interval and cap each batch at 200 rows. More details are available in the official documentation. Yes, you guessed it: since I am defining an interval, this can be considered a micro-batch ETL. Thanks to Flink's parallelism, however, it can handle multiple streams at once in a single script that is still easy to follow.
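For reference, here is a minimal sketch of such a configuration using PyFlink's JDBC connector. The table columns, credentials, and database name are illustrative assumptions; the actual configure_postgre_sink and TYPE_INFO in usr_jobs/postgres_sink.py may differ in their details.

from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors.jdbc import (
    JdbcConnectionOptions,
    JdbcExecutionOptions,
    JdbcSink,
)

POSTGRES_HOST = "postgres:5432"  # see the configuration section below

# Illustrative DML and row type: the '?' placeholders must match TYPE_INFO field by field
INSERT_DML = "INSERT INTO raw_sensors_data (message, sensor_id) VALUES (?, ?)"
TYPE_INFO = Types.ROW([Types.STRING(), Types.STRING()])

def configure_postgre_sink(sql_dml: str, type_info):
    """Build a generic JDBC sink that micro-batches inserts into PostgreSQL."""
    return JdbcSink.sink(
        sql_dml,
        type_info,
        JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .with_url(f"jdbc:postgresql://{POSTGRES_HOST}/postgres")  # hypothetical database name
        .with_driver_name("org.postgresql.Driver")
        .with_user_name("flink")       # illustrative credentials
        .with_password("flink")
        .build(),
        JdbcExecutionOptions.builder()
        .with_batch_interval_ms(1000)  # flush at least once per second
        .with_batch_size(200)          # or as soon as 200 rows are buffered
        .with_max_retries(5)
        .build(),
    )

# Usage inside the job:
# sensors_stream.add_sink(configure_postgre_sink(INSERT_DML, TYPE_INFO))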
Note: Don't forget to create the raw_sensors_data table in Postgres, where the raw data coming from the IoT sensors will be received. This is covered in the step-by-step guide in the following sections.
Sink data to Kafka
I have covered how to consume data from a Kafka topic in a previous discussion. However, I haven't set up a Kafka sink yet, and that's what we'll do now. Its configuration has some intricacies and, like the Postgres sink, it is defined in a function. Additionally, you must define the stream's data type before sending it to Kafka. Notice that the alarms_data stream is explicitly converted to strings with output_type=Types.STRING() before it is handed to the sink, since I declared the serializer as SimpleStringSchema().
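As a rough sketch of the idea (the repository may use a different connector class or helper name), such a sink can be built with the KafkaSink builder and a plain string serializer:

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import KafkaRecordSerializationSchema, KafkaSink

KAFKA_HOST = "kafka:19092"  # see the configuration section below

def configure_kafka_sink(topic: str, bootstrap_servers: str) -> KafkaSink:
    """Build a Kafka sink that writes string messages to the given topic."""
    return (
        KafkaSink.builder()
        .set_bootstrap_servers(bootstrap_servers)
        .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
            .set_topic(topic)
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
        )
        .build()
    )

# alarms_data has already been converted to strings (output_type=Types.STRING()),
# so it can be handed straight to the sink:
# alarms_data.sink_to(configure_kafka_sink("alerts", KAFKA_HOST))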
I will show you how to read data back from the alerts topic in the following steps.
Local or containerized configuration
One of the best things about this Docker setup is that you can run Flink either locally or inside the container as a managed job. The local Flink configuration is shown in the figure below, where our Flink application runs outside the Docker containers. Running locally can help when troubleshooting Flink, which does not ship with a strong set of native observability tools; dedicated monitoring tools for Flink look very promising, and I would like to try them.
If you want to test the Flink application locally, you must correctly define the hosts and ports used by the script, which are actually two constants in the usr_jobs/postgres_sink.py file:
To run the job inside the containers, use:
KAFKA_HOST = "kafka:19092"
POSTGRES_HOST = "postgres:5432"
For local execution, use:
KAFKA_HOST = "localhost:9092"
POSTGRES_HOST = "localhost:5432"
By default, the repository configures the Flink application to run inside the container. You can monitor running jobs through the Flink web UI at http://localhost:8081; jobs launched locally will not appear there.
Note: If you run the job locally, you must install the Flink dependencies listed in requirements.txt. A pyproject.toml file is also provided if you prefer to set up the environment with Poetry.