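- Characteristics: multi-threaded, blocking import
- Drawbacks: blocking, slow import, thread state cannot be tracked precisely, and high memory overhead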
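- Multi-threaded
- Non-blocking
- Constant memory overhead
- Threads can be added freely
3) Use as few open database cursors and as little CPU as possible
4) Keep both reading and writing fast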
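- It automatically blocks writes once the queue already holds Queue Size items
- Items are consumed as they are read: getting an item from the queue removes it, so the number of items in the Queue drops by one
- Because of this, we can rely on the Queue itself and do not need to write an extra daemon thread that monitors whether all items have been taken and then stops the workers. Worse still, I have seen many programmers write a while loop that spins until every item has been taken, even though a large part of that time is spent busy-waiting.
- Reading and processing are fully decoupled; by the time reading is finished, processing is essentially finished as well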
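The two queue properties listed above (writes stop once the queue is full, and reading an item removes it) can be observed with a few lines of JDK code. This is a minimal sketch, not code from this article: BoundedQueueDemo and its method names are made up for illustration, and the non-blocking offer() and poll() are used in place of put() and take() so that the demo cannot hang.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedQueueDemo {

    /** Fills a queue to capacity, then probes whether one more item fits. */
    static boolean acceptsBeyondCapacity(int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer("item-" + i); // succeeds: there is still room
        }
        // put() would block at this point until a consumer takes an item.
        // offer() is the non-blocking probe: it returns false when the queue is full.
        return queue.offer("one-too-many");
    }

    /** Returns how many items remain after removing one from a queue of n items. */
    static int sizeAfterRemovingOne(int n) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(n);
        for (int i = 0; i < n; i++) {
            queue.offer("item-" + i);
        }
        // poll() removes the head just like take(), but returns null instead of
        // waiting when the queue is empty.
        queue.poll();
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("accepted beyond capacity: " + acceptsBeyondCapacity(3));
        System.out.println("size after removing one:  " + sizeAfterRemovingOne(3));
    }
}
```

In the real import, the producer would call put() and the workers take(), so both sides simply wait for each other instead of polling.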
- public void run() {
- try {
- enumerate(super.fileName, super.colNames);
- } catch (Exception e) {
- logger.error("read txtFileName error, parse excel quit because :"
- + e.getMessage(), e);
- try {
- Thread.interrupted();
- } catch (Exception ee) {
- }
- } finally {
- try {
- queue.put(DUMMY);
- // BatchTaskQueue.getInstance().taskList.put(DUMMY);
- } catch (Exception ex) {
- }
- }
- }
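- enumerate does the reading. Below this snippet there is a concrete implementation of enumerate, which works from the top of the file downwards until every ITEM in the file has been added to the queue with queue.put
- Why is there a queue.put(DUMMY) in the finally block? Most programmers get a headache when they see a statement like this or run into something called DUMMY. What on earth is it?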
- protected static Map DUMMY = new HashMap();
- while (!done) {
- Map data = (Map) queue.take();
- if (data == EnumerationEnginee.DUMMY) {
- //no data
- queue.put(data);
- done = true;
- } else {
- // if (data != null) {
- for (Iterator it = data.keySet().iterator(); it.hasNext();) {
- String key = String.valueOf(it.next());
- System.out.print("import:>>>[" + key + "] : ["+ data.get(key) + "]");
- }
- System.out.println("\n");
- }
- }
- public void run() {
- boolean done = false;
- try {
- synchronized (this) {
- while (!done) {
- Map data = (Map) queue.take();
- if (data == EnumerationEnginee.DUMMY) {
- //no data
- queue.put(data);
- done = true;
- } else {
- // if (data != null) {
- for (Iterator it = data.keySet().iterator(); it.hasNext();) {
- String key = String.valueOf(it.next());
- System.out.print("import:>>>[" + key + "] : ["+ data.get(key) + "]");
- }
- System.out.println("\n");
- }
- }
- }
- } catch (Exception e) {
- logger.error("import file into db error:" + e.getMessage(), e);
- try {
- Thread.interrupted();
- } catch (Exception ie) {
- }
- try {
- queue.put(EnumerationEnginee.DUMMY);
- done = true;
- } catch (Exception ex) {
- }
- } finally {
- threadSignal.countDown();
- }
- }
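- You can create a Queue with size=100 and then throw hundreds of thousands of records at it. Once 100 items are queued, put blocks automatically. You can then start a group of threads to drain the Queue; each queue.take() removes exactly one item from the queue, so once one thread has taken an item you need not worry about another thread taking it again. This is how reading and handling are decoupled.
- When several threads drain the queue, you have to tell them that they have reached the bottom of the queue, that nothing is left to take, and that they can stop. Once every handler thread has hit the "bottom" of the queue, they all stop on their own. That is why I said that, by the time the last record in the file has been read, the handler threads have essentially finished processing as well.
Starting from real-world scenarios: handling usually means writing to a database or a NoSQL store, which involves key/value pairs, so what we put into the queue here is a Map.
DUMMY is an "empty" marker, but you must never use null for it. BlockingQueue implementations do not accept null elements, so queue.put(null) fails immediately with a NullPointerException, which would disrupt every thread. Always create a real instance, for example: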
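The stop signal described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the article's ImportTask: PoisonPillDemo, process and the plain Object marker are hypothetical names, and counting items stands in for real handling. Each worker that takes the marker puts it back, so the next worker sees it too.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class PoisonPillDemo {
    // Unique marker instance; recognised by identity (==), never by content.
    private static final Object DUMMY = new Object();

    /** Pushes itemCount items through a bounded queue and returns how many were handled. */
    static int process(int itemCount, int workerCount, int queueSize) {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(queueSize);
        CountDownLatch done = new CountDownLatch(workerCount);
        AtomicInteger handled = new AtomicInteger();

        for (int w = 0; w < workerCount; w++) {
            new Thread(() -> {
                try {
                    while (true) {
                        Object item = queue.take();   // waits while the queue is empty
                        if (item == DUMMY) {
                            queue.put(item);          // hand the marker on to the next worker
                            break;
                        }
                        handled.incrementAndGet();    // stand-in for real handling
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }

        try {
            for (int i = 0; i < itemCount; i++) {
                queue.put("row-" + i);                // waits while the queue is full
            }
            queue.put(DUMMY);                         // the "bottom" of the queue
            done.await();                             // every worker has seen the marker
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while importing", e);
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("handled: " + process(1000, 4, 100));
    }
}
```

Because the queue is FIFO and the marker is the last thing the producer adds, every data item has been taken before any worker can see the marker.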
- Map DUMMY = new HashMap();
Never write Map DUMMY = null, or it is all over. D...D...D...D.E.A.D!
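The point about null can be checked directly. The sketch below is illustrative only (SentinelDemo and its methods are hypothetical names): it shows that a BlockingQueue refuses null, and that the marker is recognised by identity, so an ordinary empty data row is not mistaken for it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SentinelDemo {
    // A dedicated instance used only as the end-of-data marker.
    static final Map<String, String> DUMMY = new HashMap<>();

    /** Returns true if the queue refuses a null element. */
    static boolean rejectsNull() {
        BlockingQueue<Map<String, String>> queue = new LinkedBlockingQueue<>();
        try {
            queue.offer(null); // put(null) fails the same way
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    /** The marker is compared with ==, not equals(). */
    static boolean isEndMarker(Map<String, String> item) {
        return item == DUMMY;
    }

    public static void main(String[] args) {
        System.out.println("rejects null:          " + rejectsNull());
        System.out.println("DUMMY is marker:       " + isEndMarker(DUMMY));
        // An empty map equals DUMMY by content but is a different instance.
        System.out.println("other empty map is it: " + isEndMarker(new HashMap<>()));
    }
}
```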
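- We need a well-encapsulated method that takes a file and handles its rows with multiple threads.
- The number of threads and the queue size are configurable.
- A timing facility is needed: how long the whole run takes from start to finish. (Many people struggle with timing multi-threaded tasks; the example solves that problem as well.)
- Finally, the process should support several input formats such as csv, txt, excel, databases and so on. Because space is limited, here we only 1) implement handling of txt/csv and excel files and 2) provide a factory method so that you can extend the FileParser yourselves.
- Handling large Excel files. As everyone knows, whether you use POI or JXL, once an Excel file exceeds 65,535 rows, merely loading the worksheet makes the JVM run out of memory. So how do you process a large Excel file more efficiently and with less memory, for example one with 500,000 rows? This example provides a solution for that too.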
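For the timing requirement in the list above, one common approach is to let every worker count down a CountDownLatch when it finishes and have the coordinating thread measure the time until the latch reaches zero. The sketch below assumes that approach; LatchTimingDemo and timeWorkers are hypothetical names, and Thread.sleep stands in for real work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchTimingDemo {

    /** Runs workerCount threads that each "work" for workMillis; returns the elapsed milliseconds. */
    static long timeWorkers(int workerCount, long workMillis) {
        CountDownLatch threadSignal = new CountDownLatch(workerCount);
        long start = System.nanoTime();

        for (int i = 0; i < workerCount; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(workMillis);      // stand-in for handling queue items
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    threadSignal.countDown();      // always signal, even after a failure
                }
            }).start();
        }

        try {
            threadSignal.await();                  // returns once every worker has counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("elapsed ms: " + timeWorkers(4, 50));
    }
}
```

Counting down in finally matters: if a worker skipped it after an exception, await() would never return and no elapsed time could be reported.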
- package batchpoc;
- import java.util.concurrent.BlockingQueue;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public abstract class BatchTask{
- protected final Logger logger = LoggerFactory.getLogger(this.getClass());
- public final static String TXT_IMP_EXP = "101";
- public final static String EXCEL_IMP_EXP = "102";
- public final static String TASK_RUNNING = "2";
- public final static String TASK_FINISHED = "4";
- public final static String TASK_FAILED = "5";
- protected BatchDTO taskContext = null;
- public BatchTask(BatchDTO taskContext) {
- this.taskContext = taskContext;
- }
- public abstract void doBatch() throws Exception;
- }
- package batchpoc;
- import java.util.Map;
- import java.util.concurrent.BlockingQueue;
- import util.Constants;
- public class EnumerationEngineeFactory {
- public static EnumerationEnginee getInstance(BlockingQueue<Map> queue,
- String type, String fileName, String colNames, boolean skipHeader,
- BatchDTO taskContext) {
- EnumerationEnginee task = null;
- if (type.equals(Constants.ENUMERATION_TXT_TASK)) {
- return new TxtEnumerationTask(queue, fileName, colNames,
- skipHeader, taskContext);
- } else if (type.equals(Constants.ENUMERATION_EXCEL_TASK)) {
- return new XLSEnumerationTask(queue, fileName, colNames,
- skipHeader, taskContext);
- }
- return task;
- }
- }
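The factory above returns null for an unknown type, which only surfaces later as a NullPointerException in the caller. When adding further formats, one option is to fail fast instead. The following is a sketch of that idea only; FileParser, TxtParser, ExcelParser and the type strings are hypothetical stand-ins, not classes from this article.

```java
import java.util.Locale;

class ParserFactoryDemo {

    /** Hypothetical common interface for all file readers. */
    interface FileParser {
        String describe();
    }

    static final class TxtParser implements FileParser {
        public String describe() { return "reads txt/csv line by line"; }
    }

    static final class ExcelParser implements FileParser {
        public String describe() { return "reads xlsx row by row"; }
    }

    /** Maps a type name to a parser; unknown types are rejected immediately. */
    static FileParser create(String type) {
        switch (type.toLowerCase(Locale.ROOT)) {
            case "txt":
            case "csv":
                return new TxtParser();
            case "excel":
                return new ExcelParser();
            default:
                // Adding a new format means adding a case above, nothing else changes.
                throw new IllegalArgumentException("unsupported file type: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(create("txt").describe());
        System.out.println(create("excel").describe());
    }
}
```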
- package batchpoc;
- import java.io.File;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.CountDownLatch;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public abstract class EnumerationEnginee implements Runnable {
- protected String fileName = "";
- protected String colNames = "";
- protected final Logger logger = LoggerFactory.getLogger(this.getClass());
- protected boolean skipHeader = true;
- protected BatchDTO taskContext = null;
- protected static Map DUMMY = new HashMap();
- protected BlockingQueue<Map> queue = null;
- public EnumerationEnginee(BlockingQueue<Map> queue, String fileName,
- String colNames, boolean skipHeader, BatchDTO taskContext) {
- this.fileName = fileName;
- this.colNames = colNames;
- this.skipHeader = skipHeader;
- this.taskContext = taskContext;
- this.queue = queue;
- }
- public abstract void enumerate(String fileName, String strKeys)
- throws Exception;
- public abstract void run();
- }
- package batchpoc;
- import java.util.Iterator;
- import java.util.Map;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.CountDownLatch;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class ImportTask implements Runnable {
- private final Logger logger = LoggerFactory.getLogger(getClass());
- private BatchDTO taskContext = null;
- private CountDownLatch threadSignal = null;
- BlockingQueue<Map> queue = null;
- public ImportTask(BlockingQueue<Map> queue, BatchDTO taskContext,
- CountDownLatch threadSignal) {
- this.taskContext = taskContext;
- this.threadSignal = threadSignal;
- this.queue = queue;
- }
- public void run() {
- boolean done = false;
- try {
- synchronized (this) {
- while (!done) {
- Map data = (Map) queue.take();
- if (data == EnumerationEnginee.DUMMY) {
- //no data
- queue.put(data);
- done = true;
- } else {
- // if (data != null) {
- for (Iterator it = data.keySet().iterator(); it
- .hasNext();) {
- String key = String.valueOf(it.next());
- System.out.print("import:>>>[" + key + "] : ["
- + data.get(key) + "]");
- }
- System.out.println("\n");
- }
- }
- }
- } catch (Exception e) {
- logger.error("import file into db error:" + e.getMessage(), e);
- try {
- Thread.interrupted();
- } catch (Exception ie) {
- }
- try {
- queue.put(EnumerationEnginee.DUMMY);
- done = true;
- } catch (Exception ex) {
- }
- } finally {
- threadSignal.countDown();
- }
- }
- }
- package batchpoc;
- /*
- * Author: Mk
- * Created By: 2012-08-23
- */
- import java.util.Collections;
- import java.util.Comparator;
- import java.util.LinkedHashMap;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Map;
- public class MapUtil {
- public static <K, V extends Comparable<? super V>> Map<K, V> sortByValue(
- Map<K, V> map) {
- List<Map.Entry<K, V>> list = new LinkedList<Map.Entry<K, V>>(
- map.entrySet());
- Collections.sort(list, new Comparator<Map.Entry<K, V>>() {
- public int compare(Map.Entry<K, V> o1, Map.Entry<K, V> o2) {
- return (String.valueOf(o1.getKey())).compareTo(String
- .valueOf(o2.getKey()));
- }
- });
- Map<K, V> result = new LinkedHashMap<K, V>();
- for (Map.Entry<K, V> entry : list) {
- result.put(entry.getKey(), entry.getValue());
- }
- return result;
- }
- }
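One detail worth noting about sortByValue as listed above: despite its name, its comparator compares the keys, and it compares them as strings. Since the keys used in this article are column indexes stored as strings, a row with more than ten columns would come out in the order "0", "1", "10", "11", "2" and so on. The sketch below only illustrates that difference; KeyOrderDemo and its methods are hypothetical names and not part of the article's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class KeyOrderDemo {

    /** Orders keys the way String.compareTo does. */
    static List<String> lexicographic(List<String> keys) {
        List<String> copy = new ArrayList<>(keys);
        copy.sort(Comparator.naturalOrder());
        return copy;
    }

    /** Orders keys by their numeric value; assumes every key is a valid integer. */
    static List<String> numeric(List<String> keys) {
        List<String> copy = new ArrayList<>(keys);
        copy.sort(Comparator.comparingInt(Integer::parseInt));
        return copy;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("2", "10", "0", "1");
        System.out.println("as strings: " + lexicographic(keys));
        System.out.println("as numbers: " + numeric(keys));
    }
}
```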
- package batchpoc;
- import java.io.BufferedReader;
- import java.io.File;
- import java.io.FileInputStream;
- import java.io.InputStreamReader;
- import java.util.Collections;
- import java.util.Comparator;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.BlockingQueue;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class TxtEnumerationTask extends EnumerationEnginee {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- public TxtEnumerationTask(BlockingQueue<Map> queue, String txtFileName,
- String colNames, boolean skipHeader, BatchDTO taskContext) {
- super(queue, txtFileName, colNames, taskContext.isHeadSkip(),
- taskContext);
- }
- @Override
- public void run() {
- try {
- enumerate(super.fileName, super.colNames);
- } catch (Exception e) {
- logger.error("read txtFileName error, parse excel quit because :"
- + e.getMessage(), e);
- try {
- Thread.interrupted();
- } catch (Exception ee) {
- }
- } finally {
- try {
- queue.put(DUMMY);
- } catch (Exception ex) {
- }
- }
- }
- public void enumerate(String txtFileName, String strKeys) throws Exception {
- FileInputStream is = null;
- StringBuilder sb = new StringBuilder();
- String a_line = "";
- String[] columnNames = null;
- String[] cellValues = null;
- Map dataRow = new HashMap();
- int i = 0;
- try {
- File f = new File(txtFileName);
- if (f.exists()) {
- is = new FileInputStream(new File(txtFileName));
- BufferedReader br = new BufferedReader(new InputStreamReader(
- is, "UTF-8"));
- if (skipHeader) {
- br.readLine();
- }
- while ((a_line = br.readLine()) != null) {
- if (a_line.trim().length() > 0) {
- String[] data = a_line.split(",");
- for (int index = 0; index < data.length; index++) {
- dataRow.put(String.valueOf(index), data[index]);
- }
- dataRow = MapUtil.sortByValue(dataRow);
- queue.put(dataRow);
- dataRow = new HashMap();
- i++;
- }
- }
- }
- } catch (Exception e) {
- throw new Exception("import was interrupted, error happened in "
- + i + " row", e);
- } finally {
- try {
- if (is != null) {
- is.close();
- is = null;
- }
- } catch (Exception e) {
- }
- }
- }
- }
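The reading loop of TxtEnumerationTask can be tried in isolation. The sketch below is a simplified stand-in rather than the class above: it reads from an in-memory string instead of a file, omits the end marker, and uses hypothetical names (LineEnumerationDemo, toRow, enumerate). It also differs deliberately in one place: split(",") drops trailing empty cells, whereas split(",", -1) keeps them, which is assumed here to be the safer choice for fixed-width rows.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class LineEnumerationDemo {

    /** Converts one comma-separated line into a column-index -> value map. */
    static Map<String, String> toRow(String line) {
        Map<String, String> row = new LinkedHashMap<>(); // keeps the column order
        String[] cells = line.split(",", -1);            // -1 keeps trailing empty cells
        for (int i = 0; i < cells.length; i++) {
            row.put(String.valueOf(i), cells[i]);
        }
        return row;
    }

    /** Reads the text line by line and queues one map per non-blank line. */
    static BlockingQueue<Map<String, String>> enumerate(String text, boolean skipHeader) {
        // Unbounded queue so this single-threaded demo can never block.
        BlockingQueue<Map<String, String>> queue = new LinkedBlockingQueue<>();
        try (BufferedReader reader = new BufferedReader(new StringReader(text))) {
            if (skipHeader) {
                reader.readLine();
            }
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.trim().isEmpty()) {
                    queue.offer(toRow(line));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return queue;
    }

    public static void main(String[] args) {
        System.out.println(enumerate("id,name\n1,alice\n\n2,bob\n", true));
    }
}
```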
- package batchpoc;
- import java.io.File;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.BlockingQueue;
- import org.apache.poi.openxml4j.opc.OPCPackage;
- import org.apache.poi.openxml4j.opc.PackageAccess;
- public class XLSEnumerationTask extends EnumerationEnginee {
- public XLSEnumerationTask(BlockingQueue<Map> queue, String txtFileName,
- String colNames, boolean skipHeader, BatchDTO taskContext) {
- super(queue, txtFileName, colNames, taskContext.isHeadSkip(),
- taskContext);
- }
- @Override
- public void enumerate(String fileName, String strKeys) throws Exception {
- File xlsxFile = new File(fileName);
- if (xlsxFile.exists()) {
- // The package open is instantaneous, as it should be.
- OPCPackage p = OPCPackage.open(xlsxFile.getPath(),
- PackageAccess.READ);
- Map dataMap = new HashMap();
- XLSXParser xlsxParser = new XLSXParser(p, queue, true);
- xlsxParser.process();
- }
- }
- @Override
- public void run() {
- try {
- enumerate(super.fileName, super.colNames);
- } catch (Exception e) {
- logger.error("read excel file error, parse excel quit because :"
- + e.getMessage(), e);
- try {
- Thread.interrupted();
- } catch (Exception ee) {
- }
- } finally {
- try {
- // queue.put(DUMMY);
- queue.put(DUMMY);
- } catch (Exception ex) {
- }
- }
- }
- }
- package batchpoc;
- import java.io.File;
- import java.io.IOException;
- import java.io.InputStream;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.Map;
- import java.util.concurrent.BlockingQueue;
- import javax.xml.parsers.ParserConfigurationException;
- import javax.xml.parsers.SAXParser;
- import javax.xml.parsers.SAXParserFactory;
- import org.apache.poi.openxml4j.exceptions.OpenXML4JException;
- import org.apache.poi.openxml4j.opc.OPCPackage;
- import org.apache.poi.openxml4j.opc.PackageAccess;
- import org.apache.poi.ss.usermodel.BuiltinFormats;
- import org.apache.poi.ss.usermodel.DataFormatter;
- import org.apache.poi.xssf.eventusermodel.ReadOnlySharedStringsTable;
- import org.apache.poi.xssf.eventusermodel.XSSFReader;
- import org.apache.poi.xssf.model.StylesTable;
- import org.apache.poi.xssf.usermodel.XSSFCellStyle;
- import org.apache.poi.xssf.usermodel.XSSFRichTextString;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.xml.sax.Attributes;
- import org.xml.sax.ContentHandler;
- import org.xml.sax.InputSource;
- import org.xml.sax.SAXException;
- import org.xml.sax.XMLReader;
- import org.xml.sax.helpers.DefaultHandler;
- /**
- */
- public class XLSXParser {
- private final Logger logger = LoggerFactory.getLogger(getClass());
- /**
- * The type of the data value is indicated by an attribute on the cell. The
- * value is usually in a "v" element within the cell.
- */
- enum xssfDataType {
- BOOL, ERROR, FORMULA, INLINESTR, SSTINDEX, NUMBER,
- }
- int countrows = 0;
- /**
- * Derived from http://poi.apache.org/spreadsheet/how-to.html#xssf_sax_api
- * <p/>
- * Also see Standard ECMA-376, 1st edition, part 4, pages 1928ff, at
- * http://www.ecma-international.org/publications/standards/Ecma-376.htm
- * <p/>
- * A web-friendly version is http://openiso.org/Ecma/376/Part4
- */
- class MyXSSFSheetHandler extends DefaultHandler {
- /**
- * Table with styles
- */
- private StylesTable stylesTable;
- private Map<String, String> dataMap = new HashMap<String, String>();
- /**
- * Table with unique strings
- */
- private ReadOnlySharedStringsTable sharedStringsTable;
- /**
- * Destination for data
- */
- // private final PrintStream output;
- /**
- * Number of columns to read starting with leftmost
- */
- // private final int minColumnCount;
- // Set when V start element is seen
- private boolean vIsOpen;
- // Set when cell start element is seen;
- // used when cell close element is seen.
- private xssfDataType nextDataType;
- // Used to format numeric cell values.
- private short formatIndex;
- private String formatString;
- private final DataFormatter formatter;
- private int thisRow = 0;
- private int thisColumn = -1;
- // The last column printed to the output stream
- private int lastColumnNumber = -1;
- // Gathers characters as they are seen.
- private StringBuffer value;
- /**
- * Accepts objects needed while parsing.
- *
- * @param styles
- * Table of styles
- * @param strings
- * Table of shared strings
- * @param cols
- * Minimum number of columns to show
- * @param target
- * Sink for output
- */
- public MyXSSFSheetHandler(StylesTable styles,
- ReadOnlySharedStringsTable strings, Map<String, String> dataMap) {
- this.stylesTable = styles;
- this.sharedStringsTable = strings;
- // this.minColumnCount = cols;
- this.value = new StringBuffer();
- this.nextDataType = xssfDataType.NUMBER;
- this.formatter = new DataFormatter();
- this.dataMap = dataMap;
- }
- /*
- * (non-Javadoc)
- *
- * @see
- * org.xml.sax.helpers.DefaultHandler#startElement(java.lang.String,
- * java.lang.String, java.lang.String, org.xml.sax.Attributes)
- */
- public void startElement(String uri, String localName, String name,
- Attributes attributes) throws SAXException {
- if ("inlineStr".equals(name) || "v".equals(name)) {
- vIsOpen = true;
- // Clear contents cache
- value.setLength(0);
- }
- // c => cell
- else if ("c".equals(name)) {
- // Get the cell reference
- String r = attributes.getValue("r");
- int firstDigit = -1;
- for (int c = 0; c < r.length(); ++c) {
- if (Character.isDigit(r.charAt(c))) {
- firstDigit = c;
- break;
- }
- }
- thisColumn = nameToColumn(r.substring(0, firstDigit));
- // Set up defaults.
- this.nextDataType = xssfDataType.NUMBER;
- this.formatIndex = -1;
- this.formatString = null;
- String cellType = attributes.getValue("t");
- String cellStyleStr = attributes.getValue("s");
- if ("b".equals(cellType))
- nextDataType = xssfDataType.BOOL;
- else if ("e".equals(cellType))
- nextDataType = xssfDataType.ERROR;
- else if ("inlineStr".equals(cellType))
- nextDataType = xssfDataType.INLINESTR;
- else if ("s".equals(cellType))
- nextDataType = xssfDataType.SSTINDEX;
- else if ("str".equals(cellType))
- nextDataType = xssfDataType.FORMULA;
- else if (cellStyleStr != null) {
- // It's a number, but almost certainly one
- // with a special style or format
- int styleIndex = Integer.parseInt(cellStyleStr);
- XSSFCellStyle style = stylesTable.getStyleAt(styleIndex);
- this.formatIndex = style.getDataFormat();
- this.formatString = style.getDataFormatString();
- if (this.formatString == null)
- this.formatString = BuiltinFormats
- .getBuiltinFormat(this.formatIndex);
- }
- }
- }
- /**
- * Extracts the digit characters from a string.
- *
- * @param str
- * @return
- */
- public String checkNumber(String str) {
- str = str.trim();
- String str2 = "";
- if (str != null && !"".equals(str)) {
- for (int i = 0; i < str.length(); i++) {
- if (str.charAt(i) >= 48 && str.charAt(i) <= 57) {
- str2 += str.charAt(i);
- }
- }
- }
- return str2.trim();
- }
- /*
- * (non-Javadoc)
- *
- * @see org.xml.sax.helpers.DefaultHandler#endElement(java.lang.String,
- * java.lang.String, java.lang.String)
- */
- public void endElement(String uri, String localName, String name)
- throws SAXException {
- String thisStr = null;
- // System.out.println("endElement----->" + name);
- // v => contents of a cell
- if ("v".equals(name)) {
- // Process the value contents as required.
- // Do now, as characters() may be called more than once
- switch (nextDataType) {
- case BOOL:
- char first = value.charAt(0);
- thisStr = first == '0' ? "FALSE" : "TRUE";
- break;
- case ERROR:
- thisStr = "\"ERROR:" + value.toString() + '"';
- break;
- case FORMULA:
- // A formula could result in a string value,
- // so always add double-quote characters.
- thisStr = '"' + value.toString() + '"';
- break;
- case INLINESTR:
- // TODO: have not seen an example of this, so it's untested.
- XSSFRichTextString rtsi = new XSSFRichTextString(
- value.toString());
- if (rtsi != null) {
- thisStr = rtsi.toString().trim();
- thisStr = thisStr.substring(1, thisStr.length() - 1);
- }
- break;
- case SSTINDEX:
- String sstIndex = value.toString();
- try {
- int idx = Integer.parseInt(sstIndex);
- XSSFRichTextString rtss = new XSSFRichTextString(
- sharedStringsTable.getEntryAt(idx));
- if (rtss != null) {
- /*
- * thisStr = rtss.toString().trim()
- * .replaceAll("\\s*", "");
- */
- thisStr = checkNumber(rtss.toString().trim());
- /*
- * thisStr = thisStr .substring(1, thisStr.length()
- * - 1);
- */
- }
- } catch (NumberFormatException ex) {
- logger.error("Failed to parse SST index '" + sstIndex
- + "': " + ex.toString(), ex);
- }
- break;
- case NUMBER:
- String n = value.toString();
- if (this.formatString != null)
- thisStr = formatter.formatRawCellContents(
- Double.parseDouble(n), this.formatIndex,
- this.formatString);
- else
- thisStr = n;
- break;
- default:
- thisStr = "(TODO: Unexpected type: " + nextDataType + ")";
- break;
- }
- // Output after we've seen the string contents
- // Emit commas for any fields that were missing on this row
- if (lastColumnNumber == -1) {
- lastColumnNumber = 0;
- }
- // for (int i = lastColumnNumber; i < thisColumn; ++i) {
- // System.out.print(" col: " + i + " ");
- // }
- // Might be the empty string.
- // output.print(thisStr);
- // System.out.println(thisStr);
- // System.out.println("thisRow...." + thisRow);
- if (thisRow > 0 && thisStr != null
- && thisStr.trim().length() > 0) {
- // logger.info("dataMap.put()");
- dataMap.put(String.valueOf(thisColumn), thisStr);
- }
- // Update column
- if (thisColumn > -1)
- lastColumnNumber = thisColumn;
- } else if ("row".equals(name)) {
- try {
- if (dataMap.keySet().size() > 0) {
- dataMap = MapUtil.sortByValue(dataMap);
- if (toQueue) {
- queue.put(dataMap);
- }
- }
- } catch (Exception e) {
- logger.error(
- "put data into queue error: " + e.getMessage(), e);
- }
- thisRow++;
- dataMap = new HashMap<String, String>();
- lastColumnNumber = -1;
- }
- }
- /**
- * Captures characters only if a suitable element is open. Originally
- * was just "v"; extended for inlineStr also.
- */
- public void characters(char[] ch, int start, int length)
- throws SAXException {
- if (vIsOpen)
- value.append(ch, start, length);
- }
- /**
- * Converts an Excel column name like "C" to a zero-based index.
- *
- * @param name
- * @return Index corresponding to the specified name
- */
- private int nameToColumn(String name) {
- int column = -1;
- for (int i = 0; i < name.length(); ++i) {
- int c = name.charAt(i);
- column = (column + 1) * 26 + c - 'A';
- }
- return column;
- }
- }
- // /////////////////////////////////////
- private OPCPackage xlsxPackage;
- private BlockingQueue<Map> queue = null;
- private boolean toQueue = false;
- // private int minColumns;
- // private PrintStream output;
- /**
- * Creates a new XLSX -> XML converter
- *
- * @param pkg
- * The XLSX package to process
- * @param output
- * The PrintStream to output the CSV to
- * @param minColumns
- * The minimum number of columns to output, or -1 for no minimum
- */
- public XLSXParser(OPCPackage pkg, BlockingQueue<Map> queue, boolean toQueue) {
- this.xlsxPackage = pkg;
- this.queue = queue;
- this.toQueue = toQueue;
- // this.minColumns = minColumns;
- }
- /**
- * Parses and shows the content of one sheet using the specified styles and
- * shared-strings tables.
- *
- * @param styles
- * @param strings
- * @param sheetInputStream
- */
- public void processSheet(StylesTable styles,
- ReadOnlySharedStringsTable strings, InputStream sheetInputStream)
- throws IOException, ParserConfigurationException, SAXException {
- InputSource sheetSource = new InputSource(sheetInputStream);
- SAXParserFactory saxFactory = SAXParserFactory.newInstance();
- SAXParser saxParser = saxFactory.newSAXParser();
- XMLReader sheetParser = saxParser.getXMLReader();
- Map<String, String> dataMap = new HashMap<String, String>();
- ContentHandler handler = new MyXSSFSheetHandler(styles, strings,
- dataMap);
- sheetParser.setContentHandler(handler);
- sheetParser.parse(sheetSource);
- }
- /**
- * Initiates the processing of the XLS workbook file to CSV.
- *
- * @throws IOException
- * @throws OpenXML4JException
- * @throws ParserConfigurationException
- * @throws SAXException
- */
- public void process() throws IOException, OpenXML4JException,
- ParserConfigurationException, SAXException {
- ReadOnlySharedStringsTable strings = new ReadOnlySharedStringsTable(
- this.xlsxPackage);
- XSSFReader xssfReader = new XSSFReader(this.xlsxPackage);
- StylesTable styles = xssfReader.getStylesTable();
- XSSFReader.SheetIterator iter = (XSSFReader.SheetIterator) xssfReader
- .getSheetsData();
- int index = 0;
- while (iter.hasNext()) {
- InputStream stream = iter.next();
- String sheetName = iter.getSheetName();
- // System.out.println(sheetName + " [index=" + index + "]:");
- processSheet(styles, strings, stream);
- stream.close();
- ++index;
- }
- }
- public static void main(String[] args) throws Exception {
- /*
- * if (args.length < 1) { System.err.println("Use:");
- * System.err.println(" XLSX2CSV <xlsx file> [min columns]"); return; }
- */
- // File xlsxFile = new File(args[0]);
- File xlsxFile = new File("d:/test.xlsx");
- if (!xlsxFile.exists()) {
- System.err
- .println("Not found or not a file: " + xlsxFile.getPath());
- return;
- }
- int minColumns = -1;
- // if (args.length >= 2)
- // minColumns = Integer.parseInt(args[1]);
- minColumns = 2;
- // The package open is instantaneous, as it should be.
- OPCPackage p = OPCPackage.open(xlsxFile.getPath(), PackageAccess.READ);
- XLSXParser xlsxParser = new XLSXParser(p, null, false);
- xlsxParser.process();
- }
- }
Note the following statements in the public void endElement(String uri, String localName, String name) method:
- if (thisRow > 0 && thisStr != null&& thisStr.trim().length() > 0) {
- // logger.info("dataMap.put()");
- dataMap.put(String.valueOf(thisColumn), thisStr);
- }
- } else if ("row".equals(name)) {
- try {
- if (dataMap.keySet().size() > 0) {
- dataMap = MapUtil.sortByValue(dataMap);
- if (toQueue) {
- queue.put(dataMap);
- }
- }
- } catch (Exception e) {
- logger.error(
- "put data into queue error: " + e.getMessage(), e);
- }
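The row handling above boils down to this: collect cell values while a row is open, and hand the finished row over when the closing row tag arrives, so that only one row is held in memory at a time. The sketch below shows the same idea using only the JDK's SAX parser on a hand-written XML string that loosely imitates sheet XML. It is not the POI-based parser from this article; a List stands in for the queue, and every name in it is illustrative.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.InputSource;
import org.xml.sax.helpers.DefaultHandler;

class RowStreamingDemo {

    /** Streams through the XML and returns one map per data row (the first row is a header). */
    static List<Map<String, String>> parse(String xml) {
        List<Map<String, String>> rows = new ArrayList<>();
        DefaultHandler handler = new DefaultHandler() {
            private final StringBuilder value = new StringBuilder();
            private Map<String, String> row = new LinkedHashMap<>();
            private String cellRef;
            private boolean inValue;
            private int rowIndex;

            @Override
            public void startElement(String uri, String localName, String qName, Attributes atts) {
                if ("c".equals(qName)) {
                    cellRef = atts.getValue("r");   // cell reference such as "B2"
                } else if ("v".equals(qName)) {
                    inValue = true;
                    value.setLength(0);
                }
            }

            @Override
            public void characters(char[] ch, int start, int length) {
                if (inValue) {
                    value.append(ch, start, length); // may be called several times per value
                }
            }

            @Override
            public void endElement(String uri, String localName, String qName) {
                if ("v".equals(qName)) {
                    inValue = false;
                    if (rowIndex > 0) {              // skip the header row
                        row.put(cellRef, value.toString());
                    }
                } else if ("row".equals(qName)) {
                    if (!row.isEmpty()) {
                        rows.add(row);               // the article calls queue.put(...) here
                    }
                    row = new LinkedHashMap<>();     // start the next row with a fresh map
                    rowIndex++;
                }
            }
        };
        try {
            SAXParserFactory.newInstance().newSAXParser()
                    .parse(new InputSource(new StringReader(xml)), handler);
        } catch (Exception e) {
            throw new IllegalStateException("parse failed", e);
        }
        return rows;
    }

    public static void main(String[] args) {
        String xml = "<sheetData>"
                + "<row><c r=\"A1\"><v>id</v></c><c r=\"B1\"><v>name</v></c></row>"
                + "<row><c r=\"A2\"><v>1</v></c><c r=\"B2\"><v>alice</v></c></row>"
                + "</sheetData>";
        System.out.println(parse(xml));
    }
}
```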
- package batchpoc;
- public class UUID {
- protected static int count = 0;
- public static synchronized String getUUID() {
- count++;
- long time = System.currentTimeMillis();
- String timePattern = Long.toHexString(time);
- int leftBit = 14 - timePattern.length();
- if (leftBit > 0) {
- timePattern = "0000000000".substring(0, leftBit) + timePattern;
- }
- String uuid = timePattern
- + Long.toHexString(Double.doubleToLongBits(Math.random()))
- + Long.toHexString(Double.doubleToLongBits(Math.random()))
- + "000000000000000000";
- uuid = uuid.substring(0, 32).toUpperCase();
- return uuid;
- }
- }
- package batchpoc;
- import java.net.*;
- import java.util.*;
- import java.security.*;
- public class GuidCreator {
- private String seedingString = "";
- private String rawGUID = "";
- private boolean bSecure = false;
- private static Random myRand;
- private static SecureRandom mySecureRand;
- private static String s_id;
- public static final int BeforeMD5 = 1;
- public static final int AfterMD5 = 2;
- public static final int FormatString = 3;
- static {
- mySecureRand = new SecureRandom();
- long secureInitializer = mySecureRand.nextLong();
- myRand = new Random(secureInitializer);
- try {
- s_id = InetAddress.getLocalHost().toString();
- } catch (UnknownHostException e) {
- e.printStackTrace();
- }
- }
- public GuidCreator() {
- }
- /*
- * Constructor with security option. Setting secure true enables each random
- * number generated to be cryptographically strong. Secure false defaults to
- * the standard Random function seeded with a single cryptographically
- * strong random number.
- */
- public GuidCreator(boolean secure) {
- bSecure = secure;
- }
- /*
- * Method to generate the random GUID
- */
- private void getRandomGUID(boolean secure) {
- MessageDigest md5 = null;
- StringBuffer sbValueBeforeMD5 = new StringBuffer();
- try {
- md5 = MessageDigest.getInstance("MD5");
- } catch (NoSuchAlgorithmException e) {
- System.out.println("Error: " + e);
- }
- try {
- long time = System.currentTimeMillis();
- long rand = 0;
- if (secure) {
- rand = mySecureRand.nextLong();
- } else {
- rand = myRand.nextLong();
- }
- // This StringBuffer can be a long as you need; the MD5
- // hash will always return 128 bits. You can change
- // the seed to include anything you want here.
- // You could even stream a file through the MD5 making
- // the odds of guessing it at least as great as that
- // of guessing the contents of the file!
- sbValueBeforeMD5.append(s_id);
- sbValueBeforeMD5.append(":");
- sbValueBeforeMD5.append(Long.toString(time));
- sbValueBeforeMD5.append(":");
- sbValueBeforeMD5.append(Long.toString(rand));
- seedingString = sbValueBeforeMD5.toString();
- md5.update(seedingString.getBytes());
- byte[] array = md5.digest();
- StringBuffer sb = new StringBuffer();
- for (int j = 0; j < array.length; ++j) {
- int b = array[j] & 0xFF;
- if (b < 0x10)
- sb.append('0');
- sb.append(Integer.toHexString(b));
- }
- rawGUID = sb.toString();
- } catch (Exception e) {
- System.out.println("Error:" + e);
- }
- }
- public String createNewGuid(int nFormatType, boolean secure) {
- getRandomGUID(secure);
- String sGuid = "";
- if (BeforeMD5 == nFormatType) {
- sGuid = this.seedingString;
- } else if (AfterMD5 == nFormatType) {
- sGuid = this.rawGUID;
- } else {
- sGuid = this.toString();
- }
- return sGuid;
- }
- public String createNewGuid(int nFormatType) {
- return this.createNewGuid(nFormatType, this.bSecure);
- }
- /*
- * Convert to the standard format for GUID (Useful for SQL Server
- * UniqueIdentifiers, etc.) Example: C2FEEEAC-CFCD-11D1-8B05-00600806D9B6
- */
- public String toString() {
- String raw = rawGUID.toUpperCase();
- StringBuffer sb = new StringBuffer();
- sb.append(raw.substring(0, 8));
- sb.append("-");
- sb.append(raw.substring(8, 12));
- sb.append("-");
- sb.append(raw.substring(12, 16));
- sb.append("-");
- sb.append(raw.substring(16, 20));
- sb.append("-");
- sb.append(raw.substring(20));
- return sb.toString();
- }
- public static void main(String args[]) {
- GuidCreator myGUID = new GuidCreator();
- // System.out.println("Seeding String="
- // + myGUID.createNewGuid(GuidCreator.BeforeMD5));
- // System.out.println("rawGUID="
- // + myGUID.createNewGuid(GuidCreator.AfterMD5));
- System.out.println("RandomGUID="
- + myGUID.createNewGuid(GuidCreator.AfterMD5));
- }
- }
- package batchpoc;
- import java.text.SimpleDateFormat;
- public class GuidByRandom {
- private static int cnt = 0;
- public static synchronized String getGUID() throws Exception {
- StringBuffer code = new StringBuffer();
- try {
- java.util.Date dt = new java.util.Date(System.currentTimeMillis());
- SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddHHmmssSSS");//format system time
- String randomCode = fmt.format(dt);
- cnt = (cnt + 1) % 10000; // the modulus can be changed freely, e.g. to
- // 100, 1000 or 100000
- code.append(randomCode).append(cnt);
- return code.toString();
- } catch (Exception e) {
- throw new Exception("createFileName error:" + e.getMessage(), e);
- }
- }
- public static void main(String[] args) throws Exception {
- System.out.println(getGUID());
- }
- }
- package util;
- public class Constants {
- public final static String ENUMERATION_EXCEL_TASK = "excel";
- public final static String ENUMERATION_TXT_TASK = "txt";
- }
- package util;
- import java.io.ByteArrayInputStream;
- import java.io.ByteArrayOutputStream;
- import java.io.ObjectInputStream;
- import java.io.ObjectOutputStream;
- import java.util.Calendar;
- import java.util.Date;
- import java.sql.Blob;
- import java.text.*;
- import java.util.regex.Pattern;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class StringUtil {
- protected final static Logger logger = LoggerFactory
- .getLogger(StringUtil.class);
- public static Object unserializeObj(byte[] bytes) {
- ByteArrayInputStream bais = null;
- try {
- // deserialize
- bais = new ByteArrayInputStream(bytes);
- ObjectInputStream ois = new ObjectInputStream(bais);
- return ois.readObject();
- } catch (Exception e) {
- logger.error("unserializeObj error:" + e.getMessage(), e);
- }
- return null;
- }
- public static byte[] serializeObj(Object obj) {
- ByteArrayOutputStream bout = null;
- ObjectOutputStream out = null;
- byte[] bytes = null;
- try {
- bout = new ByteArrayOutputStream();
- out = new ObjectOutputStream(bout);
- out.writeObject(obj);
- out.flush();
- bytes = bout.toByteArray();
- } catch (Exception e) {
- logger.error("serializeObject error:" + e.getMessage(), e);
- } finally {
- try {
- if (out != null) {
- out.close();
- out = null;
- }
- } catch (Exception e) {
- }
- try {
- if (bout != null) {
- bout.close();
- bout = null;
- }
- } catch (Exception e) {
- }
- }
- return bytes;
- }
- public static String escpaeCharacters(String s) {
- String val = "";
- try {
- if (s == null || s.length() < 1) {
- return s;
- }
- StringBuilder sb = new StringBuilder(s.length() + 16);
- for (int i = 0; i < s.length(); i++) {
- char c = s.charAt(i);
- switch (c) {
- case '\'':
- sb.append("′");// ´");
- break;
- case '′':
- sb.append("′");// ´");
- break;
- case '\"':
- sb.append("&quot;");
- break;
- case '&':
- sb.append("&");
- break;
- case '#':
- sb.append("#");
- break;
- case '\\':
- sb.append('¥');
- break;
- case '>':
- sb.append('>');
- break;
- case '<':
- sb.append('<');
- break;
- default:
- sb.append(c);
- break;
- }
- }
- val = sb.toString();
- return val;
- } catch (Exception e) {
- logger.error("sanitized characters error: " + e.getMessage(), e);
- return s;
- }
- }
- public static boolean isNotNullOrEmpty(String str) {
- return str != null && str.trim().length() > 0;
- }
- public static boolean isNull(Object... params) {
- if (params == null) {
- return true;
- }
- for (Object obj : params) {
- if (obj == null) {
- return true;
- }
- }
- return false;
- }
- public static String getString(Object val) {
- String rtnVal = "";
- try {
- rtnVal = (String) val;
- rtnVal = rtnVal.trim();
- } catch (Exception e) {
- rtnVal = "";
- }
- return rtnVal;
- }
- public static String nullToStr(Object val) {
- return ((val == null) ? "" : String.valueOf(val).trim());
- }
- public static int getInt(Object val) {
- int rtnVal = -1;
- String rtnValStr = "-1";
- try {
- rtnValStr = (String) val;
- rtnValStr = rtnValStr.trim();
- rtnVal = Integer.parseInt(rtnValStr);
- } catch (Exception e) {
- rtnVal = -1;
- }
- return rtnVal;
- }
- public static String convertDateToStr(Date dt) {
- String dateStr = "";
- DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
- if (dt != null) {
- dateStr = format.format(dt);
- }
- return dateStr;
- }
- public static String convertDateToStr(Date dt, String formatter) {
- String dateStr = "";
- DateFormat format = new SimpleDateFormat(formatter);
- if (dt != null) {
- dateStr = format.format(dt);
- }
- return dateStr;
- }
- public static Date convertStrToDateByFormat(String dateStr) {
- String inputDateStr = "";
- SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
- Date date = null;
- try {
- inputDateStr = dateStr;
- if (dateStr == null || dateStr.trim().length() < 1) {
- inputDateStr = "1900-01-01";
- }
- java.util.Date d = sf.parse(inputDateStr.toString().trim());
- date = new Date(d.getTime());
- } catch (Exception e) {
- logger.error(
- "convertStrToDateByFormat(" + dateStr + ") error:"
- + e.getMessage(), e);
- }
- return date;
- }
- public static Date convertStrToDateByFormat(String dateStr, String formatter) {
- String inputDateStr = "";
- SimpleDateFormat sf = new SimpleDateFormat(formatter);
- Date date = null;
- try {
- inputDateStr = dateStr;
- if (dateStr == null || dateStr.trim().length() < 1) {
- inputDateStr = "1900-01-01 01:01:01";
- }
- java.util.Date d = sf.parse(inputDateStr.toString().trim());
- date = new Date(d.getTime());
- } catch (Exception e) {
- logger.error(
- "convertStrToDateByFormat(" + dateStr + ") error:"
- + e.getMessage(), e);
- }
- return date;
- }
- public static Object deepcopy(Object src) throws Exception {
- ByteArrayOutputStream byteout = null;
- ObjectOutputStream out = null;
- ByteArrayInputStream bytein = null;
- ObjectInputStream in = null;
- Object dest = null;
- try {
- byteout = new ByteArrayOutputStream();
- out = new ObjectOutputStream(byteout);
- out.writeObject(src);
- bytein = new ByteArrayInputStream(byteout.toByteArray());
- in = new ObjectInputStream(bytein);
- dest = (Object) in.readObject();
- } catch (Exception e) {
- throw new Exception("deep copy object[" + src
- + "] error cause by: " + e.getMessage(), e);
- } finally {
- try {
- if (in != null) {
- in.close();
- in = null;
- }
- } catch (Exception e) {
- }
- try {
- if (bytein != null) {
- bytein.close();
- bytein = null;
- }
- } catch (Exception e) {
- }
- try {
- if (out != null) {
- out.close();
- out = null;
- }
- } catch (Exception e) {
- }
- try {
- if (byteout != null) {
- byteout.close();
- byteout = null;
- }
- } catch (Exception e) {
- }
- }
- return dest;
- }
- public static Object blobToObject(Blob blob) throws Exception {
- Object obj = null;
- ObjectInputStream in = null;
- try {
- in = new ObjectInputStream(blob.getBinaryStream());
- obj = in.readObject();
- return obj;
- } catch (Exception e) {
- throw new Exception(e);
- } finally {
- try {
- if (in != null) {
- in.close();
- in = null;
- }
- } catch (Exception e) {
- }
- }
- }
- public static long dateSub(String dateStr) throws ParseException {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd");
- java.util.Date d = sdf.parse(dateStr);
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(new Date());
- long currentTime = calendar.getTimeInMillis();
- calendar.setTime(d);
- long timeEnd = calendar.getTimeInMillis();
- long theDay = (timeEnd - currentTime) / (1000 * 60 * 60 * 24);
- return theDay;
- }
- public static boolean isNumeric(String str) {
- Pattern pattern = Pattern.compile("[0-9]*");
- return pattern.matcher(str).matches();
- }
- }
- package batchpoc;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
- import util.Constants;
- public class TestImpLogfile {
- /**
- * @param args
- */
- public static void main(String[] args) {
- //final String fileName = "d:/log_small.csv";
- final String fileName = "d:/test_big.xlsx";
- try {
- GuidCreator myGUID = new GuidCreator();
- BatchDTO taskContext = new BatchDTO();
- String batchId = myGUID.createNewGuid(GuidCreator.AfterMD5);
- taskContext.setPkBtTaskId(batchId);
- taskContext.setTaskName(BatchTask.TXT_IMP_EXP);
- taskContext.setTaskDesc(fileName);
- taskContext.setCommitedBy("unittest");
- taskContext.setStatus(BatchTask.TASK_RUNNING);
- taskContext.setCommitedTime(new Date());
- taskContext.setBatchId(batchId);
- taskContext.setHeadSkip(true);
- // Plain-text variant (disabled); the Excel task below is the one that runs.
- // BatchImportExec task = new BatchImportExec(
- // Constants.ENUMERATION_TXT_TASK, fileName, "", taskContext);
- // task.doBatch();
- // if (data != null && data.size() > 0) {
- // for (int i = 0; i < data.size(); i++) {
- // System.out.println("rows: " + i + "=====" + data.get(i));
- // }
- // }
- BatchImportExec task = new BatchImportExec( Constants.ENUMERATION_EXCEL_TASK, fileName, "", taskContext);
- task.doBatch();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
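Let's look at how efficiently this reads a CSV file with 200,000 comma-separated rows.
In my own tests on servers with 16GB-32GB of RAM and 4-6 CPU cores, importing a 500,000-row Excel file into Oracle or SQL Server took only 5 to 8 minutes. Memory usage stayed at a few tens of MB, and only 5-10 handler threads were needed (equal to the number of database connections in use). This reminds me of 2007 and a former employer of mine: one of their batch jobs did nothing more than read a txt file with 8,000 rows and 3 columns into a single Oracle table, yet it took 2 to 4 hours and sometimes ran out of memory (OOM).
Of course, you may have better options today, such as frameworks or open source components like Spring Batch or Spring Cloud, that handle this kind of batch job more simply and efficiently. The aim of this article, however, is to show the simplest possible way to get the job done cheaply and efficiently. More importantly, along the way we have covered: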
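- How to process Excel files that hold a large amount of data (more than 65,535 rows)
- A neat use of BlockingQueue
- How to measure the total elapsed time of the whole process when the work runs in threads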
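The last two points can be sketched together. The example below is a minimal illustration, not the article's actual classes: `TimedPipelineSketch`, `runPipeline` and the `row-N` strings are hypothetical names, and the database insert is replaced by a counter. It assumes one reader thread, several handler threads, and a sentinel passed along the queue in the same way as `DUMMY`. Total time is measured from just before the pipeline starts until a `CountDownLatch` reports that every handler has finished.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TimedPipelineSketch {
    // Sentinel compared by identity, playing the role of DUMMY in the article.
    private static final String DUMMY = new String("DUMMY");

    /** Runs one reader and `workers` handlers; returns how many rows were handled. */
    static int runPipeline(int rows, int workers, int queueSize) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueSize);
        CountDownLatch finished = new CountDownLatch(workers);
        AtomicInteger handled = new AtomicInteger();

        Thread reader = new Thread(() -> {
            try {
                for (int i = 0; i < rows; i++) {
                    queue.put("row-" + i);   // blocks while the queue is full
                }
                queue.put(DUMMY);            // signals "no more data"
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        for (int w = 0; w < workers; w++) {
            new Thread(() -> {
                try {
                    while (true) {
                        String item = queue.take();
                        if (item == DUMMY) {
                            queue.put(DUMMY); // pass the sentinel to the next handler
                            break;
                        }
                        handled.incrementAndGet(); // stand-in for the database insert
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finished.countDown();
                }
            }).start();
        }

        reader.start();
        try {
            finished.await(); // returns once every handler has seen the sentinel
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int handled = runPipeline(200_000, 5, 1000);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("handled " + handled + " rows in " + elapsedMs + " ms");
    }
}
```

Because the queue is bounded, the reader can never get more than `queueSize` rows ahead of the handlers, which is what keeps memory use roughly constant regardless of file size.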
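I once used this queue-based approach to write a search tool that finds text files containing a given keyword. It had a Java Swing UI and was considerably faster than the built-in Windows search. If you are interested, try building one yourself for fun.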
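- If the text file to be processed is not comma-delimited, how can the delimiter used by the txt file parser be made dynamically configurable?
- How can the system support multiple concurrent tasks, that is, batch processing of several files of different formats, or even databases, at the same time? For example: start an import of a 1,000,000-row txt file, then an import of a 1,000,000-row xls file, then a transfer of a 1,000,000-row MySQL table into an Oracle table. The system now has three tasks, each running with 10 threads and a queue size of 1,000. How can we tell whether each task is currently pending, finished, stopped or failed, and how could an operator manually stop, suspend or restart these batch tasks?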
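As a starting point for the first question, here is one possible sketch of a parser whose delimiter is supplied at construction time. `DelimitedLineParser` and the `batch.delimiter` system property are hypothetical names introduced for illustration; they are not part of the article's code. The sketch does not handle quoted fields that contain the delimiter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class DelimitedLineParser {
    private final Pattern splitter;

    public DelimitedLineParser(String delimiter) {
        // Pattern.quote treats the delimiter literally, so "|" or "." are safe.
        this.splitter = Pattern.compile(Pattern.quote(delimiter));
    }

    /** Splits one line into trimmed columns, keeping trailing empty columns. */
    public List<String> parse(String line) {
        String[] parts = splitter.split(line, -1); // -1 keeps trailing empties
        List<String> cols = new ArrayList<>(parts.length);
        for (String part : parts) {
            cols.add(part.trim());
        }
        return cols;
    }

    public static void main(String[] args) {
        // Hypothetical configuration source: -Dbatch.delimiter="|" on the command line.
        String delimiter = System.getProperty("batch.delimiter", ",");
        DelimitedLineParser parser = new DelimitedLineParser(delimiter);
        System.out.println(parser.parse("2014-01-01, INFO ,started"));
    }
}
```

The reader thread could hold one such parser per task, so each batch task can carry its own delimiter in its task context instead of hard-coding the comma.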