Package ch.cern.dirq
Class QueueSimple
java.lang.Object
ch.cern.dirq.QueueSimple
QueueSimple - object-oriented interface to a simple directory-based queue.
A port of the Perl module Directory::Queue::Simple (http://search.cpan.org/dist/Directory-Queue/).
The documentation of the Directory::Queue::Simple module has been adapted for Java.
Usage
// sample producer
QueueSimple dirq = new QueueSimple("/tmp/test");
for (int i = 0; i < 100; i++) {
    String name = dirq.add("element " + i);
    System.out.println("# added element " + i + " as " + name);
}
// sample consumer
dirq = new QueueSimple("/tmp/test");
for (String name : dirq) {
    // skip elements that could not be locked
    if (!dirq.lock(name)) {
        continue;
    }
    System.out.println("# reading element " + name);
    String data = dirq.get(name);
    // one could use dirq.unlock(name) to only browse the queue...
    dirq.remove(name);
}
Description
This module is very similar to the normal directory queue, but stores data in the filesystem differently, using fewer directories. Its API is almost identical.
Compared to the normal directory queue, this module:
- is simpler
- is faster
- uses less space on disk
- can be given existing files to store
- does not support schemas
- can only store and retrieve byte strings
- is not compatible (at filesystem level) with the normal directory queue
Directory Structure
The toplevel directory contains intermediate directories that contain the stored elements, each of them in a file.
The names of the intermediate directories are time based: the element insertion time is used to create an 8-digit hexadecimal number. The granularity (see setGranularity()) is used to limit the number of new directories. For instance, with a granularity of 60 (the default), new directories will be created at most once per minute.
Since there is usually a filesystem limit on the number of directories a directory can hold, there is a trade-off to be made. If you want to support many added elements per second, you should use a low granularity to keep the directories small. However, in this case, you will create many directories and this will limit the total number of elements you can store.
The elements themselves are stored in files (one per element) with a 14-digit hexadecimal name SSSSSSSSMMMMMR where:
- SSSSSSSS represents the number of seconds since the Epoch
- MMMMM represents the microsecond part of the time since the Epoch
- R is a random hexadecimal digit used to reduce name collisions
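The naming scheme above can be sketched as follows. This is a minimal illustration, not the library's code: the class name NameSketch and both method signatures are hypothetical, and rounding the insertion time down to a multiple of the granularity is an assumption about how the granularity limits directory creation.

```java
import java.time.Instant;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of the naming scheme; not the QueueSimple implementation.
class NameSketch {
    // Intermediate directory name: insertion time (seconds), rounded down to a
    // multiple of the granularity (assumption), as 8 hexadecimal digits.
    static String directoryName(long epochSeconds, int granularity) {
        long rounded = granularity > 0
                ? epochSeconds - (epochSeconds % granularity)
                : epochSeconds;
        return String.format("%08x", rounded);
    }

    // Element file name: SSSSSSSS (seconds) + MMMMM (microseconds) + R (one hex digit).
    static String elementName(long epochSeconds, int micros, int rndHex) {
        return String.format("%08x%05x%01x", epochSeconds, micros, rndHex);
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        long seconds = now.getEpochSecond();
        int micros = now.getNano() / 1000;                     // 0..999999
        int rnd = ThreadLocalRandom.current().nextInt(16);     // 0..15
        System.out.println(directoryName(seconds, 60) + "/" + elementName(seconds, micros, rnd));
    }
}
```

With a granularity of 60, every element inserted within the same minute maps to the same directory name under this assumption, which matches the "at most once per minute" behaviour described above.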
A temporary element (being added to the queue) will have a .tmp suffix.
A locked element will have a hard link with the same name and the .lck suffix.
Please refer to Queue for general information about directory queues.
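To illustrate why a hard link with the .lck suffix can serve as a lock, here is a minimal sketch using java.nio. The class HardLinkLockSketch and its methods are hypothetical and are not the QueueSimple implementation; the sketch assumes a filesystem that supports hard links and relies on link creation failing when the link name already exists.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

// Hypothetical sketch of hard-link based locking; not the QueueSimple implementation.
class HardLinkLockSketch {
    static final String LOCKED_SUFFIX = ".lck";

    // Path of the lock link that sits next to the element file.
    static Path lockPath(Path element) {
        return element.resolveSibling(element.getFileName().toString() + LOCKED_SUFFIX);
    }

    // Try to lock: creating the link fails if another holder created it first.
    static boolean tryLock(Path element) throws IOException {
        try {
            Files.createLink(lockPath(element), element);
            return true;
        } catch (FileAlreadyExistsException e) {
            return false;
        }
    }

    // Unlock: remove the link; returns false if the element was not locked.
    static boolean unlock(Path element) throws IOException {
        return Files.deleteIfExists(lockPath(element));
    }

    // Lock twice, unlock, lock again, on a throwaway element in a temp directory.
    static boolean[] demo() {
        try {
            Path dir = Files.createTempDirectory("dirq-sketch");
            Path element = Files.createFile(dir.resolve("5f000000f423fa"));
            boolean first = tryLock(element);
            boolean second = tryLock(element);
            boolean released = unlock(element);
            boolean third = tryLock(element);
            return new boolean[] {first, second, released, third};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

The second attempt is refused because the link already exists, so only one holder can own the lock at a time without any extra coordination.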
Nested Class Summary
Nested Classes (Modifier and Type: Description)
    private static class: FileFilter class to iterate over temporary or locked elements.
    private static class: FileFilter class to iterate over (normal) elements.
    private static class: FileFilter class to iterate over intermediate directories.
    private static class: Iterator for the simple directory queue (private).
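Field Summary
Fields (Modifier and Type, Field)
    private static final int DEFAULT_GRANULARITY
    private static final int DEFAULT_MAXLOCK
    private static final int DEFAULT_MAXTEMP
    static final Pattern DIRECTORY_REGEXP
    private Set<PosixFilePermission> directoryPermissions
    private static final FileFilter DOT_ELEMENT_FF
    private static final FileFilter ELEMENT_FF
    static final Pattern ELEMENT_REGEXP
    private Set<PosixFilePermission> filePermissions
    private int granularity
    private static final FileFilter INTERMEDIATE_DIRECTORY_FF
    static final String LOCKED_SUFFIX
    private static final org.slf4j.Logger logger
    private static final int MAX_DIRECTORY_UMASK
    private static final int MAX_FILE_UMASK
    private static final long MAX_MICRO
    private static final int MAX_RNDHEX
    private static final int MAX_UMASK
    private static final long NANO2MICRO
    private int qMaxLock
    private int qMaxTemp
    private String queueId
    private String queuePath
    private static Random rand
    private int rndHex
    private static final long SECOND
    static final String TEMPORARY_SUFFIX
    private int umask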
Constructor Summary
Constructors (Constructor: Description)
    QueueSimple(String path): Constructor creating a simple directory queue from the given path.
    QueueSimple(String path, int numask): Constructor creating a simple directory queue from the given path and umask.
Method Summary
Methods (Modifier and Type, Method: Description)
    add(byte[] data): Add byte array data to the queue.
    add: Add String data to the queue.
    private Path addDataHelper(String dir, byte[] data)
    private Path addDataHelper(String dir, String data)
    addPath: Add the given file (identified by its path) to the queue and return the corresponding element name; the file must be on the same filesystem and will be moved to the queue.
    private String addPathHelper(Path tmp, String dir)
    int count(): Return the number of elements in the queue.
    private Path createFile(String path)
    private String directoryName
    private static Set<PosixFilePermission> directoryPerms(int numask)
    private static String elementName(int rnd)
    private void ensureDirectory(Path path)
    private static Set<PosixFilePermission> filePerms(int numask)
    get: Get the given locked element as String data.
    byte[] getAsByteArray(String name): Get the given locked element as byte array data.
    int getGranularity(): Get the granularity.
    getId(): Return a unique identifier for the queue.
    int getMaxLock(): Get the default maxLock for purge().
    int getMaxTemp(): Get the default maxTemp for purge().
    private Path getNewPath(String dir)
    getPath: Get the path of the given locked element.
    getQueuePath: Return the path of the queue.
    int getRndHex(): Get the random hexadecimal digit.
    int getUmask(): Get the umask.
    iterator(): Iterator for the simple directory queue.
    boolean lock: Lock an element in permissive mode.
    boolean lock: Lock an element.
    void purge(): Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka stale locks); note: this can take a long time on queues with many elements.
    void purge(int maxLock): Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka stale locks); note: this can take a long time on queues with many elements.
    void purge(int maxLock, int maxTemp): Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka stale locks); note: this can take a long time on queues with many elements.
    void remove: Remove a locked element from the queue.
    setGranularity(int value): Set the granularity.
    setMaxLock(int value): Set the default maxLock for purge().
    setMaxTemp(int value): Set the default maxTemp for purge().
    setRndHex(int value): Set the random hexadecimal digit.
    setUmask(int value): Set the umask.
    private boolean touchFile
    boolean unlock: Unlock an element in non-permissive mode.
    boolean unlock: Unlock an element.
Methods inherited from class java.lang.Object
    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface java.lang.Iterable
    forEach, spliterator
Field Details
logger
    private static final org.slf4j.Logger logger
TEMPORARY_SUFFIX
LOCKED_SUFFIX
DIRECTORY_REGEXP
ELEMENT_REGEXP
DEFAULT_GRANULARITY
    private static final int DEFAULT_GRANULARITY
DEFAULT_MAXLOCK
    private static final int DEFAULT_MAXLOCK
DEFAULT_MAXTEMP
    private static final int DEFAULT_MAXTEMP
MAX_RNDHEX
    private static final int MAX_RNDHEX
MAX_UMASK
    private static final int MAX_UMASK
MAX_DIRECTORY_UMASK
    private static final int MAX_DIRECTORY_UMASK
MAX_FILE_UMASK
    private static final int MAX_FILE_UMASK
SECOND
    private static final long SECOND
NANO2MICRO
    private static final long NANO2MICRO
MAX_MICRO
    private static final long MAX_MICRO
INTERMEDIATE_DIRECTORY_FF
ELEMENT_FF
DOT_ELEMENT_FF
rand
granularity
    private int granularity
qMaxLock
    private int qMaxLock
qMaxTemp
    private int qMaxTemp
rndHex
    private int rndHex
umask
    private int umask
queueId
queuePath
directoryPermissions
filePermissions
Constructor Details
QueueSimple
    Constructor creating a simple directory queue from the given path.
    Parameters:
        path - path of the directory queue
    Throws: IOException - if any file operation fails
QueueSimple
    Constructor creating a simple directory queue from the given path and umask.
    Parameters:
        path - path of the directory queue
        numask - numerical umask of the directory queue
    Throws: IOException - if any file operation fails
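As an illustration of how a numerical umask can be turned into the Set<PosixFilePermission> values that directoryPerms(int numask) and filePerms(int numask) return, here is a minimal sketch. The base modes 0777 for directories and 0666 for files follow the usual POSIX convention and are an assumption here; the class UmaskSketch and the helper fromMode are hypothetical.

```java
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch of umask handling; not the QueueSimple implementation.
class UmaskSketch {
    // Convert a numerical mode (for example 0755) into a permission set.
    // PosixFilePermission.values() is ordered from OWNER_READ (bit 8)
    // down to OTHERS_EXECUTE (bit 0).
    static Set<PosixFilePermission> fromMode(int mode) {
        Set<PosixFilePermission> perms = EnumSet.noneOf(PosixFilePermission.class);
        PosixFilePermission[] all = PosixFilePermission.values();
        for (int i = 0; i < all.length; i++) {
            if ((mode & (1 << (all.length - 1 - i))) != 0) {
                perms.add(all[i]);
            }
        }
        return perms;
    }

    // Assumed convention: directories start from 0777, files from 0666.
    static Set<PosixFilePermission> directoryPerms(int numask) {
        return fromMode(0777 & ~numask);
    }

    static Set<PosixFilePermission> filePerms(int numask) {
        return fromMode(0666 & ~numask);
    }

    public static void main(String[] args) {
        System.out.println(PosixFilePermissions.toString(directoryPerms(022)));
        System.out.println(PosixFilePermissions.toString(filePerms(022)));
    }
}
```

Note that the umask literals are written in octal (022), since that is how umasks are normally expressed.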
Method Details
getQueuePath
    Description copied from interface: Queue
    Return the path of the queue.
    Specified by: getQueuePath in interface Queue
    Returns: queue path
getId
    Description copied from interface: Queue
    Return a unique identifier for the queue.
add
    Description copied from interface: Queue
    Add String data to the queue.
    Specified by: add in interface Queue
    Parameters:
        data - data to be added
    Returns: element name (as directory_name/file_name)
    Throws: IOException - if any file operation fails
add
    Description copied from interface: Queue
    Add byte array data to the queue.
    Specified by: add in interface Queue
    Parameters:
        data - data to be added
    Returns: element name (as directory_name/file_name)
    Throws: IOException - if any file operation fails
addPath
    Description copied from interface: Queue
    Add the given file (identified by its path) to the queue and return the corresponding element name. The file must be on the same filesystem and will be moved to the queue.
    Specified by: addPath in interface Queue
    Parameters:
        path - path of the file to be added
    Returns: element name (as directory_name/file_name)
    Throws: IOException - if any file operation fails
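The same-filesystem requirement of addPath is consistent with moving the file by an atomic rename, which only works within one filesystem. The sketch below shows such a move with java.nio; it is an assumption about the mechanism, not the library's code, and the class AtomicMoveSketch, the method moveInto and the file names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;

// Hypothetical sketch of moving an existing file into a queue directory.
class AtomicMoveSketch {
    // Move the source file into the target directory under the given name.
    // ATOMIC_MOVE fails with an exception instead of silently copying when
    // source and target are on different filesystems.
    static Path moveInto(Path source, Path targetDir, String elementName) throws IOException {
        Files.createDirectories(targetDir);
        return Files.move(source, targetDir.resolve(elementName), StandardCopyOption.ATOMIC_MOVE);
    }

    // Create a file, move it, and report whether the source is gone and what the target holds.
    static String[] demo() {
        try {
            Path root = Files.createTempDirectory("dirq-move");
            Path source = root.resolve("incoming.dat");
            Files.write(source, "hello".getBytes(StandardCharsets.UTF_8));
            Path target = moveInto(source, root.resolve("3b9ac9d8"), "5f000000f423fa");
            String content = new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
            return new String[] {String.valueOf(Files.exists(source)), content};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

After the move the original path no longer exists, which is why callers should treat the file as owned by the queue once it has been added.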
get
    Description copied from interface: Queue
    Get the given locked element as String data.
    Specified by: get in interface Queue
    Parameters:
        name - name of the element to be retrieved
    Returns: data associated with the given element
    Throws: IOException - if any file operation fails
getAsByteArray
    Description copied from interface: Queue
    Get the given locked element as byte array data.
    Specified by: getAsByteArray in interface Queue
    Parameters:
        name - name of the element to be retrieved
    Returns: data associated with the given element
    Throws: IOException - if any file operation fails
getPath
    Description copied from interface: Queue
    Get the path of the given locked element.
    The file at this path can be read but not removed; use the remove() method for that purpose.
lock
    Description copied from interface: Queue
    Lock an element in permissive mode.
    Specified by: lock in interface Queue
    Parameters:
        name - name of the element to be locked
    Returns: true on success, false if the element could not be locked
    Throws: IOException - if any file operation fails
lock
    Description copied from interface: Queue
    Lock an element.
    Specified by: lock in interface Queue
    Parameters:
        name - name of the element to be locked
        permissive - work in permissive mode
    Returns: true on success, false if the element could not be locked
    Throws: IOException - if any file operation fails
unlock
    Description copied from interface: Queue
    Unlock an element in non-permissive mode.
    Specified by: unlock in interface Queue
    Parameters:
        name - name of the element to be unlocked
    Returns: true on success, false if the element could not be unlocked
    Throws: IOException - if any file operation fails
unlock
    Description copied from interface: Queue
    Unlock an element.
    Specified by: unlock in interface Queue
    Parameters:
        name - name of the element to be unlocked
        permissive - work in permissive mode
    Returns: true on success, false if the element could not be unlocked
    Throws: IOException - if any file operation fails
remove
    Description copied from interface: Queue
    Remove a locked element from the queue.
    Specified by: remove in interface Queue
    Parameters:
        name - name of the element to be removed
    Throws: IOException - if any file operation fails
count
    public int count()
    Description copied from interface: Queue
    Return the number of elements in the queue.
    Locked elements are counted but temporary elements are not.
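The counting rule can be illustrated with a small filter over file names. This sketch assumes that a normal element is any file whose name is exactly 14 lowercase hexadecimal digits, so that .tmp files are ignored and a locked element is counted once through its normal file rather than its .lck link. The pattern and the class CountSketch are hypothetical and are not taken from ELEMENT_REGEXP.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch of the counting rule; not the QueueSimple implementation.
class CountSketch {
    // Assumed shape of an element file name: SSSSSSSSMMMMMR, 14 hex digits.
    static final Pattern ELEMENT = Pattern.compile("[0-9a-f]{14}");

    // Count only names that fully match the element pattern.
    static long countElements(List<String> fileNames) {
        return fileNames.stream()
                .filter(name -> ELEMENT.matcher(name).matches())
                .count();
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
                "5f000000f423fa",       // normal element (also locked, see next entry)
                "5f000000f423fa.lck",   // lock link, not counted separately
                "5f000000f423fb.tmp");  // temporary element, not counted
        System.out.println(countElements(names));
    }
}
```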
purge
    Description copied from interface: Queue
    Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka stale locks); note: this can take a long time on queues with many elements.
    It uses the default values for maxTemp and maxLock.
    Specified by: purge in interface Queue
    Throws: IOException - if any file operation fails
purge
    Description copied from interface: Queue
    Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka stale locks); note: this can take a long time on queues with many elements.
    Specified by: purge in interface Queue
    Parameters:
        maxLock - maximum time for a locked element (in seconds); if set to 0, locked elements will not be unlocked; if set to null, the object's default value will be used
    Throws: IOException - if any file operation fails
purge
    Description copied from interface: Queue
    Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka stale locks); note: this can take a long time on queues with many elements.
    Specified by: purge in interface Queue
    Parameters:
        maxLock - maximum time for a locked element (in seconds); if set to 0, locked elements will not be unlocked; if set to null, the object's default value will be used
        maxTemp - maximum time for a temporary element (in seconds); if set to 0, temporary elements will not be removed; if set to null, the object's default value will be used
    Throws: IOException - if any file operation fails
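The age test implied by maxLock and maxTemp can be sketched as a single predicate. This is a minimal illustration under stated assumptions: age is measured from a last-modified timestamp in seconds, an element is too old only when its age strictly exceeds the limit, and a limit of 0 disables the check as described above. The class PurgeSketch and the method isTooOld are hypothetical.

```java
// Hypothetical sketch of the purge age test; not the QueueSimple implementation.
class PurgeSketch {
    // Return true when an element last modified at lastModifiedSeconds is older
    // than maxAgeSeconds at time nowSeconds. A limit of 0 means "never too old".
    static boolean isTooOld(long lastModifiedSeconds, long nowSeconds, int maxAgeSeconds) {
        if (maxAgeSeconds <= 0) {
            return false;
        }
        return nowSeconds - lastModifiedSeconds > maxAgeSeconds;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis() / 1000;
        // A lock taken 700 seconds ago, checked against a 600 second limit.
        System.out.println(isTooOld(now - 700, now, 600));
        // The same lock with the check disabled.
        System.out.println(isTooOld(now - 700, now, 0));
    }
}
```

A purge pass would apply such a test to .lck links with maxLock and to .tmp files with maxTemp, unlocking the former and removing the latter.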
getGranularity
    public int getGranularity()
    Get the granularity.
    Returns: granularity (in seconds)
setGranularity
    Set the granularity.
    Parameters:
        value - granularity to be set (in seconds)
    Returns: the object itself
getUmask
    public int getUmask()
    Get the umask.
    Returns: numerical umask
setUmask
    Set the umask.
    Parameters:
        value - umask to be set (numerical)
    Returns: the object itself
getMaxLock
    public int getMaxLock()
    Get the default maxLock for purge().
    Returns: maximum lock time (in seconds)
setMaxLock
    Set the default maxLock for purge().
    Parameters:
        value - maximum lock time (in seconds)
    Returns: the object itself
getMaxTemp
    public int getMaxTemp()
    Get the default maxTemp for purge().
    Returns: maximum temporary time (in seconds)
setMaxTemp
    Set the default maxTemp for purge().
    Parameters:
        value - maximum temporary time (in seconds)
    Returns: the object itself
getRndHex
    public int getRndHex()
    Get the random hexadecimal digit.
    Returns: numerical hexadecimal digit
setRndHex
    Set the random hexadecimal digit.
    Parameters:
        value - hexadecimal digit to be set (numerical)
    Returns: the object itself
directoryPerms
filePerms
directoryName
elementName
addPathHelper
    Throws: IOException
createFile
    Throws: IOException
getNewPath
    Throws: IOException
addDataHelper
    Throws: IOException
addDataHelper
    Throws: IOException
ensureDirectory
    Throws: IOException
touchFile
iterator
    Iterator for the simple directory queue.