Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions deltachat-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ pub unsafe extern "C" fn dc_event_get_id(event: *mut dc_event_t) -> libc::c_int
EventType::IncomingCallAccepted { .. } => 2560,
EventType::OutgoingCallAccepted { .. } => 2570,
EventType::CallEnded { .. } => 2580,
EventType::TransportsModified => 2600,
#[allow(unreachable_patterns)]
#[cfg(test)]
_ => unreachable!("This is just to silence a rust_analyzer false-positive"),
Expand Down Expand Up @@ -593,7 +594,8 @@ pub unsafe extern "C" fn dc_event_get_data1_int(event: *mut dc_event_t) -> libc:
| EventType::AccountsBackgroundFetchDone
| EventType::ChatlistChanged
| EventType::AccountsChanged
| EventType::AccountsItemChanged => 0,
| EventType::AccountsItemChanged
| EventType::TransportsModified => 0,
EventType::IncomingReaction { contact_id, .. }
| EventType::IncomingWebxdcNotify { contact_id, .. } => contact_id.to_u32() as libc::c_int,
EventType::MsgsChanged { chat_id, .. }
Expand Down Expand Up @@ -681,7 +683,8 @@ pub unsafe extern "C" fn dc_event_get_data2_int(event: *mut dc_event_t) -> libc:
| EventType::IncomingCallAccepted { .. }
| EventType::OutgoingCallAccepted { .. }
| EventType::CallEnded { .. }
| EventType::EventChannelOverflow { .. } => 0,
| EventType::EventChannelOverflow { .. }
| EventType::TransportsModified => 0,
EventType::MsgsChanged { msg_id, .. }
| EventType::ReactionsChanged { msg_id, .. }
| EventType::IncomingReaction { msg_id, .. }
Expand Down Expand Up @@ -780,7 +783,8 @@ pub unsafe extern "C" fn dc_event_get_data2_str(event: *mut dc_event_t) -> *mut
| EventType::AccountsChanged
| EventType::AccountsItemChanged
| EventType::IncomingCallAccepted { .. }
| EventType::WebxdcRealtimeAdvertisementReceived { .. } => ptr::null_mut(),
| EventType::WebxdcRealtimeAdvertisementReceived { .. }
| EventType::TransportsModified => ptr::null_mut(),
EventType::IncomingCall {
place_call_info, ..
} => {
Expand Down
11 changes: 11 additions & 0 deletions deltachat-jsonrpc/src/api/types/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,15 @@ pub enum EventType {
/// ID of the chat which the message belongs to.
chat_id: u32,
},

/// One or more transports has changed.
///
/// This event is used for tests to detect when transport
/// synchronization messages arrives.
/// UIs don't need to use it, it is unlikely
/// that user modifies transports on multiple
/// devices simultaneously.
TransportsModified,
}

