Imagine this: we have a fully functional machine learning pipeline, and it is flawless. So we decide to push it to the production environment. Everything is fine in production until, one day, a small change occurs in one of the components that generates input data for our pipeline, and the pipeline breaks. Oops!!!
Why did this happen?
Because machine learning models depend heavily on the data being used. Remember the old saying: garbage in, garbage out. Given the right data, the pipeline works well; any unexpected change tends to break it.
Data passed into pipelines is mostly generated by automated systems, which reduces our control over the kind of data that flows in.
So what do we do?
Data validation is the answer.
Data validation is the gatekeeper system that checks whether the data is in the proper format for the pipeline to consume.
Read this article to understand why validation is crucial in a machine learning pipeline and to learn about the stages involved in validating data.
TensorFlow Data Validation (TFDV) is part of the TFX ecosystem and can be used to validate data in an ML pipeline.
TFDV computes descriptive statistics and schemas, and identifies anomalies by comparing the training and serving data. This ensures that training and serving data are consistent and do not break the pipeline or produce unwanted predictions.
The folks at Google wanted TFDV to be usable from the earliest stage of a machine learning workflow, so they made sure it works inside notebooks. Let's do the same here.
To get started, we need to install the tensorflow-data-validation library using pip. Preferably create a virtual environment and start with your installations.
A note of caution: before installing, make sure the TFDV version is compatible with your TFX libraries.
pip install tensorflow-data-validation
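Once installed, a quick sanity check confirms the library imports cleanly (a minimal sketch; tfdv.version.__version__ is the version attribute used in the TFX tutorials):
import tensorflow_data_validation as tfdv

# Print the installed TFDV version to confirm compatibility with TFX
print('TFDV version: {}'.format(tfdv.version.__version__))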
The following are the steps we will follow for the data validation process:
- Generate statistics from the training data
- Infer a schema from the training data
- Generate statistics for the evaluation data and compare them with the training data
- Identify and fix anomalies
- Check for drift and skew
- Save the schema
Here we will use three types of datasets, training data, evaluation data, and serving data, to mimic real-world usage. The ML model is trained on the training data. Evaluation data, also known as test data, is the part of the data set aside to test the model once the training phase is complete. Serving data is presented to the model in the production environment to make predictions.
The complete code discussed in this article is available in my GitHub repository. You can download it from here.
We will use the Spaceship Titanic dataset from Kaggle. You can get more information and download the dataset using this link.
The data consists of a mix of numerical and categorical features. It is a classification dataset, and the class label is Transported, which takes the value True or False.
The necessary imports are made, and the paths for the csv files are defined. The actual dataset contains the training and test data. I introduced some errors manually and saved the file as 'titanic_test_anomalies.csv' (this file is not available on Kaggle; you can download it from my GitHub repository).
Here, we will use ANOMALOUS_DATA as evaluation data and TEST_DATA as serving data.
import tensorflow_data_validation as tfdv
import tensorflow as tf

TRAIN_DATA = '/data/titanic_train.csv'
TEST_DATA = '/data/titanic_test.csv'
ANOMALOUS_DATA = '/data/titanic_test_anomalies.csv'
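Before generating any statistics, it helps to eyeball the raw data. A quick peek with pandas (assuming pandas is installed; TFDV itself does not require this step) shows the mix of numerical and categorical columns:
import pandas as pd

# Load the training csv and inspect the first few rows and column types
df = pd.read_csv(TRAIN_DATA)
print(df.head())
print(df.dtypes)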
The first step is to analyze the training data and identify its statistical properties. TFDV has the generate_statistics_from_csv function, which reads data directly from a csv file. TFDV also has a generate_statistics_from_tfrecord function, which works if you have the data in TFRecord format.
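A minimal sketch of the TFRecord variant, assuming your data lives at a hypothetical path such as '/data/titanic_train.tfrecord':
# Generate statistics from a TFRecord file instead of a csv
tfrecord_stats = tfdv.generate_statistics_from_tfrecord(
    data_location='/data/titanic_train.tfrecord')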
The visualize_statistics function presents an eight-point summary, along with helpful charts that help us understand the underlying statistics of the data. This is called the Facets view. Some critical details that need our attention are highlighted in red. Many other features for analyzing the data are available here. Play around and get to know it better.
# Generate statistics for training data
train_stats=tfdv.generate_statistics_from_csv(TRAIN_DATA)
tfdv.visualize_statistics(train_stats)
Here we see missing values in the Age and RoomService features that need to be imputed. We also see that RoomService has 65.52% zeros; that is simply how this particular data is distributed, so we don't consider it an anomaly.
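The missing values themselves would be imputed outside TFDV. A minimal sketch with pandas, assuming a median fill for Age and a zero fill for RoomService are acceptable choices for this data (a hypothetical preprocessing step, not part of the TFDV flow):
import pandas as pd

df = pd.read_csv(TRAIN_DATA)
# Fill missing Age values with the median age
df['Age'] = df['Age'].fillna(df['Age'].median())
# Fill missing RoomService values with 0, the dominant value
df['RoomService'] = df['RoomService'].fillna(0)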
Once all the problems have been satisfactorily resolved, we infer the schema using the infer_schema function.
schema=tfdv.infer_schema(statistics=train_stats)
tfdv.display_schema(schema=schema)
The schema is presented in two sections. The first section shows details such as the data type, presence, valency, and domain. The second section presents the values that constitute each domain.
This is the initial raw schema; we will refine it in later steps.
Now we collect the evaluation data and generate the statistics. We need to understand how anomalies should be handled, so we will use ANOMALOUS_DATA as our evaluation data. We have manually introduced anomalies into this data.
After generating the statistics, we visualize the data. The visualization can be applied to the evaluation data alone (as we did with the training data); however, it makes more sense to compare the statistics of the evaluation data with the training statistics. This way we can understand how different the evaluation data is from the training data.
# Generate statistics for evaluation data
eval_stats=tfdv.generate_statistics_from_csv(ANOMALOUS_DATA)
tfdv.visualize_statistics(lhs_statistics = train_stats, rhs_statistics = eval_stats,
lhs_name = "Training Data", rhs_name = "Evaluation Data")
Here we can see that the RoomService feature is absent in the evaluation data (big red flag). The other features look fine, as they exhibit distributions similar to the training data.
However, eyeballing the charts is not enough in a production environment, so we will ask TFDV to actually analyze the data and report whether everything is okay.
Our next step is to validate the statistics obtained from the evaluation data. We will compare them against the schema we generated from the training data. The display_anomalies function gives us a tabulated view of the anomalies that TFDV has identified, along with a description of each.
# Identifying Anomalies
anomalies=tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)
From the table, we see that our evaluation data is missing 2 columns (Transported and RoomService); the Destination feature has an additional value 'Anomaly' in its domain (which was not present in the training data); the CryoSleep and VIP features have 'TRUE' and 'FALSE' values that are not present in the training data; and finally, 5 features contain integer values while the schema expects floating-point values.
That's a handful. So let's get to work.
There are two ways to correct anomalies: process the evaluation data (manually) so that it conforms to the schema, or modify the schema so that these anomalies are accepted. Again, a domain expert has to decide which anomalies are acceptable and which require data processing.
Let's start with the Destination feature. We found a new value 'Anomaly', which was missing from the domain list of the training data. Let's add it to the domain and declare that it is also an acceptable value for the feature.
# Adding a new value for 'Destination'
destination_domain=tfdv.get_domain(schema, 'Destination')
destination_domain.value.append('Anomaly')

anomalies=tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)
We have removed this anomaly and the anomaly list no longer shows it. Let's move on to the next one.
Looking at the VIP and CryoSleep domains, we see that the training data has values in lowercase, while the evaluation data has the same values in uppercase. One option is to preprocess the data and ensure everything is converted to lowercase or uppercase. However, here we will add these values to the domain. Since VIP and CryoSleep use the same set of values (true and false), we set CryoSleep's domain to reuse VIP's domain.
# Adding data in CAPS to domain for VIP and CryoSleep
vip_domain=tfdv.get_domain(schema, 'VIP')
vip_domain.value.extend(('TRUE','FALSE'))
# Setting domain of one feature to another
tfdv.set_domain(schema, 'CryoSleep', vip_domain)
anomalies=tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)
It is quite safe to convert integers to floats, so we ask TFDV to infer the data types of the evaluation data from the schema of the training data. This solves the problem related to data types.
# INT can be safely converted to FLOAT. So we can safely ignore it
# and ask TFDV to use the schema
options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)
eval_stats=tfdv.generate_statistics_from_csv(ANOMALOUS_DATA, stats_options=options)
anomalies=tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)
Finally, we end up with the last set of anomalies: 2 columns that are present in the training data are missing from the evaluation data.
'Transported' is the class label and will obviously not be available in the evaluation data. To handle cases where we know that the training and serving features may differ, we can create multiple environments. Here we create a Training and a Serving environment, and we specify that the 'Transported' feature will be available in the Training environment but not in the Serving environment.
# Transported is the class label and will not be available in Evaluation data.
# To indicate that, we set two environments: Training and Serving
schema.default_environment.append('Training')
schema.default_environment.append('Serving')
tfdv.get_feature(schema, 'Transported').not_in_environment.append('Serving')
serving_anomalies_with_environment=tfdv.validate_statistics(
statistics=eval_stats, schema=schema, environment='Serving')
tfdv.display_anomalies(serving_anomalies_with_environment)
'RoomService' is a required feature that is not available in the serving data. Cases like this require manual intervention by domain experts.
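One possible resolution, assuming the domain experts decide RoomService genuinely won't be present at serving time, is to exclude it from the Serving environment as well (a sketch, not the only valid fix):
# If experts agree RoomService is not expected at serving time,
# exclude it from the Serving environment
tfdv.get_feature(schema, 'RoomService').not_in_environment.append('Serving')

anomalies=tfdv.validate_statistics(
    statistics=eval_stats, schema=schema, environment='Serving')
tfdv.display_anomalies(anomalies)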
Continue solving problems until you get this result.
All anomalies have been resolved.
The next step is to check for drift and skew. Skew occurs when there is an irregularity between the distributions of the training and serving data. Initially, when a model is trained, its predictions are usually accurate. However, as time passes, the data distribution changes and misclassification errors begin to increase; this is called drift. Both problems require model retraining.
The L-infinity distance is used to measure skew and drift. A threshold value is set based on the L-infinity distance: if the difference between the analyzed feature in the training and serving environments exceeds the given threshold, the feature is considered to have drifted. A similar threshold-based approach is followed for skew. For our example, we have set the threshold to 0.01 for both drift and skew.
serving_stats = tfdv.generate_statistics_from_csv(TEST_DATA)

# Skew Comparator
spa_analyze=tfdv.get_feature(schema, 'Spa')
spa_analyze.skew_comparator.infinity_norm.threshold=0.01
# Drift Comparator
CryoSleep_analyze=tfdv.get_feature(schema, 'CryoSleep')
CryoSleep_analyze.drift_comparator.infinity_norm.threshold=0.01
skew_anomalies=tfdv.validate_statistics(statistics=train_stats, schema=schema,
previous_statistics=eval_stats,
serving_statistics=serving_stats)
tfdv.display_anomalies(skew_anomalies)
We can see that the level of skew exhibited by 'Spa' is acceptable (since it is not listed in the anomaly list); however, 'CryoSleep' exhibits a high level of drift. When building automated pipelines, these anomalies could be used as triggers for automated model retraining.
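A minimal sketch of such a trigger, assuming the surrounding pipeline exposes a hypothetical trigger_retraining() hook (the anomaly_info field on the returned Anomalies proto is empty when nothing was flagged):
# anomaly_info is empty when no drift/skew anomalies were detected
if skew_anomalies.anomaly_info:
    print('Drift/skew detected; triggering model retraining')
    # trigger_retraining()  # hypothetical hook into your pipeline
else:
    print('No drift/skew detected')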
After resolving all the anomalies, the schema can be saved as an artifact, or stored in a metadata repository, and reused across the ML pipeline.
# Saving the Schema
import os
from tensorflow.python.lib.io import file_io
from google.protobuf import text_format

file_io.recursive_create_dir('schema')
schema_file = os.path.join('schema', 'schema.pbtxt')
tfdv.write_schema_text(schema, schema_file)
# Loading the Schema
loaded_schema= tfdv.load_schema_text(schema_file)
loaded_schema
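Once loaded, the schema can be reused anywhere in the pipeline, for example, to validate fresh serving data (a sketch that reuses the serving statistics generated earlier):
# Validate serving data against the saved schema
anomalies = tfdv.validate_statistics(
    statistics=serving_stats, schema=loaded_schema, environment='Serving')
tfdv.display_anomalies(anomalies)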
You can download the notebook and the data files from my GitHub repository using this link.
You can read the following articles to learn what your options are and how to select the right framework for your ML pipeline project.
Thanks for reading my article. If you liked it, encourage me by giving it some claps, and if you're on the other end of the spectrum, let me know what can be improved in the comments. Cheers.
Unless otherwise noted, all images are the author's.