
Functional Elegance: Making Spark Applications Cleaner with the Cats Library

Shalom Sanevich
AppsFlyer Engineering
8 min read · Apr 25, 2024


During my work, I’ve noticed that there are two approaches to Scala programming. Some people use Scala as a DSL for Apache Spark but do not touch the advanced features of the language itself. Others write backends in Scala, using both language constructs and libraries such as Cats and ZIO. However, knowledge of Scala’s FP approaches can sometimes pay off even when writing ordinary Spark ETL applications.

This article explores the use of the Cats library in Apache Spark, and how its functional approaches and abstractions can enhance the code’s structure, readability, modularity, and reusability.

The Challenge

I was tasked with writing a Spark application that computes certain metrics while processing a dataset. The goal was not merely to calculate the metrics once, but to write the code in a way that allows it to be reused in other Spark applications.

Example Description

The example is taken from a current project and modified for simplicity; any resemblance to real companies or data is purely coincidental.

I decided to demonstrate the solution on something reasonably close to reality, as I think it is clearer that way.

MedCleanData owns a collection of medical records, and its customers are insurance companies. Each insurance company has access to aggregated data about the patients it insures.

For this purpose, MedCleanData has created a special report generation interface. Here, the insurance company can select the grouping parameters of interest (for example, diagnosis, city, gender) and the statistical metrics (number of patients, average age, etc.), and then define the sampling dates.

Customers can also add their own functions and group results by them (e.g. get_month(appointment_time) makes the month of the visit a grouping parameter). In the final report, rows that group together fewer than 20 unique patients are removed.

Alongside the data processing itself, the application should also return calculated technical metrics and other auxiliary information, which can be data of any type. We will call all of this metadata.

Let’s formalize the application requirements:

  1. Extract the MedCleanData dataset and filter it for the insurance company, applying a filter by sampling date.
    Record the following metadata:
    - the size of the filtered dataset.
    - the number of unique patients in the filtered dataset.
    - the min and max appointment_time for the filtered records.
  2. Add custom fields with functions.
    Record the following metadata:
    - If an error occurs while adding a function, insert a constant null and record the error message in the metadata.
  3. Group the report and truncate cohorts that contain fewer than 20 unique patients.
    Record the following metadata:
    - Number of cohorts truncated.
    - Number of rows in the report.
  4. Save the report to the insurance company’s S3 bucket and send all of the collected metadata to the backend.

Approach 1 — Naive

Let's start writing the application directly. For the sake of simplicity, we'll omit the functionality related to retrieving the configuration, saving the report, and sending the result, and just write a single function, generateReport.

Here MedReportConfig is a class that describes the application configuration, and ReportMetadata is a class that stores the collected metadata.
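For the sketches in this article, assume these two classes look roughly like this; all field names and types here are illustrative assumptions, and the exact definitions are in the repository linked at the end:

```scala
import java.sql.Date

// A minimal sketch; field names and types are assumptions for illustration.
case class MedReportConfig(
  insuranceCompanyId: String,
  dateFrom: Date,
  dateTo: Date,
  groupByColumns: List[String],      // e.g. List("diagnosis", "city", "gender")
  metrics: Map[String, String],      // metric name -> SQL aggregate, e.g. "avg_age" -> "avg(age)"
  customColumns: Map[String, String] // new column name -> SQL expression, e.g. "month" -> "month(appointment_time)"
)

case class ReportMetadata(
  stats: Map[String, Long],          // counters and timestamps collected along the way
  errors: Map[String, String]        // custom column name -> error message
)
```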

The report configuration and result statistics classes are ready. Now let’s write the body of generateReport. According to requirement (1), we need to read the MedCleanData dataset, filter it, and collect some metadata.
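A sketch of this step inside generateReport, assuming the dataset is stored as Parquet, that appointment_time is a timestamp column, and using hypothetical column names and an illustrative path:

```scala
import org.apache.spark.sql.functions._

val filtered = spark.read
  .parquet("s3://med-clean-data/appointments")                       // hypothetical path
  .filter(col("insurance_company_id") === config.insuranceCompanyId)
  .filter(col("appointment_time").between(config.dateFrom, config.dateTo))
  .cache()                                                           // it is used several times below

// Metadata for requirement (1)
val minMax = filtered.agg(min("appointment_time"), max("appointment_time")).first()
val readMetadata: Map[String, Long] = Map(
  "filtered_size"        -> filtered.count(),
  "unique_patients"      -> filtered.select(countDistinct("patient_id")).first().getLong(0),
  "min_appointment_time" -> minMax.getTimestamp(0).getTime,
  "max_appointment_time" -> minMax.getTimestamp(1).getTime
)
```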

Then, as per requirement (2), we need to add custom fields and save error messages — if there are any.
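A sketch of this step, continuing from the previous fragment: each custom column is added with expr; if Spark fails to parse or resolve the expression, a null constant is inserted instead and the error message is remembered (the customColumns field and its shape come from the sketch above):

```scala
import org.apache.spark.sql.functions.{expr, lit}
import scala.util.{Failure, Success, Try}

val (withCustomColumns, customColumnsErrors) =
  config.customColumns.foldLeft((filtered, Map.empty[String, String])) {
    case ((df, errors), (name, expression)) =>
      Try(df.withColumn(name, expr(expression))) match {
        case Success(next) => (next, errors)
        case Failure(e)    => (df.withColumn(name, lit(null)), errors + (name -> e.getMessage))
      }
  }
```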

Finally, requirement (3) says that the report should be grouped and cohorts should be truncated if they contain fewer than 20 unique patients.

First, let’s take the statistical metrics (metrics: Map[String, String]) from the application configuration and convert them to a format that Spark understands: List[Column].

Let’s add our mandatory statistical metric to this list, a count distinct over patient_id, as it will be needed for the final filtering of cohorts with fewer than 20 patients.

We will then group by the grouping parameters set by the user, and aggregate by the statistical metrics set by the user in the configuration and the mandatory statistical metric we added in the previous step: unique_patients_count. At the same time, we will calculate the number of truncated cohorts and the size of the report to form the future metadata.
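A sketch of this step, continuing from the previous fragments (column and metric names are assumptions):

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, countDistinct, expr}

// User-defined aggregates from the configuration, e.g. "avg_age" -> "avg(age)"
val userMetrics: List[Column] =
  config.metrics.map { case (name, aggExpr) => expr(aggExpr).as(name) }.toList

// The mandatory metric used to truncate small cohorts
val aggColumns = countDistinct("patient_id").as("unique_patients_count") :: userMetrics

val grouped = withCustomColumns
  .groupBy(config.groupByColumns.map(col): _*)
  .agg(aggColumns.head, aggColumns.tail: _*)
  .cache()

val report     = grouped.filter(col("unique_patients_count") >= 20)
val reportSize = report.count()

// Metadata for requirement (3)
val groupMetadata: Map[String, Long] = Map(
  "truncated_cohorts" -> (grouped.count() - reportSize),
  "report_size"       -> reportSize
)
```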

Putting all of these pieces together into a single generateReport method gives us the complete naive version (the full listing is in the repository linked at the end).

Running generateReport, the code works successfully and prints the result, including the collected ReportMetadata.

By the way, this kind of case class output is possible because of the PPrint library.

Now let’s examine the advantages and disadvantages of the code above:

Advantage:

  • The code compiles and does what it needs to do.

Disadvantages:

  • Long method
  • Unreadable
  • Non-composable.

From my perspective, this code is not readable because it mixes dataset transformations with metadata counting, and worse, this is almost impossible to avoid within a single method.

Since metadata needs to be counted on intermediate transformations, these results of intermediate transformations need to be written into variables. It is of course possible to write the whole pipeline of transformations first, and only collect the metadata at the end — but then the code for collecting the metadata will be far away from the location of the corresponding transformations.

Such code is difficult to maintain and reuse. If you want to read the MedCleanData dataset and store the same metadata elsewhere in the project, you will have to copy not only the code with the dataset transformations, but also the code for calculating the metadata separately.

Approach 2 — Tuples

This function should obviously be decomposed into several smaller functions, so that each function returns a pair of the dataset and the collected metadata.

By rewriting it, we get something like this:
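A sketch of the decomposition; the signatures below are assumptions, and the bodies are just the fragments from Approach 1, so they are elided with ??? here:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

def readMedCleanDataset(config: MedReportConfig)(implicit spark: SparkSession): (DataFrame, Map[String, Long]) = ???
def addCustomColumns(config: MedReportConfig)(df: DataFrame): (DataFrame, Map[String, String]) = ???
def groupReport(config: MedReportConfig)(df: DataFrame): (DataFrame, Map[String, Long]) = ???

def generateReport(config: MedReportConfig)(implicit spark: SparkSession): (DataFrame, ReportMetadata) = {
  val (filtered, readStats)      = readMedCleanDataset(config)
  val (withCustom, customErrors) = addCustomColumns(config)(filtered)
  val (report, groupStats)       = groupReport(config)(withCustom)

  // generateReport still has to know how ReportMetadata is assembled from the pieces
  (report, ReportMetadata(stats = readStats ++ groupStats, errors = customErrors))
}
```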

We can already see an improvement. First, the main generateReport method is much more readable. You can see at a glance that there are three stages of report generation, that the result of each stage is an input parameter for the next, and that some metadata is collected as the data is processed.

Second, it’s composable. Every small transformer function (readMedCleanDataset or addCustomColumns) can be reused in other parts of the project.

However, this version can still be improved, as it has one drawback: it does not follow the Single Responsibility principle. generateReport should deal solely with data transformations, but it also creates the ReportMetadata object, and to do so it has to know the metadata types returned by each function it calls. If you change the ReportMetadata class, you will have to rewrite generateReport as well. This is extra logic that should be hidden from generateReport.

The idea is as follows: creating a ReportMetadata object is an obvious violation of Single Responsibility, because a change to ReportMetadata results in a change to generateReport. However, if generateReport did not create ReportMetadata, but only combined objects created elsewhere, then a change in ReportMetadata would not require a change to generateReport, since the combine mechanism would hide the internal structure of ReportMetadata.

To do this, we’ll first change the signatures of our transformer functions so that they all return (ReportMetadata, DataFrame).

For example:
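A sketch of readMedCleanDataset in this new shape (some of the metadata from the naive version is omitted here for brevity, and the path and column names are still assumptions):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, countDistinct}

def readMedCleanDataset(config: MedReportConfig)(implicit spark: SparkSession): (ReportMetadata, DataFrame) = {
  val filtered = spark.read
    .parquet("s3://med-clean-data/appointments")                     // hypothetical path
    .filter(col("insurance_company_id") === config.insuranceCompanyId)
    .filter(col("appointment_time").between(config.dateFrom, config.dateTo))
    .cache()

  val metadata = ReportMetadata(
    stats = Map(
      "filtered_size"   -> filtered.count(),
      "unique_patients" -> filtered.select(countDistinct("patient_id")).first().getLong(0)
    ),
    errors = Map.empty
  )
  (metadata, filtered)
}
```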

This will be similar for the other two as well.

The main method looks like this, but it does not work:
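Something along these lines, where the last line is exactly what does not compile:

```scala
def generateReport(config: MedReportConfig)(implicit spark: SparkSession): (ReportMetadata, DataFrame) = {
  val (m1, filtered)   = readMedCleanDataset(config)
  val (m2, withCustom) = addCustomColumns(config)(filtered)
  val (m3, report)     = groupReport(config)(withCustom)

  (m1 ++ m2 ++ m3, report) // does not compile: ReportMetadata has no ++ (or +) of its own
}
```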

ReportMetadata is not a collection, so we cannot simply merge all the metadata into one big object using + or ++. However, you may notice that ReportMetadata consists of two fields of type Map that can be combined with each other.

In such a case, the Semigroup type class from the Cats library can help us. The idea is that by defining an implementation of the function combine(x: A, y: A): A, we can use this combine to merge two or more ReportMetadata instances into one big ReportMetadata by combining the internal collections of the ReportMetadata class.

Let’s write a combine implementation for ReportMetadata in its companion object.
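A sketch of such an instance, assuming the two-Map shape of ReportMetadata used in this article:

```scala
import cats.Semigroup
import cats.implicits._ // brings in the |+| syntax and the Semigroup instances for Map

object ReportMetadata {
  implicit val reportMetadataSemigroup: Semigroup[ReportMetadata] =
    Semigroup.instance { (x, y) =>
      ReportMetadata(
        stats  = x.stats |+| y.stats,
        errors = x.errors |+| y.errors
      )
    }
}
```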

|+| in the function above is Cats syntax available through the import of cats.syntax.semigroup._; it is essentially the same combine, prepared in advance by the Cats developers for Map[String, Long]. In this case, it can be replaced by ++ from the standard Scala library (strictly speaking, |+| merges the values of duplicate keys while ++ keeps only the right-hand value, so the two are interchangeable only as long as the metadata keys do not collide).

Next is the main function generateReport:
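A sketch of the combining version:

```scala
import cats.syntax.semigroup._
import org.apache.spark.sql.{DataFrame, SparkSession}

def generateReport(config: MedReportConfig)(implicit spark: SparkSession): (ReportMetadata, DataFrame) = {
  val (m1, filtered)   = readMedCleanDataset(config)
  val (m2, withCustom) = addCustomColumns(config)(filtered)
  val (m3, report)     = groupReport(config)(withCustom)

  (m1.combine(m2).combine(m3), report) // or equivalently m1 |+| m2 |+| m3
}
```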

Note that generateReport does not create ReportMetadata; it only combines the created ones.

However, we can see that the combining of m1, m2, and m3 is rather trivial code, which interferes with the reading of the underlying data processing.

Approach 3 — Writer

In order to hide the trivial combining of m1, m2, and m3, let’s use the class Writer from Cats.

Writer[L, A] describes a computation that produces a value of type A together with a “statistic” or “log” of type L (the L presumably stands for Log).

Writers can be chained via flatMap, and “statistics” of type L will be merged, provided that the implicit Semigroup[L] is defined for them.
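Here is a tiny, self-contained illustration of that behaviour, unrelated to the report itself:

```scala
import cats.data.Writer
import cats.implicits._

val step1: Writer[List[String], Int]        = Writer(List("read 10 rows"), 10)
val step2: Int => Writer[List[String], Int] = n => Writer(List("doubled the value"), n * 2)

// flatMap merges the two logs using Semigroup[List[String]] (concatenation)
val chained: Writer[List[String], Int] = step1.flatMap(step2)

println(chained.run) // (List(read 10 rows, doubled the value), 20)
```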

Let’s rewrite the transformer functions so that they return Writer[ReportMetadata, DataFrame].

In the addCustomColumns function, we can finally get rid of the additional customColumnsErrors map, since we can use Writer itself to collect errors in ReportMetadata.

It looks like this:
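A sketch of the Writer-based addCustomColumns: each failed expression contributes an error entry to the Writer’s ReportMetadata log, while successful ones contribute an empty one (field names and the curried signature are assumptions carried over from the earlier sketches):

```scala
import cats.data.Writer
import cats.implicits._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{expr, lit}
import scala.util.{Failure, Success, Try}

def addCustomColumns(config: MedReportConfig)(df: DataFrame): Writer[ReportMetadata, DataFrame] =
  config.customColumns.foldLeft(Writer(ReportMetadata(Map.empty, Map.empty), df)) {
    case (acc, (name, expression)) =>
      acc.flatMap { current =>
        Try(current.withColumn(name, expr(expression))) match {
          case Success(next) =>
            Writer(ReportMetadata(Map.empty, Map.empty), next)
          case Failure(e) =>
            Writer(ReportMetadata(Map.empty, Map(name -> e.getMessage)),
                   current.withColumn(name, lit(null)))
        }
      }
  }
```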

The remaining two functions are trivially rewritten; you just need to wrap the resulting tuple in a Writer:
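For example, a sketch of groupReport in this shape; the grouping logic is the same as in the naive version, and only the last line changes from returning a tuple to wrapping it in a Writer:

```scala
import cats.data.Writer
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, countDistinct, expr}

def groupReport(config: MedReportConfig)(df: DataFrame): Writer[ReportMetadata, DataFrame] = {
  val userMetrics: List[Column] =
    config.metrics.map { case (name, aggExpr) => expr(aggExpr).as(name) }.toList
  val aggColumns = countDistinct("patient_id").as("unique_patients_count") :: userMetrics

  val grouped = df
    .groupBy(config.groupByColumns.map(col): _*)
    .agg(aggColumns.head, aggColumns.tail: _*)
    .cache()

  val report     = grouped.filter(col("unique_patients_count") >= 20)
  val reportSize = report.count()

  val metadata = ReportMetadata(
    stats = Map(
      "truncated_cohorts" -> (grouped.count() - reportSize),
      "report_size"       -> reportSize
    ),
    errors = Map.empty
  )
  Writer(metadata, report) // instead of (metadata, report)
}
```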

And then the main method:
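A sketch of the final generateReport, assuming readMedCleanDataset has been rewritten to return a Writer in the same way: the for-comprehension chains the three Writers via flatMap, their ReportMetadata logs are merged with the Semigroup defined earlier, and .run unwraps the resulting pair:

```scala
import cats.data.Writer
import cats.implicits._
import org.apache.spark.sql.{DataFrame, SparkSession}

def generateReport(config: MedReportConfig)(implicit spark: SparkSession): (ReportMetadata, DataFrame) = {
  val report: Writer[ReportMetadata, DataFrame] =
    for {
      filtered   <- readMedCleanDataset(config)
      withCustom <- addCustomColumns(config)(filtered)
      result     <- groupReport(config)(withCustom)
    } yield result

  report.run
}
```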

A few notes:

  • You can see that for each flatMap step, there is a combine of statistics — new and old — just as we manually did m1.combine(m2).combine(m3).
  • Writer is essentially a wrapper around a tuple, exposed as run: (L, V). Therefore, at the end of the generateReport method we call report.run to remove the Writer wrapper, which returns the internal tuple of type (ReportMetadata, DataFrame).

For fans of Cats’ symbolic operators, there is a short one-line version as well:
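>>= is the symbolic Cats alias for flatMap (from cats.syntax.flatMap._); assuming the curried signatures from the sketches above, the whole pipeline collapses to something like:

```scala
import cats.implicits._
import org.apache.spark.sql.{DataFrame, SparkSession}

def generateReport(config: MedReportConfig)(implicit spark: SparkSession): (ReportMetadata, DataFrame) =
  (readMedCleanDataset(config) >>= addCustomColumns(config) >>= groupReport(config)).run
```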

If we return Writer[ReportMetadata, DataFrame] from generateReport, it is even shorter:
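The same sketch without the final .run (same imports as above):

```scala
def generateReport(config: MedReportConfig)(implicit spark: SparkSession): Writer[ReportMetadata, DataFrame] =
  readMedCleanDataset(config) >>= addCustomColumns(config) >>= groupReport(config)
```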

Let’s see what happens if we print out the Writer itself. The code works successfully, and the printed Writer contains the combined ReportMetadata together with the resulting report.

Conclusion

The result of the code is exactly the same as in the first variant (except for the randomly generated dataset).

The difference is that the generateReport method now explicitly performs only the operations on the data, while the Writer is responsible for combining the metadata.

This code is easy to read: you can see the main steps of data processing at a glance, and those interested in implementation details and metadata collection can look into the transformer functions.

Such code is convenient to reuse in other Spark applications, without spending time figuring out how to compute the metadata for the same data operations again.

Each transformer function can be seen as a building block that can be combined to create different data processing pipelines, and useful metadata can be collected and combined in the process.

Links

Repository with all code examples — https://github.com/Jenya95/functional-elegance
