Apache Hudi with AWS EMR 7.2.0, AWS S3
4 min read · Aug 13, 2024
As of Aug 13, 2024, the AWS Apache Hudi documentation has not been updated with instructions for working with AWS EMR 7.2.0.
Here’s what worked for me.
- Create a cluster using the instructions at https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-installation-and-configuration.html. Leave the defaults in place, with these exceptions: choose emr-7.2.0, unselect Presto, and check both “Use for Hive table metadata” and “Use for Spark table metadata”. You will also be required to create an Amazon EMR service role and an EC2 instance profile; select “Create” for both (in my case this created AmazonEMR-ServiceRole-20240813T103441 and AmazonEMR-InstanceProfile-20240813T103425).
- Go into AWS IAM Roles and modify the AmazonEMR-InstanceProfile role to attach the AWSGlueServiceRole managed policy.
- Create a “marketing” database in AWS Glue (a CLI sketch of this step and the previous one follows the spark-shell command below).
- Next, log in to the master EMR node, sudo to root, and run the following (note that the original command was missing the S3 libraries):
spark-shell --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/hudi/hudi-aws-bundle.jar --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog.defaultDatabase=marketing"
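For reference, the IAM and Glue steps above can also be scripted with the AWS CLI. This is only a sketch, using the role name my cluster creation generated; substitute whatever names your account produces:
# Attach the AWS Glue managed policy to the role behind the EMR EC2 instance profile
aws iam attach-role-policy \
  --role-name AmazonEMR-InstanceProfile-20240813T103425 \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
# Create the "marketing" database in the AWS Glue Data Catalog
aws glue create-database --database-input '{"Name": "marketing"}'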
- Run the Hudi code:
code
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.sync.common.HoodieSyncConfig
// Create a DataFrame
val inputDF = Seq(
("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
).toDF("id", "creation_date", "last_update_time")
// Specify common DataSourceWriteOptions in the single hudiOptions variable
val hudiOptions = Map[String,String](
HoodieWriteConfig.TBL_NAME.key -> "account",
DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE",
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "account",
DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
HoodieSyncConfig.META_SYNC_ENABLED.key -> "true",
HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms",
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "account",
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date"
)
// Write the DataFrame as a Hudi dataset
(inputDF.write
.format("hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert")
.mode(SaveMode.Overwrite)
.save("s3a://oh-project-sunnyvalelabs-observed/marketing/"))
output
[root@ip-172-31-46-115 hudi]# spark-shell --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/hudi/hudi-aws-bundle.jar \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
--conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
Aug 13, 2024 8:39:17 PM org.apache.spark.launcher.Log4jHotPatchOption staticJavaAgentOption
WARNING: spark.log4jHotPatch.enabled is set to true, but /usr/share/log4j-cve-2021-44228-hotpatch/jdk17/Log4jHotPatchFat.jar does not exist at the configured location
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/13 20:39:26 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Spark context Web UI available at http://ip-172-31-46-115.us-west-2.compute.internal:4040
Spark context available as 'sc' (master = yarn, app id = application_1723581336609_0002).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.5.1-amzn-0
/_/
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 17.0.11)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions
scala> import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.DataSourceReadOptions
scala> import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig
scala> import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.MultiPartKeysValueExtractor
scala> import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.hive.HiveSyncConfig
scala> import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.HoodieSyncConfig
scala> // Create a DataFrame
scala> val inputDF = Seq(
| ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
| ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
| ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
| ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
| ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
| ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
| ).toDF("id", "creation_date", "last_update_time")
inputDF: org.apache.spark.sql.DataFrame = [id: string, creation_date: string ... 1 more field]
scala>
scala> //Specify common DataSourceWriteOptions in the single hudiOptions variable
scala> val hudiOptions = Map[String,String](
| HoodieWriteConfig.TBL_NAME.key -> "account",
| DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE",
| DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
| DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
| DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
| DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
| DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "account",
| DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
| HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
| HoodieSyncConfig.META_SYNC_ENABLED.key -> "true",
| HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms",
| HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "account",
| HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date"
| )
hudiOptions: scala.collection.immutable.Map[String,String] = Map(hoodie.datasource.hive_sync.mode -> hms, hoodie.datasource.write.precombine.field -> last_update_time, hoodie.datasource.hive_sync.partition_fields -> creation_date, hoodie.datasource.hive_sync.partition_extractor_class -> org.apache.hudi.hive.MultiPartKeysValueExtractor, hoodie.datasource.meta.sync.enable -> true, hoodie.datasource.hive_sync.table -> account, hoodie.datasource.hive_sync.enable -> true, hoodie.datasource.write.recordkey.field -> id, hoodie.table.name -> account, hoodie.datasource.write.table.type -> COPY_ON_WRITE, hoodie.datasource.write.partitionpath.field -> creation_date)
scala>
scala> // Write the DataFrame as a Hudi dataset
scala> (inputDF.write
| .format("hudi")
| .options(hudiOptions)
| .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert")
| .mode(SaveMode.Overwrite)
| .save("s3a://oh-project-sunnyvalelabs-observed/marketing/"))
24/08/13 20:42:01 WARN HoodieSparkSqlWriterInternal: hoodie table at s3a://oh-project-sunnyvalelabs-observed/marketing already exists. Deleting existing data & overwriting with new data.
24/08/13 20:42:02 WARN HiveConf: HiveConf of name hive.server2.thrift.url does not exist
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
24/08/13 20:42:33 WARN HiveConf: HiveConf of name hive.server2.thrift.url does not exist
24/08/13 20:42:36 WARN HoodieSparkSqlWriterInternal: Closing write client
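After the write completes, you can verify both the dataset and the Glue sync from the same spark-shell session. A minimal check (a sketch; the path and table names match the write above):
// Read the Hudi dataset back directly from its S3 path
val readDF = spark.read.format("hudi").load("s3a://oh-project-sunnyvalelabs-observed/marketing/")
readDF.select("id", "creation_date", "last_update_time").orderBy("id").show(false)
// Confirm that Hive sync registered the table in the Glue "marketing" database
spark.sql("SHOW TABLES IN marketing").show(false)
spark.sql("SELECT id, creation_date, last_update_time FROM marketing.account ORDER BY id").show(false)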
Other errors: