Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros"] }
futures = { workspace = true }
zip = { workspace = true }
bytes = { workspace = true }
prost = { workspace = true }
prost = { workspace = true }
tonic = "0.14.2"
log = "0.4.28"
40 changes: 20 additions & 20 deletions crates/cli/src/analyser/flow_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,27 +36,27 @@ impl Analyser {
));
}

if let Some(identifier) = &flow.input_type_identifier {
if !self.data_type_identifier_exists(identifier, None) {
self.reporter.add(Diagnose::new(
name.clone(),
original.clone(),
DiagnosticKind::UndefinedDataTypeIdentifier {
identifier: identifier.clone(),
},
));
}
if let Some(identifier) = &flow.input_type_identifier
&& !self.data_type_identifier_exists(identifier, None)
{
self.reporter.add(Diagnose::new(
name.clone(),
original.clone(),
DiagnosticKind::UndefinedDataTypeIdentifier {
identifier: identifier.clone(),
},
));
}
if let Some(identifier) = &flow.return_type_identifier {
if !self.data_type_identifier_exists(identifier, None) {
self.reporter.add(Diagnose::new(
name.clone(),
original.clone(),
DiagnosticKind::UndefinedDataTypeIdentifier {
identifier: identifier.clone(),
},
));
}
if let Some(identifier) = &flow.return_type_identifier
&& !self.data_type_identifier_exists(identifier, None)
{
self.reporter.add(Diagnose::new(
name.clone(),
original.clone(),
DiagnosticKind::UndefinedDataTypeIdentifier {
identifier: identifier.clone(),
},
));
}

for setting in &flow.settings {
Expand Down
3 changes: 2 additions & 1 deletion crates/cli/src/command/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod definition;
pub mod download;
pub mod feature;
pub mod push;
pub mod report;
pub mod search;
pub mod watch;
15 changes: 15 additions & 0 deletions crates/cli/src/command/push/auth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use std::str::FromStr;
use tonic::metadata::{MetadataMap, MetadataValue};

pub fn get_authorization_metadata(token: &str) -> MetadataMap {
let metadata_value = MetadataValue::from_str(token).unwrap_or_else(|error| {
panic!(
"An error occurred trying to convert runtime_token into metadata: {}",
error
);
});

let mut map = MetadataMap::new();
map.insert("authorization", metadata_value);
map
}
49 changes: 49 additions & 0 deletions crates/cli/src/command/push/data_type_client_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use crate::command::push::auth::get_authorization_metadata;
use tonic::{Extensions, Request, transport::Channel};
use tucana::sagittarius::{
DataTypeUpdateRequest as SagittariusDataTypeUpdateRequest,
data_type_service_client::DataTypeServiceClient,
};
use tucana::shared::DefinitionDataType;

pub struct SagittariusDataTypeServiceClient {
client: DataTypeServiceClient<Channel>,
token: String,
}

impl SagittariusDataTypeServiceClient {
pub async fn new(sagittarius_url: String, token: String) -> Self {
let client = match DataTypeServiceClient::connect(sagittarius_url).await {
Ok(client) => {
log::info!("Successfully connected to Sagittarius DataType Endpoint!");
client
}
Err(err) => panic!(
"Failed to connect to Sagittarius (DataType Endpoint): {:?}",
err
),
};

Self { client, token }
}

pub async fn update_data_types(&mut self, data_types: Vec<DefinitionDataType>) {
let request = Request::from_parts(
get_authorization_metadata(&self.token),
Extensions::new(),
SagittariusDataTypeUpdateRequest { data_types },
);

match self.client.update(request).await {
Ok(response) => {
log::info!(
"Successfully transferred data types. Did Sagittarius updated them? {:?}",
&response
);
}
Err(err) => {
log::error!("Failed to update DataTypes: {:?}", err);
}
};
}
}
49 changes: 49 additions & 0 deletions crates/cli/src/command/push/flow_type_client_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use crate::command::push::auth::get_authorization_metadata;
use tonic::Extensions;
use tonic::Request;
use tonic::transport::Channel;
use tucana::sagittarius::FlowTypeUpdateRequest as SagittariusFlowTypeUpdateRequest;
use tucana::sagittarius::flow_type_service_client::FlowTypeServiceClient;
use tucana::shared::FlowType;

pub struct SagittariusFlowTypeServiceClient {
client: FlowTypeServiceClient<Channel>,
token: String,
}

impl SagittariusFlowTypeServiceClient {
pub async fn new(sagittarius_url: String, token: String) -> Self {
let client = match FlowTypeServiceClient::connect(sagittarius_url).await {
Ok(client) => {
log::info!("Successfully connected to Sagittarius FlowType Endpoint!");
client
}
Err(err) => panic!(
"Failed to connect to Sagittarius (FlowType Endpoint): {:?}",
err
),
};

Self { client, token }
}

pub async fn update_flow_types(&mut self, flow_types: Vec<FlowType>) {
let request = Request::from_parts(
get_authorization_metadata(&self.token),
Extensions::new(),
SagittariusFlowTypeUpdateRequest { flow_types },
);

match self.client.update(request).await {
Ok(response) => {
log::info!(
"Successfully transferred FlowTypes. Did Sagittarius updated them? {:?}",
&response
);
}
Err(err) => {
log::error!("Failed to update FlowTypes: {:?}", err);
}
};
}
}
52 changes: 52 additions & 0 deletions crates/cli/src/command/push/function_client_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use crate::command::push::auth::get_authorization_metadata;
use tonic::Extensions;
use tonic::Request;
use tonic::transport::Channel;
use tucana::sagittarius::RuntimeFunctionDefinitionUpdateRequest as SagittariusRuntimeFunctionUpdateRequest;
use tucana::sagittarius::runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient;
use tucana::shared::RuntimeFunctionDefinition;

pub struct SagittariusRuntimeFunctionServiceClient {
client: RuntimeFunctionDefinitionServiceClient<Channel>,
token: String,
}

impl SagittariusRuntimeFunctionServiceClient {
pub async fn new(sagittarius_url: String, token: String) -> Self {
let client = match RuntimeFunctionDefinitionServiceClient::connect(sagittarius_url).await {
Ok(client) => {
log::info!("Successfully connected to Sagittarius RuntimeFunction Endpoint!");
client
}
Err(err) => panic!(
"Failed to connect to Sagittarius (RuntimeFunction Endpoint): {:?}",
err
),
};

Self { client, token }
}

pub async fn update_runtime_function_definitions(
&mut self,
runtime_functions: Vec<RuntimeFunctionDefinition>,
) {
let request = Request::from_parts(
get_authorization_metadata(&self.token),
Extensions::new(),
SagittariusRuntimeFunctionUpdateRequest { runtime_functions },
);

match self.client.update(request).await {
Ok(response) => {
log::info!(
"Successfully transferred RuntimeFunctions. Did Sagittarius updated them? {:?}",
&response
);
}
Err(err) => {
log::error!("Failed to update RuntimeFunctions: {:?}", err);
}
};
}
}
137 changes: 137 additions & 0 deletions crates/cli/src/command/push/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use crate::analyser::core::Analyser;
use crate::command::push::data_type_client_impl::SagittariusDataTypeServiceClient;
use crate::command::push::flow_type_client_impl::SagittariusFlowTypeServiceClient;
use crate::command::push::function_client_impl::SagittariusRuntimeFunctionServiceClient;
use crate::formatter::{default, info};
use notify::event::ModifyKind;
use notify::{EventKind, RecursiveMode, Watcher, recommended_watcher};
use std::sync::mpsc::channel;
use std::time::{Duration, Instant};

mod auth;
mod data_type_client_impl;
mod flow_type_client_impl;
mod function_client_impl;

pub async fn push(token: String, url: String, path: Option<String>) {
let dir_path = path.unwrap_or_else(|| "./definitions".to_string());

info(format!("Watching directory: {dir_path}"));
info(String::from("Press Ctrl+C to stop watching..."));

{
Analyser::new(dir_path.as_str()).report(false);
}

// Set up file watcher
let (tx, rx) = channel();
let mut watcher = recommended_watcher(tx).unwrap();
watcher
.watch(std::path::Path::new(&dir_path), RecursiveMode::Recursive)
.unwrap();

let mut last_run = Instant::now();

let mut data_type_client =
SagittariusDataTypeServiceClient::new(url.clone(), token.clone()).await;
let mut flow_type_client =
SagittariusFlowTypeServiceClient::new(url.clone(), token.clone()).await;
let mut function_client = SagittariusRuntimeFunctionServiceClient::new(url, token).await;

loop {
if let Ok(Ok(event)) = rx.recv() {
match event.kind {
EventKind::Modify(modify) => {
if let ModifyKind::Data(_) = modify
&& last_run.elapsed() > Duration::from_millis(500)
{
default(String::from(
"\n\n\n--------------------------------------------------------------------------\n\n",
));
info(String::from("Change detected! Regenerating report..."));
let mut analyzer = Analyser::new(dir_path.as_str());

// No errors when reporter is empty!
if analyzer.reporter.is_empty() {
data_type_client
.update_data_types(
analyzer
.data_types
.iter()
.map(|d| d.definition_data_type.clone())
.collect(),
)
.await;
flow_type_client
.update_flow_types(
analyzer
.flow_types
.iter()
.map(|d| d.flow_type.clone())
.collect(),
)
.await;
function_client
.update_runtime_function_definitions(
analyzer
.functions
.iter()
.map(|d| d.function.clone())
.collect(),
)
.await;
}

analyzer.report(false);

last_run = Instant::now();
}
}
EventKind::Remove(_) => {
if last_run.elapsed() > Duration::from_millis(500) {
default(String::from(
"\n\n\n--------------------------------------------------------------------------\n\n",
));
info(String::from("Change detected! Regenerating report..."));
let mut analyzer = Analyser::new(dir_path.as_str());

// No errors when reporter is empty!
if analyzer.reporter.is_empty() {
data_type_client
.update_data_types(
analyzer
.data_types
.iter()
.map(|d| d.definition_data_type.clone())
.collect(),
)
.await;
flow_type_client
.update_flow_types(
analyzer
.flow_types
.iter()
.map(|d| d.flow_type.clone())
.collect(),
)
.await;
function_client
.update_runtime_function_definitions(
analyzer
.functions
.iter()
.map(|d| d.function.clone())
.collect(),
)
.await;
}

analyzer.report(false);
last_run = Instant::now();
}
}
_ => {}
}
}
}
}
Loading