Abstract
This example illustrates the manipulation of a BLOB column in the NDB API
using the NdbRecord interface, which is available beginning with
MySQL Cluster NDB 6.2.3. It demonstrates how to perform insert, read,
update, and delete operations, using both inline value buffers and the
read and write methods. It can be found in the file
storage/ndb/ndbapi-examples/ndbapi_blob_ndbrecord/main.cpp
in the MySQL Cluster source tree.
While the MySQL data type used in the example is actually TEXT,
the same principles apply to BLOB columns.
#include <mysql.h>
#include <mysqld_error.h>
#include <NdbApi.hpp>
/* Used for cout. */
#include <iostream>
#include <stdio.h>
#include <ctype.h>
/* Used for exit(). */
#include <stdlib.h>
/* Used for memcpy(), memset() and strlen(). */
#include <string.h>

/**
 * Helper debugging macros
 *
 * PRINT_ERROR writes the source location, an error code and a message to
 * std::cout. MYSQLERROR and APIERROR print the error held by a MYSQL handle
 * or by an object with "code" and "message" members, then terminate the
 * program with exit(-1).
 */
#define PRINT_ERROR(code,msg) \
  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
            << ", code: " << code \
            << ", msg: " << msg << "." << std::endl
#define MYSQLERROR(mysql) { \
  PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \
  exit(-1); }
#define APIERROR(error) { \
  PRINT_ERROR(error.code,error.message); \
  exit(-1); }

/* Quote taken from Project Gutenberg. */
const char *text_quote=
"Just at this moment, somehow or other, they began to run.\n"
"\n"
" Alice never could quite make out, in thinking it over\n"
"afterwards, how it was that they began: all she remembers is,\n"
"that they were running hand in hand, and the Queen went so fast\n"
"that it was all she could do to keep up with her: and still the\n"
"Queen kept crying 'Faster! Faster!' but Alice felt she COULD NOT\n"
"go faster, though she had not breath left to say so.\n"
"\n"
" The most curious part of the thing was, that the trees and the\n"
"other things round them never changed their places at all:\n"
"however fast they went, they never seemed to pass anything. 'I\n"
"wonder if all the things move along with us?' thought poor\n"
"puzzled Alice. And the Queen seemed to guess her thoughts, for\n"
"she cried, 'Faster! Don't try to talk!'\n"
"\n"
" Not that Alice had any idea of doing THAT. She felt as if she\n"
"would never be able to talk again, she was getting so much out of\n"
"breath: and still the Queen cried 'Faster! Faster!' and dragged\n"
"her along. 'Are we nearly there?' Alice managed to pant out at\n"
"last.\n"
"\n"
" 'Nearly there!' the Queen repeated. 'Why, we passed it ten\n"
"minutes ago! Faster!' And they ran on for a time in silence,\n"
"with the wind whistling in Alice's ears, and almost blowing her\n"
"hair off her head, she fancied.\n"
"\n"
" 'Now! Now!' cried the Queen. 'Faster! Faster!' And they\n"
"went so fast that at last they seemed to skim through the air,\n"
"hardly touching the ground with their feet, till suddenly, just\n"
"as Alice was getting quite exhausted, they stopped, and she found\n"
"herself sitting on the ground, breathless and giddy.\n"
"\n"
" The Queen propped her up against a tree, and said kindly, 'You\n"
"may rest a little now.'\n"
"\n"
" Alice looked round her in great surprise. 'Why, I do believe\n"
"we've been under this tree the whole time! Everything's just as\n"
"it was!'\n"
"\n"
" 'Of course it is,' said the Queen, 'what would you have it?'\n"
"\n"
" 'Well, in OUR country,' said Alice, still panting a little,\n"
"'you'd generally get to somewhere else--if you ran very fast\n"
"for a long time, as we've been doing.'\n"
"\n"
" 'A slow sort of country!' said the Queen. 'Now, HERE, you see,\n"
"it takes all the running YOU can do, to keep in the same place.\n"
"If you want to get somewhere else, you must run at least twice as\n"
"fast as that!'\n"
"\n"
" 'I'd rather not try, please!' said Alice. 'I'm quite content\n"
"to stay here--only I AM so hot and thirsty!'\n"
"\n"
" -- Lewis Carroll, 'Through the Looking-Glass'.";

/* NdbRecord objects. */
const NdbRecord *key_record;   // For specifying table key
const NdbRecord *blob_record;  // For accessing blob
const NdbRecord *full_record;  // All columns, for insert

