diff --git a/agent/crates/enterprise-utils/src/lib.rs b/agent/crates/enterprise-utils/src/lib.rs index f187886c131..a1520f7b86d 100644 --- a/agent/crates/enterprise-utils/src/lib.rs +++ b/agent/crates/enterprise-utils/src/lib.rs @@ -311,6 +311,13 @@ pub mod l7 { } } + pub fn find_bind_values<'a>( + _: &'a [u8], + _: usize, + ) -> Option + 'a> { + Some(std::iter::empty()) + } + #[derive(Serialize, Clone, Copy, Debug, Default, PartialEq)] pub enum TnsPacketType { #[default] diff --git a/agent/src/flow_generator/protocol_logs/sql/oracle.rs b/agent/src/flow_generator/protocol_logs/sql/oracle.rs index 411ef3eca83..ba44a846992 100644 --- a/agent/src/flow_generator/protocol_logs/sql/oracle.rs +++ b/agent/src/flow_generator/protocol_logs/sql/oracle.rs @@ -17,8 +17,13 @@ use std::borrow::Cow; use serde::Serialize; +use sqlparser::{ + dialect::GenericDialect, + tokenizer::{Token, Tokenizer}, +}; use super::super::value_is_default; +use super::sql_check::trim_head_comment_and_get_first_word; use crate::config::handler::LogParserConfig; use crate::flow_generator::{ protocol_logs::{ @@ -44,12 +49,13 @@ use enterprise_utils::l7::{ custom_policy::{ custom_field_policy::{ enums::{Op, Source}, - Store, + PolicySlice, Store, }, enums::TrafficDirection, }, sql::oracle::{ - Body, CallId, DataFlags, DataId, OracleParseConfig, OracleParser, TnsPacketType, + find_bind_values, Body, CallId, DataFlags, DataId, OracleParseConfig, OracleParser, + TnsPacketType, }, }; use public::l7_protocol::{Field, FieldSetter, L7Log, L7LogAttribute, L7Protocol, LogMessageType}; @@ -294,6 +300,137 @@ impl From<&OracleInfo> for LogCache { pub struct OracleLog { perf_stats: Vec, custom_field_store: Store, + cached_info: Option, +} + +struct CachedOracleInfo { + info: OracleInfo, + perf_stats_recorded: bool, +} + +const INSERT_KEYWORD: &[u8] = b"INSERT"; +const UPDATE_KEYWORD: &[u8] = b"UPDATE"; +const SET_KEYWORD: &[u8] = b"set"; +const VALUES_KEYWORD: &[u8] = b"values"; + +fn sql_has_bind_placeholder(sql: &str) -> bool { + let Some(scan_start) = upsert_bind_scan_start(sql) else { + return false; + }; + + let sql = &sql[scan_start..]; + let dialect = GenericDialect; + let Ok(tokens) = Tokenizer::new(&dialect, sql) + .with_unescape(false) + .tokenize() + else { + return false; + }; + + // `sqlparser` splits `:1` into `Colon + Number("1")`, + // and `:x1` into `Colon + Word("x1")`. + tokens + .windows(2) + .any(|window| matches!(window[0], Token::Colon) && token_bind_placeholder(&window[1])) +} + +// 这里需要和 SqlUpsertColumnExtractor::FieldIter::new() 保持一致。 +// 目前只支持带空白边界的简单 upsert 语句: +// `insert ... values ...`、`insert ... set ...`、`update ... set ...`。 +// 像 `insert into t(a)values(:1)` 这种没有空白边界的形式暂时不支持。 +// +// This must stay aligned with SqlUpsertColumnExtractor::FieldIter::new(). +// We currently only support whitespace-delimited simple upsert statements: +// `insert ... values ...`, `insert ... set ...`, and `update ... set ...`. +// Forms such as `insert into t(a)values(:1)` are intentionally unsupported. +fn upsert_bind_scan_start(sql: &str) -> Option { + let sql = sql.as_bytes(); + let first = trim_head_comment_and_get_first_word(sql, 6)?; + if first.eq_ignore_ascii_case(INSERT_KEYWORD) { + find_keyword_with_spaces(sql, VALUES_KEYWORD) + .or_else(|| find_keyword_with_spaces(sql, SET_KEYWORD)) + .map(|(index, len)| index + 1 + len) + } else if first.eq_ignore_ascii_case(UPDATE_KEYWORD) { + find_keyword_with_spaces(sql, SET_KEYWORD).map(|(index, len)| index + 1 + len) + } else { + None + } +} + +fn find_keyword_with_spaces(sql: &[u8], keyword: &[u8]) -> Option<(usize, usize)> { + if keyword.is_empty() || sql.len() < keyword.len() + 2 { + return None; + } + + sql.windows(keyword.len() + 2) + .position(|window| { + window[0].is_ascii_whitespace() + && window[1..1 + keyword.len()].eq_ignore_ascii_case(keyword) + && window[1 + keyword.len()].is_ascii_whitespace() + }) + .map(|index| (index, keyword.len())) +} + +fn token_bind_placeholder(token: &Token) -> bool { + match token { + Token::Number(value, _) => value.parse::().ok().is_some_and(|n| n > 0), + Token::Word(word) => word_bind_placeholder(&word.value), + _ => false, + } +} + +fn word_bind_placeholder(word: &str) -> bool { + let value = word + .strip_prefix('x') + .or_else(|| word.strip_prefix('X')) + .unwrap_or(word); + !value.is_empty() + && value.bytes().all(|b| b.is_ascii_digit()) + && value.parse::().ok().is_some_and(|n| n > 0) +} + +impl OracleLog { + fn cache_pending_request(&mut self, mut info: OracleInfo, param: &ParseParam) { + let perf_stats_recorded = self.record_perf_stats(&mut info, param); + self.cached_info = Some(CachedOracleInfo { + info, + perf_stats_recorded, + }); + } + + fn apply_custom_field_operations( + &mut self, + policies: PolicySlice, + info: &mut OracleInfo, + frame_payload: &[u8], + ) { + policies.apply( + &mut self.custom_field_store, + info, + TrafficDirection::REQUEST, + Source::Sql(&info.sql, Some(frame_payload)), + ); + for op in self.custom_field_store.drain_with(policies, info) { + match &op.op { + Op::AddMetric(_, _) | Op::SaveHeader(_) | Op::SavePayload(_) => (), + _ => auto_merge_custom_field(op, info), + } + } + } + + fn record_perf_stats(&mut self, log_info: &mut OracleInfo, param: &ParseParam) -> bool { + if !param.parse_perf { + return false; + } + + let mut perf_stat = L7PerfStats::default(); + if let Some(stats) = log_info.perf_stats(param) { + log_info.rrt = stats.rrt_sum; + perf_stat.sequential_merge(&stats); + } + self.perf_stats.push(perf_stat); + true + } } impl L7ProtocolParserInterface for OracleLog { @@ -335,6 +472,7 @@ impl L7ProtocolParserInterface for OracleLog { let mut info = vec![]; for frame in frames { let frame_payload = frame.payload; + let is_request = matches!(&frame.body, Body::Request(_)); let mut log_info = match frame.body { Body::Request(req) => OracleInfo { msg_type: param.direction.into(), @@ -366,33 +504,56 @@ impl L7ProtocolParserInterface for OracleLog { }, }; + if let Some(config) = param.parse_config { + log_info.set_is_on_blacklist(config); + } + + let mut custom_fields_applied = false; + // 缓存 SQL-only request 时已经记录过 perf stats,用来固定 RRT 起点在原始 SQL 帧。 + // 后续 bind-only frame 合并回来时要跳过第二次 perf 记账,避免重复更新 perf cache。 + // + // The cached SQL-only request already recorded perf stats so the RRT start + // stays on the original SQL frame. When a later bind-only frame merges back + // into it, skip a second perf accounting pass to avoid double-updating the cache. + let mut perf_stats_recorded = false; if let Some(policies) = custom_policies { - if !log_info.sql.is_empty() { - policies.apply( - &mut self.custom_field_store, - &log_info, - TrafficDirection::REQUEST, - Source::Sql(&log_info.sql, Some(frame_payload)), - ); - for op in self.custom_field_store.drain_with(policies, &log_info) { - match &op.op { - Op::AddMetric(_, _) | Op::SaveHeader(_) | Op::SavePayload(_) => (), - _ => auto_merge_custom_field(op, &mut log_info), - } + // Oracle 请求里,SQL 文本和 bind value 可能分布在两个 request frame 中。 + // 先缓存只携带 SQL 的请求,等待下一帧带上 bind value 后再重新执行自定义 SQL 提取。 + // + // In Oracle requests, the SQL text and bind values may be split across two + // request frames. Cache the SQL-only request first, then re-run custom SQL + // extraction when the next request frame carries the bind values. + if !is_request { + if let Some(cached) = self.cached_info.take() { + info.push(L7ProtocolInfo::OracleInfo(cached.info)); + } + } else if let Some(mut cached) = self.cached_info.take() { + if log_info.sql.is_empty() && find_bind_values(frame_payload, 0).is_some() { + cached.info.merge(&mut log_info); + perf_stats_recorded = cached.perf_stats_recorded; + log_info = cached.info; + self.apply_custom_field_operations(policies, &mut log_info, frame_payload); + custom_fields_applied = true; + } else { + info.push(L7ProtocolInfo::OracleInfo(cached.info)); } } - } - if let Some(config) = param.parse_config { - log_info.set_is_on_blacklist(config); - } - if param.parse_perf { - let mut perf_stat = L7PerfStats::default(); - if let Some(stats) = log_info.perf_stats(param) { - log_info.rrt = stats.rrt_sum; - perf_stat.sequential_merge(&stats); + if !custom_fields_applied && !log_info.sql.is_empty() { + let has_bind_placeholder = sql_has_bind_placeholder(&log_info.sql); + let can_apply_custom_fields = + !has_bind_placeholder || find_bind_values(frame_payload, 0).is_some(); + if can_apply_custom_fields { + self.apply_custom_field_operations(policies, &mut log_info, frame_payload); + } else { + self.cache_pending_request(log_info, param); + continue; + } } - self.perf_stats.push(perf_stat); + } + + if !perf_stats_recorded { + self.record_perf_stats(&mut log_info, param); } info.push(L7ProtocolInfo::OracleInfo(log_info)); @@ -412,3 +573,26 @@ impl L7ProtocolParserInterface for OracleLog { false } } + +#[cfg(test)] +mod tests { + use super::sql_has_bind_placeholder; + + #[test] + fn sql_has_bind_placeholder_only_tracks_simple_upsert() { + assert!(!sql_has_bind_placeholder("select ':1' from dual")); + assert!(!sql_has_bind_placeholder("select * from t where a = :1")); + assert!(sql_has_bind_placeholder( + "insert into t values (':1', :1, :x2, /* :3 */ :4)" + )); + assert!(sql_has_bind_placeholder( + "update t set a = :1, b = ':2', c = :X51 -- :9" + )); + assert!(sql_has_bind_placeholder( + "insert into t set a = ':1', b = :1" + )); + assert!(!sql_has_bind_placeholder( + "insert into t select :1 from dual" + )); + } +}