diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 86dfd9d1..4f7a1dd1 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include "AsioDefines.h" @@ -31,6 +32,7 @@ #include "ConsumerImpl.h" #include "ExecutorService.h" #include "LogUtils.h" +#include "MockServer.h" #include "OpSendMsg.h" #include "ProducerImpl.h" #include "PulsarApi.pb.h" @@ -1005,15 +1007,17 @@ Future ClientConnection::newConsumerStats(uint6 void ClientConnection::newTopicLookup(const std::string& topicName, bool authoritative, const std::string& listenerName, uint64_t requestId, const LookupDataResultPromisePtr& promise) { - newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, promise); + newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, "LOOKUP", + promise); } void ClientConnection::newPartitionedMetadataLookup(const std::string& topicName, uint64_t requestId, const LookupDataResultPromisePtr& promise) { - newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, promise); + newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, "PARTITIONED_METADATA", + promise); } -void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, +void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType, const LookupDataResultPromisePtr& promise) { Lock lock(mutex_); std::shared_ptr lookupDataResult; @@ -1042,6 +1046,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, pendingLookupRequests_.insert(std::make_pair(requestId, requestData)); numOfPendingLookupRequest_++; lock.unlock(); + LOG_DEBUG(cnxString_ << "Inserted lookup request " << requestType << " (req_id: " << requestId << ")"); sendCommand(cmd); } @@ -1158,12 +1163,15 @@ void ClientConnection::sendPendingCommands() { } } -Future ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId) { +Future ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId, + const char* requestType) { Lock lock(mutex_); if (isClosed()) { lock.unlock(); Promise promise; + LOG_DEBUG(cnxString_ << "Fail " << requestType << "(req_id: " << requestId + << ") to a closed connection"); promise.setFailed(ResultNotConnected); return promise.getFuture(); } @@ -1182,7 +1190,17 @@ Future ClientConnection::sendRequestWithId(const SharedBuf pendingRequests_.insert(std::make_pair(requestId, requestData)); lock.unlock(); - sendCommand(cmd); + LOG_DEBUG(cnxString_ << "Inserted request " << requestType << " (req_id: " << requestId << ")"); + if (mockingRequests_.load(std::memory_order_acquire)) { + if (mockServer_ == nullptr) { + LOG_WARN(cnxString_ << "Mock server is unexpectedly null when processing " << requestType); + sendCommand(cmd); + } else if (!mockServer_->sendRequest(requestType, requestId)) { + sendCommand(cmd); + } + } else { + sendCommand(cmd); + } return requestData.promise.getFuture(); } @@ -1625,9 +1643,6 @@ void ClientConnection::handleConsumerStatsResponse( void ClientConnection::handleLookupTopicRespose( const proto::CommandLookupTopicResponse& lookupTopicResponse) { - LOG_DEBUG(cnxString_ << "Received lookup response from server. req_id: " - << lookupTopicResponse.request_id()); - Lock lock(mutex_); auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id()); if (it != pendingLookupRequests_.end()) { diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 18a7d846..aae53d23 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -115,6 +115,7 @@ struct ResponseData { typedef std::shared_ptr> NamespaceTopicsPtr; +class MockServer; class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this { enum State : uint8_t { @@ -123,6 +124,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this; public: typedef std::shared_ptr SocketPtr; @@ -185,7 +188,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this sendRequestWithId(const SharedBuffer& cmd, int requestId); + Future sendRequestWithId(const SharedBuffer& cmd, int requestId, + const char* requestType); const std::string& brokerAddress() const; @@ -208,6 +212,13 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this newGetSchema(const std::string& topicName, const std::string& version, uint64_t requestId); + void attachMockServer(const std::shared_ptr& mockServer) { + mockServer_ = mockServer; + // Mark that requests will first go through the mock server, if the mock server cannot process it, + // fall back to the normal logic + mockingRequests_.store(true, std::memory_order_release); + } + private: struct PendingRequestData { Promise promise; @@ -264,7 +275,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this state_{Pending}; TimeDuration operationsTimeout_; AuthenticationPtr authentication_; @@ -391,6 +405,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this mockServer_; + void handleConsumerStatsTimeout(const ASIO_ERROR& ec, const std::vector& consumerStatsRequests); void startConsumerStatsTimer(std::vector consumerStatsRequests); @@ -405,6 +422,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this #include +#include +#include #include "AckGroupingTracker.h" #include "AckGroupingTrackerDisabled.h" @@ -123,7 +125,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, const std::string& topic negativeAcksTracker_(std::make_shared(client, *this, conf)), ackGroupingTrackerPtr_(newAckGroupingTracker(topic, conf, client)), readCompacted_(conf.isReadCompacted()), - startMessageId_(getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())), + startMessageId_(pulsar::getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())), maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()), autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()), expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()), @@ -189,7 +191,8 @@ ConsumerImpl::~ConsumerImpl() { ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { auto requestId = newRequestId(); - cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId); + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, + "CLOSE_CONSUMER"); cnx->removeConsumer(consumerId_); LOG_INFO(consumerStr_ << "Closed consumer for race condition: " << consumerId_); } else { @@ -232,20 +235,19 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c return promise.getFuture(); } - // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after - // sending the subscribe request. - cnx->registerConsumer(consumerId_, get_shared_this_ptr()); + optional subscribeMessageId; + { + LockGuard lock{mutex_}; + setCnx(cnx); + cnx->registerConsumer(consumerId_, get_shared_this_ptr()); + LOG_DEBUG(cnx->cnxString() << "Registered consumer " << consumerId_); - if (duringSeek()) { - ackGroupingTrackerPtr_->flushAndClean(); + clearReceiveQueue(); + subscribeMessageId = + (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId_ : std::nullopt; + lastDequedMessageId_ = MessageId::earliest(); } - Lock lockForMessageId(mutexForMessageId_); - clearReceiveQueue(); - const auto subscribeMessageId = - (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId_.get() : std::nullopt; - lockForMessageId.unlock(); - unAckedMessageTrackerPtr_->clear(); ClientImplPtr client = client_.lock(); @@ -259,13 +261,22 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c // Keep a reference to ensure object is kept alive. auto self = get_shared_this_ptr(); setFirstRequestIdAfterConnect(requestId); - cnx->sendRequestWithId(cmd, requestId) + cnx->sendRequestWithId(cmd, requestId, "SUBSCRIBE") .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { Result handleResult = handleCreateConsumer(cnx, result); - if (handleResult == ResultOk) { - promise.setSuccess(); - } else { + if (handleResult != ResultOk) { promise.setFailed(handleResult); + return; + } + promise.setSuccess(); + // Complete the seek callback after completing `promise`, otherwise `reconnectionPending_` will + // still be true when the seek operation is done. + LockGuard lock{mutex_}; + if (seekStatus_ == SeekStatus::COMPLETED) { + executor_->postWork([seekCallback{std::exchange(seekCallback_, std::nullopt).value()}]() { + seekCallback(ResultOk); + }); + seekStatus_ = SeekStatus::NOT_STARTED; } }); @@ -301,7 +312,8 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result LOG_INFO(getName() << "Closing subscribed consumer since it was already closed"); int requestId = client->newRequestId(); auto name = getName(); - cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId) + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, + "CLOSE_CONSUMER") .addListener([name](Result result, const ResponseData&) { if (result == ResultOk) { LOG_INFO(name << "Closed consumer successfully after subscribe completed"); @@ -321,8 +333,8 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result return ResultAlreadyClosed; } + mutexLock.unlock(); LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString()); - setCnx(cnx); incomingMessages_.clear(); possibleSendToDeadLetterTopicMessages_.clear(); backoff_.reset(); @@ -354,7 +366,8 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result // in case it was indeed created, otherwise it might prevent new subscribe operation, // since we are not closing the connection auto requestId = newRequestId(); - cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId); + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, + "CLOSE_CONSUMER"); } if (consumerCreatedPromise_.isComplete()) { @@ -408,7 +421,7 @@ void ConsumerImpl::unsubscribeAsync(const ResultCallback& originalCallback) { auto requestId = newRequestId(); SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId); auto self = get_shared_this_ptr(); - cnx->sendRequestWithId(cmd, requestId) + cnx->sendRequestWithId(cmd, requestId, "UNSUBSCRIBE") .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); } else { Result result = ResultNotConnected; @@ -502,9 +515,10 @@ optional ConsumerImpl::processMessageChunk(const SharedBuffer& pay auto& chunkedMsgCtx = it->second; if (it == chunkedMessageCache_.end() || !chunkedMsgCtx.validateChunkId(chunkId)) { - auto startMessageId = startMessageId_.get().value_or(MessageId::earliest()); - if (!config_.isStartMessageIdInclusive() && startMessageId.ledgerId() == messageId.ledgerId() && - startMessageId.entryId() == messageId.entryId()) { + auto startMessageId = getStartMessageId(); + if (!config_.isStartMessageIdInclusive() && startMessageId && + startMessageId->ledgerId() == messageId.ledgerId() && + startMessageId->entryId() == messageId.entryId()) { // When the start message id is not inclusive, the last chunk of the previous chunked message will // be delivered, which is expected and we only need to filter it out. chunkedMessageCache_.remove(uuid); @@ -621,17 +635,14 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: words[i] = msg.ack_set(i); } BitSet ackSet{std::move(words)}; - Lock lock(mutex_); numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m, ackSet, msg.redelivery_count()); } else { // try convert key value data. m.impl_->convertPayloadToKeyValue(config_.getSchema()); - const auto startMessageId = startMessageId_.get(); - if (isPersistent_ && startMessageId && - m.getMessageId().ledgerId() == startMessageId.value().ledgerId() && - m.getMessageId().entryId() == startMessageId.value().entryId() && - isPriorEntryIndex(m.getMessageId().entryId())) { + const auto startMessageId = getStartMessageId(); + if (isPersistent_ && startMessageId && m.getMessageId().ledgerId() == startMessageId->ledgerId() && + isPrior(m.getMessageId().entryId(), startMessageId->entryId())) { LOG_DEBUG(getName() << " Ignoring message from before the startMessageId: " << startMessageId.value()); return; @@ -753,7 +764,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection auto batchSize = batchedMessage.impl_->metadata.num_messages_in_batch(); LOG_DEBUG("Received Batch messages of size - " << batchSize << " -- msgId: " << batchedMessage.getMessageId()); - const auto startMessageId = startMessageId_.get(); + const auto startMessageId = getStartMessageId(); int skippedMessages = 0; @@ -783,9 +794,9 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection // If we are receiving a batch message, we need to discard messages that were prior // to the startMessageId - if (isPersistent_ && msgId.ledgerId() == startMessageId.value().ledgerId() && - msgId.entryId() == startMessageId.value().entryId() && - isPriorBatchIndex(msgId.batchIndex())) { + if (isPersistent_ && msgId.ledgerId() == startMessageId->ledgerId() && + msgId.entryId() == startMessageId->entryId() && + isPrior(msgId.batchIndex(), startMessageId->batchIndex())) { LOG_DEBUG(getName() << "Ignoring message from before the startMessageId" << msg.getMessageId()); ++skippedMessages; @@ -925,7 +936,7 @@ void ConsumerImpl::internalListener() { trackMessage(msg.getMessageId()); try { consumerStatsBasePtr_->receivedMessage(msg, ResultOk); - lastDequedMessageId_ = msg.getMessageId(); + setLastDequedMessageId(msg.getMessageId()); Consumer consumer{get_shared_this_ptr()}; Message interceptMsg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg); messageListener_(consumer, interceptMsg); @@ -1098,10 +1109,7 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) { } void ConsumerImpl::messageProcessed(Message& msg, bool track) { - Lock lock(mutexForMessageId_); - lastDequedMessageId_ = msg.getMessageId(); - lock.unlock(); - + setLastDequedMessageId(msg.getMessageId()); incomingMessagesSize_.fetch_sub(msg.getLength()); ClientConnectionPtr currentCnx = getCnx().lock(); @@ -1123,20 +1131,22 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) { * was * not seen by the application * `startMessageId_` is updated so that we can discard messages after delivery restarts. + * NOTE: `mutex_` must be locked before calling this method. */ void ConsumerImpl::clearReceiveQueue() { - if (duringSeek()) { - if (hasSoughtByTimestamp()) { - // Invalidate startMessageId_ so that isPriorBatchIndex and isPriorEntryIndex checks will be - // skipped, and hasMessageAvailableAsync won't use startMessageId_ in compare. - startMessageId_ = std::nullopt; + if (seekStatus_ != SeekStatus::NOT_STARTED) { + // Flush the pending ACKs in case newly arrived messages are filtered out by the previous pending ACKs + ackGroupingTrackerPtr_->flushAndClean(); + if (lastSeekArg_.has_value()) { + if (std::holds_alternative(lastSeekArg_.value())) { + startMessageId_ = std::get(lastSeekArg_.value()); + } else { + // Invalidate startMessageId_ so that `isPrior` checks will be skipped, and + // `hasMessageAvailableAsync` won't use `startMessageId_` in compare. + startMessageId_ = std::nullopt; + } } else { - startMessageId_ = seekMessageId_.get(); - } - SeekStatus expected = SeekStatus::COMPLETED; - if (seekStatus_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED)) { - auto seekCallback = seekCallback_.release(); - executor_->postWork([seekCallback] { seekCallback(ResultOk); }); + LOG_ERROR(getName() << "SeekStatus is not NOT_STARTED but lastSeekArg_ is not set"); } return; } else if (subscriptionMode_ == Commands::SubscriptionModeDurable) { @@ -1374,7 +1384,7 @@ void ConsumerImpl::closeAsync(const ResultCallback& originalCallback) { auto requestId = newRequestId(); auto self = get_shared_this_ptr(); - cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId) + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, "CLOSE_CONSUMER") .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); } @@ -1550,10 +1560,12 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, const ResultCallback& callb } const auto requestId = newRequestId(); - seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, callback); + auto nonNullCallback = (callback != nullptr) ? callback : [](Result) {}; + seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, + std::move(nonNullCallback)); } -void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback) { +void ConsumerImpl::seekAsync(SeekTimestampType timestamp, const ResultCallback& callback) { const auto state = state_.load(); if (state == Closed || state == Closing) { LOG_ERROR(getName() << "Client connection already closed."); @@ -1564,8 +1576,9 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback) } const auto requestId = newRequestId(); + auto nonNullCallback = (callback != nullptr) ? callback : [](Result) {}; seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), SeekArg{timestamp}, - callback); + std::move(nonNullCallback)); } bool ConsumerImpl::isReadCompacted() { return readCompacted_; } @@ -1577,16 +1590,16 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c } bool compareMarkDeletePosition; { - std::lock_guard lock{mutexForMessageId_}; + LockGuard lock{mutex_}; compareMarkDeletePosition = // there is no message received by consumer, so we cannot compare the last position with the last // received position lastDequedMessageId_ == MessageId::earliest() && // If the start message id is latest, we should seek to the actual last message first. - (startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest() || + (startMessageId_.value_or(MessageId::earliest()) == MessageId::latest() || // If there is a previous seek operation by timestamp, the start message id will be incorrect, so // we cannot compare the start position with the last position. - hasSoughtByTimestamp()); + (lastSeekArg_.has_value() && std::holds_alternative(lastSeekArg_.value()))); } if (compareMarkDeletePosition) { auto self = get_shared_this_ptr(); @@ -1607,7 +1620,15 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c callback(ResultOk, false); } }; - if (self->config_.isStartMessageIdInclusive() && !self->hasSoughtByTimestamp()) { + bool lastSeekIsByTimestamp = false; + { + LockGuard lock{self->mutex_}; + if (self->lastSeekArg_.has_value() && + std::holds_alternative(self->lastSeekArg_.value())) { + lastSeekIsByTimestamp = true; + } + } + if (self->config_.isStartMessageIdInclusive() && !lastSeekIsByTimestamp) { self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) { if (result != ResultOk) { callback(result, {}); @@ -1664,9 +1685,10 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time .addListener([this, self, callback](Result result, const GetLastMessageIdResponse& response) { if (result == ResultOk) { LOG_DEBUG(getName() << "getLastMessageId: " << response); - Lock lock(mutexForMessageId_); - lastMessageIdInBroker_ = response.getLastMessageId(); - lock.unlock(); + { + LockGuard lock{mutex_}; + lastMessageIdInBroker_ = response.getLastMessageId(); + } } else { LOG_ERROR(getName() << "Failed to getLastMessageId: " << result); } @@ -1723,78 +1745,89 @@ bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; } void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg, - const ResultCallback& callback) { + ResultCallback&& callback) { ClientConnectionPtr cnx = getCnx().lock(); if (!cnx) { LOG_ERROR(getName() << " Client Connection not ready for Consumer"); callback(ResultNotConnected); return; } - - auto expected = SeekStatus::NOT_STARTED; - if (!seekStatus_.compare_exchange_strong(expected, SeekStatus::IN_PROGRESS)) { - LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the status is " - << static_cast(expected)); + bool hasPendingSeek = false; + // Save the previous last seek arg in case seek failed + decltype(lastSeekArg_) previousLastSeekArg; + { + std::lock_guard lock(mutex_); + if (seekStatus_ != SeekStatus::NOT_STARTED) { + hasPendingSeek = true; + } else { + seekStatus_ = SeekStatus::IN_PROGRESS; + if (seekCallback_.has_value()) { + // This should never happen + LOG_ERROR(getName() << "Previous seek callback is not triggered unexpectedly"); + executor_->postWork([callback{std::exchange(seekCallback_, std::nullopt).value()}] { + callback(ResultTimeout); + }); + } + seekCallback_ = std::move(callback); + previousLastSeekArg = lastSeekArg_; + lastSeekArg_ = seekArg; + } + } + if (hasPendingSeek) { + std::visit( + [this](auto&& arg) { + LOG_ERROR(getName() << "Attempted to seek " << arg << " when there is a pending seek"); + }, + seekArg); callback(ResultNotAllowedError); return; } - const auto originalSeekMessageId = seekMessageId_.get(); - if (boost::get(&seekArg)) { - hasSoughtByTimestamp_.store(true, std::memory_order_release); - } else { - seekMessageId_ = *boost::get(&seekArg); - hasSoughtByTimestamp_.store(false, std::memory_order_release); - } - seekStatus_ = SeekStatus::IN_PROGRESS; - seekCallback_ = callback; - LOG_INFO(getName() << " Seeking subscription to " << seekArg); + std::visit([this](auto&& arg) { LOG_INFO(getName() << "Seeking subscription to " << arg); }, seekArg); auto weakSelf = weak_from_this(); - cnx->sendRequestWithId(seek, requestId) - .addListener([this, weakSelf, callback, originalSeekMessageId](Result result, - const ResponseData& responseData) { + cnx->sendRequestWithId(seek, requestId, "SEEK") + .addListener([this, weakSelf, previousLastSeekArg](Result result, const ResponseData& responseData) { auto self = weakSelf.lock(); if (!self) { - callback(result); return; } if (result == ResultOk) { - LOG_INFO(getName() << "Seek successfully"); - ackGroupingTrackerPtr_->flushAndClean(); - incomingMessages_.clear(); - Lock lock(mutexForMessageId_); - lastDequedMessageId_ = MessageId::earliest(); - lock.unlock(); - if (getCnx().expired()) { + LockGuard lock(mutex_); + if (getCnx().expired() || reconnectionPending_) { // It's during reconnection, complete the seek future after connection is established seekStatus_ = SeekStatus::COMPLETED; + LOG_INFO(getName() << "Delay the seek future until the reconnection is done"); } else { - if (!hasSoughtByTimestamp()) { - startMessageId_ = seekMessageId_.get(); + LOG_INFO(getName() << "Seek successfully"); + ackGroupingTrackerPtr_->flushAndClean(); + incomingMessages_.clear(); + if (lastSeekArg_.has_value() && std::holds_alternative(lastSeekArg_.value())) { + startMessageId_ = std::get(lastSeekArg_.value()); } - seekCallback_.release()(result); - } + if (!seekCallback_.has_value()) { + LOG_ERROR(getName() << "Seek callback is not set"); + return; + } + executor_->postWork( + [self, callback{std::exchange(seekCallback_, std::nullopt).value()}]() { + callback(ResultOk); + }); + seekStatus_ = SeekStatus::NOT_STARTED; + } // else: complete the seek future after connection is established } else { LOG_ERROR(getName() << "Failed to seek: " << result); - seekMessageId_ = originalSeekMessageId; + LockGuard lock{mutex_}; seekStatus_ = SeekStatus::NOT_STARTED; - seekCallback_.release()(result); + lastSeekArg_ = previousLastSeekArg; + executor_->postWork([self, callback{std::exchange(seekCallback_, std::nullopt).value()}]() { + callback(ResultOk); + }); } }); } -bool ConsumerImpl::isPriorBatchIndex(int32_t idx) { - return config_.isStartMessageIdInclusive() ? idx < startMessageId_.get().value().batchIndex() - : idx <= startMessageId_.get().value().batchIndex(); -} - -bool ConsumerImpl::isPriorEntryIndex(int64_t idx) { - return config_.isStartMessageIdInclusive() ? idx < startMessageId_.get().value().entryId() - : idx <= startMessageId_.get().value().entryId(); -} - bool ConsumerImpl::hasEnoughMessagesForBatchReceive() const { if (batchReceivePolicy_.getMaxNumMessages() <= 0 && batchReceivePolicy_.getMaxNumBytes() <= 0) { return false; @@ -1928,7 +1961,7 @@ void ConsumerImpl::doImmediateAck(const ClientConnectionPtr& cnx, const MessageI auto requestId = newRequestId(); cnx->sendRequestWithId( Commands::newAck(consumerId_, msgId.ledgerId(), msgId.entryId(), ackSet, ackType, requestId), - requestId) + requestId, "ACK") .addListener([callback](Result result, const ResponseData&) { if (callback) { callback(result); @@ -1958,7 +1991,8 @@ void ConsumerImpl::doImmediateAck(const ClientConnectionPtr& cnx, const std::set if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) { if (config_.isAckReceiptEnabled()) { auto requestId = newRequestId(); - cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId) + cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId, + "ACK") .addListener([callback](Result result, const ResponseData&) { if (callback) { callback(result); diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 63eb51d6..0da82a2d 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -21,13 +21,15 @@ #include -#include +#include #include #include #include #include +#include #include #include +#include #include "BrokerConsumerStatsImpl.h" #include "Commands.h" @@ -37,7 +39,6 @@ #include "MapCache.h" #include "MessageIdImpl.h" #include "NegativeAcksTracker.h" -#include "Synchronized.h" #include "TestUtil.h" #include "TimeUtils.h" #include "UnboundedBlockingQueue.h" @@ -79,9 +80,9 @@ const static std::string DLQ_GROUP_TOPIC_SUFFIX = "-DLQ"; enum class SeekStatus : std::uint8_t { - NOT_STARTED, - IN_PROGRESS, - COMPLETED + NOT_STARTED, // there is no pending seek RPC so that it's allowed to seek + IN_PROGRESS, // the seek RPC is in progress + COMPLETED // the seek RPC is done but the connection is not established yet }; class ConsumerImpl : public ConsumerImplBase { @@ -138,7 +139,9 @@ class ConsumerImpl : public ConsumerImplBase { void getBrokerConsumerStatsAsync(const BrokerConsumerStatsCallback& callback) override; void getLastMessageIdAsync(const BrokerGetLastMessageIdCallback& callback) override; void seekAsync(const MessageId& msgId, const ResultCallback& callback) override; - void seekAsync(uint64_t timestamp, const ResultCallback& callback) override; + using SeekTimestampType = uint64_t; + using SeekArg = std::variant; + void seekAsync(SeekTimestampType timestamp, const ResultCallback& callback) override; void negativeAcknowledge(const MessageId& msgId) override; bool isConnected() const override; uint64_t getNumberOfConnectedConsumer() override; @@ -191,8 +194,10 @@ class ConsumerImpl : public ConsumerImplBase { void drainIncomingMessageQueue(size_t count); uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage, const BitSet& ackSet, int redeliveryCount); - bool isPriorBatchIndex(int32_t idx); - bool isPriorEntryIndex(int64_t idx); + template + bool isPrior(T index, T startIndex) const noexcept { + return config_.isStartMessageIdInclusive() ? (index < startIndex) : (index <= startIndex); + } void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, const BrokerConsumerStatsCallback&); enum class DecryptionResult : uint8_t @@ -218,19 +223,9 @@ class ConsumerImpl : public ConsumerImplBase { const BrokerGetLastMessageIdCallback& callback); void clearReceiveQueue(); - using SeekArg = boost::variant; - friend std::ostream& operator<<(std::ostream& os, const SeekArg& seekArg) { - auto ptr = boost::get(&seekArg); - if (ptr) { - os << *ptr; - } else { - os << *boost::get(&seekArg); - } - return os; - } void seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg, - const ResultCallback& callback); + ResultCallback&& callback); void processPossibleToDLQ(const MessageId& messageId, const ProcessDLQCallBack& cb); std::mutex mutexForReceiveWithZeroQueueSize; @@ -269,19 +264,13 @@ class ConsumerImpl : public ConsumerImplBase { std::shared_ptr> deadLetterProducer_; std::mutex createProducerLock_; - // Make the access to `lastDequedMessageId_` and `lastMessageIdInBroker_` thread safe - mutable std::mutex mutexForMessageId_; MessageId lastDequedMessageId_{MessageId::earliest()}; MessageId lastMessageIdInBroker_{MessageId::earliest()}; + optional startMessageId_; - std::atomic seekStatus_{SeekStatus::NOT_STARTED}; - Synchronized seekCallback_{[](Result) {}}; - Synchronized> startMessageId_; - Synchronized seekMessageId_{MessageId::earliest()}; - std::atomic hasSoughtByTimestamp_{false}; - - bool hasSoughtByTimestamp() const { return hasSoughtByTimestamp_.load(std::memory_order_acquire); } - bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; } + SeekStatus seekStatus_{SeekStatus::NOT_STARTED}; + optional seekCallback_; + optional lastSeekArg_; class ChunkedMessageCtx { public: @@ -380,7 +369,8 @@ class ConsumerImpl : public ConsumerImplBase { const ClientConnectionPtr& cnx, MessageId& messageId); bool hasMoreMessages() const { - std::lock_guard lock{mutexForMessageId_}; + LockGuard lock{mutex_}; + if (lastMessageIdInBroker_.entryId() == -1L) { return false; } @@ -388,7 +378,7 @@ class ConsumerImpl : public ConsumerImplBase { const auto inclusive = config_.isStartMessageIdInclusive(); if (lastDequedMessageId_ == MessageId::earliest()) { // If startMessageId_ is none, use latest so that this method will return false - const auto startMessageId = startMessageId_.get().value_or(MessageId::latest()); + const auto startMessageId = startMessageId_.value_or(MessageId::latest()); return inclusive ? (lastMessageIdInBroker_ >= startMessageId) : (lastMessageIdInBroker_ > startMessageId); } else { @@ -396,11 +386,22 @@ class ConsumerImpl : public ConsumerImplBase { } } + auto getStartMessageId() const { + LockGuard lock{mutex_}; + return startMessageId_; + } + auto setLastDequedMessageId(const MessageId& messageId) { + LockGuard lock{mutex_}; + lastDequedMessageId_ = messageId; + } + void doImmediateAck(const ClientConnectionPtr& cnx, const MessageId& msgId, CommandAck_AckType ackType, const ResultCallback& callback); void doImmediateAck(const ClientConnectionPtr& cnx, const std::set& msgIds, const ResultCallback& callback); + using LockGuard = std::lock_guard; + friend class PulsarFriend; friend class MultiTopicsConsumerImpl; diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc index 37c6e2d5..1a4f573e 100644 --- a/lib/HandlerBase.cc +++ b/lib/HandlerBase.cc @@ -44,9 +44,9 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, state_(NotStarted), backoff_(backoff), epoch_(0), + reconnectionPending_(false), timer_(executor_->createDeadlineTimer()), creationTimer_(executor_->createDeadlineTimer()), - reconnectionPending_(false), redirectedClusterURI_("") {} HandlerBase::~HandlerBase() { diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index acce15d9..0e733f00 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -143,6 +143,7 @@ class HandlerBase : public std::enable_shared_from_this { std::atomic state_; Backoff backoff_; uint64_t epoch_; + std::atomic reconnectionPending_; Result convertToTimeoutIfNecessary(Result result, ptime startTimestamp) const; @@ -160,7 +161,6 @@ class HandlerBase : public std::enable_shared_from_this { DeadlineTimerPtr creationTimer_; mutable std::mutex connectionMutex_; - std::atomic reconnectionPending_; ClientConnectionWeakPtr connection_; std::string redirectedClusterURI_; std::atomic firstRequestIdAfterConnect_{-1L}; diff --git a/lib/MockServer.h b/lib/MockServer.h new file mode 100644 index 00000000..8e4a2136 --- /dev/null +++ b/lib/MockServer.h @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "ClientConnection.h" +#include "ConsumerImpl.h" +#include "ExecutorService.h" +#include "LogUtils.h" +#include "PulsarApi.pb.h" + +namespace pulsar { + +class MockServer : public std::enable_shared_from_this { + public: + using RequestDelayType = std::unordered_map; + + MockServer(const ClientConnectionPtr& connection) : connection_(connection) { + requestDelays_["CLOSE_CONSUMER"] = 1; + } + + void setRequestDelay(std::initializer_list delays) { + std::lock_guard lock(mutex_); + for (auto&& delay : delays) { + requestDelays_[delay.first] = delay.second; + } + } + + bool sendRequest(const std::string& request, uint64_t requestId) { + auto connection = connection_.lock(); + if (!connection) { + return false; + } + std::lock_guard lock(mutex_); + if (auto iter = requestDelays_.find(request); iter != requestDelays_.end()) { + // Mock the `CLOSE_CONSUMER` command sent by broker, for simplicity, disconnect all consumers + if (request == "SEEK") { + schedule(connection, "CLOSE_CONSUMER" + std::to_string(requestId), + requestDelays_["CLOSE_CONSUMER"], [connection] { + std::vector consumerIds; + { + std::lock_guard lock{connection->mutex_}; + for (auto&& kv : connection->consumers_) { + if (auto consumer = kv.second.lock()) { + consumerIds.push_back(consumer->getConsumerId()); + } + } + } + for (auto consumerId : consumerIds) { + proto::CommandCloseConsumer closeConsumerCmd; + closeConsumerCmd.set_consumer_id(consumerId); + connection->handleCloseConsumer(closeConsumerCmd); + } + }); + } + schedule(connection, request + std::to_string(requestId), iter->second, [connection, requestId] { + proto::CommandSuccess success; + success.set_request_id(requestId); + connection->handleSuccess(success); + }); + return true; + } else { + return false; + } + } + + // Return the number of pending timers cancelled + auto close() { + std::lock_guard lock(mutex_); + auto result = pendingTimers_.size(); + for (auto&& kv : pendingTimers_) { + try { + LOG_INFO("Cancelling timer for " << kv.first); + kv.second->cancel(); + } catch (...) { + LOG_WARN("Failed to cancel timer for " << kv.first); + } + } + pendingTimers_.clear(); + return result; + } + + private: + mutable std::mutex mutex_; + std::unordered_map requestDelays_; + std::unordered_map pendingTimers_; + ClientConnectionWeakPtr connection_; + + void schedule(ClientConnectionPtr& connection, const std::string& key, long delayMs, + std::function&& task) { + auto timer = connection->executor_->createDeadlineTimer(); + pendingTimers_[key] = timer; + timer->expires_from_now(std::chrono::milliseconds(delayMs)); + LOG_INFO("Mock scheduling " << key << " with delay " << delayMs << " ms"); + auto self = shared_from_this(); + timer->async_wait([this, self, key, connection, task{std::move(task)}](const auto& ec) { + { + std::lock_guard lock(mutex_); + pendingTimers_.erase(key); + } + if (ec) { + LOG_INFO("Timer cancelled for " << key); + return; + } + if (connection->isClosed()) { + LOG_INFO("Connection is closed, not completing request " << key); + return; + } + LOG_INFO("Completing delayed request " << key); + task(); + }); + } + + DECLARE_LOG_OBJECT() +}; + +} // namespace pulsar diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 9d6a9a08..7fd14c7c 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -160,7 +160,7 @@ Future ProducerImpl::connectionOpened(const ClientConnectionPtr& c // Keep a reference to ensure object is kept alive. auto self = shared_from_this(); setFirstRequestIdAfterConnect(requestId); - cnx->sendRequestWithId(cmd, requestId) + cnx->sendRequestWithId(cmd, requestId, "PRODUCER") .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { Result handleResult = handleCreateProducer(cnx, result, responseData); if (handleResult == ResultOk) { @@ -204,7 +204,8 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result auto client = client_.lock(); if (client) { int requestId = client->newRequestId(); - cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId); + cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId, + "CLOSE_PRODUCER"); } } if (!producerCreatedPromise_.isComplete()) { @@ -266,7 +267,8 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result auto client = client_.lock(); if (client) { int requestId = client->newRequestId(); - cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId); + cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId, + "CLOSE_PRODUCER"); } } @@ -818,7 +820,7 @@ void ProducerImpl::closeAsync(CloseCallback originalCallback) { int requestId = client->newRequestId(); auto self = shared_from_this(); - cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId) + cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId, "CLOSE_PRODUCER") .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); } diff --git a/lib/Synchronized.h b/lib/Synchronized.h deleted file mode 100644 index 5449a9fe..00000000 --- a/lib/Synchronized.h +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -#pragma once - -#include - -template -class Synchronized { - public: - explicit Synchronized(const T& value) : value_(value) {} - - T get() const { - std::lock_guard lock(mutex_); - return value_; - } - - T&& release() { - std::lock_guard lock(mutex_); - return std::move(value_); - } - - Synchronized& operator=(const T& value) { - std::lock_guard lock(mutex_); - value_ = value; - return *this; - } - - private: - T value_; - mutable std::mutex mutex_; -}; diff --git a/tests/ConsumerSeekTest.cc b/tests/ConsumerSeekTest.cc index f03ea5e3..f66c27d7 100644 --- a/tests/ConsumerSeekTest.cc +++ b/tests/ConsumerSeekTest.cc @@ -19,12 +19,18 @@ #include #include +#include +#include +#include #include #include #include #include "HttpHelper.h" +#include "lib/ClientConnection.h" #include "lib/LogUtils.h" +#include "lib/MockServer.h" +#include "tests/PulsarFriend.h" DECLARE_LOG_OBJECT() @@ -200,6 +206,58 @@ TEST_F(ConsumerSeekTest, testNoInternalConsumer) { ASSERT_EQ(ResultOk, consumer.seek(MessageId::earliest())); } +static void assertSeekWithTimeout(Consumer& consumer) { + using namespace std::chrono_literals; + auto promise = std::make_shared>(); + std::weak_ptr> weakPromise = promise; + consumer.seekAsync(0L, [weakPromise](Result result) { + if (auto promise = weakPromise.lock()) { + promise->set_value(result); + } + }); + auto future = promise->get_future(); + ASSERT_EQ(future.wait_for(5s), std::future_status::ready); + ASSERT_EQ(future.get(), ResultOk); +} + +// Verify the `seek` method won't be blocked forever in any order of the Subscribe response and Seek response +TEST_F(ConsumerSeekTest, testSubscribeSeekRaces) { + Client client(lookupUrl); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub", consumer)); + + auto connection = *PulsarFriend::getConnections(client).begin(); + auto mockServer = std::make_shared(connection); + connection->attachMockServer(mockServer); + + mockServer->setRequestDelay({{"SUBSCRIBE", 1000}, {"SEEK", 500}}); + assertSeekWithTimeout(consumer); + + mockServer->setRequestDelay({{"SUBSCRIBE", 500}, {"SEEK", 1000}}); + assertSeekWithTimeout(consumer); + + ASSERT_EQ(mockServer->close(), 0); + client.close(); +} + +TEST_F(ConsumerSeekTest, testReconnectionSlow) { + Client client(lookupUrl, ClientConfiguration().setInitialBackoffIntervalMs(500)); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe("testReconnectionSlow", "sub", consumer)); + + auto connection = *PulsarFriend::getConnections(client).begin(); + auto mockServer = std::make_shared(connection); + connection->attachMockServer(mockServer); + + // Make seek response received before `connectionOpened` is called + mockServer->setRequestDelay({{"SEEK", 500}, {"CLOSE_CONSUMER", 1000}}); + assertSeekWithTimeout(consumer); + + // The CLOSE_CONSUMER request is in still flight + ASSERT_EQ(mockServer->close(), 1); + client.close(); +} + INSTANTIATE_TEST_SUITE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false)); } // namespace pulsar