/* The row is 4 bytes of primary key + space for blob handle pointer. */
#define ROWSIZE (4 + sizeof(NdbBlob *))

/*
  Look up table my_text and its two columns, then build the three global
  NdbRecord objects (key_record, blob_record, full_record) used by the
  operations below. Any dictionary error terminates the program.
*/
static void setup_records(Ndb *myNdb)
{
  NdbDictionary::RecordSpecification spec[2];

  /*
    Clear the whole array first, so that any member this example does not
    assign explicitly starts out as zero instead of stack garbage.
  */
  memset(spec, 0, sizeof(spec));

  NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
  const NdbDictionary::Table *myTable= myDict->getTable("my_text");
  if (myTable == NULL)
    APIERROR(myDict->getNdbError());
  const NdbDictionary::Column *col1= myTable->getColumn("my_id");
  if (col1 == NULL)
    APIERROR(myDict->getNdbError());
  const NdbDictionary::Column *col2= myTable->getColumn("my_text");
  if (col2 == NULL)
    APIERROR(myDict->getNdbError());

  /* Primary key: first 4 bytes of the row. */
  spec[0].column= col1;
  spec[0].offset= 0;
  spec[0].nullbit_byte_offset= 0;
  spec[0].nullbit_bit_in_byte= 0;
  /* Blob column: placed directly after the key, at offset 4. */
  spec[1].column= col2;
  spec[1].offset= 4;
  spec[1].nullbit_byte_offset= 0;
  spec[1].nullbit_bit_in_byte= 0;

  key_record= myDict->createRecord(myTable, &spec[0], 1, sizeof(spec[0]));
  if (key_record == NULL)
    APIERROR(myDict->getNdbError());
  blob_record= myDict->createRecord(myTable, &spec[1], 1, sizeof(spec[0]));
  if (blob_record == NULL)
    APIERROR(myDict->getNdbError());
  full_record= myDict->createRecord(myTable, &spec[0], 2, sizeof(spec[0]));
  if (full_record == NULL)
    APIERROR(myDict->getNdbError());
}

/* Function to drop table. Terminates the program if the query fails. */
void drop_table(MYSQL &mysql)
{
  if (mysql_query(&mysql, "DROP TABLE my_text"))
    MYSQLERROR(mysql);
}

/*
  Functions to create table.
  try_create_table() returns the mysql_query() result (0 on success).
*/
int try_create_table(MYSQL &mysql)
{
  return mysql_query(&mysql,
                     "CREATE TABLE"
                     " my_text"
                     " (my_id INT UNSIGNED NOT NULL,"
                     " my_text TEXT NOT NULL,"
                     " PRIMARY KEY USING HASH (my_id))"
                     " ENGINE=NDB");
}

/*
  Create the example table; if it already exists, drop it and create it
  again. Any other error terminates the program.
*/
void create_table(MYSQL &mysql)
{
  if (try_create_table(mysql))
  {
    if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)
      MYSQLERROR(mysql);
    std::cout << "MySQL Cluster already has example table: my_text. "
              << "Dropping it..." << std::endl;
    /******************
     * Recreate table *
     ******************/
    drop_table(mysql);
    if (try_create_table(mysql))
      MYSQLERROR(mysql);
  }
}

/*
  Insert one row with my_id = 1 and the quote as blob value.
  Returns 1 if the commit succeeded, 0 if it failed.
*/
int populate(Ndb *myNdb)
{
  char row[ROWSIZE];

  NdbTransaction *myTrans= myNdb->startTransaction();
  if (myTrans == NULL)
    APIERROR(myNdb->getNdbError());

  Uint32 id= 1;
  memcpy(&row[0], &id, 4);
  NdbOperation *myNdbOperation= myTrans->insertTuple(full_record, row);
  if (myNdbOperation == NULL)
    APIERROR(myTrans->getNdbError());

  NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
  if (myBlobHandle == NULL)
    APIERROR(myNdbOperation->getNdbError());
  if (myBlobHandle->setValue(text_quote, strlen(text_quote)) == -1)
    APIERROR(myBlobHandle->getNdbError());

  int check= myTrans->execute(NdbTransaction::Commit);
  /* Report the reason for a failed commit while the transaction still exists. */
  if (check == -1)
    PRINT_ERROR(myTrans->getNdbError().code, myTrans->getNdbError().message);
  myNdb->closeTransaction(myTrans);
  return check != -1;
}

