Skip to content

Commit fb5a40f

Browse files
authored
feat(backend-native): expose lastRefreshTime via /cubesql API (#10206)
* add lastRefreshTime to load result in openapi spec * feat(backend-native): expose lastRefreshTime via /cubesql API * some fixes and improvements
1 parent 89bd72b commit fb5a40f

File tree

7 files changed

+77
-12
lines changed

7 files changed

+77
-12
lines changed

packages/cubejs-api-gateway/openspec.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,8 @@ components:
371371
type: "array"
372372
items:
373373
type: "object"
374+
lastRefreshTime:
375+
type: "string"
374376
V1Error:
375377
type: "object"
376378
required:

packages/cubejs-backend-native/src/node_export.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,13 @@ async fn handle_sql_query(
327327
let mut schema_response = Map::new();
328328
schema_response.insert("schema".into(), columns_json);
329329

330+
if let Some(last_refresh_time) = stream_schema.metadata().get("lastRefreshTime") {
331+
schema_response.insert(
332+
"lastRefreshTime".into(),
333+
serde_json::Value::String(last_refresh_time.clone()),
334+
);
335+
}
336+
330337
write_jsonl_message(
331338
channel.clone(),
332339
stream_methods.write.clone(),

packages/cubejs-backend-native/src/orchestrator.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub struct ResultWrapper {
3737
transform_data: TransformDataRequest,
3838
data: Arc<QueryResult>,
3939
transformed_data: Option<TransformedData>,
40+
pub last_refresh_time: Option<String>,
4041
}
4142

4243
impl ResultWrapper {
@@ -114,6 +115,7 @@ impl ResultWrapper {
114115
transform_data: transform_request,
115116
data: query_result,
116117
transformed_data: None,
118+
last_refresh_time: None,
117119
})
118120
}
119121

packages/cubejs-backend-native/src/transport.rs

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@ use std::fmt::Display;
55

66
use crate::auth::NativeSQLAuthContext;
77
use crate::channel::{call_raw_js_with_channel_as_callback, NodeSqlGenerator, ValueFromJs};
8+
use crate::node_obj_deserializer::JsValueDeserializer;
89
use crate::node_obj_serializer::NodeObjSerializer;
910
use crate::orchestrator::ResultWrapper;
1011
use crate::{
1112
auth::TransportRequest, channel::call_js_with_channel_as_callback,
1213
stream::call_js_with_stream_as_callback,
1314
};
1415
use async_trait::async_trait;
16+
use cubeorchestrator::query_result_transform::RequestResultData;
17+
use cubesql::compile::arrow::datatypes::Schema;
1518
use cubesql::compile::engine::df::scan::{
1619
convert_transport_response, transform_response, CacheMode, MemberField, RecordBatch, SchemaRef,
1720
};
@@ -26,7 +29,7 @@ use cubesql::{
2629
transport::{CubeStreamReceiver, LoadRequestMeta, MetaContext, TransportService},
2730
CubeError,
2831
};
29-
use serde::Serialize;
32+
use serde::{Deserialize, Serialize};
3033
use std::sync::Arc;
3134
use uuid::Uuid;
3235

@@ -402,14 +405,37 @@ impl TransportService for NodeBridgeTransport {
402405
.to_vec(cx)
403406
.map_cube_err("Can't convert JS result to array")?;
404407

405-
let native_wrapped_results = js_res_wrapped_vec
406-
.iter()
407-
.map(|r| ResultWrapper::from_js_result_wrapper(cx, *r))
408-
.collect::<Result<Vec<_>, _>>()
408+
let get_root_result_object_method: Handle<JsFunction> =
409+
js_result_wrapped.get(cx, "getRootResultObject").map_cube_err(
410+
"Can't get getRootResultObject method from JS ResultWrapper object",
411+
)?;
412+
413+
let result_data_js_array = get_root_result_object_method
414+
.call(cx, js_result_wrapped.upcast::<JsValue>(), [])
409415
.map_cube_err(
410-
"Can't construct result wrapper from JS ResultWrapper object",
416+
"Error calling getRootResultObject() method of JS ResultWrapper object",
411417
)?;
412418

419+
let result_data_js_vec = result_data_js_array
420+
.downcast::<JsArray, _>(cx)
421+
.map_cube_err("Can't downcast getRootResultObject result to array")?
422+
.to_vec(cx)
423+
.map_cube_err("Can't convert getRootResultObject result to array")?;
424+
425+
let mut native_wrapped_results = Vec::with_capacity(js_res_wrapped_vec.len());
426+
for (js_wrapper, js_result_data) in js_res_wrapped_vec.iter().zip(result_data_js_vec.iter()) {
427+
let mut wrapper = ResultWrapper::from_js_result_wrapper(cx, *js_wrapper)
428+
.map_cube_err("Can't construct result wrapper from JS ResultWrapper object")?;
429+
430+
let deserializer = JsValueDeserializer::new(cx, *js_result_data);
431+
let result_data: RequestResultData = Deserialize::deserialize(deserializer)
432+
.map_cube_err("Can't deserialize RequestResultData from getRootResultObject")?;
433+
434+
wrapper.last_refresh_time = result_data.last_refresh_time;
435+
436+
native_wrapped_results.push(wrapper);
437+
}
438+
413439
Ok(ValueFromJs::ResultWrapper(native_wrapped_results))
414440
} else if let Ok(str) = v.downcast::<JsString, _>(cx) {
415441
Ok(ValueFromJs::String(str.value(cx)))
@@ -482,7 +508,20 @@ impl TransportService for NodeBridgeTransport {
482508
break result_wrappers
483509
.into_iter()
484510
.map(|mut wrapper| {
485-
transform_response(&mut wrapper, schema.clone(), &member_fields)
511+
let updated_schema = if let Some(last_refresh_time) =
512+
wrapper.last_refresh_time.clone()
513+
{
514+
let mut metadata = schema.metadata().clone();
515+
metadata.insert("lastRefreshTime".to_string(), last_refresh_time);
516+
Arc::new(Schema::new_with_metadata(
517+
schema.fields().to_vec(),
518+
metadata,
519+
))
520+
} else {
521+
schema.clone()
522+
};
523+
524+
transform_response(&mut wrapper, updated_schema, &member_fields)
486525
})
487526
.collect::<Result<Vec<_>, _>>();
488527
}

rust/cubesql/cubeclient/src/models/v1_load_result.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ pub struct V1LoadResult {
2121
pub data: Vec<serde_json::Value>,
2222
#[serde(rename = "refreshKeyValues", skip_serializing_if = "Option::is_none")]
2323
pub refresh_key_values: Option<Vec<serde_json::Value>>,
24+
#[serde(rename = "lastRefreshTime", skip_serializing_if = "Option::is_none")]
25+
pub last_refresh_time: Option<String>,
2426
}
2527

2628
impl V1LoadResult {
@@ -33,6 +35,7 @@ impl V1LoadResult {
3335
annotation: Box::new(annotation),
3436
data,
3537
refresh_key_values: None,
38+
last_refresh_time: None,
3639
}
3740
}
3841
}

rust/cubesql/cubesql/src/compile/engine/df/scan.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub use datafusion::{
1818
ArrayRef, BooleanBuilder, Date32Builder, DecimalBuilder, Float32Builder,
1919
Float64Builder, Int16Builder, Int32Builder, Int64Builder, NullArray, StringBuilder,
2020
},
21-
datatypes::{DataType, SchemaRef},
21+
datatypes::{DataType, Schema, SchemaRef},
2222
error::{ArrowError, Result as ArrowResult},
2323
record_batch::RecordBatch,
2424
},
@@ -508,12 +508,13 @@ impl ExecutionPlan for CubeScanExecutionPlan {
508508

509509
// For now execute method executes only one query at a time, so we
510510
// take the first result
511+
let rb_schema = response.first().unwrap().schema().clone();
511512
one_shot_stream.data = Some(response.first().unwrap().clone());
512513

513514
Ok(Box::pin(CubeScanStreamRouter::new(
514515
None,
515516
one_shot_stream,
516-
self.schema.clone(),
517+
rb_schema,
517518
)))
518519
}
519520

@@ -1177,7 +1178,18 @@ pub fn convert_transport_response(
11771178
.into_iter()
11781179
.map(|r| {
11791180
let mut response = JsonValueObject::new(r.data.clone());
1180-
transform_response(&mut response, schema.clone(), &member_fields)
1181+
let updated_schema = if let Some(last_refresh_time) = r.last_refresh_time.clone() {
1182+
let mut metadata = schema.metadata().clone();
1183+
metadata.insert("lastRefreshTime".to_string(), last_refresh_time);
1184+
Arc::new(Schema::new_with_metadata(
1185+
schema.fields().to_vec(),
1186+
metadata,
1187+
))
1188+
} else {
1189+
schema.clone()
1190+
};
1191+
1192+
transform_response(&mut response, updated_schema, &member_fields)
11811193
})
11821194
.collect::<std::result::Result<Vec<RecordBatch>, CubeError>>()
11831195
}

rust/cubesql/cubesql/src/transport/service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,11 @@ struct MetaCacheBucket {
193193
value: Arc<MetaContext>,
194194
}
195195

196-
/// This transports is used in standalone mode
196+
/// This transport is used in standalone mode
197197
#[derive(Debug)]
198198
pub struct HttpTransport {
199199
/// We use simple cache to improve DX with standalone mode
200-
/// because currently we dont persist DF in the SessionState
200+
/// because currently we don't persist DF in the SessionState,
201201
/// and it causes a lot of HTTP requests which slow down BI connections
202202
cache: RwLockAsync<Option<MetaCacheBucket>>,
203203
}

0 commit comments

Comments
 (0)