Volksdata 1.0b7
RDF library
Loading...
Searching...
No Matches
store_mdb.c
Go to the documentation of this file.
2
6#define N_DB 9
7
11#if (defined DEBUG || defined TESTING)
12#define DEFAULT_MAPSIZE 1<<24 // 16Mb (limit for Valgrind)
13#elif !(defined __LP64__ || defined __LLP64__) || \
14 defined _WIN32 && !defined _WIN64
15#define DEFAULT_MAPSIZE 1<<31 // 2Gb (limit for 32-bit systems)
16#else
17#define DEFAULT_MAPSIZE 1UL<<40 // 1Tb
18#endif
19
20#define ENV_DIR_MODE 0750
21#define ENV_FILE_MODE 0640
22
23/*
24 * Data types.
25 */
26
27typedef char DbLabel[8];
28typedef struct mdbstore_iter_t MDBIterator;
29
31typedef enum {
32 LSSTORE_OPEN = 1<<0,
34
36typedef enum {
38 //<
39 //< The iterator has begun a new
40 //< transaction on initialization
41 //< which needs to be closed. If
42 //< false, the iterator is using an
43 //< existing transaction which will
44 //< not be closed with
45 //< #mdbiter_free().
46} IterFlags;
47
48typedef enum {
51} StoreOp;
52
53typedef struct mdbstore_t {
54 MDB_env * env;
55 MDB_dbi dbi[N_DB];
57} MDBStore;
58
69typedef void (*iter_op_fn_t)(MDBIterator *it);
70
71
73typedef struct mdbstore_iter_t {
76 MDB_txn * txn;
77 MDB_cursor * cur;
78 MDB_cursor * ctx_cur;
79 MDB_val key;
80 MDB_val data;
87 const uint8_t * term_order;
90 size_t i;
91 size_t ct;
93 int rc;
95
96
97/*
98 * Static variables.
99 */
100
101#define DUPFIXED_MASK MDB_DUPSORT | MDB_DUPFIXED
102
108#define MAIN_TABLE \
109/* #ID pfx #DB label #Flags */ \
110 ENTRY( T_ST, "t:st", 0 ) /* Key to ser. term */ \
111 ENTRY( SPO_C, "spo:c", DUPFIXED_MASK ) /* Triple to context */ \
112
113
116#define LOOKUP_TABLE \
117/* #ID pfx #DB label #Flags */ \
118 ENTRY( S_PO, "s:po", DUPFIXED_MASK ) /* 1-bound lookup */ \
119 ENTRY( P_SO, "p:so", DUPFIXED_MASK ) /* 1-bound lookup */ \
120 ENTRY( O_SP, "o:sp", DUPFIXED_MASK ) /* 1-bound lookup */ \
121 ENTRY( PO_S, "po:s", DUPFIXED_MASK ) /* 2-bound lookup */ \
122 ENTRY( SO_P, "so:p", DUPFIXED_MASK ) /* 2-bound lookup */ \
123 ENTRY( SP_O, "sp:o", DUPFIXED_MASK ) /* 2-bound lookup */ \
124 ENTRY( C_SPO, "c:spo", DUPFIXED_MASK ) /* Context lookup */ \
125
126
129#define ENTRY(a, b, c) static const DbLabel DB_##a = b;
132#undef ENTRY
133
134/*
135 * Numeric index of each DB. Prefixed with IDX_
136 *
137 * These index numbers are referred to in all the arrays defeined below. They
138 * are independent from the LMDB dbi values which are considered opaque here.
139 */
140typedef enum {
141#define ENTRY(a, b, c) IDX_##a,
144#undef ENTRY
145} DBIdx;
146
150static const char *db_labels[N_DB] = {
151#define ENTRY(a, b, c) DB_##a,
154#undef ENTRY
155};
156
157/*
158 * DB flags. These are aligned with the dbi_labels index.
159 */
160static const unsigned int db_flags[N_DB] = {
161#define ENTRY(a, b, c) c,
164#undef ENTRY
165};
166
167/*
168 * 1-bound and 2-bound lookup indices.
169 *
170 * N.B. Only the first 6 (1-bound and 2-bound term lookup) are used.
171 * The others are added just because they belong logically to the lookup table.
172 */
173static DBIdx lookup_indices[9] = {
174#define ENTRY(a, b, c) IDX_##a,
176#undef ENTRY
177};
178
179static const uint8_t lookup_ordering_1bound[3][3] = {
180 {0, 1, 2}, // s:po
181 {1, 0, 2}, // p:so
182 {2, 0, 1}, // o:sp
183};
184
185static const uint8_t lookup_ordering_2bound[3][3] = {
186 {1, 2, 0}, // po:s
187 {0, 2, 1}, // so:p
188 {0, 1, 2}, // sp:o
189};
190
191
192/*
193 * Static prototypes.
194 */
195static int index_triple(
196 MDBStore *store, StoreOp op, VOLK_TripleKey spok, VOLK_Key ck,
197 MDB_txn *txn);
198static VOLK_rc mdbstore_add_term (void *h, const VOLK_Buffer *sterm, void *th);
199
200inline static VOLK_rc lookup_0bound (MDBIterator *it, size_t *ct);
201inline static VOLK_rc lookup_1bound (
202 uint8_t idx0, MDBIterator *it, size_t *ct);
203inline static VOLK_rc lookup_2bound (
204 uint8_t idx0, uint8_t idx1, MDBIterator *it, size_t *ct);
205inline static VOLK_rc lookup_3bound(MDBIterator *it, size_t *ct);
206
207
208static const char *
209mdbstore_path_from_id (const char *id)
210{
211 // Set environment path.
212 if (!id) id = getenv ("VOLK_MDB_STORE_URN");
213 if (!id) {
215 log_info (
216 "`VOLK_MDB_STORE_URN' environment variable is not "
217 "set. The default URN %s has been set as the store ID.", id
218 );
219 } else if (strncmp ("file://", id, 7) != 0) {
220 log_error ("MDB store ID must be in the `file://<abs_path>` format.");
221
222 return NULL;
223 }
224
225 return id + 7;
226}
227
228
229/*
230 * Inliners.
231 */
232
233static inline VOLK_rc
234txn_begin (MDB_env *env, MDB_txn *p, unsigned int f, MDB_txn **tp) {
235 VOLK_rc rc = mdb_txn_begin (env, p, f, tp);
236 const char *path;
237 mdb_env_get_path (env, &path);
238 log_debug (
239 "BEGIN %s transaction %p child of %p in env %s",
240 f == 0 ? "RW" : "RO", *tp, p, path);
241 return rc;
242}
243
244
245static inline VOLK_rc
246txn_commit (MDB_txn *t) {
247 log_debug ("COMMIT transaction %p", t);
248 return mdb_txn_commit (t);
249}
250
251
255
262static VOLK_rc
263mdbstore_setup (const char *id, bool clear)
264{
265 if (!VOLK_env_is_init) return VOLK_ENV_ERR;
266
267 const char *path = mdbstore_path_from_id (id);
268 if (!path) return VOLK_VALUE_ERR;
269
270 // If the directory exists (unless clear == true), do nothing.
271 if (clear) rm_r (path);
272 VOLK_rc rc = mkdir_p (path, ENV_DIR_MODE);
273 log_debug ("Create dir rc: %s", VOLK_strerror (rc));
274 PRCCK (rc);
275
276 // Open a temporary environment and txn to create the DBs.
277 MDB_env *env;
278 RCCK (mdb_env_create (&env));
279
280 RCCK (mdb_env_set_maxdbs (env, N_DB));
281 RCCK (mdb_env_open (env, path, 0, ENV_FILE_MODE));
282 log_debug ("Environment opened at %s.", path);
283
284 MDB_txn *txn;
285 RCCK (txn_begin (env, NULL, 0, &txn));
286 MDB_dbi dbi;
287 for (int i = 0; i < N_DB; i++) {
288 log_trace ("Creating DB %s", db_labels[i]);
289 RCCK (
290 mdb_dbi_open (txn, db_labels[i], db_flags[i] | MDB_CREATE, &dbi)
291 );
292 }
293
294 // Bootstrap the permanent store with initial data.
295 MDB_stat stat;
296 CHECK (mdb_dbi_open (
297 txn, db_labels[IDX_T_ST], db_flags[IDX_T_ST], &dbi), fail);
298 CHECK (mdb_stat (txn, dbi, &stat), fail);
299
300 if (stat.ms_entries == 0) {
301 log_debug ("Loading initial data into %s", path);
302 // Index default context.
303 MDB_cursor *cur;
304 CHECK (mdb_cursor_open (txn, dbi, &cur), fail);
306 MDB_val key, data;
307 key.mv_data = &k;
308 key.mv_size = sizeof (k);
309
310 data.mv_data = VOLK_default_ctx_buf->addr;
311 data.mv_size = VOLK_default_ctx_buf->size;
312
313 VOLK_rc db_rc = mdb_cursor_put (cur, &key, &data, 0);
314 CHECK (db_rc, fail);
315 }
316
317 CHECK (txn_commit (txn), fail);
318 mdb_env_close (env);
319
320 return clear ? VOLK_OK : rc;
321
322fail:
323 if (rc >= 0) rc = VOLK_DB_ERR;
324 mdb_txn_abort (txn);
325 return rc;
326}
327
328
339static void *
340mdbstore_new (const char *id, size_t _unused)
341{
342 if (!VOLK_env_is_init) {
343 log_error (VOLK_strerror (VOLK_ENV_ERR));
344 return NULL;
345 }
346
347 (void) _unused;
348 const char *path = mdbstore_path_from_id (id);
349 if (!path) return NULL;
350
351 MDBStore *store;
352 CALLOC_GUARD (store, NULL);
353
354 RCNL (mdb_env_create (&store->env));
355 MDB_txn *txn = NULL;
356
357 // Set map size.
358 size_t mapsize;
359 char *env_mapsize = getenv ("VOLK_MDB_MAPSIZE");
360 if (env_mapsize == NULL) mapsize = DEFAULT_MAPSIZE;
361 else sscanf (env_mapsize, "%zu", &mapsize);
362 log_debug (
363 "Setting environment map size at %s to %zu Mb.",
364 path, mapsize / 1024 / 1024);
365 CHECK (mdb_env_set_mapsize (store->env, mapsize), fail);
366 CHECK (mdb_env_set_maxdbs (store->env, N_DB), fail);
367 CHECK (mdb_env_open (store->env, path, 0, ENV_FILE_MODE), fail);
368
369 // Assign DB handles to store->dbi.
370 CHECK (txn_begin (store->env, NULL, 0, &txn), fail);
371 for (int i = 0; i < N_DB; i++)
372 CHECK (mdb_dbi_open (
373 txn, db_labels[i], db_flags[i], store->dbi + i), fail);
374
375 store->flags |= LSSTORE_OPEN;
376 CHECK (txn_commit(txn), fail);
377 txn = NULL;
378
379 log_info ("Created MDB environment at %s", path);
380
381 return store;
382
383fail:
384 if (txn) mdb_txn_abort (txn);
385 mdb_env_close (store->env);
386
387 return NULL;
388}
389
390
391static void
392mdbstore_free (void *h)
393{
394 MDBStore *store = h;
395 if (store->flags & LSSTORE_OPEN) {
396 const char *path;
397 mdb_env_get_path (store->env, &path);
398 log_info ("Closing MDB environment at %s.", path);
399 mdb_env_close (store->env);
400 }
401
402 free (store);
403}
404
405
406static VOLK_rc
407mdbstore_stat (const MDBStore *store, MDB_stat *stat)
408{
409 if (!(store->flags & LSSTORE_OPEN)) return 0;
410
411 MDB_txn *txn;
412 RCCK (txn_begin (store->env, NULL, MDB_RDONLY, &txn));
413
414 if (mdb_stat (txn, store->dbi[IDX_SPO_C], stat) != MDB_SUCCESS)
415 return VOLK_DB_ERR;
416 mdb_txn_abort (txn);
417
418 return VOLK_OK;
419}
420
421
422static size_t
423mdbstore_size (const void *h)
424{
425 const MDBStore *store = h;
426 // Size is calculated outside of any pending write txn.
427
428 MDB_stat stat;
429 if (mdbstore_stat (store, &stat) != VOLK_OK) return 0;
430
431 return stat.ms_entries;
432}
433
434
435char *
436mdbstore_id (const void *h)
437{
438 const MDBStore *store = h;
439 const char *path;
440
441 RCNL (mdb_env_get_path (store->env, &path));
442
443 char *id = malloc (strlen (path) + 8);
444 sprintf (id, "file://%s", path);
445
446 return id;
447}
448
449
450static VOLK_rc
451mdbstore_txn_begin (void *h, VOLK_StoreFlags flags, void **th)
452{
453 MDBStore *store = h;
454 flags = flags & VOLK_STORE_TXN_RO ? MDB_RDONLY : 0;
455
456 RCCK (txn_begin (store->env, NULL, flags, (MDB_txn **) th));
457
458 return VOLK_OK;
459}
460
461
462static VOLK_rc
463mdbstore_txn_commit (void *th)
464{
465 RCCK (txn_commit (th));
466
467 return VOLK_OK;
468}
469
470
471static void
472mdbstore_txn_abort (void *th)
473{ mdb_txn_abort (th); }
474
475
476static void *
477mdbiter_txn (void *h)
478{ return ((MDBIterator *) h)->txn; }
479
480
491static void *
492mdbstore_add_init (void *h, const VOLK_Buffer *sc, void *th)
493{
494 MDBStore *store = h;
495 /* An iterator is used here. Some members are a bit misused but it does
496 * its job without having to define a very similar struct.
497 */
498 MDBIterator *it;
499 MALLOC_GUARD (it, NULL);
500
501 it->store = store;
502 it->i = 0;
503
504 CHECK (txn_begin (store->env, th, 0, &it->txn), fail);
505
506 if (sc) {
507 // Store context if it's not the default one.
508 it->luc = VOLK_buffer_hash (sc);
509
510 // Insert t:st for context.
511 //log_debug ("Adding context: %s", sc);
512 it->key.mv_data = &it->luc;
513 it->key.mv_size = KLEN;
514 it->data.mv_data = sc->addr;
515 it->data.mv_size = sc->size;
516
517 int db_rc = mdb_put (
518 it->txn, it->store->dbi[IDX_T_ST],
519 &it->key, &it->data, MDB_NOOVERWRITE);
520 if (db_rc != MDB_SUCCESS && db_rc != MDB_KEYEXIST) {
521 log_error (VOLK_strerror (db_rc));
522 mdb_txn_abort (it->txn);
523 return NULL;
524 }
525 } else {
526 log_debug ("No context passed to iterator, using default.");
528 }
529
530 return it;
531
532fail:
533 free (it);
534
535 return NULL;
536}
537
538
539/*
540 * NOTE: at the moment #mdbstore_remove() or another
541 * #mdbstore_init() cannot be called between #mdbstore_add_init and
542 * #mdbstore_add_abort or #mdbstore_add_done. FIXME
543 *
544 */
545static VOLK_rc
546mdbstore_add_iter (void *h, const VOLK_BufferTriple *sspo)
547{
548 if (UNLIKELY (!h)) return VOLK_VALUE_ERR;
549
550 MDBIterator *it = h;
551 int db_rc = VOLK_NOACTION;
553
554 // Add triple terms.
555 for (int i = 0; i < 3; i++) {
556 VOLK_Buffer *st = VOLK_btriple_pos (sspo, i);
557
558 spok[i] = VOLK_buffer_hash (st);
559
560 it->key.mv_data = spok + i;
561 it->key.mv_size = KLEN;
562 it->data.mv_data = st->addr;
563 it->data.mv_size = st->size;
564
565 db_rc = mdb_put(
566 it->txn, it->store->dbi[IDX_T_ST],
567 &it->key, &it->data, MDB_NOOVERWRITE);
568 if (db_rc != MDB_SUCCESS && db_rc != MDB_KEYEXIST) {
569 LOG_RC (db_rc);
570 return VOLK_DB_ERR;
571 }
572 }
573
574 log_trace ("Inserting spok: {%x, %x, %x}", spok[0], spok[1], spok[2]);
575 log_trace ("Into context: %x", it->luc);
576
577 // Insert spo:c.
578 it->key.mv_data = spok;
579 it->key.mv_size = TRP_KLEN;
580
581 // In triple mode, data is empty (= NULL_KEY).
582 it->data.mv_data = &it->luc;
583 it->data.mv_size = it->luc == NULL_KEY ? 0 : KLEN;
584
585 db_rc = mdb_put(
586 it->txn, it->store->dbi[IDX_SPO_C],
587 &it->key, &it->data, MDB_NODUPDATA);
588
589 if (db_rc == MDB_KEYEXIST) return VOLK_NOACTION;
590 if (db_rc != MDB_SUCCESS) {
591 log_error (
592 "MDB error while inserting triple: %s", VOLK_strerror(db_rc));
593 return VOLK_DB_ERR;
594 }
595
596 // Index.
597 VOLK_rc rc = index_triple (it->store, OP_ADD, spok, it->luc, it->txn);
598 if (rc == VOLK_OK) it->i++;
599
600 return rc;
601}
602
603
604static VOLK_rc
605mdbstore_add_done (void *h)
606{
607 MDBIterator *it = h;
608 VOLK_rc rc = VOLK_OK;
609 log_debug ("Committing add transaction.");
610
611 if (txn_commit (it->txn) != MDB_SUCCESS) {
612 mdb_txn_abort (it->txn);
613 rc = VOLK_TXN_ERR;
614 }
615
616 free (it);
617
618 RCCK (rc);
619 return rc;
620}
621
622
623static void
624mdbstore_add_abort (void *h)
625{
626 MDBIterator *it = h;
627 mdb_txn_abort (it->txn);
628
629 free (it);
630}
631
632
633static VOLK_rc
634key_to_sterm (
635 MDBStore *store, MDB_txn *txn, const VOLK_Key key, VOLK_Buffer *sterm)
636{
638 int db_rc;
639
640 MDB_val key_v, data_v;
641 key_v.mv_data = (void*)&key;
642 key_v.mv_size = KLEN;
643
644 db_rc = mdb_get (txn, store->dbi[IDX_T_ST], &key_v, &data_v);
645
646 sterm->flags |= VOLK_BUF_BORROWED;
647 if (db_rc == MDB_SUCCESS) {
648 sterm->addr = data_v.mv_data;
649 sterm->size = data_v.mv_size;
650 rc = VOLK_OK;
651 } else if (db_rc == MDB_NOTFOUND) {
652 sterm->addr = NULL;
653 sterm->size = 0;
654 } else rc = VOLK_DB_ERR;
655
656 return rc;
657}
658
659
660static void *
661mdbstore_lookup (
662 void *h, const VOLK_Buffer *ss, const VOLK_Buffer *sp,
663 const VOLK_Buffer *so, const VOLK_Buffer *sc, void *th, size_t *ct)
664{
665 VOLK_TripleKey spok = {
666 VOLK_buffer_hash (ss),
667 VOLK_buffer_hash (sp),
668 VOLK_buffer_hash (so),
669 };
670
671 MDBIterator *it;
672 CALLOC_GUARD (it, NULL);
673
674 it->store = h;
675 it->luc = VOLK_buffer_hash (sc);
676 log_debug ("Lookup context: %x", it->luc);
677
678 if (ct) *ct = 0;
679
680 uint8_t idx0, idx1;
681
682 if (th) it->txn = th;
683 else if (!it->txn) {
684 // Start RO transaction if not in a write txn already.
685 RCNL (txn_begin (it->store->env, NULL, MDB_RDONLY, &it->txn));
686 log_trace ("Opening new lookup transaction @%p", it->txn);
687 it->flags |= ITER_OPEN_TXN;
688 }
689
690 // Context index loop.
691 RCNL (mdb_cursor_open (
692 it->txn, it->store->dbi[IDX_SPO_C], &it->ctx_cur));
693
694 /*
695 * Lookup decision tree.
696 */
697 // s p o (all terms bound)
698 if (spok[0] != NULL_KEY && spok[1] != NULL_KEY && spok[2] != NULL_KEY) {
699 it->luk[0] = spok[0];
700 it->luk[1] = spok[1];
701 it->luk[2] = spok[2];
702 PRCNL (lookup_3bound (it, ct));
703
704 } else if (spok[0] != NULL_KEY) {
705 it->luk[0] = spok[0];
706 idx0 = 0;
707
708 // s p ?
709 if (spok[1] != NULL_KEY) {
710 it->luk[1] = spok[1];
711 idx1 = 1;
712 PRCNL (lookup_2bound (idx0, idx1, it, ct));
713
714 // s ? o
715 } else if (spok[2] != NULL_KEY) {
716 it->luk[1] = spok[2];
717 idx1 = 2;
718 PRCNL (lookup_2bound (idx0, idx1, it, ct));
719
720 // s ? ?
721 } else PRCNL (lookup_1bound (idx0, it, ct));
722
723 } else if (spok[1] != NULL_KEY) {
724 it->luk[0] = spok[1];
725 idx0 = 1;
726
727 // ? p o
728 if (spok[2] != NULL_KEY) {
729 it->luk[1] = spok[2];
730 idx1 = 2;
731 PRCNL (lookup_2bound (idx0, idx1, it, ct));
732
733 // ? p ?
734 } else PRCNL (lookup_1bound (idx0, it, ct));
735
736 // ? ? o
737 } else if (spok[2] != NULL_KEY) {
738 it->luk[0] = spok[2];
739 idx0 = 2;
740 PRCNL (lookup_1bound (idx0, it, ct));
741
742 // ? ? ? (all terms unbound)
743 } else PRCNL (lookup_0bound (it, ct));
744
745 return it;
746}
747
748
754static VOLK_rc
755mdbiter_next_key (MDBIterator *it)
756{
758 // Only advance if the previous it->rc wasn't already at the end.
759 if (it->rc == MDB_NOTFOUND) return VOLK_END;
760 RCCK (it->rc);
761
762 /* Retrieve current value and advance cursor to the next result.
763 * it->rc is set to the result of the next iteration.
764 */
765 it->iter_op_fn (it);
766 MDB_val key, data;
767 int db_rc;
768
769 key.mv_size = TRP_KLEN;
770 data.mv_data = &it->luc;
771 data.mv_size = KLEN;
772
773 VOLK_rc rc;
774 if (it->luc) {
775 rc = VOLK_NORESULT; // Flow control value, will never be returned.
776 do {
777 //log_debug ("begin ctx loop.");
778 /* If ctx is specified, look if the matching triple is associated
779 * with it. If not, move on to the next triple.
780 * The loop normally exits when a triple with matching ctx is found
781 * (VOLK_OK), if there are no more triples (VOLK_END), or if there
782 * is an error (VOLK_DB_ERR).
783 */
784 log_trace (
785 "Found spok: {%x, %x, %x}",
786 it->spok[0], it->spok[1], it->spok[2]);
787
788 key.mv_data = it->spok;
789 db_rc = mdb_cursor_get (it->ctx_cur, &key, &data, MDB_GET_BOTH);
790
791 if (db_rc == MDB_NOTFOUND) {
792 log_trace ("Triple not found in context: %x", it->luc);
793 // if the iterator is at the end, stop here.
794 if (it->rc == MDB_NOTFOUND) rc = VOLK_END;
795 // Otherwise, look up the next key.
796 else it->iter_op_fn (it);
797
798 } else {
799 RCCK (db_rc);
800 rc = VOLK_OK;
801 log_trace ("Triple found in context: %x", it->luc);
802 }
803 } while (rc == VOLK_NORESULT);
804
805 } else {
806 log_trace (
807 "Found spok in any context: {%x, %x, %x}",
808 it->spok[0], it->spok[1], it->spok[2]);
809 rc = VOLK_OK;
810 }
811
812 // Get all contexts for a triple.
813 key.mv_data = it->spok;
814 db_rc = mdb_cursor_get (it->ctx_cur, &key, &data, MDB_SET_KEY);
815 if (db_rc != MDB_SUCCESS) {
816 log_error ("No context found for triple!");
817 return VOLK_DB_ERR;
818 }
819
820 size_t ct;
821 db_rc = mdb_cursor_count (it->ctx_cur, &ct);
822 if (db_rc != MDB_SUCCESS) return VOLK_DB_ERR;
823 // 1 spare for sentinel. Always allocated even on zero matches.
824 VOLK_Key *tmp_ck = realloc (it->ck, sizeof (*it->ck) * (ct + 1));
825 NLRCCK (tmp_ck, VOLK_MEM_ERR);
826 it->ck = tmp_ck;
827
828 size_t i = 0;
829 do {
830 //log_trace ("Copying to slot #%lu @%p", i, it->ck + i);
831 memcpy (it->ck + i++, data.mv_data, sizeof (*it->ck));
832 } while (
833 mdb_cursor_get (it->ctx_cur, &key, &data, MDB_NEXT_DUP)
834 == MDB_SUCCESS);
835 //log_trace ("setting sentinel @%p", it->ck + i);
836 it->ck[i] = NULL_KEY;
837
838 return rc;
839}
840
841
842static VOLK_rc
843mdbiter_next (
844 void *h, VOLK_BufferTriple *sspo, VOLK_Buffer **ctx_p)
845{
846 MDBIterator *it = h;
847 VOLK_rc rc = mdbiter_next_key (it);
848
849 if (rc == VOLK_OK) {
850 if (sspo) {
851 RCCK (key_to_sterm (it->store, it->txn, it->spok[0], sspo->s));
852 RCCK (key_to_sterm (it->store, it->txn, it->spok[1], sspo->p));
853 RCCK (key_to_sterm (it->store, it->txn, it->spok[2], sspo->o));
854 }
855
856 // Contexts for current triple.
857 if (ctx_p) {
858 // Preallocate.
859 size_t i = 0;
860 while (it->ck[i++]); // Include sentinel in count.
861 VOLK_Buffer *ctx;
862 log_trace ("Allocating %lu context buffers + sentinel.", i - 1);
863 ctx = malloc(i * sizeof (*ctx));
864 if (!ctx) return VOLK_MEM_ERR;
865
866 for (i = 0; it->ck[i]; i++)
867 RCCK (key_to_sterm (it->store, it->txn, it->ck[i], ctx + i));
868 memset (ctx + i, 0, sizeof (*ctx)); // Sentinel
869 NLRCCK (ctx + i, VOLK_MEM_ERR);
870
871 // TODO error handling.
872 *ctx_p = ctx;
873 }
874 }
875
876 return rc;
877}
878
879
880static void
881mdbiter_free (void *h)
882{
883 if (!h) return;
884 MDBIterator *it = h;
885
886 if (it->cur) mdb_cursor_close (it->cur);
887 if (it->ctx_cur) mdb_cursor_close (it->ctx_cur);
888 if (it->flags & ITER_OPEN_TXN) mdb_txn_abort (it->txn);
889 free (it->ck);
890
891 free (it);
892}
893
894
895static VOLK_rc
896mdbstore_update_ctx (
897 void *h, const VOLK_Buffer *old_c, const VOLK_Buffer *new_c, void *th)
898{
900 MDBStore *store = h;
901 unsigned char *trp_data = NULL;
902
904 old_ck = VOLK_buffer_hash (old_c),
905 new_ck = VOLK_buffer_hash (new_c);
906 // lu_key, lu_data look up all triples with old context in c:spo, and
907 // replace old c with new c.
908 MDB_txn
909 *p_txn = th,
910 *txn;
911 CHECK (rc = txn_begin (store->env, p_txn, 0, &txn), finally);
912
913 MDB_cursor *i_cur, *d_cur;
914 CHECK (
915 rc = mdb_cursor_open (txn, store->dbi[IDX_C_SPO], &i_cur),
916 close_txn);
917
918 MDB_val key, data;
919
920 // Return error if the graph URI already exists.
921 key.mv_data = &new_ck;
922 key.mv_size = KLEN;
923 rc = mdb_cursor_get (i_cur, &key, &data, MDB_SET);
924 if (rc == MDB_SUCCESS) {
925 log_error (
926 "Context key %lu already exists. Not replacing old graph.",
927 new_ck);
928 rc = VOLK_CONFLICT;
929 goto close_i;
930 } else if (rc != MDB_NOTFOUND) CHECK (rc, close_i);
931
932 // Add new context term.
933 CHECK (rc = mdbstore_add_term (store, new_c, txn), close_i);
934
935 key.mv_data = &old_ck;
936 // Count triples in cursor.
937 rc = mdb_cursor_get (i_cur, &key, &data, MDB_SET);
938 if (rc == MDB_NOTFOUND) {
939 log_info ("No triples found associated with old context.");
940 rc = VOLK_NOACTION;
941 goto close_i;
942 } else CHECK (rc, close_i);
943
944 // From here on, it can only be VOLK_OK or error.
945 rc = VOLK_OK;
946 size_t trp_ct;
947 CHECK (rc = mdb_cursor_count (i_cur, &trp_ct), close_i);
948 trp_data = malloc (trp_ct * TRP_KLEN);
949 if (UNLIKELY (!trp_data)) {
950 rc = VOLK_MEM_ERR;
951 goto close_i;
952 }
953
954 // Copy triple data as one block to temp buffer so that entries can be
955 // deleted while cursors are active.
956 rc = mdb_cursor_get (i_cur, &key, &data, MDB_GET_MULTIPLE);
957 if (rc != MDB_SUCCESS) {
958 rc = rc == MDB_NOTFOUND ? VOLK_NOACTION : VOLK_DB_ERR;
959 goto close_i;
960 }
961 size_t loc_cur = 0;
962 do {
963 memcpy (trp_data + loc_cur, data.mv_data, data.mv_size);
964 loc_cur += data.mv_size;
965 } while (mdb_cursor_get (
966 i_cur, &key, &data, MDB_NEXT_MULTIPLE) == MDB_SUCCESS);
967
968 // Zap c:spo entries in one go.
969 key.mv_data = &old_ck;
970 key.mv_size = KLEN;
971 data.mv_size = TRP_KLEN;
972 CHECK (rc = mdb_cursor_get (i_cur, &key, NULL, MDB_SET), close_i);
973 CHECK (rc = mdb_cursor_del (i_cur, MDB_NODUPDATA), close_i);
974
975 // Re-ad c:spo data individually.
976 key.mv_data = &new_ck;
977 for (size_t i = 0; i < trp_ct; i++) {
978 data.mv_data = trp_data + i * data.mv_size;
979 CHECK (
980 rc = mdb_cursor_put (i_cur, &key, &data, MDB_APPENDDUP),
981 close_i);
982 }
983 // Re-add c:spo data in bulk from buffer with new context.
984 // FIXME this is not working. Replaced by the for loop above.
985 /*
986 MDB_val data_block[] = {
987 { .mv_data = &new_ck, .mv_size = TRP_KLEN },
988 { .mv_data = NULL, .mv_size = trp_ct },
989 };
990 db_rc = mdb_cursor_put (i_cur, &key, data_block, MDB_MULTIPLE);
991 */
992
993 // Main table.
994 // Replace spo:c values one by one.
995 CHECK (rc = mdb_cursor_open (txn, store->dbi[IDX_SPO_C], &d_cur), close_i);
996 key.mv_size = TRP_KLEN;
997 data.mv_size = KLEN;
998 for (size_t i = 0; i < trp_ct; i++) {
999 key.mv_data = trp_data + i * key.mv_size;
1000 data.mv_data = &old_ck;
1001 CHECK (
1002 rc = mdb_cursor_get (d_cur, &key, &data, MDB_GET_BOTH),
1003 close_d);
1004 CHECK (rc = mdb_cursor_del (d_cur, 0), close_d);
1005 data.mv_data = &new_ck;
1006 CHECK (
1007 rc = mdb_cursor_put (d_cur, &key, &data, MDB_NOOVERWRITE),
1008 close_d);
1009 }
1010
1011close_d:
1012 mdb_cursor_close (d_cur);
1013close_i:
1014 mdb_cursor_close (i_cur);
1015close_txn:
1016 if (rc == VOLK_OK) RCCK (txn_commit (txn));
1017 else mdb_txn_abort (txn);
1018
1019 if (trp_data) free (trp_data);
1020finally:
1021
1022 return rc;
1023}
1024
1025
1026static VOLK_rc
1027mdbstore_remove (
1028 void *h, const VOLK_Buffer *ss, const VOLK_Buffer *sp,
1029 const VOLK_Buffer *so, const VOLK_Buffer *sc, void *th, size_t *ct_p)
1030{
1031 MDBStore *store = h;
1033
1034 MDB_txn *txn;
1035 RCCK (txn_begin (store->env, th, 0, &txn));
1036 MDB_cursor *cur;
1037 mdb_cursor_open (txn, store->dbi[IDX_SPO_C], &cur);
1038
1039 if (sc == NULL) sc = VOLK_default_ctx_buf;
1040 VOLK_Key ck = VOLK_buffer_hash (sc);
1041 MDB_val spok_v, ck_v;
1042 spok_v.mv_size = TRP_KLEN;
1043 ck_v.mv_size = KLEN;
1044 ck_v.mv_data = &ck;
1045
1046 // Gather all the matching triples in a first pass.
1047 size_t *ct = ct_p ? ct_p : malloc (sizeof (*ct));
1048 NLRCCK (ct, VOLK_MEM_ERR);
1049 MDBIterator *it = mdbstore_lookup (store, ss, sp, so, sc, txn, ct);
1050 NLRCCK (it, VOLK_DB_ERR);
1051 log_debug ("Found %lu triples to remove.", *ct);
1052 VOLK_Key *keys = malloc (*ct * sizeof (VOLK_Key) * 3);
1053 NLRCCK (it, VOLK_MEM_ERR);
1054 size_t i = 0;
1055 while (mdbiter_next_key (it) == VOLK_OK) {
1056 log_trace ("Adding triple #%zu to remove list.", i);
1057 memcpy (keys + (3 * i++), &it->spok, TRP_KLEN);
1058 }
1059 mdbiter_free (it);
1060
1061 // Iterate over the gathered keys and delete them.
1062 for (i = 0; i < *ct; i++) {
1063 spok_v.mv_data = keys + i * 3;
1064 ck_v.mv_data = &ck;
1065 log_trace (
1066 "Removing triple #%zu: %x {%x %x %x}",
1067 i,
1068 ((VOLK_Key *)ck_v.mv_data)[0],
1069 ((VOLK_Key *)spok_v.mv_data)[0],
1070 ((VOLK_Key *)spok_v.mv_data)[1],
1071 ((VOLK_Key *)spok_v.mv_data)[2]);
1072
1073 rc = mdb_cursor_get (cur, &spok_v, &ck_v, MDB_GET_BOTH);
1074 if (rc == MDB_NOTFOUND) {
1075 log_warn ("No key found in spo:c DB.");
1076 continue; // TODO This could be a data problem.
1077 } else CHECK (rc, fail);
1078
1079 // Delete spo:c entry.
1080 CHECK (rc = mdb_cursor_del (cur, 0), fail);
1081 CHECK (rc = index_triple (
1082 store, OP_REMOVE, keys + i * 3, ck, txn
1083 ), fail);
1084 }
1085 free(keys);
1086 CHECK (txn_commit (txn), fail);
1087
1088 return rc;
1089
1090fail:
1091 mdb_txn_abort (txn);
1092 RCCK (rc);
1093
1094 return VOLK_DB_ERR;
1095}
1096
1097
1109static VOLK_rc
1110mdbstore_add_term (void *h, const VOLK_Buffer *sterm, void *th)
1111{
1112 //log_trace ("Adding term to MDB store: %s", sterm->addr);
1113 MDBStore *store = h;
1114 int db_rc;
1115 MDB_val key, data;
1116
1117 MDB_txn *txn;
1118 // If an active transaction was passed, use it, otherwise open and
1119 // close a new one.
1120 if (th) txn = th;
1121 else RCCK (txn_begin (store->env, th, 0, &txn));
1122
1123 MDB_cursor *cur;
1124 CHECK (mdb_cursor_open (txn, store->dbi[IDX_T_ST], &cur), fail);
1125
1126 VOLK_Key k = VOLK_buffer_hash (sterm);
1127 key.mv_data = &k;
1128 key.mv_size = sizeof (k);
1129
1130 data.mv_data = sterm->addr;
1131 data.mv_size = sterm->size;
1132
1133 db_rc = mdb_cursor_put (cur, &key, &data, MDB_NOOVERWRITE);
1134 if (db_rc != MDB_KEYEXIST) CHECK (db_rc, fail);
1135
1136 if (txn != th) CHECK (db_rc = txn_commit (txn), fail);
1137
1138 return VOLK_OK;
1139
1140fail:
1141 if (txn != th) mdb_txn_abort (txn);
1142 log_trace ("Aborted txn for adding term.");
1143 return VOLK_DB_ERR;
1144}
1145
1146
1147VOLK_Buffer **
1148mdbstore_ctx_list (void *h, void *th)
1149{
1150 MDBStore *store = h;
1151 VOLK_rc db_rc;
1152 MDB_txn *txn;
1153 VOLK_Buffer **tdata = NULL;
1154 if (th) txn = th;
1155 else CHECK (txn_begin (store->env, NULL, MDB_RDONLY, &txn), fail);
1156
1157
1158 MDB_cursor *cur;
1159 CHECK (mdb_cursor_open (txn, store->dbi[IDX_C_SPO], &cur), fail);
1160 MDB_val key, data;
1161 db_rc = mdb_cursor_get (cur, &key, &data, MDB_FIRST);
1162
1163 size_t i = 0;
1164 while (db_rc == MDB_SUCCESS) {
1165 tdata = realloc (tdata, (i + 1) * sizeof (*tdata));
1166 if (UNLIKELY (!tdata)) goto fail;
1167 tdata[i] = BUF_DUMMY;
1168 VOLK_Key tkey = *(VOLK_Key*)key.mv_data;
1169 CHECK (key_to_sterm (store, txn, tkey, tdata[i]), fail);
1170 db_rc = mdb_cursor_get (cur, &key, &data, MDB_NEXT_NODUP);
1171 i++;
1172 }
1173
1174 tdata = realloc (tdata, i * sizeof (data));
1175 tdata[i] = NULL; // Sentinel
1176 mdb_cursor_close (cur);
1177 if (txn != th) mdb_txn_abort (txn);
1178
1179 return tdata;
1180
1181fail:
1182 if (txn != th) mdb_txn_abort (txn);
1183 if (tdata) free (tdata);
1184 return NULL;
1185}
1186
1187
1189 .name = "MDB Store",
1192
1193 .setup_fn = mdbstore_setup,
1194 .new_fn = mdbstore_new,
1195 .free_fn = mdbstore_free,
1196
1197 .size_fn = mdbstore_size,
1198 .id_fn = mdbstore_id,
1199
1200 .txn_begin_fn = mdbstore_txn_begin,
1201 .txn_commit_fn = mdbstore_txn_commit,
1202 .txn_abort_fn = mdbstore_txn_abort,
1203 .iter_txn_fn = mdbiter_txn,
1204
1205 .add_init_fn = mdbstore_add_init,
1206 .add_iter_fn = mdbstore_add_iter,
1207 .add_abort_fn = mdbstore_add_abort,
1208 .add_done_fn = mdbstore_add_done,
1209 .add_term_fn = mdbstore_add_term,
1210
1211 .update_ctx_fn = mdbstore_update_ctx,
1212
1213 .lookup_fn = mdbstore_lookup,
1214 .lu_next_fn = mdbiter_next,
1215 .lu_free_fn = mdbiter_free,
1216
1217 .remove_fn = mdbstore_remove,
1218
1219 .ctx_list_fn = mdbstore_ctx_list,
1220};
1221
1222
1223/* * * Static functions. * * */
1224
1234static VOLK_rc
1235index_triple(
1236 MDBStore *store, StoreOp op, VOLK_TripleKey spok, VOLK_Key ck,
1237 MDB_txn *txn)
1238{
1239 int db_rc;
1241 MDB_val v1, v2;
1242
1243 log_trace ("Indexing triple: {%x %x %x}", spok[0], spok[1], spok[2]);
1244
1245 // Index c:spo.
1246 if (op == OP_REMOVE) {
1247 log_trace ("Indexing op: REMOVE");
1248 if (ck == NULL_KEY) goto skip_remove;
1249
1250 MDB_cursor *cur;
1251 v1.mv_data = &ck;
1252 v1.mv_size = KLEN;
1253 v2.mv_data = spok;
1254 v2.mv_size = TRP_KLEN;
1255
1256 RCCK (mdb_cursor_open (txn, store->dbi[IDX_C_SPO], &cur));
1257 db_rc = mdb_cursor_get (cur, &v1, &v2, MDB_GET_BOTH);
1258 if (db_rc != MDB_NOTFOUND) {
1259 RCCK (db_rc);
1260 RCCK (mdb_cursor_del (cur, 0));
1261 v1.mv_data = &ck;
1262 v2.mv_data = spok;
1263 rc = VOLK_OK;
1264 }
1265 mdb_cursor_close (cur);
1266skip_remove:
1267
1268 } else if (op == OP_ADD) {
1269 log_trace ("Indexing op: ADD");
1270 if (ck == NULL_KEY) goto skip_add;
1271
1272 v1.mv_data = &ck;
1273 v1.mv_size = KLEN;
1274 v2.mv_data = spok;
1275 v2.mv_size = TRP_KLEN;
1276
1277 db_rc = mdb_put(
1278 txn, store->dbi[IDX_C_SPO],
1279 &v1, &v2, MDB_NODUPDATA);
1280 if (db_rc != MDB_SUCCESS) return VOLK_DB_ERR;
1281 if (db_rc != MDB_KEYEXIST) rc = VOLK_OK;
1282skip_add:
1283
1284 } else return VOLK_VALUE_ERR;
1285
1286 VOLK_DoubleKey dbl_keys[3] = {
1287 {spok[1], spok[2]}, // po
1288 {spok[0], spok[2]}, // so
1289 {spok[0], spok[1]}, // sp
1290 };
1291
1292 // Add or remove index terms.
1293 v1.mv_size = KLEN;
1294 v2.mv_size = DBL_KLEN;
1295
1296 for (int i = 0; i < 3; i++) {
1297 MDB_dbi
1298 db1 = store->dbi[lookup_indices[i]], // s:po, p:so, o:sp
1299 db2 = store->dbi[lookup_indices[i + 3]]; // po:s, so:p, sp:o
1300
1301 v1.mv_data = spok + i;
1302 v2.mv_data = dbl_keys[i];
1303
1304 if (op == OP_REMOVE) {
1305 // Remove from 1-bound index.
1306 MDB_cursor *cur1;
1307 mdb_cursor_open(txn, store->dbi[lookup_indices[i]], &cur1);
1308 RCCK (db_rc = mdb_cursor_get (cur1, &v1, &v2, MDB_GET_BOTH));
1309 mdb_cursor_del (cur1, 0);
1310 mdb_cursor_close (cur1);
1311
1312 // Restore pointers invalidated after delete.
1313 v1.mv_data = spok + i;
1314 v2.mv_data = dbl_keys[i];
1315
1316 // Remove from 2-bound index.
1317 MDB_cursor *cur2;
1318 mdb_cursor_open(txn, store->dbi[lookup_indices[i + 3]], &cur2);
1319 RCCK (db_rc = mdb_cursor_get (cur2, &v2, &v1, MDB_GET_BOTH));
1320 mdb_cursor_del (cur2, 0);
1321 mdb_cursor_close (cur2);
1322
1323 rc = VOLK_OK;
1324
1325 } else { // OP_ADD is guaranteed.
1326 // Add to 1-bound index.
1327 log_trace ("Indexing in %s: ", db_labels[lookup_indices[i]]);
1328 log_trace (
1329 "%x: %x %x", *(size_t*)(v1.mv_data),
1330 *(size_t*)(v2.mv_data), *(size_t*)(v2.mv_data) + 1);
1331
1332 db_rc = mdb_put (txn, db1, &v1, &v2, MDB_NODUPDATA);
1333
1334 if (db_rc == MDB_SUCCESS) rc = VOLK_OK;
1335 else if (db_rc != MDB_KEYEXIST) return VOLK_DB_ERR;
1336
1337 // Add to 2-bound index.
1338 log_trace ("Indexing in %s: ", db_labels[lookup_indices[i + 3]]);
1339 log_trace (
1340 "%x %x: %x", *(size_t*)(v2.mv_data),
1341 *(size_t*)(v2.mv_data) + 1, *(size_t*)(v1.mv_data));
1342
1343 db_rc = mdb_put (txn, db2, &v2, &v1, MDB_NODUPDATA);
1344
1345 if (db_rc == MDB_SUCCESS) rc = VOLK_OK;
1346 else if (db_rc != MDB_KEYEXIST) return VOLK_DB_ERR;
1347 }
1348 }
1349
1350 return rc;
1351}
1352
1353
1354/* * * Term-specific iterators. * * */
1355
1360inline static void
1361it_next_0bound (MDBIterator *it)
1362{
1363 memcpy (it->spok, it->key.mv_data, sizeof (VOLK_TripleKey));
1364 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_NEXT_NODUP);
1365}
1366
1367
1372inline static void
1373it_next_0bound_ctx (MDBIterator *it)
1374{
1375 memcpy (it->spok, it->data.mv_data, sizeof (VOLK_TripleKey));
1376 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_NEXT_DUP);
1377}
1378
1379
1386inline static void
1387it_next_1bound (MDBIterator *it)
1388{
1389 VOLK_DoubleKey *lu_dset = it->data.mv_data;
1390
1391 it->spok[it->term_order[0]] = it->luk[0];
1392 it->spok[it->term_order[1]] = lu_dset[it->i][0];
1393 it->spok[it->term_order[2]] = lu_dset[it->i][1];
1394
1395 log_trace (
1396 "Composed triple: {%x %x %x}",
1397 it->spok[0], it->spok[1], it->spok[2]);
1398
1399 // Ensure next block within the same page is not beyond the last.
1400 if (it->i < it->data.mv_size / DBL_KLEN - 1) {
1401 it->i ++;
1402 //log_debug ("Increasing page cursor to %lu.", it->i);
1403 //log_debug ("it->rc: %d", it->rc);
1404
1405 } else {
1406 // If the last block in the page is being yielded,
1407 // move cursor to beginning of next page.
1408 it->i = 0;
1409 //log_debug ("Reset page cursor to %lu.", it->i);
1410 it->rc = mdb_cursor_get (
1411 it->cur, &it->key, &it->data, MDB_NEXT_MULTIPLE);
1412 }
1413}
1414
1415
1422inline static void
1423it_next_2bound (MDBIterator *it)
1424{
1425 VOLK_Key *lu_dset = it->data.mv_data;
1426
1427 it->spok[it->term_order[0]] = it->luk[0];
1428 it->spok[it->term_order[1]] = it->luk[1];
1429 it->spok[it->term_order[2]] = lu_dset[it->i];
1430
1431 // Ensure next block within the same page is not beyond the last.
1432 if (it->i < it->data.mv_size / KLEN - 1)
1433 it->i ++;
1434 else {
1435 // If the last block in the page is being yielded,
1436 // move cursor to beginning of next page.
1437 it->i = 0;
1438 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_NEXT_MULTIPLE);
1439 }
1440}
1441
1442
1449inline static void
1450it_next_3bound (MDBIterator *it)
1451{ it->rc = MDB_NOTFOUND; }
1452
1453
1454/* * * Term-specific lookups. * * */
1455
1456inline static VOLK_rc
1457lookup_0bound (MDBIterator *it, size_t *ct)
1458{
1459 log_debug ("Looking up 0 bound terms.");
1460
1461 // Context search looks for all values in c:spo.
1462 if (it->luc != NULL_KEY) {
1463 // Look up by given context.
1464 RCCK (it->rc = mdb_cursor_open (
1465 it->txn, it->store->dbi[IDX_C_SPO], &it->cur));
1466
1467 it->key.mv_data = &it->luc;
1468 it->key.mv_size = KLEN;
1469
1470 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1471 if (ct) {
1472 if (it->rc == MDB_NOTFOUND) *ct = 0;
1473 else {
1474 RCCK (it->rc);
1475 mdb_cursor_count (it->cur, ct);
1476 }
1477 log_debug ("Found %lu triples.", *ct);
1478 }
1479 //it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_FIRST_DUP);
1480 it->iter_op_fn = it_next_0bound_ctx;
1481
1482 // No-context search looks for all keys in spo:c.
1483 } else {
1484 RCCK (it->rc = mdb_cursor_open (
1485 it->txn, it->store->dbi[IDX_SPO_C], &it->cur));
1486 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_FIRST);
1487 if (ct) {
1488 if (it->rc == MDB_NOTFOUND) *ct = 0;
1489 else {
1490 MDB_stat stat;
1491 // s:po and 1- and 2-bound indices have 1 entry per triple.
1492 mdb_stat (it->txn, it->store->dbi[IDX_S_PO], &stat);
1493
1494 *ct = stat.ms_entries;
1495 }
1496 log_debug ("Found %lu triples.", *ct);
1497 }
1498 it->iter_op_fn = it_next_0bound;
1499 }
1500 if (it->rc != MDB_NOTFOUND) RCCK (it->rc);
1501
1502 return VOLK_OK;
1503}
1504
1505
1506inline static VOLK_rc
1507lookup_1bound (uint8_t idx0, MDBIterator *it, size_t *ct)
1508{
1509 it->term_order = (const uint8_t*)lookup_ordering_1bound[idx0];
1510
1511 log_debug ("Looking up 1 bound term: %x", it->luk[0]);
1512
1513 RCCK (mdb_cursor_open (
1514 it->txn, it->store->dbi[lookup_indices[idx0]], &it->cur));
1515
1516 it->key.mv_data = it->luk;
1517 it->key.mv_size = KLEN;
1518
1519 if (ct) {
1520 // If a context is specified, the only way to count triples matching
1521 // the context is to loop over them.
1522 if (it->luc != NULL_KEY) {
1523 log_debug ("Counting in context: %x", it->luc);
1524 MDBIterator *ct_it;
1525 MALLOC_GUARD (ct_it, VOLK_MEM_ERR);
1526 /*
1527 memcpy (ct_it, it, sizeof (*ct_it));
1528 */
1529
1530 ct_it->store = it->store;
1531 ct_it->txn = it->txn;
1532 ct_it->ctx_cur = it->ctx_cur;
1533 ct_it->key = it->key;
1534 ct_it->data = it->data;
1535 ct_it->ck = NULL;
1536 ct_it->luk[0] = it->luk[0];
1537 ct_it->luc = it->luc;
1538 ct_it->i = 0;
1539
1540 PRCCK (lookup_1bound (idx0, ct_it, NULL));
1541
1542 VOLK_rc db_rc;
1543 while (VOLK_END != (db_rc = mdbiter_next_key (ct_it))) {
1544 PRCCK (db_rc);
1545 (*ct)++;
1546 }
1547
1548 // Free the counter iterator without freeing the shared txn.
1549 if (ct_it->cur) mdb_cursor_close (ct_it->cur);
1550 free (ct_it->ck);
1551 free (ct_it);
1552
1553 } else {
1554 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1555 if (it->rc == MDB_SUCCESS) mdb_cursor_count (it->cur, ct);
1556 }
1557 log_debug ("Found %lu triples.", *ct);
1558 }
1559
1560 it->i = 0;
1561 it->iter_op_fn = it_next_1bound;
1562
1563 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1564 if (it->rc == MDB_SUCCESS)
1565 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_GET_MULTIPLE);
1566 if (it->rc != MDB_NOTFOUND) RCCK (it->rc);
1567
1568 return VOLK_OK;
1569}
1570
1571
1572inline static VOLK_rc
1573lookup_2bound(uint8_t idx0, uint8_t idx1, MDBIterator *it, size_t *ct)
1574{
1575 uint8_t luk1_offset, luk2_offset;
1576 MDB_dbi dbi = 0;
1577
1578 // Establish lookup ordering with some awkward offset math.
1579 for (int i = 0; i < 3; i++) {
1580 if (
1581 (
1582 idx0 == lookup_ordering_2bound[i][0] &&
1583 idx1 == lookup_ordering_2bound[i][1]
1584 ) || (
1585 idx0 == lookup_ordering_2bound[i][1] &&
1586 idx1 == lookup_ordering_2bound[i][0]
1587 )
1588 ) {
1589 it->term_order = (const uint8_t*)lookup_ordering_2bound[i];
1590 if (it->term_order[0] == idx0) {
1591 luk1_offset = 0;
1592 luk2_offset = 1;
1593 } else {
1594 luk1_offset = 1;
1595 luk2_offset = 0;
1596 }
1597 dbi = it->store->dbi[lookup_indices[i + 3]];
1598 log_debug (
1599 "Looking up 2 bound in %s",
1600 db_labels[lookup_indices[i + 3]]);
1601
1602 break;
1603 }
1604 }
1605
1606 if (dbi == 0) {
1607 log_error (
1608 "Values %d and %d not found in lookup keys.",
1609 idx0, idx1);
1610 return VOLK_VALUE_ERR;
1611 }
1612
1613 // Compose term keys in lookup key.
1614 VOLK_DoubleKey luk;
1615 luk[luk1_offset] = it->luk[0];
1616 luk[luk2_offset] = it->luk[1];
1617
1618 it->key.mv_data = luk;
1619 it->key.mv_size = DBL_KLEN;
1620
1621 mdb_cursor_open (it->txn, dbi, &it->cur);
1622 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1623
1624 if (ct) {
1625 // If a context is specified, the only way to count triples matching
1626 // the context is to loop over them.
1627 if (it->luc != NULL_KEY) {
1628 MDBIterator *ct_it;
1629 MALLOC_GUARD (ct_it, VOLK_MEM_ERR);
1630
1631 ct_it->store = it->store;
1632 ct_it->txn = it->txn;
1633 ct_it->ctx_cur = it->ctx_cur;
1634 ct_it->ck = NULL;
1635 ct_it->luk[0] = it->luk[0];
1636 ct_it->luk[1] = it->luk[1];
1637 ct_it->luc = it->luc;
1638 ct_it->i = 0;
1639
1640 lookup_2bound (idx0, idx1, ct_it, NULL);
1641
1642 while (mdbiter_next_key (ct_it) != VOLK_END) (*ct) ++;
1643
1644 // Free the counter iterator without freeing the shared txn.
1645 if (ct_it->cur) mdb_cursor_close (ct_it->cur);
1646 free (ct_it->ck);
1647 free (ct_it);
1648
1649 } else {
1650 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1651 if (it->rc == MDB_SUCCESS) mdb_cursor_count (it->cur, ct);
1652 }
1653 log_debug ("Found %lu triples.", *ct);
1654 }
1655
1656 it->i = 0;
1657 it->iter_op_fn = it_next_2bound;
1658
1659 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1660 if (it->rc == MDB_SUCCESS)
1661 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_GET_MULTIPLE);
1662 if (it->rc != MDB_NOTFOUND) RCCK (it->rc);
1663
1664 return VOLK_OK;
1665}
1666
1667
1668inline static VOLK_rc
1669lookup_3bound (MDBIterator *it, size_t *ct)
1670{
1671 log_debug (
1672 "Looking up 3 bound: {%x, %x, %x}",
1673 it->luk[0], it->luk[1], it->luk[2]);
1674
1675 it->key.mv_data = it->luk;
1676
1677 if (it->luc != NULL_KEY) {
1678 it->rc = mdb_cursor_open (
1679 it->txn, it->store->dbi[IDX_SPO_C], &it->cur);
1680
1681 it->key.mv_size = TRP_KLEN;
1682 it->data.mv_data = &it->luc;
1683 it->data.mv_size = KLEN;
1684
1685 } else {
1686 it->rc = mdb_cursor_open (it->txn, it->store->dbi[IDX_S_PO], &it->cur);
1687
1688 it->key.mv_size = KLEN;
1689 it->data.mv_data = it->luk + 1;
1690 it->data.mv_size = DBL_KLEN;
1691 }
1692
1693 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_GET_BOTH);
1694 if (it->rc != MDB_NOTFOUND) RCCK (it->rc);
1695
1696 mdb_cursor_close (it->cur);
1697 it->cur = NULL;
1698
1699 if (ct && it->rc == MDB_SUCCESS) *ct = 1;
1700
1701 it->iter_op_fn = it_next_3bound;
1702 memcpy (it->spok, it->luk, sizeof (VOLK_TripleKey));
1703
1704 return VOLK_OK;
1705}
1706
1707
#define UNLIKELY(x)
Definition core.h:39
#define NULL_KEY
"NULL" key, a value that is never user-provided.
Definition buffer.h:15
VOLK_Key VOLK_buffer_hash(const VOLK_Buffer *buf)
Hash a buffer.
Definition buffer.h:175
VOLK_Buffer * VOLK_btriple_pos(const VOLK_BufferTriple *trp, VOLK_TriplePos n)
Get serialized triple by term position.
Definition buffer.h:297
VOLK_Buffer * VOLK_default_ctx_buf
Serialized default context.
Definition buffer.c:5
#define BUF_DUMMY
Dummy buffer to be used with VOLK_buffer_init.
Definition buffer.h:154
@ VOLK_BUF_BORROWED
Definition buffer.h:28
#define NULL_TRP
"NULL" triple, a value that is never user-provided.
Definition core.h:66
#define DBL_KLEN
Definition core.h:58
#define KLEN
Definition core.h:57
#define TRP_KLEN
Definition core.h:59
bool VOLK_env_is_init
Whether the environment is initialized.
Definition core.c:11
#define MALLOC_GUARD(var, rc)
Allocate one pointer with malloc and return rc if it fails.
Definition core.h:382
#define RCNL(exp)
Return NULL if exp returns a nonzero value.
Definition core.h:352
#define PRCNL(exp)
Return NULL if exp returns a negative value (=error).
Definition core.h:363
#define CHECK(exp, marker)
Jump to marker if exp does not return VOLK_OK.
Definition core.h:290
#define CALLOC_GUARD(var, rc)
Allocate one pointer with calloc and return rc if it fails.
Definition core.h:388
VOLK_rc rm_r(const char *path)
Remove a directory recursively (POSIX compatible).
Definition core.c:124
#define log_debug(...)
Definition core.h:273
#define RCCK(exp)
Return exp return value if it is of VOLK_rc type and nonzero.
Definition core.h:322
#define NLRCCK(exp, _rc)
Return rc return code if exp is NULL.
Definition core.h:344
VOLK_rc mkdir_p(const char *_path, mode_t mode)
Make recursive directories.
Definition core.c:50
#define LOG_RC(rc)
Log an error or warning for return codes that are not VOLK_OK.
Definition core.h:284
#define PRCCK(exp)
Return exp return value if it is of VOLK_rc type and negative (=error).
Definition core.h:333
#define log_trace(...)
Definition core.h:275
size_t VOLK_Key
Term key, i.e., hash of a serialized term.
Definition core.h:230
VOLK_Key VOLK_TripleKey[3]
Array of three VOLK_Key values, representing a triple.
Definition core.h:234
VOLK_Key VOLK_DoubleKey[2]
Array of two VOLK_Key values.
Definition core.h:232
#define VOLK_VALUE_ERR
An invalid input value was provided.
Definition core.h:129
#define VOLK_MEM_ERR
Memory allocation error.
Definition core.h:144
#define VOLK_NORESULT
No result yielded.
Definition core.h:100
#define VOLK_DB_ERR
Low-level database error.
Definition core.h:135
#define VOLK_CONFLICT
Conflict warning.
Definition core.h:120
#define VOLK_END
Loop end.
Definition core.h:107
#define VOLK_OK
Generic success return code.
Definition core.h:83
#define VOLK_NOACTION
No action taken.
Definition core.h:93
int VOLK_rc
Definition core.h:79
#define VOLK_ENV_ERR
Error while handling environment setup; or environment not initialized.
Definition core.h:158
#define VOLK_TXN_ERR
Error handling a store transaction.
Definition core.h:132
const char * VOLK_strerror(VOLK_rc rc)
Return an error message for a return code.
Definition core.c:196
VOLK_StoreFlags
Store flags passed to various operations.
@ VOLK_STORE_TXN_RO
Start a read-only transaction.
@ VOLK_STORE_TXN
Supports transaction handling.
@ VOLK_STORE_COW
Copy on write.
@ VOLK_STORE_IDX
Store is fully SPO(C)-indexed.
@ VOLK_STORE_PERM
@ VOLK_STORE_CTX
VOLK_Buffer ** mdbstore_ctx_list(void *h, void *th)
Definition store_mdb.c:1148
StoreFlags
Store state flags.
Definition store_mdb.c:31
@ LSSTORE_OPEN
Env is open.
Definition store_mdb.c:32
#define MAIN_TABLE
Definition store_mdb.c:108
char DbLabel[8]
Definition store_mdb.c:27
#define ENV_FILE_MODE
Definition store_mdb.c:21
#define DEFAULT_MAPSIZE
Definition store_mdb.c:15
#define ENV_DIR_MODE
Definition store_mdb.c:20
const VOLK_StoreInt mdbstore_int
MDB store interface.
Definition store_mdb.c:1188
char * mdbstore_id(const void *h)
Definition store_mdb.c:436
#define LOOKUP_TABLE
Definition store_mdb.c:116
void(* iter_op_fn_t)(MDBIterator *it)
Iterator operation.
Definition store_mdb.c:69
#define N_DB
Definition store_mdb.c:6
IterFlags
Iterator state flags.
Definition store_mdb.c:36
@ ITER_OPEN_TXN
A transaction is open.
Definition store_mdb.c:37
StoreOp
Definition store_mdb.c:48
@ OP_ADD
Definition store_mdb.c:49
@ OP_REMOVE
Definition store_mdb.c:50
DBIdx
Definition store_mdb.c:140
LMDB graph store backend.
#define VOLK_MDB_STORE_URN
Default MDB store identifier and location.
Definition store_mdb.h:38
Triple iterator.
Definition store_mdb.c:73
MDB_cursor * ctx_cur
MDB c:spo index cursor.
Definition store_mdb.c:78
VOLK_TripleKey spok
Triple to be populated with match.
Definition store_mdb.c:81
VOLK_Key luk[3]
0รท3 lookup keys.
Definition store_mdb.c:88
MDB_cursor * cur
MDB cursor.
Definition store_mdb.c:77
const uint8_t * term_order
Term order used in 1-2bound look-ups.
Definition store_mdb.c:87
MDBStore * store
MDB store handle.
Definition store_mdb.c:74
IterFlags flags
Iterator flags.
Definition store_mdb.c:75
MDB_txn * txn
MDB transaction.
Definition store_mdb.c:76
VOLK_Key luc
Ctx key to filter by. May be NULL_KEY.
Definition store_mdb.c:89
size_t i
Internal counter for paged lookups.
Definition store_mdb.c:90
iter_op_fn_t iter_op_fn
Function used to look up next match.
Definition store_mdb.c:86
MDB_val key
Internal data handler.
Definition store_mdb.c:79
size_t ct
Definition store_mdb.c:91
int rc
MDB_* return code for the next result.
Definition store_mdb.c:93
MDB_val data
Internal data handler.
Definition store_mdb.c:80
VOLK_Key * ck
Definition store_mdb.c:82
StoreFlags flags
Store state flags.
Definition store_mdb.c:56
MDB_env * env
Environment handle.
Definition store_mdb.c:54
MDB_dbi dbi[N_DB]
DB handles. Refer to DbIdx enum.
Definition store_mdb.c:55
Triple of byte buffers.
Definition buffer.h:60
VOLK_Buffer * o
Definition buffer.h:63
VOLK_Buffer * s
Definition buffer.h:61
VOLK_Buffer * p
Definition buffer.h:62
General-purpose data buffer.
Definition buffer.h:47
VOLK_BufferFlag flags
Definition buffer.h:50
unsigned char * addr
Definition buffer.h:48
size_t size
Definition buffer.h:49
Store interface.