/*
  Uppercase all characters in TEXT field, using primary key operation.
  Use piece-wise read/write to avoid loading entire data into memory
  at once. Returns 1; any NDB error terminates the program.
*/
int update_key(Ndb *myNdb)
{
  char row[ROWSIZE];

  NdbTransaction *myTrans= myNdb->startTransaction();
  if (myTrans == NULL)
    APIERROR(myNdb->getNdbError());

  Uint32 id= 1;
  memcpy(&row[0], &id, 4);

  NdbOperation *myNdbOperation=
    myTrans->updateTuple(key_record, row, blob_record, row);
  if (myNdbOperation == NULL)
    APIERROR(myTrans->getNdbError());

  NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
  if (myBlobHandle == NULL)
    APIERROR(myNdbOperation->getNdbError());

  /* Execute NoCommit to make the blob handle active. */
  if (-1 == myTrans->execute(NdbTransaction::NoCommit))
    APIERROR(myTrans->getNdbError());

  Uint64 length= 0;
  if (-1 == myBlobHandle->getLength(length))
    APIERROR(myBlobHandle->getNdbError());

  /*
    A real application should use a much larger chunk size for
    efficiency, preferably much larger than the part size, which
    defaults to 2000. 64000 might be a good value.
  */
  const Uint32 chunk_size= 100;
  char buffer[chunk_size];

  if (length == 0)
  {
    /* Empty value: there are no chunks to rewrite, so just commit. */
    if (-1 == myTrans->execute(NdbTransaction::Commit))
      APIERROR(myTrans->getNdbError());
  }

  /*
    Count the remaining chunks down in 64-bit unsigned arithmetic; this
    never goes negative and does not overflow for large values.
  */
  for (Uint64 remaining= (length + chunk_size - 1) / chunk_size;
       remaining > 0;
       remaining--)
  {
    const Uint64 pos= (remaining - 1) * chunk_size;
    Uint32 chunk_length= chunk_size;
    if (pos + chunk_length > length)
      chunk_length= (Uint32)(length - pos);

    /* Read from the end back, to illustrate seeking. */
    if (-1 == myBlobHandle->setPos(pos))
      APIERROR(myBlobHandle->getNdbError());
    if (-1 == myBlobHandle->readData(buffer, chunk_length))
      APIERROR(myBlobHandle->getNdbError());
    int res= myTrans->execute(NdbTransaction::NoCommit);
    if (-1 == res)
      APIERROR(myTrans->getNdbError());

    /* Uppercase everything. toupper() needs an unsigned char value. */
    for (Uint32 j= 0; j < chunk_length; j++)
      buffer[j]= (char)toupper((unsigned char)buffer[j]);

    if (-1 == myBlobHandle->setPos(pos))
      APIERROR(myBlobHandle->getNdbError());
    if (-1 == myBlobHandle->writeData(buffer, chunk_length))
      APIERROR(myBlobHandle->getNdbError());
    /* Commit on the final update (the chunk at position 0). */
    if (-1 == myTrans->execute(remaining > 1 ?
                               NdbTransaction::NoCommit :
                               NdbTransaction::Commit))
      APIERROR(myTrans->getNdbError());
  }

  myNdb->closeTransaction(myTrans);

  return 1;
}

