Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions data-machine.php
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,9 @@ function datamachine_activate_for_site() {
// Migrate agent_ping step types to flow configs (idempotent).
datamachine_migrate_agent_ping_to_system_task();

// Migrate agent_ping step types to pipeline configs (idempotent).
datamachine_migrate_agent_ping_pipeline_to_system_task();

// Migrate `update` step type to `upsert` in pipeline/flow configs (idempotent).
datamachine_migrate_update_to_upsert_step_type();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ export default function FlowStepCard( {
const handlerSlugs = flowStepConfig?.handler_slugs || [];
const primarySlug = handlerSlugs[0];
const primaryConfig = primarySlug && flowStepConfig?.handler_configs?.[primarySlug];
return primaryConfig?.prompt || '';
// Support both top-level prompt (legacy) and nested params.prompt (system_task).
return primaryConfig?.prompt || primaryConfig?.params?.prompt || '';
}, [ isAiStep, flowStepConfig.user_message, flowStepConfig?.handler_slugs, flowStepConfig?.handler_configs ] );

// Determine if this step type shows a prompt field.
Expand All @@ -77,7 +78,10 @@ export default function FlowStepCard( {
const handlerSlugs = flowStepConfig?.handler_slugs || [];
const primarySlug = handlerSlugs[0];
const primaryConfig = primarySlug && flowStepConfig?.handler_configs?.[primarySlug];
const hasPromptConfig = primaryConfig && primaryConfig.prompt !== undefined;
const hasPromptConfig = primaryConfig && (
primaryConfig.prompt !== undefined ||
primaryConfig.params?.prompt !== undefined
);
const showPromptField = isAiStep || shouldShowQueue || hasPromptConfig;

// Fields to exclude from inline config (handled by QueueablePromptField).
Expand All @@ -92,14 +96,28 @@ export default function FlowStepCard( {
const handlePromptSave = useCallback(
async ( value ) => {
try {
const config = isAiStep
? { user_message: value }
: {
let config;
if ( isAiStep ) {
config = { user_message: value };
} else if ( primaryConfig?.params?.prompt !== undefined ) {
// system_task with nested params (e.g. agent_ping).
config = {
handler_config: {
...( primaryConfig || {} ),
prompt: value
}
params: {
...( primaryConfig.params || {} ),
prompt: value,
},
},
};
} else {
config = {
handler_config: {
...( primaryConfig || {} ),
prompt: value,
},
};
}

const response = await updateFlowStepConfig( flowStepId, config );
if ( ! response?.success ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,19 @@ export default function PipelineStepCard( {
// Use TanStack Query for data
const { data: stepTypes = {} } = useStepTypes();
const isAiStep = step.step_type === 'ai';
const isSystemTask = step.step_type === 'system_task';

const stepConfig = pipelineConfig[ step.pipeline_step_id ] || null;

// Resolve display label: step type registry, then legacy fallback for agent_ping.
const displayLabel = stepTypes[ step.step_type ]?.label
|| ( step.step_type === 'agent_ping' ? __( 'Agent Ping', 'data-machine' ) : step.step_type );

// For system_task steps, show the task name badge (e.g. "Agent Ping").
const systemTaskName = isSystemTask
? ( stepConfig?.handler_configs?.system_task?.task || step.handler_configs?.system_task?.task || '' )
: '';

/**
* Save system prompt to API (AI steps)
*/
Expand Down Expand Up @@ -106,8 +116,13 @@ export default function PipelineStepCard( {
<CardBody>
<div className="datamachine-step-card-header">
<strong>
{ stepTypes[ step.step_type ]?.label || step.step_type }
{ displayLabel }
</strong>
{ systemTaskName && (
<span className="datamachine-step-card-task-badge">
{ systemTaskName }
</span>
) }
</div>

{ /* AI Step: inline system prompt editor */ }
Expand Down
108 changes: 107 additions & 1 deletion inc/migrations/agent-ping.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
/**
* Data Machine — Agent ping migration.
*
* Migrates agent_ping step types to system_task steps in flow configs.
* Migrates agent_ping step types to system_task steps in flow configs
* and pipeline configs.
*
* @package DataMachine
* @since 0.60.0
Expand Down Expand Up @@ -117,3 +118,108 @@ function datamachine_migrate_agent_ping_to_system_task(): void {
);
}
}

/**
* Migrate agent_ping step types to system_task steps in pipeline configs.
*
* Follow-up to datamachine_migrate_agent_ping_to_system_task() which only
* migrated flow configs. Pipeline configs were missed, leaving orphaned
* agent_ping steps in the pipeline UI.
*
* Idempotent: guarded by datamachine_agent_ping_pipeline_migrated option.
*
* @since 0.73.0
*/
function datamachine_migrate_agent_ping_pipeline_to_system_task(): void {
if ( get_option( 'datamachine_agent_ping_pipeline_migrated', false ) ) {
return;
}

global $wpdb;
$table = $wpdb->prefix . 'datamachine_pipelines';

// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery
// phpcs:disable WordPress.DB.PreparedSQL -- Table name from $wpdb->prefix.
$table_exists = $wpdb->get_var( $wpdb->prepare( 'SHOW TABLES LIKE %s', $table ) );
// phpcs:enable WordPress.DB.PreparedSQL
if ( ! $table_exists ) {
update_option( 'datamachine_agent_ping_pipeline_migrated', true, true );
return;
}

// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching
// phpcs:disable WordPress.DB.PreparedSQL -- Table name from $wpdb->prefix.
$rows = $wpdb->get_results( "SELECT pipeline_id, pipeline_config FROM {$table}", ARRAY_A );
// phpcs:enable WordPress.DB.PreparedSQL
if ( empty( $rows ) ) {
update_option( 'datamachine_agent_ping_pipeline_migrated', true, true );
return;
}

$migrated = 0;

foreach ( $rows as $row ) {
$pipeline_config = json_decode( $row['pipeline_config'], true );
if ( ! is_array( $pipeline_config ) ) {
continue;
}

$changed = false;
foreach ( $pipeline_config as $step_id => &$step ) {
if ( ! is_array( $step ) ) {
continue;
}

// Only convert steps with agent_ping step type.
if ( 'agent_ping' !== ( $step['step_type'] ?? '' ) ) {
continue;
}

// Extract existing agent_ping handler config (if present).
$old_config = $step['handler_configs']['agent_ping'] ?? array();

// Build new system_task handler_config.
$new_config = array(
'task' => 'agent_ping',
'params' => array(
'webhook_url' => $old_config['webhook_url'] ?? '',
'prompt' => $old_config['prompt'] ?? '',
'auth_header_name' => $old_config['auth_header_name'] ?? '',
'auth_token' => $old_config['auth_token'] ?? '',
'reply_to' => $old_config['reply_to'] ?? '',
),
);

// Convert step type and handler references.
$step['step_type'] = 'system_task';
$step['handler_slugs'] = array( 'system_task' );
$step['handler_configs'] = array( 'system_task' => $new_config );

$changed = true;
}
unset( $step );

if ( $changed ) {
// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching
$wpdb->update(
$table,
array( 'pipeline_config' => wp_json_encode( $pipeline_config ) ),
array( 'pipeline_id' => $row['pipeline_id'] ),
array( '%s' ),
array( '%d' )
);
++$migrated;
}
}

update_option( 'datamachine_agent_ping_pipeline_migrated', true, true );

if ( $migrated > 0 ) {
do_action(
'datamachine_log',
'info',
'Migrated agent_ping steps to system_task steps in pipeline configs',
array( 'pipelines_updated' => $migrated )
);
}
}
Loading