3131#include < olp/core/client/OlpClientSettingsFactory.h>
3232#include < olp/core/logging/Log.h>
3333#include < olp/core/porting/make_unique.h>
34+ #include < olp/dataservice/write/VolatileLayerClient.h>
35+ #include < olp/dataservice/write/model/PublishPartitionDataRequest.h>
36+ #include < olp/dataservice/write/model/StartBatchRequest.h>
3437#include < testutils/CustomParameters.hpp>
3538
3639#include " olp/dataservice/read/PartitionsRequest.h"
40+ #include " olp/dataservice/read/PrefetchTileResult.h"
3741#include " olp/dataservice/read/VolatileLayerClient.h"
3842
3943using namespace olp ::dataservice::read;
44+ using namespace olp ::dataservice::write::model;
4045using namespace testing ;
4146
4247namespace {
@@ -46,6 +51,19 @@ const auto kAppSecretEnvName = "dataservice_read_volatile_test_secret";
4651const auto kCatalogEnvName = " dataservice_read_volatile_test_catalog" ;
4752const auto kLayerEnvName = " dataservice_read_volatile_layer" ;
4853
54+ const auto kPrefetchAppId = " dataservice_read_volatile_test_prefetch_appid" ;
55+ const auto kPrefetchAppSecret =
56+ " dataservice_read_volatile_test_prefetch_secret" ;
57+ const auto kPrefetchCatalog = " dataservice_read_volatile_test_prefetch_catalog" ;
58+ const auto kPrefetchLayer = " dataservice_read_volatile_prefetch_layer" ;
59+ const auto kPrefetchTile = " 23618401" ;
60+ const auto kPrefetchSubTile1 = " 23618410" ;
61+ const auto kPrefetchSubTile2 = " 23618406" ;
62+ const auto kPrefetchEndpoint = " endpoint" ;
63+
64+ // The limit for 100 retries is 10 minutes. Therefore, the wait time between
65+ // retries is 6 seconds.
66+ const auto kWaitBeforeRetry = std::chrono::seconds(6 );
4967const auto kTimeout = std::chrono::seconds(5 );
5068
5169class VolatileLayerClientTest : public ::testing::Test {
@@ -69,6 +87,27 @@ class VolatileLayerClientTest : public ::testing::Test {
6987 olp::cache::CacheSettings cache_settings;
7088 settings_.cache = olp::client::OlpClientSettingsFactory::CreateDefaultCache (
7189 cache_settings);
90+
91+ // prefetch setup
92+ auto prefetch_app_id = CustomParameters::getArgument (kPrefetchAppId );
93+ auto prefetch_secret = CustomParameters::getArgument (kPrefetchAppSecret );
94+ prefetch_catalog_ = CustomParameters::getArgument (kPrefetchCatalog );
95+ prefetch_layer_ = CustomParameters::getArgument (kPrefetchLayer );
96+
97+ olp::authentication::Settings prefetch_auth_settings (
98+ {prefetch_app_id, prefetch_secret});
99+ prefetch_auth_settings.token_endpoint_url =
100+ CustomParameters::getArgument (kPrefetchEndpoint );
101+ prefetch_auth_settings.network_request_handler = network;
102+
103+ olp::client::AuthenticationSettings prefetch_auth_client_settings;
104+ prefetch_auth_client_settings.provider =
105+ olp::authentication::TokenProviderDefault (prefetch_auth_settings);
106+
107+ prefetch_settings_.authentication_settings = prefetch_auth_client_settings;
108+ prefetch_settings_.network_request_handler = network;
109+ prefetch_settings_.task_scheduler =
110+ olp::client::OlpClientSettingsFactory::CreateDefaultTaskScheduler (1 );
72111 }
73112
74113 void TearDown () override {
@@ -77,6 +116,7 @@ class VolatileLayerClientTest : public ::testing::Test {
77116 // anywhere. Also network is still used in authentication settings and in
78117 // TokenProvider internally so it needs to be cleared.
79118 settings_ = olp::client::OlpClientSettings ();
119+ prefetch_settings_ = olp::client::OlpClientSettings ();
80120 EXPECT_EQ (network.use_count (), 1 );
81121 }
82122
@@ -88,8 +128,79 @@ class VolatileLayerClientTest : public ::testing::Test {
88128 return CustomParameters::getArgument (kLayerEnvName );
89129 }
90130
131+ void WritePrefetchTilesData () {
132+ auto hrn = olp::client::HRN{prefetch_catalog_};
133+ // write desired partitions into the layer
134+ olp::dataservice::write::VolatileLayerClient write_client (
135+ hrn, prefetch_settings_);
136+
137+ {
138+ auto batch_request = StartBatchRequest ().WithLayers ({prefetch_layer_});
139+
140+ auto response = write_client.StartBatch (batch_request).GetFuture ().get ();
141+
142+ EXPECT_TRUE (response.IsSuccessful ());
143+ ASSERT_TRUE (response.GetResult ().GetId ());
144+ ASSERT_NE (" " , response.GetResult ().GetId ().value ());
145+
146+ std::vector<PublishPartitionDataRequest> partition_requests;
147+ PublishPartitionDataRequest partition_request;
148+ partition_requests.push_back (
149+ partition_request.WithLayerId (prefetch_layer_)
150+ .WithPartitionId (kPrefetchTile ));
151+ partition_requests.push_back (
152+ partition_request.WithPartitionId (kPrefetchSubTile1 ));
153+ partition_requests.push_back (
154+ partition_request.WithPartitionId (kPrefetchSubTile2 ));
155+
156+ auto publish_to_batch_response =
157+ write_client.PublishToBatch (response.GetResult (), partition_requests)
158+ .GetFuture ()
159+ .get ();
160+ EXPECT_TRUE (publish_to_batch_response.IsSuccessful ());
161+
162+ // publish data blobs
163+ std::vector<unsigned char > data = {1 , 2 , 3 };
164+ auto data_ptr = std::make_shared<std::vector<unsigned char >>(data);
165+ for (auto & partition : partition_requests) {
166+ partition.WithData (data_ptr);
167+ auto publish_data_response =
168+ write_client.PublishPartitionData (partition).GetFuture ().get ();
169+ EXPECT_TRUE (publish_data_response.IsSuccessful ());
170+ }
171+
172+ auto complete_batch_response =
173+ write_client.CompleteBatch (response.GetResult ()).GetFuture ().get ();
174+ EXPECT_TRUE (complete_batch_response.IsSuccessful ());
175+
176+ olp::dataservice::write::GetBatchResponse get_batch_response;
177+ for (int i = 0 ; i < 100 ; ++i) {
178+ get_batch_response =
179+ write_client.GetBatch (response.GetResult ()).GetFuture ().get ();
180+
181+ EXPECT_TRUE (get_batch_response.IsSuccessful ());
182+ ASSERT_EQ (response.GetResult ().GetId ().value (),
183+ get_batch_response.GetResult ().GetId ().value ());
184+ if (get_batch_response.GetResult ().GetDetails ()->GetState () !=
185+ " succeeded" ) {
186+ ASSERT_EQ (" submitted" ,
187+ get_batch_response.GetResult ().GetDetails ()->GetState ());
188+ std::this_thread::sleep_for (kWaitBeforeRetry );
189+ } else {
190+ break ;
191+ }
192+ }
193+
194+ ASSERT_EQ (" succeeded" ,
195+ get_batch_response.GetResult ().GetDetails ()->GetState ());
196+ }
197+ }
198+
91199 protected:
92200 olp::client::OlpClientSettings settings_;
201+ olp::client::OlpClientSettings prefetch_settings_;
202+ std::string prefetch_catalog_;
203+ std::string prefetch_layer_;
93204};
94205
95206TEST_F (VolatileLayerClientTest, GetPartitions) {
@@ -224,4 +335,100 @@ TEST_F(VolatileLayerClientTest, GetPartitionsDifferentFetchOptions) {
224335 }
225336}
226337
338+ TEST_F (VolatileLayerClientTest, Prefetch) {
339+ WritePrefetchTilesData ();
340+
341+ olp::client::HRN hrn (prefetch_catalog_);
342+ VolatileLayerClient client (hrn, prefetch_layer_, prefetch_settings_);
343+ {
344+ SCOPED_TRACE (" Prefetch tiles online and store them in memory cache" );
345+ std::vector<olp::geo::TileKey> tile_keys = {
346+ olp::geo::TileKey::FromHereTile (kPrefetchTile )};
347+ std::vector<olp::geo::TileKey> expected_tile_keys = {
348+ olp::geo::TileKey::FromHereTile (kPrefetchTile ),
349+ olp::geo::TileKey::FromHereTile (kPrefetchSubTile1 ),
350+ olp::geo::TileKey::FromHereTile (kPrefetchSubTile2 )};
351+
352+ auto request = olp::dataservice::read::PrefetchTilesRequest ()
353+ .WithTileKeys (tile_keys)
354+ .WithMinLevel (10 )
355+ .WithMaxLevel (12 );
356+
357+ auto future = client.PrefetchTiles (request).GetFuture ();
358+
359+ ASSERT_NE (future.wait_for (kTimeout ), std::future_status::timeout);
360+ PrefetchTilesResponse response = future.get ();
361+ EXPECT_TRUE (response.IsSuccessful ());
362+ ASSERT_FALSE (response.GetResult ().empty ());
363+
364+ const auto & result = response.GetResult ();
365+
366+ for (auto tile_result : result) {
367+ EXPECT_TRUE (tile_result->IsSuccessful ());
368+ ASSERT_TRUE (tile_result->tile_key_ .IsValid ());
369+ auto it = std::find (expected_tile_keys.begin (), expected_tile_keys.end (),
370+ tile_result->tile_key_ );
371+ ASSERT_NE (it, expected_tile_keys.end ());
372+ }
373+
374+ ASSERT_EQ (expected_tile_keys.size (), result.size ());
375+ }
376+
377+ {
378+ SCOPED_TRACE (" min/max levels are 0" );
379+
380+ std::vector<olp::geo::TileKey> tile_keys = {
381+ olp::geo::TileKey::FromHereTile (kPrefetchTile )};
382+ auto request = olp::dataservice::read::PrefetchTilesRequest ()
383+ .WithTileKeys (tile_keys)
384+ .WithMinLevel (0 )
385+ .WithMaxLevel (0 );
386+
387+ auto future = client.PrefetchTiles (request).GetFuture ();
388+
389+ ASSERT_NE (future.wait_for (kTimeout ), std::future_status::timeout);
390+ PrefetchTilesResponse response = future.get ();
391+ EXPECT_TRUE (response.IsSuccessful ());
392+ const auto & result = response.GetResult ();
393+
394+ for (auto tile_result : result) {
395+ EXPECT_TRUE (tile_result->IsSuccessful ());
396+ ASSERT_TRUE (tile_result->tile_key_ .IsValid ());
397+ auto it =
398+ std::find (tile_keys.begin (), tile_keys.end (), tile_result->tile_key_ );
399+ ASSERT_NE (it, tile_keys.end ());
400+ }
401+
402+ ASSERT_EQ (tile_keys.size (), result.size ());
403+ }
404+
405+ {
406+ SCOPED_TRACE (" min/max levels are equal" );
407+
408+ std::vector<olp::geo::TileKey> tile_keys = {
409+ olp::geo::TileKey::FromHereTile (kPrefetchTile )};
410+ auto request = olp::dataservice::read::PrefetchTilesRequest ()
411+ .WithTileKeys (tile_keys)
412+ .WithMinLevel (12 )
413+ .WithMaxLevel (12 );
414+
415+ auto future = client.PrefetchTiles (request).GetFuture ();
416+
417+ ASSERT_NE (future.wait_for (kTimeout ), std::future_status::timeout);
418+ PrefetchTilesResponse response = future.get ();
419+ EXPECT_TRUE (response.IsSuccessful ());
420+ const auto & result = response.GetResult ();
421+
422+ for (auto tile_result : result) {
423+ EXPECT_TRUE (tile_result->IsSuccessful ());
424+ ASSERT_TRUE (tile_result->tile_key_ .IsValid ());
425+ auto it =
426+ std::find (tile_keys.begin (), tile_keys.end (), tile_result->tile_key_ );
427+ ASSERT_NE (it, tile_keys.end ());
428+ }
429+
430+ ASSERT_EQ (tile_keys.size (), result.size ());
431+ }
432+ }
433+
227434} // namespace
0 commit comments