iface) {
+ return null;
+ }
+
+ @Override
+ public boolean isWrapperFor(Class> iface) {
+ // This driver does not delegate to another JDBC implementation, so no
+ // interface is ever reported as wrapped.
+ // NOTE(review): the java.sql.Wrapper contract pairs this with unwrap();
+ // unwrap() above returns null rather than throwing SQLException for an
+ // unsupported iface — confirm the two stay consistent.
+ return false;
+ }
+
+ // --- Helper Methods ---
+
+ /**
+ * Determines the effective catalog and schema pattern to use for metadata retrieval.
+ *
+ * This method applies the logic for the {@code FilterTablesOnDefaultDataset} connection
+ * property. If this property is enabled and the provided {@code catalog} or {@code schemaPattern}
+ * are null, empty, or wildcard ('%'), they may be overridden by the default catalog (project) and
+ * default dataset (schema) configured in the {@link BigQueryConnection}.
+ *
+ * @param catalog The catalog name provided by the user; may be {@code null}.
+ * @param schemaPattern The schema name pattern provided by the user; may be {@code null}.
+ * @return A {@link Tuple} where {@code Tuple.x()} is the effective catalog string and {@code
+ * Tuple.y()} is the effective schema pattern string. These are the values that should be used
+ * for querying BigQuery's metadata.
+ * @see BigQueryConnection#isFilterTablesOnDefaultDataset()
+ */
+ private Tuple determineEffectiveCatalogAndSchema(
+ String catalog, String schemaPattern) {
+ String effectiveCatalog = catalog;
+ String effectiveSchemaPattern = schemaPattern;
+
+ // Overrides apply only when the filter property is enabled AND the
+ // connection actually carries a non-empty default dataset.
+ if (this.connection.isFilterTablesOnDefaultDataset()
+ && this.connection.getDefaultDataset() != null
+ && this.connection.getDefaultDataset().getDataset() != null
+ && !this.connection.getDefaultDataset().getDataset().isEmpty()) {
+
+ String defaultProjectFromConnection = this.connection.getCatalog();
+ // We only use the dataset part of the DefaultDataset for schema filtering
+ String defaultSchemaFromConnection = this.connection.getDefaultDataset().getDataset();
+
+ // "Unspecified" here means null, empty, or the lone SQL wildcard '%'.
+ boolean catalogIsNullOrEmptyOrWildcard =
+ (catalog == null || catalog.isEmpty() || catalog.equals("%"));
+ boolean schemaPatternIsNullOrEmptyOrWildcard =
+ (schemaPattern == null || schemaPattern.isEmpty() || schemaPattern.equals("%"));
+
+ final String logPrefix = "FilterTablesOnDefaultDatasetTrue: ";
+ // Neither side specified: substitute both connection defaults.
+ if (catalogIsNullOrEmptyOrWildcard && schemaPatternIsNullOrEmptyOrWildcard) {
+ effectiveCatalog = defaultProjectFromConnection;
+ effectiveSchemaPattern = defaultSchemaFromConnection;
+ LOG.info(
+ String.format(
+ logPrefix + "Using default catalog '%s' and default dataset '%s'.",
+ effectiveCatalog,
+ effectiveSchemaPattern));
+ // Only the catalog is unspecified; the user's schema pattern wins.
+ } else if (catalogIsNullOrEmptyOrWildcard) {
+ effectiveCatalog = defaultProjectFromConnection;
+ LOG.info(
+ String.format(
+ logPrefix
+ + "Using default catalog '%s' with user dataset '%s'. Default dataset '%s' ignored.",
+ effectiveCatalog,
+ effectiveSchemaPattern,
+ defaultSchemaFromConnection));
+ // Only the schema is unspecified; the user's catalog wins.
+ } else if (schemaPatternIsNullOrEmptyOrWildcard) {
+ effectiveSchemaPattern = defaultSchemaFromConnection;
+ LOG.info(
+ String.format(
+ logPrefix + "Using user catalog '%s' and default dataset '%s'.",
+ effectiveCatalog,
+ effectiveSchemaPattern));
+ // Both supplied by the user: nothing is overridden.
+ } else {
+ LOG.info(
+ String.format(
+ logPrefix
+ + "Using user catalog '%s' and schema '%s'. Default dataset '%s' ignored.",
+ effectiveCatalog,
+ effectiveSchemaPattern,
+ defaultSchemaFromConnection));
+ }
+ }
+ return Tuple.of(effectiveCatalog, effectiveSchemaPattern);
+ }
+
+ /**
+ * Maps a BigQuery {@link StandardSQLTypeName} to the JDBC type code, display
+ * type name, precision, scale, and radix reported through DatabaseMetaData.
+ *
+ * @param bqType the BigQuery type; may be {@code null}
+ * @return the JDBC mapping; {@code null} or unrecognized types fall back to VARCHAR
+ */
+ private ColumnTypeInfo getColumnTypeInfoForSqlType(StandardSQLTypeName bqType) {
+ if (bqType == null) {
+ // Bug fix: the previous message called bqType.name() on a null reference,
+ // so this branch threw NullPointerException instead of logging the fallback.
+ LOG.warning("Null BigQuery type encountered. Mapping to VARCHAR.");
+ return new ColumnTypeInfo(Types.VARCHAR, "VARCHAR", null, null, null);
+ }
+
+ switch (bqType) {
+ case INT64:
+ return new ColumnTypeInfo(Types.BIGINT, "BIGINT", 19, 0, 10);
+ case BOOL:
+ return new ColumnTypeInfo(Types.BOOLEAN, "BOOLEAN", 1, null, null);
+ case FLOAT64:
+ return new ColumnTypeInfo(Types.DOUBLE, "DOUBLE", 15, null, 10);
+ case NUMERIC:
+ return new ColumnTypeInfo(Types.NUMERIC, "NUMERIC", 38, 9, 10);
+ case BIGNUMERIC:
+ // Reported under the NUMERIC type name with BIGNUMERIC's wider precision/scale.
+ return new ColumnTypeInfo(Types.NUMERIC, "NUMERIC", 77, 38, 10);
+ case STRING:
+ return new ColumnTypeInfo(Types.NVARCHAR, "NVARCHAR", null, null, null);
+ case TIMESTAMP:
+ case DATETIME:
+ // Both civil and absolute time types surface as JDBC TIMESTAMP.
+ return new ColumnTypeInfo(Types.TIMESTAMP, "TIMESTAMP", 29, null, null);
+ case DATE:
+ return new ColumnTypeInfo(Types.DATE, "DATE", 10, null, null);
+ case TIME:
+ return new ColumnTypeInfo(Types.TIME, "TIME", 15, null, null);
+ case GEOGRAPHY:
+ case JSON:
+ case INTERVAL:
+ // Types with no native JDBC counterpart are exposed as text.
+ return new ColumnTypeInfo(Types.VARCHAR, "VARCHAR", null, null, null);
+ case BYTES:
+ return new ColumnTypeInfo(Types.VARBINARY, "VARBINARY", null, null, null);
+ case STRUCT:
+ return new ColumnTypeInfo(Types.STRUCT, "STRUCT", null, null, null);
+ default:
+ LOG.warning(
+ "Unknown BigQuery type encountered: " + bqType.name() + ". Mapping to VARCHAR.");
+ return new ColumnTypeInfo(Types.VARCHAR, bqType.name(), null, null, null);
+ }
+ }
+
+ /**
+ * Resolves BigQuery objects (datasets, tables, routines, ...) matching a
+ * JDBC-style name pattern. A pattern containing wildcards (or null) triggers a
+ * full listing filtered client-side with {@code regex}; a literal pattern is
+ * resolved with a single direct lookup.
+ *
+ * @param objectTypeName human-readable object kind, used only in log messages
+ * @param listAllOperation supplier that starts a paged listing of all objects
+ * @param getSpecificOperation exact-name lookup; may return null when absent
+ * @param nameExtractor extracts the matchable name from a listed object
+ * @param pattern the raw JDBC pattern (null / '%' / '_' imply listing)
+ * @param regex precompiled regex equivalent of {@code pattern}; null skips filtering
+ * @param logger destination for progress and error messages
+ * @return matching objects; empty on not-found, interruption, or API error
+ */
+ List findMatchingBigQueryObjects(
+ String objectTypeName,
+ Supplier> listAllOperation,
+ Function getSpecificOperation,
+ Function nameExtractor,
+ String pattern,
+ Pattern regex,
+ BigQueryJdbcCustomLogger logger) {
+
+ boolean needsList = needsListing(pattern);
+ List resultList = new ArrayList<>();
+
+ try {
+ Iterable objects;
+ if (needsList) {
+ logger.info(
+ String.format(
+ "Listing all %ss (pattern: %s)...",
+ objectTypeName, pattern == null ? "" : pattern));
+ Page firstPage = listAllOperation.get();
+ // iterateAll() transparently fetches subsequent pages as we iterate.
+ objects = firstPage.iterateAll();
+ logger.fine(
+ String.format(
+ "Retrieved initial %s list, iterating & filtering if needed...", objectTypeName));
+
+ } else {
+ logger.info(String.format("Getting specific %s: '%s'", objectTypeName, pattern));
+ T specificObject = getSpecificOperation.apply(pattern);
+ objects =
+ (specificObject == null)
+ ? Collections.emptyList()
+ : Collections.singletonList(specificObject);
+ if (specificObject == null) {
+ logger.info(String.format("Specific %s not found: '%s'", objectTypeName, pattern));
+ }
+ }
+
+ boolean wasListing = needsList;
+ for (T obj : objects) {
+ // Listing can be large; honor cancellation between elements.
+ if (Thread.currentThread().isInterrupted()) {
+ logger.warning("Thread interrupted during " + objectTypeName + " processing loop.");
+ throw new InterruptedException(
+ "Interrupted during " + objectTypeName + " processing loop");
+ }
+ if (obj != null) {
+ // Client-side regex filtering only applies to the listing path; a
+ // direct lookup already matched the exact name.
+ if (wasListing && regex != null) {
+ String name = nameExtractor.apply(obj);
+ if (name != null && regex.matcher(name).matches()) {
+ resultList.add(obj);
+ }
+ } else {
+ resultList.add(obj);
+ }
+ }
+ }
+
+ } catch (BigQueryException e) {
+ // A 404 on a direct lookup just means "no such object" — not an error.
+ if (!needsList && e.getCode() == 404) {
+ logger.info(String.format("%s '%s' not found (API error 404).", objectTypeName, pattern));
+ } else {
+ logger.warning(
+ String.format(
+ "BigQueryException finding %ss for pattern '%s': %s (Code: %d)",
+ objectTypeName, pattern, e.getMessage(), e.getCode()));
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag for callers further up the stack.
+ Thread.currentThread().interrupt();
+ logger.warning("Interrupted while finding " + objectTypeName + "s.");
+ } catch (Exception e) {
+ logger.severe(
+ String.format(
+ "Unexpected exception finding %ss for pattern '%s': %s",
+ objectTypeName, pattern, e.getMessage()));
+ }
+ return resultList;
+ }
+
+ /**
+ * Plain value holder for one row of {@code DatabaseMetaData.getTypeInfo()}.
+ * Field order mirrors the JDBC TYPE_INFO result-set columns; Long fields are
+ * nullable where the column may be SQL NULL.
+ */
+ private static class TypeInfoRowData {
+ String typeName;
+ int jdbcType;
+ Long precision;
+ String literalPrefix;
+ String literalSuffix;
+ String createParams;
+ int nullable;
+ boolean caseSensitive;
+ int searchable;
+ boolean unsignedAttribute;
+ boolean fixedPrecScale;
+ boolean autoIncrement;
+ String localTypeName;
+ Long minimumScale;
+ Long maximumScale;
+ Long numPrecRadix;
+
+ TypeInfoRowData(
+ String typeName,
+ int jdbcType,
+ Long precision,
+ String literalPrefix,
+ String literalSuffix,
+ String createParams,
+ int nullable,
+ boolean caseSensitive,
+ int searchable,
+ boolean unsignedAttribute,
+ boolean fixedPrecScale,
+ boolean autoIncrement,
+ String localTypeName,
+ Long minimumScale,
+ Long maximumScale,
+ Long numPrecRadix) {
+ this.typeName = typeName;
+ this.jdbcType = jdbcType;
+ this.precision = precision;
+ this.literalPrefix = literalPrefix;
+ this.literalSuffix = literalSuffix;
+ this.createParams = createParams;
+ this.nullable = nullable;
+ this.caseSensitive = caseSensitive;
+ this.searchable = searchable;
+ this.unsignedAttribute = unsignedAttribute;
+ this.fixedPrecScale = fixedPrecScale;
+ this.autoIncrement = autoIncrement;
+ this.localTypeName = localTypeName;
+ this.minimumScale = minimumScale;
+ this.maximumScale = maximumScale;
+ this.numPrecRadix = numPrecRadix;
+ }
+ }
+
+ /**
+ * Sorts collected metadata rows in place with the given comparator.
+ * A null/empty result list or a null comparator is a logged no-op; a sort
+ * failure is logged and swallowed so callers still receive the unsorted rows.
+ */
+ void sortResults(
+ List collectedResults,
+ Comparator comparator,
+ String operationName,
+ BigQueryJdbcCustomLogger logger) {
+
+ if (collectedResults == null || collectedResults.isEmpty()) {
+ logger.info(String.format("No results collected for %s, skipping sort.", operationName));
+ return;
+ }
+ if (comparator == null) {
+ logger.info(String.format("No comparator provided for %s, skipping sort.", operationName));
+ return;
+ }
+
+ logger.info(
+ String.format(
+ "Sorting %d collected %s results...", collectedResults.size(), operationName));
+ try {
+ collectedResults.sort(comparator);
+ logger.info(String.format("%s result sorting completed.", operationName));
+ } catch (Exception e) {
+ // Best-effort: a comparator failure must not abort metadata retrieval.
+ logger.severe(
+ String.format("Error during sorting %s results: %s", operationName, e.getMessage()));
+ }
+ }
+
+ /**
+ * Builds the seven-column schema shared by the privilege metadata results:
+ * TABLE_CAT, TABLE_SCHEM, TABLE_NAME, GRANTOR, GRANTEE, PRIVILEGE,
+ * IS_GRANTABLE — all STRING columns, with the nullability required by JDBC.
+ */
+ private List defineBasePrivilegeFields() {
+ String[] columnNames = {
+ "TABLE_CAT", "TABLE_SCHEM", "TABLE_NAME", "GRANTOR", "GRANTEE", "PRIVILEGE", "IS_GRANTABLE"
+ };
+ Field.Mode[] columnModes = {
+ Field.Mode.NULLABLE, Field.Mode.NULLABLE, Field.Mode.REQUIRED, Field.Mode.NULLABLE,
+ Field.Mode.REQUIRED, Field.Mode.REQUIRED, Field.Mode.NULLABLE
+ };
+ List fields = new ArrayList<>(columnNames.length);
+ for (int i = 0; i < columnNames.length; i++) {
+ fields.add(
+ Field.newBuilder(columnNames[i], StandardSQLTypeName.STRING)
+ .setMode(columnModes[i])
+ .build());
+ }
+ return fields;
+ }
+
+ /**
+ * Compiles a JDBC/SQL LIKE pattern into a regex: '%' matches any substring,
+ * '_' matches any single character, and a backslash escapes the following
+ * character so "\%" and "\_" match literally, per the JDBC search-string
+ * escape convention. All regex metacharacters are escaped.
+ *
+ * @param sqlLikePattern the LIKE pattern; may be {@code null}
+ * @return a case-insensitive compiled pattern, a never-matching pattern for an
+ *     empty input, or {@code null} when the input is {@code null}
+ */
+ Pattern compileSqlLikePattern(String sqlLikePattern) {
+ if (sqlLikePattern == null) {
+ return null;
+ }
+ if (sqlLikePattern.isEmpty()) {
+ // "(?!)" is a negative lookahead that can never succeed: matches nothing.
+ return Pattern.compile("(?!)");
+ }
+ StringBuilder regex = new StringBuilder(sqlLikePattern.length() * 2);
+ regex.append('^');
+ for (int i = 0; i < sqlLikePattern.length(); i++) {
+ char c = sqlLikePattern.charAt(i);
+ switch (c) {
+ case '%':
+ regex.append(".*");
+ break;
+ case '_':
+ regex.append('.');
+ break;
+ case '\\':
+ // Bug fix: '\' is the JDBC search-string escape character. Previously it
+ // was emitted as a literal backslash, so "\%" still acted as a wildcard
+ // instead of matching a literal '%'. Consume the escaped character and
+ // quote it so it always matches literally.
+ if (i + 1 < sqlLikePattern.length()) {
+ char escaped = sqlLikePattern.charAt(++i);
+ regex.append(Pattern.quote(String.valueOf(escaped)));
+ } else {
+ // Trailing backslash with nothing to escape: match it literally.
+ regex.append("\\\\");
+ }
+ break;
+ case '.':
+ case '[':
+ case ']':
+ case '(':
+ case ')':
+ case '{':
+ case '}':
+ case '*':
+ case '+':
+ case '?':
+ case '^':
+ case '$':
+ case '|':
+ regex.append('\\').append(c);
+ break;
+ default:
+ regex.append(c);
+ break;
+ }
+ }
+ regex.append('$');
+ return Pattern.compile(regex.toString(), Pattern.CASE_INSENSITIVE);
+ }
+
+ /** A pattern requires a full listing when it is absent or contains a LIKE wildcard. */
+ boolean needsListing(String pattern) {
+ if (pattern == null) {
+ return true;
+ }
+ return pattern.indexOf('%') >= 0 || pattern.indexOf('_') >= 0;
+ }
+
+ /** Wraps a raw string as a primitive FieldValue for synthesized metadata rows. */
+ FieldValue createStringFieldValue(String value) {
+ return FieldValue.of(FieldValue.Attribute.PRIMITIVE, value);
+ }
+
+ /** Wraps a Long as a primitive FieldValue; a null input becomes a SQL NULL value. */
+ FieldValue createLongFieldValue(Long value) {
+ String text = (value == null) ? null : String.valueOf(value);
+ return FieldValue.of(FieldValue.Attribute.PRIMITIVE, text);
+ }
+
+ /** Produces a primitive FieldValue representing SQL NULL. */
+ FieldValue createNullFieldValue() {
+ return FieldValue.of(FieldValue.Attribute.PRIMITIVE, null);
+ }
+
+ /** Wraps a Boolean as a primitive FieldValue encoded as "1"/"0"; null becomes SQL NULL. */
+ FieldValue createBooleanFieldValue(Boolean value) {
+ if (value == null) {
+ return FieldValue.of(FieldValue.Attribute.PRIMITIVE, null);
+ }
+ return FieldValue.of(FieldValue.Attribute.PRIMITIVE, value ? "1" : "0");
+ }
+
+ /** Returns the string at {@code index} in the row, or null when the row, index, or value is absent. */
+ private String getStringValueOrNull(FieldValueList fvl, int index) {
+ if (fvl == null || index < 0 || index >= fvl.size()) {
+ return null;
+ }
+ FieldValue cell = fvl.get(index);
+ if (cell == null || cell.isNull()) {
+ return null;
+ }
+ return cell.getStringValue();
+ }
+
+ /** Returns the long at {@code index} in the row, or null when absent or unparseable. */
+ private Long getLongValueOrNull(FieldValueList fvl, int index) {
+ if (fvl == null || index < 0 || index >= fvl.size()) {
+ return null;
+ }
+ FieldValue cell = fvl.get(index);
+ if (cell == null || cell.isNull()) {
+ return null;
+ }
+ try {
+ return cell.getLongValue();
+ } catch (NumberFormatException e) {
+ LOG.warning("Could not parse Long value for index " + index);
+ return null;
+ }
+ }
+
+ /**
+ * Blocks until every submitted task completes, logging failures as they
+ * surface. Already-cancelled tasks are skipped; if this thread is interrupted
+ * while waiting, the remaining tasks are cancelled, the interrupt flag is
+ * restored, and waiting stops.
+ */
+ private void waitForTasksCompletion(List> taskFutures) {
+ LOG.info(String.format("Waiting for %d submitted tasks to complete...", taskFutures.size()));
+ for (Future> future : taskFutures) {
+ try {
+ if (!future.isCancelled()) {
+ // get() blocks until this task finishes (we only need completion,
+ // not the result).
+ future.get();
+ }
+ } catch (CancellationException e) {
+ LOG.warning("A table processing task was cancelled.");
+ } catch (ExecutionException e) {
+ // The task's own exception is the cause; prefer its message.
+ LOG.severe(
+ String.format(
+ "Error executing table processing task: %s",
+ (e.getCause() != null ? e.getCause().getMessage() : e.getMessage())));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warning(
+ "Fetcher thread interrupted while waiting for tasks. Attempting to cancel remaining"
+ + " tasks.");
+ taskFutures.forEach(f -> f.cancel(true));
+ break;
+ }
+ }
+ LOG.info("Finished waiting for tasks.");
+ }
+
+ /**
+ * Transfers the sorted rows into the consumer queue, wrapping each row with
+ * the result schema. Stops early when the thread's interrupt flag is set
+ * (the flag is left intact); if put() itself is interrupted, the flag is
+ * restored before returning.
+ */
+ private void populateQueue(
+ List collectedResults,
+ BlockingQueue queue,
+ FieldList resultSchemaFields) {
+ LOG.info(String.format("Populating queue with %d results...", collectedResults.size()));
+ try {
+ for (FieldValueList sortedRow : collectedResults) {
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.warning("Interrupted during queue population.");
+ break;
+ }
+ // put() blocks when the queue is full, providing backpressure to the consumer.
+ queue.put(BigQueryFieldValueListWrapper.of(resultSchemaFields, sortedRow));
+ }
+ LOG.info("Finished populating queue.");
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warning("Interrupted while putting row onto queue.");
+ } catch (Exception e) {
+ LOG.severe("Unexpected error populating queue: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Enqueues the sentinel wrapper that tells the consuming result set no more
+ * rows will arrive. Restores the interrupt flag if the put is interrupted.
+ */
+ private void signalEndOfData(
+ BlockingQueue queue, FieldList resultSchemaFields) {
+ try {
+ LOG.info("Adding end signal to queue.");
+ // The third argument marks this wrapper as the end-of-data signal.
+ queue.put(BigQueryFieldValueListWrapper.of(resultSchemaFields, null, true));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warning("Interrupted while sending end signal to queue.");
+ } catch (Exception e) {
+ LOG.severe("Exception while sending end signal to queue: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Shuts the executor down in two phases: a graceful shutdown() with a 10s
+ * grace period, then a forced shutdownNow() with a further 10s wait. If this
+ * thread is interrupted while waiting, forces shutdownNow() and restores the
+ * interrupt flag. Safe to call with a null or already-shut-down executor.
+ */
+ private void shutdownExecutor(ExecutorService executor) {
+ if (executor == null || executor.isShutdown()) {
+ return;
+ }
+ LOG.info("Shutting down column executor service...");
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ LOG.warning("Executor did not terminate gracefully after 10s, forcing shutdownNow().");
+ List droppedTasks = executor.shutdownNow();
+ LOG.warning(
+ "Executor shutdownNow() initiated. Dropped tasks count: " + droppedTasks.size());
+ if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ LOG.severe("Executor did not terminate even after shutdownNow().");
+ }
+ }
+ LOG.info("Executor shutdown complete.");
+ } catch (InterruptedException ie) {
+ LOG.warning(
+ "Interrupted while waiting for executor termination. Forcing shutdownNow() again.");
+ executor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /** Returns the connection's catalog (the BigQuery project) as the current catalog name. */
+ private String getCurrentCatalogName() {
+ return this.connection.getCatalog();
+ }
+
+ private List getAccessibleCatalogNames() {
+ Set accessibleCatalogs = new HashSet<>();
+ String primaryCatalog = getCurrentCatalogName();
+ if (primaryCatalog != null && !primaryCatalog.isEmpty()) {
+ accessibleCatalogs.add(primaryCatalog);
+ }
+
+ List