added synchronizations and timeouts in solr api; missing

synchronizations in index modification methods causes deadlocks inside
solr.
This commit is contained in:
Michael Peter Christen 2013-06-12 02:13:18 +02:00
parent 3e1e358fdc
commit 1762911f57
8 changed files with 147 additions and 106 deletions

View File

@ -804,7 +804,7 @@ search.verify.delete = true
# remote search details
remotesearch.maxcount = 10
remotesearch.maxtime = 1000
remotesearch.maxtime = 3000
# specifies if yacy should set it's own referer if no referer URL
# was set by the client.

View File

@ -129,7 +129,7 @@ public class IndexFederated_p {
final boolean usesolr = sb.getConfigBool(SwitchboardConstants.FEDERATED_SERVICE_SOLR_INDEXING_ENABLED, false) & solrurls.length() > 0;
try {
if (usesolr) {
ArrayList<RemoteInstance> instances = RemoteInstance.getShardInstances(solrurls, null, null);
ArrayList<RemoteInstance> instances = RemoteInstance.getShardInstances(solrurls, null, null, 10000);
sb.index.fulltext().connectRemoteSolr(instances);
} else {
sb.index.fulltext().disconnectRemoteSolr();

View File

@ -103,6 +103,7 @@ public class EmbeddedSolrConnector extends SolrServerConnector implements SolrCo
*/
@Override
public long getSize() {
if (this.server == null) return 0;
String threadname = Thread.currentThread().getName();
Thread.currentThread().setName("solr query: size");
EmbeddedSolrServer ess = (EmbeddedSolrServer) this.server;
@ -185,12 +186,12 @@ public class EmbeddedSolrConnector extends SolrServerConnector implements SolrCo
@Override
public QueryResponse getResponseByParams(ModifiableSolrParams params) throws IOException {
if (this.server == null) throw new IOException("server disconnected");
// during the solr query we set the thread name to the query string to get more debugging info in thread dumps
String q = params.get("q");
String threadname = Thread.currentThread().getName();
if (q != null) Thread.currentThread().setName("solr query: q = " + q);
QueryResponse rsp;
try {
// during the solr query we set the thread name to the query string to get more debugging info in thread dumps
String q = params.get("q");
String threadname = Thread.currentThread().getName();
if (q != null) Thread.currentThread().setName("solr query: q = " + q);
QueryResponse rsp;
rsp = this.server.query(params);
if (q != null) Thread.currentThread().setName(threadname);
if (rsp != null) log.debug(rsp.getResults().getNumFound() + " results for q=" + q);

View File

@ -73,42 +73,43 @@ public class RemoteSolrConnector extends SolrServerConnector implements SolrConn
@Override
public QueryResponse getResponseByParams(ModifiableSolrParams params) throws IOException {
// during the solr query we set the thread name to the query string to get more debugging info in thread dumps
String q = params.get("q");
String threadname = Thread.currentThread().getName();
if (q != null) Thread.currentThread().setName("solr query: q = " + q);
QueryRequest request = new QueryRequest(params);
ResponseParser responseParser = new XMLResponseParser();
request.setResponseParser(responseParser);
long t = System.currentTimeMillis();
NamedList<Object> result = null;
// during the solr query we set the thread name to the query string to get more debugging info in thread dumps
String q = params.get("q");
String threadname = Thread.currentThread().getName();
if (q != null) Thread.currentThread().setName("solr query: q = " + q);
QueryRequest request = new QueryRequest(params);
ResponseParser responseParser = new XMLResponseParser();
request.setResponseParser(responseParser);
long t = System.currentTimeMillis();
NamedList<Object> result = null;
try {
result = this.server.request(request);
} catch (Throwable e) {
//Log.logException(e);
throw new IOException(e.getMessage());
/*
Log.logException(e);
server = instance.getServer(this.corename);
super.init(server);
try {
result = server.request(request);
} catch (Throwable e) {
throw new IOException(e.getMessage());
/*
Log.logException(e);
server = instance.getServer(this.corename);
super.init(server);
try {
result = server.request(request);
} catch (Throwable e1) {
throw new IOException(e1.getMessage());
}
*/
} catch (Throwable e1) {
throw new IOException(e1.getMessage());
}
QueryResponse response = new QueryResponse(result, server);
response.setElapsedTime(System.currentTimeMillis() - t);
*/
}
QueryResponse response = new QueryResponse(result, this.server);
response.setElapsedTime(System.currentTimeMillis() - t);
if (q != null) Thread.currentThread().setName(threadname);
return response;
if (q != null) Thread.currentThread().setName(threadname);
return response;
}
public static void main(final String args[]) {
RemoteSolrConnector solr;
try {
RemoteInstance instance = new RemoteInstance("http://127.0.0.1:8983/solr/", null, "collection1");
RemoteInstance instance = new RemoteInstance("http://127.0.0.1:8983/solr/", null, "collection1", 10000);
ArrayList<RemoteInstance> instances = new ArrayList<RemoteInstance>();
instances.add(instance);
solr = new RemoteSolrConnector(new ShardInstance(instances, ShardSelection.Method.MODULO_HOST_MD5), "solr");

View File

@ -30,6 +30,7 @@ import net.yacy.kelondro.logging.Log;
import net.yacy.search.schema.CollectionSchema;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.NumericTokenStream;
import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
@ -42,7 +43,12 @@ import org.apache.solr.common.SolrInputDocument;
public abstract class SolrServerConnector extends AbstractSolrConnector implements SolrConnector {
protected final static Logger log = Logger.getLogger(SolrServerConnector.class);
public final static NumericTokenStream classLoaderSynchro = new NumericTokenStream();
// pre-instantiate this object to prevent sun.misc.Launcher$AppClassLoader deadlocks
// this is a very nasty problem; solr instantiates objects dynamically which can cause deadlocks
static {
assert classLoaderSynchro != null;
}
protected SolrServer server;
protected SolrServerConnector() {
@ -58,13 +64,14 @@ public abstract class SolrServerConnector extends AbstractSolrConnector implemen
}
@Override
public synchronized void commit(final boolean softCommit) {
//if (this.server instanceof HttpSolrServer) ((HttpSolrServer) this.server).getHttpClient().getConnectionManager().closeExpiredConnections();
try {
this.server.commit(true, true, softCommit);
//if (this.server instanceof HttpSolrServer) ((HttpSolrServer) this.server).shutdown();
} catch (Throwable e) {
//Log.logException(e);
public void commit(final boolean softCommit) {
synchronized (this.server) {
try {
this.server.commit(true, true, softCommit);
//if (this.server instanceof HttpSolrServer) ((HttpSolrServer) this.server).shutdown();
} catch (Throwable e) {
//Log.logException(e);
}
}
}
@ -72,18 +79,22 @@ public abstract class SolrServerConnector extends AbstractSolrConnector implemen
* force an explicit merge of segments
* @param maxSegments the maximum number of segments. Set to 1 for maximum optimization
*/
public synchronized void optimize(int maxSegments) {
try {
this.server.optimize(true, true, maxSegments);
} catch (Throwable e) {
Log.logException(e);
public void optimize(int maxSegments) {
if (this.server == null) return;
synchronized (this.server) {
try {
this.server.optimize(true, true, maxSegments);
} catch (Throwable e) {
Log.logException(e);
}
}
}
@Override
public synchronized void close() {
public void close() {
if (this.server == null) return;
try {
if (this.server != null && this.server instanceof EmbeddedSolrServer) synchronized (this.server) {this.server.commit(true, true, false);}
if (this.server instanceof EmbeddedSolrServer) synchronized (this.server) {this.server.commit(true, true, false);}
this.server = null;
} catch (Throwable e) {
Log.logException(e);
@ -92,6 +103,7 @@ public abstract class SolrServerConnector extends AbstractSolrConnector implemen
@Override
public long getSize() {
if (this.server == null) return 0;
try {
final QueryResponse rsp = getResponseByParams(AbstractSolrConnector.catchSuccessQuery);
if (rsp == null) return 0;
@ -109,32 +121,41 @@ public abstract class SolrServerConnector extends AbstractSolrConnector implemen
* @throws IOException
*/
@Override
public synchronized void clear() throws IOException {
try {
this.server.deleteByQuery(AbstractSolrConnector.CATCHALL_TERM);
this.server.commit(true, true, false);
} catch (final Throwable e) {
throw new IOException(e);
public void clear() throws IOException {
if (this.server == null) return;
synchronized (this.server) {
try {
this.server.deleteByQuery(AbstractSolrConnector.CATCHALL_TERM);
this.server.commit(true, true, false);
} catch (final Throwable e) {
throw new IOException(e);
}
}
}
@Override
public void deleteById(final String id) throws IOException {
try {
this.server.deleteById(id, -1);
} catch (final Throwable e) {
throw new IOException(e);
if (this.server == null) return;
synchronized (this.server) {
try {
this.server.deleteById(id, -1);
} catch (final Throwable e) {
throw new IOException(e);
}
}
}
@Override
public void deleteByIds(final Collection<String> ids) throws IOException {
if (this.server == null) return;
List<String> l = new ArrayList<String>();
for (String s: ids) l.add(s);
try {
this.server.deleteById(l, -1);
} catch (final Throwable e) {
throw new IOException(e);
synchronized (this.server) {
try {
this.server.deleteById(l, -1);
} catch (final Throwable e) {
throw new IOException(e);
}
}
}
@ -145,10 +166,13 @@ public abstract class SolrServerConnector extends AbstractSolrConnector implemen
*/
@Override
public void deleteByQuery(final String querystring) throws IOException {
try {
this.server.deleteByQuery(querystring, -1);
} catch (final Throwable e) {
throw new IOException(e);
if (this.server == null) return;
synchronized (this.server) {
try {
this.server.deleteByQuery(querystring, -1);
} catch (final Throwable e) {
throw new IOException(e);
}
}
}
@ -170,22 +194,29 @@ public abstract class SolrServerConnector extends AbstractSolrConnector implemen
@Override
public void add(final SolrInputDocument solrdoc) throws IOException, SolrException {
if (this.server == null) return;
try {
if (solrdoc.containsKey("_version_")) solrdoc.setField("_version_",0L); // prevent Solr "version conflict"
this.server.add(solrdoc, -1);
} catch (Throwable e) {
// catches "version conflict for": try this again and delete the document in advance
try {
this.server.deleteById((String) solrdoc.getFieldValue(CollectionSchema.id.getSolrFieldName()));
} catch (SolrServerException e1) {}
synchronized (this.server) {
try {
if (solrdoc.containsKey("_version_")) solrdoc.setField("_version_",0L); // prevent Solr "version conflict"
this.server.add(solrdoc, -1);
} catch (Throwable ee) {
} catch (Throwable e) {
Log.logException(e);
// catches "version conflict for": try this again and delete the document in advance
try {
this.server.deleteById((String) solrdoc.getFieldValue(CollectionSchema.id.getSolrFieldName()));
} catch (SolrServerException e1) {
Log.logException(e1);
}
try {
this.server.commit();
this.server.add(solrdoc, -1);
} catch (Throwable eee) {
throw new IOException(eee);
} catch (Throwable ee) {
Log.logException(ee);
try {
this.server.commit();
this.server.add(solrdoc, -1);
} catch (Throwable eee) {
Log.logException(eee);
throw new IOException(eee);
}
}
}
}
@ -194,23 +225,29 @@ public abstract class SolrServerConnector extends AbstractSolrConnector implemen
@Override
public void add(final Collection<SolrInputDocument> solrdocs) throws IOException, SolrException {
if (this.server == null) return;
try {
for (SolrInputDocument solrdoc : solrdocs) {
if (solrdoc.containsKey("_version_")) solrdoc.setField("_version_",0L); // prevent Solr "version conflict"
}
this.server.add(solrdocs, -1);
} catch (Throwable e) {
// catches "version conflict for": try this again and delete the document in advance
List<String> ids = new ArrayList<String>();
for (SolrInputDocument solrdoc : solrdocs) ids.add((String) solrdoc.getFieldValue(CollectionSchema.id.getSolrFieldName()));
try {
this.server.deleteById(ids);
} catch (SolrServerException e1) {}
synchronized (this.server) {
try {
for (SolrInputDocument solrdoc : solrdocs) {
if (solrdoc.containsKey("_version_")) solrdoc.setField("_version_",0L); // prevent Solr "version conflict"
}
this.server.add(solrdocs, -1);
} catch (Throwable ee) {
log.warn(e.getMessage() + " IDs=" + ids.toString());
throw new IOException(ee);
} catch (Throwable e) {
Log.logException(e);
// catches "version conflict for": try this again and delete the document in advance
List<String> ids = new ArrayList<String>();
for (SolrInputDocument solrdoc : solrdocs) ids.add((String) solrdoc.getFieldValue(CollectionSchema.id.getSolrFieldName()));
try {
this.server.deleteById(ids);
} catch (SolrServerException e1) {
Log.logException(e1);
}
try {
this.server.add(solrdocs, -1);
} catch (Throwable ee) {
Log.logException(ee);
log.warn(e.getMessage() + " IDs=" + ids.toString());
throw new IOException(ee);
}
}
}
}

View File

@ -68,19 +68,21 @@ public class RemoteInstance implements SolrInstance {
private final HttpSolrServer defaultServer;
private final Collection<String> coreNames;
private final Map<String, HttpSolrServer> server;
private final int timeout;
public static ArrayList<RemoteInstance> getShardInstances(final String urlList, Collection<String> coreNames, String defaultCoreName) throws IOException {
public static ArrayList<RemoteInstance> getShardInstances(final String urlList, Collection<String> coreNames, String defaultCoreName, final int timeout) throws IOException {
urlList.replace(' ', ',');
String[] urls = urlList.split(",");
ArrayList<RemoteInstance> instances = new ArrayList<RemoteInstance>();
for (final String u: urls) {
RemoteInstance instance = new RemoteInstance(u, coreNames, defaultCoreName);
RemoteInstance instance = new RemoteInstance(u, coreNames, defaultCoreName, timeout);
instances.add(instance);
}
return instances;
}
public RemoteInstance(final String url, final Collection<String> coreNames, final String defaultCoreName) throws IOException {
public RemoteInstance(final String url, final Collection<String> coreNames, final String defaultCoreName, final int timeout) throws IOException {
this.timeout = timeout;
this.server= new HashMap<String, HttpSolrServer>();
this.solrurl = url == null ? "http://127.0.0.1:8983/solr/" : url; // that should work for the example configuration of solr 4.x.x
this.coreNames = coreNames == null ? new ArrayList<String>() : coreNames;
@ -148,8 +150,8 @@ public class RemoteInstance implements SolrInstance {
}
};
HttpParams params = this.client.getParams();
HttpConnectionParams.setConnectionTimeout(params, 10000);
HttpConnectionParams.setSoTimeout(params, 10000);
HttpConnectionParams.setConnectionTimeout(params, timeout);
HttpConnectionParams.setSoTimeout(params, timeout);
this.client.addRequestInterceptor(new HttpRequestInterceptor() {
@Override
public void process(final HttpRequest request, final HttpContext context) throws IOException {
@ -236,9 +238,9 @@ public class RemoteInstance implements SolrInstance {
s = new HttpSolrServer(this.solrurl + name);
}
s.setAllowCompression(true);
s.setConnectionTimeout(60000);
s.setConnectionTimeout(this.timeout);
s.setMaxRetries(1); // Solr-Doc: No more than 1 recommended (depreciated)
s.setSoTimeout(60000);
s.setSoTimeout(this.timeout);
this.server.put(name, s);
return s;
}

View File

@ -1058,7 +1058,7 @@ public final class Protocol {
} else {
try {
String address = target == event.peers.mySeed() ? "localhost:" + target.getPort() : target.getPublicAddress();
instance = new RemoteInstance("http://" + address, null, "solr"); // this is a 'patch configuration' which considers 'solr' as default collection
instance = new RemoteInstance("http://" + address, null, "solr", 3000); // this is a 'patch configuration' which considers 'solr' as default collection
solrConnector = new RemoteSolrConnector(instance, "solr");
rsp = solrConnector.getResponseByParams(solrQuery);
docList = rsp.getResults();
@ -1175,7 +1175,7 @@ public final class Protocol {
event.addNodes(container, facets, snippets, true, "localpeer", (int) docList.getNumFound());
event.addFinalize();
event.addExpectedRemoteReferences(-count);
Network.log.logInfo("local search (solr): localpeer sent " + container.get(0).size() + "/" + docList.size() + " references");
Network.log.logInfo("local search (solr): localpeer sent " + container.size() + "/" + docList.getNumFound() + " references");
} else {
for (SolrInputDocument doc: docs) {
event.query.getSegment().putDocumentInQueue(doc);
@ -1184,7 +1184,7 @@ public final class Protocol {
event.addNodes(container, facets, snippets, false, target.getName() + "/" + target.hash, (int) docList.getNumFound());
event.addFinalize();
event.addExpectedRemoteReferences(-count);
Network.log.logInfo("remote search (solr): peer " + target.getName() + " sent " + (container.size() == 0 ? 0 : container.get(0).size()) + "/" + docList.size() + " references");
Network.log.logInfo("remote search (solr): peer " + target.getName() + " sent " + (container.size() == 0 ? 0 : container.size()) + "/" + docList.getNumFound() + " references");
}
final int dls = docList.size();
docList.clear();

View File

@ -493,7 +493,7 @@ public final class Switchboard extends serverSwitch {
if (usesolr && solrurls != null && solrurls.length() > 0) {
try {
ArrayList<RemoteInstance> instances = RemoteInstance.getShardInstances(solrurls, null, null);
ArrayList<RemoteInstance> instances = RemoteInstance.getShardInstances(solrurls, null, null, 10000);
this.index.fulltext().connectRemoteSolr(instances);
} catch ( final IOException e ) {
Log.logException(e);
@ -1329,7 +1329,7 @@ public final class Switchboard extends serverSwitch {
if (usesolr && solrurls != null && solrurls.length() > 0) {
try {
ArrayList<RemoteInstance> instances = RemoteInstance.getShardInstances(solrurls, null, null);
ArrayList<RemoteInstance> instances = RemoteInstance.getShardInstances(solrurls, null, null, 10000);
this.index.fulltext().connectRemoteSolr(instances);
} catch ( final IOException e ) {
Log.logException(e);