Kotlin and Spark: let’s make it work.
Recently, I was thinking about something new I could learn, and I ended up with two options. The first was to try working with Apache Spark in the Kotlin language. I have worked with both of those individually before, but I was curious to see how well they would work together. The other was to dive into Apache Airflow, figure out how it works, and see how I might be able to use it in the future. Then I thought to myself, why not both? So I set about writing a simple Spark job in Kotlin and running it through Airflow in a Docker container, for no particular reason other than to find out how well it works. Let’s find out, shall we? First off, let’s take a look at some of the components.
Apache Airflow is a tool for creating and managing workflows. You lay out tasks in a graph structure, and Airflow offers a web interface to schedule, run and monitor them, as well as providing analytics, logs and administrative features. It’s written in Python, and users write their tasks and graphs in Python as well. Airflow comes with a suite of ready-to-use operators with which we can specify our tasks. These mainly focus on moving data between stores, running SQL, or sending messages to external systems. If you need custom functionality, there are operators with which you can run arbitrary Python or Bash code as well, so you can do pretty much anything. Another interesting feature is its plugin system, which allows users to, among other things, write their own operators. Now, running a JAR containing the little Spark job we’ll be building would be perfectly possible using the Bash operator. But let’s go ahead and write a custom JVM operator instead, just for the fun of it.
Finally, we’ll need some data. I downloaded some (~1M) book reviews from a public Amazon dataset and loaded them into a PostgreSQL database.
OK, time to actually get started. First off, let’s get that Spark job written. Like any other sane developer, the first thing I did was to see whether someone else had already done the work for me. And would you look at that, JetBrains had. That project is still very much in beta, however, and far from production-ready. It’s a starting point, though, and offers some convenient shortcuts and integrations.
We’ll work with three (direct) dependencies: Spark, the PostgreSQL JDBC driver, and that JetBrains API. With these, it’s very simple to set up a basic Spark job. This one just reads a table from a local database and prints out its schema:
That’s it, no need for any bells or whistles. The withSpark function in JetBrains’ API handles the setup/teardown boilerplate. So this is looking pretty good so far, but what if we want something just slightly more complex than printing schemas?
The dataset contains 1–5 star book reviews, as previously mentioned. On Amazon, however, users can vote on whether a given review was helpful to them or not. Let’s assume that these votes actually indicate more accurate reviews, and have our Spark job compute a new score for each book which weights the ratings given by reviews by the ‘helpfulness’ of the reviews themselves. We’ll get the boring stuff out of the way first:
Nothing remarkable going on here. We’re just connecting to the same database as before and getting a table. The only thing to note is that, since we’re actually going to be doing some work this time, we’re giving Spark a little more memory to work with. Now for the interesting bit. We need to do three things: filter out reviews which have no helpfulness votes, weight the ratings given by the remaining reviews by their helpfulness, and then compute the average weighted rating per book:
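To make the arithmetic behind those three steps concrete, here is a minimal sketch in plain Python rather than Spark. It is not the job's actual code: the sample rows, the column layout and the function name weighted_scores are all made up for illustration, and it assumes that "helpfulness" means the fraction of votes that marked a review as helpful.

```python
from collections import defaultdict

# Hypothetical sample rows: (book_id, rating, helpful_votes, total_votes).
reviews = [
    ("book-a", 5, 8, 10),
    ("book-a", 1, 1, 10),
    ("book-a", 3, 0, 0),  # no helpfulness votes, so this review is dropped
    ("book-b", 4, 2, 4),
]


def weighted_scores(rows):
    """Return the average helpfulness-weighted rating per book."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for book_id, rating, helpful, total in rows:
        # Step 1: filter out reviews nobody voted on.
        if total < 1:
            continue
        # Step 2: weight the rating by the share of helpful votes (assumed definition).
        sums[book_id] += rating * helpful / total
        counts[book_id] += 1
    # Step 3: average the weighted ratings per book.
    return {book: sums[book] / counts[book] for book in sums}


scores = weighted_scores(reviews)
print(scores)
```

With these made-up rows, book-a ends up at roughly 2.05 and book-b at 2.0: the five-star review counts for a lot because most voters found it helpful, while the one-star review barely registers.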
The main problem I ran into here is that while Scala and Kotlin both support operator overloading, they don’t quite work the same under the hood. For example, the >= operator on columns, defined in Scala, gets compiled to a function called “$greater$eq”, which Kotlin does not recognize as an operator. It still lets you call the function, though you don’t get to use the actual >= symbol or infix notation. The same is true for other operators, so I used the .times() and .div() functions instead.
Similarly, both Scala and Kotlin allow programmers to overload the () operator. Spark uses this notation on its DataFrames to provide access to individual columns, which is a very convenient shorthand for a common task. Unfortunately, again, this does not map to Kotlin automatically. Scala calls the function implementing this overloaded operator ‘apply’, while Kotlin calls it ‘invoke’. Same thing, different name. All of these issues are easily fixed with a few lines of boilerplate, however:
This effectively maps the Scala operators to Kotlin ones, so now we can use * and / instead of .times() and .div(), for example. The one exception is comparison: Kotlin translates >= into a call to compareTo, which must return an Int, so the operator itself always yields a Boolean. The Spark version returns a Column, however, so we won’t be able to use an actual symbolic operator in this case in Kotlin. Instead, greaterThan is written as the next best thing: an infix function. Now we can rewrite the code from before in a slightly more readable fashion:
It’s only a small difference, of course, especially for such a small program, but I think that over time these small differences will make the whole process just that little bit nicer to work with.
Alright, that’s all we need for our Spark job. All that’s left is to pack it up in a JAR and move on to Airflow.
As I mentioned before, we’ll be creating a custom operator to run our JAR for us. This is actually quite easy. You just need to create a class which extends BaseOperator, and implement its execute() function. I will admit that I may have gone a little overboard with the number of parameters here:
I won’t bore you with the implementation of the constructor itself, since it basically just sets the fields the same way any other Python class does. Let’s instead take a look at the execute method:
Again, more complex than it needs to be. Sue me. The gist of it is that it builds up a list of command line arguments and passes them to the standard subprocess library to execute the JAR as a separate process.
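As a rough illustration of that gist, and not the operator's actual code, the command-building part could look something like the sketch below. The function names build_java_command and run_jar and all of their parameters are hypothetical. It assumes a java executable is available on the PATH and that the JAR either declares its main class in its manifest or is given one explicitly.

```python
import subprocess


def build_java_command(jar_path, main_class=None, jvm_options=(), arguments=()):
    """Build the argument list for launching a JAR with the java launcher."""
    command = ["java", *jvm_options]
    if main_class is None:
        # Rely on the Main-Class entry in the JAR's manifest.
        command += ["-jar", jar_path]
    else:
        # Put the JAR on the classpath and name the entry point explicitly.
        command += ["-cp", jar_path, main_class]
    # Program arguments go last and must be strings.
    command += [str(argument) for argument in arguments]
    return command


def run_jar(jar_path, **options):
    """Run the JAR as a separate process and fail loudly on a non-zero exit code."""
    command = build_java_command(jar_path, **options)
    # check=True makes subprocess.run raise CalledProcessError if the JVM exits with an error.
    subprocess.run(command, check=True)
```

Inside a custom operator, execute() would then only need to call something like run_jar with the fields set in the constructor. Passing the command as a list rather than a single string avoids going through a shell, so arguments containing spaces don’t need any quoting.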
Now, all that’s left is to write the Airflow job. To do this, Airflow wants you to lay out a directed acyclic graph (or DAG) connecting your tasks together. Really, we only have a single task in this case, but I’ll add a little echo at the end just for fun:
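Setting Airflow’s own API aside for a moment, the ordering idea behind a DAG can be sketched with nothing but Python’s standard library. This is only an illustration of what “directed acyclic graph” buys you, not how Airflow is implemented or how the DAG in this post is declared, and the task names run_spark_jar and echo_done are made up.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks that must finish before it may start.
dependencies = {
    "run_spark_jar": set(),           # no upstream tasks
    "echo_done": {"run_spark_jar"},   # runs only after the JAR has finished
}

# A topological sort yields an order in which every task comes after its dependencies.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)  # prints ['run_spark_jar', 'echo_done']
```

The “acyclic” part matters: if the two tasks depended on each other, no valid order would exist, and TopologicalSorter would raise a CycleError instead of returning one.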
We just define some default arguments and instantiate the DAG and the two operators, simple as that. With that out of the way, we can now run Airflow and have it execute the DAG.
I chose to use Docker to run Airflow, using a Dockerfile created by GitHub user puckel. The Airflow project has since released its own Dockerfile, but it seems to be significantly less popular than the “unofficial” version.
There is one small problem: we want to run a JVM application, but the Docker image built from that Dockerfile comes without any JVM installed. No matter, we just need to write up a quick extension:
Problem solved. Now, we can run Airflow and be greeted by its shiny web interface:
And it even works, which is always nice to see:
And finally, a quick look at the database confirms that the new table was indeed created as expected:
Is it a good idea to write Spark jobs in Kotlin? Probably not. It’s clearly possible, but the costs of the incompatibilities outweigh the benefits of using my favorite language, in my opinion. I didn’t even get into the worst example I’ve seen, which is that while both Scala and Kotlin use the concept of a “companion object” to replace Java’s static members, Kotlin can’t actually use the Scala implementations of such objects without significant hassle. That alone would be pretty much a dealbreaker, unfortunately. So, in the future, I’ll just stick to Scala or Python for working in Spark instead.
In retrospect, I only explored some very basic aspects of Airflow here. There’s a lot more going on, and I am excited to dig into it further to see what else it can do. I definitely like how easy it is to work with once you get past a few initial quirks.
This concludes my rambling report on taking two ideas that don’t really fit together and smashing them together anyway. I hope you enjoyed it.