// objectManager.js

// Environment Imports
import logger from "./logger.js";
// S3 Imports
import {
  CopyObjectCommand,
  DeleteObjectCommand,
  GetObjectCommand,
  HeadObjectCommand,
  ListObjectsV2Command,
  S3Client,
} from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";
// Helia Imports
import { CarWriter } from "@ipld/car";
import { car } from "@helia/car";
import { mfs } from "@helia/mfs";
import { unixfs } from "@helia/unixfs";
import { MemoryBlockstore } from "blockstore-core";
import { MemoryDatastore } from "datastore-core";
// Utility Imports
import { v4 as uuidv4 } from "uuid";
import { downloadFromGateway } from "./helpers.js";
import PQueue from "p-queue";

/** Interacts with an S3 client to perform various operations on objects in a bucket. */
class ObjectManager {
  #DEFAULT_ENDPOINT = "https://s3.filebase.com";
  #DEFAULT_REGION = "us-east-1";
  #DEFAULT_MAX_CONCURRENT_UPLOADS = 4;

  #client;
  #credentials;
  #defaultBucket;
  #gatewayConfiguration;
  #maxConcurrentUploads;

  /**
   * @typedef {Object} objectManagerOptions Optional settings for the constructor.
   * @property {string} [bucket] Default bucket to use.
   * @property {objectDownloadOptions} [gateway] Default gateway to use.
   * @property {number} [maxConcurrentUploads] The maximum number of concurrent uploads.
   */

  /**
   * @typedef {Object} objectDownloadOptions Optional settings for downloading objects
   * @property {string} endpoint Default gateway to use.
   * @property {string} [token] Token for the default gateway.
   * @property {number} [timeout=60000] Timeout for the default gateway
   */

  /**
   * @summary Creates a new instance of the constructor.
   * @param {string} clientKey - The access key ID for authentication.
   * @param {string} clientSecret - The secret access key for authentication.
   * @param {objectManagerOptions} options - Optional settings for the constructor.
   * @tutorial quickstart-object
   * @example
   * import { ObjectManager } from "@filebase/sdk";
   * const objectManager = new ObjectManager("KEY_FROM_DASHBOARD", "SECRET_FROM_DASHBOARD", {
   *   bucket: "my-default-bucket",
   *   maxConcurrentUploads: 4,
   *   gateway: {
   *     endpoint: "https://my-default-gateway.mydomain.com",
   *     token: SUPER_SECRET_GATEWAY_TOKEN
   *   }
   * });
   */
  constructor(clientKey, clientSecret, options) {
    // Allow an override endpoint in test runs only; production always uses
    // the default Filebase S3 endpoint.
    const clientEndpoint =
        process.env.NODE_ENV === "test"
          ? process.env.TEST_S3_ENDPOINT || this.#DEFAULT_ENDPOINT
          : this.#DEFAULT_ENDPOINT,
      clientConfiguration = {
        credentials: {
          accessKeyId: clientKey,
          secretAccessKey: clientSecret,
        },
        endpoint: clientEndpoint,
        region: this.#DEFAULT_REGION,
        forcePathStyle: true,
      };
    this.#defaultBucket = options?.bucket;
    this.#maxConcurrentUploads =
      options?.maxConcurrentUploads || this.#DEFAULT_MAX_CONCURRENT_UPLOADS;
    this.#credentials = {
      key: clientKey,
      secret: clientSecret,
    };
    this.#client = new S3Client(clientConfiguration);

