Changeset 6058422 for apps/ministreaming


Ignore:
Timestamp:
May 3, 2004 3:34:25 AM (17 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
2bfbe1ca
Parents:
44e34f7
git-author:
jrandom <jrandom> (05/03/04 03:34:25)
git-committer:
zzz <zzz@…> (05/03/04 03:34:25)
Message:

refactored packet handling into type specific methods
removed nested synchronization (which had been causing undetected deadlocks)
made sync blocks smaller, though this may have opened holes related to
resent ACK/SYN/CLOSE packets that are delivered in a race. I'm not as
fluent in the ministreaming lib code as i should be (yet), but duck's thread
dumps were showing hundreds of threads waiting on a lock that'll never get
released (since the only way to release it would be to receive another
packet, and no more packets can be received until the lock is released, etc)
also, I2PSession is threadsafe - i can see no reason to synchronize on it
(and it was being synchronized on only part of the time?)
also, refactored the charset encoding stuff and minor log tweaking
i've been testing this for the last hour or so, on eepsites and squid (large
and small files), as well as irc, and there haven't been any glitches. but
it needs more testing before it can be released, obviously.

Location:
apps/ministreaming/java/src/net/i2p/client/streaming
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java

    r44e34f7 r6058422  
    7676                    throw new InterruptedIOException("Timed out waiting for remote ID");
    7777             
    78                 _log.debug("TIMING: RemoteID set to " + I2PSocketManager.getReadableForm(remoteID) + " for "
    79                            + this.hashCode());
     78                if (_log.shouldLog(Log.DEBUG))
     79                    _log.debug("TIMING: RemoteID set to "
     80                               + I2PSocketManager.getReadableForm(remoteID) + " for "
     81                               + this.hashCode());
    8082            }
    8183            return remoteID;
     
    144146
    145147    private byte getMask(int add) {
    146         return (byte) ((outgoing ? (byte) 0xA0 : (byte) 0x50) + (byte) add);
     148        if (outgoing)
     149            return (byte)(I2PSocketManager.DATA_IN + (byte)add);
     150        else
     151            return (byte)(I2PSocketManager.DATA_OUT + (byte)add);
    147152    }
    148153
     
    188193                synchronized (flagLock) {
    189194                    if (closed) {
    190                         _log.debug("Closed is set, so closing stream: " + this.hashCode());
     195                        _log.debug("Closed is set, so closing stream: " + hashCode());
    191196                        return -1;
    192197                    }
     
    211216
    212217            if (_log.shouldLog(Log.DEBUG)) {
    213                 _log.debug("Read from I2PInputStream " + this.hashCode() + " returned " + read.length + " bytes");
     218                _log.debug("Read from I2PInputStream " + hashCode() + " returned "
     219                           + read.length + " bytes");
    214220            }
    215221            //if (_log.shouldLog(Log.DEBUG)) {
    216222            //  _log.debug("Read from I2PInputStream " + this.hashCode()
    217             //     + " returned "+read.length+" bytes:\n"
    218             //     + HexDump.dump(read));
     223            //             + " returned "+read.length+" bytes:\n"
     224            //             + HexDump.dump(read));
    219225            //}
    220226            return read.length;
     
    230236
    231237        public synchronized void queueData(byte[] data, int off, int len) {
    232             _log.debug("Insert " + len + " bytes into queue: " + this.hashCode());
     238            if (_log.shouldLog(Log.DEBUG))
     239                _log.debug("Insert " + len + " bytes into queue: " + hashCode());
    233240            bc.append(data, off, len);
    234241            notifyAll();
     
    269276            _log.debug("Runner's input stream is: " + in.hashCode());
    270277            this.in = in;
    271             setName("SocketRunner from " + I2PSocketImpl.this.remote.calculateHash().toBase64().substring(0, 4));
     278            String peer = I2PSocketImpl.this.remote.calculateHash().toBase64();
     279            setName("SocketRunner from " + peer.substring(0, 4));
    272280            start();
     281        }
     282       
     283        /**
     284         * Pump some more data
     285         *
     286         * @return true if we should keep on handling, false otherwise
     287         */
     288        private boolean handleNextPacket(ByteCollector bc, byte buffer[])
     289                                         throws IOException, I2PSessionException {
     290            int len = in.read(buffer);
     291            int bcsize = bc.getCurrentSize();
     292            if (len != -1) {
     293                bc.append(buffer, len);
     294            } else if (bcsize == 0) {
     295                // nothing left in the buffer, but the read(..) didn't EOF (-1)
     296                // this used to be 'break' (aka return false), though that seems
     297                // odd to me - shouldn't it keep reading packets until EOF? 
     298                // but perhaps there's something funky in the stream's operation,
     299                // or some other dependency within the rest of the ministreaming
     300                // lib, so for the moment, return false.  --jr
     301                return false;
     302            }
     303            if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) {
     304                if (_log.shouldLog(Log.DEBUG))
     305                    _log.debug("Runner Point d: " + hashCode());
     306
     307                try {
     308                    Thread.sleep(PACKET_DELAY);
     309                } catch (InterruptedException e) {
     310                    _log.warn("wtf", e);
     311                }
     312            }
     313            if ((bcsize >= MAX_PACKET_SIZE) || (in.available() == 0)) {
     314                byte[] data = bc.startToByteArray(MAX_PACKET_SIZE);
     315                if (data.length > 0) {
     316                    if (_log.shouldLog(Log.DEBUG))
     317                        _log.debug("Message size is: " + data.length);
     318                    boolean sent = sendBlock(data);
     319                    if (!sent) {
     320                        _log.error("Error sending message to peer.  Killing socket runner");
     321                        return false;
     322                    }
     323                }
     324            }
     325            return true;
    273326        }
    274327
     
    276329            byte[] buffer = new byte[MAX_PACKET_SIZE];
    277330            ByteCollector bc = new ByteCollector();
    278             boolean sent = true;
     331            boolean keepHandling = true;
     332            int packetsHandled = 0;
    279333            try {
    280                 int len, bcsize;
    281334                //              try {
    282                 while (true) {
    283                     len = in.read(buffer);
    284                     bcsize = bc.getCurrentSize();
    285                     if (len != -1) {
    286                         bc.append(buffer, len);
    287                     } else if (bcsize == 0) {
    288                         break;
    289                     }
    290                     if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) {
    291                         _log.debug("Runner Point d: " + this.hashCode());
    292 
    293                         try {
    294                             Thread.sleep(PACKET_DELAY);
    295                         } catch (InterruptedException e) {
    296                             e.printStackTrace();
    297                         }
    298                     }
    299                     if ((bcsize >= MAX_PACKET_SIZE) || (in.available() == 0)) {
    300                         byte[] data = bc.startToByteArray(MAX_PACKET_SIZE);
    301                         if (data.length > 0) {
    302                             _log.debug("Message size is: " + data.length);
    303                             sent = sendBlock(data);
    304                             if (!sent) {
    305                                 _log.error("Error sending message to peer.  Killing socket runner");
    306                                 break;
    307                             }
    308                         }
    309                     }
    310                 }
    311                 if ((bc.getCurrentSize() > 0) && sent) {
    312                     _log.error("A SCARY MONSTER HAS EATEN SOME DATA! " + "(input stream: " + in.hashCode() + "; "
     335                while (keepHandling) {
     336                    keepHandling = handleNextPacket(bc, buffer);
     337                    packetsHandled++;
     338                }
     339                if ((bc.getCurrentSize() > 0) && (packetsHandled > 1)) {
     340                    _log.error("A SCARY MONSTER HAS EATEN SOME DATA! " + "(input stream: "
     341                               + in.hashCode() + "; "
    313342                               + "queue size: " + bc.getCurrentSize() + ")");
    314343                }
     
    316345                    closed2 = true;
    317346                }
    318                 //              } catch (IOException ex) {
    319                 //                  if (_log.shouldLog(Log.INFO))
    320                 //                      _log.info("Error reading and writing", ex);
    321                 //              }
    322347                boolean sc;
    323348                synchronized (flagLock) {
     
    325350                } // FIXME: Race here?
    326351                if (sc) {
    327                     _log.info("Sending close packet: " + outgoing);
    328                     byte[] packet = I2PSocketManager.makePacket((byte) (getMask(0x02)), remoteID, new byte[0]);
    329                     synchronized (manager.getSession()) {
    330                         sent = manager.getSession().sendMessage(remote, packet);
    331                     }
     352                    if (_log.shouldLog(Log.INFO))
     353                        _log.info("Sending close packet: " + outgoing);
     354                    byte[] packet = I2PSocketManager.makePacket(getMask(0x02), remoteID, new byte[0]);
     355                    boolean sent = manager.getSession().sendMessage(remote, packet);
    332356                    if (!sent) {
    333357                        _log.error("Error sending close packet to peer");
     
    349373
    350374        private boolean sendBlock(byte data[]) throws I2PSessionException {
    351             _log.debug("TIMING: Block to send for " + I2PSocketImpl.this.hashCode());
     375            if (_log.shouldLog(Log.DEBUG))
     376                _log.debug("TIMING: Block to send for " + I2PSocketImpl.this.hashCode());
    352377            if (remoteID == null) {
    353378                _log.error("NULL REMOTEID");
     
    359384                if (closed2) return false;
    360385            }
    361             synchronized (manager.getSession()) {
    362                 sent = manager.getSession().sendMessage(remote, packet);
    363             }
     386            sent = manager.getSession().sendMessage(remote, packet);
    364387            return sent;
    365388        }
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java

    r44e34f7 r6058422  
    2323import net.i2p.data.Base64;
    2424import net.i2p.data.Destination;
     25import net.i2p.data.DataFormatException;
    2526import net.i2p.util.Log;
    2627
     
    4142    private HashMap _inSockets;
    4243    private I2PSocketOptions _defaultOptions;
     44   
     45    public static final short ACK = 0x51;
     46    public static final short CLOSE_OUT = 0x52;
     47    public static final short DATA_OUT = 0x50;
     48    public static final short SYN = 0xA1;
     49    public static final short CLOSE_IN = 0xA2;
     50    public static final short DATA_IN = 0xA0;
     51    public static final short CHAFF = 0xFF;
    4352
    4453    public I2PSocketManager() {
     
    6473        _log.error("Error occurred: [" + message + "]", error);
    6574    }
    66 
     75   
    6776    public void messageAvailable(I2PSession session, int msgId, long size) {
    6877        try {
     
    7887            }
    7988            int type = msg[0] & 0xff;
    80             String id = new String(new byte[] { msg[1], msg[2], msg[3]}, "ISO-8859-1");
     89            String id = toString(new byte[] { msg[1], msg[2], msg[3]});
    8190            byte[] payload = new byte[msg.length - 4];
    8291            System.arraycopy(msg, 4, payload, 0, payload.length);
    8392            _log.debug("Message read: type = [" + Integer.toHexString(type) + "] id = [" + getReadableForm(id)
    8493                       + "] payload length: " + payload.length + "]");
    85             synchronized (lock) {
    86                 switch (type) {
    87                 case 0x51:
    88                     // ACK outgoing
    89                     s = (I2PSocketImpl) _outSockets.get(id);
    90                     if (s == null) {
    91                         _log.warn("No socket responsible for ACK packet");
    92                         return;
    93                     }
    94                     if (payload.length == 3 && s.getRemoteID(false) == null) {
    95                         String newID = new String(payload, "ISO-8859-1");
    96                         s.setRemoteID(newID);
    97                         return;
    98                     } else {
    99                         if (payload.length != 3)
    100                             _log.warn("Ack packet had " + payload.length + " bytes");
    101                         else
    102                             _log.warn("Remote ID already exists? " + s.getRemoteID());
    103                         return;
    104                     }
    105                 case 0x52:
    106                     // disconnect outgoing
    107                     _log.debug("*Disconnect outgoing!");
    108                     try {
    109                         s = (I2PSocketImpl) _outSockets.get(id);
    110                         if (s != null) {
    111                             if (payload.length > 0) {
    112                                 _log.debug("Disconnect packet had "
    113                                            + payload.length + " bytes");
    114                             }
    115                             if (s.getRemoteID(false) == null) {
    116                                 s.setRemoteID(null); // Just to wake up socket
    117                                 return;
    118                             }
    119                             s.internalClose();
    120                             _outSockets.remove(id);
    121                         }
    122                         return;
    123                     } catch (Exception t) {
    124                         _log.error("Ignoring error on disconnect", t);
    125                     }
    126                 case 0x50:
    127                     // packet send outgoing
    128                     _log.debug("*Packet send outgoing [" + payload.length + "]");
    129                     s = (I2PSocketImpl) _outSockets.get(id);
    130                     if (s != null) {
    131                         s.queueData(payload);
    132                         return;
    133                     } else {
    134                         _log.error("Null socket with data available");
    135                         throw new IllegalStateException("Null socket with data available");
    136                     }
    137                 case 0xA1:
    138                     // SYN incoming
    139                     _log.debug("*Syn!");
    140                     String newLocalID = makeID(_inSockets);
    141                     Destination d = new Destination();
    142                     d.readBytes(new ByteArrayInputStream(payload));
    143                    
    144                     if (_serverSocket == null) {
    145                         // The app did not instantiate an I2PServerSocket
    146                         byte[] packet = makePacket((byte) 0x52, id, newLocalID.getBytes("ISO-8859-1"));
    147                         boolean replySentOk = false;
    148                         synchronized (_session) {
    149                             replySentOk = _session.sendMessage(d, packet);
    150                         }
    151                         if (!replySentOk) {
    152                             _log.error("Error sending close to " + d.calculateHash().toBase64()
    153                                        + " in response to a new con message", new Exception("Failed creation"));
    154                         }
    155                         return;
    156                     }
    157                    
    158                     s = new I2PSocketImpl(d, this, false, newLocalID);
    159                     s.setRemoteID(id);
    160                     if (_serverSocket.getNewSocket(s)) {
    161                         _inSockets.put(newLocalID, s);
    162                         byte[] packet = makePacket((byte) 0x51, id, newLocalID.getBytes("ISO-8859-1"));
    163                         boolean replySentOk = false;
    164                         synchronized (_session) {
    165                             replySentOk = _session.sendMessage(d, packet);
    166                         }
    167                         if (!replySentOk) {
    168                             _log.error("Error sending reply to " + d.calculateHash().toBase64()
    169                                        + " in response to a new con message", new Exception("Failed creation"));
    170                             s.internalClose();
    171                         }
    172                     } else {
    173                         byte[] packet = (" " + id).getBytes("ISO-8859-1");
    174                         packet[0] = 0x52;
    175                         boolean nackSent = session.sendMessage(d, packet);
    176                         if (!nackSent) {
    177                             _log.error("Error sending NACK for session creation");
    178                         }
    179                         s.internalClose();
    180                     }
    181                     return;
    182                 case 0xA2:
    183                     // disconnect incoming
    184                     _log.debug("*Disconnect incoming!");
    185                     try {
    186                         s = (I2PSocketImpl) _inSockets.get(id);
    187                         if (payload.length == 0 && s != null) {
    188                             s.internalClose();
    189                             _inSockets.remove(id);
    190                             return;
    191                         } else {
    192                             if (payload.length > 0) _log.warn("Disconnect packet had " + payload.length + " bytes");
    193                             return;
    194                         }
    195                     } catch (Exception t) {
    196                         _log.error("Ignoring error on disconnect", t);
    197                         return;
    198                     }
    199                 case 0xA0:
    200                     // packet send incoming
    201                     _log.debug("*Packet send incoming [" + payload.length + "]");
    202                     s = (I2PSocketImpl) _inSockets.get(id);
    203                     if (s != null) {
    204                         s.queueData(payload);
    205                         return;
    206                     } else {
    207                         _log.error("Null socket with data available");
    208                         throw new IllegalStateException("Null socket with data available");
    209                     }
    210                 case 0xFF:
     94            switch (type) {
     95                case ACK:
     96                    ackAvailable(id, payload);
     97                    return;
     98                case CLOSE_OUT:
     99                    disconnectAvailable(id, payload);
     100                    return;
     101                case DATA_OUT:
     102                    sendOutgoingAvailable(id, payload);
     103                    return;
     104                case SYN:
     105                    synIncomingAvailable(id, payload, session);
     106                    return;
     107                case CLOSE_IN:
     108                    disconnectIncoming(id, payload);
     109                    return;
     110                case DATA_IN:
     111                    sendIncoming(id, payload);
     112                case CHAFF:
    211113                    // ignore
    212114                    return;
     115                default:
     116                    handleUnknown(type, id, payload);
     117                    return;
     118            }
     119        } catch (I2PException ise) {
     120            _log.error("Error processing", ise);
     121        } catch (IllegalStateException ise) {
     122            _log.debug("Error processing", ise);
     123        }
     124    }
     125   
     126    /**
     127     * We've received an ACK packet (hopefully, in response to a SYN that we
     128     * recently sent out).  Notify the associated I2PSocket that we now have
     129     * the remote stream ID (which should get things going, since the handshake
     130     * is complete).
     131     *
     132     */
     133    private void ackAvailable(String id, byte payload[]) {
     134        I2PSocketImpl s = null;
     135        synchronized (lock) {
     136            s = (I2PSocketImpl) _outSockets.get(id);
     137        }
     138
     139        if (s == null) {
     140            _log.warn("No socket responsible for ACK packet");
     141            return;
     142        }
     143
     144        String remoteId = null;
     145        try {
     146            remoteId = s.getRemoteID(false);
     147        } catch (InterruptedIOException iie) {
     148            throw new RuntimeException("ERROR!  getRemoteId(false) should never throw InterruptedIOException!");
     149        }
     150
     151        if ( (payload.length == 3) && (remoteId == null) ) {
     152            String newID = toString(payload);
     153            s.setRemoteID(newID);
     154            return;
     155        } else {
     156            // (payload.length != 3 || getRemoteId != null)
     157            if (_log.shouldLog(Log.WARN)) {
     158                if (payload.length != 3)
     159                    _log.warn("Ack packet had " + payload.length + " bytes");
     160                else
     161                    _log.warn("Remote ID already exists? " + remoteId);
     162            }
     163            return;
     164        }
     165    }
     166   
     167    /**
     168     * We received a disconnect packet, telling us to tear down the specified
     169     * stream.
     170     */
     171    private void disconnectAvailable(String id, byte payload[]) {
     172        I2PSocketImpl s = null;
     173        synchronized (lock) {
     174            s = (I2PSocketImpl) _outSockets.get(id);
     175        }
     176       
     177        _log.debug("*Disconnect outgoing!");
     178        try {
     179            if (s != null) {
     180                if (payload.length > 0) {
     181                    _log.debug("Disconnect packet had "
     182                               + payload.length + " bytes");
    213183                }
    214                 _log.error("\n\n=============== Unknown packet! " + "============" + "\nType: " + (int) type
    215                            + "\nID:   " + getReadableForm(id) + "\nBase64'ed Data: " + Base64.encode(payload)
    216                            + "\n\n\n");
    217                 if (id != null) {
    218                     _inSockets.remove(id);
     184                if (s.getRemoteID(false) == null) {
     185                    s.setRemoteID(null); // Just to wake up socket
     186                    return;
     187                }
     188                s.internalClose();
     189                synchronized (lock) {
    219190                    _outSockets.remove(id);
    220191                }
    221192            }
    222         } catch (I2PException ise) {
    223             _log.error("Error processing", ise);
    224         } catch (IOException ioe) {
    225             _log.error("Error processing", ioe);
    226         } catch (IllegalStateException ise) {
    227             _log.debug("Error processing", ise);
    228         }
    229     }
    230 
     193            return;
     194        } catch (Exception t) {
     195            _log.error("Ignoring error on disconnect", t);
     196        }
     197    }
     198   
     199    /**
     200     * We've received data on a stream we created - toss the data onto
     201     * the socket for handling.
     202     *
     203     * @throws IllegalStateException if the socket isn't open or isn't known
     204     */
     205    private void sendOutgoingAvailable(String id, byte payload[]) throws IllegalStateException {
     206        I2PSocketImpl s = null;
     207        synchronized (lock) {
     208            s = (I2PSocketImpl) _outSockets.get(id);
     209        }
     210
     211        // packet send outgoing
     212        if (_log.shouldLog(Log.DEBUG))
     213            _log.debug("*Packet send outgoing [" + payload.length + "]");
     214        if (s != null) {
     215            s.queueData(payload);
     216            return;
     217        } else {
     218            _log.error("Null socket with data available");
     219            throw new IllegalStateException("Null socket with data available");
     220        }
     221    }
     222   
     223    /**
     224     * We've received a SYN packet (a request for a new stream).  If the client has
     225     * said they want incoming sockets (by retrieving the serverSocket), the stream
     226     * will be ACKed, but if they have not, they'll be NACKed)
     227     *
     228     * @throws DataFormatException if the destination in the SYN was invalid
     229     * @throws I2PSessionException if there was an I2P error sending the ACK or NACK
     230     */
     231    private void synIncomingAvailable(String id, byte payload[], I2PSession session)
     232                                      throws DataFormatException, I2PSessionException {
     233        _log.debug("*Syn!");
     234        Destination d = new Destination();
     235        d.fromByteArray(payload);
     236
     237        I2PSocketImpl s = null;
     238        boolean acceptConnections = (_serverSocket != null);
     239        String newLocalID = null;
     240        synchronized (lock) {
     241            newLocalID = makeID(_inSockets);
     242            if (acceptConnections) {
     243                s = new I2PSocketImpl(d, this, false, newLocalID);
     244                s.setRemoteID(id);
     245            }
     246        }   
     247       
     248        if (!acceptConnections) {
     249            // The app did not instantiate an I2PServerSocket
     250            byte[] packet = makePacket((byte) CLOSE_OUT, id, toBytes(newLocalID));
     251            boolean replySentOk = false;
     252            synchronized (_session) {
     253                replySentOk = _session.sendMessage(d, packet);
     254            }
     255            if (!replySentOk) {
     256                _log.error("Error sending close to " + d.calculateHash().toBase64()
     257                           + " in response to a new con message",
     258                           new Exception("Failed creation"));
     259            }
     260            return;
     261        }
     262
     263        if (_serverSocket.getNewSocket(s)) {
     264            _inSockets.put(newLocalID, s);
     265            byte[] packet = makePacket((byte) ACK, id, toBytes(newLocalID));
     266            boolean replySentOk = false;
     267            replySentOk = _session.sendMessage(d, packet);
     268            if (!replySentOk) {
     269                _log.error("Error sending reply to " + d.calculateHash().toBase64()
     270                           + " in response to a new con message",
     271                           new Exception("Failed creation"));
     272                s.internalClose();
     273            }
     274        } else {
     275            byte[] packet = toBytes(" " + id);
     276            packet[0] = CLOSE_OUT;
     277            boolean nackSent = session.sendMessage(d, packet);
     278            if (!nackSent) {
     279                _log.error("Error sending NACK for session creation");
     280            }
     281            s.internalClose();
     282        }
     283        return;
     284    }
     285   
     286    /**
     287     * We've received a disconnect for a socket we didn't initiate, so kill
     288     * the socket.
     289     *
     290     */
     291    private void disconnectIncoming(String id, byte payload[]) {
     292        _log.debug("*Disconnect incoming!");
     293        I2PSocketImpl s = null;
     294        synchronized (lock) {
     295            s = (I2PSocketImpl) _inSockets.get(id);
     296            if (payload.length == 0 && s != null) {
     297                _inSockets.remove(id);
     298            }
     299        }
     300       
     301        try {
     302            if (payload.length == 0 && s != null) {
     303                s.internalClose();
     304                return;
     305            } else {
     306                if ( (payload.length > 0) && (_log.shouldLog(Log.WARN)) )
     307                    _log.warn("Disconnect packet had " + payload.length + " bytes");
     308                return;
     309            }
     310        } catch (Exception t) {
     311            _log.error("Ignoring error on disconnect", t);
     312            return;
     313        }
     314    }
     315   
     316    /**
     317     * We've received data on a stream we received - toss the data onto
     318     * the socket for handling.
     319     *
     320     * @throws IllegalStateException if the socket isn't open or isn't known
     321     */
     322    private void sendIncoming(String id, byte payload[]) {
     323        if (_log.shouldLog(Log.DEBUG))
     324            _log.debug("*Packet send incoming [" + payload.length + "]");
     325        I2PSocketImpl s = null;
     326        synchronized (lock) {
     327            s = (I2PSocketImpl) _inSockets.get(id);
     328        }
     329       
     330        if (s != null) {
     331            s.queueData(payload);
     332            return;
     333        } else {
     334            _log.error("Null socket with data available");
     335            throw new IllegalStateException("Null socket with data available");
     336        }
     337    }
     338   
     339    /**
     340     * Unknown packet.  moo.
     341     *
     342     */
     343    private void handleUnknown(int type, String id, byte payload[]) {
     344        _log.error("\n\n=============== Unknown packet! " + "============"
     345                   + "\nType: " + (int) type
     346                   + "\nID:   " + getReadableForm(id)
     347                   + "\nBase64'ed Data: " + Base64.encode(payload)
     348                   + "\n\n\n");
     349        if (id != null) {
     350            synchronized (lock) {
     351                _inSockets.remove(id);
     352                _outSockets.remove(id);
     353            }
     354        }
     355    }
     356   
    231357    public void reportAbuse(I2PSession session, int severity) {
    232358        _log.error("Abuse reported [" + severity + "]");
     
    259385     * @throws I2PException if there is some other I2P-related problem
    260386     */
    261     public I2PSocket connect(Destination peer, I2PSocketOptions options) throws I2PException, ConnectException, NoRouteToHostException, InterruptedIOException {
    262 
     387    public I2PSocket connect(Destination peer, I2PSocketOptions options)
     388                             throws I2PException, ConnectException,
     389                             NoRouteToHostException, InterruptedIOException {
    263390        String localID, lcID;
    264391        I2PSocketImpl s;
     
    267394            lcID = getReadableForm(localID);
    268395            s = new I2PSocketImpl(peer, this, true, localID);
    269             _outSockets.put(s.getLocalID(), s);
     396            _outSockets.put(localID, s);
    270397        }
    271398        try {
     
    273400            _session.getMyDestination().writeBytes(pubkey);
    274401            String remoteID;
    275             byte[] packet = makePacket((byte) 0xA1, localID, pubkey.toByteArray());
     402            byte[] packet = makePacket((byte) SYN, localID, pubkey.toByteArray());
    276403            boolean sent = false;
    277             synchronized (_session) {
    278                 sent = _session.sendMessage(peer, packet);
    279             }
     404            sent = _session.sendMessage(peer, packet);
    280405            if (!sent) {
    281406                _log.info("Unable to send & receive ack for SYN packet");
     
    286411            }
    287412            remoteID = s.getRemoteID(true, options.getConnectTimeout());
    288             if (remoteID == null) { throw new ConnectException("Connection refused by peer"); }
    289             if ("".equals(remoteID)) { throw new NoRouteToHostException("Unable to reach peer"); }
    290             _log.debug("TIMING: s given out for remoteID " + getReadableForm(remoteID));
     413            if (remoteID == null) throw new ConnectException("Connection refused by peer");
     414            if ("".equals(remoteID)) throw new NoRouteToHostException("Unable to reach peer");
     415            if (_log.shouldLog(Log.DEBUG))
     416                _log.debug("TIMING: s given out for remoteID " + getReadableForm(remoteID));
    291417            return s;
    292418        } catch (InterruptedIOException ioe) {
     
    325451     * @throws I2PException if there is some other I2P-related problem
    326452     */
    327     public I2PSocket connect(Destination peer) throws I2PException, ConnectException, NoRouteToHostException, InterruptedIOException {
     453    public I2PSocket connect(Destination peer) throws I2PException, ConnectException,
     454                                               NoRouteToHostException, InterruptedIOException {
    328455        return connect(peer, null);
    329456    }
     
    407534    public boolean ping(Destination peer, long timeoutMs) {
    408535        try {
    409             return _session.sendMessage(peer, new byte[] { (byte) 0xFF});
     536            return _session.sendMessage(peer, new byte[] { (byte) CHAFF});
    410537        } catch (I2PException ex) {
    411538            _log.error("I2PException:", ex);
     
    416543    public void removeSocket(I2PSocketImpl sock) {
    417544        synchronized (lock) {
    418             _log.debug("Removing socket \""
    419                        + getReadableForm(sock.getLocalID()) + "\"");
     545            _log.debug("Removing socket \"" + getReadableForm(sock.getLocalID()) + "\"");
    420546            _inSockets.remove(sock.getLocalID());
    421547            _outSockets.remove(sock.getLocalID());
     
    425551
    426552    public static String getReadableForm(String id) {
    427         try {
    428             if (id == null) return "(null)";
    429             if (id.length() != 3) return "Bogus";
    430             return Base64.encode(id.getBytes("ISO-8859-1"));
    431         } catch (UnsupportedEncodingException ex) {
    432             ex.printStackTrace();
    433             return null;
    434         }
     553        if (id == null) return "(null)";
     554        if (id.length() != 3) return "Bogus";
     555        return Base64.encode(toBytes(id));
    435556    }
    436557
     
    440561     * @param uniqueIn map of already known local IDs so we don't collide. WARNING - NOT THREADSAFE!
    441562     */
    442     public static String makeID(HashMap uniqueIn) {
     563    private static String makeID(HashMap uniqueIn) {
    443564        String newID;
    444         try {
    445             do {
    446                 int id = (int) (Math.random() * 16777215 + 1);
    447                 byte[] nid = new byte[3];
    448                 nid[0] = (byte) (id / 65536);
    449                 nid[1] = (byte) ((id / 256) % 256);
    450                 nid[2] = (byte) (id % 256);
    451                 newID = new String(nid, "ISO-8859-1");
    452             } while (uniqueIn.get(newID) != null);
    453             return newID;
    454         } catch (UnsupportedEncodingException ex) {
    455             ex.printStackTrace();
    456             return null;
    457         }
     565        do {
     566            int id = (int) (Math.random() * 16777215 + 1);
     567            byte[] nid = new byte[3];
     568            nid[0] = (byte) (id / 65536);
     569            nid[1] = (byte) ((id / 256) % 256);
     570            nid[2] = (byte) (id % 256);
     571            newID = toString(nid);
     572        } while (uniqueIn.get(newID) != null);
     573        return newID;
    458574    }
    459575
     
    463579     */
    464580    public static byte[] makePacket(byte type, String id, byte[] payload) {
    465         try {
    466             byte[] packet = new byte[payload.length + 4];
    467             packet[0] = type;
    468             byte[] temp = id.getBytes("ISO-8859-1");
    469             if (temp.length != 3) throw new RuntimeException("Incorrect ID length: " + temp.length);
    470             System.arraycopy(temp, 0, packet, 1, 3);
    471             System.arraycopy(payload, 0, packet, 4, payload.length);
    472             return packet;
    473         } catch (UnsupportedEncodingException ex) {
    474             if (_log.shouldLog(Log.ERROR)) _log.error("Error building the packet", ex);
    475             return new byte[0];
     581        byte[] packet = new byte[payload.length + 4];
     582        packet[0] = type;
     583        byte[] temp = toBytes(id);
     584        if (temp.length != 3) throw new RuntimeException("Incorrect ID length: " + temp.length);
     585        System.arraycopy(temp, 0, packet, 1, 3);
     586        System.arraycopy(payload, 0, packet, 4, payload.length);
     587        return packet;
     588    }
     589   
     590    private static final String toString(byte data[]) {
     591        try {
     592            return new String(data, "ISO-8859-1");
     593        } catch (UnsupportedEncodingException uee) {
     594            throw new RuntimeException("WTF!  iso-8859-1 isn't supported?");
     595        }
     596    }
     597   
     598    private static final byte[] toBytes(String str) {
     599        try {
     600            return str.getBytes("ISO-8859-1");
     601        } catch (UnsupportedEncodingException uee) {
     602            throw new RuntimeException("WTF!  iso-8859-1 isn't supported?");
    476603        }
    477604    }
Note: See TracChangeset for help on using the changeset viewer.