Merge origin/master

This commit is contained in:
reger 2013-12-27 06:53:14 +01:00
commit fbdd89e198
8 changed files with 58 additions and 24 deletions

View File

@ -1091,6 +1091,7 @@ federated.service.solr.indexing.sharding = MODULO_HOST_MD5
# the lazy attribute causes that fields containing "" or 0 are not added and not written
federated.service.solr.indexing.lazy = true
federated.service.solr.indexing.timeout = 60000
federated.service.solr.indexing.writeEnabled = true
# temporary definition of backend services to use.
# After the migration a rwi+solr combination is used, the solr contains the content of the previously used metadata-db.

View File

@ -133,11 +133,12 @@ public class IndexFederated_p {
// switch on
final boolean usesolr = sb.getConfigBool(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_ENABLED, false) & solrurls.length() > 0;
final int solrtimeout = sb.getConfigInt(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_TIMEOUT, 10000);
final boolean writeEnabled = sb.getConfigBool(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_WRITEENABLED, true);
try {
if (usesolr) {
ArrayList<RemoteInstance> instances = RemoteInstance.getShardInstances(solrurls, null, null, solrtimeout);
sb.index.fulltext().connectRemoteSolr(instances);
sb.index.fulltext().connectRemoteSolr(instances, writeEnabled);
} else {
sb.index.fulltext().disconnectRemoteSolr();
}
@ -182,7 +183,6 @@ public class IndexFederated_p {
prop.put("solr.indexing.solrremote.checked", env.getConfigBool(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_ENABLED, false) ? 1 : 0);
prop.put("solr.indexing.url", env.getConfig(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_URL, "http://127.0.0.1:8983/solr").replace(",", "\n"));
prop.put("solr.indexing.sharding", env.getConfig(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_SHARDING, "modulo-host-md5"));
prop.put("solr.indexing.lazy.checked", env.getConfigBool(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_LAZY, true) ? 1 : 0);
// return rewrite properties

View File

@ -121,7 +121,7 @@ public class RemoteSolrConnector extends SolrServerConnector implements SolrConn
RemoteInstance instance = new RemoteInstance("http://127.0.0.1:8983/solr/", null, "collection1", 10000);
ArrayList<RemoteInstance> instances = new ArrayList<RemoteInstance>();
instances.add(instance);
solr = new RemoteSolrConnector(new ShardInstance(instances, ShardSelection.Method.MODULO_HOST_MD5), true, "solr");
solr = new RemoteSolrConnector(new ShardInstance(instances, ShardSelection.Method.MODULO_HOST_MD5, true), true, "solr");
solr.clear();
final File exampleDir = new File("test/parsertest/");
long t, t0, a = 0;

View File

@ -45,13 +45,20 @@ import net.yacy.cora.federate.solr.connector.ShardSelection;
public class ServerShard extends SolrServer {
private static final long serialVersionUID = -3524175189049786312L;
private static final UpdateResponse _dummyOKResponse = new UpdateResponse();
static {
_dummyOKResponse.setElapsedTime(0);
_dummyOKResponse.setResponse(new NamedList<Object>());
}
ArrayList<SolrServer> server;
ShardSelection sharding;
private final ArrayList<SolrServer> server;
private final ShardSelection sharding;
private final boolean writeEnabled;
public ServerShard(ArrayList<SolrServer> server, final ShardSelection.Method method) {
public ServerShard(ArrayList<SolrServer> server, final ShardSelection.Method method, final boolean writeEnabled) {
this.server = server;
this.sharding = new ShardSelection(method, this.server.size());
this.writeEnabled = writeEnabled;
}
/**
@ -60,6 +67,7 @@ public class ServerShard extends SolrServer {
* @throws IOException If there is a low-level I/O error.
*/
public UpdateResponse add(Collection<SolrInputDocument> docs) throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrInputDocument doc: docs) ur = server.get(this.sharding.select(doc)).add(doc);
return ur; // TODO: this accumlation of update responses is wrong, but sufficient (because we do not evaluate it)
@ -73,6 +81,7 @@ public class ServerShard extends SolrServer {
* @since solr 3.5
*/
public UpdateResponse add(Collection<SolrInputDocument> docs, int commitWithinMs) throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrInputDocument doc: docs) ur = server.get(this.sharding.select(doc)).add(doc, commitWithinMs);
return ur;
@ -84,6 +93,7 @@ public class ServerShard extends SolrServer {
* @throws IOException If there is a low-level I/O error.
*/
public UpdateResponse addBeans(Collection<?> beans ) throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrServer s: server) ur = s.addBeans(beans);
return ur;
@ -97,6 +107,7 @@ public class ServerShard extends SolrServer {
* @since solr 3.5
*/
public UpdateResponse addBeans(Collection<?> beans, int commitWithinMs) throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrServer s: server) ur = s.addBeans(beans, commitWithinMs);
return ur;
@ -108,6 +119,7 @@ public class ServerShard extends SolrServer {
* @throws IOException If there is a low-level I/O error.
*/
public UpdateResponse add(SolrInputDocument doc) throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
return server.get(this.sharding.select(doc)).add(doc);
}
@ -119,6 +131,7 @@ public class ServerShard extends SolrServer {
* @since solr 3.5
*/
public UpdateResponse add(SolrInputDocument doc, int commitWithinMs) throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
return server.get(this.sharding.select(doc)).add(doc, commitWithinMs);
}
@ -128,6 +141,7 @@ public class ServerShard extends SolrServer {
* @throws IOException If there is a low-level I/O error.
*/
public UpdateResponse addBean(Object obj) throws IOException, SolrServerException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrServer s: server) ur = s.addBean(obj);
return ur;
@ -141,6 +155,7 @@ public class ServerShard extends SolrServer {
* @since solr 3.5
*/
public UpdateResponse addBean(Object obj, int commitWithinMs) throws IOException, SolrServerException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrServer s: server) ur = s.addBean(obj, commitWithinMs);
return ur;
@ -153,6 +168,7 @@ public class ServerShard extends SolrServer {
* @throws IOException If there is a low-level I/O error.
*/
public UpdateResponse commit() throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrServer s: server) ur = s.commit();
return ur;
@ -167,6 +183,7 @@ public class ServerShard extends SolrServer {
* @throws IOException If there is a low-level I/O error.
*/
public UpdateResponse optimize() throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrServer s: server) ur = s.optimize();
return ur;
@ -179,6 +196,7 @@ public class ServerShard extends SolrServer {
* @throws IOException If there is a low-level I/O error.
*/
public UpdateResponse commit(boolean waitFlush, boolean waitSearcher) throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrServer s: server) ur = s.commit(waitFlush, waitSearcher);
return ur;
@ -192,6 +210,7 @@ public class ServerShard extends SolrServer {
* @throws IOException If there is a low-level I/O error.
*/
public UpdateResponse commit(boolean waitFlush, boolean waitSearcher, boolean softCommit) throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrServer s: server) ur = s.commit(waitFlush, waitSearcher, softCommit);
return ur;
@ -206,6 +225,7 @@ public class ServerShard extends SolrServer {
* @throws IOException If there is a low-level I/O error.
*/
public UpdateResponse optimize(boolean waitFlush, boolean waitSearcher) throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrServer s: server) ur = s.optimize(waitFlush, waitSearcher);
return ur;
@ -221,6 +241,7 @@ public class ServerShard extends SolrServer {
* @throws IOException If there is a low-level I/O error.
*/
public UpdateResponse optimize(boolean waitFlush, boolean waitSearcher, int maxSegments) throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrServer s: server) ur = s.optimize(waitFlush, waitSearcher, maxSegments);
return ur;
@ -235,6 +256,7 @@ public class ServerShard extends SolrServer {
* @throws IOException If there is a low-level I/O error.
*/
public UpdateResponse rollback() throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrServer s: server) ur = s.rollback();
return ur;
@ -246,6 +268,7 @@ public class ServerShard extends SolrServer {
* @throws IOException If there is a low-level I/O error.
*/
public UpdateResponse deleteById(String id) throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrServer s: server) ur = s.deleteById(id);
return ur;
@ -259,6 +282,7 @@ public class ServerShard extends SolrServer {
* @since 3.6
*/
public UpdateResponse deleteById(String id, int commitWithinMs) throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrServer s: server) ur = s.deleteById(id, commitWithinMs);
return ur;
@ -270,6 +294,7 @@ public class ServerShard extends SolrServer {
* @throws IOException If there is a low-level I/O error.
*/
public UpdateResponse deleteById(List<String> ids) throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrServer s: server) ur = s.deleteById(ids);
return ur;
@ -283,6 +308,7 @@ public class ServerShard extends SolrServer {
* @since 3.6
*/
public UpdateResponse deleteById(List<String> ids, int commitWithinMs) throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrServer s: server) ur = s.deleteById(ids, commitWithinMs);
return ur;
@ -294,6 +320,7 @@ public class ServerShard extends SolrServer {
* @throws IOException If there is a low-level I/O error.
*/
public UpdateResponse deleteByQuery(String query) throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrServer s: server) ur = s.deleteByQuery(query);
return ur;
@ -307,6 +334,7 @@ public class ServerShard extends SolrServer {
* @since 3.6
*/
public UpdateResponse deleteByQuery(String query, int commitWithinMs) throws SolrServerException, IOException {
if (!this.writeEnabled) return _dummyOKResponse;
UpdateResponse ur = null;
for (SolrServer s: server) ur = s.deleteByQuery(query, commitWithinMs);
return ur;

View File

@ -36,10 +36,12 @@ public class ShardInstance implements SolrInstance {
private final ShardSelection.Method method;
private SolrServer defaultServer;
private Map<String, SolrServer> serverCache;
private final boolean writeEnabled;
public ShardInstance(final ArrayList<RemoteInstance> instances, final ShardSelection.Method method) {
public ShardInstance(final ArrayList<RemoteInstance> instances, final ShardSelection.Method method, final boolean writeEnabled) {
this.instances = instances;
this.method = method;
this.writeEnabled = writeEnabled;
this.defaultServer = null;
this.serverCache = new ConcurrentHashMap<String, SolrServer>();
}
@ -59,7 +61,7 @@ public class ShardInstance implements SolrInstance {
if (this.defaultServer != null) return this.defaultServer;
ArrayList<SolrServer> server = new ArrayList<SolrServer>(instances.size());
for (int i = 0; i < instances.size(); i++) server.set(i, instances.get(i).getDefaultServer());
this.defaultServer = new ServerShard(server, method);
this.defaultServer = new ServerShard(server, method, this.writeEnabled);
return this.defaultServer;
}
@ -69,7 +71,7 @@ public class ShardInstance implements SolrInstance {
if (s != null) return s;
ArrayList<SolrServer> server = new ArrayList<SolrServer>(instances.size());
for (int i = 0; i < instances.size(); i++) server.add(i, instances.get(i).getServer(name));
s = new ServerShard(server, method);
s = new ServerShard(server, method, this.writeEnabled);
this.serverCache.put(name, s);
return s;
}

View File

@ -511,11 +511,12 @@ public final class Switchboard extends serverSwitch {
final String solrurls = getConfig(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_URL, "http://127.0.0.1:8983/solr");
final boolean usesolr = getConfigBool(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_ENABLED, false) & solrurls.length() > 0;
final int solrtimeout = getConfigInt(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_TIMEOUT, 60000);
final boolean writeEnabled = getConfigBool(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_WRITEENABLED, true);
if (usesolr && solrurls != null && solrurls.length() > 0) {
try {
ArrayList<RemoteInstance> instances = RemoteInstance.getShardInstances(solrurls, null, null, solrtimeout);
this.index.fulltext().connectRemoteSolr(instances);
this.index.fulltext().connectRemoteSolr(instances, writeEnabled);
} catch (final IOException e ) {
ConcurrentLog.logException(e);
}
@ -1334,11 +1335,12 @@ public final class Switchboard extends serverSwitch {
final String solrurls = getConfig(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_URL, "http://127.0.0.1:8983/solr");
final boolean usesolr = getConfigBool(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_ENABLED, false) & solrurls.length() > 0;
final int solrtimeout = getConfigInt(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_TIMEOUT, 60000);
final boolean writeEnabled = getConfigBool(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_WRITEENABLED, true);
if (usesolr && solrurls != null && solrurls.length() > 0) {
try {
ArrayList<RemoteInstance> instances = RemoteInstance.getShardInstances(solrurls, null, null, solrtimeout);
this.index.fulltext().connectRemoteSolr(instances);
this.index.fulltext().connectRemoteSolr(instances, writeEnabled);
} catch (final IOException e ) {
ConcurrentLog.logException(e);
}

View File

@ -308,19 +308,20 @@ public final class SwitchboardConstants {
public static final String REMOTESEARCH_MAXCOUNT_USER = "remotesearch.maxcount";
public static final String REMOTESEARCH_MAXTIME_USER = "remotesearch.maxtime";
public static final String FEDERATED_SERVICE_SOLR_INDEXING_ENABLED = "federated.service.solr.indexing.enabled";
public static final String FEDERATED_SERVICE_SOLR_INDEXING_URL = "federated.service.solr.indexing.url";
public static final String FEDERATED_SERVICE_SOLR_INDEXING_SHARDING = "federated.service.solr.indexing.sharding";
public static final String FEDERATED_SERVICE_SOLR_INDEXING_LAZY = "federated.service.solr.indexing.lazy";
public static final String FEDERATED_SERVICE_SOLR_INDEXING_TIMEOUT = "federated.service.solr.indexing.timeout";
public static final String FEDERATED_SERVICE_SOLR_INDEXING_ENABLED = "federated.service.solr.indexing.enabled";
public static final String FEDERATED_SERVICE_SOLR_INDEXING_URL = "federated.service.solr.indexing.url";
public static final String FEDERATED_SERVICE_SOLR_INDEXING_SHARDING = "federated.service.solr.indexing.sharding";
public static final String FEDERATED_SERVICE_SOLR_INDEXING_LAZY = "federated.service.solr.indexing.lazy";
public static final String FEDERATED_SERVICE_SOLR_INDEXING_TIMEOUT = "federated.service.solr.indexing.timeout";
public static final String FEDERATED_SERVICE_SOLR_INDEXING_WRITEENABLED = "federated.service.solr.indexing.writeEnabled";
//public static final String CORE_SERVICE_URLDB = "core.service.urldb.tmp";
//public static final String CORE_SERVICE_SOLR = "core.service.solr.tmp";
public static final String CORE_SERVICE_FULLTEXT = "core.service.fulltext";
public static final String CORE_SERVICE_RWI = "core.service.rwi.tmp";
public static final String CORE_SERVICE_CITATION = "core.service.citation.tmp";
public static final String CORE_SERVICE_WEBGRAPH = "core.service.webgraph.tmp";
public static final String CORE_SERVICE_JENA = "core.service.jena.tmp";
public static final String CORE_SERVICE_FULLTEXT = "core.service.fulltext";
public static final String CORE_SERVICE_RWI = "core.service.rwi.tmp";
public static final String CORE_SERVICE_CITATION = "core.service.citation.tmp";
public static final String CORE_SERVICE_WEBGRAPH = "core.service.webgraph.tmp";
public static final String CORE_SERVICE_JENA = "core.service.jena.tmp";
/**
* <p><code>public static final String <strong>CRAWLER_THREADS_ACTIVE_MAX</strong> = "crawler.MaxActiveThreads"</code></p>
@ -333,7 +334,7 @@ public final class SwitchboardConstants {
/**
* debug flags
*/
public static final String DEBUG_SEARCH_LOCAL_DHT_OFF = "debug.search.local.dht.off"; // =true: do not use the local dht/rwi index (which is not done if we do remote searches)
public static final String DEBUG_SEARCH_LOCAL_DHT_OFF = "debug.search.local.dht.off"; // =true: do not use the local dht/rwi index (which is not done if we do remote searches)
public static final String DEBUG_SEARCH_LOCAL_SOLR_OFF = "debug.search.local.solr.off"; // =true: do not use solr
public static final String DEBUG_SEARCH_REMOTE_DHT_OFF = "debug.search.remote.dht.off"; // =true: do not use dht/rwi
public static final String DEBUG_SEARCH_REMOTE_DHT_TESTLOCAL= "debug.search.remote.dht.testlocal"; // =true: do not use dht, search local peer in a shortcut to the own server

View File

@ -187,8 +187,8 @@ public final class Fulltext {
return this.solrInstances.isConnected1();
}
public void connectRemoteSolr(final ArrayList<RemoteInstance> instances) {
this.solrInstances.connect1(new ShardInstance(instances, ShardSelection.Method.MODULO_HOST_MD5));
public void connectRemoteSolr(final ArrayList<RemoteInstance> instances, final boolean writeEnabled) {
this.solrInstances.connect1(new ShardInstance(instances, ShardSelection.Method.MODULO_HOST_MD5, writeEnabled));
}
public void disconnectRemoteSolr() {