Changeset d96ddd1
- Timestamp:
- Jun 3, 2015 11:42:54 AM (6 years ago)
- Branches:
- master
- Children:
- f57e37d
- Parents:
- 7b711eb (diff), b5455cee (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)
links above to see all the changes relative to each parent. - Files:
-
- 1 added
- 12 edited
Legend:
- Unmodified
- Added
- Removed
-
apps/sam/java/src/net/i2p/sam/SAMBridge.java
r7b711eb rd96ddd1 20 20 import java.nio.ByteBuffer; 21 21 import java.util.Arrays; 22 import java.util.ArrayList; 22 23 import java.util.HashMap; 24 import java.util.HashSet; 25 import java.util.List; 23 26 import java.util.Map; 24 27 import java.util.Properties; 28 import java.util.Set; 25 29 26 30 import net.i2p.I2PAppContext; … … 31 35 import net.i2p.util.I2PAppThread; 32 36 import net.i2p.util.Log; 37 import net.i2p.util.PortMapper; 33 38 34 39 /** … … 56 61 */ 57 62 private final Map<String,String> nameToPrivKeys; 63 private final Set<Handler> _handlers; 58 64 59 65 private volatile boolean acceptConnections = true; … … 96 102 persistFilename = options.keyFile; 97 103 nameToPrivKeys = new HashMap<String,String>(8); 104 _handlers = new HashSet<Handler>(8); 98 105 this.i2cpProps = options.opts; 99 106 _state = INITIALIZED; … … 125 132 persistFilename = persistFile; 126 133 nameToPrivKeys = new HashMap<String,String>(8); 134 _handlers = new HashSet<Handler>(8); 127 135 loadKeys(); 128 136 try { … … 210 218 211 219 /** 212 * Load up the keys from the persistFilename 213 * 220 * Load up the keys from the persistFilename. 221 * TODO use DataHelper 222 * TODO store in config dir, not base dir 214 223 */ 215 224 private void loadKeys() { … … 219 228 try { 220 229 br = new BufferedReader(new InputStreamReader( 221 new FileInputStream(persistFilename) ));230 new FileInputStream(persistFilename), "UTF-8")); 222 231 String line = null; 223 232 while ( (line = br.readLine()) != null) { … … 227 236 nameToPrivKeys.put(name, privKeys); 228 237 } 238 if (_log.shouldInfo()) 239 _log.info("Loaded " + nameToPrivKeys.size() + " private keys from " + persistFilename); 229 240 } catch (FileNotFoundException fnfe) { 230 241 _log.warn("Key file does not exist at " + persistFilename); … … 238 249 239 250 /** 240 * Store the current keys to disk in the location specified on creation 241 * 251 * Store the current keys to disk in the location specified on creation. 252 * TODO use DataHelper 253 * TODO store in config dir, not base dir 242 254 */ 243 255 private void storeKeys() { … … 249 261 String name = entry.getKey(); 250 262 String privKeys = entry.getValue(); 251 out.write(name.getBytes( ));263 out.write(name.getBytes("UTF-8")); 252 264 out.write('='); 253 out.write(privKeys.getBytes( ));265 out.write(privKeys.getBytes("UTF-8")); 254 266 out.write('\n'); 255 267 } 268 if (_log.shouldInfo()) 269 _log.info("Saved " + nameToPrivKeys.size() + " private keys to " + persistFilename); 256 270 } catch (IOException ioe) { 257 271 _log.error("Error writing out the SAM keys to " + persistFilename, ioe); … … 262 276 } 263 277 278 /** 279 * Handlers must call on startup 280 * @since 0.9.20 281 */ 282 public void register(Handler handler) { 283 if (_log.shouldInfo()) 284 _log.info("Register " + handler); 285 synchronized (_handlers) { 286 _handlers.add(handler); 287 } 288 } 289 290 /** 291 * Handlers must call on stop 292 * @since 0.9.20 293 */ 294 public void unregister(Handler handler) { 295 if (_log.shouldInfo()) 296 _log.info("Unregister " + handler); 297 synchronized (_handlers) { 298 _handlers.remove(handler); 299 } 300 } 301 302 /** 303 * Stop all the handlers. 304 * @since 0.9.20 305 */ 306 private void stopHandlers() { 307 List<Handler> handlers = null; 308 synchronized (_handlers) { 309 if (!_handlers.isEmpty()) { 310 handlers = new ArrayList<Handler>(_handlers); 311 _handlers.clear(); 312 } 313 } 314 if (handlers != null) { 315 for (Handler handler : handlers) { 316 if (_log.shouldInfo()) 317 _log.info("Stopping " + handler); 318 handler.stopHandling(); 319 } 320 } 321 } 322 264 323 ////// begin ClientApp interface, use only if using correct construtor 265 324 … … 271 330 return; 272 331 changeState(STARTING); 332 synchronized (_handlers) { 333 _handlers.clear(); 334 } 273 335 loadKeys(); 274 336 try { … … 286 348 287 349 /** 288 * Does NOT stop existing sessions. 350 * As of 0.9.20, stops running handlers and sessions. 351 * 289 352 * @since 0.9.6 290 353 */ … … 294 357 changeState(STOPPING); 295 358 acceptConnections = false; 359 stopHandlers(); 296 360 if (_runner != null) 297 361 _runner.interrupt(); 298 362 else 299 363 changeState(STOPPED); 300 // TODO does not stop active connections / sessions301 364 } 302 365 … … 376 439 */ 377 440 private void startThread() { 378 I2PAppThread t = new I2PAppThread(this, "SAMListener ");441 I2PAppThread t = new I2PAppThread(this, "SAMListener " + _listenPort); 379 442 if (Boolean.parseBoolean(System.getProperty("sam.shutdownOnOOM"))) { 380 443 t.addOOMEventThreadListener(new I2PAppThread.OOMEventListener() { … … 488 551 if (_mgr != null) 489 552 _mgr.register(this); 553 I2PAppContext.getGlobalContext().portMapper().register(PortMapper.SVC_SAM, _listenPort); 490 554 try { 491 555 while (acceptConnections) { … … 496 560 + s.socket().getPort()); 497 561 498 class HelloHandler implements Runnable {499 500 501 502 562 class HelloHandler implements Runnable, Handler { 563 private final SocketChannel s; 564 private final SAMBridge parent; 565 566 HelloHandler(SocketChannel s, SAMBridge parent) { 503 567 this.s = s ; 504 568 this.parent = parent ; 505 } 506 507 public void run() { 569 } 570 571 public void run() { 572 parent.register(this); 508 573 try { 509 SAMHandler handler = SAMHandlerFactory.createSAMHandler(s, i2cpProps );574 SAMHandler handler = SAMHandlerFactory.createSAMHandler(s, i2cpProps, parent); 510 575 if (handler == null) { 511 576 if (_log.shouldLog(Log.DEBUG)) … … 516 581 return; 517 582 } 518 handler.setBridge(parent);519 583 handler.startHandling(); 520 584 } catch (SAMException e) { … … 527 591 try { s.close(); } catch (IOException ioe) {} 528 592 _log.log(Log.CRIT, "Unexpected error handling SAM connection", ee); 529 } 530 } 593 } finally { 594 parent.unregister(this); 595 } 596 } 597 598 /** @since 0.9.20 */ 599 public void stopHandling() { 600 try { s.close(); } catch (IOException ioe) {} 601 } 531 602 } 532 // TODO: Handler threads are not saved or tracked and cannot be stopped 533 new I2PAppThread(new HelloHandler(s,this), "HelloHandler").start(); 603 new I2PAppThread(new HelloHandler(s,this), "SAM HelloHandler").start(); 534 604 } 535 605 changeState(STOPPING); … … 547 617 serverSocket.close(); 548 618 } catch (IOException e) {} 619 I2PAppContext.getGlobalContext().portMapper().unregister(PortMapper.SVC_SAM); 620 stopHandlers(); 549 621 changeState(STOPPED); 550 622 } -
apps/sam/java/src/net/i2p/sam/SAMHandler.java
r7b711eb rd96ddd1 26 26 * @author human 27 27 */ 28 abstract class SAMHandler implements Runnable {28 abstract class SAMHandler implements Runnable, Handler { 29 29 30 30 protected final Log _log; 31 31 32 protected I2PAppThread thread = null;33 protected SAMBridge bridge = null;32 protected I2PAppThread thread; 33 protected final SAMBridge bridge; 34 34 35 35 private final Object socketWLock = new Object(); // Guards writings on socket … … 42 42 protected final Properties i2cpProps; 43 43 44 pr ivatefinal Object stopLock = new Object();45 pr ivate volatileboolean stopHandler;44 protected final Object stopLock = new Object(); 45 protected boolean stopHandler; 46 46 47 47 /** … … 54 54 * @throws IOException 55 55 */ 56 protected SAMHandler(SocketChannel s, 57 int verMajor, int verMinor, Properties i2cpProps) throws IOException {56 protected SAMHandler(SocketChannel s, int verMajor, int verMinor, 57 Properties i2cpProps, SAMBridge parent) throws IOException { 58 58 _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); 59 59 socket = s; … … 62 62 this.verMinor = verMinor; 63 63 this.i2cpProps = i2cpProps; 64 bridge = parent; 64 65 } 65 66 … … 69 70 */ 70 71 public final void startHandling() { 71 thread = new I2PAppThread(this, "SAMHandler");72 thread = new I2PAppThread(this, getClass().getSimpleName()); 72 73 thread.start(); 73 74 } 74 75 public void setBridge(SAMBridge bridge) { this.bridge = bridge; } 76 75 77 76 /** 78 77 * Actually handle the SAM protocol. … … 82 81 83 82 /** 84 * Get the input stream of the socket connected to the SAM client 85 * 86 * @return input stream 87 * @throws IOException 83 * Get the channel of the socket connected to the SAM client 84 * 85 * @return channel 88 86 */ 89 87 protected final SocketChannel getClientSocket() { … … 157 155 158 156 /** 159 * Stop the SAM handler 160 * 161 */ 162 public finalvoid stopHandling() {157 * Stop the SAM handler, close the client socket, 158 * unregister with the bridge. 159 */ 160 public void stopHandling() { 163 161 synchronized (stopLock) { 164 162 stopHandler = true; 165 163 } 164 try { 165 closeClientSocket(); 166 } catch (IOException e) {} 167 bridge.unregister(this); 166 168 } 167 169 … … 184 186 @Override 185 187 public final String toString() { 186 return ( "SAM handler (class: " + this.getClass().getName()188 return (this.getClass().getSimpleName() 187 189 + "; SAM version: " + verMajor + "." + verMinor 188 190 + "; client: " … … 191 193 } 192 194 195 /** 196 * Register with the bridge, call handle(), 197 * unregister with the bridge. 198 */ 193 199 public final void run() { 194 handle(); 200 bridge.register(this); 201 try { 202 handle(); 203 } finally { 204 bridge.unregister(this); 205 } 195 206 } 196 207 } -
apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java
r7b711eb rd96ddd1 39 39 * @return A SAM protocol handler, or null if the client closed before the handshake 40 40 */ 41 public static SAMHandler createSAMHandler(SocketChannel s, Properties i2cpProps) throws SAMException { 41 public static SAMHandler createSAMHandler(SocketChannel s, Properties i2cpProps, 42 SAMBridge parent) throws SAMException { 42 43 StringTokenizer tok; 43 44 Log log = I2PAppContext.getGlobalContext().logManager().getLog(SAMHandlerFactory.class); … … 46 47 Socket sock = s.socket(); 47 48 sock.setSoTimeout(HELLO_TIMEOUT); 49 sock.setKeepAlive(true); 48 50 String line = DataHelper.readLine(sock.getInputStream()); 49 51 sock.setSoTimeout(0); … … 104 106 switch (verMajor) { 105 107 case 1: 106 handler = new SAMv1Handler(s, verMajor, verMinor, i2cpProps );108 handler = new SAMv1Handler(s, verMajor, verMinor, i2cpProps, parent); 107 109 break; 108 110 case 2: 109 handler = new SAMv2Handler(s, verMajor, verMinor, i2cpProps );111 handler = new SAMv2Handler(s, verMajor, verMinor, i2cpProps, parent); 110 112 break; 111 113 case 3: 112 handler = new SAMv3Handler(s, verMajor, verMinor, i2cpProps );114 handler = new SAMv3Handler(s, verMajor, verMinor, i2cpProps, parent); 113 115 break; 114 116 default: -
apps/sam/java/src/net/i2p/sam/SAMMessageSession.java
r7b711eb rd96ddd1 73 73 handler = new SAMMessageSessionHandler(destStream, props); 74 74 75 // FIXME don't start threads in constructors 75 76 Thread t = new I2PAppThread(handler, "SAMMessageSessionHandler"); 76 77 t.start(); … … 126 127 /** 127 128 * Close a SAM message-based session. 128 *129 129 */ 130 130 public void close() { -
apps/sam/java/src/net/i2p/sam/SAMUtils.java
r7b711eb rd96ddd1 213 213 214 214 /* Dump a Properties object in an human-readable form */ 215 /**** 215 216 private static String dumpProperties(Properties props) { 216 217 StringBuilder builder = new StringBuilder(); … … 232 233 return builder.toString(); 233 234 } 235 ****/ 234 236 235 237 /**** -
apps/sam/java/src/net/i2p/sam/SAMv1Handler.java
r7b711eb rd96ddd1 61 61 * @throws IOException 62 62 */ 63 public SAMv1Handler(SocketChannel s, int verMajor, int verMinor) throws SAMException, IOException { 64 this(s, verMajor, verMinor, new Properties()); 63 public SAMv1Handler(SocketChannel s, int verMajor, int verMinor, 64 SAMBridge parent) throws SAMException, IOException { 65 this(s, verMajor, verMinor, new Properties(), parent); 65 66 } 66 67 /** … … 76 77 * @throws IOException 77 78 */ 78 public SAMv1Handler(SocketChannel s, int verMajor, int verMinor, Properties i2cpProps) throws SAMException, IOException { 79 super(s, verMajor, verMinor, i2cpProps); 79 public SAMv1Handler(SocketChannel s, int verMajor, int verMinor, 80 Properties i2cpProps, SAMBridge parent) throws SAMException, IOException { 81 super(s, verMajor, verMinor, i2cpProps, parent); 80 82 _id = __id.incrementAndGet(); 81 83 if (_log.shouldLog(Log.DEBUG)) 82 84 _log.debug("SAM version 1 handler instantiated"); 83 85 84 if ( ! verifVersion() ) {86 if ( ! verifVersion() ) { 85 87 throw new SAMException("BUG! Wrong protocol version!"); 86 88 } … … 184 186 } catch (IOException e) { 185 187 if (_log.shouldLog(Log.DEBUG)) 186 _log.debug("Caught IOException (" 187 + e.getMessage() + ") for message [" + msg + "]", e); 188 _log.debug("Caught IOException for message [" + msg + "]", e); 188 189 } catch (Exception e) { 189 190 _log.error("Unexpected exception for message [" + msg + "]", e); … … 194 195 closeClientSocket(); 195 196 } catch (IOException e) { 196 _log.error("Error closing socket: " + e.getMessage()); 197 if (_log.shouldWarn()) 198 _log.warn("Error closing socket", e); 197 199 } 198 200 if (getRawSession() != null) { … … 798 800 if (getRawSession() == null) { 799 801 _log.error("BUG! Received raw bytes, but session is null!"); 800 throw new NullPointerException("BUG! RAW session is null!");802 return; 801 803 } 802 804 … … 819 821 if (getRawSession() == null) { 820 822 _log.error("BUG! Got raw receiving stop, but session is null!"); 821 throw new NullPointerException("BUG! RAW session is null!");823 return; 822 824 } 823 825 … … 834 836 if (getDatagramSession() == null) { 835 837 _log.error("BUG! Received datagram bytes, but session is null!"); 836 throw new NullPointerException("BUG! DATAGRAM session is null!");838 return; 837 839 } 838 840 … … 856 858 if (getDatagramSession() == null) { 857 859 _log.error("BUG! Got datagram receiving stop, but session is null!"); 858 throw new NullPointerException("BUG! DATAGRAM session is null!");860 return; 859 861 } 860 862 … … 874 876 { 875 877 _log.error ( "BUG! Want to answer to stream SEND, but session is null!" ); 876 throw new NullPointerException ( "BUG! STREAM session is null!" );878 return; 877 879 } 878 880 … … 892 894 { 893 895 _log.error ( "BUG! Stream outgoing buffer is free, but session is null!" ); 894 throw new NullPointerException ( "BUG! STREAM session is null!" );896 return; 895 897 } 896 898 … … 905 907 if (getStreamSession() == null) { 906 908 _log.error("BUG! Received stream connection, but session is null!"); 907 throw new NullPointerException("BUG! STREAM session is null!");909 return; 908 910 } 909 911 … … 915 917 } 916 918 919 /** @param msg may be null */ 917 920 public void notifyStreamOutgoingConnection ( int id, String result, String msg ) throws IOException 918 921 { … … 920 923 { 921 924 _log.error ( "BUG! Received stream connection, but session is null!" ); 922 throw new NullPointerException ( "BUG! STREAM session is null!" ); 923 } 924 925 String msgString = "" ; 926 927 if ( msg != null ) msgString = " MESSAGE=\"" + msg + "\""; 928 925 return; 926 } 927 928 String msgString = createMessageString(msg); 929 929 if ( !writeString ( "STREAM STATUS RESULT=" 930 930 + result … … 936 936 } 937 937 } 938 939 /** 940 * Create a string to be appended to a status. 941 * 942 * @param msg may be null 943 * @return non-null, "" if msg is null, MESSAGE=msg or MESSAGE="msg a b c" 944 * with leading space if msg is non-null 945 * @since 0.9.20 946 */ 947 protected static String createMessageString(String msg) { 948 String rv; 949 if ( msg != null ) { 950 msg = msg.replace("\n", " "); 951 msg = msg.replace("\r", " "); 952 if (!msg.startsWith("\"")) { 953 msg = msg.replace("\"", ""); 954 if (msg.contains("\"") || msg.contains("\t")) 955 msg = '"' + msg + '"'; 956 } 957 rv = " MESSAGE=\"" + msg + "\""; 958 } else { 959 rv = ""; 960 } 961 return rv; 962 } 938 963 939 964 public void receiveStreamBytes(int id, ByteBuffer data) throws IOException { 940 965 if (getStreamSession() == null) { 941 966 _log.error("Received stream bytes, but session is null!"); 942 throw new NullPointerException("BUG! STREAM session is null!");967 return; 943 968 } 944 969 … … 957 982 } 958 983 984 /** @param msg may be null */ 959 985 public void notifyStreamDisconnection(int id, String result, String msg) throws IOException { 960 986 if (getStreamSession() == null) { 961 987 _log.error("BUG! Received stream disconnection, but session is null!"); 962 throw new NullPointerException("BUG! STREAM session is null!"); 963 } 964 965 // FIXME: msg should be escaped! 966 if (!writeString("STREAM CLOSED ID=" + id + " RESULT=" + result 967 + (msg == null ? "" : (" MESSAGE=" + msg)) 968 + "\n")) { 988 return; 989 } 990 991 String msgString = createMessageString(msg); 992 if (!writeString("STREAM CLOSED ID=" + id + " RESULT=" + result + msgString + '\n')) { 969 993 throw new IOException("Error notifying disconnection to SAM client"); 970 994 } … … 977 1001 if (getStreamSession() == null) { 978 1002 _log.error("BUG! Got stream receiving stop, but session is null!"); 979 throw new NullPointerException("BUG! STREAM session is null!");1003 return; 980 1004 } 981 1005 -
apps/sam/java/src/net/i2p/sam/SAMv2Handler.java
r7b711eb rd96ddd1 35 35 * @param verMinor SAM minor version to manage 36 36 */ 37 public SAMv2Handler ( SocketChannel s, int verMajor, int verMinor ) throws SAMException, IOException 37 public SAMv2Handler(SocketChannel s, int verMajor, int verMinor, 38 SAMBridge parent) throws SAMException, IOException 38 39 { 39 this ( s, verMajor, verMinor, new Properties());40 this(s, verMajor, verMinor, new Properties(), parent); 40 41 } 41 42 … … 51 52 */ 52 53 53 public SAMv2Handler ( SocketChannel s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException 54 public SAMv2Handler(SocketChannel s, int verMajor, int verMinor, 55 Properties i2cpProps, SAMBridge parent) throws SAMException, IOException 54 56 { 55 super ( s, verMajor, verMinor, i2cpProps);57 super(s, verMajor, verMinor, i2cpProps, parent); 56 58 } 57 59 -
apps/sam/java/src/net/i2p/sam/SAMv3Handler.java
r7b711eb rd96ddd1 49 49 private Session session; 50 50 public static final SessionsDB sSessionsHash = new SessionsDB(); 51 private boolean stolenSocket;52 private boolean streamForwardingSocket;51 private volatile boolean stolenSocket; 52 private volatile boolean streamForwardingSocket; 53 53 54 54 … … 68 68 * @param verMinor SAM minor version to manage 69 69 */ 70 public SAMv3Handler ( SocketChannel s, int verMajor, int verMinor ) throws SAMException, IOException 71 { 72 this ( s, verMajor, verMinor, new Properties() ); 70 public SAMv3Handler(SocketChannel s, int verMajor, int verMinor, 71 SAMBridge parent) throws SAMException, IOException 72 { 73 this(s, verMajor, verMinor, new Properties(), parent); 73 74 } 74 75 … … 84 85 */ 85 86 86 public SAMv3Handler ( SocketChannel s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException 87 { 88 super ( s, verMajor, verMinor, i2cpProps ); 87 public SAMv3Handler(SocketChannel s, int verMajor, int verMinor, 88 Properties i2cpProps, SAMBridge parent) throws SAMException, IOException 89 { 90 super(s, verMajor, verMinor, i2cpProps, parent); 89 91 if (_log.shouldLog(Log.DEBUG)) 90 92 _log.debug("SAM version 3 handler instantiated"); … … 215 217 } 216 218 219 /** 220 * The values in the SessionsDB 221 */ 217 222 public static class SessionRecord 218 223 { … … 267 272 } 268 273 274 /** 275 * basically a HashMap from String to SessionRecord 276 */ 269 277 public static class SessionsDB 270 278 { 271 279 private static final long serialVersionUID = 0x1; 272 280 273 static class ExistingIdException 281 static class ExistingIdException extends Exception { 274 282 private static final long serialVersionUID = 0x1; 275 283 } … … 285 293 } 286 294 295 /** @return success */ 287 296 synchronized public boolean put( String nick, SessionRecord session ) 288 297 throws ExistingIdException, ExistingDestException … … 306 315 } 307 316 317 /** @return true if removed */ 308 318 synchronized public boolean del( String nick ) 309 319 { 310 SessionRecord rec = map.get(nick); 311 312 if ( rec!=null ) { 313 map.remove(nick); 314 return true ; 315 } 316 else 317 return false ; 318 } 320 return map.remove(nick) != null; 321 } 322 319 323 synchronized public SessionRecord get(String nick) 320 324 { … … 333 337 } 334 338 339 /** 340 * For SAMv3StreamSession connect and accept 341 */ 335 342 public void stealSocket() 336 343 { 337 344 stolenSocket = true ; 338 345 this.stopHandling(); 346 } 347 348 /** 349 * For SAMv3StreamSession 350 * @since 0.9.20 351 */ 352 SAMBridge getBridge() { 353 return bridge; 339 354 } 340 355 … … 349 364 this.thread.setName("SAMv3Handler " + _id); 350 365 if (_log.shouldLog(Log.DEBUG)) 351 _log.debug("SAM handling started");366 _log.debug("SAMv3 handling started"); 352 367 353 368 try { … … 423 438 } catch (IOException e) { 424 439 if (_log.shouldLog(Log.DEBUG)) 425 _log.debug("Caught IOException (" 426 + e.getMessage() + ") for message [" + msg + "]", e); 440 _log.debug("Caught IOException for message [" + msg + "]", e); 427 441 } catch (Exception e) { 428 442 _log.error("Unexpected exception for message [" + msg + "]", e); … … 436 450 closeClientSocket(); 437 451 } catch (IOException e) { 438 _log.error("Error closing socket: " + e.getMessage()); 452 if (_log.shouldWarn()) 453 _log.warn("Error closing socket", e); 439 454 } 440 455 } … … 445 460 ((SAMv3StreamSession)streamSession).stopForwardingIncoming(); 446 461 } catch (SAMException e) { 447 _log.error("Error while stopping forwarding connections: " + e.getMessage()); 462 if (_log.shouldWarn()) 463 _log.warn("Error while stopping forwarding connections", e); 448 464 } catch (InterruptedIOException e) { 449 _log.error("Interrupted while stopping forwarding connections: " + e.getMessage()); 465 if (_log.shouldWarn()) 466 _log.warn("Interrupted while stopping forwarding connections", e); 450 467 } 451 468 } 452 469 } 453 454 455 456 470 die(); 457 471 } 458 472 } 459 473 460 protected void die() { 474 /** 475 * Stop the SAM handler, close the socket, 476 * unregister with the bridge. 477 * 478 * Overridden to not close the client socket if stolen. 479 * 480 * @since 0.9.20 481 */ 482 @Override 483 public void stopHandling() { 484 synchronized (stopLock) { 485 stopHandler = true; 486 } 487 if (!stolenSocket) { 488 try { 489 closeClientSocket(); 490 } catch (IOException e) {} 491 } 492 bridge.unregister(this); 493 } 494 495 private void die() { 461 496 SessionRecord rec = null ; 462 497 … … 814 849 815 850 816 public void notifyStreamResult(boolean verbose, String result, String message) throws IOException 817 { 851 public void notifyStreamResult(boolean verbose, String result, String message) throws IOException { 818 852 if (!verbose) return ; 819 820 String out = "STREAM STATUS RESULT="+result; 821 if (message!=null) 822 out = out + " MESSAGE=\"" + message + "\""; 823 out = out + '\n'; 853 String msgString = createMessageString(message); 854 String out = "STREAM STATUS RESULT=" + result + msgString + '\n'; 824 855 825 if ( !writeString ( out ) ) 826 { 827 throw new IOException ( "Error notifying connection to SAM client" ); 828 } 829 } 856 if (!writeString(out)) { 857 throw new IOException ( "Error notifying connection to SAM client" ); 858 } 859 } 830 860 831 861 public void notifyStreamIncomingConnection(Destination d) throws IOException { -
apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java
r7b711eb rd96ddd1 75 75 /** 76 76 * Connect the SAM STREAM session to the specified Destination 77 * for a single connection, using the socket stolen from the handler. 77 78 * 78 79 * @param handler The handler that communicates with the requesting client … … 88 89 */ 89 90 public void connect ( SAMv3Handler handler, String dest, Properties props ) 90 throws I2PException, ConnectException, NoRouteToHostException,91 throws I2PException, ConnectException, NoRouteToHostException, 91 92 DataFormatException, InterruptedIOException, IOException { 92 93 … … 118 119 WritableByteChannel toI2P = Channels.newChannel(i2ps.getOutputStream()); 119 120 120 (new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromClient,toI2P, "SAMPipeClientToI2P"), "SAMPipeClientToI2P"), "SAMPipeClientToI2P")).start(); 121 (new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromI2P,toClient, "SAMPipeI2PToClient"), "SAMPipeI2PToClient"), "SAMPipeI2PToClient")).start(); 122 121 SAMBridge bridge = handler.getBridge(); 122 (new Thread(rec.getThreadGroup(), 123 new Pipe(fromClient, toI2P, bridge), 124 "ConnectV3 SAMPipeClientToI2P")).start(); 125 (new Thread(rec.getThreadGroup(), 126 new Pipe(fromI2P, toClient, bridge), 127 "ConnectV3 SAMPipeI2PToClient")).start(); 123 128 } 124 129 125 130 /** 126 * Accept a n incoming STREAM131 * Accept a single incoming STREAM on the socket stolen from the handler. 127 132 * 128 133 * @param handler The handler that communicates with the requesting client … … 151 156 } 152 157 153 I2PSocket i2ps; 154 i2ps = this.socketServer.accept(); 158 I2PSocket i2ps = this.socketServer.accept(); 155 159 156 160 synchronized( this.socketServerLock ) … … 160 164 161 165 SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); 162 163 164 165 166 166 167 if ( rec==null || i2ps==null ) throw new InterruptedIOException() ; 168 169 if (verbose) 170 handler.notifyStreamIncomingConnection(i2ps.getPeerDestination()) ; 167 171 168 172 handler.stealSocket() ; … … 172 176 WritableByteChannel toI2P = Channels.newChannel(i2ps.getOutputStream()); 173 177 174 (new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromClient,toI2P, "SAMPipeClientToI2P"), "SAMPipeClientToI2P"), "SAMPipeClientToI2P")).start(); 175 (new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromI2P,toClient, "SAMPipeI2PToClient"), "SAMPipeI2PToClient"), "SAMPipeI2PToClient")).start(); 178 SAMBridge bridge = handler.getBridge(); 179 (new Thread(rec.getThreadGroup(), 180 new Pipe(fromClient, toI2P, bridge), 181 "AcceptV3 SAMPipeClientToI2P")).start(); 182 (new Thread(rec.getThreadGroup(), 183 new Pipe(fromI2P, toClient, bridge), 184 "AcceptV3 SAMPipeI2PToClient")).start(); 176 185 } 177 186 … … 211 220 212 221 SocketForwarder forwarder = new SocketForwarder(host, port, this, verbose); 213 (new Thread(rec.getThreadGroup(), new I2PAppThread(forwarder, "SAMStreamForwarder"), "SAMStreamForwarder")).start();222 (new Thread(rec.getThreadGroup(), forwarder, "SAMV3StreamForwarder")).start(); 214 223 } 215 224 216 private static class SocketForwarder extends Thread225 private static class SocketForwarder implements Runnable 217 226 { 218 227 private final String host; … … 255 264 // build pipes between both sockets 256 265 try { 266 clientServerSock.socket().setKeepAlive(true); 257 267 if (this.verbose) 258 268 SAMv3Handler.notifyStreamIncomingConnection( … … 262 272 WritableByteChannel toClient = clientServerSock ; 263 273 WritableByteChannel toI2P = Channels.newChannel(i2ps.getOutputStream()); 264 (new I2PAppThread(new Pipe(fromClient,toI2P, "SAMPipeClientToI2P"), "SAMPipeClientToI2P")).start(); 265 (new I2PAppThread(new Pipe(fromI2P,toClient, "SAMPipeI2PToClient"), "SAMPipeI2PToClient")).start(); 274 (new I2PAppThread(new Pipe(fromClient, toI2P, null), 275 "ForwardV3 SAMPipeClientToI2P")).start(); 276 (new I2PAppThread(new Pipe(fromI2P,toClient, null), 277 "ForwardV3 SAMPipeI2PToClient")).start(); 266 278 267 279 } catch (IOException e) { … … 278 290 } 279 291 280 private static class Pipe extends Thread292 private static class Pipe implements Runnable, Handler 281 293 { 282 294 private final ReadableByteChannel in ; 283 295 private final WritableByteChannel out ; 284 296 private final ByteBuffer buf ; 285 286 public Pipe(ReadableByteChannel in, WritableByteChannel out, String name) 287 { 288 super(name); 297 private final SAMBridge bridge; 298 299 /** 300 * @param bridge may be null 301 */ 302 public Pipe(ReadableByteChannel in, WritableByteChannel out, SAMBridge bridge) 303 { 289 304 this.in = in ; 290 305 this.out = out ; 291 306 this.buf = ByteBuffer.allocate(BUFFER_SIZE) ; 292 } 293 294 public void run() 295 { 296 try { 297 while (!Thread.interrupted() && (in.read(buf)>=0 || buf.position() != 0)) { 298 buf.flip(); 299 out.write(buf); 300 buf.compact(); 301 } 302 } 303 catch (IOException e) 304 { 305 this.interrupt(); 307 this.bridge = bridge; 308 } 309 310 public void run() { 311 if (bridge != null) 312 bridge.register(this); 313 try { 314 while (!Thread.interrupted() && (in.read(buf)>=0 || buf.position() != 0)) { 315 buf.flip(); 316 out.write(buf); 317 buf.compact(); 306 318 } 307 try { 308 in.close(); 309 } 310 catch (IOException e) {} 311 try { 312 buf.flip(); 313 while (buf.hasRemaining()) 314 out.write(buf); 315 } 316 catch (IOException e) {} 317 try { 318 out.close(); 319 } 320 catch (IOException e) {} 321 } 319 } catch (IOException ioe) { 320 // ignore 321 } finally { 322 try { 323 in.close(); 324 } catch (IOException e) {} 325 try { 326 buf.flip(); 327 while (buf.hasRemaining()) { 328 out.write(buf); 329 } 330 } catch (IOException e) {} 331 try { 332 out.close(); 333 } catch (IOException e) {} 334 if (bridge != null) 335 bridge.unregister(this); 336 } 337 } 338 339 /** 340 * Handler interface 341 * @since 0.9.20 342 */ 343 public void stopHandling() { 344 try { 345 in.close(); 346 } catch (IOException e) {} 347 } 322 348 } 323 349 -
core/java/src/net/i2p/client/I2PSessionImpl.java
r7b711eb rd96ddd1 494 494 I2PSSLSocketFactory fact = new I2PSSLSocketFactory(_context, false, "certificates/i2cp"); 495 495 _socket = fact.createSocket(_hostname, _portNum); 496 _socket.setKeepAlive(true); 496 497 } catch (GeneralSecurityException gse) { 497 498 IOException ioe = new IOException("SSL Fail"); … … 501 502 } else { 502 503 _socket = new Socket(_hostname, _portNum); 504 _socket.setKeepAlive(true); 503 505 } 504 506 // _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it. -
core/java/src/net/i2p/client/I2PSimpleSession.java
r7b711eb rd96ddd1 90 90 _socket = new Socket(_hostname, _portNum); 91 91 } 92 _socket.setKeepAlive(true); 92 93 OutputStream out = _socket.getOutputStream(); 93 94 out.write(I2PClient.PROTOCOL_BYTE); -
router/java/src/net/i2p/router/client/ClientListenerRunner.java
r7b711eb rd96ddd1 94 94 if (_log.shouldLog(Log.DEBUG)) 95 95 _log.debug("Connection received"); 96 socket.setKeepAlive(true); 96 97 runConnection(socket); 97 98 } else {
Note: See TracChangeset
for help on using the changeset viewer.