@@ -109,9 +109,9 @@ void LogTcpSocketDiagnostics(util::FiberSocketBase* dest) {
109109
110110} // namespace
111111
112- JournalStreamer::JournalStreamer (journal::Journal* journal, ExecutionState* cntx, SendLsn send_lsn,
113- bool is_stable_sync )
114- : cntx_(cntx), journal_(journal), is_stable_sync_(is_stable_sync), send_lsn_(send_lsn ) {
112+ JournalStreamer::JournalStreamer (journal::Journal* journal, ExecutionState* cntx,
113+ JournalStreamer::Config config )
114+ : cntx_(cntx), journal_(journal), config_(config ) {
115115 // cache the flag to avoid accessing it later.
116116 replication_stream_output_limit_cached = absl::GetFlag (FLAGS_replication_stream_output_limit);
117117 migration_buckets_sleep_usec_cached = absl::GetFlag (FLAGS_migration_buckets_sleep_usec);
@@ -136,7 +136,7 @@ void JournalStreamer::ConsumeJournalChange(const JournalChangeItem& item) {
136136 time_t now = time (nullptr );
137137 last_lsn_writen_ = item.journal_item .lsn ;
138138 // TODO: to chain it to the previous Write call.
139- if (send_lsn_ == SendLsn::YES && now - last_lsn_time_ > 3 ) {
139+ if (config_. should_sent_lsn && now - last_lsn_time_ > 3 ) {
140140 last_lsn_time_ = now;
141141 io::StringSink sink;
142142 JournalWriter writer (&sink);
@@ -148,14 +148,19 @@ void JournalStreamer::ConsumeJournalChange(const JournalChangeItem& item) {
// Binds this streamer to the destination socket `dest` and kicks off streaming.
// CHECK-fails if `dest` is null or if dest_ has already been set.
// When no partial sync is requested (config_.start_partial_sync_at == 0) the streamer
// registers itself for journal changes right away; otherwise registration is deferred
// (see the comment below). In both cases StartStalledDataWriterFiber() is called last.
148148void JournalStreamer::Start (util::FiberSocketBase* dest) {
149149 CHECK (dest_ == nullptr && dest != nullptr );
150150 dest_ = dest;
151- journal_cb_id_ = journal_->RegisterOnChange (this );
151+ // For partial sync we first catch up from the journal replication buffer and only then register.
// The deferred registration is done by MaybePartialStreamLSNs() once the catch-up succeeded.
152+ if (config_.start_partial_sync_at == 0 ) {
153+ journal_cb_id_ = journal_->RegisterOnChange (this );
154+ }
152155 StartStalledDataWriterFiber ();
153156}
154157
// Shuts the streamer down: wakes everything blocked on waker_, unregisters the journal
// change callback if one was registered, then calls StopStalledDataWriterFiber() and
// WaitForInflightToComplete().
155158void JournalStreamer::Cancel () {
156159 VLOG (1 ) << " JournalStreamer::Cancel " << cntx_->IsCancelled ();
157160 waker_.notifyAll ();
158- journal_->UnregisterOnChange (journal_cb_id_);
// With partial sync, Start() skips registration, so journal_cb_id_ may still be unset here.
// NOTE(review): this guard assumes 0 is never a valid id returned by RegisterOnChange - confirm.
161+ if (journal_cb_id_) {
162+ journal_->UnregisterOnChange (journal_cb_id_);
163+ }
159164 StopStalledDataWriterFiber ();
160165 WaitForInflightToComplete ();
161166}
@@ -182,7 +187,7 @@ void JournalStreamer::Write(std::string str) {
182187}
183188
184189void JournalStreamer::StartStalledDataWriterFiber () {
185- if (is_stable_sync_ && !stalled_data_writer_.IsJoinable ()) {
190+ if (config_. init_from_stable_sync && !stalled_data_writer_.IsJoinable ()) {
186191 auto pb = fb2::ProactorBase::me ();
187192 std::chrono::milliseconds period_us (stalled_writer_base_period_ms);
188193 stalled_data_writer_ = MakeFiber ([this , index = pb->GetPoolIndex (), period_us]() mutable {
@@ -192,8 +197,55 @@ void JournalStreamer::StartStalledDataWriterFiber() {
192197 }
193198}
194199
// Performs the partial-sync catch-up, if one was requested via config_.start_partial_sync_at.
// Starting at that LSN, every entry still present in the journal buffer is fed through
// ConsumeJournalChange(). Once the catch-up reaches journal_->GetLsn(), the streamer
// registers itself for journal changes and flushes any pending data.
//
// Returns true when no partial sync was requested (start_partial_sync_at == 0) or when the
// catch-up completed. Returns false when cntx_ stopped running, or when the loop ended
// before reaching journal_->GetLsn(); in the latter case an error is reported on cntx_.
200+ bool JournalStreamer::MaybePartialStreamLSNs () {
201+ // Same algorithm as SwitchIncrementalFb. The only difference is that we don't send
202+ // the old LSNs via a snapshot but rather as journal changes.
203+ if (config_.start_partial_sync_at > 0 ) {
204+ LSN lsn = config_.start_partial_sync_at ;
205+ DCHECK_LE (lsn, journal_->GetLsn ()) << " The replica tried to sync from the future." ;
206+
207+ LOG (INFO) << " Starting partial sync from lsn: " << lsn;
208+ // The replica sends the LSN of the next entry it wants to receive.
209+ while (cntx_->IsRunning () && journal_->IsLSNInBuffer (lsn)) {
210+ JournalChangeItem item;
211+ item.journal_item .data = journal_->GetEntry (lsn);
212+ item.journal_item .lsn = lsn;
213+ ConsumeJournalChange (item);
214+ lsn++;
215+ }
216+
// The loop above exits either because the context stopped or because `lsn` is not in the buffer.
217+ if (!cntx_->IsRunning ()) {
218+ return false ;
219+ }
220+
221+ if (journal_->GetLsn () != lsn) {
222+ // We stopped but we didn't manage to send the whole stream.
223+ cntx_->ReportError (
224+ std::make_error_code (errc::state_not_recoverable),
225+ absl::StrCat (" Partial sync was unsuccessful because entry #" , lsn,
226+ " was dropped from the buffer. Current lsn=" , journal_->GetLsn ()));
227+ return false ;
228+ }
229+
230+ // We are done, register back to the journal so we don't miss any changes
231+ journal_cb_id_ = journal_->RegisterOnChange (this );
232+
// `lsn` is the next entry to send, hence the last one actually sent is lsn - 1.
233+ LOG (INFO) << " Last LSN sent in partial sync was " << (lsn - 1 );
234+ // Flush whatever is still pending, forcing the send (force_send = true).
235+ if (pending_buf_.Size () != 0 ) {
236+ AsyncWrite (true );
237+ }
238+ }
239+ return true ;
240+ }
241+
195242void JournalStreamer::StalledDataWriterFiber (std::chrono::milliseconds period_ms,
196243 util::fb2::Done* waiter) {
244+ if (!MaybePartialStreamLSNs ()) {
245+ // Either the context got cancelled, or partial sync failed because a requested LSN is no longer in the journal buffer.
246+ return ;
247+ }
248+
197249 while (cntx_->IsRunning ()) {
198250 if (waiter->WaitFor (period_ms)) {
199251 if (!cntx_->IsRunning ()) {
@@ -222,7 +274,7 @@ void JournalStreamer::AsyncWrite(bool force_send) {
222274
223275 // Writing in stable sync and outside of fiber needs to check
224276 // threshold before writing data.
225- if (is_stable_sync_ && !force_send &&
277+ if (config_. init_from_stable_sync && !force_send &&
226278 pending_buf_.FrontBufSize () < replication_dispatch_threshold) {
227279 return ;
228280 }
@@ -332,7 +384,7 @@ void JournalStreamer::WaitForInflightToComplete() {
332384}
333385
334386void JournalStreamer::StopStalledDataWriterFiber () {
335- if (is_stable_sync_ && stalled_data_writer_.IsJoinable ()) {
387+ if (config_. init_from_stable_sync && stalled_data_writer_.IsJoinable ()) {
336388 stalled_data_writer_done_.Notify ();
337389 if (stalled_data_writer_.IsJoinable ()) {
338390 stalled_data_writer_.Join ();
@@ -346,9 +398,7 @@ bool JournalStreamer::IsStalled() const {
346398
347399RestoreStreamer::RestoreStreamer (DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal,
348400 ExecutionState* cntx)
349- : JournalStreamer(journal, cntx, JournalStreamer::SendLsn::NO, false ),
350- db_slice_ (slice),
351- my_slots_(std::move(slots)) {
401+ : JournalStreamer(journal, cntx, {}), db_slice_(slice), my_slots_(std::move(slots)) {
352402 DCHECK (slice != nullptr );
353403 migration_buckets_serialization_threshold_cached =
354404 absl::GetFlag (FLAGS_migration_buckets_serialization_threshold);
0 commit comments