You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Producer throws error message Broker: Message size too large when the event payload size is less than the max max.message.bytes configuration on the topic
#2214
Open
8 tasks done
vikasillumina opened this issue
Apr 27, 2024
· 0 comments
Producer throws error messageBroker: Message size too large when the event payload size is less than the max.message.bytes configuration on the topic
How to reproduce
We have not been able to reproduce this issue and from the investigation we have concluded that the event payload we are trying to publish to Kafka is no where near the max.message.bytes configuration on the topic.
But here is the sample code:
public virtual async Task<DeliveryResult<string, byte[]>[]> PublishMessagesAsync(IEnumerable<Message> messages,
Action<MessageDelivery<string, byte[]>> onDelivery,
CancellationToken cancellationToken = default(CancellationToken))
{
var tasks = new List<Task<DeliveryResult<string, byte[]>>>();
foreach (var m in messages)
{
if (m == null)
{
continue;
}
Task<DeliveryResult<string, byte[]>> task = null;
var contentJson = JsonConvert.SerializeObject(m.ContentObject, new JsonSerializerSettings
{
DateTimeZoneHandling = DateTimeZoneHandling.Utc,
NullValueHandling = NullValueHandling.Ignore,
ContractResolver = new Newtonsoft.Json.Serialization.DefaultContractResolver
{
NamingStrategy = new Newtonsoft.Json.Serialization.CamelCaseNamingStrategy()
}
});
var contentBytes = Encoding.Default.GetBytes(contentJson);
if (m.ContentBytes != null && m.ContentBytes.Length > 0)
{
task = PublishRawAsync(m.KafkaMessageMetadata?.Topic, m.ContentBytes, m.Headers, m.KafkaMessageMetadata?.Key, cancellationToken);
}
else
{
throw new ArgumentException("message has no content populated");
}
if (onDelivery != null)
{
task = task.ContinueWith(async continuation =>
{
var deliveryReport = await continuation;
onDelivery(new MessageDelivery<string, byte[]>(m, deliveryReport));
return deliveryReport;
}, cancellationToken).Unwrap();
}
tasks.Add(task);
}
return await Task.WhenAll(tasks);
}
public async Task<DeliveryResult<string, byte[]>> PublishRawAsync(string topic, byte[] content, HeadersWrapper messageHeaders, string messageKey = null, CancellationToken cancellationToken = default(CancellationToken))
{
if (string.IsNullOrEmpty(topic))
{
throw new ArgumentNullException(nameof(topic), "Topic must be provided");
}
if (content == null || content.Length == 0)
{
throw new ArgumentException("content is null or empty", nameof(content));
}
if (string.IsNullOrEmpty(messageHeaders.producedby))
{
messageHeaders.producedby = _serviceName;
}
try
{
var message = new Message<string, byte[]>()
{
Value = content,
Headers = CopyToKafkaHeaders(messageHeaders),
Key = messageKey,
Timestamp = new Timestamp(DateTime.UtcNow)
};
var dr = await _producer.ProduceAsync(topic, message);
return dr;
}
catch(Exception ex)
{
_logger.Error(ex, $"An error occured publishing event to topic {topic}.");
throw;
}
}
CLIENT CONFIGURATION
Producer configuration:
bootstrap.servers = AWS broker url (3 brokers across 3 AZ)
client.id = service name
message.max.bytes = 4194304 (4MB) (its a common library setting, however the topic itself has 1MB limit on the message.max.bytes, please see below screenshot under Topic configuration
Topic configuration:
OPERATING SYSTEM:
Application runs in docker container hosted as Kubernetes pod inside Linux OS, below is more specifics of the OS
PRETTY_NAME="Debian GNU/Linux 11 (bullseye)"
NAME="Debian GNU/Linux"
VERSION_ID="11"
VERSION="11 (bullseye)"
VERSION_CODENAME=bullseye
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"
5.10.205-195.807.amzn2.x86_64
PRODUCER LOGS:
Broker: Message size too large at Confluent.Kafka.Producer2.ProduceAsync(TopicPartition topicPartition, Message2 message, CancellationToken cancellationToken)
This is the only error message we could get from the SDK.
Our application doesn't batch the events, it is only sending 1 event at a time via async Tasks. We do submit a lot of these tasks in parallel if that matters.
I looked through the SDK code but couldn't find more details that could explain this error other than the size.
I searched through internet and didn't find any obvious suggestions/solutions.
Please let me know if there is anything else I can provide.
Thanks for your help in advance.
Checklist
Please provide the following information:
A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file. N/A as there is no way to reproduce this issue on demand.
Confluent.Kafka nuget version - 1.8.2
Apache Kafka version - 2.7.0 (AWS MSK)
Client configuration - See above Producer and Topic Configurations
Operating system - See above
Provide logs (with "debug" : "..." as necessary in configuration)- See above
Provide broker log excerpts- AWS confirmed when we saw the error messages on publishing there were no errors in broker log
Critical issue - Its not happening all the time but unfortunately we are seeing this in our production environment.
The text was updated successfully, but these errors were encountered:
vikasillumina
changed the title
Producer throws error Broker: Message size too large message the event payload size is less than the max max.message.bytes configuration on the topic
Producer throws error message Broker: Message size too large when the event payload size is less than the max max.message.bytes configuration on the topic
Apr 27, 2024
Description
Producer throws error message
Broker: Message size too large
when the event payload size is less than themax.message.bytes
configuration on the topicHow to reproduce
We have not been able to reproduce this issue and from the investigation we have concluded that the event payload we are trying to publish to Kafka is no where near the
max.message.bytes
configuration on the topic.But here is the sample code:
CLIENT CONFIGURATION
Producer configuration:
bootstrap.servers = AWS broker url (3 brokers across 3 AZ)
client.id = service name
message.max.bytes = 4194304 (4MB) (its a common library setting, however the topic itself has 1MB limit on the message.max.bytes, please see below screenshot under Topic configuration
Topic configuration:
OPERATING SYSTEM:
Application runs in docker container hosted as Kubernetes pod inside Linux OS, below is more specifics of the OS
PRETTY_NAME="Debian GNU/Linux 11 (bullseye)"
NAME="Debian GNU/Linux"
VERSION_ID="11"
VERSION="11 (bullseye)"
VERSION_CODENAME=bullseye
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"
5.10.205-195.807.amzn2.x86_64
PRODUCER LOGS:
Broker: Message size too large at Confluent.Kafka.Producer
2.ProduceAsync(TopicPartition topicPartition, Message
2 message, CancellationToken cancellationToken)This is the only error message we could get from the SDK.
Our application doesn't batch the events, it is only sending 1 event at a time via async Tasks. We do submit a lot of these tasks in parallel if that matters.
I looked through the SDK code but couldn't find more details that could explain this error other than the size.
I searched through internet and didn't find any obvious suggestions/solutions.
Please let me know if there is anything else I can provide.
Thanks for your help in advance.
Checklist
Please provide the following information:
The text was updated successfully, but these errors were encountered: