Java 多线程执行 Solr 索引创建示例

时间:2021-05-19

代码如下:
/**
 * Solr-backed {@link Indexer} and {@link Searcher}.
 *
 * <p>Index additions and deletions are not sent to Solr by the calling thread.
 * They are placed on a bounded in-memory queue and a single background thread
 * (named {@code "SolrIndexer"}) drains that queue, groups the pending
 * operations by {@code Indexable#getType()} and sends one update request per
 * type. Searches are executed synchronously on the calling thread.
 *
 * <p>{@link #destroy()} performs a graceful shutdown: it waits (up to
 * {@link #SHUTDOWN_TIMEOUT} milliseconds) for everything queued before the
 * shutdown request to be sent to Solr, then stops the background thread.
 */
public class SolrIndexer implements Indexer, Searcher, DisposableBean {
    //~ Static fields/initializers =============================================

    static final Logger logger = LoggerFactory.getLogger(SolrIndexer.class);

    /** Maximum time, in milliseconds, that {@link #destroy()} waits for the queue to drain. */
    private static final long SHUTDOWN_TIMEOUT = 5 * 60 * 1000L; // long enough

    /** Capacity of the queue that buffers pending index operations. */
    private static final int INPUT_QUEUE_LENGTH = 16384;

    /**
     * Milliseconds the update thread waits for a first operation, and then
     * waits again so that further operations can accumulate into one batch.
     */
    private static final long BATCH_WINDOW_MILLIS = 1000L;

    //~ Instance fields ========================================================

    private final CommonsHttpSolrServer server;

    private final BlockingQueue<Operation> inputQueue;

    private final Thread updateThread;

    /** Cleared by {@link #shutdown(long, TimeUnit)} to make the update thread exit its loop. */
    volatile boolean running = true;

    /** Set as soon as shutdown has been requested; new operations are rejected from then on. */
    volatile boolean shuttingDown = false;

    //~ Constructors ===========================================================

    /**
     * Creates the indexer and immediately starts its background update thread.
     *
     * @param url base URL of the Solr server
     * @throws MalformedURLException if {@code url} is rejected by {@code CommonsHttpSolrServer}
     */
    public SolrIndexer(String url) throws MalformedURLException {
        server = new CommonsHttpSolrServer(url);

        inputQueue = new ArrayBlockingQueue<Operation>(INPUT_QUEUE_LENGTH);

        // Started last so the thread only ever observes fully assigned fields.
        updateThread = new Thread(new UpdateTask());
        updateThread.setName("SolrIndexer");
        updateThread.start();
    }

    //~ Methods ================================================================

    /** Delegates to {@code CommonsHttpSolrServer#setSoTimeout(int)}. */
    public void setSoTimeout(int timeout) {
        server.setSoTimeout(timeout);
    }

    /** Delegates to {@code CommonsHttpSolrServer#setConnectionTimeout(int)}. */
    public void setConnectionTimeout(int timeout) {
        server.setConnectionTimeout(timeout);
    }

    /** Delegates to {@code CommonsHttpSolrServer#setAllowCompression(boolean)}. */
    public void setAllowCompression(boolean allowCompression) {
        server.setAllowCompression(allowCompression);
    }

    /**
     * Queues {@code indexable} to be added to (or updated in) the index by the
     * background thread. Returns without waiting for Solr.
     *
     * @param indexable the item to index; must not be {@code null}
     * @throws IllegalStateException    if shutdown has already been requested
     * @throws IllegalArgumentException if {@code indexable} is {@code null}
     * @throws IndexingException        if the input queue is full and the operation
     *                                  could not be accepted
     */
    public void addIndex(Indexable indexable) throws IndexingException {
        enqueue(indexable, OperationType.UPDATE);
    }

