Previously we wrote Arrow data to a stream. Now we shall read that data back from a stream.
Just like in the previous blog post, we shall implement the Closeable interface. This is needed to close the RootAllocator and free up memory.
We shall pass a ReadableByteChannel to the reader, which turns the stream into a list of DefaultArrowEntry objects.
```java
package com.gkatzioura.arrow;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;

public class DefaultEntriesReader implements Closeable {

    private final RootAllocator rootAllocator;

    public DefaultEntriesReader() {
        rootAllocator = new RootAllocator(Integer.MAX_VALUE);
    }

    public List<DefaultArrowEntry> readBytes(ReadableByteChannel readableByteChannel) throws IOException {
        List<DefaultArrowEntry> defaultArrowEntries = new ArrayList<>();
        try (ArrowStreamReader arrowStreamReader = new ArrowStreamReader(readableByteChannel, rootAllocator)) {
            // The root and its vectors are reused; each loadNextBatch() call refills them.
            var root = arrowStreamReader.getVectorSchemaRoot();
            var childVector1 = (VarCharVector) root.getVector(0);
            var childVector2 = (IntVector) root.getVector(1);

            while (arrowStreamReader.loadNextBatch()) {
                int batchSize = root.getRowCount();
                for (int i = 0; i < batchSize; i++) {
                    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                    var strData = new String(childVector1.get(i), StandardCharsets.UTF_8);
                    var intData = childVector2.get(i);

                    DefaultArrowEntry defaultArrowEntry = DefaultArrowEntry.builder().col1(strData).col2(intData).build();
                    defaultArrowEntries.add(defaultArrowEntry);
                }
            }

            return defaultArrowEntries;
        }
    }

    @Override
    public void close() throws IOException {
        rootAllocator.close();
    }
}
```
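To see why implementing Closeable matters here, the following is a minimal sketch using only the JDK. `TrackedResource` is a hypothetical stand-in for a class that owns an allocator (it is not part of Arrow), and it simply records the order in which things happen. The point it illustrates is that try-with-resources calls close() even when the read fails, and does so before the catch block runs.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class CloseableDemo {

    // Hypothetical stand-in for a reader that owns memory which must be released.
    static final class TrackedResource implements Closeable {
        private final List<String> events;

        TrackedResource(List<String> events) {
            this.events = events;
        }

        void read(boolean fail) throws IOException {
            events.add("read");
            if (fail) {
                throw new IOException("simulated read failure");
            }
        }

        @Override
        public void close() {
            // In the real reader this is where rootAllocator.close() would go.
            events.add("close");
        }
    }

    // Runs one read inside try-with-resources and returns the recorded event order.
    static List<String> run(boolean fail) {
        List<String> events = new ArrayList<>();
        try (var resource = new TrackedResource(events)) {
            resource.read(fail);
        } catch (IOException e) {
            events.add("caught");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [read, close]
        System.out.println(run(true));  // prints [read, close, caught]
    }
}
```

The same ordering applies to DefaultEntriesReader when it is used in a try-with-resources block: the allocator is closed whether or not readBytes throws.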
Let's wrap it up with a write and a read.
```java
package com.gkatzioura.arrow;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ArrowMain {

    public static void main(String[] args) throws IOException {
        var originalEntries = IntStream.rangeClosed(0, 11)
                .boxed()
                .map(i -> new DefaultArrowEntry("data-" + i, i))
                .collect(Collectors.toList());

        var outputStream = new ByteArrayOutputStream();

        try (var arrowWriter = new DefaultEntriesWriter()) {
            arrowWriter.write(originalEntries, 10, Channels.newChannel(outputStream));
        }

        byte[] introBytes = outputStream.toByteArray();
        var inputStream = new ByteArrayInputStream(introBytes);

        try (var arrowReader = new DefaultEntriesReader()) {
            var entries = arrowReader.readBytes(Channels.newChannel(inputStream));
            for (DefaultArrowEntry entry : entries) {
                System.out.println("Read " + entry.getCol1() + " " + entry.getCol2());
            }
        }
    }
}
```
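The main method above moves bytes between the writer and the reader entirely in memory, by wrapping plain streams with Channels.newChannel. The sketch below isolates that plumbing with JDK classes only, so it can be run without Arrow on the classpath. `ChannelRoundTrip` and `roundTrip` are illustrative names, and the UTF-8 text payload is an assumption standing in for the Arrow stream bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

final class ChannelRoundTrip {

    // Writes text through a WritableByteChannel, then reads it back through a ReadableByteChannel.
    static String roundTrip(String text) {
        try {
            var outputStream = new ByteArrayOutputStream();
            try (WritableByteChannel out = Channels.newChannel(outputStream)) {
                ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
                while (buffer.hasRemaining()) {
                    out.write(buffer);
                }
            }

            // Same hand-off as in ArrowMain: the written bytes become the input stream.
            byte[] bytes = outputStream.toByteArray();
            var collected = new ByteArrayOutputStream();
            try (ReadableByteChannel in = Channels.newChannel(new ByteArrayInputStream(bytes))) {
                ByteBuffer chunk = ByteBuffer.allocate(8); // deliberately small to force several reads
                while (in.read(chunk) != -1) {
                    chunk.flip();
                    collected.write(chunk.array(), 0, chunk.limit());
                    chunk.clear();
                }
            }
            return new String(collected.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("data-0 data-1 data-2"));
    }
}
```

Because both the writer and the reader only depend on channel interfaces, the in-memory streams could be swapped for a file or socket channel without touching the reading logic.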
That's it. To summarise, we created Arrow schemas, we wrote data to a stream and we read data from a stream!
Published on Java Code Geeks with permission by Emmanouil Gkatziouras, partner at our JCG program. See the original article here: Apache Arrow on the JVM: Streaming Reads. Opinions expressed by Java Code Geeks contributors are their own.
Emmanouil Gkatziouras, June 1st, 2021. Last updated: May 31st, 2021.