
Nested json with JSONSchema value converter DataException: Invalid null value for required STRUCT field #3101

Open
timonviola opened this issue Apr 25, 2024 · 3 comments


@timonviola

I am trying to send nested JSON messages and sink them to Parquet using the Kafka Connect S3 sink connector.

Whenever I nest the object too deeply, the connector gives me the error message below.

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:495)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:472)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid null value for required STRUCT field
    at io.confluent.connect.json.JsonSchemaData.toConnectData(JsonSchemaData.java:511)
    at io.confluent.connect.json.JsonSchemaData.lambda$static$11(JsonSchemaData.java:192)
    at io.confluent.connect.json.JsonSchemaData.toConnectData(JsonSchemaData.java:561)
    at io.confluent.connect.json.JsonSchemaData.lambda$static$11(JsonSchemaData.java:192)
    at io.confluent.connect.json.JsonSchemaData.toConnectData(JsonSchemaData.java:561)
    at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:108)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:495)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more

To trigger the error I start a producer:

docker compose exec schema-registry kafka-json-schema-console-producer --bootstrap-server localhost:9092  --property schema.registry.url=http://localhost:8081 --topic foo \
--property value.schema='{"type": "object", "properties": { "level0": { "type": "object", "properties": { "level1": { "type": "object", "properties": { "level2": { "type": "integer" } } } } } }, "additionalProperties": false}'
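
For readability, here is the same value.schema pretty-printed (identical content):

{
  "type": "object",
  "properties": {
    "level0": {
      "type": "object",
      "properties": {
        "level1": {
          "type": "object",
          "properties": {
            "level2": { "type": "integer" }
          }
        }
      }
    }
  },
  "additionalProperties": false
}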

I send this message:
{"level0": {"k": {"i": 2}}}

My connector config:

{
    "store.url": "http://localstack:4566",
    "aws.access.key.id": "",
    "aws.secret.access.key": "",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "flush.size": 1,
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "locale": "en-US",
    "name": "s3-sink",
    "parquet.codec": "snappy",
    "partition.duration.ms": 3600000,
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "rotate.interval.ms": 600000,
    "s3.bucket.name": "my-bucket",
    "s3.region": "eu-central-1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "tasks.max": 1,
    "timestamp.extractor": "Record",
    "timestamp.field": "time",
    "timezone": "UTC",
    "topics": "foo",
    "topics.dir": "topics",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
}
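
For context on the "Tolerance exceeded in error handler" line in the first stack trace: the config above uses Connect's default error handling (errors.tolerance defaults to none, so the task fails on the first bad record). A sketch of the standard errors.* sink properties that would skip bad records and route them to a dead-letter topic instead (the DLQ topic name here is made up):

    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "foo-dlq",
    "errors.deadletterqueue.context.headers.enable": true,
    "errors.log.enable": true,
    "errors.log.include.messages": true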

I can do the same nesting with Avro without any problems.
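
For reference, a rough sketch of the Avro equivalent (the record names here are made up; Avro requires every record to be named, which may well be the relevant difference):

docker compose exec schema-registry kafka-avro-console-producer --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 --topic foo \
  --property value.schema='{"type": "record", "name": "Root", "fields": [{"name": "level0", "type": {"type": "record", "name": "Level0", "fields": [{"name": "level1", "type": {"type": "record", "name": "Level1", "fields": [{"name": "level2", "type": "int"}]}}]}}]}'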

@rayokota
Member

Shouldn't you be sending {"level0": {"level1": {"level2": 2}}} ?

@timonviola
Author

You are right. The error message I posted occurred because my message was missing the object defined in the schema. My real problem, and the error I got stuck on when I send the message you suggested, is Error: Can't redefine: io.confluent.connect.avro.ConnectDefault:

[2024-04-26 06:09:58,625] INFO Kafka startTimeMs: 1714111798620 (org.apache.kafka.common.utils.AppInfoParser)
[2024-04-26 06:09:58,938] INFO [Producer clientId=console-producer] Cluster ID: esaSwPz2R1GpJN7R-ALcIA (org.apache.kafka.clients.Metadata)
{"level0": {"level1": {"level2": 2}}}
 INFO Opening record writer for: topics/foo/partition=0/foo+0+0000000000.snappy.parquet (io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider)
2024-04-26 08:10:39 [2024-04-26 06:10:39,392] ERROR WorkerSinkTask{id=s3-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Can't redefine: io.confluent.connect.avro.ConnectDefault (org.apache.kafka.connect.runtime.WorkerSinkTask)
2024-04-26 08:10:39 org.apache.avro.SchemaParseException: Can't redefine: io.confluent.connect.avro.ConnectDefault
2024-04-26 08:10:39     at org.apache.avro.Schema$Names.put(Schema.java:1511)
2024-04-26 08:10:39     at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:782)
2024-04-26 08:10:39     at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:943)
2024-04-26 08:10:39     at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
2024-04-26 08:10:39     at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
2024-04-26 08:10:39     at org.apache.avro.Schema.toString(Schema.java:396)
2024-04-26 08:10:39     at org.apache.avro.Schema.toString(Schema.java:382)
2024-04-26 08:10:39     at org.apache.parquet.avro.AvroWriteSupport.init(AvroWriteSupport.java:137)
2024-04-26 08:10:39     at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:277)
2024-04-26 08:10:39     at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
2024-04-26 08:10:39     at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:80)
2024-04-26 08:10:39     at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
2024-04-26 08:10:39     at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
2024-04-26 08:10:39     at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
2024-04-26 08:10:39     at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
2024-04-26 08:10:39     at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:191)
2024-04-26 08:10:39     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:549)
2024-04-26 08:10:39     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
2024-04-26 08:10:39     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
2024-04-26 08:10:39     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
2024-04-26 08:10:39     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
2024-04-26 08:10:39     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
2024-04-26 08:10:39     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
2024-04-26 08:10:39     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2024-04-26 08:10:39     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2024-04-26 08:10:39     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2024-04-26 08:10:39     at java.lang.Thread.run(Thread.java:748)
2024-04-26 08:10:39 [2024-04-26 06:10:39,395] ERROR WorkerSinkTask{id=s3-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
2024-04-26 08:10:39 org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
2024-04-26 08:10:39     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:571)
2024-04-26 08:10:39     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
2024-04-26 08:10:39     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
2024-04-26 08:10:39     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
2024-04-26 08:10:39     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
2024-04-26 08:10:39     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
2024-04-26 08:10:39     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
2024-04-26 08:10:39     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2024-04-26 08:10:39     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2024-04-26 08:10:39     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2024-04-26 08:10:39     at java.lang.Thread.run(Thread.java:748)
2024-04-26 08:10:39     Suppressed: java.lang.NullPointerException
2024-04-26 08:10:39             at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.close(ParquetRecordWriterProvider.java:97)
2024-04-26 08:10:39             at io.confluent.connect.s3.TopicPartitionWriter.close(TopicPartitionWriter.java:314)
2024-04-26 08:10:39             at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:249)
2024-04-26 08:10:39             at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:404)
2024-04-26 08:10:39             at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:601)
2024-04-26 08:10:39             at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:205)
2024-04-26 08:10:39             ... 7 more
2024-04-26 08:10:39 Caused by: org.apache.avro.SchemaParseException: Can't redefine: io.confluent.connect.avro.ConnectDefault
2024-04-26 08:10:39     at org.apache.avro.Schema$Names.put(Schema.java:1511)
2024-04-26 08:10:39     at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:782)
2024-04-26 08:10:39     at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:943)
2024-04-26 08:10:39     at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
2024-04-26 08:10:39     at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
2024-04-26 08:10:39     at org.apache.avro.Schema.toString(Schema.java:396)
2024-04-26 08:10:39     at org.apache.avro.Schema.toString(Schema.java:382)
2024-04-26 08:10:39     at org.apache.parquet.avro.AvroWriteSupport.init(AvroWriteSupport.java:137)
2024-04-26 08:10:39     at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:277)
2024-04-26 08:10:39     at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
2024-04-26 08:10:39     at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:80)
2024-04-26 08:10:39     at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
2024-04-26 08:10:39     at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
2024-04-26 08:10:39     at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
2024-04-26 08:10:39     at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
2024-04-26 08:10:39     at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:191)
2024-04-26 08:10:39     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:549)
2024-04-26 08:10:39     ... 10 more
2024-04-26 08:10:39 [2024-04-26 06:10:39,395] ERROR WorkerSinkTask{id=s3-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

@timonviola
Author

@rayokota Thanks for chipping in earlier.
To be absolutely clear, this is about Kafka Connect failing with the config I shared in my initial description of the issue.

I solved the org.apache.avro.SchemaParseException: Can't redefine: io.confluent.connect.avro.ConnectDefault by adding title fields to my JSON Schema:

{
  "type": "object",
  "title": "titleA",
  "properties": {
    "level0": {
      "type": "object",
      "title": "titleB",
      "properties": {
        "level1": {
          "type": "object",
          "properties": {
            "level2": {
              "type": "integer"
            }
          }
        }
      }
    }
  },
  "additionalProperties": false
}

As soon as I remove any of the title fields, I get the exception again. At this point I am guessing that, under the hood, the title field is used to derive the record name/namespace. I have also looked at https://json-schema.org/draft/2020-12/json-schema-validation#name-title-and-description, but it seems to me that "title" is not mandatory for JSON Schema per se.
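
To illustrate that guess (I have not checked the converter source, and the field types are approximations), I suspect the Avro schema the Parquet writer ends up with gives every untitled nested object the same default record name, roughly:

{
  "type": "record",
  "name": "ConnectDefault",
  "namespace": "io.confluent.connect.avro",
  "fields": [
    {
      "name": "level0",
      "type": {
        "type": "record",
        "name": "ConnectDefault",
        "namespace": "io.confluent.connect.avro",
        "fields": [
          {
            "name": "level1",
            "type": {
              "type": "record",
              "name": "ConnectDefault",
              "namespace": "io.confluent.connect.avro",
              "fields": [
                { "name": "level2", "type": "long" }
              ]
            }
          }
        ]
      }
    }
  ]
}

Two records with the same full name io.confluent.connect.avro.ConnectDefault would be exactly what Avro's "Can't redefine" check rejects, and distinct title values would give the nested records distinct names, which matches what I observe.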

Can you point me in the right direction as to why this is happening, or where I can learn more about which JSON Schema fields the connectors require?

I can also close the issue, as I found "a solution"™, but ideally I would also like to know "why".