    /**
     * Queues {@code indexable} to be removed from the index by the background
     * thread. Returns without waiting for Solr.
     *
     * @param indexable the item to remove; must not be {@code null}
     * @throws IllegalStateException    if shutdown has already been requested
     * @throws IllegalArgumentException if {@code indexable} is {@code null}
     * @throws IndexingException        if the input queue is full and the operation
     *                                  could not be accepted
     */
    public void delIndex(Indexable indexable) throws IndexingException {
        enqueue(indexable, OperationType.DELETE);
    }

    /**
     * Places one operation on the input queue without blocking.
     *
     * <p>A full queue is reported to the caller instead of silently discarding
     * the operation, because a dropped operation leaves the index out of sync
     * with no trace.
     */
    private void enqueue(Indexable indexable, OperationType type) throws IndexingException {
        if (shuttingDown) {
            throw new IllegalStateException("SolrIndexer is shutting down");
        }
        if (indexable == null) {
            // A null would only fail later inside the update thread and take the
            // rest of its batch down with it, so reject it at the source.
            throw new IllegalArgumentException("indexable must not be null");
        }
        if (!inputQueue.offer(new Operation(indexable, type))) {
            throw new IndexingException(new IllegalStateException(
                    "SolrIndexer input queue is full (capacity " + INPUT_QUEUE_LENGTH + ")"));
        }
    }

    /**
     * Sends all {@code indices} of one type to Solr in a single committing
     * update request addressed to {@code /<type>/update}.
     *
     * @param type    type name used to build the request path
     * @param indices items to add or update; {@code null} or empty is a no-op
     * @throws IndexingException if the request fails
     */
    private void updateIndices(String type, List<Indexable> indices) throws IndexingException {
        if (indices == null || indices.isEmpty()) {
            return;
        }

        logger.debug("Updating {} indices", indices.size());

        UpdateRequest req = newCommitRequest(type);

        for (Indexable idx : indices) {
            Doc doc = idx.getDoc();

            SolrInputDocument solrDoc = new SolrInputDocument();
            solrDoc.setDocumentBoost(doc.getDocumentBoost());
            for (Iterator<Field> i = doc.iterator(); i.hasNext();) {
                Field field = i.next();
                solrDoc.addField(field.getName(), field.getValue(), field.getBoost());
            }

            req.add(solrDoc);
        }

        send(req);
    }

    /**
     * Deletes all {@code indices} of one type from Solr, by document id, in a
     * single committing update request addressed to {@code /<type>/update}.
     *
     * @param type    type name used to build the request path
     * @param indices items to delete; {@code null} or empty is a no-op
     * @throws IndexingException if the request fails
     */
    private void delIndices(String type, List<Indexable> indices) throws IndexingException {
        if (indices == null || indices.isEmpty()) {
            return;
        }

        logger.debug("Deleting {} indices", indices.size());

        UpdateRequest req = newCommitRequest(type);
        for (Indexable indexable : indices) {
            req.deleteById(indexable.getDocId());
        }

        send(req);
    }

    /** Builds an update request for {@code /<type>/update} that commits when processed. */
    private UpdateRequest newCommitRequest(String type) {
        UpdateRequest req = new UpdateRequest("/" + type + "/update");
        req.setAction(UpdateRequest.ACTION.COMMIT, false, false);
        return req;
    }

    /** Executes {@code req} against the server, logging and wrapping any transport failure. */
    private void send(UpdateRequest req) throws IndexingException {
        try {
            req.process(server);
        } catch (SolrServerException e) {
            logger.error("SolrServerException occurred", e);
            throw new IndexingException(e);
        } catch (IOException e) {
            logger.error("IOException occurred", e);
            throw new IndexingException(e);
        }
    }

