RepeatableInputStream vs. InputStream.reset()

Hi,

I've been enjoying using Jets3t, thanks!

I am curious about the architecture surrounding IRepeatableInputStream, and specifically why Jets3t uses a new interface rather than Java's InputStream.reset() and .markSupported() methods.

I ran across this issue when trying to upload S3 objects that were backed by ByteArrayInputStream objects: all the data was held in memory as an array, and the input stream supports reset() to allow rewinding to the beginning. I was surprised when I started getting errors about my InputStream not being repeatable.

As far as I can tell, it should be possible to replace "instanceof IRepeatableInputStream" checks with calls to InputStream.markSupported(), and to replace calls to IRepeatableInputStream.repeatInputStream() with calls to InputStream.reset(). Java's FileInputStream does not support reset(), so a RepeatableFileInputStream would still be required, but it would simply extend InputStream instead of implementing IRepeatableInputStream.

This architecture change would allow natively repeatable streams like ByteArrayInputStream to be used as-is. I believe it would also maintain all the existing functionality in Jets3t, unless there is something obvious that I'm missing?

Would you be interested in receiving such a patch?

Thanks,
Keith

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@...
For additional commands, e-mail: dev-help@...
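A minimal sketch of the premise in Keith's message, using only JDK classes (the class name MarkResetDemo, the payload and the temp file are illustrative, not part of Jets3t): a ByteArrayInputStream reports markSupported() as true and can be replayed with reset(), while a FileInputStream inherits InputStream.markSupported(), which returns false.

```java
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

public class MarkResetDemo {

    // Reads every remaining byte of a small stream into a String.
    static String drain(InputStream in) throws IOException {
        StringBuilder sb = new StringBuilder();
        int b;
        while ((b = in.read()) != -1) {
            sb.append((char) b);
        }
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "hello s3".getBytes(StandardCharsets.US_ASCII);

        // In-memory stream: supports mark/reset natively.
        InputStream memory = new ByteArrayInputStream(data);
        System.out.println("ByteArrayInputStream: " + memory.markSupported()); // true
        String first = drain(memory);
        memory.reset(); // back to offset 0: ByteArrayInputStream's default mark is the start
        String second = drain(memory);
        System.out.println("replayed identically: " + first.equals(second)); // true

        // File stream: inherits InputStream.markSupported(), which returns false.
        File tmp = File.createTempFile("mark-reset-demo", ".bin");
        tmp.deleteOnExit();
        Files.write(tmp.toPath(), data);
        try (InputStream file = new FileInputStream(tmp)) {
            System.out.println("FileInputStream: " + file.markSupported()); // false
        }
    }
}
```

This is why a file-backed wrapper is still needed under the proposal, while in-memory streams could be passed through untouched.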
Re: RepeatableInputStream vs. InputStream.reset()

Hi Keith,

You make a fair point about using the InputStream#markSupported() check instead of the IRepeatableInputStream interface to determine whether an input stream can be reset. I can't remember now whether I had a good reason for doing it with the IRepeatableInputStream originally, or whether it just made sense at the time.

If you have a patch available already, please do submit it or send it in a follow-up email. However, if you don't have a patch ready, don't bother preparing one, as I should have time to make the change myself today.

Cheers,
James

---
http://www.jamesmurty.com

On Sat, Jun 28, 2008 at 5:31 AM, Keith Bonawitz <bonawitz@...> wrote:
> Hi,
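To illustrate what relying on markSupported() instead of a marker interface looks like for a caller, here is a minimal sketch of a retry loop. It is not Jets3t's API: RetryWithReset, sendWithRetry and Attempt are hypothetical names. One caveat from the java.io.InputStream contract is worth keeping in mind: whether reset() works without a prior mark() is class-specific (BufferedInputStream, for example, throws IOException if no mark has been set), so the sketch calls mark() before the first attempt.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class RetryWithReset {

    /** Hypothetical transmission step: reads the stream and may fail part-way. */
    interface Attempt {
        void send(InputStream data) throws IOException;
    }

    /**
     * Runs attempt.send(...) up to maxAttempts times, rewinding with reset() between tries.
     * A stream without mark/reset support is wrapped in a BufferedInputStream, which can
     * only guarantee a rewind over the first readLimit bytes.
     */
    static void sendWithRetry(InputStream in, int readLimit, int maxAttempts, Attempt attempt)
            throws IOException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        InputStream stream = in.markSupported() ? in : new BufferedInputStream(in);
        // Set the rewind point explicitly: whether reset() works without a prior
        // mark() is class-specific (BufferedInputStream throws IOException).
        stream.mark(readLimit);
        IOException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            if (i > 0) {
                stream.reset(); // may throw if the mark was invalidated; then we cannot retry
            }
            try {
                attempt.send(stream);
                return;
            } catch (IOException e) {
                last = e;
            }
        }
        throw last;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "object body".getBytes(StandardCharsets.US_ASCII);
        int[] calls = {0};
        sendWithRetry(new ByteArrayInputStream(payload), payload.length, 3, data -> {
            calls[0]++;
            int sent = 0;
            while (data.read() != -1) {
                sent++; // stand-in for writing each byte to the network
            }
            if (calls[0] == 1) {
                throw new IOException("simulated network failure");
            }
            System.out.println("attempt " + calls[0] + " sent " + sent + " bytes");
        });
        // prints: attempt 2 sent 11 bytes
    }
}
```

Wrapping non-markable streams in a BufferedInputStream here plays roughly the role that Jets3t's own buffering RepeatableInputStream plays in the thread: it bounds how much data can be replayed.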
Re: RepeatableInputStream vs. InputStream.reset()

See attached patch. I haven't had a chance to test this yet, but it should have all the necessary functional changes in it.

Keith

On Fri, Jun 27, 2008 at 10:12 PM, James Murty <jmurty@...> wrote:
> Hi Keith,
I believe it would also >> maintain all the existing functionality in Jets3t, unless there is >> something obvious that I'm missing? >> >> Would you be interested in receiving such a patch? >> >> Thanks, >> Keith >> >> --------------------------------------------------------------------- >> To unsubscribe, e-mail: dev-unsubscribe@... >> For additional commands, e-mail: dev-help@... >> > > Index: src/org/jets3t/service/io/RepeatableFileInputStream.java =================================================================== RCS file: /cvs/jets3t/src/org/jets3t/service/io/RepeatableFileInputStream.java,v retrieving revision 1.2 diff -u -r1.2 RepeatableFileInputStream.java --- src/org/jets3t/service/io/RepeatableFileInputStream.java 16 May 2007 13:48:15 -0000 1.2 +++ src/org/jets3t/service/io/RepeatableFileInputStream.java 28 Jun 2008 03:25:17 -0000 @@ -33,12 +33,13 @@ * * @author James Murty */ -public class RepeatableFileInputStream extends InputStream implements IRepeatableInputStream, InputStreamWrapper { +public class RepeatableFileInputStream extends InputStream implements InputStreamWrapper { private final Log log = LogFactory.getLog(RepeatableFileInputStream.class); private File file = null; private FileInputStream fis = null; - private long bytesReadTotal = 0; + private long bytesReadPastMarkPoint = 0; + private long markPoint = 0; /** * Creates a repeatable input stream based on a file. @@ -61,17 +62,28 @@ * @throws UnrecoverableIOException * when the FileInputStream cannot be re-created. 
*/ - public void repeatInputStream() throws IOException { + public void reset() throws IOException { try { this.fis.close(); this.fis = new FileInputStream(file); - log.debug("Reset after returning " + bytesReadTotal + " bytes"); - bytesReadTotal = 0; + fis.skip(markPoint); + log.debug("Reset after returning " + bytesReadPastMarkPoint + " bytes"); + bytesReadPastMarkPoint = 0; } catch (IOException e) { throw new UnrecoverableIOException("Input stream is not repeatable: " + e.getMessage()); } } + public boolean markSupported() { + return true; + } + + public synchronized void mark(int readlimit) { + super.mark(readlimit); + this.markPoint += bytesReadPastMarkPoint; + this.bytesReadPastMarkPoint = 0; + } + public int available() throws IOException { return fis.available(); } @@ -83,7 +95,7 @@ public int read() throws IOException { int byteRead = fis.read(); if (byteRead != -1) { - bytesReadTotal++; + bytesReadPastMarkPoint++; return byteRead; } else { return -1; @@ -92,7 +104,7 @@ public int read(byte[] arg0, int arg1, int arg2) throws IOException { int count = fis.read(arg0, arg1, arg2); - bytesReadTotal += count; + bytesReadPastMarkPoint += count; return count; } Index: src/org/jets3t/service/io/IRepeatableInputStream.java =================================================================== RCS file: src/org/jets3t/service/io/IRepeatableInputStream.java diff -N src/org/jets3t/service/io/IRepeatableInputStream.java --- src/org/jets3t/service/io/IRepeatableInputStream.java 21 May 2007 04:12:22 -0000 1.3 +++ /dev/null 1 Jan 1970 00:00:00 -0000 @@ -1,55 +0,0 @@ -/* - * jets3t : Java Extra-Tasty S3 Toolkit (for Amazon S3 online storage service) - * This is a java.net project, see https://jets3t.dev.java.net/ - * - * Copyright 2006 James Murty - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jets3t.service.io; - -import java.io.IOException; - -/** - * An interface that marks input stream that can be repeated, at least under some circumstances - - * an input stream is repeatable if it can be reset to its starting point and re-read. - * <p> - * Such input streams are useful when transmitting data, as a transmission failure can be recovered - * from by re-transmitting data from the input stream. - * - * @author James Murty - */ -public interface IRepeatableInputStream { - - /** - * Resets the input stream to the beginning. After this method call, the input stream will - * provide data as if it was just created. - * - * @throws IOException - * when the input stream cannot be repeated, such as if the amount of buffered repeatable data - * is insufficient. Ideally this method should throw an {@link UnrecoverableIOException} to indicate - * that no further IO operations as possible. 
- */ - public void repeatInputStream() throws IOException; - - public int available() throws IOException; - - public void close() throws IOException; - - public int read() throws IOException; - - public int read(byte[] arg0, int arg1, int arg2) throws IOException; - - public int read(byte[] arg0) throws IOException; - -} Index: src/org/jets3t/service/io/RepeatableInputStream.java =================================================================== RCS file: /cvs/jets3t/src/org/jets3t/service/io/RepeatableInputStream.java,v retrieving revision 1.6 diff -u -r1.6 RepeatableInputStream.java --- src/org/jets3t/service/io/RepeatableInputStream.java 21 Mar 2008 23:45:12 -0000 1.6 +++ src/org/jets3t/service/io/RepeatableInputStream.java 28 Jun 2008 03:25:17 -0000 @@ -44,13 +44,13 @@ * * @author James Murty */ -public class RepeatableInputStream extends InputStream implements IRepeatableInputStream, InputStreamWrapper { +public class RepeatableInputStream extends InputStream implements InputStreamWrapper { private final Log log = LogFactory.getLog(RepeatableInputStream.class); private InputStream is = null; - private int bufferOffset = 0; private int bufferSize = 0; - private long bytesReadTotal = 0; + private int bytesBuffered = 0; + private long bytesReadPastMark = 0; private byte[] buffer = null; /** @@ -99,16 +99,32 @@ * when the available buffer size has been exceeded, in which case the input stream data cannot * be repeated. 
*/ - public void repeatInputStream() throws IOException { - if (bytesReadTotal <= bufferSize) { - log.debug("Reset after reading " + bytesReadTotal + " bytes."); - bufferOffset = 0; + public void reset() throws IOException { + if (bytesReadPastMark <= bufferSize) { + log.debug("Reset after reading " + bytesReadPastMark + " bytes."); + bytesBuffered = 0; } else { throw new UnrecoverableIOException( - "Input stream is not repeatable as " + this.bytesReadTotal + "Input stream is not repeatable as " + this.bytesReadPastMark + " bytes have been written, exceeding the available buffer size of " + this.bufferSize); } } + + public boolean markSupported() { + return true; + } + + public synchronized void mark(int readlimit) { + super.mark(readlimit); + if (bytesReadPastMark < bytesBuffered && buffer != null) { + // it is safe to cast bytesReadPastMark to an int because it is known to be less than bufferOffset, which is an int + System.arraycopy(buffer, (int) bytesReadPastMark, buffer, 0, (int)(bytesBuffered - bytesReadPastMark)); + bytesBuffered -= bytesReadPastMark; + } else { + bytesBuffered = 0; + } + bytesReadPastMark = 0; + } public int available() throws IOException { return is.available(); @@ -122,18 +138,18 @@ byte[] tmp = new byte[outLength]; // Check whether we already have buffered data. - if (bufferOffset < bytesReadTotal && bufferOffset < bufferSize) { + if (bytesBuffered < bytesReadPastMark && bytesBuffered < bufferSize) { // Data is being repeated, read from buffer instead of wrapped input stream. // Write the buffered data in chunks so the progress monitor is only updated a // little as a time as the output stream actually pushes through data. int bytesFromBuffer = tmp.length; - if (bufferOffset + bytesFromBuffer > bytesReadTotal) { - bytesFromBuffer = (int) bytesReadTotal - bufferOffset; + if (bytesBuffered + bytesFromBuffer > bytesReadPastMark) { + bytesFromBuffer = (int) bytesReadPastMark - bytesBuffered; } // Write to output. 
- System.arraycopy(buffer, bufferOffset, out, outOffset, bytesFromBuffer); - bufferOffset += bytesFromBuffer; + System.arraycopy(buffer, bytesBuffered, out, outOffset, bytesFromBuffer); + bytesBuffered += bytesFromBuffer; return bytesFromBuffer; } @@ -145,12 +161,12 @@ } // Fill the buffer with data until it is full. - long length = (bytesReadTotal + count < bufferSize + long length = (bytesReadPastMark + count < bufferSize ? count - : bufferSize - bytesReadTotal); + : bufferSize - bytesReadPastMark); if (length > 0) { - System.arraycopy(tmp, 0, buffer, (int) bytesReadTotal, (int) length); - bufferOffset += length; + System.arraycopy(tmp, 0, buffer, (int) bytesReadPastMark, (int) length); + bytesBuffered += length; } else if (length < 0 && buffer != null) { // We have exceeded the buffer size, after which point it is of no use. Free the memory. log.debug("Buffer size " + bufferSize + @@ -160,7 +176,7 @@ // Write to output. System.arraycopy(tmp, 0, out, outOffset, count); - bytesReadTotal += count; + bytesReadPastMark += count; return count; } Index: src/org/jets3t/service/impl/rest/httpclient/RepeatableRequestEntity.java =================================================================== RCS file: /cvs/jets3t/src/org/jets3t/service/impl/rest/httpclient/RepeatableRequestEntity.java,v retrieving revision 1.7 diff -u -r1.7 RepeatableRequestEntity.java --- src/org/jets3t/service/impl/rest/httpclient/RepeatableRequestEntity.java 19 Feb 2008 01:48:27 -0000 1.7 +++ src/org/jets3t/service/impl/rest/httpclient/RepeatableRequestEntity.java 28 Jun 2008 03:25:17 -0000 @@ -30,7 +30,6 @@ import org.apache.commons.logging.LogFactory; import org.jets3t.service.Constants; import org.jets3t.service.Jets3tProperties; -import org.jets3t.service.io.IRepeatableInputStream; import org.jets3t.service.io.InputStreamWrapper; import org.jets3t.service.io.ProgressMonitoredInputStream; import org.jets3t.service.io.RepeatableInputStream; @@ -61,7 +60,7 @@ private long contentLength = 0; private 
long bytesWritten = 0; - private IRepeatableInputStream repeatableInputStream = null; + private InputStream repeatableInputStream = null; private ProgressMonitoredInputStream progressMonitoredIS = null; protected static long MAX_BYTES_PER_SECOND = 0; @@ -101,21 +100,27 @@ this.contentType = contentType; InputStream inputStream = is; - while (inputStream instanceof InputStreamWrapper) { + while (true) { if (inputStream instanceof ProgressMonitoredInputStream) { progressMonitoredIS = (ProgressMonitoredInputStream) inputStream; - } - if (inputStream instanceof IRepeatableInputStream) { - repeatableInputStream = (IRepeatableInputStream) inputStream; } - inputStream = ((InputStreamWrapper) inputStream).getWrappedInputStream(); + if (inputStream.markSupported()) { + repeatableInputStream = inputStream; + } + + if (inputStream instanceof InputStreamWrapper) { + inputStream = ((InputStreamWrapper) inputStream).getWrappedInputStream(); + } else { + break; + } } if (this.repeatableInputStream == null) { log.debug("Wrapping non-repeatable input stream in a RepeatableInputStream"); this.is = new RepeatableInputStream(is); - this.repeatableInputStream = (IRepeatableInputStream) this.is; - } + this.repeatableInputStream = this.is; + } + MAX_BYTES_PER_SECOND = 1024 * Jets3tProperties.getInstance(Constants.JETS3T_PROPERTIES_FILENAME) .getLongProperty("httpclient.read-throttle", 0); @@ -158,7 +163,7 @@ public void writeRequest(OutputStream out) throws IOException { if (bytesWritten > 0) { // This entity is being repeated. - repeatableInputStream.repeatInputStream(); + repeatableInputStream.reset(); log.warn("Repeating transmission of " + bytesWritten + " bytes"); // Notify progress monitored input stream that we've gone backwards (if one is attached) --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscribe@... For additional commands, e-mail: dev-help@... |
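The RepeatableFileInputStream change in the patch above works by remembering an absolute byte offset at mark() and, on reset(), reopening the file and skipping to that offset. The following is a stripped-down sketch of the same idea using only JDK classes. ReopeningFileInputStream, position and markPoint are hypothetical names, and this is not the Jets3t class itself.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical sketch: a file stream whose reset() reopens the file and seeks to the mark. */
public class ReopeningFileInputStream extends InputStream {
    private final File file;
    private FileInputStream fis;
    private long position = 0;  // absolute offset of the next byte to be read
    private long markPoint = 0; // absolute offset recorded by mark(); 0 = start of file

    public ReopeningFileInputStream(File file) throws IOException {
        this.file = file;
        this.fis = new FileInputStream(file);
    }

    @Override
    public boolean markSupported() {
        return true;
    }

    @Override
    public synchronized void mark(int readlimit) {
        markPoint = position; // readlimit is irrelevant: the file itself acts as the buffer
    }

    @Override
    public synchronized void reset() throws IOException {
        fis.close();
        fis = new FileInputStream(file);
        long remaining = markPoint;
        while (remaining > 0) { // skip() may skip fewer bytes than requested, so loop
            long skipped = fis.skip(remaining);
            if (skipped <= 0) {
                throw new IOException("Could not skip to mark at offset " + markPoint);
            }
            remaining -= skipped;
        }
        position = markPoint;
    }

    @Override
    public int read() throws IOException {
        int b = fis.read();
        if (b != -1) {
            position++;
        }
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        int count = fis.read(buf, off, len);
        if (count > 0) {
            position += count; // only count bytes actually read; -1 means end of stream
        }
        return count;
    }

    @Override
    public int available() throws IOException {
        return fis.available();
    }

    @Override
    public void close() throws IOException {
        fis.close();
    }
}
```

Two small differences from the patch as posted are deliberate. The sketch loops on skip(), because InputStream.skip() may skip fewer bytes than requested, whereas the patch calls fis.skip(markPoint) once and ignores the return value. The patch's read(byte[], int, int) also adds count unconditionally, so a -1 returned at end of stream would decrement bytesReadPastMarkPoint. The sketch only counts positive reads.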
Re: RepeatableInputStream vs. InputStream.reset()

Hi Keith,

Thanks for the patch; I have incorporated it into the recent changes I made to the codebase. I had to make some modifications to your patch of RepeatableInputStream, and I have made some other small changes to the library, but apart from that the latest code should be fairly close to your local copy.

I have done some testing of the modified repeatable input stream classes and they seem to work OK, but if you could perform further testing that would be great.

Cheers,
James

---
http://www.jamesmurty.com

On Sat, Jun 28, 2008 at 1:29 PM, Keith Bonawitz <bonawitz@...> wrote:
> See attached patch. I haven't had a chance to test this yet, but it
Re: RepeatableInputStream vs. InputStream.reset()

Hi James,

Great! I've been testing from CVS, including these updates. Hundreds of files and dozens of gigabytes transferred, and everything seems rock solid.

Thanks!
Keith

On Sat, Jun 28, 2008 at 7:37 AM, James Murty <jmurty@...> wrote:
> Hi Keith,