There's still one thing left before we turn our attention to the fun part. The Flink Web UI is an easy-to-use interface that allows developers and administrators to monitor and manage their Apache Flink applications. It provides a real-time overview of running or completed jobs, displays metrics such as throughput and latency, and shows detailed information about the job's execution plan. Basically, it is a convenient dashboard where you can visualize the performance and status of your Flink applications, making the process of debugging, optimizing, and managing your streaming or batch processing jobs much easier and more intuitive.
When you run a Flink application locally like in this example, you typically don't have the Flink web UI enabled. However, there is a way to also get the Flink web UI in a local runtime environment. I find this useful, especially to get an idea of the execution plan before running streaming applications in production.
Let's start by adding a dependency to pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>${flink.version}</version>
</dependency>
And slightly change the code in our main class, App.java:
package de.vojay.flitch;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class App {

    public static void main(String[] args) throws Exception {
        // Local environment that also serves the Flink web UI
        StreamExecutionEnvironment env = StreamExecutionEnvironment
            .createLocalEnvironmentWithWebUI(new Configuration());

        // Practically endless sequence, so the job keeps running
        env.fromSequence(1, Long.MAX_VALUE).print();

        env.execute("Flitch");
        env.close();
    }
}
The streaming app will now process a sequence of numbers, so it won't end immediately. Also, with createLocalEnvironmentWithWebUI, the Flink web UI is available locally on port 8081 while the application is running.
Start again and open http://localhost:8081/ in your browser. In addition to various metrics, you can also view the execution plan of your Flink application.
Now we have a proper local setup and can start connecting our app to Twitch and running sentiment analysis on chat messages.
Twitch, the leading live streaming platform for gamers, offers a comprehensive API and a chat feature that is deeply integrated with the Internet Relay Chat (IRC) protocol.
At its core, the Twitch API allows apps to interact with Twitch data. This includes retrieving information about live streams, VOD (video on demand), users and game details. The API is RESTful, meaning it follows the architectural style of the web, making it easy to use with common HTTP requests. Developers can use this API to create custom experiences, such as displaying live streaming statistics, searching for channels, or even automating streaming settings.
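To make the "common HTTP requests" part concrete, here is a minimal sketch that only builds (and does not send) a request for live stream information. It assumes the Helix "Get Streams" endpoint with a `user_login` query parameter plus the `Client-Id` and `Authorization` headers; the class name `HelixRequestSketch` and the credential values are placeholders for illustration. We won't need this for the project, since we only read the chat.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

class HelixRequestSketch {

    // Builds a GET request for stream information about one channel.
    // clientId and accessToken are placeholders; real values come from
    // your own registered Twitch application.
    static HttpRequest streamsRequest(String userLogin, String clientId, String accessToken) {
        String query = "user_login=" + URLEncoder.encode(userLogin, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder()
            .uri(URI.create("https://api.twitch.tv/helix/streams?" + query))
            .header("Client-Id", clientId)
            .header("Authorization", "Bearer " + accessToken)
            .GET()
            .build();
    }

    public static void main(String[] args) {
        HttpRequest request = streamsRequest("vojay", "my-client-id", "my-access-token");
        // Only inspect the request; sending it would require valid credentials.
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request would be a matter of passing it to a `java.net.http.HttpClient`, which is left out here on purpose.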
Twitch chat is a vital aspect of the Twitch experience, allowing viewers to interact with streamers and other viewers in real time. Beneath the modern Twitch chat interface lies the Internet Relay Chat (IRC) protocol, a staple of online communication since the late 1980s. This reliance on IRC opens up a wide range of possibilities when it comes to reading and interacting with chat through custom applications.
For our purpose, we simply want to read the chat, without writing messages ourselves. Fortunately, Twitch allows anonymous connections to chat for read-only app use cases.
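To get a feeling for what reading the chat means at the protocol level, here is a small sketch that parses a single raw IRC chat line into channel, user and message. It assumes the usual IRC `PRIVMSG` shape (`:user!user@host PRIVMSG #channel :text`), optionally preceded by a tags block starting with `@`. The names `IrcLineSketch` and `ChatLine` are made up for this illustration; in the project itself a library will do this work for us.

```java
import java.util.Optional;

class IrcLineSketch {

    // Simple holder for the three parts we care about.
    static final class ChatLine {
        final String channel;
        final String user;
        final String message;

        ChatLine(String channel, String user, String message) {
            this.channel = channel;
            this.user = user;
            this.message = message;
        }
    }

    // Returns the parsed chat line, or empty if the line is not a PRIVMSG.
    static Optional<ChatLine> parse(String line) {
        String rest = line;
        // Skip an optional tags block ("@key=value;... ") at the start.
        if (rest.startsWith("@")) {
            int space = rest.indexOf(' ');
            if (space < 0) {
                return Optional.empty();
            }
            rest = rest.substring(space + 1);
        }
        // Chat messages carry a prefix that starts with ':'.
        if (!rest.startsWith(":")) {
            return Optional.empty();
        }
        int prefixEnd = rest.indexOf(' ');
        if (prefixEnd < 0) {
            return Optional.empty();
        }
        String prefix = rest.substring(1, prefixEnd);
        // Remaining parts: command, target channel, trailing text.
        String[] parts = rest.substring(prefixEnd + 1).split(" ", 3);
        if (parts.length < 3
                || !parts[0].equals("PRIVMSG")
                || !parts[1].startsWith("#")
                || !parts[2].startsWith(":")) {
            return Optional.empty();
        }
        int bang = prefix.indexOf('!');
        String user = bang < 0 ? prefix : prefix.substring(0, bang);
        return Optional.of(new ChatLine(parts[1].substring(1), user, parts[2].substring(1)));
    }

    public static void main(String[] args) {
        parse(":vojay!vojay@vojay.tmi.twitch.tv PRIVMSG #vojay :Hello Flink")
            .ifPresent(l -> System.out.println(l.channel + " / " + l.user + " / " + l.message));
    }
}
```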
To reduce implementation effort, we will use an existing library to interact with Twitch: Twitch4J. Twitch4J is a modern Java library designed to simplify integration with Twitch features, including its API, Chat (via IRC), PubSub (for real-time notifications), and Webhooks. Essentially, it's a powerful toolset for Java developers looking to interact with Twitch services without having to directly manage low-level details like HTTP requests or handling the IRC protocol.
The first step is to add Twitch4J as a dependency to the pom.xml:
<dependency>
    <groupId>com.github.twitch4j</groupId>
    <artifactId>twitch4j</artifactId>
    <version>1.19.0</version>
</dependency>
We would like to have a lightweight, serializable Plain Old Java Object (POJO) to represent Twitch chat messages within our application. We are interested in the channel where the message was written, the user and the content itself.
Create a new class TwitchMessage with the following implementation:
package de.vojay.flitch;

public class TwitchMessage {

    private final String channel;
    private final String user;
    private final String message;

    public TwitchMessage(String channel, String user, String message) {
        this.channel = channel;
        this.user = user;
        this.message = message;
    }

    public String getChannel() {
        return channel;
    }

    public String getUser() {
        return user;
    }

    public String getMessage() {
        return message;
    }

    @Override
    public String toString() {
        StringBuffer sb = new StringBuffer("TwitchMessage{");
        sb.append("channel='").append(channel).append('\'');
        sb.append(", user='").append(user).append('\'');
        sb.append(", message='").append(message).append('\'');
        sb.append('}');
        return sb.toString();
    }
}
As a side note: you don't need to write basic functions like toString() on your own; you can use IntelliJ to generate them for you. Just click Code → Generate… → toString() to obtain the result above.
We will now use Twitch4J to implement a custom Twitch source function for Flink. The source function will generate an unbounded stream of data, in this case Twitch chat messages. That also means that the app won't end until we explicitly stop it.
The Twitch client can be built like this:
TwitchClientBuilder clientBuilder = TwitchClientBuilder.builder();
TwitchClient client = clientBuilder
    .withEnableChat(true)
    .build();

client.getChat().joinChannel("vojay");
With this example, we get a client that joins the Twitch channel called vojay. Yes, I was once an active streamer. Fun fact: in my streams, I taught people about game development and software development in general. I also enjoyed playing retro games live on stream. But that's a different topic, so let's focus on the project.
You should also notice that there is no authentication in the example above. As said before, since we only want to read the chat, no authentication is needed. In fact, we simply join an IRC chat anonymously and read the messages.
Since we want to connect to Twitch chat only once per source instance, we need to extend the abstract RichSourceFunction class to be able to override the open function, which allows adding code for initialization.
public class TwitchSource extends RichSourceFunction<TwitchMessage> {

    @Override
    public void open(Configuration configuration) {
        // ...
    }

    // ...
}
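Why initialize in open rather than in the constructor? Flink serializes function instances to ship them to the workers, so anything that holds a live connection should be created after that step. The following plain-Java sketch imitates this lifecycle with hypothetical stand-ins (`FakeSource`, `FakeClient`) and a serialization round trip; it does not use Flink or Twitch4J at all.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class OpenLifecycleSketch {

    // Stand-in for a client with a live connection; deliberately not Serializable.
    static class FakeClient {
        final String joined;

        FakeClient(String[] channels) {
            this.joined = String.join(",", channels);
        }
    }

    // Stand-in for a rich function: plain configuration travels with the object,
    // the client is created only in open().
    static class FakeSource implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String[] channels;
        private transient FakeClient client;

        FakeSource(String[] channels) {
            this.channels = channels;
        }

        void open() {
            client = new FakeClient(channels);
        }

        boolean isOpen() {
            return client != null;
        }

        String joinedChannels() {
            return client.joined;
        }
    }

    // Serializes and deserializes the source, imitating shipping it to a worker.
    static FakeSource roundTrip(FakeSource source) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(source);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (FakeSource) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FakeSource shipped = roundTrip(new FakeSource(new String[]{"vojay"}));
        System.out.println("open before init: " + shipped.isOpen());
        shipped.open();
        System.out.println("joined: " + shipped.joinedChannels());
    }
}
```

The channel names survive the round trip because they are plain data, while the client only comes into existence once open() runs on the receiving side.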
We also use our TwitchMessage POJO for the generic parameter to tell Flink that this source generates elements of type TwitchMessage.
Additionally, we want to be able to pass a series of Twitch channels that we want to listen to in the source function's constructor.
To control the state of our source function, we use a boolean variable called running that we set to true in the open function.
Based on this, the constructor and open function look like this:
public class TwitchSource extends RichSourceFunction<TwitchMessage> {

    private final String[] twitchChannels;

    private TwitchClient client;
    private SimpleEventHandler eventHandler;
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;

    public TwitchSource(String[] twitchChannels) {
        this.twitchChannels = twitchChannels;
    }

    @Override
    public void open(Configuration configuration) {
        // No credentials configured: the chat is joined anonymously
        client = TwitchClientBuilder
            .builder()
            .withEnableChat(true)
            .build();

        for (String channel : twitchChannels) {
            client.getChat().joinChannel(channel);
        }

        eventHandler = client
            .getEventManager()
            .getEventHandler(SimpleEventHandler.class);

        running = true;
    }

    // ...
}
With that, we have everything we need to consume messages and emit them for further processing as a data stream.
The run function of a source function is where the magic happens. Here we generate the data, and with the given SourceContext we can emit it.
The SimpleEventHandler provided by Twitch4J can be used to react to specific events.
Every time we get an event of type IRCMessageEvent, which is a message in the Twitch chat, we instantiate our POJO and emit it to the stream via the context.
To ensure that our source function does not terminate, we add a loop with an artificial delay, which runs until our boolean variable running is set to false. This is done in the cancel function, which is called by the Flink environment on shutdown.
@Override
public void run(SourceContext<TwitchMessage> ctx) throws InterruptedException {
    eventHandler.onEvent(IRCMessageEvent.class, event -> {
        String channel = event.getChannel().getName();
        EventUser eventUser = event.getUser();
        String user = eventUser == null ? "" : eventUser.getName();
        String message = event.getMessage().orElseGet(String::new);

        ctx.collect(new TwitchMessage(channel, user, message));
    });

    while (running) {
        Thread.sleep(100);
    }
}

@Override
public void cancel() {
    client.close();
    running = false;
}
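The run/cancel interplay can be tried out without Flink. Below is a minimal plain-Java sketch of the same pattern: a worker thread blocks in run until another thread calls cancel. The names `CancellableLoopSketch` and `FakeSource` are hypothetical, and the flag is declared `volatile` here so that the change made by the cancelling thread is guaranteed to become visible to the loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class CancellableLoopSketch {

    // Stand-in for a source function with a run loop and a cancel hook.
    static class FakeSource {
        private volatile boolean running = true;

        void run(Consumer<String> collector) throws InterruptedException {
            collector.accept("started");
            // Keep the source alive until cancel() flips the flag.
            while (running) {
                Thread.sleep(10);
            }
            collector.accept("stopped");
        }

        void cancel() {
            running = false;
        }
    }

    // Runs the source on a worker thread, cancels it from the calling thread
    // and returns everything the source emitted.
    static List<String> runAndCancel() {
        FakeSource source = new FakeSource();
        List<String> emitted = new CopyOnWriteArrayList<>();
        Thread worker = new Thread(() -> {
            try {
                source.run(emitted::add);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        source.cancel();
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) {
        System.out.println(runAndCancel());
    }
}
```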
Altogether, this is the complete implementation of our custom Twitch source function for Flink, TwitchSource.java:
package de.vojay.flitch;

import com.github.philippheuer.events4j.simple.SimpleEventHandler;
import com.github.twitch4j.TwitchClient;
import com.github.twitch4j.TwitchClientBuilder;
import com.github.twitch4j.chat.events.channel.IRCMessageEvent;
import com.github.twitch4j.common.events.domain.EventUser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class TwitchSource extends RichSourceFunction<TwitchMessage> {

    private final String[] twitchChannels;

    private TwitchClient client;
    private SimpleEventHandler eventHandler;
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;

    public TwitchSource(String[] twitchChannels) {
        this.twitchChannels = twitchChannels;
    }

    @Override
    public void open(Configuration configuration) {
        // No credentials configured: the chat is joined anonymously
        client = TwitchClientBuilder
            .builder()
            .withEnableChat(true)
            .build();

        for (String channel : twitchChannels) {
            client.getChat().joinChannel(channel);
        }

        eventHandler = client
            .getEventManager()
            .getEventHandler(SimpleEventHandler.class);

        running = true;
    }

    @Override
    public void run(SourceContext<TwitchMessage> ctx) throws InterruptedException {
        // Emit one TwitchMessage per chat message
        eventHandler.onEvent(IRCMessageEvent.class, event -> {
            String channel = event.getChannel().getName();
            EventUser eventUser = event.getUser();
            String user = eventUser == null ? "" : eventUser.getName();
            String message = event.getMessage().orElseGet(String::new);

            ctx.collect(new TwitchMessage(channel, user, message));
        });

        // Keep the source alive until cancel() is called
        while (running) {
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        client.close();
        running = false;
    }
}
With this custom source function, we can now extend our streaming pipeline in App.java to simply print each chat message written to the chat:
package de.vojay.flitch;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class App {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment
            .createLocalEnvironmentWithWebUI(new Configuration());

        // Listen to the chat of a single channel
        TwitchSource twitchSource = new TwitchSource(new String[]{"vojay"});
        env.addSource(twitchSource)
            .print();

        env.execute("Flitch");
        env.close();
    }
}
With addSource, we can add our source function. The elements are then processed by the next step in the pipeline, which is print(). With this sink, we output each element to STDOUT.
When you run the app now and write in the chat at https://twitch.tv/vojay, the messages will be processed and printed by our streaming application.
Now that we can read the Twitch chat as a data stream, it's time to process each message. The basic idea is: for each Twitch message, we detect the individual sentences in the message and calculate the sentiment of each sentence. The result will be a structure like this:
Tuple2<TwitchMessage, Tuple2<List<Integer>, List<String>>>
Let's break it down: the result contains the original POJO from the Twitch chat message along with another tuple with 2 elements:
- A list of sentiment scores (List<Integer>) that contains the score for each sentence in the message, from 0 (very negative) to 4 (very positive), and
- A list of sentiment classes (List<String>) that contains the readable class for each sentence in the message, for example: Neutral or Negative.
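To make the shape of this result more tangible, here is a small plain-Java sketch that pairs a list of scores with their readable classes. It is not the actual sentiment analysis, only the data structure. The class name `SentimentResultSketch`, the `Result` holder and the exact label for each score are assumptions for illustration; only the range 0 to 4 and the examples Neutral and Negative come from the description above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SentimentResultSketch {

    // Index = score. Mapping 1 -> "Negative", 2 -> "Neutral" and the label
    // "Positive" for 3 are assumptions that fill the 0..4 scale.
    static final String[] CLASSES = {
        "Very negative", "Negative", "Neutral", "Positive", "Very positive"
    };

    // Mirrors the inner tuple: one score and one class per sentence.
    static final class Result {
        final List<Integer> scores;
        final List<String> classes;

        Result(List<Integer> scores, List<String> classes) {
            this.scores = scores;
            this.classes = classes;
        }
    }

    // Derives the readable classes from per-sentence scores.
    static Result fromScores(List<Integer> scores) {
        List<String> classes = new ArrayList<>();
        for (int score : scores) {
            if (score < 0 || score >= CLASSES.length) {
                throw new IllegalArgumentException("score out of range: " + score);
            }
            classes.add(CLASSES[score]);
        }
        return new Result(new ArrayList<>(scores), classes);
    }

    public static void main(String[] args) {
        // A message with two sentences, scored 2 and 1.
        Result result = fromScores(Arrays.asList(2, 1));
        System.out.println(result.scores + " -> " + result.classes);
    }
}
```

Both lists have the same length and the same order, so the n-th score and the n-th class always describe the same sentence.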