feat: kafka publisher default content type conversion media type#657
Conversation
|
Just might need a hint how to adjust the Which has some notes: private string $headerMapper;
/**
* This is not passed to outboundChannelAdapter, as it's used in Kafka Module to declare Producer
*/
private ?MediaType $conversionMediaType = null;
|
|
The The conversion pipeline works in two stages:
If we set the gateway header to You can verify this pattern is consistent across all publisher modules (SQS, AMQP, Redis, DBAL) — they all hardcode So the gateway lines should be reverted, and the |
|
The This can happen in the Safest fix is to initialize it in the constructor: private function __construct(
private string $endpointId,
private string $inputChannelName = ''
) {
$this->headerMapper = DefaultHeaderMapper::createAllHeadersMapping();
}This way the builder always has a safe default, matching what |
|
Regarding your question about public function compile(MessagingContainerBuilder $builder): Definition|Reference
{
return new Definition(
KafkaMessageChannel::class,
[
$this->inboundChannelAdapterBuilder->compile($builder),
$this->outboundChannelAdapterBuilder
->withHeaderMapper($this->getHeaderMapper())
->withDefaultConversionMediaType($this->conversionMediaType?->toString())
->compile($builder),
]
);
}The Also, public function withDefaultConversionMediaType(?string $mediaType): self
{
$this->defaultConversionMediaType = $mediaType
? MediaType::parseMediaType($mediaType)
: null;
return $this;
} |
4237595 to
27c8627
Compare
|
Incroporated the feedback. Regarding convertAndSend, that totally makes sense to me, misunderstood that indeed, thx :) |
|
Let's release :) |
Why is this change proposed?
See also #650
It should be possible to define the content type for publishing messages to kafka to allow a different format then serialized php like json
ecotone/kafka does not use the OutputDefaultConversionMediaType in the KafkaPublisherConfiguration for use with MessagePublisher::convertAndSend
Its not being passed through to the OutboundMessageConverter as a new instance was hardcoded in KafkaOutboundChannelAdapter
$this->outboundMessageConverter = new OutboundMessageConverter($headerMapper);Description of Changes
Implemented OutputDefaultConversionMediaType propagation similiar to the implementations using enqueue
Pull Request Contribution Terms