Skip to content

Commit

Permalink
feat(processing_engine): Allow async plugin execution. (#25994)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksonrnewhouse authored Feb 13, 2025
1 parent 8548410 commit b0a2422
Show file tree
Hide file tree
Showing 14 changed files with 220 additions and 113 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion influxdb3/src/commands/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ pub struct TriggerConfig {
/// Create trigger in disabled state
#[clap(long)]
disabled: bool,
#[clap(long)]
run_asynchronous: bool,
/// Name for the new trigger
trigger_name: String,
}
Expand Down Expand Up @@ -351,14 +353,14 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
trigger_specification,
trigger_arguments,
disabled,
run_asynchronous,
}) => {
let trigger_arguments: Option<HashMap<String, String>> = trigger_arguments.map(|a| {
a.into_iter()
.map(|SeparatedKeyValue((k, v))| (k, v))
.collect::<HashMap<String, String>>()
});

//println!("does this work?");
match client
.api_v3_configure_processing_engine_trigger_create(
database_name,
Expand All @@ -367,6 +369,7 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
trigger_specification.string_rep(),
trigger_arguments,
disabled,
run_asynchronous,
)
.await
{
Expand Down
9 changes: 7 additions & 2 deletions influxdb3_catalog/src/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ use influxdb3_id::ColumnId;
use influxdb3_id::DbId;
use influxdb3_id::SerdeVecMap;
use influxdb3_id::TableId;
use influxdb3_wal::DistinctCacheDefinition;
use influxdb3_wal::{LastCacheDefinition, LastCacheValueColumnsDef, PluginType, TriggerDefinition};
use influxdb3_wal::{
DistinctCacheDefinition, LastCacheDefinition, LastCacheValueColumnsDef, PluginType,
TriggerDefinition, TriggerFlag,
};
use schema::InfluxColumnType;
use schema::InfluxFieldType;
use schema::TIME_DATA_TIMEZONE;
Expand Down Expand Up @@ -153,6 +155,7 @@ impl From<DatabaseSnapshot> for DatabaseSchema {
trigger_name: trigger.trigger_name,
plugin_filename: trigger.plugin_filename,
trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(),
flags: trigger.flags,
trigger_arguments: trigger.trigger_arguments,
disabled: trigger.disabled,
database_name: trigger.database_name,
Expand Down Expand Up @@ -223,6 +226,7 @@ struct ProcessingEngineTriggerSnapshot {
pub plugin_filename: String,
pub database_name: String,
pub trigger_specification: String,
pub flags: Vec<TriggerFlag>,
pub trigger_arguments: Option<HashMap<String, String>>,
pub disabled: bool,
}
Expand Down Expand Up @@ -471,6 +475,7 @@ impl From<&TriggerDefinition> for ProcessingEngineTriggerSnapshot {
trigger_name: trigger.trigger_name.to_string(),
plugin_filename: trigger.plugin_filename.to_string(),
database_name: trigger.database_name.to_string(),
flags: trigger.flags.clone(),
trigger_specification: serde_json::to_string(&trigger.trigger)
.expect("should be able to serialize trigger specification"),
trigger_arguments: trigger.trigger_arguments.clone(),
Expand Down
1 change: 1 addition & 0 deletions influxdb3_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ iox_query_params.workspace = true

# Local deps
influxdb3_types = { path = "../influxdb3_types" }
influxdb3_wal = { path = "../influxdb3_wal" }

# crates.io dependencies
bytes.workspace = true
Expand Down
9 changes: 9 additions & 0 deletions influxdb3_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use url::Url;

use influxdb3_types::http::*;
pub use influxdb3_types::write::Precision;
use influxdb3_wal::TriggerFlag;

/// Primary error type for the [`Client`]
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -456,6 +457,7 @@ impl Client {
}

/// Make a request to `POST /api/v3/configure/processing_engine_trigger`
#[allow(clippy::too_many_arguments)]
pub async fn api_v3_configure_processing_engine_trigger_create(
&self,
db: impl Into<String> + Send,
Expand All @@ -464,7 +466,13 @@ impl Client {
trigger_spec: impl Into<String> + Send,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
execute_async: bool,
) -> Result<()> {
let flags = if execute_async {
vec![TriggerFlag::ExecuteAsynchronously]
} else {
vec![]
};
let _bytes = self
.send_json_get_bytes(
Method::POST,
Expand All @@ -474,6 +482,7 @@ impl Client {
trigger_name: trigger_name.into(),
plugin_filename: plugin_filename.into(),
trigger_specification: trigger_spec.into(),
flags,
trigger_arguments,
disabled,
}),
Expand Down
1 change: 1 addition & 0 deletions influxdb3_processing_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bytes.workspace = true
chrono.workspace = true
cron.workspace = true
data_types.workspace = true
futures-util.workspace = true
humantime.workspace = true
hashbrown.workspace = true
hyper.workspace = true
Expand Down
32 changes: 21 additions & 11 deletions influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use influxdb3_types::http::{
use influxdb3_wal::PluginType;
use influxdb3_wal::{
CatalogBatch, CatalogOp, DeleteTriggerDefinition, SnapshotDetails, TriggerDefinition,
TriggerIdentifier, TriggerSpecificationDefinition, Wal, WalContents, WalFileNotifier, WalOp,
TriggerFlag, TriggerIdentifier, TriggerSpecificationDefinition, Wal, WalContents,
WalFileNotifier, WalOp,
};
use influxdb3_write::WriteBuffer;
use iox_time::TimeProvider;
Expand Down Expand Up @@ -345,6 +346,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
db_name: &str,
trigger_name: String,
plugin_filename: String,
flags: Vec<TriggerFlag>,
trigger_specification: TriggerSpecificationDefinition,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
Expand All @@ -360,6 +362,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
trigger_name,
plugin_filename,
trigger: trigger_specification,
flags,
trigger_arguments,
disabled,
database_name: db_name.to_string(),
Expand Down Expand Up @@ -449,7 +452,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
query_executor,
sys_event_store: Arc::clone(&self.sys_event_store),
};
let plugin_code = self.read_plugin_code(&trigger.plugin_filename).await?;
let plugin_code = Arc::new(self.read_plugin_code(&trigger.plugin_filename).await?);
match trigger.trigger.plugin_type() {
PluginType::WalRows => {
let rec = self
Expand Down Expand Up @@ -642,13 +645,15 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
let code = self.read_plugin_code(&request.filename).await?;
let code_string = code.code().to_string();

let res =
let res = tokio::task::spawn_blocking(move || {
plugins::run_test_wal_plugin(now, catalog, query_executor, code_string, request)
.unwrap_or_else(|e| WalPluginTestResponse {
log_lines: vec![],
database_writes: Default::default(),
errors: vec![e.to_string()],
});
})
})
.await?;

return Ok(res);
}
Expand All @@ -674,13 +679,16 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
let code = self.read_plugin_code(&request.filename).await?;
let code_string = code.code().to_string();

let res = plugins::run_test_schedule_plugin(
now,
catalog,
query_executor,
code_string,
request,
)
let res = tokio::task::spawn_blocking(move || {
plugins::run_test_schedule_plugin(
now,
catalog,
query_executor,
code_string,
request,
)
})
.await?
.unwrap_or_else(|e| SchedulePluginTestResponse {
log_lines: vec![],
database_writes: Default::default(),
Expand Down Expand Up @@ -850,6 +858,7 @@ mod tests {
"foo",
"test_trigger".to_string(),
file_name,
vec![],
TriggerSpecificationDefinition::AllTablesWalWrite,
None,
false,
Expand Down Expand Up @@ -935,6 +944,7 @@ mod tests {
"foo",
"test_trigger".to_string(),
file_name,
vec![],
TriggerSpecificationDefinition::AllTablesWalWrite,
None,
true,
Expand Down
4 changes: 3 additions & 1 deletion influxdb3_processing_engine/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use influxdb3_types::http::{
SchedulePluginTestRequest, SchedulePluginTestResponse, WalPluginTestRequest,
WalPluginTestResponse,
};
use influxdb3_wal::TriggerSpecificationDefinition;
use influxdb3_wal::{TriggerFlag, TriggerSpecificationDefinition};
use influxdb3_write::WriteBuffer;
use std::fmt::Debug;
use std::sync::Arc;
Expand Down Expand Up @@ -57,11 +57,13 @@ pub enum ProcessingEngineError {
///
#[async_trait::async_trait]
pub trait ProcessingEngineManager: Debug + Send + Sync + 'static {
#[allow(clippy::too_many_arguments)]
async fn insert_trigger(
&self,
db_name: &str,
trigger_name: String,
plugin_filename: String,
flags: Vec<TriggerFlag>,
trigger_specification: TriggerSpecificationDefinition,
trigger_arguments: Option<HashMap<String, String>>,
disabled: bool,
Expand Down
Loading

0 comments on commit b0a2422

Please sign in to comment.