/*
 * Copyright (C) 2005-2015 Schlichtherle IT Services.
 * All rights reserved. Use is subject to license terms.
 */
5   package net.java.truecommons.io;
6   
7   import javax.annotation.WillClose;
8   import javax.annotation.WillNotClose;
9   import javax.annotation.concurrent.Immutable;
10  import java.io.IOException;
11  import java.io.InputStream;
12  import java.io.OutputStream;
13  import java.lang.ref.Reference;
14  import java.lang.ref.SoftReference;
15  import java.util.Deque;
16  import java.util.Objects;
17  import java.util.Queue;
18  import java.util.concurrent.*;
19  import java.util.concurrent.locks.Condition;
20  import java.util.concurrent.locks.Lock;
21  import java.util.concurrent.locks.ReentrantLock;
22  
23  /**
24   * Static utility methods for {@link InputStream}s and {@link OutputStream}s.
25   *
26   * @author Christian Schlichtherle
27   */
28  @Immutable
29  public final class Streams {
30  
31      /**
32       * The size of the FIFO used for exchanging I/O buffers between a reader
33       * thread and a writer thread.
34       * A minimum of two elements is required.
35       * The actual number is optimized to compensate for oscillating I/O
36       * bandwidths like e.g. with network shares.
37       */
38      static final int FIFO_SIZE = 4;
39  
40      /** The buffer size used for reading and writing, which is {@value}. */
41      public static final int BUFFER_SIZE = 8 * 1024;
42  
43      private static final ExecutorService executor
44              = Executors.newCachedThreadPool(new ReaderThreadFactory());
45  
46      private Streams() { }
47  
48      /**
49       * Copies the data from the given input stream to the given output stream
50       * and <em>always</em> closes <em>both</em> streams - even if an exception
51       * occurs.
52       * <p>
53       * This is a high performance implementation which uses a pooled background
54       * thread to fill a FIFO of pooled buffers which is concurrently flushed by
55       * the current thread.
56       * It performs best when used with <em>unbuffered</em> streams.
57       *
58       * @param in the input stream.
59       * @param out the output stream.
60       */
61      public static void copy(@WillClose InputStream in,
62                              @WillClose OutputStream out)
63      throws IOException {
64          copy(new OneTimeSource(in), new OneTimeSink(out));
65      }
66  
67      /**
68       * Copies the data from the given source to the given sink.
69       * <p>
70       * This is a high performance implementation which uses a pooled background
71       * thread to fill a FIFO of pooled buffers which is concurrently flushed by
72       * the current thread.
73       * It performs best when used with <em>unbuffered</em> streams.
74       *
75       * @param source the source for reading the data from.
76       * @param sink the sink for writing the data to.
77       */
78      @SuppressWarnings("ThrowFromFinallyBlock")
79      public static void copy(final Source source, final Sink sink)
80      throws IOException {
81          try (InputStream in = source.stream();
82               OutputStream out = sink.stream()) {
83              Throwable t1 = null;
84              try {
85                  cat(in, out);
86              } catch (final Throwable t2) {
87                  t1 = t2;
88                  throw t2;
89              } finally {
90                  // Help the resource management in TrueVFS by closing the input
91                  // stream first.
92                  // Note that closing an already closed input stream must have
93                  // no side effect, so this should be safe even though close()
94                  // will get called once again at the end of the enclosing
95                  // try-with-resources statement.
96                  try {
97                      in.close();
98                  } catch (final Throwable t2) {
99                      if (null == t1) throw t2;
100                     t1.addSuppressed(t2);
101                 }
102             }
103         }
104     }
105 
106     /**
107      * Copies the data from the given input stream to the given output stream
108      * <em>without</em> closing them.
109      * This method calls {@link OutputStream#flush()} unless an
110      * {@link IOException} occurs when writing to the output stream.
111      * This hold true even if an {@link IOException} occurs when reading from
112      * the input stream.
113      * <p>
114      * This is a high performance implementation which uses a pooled background
115      * thread to fill a FIFO of pooled buffers which is concurrently flushed by
116      * the current thread.
117      * It performs best when used with <em>unbuffered</em> streams.
118      * <p>
119      * The name of this method is inspired by the Unix command line utility
120      * {@code cat} because you could use it to con<i>cat</i>enate the contents
121      * of multiple streams.
122      *
123      * @param in the input stream.
124      * @param out the output stream.
125      */
126     public static void cat(final @WillNotClose InputStream in,
127                            final @WillNotClose OutputStream out)
128     throws IOException {
129         Objects.requireNonNull(in);
130         Objects.requireNonNull(out);
131 
132         // We will use a FIFO to exchange byte buffers between a pooled reader
133         // thread and the current writer thread.
134         // The pooled reader thread will fill the buffers with data from the
135         // input and the current thread will write the filled buffers to the
136         // output.
137         // The FIFO is simply implemented as a cached array or byte buffers
138         // with an offset and a size which is used like a ring buffer.
139 
140         final Lock lock = new ReentrantLock();
141         final Condition signal = lock.newCondition();
142         final Buffer[] buffers = Buffer.allocate();
143 
144         /*
145          * The task that cycles through the buffers in order to fill them
146          * with input.
147          */
148         final class ReaderTask implements Runnable {
149             /** The index of the next buffer to be written. */
150             int off;
151 
152             /** The number of buffers filled with data to be written. */
153             int size;
154 
155             /** The Throwable that happened in this task, if any. */
156             volatile Throwable exception;
157 
158             @Override
159             public void run() {
160                 final int buffersLength = buffers.length;
161 
162                 // The writer executor interrupts this executor to signal
163                 // that it cannot handle more input because there has been
164                 // an IOException during writing.
165                 // We stop processing in this case.
166                 int read;
167                 do {
168                     // Wait until a buffer is available.
169                     final Buffer buffer;
170                     lock.lock();
171                     try {
172                         while (size >= buffersLength) {
173                             try {
174                                 signal.await();
175                             } catch (InterruptedException cancel) {
176                                 return;
177                             }
178                         }
179                         buffer = buffers[(off + size) % buffersLength];
180                     } finally {
181                         lock.unlock();
182                     }
183 
184                     // Fill buffer until end of file or buffer.
185                     // This should normally complete in one loop cycle, but
186                     // we do not depend on this as it would be a violation
187                     // of InputStream's contract.
188                     try {
189                         final byte[] buf = buffer.buf;
190                         read = in.read(buf, 0, buf.length);
191                     } catch (final Throwable ex) {
192                         exception = ex;
193                         read = -1;
194                     }
195                     buffer.read = read;
196 
197                     // Advance head and signal writer.
198                     lock.lock();
199                     try {
200                         size++;
201                         signal.signal(); // only the writer could be waiting now!
202                     } finally {
203                         lock.unlock();
204                     }
205                 } while (0 <= read);
206             }
207         } // ReaderTask
208 
209         boolean interrupted = false;
210         try {
211             final ReaderTask reader = new ReaderTask();
212             final Future<?> result = executor.submit(reader);
213 
214             // Cache some data for better performance.
215             final int buffersLength = buffers.length;
216 
217             int write;
218             while (true) {
219                 // Wait until a buffer is available.
220                 final int off;
221                 final Buffer buffer;
222                 lock.lock();
223                 try {
224                     while (0 >= reader.size) {
225                         try {
226                             signal.await();
227                         } catch (InterruptedException interrupt) {
228                             interrupted = true;
229                         }
230                     }
231                     off = reader.off;
232                     buffer = buffers[off];
233                 } finally {
234                     lock.unlock();
235                 }
236 
237                 // Stop on last buffer.
238                 write = buffer.read;
239                 if (0 > write)
240                     break; // reader has terminated because of EOF or exception
241 
242                 // Process buffer.
243                 try {
244                     out.write(buffer.buf, 0, write);
245                 } catch (final Throwable ex) {
246                     try {
247                         cancel(result);
248                     } catch (final Throwable ex2) {
249                         ex.addSuppressed(ex2);
250                     }
251                     throw ex;
252                 }
253 
254                 // Advance tail and signal reader.
255                 lock.lock();
256                 try {
257                     reader.off = (off + 1) % buffersLength;
258                     reader.size--;
259                     signal.signal(); // only the reader could be waiting now!
260                 } finally {
261                     lock.unlock();
262                 }
263             }
264             out.flush();
265 
266             final Throwable t = reader.exception;
267             if (null != t) {
268                 if (t instanceof IOException)
269                     throw (IOException) t;
270                 else if (t instanceof RuntimeException)
271                     throw (RuntimeException) t;
272                 throw (Error) t;
273             }
274         } finally {
275             if (interrupted)
276                 Thread.currentThread().interrupt(); // restore
277             Buffer.release(buffers);
278         }
279     }
280 
281     /**
282      * Cancels the reader thread synchronously.
283      * Synchronous cancellation of the reader thread is required so that a
284      * re-entry to the cat(...) method by the same thread cannot concurrently
285      * access the same shared buffers that an unfinished reader thread of a
286      * previous call may still be using.
287      */
288     private static void cancel(final Future<?> result) {
289         result.cancel(true);
290         boolean interrupted = false;
291         try {
292             while (true) {
293                 try {
294                     result.get();
295                     break;
296                 } catch (CancellationException cancelled) {
297                     break;
298                 } catch (ExecutionException cannotHappen) {
299                     throw new AssertionError(cannotHappen);
300                 } catch (InterruptedException interrupt) {
301                     interrupted = true;
302                 }
303             }
304         } finally {
305             if (interrupted)
306                 Thread.currentThread().interrupt(); // restore
307         }
308     }
309 
310     /** A buffer for I/O. */
311     private static final class Buffer {
312         /**
313          * Each entry in this queue holds a soft reference to an array
314          * initialized with instances of this class.
315          * <p>
316          * The best choice would be a {@link ConcurrentLinkedDeque} where I
317          * could call {@link Deque#push(Object)} to achieve many garbage
318          * collector pickups of old {@link SoftReference}s further down the
319          * stack, but this class is only available since JSE 7.
320          * A {@link LinkedBlockingDeque} is supposedly not a good choice
321          * because it uses locks, which I would like to abandon.
322          */
323         static final Queue<Reference<Buffer[]>> queue
324                 = new ConcurrentLinkedQueue<>();
325 
326         static Buffer[] allocate() {
327             {
328                 Reference<Buffer[]> reference;
329                 while (null != (reference = queue.poll())) {
330                     final Buffer[] buffers = reference.get();
331                     if (null != buffers)
332                         return buffers;
333                 }
334             }
335 
336             final Buffer[] buffers = new Buffer[FIFO_SIZE];
337             for (int i = buffers.length; 0 <= --i; )
338                 buffers[i] = new Buffer();
339             return buffers;
340         }
341 
342         static void release(Buffer[] buffers) {
343             //queue.push(new SoftReference<>(buffers));
344             queue.add(new SoftReference<>(buffers));
345         }
346 
347         /** The byte buffer used for reading and writing. */
348         final byte[] buf = new byte[BUFFER_SIZE];
349 
350         /**
351          * The actual number of bytes read into the buffer.
352          * -1 represents end-of-file or {@link IOException}.
353          */
354         int read;
355     } // Buffer
356 
357     /** A factory for reader threads. */
358     private static final class ReaderThreadFactory implements ThreadFactory {
359         @Override
360         public Thread newThread(Runnable r) {
361             return new ReaderThread(r);
362         }
363     } // ReaderThreadFactory
364 
365     /**
366      * A pooled and cached daemon thread which runs tasks to read input streams.
367      * You cannot instantiate this class.
368      */
369     @SuppressWarnings("PublicInnerClass")
370     public static final class ReaderThread extends Thread {
371         private ReaderThread(Runnable r) {
372             super(ThreadGroups.getServerThreadGroup(), r, ReaderThread.class.getName());
373             setDaemon(true);
374         }
375     } // ReaderThread
376 }