Package org.apache.storm.utils
Class ServerUtils
java.lang.Object
org.apache.storm.utils.ServerUtils
-
Field Summary
Fields
Modifier and Type  Field
static final boolean  IS_ON_WINDOWS
static final org.slf4j.Logger  LOG
static final int  SIGKILL
static final int  SIGTERM
-
Constructor Summary
Constructors
ServerUtils()
-
Method Summary
Modifier and Type  Method - Description
static boolean  areAllProcessesDead(Map<String, Object> conf, String user, String workerId, Set<Long> pids) - Find if all processes for the user on workerId are dead.
static boolean  canUserReadBlob(ReadableBlobMeta meta, String user, Map<String, Object> conf)
static String  containerFilePath(String dir)
static String  currentClasspath - Returns the value of java.class.path System property.
static int  execCommand(String... command)
static void  extractZipFile(ZipFile zipFile, File toDir, String prefix) - Extracts the given file to the given directory.
static void  forceKillProcess(String pid)
static ClientBlobStore  getClientBlobStoreForSupervisor
static int  getComponentParallelism(Map<String, Object> topoConf, Object component)
static Map<String,Integer>  getComponentParallelism(Map<String, Object> topoConf, StormTopology topology)
static long  getDiskUsage(File dir) - Takes an input dir or file and returns the disk usage on that local directory.
static double  getEstimatedTotalHeapMemoryRequiredByTopo(Map<String, Object> topoConf, StormTopology topology)
static int  getEstimatedWorkerCountForRasTopo(Map<String, Object> topoConf, StormTopology topology)
static String  getFileOwner(String path)
static long  getMemInfoFreeMb - Get system free memory in megabytes.
static BlobStore  getNimbusBlobStore(Map<String, Object> conf, String baseDir, NimbusInfo nimbusInfo, ILeaderElector leaderElector)
static BlobStore  getNimbusBlobStore(Map<String, Object> conf, NimbusInfo nimbusInfo, ILeaderElector leaderElector)
static int  getPathOwnerUid(String fpath) - Get the userId of the owner of the path by running "ls -dn path" command.
static URL  getResourceFromClassloader - Returns the current thread classloader.
static int  getUserId - Get the userId for a user name.
static <T> List<T>  interleaveAll(List<List<T>> nodeList)
static boolean  isAbsolutePath(String path)
static boolean  isAnyPosixProcessPidDirAlive(Collection<Long> pids, String user) - Find if the process is alive using the existence of /proc/<pid> directory owned by the supplied user.
static boolean  isAnyPosixProcessPidDirAlive(Collection<Long> pids, String expectedUser, boolean mockFileOwnerToUid) - Find if the process is alive using the existence of /proc/<pid> directory owned by the supplied expectedUser.
static boolean  isAnyProcessAlive(Collection<Long> pids, int uid) - Are any of the processes alive and running for the specified userId.
static boolean  isAnyProcessAlive(Collection<Long> pids, String user) - Are any of the processes alive and running for the specified user.
static boolean  isProcessAlive(long pid, String user) - Is a process alive and running?
static boolean  isRas - Check if the scheduler is resource aware or not.
static void  killProcessWithSigTerm
static long  nimbusVersionOfBlob(String key, ClientBlobStore cb)
static Subject  principalNameToSubject(String name)
static String  scriptFilePath(String dir)
static void  sendSignalToProcess(long lpid, int signum)
static ServerUtils  setInstance - Provide an instance of this class for delegates to use.
static String  shellCmd - Returns the combined string, escaped for posix shell.
static void  unJar - Unpack matching files from a jar.
static void  unpack
static void  unTar - Given a tar file as input, untars it into the untar directory passed as the second parameter.
static void  unZip - Given a file as input, unzips it into the unzip directory passed as the second parameter.
static void  validateTopologyAckerBundleResource(Map<String, Object> topoConf, StormTopology topology, String topoName) - RAS scheduler will try to distribute ackers evenly over workers by adding some ackers to each newly launched worker.
static void  validateTopologyWorkerMaxHeapSizeConfigs(Map<String, Object> stormConf, StormTopology topology, double defaultWorkerMaxHeapSizeMb)
static String  writeScript(String dir, List<String> command, Map<String, String> environment) - Writes a posix shell script file to be executed in its own process.
static String  writeScript(String dir, List<String> command, Map<String, String> environment, String umask) - Writes a posix shell script file to be executed in its own process.
static boolean  zipDoesContainDir(String zipfile, String target) - Determines if a zip archive contains a particular directory.
static long  zipFileSize(File myFile) - Given a zip file as input, returns its size. Only works for zip files whose uncompressed size is less than 4 GB; otherwise returns the size modulo 2^32, per gzip specifications.
-
Field Details
-
LOG
public static final org.slf4j.Logger LOG
-
IS_ON_WINDOWS
public static final boolean IS_ON_WINDOWS
-
SIGKILL
public static final int SIGKILL
-
SIGTERM
public static final int SIGTERM
-
-
Constructor Details
-
ServerUtils
public ServerUtils()
-
-
Method Details
-
setInstance
Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that overrides the implementation of the delegated method.
- Parameters:
u - a ServerUtils instance
- Returns:
- the previously set instance
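The delegation pattern described above can be illustrated with a minimal, self-contained sketch. The class DelegateSketch is hypothetical and is not Storm's ServerUtils; it only mirrors the shape suggested by this page, where a static method such as currentClasspath forwards to an overridable method such as currentClasspathImpl on the currently set instance.

```java
// Hypothetical stand-in for the delegate pattern; not Storm's ServerUtils.
public class DelegateSketch {
    // The instance that the static entry points delegate to.
    private static DelegateSketch instance = new DelegateSketch();

    // Swap the delegate and return the old one so a test can restore it.
    public static DelegateSketch setInstance(DelegateSketch u) {
        DelegateSketch old = instance;
        instance = u;
        return old;
    }

    // Static entry point: forwards to the overridable instance method.
    public static String currentClasspath() {
        return instance.currentClasspathImpl();
    }

    // Overridable implementation; a subclass can replace it in tests.
    public String currentClasspathImpl() {
        return System.getProperty("java.class.path");
    }

    public static void main(String[] args) {
        // Install a subclass that overrides only the delegated method.
        DelegateSketch previous = setInstance(new DelegateSketch() {
            @Override
            public String currentClasspathImpl() {
                return "/mocked/classpath";
            }
        });
        try {
            System.out.println(currentClasspath());
        } finally {
            setInstance(previous); // always restore the original delegate
        }
    }
}
```

Returning the previously set instance is what makes the try/finally restore possible, so one test's mock does not leak into the next.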
-
interleaveAll
-
getNimbusBlobStore
public static BlobStore getNimbusBlobStore(Map<String, Object> conf, NimbusInfo nimbusInfo, ILeaderElector leaderElector)
-
getNimbusBlobStore
public static BlobStore getNimbusBlobStore(Map<String, Object> conf, String baseDir, NimbusInfo nimbusInfo, ILeaderElector leaderElector)
-
isAbsolutePath
-
shellCmd
Returns the combined string, escaped for posix shell.
- Parameters:
command - the list of strings to be combined
- Returns:
- the resulting command string
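As an illustration of what "escaped for posix shell" can mean, here is a minimal sketch of one common quoting technique: wrap each argument in single quotes and rewrite any embedded single quote as '\''. The class ShellQuoteSketch and its methods are hypothetical, and this is not necessarily the exact escaping that shellCmd applies.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of POSIX single-quote escaping; not Storm's implementation.
public class ShellQuoteSketch {
    // Wrap one argument in single quotes. Inside single quotes the shell
    // treats everything literally, so only the quote itself needs care:
    // close the quote, emit an escaped quote, and reopen the quote.
    static String quote(String arg) {
        return "'" + arg.replace("'", "'\\''") + "'";
    }

    // Quote every argument and join them with spaces into one command string.
    static String shellCmd(List<String> command) {
        return command.stream()
                .map(ShellQuoteSketch::quote)
                .collect(Collectors.joining(" "));
    }

    public static void main(String[] args) {
        System.out.println(shellCmd(List.of("echo", "it's here", "$HOME")));
    }
}
```

Quoting every argument, even ones that look safe, keeps the rule simple and prevents the shell from expanding characters such as $ or *.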
-
getDiskUsage
Takes an input dir or file and returns the disk usage on that local directory. Very basic implementation.
- Parameters:
dir - the local directory or file whose disk usage is measured
- Returns:
- the total disk usage of the input local directory or file
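A basic disk-usage computation can be sketched as follows. This sketch assumes that "disk usage" means the sum of the lengths of all regular files under the path; the class DiskUsageSketch is hypothetical, and the real method may count things differently (for example, allocated blocks rather than file lengths).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical sketch: sums file lengths under a path; not Storm's implementation.
public class DiskUsageSketch {
    static long diskUsage(Path root) throws IOException {
        // Files.walk visits the root itself too, so a single file also works.
        try (Stream<Path> paths = Files.walk(root)) {
            return paths.filter(Files::isRegularFile)
                    .mapToLong(p -> {
                        try {
                            return Files.size(p);
                        } catch (IOException e) {
                            // Lambdas cannot throw checked exceptions.
                            throw new UncheckedIOException(e);
                        }
                    })
                    .sum();
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("du-sketch");
        Files.write(dir.resolve("a.bin"), new byte[3]);
        Files.write(dir.resolve("b.bin"), new byte[5]);
        System.out.println(diskUsage(dir)); // prints 8
    }
}
```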
-
getClientBlobStoreForSupervisor
-
currentClasspath
Returns the value of java.class.path System property. Kept separate for testing.
- Returns:
- the classpath
-
getResourceFromClassloader
Returns the current thread classloader.
-
zipDoesContainDir
Determines if a zip archive contains a particular directory.
- Parameters:
zipfile - path to the zipped file
target - directory being looked for in the zip
- Returns:
- true if the directory exists in the zip, else false
- Throws:
IOException
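One way to perform such a check with the JDK alone is to scan the archive's entry names for the directory prefix. The sketch below is an illustration under that assumption; the class ZipDirSketch is hypothetical, and the real method may match entries differently.

```java
import java.io.IOException;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

// Hypothetical sketch: detects a directory by entry-name prefix; not Storm's code.
public class ZipDirSketch {
    static boolean zipContainsDir(String zipPath, String target) throws IOException {
        // Zip entry names use '/' separators; a trailing slash avoids
        // matching "resources2/..." when looking for "resources".
        String prefix = target.endsWith("/") ? target : target + "/";
        try (ZipFile zip = new ZipFile(zipPath)) {
            Enumeration<? extends ZipEntry> entries = zip.entries();
            while (entries.hasMoreElements()) {
                if (entries.nextElement().getName().startsWith(prefix)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java ZipDirSketch <zipfile> <directory>
        System.out.println(zipContainsDir(args[0], args[1]));
    }
}
```

Matching on the prefix rather than on an explicit directory entry also covers archives that store files without separate directory entries.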
-
getFileOwner
- Throws:
IOException
-
containerFilePath
-
scriptFilePath
-
writeScript
public static String writeScript(String dir, List<String> command, Map<String, String> environment) throws IOException
Writes a posix shell script file to be executed in its own process.
- Parameters:
dir - the directory under which the script is to be written
command - the command the script is to execute
environment - optional environment variables to set before running the script's command. May be null.
- Returns:
- the path to the script that has been written
- Throws:
IOException
-
writeScript
public static String writeScript(String dir, List<String> command, Map<String, String> environment, String umask) throws IOException
Writes a posix shell script file to be executed in its own process.
- Parameters:
dir - the directory under which the script is to be written
command - the command the script is to execute
environment - optional environment variables to set before running the script's command. May be null.
umask - umask to be set. May be null.
- Returns:
- the path to the script that has been written
- Throws:
IOException
-
execCommand
public static int execCommand(String... command) throws org.apache.commons.exec.ExecuteException, IOException
- Throws:
org.apache.commons.exec.ExecuteException
IOException
-
sendSignalToProcess
- Throws:
IOException
-
killProcessWithSigTerm
- Throws:
IOException
-
forceKillProcess
- Throws:
IOException
-
nimbusVersionOfBlob
public static long nimbusVersionOfBlob(String key, ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException
-
canUserReadBlob
-
unJar
Unpack matching files from a jar. Entries inside the jar that do not match the given pattern will be skipped.
- Parameters:
jarFile - the .jar file to unpack
toDir - the destination directory into which to unpack the jar
- Throws:
IOException
-
unTar
Given a tar file as input, untars it into the untar directory passed as the second parameter. This utility will untar ".tar" files and ".tar.gz", "tgz" files.
- Parameters:
inFile - the tar file as input
untarDir - the directory into which to untar the tar file
symlinksDisabled - true if symlinks should be disabled, else false
- Throws:
IOException
-
unpack
- Throws:
IOException
-
extractZipFile
Extracts the given file to the given directory. Only zip entries starting with the given prefix are extracted. The prefix is stripped off entry names before extraction.
- Parameters:
zipFile - the zip file to extract
toDir - the directory to extract to
prefix - the prefix to look for in the zip file. If not null, only paths starting with the prefix will be extracted.
- Throws:
IOException
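The prefix filtering and stripping described above can be sketched with the JDK's java.util.zip classes. The class ZipPrefixExtractSketch is hypothetical and is not Storm's implementation; the check that rejects entries resolving outside the target directory is an addition of this sketch, not something this page documents.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

// Hypothetical sketch of prefix-filtered extraction; not Storm's implementation.
public class ZipPrefixExtractSketch {
    static void extract(ZipFile zip, File toDir, String prefix) throws IOException {
        Path root = toDir.toPath().toAbsolutePath().normalize();
        Enumeration<? extends ZipEntry> entries = zip.entries();
        while (entries.hasMoreElements()) {
            ZipEntry entry = entries.nextElement();
            if (entry.isDirectory()) {
                continue; // parent directories are created on demand below
            }
            String name = entry.getName();
            if (prefix != null) {
                if (!name.startsWith(prefix)) {
                    continue; // skip entries outside the requested prefix
                }
                name = name.substring(prefix.length()); // strip the prefix
            }
            if (name.isEmpty()) {
                continue;
            }
            Path dest = root.resolve(name).normalize();
            // Assumption of this sketch: refuse entries that would land outside toDir.
            if (!dest.startsWith(root)) {
                throw new IOException("Entry escapes target dir: " + entry.getName());
            }
            Files.createDirectories(dest.getParent());
            try (InputStream in = zip.getInputStream(entry)) {
                Files.copy(in, dest, StandardCopyOption.REPLACE_EXISTING);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java ZipPrefixExtractSketch <zipfile> <toDir> <prefix>
        try (ZipFile zip = new ZipFile(args[0])) {
            extract(zip, new File(args[1]), args[2]);
        }
    }
}
```

In this sketch the prefix should end with "/" (for example "resources/"), so that the stripped entry names are relative paths.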
-
unZip
Given a file as input, unzips it into the unzip directory passed as the second parameter.
- Parameters:
inFile - the zip file as input
toDir - the directory into which to unzip the zip file
- Throws:
IOException
-
zipFileSize
Given a zip file as input, returns its size. Only works for zip files whose uncompressed size is less than 4 GB; otherwise returns the size modulo 2^32, per gzip specifications.
- Parameters:
myFile - the zip file as input
- Returns:
- zip file size as a long
- Throws:
IOException
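The 2^32 limit follows from the gzip format, which stores the uncompressed size modulo 2^32 as a little-endian 32-bit value (ISIZE) in the last four bytes of the stream. The sketch below reads that trailer. It assumes the input is a single-member gzip stream; the class GzipSizeSketch is hypothetical and is not Storm's implementation.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

// Hypothetical sketch: reads the gzip ISIZE trailer; not Storm's implementation.
public class GzipSizeSketch {
    static long uncompressedSize(File gzFile) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(gzFile, "r")) {
            if (raf.length() < 4) {
                throw new IOException("File too short to hold a gzip trailer");
            }
            // ISIZE occupies the final four bytes, least significant byte first.
            raf.seek(raf.length() - 4);
            long b0 = raf.read();
            long b1 = raf.read();
            long b2 = raf.read();
            long b3 = raf.read();
            // Combine as an unsigned 32-bit value held in a long.
            return (b3 << 24) | (b2 << 16) | (b1 << 8) | b0;
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java GzipSizeSketch <file.gz>
        System.out.println(uncompressedSize(new File(args[0])));
    }
}
```

Because only 32 bits are stored, a 5 GB payload would be reported as 5 GB minus 4 GB, which is the wrap-around the description warns about.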
-
isRas
Check if the scheduler is resource aware or not.
- Parameters:
conf - the configuration
- Returns:
- True if it's resource aware; false otherwise
-
getEstimatedWorkerCountForRasTopo
public static int getEstimatedWorkerCountForRasTopo(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException
- Throws:
InvalidTopologyException
-
getEstimatedTotalHeapMemoryRequiredByTopo
public static double getEstimatedTotalHeapMemoryRequiredByTopo(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException
- Throws:
InvalidTopologyException
-
getComponentParallelism
public static Map<String,Integer> getComponentParallelism(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException
- Throws:
InvalidTopologyException
-
getComponentParallelism
public static int getComponentParallelism(Map<String, Object> topoConf, Object component) throws InvalidTopologyException
- Throws:
InvalidTopologyException
-
principalNameToSubject
-
currentClasspathImpl
-
getResourceFromClassloaderImpl
-
getMemInfoFreeMb
Get system free memory in megabytes.
- Returns:
- system free memory in megabytes
- Throws:
IOException- on I/O exception
-
isProcessAlive
Is a process alive and running?
- Parameters:
pid - the PID of the running process
user - the user that is expected to own that process
- Returns:
- true if it is, else false
- Throws:
IOException- on any error
-
isAnyProcessAlive
Are any of the processes alive and running for the specified user. If the collection is empty or null, then the return value is trivially false.
- Parameters:
pids - the PIDs of the running processes
user - the user that is expected to own those processes
- Returns:
- true if any one of the processes is owned by user and alive, else false
- Throws:
IOException- on I/O exception
-
isAnyProcessAlive
Are any of the processes alive and running for the specified userId. If the collection is empty or null, then the return value is trivially false.
- Parameters:
pids - the PIDs of the running processes
uid - the userId that is expected to own those processes
- Returns:
- true if any one of the processes is owned by user and alive, else false
- Throws:
IOException- on I/O exception
-
getUserId
Get the userId for a user name. This works on Posix systems by using the "id -u" command. Throws IllegalArgumentException on Windows.
- Parameters:
user - username to be converted to UID. This is optional; if it is not supplied, the UID of the current user is returned.
- Returns:
- UID for the specified user (if supplied), else UID of current user, -1 upon Exception.
-
getPathOwnerUid
Get the userId of the owner of the path by running the "ls -dn path" command. This command works on Posix systems only.
- Parameters:
fpath - full path to the file or directory
- Returns:
- UID for the specified path if successful, -1 upon failure
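The "ls -dn path" approach relies on the long listing format, where -n prints numeric owner and group IDs and the owner UID is the third whitespace-separated field. The following is a minimal sketch of running the command and parsing that field. The class LsOwnerSketch and its helper parseUid are hypothetical, not Storm's code; the -1 failure value follows the description above.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

// Hypothetical sketch of parsing "ls -dn" output; not Storm's implementation.
public class LsOwnerSketch {
    // Example line: "drwxr-xr-x 2 1000 1000 4096 Jan  1 00:00 /tmp/x"
    // Fields: mode, link count, numeric UID, numeric GID, size, date, name.
    static int parseUid(String lsLine) {
        String[] fields = lsLine.trim().split("\\s+");
        if (fields.length < 3) {
            return -1;
        }
        try {
            return Integer.parseInt(fields[2]);
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    // Runs "ls -dn <path>" (Posix only) and returns the owner UID, or -1 on failure.
    static int pathOwnerUid(String path) {
        try {
            Process p = new ProcessBuilder("ls", "-dn", path)
                    .redirectErrorStream(true)
                    .start();
            String firstLine;
            try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
                firstLine = r.readLine();
            }
            if (p.waitFor() != 0 || firstLine == null) {
                return -1;
            }
            return parseUid(firstLine);
        } catch (IOException e) {
            return -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(pathOwnerUid(args.length > 0 ? args[0] : "."));
    }
}
```

The -d flag matters for directories: without it, ls would list the directory's contents instead of the directory entry itself.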
-
areAllProcessesDead
public static boolean areAllProcessesDead(Map<String, Object> conf, String user, String workerId, Set<Long> pids) throws IOException
Find if all processes for the user on workerId are dead. This method attempts to optimize the calls by:
- checking a collection of ProcessIds at once
- using userId on Posix systems instead of user
- Returns:
- true if all processes for the user are dead on the worker
- Throws:
IOException- if external commands have exception.
-
isAnyPosixProcessPidDirAlive
public static boolean isAnyPosixProcessPidDirAlive(Collection<Long> pids, String user) throws IOException
Find if the process is alive using the existence of /proc/<pid> directory owned by the supplied user. This is an alternative to the "ps -p pid -u uid" command used in isAnyPosixProcessAlive(Collection, int). Processes are tracked using the existence of the directory "/proc/<pid>". For each of the supplied PIDs, its PID directory is checked for existence and ownership by the specified uid.
- Parameters:
pids - Process IDs that need to be monitored for liveness
user - the userId that is expected to own that process
- Returns:
- true if any one of the processes is owned by user and alive, else false
- Throws:
IOException- on I/O exception
-
isAnyPosixProcessPidDirAlive
public static boolean isAnyPosixProcessPidDirAlive(Collection<Long> pids, String expectedUser, boolean mockFileOwnerToUid) throws IOException
Find if the process is alive using the existence of /proc/<pid> directory owned by the supplied expectedUser. This is an alternative to the "ps -p pid -u uid" command used in isAnyPosixProcessAlive(Collection, int). Processes are tracked using the existence of the directory "/proc/<pid>". For each of the supplied PIDs, its PID directory is checked for existence and ownership by the specified uid.
- Parameters:
pids - Process IDs that need to be monitored for liveness
expectedUser - the userId that is expected to own that process
mockFileOwnerToUid - if true (used for testing), then convert File.owner to UID
- Returns:
- true if any one of the processes is owned by expectedUser and alive, else false
- Throws:
IOException- on I/O exception
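The /proc/<pid> check can be sketched with java.nio.file. This is an illustration only: the class ProcDirSketch is hypothetical, it compares the owner's name rather than a numeric uid, and the procRoot parameter is an assumption added here so the logic can be exercised against a temporary directory instead of the real /proc.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch of a /proc/<pid> liveness check; not Storm's implementation.
public class ProcDirSketch {
    static boolean isAnyPidDirOwnedBy(Path procRoot, Collection<Long> pids, String user)
            throws IOException {
        if (pids == null || pids.isEmpty()) {
            return false; // trivially false, as for the other liveness checks
        }
        for (long pid : pids) {
            Path pidDir = procRoot.resolve(Long.toString(pid));
            try {
                // Alive means: the directory exists and belongs to the expected user.
                if (Files.isDirectory(pidDir)
                        && Files.getOwner(pidDir).getName().equals(user)) {
                    return true;
                }
            } catch (NoSuchFileException e) {
                // The process exited between the two checks; treat it as dead.
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        // On Linux, checks whether this JVM's own /proc entry belongs to the current user.
        long self = ProcessHandle.current().pid();
        System.out.println(isAnyPidDirOwnedBy(
                Paths.get("/proc"), List.of(self), System.getProperty("user.name")));
    }
}
```

Checking ownership as well as existence guards against PID reuse by a process that belongs to a different user.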
-
validateTopologyWorkerMaxHeapSizeConfigs
public static void validateTopologyWorkerMaxHeapSizeConfigs(Map<String, Object> stormConf, StormTopology topology, double defaultWorkerMaxHeapSizeMb) throws InvalidTopologyException
- Throws:
InvalidTopologyException
-
validateTopologyAckerBundleResource
public static void validateTopologyAckerBundleResource(Map<String, Object> topoConf, StormTopology topology, String topoName) throws InvalidTopologyException
The RAS scheduler tries to distribute ackers evenly over workers by adding some ackers to each newly launched worker. The validation performed here is: (Config.TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER * memory for an acker + memory for the biggest topo executor) < max worker heap memory. When RAS schedules an executor to a new worker, it puts Config.TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER ackers into the worker first, so Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB needs to be able to accommodate this.
- Parameters:
topoConf - Topology conf
topology - Topology (not system topology)
topoName - The name of the topology
- Throws:
InvalidTopologyException
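The inequality above can be worked through with concrete numbers. The sketch below is only an arithmetic illustration: the class AckerHeapCheckSketch, its parameter names, and the memory values are hypothetical, and the real method reads these quantities from the topology configuration and signals failure with InvalidTopologyException.

```java
// Hypothetical worked example of the acker bundle inequality; not Storm's code.
public class AckerHeapCheckSketch {
    // True when a new worker can hold its bundled ackers plus the biggest executor:
    // ackersPerWorker * ackerMemMb + biggestExecutorMemMb < maxWorkerHeapMb
    static boolean fitsInWorker(int ackersPerWorker, double ackerMemMb,
                                double biggestExecutorMemMb, double maxWorkerHeapMb) {
        return ackersPerWorker * ackerMemMb + biggestExecutorMemMb < maxWorkerHeapMb;
    }

    public static void main(String[] args) {
        // 1 * 128 + 512 = 640 < 768, so the worker can accommodate the bundle.
        System.out.println(fitsInWorker(1, 128.0, 512.0, 768.0)); // prints true
        // 2 * 128 + 512 = 768, which is not strictly less than 768.
        System.out.println(fitsInWorker(2, 128.0, 512.0, 768.0)); // prints false
    }
}
```

The comparison is strict, so a configuration that exactly fills the worker heap is rejected in this sketch, matching the "<" in the description.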
-