Hi Jason,
you could try explicitly closing your session via cs.close() to avoid side effects. Apart from that, it's true that BaseX has no atomic operation (yet?) for creating an empty database only if it does not already exist. Instead, we offer CHECK, a convenience command that opens the database specified by its argument, or creates it if it doesn't exist.
In your scenario, I would recommend creating the database during the initialization step and then accessing it from your threads, or synchronizing your own code so that a missing database cannot be created twice.
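The second option above, synchronizing your own code so that a missing database is never created twice, is usually done with a double-checked lock per database name. The sketch below is illustrative only: DbOps, CreateOnce and CreateOnceDemo are hypothetical names, and the in-memory fake stands in for real BaseX calls (with BaseX, exists/create might be implemented by sending commands such as OPEN and CREATE DB over a ClientSession). It only coordinates threads inside one JVM; a second client process could still race.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical operations, NOT the BaseX API; in real code these would be
// backed by commands sent over a ClientSession.
trait DbOps {
  def exists(name: String): Boolean
  def create(name: String): Unit
}

// Ensures that, within this JVM, each database is created at most once,
// even when many threads ask for it at the same moment.
final class CreateOnce(ops: DbOps) {
  private val ready = ConcurrentHashMap.newKeySet[String]()
  private val locks = new ConcurrentHashMap[String, AnyRef]()

  def ensure(name: String): Unit =
    if (!ready.contains(name)) {
      // One lock object per database name, so unrelated databases don't block each other.
      val lock = locks.computeIfAbsent(name, (_: String) => new Object)
      lock.synchronized {
        // Re-check under the lock: another thread may have created it meanwhile.
        if (!ready.contains(name)) {
          if (!ops.exists(name)) ops.create(name)
          ready.add(name)
        }
      }
    }
}

object CreateOnceDemo {
  // Starts `threads` threads that all call ensure() on the same name against
  // an in-memory fake, and returns how many times create() actually ran.
  def creates(threads: Int): Int = {
    val count = new AtomicInteger
    val fake = new DbOps {
      private val dbs = ConcurrentHashMap.newKeySet[String]()
      def exists(name: String): Boolean = dbs.contains(name)
      def create(name: String): Unit = {
        Thread.sleep(5) // widen the race window
        count.incrementAndGet()
        dbs.add(name)
      }
    }
    val guard = new CreateOnce(fake)
    val ts = (1 to threads).map(_ => new Thread(() => guard.ensure("StressTestDB")))
    ts.foreach(_.start())
    ts.foreach(_.join())
    count.get
  }
}
```

With 50 concurrent callers, create() should run exactly once; every later call to ensure() skips the lock entirely once the name is in the ready set.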
Hope this helps, Christian
On Thu, May 5, 2011 at 10:27 PM, Jason Klassen Jason.Klassen@protegra.com wrote:
Hi Christian & Michael,
Thank you both for the quick responses. Switching to a dedicated ClientSession per thread resolved the mangled-commands problem. I'm curious to see what the performance trade-off will be at a large scale.
I was still running into write locks when multiple threads, started close together, tried to open a database and create it if it was missing. This resulted in many create attempts, and in write locks whenever a previous thread had the database open. I've worked around that for now with some try/catch blocks and a retry loop. This seems to work reliably for 200 threads, but at 1100 threads I'm seeing only a 50-70% success rate. Do you have any examples of the best way to implement "open, and create the collection if missing" behavior? Below is my attempt, but I don't like that I'm using exceptions to control the flow. Again, it's in Scala, but it should read pretty close to Java syntax.
Thanks, Jason
def clientSessionFromConfig: ClientSession = {
  new ClientSession(dbHost, dbPort.toInt, dbUser, dbPwd)
}

def createDB(cs: ClientSession, xmlCollStr: String): ClientSession = {
  try {
    cs.execute(new CreateDB(xmlCollStr))
  } catch {
    case e: BaseXException => {
      //do nothing
      println(e)
    }
  }
  cs
}

def openDB(xmlCollStr: String): ClientSession = {
  openDB(xmlCollStr, 50, 100)
}

def openDB(xmlCollStr: String, retries: Int, wait: Int): ClientSession = {
  var cs = clientSessionFromConfig
  for (i <- 1 to retries) {
    try {
      cs.execute(new Open(xmlCollStr))
      println("stopping at " + i)
      return cs
    } catch {
      case e: BaseXException => {
        println(e)
        cs = createDB(cs, xmlCollStr)
        Thread.sleep(wait)
      }
    }
  }
  return cs
}
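If a retry loop is kept at all, one way to stop exceptions from driving the flow of every caller is to confine them to a single generic helper built on scala.util.Try. This is a sketch, not a BaseX facility: Retry and retry are hypothetical names, and op would be something like cs.execute(new Open(xmlCollStr)). It does not remove the underlying create/open race; it only isolates the handling in one place.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object Retry {
  // Runs `op` up to `attempts` times, sleeping `waitMs` milliseconds after
  // each failure. Non-fatal exceptions are captured in a Try rather than
  // propagated; if every attempt fails, the last Failure is returned.
  @tailrec
  def retry[T](attempts: Int, waitMs: Long)(op: => T): Try[T] =
    Try(op) match {
      case s @ Success(_) => s
      case f @ Failure(_) if attempts <= 1 => f
      case Failure(_) =>
        Thread.sleep(waitMs)
        retry(attempts - 1, waitMs)(op)
    }
}
```

The caller then pattern-matches once on Success/Failure instead of wrapping each command in its own try/catch.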
-----Original Message----- From: Michael Seiferle [mailto:michael.seiferle@uni-konstanz.de] Sent: May-05-11 10:07 AM To: Jason Klassen Cc: Christian Grün; basex-talk@mailman.uni-konstanz.de Subject: Re: [basex-talk] Concurrency with the server API under high load
Hi Jason,
thanks again for your detailed report. I found myself digging around your scala code a little, quite a nice language.
I managed to reproduce the problem here, even in plain old Java code:
The problem was that you were sharing a single ClientSession instance among all concurrent threads, so eventually one thread overwrote the ClientSession's command buffer that is sent to the server. This led to strange concatenations like 'OOOPPPEEENNN...'.
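To make this failure mode concrete, here is a deterministic simulation in plain Scala (no BaseX involved) of several threads writing the same command into one shared buffer, one character per turn each. The strict round-robin scheduling is an assumption for illustration; real thread scheduling is nondeterministic, which is why the garbage in the logs differs from line to line.

```scala
// Simulates threads that each append one character of their own command
// to a shared buffer per turn, taking turns in a fixed round-robin order.
object InterleaveDemo {
  // Round-robin merge: 1st char of every command, then the 2nd, and so on.
  def interleave(commands: Seq[String]): String = {
    val sb = new StringBuilder
    val maxLen = if (commands.isEmpty) 0 else commands.map(_.length).max
    for (i <- 0 until maxLen; cmd <- commands if i < cmd.length)
      sb.append(cmd.charAt(i))
    sb.toString
  }

  def main(args: Array[String]): Unit =
    println(interleave(Seq("OPEN", "OPEN", "OPEN"))) // prints OOOPPPEEENNN
}
```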
I recommend using a dedicated ClientSession _per thread_; this will help you avoid these kinds of problems*. We will add a note to our documentation clarifying that one ClientSession should be used per thread.
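A minimal sketch of the one-session-per-thread rule follows. Session, PerTaskSessions and newSession are hypothetical names; the fake session only echoes commands and counts opens and closes, and in real code newSession would construct new ClientSession(dbHost, dbPort.toInt, dbUser, dbPwd) as in Jason's code and close it via close(). Opening a connection per task costs a login round trip each time; reusing one session per worker thread (for example through a ThreadLocal) is a possible alternative if that overhead matters.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for org.basex.server.ClientSession, reduced to the
// two calls used here.
trait Session extends AutoCloseable {
  def execute(command: String): String
  def close(): Unit
}

object PerTaskSessions {
  val opened = new AtomicInteger
  val closed = new AtomicInteger

  // Fake factory that counts sessions; not a real server connection.
  def newSession(): Session = {
    opened.incrementAndGet()
    new Session {
      def execute(command: String): String = s"ok: $command"
      def close(): Unit = closed.incrementAndGet()
    }
  }

  // Runs `tasks` jobs on a pool of `threads` workers. Each job opens its own
  // session, uses it from that one thread only, and closes it in `finally`.
  // The method waits (up to one minute) for all jobs before returning.
  def run(tasks: Int, threads: Int): Unit = {
    val pool = Executors.newFixedThreadPool(threads)
    for (i <- 1 to tasks) {
      pool.execute(() => {
        val session = newSession()
        try session.execute(s"OPEN StressTestDB_$i")
        finally session.close()
      })
    }
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.MINUTES)
  }
}
```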
Feel free to ask for more help, thanks for reporting!
Kind Regards Michael
P.S. Inspired by your Scala code, I will add a Java test case to our test suite as well, to stress-test our server a little more, thanks ;-)
* You might also try synchronizing the ClientSession execute method, but this would take a lot of extra effort to keep the returned results consistent as well.
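The footnote's alternative can be sketched as a wrapper that lets only one thread at a time send a command and read its result. CommandRunner, SynchronizedSession and SyncDemo are hypothetical names, and UnsafeFake merely imitates a shared command buffer. Because the result is returned from inside the same lock, no thread can receive another thread's output; the price is that all commands are serialized on one connection. If output is routed through a shared stream (as with setOutputStream in Jason's code), that call and the read-back would also have to happen inside the lock, which is part of the extra effort mentioned above.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical minimal interface standing in for ClientSession.execute(String).
trait CommandRunner {
  def execute(command: String): String
}

// Serializes all access to one shared, non-thread-safe runner.
final class SynchronizedSession(underlying: CommandRunner) extends CommandRunner {
  private val lock = new Object
  def execute(command: String): String = lock.synchronized {
    underlying.execute(command)
  }
}

object SyncDemo {
  // Deliberately thread-unsafe fake: like a shared command buffer, it stores
  // the command in a field and reads it back after yielding the CPU.
  final class UnsafeFake extends CommandRunner {
    @volatile private var buffer = ""
    def execute(command: String): String = {
      buffer = command
      Thread.`yield`()
      buffer
    }
  }

  // Counts calls whose result differs from the command that was sent.
  def mismatches(runner: CommandRunner, calls: Int, threads: Int): Int = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val futures = (1 to calls).map { i =>
        pool.submit(new Callable[Boolean] {
          def call(): Boolean = runner.execute(s"CMD $i") != s"CMD $i"
        })
      }
      futures.count(_.get())
    } finally pool.shutdown()
  }
}
```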
On 04.05.2011 at 23:34, Jason Klassen wrote:
Hello Christian,
One of the nice things about Scala is that it compiles to run on the JVM. Maven should download all the Scala .jar dependencies it needs to compile .scala classes to JVM bytecode. In theory, you only need Scala installed if you want to run the Scala compiler directly without Maven. Here's a short article describing what to expect when using Maven with Scala for the first time: http://www.scala-blogs.org/2008/01/maven-for-scala.html
If Maven doesn't work for you, it would be better for me if someone else from the core group could run the code.
Thanks, Jason
-----Original Message----- From: Christian Grün [mailto:christian.gruen@gmail.com] Sent: May-04-11 4:22 PM To: Jason Klassen Cc: basex-talk@mailman.uni-konstanz.de Subject: Re: [basex-talk] Concurrency with the server API under high load
Hi Jason,
thanks again for the clean test setup. I'm sorry, I currently do not have Scala installed; do you think it would be possible to provide us with equivalent Java code? Alternatively, could someone else from our core group run the code and give first feedback?
Thanks, Christian
On Wed, May 4, 2011 at 11:10 PM, Jason Klassen Jason.Klassen@protegra.com wrote:
Hello Christian,
I've attached a file called BaseXExamples.zip containing a small Maven project written in Scala that demonstrates the issue. The tests are written with Scala Specs but behave very similarly to JUnit. This roughly mimics our production scenario. Assuming Maven is installed:
1. run "...\BaseX\bin\basexserver.bat"
2. unzip and navigate to the root directory containing pom.xml
3. run "mvn clean compile"
4. run "mvn test"
The project can also be run equally well through IntelliJ IDEA. The tests should all pass, but they are only there to populate the databases; the results need to be examined by hand. The tests are repeatable and clean up after themselves before each run.
Let me know if you need anything further.
Jason
-----Original Message----- From: Christian Grün [mailto:christian.gruen@gmail.com] Sent: May-04-11 12:32 PM To: Jason Klassen Cc: basex-talk@mailman.uni-konstanz.de Subject: Re: [basex-talk] Concurrency with the server API under high load
Dear Jason,
thanks for giving us detailed information on your test scenario. It would be great if you could provide us with an additional Java class that allows us to reproduce the issue more easily.
Best, Christian
Christian Grün Uni KN, Box 188 78457 Konstanz, Germany http://www.inf.uni-konstanz.de/~gruen
On Wed, May 4, 2011 at 7:06 PM, Jason Klassen Jason.Klassen@protegra.com wrote:
Hi,
I'm seeking some help on the best way to use BaseX in a highly concurrent environment. We're using the server API to interact with a BaseX 6.6 server and are noticing some very odd behavior when 100+ threads use a ClientSession concurrently to access the same db.
Looking at the BaseX logs I get some correct lines like:
01:19:21.101 [127.0.0.1:33153] OPEN Async_StressTestDB_Med
01:19:22.301 [127.0.0.1:33274] XQUERY insert node <record><contentChannel><String>data2</String></contentChannel><String>value goes here618</String></record> into for $db in collection('Async_StressTestDB_Med')/records return $db
But then I also get lines like:
01:19:21.105 [127.0.0.1:33161] OPEONP EANs yAnscy_nSct_rSetsrseTsessTteDsBt_DMBe_dM Error: Stopped at line 1, column 6: Unknown command 'OPEONP'; try "help".
01:19:21.112 [127.0.0.1:33156] OOPPEENN AAssyynncc__SSttrreessssTTeessttDDBB__MMeedd Error: Stopped at line 1, column 8: Unknown command 'OOPPEENN'; try "help".
01:19:23.065 [127.0.0.1:33642] XQXERY insYrt ninserde <tr ecord><code nrecord><ctnteneChanntChannel><etl><Strirg>data2i/Strg>datng><a/2con nrecord>t e ntC/Striang>nnel><Stcintenng>value Channeloes hereString>va<uStrine>< goecord>esn hert152o foring></rec rd>$db in collectiono'Async_ tressTestDor ddb in 'ollec/recoion('As nc_SteressTestDu_Med')/n $dbecords return $db Error: Stopped at line 1, column 6: Unknown command 'xqxery'; did you mean 'xquery'?
The issue seems to happen when calling clientSession.execute(...), because I can verify that the command string passed to execute is correct even when the log shows a mangled command.
I believe two problems are affecting us:
- A collection in ClientSession is not thread-safe when storing the commands passed to it.
- BaseX is write-locking the db on concurrent inserts. I understand the need for that, but the wiki page on Transaction Management implied that there is a queue where requests wait if isolation cannot be guaranteed. This sounded automatic. Are there any steps I need to take to enable the Transaction Management queue?
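Independently of what BaseX's own transaction management does (the wiki page mentioned above is the authority on that), an application can impose its own write queue by funnelling every write through a single thread. This sketch is not a BaseX mechanism: Writer, WriteQueue and WriteQueueDemo are hypothetical names, and the fake writer stands in for executing an XQuery insert over a ClientSession.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, Future, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for sending one write command over a session.
trait Writer {
  def write(command: String): String
}

// Funnels all writes through one thread, so they reach the server strictly
// one after another, in submission order. Callers get a Future to wait on.
final class WriteQueue(writer: Writer) {
  private val single: ExecutorService = Executors.newSingleThreadExecutor()

  def submit(command: String): Future[String] =
    single.submit(new Callable[String] {
      def call(): String = writer.write(command)
    })

  def shutdown(): Unit = {
    single.shutdown()
    single.awaitTermination(1, TimeUnit.MINUTES)
  }
}

object WriteQueueDemo {
  // Submits `n` writes from `n` caller threads and reports the highest
  // number of writes that were ever running at the same time.
  def maxConcurrentWrites(n: Int): Int = {
    val inFlight = new AtomicInteger
    val maxSeen = new AtomicInteger
    val queue = new WriteQueue(new Writer {
      def write(command: String): String = {
        val now = inFlight.incrementAndGet()
        maxSeen.updateAndGet(m => math.max(m, now))
        Thread.sleep(1)
        inFlight.decrementAndGet()
        s"done: $command"
      }
    })
    val callers = (1 to n).map(i => new Thread(() => { queue.submit(s"INSERT $i").get(); () }))
    callers.foreach(_.start())
    callers.foreach(_.join())
    queue.shutdown()
    maxSeen.get
  }
}
```

Reads could still use per-thread sessions in parallel; only writes to the same database go through the queue.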
Below is the JUnit 4 test setup we're using to find the errors; generally I run one test at a time to observe and limit side effects. The BaseX GUI is closed while the tests run. I've also attached the full BaseX logs generated by running the tests, but if they get removed I can easily email them upon request.
How can the problem be resolved?
Thanks,
Jason
import org.junit._
import Assert._
import scala.util.Random
import scala.concurrent.ops._
import scala.actors.Actor._
import org.basex.core.BaseXException
import org.basex.core.cmd._
import org.basex.server.ClientSession
import scala.xml._
class MockBaseXXMLStore
{
val dbHost = "localhost"
val dbPort = "1984"
val dbUser = "admin"
val dbPwd = "admin"
var _clientSession: Option[ClientSession] = None
def clientSession: ClientSession =
{
_clientSession match {
case Some(cs) => cs
case None => {
val cs = new ClientSession(dbHost, dbPort.toInt, dbUser, dbPwd)
_clientSession = Some(cs)
cs
}
}
}
def insertUpdate(xmlCollStr: String)(xmlElemStr: String): Unit =
{
val srvrRspStrm = new java.io.ByteArrayOutputStream()
try {
clientSession.setOutputStream(srvrRspStrm)
clientSession.execute(new Open(xmlCollStr))
val insertTemplate =
(
"insert node %NODE% into "
+ "for $db in collection('%COLLNAME%')/records return $db"
);
val insertQry =
insertTemplate.replace(
"%NODE%",
xmlElemStr
).replace(
"%COLLNAME%",
xmlCollStr
)
try {
clientSession.execute(new XQuery(insertQry))
}
catch {
case e: BaseXException => {
}
}
}
catch {
case e: BaseXException => {
val recordsElem = <records>
{XML.loadString(xmlElemStr)}
</records>
clientSession.execute(new CreateDB(xmlCollStr))
clientSession.execute(new Add(recordsElem.toString))
}
}
finally {
srvrRspStrm.close()
}
}
}
class BaseXTest
{
val rand = new Random()
@Test
//Success: 1 created
def insertUpdate =
{
val db = new MockBaseXXMLStore
db.insertUpdate("MockBaseXXMLStoreTestDB")("<record><contentChannel><String>data2</String></contentChannel><String>value goes here" + rand.nextInt + "</String></record>")
}
@Test
//Success: 100 and 1000 created
def stressInsertUpdate =
{
val db = new MockBaseXXMLStore
println("starting small")
for (i <- 1 to 100) {
db.insertUpdate("StressTestDB_Small")("<record><contentChannel><String>data2</String></contentChannel><String>value goes here" + i + "</String></record>")
}
println("starting med")
for (i <- 1 to 1000) {
db.insertUpdate("StressTestDB_Med")("<record><contentChannel><String>data2</String></contentChannel><String>value goes here" + i + "</String></record>")
}
}
@Test
//Fail: 0 and 184 created
def stressInsertUpdateAsync =
{
val db = new MockBaseXXMLStore
println("async starting small")
for (i <- 1 to 100) {
spawn {
db.insertUpdate("Async_StressTestDB_Small")("<record><contentChannel><String>data2</String></contentChannel><String>value goes here" + i + "</String></record>")
}
}
println("async starting med")
for (i <- 1 to 1000) {
spawn {
db.insertUpdate("Async_StressTestDB_Med")("<record><contentChannel><String>data2</String></contentChannel><String>value goes here" + i + "</String></record>")
}
}
}
@Test
//Fail: 4 and 0 created
def stressInsertUpdateActor =
{
val db = new MockBaseXXMLStore
println("async starting small")
for (i <- 1 to 100) {
actor {
db.insertUpdate("Actor_StressTestDB_Small")("<record><contentChannel><String>data2</String></contentChannel><String>value goes here" + i + "</String></record>")
}
}
println("async starting med")
for (i <- 1 to 1000) {
actor {
db.insertUpdate("Actor_StressTestDB_Med")("<record><contentChannel><String>data2</String></contentChannel><String>value goes here" + i + "</String></record>")
}
}
}
}
BaseX-Talk mailing list BaseX-Talk@mailman.uni-konstanz.de https://mailman.uni-konstanz.de/mailman/listinfo/basex-talk