Changeset 51c49d7c


Ignore:
Timestamp:
Apr 10, 2004 11:50:11 AM (17 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
e716f9e
Parents:
17a1b11
git-author:
shendaras <shendaras> (04/10/04 11:50:11)
git-committer:
zzz <zzz@…> (04/10/04 11:50:11)
Message:

format (shendaras)

Location:
apps
Files:
16 edited

Legend:

Unmodified
Added
Removed
  • apps/ministreaming/java/src/net/i2p/client/streaming/ByteCollector.java

    r17a1b11 r51c49d7c  
    77
    88    public ByteCollector() {
    9         contents=new byte[80];
    10         size=0;
     9        contents = new byte[80];
     10        size = 0;
    1111    }
    1212
    1313    public ByteCollector(byte[] b) {
    14         this();
    15         append(b);
     14        this();
     15        append(b);
    1616    }
    1717
    1818    public ByteCollector(byte b) {
    19         this();
    20         append(b);
     19        this();
     20        append(b);
    2121    }
    2222
    23     public ByteCollector append (byte b) {
    24         ensureCapacity(size+1);
    25         contents[size++]=b;
    26         return this;
     23    public ByteCollector append(byte b) {
     24        ensureCapacity(size + 1);
     25        contents[size++] = b;
     26        return this;
    2727    }
    2828
    29     public ByteCollector append (byte[] b) {
    30         ensureCapacity(size+b.length);
    31         System.arraycopy(b,0,contents,size,b.length);
    32         size+=b.length;
    33         return this;
     29    public ByteCollector append(byte[] b) {
     30        ensureCapacity(size + b.length);
     31        System.arraycopy(b, 0, contents, size, b.length);
     32        size += b.length;
     33        return this;
    3434    }
    3535
    3636    public ByteCollector append(byte[] b, int len) {
    37         return append(b,0,len);
     37        return append(b, 0, len);
    3838    }
    3939
    4040    public ByteCollector append(byte[] b, int off, int len) {
    41         ensureCapacity(size+len);
    42         System.arraycopy(b,off,contents,size,len);
    43         size+=len;
    44         return this;
     41        ensureCapacity(size + len);
     42        System.arraycopy(b, off, contents, size, len);
     43        size += len;
     44        return this;
    4545    }
    46    
     46
    4747    public ByteCollector append(ByteCollector bc) {
    48         // optimieren?
    49         return append(bc.toByteArray());
     48        // optimieren?
     49        return append(bc.toByteArray());
    5050    }
    5151
    5252    public byte[] toByteArray() {
    53         byte[] result=new byte[size];
    54         System.arraycopy(contents,0,result,0,size);
    55         return result;
     53        byte[] result = new byte[size];
     54        System.arraycopy(contents, 0, result, 0, size);
     55        return result;
    5656    }
    5757
    5858    public byte[] startToByteArray(int maxlen) {
    59         if (size < maxlen) {
    60             byte[] res = toByteArray();
    61             clear();
    62             return res;
    63         } else {
    64             byte[] result = new byte[maxlen];
    65             System.arraycopy(contents,0,result,0,maxlen);
    66             System.arraycopy(contents,maxlen,contents,0,size-maxlen);
    67             size-=maxlen;
    68             return result;
    69         }
     59        if (size < maxlen) {
     60            byte[] res = toByteArray();
     61            clear();
     62            return res;
     63        } else {
     64            byte[] result = new byte[maxlen];
     65            System.arraycopy(contents, 0, result, 0, maxlen);
     66            System.arraycopy(contents, maxlen, contents, 0, size - maxlen);
     67            size -= maxlen;
     68            return result;
     69        }
    7070    }
    71    
    72     public int getCurrentSize() { 
    73         return size;
     71
     72    public int getCurrentSize() {
     73        return size;
    7474    }
    7575
    7676    public boolean ensureCapacity(int cap) {
    77         if (contents.length<cap) {
    78             int l=contents.length;
    79             while (l<cap) {
    80                 l=(l*3)/2+1;
    81             }
    82             byte[] newcont=new byte[l];
    83             System.arraycopy(contents,0,newcont,0,size);
    84             contents=newcont;
    85             return true;
    86         }
    87         return false;
     77        if (contents.length < cap) {
     78            int l = contents.length;
     79            while (l < cap) {
     80                l = (l * 3) / 2 + 1;
     81            }
     82            byte[] newcont = new byte[l];
     83            System.arraycopy(contents, 0, newcont, 0, size);
     84            contents = newcont;
     85            return true;
     86        }
     87        return false;
    8888    }
    8989
    9090    public boolean isEmpty() {
    91         return size==0;
     91        return size == 0;
    9292    }
    9393
    9494    public int indexOf(ByteCollector bc) {
    95         // optimieren?
    96         return indexOf(bc.toByteArray());
     95        // optimieren?
     96        return indexOf(bc.toByteArray());
    9797    }
    98    
     98
    9999    public int indexOf(byte b) {
    100         // optimieren?
    101         return indexOf(new byte[] {b});
     100        // optimieren?
     101        return indexOf(new byte[] { b});
    102102    }
    103    
     103
    104104    public int indexOf(byte[] ba) {
    105         loop:
    106         for (int i=0;i<size-ba.length+1;i++) {
    107             for (int j=0;j<ba.length;j++) {
    108                 if (contents[i+j]!=ba[j]) continue loop;
    109             }
    110             return i;
    111         }
    112         return -1;
     105        loop: for (int i = 0; i < size - ba.length + 1; i++) {
     106            for (int j = 0; j < ba.length; j++) {
     107                if (contents[i + j] != ba[j]) continue loop;
     108            }
     109            return i;
     110        }
     111        return -1;
    113112    }
    114    
     113
    115114    public void clear() {
    116         size=0;
     115        size = 0;
    117116    }
    118117
    119118    public void clearAndShorten() {
    120         size=0;
    121         contents=new byte[80];
     119        size = 0;
     120        contents = new byte[80];
    122121    }
    123122
    124123    public String toString() {
    125         return new String(toByteArray());
     124        return new String(toByteArray());
    126125    }
    127126
    128127    public int hashCode() {
    129         int h =0;
    130         for (int i=0;i<size;i++) {
    131             h+=contents[i]*contents[i];
    132         }
    133         return h;
     128        int h = 0;
     129        for (int i = 0; i < size; i++) {
     130            h += contents[i] * contents[i];
     131        }
     132        return h;
    134133    }
    135134
    136135    public boolean equals(Object o) {
    137         if (o instanceof ByteCollector) {
    138             ByteCollector by=(ByteCollector)o;
    139             if (size!=by.size) return false;
    140             for (int i=0;i<size;i++) {
    141                 if (contents[i]!=by.contents[i]) return false;
    142             }
    143             return true;
    144         } else {
    145             return false;
    146         }
     136        if (o instanceof ByteCollector) {
     137            ByteCollector by = (ByteCollector) o;
     138            if (size != by.size) return false;
     139            for (int i = 0; i < size; i++) {
     140                if (contents[i] != by.contents[i]) return false;
     141            }
     142            return true;
     143        } else {
     144            return false;
     145        }
    147146    }
    148147
    149148    public byte removeFirst() {
    150         byte bb=contents[0];
    151         if (size==0)
    152             throw new IllegalArgumentException("ByteCollector is empty");
    153         if(size>1)
    154             System.arraycopy(contents,1,contents,0,--size);
    155         else
    156             size=0;
    157         return bb;
     149        byte bb = contents[0];
     150        if (size == 0) throw new IllegalArgumentException("ByteCollector is empty");
     151        if (size > 1)
     152            System.arraycopy(contents, 1, contents, 0, --size);
     153        else
     154            size = 0;
     155        return bb;
    158156    }
    159157}
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java

    r17a1b11 r51c49d7c  
    1212     */
    1313    public void close() throws I2PException;
    14    
     14
    1515    /**
    1616     * Waits for the next socket connecting.  If a remote user tried to make a
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java

    r17a1b11 r51c49d7c  
    1111    private final static Log _log = new Log(I2PServerSocketImpl.class);
    1212    private I2PSocketManager mgr;
    13     private I2PSocket cached=null; // buffer one socket here
    14    
     13    private I2PSocket cached = null; // buffer one socket here
     14
    1515    public I2PServerSocketImpl(I2PSocketManager mgr) {
    16         this.mgr = mgr;
    17     }
    18    
    19     public synchronized I2PSocket accept() throws I2PException {
    20         while(cached == null) {
    21             myWait();
    22         }
    23         I2PSocket ret=cached;
    24         cached=null;
    25         notifyAll();
    26         _log.debug("TIMING: handed out accept result "+ret.hashCode());
    27         return ret;
    28     }
    29    
    30     public synchronized boolean getNewSocket(I2PSocket s){
    31         while(cached != null) {
    32             myWait();
    33         }
    34         cached=s;
    35         notifyAll();
    36         return true;
    37     }
    38    
    39     public void close() throws I2PException {
    40         //noop
    41     }
    42    
    43     private void myWait() {
    44         try{
    45             wait();
    46         } catch (InterruptedException ex) {}
     16        this.mgr = mgr;
    4717    }
    4818
    49     public I2PSocketManager getManager() { return mgr; }
     19    public synchronized I2PSocket accept() throws I2PException {
     20        while (cached == null) {
     21            myWait();
     22        }
     23        I2PSocket ret = cached;
     24        cached = null;
     25        notifyAll();
     26        _log.debug("TIMING: handed out accept result " + ret.hashCode());
     27        return ret;
     28    }
     29
     30    public synchronized boolean getNewSocket(I2PSocket s) {
     31        while (cached != null) {
     32            myWait();
     33        }
     34        cached = s;
     35        notifyAll();
     36        return true;
     37    }
     38
     39    public void close() throws I2PException {
     40        //noop
     41    }
     42
     43    private void myWait() {
     44        try {
     45            wait();
     46        } catch (InterruptedException ex) {
     47        }
     48    }
     49
     50    public I2PSocketManager getManager() {
     51        return mgr;
     52    }
    5053}
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java

    r17a1b11 r51c49d7c  
    1919    private final static Log _log = new Log(I2PSocketImpl.class);
    2020
    21     public static final int MAX_PACKET_SIZE = 1024*32;
    22     public static final int PACKET_DELAY=100;
    23    
     21    public static final int MAX_PACKET_SIZE = 1024 * 32;
     22    public static final int PACKET_DELAY = 100;
     23
    2424    private I2PSocketManager manager;
    2525    private Destination local;
     
    3232    private boolean outgoing;
    3333    private Object flagLock = new Object();
    34     private boolean closed = false, sendClose=true, closed2=false;
    35    
    36     public I2PSocketImpl(Destination peer, I2PSocketManager mgr,
    37                          boolean outgoing, String localID) {
    38         this.outgoing=outgoing;
    39         manager = mgr;
    40         remote = peer;
    41         local = mgr.getSession().getMyDestination();
    42         in = new I2PInputStream();
    43         I2PInputStream pin = new I2PInputStream();
    44         out = new I2POutputStream(pin);
    45         new I2PSocketRunner(pin);
    46         this.localID = localID;
    47     }
    48    
     34    private boolean closed = false, sendClose = true, closed2 = false;
     35
     36    public I2PSocketImpl(Destination peer, I2PSocketManager mgr, boolean outgoing, String localID) {
     37        this.outgoing = outgoing;
     38        manager = mgr;
     39        remote = peer;
     40        local = mgr.getSession().getMyDestination();
     41        in = new I2PInputStream();
     42        I2PInputStream pin = new I2PInputStream();
     43        out = new I2POutputStream(pin);
     44        new I2PSocketRunner(pin);
     45        this.localID = localID;
     46    }
     47
    4948    public String getLocalID() {
    50         return localID;
     49        return localID;
    5150    }
    5251
    5352    public void setRemoteID(String id) {
    54         synchronized(remoteIDWaiter) {
    55             remoteID=id;
    56             remoteIDWaiter.notifyAll();
    57         }
     53        synchronized (remoteIDWaiter) {
     54            remoteID = id;
     55            remoteIDWaiter.notifyAll();
     56        }
    5857    }
    5958
    6059    public String getRemoteID(boolean wait) throws InterruptedIOException {
    61         return getRemoteID(wait, -1);
    62     }
     60        return getRemoteID(wait, -1);
     61    }
     62
    6363    public String getRemoteID(boolean wait, long maxWait) throws InterruptedIOException {
    64         long dieAfter = System.currentTimeMillis() + maxWait;
    65         synchronized(remoteIDWaiter) {
    66             while (wait && remoteID==null) {
    67                 try {
    68                     if (maxWait > 0)
    69                         remoteIDWaiter.wait(maxWait);
    70                     else
    71                         remoteIDWaiter.wait();
    72                 } catch (InterruptedException ex) {}
    73                    
    74                 if ( (maxWait > 0) && (System.currentTimeMillis() > dieAfter) )
    75                     throw new InterruptedIOException("Timed out waiting for remote ID");
    76             }
    77             if (wait) {
    78                 _log.debug("TIMING: RemoteID set to " + I2PSocketManager.getReadableForm(remoteID) +" for "+this.hashCode());
    79             }
    80             return remoteID;
    81         }
     64        long dieAfter = System.currentTimeMillis() + maxWait;
     65        synchronized (remoteIDWaiter) {
     66            while (wait && remoteID == null) {
     67                try {
     68                    if (maxWait > 0)
     69                        remoteIDWaiter.wait(maxWait);
     70                    else
     71                        remoteIDWaiter.wait();
     72                } catch (InterruptedException ex) {
     73                }
     74
     75                if ((maxWait > 0) && (System.currentTimeMillis() > dieAfter))
     76                    throw new InterruptedIOException("Timed out waiting for remote ID");
     77            }
     78            if (wait) {
     79                _log.debug("TIMING: RemoteID set to " + I2PSocketManager.getReadableForm(remoteID) + " for "
     80                           + this.hashCode());
     81            }
     82            return remoteID;
     83        }
    8284    }
    8385
    8486    public String getRemoteID() throws InterruptedIOException {
    85         return getRemoteID(false);
     87        return getRemoteID(false);
    8688    }
    8789
    8890    public void queueData(byte[] data) {
    89         in.queueData(data);
     91        in.queueData(data);
    9092    }
    9193
     
    9395     * Return the Destination of this side of the socket.
    9496     */
    95     public Destination getThisDestination() { return local; }
     97    public Destination getThisDestination() {
     98        return local;
     99    }
    96100
    97101    /**
    98102     * Return the destination of the peer.
    99103     */
    100     public Destination getPeerDestination() { return remote; }
     104    public Destination getPeerDestination() {
     105        return remote;
     106    }
    101107
    102108    /**
    103109     * Return an InputStream to read from the socket.
    104110     */
    105     public InputStream getInputStream() throws IOException {
    106         if ( (in == null) )
    107             throw new IOException("Not connected");
    108         return in;
     111    public InputStream getInputStream() throws IOException {
     112        if ((in == null)) throw new IOException("Not connected");
     113        return in;
    109114    }
    110115
     
    113118     */
    114119    public OutputStream getOutputStream() throws IOException {
    115         if ( (out == null) )
    116             throw new IOException("Not connected");
    117         return out;
     120        if ((out == null)) throw new IOException("Not connected");
     121        return out;
    118122    }
    119123
     
    122126     */
    123127    public void close() throws IOException {
    124         synchronized(flagLock) {
    125             _log.debug("Closing connection");
    126             closed=true;
    127         }
    128         out.close();
    129         in.notifyClosed();
     128        synchronized (flagLock) {
     129            _log.debug("Closing connection");
     130            closed = true;
     131        }
     132        out.close();
     133        in.notifyClosed();
    130134    }
    131135
    132136    public void internalClose() {
    133         synchronized(flagLock) {
    134             closed=true;
    135             closed2=true;
    136             sendClose=false;
    137         }
    138         out.close();
    139         in.notifyClosed();
    140     }
    141        
     137        synchronized (flagLock) {
     138            closed = true;
     139            closed2 = true;
     140            sendClose = false;
     141        }
     142        out.close();
     143        in.notifyClosed();
     144    }
    142145
    143146    private byte getMask(int add) {
    144         return (byte)((outgoing?(byte)0xA0:(byte)0x50)+(byte)add);
    145     }
    146    
     147        return (byte) ((outgoing ? (byte) 0xA0 : (byte) 0x50) + (byte) add);
     148    }
     149
    147150    //--------------------------------------------------
    148151    public class I2PInputStream extends InputStream {
    149152
    150         private ByteCollector bc = new ByteCollector();
    151 
    152         public int read() throws IOException {
    153             byte[] b = new byte[1];
    154             int res = read(b);
    155             if (res == 1) return b[0] & 0xff;
    156             if (res == -1) return -1;
    157             throw new RuntimeException("Incorrect read() result");
    158         }
    159 
    160         public synchronized int read(byte[] b, int off, int len) throws IOException {
    161             _log.debug("Read called: "+this.hashCode());
    162             if (len==0) return 0;
    163             byte[] read = bc.startToByteArray(len);
    164             while (read.length==0) {
    165                 synchronized(flagLock) {
    166                     if (closed){
    167                         _log.debug("Closed is set, so closing stream: "+this.hashCode());
    168                         return -1;
    169                     }
    170                 }
    171                 try {
    172                     wait();
    173                 } catch (InterruptedException ex) {}
    174                 read = bc.startToByteArray(len);
    175             }
    176             if (read.length>len) throw new RuntimeException("BUG");
    177             System.arraycopy(read,0,b,off,read.length);
    178 
    179             if (_log.shouldLog(Log.DEBUG)) {
    180                 _log.debug("Read from I2PInputStream " + this.hashCode()
    181                            + " returned "+read.length+" bytes");
    182             }
    183             //if (_log.shouldLog(Log.DEBUG)) {
    184             //  _log.debug("Read from I2PInputStream " + this.hashCode()
    185             //     + " returned "+read.length+" bytes:\n"
    186             //     + HexDump.dump(read));
    187             //}
    188             return read.length;
    189         }
    190 
    191         public int available() {
    192             return bc.getCurrentSize();
    193         }
    194 
    195         public void queueData(byte[] data) {
    196             queueData(data,0,data.length);
    197         }
    198 
    199         public synchronized void queueData(byte[] data, int off, int len) {
    200             _log.debug("Insert "+len+" bytes into queue: "+this.hashCode());
    201             bc.append(data, off, len);
    202             notifyAll();
    203         }
    204 
    205         public synchronized void notifyClosed() {
    206             notifyAll();
    207         }
    208        
     153        private ByteCollector bc = new ByteCollector();
     154
     155        public int read() throws IOException {
     156            byte[] b = new byte[1];
     157            int res = read(b);
     158            if (res == 1) return b[0] & 0xff;
     159            if (res == -1) return -1;
     160            throw new RuntimeException("Incorrect read() result");
     161        }
     162
     163        public synchronized int read(byte[] b, int off, int len) throws IOException {
     164            _log.debug("Read called: " + this.hashCode());
     165            if (len == 0) return 0;
     166            byte[] read = bc.startToByteArray(len);
     167            while (read.length == 0) {
     168                synchronized (flagLock) {
     169                    if (closed) {
     170                        _log.debug("Closed is set, so closing stream: " + this.hashCode());
     171                        return -1;
     172                    }
     173                }
     174                try {
     175                    wait();
     176                } catch (InterruptedException ex) {
     177                }
     178                read = bc.startToByteArray(len);
     179            }
     180            if (read.length > len) throw new RuntimeException("BUG");
     181            System.arraycopy(read, 0, b, off, read.length);
     182
     183            if (_log.shouldLog(Log.DEBUG)) {
     184                _log.debug("Read from I2PInputStream " + this.hashCode() + " returned " + read.length + " bytes");
     185            }
     186            //if (_log.shouldLog(Log.DEBUG)) {
     187            //  _log.debug("Read from I2PInputStream " + this.hashCode()
     188            //     + " returned "+read.length+" bytes:\n"
     189            //     + HexDump.dump(read));
     190            //}
     191            return read.length;
     192        }
     193
     194        public int available() {
     195            return bc.getCurrentSize();
     196        }
     197
     198        public void queueData(byte[] data) {
     199            queueData(data, 0, data.length);
     200        }
     201
     202        public synchronized void queueData(byte[] data, int off, int len) {
     203            _log.debug("Insert " + len + " bytes into queue: " + this.hashCode());
     204            bc.append(data, off, len);
     205            notifyAll();
     206        }
     207
     208        public synchronized void notifyClosed() {
     209            notifyAll();
     210        }
     211
    209212    }
    210213
    211214    public class I2POutputStream extends OutputStream {
    212215
    213         public I2PInputStream sendTo;
    214        
    215         public I2POutputStream(I2PInputStream sendTo) {
    216             this.sendTo=sendTo;
    217         }
    218         public void write(int b) throws IOException {
    219             write(new byte[] {(byte)b});
    220         }
    221 
    222         public void write (byte[] b, int off, int len)  throws IOException {
    223             sendTo.queueData(b,off,len);
    224         }
    225 
    226         public void close() {
    227             sendTo.notifyClosed();
    228         }
     216        public I2PInputStream sendTo;
     217
     218        public I2POutputStream(I2PInputStream sendTo) {
     219            this.sendTo = sendTo;
     220        }
     221
     222        public void write(int b) throws IOException {
     223            write(new byte[] { (byte) b});
     224        }
     225
     226        public void write(byte[] b, int off, int len) throws IOException {
     227            sendTo.queueData(b, off, len);
     228        }
     229
     230        public void close() {
     231            sendTo.notifyClosed();
     232        }
    229233    }
    230234
    231235    public class I2PSocketRunner extends I2PThread {
    232236
    233         public InputStream in;
    234 
    235         public I2PSocketRunner(InputStream in) {
    236             _log.debug("Runner's input stream is: "+in.hashCode());
    237             this.in=in;
    238             setName("SocketRunner from " + I2PSocketImpl.this.remote.calculateHash().toBase64().substring(0, 4));
    239             start();
    240         }
    241 
    242         public void run() {
    243             byte[] buffer = new byte[MAX_PACKET_SIZE];
    244             ByteCollector bc = new ByteCollector();
    245             boolean sent = true;
    246             try {
    247                 int len, bcsize;
    248 //              try {
    249                 while (true) {
    250                     len = in.read(buffer);
    251                     bcsize = bc.getCurrentSize();
    252                     if (len != -1) {
    253                         bc.append(buffer,len);
    254                     } else if (bcsize == 0) {
    255                         break;
    256                     }
    257                     if ((bcsize < MAX_PACKET_SIZE)
    258                         && (in.available()==0)) {
    259                         _log.debug("Runner Point d: "+this.hashCode());
    260                                                
    261                         try {
    262                             Thread.sleep(PACKET_DELAY);
    263                         } catch (InterruptedException e) {
    264                             e.printStackTrace();
    265                         }
    266                     }
    267                     if ((bcsize >= MAX_PACKET_SIZE)
    268                         || (in.available()==0) ) {
    269                         byte[] data = bc.startToByteArray(MAX_PACKET_SIZE);
    270                         if (data.length > 0) {
    271                             _log.debug("Message size is: "+data.length);
    272                             sent = sendBlock(data);
    273                             if (!sent) {
    274                                 _log.error("Error sending message to peer.  Killing socket runner");
    275                                 break;
    276                             }
    277                         }
    278                     }
    279                 }
    280                 if ((bc.getCurrentSize() > 0) && sent) {
    281                     _log.error("A SCARY MONSTER HAS EATEN SOME DATA! "
    282                                + "(input stream: " + in.hashCode() + "; "
    283                                + "queue size: " + bc.getCurrentSize() + ")");
    284                 }
    285                 synchronized(flagLock) {
    286                     closed2=true;
    287                 }
    288 //              } catch (IOException ex) {
    289 //                  if (_log.shouldLog(Log.INFO))
    290 //                      _log.info("Error reading and writing", ex);
    291 //              }
    292                 boolean sc;
    293                 synchronized(flagLock) {
    294                     sc=sendClose;
    295                 } // FIXME: Race here?
    296                 if (sc) {
    297                     _log.info("Sending close packet: "+outgoing);
    298                     byte[] packet = I2PSocketManager.makePacket
    299                         ((byte)(getMask(0x02)),remoteID, new byte[0]);
    300                     synchronized(manager.getSession()) {
    301                         sent = manager.getSession().sendMessage(remote, packet);
    302                     }
    303                     if (!sent) {
    304                         _log.error("Error sending close packet to peer");
    305                     }
    306                 }
    307                 manager.removeSocket(I2PSocketImpl.this);
    308             } catch (IOException ex) {
    309                 // WHOEVER removes this event on inconsistent
    310                 // state before fixing the inconsistent state (a
    311                 // reference on the socket in the socket manager
    312                 // etc.) will get hanged by me personally -- mihi
    313                 _log.error("Error running - **INCONSISTENT STATE!!!**", ex);
    314             } catch (I2PException ex) {
    315                 _log.error("Error running - **INCONSISTENT STATE!!!**" , ex);
    316             }
    317         }
    318        
    319         private boolean sendBlock(byte data[]) throws I2PSessionException {
    320             _log.debug("TIMING: Block to send for "+I2PSocketImpl.this.hashCode());
    321             if (remoteID==null) {
    322                 _log.error("NULL REMOTEID");
    323                 return false;
    324             }
    325             byte[] packet = I2PSocketManager.makePacket(getMask(0x00), remoteID,
    326                                                  data);
    327             boolean sent;
    328             synchronized(flagLock) {
    329                 if (closed2) return false;
    330             }
    331             synchronized(manager.getSession()) {
    332                 sent = manager.getSession().sendMessage(remote, packet);
    333             }
    334             return sent;
    335         }
     237        public InputStream in;
     238
     239        public I2PSocketRunner(InputStream in) {
     240            _log.debug("Runner's input stream is: " + in.hashCode());
     241            this.in = in;
     242            setName("SocketRunner from " + I2PSocketImpl.this.remote.calculateHash().toBase64().substring(0, 4));
     243            start();
     244        }
     245
     246        public void run() {
     247            byte[] buffer = new byte[MAX_PACKET_SIZE];
     248            ByteCollector bc = new ByteCollector();
     249            boolean sent = true;
     250            try {
     251                int len, bcsize;
     252                //              try {
     253                while (true) {
     254                    len = in.read(buffer);
     255                    bcsize = bc.getCurrentSize();
     256                    if (len != -1) {
     257                        bc.append(buffer, len);
     258                    } else if (bcsize == 0) {
     259                        break;
     260                    }
     261                    if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) {
     262                        _log.debug("Runner Point d: " + this.hashCode());
     263
     264                        try {
     265                            Thread.sleep(PACKET_DELAY);
     266                        } catch (InterruptedException e) {
     267                            e.printStackTrace();
     268                        }
     269                    }
     270                    if ((bcsize >= MAX_PACKET_SIZE) || (in.available() == 0)) {
     271                        byte[] data = bc.startToByteArray(MAX_PACKET_SIZE);
     272                        if (data.length > 0) {
     273                            _log.debug("Message size is: " + data.length);
     274                            sent = sendBlock(data);
     275                            if (!sent) {
     276                                _log.error("Error sending message to peer.  Killing socket runner");
     277                                break;
     278                            }
     279                        }
     280                    }
     281                }
     282                if ((bc.getCurrentSize() > 0) && sent) {
     283                    _log.error("A SCARY MONSTER HAS EATEN SOME DATA! " + "(input stream: " + in.hashCode() + "; "
     284                               + "queue size: " + bc.getCurrentSize() + ")");
     285                }
     286                synchronized (flagLock) {
     287                    closed2 = true;
     288                }
     289                //              } catch (IOException ex) {
     290                //                  if (_log.shouldLog(Log.INFO))
     291                //                      _log.info("Error reading and writing", ex);
     292                //              }
     293                boolean sc;
     294                synchronized (flagLock) {
     295                    sc = sendClose;
     296                } // FIXME: Race here?
     297                if (sc) {
     298                    _log.info("Sending close packet: " + outgoing);
     299                    byte[] packet = I2PSocketManager.makePacket((byte) (getMask(0x02)), remoteID, new byte[0]);
     300                    synchronized (manager.getSession()) {
     301                        sent = manager.getSession().sendMessage(remote, packet);
     302                    }
     303                    if (!sent) {
     304                        _log.error("Error sending close packet to peer");
     305                    }
     306                }
     307                manager.removeSocket(I2PSocketImpl.this);
     308            } catch (IOException ex) {
     309                // WHOEVER removes this event on inconsistent
     310                // state before fixing the inconsistent state (a
     311                // reference on the socket in the socket manager
     312                // etc.) will get hanged by me personally -- mihi
     313                _log.error("Error running - **INCONSISTENT STATE!!!**", ex);
     314            } catch (I2PException ex) {
     315                _log.error("Error running - **INCONSISTENT STATE!!!**", ex);
     316            }
     317        }
     318
     319        private boolean sendBlock(byte data[]) throws I2PSessionException {
     320            _log.debug("TIMING: Block to send for " + I2PSocketImpl.this.hashCode());
     321            if (remoteID == null) {
     322                _log.error("NULL REMOTEID");
     323                return false;
     324            }
     325            byte[] packet = I2PSocketManager.makePacket(getMask(0x00), remoteID, data);
     326            boolean sent;
     327            synchronized (flagLock) {
     328                if (closed2) return false;
     329            }
     330            synchronized (manager.getSession()) {
     331                sent = manager.getSession().sendMessage(remote, packet);
     332            }
     333            return sent;
     334        }
    336335    }
    337336}
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java

    r17a1b11 r51c49d7c  
    3838    private I2PSocketOptions _defaultOptions;
    3939
    40     public static final int PUBKEY_LENGTH=387;
    41 
    42    
     40    public static final int PUBKEY_LENGTH = 387;
     41
    4342    public I2PSocketManager() {
    44         _session=null;
    45         _serverSocket = new I2PServerSocketImpl(this);
    46         _inSockets = new HashMap(16);
    47         _outSockets = new HashMap(16);
     43        _session = null;
     44        _serverSocket = new I2PServerSocketImpl(this);
     45        _inSockets = new HashMap(16);
     46        _outSockets = new HashMap(16);
    4847    }
    4948
    5049    public I2PSession getSession() {
    51         return _session;
    52     }
    53    
    54     public void setSession(I2PSession session) {
    55         _session = session;
    56         if (session != null)
    57             session.setSessionListener(this);
    58     }
    59    
     50        return _session;
     51    }
     52
     53    public void setSession(I2PSession session) {
     54        _session = session;
     55        if (session != null) session.setSessionListener(this);
     56    }
     57
    6058    public void disconnected(I2PSession session) {
    61         _log.error("Disconnected from the session");
    62     }
    63    
     59        _log.error("Disconnected from the session");
     60    }
     61
    6462    public void errorOccurred(I2PSession session, String message, Throwable error) {
    65         _log.error("Error occurred: [" + message + "]", error);
    66     }
    67    
     63        _log.error("Error occurred: [" + message + "]", error);
     64    }
     65
    6866    public void messageAvailable(I2PSession session, int msgId, long size) {
    69         try {
    70             I2PSocketImpl s;
    71             byte msg[] = session.receiveMessage(msgId);
    72             if (msg.length == 1 && msg[0] == -1) {
    73                 _log.debug("Ping received");
    74                 return;
    75             }
    76             if (msg.length <4) {
    77                 _log.error("==== packet too short ====");
    78                 return;
    79             }
    80             int type = msg[0] & 0xff;
    81             String id = new String(new byte[] {msg[1], msg[2], msg[3]},
    82                                    "ISO-8859-1");
    83             byte[] payload = new byte[msg.length-4];
    84             System.arraycopy(msg, 4, payload, 0, payload.length);
    85             _log.debug("Message read: type = [" + Integer.toHexString(type) +
    86                        "] id = [" + getReadableForm(id)+
    87                        "] payload length: " + payload.length + "]");
    88             synchronized(lock) {
    89                 switch(type) {
    90                 case 0x51: // ACK outgoing
    91                     s = (I2PSocketImpl) _outSockets.get(id);
    92                     if (s == null) {
    93                         _log.warn("No socket responsible for ACK packet");
    94                         return;
    95                     }
    96                     if (payload.length==3 && s.getRemoteID(false)==null) {
    97                         String newID = new String(payload,
    98                                                   "ISO-8859-1");
    99                         s.setRemoteID(newID);
    100                         return;
    101                     } else {
    102                         if (payload.length != 3)
    103                             _log.warn("Ack packet had " + payload.length + " bytes");
    104                         else
    105                             _log.warn("Remote ID already exists? " + s.getRemoteID());
    106                         return;
    107                     }
    108                 case 0x52: // disconnect outgoing
    109                     _log.debug("*Disconnect outgoing!");
    110                     try {
    111                         s = (I2PSocketImpl) _outSockets.get(id);
    112                         if (payload.length==0 && s != null) {
    113                             s.internalClose();
    114                             _outSockets.remove(id);
    115                             return;
    116                         } else {
    117                             if (payload.length > 0)
    118                                 _log.warn("Disconnect packet had " + payload.length + " bytes");
    119                             return;
    120                         }
    121                     } catch (Exception t) {
    122                         _log.error("Ignoring error on disconnect", t);
    123                     }
    124                 case 0x50: // packet send outgoing
    125                     _log.debug("*Packet send outgoing [" + payload.length + "]");
    126                     s = (I2PSocketImpl) _outSockets.get(id);
    127                     if (s != null) {
    128                         s.queueData(payload);
    129                         return;
    130                     } else {
    131                         _log.error("Null socket with data available");
    132                         throw new IllegalStateException("Null socket with data available");
    133                     }
    134                 case 0xA1: // SYN incoming
    135                     _log.debug("*Syn!");
    136                     if (payload.length==PUBKEY_LENGTH) {
    137                         String newLocalID = makeID(_inSockets);
    138                         Destination d = new Destination();
    139                         d.readBytes(new ByteArrayInputStream(payload));
    140                        
    141                         s = new I2PSocketImpl(d, this, false,
    142                                               newLocalID);
    143                         s.setRemoteID(id);
    144                         if (_serverSocket.getNewSocket(s)) {
    145                             _inSockets.put(newLocalID, s);
    146                             byte[] packet = makePacket
    147                                 ((byte)0x51, id,
    148                                  newLocalID.getBytes("ISO-8859-1"));
    149                             boolean replySentOk = false;
    150                             synchronized(_session) {
    151                                 replySentOk = _session.sendMessage(d, packet);
    152                             }
    153                                 if (!replySentOk) {
    154                                     _log.error("Error sending reply to " +
    155                                                d.calculateHash().toBase64() +
    156                                                " in response to a new con message",
    157                                                new Exception("Failed creation"));
    158                                     s.internalClose();
    159                                 }
    160                         } else {
    161                             byte[] packet =
    162                                 (" "+id).getBytes("ISO-8859-1");
    163                             packet[0]=0x52;
    164                             boolean nackSent = session.sendMessage(d, packet);
    165                             if (!nackSent) {
    166                                 _log.error("Error sending NACK for session creation");
    167                             }
    168                             s.internalClose();
    169                         }
    170                         return;
    171                     } else {
    172                         _log.error("Syn packet that has a payload not equal to the pubkey length (" + payload.length + " != " + PUBKEY_LENGTH + ")");
    173                         return;
    174                     }
    175                 case 0xA2: // disconnect incoming
    176                     _log.debug("*Disconnect incoming!");
    177                     try {
    178                         s = (I2PSocketImpl) _inSockets.get(id);
    179                         if (payload.length==0 && s != null) {
    180                             s.internalClose();
    181                             _inSockets.remove(id);
    182                             return;
    183                         } else {
    184                             if (payload.length > 0)
    185                                 _log.warn("Disconnect packet had " + payload.length + " bytes");
    186                             return;
    187                         }
    188                     } catch (Exception t) {
    189                         _log.error("Ignoring error on disconnect", t);
    190                         return;
    191                     }
    192                 case 0xA0: // packet send incoming
    193                     _log.debug("*Packet send incoming [" + payload.length + "]");
    194                     s = (I2PSocketImpl) _inSockets.get(id);
    195                     if (s != null) {
    196                         s.queueData(payload);
    197                         return;
    198                     } else {
    199                         _log.error("Null socket with data available");
    200                         throw new IllegalStateException("Null socket with data available");
    201                     }
    202                 case 0xFF: // ignore
    203                     return;
    204                 }
    205                 _log.error("\n\n=============== Unknown packet! "+
    206                                    "============"+
    207                                    "\nType: "+(int)type+
    208                                    "\nID:   " + getReadableForm(id)+
    209                                    "\nBase64'ed Data: "+Base64.encode(payload)+
    210                                    "\n\n\n");
    211                 if (id != null) {
    212                     _inSockets.remove(id);
    213                     _outSockets.remove(id);
    214                 }
    215             }
    216         } catch (I2PException ise) {
    217             _log.error("Error processing", ise);
    218         } catch (IOException ioe) {
    219             _log.error("Error processing", ioe);
    220         } catch (IllegalStateException ise) {
    221             _log.debug("Error processing", ise);
    222         }
    223     }
    224    
     67        try {
     68            I2PSocketImpl s;
     69            byte msg[] = session.receiveMessage(msgId);
     70            if (msg.length == 1 && msg[0] == -1) {
     71                _log.debug("Ping received");
     72                return;
     73            }
     74            if (msg.length < 4) {
     75                _log.error("==== packet too short ====");
     76                return;
     77            }
     78            int type = msg[0] & 0xff;
     79            String id = new String(new byte[] { msg[1], msg[2], msg[3]}, "ISO-8859-1");
     80            byte[] payload = new byte[msg.length - 4];
     81            System.arraycopy(msg, 4, payload, 0, payload.length);
     82            _log.debug("Message read: type = [" + Integer.toHexString(type) + "] id = [" + getReadableForm(id)
     83                       + "] payload length: " + payload.length + "]");
     84            synchronized (lock) {
     85                switch (type) {
     86                case 0x51:
     87                    // ACK outgoing
     88                    s = (I2PSocketImpl) _outSockets.get(id);
     89                    if (s == null) {
     90                        _log.warn("No socket responsible for ACK packet");
     91                        return;
     92                    }
     93                    if (payload.length == 3 && s.getRemoteID(false) == null) {
     94                        String newID = new String(payload, "ISO-8859-1");
     95                        s.setRemoteID(newID);
     96                        return;
     97                    } else {
     98                        if (payload.length != 3)
     99                            _log.warn("Ack packet had " + payload.length + " bytes");
     100                        else
     101                            _log.warn("Remote ID already exists? " + s.getRemoteID());
     102                        return;
     103                    }
     104                case 0x52:
     105                    // disconnect outgoing
     106                    _log.debug("*Disconnect outgoing!");
     107                    try {
     108                        s = (I2PSocketImpl) _outSockets.get(id);
     109                        if (payload.length == 0 && s != null) {
     110                            s.internalClose();
     111                            _outSockets.remove(id);
     112                            return;
     113                        } else {
     114                            if (payload.length > 0) _log.warn("Disconnect packet had " + payload.length + " bytes");
     115                            return;
     116                        }
     117                    } catch (Exception t) {
     118                        _log.error("Ignoring error on disconnect", t);
     119                    }
     120                case 0x50:
     121                    // packet send outgoing
     122                    _log.debug("*Packet send outgoing [" + payload.length + "]");
     123                    s = (I2PSocketImpl) _outSockets.get(id);
     124                    if (s != null) {
     125                        s.queueData(payload);
     126                        return;
     127                    } else {
     128                        _log.error("Null socket with data available");
     129                        throw new IllegalStateException("Null socket with data available");
     130                    }
     131                case 0xA1:
     132                    // SYN incoming
     133                    _log.debug("*Syn!");
     134                    if (payload.length == PUBKEY_LENGTH) {
     135                        String newLocalID = makeID(_inSockets);
     136                        Destination d = new Destination();
     137                        d.readBytes(new ByteArrayInputStream(payload));
     138
     139                        s = new I2PSocketImpl(d, this, false, newLocalID);
     140                        s.setRemoteID(id);
     141                        if (_serverSocket.getNewSocket(s)) {
     142                            _inSockets.put(newLocalID, s);
     143                            byte[] packet = makePacket((byte) 0x51, id, newLocalID.getBytes("ISO-8859-1"));
     144                            boolean replySentOk = false;
     145                            synchronized (_session) {
     146                                replySentOk = _session.sendMessage(d, packet);
     147                            }
     148                            if (!replySentOk) {
     149                                _log.error("Error sending reply to " + d.calculateHash().toBase64()
     150                                           + " in response to a new con message", new Exception("Failed creation"));
     151                                s.internalClose();
     152                            }
     153                        } else {
     154                            byte[] packet = (" " + id).getBytes("ISO-8859-1");
     155                            packet[0] = 0x52;
     156                            boolean nackSent = session.sendMessage(d, packet);
     157                            if (!nackSent) {
     158                                _log.error("Error sending NACK for session creation");
     159                            }
     160                            s.internalClose();
     161                        }
     162                        return;
     163                    } else {
     164                        _log.error("Syn packet that has a payload not equal to the pubkey length (" + payload.length
     165                                   + " != " + PUBKEY_LENGTH + ")");
     166                        return;
     167                    }
     168                case 0xA2:
     169                    // disconnect incoming
     170                    _log.debug("*Disconnect incoming!");
     171                    try {
     172                        s = (I2PSocketImpl) _inSockets.get(id);
     173                        if (payload.length == 0 && s != null) {
     174                            s.internalClose();
     175                            _inSockets.remove(id);
     176                            return;
     177                        } else {
     178                            if (payload.length > 0) _log.warn("Disconnect packet had " + payload.length + " bytes");
     179                            return;
     180                        }
     181                    } catch (Exception t) {
     182                        _log.error("Ignoring error on disconnect", t);
     183                        return;
     184                    }
     185                case 0xA0:
     186                    // packet send incoming
     187                    _log.debug("*Packet send incoming [" + payload.length + "]");
     188                    s = (I2PSocketImpl) _inSockets.get(id);
     189                    if (s != null) {
     190                        s.queueData(payload);
     191                        return;
     192                    } else {
     193                        _log.error("Null socket with data available");
     194                        throw new IllegalStateException("Null socket with data available");
     195                    }
     196                case 0xFF:
     197                    // ignore
     198                    return;
     199                }
     200                _log.error("\n\n=============== Unknown packet! " + "============" + "\nType: " + (int) type
     201                           + "\nID:   " + getReadableForm(id) + "\nBase64'ed Data: " + Base64.encode(payload)
     202                           + "\n\n\n");
     203                if (id != null) {
     204                    _inSockets.remove(id);
     205                    _outSockets.remove(id);
     206                }
     207            }
     208        } catch (I2PException ise) {
     209            _log.error("Error processing", ise);
     210        } catch (IOException ioe) {
     211            _log.error("Error processing", ioe);
     212        } catch (IllegalStateException ise) {
     213            _log.debug("Error processing", ise);
     214        }
     215    }
     216
    225217    public void reportAbuse(I2PSession session, int severity) {
    226         _log.error("Abuse reported [" + severity + "]");
    227     }
    228  
    229     public void setDefaultOptions(I2PSocketOptions options) { _defaultOptions = options; }
    230 
    231     public I2PSocketOptions getDefaultOptions() { return _defaultOptions ; }
    232    
    233     public I2PServerSocket getServerSocket() { return _serverSocket; }
    234    
     218        _log.error("Abuse reported [" + severity + "]");
     219    }
     220
     221    public void setDefaultOptions(I2PSocketOptions options) {
     222        _defaultOptions = options;
     223    }
     224
     225    public I2PSocketOptions getDefaultOptions() {
     226        return _defaultOptions;
     227    }
     228
     229    public I2PServerSocket getServerSocket() {
     230        return _serverSocket;
     231    }
     232
    235233    /**
    236234     * Create a new connected socket (block until the socket is created)
     
    240238    public I2PSocket connect(Destination peer, I2PSocketOptions options) throws I2PException {
    241239
    242         String localID, lcID;
    243         I2PSocketImpl s;
    244         synchronized(lock) {
    245             localID=makeID(_outSockets);
    246             lcID=getReadableForm(localID);
    247             s = new I2PSocketImpl(peer, this, true, localID);
    248             _outSockets.put(s.getLocalID(),s);
    249         }
    250         try {
    251             ByteArrayOutputStream pubkey = new ByteArrayOutputStream();
    252             _session.getMyDestination().writeBytes(pubkey);
    253             String remoteID;
    254             byte[] packet = makePacket((byte)0xA1, localID,
    255                                        pubkey.toByteArray());
    256             boolean sent = false;
    257             synchronized(_session) {
    258                 sent = _session.sendMessage(peer, packet);
    259             }
    260             if (!sent) {
    261                 _log.info("Unable to send & receive ack for SYN packet");
    262                 synchronized(lock) {
    263                     _outSockets.remove(s.getLocalID());
    264                 }
    265                 throw new I2PException("Unable to reach peer");
    266             }               
    267             remoteID = s.getRemoteID(true, options.getConnectTimeout());
    268             if ("".equals(remoteID)) {
    269                 throw new I2PException("Unable to reach peer");
    270             }
    271             _log.debug("TIMING: s given out for remoteID "+getReadableForm(remoteID));
    272             return s;
    273         } catch (InterruptedIOException ioe) {
    274             _log.error("Timeout waiting for ack from syn for id " + getReadableForm(lcID), ioe);
    275             synchronized(lock) {
    276                 _outSockets.remove(s.getLocalID());
    277             }
    278             throw new I2PException("Timeout waiting for ack");
    279         } catch (IOException ex) {
    280             _log.error("Error sending syn on id " + getReadableForm(lcID), ex);
    281             synchronized(lock) {
    282                 _outSockets.remove(s.getLocalID());
    283             }
    284             throw new I2PException("IOException occurred");
    285         } catch (I2PException ex) {
    286                 _log.info("Error sending syn on id " + getReadableForm(lcID), ex);
    287                 synchronized(lock) {
    288                     _outSockets.remove(s.getLocalID());
    289                 }
    290                 throw ex;
    291         }
    292     }
    293    
     240        String localID, lcID;
     241        I2PSocketImpl s;
     242        synchronized (lock) {
     243            localID = makeID(_outSockets);
     244            lcID = getReadableForm(localID);
     245            s = new I2PSocketImpl(peer, this, true, localID);
     246            _outSockets.put(s.getLocalID(), s);
     247        }
     248        try {
     249            ByteArrayOutputStream pubkey = new ByteArrayOutputStream();
     250            _session.getMyDestination().writeBytes(pubkey);
     251            String remoteID;
     252            byte[] packet = makePacket((byte) 0xA1, localID, pubkey.toByteArray());
     253            boolean sent = false;
     254            synchronized (_session) {
     255                sent = _session.sendMessage(peer, packet);
     256            }
     257            if (!sent) {
     258                _log.info("Unable to send & receive ack for SYN packet");
     259                synchronized (lock) {
     260                    _outSockets.remove(s.getLocalID());
     261                }
     262                throw new I2PException("Unable to reach peer");
     263            }
     264            remoteID = s.getRemoteID(true, options.getConnectTimeout());
     265            if ("".equals(remoteID)) { throw new I2PException("Unable to reach peer"); }
     266            _log.debug("TIMING: s given out for remoteID " + getReadableForm(remoteID));
     267            return s;
     268        } catch (InterruptedIOException ioe) {
     269            _log.error("Timeout waiting for ack from syn for id " + getReadableForm(lcID), ioe);
     270            synchronized (lock) {
     271                _outSockets.remove(s.getLocalID());
     272            }
     273            throw new I2PException("Timeout waiting for ack");
     274        } catch (IOException ex) {
     275            _log.error("Error sending syn on id " + getReadableForm(lcID), ex);
     276            synchronized (lock) {
     277                _outSockets.remove(s.getLocalID());
     278            }
     279            throw new I2PException("IOException occurred");
     280        } catch (I2PException ex) {
     281            _log.info("Error sending syn on id " + getReadableForm(lcID), ex);
     282            synchronized (lock) {
     283                _outSockets.remove(s.getLocalID());
     284            }
     285            throw ex;
     286        }
     287    }
     288
    294289    public I2PSocket connect(Destination peer) throws I2PException {
    295         return connect(peer, null);
    296     }
    297    
    298      /**
     290        return connect(peer, null);
     291    }
     292
     293    /**
    299294     * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely.
    300295     *
    301296     */
    302297    public Set listSockets() {
    303         Set sockets = new HashSet(8);
    304         synchronized (lock) {
    305             sockets.addAll(_inSockets.values());
    306             sockets.addAll(_outSockets.values());
    307         }
    308         return sockets;
    309     }
    310    
     298        Set sockets = new HashSet(8);
     299        synchronized (lock) {
     300            sockets.addAll(_inSockets.values());
     301            sockets.addAll(_outSockets.values());
     302        }
     303        return sockets;
     304    }
     305
    311306    /**
    312307     * Ping the specified peer, returning true if they replied to the ping within
     
    315310     */
    316311    public boolean ping(Destination peer, long timeoutMs) {
    317         try {
    318             return _session.sendMessage(peer, new byte[] {(byte)0xFF});
    319         } catch (I2PException ex) {
    320             _log.error("I2PException:",ex);
    321             return false;
    322         }
     312        try {
     313            return _session.sendMessage(peer, new byte[] { (byte) 0xFF});
     314        } catch (I2PException ex) {
     315            _log.error("I2PException:", ex);
     316            return false;
     317        }
    323318    }
    324319
    325320    public void removeSocket(I2PSocketImpl sock) {
    326         synchronized(lock) {
    327             _inSockets.remove(sock.getLocalID());
    328             _outSockets.remove(sock.getLocalID());
    329         }
     321        synchronized (lock) {
     322            _inSockets.remove(sock.getLocalID());
     323            _outSockets.remove(sock.getLocalID());
     324        }
    330325    }
    331326
    332327    public static String getReadableForm(String id) {
    333         try {
    334             if (id.length() != 3) return "Bogus";
    335             return Base64.encode(id.getBytes("ISO-8859-1"));
    336         } catch (UnsupportedEncodingException ex) {
    337             ex.printStackTrace();
    338             return null;
    339         }
     328        try {
     329            if (id.length() != 3) return "Bogus";
     330            return Base64.encode(id.getBytes("ISO-8859-1"));
     331        } catch (UnsupportedEncodingException ex) {
     332            ex.printStackTrace();
     333            return null;
     334        }
    340335    }
    341336
     
    346341     */
    347342    public static String makeID(HashMap uniqueIn) {
    348         String newID;
    349         try {
    350             do {
    351                 int id = (int)(Math.random()*16777215+1);
    352                 byte[] nid = new byte[3];
    353                 nid[0]=(byte)(id / 65536);
    354                 nid[1] = (byte)((id/256) % 256);
    355                 nid[2]= (byte)(id %256);
    356                 newID = new String(nid, "ISO-8859-1");
    357             } while (uniqueIn.get(newID) != null);
    358             return newID;
    359         } catch (UnsupportedEncodingException ex) {
    360             ex.printStackTrace();
    361             return null;
    362         }
     343        String newID;
     344        try {
     345            do {
     346                int id = (int) (Math.random() * 16777215 + 1);
     347                byte[] nid = new byte[3];
     348                nid[0] = (byte) (id / 65536);
     349                nid[1] = (byte) ((id / 256) % 256);
     350                nid[2] = (byte) (id % 256);
     351                newID = new String(nid, "ISO-8859-1");
     352            } while (uniqueIn.get(newID) != null);
     353            return newID;
     354        } catch (UnsupportedEncodingException ex) {
     355            ex.printStackTrace();
     356            return null;
     357        }
    363358    }
    364359
     
    368363     */
    369364    public static byte[] makePacket(byte type, String id, byte[] payload) {
    370         try {
    371             byte[] packet = new byte[payload.length+4];
    372             packet[0]=type;
    373             byte[] temp = id.getBytes("ISO-8859-1");
    374             if (temp.length != 3)
    375                 throw new RuntimeException("Incorrect ID length: "+
    376                                            temp.length);
    377             System.arraycopy(temp,0,packet,1,3);
    378             System.arraycopy(payload,0,packet,4,payload.length);
    379             return packet;
    380         } catch (UnsupportedEncodingException ex) {
    381             if (_log.shouldLog(Log.ERROR))
    382                 _log.error("Error building the packet", ex);
    383             return new byte[0];
    384         }
     365        try {
     366            byte[] packet = new byte[payload.length + 4];
     367            packet[0] = type;
     368            byte[] temp = id.getBytes("ISO-8859-1");
     369            if (temp.length != 3) throw new RuntimeException("Incorrect ID length: " + temp.length);
     370            System.arraycopy(temp, 0, packet, 1, 3);
     371            System.arraycopy(payload, 0, packet, 4, payload.length);
     372            return packet;
     373        } catch (UnsupportedEncodingException ex) {
     374            if (_log.shouldLog(Log.ERROR)) _log.error("Error building the packet", ex);
     375            return new byte[0];
     376        }
    385377    }
    386378}
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java

    r17a1b11 r51c49d7c  
    2323public class I2PSocketManagerFactory {
    2424    private final static Log _log = new Log(I2PSocketManagerFactory.class);
    25    
     25
    2626    /**
    2727     * Create a socket manager using a brand new destination connected to the
     
    3131     */
    3232    public static I2PSocketManager createManager() {
    33         return createManager("localhost", 7654, new Properties());
     33        return createManager("localhost", 7654, new Properties());
    3434    }
    35    
     35
    3636    /**
    3737     * Create a socket manager using a brand new destination connected to the
     
    4141     */
    4242    public static I2PSocketManager createManager(String i2cpHost, int i2cpPort, Properties opts) {
    43         I2PClient client = I2PClientFactory.createClient();
    44         ByteArrayOutputStream keyStream = new ByteArrayOutputStream(512);
    45         try {
    46             Destination dest = client.createDestination(keyStream);
    47             ByteArrayInputStream in = new ByteArrayInputStream(keyStream.toByteArray());
    48             return createManager(in, i2cpHost, i2cpPort, opts);
    49         } catch (IOException ioe) {
    50             _log.error("Error creating the destination for socket manager", ioe);
    51             return null;
    52         } catch (I2PException ie) {
    53             _log.error("Error creating the destination for socket manager", ie);
    54             return null;
    55         }
     43        I2PClient client = I2PClientFactory.createClient();
     44        ByteArrayOutputStream keyStream = new ByteArrayOutputStream(512);
     45        try {
     46            Destination dest = client.createDestination(keyStream);
     47            ByteArrayInputStream in = new ByteArrayInputStream(keyStream.toByteArray());
     48            return createManager(in, i2cpHost, i2cpPort, opts);
     49        } catch (IOException ioe) {
     50            _log.error("Error creating the destination for socket manager", ioe);
     51            return null;
     52        } catch (I2PException ie) {
     53            _log.error("Error creating the destination for socket manager", ie);
     54            return null;
     55        }
    5656    }
    57    
     57
    5858    /**
    5959     * Create a socket manager using the destination loaded from the given private key
     
    6363     * @return the newly created socket manager, or null if there were errors
    6464     */
    65     public static I2PSocketManager createManager(InputStream myPrivateKeyStream, String i2cpHost, int i2cpPort, Properties opts) {
    66         I2PClient client = I2PClientFactory.createClient();
    67         opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED);
    68         opts.setProperty(I2PClient.PROP_TCP_HOST, i2cpHost);
    69         opts.setProperty(I2PClient.PROP_TCP_PORT, ""+i2cpPort);
    70         try {
    71             I2PSession session = client.createSession(myPrivateKeyStream, opts);
    72             session.connect();
    73             return createManager(session);
    74         } catch (I2PSessionException ise) {
    75             _log.error("Error creating session for socket manager", ise);
    76             return null;
    77         }
     65    public static I2PSocketManager createManager(InputStream myPrivateKeyStream, String i2cpHost, int i2cpPort,
     66                                                 Properties opts) {
     67        I2PClient client = I2PClientFactory.createClient();
     68        opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED);
     69        opts.setProperty(I2PClient.PROP_TCP_HOST, i2cpHost);
     70        opts.setProperty(I2PClient.PROP_TCP_PORT, "" + i2cpPort);
     71        try {
     72            I2PSession session = client.createSession(myPrivateKeyStream, opts);
     73            session.connect();
     74            return createManager(session);
     75        } catch (I2PSessionException ise) {
     76            _log.error("Error creating session for socket manager", ise);
     77            return null;
     78        }
    7879    }
    79    
     80
    8081    private static I2PSocketManager createManager(I2PSession session) {
    81         I2PSocketManager mgr = new I2PSocketManager();
    82         mgr.setSession(session);
    83         return mgr;
     82        I2PSocketManager mgr = new I2PSocketManager();
     83        mgr.setSession(session);
     84        return mgr;
    8485    }
    8586}
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java

    r17a1b11 r51c49d7c  
    88public class I2PSocketOptions {
    99    private long _connectTimeout;
     10
    1011    public I2PSocketOptions() {
    11         _connectTimeout = -1;
     12        _connectTimeout = -1;
    1213    }
    13    
     14
    1415    /**
    1516     * How long we will wait for the ACK from a SYN, in milliseconds.
     
    1718     * @return milliseconds to wait, or -1 if we will wait indefinitely
    1819     */
    19     public long getConnectTimeout() { return _connectTimeout; }
    20     public void setConnectTimeout(long ms) { _connectTimeout = ms; }
     20    public long getConnectTimeout() {
     21        return _connectTimeout;
     22    }
     23
     24    public void setConnectTimeout(long ms) {
     25        _connectTimeout = ms;
     26    }
    2127}
  • apps/phttprelay/java/src/net/i2p/phttprelay/CheckSendStatusServlet.java

    r17a1b11 r51c49d7c  
    11package net.i2p.phttprelay;
     2
    23/*
    34 * free (adj.): unencumbered; not under the control of others
     
    4546public class CheckSendStatusServlet extends PHTTPRelayServlet {
    4647    /* URL parameters on the check */
    47    
     48
    4849    /** H(routerIdent).toBase64() of the target to receive the message */
    49     public final static String PARAM_SEND_TARGET = "target"; 
     50    public final static String PARAM_SEND_TARGET = "target";
    5051    /** msgId parameter */
    5152    public final static String PARAM_MSG_ID = "msgId";
     
    5455    public final static String STATUS_PENDING = "pending";
    5556    public final static String STATUS_UNKNOWN = "unknown";
    56    
     57
    5758    public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    58         String target = req.getParameter(PARAM_SEND_TARGET);
    59         String msgIdStr = req.getParameter(PARAM_MSG_ID);
    60        
    61         log("Checking status of [" + target + "] message [" + msgIdStr + "]");
    62         if (!isKnownMessage(target, msgIdStr)) {
    63             log("Not known - its not pending");
    64             notPending(req, resp);
    65             return;
    66         } else {
    67             log("Known - its still pending");
    68             pending(req, resp);
    69             return;
    70         }
     59        String target = req.getParameter(PARAM_SEND_TARGET);
     60        String msgIdStr = req.getParameter(PARAM_MSG_ID);
     61
     62        log("Checking status of [" + target + "] message [" + msgIdStr + "]");
     63        if (!isKnownMessage(target, msgIdStr)) {
     64            log("Not known - its not pending");
     65            notPending(req, resp);
     66            return;
     67        } else {
     68            log("Known - its still pending");
     69            pending(req, resp);
     70            return;
     71        }
    7172    }
    72    
     73
    7374    private boolean isKnownMessage(String target, String msgId) throws IOException {
    74         if ( (target == null) || (target.trim().length() <= 0) ) return false;
    75         if ( (msgId == null) || (msgId.trim().length() <= 0) ) return false;
    76         File identDir = getIdentDir(target);
    77         if (identDir.exists()) {
    78             File identFile = new File(identDir, "identity.dat");
    79             if (identFile.exists()) {
    80                 // known and valid (maybe we need to check the file format... naw, fuck it
    81                 File msgFile = new File(identDir, "msg" + msgId + ".dat");
    82                 if (msgFile.exists())
    83                     return true;
    84                 else
    85                     return false;
    86             } else {
    87                 return false;
    88             }
    89         } else {
    90             return false;
    91         }
     75        if ((target == null) || (target.trim().length() <= 0)) return false;
     76        if ((msgId == null) || (msgId.trim().length() <= 0)) return false;
     77        File identDir = getIdentDir(target);
     78        if (identDir.exists()) {
     79            File identFile = new File(identDir, "identity.dat");
     80            if (identFile.exists()) {
     81                // known and valid (maybe we need to check the file format... naw, fuck it
     82                File msgFile = new File(identDir, "msg" + msgId + ".dat");
     83                if (msgFile.exists())
     84                    return true;
     85                else
     86                    return false;
     87            } else {
     88                return false;
     89            }
     90        } else {
     91            return false;
     92        }
    9293    }
    93    
     94
    9495    private void pending(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    95         resp.setStatus(HttpServletResponse.SC_OK);
    96         ServletOutputStream out = resp.getOutputStream();
    97         StringBuffer buf = new StringBuffer();
    98         buf.append(PROP_STATUS).append('=').append(STATUS_PENDING).append('\n');
    99         out.write(buf.toString().getBytes());
    100         out.flush();
    101         out.close();
     96        resp.setStatus(HttpServletResponse.SC_OK);
     97        ServletOutputStream out = resp.getOutputStream();
     98        StringBuffer buf = new StringBuffer();
     99        buf.append(PROP_STATUS).append('=').append(STATUS_PENDING).append('\n');
     100        out.write(buf.toString().getBytes());
     101        out.flush();
     102        out.close();
    102103    }
    103    
     104
    104105    private void notPending(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    105         resp.setStatus(HttpServletResponse.SC_OK);
    106         ServletOutputStream out = resp.getOutputStream();
    107         StringBuffer buf = new StringBuffer();
    108         buf.append(PROP_STATUS).append('=').append(STATUS_UNKNOWN).append('\n');
    109         out.write(buf.toString().getBytes());
    110         out.flush();
    111         out.close();
     106        resp.setStatus(HttpServletResponse.SC_OK);
     107        ServletOutputStream out = resp.getOutputStream();
     108        StringBuffer buf = new StringBuffer();
     109        buf.append(PROP_STATUS).append('=').append(STATUS_UNKNOWN).append('\n');
     110        out.write(buf.toString().getBytes());
     111        out.flush();
     112        out.close();
    112113    }
    113114}
  • apps/phttprelay/java/src/net/i2p/phttprelay/LockManager.java

    r17a1b11 r51c49d7c  
    11package net.i2p.phttprelay;
     2
    23/*
    34 * free (adj.): unencumbered; not under the control of others
     
    1920class LockManager {
    2021    private volatile static Set _locks = new HashSet(); // target
    21    
     22
    2223    public static void lockIdent(String target) {
    23         while (true) {
    24             synchronized (_locks) {
    25                 if (!_locks.contains(target)) {
    26                     _locks.add(target);
    27                     return;
    28                 }
    29                 try { _locks.wait(1000); } catch (InterruptedException ie) {}
    30             }
    31         }
     24        while (true) {
     25            synchronized (_locks) {
     26                if (!_locks.contains(target)) {
     27                    _locks.add(target);
     28                    return;
     29                }
     30                try {
     31                    _locks.wait(1000);
     32                } catch (InterruptedException ie) {
     33                }
     34            }
     35        }
    3236    }
    33    
     37
    3438    public static void unlockIdent(String target) {
    35         synchronized (_locks) {
    36             _locks.remove(target);
    37             _locks.notifyAll();
    38         }
     39        synchronized (_locks) {
     40            _locks.remove(target);
     41            _locks.notifyAll();
     42        }
    3943    }
    4044}
  • apps/phttprelay/java/src/net/i2p/phttprelay/PHTTPRelayServlet.java

    r17a1b11 r51c49d7c  
    11package net.i2p.phttprelay;
     2
    23/*
    34 * free (adj.): unencumbered; not under the control of others
     
    2627    /*public final static String PARAM_BASEDIR = "baseDir";*/
    2728    public final static String ENV_BASEDIR = "phttpRelay.baseDir";
    28    
     29
    2930    /** match the clock fudge factor on the router, rather than importing the entire router cvs module */
    30     public final static long CLOCK_FUDGE_FACTOR = 1*60*1000;
    31        
     31    public final static long CLOCK_FUDGE_FACTOR = 1 * 60 * 1000;
     32
    3233    protected String buildURL(HttpServletRequest req, String path) {
    33         StringBuffer buf = new StringBuffer();
    34         buf.append(req.getScheme()).append("://");
    35         buf.append(req.getServerName()).append(":").append(req.getServerPort());
    36         buf.append(req.getContextPath());
    37         buf.append(path);
    38         log("URL built: " + buf.toString());
    39         return buf.toString();
     34        StringBuffer buf = new StringBuffer();
     35        buf.append(req.getScheme()).append("://");
     36        buf.append(req.getServerName()).append(":").append(req.getServerPort());
     37        buf.append(req.getContextPath());
     38        buf.append(path);
     39        log("URL built: " + buf.toString());
     40        return buf.toString();
    4041    }
    41    
     42
    4243    protected File getIdentDir(String target) throws IOException {
    43         if ( (_baseDir == null) || (target == null) ) throw new IOException("dir not specified to deal with");
    44         File baseDir = new File(_baseDir);
    45         if (!baseDir.exists()) {
    46             boolean created = baseDir.mkdirs();
    47             log("Creating PHTTP Relay Base Directory: " + baseDir.getAbsolutePath() + " - ok? " + created);
    48         }
    49         File identDir = new File(baseDir, target);
    50         log("Ident dir: " + identDir.getAbsolutePath());
    51         return identDir;
     44        if ((_baseDir == null) || (target == null)) throw new IOException("dir not specified to deal with");
     45        File baseDir = new File(_baseDir);
     46        if (!baseDir.exists()) {
     47            boolean created = baseDir.mkdirs();
     48            log("Creating PHTTP Relay Base Directory: " + baseDir.getAbsolutePath() + " - ok? " + created);
     49        }
     50        File identDir = new File(baseDir, target);
     51        log("Ident dir: " + identDir.getAbsolutePath());
     52        return identDir;
    5253    }
    53    
     54
    5455    public void init(ServletConfig config) throws ServletException {
    55         super.init(config);
    56         String dir = System.getProperty(ENV_BASEDIR);
    57         if (dir == null) {
    58             _log.warn("Base directory for the polling http relay system not in the environment [" + ENV_BASEDIR +"]");
    59             _log.warn("Setting the base directory to ./relayDir for " + getServletName());
    60             _baseDir = ".relayDir";
    61         } else {
    62             _baseDir = dir;
    63             log("Loaded up " + getServletName() + " with base directory " + _baseDir);
    64         }
     56        super.init(config);
     57        String dir = System.getProperty(ENV_BASEDIR);
     58        if (dir == null) {
     59            _log.warn("Base directory for the polling http relay system not in the environment [" + ENV_BASEDIR + "]");
     60            _log.warn("Setting the base directory to ./relayDir for " + getServletName());
     61            _baseDir = ".relayDir";
     62        } else {
     63            _baseDir = dir;
     64            log("Loaded up " + getServletName() + " with base directory " + _baseDir);
     65        }
    6566    }
    66    
     67
    6768    public void log(String msg) {
    68         _log.debug(msg);
     69        _log.debug(msg);
    6970    }
     71
    7072    public void log(String msg, Throwable t) {
    71         _log.debug(msg, t);
     73        _log.debug(msg, t);
    7274    }
    7375}
  • apps/phttprelay/java/src/net/i2p/phttprelay/PollServlet.java

    r17a1b11 r51c49d7c  
    11package net.i2p.phttprelay;
     2
    23/*
    34 * free (adj.): unencumbered; not under the control of others
     
    5657 *
    5758 */
    58 public class PollServlet extends PHTTPRelayServlet {   
     59public class PollServlet extends PHTTPRelayServlet {
    5960    /* URL parameters on the check */
    60    
     61
    6162    /** H(routerIdent).toBase64() of the target to receive the message */
    62     public final static String PARAM_SEND_TARGET = "target"; 
    63    
     63    public final static String PARAM_SEND_TARGET = "target";
     64
    6465    /** HTTP error code if the target is not known*/
    6566    public final static int CODE_UNKNOWN = HttpServletResponse.SC_NOT_FOUND;
     
    6869    /** HTTP error code if everything is ok */
    6970    public final static int CODE_OK = HttpServletResponse.SC_OK;
    70    
     71
    7172    public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    72         byte data[] = getData(req);
    73         if (data == null) return;
    74         ByteArrayInputStream bais = new ByteArrayInputStream(data);
    75         String target = getTarget(bais);
    76         if (target == null) {
    77             log("Target not specified");
    78             resp.sendError(CODE_UNKNOWN);
    79             return;
    80         }
    81        
    82         if (!isKnown(target)) {
    83             resp.sendError(CODE_UNKNOWN);
    84             return;
    85         }
    86        
    87         if (!isAuthorized(target, bais)) {
    88             resp.sendError(CODE_UNAUTHORIZED);
    89             return;
    90         } else {
    91             log("Authorized access for target " + target);
    92         }
    93          
    94         sendMessages(resp, target);
    95     }
    96  
     73        byte data[] = getData(req);
     74        if (data == null) return;
     75        ByteArrayInputStream bais = new ByteArrayInputStream(data);
     76        String target = getTarget(bais);
     77        if (target == null) {
     78            log("Target not specified");
     79            resp.sendError(CODE_UNKNOWN);
     80            return;
     81        }
     82
     83        if (!isKnown(target)) {
     84            resp.sendError(CODE_UNKNOWN);
     85            return;
     86        }
     87
     88        if (!isAuthorized(target, bais)) {
     89            resp.sendError(CODE_UNAUTHORIZED);
     90            return;
     91        } else {
     92            log("Authorized access for target " + target);
     93        }
     94
     95        sendMessages(resp, target);
     96    }
     97
    9798    private byte[] getData(HttpServletRequest req) throws ServletException, IOException {
    98         ServletInputStream in = req.getInputStream();
    99         int len = req.getContentLength();
    100         byte data[] = new byte[len];
    101         int cur = 0;
    102         int read = DataHelper.read(in, data);
    103         if (read != len) {
    104             log("Size read is incorrect [" + read + " instead of expected " + len + "]");
    105             return null;
    106         } else {
    107             log("Read data length: " + data.length + " in base64: " + Base64.encode(data));
    108             return data;
    109         }
    110     }
    111    
     99        ServletInputStream in = req.getInputStream();
     100        int len = req.getContentLength();
     101        byte data[] = new byte[len];
     102        int cur = 0;
     103        int read = DataHelper.read(in, data);
     104        if (read != len) {
     105            log("Size read is incorrect [" + read + " instead of expected " + len + "]");
     106            return null;
     107        } else {
     108            log("Read data length: " + data.length + " in base64: " + Base64.encode(data));
     109            return data;
     110        }
     111    }
     112
    112113    private String getTarget(InputStream in) throws IOException {
    113         StringBuffer buf = new StringBuffer(64);
    114         int numBytes = 0;
    115         int c = 0;
    116         while ( (c = in.read()) != -1) {
    117             if (c == (int)'&') break;
    118             buf.append((char)c);
    119             numBytes++;
    120             if (numBytes > 128) {
    121                 log("Target didn't find the & after 128 bytes [" + buf.toString() + "]");
    122                 return null;
    123             }
    124         }
    125         if (buf.toString().indexOf("target=") != 0) {
    126             log("Did not start with target= [" + buf.toString() + "]");
    127             return null;
    128         }
    129         return buf.substring("target=".length());
    130     }
    131    
     114        StringBuffer buf = new StringBuffer(64);
     115        int numBytes = 0;
     116        int c = 0;
     117        while ((c = in.read()) != -1) {
     118            if (c == (int) '&') break;
     119            buf.append((char) c);
     120            numBytes++;
     121            if (numBytes > 128) {
     122                log("Target didn't find the & after 128 bytes [" + buf.toString() + "]");
     123                return null;
     124            }
     125        }
     126        if (buf.toString().indexOf("target=") != 0) {
     127            log("Did not start with target= [" + buf.toString() + "]");
     128            return null;
     129        }
     130        return buf.substring("target=".length());
     131    }
     132
    132133    private void sendMessages(HttpServletResponse resp, String target) throws IOException {
    133         log("Before lock " + target);
    134         LockManager.lockIdent(target);
    135         log("Locked " + target);
    136         try {
    137             File identDir = getIdentDir(target);
    138             expire(identDir);
    139             File messageFiles[] = identDir.listFiles();
    140             resp.setStatus(CODE_OK);
    141             log("Sending back " + (messageFiles.length -1) + " messages");
    142             ServletOutputStream out = resp.getOutputStream();
    143             DataHelper.writeDate(out, new Date(Clock.getInstance().now()));
    144             DataHelper.writeLong(out, 2, messageFiles.length -1);
    145             for (int i = 0; i < messageFiles.length; i++) {
    146                 if ("identity.dat".equals(messageFiles[i].getName())) {
    147                     // skip
    148                 } else {
    149                     log("Message file " + messageFiles[i].getName() + " is " + messageFiles[i].length() + " bytes");
    150                     DataHelper.writeLong(out, 4, messageFiles[i].length());
    151                     writeFile(out, messageFiles[i]);
    152                     boolean deleted = messageFiles[i].delete();
    153                     if (!deleted) {
    154                         log("!!!Error removing message file " + messageFiles[i].getAbsolutePath() + " - please delete!");
    155                     }
    156                 }
    157             }
    158             out.flush();
    159             out.close();
    160         } catch (DataFormatException dfe) {
    161             log("Error sending message", dfe);
    162         } finally {
    163             LockManager.unlockIdent(target);
    164             log("Unlocked " + target);
    165         }
    166     }
    167    
    168     private final static long EXPIRE_DELAY = 60*1000; // expire messages every minute
    169    
     134        log("Before lock " + target);
     135        LockManager.lockIdent(target);
     136        log("Locked " + target);
     137        try {
     138            File identDir = getIdentDir(target);
     139            expire(identDir);
     140            File messageFiles[] = identDir.listFiles();
     141            resp.setStatus(CODE_OK);
     142            log("Sending back " + (messageFiles.length - 1) + " messages");
     143            ServletOutputStream out = resp.getOutputStream();
     144            DataHelper.writeDate(out, new Date(Clock.getInstance().now()));
     145            DataHelper.writeLong(out, 2, messageFiles.length - 1);
     146            for (int i = 0; i < messageFiles.length; i++) {
     147                if ("identity.dat".equals(messageFiles[i].getName())) {
     148                    // skip
     149                } else {
     150                    log("Message file " + messageFiles[i].getName() + " is " + messageFiles[i].length() + " bytes");
     151                    DataHelper.writeLong(out, 4, messageFiles[i].length());
     152                    writeFile(out, messageFiles[i]);
     153                    boolean deleted = messageFiles[i].delete();
     154                    if (!deleted) {
     155                        log("!!!Error removing message file " + messageFiles[i].getAbsolutePath() + " - please delete!");
     156                    }
     157                }
     158            }
     159            out.flush();
     160            out.close();
     161        } catch (DataFormatException dfe) {
     162            log("Error sending message", dfe);
     163        } finally {
     164            LockManager.unlockIdent(target);
     165            log("Unlocked " + target);
     166        }
     167    }
     168
     169    private final static long EXPIRE_DELAY = 60 * 1000; // expire messages every minute
     170
    170171    private void expire(File identDir) throws IOException {
    171         File files[] = identDir.listFiles();
    172         long now = System.currentTimeMillis();
    173         for (int i = 0 ; i < files.length; i++) {
    174             if ("identity.dat".equals(files[i].getName())) {
    175                 continue;
    176             }
    177             if (files[i].lastModified() + EXPIRE_DELAY < now) {
    178                 log("Expiring " + files[i].getAbsolutePath());
    179                 files[i].delete();
    180             }
    181         }
    182     }
    183    
     172        File files[] = identDir.listFiles();
     173        long now = System.currentTimeMillis();
     174        for (int i = 0; i < files.length; i++) {
     175            if ("identity.dat".equals(files[i].getName())) {
     176                continue;
     177            }
     178            if (files[i].lastModified() + EXPIRE_DELAY < now) {
     179                log("Expiring " + files[i].getAbsolutePath());
     180                files[i].delete();
     181            }
     182        }
     183    }
     184
    184185    private void writeFile(ServletOutputStream out, File file) throws IOException {
    185         FileInputStream fis = new FileInputStream(file);
    186         try {
    187             byte buf[] = new byte[4096];
    188             while (true) {
    189                 int read = DataHelper.read(fis, buf);
    190                 if (read > 0)
    191                     out.write(buf, 0, read);
    192                 else
    193                     break;
    194             }
    195         } finally {
    196             fis.close();
    197         }
    198     }
    199    
    200    
     186        FileInputStream fis = new FileInputStream(file);
     187        try {
     188            byte buf[] = new byte[4096];
     189            while (true) {
     190                int read = DataHelper.read(fis, buf);
     191                if (read > 0)
     192                    out.write(buf, 0, read);
     193                else
     194                    break;
     195            }
     196        } finally {
     197            fis.close();
     198        }
     199    }
     200
    201201    private boolean isKnown(String target) throws IOException {
    202         File identDir = getIdentDir(target);
    203         if (identDir.exists()) {
    204             File identFile = new File(identDir, "identity.dat");
    205             if (identFile.exists()) {
    206                 // known and valid (maybe we need to check the file format... naw, fuck it
    207                 return true;
    208             } else {
    209                 return false;
    210             }
    211         } else {
    212             return false;
    213         }
    214     }
    215    
     202        File identDir = getIdentDir(target);
     203        if (identDir.exists()) {
     204            File identFile = new File(identDir, "identity.dat");
     205            if (identFile.exists()) {
     206                // known and valid (maybe we need to check the file format... naw, fuck it
     207                return true;
     208            } else {
     209                return false;
     210            }
     211        } else {
     212            return false;
     213        }
     214    }
     215
    216216    private boolean isAuthorized(String target, InputStream in) throws IOException {
    217         RouterIdentity ident = null;
    218         try {
    219             ident = getRouterIdentity(target);
    220         } catch (DataFormatException dfe) {
    221             log("Identity was not valid", dfe);
    222         }
    223        
    224         if (ident == null) {
    225             log("Identity not registered");
    226             return false;
    227         }
    228        
    229         try {
    230             long val = DataHelper.readLong(in, 4);
    231             Signature sig = new Signature();
    232             sig.readBytes(in);
    233             ByteArrayOutputStream baos = new ByteArrayOutputStream();
    234             DataHelper.writeLong(baos, 4, val);
    235             if (DSAEngine.getInstance().verifySignature(sig, baos.toByteArray(), ident.getSigningPublicKey())) {
    236                 return true;
    237             } else {
    238                 log("Signature does NOT match");
    239                 return false;
    240             }
    241         } catch (DataFormatException dfe) {
    242             log("Format error reading the nonce and signature", dfe);
    243             return false;
    244         }
    245     }
    246    
     217        RouterIdentity ident = null;
     218        try {
     219            ident = getRouterIdentity(target);
     220        } catch (DataFormatException dfe) {
     221            log("Identity was not valid", dfe);
     222        }
     223
     224        if (ident == null) {
     225            log("Identity not registered");
     226            return false;
     227        }
     228
     229        try {
     230            long val = DataHelper.readLong(in, 4);
     231            Signature sig = new Signature();
     232            sig.readBytes(in);
     233            ByteArrayOutputStream baos = new ByteArrayOutputStream();
     234            DataHelper.writeLong(baos, 4, val);
     235            if (DSAEngine.getInstance().verifySignature(sig, baos.toByteArray(), ident.getSigningPublicKey())) {
     236                return true;
     237            } else {
     238                log("Signature does NOT match");
     239                return false;
     240            }
     241        } catch (DataFormatException dfe) {
     242            log("Format error reading the nonce and signature", dfe);
     243            return false;
     244        }
     245    }
     246
    247247    private RouterIdentity getRouterIdentity(String target) throws IOException, DataFormatException {
    248         File identDir = getIdentDir(target);
    249         if (identDir.exists()) {
    250             File identFile = new File(identDir, "identity.dat");
    251             if (identFile.exists()) {
    252                 // known and valid (maybe we need to check the file format... naw, fuck it
    253                 RouterIdentity ident = new RouterIdentity();
    254                 ident.readBytes(new FileInputStream(identFile));
    255                 return ident;
    256             } else {
    257                 return null;
    258             }
    259         } else {
    260             return null;
    261         }
     248        File identDir = getIdentDir(target);
     249        if (identDir.exists()) {
     250            File identFile = new File(identDir, "identity.dat");
     251            if (identFile.exists()) {
     252                // known and valid (maybe we need to check the file format... naw, fuck it
     253                RouterIdentity ident = new RouterIdentity();
     254                ident.readBytes(new FileInputStream(identFile));
     255                return ident;
     256            } else {
     257                return null;
     258            }
     259        } else {
     260            return null;
     261        }
    262262    }
    263263}
  • apps/phttprelay/java/src/net/i2p/phttprelay/RegisterServlet.java

    r17a1b11 r51c49d7c  
    11package net.i2p.phttprelay;
     2
    23/*
    34 * free (adj.): unencumbered; not under the control of others
     
    6768    public final static String PARAM_POLL_PATH = "pollPath";
    6869    public final static String PARAM_SEND_PATH = "sendPath";
    69    
     70
    7071    /* key=val keys sent back on registration */
    71     public final static String PROP_STATUS   = "status";
     72    public final static String PROP_STATUS = "status";
    7273    public final static String PROP_POLL_URL = "pollURL";
    7374    public final static String PROP_SEND_URL = "sendURL";
     
    7576
    7677    /* values for the PROP_STATUS */
    77     public final static String STATUS_FAILED     = "failed";
     78    public final static String STATUS_FAILED = "failed";
    7879    public final static String STATUS_REGISTERED = "registered";
    79    
     80
    8081    public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    81         ServletInputStream in = req.getInputStream();
    82         RouterIdentity ident = new RouterIdentity();
    83         try {
    84             Date remoteTime = DataHelper.readDate(in);
    85             long skew = getSkew(remoteTime);
    86             ident.readBytes(in);
    87             boolean ok = registerIdent(ident);
    88             sendURLs(req, resp, skew, ok);
    89         } catch (DataFormatException dfe) {
    90             log("Invalid format for router identity posted", dfe);
    91         } finally {
    92             in.close();
    93         }
     82        ServletInputStream in = req.getInputStream();
     83        RouterIdentity ident = new RouterIdentity();
     84        try {
     85            Date remoteTime = DataHelper.readDate(in);
     86            long skew = getSkew(remoteTime);
     87            ident.readBytes(in);
     88            boolean ok = registerIdent(ident);
     89            sendURLs(req, resp, skew, ok);
     90        } catch (DataFormatException dfe) {
     91            log("Invalid format for router identity posted", dfe);
     92        } finally {
     93            in.close();
     94        }
    9495    }
    95    
     96
    9697    private long getSkew(Date remoteDate) {
    97         if (remoteDate == null) {
    98             log("*ERROR: remote date was null");
    99             return Long.MAX_VALUE;
    100         } else {
    101             long diff = Clock.getInstance().now() - remoteDate.getTime();
    102             return diff;
    103         }
     98        if (remoteDate == null) {
     99            log("*ERROR: remote date was null");
     100            return Long.MAX_VALUE;
     101        } else {
     102            long diff = Clock.getInstance().now() - remoteDate.getTime();
     103            return diff;
     104        }
    104105    }
    105    
     106
    106107    private boolean registerIdent(RouterIdentity ident) throws DataFormatException, IOException {
    107         File identDir = getIdentDir(ident.getHash().toBase64());
    108         boolean created = identDir.mkdirs();
    109         File identFile = new File(identDir, "identity.dat");
    110         FileOutputStream fos = null;
    111         try {
    112             fos = new FileOutputStream(identFile);
    113             ident.writeBytes(fos);
    114         } finally {
    115             if (fos != null) try { fos.close(); } catch (IOException ioe) {}
    116         }
    117         log("Identity registered into " + identFile.getAbsolutePath());
    118         return true;
     108        File identDir = getIdentDir(ident.getHash().toBase64());
     109        boolean created = identDir.mkdirs();
     110        File identFile = new File(identDir, "identity.dat");
     111        FileOutputStream fos = null;
     112        try {
     113            fos = new FileOutputStream(identFile);
     114            ident.writeBytes(fos);
     115        } finally {
     116            if (fos != null) try {
     117                fos.close();
     118            } catch (IOException ioe) {
     119            }
     120        }
     121        log("Identity registered into " + identFile.getAbsolutePath());
     122        return true;
    119123    }
    120    
     124
    121125    private void sendURLs(HttpServletRequest req, HttpServletResponse resp, long skew, boolean ok) throws IOException {
    122         ServletOutputStream out = resp.getOutputStream();
     126        ServletOutputStream out = resp.getOutputStream();
    123127
    124         log("*Debug: clock skew of " + skew + "ms (local-remote)");
    125        
    126         StringBuffer buf = new StringBuffer();
    127         if (ok) {
    128             buf.append(PROP_POLL_URL).append("=").append(buildURL(req, _pollPath)).append("\n");
    129             buf.append(PROP_SEND_URL).append("=").append(buildURL(req, _sendPath)).append("\n");
    130             buf.append(PROP_TIME_OFFSET).append("=").append(skew).append("\n");
    131             buf.append(PROP_STATUS).append("=").append(STATUS_REGISTERED).append("\n");
    132         } else {
    133             buf.append(PROP_TIME_OFFSET).append("=").append(skew).append("\n");
    134             buf.append(PROP_STATUS).append("=").append(STATUS_FAILED).append("\n");
    135         }
    136         out.write(buf.toString().getBytes());
    137         out.close();
     128        log("*Debug: clock skew of " + skew + "ms (local-remote)");
     129
     130        StringBuffer buf = new StringBuffer();
     131        if (ok) {
     132            buf.append(PROP_POLL_URL).append("=").append(buildURL(req, _pollPath)).append("\n");
     133            buf.append(PROP_SEND_URL).append("=").append(buildURL(req, _sendPath)).append("\n");
     134            buf.append(PROP_TIME_OFFSET).append("=").append(skew).append("\n");
     135            buf.append(PROP_STATUS).append("=").append(STATUS_REGISTERED).append("\n");
     136        } else {
     137            buf.append(PROP_TIME_OFFSET).append("=").append(skew).append("\n");
     138            buf.append(PROP_STATUS).append("=").append(STATUS_FAILED).append("\n");
     139        }
     140        out.write(buf.toString().getBytes());
     141        out.close();
    138142    }
    139    
     143
    140144    public void init(ServletConfig config) throws ServletException {
    141         super.init(config);
    142        
    143         String pollPath = config.getInitParameter(PARAM_POLL_PATH);
    144         if (pollPath == null)
    145             throw new ServletException("Polling path for the registration servlet required [" + PARAM_POLL_PATH + "]");
    146         else
    147             _pollPath = pollPath;
    148         String sendPath = config.getInitParameter(PARAM_SEND_PATH);
    149         if (sendPath == null)
    150             throw new ServletException("Sending path for the registration servlet required [" + PARAM_SEND_PATH + "]");
    151         else
    152             _sendPath = sendPath;
     145        super.init(config);
     146
     147        String pollPath = config.getInitParameter(PARAM_POLL_PATH);
     148        if (pollPath == null)
     149            throw new ServletException("Polling path for the registration servlet required [" + PARAM_POLL_PATH + "]");
     150        else
     151            _pollPath = pollPath;
     152        String sendPath = config.getInitParameter(PARAM_SEND_PATH);
     153        if (sendPath == null)
     154            throw new ServletException("Sending path for the registration servlet required [" + PARAM_SEND_PATH + "]");
     155        else
     156            _sendPath = sendPath;
    153157    }
    154158}
  • apps/phttprelay/java/src/net/i2p/phttprelay/SendServlet.java

    r17a1b11 r51c49d7c  
    11package net.i2p.phttprelay;
     2
    23/*
    34 * free (adj.): unencumbered; not under the control of others
     
    6465    private String _checkPath;
    6566    private int _maxMessagesPerIdent;
    66    
     67
    6768    /* config params */
    6869    public final static String PARAM_CHECK_PATH = "checkPath";
    6970    public final static String PARAM_MAX_MESSAGES_PER_IDENT = "maxMessagesPerIdent";
    70    
     71
    7172    /* URL parameters on the send */
    72    
     73
    7374    /** H(routerIdent).toBase64() of the target to receive the message */
    74     public final static String PARAM_SEND_TARGET = "target"; 
     75    public final static String PARAM_SEND_TARGET = "target";
    7576    /** # ms to wait for the message to be delivered before failing it */
    7677    public final static String PARAM_SEND_TIMEOUTMS = "timeoutMs";
     
    7980    /** sending router's time in ms */
    8081    public final static String PARAM_SEND_TIME = "localTime";
    81    
     82
    8283    /** msgId parameter to access the check path servlet with (along side PARAM_SEND_TARGET) */
    8384    public final static String PARAM_MSG_ID = "msgId";
    8485
    85    
    8686    /* key=val keys sent back on registration */
    8787    public final static String PROP_CHECK_URL = "statusCheckURL";
     
    8989    public final static String STATUS_OK = "accepted";
    9090    public final static String STATUS_UNKNOWN = "unknown";
    91     private final static String STATUS_CLOCKSKEW = "clockSkew_"; /** prefix for (local-remote) */
    92    
     91    private final static String STATUS_CLOCKSKEW = "clockSkew_";
     92
     93    /** prefix for (local-remote) */
     94
    9395    public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    94         ServletInputStream in = req.getInputStream();
    95         try {
    96             int contentLen = req.getContentLength();
    97             String firstLine = getFirstLine(in, contentLen);
    98             if (firstLine == null) {
    99                 return;
    100             }
    101             Map params = getParameters(firstLine);
    102             String target = (String)params.get(PARAM_SEND_TARGET);
    103             String timeoutStr = (String)params.get(PARAM_SEND_TIMEOUTMS);
    104             String lenStr = (String)params.get(PARAM_SEND_DATA_LENGTH);
    105             String remoteTimeStr = (String)params.get(PARAM_SEND_TIME);
    106             long skew = 0;
    107             try {
    108                 long remTime = Long.parseLong(remoteTimeStr);
    109                 skew = System.currentTimeMillis() - remTime;
    110             } catch (Throwable t) {
    111                 skew = Long.MAX_VALUE;
    112                 log("*ERROR could not parse the remote time from [" + remoteTimeStr + "]");
    113             }
    114 
    115             log("Target [" + target + "] timeout [" + timeoutStr + "] length [" + lenStr + "] skew [" + skew + "]");
    116 
    117             if ( (skew > CLOCK_FUDGE_FACTOR) || (skew < 0 - CLOCK_FUDGE_FACTOR) ) {
    118                 log("Attempt to send by a skewed router: skew = " + skew + "ms (local-remote)");
    119                 failSkewed(req, resp, skew);
    120             }
    121            
    122             if (!isValidTarget(target)) {
    123                 log("Attempt to send to an invalid target [" + target + "]");
    124                 fail(req, resp, "Unknown or invalid target");
    125                 return;
    126             }
    127 
    128             long len = -1;
    129             try {
    130                 len = Long.parseLong(lenStr);
    131             } catch (Throwable t) {
    132                 log("Unable to parse length parameter [" + PARAM_SEND_DATA_LENGTH + "] (" + lenStr + ")");
    133                 fail(req, resp, "Invalid length parameter");
    134                 return;
    135             }
    136 
    137             int msgId = saveFile(in, resp, target, len);
    138             if (msgId >= 0) {
    139                 sendSuccess(req, resp, target, msgId);
    140             } else {
    141                 fail(req, resp, "Unable to queue up the message for delivery");
    142             }
    143         } finally {
    144             try { in.close(); } catch (IOException ioe) {}
    145         }
    146     }
    147    
    148    
     96        ServletInputStream in = req.getInputStream();
     97        try {
     98            int contentLen = req.getContentLength();
     99            String firstLine = getFirstLine(in, contentLen);
     100            if (firstLine == null) { return; }
     101            Map params = getParameters(firstLine);
     102            String target = (String) params.get(PARAM_SEND_TARGET);
     103            String timeoutStr = (String) params.get(PARAM_SEND_TIMEOUTMS);
     104            String lenStr = (String) params.get(PARAM_SEND_DATA_LENGTH);
     105            String remoteTimeStr = (String) params.get(PARAM_SEND_TIME);
     106            long skew = 0;
     107            try {
     108                long remTime = Long.parseLong(remoteTimeStr);
     109                skew = System.currentTimeMillis() - remTime;
     110            } catch (Throwable t) {
     111                skew = Long.MAX_VALUE;
     112                log("*ERROR could not parse the remote time from [" + remoteTimeStr + "]");
     113            }
     114
     115            log("Target [" + target + "] timeout [" + timeoutStr + "] length [" + lenStr + "] skew [" + skew + "]");
     116
     117            if ((skew > CLOCK_FUDGE_FACTOR) || (skew < 0 - CLOCK_FUDGE_FACTOR)) {
     118                log("Attempt to send by a skewed router: skew = " + skew + "ms (local-remote)");
     119                failSkewed(req, resp, skew);
     120            }
     121
     122            if (!isValidTarget(target)) {
     123                log("Attempt to send to an invalid target [" + target + "]");
     124                fail(req, resp, "Unknown or invalid target");
     125                return;
     126            }
     127
     128            long len = -1;
     129            try {
     130                len = Long.parseLong(lenStr);
     131            } catch (Throwable t) {
     132                log("Unable to parse length parameter [" + PARAM_SEND_DATA_LENGTH + "] (" + lenStr + ")");
     133                fail(req, resp, "Invalid length parameter");
     134                return;
     135            }
     136
     137            int msgId = saveFile(in, resp, target, len);
     138            if (msgId >= 0) {
     139                sendSuccess(req, resp, target, msgId);
     140            } else {
     141                fail(req, resp, "Unable to queue up the message for delivery");
     142            }
     143        } finally {
     144            try {
     145                in.close();
     146            } catch (IOException ioe) {
     147            }
     148        }
     149    }
     150
    149151    private String getFirstLine(ServletInputStream in, int len) throws ServletException, IOException {
    150         StringBuffer buf = new StringBuffer(128);
    151         int numBytes = 0;
    152         int c = 0;
    153         while ( (c = in.read()) != -1) {
    154             if (c == (int)'\n') break;
    155             buf.append((char)c);
    156             numBytes++;
    157             if (numBytes > 512) {
    158                 log("First line is > 512 bytes [" + buf.toString() + "]");
    159                 return null;
    160             }
    161         }
    162         log("First line: " + buf.toString());
    163         return buf.toString();
    164     }
    165    
     152        StringBuffer buf = new StringBuffer(128);
     153        int numBytes = 0;
     154        int c = 0;
     155        while ((c = in.read()) != -1) {
     156            if (c == (int) '\n') break;
     157            buf.append((char) c);
     158            numBytes++;
     159            if (numBytes > 512) {
     160                log("First line is > 512 bytes [" + buf.toString() + "]");
     161                return null;
     162            }
     163        }
     164        log("First line: " + buf.toString());
     165        return buf.toString();
     166    }
     167
    166168    private static Map getParameters(String line) {
    167         //StringTokenizer tok = new StringTokenizer(line, "&=", true);
    168         Map params = new HashMap();
    169         while (line != null) {
    170             String key = null;
    171             String val = null;
    172             int firstAmp = line.indexOf('&');
    173             int firstEq = line.indexOf('=');
    174             if (firstAmp > 0) {
    175                 key = line.substring(0, firstEq);
    176                 val = line.substring(firstEq+1, firstAmp);
    177                 line = line.substring(firstAmp+1);
    178                 params.put(key, val);
    179             } else {
    180                 line = null;
    181             }
    182         }
    183         return params;
    184     }
    185    
     169        //StringTokenizer tok = new StringTokenizer(line, "&=", true);
     170        Map params = new HashMap();
     171        while (line != null) {
     172            String key = null;
     173            String val = null;
     174            int firstAmp = line.indexOf('&');
     175            int firstEq = line.indexOf('=');
     176            if (firstAmp > 0) {
     177                key = line.substring(0, firstEq);
     178                val = line.substring(firstEq + 1, firstAmp);
     179                line = line.substring(firstAmp + 1);
     180                params.put(key, val);
     181            } else {
     182                line = null;
     183            }
     184        }
     185        return params;
     186    }
     187
    186188    private boolean isValidTarget(String target) throws IOException {
    187         File identDir = getIdentDir(target);
    188         if (identDir.exists()) {
    189             File identFile = new File(identDir, "identity.dat");
    190             if (identFile.exists()) {
    191                 // known and valid (maybe we need to check the file format... naw, fuck it
    192                 String files[] = identDir.list();
    193                 // we skip 1 because of identity.dat
    194                 if (files.length -1 > _maxMessagesPerIdent) {
    195                     log("Too many messages pending for " + target + ": " + (files.length-1));
    196                     return false;
    197                 } else {
    198                     return true;
    199                 }
    200             } else {
    201                 log("Ident directory exists, but identity does not... corrupt for " + target);
    202                 return false;
    203             }
    204         } else {
    205             log("Unknown ident " + target);
    206             return false;
    207         }
    208     }
    209    
     189        File identDir = getIdentDir(target);
     190        if (identDir.exists()) {
     191            File identFile = new File(identDir, "identity.dat");
     192            if (identFile.exists()) {
     193                // known and valid (maybe we need to check the file format... naw, fuck it
     194                String files[] = identDir.list();
     195                // we skip 1 because of identity.dat
     196                if (files.length - 1 > _maxMessagesPerIdent) {
     197                    log("Too many messages pending for " + target + ": " + (files.length - 1));
     198                    return false;
     199                } else {
     200                    return true;
     201                }
     202            } else {
     203                log("Ident directory exists, but identity does not... corrupt for " + target);
     204                return false;
     205            }
     206        } else {
     207            log("Unknown ident " + target);
     208            return false;
     209        }
     210    }
     211
    210212    private int saveFile(InputStream in, HttpServletResponse resp, String target, long len) throws IOException {
    211         File identDir = getIdentDir(target);
    212         if (!identDir.exists()) return -1;
    213         try {
    214             LockManager.lockIdent(target);
    215             int i = 0;
    216             while (true) {
    217                 File curFile = new File(identDir, "msg" + i + ".dat");
    218                 if (!curFile.exists()) {
    219                     boolean ok = writeFile(curFile, in, len);
    220                     if (ok)
    221                         return i;
    222                     else
    223                         return -1;
    224                 }
    225                 i++;
    226                 continue;
    227             }
    228         } finally {
    229             LockManager.unlockIdent(target);
    230         }
    231     }
    232    
     213        File identDir = getIdentDir(target);
     214        if (!identDir.exists()) return -1;
     215        try {
     216            LockManager.lockIdent(target);
     217            int i = 0;
     218            while (true) {
     219                File curFile = new File(identDir, "msg" + i + ".dat");
     220                if (!curFile.exists()) {
     221                    boolean ok = writeFile(curFile, in, len);
     222                    if (ok)
     223                        return i;
     224                    else
     225                        return -1;
     226                }
     227                i++;
     228                continue;
     229            }
     230        } finally {
     231            LockManager.unlockIdent(target);
     232        }
     233    }
     234
    233235    private boolean writeFile(File file, InputStream in, long len) throws IOException {
    234         long remaining = len;
    235         FileOutputStream fos = null;
    236         try {
    237             fos = new FileOutputStream(file);
    238             byte buf[] = new byte[4096];
    239             while (remaining > 0) {
    240                 int read = in.read(buf);
    241                 if (read == -1)
    242                     break;
    243                 remaining -= read;
    244                 if (read > 0)
    245                     fos.write(buf, 0, read);
    246             }
    247         } finally {
    248             if (fos != null) {
    249                 try { fos.close(); } catch (IOException ioe) {}
    250             }
    251             if (remaining != 0) {
    252                 log("Invalid remaining bytes [" + remaining + " out of " + len + "] - perhaps message was cancelled partway through delivery?  deleting " + file.getAbsolutePath());
    253                 boolean deleted = file.delete();
    254                 if (!deleted)
    255                     log("!!!Error deleting temporary file " + file.getAbsolutePath());
    256                 return false;
    257             }
    258         }
    259         return true;
    260     }
    261    
    262     private void sendSuccess(HttpServletRequest req, HttpServletResponse resp, String target, int msgId) throws IOException {
    263         ServletOutputStream out = resp.getOutputStream();
    264         StringBuffer buf = new StringBuffer();
    265         buf.append(PROP_STATUS).append('=').append(STATUS_OK).append('\n');
    266         buf.append(PROP_CHECK_URL).append('=').append(buildURL(req, _checkPath));
    267         buf.append('?');
    268         buf.append(PARAM_SEND_TARGET).append('=').append(target).append("&");
    269         buf.append(PARAM_MSG_ID).append('=').append(msgId).append("\n");
    270         out.write(buf.toString().getBytes());
    271         out.flush();
    272     }
    273    
     236        long remaining = len;
     237        FileOutputStream fos = null;
     238        try {
     239            fos = new FileOutputStream(file);
     240            byte buf[] = new byte[4096];
     241            while (remaining > 0) {
     242                int read = in.read(buf);
     243                if (read == -1) break;
     244                remaining -= read;
     245                if (read > 0) fos.write(buf, 0, read);
     246            }
     247        } finally {
     248            if (fos != null) {
     249                try {
     250                    fos.close();
     251                } catch (IOException ioe) {
     252                }
     253            }
     254            if (remaining != 0) {
     255                log("Invalid remaining bytes [" + remaining + " out of " + len
     256                    + "] - perhaps message was cancelled partway through delivery?  deleting " + file.getAbsolutePath());
     257                boolean deleted = file.delete();
     258                if (!deleted) log("!!!Error deleting temporary file " + file.getAbsolutePath());
     259                return false;
     260            }
     261        }
     262        return true;
     263    }
     264
     265    private void sendSuccess(HttpServletRequest req, HttpServletResponse resp, String target, int msgId)
     266                                                                                                        throws IOException {
     267        ServletOutputStream out = resp.getOutputStream();
     268        StringBuffer buf = new StringBuffer();
     269        buf.append(PROP_STATUS).append('=').append(STATUS_OK).append('\n');
     270        buf.append(PROP_CHECK_URL).append('=').append(buildURL(req, _checkPath));
     271        buf.append('?');
     272        buf.append(PARAM_SEND_TARGET).append('=').append(target).append("&");
     273        buf.append(PARAM_MSG_ID).append('=').append(msgId).append("\n");
     274        out.write(buf.toString().getBytes());
     275        out.flush();
     276    }
     277
    274278    private void fail(HttpServletRequest req, HttpServletResponse resp, String err) throws IOException {
    275         ServletOutputStream out = resp.getOutputStream();
    276         StringBuffer buf = new StringBuffer();
    277         buf.append(PROP_STATUS).append('=').append(STATUS_UNKNOWN).append('\n');
    278         out.write(buf.toString().getBytes());
    279         out.flush();
    280     }
    281    
     279        ServletOutputStream out = resp.getOutputStream();
     280        StringBuffer buf = new StringBuffer();
     281        buf.append(PROP_STATUS).append('=').append(STATUS_UNKNOWN).append('\n');
     282        out.write(buf.toString().getBytes());
     283        out.flush();
     284    }
     285
    282286    private void failSkewed(HttpServletRequest req, HttpServletResponse resp, long skew) throws IOException {
    283         ServletOutputStream out = resp.getOutputStream();
    284         StringBuffer buf = new StringBuffer();
    285         buf.append(PROP_STATUS).append('=').append(STATUS_CLOCKSKEW).append(skew).append('\n');
    286         out.write(buf.toString().getBytes());
    287         out.flush();
    288     }
    289    
     287        ServletOutputStream out = resp.getOutputStream();
     288        StringBuffer buf = new StringBuffer();
     289        buf.append(PROP_STATUS).append('=').append(STATUS_CLOCKSKEW).append(skew).append('\n');
     290        out.write(buf.toString().getBytes());
     291        out.flush();
     292    }
     293
    290294    public void init(ServletConfig config) throws ServletException {
    291         super.init(config);
    292        
    293         String checkPath = config.getInitParameter(PARAM_CHECK_PATH);
    294         if (checkPath == null)
    295             throw new ServletException("Check status path for the sending servlet required [" + PARAM_CHECK_PATH + "]");
    296         else
    297             _checkPath = checkPath;
    298        
    299         String maxMessagesPerIdentStr = config.getInitParameter(PARAM_MAX_MESSAGES_PER_IDENT);
    300         if (maxMessagesPerIdentStr == null)
    301             throw new ServletException("Max messages per ident for the sending servlet required [" + PARAM_MAX_MESSAGES_PER_IDENT + "]");
    302         try {
    303             _maxMessagesPerIdent = Integer.parseInt(maxMessagesPerIdentStr);
    304         } catch (Throwable t) {
    305             throw new ServletException("Valid max messages per ident for the sending servlet required [" + PARAM_MAX_MESSAGES_PER_IDENT + "]");
    306         }
    307     }
    308    
     295        super.init(config);
     296
     297        String checkPath = config.getInitParameter(PARAM_CHECK_PATH);
     298        if (checkPath == null)
     299            throw new ServletException("Check status path for the sending servlet required [" + PARAM_CHECK_PATH + "]");
     300        else
     301            _checkPath = checkPath;
     302
     303        String maxMessagesPerIdentStr = config.getInitParameter(PARAM_MAX_MESSAGES_PER_IDENT);
     304        if (maxMessagesPerIdentStr == null)
     305            throw new ServletException("Max messages per ident for the sending servlet required ["
     306                                       + PARAM_MAX_MESSAGES_PER_IDENT + "]");
     307        try {
     308            _maxMessagesPerIdent = Integer.parseInt(maxMessagesPerIdentStr);
     309        } catch (Throwable t) {
     310            throw new ServletException("Valid max messages per ident for the sending servlet required ["
     311                                       + PARAM_MAX_MESSAGES_PER_IDENT + "]");
     312        }
     313    }
     314
    309315    public static void main(String args[]) {
    310         String line = "target=pp0ARjQiB~IKC-0FsMUsPEMrwR3gxVBZGRYfEr1IzHI=&timeoutMs=52068&dataLength=2691&";
    311         Map props = getParameters(line);
    312         for (java.util.Iterator iter = props.keySet().iterator(); iter.hasNext(); ) {
    313             String key = (String)iter.next();
    314             String val = (String)props.get(key);
    315             System.out.println("[" + key + "] = [" + val + "]");
    316         }
     316        String line = "target=pp0ARjQiB~IKC-0FsMUsPEMrwR3gxVBZGRYfEr1IzHI=&timeoutMs=52068&dataLength=2691&";
     317        Map props = getParameters(line);
     318        for (java.util.Iterator iter = props.keySet().iterator(); iter.hasNext();) {
     319            String key = (String) iter.next();
     320            String val = (String) props.get(key);
     321            System.out.println("[" + key + "] = [" + val + "]");
     322        }
    317323    }
    318324}
  • apps/tests/echotester/BasicEchoTestAnalyzer.java

    r17a1b11 r51c49d7c  
    99     */
    1010    private static int REPORT_DELAY = 20;
    11    
     11
    1212    private static int SUMMARY_SIZE = 100;
    1313
    1414    public BasicEchoTestAnalyzer() {
    15         this(20, 100);
     15        this(20, 100);
    1616    }
    17    
     17
    1818    public BasicEchoTestAnalyzer(int reportDelay, int summarySize) {
    19         REPORT_DELAY = reportDelay;
    20         SUMMARY_SIZE = summarySize;
     19        REPORT_DELAY = reportDelay;
     20        SUMMARY_SIZE = summarySize;
    2121    }
    22    
    23     private int events = 0,
    24         packetLosses = 0,
    25         packetLossesDisconnect=0,
    26         disconnects = 0,
    27         disconnectsRefused = 0,
    28         delayCount=0,
    29         lastDelayPtr = 0;
    30     private long minDelay=Long.MAX_VALUE, maxDelay = 0, delaySum=0;
     22
     23    private int events = 0, packetLosses = 0, packetLossesDisconnect = 0, disconnects = 0, disconnectsRefused = 0,
     24            delayCount = 0, lastDelayPtr = 0;
     25    private long minDelay = Long.MAX_VALUE, maxDelay = 0, delaySum = 0;
    3126    private long[] lastDelays = new long[SUMMARY_SIZE];
    32    
    33    
     27
    3428    public synchronized void packetLossOccurred(boolean beforeDisconnect) {
    35         System.out.println("1: Packet lost"+
    36                            (beforeDisconnect?" before disconnect":"")+
    37                            ".");
    38         packetLosses++;
    39         if (beforeDisconnect) packetLossesDisconnect++;
    40         countEvent();
     29        System.out.println("1: Packet lost" + (beforeDisconnect ? " before disconnect" : "") + ".");
     30        packetLosses++;
     31        if (beforeDisconnect) packetLossesDisconnect++;
     32        countEvent();
    4133    }
    4234
    4335    public synchronized void successOccurred(long delay) {
    44         System.out.println("0: Delay = "+delay);
    45         if (delay > maxDelay) maxDelay=delay;
    46         if (delay < minDelay) minDelay=delay;
    47         delaySum+=delay;
    48         delayCount++;
    49         lastDelays[lastDelayPtr++]=delay;
    50         lastDelayPtr%=SUMMARY_SIZE;
    51         countEvent();
     36        System.out.println("0: Delay = " + delay);
     37        if (delay > maxDelay) maxDelay = delay;
     38        if (delay < minDelay) minDelay = delay;
     39        delaySum += delay;
     40        delayCount++;
     41        lastDelays[lastDelayPtr++] = delay;
     42        lastDelayPtr %= SUMMARY_SIZE;
     43        countEvent();
    5244    }
    5345
    5446    public synchronized void disconnected(boolean refused) {
    55         System.out.println("2: Disconnected"+
    56                            (refused?" (connection refused)":"")+
    57                            ".");
    58         disconnects++;
    59         if (refused) disconnectsRefused++;
    60         countEvent();
     47        System.out.println("2: Disconnected" + (refused ? " (connection refused)" : "") + ".");
     48        disconnects++;
     49        if (refused) disconnectsRefused++;
     50        countEvent();
    6151    }
    6252
    6353    private void countEvent() {
    64         events++;
    65         if (events % REPORT_DELAY == 0) {
    66             int packets = packetLosses+delayCount;
    67             long delaySummary=0;
    68             for (int i=0;i<SUMMARY_SIZE;i++) {
    69                 delaySummary+=lastDelays[i];
    70             }
    71             System.out.println
    72                 ("++++++++++++++++ ECHO STATISTICS +++++++++++++++++++++++++"+
    73                  "\n++ Number of total echo messages: "+packets+
    74                  "\n++ No response for "+packetLosses+
    75                  "\n++    (of which "+ packetLossesDisconnect+
    76                  " due to a disconnect)"+
    77                  "\n++ Disconnects: "+disconnects+
    78                  "\n++    (of which "+disconnectsRefused+
    79                  " due to 'connection refused')"+
    80                  (disconnects>0 || true
    81                   ?"\n++ Average lost packets per disconnect: "+
    82                   (packetLossesDisconnect/(float)disconnects)
    83                   :"")+
    84                  "\n++++++++++++++++++++++++++++++++++++++++++++++++++++++++"+
    85                  "\n++ Minimal delay: "+minDelay+
    86                  "\n++ Average delay: "+(delaySum/(float)delayCount)+
    87                  "\n++ Maximal delay: "+maxDelay+
    88                  (delayCount >=SUMMARY_SIZE
    89                   ?"\n++ Average delay over last " + SUMMARY_SIZE + ": "+(delaySummary/(float)SUMMARY_SIZE)
    90                   :"")+
    91                  "\n++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
    92         }
     54        events++;
     55        if (events % REPORT_DELAY == 0) {
     56            int packets = packetLosses + delayCount;
     57            long delaySummary = 0;
     58            for (int i = 0; i < SUMMARY_SIZE; i++) {
     59                delaySummary += lastDelays[i];
     60            }
     61            System.out.println("++++++++++++++++ ECHO STATISTICS +++++++++++++++++++++++++"
     62                               + "\n++ Number of total echo messages: "
     63                               + packets
     64                               + "\n++ No response for "
     65                               + packetLosses
     66                               + "\n++    (of which "
     67                               + packetLossesDisconnect
     68                               + " due to a disconnect)"
     69                               + "\n++ Disconnects: "
     70                               + disconnects
     71                               + "\n++    (of which "
     72                               + disconnectsRefused
     73                               + " due to 'connection refused')"
     74                               + (disconnects > 0 || true ? "\n++ Average lost packets per disconnect: "
     75                                                            + (packetLossesDisconnect / (float) disconnects) : "")
     76                               + "\n++++++++++++++++++++++++++++++++++++++++++++++++++++++++"
     77                               + "\n++ Minimal delay: "
     78                               + minDelay
     79                               + "\n++ Average delay: "
     80                               + (delaySum / (float) delayCount)
     81                               + "\n++ Maximal delay: "
     82                               + maxDelay
     83                               + (delayCount >= SUMMARY_SIZE ? "\n++ Average delay over last " + SUMMARY_SIZE + ": "
     84                                                               + (delaySummary / (float) SUMMARY_SIZE) : "")
     85                               + "\n++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
     86        }
    9387    }
    9488}
  • apps/tests/echotester/EchoTestAnalyzer.java

    r17a1b11 r51c49d7c  
    1616}
    1717
    18 
    19    
  • apps/tests/echotester/EchoTester.java

    r17a1b11 r51c49d7c  
    2727     * How long to wait between packets. Default is 6 seconds.
    2828     */
    29     private static long PACKET_DELAY= 6000;
     29    private static long PACKET_DELAY = 6000;
    3030
    3131    /**
     
    3333     * seen as "broken" and disconnected.
    3434     */
    35     private static final long MAX_PACKETS_QUEUED=50; // unused
    36    
    37    
     35    private static final long MAX_PACKETS_QUEUED = 50; // unused
     36
    3837    private EchoTestAnalyzer eta;
    3938    private String host;
     
    4241    // the following vars are synchronized via the lock.
    4342    private Object lock = new Object();
    44     private long nextPacket=0;
    45     private long nextUnreceived=0;
    46     private boolean readerRunning=false;
     43    private long nextPacket = 0;
     44    private long nextUnreceived = 0;
     45    private boolean readerRunning = false;
    4746
    4847    public static void main(String[] args) {
    49         if (args.length == 3)
    50             PACKET_DELAY = Long.parseLong(args[2]);
    51         new EchoTester(args[0], Integer.parseInt(args[1]),
    52                        new BasicEchoTestAnalyzer());
     48        if (args.length == 3) PACKET_DELAY = Long.parseLong(args[2]);
     49        new EchoTester(args[0], Integer.parseInt(args[1]), new BasicEchoTestAnalyzer());
    5350    }
    54    
     51
    5552    public EchoTester(String host, int port, EchoTestAnalyzer eta) {
    56         this.eta=eta;
    57         this.host=host;
    58         this.port=port;
    59         start();
     53        this.eta = eta;
     54        this.host = host;
     55        this.port = port;
     56        start();
    6057    }
    6158
    6259    public void run() {
    63         try {
    64             while (true) {
    65                 Socket s;
    66                 try {
    67                     s = new Socket(host, port);
    68                 } catch (ConnectException ex) {
    69                     eta.disconnected(true);
    70                     Thread.sleep(PACKET_DELAY);
    71                     continue;
    72                 }
    73                 System.out.println("41: Connected to "+host+":"+port);
    74                 synchronized(lock) {
    75                     nextUnreceived=nextPacket;
    76                 }
    77                 Thread t = new ResponseReaderThread(s);
    78                 Writer w = new BufferedWriter(new OutputStreamWriter
    79                                               (s.getOutputStream()));
    80                 while (true) {
    81                     long no;
    82                     synchronized(lock) {
    83                         no = nextPacket++;
    84                     }
    85                     try {
    86                         w.write(no+" "+System.currentTimeMillis()+"\n");
    87                         w.flush();
    88                     } catch (SocketException ex) {
    89                         break;
    90                     }
    91                     Thread.sleep(PACKET_DELAY);
    92                 }
    93                 s.close();
    94                 t.join();
    95                 synchronized(lock) {
    96                     if (readerRunning) {
    97                         System.out.println("*** WHY IS THIS THREAD STILL"+
    98                                            " RUNNING?");
    99                     }
    100                     while (nextUnreceived < nextPacket) {
    101                         nextUnreceived++;
    102                         eta.packetLossOccurred(true);
    103                     }
    104                     if (nextUnreceived > nextPacket) {
    105                         System.out.println("*** WTF? "+nextUnreceived+" > "+
    106                                            nextPacket);
    107                     }
    108                 }
    109                 eta.disconnected(false);
    110             }
    111         } catch (InterruptedException ex) {
    112             ex.printStackTrace();
    113             System.exit(1); // treat these errors as fatal         
    114         } catch (IOException ex) {
    115             ex.printStackTrace();
    116             System.exit(1); // treat these errors as fatal
    117         }
     60        try {
     61            while (true) {
     62                Socket s;
     63                try {
     64                    s = new Socket(host, port);
     65                } catch (ConnectException ex) {
     66                    eta.disconnected(true);
     67                    Thread.sleep(PACKET_DELAY);
     68                    continue;
     69                }
     70                System.out.println("41: Connected to " + host + ":" + port);
     71                synchronized (lock) {
     72                    nextUnreceived = nextPacket;
     73                }
     74                Thread t = new ResponseReaderThread(s);
     75                Writer w = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
     76                while (true) {
     77                    long no;
     78                    synchronized (lock) {
     79                        no = nextPacket++;
     80                    }
     81                    try {
     82                        w.write(no + " " + System.currentTimeMillis() + "\n");
     83                        w.flush();
     84                    } catch (SocketException ex) {
     85                        break;
     86                    }
     87                    Thread.sleep(PACKET_DELAY);
     88                }
     89                s.close();
     90                t.join();
     91                synchronized (lock) {
     92                    if (readerRunning) {
     93                        System.out.println("*** WHY IS THIS THREAD STILL" + " RUNNING?");
     94                    }
     95                    while (nextUnreceived < nextPacket) {
     96                        nextUnreceived++;
     97                        eta.packetLossOccurred(true);
     98                    }
     99                    if (nextUnreceived > nextPacket) {
     100                        System.out.println("*** WTF? " + nextUnreceived + " > " + nextPacket);
     101                    }
     102                }
     103                eta.disconnected(false);
     104            }
     105        } catch (InterruptedException ex) {
     106            ex.printStackTrace();
     107            System.exit(1); // treat these errors as fatal         
     108        } catch (IOException ex) {
     109            ex.printStackTrace();
     110            System.exit(1); // treat these errors as fatal
     111        }
    118112
    119113    }
     
    121115    private class ResponseReaderThread extends Thread {
    122116
    123         private Socket s;
     117        private Socket s;
    124118
    125         public ResponseReaderThread(Socket s) {
    126             this.s=s;
    127             synchronized(lock) {
    128                 readerRunning=true;
    129             }
    130             start();
    131         }
     119        public ResponseReaderThread(Socket s) {
     120            this.s = s;
     121            synchronized (lock) {
     122                readerRunning = true;
     123            }
     124            start();
     125        }
    132126
    133         public void run() {
    134             try {
    135                 BufferedReader br = new BufferedReader(new InputStreamReader
    136                                                        (s.getInputStream()));
    137                 String line;
    138                 int index;
    139                 while ((line=br.readLine()) != null) {
    140                     if ((index=line.indexOf(" ")) == -1)
    141                         continue;
    142                     long now, packetNumber, packetTime;
    143                     now = System.currentTimeMillis();
    144                     try {
    145                         packetNumber = Long.parseLong
    146                             (line.substring(0,index));
    147                         packetTime = Long.parseLong
    148                             (line.substring(index+1));
    149                     } catch (NumberFormatException ex) {
    150                         System.out.println(ex.toString());
    151                         continue;
    152                     }
    153                     synchronized (lock) {
    154                         while (packetNumber > nextUnreceived) {
    155                             nextUnreceived++;
    156                             eta.packetLossOccurred(false);
    157                         }
    158                         if (nextUnreceived > packetNumber) {
    159                             System.out.println("*** DOUBLE PACKET!");
    160                         } else {
    161                             nextUnreceived++;
    162                         }
    163                     }
    164                     eta.successOccurred(now-packetTime);
    165                 }
    166             } catch (SocketException ex) {
    167                 // ignore
    168             } catch (IOException ex) {
    169                 ex.printStackTrace();
    170                 System.exit(0);
    171             }
    172             synchronized(lock) {
    173                 readerRunning=false;
    174             }
    175         }
     127        public void run() {
     128            try {
     129                BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));
     130                String line;
     131                int index;
     132                while ((line = br.readLine()) != null) {
     133                    if ((index = line.indexOf(" ")) == -1) continue;
     134                    long now, packetNumber, packetTime;
     135                    now = System.currentTimeMillis();
     136                    try {
     137                        packetNumber = Long.parseLong(line.substring(0, index));
     138                        packetTime = Long.parseLong(line.substring(index + 1));
     139                    } catch (NumberFormatException ex) {
     140                        System.out.println(ex.toString());
     141                        continue;
     142                    }
     143                    synchronized (lock) {
     144                        while (packetNumber > nextUnreceived) {
     145                            nextUnreceived++;
     146                            eta.packetLossOccurred(false);
     147                        }
     148                        if (nextUnreceived > packetNumber) {
     149                            System.out.println("*** DOUBLE PACKET!");
     150                        } else {
     151                            nextUnreceived++;
     152                        }
     153                    }
     154                    eta.successOccurred(now - packetTime);
     155                }
     156            } catch (SocketException ex) {
     157                // ignore
     158            } catch (IOException ex) {
     159                ex.printStackTrace();
     160                System.exit(0);
     161            }
     162            synchronized (lock) {
     163                readerRunning = false;
     164            }
     165        }
    176166    }
    177167}
Note: See TracChangeset for help on using the changeset viewer.