1.1.1 A method for bulk-importing massive data into HBase

Blog category: Program Code    Tags: HBase, data structures, Hadoop, Apache, MapReduce

Recently I had a requirement to migrate a full MySQL data set into HBase. Although HBase is designed for very efficient reads, its compaction implementation has a severe impact on massive write workloads: once the data volume passes a certain point, writes start to misbehave badly. Looking at HBase's implementation, and setting its runtime mechanics aside, the final storage structure is the HFile format on the distributed file system. Conveniently, the HBase source tree provides an HFileOutputFormat class, and reading its source shows the following:

Java code:

/**
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.mortbay.log.Log;

/**
 * Writes HFiles. Passed KeyValues must arrive in order.
 * Currently, can only write files to a single column family at a
 * time.  Multiple column families requires coordinating keys cross family.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created hfiles.
 * @see KeyValueSortReducer
 */
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(TaskAttemptContext context)
  throws IOException, InterruptedException {
    // Get the path of the temporary output file
    final Path outputPath = FileOutputFormat.getOutputPath(context);
    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
    Configuration conf = context.getConfiguration();
    final FileSystem fs = outputdir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456);
    final int blocksize = conf.getInt("hfile.min.blocksize.size", 65536);
    // Invented config.  Add to hbase-*.xml if other than default compression.
    final String compression = conf.get("hfile.compression",
      Compression.Algorithm.NONE.getName());

    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte [], WriterLength> writers =
        new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
      private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final byte [] now = Bytes.toBytes(System.currentTimeMillis());

      public void write(ImmutableBytesWritable row, KeyValue kv)
      throws IOException {
        long length = kv.getLength();
        byte [] family = kv.getFamily();
        WriterLength wl = this.writers.get(family);
        if (wl == null || ((length + wl.written) >= maxsize) &&
            Bytes.compareTo(this.previousRow, 0, this.previousRow.length,
              kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0) {
          // Get a new writer.
          Path basedir = new Path(outputdir, Bytes.toString(family));
          if (wl == null) {
            wl = new WriterLength();
            this.writers.put(family, wl);
            if (this.writers.size() > 1) throw new IOException("One family only");
            // If wl == null, first file in family.  Ensure family dir exits.
            if (!fs.exists(basedir)) fs.mkdirs(basedir);
          }
          wl.writer = getNewWriter(wl.writer, basedir);
          Log.info("Writer=" + wl.writer.getPath() +
            ((wl.written == 0)? "": ", wrote=" + wl.written));
          wl.written = 0;
        }
        kv.updateLatestStamp(this.now);
        wl.writer.append(kv);
        wl.written += length;
        // Copy the row so we know when a row transition.
        this.previousRow = kv.getRow();
      }

      /* Create a new HFile.Writer. Close current if there is one.
       * @param writer
       * @param familydir
       * @return A new HFile.Writer.
       * @throws IOException
       */
      private HFile.Writer getNewWriter(final HFile.Writer writer,
          final Path familydir)
      throws IOException {
        close(writer);
        return new HFile.Writer(fs, StoreFile.getUniqueFile(fs, familydir),
          blocksize, compression, KeyValue.KEY_COMPARATOR);
      }

      private void close(final HFile.Writer w) throws IOException {
        if (w != null) {
          StoreFile.appendMetadata(w, System.currentTimeMillis(), true);
          w.close();
        }
      }

      public void close(TaskAttemptContext c)
      throws IOException, InterruptedException {
        for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) {
          close(e.getValue().writer);
        }
      }
    };
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    HFile.Writer writer = null;
  }
}

As you can see, its workflow is simply to initialize itself from your configuration and then write the output out in HFile format.
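For orientation, here is a minimal sketch of how this output format might be wired into an actual MapReduce job. This driver is not from the original post: the class names, the tab-separated input, the offer:action column (borrowed from the demo below), and the single-reducer choice are all illustrative assumptions.

Java code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadDriver {

  // Hypothetical mapper: turns each "rowkey<TAB>value" text line into a KeyValue
  // for an assumed "offer:action" column, using the same KeyValue constructor as
  // the demo below. Input layout and column name are illustrative only.
  static class LineToKeyValueMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void map(LongWritable offset, Text line, Context ctx)
        throws IOException, InterruptedException {
      String[] parts = line.toString().split("\t", 2);
      byte[] row = Bytes.toBytes(parts[0]);
      KeyValue kv = new KeyValue(row, Bytes.toBytes("offer:action"),
          System.currentTimeMillis(), Bytes.toBytes(parts[1]));
      ctx.write(new ImmutableBytesWritable(row), kv);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "offer-bulk-load");   // illustrative job name
    job.setJarByClass(BulkLoadDriver.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(LineToKeyValueMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    // KeyValueSortReducer (referenced in HFileOutputFormat's javadoc) emits the
    // KeyValues of each row in sorted order, which the output format requires.
    job.setReducerClass(KeyValueSortReducer.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(HFileOutputFormat.class);
    // One reducer keeps the global row-key order simple for this sketch; a real job
    // would need a total-order partitioning scheme to use more.
    job.setNumReduceTasks(1);
    FileInputFormat.addInputPath(job, new Path(args[0]));    // e.g. a MySQL dump on HDFS
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // where the HFiles will land
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}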
Here is a quick-and-dirty demo I put together:

Java code:

HFileOutputFormat hf = new HFileOutputFormat();
HBaseConfiguration conf = new HBaseConfiguration();
conf.addResource(new Path("/home/performance/softs/hadoop/conf/core-site.xml"));
conf.set("mapred.output.dir", "/tmp");
conf.set("hfile.compression", Compression.Algorithm.LZO.getName());
TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
RecordWriter writer = hf.getRecordWriter(context);
KeyValue kv = new KeyValue(Bytes.toBytes("1111111111111"), Bytes.toBytes("offer:action"),
    System.currentTimeMillis(), Bytes.toBytes("test"));
KeyValue kv1 = new KeyValue(Bytes.toBytes("1111111111111"), Bytes.toBytes("offer:id"),
    System.currentTimeMillis(), Bytes.toBytes("123"));
KeyValue kv3 = new KeyValue(Bytes.toBytes("1111111111112"), Bytes.toBytes("offer:action"),
    System.currentTimeMillis(), Bytes.toBytes("test"));
KeyValue kv4 = new KeyValue(Bytes.toBytes("1111111111112"), Bytes.toBytes("offer:id"),
    System.currentTimeMillis(), Bytes.toBytes("123"));
writer.write(null, kv);
writer.write(null, kv1);
writer.write(null, kv3);
writer.write(null, kv4);
writer.close(context);

After running this, a set of files is generated under /tmp on HDFS. Note that when writing data in batches you must make sure the keys arrive in sorted order (see the small sorting sketch at the end of this section).

At this point the JRuby-based loadtable.rb script that ships with HBase comes into play. Its usage is loadtable.rb <table name you want> <hdfs path>:

hbase org.jruby.Main loadtable.rb offer hdfs://user/root/importoffer/_temporary/_attempt__0000_r_000000_0/

Once it finishes, run ./hbase shell and then list; the offer table you just imported will show up.
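One last note on the ordering requirement mentioned above. When KeyValues are assembled by hand, as in the demo, a simple way to make sure they reach the writer in the order HFileOutputFormat expects is to collect them in a structure sorted by HBase's own KeyValue comparator first. This is only a sketch, reusing the writer, context and kv/kv1/kv3/kv4 objects from the demo and assuming java.util.TreeSet is imported; in the full MapReduce path the KeyValueSortReducer referenced in HFileOutputFormat's javadoc plays this role per row.

Java code:

// Collect the KeyValues in a TreeSet ordered by HBase's KeyValue comparator,
// then hand them to the RecordWriter in that order.  Note that a TreeSet
// silently drops exact duplicates; a List sorted with Collections.sort
// would preserve them.
TreeSet<KeyValue> sorted = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
sorted.add(kv4);   // insertion order no longer matters
sorted.add(kv);
sorted.add(kv3);
sorted.add(kv1);
for (KeyValue sortedKv : sorted) {
    writer.write(null, sortedKv);
}
writer.close(context);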