/*
  Lowercase all characters in TEXT field, using a scan with
  updateCurrentTuple(). Returns 1; any NDB error terminates the program.
*/
int update_scan(Ndb *myNdb)
{
  char buffer[10000];
  char row[ROWSIZE];

  NdbTransaction *myTrans= myNdb->startTransaction();
  if (myTrans == NULL)
    APIERROR(myNdb->getNdbError());

  NdbScanOperation *myScanOp=
    myTrans->scanTable(blob_record, NdbOperation::LM_Exclusive);
  if (myScanOp == NULL)
    APIERROR(myTrans->getNdbError());

  NdbBlob *myBlobHandle= myScanOp->getBlobHandle("my_text");
  if (myBlobHandle == NULL)
    APIERROR(myScanOp->getNdbError());
  if (myBlobHandle->getValue(buffer, sizeof(buffer)))
    APIERROR(myBlobHandle->getNdbError());

  /* Start the scan. */
  if (-1 == myTrans->execute(NdbTransaction::NoCommit))
    APIERROR(myTrans->getNdbError());

  const char *out_row;
  int res;
  for (;;)
  {
    res= myScanOp->nextResult(out_row, true);
    if (res==1)
      break;                                    // Scan done.
    else if (res)
      APIERROR(myScanOp->getNdbError());

    Uint64 length= 0;
    if (myBlobHandle->getLength(length) == -1)
      APIERROR(myBlobHandle->getNdbError());

    /*
      Only sizeof(buffer) bytes were offered to getValue(). Refuse to go on
      if the value is longer, instead of reading and writing past the end
      of the buffer.
    */
    if (length > sizeof(buffer))
    {
      std::cout << "Blob value of " << length
                << " bytes does not fit in the " << sizeof(buffer)
                << " byte scan buffer." << std::endl;
      exit(-1);
    }
    const Uint32 data_length= (Uint32)length;

    /* Lowercase everything. tolower() needs an unsigned char value. */
    for (Uint32 j= 0; j < data_length; j++)
      buffer[j]= (char)tolower((unsigned char)buffer[j]);

    NdbOperation *myUpdateOp=
      myScanOp->updateCurrentTuple(myTrans, blob_record, row);
    if (myUpdateOp == NULL)
      APIERROR(myTrans->getNdbError());
    NdbBlob *myBlobHandle2= myUpdateOp->getBlobHandle("my_text");
    if (myBlobHandle2 == NULL)
      APIERROR(myUpdateOp->getNdbError());
    if (myBlobHandle2->setValue(buffer, data_length))
      APIERROR(myBlobHandle2->getNdbError());

    if (-1 == myTrans->execute(NdbTransaction::NoCommit))
      APIERROR(myTrans->getNdbError());
  }

  if (-1 == myTrans->execute(NdbTransaction::Commit))
    APIERROR(myTrans->getNdbError());

  myNdb->closeTransaction(myTrans);

  return 1;
}

/* Buffer filled by myFetchHook(), and the number of bytes stored in it. */
struct ActiveHookData {
  char buffer[10000];
  Uint32 readLength;
};

/*
  Callback registered with setActiveHook(). Reads up to
  sizeof(buffer) - 1 bytes of blob data into the ActiveHookData passed as
  arg, leaving room for the terminator fetch_key() appends. Returns the
  result of readData().
*/
int myFetchHook(NdbBlob* myBlobHandle, void* arg)
{
  ActiveHookData *ahd= (ActiveHookData *)arg;

  ahd->readLength= sizeof(ahd->buffer) - 1;
  return myBlobHandle->readData(ahd->buffer, ahd->readLength);
}

/*
  Fetch and show the blob field, using setActiveHook().
  Returns 1; any NDB error terminates the program.
*/
int fetch_key(Ndb *myNdb)
{
  char key_row[ROWSIZE];
  char out_row[ROWSIZE];

  NdbTransaction *myTrans= myNdb->startTransaction();
  if (myTrans == NULL)
    APIERROR(myNdb->getNdbError());

  Uint32 id= 1;
  memcpy(&key_row[0], &id, 4);

  NdbOperation *myNdbOperation=
    myTrans->readTuple(key_record, key_row, blob_record, out_row);
  if (myNdbOperation == NULL)
    APIERROR(myTrans->getNdbError());

  NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
  if (myBlobHandle == NULL)
    APIERROR(myNdbOperation->getNdbError());

  struct ActiveHookData ahd;
  /*
    Start from an empty result, so that the terminator written below stays
    inside the buffer even if the hook never gets to store a length.
  */
  ahd.readLength= 0;
  if (myBlobHandle->setActiveHook(myFetchHook, &ahd) == -1)
    APIERROR(myBlobHandle->getNdbError());

  /*
    Execute Commit, but calling our callback set up in setActiveHook()
    before actually committing.
  */
  if (-1 == myTrans->execute(NdbTransaction::Commit))
    APIERROR(myTrans->getNdbError());
  myNdb->closeTransaction(myTrans);

  /* Our fetch callback will have been called during the execute(). */

  ahd.buffer[ahd.readLength]= '\0';
  std::cout << "Fetched data:" << std::endl << ahd.buffer << std::endl;

  return 1;
}

