Changeset f186076


Ignore:
Timestamp:
Sep 19, 2011 2:13:24 PM (9 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
6630c29
Parents:
49eeed6a
Message:
  • i2ptunnel HTTPResponseOutputStream: Use reusable gunzipper and a larger pipe for efficiency
Files:
1 added
6 edited

Legend:

Unmodified
Added
Removed
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java

    r49eeed6a rf186076  
    2121import net.i2p.I2PAppContext;
    2222import net.i2p.data.ByteArray;
     23import net.i2p.util.BigPipedInputStream;
    2324import net.i2p.util.ByteCache;
    2425import net.i2p.util.I2PAppThread;
    2526import net.i2p.util.Log;
     27import net.i2p.util.ReusableGZIPInputStream;
    2628
    2729/**
     
    4547    protected boolean _gzip;
    4648    private long _dataWritten;
    47     private InternalGZIPInputStream _in;
    4849    private static final int CACHE_SIZE = 8*1024;
    4950    private static final ByteCache _cache = ByteCache.getInstance(8, CACHE_SIZE);
     
    228229    protected void beginProcessing() throws IOException {
    229230        //out.flush();
    230         PipedInputStream pi = new PipedInputStream();
     231        PipedInputStream pi = BigPipedInputStream.getInstance();
    231232        PipedOutputStream po = new PipedOutputStream(pi);
    232233        // Run in the client thread pool, as there should be an unused thread
     
    243244   
    244245    private class Pusher implements Runnable {
    245         private InputStream _inRaw;
    246         private OutputStream _out;
     246        private final InputStream _inRaw;
     247        private final OutputStream _out;
     248
    247249        public Pusher(InputStream in, OutputStream out) {
    248250            _inRaw = in;
    249251            _out = out;
    250252        }
     253
    251254        public void run() {
    252             _in = null;
     255            ReusableGZIPInputStream _in = null;
    253256            long written = 0;
    254257            ByteArray ba = null;
    255258            try {
    256                 _in = new InternalGZIPInputStream(_inRaw);
     259                _in = ReusableGZIPInputStream.acquire();
     260                // blocking
     261                _in.initialize(_inRaw);
    257262                ba = _cache.acquire();
    258263                byte buf[] = ba.getData();
     
    261266                    if (_log.shouldLog(Log.DEBUG))
    262267                        _log.debug("Read " + read + " and writing it to the browser/streams");
    263                     _out.write(buf, 0, read);
     268;                   _out.write(buf, 0, read);
    264269                    _out.flush();
    265270                    written += read;
     
    287292            }
    288293
    289             double compressed = (_in != null ? _in.getTotalRead() : 0);
    290             double expanded = (_in != null ? _in.getTotalExpanded() : 0);
    291             if (compressed > 0 && expanded > 0) {
    292                 // only update the stats if we did something
    293                 double ratio = compressed/expanded;
    294                 _context.statManager().addRateData("i2ptunnel.httpCompressionRatio", (int)(100d*ratio), 0);
    295                 _context.statManager().addRateData("i2ptunnel.httpCompressed", (long)compressed, 0);
    296                 _context.statManager().addRateData("i2ptunnel.httpExpanded", (long)expanded, 0);
     294            if (_in != null) {
     295                double compressed = _in.getTotalRead();
     296                double expanded = _in.getTotalExpanded();
     297                ReusableGZIPInputStream.release(_in);
     298                if (compressed > 0 && expanded > 0) {
     299                    // only update the stats if we did something
     300                    double ratio = compressed/expanded;
     301                    _context.statManager().addRateData("i2ptunnel.httpCompressionRatio", (int)(100d*ratio), 0);
     302                    _context.statManager().addRateData("i2ptunnel.httpCompressed", (long)compressed, 0);
     303                    _context.statManager().addRateData("i2ptunnel.httpExpanded", (long)expanded, 0);
     304                }
    297305            }
    298306        }
    299307    }
    300308
    301     /** just a wrapper to provide stats for debugging */
    302     private static class InternalGZIPInputStream extends GZIPInputStream {
    303         public InternalGZIPInputStream(InputStream in) throws IOException {
    304             super(in);
    305         }
    306         public long getTotalRead() {
    307             try {
    308                 return super.inf.getTotalIn();
    309             } catch (Exception e) {
    310                 return 0;
    311             }
    312         }
    313         public long getTotalExpanded() {
    314             try {
    315                 return super.inf.getTotalOut();
    316             } catch (Exception e) {
    317                 return 0;
    318             }
    319         }
    320 
    321         /**
    322          *  From Inflater javadoc:
    323          *  Returns the total number of bytes remaining in the input buffer. This can be used to find out
    324          *  what bytes still remain in the input buffer after decompression has finished.
    325          */
    326         public long getRemaining() {
    327             try {
    328                 return super.inf.getRemaining();
    329             } catch (Exception e) {
    330                 return 0;
    331             }
    332         }
    333         public boolean getFinished() {
    334             try {
    335                 return super.inf.finished();
    336             } catch (Exception e) {
    337                 return true;
    338             }
    339         }
    340         @Override
    341         public String toString() {
    342             return "Read: " + getTotalRead() + " expanded: " + getTotalExpanded() + " remaining: " + getRemaining() + " finished: " + getFinished();
    343         }
    344     }
    345    
    346     @Override
    347     public String toString() {
    348         return super.toString() + ": " + _in;
    349     }
    350    
    351309/*******
    352310    public static void main(String args[]) {
  • core/java/src/net/i2p/util/InternalServerSocket.java

    r49eeed6a rf186076  
    100100        if (iss == null)
    101101             throw new IOException("No server for port: " + port);
    102         PipedInputStream cis = new BigPipedInputStream();
    103         PipedInputStream sis = new BigPipedInputStream();
     102        PipedInputStream cis = BigPipedInputStream.getInstance();
     103        PipedInputStream sis = BigPipedInputStream.getInstance();
    104104        PipedOutputStream cos = new PipedOutputStream(sis);
    105105        PipedOutputStream sos = new PipedOutputStream(cis);
     
    107107        clientSock.setOutputStream(cos);
    108108        iss.queueConnection(new InternalSocket(sis, sos));
    109     }
    110 
    111     /**
    112      *  Until we switch to Java 1.6
    113      *  http://javatechniques.com/blog/low-memory-deep-copy-technique-for-java-objects/
    114      */
    115     private static class BigPipedInputStream extends PipedInputStream {
    116         protected static int PIPE_SIZE = 64*1024;
    117         public BigPipedInputStream() {
    118              super();
    119              buffer = new byte[PIPE_SIZE];
    120         }
    121109    }
    122110
  • core/java/src/net/i2p/util/LookaheadInputStream.java

    r49eeed6a rf186076  
    1515public class LookaheadInputStream extends FilterInputStream {
    1616    private boolean _eofReached;
    17     private byte[] _footerLookahead;
     17    private final byte[] _footerLookahead;
    1818    private static final InputStream _fakeInputStream = new ByteArrayInputStream(new byte[0]);
    1919   
    2020    public LookaheadInputStream(int lookaheadSize) {
    2121        super(_fakeInputStream);
    22         _eofReached = false;
    2322        _footerLookahead = new byte[lookaheadSize];
    2423    }
     
    2625    public boolean getEOFReached() { return _eofReached; }
    2726       
     27    /** blocking! */
    2828    public void initialize(InputStream src) throws IOException {
    2929        in = src;
     
    3636            footerRead += read;
    3737        }
    38         boolean f = true;
    3938    }
    4039   
     
    5453        return rv;
    5554    }
     55
    5656    @Override
    5757    public int read(byte buf[]) throws IOException {
    5858        return read(buf, 0, buf.length);
    5959    }
     60
    6061    @Override
    6162    public int read(byte buf[], int off, int len) throws IOException {
  • core/java/src/net/i2p/util/ResettableGZIPInputStream.java

    r49eeed6a rf186076  
    2121    private static final boolean DEBUG = false;
    2222    /** keep a typesafe copy of (LookaheadInputStream)in */
    23     private LookaheadInputStream _lookaheadStream;
    24     private CRC32 _crc32;
    25     private byte _buf1[] = new byte[1];
     23    private final LookaheadInputStream _lookaheadStream;
     24    private final CRC32 _crc32;
     25    private final byte _buf1[] = new byte[1];
    2626    private boolean _complete;
    2727   
     
    3535        _lookaheadStream = (LookaheadInputStream)in;
    3636        _crc32 = new CRC32();
    37         _complete = false;
    38     }
     37    }
     38
     39    /**
     40     * Warning - blocking!
     41     */
    3942    public ResettableGZIPInputStream(InputStream compressedStream) throws IOException {
    4043        this();
     
    7982        return read(buf, 0, buf.length);
    8083    }
     84
    8185    @Override
    8286    public int read(byte buf[], int off, int len) throws IOException {
     
    101105    }
    102106   
    103     long getCurrentCRCVal() { return _crc32.getValue(); }
    104    
    105     void verifyFooter() throws IOException {
     107    /**
     108     *  Moved from i2ptunnel HTTPResponseOutputStream.InternalGZIPInputStream
     109     *  @since 0.8.9
     110     */
     111    public long getTotalRead() {
     112        try {
     113            return inf.getBytesRead();
     114        } catch (Exception e) {
     115            return 0;
     116        }
     117    }
     118
     119    /**
     120     *  Moved from i2ptunnel HTTPResponseOutputStream.InternalGZIPInputStream
     121     *  @since 0.8.9
     122     */
     123    public long getTotalExpanded() {
     124        try {
     125            return inf.getBytesWritten();
     126        } catch (Exception e) {
     127            // possible NPE in some implementations
     128            return 0;
     129        }
     130    }
     131
     132    /**
     133     *  Moved from i2ptunnel HTTPResponseOutputStream.InternalGZIPInputStream
     134     *  @since 0.8.9
     135     */
     136    public long getRemaining() {
     137        try {
     138            return inf.getRemaining();
     139        } catch (Exception e) {
     140            // possible NPE in some implementations
     141            return 0;
     142        }
     143    }
     144
     145    /**
     146     *  Moved from i2ptunnel HTTPResponseOutputStream.InternalGZIPInputStream
     147     *  @since 0.8.9
     148     */
     149    public boolean getFinished() {
     150        try {
     151            return inf.finished();
     152        } catch (Exception e) {
     153            // possible NPE in some implementations
     154            return true;
     155        }
     156    }
     157
     158    /**
     159     *  Moved from i2ptunnel HTTPResponseOutputStream.InternalGZIPInputStream
     160     *  @since 0.8.9
     161     */
     162    @Override
     163    public String toString() {
     164        return "Read: " + getTotalRead() + " expanded: " + getTotalExpanded() + " remaining: " + getRemaining() + " finished: " + getFinished();
     165    }
     166
     167    private long getCurrentCRCVal() { return _crc32.getValue(); }
     168   
     169    private void verifyFooter() throws IOException {
    106170        byte footer[] = _lookaheadStream.getFooter();
    107171       
  • core/java/src/net/i2p/util/ResettableGZIPOutputStream.java

    r49eeed6a rf186076  
    2727    /** how much data is in the uncompressed stream? */
    2828    private long _writtenSize;
    29     private CRC32 _crc32;
     29    private final CRC32 _crc32;
    3030    private static final boolean DEBUG = false;
    3131   
    3232    public ResettableGZIPOutputStream(OutputStream o) {
    3333        super(o, new Deflater(9, true));
    34         _headerWritten = false;
    3534        _crc32 = new CRC32();
    3635    }
     36
    3737    /**
    3838     * Reinitialze everything so we can write a brand new gzip output stream
  • core/java/src/net/i2p/util/ReusableGZIPOutputStream.java

    r49eeed6a rf186076  
    11package net.i2p.util;
    22
    3 import java.io.ByteArrayInputStream;
     3//import java.io.ByteArrayInputStream;
    44import java.io.ByteArrayOutputStream;
    55import java.util.zip.Deflater;
     
    1010
    1111/**
    12  * Provide a cache of reusable GZIP streams, each handling up to 32KB without
     12 * Provide a cache of reusable GZIP streams, each handling up to 40 KB output without
    1313 * expansion.
    1414 *
     15 * This compresses to memory only. Retrieve the compressed data with getData().
     16 * There is no facility to compress to an output stream.
    1517 */
    1618public class ReusableGZIPOutputStream extends ResettableGZIPOutputStream {
     
    5153   
    5254    private ByteArrayOutputStream _buffer = null;
     55
    5356    private ReusableGZIPOutputStream() {
    5457        super(new ByteArrayOutputStream(40*1024));
    5558        _buffer = (ByteArrayOutputStream)out;
    5659    }
     60
    5761    /** clear the data so we can start again afresh */
    5862    @Override
     
    6266        def.setLevel(Deflater.BEST_COMPRESSION);
    6367    }
     68
    6469    public void setLevel(int level) {
    6570        def.setLevel(level);
    6671    }
     72
    6773    /** pull the contents of the stream written */
    6874    public byte[] getData() { return _buffer.toByteArray(); }
Note: See TracChangeset for help on using the changeset viewer.