
Commit 2ed0960

Merge remote-tracking branch 'refs/remotes/github/main' into shobs-series-dot-df
2 parents: e41e83d + fbc31ab

File tree: 17 files changed (+544, −118 lines)

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
@@ -38,4 +38,4 @@ repos:
     rev: v1.1.1
     hooks:
       - id: mypy
-        additional_dependencies: [types-requests]
+        additional_dependencies: [types-requests, types-tabulate]

README.rst

Lines changed: 3 additions & 4 deletions
@@ -267,10 +267,9 @@ definition. To view and manage connections, do the following:
 3. In the Explorer pane, expand that project and then expand External connections.

 BigQuery remote functions are created in the dataset you specify, or
-in a dataset with the name ``bigframes_temp_location``, where location is
-the location used by the BigQuery DataFrames session. For example,
-``bigframes_temp_us_central1``. To view and manage remote functions, do
-the following:
+in a special type of `hidden dataset <https://cloud.google.com/bigquery/docs/datasets#hidden_datasets>`__
+referred to as an anonymous dataset. To view and manage remote functions created
+in a user provided dataset, do the following:

 1. Go to `BigQuery in the Google Cloud Console <https://console.cloud.google.com/bigquery>`__.
 2. Select the project in which you created the remote function.
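
The routines in a user-provided dataset can also be inspected programmatically. A minimal sketch using the google-cloud-bigquery client; the project and dataset IDs are placeholders, not values from this commit:

from google.cloud import bigquery

# List the remote functions (routines) in a user-provided dataset.
client = bigquery.Client()
for routine in client.list_routines("my-project.my_dataset"):  # placeholder IDs
    print(routine.reference)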

bigframes/_config/display_options.py

Lines changed: 4 additions & 0 deletions
@@ -32,6 +32,10 @@ class DisplayOptions:
     progress_bar: Optional[str] = "auto"
     repr_mode: Literal["head", "deferred"] = "head"

+    max_info_columns: int = 100
+    max_info_rows: Optional[int] = 200000
+    memory_usage: bool = True
+

 @contextlib.contextmanager
 def pandas_repr(display_options: DisplayOptions):
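
These three options mirror their pandas counterparts and drive the DataFrame.info implementation added later in this commit. A minimal sketch of adjusting them at runtime, assuming the usual bigframes.options entry point:

import bigframes

# Skip the potentially expensive non-null counts in DataFrame.info()
# for tables beyond 10,000 rows, and suppress the memory-usage line.
bigframes.options.display.max_info_rows = 10_000
bigframes.options.display.memory_usage = False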

bigframes/core/blocks.py

Lines changed: 23 additions & 18 deletions
@@ -389,23 +389,6 @@ def to_pandas(
         ordered: bool = True,
     ) -> Tuple[pd.DataFrame, bigquery.QueryJob]:
         """Run query and download results as a pandas DataFrame."""
-        if max_download_size is None:
-            max_download_size = bigframes.options.sampling.max_download_size
-        if sampling_method is None:
-            sampling_method = (
-                bigframes.options.sampling.sampling_method
-                if bigframes.options.sampling.sampling_method is not None
-                else _UNIFORM
-            )
-        if random_state is None:
-            random_state = bigframes.options.sampling.random_state
-
-        sampling_method = sampling_method.lower()
-        if sampling_method not in _SAMPLING_METHODS:
-            raise NotImplementedError(
-                f"The downsampling method {sampling_method} is not implemented, "
-                f"please choose from {','.join(_SAMPLING_METHODS)}."
-            )

         df, _, query_job = self._compute_and_count(
             value_keys=value_keys,

@@ -453,6 +436,28 @@ def _compute_and_count(
     ) -> Tuple[pd.DataFrame, int, bigquery.QueryJob]:
         """Run query and download results as a pandas DataFrame. Return the total number of results as well."""
         # TODO(swast): Allow for dry run and timeout.
+        enable_downsampling = (
+            True
+            if sampling_method is not None
+            else bigframes.options.sampling.enable_downsampling
+        )
+
+        max_download_size = (
+            max_download_size or bigframes.options.sampling.max_download_size
+        )
+
+        random_state = random_state or bigframes.options.sampling.random_state
+
+        if sampling_method is None:
+            sampling_method = bigframes.options.sampling.sampling_method or _UNIFORM
+        sampling_method = sampling_method.lower()
+
+        if sampling_method not in _SAMPLING_METHODS:
+            raise NotImplementedError(
+                f"The downsampling method {sampling_method} is not implemented, "
+                f"please choose from {','.join(_SAMPLING_METHODS)}."
+            )
+
         expr = self._apply_value_keys_to_expr(value_keys=value_keys)

         results_iterator, query_job = expr.start_query(

@@ -469,7 +474,7 @@
         )

         if fraction < 1:
-            if not bigframes.options.sampling.enable_downsampling:
+            if not enable_downsampling:
                 raise RuntimeError(
                     f"The data size ({table_size:.2f} MB) exceeds the maximum download limit of "
                     f"{max_download_size} MB. You can:\n\t* Enable downsampling in global options:\n"
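
The net effect: option resolution moves into _compute_and_count, and passing an explicit sampling_method now opts that call in to downsampling even when the global flag is off. A minimal sketch of the global knobs named in the diff, assuming they are writable like other bigframes options:

import bigframes

# Permit automatic downsampling of oversized results and cap
# downloads at 100 MB; both option names appear in the diff above.
bigframes.options.sampling.enable_downsampling = True
bigframes.options.sampling.max_download_size = 100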

bigframes/core/indexes/index.py

Lines changed: 12 additions & 1 deletion
@@ -155,6 +155,14 @@ def _block(self) -> blocks.Block:
     def T(self) -> Index:
         return self.transpose()

+    def _memory_usage(self) -> int:
+        (n_rows,) = self.shape
+        return sum(
+            self.dtypes.map(
+                lambda dtype: bigframes.dtypes.DTYPE_BYTE_SIZES.get(dtype, 8) * n_rows
+            )
+        )
+
     def transpose(self) -> Index:
         return self

@@ -326,7 +334,10 @@ def _apply_aggregation(self, op: agg_ops.AggregateOp) -> typing.Any:

     def __getitem__(self, key: int) -> typing.Any:
         if isinstance(key, int):
-            result_pd_df, _ = self._block.slice(key, key + 1, 1).to_pandas()
+            if key != -1:
+                result_pd_df, _ = self._block.slice(key, key + 1, 1).to_pandas()
+            else:  # special case, want [-1:] instead of [-1:0]
+                result_pd_df, _ = self._block.slice(key).to_pandas()
             if result_pd_df.empty:
                 raise IndexError("single positional indexer is out-of-bounds")
             return result_pd_df.index[0]
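
The -1 special case exists because a stop of key + 1 misbehaves only there: under plain Python slice semantics, the pair (-1, 0) denotes an empty range. A standalone illustration:

values = [10, 20, 30]

# key = -1 would produce slice(-1, 0), which is empty...
print(values[-1:0])  # []
# ...so the last element must be fetched with an open-ended slice.
print(values[-1:])   # [30]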

bigframes/dataframe.py

Lines changed: 84 additions & 0 deletions
@@ -18,6 +18,7 @@

 import datetime
 import re
+import sys
 import textwrap
 import typing
 from typing import (

@@ -36,6 +37,7 @@
 import google.cloud.bigquery as bigquery
 import numpy
 import pandas
+import tabulate

 import bigframes
 import bigframes._config.display_options as display_options

@@ -350,6 +352,88 @@ def query_job(self) -> Optional[bigquery.QueryJob]:
         self._set_internal_query_job(self._compute_dry_run())
         return self._query_job

+    def memory_usage(self, index: bool = True):
+        n_rows, _ = self.shape
+        # like pandas, treat all variable-size objects as just 8-byte pointers, ignoring actual object
+        column_sizes = self.dtypes.map(
+            lambda dtype: bigframes.dtypes.DTYPE_BYTE_SIZES.get(dtype, 8) * n_rows
+        )
+        if index:
+            index_size = pandas.Series([self.index._memory_usage()], index=["Index"])
+            column_sizes = pandas.concat([index_size, column_sizes])
+        return column_sizes
+
+    def info(
+        self,
+        verbose: Optional[bool] = None,
+        buf=None,
+        max_cols: Optional[int] = None,
+        memory_usage: Optional[bool] = None,
+        show_counts: Optional[bool] = None,
+    ):
+        obuf = buf or sys.stdout
+
+        n_rows, n_columns = self.shape
+
+        max_cols = (
+            max_cols
+            if max_cols is not None
+            else bigframes.options.display.max_info_columns
+        )
+
+        show_all_columns = verbose if verbose is not None else (n_columns < max_cols)
+
+        obuf.write(f"{type(self)}\n")
+
+        index_type = "MultiIndex" if self.index.nlevels > 1 else "Index"
+
+        # These accesses are kind of expensive, maybe should try to skip?
+        first_indice = self.index[0]
+        last_indice = self.index[-1]
+        obuf.write(f"{index_type}: {n_rows} entries, {first_indice} to {last_indice}\n")
+
+        dtype_strings = self.dtypes.astype("string")
+        if show_all_columns:
+            obuf.write(f"Data columns (total {n_columns} columns):\n")
+            column_info = self.columns.to_frame(name="Column")
+
+            max_rows = bigframes.options.display.max_info_rows
+            too_many_rows = n_rows > max_rows if max_rows is not None else False
+
+            if show_counts if show_counts is not None else (not too_many_rows):
+                non_null_counts = self.count().to_pandas()
+                column_info["Non-Null Count"] = non_null_counts.map(
+                    lambda x: f"{int(x)} non-null"
+                )
+
+            column_info["Dtype"] = dtype_strings
+
+            column_info = column_info.reset_index(drop=True)
+            column_info.index.name = "#"
+
+            column_info_formatted = tabulate.tabulate(column_info, headers="keys")  # type: ignore
+            obuf.write(column_info_formatted)
+            obuf.write("\n")
+
+        else:  # Just number of columns and first, last
+            obuf.write(
+                f"Columns: {n_columns} entries, {self.columns[0]} to {self.columns[-1]}\n"
+            )
+        dtype_counts = dtype_strings.value_counts().sort_index(ascending=True).items()
+        dtype_counts_formatted = ", ".join(
+            f"{dtype}({count})" for dtype, count in dtype_counts
+        )
+        obuf.write(f"dtypes: {dtype_counts_formatted}\n")
+
+        show_memory = (
+            memory_usage
+            if memory_usage is not None
+            else bigframes.options.display.memory_usage
+        )
+        if show_memory:
+            # TODO: Convert to different units (kb, mb, etc.)
+            obuf.write(f"memory usage: {self.memory_usage().sum()} bytes\n")
+
     def _set_internal_query_job(self, query_job: bigquery.QueryJob):
         self._query_job = query_job
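
A minimal usage sketch of the two new methods; the public table is only an example, not taken from this commit:

import bigframes.pandas as bpd

df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")

# Prints the index range, per-column dtypes (and non-null counts when
# affordable), plus a fixed-per-dtype memory estimate.
df.info()

# The same estimate as a pandas Series, index entry included, in bytes.
print(df.memory_usage().sum())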

bigframes/dtypes.py

Lines changed: 13 additions & 0 deletions
@@ -143,6 +143,19 @@
 # "string" and "string[pyarrow]" are accepted
 BIGFRAMES_STRING_TO_BIGFRAMES["string[pyarrow]"] = pd.StringDtype(storage="pyarrow")

+# For the purposes of dataframe.memory_usage
+# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#data_type_sizes
+DTYPE_BYTE_SIZES = {
+    pd.BooleanDtype(): 1,
+    pd.Int64Dtype(): 8,
+    pd.Float64Dtype(): 8,
+    pd.StringDtype(): 8,
+    pd.ArrowDtype(pa.time64("us")): 8,
+    pd.ArrowDtype(pa.timestamp("us")): 8,
+    pd.ArrowDtype(pa.timestamp("us", tz="UTC")): 8,
+    pd.ArrowDtype(pa.date32()): 8,
+}
+

 def ibis_dtype_to_bigframes_dtype(
     ibis_dtype: ibis_dtypes.DataType,
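
The estimate is purely shape-based: bytes per value times row count, with an 8-byte (pointer-sized) fallback for dtypes not in the table, mirroring pandas' shallow accounting. For example:

import pandas as pd

import bigframes.dtypes

# An INT64 column of 1,000 rows is costed at 8 bytes per value;
# unlisted dtypes also fall back to 8 bytes.
n_rows = 1_000
per_value = bigframes.dtypes.DTYPE_BYTE_SIZES.get(pd.Int64Dtype(), 8)
print(per_value * n_rows)  # 8000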

bigframes/remote_function.py

Lines changed: 13 additions & 6 deletions
@@ -188,6 +188,7 @@ def create_bq_remote_function(
         # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_remote_function_2
         bq_function_args = []
         bq_function_return_type = BigQueryType.from_ibis(output_type)
+
         # We are expecting the input type annotations to be 1:1 with the input args
         for idx, name in enumerate(input_args):
             bq_function_args.append(

@@ -204,14 +205,22 @@

         logger.info(f"Creating BQ remote function: {create_function_ddl}")

-        # Make sure the dataset exists
+        # Make sure the dataset exists. I.e. if it doesn't exist, go ahead and
+        # create it
         dataset = bigquery.Dataset(
             bigquery.DatasetReference.from_string(
                 self._bq_dataset, default_project=self._gcp_project_id
            )
        )
        dataset.location = self._bq_location
-        self._bq_client.create_dataset(dataset, exists_ok=True)
+        try:
+            # This check does not require bigquery.datasets.create IAM
+            # permission. So, if the data set already exists, then user can work
+            # without having that permission.
+            self._bq_client.get_dataset(dataset)
+        except google.api_core.exceptions.NotFound:
+            # This requires bigquery.datasets.create IAM permission
+            self._bq_client.create_dataset(dataset, exists_ok=True)

         # TODO: Use session._start_query() so we get progress bar
         query_job = self._bq_client.query(create_function_ddl)  # Make an API request.

@@ -610,7 +619,7 @@ def get_routine_reference(
         raise DatasetMissingError

     dataset_ref = bigquery.DatasetReference(
-        bigquery_client.project, session._session_dataset_id
+        bigquery_client.project, session._anonymous_dataset.dataset_id
     )
     return dataset_ref.routine(routine_ref_str)

@@ -778,9 +787,7 @@ def remote_function(
             dataset, default_project=bigquery_client.project
         )
     else:
-        dataset_ref = bigquery.DatasetReference.from_string(
-            session._session_dataset_id, default_project=bigquery_client.project
-        )
+        dataset_ref = session._anonymous_dataset

     bq_location, cloud_function_region = get_remote_function_locations(
         bigquery_client.location
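
The get-before-create pattern generalizes beyond remote functions. A standalone sketch with placeholder IDs (my-project.my_dataset is not from this commit):

import google.api_core.exceptions
from google.cloud import bigquery

client = bigquery.Client()
dataset = bigquery.Dataset("my-project.my_dataset")  # placeholder IDs
dataset.location = "US"

try:
    # Succeeds with read-only access when the dataset already exists;
    # no bigquery.datasets.create permission is exercised.
    client.get_dataset(dataset.reference)
except google.api_core.exceptions.NotFound:
    # Only creation requires bigquery.datasets.create.
    client.create_dataset(dataset, exists_ok=True)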

bigframes/session/__init__.py

Lines changed: 0 additions & 14 deletions
@@ -198,13 +198,6 @@ def cloudfunctionsclient(self):
     def resourcemanagerclient(self):
         return self._clients_provider.resourcemanagerclient

-    @property
-    def _session_dataset_id(self):
-        """A dataset for storing temporary objects local to the session
-        This is a workaround for remote functions that do not
-        yet support session-temporary instances."""
-        return self._session_dataset.dataset_id
-
     @property
     def _project(self):
         return self.bqclient.project

@@ -229,13 +222,6 @@ def _create_bq_datasets(self):
             query_destination.dataset_id,
         )

-        # Dataset for storing remote functions, which don't yet
-        # support proper session temporary storage yet
-        self._session_dataset = bigquery.Dataset(
-            f"{self.bqclient.project}.bigframes_temp_{self._location.lower().replace('-', '_')}"
-        )
-        self._session_dataset.location = self._location
-
     def close(self):
         """No-op. Temporary resources are deleted after 7 days."""