/*
  Simple setValue() update: replace the blob value with 10000 spaces.
  Returns 1; any NDB error terminates the program.
*/
int update2_key(Ndb *myNdb)
{
  char buffer[10000];
  char row[ROWSIZE];

  NdbTransaction *myTrans= myNdb->startTransaction();
  if (myTrans == NULL)
    APIERROR(myNdb->getNdbError());

  Uint32 id= 1;
  memcpy(&row[0], &id, 4);

  NdbOperation *myNdbOperation=
    myTrans->updateTuple(key_record, row, blob_record, row);
  if (myNdbOperation == NULL)
    APIERROR(myTrans->getNdbError());

  NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text");
  if (myBlobHandle == NULL)
    APIERROR(myNdbOperation->getNdbError());
  memset(buffer, ' ', sizeof(buffer));
  if (myBlobHandle->setValue(buffer, sizeof(buffer)) == -1)
    APIERROR(myBlobHandle->getNdbError());

  if (-1 == myTrans->execute(NdbTransaction::Commit))
    APIERROR(myTrans->getNdbError());
  myNdb->closeTransaction(myTrans);

  return 1;
}

/*
  Deletion of blob row.
  Returns 1; any NDB error terminates the program.
*/
int delete_key(Ndb *myNdb)
{
  char row[ROWSIZE];

  NdbTransaction *myTrans= myNdb->startTransaction();
  if (myTrans == NULL)
    APIERROR(myNdb->getNdbError());

  Uint32 id= 1;
  memcpy(&row[0], &id, 4);

  NdbOperation *myNdbOperation= myTrans->deleteTuple(key_record, row);
  if (myNdbOperation == NULL)
    APIERROR(myTrans->getNdbError());

  if (-1 == myTrans->execute(NdbTransaction::Commit))
    APIERROR(myTrans->getNdbError());
  myNdb->closeTransaction(myTrans);

  return 1;
}

/*
  Entry point. Expects the mysqld socket and the cluster connect string as
  arguments, creates the example table through the MySQL server, runs each
  blob operation through the NDB API, and finally drops the table again.
*/
int main(int argc, char**argv)
{
  if (argc != 3)
  {
    std::cout << "Arguments are <socket mysqld> <connect_string cluster>.\n";
    exit(-1);
  }
  char *mysqld_sock = argv[1];
  const char *connectstring = argv[2];
  ndb_init();
  MYSQL mysql;

  /* Connect to mysql server and create table. */
  {
    if ( !mysql_init(&mysql) ) {
      std::cout << "mysql_init failed.\n";
      exit(-1);
    }
    if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
                             0, mysqld_sock, 0) )
      MYSQLERROR(mysql);

    /* Best effort: the result is ignored, the USE below is what is checked. */
    mysql_query(&mysql, "CREATE DATABASE TEST_DB");
    if (mysql_query(&mysql, "USE TEST_DB") != 0)
      MYSQLERROR(mysql);

    create_table(mysql);
  }

  /*
    Keep the NDB objects in their own scope, so that they are destroyed
    before ndb_end() is called below.
  */
  {
    /* Connect to ndb cluster. */
    Ndb_cluster_connection cluster_connection(connectstring);
    if (cluster_connection.connect(4, 5, 1))
    {
      std::cout << "Unable to connect to cluster within 30 secs." << std::endl;
      exit(-1);
    }
    /* Optionally connect and wait for the storage nodes (ndbd's). */
    if (cluster_connection.wait_until_ready(30,0) < 0)
    {
      std::cout << "Cluster was not ready within 30 secs.\n";
      exit(-1);
    }

    Ndb myNdb(&cluster_connection,"TEST_DB");
    if (myNdb.init(1024) == -1) {      // Set max 1024 parallel transactions
      APIERROR(myNdb.getNdbError());
    }

    setup_records(&myNdb);

    if(populate(&myNdb) > 0)
      std::cout << "populate: Success!" << std::endl;

    if(update_key(&myNdb) > 0)
      std::cout << "update_key: Success!" << std::endl;

    if(update_scan(&myNdb) > 0)
      std::cout << "update_scan: Success!" << std::endl;

    if(fetch_key(&myNdb) > 0)
      std::cout << "fetch_key: Success!" << std::endl;

    if(update2_key(&myNdb) > 0)
      std::cout << "update2_key: Success!" << std::endl;

    if(delete_key(&myNdb) > 0)
      std::cout << "delete_key: Success!" << std::endl;
  }

  /* Drop table. */
  drop_table(mysql);

  /* Release the MySQL connection, then shut down the NDB API. */
  mysql_close(&mysql);
  ndb_end(0);

  return 0;
}