001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.commons.compress.compressors.snappy; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.PushbackInputStream; 024import java.util.Arrays; 025 026import org.apache.commons.compress.compressors.CompressorInputStream; 027import org.apache.commons.compress.utils.BoundedInputStream; 028import org.apache.commons.compress.utils.ByteUtils; 029import org.apache.commons.compress.utils.CountingInputStream; 030import org.apache.commons.compress.utils.IOUtils; 031import org.apache.commons.compress.utils.InputStreamStatistics; 032 033/** 034 * CompressorInputStream for the framing Snappy format. 035 * 036 * <p>Based on the "spec" in the version "Last revised: 2013-10-25"</p> 037 * 038 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a> 039 * @since 1.7 040 */ 041public class FramedSnappyCompressorInputStream extends CompressorInputStream 042 implements InputStreamStatistics { 043 044 /** 045 * package private for tests only. 046 */ 047 static final long MASK_OFFSET = 0xa282ead8L; 048 049 private static final int STREAM_IDENTIFIER_TYPE = 0xff; 050 static final int COMPRESSED_CHUNK_TYPE = 0; 051 private static final int UNCOMPRESSED_CHUNK_TYPE = 1; 052 private static final int PADDING_CHUNK_TYPE = 0xfe; 053 private static final int MIN_UNSKIPPABLE_TYPE = 2; 054 private static final int MAX_UNSKIPPABLE_TYPE = 0x7f; 055 private static final int MAX_SKIPPABLE_TYPE = 0xfd; 056 057 // used by FramedSnappyCompressorOutputStream as well 058 static final byte[] SZ_SIGNATURE = new byte[] { //NOSONAR 059 (byte) STREAM_IDENTIFIER_TYPE, // tag 060 6, 0, 0, // length 061 's', 'N', 'a', 'P', 'p', 'Y' 062 }; 063 064 private long unreadBytes; 065 private final CountingInputStream countingStream; 066 067 /** The underlying stream to read compressed data from */ 068 private final PushbackInputStream inputStream; 069 070 /** The dialect to expect */ 071 private final FramedSnappyDialect dialect; 072 073 private SnappyCompressorInputStream currentCompressedChunk; 074 075 // used in no-arg read method 076 private final byte[] oneByte = new byte[1]; 077 078 private boolean endReached, inUncompressedChunk; 079 080 private int uncompressedBytesRemaining; 081 private long expectedChecksum = -1; 082 private final int blockSize; 083 private final PureJavaCrc32C checksum = new PureJavaCrc32C(); 084 085 private final ByteUtils.ByteSupplier supplier = this::readOneByte; 086 087 /** 088 * Constructs a new input stream that decompresses 089 * snappy-framed-compressed data from the specified input stream 090 * using the {@link FramedSnappyDialect#STANDARD} dialect. 091 * @param in the InputStream from which to read the compressed data 092 * @throws IOException if reading fails 093 */ 094 public FramedSnappyCompressorInputStream(final InputStream in) throws IOException { 095 this(in, FramedSnappyDialect.STANDARD); 096 } 097 098 /** 099 * Constructs a new input stream that decompresses snappy-framed-compressed data 100 * from the specified input stream. 101 * @param in the InputStream from which to read the compressed data 102 * @param dialect the dialect used by the compressed stream 103 * @throws IOException if reading fails 104 */ 105 public FramedSnappyCompressorInputStream(final InputStream in, 106 final FramedSnappyDialect dialect) 107 throws IOException { 108 this(in, SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE, dialect); 109 } 110 111 /** 112 * Constructs a new input stream that decompresses snappy-framed-compressed data 113 * from the specified input stream. 114 * @param in the InputStream from which to read the compressed data 115 * @param blockSize the block size to use for the compressed stream 116 * @param dialect the dialect used by the compressed stream 117 * @throws IOException if reading fails 118 * @throws IllegalArgumentException if blockSize is not bigger than 0 119 * @since 1.14 120 */ 121 public FramedSnappyCompressorInputStream(final InputStream in, 122 final int blockSize, 123 final FramedSnappyDialect dialect) 124 throws IOException { 125 if (blockSize <= 0) { 126 throw new IllegalArgumentException("blockSize must be bigger than 0"); 127 } 128 countingStream = new CountingInputStream(in); 129 this.inputStream = new PushbackInputStream(countingStream, 1); 130 this.blockSize = blockSize; 131 this.dialect = dialect; 132 if (dialect.hasStreamIdentifier()) { 133 readStreamIdentifier(); 134 } 135 } 136 137 /** {@inheritDoc} */ 138 @Override 139 public int read() throws IOException { 140 return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF; 141 } 142 143 /** {@inheritDoc} */ 144 @Override 145 public void close() throws IOException { 146 try { 147 if (currentCompressedChunk != null) { 148 currentCompressedChunk.close(); 149 currentCompressedChunk = null; 150 } 151 } finally { 152 inputStream.close(); 153 } 154 } 155 156 /** {@inheritDoc} */ 157 @Override 158 public int read(final byte[] b, final int off, final int len) throws IOException { 159 if (len == 0) { 160 return 0; 161 } 162 int read = readOnce(b, off, len); 163 if (read == -1) { 164 readNextBlock(); 165 if (endReached) { 166 return -1; 167 } 168 read = readOnce(b, off, len); 169 } 170 return read; 171 } 172 173 /** {@inheritDoc} */ 174 @Override 175 public int available() throws IOException { 176 if (inUncompressedChunk) { 177 return Math.min(uncompressedBytesRemaining, 178 inputStream.available()); 179 } 180 if (currentCompressedChunk != null) { 181 return currentCompressedChunk.available(); 182 } 183 return 0; 184 } 185 186 /** 187 * @since 1.17 188 */ 189 @Override 190 public long getCompressedCount() { 191 return countingStream.getBytesRead() - unreadBytes; 192 } 193 194 /** 195 * Read from the current chunk into the given array. 196 * 197 * @return -1 if there is no current chunk or the number of bytes 198 * read from the current chunk (which may be -1 if the end of the 199 * chunk is reached). 200 */ 201 private int readOnce(final byte[] b, final int off, final int len) throws IOException { 202 int read = -1; 203 if (inUncompressedChunk) { 204 final int amount = Math.min(uncompressedBytesRemaining, len); 205 if (amount == 0) { 206 return -1; 207 } 208 read = inputStream.read(b, off, amount); 209 if (read != -1) { 210 uncompressedBytesRemaining -= read; 211 count(read); 212 } 213 } else if (currentCompressedChunk != null) { 214 final long before = currentCompressedChunk.getBytesRead(); 215 read = currentCompressedChunk.read(b, off, len); 216 if (read == -1) { 217 currentCompressedChunk.close(); 218 currentCompressedChunk = null; 219 } else { 220 count(currentCompressedChunk.getBytesRead() - before); 221 } 222 } 223 if (read > 0) { 224 checksum.update(b, off, read); 225 } 226 return read; 227 } 228 229 private void readNextBlock() throws IOException { 230 verifyLastChecksumAndReset(); 231 inUncompressedChunk = false; 232 final int type = readOneByte(); 233 if (type == -1) { 234 endReached = true; 235 } else if (type == STREAM_IDENTIFIER_TYPE) { 236 inputStream.unread(type); 237 unreadBytes++; 238 pushedBackBytes(1); 239 readStreamIdentifier(); 240 readNextBlock(); 241 } else if (type == PADDING_CHUNK_TYPE 242 || (type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE)) { 243 skipBlock(); 244 readNextBlock(); 245 } else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) { 246 throw new IOException("Unskippable chunk with type " + type 247 + " (hex " + Integer.toHexString(type) + ")" 248 + " detected."); 249 } else if (type == UNCOMPRESSED_CHUNK_TYPE) { 250 inUncompressedChunk = true; 251 uncompressedBytesRemaining = readSize() - 4 /* CRC */; 252 if (uncompressedBytesRemaining < 0) { 253 throw new IOException("Found illegal chunk with negative size"); 254 } 255 expectedChecksum = unmask(readCrc()); 256 } else if (type == COMPRESSED_CHUNK_TYPE) { 257 final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks(); 258 final long size = readSize() - (expectChecksum ? 4L : 0L); 259 if (size < 0) { 260 throw new IOException("Found illegal chunk with negative size"); 261 } 262 if (expectChecksum) { 263 expectedChecksum = unmask(readCrc()); 264 } else { 265 expectedChecksum = -1; 266 } 267 currentCompressedChunk = 268 new SnappyCompressorInputStream(new BoundedInputStream(inputStream, size), blockSize); 269 // constructor reads uncompressed size 270 count(currentCompressedChunk.getBytesRead()); 271 } else { 272 // impossible as all potential byte values have been covered 273 throw new IOException("Unknown chunk type " + type 274 + " detected."); 275 } 276 } 277 278 private long readCrc() throws IOException { 279 final byte[] b = new byte[4]; 280 final int read = IOUtils.readFully(inputStream, b); 281 count(read); 282 if (read != 4) { 283 throw new IOException("Premature end of stream"); 284 } 285 return ByteUtils.fromLittleEndian(b); 286 } 287 288 static long unmask(long x) { 289 // ugly, maybe we should just have used ints and deal with the 290 // overflow 291 x -= MASK_OFFSET; 292 x &= 0xffffFFFFL; 293 return ((x >> 17) | (x << 15)) & 0xffffFFFFL; 294 } 295 296 private int readSize() throws IOException { 297 return (int) ByteUtils.fromLittleEndian(supplier, 3); 298 } 299 300 private void skipBlock() throws IOException { 301 final int size = readSize(); 302 if (size < 0) { 303 throw new IOException("Found illegal chunk with negative size"); 304 } 305 final long read = IOUtils.skip(inputStream, size); 306 count(read); 307 if (read != size) { 308 throw new IOException("Premature end of stream"); 309 } 310 } 311 312 private void readStreamIdentifier() throws IOException { 313 final byte[] b = new byte[10]; 314 final int read = IOUtils.readFully(inputStream, b); 315 count(read); 316 if (10 != read || !matches(b, 10)) { 317 throw new IOException("Not a framed Snappy stream"); 318 } 319 } 320 321 private int readOneByte() throws IOException { 322 final int b = inputStream.read(); 323 if (b != -1) { 324 count(1); 325 return b & 0xFF; 326 } 327 return -1; 328 } 329 330 private void verifyLastChecksumAndReset() throws IOException { 331 if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) { 332 throw new IOException("Checksum verification failed"); 333 } 334 expectedChecksum = -1; 335 checksum.reset(); 336 } 337 338 /** 339 * Checks if the signature matches what is expected for a .sz file. 340 * 341 * <p>.sz files start with a chunk with tag 0xff and content sNaPpY.</p> 342 * 343 * @param signature the bytes to check 344 * @param length the number of bytes to check 345 * @return true if this is a .sz stream, false otherwise 346 */ 347 public static boolean matches(final byte[] signature, final int length) { 348 349 if (length < SZ_SIGNATURE.length) { 350 return false; 351 } 352 353 byte[] shortenedSig = signature; 354 if (signature.length > SZ_SIGNATURE.length) { 355 shortenedSig = new byte[SZ_SIGNATURE.length]; 356 System.arraycopy(signature, 0, shortenedSig, 0, SZ_SIGNATURE.length); 357 } 358 359 return Arrays.equals(shortenedSig, SZ_SIGNATURE); 360 } 361 362}