logo资料库

扩展Heritrix3指定内容提取.pdf

第1页 / 共10页
第2页 / 共10页
第3页 / 共10页
第4页 / 共10页
第5页 / 共10页
第6页 / 共10页
第7页 / 共10页
第8页 / 共10页
资料共10页,剩余部分请下载后查看
扩展 Heritrix 指定内容提取 一、系统基本情况 1) Heritrix3.1.0 2) java version "1.7.0_51" 3) ubuntu 13.10 二、扩展需求分析 网页抓取下来之后,我们需要提取里面具体的信息,比如题目,内容,时间,作者, 价格等等,提取信息的目的就是将非结构法的网页变成结构化的信息的重要手段。这是为将 来进行数据分析的前提。 然而 Heritrix 没有提供网页信息提取的模块,但是 Heritrix 非常容易扩展,下面就介绍怎 么在 Heritrix 里面添加内容提取模块。 三、扩增接口分析 在 Heritrix 的配置文件中有如下内容: 我们可以发现, FetchChain 整个过程有抓取网页组件(fetchDns,fetchHttp),然后有链接抽取组 件(extractorHttp,extractorHtml,extractorCss,extractorJs,extractorSwf) 应该在这个 FetchChain 中加入我们的内容提取者,而且加入的位置在网页抓取之后,我们看一下这些组件 有什么特点。 再来看配置文件中下面一段介绍,大致我们就清楚了 FetchChain 的作用: PROCESSING CHAINS Much of the crawler's work is specified by the sequential application of swappable Processor modules. These Processors are collected into three 'chains. The CandidateChain is applied to URIs being considered for inclusion, before a URI is enqueued for collection. The FetchChain is applied to URIs when their turn for collection comes up. The DispositionChain is applied
after a URI is fetched and analyzed/link-extracted. 大意是: PROCESSING CHAINS: 大部分爬虫的工作就是通过特定顺序的可以热插拔的处理模块来实现的。这些处理者被三个链集中在 一起。 CandidateChain 应用于在 URI 进入收集队列之前是否将该 URI 包含进来。 FetchChain 应用于抓取 URI 已经抓取之后的信息提取 DispositionChain 应用于 URI 已经抓取、分析和链接抽取之后 看来被 FetchChain 包含的组件也是 processor,我们就是要实现一个 processer。 四、扩增需要注意的地方 先来看一段在 org.archive.crawler.framework.CrawlController 中的代码: /** * Fetch chain */ protected FetchChain fetchChain; public FetchChain getFetchChain() { return this.fetchChain; } @Autowired public void setFetchChain(FetchChain fetchChain) { this.fetchChain = fetchChain; } /** * Disposition chain */ protected DispositionChain dispositionChain; public DispositionChain getDispositionChain() { return this.dispositionChain; } @Autowired public void setDispositionChain(DispositionChain dispositionChain) { this.dispositionChain = dispositionChain; } /** * Candidate chain */ protected CandidateChain candidateChain; public CandidateChain getCandidateChain() { return this.candidateChain; } @Autowired public void setCandidateChain(CandidateChain candidateChain) { this.candidateChain = candidateChain; } 很显然,爬虫控制中心 CrawlController 接受了配置文件中配置的三个处理链,当然也包括了 FetchChain,那么这些 processor 到底怎么工作?这主要就要看 ToeThread,下面重点分析 ToeThread。 每个爬虫线程 ToeThread 都有一个缓存中心,下面是初始化 ToeThread 代码: /** * Create a ToeThread *
* @param g ToeThreadGroup * @param sn serial number */ public ToeThread(ToePool g, int sn) { // TODO: add crawl name? super(g,"ToeThread #" + sn); coreName="ToeThread #" + sn + ": "; controller = g.getController(); serialNumber = sn; setPriority(DEFAULT_PRIORITY);//设置默认权限 //获取 job/scratch 文件夹里面的 tt{sn}http 文件 int outBufferSize = controller.getRecorderOutBufferBytes(); int inBufferSize = controller.getRecorderInBufferBytes(); //获取 job/scratch 文件夹里面的 tt{sn}http 文件的输入输出流 httpRecorder = new Recorder(controller.getScratchDir().getFile(), "tt" + sn + "http", outBufferSize, inBufferSize); lastFinishTime = System.currentTimeMillis(); } 下面看一下本地文件对应文件夹的内容: 我开了 25 五个线程,刚好与这 25 个线程对应。 下面重点看 processor 怎么工作?这主要体现在 ToeThread 里面的 run 方法中: /** (non-Javadoc) * @see java.lang.Thread#run() */ public void run() { String name = controller.getMetadata().getJobName(); logger.fine(getName()+" started for order '"+name+"'"); Recorder.setHttpRecorder(httpRecorder); try { while ( true ) { ArchiveUtils.continueCheck(); setStep(Step.ABOUT_TO_GET_URI, null); //获取下一条需要处理的 uri CrawlURI curi = controller.getFrontier().next(); synchronized(this) {//下面的步骤必须同步,也就是一个线程独占 ArchiveUtils.continueCheck(); setCurrentCuri(curi);//设置当前的处理的链接 currentCuri.setThreadNumber(this.serialNumber);//给处理当前链
接的记录一下序号 lastStartTime = System.currentTimeMillis();//记录最近开始时间 currentCuri.setRecorder(httpRecorder);//设置 currentCuri 的 Recorder,将来爬取下来的网址就放在这个里面 } try { KeyedProperties.loadOverridesFrom(curi); controller.getFetchChain().process(curi,this); controller.getFrontier().beginDisposition(curi); controller.getDispositionChain().process(curi,this); } catch (RuntimeExceptionWrapper e) { // Workaround to get cause from BDB if(e.getCause() == null) { e.initCause(e.getCause()); } recoverableProblem(e); } catch (AssertionError ae) { // This risks leaving crawl in fatally inconsistent state, // but is often reasonable for per-Processor assertion problems recoverableProblem(ae); } catch (RuntimeException e) { recoverableProblem(e); } catch (InterruptedException e) { if(currentCuri!=null) { recoverableProblem(e); Thread.interrupted(); // clear interrupt status } else { throw e; } } catch (StackOverflowError err) { recoverableProblem(err); } catch (Error err) { // OutOfMemory and any others seriousError(err); } finally { httpRecorder.endReplays(); KeyedProperties.clearOverridesFrom(curi); } setStep(Step.ABOUT_TO_RETURN_URI, null); ArchiveUtils.continueCheck(); synchronized(this) {//处理完毕 controller.getFrontier().finished(currentCuri); controller.getFrontier().endDisposition(); setCurrentCuri(null); } curi = null; setStep(Step.FINISHING_PROCESS, null); lastFinishTime = System.currentTimeMillis(); if(shouldRetire) { break; // from while(true) } } } catch (InterruptedException e) {
if(currentCuri!=null){ logger.log(Level.SEVERE,"Interrupt leaving unfinished CrawlURI "+getName()+" - job may hang",e); } // thread interrupted, ok to end logger.log(Level.FINE,this.getName()+ " ended with Interruption"); } catch (Exception e) { // everything else (including interruption) logger.log(Level.SEVERE,"Fatal exception in "+getName(),e); } catch (OutOfMemoryError err) { seriousError(err); } finally { controller.getFrontier().endDisposition(); } setCurrentCuri(null); // Do cleanup so that objects can be GC. this.httpRecorder.closeRecorders(); this.httpRecorder = null; logger.fine(getName()+" finished for order '"+name+"'"); setStep(Step.FINISHED, null); controller = null; } 这里面大部分内容都是异常处理,步骤设置,抓取记录者设置等等,设置一些状态, 每一条 uri 处理必须经过的一些步骤。里面关键代码: controller.getFetchChain().process(curi,this); 这里面我们重点看一下 FetchChain 它是继承与 ProcessorChain public void process(CrawlURI curi, ChainStatusReceiver thread) throws 如何执行,下面是它的 process 方法: InterruptedException { assert KeyedProperties.overridesActiveFrom(curi); String skipToProc = null; ploop: for(Processor curProc : this ) {//循环所有的处理器 if(skipToProc!=null && !curProc.getBeanName().equals(skipToProc)) {//需要 跳到的处理器,并且现在不是那个处理器 continue; } else { skipToProc = null; //否则需要跳到的处理器已经在处理了,可能某些处理器继续处理 } if(thread!=null) { thread.atProcessor(curProc);//当前线程正则处理 } ArchiveUtils.continueCheck(); ProcessResult pr = curProc.process(curi);//处理结果 switch (pr.getProcessStatus()) { case PROCEED: continue; case FINISH: break ploop; case JUMP: skipToProc = pr.getJumpTarget(); continue; } } }
从上面的代码我们可以发现:我们的处理器链在处理过程中就是调用的处理器的 process 方法,而且通过返回的结果有下面三个: 1) PROCEED 下一个处理 2) FINISH 处理结束,后面的处理器都不处理了 3) JUMP 跳到下一个处理器(指定的) 从上面的分析,我们发现这个处理器设计的非常合理,我们可以扩展做许多事情, public ProcessResult process(CrawlURI uri) throws InterruptedException { if (!getEnabled()) { return ProcessResult.PROCEED; } //判断这个处理器是否激活,如果没有激活直接跳过这个处理器 if (getShouldProcessRule().decisionFor(uri) == DecideResult.REJECT) { innerRejectProcess(uri); return ProcessResult.PROCEED; } //如果处理规则决定这条 uri 应该拒绝,那么拒绝之后的一些操作之后直接跳过这个处理器 if (shouldProcess(uri)) {//是否应该处理这个 uri 吗? uriCount.incrementAndGet();//如果应该处理就记录一下 return innerProcessResult(uri);//返回内部处理的结果 } else { return ProcessResult.PROCEED;//如果不应该处理,直接跳过这个处理器 } } 下面是我的扩展类: 五、扩展具体操作 package com.wisdomdata.heritrix.modules; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; import org.apache.commons.httpclient.URIException; import org.archive.modules.CrawlURI; import org.archive.modules.Processor; import org.archive.util.TextUtils; import com.wisdomdata.analyplatform.ContentExtractor; import com.wisdomdata.analyplatform.TemplateFinder; import com.wisdomdata.analyplatform.dao.ExtractResult; import com.wisdomdata.analyplatform.dao.ExtractRule; import com.wisdomdata.analyplatform.helper.IOHelper; import com.wisdomdata.analyplatform.helper.StringHelper; public class WantContentsExtractor extends Processor { private static final Logger LOGGER = Logger.getLogger(WantContentsExtractor.class.getName()); /** * TemplateFinder 模板文件查找者,根据 url 找到模板对应的规则 * */
return finder; this.finder = finder; TemplateFinder finder = null; /** * 有模板查找者找到的模板对应的规则 * */ public TemplateFinder getFinder() { } public void setFinder(TemplateFinder finder) { } /** * 设置内容抽取者 * */ ContentExtractor extractor = null; public ContentExtractor getExtractor() { } public void setExtractor(ContentExtractor extractor) { } this.extractor = extractor; return extractor; private List rules = null; /** * 该方法可以尝试去找到 uri 对应处理的模板,如果找不到处理模板,则返回 false * @author clebeg time:2014-08-01 * */ @Override protected boolean shouldProcess(CrawlURI uri) { try { } catch (URIException e) { e.printStackTrace(); rules = finder.findTemplate(uri.getUURI().getURI()); } if (rules != null && rules.size() > 0) return false; return true; } /** * 该方法通过找到的模板去提取想要的信息,然后调用数据库模块去存储信息 * @author clebeg time:2014-08-01 * */ @Override protected void innerProcess(CrawlURI uri) throws InterruptedException { StringHelper.findRightEncoding(uri);
List results = new ArrayList(); if (rules == null || rules.size() == 0) { LOGGER.warning("没有内容提取规则,不 xu 要提取,请检查配置信 return;//没有提取规则不需要提取 息是否有误!"); } for (ExtractRule rule : rules) { results.add( getExtractor().extract( rule, uri ) ); } //下面需要存储提取的结果信息 for (ExtractResult result : results) { if (result.isSuccessExtract()) { IOHelper.writeToLocalFile( uri.getURI() + "\t" + result.getRule().getAttrName() + "\t" + result.getResults(), "results.txt", "append" ); } } } } 我这个内容提取者,主要是根据模板寻找者找到相应的模板,再调用指定的内容提取 者提取指定的内容,最后返回提取的结果,保存在 results.txt 中,需要使用这个操作者, 需要在配置文件中 crawler-beans.cxml 配置一些内容: 作用是抽取用户想要的内容,必须通过模板发现者去发现模板, 然后通过内容 value="jobs/xinlang/templates/url_template.xml"/>
分享到:
收藏