
[SUPPORT] HoodieStreamer: Encountering ClassNotFoundException: io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider after upgrading to the latest version #12838

Open
YousifS7 opened this issue Feb 13, 2025 · 1 comment

@YousifS7

Hello,
We are using the org.apache.hudi.utilities.streamer.HoodieStreamer class to extract data from Kafka and write it to a Hudi table. The Kafka topic is populated by Debezium from a SQL Server table, using the Avro converter. We run spark-submit on EMR 7.6.0.
This works perfectly with Hudi 0.15.0. However, after upgrading to Hudi 1.0.1, we started encountering this error:

java.lang.ClassNotFoundException: io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider

To Reproduce

Steps to reproduce the behavior:

  1. Sync SQL Server table to Kafka via Debezium (AvroConverter)
  2. Provision EMR 7.6.0
  3. Run below Spark-Submit:

-- Spark-Submit

spark-submit \
  --deploy-mode cluster \
  --master yarn \
  --conf spark.rdd.compress=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.kryoserializer.buffer.max=512m \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.shuffle.partitions=200 \
  --conf spark.default.parallelism=200 \
  --conf spark.streaming.kafka.allowNonConsecutiveOffsets=true \
  --jars s3://some_bucket/libs/hudi-aws-bundle-1.0.1.jar,s3://some_bucket/libs/hudi-spark3.5-bundle_2.12-1.0.1.jar \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer s3://some_bucket/libs/hudi-utilities-slim-bundle_2.12-1.0.1.jar \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --props s3://path/to/file.properties \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --source-ordering-field ts \
  --target-base-path s3://path/to/file \
  --target-table some_table \
  --sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
  --op UPSERT \
  --transformer-class org.apache.hudi.utilities.transform.SqlFileBasedTransformer \
  --enable-sync \
  --table-type COPY_ON_WRITE

-- Properties File

hoodie.streamer.transformer.sql.file=s3://path/to/file.sql
hoodie.streamer.schemaprovider.registry.url=some_url
hoodie.streamer.schemaprovider.registry.targetUrl=some_url
schema.registry.url=some_ip:8081
hoodie.streamer.source.kafka.topic=some_topic
bootstrap.servers=some_ip:9092
auto.offset.reset=earliest
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
hoodie.datasource.write.partitionpath.field=partition_path
hoodie.datasource.write.recordkey.field=hudi_key
hoodie.datasource.write.precombine.field=ts
hoodie.datasource.hive_sync.database=some_db_name
hoodie.streamer.kafka.source.maxEvents=1000000000000
hoodie.datasource.hive_sync.support_timestamp=true
hoodie.parquet.max.file.size=18874368
hoodie.copyonwrite.record.size.estimate=2048
hoodie.parquet.small.file.limit=104857600
hoodie.cleaner.fileversions.retained=5

-- SQL Transformer File

CACHE TABLE dbz_filtered AS
SELECT
  CONCAT(source.commit_lsn, ':', ifnull(source.change_lsn, 0), ':', ifnull(source.event_serial_no, 0)) AS ts,
  CASE WHEN op = 'd' THEN before ELSE after END AS source_fields,
  CASE WHEN op = 'd' THEN true ELSE false END AS is_deleted
FROM <SRC>
WHERE op IN ('d', 'u', 'c', 'r');

SELECT
  ts,
  is_deleted,
  source_fields.*,
  CONCAT(trim(source_fields.some_col1), source_fields.some_col2, latest) AS hudi_key,
  from_unixtime(source_fields.some_col2/1000, 'yyyyMM') AS partition_path
FROM dbz_filtered;

Environment Description

  • Hudi version : 1.0.1

  • Spark version : 3.5.3

  • Hive version : Glue Metastore

  • Hadoop version : N/A

  • Storage : S3

  • Running on Docker? : No

Error Message

ERROR Client: Application diagnostics message: User class threw exception: java.lang.NoClassDefFoundError: io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider
	at org.apache.hudi.utilities.schema.SchemaRegistryProvider.lambda$new$1e9d4812$1(SchemaRegistryProvider.java:113)
	at org.apache.hudi.utilities.schema.SchemaRegistryProvider.fetchSchemaFromRegistry(SchemaRegistryProvider.java:173)
	at org.apache.hudi.utilities.schema.SchemaRegistryProvider.parseSchemaFromRegistry(SchemaRegistryProvider.java:141)
	at org.apache.hudi.utilities.schema.SchemaRegistryProvider.getSourceSchema(SchemaRegistryProvider.java:252)
	at org.apache.hudi.utilities.streamer.SourceFormatAdapter.avroDataInRowFormat(SourceFormatAdapter.java:212)
	at org.apache.hudi.utilities.streamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:238)
	at org.apache.hudi.utilities.streamer.StreamSync.fetchNextBatchFromSource(StreamSync.java:639)
	at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSourceAndPrepareRecords(StreamSync.java:582)
	at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:554)
	at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:464)
	at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:911)
	at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:72)
	at org.apache.hudi.common.util.Option.ifPresent(Option.java:101)
	at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:226)
	at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:646)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:741)
Caused by: java.lang.ClassNotFoundException: io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)

We are not using protobuf anywhere in the pipeline, so we are not sure why this version complains about it. Switching back to 0.15.0 makes the error go away. Any help would be appreciated.
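For what it's worth, one quick way to confirm which of the bundle jars (if any) actually ships the class the schema provider tries to load is to look for its `.class` entry inside each jar. A minimal diagnostic sketch (the jar paths in the example are placeholders for the jars passed to `--jars`):

```python
import zipfile

def jar_contains_class(jar_path: str, class_name: str) -> bool:
    """Return True if the jar bundles the given fully qualified class.

    Jars are zip archives; a class a.b.C is stored as the entry a/b/C.class.
    """
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()

# Example usage (placeholder paths):
# target = "io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider"
# for jar in ["hudi-utilities-slim-bundle_2.12-1.0.1.jar",
#             "hudi-spark3.5-bundle_2.12-1.0.1.jar"]:
#     print(jar, jar_contains_class(jar, target))
```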

Thank you

@rangareddy
Contributor

Created an upstream Jira for this issue: https://issues.apache.org/jira/browse/HUDI-9057
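In the meantime, a possible workaround (an assumption, not a confirmed fix): if the missing class comes from the Confluent schema-registry protobuf provider that Hudi 1.x's SchemaRegistryProvider now references, supplying that provider jar explicitly on the classpath may unblock the job. The artifact name and version below are illustrative, not taken from the report:

    # Hypothetical workaround: add the Confluent protobuf provider jar
    # (io.confluent:kafka-protobuf-provider) alongside the Hudi bundles.
    --jars s3://some_bucket/libs/hudi-aws-bundle-1.0.1.jar,s3://some_bucket/libs/hudi-spark3.5-bundle_2.12-1.0.1.jar,s3://some_bucket/libs/kafka-protobuf-provider-<version>.jar

If the class is genuinely unused for an Avro-only pipeline, the underlying fix would belong in Hudi itself (loading the protobuf provider lazily), which is what the Jira above tracks.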
