Changeset 7f1ace4


Ignore:
Timestamp:
Nov 26, 2010 12:44:00 AM (10 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
e1e6db2
Parents:
d37944e
Message:

in preparation for more improvements

Location:
apps/i2psnark/java/src/org/klomp/snark
Files:
1 added
6 edited

Legend:

Unmodified
Added
Removed
  • apps/i2psnark/java/src/org/klomp/snark/Peer.java

    rd37944e r7f1ace4  
    2828import java.io.OutputStream;
    2929import java.util.Arrays;
     30import java.util.List;
    3031
    3132import net.i2p.client.streaming.I2PSocket;
     
    369370          PeerListener p = s.listener;
    370371          if (p != null) {
    371             p.savePeerPartial(s);
    372             p.markUnrequested(this);
     372            List<PartialPiece> pcs = s.returnPartialPieces();
     373            if (!pcs.isEmpty())
     374                p.savePartialPieces(this, pcs);
     375            // now covered by savePartialPieces
     376            //p.markUnrequested(this);
    373377          }
    374378        }
  • apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java

    rd37944e r7f1ace4  
    7575  private List<Piece> wantedPieces;
    7676
     77  /** partial pieces */
     78  private final List<PartialPiece> partialPieces;
     79
    7780  private boolean halted = false;
    7881
     
    9598
    9699    setWantedPieces();
     100    partialPieces = new ArrayList(getMaxConnections() + 1);
    97101
    98102    // Install a timer to check the uploaders.
     
    294298    }
    295299    // delete any saved orphan partial piece
    296     savedRequest = null;
     300    synchronized (partialPieces) {
     301        partialPieces.clear();
     302    }
    297303  }
    298304
     
    774780      }
    775781
     782    // just in case
     783    removePartialPiece(piece);
     784
    776785    // Announce to the world we have it!
    777786    // Disconnect from other seeders when we get the last piece
     
    867876  }
    868877
    869 
    870   /** Simple method to save a partial piece on peer disconnection
     878  /**
     879   *  Save partial pieces on peer disconnection
    871880   *  and hopefully restart it later.
    872    *  Only one partial piece is saved at a time.
    873    *  Replace it if a new one is bigger or the old one is too old.
     881   *  Replace a partial piece in the List if the new one is bigger.
    874882   *  Storage method is private so we can expand to save multiple partials
    875883   *  if we wish.
    876    */
    877   private Request savedRequest = null;
    878   private long savedRequestTime = 0;
    879   public void savePeerPartial(PeerState state)
    880   {
    881     if (halted)
    882       return;
    883     Request req = state.getPartialRequest();
    884     if (req == null)
    885       return;
    886     if (savedRequest == null ||
    887         req.off > savedRequest.off ||
    888         System.currentTimeMillis() > savedRequestTime + (15 * 60 * 1000)) {
    889       if (savedRequest == null || (req.piece != savedRequest.piece && req.off != savedRequest.off)) {
    890         if (_log.shouldLog(Log.DEBUG)) {
    891           _log.debug(" Saving orphaned partial piece " + req);
    892           if (savedRequest != null)
    893             _log.debug(" (Discarding previously saved orphan) " + savedRequest);
    894         }
    895       }
    896       savedRequest = req;
    897       savedRequestTime = System.currentTimeMillis();
    898     } else {
    899       if (req.piece != savedRequest.piece)
    900         if (_log.shouldLog(Log.DEBUG))
    901           _log.debug(" Discarding orphaned partial piece " + req);
    902     }
    903   }
    904 
    905   /** Return partial piece if it's still wanted and peer has it.
    906    */
    907   public Request getPeerPartial(BitField havePieces) {
    908     if (savedRequest == null)
     884   *
     885   *  Also mark the piece unrequested if this peer was the only one.
     886   *
     887   *  @param peer partials, must include the zero-offset (empty) ones too
     888   *  @since 0.8.2
     889   */
     890  public void savePartialPieces(Peer peer, List<PartialPiece> partials)
     891  {
     892      if (halted)
     893          return;
     894      if (_log.shouldLog(Log.INFO))
     895          _log.info("Partials received from " + peer + ": " + partials);
     896      synchronized(partialPieces) {
     897          for (PartialPiece pp : partials) {
     898              if (pp.getDownloaded() > 0) {
     899                  // PartialPiece.equals() only compares piece number, which is what we want
     900                  int idx = partialPieces.indexOf(pp);
     901                  if (idx < 0) {
     902                      partialPieces.add(pp);
     903                      if (_log.shouldLog(Log.INFO))
     904                          _log.info("Saving orphaned partial piece (new) " + pp);
     905                  } else if (idx >= 0 && pp.getDownloaded() > partialPieces.get(idx).getDownloaded()) {
     906                      // replace what's there now
     907                      partialPieces.set(idx, pp);
     908                      if (_log.shouldLog(Log.INFO))
     909                          _log.info("Saving orphaned partial piece (bigger) " + pp);
     910                  } else {
     911                      if (_log.shouldLog(Log.INFO))
     912                          _log.info("Discarding partial piece (not bigger)" + pp);
     913                  }
     914                  int max = getMaxConnections();
     915                  if (partialPieces.size() > max) {
     916                      // sorts by remaining bytes, least first
     917                      Collections.sort(partialPieces);
     918                      PartialPiece gone = partialPieces.remove(max);
     919                      if (_log.shouldLog(Log.INFO))
     920                          _log.info("Discarding orphaned partial piece (list full)" + gone);
     921                  }
     922              }  // else drop the empty partial piece
     923              // synchs on wantedPieces...
     924              markUnrequestedIfOnlyOne(peer, pp.getPiece());
     925          }
     926          if (_log.shouldLog(Log.INFO))
     927              _log.info("Partial list size now: " + partialPieces.size());
     928      }
     929  }
     930
     931  /**
     932   *  Return partial piece to the PeerState if it's still wanted and peer has it.
     933   *  @param havePieces pieces the peer has, the rv will be one of these
     934   *
     935   *  @return PartialPiece or null
     936   *  @since 0.8.2
     937   */
     938  public PartialPiece getPartialPiece(Peer peer, BitField havePieces) {
     939      // do it in this order to avoid deadlock (same order as in savePartialPieces())
     940      synchronized(partialPieces) {
     941          synchronized(wantedPieces) {
     942              // sorts by remaining bytes, least first
     943              Collections.sort(partialPieces);
     944              for (Iterator<PartialPiece> iter = partialPieces.iterator(); iter.hasNext(); ) {
     945                  PartialPiece pp = iter.next();
     946                  int savedPiece = pp.getPiece();
     947                  if (havePieces.get(savedPiece)) {
     948                     // this is just a double-check, it should be in there
     949                     for(Piece piece : wantedPieces) {
     950                         if (piece.getId() == savedPiece) {
     951                             piece.setRequested(true);
     952                             iter.remove();
     953                             if (_log.shouldLog(Log.INFO)) {
     954                                 _log.info("Restoring orphaned partial piece " + pp +
     955                                           " Partial list size now: " + partialPieces.size());
     956                             }
     957                             return pp;
     958                          }
     959                      }
     960                  }
     961              }
     962          }
     963      }
     964      // ...and this section turns this into the general move-requests-around code!
     965      // Temporary? So PeerState never calls wantPiece() directly for now...
     966      int piece = wantPiece(peer, havePieces);
     967      if (piece >= 0) {
     968          try {
     969              return new PartialPiece(piece, metainfo.getPieceLength(piece));
     970          } catch (OutOfMemoryError oom) {
     971              if (_log.shouldLog(Log.WARN))
     972                  _log.warn("OOM creating new partial piece");
     973          }
     974      }
     975      if (_log.shouldLog(Log.DEBUG))
     976          _log.debug("We have no partial piece to return");
    909977      return null;
    910     if (! havePieces.get(savedRequest.piece)) {
    911       if (_log.shouldLog(Log.DEBUG))
    912         _log.debug("Peer doesn't have orphaned piece " + savedRequest);
    913       return null;
    914     }
    915     synchronized(wantedPieces)
    916       {
    917         for(Iterator<Piece> iter = wantedPieces.iterator(); iter.hasNext(); ) {
    918           Piece piece = iter.next();
    919           if (piece.getId() == savedRequest.piece) {
    920             Request req = savedRequest;
    921             piece.setRequested(true);
    922             if (_log.shouldLog(Log.DEBUG))
    923               _log.debug("Restoring orphaned partial piece " + req);
    924             savedRequest = null;
    925             return req;
    926           }
    927         }
    928       }
    929     if (_log.shouldLog(Log.DEBUG))
    930       _log.debug("We no longer want orphaned piece " + savedRequest);
    931     savedRequest = null;
    932     return null;
     978  }
     979
     980  /**
     981   *  Remove saved state for this piece.
     982   *  Unless we are in the end game there shouldnt be anything in there.
     983   *  Do not call with wantedPieces lock held (deadlock)
     984   */
     985  private void removePartialPiece(int piece) {
     986      synchronized(partialPieces) {
     987          for (Iterator<PartialPiece> iter = partialPieces.iterator(); iter.hasNext(); ) {
     988              PartialPiece pp = iter.next();
     989              if (pp.getPiece() == piece) {
     990                  iter.remove();
     991                  // there should be only one but keep going to be sure
     992              }
     993          }
     994      }
    933995  }
    934996
     
    9481010          if (p.state == null)
    9491011            continue;
    950           int[] arr = p.state.getRequestedPieces();
    951           for (int i = 0; arr[i] >= 0; i++)
    952             if(arr[i] == piece) {
     1012          // FIXME don't go into the state
     1013          if (p.state.getRequestedPieces().contains(Integer.valueOf(piece))) {
    9531014              if (_log.shouldLog(Log.DEBUG))
    9541015                _log.debug("Another peer is requesting piece " + piece);
    9551016              return;
    956             }
     1017          }
    9571018        }
    9581019      }
     
    9741035  }
    9751036
    976   /** Mark a peer's requested pieces unrequested when it is disconnected
    977    ** Once for each piece
    978    ** This is enough trouble, maybe would be easier just to regenerate
    979    ** the requested list from scratch instead.
    980    */
    981   public void markUnrequested(Peer peer)
    982   {
    983     if (halted || peer.state == null)
    984       return;
    985     int[] arr = peer.state.getRequestedPieces();
    986     for (int i = 0; arr[i] >= 0; i++)
    987       markUnrequestedIfOnlyOne(peer, arr[i]);
    988   }
    989 
    9901037  /** Return number of allowed uploaders for this torrent.
    9911038   ** Check with Snark to see if we are over the total upload limit.
  • apps/i2psnark/java/src/org/klomp/snark/PeerListener.java

    rd37944e r7f1ace4  
    2121package org.klomp.snark;
    2222
     23import java.util.List;
     24
    2325/**
    2426 * Listener for Peer events.
    2527 */
    26 public interface PeerListener
     28interface PeerListener
    2729{
    2830  /**
     
    152154   * @param state the PeerState for the peer
    153155   */
    154   void savePeerPartial(PeerState state); /* FIXME Exporting non-public type through public API FIXME */
     156  void savePartialPieces(Peer peer, List<PartialPiece> pcs);
    155157
    156158  /**
     
    162164   * @return request (contains the partial data and valid length)
    163165   */
    164   Request getPeerPartial(BitField havePieces); /* FIXME Exporting non-public type through public API FIXME */
    165 
    166   /** Mark a peer's requested pieces unrequested when it is disconnected
    167    *  This prevents premature end game
    168    *
    169    * @param peer the peer that is disconnecting
    170    */
    171   void markUnrequested(Peer peer);
     166  PartialPiece getPartialPiece(Peer peer, BitField havePieces);
    172167}
  • apps/i2psnark/java/src/org/klomp/snark/PeerState.java

    rd37944e r7f1ace4  
    2424import java.io.InputStream;
    2525import java.util.ArrayList;
     26import java.util.HashSet;
    2627import java.util.Iterator;
    2728import java.util.List;
    2829import java.util.Map;
     30import java.util.Set;
    2931
    3032import net.i2p.I2PAppContext;
     
    3739{
    3840  private final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(PeerState.class);
    39   final Peer peer;
     41  private final Peer peer;
    4042  final PeerListener listener;
    41   final MetaInfo metainfo;
     43  private final MetaInfo metainfo;
    4244
    4345  // Interesting and choking describes whether we are interested in or
     
    5557  long uploaded;
    5658
     59  /** the pieces the peer has */
    5760  BitField bitfield;
    5861
     
    103106    if (interesting && !choked)
    104107      request(resend);
     108
     109    if (choked) {
     110        // TODO
     111        // savePartialPieces
     112        // clear request list
     113    }
    105114  }
    106115
     
    309318  }
    310319
     320  /**
     321   *  @return index in outstandingRequests or -1
     322   */
    311323  synchronized private int getFirstOutstandingRequest(int piece)
    312   {
     324   {
    313325    for (int i = 0; i < outstandingRequests.size(); i++)
    314326      if (outstandingRequests.get(i).piece == piece)
     
    398410  }
    399411
    400   // get longest partial piece
    401   synchronized Request getPartialRequest()
    402   {
    403     Request req = null;
    404     for (int i = 0; i < outstandingRequests.size(); i++) {
    405       Request r1 = outstandingRequests.get(i);
    406       int j = getFirstOutstandingRequest(r1.piece);
    407       if (j == -1)
    408         continue;
    409       Request r2 = outstandingRequests.get(j);
    410       if (r2.off > 0 && ((req == null) || (r2.off > req.off)))
    411         req = r2;
    412     }
    413     if (pendingRequest != null && req != null && pendingRequest.off < req.off) {
    414       if (pendingRequest.off != 0)
    415         req = pendingRequest;
    416       else
    417         req = null;
    418     }
    419     return req;
    420   }
    421 
    422   /**
    423    * return array of pieces terminated by -1
    424    * remove most duplicates
    425    * but still could be some duplicates, not guaranteed
    426    * TODO rework this Java-style to return a Set or a List
    427    */
    428   synchronized int[] getRequestedPieces()
    429   {
    430     int size = outstandingRequests.size();
    431     int[] arr = new int[size+2];
    432     int pc = -1;
    433     int pos = 0;
    434     if (pendingRequest != null) {
    435       pc = pendingRequest.piece;
    436       arr[pos++] = pc;
    437     }
    438     Request req = null;
    439     for (int i = 0; i < size; i++) {
    440       Request r1 = outstandingRequests.get(i);
    441       if (pc != r1.piece) {
    442         pc = r1.piece;
    443         arr[pos++] = pc;
    444       }
    445     }
    446     arr[pos] = -1;
    447     return(arr);
     412  /**
     413   *  @return lowest offset of any request for the piece
     414   *  @since 0.8.2
     415   */
     416  synchronized private Request getLowestOutstandingRequest(int piece) {
     417      Request rv = null;
     418      int lowest = Integer.MAX_VALUE;
     419      for (Request r :  outstandingRequests) {
     420          if (r.piece == piece && r.off < lowest) {
     421              lowest = r.off;
     422              rv = r;
     423          }
     424      }
     425      if (pendingRequest != null &&
     426          pendingRequest.piece == piece && pendingRequest.off < lowest)
     427          rv = pendingRequest;
     428
     429      if (_log.shouldLog(Log.DEBUG))
     430          _log.debug(peer + " lowest for " + piece + " is " + rv + " out of " + pendingRequest + " and " + outstandingRequests);
     431      return rv;
     432  }
     433
     434  /**
     435   *  get partial pieces, give them back to PeerCoordinator
     436   *  @return List of PartialPieces, even those with an offset == 0, or empty list
     437   *  @since 0.8.2
     438   */
     439  synchronized List<PartialPiece> returnPartialPieces()
     440  {
     441      Set<Integer> pcs = getRequestedPieces();
     442      List<PartialPiece> rv = new ArrayList(pcs.size());
     443      for (Integer p : pcs) {
     444          Request req = getLowestOutstandingRequest(p.intValue());
     445          if (req != null)
     446              rv.add(new PartialPiece(req));
     447      }
     448      return rv;
     449  }
     450
     451  /**
     452   * @return all pieces we are currently requesting, or empty Set
     453   */
     454  synchronized Set<Integer> getRequestedPieces() {
     455      Set<Integer> rv = new HashSet(outstandingRequests.size() + 1);
     456      for (Request req : outstandingRequests) {
     457          rv.add(Integer.valueOf(req.piece));
     458      if (pendingRequest != null)
     459          rv.add(Integer.valueOf(pendingRequest.piece));
     460      }
     461      return rv;
    448462  }
    449463
     
    556570        synchronized (this) {
    557571            out.sendRequests(outstandingRequests);
     572            if (_log.shouldLog(Log.DEBUG))
     573                _log.debug("Resending requests to " + peer + outstandingRequests);
    558574        }
    559575      }
     
    621637      {
    622638        // Check for adopting an orphaned partial piece
    623         Request r = listener.getPeerPartial(bitfield);
    624         if (r != null) {
    625               // Check that r not already in outstandingRequests
    626               int[] arr = getRequestedPieces();
    627               boolean found = false;
    628               for (int i = 0; arr[i] >= 0; i++) {
    629                 if (arr[i] == r.piece) {
    630                   found = true;
    631                   break;
    632                 }
    633               }
    634               if (!found) {
     639        PartialPiece pp = listener.getPartialPiece(peer, bitfield);
     640        if (pp != null) {
     641            // Double-check that r not already in outstandingRequests
     642            if (!getRequestedPieces().contains(Integer.valueOf(pp.getPiece()))) {
     643                Request r = pp.getRequest();
    635644                outstandingRequests.add(r);
    636645                if (!choked)
     
    638647                lastRequest = r;
    639648                return true;
    640               }
     649            }
    641650        }
    642651
  • apps/i2psnark/java/src/org/klomp/snark/Piece.java

    rd37944e r7f1ace4  
    66import net.i2p.util.ConcurrentHashSet;
    77
    8 public class Piece implements Comparable {
     8/**
     9 * This class is used solely by PeerCoordinator.
     10 */
     11class Piece implements Comparable {
    912
    1013    private int id;
  • apps/i2psnark/java/src/org/klomp/snark/Request.java

    rd37944e r7f1ace4  
    2323/**
    2424 * Holds all information needed for a partial piece request.
     25 * This class should be used only by PeerState, PeerConnectionIn, and PeerConnectionOut.
    2526 */
    2627class Request
Note: See TracChangeset for help on using the changeset viewer.