In this setup, Flume NG agents collect log events and forward them to a Flume NG collector, and the collector writes the data into HBase. The agent's property file is monitored and controlled remotely through JMX.
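For orientation, here is a minimal sketch of the two property files involved. All component names, hosts, ports, the table name and the paths are illustrative assumptions; the sink wiring follows the standard Flume NG Avro and HBase sink properties.

# Agent side (flume-conf.properties): tail a log file, tag events with
# LogInterceptor, and forward them to the collector over Avro.
agent.sources = S1
agent.channels = fileChannel
agent.sinks = avroSink
agent.sources.S1.type = exec
agent.sources.S1.command = tail -F /var/log/app1/access.log
agent.sources.S1.channels = fileChannel
agent.sources.S1.interceptors = logIntercept
agent.sources.S1.interceptors.logIntercept.type = LogInterceptor$Builder
agent.channels.fileChannel.type = file
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.hostname = collector-host
agent.sinks.avroSink.port = 4545
agent.sinks.avroSink.channel = fileChannel

# Collector side: receive Avro events and write them to HBase through
# LogHbaseEventSerializer.
collector.sources = avroSrc
collector.channels = memChannel
collector.sinks = hbaseSink
collector.sources.avroSrc.type = avro
collector.sources.avroSrc.bind = 0.0.0.0
collector.sources.avroSrc.port = 4545
collector.sources.avroSrc.channels = memChannel
collector.channels.memChannel.type = memory
collector.sinks.hbaseSink.type = org.apache.flume.sink.hbase.HBaseSink
collector.sinks.hbaseSink.table = log_events
collector.sinks.hbaseSink.columnFamily = content
collector.sinks.hbaseSink.serializer = LogHbaseEventSerializer
collector.sinks.hbaseSink.channel = memChannel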
Flume itself already exposes a number of MBeans, for example:

ObjectName: org.apache.flume.channel:type=fc
ClassName:  org.apache.flume.instrumentation.ChannelCounter

ObjectName: org.apache.flume.source:type=S1
ClassName:  org.apache.flume.instrumentation.SourceCounter
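To see which MBeans a running agent actually exposes, you can query its platform MBean server over JMX. A minimal sketch, assuming an agent with remote JMX enabled on localhost:5445 (host, port and the class name are examples):

import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ListFlumeMBeans {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:5445/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url, null);
        try {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            // List every MBean registered under the org.apache.flume.* domains.
            Set<ObjectName> names = mbsc.queryNames(new ObjectName("org.apache.flume.*:*"), null);
            for (ObjectName name : names) {
                System.out.println(name + " -> " + mbsc.getMBeanInfo(name).getClassName());
            }
        } finally {
            connector.close();
        }
    }
}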
1. Define the Flume NG property file management MBean interface
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.commons.configuration.ConfigurationException;

/**
 * <p>
 * {@link PropertyFileManagerMBean} provides methods to handle
 * Create/Read/Update/Delete operations on a properties file.
 * </p>
 */
public interface PropertyFileManagerMBean {

    /**
     * <p>Add or update a property.</p>
     *
     * @param key   property key
     * @param value property value
     * @throws ConfigurationException if the save operation fails.
     */
    public void setProperty(String key, String value) throws ConfigurationException;

    /**
     * <p>Add or update a set of properties.</p>
     *
     * @param properties Properties as key/value pairs in the form of a {@link Map}.
     * @throws ConfigurationException if the save operation fails.
     */
    public void setProperties(Map<String, String> properties) throws ConfigurationException;

    /**
     * <p>Get a property value.</p>
     *
     * @param key Property key
     * @return Property value
     */
    public String getProperty(String key);

    /**
     * <p>Read properties for a list of input keys.</p>
     *
     * @param keys The list of property keys
     * @return Properties as key/value pairs in the form of a {@link Map}.
     */
    public Map<String, String> getProperties(List<String> keys);

    /**
     * <p>Read the properties whose keys start with the given prefix.</p>
     *
     * @param parentKey Prefix of the property key.
     * @return Properties as key/value pairs in the form of a {@link Map}.
     */
    public Map<String, String> getPropertiesWithKeyPrefix(String parentKey);

    /**
     * <p>Remove the property with the provided key from the property file.</p>
     *
     * @param key Key of the property to be removed.
     * @throws ConfigurationException if the save operation fails.
     */
    public void deleteProperty(String key) throws ConfigurationException;

    /**
     * <p>Remove the properties with the provided list of keys from the property file.</p>
     *
     * @param keys Keys of the properties to be removed.
     * @throws ConfigurationException if the save operation fails.
     */
    public void deleteProperties(List<String> keys) throws ConfigurationException;

    /**
     * <p>Read the configuration properties file content from the local file system.</p>
     *
     * @return Configuration properties file content.
     */
    public String readPropertyFile();

    /**
     * <p>Write the configuration properties file content to the local file system.</p>
     *
     * @param fileContent New content of the configuration file.
     */
    public void writePropertyFile(String fileContent) throws IOException;
}
2. Provide an implementation
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * Each Flume NG agent has configurations for its components such as sources,
 * channels and sinks. {@link PropertyFileManager} lets users access the agent
 * configuration file from a remote location by connecting to the JMX port
 * enabled when the agent is started.
 * </p>
 */
public class PropertyFileManager implements PropertyFileManagerMBean {

    private PropertiesConfiguration config = null;
    private static PropertyFileManager propertyFileManager = null;
    private final Logger LOG = LoggerFactory.getLogger(PropertyFileManager.class);

    private PropertyFileManager(String filePath) {
        init(filePath);
    }

    public static PropertyFileManager getInstance(String filePath) {
        if (propertyFileManager == null)
            propertyFileManager = new PropertyFileManager(filePath);
        return propertyFileManager;
    }

    private void init(String filePath) {
        LOG.info("Initializing PropertyFileManager for configuration file - " + filePath);
        try {
            config = new PropertiesConfiguration(new File(filePath));
            // Reload the in-memory configuration whenever the file changes on disk.
            config.setReloadingStrategy(new FileChangedReloadingStrategy());
        } catch (ConfigurationException e) {
            LOG.error("Failed to load flume configuration file - " + filePath + "\n" + e);
        }
    }

    @Override
    public void setProperty(String key, String value) throws ConfigurationException {
        LOG.info("Set property for Key - " + key + " , Value - " + value);
        config.setProperty(key, value);
        config.save();
    }

    @Override
    public void setProperties(Map<String, String> properties) throws ConfigurationException {
        LOG.info("Update config file with - " + properties);
        if (properties != null) {
            for (String key : properties.keySet()) {
                config.setProperty(key, properties.get(key));
            }
        }
        config.save();
    }

    @Override
    public String getProperty(String key) {
        LOG.info("Read property - " + key);
        return (String) config.getProperty(key);
    }

    @Override
    public Map<String, String> getProperties(List<String> keys) {
        LOG.info("Read properties - " + keys);
        Map<String, String> properties = new LinkedHashMap<String, String>();
        for (String key : keys) {
            String value = (String) config.getProperty(key);
            properties.put(key, value);
        }
        return properties;
    }

    @Override
    public Map<String, String> getPropertiesWithKeyPrefix(String parentKey) {
        LOG.info("Read config properties with key prefix " + parentKey);
        Map<String, String> properties = new LinkedHashMap<String, String>();
        Iterator<String> keys = config.getKeys(parentKey);
        while (keys.hasNext()) {
            String key = keys.next();
            String value = (String) config.getProperty(key);
            properties.put(key, value);
        }
        return properties;
    }

    @Override
    public void deleteProperty(String key) throws ConfigurationException {
        LOG.info("Delete config property with key " + key);
        config.clearProperty(key);
        config.save();
    }

    @Override
    public void deleteProperties(List<String> keys) throws ConfigurationException {
        LOG.info("Delete config properties with keys " + keys);
        for (String key : keys) {
            config.clearProperty(key);
        }
        config.save();
    }

    @Override
    public String readPropertyFile() {
        LOG.info("Reading config properties file.. " + config.getURL());
        String propertyFile = "";
        try {
            propertyFile = readFileAsString(config.getFile());
        } catch (Exception e) {
            LOG.error("Failed to read config file " + config.getURL());
        }
        LOG.info("Successfully completed reading config properties file.. " + config.getURL());
        return propertyFile;
    }

    @Override
    public void writePropertyFile(String fileContent) throws IOException {
        LOG.info("Write below content to config properties file.. " + config.getURL() + "\n" + fileContent);
        File file = config.getFile();
        FileWriter out = new FileWriter(file);
        out.write(fileContent);
        out.flush();
        out.close();
        LOG.info("Successfully completed writing config properties file.. " + config.getURL());
        config.reload();
    }

    private String readFileAsString(File file) throws java.io.IOException {
        if (file == null)
            return "";
        StringBuffer fileData = new StringBuffer(1000);
        BufferedReader reader = new BufferedReader(new FileReader(file));
        char[] buf = new char[1024];
        int numRead = 0;
        while ((numRead = reader.read(buf)) != -1) {
            fileData.append(String.valueOf(buf, 0, numRead));
        }
        reader.close();
        return fileData.toString();
    }

    public static void main(String[] args) {
        PropertyFileManager fileManager =
                new PropertyFileManager("C:\\WorkSpace\\flume-conf-TEMPLATE.properties");
        try {
            fileManager.deleteProperty("agent.sources");
        } catch (ConfigurationException e) {
            e.printStackTrace();
        }
    }
}
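A quick local usage sketch of the manager (the file path and keys are examples):

public class PropertyFileManagerDemo {
    public static void main(String[] args) throws Exception {
        PropertyFileManager manager =
                PropertyFileManager.getInstance("/etc/flume-ng/conf/flume-conf.properties");
        // Add or update a single property, then read back a whole key prefix.
        manager.setProperty("agent.channels.fileChannel.capacity", "10000");
        System.out.println(manager.getPropertiesWithKeyPrefix("agent.sources"));
    }
}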
3. LogInterceptor is defined to add extra metadata to the log events. The interceptor is deployed on the Flume agent side, where log events are collected, and it also registers the PropertyFileManager MBean.
import static LogInterceptor.Constants.APP_ID;
import static LogInterceptor.Constants.ENV;
import static LogInterceptor.Constants.HOST_NAME;
import static LogInterceptor.Constants.LOG_FILE_NAME;
import static LogInterceptor.Constants.LOG_FILE_PATH;
import static LogInterceptor.Constants.LOG_TYPE;
import static LogInterceptor.Constants.TIMESTAMP;
import static LogInterceptor.Constants.TIME_STAMP_NANO;

import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Map;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple Interceptor class that sets the current system timestamp on all
 * events that are intercepted. By convention, this timestamp header is named
 * "timestamp" and its format is a "stringified" long timestamp in
 * milliseconds since the UNIX epoch.
 */
public class LogInterceptor implements Interceptor {

    private final Logger LOG = LoggerFactory.getLogger(LogInterceptor.class);

    private final String hostName;
    private final String env;
    private final String logType;
    private final String appId;
    private final String logFilePath;
    private final String logFileName;
    private long eventCounter = 0; // counts intercepted events
    private MBeanServer beanServer;

    /**
     * Only {@link LogInterceptor.Builder} can build me.
     */
    private LogInterceptor(String hostName, String env, String logType,
            String appId, String logFilePath, String logFileName) {
        LOG.info("Instantiate LogInterceptor Object..");
        this.hostName = hostName;
        this.env = env;
        this.logType = logType;
        this.appId = appId;
        this.logFilePath = logFilePath;
        this.logFileName = logFileName;
        if (beanServer == null) {
            beanServer = ManagementFactory.getPlatformMBeanServer();
            try {
                ObjectName objectName = new ObjectName("com.dean.cate.elastic:name=Agent");
                if (!beanServer.isRegistered(objectName)) {
                    String filePath = PropertyFileManager.class.getClassLoader()
                            .getResource("flume-conf.properties").getFile();
                    beanServer.registerMBean(PropertyFileManager.getInstance(filePath), objectName);
                    LOG.info("Registered PropertyFileManager MBean to manage configuration file - " + filePath);
                }
            } catch (Exception ex) {
                LOG.error("Failed to register monitored counter group for Property config Manager"
                        + ", name: " + this.getClass().getName() + ex);
            }
        }
    }

    @Override
    public void initialize() {
        // no-op
    }

    /**
     * Modifies events in-place.
     */
    @Override
    public Event intercept(Event event) {
        eventCounter++;
        // Add timestamp, host, env, logType, appId, logFilePath and logFileName headers.
        Map<String, String> headers = event.getHeaders();
        long now = System.currentTimeMillis();
        String nowNano = Long.toString(System.nanoTime());
        headers.put(TIMESTAMP, Long.toString(now));
        headers.put(HOST_NAME, hostName);
        headers.put(ENV, env);
        headers.put(LOG_TYPE, logType);
        headers.put(APP_ID, appId);
        headers.put(LOG_FILE_PATH, logFilePath);
        headers.put(LOG_FILE_NAME, logFileName);
        headers.put(TIME_STAMP_NANO, nowNano);
        return event;
    }

    /**
     * Delegates to {@link #intercept(Event)} in a loop.
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
        // no-op
    }

    /**
     * Builder which builds new instances of the LogInterceptor.
     */
    public static class Builder implements Interceptor.Builder {

        private String hostName = "";
        private String env = "env";
        private String logType = "logType";
        private String appId = "appId";
        private String logFilePath = "logFilePath";
        private String logFileName = "logFileName";

        @Override
        public Interceptor build() {
            return new LogInterceptor(hostName, env, logType, appId, logFilePath, logFileName);
        }

        @Override
        public void configure(Context context) {
            hostName = context.getString(HOST_NAME, "");
            env = context.getString(ENV, "");
            logType = context.getString(LOG_TYPE, "");
            appId = context.getString(APP_ID, "");
            logFilePath = context.getString(LOG_FILE_PATH, "");
            logFileName = context.getString(LOG_FILE_NAME, "");
        }
    }

    public static class Constants {
        public static String TIMESTAMP = "timestamp";
        public static String PRESERVE = "preserveExisting";
        public static String HOST_NAME = "hostName";
        public static boolean PRESERVE_DFLT = false;
        public static String ENV = "env";
        public static String LOG_TYPE = "logType";
        public static String APP_ID = "appId";
        public static String LOG_FILE_PATH = "logFilePath";
        public static String LOG_FILE_NAME = "logFileName";
        public static String TIME_STAMP_NANO = "nano";
    }
}
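For the MBean to be reachable from another machine, the agent JVM has to be started with remote JMX enabled, for example through JAVA_OPTS in conf/flume-env.sh. The port is an example, and a production setup should enable authentication and SSL instead of disabling them:

export JAVA_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.port=5445 \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false"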
4. LogHbaseEventSerializer is defined to write events into HBase. It is deployed on the Flume collector side and registers the same MBean there.
import static LogHbaseEventSerializer.Constants.APP_ID;
import static LogHbaseEventSerializer.Constants.ENV;
import static LogHbaseEventSerializer.Constants.HOST_NAME;
import static LogHbaseEventSerializer.Constants.LOG_FILE_NAME;
import static LogHbaseEventSerializer.Constants.LOG_FILE_PATH;
import static LogHbaseEventSerializer.Constants.LOG_TYPE;
import static LogHbaseEventSerializer.Constants.TIMESTAMP;
import static LogHbaseEventSerializer.Constants.TIME_STAMP_NANO;

import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;

/**
 * An {@link HbaseEventSerializer} that writes each log event into HBase,
 * splitting the event headers across a "sysInfo" column family (host, env,
 * appId) and a "content" column family (log type, file path, file name and
 * the event body).
 *
 * Row keys for each event consist of appId:timestamp:nano:hostName, which
 * enforces uniqueness of keys across flume agents.
 */
public class LogHbaseEventSerializer implements HbaseEventSerializer {

    /** Comma separated list of column names to place matching groups in. */
    public static final String COL_NAME_CONFIG = "colNames";
    public static final String COLUMN_NAME_DEFAULT = "payload";
    public static final byte[] CONTENT = "content".getBytes(Charsets.UTF_8);
    public static final byte[] SYS_INFO = "sysInfo".getBytes(Charsets.UTF_8);

    private final Logger LOG = LoggerFactory.getLogger(LogHbaseEventSerializer.class);

    protected byte[] cf;
    private byte[] payload;
    private Map<String, String> headers = new HashMap<String, String>();
    private MBeanServer beanServer;

    @Override
    public void configure(Context context) {
        LOG.info("Configuring LogHbaseEventSerializer..");
        if (beanServer == null) {
            beanServer = ManagementFactory.getPlatformMBeanServer();
            try {
                ObjectName objectName = new ObjectName("com.dean.cate.elastic:name=Agent");
                if (!beanServer.isRegistered(objectName)) {
                    String filePath = PropertyFileManager.class.getClassLoader()
                            .getResource("flume-conf.properties").getFile();
                    beanServer.registerMBean(PropertyFileManager.getInstance(filePath), objectName);
                    LOG.info("Successfully registered PropertyFileManager MBean to manage configuration file - "
                            + filePath);
                }
            } catch (Exception ex) {
                LOG.error("Failed to register monitored counter group for Property config Manager"
                        + ", name: " + this.getClass().getName() + ex);
            }
        }
    }

    @Override
    public void configure(ComponentConfiguration conf) {
    }

    @Override
    public void initialize(Event event, byte[] columnFamily) {
        this.payload = event.getBody();
        this.cf = columnFamily;
        this.headers.putAll(event.getHeaders());
    }

    protected byte[] getRowKey() {
        String appId = headers.get(APP_ID);
        String timeStamp = headers.get(TIMESTAMP);
        String timeNano = headers.get(TIME_STAMP_NANO);
        String hostName = headers.get(HOST_NAME);
        // Keep only the short host name (strip the domain part, if any).
        int index = hostName.indexOf('.');
        hostName = hostName.substring(0, index > 0 ? index : hostName.length());
        String rowKey = String.format("%s:%s:%s:%s", appId, timeStamp, timeNano, hostName);
        return rowKey.getBytes(Charsets.UTF_8);
    }

    @Override
    public List<Row> getActions() throws FlumeException {
        List<Row> actions = Lists.newArrayList();
        byte[] rowKey;
        String appId = headers.get(APP_ID);
        String hostName = headers.get(HOST_NAME);
        String env = headers.get(ENV);
        String logType = headers.get(LOG_TYPE);
        String logFilePath = headers.get(LOG_FILE_PATH);
        String logFileName = headers.get(LOG_FILE_NAME);
        try {
            rowKey = getRowKey();
            Put put = new Put(rowKey);
            put.add(SYS_INFO, HOST_NAME.getBytes(Charsets.UTF_8), hostName.getBytes(Charsets.UTF_8));
            put.add(SYS_INFO, ENV.getBytes(Charsets.UTF_8), env.getBytes(Charsets.UTF_8));
            put.add(SYS_INFO, APP_ID.getBytes(Charsets.UTF_8), appId.getBytes(Charsets.UTF_8));
            put.add(CONTENT, LOG_TYPE.getBytes(Charsets.UTF_8), logType.getBytes(Charsets.UTF_8));
            put.add(CONTENT, LOG_FILE_PATH.getBytes(Charsets.UTF_8), logFilePath.getBytes(Charsets.UTF_8));
            put.add(CONTENT, LOG_FILE_NAME.getBytes(Charsets.UTF_8), logFileName.getBytes(Charsets.UTF_8));
            put.add(CONTENT, "body".getBytes(Charsets.UTF_8), payload);
            actions.add(put);
        } catch (Exception e) {
            throw new FlumeException("Could not get row key!", e);
        }
        return actions;
    }

    @Override
    public List<Increment> getIncrements() {
        return Lists.newArrayList();
    }

    @Override
    public void close() {
    }

    public static class Constants {
        public static String TIMESTAMP = "timestamp";
        public static String PRESERVE = "preserveExisting";
        public static String HOST_NAME = "hostName";
        public static boolean PRESERVE_DFLT = false;
        public static String ENV = "env";
        public static String LOG_TYPE = "logType";
        public static String APP_ID = "appId";
        public static String LOG_FILE_PATH = "logFilePath";
        public static String LOG_FILE_NAME = "logFileName";
        public static String TIME_STAMP_NANO = "nano";
    }
}
5. JMXConnectionUtils is used to manage remote JMX connections.
import java.io.IOException;
import java.net.MalformedURLException;

import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JMXConnectionUtils {

    public static final String JMX_URL_CONN_STRING =
            "service:jmx:rmi:///jndi/rmi://<HOST>:<PORT>/jmxrmi";

    public static JMXConnector getJMXConnector(String host, String port) {
        JMXServiceURL url = null;
        JMXConnector jmxConnector = null;
        try {
            String urlString = prepareJMXURL(host, port);
            url = new JMXServiceURL(urlString);
            jmxConnector = JMXConnectorFactory.connect(url, null);
        } catch (MalformedURLException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return jmxConnector;
    }

    public static MBeanServerConnection getMBeanServerConnection(JMXConnector jmxConnector) {
        MBeanServerConnection mBeanServerConnection = null;
        try {
            if (jmxConnector != null)
                mBeanServerConnection = jmxConnector.getMBeanServerConnection();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return mBeanServerConnection;
    }

    public static void closeJMXConnector(JMXConnector jmxConnector) {
        try {
            if (jmxConnector != null)
                jmxConnector.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static String prepareJMXURL(String host, String port) {
        String url = JMX_URL_CONN_STRING.replaceFirst("<HOST>", host);
        url = url.replaceFirst("<PORT>", port);
        return url;
    }
}
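Usage sketch (host and port are example values for a JMX-enabled agent):

import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;

public class JMXConnectionDemo {
    public static void main(String[] args) throws Exception {
        JMXConnector connector = JMXConnectionUtils.getJMXConnector("agent-host", "5445");
        MBeanServerConnection mbsc = JMXConnectionUtils.getMBeanServerConnection(connector);
        if (mbsc != null) {
            System.out.println("Connected, MBean count: " + mbsc.getMBeanCount());
        }
        JMXConnectionUtils.closeJMXConnector(connector);
    }
}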
6. FlumeConfiguration is built on top of the MBean to manage all the sources on a remote Flume agent.
import static FlumeConfigurationConstants.*;
import static JMXConnectionUtils.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;

import org.apache.commons.configuration.ConfigurationException;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * A number of flume agents run on remote machines. {@link FlumeConfiguration}
 * helps to create, update and delete source configurations on those remote
 * agents through the PropertyFileManager MBean.
 * </p>
 * <p>
 * This class also implements methods to read/update a remote configuration
 * file as a whole.
 * </p>
 */
public class FlumeConfiguration {

    public static final String OBJECT_NAME = "com.dean.cate.elastic:name=Agent";

    private final Logger LOG = LoggerFactory.getLogger(FlumeConfiguration.class);

    /**
     * <p>Create a new source on the flume agent running on host:port.</p>
     *
     * @param propertiesJson JSON object with the LogInterceptor values
     *                       (appId, env, hostName, logFileName, logFilePath, logType)
     * @param host       Host name or IP address of the machine the flume agent runs on.
     * @param port       JMX port enabled when the flume agent service was started.
     * @param sourceName Name of the source to create.
     * @return TRUE if creation happened successfully, otherwise FALSE.
     */
    public boolean createSource(String propertiesJson, String host, String port, String sourceName) {
        LOG.info("Start creating new source " + sourceName + " for agent running on " + host + ":"
                + port + ".\nConfiguration input parameters are - " + propertiesJson);
        boolean result = true;
        // Validate input parameters
        if (propertiesJson == null || propertiesJson.isEmpty() || host == null || host.isEmpty()
                || port == null || port.isEmpty() || sourceName == null || sourceName.isEmpty())
            return false;
        JMXConnector jmxConnector = null;
        try {
            // Parse the request JSON object into a key/value map
            Map<String, String> properties = parsePropertiesJson(propertiesJson);
            Map<String, String> sourceConfigurations = new TreeMap<String, String>();
            // Get the MBean server connection
            jmxConnector = getJMXConnector(host, port);
            MBeanServerConnection mBeanServerConnection = getMBeanServerConnection(jmxConnector);
            ObjectName objectName = new ObjectName(OBJECT_NAME);
            PropertyFileManagerMBean hostManagerProxy = (PropertyFileManagerMBean)
                    MBeanServerInvocationHandler.newProxyInstance(mBeanServerConnection, objectName,
                            PropertyFileManagerMBean.class, false);
            // Update the source list, unless the source already exists
            String sources = hostManagerProxy.getProperty(AGENT_SOURCES);
            StringTokenizer sourceTokens = new StringTokenizer(sources.trim(), " ");
            boolean isSourceAvailable = false;
            while (sourceTokens.hasMoreTokens()) {
                String token = sourceTokens.nextToken();
                if (token.trim().equals(sourceName)) {
                    isSourceAvailable = true;
                }
            }
            if (!isSourceAvailable) {
                sources = sources.trim() + " " + sourceName;
                sourceConfigurations.put(AGENT_SOURCES, sources);
                sourceConfigurations.putAll(prepareSourceConfigurations(sourceName, properties));
                // Save attributes
                hostManagerProxy.setProperties(sourceConfigurations);
                LOG.info("Successfully created new source with configurations - \n" + sourceConfigurations);
            }
        } catch (MalformedObjectNameException e) {
            result = false;
            LOG.error("The format of the string does not correspond to a valid MBean ObjectName" + e);
        } catch (ConfigurationException e) {
            result = false;
            LOG.error("Failed to save source configurations. " + e);
        } finally {
            closeJMXConnector(jmxConnector);
        }
        return result;
    }

    /**
     * <p>Update source configurations on a flume agent running on a remote host.</p>
     *
     * @param propertiesJson List of configurations to be modified, in JSON object format.
     * @param host       Host name or IP address where the agent is running.
     * @param port       JMX port used to connect to the remotely running agent.
     * @param sourceName Source name whose configurations have to be updated.
     * @return TRUE if the update happened successfully, otherwise FALSE.
     */
    public boolean updateSource(String propertiesJson, String host, String port, String sourceName) {
        LOG.info("Start updating existing source " + sourceName + " for agent running on " + host + ":"
                + port + ".\nConfiguration input parameters are - " + propertiesJson);
        boolean result = true;
        // Validate input parameters
        if (propertiesJson == null || propertiesJson.isEmpty() || host == null || host.isEmpty()
                || port == null || port.isEmpty() || sourceName == null || sourceName.isEmpty())
            return false;
        JMXConnector jmxConnector = null;
        try {
            // Parse the request JSON object into a key/value map
            Map<String, String> properties = parsePropertiesJson(propertiesJson);
            Map<String, String> sourceConfigurations = new TreeMap<String, String>();
            jmxConnector = getJMXConnector(host, port);
            // Get the MBean server connection
            MBeanServerConnection mBeanServerConnection = getMBeanServerConnection(jmxConnector);
            ObjectName objectName = new ObjectName(OBJECT_NAME);
            PropertyFileManagerMBean hostManagerProxy = (PropertyFileManagerMBean)
                    MBeanServerInvocationHandler.newProxyInstance(mBeanServerConnection, objectName,
                            PropertyFileManagerMBean.class, false);
            // Prepare the interceptor configuration from the properties map
            sourceConfigurations.putAll(prepareInterceptorConf(properties, sourceName));
            // Add the source tail command
            String fileName = properties.get(LOG_FILE_NAME);
            String filePath = properties.get(LOG_FILE_PATH);
            if (fileName != null && !fileName.isEmpty() && filePath != null && !filePath.isEmpty())
                sourceConfigurations.put(prepareSourceTailCmdKey(sourceName),
                        prepareSourceTailCmdValue(filePath + fileName));
            // Save attributes
            hostManagerProxy.setProperties(sourceConfigurations);
            LOG.info("Successfully updated existing source with configurations - \n" + sourceConfigurations);
        } catch (MalformedObjectNameException e) {
            result = false;
            LOG.error("The format of the string does not correspond to a valid MBean ObjectName" + e);
        } catch (ConfigurationException e) {
            result = false;
            LOG.error("Failed to save source configurations. " + e);
        } finally {
            closeJMXConnector(jmxConnector);
        }
        return result;
    }

    /**
     * <p>Delete source configurations on a flume agent running on a remote host.</p>
     *
     * @param host       Host name or IP address where the agent is running.
     * @param port       JMX port used to connect to the remotely running agent.
     * @param sourceName Source name whose configurations have to be deleted.
     * @return TRUE if the deletion happened successfully, otherwise FALSE.
     */
    public boolean deleteSource(String host, String port, String sourceName) {
        LOG.info("Start deleting source " + sourceName + " on agent running on " + host + ":" + port + ".");
        boolean result = true;
        // Validate input parameters
        if (host == null || host.isEmpty() || port == null || port.isEmpty()
                || sourceName == null || sourceName.isEmpty())
            return false;
        JMXConnector jmxConnector = getJMXConnector(host, port);
        // Get the MBean server connection
        MBeanServerConnection mBeanServerConnection = getMBeanServerConnection(jmxConnector);
        try {
            ObjectName objectName = new ObjectName(OBJECT_NAME);
            PropertyFileManagerMBean hostManagerProxy = (PropertyFileManagerMBean)
                    MBeanServerInvocationHandler.newProxyInstance(mBeanServerConnection, objectName,
                            PropertyFileManagerMBean.class, false);
            // Remove the source from the source list
            String sources = hostManagerProxy.getProperty(AGENT_SOURCES);
            StringTokenizer sourceTokens = new StringTokenizer(sources.trim(), " ");
            String updateSources = "";
            while (sourceTokens.hasMoreTokens()) {
                String token = sourceTokens.nextToken().trim();
                if (!token.equals(sourceName)) {
                    updateSources = updateSources + " " + token;
                }
            }
            Map<String, String> sourceProps = new HashMap<String, String>();
            sourceProps.putAll(hostManagerProxy.getPropertiesWithKeyPrefix(AGENT_SOURCES + "." + sourceName));
            List<String> keys = new ArrayList<String>(sourceProps.keySet());
            try {
                LOG.info("Deleting keys.. " + keys);
                hostManagerProxy.deleteProperties(keys);
                LOG.info("Update " + AGENT_SOURCES + " with value " + updateSources);
                hostManagerProxy.setProperty(AGENT_SOURCES, updateSources);
                LOG.info("Successfully deleted source " + sourceName + " running on agent " + host);
            } catch (ConfigurationException e) {
                result = false;
                LOG.error("Problem in deleting source " + sourceName + e);
                // Roll back the delete changes
                LOG.info("Rollback configurations with previous values.. ");
                sourceProps.put(AGENT_SOURCES, sources);
                try {
                    hostManagerProxy.setProperties(sourceProps);
                } catch (ConfigurationException e1) {
                    result = false;
                    LOG.error("Failed to rollback source configurations while delete operation was not successful "
                            + sourceName + e1);
                }
            }
        } catch (MalformedObjectNameException e) {
            LOG.error("The format of the string does not correspond to a valid MBean ObjectName" + e);
            result = false;
        } finally {
            closeJMXConnector(jmxConnector);
        }
        return result;
    }

    /**
     * <p>
     * Read the flume configuration file of an agent running on a remote
     * location and return its content as a string.
     * </p>
     *
     * @param host Host name or IP address where the agent is running.
     * @param port JMX port used to connect to the remotely running agent.
     * @return Content of the flume configuration file as a {@link String}.
     */
    public String readFlumeConfigurations(String host, String port) {
        LOG.info("Start reading source configuration file from " + host + ":" + port + ".");
        JMXConnector jmxConnector = null;
        String fileContent = "";
        try {
            jmxConnector = getJMXConnector(host, port);
            // Get the MBean server connection
            MBeanServerConnection mBeanServerConnection = getMBeanServerConnection(jmxConnector);
            ObjectName objectName = new ObjectName(OBJECT_NAME);
            PropertyFileManagerMBean hostManagerProxy = (PropertyFileManagerMBean)
                    MBeanServerInvocationHandler.newProxyInstance(mBeanServerConnection, objectName,
                            PropertyFileManagerMBean.class, false);
            // Read the whole property file
            fileContent = hostManagerProxy.readPropertyFile();
            LOG.info("Successfully read configuration file");
        } catch (MalformedObjectNameException e) {
            LOG.error("The format of the string does not correspond to a valid MBean ObjectName" + e);
        } finally {
            closeJMXConnector(jmxConnector);
        }
        return fileContent;
    }

    /**
     * <p>Re-write the flume configuration file content as a whole with the input content.</p>
     *
     * @param content Content of the configuration to replace.
     * @param host    Host name or IP address where the agent is running.
     * @param port    JMX port used to connect to the remotely running agent.
     */
    public boolean updateConfigurations(String content, String host, String port) {
        LOG.info("Start updating source configuration file from " + host + ":" + port + ".");
        JMXConnector jmxConnector = null;
        boolean result = true;
        try {
            jmxConnector = getJMXConnector(host, port);
            // Get the MBean server connection
            MBeanServerConnection mBeanServerConnection = getMBeanServerConnection(jmxConnector);
            if (mBeanServerConnection == null)
                result = false;
            else {
                ObjectName objectName = new ObjectName(OBJECT_NAME);
                PropertyFileManagerMBean hostManagerProxy = (PropertyFileManagerMBean)
                        MBeanServerInvocationHandler.newProxyInstance(mBeanServerConnection, objectName,
                                PropertyFileManagerMBean.class, false);
                // Save the configuration file
                hostManagerProxy.writePropertyFile(content);
                LOG.info("Successfully updated configuration file");
            }
        } catch (MalformedObjectNameException e) {
            LOG.error("The format of the string does not correspond to a valid MBean ObjectName" + e);
            result = false;
        } catch (IOException e) {
            result = false;
            LOG.error("Failed to update config file content. " + e);
        } finally {
            closeJMXConnector(jmxConnector);
        }
        return result;
    }

    private Map<String, String> prepareSourceConfigurations(String sourceName, Map<String, String> properties) {
        Map<String, String> sourceConfigurations = new HashMap<String, String>();
        // Add command type
        sourceConfigurations.put(prepareSrcTypeKey(sourceName), SRC_TYPE_EXEC_VALUE);
        // Add batch size
        sourceConfigurations.put(prepareBatchKey(sourceName), SRC_BATCH_SIZE_VALUE);
        // Add channel
        sourceConfigurations.put(prepareChannelKey(sourceName), CHANNELS_VALUE);
        // Add source tail command
        String fileName = properties.get(LOG_FILE_NAME);
        String filePath = properties.get(LOG_FILE_PATH);
        if (fileName != null && !fileName.isEmpty() && filePath != null && !filePath.isEmpty())
            sourceConfigurations.put(prepareSourceTailCmdKey(sourceName),
                    prepareSourceTailCmdValue(filePath + fileName));
        // Add interceptor builder
        sourceConfigurations.put(prepareInterceptorBuilderKey(sourceName), INTERCEPTOR_TYPE_VALUE);
        // Add interceptors
        sourceConfigurations.put(prepareInterceptorsKey(sourceName), INTERCEPTORS_VALUE);
        // Prepare interceptor configurations from the properties map
        sourceConfigurations.putAll(prepareInterceptorConf(properties, sourceName));
        return sourceConfigurations;
    }

    @SuppressWarnings("unchecked")
    private Map<String, String> parsePropertiesJson(String propertiesJson) {
        JsonFactory factory = new JsonFactory();
        ObjectMapper mapper = new ObjectMapper(factory);
        Map<String, String> properties = new HashMap<String, String>();
        TypeReference<HashMap<String, String>> typeRef = new TypeReference<HashMap<String, String>>() {
        };
        try {
            properties.putAll((Map<String, String>) mapper.readValue(propertiesJson, typeRef));
            LOG.debug("Parsed properties - " + properties);
        } catch (Exception e) {
            LOG.error("Input JSON object parsing logic failed " + e);
        }
        return properties;
    }

    private String prepareInterceptorBuilderKey(String sourceName) {
        return INTERCEPTOR_TYPE_KEY.replaceFirst("\\.source\\.", "." + sourceName + ".");
    }

    private String prepareSrcTypeKey(String sourceName) {
        return SRC_TYPE_KEY.replaceFirst("\\.source\\.", "." + sourceName + ".");
    }

    private String prepareBatchKey(String sourceName) {
        return SRC_BATCH_SIZE_KEY.replaceFirst("\\.source\\.", "." + sourceName + ".");
    }

    private String prepareChannelKey(String sourceName) {
        return CHANNELS_KEY.replaceFirst("\\.source\\.", "." + sourceName + ".");
    }

    private String prepareInterceptorsKey(String sourceName) {
        return INTERCEPTORS_KEY.replaceFirst("\\.source\\.", "." + sourceName + ".");
    }

    private String prepareSourceTailCmdKey(String sourceName) {
        return SRC_CMD_KEY.replaceFirst("\\.source\\.", "." + sourceName + ".");
    }

    private String prepareSourceTailCmdValue(String filePath) {
        return SRC_CMD_TAIL_VALUE + filePath;
    }

    private Map<String, String> prepareInterceptorConf(Map<String, String> properties, String sourceName) {
        Map<String, String> sourceProperties = new TreeMap<String, String>();
        for (String key : properties.keySet()) {
            String name = FlumeConfigurationConstants.properties.get(key);
            name = name.replaceFirst("\\.source\\.", "." + sourceName + ".");
            sourceProperties.put(name, properties.get(key));
        }
        return sourceProperties;
    }
}
7. The constants used by FlumeConfiguration are defined below.
import java.util.HashMap;
import java.util.Map;

public final class FlumeConfigurationConstants {

    public static final String APP_ID = "appId";
    public static final String ENV = "env";
    public static final String HOST_NAME = "hostName";
    public static final String LOG_FILE_NAME = "logFileName";
    public static final String LOG_FILE_PATH = "logFilePath";
    public static final String LOG_TYPE = "logType";

    public static final String AGENT_SOURCES = "agent.sources";
    public static final String INTERCEPTOR_TYPE_KEY = "agent.sources.source.interceptors.logIntercept.type";
    public static final String INTERCEPTOR_TYPE_VALUE = "LogInterceptor$Builder";
    public static final String INTERCEPTORS_KEY = "agent.sources.source.interceptors";
    public static final String INTERCEPTORS_VALUE = "logIntercept";
    public static final String CHANNELS_KEY = "agent.sources.source.channels";
    public static final String CHANNELS_VALUE = "fileChannel";
    public static final String SRC_CMD_KEY = "agent.sources.source.command";
    public static final String SRC_CMD_TAIL_VALUE = "tail -F ";
    public static final String SRC_TYPE_KEY = "agent.sources.source.type";
    public static final String SRC_TYPE_EXEC_VALUE = "exec";
    public static final String SRC_BATCH_SIZE_KEY = "agent.sources.source.batchSize";
    public static final String SRC_BATCH_SIZE_VALUE = "1";

    public static final Map<String, String> properties = new HashMap<String, String>();
    static {
        properties.put(APP_ID, "agent.sources.source.interceptors.logIntercept.appId");
        properties.put(ENV, "agent.sources.source.interceptors.logIntercept.env");
        properties.put(HOST_NAME, "agent.sources.source.interceptors.logIntercept.hostName");
        properties.put(LOG_FILE_NAME, "agent.sources.source.interceptors.logIntercept.logFileName");
        properties.put(LOG_FILE_PATH, "agent.sources.source.interceptors.logIntercept.logFilePath");
        properties.put(LOG_TYPE, "agent.sources.source.interceptors.logIntercept.logType");
    }
}
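For reference, calling createSource(...) for a source named S2 substitutes .source. with .S2. in the key templates above and writes a fragment like the following into the agent's property file (all values are examples):

agent.sources = S1 S2
agent.sources.S2.type = exec
agent.sources.S2.command = tail -F /var/log/app1/access.log
agent.sources.S2.batchSize = 1
agent.sources.S2.channels = fileChannel
agent.sources.S2.interceptors = logIntercept
agent.sources.S2.interceptors.logIntercept.type = LogInterceptor$Builder
agent.sources.S2.interceptors.logIntercept.appId = app1
agent.sources.S2.interceptors.logIntercept.env = dev
agent.sources.S2.interceptors.logIntercept.hostName = web01
agent.sources.S2.interceptors.logIntercept.logType = access
agent.sources.S2.interceptors.logIntercept.logFilePath = /var/log/app1/
agent.sources.S2.interceptors.logIntercept.logFileName = access.log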