Getting Started with Apache Spark, Apache Kafka and Apache Iceberg

Apache Spark is a unified, distributed analytics engine supported by all of the major Lakehouse platforms and common streaming systems. Spark has its roots all the way back with Apache Mesos, which was its first cluster scheduler. Spark now supports YARN (Hadoop), Kubernetes, and standalone mode for running distributed parallel processing for both streaming and batch data analytics and machine learning. I have used Spark a few times professionally and it went well each time.

Apache Kafka is a highly available, distributed publish/subscribe system that lets us stream data between disparate systems. It started as a project at LinkedIn and has grown into a powerhouse of a technology, changing the industry's behavior as we have moved away from batch and into streaming. Other technologies have since emerged that now sit as equals, imho, but Kafka paved the way, and in any case we are going to use it here because I have used it dozens of times in production over the last 12 years or so and I know what I am getting myself into.

Apache Iceberg is a table format for data analytics. It comes from Netflix Engineering and provides some nice features. The Lakehouse architecture has two other table-format options (Delta Lake and Apache Hudi), but I am pulling in Iceberg because of its support for partition evolution. Regardless of which technology you choose, I believe the Lakehouse architecture is a solid foundation for moving machine learning into the day-to-day operation and long-term history of business operations. A lot of it also comes down to which vendor you might already have chosen for other technologies that now supports a Lakehouse, and whether you continue with them, pick one vendor over another, or go with the open-source-only approach.

For our efforts here we are going to be using Python, but you could also use Java or Scala. I am going to be sprinkling in some SQL too; Spark also supports SQL DDL & DML queries.

You can get the code for this post from here.

First up, install Spark. You can download the binaries from the website, add them to your PATH, and set SPARK_HOME to wherever you unzipped the download.

You will also need Docker installed.

Next, let's make sure you have everything Python-related installed. Please also make sure you have Python 3.7 or later.

pip3 install -r requirements.txt

Now create an AWS S3 bucket; let's say you called it “cat” for the rest of this blog post. If you call it something else, then wherever you see “cat” below, replace it with whatever you used for your bucket name.
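If you would rather create the bucket from Python instead of the console or CLI, a minimal sketch with boto3 (assuming your AWS credentials and default region are already configured, and that the name is actually available) looks like this:

import boto3

# Create the bucket that will hold the Iceberg warehouse; swap "cat" for your own name.
# Note: outside of us-east-1 you also need to pass
# CreateBucketConfiguration={"LocationConstraint": "<your-region>"}.
s3 = boto3.client("s3")
s3.create_bucket(Bucket="cat")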

Great, now let's get into a couple of terminals.

First in “Terminal A” run:

docker-compose up

That will run Apache Kafka from Confluent and the Confluent Schema Registry. This comes from their quick start, trimmed down to just those two components. The Schema Registry is there to hold the Apache Avro schemas.
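Once the containers are up, you can sanity-check that the Schema Registry is reachable. A quick way from the Python standard library, assuming the quick start leaves the registry on its default port 8081, is to hit its /subjects endpoint:

import json
import urllib.request

# Lists the Avro subjects currently registered; an empty list [] is expected
# before anything has been produced.
with urllib.request.urlopen("http://localhost:8081/subjects") as resp:
    print(json.loads(resp.read()))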

Apache Avro is a serialization system with multi-language support for data set schemas. This means you can work with classes in one language, serialize them through Avro and over Kafka, and read them in another language. It is also nice & fast.

Ok, now let's open up “Terminal B”, which will be our “producer terminal” where we send data through. Run our send command:

./send.sh
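send.sh wraps the actual producer, so I won't reproduce it here, but to make the Avro piece concrete, here is a rough sketch of the pattern such a producer follows with the confluent-kafka client (the topic name, schema, fields, and ports below are illustrative assumptions, not necessarily what the repo's script uses):

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Hypothetical schema and topic just to show the flow; see the repo for the real ones.
schema_str = """
{"type": "record", "name": "Example", "fields": [{"name": "message", "type": "string"}]}
"""
topic = "getting_started"

registry = SchemaRegistryClient({"url": "http://localhost:8081"})
serializer = AvroSerializer(registry, schema_str)
producer = Producer({"bootstrap.servers": "localhost:9092"})

# Serialize a Python dict to Avro (registering the schema with the Schema Registry) and send it.
value = serializer({"message": "hello"}, SerializationContext(topic, MessageField.VALUE))
producer.produce(topic, value=value)
producer.flush()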

Time to open up “Terminal C”, which will be our pyspark session. Now create the Iceberg table:

./create-table.sh cat getting_started_table
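create-table.sh takes the bucket and table name and issues the table DDL for you. The exact schema lives in the repo, but the kind of statement it runs, from inside a pyspark session configured like the one in queries.py below, looks roughly like this (the column is hypothetical):

# A sketch of the kind of DDL create-table.sh issues; "icebergcat" is the
# catalog configured in queries.py below, and the column list is made up.
spark.sql("""
    CREATE TABLE IF NOT EXISTS icebergcat.getting_started_table (
        message string
    )
    USING iceberg
""")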

Now, in the same terminal, run:

./kafka-to-iceberg.sh cat getting_started_table

That will run a continuous stream from Kafka into the Iceberg table, so each time you “send” something to Kafka it gets written to Iceberg for you.
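The real job is in the repo, but a stripped-down sketch of the shape of that streaming job, a Kafka source feeding an Iceberg sink with Spark Structured Streaming, might look like the following. It assumes a SparkSession configured like the one below plus the Kafka and spark-avro connector packages; the topic, schema, and checkpoint path are placeholders:

from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col, expr

# Hypothetical Avro schema matching whatever the producer registered.
schema_str = """
{"type": "record", "name": "Example", "fields": [{"name": "message", "type": "string"}]}
"""

stream = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "getting_started")
        .option("startingOffsets", "earliest")
        .load()
        # Confluent's wire format prefixes each message with a 5-byte header
        # (magic byte + schema id), so strip it before decoding the Avro bytes.
        .select(from_avro(expr("substring(value, 6, length(value) - 5)"), schema_str).alias("data"))
        .select(col("data.*"))
)

query = (
    stream.writeStream
        .format("iceberg")
        .outputMode("append")
        .option("checkpointLocation", "/tmp/checkpoints/getting_started_table")
        .toTable("icebergcat.getting_started_table")
)
query.awaitTermination()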

Ok, now in “Terminal D” open up a prompt and let's get into pyspark and see what is going on.

./mypy.sh

After the shell is up, paste in the following code, which you can also find in queries.py.

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = (
    SparkConf()
        .setAppName('app_name')
        # packages: S3A filesystem, the Iceberg Spark runtime, and the AWS SDK
        .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0,software.amazon.awssdk:bundle:2.18.31,software.amazon.awssdk:url-connection-client:2.18.31')
        # configure the Iceberg catalog ("icebergcat"), a Hadoop catalog warehoused in S3
        .set('spark.sql.catalog.icebergcat', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.icebergcat.type', 'hadoop')
        .set('spark.sql.catalog.icebergcat.warehouse', 's3a://cat/iceberg')
        # configure S3 access
        .set('spark.sql.catalog.icebergcat.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        .set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
        .set('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Hadoop configuration for the S3A filesystem and AWS credential lookup
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
spark._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")

spark.sql("select * from icebergcat.getting_started_table").show()
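If you send a few more messages from Terminal B, you can watch the table grow by re-running that query, or just count the rows:

# Re-run after each send to watch new rows land in the Iceberg table.
spark.sql("select count(*) from icebergcat.getting_started_table").show()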

And voilà, you have data in an Apache Iceberg table, written by an Apache Spark sink, that was originally sent into Apache Kafka as a Python dict with an Apache Avro schema.

Thanx =8^) Joe Stein

http://www.twitter.com/charmalloc

http://www.linkedin.com/in/charmalloc
