extended client connection clean-up:

there are now two time-outs, one for the complete connection time, and one for an idle time
connections that are idle for more than 2 minutes are closed, and connections that are alive since more than one hour are also closed
if the complete number of connections exceeds 64, all connections more than 64 and have most idle time are also closed

During normal operation of peers these forced closings should never appear,
but the existence of the idle connection check ensures the availability of the peer and the usability of the host.


git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4134 6c8d7289-2bf4-0310-a012-ef5d649a1542
This commit is contained in:
orbiter 2007-10-03 15:06:12 +00:00
parent b19bb6e5b1
commit 711641f167
4 changed files with 79 additions and 67 deletions

View File

@ -41,6 +41,7 @@
<tr class="TableHeader" valign="bottom">
<td>Protocol</td>
<td>Duration</td>
<td>Idle Time</td>
<td>Dest. IP[:Port]</td>
<td>Command</td>
<td>ID</td>
@ -49,6 +50,7 @@
<tr class="TableCell#(dark)#Light::Dark#(/dark)#">
<td>#[clientProtocol]#</td>
<td>#[clientLifetime]#</td>
<td>#[clientIdletime]#</td>
<td>#[clientTargetHost]#</td>
<td>#[clientCommand]#</td>
<td>#[clientID]#</td>

View File

