Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions agent/crates/enterprise-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,13 @@ pub mod l7 {
}
}

pub fn find_bind_values<'a>(
_: &'a [u8],
_: usize,
) -> Option<impl Iterator<Item = ()> + 'a> {
Some(std::iter::empty())
}

#[derive(Serialize, Clone, Copy, Debug, Default, PartialEq)]
pub enum TnsPacketType {
#[default]
Expand Down
232 changes: 208 additions & 24 deletions agent/src/flow_generator/protocol_logs/sql/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@
use std::borrow::Cow;

use serde::Serialize;
use sqlparser::{
dialect::GenericDialect,
tokenizer::{Token, Tokenizer},
};

use super::super::value_is_default;
use super::sql_check::trim_head_comment_and_get_first_word;
use crate::config::handler::LogParserConfig;
use crate::flow_generator::{
protocol_logs::{
Expand All @@ -44,12 +49,13 @@ use enterprise_utils::l7::{
custom_policy::{
custom_field_policy::{
enums::{Op, Source},
Store,
PolicySlice, Store,
},
enums::TrafficDirection,
},
sql::oracle::{
Body, CallId, DataFlags, DataId, OracleParseConfig, OracleParser, TnsPacketType,
find_bind_values, Body, CallId, DataFlags, DataId, OracleParseConfig, OracleParser,
TnsPacketType,
},
};
use public::l7_protocol::{Field, FieldSetter, L7Log, L7LogAttribute, L7Protocol, LogMessageType};
Expand Down Expand Up @@ -294,6 +300,137 @@ impl From<&OracleInfo> for LogCache {
pub struct OracleLog {
perf_stats: Vec<L7PerfStats>,
custom_field_store: Store,
cached_info: Option<CachedOracleInfo>,
}

struct CachedOracleInfo {
info: OracleInfo,
perf_stats_recorded: bool,
}

const INSERT_KEYWORD: &[u8] = b"INSERT";
const UPDATE_KEYWORD: &[u8] = b"UPDATE";
const SET_KEYWORD: &[u8] = b"set";
const VALUES_KEYWORD: &[u8] = b"values";

fn sql_has_bind_placeholder(sql: &str) -> bool {
let Some(scan_start) = upsert_bind_scan_start(sql) else {
return false;
};

let sql = &sql[scan_start..];
let dialect = GenericDialect;
let Ok(tokens) = Tokenizer::new(&dialect, sql)
.with_unescape(false)
.tokenize()
else {
return false;
};

// `sqlparser` splits `:1` into `Colon + Number("1")`,
// and `:x1` into `Colon + Word("x1")`.
tokens
.windows(2)
.any(|window| matches!(window[0], Token::Colon) && token_bind_placeholder(&window[1]))
}

// 这里需要和 SqlUpsertColumnExtractor::FieldIter::new() 保持一致。
// 目前只支持带空白边界的简单 upsert 语句:
// `insert ... values ...`、`insert ... set ...`、`update ... set ...`。
// 像 `insert into t(a)values(:1)` 这种没有空白边界的形式暂时不支持。
//
// This must stay aligned with SqlUpsertColumnExtractor::FieldIter::new().
// We currently only support whitespace-delimited simple upsert statements:
// `insert ... values ...`, `insert ... set ...`, and `update ... set ...`.
// Forms such as `insert into t(a)values(:1)` are intentionally unsupported.
fn upsert_bind_scan_start(sql: &str) -> Option<usize> {
let sql = sql.as_bytes();
let first = trim_head_comment_and_get_first_word(sql, 6)?;
if first.eq_ignore_ascii_case(INSERT_KEYWORD) {
find_keyword_with_spaces(sql, VALUES_KEYWORD)
.or_else(|| find_keyword_with_spaces(sql, SET_KEYWORD))
.map(|(index, len)| index + 1 + len)
} else if first.eq_ignore_ascii_case(UPDATE_KEYWORD) {
find_keyword_with_spaces(sql, SET_KEYWORD).map(|(index, len)| index + 1 + len)
} else {
None
}
}

fn find_keyword_with_spaces(sql: &[u8], keyword: &[u8]) -> Option<(usize, usize)> {
if keyword.is_empty() || sql.len() < keyword.len() + 2 {
return None;
}

sql.windows(keyword.len() + 2)
.position(|window| {
window[0].is_ascii_whitespace()
&& window[1..1 + keyword.len()].eq_ignore_ascii_case(keyword)
&& window[1 + keyword.len()].is_ascii_whitespace()
})
.map(|index| (index, keyword.len()))
}

fn token_bind_placeholder(token: &Token) -> bool {
match token {
Token::Number(value, _) => value.parse::<usize>().ok().is_some_and(|n| n > 0),
Token::Word(word) => word_bind_placeholder(&word.value),
_ => false,
}
}

fn word_bind_placeholder(word: &str) -> bool {
let value = word
.strip_prefix('x')
.or_else(|| word.strip_prefix('X'))
.unwrap_or(word);
!value.is_empty()
&& value.bytes().all(|b| b.is_ascii_digit())
&& value.parse::<usize>().ok().is_some_and(|n| n > 0)
}

impl OracleLog {
fn cache_pending_request(&mut self, mut info: OracleInfo, param: &ParseParam) {
let perf_stats_recorded = self.record_perf_stats(&mut info, param);
self.cached_info = Some(CachedOracleInfo {
info,
perf_stats_recorded,
});
}

fn apply_custom_field_operations(
&mut self,
policies: PolicySlice,
info: &mut OracleInfo,
frame_payload: &[u8],
) {
policies.apply(
&mut self.custom_field_store,
info,
TrafficDirection::REQUEST,
Source::Sql(&info.sql, Some(frame_payload)),
);
for op in self.custom_field_store.drain_with(policies, info) {
match &op.op {
Op::AddMetric(_, _) | Op::SaveHeader(_) | Op::SavePayload(_) => (),
_ => auto_merge_custom_field(op, info),
}
}
}

fn record_perf_stats(&mut self, log_info: &mut OracleInfo, param: &ParseParam) -> bool {
if !param.parse_perf {
return false;
}

let mut perf_stat = L7PerfStats::default();
if let Some(stats) = log_info.perf_stats(param) {
log_info.rrt = stats.rrt_sum;
perf_stat.sequential_merge(&stats);
}
self.perf_stats.push(perf_stat);
true
}
}

impl L7ProtocolParserInterface for OracleLog {
Expand Down Expand Up @@ -335,6 +472,7 @@ impl L7ProtocolParserInterface for OracleLog {
let mut info = vec![];
for frame in frames {
let frame_payload = frame.payload;
let is_request = matches!(&frame.body, Body::Request(_));
let mut log_info = match frame.body {
Body::Request(req) => OracleInfo {
msg_type: param.direction.into(),
Expand Down Expand Up @@ -366,33 +504,56 @@ impl L7ProtocolParserInterface for OracleLog {
},
};

if let Some(config) = param.parse_config {
log_info.set_is_on_blacklist(config);
}

let mut custom_fields_applied = false;
// 缓存 SQL-only request 时已经记录过 perf stats,用来固定 RRT 起点在原始 SQL 帧。
// 后续 bind-only frame 合并回来时要跳过第二次 perf 记账,避免重复更新 perf cache。
//
// The cached SQL-only request already recorded perf stats so the RRT start
// stays on the original SQL frame. When a later bind-only frame merges back
// into it, skip a second perf accounting pass to avoid double-updating the cache.
let mut perf_stats_recorded = false;
if let Some(policies) = custom_policies {
if !log_info.sql.is_empty() {
policies.apply(
&mut self.custom_field_store,
&log_info,
TrafficDirection::REQUEST,
Source::Sql(&log_info.sql, Some(frame_payload)),
);
for op in self.custom_field_store.drain_with(policies, &log_info) {
match &op.op {
Op::AddMetric(_, _) | Op::SaveHeader(_) | Op::SavePayload(_) => (),
_ => auto_merge_custom_field(op, &mut log_info),
}
// Oracle 请求里,SQL 文本和 bind value 可能分布在两个 request frame 中。
// 先缓存只携带 SQL 的请求,等待下一帧带上 bind value 后再重新执行自定义 SQL 提取。
//
// In Oracle requests, the SQL text and bind values may be split across two
// request frames. Cache the SQL-only request first, then re-run custom SQL
// extraction when the next request frame carries the bind values.
if !is_request {
if let Some(cached) = self.cached_info.take() {
info.push(L7ProtocolInfo::OracleInfo(cached.info));
}
} else if let Some(mut cached) = self.cached_info.take() {
if log_info.sql.is_empty() && find_bind_values(frame_payload, 0).is_some() {
cached.info.merge(&mut log_info);
perf_stats_recorded = cached.perf_stats_recorded;
log_info = cached.info;
self.apply_custom_field_operations(policies, &mut log_info, frame_payload);
custom_fields_applied = true;
} else {
info.push(L7ProtocolInfo::OracleInfo(cached.info));
}
}
}

if let Some(config) = param.parse_config {
log_info.set_is_on_blacklist(config);
}
if param.parse_perf {
let mut perf_stat = L7PerfStats::default();
if let Some(stats) = log_info.perf_stats(param) {
log_info.rrt = stats.rrt_sum;
perf_stat.sequential_merge(&stats);
if !custom_fields_applied && !log_info.sql.is_empty() {
let has_bind_placeholder = sql_has_bind_placeholder(&log_info.sql);
let can_apply_custom_fields =
!has_bind_placeholder || find_bind_values(frame_payload, 0).is_some();
if can_apply_custom_fields {
self.apply_custom_field_operations(policies, &mut log_info, frame_payload);
} else {
self.cache_pending_request(log_info, param);
continue;
}
}
self.perf_stats.push(perf_stat);
}

if !perf_stats_recorded {
self.record_perf_stats(&mut log_info, param);
}

info.push(L7ProtocolInfo::OracleInfo(log_info));
Expand All @@ -412,3 +573,26 @@ impl L7ProtocolParserInterface for OracleLog {
false
}
}

#[cfg(test)]
mod tests {
use super::sql_has_bind_placeholder;

#[test]
fn sql_has_bind_placeholder_only_tracks_simple_upsert() {
assert!(!sql_has_bind_placeholder("select ':1' from dual"));
assert!(!sql_has_bind_placeholder("select * from t where a = :1"));
assert!(sql_has_bind_placeholder(
"insert into t values (':1', :1, :x2, /* :3 */ :4)"
));
assert!(sql_has_bind_placeholder(
"update t set a = :1, b = ':2', c = :X51 -- :9"
));
assert!(sql_has_bind_placeholder(
"insert into t set a = ':1', b = :1"
));
assert!(!sql_has_bind_placeholder(
"insert into t select :1 from dual"
));
}
}
Loading