diff --git a/meson_options.txt b/meson_options.txt index 90267c0..1ca5810 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -1,3 +1,3 @@ option('latest_apis', type: 'boolean', value: false) -option('fiveg_api_approval', type: 'string', value: '110') +option('fiveg_api_approval', type: 'string', value: '111') option('fiveg_api_release', type: 'string', value: '18') diff --git a/src/mbsf/ActivePeriods.cc b/src/mbsf/ActivePeriods.cc index c538cf2..476b4f7 100644 --- a/src/mbsf/ActivePeriods.cc +++ b/src/mbsf/ActivePeriods.cc @@ -30,163 +30,197 @@ #include "App.hh" #include "Context.hh" #include "ActivePeriodsBase.hh" +#include "ServiceScheduleDesc.hh" +#include "UserDataIngSession.hh" +#include "utilities.hh" #include "openapi/model/TimeWindow.h" #include "openapi/model/DistSessionState.h" #include "openapi/model/MBSUserDataIngSession.h" -namespace reftools::mbsf { - class TimeWindow; - class MBSUserDataIngSession; - class DistSessionState; -} +#include "ActivePeriods.hh" using reftools::mbsf::TimeWindow; using reftools::mbsf::MBSUserDataIngSession; using reftools::mbsf::DistSessionState; -#include "ActivePeriods.hh" - - MBSF_NAMESPACE_START using TimestampAndActiveFlag = ActivePeriodsBase::TimestampAndActiveFlag; using ActPeriodsType = MBSUserDataIngSession::ActPeriodsType; +static DistSessionState dist_session_state_active; +static DistSessionState dist_session_state_inactive; +static DistSessionState dist_session_state_established; +static DistSessionState dist_session_state_no_val; + static std::optional parse_date_time(const std::string& date_time); static fiveg_mag_reftools::remove_std_optional::type extract_time_windows(const ActPeriodsType &actPeriods); static fiveg_mag_reftools::remove_std_optional::type extract_time_windows(ActPeriodsType &&actPeriods); -static void convert_act_periods(const ActPeriodsType& act_periods, std::list &act_periods_tp); -static void convert_act_periods(ActPeriodsType&& act_periods, std::list &act_periods_tp); -static void convert_act_periods(const fiveg_mag_reftools::remove_std_optional::type& act_periods_list, - std::list &act_periods_tp); +static void ensure_dist_session_state_statics(); -ActivePeriods::ActivePeriods(const ActPeriodsType &act_periods) +ActivePeriods::ActivePeriods(const ActPeriodsType &act_periods, const std::shared_ptr &old_active_periods, UserDataIngSession &user_data_ing_session) + :ActivePeriodsBase(user_data_ing_session.userDataIngSessionId()) + ,m_actPeriodsTP() { - convert_act_periods(act_periods, m_actPeriodsTP); + convertActPeriods(act_periods, old_active_periods, user_data_ing_session); } -ActivePeriods::ActivePeriods(ActPeriodsType &&act_periods) +ActivePeriods::ActivePeriods(ActPeriodsType &&act_periods, const std::shared_ptr &old_active_periods, UserDataIngSession &user_data_ing_session) + :ActivePeriodsBase(user_data_ing_session.userDataIngSessionId()) + ,m_actPeriodsTP() { - convert_act_periods(std::move(act_periods), m_actPeriodsTP); + convertActPeriods(std::move(act_periods), old_active_periods, user_data_ing_session); } const DistSessionState &ActivePeriods::currentState(const MbsDistSessStateType &dist_sess_state) const { std::chrono::seconds establish_pre_start_seconds{App::self().context()->actPeriodEstablishedStateDuration}; std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); - static DistSessionState dist_session_state; + + ensure_dist_session_state_statics(); // Evaluate first window with end > now for (auto &tw : m_actPeriodsTP) { - const auto& start = tw.first; - const auto& end = tw.second; + const auto& start = tw.start; + const auto& end = tw.end; - if (end <= now) continue; // already finished window + if (end <= now) continue; // already finished window, skip to next window if (start <= now) { - dist_session_state = DistSessionState::VAL_ACTIVE; - return dist_session_state; + // Inside current window, current state is active + //ogs_debug("Current state is ACTIVE"); + return dist_session_state_active; } // start > now + // not in a window, next window starts in the future auto pre_establish_time = start - establish_pre_start_seconds; if (pre_establish_time <= now) { - dist_session_state = DistSessionState::VAL_ESTABLISHED; - return dist_session_state; + // After pre_establish_time, so within the establishing period before the next window, current state is established + //ogs_debug("Current state is ESTABLISHED"); + return dist_session_state_established; } else { - - dist_session_state = DistSessionState::VAL_INACTIVE; - return dist_session_state; + // Before pre_establish_time, so within the inactive time between windows + //ogs_debug("Current state is INACTIVE"); + return dist_session_state_inactive; } - - } - dist_session_state = DistSessionState::VAL_INACTIVE; - return dist_session_state; + + // we are past all windows so the current state is inactive + //ogs_debug("Past all windows, state is INACTIVE"); + return dist_session_state_inactive; } -TimestampAndActiveFlag ActivePeriods::nextTransition () const +TimestampAndActiveFlag ActivePeriods::nextTransition() const { std::chrono::seconds establish_pre_start_seconds{App::self().context()->actPeriodEstablishedStateDuration}; std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); - static DistSessionState dist_session_state; + DistSessionState dist_session_state; // Evaluate first window with end > now for (auto &tw : m_actPeriodsTP) { - const auto& start = tw.first; - const auto& end = tw.second; - - if (end <= now) continue; // already finished window + const auto& end = tw.end; + if (end <= now) continue; // already finished this window, go to next + const auto& start = tw.start; if (start <= now) { + // In the window, so the next state change would be to inactive at the window end time dist_session_state = DistSessionState::VAL_INACTIVE; + //ogs_debug("%s", std::format("In the window, so the next state change would be to inactive at {}", end).c_str()); return {end, dist_session_state}; } // start > now + // outside a window, next window starts in the future auto pre_establish_time = start - establish_pre_start_seconds; if (pre_establish_time <= now) { - + // Within establish_pre_start_seconds of the start time, so the next transition is active at the start time dist_session_state = DistSessionState::VAL_ACTIVE; + //ogs_debug("%s", std::format("Within {0} of {1}, so the next transition is active at {1}", establish_pre_start_seconds, start).c_str()); return {start, dist_session_state}; } else { - + // Before establish_pre_start_seconds before the start time, so next transition is established at pre_establish_time dist_session_state = DistSessionState::VAL_ESTABLISHED; + //ogs_debug("%s", std::format("Before {} of {}, so next transition is established at {}", establish_pre_start_seconds, start, pre_establish_time).c_str()); return {pre_establish_time, dist_session_state}; } - } + + // All windows finished, remain inactive dist_session_state = DistSessionState::VAL_INACTIVE; return {std::nullopt, dist_session_state}; } -static void convert_act_periods(const ActPeriodsType& act_periods, std::list &act_periods_tp) +std::optional > > ActivePeriods::serviceScheduleDescriptions() const +{ + if (m_actPeriodsTP.empty()) return std::nullopt; + + std::list > ret_val; + + for (const auto &ver_time_win : m_actPeriodsTP) { + ret_val.emplace_back(new ServiceScheduleDesc(m_id, ver_time_win.version, time_point_to_iso8601_utc_str(ver_time_win.start), time_point_to_iso8601_utc_str(ver_time_win.end))); + } + + return ret_val; +} + +void ActivePeriods::convertActPeriods(const ActPeriodsType& act_periods, + const std::shared_ptr &old_active_periods, + UserDataIngSession &user_data_ing_session) { auto act_periods_list = extract_time_windows(act_periods); - return convert_act_periods(act_periods_list, act_periods_tp); + return convertActPeriods(act_periods_list, old_active_periods, user_data_ing_session); } -static void convert_act_periods(ActPeriodsType&& act_periods, std::list &act_periods_tp) +void ActivePeriods::convertActPeriods(ActPeriodsType&& act_periods, + const std::shared_ptr &old_active_periods, + UserDataIngSession &user_data_ing_session) { auto act_periods_list = extract_time_windows(std::move(act_periods)); - return convert_act_periods(act_periods_list, act_periods_tp); + return convertActPeriods(act_periods_list, old_active_periods, user_data_ing_session); } -static void convert_act_periods(const fiveg_mag_reftools::remove_std_optional::type& act_periods_list, - std::list &act_periods_tp) +void ActivePeriods::convertActPeriods(const fiveg_mag_reftools::remove_std_optional::type& act_periods_list, + const std::shared_ptr &old_active_periods, + UserDataIngSession &user_data_ing_session) { - act_periods_tp.clear(); + m_actPeriodsTP.clear(); + auto old_act_periods = std::dynamic_pointer_cast(old_active_periods); for (const auto &time_window : act_periods_list) { - if (!time_window.has_value()) - { - continue; - } + if (!time_window || !time_window.value()) continue; - const std::shared_ptr& time_win = *time_window; - if (!time_win) - { - continue; - } + const std::shared_ptr& time_win = time_window.value(); std::optional start_time = parse_date_time(time_win->getStartTime()); std::optional stop_time = parse_date_time(time_win->getStopTime()); - if (!start_time.has_value() || !stop_time.has_value()) continue; + if (!start_time || !stop_time) continue; - if (start_time.has_value() && stop_time.has_value()) { - act_periods_tp.emplace_back(std::make_pair(start_time.value(), stop_time.value())); + ActivePeriods::VersionedTimeWindowTP tw{.start = start_time.value(), .end = stop_time.value()}; + if (old_act_periods) { + auto it = std::find_if(old_act_periods->m_actPeriodsTP.begin(), old_act_periods->m_actPeriodsTP.end(), [&tw](const ActivePeriods::VersionedTimeWindowTP &other) { return other.start == tw.start && other.end == tw.end; }); + if (it != old_act_periods->m_actPeriodsTP.end()) { + tw.version = it->version; + } else { + tw.version = user_data_ing_session.serviceScheduleDescVersion(); + } + } else { + tw.version = user_data_ing_session.serviceScheduleDescVersion(); } + + m_actPeriodsTP.push_back(std::move(tw)); } - act_periods_tp.sort([](auto const& a, auto const& b) { - return a.first < b.first; + m_actPeriodsTP.sort([](auto const& a, auto const& b) { + return a.start < b.start; }); } static std::optional parse_date_time(const std::string& date_time) { +#if 0 std::string dt = date_time; // Remove trailing 'Z' if present @@ -220,6 +254,11 @@ static std::optional parse_date_time(cons auto tp = std::chrono::system_clock::from_time_t(time); tp += std::chrono::milliseconds(milliseconds); return tp; +#else + std::chrono::system_clock::time_point result = to_time_point_iso8601(date_time); + if (result == std::chrono::system_clock::time_point{}) return std::nullopt; + return result; +#endif } // conversion function @@ -238,7 +277,14 @@ static fiveg_mag_reftools::remove_std_optional::type extract_tim return fiveg_mag_reftools::remove_std_optional::type(std::move(*actPeriods)); } - +static void ensure_dist_session_state_statics() +{ + if (dist_session_state_active.getValue() != DistSessionState::VAL_ACTIVE) { + dist_session_state_active = DistSessionState::VAL_ACTIVE; + dist_session_state_inactive = DistSessionState::VAL_INACTIVE; + dist_session_state_established = DistSessionState::VAL_ESTABLISHED; + } +} MBSF_NAMESPACE_STOP diff --git a/src/mbsf/ActivePeriods.hh b/src/mbsf/ActivePeriods.hh index c67ecd3..29896c0 100644 --- a/src/mbsf/ActivePeriods.hh +++ b/src/mbsf/ActivePeriods.hh @@ -3,8 +3,9 @@ /****************************************************************************** * 5G-MAG Reference Tools: MBS Function: MBS Active Periods class ****************************************************************************** - * Copyright: (C)2025 British Broadcasting Corporation + * Copyright: (C)2025-2026 British Broadcasting Corporation * Author(s): Dev Audsin + * David Waring * License: 5G-MAG Public License v1 * * Licensed under the License terms and conditions for use, reproduction, and @@ -41,9 +42,9 @@ namespace reftools::mbsf { class DistSessionState; } - MBSF_NAMESPACE_START +class ServiceScheduleDesc; class UserDataIngSession; class ActivePeriods: public ActivePeriodsBase { @@ -51,24 +52,33 @@ public: using TimestampAndActiveFlag = ActivePeriodsBase::TimestampAndActiveFlag; using DistSessionState = ActivePeriodsBase::DistSessionState; - using TimeWindowTP = std::pair; + struct VersionedTimeWindowTP { + std::chrono::system_clock::time_point start; + std::chrono::system_clock::time_point end; + int32_t version; + }; using ActPeriodsType = reftools::mbsf::MBSUserDataIngSession::ActPeriodsType; using MbsDistSessStateType = reftools::mbsf::MBSDistributionSessionInfo::MbsDistSessStateType; - using versionedActivePeriod = std::pair>; - ActivePeriods(const ActPeriodsType &act_periods); - ActivePeriods(ActPeriodsType &&act_periods); + ActivePeriods(const ActPeriodsType &act_periods, const std::shared_ptr &old_active_periods, UserDataIngSession &user_data_ing_session); + ActivePeriods(ActPeriodsType &&act_periods, const std::shared_ptr &old_active_periods, UserDataIngSession &user_data_ing_session); virtual ~ActivePeriods() {}; + virtual const DistSessionState ¤tState(const MbsDistSessStateType &dist_session_state) const; virtual TimestampAndActiveFlag nextTransition() const; - - ActivePeriods &versionedActPeriods(std::list versioned_act_periods) { m_versionedActPeriods = versioned_act_periods; return *this;}; - const std::list &versionedActPeriods() const { return m_versionedActPeriods;}; + virtual std::optional > > serviceScheduleDescriptions() const; private: - std::list m_actPeriodsTP; - std::list m_versionedActPeriods; + void convertActPeriods(const ActPeriodsType& act_periods, const std::shared_ptr &old_active_periods, + UserDataIngSession &user_data_ing_session); + void convertActPeriods(ActPeriodsType&& act_periods, const std::shared_ptr &old_active_periods, + UserDataIngSession &user_data_ing_session); + void convertActPeriods(const fiveg_mag_reftools::remove_std_optional::type& act_periods_list, + const std::shared_ptr &old_active_periods, + UserDataIngSession &user_data_ing_session); + + std::list m_actPeriodsTP; }; MBSF_NAMESPACE_STOP diff --git a/src/mbsf/ActivePeriodsBase.hh b/src/mbsf/ActivePeriodsBase.hh index 57a053b..b51ad05 100644 --- a/src/mbsf/ActivePeriodsBase.hh +++ b/src/mbsf/ActivePeriodsBase.hh @@ -3,8 +3,9 @@ /****************************************************************************** * 5G-MAG Reference Tools: MBS Function: MBS Active Periods Base class ****************************************************************************** - * Copyright: (C)2025 British Broadcasting Corporation + * Copyright: (C)2025-2026 British Broadcasting Corporation * Author(s): Dev Audsin + * David Waring * License: 5G-MAG Public License v1 * * Licensed under the License terms and conditions for use, reproduction, and @@ -33,27 +34,25 @@ MBSF_NAMESPACE_START +class ServiceScheduleDesc; class ActivePeriodsBase { public: - using DistSessionState = reftools::mbsf::DistSessionState; using SysTimeMS = std::chrono::system_clock::time_point; using TimestampAndActiveFlag = std::pair, DistSessionState >; using ActPeriodsType = reftools::mbsf::MBSUserDataIngSession::ActPeriodsType; using MbsDistSessStateType = reftools::mbsf::MBSDistributionSessionInfo::MbsDistSessStateType; + ActivePeriodsBase(const std::string &user_data_ing_sess_id) : m_id(user_data_ing_sess_id) {}; + virtual ~ActivePeriodsBase() = default; virtual const DistSessionState ¤tState(const MbsDistSessStateType &dist_session_state) const = 0; virtual TimestampAndActiveFlag nextTransition() const = 0; + virtual std::optional > > serviceScheduleDescriptions() const = 0; -/* protected: - std::shared_ptr m_timestampAndActiveFlag; -*/ - -private: - + std::string m_id; }; MBSF_NAMESPACE_STOP diff --git a/src/mbsf/ActivePeriodsRepRule.cc b/src/mbsf/ActivePeriodsRepRule.cc index 2d14dce..5a91a4e 100644 --- a/src/mbsf/ActivePeriodsRepRule.cc +++ b/src/mbsf/ActivePeriodsRepRule.cc @@ -31,6 +31,7 @@ #include "App.hh" #include "Context.hh" #include "ActivePeriodsBase.hh" +#include "ServiceScheduleDesc.hh" #include "openapi/model/RepetitionRule.h" #include "openapi/model/DistSessionState.h" @@ -55,115 +56,130 @@ using ActPeriodsRepRuleType = MBSUserDataIngSession::ActPeriodsRepRuleType; static std::optional parse_date_time(const std::string& date_time); -ActivePeriodsRepRule::ActivePeriodsRepRule(const ActPeriodsRepRuleType &act_periods_rep_rule) - :m_repetitionRule(nullptr) +static DistSessionState dist_session_state_active; +static DistSessionState dist_session_state_inactive; +static DistSessionState dist_session_state_established; +static DistSessionState dist_session_state_no_val; + +static void ensure_dist_session_state_statics(); + +ActivePeriodsRepRule::ActivePeriodsRepRule(const ActPeriodsRepRuleType &act_periods_rep_rule, const std::shared_ptr &old_active_periods, UserDataIngSession &user_data_ingest_session) + :ActivePeriodsBase(user_data_ingest_session.userDataIngSessionId()) + ,m_repetitionRule(nullptr) { if (act_periods_rep_rule.has_value()) { - m_repetitionRule = act_periods_rep_rule.value(); + uint32_t version; + auto old_act_periods_rep_rule = std::dynamic_pointer_cast(old_active_periods); + if (old_act_periods_rep_rule && *old_act_periods_rep_rule->m_repetitionRule->second == *act_periods_rep_rule.value()) { + version = old_act_periods_rep_rule->m_repetitionRule->first; + } else { + version = user_data_ingest_session.serviceScheduleDescVersion(); + } + m_repetitionRule.reset(new VersionedRepetitionRule{version, act_periods_rep_rule.value()}); } } const DistSessionState &ActivePeriodsRepRule::currentState(const MbsDistSessStateType &dist_sess_state) const { + ensure_dist_session_state_statics(); + + if (!m_repetitionRule) return dist_session_state_no_val; + std::chrono::seconds establish_pre_start_seconds{App::self().context()->actPeriodEstablishedStateDuration}; std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); - static DistSessionState dist_session_state; - std::optional start = parse_date_time(m_repetitionRule->getStartTime()); - int32_t duration = m_repetitionRule->getDuration(); - int32_t repetition_interval = m_repetitionRule->getRepetitionInterval(); + std::optional start = parse_date_time(m_repetitionRule->second->getStartTime()); + if (!start.has_value()) { + ogs_debug("REP RULE START TIME HAS NO VAL"); + return dist_session_state_no_val; + } + + int32_t duration = m_repetitionRule->second->getDuration(); + int32_t repetition_interval = m_repetitionRule->second->getRepetitionInterval(); std::chrono::seconds dur{duration}; std::chrono::seconds rep_interval{repetition_interval}; - if (!start.has_value()) { - - ogs_info("REP RULE START TIME HAS NO VAL"); - dist_session_state = DistSessionState::NO_VAL; - return dist_session_state; - } - // Calculate time difference from the rule's start auto diff = now - start.value(); if (diff < -establish_pre_start_seconds) { - dist_session_state = DistSessionState::VAL_INACTIVE; - return dist_session_state; - } else if (diff < std::chrono::milliseconds::zero()) { - dist_session_state = DistSessionState::VAL_ESTABLISHED; - return dist_session_state; + // rule has not started, and is more than establish_pre_start_seconds before the start + return dist_session_state_inactive; + } else if (diff < std::chrono::system_clock::duration::zero()) { + // rule has not started, but is within establish_pre_start_seconds of the start + return dist_session_state_established; } else { - + // rule has started, so get our offset into the current rule window auto offset = std::chrono::duration_cast(diff % rep_interval); if (offset < dur) { - dist_session_state = DistSessionState::VAL_ACTIVE; - return dist_session_state; + // within the first dur seconds of the current window, so the window is active + return dist_session_state_active; } else if (offset < rep_interval - establish_pre_start_seconds) { - dist_session_state = DistSessionState::VAL_INACTIVE; - return dist_session_state; + // greater than dur seconds but earlier than establish_pre_start_seconds before the next window, so inactive + return dist_session_state_inactive; } else { - dist_session_state = DistSessionState::VAL_ESTABLISHED; - return dist_session_state; + // within establish_pre_start_seconds of the next window, so we need to establish the session + return dist_session_state_established; } - dist_session_state = DistSessionState::VAL_INACTIVE; - return dist_session_state; - - } - dist_session_state = DistSessionState::VAL_INACTIVE; - return dist_session_state; + // Shouldn't reach here, but just incase + return dist_session_state_inactive; } - TimestampAndActiveFlag ActivePeriodsRepRule::nextTransition () const { std::chrono::seconds establish_pre_start_seconds{App::self().context()->actPeriodEstablishedStateDuration}; std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); - static DistSessionState dist_session_state; - std::optional start = parse_date_time(m_repetitionRule->getStartTime()); - int32_t duration = m_repetitionRule->getDuration(); - int32_t repetition_interval = m_repetitionRule->getRepetitionInterval(); - - std::chrono::seconds dur{duration}; - std::chrono::seconds rep_interval{repetition_interval}; + ensure_dist_session_state_statics(); + std::optional start = parse_date_time(m_repetitionRule->second->getStartTime()); if (!start.has_value()) { - dist_session_state = DistSessionState::NO_VAL; - return {std::nullopt, dist_session_state}; + return {std::nullopt, dist_session_state_no_val}; } + int32_t duration = m_repetitionRule->second->getDuration(); + int32_t repetition_interval = m_repetitionRule->second->getRepetitionInterval(); + + std::chrono::seconds dur{duration}; + std::chrono::seconds rep_interval{repetition_interval}; + // Calculate time difference from the rule's start auto diff = now - start.value(); if (diff < -establish_pre_start_seconds) { - dist_session_state = DistSessionState::VAL_ESTABLISHED; - return {(start.value() - establish_pre_start_seconds), dist_session_state} ; - } else if (diff < std::chrono::seconds::zero()) { - dist_session_state = DistSessionState::VAL_ACTIVE; - return {start.value(), dist_session_state}; + // rule has not started and more than establish_pre_start_seconds before the rule start + return {(start.value() - establish_pre_start_seconds), dist_session_state_established} ; + } else if (diff < std::chrono::system_clock::duration::zero()) { + // rule has not started but is within establish_pre_start_seconds if the start + return {start.value(), dist_session_state_active}; } else { auto offset = std::chrono::duration_cast(diff % rep_interval); - auto active_start = now - offset; + auto active_start = start.value() + (rep_interval * (diff / rep_interval)); if (offset < dur) { - dist_session_state = DistSessionState::VAL_INACTIVE; - return {(active_start + dur), dist_session_state}; + return {(active_start + dur), dist_session_state_inactive}; } else if (offset < (rep_interval - establish_pre_start_seconds)) { - dist_session_state = DistSessionState::VAL_ESTABLISHED; - return {active_start + rep_interval - establish_pre_start_seconds, dist_session_state}; + return {active_start + rep_interval - establish_pre_start_seconds, dist_session_state_established}; } else { - dist_session_state = DistSessionState::VAL_ACTIVE; - return {active_start + rep_interval, dist_session_state}; - } + return {active_start + rep_interval, dist_session_state_active}; + } } - dist_session_state = DistSessionState::VAL_INACTIVE; - return {std::nullopt, dist_session_state}; + return {std::nullopt, dist_session_state_inactive}; +} + +std::optional > > ActivePeriodsRepRule::serviceScheduleDescriptions() const +{ + if (!m_repetitionRule) return std::nullopt; + + std::shared_ptr ssd(new ServiceScheduleDesc(m_id, m_repetitionRule->first, m_repetitionRule->second)); + return std::list >{ssd}; } static std::optional parse_date_time(const std::string& date_time) { @@ -206,6 +222,15 @@ static std::optional parse_date_time(cons return tp; } +static void ensure_dist_session_state_statics() +{ + if (dist_session_state_active.getValue() != DistSessionState::VAL_ACTIVE) { + dist_session_state_active = DistSessionState::VAL_ACTIVE; + dist_session_state_inactive = DistSessionState::VAL_INACTIVE; + dist_session_state_established = DistSessionState::VAL_ESTABLISHED; + } +} + MBSF_NAMESPACE_STOP /* vim:ts=8:sts=4:sw=4:expandtab: diff --git a/src/mbsf/ActivePeriodsRepRule.hh b/src/mbsf/ActivePeriodsRepRule.hh index 98e116b..bdd877e 100644 --- a/src/mbsf/ActivePeriodsRepRule.hh +++ b/src/mbsf/ActivePeriodsRepRule.hh @@ -3,8 +3,9 @@ /****************************************************************************** * 5G-MAG Reference Tools: MBS Function: MBS Active Periods Rep Rule class ****************************************************************************** - * Copyright: (C)2025 British Broadcasting Corporation + * Copyright: (C)2025-2026 British Broadcasting Corporation * Author(s): Dev Audsin + * David Waring * License: 5G-MAG Public License v1 * * Licensed under the License terms and conditions for use, reproduction, and @@ -25,6 +26,8 @@ #include #include +#include +#include #include #include "openapi/model/MBSUserDataIngSession.h" @@ -41,9 +44,10 @@ namespace reftools::mbsf { class DistSessionState; } - MBSF_NAMESPACE_START +class ServiceScheduleDesc; + class ActivePeriodsRepRule: public ActivePeriodsBase { public: @@ -52,24 +56,17 @@ public: using ActPeriodsRepRuleType = reftools::mbsf::MBSUserDataIngSession::ActPeriodsRepRuleType; using MbsDistSessStateType = reftools::mbsf::MBSDistributionSessionInfo::MbsDistSessStateType; using RepetitionRule = reftools::mbsf::RepetitionRule; - using versionedRepetitionRule = std::pair>; + using VersionedRepetitionRule = std::pair >; - ActivePeriodsRepRule(const ActPeriodsRepRuleType &act_periods_rep_rule); + ActivePeriodsRepRule(const ActPeriodsRepRuleType &act_periods_rep_rule, const std::shared_ptr &old_active_periods, UserDataIngSession &user_data_ingest_session); virtual ~ActivePeriodsRepRule() {}; virtual const DistSessionState ¤tState(const MbsDistSessStateType &dist_session_state) const; virtual TimestampAndActiveFlag nextTransition() const; - - ActivePeriodsRepRule &versionedActivePeriodsRepRule(std::shared_ptr versioned_repetition_rule ) { m_versionedRepetitionRule = versioned_repetition_rule; return *this;}; - const std::shared_ptr &versionedActivePeriodsRepRule() const { return m_versionedRepetitionRule;}; - std::shared_ptr &versionedActivePeriodsRepRule() { return m_versionedRepetitionRule;}; - - + virtual std::optional > > serviceScheduleDescriptions() const; private: - std::shared_ptr< RepetitionRule > m_repetitionRule; - std::shared_ptr m_versionedRepetitionRule; - + std::shared_ptr m_repetitionRule; }; MBSF_NAMESPACE_STOP diff --git a/src/mbsf/AlwaysActive.cc b/src/mbsf/AlwaysActive.cc index 6dfefde..3bf672b 100644 --- a/src/mbsf/AlwaysActive.cc +++ b/src/mbsf/AlwaysActive.cc @@ -24,21 +24,16 @@ #include #include "common.hh" +#include "ActivePeriodsBase.hh" #include "App.hh" #include "openapi/model/MBSUserDataIngSession.h" #include "openapi/model/TimeWindow.h" #include "openapi/model/DistSessionState.h" #include "openapi/model/MBSDistributionSessionInfo.h" -#include "ActivePeriodsBase.hh" +#include "ServiceScheduleDesc.hh" #include "AlwaysActive.hh" -namespace reftools::mbsf { - class TimeWindow; - class MBSUserDataIngSession; - class DistSessionState; -} - using reftools::mbsf::TimeWindow; using reftools::mbsf::MBSUserDataIngSession; using reftools::mbsf::DistSessionState; @@ -64,6 +59,11 @@ TimestampAndActiveFlag AlwaysActive::nextTransition() const { return {std::nullopt, empty}; } +std::optional > > AlwaysActive::serviceScheduleDescriptions() const +{ + return std::nullopt; +} + MBSF_NAMESPACE_STOP /* vim:ts=8:sts=4:sw=4:expandtab: diff --git a/src/mbsf/AlwaysActive.hh b/src/mbsf/AlwaysActive.hh index 49ba6e1..cec185b 100644 --- a/src/mbsf/AlwaysActive.hh +++ b/src/mbsf/AlwaysActive.hh @@ -3,8 +3,9 @@ /****************************************************************************** * 5G-MAG Reference Tools: MBS Function: MBS AlwaysActive class ****************************************************************************** - * Copyright: (C)2025 British Broadcasting Corporation + * Copyright: (C)2025-2026 British Broadcasting Corporation * Author(s): Dev Audsin + * David Waring * License: 5G-MAG Public License v1 * * Licensed under the License terms and conditions for use, reproduction, and @@ -41,26 +42,24 @@ namespace reftools::mbsf { class DistSessionState; } - MBSF_NAMESPACE_START -class AlwaysActive: public ActivePeriodsBase { +class ServiceScheduleDesc; +class AlwaysActive : public ActivePeriodsBase { public: - using TimestampAndActiveFlag = ActivePeriodsBase::TimestampAndActiveFlag; using DistSessionState = ActivePeriodsBase::DistSessionState; using ActPeriodsType = reftools::mbsf::MBSUserDataIngSession::ActPeriodsType; using MbsDistSessStateType = reftools::mbsf::MBSDistributionSessionInfo::MbsDistSessStateType; - AlwaysActive() {}; + AlwaysActive() = delete; + AlwaysActive(const std::string &user_data_ing_sess_id) : ActivePeriodsBase(user_data_ing_sess_id) {}; virtual ~AlwaysActive() {}; virtual const DistSessionState ¤tState(const MbsDistSessStateType &dist_session_state) const; virtual TimestampAndActiveFlag nextTransition() const; - -private: - + virtual std::optional > > serviceScheduleDescriptions() const; }; MBSF_NAMESPACE_STOP diff --git a/src/mbsf/Context.cc b/src/mbsf/Context.cc index dea51a2..01b73fe 100644 --- a/src/mbsf/Context.cc +++ b/src/mbsf/Context.cc @@ -93,6 +93,10 @@ Context::~Context() m_userDataIngStatSubscs.clear(); } + if (notificationBindAddress) { + ogs_freeaddrinfo(notificationBindAddress); + notificationBindAddress = nullptr; + } } bool Context::parseConfig() diff --git a/src/mbsf/DistributionSessionInfo.cc b/src/mbsf/DistributionSessionInfo.cc index 45fb46a..b8af86b 100644 --- a/src/mbsf/DistributionSessionInfo.cc +++ b/src/mbsf/DistributionSessionInfo.cc @@ -70,6 +70,7 @@ using fiveg_mag_reftools::CJson; using fiveg_mag_reftools::ModelException; +using reftools::mbsf::DistributionMethod; using reftools::mbsf::DistSessionState; using reftools::mbsf::DistSessionEventReportList; using reftools::mbsf::Event; @@ -96,8 +97,10 @@ DistributionSessionInfo::DistributionSessionInfo(CJson &json, bool as_request) ,m_mutex() ,m_dataIngestSessionEstablished(false) ,m_dataIngestSessionTerminated(false) - + ,m_dataIngestSessionActivated(false) + ,m_dataIngestSessionDeactivated(false) { + validate(); } DistributionSessionInfo::DistributionSessionInfo(const std::shared_ptr &mbs_distribution_session_info) @@ -110,8 +113,10 @@ DistributionSessionInfo::DistributionSessionInfo(const std::shared_ptr distribution_session_event_type = dist_sess_event_report->getEventType(); DistSessionEventType dist_session_event_type = *distribution_session_event_type; switch (dist_session_event_type) { + case DistSessionEventType::VAL_SERVICE_MANAGEMENT_FAILURE: + processServiceManagementFailure(dist_sess_event_report); + break; case DistSessionEventType::VAL_DATA_INGEST_FAILURE: processDataIngestFailure(dist_sess_event_report); break; case DistSessionEventType::VAL_DATA_INGEST_SESSION_ESTABLISHED: m_dataIngestSessionEstablished = true; + contextReportedState(ing_sess, DistSessionState::VAL_ESTABLISHED); break; case DistSessionEventType::VAL_DATA_INGEST_SESSION_TERMINATED: m_dataIngestSessionTerminated = true; + contextReportedState(ing_sess, DistSessionState::VAL_DEACTIVATING); + break; + case DistSessionEventType::VAL_SESSION_DEACTIVATED: + m_dataIngestSessionDeactivated = true; + contextReportedState(ing_sess, DistSessionState::VAL_INACTIVE); + break; + case DistSessionEventType::VAL_SESSION_ACTIVATED: + m_dataIngestSessionActivated = true; + contextReportedState(ing_sess, DistSessionState::VAL_ACTIVE); break; default: continue; } } + + continueStateTransitions(ing_sess); + return *this; } void DistributionSessionInfo::processDataIngestFailure(std::shared_ptr dist_sess_event_report) { - std::shared_ptr< DistSessionState > dist_session_state = nullptr; - dist_session_state.reset(new DistSessionState()); + std::shared_ptr< DistSessionState > dist_session_state(new DistSessionState()); *dist_session_state = DistSessionState::VAL_INACTIVE; setState(dist_session_state); - std::shared_ptr< DistSessionEventType > dist_session_event = nullptr; - dist_session_event.reset(new DistSessionEventType()); + std::shared_ptr< DistSessionEventType > dist_session_event(new DistSessionEventType()); *dist_session_event = DistSessionEventType::VAL_SESSION_DEACTIVATED; - std::shared_ptr dist_session_event_report = nullptr; - dist_session_event_report.reset(new DistSessionEventReport()); + std::shared_ptr dist_session_event_report(new DistSessionEventReport()); dist_session_event_report->setEventType(dist_session_event); dist_session_event_report->setTimeStamp(dist_sess_event_report->getTimeStamp()); registerEvent(dist_session_event_report); +} +void DistributionSessionInfo::processServiceManagementFailure(std::shared_ptr dist_sess_event_report) +{ + std::shared_ptr< DistSessionState > dist_session_state(new DistSessionState()); + *dist_session_state = DistSessionState::VAL_INACTIVE; + + setState(dist_session_state); + + std::shared_ptr< DistSessionEventType > dist_session_event(new DistSessionEventType()); + *dist_session_event = DistSessionEventType::VAL_SESSION_DEACTIVATED; + + std::shared_ptr dist_session_event_report(new DistSessionEventReport()); + dist_session_event_report->setEventType(dist_session_event); + dist_session_event_report->setTimeStamp(dist_sess_event_report->getTimeStamp()); + registerEvent(dist_session_event_report); } void DistributionSessionInfo::setState(std::shared_ptr< DistSessionState > dist_session_state) @@ -498,6 +531,55 @@ std::shared_ptr DistributionSessionInfo::populateDistri return distribution_session_desc; } +void DistributionSessionInfo::validate() const +{ + if (!m_mbsDistributionSessionInfo) throw std::runtime_error("No MBSDistributionSessionInfo to validate"); + const auto &distr_method = m_mbsDistributionSessionInfo->getDistrMethod(); + switch (distr_method->getValue()) { + case DistributionMethod::VAL_OBJECT: + { + const auto &obj_dist_method_info = m_mbsDistributionSessionInfo->getObjDistrInfo(); + if (!obj_dist_method_info) throw std::runtime_error("Must specify objDistrInfo if distrMethod is OBJECT"); + } + break; + case DistributionMethod::VAL_PACKET: + { + const auto &pkt_dist_method_info = m_mbsDistributionSessionInfo->getPckDistrInfo(); + if (!pkt_dist_method_info) throw std::runtime_error("Must specify pktDistrInfo if distrMethod is PACKET"); + } + break; + default: + break; + } +} + +void DistributionSessionInfo::contextReportedState(const std::shared_ptr &ing_sess, + DistSessionState::Enum state) +{ + auto &dist_session_infos = ing_sess->distributionSessionInfos(); + auto it = std::find_if(dist_session_infos.begin(), dist_session_infos.end(), [this](const std::remove_reference::type::value_type &val) -> bool { return val.second->distributionSessionInfo.get() == this; }); + if (it == dist_session_infos.end()) return; + it->second->last_reported_state = state; +} + +void DistributionSessionInfo::continueStateTransitions(const std::shared_ptr &ing_sess) +{ + auto &dist_session_infos = ing_sess->distributionSessionInfos(); + auto it = std::find_if(dist_session_infos.begin(), dist_session_infos.end(), [this](const std::remove_reference::type::value_type &val) -> bool { return val.second->distributionSessionInfo.get() == this; }); + if (it == dist_session_infos.end()) return; + auto &context = it->second; + const auto &want_state = context->info->getMbsDistSessState(); + if (want_state) { + if (context->last_reported_state.getValue() == DistSessionState::VAL_INACTIVE && + want_state.value()->getValue() == DistSessionState::VAL_ESTABLISHED) { + // transitioning through INACTIVE to ESTABLISHED + setState(want_state.value()); + context->stateUpdate = true; + ing_sess->sendLocalEventPatch(context->distSessionInfoKey); + } + } +} + MBSF_NAMESPACE_STOP /* vim:ts=8:sts=4:sw=4:expandtab: diff --git a/src/mbsf/DistributionSessionInfo.hh b/src/mbsf/DistributionSessionInfo.hh index f3d0f67..91ca882 100644 --- a/src/mbsf/DistributionSessionInfo.hh +++ b/src/mbsf/DistributionSessionInfo.hh @@ -30,6 +30,7 @@ #include "openapi/model/DistributionMethod.h" #include "openapi/model/MbsSessionId.h" #include "openapi/model/StatusNotifyReqData.h" +#include "openapi/model/DistSessionState.h" #include "common.hh" #include "DistributionSessionInfoSubscription.hh" @@ -40,7 +41,6 @@ namespace reftools::mbsf { class EventNotification; class MbsServiceArea; class ExternalMbsServiceArea; - class StatusNotifyReqData; } MBSF_NAMESPACE_START @@ -85,6 +85,7 @@ public: void resetEventSubscription(); void removeEventSubscription(); void processDataIngestFailure(std::shared_ptr dist_sess_event_report); + void processServiceManagementFailure(std::shared_ptr dist_sess_event_report); DistributionSessionInfo &distributionSessionEventReportsSort(); void displayEventReports(); @@ -111,6 +112,9 @@ private: void registerEvent(std::shared_ptr dist_sess_event_report); void registerEvent(SubscribedEvents::EventTypeBitMask event_type); void sendSubscriptionNotifications(); + void validate() const; + void contextReportedState(const std::shared_ptr &ing_sess, reftools::mbsf::DistSessionState::Enum state); + void continueStateTransitions(const std::shared_ptr &ing_sess); std::shared_ptr m_mbsDistributionSessionInfo; std::map m_eventSubscriptions; @@ -121,7 +125,8 @@ private: std::recursive_mutex m_mutex; bool m_dataIngestSessionEstablished; bool m_dataIngestSessionTerminated; - + bool m_dataIngestSessionActivated; + bool m_dataIngestSessionDeactivated; }; MBSF_NAMESPACE_STOP diff --git a/src/mbsf/Nmb2Build.cc b/src/mbsf/Nmb2Build.cc index 51008de..b884fd4 100644 --- a/src/mbsf/Nmb2Build.cc +++ b/src/mbsf/Nmb2Build.cc @@ -115,13 +115,14 @@ using reftools::mbsf::MbStfIngestAddr; MBSF_NAMESPACE_START -static std::shared_ptr< ObjDistributionData > populate_mbstf_obj_distribution_data(std::shared_ptr mbs_dist_session_info); -static std::shared_ptr< TunnelAddress > populate_mbstf_mb_upf_tunnel_addr(ogs_sockaddr_t *tunnel_addr); -static std::shared_ptr< UpTrafficFlowInfo > populate_mbstf_up_traffic_flow_info(std::shared_ptr< IpAddr > dest_addr); -static std::shared_ptr< DistSessionState > populate_mbstf_dist_session_state(std::shared_ptr ing_session, std::shared_ptr< UserDataIngSession::ContextData > context_data_ptr); -static std::optional > populate_mbstf_pkt_distribution_data(std::shared_ptr mbs_dist_session_info); -static std::shared_ptr make_mbstf_dist_session_subscription(const std::string &user_data_ing_session_id, std::shared_ptr< UserDataIngSession::ContextData > context_data_ptr); -static std::shared_ptr< DistSession > build_nmb2_create_dist_session(std::shared_ptr ing_session, std::shared_ptr< UserDataIngSession::ContextData > context_data_ptr); +static std::shared_ptr populate_mbstf_obj_distribution_data(std::shared_ptr mbs_dist_session_info); +static std::shared_ptr populate_mbstf_mb_upf_tunnel_addr(ogs_sockaddr_t *tunnel_addr); +static std::shared_ptr populate_mbstf_up_traffic_flow_info(const std::shared_ptr &ds_context); +static std::shared_ptr populate_mbstf_dist_session_state(const std::shared_ptr &ing_session, + const std::shared_ptr &context_data_ptr); +static std::optional > populate_mbstf_pkt_distribution_data(std::shared_ptr mbs_dist_session_info); +static std::shared_ptr make_mbstf_dist_session_subscription(const std::string &user_data_ing_session_id, std::shared_ptr context_data_ptr); +static std::shared_ptr build_nmb2_create_dist_session(const std::shared_ptr &ing_session, const std::shared_ptr &context_data_ptr); static std::string make_dist_session_subscription_notif_url(const std::string &user_data_ing_session_id, const std::string &dist_session_info_key); static std::string make_mbstf_dist_session_subscription_notify_correlation_id(const std::string &user_data_ing_session_id, const std::string &dist_session_info_key); @@ -151,12 +152,11 @@ ogs_sbi_request_t *Nmb2Build::buildNmb2DistSession(void *context, void *data) { //std::shared_ptr< UserDataIngSession::UserDataIngDistSessId> ids_ptr = std::make_shared(*ids); try { - std::shared_ptr ing_session = UserDataIngSession::find(ids_ptr->first); - std::shared_ptr< UserDataIngSession::ContextData > context_data_ptr(ing_session->getDistributionSessionInfoData(ids_ptr->second)); + std::shared_ptr context_data_ptr(ing_session->getDistributionSessionInfoData(ids_ptr->second)); create_req_data.reset(new CreateReqData()); - std::shared_ptr< DistSession > dist_session = build_nmb2_create_dist_session(ing_session, context_data_ptr); + std::shared_ptr dist_session = build_nmb2_create_dist_session(ing_session, context_data_ptr); UserDataIngSession::setDistSessionId(context_data_ptr, generate_uuid()); std::string sess_id(context_data_ptr->mbstfDistSessionId); @@ -234,10 +234,10 @@ ogs_sbi_request_t *Nmb2Build::buildNmb2DistSessionPatch(void *context, void *dat message.http.content_type = (char *)OGS_SBI_CONTENT_PATCH_TYPE; std::shared_ptr ing_session = UserDataIngSession::find(session_ids->second->first); - std::shared_ptr< UserDataIngSession::ContextData > context_data_ptr(ing_session->getDistributionSessionInfoData(session_ids->second->second)); + std::shared_ptr context_data_ptr(ing_session->getDistributionSessionInfoData(session_ids->second->second)); if (context_data_ptr->needsUpdate) { status_item.path = (char *)"/distSession"; - std::shared_ptr< DistSession > dist_session = build_nmb2_create_dist_session(ing_session, context_data_ptr); + std::shared_ptr dist_session = build_nmb2_create_dist_session(ing_session, context_data_ptr); std::string sess_id(context_data_ptr->mbstfDistSessionId); @@ -246,10 +246,19 @@ ogs_sbi_request_t *Nmb2Build::buildNmb2DistSessionPatch(void *context, void *dat patch_val = dist_session->toJSON(true); } else if (context_data_ptr->stateUpdate) { - //patch_val = ing_session->distSessionState(); - patch_val = ing_session->getdistSessState().toJSON(); - ing_session->currentDistSessionState(ing_session->distSessionState()); - status_item.path = (char *)"/distSession/distSessionState"; + auto ¤t_state = context_data_ptr->last_reported_state; + auto &want_state = ing_session->getDistSessionState(context_data_ptr->info->getMbsDistSessState()); + if (want_state != current_state) { + if (want_state.getValue() == DistSessionState::VAL_ESTABLISHED && + current_state.getValue() == DistSessionState::VAL_ACTIVE) { + DistSessionState inactive_state; + inactive_state = DistSessionState::VAL_INACTIVE; + patch_val = inactive_state.toJSON(); + } else { + patch_val = want_state.toJSON(); + } + status_item.path = (char *)"/distSession/distSessionState"; + } } patch_item_list = OpenAPI_list_create(); @@ -283,17 +292,15 @@ ogs_sbi_request_t *Nmb2Build::buildNmb2DistSessionPatch(void *context, void *dat return request; } -std::shared_ptr< UpTrafficFlowInfo > populate_mbstf_up_traffic_flow_info(std::shared_ptr< IpAddr > dest_addr) +std::shared_ptr populate_mbstf_up_traffic_flow_info(const std::shared_ptr &ds_context) { - std::shared_ptr< UpTrafficFlowInfo > flow_info = nullptr; - - static std::random_device rd; - static std::uniform_int_distribution ud(32768, 65535); - int32_t port = ud(rd); + std::shared_ptr flow_info = nullptr; flow_info.reset(new UpTrafficFlowInfo() ); - flow_info->setDestIpAddr(dest_addr); - flow_info->setPortNumber(port); + flow_info->setDestIpAddr(ds_context->ssm->getDestIpAddr()); + flow_info->setPortNumber(ds_context->ssm_port); + flow_info->setSrcIpAddr(ds_context->ssm->getSourceIpAddr()); + if (ds_context->tsi != 0) flow_info->setTransportSessionId(ds_context->tsi); return flow_info; } @@ -311,7 +318,7 @@ std::shared_ptr < TunnelAddress > populate_mbstf_mb_upf_tunnel_addr(ogs_sockaddr std::string ipv4_addr = std::string(buf, strnlen(buf, sizeof(buf))); tun_addr->setIpv4Addr(ipv4_addr); ogs_info(" UDP Tunnel = %s:%u", buf, OGS_PORT(sa)); - ogs_info("Recieved IPv4 tunnel address"); + ogs_info("Received IPv4 tunnel address"); } else if (sa->ogs_sa_family == AF_INET6) { std::shared_ptr < Ipv6Addr > ipv6_addr; @@ -362,13 +369,11 @@ static std::shared_ptr< ObjDistributionData > populate_mbstf_obj_distribution_da } -static std::shared_ptr< DistSessionState > populate_mbstf_dist_session_state(std::shared_ptr ing_session, std::shared_ptr< UserDataIngSession::ContextData > context_data_ptr) +static std::shared_ptr populate_mbstf_dist_session_state(const std::shared_ptr &ing_session, + const std::shared_ptr &context_data_ptr) { - std::optional > dist_session_state = context_data_ptr->info->getMbsDistSessState(); - if (dist_session_state.has_value()) { - return dist_session_state.value(); - } - return ing_session->getDistSessionState(); + const auto ¤t_state = ing_session->getDistSessionState(context_data_ptr->info->getMbsDistSessState()); + return std::shared_ptr(new DistSessionState(current_state)); } static std::optional > populate_mbstf_pkt_distribution_data(std::shared_ptr mbs_dist_session_info) @@ -474,27 +479,26 @@ static std::string make_dist_session_subscription_notif_url(const std::string &u #endif } -static std::shared_ptr< DistSession > build_nmb2_create_dist_session(std::shared_ptr ing_session, std::shared_ptr< UserDataIngSession::ContextData > context_data_ptr) +static std::shared_ptr< DistSession > build_nmb2_create_dist_session(const std::shared_ptr &ing_session, const std::shared_ptr &context_data_ptr) { - std::shared_ptr< ObjDistributionData > mbstf_obj_distribution_data = nullptr; - std::optional > mbstf_pkt_distribution_data = std::nullopt; - std::shared_ptr< UpTrafficFlowInfo > mbstf_up_traffic_flow_info = nullptr; - std::shared_ptr< TunnelAddress > mbstf_mb_upf_tunnel_addr = nullptr; - std::shared_ptr< DistSessionState > dist_session_state = nullptr; + std::shared_ptr mbstf_obj_distribution_data = nullptr; + std::optional > mbstf_pkt_distribution_data = std::nullopt; + std::shared_ptr mbstf_up_traffic_flow_info = nullptr; + std::shared_ptr mbstf_mb_upf_tunnel_addr = nullptr; + std::shared_ptr dist_session_state = nullptr; - std::shared_ptr< DistSession > dist_session = nullptr; + std::shared_ptr dist_session = nullptr; - std::shared_ptr< CreateReqData > create_req_data = nullptr; + std::shared_ptr create_req_data = nullptr; - std::shared_ptr< DistributionMethod > distribution_method = context_data_ptr->distributionSessionInfo->getDistributionMethod(); + std::shared_ptr distribution_method = context_data_ptr->distributionSessionInfo->getDistributionMethod(); if (distribution_method && distribution_method->getString() == "OBJECT") { mbstf_obj_distribution_data = populate_mbstf_obj_distribution_data(context_data_ptr->info); } if (distribution_method && distribution_method->getString() == "PACKET") { mbstf_pkt_distribution_data = populate_mbstf_pkt_distribution_data(context_data_ptr->info); } - std::shared_ptr< IpAddr > dest_addr = context_data_ptr->ssm->getDestIpAddr(); - mbstf_up_traffic_flow_info = populate_mbstf_up_traffic_flow_info(dest_addr); + mbstf_up_traffic_flow_info = populate_mbstf_up_traffic_flow_info(context_data_ptr); ogs_sockaddr_t *tunnel_addr = context_data_ptr->MBSSession->tunnelAddr(); mbstf_mb_upf_tunnel_addr = populate_mbstf_mb_upf_tunnel_addr(tunnel_addr); @@ -502,7 +506,6 @@ static std::shared_ptr< DistSession > build_nmb2_create_dist_session(std::shared dist_session_state = populate_mbstf_dist_session_state(ing_session, context_data_ptr); std::string mbr = UserDataIngSession::maxContBitRate(context_data_ptr->info); - //std::shared_ptr< DistSessionState > dist_sess_state = ing_session->getDistSessionState(); std::shared_ptr subscription = make_mbstf_dist_session_subscription(ing_session->userDataIngSessionId(), context_data_ptr); @@ -518,8 +521,6 @@ static std::shared_ptr< DistSession > build_nmb2_create_dist_session(std::shared dist_session->setDscpMarking(context_data_ptr->info->getTrafficMarkingInfo()); dist_session->setDistSessionSubscription(subscription); - ing_session->currentDistSessionState(*dist_session_state); - return dist_session; } diff --git a/src/mbsf/Nmb2Handler.cc b/src/mbsf/Nmb2Handler.cc index 2a46d1a..dfd3551 100644 --- a/src/mbsf/Nmb2Handler.cc +++ b/src/mbsf/Nmb2Handler.cc @@ -249,7 +249,7 @@ bool Nmb2Handler::processEvent(Open5GSEvent &event) } else if (method == OGS_SBI_HTTP_METHOD_PATCH) { ogs_info("PATCH MESSAGE STATUS: %d %lu CT:[%s]", message.resStatus(), response.contentLength(), message.contentType()); - if (message.resStatus() == OGS_SBI_HTTP_STATUS_OK) + if (message.resStatus() == OGS_SBI_HTTP_STATUS_OK) { if ( response.contentLength() && valid_content_type(message)) { @@ -265,16 +265,11 @@ bool Nmb2Handler::processEvent(Open5GSEvent &event) ogs_debug("PATCH RESPONSE Parsed JSON: %s", txt.c_str()); } UserDataIngSession::handlePatchUpdateResponse(sbi_xact); - - - } } else { ogs_error("MBSTF Patch Update failed"); UserDataIngSession::rollbackMBSTFDistSessionState(sbi_xact); } - - } else { ogs_error("Invalid HTTP method [%s]", method.c_str()); } diff --git a/src/mbsf/UserDataIngSession.cc b/src/mbsf/UserDataIngSession.cc index 4530438..32f9968 100644 --- a/src/mbsf/UserDataIngSession.cc +++ b/src/mbsf/UserDataIngSession.cc @@ -22,6 +22,7 @@ #include "ogs-sbi.h" // C library includes +#include #include #include #include @@ -31,8 +32,10 @@ #include // standard template library includes +#include #include #include +#include #include #include #include @@ -108,6 +111,7 @@ using fiveg_mag_reftools::CJson; using fiveg_mag_reftools::ModelException; using fiveg_mag_reftools::ProblemCause; using reftools::mbsf::AssociatedSessionId; +using reftools::mbsf::DistributionMethod; using reftools::mbsf::DistSession; using reftools::mbsf::DistSessionState; using reftools::mbsf::ExternalMbsServiceArea; @@ -150,18 +154,20 @@ static void process_mbs_distribution_session_info(std::shared_ptr< UserDataIngSe static std::string print_mbs_session_error(std::shared_ptr< UserDataIngSession::ContextData > context_data); static void handle_failed_mbstf_nf_instance_discover(ogs_sbi_xact_t *xact); static bool validate_state_setting_options(std::shared_ptr user_data_ing_session, Open5GSSBIStream &stream, Open5GSSBIMessage &message, const NfServer::AppMetadata &app_meta, std::optional api); -static void send_invalid_user_data_ing_session_err(const std::out_of_range &e, Open5GSSBIStream &stream, size_t number_of_components, - const Open5GSSBIMessage &message, const NfServer::AppMetadata &app_meta, std::optional api, const std::string &user_data_ing_session_id); -/* -static std::shared_ptr change_mbs_dist_session_infos(std::shared_ptr< UserDataIngSession::ContextData > context_data, - std::shared_ptr current_mbs_dist_session_infos, - std::shared_ptr new_mbs_dist_session_infos); -*/ - +static void send_invalid_user_data_ing_session_err(const std::out_of_range &e, Open5GSSBIStream &stream, + size_t number_of_components, const Open5GSSBIMessage &message, + const NfServer::AppMetadata &app_meta, + const std::optional &api, + const std::string &user_data_ing_session_id); static std::shared_ptr populate_mb_smf_mbs_session(std::shared_ptr< UserDataIngSession::ContextData > context_data, std::shared_ptr mb_smf_mbs_session); +static int64_t duration_timer(const std::chrono::system_clock::time_point &tp); +static uint64_t get_next_tsi(); +static void send_model_error(const ModelException &err, Open5GSSBIStream &stream, int path_segments, Open5GSSBIMessage &message, + const NfServer::AppMetadata &app_meta, const std::optional &api, + const std::string &no_cause_reason, const std::string &log_prefix); -static std::int64_t duration_timer(const std::chrono::system_clock::time_point &tp); +static std::atomic g_next_tsi = 2; std::recursive_mutex UserDataIngSession::s_registry_mutex; std::map> UserDataIngSession::s_xactRegistry; @@ -174,14 +180,9 @@ UserDataIngSession::UserDataIngSession(CJson &json, bool as_request) ,m_lastUsed() ,m_hash() ,m_UserDataIngSessionId() - ,m_alwaysActive(nullptr) ,m_activePeriods(nullptr) - ,m_activePeriodsRepRule(nullptr) ,m_activePeriodsTimer(nullptr) - ,m_distSessionState() - ,m_currentDistSessionState() - ,m_desiredDistSessionState() - ,m_startTimer(true) + ,m_startTimer(false) ,m_serviceScheduleDescriptionVersion(1) ,m_distributionSessionInfos() ,m_distSessInfosMutex(new decltype(m_distSessInfosMutex)::element_type) @@ -202,10 +203,6 @@ UserDataIngSession::UserDataIngSession(CJson &json, bool as_request) m_hash = calculate_hash(std::vector(json_str.begin(), json_str.end())); m_UserDataIngSessionId = std::string(id); - - m_distSessionState = DistSessionState::VAL_INACTIVE; - m_currentDistSessionState = DistSessionState::NO_VAL; - m_desiredDistSessionState = DistSessionState::NO_VAL; } @@ -334,17 +331,10 @@ bool UserDataIngSession::processEvent(Open5GSEvent &event) } try { - user_data_ing_session.reset(new UserDataIngSession(user_data_ing_sess, true)); } catch (ModelException &ex) { - if (ex.cause) { - ogs_assert(true == NfServer::sendError(stream, ex.cause.value(), 3, message, app_meta, - api, /*ex.cause.value().reason() */ "Mandatory information element missing", ex.what(), std::nullopt, std::nullopt)); - } else { - ogs_assert(true == NfServer::sendError(stream, OGS_SBI_HTTP_STATUS_BAD_REQUEST, 3, message, - app_meta, api, "Mandatory information element missing", ex.what(), std::nullopt, std::nullopt)); - } - return true; + send_model_error(ex, stream, 3, message, app_meta, api, "Problem with UserDataIngSession", "Creating UserDataIngSession"); + return true; } if (!validate_state_setting_options(user_data_ing_session, stream, message, app_meta, api)) return true; @@ -434,7 +424,7 @@ bool UserDataIngSession::processEvent(Open5GSEvent &event) int response_code = 200; CJson user_data_ing_session_json(user_data_ing_sess->json(false)); std::string body(user_data_ing_session_json.serialise()); - ogs_debug("Parsed JSON: %s", body.c_str()); + ogs_debug("Generated JSON: %s", body.c_str()); std::shared_ptr response(NfServer::newResponse(std::string(request.uri()), body.empty()?nullptr:"application/json", user_data_ing_sess->generated(), @@ -444,9 +434,7 @@ bool UserDataIngSession::processEvent(Open5GSEvent &event) ogs_assert(response); NfServer::populateResponse(response, body, response_code); ogs_assert(true == Open5GSSBIServer::sendResponse(stream, *response)); - } catch (const std::out_of_range &e) { - send_invalid_user_data_ing_session_err(e, stream, 3, message, app_meta, api, user_data_ing_session_id); } @@ -645,70 +633,9 @@ UserDataIngSession &UserDataIngSession::createTimer() { return *this; } -std::shared_ptr< DistSessionState > UserDataIngSession::getDistSessionState() -{ - std::shared_ptr< DistSessionState > dist_session_state(new DistSessionState()); - if (m_activePeriods) { - - DistSessionState dist_sess_state = m_activePeriods->currentState(std::nullopt); - *dist_session_state = dist_sess_state.getValue(); - } - - if (m_activePeriodsRepRule) { - DistSessionState dist_sess_state = m_activePeriodsRepRule->currentState(std::nullopt); - *dist_session_state = dist_sess_state.getValue(); - } - - if (m_alwaysActive) { - - DistSessionState dist_sess_state = m_alwaysActive->currentState(std::nullopt); - *dist_session_state = dist_sess_state.getValue(); - } - - /* - { - std::lock_guard lock(s_registry_mutex); - m_distSessionState = *dist_session_state; - } - */ - return dist_session_state; - -} - -const DistSessionState UserDataIngSession::getNextDistSessionState() const -{ - ActivePeriodsBase::TimestampAndActiveFlag transition; - - if (m_activePeriods) { - - transition = m_activePeriods->nextTransition(); - } else if (m_activePeriodsRepRule) { - - transition = m_activePeriodsRepRule->nextTransition(); - } else if (m_alwaysActive) { - - transition = m_alwaysActive->nextTransition(); - } - return transition.second; - -} - - - const std::string &UserDataIngSession::distSessionState() const -{ - { - std::lock_guard lock(s_registry_mutex); - return m_distSessionState; - } - -}; - -const DistSessionState UserDataIngSession::getdistSessState() const +const DistSessionState &UserDataIngSession::getDistSessionState(const std::optional > &user_state) const { - { - std::lock_guard lock(s_registry_mutex); - return m_distSessionState; - } + return m_activePeriods->currentState(user_state); } void UserDataIngSession::sendMbsmfActivityStatus(std::shared_ptr< UserDataIngSession::UserDataIngDistSessId > user_data_ing_dist_sess_ids) @@ -760,38 +687,22 @@ void UserDataIngSession::pushNotificationsEvent() const bool UserDataIngSession::startTimer() { - ActivePeriodsBase::TimestampAndActiveFlag transition; - - if (!m_startTimer) return false; - - if (m_activePeriods) { - transition = m_activePeriods->nextTransition(); - } else if (m_activePeriodsRepRule) { - - transition = m_activePeriodsRepRule->nextTransition(); - } else if (m_alwaysActive) { - - transition = m_alwaysActive->nextTransition(); - m_distSessionState = transition.second; + if (m_startTimer) { + m_activePeriodsTimer->stop(); + m_startTimer = false; } - if (transition.first.has_value()) { + ActivePeriodsBase::TimestampAndActiveFlag transition = m_activePeriods->nextTransition(); - { - std::lock_guard lock(s_registry_mutex); - m_distSessionState = transition.second; - } - std::int64_t dur = duration_timer(transition.first.value()); - ogs_time_t duration = dur; - - m_activePeriodsTimer->stop(); - m_activePeriodsTimer->start(duration); - //ogs_timer_start(m_activePeriodsTimer, ogs_time_from_msec(duration)); + if (transition.first.has_value()) { + int64_t dur_ms = duration_timer(transition.first.value()); + m_activePeriodsTimer->start(dur_ms); + ogs_debug("Next activePeriods event in %" PRIi64 "ms", dur_ms); + m_startTimer = true; return true; } return false; - } std::map> &UserDataIngSession::distributionSessionInfos() { @@ -916,52 +827,50 @@ std::shared_ptr< UserDataIngSession::UserDataIngDistSessId > UserDataIngSession: void UserDataIngSession::changeDistSessionState(void *data) { - char *id = (char *)data; - std::string user_data_ing_session_id = std::string(id); + const char *id = reinterpret_cast(data); + std::string user_data_ing_session_id(id); try { std::shared_ptr user_data_ing_sess = find(user_data_ing_session_id); - /* - if (!user_data_ing_sess->checkIfAllMBSTFResponsesReceived()) { - reset_timer(ids); - return; - } - */ - - for (auto &[dist_sess_id, user_ing_sess_id_ptr] : s_distSessionIdRegistry) { - if (user_ing_sess_id_ptr->first == user_data_ing_session_id) { - std::shared_ptr dist_sess_state = nullptr; - SessionIdContainer *session_id = new SessionIdContainer(dist_sess_id, user_ing_sess_id_ptr); - //UserDataIngDistSessId *ids = new UserDataIngDistSessId(user_ing_sess_id_ptr->first, user_ing_sess_id_ptr->second); - std::shared_ptr ids_ptr(user_ing_sess_id_ptr); - std::shared_ptr context_data = getContextData(ids_ptr); - context_data->stateUpdate = true; - dist_sess_state.reset(new DistSessionState()); - *dist_sess_state = user_data_ing_sess->distSessionState(); - context_data->info->setMbsDistSessState(dist_sess_state); - sendMbsmfActivityStatus(ids_ptr); - UserDataIngSession::sendNotificationsEvent(ids_ptr); - - user_data_ing_sess->nmbstfDiscoverAndSend(ids_ptr, Nmb2Build::buildNmb2DistSessionPatch, nullptr, session_id); - } - } - user_data_ing_sess->startTimer(); + user_data_ing_sess->_changeDistSessionState(); } catch (const std::out_of_range &e) { - std::ostringstream err; - err << "MBS User Data Ingest Session [" << user_data_ing_session_id << "] does not exist."; - ogs_error("%s", err.str().c_str()); + ogs_error("%s", std::format("MBS User Data Ingest Session [{}] does not exist.", user_data_ing_session_id).c_str()); } } +void UserDataIngSession::_changeDistSessionState() +{ + std::lock_guard lock(*m_distSessInfosMutex); + for (auto &[dist_sess_id, context_data] : m_distributionSessionInfos) { + const auto &dist_sess_state = context_data->info->getMbsDistSessState(); + const auto &want_dist_sess_state = m_activePeriods->currentState(dist_sess_state); + std::shared_ptr ids_ptr(new UserDataIngDistSessId{m_UserDataIngSessionId, dist_sess_id}); + + context_data->stateUpdate = true; + context_data->info->setMbsDistSessState(std::shared_ptr(new DistSessionState(want_dist_sess_state))); + sendMbsmfActivityStatus(ids_ptr); + sendNotificationsEvent(ids_ptr); + + SessionIdContainer *session_id = new SessionIdContainer{context_data->mbstfDistSessionId, ids_ptr}; + nmbstfDiscoverAndSend(ids_ptr, Nmb2Build::buildNmb2DistSessionPatch, nullptr, session_id); + } + startTimer(); +} + void UserDataIngSession::handleUserDataIngSessionUpdate(ogs_pool_id_t stream_id, std::shared_ptr &request) +{ + setMbstfsInDesiredState(); + updateContexts(stream_id, request); + updateMbstfRemovedDistSession(); + startTimer(); +} + +void UserDataIngSession::updateContexts(ogs_pool_id_t stream_id, std::shared_ptr &request) { std::shared_ptr ctx_data = nullptr; std::shared_ptr dist_sess_state = nullptr; - const MBSUserDataIngSession::MbsDisSessInfosType &dist_sess_infos = m_MBSUserDataIngSession->getMbsDisSessInfos(); - setMbstfsInDesiredState(); - for(const auto &[key, sess_info]: dist_sess_infos) { if (sess_info.has_value()) { @@ -986,340 +895,198 @@ void UserDataIngSession::handleUserDataIngSessionUpdate(ogs_pool_id_t stream_id, std::optional > mbs_session_id = info->getMbsSessionId(); if (mbs_session_id.has_value()) { std::shared_ptr mbs_sess_id = *mbs_session_id; - std::optional > ssm = mbs_sess_id->getSsm(); + std::optional > ssm = mbs_sess_id->getSsm(); if (ssm.has_value()) { - std::shared_ptr< Ssm > ssm_val = *ssm; - std::shared_ptr< IpAddr > src_ip_addr = ssm_val->getSourceIpAddr(); - std::shared_ptr< IpAddr > dest_ip_addr = ssm_val->getDestIpAddr(); - std::optional src_ipv4_addr = src_ip_addr->getIpv4Addr(); - std::optional dest_ipv4_addr = dest_ip_addr->getIpv4Addr(); - std::optional > src_ipv6_addr = src_ip_addr->getIpv6Addr(); - std::optional > dest_ipv6_addr = dest_ip_addr->getIpv6Addr(); - std::shared_ptr< Ssm > ssm_data = nullptr; - - ssm_data.reset(new Ssm(*ssm_val)); + std::shared_ptr ssm_val = ssm.value(); + std::shared_ptr dest_ip_addr = ssm_val->getDestIpAddr(); + std::optional dest_ipv4_addr = dest_ip_addr->getIpv4Addr(); + std::optional > dest_ipv6_addr = dest_ip_addr->getIpv6Addr(); + std::shared_ptr ssm_data(new Ssm(*ssm_val)); + if (info->getDistrMethod()->getValue() == DistributionMethod::VAL_PACKET && + info->getPckDistrInfo().value()->getOperatingMode()->getValue() == PktDistributionOperatingMode::VAL_PACKET_FORWARD_ONLY) { + /* Never pass source address in PACKET_FORWARD_ONLY mode */ + ssm_data->setSourceIpAddr(nullptr); + } + static std::random_device rd; + static std::uniform_int_distribution ud(32768, 65535); + in_port_t port = ud(rd); + uint64_t tsi = 0; + if (info->getDistrMethod()->getValue() == DistributionMethod::VAL_OBJECT) { + tsi = get_next_tsi(); + } + if (dest_ipv4_addr.has_value() || dest_ipv6_addr.has_value()) { distribution_session_info.reset(new DistributionSessionInfo(info)); - ctx_data.reset(new ContextData{m_UserDataIngSessionId, key, distribution_session_info, info, ssm_data, request, stream_id, nullptr}); + ctx_data.reset(new ContextData{ + .ingSessionId = m_UserDataIngSessionId, + .distSessionInfoKey = key, + .distributionSessionInfo = distribution_session_info, + .info = info, + .ssm = ssm_data, + .ssm_port = port, + .request = request, + .streamId = stream_id, + .tsi = tsi + }); addToDistributionSessionInfos(key, ctx_data); - nmbstfDiscoverOnly(ctx_data); + createMbsSession(ctx_data); } else { ogs_error("Unable to resolve SSM addresses"); continue; } } - } - } - } } } - updateMbstfRemovedDistSession(); - startTimer(); - } void UserDataIngSession::processUserDataIngSessionUpdate(ogs_pool_id_t stream_id, std::shared_ptr &request, CJson &json) { - - bool always_active = true; std::shared_ptr< DistSessionState > dist_sess_state = nullptr; - - std::shared_ptr mbs_user_data_ing_session = nullptr; - mbs_user_data_ing_session.reset(new MBSUserDataIngSession(json, true)); + std::shared_ptr mbs_user_data_ing_session(new MBSUserDataIngSession(json, true)); const ActPeriodsType &act_periods = mbs_user_data_ing_session->getActPeriods(); const ActPeriodsType ¤t_act_periods = m_MBSUserDataIngSession->getActPeriods(); const ActPeriodsRepRuleType &act_periods_rep_rule = mbs_user_data_ing_session->getActPeriodsRepRule(); - std::list current_versioned_active_periods; - std::shared_ptr current_versioned_active_periods_rep_rule = nullptr; + if (current_act_periods.has_value()) { - if (m_activePeriods) { - current_versioned_active_periods = m_activePeriods->versionedActPeriods(); - } m_MBSUserDataIngSession->clearActPeriods(); } - - - ogs_info("CURRENT VERSIONED ACT PERIODS SIZE: %ld", current_versioned_active_periods.size()); + m_MBSUserDataIngSession->setActPeriodsRepRule(std::nullopt); if (act_periods.has_value() && !act_periods->empty()) { - /* - if (has_act_periods_changed(act_periods, current_act_periods)) { - m_serviceScheduleDescriptionVersion++; - } - */ - always_active = false; - - resetAlwaysActive(); - resetActivePeriodsRepRule(); activePeriods(act_periods); - if (m_activePeriods) { - //const std::list ¤t_versioned_active_periods = m_activePeriods->versionedActPeriods(); - - std::list versioned_active_periods = versionedActPeriods(current_versioned_active_periods, act_periods); - m_activePeriods->versionedActPeriods(versioned_active_periods); - } - m_MBSUserDataIngSession->setActPeriods(std::move(act_periods)); - dist_sess_state = getDistSessionState(); createTimer(); - } - - if (m_activePeriodsRepRule) - { - current_versioned_active_periods_rep_rule = m_activePeriodsRepRule->versionedActivePeriodsRepRule(); - - } - - m_MBSUserDataIngSession->setActPeriodsRepRule(std::nullopt); - - - if (act_periods_rep_rule.has_value()) { - /* - if (has_act_periods_rep_rule_changed(act_periods_rep_rule, current_act_periods_rep_rule)) { - m_serviceScheduleDescriptionVersion++; - } - */ - - m_MBSUserDataIngSession->setActPeriodsRepRule(std::move(act_periods_rep_rule)); - always_active = false; - - resetAlwaysActive(); - resetActivePeriods(); - + } else if (act_periods_rep_rule.has_value()) { activePeriodsRepRule(act_periods_rep_rule); - if (m_activePeriodsRepRule) { - std::shared_ptr versioned_active_periods = versionedActPeriodsRepRule(current_versioned_active_periods_rep_rule, act_periods_rep_rule); - m_activePeriodsRepRule->versionedActivePeriodsRepRule(versioned_active_periods); - } + m_MBSUserDataIngSession->setActPeriodsRepRule(std::move(act_periods_rep_rule)); - dist_sess_state = getDistSessionState(); createTimer(); + } else { + alwaysActive(); } auto app_context = App::self().context(); - const MBSUserDataIngSession::MbsDisSessInfosType current_dist_sess_infos = m_MBSUserDataIngSession->getMbsDisSessInfos(); - for(const auto &[key, sess_info]: current_dist_sess_infos) { - + const MBSUserDataIngSession::MbsDisSessInfosType ¤t_dist_sess_infos = m_MBSUserDataIngSession->getMbsDisSessInfos(); + MBSUserDataIngSession::MbsDisSessInfosType update_dist_sess_infos = mbs_user_data_ing_session->getMbsDisSessInfos(); + for(const auto &[key, sess_info] : current_dist_sess_infos) { + if (!sess_info.has_value() || !sess_info.value()) { + m_MBSUserDataIngSession->removeMbsDisSessInfos(key); + continue; + } + const std::shared_ptr &info = sess_info.value(); std::shared_ptr context_data = getDistributionSessionInfoData(key); ogs_assert(context_data); - bool present_in_update = false; - if (sess_info.has_value()) - { - std::shared_ptr info = sess_info.value(); - const MBSUserDataIngSession::MbsDisSessInfosType &update_dist_sess_infos = mbs_user_data_ing_session->getMbsDisSessInfos(); - for(const auto &[key_in_update, sess_info_update]: update_dist_sess_infos) { - if (!sess_info_update.has_value() /*|| sess_info_update.value() == nullptr */) { - continue; - } - std::shared_ptr update_info = sess_info_update.value(); + context_data->needsUpdate = false; - if (key == key_in_update) { - present_in_update = true; + bool present_in_update = false; + for (const auto &[key_in_update, sess_info_update]: update_dist_sess_infos) { + if (key == key_in_update) { + if (sess_info_update.has_value() && sess_info_update.value()) { + // update std::shared_ptr update_info = sess_info_update.value(); - update_info->setMbsDistSessionId(info->getMbsDistSessionId()); - //update_info->setMbsDistSessState(info->getMbsDistSessState()); - if (!info && !update_info) { - context_data->needsUpdate = false; - continue; - } else if ( info && update_info && info == update_info) { - context_data->needsUpdate = false; - continue; - } else if (info && update_info && *info == *update_info) { - context_data->needsUpdate = false; - continue; - } else { - - //change_mbs_dist_session_infos(context_data, info, update_info); + // Copy old MBS Dist Session Id + update_info->setMbsDistSessionId(info->getMbsDistSessionId()); - context_data->distributionSessionInfo->updateMBSDistributionSessionInfo(update_info); + if (*update_info != *info) { context_data->needsUpdate = true; - } - - } else { - const auto &mbs_session_id = sess_info_update.value()->getMbsSessionId(); - if (mbs_session_id) { - const auto &mbs_svc_area = sess_info_update.value()->getTgtServAreas(); - const auto &ext_mbs_svc_area = sess_info_update.value()->getExtTgtServAreas(); - UniqueMbsSessionId cmp_mbs_session_id(!!mbs_session_id.value()->getSsm(), mbs_session_id.value(), - mbs_svc_area?mbs_svc_area.value():std::shared_ptr(), - ext_mbs_svc_area?ext_mbs_svc_area.value():std::shared_ptr()); - if (app_context->haveMbsSessionId(cmp_mbs_session_id)) { - ogs_error("UserDataIngSession update adds already allocated MBS Session Id"); - Open5GSSBIStream stream(stream_id); - Open5GSSBIMessage message; - message.parseHeader(*request); - NfServer::sendError(stream, MBSProblemCause::MBS_DIST_SESSION_ALREADY_CREATED, 2, message, App::self().mbsfAppMetadata(), g_nmbsf_userdataingsession_api_metadata, "Duplicate MBS Session Id", "UserDataIngSession update adds already allocated MBS Session Id"); - return; - } else { - app_context->addMbsSessionId(cmp_mbs_session_id); - } - } - m_MBSUserDataIngSession->addMbsDisSessInfos(key_in_update, sess_info_update); - } - std::optional > received_dist_session_state = update_info->getMbsDistSessState(); - if (!received_dist_session_state.has_value()) { - if (always_active) { - resetActivePeriods(); - resetActivePeriodsRepRule(); - alwaysActive(); - - dist_sess_state = getDistSessionState(); - if (dist_sess_state) info->setMbsDistSessState(dist_sess_state); - } else { - - if (dist_sess_state) info->setMbsDistSessState(dist_sess_state); + context_data->distributionSessionInfo->updateMBSDistributionSessionInfo(update_info); } } - + update_dist_sess_infos.erase(key_in_update); + present_in_update = true; + break; } - if (!present_in_update) { - context_data->markForDeletion = true; - if (context_data->distributionSessionInfo) { - auto mbs_session_id = context_data->distributionSessionInfo->getUniqueMbsSessionId(); - if (mbs_session_id && app_context->haveMbsSessionId(mbs_session_id)) { - app_context->deleteMbsSessionId(mbs_session_id); - } + } + if (!present_in_update) { + context_data->markForDeletion = true; + if (context_data->distributionSessionInfo) { + auto mbs_session_id = context_data->distributionSessionInfo->getUniqueMbsSessionId(); + if (mbs_session_id && app_context->haveMbsSessionId(mbs_session_id)) { + app_context->deleteMbsSessionId(mbs_session_id); } - m_MBSUserDataIngSession->removeMbsDisSessInfos(key); } + m_MBSUserDataIngSession->removeMbsDisSessInfos(key); } - - } - - const std::optional > &mbs_user_serv_anmt = mbs_user_data_ing_session->getMbsUserServAnmt(); - if (mbs_user_serv_anmt.has_value() && mbs_user_serv_anmt.value()) { - m_MBSUserDataIngSession->setMbsUserServAnmt(std::move(mbs_user_serv_anmt)); } - const std::optional > user_service_description = mbs_user_data_ing_session->getMbsUserServiceAnmt(); - if (user_service_description.has_value() && mbs_user_serv_anmt.value()) { - m_MBSUserDataIngSession->setMbsUserServiceAnmt(std::move(user_service_description)); + // What is left in update_dist_sess_infos are new entries so add them + for (const auto &[key_in_update, sess_info_update]: update_dist_sess_infos) { + const auto &mbs_session_id = sess_info_update.value()->getMbsSessionId(); + if (mbs_session_id) { + const auto &mbs_svc_area = sess_info_update.value()->getTgtServAreas(); + const auto &ext_mbs_svc_area = sess_info_update.value()->getExtTgtServAreas(); + UniqueMbsSessionId cmp_mbs_session_id(!!mbs_session_id.value()->getSsm(), mbs_session_id.value(), + mbs_svc_area?mbs_svc_area.value():std::shared_ptr(), + ext_mbs_svc_area?ext_mbs_svc_area.value():std::shared_ptr()); + if (app_context->haveMbsSessionId(cmp_mbs_session_id)) { + ogs_error("UserDataIngSession update adds already allocated MBS Session Id"); + Open5GSSBIStream stream(stream_id); + Open5GSSBIMessage message; + message.parseHeader(*request); + NfServer::sendError(stream, MBSProblemCause::MBS_DIST_SESSION_ALREADY_CREATED, 2, message, App::self().mbsfAppMetadata(), g_nmbsf_userdataingsession_api_metadata, "Duplicate MBS Session Id", "UserDataIngSession update adds already allocated MBS Session Id"); + return; + } else { + app_context->addMbsSessionId(cmp_mbs_session_id); + } + } + m_MBSUserDataIngSession->addMbsDisSessInfos(key_in_update, sess_info_update); } - const std::optional serv_anmt_url = mbs_user_data_ing_session->getMbsUserServiceAnmtUrl(); - if (serv_anmt_url.has_value() && !serv_anmt_url.value().empty()) { - m_MBSUserDataIngSession->setMbsUserServiceAnmtUrl(std::move(serv_anmt_url)); + // Reset the states for each dist session + for(const auto &[key, sess_info] : current_dist_sess_infos) { + if (!sess_info || !sess_info.value()) continue; + const std::shared_ptr &info = sess_info.value(); + const auto &new_dist_state = m_activePeriods->currentState(info->getMbsDistSessState()); + std::shared_ptr dist_state(new DistSessionState(new_dist_state)); + info->setMbsDistSessState(dist_state); } if (mbsUserService() && mbsUserService()->isServiceAnnModePassedBack() && checkIfAllMBSDistributionSessionsEstablishedOrActive()) { - std::shared_ptr user_service_desc = userServiceDesc(); userServiceAnnouncement(user_service_desc->userServiceDescription()); + } else { + userServiceAnnouncement(nullptr); } handleUserDataIngSessionUpdate(stream_id, request); } -void UserDataIngSession::processDistributionSessionInfo( ogs_pool_id_t stream_id, std::shared_ptr &request) +void UserDataIngSession::processDistributionSessionInfo(ogs_pool_id_t stream_id, std::shared_ptr &request) { - bool always_active = true; - std::shared_ptr ctx_data = nullptr; - std::shared_ptr dist_sess_state = nullptr; - - const ActPeriodsType &act_periods = m_MBSUserDataIngSession->getActPeriods(); + const ActPeriodsType &act_periods = m_MBSUserDataIngSession->getActPeriods(); const ActPeriodsRepRuleType &act_periods_rep_rule = m_MBSUserDataIngSession->getActPeriodsRepRule(); if (act_periods.has_value() && !act_periods->empty()) { - always_active = false; - - resetAlwaysActive(); - resetActivePeriodsRepRule(); - activePeriods(act_periods); - if (m_activePeriods) { - const std::list ¤t_versioned_active_periods = m_activePeriods->versionedActPeriods(); - std::list versioned_active_periods = versionedActPeriods(current_versioned_active_periods, act_periods); - m_activePeriods->versionedActPeriods(versioned_active_periods); - } - dist_sess_state = getDistSessionState(); createTimer(); - } - - if (act_periods_rep_rule.has_value()) { - always_active = false; - - resetAlwaysActive(); - resetActivePeriods(); - + } else if (act_periods_rep_rule.has_value()) { activePeriodsRepRule(act_periods_rep_rule); - - if (m_activePeriodsRepRule) { - const std::shared_ptr &versioned_active_periods_repetition_rule = m_activePeriodsRepRule->versionedActivePeriodsRepRule(); - std::shared_ptr versioned_active_periods_rep_rule = versionedActPeriodsRepRule(versioned_active_periods_repetition_rule, act_periods_rep_rule); - m_activePeriodsRepRule->versionedActivePeriodsRepRule(versioned_active_periods_rep_rule); - } - - - dist_sess_state = getDistSessionState(); createTimer(); + } else { + alwaysActive(); } - const MBSUserDataIngSession::MbsDisSessInfosType &dist_sess_infos = m_MBSUserDataIngSession->getMbsDisSessInfos(); + updateContexts(stream_id, request); + const MBSUserDataIngSession::MbsDisSessInfosType &dist_sess_infos = m_MBSUserDataIngSession->getMbsDisSessInfos(); for (const auto &[key, sess_info]: dist_sess_infos) { - if (sess_info.has_value()) - { - std::shared_ptr info = sess_info.value(); - if (info) { - std::shared_ptr distribution_session_info = nullptr; - - std::optional > mbs_session_id = info->getMbsSessionId(); - if (mbs_session_id.has_value()) { - std::shared_ptr mbs_sess_id = *mbs_session_id; - std::optional > ssm = mbs_sess_id->getSsm(); - if (ssm.has_value()) { - std::shared_ptr< Ssm > ssm_val = *ssm; - //std::shared_ptr< IpAddr > src_ip_addr = ssm_val->getSourceIpAddr(); - std::shared_ptr< IpAddr > dest_ip_addr = ssm_val->getDestIpAddr(); - //auto &src_ipv4_addr = src_ip_addr->getIpv4Addr(); - auto &dest_ipv4_addr = dest_ip_addr->getIpv4Addr(); - //auto &src_ipv6_addr = src_ip_addr->getIpv6Addr(); - auto &dest_ipv6_addr = dest_ip_addr->getIpv6Addr(); - std::shared_ptr< Ssm > ssm_data = nullptr; - - ssm_data.reset(new Ssm(*ssm_val)); - if (dest_ipv4_addr.has_value() || dest_ipv6_addr.has_value()) { - distribution_session_info.reset(new DistributionSessionInfo(info)); - ctx_data.reset(new ContextData{std::string(m_UserDataIngSessionId), std::string(key), distribution_session_info, info, ssm_data, request, stream_id, nullptr}); - addToDistributionSessionInfos(std::string(key), ctx_data); - nmbstfDiscoverOnly(ctx_data); - - } else { - ogs_error("Unable to resolve SSM addresses"); - continue; - } - - } - - } - std::optional > received_dist_session_state = info->getMbsDistSessState(); - if (!received_dist_session_state.has_value()) { - - if (always_active) { - resetActivePeriods(); - resetActivePeriodsRepRule(); - alwaysActive(); - dist_sess_state = getDistSessionState(); - if (dist_sess_state) info->setMbsDistSessState(dist_sess_state); - //ActivePeriodsBase::TimestampAndActiveFlag transition = always_active->nextTransition(); - } else { - if (dist_sess_state) info->setMbsDistSessState(dist_sess_state); - } - } - } - } + if (!sess_info || !sess_info.value()) continue; + const auto &new_dist_state = m_activePeriods->currentState(sess_info.value()->getMbsDistSessState()); + std::shared_ptr dist_state(new DistSessionState(new_dist_state)); + sess_info.value()->setMbsDistSessState(dist_state); } + startTimer(); } @@ -1428,7 +1195,6 @@ void UserDataIngSession::resetMBSDistributionSessionsTerminatedFlag() for (const auto &dist_sess_info : m_distributionSessionInfos) { dist_sess_info.second->distributionSessionInfo->resetDataIngestSessionTerminated(); } - } void UserDataIngSession::resetMBSDistributionSessionsEstablishedFlag() @@ -1436,39 +1202,27 @@ void UserDataIngSession::resetMBSDistributionSessionsEstablishedFlag() for (const auto &dist_sess_info : m_distributionSessionInfos) { dist_sess_info.second->distributionSessionInfo->resetDataIngestSessionEstablished(); } - } void UserDataIngSession::setMbstfsInDesiredState() { - //DistSessionState current_dist_session_state(std::move(m_currentDistSessionState)); - std::shared_ptr< DistSessionState > dist_session_state = getDistSessionState(); - DistSessionState dist_sess_state = getNextDistSessionState(); - std::string desired_state = getDistSessionState()->getString(); - - if (m_currentDistSessionState == DistSessionState::VAL_ACTIVE && (*dist_session_state == DistSessionState::VAL_ESTABLISHED || dist_sess_state == DistSessionState::VAL_ESTABLISHED)) - { - m_startTimer = false; - m_activePeriodsTimer->stop(); - m_distSessionState = DistSessionState::VAL_INACTIVE; - m_desiredDistSessionState = DistSessionState::VAL_ESTABLISHED; - changeDistSessionState((void *)m_UserDataIngSessionId.c_str()); + std::lock_guard lock(*m_distSessInfosMutex); + for (const auto &[dist_sess_id, dist_sess_ctx] : m_distributionSessionInfos) { + if (!dist_sess_ctx || !dist_sess_ctx->info) continue; + const auto ¤t_dist_session_state = dist_sess_ctx->info->getMbsDistSessState(); + const DistSessionState &want_dist_session_state = getDistSessionState(current_dist_session_state); + if (!current_dist_session_state || !current_dist_session_state.value() || + *current_dist_session_state.value() != want_dist_session_state) { + UserDataIngDistSessId id{m_UserDataIngSessionId, dist_sess_id}; + changeDistSessionState(&id); + } } } void UserDataIngSession::checkDesiredState() { - if (m_desiredDistSessionState != DistSessionState::NO_VAL) { - m_startTimer = false; - m_activePeriodsTimer->stop(); - m_distSessionState = m_desiredDistSessionState; - changeDistSessionState((void *)m_UserDataIngSessionId.c_str()); - m_currentDistSessionState = DistSessionState::NO_VAL; - m_desiredDistSessionState = DistSessionState::NO_VAL; - } else { - m_startTimer = true; - startTimer(); - } + setMbstfsInDesiredState(); + startTimer(); } bool UserDataIngSession::checkIfAllMBSDistributionSessionsEstablishedOrActive() @@ -1486,7 +1240,11 @@ bool UserDataIngSession::checkIfAllMBSDistributionSessionsEstablishedOrActive() UserDataIngSession &UserDataIngSession::userServiceAnnouncement(const std::shared_ptr &user_service_description) { - m_MBSUserDataIngSession->setMbsUserServiceAnmt(user_service_description); + if (user_service_description) { + m_MBSUserDataIngSession->setMbsUserServiceAnmt(user_service_description); + } else { + m_MBSUserDataIngSession->setMbsUserServiceAnmt(std::nullopt); + } return *this; } @@ -1529,6 +1287,8 @@ bool UserDataIngSession::sendNmbsfMbsUserDataIngestResponse(std::shared_ptr user_service_desc = userServiceDesc(); ing_sess->userServiceAnnouncement(user_service_desc->userServiceDescription()); + } else { + ing_sess->userServiceAnnouncement(nullptr); } CJson user_data_ing_sess_json(ing_sess->json(false)); @@ -1562,28 +1322,6 @@ bool UserDataIngSession::handleMbstfDiscover(ogs_sbi_nf_instance_t *nf_instance, return false; } - Open5GSSBIClient mbstf_client(reinterpret_cast(NF_INSTANCE_CLIENT(nf_instance))); - std::shared_ptr mb_smf_mbs_session; - std::string src_ipv4_addr; - std::string src_ipv6_addr; - - ogs_sockaddr_t *client_ipv4_addr = mbstf_client.ogsSBIClientIPv4Addr(); - ogs_sockaddr_t *client_ipv6_addr = mbstf_client.ogsSBIClientIPv6Addr(); - - if (client_ipv4_addr) { - char *ipv4_addr = mbstf_client.ogsIpStrdup(client_ipv4_addr); - src_ipv4_addr = std::string(ipv4_addr); - ogs_free(ipv4_addr); - } - - if (client_ipv6_addr) { - char *ipv6_addr = mbstf_client.ogsIpStrdup(client_ipv6_addr); - src_ipv6_addr = std::string(ipv6_addr); - ogs_free(ipv6_addr); - } - - - std::shared_ptr context_data = nullptr; std::shared_ptr ids = nullptr; { std::lock_guard lock(s_registry_mutex); @@ -1601,8 +1339,7 @@ bool UserDataIngSession::handleMbstfDiscover(ogs_sbi_nf_instance_t *nf_instance, if (nf_instance->t_validity) ogs_timer_stop(nf_instance->t_validity); ing_session->setNFInstance(xact->service_type, nf_instance); - - context_data = getContextData(ids); + const auto &context_data = getContextData(ids); if (!context_data) { ogs_error("Unable to get context data from registry"); return false; @@ -1610,29 +1347,37 @@ bool UserDataIngSession::handleMbstfDiscover(ogs_sbi_nf_instance_t *nf_instance, if (context_data->mbstfNFInstanceId.empty()) context_data->mbstfNFInstanceId = std::string(nf_instance->id); - std::shared_ptr ssm_ptr = context_data->ssm; + removeFromRegistry(xact); + + return true; +} + +bool UserDataIngSession::createMbsSession(const std::shared_ptr &context_data) +{ + const auto &ssm_ptr = context_data->ssm; if (!ssm_ptr) ogs_error("Unable to get SSM from Context Data"); - std::shared_ptr< IpAddr > dest_ip_addr = ssm_ptr->getDestIpAddr(); - std::optional dest_ipv4_addr = dest_ip_addr->getIpv4Addr(); - std::optional > dest_ipv6_addr = dest_ip_addr->getIpv6Addr(); + const auto &dest_ip_addr = ssm_ptr->getDestIpAddr(); + const auto &dest_ipv4_addr = dest_ip_addr?dest_ip_addr->getIpv4Addr():std::nullopt; + const auto &dest_ipv6_addr = dest_ip_addr?dest_ip_addr->getIpv6Addr():std::nullopt; + const auto &src_ip_addr = ssm_ptr->getSourceIpAddr(); + const auto &src_ipv4_addr = src_ip_addr?src_ip_addr->getIpv4Addr():std::nullopt; + const auto &src_ipv6_addr = src_ip_addr?src_ip_addr->getIpv6Addr():std::nullopt; - if (!client_ipv4_addr && !client_ipv6_addr) { + std::shared_ptr mb_smf_mbs_session = nullptr; + if (!src_ipv4_addr && !src_ipv6_addr) { mb_smf_mbs_session.reset(new MBSMFMBSSession(mb_smf_sc_mbs_session_new())); mb_smf_mbs_session->setTunnelRequest(true); - - } else if (client_ipv4_addr && dest_ipv4_addr.has_value()) { + } else if (src_ipv4_addr && dest_ipv4_addr) { struct addrinfo *ai_src = NULL, *ai_dest = NULL; void *src_addr = NULL, *dest_addr = NULL; - if (resolve_src_dest_addr(src_ipv4_addr, dest_ipv4_addr.value(), &ai_src, &ai_dest)) + if (resolve_src_dest_addr(src_ipv4_addr.value(), dest_ipv4_addr.value(), &ai_src, &ai_dest)) { if (get_src_dest_of_same_addr_family(AF_INET, ai_src, ai_dest, &src_addr, &dest_addr)) { mb_smf_mbs_session.reset(new MBSMFMBSSession( - mb_smf_sc_mbs_session_new_ipv4((const struct in_addr*)src_addr, - (const struct in_addr*)dest_addr))); - + mb_smf_sc_mbs_session_new_ipv4((const struct in_addr*)src_addr, (const struct in_addr*)dest_addr))); } else { ogs_error("Unable to resolve SSM addresses for IPv4 address family"); if (ai_src) { @@ -1646,6 +1391,9 @@ bool UserDataIngSession::handleMbstfDiscover(ogs_sbi_nf_instance_t *nf_instance, } return false; } + } else { + ogs_error("Unable to resolve SSM addresses for IPv4 address family"); + return false; } if (ai_src) { freeaddrinfo(ai_src); @@ -1656,29 +1404,28 @@ bool UserDataIngSession::handleMbstfDiscover(ogs_sbi_nf_instance_t *nf_instance, freeaddrinfo(ai_dest); ai_dest = NULL; } + } else if (src_ipv6_addr && dest_ipv6_addr) { + struct addrinfo *ai_src = NULL, *ai_dest = NULL; + void *src_addr = NULL, *dest_addr = NULL; - } else if (client_ipv6_addr && dest_ipv6_addr.has_value()) { - struct addrinfo *ai_src = NULL, *ai_dest = NULL; - void *src_addr = NULL, *dest_addr = NULL; - std::shared_ptr< std::string > dest_ipv6 = dest_ipv6_addr.value(); - - if (resolve_src_dest_addr(src_ipv6_addr, *dest_ipv6, &ai_src, &ai_dest)) - { - if (get_src_dest_of_same_addr_family(AF_INET6, ai_src, ai_dest, &src_addr, &dest_addr)) - { - mb_smf_mbs_session.reset(new MBSMFMBSSession( - mb_smf_sc_mbs_session_new_ipv6((const struct in6_addr*)src_addr, - (const struct in6_addr*)dest_addr))); - + if (resolve_src_dest_addr(*src_ipv6_addr.value(), *dest_ipv6_addr.value(), &ai_src, &ai_dest)) + { + if (get_src_dest_of_same_addr_family(AF_INET6, ai_src, ai_dest, &src_addr, &dest_addr)) + { + mb_smf_mbs_session.reset(new MBSMFMBSSession( + mb_smf_sc_mbs_session_new_ipv6((const struct in6_addr*)src_addr, (const struct in6_addr*)dest_addr))); } else { ogs_error("Unable to resolve SSM addresses for IPv6 address family"); if (ai_src) freeaddrinfo(ai_src); if (ai_dest) freeaddrinfo(ai_dest); return false; } - } - if (ai_src) freeaddrinfo(ai_src); - if (ai_dest) freeaddrinfo(ai_dest); + } else { + ogs_error("Unable to resolve SSM addresses for IPv6 address family"); + return false; + } + if (ai_src) freeaddrinfo(ai_src); + if (ai_dest) freeaddrinfo(ai_dest); } else { ogs_error("Unable to resolve SSM addresses"); @@ -1695,28 +1442,11 @@ bool UserDataIngSession::handleMbstfDiscover(ogs_sbi_nf_instance_t *nf_instance, UserDataIngDistSessId *ids = new UserDataIngDistSessId(context_data->ingSessionId, context_data->distSessionInfoKey); mb_smf_mbs_session->setCreatedCallback(reinterpret_cast(ids) /*(context_data.get()*/); populate_mb_smf_mbs_session(context_data, mb_smf_mbs_session); - - /* - if ((context_data->info->getMbsDistSessState() == DistSessionState::VAL_ACTIVE ) || - (context_data->info->getMbsDistSessState() == DistSessionState::VAL_ESTABLISHED)) { - mb_smf_mbs_session->setActivityStatus(MBS_SESSION_ACTIVITY_STATUS_ACTIVE); - } else { - - mb_smf_mbs_session->setActivityStatus(MBS_SESSION_ACTIVITY_STATUS_INACTIVE); - } - - context_data->MBSSession = mb_smf_mbs_session; - - mb_smf_mbs_session->pushChanges(); - */ } else { ogs_info("MB-SMF SSM is not present"); } - removeFromRegistry(xact); - return true; - } void UserDataIngSession::deleteMBSTFSession(ogs_sbi_xact_t *xact) @@ -2356,34 +2086,9 @@ std::list > UserDataIngSession::distr } -std::optional >> UserDataIngSession::serviceScheduleDescs() +std::optional > > UserDataIngSession::serviceScheduleDescs() { - std::optional >> service_schedule_descs = std::nullopt; - //std::optional > - const ActPeriodsType &act_periods = m_MBSUserDataIngSession->getActPeriods(); - //std::optional > - const ActPeriodsRepRuleType &act_periods_rep_rule = m_MBSUserDataIngSession->getActPeriodsRepRule(); - - if (!act_periods.has_value() && !act_periods_rep_rule.has_value()) return service_schedule_descs; - - if (act_periods.has_value() && !act_periods->empty() && m_activePeriods && !m_activePeriods->versionedActPeriods().empty()) { - - service_schedule_descs = std::list>(); - for (const auto &versioned_act_period : m_activePeriods->versionedActPeriods()) { - const std::shared_ptr< TimeWindow > &time_window = versioned_act_period.second; - std::shared_ptr< ServiceScheduleDesc > service_schedule_desc( new ServiceScheduleDesc(m_UserDataIngSessionId, versioned_act_period.first, time_window->getStartTime(), time_window->getStopTime())); - //m_serviceScheduleDescs.insert(std::make_pair(service_schedule_desc->serviceScheduleDescriptionId(), service_schedule_desc)); - service_schedule_descs->push_back(service_schedule_desc); - } - //return service_schedule_descs; - - } else if (act_periods_rep_rule.has_value()) { - const std::shared_ptr &versioned_active_periods_rep_rule = m_activePeriodsRepRule->versionedActivePeriodsRepRule(); - std::shared_ptr< ServiceScheduleDesc > service_schedule_desc( new ServiceScheduleDesc(m_UserDataIngSessionId, versioned_active_periods_rep_rule->first, versioned_active_periods_rep_rule->second /*act_periods_rep_rule*/)); - service_schedule_descs->push_back(service_schedule_desc); - } - return service_schedule_descs; - + return m_activePeriods->serviceScheduleDescriptions(); } void UserDataIngSession::serviceScheduleDescsUpdate(std::shared_ptr mbs_user_data_ing_session) @@ -2453,76 +2158,6 @@ const std::shared_ptr &UserDataIngSession::mbsUserService() } -std::shared_ptr UserDataIngSession::versionedActPeriodsRepRule(const std::shared_ptr &versioned_repetition_rule, const ActPeriodsRepRuleType &act_periods_rep_rule) -{ - //std::optional > ActPeriodsRepRuleType - if (!act_periods_rep_rule.has_value()) return versioned_repetition_rule; - - std::shared_ptr< RepetitionRule > repetition_rule = act_periods_rep_rule.value(); - - if (!versioned_repetition_rule) { - auto rep_rule = std::make_shared(serviceScheduleDescVersion(), repetition_rule); - return rep_rule; - } - - std::shared_ptr< RepetitionRule > versioned_rep_rule = versioned_repetition_rule->second; - if (*repetition_rule == *versioned_rep_rule) { - return versioned_repetition_rule; - } - - auto v_rep_rule = std::make_shared(serviceScheduleDescVersion(), repetition_rule); - return v_rep_rule; -} - -std::list -UserDataIngSession::versionedActPeriods(const std::list &versioned_active_periods, const ActPeriodsType &active_periods) -{ - std::list result; - - if (!active_periods.has_value()) return versioned_active_periods; - - std::list >, fiveg_mag_reftools::OgsAllocator > > > - act_periods = active_periods.value(); - - // --- Case 1: active_periods is empty → return versioned_active_periods unchanged --- - if (act_periods.empty()) { - return versioned_active_periods; - } - - ogs_info("SIZE OF VERSION ACT PERIODS: %ld, ACT PERIODS: %ld", versioned_active_periods.size(), act_periods.size()); - - // --- Case 2: versioned_active_periods is empty → initialize from active_periods --- - if (versioned_active_periods.empty()) { - for (const auto &act_period : act_periods) { - if (!act_period.has_value()) continue; - result.emplace_back(serviceScheduleDescVersion(), act_period.value()); // default version = 1 - } - return result; - } - - for(const auto &act_period: act_periods) { - if (!act_period.has_value()) continue; - bool match = false; - const std::shared_ptr< TimeWindow > &non_versioned_time_window = act_period.value(); - - for(const auto &versioned_active_period: versioned_active_periods) { - const std::shared_ptr< TimeWindow > &versioned_time_window = versioned_active_period.second; - if (*non_versioned_time_window == *versioned_time_window) { - result.emplace_back(versioned_active_period.first, versioned_active_period.second); - match = true; - break; - } - } - if (!match) { - result.emplace_back(serviceScheduleDescVersion(), act_period.value()); - } - - } - - ogs_info("RET RESULT SIZE OF VERSION ACT PERIODS: %ld, ACT PERIODS: %ld", versioned_active_periods.size(), act_periods.size()); - return result; -} - static void process_mbs_distribution_session_info(std::shared_ptr< UserDataIngSession::ContextData > context_data, std::shared_ptr< DistSession > dist_session) { std::optional > obj_distribution_method_info = context_data->info->getObjDistrInfo(); @@ -2666,11 +2301,11 @@ static void handle_failed_mbstf_nf_instance_discover(ogs_sbi_xact_t *xact) } -static std::int64_t duration_timer(const std::chrono::system_clock::time_point &tp) { +static int64_t duration_timer(const std::chrono::system_clock::time_point &tp) { const std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); auto diff = std::chrono::duration_cast(tp - now).count(); - if (diff <= 0) return static_cast(1); - return static_cast(diff); + if (diff <= 0) return 0; + return static_cast(diff); } static bool validate_state_setting_options(std::shared_ptr user_data_ing_session, Open5GSSBIStream &stream, Open5GSSBIMessage &message, const NfServer::AppMetadata &app_meta, std::optional api) @@ -2790,7 +2425,7 @@ static std::shared_ptr populate_mb_smf_mbs_session(std::shared_ static void send_invalid_user_data_ing_session_err(const std::out_of_range &e, Open5GSSBIStream &stream, size_t number_of_components, const Open5GSSBIMessage &message, const NfServer::AppMetadata &app_meta, - std::optional api, + const std::optional &api, const std::string &user_data_ing_session_id) { @@ -2809,6 +2444,41 @@ static void send_invalid_user_data_ing_session_err(const std::out_of_range &e, O } +static uint64_t get_next_tsi() +{ + uint64_t ret = g_next_tsi++; + if (g_next_tsi < 2) g_next_tsi = 2; + return ret; +} + +static void send_model_error(const ModelException &err, Open5GSSBIStream &stream, int path_segments, Open5GSSBIMessage &message, + const NfServer::AppMetadata &app_meta, const std::optional &api, + const std::string &no_cause_reason, const std::string &log_prefix) +{ + std::ostringstream error_oss; + std::ostringstream oss; + std::optional > invalid_params = std::nullopt; + + if (!err.parameter.empty()) { + invalid_params = std::map{ {err.parameter, err.what()} }; + error_oss << err.parameter << ": "; + } + error_oss << err.what(); + const std::string &error = error_oss.str(); + + if (err.cause) { + auto cause = err.cause.value(); + oss << cause.reason() << ": " << error; + ogs_assert(true == NfServer::sendError(stream, cause, path_segments, message, app_meta, api, cause.reason(), error, std::nullopt, + invalid_params)); + } else { + oss << no_cause_reason << ": " << error; + ogs_assert(true == NfServer::sendError(stream, OGS_SBI_HTTP_STATUS_BAD_REQUEST, path_segments, message, app_meta, api, no_cause_reason, + error)); + } + ogs_error("%s: %s", log_prefix.c_str(), oss.str().c_str()); +} + MBSF_NAMESPACE_STOP /* vim:ts=8:sts=4:sw=4:expandtab: diff --git a/src/mbsf/UserDataIngSession.hh b/src/mbsf/UserDataIngSession.hh index c8189c6..ef7fe1c 100644 --- a/src/mbsf/UserDataIngSession.hh +++ b/src/mbsf/UserDataIngSession.hh @@ -30,6 +30,7 @@ #include #include #include +#include "openapi/model/DistSessionState.h" #include "openapi/model/MBSUserDataIngSession.h" #include "openapi/model/MBSDistributionSessionInfo.h" #include "common.hh" @@ -88,6 +89,7 @@ public: std::shared_ptr distributionSessionInfo; std::shared_ptr info; std::shared_ptr ssm; + in_port_t ssm_port; std::shared_ptr request; ogs_pool_id_t streamId; std::shared_ptr MBSSession = nullptr; @@ -100,13 +102,15 @@ public: bool stateUpdate = false; bool needsUpdate = false; bool markForDeletion = false; - std::string distSessionId; - bool MBSTFDistSessionDeleted; - std::string mbstfNFInstanceId; - std::string mbstfDistSessionId; - bool distSessionState; + std::string distSessionId = std::string{}; + bool MBSTFDistSessionDeleted = false; + std::string mbstfNFInstanceId = std::string{}; + std::string mbstfDistSessionId = std::string{}; + bool distSessionState = false; mb_smf_sc_tmgi_t *tmgi = nullptr; - std::string mbstfNotificationUrl; + std::string mbstfNotificationUrl = std::string{}; + uint64_t tsi; + reftools::mbsf::DistSessionState last_reported_state; }; UserDataIngSession(fiveg_mag_reftools::CJson &json, bool as_request); @@ -133,35 +137,21 @@ public: const SysTimeMS &generated() const {return m_generated;}; const std::string &hash() const {return m_hash;}; const int32_t serviceScheduleDescVersion() {return m_serviceScheduleDescriptionVersion++;}; - const std::string &distSessionState() const ; ogs_sbi_xact_t *nmbstfDiscoverOnly(std::shared_ptr< ContextData > data); ogs_sbi_xact_t *nmbstfDiscoverAndSend( std::shared_ptr< UserDataIngSession::UserDataIngDistSessId> ids, ogs_sbi_build_f build, void *context, void *data); UserDataIngSession &setNFInstance(ogs_sbi_service_type_e service_type, ogs_sbi_nf_instance_t *nf_instance); - UserDataIngSession &alwaysActive() {m_alwaysActive.reset(new AlwaysActive()); return *this;}; - UserDataIngSession &alwaysActive(std::shared_ptr always_active) {m_alwaysActive = always_active; return *this;}; - UserDataIngSession &resetAlwaysActive() {m_alwaysActive.reset(); m_alwaysActive = nullptr; return *this;}; + UserDataIngSession &alwaysActive() {m_activePeriods.reset(new AlwaysActive(m_UserDataIngSessionId)); return *this;}; + UserDataIngSession &activePeriods(const ActPeriodsType &act_periods) {m_activePeriods.reset(new ActivePeriods(act_periods, m_activePeriods, *this)); return *this;}; + UserDataIngSession &activePeriodsRepRule(const ActPeriodsRepRuleType &act_periods_rep_rule) {m_activePeriods.reset(new ActivePeriodsRepRule(act_periods_rep_rule, m_activePeriods, *this)); return *this;}; - UserDataIngSession &activePeriods(const ActPeriodsType &act_periods) {m_activePeriods.reset(new ActivePeriods(act_periods)); return *this;}; - UserDataIngSession &activePeriods(std::shared_ptr active_periods) {m_activePeriods = active_periods; return *this;}; - UserDataIngSession &resetActivePeriods() {m_activePeriods.reset(); m_activePeriods = nullptr; return *this;}; - - UserDataIngSession &activePeriodsRepRule(const ActPeriodsRepRuleType &act_periods_rep_rule) {m_activePeriodsRepRule.reset(new ActivePeriodsRepRule(act_periods_rep_rule)); return *this;}; - UserDataIngSession &activePeriodsRepRule(std::shared_ptr active_periods_rep_rule) {m_activePeriodsRepRule = active_periods_rep_rule; return *this;}; - UserDataIngSession &resetActivePeriodsRepRule() {m_activePeriodsRepRule.reset(); m_activePeriodsRepRule = nullptr; return *this;}; - - - UserDataIngSession ¤tDistSessionState(const std::string &state) {m_currentDistSessionState = state; return *this;}; UserDataIngSession &userServiceAnnouncement(const std::shared_ptr &user_service_description); UserDataIngSession &createTimer(); UserDataIngSession &createCurrentStateTimer(); bool startTimer(); - std::shared_ptr getDistSessionState(); - const reftools::mbsf::DistSessionState getNextDistSessionState() const; - const reftools::mbsf::DistSessionState getdistSessState() const; - + const reftools::mbsf::DistSessionState &getDistSessionState(const std::optional > &user_state) const; void processUserDataIngSessionUpdate(ogs_pool_id_t stream_id, std::shared_ptr &request, fiveg_mag_reftools::CJson &json); void processDistributionSessionInfo(ogs_pool_id_t stream_id, std::shared_ptr &request); @@ -199,16 +189,12 @@ public: void resetMBSDistributionSessionsEstablishedFlag(); std::list > distributionSessionDescs(); - std::optional >> serviceScheduleDescs(); + std::optional > > serviceScheduleDescs(); std::shared_ptr userServiceDesc(); - std::map> &distributionSessionInfos(); + std::map > &distributionSessionInfos(); std::map > &getServiceScheduleDescs(); void serviceScheduleDescsUpdate(std::shared_ptr mbs_user_data_ing_session); - std::list versionedActPeriods(const std::list &versioned_active_periods, - const ActPeriodsType &active_periods); - - std::shared_ptr versionedActPeriodsRepRule(const std::shared_ptr &versioned_repetition_rule, const ActPeriodsRepRuleType &act_periods_rep_rule); static const char *localEventGetName( ogs_event_t *event); @@ -220,6 +206,7 @@ public: static void setMBSTFDistSessionDeletedFlag(std::string &dist_session_id); static bool processEvent(Open5GSEvent &event); static bool handleMbstfDiscover(ogs_sbi_nf_instance_t *nf_instance, ogs_sbi_xact_t *xact); + static bool createMbsSession(const std::shared_ptr &context_data); static bool processDistSession(std::shared_ptr dist_session); static std::shared_ptr getOperatingMode(std::shared_ptr &info); @@ -278,6 +265,9 @@ public: static void clearRegistries() { std::lock_guard lock(s_registry_mutex); s_xactRegistry.clear(); s_distSessionIdRegistry.clear(); }; private: + void updateContexts(ogs_pool_id_t stream_id, std::shared_ptr &request); + void _changeDistSessionState(); + static std::recursive_mutex s_registry_mutex; static std::map> s_xactRegistry; static std::map> s_distSessionIdRegistry; @@ -288,13 +278,8 @@ private: SysTimeMS m_lastUsed; std::string m_hash; std::string m_UserDataIngSessionId; - std::shared_ptr m_alwaysActive; - std::shared_ptr m_activePeriods; - std::shared_ptr m_activePeriodsRepRule; + std::shared_ptr m_activePeriods; std::unique_ptr m_activePeriodsTimer; - reftools::mbsf::DistSessionState m_distSessionState; - reftools::mbsf::DistSessionState m_currentDistSessionState; - reftools::mbsf::DistSessionState m_desiredDistSessionState; bool m_startTimer; int32_t m_serviceScheduleDescriptionVersion; // next ver no.