    /**
     * Runs {@code query} synchronously against {@code /<type>/select} and
     * converts the returned Solr documents into {@code Doc} instances.
     *
     * @param query the query; its filter and order field are optional
     * @return the matching page of documents together with offset, total and size
     * @throws IndexingException if Solr reports an error
     */
    public QueryResult search(Query query) throws IndexingException {
        SolrQuery sq = new SolrQuery();
        sq.setQuery(query.getQuery());
        if (query.getFilter() != null) {
            sq.addFilterQuery(query.getFilter());
        }
        if (query.getOrderField() != null) {
            SolrQuery.ORDER order =
                    query.getOrder() == Query.Order.DESC ? SolrQuery.ORDER.desc : SolrQuery.ORDER.asc;
            sq.addSortField(query.getOrderField(), order);
        }
        sq.setStart(query.getOffset());
        sq.setRows(query.getLimit());

        QueryRequest req = new QueryRequest(sq);
        req.setPath("/" + query.getType() + "/select");

        try {
            QueryResponse rsp = req.process(server);
            SolrDocumentList docs = rsp.getResults();

            QueryResult result = new QueryResult();
            result.setOffset(docs.getStart());
            result.setTotal(docs.getNumFound());
            result.setSize(sq.getRows());

            // Presize from what actually came back, not from the requested limit:
            // a very large limit would otherwise allocate a huge, mostly empty array.
            List<Doc> resultDocs = new ArrayList<Doc>(docs.size());
            for (SolrDocument solrDocument : docs) {
                Doc doc = new Doc();
                for (Map.Entry<String, Object> field : solrDocument) {
                    doc.addField(field.getKey(), field.getValue());
                }
                resultDocs.add(doc);
            }

            result.setDocs(resultDocs);
            return result;

        } catch (SolrServerException e) {
            logger.error("SolrServerException occurred", e);
            throw new IndexingException(e);
        }
    }

