1
2
3
4
5 package net.java.truecommons.io;
6
7 import javax.annotation.WillClose;
8 import javax.annotation.WillNotClose;
9 import javax.annotation.concurrent.Immutable;
10 import java.io.IOException;
11 import java.io.InputStream;
12 import java.io.OutputStream;
13 import java.lang.ref.Reference;
14 import java.lang.ref.SoftReference;
15 import java.util.Deque;
16 import java.util.Objects;
17 import java.util.Queue;
18 import java.util.concurrent.*;
19 import java.util.concurrent.locks.Condition;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
22
23
24
25
26
27
28 @Immutable
29 public final class Streams {
30
31
32
33
34
35
36
37
/**
 * The number of buffers in the ring which is shared between the reader
 * task and the writing thread of a {@code cat} operation.
 */
static final int FIFO_SIZE = 4;

/** The size in bytes of each individual I/O buffer. */
public static final int BUFFER_SIZE = 8 * 1024;

/**
 * Runs the background reader tasks.
 * A cached thread pool is used; its threads are created by a
 * {@code ReaderThreadFactory}.
 */
private static final ExecutorService executor =
        Executors.newCachedThreadPool(new ReaderThreadFactory());

/** Not instantiable: this class only provides static members. */
private Streams() {
}
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 public static void copy(@WillClose InputStream in,
62 @WillClose OutputStream out)
63 throws IOException {
64 copy(new OneTimeSource(in), new OneTimeSink(out));
65 }
66
67
68
69
70
71
72
73
74
75
76
77
78 @SuppressWarnings("ThrowFromFinallyBlock")
79 public static void copy(final Source source, final Sink sink)
80 throws IOException {
81 try (InputStream in = source.stream();
82 OutputStream out = sink.stream()) {
83 Throwable t1 = null;
84 try {
85 cat(in, out);
86 } catch (final Throwable t2) {
87 t1 = t2;
88 throw t2;
89 } finally {
90
91
92
93
94
95
96 try {
97 in.close();
98 } catch (final Throwable t2) {
99 if (null == t1) throw t2;
100 t1.addSuppressed(t2);
101 }
102 }
103 }
104 }
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126 public static void cat(final @WillNotClose InputStream in,
127 final @WillNotClose OutputStream out)
128 throws IOException {
129 Objects.requireNonNull(in);
130 Objects.requireNonNull(out);
131
132
133
134
135
136
137
138
139
140 final Lock lock = new ReentrantLock();
141 final Condition signal = lock.newCondition();
142 final Buffer[] buffers = Buffer.allocate();
143
144
145
146
147
148 final class ReaderTask implements Runnable {
149
150 int off;
151
152
153 int size;
154
155
156 volatile Throwable exception;
157
158 @Override
159 public void run() {
160 final int buffersLength = buffers.length;
161
162
163
164
165
166 int read;
167 do {
168
169 final Buffer buffer;
170 lock.lock();
171 try {
172 while (size >= buffersLength) {
173 try {
174 signal.await();
175 } catch (InterruptedException cancel) {
176 return;
177 }
178 }
179 buffer = buffers[(off + size) % buffersLength];
180 } finally {
181 lock.unlock();
182 }
183
184
185
186
187
188 try {
189 final byte[] buf = buffer.buf;
190 read = in.read(buf, 0, buf.length);
191 } catch (final Throwable ex) {
192 exception = ex;
193 read = -1;
194 }
195 buffer.read = read;
196
197
198 lock.lock();
199 try {
200 size++;
201 signal.signal();
202 } finally {
203 lock.unlock();
204 }
205 } while (0 <= read);
206 }
207 }
208
209 boolean interrupted = false;
210 try {
211 final ReaderTask reader = new ReaderTask();
212 final Future<?> result = executor.submit(reader);
213
214
215 final int buffersLength = buffers.length;
216
217 int write;
218 while (true) {
219
220 final int off;
221 final Buffer buffer;
222 lock.lock();
223 try {
224 while (0 >= reader.size) {
225 try {
226 signal.await();
227 } catch (InterruptedException interrupt) {
228 interrupted = true;
229 }
230 }
231 off = reader.off;
232 buffer = buffers[off];
233 } finally {
234 lock.unlock();
235 }
236
237
238 write = buffer.read;
239 if (0 > write)
240 break;
241
242
243 try {
244 out.write(buffer.buf, 0, write);
245 } catch (final Throwable ex) {
246 try {
247 cancel(result);
248 } catch (final Throwable ex2) {
249 ex.addSuppressed(ex2);
250 }
251 throw ex;
252 }
253
254
255 lock.lock();
256 try {
257 reader.off = (off + 1) % buffersLength;
258 reader.size--;
259 signal.signal();
260 } finally {
261 lock.unlock();
262 }
263 }
264 out.flush();
265
266 final Throwable t = reader.exception;
267 if (null != t) {
268 if (t instanceof IOException)
269 throw (IOException) t;
270 else if (t instanceof RuntimeException)
271 throw (RuntimeException) t;
272 throw (Error) t;
273 }
274 } finally {
275 if (interrupted)
276 Thread.currentThread().interrupt();
277 Buffer.release(buffers);
278 }
279 }
280
281
282
283
284
285
286
287
288 private static void cancel(final Future<?> result) {
289 result.cancel(true);
290 boolean interrupted = false;
291 try {
292 while (true) {
293 try {
294 result.get();
295 break;
296 } catch (CancellationException cancelled) {
297 break;
298 } catch (ExecutionException cannotHappen) {
299 throw new AssertionError(cannotHappen);
300 } catch (InterruptedException interrupt) {
301 interrupted = true;
302 }
303 }
304 } finally {
305 if (interrupted)
306 Thread.currentThread().interrupt();
307 }
308 }
309
310
311 private static final class Buffer {
312
313
314
315
316
317
318
319
320
321
322
323 static final Queue<Reference<Buffer[]>> queue
324 = new ConcurrentLinkedQueue<>();
325
326 static Buffer[] allocate() {
327 {
328 Reference<Buffer[]> reference;
329 while (null != (reference = queue.poll())) {
330 final Buffer[] buffers = reference.get();
331 if (null != buffers)
332 return buffers;
333 }
334 }
335
336 final Buffer[] buffers = new Buffer[FIFO_SIZE];
337 for (int i = buffers.length; 0 <= --i; )
338 buffers[i] = new Buffer();
339 return buffers;
340 }
341
342 static void release(Buffer[] buffers) {
343
344 queue.add(new SoftReference<>(buffers));
345 }
346
347
348 final byte[] buf = new byte[BUFFER_SIZE];
349
350
351
352
353
354 int read;
355 }
356
357
358 private static final class ReaderThreadFactory implements ThreadFactory {
359 @Override
360 public Thread newThread(Runnable r) {
361 return new ReaderThread(r);
362 }
363 }
364
365
366
367
368
369 @SuppressWarnings("PublicInnerClass")
370 public static final class ReaderThread extends Thread {
371 private ReaderThread(Runnable r) {
372 super(ThreadGroups.getServerThreadGroup(), r, ReaderThread.class.getName());
373 setDaemon(true);
374 }
375 }
376 }