本文介绍分布式压测系统ngrinder及ngrinder工作原理 ngrinder源码解析
install & run
- dowload ngrinder-controller-3.4.1.war
- deploy ngrinder-controller-3.4.1.war
- dowload agent http://{controller_host_name}/agent/download
- run agent ./run_agent.sh
- create script http://localhost:8080/script/
- create test http://localhost:8080/perftest/new
- run test
run by docker
./install_maven_lib.sh
mvn clean
mvn compile
mvn package -Dmaven.test.skip=true
./docker/prepare.sh
使用docker-compose 运行 docker/docker-compose.yml
refrence
- https://github.com/naver/ngrinder/wiki/Installation-Guide
- https://github.com/naver/ngrinder/wiki/Agent-Configuration-Guide
- https://testerhome.com/topics/4225
- http://blog.csdn.net/zhiaini06/article/details/45287663
core code
controller
PrefTestRunnable.java
/**
* Run the given test.
* <p/>
* If fails, it marks STOP_BY_ERROR in the given {@link PerfTest} status
*
* @param perfTest perftest instance;
*/
public void doTest(final PerfTest perfTest) {
SingleConsole singleConsole = null;
try {
singleConsole = startConsole(perfTest);
// prepare distribute groovy maven projet build
ScriptHandler prepareDistribution = perfTestService.prepareDistribution(perfTest);
// generate the properties for agent include runtime properties
GrinderProperties grinderProperties = perfTestService.getGrinderProperties(perfTest, prepareDistribution);
// agent start with properties
startAgentsOn(perfTest, grinderProperties, checkCancellation(singleConsole));
// distribute files to agents
distributeFileOn(perfTest, checkCancellation(singleConsole));
// report path
singleConsole.setReportPath(perfTestService.getReportFileDirectory(perfTest));
// start test with properties
runTestOn(perfTest, grinderProperties, checkCancellation(singleConsole));
} catch (SingleConsoleCancellationException ex) {
// In case of error, mark the occurs error on perftest.
doCancel(perfTest, singleConsole);
notifyFinish(perfTest, StopReason.CANCEL_BY_USER);
} catch (Exception e) {
// In case of error, mark the occurs error on perftest.
LOG.error("Error while executing test: {} - {} ", perfTest.getTestIdentifier(), e.getMessage());
LOG.debug("Stack Trace is : ", e);
doTerminate(perfTest, singleConsole);
notifyFinish(perfTest, StopReason.ERROR_WHILE_PREPARE);
}
}
PrefTestService.java
/**
* Prepare files for distribution. This method stores the files on the path
* ${NGRINDER_HOME}/perftest/{test_id}/dist folder.
*
* @param perfTest perfTest
* @return File location in which the perftest script and resources are distributed.
*/
public ScriptHandler prepareDistribution(PerfTest perfTest) {
File perfTestDistDirectory = getDistributionPath(perfTest);
User user = perfTest.getCreatedUser();
FileEntry scriptEntry = checkNotNull(fileEntryService.getOne(user, checkNotEmpty(perfTest.getScriptName(), "perfTest should have script name"), getSafe(perfTest.getScriptRevision())), "script should exist");
// Get all files in the script path
ScriptHandler handler = scriptHandlerFactory.getHandler(scriptEntry);
ProcessingResultPrintStream processingResult = new ProcessingResultPrintStream(new ByteArrayOutputStream());
// buildmaven project maven clean complie package
handler.prepareDist(perfTest.getId(), user, scriptEntry, perfTestDistDirectory, config.getControllerProperties(), processingResult);
// zip dist directory
try {
CompressionUtils.zip(perfTestDistDirectory);
} catch (IOException e) {
throw processException("Error while file zip perfTestDistDirectory.");
}
LOGGER.info("File write is completed in {}", perfTestDistDirectory);
if (!processingResult.isSuccess()) {
File logDir = new File(getLogFileDirectory(perfTest), "distribution_log.txt");
try {
FileUtils.writeByteArrayToFile(logDir, processingResult.getLogByteArray());
} catch (IOException e) {
noOp();
}
throw processException("Error while file distribution is prepared.");
}
return handler;
}
PerfTestService.java line:560
/**
* Create {@link GrinderProperties} based on the passed {@link PerfTest}.
*
* @param perfTest base data
* @param scriptHandler scriptHandler
* @return created {@link GrinderProperties} instance
*/
public GrinderProperties getGrinderProperties(PerfTest perfTest, ScriptHandler scriptHandler) {
try {
// Use default properties first
GrinderProperties grinderProperties = new GrinderProperties(config.getHome().getDefaultGrinderProperties());
User user = perfTest.getCreatedUser();
// Get all files in the script path
String scriptName = perfTest.getScriptName();
FileEntry userDefinedGrinderProperties = fileEntryService.getOne(user, FilenameUtils.concat(FilenameUtils.getPath(scriptName), DEFAULT_GRINDER_PROPERTIES), -1L);
if (!config.isSecurityEnabled() && userDefinedGrinderProperties != null) {
// Make the property overridden by user property.
GrinderProperties userProperties = new GrinderProperties();
userProperties.load(new StringReader(userDefinedGrinderProperties.getContent()));
grinderProperties.putAll(userProperties);
}
grinderProperties.setAssociatedFile(new File(DEFAULT_GRINDER_PROPERTIES));
grinderProperties.setProperty(GRINDER_PROP_SCRIPT, scriptHandler.getScriptExecutePath(scriptName));
grinderProperties.setProperty(GRINDER_PROP_TEST_ID, "test_" + perfTest.getId());
grinderProperties.setInt(GRINDER_PROP_AGENTS, getSafe(perfTest.getAgentCount()));
grinderProperties.setInt(GRINDER_PROP_PROCESSES, getSafe(perfTest.getProcesses()));
grinderProperties.setInt(GRINDER_PROP_THREAD, getSafe(perfTest.getThreads()));
if (perfTest.isThresholdDuration()) {
grinderProperties.setLong(GRINDER_PROP_DURATION, getSafe(perfTest.getDuration()));
grinderProperties.setInt(GRINDER_PROP_RUNS, 0);
} else {
grinderProperties.setInt(GRINDER_PROP_RUNS, getSafe(perfTest.getRunCount()));
if (grinderProperties.containsKey(GRINDER_PROP_DURATION)) {
grinderProperties.remove(GRINDER_PROP_DURATION);
}
}
grinderProperties.setProperty(GRINDER_PROP_ETC_HOSTS, StringUtils.defaultIfBlank(perfTest.getTargetHosts(), ""));
grinderProperties.setBoolean(GRINDER_PROP_USE_CONSOLE, true);
if (BooleanUtils.isTrue(perfTest.getUseRampUp())) {
grinderProperties.setBoolean(GRINDER_PROP_THREAD_RAMPUP, perfTest.getRampUpType() == RampUp.THREAD);
grinderProperties.setInt(GRINDER_PROP_PROCESS_INCREMENT, getSafe(perfTest.getRampUpStep()));
grinderProperties.setInt(GRINDER_PROP_PROCESS_INCREMENT_INTERVAL, getSafe(perfTest.getRampUpIncrementInterval()));
if (perfTest.getRampUpType() == RampUp.PROCESS) {
grinderProperties.setInt(GRINDER_PROP_INITIAL_SLEEP_TIME, getSafe(perfTest.getRampUpInitSleepTime()));
} else {
grinderProperties.setInt(GRINDER_PROP_INITIAL_THREAD_SLEEP_TIME, getSafe(perfTest.getRampUpInitSleepTime()));
}
grinderProperties.setInt(GRINDER_PROP_INITIAL_PROCESS, getSafe(perfTest.getRampUpInitCount()));
} else {
grinderProperties.setInt(GRINDER_PROP_PROCESS_INCREMENT, 0);
}
grinderProperties.setInt(GRINDER_PROP_REPORT_TO_CONSOLE, 500);
grinderProperties.setProperty(GRINDER_PROP_USER, perfTest.getCreatedUser().getUserId());
// set grinder.jvm.classpath for agent run
grinderProperties.setProperty(GRINDER_PROP_JVM_CLASSPATH, getCustomClassPath(perfTest));
grinderProperties.setInt(GRINDER_PROP_IGNORE_SAMPLE_COUNT, getSafe(perfTest.getIgnoreSampleCount()));
grinderProperties.setBoolean(GRINDER_PROP_SECURITY, config.isSecurityEnabled());
// For backward agent compatibility.
// If the security is not enabled, pass it as jvm argument.
// If enabled, pass it to grinder.param. In this case, I drop the
// compatibility.
if (StringUtils.isNotBlank(perfTest.getParam())) {
String param = perfTest.getParam().replace("'", "\\'").replace(" ", "");
if (config.isSecurityEnabled()) {
grinderProperties.setProperty(GRINDER_PROP_PARAM, StringUtils.trimToEmpty(param));
} else {
String property = grinderProperties.getProperty(GRINDER_PROP_JVM_ARGUMENTS, "");
property = property + " -Dparam=" + param + " ";
// set grinder.jvm.arguments for agent run
grinderProperties.setProperty(GRINDER_PROP_JVM_ARGUMENTS, property);
}
}
LOGGER.info("Grinder Properties : {} ", grinderProperties);
return grinderProperties;
} catch (Exception e) {
throw processException("error while prepare grinder property for " + perfTest.getTestName(), e);
}
}
GroovyMavenProjectScriptHandler.java
@Override
protected void prepareDistMore(Long testId, User user, FileEntry script, File distDir, PropertiesWrapper properties, ProcessingResultPrintStream processingResult) {
String pomPathInSVN = PathUtils.join(getBasePath(script), "pom.xml");
MavenCli cli = new MavenCli();
processingResult.println("\nCopy dependencies by running 'mvn dependency:copy-dependencies"+ " -DoutputDirectory=./lib -DexcludeScope=provided'");
// excute maven commond
int result = cli.doMain(new String[]{ // goal specification
"dependency:copy-dependencies", // run dependency goal
"-DoutputDirectory=./lib", // to the lib folder
"-DexcludeScope=provided" // but exclude the provided
// library
}, distDir.getAbsolutePath(), processingResult, processingResult);
boolean success = (result == 0);
if (success) {
processingResult.printf("\nDependencies in %s was copied.\n", pomPathInSVN);
LOGGER.info("Dependencies in {} is copied into {}/lib folder", pomPathInSVN, distDir.getAbsolutePath());
} else {
processingResult.printf("\nDependencies copy in %s is failed.\n", pomPathInSVN);
LOGGER.info("Dependencies copy in {} is failed.", pomPathInSVN);
}
// Then it's not necessary to include pom.xml anymore.
FileUtils.deleteQuietly(new File(distDir, "pom.xml"));
processingResult.setSuccess(result == 0);
}
PrefTestRunnable.java
/**
* Distribute files to agents.
*
* @param perfTest perftest
* @param singleConsole console to be used.
*/
void distributeFileOn(final PerfTest perfTest, SingleConsole singleConsole) {
// Distribute files
perfTestService.markStatusAndProgress(perfTest, DISTRIBUTE_FILES, "All necessary files are being distributed.");
ListenerSupport<SingleConsole.FileDistributionListener> listener = ListenerHelper.create();
final long safeThreadHold = getSafeTransmissionThreshold();
listener.add(new SingleConsole.FileDistributionListener() {
@Override
public void distributed(String fileName) {
perfTestService.markProgress(perfTest, " - " + fileName);
}
@Override
public boolean start(File dir, boolean safe) {
if (safe) {
perfTestService.markProgress(perfTest, "Safe file distribution mode is enabled.");
return safe;
}
long sizeOfDirectory = FileUtils.sizeOfDirectory(dir);
if (sizeOfDirectory > safeThreadHold) {
perfTestService.markProgress(perfTest, "The total size of distributed files is over "+ UnitUtils.byteCountToDisplaySize(safeThreadHold) + "B.\n- Safe file distribution mode is enabled by force.");
return true;
}
return safe;
}
});
// the files have prepared before
// start distribute files
singleConsole.distributeFiles(perfTestService.getDistributionPath(perfTest), listener, isSafeDistPerfTest(perfTest));
perfTestService.markStatusAndProgress(perfTest, DISTRIBUTE_FILES_FINISHED, "All necessary files are distributed.");
}
SingleConsole.java
/**
* Distribute files on agents.
*
* @param listener listener
* @param safe safe mode
*/
public void distributeFiles(ListenerSupport<FileDistributionListener> listener, final boolean safe) {
final FileDistribution fileDistribution = getConsoleComponent(FileDistribution.class);
final AgentCacheState agentCacheState = fileDistribution.getAgentCacheState();
final Condition cacheStateCondition = new Condition();
agentCacheState.addListener(new PropertyChangeListener() {
public void propertyChange(PropertyChangeEvent ignored) {
synchronized (cacheStateCondition) {
cacheStateCondition.notifyAll();
}
}
});
final MutableBoolean safeDist = new MutableBoolean(safe);
ConsoleProperties consoleComponent = getConsoleComponent(ConsoleProperties.class);
final File file = consoleComponent.getDistributionDirectory().getFile();
if (listener != null) {
listener.apply(new Informer<FileDistributionListener>() {
@Override
public void inform(FileDistributionListener listener) {
safeDist.setValue(listener.start(file, safe));
}
});
}
final FileDistributionHandler distributionHandler = fileDistribution.getHandler();
// When cancel is called.. stop processing.
int fileCount = 0;
while (!cancel) {
try {
// distribute files in directory
final FileDistributionHandler.Result result = distributionHandler.sendNextFile();
fileCount++;
if (result == null) {
break;
}
if (listener != null) {
listener.apply(new Informer<FileDistributionListener>() {
@Override
public void inform(FileDistributionListener listener) {
listener.distributed(result.getFileName());
}
});
}
if (safeDist.isTrue()) {
// The cache status is updated asynchronously by agent
// reports. If the listener is registered, this waits for up
// to five seconds for
// all agents to indicate that they are up to date.
checkSafetyWithCacheState(fileDistribution, cacheStateCondition, 1);
}
} catch (FileContents.FileContentsException e) {
throw processException("Error while distribute files for " + getConsolePort());
}
}
if (safeDist.isFalse()) {
ThreadUtils.sleep(1000);
checkSafetyWithCacheState(fileDistribution, cacheStateCondition, fileCount);
}
}
net/sf/grinder/grinder-core/3.9.1/grinder-core-3.9.1.jar!/net/grinder/console/communication/DistributionControlImplementation.class
public Result sendNextFile() throws FileContents.FileContentsException {
try {
if (m_fileIndex < m_files.length) {
if (m_fileIndex == 0) {
// Clear any cache that has out of date cache parameters.
// We currently we do nothing about cached copies of deleted files.
final Address addressAgentsWithInvalidCaches = m_agents.getAddressOfOutOfDateAgents(0);
m_distributionControl.clearFileCaches(addressAgentsWithInvalidCaches);
}
try {
final int index = m_fileIndex;
final File file = m_files[index];
final Address addressAgentsWithoutFile = m_agents.getAddressOfOutOfDateAgents(new File(m_directory, file.getPath()).lastModified());
// send one file to address of one agent
m_distributionControl.sendFile(addressAgentsWithoutFile, new FileContents(m_directory, file));
return new Result() {
public int getProgressInCents() {
return ((index + 1) * 100) / m_files.length;
}
public String getFileName() {
return file.getPath();
}
};
} finally {
++m_fileIndex;
}
} else {
m_distributionControl.setHighWaterMark(
m_agents.getAddressOfAllAgents(),
m_cacheParameters.createHighWaterMark(m_latestFileTime));
return null;
}
} catch (OutOfDateException e) {
return null;
}
}
net/sf/grinder/grinder-core/3.9.1/grinder-core-3.9.1.jar!/net/grinder/console/communication/DistributionControlImplementation.class
/**
* Send a file to the agents matching the given address.
*
* @param address
* The address of the agents.
* @param fileContents The file contents.
*/
public void sendFile(Address address, FileContents fileContents) {
m_consoleCommunication.sendToAddressedAgents(address, new DistributeFileMessage(fileContents));
}
agent
/**
* Run the Grinder agent process.
*
* @param grinderProperties {@link GrinderProperties} which contains grinder agent base configuration.
* @throws GrinderException If an error occurs.
*/
public void run(GrinderProperties grinderProperties) throws GrinderException {
StartGrinderMessage startMessage = null;
ConsoleCommunication consoleCommunication = null;
m_fanOutStreamSender = new FanOutStreamSender(GrinderConstants.AGENT_FANOUT_STREAM_THREAD_COUNT);
m_timer = new Timer(false);
try {
while (true) {
m_logger.info(GrinderBuild.getName());
ScriptLocation script = null;
GrinderProperties properties;
do {
properties = createAndMergeProperties(grinderProperties, startMessage != null ? startMessage.getProperties() : null);
properties.setProperty(GrinderProperties.CONSOLE_HOST, m_agentConfig.getControllerIP());
m_agentIdentity.setName(m_agentConfig.getAgentHostID());
final Connector connector = m_connectorFactory.create(properties);
// We only reconnect if the connection details have changed.
if (consoleCommunication != null && !consoleCommunication.getConnector().equals(connector)) {
shutdownConsoleCommunication(consoleCommunication);
consoleCommunication = null;
// Accept any startMessage from previous console - see
// bug 2092881.
}
if (consoleCommunication == null && connector != null) {
try {
consoleCommunication = new ConsoleCommunication(connector, grinderProperties.getProperty("grinder.user", "_default"));
consoleCommunication.start();
m_logger.info("Connect to console at {}", connector.getEndpointAsString());
} catch (CommunicationException e) {
if (m_proceedWithoutConsole) {
m_logger.warn("{}, proceeding without the console; set "
+ "grinder.useConsole=false to disable this warning.", e.getMessage());
} else {
m_logger.error(e.getMessage());
return;
}
}
}
if (consoleCommunication != null && startMessage == null) {
m_logger.info("Waiting for console signal");
m_consoleListener.waitForMessage();
if (m_consoleListener.received(ConsoleListener.START)) {
startMessage = m_consoleListener.getLastStartGrinderMessage();
continue; // Loop to handle new properties.
} else {
break; // Another message, check at end of outer while loop.
}
}
if (startMessage != null) {
final GrinderProperties messageProperties = startMessage.getProperties();
final Directory fileStoreDirectory = m_fileStore.getDirectory();
// Convert relative path to absolute path.
messageProperties.setAssociatedFile(fileStoreDirectory.getFile(messageProperties .getAssociatedFile()));
final File consoleScript = messageProperties.resolveRelativeFile(messageProperties.getFile(GrinderProperties.SCRIPT, GrinderProperties.DEFAULT_SCRIPT));
// We only fall back to the agent properties if the start message
// doesn't specify a script and there is no default script.
if (messageProperties.containsKey(GrinderProperties.SCRIPT) || consoleScript.canRead()) {
// The script directory may not be the file's direct parent.
script = new ScriptLocation(fileStoreDirectory, consoleScript);
}
m_agentIdentity.setNumber(startMessage.getAgentNumber());
} else {
m_agentIdentity.setNumber(-1);
}
if (script == null) {
final File scriptFile = properties.resolveRelativeFile(properties.getFile(GrinderProperties.SCRIPT, GrinderProperties.DEFAULT_SCRIPT));
script = new ScriptLocation(scriptFile);
}
m_logger.debug("The script location is {}", script.getFile().getAbsolutePath());
if (!script.getFile().canRead()) {
m_logger.error("The script file '{}' does not exist or is not readable.", script);
script = null;
break;
}
} while (script == null);
if (script != null) {
// Set up log directory.
if (!properties.containsKey(GrinderProperties.LOG_DIRECTORY)) {
properties.setFile(GrinderProperties.LOG_DIRECTORY, new File(m_agentConfig.getHome() .getLogDirectory(), properties.getProperty(GRINDER_PROP_TEST_ID, "default")));
}
File logFile = new File(properties.getFile(GrinderProperties.LOG_DIRECTORY, new File(".")), m_agentIdentity.getName() + "-" + m_agentIdentity.getNumber() + ".log");
m_logger.info("log file : {}", logFile);
AbstractLanguageHandler handler = Lang.getByFileName(script.getFile()).getHandler();
final WorkerFactory workerFactory;
Properties rebasedSystemProperty = rebaseSystemClassPath(System.getProperties(), m_agentConfig.getCurrentDirectory());
String jvmArguments = buildTestRunProperties(script, handler, rebasedSystemProperty, properties);
if (!properties.getBoolean("grinder.debug.singleprocess", false)) {
// Fix to provide empty system classpath to speed up
final WorkerProcessCommandLine workerCommandLine = new WorkerProcessCommandLine(properties, filterSystemClassPath(rebasedSystemProperty, handler, m_logger), jvmArguments, script.getDirectory());
m_logger.info("Worker process command line: {}", workerCommandLine);
FileUtils.writeStringToFile(logFile, workerCommandLine.toString() + "\n\n");
workerFactory = new ProcessWorkerFactory(workerCommandLine, m_agentIdentity, m_fanOutStreamSender, consoleCommunication != null, script, properties);
} else {
m_logger.info("DEBUG MODE. Spawning threads rather than processes");
m_logger.warn("grinder.jvm.arguments ({}) ignored in single process mode", jvmArguments);
workerFactory = new DebugThreadWorkerFactory(m_agentIdentity, m_fanOutStreamSender, consoleCommunication != null, script, properties);
}
m_logger.debug("Worker launcher is prepared.");
final WorkerLauncher workerLauncher = new WorkerLauncher(properties.getInt("grinder.processes", 1), workerFactory, m_eventSynchronisation, m_logger);
m_workerLauncherForShutdown = workerLauncher;
final boolean threadRampUp = properties.getBoolean("grinder.threadRampUp", false);
final int increment = properties.getInt("grinder.processIncrement", 0);
if (!threadRampUp) {
m_logger.debug("'Ramp Up' mode by {}.", increment);
}
if (!threadRampUp && increment > 0) {
final boolean moreProcessesToStart = workerLauncher.startSomeWorkers(properties.getInt("grinder.initialProcesses", increment));
if (moreProcessesToStart) {
final int incrementInterval = properties.getInt("grinder.processIncrementInterval", 60000);
final RampUpTimerTask rampUpTimerTask = new RampUpTimerTask(workerLauncher, increment);
m_timer.scheduleAtFixedRate(rampUpTimerTask, incrementInterval, incrementInterval);
}
} else {
m_logger.debug("start all workers");
workerLauncher.startAllWorkers();
}
// Wait for a termination event.
synchronized (m_eventSynchronisation) {
final long maximumShutdownTime = 5000;
long consoleSignalTime = -1;
while (!workerLauncher.allFinished()) {
m_logger.debug("Waiting until all workers are finished");
if (consoleSignalTime == -1 && m_consoleListener.checkForMessage(ConsoleListener.ANY ^ ConsoleListener.START)) {
m_logger.info("Don't start anymore by message from controller.");
workerLauncher.dontStartAnyMore();
consoleSignalTime = System.currentTimeMillis();
}
if (consoleSignalTime >= 0 && System.currentTimeMillis() - consoleSignalTime > maximumShutdownTime) {
m_logger.info("Terminating unresponsive processes by force");
// destroyAllWorkers() prevents further workers
// from starting.
workerLauncher.destroyAllWorkers();
}
m_eventSynchronisation.waitNoInterrruptException(maximumShutdownTime);
}
m_logger.info("All workers are finished");
}
m_logger.debug("Normal shutdown");
workerLauncher.shutdown();
break;
}
if (consoleCommunication == null) {
m_logger.debug("Console communication death");
break;
} else {
// Ignore any pending start messages.
m_consoleListener.discardMessages(ConsoleListener.START);
if (!m_consoleListener.received(ConsoleListener.ANY)) {
// We've got here naturally, without a console signal.
m_logger.debug("Test is finished, wait for console signal");
m_consoleListener.waitForMessage();
}
if (m_consoleListener.received(ConsoleListener.START)) {
startMessage = m_consoleListener.getLastStartGrinderMessage();
} else if (m_consoleListener.received(ConsoleListener.STOP | ConsoleListener.SHUTDOWN)) {
m_logger.debug("Got shutdown message");
break;
} else {
m_logger.debug("Natural death");
// ConsoleListener.RESET or natural death.
startMessage = null;
}
}
}
} catch (Exception e) {
m_logger.error("Exception occurred in the agent message loop", e);
} finally {
if (m_timer != null) {
m_timer.cancel();
m_timer = null;
}
shutdownConsoleCommunication(consoleCommunication);
if (m_fanOutStreamSender != null) {
m_fanOutStreamSender.shutdown();
m_fanOutStreamSender = null;
}
m_consoleListener.shutdown();
m_logger.info("Test shuts down.");
}
}
private final class ConsoleCommunication {
private final ClientSender m_sender;
private final Connector m_connector;
private final TimerTask m_reportRunningTask;
private final MessagePump m_messagePump;
public ConsoleCommunication(Connector connector, String user) throws CommunicationException, FileStore.FileStoreException {
final ClientReceiver receiver = ClientReceiver.connect(connector, new AgentAddress(m_agentIdentity));
m_sender = ClientSender.connect(receiver);
m_connector = connector;
if (m_fileStore == null) {
// Only create the file store if we connected.
File base = m_agentConfig.getHome().getDirectory();
File directory = new File(new File(base, "file-store"), user);
// create FileStore
m_fileStore = new FileStore(directory, m_logger);
}
m_sender.send(new AgentProcessReportMessage(ProcessReport.STATE_STARTED, m_fileStore .getCacheHighWaterMark()));
final MessageDispatchSender fileStoreMessageDispatcher = new MessageDispatchSender();
m_fileStore.registerMessageHandlers(fileStoreMessageDispatcher);
final MessageDispatchSender messageDispatcher = new MessageDispatchSender();
m_consoleListener.registerMessageHandlers(messageDispatcher);
// Everything that the file store doesn't handle is tee'd to the
// worker processes and our message handlers.
fileStoreMessageDispatcher.addFallback(new TeeSender(messageDispatcher, new IgnoreShutdownSender(m_fanOutStreamSender)));
m_messagePump = new MessagePump(receiver, fileStoreMessageDispatcher, 1);
m_reportRunningTask = new TimerTask() {
public void run() {
try {
m_sender.send(new AgentProcessReportMessage(ProcessReport.STATE_RUNNING, m_fileStore .getCacheHighWaterMark()));
} catch (CommunicationException e) {
cancel();
m_logger.error("Error while pumping up the AgentProcessReportMessage", e.getMessage());
m_logger.debug("The error detail is ", e);
}
}
};
}
public void start() {
m_messagePump.start();
m_timer.schedule(m_reportRunningTask, GrinderConstants.AGENT_HEARTBEAT_DELAY, GrinderConstants.AGENT_HEARTBEAT_INTERVAL);
}
public Connector getConnector() {
return m_connector;
}
public void shutdown() {
m_reportRunningTask.cancel();
try {
m_sender.send(new AgentProcessReportMessage(ProcessReport.STATE_FINISHED, m_fileStore .getCacheHighWaterMark()));
m_logger.debug("Shut down message was sent");
} catch (CommunicationException e) {
NoOp.noOp();
} finally {
m_messagePump.shutdown();
}
}
}
/**
* Registers message handlers with a dispatcher.
*
* @param messageDispatcher The dispatcher.
*/
public void registerMessageHandlers(
MessageDispatchRegistry messageDispatcher) {
messageDispatcher.set(ClearCacheMessage.class, new AbstractHandler<ClearCacheMessage>() {
public void handle(ClearCacheMessage message) throws CommunicationException {
m_logger.info("Clearing file store");
try {
synchronized (m_incomingDirectory) {
// clean
m_incomingDirectory.deleteContents();
m_incremental = false;
}
} catch (Directory.DirectoryException e) {
m_logger.error(e.getMessage());
throw new CommunicationException(e.getMessage(), e);
}
}
});
messageDispatcher.set(DistributeFileMessage.class, new AbstractHandler<DistributeFileMessage>() {
public void handle(DistributeFileMessage message) throws CommunicationException {
try {
synchronized (m_incomingDirectory) {
m_incomingDirectory.create();
createReadmeFile();
final FileContents fileContents = message.getFileContents();
m_logger.info("Updating file store: {}", fileContents);
// get distrbute file
fileContents.create(m_incomingDirectory);
}
} catch (FileContents.FileContentsException e) {
m_logger.error(e.getMessage());
throw new CommunicationException(e.getMessage(), e);
} catch (Directory.DirectoryException e) {
m_logger.error(e.getMessage());
throw new CommunicationException(e.getMessage(), e);
}
}
});
messageDispatcher.set(DistributionCacheCheckpointMessage.class, new AbstractHandler<DistributionCacheCheckpointMessage>() {
public void handle(DistributionCacheCheckpointMessage message) {
m_cacheHighWaterMark = message.getCacheHighWaterMark();
}
});
}
修改jar中class
caohm@caohm-ThinkPad-E450:~/.m2/repository/net/sf/grinder/grinder-core/3.9.1$ unzip -l grinder-core-3.9.1.1.jar | grep "FileContents.class" | awk '{printf $4}' | xargs -I {} zip -d grinder-core-3.9.1.1.jar {}
caohm@caohm-ThinkPad-E450:~/.m2/repository/net/sf/grinder/grinder-core/3.9.1$ unzip -l grinder-core-3.9.1.1.jar | grep "FileDistributionHandlerImplementation.class" | awk '{printf $4}' | xargs -I {} zip -d grinder-core-3.9.1.1.jar {}
caohm@caohm-ThinkPad-E450:~/.m2/repository/net/sf/grinder/grinder-core/3.9.1$ unzip -l grinder-core-3.9.1.1.jar | grep "FileStore.class" | awk '{printf $4}' | xargs -I {} zip -d grinder-core-3.9.1.1.jar {}
caohm@caohm-ThinkPad-E450:~/.m2/repository/net/sf/grinder/grinder-core/3.9.1$ cp grinder-core-3.9.1.1.jar ~/github/caohm/ngrinder/lib/
cluster mode
######################################################################################
# clustering configuration.
# This is not the option applied on the fly. You need to reboot to apply this.
######################################################################################
# These should be very carefully set.
# You can refer http://www.cubrid.org/wiki_ngrinder/entry/controller-clustering-guide
# if you want to enable controller clustering. please enable below.
#cluster.enabled=false
# comma separated IP list of all clustered controller servers.
#cluster.members=192.168.1.1;192.168.2.2;192.168.3.3
# cluster communication port. This port should be same across the controllers if advanced cluster mode is enabled.
#cluster.port=40003
# Followings are options which should be set in ${NGRINDER_EX_HOME}!!
#
# Region setting for the current controller.
# When running cluster mode, the ${NGRINDER_HOME} should be shared via NFS by multiple controllers.
# Which means the controllers share same system.conf file and have same properties.
# However each controllers should looks different region info. To make it possible
# The following options should be defined in ${NGRINDER_EX_HOME}(by default it's .ngrinder_ex/system-ex.conf
# which is not shared via NFS, so that each node cluster looks different value.
#Console binding IP of this region. If not set, console will be bound to all available IPs.
#cluster.host=
# cluster communication port. This port should be different across the controllers if easy cluster mode is enabled.
#cluster.port=40003
#cluster.region=Beijing
# true if the current region should be hide
#cluster.hidden_region=false
# true if the current region's file distribution should be done in safe way.
#cluster.safe_dist=false