    this.#gatewayConfiguration = {
      endpoint: options?.gateway?.endpoint,
      token: options?.gateway?.token,
      timeout: options?.gateway?.timeout,
    };
  }

  /**
   * @typedef {Object} objectOptions
   * @property {string} [bucket] - The bucket to pin the IPFS CID into.
   */

  /**
   * @typedef {Object} objectHeadResult
   * @property {string} cid The CID of the uploaded object
   * @property {function} download Convenience function to download the object via S3 or the selected gateway
   * @property {array<Object>} [entries] If a directory then returns an array of the containing objects
   * @property {string} entries.cid The CID of the uploaded object
   * @property {string} entries.path The path of the object
   */

  /**
   * If the source parameter is an array of objects, it will pack multiple files into a CAR file for upload.
   * The method returns a Promise that resolves to an object containing the CID (Content Identifier) of the uploaded file
   * and an optional entries object when uploading a CAR file.
   *
   * @summary Uploads a file or a CAR file to the specified bucket.
   * @param {string} key - The key or path of the file in the bucket.
   * @param {Buffer|ReadableStream|Array<Object>} source - The content of the object to be uploaded.
   *    If an array of files is provided, each file should have a 'path' property specifying the path of the file
   *    and a 'content' property specifying the content of the file.  The SDK will then construct a CAR file locally
   *    and use that as the content of the object to be uploaded.
   * @param {Object} [metadata] Optional metadata for pin object
   * @param {objectOptions} [options] - The options for uploading the object.
   * @returns {Promise<objectHeadResult>}
   * @example
   * // Upload Object
   * await objectManager.upload("my-object", Buffer.from("Hello World!"));
   * // Upload Object with Metadata
   * await objectManager.upload("my-custom-object", Buffer.from("Hello Big World!"), {
   *   "application": "my-filebase-app"
   * });
   * // Upload Directory
   * await objectManager.upload("my-first-directory", [
   *  {
   *   path: "/testObjects/1.txt",  // Virtual Path to store contents at within IPFS Folder/Directory
   *   content: Buffer.from("upload test object", "utf-8"),
   *  },
   *  {
   *   path: "/testObjects/deep/1.txt",
   *   content: Buffer.from("upload deep test object", "utf-8"),
   *  },
   *  {
   *   path: "/topLevel.txt",
   *   content: Buffer.from("upload top level test object", "utf-8"),
   *  },
   * ]);
   */
  async upload(key, source, metadata, options) {
    // Generate Upload UUID so that concurrent uploads get isolated
    // temporary blockstore directories and correlated log lines.
    const uploadUUID = uuidv4();
    const uploadLogger = logger.child({ uploadUUID });

    // Setup Upload Options
    const bucket = options?.bucket || this.#defaultBucket,
      uploadOptions = {
        client: this.#client,
        params: {
          Bucket: bucket,
          Key: key,
          Body: source,
          Metadata: metadata || {},
        },
        queueSize: this.#maxConcurrentUploads,
        partSize: 26843546, //25.6Mb || 250Gb Max File Size
      };

    // Pack Multiple Files into CAR file for upload
    let parsedEntries = {};
    if (Array.isArray(source)) {
      // Mark Upload as a CAR file import
      uploadOptions.params.Metadata = {
        ...uploadOptions.params.Metadata,
        import: "car",
      };
      // Process the deepest paths first so parent directories are
      // materialized before shallower siblings reference them.
      source.sort((a, b) => {
        return countSlashes(b.path) - countSlashes(a.path);
      });

      let temporaryCarFilePath, temporaryBlockstoreDir;
      try {
        // Setup Blockstore; in Node a filesystem blockstore is used so large
        // directory imports do not have to fit in memory.
        let temporaryBlockstore = new MemoryBlockstore(),
          temporaryDatastore = new MemoryDatastore();
        if (isNode()) {
          const { mkdir } = await import("node:fs/promises");
          const { FsBlockstore } = await import("blockstore-fs");
          const os = await import("node:os");
          const path = await import("node:path");
          temporaryBlockstoreDir = path.resolve(
            os.tmpdir(),
            "filebase-sdk",
            "uploads",
            uploadUUID,
          );
          temporaryCarFilePath = `${temporaryBlockstoreDir}/main.car`;
          await mkdir(temporaryBlockstoreDir, { recursive: true });
          temporaryBlockstore = new FsBlockstore(temporaryBlockstoreDir);
        }
        let createdFiles = new Map();
        const heliaFs = unixfs({
          blockstore: temporaryBlockstore,
        });
        uploadLogger.verbose("UNIXFS_ADD", {
          count: source.length,
        });
        let createFilePromises = [];
        // Bound the number of in-flight unixfs imports.
        const queue = new PQueue({ concurrency: 50 });
        for (let entry of source) {
          // Normalize paths to be absolute within the virtual directory.
          entry.path = entry.path.startsWith("/") ? entry.path : `/${entry.path}`;
          if (entry.content === null) {
            // Directories (null content) are created later via MFS mkdir.
            continue;
          }
          const task = (async () => {
            await queue.add(async () => {
              uploadLogger.silly("SOURCE_IMPORT_STARTED", {
                path: entry.path,
                size: queue.size,
              });

              if (isNode()) {
                const { Readable } = await import("node:stream");
                let createdFile;
                if (
                  (entry.type === "import" && entry.content !== null) ||
                  entry.content instanceof Readable
                ) {
                  // "import" entries name a local file; open it and stream
                  // its bytes into the blockstore.
                  let filehandle;
                  try {
                    if (entry.type === "import") {
                      const { open } = await import("node:fs/promises");
                      const path = await import("node:path");
                      filehandle = await open(path.resolve(entry.content), "r");
                      entry.content = filehandle.createReadStream();
                    }
                    createdFile = await heliaFs.addByteStream(entry.content);
                  } catch (err) {
                    // Always release the file handle before propagating.
                    if (typeof filehandle !== "undefined") {
                      await filehandle.close();
                    }
                    throw err;
                  }
                  if (typeof filehandle !== "undefined") {
                    await filehandle.close();
                  }
                } else if (entry.content !== null) {
                  createdFile = await heliaFs.addBytes(entry.content);
                } else {
                  return;
                }
                createdFiles.set(entry.path, createdFile);
                uploadLogger.verbose("SOURCE_IMPORT_COMPLETED", {
                  path: entry.path,
                  size: queue.size,
                });
              } else {
                // Browser path: no node:fs / node:stream available.
                let createdFile;
                if (entry.type === "import" && entry.content !== null) {
                  createdFile = await heliaFs.addByteStream(entry.content);
                } else if (entry.content !== null) {
                  createdFile = await heliaFs.addBytes(entry.content);
                } else {
                  return;
                }
                createdFiles.set(entry.path, createdFile);
                uploadLogger.verbose("SOURCE_IMPORT_COMPLETED", {
                  path: entry.path,
                  size: queue.size,
                });
              }
            });
          })();
          // Backpressure: pause queuing when the pending queue grows too large.
          if (queue.size > 150) {
            await queue.onSizeLessThan(100);
          }
          createFilePromises.push(task);
          uploadLogger.verbose("SOURCE_IMPORT_QUEUED", {
            path: entry.path,
            size: queue.size,
          });
        }
        await Promise.all(createFilePromises);
        uploadLogger.verbose("UNIXFS_ADDED", {
          count: createdFiles.size,
        });

        // Assemble the directory tree in MFS by copying each imported file
        // CID to its virtual path (and mkdir-ing explicit directories).
        const heliaMfs = mfs({
          blockstore: temporaryBlockstore,
          datastore: temporaryDatastore,
        });
        uploadLogger.verbose("MFS_ADDING", {
          count: source.length,
          output: temporaryCarFilePath,
        });
        for (const entry of source) {
          if (entry.content === null) {
            uploadLogger.silly("MFS_DIR_CREATING", {
              path: entry.path,
            });
            await heliaMfs.mkdir(entry.path);
            uploadLogger.verbose("MFS_DIR_CREATED", {
              path: entry.path,
            });
          } else {
            const entryFile = createdFiles.get(entry.path);
            uploadLogger.silly("MFS_FILE_COPY", {
              cid: entryFile,
              path: entry.path,
            });
            await heliaMfs.cp(entryFile, entry.path, {
              force: true,
            });
            uploadLogger.verbose("MFS_FILE_COPIED", {
              cid: entryFile,
              path: entry.path,
            });
          }
        }
        for (const entry of source) {
          parsedEntries[entry.path] = await heliaMfs.stat(entry.path);
          uploadLogger.silly("MFS_PATH_STAT", parsedEntries[entry.path]);
        }
        parsedEntries["/"] = await heliaMfs.stat("/");
        const rootEntry = parsedEntries["/"];
        uploadLogger.verbose("MFS_ADDED", {
          root: rootEntry,
          count: Object.keys(parsedEntries).length,
        });

        // Get carFile stream here
        uploadLogger.verbose("CAR_EXPORTING", {
          root: rootEntry,
        });
        const carExporter = car({ blockstore: temporaryBlockstore }),
          { writer, out } = CarWriter.create([rootEntry.cid]);

        if (isNode()) {
          const { createReadStream, createWriteStream } = await import(
            "node:fs"
          );
          const { Readable } = await import("node:stream");
          const { finished } = await import("node:stream/promises");
          // Put carFile stream to disk
          const output = createWriteStream(temporaryCarFilePath);
          Readable.from(out).pipe(output);
          await carExporter.export(rootEntry.cid, writer);
          // Wait for the write stream to flush completely; without this the
          // read stream below could observe a truncated CAR file.
          await finished(output);
          uploadLogger.verbose("CAR_EXPORTED", {
            root: rootEntry,
          });

          // Set Uploader to Read from carFile on disk
          uploadOptions.params.Body = createReadStream(temporaryCarFilePath);
        }

        // Upload carFile via S3
        uploadLogger.verbose("CAR_UPLOADING", {
          entry: rootEntry,
          source: temporaryCarFilePath,
        });
        const parallelUploads3 = new Upload(uploadOptions);
        parallelUploads3.on("httpUploadProgress", (progress) => {
          uploadLogger.debug("CAR_UPLOAD_PROGRESS", progress);
        });
        await parallelUploads3.done();
        uploadLogger.verbose("CAR_UPLOADED", {
          entry: rootEntry,
          source: temporaryCarFilePath,
        });
        await temporaryBlockstore.close();
      } catch (err) {
        // Log through the structured upload logger instead of console so
        // failures carry the uploadUUID correlation id.
        uploadLogger.error(err.message);
        throw err;
      } finally {
        if (typeof temporaryBlockstoreDir !== "undefined" && isNode()) {
          const { rm } = await import("node:fs/promises");
          // Delete Temporary Blockstore
          await rm(temporaryBlockstoreDir, { recursive: true, force: true });
        }
      }
    } else {
      // Upload file via S3
      const parallelUploads3 = new Upload(uploadOptions);
      await parallelUploads3.done();
    }

    // Get CID from Platform (the platform records the pinned CID in the
    // object's metadata once the upload completes).
    const command = new HeadObjectCommand({
        Bucket: bucket,
        Key: key,
      }),
      headResult = await this.#client.send(command),
      responseCid = headResult.Metadata.cid;

    if (Object.keys(parsedEntries).length === 0) {
      return {
        cid: responseCid,
        download: () => {
          return this.#routeDownload(responseCid, key, options);
        },
      };
    }
    return {
      cid: responseCid,
      download: () => {
        return this.#routeDownload(responseCid, key, options);
      },
      entries: parsedEntries,
    };
  }

  // Prefer the configured IPFS gateway for downloads; fall back to S3.
  async #routeDownload(cid, key, options) {
    return typeof this.#gatewayConfiguration.endpoint !== "undefined"
      ? downloadFromGateway(cid, this.#gatewayConfiguration)
      : this.download(key, options);
  }

  /**
   * @summary Gets an objects info and metadata using the S3 API.
   * @param {string} key - The key of the object to be inspected.
   * @param {objectOptions} [options] - The options for inspecting the object.
   * @returns {Promise<objectHeadResult|false>} The HeadObject response augmented
   *    with a `download` convenience function, or `false` if the key does not exist.
   */
  async get(key, options) {
    const bucket = options?.bucket || this.#defaultBucket;
    try {
      const command = new HeadObjectCommand({
          Bucket: bucket,
          Key: key,
        }),
        response = await this.#client.send(command);

      response.download = () => {
        return this.#routeDownload(response.Metadata.cid, key, options);
      };

      return response;
    } catch (err) {
      // A missing key is an expected outcome, not an error.
      if (err.name === "NotFound") {
        return false;
      }
      throw err;
    }
  }

  /**
   * @summary Downloads an object from the specified bucket using the provided key.
   * @param {string} key - The key of the object to be downloaded.
   * @param {objectOptions} [options] - The options for downloading the object.
   * @returns {Promise<Object>} - A promise that resolves with the contents of the downloaded object as a Stream.
   * @example
   * // Download object with name of `download-object-example`
   * await objectManager.download(`download-object-example`);
   */
  async download(key, options) {
    // Download via IPFS Gateway if Setup or S3 by Default
    if (typeof this.#gatewayConfiguration.endpoint === "string") {
      const objectToFetch = await this.get(key, options);
      return objectToFetch.download();
    } else {
      const command = new GetObjectCommand({
          Bucket: options?.bucket || this.#defaultBucket,
          Key: key,
        }),
        response = await this.#client.send(command);

      return response.Body;
    }
  }

  /**
   * @typedef {Object} listObjectsResult
   * @property {boolean} IsTruncated Indicates if more results exist on the server
   * @property {string} NextContinuationToken ContinuationToken used to paginate list requests
   * @property {Array<Object>} Contents List of Keys stored in the S3 Bucket
   * @property {string} Contents.Key Key of the Object
   * @property {string} Contents.LastModified Date Last Modified of the Object
   * @property {string} Contents.CID CID of the Object
   * @property {string} Contents.ETag ETag of the Object
   * @property {number} Contents.Size Size in Bytes of the Object
   * @property {string} Contents.StorageClass Class of Storage of the Object
   * @property {function} Contents.download Convenience function to download the item using the S3 gateway
   */

  /**
   * @typedef {Object} listObjectOptions
   * @property {string} [Bucket] The name of the bucket. If not provided, the default bucket will be used.
   * @property {string} [ContinuationToken=null] Continues listing from this objects name.
   * @property {string} [Delimiter=null] Character used to group keys
   * @property {number} [MaxKeys=1000] The maximum number of objects to retrieve. Defaults to 1000.
   */

  /**
   * Retrieves a list of objects from a specified bucket.
   *
   * @param {listObjectOptions} options - The options for listing objects.
   * @returns {Promise<listObjectsResult>} - A promise that resolves to an array of objects.
   * @throws {Error} If MaxKeys exceeds the 100000 maximum.
   * @example
   * // List objects in bucket with a limit of 1000
   * await objectManager.list({
   *   MaxKeys: 1000
   * });
   */
  async list(
    options = {
      Bucket: this.#defaultBucket,
      ContinuationToken: null,
      Delimiter: null,
      MaxKeys: 1000,
    },
  ) {
    if (options?.MaxKeys && options.MaxKeys > 100000) {
      throw new Error(`MaxKeys Maximum value is 100000`);
    }
    const bucket = options?.Bucket || this.#defaultBucket,
      limit = options?.MaxKeys || 1000,
      commandOptions = {
        Bucket: bucket,
        MaxKeys: limit,
      },
      command = new ListObjectsV2Command({
        ...options,
        ...commandOptions,
      });

    const { Contents, IsTruncated, NextContinuationToken } =
      await this.#client.send(command);
    return { Contents, IsTruncated, NextContinuationToken };
  }

  /**
   * @summary Deletes an object from the specified bucket using the provided key.
   * @param {string} key - The key of the object to be deleted.
   * @param {objectOptions} [options] - The options for deleting the file.
   * @returns {Promise<Boolean>} - A Promise that resolves with the result of the delete operation.
   * @example
   * // Delete object with name of `delete-object-example`
   * await objectManager.delete(`delete-object-example`);
   */
  async delete(key, options) {
    const command = new DeleteObjectCommand({
      Bucket: options?.bucket || this.#defaultBucket,
      Key: key,
    });

    await this.#client.send(command);
    return true;
  }

  /**
   * @typedef {Object} copyObjectOptions
   * @property {string} [sourceBucket] The source bucket from where the object is to be copied.
   * @property {string} [destinationKey] The key of the object in the destination bucket. By default, it is the same as the sourceKey.
   */

  /**
   * If the destinationKey is not provided, the object will be copied with the same key as the sourceKey.
   *
   * @summary Copy the object from sourceKey in the sourceBucket to destinationKey in the destinationBucket.
   * @param {string} sourceKey - The key of the object to be copied from the sourceBucket.
   * @param {string} destinationBucket - The bucket where the object will be copied to.
   * @param {copyObjectOptions} [options] - Additional options for the copy operation.
   *
   * @returns {Promise<Boolean>} - A Promise that resolves with the result of the copy operation.
   * @example
   * // Copy object `copy-object-test` from `copy-object-test-pass-src` to `copy-object-test-pass-dest`
   * // TIP: Set bucket on constructor, it will be used as the default source for copying objects.
   * await objectManager.copy(`copy-object-test`, `copy-object-dest`, {
   *   sourceBucket: `copy-object-src`
   * });
   */
  async copy(
    sourceKey,
    destinationBucket,
    options = {
      sourceBucket: this.#defaultBucket,
      destinationKey: undefined,
    },
  ) {
    const copySource = `${
        options?.sourceBucket || this.#defaultBucket
      }/${sourceKey}`,
      command = new CopyObjectCommand({
        CopySource: copySource,
        Bucket: destinationBucket,
        Key: options?.destinationKey || sourceKey,
      });

    await this.#client.send(command);
    return true;
  }
}

// Returns the number of "/" separators contained in a path string.
function countSlashes(path) {
  // Splitting on "/" yields exactly one more segment than there are slashes.
  return path.split("/").length - 1;
}

// Function to check if the code is running in Node.js or the browser.
function isNode() {
  // `process` may exist without `release` in browser shims and some
  // non-Node runtimes; optional chaining avoids a TypeError there.
  return typeof process !== "undefined" && process.release?.name === "node";
}

export default ObjectManager;