001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 */ 018 019package org.apache.commons.compress.utils; 020 021import java.io.File; 022import java.io.IOException; 023import java.nio.ByteBuffer; 024import java.nio.channels.ClosedChannelException; 025import java.nio.channels.NonWritableChannelException; 026import java.nio.channels.SeekableByteChannel; 027import java.nio.file.Files; 028import java.nio.file.StandardOpenOption; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Collections; 032import java.util.List; 033import java.util.Objects; 034 035/** 036 * Read-Only Implementation of {@link SeekableByteChannel} that 037 * concatenates a collection of other {@link SeekableByteChannel}s. 
/**
 * Read-only implementation of {@link SeekableByteChannel} that
 * concatenates a collection of other {@link SeekableByteChannel}s.
 *
 * <p>This is a loose port of <a
 * href="https://github.com/frugalmechanic/fm-common/blob/master/jvm/src/main/scala/fm/common/MultiReadOnlySeekableByteChannel.scala">MultiReadOnlySeekableByteChannel</a>
 * by Tim Underwood.</p>
 *
 * <p>The wrapped channels are treated as one contiguous stream: the global
 * position is the sum of the sizes of all preceding channels plus the offset
 * inside the current one. {@link #read(ByteBuffer)} and both positioning
 * methods are {@code synchronized} on this instance.</p>
 *
 * @since 1.19
 */
public class MultiReadOnlySeekableByteChannel implements SeekableByteChannel {

    /** Immutable snapshot of the wrapped channels, in concatenation order. */
    private final List<SeekableByteChannel> channels;

    /** Position inside the virtual, concatenated channel. */
    private long globalPosition;

    /** Index of the channel the next read starts with; equals {@code channels.size()} at EOF. */
    private int currentChannelIdx;

    /** Set once {@link #close()} has been invoked, independent of the wrapped channels' state. */
    private volatile boolean closed;

    /**
     * Concatenates the given channels.
     *
     * <p>The list is copied, so later changes to the argument do not affect this channel.</p>
     *
     * @param channels the channels to concatenate
     * @throws NullPointerException if channels is null
     */
    public MultiReadOnlySeekableByteChannel(final List<SeekableByteChannel> channels) {
        this.channels = Collections.unmodifiableList(new ArrayList<>(
            Objects.requireNonNull(channels, "channels must not be null")));
    }

    /**
     * Reads from the current channel and continues with the following channels
     * until the buffer is full or the last channel is exhausted.
     *
     * @param dst the buffer to fill
     * @return the number of bytes read, {@code 0} if the buffer has no space left,
     *         or {@code -1} if no bytes could be read because all channels are exhausted
     * @throws ClosedChannelException if this channel or any wrapped channel is closed
     * @throws IOException if reading from a wrapped channel fails
     */
    @Override
    public synchronized int read(final ByteBuffer dst) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        if (!dst.hasRemaining()) {
            return 0;
        }

