Skip to content

Commit 19b283e

Browse files
committed
[flink] Remove useless classloader in FlinkCatalog
1 parent 452e6d3 commit 19b283e

File tree

3 files changed

+12
-26
lines changed

3 files changed

+12
-26
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
import org.apache.flink.table.catalog.TableChange.ModifyWatermark;
9494
import org.apache.flink.table.catalog.TableChange.ResetOption;
9595
import org.apache.flink.table.catalog.TableChange.SetOption;
96+
import org.apache.flink.table.catalog.UniqueConstraint;
9697
import org.apache.flink.table.catalog.WatermarkSpec;
9798
import org.apache.flink.table.catalog.exceptions.CatalogException;
9899
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
@@ -179,23 +180,16 @@ public class FlinkCatalog extends AbstractCatalog {
179180

180181
private static final Logger LOG = LoggerFactory.getLogger(FlinkCatalog.class);
181182

182-
private final ClassLoader classLoader;
183183
private final Catalog catalog;
184184
private final String name;
185185

186186
private final boolean disableCreateTableInDefaultDatabase;
187187

188-
public FlinkCatalog(
189-
Catalog catalog,
190-
String name,
191-
String defaultDatabase,
192-
ClassLoader classLoader,
193-
Options options) {
188+
public FlinkCatalog(Catalog catalog, String name, String defaultDatabase, Options options) {
194189
super(name, defaultDatabase);
195190
LOG.info("Creating Flink catalog: metastore={}", options.get(CatalogOptions.METASTORE));
196191
this.catalog = catalog;
197192
this.name = name;
198-
this.classLoader = classLoader;
199193
this.disableCreateTableInDefaultDatabase = options.get(DISABLE_CREATE_TABLE_IN_DEFAULT_DB);
200194
if (!disableCreateTableInDefaultDatabase) {
201195
try {
@@ -248,9 +242,9 @@ public void createDatabase(String name, CatalogDatabase database, boolean ignore
248242
Map<String, String> properties;
249243
if (database != null) {
250244
properties = new HashMap<>(database.getProperties());
251-
if (database.getDescription().isPresent()
252-
&& !database.getDescription().get().equals("")) {
253-
properties.put(COMMENT_PROP, database.getDescription().get());
245+
Optional<String> description = database.getDescription();
246+
if (description.isPresent() && !description.get().isEmpty()) {
247+
properties.put(COMMENT_PROP, description.get());
254248
}
255249
} else {
256250
properties = Collections.emptyMap();
@@ -941,7 +935,7 @@ private CatalogBaseTable toCatalogTable(Table table) {
941935
}
942936

943937
// add primary keys
944-
if (table.primaryKeys().size() > 0) {
938+
if (!table.primaryKeys().isEmpty()) {
945939
builder.primaryKey(table.primaryKeys());
946940
}
947941

@@ -1012,6 +1006,7 @@ private byte[] decodeRefreshHandlerBytes(String refreshHandlerBytes) {
10121006
}
10131007

10141008
public static Schema fromCatalogTable(CatalogBaseTable catalogTable) {
1009+
@SuppressWarnings("unchecked")
10151010
ResolvedSchema schema =
10161011
((ResolvedCatalogBaseTable<CatalogBaseTable>) catalogTable).getResolvedSchema();
10171012
RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
@@ -1037,7 +1032,7 @@ public static Schema fromCatalogTable(CatalogBaseTable catalogTable) {
10371032
.options(options)
10381033
.primaryKey(
10391034
schema.getPrimaryKey()
1040-
.map(pk -> pk.getColumns())
1035+
.map(UniqueConstraint::getColumns)
10411036
.orElse(Collections.emptyList()))
10421037
.partitionKeys(getPartitionKeys(catalogTable));
10431038
Map<String, String> columnComments = getColumnComments(catalogTable);
@@ -1185,7 +1180,7 @@ private Table getPaimonTable(ObjectPath tablePath) throws TableNotExistException
11851180
private List<PartitionEntry> getPartitionEntries(
11861181
Table table, ObjectPath tablePath, @Nullable CatalogPartitionSpec partitionSpec)
11871182
throws TableNotPartitionedException {
1188-
if (table.partitionKeys() == null || table.partitionKeys().size() == 0) {
1183+
if (table.partitionKeys() == null || table.partitionKeys().isEmpty()) {
11891184
throw new TableNotPartitionedException(getName(), tablePath);
11901185
}
11911186

@@ -1267,7 +1262,7 @@ public final boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec
12671262
throws CatalogException {
12681263
try {
12691264
List<CatalogPartitionSpec> partitionSpecs = getPartitionSpecs(tablePath, partitionSpec);
1270-
return partitionSpecs.size() > 0;
1265+
return !partitionSpecs.isEmpty();
12711266
} catch (TableNotPartitionedException | TableNotExistException e) {
12721267
throw new CatalogException(e);
12731268
}
@@ -1444,9 +1439,7 @@ public final void alterFunction(
14441439
}
14451440
} catch (Catalog.FunctionNotExistException e) {
14461441
throw new FunctionNotExistException(getName(), functionPath);
1447-
} catch (Catalog.DefinitionAlreadyExistException e) {
1448-
throw new RuntimeException(e);
1449-
} catch (Catalog.DefinitionNotExistException e) {
1442+
} catch (Catalog.DefinitionAlreadyExistException | Catalog.DefinitionNotExistException e) {
14501443
throw new RuntimeException(e);
14511444
}
14521445
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,17 +63,11 @@ public static FlinkCatalog createCatalog(
6363
CatalogFactory.createCatalog(context, classLoader),
6464
catalogName,
6565
context.options().get(DEFAULT_DATABASE),
66-
classLoader,
6766
context.options());
6867
}
6968

7069
public static FlinkCatalog createCatalog(String catalogName, Catalog catalog, Options options) {
71-
return new FlinkCatalog(
72-
catalog,
73-
catalogName,
74-
Catalog.DEFAULT_DATABASE,
75-
FlinkCatalogFactory.class.getClassLoader(),
76-
options);
70+
return new FlinkCatalog(catalog, catalogName, Catalog.DEFAULT_DATABASE, options);
7771
}
7872

7973
public static Catalog createPaimonCatalog(Options catalogOptions) {

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ public static FlinkGenericCatalog createCatalog(
9696
CatalogContext.create(options, new FlinkFileIOLoader()), cl),
9797
name,
9898
options.get(DEFAULT_DATABASE),
99-
cl,
10099
options);
101100

102101
return new FlinkGenericCatalog(paimon, flinkCatalog);

0 commit comments

Comments
 (0)