impl From<CoreEventType> for EventType {
Expand Down Expand Up @@ -642,6 +651,8 @@ impl From<CoreEventType> for EventType {
msg_id: msg_id.to_u32(),
chat_id: chat_id.to_u32(),
},
CoreEventType::TransportsModified => TransportsModified,

#[allow(unreachable_patterns)]
#[cfg(test)]
_ => unreachable!("This is just to silence a rust_analyzer false-positive"),
Expand Down
1 change: 1 addition & 0 deletions deltachat-rpc-client/src/deltachat_rpc_client/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class EventType(str, Enum):
CONFIG_SYNCED = "ConfigSynced"
WEBXDC_REALTIME_DATA = "WebxdcRealtimeData"
WEBXDC_REALTIME_ADVERTISEMENT_RECEIVED = "WebxdcRealtimeAdvertisementReceived"
TRANSPORTS_MODIFIED = "TransportsModified"


class ChatId(IntEnum):
Expand Down
45 changes: 45 additions & 0 deletions deltachat-rpc-client/tests/test_multitransport.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest

from deltachat_rpc_client import EventType
from deltachat_rpc_client.rpc import JsonRpcError


Expand Down Expand Up @@ -156,3 +157,47 @@ def test_reconfigure_transport(acfactory) -> None:
# Reconfiguring the transport should not reset
# the settings as if when configuring the first transport.
assert account.get_config("mvbox_move") == "1"


def test_transport_synchronization(acfactory, log) -> None:
"""Test synchronization of transports between devices."""
ac1, ac2 = acfactory.get_online_accounts(2)
ac1_clone = ac1.clone()
ac1_clone.bring_online()

qr = acfactory.get_account_qr()

ac1.add_transport_from_qr(qr)
ac1_clone.wait_for_event(EventType.TRANSPORTS_MODIFIED)
assert len(ac1.list_transports()) == 2
assert len(ac1_clone.list_transports()) == 2

ac1_clone.add_transport_from_qr(qr)
ac1.wait_for_event(EventType.TRANSPORTS_MODIFIED)
assert len(ac1.list_transports()) == 3
assert len(ac1_clone.list_transports()) == 3

log.section("ac1 clone removes second transport")
[transport1, transport2, transport3] = ac1_clone.list_transports()
addr3 = transport3["addr"]
ac1_clone.delete_transport(transport2["addr"])

ac1.wait_for_event(EventType.TRANSPORTS_MODIFIED)
[transport1, transport3] = ac1.list_transports()

log.section("ac1 changes the primary transport")
ac1.set_config("configured_addr", transport3["addr"])

log.section("ac1 removes the first transport")
ac1.delete_transport(transport1["addr"])

ac1_clone.wait_for_event(EventType.TRANSPORTS_MODIFIED)
[transport3] = ac1_clone.list_transports()
assert transport3["addr"] == addr3
assert ac1_clone.get_config("configured_addr") == addr3

ac2_chat = ac2.create_chat(ac1)
ac2_chat.send_text("Hello!")

assert ac1.wait_for_incoming_msg().get_snapshot().text == "Hello!"
assert ac1_clone.wait_for_incoming_msg().get_snapshot().text == "Hello!"
18 changes: 13 additions & 5 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -819,11 +819,19 @@ impl Context {
self,
"Creating a pseudo configured account which will not be able to send or receive messages. Only meant for tests!"
);
ConfiguredLoginParam::from_json(&format!(
r#"{{"addr":"{addr}","imap":[],"imap_user":"","imap_password":"","smtp":[],"smtp_user":"","smtp_password":"","certificate_checks":"Automatic","oauth2":false}}"#
))?
.save_to_transports_table(self, &EnteredLoginParam::default())
.await?;
self.sql
.execute(
"INSERT INTO transports (addr, entered_param, configured_param) VALUES (?, ?, ?)",
(
addr,
serde_json::to_string(&EnteredLoginParam::default())?,
format!(r#"{{"addr":"{addr}","imap":[],"imap_user":"","imap_password":"","smtp":[],"smtp_user":"","smtp_password":"","certificate_checks":"Automatic","oauth2":false}}"#)
),
)
.await?;
self.sql
.set_raw_config(Config::ConfiguredAddr.as_ref(), Some(addr))
.await?;
}
self.sql
.transaction(|transaction| {
Expand Down
31 changes: 25 additions & 6 deletions src/configure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::sync::Sync::*;
use crate::tools::time;
use crate::transport::{
ConfiguredCertificateChecks, ConfiguredLoginParam, ConfiguredServerLoginParam,
ConnectionCandidate,
ConnectionCandidate, send_sync_transports,
};
use crate::{EventType, stock_str};
use crate::{chat, provider};
Expand Down Expand Up @@ -205,6 +205,7 @@ impl Context {
/// Removes the transport with the specified email address
/// (i.e. [EnteredLoginParam::addr]).
pub async fn delete_transport(&self, addr: &str) -> Result<()> {
let now = time();
self.sql
.transaction(|transaction| {
let primary_addr = transaction.query_row(
Expand All @@ -219,12 +220,13 @@ impl Context {
if primary_addr == addr {
bail!("Cannot delete primary transport");
}
let transport_id = transaction.query_row(
"DELETE FROM transports WHERE addr=? RETURNING id",
let (transport_id, add_timestamp) = transaction.query_row(
"DELETE FROM transports WHERE addr=? RETURNING id, add_timestamp",
(addr,),
|row| {
let id: u32 = row.get(0)?;
Ok(id)
let add_timestamp: i64 = row.get(1)?;
Ok((id, add_timestamp))
},
)?;
transaction.execute("DELETE FROM imap WHERE transport_id=?", (transport_id,))?;
Expand All @@ -233,9 +235,23 @@ impl Context {
(transport_id,),
)?;

// Removal timestamp should not be lower than addition timestamp
// to be accepted by other devices when synced.
let remove_timestamp = std::cmp::max(now, add_timestamp);

transaction.execute(
"INSERT INTO removed_transports (addr, remove_timestamp)
VALUES (?, ?)
ON CONFLICT (addr)
DO UPDATE SET remove_timestamp = excluded.remove_timestamp",
(addr, remove_timestamp),
)?;

Ok(())
})
.await?;
send_sync_transports(self).await?;

Ok(())
}

Expand Down Expand Up @@ -552,7 +568,8 @@ async fn configure(ctx: &Context, param: &EnteredLoginParam) -> Result<Option<&'

progress!(ctx, 900);

if !ctx.is_configured().await? {
let is_configured = ctx.is_configured().await?;
if !is_configured {
ctx.sql.set_raw_config("mvbox_move", Some("0")).await?;
ctx.sql.set_raw_config("only_fetch_mvbox", None).await?;
}
Expand All @@ -563,8 +580,10 @@ async fn configure(ctx: &Context, param: &EnteredLoginParam) -> Result<Option<&'

let provider = configured_param.provider;
configured_param
.save_to_transports_table(ctx, param)
.clone()
.save_to_transports_table(ctx, param, time())
.await?;
send_sync_transports(ctx).await?;

ctx.set_config_internal(Config::ConfiguredTimestamp, Some(&time().to_string()))
.await?;
Expand Down
9 changes: 9 additions & 0 deletions src/events/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,15 @@ pub enum EventType {
chat_id: ChatId,
},

/// One or more transports has changed.
///
/// This event is used for tests to detect when transport
/// synchronization messages arrives.
/// UIs don't need to use it, it is unlikely
/// that user modifies transports on multiple
/// devices simultaneously.
TransportsModified,

/// Event for using in tests, e.g. as a fence between normally generated events.
#[cfg(test)]
Test,
Expand Down
35 changes: 35 additions & 0 deletions src/receive_imf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,41 @@ pub(crate) async fn receive_imf_inner(
if let Some(ref sync_items) = mime_parser.sync_items {
if from_id == ContactId::SELF {
if mime_parser.was_encrypted() {
// Receiving encrypted message from self updates primary transport.
let from_addr = &mime_parser.from.addr;

let transport_changed = context
.sql
.transaction(|transaction| {
let transport_exists = transaction.query_row(
"SELECT COUNT(*) FROM transports WHERE addr=?",
(from_addr,),
|row| {
let count: i64 = row.get(0)?;
Ok(count > 0)
},
)?;

let transport_changed = if transport_exists {
transaction.execute(
"UPDATE config SET value=? WHERE keyname='configured_addr'",
(from_addr,),
)? > 0
} else {
warn!(
context,
"Received sync message from unknown address {from_addr:?}."
);
false
};
Ok(transport_changed)
})
.await?;
if transport_changed {
info!(context, "Primary transport changed to {from_addr:?}.");
context.sql.uncache_raw_config("configured_addr").await;
}

context
.execute_sync_items(sync_items, mime_parser.timestamp_sent)
.await;
Expand Down
15 changes: 15 additions & 0 deletions src/sql/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,21 @@ CREATE INDEX imap_sync_index ON imap_sync(transport_id, folder);
.await?;
}

inc_and_check(&mut migration_version, 142)?;
if dbversion < migration_version {
sql.execute_migration(
"ALTER TABLE transports
ADD COLUMN add_timestamp INTEGER NOT NULL DEFAULT 0;
CREATE TABLE removed_transports (
addr TEXT NOT NULL,
remove_timestamp INTEGER NOT NULL,
UNIQUE(addr)
) STRICT;",
migration_version,
)
.await?;
}

let new_version = sql
.get_raw_config_int(VERSION_CFG)
.await?
Expand Down
54 changes: 52 additions & 2 deletions src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ use crate::config::Config;
use crate::constants::Blocked;
use crate::contact::ContactId;
use crate::context::Context;
use crate::log::LogExt;
use crate::log::warn;
use crate::log::{LogExt as _, warn};
use crate::login_param::EnteredLoginParam;
use crate::message::{Message, MsgId, Viewtype};
use crate::mimeparser::SystemMessage;
use crate::param::Param;
use crate::sync::SyncData::{AddQrToken, AlterChat, DeleteQrToken};
use crate::token::Namespace;
use crate::tools::time;
use crate::transport::{ConfiguredLoginParamJson, sync_transports};
use crate::{message, stock_str, token};
use std::collections::HashSet;

Expand Down Expand Up @@ -52,6 +53,29 @@ pub(crate) struct QrTokenData {
pub(crate) grpid: Option<String>,
}

#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct TransportData {
/// Configured login parameters.
pub(crate) configured: ConfiguredLoginParamJson,

/// Login parameters entered by the user.
///
/// They can be used to reconfigure the transport.
pub(crate) entered: EnteredLoginParam,

/// Timestamp of when the transport was last time (re)configured.
pub(crate) timestamp: i64,
}

#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct RemovedTransportData {
/// Address of the removed transport.
pub(crate) addr: String,

/// Timestamp of when the transport was removed.
pub(crate) timestamp: i64,
}

#[derive(Debug, Serialize, Deserialize)]
pub(crate) enum SyncData {
AddQrToken(QrTokenData),
Expand All @@ -71,6 +95,28 @@ pub(crate) enum SyncData {
DeleteMessages {
msgs: Vec<String>, // RFC724 id (i.e. "Message-Id" header)
},

/// Update transport configuration.
///
/// This message contains a list of all added transports
/// together with their addition timestamp,
/// and all removed transports together with
/// the removal timestamp.
///
/// In case of a tie, addition and removal timestamps
/// being the same, removal wins.
/// It is more likely that transport is added
/// and then removed within a second,
/// but unlikely the other way round
/// as adding new transport takes time
/// to run configuration.
Transports {
/// Active transports.
transports: Vec<TransportData>,

/// Removed transports with the timestamp of removal.
removed_transports: Vec<RemovedTransportData>,
},
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -274,6 +320,10 @@ impl Context {
SyncData::Config { key, val } => self.sync_config(key, val).await,
SyncData::SaveMessage { src, dest } => self.save_message(src, dest).await,
SyncData::DeleteMessages { msgs } => self.sync_message_deletion(msgs).await,
SyncData::Transports {
transports,
removed_transports,
} => sync_transports(self, transports, removed_transports).await,
},
SyncDataOrUnknown::Unknown(data) => {
warn!(self, "Ignored unknown sync item: {data}.");
Expand Down
Loading