<GANGLIA_XML VERSION="3.6.0" SOURCE="gmond"> <CLUSTER NAME="Yarn_Cluster" LOCALTIME="1397029444" OWNER="apache" LATLONG="unspecified" URL="unspecified"> <HOST NAME="host1" IP="" TAGS="" REPORTED="1397029434" TN="10" TMAX="20" DMAX="86400" LOCATION="unspecified" GMOND_STARTED=" 1395401999"> ...... <METRIC NAME="jvm.JvmMetrics.GcTimeMillis" VAL="3730009" TYPE="float" UNITS="" TN="56" TMAX="60" DMAX="0" SLOPE="both"> <EXTRA_DATA> <EXTRA_ELEMENT NAME="GROUP" VAL="jvm.JvmMetrics"/> </EXTRA_DATA> </METRIC> ....... </HOST> </CLUSTER> </GANGLIA_XML>
可以看出,ganglia中的一个metrics是属于某个集群的某个机器的。它本身的属性有:NAME="jvm.JvmMetrics.GcTimeMillis" VAL="3730009" TYPE="float" UNITS="" TN="56" TMAX="60" DMAX="0" SLOPE="both"。很多属性都是自描述的,不再赘述。
下面是一个简单的java 的GangliaMetricsSender实现:
import org.apache.hadoop.metrics.spi.Util; import; import*; import java.util.HashMap; import java.util.List; import java.util.Map; /** * This is NOT thread safe. * <p/> * User: mzang * Date: 1/7/14 * Time: 3:55 PM */ public class GangliaSender { public static final int SLOPE_ZERO = 0; public static final int SLOPE_POSITIVE = 1; public static final int SLOPE_NEGATIVE = 2; public static final int SLOPE_BOTH = 3; public static final int SPOOF_FALSE = 0; public static final int SPOOF_TRUE = 1; public static String intType = "int32"; public static String doubleType = "double"; public static String floatType = "float"; public int defaultSlope = SLOPE_BOTH; public int defaultSpoof = SPOOF_TRUE; public int tmax = 60; public int dmax = 0; public String units = ""; private List<InetSocketAddress> metricsServers; private DatagramSocket datagramSocket; protected byte[] buffer = new byte[1500]; protected int offset; public GangliaSender(String targetGmonds) { init(targetGmonds); } private void init(String targetGmonds) { metricsServers = Util.parse(targetGmonds, 8649); try { datagramSocket = new DatagramSocket(); } catch (SocketException se) { se.printStackTrace(); } } public void sendMetricInt32(String hostName, String metricName, int value) { try { this.sendMetric(hostName, metricName, intType, units, String.valueOf(value), defaultSpoof, defaultSlope, tmax, dmax); } catch (IOException e) { e.printStackTrace(); } } public void sendMetricDouble(String hostName, String metricName, double value) { try { this.sendMetric(hostName, metricName, doubleType, units, String.valueOf(value), defaultSpoof, defaultSlope, tmax, dmax); } catch (IOException e) { e.printStackTrace(); } } public void sendMetricFloat(String hostName, String metricName, double value) { try { this.sendMetric(hostName, metricName, floatType, units, String.valueOf(value), defaultSpoof, defaultSlope, tmax, dmax); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) throws UnknownHostException { String hostName1 = ""; String metricName1 = "postive"; GangliaSender gs1 = new GangliaSender("host1:8649"); gs1.defaultSlope = SLOPE_POSITIVE; gs1.defaultSpoof = SPOOF_TRUE; String hostName2 = ""; String metricName2 = "nagative"; GangliaSender gs2 = new GangliaSender("host2:8649"); gs2.defaultSlope = SLOPE_NEGATIVE; gs2.defaultSpoof = SPOOF_TRUE; String hostName3 = ""; String metricName3 = "both"; GangliaSender gs3 = new GangliaSender("host3:8649"); gs3.defaultSlope = SLOPE_BOTH; gs3.defaultSpoof = SPOOF_TRUE; int positive = 0; int negative = 0; while (true) { gs1.sendMetricFloat(hostName1, metricName1 + ".doublevalue", positive += (Math.random() * 1000)); gs1.sendMetricInt32(hostName1, metricName1 + ".intvalue", positive += (int) (Math.random() * 1000)); gs2.sendMetricDouble(hostName2, metricName2 + ".doublevalue", negative -= (Math.random() * 1000)); gs2.sendMetricInt32(hostName2, metricName2 + ".intvalue", negative -= (int) (Math.random() * 1000)); gs3.sendMetricDouble(hostName3, metricName3 + ".doublevalue", (Math.random() * 1000)); gs3.sendMetricInt32(hostName3, metricName3 + ".intvalue", (int) (Math.random() * 1000)); try { Thread.sleep(1000); } catch (InterruptedException e) { } } } private static Map<String, String> hostname2IP = new HashMap<String, String>(); private static String getIPByHostname(String hostname) { String ip = hostname2IP.get(hostname); if (ip == null) { InetAddress ia = null; try { ia = InetAddress.getByName(hostname); ip = ia.getHostAddress(); } catch (UnknownHostException e) { e.printStackTrace(); ip = "NA"; } hostname2IP.put(hostname, ip); } return ip; } public void sendMetric(String hostName, String metricName, String type, String units, String value, int spoof, int slope, int tmax, int dmax) throws IOException { // format must be ip:hostname, e.g. String ipAndHostName = getIPByHostname(hostName) + ":" + hostName; String groupName = metricName.substring(0, metricName.lastIndexOf(".")); // The following XDR recipe was done through a careful reading of // gm_protocol.x in Ganglia 3.1 and carefully examining the output of // the gmetric utility with strace. // First we send out a metadata message xdr_int(128); // metric_id = metadata_msg xdr_string(ipAndHostName); // hostname xdr_string(metricName); // metric name xdr_int(spoof); // spoof = True xdr_string(type); // metric type xdr_string(metricName); // metric name xdr_string(units); // units xdr_int(slope); // slope xdr_int(tmax); // tmax, the maximum time between metrics xdr_int(dmax); // dmax, the maximum data value xdr_int(1); /*Num of the entries in extra_value field for Ganglia 3.1.x*/ xdr_string("GROUP"); /*Group attribute*/ xdr_string(groupName); /*Group value*/ sendMetricData(); // Now we send out a message with the actual value. // Technically, we only need to send out the metadata message once for // each metric, but I don't want to have to record which metrics we did and // did not send. xdr_int(133); // we are sending a string value xdr_string(ipAndHostName); // ipAndHostName xdr_string(metricName); // metric name xdr_int(spoof); // spoof = True xdr_string("%s"); // format field xdr_string(value); // metric value sendMetricData(); } /** * Puts a string into the buffer by first writing the size of the string * as an int, followed by the bytes of the string, padded if necessary to * a multiple of 4. */ protected void xdr_string(String s) { byte[] bytes = s.getBytes(); int len = bytes.length; xdr_int(len); System.arraycopy(bytes, 0, buffer, offset, len); offset += len; pad(); } /** * Pads the buffer with zero bytes up to the nearest multiple of 4. */ private void pad() { int newOffset = ((offset + 3) / 4) * 4; while (offset < newOffset) { buffer[offset++] = 0; } } /** * Puts an integer into the buffer as 4 bytes, big-endian. */ protected void xdr_int(int i) { buffer[offset++] = (byte) ((i >> 24) & 0xff); buffer[offset++] = (byte) ((i >> 16) & 0xff); buffer[offset++] = (byte) ((i >> 8) & 0xff); buffer[offset++] = (byte) (i & 0xff); } protected void sendMetricData() throws IOException { try { for (SocketAddress socketAddress : metricsServers) { DatagramPacket packet = new DatagramPacket(buffer, offset, socketAddress); datagramSocket.send(packet); } } finally { offset = 0; } } }
