上篇文章里面虽然结合hadoop用到mapreduce去计算属性的增益率,但是发现整个程序似乎也并没有做到并行化处理。后面又看了一些网上的资料,自己又想了想,然后又重新实现了一下决策树,大体思路如下:
1、将一个大数据集文件拆分成N个小数据集文件,对数据做好预处理工作,上传到HDFS
2、计算HDFS上小数据集文件的最佳分割属性与分割点
3、汇总N个小数据集文件的最佳划分,投票选出最佳划分
4、N个小数据集的节点根据最终的最佳划分,分割自己节点上的数据,上传到HDFS,跳转到第二步
下面是具体的实现代码:
public class DecisionTreeSprintBJob extends AbstractJob { private Map<String, Map<Object, Integer>> attributeValueStatistics = null; private Map<String, Set<String>> attributeNameToValues = null; private Set<String> allAttributes = null; /** 数据拆分,大数据文件拆分为小数据文件,便于分配到各个节点开启Job*/ private List<String> split(String input, String splitNum) { String output = HDFSUtils.HDFS_TEMP_INPUT_URL + IdentityUtils.generateUUID(); String[] args = new String[]{input, output, splitNum}; DataFileSplitMR.main(args); List<String> inputs = new ArrayList<String>(); Path outputPath = new Path(output); try { FileSystem fs = outputPath.getFileSystem(conf); Path[] paths = HDFSUtils.getPathFiles(fs, outputPath); for(Path path : paths) { System.out.println("split input path: " + path); InputStream in = fs.open(path); BufferedReader reader = new BufferedReader(new InputStreamReader(in)); String line = reader.readLine(); while (null != line && !"".equals(line)) { inputs.add(line); line = reader.readLine(); } IOUtils.closeQuietly(in); IOUtils.closeQuietly(reader); } } catch (IOException e) { e.printStackTrace(); } System.out.println("inputs size: " + inputs.size()); return inputs; } /** 初始化工作,主要是获取特征属性集以及属性值的统计,主要是为了填充默认值*/ private void initialize(String input) { System.out.println("initialize start."); allAttributes = new HashSet<String>(); attributeNameToValues = new HashMap<String, Set<String>>(); attributeValueStatistics = new HashMap<String, Map<Object, Integer>>(); String output = HDFSUtils.HDFS_TEMP_INPUT_URL + IdentityUtils.generateUUID(); String[] args = new String[]{input, output}; AttributeStatisticsMR.main(args); Path outputPath = new Path(output); SequenceFile.Reader reader = null; try { FileSystem fs = outputPath.getFileSystem(conf); Path[] paths = HDFSUtils.getPathFiles(fs, outputPath); for(Path path : paths) { reader = new SequenceFile.Reader(fs, path, conf); AttributeKVWritable key = (AttributeKVWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); IntWritable value = new IntWritable(); while (reader.next(key, value)) { String attributeName = key.getAttributeName(); allAttributes.add(attributeName); Set<String> values = attributeNameToValues.get(attributeName); if (null == values) { values = new HashSet<String>(); attributeNameToValues.put(attributeName, values); } String attributeValue = key.getAttributeValue(); values.add(attributeValue); Map<Object, Integer> valueStatistics = attributeValueStatistics.get(attributeName); if (null == valueStatistics) { valueStatistics = new HashMap<Object, Integer>(); attributeValueStatistics.put(attributeName, valueStatistics); } valueStatistics.put(attributeValue, value.get()); value = new IntWritable(); } } } catch (IOException e) { e.printStackTrace(); } finally { IOUtils.closeQuietly(reader); } System.out.println("initialize end."); } /** 预处理,主要是将分割后的小文件填充好默认值后在上传到HDFS上面*/ private List<String> preHandle(List<String> inputs) throws IOException { List<String> fillInputs = new ArrayList<String>(); for (String input : inputs) { Data data =null; try { Path inputPath = new Path(input); FileSystem fs = inputPath.getFileSystem(conf); FSDataInputStream fsInputStream = fs.open(inputPath); data = DataLoader.load(fsInputStream, true); } catch (IOException e) { e.printStackTrace(); } DataHandler.computeFill(data.getInstances(), allAttributes.toArray(new String[0]), attributeValueStatistics, 1.0); OutputStream out = null; BufferedWriter writer = null; String outputDir = HDFSUtils.HDFS_TEMP_INPUT_URL + IdentityUtils.generateUUID(); fillInputs.add(outputDir); String output = outputDir + File.separator + IdentityUtils.generateUUID(); try { Path outputPath = new Path(output); FileSystem fs = outputPath.getFileSystem(conf); out = fs.create(outputPath); writer = new BufferedWriter(new OutputStreamWriter(out)); StringBuilder sb = null; for (Instance instance : data.getInstances()) { sb = new StringBuilder(); sb.append(instance.getId()).append("\t"); sb.append(instance.getCategory()).append("\t"); Map<String, Object> attrs = instance.getAttributes(); for (Map.Entry<String, Object> entry : attrs.entrySet()) { sb.append(entry.getKey()).append(":"); sb.append(entry.getValue()).append("\t"); } writer.write(sb.toString()); writer.newLine(); } writer.flush(); } catch (Exception e) { e.printStackTrace(); } finally { IOUtils.closeQuietly(out); IOUtils.closeQuietly(writer); } } return fillInputs; } /** 创建JOB*/ private Job createJob(String jobName, String input, String output) { Configuration conf = new Configuration(); conf.set("mapred.job.queue.name", "q_hudong"); Job job = null; try { job = new Job(conf, jobName); FileInputFormat.addInputPath(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); job.setJarByClass(DecisionTreeSprintBJob.class); job.setMapperClass(CalculateGiniMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(AttributeWritable.class); job.setReducerClass(CalculateGiniReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(AttributeGiniWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); } catch (IOException e) { e.printStackTrace(); } return job; } /** 根据HDFS上的输出路径选择最佳属性*/ private AttributeGiniWritable chooseBestAttribute(String... outputs) { AttributeGiniWritable minSplitAttribute = null; double minSplitPointGini = 1.0; try { for (String output : outputs) { System.out.println("choose output: " + output); Path outputPath = new Path(output); FileSystem fs = outputPath.getFileSystem(conf); Path[] paths = HDFSUtils.getPathFiles(fs, outputPath); ShowUtils.print(paths); SequenceFile.Reader reader = null; for (Path path : paths) { reader = new SequenceFile.Reader(fs, path, conf); Text key = (Text) ReflectionUtils.newInstance( reader.getKeyClass(), conf); AttributeGiniWritable value = new AttributeGiniWritable(); while (reader.next(key, value)) { double gini = value.getGini(); System.out.println(value.getAttribute() + " : " + gini); if (gini <= minSplitPointGini) { minSplitPointGini = gini; minSplitAttribute = value; } value = new AttributeGiniWritable(); } IOUtils.closeQuietly(reader); } System.out.println("delete hdfs file start: " + outputPath.toString()); HDFSUtils.delete(conf, outputPath); System.out.println("delete hdfs file end: " + outputPath.toString()); } } catch (IOException e) { e.printStackTrace(); } if (null == minSplitAttribute) { System.out.println("minSplitAttribute is null"); } return minSplitAttribute; } private Data obtainData(String input) { Data data = null; Path inputPath = new Path(input); try { FileSystem fs = inputPath.getFileSystem(conf); Path[] hdfsPaths = HDFSUtils.getPathFiles(fs, inputPath); FSDataInputStream fsInputStream = fs.open(hdfsPaths[0]); data = DataLoader.load(fsInputStream, true); } catch (IOException e) { e.printStackTrace(); } return data; } /** 构建决策树*/ private Object build(List<String> inputs) throws IOException { List<String> outputs = new ArrayList<String>(); JobControl jobControl = new JobControl("CalculateGini"); for (String input : inputs) { System.out.println("split path: " + input); String output = HDFSUtils.HDFS_TEMP_OUTPUT_URL + IdentityUtils.generateUUID(); outputs.add(output); Configuration conf = new Configuration(); ControlledJob controlledJob = new ControlledJob(conf); controlledJob.setJob(createJob(input, input, output)); jobControl.addJob(controlledJob); } Thread jcThread = new Thread(jobControl); jcThread.start(); while(true){ if(jobControl.allFinished()){ // System.out.println(jobControl.getSuccessfulJobList()); jobControl.stop(); AttributeGiniWritable bestAttr = chooseBestAttribute( outputs.toArray(new String[0])); String attribute = bestAttr.getAttribute(); System.out.println("best attribute: " + attribute); System.out.println("isCategory: " + bestAttr.isCategory()); if (bestAttr.isCategory()) { return attribute; } TreeNode treeNode = new TreeNode(attribute); Map<String, List<String>> splitToInputs = new HashMap<String, List<String>>(); for (String input : inputs) { Data data = obtainData(input); String splitPoint = bestAttr.getSplitPoint(); // Map<String, Set<String>> attrName2Values = // DataHandler.attributeValueStatistics(data.getInstances()); Set<String> attributeValues = attributeNameToValues.get(attribute); System.out.println("attributeValues:"); ShowUtils.print(attributeValues); if (attributeNameToValues.size() == 0 || null == attributeValues) { continue; } attributeValues.remove(splitPoint); StringBuilder sb = new StringBuilder(); for (String attributeValue : attributeValues) { sb.append(attributeValue).append(","); } if (sb.length() > 0) sb.deleteCharAt(sb.length() - 1); String[] names = new String[]{splitPoint, sb.toString()}; DataSplit dataSplit = DataHandler.split(new Data( data.getInstances(), attribute, names)); for (DataSplitItem item : dataSplit.getItems()) { if (item.getInstances().size() == 0) continue; String path = item.getPath(); String name = path.substring(path.lastIndexOf(File.separator) + 1); String hdfsPath = HDFSUtils.HDFS_TEMP_INPUT_URL + name; HDFSUtils.copyFromLocalFile(conf, path, hdfsPath); String split = item.getSplitPoint(); List<String> nextInputs = splitToInputs.get(split); if (null == nextInputs) { nextInputs = new ArrayList<String>(); splitToInputs.put(split, nextInputs); } nextInputs.add(hdfsPath); } } for (Map.Entry<String, List<String>> entry : splitToInputs.entrySet()) { treeNode.setChild(entry.getKey(), build(entry.getValue())); } return treeNode; } if(jobControl.getFailedJobList().size() > 0){ // System.out.println(jobControl.getFailedJobList()); jobControl.stop(); } } } /** 分类样本集*/ private void classify(TreeNode treeNode, String testSet, String output) { OutputStream out = null; BufferedWriter writer = null; try { Path testSetPath = new Path(testSet); FileSystem testFS = testSetPath.getFileSystem(conf); Path[] testHdfsPaths = HDFSUtils.getPathFiles(testFS, testSetPath); FSDataInputStream fsInputStream = testFS.open(testHdfsPaths[0]); Data testData = DataLoader.load(fsInputStream, true); DataHandler.computeFill(testData.getInstances(), allAttributes.toArray(new String[0]), attributeValueStatistics, 1.0); Object[] results = (Object[]) treeNode.classifySprint(testData); ShowUtils.print(results); DataError dataError = new DataError(testData.getCategories(), results); dataError.report(); String path = FileUtils.obtainRandomTxtPath(); out = new FileOutputStream(new File(path)); writer = new BufferedWriter(new OutputStreamWriter(out)); StringBuilder sb = null; for (int i = 0, len = results.length; i < len; i++) { sb = new StringBuilder(); sb.append(i+1).append("\t").append(results[i]); writer.write(sb.toString()); writer.newLine(); } writer.flush(); Path outputPath = new Path(output); FileSystem fs = outputPath.getFileSystem(conf); if (!fs.exists(outputPath)) { fs.mkdirs(outputPath); } String name = path.substring(path.lastIndexOf(File.separator) + 1); HDFSUtils.copyFromLocalFile(conf, path, output + File.separator + name); } catch (IOException e) { e.printStackTrace(); } finally { IOUtils.closeQuietly(out); IOUtils.closeQuietly(writer); } } public void run(String[] args) { try { if (null == conf) conf = new Configuration(); String[] inputArgs = new GenericOptionsParser( conf, args).getRemainingArgs(); if (inputArgs.length != 4) { System.out.println("error, please input three path."); System.out.println("1. trainset path."); System.out.println("2. testset path."); System.out.println("3. result output path."); System.out.println("4. data split number."); System.exit(2); } List<String> splitInputs = split(inputArgs[0], inputArgs[3]); initialize(inputArgs[0]); List<String> inputs = preHandle(splitInputs); TreeNode treeNode = (TreeNode) build(inputs); TreeNodeHelper.print(treeNode, 0, null); classify(treeNode, inputArgs[1], inputArgs[2]); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { DecisionTreeSprintBJob job = new DecisionTreeSprintBJob(); long startTime = System.currentTimeMillis(); job.run(args); long endTime = System.currentTimeMillis(); System.out.println("spend time: " + (endTime - startTime)); } }
相关推荐
高效决策树算法是数据挖掘和机器学习领域中的一个重要工具,尤其在分类问题中表现出色。这一系列笔记将深入探讨如何构建高效、准确的决策树模型。决策树是一种以树状结构进行决策的模型,其中每个内部节点代表一个...
在数据挖掘领域,决策树算法是一种常用的分类方法,它通过一系列规则对数据进行分类或回归。C4.5决策树是决策树算法的一种改进形式,由Ross Quinlan开发,它继承了ID3决策树处理离散型属性的能力,并且还能够处理...
在数据挖掘中,常用的分类方法有KNN、决策树、朴素贝叶斯分类等。 KNN算法是指K-Nearest Neighbors算法,该算法通过计算测试样本与训练样本之间的距离来预测测试样本的类别。 决策树算法是指使用决策树来分类数据...
5. 数据挖掘技术:常见的数据挖掘技术包括决策树、贝叶斯网络、支持向量机、聚类算法如K-means和DBSCAN,以及关联规则算法如Apriori。这些技术各有优缺点,适用于不同的数据特性和问题场景。 6. 数据挖掘的应用领域...
分类算法如决策树(C4.5, ID3)、随机森林和神经网络,它们能根据已有数据构建模型,预测未知数据的类别。聚类算法如K-means、层次聚类和DBSCAN,则是无监督学习方法,用于发现数据的自然分组。关联规则学习,如...
决策树是一种常用的数据挖掘方法,尤其在机器学习领域中占据着重要的地位。它通过一系列基于数据属性的判断规则,将数据集分割成不同的类别或数值预测。Spark 是一个开源的大数据处理框架,它提供了MLlib库,其中...
在浙江大学的数据挖掘课程中,可能会涵盖这些基本概念,同时深入到更具体的算法和技术,如SVM(支持向量机)、决策树、神经网络、Apriori算法、K-means聚类等。此外,还可能涉及数据库管理系统、统计学基础、机器...
### Python版数据挖掘实验4报告:用决策树预测获胜球队 #### 实验名称与目的 本次实验名为“用决策树预测获胜球队”。其主要目的是利用机器学习中的决策树算法来预测篮球比赛中哪支球队可能获胜。这不仅是一次理论...
分类和预测任务中,支持向量机(SVM)、决策树、贝叶斯网络和神经网络是常用的模型。SVM通过构造最大分类间隔的超平面实现分类,对于非线性问题,它引入了核函数进行映射。贝叶斯网络则利用概率和条件概率来表示变量间...
《机器学习与数据挖掘学习笔记》是一份综合性的学习资料,涵盖了这两个领域的重要概念、算法和技术。这份笔记的目的是为了帮助读者深入理解机器学习和数据挖掘的基础知识,并提供实际操作的指导。 首先,我们来探讨...
数据挖掘十大算法详解,数据挖掘学习笔记--决策树C4.5 、数据挖掘十大算法--K-均值聚类算法 、机器学习与数据挖掘-支持向量机(SVM)、拉格朗日对偶、支持向量机(SVM)(三)-- 最优间隔分类器 (optimal margin ...
3. 学习和实践各种数据挖掘算法,如决策树、随机森林、支持向量机和神经网络等。 4. 了解如何在大数据环境中实现模型的训练和验证。 5. 提升问题解决能力,通过比赛代码学习如何解决实际问题并优化模型性能。 这个...
决策树算法是一种直观且易于理解的分类和回归方法。它通过学习一系列的决策规则,将特征空间划分为多个子空间,并且这些子空间对应于不同的类别标签。决策树的核心是选择最优的属性进行分割,这通常依据信息增益或...
数据仓库笔记的知识点涵盖了数据仓库和数据挖掘的基本概念、数据挖掘的主要任务与方法、学习算法以及搭建数据仓库的相关知识。下面将详细阐述这些知识点。 首先,数据仓库是为了企业决策支持而设计的系统,它主要...
"数据挖掘笔记"这部分内容可能是学习者对所学知识的整理,包括关键概念的总结、公式解析、算法实现步骤等,对于初学者来说,这是一份极具价值的参考资料,能帮助他们更好地理解和记忆复杂的知识点。 "习题"则提供了...
在数据挖掘中,十大经典算法包括关联规则挖掘、决策树、随机森林、K-均值聚类、支持向量机、K-近邻、神经网络、 Gradient Boosting、Naive Bayes、k- Means 等。这些算法可以用于解决不同的数据挖掘问题,如分类、...
数据挖掘中的分类技术 数据挖掘是一种常用的数据分析技术,旨在从大量数据中提取有价值的信息。数据挖掘技术可以分为多种类型,包括分类、预测、聚类、关联规则等。其中,分类是数据挖掘中的一种重要技术,旨在对...
1. **C4.5(决策树算法)**:一种基于ID3改进的决策树生成算法,适用于分类任务。 2. **K-Means**:一种无监督学习算法,主要用于聚类分析。 3. **Apriori**:一种关联规则学习算法,用于发现频繁项集和关联规则。 4...
分类算法如决策树、随机森林和支持向量机,用于将数据分成不同的类别。聚类方法如K-means和层次聚类则用于无监督学习,帮助发现数据的自然分组。关联规则学习如Apriori算法常用于市场篮子分析,找出商品之间的购买...
5. 决策树和基于规则的分类器:决策树通过一系列特征测试进行分类,而基于规则的分类器使用逻辑条件组合成规则。决策树的构建通常采用贪婪算法,可能无法找到全局最优解,但能获得近似最优的解决方案。 6. 其他分类...