001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import org.apache.commons.io.output.QueueOutputStream;
022
023import java.io.InputStream;
024import java.io.PipedInputStream;
025import java.io.PipedOutputStream;
026import java.util.Objects;
027import java.util.concurrent.BlockingQueue;
028import java.util.concurrent.LinkedBlockingQueue;
029
030/**
031 * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue
032 * output stream.
033 *
034 * <p>
035 * Example usage:
036 * </p>
037 * <pre>
038 * QueueInputStream inputStream = new QueueInputStream();
039 * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
040 *
041 * outputStream.write("hello world".getBytes(UTF_8));
042 * inputStream.read();
043 * </pre>
044 * <p>
045 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a
046 * single thread or multiple threads. Also, unlike JDK classes, no special meaning is attached to initial or current
047 * thread. Instances can be used longer after initial threads exited.
048 * </p>
049 * <p>
050 * Closing a {@code QueueInputStream} has no effect. The methods in this class can be called after the stream has been
051 * closed without generating an {@code IOException}.
052 * </p>
053 *
054 * @see QueueOutputStream
055 * @since 2.9.0
056 */
057public class QueueInputStream extends InputStream {
058
059    private final BlockingQueue<Integer> blockingQueue;
060
061    /**
062     * Constructs a new instance with no limit to its internal buffer size.
063     */
064    public QueueInputStream() {
065        this(new LinkedBlockingQueue<>());
066    }
067
068    /**
069     * Constructs a new instance with given buffer
070     *
071     * @param blockingQueue backing queue for the stream
072     */
073    public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
074        this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue");
075    }
076
077    /**
078     * Creates a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this
079     * input stream.
080     *
081     * @return QueueOutputStream connected to this stream
082     */
083    public QueueOutputStream newQueueOutputStream() {
084        return new QueueOutputStream(blockingQueue);
085    }
086
087    /**
088     * Reads and returns a single byte.
089     *
090     * @return either the byte read or {@code -1} if the end of the stream has been reached
091     */
092    @Override
093    public int read() {
094        final Integer value = blockingQueue.poll();
095        return value == null ? EOF : ((0xFF) & value);
096    }
097
098}