diff --git a/README.md b/README.md
index 67506eec3e..828d4859cc 100644
--- a/README.md
+++ b/README.md
@@ -73,7 +73,7 @@ Parquet is a very active project, and new features are being added quickly. Here
* Type-specific encoding
* Hive integration (deprecated)
-* Pig integration
+* Pig integration (deprecated)
* Cascading integration (deprecated)
* Crunch integration
* Apache Arrow integration
@@ -132,24 +132,6 @@ See the APIs:
* [Record conversion API](https://github.com/apache/parquet-java/tree/master/parquet-column/src/main/java/org/apache/parquet/io/api)
* [Hadoop API](https://github.com/apache/parquet-java/tree/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api)
-## Apache Pig integration
-A [Loader](https://github.com/apache/parquet-java/blob/master/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java) and a [Storer](https://github.com/apache/parquet-java/blob/master/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetStorer.java) are provided to read and write Parquet files with Apache Pig.
-
-Storing data into Parquet in Pig is simple:
-```
--- options you might want to fiddle with
-SET parquet.page.size 1048576 -- default. This is your minimum read/write unit.
-SET parquet.block.size 134217728 -- default. This is your memory budget for buffering data.
-SET parquet.compression lzo -- or you can use none, gzip, snappy
-STORE mydata into '/some/path' USING parquet.pig.ParquetStorer;
-```
-Reading in Pig is also simple:
-```
-mydata = LOAD '/some/path' USING parquet.pig.ParquetLoader();
-```
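-
-The loader can also read just a subset of the columns: pass a Pig schema listing only the fields you need (the field names below are illustrative, not from your data), and the projection is pushed down so that only those columns are read:
-```
-projected = LOAD '/some/path' USING parquet.pig.ParquetLoader('a:int, b:chararray');
-```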
-
-If the data was stored using Pig, things will "just work". If the data was stored using another method, you will need to provide the Pig schema equivalent to the data you stored (you can also write the schema to the file footer while writing it -- but that's pretty advanced). We will provide a basic automatic schema conversion soon.
-
## Hive integration
Hive integration is provided via the [parquet-hive](https://github.com/apache/parquet-java/tree/master/parquet-hive) sub-project.
diff --git a/parquet-pig-bundle/pom.xml b/parquet-pig-bundle/pom.xml
deleted file mode 100644
index e330298c99..0000000000
--- a/parquet-pig-bundle/pom.xml
+++ /dev/null
@@ -1,95 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <parent>
-    <groupId>org.apache.parquet</groupId>
-    <artifactId>parquet</artifactId>
-    <relativePath>../pom.xml</relativePath>
-    <version>1.16.0-SNAPSHOT</version>
-  </parent>
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <artifactId>parquet-pig-bundle</artifactId>
-  <packaging>jar</packaging>
-
-  <name>Apache Parquet Pig Bundle</name>
-  <url>https://parquet.apache.org</url>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-pig</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-            <configuration>
-              <createDependencyReducedPom>false</createDependencyReducedPom>
-              <filters>
-                <filter>
-                  <artifact>*:*</artifact>
-                  <excludes>
-                    <exclude>META-INF/LICENSE.txt</exclude>
-                    <exclude>META-INF/NOTICE.txt</exclude>
-                  </excludes>
-                </filter>
-              </filters>
-              <artifactSet>
-                <includes>
-                  <include>org.apache.parquet:parquet-*</include>
-                </includes>
-              </artifactSet>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-dependency-plugin</artifactId>
-        <executions>
-          <execution>
-            <goals>
-              <goal>analyze-only</goal>
-            </goals>
-            <configuration>
-              <skip>true</skip>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-</project>
diff --git a/parquet-pig-bundle/src/main/resources/META-INF/LICENSE b/parquet-pig-bundle/src/main/resources/META-INF/LICENSE
deleted file mode 100644
index b53f78f5c8..0000000000
--- a/parquet-pig-bundle/src/main/resources/META-INF/LICENSE
+++ /dev/null
@@ -1,248 +0,0 @@
-
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
---------------------------------------------------------------------------------
-
-This product depends on Apache Thrift and includes it in this binary artifact.
-
-Copyright: 2006-2010 The Apache Software Foundation.
-Home page: https://thrift.apache.org/
-License: http://www.apache.org/licenses/LICENSE-2.0
-
---------------------------------------------------------------------------------
-
-This product depends on SLF4J and includes SLF4J in this binary artifact. SLF4J
-is a simple logging facade for Java.
-
-Copyright: 2004-2013 QOS.ch.
-Home page: http://www.slf4j.org/
-License: http://slf4j.org/license.html (MIT license)
-
-The following is the SLF4J license (MIT):
-
- Copyright (c) 2004-2013 QOS.ch
- All rights reserved.
-
- Permission is hereby granted, free of charge, to any person obtaining
- a copy of this software and associated documentation files (the
- "Software"), to deal in the Software without restriction, including
- without limitation the rights to use, copy, modify, merge, publish,
- distribute, sublicense, and/or sell copies of the Software, and to
- permit persons to whom the Software is furnished to do so, subject to
- the following conditions:
-
- The above copyright notice and this permission notice shall be
- included in all copies or substantial portions of the Software.
-
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
- MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
- LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
- OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
- WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-
---------------------------------------------------------------------------------
-
-This project includes code from Daniel Lemire's JavaFastPFOR project in this
-binary artifact. The "Lemire" bit packing classes produced by parquet-generator
-are derived from the JavaFastPFOR project.
-
-Copyright: 2013 Daniel Lemire
-Home page: http://lemire.me/en/
-Project page: https://github.com/lemire/JavaFastPFOR
-License: Apache License Version 2.0 http://www.apache.org/licenses/LICENSE-2.0
-
---------------------------------------------------------------------------------
-
-This product depends on fastutil and includes it in this binary artifact.
-Fastutil provides type-specific collection implementations.
-
-Copyright: 2002-2014 Sebastiano Vigna
-Home page: http://fastutil.di.unimi.it/
-License: http://www.apache.org/licenses/LICENSE-2.0.html
-
---------------------------------------------------------------------------------
-
-This product depends on Jackson and includes it in this binary artifact.
-Jackson is a high-performance JSON processor.
-
-Copyright: 2007-2015 Tatu Saloranta and other contributors
-Home page: http://jackson.codehaus.org/
-License: http://www.apache.org/licenses/LICENSE-2.0.txt
-
diff --git a/parquet-pig-bundle/src/main/resources/org/apache/parquet/bundle b/parquet-pig-bundle/src/main/resources/org/apache/parquet/bundle
deleted file mode 100644
index fe95886d5c..0000000000
--- a/parquet-pig-bundle/src/main/resources/org/apache/parquet/bundle
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
diff --git a/parquet-pig/REVIEWERS.md b/parquet-pig/REVIEWERS.md
deleted file mode 100644
index 802f26081d..0000000000
--- a/parquet-pig/REVIEWERS.md
+++ /dev/null
@@ -1,25 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-
-| Name | Apache Id | github id |
-|--------------------|------------|-------------|
-| Aniket Mokashi | aniket486 | aniket486 |
-| Dmitriy Ryaboy | dvryaboy | dvryaboy |
-| Jonathan Coveney | jcoveney | jcoveney |
-| Julien Le Dem | julien | julienledem |
diff --git a/parquet-pig/pom.xml b/parquet-pig/pom.xml
deleted file mode 100644
index f1ed2c253f..0000000000
--- a/parquet-pig/pom.xml
+++ /dev/null
@@ -1,140 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <parent>
-    <groupId>org.apache.parquet</groupId>
-    <artifactId>parquet</artifactId>
-    <relativePath>../pom.xml</relativePath>
-    <version>1.16.0-SNAPSHOT</version>
-  </parent>
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <artifactId>parquet-pig</artifactId>
-  <packaging>jar</packaging>
-
-  <name>Apache Parquet Pig</name>
-  <url>https://parquet.apache.org</url>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-column</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-hadoop</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-common</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-jackson</artifactId>
-      <version>${project.version}</version>
-      <scope>runtime</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.pig</groupId>
-      <artifactId>pig</artifactId>
-      <version>${pig.version}</version>
-      <classifier>${pig.classifier}</classifier>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
-      <version>${hadoop.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-core</artifactId>
-      <version>${hadoop.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>${hadoop.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>${jackson.groupId}</groupId>
-      <artifactId>jackson-databind</artifactId>
-      <version>${jackson-databind.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>${jackson.groupId}</groupId>
-      <artifactId>jackson-annotations</artifactId>
-      <version>${jackson.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-column</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.antlr</groupId>
-      <artifactId>antlr-runtime</artifactId>
-      <version>3.5.3</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <version>${guava.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <version>${slf4j.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <version>${slf4j.version}</version>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-      </plugin>
-    </plugins>
-  </build>
-</project>
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java b/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java
deleted file mode 100644
index 76e2e3290f..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java
+++ /dev/null
@@ -1,595 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig;
-
-import static java.util.Arrays.asList;
-import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths;
-import static org.apache.parquet.filter2.predicate.FilterApi.and;
-import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
-import static org.apache.parquet.filter2.predicate.FilterApi.booleanColumn;
-import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
-import static org.apache.parquet.filter2.predicate.FilterApi.eq;
-import static org.apache.parquet.filter2.predicate.FilterApi.floatColumn;
-import static org.apache.parquet.filter2.predicate.FilterApi.gt;
-import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
-import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
-import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
-import static org.apache.parquet.filter2.predicate.FilterApi.lt;
-import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
-import static org.apache.parquet.filter2.predicate.FilterApi.not;
-import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
-import static org.apache.parquet.filter2.predicate.FilterApi.or;
-import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration;
-import static org.apache.parquet.pig.PigSchemaConverter.parsePigSchema;
-import static org.apache.parquet.pig.PigSchemaConverter.pigSchemaToString;
-import static org.apache.parquet.pig.PigSchemaConverter.serializeRequiredFieldList;
-import static org.apache.parquet.pig.TupleReadSupport.PARQUET_COLUMN_INDEX_ACCESS;
-import static org.apache.parquet.pig.TupleReadSupport.PARQUET_PIG_REQUIRED_FIELDS;
-import static org.apache.parquet.pig.TupleReadSupport.PARQUET_PIG_SCHEMA;
-import static org.apache.parquet.pig.TupleReadSupport.getPigSchemaFromMultipleFiles;
-import static org.apache.pig.Expression.BinaryExpression;
-import static org.apache.pig.Expression.Column;
-import static org.apache.pig.Expression.Const;
-import static org.apache.pig.Expression.OpType;
-
-import java.io.IOException;
-import java.lang.ref.Reference;
-import java.lang.ref.SoftReference;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.WeakHashMap;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.parquet.filter2.predicate.FilterPredicate;
-import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
-import org.apache.parquet.filter2.predicate.Operators;
-import org.apache.parquet.hadoop.ParquetInputFormat;
-import org.apache.parquet.hadoop.metadata.GlobalMetaData;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.io.api.Binary;
-import org.apache.pig.Expression;
-import org.apache.pig.Expression.BetweenExpression;
-import org.apache.pig.Expression.InExpression;
-import org.apache.pig.Expression.UnaryExpression;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.LoadMetadata;
-import org.apache.pig.LoadPredicatePushdown;
-import org.apache.pig.LoadPushDown;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceStatistics;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.parser.ParserException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Pig Loader for the Parquet file format.
- */
-public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDown, LoadPredicatePushdown {
- private static final Logger LOG = LoggerFactory.getLogger(ParquetLoader.class);
-
- public static final String ENABLE_PREDICATE_FILTER_PUSHDOWN = "parquet.pig.predicate.pushdown.enable";
- private static final boolean DEFAULT_PREDICATE_PUSHDOWN_ENABLED = false;
-
- // Using a weak hash map will ensure that the cache will be gc'ed when there is memory pressure
- static final Map<String, Reference<ParquetInputFormat<Tuple>>> inputFormatCache =
- new WeakHashMap<String, Reference<ParquetInputFormat<Tuple>>>();
-
- private Schema requestedSchema;
- private boolean columnIndexAccess;
-
- private String location;
- private boolean setLocationHasBeenCalled = false;
- private RecordReader<Void, Tuple> reader;
- private ParquetInputFormat<Tuple> parquetInputFormat;
- private Schema schema;
- private RequiredFieldList requiredFieldList = null;
- protected String signature;
-
- /**
- * To read the content in its original schema
- */
- public ParquetLoader() {
- this(null);
- }
-
- /**
- * To read only a subset of the columns in the file
- *
- * @param requestedSchemaStr a subset of the original pig schema in the file
- */
- public ParquetLoader(String requestedSchemaStr) {
- this(parsePigSchema(requestedSchemaStr), false);
- }
-
- /**
- * To read only a subset of the columns in the file optionally assigned by
- * column positions. Using column positions allows for renaming the fields
- * and is more inline with the "schema-on-read" approach to accessing file
- * data.
- *
- * This will use the names provided in the requested schema and assign them
- * to column positions indicated by order.
- *
- * @param requestedSchemaStr a subset of the original pig schema in the file
- * @param columnIndexAccess use column index positions as opposed to name (default: false)
- */
- public ParquetLoader(String requestedSchemaStr, String columnIndexAccess) {
- this(parsePigSchema(requestedSchemaStr), Boolean.parseBoolean(columnIndexAccess));
- }
-
- /**
- * Use the provided schema to access the underlying file data.
- *
- * The same as the string based constructor but for programmatic use.
- *
- * @param requestedSchema a subset of the original pig schema in the file
- * @param columnIndexAccess use column index positions as opposed to name (default: false)
- */
- public ParquetLoader(Schema requestedSchema, boolean columnIndexAccess) {
- this.requestedSchema = requestedSchema;
- this.columnIndexAccess = columnIndexAccess;
- }
-
- @Override
- public void setLocation(String location, Job job) throws IOException {
- if (LOG.isDebugEnabled()) {
- String jobToString = String.format("job[id=%s, name=%s]", job.getJobID(), job.getJobName());
- LOG.debug("LoadFunc.setLocation({}, {})", location, jobToString);
- }
-
- setInput(location, job);
- }
-
- private void setInput(String location, Job job) throws IOException {
- this.setLocationHasBeenCalled = true;
- this.location = location;
- setInputPaths(job, location);
-
- // This is prior to load because the initial value comes from the constructor
- // not file metadata or pig framework and would get overwritten in initSchema().
- if (UDFContext.getUDFContext().isFrontend()) {
- storeInUDFContext(PARQUET_COLUMN_INDEX_ACCESS, Boolean.toString(columnIndexAccess));
- }
-
- schema = PigSchemaConverter.parsePigSchema(getPropertyFromUDFContext(PARQUET_PIG_SCHEMA));
- requiredFieldList =
- PigSchemaConverter.deserializeRequiredFieldList(getPropertyFromUDFContext(PARQUET_PIG_REQUIRED_FIELDS));
- columnIndexAccess = Boolean.parseBoolean(getPropertyFromUDFContext(PARQUET_COLUMN_INDEX_ACCESS));
-
- initSchema(job);
-
- if (UDFContext.getUDFContext().isFrontend()) {
- // Setting for task-side loading via initSchema()
- storeInUDFContext(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
- storeInUDFContext(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
- }
-
- // Used by task-side loader via TupleReadSupport
- getConfiguration(job).set(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
- getConfiguration(job).set(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
- getConfiguration(job).set(PARQUET_COLUMN_INDEX_ACCESS, Boolean.toString(columnIndexAccess));
-
- FilterPredicate filterPredicate = (FilterPredicate) getFromUDFContext(ParquetInputFormat.FILTER_PREDICATE);
- if (filterPredicate != null) {
- ParquetInputFormat.setFilterPredicate(getConfiguration(job), filterPredicate);
- }
- }
-
- @Override
- public InputFormat<Void, Tuple> getInputFormat() throws IOException {
- LOG.debug("LoadFunc.getInputFormat()");
- return getParquetInputFormat();
- }
-
- private void checkSetLocationHasBeenCalled() {
- if (!setLocationHasBeenCalled) {
- throw new IllegalStateException("setLocation() must be called first");
- }
- }
-
- private static class UnregisteringParquetInputFormat extends ParquetInputFormat<Tuple> {
-
- private final String location;
-
- public UnregisteringParquetInputFormat(String location) {
- super(TupleReadSupport.class);
- this.location = location;
- }
-
- @Override
- public RecordReader<Void, Tuple> createRecordReader(
- InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
- // for local mode we don't want to keep that around
- inputFormatCache.remove(location);
- return super.createRecordReader(inputSplit, taskAttemptContext);
- }
- }
-
- private ParquetInputFormat<Tuple> getParquetInputFormat() throws ParserException {
- checkSetLocationHasBeenCalled();
- if (parquetInputFormat == null) {
- // unfortunately Pig will create many Loaders, so we cache the inputformat to avoid reading the metadata
- // more than once
- Reference<ParquetInputFormat<Tuple>> ref = inputFormatCache.get(location);
- parquetInputFormat = ref == null ? null : ref.get();
- if (parquetInputFormat == null) {
- parquetInputFormat = new UnregisteringParquetInputFormat(location);
- inputFormatCache.put(location, new SoftReference<ParquetInputFormat<Tuple>>(parquetInputFormat));
- }
- }
- return parquetInputFormat;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split) throws IOException {
- LOG.debug("LoadFunc.prepareToRead({}, {})", reader, split);
- this.reader = reader;
- }
-
- @Override
- public Tuple getNext() throws IOException {
- try {
- if (reader.nextKeyValue()) {
- return (Tuple) reader.getCurrentValue();
- } else {
- return null;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ParquetDecodingException("Interrupted", e);
- }
- }
-
- @Override
- public String[] getPartitionKeys(String location, Job job) throws IOException {
- if (LOG.isDebugEnabled()) {
- String jobToString = String.format("job[id=%s, name=%s]", job.getJobID(), job.getJobName());
- LOG.debug("LoadMetadata.getPartitionKeys({}, {})", location, jobToString);
- }
- setInput(location, job);
- return null;
- }
-
- @Override
- public ResourceSchema getSchema(String location, Job job) throws IOException {
- if (LOG.isDebugEnabled()) {
- String jobToString = String.format("job[id=%s, name=%s]", job.getJobID(), job.getJobName());
- LOG.debug("LoadMetadata.getSchema({}, {})", location, jobToString);
- }
- setInput(location, job);
- return new ResourceSchema(schema);
- }
-
- private void initSchema(Job job) throws IOException {
- if (schema != null) {
- return;
- }
- if (schema == null && requestedSchema != null) {
- // this is only true in front-end
- schema = requestedSchema;
- }
- if (schema == null) {
- // no requested schema => use the schema from the file
- final GlobalMetaData globalMetaData = getParquetInputFormat().getGlobalMetaData(job);
- schema = getPigSchemaFromMultipleFiles(globalMetaData.getSchema(), globalMetaData.getKeyValueMetaData());
- }
- if (isElephantBirdCompatible(job)) {
- convertToElephantBirdCompatibleSchema(schema);
- }
- }
-
- private void convertToElephantBirdCompatibleSchema(Schema schema) {
- if (schema == null) {
- return;
- }
- for (FieldSchema fieldSchema : schema.getFields()) {
- if (fieldSchema.type == DataType.BOOLEAN) {
- fieldSchema.type = DataType.INTEGER;
- }
- convertToElephantBirdCompatibleSchema(fieldSchema.schema);
- }
- }
-
- private boolean isElephantBirdCompatible(Job job) {
- return getConfiguration(job).getBoolean(TupleReadSupport.PARQUET_PIG_ELEPHANT_BIRD_COMPATIBLE, false);
- }
-
- @Override
- public ResourceStatistics getStatistics(String location, Job job) throws IOException {
- if (LOG.isDebugEnabled()) {
- String jobToString = String.format("job[id=%s, name=%s]", job.getJobID(), job.getJobName());
- LOG.debug("LoadMetadata.getStatistics({}, {})", location, jobToString);
- }
- /* We need to call setInput since setLocation is not
- guaranteed to be called before this */
- setInput(location, job);
- long length = 0;
- try {
- for (InputSplit split : getParquetInputFormat().getSplits(job)) {
- length += split.getLength();
- }
- } catch (InterruptedException e) {
- LOG.warn("Interrupted", e);
- Thread.currentThread().interrupt();
- return null;
- }
- ResourceStatistics stats = new ResourceStatistics();
- // TODO use the pig-0.12 setBytes API when it's available
- stats.setmBytes(length / 1024 / 1024);
- return stats;
- }
-
- @Override
- public void setPartitionFilter(Expression expression) throws IOException {
- LOG.debug("LoadMetadata.setPartitionFilter({})", expression);
- }
-
- @Override
- public List<OperatorSet> getFeatures() {
- return asList(LoadPushDown.OperatorSet.PROJECTION);
- }
-
- protected String getPropertyFromUDFContext(String key) {
- UDFContext udfContext = UDFContext.getUDFContext();
- return udfContext
- .getUDFProperties(this.getClass(), new String[] {signature})
- .getProperty(key);
- }
-
- protected Object getFromUDFContext(String key) {
- UDFContext udfContext = UDFContext.getUDFContext();
- return udfContext
- .getUDFProperties(this.getClass(), new String[] {signature})
- .get(key);
- }
-
- protected void storeInUDFContext(String key, Object value) {
- UDFContext udfContext = UDFContext.getUDFContext();
- java.util.Properties props = udfContext.getUDFProperties(this.getClass(), new String[] {signature});
- props.put(key, value);
- }
-
- @Override
- public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {
- this.requiredFieldList = requiredFieldList;
-
- if (requiredFieldList == null) return null;
-
- schema = getSchemaFromRequiredFieldList(schema, requiredFieldList.getFields());
- storeInUDFContext(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
- storeInUDFContext(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
-
- return new RequiredFieldResponse(true);
- }
-
- @Override
- public void setUDFContextSignature(String signature) {
- this.signature = signature;
- }
-
- private Schema getSchemaFromRequiredFieldList(Schema schema, List<RequiredField> fieldList)
- throws FrontendException {
- Schema s = new Schema();
- for (RequiredField rf : fieldList) {
- FieldSchema f;
- try {
- f = schema.getField(rf.getAlias()).clone();
- } catch (CloneNotSupportedException e) {
- throw new FrontendException("Clone not supported for the fieldschema", e);
- }
- if (rf.getSubFields() == null) {
- s.add(f);
- } else {
- Schema innerSchema = getSchemaFromRequiredFieldList(f.schema, rf.getSubFields());
- if (innerSchema == null) {
- return null;
- } else {
- f.schema = innerSchema;
- s.add(f);
- }
- }
- }
- return s;
- }
-
- @Override
- public List<String> getPredicateFields(String s, Job job) throws IOException {
- if (!job.getConfiguration().getBoolean(ENABLE_PREDICATE_FILTER_PUSHDOWN, DEFAULT_PREDICATE_PUSHDOWN_ENABLED)) {
- return null;
- }
-
- List<String> fields = new ArrayList<String>();
-
- for (FieldSchema field : schema.getFields()) {
- switch (field.type) {
- case DataType.BOOLEAN:
- case DataType.INTEGER:
- case DataType.LONG:
- case DataType.FLOAT:
- case DataType.DOUBLE:
- case DataType.CHARARRAY:
- fields.add(field.alias);
- break;
- default:
- // Skip BYTEARRAY, TUPLE, MAP, BAG, DATETIME, BIGINTEGER, BIGDECIMAL
- break;
- }
- }
-
- return fields;
- }
-
- @Override
- public List<OpType> getSupportedExpressionTypes() {
- OpType[] supportedTypes = {
- OpType.OP_EQ,
- OpType.OP_NE,
- OpType.OP_GT,
- OpType.OP_GE,
- OpType.OP_LT,
- OpType.OP_LE,
- OpType.OP_AND,
- OpType.OP_OR,
- // OpType.OP_BETWEEN, // not implemented in Pig yet
- // OpType.OP_IN, // not implemented in Pig yet
- OpType.OP_NOT
- };
-
- return Arrays.asList(supportedTypes);
- }
-
- @Override
- public void setPushdownPredicate(Expression e) throws IOException {
- LOG.info("Pig pushdown expression: {}", e);
-
- FilterPredicate pred = buildFilter(e);
- LOG.info("Parquet filter predicate expression: {}", pred);
-
- storeInUDFContext(ParquetInputFormat.FILTER_PREDICATE, pred);
- }
-
- private FilterPredicate buildFilter(Expression e) {
- OpType op = e.getOpType();
-
- if (e instanceof BinaryExpression) {
- Expression lhs = ((BinaryExpression) e).getLhs();
- Expression rhs = ((BinaryExpression) e).getRhs();
-
- switch (op) {
- case OP_AND:
- return and(buildFilter(lhs), buildFilter(rhs));
- case OP_OR:
- return or(buildFilter(lhs), buildFilter(rhs));
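- // BETWEEN is rewritten as (lhs >= lower) AND (lhs <= upper)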
- case OP_BETWEEN:
- BetweenExpression between = (BetweenExpression) rhs;
- return and(
- buildFilter(OpType.OP_GE, (Column) lhs, (Const) between.getLower()),
- buildFilter(OpType.OP_LE, (Column) lhs, (Const) between.getUpper()));
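- // IN is expanded into a chain of OR-ed equality predicates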
- case OP_IN:
- FilterPredicate current = null;
- for (Object value : ((InExpression) rhs).getValues()) {
- FilterPredicate next = buildFilter(OpType.OP_EQ, (Column) lhs, (Const) value);
- if (current != null) {
- current = or(current, next);
- } else {
- current = next;
- }
- }
- return current;
- }
-
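- // comparisons may arrive with the column on either side; normalize to (column, constant)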
- if (lhs instanceof Column && rhs instanceof Const) {
- return buildFilter(op, (Column) lhs, (Const) rhs);
- } else if (lhs instanceof Const && rhs instanceof Column) {
- return buildFilter(op, (Column) rhs, (Const) lhs);
- }
- } else if (e instanceof UnaryExpression && op == OpType.OP_NOT) {
- return LogicalInverseRewriter.rewrite(not(buildFilter(((UnaryExpression) e).getExpression())));
- }
-
- throw new RuntimeException("Could not build filter for expression: " + e);
- }
-
- private FilterPredicate buildFilter(OpType op, Column col, Const value) {
- String name = col.getName();
- try {
- FieldSchema f = schema.getField(name);
- switch (f.type) {
- case DataType.BOOLEAN:
- Operators.BooleanColumn boolCol = booleanColumn(name);
- switch (op) {
- case OP_EQ:
- return eq(boolCol, getValue(value, boolCol.getColumnType()));
- case OP_NE:
- return notEq(boolCol, getValue(value, boolCol.getColumnType()));
- default:
- throw new RuntimeException(
- "Operation " + op + " not supported for boolean column: " + name);
- }
- case DataType.INTEGER:
- Operators.IntColumn intCol = intColumn(name);
- return op(op, intCol, value);
- case DataType.LONG:
- Operators.LongColumn longCol = longColumn(name);
- return op(op, longCol, value);
- case DataType.FLOAT:
- Operators.FloatColumn floatCol = floatColumn(name);
- return op(op, floatCol, value);
- case DataType.DOUBLE:
- Operators.DoubleColumn doubleCol = doubleColumn(name);
- return op(op, doubleCol, value);
- case DataType.CHARARRAY:
- Operators.BinaryColumn binaryCol = binaryColumn(name);
- return op(op, binaryCol, value);
- default:
- throw new RuntimeException("Unsupported type " + f.type + " for field: " + name);
- }
- } catch (FrontendException e) {
- throw new RuntimeException("Error processing pushdown for column:" + col, e);
- }
- }
-
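- // maps a Pig comparison operator onto the equivalent Parquet FilterApi predicate for a single column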
- private static <C extends Comparable<C>, COL extends Operators.Column<C> & Operators.SupportsLtGt>
- FilterPredicate op(Expression.OpType op, COL col, Const valueExpr) {
- C value = getValue(valueExpr, col.getColumnType());
- switch (op) {
- case OP_EQ:
- return eq(col, value);
- case OP_NE:
- return notEq(col, value);
- case OP_GT:
- return gt(col, value);
- case OP_GE:
- return gtEq(col, value);
- case OP_LT:
- return lt(col, value);
- case OP_LE:
- return ltEq(col, value);
- }
- return null;
- }
-
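- // Pig passes chararray constants as String; convert to Binary so they compare against Parquet binary columns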
- private static <C extends Comparable<C>> C getValue(Const valueExpr, Class<C> type) {
- Object value = valueExpr.getValue();
-
- if (value instanceof String) {
- value = Binary.fromString((String) value);
- }
-
- return type.cast(value);
- }
-}
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetStorer.java b/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetStorer.java
deleted file mode 100644
index c70f71ca9f..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetStorer.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig;
-
-import java.io.IOException;
-import java.util.Properties;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.parquet.hadoop.ParquetOutputFormat;
-import org.apache.parquet.io.ParquetEncodingException;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceStatistics;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.StoreMetadata;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.impl.util.Utils;
-import org.apache.pig.parser.ParserException;
-
-/**
- * A Pig storer implementation for the Parquet file format.
- * See {@link ParquetOutputFormat} for available parameters.
- *
- * It uses a TupleWriteSupport to write Tuples into the ParquetOutputFormat.
- * The Pig schema is automatically converted to the Parquet schema using {@link PigSchemaConverter}
- * and stored in the file footer.
- */
-public class ParquetStorer extends StoreFunc implements StoreMetadata {
-
- private static final String SCHEMA = "schema";
-
- private RecordWriter<Void, Tuple> recordWriter;
-
- private String signature;
-
- private Properties getProperties() {
- UDFContext udfc = UDFContext.getUDFContext();
- Properties p = udfc.getUDFProperties(this.getClass(), new String[] {signature});
- return p;
- }
-
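- // the Pig schema is captured by checkSchema() on the front end and read back from the UDF context here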
- private Schema getSchema() {
- try {
- final String schemaString = getProperties().getProperty(SCHEMA);
- if (schemaString == null) {
- throw new ParquetEncodingException("Can not store relation in Parquet as the schema is unknown");
- }
- return Utils.getSchemaFromString(schemaString);
- } catch (ParserException e) {
- throw new ParquetEncodingException("can not get schema from context", e);
- }
- }
-
- public ParquetStorer() {}
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void setStoreFuncUDFContextSignature(String signature) {
- super.setStoreFuncUDFContextSignature(signature);
- this.signature = signature;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void checkSchema(ResourceSchema s) throws IOException {
- getProperties().setProperty(SCHEMA, s.toString());
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public OutputFormat<Void, Tuple> getOutputFormat() throws IOException {
- Schema pigSchema = getSchema();
- return new ParquetOutputFormat<Tuple>(new TupleWriteSupport(pigSchema));
- }
-
- /**
- * {@inheritDoc}
- */
- @SuppressWarnings({"rawtypes", "unchecked"}) // that's how the base class is defined
- @Override
- public void prepareToWrite(RecordWriter recordWriter) throws IOException {
- this.recordWriter = recordWriter;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void putNext(Tuple tuple) throws IOException {
- try {
- this.recordWriter.write(null, tuple);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ParquetEncodingException("Interrupted while writing", e);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void setStoreLocation(String location, Job job) throws IOException {
- FileOutputFormat.setOutputPath(job, new Path(location));
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void storeSchema(ResourceSchema schema, String location, Job job) throws IOException {}
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void storeStatistics(ResourceStatistics resourceStatistics, String location, Job job) throws IOException {}
-}
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/PigMetaData.java b/parquet-pig/src/main/java/org/apache/parquet/pig/PigMetaData.java
deleted file mode 100644
index 429848890b..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/PigMetaData.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig;
-
-import static org.apache.parquet.pig.PigSchemaConverter.pigSchemaToString;
-
-import java.util.Map;
-import java.util.Set;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-/**
- * Represents Pig meta data stored in the file footer
- */
-public class PigMetaData {
-
- private static final String PIG_SCHEMA = "pig.schema";
-
- /**
- * @param keyValueMetaData the key values from the footer
- * @return the parsed Pig metadata
- */
- public static PigMetaData fromMetaData(Map<String, String> keyValueMetaData) {
- if (keyValueMetaData.containsKey(PIG_SCHEMA)) {
- return new PigMetaData(keyValueMetaData.get(PIG_SCHEMA));
- }
- return null;
- }
-
- /**
- * @param keyValueMetaData the key values from the footers
- * @return the set of pig schemas from the footers
- */
- public static Set<String> getPigSchemas(Map<String, Set<String>> keyValueMetaData) {
- return keyValueMetaData.get(PIG_SCHEMA);
- }
-
- private String pigSchema;
-
- /**
- * @param pigSchema the pig schema of the file
- */
- public PigMetaData(Schema pigSchema) {
- this.pigSchema = pigSchemaToString(pigSchema);
- }
-
- /**
- * @param pigSchema the pig schema of the file
- */
- public PigMetaData(String pigSchema) {
- this.pigSchema = pigSchema;
- }
-
- /**
- * @param pigSchema the pig schema of the file
- */
- public void setPigSchema(String pigSchema) {
- this.pigSchema = pigSchema;
- }
-
- /**
- * @return the pig schema of the file
- */
- public String getPigSchema() {
- return pigSchema;
- }
-
- /**
- * @param map where to add the key values representing this metadata
- */
- public void addToMetaData(Map<String, String> map) {
- map.put(PIG_SCHEMA, pigSchema);
- }
-}
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java b/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java
deleted file mode 100644
index 517f4b314b..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java
+++ /dev/null
@@ -1,563 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig;
-
-import static java.util.Optional.of;
-import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-import org.apache.parquet.schema.ConversionPatterns;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.LogicalTypeAnnotation;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeNameConverter;
-import org.apache.parquet.schema.Type;
-import org.apache.parquet.schema.Type.Repetition;
-import org.apache.parquet.schema.Types;
-import org.apache.pig.LoadPushDown.RequiredField;
-import org.apache.pig.LoadPushDown.RequiredFieldList;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.impl.util.Pair;
-import org.apache.pig.impl.util.Utils;
-import org.apache.pig.parser.ParserException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Converts a Pig Schema into a Parquet schema
- *
- * Bags are converted into an optional group containing one repeated group field to preserve the distinction between an empty bag and null.
- * Maps are converted into an optional group containing one repeated group field of (key, value).
- * Anonymous fields are named field_{index}. (In most cases Pig already gives them an alias val_{int}, so this rarely happens.)
- */
-public class PigSchemaConverter {
- private static final Logger LOG = LoggerFactory.getLogger(PigSchemaConverter.class);
- private static final String MAP_REPEATED_NAME = "key_value";
- static final String ARRAY_VALUE_NAME = "value";
- private ColumnAccess columnAccess;
-
- public PigSchemaConverter() {
- this(false);
- }
-
- /**
- * @param columnIndexAccess toggle between name and index based access (default: false)
- */
- public PigSchemaConverter(boolean columnIndexAccess) {
- this.columnAccess = columnIndexAccess ? new ColumnIndexAccess() : new ColumnNameAccess();
- }
-
- /**
- * @param pigSchemaString the pig schema to parse
- * @return the parsed pig schema
- */
- public static Schema parsePigSchema(String pigSchemaString) {
- try {
- return pigSchemaString == null ? null : Utils.getSchemaFromString(pigSchemaString);
- } catch (ParserException e) {
- throw new SchemaConversionException("could not parse Pig schema: " + pigSchemaString, e);
- }
- }
-
- interface ColumnAccess {
- List<Type> filterTupleSchema(GroupType schemaToFilter, Schema pigSchema, RequiredFieldList requiredFieldsList);
- }
-
- class ColumnIndexAccess implements ColumnAccess {
- @Override
- public List<Type> filterTupleSchema(
- GroupType schemaToFilter, Schema pigSchema, RequiredFieldList requiredFieldsList) {
- List<Type> newFields = new ArrayList<Type>();
- List<Pair<FieldSchema, Integer>> indexedFields = new ArrayList<Pair<FieldSchema, Integer>>();
-
- try {
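- // pair each requested Pig field with the column position it should be read from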
- if (requiredFieldsList == null) {
- int index = 0;
- for (FieldSchema fs : pigSchema.getFields()) {
- indexedFields.add(new Pair<FieldSchema, Integer>(fs, index++));
- }
- } else {
- for (RequiredField rf : requiredFieldsList.getFields()) {
- indexedFields.add(
- new Pair<FieldSchema, Integer>(pigSchema.getField(rf.getAlias()), rf.getIndex()));
- }
- }
-
- for (Pair<FieldSchema, Integer> p : indexedFields) {
- FieldSchema fieldSchema = pigSchema.getField(p.first.alias);
- if (p.second < schemaToFilter.getFieldCount()) {
- Type type = schemaToFilter.getFields().get(p.second);
- newFields.add(filter(type, fieldSchema));
- }
- }
- } catch (FrontendException e) {
- throw new RuntimeException("Failed to filter requested fields", e);
- }
- return newFields;
- }
- }
-
- class ColumnNameAccess implements ColumnAccess {
- @Override
- public List<Type> filterTupleSchema(
- GroupType schemaToFilter, Schema requestedPigSchema, RequiredFieldList requiredFieldsList) {
- List<FieldSchema> fields = requestedPigSchema.getFields();
- List<Type> newFields = new ArrayList<Type>();
- for (int i = 0; i < fields.size(); i++) {
- FieldSchema fieldSchema = fields.get(i);
- String name = name(fieldSchema.alias, "field_" + i);
- if (schemaToFilter.containsField(name)) {
- newFields.add(filter(schemaToFilter.getType(name), fieldSchema));
- }
- }
- return newFields;
- }
- }
-
- /**
- * @param pigSchema the pig schema to turn into a string representation
- * @return the string representation of the schema
- */
- static String pigSchemaToString(Schema pigSchema) {
- final String pigSchemaString = pigSchema.toString();
- return pigSchemaString.substring(1, pigSchemaString.length() - 1);
- }
-
- public static RequiredFieldList deserializeRequiredFieldList(String requiredFieldString) {
- if (requiredFieldString == null) {
- return null;
- }
-
- try {
- return (RequiredFieldList) ObjectSerializer.deserialize(requiredFieldString);
- } catch (IOException e) {
- throw new RuntimeException("Failed to deserialize pushProjection", e);
- }
- }
-
- static String serializeRequiredFieldList(RequiredFieldList requiredFieldList) {
- try {
- return ObjectSerializer.serialize(requiredFieldList);
- } catch (IOException e) {
- throw new RuntimeException("Failed to searlize required fields.", e);
- }
- }
-
- /**
- * converts a parquet schema into a pig schema
- *
- * @param parquetSchema the parquet schema to convert to Pig schema
- * @return the resulting schema
- */
- public Schema convert(MessageType parquetSchema) {
- return convertFields(parquetSchema.getFields());
- }
-
- /**
- * @param parquetType the type to convert
- * @return the resulting schema (containing one field)
- */
- public Schema convertField(Type parquetType) {
- return convertFields(Arrays.asList(parquetType));
- }
-
- private Schema convertFields(List<Type> parquetFields) {
- List<FieldSchema> fields = new ArrayList<FieldSchema>();
- for (Type parquetType : parquetFields) {
- try {
- FieldSchema innerfieldSchema = getFieldSchema(parquetType);
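- // a repeated field has no direct Pig counterpart, so it is wrapped in a bag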
- if (parquetType.isRepetition(Repetition.REPEATED)) {
- Schema bagSchema = new Schema(Arrays.asList(innerfieldSchema));
- fields.add(new FieldSchema(null, bagSchema, DataType.BAG));
- } else {
- fields.add(innerfieldSchema);
- }
- } catch (FrontendException fe) {
- throw new SchemaConversionException("can't convert " + parquetType, fe);
- }
- }
- return new Schema(fields);
- }
-
- private FieldSchema getSimpleFieldSchema(final String fieldName, Type parquetType) throws FrontendException {
- final PrimitiveTypeName parquetPrimitiveTypeName =
- parquetType.asPrimitiveType().getPrimitiveTypeName();
- final LogicalTypeAnnotation logicalTypeAnnotation = parquetType.getLogicalTypeAnnotation();
- return parquetPrimitiveTypeName.convert(
- new PrimitiveTypeNameConverter<FieldSchema, FrontendException>() {
- @Override
- public FieldSchema convertFLOAT(PrimitiveTypeName primitiveTypeName) throws FrontendException {
- return new FieldSchema(fieldName, null, DataType.FLOAT);
- }
-
- @Override
- public FieldSchema convertDOUBLE(PrimitiveTypeName primitiveTypeName) throws FrontendException {
- return new FieldSchema(fieldName, null, DataType.DOUBLE);
- }
-
- @Override
- public FieldSchema convertINT32(PrimitiveTypeName primitiveTypeName) throws FrontendException {
- return new FieldSchema(fieldName, null, DataType.INTEGER);
- }
-
- @Override
- public FieldSchema convertINT64(PrimitiveTypeName primitiveTypeName) throws FrontendException {
- return new FieldSchema(fieldName, null, DataType.LONG);
- }
-
- @Override
- public FieldSchema convertINT96(PrimitiveTypeName primitiveTypeName) throws FrontendException {
- LOG.warn("Converting type " + primitiveTypeName + " to bytearray");
- return new FieldSchema(fieldName, null, DataType.BYTEARRAY);
- }
-
- @Override
- public FieldSchema convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName)
- throws FrontendException {
- if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
- return new FieldSchema(fieldName, null, DataType.BIGDECIMAL);
- } else {
- return new FieldSchema(fieldName, null, DataType.BYTEARRAY);
- }
- }
-
- @Override
- public FieldSchema convertBOOLEAN(PrimitiveTypeName primitiveTypeName) throws FrontendException {
- return new FieldSchema(fieldName, null, DataType.BOOLEAN);
- }
-
- @Override
- public FieldSchema convertBINARY(PrimitiveTypeName primitiveTypeName) throws FrontendException {
- if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
- return new FieldSchema(fieldName, null, DataType.CHARARRAY);
- } else {
- return new FieldSchema(fieldName, null, DataType.BYTEARRAY);
- }
- }
- });
- }
-
- /*
- * RuntimeException class to workaround throwing checked FrontendException in logical type visitors.
- * Wrap the FrontendException inside the visitor in an inner catch block, and rethrow it outside of the visitor
- */
- private static final class FrontendExceptionWrapper extends RuntimeException {
- final FrontendException frontendException;
-
- FrontendExceptionWrapper(FrontendException frontendException) {
- this.frontendException = frontendException;
- }
- }
-
- private FieldSchema getComplexFieldSchema(String fieldName, Type parquetType) throws FrontendException {
- GroupType parquetGroupType = parquetType.asGroupType();
- LogicalTypeAnnotation logicalTypeAnnotation = parquetGroupType.getLogicalTypeAnnotation();
- if (logicalTypeAnnotation != null) {
- try {
- return logicalTypeAnnotation
- .accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<FieldSchema>() {
- @Override
- public Optional<FieldSchema> visit(
- LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
- try {
- // verify that it's a map
- if (parquetGroupType.getFieldCount() != 1
- || parquetGroupType.getType(0).isPrimitive()) {
- throw new SchemaConversionException("Invalid map type " + parquetGroupType);
- }
- GroupType mapKeyValType =
- parquetGroupType.getType(0).asGroupType();
- if (!mapKeyValType.isRepetition(Repetition.REPEATED)
- || (mapKeyValType.getLogicalTypeAnnotation() != null
- && !mapKeyValType
- .getLogicalTypeAnnotation()
- .equals(
- LogicalTypeAnnotation.MapKeyValueTypeAnnotation
- .getInstance()))
- || mapKeyValType.getFieldCount() != 2) {
- throw new SchemaConversionException("Invalid map type " + parquetGroupType);
- }
- // if value is not primitive wrap it in a tuple
- Type valueType = mapKeyValType.getType(1);
- Schema s = convertField(valueType);
- s.getField(0).alias = null;
- return of(new FieldSchema(fieldName, s, DataType.MAP));
- } catch (FrontendException e) {
- throw new FrontendExceptionWrapper(e);
- }
- }
-
- @Override
- public Optional<FieldSchema> visit(
- LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
- try {
- Type type = parquetGroupType.getType(0);
- if (parquetGroupType.getFieldCount() != 1 || type.isPrimitive()) {
- // an array is effectively a bag
- Schema primitiveSchema = new Schema(
- getSimpleFieldSchema(parquetGroupType.getFieldName(0), type));
- Schema tupleSchema = new Schema(
- new FieldSchema(ARRAY_VALUE_NAME, primitiveSchema, DataType.TUPLE));
- return of(new FieldSchema(fieldName, tupleSchema, DataType.BAG));
- }
- GroupType tupleType =
- parquetGroupType.getType(0).asGroupType();
- if (!tupleType.isRepetition(Repetition.REPEATED)) {
- throw new SchemaConversionException("Invalid list type " + parquetGroupType);
- }
- Schema tupleSchema = new Schema(new FieldSchema(
- tupleType.getName(), convertFields(tupleType.getFields()), DataType.TUPLE));
- return of(new FieldSchema(fieldName, tupleSchema, DataType.BAG));
- } catch (FrontendException e) {
- throw new FrontendExceptionWrapper(e);
- }
- }
- })
- .orElseThrow(() -> new SchemaConversionException(
- "Unexpected original type for " + parquetType + ": " + logicalTypeAnnotation));
- } catch (FrontendExceptionWrapper e) {
- throw e.frontendException;
- }
- } else {
- // if original type is not set, we assume it to be tuple
- return new FieldSchema(fieldName, convertFields(parquetGroupType.getFields()), DataType.TUPLE);
- }
- }
-
- private FieldSchema getFieldSchema(Type parquetType) throws FrontendException {
- final String fieldName = parquetType.getName();
- if (parquetType.isPrimitive()) {
- return getSimpleFieldSchema(fieldName, parquetType);
- } else {
- return getComplexFieldSchema(fieldName, parquetType);
- }
- }
-
- /**
- * @param pigSchema the pig schema
- * @return the resulting Parquet schema
- */
- public MessageType convert(Schema pigSchema) {
- return new MessageType("pig_schema", convertTypes(pigSchema));
- }
-
- private Type[] convertTypes(Schema pigSchema) {
- List<FieldSchema> fields = pigSchema.getFields();
- Type[] types = new Type[fields.size()];
- for (int i = 0; i < types.length; i++) {
- types[i] = convert(fields.get(i), i);
- }
- return types;
- }
-
- private Type convert(FieldSchema fieldSchema, String defaultAlias) {
- String name = name(fieldSchema.alias, defaultAlias);
- return convertWithName(fieldSchema, name);
- }
-
- private Type convertWithName(FieldSchema fieldSchema, String name) {
- try {
- switch (fieldSchema.type) {
- case DataType.BAG:
- return convertBag(name, fieldSchema);
- case DataType.TUPLE:
- return convertTuple(name, fieldSchema, Repetition.OPTIONAL);
- case DataType.MAP:
- return convertMap(name, fieldSchema);
- case DataType.BOOLEAN:
- return primitive(name, PrimitiveTypeName.BOOLEAN);
- case DataType.CHARARRAY:
- return primitive(name, PrimitiveTypeName.BINARY, stringType());
- case DataType.INTEGER:
- return primitive(name, PrimitiveTypeName.INT32);
- case DataType.LONG:
- return primitive(name, PrimitiveTypeName.INT64);
- case DataType.FLOAT:
- return primitive(name, PrimitiveTypeName.FLOAT);
- case DataType.DOUBLE:
- return primitive(name, PrimitiveTypeName.DOUBLE);
- case DataType.DATETIME:
- throw new UnsupportedOperationException();
- case DataType.BYTEARRAY:
- return primitive(name, PrimitiveTypeName.BINARY);
- default:
- throw new SchemaConversionException(
- "Unknown type " + fieldSchema.type + " " + DataType.findTypeName(fieldSchema.type));
- }
- } catch (FrontendException e) {
- throw new SchemaConversionException("can't convert " + fieldSchema, e);
- }
- }
-
- private Type convert(FieldSchema fieldSchema, int index) {
- return convert(fieldSchema, "field_" + index);
- }
-
- /**
- * @param name
- * @param fieldSchema
- * @return an optional group containing one repeated group field
- * @throws FrontendException
- */
- private GroupType convertBag(String name, FieldSchema fieldSchema) throws FrontendException {
- FieldSchema innerField = fieldSchema.schema.getField(0);
- return ConversionPatterns.listType(
- Repetition.OPTIONAL,
- name,
- convertTuple(name(innerField.alias, "bag"), innerField, Repetition.REPEATED));
- }
-
- private String name(String fieldAlias, String defaultName) {
- return fieldAlias == null ? defaultName : fieldAlias;
- }
-
- private Type primitive(String name, PrimitiveTypeName primitive, LogicalTypeAnnotation logicalTypeAnnotation) {
- return Types.primitive(primitive, Repetition.OPTIONAL)
- .as(logicalTypeAnnotation)
- .named(name);
- }
-
- private PrimitiveType primitive(String name, PrimitiveTypeName primitive) {
- return Types.primitive(primitive, Repetition.OPTIONAL).named(name);
- }
-
- /**
- * @param alias
- * @param fieldSchema
- * @return an optional group containing one repeated group field (key, value)
- * @throws FrontendException
- */
- private GroupType convertMap(String alias, FieldSchema fieldSchema) {
- Schema innerSchema = fieldSchema.schema;
- if (innerSchema == null || innerSchema.size() != 1) {
- throw new SchemaConversionException(
- "Invalid map Schema, schema should contain exactly one field: " + fieldSchema);
- }
- FieldSchema innerField = null;
- try {
- innerField = innerSchema.getField(0);
- } catch (FrontendException fe) {
- throw new SchemaConversionException("Invalid map schema, cannot infer innerschema: ", fe);
- }
- Type convertedValue = convertWithName(innerField, "value");
- return ConversionPatterns.stringKeyMapType(
- Repetition.OPTIONAL, alias, name(innerField.alias, MAP_REPEATED_NAME), convertedValue);
- }
-
- private GroupType convertTuple(String alias, FieldSchema field, Repetition repetition) {
- return new GroupType(repetition, alias, convertTypes(field.schema));
- }
-
- /**
- * filters a Parquet schema based on a pig schema for projection
- *
- * @param schemaToFilter the schema to be filtered
- * @param requestedPigSchema the pig schema to filter it with
- * @return the resulting filtered schema
- */
- public MessageType filter(MessageType schemaToFilter, Schema requestedPigSchema) {
- return filter(schemaToFilter, requestedPigSchema, null);
- }
-
- /**
- * filters a Parquet schema based on a pig schema for projection
- *
- * @param schemaToFilter the schema to be filtered
- * @param requestedPigSchema the pig schema to filter it with
- * @param requiredFieldList projected required fields
- * @return the resulting filtered schema
- */
- public MessageType filter(
- MessageType schemaToFilter, Schema requestedPigSchema, RequiredFieldList requiredFieldList) {
- try {
- if (LOG.isDebugEnabled())
- LOG.debug("filtering schema:\n" + schemaToFilter + "\nwith requested pig schema:\n "
- + requestedPigSchema);
- List<Type> result = columnAccess.filterTupleSchema(schemaToFilter, requestedPigSchema, requiredFieldList);
- if (LOG.isDebugEnabled()) LOG.debug("schema:\n" + schemaToFilter + "\nfiltered to:\n" + result);
- return new MessageType(schemaToFilter.getName(), result);
- } catch (RuntimeException e) {
- throw new RuntimeException("can't filter " + schemaToFilter + " with " + requestedPigSchema, e);
- }
- }
-
- private Type filter(Type type, FieldSchema fieldSchema) {
- if (LOG.isDebugEnabled()) LOG.debug("filtering type:\n" + type + "\nwith:\n " + fieldSchema);
- try {
- switch (fieldSchema.type) {
- case DataType.BAG:
- return filterBag(type.asGroupType(), fieldSchema);
- case DataType.MAP:
- return filterMap(type.asGroupType(), fieldSchema);
- case DataType.TUPLE:
- return filterTuple(type.asGroupType(), fieldSchema);
- default:
- return type;
- }
- } catch (FrontendException e) {
- throw new SchemaConversionException("can't filter " + type + " with " + fieldSchema, e);
- } catch (RuntimeException e) {
- throw new RuntimeException("can't filter " + type + " with " + fieldSchema, e);
- }
- }
-
- private Type filterTuple(GroupType tupleType, FieldSchema tupleFieldSchema) throws FrontendException {
- if (LOG.isDebugEnabled()) LOG.debug("filtering TUPLE schema:\n" + tupleType + "\nwith:\n " + tupleFieldSchema);
- return tupleType.withNewFields(columnAccess.filterTupleSchema(tupleType, tupleFieldSchema.schema, null));
- }
-
- private Type filterMap(GroupType mapType, FieldSchema mapFieldSchema) throws FrontendException {
- if (LOG.isDebugEnabled()) LOG.debug("filtering MAP schema:\n" + mapType + "\nwith:\n " + mapFieldSchema);
- if (mapType.getFieldCount() != 1) {
- throw new RuntimeException("not unwrapping the right type, this should be a Map: " + mapType);
- }
- GroupType nested = mapType.getType(0).asGroupType();
- if (nested.getFieldCount() != 2) {
- throw new RuntimeException("this should be a Map Key/Value: " + mapType);
- }
- FieldSchema innerField = mapFieldSchema.schema.getField(0);
- return mapType.withNewFields(nested.withNewFields(nested.getType(0), filter(nested.getType(1), innerField)));
- }
-
- private Type filterBag(GroupType bagType, FieldSchema bagFieldSchema) throws FrontendException {
- if (LOG.isDebugEnabled()) LOG.debug("filtering BAG schema:\n" + bagType + "\nwith:\n " + bagFieldSchema);
- if (bagType.getFieldCount() != 1) {
- throw new RuntimeException("not unwrapping the right type, this should be a Bag: " + bagType);
- }
- Type nested = bagType.getType(0);
- FieldSchema innerField = bagFieldSchema.schema.getField(0);
- if (nested.isPrimitive()
- || nested.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation
- || nested.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
- // Bags always contain tuples => we skip the extra tuple that was inserted in that case.
- innerField = innerField.schema.getField(0);
- }
- return bagType.withNewFields(filter(nested, innerField));
- }
-}
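For reference, a minimal sketch of how the removed `PigSchemaConverter` was typically exercised, assuming the pre-removal `parquet-pig` artifact is still on the classpath. It uses only methods shown above (`Utils.getSchemaFromString` and `convert` in both directions); the schema string is made up:

```
import org.apache.parquet.pig.PigSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Utils;

public class SchemaRoundTrip {
  public static void main(String[] args) throws Exception {
    // Parse a Pig schema from its string representation (hypothetical schema).
    Schema pigSchema = Utils.getSchemaFromString("name: chararray, age: int");

    // false = resolve fields by name rather than by column index.
    PigSchemaConverter converter = new PigSchemaConverter(false);

    // Pig -> Parquet: the resulting message type is rooted at "pig_schema".
    MessageType parquetSchema = converter.convert(pigSchema);
    System.out.println(parquetSchema);

    // Parquet -> Pig: recover an equivalent Pig schema.
    Schema roundTripped = converter.convert(parquetSchema);
    System.out.println(roundTripped);
  }
}
```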
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/SchemaConversionException.java b/parquet-pig/src/main/java/org/apache/parquet/pig/SchemaConversionException.java
deleted file mode 100644
index e11e4e56c9..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/SchemaConversionException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig;
-
-import org.apache.parquet.ParquetRuntimeException;
-
-/**
- * thrown if the schema can not be converted
- */
-public class SchemaConversionException extends ParquetRuntimeException {
- private static final long serialVersionUID = 1L;
-
- public SchemaConversionException() {}
-
- public SchemaConversionException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public SchemaConversionException(String message) {
- super(message);
- }
-
- public SchemaConversionException(Throwable cause) {
- super(cause);
- }
-}
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleConversionException.java b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleConversionException.java
deleted file mode 100644
index 04c8797f2b..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleConversionException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig;
-
-import org.apache.parquet.ParquetRuntimeException;
-
-public class TupleConversionException extends ParquetRuntimeException {
- private static final long serialVersionUID = 1L;
-
- public TupleConversionException() {
- super();
- }
-
- public TupleConversionException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public TupleConversionException(String message) {
- super(message);
- }
-
- public TupleConversionException(Throwable cause) {
- super(cause);
- }
-}
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java
deleted file mode 100644
index 2a725a025f..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig;
-
-import static org.apache.parquet.pig.PigSchemaConverter.parsePigSchema;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.parquet.conf.HadoopParquetConfiguration;
-import org.apache.parquet.conf.ParquetConfiguration;
-import org.apache.parquet.hadoop.api.InitContext;
-import org.apache.parquet.hadoop.api.ReadSupport;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.io.api.RecordMaterializer;
-import org.apache.parquet.pig.convert.TupleRecordMaterializer;
-import org.apache.parquet.schema.IncompatibleSchemaModificationException;
-import org.apache.parquet.schema.MessageType;
-import org.apache.pig.LoadPushDown.RequiredFieldList;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-import org.apache.pig.impl.util.ObjectSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Read support for Pig Tuple
- * a Pig MetaDataBlock is expected in the initialization call
- */
- public class TupleReadSupport extends ReadSupport<Tuple> {
- static final String PARQUET_PIG_SCHEMA = "parquet.pig.schema";
- static final String PARQUET_COLUMN_INDEX_ACCESS = "parquet.private.pig.column.index.access";
- static final String PARQUET_PIG_REQUIRED_FIELDS = "parquet.private.pig.required.fields";
- static final String PARQUET_PIG_ELEPHANT_BIRD_COMPATIBLE = "parquet.pig.elephantbird.compatible";
- private static final Logger LOG = LoggerFactory.getLogger(TupleReadSupport.class);
-
- private static final PigSchemaConverter pigSchemaConverter = new PigSchemaConverter(false);
-
- /**
- * @param configuration the configuration for the current job
- * @return the pig schema requested by the user or null if none.
- */
- static Schema getPigSchema(Configuration configuration) {
- return getPigSchema(new HadoopParquetConfiguration(configuration));
- }
-
- /**
- * @param configuration the configuration
- * @return the pig schema requested by the user or null if none.
- */
- static Schema getPigSchema(ParquetConfiguration configuration) {
- return parsePigSchema(configuration.get(PARQUET_PIG_SCHEMA));
- }
-
- /**
- * @param configuration configuration for the current job
- * @return List of required fields from pushProjection
- */
- static RequiredFieldList getRequiredFields(Configuration configuration) {
- return getRequiredFields(new HadoopParquetConfiguration(configuration));
- }
-
- /**
- * @param configuration configuration
- * @return List of required fields from pushProjection
- */
- static RequiredFieldList getRequiredFields(ParquetConfiguration configuration) {
- String requiredFieldString = configuration.get(PARQUET_PIG_REQUIRED_FIELDS);
-
- if (requiredFieldString == null) {
- return null;
- }
-
- try {
- return (RequiredFieldList) ObjectSerializer.deserialize(requiredFieldString);
- } catch (IOException iOException) {
- throw new RuntimeException("Failed to deserialize pushProjection");
- }
- }
-
- /**
- * @param fileSchema the parquet schema from the file
- * @param keyValueMetaData the extra meta data from the files
- * @return the pig schema according to the file
- */
- static Schema getPigSchemaFromMultipleFiles(MessageType fileSchema, Map<String, Set<String>> keyValueMetaData) {
- Set<String> pigSchemas = PigMetaData.getPigSchemas(keyValueMetaData);
- if (pigSchemas == null) {
- return pigSchemaConverter.convert(fileSchema);
- }
- Schema mergedPigSchema = null;
- for (String pigSchemaString : pigSchemas) {
- try {
- mergedPigSchema = union(mergedPigSchema, parsePigSchema(pigSchemaString));
- } catch (FrontendException e) {
- throw new ParquetDecodingException("can not merge " + pigSchemaString + " into " + mergedPigSchema, e);
- }
- }
- return mergedPigSchema;
- }
-
- /**
- * @param fileSchema the parquet schema from the file
- * @param keyValueMetaData the extra meta data from the file
- * @return the pig schema according to the file
- */
- static Schema getPigSchemaFromFile(MessageType fileSchema, Map<String, String> keyValueMetaData) {
- PigMetaData pigMetaData = PigMetaData.fromMetaData(keyValueMetaData);
- if (pigMetaData == null) {
- return pigSchemaConverter.convert(fileSchema);
- }
- return parsePigSchema(pigMetaData.getPigSchema());
- }
-
- private static Schema union(Schema merged, Schema pigSchema) throws FrontendException {
- List<FieldSchema> fields = new ArrayList<FieldSchema>();
- if (merged == null) {
- return pigSchema;
- }
- // merging existing fields
- for (FieldSchema fieldSchema : merged.getFields()) {
- FieldSchema newFieldSchema = pigSchema.getField(fieldSchema.alias);
- if (newFieldSchema == null) {
- fields.add(fieldSchema);
- } else {
- fields.add(union(fieldSchema, newFieldSchema));
- }
- }
- // adding new fields
- for (FieldSchema newFieldSchema : pigSchema.getFields()) {
- FieldSchema oldFieldSchema = merged.getField(newFieldSchema.alias);
- if (oldFieldSchema == null) {
- fields.add(newFieldSchema);
- }
- }
- return new Schema(fields);
- }
-
- private static FieldSchema union(FieldSchema mergedFieldSchema, FieldSchema newFieldSchema) {
- if (!mergedFieldSchema.alias.equals(newFieldSchema.alias) || mergedFieldSchema.type != newFieldSchema.type) {
- throw new IncompatibleSchemaModificationException(
- "Incompatible Pig schema change: " + mergedFieldSchema + " can not accept " + newFieldSchema);
- }
- try {
- return new FieldSchema(
- mergedFieldSchema.alias,
- union(mergedFieldSchema.schema, newFieldSchema.schema),
- mergedFieldSchema.type);
- } catch (FrontendException e) {
- throw new SchemaConversionException(e);
- }
- }
-
- @Override
- public ReadContext init(InitContext initContext) {
- Schema pigSchema = getPigSchema(initContext.getParquetConfiguration());
- RequiredFieldList requiredFields = getRequiredFields(initContext.getParquetConfiguration());
- boolean columnIndexAccess =
- initContext.getParquetConfiguration().getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
-
- if (pigSchema == null) {
- return new ReadContext(initContext.getFileSchema());
- } else {
-
- // project the file schema according to the requested Pig schema
- MessageType parquetRequestedSchema = new PigSchemaConverter(columnIndexAccess)
- .filter(initContext.getFileSchema(), pigSchema, requiredFields);
- return new ReadContext(parquetRequestedSchema);
- }
- }
-
- @Override
- public RecordMaterializer<Tuple> prepareForRead(
- Configuration configuration,
- Map keyValueMetaData,
- MessageType fileSchema,
- ReadContext readContext) {
- return prepareForRead(new HadoopParquetConfiguration(configuration), keyValueMetaData, fileSchema, readContext);
- }
-
- @Override
- public RecordMaterializer<Tuple> prepareForRead(
- ParquetConfiguration configuration,
- Map keyValueMetaData,
- MessageType fileSchema,
- ReadContext readContext) {
- MessageType requestedSchema = readContext.getRequestedSchema();
- Schema requestedPigSchema = getPigSchema(configuration);
- if (requestedPigSchema == null) {
- throw new ParquetDecodingException("Missing Pig schema: ParquetLoader sets the schema in the job conf");
- }
- boolean elephantBirdCompatible = configuration.getBoolean(PARQUET_PIG_ELEPHANT_BIRD_COMPATIBLE, false);
- boolean columnIndexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
- if (elephantBirdCompatible) {
- LOG.info("Numbers will default to 0 instead of NULL; Boolean will be converted to Int");
- }
- return new TupleRecordMaterializer(
- requestedSchema, requestedPigSchema, elephantBirdCompatible, columnIndexAccess);
- }
-}
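`ParquetLoader` normally wires this read support up through the job configuration; the following is a hedged, hand-rolled sketch of the same plumbing. The path is hypothetical, and `ParquetReader.builder` comes from `parquet-hadoop`:

```
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.pig.TupleReadSupport;
import org.apache.pig.data.Tuple;

public class ManualTupleRead {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // ParquetLoader sets this key itself; here it is set by hand for illustration.
    conf.set("parquet.pig.schema", "name: chararray, age: int");

    try (ParquetReader<Tuple> reader =
        ParquetReader.builder(new TupleReadSupport(), new Path("/some/path")) // hypothetical path
            .withConf(conf)
            .build()) {
      Tuple t;
      while ((t = reader.read()) != null) {
        System.out.println(t);
      }
    }
  }
}
```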
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java
deleted file mode 100644
index e8a220f731..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.parquet.conf.HadoopParquetConfiguration;
-import org.apache.parquet.conf.ParquetConfiguration;
-import org.apache.parquet.hadoop.api.WriteSupport;
-import org.apache.parquet.io.ParquetEncodingException;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.io.api.RecordConsumer;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-import org.apache.pig.impl.util.Utils;
-import org.apache.pig.parser.ParserException;
-
- public class TupleWriteSupport extends WriteSupport<Tuple> {
- private static final TupleFactory TF = TupleFactory.getInstance();
- private static final PigSchemaConverter pigSchemaConverter = new PigSchemaConverter(false);
-
- public static TupleWriteSupport fromPigSchema(String pigSchemaString) throws ParserException {
- return new TupleWriteSupport(Utils.getSchemaFromString(pigSchemaString));
- }
-
- private RecordConsumer recordConsumer;
- private MessageType rootSchema;
- private Schema rootPigSchema;
-
- /**
- * @param pigSchema the pigSchema
- */
- public TupleWriteSupport(Schema pigSchema) {
- super();
- this.rootSchema = pigSchemaConverter.convert(pigSchema);
- this.rootPigSchema = pigSchema;
- }
-
- @Override
- public String getName() {
- return "pig";
- }
-
- public Schema getPigSchema() {
- return rootPigSchema;
- }
-
- public MessageType getParquetSchema() {
- return rootSchema;
- }
-
- @Override
- public WriteContext init(Configuration configuration) {
- return init(new HadoopParquetConfiguration(configuration));
- }
-
- @Override
- public WriteContext init(ParquetConfiguration configuration) {
- Map<String, String> extraMetaData = new HashMap<String, String>();
- new PigMetaData(rootPigSchema).addToMetaData(extraMetaData);
- return new WriteContext(rootSchema, extraMetaData);
- }
-
- @Override
- public void prepareForWrite(RecordConsumer recordConsumer) {
- this.recordConsumer = recordConsumer;
- }
-
- public void write(Tuple t) {
- try {
- recordConsumer.startMessage();
- writeTuple(rootSchema, rootPigSchema, t);
- recordConsumer.endMessage();
- } catch (ExecException | FrontendException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void writeTuple(GroupType schema, Schema pigSchema, Tuple t) throws ExecException, FrontendException {
- List<Type> fields = schema.getFields();
- List<FieldSchema> pigFields = pigSchema.getFields();
- assert fields.size() == pigFields.size();
- for (int i = 0; i < fields.size(); i++) {
- if (t.isNull(i)) {
- continue;
- }
- Type fieldType = fields.get(i);
- recordConsumer.startField(fieldType.getName(), i);
- FieldSchema pigType = pigFields.get(i);
- switch (pigType.type) {
- case DataType.BAG:
- Type bagType = fieldType.asGroupType().getType(0);
- FieldSchema pigBagInnerType = pigType.schema.getField(0);
- DataBag bag = (DataBag) t.get(i);
- recordConsumer.startGroup();
- if (bag.size() > 0) {
- recordConsumer.startField(bagType.getName(), 0);
- for (Tuple tuple : bag) {
- if (bagType.isPrimitive()) {
- writeValue(bagType, pigBagInnerType, tuple, 0);
- } else {
- recordConsumer.startGroup();
- writeTuple(bagType.asGroupType(), pigBagInnerType.schema, tuple);
- recordConsumer.endGroup();
- }
- }
- recordConsumer.endField(bagType.getName(), 0);
- }
- recordConsumer.endGroup();
- break;
- case DataType.MAP:
- Type mapType = fieldType.asGroupType().getType(0);
- FieldSchema pigMapInnerType = pigType.schema.getField(0);
- @SuppressWarnings("unchecked") // I know
- Map map = (Map) t.get(i);
- recordConsumer.startGroup();
- if (!map.isEmpty()) {
- recordConsumer.startField(mapType.getName(), 0);
- Set<Entry<String, Object>> entrySet = map.entrySet();
- for (Entry<String, Object> entry : entrySet) {
- recordConsumer.startGroup();
- Schema keyValueSchema = new Schema(Arrays.asList(
- new FieldSchema("key", DataType.CHARARRAY),
- new FieldSchema("value", pigMapInnerType.schema, pigMapInnerType.type)));
- writeTuple(
- mapType.asGroupType(),
- keyValueSchema,
- TF.newTuple(Arrays.asList(entry.getKey(), entry.getValue())));
- recordConsumer.endGroup();
- }
- recordConsumer.endField(mapType.getName(), 0);
- }
- recordConsumer.endGroup();
- break;
- default:
- writeValue(fieldType, pigType, t, i);
- break;
- }
- recordConsumer.endField(fieldType.getName(), i);
- }
- }
-
- private void writeValue(Type type, FieldSchema pigType, Tuple t, int i) {
- try {
- if (type.isPrimitive()) {
- switch (type.asPrimitiveType().getPrimitiveTypeName()) {
- // TODO: use PrimitiveTuple accessors
- case BINARY:
- byte[] bytes;
- if (pigType.type == DataType.BYTEARRAY) {
- bytes = ((DataByteArray) t.get(i)).get();
- } else if (pigType.type == DataType.CHARARRAY) {
- bytes = ((String) t.get(i)).getBytes("UTF-8");
- } else {
- throw new UnsupportedOperationException(
- "can not convert from " + DataType.findTypeName(pigType.type) + " to BINARY ");
- }
- recordConsumer.addBinary(Binary.fromReusedByteArray(bytes));
- break;
- case BOOLEAN:
- recordConsumer.addBoolean((Boolean) t.get(i));
- break;
- case INT32:
- recordConsumer.addInteger(((Number) t.get(i)).intValue());
- break;
- case INT64:
- recordConsumer.addLong(((Number) t.get(i)).longValue());
- break;
- case DOUBLE:
- recordConsumer.addDouble(((Number) t.get(i)).doubleValue());
- break;
- case FLOAT:
- recordConsumer.addFloat(((Number) t.get(i)).floatValue());
- break;
- default:
- throw new UnsupportedOperationException(
- type.asPrimitiveType().getPrimitiveTypeName().name());
- }
- } else {
- assert pigType.type == DataType.TUPLE;
- recordConsumer.startGroup();
- writeTuple(type.asGroupType(), pigType.schema, (Tuple) t.get(i));
- recordConsumer.endGroup();
- }
- } catch (Exception e) {
- throw new ParquetEncodingException(
- "can not write value at " + i + " in tuple " + t + " from type '" + pigType + "' to type '" + type
- + "'",
- e);
- }
- }
-}
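A quick way to see the write-side schema derivation in isolation: a sketch assuming the old artifact is available, calling only `fromPigSchema`, `getParquetSchema`, and `getPigSchema` as defined in the deleted class (the schema string is hypothetical):

```
import org.apache.parquet.pig.TupleWriteSupport;

public class InspectWriteSupport {
  public static void main(String[] args) throws Exception {
    // Build the write support straight from a Pig schema string.
    TupleWriteSupport writeSupport = TupleWriteSupport.fromPigSchema("name: chararray, age: int");

    // The Parquet message type is derived from the Pig schema via PigSchemaConverter.
    System.out.println(writeSupport.getParquetSchema());
    System.out.println(writeSupport.getPigSchema());
  }
}
```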
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/convert/DecimalUtils.java b/parquet-pig/src/main/java/org/apache/parquet/pig/convert/DecimalUtils.java
deleted file mode 100644
index dd9d995731..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/convert/DecimalUtils.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.parquet.pig.convert;
-
-import static java.lang.Math.pow;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import org.apache.parquet.io.api.Binary;
-
-/*
- * Conversion between Parquet Decimal Type to Java BigDecimal in Pig
- * Code Based on the Apache Spark ParquetRowConverter.scala
- *
- *
- */
-
-public class DecimalUtils {
-
- public static BigDecimal binaryToDecimal(Binary value, int precision, int scale) {
- /*
- * Precision <= 18 checks for the max number of digits for an unscaled long,
- * else treat with big integer conversion
- */
- if (precision <= 18) {
- ByteBuffer buffer = value.toByteBuffer();
- byte[] bytes = buffer.array();
- int start = buffer.arrayOffset() + buffer.position();
- int end = buffer.arrayOffset() + buffer.limit();
- long unscaled = 0L;
- int i = start;
- while (i < end) {
- unscaled = (unscaled << 8 | bytes[i] & 0xff);
- i++;
- }
- int bits = 8 * (end - start);
- long unscaledNew = (unscaled << (64 - bits)) >> (64 - bits);
- if (unscaledNew <= -pow(10, 18) || unscaledNew >= pow(10, 18)) {
- return new BigDecimal(unscaledNew);
- } else {
- return BigDecimal.valueOf(unscaledNew / pow(10, scale));
- }
- } else {
- return new BigDecimal(new BigInteger(value.getBytes()), scale);
- }
- }
-}
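A worked example of the fast path (precision <= 18): two big-endian bytes 0x30 0x39 encode the unscaled value 12345, which at scale 2 reads back as 123.45. Note that this path scales through double arithmetic, so it is only exact while the scaled value is representable as a double. The sketch below assumes the deleted class is on the classpath:

```
import java.math.BigDecimal;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.pig.convert.DecimalUtils;

public class DecimalExample {
  public static void main(String[] args) {
    // Two big-endian bytes: 0x30 0x39 = 12345 as the unscaled value.
    Binary raw = Binary.fromReusedByteArray(new byte[] {0x30, 0x39});

    // Precision 5, scale 2: 12345 * 10^-2 -> 123.45
    BigDecimal d = DecimalUtils.binaryToDecimal(raw, 5, 2);
    System.out.println(d); // 123.45
  }
}
```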
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/convert/MapConverter.java b/parquet-pig/src/main/java/org/apache/parquet/pig/convert/MapConverter.java
deleted file mode 100644
index c1ebce7c0e..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/convert/MapConverter.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig.convert;
-
-import java.util.AbstractMap;
-import java.util.AbstractSet;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.io.api.Converter;
-import org.apache.parquet.io.api.GroupConverter;
-import org.apache.parquet.io.api.PrimitiveConverter;
-import org.apache.parquet.pig.PigSchemaConverter;
-import org.apache.parquet.pig.SchemaConversionException;
-import org.apache.parquet.schema.GroupType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-
-/**
- * Converts groups into Pig Maps
- */
-final class MapConverter extends GroupConverter {
-
- private final MapKeyValueConverter keyValue;
- private final ParentValueContainer parent;
-
- private Map<String, Object> buffer = new BufferMap();
-
- private Object currentKey;
- private Object currentValue;
-
- MapConverter(
- GroupType parquetSchema,
- FieldSchema pigSchema,
- ParentValueContainer parent,
- boolean numbersDefaultToZero,
- boolean columnIndexAccess)
- throws FrontendException {
- if (parquetSchema.getFieldCount() != 1) {
- throw new IllegalArgumentException("maps have only one field. " + parquetSchema);
- }
- this.parent = parent;
- keyValue = new MapKeyValueConverter(
- parquetSchema.getType(0).asGroupType(),
- pigSchema.schema.getField(0),
- numbersDefaultToZero,
- columnIndexAccess);
- }
-
- @Override
- public Converter getConverter(int fieldIndex) {
- if (fieldIndex != 0) {
- throw new IllegalArgumentException("maps have only one field. can't reach " + fieldIndex);
- }
- return keyValue;
- }
-
- /**
- * runtime methods
- */
- @Override
- public final void start() {
- buffer.clear();
- }
-
- @Override
- public void end() {
- parent.add(new LinkedHashMap<String, Object>(buffer));
- }
-
- /**
- * to contain the values of the Map until we read them all
- */
- private static final class BufferMap extends AbstractMap<String, Object> {
- private List<Entry<String, Object>> entries = new ArrayList<Entry<String, Object>>();
- private Set<Entry<String, Object>> entrySet = new AbstractSet<Entry<String, Object>>() {
- @Override
- public Iterator<Entry<String, Object>> iterator() {
- return entries.iterator();
- }
-
- @Override
- public int size() {
- return entries.size();
- }
- };
-
- @Override
- public Object put(String key, Object value) {
- entries.add(new SimpleImmutableEntry<String, Object>(key, value));
- return null;
- }
-
- @Override
- public void clear() {
- entries.clear();
- }
-
- @Override
- public Set<Entry<String, Object>> entrySet() {
- return entrySet;
- }
- }
-
- /**
- * convert Key/Value groups into map entries
- */
- final class MapKeyValueConverter extends GroupConverter {
-
- private final Converter keyConverter;
- private final Converter valueConverter;
-
- MapKeyValueConverter(
- GroupType parquetSchema,
- Schema.FieldSchema pigSchema,
- boolean numbersDefaultToZero,
- boolean columnIndexAccess) {
- if (parquetSchema.getFieldCount() != 2
- || !parquetSchema.getType(0).getName().equals("key")
- || !parquetSchema.getType(1).getName().equals("value")) {
- throw new IllegalArgumentException("schema does not match map key/value " + parquetSchema);
- }
- try {
- keyConverter = TupleConverter.newConverter(
- new PigSchemaConverter()
- .convertField(parquetSchema.getType(0))
- .getField(0),
- parquetSchema.getType(0),
- new ParentValueContainer() {
- void add(Object value) {
- currentKey = value;
- }
- },
- numbersDefaultToZero,
- columnIndexAccess);
- } catch (FrontendException fe) {
- throw new SchemaConversionException("can't convert keytype " + parquetSchema.getType(0), fe);
- }
- valueConverter = TupleConverter.newConverter(
- pigSchema,
- parquetSchema.getType(1),
- new ParentValueContainer() {
- void add(Object value) {
- currentValue = value;
- }
- },
- numbersDefaultToZero,
- columnIndexAccess);
- }
-
- @Override
- public Converter getConverter(int fieldIndex) {
- if (fieldIndex == 0) {
- return keyConverter;
- } else if (fieldIndex == 1) {
- return valueConverter;
- }
- throw new IllegalArgumentException("only the key (0) and value (1) fields expected: " + fieldIndex);
- }
-
- /**
- * runtime methods
- */
- @Override
- public final void start() {
- currentKey = null;
- currentValue = null;
- }
-
- @Override
- public void end() {
- buffer.put(currentKey.toString(), currentValue);
- currentKey = null;
- currentValue = null;
- }
- }
-
- /**
- * convert the key into a string
- */
- final class StringKeyConverter extends PrimitiveConverter {
-
- @Override
- public final void addBinary(Binary value) {
- currentKey = value.toStringUsingUTF8();
- }
- }
-}
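Note that `MapKeyValueConverter` insists the repeated group's two fields are named exactly `key` and `value`, which is the shape `PigSchemaConverter` produces for Pig maps. A small sketch to confirm that shape (hypothetical schema string; `[chararray]` declares a Pig map with chararray values):

```
import org.apache.parquet.pig.PigSchemaConverter;
import org.apache.pig.impl.util.Utils;

public class MapSchemaShape {
  public static void main(String[] args) throws Exception {
    PigSchemaConverter converter = new PigSchemaConverter(false);
    // Expect an optional MAP group wrapping a repeated (key, value) group.
    System.out.println(converter.convert(Utils.getSchemaFromString("m: [chararray]")));
  }
}
```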
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/convert/ParentValueContainer.java b/parquet-pig/src/main/java/org/apache/parquet/pig/convert/ParentValueContainer.java
deleted file mode 100644
index 631faa9070..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/convert/ParentValueContainer.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig.convert;
-
-/**
- * for converters to add their current value to their parent
- */
-public abstract class ParentValueContainer {
-
- /**
- * will add the value to the parent whether it's a map, a bag or a tuple
- *
- * @param value
- */
- abstract void add(Object value);
-}
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleConverter.java b/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleConverter.java
deleted file mode 100644
index 7cb11b56a1..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleConverter.java
+++ /dev/null
@@ -1,609 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig.convert;
-
-import static java.lang.Math.max;
-
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.io.api.Converter;
-import org.apache.parquet.io.api.GroupConverter;
-import org.apache.parquet.io.api.PrimitiveConverter;
-import org.apache.parquet.pig.TupleConversionException;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.LogicalTypeAnnotation;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
-import org.apache.parquet.schema.Type.Repetition;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.NonSpillableDataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-
-/**
- * converts a group into a tuple
- */
-public class TupleConverter extends GroupConverter {
-
- private static final TupleFactory TF = TupleFactory.getInstance();
-
- private final int schemaSize;
-
- protected Tuple currentTuple;
- private final Converter[] converters;
-
- private final GroupType parquetSchema;
-
- private final boolean elephantBirdCompatible;
-
- public TupleConverter(
- GroupType parquetSchema, Schema pigSchema, boolean elephantBirdCompatible, boolean columnIndexAccess) {
- this.parquetSchema = parquetSchema;
- this.elephantBirdCompatible = elephantBirdCompatible;
- try {
- this.schemaSize =
- max(parquetSchema.getFieldCount(), pigSchema.getFields().size());
- this.converters = new Converter[this.schemaSize];
- for (int i = 0, c = 0; i < schemaSize; i++) {
- FieldSchema field = pigSchema.getField(i);
- if (parquetSchema.containsField(field.alias) || columnIndexAccess) {
- Type type = getType(columnIndexAccess, field.alias, i);
-
- if (type != null) {
- final int index = i;
- converters[c++] = newConverter(
- field,
- type,
- new ParentValueContainer() {
- @Override
- void add(Object value) {
- TupleConverter.this.set(index, value);
- }
- },
- elephantBirdCompatible,
- columnIndexAccess);
- }
- }
- }
- } catch (FrontendException e) {
- throw new ParquetDecodingException(
- "can not initialize pig converter from:\n" + parquetSchema + "\n" + pigSchema, e);
- }
- }
-
- private Type getType(boolean columnIndexAccess, String alias, int index) {
- if (columnIndexAccess) {
- if (index < parquetSchema.getFieldCount()) {
- return parquetSchema.getType(index);
- }
- } else {
- return parquetSchema.getType(parquetSchema.getFieldIndex(alias));
- }
-
- return null;
- }
-
- static Converter newConverter(
- FieldSchema pigField,
- Type type,
- final ParentValueContainer parent,
- boolean elephantBirdCompatible,
- boolean columnIndexAccess) {
- try {
- switch (pigField.type) {
- case DataType.BAG:
- return new BagConverter(
- type.asGroupType(), pigField, parent, elephantBirdCompatible, columnIndexAccess);
- case DataType.MAP:
- return new MapConverter(
- type.asGroupType(), pigField, parent, elephantBirdCompatible, columnIndexAccess);
- case DataType.TUPLE:
- return new TupleConverter(
- type.asGroupType(), pigField.schema, elephantBirdCompatible, columnIndexAccess) {
- @Override
- public void end() {
- super.end();
- parent.add(this.currentTuple);
- }
- };
- case DataType.CHARARRAY:
- // If the original type isn't a string, we don't want to use the dictionary because
- // a custom implementation will be needed for each type. Just default to no dictionary.
- return new FieldStringConverter(
- parent,
- type.getLogicalTypeAnnotation()
- instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation);
- case DataType.BYTEARRAY:
- return new FieldByteArrayConverter(parent);
- case DataType.INTEGER:
- return new FieldIntegerConverter(parent);
- case DataType.BOOLEAN:
- if (elephantBirdCompatible) {
- return new FieldIntegerConverter(parent);
- } else {
- return new FieldBooleanConverter(parent);
- }
- case DataType.FLOAT:
- return new FieldFloatConverter(parent);
- case DataType.DOUBLE:
- return new FieldDoubleConverter(parent);
- case DataType.LONG:
- return new FieldLongConverter(parent);
- case DataType.BIGDECIMAL:
- return new FieldBigDecimalConverter(type, parent);
- default:
- throw new TupleConversionException("unsupported pig type: " + pigField);
- }
- } catch (FrontendException | RuntimeException e) {
- throw new TupleConversionException("error while preparing converter for:\n" + pigField + "\n" + type, e);
- }
- }
-
- @Override
- public Converter getConverter(int fieldIndex) {
- return converters[fieldIndex];
- }
-
- private static final Integer I32_ZERO = 0;
- private static final Long I64_ZERO = 0L;
- private static final Float FLOAT_ZERO = 0.0f;
- private static final Double DOUBLE_ZERO = 0.0d;
-
- @Override
- public final void start() {
- currentTuple = TF.newTuple(schemaSize);
- if (elephantBirdCompatible) {
- try {
- int i = 0;
- for (Type field : parquetSchema.getFields()) {
- if (field.isPrimitive() && field.isRepetition(Repetition.OPTIONAL)) {
- PrimitiveType primitiveType = field.asPrimitiveType();
- switch (primitiveType.getPrimitiveTypeName()) {
- case INT32:
- currentTuple.set(i, I32_ZERO);
- break;
- case INT64:
- currentTuple.set(i, I64_ZERO);
- break;
- case FLOAT:
- currentTuple.set(i, FLOAT_ZERO);
- break;
- case DOUBLE:
- currentTuple.set(i, DOUBLE_ZERO);
- break;
- case BOOLEAN:
- currentTuple.set(i, I32_ZERO);
- break;
- }
- }
- ++i;
- }
- } catch (ExecException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- final void set(int fieldIndex, Object value) {
- try {
- currentTuple.set(fieldIndex, value);
- } catch (ExecException e) {
- throw new TupleConversionException(
- "Could not set " + value + " to current tuple " + currentTuple + " at " + fieldIndex, e);
- }
- }
-
- @Override
- public void end() {}
-
- public final Tuple getCurrentTuple() {
- return currentTuple;
- }
-
- /**
- * handle string values.
- * In case of dictionary encoding, the strings will be decoded only once.
- */
- static final class FieldStringConverter extends PrimitiveConverter {
-
- private final ParentValueContainer parent;
-
- private boolean dictionarySupport;
- private String[] dict;
-
- public FieldStringConverter(ParentValueContainer parent, boolean dictionarySupport) {
- this.parent = parent;
- this.dictionarySupport = dictionarySupport;
- }
-
- @Override
- public final void addBinary(Binary value) {
- parent.add(value.toStringUsingUTF8());
- }
-
- @Override
- public boolean hasDictionarySupport() {
- return dictionarySupport;
- }
-
- @Override
- public void setDictionary(Dictionary dictionary) {
- dict = new String[dictionary.getMaxId() + 1];
- for (int i = 0; i <= dictionary.getMaxId(); i++) {
- dict[i] = dictionary.decodeToBinary(i).toStringUsingUTF8();
- }
- }
-
- @Override
- public void addValueFromDictionary(int dictionaryId) {
- parent.add(dict[dictionaryId]);
- }
-
- @Override
- public void addLong(long value) {
- parent.add(Long.toString(value));
- }
-
- @Override
- public void addInt(int value) {
- parent.add(Integer.toString(value));
- }
-
- @Override
- public void addFloat(float value) {
- parent.add(Float.toString(value));
- }
-
- @Override
- public void addDouble(double value) {
- parent.add(Double.toString(value));
- }
-
- @Override
- public void addBoolean(boolean value) {
- parent.add(Boolean.toString(value));
- }
- }
-
- /**
- * handles DataByteArrays
- */
- static final class FieldByteArrayConverter extends PrimitiveConverter {
-
- private final ParentValueContainer parent;
-
- public FieldByteArrayConverter(ParentValueContainer parent) {
- this.parent = parent;
- }
-
- @Override
- public final void addBinary(Binary value) {
- parent.add(new DataByteArray(value.getBytes()));
- }
- }
-
- /**
- * Handles doubles
- */
- static final class FieldDoubleConverter extends PrimitiveConverter {
-
- private final ParentValueContainer parent;
-
- public FieldDoubleConverter(ParentValueContainer parent) {
- this.parent = parent;
- }
-
- @Override
- public final void addDouble(double value) {
- parent.add(value);
- }
-
- @Override
- public void addLong(long value) {
- parent.add((double) value);
- }
-
- @Override
- public void addInt(int value) {
- parent.add((double) value);
- }
-
- @Override
- public void addFloat(float value) {
- parent.add((double) value);
- }
-
- @Override
- public void addBoolean(boolean value) {
- parent.add(value ? 1.0d : 0.0d);
- }
-
- @Override
- public void addBinary(Binary value) {
- parent.add(Double.parseDouble(value.toStringUsingUTF8()));
- }
- }
-
- /**
- * handles floats
- */
- static final class FieldFloatConverter extends PrimitiveConverter {
-
- private final ParentValueContainer parent;
-
- public FieldFloatConverter(ParentValueContainer parent) {
- this.parent = parent;
- }
-
- @Override
- public final void addFloat(float value) {
- parent.add(value);
- }
-
- @Override
- public void addLong(long value) {
- parent.add((float) value);
- }
-
- @Override
- public void addInt(int value) {
- parent.add((float) value);
- }
-
- @Override
- public void addDouble(double value) {
- parent.add((float) value);
- }
-
- @Override
- public void addBoolean(boolean value) {
- parent.add(value ? 1.0f : 0.0f);
- }
-
- @Override
- public void addBinary(Binary value) {
- parent.add(Float.parseFloat(value.toStringUsingUTF8()));
- }
- }
-
- /**
- * Handles longs
- */
- static final class FieldLongConverter extends PrimitiveConverter {
-
- private final ParentValueContainer parent;
-
- public FieldLongConverter(ParentValueContainer parent) {
- this.parent = parent;
- }
-
- @Override
- public final void addLong(long value) {
- parent.add(value);
- }
-
- @Override
- public void addInt(int value) {
- parent.add((long) value);
- }
-
- @Override
- public void addFloat(float value) {
- parent.add((long) value);
- }
-
- @Override
- public void addDouble(double value) {
- parent.add((long) value);
- }
-
- @Override
- public void addBoolean(boolean value) {
- parent.add(value ? 1L : 0L);
- }
-
- @Override
- public void addBinary(Binary value) {
- parent.add(Long.parseLong(value.toStringUsingUTF8()));
- }
- }
-
- /**
- * handle integers
- */
- static final class FieldIntegerConverter extends PrimitiveConverter {
-
- private final ParentValueContainer parent;
-
- public FieldIntegerConverter(ParentValueContainer parent) {
- this.parent = parent;
- }
-
- @Override
- public final void addBoolean(boolean value) {
- parent.add(value ? 1 : 0);
- }
-
- @Override
- public final void addInt(int value) {
- parent.add(value);
- }
-
- @Override
- public void addLong(long value) {
- parent.add((int) value);
- }
-
- @Override
- public void addFloat(float value) {
- parent.add((int) value);
- }
-
- @Override
- public void addDouble(double value) {
- parent.add((int) value);
- }
-
- @Override
- public void addBinary(Binary value) {
- parent.add(Integer.parseInt(value.toStringUsingUTF8()));
- }
- }
-
- /**
- * handle booleans
- */
- static final class FieldBooleanConverter extends PrimitiveConverter {
-
- private final ParentValueContainer parent;
-
- public FieldBooleanConverter(ParentValueContainer parent) {
- this.parent = parent;
- }
-
- @Override
- public final void addBoolean(boolean value) {
- parent.add(value);
- }
-
- @Override
- public final void addInt(int value) {
- parent.add(value != 0);
- }
-
- @Override
- public void addLong(long value) {
- parent.add(value != 0);
- }
-
- @Override
- public void addFloat(float value) {
- parent.add(value != 0);
- }
-
- @Override
- public void addDouble(double value) {
- parent.add(value != 0);
- }
-
- @Override
- public void addBinary(Binary value) {
- parent.add(Boolean.parseBoolean(value.toStringUsingUTF8()));
- }
- }
-
- /**
- * handle decimal type
- */
- static final class FieldBigDecimalConverter extends PrimitiveConverter {
- private final ParentValueContainer parent;
- private final Type primitiveType;
-
- public FieldBigDecimalConverter(Type primitiveType, ParentValueContainer parent) {
- this.parent = parent;
- this.primitiveType = primitiveType;
- }
-
- @Override
- public final void addBinary(Binary value) {
- int precision = primitiveType.asPrimitiveType().getDecimalMetadata().getPrecision();
- int scale = primitiveType.asPrimitiveType().getDecimalMetadata().getScale();
- BigDecimal finaldecimal = DecimalUtils.binaryToDecimal(value, precision, scale);
- parent.add(finaldecimal);
- }
- }
-
- /**
- * Converts groups into bags
- */
- static class BagConverter extends GroupConverter {
-
- private final List<Tuple> buffer = new ArrayList<Tuple>();
- private final Converter child;
- private final ParentValueContainer parent;
-
- BagConverter(
- GroupType parquetSchema,
- FieldSchema pigSchema,
- ParentValueContainer parent,
- boolean numbersDefaultToZero,
- boolean columnIndexAccess)
- throws FrontendException {
- this.parent = parent;
- if (parquetSchema.getFieldCount() != 1) {
- throw new IllegalArgumentException(
- "bags have only one field. " + parquetSchema + " size = " + parquetSchema.getFieldCount());
- }
- Type nestedType = parquetSchema.getType(0);
-
- ParentValueContainer childsParent;
- FieldSchema pigField;
- if (nestedType.isPrimitive()
- || nestedType.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation
- || nestedType.getLogicalTypeAnnotation()
- instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
- // Pig bags always contain tuples
- // In that case we need to wrap the value in an extra tuple
- childsParent = new ParentValueContainer() {
- @Override
- void add(Object value) {
- buffer.add(TF.newTuple(value));
- }
- };
- pigField = pigSchema.schema.getField(0).schema.getField(0);
- } else {
- childsParent = new ParentValueContainer() {
- @Override
- void add(Object value) {
- buffer.add((Tuple) value);
- }
- };
- pigField = pigSchema.schema.getField(0);
- }
- child = newConverter(pigField, nestedType, childsParent, numbersDefaultToZero, columnIndexAccess);
- }
-
- @Override
- public Converter getConverter(int fieldIndex) {
- if (fieldIndex != 0) {
- throw new IllegalArgumentException("bags have only one field. can't reach " + fieldIndex);
- }
- return child;
- }
-
- @Override
- public final void start() {
- buffer.clear();
- }
-
- @Override
- public void end() {
- parent.add(new NonSpillableDataBag(new ArrayList<Tuple>(buffer)));
- }
- }
-}
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleRecordMaterializer.java b/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleRecordMaterializer.java
deleted file mode 100644
index b488a720b2..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleRecordMaterializer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig.convert;
-
-import org.apache.parquet.io.api.GroupConverter;
-import org.apache.parquet.io.api.RecordMaterializer;
-import org.apache.parquet.schema.GroupType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-public class TupleRecordMaterializer extends RecordMaterializer<Tuple> {
-
- private TupleConverter root;
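- // the root converter receives the record-assembly callbacks and produces one Pig Tuple per record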
-
- public TupleRecordMaterializer(GroupType parquetSchema, Schema pigSchema, boolean numbersDefaultToZero) {
- this(parquetSchema, pigSchema, numbersDefaultToZero, false);
- }
-
- public TupleRecordMaterializer(
- GroupType parquetSchema, Schema pigSchema, boolean numbersDefaultToZero, boolean columnIndexAccess) {
- this.root = new TupleConverter(parquetSchema, pigSchema, numbersDefaultToZero, columnIndexAccess);
- }
-
- @Override
- public Tuple getCurrentRecord() {
- return root.getCurrentTuple();
- }
-
- @Override
- public GroupConverter getRootConverter() {
- return root;
- }
-}
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/summary/BagSummaryData.java b/parquet-pig/src/main/java/org/apache/parquet/pig/summary/BagSummaryData.java
deleted file mode 100644
index 091b87d11e..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/summary/BagSummaryData.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig.summary;
-
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-
-/**
- * summary data for a bag
- */
-public class BagSummaryData extends SummaryData {
-
- private ValueStat size = new ValueStat();
-
- private FieldSummaryData content;
-
- public void add(Schema schema, DataBag bag) {
- super.add(bag);
- size.add(bag.size());
- FieldSchema field = getField(schema, 0);
- if (bag.size() > 0 && content == null) {
- content = new FieldSummaryData();
- content.setName(getName(field));
- }
- for (Tuple tuple : bag) {
- content.add(getSchema(field), tuple);
- }
- }
-
- @Override
- public void merge(SummaryData other) {
- super.merge(other);
- BagSummaryData otherBagSummary = (BagSummaryData) other;
- size.merge(otherBagSummary.size);
- content = merge(content, otherBagSummary.content);
- }
-
- public FieldSummaryData getContent() {
- return content;
- }
-
- public void setContent(FieldSummaryData content) {
- this.content = content;
- }
-
- public ValueStat getSize() {
- return size;
- }
-
- public void setSize(ValueStat size) {
- this.size = size;
- }
-}
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/summary/EnumStat.java b/parquet-pig/src/main/java/org/apache/parquet/pig/summary/EnumStat.java
deleted file mode 100644
index 8e93106b65..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/summary/EnumStat.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig.summary;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-public class EnumStat {
-
- private static final int MAX_COUNT = 1000;
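- // once more than MAX_COUNT distinct values are seen, checkValues() drops the map;
- // a null 'values' means the field has too many distinct values to track as an enum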
-
- public static class EnumValueCount {
- private String value;
- private int count;
-
- public EnumValueCount() {}
-
- public EnumValueCount(String value) {
- this.value = value;
- }
-
- public void add() {
- ++count;
- }
-
- public String getValue() {
- return value;
- }
-
- public void setValue(String value) {
- this.value = value;
- }
-
- public int getCount() {
- return count;
- }
-
- public void setCount(int count) {
- this.count = count;
- }
-
- public void add(int countToAdd) {
- this.count += countToAdd;
- }
- }
-
- private Map<String, EnumValueCount> values = new HashMap<String, EnumValueCount>();
-
- public void add(String value) {
- if (values != null) {
- EnumValueCount enumValueCount = values.get(value);
- if (enumValueCount == null) {
- enumValueCount = new EnumValueCount(value);
- values.put(value, enumValueCount);
- }
- enumValueCount.add();
- checkValues();
- }
- }
-
- public void merge(EnumStat other) {
- if (values != null) {
- if (other.values == null) {
- values = null;
- return;
- }
- for (EnumValueCount otherValue : other.getValues()) {
- EnumValueCount myValue = values.get(otherValue.value);
- if (myValue == null) {
- values.put(otherValue.value, otherValue);
- } else {
- myValue.add(otherValue.count);
- }
- }
- checkValues();
- }
- }
-
- private void checkValues() {
- if (values.size() > MAX_COUNT) {
- values = null;
- }
- }
-
- public Collection<EnumValueCount> getValues() {
- return values == null ? null : values.values();
- }
-
- public void setValues(Collection<EnumValueCount> values) {
- if (values == null) {
- this.values = null;
- } else if (this.values != null) {
- for (EnumValueCount value : values) {
- this.values.put(value.getValue(), value);
- }
- }
- }
-}
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/summary/FieldSummaryData.java b/parquet-pig/src/main/java/org/apache/parquet/pig/summary/FieldSummaryData.java
deleted file mode 100644
index 43adeb0a7f..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/summary/FieldSummaryData.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig.summary;
-
-import java.util.Map;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-/**
- * summary data for one field of a tuple
- * usually only one of the *Summary members is set
- */
-public class FieldSummaryData extends SummaryData {
-
- private String name;
-
- private BagSummaryData bag;
- private TupleSummaryData tuple;
- private MapSummaryData map;
- private StringSummaryData string;
- private NumberSummaryData number;
-
- private long nullCount;
- private long unknown;
- private long error;
-
- @Override
- public void merge(SummaryData other) {
- super.merge(other);
- FieldSummaryData otherFieldSummaryData = (FieldSummaryData) other;
-
- if (otherFieldSummaryData.name != null) {
- setName(otherFieldSummaryData.name);
- }
-
- bag = merge(bag, otherFieldSummaryData.bag);
- tuple = merge(tuple, otherFieldSummaryData.tuple);
- map = merge(map, otherFieldSummaryData.map);
- string = merge(string, otherFieldSummaryData.string);
- number = merge(number, otherFieldSummaryData.number);
-
- nullCount += otherFieldSummaryData.nullCount;
- unknown += otherFieldSummaryData.unknown;
- error += otherFieldSummaryData.error;
- }
-
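- // dispatches on the runtime type of the value; each type-specific summary is created lazily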
- public void add(Schema schema, Object o) {
- super.add(o);
- if (o == null) {
- ++nullCount;
- } else if (o instanceof DataBag) {
- if (bag == null) {
- bag = new BagSummaryData();
- }
- bag.add(schema, (DataBag) o);
- } else if (o instanceof Tuple) {
- if (tuple == null) {
- tuple = new TupleSummaryData();
- }
- tuple.addTuple(schema, (Tuple) o);
- } else if (o instanceof Map<?, ?>) {
- if (map == null) {
- map = new MapSummaryData();
- }
- map.add(schema, (Map<?, ?>) o);
- } else if (o instanceof String) {
- if (string == null) {
- string = new StringSummaryData();
- }
- string.add((String) o);
- } else if (o instanceof Number) {
- if (number == null) {
- number = new NumberSummaryData();
- }
- number.add((Number) o);
- } else {
- ++unknown;
- }
- }
-
- public void addError() {
- ++error;
- }
-
- public BagSummaryData getBag() {
- return bag;
- }
-
- public void setBag(BagSummaryData bag) {
- this.bag = bag;
- }
-
- public TupleSummaryData getTuple() {
- return tuple;
- }
-
- public void setTuple(TupleSummaryData tuple) {
- this.tuple = tuple;
- }
-
- public MapSummaryData getMap() {
- return map;
- }
-
- public void setMap(MapSummaryData map) {
- this.map = map;
- }
-
- public StringSummaryData getString() {
- return string;
- }
-
- public void setString(StringSummaryData string) {
- this.string = string;
- }
-
- public NumberSummaryData getNumber() {
- return number;
- }
-
- public void setNumber(NumberSummaryData number) {
- this.number = number;
- }
-
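- // a zero count is reported as null so the JSON serializer can omit the field from the summary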
- public Long getNull() {
- return nullCount == 0 ? null : nullCount;
- }
-
- public void setNull(long nullCnt) {
- this.nullCount = nullCnt;
- }
-
- public Long getUnknown() {
- return unknown == 0 ? null : unknown;
- }
-
- public void setUnknown(long unknown) {
- this.unknown = unknown;
- }
-
- public Long getError() {
- return error == 0 ? null : error;
- }
-
- public void setError(long error) {
- this.error = error;
- }
-
- public void setName(String name) {
- if (this.name != null && !this.name.equals(name)) {
- throw new IllegalStateException("name mismatch " + this.name + " expected, got " + name);
- }
- this.name = name;
- }
-
- public String getName() {
- return name;
- }
-}
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/summary/MapSummaryData.java b/parquet-pig/src/main/java/org/apache/parquet/pig/summary/MapSummaryData.java
deleted file mode 100644
index 8c74075a3c..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/summary/MapSummaryData.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig.summary;
-
-import java.util.Map;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-
-/**
- * Summary data for a Map
- */
-public class MapSummaryData extends SummaryData {
-
- private ValueStat size = new ValueStat();
-
- private FieldSummaryData key;
- private FieldSummaryData value;
-
- public void add(Schema schema, Map<?, ?> m) {
- super.add(m);
- size.add(m.size());
- FieldSchema field = getField(schema, 0);
- if (!m.isEmpty() && key == null) {
- key = new FieldSummaryData();
- key.setName(getName(field));
- value = new FieldSummaryData();
- value.setName(getName(field));
- }
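- // map keys carry no Pig schema of their own (hence the null); values reuse the field's schema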
- for (Map.Entry<?, ?> entry : m.entrySet()) {
- key.add(null, entry.getKey());
- value.add(getSchema(field), entry.getValue());
- }
- }
-
- @Override
- public void merge(SummaryData other) {
- super.merge(other);
- MapSummaryData otherMapSummaryData = (MapSummaryData) other;
- size.merge(otherMapSummaryData.size);
- key = merge(key, otherMapSummaryData.key);
- value = merge(value, otherMapSummaryData.value);
- }
-
- public FieldSummaryData getKey() {
- return key;
- }
-
- public void setKey(FieldSummaryData key) {
- this.key = key;
- }
-
- public FieldSummaryData getValue() {
- return value;
- }
-
- public void setValue(FieldSummaryData value) {
- this.value = value;
- }
-
- public ValueStat getSize() {
- return size;
- }
-
- public void setSize(ValueStat size) {
- this.size = size;
- }
-}
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/summary/NumberSummaryData.java b/parquet-pig/src/main/java/org/apache/parquet/pig/summary/NumberSummaryData.java
deleted file mode 100644
index ac4d806515..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/summary/NumberSummaryData.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig.summary;
-
-/**
- * Summary data for a Number
- */
-public class NumberSummaryData extends SummaryData {
-
- private ValueStat value = new ValueStat();
-
- public void add(Number n) {
- super.add(n);
- value.add(n.doubleValue());
- }
-
- @Override
- public void merge(SummaryData other) {
- super.merge(other);
- value.merge(((NumberSummaryData) other).value);
- }
-
- public ValueStat getValue() {
- return value;
- }
-
- public void setValue(ValueStat value) {
- this.value = value;
- }
-}
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/summary/StringSummaryData.java b/parquet-pig/src/main/java/org/apache/parquet/pig/summary/StringSummaryData.java
deleted file mode 100644
index 7a73273e94..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/summary/StringSummaryData.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig.summary;
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import org.apache.parquet.pig.summary.EnumStat.EnumValueCount;
-
-/**
- * Summary data for a String
- */
-@JsonInclude(JsonInclude.Include.NON_NULL)
-public class StringSummaryData extends SummaryData {
-
- private ValueStat size = new ValueStat();
- private EnumStat values = new EnumStat();
-
- public void add(String s) {
- super.add(s);
- size.add(s.length());
- values.add(s);
- }
-
- @Override
- public void merge(SummaryData other) {
- super.merge(other);
- StringSummaryData stringSummaryData = (StringSummaryData) other;
- size.merge(stringSummaryData.size);
- values.merge(stringSummaryData.values);
- }
-
- public ValueStat getSize() {
- return size;
- }
-
- public void setSize(ValueStat size) {
- this.size = size;
- }
-
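- // most frequent values first; null if the distinct-value cap in EnumStat was exceeded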
- public Collection<EnumValueCount> getValues() {
- Collection<EnumValueCount> values2 = values.getValues();
- if (values2 == null) {
- return null;
- }
- List<EnumValueCount> list = new ArrayList<EnumValueCount>(values2);
- Collections.sort(list, new Comparator<EnumValueCount>() {
- @Override
- public int compare(EnumValueCount o1, EnumValueCount o2) {
- return o2.getCount() - o1.getCount();
- }
- });
- return list;
- }
-
- public void setValues(Collection<EnumValueCount> values) {
- this.values.setValues(values);
- }
-}
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/summary/Summary.java b/parquet-pig/src/main/java/org/apache/parquet/pig/summary/Summary.java
deleted file mode 100644
index 3592249d51..0000000000
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/summary/Summary.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.pig.summary;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-/**
- * computes a summary of the input as a JSON string
- */
-public class Summary extends EvalFunc<String> implements Algebraic {
-
- private static final TupleFactory TF = TupleFactory.getInstance();
-
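- // Pig's Algebraic contract splits evaluation into three phases: Initial summarizes records on
- // the map side, Intermediate merges partial summaries in the combiner, Final emits the JSON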
- public static class Initial extends EvalFunc<Tuple> {
-
- @Override
- public Tuple exec(Tuple t) throws IOException {
- return new JSONTuple(sumUp(getInputSchema(), t));
- }
- }
-
- public static class Intermediate extends EvalFunc<Tuple> {
- @Override
- public Tuple exec(Tuple t) throws IOException {
- return new JSONTuple(merge(t));
- }
- }
-
- public static class Final extends EvalFunc<String> {
- @Override
- public String exec(Tuple t) throws IOException {
- return SummaryData.toPrettyJSON(merge(t));
- }
- }
-
- private static final class JSONTuple implements Tuple {
- private static final long serialVersionUID = 1L;
- private TupleSummaryData data;
-
- public JSONTuple(TupleSummaryData data) {
- this.data = data;
- }
-
- @Override
- public void readFields(DataInput dataInput) throws IOException {
- throw new UnsupportedOperationException();
- }
-
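- // a partial summary is written out as a regular one-field tuple holding its JSON form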
- @Override
- public void write(DataOutput dataOutput) throws IOException {
- Tuple t = TF.newTuple(json());
- t.write(dataOutput);
- }
-
- @Override
- public int compareTo(Object o) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void append(Object o) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Object get(int i) throws ExecException {
- if (i == 0) {
- return json();
- }
- throw new ExecException();
- }
-
- private String json() {
- return SummaryData.toJSON(data);
- }
-
- @Override
- public List