/** * Copyright (c) 2006 - 2009 Smaxe Ltd (www.smaxe.com). * All rights reserved. */ import com.smaxe.uv.client.ICamera; import com.smaxe.uv.client.IMicrophone; import com.smaxe.uv.client.rtmp.INetConnection; import com.smaxe.uv.client.rtmp.INetStream; import com.smaxe.uv.client.rtmp.IVideo; import com.smaxe.uv.client.rtmp.video.AbstractVideo; import com.smaxe.uv.stream.MediaData; import com.smaxe.uv.stream.MediaDataFactory; import java.io.IOException; import java.util.Map; /** * <code>ExRtmpRetransmitter</code> - retrieves RTMP <code>sourceStream</code> from <code>sourceUrl</code> and * publishes it to the <code>destinationUrl</code>. * * @author Andrei Sochirca * @see <a href="http://www.smaxe.com/product.jsf?id=juv-rtmp-client" target="_blank">JUV RTMP Client</a> * @see <a href="http://www.smaxe.com/product.jsf?id=juv-rtmfp-client" target="_blank">JUV RTMFP/RTMP Client</a> */ public final class ExRtmpRetransmitter extends Object { /** * Entry point. * * @param args * @throws Exception if an exception occurred */ public static void main(final String[] args) throws Exception { // NOTE: // you can get Evaluation Key at: // http://www.smaxe.com/order.jsf#request_evaluation_key // or buy at: // http://www.smaxe.com/order.jsf // Android-specific: // - please add permission to the AndroidManifest.xml : // <uses-permission android:name="android.permission.INTERNET" /> // - please use separate thread to connect to the server (not UI thread) : // NetConnection#connect() connects to the remote server in the invocation thread, // so it causes NetworkOnMainThreadException on 4.0 (http://developer.android.com/reference/android/os/NetworkOnMainThreadException.html) // Nginx-specific: // - please use AMF0 encoding com.smaxe.uv.client.rtmp.License.setKey("SET-YOUR-KEY"); final String sourceUrl = "rtmp://localhost:1935/live"; final String sourceStream = "livestream"; final String destinationUrl = "rtmp://localhost:1935/live"; final String destinationStream = "livestream2"; final AudioVideoStream avstream = new AudioVideoStream(); new Thread(new Runnable() { public void run() { final Player player = new Player(); player.play(new com.smaxe.uv.client.rtmp.NetConnection(), sourceUrl, sourceStream, avstream); } }, "ExRtmpRetransmitter-Player").start(); Thread.sleep(1000); new Thread(new Runnable() { public void run() { final Publisher publisher = new Publisher(); publisher.publish(new com.smaxe.uv.client.rtmp.NetConnection(), destinationUrl, destinationStream, avstream, avstream); } }, "ExRtmpRetransmitter-Publisher").start(); } /** * <code>AudioVideoStream</code> - a/v stream. */ private static class AudioVideoStream extends AbstractVideo implements IMicrophone, ICamera { // fields private IMicrophone.IListener microphoneListener = null; private ICamera.IListener cameraListener = null; private byte[] audioConfiguration = null; private byte[] videoConfiguration = null; private boolean active = false; /** * Constructor. */ public AudioVideoStream() { } // IVideo implementation @Override public void onAudioData(final MediaData data) { if (microphoneListener == null) { final int tag = data.tag(); if (getAudioCodec(tag) == 0x0A) // AAC { byte[] id = new byte[2]; try { data.read().read(id); } catch (IOException e) { e.printStackTrace(); } if (id[0] == (byte) 0xAF && id[1] == 0x00) // is config frame? { audioConfiguration = new byte[data.size()]; try { data.read().read(audioConfiguration); } catch (IOException e) { e.printStackTrace(); } } } } else { if (active) { microphoneListener.onAudioData(data); } } } @Override public void onVideoData(final MediaData data) { if (cameraListener == null) { final int tag = data.tag(); if (getVideoCodec(tag) == 0x07) // AVC { byte[] id = new byte[2]; try { data.read().read(id); } catch (IOException e) { e.printStackTrace(); } if (id[0] == 0x17 && id[1] == 0x00) // is config frame? { videoConfiguration = new byte[data.size()]; try { data.read().read(videoConfiguration); } catch (IOException e) { e.printStackTrace(); } } } } else { if (!active) { if (getVideoFrame(data.tag()) != 0x01) // is key frame? { return; } else { active = true; } } cameraListener.onVideoData(data); } } @Override public void onFlvData(final MediaData data) { if (cameraListener != null) { cameraListener.onFlvData(data); } } @Override public void onCuePoint(final Object data) { System.out.println("onCuePoint: " + data); } @Override public void onMetaData(final Object data) { System.out.println("onMetaData: " + data); } // IMicrophone implementation public void addListener(final IMicrophone.IListener listener) { if (audioConfiguration != null) { listener.onAudioData(MediaDataFactory.create(0 /*rtime*/, audioConfiguration)); } microphoneListener = listener; } public void removeListener(final IMicrophone.IListener listener) { microphoneListener = null; } // ICamera implementation public void addListener(final ICamera.IListener listener) { if (videoConfiguration != null) { listener.onVideoData(MediaDataFactory.create(0 /*rtime*/, videoConfiguration)); } cameraListener = listener; } public void removeListener(final ICamera.IListener listener) { cameraListener = new ICamera.ListenerAdapter(); } // inner use methods /** * @param tag * @return format encoded in the <code>tag</code> */ public int getAudioCodec(final int tag) { return (tag >> 4) & 0x0F; } /** * @param tag * @return codec encoded in the <code>tag</code> */ public int getVideoCodec(final int tag) { return tag & 0x0F; } /** * @param tag * @return frame encoded in the <code>tag</code> */ public int getVideoFrame(final int tag) { return (tag >> 4) & 0x0F; } } /** * <code>Player</code> - player. */ public static final class Player extends Object { /** * <code>NetConnectionListener</code> - {@link INetConnection} listener implementation. */ public class NetConnectionListener extends INetConnection.ListenerAdapter { /** * Constructor. */ public NetConnectionListener() { } @Override public void onAsyncError(final INetConnection source, final String message, final Exception e) { System.out.println("Player#NetConnection#onAsyncError: " + message + " " + e); } @Override public void onIOError(final INetConnection source, final String message) { System.out.println("Player#NetConnection#onIOError: " + message); } @Override public void onNetStatus(final INetConnection source, final Map<String, Object> info) { System.out.println("Player#NetConnection#onNetStatus: " + info); final Object code = info.get("code"); if (INetConnection.CONNECT_SUCCESS.equals(code)) { } else { disconnected = true; } } } // fields private volatile boolean disconnected = false; /** * Constructor. */ public Player() { } /** * Plays the stream. * * @param connection * @param url * @param streamName * @param video */ public void play(final INetConnection connection, final String url, final String streamName, final IVideo video) { connection.addEventListener(new NetConnectionListener()); connection.connect(url); // wait till connected while (!connection.connected() && !disconnected) { try { Thread.sleep(100); } catch (Exception e) {/*ignore*/} } if (!disconnected) { INetStream stream = connection.createNetStream(); stream.addEventListener(new INetStream.ListenerAdapter() { @Override public void onNetStatus(final INetStream source, final Map<String, Object> info) { System.out.println("Player#NetStream#onNetStatus: " + info); } }); stream.play(video, streamName); } while (!disconnected) { try { Thread.sleep(100); } catch (Exception e) {/*ignore*/} } connection.close(); } } /** * <code>Publisher</code> - publisher. */ public static final class Publisher extends Object { /** * <code>NetConnectionListener</code> - {@link INetConnection} listener implementation. */ private final class NetConnectionListener extends INetConnection.ListenerAdapter { /** * Constructor. */ public NetConnectionListener() { } @Override public void onAsyncError(final INetConnection source, final String message, final Exception e) { System.out.println("Publisher#NetConnection#onAsyncError: " + message + " " + e); } @Override public void onIOError(final INetConnection source, final String message) { System.out.println("Publisher#NetConnection#onIOError: " + message); } @Override public void onNetStatus(final INetConnection source, final Map<String, Object> info) { System.out.println("Publisher#NetConnection#onNetStatus: " + info); final Object code = info.get("code"); if (INetConnection.CONNECT_SUCCESS.equals(code)) { } else { disconnected = true; } } } // fields private volatile boolean disconnected = false; private INetStream stream = null; /** * Publishes the stream. * * @param connection * @param url * @param streamName * @param microphone microphone * @param camera camera */ public void publish(final INetConnection connection, final String url, final String streamName, final IMicrophone microphone, final ICamera camera) { connection.configuration().put(INetConnection.Configuration.INACTIVITY_TIMEOUT, -1); connection.configuration().put(INetConnection.Configuration.IO_TIMEOUT, 20 /*milliseconds*/); connection.configuration().put(INetConnection.Configuration.RECEIVE_BUFFER_SIZE, 256 * 1024); connection.configuration().put(INetConnection.Configuration.SEND_BUFFER_SIZE, 256 * 1024); connection.configuration().put(INetConnection.Configuration.ENABLE_MEDIA_STREAM_ABSOLUTE_TIMESTAMP, true); connection.addEventListener(new NetConnectionListener()); connection.connect(url); // wait till connected while (!connection.connected() && !disconnected) { try { Thread.sleep(100); } catch (Exception e) {/*ignore*/} } if (!disconnected) { final INetStream stream = connection.createNetStream(); stream.addEventListener(new INetStream.ListenerAdapter() { @Override public void onNetStatus(final INetStream source, final Map<String, Object> info) { System.out.println("Publisher#NetStream#onNetStatus: " + info); final Object code = info.get("code"); if (INetStream.PUBLISH_START.equals(code)) { if (microphone != null) { stream.attachAudio(microphone); } if (camera != null) { stream.attachCamera(camera, -1 /*snapshotMilliseconds*/); } } } }); stream.publish(streamName, INetStream.LIVE); this.stream = stream; } while (!disconnected) { try { Thread.sleep(100); } catch (Exception e) {/*ignore*/} } connection.close(); } /** * Sends a message on the published stream to all subscribing clients. * * @param handler handler name * @param args optional arguments that can be of any type * @return <code>true</code> if sent; otherwise <code>false</code> */ public boolean sendStreamMessage(final String handler, final Object... args) { if (stream == null) return false; stream.send(handler, args); return true; } } }