Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions packages/Kafka/src/Channel/KafkaMessageChannelBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ final class KafkaMessageChannelBuilder implements MessageChannelWithSerializatio
private KafkaInboundChannelAdapterBuilder $inboundChannelAdapterBuilder;
private KafkaOutboundChannelAdapterBuilder $outboundChannelAdapterBuilder;
private string $headerMapper;
/**
* This is not passed to outboundChannelAdapter, as it's used in Kafka Module to declare Producer
*/
private ?MediaType $conversionMediaType = null;

private function __construct(
Expand All @@ -50,6 +47,8 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc
$this->inboundChannelAdapterBuilder
->compile($builder),
$this->outboundChannelAdapterBuilder
->withHeaderMapper($this->getHeaderMapper())
->withDefaultConversionMediaType($this->conversionMediaType?->toString())
->compile($builder),
]
);
Expand Down
4 changes: 4 additions & 0 deletions packages/Kafka/src/Configuration/KafkaModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ public function getModulePackageName(): string

private function registerMessagePublisher(Configuration $messagingConfiguration, KafkaPublisherConfiguration $extensionObject, ServiceConfiguration $applicationConfiguration): void
{
$mediaType = $extensionObject->getOutputDefaultConversionMediaType() ?? $applicationConfiguration->getDefaultSerializationMediaType();

$messagingConfiguration
->registerGatewayBuilder(
GatewayProxyBuilder::create($extensionObject->getReferenceName(), MessagePublisher::class, 'send', $extensionObject->getReferenceName())
Expand Down Expand Up @@ -219,6 +221,8 @@ private function registerMessagePublisher(Configuration $messagingConfiguration,
$this->getPublisherEndpointId($extensionObject->getReferenceName())
)
->withInputChannelName($extensionObject->getReferenceName())
->withHeaderMapper($extensionObject->getHeaderMapper())
->withDefaultConversionMediaType($mediaType)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public function __construct(
) {
}

public static function createWithDefaults(string $topicName = '', string $referenceName = MessagePublisher::class, string $brokerConfigurationReference = KafkaBrokerConfiguration::class): self
public static function createWithDefaults(string $topicName = '', string $referenceName = MessagePublisher::class, string $brokerConfigurationReference = KafkaBrokerConfiguration::class, ?string $outputDefaultConversionMediaType = null): self
{
return new self(
$topicName,
Expand All @@ -60,7 +60,8 @@ public static function createWithDefaults(string $topicName = '', string $refere
'retry.backoff.ms' => '300',
],
$brokerConfigurationReference,
DefaultHeaderMapper::createAllHeadersMapping()
DefaultHeaderMapper::createAllHeadersMapping(),
$outputDefaultConversionMediaType
);
}

Expand Down
8 changes: 2 additions & 6 deletions packages/Kafka/src/Outbound/KafkaOutboundChannelAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,12 @@
*/
final class KafkaOutboundChannelAdapter implements MessageHandler
{
private OutboundMessageConverter $outboundMessageConverter;

public function __construct(
private string $referenceName,
private KafkaAdmin $kafkaAdmin,
private ConversionService $conversionService
private ConversionService $conversionService,
private OutboundMessageConverter $outboundMessageConverter
) {
$headerMapper = $kafkaAdmin->getConfigurationForPublisher($referenceName)->getHeaderMapper();

$this->outboundMessageConverter = new OutboundMessageConverter($headerMapper);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,30 @@
namespace Ecotone\Kafka\Outbound;

use Ecotone\Kafka\Configuration\KafkaAdmin;
use Ecotone\Messaging\Channel\PollableChannel\Serialization\OutboundMessageConverter;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\MessagingContainerBuilder;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Conversion\ConversionService;
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Handler\MessageHandlerBuilder;
use Ecotone\Messaging\MessageConverter\DefaultHeaderMapper;
use Ecotone\Messaging\MessageConverter\HeaderMapper;

/**
* licence Enterprise
*/
class KafkaOutboundChannelAdapterBuilder implements MessageHandlerBuilder
{
private ?MediaType $defaultConversionMediaType = null;

private HeaderMapper $headerMapper;

private function __construct(
private string $endpointId,
private string $inputChannelName = ''
) {

$this->headerMapper = DefaultHeaderMapper::createAllHeadersMapping();
}

public static function create(string $endpointId): self
Expand Down Expand Up @@ -52,12 +60,45 @@ public function getInputMessageChannelName(): string
return $this->inputChannelName;
}

public function withDefaultConversionMediaType(?string $mediaType): self
{
$this->defaultConversionMediaType = $mediaType
? MediaType::parseMediaType($mediaType)
: null
;

return $this;
}

public function getDefaultConversionMediaType(): ?MediaType
{
return $this->defaultConversionMediaType;
}

public function withHeaderMapper(HeaderMapper $headerMapper): self
{
$this->headerMapper = $headerMapper;

return $this;
}

public function getHeaderMapper(): HeaderMapper
{
return $this->headerMapper;
}

public function compile(MessagingContainerBuilder $builder): Definition
{
$outboundMessageConverter = new Definition(OutboundMessageConverter::class, [
$this->headerMapper,
$this->defaultConversionMediaType,
]);

return new Definition(KafkaOutboundChannelAdapter::class, [
$this->endpointId,
new Reference(KafkaAdmin::class),
new Reference(ConversionService::REFERENCE_NAME),
$outboundMessageConverter,
]);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\Kafka\Fixture\MediaTypeConverter;

use Ecotone\Messaging\Attribute\MediaTypeConverter;
use Ecotone\Messaging\Conversion\Converter;
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Handler\Type;

#[MediaTypeConverter]
/**
* licence Apache-2.0
*/
final class JsonEncodingConverter implements Converter
{
public function convert($source, Type $sourceType, MediaType $sourceMediaType, Type $targetType, MediaType $targetMediaType)
{
return json_encode($source, JSON_THROW_ON_ERROR);
}

public function matches(Type $sourceType, MediaType $sourceMediaType, Type $targetType, MediaType $targetMediaType): bool
{
return $targetMediaType->isCompatibleWith(MediaType::createApplicationJson());
}
}
40 changes: 40 additions & 0 deletions packages/Kafka/tests/Integration/KafkaChannelAdapterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Ecotone\Messaging\Channel\SimpleMessageChannelBuilder;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ServiceConfiguration;
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Endpoint\ExecutionPollingMetadata;
use Ecotone\Messaging\Handler\Logger\EchoLogger;
use Ecotone\Messaging\Handler\Recoverability\ErrorHandlerConfiguration;
Expand All @@ -28,13 +29,15 @@
use PHPUnit\Framework\Attributes\DataProvider;
use PHPUnit\Framework\Attributes\RunTestsInSeparateProcesses;
use PHPUnit\Framework\TestCase;
use stdClass;
use Symfony\Component\Uid\Uuid;
use Test\Ecotone\Kafka\ConnectionTestCase;
use Test\Ecotone\Kafka\Fixture\ChannelAdapter\ExampleKafkaConsumer;
use Test\Ecotone\Kafka\Fixture\KafkaConsumer\KafkaConsumerWithDelayedRetryExample;
use Test\Ecotone\Kafka\Fixture\KafkaConsumer\KafkaConsumerWithFailStrategyExample;
use Test\Ecotone\Kafka\Fixture\KafkaConsumer\KafkaConsumerWithInstantRetryAndErrorChannelExample;
use Test\Ecotone\Kafka\Fixture\KafkaConsumer\KafkaConsumerWithInstantRetryExample;
use Test\Ecotone\Kafka\Fixture\MediaTypeConverter\JsonEncodingConverter;

/**
* licence Enterprise
Expand Down Expand Up @@ -268,6 +271,43 @@ public function test_defining_error_channel(): void
$this->assertNotNull($ecotoneLite->getMessageChannel('customErrorChannel')->receive());
}

public function test_convert_and_send(): void
{
$topicName = Uuid::v7()->toRfc4122();
$ecotoneLite = EcotoneLite::bootstrapFlowTesting(
[ExampleKafkaConsumer::class, JsonEncodingConverter::class],
[
KafkaBrokerConfiguration::class => ConnectionTestCase::getConnection(),
new ExampleKafkaConsumer(),
new JsonEncodingConverter(),
],
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE, ModulePackageList::KAFKA_PACKAGE]))
->withExtensionObjects([
KafkaPublisherConfiguration::createWithDefaults($topicName, MessagePublisher::class, KafkaBrokerConfiguration::class, MediaType::APPLICATION_JSON),
TopicConfiguration::createWithReferenceName('exampleTopic', $topicName),
KafkaConsumerConfiguration::createWithDefaults('exampleConsumer'),
]),
licenceKey: LicenceTesting::VALID_LICENCE,
);

/** @var MessagePublisher $kafkaPublisher */
$kafkaPublisher = $ecotoneLite->getGateway(MessagePublisher::class);

$stdClass = new stdClass();
$stdClass->value = 1;
$kafkaPublisher->convertAndSend($stdClass);

$ecotoneLite->run('exampleConsumer', ExecutionPollingMetadata::createWithTestingSetup(
maxExecutionTimeInMilliseconds: 30000
));

$messages = $ecotoneLite->sendQueryWithRouting('getMessages');

self::assertCount(1, $messages);
self::assertEquals('{"value":1}', $messages[0]['payload']);
}

public function test_kafka_consumer_with_delayed_retry(): void
{
$endpointId = 'kafka_consumer_delayed_retry';
Expand Down
Loading