From 711641f1672c644e744a624f6c01af4481d2983c Mon Sep 17 00:00:00 2001 From: orbiter Date: Wed, 3 Oct 2007 15:06:12 +0000 Subject: [PATCH] extended client connection clean-up: there are now two time-outs, one for the complete connection time, and one for an idle time connections that are idle for more than 2 minutes are closed, and connections that are alive since more than one hour are also closed if the complete number of connections exceeds 64, all connections more than 64 and have most idle time are also closed During normal operation of peers these forced closings should never appear, but the existence of the idle connection check ensures the availability of the peer and the usability of the host. git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4134 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- htroot/Connections_p.html | 2 + htroot/Connections_p.java | 1 + source/de/anomic/http/httpc.java | 91 ++++++++++++++++---- source/de/anomic/server/serverFileUtils.java | 52 ----------- 4 files changed, 79 insertions(+), 67 deletions(-) diff --git a/htroot/Connections_p.html b/htroot/Connections_p.html index e359ff28f..f5ec4f3a5 100644 --- a/htroot/Connections_p.html +++ b/htroot/Connections_p.html @@ -41,6 +41,7 @@ Protocol Duration + Idle Time Dest. IP[:Port] Command ID @@ -49,6 +50,7 @@ #[clientProtocol]# #[clientLifetime]# + #[clientIdletime]# #[clientTargetHost]# #[clientCommand]# #[clientID]# diff --git a/htroot/Connections_p.java b/htroot/Connections_p.java index 9d72245df..770ab01e2 100644 --- a/htroot/Connections_p.java +++ b/htroot/Connections_p.java @@ -248,6 +248,7 @@ public final class Connections_p { if (clientConnection != null) { prop.put("clientList_" + c + "_clientProtocol", (clientConnection.ssl) ? "HTTPS" : "HTTP"); prop.put("clientList_" + c + "_clientLifetime", System.currentTimeMillis() - clientConnection.initTime); + prop.put("clientList_" + c + "_clientIdletime", System.currentTimeMillis() - clientConnection.lastIO); prop.put("clientList_" + c + "_clientTargetHost", clientConnection.adressed_host + ":" + clientConnection.adressed_port); prop.put("clientList_" + c + "_clientCommand", (clientConnection.command == null) ? "-" : clientConnection.command); prop.put("clientList_" + c + "_clientID", clientConnection.hashCode()); diff --git a/source/de/anomic/http/httpc.java b/source/de/anomic/http/httpc.java index ddb72974e..9aac710c5 100644 --- a/source/de/anomic/http/httpc.java +++ b/source/de/anomic/http/httpc.java @@ -45,7 +45,9 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; +import java.io.OutputStreamWriter; import java.io.PushbackInputStream; import java.io.Writer; import java.net.InetAddress; @@ -111,7 +113,8 @@ public final class httpc { private static final SimpleDateFormat HTTPGMTFormatter = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz", Locale.US); private static final HashMap reverseMappingCache = new HashMap(); private static final HashSet activeConnections = new HashSet(); // all connections are stored here and deleted when they are finished - private static final long minimumTime_before_activeConnections_cleanup = 600000; + private static final long minimumTime_before_activeConnections_cleanup = 3600000; // 1 Hour + private static final long minimumTime_before_idleConnections_cleanup = 120000; // 2 Minutes private static final int activeConnections_maximum = 64; public static final connectionTimeComparator connectionTimeComparatorInstance = new connectionTimeComparator(); @@ -202,7 +205,7 @@ public final class httpc { private boolean allowContentEncoding = true; public boolean ssl; - public long initTime; + public long initTime, lastIO; public String command; public int timeout; @@ -243,6 +246,7 @@ public final class httpc { this.ssl = ssl; this.initTime = Long.MAX_VALUE; + this.lastIO = Long.MAX_VALUE; this.command = null; this.timeout = timeout; @@ -395,6 +399,7 @@ public final class httpc { // trying to establish a connection to the address this.initTime = System.currentTimeMillis(); + this.lastIO = System.currentTimeMillis(); this.socket.setKeepAlive(false); this.socket.connect(address, timeout); // setting socket timeout and keep alive behaviour @@ -459,12 +464,19 @@ public final class httpc { } for (int i = tbd; i < a.length; i++) { httpc clientConnection = a[i]; - if ((clientConnection != null) && - (clientConnection.initTime != Long.MAX_VALUE) && - (clientConnection.initTime + Math.max(minimumTime_before_activeConnections_cleanup, clientConnection.timeout) < System.currentTimeMillis())) { - // the time-out limit is reached. close the connection - clientConnection.close(); - c++; + if (clientConnection != null) { + if ((clientConnection.initTime != Long.MAX_VALUE) && + (clientConnection.initTime + Math.max(minimumTime_before_activeConnections_cleanup, clientConnection.timeout) < System.currentTimeMillis())) { + // the time-out limit is reached. close the connection + clientConnection.close(); + c++; + } + if ((clientConnection.lastIO != Long.MAX_VALUE) && + (clientConnection.lastIO + Math.max(minimumTime_before_idleConnections_cleanup, clientConnection.timeout) < System.currentTimeMillis())) { + // the time-out limit is reached. close the connection + clientConnection.close(); + c++; + } } } return c; @@ -505,8 +517,8 @@ public final class httpc { public int compare(Object o1, Object o2) { httpc c1 = (httpc) o1; httpc c2 = (httpc) o2; - long l1 = System.currentTimeMillis() - c1.initTime; - long l2 = System.currentTimeMillis() - c2.initTime; + long l1 = System.currentTimeMillis() - c1.lastIO; + long l2 = System.currentTimeMillis() - c2.lastIO; if (l1 < l2) return 1; if (l1 > l2) return -1; return 0; @@ -1381,11 +1393,11 @@ public final class httpc { } if (procOS instanceof OutputStream) { - serverFileUtils.writeX(this.getContentInputStream(), (OutputStream) procOS, sbb); + writeX(this.getContentInputStream(), (OutputStream) procOS, sbb); } else if (procOS instanceof Writer) { String charSet = this.responseHeader.getCharacterEncoding(); if (charSet == null) charSet = httpHeader.DEFAULT_CHARSET; - serverFileUtils.writeX(this.getContentInputStream(), charSet, (Writer) procOS, sbb, charSet); + writeX(this.getContentInputStream(), charSet, (Writer) procOS, sbb, charSet); } else { throw new IllegalArgumentException("Invalid procOS object type '" + procOS.getClass().getName() + "'"); } @@ -1414,14 +1426,14 @@ public final class httpc { try { InputStream is = this.getContentInputStream(); if (procOS == null) { - serverFileUtils.writeX(is, null, bufferOS); + writeX(is, null, bufferOS); } else if (procOS instanceof OutputStream) { - serverFileUtils.writeX(is, (OutputStream) procOS, bufferOS); + writeX(is, (OutputStream) procOS, bufferOS); //writeContentX(httpc.this.clientInput, this.gzip, this.responseHeader.contentLength(), procOS, bufferOS); } else if (procOS instanceof Writer) { String charSet = this.responseHeader.getCharacterEncoding(); if (charSet == null) charSet = httpHeader.DEFAULT_CHARSET; - serverFileUtils.writeX(is, charSet, (Writer) procOS, bufferOS, charSet); + writeX(is, charSet, (Writer) procOS, bufferOS, charSet); } else { throw new IllegalArgumentException("Invalid procOS object type '" + procOS.getClass().getName() + "'"); } @@ -1436,6 +1448,55 @@ public final class httpc { } } + + public void writeX(InputStream source, OutputStream procOS, OutputStream bufferOS) { + byte[] buffer = new byte[2048]; + int l, c = 0; + + while (true) try { + l = source.read(buffer, 0, buffer.length); + if (l <= 0) break; + lastIO = System.currentTimeMillis(); + c += l; + if (procOS != null) procOS.write(buffer, 0, l); + if (bufferOS != null) bufferOS.write(buffer, 0, l); + } catch (IOException e) { + //System.out.println("*** DEBUG: writeX/IOStream terminated with IOException, processed " + c + " bytes."); + break; + } + + // flush the streams + if (procOS != null) try { procOS.flush(); } catch (IOException e) {} + if (bufferOS != null) try { bufferOS.flush(); } catch (IOException e) {} + buffer = null; + } + + public void writeX(InputStream source, String inputCharset, Writer procOS, OutputStream bufferOS, String outputCharset) { + try { + InputStreamReader sourceReader = new InputStreamReader(source, inputCharset); + OutputStreamWriter bufferOSWriter = (bufferOS == null) ? null : new OutputStreamWriter(bufferOS,outputCharset); + char[] buffer = new char[2048]; + int l, c= 0; + + while (true) try{ + l = sourceReader.read(buffer, 0, buffer.length); + if (l <= 0) break; + lastIO = System.currentTimeMillis(); + c += l; + if (procOS != null) procOS.write(buffer, 0, l); + if (bufferOSWriter != null) bufferOSWriter.write(buffer, 0, l); + } catch (IOException e) { + //System.out.println("*** DEBUG: writeX/ReaderWriter terminated with IOException, processed " + c + " bytes."); + break; + } + + // flush the streams + if (procOS != null) try { procOS.flush(); } catch (IOException e) {} + if (bufferOSWriter != null) try { bufferOSWriter.flush(); } catch (IOException e) {} + buffer = null; + } catch (IOException e) {} + } + /** * This method outputs a logline to the serverlog with the current * status of this instance. diff --git a/source/de/anomic/server/serverFileUtils.java b/source/de/anomic/server/serverFileUtils.java index fb96c5190..8e5ed3763 100644 --- a/source/de/anomic/server/serverFileUtils.java +++ b/source/de/anomic/server/serverFileUtils.java @@ -51,7 +51,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; -import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.io.Reader; import java.io.Writer; @@ -111,57 +110,6 @@ public final class serverFileUtils { return total; } - public static void writeX(InputStream source, OutputStream procOS, OutputStream bufferOS) { - byte[] buffer = new byte[2048]; - int l, c = 0; - - while (true) try { - l = source.read(buffer, 0, buffer.length); - if (l <= 0) break; - c += l; - if (procOS != null) procOS.write(buffer, 0, l); - if (bufferOS != null) bufferOS.write(buffer, 0, l); - } catch (IOException e) { - //System.out.println("*** DEBUG: writeX/IOStream terminated with IOException, processed " + c + " bytes."); - break; - } - - // flush the streams - if (procOS != null) try { procOS.flush(); } catch (IOException e) {} - if (bufferOS != null) try { bufferOS.flush(); } catch (IOException e) {} - buffer = null; - } - - public static void writeX(Reader source, Writer procOS, Writer bufferOS) { - char[] buffer = new char[2048]; - int l, c= 0; - - while (true) try{ - l = source.read(buffer, 0, buffer.length); - if (l <= 0) break; - c += l; - if (procOS != null) procOS.write(buffer, 0, l); - if (bufferOS != null) bufferOS.write(buffer, 0, l); - } catch (IOException e) { - //System.out.println("*** DEBUG: writeX/ReaderWriter terminated with IOException, processed " + c + " bytes."); - break; - } - - // flush the streams - if (procOS != null) try { procOS.flush(); } catch (IOException e) {} - if (bufferOS != null) try { bufferOS.flush(); } catch (IOException e) {} - buffer = null; - } - - public static void writeX(InputStream source, String inputCharset, Writer procOS, OutputStream bufferOS, String outputCharset) { - try { - InputStreamReader sourceReader = new InputStreamReader(source, inputCharset); - OutputStreamWriter bufferOSWriter = (bufferOS == null) ? null : new OutputStreamWriter(bufferOS,outputCharset); - writeX(sourceReader, procOS, bufferOSWriter); - } catch (IOException e) {} - } - - public static int copy (File source, String inputCharset, Writer dest) throws IOException { InputStream fis = null; try {