Skip to content

Commit 95697d6

Browse files
authored
Merge pull request #7 from simPod/rdk4
Upgrade to rdkafka v4
2 parents a8bfaf6 + d53c2b0 commit 95697d6

File tree

8 files changed

+76
-66
lines changed

8 files changed

+76
-66
lines changed

.github/workflows/ci.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ jobs:
3131
php-version: 7.3
3232
coverage: none
3333
tools: pecl
34-
extensions: json, mbstring, rdkafka-3.1.2
34+
extensions: json, mbstring, rdkafka
3535

3636
- name: Get Composer Cache Directory
3737
id: composer-cache
@@ -68,7 +68,7 @@ jobs:
6868
php-version: 7.3
6969
coverage: none
7070
tools: pecl
71-
extensions: json, mbstring, rdkafka-3.1.2
71+
extensions: json, mbstring, rdkafka
7272

7373
- name: Get Composer Cache Directory
7474
id: composer-cache
@@ -110,7 +110,7 @@ jobs:
110110
php-version: 7.3
111111
coverage: pcov
112112
tools: pecl
113-
extensions: json, mbstring, rdkafka-3.1.2
113+
extensions: json, mbstring, rdkafka
114114

115115
- name: Get Composer Cache Directory
116116
id: composer-cache
@@ -140,7 +140,7 @@ jobs:
140140
runs-on: ubuntu-18.04
141141
strategy:
142142
matrix:
143-
php: [7.3]
143+
php: [7.3, 7.4]
144144
env: [
145145
'DEPENDENCIES=--prefer-lowest',
146146
'',
@@ -164,7 +164,7 @@ jobs:
164164
php-version: ${{ matrix.php }}
165165
coverage: none
166166
tools: pecl
167-
extensions: json, mbstring, rdkafka-3.1.2
167+
extensions: json, mbstring, rdkafka
168168

169169
- name: Get Composer Cache Directory
170170
id: composer-cache

.github/workflows/infection.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ jobs:
3030
php-version: 7.3
3131
coverage: pcov
3232
tools: pecl
33-
extensions: json, mbstring, rdkafka-3.1.2
33+
extensions: json, mbstring, rdkafka
3434

3535
- name: Install Dependencies
3636
run: composer install --prefer-dist --no-progress --no-suggest

.github/workflows/shepherd.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ jobs:
2727
php-version: 7.3
2828
coverage: none
2929
tools: pecl
30-
extensions: json, mbstring, rdkafka-3.1.2
30+
extensions: json, mbstring, rdkafka
3131

3232
- name: Install dependencies
3333
run: composer install --prefer-dist --no-progress --no-suggest

composer.json

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,21 @@
1515
"require": {
1616
"php": "^7.2",
1717
"ext-pcntl": "*",
18-
"ext-rdkafka": "*",
19-
"psr/log": "^1.1",
20-
"symfony/polyfill-php73": "^1.11"
18+
"ext-rdkafka": "^4",
19+
"psr/log": "^1.1"
2120
},
2221
"require-dev": {
2322
"doctrine/coding-standard": "^7.0",
2423
"infection/infection": "^0.16.3",
25-
"kwn/php-rdkafka-stubs": "^1.0",
24+
"kwn/php-rdkafka-stubs": "^2.0",
2625
"phpstan/extension-installer": "^1.0",
2726
"phpstan/phpstan": "0.12.25",
2827
"phpstan/phpstan-phpunit": "0.12.8",
2928
"phpstan/phpstan-strict-rules": "0.12.2",
3029
"phpunit/phpunit": "^9.1",
3130
"psalm/plugin-phpunit": "^0.10.0",
3231
"simpod/php-coveralls-mirror": "^3.0",
33-
"vimeo/psalm": "dev-master#47cf69d as 3.11.4"
32+
"vimeo/psalm": "^3.12"
3433
},
3534
"suggest": {
3635
"kwn/php-rdkafka-stubs": "Support and autocompletion for RDKafka in IDE | require as dev dependency"

src/Clients/Consumer/KafkaConsumer.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ function (RdKafkaConsumer $kafka, int $err, string $reason) : void {
4444
}
4545
);
4646

47-
$rebalanceCb =
47+
$rebalanceCallback =
4848
/** @param array<string, TopicPartition>|null $partitions */
4949
function (RdKafkaConsumer $kafka, int $err, ?array $partitions = null) : void {
5050
switch ($err) {
@@ -77,7 +77,7 @@ static function (TopicPartition $partition) : string {
7777
$kafka->assign();
7878
}
7979
};
80-
$config->getConf()->setRebalanceCb($rebalanceCb);
80+
$config->getConf()->setRebalanceCb($rebalanceCallback);
8181

8282
parent::__construct($config->getConf());
8383
}

src/Clients/Producer/KafkaProducer.php

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,78 @@
44

55
namespace SimPod\Kafka\Clients\Producer;
66

7+
use InvalidArgumentException;
78
use RdKafka\Producer;
9+
use RdKafka\ProducerTopic;
10+
use RuntimeException;
11+
use function sprintf;
812
use const RD_KAFKA_PARTITION_UA;
13+
use const RD_KAFKA_RESP_ERR_NO_ERROR;
914

1015
class KafkaProducer extends Producer
1116
{
1217
private const RD_KAFKA_MSG_F_COPY = 0;
1318

14-
public function __construct(ProducerConfig $config)
19+
/** @var callable(KafkaProducer):void|null */
20+
private $exitCallback;
21+
22+
/** @param callable(KafkaProducer):void|null $exitCallback */
23+
public function __construct(ProducerConfig $config, ?callable $exitCallback = null)
1524
{
25+
$this->exitCallback = $exitCallback;
26+
1627
parent::__construct($config->getConf());
1728
}
1829

19-
public function produce(ProducerRecord $record) : void
30+
public function __destruct()
2031
{
21-
$topic = $this->newTopic($record->topic);
22-
/** @psalm-suppress UndefinedMethod https://github.com/vimeo/psalm/issues/3406 */
23-
$topic->produce($record->partition ?? RD_KAFKA_PARTITION_UA, self::RD_KAFKA_MSG_F_COPY, $record->value, $record->key);
32+
if ($this->exitCallback === null) {
33+
return;
34+
}
35+
36+
($this->exitCallback)($this);
37+
}
38+
39+
/** @param array<string, string>|null $headers */
40+
public function produce(
41+
string $topicName,
42+
?int $partition,
43+
string $value,
44+
?string $key = null,
45+
?array $headers = null,
46+
?int $timestampMs = null
47+
) : void {
48+
if ($partition < 0) {
49+
throw new InvalidArgumentException(
50+
sprintf('Invalid partition: %d. Partition number should always be non-negative or null.', $partition)
51+
);
52+
}
53+
54+
/** @psalm-var ProducerTopic $topic Psalm thinks this is a Topic https://github.com/vimeo/psalm/issues/3406 */
55+
$topic = $this->newTopic($topicName);
56+
$topic->producev(
57+
$partition ?? RD_KAFKA_PARTITION_UA,
58+
self::RD_KAFKA_MSG_F_COPY,
59+
$value,
60+
$key,
61+
$headers,
62+
$timestampMs
63+
);
2464
$this->poll(0);
2565
}
2666

27-
public function flush() : void
67+
public function flushMessages(int $timeoutMs = 10000) : void
2868
{
29-
while ($this->getOutQLen() > 0) {
30-
$this->poll(1);
69+
$result = null;
70+
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
71+
$result = $this->flush($timeoutMs);
72+
if ($result === RD_KAFKA_RESP_ERR_NO_ERROR) {
73+
break;
74+
}
75+
}
76+
77+
if ($result !== RD_KAFKA_RESP_ERR_NO_ERROR) {
78+
throw new RuntimeException('Was unable to flush, messages might be lost!');
3179
}
3280
}
3381
}

src/Clients/Producer/ProducerRecord.php

Lines changed: 0 additions & 35 deletions
This file was deleted.

tests/Clients/Consumer/TestProducer.php

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
use SimPod\Kafka\Clients\Producer\KafkaProducer;
88
use SimPod\Kafka\Clients\Producer\ProducerConfig;
9-
use SimPod\Kafka\Clients\Producer\ProducerRecord;
109
use function gethostname;
1110

1211
final class TestProducer
@@ -16,18 +15,17 @@ final class TestProducer
1615

1716
public function __construct()
1817
{
19-
$this->producer = new KafkaProducer($this->getConfig());
18+
$this->producer = new KafkaProducer(
19+
$this->getConfig(),
20+
static function (KafkaProducer $producer) : void {
21+
$producer->flushMessages(5000);
22+
}
23+
);
2024
}
2125

2226
public function run(string $payload) : void
2327
{
24-
$record = new ProducerRecord(KafkaBatchConsumerTest::TOPIC, null, $payload);
25-
$this->producer->produce($record);
26-
}
27-
28-
public function flush() : void
29-
{
30-
$this->producer->flush();
28+
$this->producer->produce(KafkaBatchConsumerTest::TOPIC, null, $payload);
3129
}
3230

3331
private function getConfig() : ProducerConfig

0 commit comments

Comments
 (0)