001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.util.ArrayList;
022import java.util.List;
023
024import org.apache.commons.io.IOUtils;
025
026
027/**
028 * The {@link ObservableInputStream} allows, that an InputStream may be consumed
029 * by other receivers, apart from the thread, which is reading it.
030 * The other consumers are implemented as instances of {@link Observer}. A
031 * typical application may be the generation of a {@link java.security.MessageDigest} on the
032 * fly.
033 * {@code Note}: The {@link ObservableInputStream} is <em>not</em> thread safe,
034 * as instances of InputStream usually aren't.
035 * If you must access the stream from multiple threads, then synchronization, locking,
036 * or a similar means must be used.
037 * @see MessageDigestCalculatingInputStream
038 */
039public class ObservableInputStream extends ProxyInputStream {
040
041    /**
042     * Abstracts observer callback for {@code ObservableInputStream}s.
043     */
044    public static abstract class Observer {
045
046        /** 
047         * Called to indicate, that {@link InputStream#read()} has been invoked
048         * on the {@link ObservableInputStream}, and will return a value.
049         * @param pByte The value, which is being returned. This will never be -1 (EOF),
050         *    because, in that case, {@link #finished()} will be invoked instead.
051         * @throws IOException if an i/o-error occurs
052         */
053        public void data(final int pByte) throws IOException {
054            // noop
055        }
056
057        /** 
058         * Called to indicate that {@link InputStream#read(byte[])}, or
059         * {@link InputStream#read(byte[], int, int)} have been called, and are about to
060         * invoke data.
061         * @param pBuffer The byte array, which has been passed to the read call, and where
062         *   data has been stored.
063         * @param pOffset The offset within the byte array, where data has been stored.
064         * @param pLength The number of bytes, which have been stored in the byte array.
065         * @throws IOException if an i/o-error occurs
066         */
067        public void data(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException {
068            // noop
069        }
070
071        /** 
072         * Called to indicate that EOF has been seen on the underlying stream.
073         * This method may be called multiple times, if the reader keeps invoking
074         * either of the read methods, and they will consequently keep returning
075         * EOF.
076         * @throws IOException if an i/o-error occurs
077         */
078        public void finished() throws IOException {
079            // noop
080        }
081
082        /** 
083         * Called to indicate that the {@link ObservableInputStream} has been closed.
084         * @throws IOException if an i/o-error occurs
085         */
086        public void closed() throws IOException {
087            // noop
088        }
089
090        /**
091         * Called to indicate that an error occurred on the underlying stream.
092         * @param pException the exception to throw
093         * @throws IOException if an i/o-error occurs
094         */
095        public void error(final IOException pException) throws IOException { throw pException; }
096    }
097
098    private final List<Observer> observers = new ArrayList<>();
099
100    /**
101     * Creates a new ObservableInputStream for the given InputStream.
102     * @param pProxy the input stream to proxy
103     */
104    public ObservableInputStream(final InputStream pProxy) {
105        super(pProxy);
106    }
107
108    /**
109     * Adds an Observer.
110     * @param pObserver the observer to add
111     */
112    public void add(final Observer pObserver) {
113        observers.add(pObserver);
114    }
115
116    /**
117     * Removes an Observer.
118     * @param pObserver the observer to remove
119     */
120    public void remove(final Observer pObserver) {
121        observers.remove(pObserver);
122    }
123
124    /**
125     * Removes all Observers.
126     */
127    public void removeAllObservers() {
128        observers.clear();
129    }
130
131    @Override
132    public int read() throws IOException {
133        int result = 0;
134        IOException ioe = null;
135        try {
136            result = super.read();
137        } catch (final IOException pException) {
138            ioe = pException;
139        }
140        if (ioe != null) {
141            noteError(ioe);
142        } else if (result == -1) {
143            noteFinished();
144        } else {
145            noteDataByte(result);
146        }
147        return result;
148    }
149
150    @Override
151    public int read(final byte[] pBuffer) throws IOException {
152        int result = 0;
153        IOException ioe = null;
154        try {
155            result = super.read(pBuffer);
156        } catch (final IOException pException) {
157            ioe = pException;
158        }
159        if (ioe != null) {
160            noteError(ioe);
161        } else if (result == -1) {
162            noteFinished();
163        } else if (result > 0) {
164            noteDataBytes(pBuffer, 0, result);
165        }
166        return result;
167    }
168
169    @Override
170    public int read(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException {
171        int result = 0;
172        IOException ioe = null;
173        try {
174            result = super.read(pBuffer, pOffset, pLength);
175        } catch (final IOException pException) {
176            ioe = pException;
177        }
178        if (ioe != null) {
179            noteError(ioe);
180        } else if (result == -1) {
181            noteFinished();
182        } else if (result > 0) {
183            noteDataBytes(pBuffer, pOffset, result);
184        }
185        return result;
186    }
187
188    /** Notifies the observers by invoking {@link Observer#data(byte[],int,int)}
189     * with the given arguments.
190     * @param pBuffer Passed to the observers.
191     * @param pOffset Passed to the observers.
192     * @param pLength Passed to the observers.
193     * @throws IOException Some observer has thrown an exception, which is being
194     *   passed down.
195     */
196    protected void noteDataBytes(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException {
197        for (final Observer observer : getObservers()) {
198            observer.data(pBuffer, pOffset, pLength);
199        }
200    }
201
202    /** Notifies the observers by invoking {@link Observer#finished()}.
203     * @throws IOException Some observer has thrown an exception, which is being
204     *   passed down.
205     */
206    protected void noteFinished() throws IOException {
207        for (final Observer observer : getObservers()) {
208            observer.finished();
209        }
210    }
211
212    /** Notifies the observers by invoking {@link Observer#data(int)}
213     * with the given arguments.
214     * @param pDataByte Passed to the observers.
215     * @throws IOException Some observer has thrown an exception, which is being
216     *   passed down.
217     */
218    protected void noteDataByte(final int pDataByte) throws IOException {
219        for (final Observer observer : getObservers()) {
220            observer.data(pDataByte);
221        }
222    }
223
224    /** Notifies the observers by invoking {@link Observer#error(IOException)}
225     * with the given argument.
226     * @param pException Passed to the observers.
227     * @throws IOException Some observer has thrown an exception, which is being
228     *   passed down. This may be the same exception, which has been passed as an
229     *   argument.
230     */
231    protected void noteError(final IOException pException) throws IOException {
232        for (final Observer observer : getObservers()) {
233            observer.error(pException);
234        }
235    }
236
237    /** Notifies the observers by invoking {@link Observer#finished()}.
238     * @throws IOException Some observer has thrown an exception, which is being
239     *   passed down.
240     */
241    protected void noteClosed() throws IOException {
242        for (final Observer observer : getObservers()) {
243            observer.closed();
244        }
245    }
246
247    /** Gets all currently registered observers.
248     * @return a list of the currently registered observers
249     */
250    protected List<Observer> getObservers() {
251        return observers;
252    }
253
254    @Override
255    public void close() throws IOException {
256        IOException ioe = null;
257        try {
258            super.close();
259        } catch (final IOException e) {
260            ioe = e;
261        }
262        if (ioe == null) {
263            noteClosed();
264        } else {
265            noteError(ioe);
266        }
267    }
268
269    /** Reads all data from the underlying {@link InputStream}, while notifying the
270     * observers.
271     * @throws IOException The underlying {@link InputStream}, or either of the
272     *   observers has thrown an exception.
273     */
274    public void consume() throws IOException {
275        final byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
276        for (;;) {
277            final int res = read(buffer);
278            if (res == -1) {
279                return;
280            }
281        }
282    }
283
284}