Hi Christian,
I understand that you have to test exhaustively this sensitive code. I can also understand that you may think to other solutions. No problem. BTW, reading my code this morning, I have found that the synchronization was not complete : I have removed the queueMutex and made a better usage of the existing mutex. I send you the fixed code for information.
Regards, Laurent
package org.basex.core;
import java.util.LinkedList; import org.basex.util.Util;
/** * Management of executing read/write processes. * Supports multiple readers, limited by {@link MainProp#PARALLEL}, * and single writers (readers/writer lock). * * @author BaseX Team 2005-11, BSD License * @author Christian Gruen */ final class Lock { /** Queue for all waiting processes. */ private final LinkedList<LockedCommand> queue = new LinkedList<LockedCommand>(); /** Mutex object. */ private final Object mutex = new Object(); /** Database context. */ private final Context ctx;
/** Number of active readers. */ private int readers; /** Writer flag. */ private boolean writer;
/** * Default constructor. * @param c context */ Lock(final Context c) { ctx = c; }
/** * Next pending command from the queue that can be processed. * @return Command or null if they are all locked. */ private LockedCommand next() { synchronized(mutex) { if ( writer ) return null; // Currently writing. No other command allowed. for(int i=0; i<queue.size(); i++) { LockedCommand cmd = queue.get(i); if(cmd.writer) { // Writing can be done only if no reading command is running. if(readers == 0) { writer = true; return cmd; } } else if(readers < Math.max(ctx.mprop.num(MainProp.PARALLEL), 1)) { // Additionnal readers are still allowed. ++readers; return cmd; } } return null; } }
/** * Modifications before executing a command. * @param w writing flag */ void lock(final boolean w) { synchronized(mutex) { final LockedCommand cmd = new LockedCommand(w); queue.add(cmd);
try { while(true) { LockedCommand nextCmd = next(); if ( nextCmd!=null && cmd.equals(nextCmd) ) break; mutex.wait(); } } catch(final InterruptedException ex) { Util.stack(ex); }
synchronized(mutex) { queue.remove(cmd); } } }
/** * Modifications after executing a command. * @param w writing flag */ synchronized void unlock(final boolean w) { synchronized(mutex) { if(w) { writer = false; } else { --readers; } mutex.notifyAll(); } } }
package org.basex.core;
/** * Locked command stored in Lock class queue. * @author Laurent Chevalier */ final class LockedCommand { /** Writer flag. */ public boolean writer;
/** * Default constructor. * @param w Writer flag. Tells whether it is an updating command (true) or not (false). */ LockedCommand(final boolean w) { this.writer = w; } }
-----Message d'origine----- De : Christian Grün [mailto:christian.gruen@gmail.com] Envoyé : mercredi 31 août 2011 00:17 À : Laurent Chevalier Cc : basex-talk@mailman.uni-konstanz.de Objet : Re: [basex-talk] BaseX server deadlock
Hi Laurent,
thanks again for offering another solution for the Lock class. Just an interim note: Before I'll commit some changes, I'll have to get sure that the update doesn't cause any other side effects, as the discussed code is very sensitive. It's a somewhat irritating fact that this Java class has undergone many iterations (considering the fact that it's pretty compact and light-weight), but the positive effect of that is that the existing code is very stable.
Be sure I'll keep you updated, Christian __________________________
On Tue, Aug 30, 2011 at 11:14 AM, Laurent Chevalier l.chevalier@cyim.com wrote:
I have found a new solution that preserves the order of arrival of
the requests and avoids the deadlock. You will find hereafter the Lock class and the new LockedCommand class used for the queue. Actually, instead of taking only the first item of the queue, I take the first item that can be processed.
Regards, Laurent
package org.basex.core;
import java.util.LinkedList; import org.basex.util.Util;
/**
- Management of executing read/write processes.
- Supports multiple readers, limited by {@link MainProp#PARALLEL},
- and single writers (readers/writer lock).
- @author BaseX Team 2005-11, BSD License
- @author Christian Gruen
*/ final class Lock { /** Queue for all waiting processes. */ private final LinkedList<LockedCommand> queue = new
LinkedList<LockedCommand>();
/** Mutex object. */ private final Object mutex = new Object(); /** Database context. */ private final Context ctx; /** Mutext used to synchronize pending commands (waiting processes).
*/
private final Object queueMutex = new Object();
/** Number of active readers. */ private int readers; /** Writer flag. */ private boolean writer;
/**
- Default constructor.
- @param c context
*/ Lock(final Context c) { ctx = c; }
/**
- Next pending command from the queue that can be processed.
- @return Command or null if they are all locked.
*/ LockedCommand next() { synchronized(queueMutex) { if ( writer ) return null; // Currently writing. No other
command allowed.
for(int i=0; i<queue.size(); i++) { LockedCommand cmd = queue.get(i); if(cmd.writer) { // Writing can be done only if no reading command is
running.
if(readers == 0) { writer = true; return cmd; } } else if(readers <
Math.max(ctx.mprop.num(MainProp.PARALLEL), 1)) {
// Additionnal readers are still allowed. ++readers; return cmd; } } return null;
} }
/**
- Modifications before executing a command.
- @param w writing flag
*/ void lock(final boolean w) { synchronized(mutex) { final LockedCommand cmd = new LockedCommand(w); queue.add(cmd);
try { while(true) { synchronized(queueMutex) { LockedCommand nextCmd = next(); if ( nextCmd!=null && cmd.equals(nextCmd) ) break; } mutex.wait(); } } catch(final InterruptedException ex) { Util.stack(ex); } queue.remove(cmd);
} }
/**
- Modifications after executing a command.
- @param w writing flag
*/ synchronized void unlock(final boolean w) { synchronized(mutex) { if(w) { writer = false; } else { --readers; } mutex.notifyAll(); } } }
package org.basex.core;
/**
- Locked command stored in Lock class queue.
- @author Laurent Chevalier
*/ final class LockedCommand { /** Writer flag. */ public boolean writer;
/**
- Default constructor.
- @param w Writer flag. Tells whether it is an updating command
(true) or not (false).
*/ LockedCommand(final boolean w) { this.writer = w; } }
-----Message d'origine----- De : Christian Grün [mailto:christian.gruen@gmail.com] Envoyé : mardi 30 août 2011 00:24 À : Laurent Chevalier Cc : basex-talk@mailman.uni-konstanz.de Objet : Re: [basex-talk] BaseX server deadlock
Dear Laurent,
thanks for the elaborate description of the problem and the proposed bug fix. Yes, I agree that this issue should not be ignored. I've
just
added your observation in our bug tracker:
https://github.com/BaseXdb/basex/issues/173
I'm not sure, however, how you're planning to utilize the randomly generated number, as the "code" variable isn't used in the
subsequent
lines of code. Did I miss something? Next, a random selection of incoming requests could mix up subsequent write operations, which would cause unexpected results in other use cases of BaseX (but
maybe
I missed your point here, regarding the notion of random execution).
I'll have some more thoughts on potential alternatives (which is always better than just questioning others' proposals). If you have some more ideas how to resolve the issue, you are welcome to tell
us.
Christian ____________________
On Mon, Aug 29, 2011 at 9:50 AM, Laurent Chevalier l.chevalier@cyim.com wrote:
Hi,
A deadlock occurs in the following situation: a first client
program
opens an iterative query. For each iteration, this program does some processing and sends another reading request to BaseX (using another BaseX session). All works fine until a second client program (or another thread) sends an updating command to BaseX (like optimize
for
instance). This locks BaseX server. To unlock it, you have to kill
the
first program.
I have read BaseX server code and found the reason for this
behavior
in the class org.basex.core.Lock:
- with the iterative query, there is always at least one reader
alive (readers=1).
- when the updating query is received, it is put in the queue
(index
- and remains in it as long as there is a reading query running
(that
is to say, as long as the iterative reading query is running).
- then a second reading request is received, it is put in the
queue
(index 1 as there is already the updating query in the queue). As it
is
only the second item of the queue, it remains in the queue as long
as
the first item in the queue (the updating query) has not been
processed
(BaseX processes the requests in the order of arrival, FIFO queue).
But
this first item can not be processed because there is the iterative reading query running. All queries are thus locked.
Some may say that we should not send another query while we are in
the loop of an iterative query but in our context of many sites
being
developed by several developers, it is possible that a developer
codes
this and we do not want BaseX to be locked in this case (whatever it
is
a mistake of the developer or not).
I have found a solution to this problem by modifying the
org.basex.core.Lock class. You will find my code hereafter. I do not use a queue anymore and i use a static mutex (called queueMutex) to synchronize all pending queries (threads). The "drawback" of this solution is that the queries are not processed anymore in the order
of
arrival but randomly.
What do you think of this solution ? Do you plan to update BaseX
locking mechanism ?
I'm using BaseX 6.7.1 but I have seen that Lock.java has not been
changed in BaseX 6.7.2.
Here is my code :
package org.basex.core;
import java.util.Date; //import java.util.LinkedList; import java.util.Random;
import org.basex.util.Util;
/**
- Management of executing read/write processes.
- Supports multiple readers, limited by {@link
MainProp#PARALLEL},
- and single writers (readers/writer lock).
- @author BaseX Team 2005-11, BSD License
- @author Christian Gruen
*/ final class Lock { /** Queue for all waiting processes. */ // private final LinkedList<Object> queue = new
LinkedList<Object>();
/** Mutex object. */ private final Object mutex = new Object(); /** Database context. */ private final Context ctx; /** Static mutex used to synchronize all pending queries. **/ private final static Object queueMutex = new Object();
/** Number of active readers. */ private int readers; /** Writer flag. */ private boolean writer;
/**
- Default constructor.
- @param c context
*/ Lock(final Context c) { ctx = c; }
/**
- Modifications before executing a command.
- @param w writing flag
*/ void lock(final boolean w) { synchronized(mutex) { int code = new Random(new Date().getTime()).nextInt(); // final Object o = new Object(); // queue.add(o);
try { while(true) { synchronized(queueMutex) {
// if(o == queue.get(0) && !writer) { if(!writer) { if(w) { if(readers == 0) { writer = true; break; } } else if(readers <
Math.max(ctx.mprop.num(MainProp.PARALLEL), 1)) {
++readers; break; } } } mutex.wait(); } } catch(final InterruptedException ex) { Util.stack(ex); }
// queue.remove(0); } }
/**
- Modifications after executing a command.
- @param w writing flag
*/ synchronized void unlock(final boolean w) { synchronized(mutex) { if(w) { writer = false; } else { --readers; } mutex.notifyAll(); } } }