Changeset 868e5e9


Ignore:
Timestamp:
Nov 25, 2015 10:59:41 PM (5 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
807e5bf
Parents:
612e01c
Message:

More v3 support
Convert IDs from ints to Strings
Wait for STREAM STATUS
Open 2nd socket for sender
v3 sender working

Location:
apps/sam/java/src/net/i2p/sam/client
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java

    r612e01c r868e5e9  
    1111    public void namingReplyReceived(String name, String result, String value, String message) {}
    1212    public void sessionStatusReceived(String result, String destination, String message) {}
    13     public void streamClosedReceived(String result, int id, String message) {}
    14     public void streamConnectedReceived(String remoteDestination, int id) {}
    15     public void streamDataReceived(int id, byte[] data, int offset, int length) {}
    16     public void streamStatusReceived(String result, int id, String message) {}
     13    public void streamClosedReceived(String result, String id, String message) {}
     14    public void streamConnectedReceived(String remoteDestination, String id) {}
     15    public void streamDataReceived(String id, byte[] data, int offset, int length) {}
     16    public void streamStatusReceived(String result, String id, String message) {}
    1717    public void unknownMessageReceived(String major, String minor, Properties params) {}
    1818}
  • apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java

    r612e01c r868e5e9  
    1919    private final Object _helloLock = new Object();
    2020    private Boolean _sessionCreateOk;
     21    private Boolean _streamStatusOk;
    2122    private final Object _sessionCreateLock = new Object();
    2223    private final Object _namingReplyLock = new Object();
     24    private final Object _streamStatusLock = new Object();
    2325    private final Map<String,String> _namingReplies = new HashMap<String,String>();
    2426
     
    2830    }
    2931   
    30         @Override
     32    @Override
    3133    public void helloReplyReceived(boolean ok, String version) {
    3234        synchronized (_helloLock) {
     
    4042    }
    4143
    42         @Override
     44    @Override
    4345    public void sessionStatusReceived(String result, String destination, String msg) {
    4446        synchronized (_sessionCreateLock) {
     
    5153    }
    5254
    53         @Override
     55    @Override
    5456    public void namingReplyReceived(String name, String result, String value, String msg) {
    5557        synchronized (_namingReplyLock) {
     
    6264    }
    6365
    64         @Override
     66    @Override
     67    public void streamStatusReceived(String result, String id, String message) {
     68        synchronized (_streamStatusLock) {
     69            if (SAMReader.SAMClientEventListener.SESSION_STATUS_OK.equals(result))
     70                _streamStatusOk = Boolean.TRUE;
     71            else
     72                _streamStatusOk = Boolean.FALSE;
     73            _streamStatusLock.notifyAll();
     74        }
     75    }
     76
     77    @Override
    6578    public void unknownMessageReceived(String major, String minor, Properties params) {
    6679        _log.error("Unhandled message: [" + major + "] [" + minor + "] [" + params + "]");
     
    107120        }
    108121    }
     122
     123    /**
     124     * Wait for the stream to be created, returning true if everything went ok
     125     *
     126     * @return true if everything ok
     127     */
     128    public boolean waitForStreamStatusReply() {
     129        while (true) {
     130            try {
     131                synchronized (_streamStatusLock) {
     132                    if (_streamStatusOk == null)
     133                        _streamStatusLock.wait();
     134                    else
     135                        return _streamStatusOk.booleanValue();
     136                }
     137            } catch (InterruptedException ie) {}
     138        }
     139    }
    109140   
    110141    /**
  • apps/sam/java/src/net/i2p/sam/client/SAMReader.java

    r612e01c r868e5e9  
    7474        public void helloReplyReceived(boolean ok, String version);
    7575        public void sessionStatusReceived(String result, String destination, String message);
    76         public void streamStatusReceived(String result, int id, String message);
    77         public void streamConnectedReceived(String remoteDestination, int id);
    78         public void streamClosedReceived(String result, int id, String message);
    79         public void streamDataReceived(int id, byte data[], int offset, int length);
     76        public void streamStatusReceived(String result, String id, String message);
     77        public void streamConnectedReceived(String remoteDestination, String id);
     78        public void streamClosedReceived(String result, String id, String message);
     79        public void streamDataReceived(String id, byte data[], int offset, int length);
    8080        public void namingReplyReceived(String name, String result, String value, String message);
    8181        public void destReplyReceived(String publicKey, String privateKey);
     
    182182                String id = params.getProperty("ID");
    183183                String msg = params.getProperty("MESSAGE");
     184                // id is null in v3, so pass it through regardless
     185                //if (id != null) {
     186                    _listener.streamStatusReceived(result, id, msg);
     187                //} else {
     188                //    _listener.unknownMessageReceived(major, minor, params);
     189                //}
     190            } else if ("CONNECTED".equals(minor)) {
     191                String dest = params.getProperty("DESTINATION");
     192                String id = params.getProperty("ID");
    184193                if (id != null) {
    185                     try {
    186                         _listener.streamStatusReceived(result, Integer.parseInt(id), msg);
    187                     } catch (NumberFormatException nfe) {
    188                         _listener.unknownMessageReceived(major, minor, params);
    189                     }
     194                    _listener.streamConnectedReceived(dest, id);
    190195                } else {
    191196                    _listener.unknownMessageReceived(major, minor, params);
    192197                }
    193             } else if ("CONNECTED".equals(minor)) {
    194                 String dest = params.getProperty("DESTINATION");
    195                 String id = params.getProperty("ID");
     198            } else if ("CLOSED".equals(minor)) {
     199                String result = params.getProperty("RESULT");
     200                String id = params.getProperty("ID");
     201                String msg = params.getProperty("MESSAGE");
    196202                if (id != null) {
    197                     try {
    198                         _listener.streamConnectedReceived(dest, Integer.parseInt(id));
    199                     } catch (NumberFormatException nfe) {
    200                         _listener.unknownMessageReceived(major, minor, params);
    201                     }
    202                 } else {
    203                     _listener.unknownMessageReceived(major, minor, params);
    204                 }
    205             } else if ("CLOSED".equals(minor)) {
    206                 String result = params.getProperty("RESULT");
    207                 String id = params.getProperty("ID");
    208                 String msg = params.getProperty("MESSAGE");
    209                 if (id != null) {
    210                     try {
    211                         _listener.streamClosedReceived(result, Integer.parseInt(id), msg);
    212                     } catch (NumberFormatException nfe) {
    213                         _listener.unknownMessageReceived(major, minor, params);
    214                     }
     203                    _listener.streamClosedReceived(result, id, msg);
    215204                } else {
    216205                    _listener.unknownMessageReceived(major, minor, params);
     
    221210                if (id != null) {
    222211                    try {
    223                         int idVal = Integer.parseInt(id);
    224212                        int sizeVal = Integer.parseInt(size);
    225213                       
     
    229217                            _listener.unknownMessageReceived(major, minor, params);
    230218                        } else {
    231                             _listener.streamDataReceived(idVal, data, 0, sizeVal);
     219                            _listener.streamDataReceived(id, data, 0, sizeVal);
    232220                        }
    233221                    } catch (NumberFormatException nfe) {
  • apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java

    r612e01c r868e5e9  
    3131    private final String _dataFile;
    3232    private String _conOptions;
    33     private Socket _samSocket;
    34     private OutputStream _samOut;
    35     private InputStream _samIn;
    36     private SAMReader _reader;
     33    private SAMReader _reader, _reader2;
     34    private boolean _isV3;
     35    private String _v3ID;
    3736    //private boolean _dead;
    38     private final SAMEventHandler _eventHandler;
    3937    /** Connection id (Integer) to peer (Flooder) */
    40     private final Map<Integer, Sender> _remotePeers;
     38    private final Map<String, Sender> _remotePeers;
    4139   
    4240    public static void main(String args[]) {
     
    6159        _dataFile = dataFile;
    6260        _conOptions = "";
    63         _eventHandler = new SendEventHandler(_context);
    64         _remotePeers = new HashMap<Integer,Sender>();
     61        _remotePeers = new HashMap<String, Sender>();
    6562    }
    6663   
     
    6865        if (_log.shouldLog(Log.DEBUG))
    6966            _log.debug("Starting up");
    70         boolean ok = connect();
    71         if (_log.shouldLog(Log.DEBUG))
    72             _log.debug("Connected: " + ok);
    73         if (ok) {
    74             _reader = new SAMReader(_context, _samIn, _eventHandler);
     67        try {
     68            Socket sock = connect();
     69            SAMEventHandler eventHandler = new SendEventHandler(_context);
     70            _reader = new SAMReader(_context, sock.getInputStream(), eventHandler);
    7571            _reader.startReading();
    7672            if (_log.shouldLog(Log.DEBUG))
    7773                _log.debug("Reader created");
    78             String ourDest = handshake(version);
     74            OutputStream out = sock.getOutputStream();
     75            String ourDest = handshake(out, version, true, eventHandler);
     76            if (ourDest == null)
     77                throw new IOException("handshake failed");
    7978            if (_log.shouldLog(Log.DEBUG))
    8079                _log.debug("Handshake complete.  we are " + ourDest);
     80            if (_isV3) {
     81                Socket sock2 = connect();
     82                eventHandler = new SendEventHandler(_context);
     83                _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler);
     84                _reader2.startReading();
     85                if (_log.shouldLog(Log.DEBUG))
     86                    _log.debug("Reader2 created");
     87                out = sock2.getOutputStream();
     88                String ok = handshake(out, version, false, eventHandler);
     89                if (ok == null)
     90                    throw new IOException("2nd handshake failed");
     91                if (_log.shouldLog(Log.DEBUG))
     92                    _log.debug("Handshake2 complete.");
     93            }
    8194            if (ourDest != null) {
    82                 send();
    83             }
     95                send(out, eventHandler);
     96            }
     97        } catch (IOException e) {
     98            _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e);
     99            if (_reader != null)
     100                _reader.stopReading();
     101            if (_reader2 != null)
     102                _reader2.stopReading();
    84103        }
    85104    }
     
    89108
    90109        @Override
    91         public void streamClosedReceived(String result, int id, String message) {
     110        public void streamClosedReceived(String result, String id, String message) {
    92111            Sender sender = null;
    93112            synchronized (_remotePeers) {
    94                 sender = _remotePeers.remove(Integer.valueOf(id));
     113                sender = _remotePeers.remove(id);
    95114            }
    96115            if (sender != null) {
     
    104123    }
    105124   
    106     private boolean connect() {
    107         try {
    108             _samSocket = new Socket(_samHost, Integer.parseInt(_samPort));
    109             _samOut = _samSocket.getOutputStream();
    110             _samIn = _samSocket.getInputStream();
    111             return true;
    112         } catch (Exception e) {
    113             _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e);
    114             return false;
    115         }
    116     }
    117    
    118     private String handshake(String version) {
    119         synchronized (_samOut) {
     125    private Socket connect() throws IOException {
     126        return new Socket(_samHost, Integer.parseInt(_samPort));
     127    }
     128   
     129    /** @return our b64 dest or null */
     130    private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler) {
     131        synchronized (samOut) {
    120132            try {
    121                 _samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes());
    122                 _samOut.flush();
     133                samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes());
     134                samOut.flush();
    123135                if (_log.shouldLog(Log.DEBUG))
    124136                    _log.debug("Hello sent");
    125                 String hisVersion = _eventHandler.waitForHelloReply();
     137                String hisVersion = eventHandler.waitForHelloReply();
    126138                if (_log.shouldLog(Log.DEBUG))
    127139                    _log.debug("Hello reply found: " + hisVersion);
    128140                if (hisVersion == null)
    129141                    throw new IOException("Hello failed");
    130                 boolean isV3 = VersionComparator.comp(hisVersion, "3") >= 0;
    131                 if (isV3) {
     142                if (!isMaster)
     143                    return "OK";
     144                _isV3 = VersionComparator.comp(hisVersion, "3") >= 0;
     145                if (_isV3) {
    132146                    byte[] id = new byte[5];
    133147                    _context.random().nextBytes(id);
    134                     _conOptions = "ID=" + Base32.encode(id);
     148                    _v3ID = Base32.encode(id);
     149                    _conOptions = "ID=" + _v3ID;
    135150                }
    136151                String req = "SESSION CREATE STYLE=STREAM DESTINATION=TRANSIENT " + _conOptions + "\n";
    137                 _samOut.write(req.getBytes());
    138                 _samOut.flush();
     152                samOut.write(req.getBytes());
     153                samOut.flush();
    139154                if (_log.shouldLog(Log.DEBUG))
    140155                    _log.debug("Session create sent");
    141                 boolean ok = _eventHandler.waitForSessionCreateReply();
     156                boolean ok = eventHandler.waitForSessionCreateReply();
    142157                if (!ok)
    143158                    throw new IOException("Session create failed");
     
    146161
    147162                req = "NAMING LOOKUP NAME=ME\n";
    148                 _samOut.write(req.getBytes());
    149                 _samOut.flush();
     163                samOut.write(req.getBytes());
     164                samOut.flush();
    150165                if (_log.shouldLog(Log.DEBUG))
    151166                    _log.debug("Naming lookup sent");
    152                 String destination = _eventHandler.waitForNamingReply("ME");
     167                String destination = eventHandler.waitForNamingReply("ME");
    153168                if (_log.shouldLog(Log.DEBUG))
    154169                    _log.debug("Naming lookup reply found: " + destination);
     
    167182    }
    168183   
    169     private void send() {
    170         Sender sender = new Sender();
     184    private void send(OutputStream samOut, SAMEventHandler eventHandler) {
     185        Sender sender = new Sender(samOut, eventHandler);
    171186        boolean ok = sender.openConnection();
    172187        if (ok) {
     
    177192   
    178193    private class Sender implements Runnable {
    179         private int _connectionId;
     194        private final String _connectionId;
    180195        private String _remoteDestination;
    181196        private InputStream _in;
     
    183198        private long _started;
    184199        private long _totalSent;
    185        
    186         public Sender() {}
     200        private final OutputStream _samOut;
     201        private final SAMEventHandler _eventHandler;
     202       
     203        public Sender(OutputStream samOut, SAMEventHandler eventHandler) {
     204            _samOut = samOut;
     205            _eventHandler = eventHandler;
     206            synchronized (_remotePeers) {
     207                if (_v3ID != null)
     208                    _connectionId = _v3ID;
     209                else
     210                    _connectionId = Integer.toString(_remotePeers.size() + 1);
     211                _remotePeers.put(_connectionId, Sender.this);
     212            }
     213        }
    187214       
    188215        public boolean openConnection() {
     
    194221
    195222                _remoteDestination = new String(dest, 0, read);
    196                 synchronized (_remotePeers) {
    197                     _connectionId = _remotePeers.size() + 1;
    198                     _remotePeers.put(Integer.valueOf(_connectionId), Sender.this);
    199                 }
    200223
    201224                _context.statManager().createRateStat("send." + _connectionId + ".totalSent", "Data size sent", "swarm", new long[] { 30*1000, 60*1000, 5*60*1000 });
     
    208231                    _samOut.flush();
    209232                }
     233                _log.debug("STREAM CONNECT sent, waiting for STREAM STATUS...");
     234                boolean ok = _eventHandler.waitForStreamStatusReply();
     235                if (!ok)
     236                    throw new IOException("STREAM CONNECT failed");
    210237
    211238                _in = new FileInputStream(_dataFile);
     
    223250        }
    224251       
    225         public int getConnectionId() { return _connectionId; }
     252        public String getConnectionId() { return _connectionId; }
    226253        public String getDestination() { return _remoteDestination; }
    227254       
     
    253280                        lastSend = now;
    254281                       
    255                         byte msg[] = ("STREAM SEND ID=" + _connectionId + " SIZE=" + read + "\n").getBytes();
    256282                        synchronized (_samOut) {
    257                             _samOut.write(msg);
     283                            if (!_isV3) {
     284                                byte msg[] = ("STREAM SEND ID=" + _connectionId + " SIZE=" + read + "\n").getBytes();
     285                                _samOut.write(msg);
     286                            }
    258287                            _samOut.write(data, 0, read);
    259288                            _samOut.flush();
     
    269298            }
    270299           
    271             byte msg[] = ("STREAM CLOSE ID=" + _connectionId + "\n").getBytes();
    272             try {
    273                 synchronized (_samOut) {
    274                     _samOut.write(msg);
    275                     _samOut.flush();
    276                 }
    277             } catch (IOException ioe) {
    278                 _log.info("Error closing", ioe);
     300            if (_isV3) {
     301                try {
     302                    _samOut.close();
     303                } catch (IOException ioe) {
     304                    _log.info("Error closing", ioe);
     305                }
     306            } else {
     307                byte msg[] = ("STREAM CLOSE ID=" + _connectionId + "\n").getBytes();
     308                try {
     309                    synchronized (_samOut) {
     310                        _samOut.write(msg);
     311                        _samOut.flush();
     312                        _samOut.close();
     313                    }
     314                } catch (IOException ioe) {
     315                    _log.info("Error closing", ioe);
     316                }
    279317            }
    280318           
     
    284322            if (toSend != _totalSent)
    285323                _log.error("Only sent " + _totalSent + " of " + toSend + " bytes");
     324            if (_reader2 != null)
     325                _reader2.stopReading();
    286326            // stop the reader, since we're only doing this once for testing
    287327            // you wouldn't do this in a real application
  • apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java

    r612e01c r868e5e9  
    3232    private final String _sinkDir;
    3333    private String _conOptions;
    34     private Socket _samSocket;
    35     private OutputStream _samOut;
    36     private InputStream _samIn;
    3734    private SAMReader _reader;
     35    private boolean _isV3;
    3836    //private boolean _dead;
    3937    private final SAMEventHandler _eventHandler;
    4038    /** Connection id (Integer) to peer (Flooder) */
    41     private final Map<Integer, Sink> _remotePeers;
     39    private final Map<String, Sink> _remotePeers;
    4240   
    4341    public static void main(String args[]) {
     
    6260        _conOptions = "";
    6361        _eventHandler = new SinkEventHandler(_context);
    64         _remotePeers = new HashMap<Integer,Sink>();
     62        _remotePeers = new HashMap<String, Sink>();
    6563    }
    6664   
     
    6866        if (_log.shouldLog(Log.DEBUG))
    6967            _log.debug("Starting up");
    70         boolean ok = connect();
    71         if (_log.shouldLog(Log.DEBUG))
    72             _log.debug("Connected: " + ok);
    73         if (ok) {
    74             _reader = new SAMReader(_context, _samIn, _eventHandler);
     68        try {
     69            Socket sock = connect();
     70            _reader = new SAMReader(_context, sock.getInputStream(), _eventHandler);
    7571            _reader.startReading();
    7672            if (_log.shouldLog(Log.DEBUG))
    7773                _log.debug("Reader created");
    78             String ourDest = handshake(version);
     74            OutputStream out = sock.getOutputStream();
     75            String ourDest = handshake(out, version, true);
    7976            if (_log.shouldLog(Log.DEBUG))
    8077                _log.debug("Handshake complete.  we are " + ourDest);
     
    8582                _reader.stopReading();
    8683            }
     84        } catch (IOException e) {
     85            _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e);
    8786        }
    8887    }
     
    9392
    9493        @Override
    95         public void streamClosedReceived(String result, int id, String message) {
     94        public void streamClosedReceived(String result, String id, String message) {
    9695            Sink sink = null;
    9796            synchronized (_remotePeers) {
    98                 sink = _remotePeers.remove(Integer.valueOf(id));
     97                sink = _remotePeers.remove(id);
    9998            }
    10099            if (sink != null) {
     
    108107
    109108        @Override
    110         public void streamDataReceived(int id, byte data[], int offset, int length) {
     109        public void streamDataReceived(String id, byte data[], int offset, int length) {
    111110            Sink sink = null;
    112111            synchronized (_remotePeers) {
    113                 sink = _remotePeers.get(Integer.valueOf(id));
     112                sink = _remotePeers.get(id);
    114113            }
    115114            if (sink != null) {
     
    121120
    122121        @Override
    123         public void streamConnectedReceived(String dest, int id) { 
     122        public void streamConnectedReceived(String dest, String id) { 
    124123            if (_log.shouldLog(Log.DEBUG))
    125124                _log.debug("Connection " + id + " received from " + dest);
     
    128127                Sink sink = new Sink(id, dest);
    129128                synchronized (_remotePeers) {
    130                     _remotePeers.put(Integer.valueOf(id), sink);
     129                    _remotePeers.put(id, sink);
    131130                }
    132131            } catch (IOException ioe) {
     
    136135    }
    137136   
    138     private boolean connect() {
    139         try {
    140             _samSocket = new Socket(_samHost, Integer.parseInt(_samPort));
    141             _samOut = _samSocket.getOutputStream();
    142             _samIn = _samSocket.getInputStream();
    143             return true;
    144         } catch (Exception e) {
    145             _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e);
    146             return false;
    147         }
    148     }
    149    
    150     private String handshake(String version) {
    151         synchronized (_samOut) {
     137    private Socket connect() throws IOException {
     138        return new Socket(_samHost, Integer.parseInt(_samPort));
     139    }
     140   
     141    /** @return our b64 dest or null */
     142    private String handshake(OutputStream samOut, String version, boolean isMaster) {
     143        synchronized (samOut) {
    152144            try {
    153                 _samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes());
    154                 _samOut.flush();
     145                samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes());
     146                samOut.flush();
    155147                if (_log.shouldLog(Log.DEBUG))
    156148                    _log.debug("Hello sent");
     
    160152                if (hisVersion == null)
    161153                    throw new IOException("Hello failed");
    162                 boolean isV3 = VersionComparator.comp(hisVersion, "3") >= 0;
     154                _isV3 = VersionComparator.comp(hisVersion, "3") >= 0;
    163155                String dest;
    164                 if (isV3) {
     156                if (_isV3) {
    165157                    // we use the filename as the name in sam.keys
    166158                    // and read it in ourselves
     
    184176                            _log.debug("Requesting new transient destination");
    185177                    }
    186                     byte[] id = new byte[5];
    187                     _context.random().nextBytes(id);
    188                     _conOptions = "ID=" + Base32.encode(id);
     178                    if (isMaster) {
     179                        byte[] id = new byte[5];
     180                        _context.random().nextBytes(id);
     181                        _conOptions = "ID=" + Base32.encode(id);
     182                    }
    189183                } else {
    190184                    // we use the filename as the name in sam.keys
     
    193187                }
    194188                String req = "SESSION CREATE STYLE=STREAM DESTINATION=" + dest + " " + _conOptions + "\n";
    195                 _samOut.write(req.getBytes());
    196                 _samOut.flush();
     189                samOut.write(req.getBytes());
     190                samOut.flush();
    197191                if (_log.shouldLog(Log.DEBUG))
    198192                    _log.debug("Session create sent");
     
    204198
    205199                req = "NAMING LOOKUP NAME=ME\n";
    206                 _samOut.write(req.getBytes());
    207                 _samOut.flush();
     200                samOut.write(req.getBytes());
     201                samOut.flush();
    208202                if (_log.shouldLog(Log.DEBUG))
    209203                    _log.debug("Naming lookup sent");
     
    228222    private boolean writeDest(String dest) {
    229223        File f = new File(_destFile);
     224/*
    230225        if (f.exists()) {
    231226            if (_log.shouldLog(Log.DEBUG))
    232                 _log.debug("Destination file exists, not overwriting:" + _destFile);
     227                _log.debug("Destination file exists, not overwriting: " + _destFile);
    233228            return false;
    234229        }
     230*/
    235231        FileOutputStream fos = null;
    236232        try {
     
    249245   
    250246    private class Sink {
    251         private final int _connectionId;
     247        private final String _connectionId;
    252248        private final String _remoteDestination;
    253249        private volatile boolean _closed;
     
    256252        private final OutputStream _out;
    257253       
    258         public Sink(int conId, String remDest) throws IOException {
     254        public Sink(String conId, String remDest) throws IOException {
    259255            _connectionId = conId;
    260256            _remoteDestination = remDest;
     
    274270        }
    275271       
    276         public int getConnectionId() { return _connectionId; }
     272        public String getConnectionId() { return _connectionId; }
    277273        public String getDestination() { return _remoteDestination; }
    278274       
Note: See TracChangeset for help on using the changeset viewer.