Skip to content

Commit ace8d8b

Browse files
authored
fix(query): fix stack overflow errors thrown during serialization (#19040)
1 parent 44a6622 commit ace8d8b

File tree

1 file changed

+97
-1
lines changed

1 file changed

+97
-1
lines changed

src/query/service/src/servers/flight/v1/packets/packet_fragment.rs

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,23 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::any::Any;
16+
use std::collections::HashMap;
17+
use std::collections::VecDeque;
1518
use std::fmt::Debug;
1619
use std::fmt::Formatter;
1720

21+
use serde::de::Error;
22+
use serde::Deserializer;
23+
use serde::Serializer;
24+
25+
use crate::physical_plans::IPhysicalPlan;
1826
use crate::physical_plans::PhysicalPlan;
27+
use crate::physical_plans::PhysicalPlanCast;
28+
use crate::physical_plans::PhysicalPlanMeta;
1929
use crate::servers::flight::v1::exchange::DataExchange;
2030

21-
#[derive(Clone, serde::Serialize, serde::Deserialize)]
31+
#[derive(Clone)]
2232
pub struct QueryFragment {
2333
pub fragment_id: usize,
2434
pub data_exchange: Option<DataExchange>,
@@ -39,6 +49,92 @@ impl QueryFragment {
3949
}
4050
}
4151

52+
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
53+
struct SerializedPhysicalPlanRef(u32);
54+
55+
#[typetag::serde]
56+
impl IPhysicalPlan for SerializedPhysicalPlanRef {
57+
fn as_any(&self) -> &dyn Any {
58+
self
59+
}
60+
61+
fn get_meta(&self) -> &PhysicalPlanMeta {
62+
unimplemented!()
63+
}
64+
65+
fn get_meta_mut(&mut self) -> &mut PhysicalPlanMeta {
66+
unimplemented!()
67+
}
68+
69+
fn derive(&self, _: Vec<PhysicalPlan>) -> PhysicalPlan {
70+
unimplemented!()
71+
}
72+
}
73+
74+
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
75+
struct SerializeQueryFragment {
76+
pub fragment_id: usize,
77+
pub data_exchange: Option<DataExchange>,
78+
pub flatten_plan: VecDeque<PhysicalPlan>,
79+
}
80+
81+
impl serde::Serialize for QueryFragment {
82+
#[recursive::recursive]
83+
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
84+
let mut flatten_queue = vec![self.physical_plan.clone()];
85+
let mut flattened = VecDeque::new();
86+
87+
while let Some(mut plan) = flatten_queue.pop() {
88+
for child in plan.children_mut() {
89+
let id = child.get_id();
90+
let mut new_child = PhysicalPlan::new(SerializedPhysicalPlanRef(id));
91+
std::mem::swap(&mut new_child, child);
92+
flatten_queue.push(new_child);
93+
}
94+
95+
flattened.push_front(plan);
96+
}
97+
98+
let serialize_fragment = SerializeQueryFragment {
99+
fragment_id: self.fragment_id,
100+
data_exchange: self.data_exchange.clone(),
101+
flatten_plan: flattened,
102+
};
103+
104+
serialize_fragment.serialize(serializer)
105+
}
106+
}
107+
108+
impl<'de> serde::Deserialize<'de> for QueryFragment {
109+
#[recursive::recursive]
110+
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
111+
let mut fragment = SerializeQueryFragment::deserialize(deserializer)?;
112+
113+
let mut flatten_storage = HashMap::new();
114+
115+
while let Some(mut plan) = fragment.flatten_plan.pop_front() {
116+
for child in plan.children_mut() {
117+
if let Some(ref_plan) = SerializedPhysicalPlanRef::from_physical_plan(child) {
118+
let Some(mut flatten_plan) = flatten_storage.remove(&ref_plan.0) else {
119+
return Err(D::Error::custom(""));
120+
};
121+
122+
std::mem::swap(child, &mut flatten_plan);
123+
}
124+
}
125+
126+
flatten_storage.insert(plan.get_id(), plan);
127+
}
128+
129+
assert_eq!(flatten_storage.len(), 1);
130+
Ok(QueryFragment {
131+
fragment_id: fragment.fragment_id,
132+
data_exchange: fragment.data_exchange,
133+
physical_plan: flatten_storage.into_values().next().unwrap(),
134+
})
135+
}
136+
}
137+
42138
impl Debug for QueryFragment {
43139
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
44140
f.debug_struct("QueryFragment")

0 commit comments

Comments
 (0)