added option to configure the autocommit delay time of solr on-the-fly

This commit is contained in:
Michael Peter Christen 2012-06-25 14:59:46 +02:00
parent 5d9bd4ddc2
commit 8dd469b9dd
9 changed files with 106 additions and 9 deletions

View File

@ -1049,6 +1049,7 @@ color_searchurlhover = #008000
# - to check whats in solr after indexing, open http://localhost:8983/solr/admin/ # - to check whats in solr after indexing, open http://localhost:8983/solr/admin/
federated.service.solr.indexing.enabled = false federated.service.solr.indexing.enabled = false
federated.service.solr.indexing.url = http://127.0.0.1:8983/solr federated.service.solr.indexing.url = http://127.0.0.1:8983/solr
federated.service.solr.indexing.commitWithinMs = 180000
federated.service.solr.indexing.sharding = MODULO_HOST_MD5 federated.service.solr.indexing.sharding = MODULO_HOST_MD5
federated.service.solr.indexing.schemefile = solr.keys.default.list federated.service.solr.indexing.schemefile = solr.keys.default.list

View File

@ -61,6 +61,8 @@
#(/table)# #(/table)#
<dt class="TableCellDark">Solr URL(s)</dt> <dt class="TableCellDark">Solr URL(s)</dt>
<dd><textarea rows="5" cols="80" name="solr.indexing.url" id="solr.indexing.url"/>#[solr.indexing.url]#</textarea></dd> <dd><textarea rows="5" cols="80" name="solr.indexing.url" id="solr.indexing.url"/>#[solr.indexing.url]#</textarea></dd>
<dt class="TableCellDark">Commit-Within (milliseconds)</dt>
<dd><input type="text" size="6" maxlength="6" value="#[solr.indexing.commitWithinMs]#" name="solr.indexing.commitWithinMs" id="solr.indexing.commitWithinMs"/></dd>
<dt class="TableCellDark">Sharding Method</dt> <dt class="TableCellDark">Sharding Method</dt>
<dd><input type="text" size="50" maxlength="50" value="#[solr.indexing.sharding]#" name="solr.indexing.sharding" id="solr.indexing.sharding" disabled="disabled"/></dd> <dd><input type="text" size="50" maxlength="50" value="#[solr.indexing.sharding]#" name="solr.indexing.sharding" id="solr.indexing.sharding" disabled="disabled"/></dd>
<dt class="TableCellDark">Scheme</dt> <dt class="TableCellDark">Scheme</dt>

View File

