Skip to content

feat: kafka publisher default content type conversion media type#657

Merged
dgafka merged 2 commits intoecotoneframework:mainfrom
tandev:default-content-type-conversion-with-kafka
Apr 15, 2026
Merged

feat: kafka publisher default content type conversion media type#657
dgafka merged 2 commits intoecotoneframework:mainfrom
tandev:default-content-type-conversion-with-kafka

Conversation

@tandev
Copy link
Copy Markdown
Contributor

@tandev tandev commented Apr 13, 2026

Why is this change proposed?

See also #650

It should be possible to define the content type for publishing messages to kafka to allow a different format then serialized php like json

ecotone/kafka does not use the OutputDefaultConversionMediaType in the KafkaPublisherConfiguration for use with MessagePublisher::convertAndSend
Its not being passed through to the OutboundMessageConverter as a new instance was hardcoded in KafkaOutboundChannelAdapter

$this->outboundMessageConverter = new OutboundMessageConverter($headerMapper);

Description of Changes

Implemented OutputDefaultConversionMediaType propagation similiar to the implementations using enqueue

KafkaPublisherConfiguration::createWithDefaults(
    $topicName,
    MessagePublisher::class, 
    KafkaBrokerConfiguration::class, 
    MediaType::APPLICATION_JSON
),

Pull Request Contribution Terms

  • I have read and agree to the contribution terms outlined in CONTRIBUTING.

@tandev
Copy link
Copy Markdown
Contributor Author

tandev commented Apr 13, 2026

Just might need a hint how to adjust the KafkaMessageChannelBuilder which leads to the header mapper error

Which has some notes:

    private string $headerMapper;
    /**
     * This is not passed to outboundChannelAdapter, as it's used in Kafka Module to declare Producer
     */
    private ?MediaType $conversionMediaType = null;

EnqueueMessageChannelBuilder looks a bit different here

@dgafka
Copy link
Copy Markdown
Member

dgafka commented Apr 14, 2026

The CONTENT_TYPE header on the convertAndSend / convertAndSendWithMetadata gateways should stay as MediaType::APPLICATION_X_PHP. This header describes the source format of the payload — "this is a PHP object coming in" — not the target format we want on the wire.

The conversion pipeline works in two stages:

  1. Gateway sets CONTENT_TYPE = APPLICATION_X_PHP → tells the system the incoming payload is a PHP object
  2. OutboundMessageConverter::prepare() reads $this->defaultConversionMediaType and converts from APPLICATION_X_PHP to the configured target (e.g. APPLICATION_JSON)

If we set the gateway header to APPLICATION_JSON, the converter would think the payload is already JSON and skip conversion — which breaks the purpose of convertAndSend.

You can verify this pattern is consistent across all publisher modules (SQS, AMQP, Redis, DBAL) — they all hardcode APPLICATION_X_PHP at the gateway level and let the OutboundMessageConverter handle the actual conversion.

So the gateway lines should be reverted, and the MediaType import kept. The real fix is what you're already doing — passing the media type to the builder via withDefaultConversionMediaType().

@dgafka
Copy link
Copy Markdown
Member

dgafka commented Apr 14, 2026

The $headerMapper property doesn't have a default value, so if withHeaderMapper() is never called before compile(), PHP will throw an uninitialized typed property error.

This can happen in the KafkaMessageChannelBuilder flow — it creates a KafkaOutboundChannelAdapterBuilder in its constructor (line 40) but never calls withHeaderMapper() on it.

Safest fix is to initialize it in the constructor:

private function __construct(
    private string $endpointId,
    private string $inputChannelName = ''
) {
    $this->headerMapper = DefaultHeaderMapper::createAllHeadersMapping();
}

This way the builder always has a safe default, matching what KafkaPublisherConfiguration::createWithDefaults() uses.

@dgafka
Copy link
Copy Markdown
Member

dgafka commented Apr 14, 2026

Regarding your question about KafkaMessageChannelBuilder — the comment in the code saying "This is not passed to outboundChannelAdapter" was describing the old situation where the OutboundMessageConverter was hardcoded inside KafkaOutboundChannelAdapter. Now that you've moved it to the builder, the channel builder should forward its settings:

public function compile(MessagingContainerBuilder $builder): Definition|Reference
{
    return new Definition(
        KafkaMessageChannel::class,
        [
            $this->inboundChannelAdapterBuilder->compile($builder),
            $this->outboundChannelAdapterBuilder
                ->withHeaderMapper($this->getHeaderMapper())
                ->withDefaultConversionMediaType($this->conversionMediaType?->toString())
                ->compile($builder),
        ]
    );
}

The KafkaMessageChannelBuilder already holds both $headerMapper and $conversionMediaType — they just need to be forwarded to the outbound builder before compilation. That comment can then be removed since it no longer applies.

Also, withDefaultConversionMediaType should handle null gracefully since message channels may not have a conversion type configured:

public function withDefaultConversionMediaType(?string $mediaType): self
{
    $this->defaultConversionMediaType = $mediaType
        ? MediaType::parseMediaType($mediaType)
        : null;
    return $this;
}

@tandev tandev force-pushed the default-content-type-conversion-with-kafka branch from 4237595 to 27c8627 Compare April 15, 2026 13:31
@tandev
Copy link
Copy Markdown
Contributor Author

tandev commented Apr 15, 2026

Incroporated the feedback.

Regarding convertAndSend, that totally makes sense to me, misunderstood that indeed, thx :)

@dgafka dgafka merged commit f3a024a into ecotoneframework:main Apr 15, 2026
10 of 14 checks passed
@dgafka
Copy link
Copy Markdown
Member

dgafka commented Apr 15, 2026

Let's release :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants