001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *   http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 */
018
019package org.apache.commons.compress.utils;
020
021import java.io.File;
022import java.io.IOException;
023import java.nio.ByteBuffer;
024import java.nio.channels.ClosedChannelException;
025import java.nio.channels.NonWritableChannelException;
026import java.nio.channels.SeekableByteChannel;
027import java.nio.file.Files;
028import java.nio.file.StandardOpenOption;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Collections;
032import java.util.List;
033import java.util.Objects;
034
035/**
036 * Read-Only Implementation of {@link SeekableByteChannel} that
037 * concatenates a collection of other {@link SeekableByteChannel}s.
038 *
039 * <p>This is a lose port of <a
040 * href="https://github.com/frugalmechanic/fm-common/blob/master/jvm/src/main/scala/fm/common/MultiReadOnlySeekableByteChannel.scala">MultiReadOnlySeekableByteChannel</a>
041 * by Tim Underwood.</p>
042 *
043 * @since 1.19
044 */
045public class MultiReadOnlySeekableByteChannel implements SeekableByteChannel {
046
047    private final List<SeekableByteChannel> channels;
048    private long globalPosition;
049    private int currentChannelIdx;
050
051    /**
052     * Concatenates the given channels.
053     *
054     * @param channels the channels to concatenate
055     * @throws NullPointerException if channels is null
056     */
057    public MultiReadOnlySeekableByteChannel(final List<SeekableByteChannel> channels) {
058        this.channels = Collections.unmodifiableList(new ArrayList<>(
059            Objects.requireNonNull(channels, "channels must not be null")));
060    }
061
062    @Override
063    public synchronized int read(final ByteBuffer dst) throws IOException {
064        if (!isOpen()) {
065            throw new ClosedChannelException();
066        }
067        if (!dst.hasRemaining()) {
068            return 0;
069        }
070
071        int totalBytesRead = 0;
072        while (dst.hasRemaining() && currentChannelIdx < channels.size()) {
073            final SeekableByteChannel currentChannel = channels.get(currentChannelIdx);
074            final int newBytesRead = currentChannel.read(dst);
075            if (newBytesRead == -1) {
076                // EOF for this channel -- advance to next channel idx
077                currentChannelIdx += 1;
078                continue;
079            }
080            if (currentChannel.position() >= currentChannel.size()) {
081                // we are at the end of the current channel
082                currentChannelIdx++;
083            }
084            totalBytesRead += newBytesRead;
085        }
086        if (totalBytesRead > 0) {
087            globalPosition += totalBytesRead;
088            return totalBytesRead;
089        }
090        return -1;
091    }
092
093    @Override
094    public void close() throws IOException {
095        IOException first = null;
096        for (final SeekableByteChannel ch : channels) {
097            try {
098                ch.close();
099            } catch (final IOException ex) {
100                if (first == null) {
101                    first = ex;
102                }
103            }
104        }
105        if (first != null) {
106            throw new IOException("failed to close wrapped channel", first);
107        }
108    }
109
110    @Override
111    public boolean isOpen() {
112        for (final SeekableByteChannel ch : channels) {
113            if (!ch.isOpen()) {
114                return false;
115            }
116        }
117        return true;
118    }
119
120    /**
121     * Returns this channel's position.
122     *
123     * <p>This method violates the contract of {@link SeekableByteChannel#position()} as it will not throw any exception
124     * when invoked on a closed channel. Instead it will return the position the channel had when close has been
125     * called.</p>
126     */
127    @Override
128    public long position() {
129        return globalPosition;
130    }
131
132    /**
133     * set the position based on the given channel number and relative offset
134     *
135     * @param channelNumber  the channel number
136     * @param relativeOffset the relative offset in the corresponding channel
137     * @return global position of all channels as if they are a single channel
138     * @throws IOException if positioning fails
139     */
140    public synchronized SeekableByteChannel position(final long channelNumber, final long relativeOffset) throws IOException {
141        if (!isOpen()) {
142            throw new ClosedChannelException();
143        }
144        long globalPosition = relativeOffset;
145        for (int i = 0; i < channelNumber; i++) {
146            globalPosition += channels.get(i).size();
147        }
148
149        return position(globalPosition);
150    }
151
152    @Override
153    public long size() throws IOException {
154        if (!isOpen()) {
155            throw new ClosedChannelException();
156        }
157        long acc = 0;
158        for (final SeekableByteChannel ch : channels) {
159            acc += ch.size();
160        }
161        return acc;
162    }
163
164    /**
165     * @throws NonWritableChannelException since this implementation is read-only.
166     */
167    @Override
168    public SeekableByteChannel truncate(final long size) {
169        throw new NonWritableChannelException();
170    }
171
172    /**
173     * @throws NonWritableChannelException since this implementation is read-only.
174     */
175    @Override
176    public int write(final ByteBuffer src) {
177        throw new NonWritableChannelException();
178    }
179
180    @Override
181    public synchronized SeekableByteChannel position(final long newPosition) throws IOException {
182        if (newPosition < 0) {
183            throw new IOException("Negative position: " + newPosition);
184        }
185        if (!isOpen()) {
186            throw new ClosedChannelException();
187        }
188
189        globalPosition = newPosition;
190
191        long pos = newPosition;
192
193        for (int i = 0; i < channels.size(); i++) {
194            final SeekableByteChannel currentChannel = channels.get(i);
195            final long size = currentChannel.size();
196
197            final long newChannelPos;
198            if (pos == -1L) {
199                // Position is already set for the correct channel,
200                // the rest of the channels get reset to 0
201                newChannelPos = 0;
202            } else if (pos <= size) {
203                // This channel is where we want to be
204                currentChannelIdx = i;
205                final long tmp = pos;
206                pos = -1L; // Mark pos as already being set
207                newChannelPos = tmp;
208            } else {
209                // newPosition is past this channel.  Set channel
210                // position to the end and substract channel size from
211                // pos
212                pos -= size;
213                newChannelPos = size;
214            }
215
216            currentChannel.position(newChannelPos);
217        }
218        return this;
219    }
220
221    /**
222     * Concatenates the given channels.
223     *
224     * @param channels the channels to concatenate
225     * @throws NullPointerException if channels is null
226     * @return SeekableByteChannel that concatenates all provided channels
227     */
228    public static SeekableByteChannel forSeekableByteChannels(final SeekableByteChannel... channels) {
229        if (Objects.requireNonNull(channels, "channels must not be null").length == 1) {
230            return channels[0];
231        }
232        return new MultiReadOnlySeekableByteChannel(Arrays.asList(channels));
233    }
234
235    /**
236     * Concatenates the given files.
237     *
238     * @param files the files to concatenate
239     * @throws NullPointerException if files is null
240     * @throws IOException if opening a channel for one of the files fails
241     * @return SeekableByteChannel that concatenates all provided files
242     */
243    public static SeekableByteChannel forFiles(final File... files) throws IOException {
244        final List<SeekableByteChannel> channels = new ArrayList<>();
245        for (final File f : Objects.requireNonNull(files, "files must not be null")) {
246            channels.add(Files.newByteChannel(f.toPath(), StandardOpenOption.READ));
247        }
248        if (channels.size() == 1) {
249            return channels.get(0);
250        }
251        return new MultiReadOnlySeekableByteChannel(channels);
252    }
253
254}