        int totalBytesRead = 0;
        while (dst.hasRemaining() && currentChannelIdx < channels.size()) {
            final SeekableByteChannel currentChannel = channels.get(currentChannelIdx);
            final int newBytesRead = currentChannel.read(dst);
            if (newBytesRead == -1) {
                // EOF for this channel -- advance to next channel idx
                currentChannelIdx += 1;
                continue;
            }
            if (currentChannel.position() >= currentChannel.size()) {
                // we are at the end of the current channel
                currentChannelIdx++;
            }
            totalBytesRead += newBytesRead;
        }
        if (totalBytesRead > 0) {
            globalPosition += totalBytesRead;
            return totalBytesRead;
        }
        return -1;
    }

    /**
     * Closes every wrapped channel, even if closing some of them fails.
     *
     * @throws IOException if at least one wrapped channel failed to close; the cause is the
     *         first failure and any later failures are attached to that cause as suppressed
     *         exceptions
     */
    @Override
    public void close() throws IOException {
        // Mark as closed up front so isOpen() is false afterwards even for an empty
        // channel list or when a wrapped channel refuses to close.
        closed = true;
        IOException first = null;
        for (final SeekableByteChannel ch : channels) {
            try {
                ch.close();
            } catch (final IOException ex) {
                if (first == null) {
                    first = ex;
                } else if (ex != first) {
                    // keep later failures visible; self-suppression is not permitted
                    first.addSuppressed(ex);
                }
            }
        }
        if (first != null) {
            throw new IOException("failed to close wrapped channel", first);
        }
    }

    /**
     * Tells whether this channel is usable.
     *
     * @return {@code false} once {@link #close()} has been called or as soon as any
     *         wrapped channel reports that it is closed, {@code true} otherwise
     */
    @Override
    public boolean isOpen() {
        if (closed) {
            return false;
        }
        for (final SeekableByteChannel ch : channels) {
            if (!ch.isOpen()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns this channel's position.
     *
     * <p>This method violates the contract of {@link SeekableByteChannel#position()} as it will not throw any exception
     * when invoked on a closed channel. Instead it will return the position the channel had when close has been
     * called.</p>
     *
     * @return the position inside the concatenated channel
     */
    @Override
    public long position() {
        return globalPosition;
    }

    /**
     * Sets the position based on the given channel number and relative offset.
     *
     * <p>The global position is computed as the sum of the sizes of all channels
     * before {@code channelNumber} plus {@code relativeOffset}.</p>
     *
     * @param channelNumber the channel number
     * @param relativeOffset the relative offset in the corresponding channel
     * @return this channel, positioned at the computed global position
     * @throws ClosedChannelException if this channel or any wrapped channel is closed
     * @throws IOException if positioning fails
     */
    public synchronized SeekableByteChannel position(final long channelNumber, final long relativeOffset) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        long target = relativeOffset;
        for (int i = 0; i < channelNumber; i++) {
            target += channels.get(i).size();
        }

        return position(target);
    }

    /**
     * Returns the combined size of all wrapped channels.
     *
     * @return the sum of the sizes of the wrapped channels
     * @throws ClosedChannelException if this channel or any wrapped channel is closed
     * @throws IOException if a wrapped channel fails to report its size
     */
    @Override
    public long size() throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        long acc = 0;
        for (final SeekableByteChannel ch : channels) {
            acc += ch.size();
        }
        return acc;
    }

    /**
     * @throws NonWritableChannelException since this implementation is read-only.
     */
    @Override
    public SeekableByteChannel truncate(final long size) {
        throw new NonWritableChannelException();
    }

    /**
     * @throws NonWritableChannelException since this implementation is read-only.
     */
    @Override
    public int write(final ByteBuffer src) {
        throw new NonWritableChannelException();
    }

    /**
     * Sets the position inside the concatenated channel.
     *
     * <p>Every wrapped channel is repositioned: channels before the target are moved to
     * their end, the target channel to the remaining offset and all later channels to
     * {@code 0}.</p>
     *
     * <p>Note that a negative position is reported as an {@link IOException}; this is
     * kept for backwards compatibility although
     * {@link SeekableByteChannel#position(long)} specifies
     * {@link IllegalArgumentException} for that case.</p>
     *
     * @param newPosition the new global position, must not be negative
     * @return this channel
     * @throws ClosedChannelException if this channel or any wrapped channel is closed
     * @throws IOException if {@code newPosition} is negative or positioning a wrapped channel fails
     */
    @Override
    public synchronized SeekableByteChannel position(final long newPosition) throws IOException {
        if (newPosition < 0) {
            throw new IOException("Negative position: " + newPosition);
        }
        if (!isOpen()) {
            throw new ClosedChannelException();
        }

        globalPosition = newPosition;

        long pos = newPosition;

        for (int i = 0; i < channels.size(); i++) {
            final SeekableByteChannel currentChannel = channels.get(i);
            final long size = currentChannel.size();

            final long newChannelPos;
            if (pos == -1L) {
                // Position is already set for the correct channel,
                // the rest of the channels get reset to 0
                newChannelPos = 0;
            } else if (pos <= size) {
                // This channel is where we want to be
                currentChannelIdx = i;
                final long tmp = pos;
                pos = -1L; // Mark pos as already being set
                newChannelPos = tmp;
            } else {
                // newPosition is past this channel. Set channel
                // position to the end and subtract channel size from
                // pos
                pos -= size;
                newChannelPos = size;
            }

            currentChannel.position(newChannelPos);
        }
        return this;
    }

    /**
     * Concatenates the given channels.
     *
     * <p>If exactly one channel is given, that channel itself is returned.</p>
     *
     * @param channels the channels to concatenate
     * @throws NullPointerException if channels is null
     * @return SeekableByteChannel that concatenates all provided channels
     */
    public static SeekableByteChannel forSeekableByteChannels(final SeekableByteChannel... channels) {
        if (Objects.requireNonNull(channels, "channels must not be null").length == 1) {
            return channels[0];
        }
        return new MultiReadOnlySeekableByteChannel(Arrays.asList(channels));
    }

    /**
     * Concatenates the given files.
     *
     * <p>If exactly one file is given, the channel opened for it is returned directly.
     * If opening any file fails, the channels opened so far are closed before the
     * failure is propagated.</p>
     *
     * @param files the files to concatenate
     * @throws NullPointerException if files is null
     * @throws IOException if opening a channel for one of the files fails
     * @return SeekableByteChannel that concatenates all provided files
     */
    public static SeekableByteChannel forFiles(final File... files) throws IOException {
        Objects.requireNonNull(files, "files must not be null");
        final List<SeekableByteChannel> channels = new ArrayList<>(files.length);
        try {
            for (final File f : files) {
                channels.add(Files.newByteChannel(f.toPath(), StandardOpenOption.READ));
            }
        } catch (final IOException | RuntimeException ex) {
            // do not leak the channels that were opened before the failure
            closeAfterFailure(channels, ex);
            throw ex;
        }
        if (channels.size() == 1) {
            return channels.get(0);
        }
        return new MultiReadOnlySeekableByteChannel(channels);
    }

    /**
     * Closes all given channels, attaching any close failure to {@code failure} as a suppressed exception.
     */
    private static void closeAfterFailure(final List<SeekableByteChannel> opened, final Exception failure) {
        for (final SeekableByteChannel ch : opened) {
            try {
                ch.close();
            } catch (final IOException closeEx) {
                failure.addSuppressed(closeEx);
            }
        }
    }

}