1- // Copyright (c) 2023-present, Arana/Kiwi Community. All rights reserved.
1+ // Copyright (c) 2023-present, arana-db Community. All rights reserved.
22// This source code is licensed under the BSD-style license found in the
33// LICENSE file in the root directory of this source tree. An additional grant
44// of patent rights can be found in the PATENTS file in the same directory
99
1010#include " base_cmd.h"
1111
12- #include " fmt/core.h"
13-
1412#include " raft/raft.h"
1513
1614#include " common.h"
1715#include " config.h"
1816#include " kiwi.h"
1917#include " log.h"
20- #include " raft/raft.h"
2118
2219namespace kiwi {
2320
@@ -44,16 +41,19 @@ void BaseCmd::Execute(PClient* client) {
4441 // read consistency (lease read) / write redirection
4542 if (g_config.use_raft && (HasFlag (kCmdFlagsReadonly ) || HasFlag (kCmdFlagsWrite ))) {
4643 if (!RAFT_INST.IsInitialized ()) {
47- return client->SetRes (CmdRes::kErrOther , " RAFT_INST is not initialized" );
44+ client->SetRes (CmdRes::kErrOther , " RAFT_INST is not initialized" );
45+ return ;
4846 }
4947
5048 if (!RAFT_INST.IsLeader ()) {
5149 auto leader_addr = RAFT_INST.GetLeaderAddress ();
5250 if (leader_addr.empty ()) {
53- return client->SetRes (CmdRes::kErrOther , std::string (" -CLUSTERDOWN No Raft leader" ));
51+ client->SetRes (CmdRes::kErrClusterDown , " No raft leader" );
52+ return ;
5453 }
5554
56- return client->SetRes (CmdRes::kErrOther , fmt::format (" -MOVED {}" , leader_addr));
55+ client->SetRes (CmdRes::kErrMoved , leader_addr);
56+ return ;
5757 }
5858 }
5959
@@ -106,6 +106,84 @@ BaseCmd* BaseCmdGroup::GetSubCmd(const std::string& cmdName) {
106106 return subCmd->second .get ();
107107}
108108
109+ void BaseCmd::BlockThisClientToWaitLRPush (std::vector<std::string>& keys, int64_t expire_time,
110+ std::shared_ptr<PClient> client, BlockedConnNode::Type type) {
111+ std::lock_guard<std::shared_mutex> map_lock (g_kiwi->GetBlockMtx ());
112+ auto & key_to_conns = g_kiwi->GetMapFromKeyToConns ();
113+ for (const auto & key : keys) {
114+ kiwi::BlockKey blpop_key{client->GetCurrentDB (), key};
115+
116+ auto it = key_to_conns.find (blpop_key);
117+ if (it == key_to_conns.end ()) {
118+ key_to_conns.emplace (blpop_key, std::make_unique<std::list<BlockedConnNode>>());
119+ it = key_to_conns.find (blpop_key);
120+ }
121+ it->second ->emplace_back (expire_time, client, type);
122+ }
123+ }
124+
125+ void BaseCmd::ServeAndUnblockConns (PClient* client) {
126+ kiwi::BlockKey key{client->GetCurrentDB (), client->Key ()};
127+
128+ std::lock_guard<std::shared_mutex> map_lock (g_kiwi->GetBlockMtx ());
129+ auto & key_to_conns = g_kiwi->GetMapFromKeyToConns ();
130+ auto it = key_to_conns.find (key);
131+ if (it == key_to_conns.end ()) {
132+ // no client is waitting for this key
133+ return ;
134+ }
135+
136+ auto & waitting_list = it->second ;
137+ std::vector<std::string> elements;
138+ storage::Status s;
139+
140+ // traverse this list from head to tail(in the order of adding sequence) ,means "first blocked, first get served“
141+ for (auto conn_blocked = waitting_list->begin (); conn_blocked != waitting_list->end ();) {
142+ auto BlockedClient = conn_blocked->GetBlockedClient ();
143+
144+ if (BlockedClient->State () == ClientState::kClosed ) {
145+ conn_blocked = waitting_list->erase (conn_blocked);
146+ g_kiwi->CleanBlockedNodes (BlockedClient);
147+ continue ;
148+ }
149+
150+ switch (conn_blocked->GetCmdType ()) {
151+ case BlockedConnNode::Type::BLPop:
152+ s = STORE_INST.GetBackend (client->GetCurrentDB ())->GetStorage ()->LPop (client->Key (), 1 , &elements);
153+ break ;
154+ case BlockedConnNode::Type::BRPop:
155+ s = STORE_INST.GetBackend (client->GetCurrentDB ())->GetStorage ()->RPop (client->Key (), 1 , &elements);
156+ break ;
157+ case BlockedConnNode::Type::NotAny:
158+ // ! DOING NOTHING?
159+ break ;
160+ }
161+
162+ if (s.ok ()) {
163+ BlockedClient->AppendArrayLen (2 );
164+ BlockedClient->AppendString (client->Key ());
165+ BlockedClient->AppendString (elements[0 ]);
166+ } else if (s.IsNotFound ()) {
167+ // this key has no more elements to serve more blocked conn.
168+ break ;
169+ } else {
170+ BlockedClient->SetRes (CmdRes::kErrOther , s.ToString ());
171+ }
172+ BlockedClient->SendPacket ();
173+ // remove this conn from current waiting list
174+ conn_blocked = waitting_list->erase (conn_blocked);
175+ g_kiwi->CleanBlockedNodes (BlockedClient);
176+ }
177+ }
178+
179+ bool BlockedConnNode::IsExpired (std::chrono::system_clock::time_point now) {
180+ if (expire_time_ == 0 ) {
181+ return false ;
182+ }
183+ int64_t now_in_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch ().count ();
184+ return expire_time_ <= now_in_ms;
185+ }
186+
109187bool BaseCmdGroup::DoInitial (PClient* client) {
110188 client->SetSubCmdName (client->argv_ [1 ]);
111189 if (!subCmds_.contains (client->SubCmdName ())) {
0 commit comments