    /** Shuts the indexer down, waiting up to {@link #SHUTDOWN_TIMEOUT} ms for the queue to drain. */
    public void destroy() throws Exception {
        shutdown(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops accepting new operations, optionally waits for the already queued
     * ones to be sent to Solr, and then tells the update thread to exit.
     *
     * @param timeout how long to wait for the queue to drain; {@code <= 0} means do not wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if everything queued before this call was processed
     *         within the timeout; {@code false} if the wait timed out, no wait
     *         was requested, or shutdown had already been requested
     */
    public boolean shutdown(long timeout, TimeUnit unit) {
        if (shuttingDown) {
            logger.info("Suppressing duplicate attempt to shut down");
            return false;
        }
        shuttingDown = true;
        String baseName = updateThread.getName();
        // The thread name is updated at each stage so the shutdown phase is
        // visible in thread dumps.
        updateThread.setName(baseName + " - SHUTTING DOWN");
        boolean rv = false;
        try {
            // Conditionally wait
            if (timeout > 0) {
                updateThread.setName(baseName + " - SHUTTING DOWN (waiting)");
                rv = waitForQueue(timeout, unit);
            }
        } finally {
            // But always begin the shutdown sequence
            running = false;
            updateThread.setName(baseName + " - SHUTTING DOWN (informed client)");
        }
        return rv;
    }

    /**
     * Enqueues a stop marker behind all pending operations and waits until the
     * update thread has finished the batch containing it.
     *
     * @param timeout total time budget for enqueueing the marker and waiting for it
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the marker was processed in time, {@code false}
     *         if the budget ran out (including while the queue was still full)
     * @throws RuntimeException if the calling thread is interrupted while waiting;
     *                          the thread's interrupt status is restored first
     */
    private boolean waitForQueue(long timeout, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        CountDownLatch latch = new CountDownLatch(1);
        try {
            // Timed offer rather than add(): a full queue must consume the
            // timeout, not abort shutdown with "Queue full".
            if (!inputQueue.offer(new StopOperation(latch), timeout, unit)) {
                return false;
            }
            return latch.await(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted waiting for queues", e);
        }
    }

    /** Body of the background thread: repeatedly flushes the queue until {@code running} is cleared. */
    class UpdateTask implements Runnable {
        public void run() {
            while (running) {
                try {
                    syncIndices();
                } catch (Throwable e) {
                    // Never let one failed batch kill the thread; log and keep going.
                    if (shuttingDown) {
                        logger.warn("Exception occurred during shutdown", e);
                    } else {
                        logger.error("Problem handling solr indexing updating", e);
                    }
                }
            }
            logger.info("Shut down SolrIndexer");
        }
    }

    /**
     * Processes one batch: waits up to one second for a first operation, gives
     * further operations one more second to accumulate, drains the queue,
     * groups the operations by type and sends the deletes followed by the
     * updates.
     *
     * <p>Any stop marker contained in the batch is acknowledged only after the
     * batch has been sent (or has failed), so a waiting {@code shutdown} call
     * does not return while the final batch is still in flight.
     *
     * <p>NOTE(review): within one batch all deletes are sent before all
     * updates, regardless of submission order; an add followed by a delete of
     * the same document in the same batch therefore ends with the document
     * indexed. Confirm whether callers rely on submission order.
     *
     * @throws InterruptedException if interrupted while waiting for the first operation
     */
    void syncIndices() throws InterruptedException {
        Operation first = inputQueue.poll(BATCH_WINDOW_MILLIS, TimeUnit.MILLISECONDS);

        if (first == null) {
            return;
        }

        if (first instanceof StopOperation) {
            // Everything queued ahead of the marker was handled by earlier batches.
            ((StopOperation) first).stop();
            return;
        }

        // wait 1 second
        try {
            Thread.sleep(BATCH_WINDOW_MILLIS);
        } catch (InterruptedException ignored) {
            // Deliberately not propagated: the operation polled above must still
            // be sent. An interrupt only cuts the batching window short.
        }

        List<Operation> batch = new ArrayList<Operation>(inputQueue.size() + 1);
        batch.add(first);
        inputQueue.drainTo(batch);

        try {
            Map<String, List<Indexable>> deletesByType = new HashMap<String, List<Indexable>>(4);
            Map<String, List<Indexable>> updatesByType = new HashMap<String, List<Indexable>>(4);

            for (Operation op : batch) {
                if (op instanceof StopOperation) {
                    continue; // acknowledged in the finally block below
                }
                if (op.type == OperationType.DELETE) {
                    addToGroup(deletesByType, op.indexable);
                } else {
                    addToGroup(updatesByType, op.indexable);
                }
            }

            for (Map.Entry<String, List<Indexable>> entry : deletesByType.entrySet()) {
                delIndices(entry.getKey(), entry.getValue());
            }

            for (Map.Entry<String, List<Indexable>> entry : updatesByType.entrySet()) {
                updateIndices(entry.getKey(), entry.getValue());
            }
        } finally {
            // Release waiters even if the batch failed, so shutdown never hangs
            // for its whole timeout on a marker that was already consumed.
            for (Operation op : batch) {
                if (op instanceof StopOperation) {
                    ((StopOperation) op).stop();
                }
            }
        }
    }

    /** Appends {@code indexable} to the list stored under its type, creating the list on first use. */
    private static void addToGroup(Map<String, List<Indexable>> groups, Indexable indexable) {
        String type = indexable.getType();
        List<Indexable> bucket = groups.get(type);
        if (bucket == null) {
            bucket = new ArrayList<Indexable>();
            groups.put(type, bucket);
        }
        bucket.add(indexable);
    }

    /** Kind of work carried by an {@link Operation}. */
    enum OperationType { DELETE, UPDATE, SHUTDOWN }

    /** One queued unit of work: an item plus what to do with it. */
    static class Operation {
        OperationType type;
        Indexable indexable;

        Operation() {}

        Operation(Indexable indexable, OperationType type) {
            this.indexable = indexable;
            this.type = type;
        }
    }

    /**
     * Marker operation used by shutdown. It carries no item, only a latch that
     * is counted down once the update thread has dealt with the marker.
     */
    static class StopOperation extends Operation {
        CountDownLatch latch;

        StopOperation(CountDownLatch latch) {
            this.latch = latch;
            this.type = OperationType.SHUTDOWN;
        }

        /** Signals the thread waiting in shutdown that the marker has been reached. */
        public void stop() {
            latch.countDown();
        }
    }

    //~ Accessors ===============

}

声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。

相关文章