001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.output;
018
019import org.apache.commons.io.input.QueueInputStream;
020
021import java.io.InterruptedIOException;
022import java.io.OutputStream;
023import java.io.PipedInputStream;
024import java.io.PipedOutputStream;
025import java.util.Objects;
026import java.util.concurrent.BlockingQueue;
027import java.util.concurrent.LinkedBlockingQueue;
028
029/**
030 * Simple alternative to JDK {@link java.io.PipedOutputStream}; queue input stream provides what's written in queue
031 * output stream.
032 * <p>
033 * Example usage:
034 * </p>
035 *
036 * <pre>
037 * QueueOutputStream outputStream = new QueueOutputStream();
038 * QueueInputStream inputStream = outputStream.newPipeInputStream();
039 *
040 * outputStream.write("hello world".getBytes(UTF_8));
041 * inputStream.read();
042 * </pre>
043 *
044 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a
045 * single thread or multiple threads. Also, unlike JDK classes, no special meaning is attached to initial or current
046 * thread. Instances can be used longer after initial threads exited.
047 * <p>
048 * Closing a {@code QueueOutputStream} has no effect. The methods in this class can be called after the stream has been
049 * closed without generating an {@code IOException}.
050 * </p>
051 *
052 * @see QueueInputStream
053 * @since 2.9.0
054 */
055public class QueueOutputStream extends OutputStream {
056
057    private final BlockingQueue<Integer> blockingQueue;
058
059    /**
060     * Constructs a new instance with no limit to internal buffer size.
061     */
062    public QueueOutputStream() {
063        this(new LinkedBlockingQueue<>());
064    }
065
066    /**
067     * Constructs a new instance with given buffer.
068     *
069     * @param blockingQueue backing queue for the stream
070     */
071    public QueueOutputStream(final BlockingQueue<Integer> blockingQueue) {
072        this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue");
073    }
074
075    /**
076     * Creates a new QueueInputStream instance connected to this. Writes to this output stream will be visible to the
077     * input stream.
078     *
079     * @return QueueInputStream connected to this stream
080     */
081    public QueueInputStream newQueueInputStream() {
082        return new QueueInputStream(blockingQueue);
083    }
084
085    /**
086     * Writes a single byte.
087     *
088     * @throws InterruptedIOException if the thread is interrupted while writing to the queue.
089     */
090    @Override
091    public void write(final int b) throws InterruptedIOException {
092        try {
093            blockingQueue.put(0xFF & b);
094        } catch (final InterruptedException e) {
095            Thread.currentThread().interrupt();
096            final InterruptedIOException interruptedIoException = new InterruptedIOException();
097            interruptedIoException.initCause(e);
098            throw interruptedIoException;
099        }
100    }
101}