@ -59,6 +59,7 @@ public class IndexFederated_p {
final boolean solrIsOnAfterwards = post.getBoolean("solr.indexing.solrremote", false); final boolean solrIsOnAfterwards = post.getBoolean("solr.indexing.solrremote", false);
env.setConfig("federated.service.solr.indexing.enabled", solrIsOnAfterwards); env.setConfig("federated.service.solr.indexing.enabled", solrIsOnAfterwards);
String solrurls = post.get("solr.indexing.url", env.getConfig("federated.service.solr.indexing.url", "http://127.0.0.1:8983/solr")); String solrurls = post.get("solr.indexing.url", env.getConfig("federated.service.solr.indexing.url", "http://127.0.0.1:8983/solr"));
int commitWithinMs = post.getInt("solr.indexing.commitWithinMs", env.getConfigInt("federated.service.solr.indexing.commitWithinMs", 180000));
final BufferedReader r = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(UTF8.getBytes(solrurls)))); final BufferedReader r = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(UTF8.getBytes(solrurls))));
final StringBuilder s = new StringBuilder(); final StringBuilder s = new StringBuilder();
String s0; String s0;
@ -76,6 +77,7 @@ public class IndexFederated_p {
} }
solrurls = s.toString().trim(); solrurls = s.toString().trim();
env.setConfig("federated.service.solr.indexing.url", solrurls); env.setConfig("federated.service.solr.indexing.url", solrurls);
env.setConfig("federated.service.solr.indexing.commitWithinMs", commitWithinMs);
env.setConfig("federated.service.solr.indexing.sharding", post.get("solr.indexing.sharding", env.getConfig("federated.service.solr.indexing.sharding", "modulo-host-md5"))); env.setConfig("federated.service.solr.indexing.sharding", post.get("solr.indexing.sharding", env.getConfig("federated.service.solr.indexing.sharding", "modulo-host-md5")));
final String schemename = post.get("solr.indexing.schemefile", env.getConfig("federated.service.solr.indexing.schemefile", "solr.keys.default.list")); final String schemename = post.get("solr.indexing.schemefile", env.getConfig("federated.service.solr.indexing.schemefile", "solr.keys.default.list"));
env.setConfig("federated.service.solr.indexing.schemefile", schemename); env.setConfig("federated.service.solr.indexing.schemefile", schemename);
@ -90,7 +92,13 @@ public class IndexFederated_p {
// switch on // switch on
final boolean usesolr = sb.getConfigBool("federated.service.solr.indexing.enabled", false) & solrurls.length() > 0; final boolean usesolr = sb.getConfigBool("federated.service.solr.indexing.enabled", false) & solrurls.length() > 0;
try { try {
sb.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectRemoteSolr((usesolr) ? new ShardSolrConnector(solrurls, ShardSelection.Method.MODULO_HOST_MD5, 10000, true) : null); if (usesolr) {
SolrConnector solr = new ShardSolrConnector(solrurls, ShardSelection.Method.MODULO_HOST_MD5, 10000, true);
solr.setCommitWithinMs(commitWithinMs);
sb.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectRemoteSolr(solr);
} else {
sb.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectRemoteSolr(null);
}
} catch (final IOException e) { } catch (final IOException e) {
Log.logException(e); Log.logException(e);
sb.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectRemoteSolr(null); sb.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectRemoteSolr(null);
@ -179,6 +187,7 @@ public class IndexFederated_p {
prop.put("yacy.indexing.engine.off.checked", env.getConfig("federated.service.yacy.indexing.engine", "classic").equals("off") ? 1 : 0); prop.put("yacy.indexing.engine.off.checked", env.getConfig("federated.service.yacy.indexing.engine", "classic").equals("off") ? 1 : 0);
prop.put("solr.indexing.solrremote.checked", env.getConfigBool("federated.service.solr.indexing.enabled", false) ? 1 : 0); prop.put("solr.indexing.solrremote.checked", env.getConfigBool("federated.service.solr.indexing.enabled", false) ? 1 : 0);
prop.put("solr.indexing.url", env.getConfig("federated.service.solr.indexing.url", "http://127.0.0.1:8983/solr").replace(",", "\n")); prop.put("solr.indexing.url", env.getConfig("federated.service.solr.indexing.url", "http://127.0.0.1:8983/solr").replace(",", "\n"));
prop.put("solr.indexing.commitWithinMs", env.getConfigInt("federated.service.solr.indexing.commitWithinMs", 180000));
prop.put("solr.indexing.sharding", env.getConfig("federated.service.solr.indexing.sharding", "modulo-host-md5")); prop.put("solr.indexing.sharding", env.getConfig("federated.service.solr.indexing.sharding", "modulo-host-md5"));
prop.put("solr.indexing.schemefile", schemename); prop.put("solr.indexing.schemefile", schemename);

View File

@ -41,9 +41,11 @@ import org.apache.solr.common.SolrInputDocument;
public class AbstractSolrConnector implements SolrConnector { public class AbstractSolrConnector implements SolrConnector {
protected SolrServer server; protected SolrServer server;
protected int commitWithinMs; // max time (in ms) before a commit will happen
protected AbstractSolrConnector() { protected AbstractSolrConnector() {
this.server = null; this.server = null;
this.commitWithinMs = 180000;
} }
protected void init(SolrServer server) { protected void init(SolrServer server) {
@ -54,6 +56,24 @@ public class AbstractSolrConnector implements SolrConnector {
return this.server; return this.server;
} }
/**
* get the solr autocommit delay
* @return the maximum waiting time after a solr command until it is transported to the server
*/
@Override
public int getCommitWithinMs() {
return this.commitWithinMs;
}
/**
* set the solr autocommit delay
* @param c the maximum waiting time after a solr command until it is transported to the server
*/
@Override
public void setCommitWithinMs(int c) {
this.commitWithinMs = c;
}
@Override @Override
public synchronized void close() { public synchronized void close() {
try { try {
@ -137,7 +157,7 @@ public class AbstractSolrConnector implements SolrConnector {
@Override @Override
public void add(final SolrDoc solrdoc) throws IOException, SolrException { public void add(final SolrDoc solrdoc) throws IOException, SolrException {
try { try {
this.server.add(solrdoc,180000); // commitWithIn 180s this.server.add(solrdoc, this.commitWithinMs);
//this.server.commit(); //this.server.commit();
} catch (SolrServerException e) { } catch (SolrServerException e) {
Log.logWarning("SolrConnector", e.getMessage() + " DOC=" + solrdoc.toString()); Log.logWarning("SolrConnector", e.getMessage() + " DOC=" + solrdoc.toString());
@ -150,7 +170,7 @@ public class AbstractSolrConnector implements SolrConnector {
ArrayList<SolrInputDocument> l = new ArrayList<SolrInputDocument>(); ArrayList<SolrInputDocument> l = new ArrayList<SolrInputDocument>();
for (SolrDoc d: solrdocs) l.add(d); for (SolrDoc d: solrdocs) l.add(d);
try { try {
this.server.add(l,180000); // commitWithIn 120s this.server.add(l, this.commitWithinMs);
//this.server.commit(); //this.server.commit();
} catch (SolrServerException e) { } catch (SolrServerException e) {
Log.logWarning("SolrConnector", e.getMessage() + " DOC=" + solrdocs.toString()); Log.logWarning("SolrConnector", e.getMessage() + " DOC=" + solrdocs.toString());

View File

@ -15,11 +15,13 @@ public class MultipleSolrConnector implements SolrConnector {
private final ArrayBlockingQueue<SolrDoc> queue; private final ArrayBlockingQueue<SolrDoc> queue;
private final AddWorker[] worker; private final AddWorker[] worker;
private final SolrConnector solr; private final SolrConnector solr;
private int commitWithinMs;
public MultipleSolrConnector(final String url, int connections) throws IOException { public MultipleSolrConnector(final String url, int connections) throws IOException {
this.solr = new SingleSolrConnector(url); this.solr = new SingleSolrConnector(url);
this.queue = new ArrayBlockingQueue<SolrDoc>(1000); this.queue = new ArrayBlockingQueue<SolrDoc>(1000);
this.worker = new AddWorker[connections]; this.worker = new AddWorker[connections];
this.commitWithinMs = 180000;
for (int i = 0; i < connections; i++) { for (int i = 0; i < connections; i++) {
this.worker[i] = new AddWorker(url); this.worker[i] = new AddWorker(url);
this.worker[i].start(); this.worker[i].start();
@ -30,6 +32,7 @@ public class MultipleSolrConnector implements SolrConnector {
private final SolrConnector solr; private final SolrConnector solr;
public AddWorker(final String url) throws IOException { public AddWorker(final String url) throws IOException {
this.solr = new SingleSolrConnector(url); this.solr = new SingleSolrConnector(url);
this.solr.setCommitWithinMs(MultipleSolrConnector.this.commitWithinMs);
} }
@Override @Override
public void run() { public void run() {
@ -51,6 +54,22 @@ public class MultipleSolrConnector implements SolrConnector {
} }
} }
@Override
public int getCommitWithinMs() {
return this.commitWithinMs;
}
/**
* set the solr autocommit delay
* @param c the maximum waiting time after a solr command until it is transported to the server
*/
@Override
public void setCommitWithinMs(int c) {
this.commitWithinMs = c;
this.solr.setCommitWithinMs(c);
for (AddWorker w: this.worker) w.solr.setCommitWithinMs(c);
}
@Override @Override
public void close() { public void close() {
for (@SuppressWarnings("unused") AddWorker element : this.worker) { for (@SuppressWarnings("unused") AddWorker element : this.worker) {
@ -59,8 +78,8 @@ public class MultipleSolrConnector implements SolrConnector {
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
this.solr.close();
} }
this.solr.close();
} }
@Override @Override

View File

@ -41,6 +41,20 @@ public class RetrySolrConnector implements SolrConnector {
this.retryMaxTime = retryMaxTime; this.retryMaxTime = retryMaxTime;
} }
@Override
public int getCommitWithinMs() {
return this.solrConnector.getCommitWithinMs();
}
/**
* set the solr autocommit delay
* @param c the maximum waiting time after a solr command until it is transported to the server
*/
@Override
public void setCommitWithinMs(int c) {
this.solrConnector.setCommitWithinMs(c);
}
@Override @Override
public synchronized void close() { public synchronized void close() {
this.solrConnector.close(); this.solrConnector.close();

View File

@ -55,6 +55,20 @@ public class ShardSolrConnector implements SolrConnector {
this.sharding = new ShardSelection(method, this.urls.length); this.sharding = new ShardSelection(method, this.urls.length);
} }
@Override
public int getCommitWithinMs() {
return this.connectors.get(0).getCommitWithinMs();
}
/**
* set the solr autocommit delay
* @param c the maximum waiting time after a solr command until it is transported to the server
*/
@Override
public void setCommitWithinMs(int c) {
for (final SolrConnector connector: this.connectors) connector.setCommitWithinMs(c);
}
@Override @Override
public synchronized void close() { public synchronized void close() {
for (final SolrConnector connector: this.connectors) connector.close(); for (final SolrConnector connector: this.connectors) connector.close();

View File

@ -33,6 +33,21 @@ import org.apache.solr.common.SolrException;
public interface SolrConnector { public interface SolrConnector {
/**
* get the solr autocommit delay
* @return the maximum waiting time after a solr command until it is transported to the server
*/
public int getCommitWithinMs();
/**
* set the solr autocommit delay
* @param c the maximum waiting time after a solr command until it is transported to the server
*/
public void setCommitWithinMs(int c);
/**
* close the server connection
*/
public void close(); public void close();
/** /**

View File

@ -95,6 +95,7 @@ import net.yacy.cora.protocol.http.HTTPClient;
import net.yacy.cora.protocol.http.ProxySettings; import net.yacy.cora.protocol.http.ProxySettings;
import net.yacy.cora.services.federated.solr.ShardSelection; import net.yacy.cora.services.federated.solr.ShardSelection;
import net.yacy.cora.services.federated.solr.ShardSolrConnector; import net.yacy.cora.services.federated.solr.ShardSolrConnector;
import net.yacy.cora.services.federated.solr.SolrConnector;
import net.yacy.cora.services.federated.solr.SolrDoc; import net.yacy.cora.services.federated.solr.SolrDoc;
import net.yacy.cora.services.federated.yacy.CacheStrategy; import net.yacy.cora.services.federated.yacy.CacheStrategy;
import net.yacy.document.Condenser; import net.yacy.document.Condenser;
@ -407,14 +408,16 @@ public final class Switchboard extends serverSwitch
// set up the solr interface // set up the solr interface
final String solrurls = getConfig("federated.service.solr.indexing.url", "http://127.0.0.1:8983/solr"); final String solrurls = getConfig("federated.service.solr.indexing.url", "http://127.0.0.1:8983/solr");
final boolean usesolr = getConfigBool("federated.service.solr.indexing.enabled", false) & solrurls.length() > 0; final boolean usesolr = getConfigBool("federated.service.solr.indexing.enabled", false) & solrurls.length() > 0;
int commitWithinMs = getConfigInt("federated.service.solr.indexing.commitWithinMs", 180000);
if (usesolr && solrurls != null && solrurls.length() > 0) { if (usesolr && solrurls != null && solrurls.length() > 0) {
try { try {
this.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectRemoteSolr( SolrConnector solr = new ShardSolrConnector(
new ShardSolrConnector( solrurls,
solrurls, ShardSelection.Method.MODULO_HOST_MD5,
ShardSelection.Method.MODULO_HOST_MD5, 10000, true);
10000, true)); solr.setCommitWithinMs(commitWithinMs);
this.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectRemoteSolr(solr);
} catch ( final IOException e ) { } catch ( final IOException e ) {
Log.logException(e); Log.logException(e);
} }