@ -248,6 +248,7 @@ public final class Connections_p {
if (clientConnection != null) {
prop.put("clientList_" + c + "_clientProtocol", (clientConnection.ssl) ? "HTTPS" : "HTTP");
prop.put("clientList_" + c + "_clientLifetime", System.currentTimeMillis() - clientConnection.initTime);
prop.put("clientList_" + c + "_clientIdletime", System.currentTimeMillis() - clientConnection.lastIO);
prop.put("clientList_" + c + "_clientTargetHost", clientConnection.adressed_host + ":" + clientConnection.adressed_port);
prop.put("clientList_" + c + "_clientCommand", (clientConnection.command == null) ? "-" : clientConnection.command);
prop.put("clientList_" + c + "_clientID", clientConnection.hashCode());

View File

@ -45,7 +45,9 @@ import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PushbackInputStream;
import java.io.Writer;
import java.net.InetAddress;
@ -111,7 +113,8 @@ public final class httpc {
private static final SimpleDateFormat HTTPGMTFormatter = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz", Locale.US);
private static final HashMap reverseMappingCache = new HashMap();
private static final HashSet activeConnections = new HashSet(); // all connections are stored here and deleted when they are finished
private static final long minimumTime_before_activeConnections_cleanup = 600000;
private static final long minimumTime_before_activeConnections_cleanup = 3600000; // 1 Hour
private static final long minimumTime_before_idleConnections_cleanup = 120000; // 2 Minutes
private static final int activeConnections_maximum = 64;
public static final connectionTimeComparator connectionTimeComparatorInstance = new connectionTimeComparator();
@ -202,7 +205,7 @@ public final class httpc {
private boolean allowContentEncoding = true;
public boolean ssl;
public long initTime;
public long initTime, lastIO;
public String command;
public int timeout;
@ -243,6 +246,7 @@ public final class httpc {
this.ssl = ssl;
this.initTime = Long.MAX_VALUE;
this.lastIO = Long.MAX_VALUE;
this.command = null;
this.timeout = timeout;
@ -395,6 +399,7 @@ public final class httpc {
// trying to establish a connection to the address
this.initTime = System.currentTimeMillis();
this.lastIO = System.currentTimeMillis();
this.socket.setKeepAlive(false);
this.socket.connect(address, timeout);
// setting socket timeout and keep alive behaviour
@ -459,12 +464,19 @@ public final class httpc {
}
for (int i = tbd; i < a.length; i++) {
httpc clientConnection = a[i];
if ((clientConnection != null) &&
(clientConnection.initTime != Long.MAX_VALUE) &&
(clientConnection.initTime + Math.max(minimumTime_before_activeConnections_cleanup, clientConnection.timeout) < System.currentTimeMillis())) {
// the time-out limit is reached. close the connection
clientConnection.close();
c++;
if (clientConnection != null) {
if ((clientConnection.initTime != Long.MAX_VALUE) &&
(clientConnection.initTime + Math.max(minimumTime_before_activeConnections_cleanup, clientConnection.timeout) < System.currentTimeMillis())) {
// the time-out limit is reached. close the connection
clientConnection.close();
c++;
}
if ((clientConnection.lastIO != Long.MAX_VALUE) &&
(clientConnection.lastIO + Math.max(minimumTime_before_idleConnections_cleanup, clientConnection.timeout) < System.currentTimeMillis())) {
// the time-out limit is reached. close the connection
clientConnection.close();
c++;
}
}
}
return c;
@ -505,8 +517,8 @@ public final class httpc {
public int compare(Object o1, Object o2) {
httpc c1 = (httpc) o1;
httpc c2 = (httpc) o2;
long l1 = System.currentTimeMillis() - c1.initTime;
long l2 = System.currentTimeMillis() - c2.initTime;
long l1 = System.currentTimeMillis() - c1.lastIO;
long l2 = System.currentTimeMillis() - c2.lastIO;
if (l1 < l2) return 1;
if (l1 > l2) return -1;
return 0;
@ -1381,11 +1393,11 @@ public final class httpc {
}
if (procOS instanceof OutputStream) {
serverFileUtils.writeX(this.getContentInputStream(), (OutputStream) procOS, sbb);
writeX(this.getContentInputStream(), (OutputStream) procOS, sbb);
} else if (procOS instanceof Writer) {
String charSet = this.responseHeader.getCharacterEncoding();
if (charSet == null) charSet = httpHeader.DEFAULT_CHARSET;
serverFileUtils.writeX(this.getContentInputStream(), charSet, (Writer) procOS, sbb, charSet);
writeX(this.getContentInputStream(), charSet, (Writer) procOS, sbb, charSet);
} else {
throw new IllegalArgumentException("Invalid procOS object type '" + procOS.getClass().getName() + "'");
}
@ -1414,14 +1426,14 @@ public final class httpc {
try {
InputStream is = this.getContentInputStream();
if (procOS == null) {
serverFileUtils.writeX(is, null, bufferOS);
writeX(is, null, bufferOS);
} else if (procOS instanceof OutputStream) {
serverFileUtils.writeX(is, (OutputStream) procOS, bufferOS);
writeX(is, (OutputStream) procOS, bufferOS);
//writeContentX(httpc.this.clientInput, this.gzip, this.responseHeader.contentLength(), procOS, bufferOS);
} else if (procOS instanceof Writer) {
String charSet = this.responseHeader.getCharacterEncoding();
if (charSet == null) charSet = httpHeader.DEFAULT_CHARSET;
serverFileUtils.writeX(is, charSet, (Writer) procOS, bufferOS, charSet);
writeX(is, charSet, (Writer) procOS, bufferOS, charSet);
} else {
throw new IllegalArgumentException("Invalid procOS object type '" + procOS.getClass().getName() + "'");
}
@ -1436,6 +1448,55 @@ public final class httpc {
}
}
public void writeX(InputStream source, OutputStream procOS, OutputStream bufferOS) {
byte[] buffer = new byte[2048];
int l, c = 0;
while (true) try {
l = source.read(buffer, 0, buffer.length);
if (l <= 0) break;
lastIO = System.currentTimeMillis();
c += l;
if (procOS != null) procOS.write(buffer, 0, l);
if (bufferOS != null) bufferOS.write(buffer, 0, l);
} catch (IOException e) {
//System.out.println("*** DEBUG: writeX/IOStream terminated with IOException, processed " + c + " bytes.");
break;
}
// flush the streams
if (procOS != null) try { procOS.flush(); } catch (IOException e) {}
if (bufferOS != null) try { bufferOS.flush(); } catch (IOException e) {}
buffer = null;
}
public void writeX(InputStream source, String inputCharset, Writer procOS, OutputStream bufferOS, String outputCharset) {
try {
InputStreamReader sourceReader = new InputStreamReader(source, inputCharset);
OutputStreamWriter bufferOSWriter = (bufferOS == null) ? null : new OutputStreamWriter(bufferOS,outputCharset);
char[] buffer = new char[2048];
int l, c= 0;
while (true) try{
l = sourceReader.read(buffer, 0, buffer.length);
if (l <= 0) break;
lastIO = System.currentTimeMillis();
c += l;
if (procOS != null) procOS.write(buffer, 0, l);
if (bufferOSWriter != null) bufferOSWriter.write(buffer, 0, l);
} catch (IOException e) {
//System.out.println("*** DEBUG: writeX/ReaderWriter terminated with IOException, processed " + c + " bytes.");
break;
}
// flush the streams
if (procOS != null) try { procOS.flush(); } catch (IOException e) {}
if (bufferOSWriter != null) try { bufferOSWriter.flush(); } catch (IOException e) {}
buffer = null;
} catch (IOException e) {}
}
/**
* This method outputs a logline to the serverlog with the current
* status of this instance.

View File

@ -51,7 +51,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Reader;
import java.io.Writer;
@ -111,57 +110,6 @@ public final class serverFileUtils {
return total;
}
public static void writeX(InputStream source, OutputStream procOS, OutputStream bufferOS) {
byte[] buffer = new byte[2048];
int l, c = 0;
while (true) try {
l = source.read(buffer, 0, buffer.length);
if (l <= 0) break;
c += l;
if (procOS != null) procOS.write(buffer, 0, l);
if (bufferOS != null) bufferOS.write(buffer, 0, l);
} catch (IOException e) {
//System.out.println("*** DEBUG: writeX/IOStream terminated with IOException, processed " + c + " bytes.");
break;
}
// flush the streams
if (procOS != null) try { procOS.flush(); } catch (IOException e) {}
if (bufferOS != null) try { bufferOS.flush(); } catch (IOException e) {}
buffer = null;
}
public static void writeX(Reader source, Writer procOS, Writer bufferOS) {
char[] buffer = new char[2048];
int l, c= 0;
while (true) try{
l = source.read(buffer, 0, buffer.length);
if (l <= 0) break;
c += l;
if (procOS != null) procOS.write(buffer, 0, l);
if (bufferOS != null) bufferOS.write(buffer, 0, l);
} catch (IOException e) {
//System.out.println("*** DEBUG: writeX/ReaderWriter terminated with IOException, processed " + c + " bytes.");
break;
}
// flush the streams
if (procOS != null) try { procOS.flush(); } catch (IOException e) {}
if (bufferOS != null) try { bufferOS.flush(); } catch (IOException e) {}
buffer = null;
}
public static void writeX(InputStream source, String inputCharset, Writer procOS, OutputStream bufferOS, String outputCharset) {
try {
InputStreamReader sourceReader = new InputStreamReader(source, inputCharset);
OutputStreamWriter bufferOSWriter = (bufferOS == null) ? null : new OutputStreamWriter(bufferOS,outputCharset);
writeX(sourceReader, procOS, bufferOSWriter);
} catch (IOException e) {}
}
public static int copy (File source, String inputCharset, Writer dest) throws IOException {
InputStream fis = null;
try {