Volksdata 1.0b7
RDF library
Loading...
Searching...
No Matches
store_mdb.c
Go to the documentation of this file.
2
6#define N_DB 9
7
/*
 * Default LMDB map size, used when VOLK_MDB_MAPSIZE is not set.
 *
 * The expansion is fully parenthesized so that it can be used inside larger
 * expressions, and the shift is performed on a size_t so that it never
 * overflows a (signed) int.
 */
#if (defined DEBUG || defined TESTING)
#define DEFAULT_MAPSIZE ((size_t) 1 << 24) // 16Mb (limit for Valgrind)
#elif !(defined __LP64__ || defined __LLP64__) || \
    defined _WIN32 && !defined _WIN64
#define DEFAULT_MAPSIZE ((size_t) 1 << 31) // 2Gb (limit for 32-bit systems)
#else
#define DEFAULT_MAPSIZE ((size_t) 1 << 40) // 1Tb
#endif
19
20#define ENV_DIR_MODE 0750
21#define ENV_FILE_MODE 0640
22
23/*
24 * Data types.
25 */
26
27typedef char DbLabel[8];
28typedef struct mdbstore_iter_t MDBIterator;
29
31typedef enum {
32 LSSTORE_OPEN = 1<<0,
34
36typedef enum {
38 //<
39 //< The iterator has begun a new
40 //< transaction on initialization
41 //< which needs to be closed. If
42 //< false, the iterator is using an
43 //< existing transaction which will
44 //< not be closed with
45 //< #mdbiter_free().
46} IterFlags;
47
48typedef enum {
51} StoreOp;
52
53typedef struct mdbstore_t {
54 MDB_env * env;
55 MDB_dbi dbi[N_DB];
57} MDBStore;
58
69typedef void (*iter_op_fn_t)(MDBIterator *it);
70
71
73typedef struct mdbstore_iter_t {
76 MDB_txn * txn;
77 MDB_cursor * cur;
78 MDB_cursor * ctx_cur;
79 MDB_val key;
80 MDB_val data;
87 const uint8_t * term_order;
90 size_t i;
91 size_t ct;
93 int rc;
95
96
97/*
98 * Static variables.
99 */
100
/* Flags shared by all the fixed-size duplicate-key databases. Parenthesized
 * so that the mask is safe to combine with other operators. */
#define DUPFIXED_MASK (MDB_DUPSORT | MDB_DUPFIXED)
102
/*
 * Main data tables, as an X-macro list.
 *
 * Each ENTRY row carries an ID suffix, a DB label string and the LMDB flags
 * for that database. ENTRY is (re)defined by the code that expands this list.
 */
108#define MAIN_TABLE \
109/* #ID pfx #DB label #Flags */ \
110 ENTRY( T_ST, "t:st", 0 ) /* Key to ser. term */ \
111 ENTRY( SPO_C, "spo:c", DUPFIXED_MASK ) /* Triple to context */ \
112
113
/*
 * Lookup (index) tables, as an X-macro list.
 *
 * Rows use the same ENTRY (ID suffix, DB label, LMDB flags) layout as
 * MAIN_TABLE. The first three rows are keyed by one term, the next three by
 * two terms, and the last one by context.
 */
116#define LOOKUP_TABLE \
117/* #ID pfx #DB label #Flags */ \
118 ENTRY( S_PO, "s:po", DUPFIXED_MASK ) /* 1-bound lookup */ \
119 ENTRY( P_SO, "p:so", DUPFIXED_MASK ) /* 1-bound lookup */ \
120 ENTRY( O_SP, "o:sp", DUPFIXED_MASK ) /* 1-bound lookup */ \
121 ENTRY( PO_S, "po:s", DUPFIXED_MASK ) /* 2-bound lookup */ \
122 ENTRY( SO_P, "so:p", DUPFIXED_MASK ) /* 2-bound lookup */ \
123 ENTRY( SP_O, "sp:o", DUPFIXED_MASK ) /* 2-bound lookup */ \
124 ENTRY( C_SPO, "c:spo", DUPFIXED_MASK ) /* Context lookup */ \
125
126
129#define ENTRY(a, b, c) static const DbLabel DB_##a = b;
132#undef ENTRY
133
134/*
135 * Numeric index of each DB. Prefixed with IDX_
136 *
137 * These index numbers are referred to in all the arrays defined below. They
138 * are independent from the LMDB dbi values which are considered opaque here.
139 */
140typedef enum {
141#define ENTRY(a, b, c) IDX_##a,
144#undef ENTRY
145} DBIdx;
146
150static const char *db_labels[N_DB] = {
151#define ENTRY(a, b, c) DB_##a,
154#undef ENTRY
155};
156
157/*
158 * DB flags. These are aligned with the db_labels index.
159 */
160static const unsigned int db_flags[N_DB] = {
161#define ENTRY(a, b, c) c,
164#undef ENTRY
165};
166
167/*
168 * 1-bound and 2-bound lookup indices.
169 *
170 * N.B. Only the first 6 (1-bound and 2-bound term lookup) are used.
171 * The others are added just because they belong logically to the lookup table.
172 */
173static DBIdx lookup_indices[9] = {
174#define ENTRY(a, b, c) IDX_##a,
176#undef ENTRY
177};
178
/*
 * Term positions for each 1-bound index. Row n gives, for the n-th 1-bound
 * table, the triple position of the key term followed by the positions of
 * the two data terms.
 */
static const uint8_t lookup_ordering_1bound[3][3] = {
    [0] = {0, 1, 2},    // s:po
    [1] = {1, 0, 2},    // p:so
    [2] = {2, 0, 1},    // o:sp
};
184
/*
 * Term positions for each 2-bound index. Row n gives, for the n-th 2-bound
 * table, the triple positions of the two key terms followed by the position
 * of the data term.
 */
static const uint8_t lookup_ordering_2bound[3][3] = {
    [0] = {1, 2, 0},    // po:s
    [1] = {0, 2, 1},    // so:p
    [2] = {0, 1, 2},    // sp:o
};
190
191
192/*
193 * Static prototypes.
194 */
195static int index_triple(
196 MDBStore *store, StoreOp op, VOLK_TripleKey spok, VOLK_Key ck,
197 MDB_txn *txn);
198static VOLK_rc mdbstore_add_term (void *h, const VOLK_Buffer *sterm, void *th);
199
200inline static VOLK_rc lookup_0bound (MDBIterator *it, size_t *ct);
201inline static VOLK_rc lookup_1bound (
202 uint8_t idx0, MDBIterator *it, size_t *ct);
203inline static VOLK_rc lookup_2bound (
204 uint8_t idx0, uint8_t idx1, MDBIterator *it, size_t *ct);
205inline static VOLK_rc lookup_3bound(MDBIterator *it, size_t *ct);
206
207
208static const char *
209mdbstore_path_from_id (const char *id)
210{
211 // Set environment path.
212 if (!id) id = getenv ("VOLK_MDB_STORE_URN");
213 if (!id) {
215 log_info (
216 "`VOLK_MDB_STORE_URN' environment variable is not "
217 "set. The default URN %s has been set as the store ID.", id
218 );
219 } else if (strncmp ("file://", id, 7) != 0) {
220 log_error ("MDB store ID must be in the `file://<abs_path>` format.");
221
222 return NULL;
223 }
224
225 return id + 7;
226}
227
228
229/*
230 * Inliners.
231 */
232
233static inline VOLK_rc
234txn_begin (MDB_env *env, MDB_txn *p, unsigned int f, MDB_txn **tp) {
235 VOLK_rc rc = mdb_txn_begin (env, p, f, tp);
236 const char *path;
237 mdb_env_get_path (env, &path);
238 log_debug (
239 "BEGIN %s transaction %p child of %p in env %s",
240 f == 0 ? "RW" : "RO", *tp, p, path);
241 return rc;
242}
243
244
245static inline VOLK_rc
246txn_commit (MDB_txn *t) {
247 log_debug ("COMMIT transaction %p", t);
248 return mdb_txn_commit (t);
249}
250
251
255
262static VOLK_rc
263mdbstore_setup (const char *id, bool clear)
264{
265 if (!VOLK_env_is_init) return VOLK_ENV_ERR;
266
267 const char *path = mdbstore_path_from_id (id);
268 if (!path) return VOLK_VALUE_ERR;
269
270 // If the directory exists (unless clear == true), do nothing.
271 if (clear) rm_r (path);
272 VOLK_rc rc = mkdir_p (path, ENV_DIR_MODE);
273 log_debug ("Create dir rc: %s", VOLK_strerror (rc));
274 PRCCK (rc);
275
276 // Open a temporary environment and txn to create the DBs.
277 MDB_env *env;
278 RCCK (mdb_env_create (&env));
279
280 RCCK (mdb_env_set_maxdbs (env, N_DB));
281 RCCK (mdb_env_open (env, path, 0, ENV_FILE_MODE));
282 log_debug ("Environment opened at %s.", path);
283
284 MDB_txn *txn;
285 RCCK (txn_begin (env, NULL, 0, &txn));
286 MDB_dbi dbi;
287 for (int i = 0; i < N_DB; i++) {
288 log_trace ("Creating DB %s", db_labels[i]);
289 RCCK (
290 mdb_dbi_open (txn, db_labels[i], db_flags[i] | MDB_CREATE, &dbi)
291 );
292 }
293
294 // Bootstrap the permanent store with initial data.
295 MDB_stat stat;
296 CHECK (mdb_dbi_open (
297 txn, db_labels[IDX_T_ST], db_flags[IDX_T_ST], &dbi), fail);
298 CHECK (mdb_stat (txn, dbi, &stat), fail);
299
300 if (stat.ms_entries == 0) {
301 log_debug ("Loading initial data into %s", path);
302 // Index default context.
303 MDB_cursor *cur;
304 CHECK (mdb_cursor_open (txn, dbi, &cur), fail);
306 MDB_val key, data;
307 key.mv_data = &k;
308 key.mv_size = sizeof (k);
309
310 data.mv_data = VOLK_default_ctx_buf->addr;
311 data.mv_size = VOLK_default_ctx_buf->size;
312
313 VOLK_rc db_rc = mdb_cursor_put (cur, &key, &data, 0);
314 CHECK (db_rc, fail);
315 }
316
317 CHECK (txn_commit (txn), fail);
318 mdb_env_close (env);
319
320 return clear ? VOLK_OK : rc;
321
322fail:
323 if (rc >= 0) rc = VOLK_DB_ERR;
324 mdb_txn_abort (txn);
325 return rc;
326}
327
328
339static void *
340mdbstore_new (const char *id, size_t _unused)
341{
342 if (!VOLK_env_is_init) {
343 log_error (VOLK_strerror (VOLK_ENV_ERR));
344 return NULL;
345 }
346
347 (void) _unused;
348 const char *path = mdbstore_path_from_id (id);
349 if (!path) return NULL;
350
351 MDBStore *store;
352 CALLOC_GUARD (store, NULL);
353
354 RCNL (mdb_env_create (&store->env));
355 MDB_txn *txn = NULL;
356
357 // Set map size.
358 size_t mapsize;
359 char *env_mapsize = getenv ("VOLK_MDB_MAPSIZE");
360 if (env_mapsize == NULL) mapsize = DEFAULT_MAPSIZE;
361 else sscanf (env_mapsize, "%zu", &mapsize);
362 log_debug (
363 "Setting environment map size at %s to %zu Mb.",
364 path, mapsize / 1024 / 1024);
365 CHECK (mdb_env_set_mapsize (store->env, mapsize), fail);
366 CHECK (mdb_env_set_maxdbs (store->env, N_DB), fail);
367 CHECK (mdb_env_open (store->env, path, 0, ENV_FILE_MODE), fail);
368
369 // Assign DB handles to store->dbi.
370 CHECK (txn_begin (store->env, NULL, 0, &txn), fail);
371 for (int i = 0; i < N_DB; i++)
372 CHECK (mdb_dbi_open (
373 txn, db_labels[i], db_flags[i], store->dbi + i), fail);
374
375 store->flags |= LSSTORE_OPEN;
376 CHECK (txn_commit(txn), fail);
377 txn = NULL;
378
379 log_info ("Created MDB environment at %s", path);
380
381 return store;
382
383fail:
384 if (txn) mdb_txn_abort (txn);
385 mdb_env_close (store->env);
386
387 return NULL;
388}
389
390
391static void
392mdbstore_free (void *h)
393{
394 MDBStore *store = h;
395 if (store->flags & LSSTORE_OPEN) {
396 const char *path;
397 mdb_env_get_path (store->env, &path);
398 log_info ("Closing MDB environment at %s.", path);
399 mdb_env_close (store->env);
400 }
401
402 free (store);
403}
404
405
406static VOLK_rc
407mdbstore_stat (const MDBStore *store, MDB_stat *stat)
408{
409 if (!(store->flags & LSSTORE_OPEN)) return 0;
410
411 MDB_txn *txn;
412 RCCK (txn_begin (store->env, NULL, MDB_RDONLY, &txn));
413
414 if (mdb_stat (txn, store->dbi[IDX_SPO_C], stat) != MDB_SUCCESS)
415 return VOLK_DB_ERR;
416 mdb_txn_abort (txn);
417
418 return VOLK_OK;
419}
420
421
422static size_t
423mdbstore_size (const void *h)
424{
425 const MDBStore *store = h;
426 // Size is calculated outside of any pending write txn.
427
428 MDB_stat stat;
429 if (mdbstore_stat (store, &stat) != VOLK_OK) return 0;
430
431 return stat.ms_entries;
432}
433
434
435char *
436mdbstore_id (const void *h)
437{
438 const MDBStore *store = h;
439 const char *path;
440
441 RCNL (mdb_env_get_path (store->env, &path));
442
443 char *id = malloc (strlen (path) + 8);
444 sprintf (id, "file://%s", path);
445
446 return id;
447}
448
449
450static VOLK_rc
451mdbstore_txn_begin (void *h, VOLK_StoreFlags flags, void **th)
452{
453 MDBStore *store = h;
454 flags = flags & VOLK_STORE_TXN_RO ? MDB_RDONLY : 0;
455
456 RCCK (txn_begin (store->env, NULL, flags, (MDB_txn **) th));
457
458 return VOLK_OK;
459}
460
461
462static VOLK_rc
463mdbstore_txn_commit (void *th)
464{
465 RCCK (txn_commit (th));
466
467 return VOLK_OK;
468}
469
470
471static void
472mdbstore_txn_abort (void *th)
473{ mdb_txn_abort (th); }
474
475
476static void *
477mdbiter_txn (void *h)
478{ return ((MDBIterator *) h)->txn; }
479
480
491static void *
492mdbstore_add_init (void *h, const VOLK_Buffer *sc, void *th)
493{
494 MDBStore *store = h;
495 /* An iterator is used here. Some members are a bit misused but it does
496 * its job without having to define a very similar struct.
497 */
498 MDBIterator *it;
499 MALLOC_GUARD (it, NULL);
500
501 it->store = store;
502 it->i = 0;
503
504 CHECK (txn_begin (store->env, th, 0, &it->txn), fail);
505
506 if (sc) {
507 // Store context if it's not the default one.
508 it->luc = VOLK_buffer_hash (sc);
509
510 // Insert t:st for context.
511 //log_debug ("Adding context: %s", sc);
512 it->key.mv_data = &it->luc;
513 it->key.mv_size = KLEN;
514 it->data.mv_data = sc->addr;
515 it->data.mv_size = sc->size;
516
517 int db_rc = mdb_put (
518 it->txn, it->store->dbi[IDX_T_ST],
519 &it->key, &it->data, MDB_NOOVERWRITE);
520 if (db_rc != MDB_SUCCESS && db_rc != MDB_KEYEXIST) {
521 log_error (VOLK_strerror (db_rc));
522 mdb_txn_abort (it->txn);
523 return NULL;
524 }
525 } else {
526 log_debug ("No context passed to iterator, using default.");
528 }
529
530 return it;
531
532fail:
533 free (it);
534
535 return NULL;
536}
537
538
539/*
540 * NOTE: at the moment #mdbstore_remove() or another
541 * #mdbstore_init() cannot be called between #mdbstore_add_init and
542 * #mdbstore_add_abort or #mdbstore_add_done. FIXME
543 *
544 */
545static VOLK_rc
546mdbstore_add_iter (void *h, const VOLK_BufferTriple *sspo)
547{
548 if (UNLIKELY (!h)) return VOLK_VALUE_ERR;
549
550 MDBIterator *it = h;
551 int db_rc = VOLK_NOACTION;
553
554 // Add triple terms.
555 for (int i = 0; i < 3; i++) {
556 VOLK_Buffer *st = VOLK_btriple_pos (sspo, i);
557
558 spok[i] = VOLK_buffer_hash (st);
559
560 it->key.mv_data = spok + i;
561 it->key.mv_size = KLEN;
562 it->data.mv_data = st->addr;
563 it->data.mv_size = st->size;
564
565 db_rc = mdb_put(
566 it->txn, it->store->dbi[IDX_T_ST],
567 &it->key, &it->data, MDB_NOOVERWRITE);
568 if (db_rc != MDB_SUCCESS && db_rc != MDB_KEYEXIST) {
569 LOG_RC (db_rc);
570 return VOLK_DB_ERR;
571 }
572 }
573
574 log_trace ("Inserting spok: {%x, %x, %x}", spok[0], spok[1], spok[2]);
575 log_trace ("Into context: %x", it->luc);
576
577 // Insert spo:c.
578 it->key.mv_data = spok;
579 it->key.mv_size = TRP_KLEN;
580
581 // In triple mode, data is empty (= NULL_KEY).
582 it->data.mv_data = &it->luc;
583 it->data.mv_size = it->luc == NULL_KEY ? 0 : KLEN;
584
585 db_rc = mdb_put(
586 it->txn, it->store->dbi[IDX_SPO_C],
587 &it->key, &it->data, MDB_NODUPDATA);
588
589 if (db_rc == MDB_KEYEXIST) return VOLK_NOACTION;
590 if (db_rc != MDB_SUCCESS) {
591 log_error (
592 "MDB error while inserting triple: %s", VOLK_strerror(db_rc));
593 return VOLK_DB_ERR;
594 }
595
596 // Index.
597 VOLK_rc rc = index_triple (it->store, OP_ADD, spok, it->luc, it->txn);
598 if (rc == VOLK_OK) it->i++;
599
600 return rc;
601}
602
603
604static VOLK_rc
605mdbstore_add_done (void *h)
606{
607 MDBIterator *it = h;
608 VOLK_rc rc = VOLK_OK;
609 log_debug ("Committing add transaction.");
610
611 if (txn_commit (it->txn) != MDB_SUCCESS) {
612 log_error ("Error committing transaction. Aborting instead.");
613 mdb_txn_abort (it->txn);
614 rc = VOLK_TXN_ERR;
615 }
616
617 free (it);
618
619 RCCK (rc);
620 return rc;
621}
622
623
624static void
625mdbstore_add_abort (void *h)
626{
627 MDBIterator *it = h;
628 mdb_txn_abort (it->txn);
629
630 free (it);
631}
632
633
634static VOLK_rc
635key_to_sterm (
636 MDBStore *store, MDB_txn *txn, const VOLK_Key key, VOLK_Buffer *sterm)
637{
639 int db_rc;
640
641 MDB_val key_v, data_v;
642 key_v.mv_data = (void*)&key;
643 key_v.mv_size = KLEN;
644
645 db_rc = mdb_get (txn, store->dbi[IDX_T_ST], &key_v, &data_v);
646
647 sterm->flags |= VOLK_BUF_BORROWED;
648 if (db_rc == MDB_SUCCESS) {
649 sterm->addr = data_v.mv_data;
650 sterm->size = data_v.mv_size;
651 rc = VOLK_OK;
652 } else if (db_rc == MDB_NOTFOUND) {
653 sterm->addr = NULL;
654 sterm->size = 0;
655 } else rc = VOLK_DB_ERR;
656
657 return rc;
658}
659
660
661static void *
662mdbstore_lookup (
663 void *h, const VOLK_Buffer *ss, const VOLK_Buffer *sp,
664 const VOLK_Buffer *so, const VOLK_Buffer *sc, void *th, size_t *ct)
665{
666 VOLK_TripleKey spok = {
667 VOLK_buffer_hash (ss),
668 VOLK_buffer_hash (sp),
669 VOLK_buffer_hash (so),
670 };
671
672 MDBIterator *it;
673 CALLOC_GUARD (it, NULL);
674
675 it->store = h;
676 it->luc = VOLK_buffer_hash (sc);
677 log_debug ("Lookup context: %x", it->luc);
678
679 if (ct) *ct = 0;
680
681 uint8_t idx0, idx1;
682
683 if (th) it->txn = th;
684 else if (!it->txn) {
685 // Start RO transaction if not in a write txn already.
686 RCNL (txn_begin (it->store->env, NULL, MDB_RDONLY, &it->txn));
687 log_trace ("Opening new lookup transaction @%p", it->txn);
688 it->flags |= ITER_OPEN_TXN;
689 }
690
691 // Context index loop.
692 RCNL (mdb_cursor_open (
693 it->txn, it->store->dbi[IDX_SPO_C], &it->ctx_cur));
694
695 /*
696 * Lookup decision tree.
697 */
698 // s p o (all terms bound)
699 if (spok[0] != NULL_KEY && spok[1] != NULL_KEY && spok[2] != NULL_KEY) {
700 it->luk[0] = spok[0];
701 it->luk[1] = spok[1];
702 it->luk[2] = spok[2];
703 PRCNL (lookup_3bound (it, ct));
704
705 } else if (spok[0] != NULL_KEY) {
706 it->luk[0] = spok[0];
707 idx0 = 0;
708
709 // s p ?
710 if (spok[1] != NULL_KEY) {
711 it->luk[1] = spok[1];
712 idx1 = 1;
713 PRCNL (lookup_2bound (idx0, idx1, it, ct));
714
715 // s ? o
716 } else if (spok[2] != NULL_KEY) {
717 it->luk[1] = spok[2];
718 idx1 = 2;
719 PRCNL (lookup_2bound (idx0, idx1, it, ct));
720
721 // s ? ?
722 } else PRCNL (lookup_1bound (idx0, it, ct));
723
724 } else if (spok[1] != NULL_KEY) {
725 it->luk[0] = spok[1];
726 idx0 = 1;
727
728 // ? p o
729 if (spok[2] != NULL_KEY) {
730 it->luk[1] = spok[2];
731 idx1 = 2;
732 PRCNL (lookup_2bound (idx0, idx1, it, ct));
733
734 // ? p ?
735 } else PRCNL (lookup_1bound (idx0, it, ct));
736
737 // ? ? o
738 } else if (spok[2] != NULL_KEY) {
739 it->luk[0] = spok[2];
740 idx0 = 2;
741 PRCNL (lookup_1bound (idx0, it, ct));
742
743 // ? ? ? (all terms unbound)
744 } else PRCNL (lookup_0bound (it, ct));
745
746 return it;
747}
748
749
/*
 * Advance an iterator to the next matching triple key.
 *
 * it->iter_op_fn is called to store the current triple in it->spok and to
 * advance the underlying cursor; it->rc then holds the result of that
 * advance. If it->luc is non-zero, triples are skipped until one is found
 * that is paired with that context in the it->ctx_cur cursor. Finally, all
 * the context keys of the selected triple are copied into it->ck, which is
 * reallocated as needed and terminated with NULL_KEY.
 *
 * Returns VOLK_OK when a triple was found, VOLK_END when the cursor is
 * exhausted, VOLK_DB_ERR or VOLK_MEM_ERR on failure; RCCK may also return
 * early with the code it is given.
 */
755static VOLK_rc
756mdbiter_next_key (MDBIterator *it)
757{
759 // Only advance if the previous it->rc wasn't already at the end.
760 if (it->rc == MDB_NOTFOUND) return VOLK_END;
761 RCCK (it->rc);
762
763 /* Retrieve current value and advance cursor to the next result.
764 * it->rc is set to the result of the next iteration.
765 */
766 it->iter_op_fn (it);
767 MDB_val key, data;
768 int db_rc;
769
// Search pair for the context filter: triple key + looked-up context key.
770 key.mv_size = TRP_KLEN;
771 data.mv_data = &it->luc;
772 data.mv_size = KLEN;
773
774 VOLK_rc rc;
775 if (it->luc) {
776 rc = VOLK_NORESULT; // Flow control value, will never be returned.
777 do {
778 //log_debug ("begin ctx loop.");
779 /* If ctx is specified, look if the matching triple is associated
780 * with it. If not, move on to the next triple.
781 * The loop normally exits when a triple with matching ctx is found
782 * (VOLK_OK), if there are no more triples (VOLK_END), or if there
783 * is an error (VOLK_DB_ERR).
784 */
785 log_trace (
786 "Found spok: {%x, %x, %x}",
787 it->spok[0], it->spok[1], it->spok[2]);
788
789 key.mv_data = it->spok;
790 db_rc = mdb_cursor_get (it->ctx_cur, &key, &data, MDB_GET_BOTH);
791
792 if (db_rc == MDB_NOTFOUND) {
793 log_trace ("Triple not found in context: %x", it->luc);
794 // if the iterator is at the end, stop here.
795 if (it->rc == MDB_NOTFOUND) rc = VOLK_END;
796 // Otherwise, look up the next key.
797 else it->iter_op_fn (it);
798
799 } else {
800 RCCK (db_rc);
801 rc = VOLK_OK;
802 log_trace ("Triple found in context: %x", it->luc);
803 }
804 } while (rc == VOLK_NORESULT);
805
806 } else {
807 log_trace (
808 "Found spok in any context: {%x, %x, %x}",
809 it->spok[0], it->spok[1], it->spok[2]);
810 rc = VOLK_OK;
811 }
812
813 // Get all contexts for a triple.
814 key.mv_data = it->spok;
815 db_rc = mdb_cursor_get (it->ctx_cur, &key, &data, MDB_SET_KEY);
816 if (db_rc != MDB_SUCCESS) {
817 log_error ("No context found for triple!");
818 return VOLK_DB_ERR;
819 }
820
821 size_t ct;
822 db_rc = mdb_cursor_count (it->ctx_cur, &ct);
823 if (db_rc != MDB_SUCCESS) return VOLK_DB_ERR;
824 // 1 spare for sentinel. Always allocated even on zero matches.
825 VOLK_Key *tmp_ck = realloc (it->ck, sizeof (*it->ck) * (ct + 1));
826 NLRCCK (tmp_ck, VOLK_MEM_ERR);
827 it->ck = tmp_ck;
828
// Copy the first context key, then every further duplicate of the triple.
829 size_t i = 0;
830 do {
831 //log_trace ("Copying to slot #%lu @%p", i, it->ck + i);
832 memcpy (it->ck + i++, data.mv_data, sizeof (*it->ck));
833 } while (
834 mdb_cursor_get (it->ctx_cur, &key, &data, MDB_NEXT_DUP)
835 == MDB_SUCCESS);
836 //log_trace ("setting sentinel @%p", it->ck + i);
837 it->ck[i] = NULL_KEY;
838
839 return rc;
840}
841
842
843static VOLK_rc
844mdbiter_next (
845 void *h, VOLK_BufferTriple *sspo, VOLK_Buffer **ctx_p)
846{
847 MDBIterator *it = h;
848 VOLK_rc rc = mdbiter_next_key (it);
849
850 if (rc == VOLK_OK) {
851 if (sspo) {
852 RCCK (key_to_sterm (it->store, it->txn, it->spok[0], sspo->s));
853 RCCK (key_to_sterm (it->store, it->txn, it->spok[1], sspo->p));
854 RCCK (key_to_sterm (it->store, it->txn, it->spok[2], sspo->o));
855 }
856
857 // Contexts for current triple.
858 if (ctx_p) {
859 // Preallocate.
860 size_t i = 0;
861 while (it->ck[i++]); // Include sentinel in count.
862 VOLK_Buffer *ctx;
863 log_trace ("Allocating %lu context buffers + sentinel.", i - 1);
864 ctx = malloc(i * sizeof (*ctx));
865 if (!ctx) return VOLK_MEM_ERR;
866
867 for (i = 0; it->ck[i]; i++)
868 RCCK (key_to_sterm (it->store, it->txn, it->ck[i], ctx + i));
869 memset (ctx + i, 0, sizeof (*ctx)); // Sentinel
870 NLRCCK (ctx + i, VOLK_MEM_ERR);
871
872 // TODO error handling.
873 *ctx_p = ctx;
874 }
875 }
876
877 return rc;
878}
879
880
881static void
882mdbiter_free (void *h)
883{
884 if (!h) return;
885 MDBIterator *it = h;
886
887 if (it->cur) mdb_cursor_close (it->cur);
888 if (it->ctx_cur) mdb_cursor_close (it->ctx_cur);
889 if (it->flags & ITER_OPEN_TXN) mdb_txn_abort (it->txn);
890 free (it->ck);
891
892 free (it);
893}
894
895
896static VOLK_rc
897mdbstore_update_ctx (
898 void *h, const VOLK_Buffer *old_c, const VOLK_Buffer *new_c, void *th)
899{
901 MDBStore *store = h;
902 unsigned char *trp_data = NULL;
903
905 old_ck = VOLK_buffer_hash (old_c),
906 new_ck = VOLK_buffer_hash (new_c);
907 // lu_key, lu_data look up all triples with old context in c:spo, and
908 // replace old c with new c.
909 MDB_txn
910 *p_txn = th,
911 *txn;
912 CHECK (rc = txn_begin (store->env, p_txn, 0, &txn), finally);
913
914 MDB_cursor *i_cur, *d_cur;
915 CHECK (
916 rc = mdb_cursor_open (txn, store->dbi[IDX_C_SPO], &i_cur),
917 close_txn);
918
919 MDB_val key, data;
920
921 // Return error if the graph URI already exists.
922 key.mv_data = &new_ck;
923 key.mv_size = KLEN;
924 rc = mdb_cursor_get (i_cur, &key, &data, MDB_SET);
925 if (rc == MDB_SUCCESS) {
926 log_error (
927 "Context key %lu already exists. Not replacing old graph.",
928 new_ck);
929 rc = VOLK_CONFLICT;
930 goto close_i;
931 } else if (rc != MDB_NOTFOUND) CHECK (rc, close_i);
932
933 // Add new context term.
934 CHECK (rc = mdbstore_add_term (store, new_c, txn), close_i);
935
936 key.mv_data = &old_ck;
937 // Count triples in cursor.
938 rc = mdb_cursor_get (i_cur, &key, &data, MDB_SET);
939 if (rc == MDB_NOTFOUND) {
940 log_info ("No triples found associated with old context.");
941 rc = VOLK_NOACTION;
942 goto close_i;
943 } else CHECK (rc, close_i);
944
945 // From here on, it can only be VOLK_OK or error.
946 rc = VOLK_OK;
947 size_t trp_ct;
948 CHECK (rc = mdb_cursor_count (i_cur, &trp_ct), close_i);
949 trp_data = malloc (trp_ct * TRP_KLEN);
950 if (UNLIKELY (!trp_data)) {
951 rc = VOLK_MEM_ERR;
952 goto close_i;
953 }
954
955 // Copy triple data as one block to temp buffer so that entries can be
956 // deleted while cursors are active.
957 rc = mdb_cursor_get (i_cur, &key, &data, MDB_GET_MULTIPLE);
958 if (rc != MDB_SUCCESS) {
959 rc = rc == MDB_NOTFOUND ? VOLK_NOACTION : VOLK_DB_ERR;
960 goto close_i;
961 }
962 size_t loc_cur = 0;
963 do {
964 memcpy (trp_data + loc_cur, data.mv_data, data.mv_size);
965 loc_cur += data.mv_size;
966 } while (mdb_cursor_get (
967 i_cur, &key, &data, MDB_NEXT_MULTIPLE) == MDB_SUCCESS);
968
969 // Zap c:spo entries in one go.
970 key.mv_data = &old_ck;
971 key.mv_size = KLEN;
972 data.mv_size = TRP_KLEN;
973 CHECK (rc = mdb_cursor_get (i_cur, &key, NULL, MDB_SET), close_i);
974 CHECK (rc = mdb_cursor_del (i_cur, MDB_NODUPDATA), close_i);
975
976 // Re-ad c:spo data individually.
977 key.mv_data = &new_ck;
978 for (size_t i = 0; i < trp_ct; i++) {
979 data.mv_data = trp_data + i * data.mv_size;
980 CHECK (
981 rc = mdb_cursor_put (i_cur, &key, &data, MDB_APPENDDUP),
982 close_i);
983 }
984 // Re-add c:spo data in bulk from buffer with new context.
985 // FIXME this is not working. Replaced by the for loop above.
986 /*
987 MDB_val data_block[] = {
988 { .mv_data = &new_ck, .mv_size = TRP_KLEN },
989 { .mv_data = NULL, .mv_size = trp_ct },
990 };
991 db_rc = mdb_cursor_put (i_cur, &key, data_block, MDB_MULTIPLE);
992 */
993
994 // Main table.
995 // Replace spo:c values one by one.
996 CHECK (rc = mdb_cursor_open (txn, store->dbi[IDX_SPO_C], &d_cur), close_i);
997 key.mv_size = TRP_KLEN;
998 data.mv_size = KLEN;
999 for (size_t i = 0; i < trp_ct; i++) {
1000 key.mv_data = trp_data + i * key.mv_size;
1001 data.mv_data = &old_ck;
1002 CHECK (
1003 rc = mdb_cursor_get (d_cur, &key, &data, MDB_GET_BOTH),
1004 close_d);
1005 CHECK (rc = mdb_cursor_del (d_cur, 0), close_d);
1006 data.mv_data = &new_ck;
1007 CHECK (
1008 rc = mdb_cursor_put (d_cur, &key, &data, MDB_NOOVERWRITE),
1009 close_d);
1010 }
1011
1012close_d:
1013 mdb_cursor_close (d_cur);
1014close_i:
1015 mdb_cursor_close (i_cur);
1016close_txn:
1017 if (rc == VOLK_OK) RCCK (txn_commit (txn));
1018 else mdb_txn_abort (txn);
1019
1020 if (trp_data) free (trp_data);
1021finally:
1022
1023 return rc;
1024}
1025
1026
1027static VOLK_rc
1028mdbstore_remove (
1029 void *h, const VOLK_Buffer *ss, const VOLK_Buffer *sp,
1030 const VOLK_Buffer *so, const VOLK_Buffer *sc, void *th, size_t *ct_p)
1031{
1032 MDBStore *store = h;
1034
1035 MDB_txn *txn;
1036 RCCK (txn_begin (store->env, th, 0, &txn));
1037 MDB_cursor *cur;
1038 mdb_cursor_open (txn, store->dbi[IDX_SPO_C], &cur);
1039
1040 if (sc == NULL) sc = VOLK_default_ctx_buf;
1041 VOLK_Key ck = VOLK_buffer_hash (sc);
1042 MDB_val spok_v, ck_v;
1043 spok_v.mv_size = TRP_KLEN;
1044 ck_v.mv_size = KLEN;
1045 ck_v.mv_data = &ck;
1046
1047 // Gather all the matching triples in a first pass.
1048 size_t *ct = ct_p ? ct_p : malloc (sizeof (*ct));
1049 NLRCCK (ct, VOLK_MEM_ERR);
1050 MDBIterator *it = mdbstore_lookup (store, ss, sp, so, sc, txn, ct);
1051 NLRCCK (it, VOLK_DB_ERR);
1052 log_debug ("Found %lu triples to remove.", *ct);
1053 VOLK_Key *keys = malloc (*ct * sizeof (VOLK_Key) * 3);
1054 NLRCCK (it, VOLK_MEM_ERR);
1055 size_t i = 0;
1056 while (mdbiter_next_key (it) == VOLK_OK) {
1057 log_trace ("Adding triple #%zu to remove list.", i);
1058 memcpy (keys + (3 * i++), &it->spok, TRP_KLEN);
1059 }
1060 mdbiter_free (it);
1061
1062 // Iterate over the gathered keys and delete them.
1063 for (i = 0; i < *ct; i++) {
1064 spok_v.mv_data = keys + i * 3;
1065 ck_v.mv_data = &ck;
1066 log_trace (
1067 "Removing triple #%zu: %x {%x %x %x}",
1068 i,
1069 ((VOLK_Key *)ck_v.mv_data)[0],
1070 ((VOLK_Key *)spok_v.mv_data)[0],
1071 ((VOLK_Key *)spok_v.mv_data)[1],
1072 ((VOLK_Key *)spok_v.mv_data)[2]);
1073
1074 rc = mdb_cursor_get (cur, &spok_v, &ck_v, MDB_GET_BOTH);
1075 if (rc == MDB_NOTFOUND) {
1076 log_warn ("No key found in spo:c DB.");
1077 continue; // TODO This could be a data problem.
1078 } else CHECK (rc, fail);
1079
1080 // Delete spo:c entry.
1081 CHECK (rc = mdb_cursor_del (cur, 0), fail);
1082 CHECK (rc = index_triple (
1083 store, OP_REMOVE, keys + i * 3, ck, txn
1084 ), fail);
1085 }
1086 free(keys);
1087 CHECK (txn_commit (txn), fail);
1088
1089 return rc;
1090
1091fail:
1092 mdb_txn_abort (txn);
1093 RCCK (rc);
1094
1095 return VOLK_DB_ERR;
1096}
1097
1098
1110static VOLK_rc
1111mdbstore_add_term (void *h, const VOLK_Buffer *sterm, void *th)
1112{
1113 //log_trace ("Adding term to MDB store: %s", sterm->addr);
1114 MDBStore *store = h;
1115 int db_rc;
1116 MDB_val key, data;
1117
1118 MDB_txn *txn;
1119 // If an active transaction was passed, use it, otherwise open and
1120 // close a new one.
1121 if (th) txn = th;
1122 else RCCK (txn_begin (store->env, th, 0, &txn));
1123
1124 MDB_cursor *cur;
1125 CHECK (mdb_cursor_open (txn, store->dbi[IDX_T_ST], &cur), fail);
1126
1127 VOLK_Key k = VOLK_buffer_hash (sterm);
1128 key.mv_data = &k;
1129 key.mv_size = sizeof (k);
1130
1131 data.mv_data = sterm->addr;
1132 data.mv_size = sterm->size;
1133
1134 db_rc = mdb_cursor_put (cur, &key, &data, MDB_NOOVERWRITE);
1135 if (db_rc != MDB_KEYEXIST) CHECK (db_rc, fail);
1136
1137 if (txn != th) CHECK (db_rc = txn_commit (txn), fail);
1138
1139 return VOLK_OK;
1140
1141fail:
1142 if (txn != th) mdb_txn_abort (txn);
1143 log_trace ("Aborted txn for adding term.");
1144 return VOLK_DB_ERR;
1145}
1146
1147
1148VOLK_Buffer **
1149mdbstore_ctx_list (void *h, void *th)
1150{
1151 MDBStore *store = h;
1152 VOLK_rc db_rc;
1153 MDB_txn *txn;
1154 VOLK_Buffer **tdata = NULL;
1155 if (th) txn = th;
1156 else CHECK (txn_begin (store->env, NULL, MDB_RDONLY, &txn), fail);
1157
1158
1159 MDB_cursor *cur;
1160 CHECK (mdb_cursor_open (txn, store->dbi[IDX_C_SPO], &cur), fail);
1161 MDB_val key, data;
1162 db_rc = mdb_cursor_get (cur, &key, &data, MDB_FIRST);
1163
1164 size_t i = 0;
1165 while (db_rc == MDB_SUCCESS) {
1166 tdata = realloc (tdata, (i + 1) * sizeof (*tdata));
1167 if (UNLIKELY (!tdata)) goto fail;
1168 tdata[i] = BUF_DUMMY;
1169 VOLK_Key tkey = *(VOLK_Key*)key.mv_data;
1170 CHECK (key_to_sterm (store, txn, tkey, tdata[i]), fail);
1171 db_rc = mdb_cursor_get (cur, &key, &data, MDB_NEXT_NODUP);
1172 i++;
1173 }
1174
1175 tdata = realloc (tdata, i * sizeof (data));
1176 tdata[i] = NULL; // Sentinel
1177 mdb_cursor_close (cur);
1178 if (txn != th) mdb_txn_abort (txn);
1179
1180 return tdata;
1181
1182fail:
1183 if (txn != th) mdb_txn_abort (txn);
1184 if (tdata) free (tdata);
1185 return NULL;
1186}
1187
1188
1190 .name = "MDB Store",
1193
1194 .setup_fn = mdbstore_setup,
1195 .new_fn = mdbstore_new,
1196 .free_fn = mdbstore_free,
1197
1198 .size_fn = mdbstore_size,
1199 .id_fn = mdbstore_id,
1200
1201 .txn_begin_fn = mdbstore_txn_begin,
1202 .txn_commit_fn = mdbstore_txn_commit,
1203 .txn_abort_fn = mdbstore_txn_abort,
1204 .iter_txn_fn = mdbiter_txn,
1205
1206 .add_init_fn = mdbstore_add_init,
1207 .add_iter_fn = mdbstore_add_iter,
1208 .add_abort_fn = mdbstore_add_abort,
1209 .add_done_fn = mdbstore_add_done,
1210 .add_term_fn = mdbstore_add_term,
1211
1212 .update_ctx_fn = mdbstore_update_ctx,
1213
1214 .lookup_fn = mdbstore_lookup,
1215 .lu_next_fn = mdbiter_next,
1216 .lu_free_fn = mdbiter_free,
1217
1218 .remove_fn = mdbstore_remove,
1219
1220 .ctx_list_fn = mdbstore_ctx_list,
1221};
1222
1223
1224/* * * Static functions. * * */
1225
1235static VOLK_rc
1236index_triple(
1237 MDBStore *store, StoreOp op, VOLK_TripleKey spok, VOLK_Key ck,
1238 MDB_txn *txn)
1239{
1240 int db_rc;
1242 MDB_val v1, v2;
1243
1244 log_trace ("Indexing triple: {%x %x %x}", spok[0], spok[1], spok[2]);
1245
1246 // Index c:spo.
1247 if (op == OP_REMOVE) {
1248 log_trace ("Indexing op: REMOVE");
1249 if (ck == NULL_KEY) goto skip_remove;
1250
1251 MDB_cursor *cur;
1252 v1.mv_data = &ck;
1253 v1.mv_size = KLEN;
1254 v2.mv_data = spok;
1255 v2.mv_size = TRP_KLEN;
1256
1257 RCCK (mdb_cursor_open (txn, store->dbi[IDX_C_SPO], &cur));
1258 db_rc = mdb_cursor_get (cur, &v1, &v2, MDB_GET_BOTH);
1259 if (db_rc != MDB_NOTFOUND) {
1260 RCCK (db_rc);
1261 RCCK (mdb_cursor_del (cur, 0));
1262 v1.mv_data = &ck;
1263 v2.mv_data = spok;
1264 rc = VOLK_OK;
1265 }
1266 mdb_cursor_close (cur);
1267skip_remove:
1268
1269 } else if (op == OP_ADD) {
1270 log_trace ("Indexing op: ADD");
1271 if (ck == NULL_KEY) goto skip_add;
1272
1273 v1.mv_data = &ck;
1274 v1.mv_size = KLEN;
1275 v2.mv_data = spok;
1276 v2.mv_size = TRP_KLEN;
1277
1278 db_rc = mdb_put(
1279 txn, store->dbi[IDX_C_SPO],
1280 &v1, &v2, MDB_NODUPDATA);
1281 if (db_rc != MDB_SUCCESS) return VOLK_DB_ERR;
1282 if (db_rc != MDB_KEYEXIST) rc = VOLK_OK;
1283skip_add:
1284
1285 } else return VOLK_VALUE_ERR;
1286
1287 VOLK_DoubleKey dbl_keys[3] = {
1288 {spok[1], spok[2]}, // po
1289 {spok[0], spok[2]}, // so
1290 {spok[0], spok[1]}, // sp
1291 };
1292
1293 // Add or remove index terms.
1294 v1.mv_size = KLEN;
1295 v2.mv_size = DBL_KLEN;
1296
1297 for (int i = 0; i < 3; i++) {
1298 MDB_dbi
1299 db1 = store->dbi[lookup_indices[i]], // s:po, p:so, o:sp
1300 db2 = store->dbi[lookup_indices[i + 3]]; // po:s, so:p, sp:o
1301
1302 v1.mv_data = spok + i;
1303 v2.mv_data = dbl_keys[i];
1304
1305 if (op == OP_REMOVE) {
1306 // Remove from 1-bound index.
1307 MDB_cursor *cur1;
1308 mdb_cursor_open(txn, store->dbi[lookup_indices[i]], &cur1);
1309 RCCK (db_rc = mdb_cursor_get (cur1, &v1, &v2, MDB_GET_BOTH));
1310 mdb_cursor_del (cur1, 0);
1311 mdb_cursor_close (cur1);
1312
1313 // Restore pointers invalidated after delete.
1314 v1.mv_data = spok + i;
1315 v2.mv_data = dbl_keys[i];
1316
1317 // Remove from 2-bound index.
1318 MDB_cursor *cur2;
1319 mdb_cursor_open(txn, store->dbi[lookup_indices[i + 3]], &cur2);
1320 RCCK (db_rc = mdb_cursor_get (cur2, &v2, &v1, MDB_GET_BOTH));
1321 mdb_cursor_del (cur2, 0);
1322 mdb_cursor_close (cur2);
1323
1324 rc = VOLK_OK;
1325
1326 } else { // OP_ADD is guaranteed.
1327 // Add to 1-bound index.
1328 log_trace ("Indexing in %s: ", db_labels[lookup_indices[i]]);
1329 log_trace (
1330 "%x: %x %x", *(size_t*)(v1.mv_data),
1331 *(size_t*)(v2.mv_data), *(size_t*)(v2.mv_data) + 1);
1332
1333 db_rc = mdb_put (txn, db1, &v1, &v2, MDB_NODUPDATA);
1334
1335 if (db_rc == MDB_SUCCESS) rc = VOLK_OK;
1336 else if (db_rc != MDB_KEYEXIST) return VOLK_DB_ERR;
1337
1338 // Add to 2-bound index.
1339 log_trace ("Indexing in %s: ", db_labels[lookup_indices[i + 3]]);
1340 log_trace (
1341 "%x %x: %x", *(size_t*)(v2.mv_data),
1342 *(size_t*)(v2.mv_data) + 1, *(size_t*)(v1.mv_data));
1343
1344 db_rc = mdb_put (txn, db2, &v2, &v1, MDB_NODUPDATA);
1345
1346 if (db_rc == MDB_SUCCESS) rc = VOLK_OK;
1347 else if (db_rc != MDB_KEYEXIST) return VOLK_DB_ERR;
1348 }
1349 }
1350
1351 return rc;
1352}
1353
1354
1355/* * * Term-specific iterators. * * */
1356
1361inline static void
1362it_next_0bound (MDBIterator *it)
1363{
1364 memcpy (it->spok, it->key.mv_data, sizeof (VOLK_TripleKey));
1365 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_NEXT_NODUP);
1366}
1367
1368
1373inline static void
1374it_next_0bound_ctx (MDBIterator *it)
1375{
1376 memcpy (it->spok, it->data.mv_data, sizeof (VOLK_TripleKey));
1377 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_NEXT_DUP);
1378}
1379
1380
1387inline static void
1388it_next_1bound (MDBIterator *it)
1389{
1390 VOLK_DoubleKey *lu_dset = it->data.mv_data;
1391
1392 it->spok[it->term_order[0]] = it->luk[0];
1393 it->spok[it->term_order[1]] = lu_dset[it->i][0];
1394 it->spok[it->term_order[2]] = lu_dset[it->i][1];
1395
1396 log_trace (
1397 "Composed triple: {%x %x %x}",
1398 it->spok[0], it->spok[1], it->spok[2]);
1399
1400 // Ensure next block within the same page is not beyond the last.
1401 if (it->i < it->data.mv_size / DBL_KLEN - 1) {
1402 it->i ++;
1403 //log_debug ("Increasing page cursor to %lu.", it->i);
1404 //log_debug ("it->rc: %d", it->rc);
1405
1406 } else {
1407 // If the last block in the page is being yielded,
1408 // move cursor to beginning of next page.
1409 it->i = 0;
1410 //log_debug ("Reset page cursor to %lu.", it->i);
1411 it->rc = mdb_cursor_get (
1412 it->cur, &it->key, &it->data, MDB_NEXT_MULTIPLE);
1413 }
1414}
1415
1416
1423inline static void
1424it_next_2bound (MDBIterator *it)
1425{
1426 VOLK_Key *lu_dset = it->data.mv_data;
1427
1428 it->spok[it->term_order[0]] = it->luk[0];
1429 it->spok[it->term_order[1]] = it->luk[1];
1430 it->spok[it->term_order[2]] = lu_dset[it->i];
1431
1432 // Ensure next block within the same page is not beyond the last.
1433 if (it->i < it->data.mv_size / KLEN - 1)
1434 it->i ++;
1435 else {
1436 // If the last block in the page is being yielded,
1437 // move cursor to beginning of next page.
1438 it->i = 0;
1439 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_NEXT_MULTIPLE);
1440 }
1441}
1442
1443
1450inline static void
1451it_next_3bound (MDBIterator *it)
1452{ it->rc = MDB_NOTFOUND; }
1453
1454
1455/* * * Term-specific lookups. * * */
1456
1457inline static VOLK_rc
1458lookup_0bound (MDBIterator *it, size_t *ct)
1459{
1460 log_debug ("Looking up 0 bound terms.");
1461
1462 // Context search looks for all values in c:spo.
1463 if (it->luc != NULL_KEY) {
1464 // Look up by given context.
1465 RCCK (it->rc = mdb_cursor_open (
1466 it->txn, it->store->dbi[IDX_C_SPO], &it->cur));
1467
1468 it->key.mv_data = &it->luc;
1469 it->key.mv_size = KLEN;
1470
1471 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1472 if (ct) {
1473 if (it->rc == MDB_NOTFOUND) *ct = 0;
1474 else {
1475 RCCK (it->rc);
1476 mdb_cursor_count (it->cur, ct);
1477 }
1478 log_debug ("Found %lu triples.", *ct);
1479 }
1480 //it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_FIRST_DUP);
1481 it->iter_op_fn = it_next_0bound_ctx;
1482
1483 // No-context search looks for all keys in spo:c.
1484 } else {
1485 RCCK (it->rc = mdb_cursor_open (
1486 it->txn, it->store->dbi[IDX_SPO_C], &it->cur));
1487 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_FIRST);
1488 if (ct) {
1489 if (it->rc == MDB_NOTFOUND) *ct = 0;
1490 else {
1491 MDB_stat stat;
1492 // s:po and 1- and 2-bound indices have 1 entry per triple.
1493 mdb_stat (it->txn, it->store->dbi[IDX_S_PO], &stat);
1494
1495 *ct = stat.ms_entries;
1496 }
1497 log_debug ("Found %lu triples.", *ct);
1498 }
1499 it->iter_op_fn = it_next_0bound;
1500 }
1501 if (it->rc != MDB_NOTFOUND) RCCK (it->rc);
1502
1503 return VOLK_OK;
1504}
1505
1506
1507inline static VOLK_rc
1508lookup_1bound (uint8_t idx0, MDBIterator *it, size_t *ct)
1509{
1510 it->term_order = (const uint8_t*)lookup_ordering_1bound[idx0];
1511
1512 log_debug ("Looking up 1 bound term: %x", it->luk[0]);
1513
1514 RCCK (mdb_cursor_open (
1515 it->txn, it->store->dbi[lookup_indices[idx0]], &it->cur));
1516
1517 it->key.mv_data = it->luk;
1518 it->key.mv_size = KLEN;
1519
1520 if (ct) {
1521 // If a context is specified, the only way to count triples matching
1522 // the context is to loop over them.
1523 if (it->luc != NULL_KEY) {
1524 log_debug ("Counting in context: %x", it->luc);
1525 MDBIterator *ct_it;
1526 MALLOC_GUARD (ct_it, VOLK_MEM_ERR);
1527 /*
1528 memcpy (ct_it, it, sizeof (*ct_it));
1529 */
1530
1531 ct_it->store = it->store;
1532 ct_it->txn = it->txn;
1533 ct_it->ctx_cur = it->ctx_cur;
1534 ct_it->key = it->key;
1535 ct_it->data = it->data;
1536 ct_it->ck = NULL;
1537 ct_it->luk[0] = it->luk[0];
1538 ct_it->luc = it->luc;
1539 ct_it->i = 0;
1540
1541 PRCCK (lookup_1bound (idx0, ct_it, NULL));
1542
1543 VOLK_rc db_rc;
1544 while (VOLK_END != (db_rc = mdbiter_next_key (ct_it))) {
1545 PRCCK (db_rc);
1546 (*ct)++;
1547 }
1548
1549 // Free the counter iterator without freeing the shared txn.
1550 if (ct_it->cur) mdb_cursor_close (ct_it->cur);
1551 free (ct_it->ck);
1552 free (ct_it);
1553
1554 } else {
1555 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1556 if (it->rc == MDB_SUCCESS) mdb_cursor_count (it->cur, ct);
1557 }
1558 log_debug ("Found %lu triples.", *ct);
1559 }
1560
1561 it->i = 0;
1562 it->iter_op_fn = it_next_1bound;
1563
1564 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1565 if (it->rc == MDB_SUCCESS)
1566 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_GET_MULTIPLE);
1567 if (it->rc != MDB_NOTFOUND) RCCK (it->rc);
1568
1569 return VOLK_OK;
1570}
1571
1572
1573inline static VOLK_rc
1574lookup_2bound(uint8_t idx0, uint8_t idx1, MDBIterator *it, size_t *ct)
1575{
1576 uint8_t luk1_offset, luk2_offset;
1577 MDB_dbi dbi = 0;
1578
1579 // Establish lookup ordering with some awkward offset math.
1580 for (int i = 0; i < 3; i++) {
1581 if (
1582 (
1583 idx0 == lookup_ordering_2bound[i][0] &&
1584 idx1 == lookup_ordering_2bound[i][1]
1585 ) || (
1586 idx0 == lookup_ordering_2bound[i][1] &&
1587 idx1 == lookup_ordering_2bound[i][0]
1588 )
1589 ) {
1590 it->term_order = (const uint8_t*)lookup_ordering_2bound[i];
1591 if (it->term_order[0] == idx0) {
1592 luk1_offset = 0;
1593 luk2_offset = 1;
1594 } else {
1595 luk1_offset = 1;
1596 luk2_offset = 0;
1597 }
1598 dbi = it->store->dbi[lookup_indices[i + 3]];
1599 log_debug (
1600 "Looking up 2 bound in %s",
1601 db_labels[lookup_indices[i + 3]]);
1602
1603 break;
1604 }
1605 }
1606
1607 if (dbi == 0) {
1608 log_error (
1609 "Values %d and %d not found in lookup keys.",
1610 idx0, idx1);
1611 return VOLK_VALUE_ERR;
1612 }
1613
1614 // Compose term keys in lookup key.
1615 VOLK_DoubleKey luk;
1616 luk[luk1_offset] = it->luk[0];
1617 luk[luk2_offset] = it->luk[1];
1618
1619 it->key.mv_data = luk;
1620 it->key.mv_size = DBL_KLEN;
1621
1622 mdb_cursor_open (it->txn, dbi, &it->cur);
1623 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1624
1625 if (ct) {
1626 // If a context is specified, the only way to count triples matching
1627 // the context is to loop over them.
1628 if (it->luc != NULL_KEY) {
1629 MDBIterator *ct_it;
1630 MALLOC_GUARD (ct_it, VOLK_MEM_ERR);
1631
1632 ct_it->store = it->store;
1633 ct_it->txn = it->txn;
1634 ct_it->ctx_cur = it->ctx_cur;
1635 ct_it->ck = NULL;
1636 ct_it->luk[0] = it->luk[0];
1637 ct_it->luk[1] = it->luk[1];
1638 ct_it->luc = it->luc;
1639 ct_it->i = 0;
1640
1641 lookup_2bound (idx0, idx1, ct_it, NULL);
1642
1643 while (mdbiter_next_key (ct_it) != VOLK_END) (*ct) ++;
1644
1645 // Free the counter iterator without freeing the shared txn.
1646 if (ct_it->cur) mdb_cursor_close (ct_it->cur);
1647 free (ct_it->ck);
1648 free (ct_it);
1649
1650 } else {
1651 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1652 if (it->rc == MDB_SUCCESS) mdb_cursor_count (it->cur, ct);
1653 }
1654 log_debug ("Found %lu triples.", *ct);
1655 }
1656
1657 it->i = 0;
1658 it->iter_op_fn = it_next_2bound;
1659
1660 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1661 if (it->rc == MDB_SUCCESS)
1662 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_GET_MULTIPLE);
1663 if (it->rc != MDB_NOTFOUND) RCCK (it->rc);
1664
1665 return VOLK_OK;
1666}
1667
1668
1669inline static VOLK_rc
1670lookup_3bound (MDBIterator *it, size_t *ct)
1671{
1672 log_debug (
1673 "Looking up 3 bound: {%x, %x, %x}",
1674 it->luk[0], it->luk[1], it->luk[2]);
1675
1676 it->key.mv_data = it->luk;
1677
1678 if (it->luc != NULL_KEY) {
1679 it->rc = mdb_cursor_open (
1680 it->txn, it->store->dbi[IDX_SPO_C], &it->cur);
1681
1682 it->key.mv_size = TRP_KLEN;
1683 it->data.mv_data = &it->luc;
1684 it->data.mv_size = KLEN;
1685
1686 } else {
1687 it->rc = mdb_cursor_open (it->txn, it->store->dbi[IDX_S_PO], &it->cur);
1688
1689 it->key.mv_size = KLEN;
1690 it->data.mv_data = it->luk + 1;
1691 it->data.mv_size = DBL_KLEN;
1692 }
1693
1694 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_GET_BOTH);
1695 if (it->rc != MDB_NOTFOUND) RCCK (it->rc);
1696
1697 mdb_cursor_close (it->cur);
1698 it->cur = NULL;
1699
1700 if (ct && it->rc == MDB_SUCCESS) *ct = 1;
1701
1702 it->iter_op_fn = it_next_3bound;
1703 memcpy (it->spok, it->luk, sizeof (VOLK_TripleKey));
1704
1705 return VOLK_OK;
1706}
1707
1708
#define UNLIKELY(x)
Definition core.h:39
#define NULL_KEY
"NULL" key, a value that is never user-provided.
Definition buffer.h:15
VOLK_Key VOLK_buffer_hash(const VOLK_Buffer *buf)
Hash a buffer.
Definition buffer.h:175
VOLK_Buffer * VOLK_btriple_pos(const VOLK_BufferTriple *trp, VOLK_TriplePos n)
Get serialized triple by term position.
Definition buffer.h:297
VOLK_Buffer * VOLK_default_ctx_buf
Serialized default context.
Definition buffer.c:5
#define BUF_DUMMY
Dummy buffer to be used with VOLK_buffer_init.
Definition buffer.h:154
@ VOLK_BUF_BORROWED
Definition buffer.h:28
#define NULL_TRP
"NULL" triple, a value that is never user-provided.
Definition core.h:66
#define DBL_KLEN
Definition core.h:58
#define KLEN
Definition core.h:57
#define TRP_KLEN
Definition core.h:59
bool VOLK_env_is_init
Whether the environment is initialized.
Definition core.c:11
#define MALLOC_GUARD(var, rc)
Allocate one pointer with malloc and return rc if it fails.
Definition core.h:382
#define RCNL(exp)
Return NULL if exp returns a nonzero value.
Definition core.h:352
#define PRCNL(exp)
Return NULL if exp returns a negative value (=error).
Definition core.h:363
#define CHECK(exp, marker)
Jump to marker if exp does not return VOLK_OK.
Definition core.h:290
#define CALLOC_GUARD(var, rc)
Allocate one pointer with calloc and return rc if it fails.
Definition core.h:388
VOLK_rc rm_r(const char *path)
Remove a directory recursively (POSIX compatible).
Definition core.c:124
#define log_debug(...)
Definition core.h:273
#define RCCK(exp)
Return exp return value if it is of VOLK_rc type and nonzero.
Definition core.h:322
#define NLRCCK(exp, _rc)
Return rc return code if exp is NULL.
Definition core.h:344
VOLK_rc mkdir_p(const char *_path, mode_t mode)
Make recursive directories.
Definition core.c:50
#define LOG_RC(rc)
Log an error or warning for return codes that are not VOLK_OK.
Definition core.h:284
#define PRCCK(exp)
Return exp return value if it is of VOLK_rc type and negative (=error).
Definition core.h:333
#define log_trace(...)
Definition core.h:275
size_t VOLK_Key
Term key, i.e., hash of a serialized term.
Definition core.h:230
VOLK_Key VOLK_TripleKey[3]
Array of three VOLK_Key values, representing a triple.
Definition core.h:234
VOLK_Key VOLK_DoubleKey[2]
Array of two VOLK_Key values.
Definition core.h:232
#define VOLK_VALUE_ERR
An invalid input value was provided.
Definition core.h:129
#define VOLK_MEM_ERR
Memory allocation error.
Definition core.h:144
#define VOLK_NORESULT
No result yielded.
Definition core.h:100
#define VOLK_DB_ERR
Low-level database error.
Definition core.h:135
#define VOLK_CONFLICT
Conflict warning.
Definition core.h:120
#define VOLK_END
Loop end.
Definition core.h:107
#define VOLK_OK
Generic success return code.
Definition core.h:83
#define VOLK_NOACTION
No action taken.
Definition core.h:93
int VOLK_rc
Definition core.h:79
#define VOLK_ENV_ERR
Error while handling environment setup; or environment not initialized.
Definition core.h:158
#define VOLK_TXN_ERR
Error handling a store transaction.
Definition core.h:132
const char * VOLK_strerror(VOLK_rc rc)
Return an error message for a return code.
Definition core.c:196
VOLK_StoreFlags
Store flags passed to various operations.
@ VOLK_STORE_TXN_RO
Start a read-only transaction.
@ VOLK_STORE_TXN
Supports transaction handling.
@ VOLK_STORE_COW
Copy on write.
@ VOLK_STORE_IDX
Store is fully SPO(C)-indexed.
@ VOLK_STORE_PERM
@ VOLK_STORE_CTX
VOLK_Buffer ** mdbstore_ctx_list(void *h, void *th)
Definition store_mdb.c:1149
StoreFlags
Store state flags.
Definition store_mdb.c:31
@ LSSTORE_OPEN
Env is open.
Definition store_mdb.c:32
#define MAIN_TABLE
Definition store_mdb.c:108
char DbLabel[8]
Definition store_mdb.c:27
#define ENV_FILE_MODE
Definition store_mdb.c:21
#define DEFAULT_MAPSIZE
Definition store_mdb.c:15
#define ENV_DIR_MODE
Definition store_mdb.c:20
const VOLK_StoreInt mdbstore_int
MDB store interface.
Definition store_mdb.c:1189
char * mdbstore_id(const void *h)
Definition store_mdb.c:436
#define LOOKUP_TABLE
Definition store_mdb.c:116
void(* iter_op_fn_t)(MDBIterator *it)
Iterator operation.
Definition store_mdb.c:69
#define N_DB
Definition store_mdb.c:6
IterFlags
Iterator state flags.
Definition store_mdb.c:36
@ ITER_OPEN_TXN
A transaction is open.
Definition store_mdb.c:37
StoreOp
Definition store_mdb.c:48
@ OP_ADD
Definition store_mdb.c:49
@ OP_REMOVE
Definition store_mdb.c:50
DBIdx
Definition store_mdb.c:140
LMDB graph store backend.
#define VOLK_MDB_STORE_URN
Default MDB store identifier and location.
Definition store_mdb.h:38
Triple iterator.
Definition store_mdb.c:73
MDB_cursor * ctx_cur
MDB c:spo index cursor.
Definition store_mdb.c:78
VOLK_TripleKey spok
Triple to be populated with match.
Definition store_mdb.c:81
VOLK_Key luk[3]
0 to 3 lookup keys.
Definition store_mdb.c:88
MDB_cursor * cur
MDB cursor.
Definition store_mdb.c:77
const uint8_t * term_order
Term order used in 1-2bound look-ups.
Definition store_mdb.c:87
MDBStore * store
MDB store handle.
Definition store_mdb.c:74
IterFlags flags
Iterator flags.
Definition store_mdb.c:75
MDB_txn * txn
MDB transaction.
Definition store_mdb.c:76
VOLK_Key luc
Ctx key to filter by. May be NULL_KEY.
Definition store_mdb.c:89
size_t i
Internal counter for paged lookups.
Definition store_mdb.c:90
iter_op_fn_t iter_op_fn
Function used to look up next match.
Definition store_mdb.c:86
MDB_val key
Internal data handler.
Definition store_mdb.c:79
size_t ct
Definition store_mdb.c:91
int rc
MDB_* return code for the next result.
Definition store_mdb.c:93
MDB_val data
Internal data handler.
Definition store_mdb.c:80
VOLK_Key * ck
Definition store_mdb.c:82
StoreFlags flags
Store state flags.
Definition store_mdb.c:56
MDB_env * env
Environment handle.
Definition store_mdb.c:54
MDB_dbi dbi[N_DB]
DB handles. Refer to DbIdx enum.
Definition store_mdb.c:55
Triple of byte buffers.
Definition buffer.h:60
VOLK_Buffer * o
Definition buffer.h:63
VOLK_Buffer * s
Definition buffer.h:61
VOLK_Buffer * p
Definition buffer.h:62
General-purpose data buffer.
Definition buffer.h:47
VOLK_BufferFlag flags
Definition buffer.h:50
unsigned char * addr
Definition buffer.h:48
size_t size
Definition buffer.h:49
Store interface.