1
2
3
4
5 package net.java.truecommons.io;
6
7 import edu.umd.cs.findbugs.annotations.CleanupObligation;
8 import edu.umd.cs.findbugs.annotations.DischargesObligation;
9 import java.io.IOException;
10 import java.io.InputStream;
11 import java.nio.ByteBuffer;
12 import java.nio.channels.SeekableByteChannel;
13 import java.util.Objects;
14 import javax.annotation.Nullable;
15 import javax.annotation.WillCloseWhenClosed;
16 import javax.annotation.concurrent.NotThreadSafe;
17
18
19
20
21
22
23
24
25
26 @NotThreadSafe
27 @CleanupObligation
28 public class ChannelInputStream extends InputStream {
29
30 private final ByteBuffer single = ByteBuffer.allocate(1);
31
32
33 protected @Nullable SeekableByteChannel channel;
34
35
36
37
38
39 private long mark = -1;
40
41 protected ChannelInputStream() { }
42
43 public ChannelInputStream(
44 final @WillCloseWhenClosed SeekableByteChannel channel) {
45 this.channel = Objects.requireNonNull(channel);
46 }
47
48 @Override
49 public int read() throws IOException {
50 single.rewind();
51 return 1 == read(single) ? single.get(0) & 0xff : -1;
52 }
53
54 @Override
55 public final int read(byte[] b) throws IOException {
56 return read(ByteBuffer.wrap(b));
57 }
58
59 @Override
60 public int read(byte[] b, int off, int len) throws IOException {
61 return read(ByteBuffer.wrap(b, off, len));
62 }
63
64 @SuppressWarnings("SleepWhileInLoop")
65 private int read(ByteBuffer bb) throws IOException {
66 if (0 == bb.remaining()) return 0;
67 int read;
68 while (0 == (read = channel.read(bb))) {
69 try {
70 Thread.sleep(50);
71 } catch (final InterruptedException ex) {
72 Thread.currentThread().interrupt();
73 }
74 }
75 return read;
76 }
77
78 @Override
79 public long skip(long n) throws IOException {
80 if (n <= 0) return 0;
81 final long pos = channel.position();
82 final long size = channel.size();
83 final long rem = size - pos;
84 if (n > rem) n = (int) rem;
85 channel.position(pos + n);
86 return n;
87 }
88
89 @Override
90 public int available() throws IOException {
91 final long avl = channel.size() - channel.position();
92 return avl > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) avl;
93 }
94
95 @Override
96 @DischargesObligation
97 public void close() throws IOException { channel.close(); }
98
99 @Override
100 public void mark(final int readlimit) {
101 try {
102 mark = channel.position();
103 } catch (final IOException ex) {
104 mark = -2;
105 }
106 }
107
108 @Override
109 public void reset() throws IOException {
110 if (0 > mark)
111 throw new IOException(-1 == mark
112 ? "No mark set!"
113 : "mark()/reset() not supported!");
114 channel.position(mark);
115 mark = -1;
116 }
117
118 @Override
119 public boolean markSupported() {
120 try {
121 channel.position(channel.position());
122 return true;
123 } catch (final IOException ex) {
124 mark = -2;
125 return false;
126 }
127 }
128 }